Wednesday, December 25, 2019

# Basic write and read logic (Flink + Pravega)

public class BasicWriterReaderApp {
    private static final Logger logger = LoggerFactory.getLogger(BasicWriterReaderApp.class);

    public static void main(String argv[]) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(argv);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
        String scope = Constants.DEFAULT_SCOPE;
        String streamName = Constants.DEFAULT_STREAM_NAME;
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORD, Constants.DEFAULT_USERNAME));
        StreamConfiguration streamConfig = StreamConfiguration.builder()
                .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
                .build();

        logger.info("001- creating stream");
        Stream stream = pravegaConfig.resolve(streamName);

        logger.info("002- adding data");
        List<String> snippets = new ArrayList<>();
        snippets.add("2019-12-23 19:40:23,909 ERROR Line1");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)");
        snippets.add("2019-12-23 19:40:24,557 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.");
        DataStream<String> input = env.fromCollection(snippets);
        input.print();

        logger.info("003- iterate over data");
        IterativeStream<String> iteration = input.iterate();
        iteration.withFeedbackType(String.class);
        List<String> entries = new ArrayList<>();
        DataStream<String> mapped = iteration.map(t -> {entries.add(t); return t;});
        for (String entry: entries) {
             logger.info("entry={}", entry);
        }
        logger.info("Number_of_elements={}", String.valueOf(entries.size()));
        iteration.closeWith(mapped);

        logger.info("004 - creating a writer to write to stream");
        FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder()
            .withPravegaConfig(pravegaConfig)
            .forStream(stream)
                .withEventRouter(new PravegaEventRouter<String >() {
                    @Override
                    public String getRoutingKey(String e) {
                        return e;
                    }
                })
            .withSerializationSchema(PravegaSerialization.serializationFor(String.class));
        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<String> flinkPravegaWriter = builder.build();
        input.addSink(flinkPravegaWriter);
     
        java.lang.Thread.sleep(5000);
        logger.info("005 - creating a reader to read from stream");
        FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
                .build();

        logger.info("006 - reading events from stream");
        DataStream<String> eventsRead = env
                    .addSource(flinkPravegaReader)
                    .name("eventsRead");
        IterativeStream<String> it = eventsRead.iterate();
        List<String> dataList = new ArrayList<>();
        DataStream<String> newEvents = it.map(t -> {dataList.add(t); return t;});
        logger.info("count of events = {}", dataList.size());
        it.closeWith(newEvents);

        logger.info("007- done");
        env.execute("Stream Writer");
    }

}

No comments:

Post a Comment