Tuesday, December 31, 2019


The following implementations of createEvent and getEvent indicate that Pravega, as a stream store, can support methods like those described in the Flink iterations above:
    @Override
    public CompletableFuture<Void> createEvent(final String  routingKey,
                                               final String scopeName,
                                               final String streamName,
                                               final String message) {

        CompletableFuture<Void> ack = CompletableFuture.completedFuture(null);
        ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
        SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
        RevisionedStreamClient<String>  revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
                NameUtils.getMarkStreamForStream(streamName),
                new JavaSerializer<String>(), SynchronizerConfig.builder().build());
        // Append the message conditionally at the latest revision of the mark stream.
        Revision r = revisedStreamClient.fetchLatestRevision();
        revisedStreamClient.writeConditionally(r, message);
        return ack;
    }

    @Override
    public CompletableFuture<String> getEvent(final String  routingKey,
                                              final String scopeName,
                                              final String streamName,
                                              final Long segmentNumber) {
        ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
        SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
        RevisionedStreamClient<String>  revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
                NameUtils.getMarkStreamForStream(streamName),
                new JavaSerializer<String>(), SynchronizerConfig.builder().build());
        // Read everything from the oldest revision onward and concatenate the values.
        Revision r = revisedStreamClient.fetchOldestRevision();
        Segment s = r.getSegment();
        io.pravega.client.stream.Stream stream = s.getStream();
        Iterator<Map.Entry<Revision, String>> iter = revisedStreamClient.readFrom(r);
        StringBuffer sb = new StringBuffer();
        while (iter.hasNext()) {
            Map.Entry<Revision, String> entry = iter.next();
            sb.append(entry.getValue());
        }
        CompletableFuture<String> ack = CompletableFuture.completedFuture(sb.toString());
        return ack;
    }

This results in the following log output:

2019-12-31 03:51:00,674 22519 [grizzly-http-server-2] INFO i.p.c.s.i.RevisionedStreamClientImpl - Wrote from 0 to 20
2019-12-31 03:51:00,675 22520 [grizzly-http-server-2] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - revisioned client wrote to revision: project58/_MARKlogstream2/0.#epoch.0:0:0
2019-12-31 03:51:00,778 22623 [grizzly-http-server-3] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - fetchLatestRevision=project58/_MARKlogstream2/0.#epoch.0:20:0

Monday, December 30, 2019


Running the stream reader independent of the writer helps rule out any lags or delays between the two from the FlinkApplication scheduling. If sealStream has been invoked, it is safe for the reader to read the stream. Some reads and writes can be done as part of transactions. The checkpointing of state allows consistency in writing and reading the streams. If the reader can read one event from a stream after the writer has written to it, that is sufficient to show that the events are accessible to the applications reading the stream. A match between the writers and readers on the event count is seldom necessary and can be taken for granted after the writers are gone. Any number of readers can be used to read the stream.
Flink programs implement iterations using a step function embedded in a special iteration operator. This operator comes in two forms: iterate and delta iterate. Both repeatedly invoke the step function on the current iteration state until a termination condition is reached.
An example of an iteration:
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);
DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer i) throws Exception {
        double x = Math.random();
        double y = Math.random();

        return i + ((x * x + y * y < 1) ? 1 : 0);
    }
});

// Iteratively transform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);

count.map(new MapFunction<Integer, Double>() {
    @Override
    public Double map(Integer count) throws Exception {
        return count / (double) 10000 * 4;
    }
}).print();
A delta iteration is created by calling iterateDelta(DataSet, int, int) (or iterateDelta(DataSet, int, int[])) on the initial solution set. The arguments are the initial delta set, the maximum number of iterations, and the key positions. The work set and solution set are available inside the iteration via iteration.getWorkset() and iteration.getSolutionSet().
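A minimal delta-iteration sketch follows; the data sets, the join logic, and the Tuple2<Long, Double> record type are placeholder assumptions for illustration and are not taken from the example above:

// Sketch only: assumes Tuple2<Long, Double> records keyed on field 0.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, Double>> initialSolutionSet = env.fromElements(Tuple2.of(1L, 0.0), Tuple2.of(2L, 0.0));
DataSet<Tuple2<Long, Double>> initialWorkset = env.fromElements(Tuple2.of(1L, 1.0), Tuple2.of(2L, 2.0));

int maxIterations = 10;
DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
        initialSolutionSet.iterateDelta(initialWorkset, maxIterations, 0);

// The step function joins the workset against the solution set on the key position.
DataSet<Tuple2<Long, Double>> delta = iteration.getWorkset()
        .join(iteration.getSolutionSet())
        .where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
            @Override
            public Tuple2<Long, Double> join(Tuple2<Long, Double> work, Tuple2<Long, Double> solution) {
                return Tuple2.of(work.f0, work.f1 + solution.f1);
            }
        });

// closeWith takes the solution set delta and the next workset; here the delta doubles as both.
DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, delta);
result.print();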

Sunday, December 29, 2019

When a Flink job is performed in detached mode, the job execution result is not available immediately. That result is only available when the Flink application program is run in blocking mode, which is usually not the case for streaming mode.
There are ways to sleep between reads and writes, but the scheduling of the job occurs only when execute is called. This sometimes makes it harder to debug the program via the application logs, but the jobManager has up-to-date logs.

Whenever there is a read/write issue, such as when verifying the data written and read, it is better to seal the stream to allow the system to finish all its bookkeeping. This results in the events showing up in the persisted stream.
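For reference, sealing can be done through the Pravega StreamManager; this is a minimal sketch where the controller URI, scope and stream names are placeholders:

// Sketch only: seal the stream so that no further writes are accepted and readers can drain it.
StreamManager streamManager = StreamManager.create(URI.create("tcp://localhost:9090"));
boolean sealed = streamManager.sealStream("myScope", "myStream");
System.out.println("sealStream returned " + sealed);
streamManager.close();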

The number of events written to a stream is generally not known up front because the stream is unbounded. The number of events in a window is also not available without counting:
public class LogIncrementer implements MapFunction<String, Long> {
    private static final Logger logger = LoggerFactory.getLogger(LogIncrementer.class);
    private static Long counter = 0L;

    @Override
    public Long map(String record) throws Exception {
        logger.info("counter={}, record={}", counter, record);
        return ++counter;
    }
}
Running the stream reader independent of the writer helps rule out any lags or delays between the two from the FlinkApplication scheduling. If sealStream has been invoked, it is safe for the reader to read the stream. Some reads and writes can be done as part of transactions. The checkpointing of state allows consistency in writing and reading the streams. If the reader can read one event from a stream after the writer has written to it, that is sufficient to show that the events are accessible to the applications reading the stream. A match between the writers and readers on the event count is seldom necessary and can be taken for granted after the writers are gone. Any number of readers can be used to read the stream.

Saturday, December 28, 2019

The analytical processing may also perform repeated iterations over the events, which will also be improved if the processing of each and every event is improved.
Flink streaming jobs run for a long time because the stream has no bounds. The program that invokes env.execute() is kicked off in detached mode. The other mode of execution is blocking mode, and it does not apply to the StreamExecutionEnvironment but only to the LocalExecutionEnvironment. The job itself will either show a status of started on success or an error on failure. The logs for the job execution will only be partial because the foreground disappears after making an asynchronous call.
The logs for the background will show all the activities performed after the invocation.
There are also a few ways to gather some counts programmatically. These include:
eventsRead.addSink(new SinkFunction<String>() {
    private int count;
    @Override
    public void invoke(String value) throws Exception {
        count++;
        logger.error("count = {}, valueRead = {}", count, value);
    }
});

The other approach uses iterative streams:
IterativeStream<String> it = eventsRead.iterate();
it.withFeedbackType(String.class);
DataStream<String> mapped = it.map(t -> { logger.info(t); return t; });
it.closeWith(mapped);
When a job is performed in detached mode, the job execution result is not available immediately. That result is only available when the Flink application program is run in blocking mode, which is usually not the case for streaming mode.
There are ways to sleep between reads and writes, but the scheduling of the job occurs only when execute is called. This sometimes makes it harder to debug the program via the application logs, but the jobManager has up-to-date logs.

Whenever there is a read/write issue, such as when verifying the data written and read, it is better to seal the stream to allow the system to finish all its bookkeeping. This results in the events showing up in the persisted stream.

Friday, December 27, 2019

Small improvements matter when the volume of events is high, such as when the stream store is connected to data pipelines or IoT traffic. The analytical processing may also perform repeated iterations over the events, which will also be improved if the processing of each and every event is improved.
Flink streaming jobs run for a long time because the stream has no bounds. The program that invokes env.execute() is kicked off in detached mode. The other mode of execution is blocking mode, and it does not apply to the StreamExecutionEnvironment but only to the LocalExecutionEnvironment. The job itself will either show a status of started on success or an error on failure. The logs for the job execution will only be partial because the foreground disappears after making an asynchronous call.
The logs for the background will show all the activities performed after the invocation.
There are also a few ways to gather some counts programmatically. These include:
eventsRead.addSink(new SinkFunction<String>() {
    private int count;
    @Override
    public void invoke(String value) throws Exception {
        count++;
        logger.error("count = {}, valueRead = {}", count, value);
    }
});

The other approach uses iterative streams, as shown in the previous entry.

Thursday, December 26, 2019


The use of defaultCredentials in stream store:
Most open source code that uses some sort of authentication requires a built-in credential so that these packages can be run right out of the box. For example, we have Keycloak as an open source authentication and authorization framework with federation and brokerage capabilities, which can be downloaded and run with a documented username and password and no customization. A stream store like Pravega can also be downloaded and run with little or no changes as a standalone application and with an out-of-the-box username and password that has administrator privileges.
When these packages are included within another product, these hardcoded credentials are allowed to be changed per deployment with the help of configuration that gets relayed to these packages. The use of a different credential per deployment hardens the product containing these packages in mission-critical deployments.
This administrator credential works independent of any integrations provided through the product in which these open source packages are used. Even when the password changes per deployment, it is still good for administrative usage regardless of what credentials the user may have or the role with which the user may be accessing the resources of the package.
The difficulty of guessing the password does not take away what is possible with it in both the standalone and integrated deployments of the package. This gives users an alternative for ruling out any issues concerning the privilege with which their actions are invoked, since the privileges are those corresponding to the administrator credential as opposed to those of the user.
For example, we have
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        pravegaConfig.withCredentials(new DefaultCredentials("well-known-password", "well-known-username"));
and this allows the user to bypass any constraints associated with their credentials. Neither the system nor the user interface has any way of corroborating that the credential supplied is indeed coming from the user to whom it belongs. The purpose of this article is to suggest the use of these credentials only as a last resort for troubleshooting purposes with an explanation of how and why the technique works.
 Finally, the use of built-in credentials cannot work across integrations unless the product as a whole integrates the use of administrative activities with those of the packages used within the product.


Wednesday, December 25, 2019

# basic write and read logic

public class BasicWriterReaderApp {
    private static final Logger logger = LoggerFactory.getLogger(BasicWriterReaderApp.class);

    public static void main(String argv[]) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(argv);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        String scope = Constants.DEFAULT_SCOPE;
        String streamName = Constants.DEFAULT_STREAM_NAME;
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORD, Constants.DEFAULT_USERNAME));
        StreamConfiguration streamConfig = StreamConfiguration.builder()
                .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
                .build();

        logger.info("001- creating stream");
        Stream stream = pravegaConfig.resolve(streamName);

        logger.info("002- adding data");
        List<String> snippets = new ArrayList<>();
        snippets.add("2019-12-23 19:40:23,909 ERROR Line1");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)");
        snippets.add("\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)");
        snippets.add("2019-12-23 19:40:24,557 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.");
        DataStream<String> input = env.fromCollection(snippets);
        input.print();

        logger.info("003- iterate over data");
        IterativeStream<String> iteration = input.iterate();
        iteration.withFeedbackType(String.class);
        List<String> entries = new ArrayList<>();
        DataStream<String> mapped = iteration.map(t -> {entries.add(t); return t;});
        for (String entry: entries) {
             logger.info("entry={}", entry);
        }
        logger.info("Number_of_elements={}", String.valueOf(entries.size()));
        iteration.closeWith(mapped);

        logger.info("004 - creating a writer to write to stream");
        FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder()
            .withPravegaConfig(pravegaConfig)
            .forStream(stream)
                .withEventRouter(new PravegaEventRouter<String >() {
                    @Override
                    public String getRoutingKey(String e) {
                        return e;
                    }
                })
            .withSerializationSchema(PravegaSerialization.serializationFor(String.class));
        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<String> flinkPravegaWriter = builder.build();
        input.addSink(flinkPravegaWriter);
     
        java.lang.Thread.sleep(5000);
        logger.info("005 - creating a reader to read from stream");
        FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
                .build();

        logger.info("006 - reading events from stream");
        DataStream<String> eventsRead = env
                    .addSource(flinkPravegaReader)
                    .name("eventsRead");
        IterativeStream<String> it = eventsRead.iterate();
        List<String> dataList = new ArrayList<>();
        DataStream<String> newEvents = it.map(t -> {dataList.add(t); return t;});
        logger.info("count of events = {}", dataList.size());
        it.closeWith(newEvents);

        logger.info("007- done");
        env.execute("Stream Writer");
    }

}

Tuesday, December 24, 2019




Encapsulating logic for queries on logs:
DevOps engineers have to query logs on a day-to-day basis. Sometimes the queries are ad hoc and at other times they are specific to the domain and can be used across incidents. In both cases, they have very little time to struggle with the tools for querying the logs. Architects and system engineers who organize logs realize this and have favored the baseline use case of storing logs in a filesystem and searching them using shell commands and scripts. Every other log management solution can use these files on disk as input in order to facilitate an enhanced log analysis experience from a user interface. Files and folders for logs come with universal acceptance. Logs remain on the file system only for a limited duration, and then they are rotated, periodically archived, and finally aged and even removed.
Log files are therefore hard to beat. From single-line shell commands to popular scripts on logs, queries can be written, shared and curated. Sophisticated systems may allow extract-transform-load operations to import the logs into databases, object stores, stream stores or other time-series products. But queries then become specific to the storage. For example, stream stores may require a library of jars, each encapsulating specific logic bound to specific streams or dedicated to a form of query. With the exception of SQL, there is no universal query language that can span hybrid storage. These queries live within the database, and there is no arguing that transferring large amounts of data to a database is also costly and not maintenance-free.
The logic for querying logs is usually written in two layers: a low-level primitives layer and a higher-level composites layer. Very rarely do we see joins or relations between logs. Instead, pipelining of operators takes precedence over the correlation of data, because the stages of extracting from a source, transforming, putting into a sink and utilizing by-products of the transformation for subsequent storage and analysis follow a data flow model.
Indeed, a data-driven approach to log analysis is not usually a concern, as most users are willing to search all the logs if it weren’t so time consuming. What they really want is the ease of writing and refining queries, because the curated library does not always eradicate the need for ad hoc queries. In such cases, the library of existing code/scripts is merely a starting point for convenience, which can then be edited for the current task.
Naming convention is also important. Most dev-ops engineers want to label their queries based on the defects or tickets that they are tracking for the case at hand. However, the name of the query must reflect its purpose to be useful, and this remains the case whether the query is a SQL script, a shell script or a Java jar file. Sometimes, the library storing the queries may provide ways to annotate metadata on the existing collection. It may also allow recording the source and sink information for which the queries are best suited. However, adding this information as multi-part names is probably the easiest organization of all.
There is also no limit to the authors of the queries or their teams. If anything, an inventory may be persisted separately from the queries themselves as a way to organize the library of queries.
These summarize some of the concerns regarding the organization of queries for logs.


Monday, December 23, 2019

Sample program to extract Exception from logs as stream


public class LogExceptionExtractor {
    private static final Logger logger = LoggerFactory.getLogger(LogExceptionExtractor.class);

    public static void main(String argv[]) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(argv);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        DataSet<String> text;
        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv));
        StreamConfiguration streamConfig = StreamConfiguration.builder()
            .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS))
            .build();

logger.info("001- creating stream");
     DataStream<LogException> exceptionDataStream = env.readTextFile("exceptions.log")
                .flatMap(new ExceptionTimestampExtractor())
                .keyBy(0)
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
                .allowedLateness(Time.seconds(4))
                .process(new ProcessWindowFunction<Tuple2<String, String>, LogException, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<String, String>> iterable, Collector<LogException> collector) throws Exception {
                        // Accumulate the items of this session window into a single LogException.
                        LogException logException = new LogException();
                        for (Tuple2<String, String> item : iterable) {
                            logException.setTimestamp(item.f0);
                            logException.getData().add(item.f1);
                        }
                        collector.collect(logException);
                    }
                });

        FlinkPravegaWriter.Builder<LogException> builder = FlinkPravegaWriter.<LogException>builder()
            .withPravegaConfig(pravegaConfig)
            .forStream(pravegaConfig.resolve(Constants.DEFAULT_STREAM_NAME)) // assumed target stream; the writer needs a Pravega Stream, not the DataStream
            .withEventRouter(new LogExceptionRouter())
            .withSerializationSchema(PravegaSerialization.serializationFor(LogException.class));


        builder.enableWatermark(true);
        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE);
        FlinkPravegaWriter<LogException> flinkPravegaWriter = builder.build();
        exceptionDataStream.addSink(flinkPravegaWriter);
        env.execute("Stream Writer");
    }

}
The above method works partly due to the parallelization based on the key.

Sunday, December 22, 2019


In this article, we continue with the discussion on stream processing with Flink and look into the use of windows. An infinite stream can only be processed if it is split into finite sizes. A window refers to this split and can have a size dependent on one factor or another. A programmer generally refers to the use of windows with the help of syntax to define the split. In the case of keyed streams, she uses the “keyBy” operator followed by “window”, and in the case of non-keyed streams, she uses “windowAll”.
The keyed stream helps with parallelizing the stream tasks.
There are four types of windows:
1)      Tumbling window: A tumbling window assigns each element to a window of a specified window size as with the code shown
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
2)      Sliding window: this window allows overlapping with other windows even though the size of the window is fixed as per the previous definition. An additional window slide parameter dictates how often the next window comes up.
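A sliding window is declared much like the tumbling example above; in this minimal snippet the 10-second size and 5-second slide are illustrative choices:
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))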
3)      Session window: this window groups elements by sessions of activity. The window does not have a fixed size and the windows do not overlap. The number of elements within a window can vary, and the gap between windows is called the session gap.
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
    // determine and return session gap
}))
4)      Global window: This is a singleton spanning all events with the same key into the same window. Since it does not have a beginning or an end, no processing can be done unless there is a trigger specified. A trigger determines when a window is ready.
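As a minimal illustration, a global window paired with a count trigger could look like the following, where the count of 100 is an arbitrary choice:
.keyBy(<key selector>)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(100))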
The computation on a window is performed with the help of a window function, which can be one of ReduceFunction, AggregateFunction, FoldFunction, or ProcessWindowFunction. Flink incrementally aggregates the elements of each window as they arrive for the first two window functions. The ProcessWindowFunction requires buffering, but it provides an Iterable over the elements along with metadata about the window. A fold function determines how each input value affects the output value and is called once for each element as it is added to the window.
The context object holds state, which is of two types:
1)      globalState()
2)      windowState()
Both allow access to the keyed state but the former is not scoped to a window while the latter is scoped to a window.
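A hedged sketch of a ProcessWindowFunction that touches both kinds of state follows; the counting logic, class name, and types are assumptions made for illustration:

// Sketch: counts elements per window (windowState) and across all windows for the key (globalState).
public class CountingWindowFunction extends ProcessWindowFunction<String, String, String, TimeWindow> {

    private final ValueStateDescriptor<Long> windowCountDesc =
            new ValueStateDescriptor<>("windowCount", Long.class);
    private final ValueStateDescriptor<Long> globalCountDesc =
            new ValueStateDescriptor<>("globalCount", Long.class);

    @Override
    public void process(String key, Context context, Iterable<String> elements,
                        Collector<String> out) throws Exception {
        ValueState<Long> windowCount = context.windowState().getState(windowCountDesc);
        ValueState<Long> globalCount = context.globalState().getState(globalCountDesc);

        long inWindow = windowCount.value() == null ? 0L : windowCount.value();
        long overall = globalCount.value() == null ? 0L : globalCount.value();
        for (String ignored : elements) {
            inWindow++;
            overall++;
        }
        windowCount.update(inWindow);
        globalCount.update(overall);
        out.collect(key + ": window=" + inWindow + ", total=" + overall);
    }
}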

Saturday, December 21, 2019

Flink considers batch processing to be a special case of stream processing. A batch program is one that operates on a batch of events at a time. It differs from the above-mentioned stateful stream processing in that it uses data structures in memory rather than the embedded key store, and therefore it can export its state externally and does not require the state to be kept together with the streams. Since the state can be stored externally, the use of checkpoints mentioned earlier is avoided, since their purpose was also similar to that of state. The iterations are slightly different from those in stream processing, because there is now a superstep level that might involve coordination.
When we mentioned parallelism, we brought up the keyBy operator. Let us look at the details of this method. It partitions the stream on the keyBy attribute, and the windows are computed per key. A window operates on a finite set of elements; a tumbling window has fixed boundaries.
For example,
stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

An empty window does not trigger a computation.

The keyBy is called for keyed streams and the windowAll is called for non-keyed streams.
The notion behind keyed streams is that all of the computations of a windowed stream can be executed in parallel. Each logical keyed stream is independent. Elements with the same key go to the same task.

Friday, December 20, 2019


The repackaging of source code and its unintended consequences:
This article is written with the findings from an investigation that I participated in and is still ongoing. It does not explain everything. The investigation has not concluded yet because the root cause has not been found yet. I have not heard any other similar case so far and I have had difficulty in outlining the defect but the learnings from the efforts stand clear.
A specific java code was packaged as a jar because it had a very narrow purpose and needed to be reused with applications from different sources.  This jar happened to be a fat jar which is a term to say that it contained all the dependencies needed to run its code. The jar worked well whenever the applications were run. However, it was inconvenient to copy it to each repository manually. The time-tested way of bringing in jars like these has been to use one or the other distribution repository.
The code was therefore forked which is a term used to denote another location where a copy of the code would be modified. The changes could include a rewrite and some trimmings so that the newer code was lean and mean.  The forked code would then be uploaded to a repository and consumers could then refer to it by name in the dependency and it would all work well with the elimination of the unnecessary step to bring it in any other way.
It turned out that this effort did not really work the way it was intended. The reason for the failure is still under investigation. The code was supposed to be the same, and it was reviewed to ensure that no defects were introduced. It had test cases that seemed to work. Yet the code was not working for users. If all the factors that played out with the previous code had been determined, the current effort would have panned out. This was the key learning, and it is often referred to in the industry as regression testing. These tests are written with the goal of keeping the behavior of the current code the same as that of the previous code, irrespective of the changes made to the current. Somewhere, a regression was introduced and it was manifesting as a failure.
The nature of software development is that it is always focused on functionality. However, it is the assets that guarantee the non-functional aspects of the code that determine how easy it is to use the code. The functional and non-functional aspects are both needed from any implementation, but while the former receives the bulk of the emphasis from the maker, it is the latter that determines the appreciation.

Thursday, December 19, 2019

The Flink data flow model consists of the streams, transformation and sinks. An example of stream generation is the addSource method on the StreamExecutionEnvironment. An example for transformation is the map operator on the DataStream. An example of using a sink is the addSink method on the DataStream.
Each of these stages can partition the data to take advantage of parallelization. The addSource method may take a partition to begin the data flow on a smaller subset of the data. The transformation operator may further partition the stream. Some operators can even multiplex the partitions with the keyBy operator, and the sink may group the events together.
There are two types of forwarding patterns between operators that help with parallelization: one is one-to-one and the other is one-to-many. The latter is the one that helps with multiplexing and is supported by streams that redistribute, which are hence called redistributing streams. These streams use the keyBy and map methods on the stream. The first method is a way for the incoming stream to be repartitioned into many outgoing streams, hence the term redistribution. When a stream is split into outgoing keyBy partitions, the events in the stream cannot be guaranteed to be in the same order as they arrived unless we are looking at events between the sender and one receiver. The events across receivers can be in any order depending on how the events were partitioned and how fast they were consumed by downstream operators. The second method is about transformation of a stream where the events get converted to other events that are easier to handle.
Within the interaction between a sender and receiver on a single stream, the events may be infinite. That is why a window is used to bound the events that will be considered in one set. This window can be of different types where the events overlap as in a sliding window, or where they don’t as in a tumbling window. Windows can also span periods of inactivity.
Operations that remember information across events are said to be stateful. The state can only be maintained when events have keys associated with them. Let’s recall the keyBy operator that was used for repartitioning. This operator adds keys to events. The state is always kept together with the stream as if in an embedded key-value store. That is why the streams have to be keyed.
Flink considers batch processing to be a special case of stream processing. A batch program is one that operates on a batch of events at a time. It differs from the above-mentioned stateful stream processing in that it uses data structures in memory rather than the embedded key store, and therefore it can export its state externally and does not require the state to be kept together with the streams. Since the state can be stored externally, the use of checkpoints mentioned earlier is avoided, since their purpose was also similar to that of state. The iterations are slightly different from those in stream processing, because there is now a superstep level that might involve coordination.

Wednesday, December 18, 2019

Flink provides two sets of APIs to work with Data. One is for unbounded data such as streams and provided by DataStream APIs. Another is for the bounded data and provided by DataSet API.
Since Java programmers like collections, the latter provides methods to work with collections as source and sink. 
For example, we have methods on the ExecutionEnvironment such as
DataSet<T> result = env.fromCollection(data); which returns a DataSet.
And we have methods to specify a collection data sink as follows:
List<T> dataList = new ArrayList<T>();
result.output(new LocalCollectionOutputFormat<>(dataList));
This collection data sink works well as a debugging tool. 
Both the dataType and iterators must be serializable for the read from source and write to sink.
The DataStream API only has support for source from collections. If we need to write a stream to a sink we can access the console or file-system because that will most likely be for debugging. There is no direct conversion from stream to collection.
DataStream<T>  stream = env.fromCollection(data);
Iterator<T> it = DataStreamUtils.collect(stream); // expensive
There is however another method called addSink which can allow custom sinks.
For most purposes of a collection, a DataStream provides an iterate() method that returns an Iterative Stream. 
For example, 
IterativeStream<T> iteration = stream.iterate();
DataStream<T> newStream = iteration.map(new MyMapFunction()); where the map is invoked inside the loop.
Finally, iteration.closeWith(…) to define the tail of the iteration.

We reviewed Flink’s DataStream and DataSet APIs for their distinction between bounded and unbounded data.
Let us now take a look at the parallelization possibilities with Flink.
The Flink data flow model consists of the streams, transformation and sinks. An example of stream generation is the addSource method on the StreamExecutionEnvironment. An example for transformation is the map operator on the DataStream. An example of using a sink is the addSink method on the DataStream.
Each of these stages can partition the data to take advantage of parallelization. The addSource method may take a partition to begin the data flow on a smaller subset of the data. The transformation operator may further partition the stream. Some operators can even multiplex the partitions with the keyBy operator, and the sink may group the events together.

Tuesday, December 17, 2019

Flink provides two sets of APIs to work with Data. One is for unbounded data such as streams and provided by DataStream APIs. Another is for the bounded data and provided by DataSet API.
Since Java programmers like collections, the latter provides methods to work with collections as source and sink.
For example, we have methods on the ExecutionEnvironment such as
DataSet<T> result = env.fromCollection(data); which returns a DataSet.
And we have methods to specify a collection data sink as follows:
List<T> dataList = new ArrayList<T>();
result.output(new LocalCollectionOutputFormat<>(dataList));
This collection data sink works well as a debugging tool.
Both the dataType and iterators must be serializable for the read from source and write to sink.
The DataStream API only has support for source from collections. If we need to write a stream to a sink we can access the console or file-system because that will most likely be for debugging. There is no direct conversion from stream to collection.
DataStream<T>  stream = env.fromCollection(data);
Iterator<T> it = DataStreamUtils.collect(stream); // expensive
There is however another method called addSink which can allow custom sinks.
For most purposes of a collection, a DataStream provides an iterate() method that returns an Iterative Stream.
For example,
IterativeStream<T> iteration = stream.iterate();
DataStream<T> newStream = iteration.map(new MyMapFunction()); where the map is invoked inside the loop.
Finally, iteration.closeWith(…) to define the tail of the iteration.

Monday, December 16, 2019

Pravega serves as a stream store. Its control path is available at port 9090 in standalone mode with a REST API. The data path is over the Flink connector to the segment store at port 6000.
The netty wire commands to segment store are best suited for FlinkPravegaReader and FlinkPravegaWriter as demonstrated in https://1drv.ms/w/s!Ashlm-Nw-wnWvEMNlrUUJvmgd5UY?e=bnWCdU
The REST API data path can make it simpler to send data to Pravega over HTTP. It just needs to translate POST request data to a netty WireCommand, or it could bridge HTTP to netty at an even higher level as shown in https://github.com/ravibeta/JavaSamples/tree/master/tcp-connector-pravega-workshop. Generally lower levels are preferred internally for performance and efficiency.
At the minimum, there need to be HTTP GET and POST methods corresponding to the read and write operations on the data path involving a stream. The create, update and delete of the stream fall in the control path and are already provided as REST APIs by the metadata controller.
The Post method implementation for example may look like this:
@Override
public CompletableFuture<Void> createEvent(String scopeName, String streamName, String message) {
    final ClientFactoryImpl clientFactory = new ClientFactoryImpl(scopeName, this);
    final Serializer<String> serializer = new JavaSerializer<>();
    final Random random = new Random();
    final Supplier<String> keyGenerator = () -> String.valueOf(random.nextInt());
    EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName, serializer, EventWriterConfig.builder().build());
    return writer.writeEvent(keyGenerator.get(), message);
}
The Get method may similarly utilize a suitable stream event reader.
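A minimal sketch of how such handlers might be exposed over JAX-RS follows; the EventsResource class, the EventsApi delegate and the query parameters are assumptions for illustration, not the actual controller code:

// Sketch only: a thin HTTP layer delegating to createEvent/getEvent style methods as discussed above.
@Path("/v1/events")
public class EventsResource {
    private final EventsApi eventsApi; // assumed delegate wrapping the writer/reader logic

    public EventsResource(EventsApi eventsApi) {
        this.eventsApi = eventsApi;
    }

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Response createEvent(@QueryParam("scope") String scope,
                                @QueryParam("stream") String stream,
                                String message) {
        eventsApi.createEvent(scope, stream, message).join();
        return Response.status(Response.Status.CREATED).build();
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public Response getEvent(@QueryParam("scope") String scope,
                             @QueryParam("stream") String stream) {
        String payload = eventsApi.getEvent(scope, stream).join();
        return Response.ok(payload).build();
    }
}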
We will talk about lower-level bridging of HTTP requests with the Pravega wire protocol next, assuming that the Get and Post method implementations may be wired up to the service and exposed via the “/v1/events” @Path annotation. Returning to the wire protocol: Pravega has its own, and the POST request data may directly translate to the self-contained messages in the wire protocol, or it could choose to make use of request headers for the wire protocol fields and use the entire request data as the payload. There would be two headers corresponding to the message type and length in the wire protocol.
Therefore, we not only see a need for using request headers and parameters to suitably capture all the fields of the metadata and the data enclosed within the message, but also see that the message itself varies a lot across requests and responses.
Let us take a look at some of these messages from the wire protocol:
The request type messages include the following:
1) Read Segment request type which is equivalent to our typical Get method usage. This has the following fields:
Segment
Offset
SuggestedLength
DelegationToken
RequestId
2) Setup Append request type to establish a connection to the host which includes the following fields:
RequestID
WriterID
Segment
DelegationToken
3) AppendBlock request type which includes the following fields:
WriterID
Data
RequestID
4) Partial Event request type, applied at the end of an Append block when an event did not fully fit within the block
5) the Event request for the full data that completely fits within a block
6) the Segment attribute request that takes a
RequestID
SegmentName
AttributeID
DelegationToken
7) the ReadTable request where there is a tableKey involved in the list below:
RequestID
Segment
DelegationToken
All these request types are also paired with their corresponding response types.
Looking at the approach above, where the messages all exist and are currently being processed by the wire protocol, the HTTP layer may simply encapsulate the message in the request body with a PUT method while retaining only the message type and message length as headers. It can then send the request body directly to the lower layer as a WireCommand, which can parse itself from the bytes. The other approach could involve listing all the distinct fields from all the messages and choosing an appropriate header name for each. This allows the request to include the data as the entire payload, but it will require its own processor for fulfilling the requests with responses. It would therefore appear that having a segment position-based read and an append-only write of typed events would be sufficient to allow data transmission over HTTP and a significant expansion of audience for the store.
Implementation is at http://github.com/ravibeta/pravega

Sunday, December 15, 2019

Pravega serves as a stream store. Its control path is available at port 9090 in standalone mode with a REST API. The data path is over the Flink connector to the segment store at port 6000.
The netty wire commands to segment store are best suited for FlinkPravegaReader and FlinkPravegaWriter as demonstrated in https://1drv.ms/w/s!Ashlm-Nw-wnWvEMNlrUUJvmgd5UY?e=bnWCdU
The REST API data path can make it simpler to send data to Pravega over HTTP. It just needs to translate POST request data to a netty WireCommand, or it could bridge HTTP to netty at an even higher level as shown in https://github.com/ravibeta/JavaSamples/tree/master/tcp-connector-pravega-workshop. Generally lower levels are preferred internally for performance and efficiency.
At the minimum, there need to be HTTP GET and POST methods corresponding to the read and write operations on the data path involving a stream. The create, update and delete of the stream fall in the control path and are already provided as REST APIs by the metadata controller.
The Post method implementation for example may look like this:
@Override
public CompletableFuture<Void> createEvent(String scopeName, String streamName, String message) {
    final ClientFactoryImpl clientFactory = new ClientFactoryImpl(scopeName, this);
    final Serializer<String> serializer = new JavaSerializer<>();
    final Random random = new Random();
    final Supplier<String> keyGenerator = () -> String.valueOf(random.nextInt());
    EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName, serializer, EventWriterConfig.builder().build());
    return writer.writeEvent(keyGenerator.get(), message);
}
The Get method may similarly utilize a suitable stream event reader.
We will talk about lower-level bridging of HTTP requests with the Pravega wire protocol next, assuming that the Get and Post method implementations may be wired up to the service and exposed via the “/v1/events” @Path annotation. Returning to the wire protocol: Pravega has its own, and the POST request data may directly translate to the self-contained messages in the wire protocol, or it could choose to make use of request headers for the wire protocol fields and use the entire request data as the payload. There would be two headers corresponding to the message type and length in the wire protocol.
Therefore, we not only see a need for using request headers and parameters to suitably capture all the fields of the metadata and the data enclosed within the message, but also see that the message itself varies a lot across requests and responses.
Let us take a look at some of these messages from the wire protocol:
The request type messages include the following:
1) Read Segment request type which is equivalent to our typical Get method usage. This has the following fields:
Segment
Offset
SuggestedLength
DelegationToken
RequestId
2) Setup Append request type to establish a connection to the host which includes the following fields:
RequestID
WriterID
Segment
DelegationToken
3) AppendBlock request type which includes the following fields:
WriterID
Data
RequestID
4) Partial Event request type, applied at the end of an Append block when an event did not fully fit within the block
5) the Event request for the full data that completely fits within a block
6) the Segment attribute request that takes a
RequestID
SegmentName
AttributeID
DelegationToken
7) the ReadTable request where there is a tableKey involved in the list below:
RequestID
Segment
DelegationToken
All these request types are also paired with their corresponding response types.
Looking at the approach above, where the messages all exist and are currently being processed by the wire protocol, the HTTP layer may simply encapsulate the message in the request body with a PUT method while retaining only the message type and message length as headers. It can then send the request body directly to the lower layer as a WireCommand, which can parse itself from the bytes. The other approach could involve listing all the distinct fields from all the messages and choosing an appropriate header name for each. This allows the request to include the data as the entire payload, but it will require its own processor for fulfilling the requests with responses. It would therefore appear that having a segment position-based read and an append-only write of typed events would be sufficient to allow data transmission over HTTP and a significant expansion of audience for the store.

Saturday, December 14, 2019

We discussed event time; we now discuss state. State is useful for resuming a task. Tasks can be done at various scopes and levels. There is an operator state bound to one operator instance. If there are multiple parallel instances, each instance maintains this state.
Keyed state is an operator state that is bound to a tuple of <operator, key>, where the key corresponds to a partition. There is a state partition per key.
Keyed state comprises key groups, which are distribution units, so that the state can be transferred during changes in parallelism. Each parallel instance works with the keys of one or more key groups.
State regardless of keyed state or operator state exists either as managed or raw.
Managed state is usually stored in internal hash tables or RocksDB. The state is encoded and written into the checkpoints.
Raw state is never shared outside the operator. When it is checkpointed, the raw state is dumped as a byte range. The Flink runtime knows nothing about these raw bytes.
DataStream functions use managed state. The serialization should be standard, otherwise it may break between versions of the Flink runtime.
A program makes use of state with methods such as
ValueState<T> which keeps a value that can be updated or retrieved
ListState<T> which keeps an iterable over state elements
ReducingState<T> which keeps a single value aggregated from the state
AggregatingState<T> which also keeps a single value aggregated from the state but not necessarily of the same type
FoldingState<T> which is similar to the above but makes use of a folding function
MapState<UK, UV> which keeps a list of mappings
Since state is stored externally, it has a handle often referred to as the StateDescriptor, similar to the notion of a file descriptor.
State is accessed only via the RuntimeContext.
For example:
sum = getRuntimeContext().getState(descriptor);
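A fuller sketch of keyed state along the same lines; the running-sum logic, class name and tuple types are illustrative assumptions:

// Sketch: keeps a running sum per key on a keyed stream and emits the sum with each new element.
public class RunningSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("sum", Long.class);
        sum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = sum.value();
        long updated = (current == null ? 0L : current) + value.f1;
        sum.update(updated);
        out.collect(Tuple2.of(value.f0, updated));
    }
}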

Also, every state has a time to live also called TTL.  State has to be ‘cleaned up’.

Friday, December 13, 2019

Events Records
Kubernetes is a container orchestration framework that hosts applications. The events from the applications are saved globally in the cluster by the kube-apiserver. Hosts are nodes of a cluster and can be scaled out to as many as required for the application workload. Events are different from logs. A forwarding agent forwards the logs from the container to a sink. A sink is a destination where all the logs forwarded can be redirected to a log service that can better handle the storage and analysis of the logs. The sink is usually external to the cluster so it can scale independently and may be configured to receive via a well-known protocol called syslog. The sink controller forwards cluster level data such as cpu, memory usages from nodes and may include cluster logs from api-server on the master node.  A continuously running process forwards the logs from the container to the node before the logs drain to the sink via the forwarding agent. This process is specified with a syntax referred to as a Daemonset which enables the same process to be started on a set of nodes which is usually all the nodes in the clusters. There is no audit event sink. Events are received by webhooks.
Kubernetes provides a security relevant chronological set of records documenting the sequence of activities that have affected the system by individual users and actions taken on their behalf by the system in these events.
Web application developers often record and restrict their logs of the flow of control messages to their database or choice storage. That is where they have kept their business objects and their master data. It seems convenient for them to persist the exceptions as well as the log messages in their choice store too, although the norm is to keep the log separate from data. They save themselves the hassle of having to look somewhere else and with a different methodology. But is this really required? Data in a database is the bread and butter for the business. Every change to the data, be it addition, modification or deletion, represents something that the business understands and is usually billable. The state of the data reflects the standing of the business. People's names and credit cards are maintained in the database. And if this makes it clear that something as dynamic as the activities of the system maintained in a log during the interactions with a user is not the same as the data, then it can be argued that they need not end up in the same place. But it's not just a database and a log file that we are comparing. A database comes with ACID guarantees and locking that a log file cannot provide. In fact, we do see various other usages of the logs. Logs could flow to a file, an index, an emailing and alert system, and a database individually or all at once. Let us just take an example of comparing the option to save in a SQL database with an alternative to index the logs as events where key values are extracted and maintained for searches later on. Events are time-series inputs to a key-value index. This association between the activities and a timestamp is helpful in determining and reconstructing a chain of events that may be meaningful in such things as diagnosis, forensics and even analysis for charts and post-mortem actions. The store for such events could be a NoSQL database, which has a methodology that would be different from the traditional SQL database. While we don't want to play into the differences between SQL and NoSQL for such a thing as logs, because they can land up in both places either directly or after processing, we do want to realize that the workflows associated with logs are indeed different and more versatile than the CRUD activity of data. Logs, for instance, have enabled moving or copying data from one place to another without having to interrupt or lock the source. For example, we can ship the logs of one system to another where they can play back the entries to reconstruct the data to the same state as the original. Since the records are time series, we only need to ship what has transpired between last and current. Alternatively, for every change to the original, we could log to the remote copy also. Both methods can be combined too and these are not exclusive. The scope of the changes to be affected in mirroring can be set on a container-by-container basis, such as a database. In the case of log shipping, the restore operation has to be completed on each copy of the log. It's important to recognize the difference between a log from a web application and the log of database changes. The former is about the logic, which may span both the web tier and the database. The latter is about changes to the data only, without any regard for how it was affected. In other words, the log of a database system is about the changes to the state of the databases.
What we are saying here is that the logs in the latter case are much more important and relevant and different from talking about the web logs or web access logs. That said, the flow of control and the exceptions they encounter or their successes can over time also provide useful information. What works for the logs of a database also can be put to use for that of the applications should there be a requirement for it. These application level activities and Logic can remain private while their visibility to system allows their observation and recording globally as events.
Kubernetes provides Webhooks as a way to interact with all system generated events. This is the equivalent of http handlers and modules in ASP. Net in terms of functionality to intercept and change requests and Responses. The webhooks however are an opportunity to work on System generated resources such as pod creation requests and so on.
There are two stages where webhooks can be run. They are correspondingly named as mutating or validating webhooks. The first is an opportunity to change the requests on Kubernetes core V1 resources. The second is an opportunity to add validations to requests and Responses.
Since these span a lot of system calls, the webhooks are invoked frequently. Therefore, they must be selective in the requests they modify to exclude the possibility that they touch requests that were not intended.
In addition to selectors, the power of webhooks is best demonstrated when they select all requests of a particular type to modify. For example, this is an opportunity for security to raise the baseline by allowing or denying all resources of a particular kind. The execution of privileged pods may be disabled in the cluster with the help of webhooks.
The webhooks are light to run and serve, similar to nginx HTTP parameter modifiers. A number of them may be allowed to run.
#codingexercise
    private static class EventFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String line) throws Exception {
               return !line.contains("NonEvent"); 
        }
    }
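The filter above could then be applied to a stream of lines; the lines data stream here is assumed from the earlier examples:

DataStream<String> events = lines.filter(new EventFilter());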

Thursday, December 12, 2019


#problemstatement:
Find streams that were read from, using audit.log, within a particular time window, say ‘2019-12-05T15:25:19.977991Z’ < t < ‘2019-12-11T15:25:19.977991Z’, to know which stream was most vulnerable.
#FlinkApplication solution:
1)      Compile a FlinkApplication as follows:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// The audit.log file may be mounted as a K8s generic secret.
DataSet<String> lines = env.readTextFile("/path/to/audit.log");
// parse() is assumed to turn a line into (stageTimestamp, streamName, count, size).
DataSet<Tuple4<String, String, Long, Double>> events = lines
        .map(line -> parse(line))
        .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG, Types.DOUBLE));
DataSet<Tuple4<String, String, Long, Double>> stats = events
        // keep only the audit records inside the time window of interest
        .filter(x -> x.f0.compareTo("2019-12-05T15:25:19.977991Z") > 0
                  && x.f0.compareTo("2019-12-11T15:25:19.977991Z") <= 0)
        // group by stream name
        .groupBy(1)
        .reduceGroup(new GroupReduceFunction<Tuple4<String, String, Long, Double>,
                                             Tuple4<String, String, Long, Double>>() {
            @Override
            public void reduce(Iterable<Tuple4<String, String, Long, Double>> values,
                               Collector<Tuple4<String, String, Long, Double>> out) throws Exception {
                String stageTimestamp = null;
                String streamName = null;
                long count = 0L;
                double size = 0;
                for (Tuple4<String, String, Long, Double> value : values) {
                    stageTimestamp = value.f0;
                    streamName = value.f1;
                    count++;
                    size += value.f3 != null ? value.f3 : 0;
                }
                if (count > 50) {
                    out.collect(new Tuple4<>(stageTimestamp, streamName, count, size));
                }
            }
        })
        .partitionCustom(new Partitioner<Double>() {
            @Override
            public int partition(Double key, int numPartitions) {
                return key.intValue() - 1;
            }
        }, 3); // partition on the size field

stats.print();

2) Deploy the jar.

Wednesday, December 11, 2019

Watermarks are the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it emits a watermark for the downstream operators to process. In distributed settings where an operator receives inputs from more than one stream, the watermark on the outgoing stream is determined by the minimum of the watermarks from the incoming streams. As the input streams update their event times, so does the operator.

When a timestamp and watermark assigner is used, the source's own timestamps and watermarks are ignored: the assigner overrides and overwrites them. It is natural for the assigner to produce both timestamps and watermarks since they go hand in hand. Timestamps and watermarks can also be emitted directly from the source, which uses the collectWithTimestamp method on the SourceContext; similarly, emitWatermark is used to generate watermarks.
The assignment of timestamps and watermarks usually happens immediately after the data source, but it is not strictly required; we can parse and filter before the timestamp assigner.
The parse function is done with the help of MapFunction. This follows a syntax of
DataSet<Y> result = input.map(new MyMapFunction());
The filter function is done with the help of
DataSet<X> result = input.filter(new MyFilterFunction());
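For illustration, minimal versions of these hypothetical MyMapFunction and MyFilterFunction classes might look as follows (the log-line format they assume is an assumption, not from the original post):
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// hypothetical parse step: split a log line into a (timestamp, message) pair
public class MyMapFunction implements MapFunction<String, Tuple2<String, String>> {
    @Override
    public Tuple2<String, String> map(String line) throws Exception {
        int firstSpace = line.indexOf(' ');
        if (firstSpace < 0) {
            return Tuple2.of("", line); // no timestamp present on this line
        }
        return Tuple2.of(line.substring(0, firstSpace), line.substring(firstSpace + 1));
    }
}

// hypothetical filter step: drop blank lines before any timestamps are assigned
public class MyFilterFunction implements FilterFunction<String> {
    @Override
    public boolean filter(String line) throws Exception {
        return line != null && !line.trim().isEmpty();
    }
}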
Timestamp assigners take a stream and produce a new version of it, overwriting any timestamps and watermarks the original stream carried. As long as the assignment happens prior to the first event-time operation, the processing of the events behaves as if the assignment had happened at the source. In some special cases, the assignment may even be done at the source itself.
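A minimal sketch of such an assignment placed right after the source, using the Flink 1.x BoundedOutOfOrdernessTimestampExtractor and a toy in-memory source (the event is assumed to carry its event time as a Long field):
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimestampAssignmentSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // toy source: (user, eventTimeMillis) pairs
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("alice", 1575500000000L),
                Tuple2.of("bob",   1575500005000L));

        // assign event-time timestamps and watermarks, tolerating 10 seconds of out-of-orderness
        DataStream<Tuple2<String, Long>> withTimestamps = events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1; // the event-time field carried with each record
                    }
                });

        withTimestamps.print();
        env.execute("timestamp-assignment-sketch");
    }
}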
The order of the events within a window follows the order of the timestamps and watermarks. However, there can be elements that violate the watermarks. A violation happens when a watermark has been emitted but an event arrives with a timestamp that precedes it. The difference in time between the newly arrived event and the watermark is called the lateness of the event. This can occur for many reasons, such as when the watermark is emitted after a large batch of events arrives together so that reprocessing those events can be skipped. Even when the window is large, watermarks may be placed to indicate progress based on the number of events processed. In real-world conditions, lateness of events is noticeable and often accommodated for in processing. Lateness is also one of the reasons for setting the preference of a time-based processing type in the stream execution environment. If we key the events by a timestamp, the notion of that timestamp has to remain the same across processors and consistent with the way the event processing works. Message queues do not suffer from this problem because they do not necessarily have to enforce order; sometimes poisoning the message or putting it in a dead-letter queue is sufficient.
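Continuing the sketch above, lateness can be accommodated explicitly: the window keeps accepting events for an extra allowance, and anything later still is diverted to a side output instead of being dropped (the tag name and the five-minute allowance are assumptions):
// assumes the timestamped withTimestamps stream from the previous sketch
OutputTag<Tuple2<String, Integer>> lateTag = new OutputTag<Tuple2<String, Integer>>("late-events") {};

SingleOutputStreamOperator<Tuple2<String, Integer>> counts = withTimestamps
        .map(e -> Tuple2.of(e.f0, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(0)
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.minutes(5))   // accept events up to five minutes late
        .sideOutputLateData(lateTag)        // route anything later to the side output
        .sum(1);

DataStream<Tuple2<String, Integer>> lateEvents = counts.getSideOutput(lateTag);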


Tuesday, December 10, 2019

We were discussing that Flink Applications support historical data processing with the help of timestamps on entries. There can be three different types of timestamps: processing time, event time and ingestion time.
Out of these, only event time guarantees completely consistent and deterministic results. All three processing types can be set on the StreamExecutionEnvironment prior to the execution of queries.
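For instance, a minimal sketch of choosing the time characteristic up front (the same call is shown again later in this post):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);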

Event time also supports watermarks.

Watermarks are the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it emits a watermark for the downstream operators to process. In distributed settings where an operator receives inputs from more than one stream, the watermark on the outgoing stream is determined by the minimum of the watermarks from the incoming streams. As the input streams update their event times, so does the operator.



stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);


Watermarks are the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it emits a watermark for the downstream operators to process. In distributed settings where an operator receives inputs from more than one stream, the watermark on the outgoing stream is determined by the minimum of the watermarks from the incoming streams. As the input streams update their event times, so does the operator.
When a timestamp and watermark assigner is used, the source's own timestamps and watermarks are ignored: the assigner overrides and overwrites them. It is natural for the assigner to produce both timestamps and watermarks since they go hand in hand. Timestamps and watermarks can also be emitted directly from the source, which uses the collectWithTimestamp method on the SourceContext; similarly, emitWatermark is used to generate watermarks.
Watermarks have an interesting side effect in Flink: if an event is being processed as 'failing', it is not yet in the failed state. Failed is a terminal state and consequently is not restarted in processing, whereas a failing event can be resubmitted, and this can cause the event to re-enter the failing state endlessly.

Monday, December 9, 2019

We were discussing that Flink Applications support historical data processing with the help of timestamps on entries. There can be three different types of timestamps: processing time, event time and ingestion time.
Out of these, only event time guarantees completely consistent and deterministic results. All three processing types can be set on the StreamExecutionEnvironment prior to the execution of queries.
Event time also supports watermarks.
Watermarks are the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it emits a watermark for the downstream operators to process. In distributed settings where an operator receives inputs from more than one stream, the watermark on the outgoing stream is determined by the minimum of the watermarks from the incoming streams. As the input streams update their event times, so does the operator.

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

Sunday, December 8, 2019

Flink Applications bring the best of historical data processing to time-series data since they follow a streaming model. This applies not just to logs but even to metrics. There can even be better monitoring and alerting mechanisms applied to streams given the DataStream APIs of Flink. The Table API and SQL abstraction of Flink provide even more convenience to users, and these can support the use of existing reporting mechanisms. SQL has been the language of queries and tables have been the means of storage for a while. The existing solutions, stacks and technology built on top of these queries hold a lot of business value. As long as they do not suffer regressions and can leverage the additional benefits of stream processing that were not possible earlier, they will gain even more acceptance.
Let us now take a few examples of historical data processing with Flink APIs:
Please note that the historical data uses the notion of event time, which is the time at which the records were produced. The stream processing might make use of a different timestamp, referred to as the processing time. This is the time of the local clock on the host where the stream is processed. An hourly processing-time window will include all records from that hour as determined by the system clock. Processing time does not require coordination between streams and machines. Therefore it has the best performance and lowest latency, but it is prone to delays and outages. This is mitigated by the use of event timestamps, which are global in nature and work even in distributed systems with completely consistent and deterministic results.
The time characteristic can be set before the evaluation of queries:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Once we instantiate the stream from a data source, we can then execute a query:
stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
Watermarks are the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it emits a watermark for the downstream operators to process. In distributed settings where an operator receives inputs from more than one stream, the watermark on the outgoing stream is determined by the minimum of the watermarks from the incoming streams. As the input streams update their event times, so does the operator.

Saturday, December 7, 2019

Sample code signing with Microsoft tools:
Step 1. Install Microsoft Windows SDK suitable to the desktop Windows version.
Step 2. Check that the following two files exists:
C:\Program Files (x86)\Windows Kits\10\bin\10.0.18362.0\x64\makecert.exe
C:\Program Files (x86)\Windows Kits\10\bin\10.0.18362.0\x64\pvk2pfx.exe
Step 3. MakeCert /n "CN=My Company Name, O=My Company, C=US" /r /h 0 /eku "1.3.6.1.5.5.7.3.3,1.3.6.1.4.1.311.10.3.13" /e 12/30/2025 /sv \EMCCerts\testcert.out \EMCCerts\testcert.cer
Succeeded
Step 4. Pvk2Pfx /pvk \EMCCerts\testcert.out  /pi <password> /spc \EMCCerts\testcert.cer  /pfx \EMCCerts\testcertNew.pfx /po <password>
Step 5. Certutil -addStore TrustedPeople \EMCCerts\testcert.cer
TrustedPeople "Trusted People"
Signature matches Public Key
Certificate "DellEMC Streaming Data Platform" added to store.
CertUtil: -addstore command completed successfully.
Step 6. signtool.exe sign /fd SHA256 /a /f \EMCCerts\testcertNew.pfx /p <password> \my-file-to-sign.zip
Step 7. makeappx.exe pack /f \unsigned\Appx.map /p \Signed\my-file-to-sign.zip
Microsoft (R) MakeAppx Tool
Copyright (C) 2013 Microsoft.  All rights reserved.

The path (/p) parameter is: "\\?\Signed\my-file-to-sign.appx"
The mapping file (/f) parameter is: "unsigned\Appx.map"
Reading mapping file "unsigned\Appx.map"
Packing 3 file(s) listed in "unsigned\Appx.map" (mapping file) to "\\?\Signed\my-file-to-sign.zip" (output file name).
Memory limit defaulting to 8529401856 bytes.
Using "unsigned\AppxManifest.xml" as the manifest for the package.
Processing "unsigned\my-file-to-sign.zip" as a payload file.  Its path in the package will be "my-file-to-sign.zip".
Processing "unsigned\AppTile.png" as a payload file.  Its path in the package will be "AppTile.png".
Package creation succeeded.

Sample Code signing with gpg tool:
gpg --output doc.sig --sign doc
You need a passphrase to unlock the private key for
User: “Alice (Judge) alice@cyb.org”
1024-bit DSA key, ID BB7576AC, created 1999-06-04
Enter passphrase:

Friday, December 6, 2019

Signing files using private key
Signing is the process by which a digital signature is created from the file contents. The signature proves
that there was no tampering with the contents of the file. The signing itself does not need to encrypt the
file contents to generate the signature. In some cases, a detached signature may be stored as a separate
file. Others may choose to include the digital signature along with the set of files as an archive.
There are other processes as well that can help with checking the integrity of files. These include
hashing of files, generating a checksum and others. Signing differs from those methods in that it uses a
private-public key pair to compute the digital signature. The private key is used to sign a file while the
public key is used to verify the signature. The public key can be published with the signature or it can be
made available in ways that are well-known to the recipients of the signed files.
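As an illustration of this private-key-signs, public-key-verifies idea, here is a plain Java sketch (an assumption for demonstration, not the gpg or Microsoft tooling discussed in this post; the command-line file argument and a freshly generated RSA key pair stand in for a real key store):
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.Signature;

public class DetachedSignatureSketch {
    public static void main(String[] args) throws Exception {
        KeyPair pair = KeyPairGenerator.getInstance("RSA").generateKeyPair();

        byte[] contents = Files.readAllBytes(Paths.get(args[0]));

        // sign the file contents with the private key
        Signature signer = Signature.getInstance("SHA256withRSA");
        signer.initSign(pair.getPrivate());
        signer.update(contents);
        byte[] detachedSignature = signer.sign(); // could be written out as a separate .sig file

        // verify the detached signature with the public key
        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(pair.getPublic());
        verifier.update(contents);
        System.out.println("signature valid: " + verifier.verify(detachedSignature));
    }
}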
The process of signing can use any of a number of encryption methods. The stronger the encryption, the better the signature and the lower the chances that the file could have been tampered with. The process of signing varies across operating systems.
Popular linux family hosts often use the ‘gpg’ tool to sign and verify the files. This tool even generates
the key-pair with which to sign the files. The resulting signature is in the Pretty Good Privacy protocol
format and stored as a file with extension .asc. Publishing the public key along with the detached
signature is a common practice for many distributions of code.
Microsoft uses separate tools for making the key-certificate pair and the generation of the signature.
The signer tool used by this company packs the payload into an altogether new file. The signature is part
of the new file and can function just like an archive. The same algorithm and strength can also be used
with the signer tool as it was with the gpg tool.
The use of certificates is popular because they can be shared freely without any liability; the recipient uses the certificate to verify the signature.
These are some of the means for signing and its use is widespread across use cases such as code
commits, blockchain and digital rights management.

Thursday, December 5, 2019

We were discussing Flink applications and the use of a stream store such as Pravega.

It is not appropriate to encapsulate a Flink connector within the HTTP request handler for data ingestion at the store. This API is far more generic than the upstream software used to send the data because the consumer of this REST API could be the user interface, a language-specific SDK, or shell scripts that want to make curl requests. It is better for the REST API implementation to directly accept the raw message along with the destination and authorization.
The policy for read operations should be such that they can be independent and scalable. A read-only policy suffices for this purpose; the rest can be read-write.
The separation of read-write from read-only also helps treat them differently. For example, it is possible to replace the technology for the read-only path separately from the technology for read-write. Even the technology for read-only can be swapped from one to another for improvements on that side.
An example of analysis on a read-only path is the extraction of exception stack traces from logs:

The utility of Flink applications is unparalleled, as with this stack-trace hasher example to collect the top exceptions encountered in log archives:
private static class ExtractStackTraceHasher implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            StringTokenizer tokenizer = new StringTokenizer(value);
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                if (word.contains("Exception:")) {
                    int start = value.indexOf(word);                 // the word carries a unique timestamp prefix
                    int end = value.indexOf(word.substring(8), start + 8);
                    if (start != -1 && end != -1 && end > start) {
                        String exceptionString = value.substring(start + 11, end); // skip the timestamp
                        // parseThrowable is an assumed helper that rebuilds a Throwable from the log text;
                        // StackHasher from logstash-logback-encoder then hashes its stack frames
                        Throwable error = parseThrowable(exceptionString);
                        var stackHasher = new net.logstash.logback.stacktrace.StackHasher();
                        out.collect(stackHasher.hexHash(error));
                    }
                }
            }
        }
    }

The results from the above iteration can be combined for each of the files processed from the archive location:
public class TopStackTraceMerger implements ReduceFunction<Map<String, Integer>> {
  @Override
  public Map<String, Integer> reduce(Map<String, Integer> set1, Map<String, Integer> set2) {
    // combine the per-file counts: sum the occurrences of each stack hash (java.util.Map / HashMap)
    Map<String, Integer> merged = new HashMap<>(set1);
    set2.forEach((hash, count) -> merged.merge(hash, count, Integer::sum));
    return merged;
  }
}

Wednesday, December 4, 2019

We were discussing Flink applications and the use of a stream store such as Pravega.
It is not appropriate to encapsulate a Flink connector within the HTTP request handler for data ingestion at the store. This API is far more generic than the upstream software used to send the data because the consumer of this REST API could be the user interface, a language-specific SDK, or shell scripts that want to make curl requests. It is better for the REST API implementation to directly accept the raw message along with the destination and authorization.
There are two factors I want to discuss further when comparing the analytical applications:
First, the use of Pravega as a stream store should not mandate the use of a Flink application with the Flink connector. Data can be sent to and read from the Pravega store directly. However, the FlinkApplication is generally used for performing transformations and queries, both of which benefit from running in cluster mode so that they can scale to large data sets.
Second, the data path is critical, so it is not necessary to combine the collection and the analysis together.
Although most analysis works well only when certain collection happens, it is not necessary to make the collection heavy in terms of its processing. The collection is a data path and can almost always be streamlined. The analysis can happen as the collection happens because it is stream processing. However, the analysis does not have to happen within the collection. It can execute separately, and as long as there is a queue of collected events, analysis can begin with stream processing even for varying rates of ingestion and analysis, because the write and read paths are best kept separate.

Tuesday, December 3, 2019

The utility of Flink applications is unparalleled, as with this stack-trace hasher example to collect the top exceptions encountered in log archives:
private static class ExtractStackTraceHasher implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            StringTokenizer tokenizer = new StringTokenizer(value);
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                if (word.contains("Exception:")) {
                    int start = value.indexOf(word);                 // the word carries a unique timestamp prefix
                    int end = value.indexOf(word.substring(8), start + 8);
                    if (start != -1 && end != -1 && end > start) {
                        String exceptionString = value.substring(start + 11, end); // skip the timestamp
                        // parseThrowable is an assumed helper that rebuilds a Throwable from the log text;
                        // StackHasher from logstash-logback-encoder then hashes its stack frames
                        Throwable error = parseThrowable(exceptionString);
                        var stackHasher = new net.logstash.logback.stacktrace.StackHasher();
                        out.collect(stackHasher.hexHash(error));
                    }
                }
            }
        }
    }

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
for (String path : pathsInExtractedLogArchive(archiveLocation)) { // assumed helper listing the files extracted from the archive
    env.readTextFile(path)
            .flatMap(new ExtractStackTraceHasher())
            .map(hash -> Tuple2.of(hash, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .sum(1)
            .print();
}
env.execute("top-stack-traces");