The following implementations of createEvent and getEvent show that Pravega, as a stream store, can support the methods described in the Flink iterations above:
@Override
public CompletableFuture<Void> createEvent(final String routingKey,
                                           final String scopeName,
                                           final String streamName,
                                           final String message) {
    CompletableFuture<Void> ack = CompletableFuture.completedFuture(null);
    // Fetched from the store but not used below: the factory is built with a default ClientConfig.
    ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
    SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
    RevisionedStreamClient<String> revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
            NameUtils.getMarkStreamForStream(streamName),
            new JavaSerializer<String>(), SynchronizerConfig.builder().build());
    // Write only if r is still the latest revision; writeConditionally returns null otherwise.
    Revision r = revisedStreamClient.fetchLatestRevision();
    revisedStreamClient.writeConditionally(r, message);
    return ack;
}
@Override
public CompletableFuture<String> getEvent(final String routingKey,
                                          final String scopeName,
                                          final String streamName,
                                          final Long segmentNumber) {
    // Fetched from the store but not used below: the factory is built with a default ClientConfig.
    ClientConfig clientConfig = this.getStoreHelper().getSegmentHelper().getConnectionFactory().getClientConfig();
    SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(scopeName, ClientConfig.builder().build());
    RevisionedStreamClient<String> revisedStreamClient = synchronizerClientFactory.createRevisionedStreamClient(
            NameUtils.getMarkStreamForStream(streamName),
            new JavaSerializer<String>(), SynchronizerConfig.builder().build());
    // Read every entry from the oldest available revision onward.
    Revision r = revisedStreamClient.fetchOldestRevision();
    Iterator<Map.Entry<Revision, String>> iter = revisedStreamClient.readFrom(r);
    StringBuilder sb = new StringBuilder();
    while (iter.hasNext()) {
        Map.Entry<Revision, String> entry = iter.next();
        sb.append(entry.getValue());
    }
    CompletableFuture<String> ack = CompletableFuture.completedFuture(sb.toString());
    return ack;
}
These calls produce the following log output:
2019-12-31 03:51:00,674 22519 [grizzly-http-server-2] INFO i.p.c.s.i.RevisionedStreamClientImpl - Wrote from 0 to 20
2019-12-31 03:51:00,675 22520 [grizzly-http-server-2] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - revisioned client wrote to revision: project58/_MARKlogstream2/0.#epoch.0:0:0
2019-12-31 03:51:00,778 22623 [grizzly-http-server-3] INFO i.p.c.s.s.PravegaTablesStreamMetadataStore - fetchLatestRevision=project58/_MARKlogstream2/0.#epoch.0:20:0
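The conditional write used in createEvent behaves like a compare-and-set on the stream's latest revision: the write is applied only if the revision passed in is still the latest, and otherwise nothing is written and null is returned. The sketch below is a rough illustration of that behaviour using a hypothetical in-memory class, RevisionedLogSketch, which is not part of Pravega. It assumes a simple counter for revisions, whereas the revisions in the log above appear to encode a segment offset (0 before the write, 20 after it).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model for illustration only; NOT Pravega's RevisionedStreamClient.
class RevisionedLogSketch {
    // Each entry is stored together with the revision reached after it was written.
    private final List<Map.Entry<Long, String>> entries = new ArrayList<>();
    private long latest = 0L; // assumption: revisions are a plain counter

    synchronized long fetchOldestRevision() {
        return 0L;
    }

    synchronized long fetchLatestRevision() {
        return latest;
    }

    // Appends the value only if `expected` is still the latest revision.
    // Returns the new latest revision, or null if the caller's revision is stale.
    synchronized Long writeConditionally(long expected, String value) {
        if (expected != latest) {
            return null;
        }
        latest++;
        entries.add(new AbstractMap.SimpleImmutableEntry<Long, String>(latest, value));
        return latest;
    }

    // Returns the entries written after the given revision, oldest first.
    synchronized Iterator<Map.Entry<Long, String>> readFrom(long start) {
        List<Map.Entry<Long, String>> result = new ArrayList<>();
        for (Map.Entry<Long, String> e : entries) {
            if (e.getKey() > start) {
                result.add(e);
            }
        }
        return result.iterator();
    }

    public static void main(String[] args) {
        RevisionedLogSketch log = new RevisionedLogSketch();
        long r = log.fetchLatestRevision();

        Long first = log.writeConditionally(r, "hello"); // r is current, so this succeeds
        Long stale = log.writeConditionally(r, "world"); // r is now stale, so this is rejected
        System.out.println("first write -> " + first);   // first write -> 1
        System.out.println("stale write -> " + stale);   // stale write -> null

        // Read back everything from the oldest revision, as getEvent does.
        StringBuilder sb = new StringBuilder();
        Iterator<Map.Entry<Long, String>> iter = log.readFrom(log.fetchOldestRevision());
        while (iter.hasNext()) {
            sb.append(iter.next().getValue());
        }
        System.out.println("contents -> " + sb);          // contents -> hello
    }
}
```

A caller whose write is rejected would typically fetch the latest revision again and retry. The createEvent method above ignores the return value, so a rejected write would go unnoticed there.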