Monday, December 9, 2019

We were discussing how Flink applications support historical data processing with the help of timestamps on entries. A timestamp can be one of three types: processing time, event time, or ingestion time.
Of these, only event time guarantees completely consistent and deterministic results. Any of the three time characteristics can be set on the StreamExecutionEnvironment before the queries are executed.
Event time also supports watermarks.
Watermarks are the mechanism in Flink for measuring progress in event time. They flow inline with the events as part of the stream. As an operator advances its event time, it emits a watermark for the downstream operators to process. In a distributed system where an operator may receive input from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks on the incoming streams. As the input streams advance their event times, so does the operator.

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
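
To make the keyed hourly window above more concrete, here is a minimal plain-Java sketch of what it computes when the windows are driven by event time. It does not use Flink. The Event class, its fields, and the "user@windowStart" key format are assumptions made for illustration, and the window start is computed with zero offset for non-negative timestamps.

```java
import java.util.Map;
import java.util.TreeMap;

class HourlyWindowSketch {
    static final long HOUR_MS = 60 * 60 * 1000L;

    /** Hypothetical event shape: a user, an event-time timestamp, and a value to add up. */
    static final class Event {
        final String user;
        final long timestampMs;
        final long amount;

        Event(String user, long timestampMs, long amount) {
            this.user = user;
            this.timestampMs = timestampMs;
            this.amount = amount;
        }
    }

    /** Start of the one-hour tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % HOUR_MS);
    }

    /** Sums the amounts per user and per hourly window, keyed as "user@windowStart". */
    static Map<String, Long> sumPerUserPerHour(Event... events) {
        Map<String, Long> totals = new TreeMap<>();
        for (Event e : events) {
            String key = e.user + "@" + windowStart(e.timestampMs);
            totals.merge(key, e.amount, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        Map<String, Long> totals = sumPerUserPerHour(
                new Event("alice", 10 * minute, 5),
                new Event("alice", 70 * minute, 1),
                // arrives late, but its timestamp still places it in the first hour
                new Event("alice", 50 * minute, 7),
                new Event("bob", 5 * minute, 2));
        System.out.println(totals);
    }
}
```

Because the window is derived from the timestamp carried by each event, the late arrival is still counted in the hour it belongs to, which is why event-time results do not depend on arrival order.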

Sunday, December 8, 2019

Flink applications bring the best of historical data processing to time-series data because they follow a streaming model. This applies to metrics as well as logs. The DataStream API of Flink also allows better monitoring and alerting mechanisms to be applied to streams. The Table API and SQL abstraction of Flink provide even more convenience to users and can support existing reporting mechanisms. SQL has long been the language of queries, and tables have long been the means of storage. The solutions, stacks and technology built on top of these queries hold a lot of business value. As long as they do not regress and can leverage the additional benefits of stream processing, which were not available earlier, they will gain even more acceptance.
Let us now take a few examples of historical data processing with Flink APIs:
Please note that historical data uses the notion of event time, which is the time at which each record was produced at its source. Stream processing might instead use a different timestamp, referred to as processing time. This is the time of the local clock on the host where the stream is processed. An hourly processing-time window will include all records that reached the operator during that hour, as determined by the system clock. Processing time does not require coordination between streams and machines. Therefore it has the best performance and lowest latency, but its results are not deterministic because delays and outages change when records arrive. This is mitigated by the use of event timestamps, which travel with the records and give completely consistent and deterministic results even in distributed systems.
The time characteristic can be set before the evaluation of queries:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Once we instantiate the stream from a data source, we can then execute a query:
stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
Watermarks are the mechanism in Flink for measuring progress in event time. They flow inline with the events as part of the stream. As an operator advances its event time, it emits a watermark for the downstream operators to process. In a distributed system where an operator may receive input from more than one stream, the watermark on the outgoing stream is the minimum of the watermarks on the incoming streams. As the input streams advance their event times, so does the operator.
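
The minimum rule for operators with several inputs can be sketched in plain Java. This is only an illustration of the rule described above, not Flink's implementation: the MinWatermarkTracker class, its method names, and the use of Long.MIN_VALUE for "no watermark seen yet" are assumptions.

```java
import java.util.Arrays;

// Hypothetical stand-in for an operator with several input channels; not a Flink class.
class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long outputWatermark = Long.MIN_VALUE;

    MinWatermarkTracker(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        // Long.MIN_VALUE marks a channel that has not reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's output watermark. */
    long onWatermark(int channel, long watermark) {
        // A watermark never moves backwards on its channel.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The output advances only when the slowest input advances.
        outputWatermark = Math.max(outputWatermark, min);
        return outputWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 100L)); // second input has not reported yet
        System.out.println(tracker.onWatermark(1, 40L));  // held back by the slower input
        System.out.println(tracker.onWatermark(1, 120L)); // now bounded by the first input
    }
}
```

The design choice to take the minimum means that one slow or idle input holds back event time for the whole operator, which is the price paid for never emitting a result before all inputs have caught up.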

Saturday, December 7, 2019

Sample code signing with Microsoft tools:
Step 1. Install the Microsoft Windows SDK that matches the desktop Windows version.
Step 2. Check that the following two files exist:
C:\Program Files (x86)\Windows Kits\10\bin\10.0.18362.0\x64\makecert.exe
C:\Program Files (x86)\Windows Kits\10\bin\10.0.18362.0\x64\pvk2pfx.exe
Step 3. MakeCert /n "CN=My Company Name, O=My Company, C=US" /r /h 0 /eku "1.3.6.1.5.5.7.3.3,1.3.6.1.4.1.311.10.3.13" /e 12/30/2025 /sv \EMCCerts\testcert.out \EMCCerts\testcert.cer
Succeeded
Step 4. Pvk2Pfx /pvk \EMCCerts\testcert.out  /pi <password> /spc \EMCCerts\testcert.cer  /pfx \EMCCerts\testcertNew.pfx /po <password>
Step 5. Certutil -addStore TrustedPeople \EMCCerts\testcert.cer
TrustedPeople "Trusted People"
Signature matches Public Key
Certificate "DellEMC Streaming Data Platform" added to store.
CertUtil: -addstore command completed successfully.
Step 6. signtool.exe sign /fd SHA256 /a /f \EMCCerts\testcertNew.pfx /p <password> \my-file-to-sign.zip
Step 7. makeappx.exe pack /f \unsigned\Appx.map /p \Signed\my-file-to-sign.zip
Microsoft (R) MakeAppx Tool
Copyright (C) 2013 Microsoft.  All rights reserved.

The path (/p) parameter is: "\\?\Signed\my-file-to-sign.appx"
The mapping file (/f) parameter is: "unsigned\Appx.map"
Reading mapping file "unsigned\Appx.map"
Packing 3 file(s) listed in "unsigned\Appx.map" (mapping file) to "\\?\Signed\my-file-to-sign.zip" (output file name).
Memory limit defaulting to 8529401856 bytes.
Using "unsigned\AppxManifest.xml" as the manifest for the package.
Processing "unsigned\my-file-to-sign.zip" as a payload file.  Its path in the package will be "my-file-to-sign.zip".
Processing "unsigned\AppTile.png" as a payload file.  Its path in the package will be "AppTile.png".
Package creation succeeded.

Sample code signing with the gpg tool:
gpg --output doc.sig --sign doc
You need a passphrase to unlock the private key for
User: "Alice (Judge) alice@cyb.org"
1024-bit DSA key, ID BB7576AC, created 1999-06-04
Enter passphrase:

Friday, December 6, 2019

Signing files using a private key
Signing is the process by which a digital signature is created from the file contents. The signature proves
that the contents of the file have not been tampered with since it was signed. Signing does not need to
encrypt the file contents to generate the signature. In some cases, a detached signature is stored as a
separate file. In others, the digital signature is included along with the set of files in an archive.
There are other processes as well that can help with checking the integrity of files. These include
hashing of files, generating a checksum and others. Signing differs from those methods in that it uses a
private-public key pair to compute the digital signature. The private key is used to sign a file while the
public key is used to verify the signature. The public key can be published with the signature or it can be
made available in ways that are well-known to the recipients of the signed files.
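
As a minimal sketch of signing with a private key and verifying with the public key, the following uses the standard java.security classes. The choice of RSA with SHA-256, the 2048-bit key size, and the class and method names are assumptions for illustration. The file contents are represented by a string to keep the example self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;

class DetachedSignatureSketch {
    static KeyPair newKeyPair() throws GeneralSecurityException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
        generator.initialize(2048);
        return generator.generateKeyPair();
    }

    /** Produces a detached signature; the content itself is not encrypted. */
    static byte[] sign(byte[] content, PrivateKey privateKey) throws GeneralSecurityException {
        Signature signer = Signature.getInstance("SHA256withRSA");
        signer.initSign(privateKey);
        signer.update(content);
        return signer.sign();
    }

    /** Checks the signature against the content using only the public key. */
    static boolean verify(byte[] content, byte[] signature, PublicKey publicKey)
            throws GeneralSecurityException {
        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(publicKey);
        verifier.update(content);
        return verifier.verify(signature);
    }

    /** Signs the original text, then verifies the received text against that signature. */
    static boolean roundTrip(String original, String received) {
        try {
            KeyPair keys = newKeyPair();
            byte[] signature = sign(original.getBytes(StandardCharsets.UTF_8), keys.getPrivate());
            return verify(received.getBytes(StandardCharsets.UTF_8), signature, keys.getPublic());
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("untouched content verifies: " + roundTrip("payload", "payload"));
        System.out.println("tampered content verifies:  " + roundTrip("payload", "payl0ad"));
    }
}
```

Only the holder of the private key can produce a signature that verifies, while anyone with the public key can check it.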
Signing can use any of several public-key algorithms. The stronger the algorithm and the longer the key,
the harder it is to forge the signature and the lower the chance that a tampered file goes undetected.
The process of signing varies across operating systems.
Popular Linux hosts often use the 'gpg' tool to sign and verify files. This tool can also generate
the key pair with which to sign the files. The resulting signature is in the OpenPGP (Pretty Good Privacy)
format and, when ASCII-armored, is stored as a file with the extension .asc. Publishing the public key
along with the detached signature is a common practice for many distributions of code.
Microsoft uses separate tools for making the key-certificate pair and for generating the signature.
The Microsoft tooling packs the payload into an altogether new file. The signature is part
of the new file, which can function just like an archive. The same algorithm and strength can be used
with the signer tool as with the gpg tool.
The use of certificates is popular because they carry only the public key and can therefore be shared
freely without exposing anything secret. The recipient uses the certificate to verify the signature.
These are some of the means of signing, and its use is widespread across use cases such as code
commits, blockchain and digital rights management.

Thursday, December 5, 2019

We were discussing Flink applications and the use of a stream store such as Pravega.

It is not appropriate to encapsulate a Flink connector within the HTTP request handler for data ingestion at the store. This API is far more generic than the upstream software used to send the data, because the consumer of this REST API could be the user interface, a language-specific SDK, or shell scripts that make curl requests. It is better for the REST API implementation to directly accept the raw message along with the destination and authorization.
The policy for read operations should allow them to be independent and scalable. A read-only policy suffices for this purpose. The remaining operations can be read-write.
Separating read-write from read-only also allows the two to be treated differently. For example, the technology behind the read-only path can be replaced independently of the technology behind the read-write path, and one read-only technology can be swapped for another as improvements become available.
An example of analysis on a read-only path is the extraction of exception stack traces from logs:

Flink applications are well suited to this kind of analysis. For example, a stack trace hasher can collect the top exceptions encountered in log archives:
import java.util.StringTokenizer;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

private static class ExtractStackTraceHasher implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        StringTokenizer tokenizer = new StringTokenizer(value);
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken();
            if (word.contains("Exception:")) {
                int start = value.indexOf(word); // word has unique timestamp
                int end = value.indexOf(word.substring(8), start + 8);
                if (end != -1 && start != -1 && end > start) {
                    String exceptionString = value.substring(start + 11, end); // skip the timestamp
                    // java.lang.Throwable has no parse method; parseThrowable is a hypothetical
                    // helper (not shown) that rebuilds a Throwable from the logged text.
                    Throwable error = parseThrowable(exceptionString);
                    var stackHasher = new net.logstash.logback.stacktrace.StackHasher();
                    // emit one hash per stack trace so identical traces can be grouped
                    out.collect(stackHasher.hexHash(error));
                }
            }
        }
    }
}

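The results from the above can be combined across the files in the archive location:
public class TopStackTraceMerger implements ReduceFunction<Map<String, Integer>> {
  @Override
  public Map<String, Integer> reduce(Map<String, Integer> set1, Map<String, Integer> set2) {
    // add up the counts recorded for each stack trace hash
    Map<String, Integer> merged = new HashMap<>(set1);
    set2.forEach((hash, count) -> merged.merge(hash, count, Integer::sum));
    return merged;
  }
}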
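
Once per-file counts are available, picking the top exceptions is a matter of folding the maps together and ranking the result. The following plain-Java sketch shows one way to do that outside Flink. The class name, the method names, and the tie-breaking by hash are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TopStackTracesSketch {
    /** Folds the per-file maps of stack trace hash to count into a single map. */
    static Map<String, Integer> mergeAll(List<Map<String, Integer>> perFileCounts) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> counts : perFileCounts) {
            counts.forEach((hash, count) -> merged.merge(hash, count, Integer::sum));
        }
        return merged;
    }

    /** Returns the n most frequent hashes, highest count first, ties broken by hash. */
    static List<String> top(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted((x, y) -> {
                    int byCount = Integer.compare(y.getValue(), x.getValue());
                    return byCount != 0 ? byCount : x.getKey().compareTo(y.getKey());
                })
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> file1 = new HashMap<>();
        file1.put("h1", 2);
        file1.put("h2", 1);
        Map<String, Integer> file2 = new HashMap<>();
        file2.put("h2", 3);
        file2.put("h3", 1);
        System.out.println(top(mergeAll(Arrays.asList(file1, file2)), 2));
    }
}
```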

Wednesday, December 4, 2019

We were discussing Flink applications and the use of a stream store such as Pravega.
It is not appropriate to encapsulate a Flink connector within the HTTP request handler for data ingestion at the store. This API is far more generic than the upstream software used to send the data, because the consumer of this REST API could be the user interface, a language-specific SDK, or shell scripts that make curl requests. It is better for the REST API implementation to directly accept the raw message along with the destination and authorization.
There are two factors I want to discuss further when comparing the analytical applications:
First, the use of Pravega as a stream store should not mandate the use of a Flink application with the Flink connector. Data can be written to and read from the Pravega store directly. However, a Flink application is generally used for performing transformations and queries, both of which benefit from running in cluster mode so that they can scale to large data sets.
Second, the data path is critical, so it is not necessary to combine the collection and the analysis.
Although most analysis works well only when certain data has been collected, it is not necessary to make the collection heavy in terms of processing. The collection is a data path and can almost always be streamlined. The analysis can happen as the collection happens because it is stream processing. However, analysis does not have to happen within the collection. It can execute separately, and as long as there is a queue of collected events, analysis can proceed with stream processing even when the rates of ingestion and analysis differ, because these are the write and read paths respectively and are best kept separate.
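
The separation of the write path from the read path can be illustrated with a small plain-Java sketch in which a queue stands in for the stream store. It uses neither Pravega nor Flink. The class name, the end-of-stream marker, and the choice to count events that mention an exception are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DecoupledPathsSketch {
    // Hypothetical end-of-stream marker so the reader knows when to stop in this demo.
    private static final String END = "\u0000END";

    /** Write path: appends raw events to the store without inspecting them. */
    static Thread collect(List<String> rawEvents, BlockingQueue<String> store) {
        Thread writer = new Thread(() -> {
            rawEvents.forEach(store::add);
            store.add(END);
        });
        writer.start();
        return writer;
    }

    /** Read path: consumes at its own pace and counts events that mention an exception. */
    static int analyze(BlockingQueue<String> store) {
        int matches = 0;
        try {
            for (String event = store.take(); !event.equals(END); event = store.take()) {
                if (event.contains("Exception")) {
                    matches++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return matches;
    }

    /** Runs the writer and the reader against the same queue and returns the reader's result. */
    static int run(List<String> rawEvents) {
        BlockingQueue<String> store = new LinkedBlockingQueue<>();
        collect(rawEvents, store);
        return analyze(store);
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList(
                "INFO request served",
                "ERROR java.io.IOException: disk full",
                "WARN slow response");
        System.out.println(run(events));
    }
}
```

The writer never waits for the analysis and the reader never slows the ingestion, so either side can be replaced or scaled without touching the other.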

Tuesday, December 3, 2019

Flink applications are well suited to this kind of analysis. For example, a stack trace hasher can collect the top exceptions encountered in log archives:
import java.util.StringTokenizer;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

private static class ExtractStackTraceHasher implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        StringTokenizer tokenizer = new StringTokenizer(value);
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken();
            if (word.contains("Exception:")) {
                int start = value.indexOf(word); // word has unique timestamp
                int end = value.indexOf(word.substring(8), start + 8);
                if (end != -1 && start != -1 && end > start) {
                    String exceptionString = value.substring(start + 11, end); // skip the timestamp
                    // java.lang.Throwable has no parse method; parseThrowable is a hypothetical
                    // helper (not shown) that rebuilds a Throwable from the logged text.
                    Throwable error = parseThrowable(exceptionString);
                    var stackHasher = new net.logstash.logback.stacktrace.StackHasher();
                    // emit one hash per stack trace so identical traces can be grouped
                    out.collect(stackHasher.hexHash(error));
                }
            }
        }
    }
}

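// Tuple2 is org.apache.flink.api.java.tuple.Tuple2; Types is org.apache.flink.api.common.typeinfo.Types
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
for (String path : pathsInExtractedLogArchive(archiveLocation)) {
    env.readTextFile(path)
            .flatMap(new ExtractStackTraceHasher())
            // pair each hash with a count of 1 so that keyBy(0).sum(1) has a tuple to work on
            .map(hash -> Tuple2.of(hash, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .sum(1)
            .print();
}
// nothing runs until the job graph is submitted
env.execute();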