Monday, December 23, 2019

Sample program to extract exceptions from logs as a stream


public class LogExceptionExtractor {
    private static final Logger logger = LoggerFactory.getLogger(LogExceptionExtractor.class);

    public static void main(String argv[]) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(argv);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        DataSet<String> text;
PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        StreamConfiguration streamConfig = StreamConfiguration.builder()
            .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
            .build();

logger.info("001- creating stream");
     DataStream<LogException> exceptionDataStream = env.readTextFile("exceptions.log")
                .flatMap(new ExceptionTimestampExtractor())
                .keyBy(0)
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
                .allowedLateness(Time.seconds(4))
                .process(new ProcessWindowFunction<Tuple2<String, String>, LogException, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<String, String>> iterable, Collector<LogException> collector) throws Exception {
                        if (getRuntimeContext() == null) {
                            setRuntimeContext((RuntimeContext) new LogException());
                        }
                        LogException logException = (LogException) getRuntimeContext();
                        for(Tuple2<String, String> item : iterable) {
                            logException.setTimestamp(item._1);
                            logException.getData().add(item._2);
                        }
                        collector.collect(logException);
                    }
                });

        FlinkPravegaWriter.Builder<LogException> builder = FlinkPravegaWriter.<LogException>builder()
            .withPravegaConfig(pravegaConfig)
            .forStream((Stream) exceptionDataStream)
            .withEventRouter(new LogExceptionRouter())
            .withSerializationSchema(PravegaSerialization.serializationFor(LogException.class));


        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<LogException> flinkPravegaWriter = builder.build();
        exceptionDataStream.addSink(flinkPravegaWriter);
        env.execute("Stream Writer");
    }

}
The above method works partly due to the parallelization based on the key.

No comments:

Post a Comment