Thursday, December 12, 2019


#problemstatement:
Using audit.log, find the streams that were read within a particular time window, say '2019-12-05T15:25:19.977991Z' < t < '2019-12-11T15:25:19.977991Z', to determine which stream was the most vulnerable (i.e. read most often).
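The window is defined on stageTimestamp strings, and comparing those as plain strings is fragile: '2019-12-05T15:25:19Z' sorts after '2019-12-05T15:25:19.5Z' even though it is earlier, because 'Z' > '.'. The sketch below assumes each audit entry carries an RFC 3339 UTC stageTimestamp (as Kubernetes audit events do) and parses it with java.time.Instant before comparing. The class name TimeWindow is hypothetical. The bounds follow the job's filter: lower bound excluded, upper bound included.

```java
import java.time.Instant;

public class TimeWindow {
    // Window bounds: START is exclusive, END is inclusive (same as the job's filter).
    static final Instant START = Instant.parse("2019-12-05T15:25:19.977991Z");
    static final Instant END = Instant.parse("2019-12-11T15:25:19.977991Z");

    /** Returns true if the RFC 3339 timestamp lies in (START, END]. */
    static boolean inWindow(String stageTimestamp) {
        Instant t = Instant.parse(stageTimestamp);
        return t.isAfter(START) && !t.isAfter(END);
    }

    public static void main(String[] args) {
        System.out.println(inWindow("2019-12-08T00:00:00Z"));        // inside the window
        System.out.println(inWindow("2019-12-05T15:25:19.977991Z")); // lower bound, excluded
        System.out.println(inWindow("2019-12-11T15:25:19.977991Z")); // upper bound, included
        System.out.println(inWindow("2019-12-12T00:00:00Z"));        // after the window
    }
}
```

Parsing costs a little per record, but it keeps the comparison correct even when entries mix timestamp precisions.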
#FlinkApplication solution:
Step 1) Write and compile a Flink application as follows (it uses the batch DataSet API, since groupBy/reduceGroup are DataSet operations):
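import java.time.Instant;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

// inside the job's main(String[] args) throws Exception:

// Window bounds: START < stageTimestamp <= END
final Instant START = Instant.parse("2019-12-05T15:25:19.977991Z");
final Instant END = Instant.parse("2019-12-11T15:25:19.977991Z");

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("</path/to/audit.log>"); // mount file as K8s generic secret

// parse(line) is your own helper (not shown) that turns one audit entry into
// (stageTimestamp, stream, 1L, bytesRead); .returns() supplies the tuple type
// that Java erases from the lambda
DataSet<Tuple4<String, String, Long, Double>> events = lines
      .map(line -> parse(line))
      .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG, Types.DOUBLE));

DataSet<Tuple4<String, String, Long, Double>> stats = events
      // keep only events inside the time window
      .filter(e -> {
            Instant t = Instant.parse(e.f0);
            return t.isAfter(START) && !t.isAfter(END);
      })
      // group by stream name (field 1)
      .groupBy(1)
      .reduceGroup(new GroupReduceFunction<Tuple4<String, String, Long, Double>,
                                           Tuple4<String, String, Long, Double>>() {
            @Override
            public void reduce(Iterable<Tuple4<String, String, Long, Double>> values,
                               Collector<Tuple4<String, String, Long, Double>> out) throws Exception {
                  String stageTimestamp = null;
                  String streamName = null;
                  long count = 0;      // start at 0: every record is counted in the loop
                  double size = 0.0;
                  for (Tuple4<String, String, Long, Double> value : values) {
                        stageTimestamp = value.f0;
                        streamName = value.f1;
                        count++;
                        size += value.f3 != null ? value.f3 : 0.0;
                  }
                  // report only streams read more than 50 times in the window
                  if (count > 50) {
                        out.collect(new Tuple4<>(stageTimestamp, streamName, count, size));
                  }
            }
      })
      // replaces the custom partitioner: sort by read count (field 2), highest
      // first, on a single partition so the most-read stream comes out on top
      .sortPartition(2, Order.DESCENDING).setParallelism(1);

stats.print();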
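To sanity-check the aggregation without a cluster, the same filter, group-by-stream and count-above-50 logic can be reproduced in plain Java over an in-memory list, additionally sorting the result by read count so the most-read stream comes first. This is a sketch, not the Flink job itself: the Event and Stat classes and the sample stream names and sizes are made up for illustration, and minReads plays the role of the hard-coded 50.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamReadStats {
    /** One parsed audit entry (hypothetical shape): timestamp, stream name, bytes read. */
    static final class Event {
        final String stageTimestamp;
        final String stream;
        final double size;
        Event(String stageTimestamp, String stream, double size) {
            this.stageTimestamp = stageTimestamp;
            this.stream = stream;
            this.size = size;
        }
    }

    /** Per-stream aggregate: number of reads and total bytes read. */
    static final class Stat {
        final String stream;
        long count;
        double size;
        Stat(String stream) { this.stream = stream; }
    }

    /** Streams read more than minReads times in (start, end], most-read first. */
    static List<Stat> mostRead(List<Event> events, Instant start, Instant end, long minReads) {
        Map<String, Stat> byStream = new HashMap<>();
        for (Event e : events) {
            Instant t = Instant.parse(e.stageTimestamp);
            if (!t.isAfter(start) || t.isAfter(end)) continue;       // time-window filter
            Stat s = byStream.computeIfAbsent(e.stream, Stat::new);  // group by stream
            s.count++;                                               // count reads
            s.size += e.size;                                        // sum bytes read
        }
        List<Stat> result = new ArrayList<>();
        for (Stat s : byStream.values()) {
            if (s.count > minReads) result.add(s);                   // threshold
        }
        result.sort((a, b) -> Long.compare(b.count, a.count));       // most-read first
        return result;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2019-12-05T15:25:19.977991Z");
        Instant end = Instant.parse("2019-12-11T15:25:19.977991Z");
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < 60; i++) events.add(new Event("2019-12-06T10:00:00Z", "orders", 100.0));
        for (int i = 0; i < 55; i++) events.add(new Event("2019-12-07T10:00:00Z", "payments", 10.0));
        for (int i = 0; i < 10; i++) events.add(new Event("2019-12-08T10:00:00Z", "logs", 1.0));
        events.add(new Event("2019-12-12T10:00:00Z", "orders", 100.0)); // outside the window
        for (Stat s : mostRead(events, start, end, 50)) {
            System.out.println(s.stream + " reads=" + s.count + " bytes=" + s.size);
        }
    }
}
```

Running main prints "orders reads=60 bytes=6000.0" followed by "payments reads=55 bytes=550.0": logs stays below the threshold, and the 2019-12-12 entry falls outside the window.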

Step 2) Deploy the resulting jar to the Flink cluster (for example with the flink run command).
