#Problem statement:
Find the streams that were read from, using audit.log, within a particular
time window, say '2019-12-05T15:25:19.977991Z' < t < '2019-12-11T15:25:19.977991Z',
to know which stream was most exposed.
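Since the window bounds are ISO-8601 UTC timestamps, the window check itself can be sketched with java.time.Instant (a minimal illustration; the class and method names here are not from the application below):

```java
import java.time.Instant;

public class WindowCheck {
    // Window bounds from the problem statement.
    static final Instant FROM = Instant.parse("2019-12-05T15:25:19.977991Z");
    static final Instant TO   = Instant.parse("2019-12-11T15:25:19.977991Z");

    // True when the event timestamp lies strictly between FROM and TO.
    static boolean inWindow(String stageTimestamp) {
        Instant t = Instant.parse(stageTimestamp);
        return t.isAfter(FROM) && t.isBefore(TO);
    }

    public static void main(String[] args) {
        System.out.println(inWindow("2019-12-06T00:00:00Z")); // inside the window
        System.out.println(inWindow("2019-12-12T00:00:00Z")); // after the window
    }
}
```

Because ISO-8601 timestamps in the same zone sort lexicographically, plain string comparison gives the same result, which is what the filter in the job below relies on.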
#Flink application solution:
1) Compile a Flink application as follows:
// Batch job: read audit.log, keep events inside the time window,
// and report the streams that were read more than 50 times.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// mount the file as a K8s generic secret so the job can read it
DataSet<String> lines = env.readTextFile("</path/to/audit.log>");

// parse(line) turns one audit.log line into
// Tuple4<stageTimestamp, streamName, count, size>
DataSet<Tuple4<String, String, Long, Double>> events = lines
        .map(line -> parse(line))
        .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG, Types.DOUBLE))
        // ISO-8601 timestamps in UTC compare correctly as plain strings
        .filter(x -> x.f0.compareTo("2019-12-05T15:25:19.977991Z") > 0
                  && x.f0.compareTo("2019-12-11T15:25:19.977991Z") <= 0);

events
        .groupBy(1) // group by stream name
        .reduceGroup(new GroupReduceFunction<Tuple4<String, String, Long, Double>,
                                             Tuple4<String, String, Long, Double>>() {
            @Override
            public void reduce(Iterable<Tuple4<String, String, Long, Double>> values,
                               Collector<Tuple4<String, String, Long, Double>> out) {
                String stageTimestamp = null;
                String streamName = null;
                long count = 0;
                double size = 0;
                for (Tuple4<String, String, Long, Double> value : values) {
                    stageTimestamp = value.f0;
                    streamName = value.f1;
                    count++;
                    size += value.f3 != null ? value.f3 : 0;
                }
                if (count > 50) { // only report heavily-read streams
                    out.collect(new Tuple4<>(stageTimestamp, streamName, count, size));
                }
            }
        })
        .setParallelism(1) // single output partition, replacing the custom partitioner
        .print();
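The job above assumes a parse(line) helper. A minimal sketch of it, assuming each audit.log line is a one-line JSON record: the field names ("stageTimestamp", "name", "size") are assumptions about the log layout, not taken from the post, and a real implementation would use a JSON library such as Jackson rather than string search. It returns a plain String[] here for illustration; the real helper would build Flink's Tuple4.

```java
public class AuditLineParser {

    // Extract the string value of "key" from a one-line JSON record.
    static String field(String line, String key) {
        String marker = "\"" + key + "\":\"";
        int start = line.indexOf(marker);
        if (start < 0) return null;
        start += marker.length();
        int end = line.indexOf('"', start);
        return end < 0 ? null : line.substring(start, end);
    }

    // Returns {stageTimestamp, streamName, count, size} as strings.
    public static String[] parse(String line) {
        return new String[] {
            field(line, "stageTimestamp"),
            field(line, "name"),
            "1",                // each line is one read event
            field(line, "size") // may be null if the field is absent
        };
    }

    public static void main(String[] args) {
        String line = "{\"stageTimestamp\":\"2019-12-06T10:00:00.000000Z\","
                    + "\"name\":\"payments-stream\",\"size\":\"2048\"}";
        String[] parsed = parse(line);
        System.out.println(parsed[0] + " " + parsed[1] + " " + parsed[3]);
        // prints: 2019-12-06T10:00:00.000000Z payments-stream 2048
    }
}
```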
2) Package the application as a jar and deploy it to the Flink cluster; the job prints the streams that were read more than 50 times within the window.