Saturday, January 11, 2020

Let us look a little closer at the use of the TimerService in the process function.
public class LogExceptionKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, String>, LogException> {
    private static final Logger logger = LoggerFactory.getLogger(LogExceptionKeyedProcessFunction.class);
    private static final Map<String, LogException> exceptionMap = new HashMap<>();
    private static final List<String> collected = new ArrayList<String>();
    private static Object lock = new Object();
    private static LogException exception  = new LogException();
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }
    @Override
    public void processElement(Tuple2<String, String> item, Context context, Collector<LogException> collector) throws Exception {
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = item._1;
        }
        logger.debug("s={},v={}", item._1, item._2);
        synchronized (lock) {
            if (exceptionMap.containsKey(item._1)) {
                exception = exceptionMap.get(item._1);
                exception.getData().add(item._2);
                logger.debug("id:{}, sizeof stacktrace : {}", exception.getId(), exception.getData().size());
                java.lang.Thread.sleep(1000);
            } else {
                exception = new LogException();
                exception.setTimestamp(item._1);
                exception.getData().add(item._2);
                exceptionMap.put(item._1, exception);
                logger.debug("id:{}, putting: {}:{}", exception.getId(), item._1, item._2);
            }
        }
        current.count++;
        current.lastModified = System.currentTimeMillis();
        current.exception = exception;
        state.update(current);
        context.timerService().registerProcessingTimeTimer(current.lastModified + 2000);
    }

    private void collectException(Collector<LogException> collector) {
        for (Map.Entry<String, LogException> entry : exceptionMap.entrySet()) {
            if (!collected.contains(entry.getKey())) {
                collector.collect(entry.getValue());
                collected.add(entry.getKey());
            }
        }
    }
    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<LogException> out) throws Exception {

        CountWithTimestamp result = state.value();
        logger.error("Timer_called_with_result:count:{}, {}", result.count, result.exception.getTimestamp());
        collectException(out);
    }

    // Declared static so that Flink can instantiate it without an enclosing function instance.
    public static class CountWithTimestamp {
        public String key;
        public long count;
        public long lastModified;
        public LogException exception;
    }
}

Notice that collectException emits different exceptions from exceptionMap depending on how quickly the map fills before a timer fires. Fortunately, the result is predictable once the time at which onTimer is invoked is known.
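To see why the output depends on timing, here is a minimal plain-Java simulation with no Flink dependency. It is only a sketch: the class name TimerCollectSimulation, the arrival times and the way time is modelled are assumptions made for illustration. Each arrival registers a timer 2000 ms later, and each timer emits whatever is in the shared map and has not been emitted yet.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class TimerCollectSimulation {
    /**
     * arrivals maps an arrival time (ms) to the map key of the event arriving then.
     * Every arrival registers a timer at arrival + delayMs. When a timer fires it
     * emits every key already in the shared map that was not emitted before.
     * Returns the keys emitted at each firing time.
     */
    static Map<Long, List<String>> simulate(TreeMap<Long, String> arrivals, long delayMs) {
        Set<String> collected = new HashSet<>();
        Map<Long, List<String>> emittedPerTimer = new TreeMap<>();
        for (long arrival : arrivals.keySet()) {
            long fireTime = arrival + delayMs;
            List<String> emitted = new ArrayList<>();
            // Everything that arrived up to the firing time is already in the shared map.
            for (String key : arrivals.headMap(fireTime, true).values()) {
                if (collected.add(key)) {
                    emitted.add(key);
                }
            }
            emittedPerTimer.put(fireTime, emitted);
        }
        return emittedPerTimer;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> arrivals = new TreeMap<>();
        arrivals.put(0L, "A");
        arrivals.put(500L, "B");
        arrivals.put(3000L, "C");
        // Prints {2000=[A, B], 2500=[], 5000=[C]}
        System.out.println(simulate(arrivals, 2000L));
    }
}
```

The first timer picks up both A and B because B arrived before it fired, the second timer finds nothing new, and C has to wait for its own timer. Spacing the arrivals differently changes which timer emits which entry.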

Friday, January 10, 2020

Communication protocols between independent programs: a comparison of gRPC versus REST.
The popularity of web protocols has increased over the last decade because they help connect heterogeneous applications and services that can be hosted anywhere. Two popular choices are gRPC and REST. We use their abbreviations as is and compare them as follows:
REST
This is a way of requesting resources from the remote end via standard HTTP verbs such as GET and PUT.
The advantages are:
Works over HTTP/1.1
Supports subscription mechanisms with REST hooks
Comes with widely accepted tool and browser support
Offers a well-defined path for developing the service that provides this communication
Supports discovery of resource identifiers with subsequent request-response models
Lends itself to software development kits that expose these interfaces in more than one language
The disadvantages are:
Is considered chatty because a single task can take a number of requests and responses
Is considered heavy because the payload is usually large
Is considered inflexible at times because of versioning costs
gRPC
This is a way of invoking routines on the remote end rather than asking for resources. Routines take the place of verbs and resources. Some treat this style of communication as a refinement of RPC and SOAP, protocols that are now considered legacy.
The advantages are:
Supports high-speed communication because it is lightweight and adds little overhead per call
Messages are encoded with Protocol Buffers, which are known for efficient packing and unpacking of data
Works over the newer HTTP/2
Well suited to traffic from devices (IoT)
The disadvantages are:
Requires the client to write code
Is not natively supported by browsers
Both REST and gRPC support secure transport-layer communication, which keeps the exchange between two parties private. When corporations invest significantly in either one, it tends to become the choice for their development teams. The two are not mutually exclusive, however: given enough resources and time, supporting both protocols widens the audience and broadens the customer base.
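As a small illustration of why Protocol Buffers pack data tightly, the sketch below encodes an integer as a base-128 varint, the scheme the Protocol Buffers wire format uses for integer fields. This is only the integer-encoding piece written by hand, not the Protocol Buffers library, and the class name VarintSketch is made up for this post.

```java
import java.io.ByteArrayOutputStream;

final class VarintSketch {
    /** Encodes a value as a base-128 varint, least significant 7-bit group first. */
    static byte[] encodeVarint(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0) {
            // More groups follow: set the continuation bit on this byte.
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
        return out.toByteArray();
    }

    /** Decodes a single varint from the start of the byte array. */
    static long decodeVarint(byte[] bytes) {
        long result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break; // continuation bit is clear: this was the last group
            }
            shift += 7;
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] encoded = encodeVarint(300);
        System.out.println(encoded.length);        // 2 (bytes 0xAC 0x02)
        System.out.println(decodeVarint(encoded)); // 300
    }
}
```

Small values take a single byte, so a message with many small integers stays compact compared with spelling the same numbers out as text.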

Thursday, January 9, 2020

The ProcessFunction takes action on every event, but the user decides what the collector collects.
There are four aspects of timers:
1) Timers work only with keyed streams.
2) Timers are automatically de-duplicated.
3) Timers are checkpointed.
4) Timers can be deleted.

There is at most one timer per key and timestamp. If multiple timers are registered for the same key and timestamp, only one fires, so onTimer is called just once for them. Timers are also persisted with checkpoints, which makes the process function fault-tolerant. Timers can also be deregistered.
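The de-duplication and deletion behaviour can be modelled in a few lines of plain Java. This is a sketch under assumptions, not Flink's implementation: the class TimerDedupSketch and its register, delete and advanceTo methods are hypothetical names, and time is advanced by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class TimerDedupSketch {
    // timestamp -> keys holding a timer at that timestamp; the Set makes registration idempotent.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    void register(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    void delete(String key, long timestamp) {
        Set<String> keys = timers.get(timestamp);
        if (keys != null) {
            keys.remove(key);
            if (keys.isEmpty()) {
                timers.remove(timestamp);
            }
        }
    }

    /** Fires every timer with timestamp <= now and returns one "key@timestamp" per firing. */
    List<String> advanceTo(long now) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> entry = timers.pollFirstEntry();
            for (String key : entry.getValue()) {
                fired.add(key + "@" + entry.getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerDedupSketch service = new TimerDedupSketch();
        service.register("user-1", 2000L);
        service.register("user-1", 2000L); // duplicate: same key and timestamp
        service.register("user-1", 3000L); // same key, different timestamp: a separate timer
        service.register("user-2", 2000L);
        service.delete("user-2", 2000L);   // deregistered before it fires
        // Prints [user-1@2000, user-1@3000]
        System.out.println(service.advanceTo(5000L));
    }
}
```

The duplicate registration collapses into one firing, while the same key with a different timestamp still gets its own callback.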

The ReduceFunction can be used to find the sum: 
keyedStream.reduce(new ReduceFunction<Integer>() { 
    @Override 
    public Integer reduce(Integer val1, Integer val2) 
    throws Exception { 
        return val1 + val2; 
    } 
}); 

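Events can also be handled one at a time with a ProcessFunction:
// ProcessFunction is an abstract class, so a lambda cannot be passed here; the String input type is assumed.
eventsRead.process(new ProcessFunction<String, Integer>() {
    @Override
    public void processElement(String s, Context context, Collector<Integer> collector) {
        logger.info("processElement:s={}", s);
        collector.collect(1);
    }
});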
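A reduce on a keyed stream is a rolling reduce: it combines each element with the last reduced value for that key and emits the new value. The plain-Java sketch below models that behaviour without Flink. The class RollingReduceSketch and its rollingReduce method are hypothetical names, and the per-key state is an ordinary HashMap rather than managed state.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class RollingReduceSketch {
    /** Applies the reducer per key and returns the running result emitted after each element. */
    static List<Integer> rollingReduce(List<Map.Entry<String, Integer>> events,
                                       BinaryOperator<Integer> reducer) {
        Map<String, Integer> statePerKey = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> event : events) {
            // merge stores the value for a new key, or reduces it with the stored value.
            Integer updated = statePerKey.merge(event.getKey(), event.getValue(), reducer);
            emitted.add(updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> events = new ArrayList<>();
        events.add(new SimpleEntry<>("a", 1));
        events.add(new SimpleEntry<>("b", 10));
        events.add(new SimpleEntry<>("a", 2));
        events.add(new SimpleEntry<>("a", 3));
        // Prints [1, 10, 3, 6]: sums for key "a" roll forward independently of key "b".
        System.out.println(rollingReduce(events, Integer::sum));
    }
}
```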

When the job is executed in detached mode, the results may not be immediately available. If the results are needed, it is better to write them to another stream. This stream can then be read by other means that don't need a Flink job execution.

The JobExecutionResult is returned by env.execute(). It includes the results from accumulators and counters.
For example, an accumulator such as a Histogram or an IntCounter can be instantiated inside a RichFlatMapFunction. For an IntCounter, this would include:
private IntCounter intCounter = new IntCounter();
getRuntimeContext().addAccumulator("counterName", this.intCounter);
this.intCounter.add(1);
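Conceptually, each parallel subtask adds to its own local counter and the local values are merged into the result that the job reports at the end. The sketch below models that idea in plain Java. SimpleIntCounter is a hypothetical stand-in written for this post, not Flink's IntCounter, and the subtasks are just lists processed in a loop.

```java
import java.util.Arrays;
import java.util.List;

final class AccumulatorSketch {
    /** Hypothetical stand-in for an integer counter accumulator. */
    static final class SimpleIntCounter {
        private int localValue;

        void add(int value) {
            localValue += value;
        }

        void merge(SimpleIntCounter other) {
            localValue += other.localValue;
        }

        int getLocalValue() {
            return localValue;
        }
    }

    /** Each inner list models the records seen by one parallel subtask. */
    static int countAcrossSubtasks(List<List<String>> recordsPerSubtask) {
        SimpleIntCounter jobResult = new SimpleIntCounter();
        for (List<String> records : recordsPerSubtask) {
            SimpleIntCounter local = new SimpleIntCounter();
            for (int i = 0; i < records.size(); i++) {
                local.add(1); // one increment per record, as in the snippet above
            }
            jobResult.merge(local); // partial counts are combined into the job-level result
        }
        return jobResult.getLocalValue();
    }

    public static void main(String[] args) {
        List<List<String>> subtasks = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));
        System.out.println(countAcrossSubtasks(subtasks)); // 3
    }
}
```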

Wednesday, January 8, 2020

The Flink connector has an EventTimeOrderingOperator. It uses watermarks and managed state to buffer elements, which helps to order elements by event time. The class extends AbstractStreamOperator and implements OneInputStreamOperator. The last seen watermark is initialized to the minimum value. The operator uses a timer service and a MapState obtained from the runtime context. It processes stream records one by one. If an event does not have a timestamp, it is simply forwarded. If an event has a timestamp, it is buffered with all the other events between the current and the next watermark.
When the event-time timer fires due to watermark progression, the operator polls all the event timestamps that are less than or equal to the current watermark. If no timestamps remain, the queued state is cleared; otherwise a timer for the next watermark is registered. The sorted list of timestamps from buffered events is maintained in a priority queue.
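The buffering idea can be sketched in plain Java as follows. This is not the connector's code: the class EventTimeOrderingSketch and its method names are hypothetical, a TreeMap stands in for the MapState plus priority queue, and dropping late events is an assumption of the sketch rather than something the text above describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class EventTimeOrderingSketch {
    // timestamp -> events buffered for that timestamp; TreeMap keeps the timestamps sorted.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private long lastWatermark = Long.MIN_VALUE;

    /** Buffers an event; returns false if it is at or behind the last watermark. */
    boolean onElement(long timestamp, String event) {
        if (timestamp <= lastWatermark) {
            return false; // assumption of this sketch: late events are dropped
        }
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
        return true;
    }

    /** Emits, in timestamp order, every buffered event with timestamp <= watermark. */
    List<String> onWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
        List<String> emitted = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.firstKey() <= lastWatermark) {
            emitted.addAll(buffer.pollFirstEntry().getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        EventTimeOrderingSketch operator = new EventTimeOrderingSketch();
        operator.onElement(30L, "c");
        operator.onElement(10L, "a");
        operator.onElement(20L, "b");
        System.out.println(operator.onWatermark(20L));       // [a, b]
        System.out.println(operator.onElement(15L, "late")); // false
        System.out.println(operator.onWatermark(40L));       // [c]
    }
}
```

Events arrive out of order but leave in timestamp order, and nothing is released until the watermark says no earlier event is still expected.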
AscendingTimestampExtractor is a timestamp assigner and watermark generator for streams where timestamps are monotonically ascending, as is the case with log files. The local watermarks are easy to assign because they simply follow the strictly increasing timestamps and are emitted periodically.
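A watermark generator for ascending timestamps only needs to remember the highest timestamp seen so far. The plain-Java sketch below shows the idea. The class AscendingWatermarkSketch is hypothetical, and placing the watermark one millisecond behind the highest timestamp is an assumption of this sketch, chosen so that an element carrying exactly that timestamp is not treated as late.

```java
final class AscendingWatermarkSketch {
    private long currentTimestamp = Long.MIN_VALUE;
    private int violations;

    /** Records the element's timestamp and counts elements that break the ascending order. */
    long extractTimestamp(long elementTimestamp) {
        if (elementTimestamp >= currentTimestamp) {
            currentTimestamp = elementTimestamp;
        } else {
            violations++; // timestamp went backwards; the watermark is left unchanged
        }
        return elementTimestamp;
    }

    /** Called periodically: the watermark trails the highest timestamp seen so far. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch generator = new AscendingWatermarkSketch();
        generator.extractTimestamp(100L);
        generator.extractTimestamp(200L);
        generator.extractTimestamp(150L); // out of order
        System.out.println(generator.currentWatermark()); // 199
        System.out.println(generator.violations());       // 1
    }
}
```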
ProcessFunction allows the use of timers, which deliver a callback through the onTimer method.
The TimerService provides methods like:
currentProcessingTime
currentWatermark
registerEventTimeTimer
registerProcessingTimeTimer
which can set the time at which the timer fires. The function can use any criteria to set the timer and, because a callback is received, selective action can be taken for that event.
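One way to take selective action in the callback is to emit only when the firing timer belongs to the most recent event for the key. The sketch below simulates that pattern in plain Java. The class InactivityTimerSketch, the 2000 ms timeout and the hand-driven clock are all assumptions for illustration, and the maps stand in for keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class InactivityTimerSketch {
    static final long TIMEOUT_MS = 2000L;

    private final Map<String, Long> count = new HashMap<>();
    private final Map<String, Long> lastModified = new HashMap<>();
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    private final List<String> collected = new ArrayList<>();

    /** Updates the per-key state and registers a timer TIMEOUT_MS after this event. */
    void processElement(String key, long now) {
        count.merge(key, 1L, Long::sum);
        lastModified.put(key, now);
        timers.computeIfAbsent(now + TIMEOUT_MS, t -> new TreeSet<>()).add(key);
    }

    private void onTimer(String key, long timestamp) {
        // Selective action: only the timer set by the most recent event emits a result.
        if (timestamp == lastModified.get(key) + TIMEOUT_MS) {
            collected.add(key + "=" + count.get(key));
        }
    }

    /** Fires all timers due at or before now and returns everything collected so far. */
    List<String> advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> entry = timers.pollFirstEntry();
            for (String key : entry.getValue()) {
                onTimer(key, entry.getKey());
            }
        }
        return new ArrayList<>(collected);
    }

    public static void main(String[] args) {
        InactivityTimerSketch function = new InactivityTimerSketch();
        function.processElement("k", 0L);
        function.processElement("k", 1000L);
        System.out.println(function.advanceTo(2000L)); // [] (a newer event superseded this timer)
        System.out.println(function.advanceTo(3000L)); // [k=2]
    }
}
```

Every event still registers a timer, but the callback decides which of them leads to a collect call.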
One of the most common uses of the timer callback is to call the collector.collect method. This lets the ProcessFunction take action on every event while the user decides what the collector collects.