Sample program that extracts exceptions from log files and processes them as a stream:
public class LogExceptionExtractor {
private static final Logger logger = LoggerFactory.getLogger(LogExceptionExtractor.class);
public static void main(String argv[]) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(argv);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
env.enableCheckpointing(1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
DataSet<String> text;
PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
.build();
logger.info("001- creating stream");
DataStream<LogException> exceptionDataStream = env.readTextFile("exceptions.log")
.flatMap(new ExceptionTimestampExtractor())
.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.allowedLateness(Time.seconds(4))
.process(new ProcessWindowFunction<Tuple2<String, String>, LogException, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, String>> iterable, Collector<LogException> collector) throws Exception {
if (getRuntimeContext() == null) {
setRuntimeContext((RuntimeContext) new LogException());
}
LogException logException = (LogException) getRuntimeContext();
for(Tuple2<String, String> item : iterable) {
logException.setTimestamp(item._1);
logException.getData().add(item._2);
}
collector.collect(logException);
}
});
FlinkPravegaWriter.Builder<LogException> builder = FlinkPravegaWriter.<LogException>builder()
.withPravegaConfig(pravegaConfig)
.forStream((Stream) exceptionDataStream)
.withEventRouter(new LogExceptionRouter())
.withSerializationSchema(PravegaSerialization.serializationFor(LogException.class));
builder.enableWatermark(true);
builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
FlinkPravegaWriter<LogException> flinkPravegaWriter = builder.build();
exceptionDataStream.addSink(flinkPravegaWriter);
env.execute("Stream Writer");
}
}
The above method works only partially, because the processing is parallelized by key: lines belonging to the same exception are routed to the same task, but ordering across different keys is not guaranteed.
public class LogExceptionExtractor {
private static final Logger logger = LoggerFactory.getLogger(LogExceptionExtractor.class);
public static void main(String argv[]) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(argv);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
env.enableCheckpointing(1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
DataSet<String> text;
PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
.build();
logger.info("001- creating stream");
DataStream<LogException> exceptionDataStream = env.readTextFile("exceptions.log")
.flatMap(new ExceptionTimestampExtractor())
.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.allowedLateness(Time.seconds(4))
.process(new ProcessWindowFunction<Tuple2<String, String>, LogException, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, String>> iterable, Collector<LogException> collector) throws Exception {
if (getRuntimeContext() == null) {
setRuntimeContext((RuntimeContext) new LogException());
}
LogException logException = (LogException) getRuntimeContext();
for(Tuple2<String, String> item : iterable) {
logException.setTimestamp(item._1);
logException.getData().add(item._2);
}
collector.collect(logException);
}
});
FlinkPravegaWriter.Builder<LogException> builder = FlinkPravegaWriter.<LogException>builder()
.withPravegaConfig(pravegaConfig)
.forStream((Stream) exceptionDataStream)
.withEventRouter(new LogExceptionRouter())
.withSerializationSchema(PravegaSerialization.serializationFor(LogException.class));
builder.enableWatermark(true);
builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
FlinkPravegaWriter<LogException> flinkPravegaWriter = builder.build();
exceptionDataStream.addSink(flinkPravegaWriter);
env.execute("Stream Writer");
}
}
The above method works only partially, because the processing is parallelized by key: lines belonging to the same exception are routed to the same task, but ordering across different keys is not guaranteed.
No comments:
Post a Comment