# basic write and read logic
public class BasicWriterReaderApp {
private static final Logger logger = LoggerFactory.getLogger(BasicWriterReaderApp.class);
public static void main(String argv[]) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(argv);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
String scope = Constants.DEFAULT_SCOPE;
String streamName = Constants.DEFAULT_STREAM_NAME;
PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORD, Constants.DEFAULT_USERNAME));
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
.build();
logger.info("001- creating stream");
Stream stream = pravegaConfig.resolve(streamName);
logger.info("002- adding data");
List<String> snippets = new ArrayList<>();
snippets.add("2019-12-23 19:40:23,909 ERROR Line1");
snippets.add("\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)");
snippets.add("\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)");
snippets.add("2019-12-23 19:40:24,557 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.");
DataStream<String> input = env.fromCollection(snippets);
input.print();
logger.info("003- iterate over data");
IterativeStream<String> iteration = input.iterate();
iteration.withFeedbackType(String.class);
List<String> entries = new ArrayList<>();
DataStream<String> mapped = iteration.map(t -> {entries.add(t); return t;});
for (String entry: entries) {
logger.info("entry={}", entry);
}
logger.info("Number_of_elements={}", String.valueOf(entries.size()));
iteration.closeWith(mapped);
logger.info("004 - creating a writer to write to stream");
FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withEventRouter(new PravegaEventRouter<String >() {
@Override
public String getRoutingKey(String e) {
return e;
}
})
.withSerializationSchema(PravegaSerialization.serializationFor(String.class));
builder.enableWatermark(true);
builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
FlinkPravegaWriter<String> flinkPravegaWriter = builder.build();
input.addSink(flinkPravegaWriter);
java.lang.Thread.sleep(5000);
logger.info("005 - creating a reader to read from stream");
FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
.build();
logger.info("006 - reading events from stream");
DataStream<String> eventsRead = env
.addSource(flinkPravegaReader)
.name("eventsRead");
IterativeStream<String> it = eventsRead.iterate();
List<String> dataList = new ArrayList<>();
DataStream<String> newEvents = it.map(t -> {dataList.add(t); return t;});
logger.info("count of events = {}", dataList.size());
it.closeWith(newEvents);
logger.info("007- done");
env.execute("Stream Writer");
}
}
public class BasicWriterReaderApp {
private static final Logger logger = LoggerFactory.getLogger(BasicWriterReaderApp.class);
public static void main(String argv[]) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(argv);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
String scope = Constants.DEFAULT_SCOPE;
String streamName = Constants.DEFAULT_STREAM_NAME;
PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORD, Constants.DEFAULT_USERNAME));
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
.build();
logger.info("001- creating stream");
Stream stream = pravegaConfig.resolve(streamName);
logger.info("002- adding data");
List<String> snippets = new ArrayList<>();
snippets.add("2019-12-23 19:40:23,909 ERROR Line1");
snippets.add("\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)");
snippets.add("\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)");
snippets.add("2019-12-23 19:40:24,557 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.");
DataStream<String> input = env.fromCollection(snippets);
input.print();
logger.info("003- iterate over data");
IterativeStream<String> iteration = input.iterate();
iteration.withFeedbackType(String.class);
List<String> entries = new ArrayList<>();
DataStream<String> mapped = iteration.map(t -> {entries.add(t); return t;});
for (String entry: entries) {
logger.info("entry={}", entry);
}
logger.info("Number_of_elements={}", String.valueOf(entries.size()));
iteration.closeWith(mapped);
logger.info("004 - creating a writer to write to stream");
FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withEventRouter(new PravegaEventRouter<String >() {
@Override
public String getRoutingKey(String e) {
return e;
}
})
.withSerializationSchema(PravegaSerialization.serializationFor(String.class));
builder.enableWatermark(true);
builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
FlinkPravegaWriter<String> flinkPravegaWriter = builder.build();
input.addSink(flinkPravegaWriter);
java.lang.Thread.sleep(5000);
logger.info("005 - creating a reader to read from stream");
FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
.build();
logger.info("006 - reading events from stream");
DataStream<String> eventsRead = env
.addSource(flinkPravegaReader)
.name("eventsRead");
IterativeStream<String> it = eventsRead.iterate();
List<String> dataList = new ArrayList<>();
DataStream<String> newEvents = it.map(t -> {dataList.add(t); return t;});
logger.info("count of events = {}", dataList.size());
it.closeWith(newEvents);
logger.info("007- done");
env.execute("Stream Writer");
}
}
No comments:
Post a Comment