Flink CEP and ML:
FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It lets you detect event patterns in an unbounded stream of events. Machine learning can then use the patterns discovered in those events to make predictions, so combining FlinkCEP with ML can improve both analytics and prediction.
This article describes some of those possibilities. We begin with CEP's Pattern API, which lets us specify the patterns we want to detect in the stream. The event sequences that match a pattern can then be fed to a predictive model.
A pattern is a sequence of stages, typically a beginning, a middle and an end, each described by the condition an event must satisfy to be accepted at that stage. For example:
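DataStream<Event> input = …
// startCondition and endCondition are IterativeCondition<Event> placeholders;
// middleCondition is an IterativeCondition<SubEvent>, because subtype() narrows the type.
// next(): strict contiguity; followedBy(): relaxed contiguity.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(startCondition)
    .next("middle").subtype(SubEvent.class).where(middleCondition)
    .followedBy("end").where(endCondition);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(patternProcessFunction);
// Quantifier applied to a stage, where start is a Pattern<Event, ?>:
start.oneOrMore().greedy();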
The patternProcessFunction is where we plug in ML processing, such as classifying, grouping, ranking or sorting the matched events.
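As a hypothetical illustration of the ML step inside the process function: Flink passes each match to processMatch as a map from pattern name to the list of events matched by that stage. The sketch below models that map in plain Java (no Flink dependency) with doubles standing in for events, reduces each match to a single feature (the mean value) and assigns it to the nearest of a few labelled centroids. The class name, the feature and the centroid values are assumptions for this example, not part of Flink or any ML library.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: nearest-centroid classification of one CEP match.
// The match is modelled like processMatch's argument: pattern name -> matched values.
class MatchClassifier {
    private final Map<String, Double> centroids; // label -> centroid of the mean-value feature

    MatchClassifier(Map<String, Double> centroids) {
        this.centroids = centroids;
    }

    // Feature extraction: the mean of every value matched by any stage.
    static double meanFeature(Map<String, List<Double>> match) {
        double sum = 0;
        int count = 0;
        for (List<Double> values : match.values()) {
            for (double v : values) {
                sum += v;
                count++;
            }
        }
        return count == 0 ? 0.0 : sum / count;
    }

    // Returns the label whose centroid is closest to the match's feature.
    String classify(Map<String, List<Double>> match) {
        double feature = meanFeature(match);
        String best = null;
        double bestDistance = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> e : centroids.entrySet()) {
            double d = Math.abs(feature - e.getValue());
            if (d < bestDistance) {
                bestDistance = d;
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        MatchClassifier classifier = new MatchClassifier(Map.of("normal", 10.0, "anomalous", 100.0));
        Map<String, List<Double>> match = Map.of(
                "start", List.of(90.0, 95.0),
                "middle", List.of(110.0),
                "end", List.of(105.0));
        System.out.println(classifier.classify(match)); // mean is 100.0, prints "anomalous"
    }
}
```

In a real job, the same logic would run inside processMatch and emit the label through the collector; a trained model would replace the hard-coded centroids.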
In the quantifier start.oneOrMore().greedy(), oneOrMore() lets the "start" stage match one or more events, and greedy() makes it repeat as many times as possible.
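A simplified, plain-Java model of what a greedy oneOrMore() stage consumes is sketched below. It assumes strict contiguity between the repeated events (as if .consecutive() were also set) and looks for a single run starting at a given position; Flink's actual NFA also handles relaxed contiguity, overlapping matches and time windows, which this sketch ignores. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of a greedy oneOrMore() stage under strict contiguity.
// Not Flink's NFA: overlapping and relaxed-contiguity matches are ignored.
class GreedyRun {
    // Returns the longest run of consecutive events, starting at 'from', that all
    // satisfy 'condition'. An empty list means no match, since oneOrMore needs >= 1.
    static <T> List<T> greedyOneOrMore(List<T> events, int from, Predicate<T> condition) {
        List<T> run = new ArrayList<>();
        for (int i = from; i < events.size() && condition.test(events.get(i)); i++) {
            run.add(events.get(i));
        }
        return run;
    }

    public static void main(String[] args) {
        List<Integer> temperatures = List.of(71, 75, 80, 64, 90);
        // Greedy: keeps consuming while the temperature is above 70.
        System.out.println(greedyOneOrMore(temperatures, 0, t -> t > 70)); // prints [71, 75, 80]
    }
}
```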
Predicates are conditions under which an event is either accepted into the pattern or rejected, and each one is written inline in the where() clause of its stage. When the decision also depends on the events already accepted, an iterative condition can evaluate them through ctx.getEventsForPattern("patternName"). That call retrieves all the events accepted so far for the named pattern, so its cost grows with the data being processed and its use should be kept to a minimum.
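To make the cost of history-based conditions concrete, here is a plain-Java stand-in for an iterative condition. The accepted map plays the role of what ctx.getEventsForPattern(name) returns in Flink; the class, the price-sum rule and the limit are assumptions modelled loosely on the kind of condition the Flink documentation describes, not Flink code. Note that every evaluation rescans the stored history.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an iterative condition, without Flink.
// 'accepted' plays the role of ctx.getEventsForPattern(name).
class IterativeConditionSketch {
    private final Map<String, List<Double>> accepted = new HashMap<>();

    // Accept 'price' into pattern 'name' only if the total of the prices already
    // accepted there, plus this one, stays below 'limit'. The loop rescans the
    // whole history on every call, so the cost grows as a match gets longer.
    boolean tryAccept(String name, double price, double limit) {
        List<Double> history = accepted.computeIfAbsent(name, k -> new ArrayList<>());
        double sum = price;
        for (double p : history) {
            sum += p;
        }
        if (sum < limit) {
            history.add(price);
            return true;
        }
        return false;
    }

    List<Double> eventsForPattern(String name) {
        return accepted.getOrDefault(name, List.of());
    }

    public static void main(String[] args) {
        IterativeConditionSketch cond = new IterativeConditionSketch();
        System.out.println(cond.tryAccept("middle", 2.0, 5.0)); // true  (total 2.0)
        System.out.println(cond.tryAccept("middle", 2.5, 5.0)); // true  (total 4.5)
        System.out.println(cond.tryAccept("middle", 1.0, 5.0)); // false (total would be 5.5)
        System.out.println(cond.eventsForPattern("middle"));    // [2.0, 2.5]
    }
}
```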
Complex or conjunction-based predicates are covered in the documentation of CEP's Pattern API. Here we turn instead to some ML routines, starting with a similarity score between two streams:
import recommendations
recommendations.sim_distance(recommendations.patterns, ... stream1, stream2)
The same similarity measure applies to StreamCuts (positions within a Pravega stream, which can bound a portion of it) just as much as it applies to whole Streams.
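The recommendations module is not shown here, so as a standalone illustration, the Java sketch below computes a sim_distance-style score: one divided by one plus the Euclidean distance over the keys two profiles share. Summarising each stream (or StreamCut) as a map from pattern name to match frequency is an assumption made for this example, as are the class and method names.

```java
import java.util.Map;

// Sketch of a sim_distance-style similarity between two streams (or StreamCuts),
// each summarised as a hypothetical profile: pattern name -> match frequency.
class StreamSimilarity {
    // Returns 1 / (1 + Euclidean distance) over the keys both profiles share,
    // so identical profiles score 1.0 and the score falls toward 0 as they diverge.
    // Profiles with no keys in common score 0.0.
    static double simDistance(Map<String, Double> a, Map<String, Double> b) {
        double sumOfSquares = 0;
        boolean shared = false;
        for (Map.Entry<String, Double> e : a.entrySet()) {
            Double other = b.get(e.getKey());
            if (other != null) {
                shared = true;
                double diff = e.getValue() - other;
                sumOfSquares += diff * diff;
            }
        }
        return shared ? 1.0 / (1.0 + Math.sqrt(sumOfSquares)) : 0.0;
    }

    public static void main(String[] args) {
        Map<String, Double> stream1 = Map.of("login-burst", 3.0, "price-spike", 4.0);
        Map<String, Double> stream2 = Map.of("login-burst", 0.0, "price-spike", 0.0);
        // Distance is sqrt(9 + 16) = 5, so the score is 1 / 6.
        System.out.println(simDistance(stream1, stream2)); // prints 0.16666666666666666
    }
}
```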
Being able to measure similarity between streams or StreamCuts also lets us use clustering to form groups, as in this k-means example using the Java-ML library:
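import java.io.File;
import net.sf.javaml.clustering.Clusterer;
import net.sf.javaml.clustering.KMeans;
import net.sf.javaml.core.Dataset;
import net.sf.javaml.tools.data.FileHandler;

// StreamHandler and StreamHelper are the author's own helpers, not part of Java-ML;
// they appear to export the "sequences" stream to a comma-separated file.
public class KMeansSample {
    public static void main(String[] args) throws Exception {
        File file = StreamHandler.writeToFile(StreamHelper.resolve("sequences"));
        // The column at index 4 holds the class label; values are comma-separated.
        Dataset data = FileHandler.loadDataset(file, 4, ",");
        Clusterer km = new KMeans(); // default Java-ML k-means parameters
        Dataset[] clusters = km.cluster(data);
        System.out.println("Cluster count: " + clusters.length);
    }
}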
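To show what km.cluster(data) does conceptually, here is a minimal one-dimensional k-means (Lloyd's algorithm) in plain Java. It is not Java-ML's implementation: the initial centroids are passed in explicitly, and iteration stops when no point changes cluster or after maxIterations. The input values are hypothetical per-stream scores, for example each stream's similarity to a reference stream.

```java
import java.util.Arrays;

// Minimal 1-D k-means (Lloyd's algorithm), for illustration only.
class KMeans1D {
    // Returns the cluster index assigned to each point.
    static int[] cluster(double[] points, double[] initialCentroids, int maxIterations) {
        double[] centroids = initialCentroids.clone();
        int[] assignment = new int[points.length];
        Arrays.fill(assignment, -1);
        for (int iter = 0; iter < maxIterations; iter++) {
            boolean changed = false;
            // Assignment step: each point joins its nearest centroid.
            for (int i = 0; i < points.length; i++) {
                int best = 0;
                for (int c = 1; c < centroids.length; c++) {
                    if (Math.abs(points[i] - centroids[c]) < Math.abs(points[i] - centroids[best])) {
                        best = c;
                    }
                }
                if (assignment[i] != best) {
                    assignment[i] = best;
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged: no point switched clusters
            }
            // Update step: move each centroid to the mean of its members.
            double[] sums = new double[centroids.length];
            int[] counts = new int[centroids.length];
            for (int i = 0; i < points.length; i++) {
                sums[assignment[i]] += points[i];
                counts[assignment[i]]++;
            }
            for (int c = 0; c < centroids.length; c++) {
                if (counts[c] > 0) {
                    centroids[c] = sums[c] / counts[c]; // empty clusters keep their old centroid
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        double[] scores = {0.9, 0.85, 0.1, 0.15, 0.95, 0.05};
        int[] labels = cluster(scores, new double[] {0.0, 1.0}, 100);
        System.out.println(Arrays.toString(labels)); // prints [1, 1, 0, 0, 1, 0]
    }
}
```

Passing the initial centroids explicitly keeps the sketch deterministic; library implementations typically choose them at random, so results can differ between runs.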