Monday, June 1, 2020

Application troubleshooting continued

A stream is usually not a single sequence of bytes but a coordination of multiple parallel segments. Each segment is a sequence of bytes that holds nothing but data; metadata lives in its own stream and is usually internal.
Flink watermarks can be viewed in its web interface or queried through the metrics system. An operator's watermark is computed in the standard way, as the minimum of the watermarks received from its upstream operators.
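The current watermark can also be observed from inside user code. The sketch below is a minimal illustration, assuming a ProcessFunction over String events; the class name is hypothetical:

        import org.apache.flink.streaming.api.functions.ProcessFunction;
        import org.apache.flink.util.Collector;

        // A minimal sketch: print the operator's current watermark for each event.
        public class WatermarkLogger extends ProcessFunction<String, String> {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                // currentWatermark() reflects the minimum watermark received from upstream.
                System.out.println("current watermark: " + ctx.timerService().currentWatermark());
                out.collect(value);
            }
        }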
Every event read from a stream also carries the position from which it was read. This lets a reader retry that event or use the position as the start offset to resume reading after a pause. This is useful for readers that export data from a stream store to heterogeneous systems that may not support watermarks.
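As a sketch of how this looks with the Pravega client (the reader, readerGroup, reader id, and export names here are assumptions for illustration):

        import io.pravega.client.stream.EventRead;
        import io.pravega.client.stream.Position;

        // Read an event and remember the position it was read from.
        Position lastPosition = null;
        EventRead<String> event = reader.readNextEvent(2000);
        if (event.getEvent() != null) {
            lastPosition = event.getPosition(); // persist this to durable storage
            export(event.getEvent());           // hypothetical call into the destination system
        }
        // After a pause or failure, declare the reader offline at its last known
        // position so reading resumes from exactly that offset:
        readerGroup.readerOffline("reader-1", lastPosition);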
If both the upstream and the downstream are stream stores, their clients let readers and writers use source timestamps to maintain order. For example, the writer exposes a noteTime method, and the noted time is persisted in a separate stream. Readers can use this metadata to determine whether an event belongs to the current time window.
If the upstream is not a stream store, the writer that writes to the stream store can instead use the event time carried by the events themselves.
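A minimal sketch of both sides with the Pravega client, assuming writer is an EventStreamWriter<String>, reader is an EventStreamReader positioned on stream, and eventTime is a per-event timestamp:

        import io.pravega.client.stream.TimeWindow;

        // Writer side: note the source time so it is persisted in a separate metadata stream.
        writer.writeEvent("routingKey", "payload");
        writer.noteTime(System.currentTimeMillis());

        // Reader side: query the time window the reader is currently positioned in.
        TimeWindow window = reader.getCurrentTimeWindow(stream);
        // The bounds may be null until enough time marks have been observed.
        if (window.getLowerTimeBound() != null && window.getUpperTimeBound() != null) {
            boolean inWindow = window.getLowerTimeBound() <= eventTime
                    && eventTime <= window.getUpperTimeBound();
        }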
When both the reader and the writer implement retries and fault tolerance for the transfer of a single event between source and destination, the transfer becomes a routine that can run on multiple workers in parallel. Parallelization matters little when each event is read and written only once. Conversions from file to stream to blob are ubiquitous, and conversions within a single datastore are also common.
o The classes that the JVM loads come from three places: the Java classpath (the JDK libraries and the jars under Flink's lib folder), the Flink plugins from the plugins folder, and the dynamic user code.
o Application profiling can be enabled with the env.java.opts configuration option, which passes JVM options to all Flink processes.
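For example, the Flink documentation shows attaching the Java Flight Recorder through this option; a flink-conf.yaml entry along those lines (the dump path is illustrative) is:

        env.java.opts: "-XX:+UnlockCommercialFeatures -XX:+UnlockDiagnosticVMOptions -XX:+FlightRecorder -XX:+DebugNonSafepoints -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/dump.jfr"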
o The stream analytics software and the stream store offer parallel features that developers can leverage to troubleshoot issues in either the Flink runtime or the stream store. For example, to get a total count of events, we can use a BatchClientFactory from the stream store as follows:
        import com.google.common.collect.Lists;
        import io.pravega.client.BatchClientFactory;
        import io.pravega.client.batch.SegmentRange;
        import io.pravega.client.stream.Checkpoint;
        import io.pravega.client.stream.StreamCut;
        import io.pravega.client.stream.impl.JavaSerializer;
        import java.util.List;
        import java.util.concurrent.CompletableFuture;
        import java.util.stream.Collectors;

        // Take a checkpoint and use its positions as the starting StreamCut.
        Checkpoint cp = readerGroup.initiateCheckpoint("batchClientCheckpoint", executor).join();
        StreamCut streamCut = cp.asImpl().getPositions().values().iterator().next();
        // Enumerate all segment ranges from the StreamCut to the unbounded end of the stream.
        List<SegmentRange> ranges = Lists.newArrayList(
                batchClient.getSegments(stream, streamCut, StreamCut.UNBOUNDED).getIterator());
        int totalCountOfEvents = readFromRanges(ranges, batchClient);

        // Read each segment range in parallel and sum the per-segment event counts.
        private int readFromRanges(List<SegmentRange> ranges, BatchClientFactory batchClient) {
            List<CompletableFuture<Integer>> eventCounts = ranges
                    .parallelStream()
                    .map(range -> CompletableFuture.supplyAsync(() -> batchClient.readSegment(range, new JavaSerializer<>()))
                                                   .thenApplyAsync(segmentIterator -> {
                                                       log.debug("Thread " + Thread.currentThread().getId() + " reading events.");
                                                       int numEvents = Lists.newArrayList(segmentIterator).size();
                                                       segmentIterator.close();
                                                       return numEvents;
                                                   }))
                    .collect(Collectors.toList());
            return eventCounts.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
        }
