Saturday, December 21, 2019

Flink considers batch processing to be a special case of stream processing. A batch program is one that operates on a batch of events at a time. It differs from the stateful stream processing discussed earlier in that it uses in-memory data structures rather than the embedded key-value store, so it can export its state externally and the state does not have to be kept together with the streams. Since the state can be stored externally, checkpoints are not used, because their purpose was similar to that of the state. The iterations are also slightly different from those in stream processing, because there is now a superstep level that might involve coordination.
When we mentioned parallelism, we brought up the keyBy operator. Let us look at the details of this method. It partitions the stream on the keyBy attribute, and the windows are computed per key. A window operates on a finite set of elements, and a tumbling window has fixed boundaries.
For example,
stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

An empty window does not trigger a computation.

Windows on keyed streams are defined after a keyBy call, while windowAll is used for non-keyed streams.
The notion behind keyed streams is that the computations of a windowed stream can be executed in parallel. Each logical keyed stream is independent of the others, and elements with the same key go to the same task.
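To make the per-key window computation concrete, here is a minimal sketch in plain Java that does not use Flink at all. It imitates keyBy, a one-hour tumbling window and a summing reduce over an in-memory list. The Event class, its fields and the method names are hypothetical and exist only for this illustration, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeyedTumblingWindowSketch {

    /** Hypothetical event: a user (the key), an amount, and an event time in milliseconds. */
    static final class Event {
        final String user;
        final long amount;
        final long timestampMs;

        Event(String user, long amount, long timestampMs) {
            this.user = user;
            this.amount = amount;
            this.timestampMs = timestampMs;
        }
    }

    /** Start of the tumbling window that contains the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Sums the amounts per (key, window) slot. A slot that never receives an event is never created. */
    static Map<String, Long> sumPerKeyAndWindow(List<Event> events, long sizeMs) {
        Map<String, Long> sums = new TreeMap<>();
        for (Event e : events) {
            String slot = e.user + "@" + windowStart(e.timestampMs, sizeMs);
            sums.merge(slot, e.amount, Long::sum); // the "reduce" step for this key and window
        }
        return sums;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        List<Event> events = Arrays.asList(
                new Event("alice", 5, 0L),
                new Event("bob", 7, 10L),
                new Event("alice", 3, hour - 1),
                new Event("alice", 4, hour));
        System.out.println(sumPerKeyAndWindow(events, hour));
    }
}
```

Because every (key, window) slot is independent, the slots could be computed by different tasks, which is the property that keyed streams rely on.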

Friday, December 20, 2019


The repackaging of source code and its unintended consequences:
This article is written with the findings from an investigation that I participated in and that is still ongoing. It does not explain everything, because the root cause has not been found yet. I have not heard of any similar case so far and I have had difficulty in outlining the defect, but the learnings from the effort stand clear.
A specific piece of Java code was packaged as a jar because it had a very narrow purpose and needed to be reused by applications from different sources. This jar happened to be a fat jar, which means that it contained all the dependencies needed to run its code. The jar worked well whenever the applications were run. However, it was inconvenient to copy it to each repository manually. The time-tested way of bringing in jars like these has been to use one or another distribution repository.
The code was therefore forked, which means that a copy of the code was created in another location where it could be modified. The changes could include a rewrite and some trimming so that the newer code was lean and mean. The forked code would then be uploaded to a repository, and consumers could refer to it by name as a dependency. It would all work well, and the unnecessary step of bringing it in any other way would be eliminated.
It turned out that this effort did not work as intended. The reason for the failure is still under investigation. The code was supposed to be the same, and it was reviewed to ensure that no defects were introduced. It had test cases that seemed to work. Yet the code was not working for users. If all the factors that played out with the previous code had been determined, the current effort would have panned out. This was the key learning, and the industry's answer to it is regression tests: tests written with the goal of keeping the behavior of the current code the same as that of the previous code, irrespective of the changes made. Somewhere a regression was introduced, and it was manifesting as a failure.
Software development tends to focus on functionality. However, it is the assets that guarantee the non-functional aspects of the code that determine how easy the code is to use. Both the functional and the non-functional aspects are needed from any implementation, but while the former gets the bulk of the maker's attention, it is the latter that determines how much the result is appreciated.

Thursday, December 19, 2019

The Flink data flow model consists of streams, transformations and sinks. An example of stream generation is the addSource method on the StreamExecutionEnvironment. An example of a transformation is the map operator on the DataStream. An example of using a sink is the addSink method on the DataStream.
Each of these stages can partition the data to take advantage of parallelization. The addSource method may take a partition to begin the data flow on a smaller subset of the data. The transformation operator may further partition the stream. Some operators can even multiplex the partitions with the keyBy operator, and the sink may group the events together.
There are two forwarding patterns between operators that help with parallelization: one-to-one and redistributing (one-to-many). The latter is the one that helps with multiplexing, and the streams that support it are called redistributing streams. The keyBy method is an example: it repartitions the incoming stream into many outgoing streams, hence the term redistribution. When a stream is split into outgoing keyBy partitions, the order of events is guaranteed only between a sender and one receiver. Across receivers the events can be in any order, depending on how they were partitioned and how fast the downstream operators consumed them. The map method, by contrast, is a one-to-one transformation in which events are converted to other events that are easier to handle, while the partitioning and order of the stream are preserved.
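The ordering guarantee for redistribution can be illustrated with a small plain-Java sketch. It assigns each event to a partition by hashing its key, which is a simplification and not how Flink computes its key groups internally. Events are assumed to be strings of the form "key:payload", and all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class KeyByPartitionSketch {

    /** Extracts the key from a hypothetical "key:payload" event string. */
    static String keyOf(String event) {
        return event.substring(0, event.indexOf(':'));
    }

    /** Simplified stand-in for key-based routing: the same key always maps to the same partition. */
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Splits one incoming sequence into per-partition sequences, keeping arrival order within each. */
    static List<List<String>> redistribute(List<String> events, int parallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String event : events) {
            partitions.get(partitionFor(keyOf(event), parallelism)).add(event);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("u1:login", "u2:login", "u1:click", "u2:logout", "u1:logout");
        System.out.println(redistribute(events, 4));
    }
}
```

Within one partition the events of a key keep their arrival order. Nothing in the sketch relates the progress of one partition to another, which is why no order can be assumed across receivers.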
Within the interaction between a sender and a receiver on a single stream, the events may be infinite. That is why a window is used to bound the events that will be considered as one set. Windows come in different types: the windows overlap, as in a sliding window, or they do not, as in a tumbling window. Windows can also be delimited by periods of inactivity, as in a session window.
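The difference between tumbling and sliding windows comes down to how many windows a single event belongs to. The following plain-Java sketch computes the window start times for a timestamp. The class and method names are hypothetical, and the arithmetic is an illustration of the idea rather than Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssignmentSketch {

    /** A tumbling window assigns each timestamp to exactly one window of the given size. */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /**
     * A sliding window assigns each timestamp to every window of length {@code size}
     * that starts on a multiple of {@code slide} and still covers the timestamp.
     */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Timestamp 7 with window size 10: one tumbling window, two sliding windows (slide 5).
        System.out.println(tumblingStart(7, 10));
        System.out.println(slidingStarts(7, 10, 5));
    }
}
```

When the slide equals the size, slidingStarts returns a single window and the sliding window degenerates into a tumbling one.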
Operations that remember information across events are said to be stateful. This keyed state can only be maintained when events have keys associated with them. Recall the keyBy operator that was used for repartitioning: this operator assigns keys to events. The state is always kept together with the stream, as if in an embedded key-value store. That is why the streams have to be keyed.
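As a rough mental model, keyed state behaves like a map from key to value that lives next to the operator processing that key. The sketch below keeps a running count per key in a plain HashMap. It is only an analogy under that assumption and does not use Flink's state API, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class KeyedStateSketch {

    // Stand-in for the embedded key-value store: one state entry per key.
    private final Map<String, Long> countPerKey = new HashMap<>();

    /** Processes one event for the given key and returns the updated count remembered for that key. */
    long onEvent(String key) {
        return countPerKey.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        KeyedStateSketch operator = new KeyedStateSketch();
        operator.onEvent("alice");
        operator.onEvent("bob");
        System.out.println(operator.onEvent("alice")); // second event seen for "alice"
    }
}
```

Without a key there would be nothing to index the map by, which is the sense in which the stream has to be keyed before state can be attached to it.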

Wednesday, December 18, 2019

Flink provides two sets of APIs to work with data. One is for unbounded data such as streams and is provided by the DataStream API. The other is for bounded data and is provided by the DataSet API.
Since Java programmers like collections, the latter provides methods to work with collections as source and sink.
For example, we have methods on ExecutionEnvironment such as
DataSet<T> result = env.fromCollection(data); which returns a DataSet.
And we have methods to specify a collection data sink as follows:
List<T> dataList = new ArrayList<T>();
result.output(new LocalCollectionOutputFormat<>(dataList));
This collection data sink works well as a debugging tool. 
Both the data type and the iterators must be serializable for the read from the source and the write to the sink.
The DataStream API supports collections only as a source. To inspect a stream, we can write it to the console or the file system, since that will most likely be for debugging. There is no direct conversion from a stream to a collection, although DataStreamUtils can collect a stream into an iterator:
DataStream<T> stream = env.fromCollection(data);
Iterator<T> it = DataStreamUtils.collect(stream); // expensive
There is however another method called addSink which can allow custom sinks.
A DataStream also provides an iterate() method that returns an IterativeStream. Rather than iterating over a collection, it defines a feedback loop in the data flow.
For example,
IterativeStream<T> iteration = stream.iterate();
DataStream<T> newStream = iteration.map(new MyMapFunction()); where the map is invoked inside the loop.
Finally, iteration.closeWith(…) is called to define the tail of the iteration.
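The shape of such an iteration, with a map inside the loop and a tail that feeds elements back to the head, can be sketched in plain Java with a queue. This is an analogy only and does not use the Flink API. The step function (subtract one until the value is no longer positive) and all names are assumptions chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class IterationSketch {

    /**
     * Applies the loop body to every element, feeds positive results back
     * to the head of the loop, and emits the rest downstream.
     */
    static List<Long> iterateUntilZero(List<Long> input) {
        Deque<Long> loopHead = new ArrayDeque<>(input);
        List<Long> output = new ArrayList<>();
        while (!loopHead.isEmpty()) {
            long minusOne = loopHead.poll() - 1; // the map invoked inside the loop
            if (minusOne > 0) {
                loopHead.add(minusOne);          // feedback: the role played by closeWith
            } else {
                output.add(minusOne);            // element leaves the iteration
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(iterateUntilZero(Arrays.asList(3L, 1L, 2L)));
    }
}
```

On an unbounded stream the loop would never drain, so the while condition here only terminates because the input list is finite.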

We reviewed Flink’s DataStream and DataSet APIs and how they distinguish bounded from unbounded data.
The next step is to look at the parallelization possibilities with Flink.

Tuesday, December 17, 2019


Monday, December 16, 2019

Pravega serves as a stream store. Its control path is available on port 9090 in standalone mode with a REST API. The data path goes over the Flink connector to the segment store on port 6000.
The netty wire commands to the segment store are best suited for FlinkPravegaReader and FlinkPravegaWriter as demonstrated in https://1drv.ms/w/s!Ashlm-Nw-wnWvEMNlrUUJvmgd5UY?e=bnWCdU
A REST API data path can make it simpler to send data to Pravega over HTTP. It just needs to translate the data of a POST request to a netty WireCommand, or it could bridge HTTP to netty at an even higher level as shown in https://github.com/ravibeta/JavaSamples/tree/master/tcp-connector-pravega-workshop Generally, lower levels are preferred internally for performance and efficiency.
At the minimum, there needs to be an HTTP Get and a Post method corresponding to the read and write operations on the data path of a stream. The create, update and delete of the stream fall in the control path and are already provided as REST APIs by the metadata controller.
The Post method implementation, for example, may look like this:
@Override
public CompletableFuture<Void> createEvent(String scopeName, String streamName, String message) {
    // "this" is expected to be the controller that the client factory talks to.
    final ClientFactoryImpl clientFactory = new ClientFactoryImpl(scopeName, this);
    final Serializer<String> serializer = new JavaSerializer<>();
    final Random random = new Random();
    // A random routing key spreads the events across the segments of the stream.
    final Supplier<String> keyGenerator = () -> String.valueOf(random.nextInt());
    EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName, serializer, EventWriterConfig.builder().build());
    // writeEvent takes the routing key and the event as two separate arguments.
    return writer.writeEvent(keyGenerator.get(), message);
}
The Get method may similarly utilize a suitable stream event reader.
Next we look at lower-level bridging of HTTP requests to the Pravega wire protocol, assuming that the Get and Post method implementations are wired up to the service and exposed via the “/v1/events” @Path annotation. Pravega has its own wire protocol. The POST request data may translate directly to the self-contained messages of that protocol, or the HTTP layer may carry the wire protocol fields in request headers and use the entire request body as the payload. In either case there will be two headers corresponding to the message type and length in the wire protocol.
Therefore, we not only see a need for request headers and parameters that suitably capture all the fields of the metadata and the data enclosed within the message, but we also see that the message itself varies a lot across requests and responses.
Let us take a look at some of these messages from the wire protocol:
The request type messages include the following:
1) Read Segment request type which is equivalent to our typical Get method usage. This has the following fields:
Segment
Offset
SuggestedLength
DelegationToken
RequestID
2) Setup Append request type to establish a connection to the host which includes the following fields:
RequestID
WriterID
Segment
DelegationToken
3) AppendBlock request type which includes the following fields:
WriterID
Data
RequestID
4) Partial Event request type, which is applied at the end of an Append block for an event that did not fully fit within the Append block
5) Event request type for the full data that completely fits within a block
6) Segment attribute request type which includes the following fields:
RequestID
SegmentName
AttributeID
DelegationToken
7) ReadTable request type, where table keys are involved in addition to the following fields:
RequestID
Segment
DelegationToken
All these request types are also paired with their corresponding response types.
Given that these messages already exist and are processed by the wire protocol, the HTTP layer may simply encapsulate a message in the request body of a PUT method while retaining only the message type and message length as headers. It can then send the request body directly to the lower layer as a WireCommand, which can parse itself from the bytes. The other approach would list all the distinct fields from all the messages and choose an appropriate header name for each. This allows the request to carry the data as the entire payload, but it requires its own processor for fulfilling the requests with responses. It would therefore appear that a segment-position-based read and an append-only write of typed events would be sufficient to allow data transmission over HTTP and to significantly expand the audience for the store.
Implementation is at http://github.com/ravibeta/pravega
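The first approach, where only the message type and length travel as headers and the body is passed through, can be sketched independently of that repository in plain Java. The header names X-Message-Type and X-Message-Length, the class name and the frame layout (a 4-byte type, a 4-byte length, then the payload) are assumptions made for this illustration and are not taken from the Pravega code base.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class WireFrameSketch {

    // Hypothetical header names for the two wire protocol fields.
    static final String TYPE_HEADER = "X-Message-Type";
    static final String LENGTH_HEADER = "X-Message-Length";

    /** Builds a frame: 4-byte message type, 4-byte payload length, then the payload bytes. */
    static byte[] frame(int messageType, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(8 + payload.length);
        buf.putInt(messageType).putInt(payload.length).put(payload);
        return buf.array();
    }

    /** Turns the two headers plus the request body into a frame, rejecting a mismatched length. */
    static byte[] fromHttp(Map<String, String> headers, byte[] body) {
        int type = Integer.parseInt(headers.get(TYPE_HEADER));
        int length = Integer.parseInt(headers.get(LENGTH_HEADER));
        if (length != body.length) {
            throw new IllegalArgumentException("length header does not match body size");
        }
        return frame(type, body);
    }

    static int typeOf(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(0);
    }

    static byte[] payloadOf(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        byte[] payload = new byte[buf.getInt(4)];
        buf.position(8);
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put(TYPE_HEADER, "3");   // an arbitrary type number for the example
        headers.put(LENGTH_HEADER, "5");
        byte[] frame = fromHttp(headers, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(typeOf(frame) + " " + new String(payloadOf(frame), StandardCharsets.UTF_8));
    }
}
```

The HTTP layer in this sketch never interprets the payload, which matches the idea that the lower layer parses the message itself from the bytes.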

Sunday, December 15, 2019
