Tuesday, December 31, 2019


The following implementations of createEvent and getEvent indicate that the Pravega data store as a stream store can allow methods as described by the above Flink iterations:
    @Override
    public CompletableFuture<Void> createEvent(final String  routingKey,
                                               final String scopeName,
                                               final String streamName,
                                               final String message) {

        CompletableFuture<Void> ack = CompletableFuture.completedFuture(null);
        ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
        SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
        RevisionedStreamClient<String>  revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
                NameUtils.getMarkStreamForStream(streamName),
                new JavaSerializer<String>(), SynchronizerConfig.builder().build());
        Revision r = revisedStreamClient.fetchLatestRevision();
        revisedStreamClient.writeConditionally(r, message);
        return ack;
}

    @Override
    public CompletableFuture<String> getEvent(final String  routingKey,
                                              final String scopeName,
                                              final String streamName,
                                              final Long segmentNumber) {
        ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
        SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
        RevisionedStreamClient<String>  revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
                NameUtils.getMarkStreamForStream(streamName),
                new JavaSerializer<String>(), SynchronizerConfig.builder().build());
        Revision r = revisedStreamClient.fetchOldestRevision();
        Segment s = r.getSegment();
        io.pravega.client.stream.Stream stream = s.getStream();
        StringBuffer sb = new StringBuffer();
        while (iter.hasNext()) {            Map.Entry<Revision, String> entry = iter.next();
            sb.append(entry.getValue());
            }
        CompletableFuture<String> ack = CompletableFuture.completedFuture(sb.toString());
        return ack;
    }

Which results in the following log output:

2019-12-31 03:51:00,674 22519 [grizzly-http-server-2] INFO i.p.c.s.i.RevisionedStreamClientImpl - Wrote from 0 to 20
2019-12-31 03:51:00,675 22520 [grizzly-http-server-2] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - revisioned client wrote to revision: project58/_MARKlogstream2/0.#epoch.0:0:0
2019-12-31 03:51:00,778 22623 [grizzly-http-server-3] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - fetchLatestRevision=project58/_MARKlogstream2/0.#epoch.0:20:0

No comments:

Post a Comment