Saturday, September 19, 2020

An Email campaign management system

Problem statement 

An email campaign management system empowers a user to send automated emails to many recipients. The content and the broadcast of the email is referred to as a campaign. A sample use case for the campaign helps describe the problem and this solution. Let us say a job seeker wants to mail out a template with a standard cover letter and resume as a self-introduction and advertisement to all the acquaintances. In this case, the letter and the resume become part of the campaign and the user may want to change the campaign and the target audience. The ability to do so from a web interface helps make the interaction minimal and error-free. The content can be uploaded as files while the email recipients can be added from the browser. After the contents have accrued to the intended group, the candidate can click on a button to mail the recipients using SMTP. 

Role of a Database: 

 A database is useful to keep a table of entries and to support create, update, and delete operations independent of the purpose for which these contacts are accrued. The table serves well for an online transaction processing system and the interface to use such a table follows a standard convention for the usage.  

Role of a Message Broker: 

A message broker is useful for sending messages to multiple recipients with retries and dead letter queue. Besides, it journals the message and activities for review later. Messaging protocols are well-known and enable scriptability with a variety of libraries and packages. 

Role of a user interface:  

The user interface is intended only for one user – the campaign manager. The campaign manager can not only feed the recipients and the content but also review the activity and progress as the campaign is mailed out. 

Design: 

A system that enables campaigns to be generated is a good candidate for automation as a background job. Let us look at this job in a bit more detail: 

1)      First, we need a way to specify the criteria for selecting the email recipients. This can be done with a set of logical conditions using ‘or’ and ‘and’ operators. Each condition is a rule and the rules may have some order to them. They are best expressed as a stored procedure with versioning.  If the data to filter resides in a table such as say the customers' table, then the stored procedure resides as close to the data as possible. 

2)      The message may need to be formatted and prepared for each customer and consequently, these can be put in a queue where they are appropriately filled and made ready. A message broker comes very usefully in this regard. Preparation of emails is followed by sending them out consequently there may be separate queues for preparation and mailing out and they may be chained. 

3)      The number of recipients may be quite large and the mails for each of them may need to be prepared and sent out. This calls for parallelization. One way to handle this parallelization would be to have workers spawned to handle the load on the queue. Celery is a good example of such a capability, and it works well with a message broker. 

4)      A web interface to generate campaigns can be useful for administrators to interact with the system. 

 
 

The data flow begins with the administrator defining the campaign. This consists of at the very least the following: a) the email recipients b) the mail template c) the data sources from which to populate the databases and d) the schedule in which to send out the mails. 

The email recipients need not always be specified explicitly especially if they number in millions. On the other hand, the recipients may already be listed in a database somewhere. There may only be selection criteria for filtering the entire list for choosing the recipients. Such criteria are best expressed in the form of a stored procedure. The translation of the user-defined criteria into a stored procedure is not very hard. The user is given a set of constraints, logical operators, and valuable inputs and these can be joined to form predicates which are then entered as-is into the body of a stored procedure. Each time the criteria are executed through the stored procedure, the result set forms the recipients' list. When the criteria change, the stored procedure is changed, and this results in a new version.  Since the criteria and stored procedure are independent of the message preparation and mailing, they can be offline to the mailing process.  

The mailing process commences with the email list determined as above.  The next step is the data that needs to be acquired for each template. For example, the template may correspond to the resources that the recipients may have but the list of resources may need to be pulled from another database. It would be ideal if this could also be treated as SQL queries which provide the data that a task then uses to populate the contents of the email. Since this is per email basis, it can be parallelized to a worker pool where each worker grabs an email to prepare.  An email receives a recipient and content.  Initially, the template is dropped on the queue with just the email recipient mentioned. The task then manages the conversion of the template to the actual email message before putting it on the queue for dispatch. The dispatcher simply mails out the prepared email with SMTP.  

The task-parallel library may hide the message broker from the parallelization. Celery comes with its own message broker that also allows the status of the enqueued items to be logged. However, a fully-fledged message broker with a worker pool is preferred because it gives much more control over the queue and the messages permitted on the queue. Moreover, journaling and logging can with automation. Messages may be purged from the queue so that the automation stops on user demand. 

Therefore, data flows from data sources into the emails that are then mailed out.  The task that prepares the emails needs to have access to the database tables and stored procedures that determine who the recipients are and what the message is. Since they act on an individual email basis, they are scalable.   

Intelligent Routines:  

The ability to form groups of recipients based on classification rules is an intelligence added to the system that does away with manual entry of data. The use of classifiers from groups depends on a set of rules that can be specified independently. Each rule can be added via the user interface and mailed out to the recipients. 

Monitoring the progress: 

The message queue broker helps with the queue statistics where the number of orders on the queue determines the progress. When an order is complete the status of the item reflects the information. Subsequent read-only queries on the status give an indication of the progress. 

Testing: 

Each content and group should be verified independently. The mailing of a campaign should have a dry run before being mass-mailed to the intended recipients. 

Conclusion: 

Implementation of an email campaign system allows the flexibility to customize all parts of the campaign process even beyond the capabilities of off-the-shelf automation systems. 

Reference: paper titled “Queues are databases” by Jim Gray 

Alternatives:  

Instead of a database, an issue tracking software such as Jira can also be used together with the message broker. For example https://github.com/ravibeta/PythonExamples/blob/master/seemq.py 

conflation

 #conflation 

Streams can be conflated into an uber stream. This is helpful in devOps cases where streams are created by different application but need to be merged to a stream that can be truncated or appended by a global policy and useful for downstream systems.   


There can be thousands of streams to be merged and the incoming data rate for these streams could be different. The events in the stream can be compared based on timestamp and the earlier event can follow another.  If the events are distributed by key space, each key at a point of time spanning the key space can be considered to be in an independent stream but will retain the naming convention using the routing key. Since the routing key differentiates the events, the order of events with same timestamp does not matter.  


The controller is the right place to include an api to merge the streams since that is the interface to create the streams. The streams can be referenced by their names that are qualified by their scopes.  


The interface can also support a copy stream method that allows a stream to be copied. In this case, the copy saves the client from having to do so one event at a time. Instead, the entire collection of events, their sequence and offsets can be copied along with the metadata and with renamings. 


Streams can also be conflated to an external store. For example, we can overlay a stream store on top of object storage and have all the streams written to the same bucket. This is elaborated in https://1drv.ms/w/s!Ashlm-Nw-wnWvj_oHPVynmrD-SVI?e=k8EaG8 

Friday, September 18, 2020

Annotations

 #annotations 

Any key value can be used as annotations which helps to differentiate events. There are internal annotations and external annotations. The external ones come from customers and if it can be applied to any event, stream or scope, then they serve to improve the query operators used with the events.  


The internal annotations are stored in table segments which is designed to hold different key values. The data is stored independent from metadata using BTree data structures. This allows the data to be stored with or without any metadata. The metadata stored as key-values can be arbitrarily large and cannot pollute the data.  


The data structure for internal and external metadata is the same. Just the access is different. There is no access allowed to internal metadata because it must remain a source of truth. The external metadata can change but it helps to version the change.  


The metadata is injected using the same field extraction technique discussed as above. In this case, the annotations come from the user.  


The user provided metadata can be applied to different containers such as stream, scope and pipelines. There are several applications that can query this metadata such as reporting stacks, troubleshooting, and diagnostics, stamping and providing environmental information. Most of these applications are read-only stacks so they can quickly scale to any spatial or temporal elasticity. 



Thursday, September 17, 2020

Pipelines

 The stream store has the opportunity to streamline access to metadata independent of applications so that they can focus on their business needs. 

#pipelines

Build Automation jobs and message queues have a lot in common with streams and events. There is a pipeline pattern formed where events from one stream feed into another. The operators that extract, transform and load the events from one stream to another can form a continuous pipeline that transfers events as they come.  

The setting up of these pipelines requires readers and writers to be commissioned between streams. This relay can easily scale as and when the load increases given the parallelization supported by the stream store. But it would help to have an api that sets up a relay and takes simple filters as say predicate parameters. This automatic provisioning of relays is sufficient to setup multiple hop pipelines.

This higher-level api for relays to setup a pipeline can also support management features associated with a pipeline by collecting metrics and statistics as described earlier for streams or to report on status at various points of transitions. The ability to view end-to-end operations of a pipeline is helpful for business needs and for manageability.

There are many other patterns of inter stream activities that can also be supported via a set of operators like appenders, collectors and forwarders. These make pipelines a veritable storage container in itself along with its metadata.

The participation of stream store in data pipeline is now transformed as the participation of streams in a pipeline with managed features from the stream store. Some of the onus from applications has been removed by including automations within the stream store. 

 


Wednesday, September 16, 2020

Metrics continued

 We were discussing a set of features for stream store that brings the notion of accessing events in sorted order with skipped traversal.  The events can be considered to be in some predetermined sequence in the event stream whether it is by offset or by timestamp. These sequence numbers are in sorted order. Accessing any event in the stream, as if considered to be in a batch bounded by a head and a tail StreamCut that occur immediately before and after the event respectively, is now better than the linear traversal to read the event. This makes the access to the event from the historical set of events in the stream to be O(log N). The skip-level access links in the form of head and tail streamcuts can easily be built into the metadata on a catch-up basis after the events are accrued in the stream.

In addition, we have the opportunity to collect fields and the possible values that occur in the events to allow them to be leveraged in queries subsequently. This enhancement of metadata from events in the stream becomes useful to find similar events

The use of standard query operators with the events in the stream has been made possible by the Flink programming library but the logic written with those operators usually is not aware of all the fields that has been extracted from the events. By closing the gap between the field extraction and new fields in query logic, the applications can not only improve existing logic but also write new ones.

The extraction of fields and their values provides an opportunity to not only discover the range of values that certain keys can take across all the events in the stream but also their distribution. Events are numerous and there is no go-to source for statistics about events especially similar looking events. Two streams having similar events may have them in incredibly different order and arrival times. If the stream store is unaware of the contents of the events, it can tell the number of events and the size made up by those events. But with some insight into the events such as the information about their source, a whole new set of metrics are now available which can help with summary information, point of origin troubleshooting, contribution/spread calculation, and better resource allocations.

Metrics provide handy information about the stream that would otherwise have to be interpreted by running offline analysis on logs. Summary statistics from metrics can now be saved with metadata.

There are a few cases where the metadata might be little or none. This includes encrypted data and binary data. The bulk of the use cases for the stream store instead deals with textual data. So, the benefits from the features mentioned above are not marginal. 


Client applications can do without the support for such metadata from the stream store but it is cumbersome for them as they inject intermittent events with summary data that they would like to publish for their events. These special events can be skipped during data reads from the stream. As with any events, they will need to be iterated. 


Clients would find it inconvenient to write different types of events to the same stream unless they use different payloads in the same type of envelope or data type. On the other hand, it is easier for the clients to have readers and writers to read a single type of event from a stream. This favors the client to choose a model where they store processing related information in a dedicated stream. The stream store however can create internal streams specifically for the purpose of metadata. The latter approach ties the setup and teardown of the internal streams to the data stream and becomes a managed approach. 


The stream store does not force a centralized catalog. It maintains metadata per stream. Streams are isolated and the readers and writers can scale. The metrics and statistics that become part of the metadata reflect holistic information on the stream. There is a possibility that the metadata updates may lag the incoming data rate but the difference might be negligible. Also, the metadata survives stream store restart and the updates are typically incremental. 

#metric

Integer totalSizeOfEvents = ranges.stream() 

      .map(range - > getSizeOfEvents(range))  

     .reduce(0, Integer::sum) ; 

Tuesday, September 15, 2020

Metrics continued

Having described the collection of metrics from streams with primitive information on events, let us now see how to boost the variety and customization of metrics. In this regard, the field extraction performed from the parsing of historical events, provides additional datapoints which become helpful in generating metrics. For example, the metrics can now be based on field names and their values as they occur throughout the stream, giving information on the priority and severity about certain events. This summary information about the stream now includes metrics about criteria pertaining to the events.
The set of features mentioned earlier are independent of each other which allows them to be implemented in stages or evaluated for priority. The reference to metadata is only a suggestion that it pertains to a give stream and the events in the stream. The container for such interpreted data might be implemented instead as a global stream. This gives the ability to collect summary information across all streams in a scope or dedicate a stream for all the operational data for the stream store.
There are a few cases where the metadata might be little or none. This includes encrypted data and binary data. The bulk of the use cases for the stream store instead deals with textual data. So the benefits from the features mentioned above are not marginal.
Client applications can do without the support for such metadata from the stream store but it is cumbersome for them as they inject intermittent events with summary data that they would like to publish for their events. These special events can be skipped during data reads from the stream. As with any events, they will need to be iterated.
Clients would find it inconvenient to write different types of events to the same stream unless they use different payloads in the same type of envelope or data type. On the other hand, it is easier for the clients to have readers and writers to read a single type of event from a stream. This favors the client to choose a model where they store processing related information in a dedicated stream. The stream store however can create internal streams specifically for the purpose of metadata. The latter approach ties the setup and teardown of the internal streams to the data stream and becomes a managed approach.