Sunday, May 24, 2020

A typical stream processing program would look like this: 

    public static void main(String argv[]) throws Exception { 

    final ParameterTool params = ParameterTool.fromArgs(argv); 

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    env.getConfig().setGlobalJobParameters(params); 

        String scope = Constants.DEFAULT_SCOPE; 

        String streamName = Constants.DEFAULT_STREAM_NAME; 

        PravegaConfig pravegaConfig = PravegaConfig.fromParams(ParameterTool.fromArgs(argv)); 

        pravegaConfig.withCredentials(new DefaultCredentials(Constants.DEFAULT_PASSWORDConstants.DEFAULT_USERNAME)); 

        StreamConfiguration streamConfig = StreamConfiguration.builder() 

                .scalingPolicy(ScalingPolicy.fixed(Constants.NO_OF_SEGMENTS)) 

                .build(); 

  

        logger.info("001- creating stream"); 

        Stream stream = pravegaConfig.resolve(streamName); 

  

        logger.info("002- adding data"); 

        DataStream<String> input = env.fromCollection(snippets); 

        snippets.add("data1"); 

        snippets.add("data2"); 

        Snippets.add("data3"); 

        input.print(); 

  

        logger.info("003- iterate over data"); 

        IterativeStream<String> iteration = input.iterate(); 

        iteration.withFeedbackType(String.class); 

        List<String> entries = new ArrayList<>(); 

        iteration.map(t -> {entries.add(t); return t;}); 

        iteration.closeWith(mapped); 

  

        logger.info("004 - creating a writer to write to stream"); 

        FlinkPravegaWriter.Builder<String> builder = FlinkPravegaWriter.<String>builder() 

            .withPravegaConfig(pravegaConfig) 

            .forStream(stream) 

                .withEventRouter(new PravegaEventRouter<String >() { 

                    @Override 

                    public String getRoutingKey(String e) { 

                        return e; 

                    } 

                }) 

            .withSerializationSchema(PravegaSerialization.serializationFor(String.class)); 

        builder.enableWatermark(true); 

        builder.withWriterMode(PravegaWriterMode.EXACTLY_ONCE); 

        FlinkPravegaWriter<String> flinkPravegaWriter = builder.build(); 

        input.addSink(flinkPravegaWriter); 

       

        logger.info("005 - creating a reader to read from stream"); 

        FlinkPravegaReader<String> flinkPravegaReader = FlinkPravegaReader.<String>builder() 

                .withPravegaConfig(pravegaConfig) 

                .forStream(stream) 

                .withDeserializationSchema(PravegaSerialization.deserializationFor(String.class)) 

                .build(); 

  

        logger.info("006 - reading events from stream"); 

        DataStream<String> eventsRead = env 

                    .addSource(flinkPravegaReader) 

                    .name("eventsRead"); 

        IterativeStream<String> it = eventsRead.iterate(); 

        List<String> dataList = new ArrayList<>(); 

        DataStream<String> newEvents = it.map(t -> {dataList.add(t); return t;}); 

        logger.info("count of events = {}", dataList.size()); 

        it.closeWith(newEvents); 

  

        logger.info("007- done"); 

        env.execute("Stream Writer"); 

    } 

 


The application maintenance requirements for these applications involve the following:
1) Application data: Dat adoes not only come from stream store. IT comes from files and collections as well. This means the program must have access to those data stores.
2) Application object model: Application should be conveniently developed using those data sources. It should not require standalone development prior to integration.
3) Application Core components: The application should be convenient for the administrator and developer alike to turn on or off the above mentioned interaction with other components. 
4) Application Services: This kind of application works over infinite data as opposed to conventional applciations. This means the APIs should support runtime performance query. 
5)  Application UI: This organizes the view models and the views so that the business objects can be modified to enable the workflows required from the application. The properties on the UI are defined usually by declarative markup in views.  
6) Application Exception handling: The entire application handles exceptions at each layer and across layers. Usually applications don't allow unhandled exceptions to propagate to the user. Messages are translated into what's meaningful to the user. 
7) Application Logging: Application logging via standard application blocks or other utilities is necessary both for development as well as production support. 
8) Application user access: controlling who has access to what is another important aspect of design for an application. Applications tend to handle security as close to the entry points as possible. The entry points typically have a login that comprises of authentication, authorization lines that need to be crossed before the user has access. 
9) Application performance: Application design requires consideration for performance and often include fast code paths in addition to general more expensive ones as well as external tools or frameworks such as caching. 
10) Application messaging: Service bus or some messaging framework is involved to send messages between application and its dependencies. These could be other services or other data providers. These could also be external gateway when there are heterogeneous systems involved. 

No comments:

Post a Comment