Monday, December 30, 2019


Running the stream reader independently of the writer helps rule out any lags or delays between the two that come from the scheduling of the Flink application. If sealStream has been invoked, it is safe for the reader to read the stream. Some reads and writes can be done as part of transactions. Checkpointing of state allows consistency in writing and reading the streams. If the reader can read one event from a stream after the writer has written to it, that is sufficient to show that the events are accessible to the applications reading the stream. A match between the writers and readers on the event count is seldom necessary and can be taken for granted after the writers are gone. Any number of readers can be used to read the stream.
Flink programs implement iterations by defining a step function and embedding it into a special iteration operator. This operator comes in two forms: iterate and delta iterate. Both repeatedly invoke the step function on the current iteration state until a termination condition is reached.
For example, the following bulk iteration estimates Pi by sampling random points in the unit square:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);
DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer i) throws Exception {
        double x = Math.random();
        double y = Math.random();

        return i + ((x * x + y * y < 1) ? 1 : 0);
    }
});

// Iteratively transform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);

count.map(new MapFunction<Integer, Double>() {
    @Override
    public Double map(Integer count) throws Exception {
        return count / (double) 10000 * 4;
    }
}).print();
A delta iteration is created by calling iterateDelta(DataSet, int, int) or iterateDelta(DataSet, int, int[]) on the initial solution set. The arguments are the initial workset (the delta set), the maximum number of iterations, and the key position or positions. The workset and solution set are then available through iteration.getWorkset() and iteration.getSolutionSet().
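
To make the workset and solution set idea concrete, here is a minimal sketch in plain Java, without Flink. It labels connected components by propagating the smallest vertex id, touching only the vertices that changed in the previous step. The class and method names are made up for illustration, and the adjacency map is assumed to list every edge in both directions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of delta-iteration semantics; not Flink API.
class DeltaIterationSketch {
    /**
     * solutionSet: vertex -> smallest component id seen so far (kept across steps).
     * workset: vertices whose id changed in the previous step.
     */
    public static Map<Integer, Integer> connectedComponents(
            Map<Integer, List<Integer>> neighbors, int maxIterations) {
        Map<Integer, Integer> solutionSet = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> e : neighbors.entrySet()) {
            solutionSet.putIfAbsent(e.getKey(), e.getKey());
            for (Integer n : e.getValue()) {
                solutionSet.putIfAbsent(n, n);
            }
        }
        Set<Integer> workset = new HashSet<>(solutionSet.keySet());

        // Stop when the workset is empty or the iteration limit is reached.
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // Step function: compute the delta from the current workset only.
            Map<Integer, Integer> delta = new HashMap<>();
            for (Integer v : workset) {
                int id = solutionSet.get(v);
                for (Integer n : neighbors.getOrDefault(v, Collections.emptyList())) {
                    int best = Math.min(solutionSet.get(n),
                            delta.getOrDefault(n, Integer.MAX_VALUE));
                    if (id < best) {
                        delta.put(n, id);
                    }
                }
            }
            solutionSet.putAll(delta);                // merge the delta into the solution set
            workset = new HashSet<>(delta.keySet());  // changed vertices form the next workset
        }
        return solutionSet;
    }
}
```

Because each step only visits the workset, the work shrinks as the labels settle, which is the main reason to prefer a delta iteration over a bulk one.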

Sunday, December 29, 2019

When a Flink job is run in detached mode, the job execution result is not available immediately. That result is only available when the Flink application program is run in blocking mode, which is usually not the case for streaming jobs.
There are ways to sleep between reads and writes, but the job is scheduled only when execute() is called. This sometimes makes the program harder to debug via the application logs, but the JobManager has up-to-date logs.

Whenever there is a read/write issue, such as verifying the data written and read, it is better to seal the stream to allow the system to finish all the bookkeeping. This causes the events to show up in the persisted stream.

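The number of events written to a stream is generally not known in advance because the stream is unbounded. Likewise, the number of events in a window is not available without counting them, for example with a map function:
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogIncrementer implements MapFunction<String, Long> {
    private static final Logger logger = LoggerFactory.getLogger(LogIncrementer.class);
    // Shared by all instances in the same JVM, so each task manager keeps its own count.
    private static final AtomicLong counter = new AtomicLong();

    @Override
    public Long map(String record) throws Exception {
        // Increment first so that the logged and returned values agree.
        long current = counter.incrementAndGet();
        logger.info("counter={}, record={}", current, record);
        return current;
    }
}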
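
As a rough sketch of what counting per window means, independent of Flink, the following plain Java groups event timestamps into tumbling windows and counts each group. The class name, method name and the choice of epoch milliseconds are assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; it only illustrates the idea of counting per tumbling window.
class WindowCounter {
    /** Returns window start (milliseconds) -> number of events that fall in that window. */
    public static Map<Long, Long> countPerWindow(List<Long> eventTimestampsMillis,
                                                 long windowSizeMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : eventTimestampsMillis) {
            // Align the timestamp down to the start of its window.
            long windowStart = ts - Math.floorMod(ts, windowSizeMillis);
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }
}
```

The count for a window is only final once no more events for that window can arrive, which is why a streaming engine needs watermarks or a sealed stream before it can report it.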

Saturday, December 28, 2019

Flink streaming jobs run for a long time because the stream has no bounds. The program that invokes env.execute() is usually submitted in detached mode. The other mode of execution is blocking (attached) mode, in which execute() does not return until the job finishes, so it is seldom practical for an unbounded streaming job and is more commonly seen with a LocalExecutionEnvironment. The job itself will either appear with a started status on success or with an error status on failure. The logs from the submitting program will only be partial because the foreground process exits after making an asynchronous call.
The logs of the background job show all the activities performed after the invocation.
There are also a few ways to gather some counts programmatically. These include:
eventsRead.addSink(new SinkFunction<String>() {
    // Each parallel sink instance keeps its own count.
    private int count;

    @Override
    public void invoke(String value) throws Exception {
        count++;
        logger.error("count = {}, valueRead = {}", count, value);
    }
});

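The other way uses iterative streams:
IterativeStream<String> it = eventsRead.iterate();
it.withFeedbackType(String.class);
DataStream<String> mapped = it.map(t -> { logger.info(t); return t; });
// closeWith feeds every mapped element back to the head of the iteration,
// so filter the feedback stream if records should not keep cycling.
it.closeWith(mapped);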

Friday, December 27, 2019

Small improvements matter when the volume of events is high, such as when the stream store is connected to data pipelines or IoT traffic. Analytical processing may also iterate repeatedly over the events, and this too improves when the processing of each event improves.

Thursday, December 26, 2019


The use of defaultCredentials in stream store:
Most open source packages that use some sort of authentication ship with a built-in credential so that they can be run right out of the box. For example, Keycloak, an open source authentication and authorization framework with federation and brokerage capabilities, can be downloaded and run with a documented username and password and no customizations. A stream store like Pravega can also be downloaded and run as a standalone application with little or no change, using an out-of-the-box username and password that has administrator privileges.
When these packages are included within another product, the hardcoded credentials can be changed per deployment with the help of configuration that gets relayed to the packages. Using a different credential hardens the product containing these packages in mission-critical deployments.
This administrator credential works independently of any integrations provided through the product in which these open source packages are used. Even when the password changes per deployment, it is still good for administrative usage regardless of what credentials the user may have or the role with which the user may be accessing the resources of the package.
Making the password difficult to guess does not take away what the password makes possible in both the standalone and integrated deployments of the package. It gives users a way to rule out privilege-related issues: they can retry an action with the administrator credential instead of their own.
For example, the following configures the Flink connector with the built-in credential:
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        pravegaConfig.withCredentials(new DefaultCredentials("well-known-password", "well-known-username"));
and this allows the user to bypass any constraints associated with their own credentials. Neither the system nor the user interface has any way of corroborating that the credential supplied indeed comes from the user to whom it belongs. The purpose of this article is to suggest the use of these credentials only as a last resort for troubleshooting, with an explanation of how and why the technique works.
Finally, the use of built-in credentials cannot work across integrations unless the product as a whole integrates its administrative activities with those of the packages used within it.
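
The per-deployment override described above can be sketched in plain Java as follows. This is only an illustration: the configuration keys admin.username and admin.password, the class name and the default values are hypothetical and do not come from Pravega or Keycloak.

```java
import java.util.Map;

// Hypothetical sketch: prefer deployment configuration, fall back to built-in defaults.
class CredentialResolver {
    // Stand-ins for a package's out-of-the-box administrator account.
    static final String DEFAULT_USERNAME = "well-known-username";
    static final String DEFAULT_PASSWORD = "well-known-password";

    /** Returns {username, password} for the administrator account of this deployment. */
    public static String[] resolve(Map<String, String> deploymentConfig) {
        String user = deploymentConfig.getOrDefault("admin.username", DEFAULT_USERNAME);
        String pass = deploymentConfig.getOrDefault("admin.password", DEFAULT_PASSWORD);
        return new String[] {user, pass};
    }

    /** True when the deployment still runs with the well-known password. */
    public static boolean usesBuiltInPassword(Map<String, String> deploymentConfig) {
        return DEFAULT_PASSWORD.equals(resolve(deploymentConfig)[1]);
    }
}
```

A check like usesBuiltInPassword could be run at startup to warn operators that a deployment has not been hardened.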


Wednesday, December 25, 2019

# basic write and read logic

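import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.DefaultCredentials;
import io.pravega.connectors.flink.FlinkPravegaReader;
import io.pravega.connectors.flink.FlinkPravegaWriter;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.PravegaEventRouter;
import io.pravega.connectors.flink.PravegaWriterMode;
import io.pravega.connectors.flink.serialization.PravegaSerialization;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Constants is the application's own class holding the scope, stream name and credentials.
public class BasicWriterReaderApp {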
    private static final Logger logger = LoggerFactory.getLogger(BasicWriterReaderApp.class);

    public static void main(String argv[]) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(argv);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        String scope = Constants.DEFAULT_SCOPE;
        String streamName = Constants.DEFAULT_STREAM_NAME;
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORD, Constants.DEFAULT_USERNAME));
        StreamConfiguration streamConfig = StreamConfiguration.builder()
                .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
                .build();

        logger.info("001- creating stream");
        Stream stream = pravegaConfig.resolve(streamName);

        logger.info("002- adding data");
        List<String> snippets = new ArrayList<>();
        snippets.add("2019-12-23 19:40:23,909 ERROR Line1");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)");
        snippets.add("2019-12-23 19:40:24,557 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.");
        DataStream<String> input = env.fromCollection(snippets);
        input.print();

        logger.info("003- iterate over data");
        IterativeStream<String> iteration = input.iterate();
        iteration.withFeedbackType(String.class);
        // The lambda runs only after env.execute(), so a list filled inside it would
        // still be empty while the job graph is built; log inside the operator instead.
        DataStream<String> mapped = iteration.map(t -> { logger.info("entry={}", t); return t; });
        iteration.closeWith(mapped);

        logger.info("004 - creating a writer to write to stream");
        FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder()
            .withPravegaConfig(pravegaConfig)
            .forStream(stream)
                .withEventRouter(new PravegaEventRouter<String >() {
                    @Override
                    public String getRoutingKey(String e) {
                        return e;
                    }
                })
            .withSerializationSchema(PravegaSerialization.serializationFor(String.class));
        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<String> flinkPravegaWriter = builder.build();
        input.addSink(flinkPravegaWriter);
     
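        // Sleeping here does not separate the write from the read: nothing is scheduled until env.execute().
        java.lang.Thread.sleep(5000);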
        logger.info("005 - creating a reader to read from stream");
        FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
                .build();

        logger.info("006 - reading events from stream");
        DataStream<String> eventsRead = env
                    .addSource(flinkPravegaReader)
                    .name("eventsRead");
        IterativeStream<String> it = eventsRead.iterate();
        // Log inside the operator: the lambda runs only after env.execute(), so a list
        // captured here would still be empty while the job graph is being built.
        DataStream<String> newEvents = it.map(t -> { logger.info("event read = {}", t); return t; });
        it.closeWith(newEvents);

        logger.info("007- done");
        env.execute("Stream Writer");
    }

}

Tuesday, December 24, 2019




Encapsulating logic for queries on logs:
DevOps engineers have to query logs on a day-to-day basis. Sometimes the queries are ad hoc, and at other times they are specific to the domain and can be reused across incidents. In both cases, engineers have very little time to struggle with the tools for querying the logs. Architects and system engineers who organize logs realize this and have favored the baseline use case of storing logs in the filesystem and searching them with shell commands and scripts. Every other log management solution can use these files on disk as input in order to offer an enhanced log analysis experience through a user interface. Files and folders for logs enjoy universal acceptance. Logs remain on the filesystem only for a limited duration; they are then rotated, periodically archived, and finally aged out and removed.
Log files are therefore hard to beat. From single-line shell commands to popular scripts on logs, queries can be written, shared and curated. Sophisticated systems may allow extract-transform-load operations to import the logs into databases, object stores, stream stores or other time-series products, but queries then become specific to the storage. For example, stream stores may require a library of jars, each encapsulating specific logic bound to specific streams or dedicated to a form of query. With the exception of SQL, there is no universal query language that can span hybrid storage. SQL queries live within the database, and there is no arguing that transferring large amounts of data to a database is costly and not maintenance-free.
The logic for querying logs is usually written in two layers: a layer of low-level primitives and a layer of higher-level composites. Joins or relations between logs are very rare. Instead, pipelining of operators takes precedence over correlation of data, because the stages of extracting from a source, transforming, writing to a sink, and using by-products of the transformation for subsequent storage and analysis follow a data flow model.
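
The two layers can be sketched in plain Java with streams. The primitives below (grep and field) and the composite built from them (errorTimes) are hypothetical names chosen for illustration, and the field positions assume log lines shaped like "2019-12-23 19:40:23,909 ERROR Line1".

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of primitive and composite log queries.
class LogQueries {
    // Primitive: keep only the lines that contain the given text.
    public static Function<Stream<String>, Stream<String>> grep(String needle) {
        return lines -> lines.filter(line -> line.contains(needle));
    }

    // Primitive: project the whitespace-separated field at the given index.
    public static Function<Stream<String>, Stream<String>> field(int index) {
        return lines -> lines.map(line -> line.split("\\s+"))
                             .filter(parts -> parts.length > index)
                             .map(parts -> parts[index]);
    }

    // Composite: the time of every ERROR line, built by pipelining the primitives.
    public static List<String> errorTimes(List<String> logLines) {
        return grep(" ERROR ").andThen(field(1))
                              .apply(logLines.stream())
                              .collect(Collectors.toList());
    }
}
```

The composite is just a pipeline of operators, much like a chain of shell commands joined by pipes, and it needs no join between logs.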
Indeed, a data-driven approach to log analysis is not usually a concern, as most users would be willing to search all the logs if it weren’t so time-consuming. What they really want is ease of writing and refining queries, because a curated library does not always eliminate the need for ad hoc queries. In such cases, the library of existing code or scripts is merely a convenient starting point that can be edited for the current task.
Naming conventions are also important. Most DevOps engineers want to label their queries with the defects or tickets that they are tracking for the case at hand. However, the name of a query must reflect its purpose to be useful, and this remains the case whether the query is a SQL script, a shell script or a Java jar file. Sometimes the library that stores the queries provides ways to annotate the existing collection with metadata. It may also allow recording the source and sink for which each query is best suited. However, adding this information as multi-part names is probably the easiest organization of all.
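
One possible multi-part naming scheme, shown here purely as an assumption, is purpose.source.sink.ticket with the file extension left out. A small parser then recovers the parts for an inventory or a search:

```java
// Hypothetical convention: <purpose>.<source>.<sink>.<ticket>, without a file extension.
class QueryName {
    final String purpose;
    final String source;
    final String sink;
    final String ticket;

    private QueryName(String purpose, String source, String sink, String ticket) {
        this.purpose = purpose;
        this.source = source;
        this.sink = sink;
        this.ticket = ticket;
    }

    /** Splits a multi-part name into its four parts, rejecting anything else. */
    public static QueryName parse(String name) {
        String[] parts = name.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException(
                    "expected <purpose>.<source>.<sink>.<ticket> but got: " + name);
        }
        return new QueryName(parts[0], parts[1], parts[2], parts[3]);
    }
}
```

For instance, a name such as exception-count.jobmanager-log.stdout.TICKET-1234 puts the purpose first, so the ticket label does not hide what the query does.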
There is also no limit on the authors of the queries or their teams. If anything, an inventory may be persisted separately from the queries themselves as a way to organize the library of queries.
These summarize some of the concerns regarding the organization of queries for logs.