Monday, December 16, 2019

Pravega serves as a stream store. In standalone mode, its control path is available on port 9090 through a REST API. The data path runs over the Flink connector to the segment store on port 6000.
The Netty wire commands to the segment store are best suited for FlinkPravegaReader and FlinkPravegaWriter, as demonstrated in https://1drv.ms/w/s!Ashlm-Nw-wnWvEMNlrUUJvmgd5UY?e=bnWCdU
A REST API data path can make it simpler to send data to Pravega over HTTP. It only needs to translate the data of a POST request into a Netty WireCommand, or it could bridge HTTP to Netty at an even higher level, as shown in https://github.com/ravibeta/JavaSamples/tree/master/tcp-connector-pravega-workshop Generally, lower levels are preferred internally for performance and efficiency.
At a minimum, there need to be HTTP GET and POST methods corresponding to the read and write operations on the data path of a stream. The create, update and delete operations on the stream fall in the control path and are already provided as REST APIs by the metadata controller.
The POST method implementation, for example, may look like this:
@Override
public CompletableFuture<Void> createEvent(String scopeName, String streamName, String message) {
    // 'this' is assumed to be the controller that the client factory connects through
    final ClientFactoryImpl clientFactory = new ClientFactoryImpl(scopeName, this);
    final Serializer<String> serializer = new JavaSerializer<>();
    // a random routing key spreads the events across the segments of the stream
    final Random random = new Random();
    final Supplier<String> keyGenerator = () -> String.valueOf(random.nextInt());
    EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName, serializer, EventWriterConfig.builder().build());
    // writeEvent takes the routing key first and the event second
    return writer.writeEvent(keyGenerator.get(), message);
}
The GET method may similarly utilize a suitable stream event reader.
Next, we discuss lower-level bridging of HTTP requests to the Pravega wire protocol, assuming that the GET and POST method implementations are wired up to the service and exposed via the “/v1/events” @Path annotation. Pravega has its own wire protocol. The POST request data may translate directly to the self-contained messages of the wire protocol, or the HTTP layer may carry the wire protocol fields in request headers and use the entire request data as the payload. In either case, there will be two headers corresponding to the message type and length in the wire protocol.
Therefore, we not only see a need for request headers and parameters to capture all the fields of the metadata and the data enclosed within the message, but also see that the message itself varies a lot across requests and responses.
Let us take a look at some of these messages from the wire protocol:
The request type messages include the following:
1) Read Segment request type which is equivalent to our typical Get method usage. This has the following fields:
Segment
Offset
SuggestedLength
DelegationToken
RequestId
2) Setup Append request type to establish a connection to the host which includes the following fields:
RequestID
WriterID
Segment
DelegationToken
3) AppendBlock request type which includes the following fields:
WriterID
Data
RequestID
4) Partial Event request type, which is applied at the end of an AppendBlock for an event that did not fully fit within the block
5) Event request type for the full data that completely fits within a block
6) Segment attribute request type, which takes the following fields:
RequestID
SegmentName
AttributeID
DelegationToken
7) ReadTable request type, where a tableKey is involved along with the following fields:
RequestID
Segment
DelegationToken
All these request types are also paired with their corresponding response types.
In the first approach, where the messages already exist and are currently processed by the wire protocol, the HTTP layer may simply encapsulate the message in the request body of a PUT method while retaining only the message type and message length as headers. It can then send the request body directly to the lower layer as a WireCommand, which can parse itself from the bytes. The other approach could involve listing all the distinct fields from all the messages and choosing an appropriate header name for each. This allows the request to include the data as the entire payload but will require its own processor for fulfilling the requests with responses. It would therefore appear that a segment-position-based read and an append-only write of typed events would be sufficient to allow data transmission over HTTP and a significant expansion of the audience for the store.
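The first approach can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class HttpToWireFrame and the header names X-Pravega-Message-Type and X-Pravega-Message-Length are hypothetical, and the frame layout of a 4-byte type, a 4-byte length and then the payload is assumed for this sketch rather than taken from the Pravega source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class HttpToWireFrame {
    // Hypothetical header names chosen for this sketch.
    static final String TYPE_HEADER = "X-Pravega-Message-Type";
    static final String LENGTH_HEADER = "X-Pravega-Message-Length";

    // Builds a frame of [type:int][length:int][payload] from the request headers and body.
    static byte[] toFrame(Map<String, String> headers, byte[] body) {
        int type = Integer.parseInt(headers.get(TYPE_HEADER));
        int length = Integer.parseInt(headers.get(LENGTH_HEADER));
        if (length != body.length) {
            throw new IllegalArgumentException("length header does not match the body size");
        }
        ByteBuffer frame = ByteBuffer.allocate(Integer.BYTES * 2 + length);
        frame.putInt(type).putInt(length).put(body);
        return frame.array();
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        Map<String, String> headers = new HashMap<>();
        headers.put(TYPE_HEADER, "3"); // an arbitrary type code used only for this example
        headers.put(LENGTH_HEADER, String.valueOf(body.length));
        System.out.println(toFrame(headers, body).length); // prints 13
    }
}
```

The body passes through untouched, so the HTTP layer never needs to understand the individual message fields; only the two headers are interpreted.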
Implementation is at http://github.com/ravibeta/pravega

Sunday, December 15, 2019


Saturday, December 14, 2019

We discussed event time; we now discuss state. State is useful for resuming a task. Tasks can run at various scopes and levels. Operator state is bound to one operator instance. If there are multiple parallel instances, each instance maintains its own state.
Keyed state is an operator state that is bound to a tuple of <operator, key>, where the key corresponds to a partition. There is one state partition per key.
Keyed state is organized into key groups, which are the units of distribution, so that state can be redistributed when the parallelism changes. Each parallel instance works with the keys of one or more key groups.
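The mapping from keys to key groups and from key groups to parallel instances can be sketched in plain Java. This is a simplified model, not Flink code: the class KeyGroupSketch is hypothetical, and the plain hashCode-based assignment stands in for whatever hash function the runtime actually applies.

```java
class KeyGroupSketch {
    // Simplified: a key is hashed into one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key groups.
    static int instanceFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor("user-42", maxParallelism);
        // The key group of a key stays fixed; only its owning instance changes with parallelism.
        for (int parallelism : new int[] {2, 4}) {
            System.out.println("parallelism=" + parallelism + " keyGroup=" + keyGroup
                    + " instance=" + instanceFor(keyGroup, maxParallelism, parallelism));
        }
    }
}
```

Because a key always lands in the same key group, rescaling only has to move whole key groups between instances rather than rehash individual keys.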
State, whether keyed state or operator state, exists in either managed or raw form.
Managed state is usually stored in internal hash tables or RocksDB. The state is encoded and written into the checkpoints.
Raw state is never shared outside the operator. When it is checkpointed, the raw state is dumped as a byte range. The Flink runtime knows nothing about these raw bytes.
DataStream functions use managed state. The serialization should be standard; otherwise it may break between versions of the Flink runtime.
A program makes use of state through interfaces such as
ValueState<T> which keeps a value that can be updated or retrieved
ListState<T> which keeps an iterable over state elements
ReducingState<T> which keeps a single value aggregated from the state
AggregatingState<IN, OUT> which also keeps a single value aggregated from the state but not necessarily of the same type
FoldingState<T, ACC> which is similar to the above but makes use of a folding function
MapState<UK, UV> which keeps a list of mappings
Since state is stored externally, it has a handle often referred to as the StateDescriptor, similar to the notion of a file descriptor.
State is accessed only via the RuntimeContext.
For example:
Sum = getRuntimeContext().getState(descriptor);
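
What a keyed value state does for a running sum can be modeled in plain Java as below. This is not the Flink API: the class KeyedSumSketch is hypothetical, and a HashMap stands in for the state backend that the runtime would manage and checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

class KeyedSumSketch {
    // Stands in for the state backend: one value per key, as with keyed ValueState.
    private final Map<String, Long> sumPerKey = new HashMap<>();

    // Reads the current value for the key, updates it, and returns the new sum.
    long add(String key, long value) {
        long updated = sumPerKey.getOrDefault(key, 0L) + value;
        sumPerKey.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        KeyedSumSketch state = new KeyedSumSketch();
        state.add("a", 3);
        state.add("b", 10);
        System.out.println(state.add("a", 4)); // prints 7
    }
}
```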

Also, state can be given a time to live, also called TTL, after which it has to be ‘cleaned up’.

Friday, December 13, 2019

Events Records
Kubernetes is a container orchestration framework that hosts applications. The events from the applications are saved globally in the cluster by the kube-apiserver. Hosts are nodes of a cluster and can be scaled out to as many as required for the application workload. Events are different from logs. A forwarding agent forwards the logs from the container to a sink. A sink is a destination from which all the forwarded logs can be redirected to a log service that can better handle the storage and analysis of the logs. The sink is usually external to the cluster so that it can scale independently, and it may be configured to receive logs via a well-known protocol called syslog. The sink controller forwards cluster-level data such as CPU and memory usage from the nodes and may include cluster logs from the api-server on the master node. A continuously running process forwards the logs from the container to the node before the logs drain to the sink via the forwarding agent. This process is specified with a syntax referred to as a DaemonSet, which enables the same process to be started on a set of nodes, usually all the nodes in the cluster. There is no audit event sink. Events are received by webhooks.
In these events, Kubernetes provides a security-relevant, chronological set of records documenting the sequence of activities that have affected the system, both by individual users and by the system acting on their behalf.
Web application developers often record and restrict their logs of the flow of control messages to their database or storage of choice. That is where they have kept their business objects and their master data. It seems convenient for them to persist the exceptions as well as the log messages in the same store, although the norm is to keep the log separate from the data. They save themselves the hassle of having to look somewhere else and with a different methodology. But is this really required? Data in a database is the bread and butter of the business. Every change to the data, be it an addition, modification or deletion, represents something that the business understands and is usually billable. The state of the data reflects the standing of the business. People's names and credit cards are maintained in the database. If this makes it clear that something as dynamic as the activities of the system, maintained in a log during the interactions with a user, is not the same as the data, then it can be argued that the two need not end up in the same place. But it is not just a database and a log file that we are comparing. A database comes with ACID guarantees and locking that a log file cannot provide. In fact, we do see various other usages of the logs. Logs could flow to a file, an index, an emailing and alert system, and a database, individually or all at once. Let us take the example of comparing the option to save logs in a SQL database with the alternative of indexing the logs as events, where key values are extracted and maintained for later searches. Events are time-series inputs to a key-value index. This association between the activities and a timestamp is helpful in determining and reconstructing a chain of events that may be meaningful in such things as diagnosis, forensics and even analysis for charts and post-mortem actions. The store for such events could be a NoSQL database, which has a methodology different from that of the traditional SQL database.
While we don't want to play into the differences between SQL and NoSQL for such a thing as logs, because they can land in both places either directly or after processing, we do want to realize that the workflows associated with logs are indeed different from and more versatile than the CRUD activity of data. Logs, for instance, have made it possible to move or copy data from one place to another without having to interrupt or lock the source. For example, we can ship the logs of one system to another, where the entries can be played back to reconstruct the data to the same state as the original. Since the records are time series, we only need to ship what has transpired between the last shipment and the current one. Alternatively, for every change to the original, we could also log to the remote copy. The two methods are not exclusive and can be combined. The scope of the changes to be effected in mirroring can be set on a container-by-container basis, such as a database. In the case of log shipping, the restore operation has to be completed on each copy of the log. It is important to recognize the difference between the log of a web application and the log of database changes. The former is about the logic, which may span both the web tier and the database. The latter is about changes to the data only, without any regard for how they were effected. In other words, the log of a database system is about the changes to the state of the databases. What we are saying here is that the logs in the latter case are much more important and relevant, and different from the web logs or web access logs. That said, the flow of control, and the exceptions or successes encountered, can over time also provide useful information. What works for the logs of a database can also be put to use for those of the applications should there be a requirement for it.
These application-level activities and logic can remain private, while their visibility to the system allows them to be observed and recorded globally as events.
Kubernetes provides webhooks as a way to interact with all system-generated events. This is the equivalent of HTTP handlers and modules in ASP.NET in terms of the functionality to intercept and change requests and responses. The webhooks, however, are an opportunity to work on system-generated resources such as pod creation requests.
There are two stages where webhooks can be run. They are correspondingly named mutating and validating webhooks. The first is an opportunity to change the requests on Kubernetes core v1 resources. The second is an opportunity to add validations to requests and responses.
Since these span a lot of system calls, the webhooks are invoked frequently. Therefore, they must be selective in the requests they modify, to exclude the possibility that they touch requests that were not intended.
In addition to selectors, the power of webhooks is best demonstrated when they select all requests of a particular type to modify. For example, this is an opportunity for security to raise the baseline by allowing or denying all resources of a particular kind. The execution of privileged pods may be disabled in the cluster with the help of webhooks.
The webhooks are light to run and serve a role similar to nginx HTTP parameter modifiers. A number of them may be allowed to run.
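The decision that a validating webhook would make to keep privileged pods out of the cluster can be sketched in plain Java. This is a simplified model and not the Kubernetes admission API: the class PrivilegedPodValidator and its Container type are hypothetical stand-ins for the admission request and the container security context.

```java
import java.util.Arrays;
import java.util.List;

class PrivilegedPodValidator {
    // A pared-down stand-in for a container and its privileged flag.
    static class Container {
        final String name;
        final boolean privileged;

        Container(String name, boolean privileged) {
            this.name = name;
            this.privileged = privileged;
        }
    }

    // Returns true when the request may be admitted.
    static boolean allow(String kind, List<Container> containers) {
        if (!"Pod".equals(kind)) {
            return true; // stay selective: leave other kinds of resources untouched
        }
        return containers.stream().noneMatch(c -> c.privileged);
    }

    public static void main(String[] args) {
        List<Container> containers = Arrays.asList(
                new Container("app", false), new Container("agent", true));
        System.out.println(allow("Pod", containers));       // prints false
        System.out.println(allow("ConfigMap", containers)); // prints true
    }
}
```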
#codingexercise
    // Keeps every line except those marked as "NonEvent".
    private static class EventFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String line) throws Exception {
            return !line.contains("NonEvent");
        }
    }

Thursday, December 12, 2019


#problemstatement:
Using audit.log, find the streams that were read from within a particular time window, say ‘2019-12-05T15:25:19.977991Z’ < t < ‘2019-12-11T15:25:19.977991Z’, to know which stream was most vulnerable.
#FlinkApplication solution:
1)      Compile a Flink application as follows:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// mount the file as a K8s generic secret
DataSet<String> lines = env.readTextFile("</path/to/audit.log>");
// parse(line) is a helper, not shown here, that returns (stageTimestamp, stream, count, size)
DataSet<Tuple4<String, String, Long, Double>> events = lines
      .map(line -> parse(line))
      .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG, Types.DOUBLE));
events
      // timestamps in the same ISO-8601 UTC format compare correctly as strings
      .filter(x -> x.f0.compareTo("2019-12-05T15:25:19.977991Z") > 0
                && x.f0.compareTo("2019-12-11T15:25:19.977991Z") <= 0)
      .groupBy(1) // group by stream name
      .reduceGroup(new GroupReduceFunction<Tuple4<String, String, Long, Double>,
                                           Tuple4<String, String, Long, Double>>() {
             @Override
             public void reduce(Iterable<Tuple4<String, String, Long, Double>> values,
                                Collector<Tuple4<String, String, Long, Double>> out) throws Exception {
                   String stageTimestamp = null;
                   String streamName = null;
                   long count = 0L;
                   double size = 0.0;
                   for (Tuple4<String, String, Long, Double> value : values) {
                         stageTimestamp = value.f0;
                         streamName = value.f1;
                         count++;
                         size += value.f3 != null ? value.f3 : 0.0;
                   }
                   // report only the streams that were read more than 50 times
                   if (count > 50) {
                         out.collect(new Tuple4<>(stageTimestamp, streamName, count, size));
                   }
             }
      })
      // assumed intent: partition on the Double size field (index 3), kept within range
      .partitionCustom(new Partitioner<Double>() {
             @Override
             public int partition(Double key, int numPartitions) {
                   return Math.floorMod(key.intValue() - 1, numPartitions);
             }
      }, 3)
      .print();

Step 2) Deploy the jar 
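
Before deploying, the filter-and-count logic can be checked locally without a cluster. The following plain Java sketch mirrors the time-window filter and the per-stream grouping with the standard collections; the class AuditReadCounts and the sample records are made up for illustration and do not come from a real audit.log.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class AuditReadCounts {
    // Counts reads per stream for records of (stageTimestamp, stream) inside (from, to].
    static Map<String, Long> readsPerStream(List<String[]> records, String from, String to) {
        return records.stream()
                .filter(r -> r[0].compareTo(from) > 0 && r[0].compareTo(to) <= 0)
                .collect(Collectors.groupingBy(r -> r[1], TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Made-up records; the last one falls outside the window.
        List<String[]> records = Arrays.asList(
                new String[] {"2019-12-06T10:00:00.000000Z", "stream1"},
                new String[] {"2019-12-07T10:00:00.000000Z", "stream1"},
                new String[] {"2019-12-08T10:00:00.000000Z", "stream2"},
                new String[] {"2019-12-12T10:00:00.000000Z", "stream2"});
        System.out.println(readsPerStream(records,
                "2019-12-05T15:25:19.977991Z", "2019-12-11T15:25:19.977991Z"));
        // prints {stream1=2, stream2=1}
    }
}
```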

Wednesday, December 11, 2019

Watermarks are the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it introduces a watermark for the downstream operators to process. In the case of distributed systems where an operator might get inputs from more than one stream, the watermark on the outgoing stream is determined from the minimum of the watermarks from the incoming streams. As the input streams update their event times, so does the operator.
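
The minimum rule for an operator with several inputs can be sketched in plain Java. This models the idea only and is not Flink's internal class: MinWatermarkSketch and its method names are hypothetical.

```java
import java.util.Arrays;

class MinWatermarkSketch {
    private final long[] inputWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one input and returns the operator's watermark,
    // which is the minimum over all inputs and never moves backwards.
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        op.onWatermark(0, 100);                     // the second input has not reported yet
        System.out.println(op.onWatermark(1, 60));  // prints 60
        System.out.println(op.onWatermark(1, 150)); // prints 100
    }
}
```

The slowest input holds the operator back, which is why a single stalled stream can delay event-time progress downstream.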

When timestamps and watermarks are assigned by an assigner, the timestamps and watermarks from the source are ignored. The assigner overrides and overwrites the timestamps and watermarks in the source. It is easy for the assigner to assign timestamps as well as watermarks since they go hand in hand. Timestamps and watermarks can also be added directly in the source, which just uses the collectWithTimestamp method on the SourceContext. Similarly, emitWatermark is used to generate watermarks.
The assignment of timestamps and watermarks usually happens immediately after the data source, but this is not strictly required. We can parse and filter before the timestamp assigner.
The parse function is done with the help of MapFunction. This follows a syntax of
DataSet<Y> result = input.map(new MyMapFunction());
The filter function is done with the help of
DataSet<X> result = input.filter(new MyFilterFunction());
Timestamp assigners take a stream and produce a new version with the overwrites in case the original stream had these timestamps and watermarks. As long as the assignment happens prior to the first operation on the event time, the orderly processing of the events remains the same as if the assignment happened at the source. In some special cases, the assignment may even be possible at the source.
The order of the events within a window follows the order of the timestamps and watermarks. However, there can be elements that violate the watermarks. A violation happens when a watermark has been set but the event encountered has a timestamp that precedes it. The difference in time between the newly arrived event and the watermark is called the lateness of the event. This might occur for many reasons, such as when a large number of events arrive together and a watermark is placed to help skip the reprocessing of these events. Even when the window is large, watermarks may be placed to indicate progress based on the number of events processed. In fact, in real-world conditions, lateness of events is noticeable and is often accommodated in processing. Lateness is also one of the reasons for setting the preferred time-based processing type in the stream execution environment. If we use a key for the events and it has to be a timestamp, we have to keep the notion of the timestamp the same across processors and consistent with the way the event processing works. Message queues do not suffer from this problem because they do not necessarily have to enforce order. Sometimes even poisoning the message or putting it in a dead letter queue is sufficient.
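
The lateness of an event relative to the current watermark can be computed as in the plain Java sketch below. The class LatenessSketch is hypothetical, and the allowedLateness bound is a parameter of this sketch that models a tolerance for late events rather than Flink's exact window semantics.

```java
class LatenessSketch {
    // Returns how far the event lags behind the watermark, or 0 if it is on time.
    static long lateness(long eventTimestamp, long watermark) {
        return Math.max(0L, watermark - eventTimestamp);
    }

    // An event is still accepted if its lateness is within the allowed bound.
    static boolean accept(long eventTimestamp, long watermark, long allowedLateness) {
        return lateness(eventTimestamp, watermark) <= allowedLateness;
    }

    public static void main(String[] args) {
        System.out.println(lateness(950, 1000));     // prints 50
        System.out.println(lateness(1200, 1000));    // prints 0
        System.out.println(accept(950, 1000, 100));  // prints true
        System.out.println(accept(800, 1000, 100));  // prints false
    }
}
```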


Tuesday, December 10, 2019

We were discussing that Flink applications support historical data processing with the help of timestamps on entries. There can be three different types of timestamps, corresponding to processing time, event time and ingestion time.

Out of these, only event time guarantees completely consistent and deterministic results. Any of the three processing types can be set on the StreamExecutionEnvironment prior to the execution of queries.

Event time also supports watermarks.

Watermarks are the mechanism in Flink to measure progress in event time. They are simply inlined with the events. As a processor advances its timestamp, it introduces a watermark for the downstream operators to process. In the case of distributed systems where an operator might get inputs from more than one stream, the watermark on the outgoing stream is determined from the minimum of the watermarks from the incoming streams. As the input streams update their event times, so does the operator.



stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
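
What the pipeline above computes can be modeled in plain Java: each event is assigned to the one-hour tumbling window that contains its timestamp, and values are reduced per user and window. This is a sketch of the idea and not Flink code; the class HourlyWindowSketch, the sample users and the use of a sum as the reduce function are assumptions.

```java
import java.util.Map;
import java.util.TreeMap;

class HourlyWindowSketch {
    static final long HOUR_MILLIS = 60 * 60 * 1000L;

    // Start of the one-hour tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, HOUR_MILLIS);
    }

    // Sums values per (user, window), mirroring keyBy, timeWindow and reduce.
    static Map<String, Long> reduce(String[] users, long[] timestamps, long[] values) {
        Map<String, Long> sums = new TreeMap<>();
        for (int i = 0; i < users.length; i++) {
            String key = users[i] + "@" + windowStart(timestamps[i]);
            sums.merge(key, values[i], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] users = {"alice", "alice", "bob", "alice"};
        long[] timestamps = {10 * 60_000L, 50 * 60_000L, 20 * 60_000L, 70 * 60_000L};
        long[] values = {1, 2, 5, 4};
        System.out.println(reduce(users, timestamps, values));
        // prints {alice@0=3, alice@3600000=4, bob@0=5}
    }
}
```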


When timestamps and watermarks are assigned by an assigner, the timestamps and watermarks from the source are ignored. The assigner overrides and overwrites the timestamps and watermarks in the source. It is easy for the assigner to assign timestamps as well as watermarks since they go hand in hand. Timestamps and watermarks can also be added directly in the source, which just uses the collectWithTimestamp method on the SourceContext. Similarly, emitWatermark is used to generate watermarks.
Watermarks have an interesting side effect in Flink. If an event is processed as ‘failing’, it is not in the failed state. A failed state is a terminal state and is consequently not restarted in processing. However, a failing event can be resubmitted, and this causes the event to re-enter the failing state endlessly.