Sunday, June 7, 2020

Application troubleshooting continued

Multiple levels of scaling: 


There are multiple levels of scaling involved. The first is the Kubernetes level. The second is the per-component cluster level. The third is the resources assigned to the application at the application level. Not all scaling is the same: some operations increase replicas while others increase resources per cluster.

If the tasks are partitioned, scaling out the replicas helps with the compute. If the compute is not the bottleneck but the storage is, then the Pravega cluster can be scaled independently of the others. The patching of resources in the PravegaCluster occurs in the following order: Zookeeper, BookKeeper, Pravega Segment Store and lastly Pravega Controller. When these components are scaled, the existing application containers can continue to work without restarting.


A scaling script may be available to help with the sizing, but the following preparation is required in any case:

  • Obtain credentials to access the cluster and admin privileges on the software components


  • Check the health of the clusters in terms of desired and ready members 


  • Check for errors and number of restarts on the containers 


  • Check for the availability of resources to scale specific components or whole clusters 


  • Verify the actual scaling step by checking the number of members and their status

  • After the scaling, check the container logs to ensure that the containers are healthy and running successfully
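
The member and restart checks in the list above can be sketched as a small routine. This is only an illustration: the ComponentStatus type, its field names and the restart threshold are made up here, and in practice the numbers would be read from the Kubernetes API or from kubectl output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ScalingPreflight {
    // Hypothetical snapshot of one component cluster (for example zookeeper or segmentstore).
    static final class ComponentStatus {
        final String name;
        final int desiredMembers;
        final int readyMembers;
        final int restarts;

        ComponentStatus(String name, int desiredMembers, int readyMembers, int restarts) {
            this.name = name;
            this.desiredMembers = desiredMembers;
            this.readyMembers = readyMembers;
            this.restarts = restarts;
        }
    }

    // Returns one message per failed check; an empty list means it looks safe to proceed.
    static List<String> findProblems(List<ComponentStatus> components, int maxRestarts) {
        List<String> problems = new ArrayList<>();
        for (ComponentStatus c : components) {
            if (c.readyMembers < c.desiredMembers) {
                problems.add(c.name + ": only " + c.readyMembers + " of " + c.desiredMembers + " members ready");
            }
            if (c.restarts > maxRestarts) {
                problems.add(c.name + ": " + c.restarts + " container restarts");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<ComponentStatus> snapshot = Arrays.asList(
                new ComponentStatus("zookeeper", 3, 3, 0),
                new ComponentStatus("segmentstore", 3, 2, 4));
        // Print every problem found before any scaling is attempted.
        findProblems(snapshot, 2).forEach(System.out::println);
    }
}
```

The same routine can be run again after the scaling step, since the success criterion is the same: ready members match desired members and restarts stay low.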


A simple check of the dashboard for the analytics and the stream can provide visual confirmation that all the components are healthy. 


It is preferable not to attempt scaling just before an upgrade. Scaling is best done when the components are running smoothly, and the steps for scaling do not affect the operation of the components that are not touched.

Saturday, June 6, 2020

Application troubleshooting continued...

Application Scaling: 

The runtime for stream processing, such as Flink, is deployed in the form of a dedicated cluster along with the necessary software components such as Zookeeper, stream analytics platform components and even clients to the stream store. When the application is deployed using the isolation of a project in the stream analytics platform, all the necessary CPU, memory and disk are specified upfront for all these software components to run. It therefore becomes necessary to predict the resource usage of the application.

This section focuses on just the Flink cluster with its jobManager, taskManager, resourceManager and dispatcher. The jobManager is the master process that controls the execution of a single application. Each application is controlled by a different jobManager and is usually represented by a logical dataflow graph called the jobGraph and a jar file. The jobManager converts the job graph into a physical data flow graph and parallelizes as much of the execution as possible.

There are multiple resource managers, one for each type of cluster that Flink can be deployed on. The resource manager is responsible for managing the task managers' slots, which are the unit of processing resources. It offers idle slots to the jobManager and launches new containers.

The task managers are the worker processes of Flink. There are multiple task managers running in a Flink setup. Each task manager provides a certain number of slots. These slots limit the number of tasks that a task manager can execute.
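
Because slots cap what each task manager can run, a first estimate of the number of task managers is a ceiling division. The sketch below assumes the default slot sharing in Flink, where a job needs about as many slots as its highest operator parallelism. The class and method names are made up for illustration.

```java
class SlotSizing {
    // Number of task managers needed so that the total slot count covers the job parallelism.
    // Assumes every task manager offers the same number of slots.
    static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slotsPerTaskManager must be positive");
        }
        // Integer ceiling of parallelism / slotsPerTaskManager.
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // A job with parallelism 10 on task managers with 4 slots each.
        System.out.println(taskManagersNeeded(10, 4));
    }
}
```

The result is only a lower bound for the compute side; memory and disk per task manager still have to be sized separately.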

The dispatcher runs across job executions. When an application is submitted via its REST interface, it starts a jobManager and hands the application over. The REST interface enables the dispatcher to serve as an HTTP entrypoint to clusters that are behind a firewall.
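
The same REST interface can also be queried for monitoring. The following is a minimal sketch that builds a request for the jobs overview endpoint of the Flink REST API using the Java 11 HTTP client. The base URL is a placeholder, and the real host and port depend on how the project exposes the cluster.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class FlinkRestSketch {
    // Builds a GET request for /jobs/overview.
    // baseUrl is a placeholder such as "http://localhost:8081"; it is not taken from any real deployment.
    static HttpRequest jobsOverviewRequest(String baseUrl) {
        String base = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
        return HttpRequest.newBuilder(URI.create(base + "/jobs/overview"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // Sends the request and returns the JSON body as text. This needs a reachable cluster.
    static String fetchJobsOverview(String baseUrl) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(jobsOverviewRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Only prints the request line; no network call is made here.
        HttpRequest request = jobsOverviewRequest("http://localhost:8081");
        System.out.println(request.method() + " " + request.uri());
    }
}
```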

Since Flink runs as a standalone cluster within the project namespace in the analytics platform, its dashboard is made accessible as-is with a project-namespace-qualified name. This gives us enough indicators of the resource usage of Flink job execution and enables the project user to deploy the project with adequate resources for mission-critical deployments.

It is recommended to scale out the cluster based on the load on the software components. The compute-intensive components will require more members in the cluster while the storage-intensive components may require a larger fileshare allocation.

 


Friday, June 5, 2020

Application troubleshooting continued

Application security:
When a stream processing application is deployed on a stream analytics platform involving a stream store, the application's access to the data goes through several access control checks. These checks depend on several factors:
1) the way the stream store and analytics platform are deployed (both networking and access control)
2) the way the project is set up with members and permissions (which determine whether an application can run)
3) the streams that the application requests access to (both within the project and outside)

The networking aspect is simple to cover. 
Most of the stream store and analytics components such as segmentstore and controller are deployed as services.
Services are of four types:
1) ClusterIP, the default, where service traffic is allowed only inside the K8s cluster
2) NodePort, where the service is exposed on a static port on every node and traffic to that port is proxied to the service
3) LoadBalancer, where an external load balancer is provisioned to route traffic to the service
4) ExternalName, where the service is mapped to a fully qualified DNS name instead of to pods
Regardless of how the service is provisioned, an Ingress resource from K8s is dedicated to securing and provisioning external access according to best practice.
That is why the networking is usually taken care of with Services for internal traffic and Ingress for external access.

Access control is provided with separate credentials for the stream store and for the stream analytics platform.
The stream store access is secured for all applications in the project in a uniform way, but an application can provide the connector configuration with specific credentials such as a username and password to connect to the stream store, or optionally turn off other access-related validations.
The credential used by the application is for the use of the data in the stream store. The credential used to deploy the application pertains to the stream analytics project and requires the project administrator to authorize the user to deploy the application. Typically the user is made a project member, and that role allows access to deploy and run the application.

Thursday, June 4, 2020

Application troubleshooting continued

6) Integration: The analytical and storage software stacks do not necessarily agree in their nature. However, data that can be linked expands in value and usage. A data library that spans heterogeneous data sources and their software stacks is preferred, yet not every stack can be plugged into a library layer.
7) Reliability: Data must always remain available. Unfortunately, storage engineering best practices do not always make it into every product or deployment. Object storage has been found to solve durability and availability of data, which makes it easier to use with the storage servers. But production data does not always use object storage. That data lives in different data stores or content stores and needs to be fetched.
8) Push versus pull semantics: Queries pull the data into the analytics, and the same query can be repeated on a time series data store to keep the charts and graphs on a dashboard constantly up to date. However, message queues may need to be involved so that push and pull semantics can be delegated to the publishers and subscribers themselves. This generally means that the tools of the trade may incur a delay from the source of truth.
9) Pipelines become separate for batch processing and stream processing. Where message queues may be used with batch processing, Apache Kafka is used for stream processing. While pipelines may be converted to Kafka, legacy pipelines will continue to be used. However, Apache Flume enables both RabbitMQ flows and Kafka events to be journaled to S3.
10) Analysis stacks: Each stack may have its own quirks. Grafana works well with a time series database, which means that getting good charts and graphs on a dashboard requires data to flow from its source into a time series database. An object store is not a full-fledged time series database, but it can be made to work as a junction for data in transit between different sources and destinations. A stream store, unlike queues, has rich support for analytics via stream processing languages and frameworks.
These examples show some of the application roles in vectorized execution.

Wednesday, June 3, 2020

Application troubleshooting continued

Application role in vectorized execution:
The analytical software landscape has been trending towards newer forms of pipeline execution. The notion of pipeline execution stems from the fact that data can only be analyzed when it is in a form that the analytics software can understand. This has inevitably meant getting the data into the system, preferably one with a single, unified and infinite storage. Earlier this meant getting data into the system in the form of Extract-Transform-Load with tools long used in the industry. Nowadays, a pipeline is built to facilitate capturing data, providing streaming access and providing infinite storage. Companies are showing increased usage of event-based processing and pipelined execution.
Pipelined execution involves the following stages:    
1) acquisition    
2) extraction    
3) integration    
4) analysis    
5) interpretation    
The challenges facing pipelined execution involve:    
1) Scale: A pipeline must hold against a data tsunami. In addition, data flow may fluctuate and the pipeline must hold against the ebb and the flow. Data may be measured in rate, duration and size, and the pipeline may need to become elastic. Column stores and time series databases have typically helped with this.
2) Heterogeneity: Data types, in both their format and semantics, have evolved to a wide degree. The pipeline must at least support primitives such as variants, objects and arrays. Support for different data stores has become more important as services have become microservices and independent of each other in their implementations, including data storage. Data and their services have a large legacy which also needs to be accommodated in the pipeline. ETL operations and structured storage are still the norm for many industries.
3) Extension: ETL may require flexibility in extending logic and in its usage on clusters versus servers. Microservices are much easier to write. They became popular with Big Data storage. Together they have bound compute and storage into their own verticals, with data stores expanding in number and variety. Queries written in one service now need to be written in another service, while the pipeline may or may not support data virtualization.
4) Timeliness: Both synchronous and asynchronous processing need to be facilitated so that some data transfers can be run online while others may be relegated to the background. Publisher-subscriber message queues may be used in this regard. Services and brokers do not scale the way cluster-based message queues do. It might take nearly a year to fetch the data into the analytics system and only a month for the analysis. While the benefit for the user may be immense, their patience for the overall time elapsed may be thin.
5) Privacy: Users' locations, personally identifiable information and location-based services are required to be redacted. This involves not only parsing for such data but also doing it over and over, starting from the admission control on the boundaries of integration.

Tuesday, June 2, 2020

Application troubleshooting continued...

The Stream is usually not a single sequence of bytes but a co-ordination of multiple parallel segments. Segments are sequences of bytes and are not mixed with anything that is not data. Metadata exists in its own stream and is usually internal.
Flink watermarks are available to view via its web interface or to query via the metrics-system. The value is computed in the standard way of looking for the minimum of all watermarks received by the upstream operators.
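
The minimum rule can be illustrated with a few lines of plain Java. This is not Flink code: the class and method names are made up, and the watermarks are ordinary epoch-millisecond values standing in for what each upstream input last reported.

```java
import java.util.Arrays;

class WatermarkMin {
    // The operator's watermark is the smallest watermark seen across its inputs,
    // because event time cannot be assumed to have passed a point until every input has passed it.
    static long currentWatermark(long[] inputWatermarks) {
        // With no inputs there is nothing to advance on, so report the lowest possible value.
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long[] perInput = {1000L, 700L, 900L};
        System.out.println(currentWatermark(perInput));
    }
}
```

One consequence worth remembering while troubleshooting is that a single slow or idle input holds back the watermark for the whole operator.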
Every event read from a stream also reports the position from which that event was read. This enables a reader to retry reading that event or use it as the start offset to resume reading after a pause. This comes in useful for readers that export data from a stream store to other heterogeneous systems which may not have support for watermarks.
Since an S3 bucket can store any number of objects and they can be listed, it is useful to store each event as an object in the same bucket. The name of the bucket can map to where the events originated in the stream store. For example, an event has an event pointer that refers to its location in the stream and allows us to retrieve just that event. Similarly, the object can be given the event pointer as its prefix so that there is always a one-to-one mapping for the same event between the source and the destination. Since the event pointers maintain sequence in their representation as a string, the prefixes of these objects are also sequential. It is then easy to select a sublist of these objects with a selection criterion in the S3 list request. With the one-to-one mapping, the readers are free to complete the data transfer in any order of events because the objects will be listed sequentially in the S3 listing.
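
The naming idea can be tried out without an S3 endpoint. In the sketch below a TreeMap stands in for the bucket, since S3 returns keys in sorted order, and the list method mimics the prefix and start-after parameters of a list request. The key layout (zero-padded segment number and offset) is an assumption made for illustration, not the actual string form of an event pointer, and the ordering it gives holds within a segment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class EventObjectKeys {
    // Hypothetical key layout: zero padding makes string order match numeric order.
    static String keyFor(String stream, long segment, long offset) {
        return String.format("%s/%010d-%020d", stream, segment, offset);
    }

    // Emulates a sorted listing: keys with the given prefix that sort strictly after startAfter.
    static List<String> list(TreeMap<String, byte[]> bucket, String prefix, String startAfter) {
        List<String> keys = new ArrayList<>();
        for (String key : bucket.tailMap(startAfter, false).keySet()) {
            if (key.startsWith(prefix)) {
                keys.add(key);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        TreeMap<String, byte[]> bucket = new TreeMap<>();
        // Events are written out of order, as parallel readers might do.
        bucket.put(keyFor("sensors", 0, 200), new byte[0]);
        bucket.put(keyFor("sensors", 0, 5), new byte[0]);
        bucket.put(keyFor("sensors", 0, 40), new byte[0]);
        // The listing still comes back in offset order.
        list(bucket, "sensors/", "").forEach(System.out::println);
    }
}
```

Passing the last key already processed as startAfter gives a simple way to resume an export after a pause.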
The stream store accepts data via gRPC. It exposes a client for this purpose. The client is at the same level as the stream store. The use of the client for standalone instances of the stream store is probably the most preferred way to send data to the stream store because it involves no traversal of layers. Connectors, on the other hand, are used to reach a higher layer, which is typical for applications and runtimes hosted on the Kubernetes cluster. This layer sits on top of the stream store and is demonstrated by Flink. Any application hosted on the Flink runtime will work with the streams in the store only via the connector. There is also a layer on top of this one, involving an upstream or downstream heterogeneous storage system that has neither a connector nor a client, where the only data transfer allowed is over the web. There are very few examples of data pipelines connected this way, but this is probably the only user-mode way to convert data from a file to a stream and in turn to a blob; otherwise the conversion is internal to the store and its tier 2.

Monday, June 1, 2020

Application troubleshooting continued

The Stream is usually not a single sequence of bytes but a co-ordination of multiple parallel segments. Segments are sequences of bytes and are not mixed with anything that is not data. Metadata exists in its own stream and is usually internal.
Flink watermarks are available to view via its web interface or to query via the metrics-system. The value is computed in the standard way of looking for the minimum of all watermarks received by the upstream operators.
Every event read from a stream also reports the position from which that event was read. This enables a reader to retry reading that event or use it as the start offset to resume reading after a pause. This comes in useful for readers that export data from a stream store to other heterogeneous systems which may not have support for watermarks.
If the upstream is a stream store and the downstream is also a stream store, those instances come with their clients for readers and writers to utilize source timestamps to maintain order. For example, the writer comes with a method to noteTime and this is persisted in a separate stream. This metadata can be used by readers to determine if an event belongs to the current timeWindow. 
If the upstream store is not a stream store then the writer writing to the stream store can make use of eventTime.
When the task to retry and tolerate faults is implemented by both the reader and writer for the transfer of a single event between source and destination, it becomes a routine that can be made to run on multiple workers in parallel. The need for parallelization is very little when the event transfer is read and written once. The conversion of files to stream to blob is ubiquitous. The conversion within a datastore is also common. 
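
A minimal sketch of such a routine follows, using only the JDK. The Copier interface is a made-up stand-in for "read one event from the source and write it to the destination", and the retry is a plain bounded loop with no backoff. It assumes that repeating a copy is harmless, for example because the destination key is derived from the event position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class EventTransfer {
    // Hypothetical single-event transfer between a source and a destination.
    interface Copier {
        void copy(String event) throws Exception;
    }

    // Calls the task up to maxAttempts times and rethrows the last failure if all attempts fail.
    static <T> T withRetry(Callable<T> task, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }

    // Runs the per-event routine on a fixed pool of workers and returns how many events were copied.
    static int transferAll(List<String> events, Copier copier, int workers, int maxAttempts) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (String event : events) {
                Callable<Boolean> job = () -> withRetry(() -> {
                    copier.copy(event);
                    return true;
                }, maxAttempts);
                results.add(pool.submit(job));
            }
            int copied = 0;
            for (Future<Boolean> result : results) {
                try {
                    if (result.get()) {
                        copied++;
                    }
                } catch (ExecutionException e) {
                    // Retries were exhausted for this event, so it is not counted.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return copied;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3");
        int copied = transferAll(events, e -> System.out.println("copied " + e), 2, 3);
        System.out.println(copied + " events transferred");
    }
}
```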
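o The classes that the JVM loads come from three places: the Java classpath, which includes the JDK libraries and the jars under Flink's lib folder, the Flink plugins from the plugins folder, and the dynamic user code.
o Application profiling can be enabled by passing JVM options through env.java.opts in the Flink configuration.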
o There are parallel features available between Stream analytics software and stream store that can be leveraged by developers to troubleshoot issues in either Flink runtime execution or stream store execution. For example, to get a total count of events, we can use a batchClient from the stream store as follows:
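        // Imports this fragment relies on:
        //   io.pravega.client.BatchClientFactory, io.pravega.client.batch.SegmentRange,
        //   io.pravega.client.stream.Checkpoint, io.pravega.client.stream.StreamCut,
        //   io.pravega.client.stream.impl.JavaSerializer, com.google.common.collect.Lists,
        //   java.util.List, java.util.concurrent.CompletableFuture, java.util.stream.Collectors

        // Take a checkpoint of the reader group and use its stream cut as the starting point.
        Checkpoint cp = readerGroup.initiateCheckpoint("batchClientCheckpoint", executor).join();
        StreamCut streamCut = cp.asImpl().getPositions().values().iterator().next();
        // Collect every segment range between that stream cut and the tail of the stream.
        List<SegmentRange> ranges = Lists.newArrayList(batchClient.getSegments(stream, streamCut, StreamCut.UNBOUNDED).getIterator());
        int totalCountOfEvents = readFromRanges(ranges, batchClient);

        // Counts the events of each segment range asynchronously and sums the per-range counts.
        private int readFromRanges(List<SegmentRange> ranges, BatchClientFactory batchClient) {
            List<CompletableFuture<Integer>> eventCounts = ranges
                    .parallelStream()
                    .map(range -> CompletableFuture.supplyAsync(() -> batchClient.readSegment(range, new JavaSerializer<>()))
                                                   .thenApplyAsync(segmentIterator -> {
                                                       log.debug("Thread " + Thread.currentThread().getId() + " reading events.");
                                                       // Drain the iterator to count the events, then release it.
                                                       int numEvents = Lists.newArrayList(segmentIterator).size();
                                                       segmentIterator.close();
                                                       return numEvents;
                                                   }))
                    .collect(Collectors.toList());
            // Wait for every range to finish and add up the counts.
            return eventCounts.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
        }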