Tuesday, November 26, 2019

Apache Flink discussion continued:
Apache Spark query code and Apache Flink query code look very much similar, the former uses stream processing as a special case of batch processing while the latter does just the reverse.
Also Apache Flink provides a SQL abstraction over its Table API
The Apache Flink provides the ability to read from various data sources such as env.readFromFile, env.readFromCollection and readFromInput while allowing read and write from streams via connector.
Some more Flink Queries with examples below:
1) final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2) env.fromCollection(List<Tuple2<String, Integer>> tuples)
3)             .flatMap(new ExtractHashTags())
4)             .keyBy(0)
5)             .timeWindow(Time.seconds(30))
6)             .sum(1)
7)             .filter(new FilterHashTags())
8)             .timeWindowAll(Time.seconds(30))
9)             .apply(new GetTopHashTag())
10)             .print();
The Get top hash tag merely picks out the hash tag with the most count.
Notice the methods applied to the DataSet follow one after the other. This is a preferred convention for Flink
Since we use fromCollection, it does not have parallelism.
We can improve parallelism with fromParallelCollection
If we want to find a distribution, we can do something like :
env.readTextFile(ratingsCsvPath)
.flatMap(new ExtractRating())
            .groupBy(0)
            .sum(1)
            .print();
Here is the quintessential wordcount:
lines.flatMap((line, out) -> {
            String[] words = line.split("\\W+");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        })
        .returns(new TupleTypeInfo(TypeInformation.of(String.class), TypeInformation.of(Integer.class)))
        .groupBy(0)
        .sum(1)
        .print();

Flink also supports graphs, vertices and edges with:
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
Graph<Integer, NullValue, NullValue> followersGraph = Graph.fromDataSet(socialMediaEdgeSet, env);
We may need weighted graphs to run single source shortest path:
SingleSourceShortestPaths<Integer, NullValue> singleSourceShortestPaths = new SingleSourceShortestPaths<>(sourceVertex, maxIterations);
     
There are connectors for different data sources which can be passed in via addSource

No comments:

Post a Comment