Friday, November 29, 2019

A methodology to use Flink APIs for text summarization:
Text summarization has been treated as a multi-stage processing problem that starts with word vectorization, continues with SoftMax classification and keyword extraction, and ends with the salient sentences projected as the summary. The stages may be numerous, and a stage may run several operations at once, such as when text classification and bounding box regression are run simultaneously. Most of the stages are also batch oriented, whereas the Flink APIs are known for their suitability to streaming operations. Yet the data that appears in a text, regardless of its length, is inherently a stream. If the Flink APIs can count words in a stream of text, then they can be applied with more sophisticated operators to the same stream for text summarization.
With this motivation, this article looks at how Flink APIs can be applied to text summarization, starting as a lightweight peripheral application and becoming more intrinsic to the summarization solution.
Flink APIs give the comfort of choosing from a wide variety of transforming operators, including map-like and reduce-like operators, along with first-class standard query operators over the Table API. In addition, they support data representation in tabular as well as graph forms by providing abstractions for both. Words and their neighbors can easily be represented as the vertices and weighted edges of a graph, so long as the weights are found and applied to the edges. Finding the weights, such as with SoftMax classification from a neural net, is well known and outside the scope of this article. Once the graph is established for a given text, Flink APIs can be used to work with the graph to pick out keyword vertices based on centrality and then to extract sentences according to those keywords and their positional relevance. We focus on this latter part.
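To make the centrality and sentence-extraction step concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes the edge weights have already been computed, uses weighted degree as the centrality measure (one possible choice; the text above does not prescribe one), and reduces positional relevance to "the earliest sentence wins ties". The class and method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Flink: ranks words by weighted degree
// centrality and picks the sentence that covers the most central words.
final class CentralitySummarizer {

    // One weighted, undirected edge between two neighboring words.
    static final class WordEdge {
        final String a;
        final String b;
        final double weight;

        WordEdge(String a, String b, double weight) {
            this.a = a;
            this.b = b;
            this.weight = weight;
        }
    }

    // Weighted degree centrality: sum of the weights of all edges touching a word.
    static Map<String, Double> centrality(List<WordEdge> edges) {
        Map<String, Double> score = new HashMap<>();
        for (WordEdge e : edges) {
            score.merge(e.a, e.weight, Double::sum);
            score.merge(e.b, e.weight, Double::sum);
        }
        return score;
    }

    // The k words with the highest centrality; ties are broken alphabetically.
    static List<String> topKeywords(Map<String, Double> score, int k) {
        List<String> words = new ArrayList<>(score.keySet());
        words.sort((x, y) -> {
            int byScore = Double.compare(score.get(y), score.get(x));
            return byScore != 0 ? byScore : x.compareTo(y);
        });
        return new ArrayList<>(words.subList(0, Math.min(k, words.size())));
    }

    // Index of the sentence containing the most distinct keywords.
    // The earliest sentence wins ties; returns -1 for an empty list.
    static int bestSentence(List<String> sentences, List<String> keywords) {
        Set<String> wanted = new HashSet<>(keywords);
        int best = -1;
        int bestHits = -1;
        for (int i = 0; i < sentences.size(); i++) {
            String[] tokens = sentences.get(i).toLowerCase().split("\\W+");
            Set<String> hits = new HashSet<>(Arrays.asList(tokens));
            hits.retainAll(wanted);
            if (hits.size() > bestHits) {
                bestHits = hits.size();
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<WordEdge> edges = Arrays.asList(
                new WordEdge("flink", "stream", 2.0),
                new WordEdge("flink", "graph", 1.5),
                new WordEdge("stream", "window", 0.5));
        List<String> keywords = topKeywords(centrality(edges), 2);
        List<String> sentences = Arrays.asList(
                "Graphs have vertices.",
                "Flink processes a stream of text.",
                "A window groups events.");
        System.out.println(keywords + " -> sentence " + bestSentence(sentences, keywords));
    }
}
```

In a Flink job the same two steps would run over a graph abstraction rather than in-memory maps; the sketch only shows the shape of the computation.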
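For example, we can use the following technique to aggregate the edge weights within a window and surface the top-weighted candidates, a first step toward finding the shortest path between two candidates: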
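final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// tuples is a List<Tuple2<Edges, Integer>> built beforehand
env.fromCollection(tuples)
            .flatMap(new ExtractWeights())
            .keyBy(0)                          // key by the first tuple field
            .timeWindow(Time.seconds(30))
            .sum(1)                            // sum the second tuple field per key
            .filter(new FilterWeights())
            .timeWindowAll(Time.seconds(30))   // one window across all keys
            .apply(new GetTopWeights())
            .print();
env.execute();                                 // a streaming job runs only once execute() is called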
We can even use single-source shortest paths for classification. This might take multiple runs, with at least one pass for each vertex used as the source:
SingleSourceShortestPaths<Integer, NullValue> singleSourceShortestPaths = new SingleSourceShortestPaths<>(sourceVertex, maxIterations);
Advanced techniques may utilize streaming graph operators, which are not restricted by the size of the graph and are first-class operators over graphs.
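To show what a single-source shortest path computation with an iteration cap does, here is a minimal sketch in plain Java. It uses iterative edge relaxation, which is the general idea behind iteration-based graph algorithms; it is not the library's implementation. It assumes directed edges with non-negative weights, and the class, the Edge type and the method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: single-source shortest paths by repeated edge relaxation,
// stopping early when a full pass changes nothing or when maxIterations is reached.
final class ShortestPathsSketch {

    // A directed, weighted edge between two integer vertex ids.
    static final class Edge {
        final int source;
        final int target;
        final double weight;

        Edge(int source, int target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    // Returns the distance from sourceVertex to every vertex that appears in edges.
    // Unreachable vertices keep the distance Double.POSITIVE_INFINITY.
    static Map<Integer, Double> shortestPaths(List<Edge> edges, int sourceVertex, int maxIterations) {
        Map<Integer, Double> dist = new HashMap<>();
        for (Edge e : edges) {
            dist.putIfAbsent(e.source, Double.POSITIVE_INFINITY);
            dist.putIfAbsent(e.target, Double.POSITIVE_INFINITY);
        }
        dist.put(sourceVertex, 0.0);

        for (int i = 0; i < maxIterations; i++) {
            boolean changed = false;
            for (Edge e : edges) {
                // Relax the edge: is going through e.source a shorter way to e.target?
                double candidate = dist.get(e.source) + e.weight;
                if (candidate < dist.get(e.target)) {
                    dist.put(e.target, candidate);
                    changed = true;
                }
            }
            if (!changed) {
                break; // distances have converged
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        List<Edge> edges = Arrays.asList(
                new Edge(1, 2, 1.0),
                new Edge(2, 3, 2.0),
                new Edge(1, 3, 5.0),
                new Edge(3, 4, 1.0));
        System.out.println(shortestPaths(edges, 1, 10));
    }
}
```

Running it once per candidate source vertex gives the "one pass for each vertex as source" mentioned above, at the cost of repeating the whole computation for every source.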

Thursday, November 28, 2019

We were discussing Flink APIs and connectors:
A typical query looks like this:

env.readTextFile(ratingsCsvPath)
            .flatMap(new ExtractRating())
            .groupBy(0)
            .sum(1)
            .print();

There are connectors for different data sources, which can be passed in via addSource.

To add parallelism, which governs the number of tasks and replicas at runtime, we can set it globally with env.setParallelism(dop);

Otherwise, we can set it on an individual operator in the DataSet API.

There is also a similar method to set the maximum parallelism, which prevents the parallelism from going beyond that limit.

The newer DataStream API offers methods such as assignTimestampsAndWatermarks because it supports watermarks, which track progress in event time and allow the events in the stream to be ordered. A sink can be added to write the events to a store.
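The idea behind a watermark can be sketched in plain Java without any Flink dependency. The sketch below assumes a bounded out-of-orderness policy: the watermark trails the largest timestamp seen so far by a fixed bound, and an event whose timestamp is at or below the current watermark is treated as late. The class and method names are hypothetical, and this is an illustration of the concept rather than Flink's own code.

```java
// Hypothetical sketch of a bounded out-of-orderness watermark.
final class WatermarkSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // The watermark asserts that no event at or below this timestamp is still expected.
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    // Returns true when the event is on time, false when it arrives behind the watermark.
    boolean accept(long eventTimestampMs) {
        boolean onTime = eventTimestampMs > currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return onTime;
    }

    public static void main(String[] args) {
        WatermarkSketch watermark = new WatermarkSketch(5);
        long[] timestamps = {10, 7, 20, 12};
        for (long ts : timestamps) {
            System.out.println(ts + " on time: " + watermark.accept(ts)
                    + ", watermark: " + watermark.currentWatermark());
        }
    }
}
```

A larger bound tolerates more disorder but delays the point at which a window can be considered complete.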

The Flink connector library for Pravega adds a data source and sink for use with the Flink DataStream API.

The FlinkPravegaWriter and FlinkPravegaReader work with generic events as long as the events are serializable and deserializable, respectively.

The FlinkPravegaWriter extends RichSinkFunction and implements the ListCheckpointed and CheckpointListener interfaces.

The FlinkPravegaReader extends RichParallelSourceFunction and implements the ResultTypeQueryable, StoppableFunction and ExternallyInducedSource interfaces.

Notice that extending Flink's RichParallelSourceFunction is meant for high scalability through reader groups. This is required in use cases such as reading from a socket, where the data may arrive at a fast rate.

Wednesday, November 27, 2019

The solutions that are built on top of SQL queries are not necessarily looking to write SQL. Their authors want programmability and are just as content to write code using the Table API as they are with SQL queries. Consequently, business intelligence and CRM solutions are writing their code with standard query operators over their data. These well-defined operators are applicable to most collections of data, and the notion is shared with other programming languages such as .Net.
Consistency also places hard requirements on the choice of processing. Strict consistency has traditionally been associated with relational databases. Eventual consistency has been made possible in distributed storage with the help of messaging algorithms such as Paxos. Big Data is usually associated with eventual consistency.
Partitioning is probably the reason why tools like Spark and Flink became popular for Big Data:
Apache Flink provides a SQL abstraction over its Table API. It also provides syntax to make use of partitioning and increase parallelism.
Apache Flink provides the ability to read from various data sources with methods such as env.readTextFile, env.fromCollection and env.createInput, while allowing reads and writes on streams via connectors.

Tuesday, November 26, 2019

Apache Flink discussion continued:
Apache Spark query code and Apache Flink query code look very similar, but the former treats stream processing as a special case of batch processing while the latter does just the reverse.
Also, Apache Flink provides a SQL abstraction over its Table API.
Apache Flink provides the ability to read from various data sources with methods such as env.readTextFile, env.fromCollection and env.createInput, while allowing reads and writes on streams via connectors.
Some more Flink queries, with examples, follow:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// tuples is a List<Tuple2<String, Integer>> built beforehand
env.fromCollection(tuples)
            .flatMap(new ExtractHashTags())
            .keyBy(0)                          // key by the first tuple field
            .timeWindow(Time.seconds(30))
            .sum(1)                            // sum the second tuple field per key
            .filter(new FilterHashTags())
            .timeWindowAll(Time.seconds(30))   // one window across all keys
            .apply(new GetTopHashTag())
            .print();
env.execute();                                 // a streaming job runs only once execute() is called
GetTopHashTag merely picks out the hashtag with the highest count.
Notice that the methods applied to the stream are chained one after the other. This fluent style is the preferred convention in Flink.
Since we use fromCollection, the source does not run in parallel.
We can improve parallelism with fromParallelCollection.
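What the windowed count and the top-hashtag step compute can be sketched in plain Java with no Flink dependency. The sketch assumes that every event carries its own timestamp in milliseconds (non-negative), that windows are tumbling and aligned to multiples of the window size, and that ties are broken alphabetically. The class, the TagEvent type and the method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch: count hashtags per tumbling window, then pick the top one.
final class TumblingWindowTopTag {

    // A hashtag occurrence with its event timestamp in milliseconds.
    static final class TagEvent {
        final long timestampMs;
        final String tag;

        TagEvent(long timestampMs, String tag) {
            this.timestampMs = timestampMs;
            this.tag = tag;
        }
    }

    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Maps each window start to the per-hashtag counts inside that window.
    static SortedMap<Long, Map<String, Integer>> countPerWindow(List<TagEvent> events, long windowSizeMs) {
        SortedMap<Long, Map<String, Integer>> counts = new TreeMap<>();
        for (TagEvent e : events) {
            counts.computeIfAbsent(windowStart(e.timestampMs, windowSizeMs), w -> new HashMap<>())
                  .merge(e.tag, 1, Integer::sum);
        }
        return counts;
    }

    // The hashtag with the highest count; ties go to the alphabetically first tag.
    static String topTag(Map<String, Integer> counts) {
        String best = null;
        int bestCount = 0;
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            int count = entry.getValue();
            boolean better = count > bestCount
                    || (count == bestCount && best != null && entry.getKey().compareTo(best) < 0);
            if (better) {
                best = entry.getKey();
                bestCount = count;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<TagEvent> events = Arrays.asList(
                new TagEvent(1_000, "#flink"),
                new TagEvent(5_000, "#spark"),
                new TagEvent(29_000, "#flink"),
                new TagEvent(31_000, "#spark"),
                new TagEvent(45_000, "#spark"),
                new TagEvent(59_000, "#flink"));
        countPerWindow(events, 30_000).forEach(
                (start, counts) -> System.out.println(start + " -> " + topTag(counts)));
    }
}
```

The per-window map corresponds to the keyed window followed by the sum, and topTag corresponds to the step applied over the all-keys window.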
If we want to find a distribution, we can do something like:
env.readTextFile(ratingsCsvPath)
            .flatMap(new ExtractRating())
            .groupBy(0)
            .sum(1)
            .print();
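Here is the quintessential wordcount:
// lines is a DataSet<String>; Types is org.apache.flink.api.common.typeinfo.Types
lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            String[] words = line.split("\\W+");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        })
        // lambdas lose generic type information, so the result type is declared explicitly
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .groupBy(0)
        .sum(1)
        .print();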

Flink also supports graphs, vertices and edges with:
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.NullValue;
// socialMediaEdgeSet is a DataSet<Edge<Integer, NullValue>>
Graph<Integer, NullValue, NullValue> followersGraph = Graph.fromDataSet(socialMediaEdgeSet, env);
We need a weighted graph, one whose edges carry numeric values, to run single-source shortest paths:
SingleSourceShortestPaths<Integer, NullValue> singleSourceShortestPaths = new SingleSourceShortestPaths<>(sourceVertex, maxIterations);
     

Monday, November 25, 2019

A comparison of Flink SQL execution and Facebook’s Presto continued:

The Flink application provides the ability to write SQL query expressions. This abstraction works closely with the Table API, and SQL queries can be executed over tables. The Table API is a language centered around tables and follows a relational model. Tables have a schema attached, and the API provides the relational operators of selection, projection and join. Programs written with the Table API go through an optimizer that applies optimization rules before execution.

Presto from Facebook is a distributed SQL query engine that can operate on streams from various data sources, supporting ad hoc queries in near real-time.

Just like the standard query operators of .Net, the Flink SQL layer is merely a convenience over the Table API. On the other hand, Presto offers to run over any kind of data source, not just Table APIs.

Although Apache Spark query code and Apache Flink query code look very similar, the former treats stream processing as a special case of batch processing while the latter does just the reverse.

Also, Apache Flink provides a SQL abstraction over its Table API.

While Apache Spark provided ".map(...)" and ".reduce(...)" programmability syntax to support batch-oriented processing, Apache Flink provides Table APIs with ".groupBy(...)" and ".orderBy(...)" syntax. It provides a SQL abstraction and supports stream processing as the norm.

Sunday, November 24, 2019

Flink Savepoint versus checkpoint
Frequently, Apache Flink users find that the tasks they submit to the Flink runtime run for a long time. This happens when the data that these tasks process is very large. There is usually more than one task performing the same operation, with the data divided among them. These tasks do not perform batch operations; they are most likely performing stream-based operations, handling one window at a time. The state of the Flink application can be persisted with savepoints. This is something the application can configure for the tasks before they run. Tasks are usually run as part of Flink ‘jobs’, depending on the parallelism set by the application. The savepoint is usually stored as a set of files under the <base>/.flink/savepoint folder.
Savepoints can be manually generated from running Flink jobs with the following command:
# ./bin/flink savepoint <jobID>
Checkpoints are system generated. They do not depend on the user. Checkpoints are also written to the file system in much the same way as savepoints. Although the two go by different names and are triggered by the user and by the system respectively, they share the system's method of persisting state for any job, as long as those jobs are enumerable.
The list of running jobs can be seen from the command line with
# ./bin/flink list -r
where -r is the option to list only the running jobs.
Since the code that persists savepoints and checkpoints is the same, the persistence of state needs to follow the policies of the system, which include a limit on the number of states an application can persist.
This implies that manual savepointing will fail if the limits are violated, and all savepoint triggers will then fail. The configuration of savepointing and checkpointing is described in the Apache Flink documentation. Checkpointing may also be possible with the reader group for the stream store. Its limits may need to be honored as well.
Users have constructs from Flink, such as savepoints, to interpret progress. Users can create, own or delete savepoints, which represent the execution state of a streaming job. These savepoints point to actual files on storage. Flink also provides checkpointing, in which it creates and deletes checkpoints without user intervention.
While checkpoints are focused on recovery, much more lightweight than savepoints, and bound to the job lifetime, they can become equally efficient diagnostic mechanisms.
Failed savepoints can be detected from both the jobManager logs and the taskManager logs. The flink command for savepoints is run from a shell on the jobManager, where the flink binary is available. The logs are conveniently available in the log directory.

Saturday, November 23, 2019

A comparison of Flink SQL execution and Facebook’s Presto continued:

The Flink application provides the ability to write SQL query expressions. This abstraction works closely with the Table API, and SQL queries can be executed over tables. The Table API is a language centered around tables and follows a relational model. Tables have a schema attached, and the API provides the relational operators of selection, projection and join. Programs written with the Table API go through an optimizer that applies optimization rules before execution.

Presto from Facebook is a distributed SQL query engine that can operate on streams from various data sources, supporting ad hoc queries in near real-time.
Microsoft facilitated streaming operators with StreamInsight queries. The Microsoft StreamInsight queries follow a five-step procedure:
1)     Define events in terms of the payload, which is the data values of the event, and the shape, which is the lifetime of the event along the time axis.
2)     Define the input streams of the event as a function of the event payload and shape. For example, this could be a simple enumerable over some time interval.
3)     Based on the event definitions and the input stream, determine the output stream and express it as a query. In a way, this describes a flow chart for the query.
4)     Bind the query to a consumer. This could be a console, for example:
        var query = from win in inputStream.TumblingWindow(TimeSpan.FromMinutes(3)) select win.Count();
5)     Run the query and evaluate it based on time.
The StreamInsight queries can be written with .Net and used with standard query operators. ‘WindowedDataStream’ in Flink applications and TumblingWindow in .Net are constructs that allow operating on a stream one window at a time.

