Tuesday, December 31, 2019


The following implementations of createEvent and getEvent show that Pravega, as a stream store, can support methods of the kind described by the Flink iterations in the previous entry:
    @Override
    public CompletableFuture<Void> createEvent(final String  routingKey,
                                               final String scopeName,
                                               final String streamName,
                                               final String message) {

        CompletableFuture<Void> ack = CompletableFuture.completedFuture(null);
        ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
        SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
        RevisionedStreamClient<String>  revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
                NameUtils.getMarkStreamForStream(streamName),
                new JavaSerializer<String>(), SynchronizerConfig.builder().build());
        Revision r = revisedStreamClient.fetchLatestRevision();
        revisedStreamClient.writeConditionally(r, message);
        return ack;
    }

    @Override
    public CompletableFuture<String> getEvent(final String  routingKey,
                                              final String scopeName,
                                              final String streamName,
                                              final Long segmentNumber) {
        ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
        SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
        RevisionedStreamClient<String>  revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
                NameUtils.getMarkStreamForStream(streamName),
                new JavaSerializer<String>(), SynchronizerConfig.builder().build());
        Revision r = revisedStreamClient.fetchOldestRevision();
        Segment s = r.getSegment();
        io.pravega.client.stream.Stream stream = s.getStream();
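        // Assumed missing declaration: read every entry from the oldest revision onwards
        // (needs java.util.Iterator and java.util.Map).
        Iterator<Map.Entry<Revision, String>> iter = revisedStreamClient.readFrom(r);
        StringBuilder sb = new StringBuilder();
        while (iter.hasNext()) {
            Map.Entry<Revision, String> entry = iter.next();
            sb.append(entry.getValue());
        }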
        CompletableFuture<String> ack = CompletableFuture.completedFuture(sb.toString());
        return ack;
    }

This produces the following log output:

2019-12-31 03:51:00,674 22519 [grizzly-http-server-2] INFO i.p.c.s.i.RevisionedStreamClientImpl - Wrote from 0 to 20
2019-12-31 03:51:00,675 22520 [grizzly-http-server-2] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - revisioned client wrote to revision: project58/_MARKlogstream2/0.#epoch.0:0:0
2019-12-31 03:51:00,778 22623 [grizzly-http-server-3] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - fetchLatestRevision=project58/_MARKlogstream2/0.#epoch.0:20:0
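
The fetchLatestRevision and writeConditionally pair used in createEvent is a form of optimistic concurrency: the write is accepted only if nobody else has written since the revision was fetched. The sketch below models that idea with a plain in-memory list. RevisionedLog and its integer revisions are hypothetical stand-ins for illustration, not the Pravega client, and the -1 return value is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a revisioned stream; not the Pravega client.
final class RevisionedLog {
    private final List<String> entries = new ArrayList<>();

    // The "revision" is modelled as the number of entries written so far.
    synchronized int fetchLatestRevision() {
        return entries.size();
    }

    // Appends only if the caller's revision is still the latest one.
    // Returns the new revision on success, or -1 if the caller was stale.
    synchronized int writeConditionally(int expectedRevision, String value) {
        if (expectedRevision != entries.size()) {
            return -1;
        }
        entries.add(value);
        return entries.size();
    }

    // Returns a copy of all entries written at or after the given revision.
    synchronized List<String> readFrom(int revision) {
        return new ArrayList<>(entries.subList(revision, entries.size()));
    }
}
```

Note that the createEvent listing above ignores the return value of writeConditionally, so a write that loses such a race would go unnoticed by the caller.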

Monday, December 30, 2019


Running the stream reader independently of the writer helps rule out any lag or delay between the two caused by the scheduling of the Flink application. Once sealStream has been invoked, it is safe for the reader to read the stream. Some reads and writes can be done as part of transactions. Checkpointing of state allows consistency in writing to and reading from the streams. If the reader can read one event from a stream after the writer has written to it, that is sufficient to show that the events are accessible to the applications reading the stream. An exact match between the writers and readers on the event count is seldom necessary and can be taken for granted after the writers are gone. Any number of readers can be used to read the stream.
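
The reason sealing helps with verification can be sketched with a small in-memory model: once a stream is sealed it rejects further writes, so whatever a reader sees afterwards is final. SealableStream is a hypothetical class written only for this illustration and does not reflect how Pravega implements sealing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a stream that can be sealed; not the Pravega API.
final class SealableStream {
    private final List<String> events = new ArrayList<>();
    private boolean sealed;

    // Appends an event, or fails if the stream has already been sealed.
    synchronized void write(String event) {
        if (sealed) {
            throw new IllegalStateException("stream is sealed");
        }
        events.add(event);
    }

    // After this call the set of events can no longer change.
    synchronized void seal() {
        sealed = true;
    }

    synchronized boolean isSealed() {
        return sealed;
    }

    // Returns a read-only snapshot of the events written so far.
    synchronized List<String> readAll() {
        return Collections.unmodifiableList(new ArrayList<>(events));
    }
}
```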
Flink programs implement iterations by defining a step function and embedding it into a special iteration operator. This operator comes in two forms: iterate and delta iterate. Both repeatedly invoke the step function on the current iteration state until a termination condition is reached.
An example of an iteration:
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);
DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer i) throws Exception {
        double x = Math.random();
        double y = Math.random();

        return i + ((x * x + y * y < 1) ? 1 : 0);
    }
});

// Iteratively transform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);

count.map(new MapFunction<Integer, Double>() {
    @Override
    public Double map(Integer count) throws Exception {
        return count / (double) 10000 * 4;
    }
}).print();
A delta iteration is created by calling iterateDelta(DataSet, int, int) (or iterateDelta(DataSet, int, int[])) on the initial solution set. The arguments are the initial delta set (the workset), the maximum number of iterations, and the key positions. The workset and the solution set are available through iteration.getWorkset() and iteration.getSolutionSet().
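
To make the workset and solution set roles concrete, here is a minimal plain-Java model of a delta iteration. It deliberately does not use the Flink API: DeltaIterationSketch and minLabel are hypothetical names, and the example (propagating the smallest vertex id through a graph) is chosen only for illustration. It assumes every vertex appears as a key in the adjacency map.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a delta iteration; not the Flink DataSet API.
final class DeltaIterationSketch {
    // solution: vertex -> smallest id seen so far (the "solution set")
    // workset:  vertices whose value changed in the previous round
    static Map<Integer, Integer> minLabel(Map<Integer, List<Integer>> neighbours,
                                          int maxIterations) {
        Map<Integer, Integer> solution = new HashMap<>();
        Deque<Integer> workset = new ArrayDeque<>();
        for (Integer v : neighbours.keySet()) {
            solution.put(v, v);   // every vertex starts with its own id
            workset.add(v);       // and is part of the initial workset
        }
        // Stop when the workset is empty or the iteration limit is reached.
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            Deque<Integer> next = new ArrayDeque<>();
            for (Integer v : workset) {
                int label = solution.get(v);
                for (Integer n : neighbours.get(v)) {
                    if (label < solution.get(n)) {
                        solution.put(n, label); // delta applied to the solution set
                        next.add(n);            // changed element feeds the next workset
                    }
                }
            }
            workset = next;
        }
        return solution;
    }
}
```

Only the elements that changed are revisited in the next round, which is the point of a delta iteration compared with a bulk iteration that reprocesses the whole data set each time.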

Sunday, December 29, 2019

When a Flink job is submitted in detached mode, the job execution result is not available immediately. That result is only available when the Flink application program is run in blocking mode, which is usually not the case for streaming jobs.
There are ways to sleep between reads and writes, but the job is only scheduled when execute is called. This sometimes makes the program harder to debug via the application logs, but the JobManager has up-to-date logs.
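
The difference between the two modes can be sketched with a CompletableFuture: a detached submission hands back a handle right away while the work continues in the background, whereas a blocking submission returns only when the result exists. SubmissionModes, its methods, and the result value 42 are hypothetical and only model the behaviour; they are not Flink calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Hypothetical model of detached versus blocking job submission; not the Flink API.
final class SubmissionModes {
    // Detached: returns immediately with a handle; the result arrives later.
    static CompletableFuture<Long> submitDetached(CountDownLatch release) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                release.await(); // stand-in for a long-running job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 42L; // hypothetical result, e.g. a number of processed events
        });
    }

    // Blocking: the caller waits until the job finishes and then has the result.
    static long submitBlocking(CountDownLatch release) {
        return submitDetached(release).join();
    }
}
```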

Whenever there is a read/write issue, such as verifying the data written and read, it is better to seal the stream to allow the system to finish all the bookkeeping. This causes the events to show up in the persisted stream.

The number of events written to a stream is generally not known in advance because the stream is unbounded. The number of events in a window is likewise not available without counting them, for example with a map function such as:
public class LogIncrementer implements MapFunction<String, Long> {
    private static final Logger logger = LoggerFactory.getLogger(LogIncrementer.class);
    // Per-instance count; each parallel subtask keeps its own counter.
    private long counter = 0L;

    @Override
    public Long map(String record) throws Exception {
        counter++;
        logger.info("counter={}, record={}", counter, record);
        return counter;
    }
}
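
Counting per window, as opposed to keeping one running total, can be sketched in plain Java by assigning each event to the tumbling window that contains its timestamp. WindowCounter and countPerWindow are hypothetical names, and the sketch assumes non-negative timestamps in milliseconds; it is not how Flink implements windows.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of counting events per tumbling window; not the Flink window API.
final class WindowCounter {
    // Returns window start time -> number of events that fall in that window.
    static Map<Long, Integer> countPerWindow(long[] timestampsMillis, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            // Round the timestamp down to the start of its window.
            long windowStart = ts - (ts % windowSizeMillis);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }
}
```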

Saturday, December 28, 2019

Flink streaming jobs run for a long time because the stream has no bounds. The program that invokes env.execute() is kicked off in detached mode. The other mode of execution is blocking mode, and it applies only to LocalExecutionEnvironment, not to StreamExecutionEnvironment. The job itself will appear either with a started status on success or with an error on failure. The logs for the job submission will only be partial because the foreground process exits after making an asynchronous call.
The logs for the background will show all the activities performed after the invocation.
There are also a few ways to gather some counts programmatically. These include:
eventsRead.addSink(new SinkFunction<String>() {
    private int count;

    @Override
    public void invoke(String value) throws Exception {
        count++;
        logger.error("count = {}, valueRead = {}", count, value);
    }
});

The other way is to use iterative streams:
IterativeStream<String> it = eventsRead.iterate();
it.withFeedbackType(String.class);
DataStream<String> mapped = it.map(t -> { logger.info(t); return t; });
it.closeWith(mapped);

Friday, December 27, 2019

Small improvements matter when the volume of events is high, such as when the stream store is connected to data pipelines or IoT traffic. Analytical processing may also iterate over the events repeatedly, and these iterations benefit as well when the processing of each individual event is improved.

Thursday, December 26, 2019


The use of defaultCredentials in stream store:
Most open source packages that use some form of authentication ship with a built-in credential so that they can be run right out of the box. For example, Keycloak, an open source authentication and authorization framework with federation and brokerage capabilities, can be downloaded and run with a given username and password and no customizations. A stream store like Pravega can also be downloaded and run as a standalone application with little or no change, using an out-of-the-box username and password that has administrator privileges.
When these packages are included within another product, these hardcoded credentials can be changed per deployment through configuration that is relayed to the packages. Using a different credential hardens the product containing these packages in mission-critical deployments.
This administrator credential works independently of any integrations provided through the product in which these open source packages are used. Even when the password changes per deployment, it remains valid for administrative usage regardless of what credentials the user may have or the role with which the user accesses the resources of the package.
Making the password hard to guess does not take away what can be done with it in either the standalone or the integrated deployments of the package. It gives users a way to rule out privilege-related issues: if an action succeeds with the administrator credential but fails with the user's own, the problem lies with the privileges of the user.
For example:
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        pravegaConfig.withCredentials(new DefaultCredentials("well-known-password", "well-known-username"));
This allows the user to bypass any constraints associated with their own credentials. Neither the system nor the user interface has any way of corroborating that the credential supplied actually comes from the user to whom it belongs. The purpose of this article is to suggest the use of these credentials only as a last resort for troubleshooting, with an explanation of how and why the technique works.
Finally, the use of built-in credentials cannot work across integrations unless the product as a whole integrates its administrative activities with those of the packages used within it.


Wednesday, December 25, 2019

# basic write and read logic

public class BasicWriterReaderApp {
    private static final Logger logger = LoggerFactory.getLogger(BasicWriterReaderApp.class);

    public static void main(String[] argv) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(argv);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        String scope = Constants.DEFAULT_SCOPE;
        String streamName = Constants.DEFAULT_STREAM_NAME;
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORD, Constants.DEFAULT_USERNAME));
        StreamConfiguration streamConfig = StreamConfiguration.builder()
                .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
                .build();

        logger.info("001- creating stream");
        Stream stream = pravegaConfig.resolve(streamName);

        logger.info("002- adding data");
        List<String> snippets = new ArrayList<>();
        snippets.add("2019-12-23 19:40:23,909 ERROR Line1");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)");
        snippets.add("2019-12-23 19:40:24,557 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.");
        DataStream<String> input = env.fromCollection(snippets);
        input.print();

        logger.info("003- iterate over data");
        IterativeStream<String> iteration = input.iterate();
        iteration.withFeedbackType(String.class);
        List<String> entries = new ArrayList<>();
        // The map lambda only runs once env.execute() schedules the job, so
        // entries is still empty while the job graph is being built here.
        DataStream<String> mapped = iteration.map(t -> {entries.add(t); return t;});
        for (String entry : entries) {
            logger.info("entry={}", entry);
        }
        logger.info("Number_of_elements={}", String.valueOf(entries.size()));
        iteration.closeWith(mapped);

        logger.info("004 - creating a writer to write to stream");
        FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder()
            .withPravegaConfig(pravegaConfig)
            .forStream(stream)
                .withEventRouter(new PravegaEventRouter<String >() {
                    @Override
                    public String getRoutingKey(String e) {
                        return e;
                    }
                })
            .withSerializationSchema(PravegaSerialization.serializationFor(String.class));
        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<String> flinkPravegaWriter = builder.build();
        input.addSink(flinkPravegaWriter);
     
        // Nothing has been written yet at this point: the job is only scheduled by env.execute() below.
        java.lang.Thread.sleep(5000);
        logger.info("005 - creating a reader to read from stream");
        FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
                .build();

        logger.info("006 - reading events from stream");
        DataStream<String> eventsRead = env
                    .addSource(flinkPravegaReader)
                    .name("eventsRead");
        IterativeStream<String> it = eventsRead.iterate();
        List<String> dataList = new ArrayList<>();
        // As above, the lambda has not run yet, so the size logged here is 0.
        DataStream<String> newEvents = it.map(t -> {dataList.add(t); return t;});
        logger.info("count of events = {}", dataList.size());
        it.closeWith(newEvents);

        logger.info("007- done");
        env.execute("Stream Writer");
    }

}