Friday, May 2, 2014

In this post, we revert to designing a scalable queue alerts module. We describe a technique that involves getting all the messages from the queues and then filtering them for the actions involved. In this case, we add metadata to the messages particularly the queue to which it belongs. When the queues get added or deleted, we do or do not see messages from those queues so this does not affect the message processor in the alerts module. However the rules may or may not be in sync with the rules.To change the rules, the alerts module must be reloaded with the new set of rules. In order to do that, the alerts module provides an interface to reconfigure the rules it has. These rules could still be invalid. If the queues mentioned in the rules don't exist or if there are some errors encountered during the evaluation of the rules, they result in no-operation. The messages are evaluated only against those that are valid. For the valid messages the actions are invoked by workers from a pool. These help with message processing so that messages are not queued behind a single worker. Also the rules could specify the action in  a way that it serves as alerts.
The implementation has two layers. There is the message manager and the message worker. The worker receives the incoming messages and evaluates it against a rule to determine the action to be taken. The message manager does book keeping of the actions and the messages for a particular application.  The manager can work with workers across applications and rules. The manager can also start and stop the worker. By separating the management of the message, queue, rules configuration and actions, we have a separation of concerns and the layer above can assume that the messages are all read and acted on.
I will describe the API next that outlines the interaction between the alerts module and the  user
When discussing alerts and triggers in our previous post, one application of triggers comes to mind and this is replication. We talk about different replication techniques including trigger based replication in this post.
With replication one can hope to have a slightly out-of-date "warm standby" as a substitute for the main. There are several hardware and software techniques. We enumerate just a few.
Physical replication  : The simplest scheme is to physically duplicate the entire set of data every replication period. This scheme does not replicate large data-set because the expensive bandwidth for shipping the data and the cost for reinstalling it at the remote site.
This scheme is often used by the clients at the low end because of the performance costs.  Moreover, guaranteeing a transaction consistent snapshot is tricky. This can be avoided with the next technique.
Trigger based replication : In this scheme, triggers are placed on the data-set containers so that any insert, update or delete operations produced a 'difference' record which is installed in a special replication table. This replication table is shipped to the remote site and the modifications are replayed there.Since the triggers are for transactions, we know that the differences we get are consistent with transactions. However, there is more performance penalty with this technique.
The Log-based replication: This scheme is the replication solution of choice when feasible.  Here a log sniffer process intercepts the log writes and delivers them to the remote system. This is implemented using two broad techniques:
1) read the log and prepare the data changes to be replayed on the target system,
2) read the logs and ship them to the target system
The target system continually replays log records as they arrive. 
Often both mechanisms are used. This scheme overcomes all of the problems of the previous alternatives.

Thursday, May 1, 2014

Today we discuss some more on the queue alerts. We look into the possibility of polling as a way to read the messages in a non-invasive manner instead of instrumenting the queue processor for events. In polling, we are continuously seeking to read the queue. We read from all available queues and the moment we have read from a queue we post another read. There may be multiplexing involved and we may have several workers participating in the IO. When we get all the messages we then evaluate them against the rules. Reading from the queues in this manner is akin to promiscuous mode listening on the network.
We now evaluate whether we need to trigger alerts or if we can proceed to evaluating the rules on the messages. When we raise alerts on the messages received, it does not correspond to any of the events on the queue processor. The queue processor can tell us when the message arrived, when the message gets processed and when the message leaves the queue. It can also tell us any exceptions, retries or the destination the message is sent to. This information is not available to the poller. Moreover the poller can get messages in any order. Only with the help of timestamps, can the poller make any guess on the sequence of packets processed by the processors. Two or more messages may be simultaneously executed by the different processors in which case they have the same timestamp. These are serialized into the incoming message queue by the processors. Since the poller cannot make any events other than the arrival time of the messages unless there was some metadata associated with the message, the poller is only able to generate the arrival event and even that is of no consequence since the application can already look at the timestamp on the messages. If the poller were to execute the actions specified against the conditions in the rules, the poller doesn't need to raise events for itself. But let us look at what metadata could potentially be useful. The queue from which the message was read and the information about that queue is a useful addition to the information to have. Now we can trace the path of a message as it moves on the same or other queues. We can also easily filter out the messages arrived on a particular queue. Thus we see that a little bit more information collected by the poller such as a queue id of the queue from which the message was read is very helpful for evaluating the rules specified by the user to the queue alerts module. 

Wednesday, April 30, 2014

Today I'm going to elaborate on a noninvasive method of reading MSMQ to trigger alerts. Let us say we keep a round robin of buffers to read from different queues - one round robin per queue or one round robin per rule/application.  We will discuss more about the consumers of the round robin buffers shortly. But first we mention what they are. Basically we are reading the queues just as TCP maintains a sliding window. The queues are read in the order of the messages being processed. As each message arrives, it is evaluated against the rules to invoke the corresponding action. The same could have been done by the queue processor. The only difference is that this is now handled externally to the queue processor. The user for the queue alerts module could directly subscribe to the events and provide the delegates as necessary. There is no need for a singleton delegate in such a case. However, the queue alerts module facilitates the subscription to the events by letting the user focus exclusively on the rules. In addition, the queue alerts module provides more functionalities. For example, the queue alerts module translates all the rules registered to filter out the messages efficiently.  Secondly, the queue alerts module manages the lifetime of the messages and the actions performed on it. Thirdly, the queue alerts module makes the packets available in a streaming mode to the applications.
The rules to events mapping is avoided by having the rules evaluate against each of the message.  This means that all the rules registered by a single application are evaluated at once on the arrival of a message. If the rules evaluate positively, the message is copied to the buffer for the application to read. The messages are copied only as fast as the applications are reading. The messages are not copied to the applications if they are smaller than a size. It is better to provide a delayed write in such a case and this can be configurable. If the application provides a small buffer, the messages can be copied more often as if real-time. There can also be a timeout value specified which handles the case when messages are not available.  The flow of data is unidirectional from the source to the application.  The queue alerts module focuses on the buffers and the operations. If the buffers are per queue then it can handle bursts in traffic. As each message is pulled from the buffers, it is evaluated and acted upon either by the alerts module or by the application.
In both invasive (queue processor calls delegates) and non-invasive mode ( queue alerts module calls delegates ), the queue processor raises alerts. Additionally, the queue alerts module may mix and match delegates from different applications to each of the queues. As applications update the delegates or the conditions, the  queue alerts module reassigns the delegates across the queue buffers. Otherwise it has to evaluate the conditions for the delegate from all applications for every message.

Tuesday, April 29, 2014

Today we will refine the solution to the scalable queue alert design. If we take the approach that we want to subscribe to the events generated by the queue processor, then we must connect the events to the delegate. The event handling mechanism works by subscribing to the events with the Observer design pattern. The observers to the events provide a callback called Notify() and the subject for the observers has a method call NotifyObservers() that calls different notify on all the observers. Delegates are these callbacks. When the queue processor finds an event to raise at any point during its execution, the subscribers to the event know that the state changed because the raise method notifies all the subscribers registered.This is a behavior or interface that the events implement.
The events are not hierarchical. They are discrete. The events are encapsulated in the queues so that they are raised when the state of the queue or the message changes.
The delegates at this level of handling can then invoke the user level delegates that are registered for certain events. Events are generic but the queues that they belong to are specific. When the user specifies a filter, it may apply to one or more queues. Delegates may need to be added to all these queues. If the mapping between delegates and queues may not be clear from the filter such as when the filter is based on a  message attribute, it is added to all the queues and the delegates then decide based on the messages whether to take any action. In other words, the user level delegates may subscribe to as many or all events and then take the appropriate action given the queue and the message and the state. This means there is a single user level delegate that can take different actions based on different rules. In such a delegate, there would be several successive conditional checks involved.
We say that the rules are encapsulated in a single user level delegate, this is wired to all the events raised. When the event is raised we have the queue information, the message that it was acting on and the state such as arrival, process begin, process complete, depart etc.
In the queue alerts module, if we take the approach that we select the messages and the queues and store the individual rules to map against, we have a different data structure altogether. Here we get all the messages in a delta sweep  that are of interest to the rule evaluation and their corresponding actions, So we store a copy of the messages and queues outside of the queue processor. The mapping between different sets of messages for different rules is the purpose of this data structure. As such we could use different lists for each of the rules.

The design criteria for the queue alert module mentioned in the previous post include : 
Support for retries by the queue processor: The design of a queue alert module must consider the retries by the queue processor for the same message. All exception paths including dead letter queues should be considered for the same treatment.
Non-invasive: When possible, we should consider a non-invasive approach that doesn’t require instrumentation of the queues. In other words, it can work for any version of the queues and doesn’t affect the queue processing. It could work by sniffing the data changes or the logs.
Polling: Any polling approach must be robust and rely on relieving high CPU usages during its processing.
Support for transactional as well as non-transactional messages: The alerts module must work for both kinds of messages so that the user can specify the criteria and not be limited to only a set of messages. Concurrent processing of both messages must be supported.
Support for distributed transactions: When transactions involve messages across queues, this alert module should enable evaluating those messages as part of the transaction or at least log the transaction and the actions taken with the transactions so that the final state can be determined by the alerts module.
Support for clusters: The queues may not all be local to a machine and could be distributed on different nodes in a cluster or they may all be in a failover cluster. Alert module should target the messages and the queues and even the machines.
Scoping of alerts: Alerts need not be registered at the message level. They could be registered at the queue level or at the machine level. Whatever the hierarchy chosen would take care of all the alerts at the inner scope by the outer scope. This means that the CRUD on the alerts at a queue scope automatically performs the same at the message scope.
Changes to the rules or registration of alerts: Alerts registered with the alerts module will not take effect until the system reconfigures. This enables the changes to the alerts to be picked up for processing by the module and gives time for setup and cleanup operations by the module.
Deployment: The alerts module should come in a standalone service or executable so that it can be an add-on to existing queue processing. The module itself could be deployable by copying or via an installer.
Automatic administration of rules, actions, messages and queues could be performed where possible.
The use of message format: When interacting with the queues to read the messages, the alerts module will evaluate the fields of the messages against the criteria specified in the rules by the user. The message format should not be opaque and as in the case of MSMQ should expose known fields for evaluation against the rules.
Control of concurrency: The alerts module could make the invocation of actions registered with the rules as concurrent so that the evaluation of an action for a message does not block other.
Full-text or key-value search over message body: The expressions to search over the text of the messages could be resource intensive and optionally enabled. Rules to perform such search could be outside the alerts mechanism and done with the help of an indexer. As such this may not be in scope for the alerts module.
Text messages versus binary messages: The alerts module should support both formats. The module should rely on the fields versus the contents. Subsequent processing of say JSON vs. XML text could be offloaded to other systems.
Asynchronous communication mechanism: This could be enabled between producers and consumers so that they don’t block each other.

Performance: Volumes of hundred thousand transactions per submission that reach millions of transactions per day and involve several messages across different queues should be targeted. Working on a set of few messages or queues or rules or alerts at a time could enable this.

Monday, April 28, 2014

Today we will look at scalable queue alert design I describe a method to evaluate multiple queues for processing. Queues can have multiple messages. Messages can have different attributes. User would like to author rules for actions on queues based on attributes of both queues and  messages. If a message arrives in a queue, it is evaluated against all the rules authored by the users for the corresponding action to be taken. Rules comprise of conditions and actions. Conditions are expressions based on attributes and logical operators. Action can be any one of predetermined actions such as running a script or logging. The rules are specified in a user defined function. This helps the user to manage the rules. The rules are all evaluated against each message of each queue. This means that the attributes have to be deterministic, with no side effects and easy to lookup. 
When we scale out the queues, we are going to evaluate these rules based on each of the queues. When we process the messages we may do one after the other across queues. This means the user defined rules can work the same across queues and messages. 
The rules evaluation for any message in any queue will evaluate to one or more of the actions. The default action is a no-op which is not specified explicitly. No-op here in this case means that no additional actions will be triggered other than the default message processing by the queue. The alerting mechanism is independent of the queue processing and is checked right after the message is processed. This could be done right before the message processing but its only when the message is processed do we know that the current message has been handled. 
The queue alert mechanism can live outside the queue processing service. This implies that the queue alert mechanism can be used for journaling in a non-invasive manner. The action corresponding to the queue processing could be to log the messages.
Another use of the queue alert mechanism is to enable different actions to be specified for these messages. For example, action could be to launch a script for selected messages instead  of all messages. Scripts could trigger additional workflows
Trigger mechanism needs to be up-to-date with the queues. If the queues are added or deleted, then the rules may need to be re-defined. Evaluation of stale rules should default to no-op. This ensures execution of the messages.