Saturday, January 4, 2020

Apache Flink Applications can use ProcessFunction with streams. It is similar to FlatMap Function but handles all three : events, state and timers. The function applies to each and every event in the input stream. It also gives access to the Flink keyed state via the runtime Context. The timer allows changes in event time and processing time to be handled by the application. Both the timestamps and timerservice are available via the context object. This is only possible with process function on a keyed stream.

The timer service is called for future event processing via registered callbacks. The onTimer method is invoked for this particular processing. Inside this method, states are scoped to the key for which the timer was created.

Joins are possible on low-level operations of two inputs. The ProcessFunction or the KeyedCoProcessFunction is a function that is bound to two different inputs and it gets individual calls to processElement1 and processElement2 for records for respective inputs. One of the inputs is processed first. It’s state is updated. Then the other input is processed. The earlier state is probed and the result is emitted. Since events may correspond to different times including delays, a watermark may be used as a reference and when it has elapsed, the operation may be performed.

The timers are fault-tolerant and checkpointed This helps with recovery if the task gets paused. There does not need to be an ordered set of events with the logic above.

The Flink coalescing may maintain only one timer per key and timestamp. The timer resolution is in milliseconds but the runtime may not permit it at this granularity although the system clock can. Besides, the timer may work upto +/- one-time unit equal to resolution from the time specified. Timers can be coalesced between watermarks. However, there can be only one timer per key per resolution time-unit.

Friday, January 3, 2020


Flink Streaming jobs appear to run for a long time because the stream has no bounds. The program that invokes the env.execute() is kicked off in detached mode. The other mode of execution is blocking mode and it does not apply to StreamExecutionEnvironment but only to LocalExecutionEnvironment. The job itself will either appear with status as started on success or appear as error on failure. The logs for the job execution will only be partial because the foreground disappears after making an asynchronous call.

The logs for the background will show all the activities performed after the invocation.

There are also a few ways to gather some counts programmatically. These include:

eventsRead.addSink( new SinkFunction<String> () {

Private int count;

@Override

Public void invoke(String value) throws Exception {

             count++;

             logger.error(“count = {}, valueRead ={}”, count, value) ;

           }

}) ;



And the other is with using iterative streams

IterativeStream it = eventsRead.iterate();

It.withFeedbackType(String. Class) ;

DataStream mapped =it.map( t - > { logger. Info(t) ; return t;}) ;

It.closeWith(mapped);

When a job is performed in detached mode, the job execution result is not available immediately. That result is only available when the Flink application program is run in blocking mode which is usually kit the case for streaming mode.

There are ways to sleep between reads and writes but the scheduling of the job occurs when the execute is called.  This sometimes makes it harder for the program to be debugged via the application logs but the jobManager has up to date logs.

Flink Applications can use ProcessFunction with streams. It is similar to FlatMap Function but handles all three : events, state and timers. The function applies to each and every event in the input stream. It also gives access to the Flink keyed state via the runtime Context. The timer allows changes in event time and processing time to be handled by the application. Both the timestamps and timerservice are available via the context object. This is only possible with process function on a keyed stream.

Thursday, January 2, 2020

Exception Stream:
When a computer program has a fault, it generates an Exception with a stacktrace of all the methods leading up to the point of failure. Developers write code in a way that lets exceptions to be written out to logs for offline analysis by DevOps Engineers. This article tries to list the implementation considerations for an automated solution to find the top generators of exceptions on a continuous basis.
We list the high-level steps first and then document the implementation considerations specific to continuous automated reporting of top exception generators. These are:
1) parse the logs by age and extract the exception
2) generate hash and store exception stacktrace as well as its hash in a separate stream
3) run a program to continuously monitor the exceptions written to the stream for categorizing and creating a histogram
4) report the top generators of exceptions from the histogram at any point of time on demand.
The choice of a stream store for exceptions is unique to meeting the demands of continuous reporting.  Traditional solutions usually poll the exception histogram periodically to draw the bar chart for top generators of exceptions.
The number of generators of exceptions and the number of exceptions for any generator can be arbitrary.  It helps to store each exception one by one in the stream. The generation of the histogram and its persistence with the positional reference of the last accounted exception from the exception stack trace stream is optional. Having a separate stream for log entries, exception stack traces and histograms enables efficiency in computation without repetitions. Otherwise just the exception stack trace stream is sufficient in the stream store.
The natural choice for writing and reading the exception stacktraces stream is Apache Flink and the programs to read and write can be separate.
The writer of exception stacktraces to its stream has to parse the logs. Usually the logs are available on the filesystem and seldom in the stream store but it is advisable to propagate the log to its own stream store. The parsing of log entries for extracting exceptions and the hashing of exception stack trace is well-known with stackhasher programs. The generation of histogram is a straight forward Flink query. The persistence of the histogram and the position of the last read exception only reduces the work set for which the histogram needs to be improved. Regular state persistence in the Flink application code can flush the histogram to disk.
It may be argued that the histogram is a metric rather than the log entry or the exception stack trace. A suitable metric stack and an influxQL can suffice to do it. Even storing the histogram in an object store for universal web access, metrics and alerts may be an alternative. However, the convenience of updating the histogram at the point of reading the exception stack trace makes it easy to include it in the Flink Application.

Wednesday, January 1, 2020

A REST based data path to a stream store can work as a log sink in a PKS hosted and Kubernetes cluster deployed stream storage so that the data from the source can find its way to a sink with little or no intervention. Then the generation, collection, sink and analysis of the log entries follows a staged propagation in a pipeline model and makes the logs available for extract-transform-load, analysis and reporting solutions downstream.

The stages are:

Kube api-server is outside the Kubernetes cluster and any products hosted on Kubernetes. As an infrastructure it is well suited to turn on these collection items and determine their transmission techniques. The upshot is that we have a set of command line parameters as input and and data flow as output  

Transformation of data. This is a required step because this data is generally read only. Transformation means select, project, map, filter, reduce and other functionalities. Flink application can be leveraged for this purpose.

Sink of event where we leverage a data path directly to the stream store allowing all reporting stacks to read from the store instead of the source.

The logic for querying logs is written usually in two layers – a low level primitive layers and a higher-level composite. Very rarely do we see joins or relations between logs. Instead pipelining of operators take precedence over the correlation of data because the stages of extracting from source, transforming, putting into sink and utilizing by-products of the transformation for subsequent storage and analysis follow a data flow model.  


Indeed, a data driven approach of log analysis is not usually a concern as most users are willing to search all the logs if it weren’t so time consuming. What they really want is the ease of writing and refining queries because the curated library does not always eradicate the need for adhoc queries. In such cases, the library of existing code/script is merely a starting point for convenience which can then be edited for the current task.  

Tuesday, December 31, 2019


The following implementations of createEvent and getEvent indicate that the Pravega data store as a stream store can allow methods as described by the above Flink iterations:
    @Override
    public CompletableFuture<Void> createEvent(final String  routingKey,
                                               final String scopeName,
                                               final String streamName,
                                               final String message) {

        CompletableFuture<Void> ack = CompletableFuture.completedFuture(null);
        ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
        SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
        RevisionedStreamClient<String>  revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
                NameUtils.getMarkStreamForStream(streamName),
                new JavaSerializer<String>(), SynchronizerConfig.builder().build());
        Revision r = revisedStreamClient.fetchLatestRevision();
        revisedStreamClient.writeConditionally(r, message);
        return ack;
}

    @Override
    public CompletableFuture<String> getEvent(final String  routingKey,
                                              final String scopeName,
                                              final String streamName,
                                              final Long segmentNumber) {
        ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
        SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
        RevisionedStreamClient<String>  revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
                NameUtils.getMarkStreamForStream(streamName),
                new JavaSerializer<String>(), SynchronizerConfig.builder().build());
        Revision r = revisedStreamClient.fetchOldestRevision();
        Segment s = r.getSegment();
        io.pravega.client.stream.Stream stream = s.getStream();
        StringBuffer sb = new StringBuffer();
        while (iter.hasNext()) {            Map.Entry<Revision, String> entry = iter.next();
            sb.append(entry.getValue());
            }
        CompletableFuture<String> ack = CompletableFuture.completedFuture(sb.toString());
        return ack;
    }

Which results in the following log output:

2019-12-31 03:51:00,674 22519 [grizzly-http-server-2] INFO i.p.c.s.i.RevisionedStreamClientImpl - Wrote from 0 to 20
2019-12-31 03:51:00,675 22520 [grizzly-http-server-2] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - revisioned client wrote to revision: project58/_MARKlogstream2/0.#epoch.0:0:0
2019-12-31 03:51:00,778 22623 [grizzly-http-server-3] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - fetchLatestRevision=project58/_MARKlogstream2/0.#epoch.0:20:0

Monday, December 30, 2019


Running the stream reader independent of the writer helps rule out any lags or delays between the two from the FlinkApplication scheduling. If the sealStream has been invoked, it is safe for the reader to read the stream. Some read and writes can be done as part of transactions. The checkpointing of state allows consistency in writing and reading the streams. If the reader can read one event from a stream after the writer has written to it, then it would be sufficient to show that the events are accessible to the applications reading the stream.  the match between the writers and readers on the event count is seldom necessary and can be taken for granted after the writers are gone. Any number of readers can be used to read the stream.
Flink program implement iterations using a step function and embed it into a special iteration operator. This operator comes in two forms : iterate and delta iterate. Both repeatedly invoke the step function on the current step until a termination condition is reached.
For example of Iteration:
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);
DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer i) throws Exception {
        double x = Math.random();
        double y = Math.random();

        return i + ((x * x + y * y < 1) ? 1 : 0);
    }
});

// Iteratively transform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);

count.map(new MapFunction<Integer, Double>() {
    @Override
    public Double map(Integer count) throws Exception {
        return count / (double) 10000 * 4;
    }
}).print();
A delta iteration is created by calling the iterateDelta(DataSet, int, int) (or iterateDelta(DataSet, int, int[]) respectively for a work set or solution set. . The arguments are the initial delta set, the maximum number of iterations and the key positions. The results are available with  iteration.getWorkset() and iteration.getSolutionSet()

Sunday, December 29, 2019

When a Flink job is performed in detached mode, the job execution result is not available immediately. That result is only available when the Flink application program is run in blocking mode which is usually kit the case for streaming mode.
There are ways to sleep between reads and writes but the scheduling of the job occurs when the execute is called.  This sometimes makes it harder for the program to be debugged via the application logs but the jobManager has up to date logs.

Whenever there is a read/write issue, such as verifying the data written and read, it is better to seal the stream to allow the system to finish all the bookkeeping. This results in the events to show up in the persisted stream.

The number of events written to a stream is generally not determined because it is boundless. However, the number of events in a window is also not available without counting
public class LogIncrementer implements MapFunction<String, Long> {
    private static final Logger logger = LoggerFactory.getLogger(LogExceptionExtractor.class);
    private static Long counter = 0L;

    @Override
    public Long map(String record) throws Exception {
        logger.info("counter={}, record={}", counter, record);
        return counter + 1;
    }
}
Running the stream reader independent of the writer helps rule out any lags or delays between the two from the FlinkApplication scheduling. If the sealStream has been invoked, it is safe for the reader to read the stream. Some read and writes can be done as part of transactions. The checkpointing of state allows consistency in writing and reading the streams. If the reader can read one event from a stream after the writer has written to it, then it would be sufficient to show that the events are accessible to the applications reading the stream.  the match between the writers and readers on the event count is seldom necessary and can be taken for granted after the writers are gone. Any number of readers can be used to