Saturday, January 11, 2020

Let us look at the use of timerservice in the process function a little closer.
public class LogExceptionKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, String>, LogException> {
    private static final Logger logger = LoggerFactory.getLogger(LogExceptionKeyedProcessFunction.class);
    private static final Map<String, LogException> exceptionMap = new HashMap<>();
    private static final List<String> collected = new ArrayList<String>();
    private static Object lock = new Object();
    private static LogException exception  = new LogException();
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }
    @Override
    public void processElement(Tuple2<String, String> item, Context context, Collector<LogException> collector) throws Exception {
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = item._1;
        }
        logger.debug("s={},v={}", item._1, item._2);
        synchronized (lock) {
            if (exceptionMap.containsKey(item._1)) {
                exception = exceptionMap.get(item._1);
                exception.getData().add(item._2);
                logger.debug("id:{}, sizeof stacktrace : {}", exception.getId(), exception.getData().size());
                java.lang.Thread.sleep(1000);
            } else {
                exception = new LogException();
                exception.setTimestamp(item._1);
                exception.getData().add(item._2);
                exceptionMap.put(item._1, exception);
                logger.debug("id:{}, putting: {}:{}", exception.getId(), item._1, item._2);
            }
        }
        current.count++;
        current.lastModified = System.currentTimeMillis();
        current.exception = exception;
        state.update(current);
        context.timerService().registerProcessingTimeTimer(current.lastModified + 2000);
    }

    private void collectException(Collector<LogException> collector) {
            for (Map.Entry<String, LogException> entry: exceptionMap.entrySet()) {
                if (collected.contains(entry.getKey()) == false) {
                    collector.collect(entry.getValue());
                    collected.add(entry.getKey());
                }
            }
   }
    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<LogException> out) throws Exception {

            CountWithTimestamp result = state.value();
        logger.error("Timer_called_with_result:count:{}, {}", result.count, result.exception.getTimestamp());
        collectException(out);
    }

    public class CountWithTimestamp {
        public String key;
        public long count;
        public long lastModified;
        public LogException exception;
    }
}

Notice that the method collects different exceptions from the exceptionMap depending on the time and speed of the filling of exceptionMap. Fortunately, they are predictable on the time at which the method is invoked.

No comments:

Post a Comment