Sunday, July 12, 2020

Reader Group Notifications

Stream Store Notifications:
A stream store is elastic, unbounded and continuous data storage. Events arrive in a stream in the stream store and are serialized by time. A partitioned collection of events that spans a start time t0 and an end time t1 is called a segment. There can be more than one segment between t0 and t1 to allow parallel reads and writes. The segments are spread out over a key space determined by the routing keys used to write the events. Segments do not overlap on the plane of time (horizontal) against key space (vertical), and they need not align on the same time boundaries. Each segment can be started and sealed independently; once a segment is sealed, no more events are appended to it. The number of segments in a stream depends on the scaling policy specified at the time of stream creation. A scaling policy can be fixed, data-based or event-based, so that the size and count of segments follow the anticipated load.
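To make the key-space picture concrete, here is a minimal sketch, assuming (for illustration only, not the store's actual scheme) that routing keys are hashed into the unit interval [0, 1) and that each active segment owns a half-open range of it. `KeySpaceSegment`, `SegmentRouter` and `toKeySpace` are hypothetical names.

```java
import java.util.List;

// Hypothetical model (not the store's API): an active segment owns the
// half-open key range [low, high) of the unit key space [0, 1).
record KeySpaceSegment(long id, double low, double high) {
    boolean owns(double key) {
        return key >= low && key < high;
    }
}

// Hypothetical router over the segments that are active at one point in time.
final class SegmentRouter {
    private final List<KeySpaceSegment> active;

    SegmentRouter(List<KeySpaceSegment> active) {
        this.active = List.copyOf(active);
    }

    // Assumption: a stable hash of the routing key, scaled into [0, 1).
    static double toKeySpace(String routingKey) {
        int nonNegative = routingKey.hashCode() & Integer.MAX_VALUE;
        return nonNegative / ((double) Integer.MAX_VALUE + 1.0);
    }

    // While these segments are active, a given routing key always maps to the same segment.
    KeySpaceSegment route(String routingKey) {
        double key = toKeySpace(routingKey);
        for (KeySpaceSegment segment : active) {
            if (segment.owns(key)) {
                return segment;
            }
        }
        throw new IllegalStateException("active segments do not cover key " + key);
    }
}
```

Scaling in this model would replace one segment's range with two narrower ranges (scale up) or merge adjacent ranges (scale down), while the union of active ranges keeps covering [0, 1).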
This logical representation of a stream is hard for a user to visualize when the data is being read while it is still arriving from a device, on the order of millions of events. The user has the guarantee that events will be written in the order of their arrival and that none of them will be lost during the write and the subsequent read, but the user does not know how far the reading of the segments has progressed or whether all the data has been read. If the user knew the progress, the readers could be scaled accordingly to keep up or to finish reading faster.
The stream store publishes notifications about segments that help the user in this regard. This article describes these notifications, their usage, and potential improvements in the future.
Notifications are distinguished by their type. For example, a well-known stream store publishes a SegmentNotification when the number of segments changes and an EndOfDataNotification when all the streams managed by a reader group have been read. The notifications are posted to the reader group so that the user can take action on the reader group as a whole.
The SegmentNotification is published only when the number of segments changes. Its payload carries the current number of segments and the current number of readers, from which a reader group can determine whether the number of readers should be scaled up or down. The trend in the number of segments gives an indication of the rate at which data is arriving. A reader is bound to a segment until all the events in that segment have been read. Each segment is read by exactly one reader in any reader group configured with that stream. Even as readers are scaled up or down, the stream store maintains the assignment of segments to readers so that segments are balanced uniformly across the active readers. There are at most as many active readers in a reader group as there are segments in the stream; ideally, there is a one-to-one mapping between segments and readers.
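The scaling decision described above reduces to simple arithmetic on the two counts in the payload. The sketch below assumes a hypothetical `SegmentCountPayload` record standing in for the notification (it is not the store's class) and aims for the one-to-one mapping; a real application would likely add its own limits and damping.

```java
import java.util.List;

// Hypothetical stand-in for the SegmentNotification payload: the current
// number of segments and the current number of readers.
record SegmentCountPayload(int numOfSegments, int numOfReaders) {}

final class ReaderScalingAdvisor {
    // Toward the ideal one-to-one mapping: positive means add readers,
    // negative means remove readers, zero means the mapping is already one-to-one.
    static int readerDelta(SegmentCountPayload p) {
        return p.numOfSegments() - p.numOfReaders();
    }

    // There are at most as many active readers as segments; the rest sit idle.
    static int idleReaders(SegmentCountPayload p) {
        return Math.max(0, p.numOfReaders() - p.numOfSegments());
    }

    // Net change in segment count across successive notifications: a rough
    // signal of whether the data arrival rate is rising or falling.
    static int segmentTrend(List<SegmentCountPayload> history) {
        if (history.size() < 2) {
            return 0;
        }
        return history.get(history.size() - 1).numOfSegments() - history.get(0).numOfSegments();
    }
}
```

For example, a payload of 8 segments and 5 readers suggests adding 3 readers, while 4 segments and 6 readers leaves 2 readers idle.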
Notifications are purely a client-side concept: a periodic polling agent that watches the number of segments and the number of readers can raise them, so the stream store does not have to persist any information. Even so, the state associated with a stream can also be synchronized through persistence instead of being retained purely as an in-memory data structure. Persisting it relieves users of keeping track of the notifications themselves, although the last notification is typically the most actionable one and the historical trend only indicates how frequently adjustments are needed.
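A client-side polling agent of this kind needs nothing from the store except the current counts. The sketch below is hypothetical (`GroupCounts`, `CountPoller` and the `Supplier` source are illustrative, not the store's API): it remembers only the last snapshot in memory and raises a notification when the snapshot changes.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical snapshot of what the poller watches.
record GroupCounts(int segments, int readers) {}

// Hypothetical client-side change detector: the store persists nothing for it;
// the poller keeps only the last snapshot in memory.
final class CountPoller {
    private final Supplier<GroupCounts> source;
    private final Consumer<GroupCounts> listener;
    private GroupCounts last; // null until the first poll

    CountPoller(Supplier<GroupCounts> source, Consumer<GroupCounts> listener) {
        this.source = source;
        this.listener = listener;
    }

    // Meant to be called periodically. The first poll always notifies, since
    // there is no previous snapshot. Returns true if a notification was raised.
    boolean pollOnce() {
        GroupCounts current = source.get();
        boolean changed = !Objects.equals(current, last);
        last = current;
        if (changed) {
            listener.accept(current);
        }
        return changed;
    }
}
```

In an application, `pollOnce` would typically be scheduled with `ScheduledExecutorService.scheduleAtFixedRate`. Because only the latest snapshot is compared, two changes that happen between polls collapse into one notification, which matches the observation that the last notification is the most actionable one.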


When notifications are persisted as events in a stream, the publisher-subscriber interface becomes similar to the writer-reader interface that the store already supports, and the stack that analyzes and reports the data can read directly from the stream. If this stream is internal, a publisher-subscriber interface would still be helpful. Keeping the notification stream internal lets the store clean it up as necessary. Persisting the events also helps with offline validation and introspection when assisting with product support.
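The writer-reader analogy can be sketched with an in-memory, append-only log in which each subscriber reads from its own position, the way a reader would. `NotificationLog` is a hypothetical stand-in for an internal notification stream; the truncation step illustrates the clean-up an internal stream allows, under the assumption that entries every known subscriber has consumed may be discarded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for an internal, append-only notification stream.
final class NotificationLog<T> {
    private final List<T> entries = new ArrayList<>();
    private final Map<String, Integer> positions = new HashMap<>(); // subscriber -> next offset

    // Publisher side: behaves like a writer appending an event.
    synchronized void append(T notification) {
        entries.add(notification);
    }

    // Subscriber side: behaves like a reader, continuing from its own position in append order.
    synchronized List<T> readNew(String subscriber) {
        int from = positions.getOrDefault(subscriber, 0);
        List<T> batch = new ArrayList<>(entries.subList(from, entries.size()));
        positions.put(subscriber, entries.size());
        return batch;
    }

    // Because the log is internal, the store may drop entries every known subscriber
    // has consumed. Returns the number of entries removed.
    synchronized int truncateConsumed() {
        if (positions.isEmpty()) {
            return 0;
        }
        int min = Collections.min(positions.values());
        entries.subList(0, min).clear();
        positions.replaceAll((subscriber, offset) -> offset - min);
        return min;
    }
}
```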
Notifications on the reader group are also interesting when readers come and go. Changes in either the number of segments or the number of readers could trigger renewed notifications to the reader group.


Notifications are asynchronous to the activities that create them. Some notifications may be missed altogether, raised incorrectly or raised more often than necessary. Tests for notifications should span all the operations that result in notifications and target all types of notifications and their payloads. Any policy parameters that lead to a different outcome are also candidates for tests.
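The three failure modes above (missed, incorrect, and extra notifications) can be checked mechanically in a test. The hypothetical helper below derives the notifications that should have been raised from the sequence of states observed after each operation, assuming one notification per change, and compares them with what the listener actually received.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical test helper, not part of any store's test suite.
final class NotificationAudit {
    // Expected: one notification per change in state; the first state counts as a change.
    static <S> List<S> expected(List<S> statesAfterEachOperation) {
        List<S> out = new ArrayList<>();
        S previous = null;
        for (S state : statesAfterEachOperation) {
            if (!Objects.equals(state, previous)) {
                out.add(state);
            }
            previous = state;
        }
        return out;
    }

    // Returns "" when the received sequence matches exactly; otherwise describes the first problem.
    static <S> String check(List<S> statesAfterEachOperation, List<S> received) {
        List<S> want = expected(statesAfterEachOperation);
        int common = Math.min(want.size(), received.size());
        for (int i = 0; i < common; i++) {
            if (!Objects.equals(want.get(i), received.get(i))) {
                return "mismatch at " + i + ": expected " + want.get(i) + " but got " + received.get(i);
            }
        }
        if (received.size() < want.size()) {
            return "missed " + (want.size() - received.size()) + " notification(s)";
        }
        if (received.size() > want.size()) {
            return "raised " + (received.size() - want.size()) + " extra notification(s)";
        }
        return "";
    }
}
```

A test would drive the operations (scaling, sealing, readers joining or leaving), record the resulting segment counts and the notifications received, and assert that `check` returns an empty string.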
Notifications are events by nature. All tests applicable to events are also candidates for notifications, with the observation that notifications are raised in response to operations on the control plane. They are not data, and each should ideally be published once and delivered once to each subscriber during its lifetime. A notification serves no purpose after it has been addressed.
That said, there can be many subscribers to the same notification, and every registered subscriber should receive each notification. Depending on scope, purpose and duration, subscribers may register on different interfaces to receive only the notifications they are interested in, which removes the need for filters on notifications. Persisted events follow a different model: they are available for analysis via a stream processing language.
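Registering per notification type, instead of filtering on delivery, can be sketched as a map from type to listeners. The notification records and `TypedNotificationSystem` below are hypothetical, named after the notifications discussed here but not the store's own classes; every listener registered for a type receives each notification of that type.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical notification types, named after the ones discussed above
// but not the store's own classes.
interface GroupNotification {}

record SegmentChanged(int numOfSegments, int numOfReaders) implements GroupNotification {}

record EndOfData() implements GroupNotification {}

final class TypedNotificationSystem {
    private final Map<Class<?>, List<Consumer<Object>>> listeners = new ConcurrentHashMap<>();

    // Subscribers register for one notification type, so delivery needs no filtering.
    <T extends GroupNotification> void addListener(Class<T> type, Consumer<? super T> listener) {
        listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(n -> listener.accept(type.cast(n)));
    }

    // Every listener registered for the notification's type receives it.
    // Named publish rather than notify to avoid confusion with Object.notify().
    int publish(GroupNotification notification) {
        List<Consumer<Object>> targets = listeners.getOrDefault(notification.getClass(), List.of());
        targets.forEach(listener -> listener.accept(notification));
        return targets.size();
    }
}
```

Delivery here is synchronous on the publishing thread; a production notification system would more likely hand each listener its notification on an executor.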
The number of segments in a stream changes when the store auto-scales it. The segments can also be scaled manually with the scaleStream method on the controller. Scaling changes how reads can be parallelized, so the readers in a reader group reading that stream should receive notifications. The poller on the client detects changes in the synchronized state, but that state changes only when the stream store updates it. The reassignment of readers to segments is what signals a notification on the reader group. Therefore, a manual scaling must be followed by the stream store reconciling the reader group before notifications are sent.

One of the reasons notifications are not raised is that no change is detected. This is the case, for example, when a subscriber registers itself after the object to be watched has already been created. Objects tend to do their heavy work at initialization, so that the cost is paid once and avoided during operational use; a change made during that initialization is therefore never seen by a subscriber that registers later.
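One common remedy for late registration is to hand a new subscriber the current state at the moment it registers. The sketch below is hypothetical (`WatchedCounts` is not a store class) and assumes the segment count is the only state being watched; listeners are invoked while holding the lock, which keeps the sketch simple but would need care in real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical watched object whose state may change before anyone subscribes.
final class WatchedCounts {
    private final List<Consumer<Integer>> listeners = new ArrayList<>();
    private Integer segments; // null until initialized

    synchronized void setSegments(int n) {
        if (segments != null && segments == n) {
            return; // no change, no notification
        }
        segments = n;
        for (Consumer<Integer> listener : listeners) {
            listener.accept(n);
        }
    }

    // A late subscriber immediately receives the current state, so the change made
    // during the one-time initialization is not lost.
    synchronized void subscribe(Consumer<Integer> listener) {
        listeners.add(listener);
        if (segments != null) {
            listener.accept(segments);
        }
    }
}
```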

All notifications are based on a record called the ReaderGroupState. It contains useful information pertaining to segments and positions, but the primary thing to note is that it uses a stream for its persistence and versions its changes. This by itself provides much of the desired serialization of state changes and allows the manager to optimize updates by checking the revision.
The ReaderGroupStateManager handles the state update whenever a reader comes online or goes offline. It does this with the help of a StateSynchronizer for the ReaderGroupState. Additionally, this manager knows how to handle a completed segment, how to release a segment when segments are redistributed among readers, and how to determine whether a reader owns multiple segments. It fetches and applies updates as necessary and maintains timers used to compute much of the information associated with a ReaderGroupState.
The StateSynchronizer handles all the low-level operations associated with the state, such as creating a new update, persisting it and retrieving it. The primary purpose of this synchronizer is to coordinate state updates across multiple processes. It needs to keep track of only the current state, as opposed to other components that may compare versions of the state. It knows the segment associated with that state. As usual, each segment belongs to a scoped stream, which is uniquely identified by its fully qualified scope and stream name.
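The revision check that serializes updates can be sketched as a conditional append: a write succeeds only if the writer has seen the latest revision, and on conflict the synchronizer re-fetches and re-applies its update instead of dropping it. `RevisionedLog` and `SimpleStateSynchronizer` are hypothetical, in-memory illustrations of the idea, not the store's classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical revisioned store: an append succeeds only if the caller saw the latest revision.
final class RevisionedLog<S> {
    private final List<S> revisions = new ArrayList<>();

    synchronized int latestRevision() {
        return revisions.size() - 1; // -1 when empty
    }

    synchronized S read(int revision) {
        return revisions.get(revision);
    }

    synchronized boolean conditionalAppend(int expectedRevision, S newState) {
        if (expectedRevision != latestRevision()) {
            return false; // another writer appended first
        }
        revisions.add(newState);
        return true;
    }
}

// Hypothetical synchronizer: keeps only the current state and its revision.
final class SimpleStateSynchronizer<S> {
    private final RevisionedLog<S> log;
    private int revision;
    private S state;

    SimpleStateSynchronizer(RevisionedLog<S> log) {
        this.log = log;
        fetchUpdates();
    }

    // Catch up to the latest persisted revision.
    void fetchUpdates() {
        revision = log.latestRevision();
        state = revision >= 0 ? log.read(revision) : null;
    }

    S getState() {
        return state;
    }

    // Apply the update to the current state; on a revision conflict, re-fetch and retry.
    void updateState(UnaryOperator<S> update) {
        while (true) {
            S next = update.apply(state);
            if (log.conditionalAppend(revision, next)) {
                revision++;
                state = next;
                return;
            }
            fetchUpdates();
        }
    }
}
```

Two synchronizers sharing one log, each incrementing a counter, end at 2 rather than 1: the stale writer's first append is rejected, it catches up, and its update is re-applied on top of the other's.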
The ClientFactory, which creates the readers in a reader group, creates both the reader group state manager and a state synchronizer. Additionally, it creates a revisioned stream client for watermarks. It is the main interface for all applications that want to create readers and writers. When a reader is admitted, all the associated data structures that allow the ReaderGroupState to be created and updated are initialized. The ReaderGroup data structure provides a notifier to the application.
Notifications are raised by a notifier, such as the SegmentNotifier, which polls the synchronizer and retrieves the state. Only if the data has changed does the SegmentNotifier raise a notification to the listener that the application registered with it. The notification is published with the help of a notification system.
The NotificationSystem has a notify method that invokes the listener with the corresponding notification. The application can then use the details in the notification with custom logic to scale readers up or down.
There is one synchronizer associated with each Reader and with each ReaderGroup. The synchronizer is instantiated with a segment and a revisioned stream client. There can be many such clients, just as there can be many synchronizers and reader groups, but writes to the stream are serialized and the revision is checked before an update is made. The stream is internal: its name starts with a prefix denoting that it belongs to the scope of streams reserved for internal use, and it includes the name of the source stream for which the reader and the reader group were created.
Each Reader created by the application references a set of internal readers, called waterMarkReaders, that can read the revisions to each stream. Each application reader maintains a map, keyed by stream name, with one such internal reader per stream. Before reading each event, the reader advances these internal readers to the latest revision.
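The per-stream map and the catch-up before each read can be sketched as follows. `RevisionCursors` is hypothetical, and the internal readers are reduced to integer positions; the `ToIntFunction` that reports each stream's latest revision is an assumption standing in for whatever the store actually consults.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

// Hypothetical: one cursor per stream, keyed by stream name, each advanced to
// the latest revision before the application reader hands out its next event.
final class RevisionCursors {
    private final Map<String, Integer> positionByStream = new HashMap<>();
    private final ToIntFunction<String> latestRevisionOf; // stream name -> latest revision

    RevisionCursors(Iterable<String> streams, ToIntFunction<String> latestRevisionOf) {
        this.latestRevisionOf = latestRevisionOf;
        for (String stream : streams) {
            positionByStream.put(stream, -1); // nothing read yet
        }
    }

    // Called before each read; returns how many streams had new revisions.
    int advanceAll() {
        int advanced = 0;
        for (Map.Entry<String, Integer> entry : positionByStream.entrySet()) {
            int latest = latestRevisionOf.applyAsInt(entry.getKey());
            if (latest > entry.getValue()) {
                entry.setValue(latest);
                advanced++;
            }
        }
        return advanced;
    }

    int position(String stream) {
        return positionByStream.get(stream);
    }
}
```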
The synchronizer maintains revisions, so callers do not need to synchronize on the client. But the revisions must be handled consistently; otherwise a reader created on one thread will not generate a revision for a reader group created on another thread. Usually the reader group and its readers are created on the same thread, and the notifier is always created after both the reader and the reader group have been created.
The number of revisions that observers receive depends on the subsequent reader group state changes. Each reader comes online and goes offline only once, but the number of readers in a reader group can vary. A stream with many segments will have a large number of readers for uniform balancing. The reader group can even show the distribution of readers to segments. The application can use the notifications and the reader distribution to scale the readers up or down.
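A uniform distribution of segments to readers, and the resulting count of active readers, can be illustrated with a round-robin assignment in which per-reader counts differ by at most one. `SegmentBalancer` is hypothetical and static; the store itself rebalances incrementally as readers acquire and release segments, so this shows only the target shape of the distribution.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical balancer: spreads segments over readers round-robin so that
// the number of segments per reader differs by at most one.
final class SegmentBalancer {
    static Map<String, List<String>> assign(List<String> readers, List<String> segments) {
        Map<String, List<String>> byReader = new LinkedHashMap<>();
        for (String reader : readers) {
            byReader.put(reader, new ArrayList<>());
        }
        if (readers.isEmpty()) {
            return byReader;
        }
        for (int i = 0; i < segments.size(); i++) {
            byReader.get(readers.get(i % readers.size())).add(segments.get(i));
        }
        return byReader;
    }

    // Readers without a segment are idle, so there are never more active readers than segments.
    static long activeReaders(Map<String, List<String>> assignment) {
        return assignment.values().stream().filter(owned -> !owned.isEmpty()).count();
    }
}
```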
Since the synchronizer is the only component making revisions, with the help of a revisioned-stream client, it can synchronize the revisions made across processes. The synchronizer, and therefore its client, can be one per reader even as it synchronizes the state change for each reader. The call-out here is that there are three stages to smooth out the notification process.
First, every state change must be detected, such as changes in the number of readers and in the number of segments, from all sources. For example, segment changes can come from both segment sealing and auto-scaling, and changes in the readers can be detected if the reader group is kept notified.
Second, each and every change must be serialized to the revisioned stream. If appends to the stream are skipped because the revision turns out not to be the most recent one, even complete detection will still lead to lossy serialization.
Third, the revisioned stream must be polled and read by the notifier frequently enough that all the events are sent to the notification system. If any of the events are skipped and the application is not informed, the notifications will not be accurate in their sequence. Both each individual notification and the sequence of notifications are useful to the application.
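The third stage differs from a poller that only compares the latest state: a latest-state poller collapses two changes made between polls into one, whereas reading every revision since the last poll preserves both the individual changes and their order. The hypothetical `SequentialNotifier` below uses a plain list as a stand-in for the revisioned stream and assumes a single thread.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical notifier that delivers every revision appended since its last poll,
// in order, instead of only the latest state. Not thread-safe: assumes one thread.
final class SequentialNotifier<S> {
    private final List<S> revisions; // stand-in for the append-only revisioned stream
    private final Consumer<S> listener;
    private int nextRevision = 0;

    SequentialNotifier(List<S> revisions, Consumer<S> listener) {
        this.revisions = revisions;
        this.listener = listener;
    }

    // Returns the number of notifications delivered by this poll.
    int poll() {
        int delivered = 0;
        while (nextRevision < revisions.size()) {
            listener.accept(revisions.get(nextRevision++));
            delivered++;
        }
        return delivered;
    }
}
```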
One of the best ways to detect all segment changes for a reader group is to check the number of segments each time a reader is admitted into the reader group.
This would involve:
Map<SegmentWithRange, Long> segments = ReaderGroupImpl.getSegmentsForStreams(controller, config);
synchronizer.initialize(new ReaderGroupState.ReaderGroupStateInit(config, segments, getEndSegmentsForStreams(config)));
Here, the segments are read from the ReaderGroup, which knows the segment map and the distribution of readers, rather than from the state.
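The admission-time check can be sketched outside the store's code base as a facade that re-reads the segment count from an authoritative source on every admission and notifies when the count differs from the last known value. `AdmissionCheckedGroup` is hypothetical, and the `IntSupplier` stands in for querying the controller; none of this is the store's API.

```java
import java.util.function.Consumer;
import java.util.function.IntSupplier;

// Hypothetical reader-group facade: every admission re-reads the segment count from an
// authoritative source (the IntSupplier stands in for the controller) and notifies on change.
final class AdmissionCheckedGroup {
    private final IntSupplier currentSegmentCount;
    private final Consumer<String> notifier;
    private int knownSegments = -1; // unknown until the first admission
    private int readers = 0;

    AdmissionCheckedGroup(IntSupplier currentSegmentCount, Consumer<String> notifier) {
        this.currentSegmentCount = currentSegmentCount;
        this.notifier = notifier;
    }

    synchronized void admitReader() {
        readers++;
        int segments = currentSegmentCount.getAsInt();
        if (segments != knownSegments) {
            knownSegments = segments;
            notifier.accept("segments=" + segments + ", readers=" + readers);
        }
    }
}
```

A change that happened between admissions, such as a manual scale, is caught at the next admission even if no poller observed it.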