Tuesday, January 7, 2020

Flink provides three different types of processing based on timestamps which are independent of the above two methods. There can be three different types of timestamps corresponding to: processing time, event time and ingestion time. 
Out of these only the event time guarantees completely consistent and deterministic results. All three processing types can be set on the StreamExecutionEnvironment prior to the execution of queries. 
Event time also support watermarks. Watermarks is the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it introduces a watermark for the downstream operators to process. In the case of distributed systems where an operator might get inputs from more than one streams, the watermark on the outgoing stream is determined from the minimum of the watermarks from the invoking streams. As the input streams update their event times, so does the operator. Flink also provides a way to coalesce events within the window. 
Flink-connector has an EventTimeOrderingOperator This uses watermark and managed  state to buffer elements which helps to order elements by event time. This class extends the AbstractStreamOperator and implements the OneInputStreamOperator. The last seen watermark is initialized to min valueIt uses a timer service and mapState stashed in the runtime Context. It processes each stream record one by one. If the event does not have a timestamp it simply forwards. If the event has a timestamp, it buffers all the events between the current and the next watermark. 
When the event Timer fires due to watermark progression, it polls all the event time stamp that are less than or equal to the current watermark. If the timestamps are empty, the queued state is cleared otherwise the next watermark is registered. The sorted list of timestamps from buffered events is maintained In a priority queue. 
AscendingTimestampExtractor is a timestamp assigner and watermark generator for streams where timestamps are monotonously ascending. This is true in the case of log files. The local watermarks are easily assigned because they follow the strictly increasing timestamps which are periodic.  

Finally, 
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        env.getConfig().setAutoWatermarkInterval(1000);
will order events as they come.

Please note that Print to Std Out's parallelism is set to 4.
We can lower it with:
        env.setParallelism(1);
        DataStream<String> input = env.fromCollection(snippets).setParallelism(1);

        input.print();

No comments:

Post a Comment