Saturday, December 14, 2019

We discussed event time; we now discuss state. State lets a task resume where it left off, and it can be kept at different scopes and levels. Operator state is bound to one operator instance. If there are multiple parallel instances, each instance maintains its own state.
Keyed state is operator state that is bound to a tuple of <operator, key>, where the key corresponds to a partition. There is one state partition per key.
Keyed state is organized into key groups, which are the units of distribution when state is redistributed after a change in parallelism. Each parallel instance works with the keys of one or more key groups.
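The mapping from keys to key groups to parallel instances can be sketched in plain Java. This is a toy model under stated assumptions, not Flink's implementation: the class and method names are made up for illustration, and the key is hashed with a plain hashCode where Flink applies its own hash function.

```java
/** Toy model of key-group assignment; names and hashing are illustrative assumptions. */
final class KeyGroupSketch {
    // A key always lands in the same key group, regardless of the current parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are handed out to parallel instances in contiguous ranges.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With 128 key groups, key group 32 belongs to instance 1 at parallelism 4 and to instance 0 at parallelism 2. The key group itself never changes, which is why it can be moved as a unit when the job is rescaled.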
State, whether keyed state or operator state, exists either as managed or raw.
Managed state is usually stored in internal hash tables or RocksDB. The runtime encodes the state and writes it into the checkpoints.
Raw state is kept in the operator's own data structures and is never shared outside the operator. When it is checkpointed, the raw state is written as a sequence of bytes. The Flink runtime knows nothing about the structure of these raw bytes.
DataStream functions use managed state. The serialization should be standard; otherwise it may break between versions of the Flink runtime.
A program makes use of state through interfaces such as
ValueState<T>, which keeps a value that can be updated or retrieved
ListState<T>, which keeps a list of elements that can be appended to and iterated over
ReducingState<T>, which keeps a single value that aggregates all the values added to the state
AggregatingState<IN, OUT>, which also keeps a single aggregated value, but not necessarily of the same type as the values added
FoldingState<T, ACC>, which is similar to the above but makes use of a folding function (deprecated in favor of AggregatingState)
MapState<UK, UV>, which keeps a map of key-value pairs
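Since state is stored externally, it is obtained through a handle often referred to as the StateDescriptor, similar to the notion of a file descriptor. The descriptor holds the name of the state and the type of the values it holds.
State is accessed only via the RuntimeContext, so it is available only in rich functions.
For example:
sum = getRuntimeContext().getState(descriptor);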
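What "one state partition per key" means for a value state can be shown with a toy stand-in written in plain Java. ToyValueState and RunningSum are hypothetical names, not Flink classes; in Flink the runtime sets the current key from the record being processed, while here the caller sets it explicitly.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a keyed value state: one handle, one value per key. */
final class ToyValueState<T> {
    private final Map<Object, T> valuePerKey = new HashMap<>();
    private Object currentKey;

    // Assumption: the caller plays the role of the runtime and selects the key.
    void setCurrentKey(Object key) { this.currentKey = key; }

    // Returns null when nothing has been stored for the current key.
    T value() { return valuePerKey.get(currentKey); }

    void update(T newValue) { valuePerKey.put(currentKey, newValue); }
}

/** Keeps a running sum per key using the toy state above. */
final class RunningSum {
    private final ToyValueState<Long> sum = new ToyValueState<>();

    long add(String key, long amount) {
        sum.setCurrentKey(key);
        Long current = sum.value();
        long updated = (current == null ? 0L : current) + amount;
        sum.update(updated);
        return updated;
    }
}
```

The same handle returns a different value depending on the key in scope, which is the behaviour a function sees when it reads and updates keyed state.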

Also, keyed state can be assigned a time to live, also called TTL. Once the TTL has expired, the stored value has to be ‘cleaned up’.
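The idea behind a time to live can be sketched as follows. This is a simplified model and not how Flink implements it: TtlStateSketch is a made-up name, the clock is passed in by the caller, and an expired entry is cleaned up lazily when it is next read.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy state with a time to live; expired entries are dropped on access. */
final class TtlStateSketch<K, V> {
    private static final class Entry<T> {
        final T value;
        final long lastWriteMillis;
        Entry(T value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlStateSketch(long ttlMillis) { this.ttlMillis = ttlMillis; }

    // Every write refreshes the timestamp of the entry.
    void put(K key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    // Returns null if the key is missing or its entry has outlived the TTL.
    V get(K key, long nowMillis) {
        Entry<V> entry = entries.get(key);
        if (entry == null) return null;
        if (nowMillis - entry.lastWriteMillis >= ttlMillis) {
            entries.remove(key); // the 'clean up' step
            return null;
        }
        return entry.value;
    }

    int size() { return entries.size(); }
}
```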

Friday, December 13, 2019

Events Records
Kubernetes is a container orchestration framework that hosts applications. The events from the applications are saved globally in the cluster by the kube-apiserver. Hosts are the nodes of a cluster, and the cluster can be scaled out to as many nodes as the application workload requires. Events are different from logs. A forwarding agent forwards the logs from the containers to a sink. A sink is a destination to which all the forwarded logs are sent, typically a log service that can better handle the storage and analysis of the logs. The sink is usually external to the cluster so that it can scale independently, and it may be configured to receive logs via a well-known protocol such as syslog. The sink controller forwards cluster-level data such as CPU and memory usage from the nodes and may include cluster logs from the API server on the master node. A continuously running process forwards the logs from the container to the node before the logs drain to the sink via the forwarding agent. This process is specified as a DaemonSet, which starts the same process on a set of nodes, usually all the nodes in the cluster. There is no audit event sink. Events are received by webhooks.
In these events, Kubernetes provides a security-relevant, chronological set of records documenting the sequence of activities that have affected the system, both by individual users and by the system acting on their behalf.
Web application developers often record the logs of their flow of control in their database or storage of choice and restrict them to it. That is where they keep their business objects and their master data. It seems convenient for them to persist the exceptions as well as the log messages in that store too, although the norm is to keep the log separate from the data. They save themselves the hassle of having to look somewhere else with a different methodology. But is this really required? Data in a database is the bread and butter of the business. Every change to the data, be it an addition, modification or deletion, represents something that the business understands and that is usually billable. The state of the data reflects the standing of the business. People's names and credit cards are maintained in the database. If this makes it clear that something as dynamic as the activities of the system, recorded in a log during the interactions with a user, is not the same as the data, then it can be argued that the two need not end up in the same place. But it's not just a database and a log file that we are comparing. A database comes with ACID guarantees and locking that a log file cannot provide. In fact, we do see various other usages of logs. Logs could flow to a file, an index, an emailing and alerting system, and a database, individually or all at once. Let us take the example of comparing the option of saving logs in a SQL database with the alternative of indexing the logs as events, where key-value pairs are extracted and maintained for later searches. Events are time-series inputs to a key-value index. This association between the activities and a timestamp helps in determining and reconstructing a chain of events, which is meaningful for diagnosis, forensics, and even analysis for charts and post-mortem actions. The store for such events could be a NoSQL database, which has a methodology different from that of a traditional SQL database.
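Turning a log line into an event with extracted key-value pairs might look like the sketch below. The line format (a leading ISO-8601 timestamp followed by key=value tokens) and the name LogEventSketch are assumptions made for illustration; real log formats vary.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Extracts a timestamp and key=value pairs from one log line. */
final class LogEventSketch {
    private static final Pattern PAIR = Pattern.compile("(\\w+)=(\\S+)");

    // Assumed format: "<timestamp> key1=value1 key2=value2 ..."
    static Map<String, String> toEvent(String line) {
        Map<String, String> event = new LinkedHashMap<>();
        int firstSpace = line.indexOf(' ');
        String timestamp = firstSpace < 0 ? line : line.substring(0, firstSpace);
        event.put("timestamp", timestamp);
        String rest = firstSpace < 0 ? "" : line.substring(firstSpace + 1);
        Matcher matcher = PAIR.matcher(rest);
        while (matcher.find()) {
            event.put(matcher.group(1), matcher.group(2));
        }
        return event;
    }
}
```

Each resulting map is one time-stamped entry for the key-value index, so a later search such as "all events with level=ERROR for a given user" no longer needs to parse raw text.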
While we don't want to play up the differences between SQL and NoSQL for such a thing as logs, because logs can land in both places either directly or after processing, we do want to realize that the workflows associated with logs are indeed different from, and more versatile than, the CRUD activity on data. Logs, for instance, have made it possible to move or copy data from one place to another without having to interrupt or lock the source. For example, we can ship the logs of one system to another, where the entries can be played back to reconstruct the data to the same state as the original. Since the records are time series, we only need to ship what has transpired between the last shipment and the current one. Alternatively, for every change to the original, we could log to the remote copy as well. The two methods are not exclusive and can be combined. The scope of the changes to be effected in mirroring can be set on a container-by-container basis, such as a database. In the case of log shipping, the restore operation has to be completed on each copy of the log. It's important to recognize the difference between the log of a web application and the log of database changes. The former is about the logic, which may span both the web tier and the database. The latter is about changes to the data only, without any regard for how they were effected. In other words, the log of a database system is about the changes to the state of the databases. What we are saying here is that the logs in the latter case are much more important and relevant than, and different from, web logs or web access logs. That said, the flow of control and the exceptions or successes encountered can, over time, also provide useful information. What works for the logs of a database can also be put to use for the logs of applications, should there be a requirement for it.
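Log shipping with playback can be illustrated with a small sketch. It assumes that every change carries a monotonically increasing sequence number and that the log is shipped in order; the class names are hypothetical and no particular database's log format is implied.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy log shipping: a replica replays only the changes it has not yet applied. */
final class LogShippingSketch {
    static final class Change {
        final long sequence;
        final String key;
        final String value; // null means the key was deleted

        Change(long sequence, String key, String value) {
            this.sequence = sequence;
            this.key = key;
            this.value = value;
        }
    }

    static final class Replica {
        final Map<String, String> data = new HashMap<>();
        long lastApplied = 0L;

        // Entries at or below lastApplied were replayed earlier and are skipped.
        void replay(List<Change> log) {
            for (Change change : log) {
                if (change.sequence <= lastApplied) continue;
                if (change.value == null) {
                    data.remove(change.key);
                } else {
                    data.put(change.key, change.value);
                }
                lastApplied = change.sequence;
            }
        }
    }
}
```

Because the replica remembers the last sequence it applied, shipping the whole log again is harmless, and shipping only the tail since the previous shipment is sufficient.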
These application-level activities and logic can remain private, while their visibility to the system allows them to be observed and recorded globally as events.
Kubernetes provides webhooks as a way to interact with all system-generated events. In terms of functionality, this is the equivalent of HTTP handlers and modules in ASP.NET, which intercept and change requests and responses. The webhooks, however, are an opportunity to work on system-generated resources such as pod creation requests.
There are two stages at which webhooks can run, and the webhooks are correspondingly named mutating and validating. The first is an opportunity to change the requests on Kubernetes core v1 resources. The second is an opportunity to add validations to requests and responses.
Since these span a lot of system calls, the webhooks are invoked frequently. Therefore, they must be selective about the requests they modify, so that they do not touch requests they were not intended for.
In addition to selectors, the power of webhooks is best demonstrated when they select all requests of a particular type to modify. For example, this is an opportunity for security to raise the baseline by allowing or denying all resources of a particular kind. The execution of privileged pods may be disabled in the cluster with the help of webhooks.
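The decision a validating webhook makes when it denies privileged pods can be sketched in plain Java. This shows only the policy check: a real webhook receives an admission review over HTTPS and answers whether the request is allowed, and the classes below are hypothetical stand-ins for the parsed request rather than any Kubernetes client API.

```java
import java.util.List;

/** Toy policy check: deny Pods that contain a privileged container. */
final class PrivilegedPodPolicy {
    static final class Container {
        final String name;
        final boolean privileged;

        Container(String name, boolean privileged) {
            this.name = name;
            this.privileged = privileged;
        }
    }

    static final class Decision {
        final boolean allowed;
        final String message;

        Decision(boolean allowed, String message) {
            this.allowed = allowed;
            this.message = message;
        }
    }

    static Decision review(String kind, List<Container> containers) {
        // Stay selective: resources of any other kind pass through untouched.
        if (!"Pod".equals(kind)) {
            return new Decision(true, "not a Pod, ignored");
        }
        for (Container container : containers) {
            if (container.privileged) {
                return new Decision(false, "privileged container not allowed: " + container.name);
            }
        }
        return new Decision(true, "ok");
    }
}
```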
The webhooks are light to run and serve a purpose similar to nginx HTTP parameter modifiers. A number of them may be allowed to run.
#codingexercise
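    // Keeps only the lines that do not contain "NonEvent".
    // FilterFunction is org.apache.flink.api.common.functions.FilterFunction.
    private static class EventFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String line) throws Exception {
            return !line.contains("NonEvent");
        }
    }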

Thursday, December 12, 2019


#problemstatement:
Using audit.log, find the streams that were read within a particular time window, say ‘2019-12-05T15:25:19.977991Z’ < t < ‘2019-12-11T15:25:19.977991Z’, to know which stream was most vulnerable.
#FlinkApplication solution:
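1)      Compile a Flink application as follows:
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

// Body of main(). The audit log is a bounded file, so the batch DataSet API is used.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("</path/to/audit.log>"); // mount file as K8s generic secret
// parse(line) is a helper (not shown) that returns (stageTimestamp, stream, count, size)
DataSet<Tuple4<String, String, Long, Double>> events = lines
      .map(line -> parse(line))
      .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG, Types.DOUBLE));
DataSet<Tuple4<String, String, Long, Double>> stats = events
      // ISO-8601 UTC timestamps in the same format compare correctly as strings
      .filter(x -> x.f0.compareTo("2019-12-05T15:25:19.977991Z") > 0
                && x.f0.compareTo("2019-12-11T15:25:19.977991Z") < 0)
      .groupBy(1) // field 1 is the stream name
      .reduceGroup(new GroupReduceFunction<Tuple4<String, String, Long, Double>,
                                           Tuple4<String, String, Long, Double>>() {
          @Override
          public void reduce(Iterable<Tuple4<String, String, Long, Double>> values,
                             Collector<Tuple4<String, String, Long, Double>> out) throws Exception {
              String stageTimestamp = null;
              String streamName = null;
              long count = 0L;
              double size = 0.0;
              for (Tuple4<String, String, Long, Double> value : values) {
                  stageTimestamp = value.f0; // timestamp of the last read seen for this stream
                  streamName = value.f1;
                  count++;
                  size += value.f3 != null ? value.f3 : 0.0;
              }
              if (count > 50) { // report only the streams read more than 50 times
                  out.collect(new Tuple4<>(stageTimestamp, streamName, count, size));
              }
          }
      })
      .partitionCustom(new Partitioner<Double>() {
          @Override
          public int partition(Double key, int numPartitions) {
              return Math.floorMod(key.intValue(), numPartitions); // always a valid partition index
          }
      }, 3); // field 3 is the Double size
stats.print();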

2)      Deploy the jar

Wednesday, December 11, 2019

Watermarks are the mechanism in Flink to measure progress in event time. They flow inline with the events as part of the stream. As a processor advances its event time, it emits a watermark for the downstream operators to process. In distributed systems, where an operator might get inputs from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks of the incoming streams. As the input streams advance their event times, so does the operator.
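The rule that an operator's outgoing watermark is the minimum over its inputs can be written down in a few lines. This is an illustrative model, not Flink code: MinWatermarkSketch is a made-up name and watermarks are plain long timestamps.

```java
import java.util.Arrays;

/** Tracks the latest watermark per input and exposes the minimum as the output watermark. */
final class MinWatermarkSketch {
    private final long[] inputWatermarks;
    private long outputWatermark = Long.MIN_VALUE;

    MinWatermarkSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // nothing seen yet on any input
    }

    // Records a watermark from one input and returns the operator's output watermark.
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        outputWatermark = Math.max(outputWatermark, min); // event time never moves backwards
        return outputWatermark;
    }
}
```

With two inputs, a watermark of 100 on the first input does not advance the operator until the second input also reports one; after the second input reports 50, the operator's event time is 50.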

When timestamps and watermarks are assigned by an assigner, the timestamps and watermarks from the source are ignored. The assigner overwrites them. It is easy for the assigner to assign timestamps as well as watermarks since they go hand in hand. Timestamps and watermarks can also be generated directly in the source, which uses the collectWithTimestamp method on the SourceContext. Similarly, emitWatermark is used to generate watermarks.
The assignment of timestamps and watermarks usually happens immediately after the data source, but this is not strictly required. We can parse and filter before the timestamp assigner.
Parsing is done with the help of a MapFunction. This follows a syntax of
DataStream<Y> result = input.map(new MyMapFunction());
Filtering is done with the help of a FilterFunction:
DataStream<X> result = input.filter(new MyFilterFunction());
Timestamp assigners take a stream and produce a new stream in which any timestamps and watermarks carried by the original stream are overwritten. As long as the assignment happens prior to the first operation on event time, the events are processed in the same order as if the assignment had happened at the source. In some special cases, the assignment may even be possible at the source.
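One common way for an assigner to derive watermarks from timestamps is to trail the highest timestamp seen so far by a fixed bound. The sketch below models that idea in plain Java; the class name and the millisecond units are assumptions, and it is not Flink's own assigner.

```java
/** Emits watermarks that lag the highest timestamp seen by a fixed bound. */
final class BoundedLatenessWatermarks {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Called for every event with the timestamp extracted from it.
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    // The watermark asserts that no event older than this value is still expected.
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events yet, so no progress can be claimed
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }
}
```

An out-of-order event with a smaller timestamp leaves the watermark unchanged, so the watermark only ever moves forward.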
The order of the events within a window follows the order of the timestamps and watermarks. However, there can be elements that violate the watermarks. A violation happens when a watermark has been set but the event encountered has a timestamp that precedes it. The difference in time between the newly arrived event and the watermark is called the lateness of the event. This can occur for many reasons, such as when a large number of events arrive together and the watermark is placed to avoid reprocessing these events. Even when the window is large, watermarks may be placed to indicate progress based on the number of events processed. In fact, under real-world conditions, the lateness of events is noticeable and is often accommodated in the processing. Lateness is also one of the reasons for setting the preferred time characteristic in the stream execution environment. If we use a key for the events and it has to be a timestamp, we have to keep the notion of the timestamp the same across processors and consistent with the way the event processing works. Message queues do not suffer from this problem because they don’t necessarily have to enforce order. Sometimes even treating the message as poisoned or putting it in a dead letter queue is sufficient.
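Lateness as defined above, together with a simple tolerance for it, can be expressed as follows. This is a simplification for illustration: the names are hypothetical, and the tolerance is applied per event here rather than per window.

```java
/** Computes how late an event is relative to the current watermark. */
final class LatenessSketch {
    // Zero for an event whose timestamp does not precede the watermark.
    static long lateness(long eventTimestamp, long currentWatermark) {
        return Math.max(0L, currentWatermark - eventTimestamp);
    }

    // An event is still processed if it is at most allowedLateness behind the watermark.
    static boolean accept(long eventTimestamp, long currentWatermark, long allowedLateness) {
        return lateness(eventTimestamp, currentWatermark) <= allowedLateness;
    }
}
```

Events that fail the check are the ones a pipeline would drop or route elsewhere, much as a message queue would move a problematic message to a dead letter queue.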


Tuesday, December 10, 2019

We were discussing that Flink applications support historical data processing with the help of timestamps on entries. There can be three different types of timestamps: processing time, event time and ingestion time.

Of these, only event time guarantees completely consistent and deterministic results. Any of the three time characteristics can be set on the StreamExecutionEnvironment prior to the execution of queries.

Event time also supports watermarks.

Watermarks are the mechanism in Flink to measure progress in event time. They flow inline with the events as part of the stream. As a processor advances its event time, it emits a watermark for the downstream operators to process. In distributed systems, where an operator might get inputs from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks of the incoming streams. As the input streams advance their event times, so does the operator.



stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);


When timestamps and watermarks are assigned by an assigner, the timestamps and watermarks from the source are ignored. The assigner overwrites them. It is easy for the assigner to assign timestamps as well as watermarks since they go hand in hand. Timestamps and watermarks can also be generated directly in the source, which uses the collectWithTimestamp method on the SourceContext. Similarly, emitWatermark is used to generate watermarks.
Watermarks have an interesting side effect in Flink. If an event is processed as ‘failing’, it is not in the failed state. The failed state is a terminal state, so processing is not restarted from it. A failing event, however, can be resubmitted, and this causes the event to re-enter the failing state endlessly.

Monday, December 9, 2019

We were discussing that Flink applications support historical data processing with the help of timestamps on entries. There can be three different types of timestamps: processing time, event time and ingestion time.
Of these, only event time guarantees completely consistent and deterministic results. Any of the three time characteristics can be set on the StreamExecutionEnvironment prior to the execution of queries.
Event time also supports watermarks.
Watermarks are the mechanism in Flink to measure progress in event time. They flow inline with the events as part of the stream. As a processor advances its event time, it emits a watermark for the downstream operators to process. In distributed systems, where an operator might get inputs from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks of the incoming streams. As the input streams advance their event times, so does the operator.

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

Sunday, December 8, 2019

Flink applications bring the best of historical data processing to time-series data since they follow a streaming model. This applies to metrics, not just logs. Better monitoring and alerting mechanisms can be applied to streams given the DataStream APIs of Flink. The Table API and SQL abstraction of Flink provide even more convenience to users, and these can support the use of existing reporting mechanisms. SQL has been the language of queries, and tables have been the means of storage, for a while. The existing solutions, stacks and technology built on top of these queries hold a lot of business value. As long as they do not regress and can leverage the additional benefits of stream processing, which were not possible earlier, they will gain even more acceptance.
Let us now take a few examples of historical data processing with Flink APIs:
Please note that historical data uses the notion of event time, which is the time at which each record was created at its source. Stream processing might make use of a different timestamp, referred to as the processing time. This is the time of the local clock on the host where the stream is processed. An hourly processing-time window will include all the records that arrived at the operator during that hour, as determined by the system clock. Processing time does not require coordination between streams and machines. Therefore it has the best performance and lowest latency, but its results are not deterministic because they are affected by delays and outages. This is mitigated by the use of event timestamps, which are global in nature and work even in distributed systems with completely consistent and deterministic results.
The time characteristic can be set before the evaluation of queries:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Once we instantiate the stream from a data source, we can then execute a query:
stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
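How a one-hour window such as the one in the query above decides which window a record belongs to can be sketched with simple arithmetic. This models tumbling windows aligned to the epoch with no offset; TumblingWindowSketch is a made-up name and timestamps are assumed to be in milliseconds.

```java
/** Assigns a timestamp to the tumbling window that contains it. */
final class TumblingWindowSketch {
    // Start of the window, aligned to multiples of the window size since the epoch.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // End of the window (exclusive).
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis;
    }
}
```

With processing time the timestamp passed in is the local clock at the operator, while with event time it is the timestamp carried by the record, which is why only the latter gives the same windows on every run.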
Watermarks are the mechanism in Flink to measure progress in event time. They flow inline with the events as part of the stream. As a processor advances its event time, it emits a watermark for the downstream operators to process. In distributed systems, where an operator might get inputs from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks of the incoming streams. As the input streams advance their event times, so does the operator.