Tuesday, April 14, 2020

This is a code sample for the suggestion made in the article about writing log4j appenders for stream store:
public class StreamAppender extends AppenderBase<ILoggingEvent> {
    private static final String COMPLETION_EXCEPTION_NAME = CompletionException.class.getSimpleName();
    private static final String EXECUTION_EXCEPTION_NAME = ExecutionException.class.getSimpleName();
    private final static String CONTROLLER_URI = "CONTROLLER_URI";
    private final static String SCOPE_NAME = "SCOPE_NAME";
    private final static String STREAM_NAME = "STREAM_NAME";

    public String scope;
    public String streamName;
    public URI controllerURI;
    private StreamManager streamManager;
    private EventStreamClientFactory clientFactory;
    private EventStreamWriter<String> writer;

    public StreamAppender(String scope, String streamName, URI controllerURI) {
        this.scope = scope;
        this.streamName = streamName;
        this.controllerURI = controllerURI;
    }

    @Override
    public void start() {
        final String scope = getEnv(SCOPE_NAME);
        final String streamName = getEnv(STREAM_NAME);
        final String uriString = getEnv(CONTROLLER_URI);
        final URI controllerURI = URI.create(uriString);

        this.scope = scope;
        this.streamName = streamName;
        this.controllerURI = controllerURI;
        init();
        super.start();
    }

    @Override
    public void stop() {
        if (writer != null) writer.close();
        if (clientFactory != null) clientFactory.close();
        if (streamManager != null) streamManager.close();
        super.stop();
    }

    private static String getEnv(String variable) {
        Optional<String> value = Optional.ofNullable(System.getenv(variable));
        return value.orElseThrow( () -> new IllegalStateException(String.format("Missing env variable %s", variable)));
    }

    private void init() {
        StreamManager streamManager = StreamManager.create(controllerURI);

        StreamConfiguration streamConfig = StreamConfiguration.builder()
            .scalingPolicy(ScalingPolicy.fixed(1))
            .build();
        streamManager.createStream(scope, streamName, streamConfig);
        clientFactory = EventStreamClientFactory.withScope(scope, ClientConfig.builder().controllerURI(controllerURI).build());
        writer = clientFactory.createEventWriter(streamName,
                 new JavaSerializer<String>(),
                 EventWriterConfig.builder().build());
    }

    //region Appender Implementation

    @Override
    public String getName() {
        return "Stream Appender";
    }

    @Override
    public void append(ILoggingEvent event) throws LogbackException {
        if (event.getLevel() == Level.ERROR) {
            recordEvent("error", event);
        } else if (event.getLevel() == Level.WARN) {
            recordEvent("warn", event);
        }
    }

    private void recordEvent(String level, ILoggingEvent event) {
        IThrowableProxy p = event.getThrowableProxy();
        while (shouldUnwrap(p)) {
            p = p.getCause();
        }
        if (writer != null) {
            writer.writeEvent(level, event.getMessage());
        }
    }

    private boolean shouldUnwrap(IThrowableProxy p) {
        return p != null
                && p.getCause() != null
                && (p.getClassName().endsWith(COMPLETION_EXCEPTION_NAME) || p.getClassName().endsWith(EXECUTION_EXCEPTION_NAME));

    }

    //endregion

}

The sample above refers to opening the stream each time to write an event. This may be avoided with doing it once on the instantiation of the bean.

No comments:

Post a Comment