Wednesday, December 11, 2019

Watermarks are the mechanism Flink uses to measure progress in event time. They flow inline with the events. As an operator advances its event-time clock, it emits a watermark for the downstream operators to process. In a distributed system, where an operator may receive input from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks on the incoming streams. As the input streams advance their event times, so does the operator.
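The minimum rule for an operator with several inputs can be sketched in plain Java. This is only an illustration of the idea, not Flink's own code; the class name WatermarkMerger and its methods are made up for this post.

```java
import java.util.Arrays;

// Hypothetical helper (not a Flink class): tracks the latest watermark seen on
// each input and reports the minimum as the operator's own watermark.
final class WatermarkMerger {
    private final long[] inputWatermarks;

    WatermarkMerger(int inputs) {
        inputWatermarks = new long[inputs];
        // Until an input reports a watermark, it holds the operator back.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the resulting output watermark. */
    long onWatermark(int input, long watermark) {
        // A watermark never moves backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        return currentWatermark();
    }

    /** The output watermark is the minimum over all inputs. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkMerger merger = new WatermarkMerger(2);
        System.out.println(merger.onWatermark(0, 10)); // input 1 has not reported yet
        System.out.println(merger.onWatermark(1, 7));
        System.out.println(merger.onWatermark(1, 15));
    }
}
```

The slowest input decides how far the operator may advance, which is why one idle or lagging stream can hold back event time for everything downstream.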

When timestamps and watermarks are assigned by an assigner, any timestamps and watermarks already present from the source are ignored: the assigner overwrites them. It is convenient for the assigner to produce both timestamps and watermarks, since they go hand in hand. Timestamps and watermarks can also be added directly at the source, which uses the collectWithTimestamp method on the SourceContext to emit an element with its timestamp. Similarly, the emitWatermark method is used to generate watermarks.
Timestamps and watermarks are usually assigned immediately after the data source, but this is not strictly required. We can parse and filter before the timestamp assigner.
Parsing is done with the help of a MapFunction, which follows the syntax:
DataStream<Y> result = input.map(new MyMapFunction());
Filtering is done with the help of a FilterFunction:
DataStream<X> result = input.filter(new MyFilterFunction());
Timestamp assigners take a stream and produce a new stream in which any timestamps and watermarks carried by the original stream are overwritten. As long as the assignment happens prior to the first operation on event time, the orderly processing of the events remains the same as if the assignment had happened at the source. In some special cases, the assignment may even be possible at the source.
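The parse, filter and assign sequence can be pictured with a small plain-Java sketch. It does not use the Flink API. The "timestamp,payload" line format, the class TimestampAssignerSketch and the out-of-orderness bound are all assumptions made for illustration, and letting the watermark trail the largest timestamp seen is just one common strategy.

```java
// Hypothetical sketch (not Flink code): parse raw lines, filter out malformed
// ones, then assign timestamps and derive a watermark from them.
final class TimestampAssignerSketch {
    /** A parsed event: event-time timestamp in milliseconds plus a payload. */
    static final class Event {
        final long timestampMs;
        final String payload;

        Event(long timestampMs, String payload) {
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    TimestampAssignerSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Parse step: turns "timestamp,payload" into an Event, or null if malformed. */
    static Event parse(String line) {
        int comma = line.indexOf(',');
        if (comma <= 0) {
            return null;
        }
        try {
            long ts = Long.parseLong(line.substring(0, comma).trim());
            return new Event(ts, line.substring(comma + 1));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Assigner step: reads the event's timestamp and remembers the largest one seen. */
    long extractTimestamp(Event event) {
        maxTimestampMs = Math.max(maxTimestampMs, event.timestampMs);
        return event.timestampMs;
    }

    /** The watermark trails the largest timestamp by the allowed out-of-orderness. */
    long currentWatermark() {
        return maxTimestampMs == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        TimestampAssignerSketch assigner = new TimestampAssignerSketch(2_000);
        for (String line : new String[] {"1000,a", "garbage", "5000,b", "4000,c"}) {
            Event event = parse(line);   // parse
            if (event == null) {
                continue;                // filter
            }
            long ts = assigner.extractTimestamp(event);
            System.out.println(ts + " -> watermark " + assigner.currentWatermark());
        }
    }
}
```

Because the malformed line is dropped before the assigner sees it, it never influences the watermark, which is one practical reason to parse and filter first.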
The order of the events within a window follows the order of the timestamps and watermarks. However, there can be elements that violate the watermark. A violation happens when a watermark has been emitted but an event that arrives afterwards has a timestamp that precedes it. The difference in time between the newly arrived event and the watermark is called the lateness of the event. This can occur for many reasons, such as when a watermark is placed after a large number of events arrive together, so that those events need not be reprocessed. Even when the window is large, watermarks may be placed to indicate progress based on the number of events processed. In real-world conditions, lateness of events is noticeable and is often accommodated in the processing.
Lateness is also one of the reasons for setting the preferred time characteristic in the stream execution environment. If we key the events by a timestamp, we have to keep the notion of the timestamp the same across processors and consistent with the way the event processing works. Message queues do not suffer from this problem because they do not necessarily have to enforce order. Sometimes even marking the message as poison or putting it in a dead-letter queue is sufficient.
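Lateness as described above can be computed directly from the event timestamp and the current watermark. The sketch below is plain Java rather than Flink code. The class LatenessCheck, the Decision enum and the allowed-lateness threshold are assumptions for illustration, and the comparison follows the wording of this post (an event is late when its timestamp precedes the watermark).

```java
// Hypothetical sketch (not Flink code): measure how late an event is and decide
// whether it is still accepted under an assumed allowed-lateness threshold.
final class LatenessCheck {
    enum Decision { ON_TIME, LATE_ACCEPTED, LATE_DROPPED }

    /** Returns how far behind the watermark the event is, or 0 if it is not late. */
    static long lateness(long eventTimestampMs, long watermarkMs) {
        return Math.max(0L, watermarkMs - eventTimestampMs);
    }

    /** Late events are kept only while their lateness stays within the allowance. */
    static Decision classify(long eventTimestampMs, long watermarkMs, long allowedLatenessMs) {
        long lateness = lateness(eventTimestampMs, watermarkMs);
        if (lateness == 0) {
            return Decision.ON_TIME;
        }
        return lateness <= allowedLatenessMs ? Decision.LATE_ACCEPTED : Decision.LATE_DROPPED;
    }

    public static void main(String[] args) {
        long watermarkMs = 10_000;
        System.out.println(classify(12_000, watermarkMs, 3_000)); // ahead of the watermark
        System.out.println(classify(8_000, watermarkMs, 3_000));  // 2 s late, within allowance
        System.out.println(classify(1_000, watermarkMs, 3_000));  // 9 s late, beyond allowance
    }
}
```

A dropped event in this sketch corresponds to what a message queue would route to a dead-letter queue; a stream processor might instead send it to a separate output for later inspection.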


Tuesday, December 10, 2019

We were discussing that Flink applications support historical data processing with the help of timestamps on entries. There can be three different types of timestamps, corresponding to processing time, event time and ingestion time.

Of these, only event time guarantees completely consistent and deterministic results. Any of the three time characteristics can be set on the StreamExecutionEnvironment prior to the execution of queries.

Event time also supports watermarks.

Watermarks are the mechanism Flink uses to measure progress in event time. They flow inline with the events. As an operator advances its event-time clock, it emits a watermark for the downstream operators to process. In a distributed system, where an operator may receive input from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks on the incoming streams. As the input streams advance their event times, so does the operator.



stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);


When timestamps and watermarks are assigned by an assigner, any timestamps and watermarks already present from the source are ignored: the assigner overwrites them. It is convenient for the assigner to produce both timestamps and watermarks, since they go hand in hand. Timestamps and watermarks can also be added directly at the source, which uses the collectWithTimestamp method on the SourceContext to emit an element with its timestamp. Similarly, the emitWatermark method is used to generate watermarks.
Watermarks have an interesting side effect in Flink. If an event is processed as ‘failing’, it is not in the failed state. The failed state is terminal, so processing is not restarted from it. A failing event, however, can be resubmitted, and this causes the event to re-enter the failing state endlessly.

Monday, December 9, 2019

We were discussing that Flink applications support historical data processing with the help of timestamps on entries. There can be three different types of timestamps, corresponding to processing time, event time and ingestion time.
Of these, only event time guarantees completely consistent and deterministic results. Any of the three time characteristics can be set on the StreamExecutionEnvironment prior to the execution of queries.
Event time also supports watermarks.
Watermarks are the mechanism Flink uses to measure progress in event time. They flow inline with the events. As an operator advances its event-time clock, it emits a watermark for the downstream operators to process. In a distributed system, where an operator may receive input from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks on the incoming streams. As the input streams advance their event times, so does the operator.

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

Sunday, December 8, 2019

Flink applications bring the best of historical data processing to time-series data since they follow a streaming model. This applies to metrics, not just logs. Better monitoring and alerting mechanisms can be applied to streams given the DataStream APIs of Flink. The Table API and SQL abstraction of Flink provide even more convenience to users and can support existing reporting mechanisms. SQL has been the language of queries and tables have been the means of storage for a long while. The existing solutions, stacks and technology built on top of these queries hold a lot of business value. As long as they do not regress and can leverage the additional benefits of stream processing, which were not available earlier, they will gain even more acceptance.
Let us now take a few examples of historical data processing with Flink APIs:
Please note that historical data uses the notion of event time, which is the time at which each record was created at its source. Stream processing might instead make use of a different timestamp, referred to as processing time. This is the time of the local clock on the host where the stream is processed. An hourly processing-time window will include all records that arrived during that hour as determined by the system clock. Processing time does not require coordination between streams and machines. Therefore it has the best performance and lowest latency, but its results are affected by delays and outages. This is mitigated by the use of event timestamps, which are global in nature and give completely consistent and deterministic results even in distributed systems.
The time characteristic can be set before the evaluation of queries:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Once we instantiate the stream from a data source, we can then execute a query:
stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
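To make the hourly window in the query above concrete, here is a plain-Java sketch of how a record's timestamp might be mapped to the start of its one-hour tumbling window. It is not Flink's implementation; the class TumblingWindowSketch and the alignment of windows to the epoch are assumptions. The same arithmetic applies to both time characteristics; only the source of the timestamp differs.

```java
// Hypothetical sketch (not Flink code): map a timestamp to the start of the
// tumbling window that contains it, with windows aligned to the epoch.
final class TumblingWindowSketch {
    static final long HOUR_MS = 60L * 60L * 1000L;

    /** Start of the window of the given size that contains the timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        // floorMod keeps the result correct for timestamps before the epoch too.
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    public static void main(String[] args) {
        // Event time: the timestamp travels with the record (here, 02:15 after the epoch).
        long eventTimeMs = 2 * HOUR_MS + 15 * 60 * 1000L;
        // Processing time: the timestamp is read from the local clock on arrival.
        long processingTimeMs = System.currentTimeMillis();

        System.out.println("event-time window starts at " + windowStart(eventTimeMs, HOUR_MS));
        System.out.println("processing-time window starts at " + windowStart(processingTimeMs, HOUR_MS));
    }
}
```

Replaying the same record later gives the same event-time window but a different processing-time window, which is the determinism difference described above.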
Watermarks are the mechanism Flink uses to measure progress in event time. They flow inline with the events. As an operator advances its event-time clock, it emits a watermark for the downstream operators to process. In a distributed system, where an operator may receive input from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks on the incoming streams. As the input streams advance their event times, so does the operator.

Saturday, December 7, 2019

Sample code signing with Microsoft tools:
Step 1. Install the Microsoft Windows SDK that matches the desktop Windows version.
Step 2. Check that the following two files exist:
C:\Program Files (x86)\Windows Kits\10\bin\10.0.18362.0\x64\makecert.exe
C:\Program Files (x86)\Windows Kits\10\bin\10.0.18362.0\x64\pvk2pfx.exe
Step 3. MakeCert /n "CN=My Company Name, O=My Company, C=US" /r /h 0 /eku "1.3.6.1.5.5.7.3.3,1.3.6.1.4.1.311.10.3.13" /e 12/30/2025 /sv \EMCCerts\testcert.out \EMCCerts\testcert.cer
Succeeded
Step 4. Pvk2Pfx /pvk \EMCCerts\testcert.out  /pi <password> /spc \EMCCerts\testcert.cer  /pfx \EMCCerts\testcertNew.pfx /po <password>
Step 5. Certutil -addStore TrustedPeople \EMCCerts\testcert.cer
TrustedPeople "Trusted People"
Signature matches Public Key
Certificate "DellEMC Streaming Data Platform" added to store.
CertUtil: -addstore command completed successfully.
Step 6. signtool.exe sign /fd SHA256 /a /f \EMCCerts\testcertNew.pfx /p <password> \my-file-to-sign.zip
Step 7. makeappx.exe pack /f \unsigned\Appx.map /p \Signed\my-file-to-sign.zip
Microsoft (R) MakeAppx Tool
Copyright (C) 2013 Microsoft.  All rights reserved.

The path (/p) parameter is: "\\?\Signed\my-file-to-sign.appx"
The mapping file (/f) parameter is: "unsigned\Appx.map"
Reading mapping file "unsigned\Appx.map"
Packing 3 file(s) listed in "unsigned\Appx.map" (mapping file) to "\\?\Signed\my-file-to-sign.zip" (output file name).
Memory limit defaulting to 8529401856 bytes.
Using "unsigned\AppxManifest.xml" as the manifest for the package.
Processing "unsigned\my-file-to-sign.zip" as a payload file.  Its path in the package will be "my-file-to-sign.zip".
Processing "unsigned\AppTile.png" as a payload file.  Its path in the package will be "AppTile.png".
Package creation succeeded.

Sample code signing with the gpg tool:
gpg --output doc.sig --sign doc
You need a passphrase to unlock the private key for
User: "Alice (Judge) alice@cyb.org"
1024-bit DSA key, ID BB7576AC, created 1999-06-04
Enter passphrase:

Friday, December 6, 2019

Signing files using a private key
Signing is the process by which a digital signature is created from the file contents. The signature proves that there was no tampering with the contents of the file. The signing itself does not need to encrypt the file contents to generate the signature. In some cases, a detached signature may be stored as a separate file. Others may choose to include the digital signature along with the set of files as an archive.
There are other processes as well that can help with checking the integrity of files. These include hashing of files, generating a checksum and others. Signing differs from those methods in that it uses a private-public key pair to compute the digital signature. The private key is used to sign a file while the public key is used to verify the signature. The public key can be published with the signature or it can be made available in ways that are well known to the recipients of the signed files.
Signing can use any of several cryptographic algorithms. The stronger the algorithm and key, the harder the signature is to forge and the lower the chance that a tampered file goes undetected. The process of signing varies across operating systems.
Popular Linux hosts often use the ‘gpg’ tool to sign and verify files. This tool can also generate the key pair with which to sign the files. The resulting signature is in the Pretty Good Privacy (PGP) format and, when ASCII-armored, is stored as a file with the extension .asc. Publishing the public key along with the detached signature is a common practice for many distributions of code.
Microsoft uses separate tools for making the key-certificate pair and for generating the signature. The signer tool used by this company packs the payload into an altogether new file. The signature is part of the new file, which can function just like an archive. The same algorithm and strength can be used with the signer tool as with the gpg tool.
The use of certificates is popular because they can be shared easily: a certificate carries only the public key, so distributing it puts the private key at no risk. The recipient uses the certificate to verify the signature.
These are some of the means for signing, and its use is widespread across use cases such as code commits, blockchain and digital rights management.
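The sign-with-private-key, verify-with-public-key flow can be tried with nothing more than the JDK's java.security classes. The sketch below is separate from the gpg and Microsoft tools discussed in these posts. The class name DetachedSignatureSketch, the choice of SHA256withRSA and the 2048-bit key size are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;

// Hypothetical sketch: produce and check a detached signature over some bytes.
final class DetachedSignatureSketch {
    private static final String ALGORITHM = "SHA256withRSA";

    /** Generates a fresh RSA key pair for the demonstration. */
    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Signs the content with the private key; the content itself is not encrypted. */
    static byte[] sign(byte[] content, PrivateKey privateKey) {
        try {
            Signature signer = Signature.getInstance(ALGORITHM);
            signer.initSign(privateKey);
            signer.update(content);
            return signer.sign();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Verifies the detached signature against the content using the public key. */
    static boolean verify(byte[] content, byte[] signature, PublicKey publicKey) {
        try {
            Signature verifier = Signature.getInstance(ALGORITHM);
            verifier.initVerify(publicKey);
            verifier.update(content);
            return verifier.verify(signature);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        KeyPair pair = newKeyPair();
        byte[] original = "release notes".getBytes(StandardCharsets.UTF_8);
        byte[] tampered = "release n0tes".getBytes(StandardCharsets.UTF_8);
        byte[] signature = sign(original, pair.getPrivate());

        System.out.println("original verifies: " + verify(original, signature, pair.getPublic()));
        System.out.println("tampered verifies: " + verify(tampered, signature, pair.getPublic()));
    }
}
```

The signature bytes live apart from the content, which mirrors the detached-signature case; bundling both into one archive is a packaging decision on top of the same two operations.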

Thursday, December 5, 2019

We were discussing Flink applications and the use of a stream store such as Pravega.

It is not appropriate to encapsulate a Flink connector within the HTTP request handler for data ingestion at the store. This API is far more generic than the upstream software used to send the data because the consumer of this REST API could be the user interface, a language-specific SDK, or shell scripts that want to make curl requests. It is better for the REST API implementation to directly accept the raw message along with the destination and authorization.
The policy for read operations should be such that they can be independent and scalable. A read-only policy suffices for this purpose. The rest can be read-write.
The separation of read-write from read-only also allows them to be treated differently. For example, the technology for the read-only path can be replaced separately from the technology for the read-write path. Even the technology for read-only can be swapped from one to another for improvements on this side.
An example of analysis on a read-only path is the extraction of exception stack traces from logs:

Flink applications are well suited to this, as in the following stack-trace hasher example, which collects the top exceptions encountered from log archives:
import java.util.StringTokenizer;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

private static class ExtractStackTraceHasher implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        StringTokenizer tokenizer = new StringTokenizer(value);
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken();
            if (word.contains("Exception:")) {
                int start = value.indexOf(word); // word has unique timestamp
                int end = value.indexOf(word.substring(8), start + 8);
                if (end != -1 && start != -1 && end > start) {
                    String exceptionString = value.substring(start + 11, end); // skip the timestamp
                    // parseThrowable is a hypothetical helper that rebuilds a Throwable
                    // from its printed form; the JDK has no Throwable.parse method.
                    Throwable error = parseThrowable(exceptionString);
                    var stackHasher = new net.logstash.logback.stacktrace.StackHasher();
                    out.collect(stackHasher.hexHash(error)); // emit one hash per stack trace
                }
            }
        }
    }
}

The results from the above can be combined across the files at the archive location.
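import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;

public class TopStackTraceMerger implements ReduceFunction<Map<String, Integer>> {
  @Override
  public Map<String, Integer> reduce(Map<String, Integer> set1, Map<String, Integer> set2) {
    Map<String, Integer> merged = new HashMap<>(set1);
    // Assumes each map holds occurrence counts per stack hash, so merging adds them.
    set2.forEach((hash, count) -> merged.merge(hash, count, Integer::sum));
    return merged;
  }
}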
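The merging of per-file results and the selection of the top exceptions can be tried outside Flink with plain Java collections. This is a sketch under the assumption that each map holds occurrence counts keyed by stack hash. The class TopStackTraces, its method names and the sample hashes are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink code): add up per-file counts of stack hashes
// and pick the hashes that occur most often.
final class TopStackTraces {
    /** Merges two count maps by adding the counts of matching hashes. */
    static Map<String, Integer> merge(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> merged = new HashMap<>(a);
        b.forEach((hash, count) -> merged.merge(hash, count, Integer::sum));
        return merged;
    }

    /** Returns up to n hashes ordered by descending count, ties broken by hash. */
    static List<String> top(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((x, y) -> {
            int byCount = Integer.compare(y.getValue(), x.getValue()); // higher counts first
            return byCount != 0 ? byCount : x.getKey().compareTo(y.getKey());
        });
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> fileOne = new HashMap<>();
        fileOne.put("a1b2", 2);
        fileOne.put("c3d4", 1);
        Map<String, Integer> fileTwo = new HashMap<>();
        fileTwo.put("c3d4", 3);
        fileTwo.put("e5f6", 1);

        Map<String, Integer> merged = merge(fileOne, fileTwo);
        System.out.println(top(merged, 2));
    }
}
```

Since adding counts is associative and commutative, the files can be merged in any order, which is the property a reduce step relies on.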