Tuesday, January 14, 2020

Ideas for a graceful shutdown of an application hosted on the Kubernetes orchestration framework (K8s), continued...

4. Fourth, the application translates the SIGTERM message into a control plane message for all of its components.
This reuses the pattern that the ‘preStop’ hook was designed with, but at different scopes. An application might consist of several software programs as components, and the programs may even come from different vendors. The ‘preStop’ lifecycle hook in the chart for the deployment specification of a given program might include a step that relays the command to that program. For example, a ‘preStop’ handler such as ["/usr/bin/nginx", "-s", "quit"] allows graceful termination of the ‘nginx’ program by translating the preStop event into something the program understands, which in this case is the quit command. The specification could instead relay commands to participants with varying scopes of influence, so that each does what is required to exit gracefully.
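
The relay idea in item 4 can be sketched in plain Java. This is only an illustration under stated assumptions: the ShutdownRelay class, its Component interface, and the "quit" command string are hypothetical names, not part of Kubernetes or of any vendor's program. The only platform behavior it relies on is that the JVM runs its shutdown hooks when the process receives SIGTERM.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: relays one shutdown request to every registered component. */
final class ShutdownRelay {

    /** Hypothetical contract for a component that knows how to quit gracefully. */
    interface Component {
        void handle(String command);
    }

    private final List<Component> components = new ArrayList<>();

    void register(Component component) {
        components.add(component);
    }

    /** Sends the command to each component in registration order; returns how many handled it. */
    int relay(String command) {
        int notified = 0;
        for (Component component : components) {
            try {
                component.handle(command);
                notified++;
            } catch (RuntimeException e) {
                // One failing component should not keep the others from shutting down.
                System.err.println("component failed to handle " + command + ": " + e);
            }
        }
        return notified;
    }

    /** Registers a JVM shutdown hook; the JVM runs it on SIGTERM and on normal exit. */
    void installSigtermHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> relay("quit"), "shutdown-relay"));
    }

    public static void main(String[] args) {
        ShutdownRelay relay = new ShutdownRelay();
        relay.register(cmd -> System.out.println("web tier received: " + cmd));
        relay.register(cmd -> System.out.println("job runner received: " + cmd));
        relay.installSigtermHook();
        // When main returns, the hook fires and both components see "quit".
    }
}
```

Each component decides what "quit" means for it, which mirrors how the preStop handler lets nginx interpret its own quit command.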

5. Fifth, transparently pass the graceful shutdown only to user activities, since they could be long-running jobs with or without checkpoints and savepoints. The ability to send a command such as ./bin/flink stop -d "jobID" during Flink container shutdown helps avoid losing state and lets the runtime take the necessary steps prior to exit. User-mode activities take precedence over system activities because the latter are already fairly robust and are handled by the specifications given to K8s.
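
As a rough sketch of item 5, a shutdown step written in Java could assemble and run the same flink stop command. The FlinkStopCommand class, its method names, and the flinkHome parameter are assumptions made for illustration; only the command line itself (./bin/flink stop -d with a job ID) comes from the text above.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that prepares the "flink stop" call a shutdown step could run. */
final class FlinkStopCommand {

    /** Builds the argument list for: <flinkHome>/bin/flink stop -d <jobId> */
    static List<String> build(String flinkHome, String jobId) {
        if (jobId == null || jobId.trim().isEmpty()) {
            throw new IllegalArgumentException("jobId must not be empty");
        }
        return Arrays.asList(flinkHome + "/bin/flink", "stop", "-d", jobId);
    }

    /** Runs the command and returns its exit code; the caller decides what to do on failure. */
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .inheritIO() // surface the CLI output in the container log
                .start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: FlinkStopCommand <jobId>");
            return;
        }
        // The install path "." is an assumption; it matches the relative ./bin/flink form.
        List<String> command = build(".", args[0]);
        System.out.println("exit code: " + run(command));
    }
}
```

Keeping the command construction separate from its execution makes it possible to check the arguments without a Flink installation at hand.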

The preStop and postStart are lifecycle hooks on the container. They are helpful to have for a deployment, but they do not guarantee that a new pod will come up only after an old one is closed. Even if we set the hooks, there can be other containers with the same spec. A better way to ensure that no new containers are spun up while the old one exits is to use a StatefulSet, which is designed for this purpose.

#codingexercise
Notice the difference between processing a KeyedStream directly and processing it through a window (a WindowedStream).
The logic to analyze the stream changes from:
input.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<String>) new LogTimestampAssigner()) 
                .flatMap(new ExceptionTimestampExtractor()) 
                .keyBy(new TimestampExceptionKeySelector()) 
                .process(new LogExceptionKeyedProcessFunction()) 
                .print(); 

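To:
input.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<String>) new LogTimestampAssigner())
                .flatMap(new ExceptionTimestampExtractor())
                // window() is defined only on a KeyedStream; a non-keyed DataStream would need windowAll() and a ProcessAllWindowFunction.
                // Assumes TimestampExceptionKeySelector yields the String key that LogExceptionProcessFunction declares.
                .keyBy(new TimestampExceptionKeySelector())
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
                .allowedLateness(Time.milliseconds(1))
                .process(new LogExceptionProcessFunction())
                .print();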

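Also, the timerService is no longer available in the context, because ProcessWindowFunction.Context, unlike KeyedProcessFunction.Context, does not expose one.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class LogExceptionProcessFunction extends ProcessWindowFunction<Tuple2<String, String>, LogException, String, TimeWindow> {
    // State handle, initialized in open(); CountWithTimestamp is a nested class not shown here.
    private transient ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", LogExceptionProcessFunction.CountWithTimestamp.class));
    }

    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, String>> elements, Collector<LogException> out) throws Exception {
        // : (processing logic elided)
        // The timer registration from the keyed version no longer compiles here:
        // context.timerService().registerProcessingTimeTimer(current.lastModified + 30000);
    }
}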
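
To see what the session window in the second pipeline does to the elements, here is a simplified plain-Java model of gap-based grouping. It is not Flink's implementation: the SessionGapSketch class is a hypothetical name, it works on a finished list instead of a stream, it ignores watermarks and allowed lateness, and treating timestamps exactly one gap apart as the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of session windowing: a new session starts after a pause longer than the gap. */
final class SessionGapSketch {

    static List<List<Long>> sessions(List<Long> timestamps, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted); // event time order, regardless of arrival order

        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long last = 0L;
        for (long ts : sorted) {
            // Start a new session when the pause since the previous element exceeds the gap.
            if (current == null || ts - last > gapMillis) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(ts);
            last = ts;
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [[1, 3, 8], [30, 35], [100]]
        System.out.println(sessions(Arrays.asList(1L, 3L, 8L, 30L, 35L, 100L), 10L));
    }
}
```

With a gap as small as the 1 millisecond used above, most log entries would land in sessions of their own unless they carry nearly identical timestamps.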
