Thursday, January 9, 2020

The ProcessFunction takes action on every event, but the user decides what the collector collects.

The ReduceFunction can be used to find the sum: 
keyedStream.reduce(new ReduceFunction<Integer>() { 
    @Override 
    public Integer reduce(Integer val1, Integer val2) 
    throws Exception { 
        return val1 + val2; 
    } 
}); 

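A per-event value can also be emitted with the process method. ProcessFunction is an abstract class, so it is passed as an anonymous class rather than a lambda:
eventsRead.process(new ProcessFunction<String, Integer>() {
    @Override
    public void processElement(String s, Context context, Collector<Integer> collector) {
        logger.info("processElement:s={}", s); // logger is assumed to be an SLF4J Logger field
        collector.collect(1); // emit 1 per event so that a downstream sum gives a count
    }
});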

When the job executes in detached mode, the results may not be immediately available. If the results are needed, it is better to write them to another stream. This stream can then be read by other means that don’t need a Flink job execution.

The JobExecutionResult is returned by env.execute(). It includes the results from accumulators and counters.
For example, an IntCounter accumulator can be instantiated inside a RichFlatMapFunction. This would include:
private IntCounter intCounter = new IntCounter();
getRuntimeContext().addAccumulator("counterName", this.intCounter);
this.intCounter.add(1);

Wednesday, January 8, 2020

ProcessFunction allows the use of timers. The application receives a callback through the onTimer method.
The TimerService provides methods such as:
currentProcessingTime
currentWatermark
registerEventTimeTimer
registerProcessingTimeTimer
The register methods set the time at which the timer fires. Within the function, any criteria can be used to set the timer, and because a callback is received when it fires, selective action can be taken for that event.
One of the most common uses of the timer callback is to call the collector.collect method. This lets the ProcessFunction take action on every event while the user decides what the collector collects.
There are four aspects of these timers:
1) Timers work with KeyedStreams.
2) Timers allow events to be de-duplicated.
3) Timers are checkpointed.
4) Timers can be deleted.

There is at most one timer per key and timestamp. If multiple timers are registered for the same key and timestamp, only one will fire, so timers are de-duplicated automatically. Timers are also persisted in checkpoints, so the process function becomes fault-tolerant. Timers can also be deregistered.
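
The de-duplication and deletion behaviour described above can be illustrated with a small plain-Java model. This is only a sketch and does not use Flink's TimerService: the class KeyedTimerModel and its methods register, delete and advanceWatermark are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model of per-key event-time timers; NOT Flink's TimerService. */
public class KeyedTimerModel {
    // timestamp -> keys that have a timer at that timestamp.
    // Using a set means a repeated (key, timestamp) pair is stored once.
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();

    /** Registering the same (key, timestamp) twice keeps a single timer. */
    public void register(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    /** Deleting a timer that was never registered is a no-op. */
    public void delete(String key, long timestamp) {
        TreeSet<String> keys = timers.get(timestamp);
        if (keys != null) {
            keys.remove(key);
            if (keys.isEmpty()) {
                timers.remove(timestamp);
            }
        }
    }

    /** Fires and removes every timer with timestamp <= watermark, in time order. */
    public List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        Map<Long, TreeSet<String>> due = timers.headMap(watermark, true);
        for (Map.Entry<Long, TreeSet<String>> entry : due.entrySet()) {
            for (String key : entry.getValue()) {
                fired.add(key + "@" + entry.getKey());
            }
        }
        due.clear(); // clearing the view removes the fired timers from the backing map
        return fired;
    }

    public static void main(String[] args) {
        KeyedTimerModel model = new KeyedTimerModel();
        model.register("a", 10);
        model.register("a", 10); // duplicate registration, coalesced into one timer
        model.register("b", 20);
        model.register("b", 30);
        model.delete("b", 30);   // deregistered before it can fire
        System.out.println(model.advanceWatermark(25)); // prints [a@10, b@20]
    }
}
```

In Flink itself the corresponding calls are made on the TimerService obtained from the context object, and the timers are scoped to the current key automatically.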

Tuesday, January 7, 2020

Flink-connector has an EventTimeOrderingOperator. It uses watermarks and managed state to buffer elements, which helps to order elements by event time. This class extends AbstractStreamOperator and implements OneInputStreamOperator. The last seen watermark is initialized to the minimum value. It uses a timer service and MapState obtained from the runtime context. It processes the stream records one by one. If an event does not have a timestamp, it is simply forwarded. If the event has a timestamp, it is buffered along with all the events between the current and the next watermark.
When the event-time timer fires due to watermark progression, the operator polls all the event timestamps that are less than or equal to the current watermark. If no timestamps remain, the queued state is cleared; otherwise a timer is registered for the next watermark. The sorted list of timestamps from buffered events is maintained in a priority queue.
AscendingTimestampExtractor is a timestamp assigner and watermark generator for streams where timestamps are monotonically ascending. This is true in the case of log files. The local watermarks are easy to assign because they follow the strictly increasing timestamps and are emitted periodically.
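
The buffer-then-release idea can be sketched in plain Java as follows. This is not the connector's EventTimeOrderingOperator: the class EventTimeSorter and its methods are hypothetical, a HashMap stands in for MapState, and late events are not treated specially.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Toy model of ordering by event time: buffer events, release them on watermark. */
public class EventTimeSorter {
    // timestamp -> events carrying that timestamp (stand-in for managed MapState)
    private final Map<Long, List<String>> buffered = new HashMap<>();
    // distinct buffered timestamps, smallest first
    private final PriorityQueue<Long> timestamps = new PriorityQueue<>();

    /** Buffers one event under its timestamp. */
    public void onEvent(long timestamp, String payload) {
        List<String> events = buffered.get(timestamp);
        if (events == null) {
            events = new ArrayList<>();
            buffered.put(timestamp, events);
            timestamps.add(timestamp); // each timestamp is queued once
        }
        events.add(payload);
    }

    /** Emits, in timestamp order, every buffered event with timestamp <= watermark. */
    public List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!timestamps.isEmpty() && timestamps.peek() <= watermark) {
            long ts = timestamps.poll();
            out.addAll(buffered.remove(ts));
        }
        return out;
    }

    public static void main(String[] args) {
        EventTimeSorter sorter = new EventTimeSorter();
        sorter.onEvent(3, "c");
        sorter.onEvent(1, "a");
        sorter.onEvent(2, "b");
        System.out.println(sorter.onWatermark(2));  // prints [a, b]
        System.out.println(sorter.onWatermark(10)); // prints [c]
    }
}
```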

Finally, 
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        env.getConfig().setAutoWatermarkInterval(1000);
will order events as they come.

Please note that the parallelism of printing to standard output is set to 4.
We can lower it with:
        env.setParallelism(1);
        DataStream<String> input = env.fromCollection(snippets).setParallelism(1);

        input.print();

Monday, January 6, 2020

Ordering events in Flink involves two aspects:
First, it requires the events to be timestamped. This can be done either at the source or by methods in Flink.
Second, it requires serialized execution when the events are processed as they come rather than by looking at the timestamps.

The first is demonstrated with the following code:

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

The second is achieved by ensuring:
stream.setParallelism(1);

Or by the use of synchronized locks within Function objects.

Flink provides three different notions of time for processing, which are independent of the above two methods. They correspond to three kinds of timestamps: processing time, event time and ingestion time.
Of these, only event time guarantees completely consistent and deterministic results. Any of the three can be set on the StreamExecutionEnvironment prior to the execution of queries.
Event time also supports watermarks. Watermarks are the mechanism in Flink to measure progress in event time. They flow inline with the events. As an operator advances its event time, it emits a watermark for the downstream operators to process. In distributed systems where an operator might get inputs from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks from the input streams. As the input streams update their event times, so does the operator. Flink also provides a way to coalesce events within the window.
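
The minimum rule for an operator with several inputs can be shown with a few lines of plain Java. This is a sketch of the rule only, not Flink code; WatermarkTracker and onWatermark are hypothetical names.

```java
import java.util.Arrays;

/** Toy model: an operator's watermark is the minimum over its input watermarks. */
public class WatermarkTracker {
    private final long[] inputWatermarks;

    public WatermarkTracker(int inputs) {
        inputWatermarks = new long[inputs];
        // No input has reported progress yet.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's combined watermark. */
    public long onWatermark(int input, long watermark) {
        // A watermark never moves backwards on a given input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(2);
        tracker.onWatermark(0, 100);                      // input 1 is still at MIN_VALUE
        System.out.println(tracker.onWatermark(1, 40));   // prints 40
        System.out.println(tracker.onWatermark(1, 120));  // prints 100
    }
}
```

The slowest input holds the operator back, which is why one idle input can stall event-time progress downstream.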

Sunday, January 5, 2020


We now review asynchronous I/O for external data access in Flink. A MapFunction is synchronous: each call to an external system blocks until the response arrives, and this might take arbitrary time for a single statement in a Flink stream application. Asynchronous access helps enable high stream throughput.

With high stream throughput, the number of requests and responses increases dramatically, which incurs a high resource cost and bookkeeping cost. Each asynchronous request returns a future, and the result of the asynchronous operation is passed on when the future completes.

Asynchronous API implementations implement AsyncFunction and specify the callback by overriding asyncInvoke. The I/O operation is then applied as a transformation with the help of
AsyncDataStream.unorderedWait( stream, new AsyncImplementation(), 1000, TimeUnit.MILLISECONDS, 100);

Asynchronous requests come with timeouts. The results can be ordered or unordered.
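
The future-with-timeout pattern can be tried out in plain Java without Flink. The sketch below assumes Java 9 or later for CompletableFuture.orTimeout; AsyncLookupSketch, lookup and lookupAll are hypothetical names, and the "external system" is simulated by an upper-casing function.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/** Toy model of asynchronous lookups with a timeout; NOT Flink's AsyncDataStream. */
public class AsyncLookupSketch {
    /** Hypothetical external lookup; a real one would call a database or service client. */
    static CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> key.toUpperCase());
    }

    /** Issues all requests concurrently, then collects the results in input order. */
    static List<String> lookupAll(List<String> keys, long timeoutMillis) {
        // First pass: start every request so that they are all in flight together.
        List<CompletableFuture<String>> futures = keys.stream()
                .map(key -> lookup(key)
                        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                        // On timeout or failure, substitute a marker value instead of throwing.
                        .exceptionally(error -> "failed:" + key))
                .collect(Collectors.toList());
        // Second pass: wait for each future; joining in list order preserves input order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lookupAll(List.of("a", "b", "c"), 1000)); // prints [A, B, C]
    }
}
```

Joining in input order corresponds to the ordered mode. An unordered variant would hand each result downstream as soon as its future completes, which lowers latency at the cost of ordering.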

Saturday, January 4, 2020

Apache Flink applications can use ProcessFunction with streams. It is similar to a FlatMapFunction but handles all three: events, state and timers. The function is applied to every event in the input stream. It also gives access to Flink keyed state via the runtime context. Timers allow the application to react to changes in event time and processing time. Both the timestamps and the TimerService are available via the context object. Keyed state and timers are only available when the process function is applied on a keyed stream.

The timer service registers callbacks for future event processing. The onTimer method is invoked when a timer fires. Inside this method, state is scoped to the key for which the timer was created.

Joins are possible as low-level operations on two inputs. The CoProcessFunction or the KeyedCoProcessFunction is a function that is bound to two different inputs, and it gets individual calls to processElement1 and processElement2 for records from the respective inputs. One of the inputs is processed first and its state is updated. Then the other input is processed: the earlier state is probed and the result is emitted. Since events may arrive at different times, including delays, a watermark may be used as a reference, and when it has passed, the operation may be performed.
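
The update-then-probe pattern of such a two-input join can be modelled in plain Java. This is a sketch only: TwoInputJoinSketch is a hypothetical class, a HashMap stands in for Flink keyed state, and the watermark-based delay is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a two-input join: input 1 updates state, input 2 probes it. */
public class TwoInputJoinSketch {
    // key -> latest value seen on the first input (stand-in for keyed state)
    private final Map<String, String> firstInputState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Called for each record of the first input: remember the latest value per key. */
    public void processElement1(String key, String value) {
        firstInputState.put(key, value);
    }

    /** Called for each record of the second input: probe the state and emit on a match. */
    public void processElement2(String key, String value) {
        String match = firstInputState.get(key);
        if (match != null) {
            output.add(key + ":" + match + "+" + value);
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        TwoInputJoinSketch join = new TwoInputJoinSketch();
        join.processElement1("k1", "left");
        join.processElement2("k1", "right"); // matches the stored state for k1
        join.processElement2("k2", "other"); // no state for k2, nothing emitted
        System.out.println(join.output());   // prints [k1:left+right]
    }
}
```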

The timers are fault-tolerant and checkpointed. This helps with recovery if the task gets paused. The events do not need to arrive in order for the logic above to work.

Flink coalesces timers and maintains only one timer per key and timestamp. The timer resolution is in milliseconds, but the runtime may not permit this granularity although the system clock can. Besides, the timer may fire up to one resolution unit before or after the time specified. Timers can be coalesced between watermarks. However, there can be only one timer per key per resolution time unit.