We continue our discussion of logging in Kubernetes. In the example provided, a log service is assumed to be available; this service is accessible at a predefined host and port.
Implementing the log service inside the cluster, listening on a TCP port, moves the log destination into the cluster itself: the stream storage and the log service become just more applications running alongside the workloads they serve. Listening on a socket is a typical pattern for a Java application. The service reads from the socket and writes to a pre-determined stream defined by the following parameters (a small configuration sketch follows the list):
Stream name
Stream scope name
Stream reading timeout
Stream credential username
Stream credential password
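As a minimal sketch of how these parameters might be supplied (the class name, environment-variable keys, and default values below are assumptions for illustration, not part of the original service), they can be collected from the container environment, which in Kubernetes would typically be populated from a ConfigMap and a Secret:

import java.time.Duration;

// Hypothetical holder for the stream parameters listed above.
public final class StreamSettings {

    public final String streamName;
    public final String scopeName;
    public final Duration readTimeout;
    public final String username;
    public final String password;

    private StreamSettings(String streamName, String scopeName, Duration readTimeout,
                           String username, String password) {
        this.streamName = streamName;
        this.scopeName = scopeName;
        this.readTimeout = readTimeout;
        this.username = username;
        this.password = password;
    }

    // Read each parameter from the environment, falling back to a default.
    public static StreamSettings fromEnvironment() {
        return new StreamSettings(
                envOrDefault("LOG_STREAM_NAME", "srsStream"),
                envOrDefault("LOG_STREAM_SCOPE", "srsScope"),
                Duration.ofMillis(Long.parseLong(envOrDefault("LOG_STREAM_READ_TIMEOUT_MS", "2000"))),
                envOrDefault("LOG_STREAM_USERNAME", ""),
                envOrDefault("LOG_STREAM_PASSWORD", ""));
    }

    private static String envOrDefault(String key, String fallback) {
        String value = System.getenv(key);
        return (value == null || value.isEmpty()) ? fallback : value;
    }
}

With those parameters in hand, the write and read sides of the service look as follows. The imports, class wrapper, timeout constant, and the pass-through transform method are filled in here so the snippet compiles (the APIs match the Pravega Java client); the stream calls themselves are unchanged from the original: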
import io.pravega.client.ClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.JavaSerializer;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class LogStreamService {

    // Timeout for readNextEvent(); the value is an assumption.
    private static final long READER_TIMEOUT_MS = 2000;

    public void write(String scope, String streamName, URI controllerURI, String routingKey, String message) {
        StreamManager streamManager = StreamManager.create(controllerURI);
        // Scope and stream creation are idempotent; the booleans report whether they were newly created.
        final boolean scopeIsNew = streamManager.createScope(scope);
        StreamConfiguration streamConfig = StreamConfiguration.builder()
                .scalingPolicy(ScalingPolicy.fixed(1))
                .build();
        final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig);
        try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
             EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName,
                     new JavaSerializer<String>(),
                     EventWriterConfig.builder().build())) {
            System.out.format("Writing message: '%s' with routing-key: '%s' to stream '%s / %s'%n",
                    message, routingKey, scope, streamName);
            final CompletableFuture<Void> writeFuture = writer.writeEvent(routingKey, message);
            // Block until the event has been durably written.
            writeFuture.join();
        }
    }

    public void readAndTransform(String scope, String streamName, URI controllerURI) {
        StreamManager streamManager = StreamManager.create(controllerURI);
        final boolean scopeIsNew = streamManager.createScope(scope);
        StreamConfiguration streamConfig = StreamConfiguration.builder()
                .scalingPolicy(ScalingPolicy.fixed(1))
                .build();
        final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig);
        // Use a throw-away reader group for this run.
        final String readerGroup = UUID.randomUUID().toString().replace("-", "");
        final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
                .stream(Stream.of(scope, streamName))
                .build();
        try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) {
            readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
        }
        try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
             EventStreamReader<String> reader = clientFactory.createReader("reader",
                     readerGroup,
                     new JavaSerializer<String>(),
                     ReaderConfig.builder().build())) {
            System.out.format("Reading all the events from %s/%s%n", scope, streamName);
            EventRead<String> event = null;
            do {
                try {
                    event = reader.readNextEvent(READER_TIMEOUT_MS);
                    if (event.getEvent() != null) {
                        System.out.format("Read event '%s'%n", event.getEvent());
                        String message = transform(event.getEvent());
                        // Forward the transformed message to the destination stream.
                        write("srsScope", "srsStream", URI.create("tcp://127.0.0.1:9090"), "srsRoutingKey", message);
                    }
                } catch (ReinitializationRequiredException e) {
                    // There are certain circumstances where the reader needs to be reinitialized.
                    e.printStackTrace();
                }
            } while (event == null || event.getEvent() != null);
            System.out.format("No more events from %s/%s%n", scope, streamName);
        }
    }

    // Placeholder for the transformation applied to each event; the original post
    // does not show its body, so this simply passes the message through.
    private String transform(String message) {
        return message;
    }
}
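To complete the picture of a service listening on a TCP port, a minimal sketch of the socket-facing side might look like the following. The port, controller endpoint, scope, stream, and routing-key values are assumptions for illustration; every line received on the socket is handed to the write method above:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.nio.charset.StandardCharsets;

public class LogSocketListener {

    public static void main(String[] args) throws IOException {
        LogStreamService service = new LogStreamService();
        URI controllerURI = URI.create("tcp://127.0.0.1:9090"); // assumed controller endpoint

        // Accept connections from in-cluster log producers and forward every
        // line they send into the destination stream.
        try (ServerSocket serverSocket = new ServerSocket(5140)) { // assumed port
            while (true) {
                try (Socket client = serverSocket.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        service.write("srsScope", "srsStream", controllerURI, "srsRoutingKey", line);
                    }
                } catch (IOException e) {
                    // A single misbehaving client should not stop the listener.
                    e.printStackTrace();
                }
            }
        }
    }
}

Opening a new writer per event, as write does above, keeps the example short; a long-running service would normally keep a single EventStreamWriter open and reuse it for all incoming lines.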