Tuesday, June 30, 2020

Stream operations continued

The stream manager is also the right interface for adding the capabilities that these automation workflows require at the stream store level. Most of the existing methods on that interface target whole streams, and the automation workflows are served by equivalent commands on the same streams. The new capabilities can be added cleanly without breaking compatibility with earlier releases.
One advantage of providing this capability on the stream manager is that applications can close the stream manager when they are done with it, which allows proper finalization in all cases. The stream manager interface already comes with a close method to facilitate this.
The stream manager can also facilitate copying segmentRanges by utilizing head and tail streamcuts. This works just like copying whole streams, except that the reader/iterator is initialized with the segment range. The segmentRange has to be contiguous because only a head and a tail streamcut are specified. This is not a problem if multiple segmentRanges have to be copied: in that case, the copy operation is repeated for every contiguous range.
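The range copy described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: the Stream class, copy_range and copy_ranges are hypothetical names, and streamcuts are reduced to integer offsets into a list, which is much simpler than the stream store's real streamcuts.

```python
from dataclasses import dataclass, field


@dataclass
class Stream:
    """Hypothetical in-memory stand-in for a stream: an append-only list of events."""
    name: str
    events: list = field(default_factory=list)

    def append(self, event):
        # Writes are append-only; there is no overwrite.
        self.events.append(event)


def copy_range(source, destination, head, tail):
    """Copy the contiguous range [head, tail) from source to destination.

    head and tail play the role of the head and tail streamcuts (assumption:
    they are plain integer offsets here).
    """
    if not 0 <= head <= tail <= len(source.events):
        raise ValueError("head and tail must delimit a contiguous range inside the stream")
    for event in source.events[head:tail]:
        destination.append(event)
    return tail - head


def copy_ranges(source, destination, ranges):
    """Repeat the range copy once per contiguous range, in the order given."""
    return sum(copy_range(source, destination, head, tail) for head, tail in ranges)


source = Stream("source", [f"e{i}" for i in range(10)])
target = Stream("target")
copied = copy_ranges(source, target, [(0, 3), (6, 8)])
```

Two non-adjacent ranges are copied by two calls to the same range copy, which is why the contiguity requirement on a single range is not a limitation.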
Scopes can also be copied by iterating over all their streams and copying each one. This is generally a long-running operation because each stream can be massive by itself. A scope copy is similar to copying a folder that contains streams written to files.
The utility of stream copy is only enhanced by the use cases of scope copy, segment range copy and archival. In-place editing of streams is avoided by sealing segments once they are written. There is no overwrite, and all write operations are append-only.
The stream manager could also move a stream from one scope to another by using the copy operation, although a metadata-only operation would be significantly more efficient in this case.

Monday, June 29, 2020

StreamManager enhancements

Appending a stream to an existing stream is the same operation as a copy, since a copy is simply an append to an empty stream. Therefore, comparisons between the various usages of append, copy and stitching are left out of this discussion.

There are a number of applications where one strategy might work better than another. The choice is left to the discretion of the callers; only the minimum required from the stream store is described here.

Archiving, a file-management capability, is commonly used for export and import. The stream store has metadata as well as data, both of which can be persisted on tier 2 and imported exactly as they were exported.

This functionality is not native to the stream store. Yet high-level primitives on the stream manager interface alleviate concerns about out-of-band access and possible tampering. They also make the store easier to program against.

Until now, the stream store has relied largely on open, close, read and write functionality as it tries to keep up with the demands of the stream processing language. These operations mostly serve saving and analysis, which leaves much of the data transfer in the hands of the administrator.

Data import and export certainly belong outside the analytical applications for the most part, but that does not leave them completely out of automations, particularly those that serve data pipelines. Modern applications increasingly target continuous data, and streams serve very well in data pipelines with a little more enhancement to their programmability.



Sunday, June 28, 2020

Application troubleshooting continued


Ephemeral streams 

Stream processing, unlike batch processing, runs endlessly as new events arrive. Applications use state persistence, watermarks, side outputs and metrics to monitor progress. These collections serve no purpose beyond the processing and are a waste of resources when there are several processors. If they are disposed of at the end of processing, they become ephemeral in nature.

There is no construct in Flink or in the stream store client libraries to clean up automatically at the end of stream processing. The cleanup usually falls on the application. A number of applications don’t do it, and they leave behind lots of stale collections. The problem is aggravated when data is copied from system to system or from scope to scope.

The best way to tackle automatic closure of streams is to tie it to the constructor and destructor, invoking the cleanup from the respective resource providers.

If the resources cannot be scoped because they are shared, then workers are most likely attaching to or detaching from these resources. In such cases the resources can be shut down globally, just like the workers, at the exit of the process.
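Tying cleanup to construction and destruction can be sketched with a scoped resource. The example below is a minimal illustration in Python rather than Flink or stream store client code; ScratchStore and ephemeral are hypothetical names standing in for whatever resource provider creates the collection.

```python
from contextlib import contextmanager


class ScratchStore:
    """Hypothetical resource provider for collections created during processing."""

    def __init__(self):
        self.collections = {}

    def create(self, name):
        self.collections[name] = []
        return self.collections[name]

    def delete(self, name):
        # Deleting a missing collection is a no-op so cleanup can run more than once.
        self.collections.pop(name, None)


@contextmanager
def ephemeral(store, name):
    """Create a collection on entry and always delete it on exit."""
    collection = store.create(name)
    try:
        yield collection
    finally:
        # Runs on normal exit and on exceptions alike.
        store.delete(name)


store = ScratchStore()
with ephemeral(store, "watermarks") as marks:
    marks.append(42)
    alive_inside = "watermarks" in store.collections
alive_after = "watermarks" in store.collections
```

The collection exists only for the lifetime of the with block, so a processor that exits early or fails still leaves nothing stale behind.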

User Interface 

Streaming applications are a new breed. Unlike time series databases, which have popular dashboards and their own query languages, the dashboards for stream processing are constructed one at a time. Instead, they could evolve into a Tableau-like visualization library that decouples the rendering from the continuous data source. Even metrics dashboards and time series databases support a notion of windows and historical queries. They become inadequate for stream processing only because of the query language, which in some cases is proprietary. It would be much better if they could reuse the stream processing language and construct queries that can be fed to applications already running in the Flink runtime.

Each query can be written to an artifact, deployed and executed on the FlinkCluster in much the same way that lambda processing does serverless computing, except that in this case it is hosted on the FlinkCluster. Also, query initiation is a one-time cost, and execution does not bear that cost again and again. Execution proceeds continuously and produces windowed results that can be streamed to the dashboard. Making web requests and responses between the application and the dashboard is easy because the application has network connectivity.

Query Adapters  

Many dashboards from different analysis stacks, such as InfluxDB and SQL, come with their own query language. These queries are not all similar to stream processing, but they can be expressed using Apache Flink. These dashboards can be supported if the queries can be translated. Flink has support for this with tables and batches. Query translation is easier with parametrization and templates for common queries.
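A parametrized template for one common query, a windowed aggregate, might look like the sketch below. It is an assumption-laden illustration: the table and column names are hypothetical, the render_windowed_aggregate helper is not part of any dashboard or Flink API, and the generated text uses Flink SQL's group-window TUMBLE syntax as one possible target.

```python
from string import Template

# Template for a tumbling-window aggregate expressed in Flink SQL.
WINDOWED_AGGREGATE = Template(
    "SELECT TUMBLE_START($time_col, INTERVAL '$size' $unit) AS window_start, "
    "$agg($value_col) AS result "
    "FROM $table "
    "GROUP BY TUMBLE($time_col, INTERVAL '$size' $unit)"
)

ALLOWED_AGGREGATES = {"AVG", "SUM", "MIN", "MAX", "COUNT"}
ALLOWED_UNITS = {"SECOND", "MINUTE", "HOUR"}


def render_windowed_aggregate(table, time_col, value_col, agg, size, unit):
    """Fill the template after validating every parameter (hypothetical helper)."""
    if agg not in ALLOWED_AGGREGATES:
        raise ValueError(f"unsupported aggregate: {agg}")
    if unit not in ALLOWED_UNITS:
        raise ValueError(f"unsupported unit: {unit}")
    if not isinstance(size, int) or size <= 0:
        raise ValueError("window size must be a positive integer")
    for identifier in (table, time_col, value_col):
        # Only plain identifiers are substituted, never free-form text.
        if not identifier.isidentifier():
            raise ValueError(f"invalid identifier: {identifier}")
    return WINDOWED_AGGREGATE.substitute(
        table=table, time_col=time_col, value_col=value_col,
        agg=agg, size=size, unit=unit,
    )


query = render_windowed_aggregate(
    "sensor_readings", "event_time", "temperature", "AVG", 1, "MINUTE"
)
```

A query adapter would map each dashboard query shape it recognizes to one such template and reject the rest, which keeps the translation small and predictable.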

Instead of query adapters, it is also possible to import and export data to external stacks that have dedicated dashboards. Development environments like Jupyter notebooks can also bring in other languages and dashboards if the data is made available. This data import and export logic is usually encapsulated in data transformation operators, commonly called extract-transform-load operators.

Saturday, June 27, 2020

Use of cache by stream store clients


The copyStream operation on the stream store involves the same operations that are invoked as part of a client’s request to read from a stream and write to a stream. In this case, each event is actually copied from the source stream to the destination stream. The copy of all events is fault-tolerant, with pause and resume capability, just as it is for a single event. The stream store takes a snapshot of the stream and proceeds to copy it without waiting for new events at the tail of the source stream. Since the head and tail streamcuts for the stream are clearly demarcated, a batch client can be used to iterate over the events in sequence. Each event read is immediately written to the destination stream before proceeding. Prior to the write, the last event written is checked, and if it is the same as the event about to be copied, the write is skipped. The source stream may contain duplicate events, but during iteration each event has a sequence number, which is distinct even for duplicates. The sequence number, or the time noted for the previous write, can serve as a watermark that moves progressively forward. This detail is internal to the implementation of the stream store; the only requirements are that the selection of events is strictly progressive and that each destination write happens exactly once. The stream store is particularly capable of doing this.
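The watermark idea can be illustrated with a small in-memory sketch. It is not the stream store's implementation: copy_stream, the state dictionary and the fail_after switch are hypothetical, and events are modeled as (sequence number, payload) pairs. A real store would also have to make the write and the watermark update atomic, which this sketch does not attempt.

```python
class CopyInterrupted(Exception):
    """Raised by the sketch to simulate a pause or failure in the middle of a copy."""


def copy_stream(source, destination, state, fail_after=None):
    """Copy events whose sequence number lies beyond the watermark.

    source      -- list of (sequence_number, payload) pairs in sequence order
    destination -- list that receives the copied pairs
    state       -- dict holding the 'watermark', which survives interruptions
    fail_after  -- optional number of writes after which to simulate a failure
    """
    written = 0
    for seq, payload in source:
        if seq <= state["watermark"]:
            continue  # already copied in an earlier attempt, so skip the write
        if fail_after is not None and written == fail_after:
            raise CopyInterrupted(f"stopped before sequence {seq}")
        destination.append((seq, payload))
        state["watermark"] = seq  # move the watermark strictly forward
        written += 1
    return written


# The payload "a" occurs twice, but the sequence numbers keep the events distinct.
source = [(1, "a"), (2, "a"), (3, "b"), (4, "c")]
destination = []
state = {"watermark": 0}

try:
    copy_stream(source, destination, state, fail_after=2)
except CopyInterrupted:
    pass

resumed = copy_stream(source, destination, state)  # picks up after the watermark
repeat = copy_stream(source, destination, state)   # nothing left to copy
```

Because duplicates are told apart by sequence number rather than by payload, the resumed run neither drops the second "a" nor writes any event twice.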
The copyStream operation can be repeated over and over again to produce clones, but if a copy has already been made, repeating the operation against the same destination is idempotent. If additional copies are needed, the existing stream has to be renamed or a different name has to be specified for the new stream. There is a start and an end, and each event is read and written only once, which guarantees progress, so the copy operation is guaranteed to complete, which adds to its robustness.
The semantics of copyStream are sufficient for the append operation as well. In this case, the append occurs only after the last event in the destination stream. The current segment in the destination stream is closed and all the readers and writers are taken offline prior to the copy operation, and when it is complete, the last segment is sealed again.
The copyStream operation can also work with a regular reader and writer because each event read carries envelope information, such as position, that does away with the limitations of the historical reader. These limitations include the need to specify a start and an end to the historical reader. A regular reader and writer, by contrast, are better able to include the latest events in the copy operation. The regular reader can stop after a threshold of inactivity or at a limit imposed by the writer. These operations continue for a duration.
The notion of copying may seem inappropriate for a stream, given the emphasis on continuity, but a stream can be written to a file, and file copy is universally accepted. Therefore, one way to copy a stream is to persist it to a file, copy the file, and import it into another stream. This mechanism is independent of the stream store and loses the authenticity of the streams.
Another approach is to leverage the tier 2 replication strategy to create copies automatically and then promote them to first-class streams of the stream store by adding metadata.

The copyStream functionality could also be implemented directly in a cache or message queue broker that is layered over the stream store. For example, a message broker can queue the segments in the order in which they need to be stitched and, using the store operations, transfer those segments between source and destination. This technique works very well with the methods available from the stream store, and the client is able to take the compute out of the store.
There are tradeoffs between performing the copyStream operation at each of these levels. Performance increases significantly as we move down the stack from the application to the stream store. Flexibility, access control, pause and resume, and other functionality are more readily available higher up the stack than in the stream store.


Friday, June 26, 2020

Use of cache by stream store clients





The stream store does not need anything more complicated than a copyStream operation whose result can then be appended to a stream builder. Each stream already knows how to append an event at the tail. If a collection of events cannot be appended directly, a copyStream operation duplicates the events, and since the copy does not have an identity of its own, it can be stitched to the builder stream by rewriting the metadata. This could also be done event by event, but that is already exposed via the reader and writer interfaces. The copyStream operation on the stream manager can instead handle a collection of events at a time.

Thursday, June 25, 2020

Use of cache by stream store clients ( continued from previous article )


The stitching of segment ranges can be performed in the cache instead of the stream store with the design mentioned yesterday. Even a message queue broker or web-accessible storage can benefit from conflating streams.

As long as the historical readers follow the same order as the segmentRanges in the source stream, the events are guaranteed to be written in the same order in the destination stream. This calls for the historical readers to send their payloads to a blocking queue and for the writer to write in the order in which the payloads appear in the queue. The blocking queue backed by the cache becomes a serializer of the different segmentRanges to the destination stream, and the writes can be very fast when all the events for a segmentRange are available in memory.
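The queue-as-serializer arrangement can be sketched with the Python standard library. The reader and writer functions below are hypothetical stand-ins for the historical reader and the regular writer, segment ranges are modeled as plain ranges of integers, and a single reader thread enqueues them in source order.

```python
import queue
import threading

SENTINEL = object()  # marks the end of the segment ranges


def reader(segment_ranges, out):
    """Enqueue each segment range's payload in source order."""
    for segment_range in segment_ranges:
        out.put(list(segment_range))  # blocks while the bounded queue is full
    out.put(SENTINEL)


def writer(inp, destination):
    """Write payloads in exactly the order in which they were queued."""
    while True:
        batch = inp.get()
        if batch is SENTINEL:
            break
        destination.extend(batch)


ranges = [range(0, 3), range(3, 5), range(5, 9)]
buffer = queue.Queue(maxsize=2)  # bounded, so a slow writer throttles the reader
destination = []

reader_thread = threading.Thread(target=reader, args=(ranges, buffer))
writer_thread = threading.Thread(target=writer, args=(buffer, destination))
reader_thread.start()
writer_thread.start()
reader_thread.join()
writer_thread.join()
```

The queue's first-in, first-out order is what serializes the segment ranges, and its bound provides backpressure when the writer falls behind.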


Stitching streams into a single stream at the stream store differs considerably from the same logic in the clients. The stream store is not only the best place to do so but also the authoritative one. The clients can conflate streams from a source to a destination regardless of whether the destination is the same store from which the streams were read. The stream store, however, is the one that can maintain the integrity of the streams as they are stitched together. The resulting stream is guaranteed to be the true one, compared with the result produced by any other actor.


Conflation at the stream store is an efficient operation because, if the original streams do not need to retain their identity, their events simply become part of the conflated stream. The events are not copied one by one in this case. Each segmentRange of the streams to be conflated simply has its metadata replaced with what corresponds to the new stream. The rewrite of metadata across all the participating streams is then a low-cost, very fast operation.
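A metadata-only conflation can be modeled in a few lines. This sketch assumes a hypothetical StreamStore in which segment data lives in one table and each stream is just an ordered list of segment ids; the real store's metadata is richer, but the principle that no event is copied is the same.

```python
import itertools


class StreamStore:
    """Hypothetical model: segment data is stored once, streams are metadata."""

    def __init__(self):
        self._ids = itertools.count()
        self.segments = {}  # segment id -> list of events (the data)
        self.streams = {}   # stream name -> ordered list of segment ids (the metadata)

    def create_stream(self, name, segments):
        ids = []
        for events in segments:
            segment_id = next(self._ids)
            self.segments[segment_id] = events
            ids.append(segment_id)
        self.streams[name] = ids

    def conflate(self, target, sources):
        """Stitch the sources into target by rewriting metadata only."""
        ids = []
        for name in sources:
            # The source stream gives up its identity; its segments are reused as-is.
            ids.extend(self.streams.pop(name))
        self.streams[target] = ids

    def read(self, name):
        return [event for sid in self.streams[name] for event in self.segments[sid]]


store = StreamStore()
first, second, third = ["a", "b"], ["c"], ["d", "e"]
store.create_stream("s1", [first, second])
store.create_stream("s2", [third])
store.conflate("merged", ["s1", "s2"])
```

The cost of conflate depends on the number of segments, not on the number or size of the events they hold.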



Wednesday, June 24, 2020

Use of cache by stream store clients ( continued from previous article )

The benefits of Async stream readers in cache for data pipelines to stream store
The cache essentially projects batches of segmentRanges so that the entire stream does not have to be scanned from beginning to end repeatedly by every reader. This mode of operation is different from pairing every reader with a writer to the destination.
The cache may use a queue to hold several segmentRanges from a stream.
The choice of queue is extremely important for peak throughput. Using a lock-free data structure instead of an ArrayBlockingQueue can do away with lock contention. The queues work satisfactorily until they are full. Garbage-free async stream readers have the best response time.
Asynchronous reads together with parallelism boost performance. Each segment in the cache may be of significant size, and a writer that transfers a segment over the network may take time on the order of hundreds of milliseconds. Instead, a continuous background import over an adjustable window of stream segments considerably reduces the load on the stream store while still responding to seek requests on segments. Throttling and backpressure can be handled well by the adjustable window.
Another approach is to have a continuous reader read the stream from beginning to end in a loop and have the cache or the writer decide which segments are prioritized to be written to the destination stream store when conflating different streams. This approach requires the cache to discard segments that are not necessary for the store, such as duplicates. The cache or the writer can even empty the blocking queue to signal that more parallelism is needed in the readers to replenish the window of segments it needs.

Tuesday, June 23, 2020

Historical stream readers and exporting events from Stream

The historical readers don’t give the eventPointer record, because they iterate over the events in the segment. It is not advisable to reconstruct the eventPointer records from the segmentId and the sequence number, or from the event size used as an offset. The events in any one segment read by the historical reader will be in sequence, but their eventPointer records are known only to the stream store and are not available to the segmentIterator.
The segmentIterator is also issued one per segment by the historical reader factory. This lets multiple workers read each segment independently. It also means that the sequence will not continue from one worker to the next, which makes the case against using eventPointer records for historical reads, although it is possible to traverse the segments sequentially with one worker. The regular stream readers don’t suffer from this disadvantage, as they read the next event from start to finish, and these boundaries can be adjusted.

The historical reader, also called a batch client because it iterates over a segmentRange, does not give the position of the events either. The only indication that the events returned are in sequence is that the iterator moves from one event to the next within a given segment. The same holds across the segments of its segment range, as long as those segments are iterated sequentially, one by one, by the same worker that instantiated the iterator.
A client application that wishes to parallelize this task simply has to provide non-overlapping, partitioned segment ranges, each indicated by a head and a tail streamcut. This parallelizes the read of the entire stream in terms of partitioned segmentRanges. The interface for the historical reader is available via the BatchClientFactory, but the writer cannot be a historical writer. It has to be a regular writer that writes events one after the other. Therefore, the application reading the events from a segment range in sequence will acquire and hold the writer for the duration of its read. As long as the historical readers follow the same order as the segmentRanges in the source stream, the events are guaranteed to be written to the destination stream in the same order in which they are read. This calls for the historical readers to acquire a lock on the writer for synchronization and to write in the order in which they were assigned. The writer becomes a serializer of the different segmentRanges to the destination stream, and the writes can be very fast when all the events for a segmentRange are available in memory. Typically, this is not feasible, because the number of events and their total size, even in a small segment range, may far exceed the available memory. The events then spill over to local disk after being read, which is yet another inelegant solution.
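The combination of parallel reads and ordered writes can be sketched as follows. This is an in-memory illustration, not BatchClientFactory code: read_range and parallel_copy are hypothetical, streamcuts are modeled as integer offsets, and the sketch deliberately buffers each range in memory, which is exactly the limitation noted above.

```python
from concurrent.futures import ThreadPoolExecutor


def read_range(source, bounds):
    """Read one partitioned segment range; bounds is a (head, tail) pair."""
    head, tail = bounds
    return source[head:tail]


def parallel_copy(source, partitions, destination, workers=4):
    """Read the partitions in parallel and write them in their assigned order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in the order of the inputs, regardless of
        # which read finishes first, so a single writer sees the ranges in order.
        for batch in pool.map(lambda bounds: read_range(source, bounds), partitions):
            destination.extend(batch)


source = list(range(20))
partitions = [(0, 5), (5, 12), (12, 20)]  # non-overlapping and contiguous
destination = []
parallel_copy(source, partitions, destination)
```

Only the reads run concurrently; the loop that consumes the results acts as the single serialized writer, so no explicit lock on the writer is needed in this form.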
The stream store does not provide a way to stitch the stream segmentRanges that are read from the BatchClient. It is also not possible to append one stream to another inside the store. The streamManager provides the ability to create, delete and seal a stream, but no copy function yet. The good thing is that it doesn’t have to. The client applications can map the order in which segmentRanges are read to the order in which they are written without being restricted to one reader and one writer.
As long as the segmentRanges are read and written directly to independent streams on the destination, no memory buffer pool or temporary disk is required. Once the events from the segmentRanges make it to the destination stream store via independent streams, the individual streams can be conflated into a single stream in a separate processing step that happens local to the stream store, and the independent streams can then be done away with. Even the copy operation can be circumvented if the stream store provides the ability to stitch different streams together; until then, it is still cheap to do.