Tuesday, January 7, 2020

Flink provides three different types of processing based on timestamps which are independent of the above two methods. There can be three different types of timestamps corresponding to: processing time, event time and ingestion time. 
Out of these only the event time guarantees completely consistent and deterministic results. All three processing types can be set on the StreamExecutionEnvironment prior to the execution of queries. 
Event time also support watermarks. Watermarks is the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it introduces a watermark for the downstream operators to process. In the case of distributed systems where an operator might get inputs from more than one streams, the watermark on the outgoing stream is determined from the minimum of the watermarks from the invoking streams. As the input streams update their event times, so does the operator. Flink also provides a way to coalesce events within the window. 
Flink-connector has an EventTimeOrderingOperator This uses watermark and managed  state to buffer elements which helps to order elements by event time. This class extends the AbstractStreamOperator and implements the OneInputStreamOperator. The last seen watermark is initialized to min valueIt uses a timer service and mapState stashed in the runtime Context. It processes each stream record one by one. If the event does not have a timestamp it simply forwards. If the event has a timestamp, it buffers all the events between the current and the next watermark. 
When the event Timer fires due to watermark progression, it polls all the event time stamp that are less than or equal to the current watermark. If the timestamps are empty, the queued state is cleared otherwise the next watermark is registered. The sorted list of timestamps from buffered events is maintained In a priority queue. 
AscendingTimestampExtractor is a timestamp assigner and watermark generator for streams where timestamps are monotonously ascending. This is true in the case of log files. The local watermarks are easily assigned because they follow the strictly increasing timestamps which are periodic.  

Finally, 
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        env.getConfig().setAutoWatermarkInterval(1000);
will order events as they come.

Please note that Print to Std Out's parallelism is set to 4.
We can lower it with:
        env.setParallelism(1);
        DataStream<String> input = env.fromCollection(snippets).setParallelism(1);

        input.print();

Monday, January 6, 2020

Ordering events in Flink involves two aspects:
First, it requires the events to be timestamped. This can be done either at the source or by methods in Flink
Second, it requires serialized execution when the events are processed as they come rather than by looking at the timestamps.

The method to do first is demonstrated with the following code:

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

The method to do second is by ensuring:
stream.setParallelism(1);

Or by the use of synchronized locks within Function objects.

Flink provides three different types of processing based on timestamps which are independent of the above two methods. There can be three different types of timestamps corresponding to: processing time, event time and ingestion time.
Out of these only the event time guarantees completely consistent and deterministic results. All three processing types can be set on the StreamExecutionEnvironment prior to the execution of queries.
Event time also support watermarks. Watermarks is the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it introduces a watermark for the downstream operators to process. In the case of distributed systems where an operator might get inputs from more than one streams, the watermark on the outgoing stream is determined from the minimum of the watermarks from the invoking streams. As the input streams update their event times, so does the operator. Flink also provides a way to coalesce events within the window.

Sunday, January 5, 2020


Apache Flink Applications can use ProcessFunction with streams. It is similar to FlatMap Function but handles all three : events, state and timers. The function applies to each and every event in the input stream. It also gives access to the Flink keyed state via the runtime Context. The timer allows changes in event time and processing time to be handled by the application. Both the timestamps and timerservice are available via the context object. This is only possible with process function on a keyed stream.

The timer service is called for future event processing via registered callbacks. The onTimer method is invoked for this particular processing. Inside this method, states are scoped to the key for which the timer was created.

Joins are possible on low-level operations of two inputs. The ProcessFunction or the KeyedCoProcessFunction is a function that is bound to two different inputs and it gets individual calls to processElement1 and processElement2 for records for respective inputs. One of the inputs is processed first. Its state is updated. Then the other input is processed. The earlier state is probed and the result is emitted. Since events may correspond to different times including delays, a watermark may be used as a reference and when it has elapsed, the operation may be performed.

The timers are fault-tolerant and checkpointed This helps with recovery if the task gets paused. There does not need to be an ordered set of events with the logic above.

The Flink coalescing may maintain only one timer per key and timestamp. The timer resolution is in milliseconds but the runtime may not permit it at this granularity although the system clock can. Besides, the timer may work upto +/- one-time unit equal to resolution from the time specified. Timers can be coalesced between watermarks. However, there can be only one timer per key per resolution time-unit.

We now review asynchronous I/O for external data access in Flink.  A MapFunction is synchronous. This might take arbitrary time for a single statement in an Flink Stream application.  Asynchronous access helps enable high stream throughput.

The number of request responses increased dramatically with high stream throughput which incurs a high resource cost and bookkeeping cost. Each asynchronous request returns a future and the results of the asynchronous operation are passed as the result of the future.

Asynchronous api implementations extend an AsyncFunction and specify a callback with asyncInvoke override. The I/O operation is then applied as a transformation with the help of
AsyncDataStream.unorderedWait( stream, new AsyncImplementation(), 1000, TimeUnit.MILLISECONDS, 100);

Asynchronous requests come with timeouts. The results can be ordered or unordered.

Saturday, January 4, 2020

Apache Flink Applications can use ProcessFunction with streams. It is similar to FlatMap Function but handles all three : events, state and timers. The function applies to each and every event in the input stream. It also gives access to the Flink keyed state via the runtime Context. The timer allows changes in event time and processing time to be handled by the application. Both the timestamps and timerservice are available via the context object. This is only possible with process function on a keyed stream.

The timer service is called for future event processing via registered callbacks. The onTimer method is invoked for this particular processing. Inside this method, states are scoped to the key for which the timer was created.

Joins are possible on low-level operations of two inputs. The ProcessFunction or the KeyedCoProcessFunction is a function that is bound to two different inputs and it gets individual calls to processElement1 and processElement2 for records for respective inputs. One of the inputs is processed first. It’s state is updated. Then the other input is processed. The earlier state is probed and the result is emitted. Since events may correspond to different times including delays, a watermark may be used as a reference and when it has elapsed, the operation may be performed.

The timers are fault-tolerant and checkpointed This helps with recovery if the task gets paused. There does not need to be an ordered set of events with the logic above.

The Flink coalescing may maintain only one timer per key and timestamp. The timer resolution is in milliseconds but the runtime may not permit it at this granularity although the system clock can. Besides, the timer may work upto +/- one-time unit equal to resolution from the time specified. Timers can be coalesced between watermarks. However, there can be only one timer per key per resolution time-unit.

Friday, January 3, 2020


Flink Streaming jobs appear to run for a long time because the stream has no bounds. The program that invokes the env.execute() is kicked off in detached mode. The other mode of execution is blocking mode and it does not apply to StreamExecutionEnvironment but only to LocalExecutionEnvironment. The job itself will either appear with status as started on success or appear as error on failure. The logs for the job execution will only be partial because the foreground disappears after making an asynchronous call.

The logs for the background will show all the activities performed after the invocation.

There are also a few ways to gather some counts programmatically. These include:

eventsRead.addSink( new SinkFunction<String> () {

Private int count;

@Override

Public void invoke(String value) throws Exception {

             count++;

             logger.error(“count = {}, valueRead ={}”, count, value) ;

           }

}) ;



And the other is with using iterative streams

IterativeStream it = eventsRead.iterate();

It.withFeedbackType(String. Class) ;

DataStream mapped =it.map( t - > { logger. Info(t) ; return t;}) ;

It.closeWith(mapped);

When a job is performed in detached mode, the job execution result is not available immediately. That result is only available when the Flink application program is run in blocking mode which is usually kit the case for streaming mode.

There are ways to sleep between reads and writes but the scheduling of the job occurs when the execute is called.  This sometimes makes it harder for the program to be debugged via the application logs but the jobManager has up to date logs.

Flink Applications can use ProcessFunction with streams. It is similar to FlatMap Function but handles all three : events, state and timers. The function applies to each and every event in the input stream. It also gives access to the Flink keyed state via the runtime Context. The timer allows changes in event time and processing time to be handled by the application. Both the timestamps and timerservice are available via the context object. This is only possible with process function on a keyed stream.

Thursday, January 2, 2020

Exception Stream:
When a computer program has a fault, it generates an Exception with a stacktrace of all the methods leading up to the point of failure. Developers write code in a way that lets exceptions to be written out to logs for offline analysis by DevOps Engineers. This article tries to list the implementation considerations for an automated solution to find the top generators of exceptions on a continuous basis.
We list the high-level steps first and then document the implementation considerations specific to continuous automated reporting of top exception generators. These are:
1) parse the logs by age and extract the exception
2) generate hash and store exception stacktrace as well as its hash in a separate stream
3) run a program to continuously monitor the exceptions written to the stream for categorizing and creating a histogram
4) report the top generators of exceptions from the histogram at any point of time on demand.
The choice of a stream store for exceptions is unique to meeting the demands of continuous reporting.  Traditional solutions usually poll the exception histogram periodically to draw the bar chart for top generators of exceptions.
The number of generators of exceptions and the number of exceptions for any generator can be arbitrary.  It helps to store each exception one by one in the stream. The generation of the histogram and its persistence with the positional reference of the last accounted exception from the exception stack trace stream is optional. Having a separate stream for log entries, exception stack traces and histograms enables efficiency in computation without repetitions. Otherwise just the exception stack trace stream is sufficient in the stream store.
The natural choice for writing and reading the exception stacktraces stream is Apache Flink and the programs to read and write can be separate.
The writer of exception stacktraces to its stream has to parse the logs. Usually the logs are available on the filesystem and seldom in the stream store but it is advisable to propagate the log to its own stream store. The parsing of log entries for extracting exceptions and the hashing of exception stack trace is well-known with stackhasher programs. The generation of histogram is a straight forward Flink query. The persistence of the histogram and the position of the last read exception only reduces the work set for which the histogram needs to be improved. Regular state persistence in the Flink application code can flush the histogram to disk.
It may be argued that the histogram is a metric rather than the log entry or the exception stack trace. A suitable metric stack and an influxQL can suffice to do it. Even storing the histogram in an object store for universal web access, metrics and alerts may be an alternative. However, the convenience of updating the histogram at the point of reading the exception stack trace makes it easy to include it in the Flink Application.

Wednesday, January 1, 2020

A REST based data path to a stream store can work as a log sink in a PKS hosted and Kubernetes cluster deployed stream storage so that the data from the source can find its way to a sink with little or no intervention. Then the generation, collection, sink and analysis of the log entries follows a staged propagation in a pipeline model and makes the logs available for extract-transform-load, analysis and reporting solutions downstream.

The stages are:

Kube api-server is outside the Kubernetes cluster and any products hosted on Kubernetes. As an infrastructure it is well suited to turn on these collection items and determine their transmission techniques. The upshot is that we have a set of command line parameters as input and and data flow as output  

Transformation of data. This is a required step because this data is generally read only. Transformation means select, project, map, filter, reduce and other functionalities. Flink application can be leveraged for this purpose.

Sink of event where we leverage a data path directly to the stream store allowing all reporting stacks to read from the store instead of the source.

The logic for querying logs is written usually in two layers – a low level primitive layers and a higher-level composite. Very rarely do we see joins or relations between logs. Instead pipelining of operators take precedence over the correlation of data because the stages of extracting from source, transforming, putting into sink and utilizing by-products of the transformation for subsequent storage and analysis follow a data flow model.  


Indeed, a data driven approach of log analysis is not usually a concern as most users are willing to search all the logs if it weren’t so time consuming. What they really want is the ease of writing and refining queries because the curated library does not always eradicate the need for adhoc queries. In such cases, the library of existing code/script is merely a starting point for convenience which can then be edited for the current task.