Saturday, January 18, 2020

Communication Protocols between independent programs – a comparison of gRPC versus REST.
The popularity of web protocols has increased over the last decade because they help connect heterogeneous applications and services that can be hosted anywhere. Two popular choices are gRPC and REST. We will use the abbreviations as they are and compare the two as follows:
REST
This is a way of requesting resources from the remote end via standard verbs such as GET, PUT etc.
The advantages are:
Works over the widely deployed HTTP/1.1
Supports subscription mechanisms with REST hooks
Comes with widely accepted tool and browser support
Offers a well-defined path to developing the service that provides this communication
Supports discovery of resource identifiers through subsequent request-response exchanges
Lends itself to software development kits (SDKs) in more than one language for using these communication interfaces
The disadvantages are:
Is considered chatty because a task often takes a number of requests and responses
Is considered heavy because the payload is usually large
Is considered inflexible at times, with versioning adding cost
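As a rough illustration of requesting resources with standard verbs, the sketch below builds (but does not send) a GET and a PUT for the same resource identifier using the JDK's java.net.http API (Java 11 or later). The host example.invalid, the /streams/{id} path and the JSON body are assumptions made up for this example and do not come from any real service.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RestVerbSketch {
    // Hypothetical base URL; ".invalid" is a reserved name that never resolves.
    static final String BASE = "http://example.invalid/v1";

    /** GET reads the current representation of a resource. */
    static HttpRequest read(String id) {
        return HttpRequest.newBuilder(URI.create(BASE + "/streams/" + id))
                .GET()
                .build();
    }

    /** PUT replaces the resource at the same identifier with the given JSON. */
    static HttpRequest replace(String id, String json) {
        return HttpRequest.newBuilder(URI.create(BASE + "/streams/" + id))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest get = read("42");
        HttpRequest put = replace("42", "{\"retentionDays\":7}");
        // Only the verb changes; the resource identifier stays the same.
        System.out.println(get.method() + " " + get.uri());
        System.out.println(put.method() + " " + put.uri());
    }
}
```

Reading and then updating one resource already takes two round trips, which hints at why REST is sometimes called chatty.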
gRPC
This is a way of invoking processing routines on the remote end rather than asking for resources. Routines take the place of verbs and resources, and some treat this style of communication as a refinement of RPC and SOAP, which are now considered legacy.
The advantages are:
Supports high-speed communication because it is lightweight: messages are compact binary, and many calls share one long-lived connection
Messages are encoded with Protocol Buffers, which are known for being efficient in packing and unpacking data
It works over the newer HTTP/2
Well suited to traffic from devices (IoT)
The disadvantages are:
Requires the client to write code
Has no native browser support
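To give a feel for why Protocol Buffers pack data efficiently, here is a minimal sketch of the base-128 "varint" idea that its wire format uses for integers, compared with the same number written as JSON text. The field name "reading" and the value 300 are assumptions chosen for the example. This is not the Protocol Buffers library itself, and a real message would also carry a small field tag.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class VarintSketch {
    /** Encodes a value as a base-128 varint, least significant 7-bit group first. */
    static byte[] encodeVarint(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write((int) value); // last group, continuation bit clear
        return out.toByteArray();
    }

    public static void main(String[] args) {
        long reading = 300; // hypothetical sensor reading
        byte[] binary = encodeVarint(reading);
        byte[] text = ("{\"reading\":" + reading + "}").getBytes(StandardCharsets.UTF_8);
        System.out.println("varint bytes: " + binary.length);
        System.out.println("JSON bytes:   " + text.length);
    }
}
```

Small numbers take one or two bytes in this encoding, while the text form pays for quotes, braces and the field name on every message.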
Both REST and gRPC support secure transport layer communication, which keeps the communication between two parties private. Because corporations have made significant investments in the development of each, both tend to be viable choices for development teams. Supporting both protocols widens the audience and the customer base, and the two do not have to be mutually exclusive given enough resources and time.
Sample implementation: https://github.com/ravibeta/pravega
https://travis-ci.com/ravibeta/pravega

Friday, January 17, 2020


Ideas for a graceful shutdown of an application hosted on Kubernetes orchestration framework (K8s) continued...

    8. Eighth, a StatefulSet has special capabilities, which include the following:

    a. It can be used to create replicas when pods are being deployed. The pods are created sequentially, in order from 0 to N-1. When the pods are deleted, they are terminated in the reverse order.

    b. It can be used for ordered and graceful scaling. All of a pod's predecessors must be ready and running before scaling proceeds.

    c. Before a pod is terminated, all of its successors must be completely shut down.

    The above set of guarantees is referred to as “OrderedReady” pod management.

    There is also “Parallel” pod management, which does not chain the pods.
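The ordering guarantees above can be pictured with a small plain-Java model. It only computes the order in which pods would be created and terminated; it does not talk to Kubernetes. The set name "web" and the replica count are assumptions for the example, and pods are named by appending the ordinal to the set name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class OrderedReadySketch {
    /** Pods come up one at a time, ordinal 0 first. */
    static List<String> creationOrder(String setName, int replicas) {
        List<String> pods = new ArrayList<>();
        for (int ordinal = 0; ordinal < replicas; ordinal++) {
            pods.add(setName + "-" + ordinal);
        }
        return pods;
    }

    /** Termination walks the same pods from the highest ordinal down. */
    static List<String> terminationOrder(String setName, int replicas) {
        List<String> pods = creationOrder(setName, replicas);
        Collections.reverse(pods);
        return pods;
    }

    public static void main(String[] args) {
        System.out.println("create:    " + creationOrder("web", 3));
        System.out.println("terminate: " + terminationOrder("web", 3));
    }
}
```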

    A StatefulSet can also be used to perform rolling updates. This is one case where healthy pods may be terminated: Kubernetes slowly terminates old pods while spinning up new ones. Likewise, if a node is drained, Kubernetes terminates all the pods on that node, and if a node runs out of resources, pods may be terminated to free some. While we discussed SIGTERM and the preStop hook, we have not discussed an appropriate limit for terminationGracePeriodSeconds on the pod spec. This is typically set to 30 or 60 seconds, but it merely has to be greater than the time taken to run all the chained handlers for the termination messages.

    Please note the use of “lifecycle: command:” scripts in postStart and preStop. These should ideally not use “/bin/sh -c” because the shell does not pass the messages on. It is preferable to use either dumb-init or an actual executable that handles the ^C event.

  9. When a software product comprises multiple independent applications, each application may get a message from the infrastructure. The application then handles the message as appropriate regardless of who sent it. However, applications also tend to have coordinators in a cluster-based deployment model. In such a case, the coordinator might know a better way to shut down the application gracefully. For example, “./bin/flink stop” is a better way to shut down a long-running analytical application. This gives the coordinator a chance to relay any additional commands along with the shutdown, and the application a chance to piggyback a suitable response to the coordinator. The infrastructure message then takes the form of communication in the layer above, which the participating applications and the coordinator know best how to handle. The distributed model is especially beneficial for graceful shutdown because different roles in the cluster can now share the preparation chores for the shutdown, whether specific to that application or global. In such cases, the cleanup also provides an opportunity to save state for better and more efficient post-shutdown activities.

  10. Finally, a software product can choose to alleviate inefficiencies in the distribution of termination messages by providing a one-publisher, one-subscriber model. The publisher will inevitably be the infrastructure, while the subscriber will be a component of the product. The termination message is always an interrupt and is most efficiently routed to a single destination that can guarantee a graceful shutdown. Efficiency in this case is not so much about the cost of communication as it is about increasing reliability and data safety during the graceful shutdown procedure by doing the necessary minimum.
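A minimal sketch of the one-publisher, one-subscriber idea follows. The class name and the choice to deliver the termination message at most once are assumptions of this example, not something the infrastructure prescribes. The point is only that a single destination owns the shutdown and that repeated interrupts do no extra work.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class TerminationChannelSketch {
    private final Runnable subscriber; // the single component that owns shutdown
    private final AtomicBoolean delivered = new AtomicBoolean(false);

    TerminationChannelSketch(Runnable subscriber) {
        this.subscriber = subscriber;
    }

    /** Called by the publisher (the infrastructure); returns true only for the first delivery. */
    boolean publishTermination() {
        if (delivered.compareAndSet(false, true)) {
            subscriber.run();
            return true;
        }
        return false; // repeated interrupts are ignored
    }

    public static void main(String[] args) {
        TerminationChannelSketch channel =
                new TerminationChannelSketch(() -> System.out.println("shutting down gracefully"));
        System.out.println("first delivery:  " + channel.publishTermination());
        System.out.println("second delivery: " + channel.publishTermination());
    }
}
```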
These are some of the techniques used for the purpose of a graceful shutdown of an application hosted on the Kubernetes orchestration framework. 
  
#Apache Flink splits events into windows:
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
                .allowedLateness(Time.milliseconds(1))

This is used to separate the events into windows.
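To make the effect of a session gap concrete, here is a plain-Java model of the idea. It is not Flink code and it ignores watermarks and allowed lateness. It assumes the event times are already sorted and, as a choice of this sketch, starts a new session whenever the pause since the previous event is longer than the gap.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionGapSketch {
    /** Splits ascending event times into sessions; a pause longer than gapMillis starts a new one. */
    static List<List<Long>> sessions(List<Long> sortedTimes, long gapMillis) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : sortedTimes) {
            if (!current.isEmpty() && t - current.get(current.size() - 1) > gapMillis) {
                result.add(current); // close the session that just ended
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical event times in milliseconds.
        System.out.println(sessions(List.of(1L, 2L, 3L, 10L, 11L), 1));
    }
}
```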

Thursday, January 16, 2020

Ideas for a graceful shutdown of an application hosted on Kubernetes orchestration framework (K8s) continued...
Even a script launched with dumb-init allows the message propagation to the programs as shown below:
#!/usr/bin/dumb-init /bin/sh
aBackgroundProcess &  # launch a process in the background
aForegroundProcess      # launch another process in the foreground
And the change to the Dockerfile is minimal as shown below:
ENTRYPOINT ["/usr/local/bin/dumb-init", "--"]
CMD ["path/to/file"]
Note that the JSON syntax above is necessary and the path to the file should be a full literal; otherwise the expanded “sh -c” does not propagate the message.
The terminationGracePeriodSeconds, if specified, must be greater than the duration of the wait within any preStop hook execution.

Wednesday, January 15, 2020

Ideas for a graceful shutdown of an application hosted on Kubernetes orchestration framework (K8s) continued...

6. Sixth, the technique of transparently passing the graceful shutdown message through the system and the application hosted on it to only the user activities could be sufficient, since the user activities are the only unknown. They could be long-running jobs with or without checkpoints and savepoints. The ability to send a command such as ./bin/flink stop -d "jobID" during Flink container shutdown helps to avoid losing state and allows the runtime to take the necessary steps prior to exit. User-mode activities take precedence over system activities because the latter are already fairly robust and handled by the specifications given to K8s.

7. Seventh, when the user process has been sufficiently enhanced to take care of long-running jobs, the invocation of containers can be addressed next. This includes using a dumb-init program to run as PID 1 in the minimal containers that are popular on Kubernetes infrastructure. When a process runs directly as PID 1, the operating system gives it special treatment: a signal for which the process has not installed a handler is not acted upon by default. Most processes do not account for this special treatment, so the operating system does not terminate them on SIGTERM. A relay such as dumb-init, which becomes the PID 1 process, forwards SIGTERM and the other catchable signals to the process for which the container was launched, so that this process gets the ordinary treatment it needs for shutdown. SIGKILL cannot be caught or relayed; the kernel delivers it directly.
Even a script launched with dumb-init allows the message propagation to the programs as shown below:
#!/usr/bin/dumb-init /bin/sh
aBackgroundProcess &  # launch a process in the background
aForegroundProcess      # launch another process in the foreground
And the change to the Dockerfile is minimal as shown below:
ENTRYPOINT ["/usr/local/bin/dumb-init", "--"]
CMD ["path/to/file"]

Tuesday, January 14, 2020

Ideas for a graceful shutdown of an application hosted on Kubernetes orchestration framework (K8s) continued...

4. Fourth, the application translates the SIGTERM message to a control plane message for all the components.
This is the re-use of the pattern that ‘preStop’ was designed with, but at different scopes. An application might consist of different software programs as components. The programs may even come from different vendors. The ‘preStop’ lifecycle hook in the chart pertaining to the deployment specification of the program involved might have a step that relays the command to the program. For example, a ‘preStop’ handler such as ["/usr/bin/nginx", "-s", "quit"] allows graceful termination of the ‘nginx’ program by translating the preStop into something that the program understands, which in this case is the command quit. The specification could instead be one that relays commands to participants with varying scopes of influence so that each does what is required to exit gracefully.

5. Fifth, the graceful shutdown is passed transparently to only the user activities, since they could be long-running jobs with or without checkpoints and savepoints. The ability to send a command such as ./bin/flink stop -d "jobID" during Flink container shutdown helps to avoid losing state and allows the runtime to take the necessary steps prior to exit. User-mode activities take precedence over system activities because the latter are already fairly robust and handled by the specifications given to K8s.
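The translation described in the fourth and fifth techniques can be sketched as a lookup from component to the command that component understands as "exit gracefully". The two commands are the ones quoted in the text; the component names used as keys, the class name and the job id placeholder are assumptions for illustration. The sketch only prints the commands and does not execute anything.

```java
import java.util.List;
import java.util.Map;

final class ShutdownCommandSketch {
    /** Maps each component to the command it understands as "exit gracefully". */
    static Map<String, List<String>> commands(String flinkJobId) {
        return Map.of(
                "nginx", List.of("/usr/bin/nginx", "-s", "quit"),
                "flink", List.of("./bin/flink", "stop", "-d", flinkJobId));
    }

    /** Translates one termination message into the command for one component. */
    static List<String> translate(String component, String flinkJobId) {
        List<String> command = commands(flinkJobId).get(component);
        if (command == null) {
            throw new IllegalArgumentException("no shutdown command for " + component);
        }
        return command;
    }

    public static void main(String[] args) {
        // Printing only; a real relay would hand each list to ProcessBuilder.
        for (String component : List.of("nginx", "flink")) {
            System.out.println(component + " -> " + String.join(" ", translate(component, "jobID")));
        }
    }
}
```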

The preStop and postStart are lifecycle hooks on the container. They are helpful to have for deployment, but they do not guarantee that a new pod will come up only after an old one is closed. Even if we set the hooks, there can be other containers with the same spec. A better way to ensure that no new containers are spun up while the old one exits is to use the StatefulSet, which is designed for this purpose.

#codingexercise
Notice the difference between KeyedDataStream and DataStream. 
The logic to analyze the stream changes from:
input.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<String>) new LogTimestampAssigner()) 
                .flatMap(new ExceptionTimestampExtractor()) 
                .keyBy(new TimestampExceptionKeySelector()) 
                .process(new LogExceptionKeyedProcessFunction()) 
                .print(); 

to:
input.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<String>) new LogTimestampAssigner()) 
                .flatMap(new ExceptionTimestampExtractor()) 
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(1))) 
                .allowedLateness(Time.milliseconds(1)) 
                .process(new LogExceptionProcessFunction()) 
                .print(); 

Also, the timerService is no longer available in the context. 
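import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class LogExceptionProcessFunction extends ProcessWindowFunction<Tuple2<String, String>, LogException, String, TimeWindow> {
    // State handle assigned in open(); CountWithTimestamp is a nested class that is not shown here.
    private transient ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", LogExceptionProcessFunction.CountWithTimestamp.class));
    }

    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, String>> elements, Collector<LogException> out) throws Exception {
        // ... (body elided)
        // The window context has no timerService, so this call is no longer possible:
        // context.timerService().registerProcessingTimeTimer(current.lastModified + 30000);
    }
}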

Monday, January 13, 2020

Ideas for a graceful shutdown of an application hosted on Kubernetes orchestration framework (K8s) continued...
2. Second, the application-specific actions to save state.
The application exits with an internal server error when it encounters an internal failure. This causes its health-check probe to fail, which lets the Kubernetes host deregister this pod from the service's active list. This is no different from what an application would do when it encounters a SIGTERM message. The incoming traffic to the application stops when K8s decides or when the readiness probe fails; the application has no control over the incoming traffic. It will often fail to process current requests when such a message is received. There can be a number of incoming requests that become current after the SIGTERM was issued and during the grace period. This may be due to many reasons and cannot be avoided. The message may also never reach the application. For example, the Alpine /bin/sh shell does not even pass SIGTERM on to the program it launched. This is a common pitfall when a Dockerfile ends with “CMD myapp”. Instead, the form CMD ["myapp"] may be used. Applications written in the Go language can register for syscall.SIGTERM rather than only os.Interrupt.
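Since requests can still arrive after SIGTERM, one possible way for the application to protect itself is a small gate that turns away new work once termination has begun and reports when the already admitted requests have finished. The sketch below is an assumption about how such a gate might look; the class and method names are hypothetical, and wiring it to a real server or signal handler is left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainGateSketch {
    private final AtomicBoolean terminating = new AtomicBoolean(false);
    private final AtomicInteger inFlight = new AtomicInteger(0);

    /** Returns false once termination has begun, so the caller can reject the request. */
    boolean tryBegin() {
        inFlight.incrementAndGet();
        if (terminating.get()) {
            inFlight.decrementAndGet(); // not admitted after all
            return false;
        }
        return true;
    }

    /** Marks one admitted request as finished. */
    void end() {
        inFlight.decrementAndGet();
    }

    /** Called from the path that handles the termination message. */
    void beginTermination() {
        terminating.set(true);
    }

    /** True when termination has begun and no admitted request is still running. */
    boolean drained() {
        return terminating.get() && inFlight.get() == 0;
    }

    public static void main(String[] args) {
        DrainGateSketch gate = new DrainGateSketch();
        System.out.println("admitted before termination: " + gate.tryBegin());
        gate.beginTermination();
        System.out.println("admitted after termination:  " + gate.tryBegin());
        gate.end();
        System.out.println("drained: " + gate.drained());
    }
}
```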
3. Third, the out-of-band packaging of state in a read-only manner.
Applications wishing to save state, such as for backup, uninstall or out-of-band dynamic requests, will usually trigger it in their components so that each can do so in its proprietary format. Since the application may track user resources per session, it is possible for the application to collect the state corresponding to the user in a session from all of the components involved and package it in a manner that it can export and use with another instance of the application on another host, or even with an upgrade of the application on the same host. The tool that packs and unpacks this state from the resources used to serve the users’ requests can even be external to the application, so that the application can remain online and focus on the business needs of the user rather than the operational needs. Alternatively, the routines to save state may run on background workers of the application. The packing and unpacking of state allows a restore either local or remote to the origin.
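As a rough sketch of packing and unpacking, the example below collects one string of state per component for a session, writes it to a portable byte array and reads it back. The use of java.util.Properties as the container and the component names are assumptions of this example; a real application would use its own proprietary format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class SessionStatePackSketch {
    /** Packs per-component state for one session into a portable byte array. */
    static byte[] pack(Map<String, String> stateByComponent) {
        Properties props = new Properties();
        props.putAll(stateByComponent);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            props.store(out, "session state");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Unpacks the bytes, for example on another host or after an upgrade. */
    static Map<String, String> unpack(byte[] packed) {
        Properties props = new Properties();
        try {
            props.load(new ByteArrayInputStream(packed));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> state = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            state.put(key, props.getProperty(key));
        }
        return state;
    }

    public static void main(String[] args) {
        // Hypothetical components and their state for one user session.
        byte[] packed = pack(Map.of("storage", "128", "cache", "warm"));
        System.out.println(unpack(packed));
    }
}
```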

#codingexercise
Flink allows timestamps to be assigned, but when we assign timestamps we need to ensure their distinctness and order. For example:
package com.dellemc.pravega.app;

import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class LogTimestampAssigner implements org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks<String>, org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks<String> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return null;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String s, long l) {
        return null;
    }

    @Override
    public long extractTimestamp(String s, long l) {
        return System.currentTimeMillis();
    }
}

will assign timestamps for the following string entries:
2019-12-23 19:40:23,909 ERROR Line1
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
2019-12-23 19:40:24,557 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.,
as follows:
[KeyedProcess -> Sink: Print to Std. Out (1/1)] ERROR com.dellemc.pravega.app.LogExceptionKeyedProcessFunction - s=2019-12-23 19:40:23,909,timestamp=1578924603182
[KeyedProcess -> Sink: Print to Std. Out (1/1)] ERROR com.dellemc.pravega.app.LogExceptionKeyedProcessFunction - s=2019-12-23 19:40:24,557,timestamp=1578924603558
[KeyedProcess -> Sink: Print to Std. Out (1/1)] ERROR com.dellemc.pravega.app.LogExceptionKeyedProcessFunction - s=2019-12-23 19:40:23,909,timestamp=1578924603558
[KeyedProcess -> Sink: Print to Std. Out (1/1)] ERROR com.dellemc.pravega.app.LogExceptionKeyedProcessFunction - s=2019-12-23 19:40:23,909,timestamp=1578924603558
where the first value is the timestamp used to group the entries using ExceptionTimestampExtractor() and the second is the one assigned by LogTimestampAssigner.
The results remain  unchanged when converting them to LogExceptions with stacktraces:
LogException{id='c9685d77-8c07-40ed-890d-1101b744df11', timestamp='2019-12-23 19:40:23,909', data='2019-12-23 19:40:23,909 ERROR Line1, at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546), at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421),'}
LogException{id='0685d218-d7ce-43b6-823c-6bd9f40abcc2', timestamp='2019-12-23 19:40:24,557', data='2019-12-23 19:40:24,557 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.,'}
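The log output above shows several entries receiving the same assigned timestamp (1578924603558), since System.currentTimeMillis() has only millisecond resolution. If distinct and ordered timestamps are wanted, one option is to bump a candidate value whenever it would not be strictly larger than the last one handed out. The sketch below is plain Java under that assumption; the class name is hypothetical and it is not part of the assigner shown earlier.

```java
final class MonotonicTimestampSketch {
    private long last = Long.MIN_VALUE;

    /** Returns the candidate, bumped if needed so each result is strictly larger than the previous one. */
    synchronized long next(long candidateMillis) {
        last = (candidateMillis > last) ? candidateMillis : last + 1;
        return last;
    }

    public static void main(String[] args) {
        MonotonicTimestampSketch clock = new MonotonicTimestampSketch();
        // Three calls in quick succession still yield three distinct, increasing values.
        for (int i = 0; i < 3; i++) {
            System.out.println(clock.next(System.currentTimeMillis()));
        }
    }
}
```

The price of this choice is that a bumped timestamp can run slightly ahead of the wall clock.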

Sunday, January 12, 2020

Ideas for a graceful shutdown of an application hosted on Kubernetes orchestration framework (K8s)
Software that serves mission-critical commercial deployments, such as hosting the web pages for a company, is expected to run 24x7 without interruptions. The Kubernetes orchestration framework, or K8s for short, enables this mode of operation by dynamically providing layers of runtime, operating system, containers, pods and hosts, so that even if there is a failure in one, another may come online without any disruption to traffic from customers. This is the expectation from a fault-tolerant system.
Applications are written primarily to meet business requirements and serve the customer rather than to bear the onus of being fault-tolerant. Applications hosted on K8s these days also differ from their predecessors written for traditional systems, which were monolithic, bound to immovable systems, prone to failures and often in need of external monitoring software. K8s has lightened the load on the application, even doing away with the requirement for monitoring. Instead, applications can opt to sign up for SIGTERM messages so that they can shut down gracefully. Most applications have logic built in for backups or uninstalls, and the routine to handle this message is no different from them. The software to perform a backup or uninstall of an application is usually packaged with a tool or an installer that knows how to shut down the application, either forcefully with loss of data or gracefully by performing the steps that let the application save its state prior to exit.
This article describes a few techniques for the graceful shutdown:
1. First, the Kubernetes-compliant way for a graceful shutdown:
Handling the order in which its components shut down is the additional piece the application needs in order to handle this message from the K8s framework. Once this routine is written, it makes no difference whether it is invoked by a command-line tool, an installer or K8s. The application also provides a centralized command mechanism to execute this operation. Most of the processing in the components pertains to computing, storage and networking.
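A minimal sketch of such a routine is shown below. It keeps a single shutdown method that any caller can use and registers it as a JVM shutdown hook, which the JVM runs on normal exit and when it receives SIGTERM. Stopping components in the reverse of their start order, and the component names themselves, are assumptions of this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class OrderedShutdownSketch {
    private final Deque<String> started = new ArrayDeque<>(); // most recently started first
    private final List<String> stopped = new ArrayList<>();

    synchronized void start(String component) {
        started.push(component);
    }

    /** The one routine every caller uses: CLI tool, installer or signal hook. Safe to call twice. */
    synchronized List<String> shutdown() {
        while (!started.isEmpty()) {
            stopped.add(started.pop()); // reverse of start order
        }
        return new ArrayList<>(stopped);
    }

    public static void main(String[] args) {
        OrderedShutdownSketch app = new OrderedShutdownSketch();
        app.start("storage");
        app.start("compute");
        app.start("networking");
        // The hook prints the stop order when the JVM exits or is sent SIGTERM.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.out.println("stopped: " + app.shutdown())));
    }
}
```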
There are steps taken by K8s before sending this message to the application. This is referred to as the K8s termination lifecycle. First, it stops the pod from receiving more traffic from customers by setting it to a terminating state. Then it invokes the ‘preStop’ hook, such as an HTTP request sent to the containers in a pod, which is a way to allow application deployers to take action for termination when there is no application logic to handle the SIGTERM. After ‘preStop’, K8s sends the SIGTERM to the containers. The grace period is usually about thirty seconds and starts counting immediately after the ‘preStop’ request is sent. This grace period can be customized to a preset limit. There is no exponential back-off in this case since the shutdown request is imperative. Finally, after the expiration of the timer, the SIGKILL message is sent and the pod is forcibly removed. After this, all the resources Kubernetes maintained for the pod are cleaned up.
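The practical consequence of the grace period is that cleanup has a deadline. The sketch below models that deadline inside one JVM: it runs a cleanup task on a worker thread and reports whether it finished in time. The durations are made up, and Thread.sleep stands in for real cleanup work; the forced removal by K8s is only mimicked by interrupting the worker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class GracePeriodSketch {
    /** Runs cleanup on a worker thread and reports whether it finished within the grace period. */
    static boolean finishesWithin(Runnable cleanup, long graceMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<?> result = worker.submit(cleanup);
        try {
            result.get(graceMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // grace period expired; the forced kill would follow
        } catch (ExecutionException e) {
            return false; // cleanup itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdownNow(); // interrupts cleanup that is still running
        }
    }

    /** Stand-in for real cleanup work that takes the given time. */
    static Runnable work(long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("fast cleanup fits: " + finishesWithin(work(10), 1000));
        System.out.println("slow cleanup fits: " + finishesWithin(work(2000), 50));
    }
}
```

Measuring the cleanup this way during testing gives a lower bound for the grace period to configure.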