Thursday, May 28, 2020

Application troubleshooting guide

Application debugging, job submission result and logging are not interactive during stream processing. Consequently, application developers rely on runtime querying and state persistence. A few other options are listed here:
Use of side outputs
When the events are processed from an infinite sequence in the main stream, the results can be stored in one or more additional output streams. This resulting stream is called side output. The type of events stored in a side output does not have to be the same type as the main stream. 
The side output stream is denoted with the OutputTag as shown here:
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
The side-output stream is retrieved using the getSideOutput(OutputTag) method on the result of the DataStream operation.
o Testing:
Testing of user-defined functions is made possible with mocks. The Flink collection comes with the so-called test harnesses such as OneInputStreamOperatorTestHarness for operator on DataStreams, KeyedOneInputStreamOperatorTestHarness for operators on KeyedStream, TwoInputStreamOperatorTestHarness for operators of ConnectedDataStreams of two DataStreams, KeyedTwoInputStreamOperatorTestHarness for operators on ConnectedStreams of two KeyedStreams
The ProcessFunction is the most common method used for processing the events in a stream. This can be unit-tested with the ProcessFunctionTestHarnesses. The PassThroughProcessFunction along with the previously mentioned types of DataStreams come useful to invoke the processElement method on the ProcessFunctionTestHarness. The extractOutputValues method on the harness comes useful to retrieve the list of emitted records for assertions.

Deployments:
Stream Store and analytics automates deployments of applications written using Flink.  This includes options for 
o Authentication and authorization
o Stream store metrics
o High availability
o State and fault tolerance via state backends
o Configuration including memory configuration and 
o All of the production readiness checklist, that includes:
Setting an explicit max parallelism
Setting UUID for all operators
Choosing the right State backend
Configuring high availability for job managers
Debugging and monitoring:
o Flink Applications have support for extensible user metrics in addition to System metrics. Although not all of these might be exposed via the stream and store analytics user interface, the applications can register metric types such as Counters, guages, Histograms and meters.
o Log4j and Logback can be configured with the appropriate log levels to emit log entries for the appropriate operators. These logs can also be collected continuously as the system makes progress.


Wednesday, May 27, 2020

Some more Flink CEP +ML examples

FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events. Machine Learning helps with utilizing patterns discovered in events to make predictions. Together ML and FlinkCEP can improve analytics and predictions.
public static void main(String[] args) throws Exception {
 
        File file =  StreamHandler.writeToFile(StreamHelper.resolve("sequences"));
        Dataset data = FileHandler.loadDataset(file, 4, ",");

// ensemble feature selection
        RecursiveFeatureEliminationSVM[] svmrfes = new RecursiveFeatureEliminationSVM[10];
        for (int i = 0; i < svmrfes.length; i++)
            svmrfes[i] = new RecursiveFeatureEliminationSVM(0.2);
        LinearRankingEnsemble ensemble = new LinearRankingEnsemble(svmrfes);
        ensemble.build(data);
        for (int i = 0; i < ensemble.noAttributes(); i++)
            System.out.println(ensemble.rank(i));

        // feature scoring
        GainRatio ga = new GainRatio();
        ga.build(data);
        for (int i = 0; i < ga.noAttributes(); i++)
            System.out.println(ga.score(i));

         // feature ranking
        RecursiveFeatureEliminationSVM svmrfe = new RecursiveFeatureEliminationSVM(0.2);
        svmrfe.build(data);
        for (int i = 0; i < svmrfe.noAttributes(); i++)
            System.out.println(svmrfe.rank(i)); 
    }
 
}




Tuesday, May 26, 2020

Flink CEP and ML:
FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events. Machine Learning helps with utilizing patterns discovered in events to make predictions. Together ML and FlinkCEP can improve analytics and predictions.
This article describes some of those possibilities. We begin with the CEP’s Pattern API, which allows us to specify the patterns that we want to detect in the stream. This can be used to predict with matching event sequences.
A pattern is a sequence described by the invariants for beginning, middle and end. For example,
DataStream<Event> input = …
Pattern<Event, ?> pattern = Pattern<Event>
 .begin(“start”).where(predicate)
 .next(“middle”).subType(SubEvent.class).where(predicate)
 .followedBy(“end”).where(predicate);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(patternProcessFunction);
start.oneOrMore().greedy();
The patternProcessFunction is where we plugin in ML api involved processing such as classifying, grouping, ranking or sorting
The syntax involving one or more occurrences and greedily looking for repetitions is the one involving pattern detection.
Predicates are specified with conditions where an event is either accepted or rejected. This is inlined in the predicate for each filter. If we want historical evaluation of all events for this predicate, the actual acceptance or rejection of those events occurs with the call to ctx.getEventsForPattern(“patternName”) which may be quite expensive given the data for the stream processing. 
The way to describe complicated or conjunction-based pattern predicates is described in the CEP’s Pattern API documentation. Instead, we look at some of the ML routines.
from recommendations import patterns
 recommendations.sim_distance(recommendations.patterns, ... stream1, stream2)
This applies to StreamCuts just as much as it applies to Streams.  
Also, the ability to find similarities between streams or StreamCuts allows us to use clustering to form groups.
public class KMeansSample {
 
    public static void main(String[] args) throws Exception {
 
        File file =  StreamHandler.writeToFile(StreamHelper.resolve("sequences"));
        Dataset data = FileHandler.loadDataset(file, 4, ",");
        Clusterer km = new KMeans();
        Dataset[] clusters = km.cluster(data);
        System.out.println("Cluster count: " + clusters.length);
 
    }
 
}




Sunday, May 24, 2020

A typical stream processing program would look like this: 

    public static void main(String argv[]) throws Exception { 

    final ParameterTool params = ParameterTool.fromArgs(argv); 

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    env.getConfig().setGlobalJobParameters(params); 

        String scope = Constants.DEFAULT_SCOPE; 

        String streamName = Constants.DEFAULT_STREAM_NAME; 

        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv)); 

        pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORDConstants.DEFAULT_USERNAME)); 

        StreamConfiguration streamConfig = StreamConfiguration.builder() 

                .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS)) 

                .build(); 

  

        logger.info("001- creating stream"); 

        Stream stream = pravegaConfig.resolve(streamName); 

  

        logger.info("002- adding data"); 

        DataStream<String> input = env.fromCollection(snippets); 

        snippets.add("data1"); 

        snippets.add("data2"); 

        Snippets.add("data3"); 

        input.print(); 

  

        logger.info("003- iterate over data"); 

        IterativeStream<String> iteration = input.iterate(); 

        iteration.withFeedbackType(String.class); 

        List<String> entries = new ArrayList<>(); 

        iteration.map(t -> {entries.add(t); return t;}); 

        iteration.closeWith(mapped); 

  

        logger.info("004 - creating a writer to write to stream"); 

        FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder() 

            .withPravegaConfig(pravegaConfig) 

            .forStream(stream) 

                .withEventRouter(new PravegaEventRouter<String >() { 

                    @Override 

                    public String getRoutingKey(String e) { 

                        return e; 

                    } 

                }) 

            .withSerializationSchema(PravegaSerialization.serializationFor(String.class)); 

        builder.enableWatermark(true); 

        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE); 

        FlinkPravegaWriter<String> flinkPravegaWriter = builder.build(); 

        input.addSink(flinkPravegaWriter); 

       

        logger.info("005 - creating a reader to read from stream"); 

        FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder() 

                .withPravegaConfig(pravegaConfig) 

                .forStream(stream) 

                .withDeserializationSchema(PravegaSerialization.deserializationFor(String.class)) 

                .build(); 

  

        logger.info("006 - reading events from stream"); 

        DataStream<String> eventsRead = env 

                    .addSource(flinkPravegaReader) 

                    .name("eventsRead"); 

        IterativeStream<String> it = eventsRead.iterate(); 

        List<String> dataList = new ArrayList<>(); 

        DataStream<String> newEvents = it.map(t -> {dataList.add(t); return t;}); 

        logger.info("count of events = {}", dataList.size()); 

        it.closeWith(newEvents); 

  

        logger.info("007- done"); 

        env.execute("Stream Writer"); 

    } 

 


The application maintenance requirements for these applications involve the following:
1) Application data: Dat adoes not only come from stream store. IT comes from files and collections as well. This means the program must have access to those data stores.
2) Application object model: Application should be conveniently developed using those data sources. It should not require standalone development prior to integration.
3) Application Core components: The application should be convenient for the administrator and developer alike to turn on or off the above mentioned interaction with other components. 
4) Application Services: This kind of application works over infinite data as opposed to conventional applciations. This means the APIs should support runtime performance query. 
5)  Application UI: This organizes the view models and the views so that the business objects can be modified to enable the workflows required from the application. The properties on the UI are defined usually by declarative markup in views.  
6) Application Exception handling: The entire application handles exceptions at each layer and across layers. Usually applications don't allow unhandled exceptions to propagate to the user. Messages are translated into what's meaningful to the user. 
7) Application Logging: Application logging via standard application blocks or other utilities is necessary both for development as well as production support. 
8) Application user access: controlling who has access to what is another important aspect of design for an application. Applications tend to handle security as close to the entry points as possible. The entry points typically have a login that comprises of authentication, authorization lines that need to be crossed before the user has access. 
9) Application performance: Application design requires consideration for performance and often include fast code paths in addition to general more expensive ones as well as external tools or frameworks such as caching. 
10) Application messaging: Service bus or some messaging framework is involved to send messages between application and its dependencies. These could be other services or other data providers. These could also be external gateway when there are heterogeneous systems involved.