This is a code sample for the suggestion made in the article about writing log4j appenders for stream store:
public class StreamAppender extends AppenderBase<ILoggingEvent> {
private static final String COMPLETION_EXCEPTION_NAME = CompletionException.class.getSimpleName();
private static final String EXECUTION_EXCEPTION_NAME = ExecutionException.class.getSimpleName();
private final static String CONTROLLER_URI = "CONTROLLER_URI";
private final static String SCOPE_NAME = "SCOPE_NAME";
private final static String STREAM_NAME = "STREAM_NAME";
public String scope;
public String streamName;
public URI controllerURI;
private StreamManager streamManager;
private EventStreamClientFactory clientFactory;
private EventStreamWriter<String> writer;
public StreamAppender(String scope, String streamName, URI controllerURI) {
this.scope = scope;
this.streamName = streamName;
this.controllerURI = controllerURI;
}
@Override
public void start() {
final String scope = getEnv(SCOPE_NAME);
final String streamName = getEnv(STREAM_NAME);
final String uriString = getEnv(CONTROLLER_URI);
final URI controllerURI = URI.create(uriString);
this.scope = scope;
this.streamName = streamName;
this.controllerURI = controllerURI;
init();
super.start();
}
@Override
public void stop() {
if (writer != null) writer.close();
if (clientFactory != null) clientFactory.close();
if (streamManager != null) streamManager.close();
super.stop();
}
private static String getEnv(String variable) {
Optional<String> value = Optional.ofNullable(System.getenv(variable));
return value.orElseThrow( () -> new IllegalStateException(String.format("Missing env variable %s", variable)));
}
private void init() {
StreamManager streamManager = StreamManager.create(controllerURI);
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
streamManager.createStream(scope, streamName, streamConfig);
clientFactory = EventStreamClientFactory.withScope(scope, ClientConfig.builder().controllerURI(controllerURI).build());
writer = clientFactory.createEventWriter(streamName,
new JavaSerializer<String>(),
EventWriterConfig.builder().build());
}
//region Appender Implementation
@Override
public String getName() {
return "Stream Appender";
}
@Override
public void append(ILoggingEvent event) throws LogbackException {
if (event.getLevel() == Level.ERROR) {
recordEvent("error", event);
} else if (event.getLevel() == Level.WARN) {
recordEvent("warn", event);
}
}
private void recordEvent(String level, ILoggingEvent event) {
IThrowableProxy p = event.getThrowableProxy();
while (shouldUnwrap(p)) {
p = p.getCause();
}
if (writer != null) {
writer.writeEvent(level, event.getMessage());
}
}
private boolean shouldUnwrap(IThrowableProxy p) {
return p != null
&& p.getCause() != null
&& (p.getClassName().endsWith(COMPLETION_EXCEPTION_NAME) || p.getClassName().endsWith(EXECUTION_EXCEPTION_NAME));
}
//endregion
}
The sample above refers to opening the stream each time to write an event. This may be avoided with doing it once on the instantiation of the bean.
public class StreamAppender extends AppenderBase<ILoggingEvent> {
private static final String COMPLETION_EXCEPTION_NAME = CompletionException.class.getSimpleName();
private static final String EXECUTION_EXCEPTION_NAME = ExecutionException.class.getSimpleName();
private final static String CONTROLLER_URI = "CONTROLLER_URI";
private final static String SCOPE_NAME = "SCOPE_NAME";
private final static String STREAM_NAME = "STREAM_NAME";
public String scope;
public String streamName;
public URI controllerURI;
private StreamManager streamManager;
private EventStreamClientFactory clientFactory;
private EventStreamWriter<String> writer;
public StreamAppender(String scope, String streamName, URI controllerURI) {
this.scope = scope;
this.streamName = streamName;
this.controllerURI = controllerURI;
}
@Override
public void start() {
final String scope = getEnv(SCOPE_NAME);
final String streamName = getEnv(STREAM_NAME);
final String uriString = getEnv(CONTROLLER_URI);
final URI controllerURI = URI.create(uriString);
this.scope = scope;
this.streamName = streamName;
this.controllerURI = controllerURI;
init();
super.start();
}
@Override
public void stop() {
if (writer != null) writer.close();
if (clientFactory != null) clientFactory.close();
if (streamManager != null) streamManager.close();
super.stop();
}
private static String getEnv(String variable) {
Optional<String> value = Optional.ofNullable(System.getenv(variable));
return value.orElseThrow( () -> new IllegalStateException(String.format("Missing env variable %s", variable)));
}
private void init() {
StreamManager streamManager = StreamManager.create(controllerURI);
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
streamManager.createStream(scope, streamName, streamConfig);
clientFactory = EventStreamClientFactory.withScope(scope, ClientConfig.builder().controllerURI(controllerURI).build());
writer = clientFactory.createEventWriter(streamName,
new JavaSerializer<String>(),
EventWriterConfig.builder().build());
}
//region Appender Implementation
@Override
public String getName() {
return "Stream Appender";
}
@Override
public void append(ILoggingEvent event) throws LogbackException {
if (event.getLevel() == Level.ERROR) {
recordEvent("error", event);
} else if (event.getLevel() == Level.WARN) {
recordEvent("warn", event);
}
}
private void recordEvent(String level, ILoggingEvent event) {
IThrowableProxy p = event.getThrowableProxy();
while (shouldUnwrap(p)) {
p = p.getCause();
}
if (writer != null) {
writer.writeEvent(level, event.getMessage());
}
}
private boolean shouldUnwrap(IThrowableProxy p) {
return p != null
&& p.getCause() != null
&& (p.getClassName().endsWith(COMPLETION_EXCEPTION_NAME) || p.getClassName().endsWith(EXECUTION_EXCEPTION_NAME));
}
//endregion
}
The sample above refers to opening the stream each time to write an event. This may be avoided with doing it once on the instantiation of the bean.
No comments:
Post a Comment