Wednesday, December 11, 2019

Watermarks is the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it introduces a watermark for the downstream operators to process. In the case of distributed systems where an operator might get inputs from more than one streams, the watermark on the outgoing stream is determined from the minimum of the watermarks from the invoking streams. As the input streams update their event times, so does the operator.

When watermarking and timestamps are assigned, the source timestamps and watermarks are ignored. The assigner overrides and overwrites the timestamps and watermarks in the source. It is easy for the assignee to assign timestamps as well as watermarks since they go hand in hand. The timestamps and assignees can directly be added to the source which just use the collectWithTimestamp method on the source Context. Similarly the emitWatermark is used to generate watermarks.
The assignment of timestamps and watermarks is immediately after the data source, but is not strictly required. We can parse and filter before the timestamp assigner.
The parse function is done with the help of MapFunction. This follows a syntax of
DataSet<Y> result = input.map(new MyMapFunction());
The filter function is done with the help of
DataSet<X> result = input.filter(new MyFilterFunction());
Timestamp assigners take a stream and produce a new version with the overwrites in case the original stream had these timestamps and watermarks. As long as the assignment happens prior to the first operation on the event time, the orderly processing of the events remains the same as if the assignment happened at the source. In some special cases, the assignment may even be possible at the source.
The order of the events within a window follows the order of the timestamp and watermarks. However, there can be elements that violate the watermarks. A violation happens when a watermark has been set but the event encountered has a timestamp that precedes it. The difference in time between the newly arrived event and the watermark is called the lateness of the event. This might occur due to many reasons such as when the watermark is placed due to a large number of events that arrive together and the watermark is placed to help with the skipping off reprocessing these events. Even when the window is large, watermarks may be placed to indicate progress based on the number of events processed. In fact, in real world conditions, lateness of events is noticeable and often accommodated for in terms of processing. Lateness is also one of the reasons for the setting the preference of time-based processing type in the stream execution environment. If we use a key to the events and it has to be a timestamp, we have to keep the notion of the timestamp the same across processors and consistent with the way the event processing works. Message queues do not suffer from this problem because they don’t necessarily have to enforce order. Sometimes even poisoning the message or putting it in dead letter queue is sufficient.


No comments:

Post a Comment