Monday, December 23, 2019

Sample program to extract Exception from logs as stream


public class LogExceptionExtractor {
    private static final Logger logger = LoggerFactory.getLogger(LogExceptionExtractor.class);

    public static void main(String argv[]) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(argv);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        DataSet<String> text;
PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        StreamConfiguration streamConfig = StreamConfiguration.builder()
            .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
            .build();

logger.info("001- creating stream");
     DataStream<LogException> exceptionDataStream = env.readTextFile("exceptions.log")
                .flatMap(new ExceptionTimestampExtractor())
                .keyBy(0)
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
                .allowedLateness(Time.seconds(4))
                .process(new ProcessWindowFunction<Tuple2<String, String>, LogException, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<String, String>> iterable, Collector<LogException> collector) throws Exception {
                        if (getRuntimeContext() == null) {
                            setRuntimeContext((RuntimeContext) new LogException());
                        }
                        LogException logException = (LogException) getRuntimeContext();
                        for(Tuple2<String, String> item : iterable) {
                            logException.setTimestamp(item._1);
                            logException.getData().add(item._2);
                        }
                        collector.collect(logException);
                    }
                });

        FlinkPravegaWriter.Builder<LogException> builder = FlinkPravegaWriter.<LogException>builder()
            .withPravegaConfig(pravegaConfig)
            .forStream((Stream) exceptionDataStream)
            .withEventRouter(new LogExceptionRouter())
            .withSerializationSchema(PravegaSerialization.serializationFor(LogException.class));


        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<LogException> flinkPravegaWriter = builder.build();
        exceptionDataStream.addSink(flinkPravegaWriter);
        env.execute("Stream Writer");
    }

}
The above method works partly due to the parallelization based on key

Sunday, December 22, 2019


In this article, we continue with the discussion on stream processing with Flink and look into the use of windows. An infinite stream can only be processed if it were split into finite size. A window refers to this split and can have a size dependent on one or the other factors. A programmer generally refers to the use of windows with the help of syntax to define the split. In the case of keyed streams, she uses “keyBy” operator followed by “window” and in the case of non-keyed streams, she uses “windowAll”
The keyed stream helps with parallelizing the stream tasks.
There are four types of windows:
1)      Tumbling window: A tumbling window assigns each element to a window of a specified window size as with the code shown
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
2)      Sliding window: this window allows overlapping with other windows even though the size of the window is fixed as per the previous definition. An additional window slide parameter dictates how often the next window comes up.
3)      Session window: this window assigns elements by group elements by activity. The window does not have a fixed size and do not overlap The number of elements within a window can vary and the gap between the windows is called session gap.
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
    // determine and return session gap
}))
4)      Global window: This is a singleton spanning all events with the same key into the same window. Since it does not have a beginning or an end, no processing can be done unless there is a trigger specified. A trigger determines when a window is ready.
The computation on a window is performed with the help of a window function which can be one of ReduceFunction, AggregateFunction, FoldFunction, or ProcessingWindowFunction. Flink incrementally aggregates the elements for each window as they arrive for the first two window functions. The ProcessingWindowFunction requires buffering but it returns an iterator with metadata about the window. A fold function determines how each input value affects the output value and is called once for each element as it is added to the window.
The context object holds the state which are of two types:
1)      globalState()
2)      windowState()
Both allow access to the keyed state but the former is not scoped to a window while the latter is scoped to a window.

Saturday, December 21, 2019

Flink considers batch processing to be a special case of stream processing. A batch program is one that operates on a batch of events at a time. It differs from the above mentioned Stateful stream processing in that it uses data structures in memory rather than the embedded key store and therefore can export its state externally and does not require to be kept together with streams. Since the state can  be stored externally, the use if checkpoints mentioned earlier is avoided since their purpose was also similar to states. The iterations are slightly different from that in stream processing, because there is now a superstep level that might involve coordination
When we mentioned the parallelism, we brought up keyBy operator. Let us look at the details of this method. It partitions the stream on the keyBy attribute and the windows are computed per key. A window operates on a finite set of elements a tumbling window has fixed boundaries.
For example,
stream

    .keyBy( (event) -> event.getUser() )

    .timeWindow(Time.hours(1))

    .reduce( (a, b) -> a.add(b) )

    .addSink(...);

 empty window does not trigger a computation

The keyBy is called for keyed streams and the windowAll is called for non-keyed streams
The notion behind keyed streams is that all of the computations of a windowed stream can be executed in parallel. Each logical keyed stream is independent. Elements with the same key goto the same task.

Friday, December 20, 2019


The repackaging of source code and its unintended consequences:
This article is written with the findings from an investigation that I participated in and is still ongoing. It does not explain everything. The investigation has not concluded yet because the root cause has not been found yet. I have not heard any other similar case so far and I have had difficulty in outlining the defect but the learnings from the efforts stand clear.
A specific java code was packaged as a jar because it had a very narrow purpose and needed to be reused with applications from different sources.  This jar happened to be a fat jar which is a term to say that it contained all the dependencies needed to run its code. The jar worked well whenever the applications were run. However, it was inconvenient to copy it to each repository manually. The time-tested way of bringing in jars like these has been to use one or the other distribution repository.
The code was therefore forked which is a term used to denote another location where a copy of the code would be modified. The changes could include a rewrite and some trimmings so that the newer code was lean and mean.  The forked code would then be uploaded to a repository and consumers could then refer to it by name in the dependency and it would all work well with the elimination of the unnecessary step to bring it in any other way.
It turned out that this effort did not really work the way as intended. The reason for the failure is still under investigation. The code was supposed to be the same and it was reviewed to ensure that no defects were introduced. It had test cases that seemed to work. Yet the code was not working for users.  If all the factors that played out with the previous code were determined, the current effort would have panned out. This was the key learning and is often referred to in the industry as regression tests. These tests are written with the goal of keeping the behavior of the current code the same as that of the previous irrespective of the changes made to the current. Somewhere, there was a regression introduced and it was manifesting as a failure.
The nature of the software development is that it is always focused on functionality. However, it is the value that all the assets that guarantee the non-functional aspects of the code that determine how easy it is to use the code. The functional and non-functional aspects are both needed from any implementation but while the former has the bulk of the emphasis on the maker, it is the latter that determines the appreciation.

Thursday, December 19, 2019

The Flink data flow model consists of the streams, transformation and sinks. An example of stream generation is the addSource method on the StreamExecutionEnvironment. An example for transformation is the map operator on the DataStream. An example of using a sink is the addSink method on the DataStream.
Each of these stages can partition the data to take advantage of parallelization. The addSource method may take a partition to begin the data flow on a smaller subset of the data. The transformation opeator may further partition the stream.  Some operators can even multiplex the partitions with keyBy operator and the sink may group the events together.
There are two types of forwarding patterns between operators that help with parallelization one is one to one and another is one to many. The latter is the one that helps with multiplexing and is supported by streams that redistribute and are hence called redistributing streams. These streams use keyBy and map methods on the stream. The first method is a way for the incoming stream to be repartitioned into many outgoing streams and hence the term redistribution. When a stream is split into outgoing keyBy partitions, the events in the stream cannot be guaranteed to be in the same order as they arrived unless we are looking at events between the sender and one receiver. The events across receivers can be in any order depending on how the events were partitioned and how fast they were consumed by downstream operators. The second method is about transformation of a stream where the events get converted to other events that are easier to handle.
Within the interaction between a sender and receiver on a single stream, the events may be infinite. That is why a window is used to bound the events that will be considered in one set. This window can be of different types where the events overlap as in a sliding window or they don’t as in a tumbling wimdow. Windows can also span over periods of inactivity.
Operations that remember information across events are said to be Stateful. The state can only be maintained when events have keys associated with them. Let’s recall the keyBy operator that was used for repartitioning. This operator adds jets to events. The state is always kept together with the stream as if in an embedded key-value store. That is why the streams have to be keyed.
Flink considers batch processing to be a special case of stream processing. A batch program is one that operates on a batch of events at a time. It differs from the above mentioned Stateful stream processing in that it uses data structures in memory rather than the embedded key store and therefore can export its state externally and does not require to be kept together with streams. Since the state can  be stored externally, the use if checkpoints mentioned earlier is avoided since their purpose was also similar to states. The iterations are slightly different from that in stream processing, because there is now a superstep level that might involve coordination

Wednesday, December 18, 2019

Flink provides two sets of APIs to work with Data. One is for unbounded data such as streams and provided by DataStream APIs. Another is for the bounded data and provided by DataSet API.
Since Java programmers like collections, the latter provides methods to work with collections as source and sink. 
For example, we have methods on StreamExecutionEnvironment such as 
DataSet<T> result = env.fromCollection(data) which returns a DataSet.
And we have methods to specify a collection data sink as follows:
List<T> dataList = new ArrayList<T>();
result.output(new CollectionDataFormat(dataList));
This collection data sink works well as a debugging tool. 
Both the dataType and iterators must be serializable for the read from source and write to sink.
The DataStream API only has support for source from collections. If we need to write a stream to a sink we can access the console or file-system because that will most likely be for debugging. There is no direct conversion from stream to collection.
DataStream<T>  stream = env.fromCollection(data);
Iterator<T> it = DataStreamUtils.collect(stream); // expensive
There is however another method called addSink which can allow custom sinks.
For most purposes of a collection, a DataStream provides an iterate() method that returns an Iterative Stream. 
For example, 
IterativeStream<T> = stream.iterate()
DataStream<T> newStream = iteration.map(new MyMapFunction()); where the map is invoked inside the loop.
Finally, iteration.closeWith(…) to define the tail of the iteration.

We reviewed Flink’s DataStream and DataSet Api for its distinction of bounded and unbounded data.
Let us now take a look at the parallelizatio possibilities with Flink.
The Flink data flow model consists of the streams, transformation and sinks. An example of stream generation is the addSource method on the StreamExecutionEnvironment. An example for transformation is the map operator on the DataStream. An example of using a sink is the addSink method on the DataStream.
Each of these stages can partition the data to take advantage of parallelization. The addSource method may take a partition to begin the data flow on a smaller subset of the data. The transformation opeator may further partition the stream.  Some operators can even multiplex the partitions with keyBy operator and the sink may group the events together.

Tuesday, December 17, 2019

Flink provides two sets of APIs to work with Data. One is for unbounded data such as streams and provided by DataStream APIs. Another is for the bounded data and provided by DataSet API.
Since Java programmers like collections, the latter provides methods to work with collections as source and sink.
For example, we have methods on StreamExecutionEnvironment such as
DataSet<T> result = env.fromCollection(data) which returns a DataSet.
And we have methods to specify a collection data sink as follows:
List<T> dataList = new ArrayList<T>();
result.output(new CollectionDataFormat(dataList));
This collection data sink works well as a debugging tool.
Both the dataType and iterators must be serializable for the read from source and write to sink.
The DataStream API only has support for source from collections. If we need to write a stream to a sink we can access the console or file-system because that will most likely be for debugging. There is no direct conversion from stream to collection.
DataStream<T>  stream = env.fromCollection(data);
Iterator<T> it = DataStreamUtils.collect(stream); // expensive
There is however another method called addSink which can allow custom sinks.
For most purposes of a collection, a DataStream provides an iterate() method that returns an Iterative Stream.
For example,
IterativeStream<T> = stream.iterate()
DataStream<T> newStream = iteration.map(new MyMapFunction()); where the map is invoked inside the loop.
Finally, iteration.closeWith(…) to define the tail of the iteration.