Monday, June 22, 2020

Application troubleshooting continued

Ephemeral streams
Stream processing unlike batch processing runs endlessly as and when new events arrive. Applications use state persistence, watermarks, side outputs, and metrics to monitor progress. These collections serve no other purpose beyond the processing and are a waste of resources when there are several processors. If they are disposed off at the end of processing, they become ephemeral in nature. 
There is no construct in Flink or stream store client libraries to automatically cleanup at the end of the stream processing. This usually falls on the application to do the cleanup. There are a number of applications that don’t. And these leave behind lots of stale collections. It aggravates when data is copied from system to system or scope to scope. 
The best way to tackle auto closure of streams is to tie it to the constructor or destructor and invoking the cleanup from their respective resource providers. 
If the resources cannot be scoped because they are shared, then workers are most likely attaching or detaching to these resources. In such cases they can be globally shut down just like the workers at the exit of the process.
User Interface
The streaming applications are a new breed. Unlike time series database that have popular dashboards and their own query language, the dashboards for stream processing are constructed one at a time. Instead it could evolve into a Tableau or visualization library that decouples the rendering from the continuous data source. Even the metrics dashboard or time series data support a notion of windows and historical queries. They become inadequate for stream processing only because of the query language. In some cases, they are proprietary. It would be much better if they can reuse the stream processing language and construct queries that can be fed to active applications already running in the Flink Runtime. 
Each query can be written to an artifact, deployed and executed on the  FlinkCluster in much the same way as lambda processing does serverless computing except that in this case it is hosted on the FlinkCluster. Also, the query initiation is a one-time cost and its execution will not bear that cost again and again. The execution will result in a continuous manner with windowed results which can be streamed to the dashboard. Making web requests and responses from the application to the dashboard and vice versa is easy because the application has network connectivity.

Sunday, June 21, 2020

File System Management interface exists because users find themselves as manager of their own artifacts. They create a lot of files, then they have to find, organize, secure, service, migrate and perform the same actions on one or more artifacts. Many universal runtimes recognized this need as much as the operating system and the FileSystemObject Interface became widely recognized standard across languages, runtimes and operating systems.
With the move from personal computing to cloud computing, we now have files, blobs and streams. There are a few differences between filesystems and object storage in terms of file operations such as find and grep that are not well-suited for object storage. However, the ability to search object storage is not limited from the API. The S3 API can be used with options such as cp to dump the contents of the object to stdout or with grok. In these cases, it becomes useful to extend the APIs.
 Most developers prefer the filesystem for the ability to save with name and hierarchy. In addition, some setup watch on the file systems. In the object storage, we have equivalents of paths and we can enable versioning as well as retention. As long as there are tools, SDK and API for promoting the object storage, we have the ability to finish it as a storage tier as popular as the filesystem.  There is no more a chore to maintain a file-system mount and the location that it points to. The storage is also virtual since it can be stretched over many virtual datacenters.  The ability to complete some tools such as for grep, SDK and connectors will improve the usage considerably.
File-systems have long been the destination to store artifacts on disk and while file-system has evolved to stretch over clusters and not just remote servers, it remains inadequate as a blob storage. Data writers have to self-organize and interpret their files while frequently relying on the metadata stored separate from the files.  Files also tend to become binaries with proprietary interpretations. Files can only be bundled in an archive and there is no object-oriented design over data. If the storage were to support organizational units in terms of objects without requiring hierarchical declarations and supporting is-a or has-a relationships, it tends to become more usable than files.
Streams is considered a nascent technology but one that is more natural to storage because most storage often sees an append only access to writing data over a long period of time. Stream storage provide abilities to read and write but have yet to provide a stream management interface similar to those seen on file-systems that automate workflows using these containers such as finding, copying etc.  
The Stream data store is equally suited for both a sink and a participant in a data pipeline. The Stream management operations will be suited for the latter. The use of connectors and appenders enhance the use of storage product for data transfers.  Connectors funnel data from different data sources and are typically one per type of source. Perhaps the simplest form of appender is an adapter pattern that appears as a proxy between the stream store as the sender and the application as the receiver. The publisher-subscriber pattern recognizes that receiver may not need to be a single entity and can be a distributed application. The backup pattern buffers the data with scheduled asynchronous writes more suitable for stream store and with little or no disruption to the data sink. The compression and deduplication pattern is about the archival form of data. The replication pattern is about the cloning and syncing of data between similar storage. Each of these workflows translate to stream management layer functions. The implementation of this layer could even be written in the applications that use the client to the stream store. 
Sample Stream workflow automation: https://github.com/ravibeta/JavaSamples/StreamDuplicity

 

 

Friday, June 19, 2020

Application troubleshooting continued


Data Ingestion Path techniques:
 

There are multiple paths of ingestion to the data store within the SDP instance. These paths are via the store client, via the store connector, and via connector used in the maven artifact uploaded to the streaming analytics platform. They do not include external access to the Pravega store which could be from other data pipelines.
In this case, we want to look at all the requirements of a lo long-runningg-running application that generates data using the maven artifact. It is assumed that the data is pulled from different sources over the web programmatically by the application and written to a stream in the SDP instance stream store instance stream store.
The set of techniques described in this document will improve all aspects of serviceability for Flink Applications - present and future by improving
visibility into its execution
serviceability
troubleshooting
backup and restore of application state
and data handling
All analytical applications require data and often preparing the data using some form of extract-transform-load is the bulk of the activities for its successful execution. Although data migration encompasses transfering data from external systems into the Pravega data store within stream analytics platform with or without the use of FlinkApplications, this part is about the features of stream analytics platform that facilitate these data flows 
Stream analytics platform application developers have the dual concern of code and data maintenance. While the code is exclusively hosted in the FlinkRuntime, the data makes its way through multiple paths. This feature set will improve the streamlining of data transfer to the store inside the stream analytics platform while improving the transparency of the hosted code
Most of these techniques can be implemented by the application developers but stream analytics platform can take ownership of data flows and application serviceability from the application developer and provide a smooth execution environment for the running of analytical applications using enhanced features from its store and runtime. 
This way the stream analytics facilitates and persist runtime information that enables analytical applications to run for a long time smoothly.




Thursday, June 18, 2020

Application troubleshooting continued

Permissions and Access Control:
Troubleshooting applications require access control to be eliminated with full permissions in order to see the problem better. Yet, production applications have lots of access controls and permission checks to let the code be executed. This calls for prudence in narrowing down the search over specified snipptes of the application instead of just replacing the whole code with one that has full permissions. 
The stream store enables two different permission set on the controller. The first is the READ permission and the second is the READ-WRITE permission. These permissions serve to separate read-only access from the read-write access on the controller methods and are determined by the credentials passed in via ClientConfig. This configuration forms the basis for using the streamManager, Controller and EventStreamClientFactory which are used to create and manage the streams, seal or truncate the stream and create readers and writers respectively.
The standalone stream store has users specified in ‘./conf/passwd’ file such as with “nonadmin:P@ssw0rd:*,READ;” but it bypasses authentication in emulation mode. In the stream analytics platform hosted on a K8s cluster, this would require a change to /etc/auth-passwd-volume/password-file.txt in the stream store controller deployment.  
We can also test the code paths to see if read-only or read-write access is required:
[main] INFO io.pravega.client.ClientConfig - Client credentials were extracted using the explicitly supplied credentials object.
[main] INFO io.pravega.client.admin.impl.ReaderGroupManagerImpl - Creating reader group: test-reader-group-1 for streams: [StreamImpl(scope=testscope, streamName=teststream)] with configuration: ReaderGroupConfig(groupRefreshTimeMillis=0, automaticCheckpointIntervalMillis=-1, startingStreamCuts={StreamImpl(scope=testscope, streamName=teststream)=UNBOUNDED}, endingStreamCuts={StreamImpl(scope=testscope, streamName=teststream)=UNBOUNDED}, maxOutstandingCheckpointRequest=3)
The application when deployed in production typically only the minimal necessary permission for it to execute.
The configuration is not the only place the credentials are used. In fact, the flink application is not even required to pass in client credentials. Since the application is hosted in the project of the stream analytics platform, it is automatically permitted to use the streams in the stream store that this project has access to. 
The internal access is granted by the service account and the service binding in the Kubernetes catalog and they are registered by the analytics platform.

Wednesday, June 17, 2020

Application troubleshooting continued

The metrics exposed by Flink include: 
Flink supports Counters, Gauges, Histograms and Meters. A counter is used to count something and can be incremented or decremented by a step function. It is registered on a metric group. 
A gauge provides a value of any type on demand. Since it only returns a value and for any given type, it can be registered on the metric group. The reporters will turn the exposed object into a String.  

The Histogram measures the distribution of values. There is no default implementation of one in Flink but it is available from flink-metrics-dropwizard dependency. A meter measures an average throughput and can be registered on the metric group.  Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported.  

Each of the connectors such as Kafka and Kinesis connectors have their own metrics. This can be emulated in the Pravega connector as well. 

Long Running jobs can make use of metrics on connectors, checkpoints, system resources, and Latency 
If we choose the metrics for typical troubleshooting scenarios like long running applications, it becomes easier to investigate the case. Other forms of diagnostics such as logs and events may not always be available or may not support the kind of querying that us easy from metrics dashboard, so having these comes handy.  

The metrics are also available to view via REST API. This makes it convenient to use them dynamically from a variety of applications either for one time analysis or recurring. These APIs are available from both the taskManager and jobManager. Each job also has its set of metrics. The request metrics can be aggregated over a subset of all entities with just a mention in query parameters. 

The taskManagers and the jobManager can both list their metrics. When the metrics are aggregated the json will usually be an array of metrics  

Metrics gathered for each tasks or operator can be visualized in the dashboard. Task metrics are will have a task index and metric name. The operator metrics are listed with name of the operator in addition to subtask and metric name. 

The set of metrics that can be visualized in a dashboard can be of any number since there are no restrictions but typically we need one to form a hypothesis and another to test it with.  

 

Tuesday, June 16, 2020

Application troubleshooting continued

The metrics exposed by Flink include:
Flink supports Counters, Gauges, Histograms and Meters. A counter is used to count something and can be incremented or decremented by a step function. It is registered on a metric group.
A gauge provides a value of any type on demand. Since it only returns a value and for any given type, it can be registered on the metric group. The reporters will turn the exposed object into a String. 
The Histogram measures the distribution of values. There is no default implementation of one in Flink but it is available from flink-metrics-dropwizard dependency. A meter measures an average throughput and can be registered on the metric group.  Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported. 
The system level metrics for jobManager include the following:
numRegisteredTaskManagers
numRunningJobs
taskSlotsAvailable
TaskSlotsTotal
all of which are available as type gauge.
The RocksDB metrics are not available by default.
Those for checkpoint involve
lastCheckpointDuration
lastCheckpointSize
lastCheckpointExternalPath
lastCheckpointRestoreTimestamp    
lastCheckpointAlignmentBuffered
numberOfInProgressCheckpoints
numberOfCompletedCheckpoints
numberOfFailedCheckpoints
totalNumberOfCheckpoints
       And are available only on the jobManager
Each of the connectors such as Kafka and Kinesis connectors have their own metrics. This can be emulated in the Pravega connector as well.
Long Running jobs can make use of metrics on connectors, checkpoints, system resources, and Latency

Monday, June 15, 2020

Application troubleshooting continued

Providing REST API:
StreamProcessing is usually bound to a stream in the stream store which it accesses over the gRPC based connectors. But the usefulness and convenience of making web requests from the application cannot be written off. REST Apis are independently provisioned per resource and the services providing those resources can be hosted anywhere. This flat enumeration of APIs helps with the microservice model and available for mashups within the application. SOAP requires tools to inspect the message. REST can be intercepted by web proxy and displayed with browser and add-on.
SOAP methods require declarative address, binding and contract. REST is URI based and has qualifiers for resources. This is what makes REST popular over gRPC but the latter is more suited for IoT traffic.
There is a technique called HATEOAS where a client can get more information from the web API servers with the help of hypermedia. This technique helps the client to get information outside the documentation by machines instead of humans. Since the storage products are considered commodity, this technique is suitable for plugging in one storage product versus another. The clients that use the web API programmability can then switch services and their resource qualifiers with ease.
Some of the implementors for REST API follow a convention.  This convention makes it easy to construct the APIs for the resources and to bring consistency among those for different resources.
REST Apis also make authentication and authorization simpler to setup and check. There are several solutions to choose from with some involving offloaded functionality or the use of frameworks like Java which can support annotation-based checks for adequate privilege and access control. 
Supported actions are filtered. Whether a user can read or write is determined based on the capability for the corresponding read or write permission. The user is checked for the permission involved. This could be an RBAC access control such that it is just a check against the role for the user. The system user for instance has all the privileges. The capabilities a user has is determined from the license manager for instance or by the explicitly added capabilities. This is also necessary from audit purposes. If the auditing is not necessary, then the check against capabilities could always return true. On the other hand, if all the denies were to be audited, it would leave a trail in the audit log.
One of the best advantages of using REST APIs in application code, is that it can bring in services with little or no dependencies. External services can now be leveraged in the application by simply making a web request. Each invocation of the web request can now generate events and these events along with the data can also be subscribed via a store the persists events or via webhooks. 
In this regard, the application can save both events and data in a stream store. A publisher-subscriber can then be implemented over the stream store.  The persistence of side-outputs and events to a stream enable their querying via stream processing language which would otherwise not be possible.
There are many applications in the K8s marketplace that follow a similar model except that they replace the use of a stream store with a one-off database for their purpose. Some of them also install a separate database such as postgres, badgerdb, and other stores other than the etcd that is available natively on the K8s. These stores come with their own burdens of requiring expertise, troubleshooting and performance tuning which increases the cost for the user. Instead, the use of the stream store scales well to all the runtime events because the events will seldom be as many as the data events.  The use of a stream store with the same model as these products will drive down costs for the end users.