Saturday, December 28, 2019

Flink streaming jobs run for a long time because the stream has no bounds. The program that invokes env.execute() is therefore kicked off in detached mode. The other mode of execution is blocking mode, which is usually not practical for an unbounded streaming job and is more typical of local execution. The job itself will appear either with a started status on success or with an error on failure. The logs from the submitting program will only be partial, because the foreground process exits after making an asynchronous call.
The logs for the background job will show all the activities performed after the invocation.
There are also a few ways to gather counts programmatically. One is a sink that counts the events it receives:
eventsRead.addSink(new SinkFunction<String>() {
    // Counts the events seen by this sink instance.
    private int count;

    @Override
    public void invoke(String value) throws Exception {
        count++;
        logger.error("count = {}, valueRead = {}", count, value);
    }
});

The other is with iterative streams:
IterativeStream<String> it = eventsRead.iterate();
it.withFeedbackType(String.class);
// Log each event as it passes through the iteration body.
DataStream<String> mapped = it.map(t -> { logger.info(t); return t; });
it.closeWith(mapped);
When a job is run in detached mode, the job execution result is not available immediately. That result is only available when the Flink application program is run in blocking mode, which is usually not the case for streaming jobs.
There are ways to sleep between reads and writes, but the job is only scheduled when execute() is called. This sometimes makes the program harder to debug via the application logs, but the JobManager has up-to-date logs.
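The difference between the two submission styles can be illustrated with a plain-Java analogy. This is only a sketch and does not use the Flink API: the class SubmissionModeSketch and its methods are hypothetical names, and the "job" is simply a function that counts events.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SubmissionModeSketch {

    /** Hypothetical stand-in for a job: counts the events it is given. */
    private static int runJob(List<String> events) {
        return events.size();
    }

    /** Detached style: hand the work off and return at once with only a handle. */
    public static CompletableFuture<Integer> submitDetached(List<String> events) {
        return CompletableFuture.supplyAsync(() -> runJob(events));
    }

    /** Blocking style: wait for the work to finish and return its result. */
    public static int submitBlocking(List<String> events) {
        return submitDetached(events).join();
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("e1", "e2", "e3");

        // The caller continues immediately; the result has not been read yet.
        CompletableFuture<Integer> handle = submitDetached(events);
        System.out.println("submitted in detached style");

        // The caller waits here, so the result is in hand when the call returns.
        System.out.println("blocking result = " + submitBlocking(events));

        // The detached result only becomes visible once someone waits on the handle.
        System.out.println("detached result = " + handle.join());
    }
}
```

In the detached style the submitting program can exit before the work completes, which is why its own logs stay partial while the background side has the full picture.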

Whenever there is a read/write issue, such as verifying the data written and read, it is better to seal the stream to allow the system to finish all the bookkeeping. This results in the events to show up in the persisted

Friday, December 27, 2019

Small improvements matter when the volume of events is high, such as when the stream store is connected to data pipelines or IoT traffic. The analytical processing may also perform repeated iterations over the events, which likewise benefit when the processing of each event is improved.

Thursday, December 26, 2019


The use of defaultCredentials in stream store:
Most open source packages that use some form of authentication ship with a built-in credential so that they can be run right out of the box. For example, Keycloak is an open source authentication and authorization framework with federation and brokerage capabilities that can be downloaded and run with a stated username and password and no customization. A stream store like Pravega can likewise be downloaded and run as a standalone application with little or no change, using an out-of-the-box username and password that has administrator privileges.
When these packages are included within another product, the hardcoded credentials can be changed per deployment through configuration that is relayed to the packages. Using a different credential hardens the product that contains these packages in mission-critical deployments.
This administrator credential works independently of any integrations provided through the product in which these open source packages are used. Even when the password changes per deployment, it is still good for administrative usage regardless of what credentials the user may have or the role with which the user may be accessing the resources of the package.
The difficulty of guessing the password does not take away from what the password makes possible in both the standalone and integrated deployments of the package. By invoking an action with the administrator credential instead of their own, users can rule out any issues concerning the privilege with which their actions are invoked.
For example, we have
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        pravegaConfig.withCredentials(new DefaultCredentials("well-known-password", "well-known-username"));
and this allows the user to bypass any constraints associated with their own credentials. Neither the system nor the user interface has any way of corroborating that the credential supplied really comes from the user to whom it belongs. The purpose of this article is to suggest the use of these credentials only as a last resort for troubleshooting, with an explanation of how and why the technique works.
Finally, the use of built-in credentials cannot work across integrations unless the product as a whole integrates its administrative activities with those of the packages used within it.


Wednesday, December 25, 2019

# basic write and read logic

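import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.DefaultCredentials;
import io.pravega.connectors.flink.FlinkPravegaReader;
import io.pravega.connectors.flink.FlinkPravegaWriter;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.PravegaEventRouter;
import io.pravega.connectors.flink.PravegaWriterMode;
import io.pravega.connectors.flink.serialization.PravegaSerialization;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class BasicWriterReaderApp {
    private static final Logger logger = LoggerFactory.getLogger(BasicWriterReaderApp.class);

    public static void main(String[] argv) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(argv);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        // The EXACTLY_ONCE writer mode used below commits its writes on checkpoints,
        // so checkpointing is enabled here (the interval is an arbitrary choice).
        env.enableCheckpointing(1000);
        String scope = Constants.DEFAULT_SCOPE;
        String streamName = Constants.DEFAULT_STREAM_NAME;
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(params);
        pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORD, Constants.DEFAULT_USERNAME));
        StreamConfiguration streamConfig = StreamConfiguration.builder()
                .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
                .build();

        logger.info("001- creating stream");
        Stream stream = pravegaConfig.resolve(streamName);
        // Create the scope and the stream before the job reads from or writes to them.
        try (StreamManager streamManager = StreamManager.create(pravegaConfig.getClientConfig())) {
            streamManager.createScope(stream.getScope());
            streamManager.createStream(stream.getScope(), stream.getStreamName(), streamConfig);
        }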

        logger.info("002- adding data");
        List<String> snippets = new ArrayList<>();
        snippets.add("2019-12-23 19:40:23,909 ERROR Line1");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)");
        snippets.add("2019-12-23 19:40:24,557 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.");
        DataStream<String> input = env.fromCollection(snippets);
        input.print();

        logger.info("003- iterate over data");
        IterativeStream<String> iteration = input.iterate();
        // The map function runs inside the job, not while main() builds the job graph,
        // so each entry is logged here instead of being collected into a local list
        // that would still be empty when main() inspects it.
        DataStream<String> mapped = iteration.map(t -> {
            logger.info("entry={}", t);
            return t;
        });
        // Closing the loop with every element feeds each event back into the iteration.
        iteration.closeWith(mapped);

        logger.info("004 - creating a writer to write to stream");
        FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withEventRouter(new PravegaEventRouter<String>() {
                    @Override
                    public String getRoutingKey(String e) {
                        // Route each event by its own content.
                        return e;
                    }
                })
                .withSerializationSchema(PravegaSerialization.serializationFor(String.class));
        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<String> flinkPravegaWriter = builder.build();
        input.addSink(flinkPravegaWriter);

        // Sleeping here would not separate the write from the read:
        // nothing runs until env.execute() schedules the job.
        logger.info("005 - creating a reader to read from stream");
        FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
                .build();

        logger.info("006 - reading events from stream");
        DataStream<String> eventsRead = env
                .addSource(flinkPravegaReader)
                .name("eventsRead");
        IterativeStream<String> it = eventsRead.iterate();
        // Log each event as it is read; a list filled inside the map function
        // would still be empty at this point in main().
        DataStream<String> newEvents = it.map(t -> {
            logger.info("event read = {}", t);
            return t;
        });
        it.closeWith(newEvents);

        logger.info("007- done");
        env.execute("Stream Writer");
    }

}

Tuesday, December 24, 2019




Encapsulating logic for queries on logs:
DevOps engineers have to query logs on a day-to-day basis. Sometimes the queries are ad hoc, and at other times they are specific to the domain and can be reused across incidents. In both cases, the engineers have very little time to struggle with the tools for querying the logs. Architects and system engineers who organize logs realize this and have favored the baseline use case of storing logs in the filesystem and searching them with shell commands and scripts. Every other log management solution can use these files on disk as input to provide an enhanced log analysis experience through a user interface. Files and folders for logs enjoy universal acceptance. Logs remain on the filesystem only for a limited duration; they are then rotated, periodically archived, and finally aged out or removed.
Log files are therefore hard to beat. From single-line shell commands to popular scripts on logs, queries can be written, shared and curated. Sophisticated systems may allow extract-transform-load operations to import the logs into databases, object stores, stream stores or other time-series products. But queries then become specific to the storage. For example, stream stores may require a library of jars, each encapsulating logic bound to specific streams or dedicated to a form of query. With the exception of SQL, there is no universal query language that can span hybrid storage. SQL queries, however, live within the database, and transferring large amounts of data to a database is costly and not maintenance-free.
The logic for querying logs is usually written in two layers: low-level primitives and higher-level composites. Joins or relations between logs are very rare. Instead, the pipelining of operators takes precedence over the correlation of data, because the stages of extracting from a source, transforming, putting into a sink, and using by-products of the transformation for subsequent storage and analysis follow a data flow model.
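The two layers can be sketched in plain Java as follows. This is only an illustration under assumptions: the class LogQuerySketch, its method names, and the 23-character timestamp prefix (as in "2019-12-23 19:40:23,909") are hypothetical choices, not part of any log library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class LogQuerySketch {

    // Low-level primitives: small, reusable filters and extractors.
    public static Predicate<String> level(String level) {
        return line -> line.contains(" " + level + " ");
    }

    public static Predicate<String> containing(String text) {
        return line -> line.contains(text);
    }

    /** Assumes each line starts with a 23-character timestamp. */
    public static Function<String, String> timestampOf() {
        return line -> line.length() >= 23 ? line.substring(0, 23) : line;
    }

    // Higher-level composite: a pipeline of primitives (filter, then transform).
    public static List<String> errorTimestamps(List<String> lines, String component) {
        return lines.stream()
                .filter(level("ERROR").and(containing(component)))
                .map(timestampOf())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2019-12-23 19:40:23,909 ERROR RestClient failed",
                "2019-12-23 19:40:24,557 INFO  RestClient - Shutting down rest endpoint.",
                "2019-12-23 19:40:25,001 ERROR Scheduler failed");
        System.out.println(errorTimestamps(lines, "RestClient"));
    }
}
```

The composite contains no join between logs; it only chains operators, which is the data flow shape described above.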
Indeed, a data-driven approach to log analysis is not usually a concern, as most users would be willing to search all the logs if it were not so time-consuming. What they really want is ease of writing and refining queries, because a curated library does not always eliminate the need for ad hoc queries. In such cases, the library of existing code or scripts is merely a convenient starting point that can be edited for the current task.
Naming convention is also important. Most DevOps engineers want to label their queries after the defects or tickets they are tracking for the case at hand. However, the name of a query must reflect its purpose to be useful, and this remains the case whether the query is a SQL script, a shell script or a Java jar file. Sometimes the library that stores the queries provides ways to annotate the existing collection with metadata. It may also allow recording the source and sink for which each query is best suited. However, adding this information as multi-part names is probably the easiest organization of all.
There is also no limit on the authors of the queries or their teams. If anything, an inventory may be persisted separately from the queries themselves as a way to organize the library of queries.
These are some of the concerns regarding the organization of queries for logs.


Monday, December 23, 2019

Sample program to extract Exception from logs as stream


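import io.pravega.client.stream.Stream;
import io.pravega.connectors.flink.FlinkPravegaWriter;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.PravegaWriterMode;
import io.pravega.connectors.flink.serialization.PravegaSerialization;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogExceptionExtractor {
    private static final Logger logger = LoggerFactory.getLogger(LogExceptionExtractor.class);

    public static void main(String[] argv) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(argv);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        // Checkpointing backs the EXACTLY_ONCE writer mode used below.
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(params);
        // Assumption: the target stream already exists under this name. The original
        // listing cast the DataStream to a Pravega Stream, which cannot work.
        Stream stream = pravegaConfig.resolve(Constants.DEFAULT_STREAM_NAME);

        logger.info("001- creating stream");
        // Note: event-time session windows only fire if the elements carry timestamps
        // and watermarks; those are assumed to be assigned elsewhere.
        DataStream<LogException> exceptionDataStream = env.readTextFile("exceptions.log")
                .flatMap(new ExceptionTimestampExtractor())
                .keyBy(0)
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
                .allowedLateness(Time.seconds(4))
                .process(new ProcessWindowFunction<Tuple2<String, String>, LogException, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context,
                                        Iterable<Tuple2<String, String>> elements,
                                        Collector<LogException> out) throws Exception {
                        // Gather every line of the session into a single LogException.
                        LogException logException = new LogException();
                        for (Tuple2<String, String> item : elements) {
                            logException.setTimestamp(item.f0);
                            logException.getData().add(item.f1);
                        }
                        out.collect(logException);
                    }
                });

        FlinkPravegaWriter.Builder<LogException> builder = FlinkPravegaWriter.<LogException>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withEventRouter(new LogExceptionRouter())
                .withSerializationSchema(PravegaSerialization.serializationFor(LogException.class));
        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<LogException> flinkPravegaWriter = builder.build();
        exceptionDataStream.addSink(flinkPravegaWriter);
        env.execute("Stream Writer");
    }

}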
The above method works partly due to the parallelization based on key.
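The grouping that the key makes possible can be sketched without Flink. The following plain-Java example is an assumption about what the extractor and the window do together (the ExceptionTimestampExtractor itself is not shown in the listing): each line is keyed by the most recent timestamp seen, so the stack-trace lines that follow a timestamped line land in the same group. The class name ExceptionGroupingSketch and the timestamp pattern are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ExceptionGroupingSketch {

    // Matches a leading timestamp such as "2019-12-23 19:40:23,909".
    private static final Pattern TIMESTAMP =
            Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}");

    /** Groups lines under the timestamp of the most recent timestamped line. */
    public static Map<String, List<String>> groupByTimestamp(List<String> lines) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        String currentKey = null;
        for (String line : lines) {
            Matcher matcher = TIMESTAMP.matcher(line);
            if (matcher.find()) {
                currentKey = matcher.group();
            }
            if (currentKey == null) {
                continue; // continuation line seen before any timestamp: skip it
            }
            groups.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(line);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2019-12-23 19:40:23,909 ERROR Line1",
                "\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)",
                "\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)",
                "2019-12-23 19:40:24,557 INFO  org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.");
        groupByTimestamp(lines).forEach((key, group) ->
                System.out.println(key + " -> " + group.size() + " line(s)"));
    }
}
```

Because each group depends only on its own key, groups for different keys can be processed independently, which is what allows the keyed stream to be handled in parallel.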

Sunday, December 22, 2019


In this article, we continue the discussion of stream processing with Flink and look into the use of windows. An infinite stream can only be processed if it is split into pieces of finite size. A window is such a piece, and its size can depend on one of several factors. A programmer defines the split with window syntax. In the case of keyed streams, she uses the “keyBy” operator followed by “window”, and in the case of non-keyed streams, she uses “windowAll”.
The keyed stream helps with parallelizing the stream tasks.
There are four types of windows:
1)      Tumbling window: A tumbling window assigns each element to a single window of a specified, fixed size, and the windows do not overlap, as with the code shown:
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
2)      Sliding window: this window has a fixed size like the tumbling window, but it may overlap with other windows. An additional window slide parameter dictates how often the next window starts.
3)      Session window: this window groups elements by sessions of activity. Session windows do not have a fixed size and do not overlap. The number of elements within a window can vary, and the gap of inactivity between the windows is called the session gap.
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
    // determine and return session gap
}))
4)      Global window: This is a singleton that assigns all events with the same key to the same window. Since it does not have a beginning or an end, no processing can be done unless a trigger is specified. A trigger determines when a window is ready.
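The assignment rules for the first three window types can be worked through with plain arithmetic. The sketch below does not use the Flink API; the class WindowAssignmentSketch and its methods are hypothetical, timestamps are plain long values, and the session logic is a simplification that assumes the timestamps arrive sorted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowAssignmentSketch {

    /** Start of the one tumbling window of the given size that contains the timestamp. */
    public static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Starts of every sliding window (size, slide) that contains the timestamp. */
    public static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Splits sorted timestamps into sessions wherever two neighbors are more than sessionGap apart. */
    public static List<List<Long>> sessions(List<Long> sortedTimestamps, long sessionGap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > sessionGap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(7, 5));      // 5: the window [5, 10)
        System.out.println(slidingStarts(7, 10, 5));  // [5, 0]: windows [5, 15) and [0, 10)
        System.out.println(sessions(Arrays.asList(1L, 2L, 10L, 11L, 30L), 5)); // [[1, 2], [10, 11], [30]]
    }
}
```

An element falls into exactly one tumbling window, into size / slide sliding windows, and into a session whose extent depends entirely on the data.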
The computation on a window is performed with the help of a window function, which can be one of ReduceFunction, AggregateFunction, FoldFunction, or ProcessWindowFunction. Flink incrementally aggregates the elements of each window as they arrive for the first two window functions. The ProcessWindowFunction requires buffering, but it receives an Iterable over all the elements of the window together with a Context object that carries metadata about the window. A fold function determines how each input value affects the output value and is called once for each element as it is added to the window.
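The trade-off between incremental aggregation and buffering can be sketched in plain Java. This is an analogy rather than the Flink API: WindowFunctionSketch, IncrementalWindow and BufferingWindow are hypothetical names, and the elements are plain integers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class WindowFunctionSketch {

    /** Incremental style: only the running result is kept as elements arrive. */
    public static final class IncrementalWindow {
        private final BinaryOperator<Integer> reducer;
        private Integer accumulator; // the only state held for the window

        public IncrementalWindow(BinaryOperator<Integer> reducer) {
            this.reducer = reducer;
        }

        public void add(int element) {
            accumulator = (accumulator == null) ? element : reducer.apply(accumulator, element);
        }

        public Integer result() {
            return accumulator;
        }
    }

    /** Buffering style: every element is retained until the window fires. */
    public static final class BufferingWindow {
        private final List<Integer> buffer = new ArrayList<>();
        private final long start;
        private final long end;

        public BufferingWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public void add(int element) {
            buffer.add(element);
        }

        /** Sees all elements at once, along with the window metadata. */
        public String fire() {
            int sum = 0;
            for (int element : buffer) {
                sum += element;
            }
            return "window [" + start + ", " + end + ") count=" + buffer.size() + " sum=" + sum;
        }
    }

    public static void main(String[] args) {
        IncrementalWindow incremental = new IncrementalWindow(Integer::sum);
        BufferingWindow buffering = new BufferingWindow(0, 5);
        for (int element : new int[] {1, 2, 3}) {
            incremental.add(element);
            buffering.add(element);
        }
        System.out.println(incremental.result()); // 6
        System.out.println(buffering.fire());     // window [0, 5) count=3 sum=6
    }
}
```

The incremental form holds one value per window regardless of how many elements arrive, while the buffering form pays in memory for access to the whole window and its metadata.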
The context object gives access to state, which is of two types:
1)      globalState()
2)      windowState()
Both allow access to the keyed state, but the former is not scoped to a window while the latter is.