Sunday, August 20, 2017

We continue discussing ZooKeeper. It is a co-ordination service with elements from group messaging, shared registers and distributed lock services. It provides an interface that guarantees wait-free operation and FIFO execution of requests from each client. Write requests across all clients are also linearizable.
We were discussing the components of ZooKeeper: the request processor, the atomic broadcast protocol and the replicated database. The request processor prepares a request for processing, which for write requests also includes co-ordination among the servers. If request processing requires co-ordination, it is handled by an agreement protocol, an implementation of atomic broadcast. The changes are committed to the ZooKeeper database, which is replicated across all servers. This database is periodically snapshotted because replaying all the messages in order would take too long to recover state. During a snapshot the ZooKeeper state is not locked, so a snapshot does not necessarily reflect the state of ZooKeeper at any single point in time. However, since the transactions are idempotent, such fuzzy snapshots still allow the state to be restored: the changes can be applied more than once, and they are applied in the same order as in a replay.
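As a rough illustration of why idempotent transactions make fuzzy snapshots safe, here is a minimal sketch. It models the data tree as a dictionary and a committed transaction as an absolute (path, data, version) assignment rather than a delta; the Txn and DataTree types and the ApplyTxn helper are hypothetical, introduced only for this example.

using System;
using System.Collections.Generic;

// Hypothetical transaction record: an absolute assignment, not a delta.
class Txn
{
    public string Path;
    public string Data;
    public int Version;
}

class DataTree
{
    // znode path -> (data, version)
    private readonly Dictionary<string, (string Data, int Version)> nodes =
        new Dictionary<string, (string Data, int Version)>();

    // Applying the same committed transaction once or many times yields the
    // same final state, because the transaction carries the resulting value
    // and version instead of an operation to perform.
    public void ApplyTxn(Txn t)
    {
        nodes[t.Path] = (t.Data, t.Version);
    }

    public (string Data, int Version) Get(string path) => nodes[path];
}

class Program
{
    static void Main()
    {
        var tree = new DataTree();
        var txn = new Txn { Path = "/config", Data = "v2", Version = 7 };

        // A fuzzy snapshot may already contain this change...
        tree.ApplyTxn(txn);
        // ...and the recovery replay may apply it again; the state is unchanged.
        tree.ApplyTxn(txn);

        Console.WriteLine(tree.Get("/config")); // (v2, 7) either way
    }
}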
Writes and reads for clients are handled easily with the above design. Since a server processes writes in order and there is no concurrency across writes, even the notifications are strictly ordered. On every write, the notifications are sent out and cleared. Only the server that the client connects to sends notifications to that client. Read requests are handled locally: they are served directly from the in-memory database and do not involve the request processor or the atomic broadcast. Such fast reads, however, do not guarantee a precedence order, which means a read may return a stale value even though a more recent change to the same znode has been committed. To alleviate this, a 'sync' primitive is provided that ensures consistency by performing the equivalent of flushing all pending writes. No broadcast is required for sync; instead it is placed at the end of the queue of requests between the leader and the follower executing the call to sync. For this to work, the follower must be sure that the leader is still the leader. This can be ascertained by looking at the pending queue: if the queue is not empty, the leader has not changed. If the queue is empty, the leader issues a null transaction and orders the sync after that transaction. If the leader has lost its quorum and cannot commit the null transaction, it realizes it is not the leader before the follower abandons it.
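A rough sketch of the leader-side ordering described above, using a simple queue of requests; the Transaction, NullTransaction and SyncRequest types here are illustrative stand-ins, not ZooKeeper's actual internals.

using System;
using System.Collections.Generic;

// Illustrative request types for this sketch only.
abstract class Request { }
class Transaction : Request { }
class NullTransaction : Transaction { }
class SyncRequest : Request { public string FollowerId; }

class LeaderSketch
{
    // Pending transactions queued between the leader and one follower.
    private readonly Queue<Request> pending = new Queue<Request>();

    public void Commit(Transaction t) => pending.Enqueue(t);

    // Order a sync after whatever is already pending for this follower.
    // If nothing is pending, commit a null transaction first so the sync
    // has something to follow; committing it also confirms leadership.
    public void HandleSync(SyncRequest sync)
    {
        if (pending.Count == 0)
            Commit(new NullTransaction());
        pending.Enqueue(sync);
    }

    public void Drain()
    {
        while (pending.Count > 0)
            Console.WriteLine(pending.Dequeue().GetType().Name);
    }
}

class Program
{
    static void Main()
    {
        var leader = new LeaderSketch();
        leader.HandleSync(new SyncRequest { FollowerId = "follower-1" });
        leader.Drain(); // NullTransaction, then SyncRequest
    }
}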
#codingexercise
// Returns the index of some occurrence of val in the sorted string input
// between start and end, or -1 if it is not present.
int binary_search(String input, int start, int end, char val)
{
    if (start > end) return -1;
    int mid = (start + end) / 2;
    if (input[mid] == val) return mid;
    if (start == end) return -1;
    if (input[mid] < val)
        return binary_search(input, mid + 1, end, val);
    else
        return binary_search(input, start, mid, val);
}
// Counts occurrences of val in the sorted string input by locating one
// occurrence with binary search and expanding left and right from it.
int GetCount(String input, char val)
{
    int current = binary_search(input, 0, input.Length - 1, val);
    if (current == -1) return 0;
    int mid = current;
    // walk left to the first occurrence
    while (current >= 0 && input[current] == val) current--;
    current++;
    int start = current;
    // walk right from the found occurrence to the last occurrence
    current = mid;
    while (current <= input.Length - 1 && input[current] == val)
        current++;
    current--;
    int end = current;
    return end - start + 1;
}


Saturday, August 19, 2017

We continue discussing ZooKeeper. It is a co-ordination service with elements from group messaging, shared registers and distributed lock services. It provides an interface that guarantees wait-free operation and FIFO execution of requests from each client. Write requests across all clients are also linearizable.
We saw a few examples of primitives that are possible with ZooKeeper. These primitives exist only on the client side. They included implementing dynamic configuration in a distributed application, specifying group membership, and implementing locks and double barriers. These have been put to use in applications such as the Yahoo! web crawler, which uses ZooKeeper for its page-fetching service. Katta, a distributed indexer, uses it for coordination. Even the high-traffic Yahoo! message broker, a distributed publish-subscribe system, uses it for managing configuration, failure detection and group membership. We discussed their usages in detail.
We next review the components in the implementation of ZooKeeper. It provides high availability by replicating the ZooKeeper data across servers. The main components are: 1) the request processor, 2) atomic broadcast and 3) the replicated database.
The request processor prepares a request for processing, which for write requests also includes co-ordination activities among the servers. If request processing requires co-ordination, it is handled by an agreement protocol, an implementation of atomic broadcast. The changes are committed to the ZooKeeper database, which is replicated across all servers. This is an in-memory database containing the entire data tree. Each znode can store up to 1MB of data by default, and this limit can be configured. For recoverability, updates are logged to disk before they are applied to the in-memory database; the servers keep a replay (write-ahead) log of committed operations and take periodic snapshots of the in-memory database.
Each client connects to exactly one server. If it performs read operations, it reads that server's database directly, avoiding the agreement protocol. Write requests are forwarded to a leader, which notifies the other servers with messages consisting of the agreed-upon state changes. Write requests are expressed as transactions and the messaging layer is atomic, so the servers' local replicas do not diverge. A request may not be idempotent, but a transaction is. The leader converts a request into a transaction describing the state of the system after the request is applied. With the help of transactions, all requests are translated into success transactions or error transactions and the database is kept consistent. The agreement protocol is initiated by the leader, which executes the request and broadcasts the change with Zab using a simple majority quorum. The request processing pipeline is kept full to achieve high throughput. Zab delivers messages in order and exactly once; even if a message is redelivered during recovery, the transactions are idempotent, so the result is the same.
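A minimal sketch of this request-to-transaction conversion, assuming a conditional setData request that carries an expected version; the type names below are made up for illustration and are not ZooKeeper's internal types.

using System;

// Hypothetical request and transaction records for illustration.
class SetDataRequest { public string Path; public string Data; public int ExpectedVersion; }
abstract class Transaction { }
class SetDataTxn : Transaction { public string Path; public string Data; public int NewVersion; }
class ErrorTxn : Transaction { public string Reason; }

class LeaderConversionSketch
{
    // The leader looks at the current version of the znode and produces either
    // a transaction that fixes the resulting data and version (safe to replay),
    // or an error transaction if the version check fails.
    public static Transaction ToTxn(SetDataRequest req, int currentVersion)
    {
        if (req.ExpectedVersion != -1 && req.ExpectedVersion != currentVersion)
            return new ErrorTxn { Reason = "bad version" };
        return new SetDataTxn { Path = req.Path, Data = req.Data, NewVersion = currentVersion + 1 };
    }

    static void Main()
    {
        var req = new SetDataRequest { Path = "/config", Data = "v2", ExpectedVersion = 6 };
        var txn = ToTxn(req, currentVersion: 6);
        Console.WriteLine(txn is SetDataTxn t ? $"set {t.Path} -> {t.Data} (version {t.NewVersion})" : "error");
    }
}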

Friday, August 18, 2017

We continue discussing ZooKeeper. It is a co-ordination service with elements from group messaging, shared registers and distributed lock services. It provides an interface that guarantees wait-free operation and FIFO execution of requests from each client. Write requests across all clients are also linearizable.
We saw a few examples of primitives that are possible with ZooKeeper. These primitives exist only on the client side. They included implementing dynamic configuration in a distributed application, specifying group membership, and implementing locks and double barriers. These have been put to use in applications such as the Yahoo! web crawler, which uses ZooKeeper for its page-fetching service. Katta, a distributed indexer, uses it for coordination. Even the high-traffic Yahoo! message broker, a distributed publish-subscribe system, uses it for managing configuration, failure detection and group membership.
The fetching service is part of the Yahoo! crawler and involves a master that commands page-fetching processes. The master provides the fetchers with configuration, and the fetchers keep the master posted on their status and health. ZooKeeper gives the fetching service the ability to recover from failures of the master, guarantees availability despite failures, and decouples the clients from the servers.
Similarly, Katta divides the indexing work into shards, and a master assigns shards to slaves. Katta uses ZooKeeper to track the status of the slave servers and of the master, to handle re-electing a master, and to track and propagate the assignment of shards to slaves.
The servers of the Yahoo! message broker use a shared-nothing distributed architecture, which makes co-ordination essential for correct operation. ZooKeeper is used to manage the distribution of topics, deal with failures and control system processes.
#codingexercise
Given a sorted string that may contain duplicates, count the number of occurrences of a given character.
// Counts occurrences of val in the sorted string input by locating one
// occurrence with binary search and expanding left and right from it.
int GetCount(String input, char val)
{
    int current = binary_search(input, 0, input.Length - 1, val);
    if (current == -1) return 0;
    int mid = current;
    // walk left to the first occurrence
    while (current >= 0 && input[current] == val) current--;
    current++;
    int start = current;
    // walk right from the found occurrence to the last occurrence
    current = mid;
    while (current <= input.Length - 1 && input[current] == val)
        current++;
    current--;
    int end = current;
    return end - start + 1;
}

Thursday, August 17, 2017

We continue discussing ZooKeeper. It is a co-ordination service with elements from group messaging, shared registers and distributed lock services. It provides an interface that guarantees wait-free operation and FIFO execution of requests from each client. Write requests across all clients are also linearizable.
We saw a few examples of primitives that are possible with ZooKeeper. These primitives exist only on the client side. For example: 1) ZooKeeper can be used to implement dynamic configuration in a distributed application. This is one of the basic schemes, where each configuration is stored in the form of a znode and tasks that depend on one another co-ordinate through rendezvous nodes. 2) Group membership is another common co-ordination requirement. This primitive is implemented using ephemeral nodes. 3) ZooKeeper can also be used to implement locks. Applications implement a lock using a lock file represented by a znode. To acquire the lock, the client tries to create the designated znode with the ephemeral flag. If the create succeeds, the client holds the lock; otherwise the client watches the znode in case the current holder dies. Whenever the znode is deleted, the other clients may try to acquire the lock. The improved scheme with sequential child znodes ensures that the removal of a znode wakes up only one client, so there is no polling and there are no timeouts, and the state of the znodes helps with monitoring, troubleshooting and debugging. In addition to simple locks, ZooKeeper recipes also include read-write locks. These differ from simple locks in that only earlier write-lock znodes prevent a client from obtaining a read lock; read locks may be shared. Here the herd effect is put to use, since all waiting read clients can be woken up together when they become eligible.
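A minimal sketch of the read-lock check under this recipe: a reader is blocked only by write-lock children with lower sequence numbers. The child-name format assumed here (names like "write-0000000003") is an assumption made for the example, not a fixed convention.

using System;
using System.Collections.Generic;
using System.Linq;

class ReadWriteLockSketch
{
    // Given the children of the lock znode and the name of our own "read-" child,
    // decide whether the read lock is held. Only earlier "write-" children block us.
    public static bool ReadLockAcquired(IEnumerable<string> children, string myReadNode)
    {
        int mySeq = Sequence(myReadNode);
        return !children.Any(c => c.StartsWith("write-") && Sequence(c) < mySeq);
    }

    // Assumed child-name format: "<prefix>-<zero-padded sequence number>".
    static int Sequence(string name) => int.Parse(name.Substring(name.LastIndexOf('-') + 1));

    static void Main()
    {
        var children = new List<string> { "read-0000000001", "write-0000000002", "read-0000000003" };
        Console.WriteLine(ReadLockAcquired(children, "read-0000000001")); // True: no earlier write lock
        Console.WriteLine(ReadLockAcquired(children, "read-0000000003")); // False: blocked by write-...2
    }
}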
Double barriers are yet another synchronization mechanism. They enable clients to synchronize the beginning and the end of a computation. The barrier's threshold decides how many processes must join before the computation begins, and the processes leave the barrier only when they are done. Each process registers by creating a child of the znode that represents the barrier, and unregisters by deleting that child. Processes may leave the barrier once all processes have removed their children. When the number of children reaches the threshold, a ready znode is created, and it is watched by all waiting processes; this tells the processes when to enter. Similarly, to leave, the processes watch for the children to disappear. Together these co-ordination services enable a wide variety of applications. For example, the Yahoo! web crawler uses ZooKeeper for its page-fetching service; Katta, a distributed indexer, uses it for coordination; and even the high-traffic Yahoo! message broker, a distributed publish-subscribe system, uses it for managing configuration, failure detection and group membership.
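A rough sketch of entering and leaving a double barrier, written against a hypothetical minimal client interface (Create, GetChildren, Exists, Delete, WaitForChange); the real ZooKeeper client API differs in names and signatures, so treat this purely as an illustration of the recipe.

using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal client interface for this sketch only.
interface IZkClient
{
    void Create(string path, bool ephemeral);
    IList<string> GetChildren(string path);   // returns child names
    bool Exists(string path);
    void Delete(string path);
    void WaitForChange(string path);          // blocks until the watched path changes
}

class DoubleBarrierSketch
{
    private readonly IZkClient zk;
    private readonly string barrier;   // e.g. "/barrier"
    private readonly int threshold;
    private readonly string myChild;   // e.g. "p1"

    public DoubleBarrierSketch(IZkClient zk, string barrier, int threshold, string processName)
    {
        this.zk = zk;
        this.barrier = barrier;
        this.threshold = threshold;
        this.myChild = processName;
    }

    // Register under the barrier; the process that brings the count up to the
    // threshold creates the ready znode, which everyone else is waiting on.
    public void Enter()
    {
        zk.Create(barrier + "/" + myChild, ephemeral: true);
        if (zk.GetChildren(barrier).Count(c => c != "ready") >= threshold)
            zk.Create(barrier + "/ready", ephemeral: false);
        while (!zk.Exists(barrier + "/ready"))
            zk.WaitForChange(barrier + "/ready");
    }

    // Unregister and wait until every process has removed its child.
    public void Leave()
    {
        zk.Delete(barrier + "/" + myChild);
        while (zk.GetChildren(barrier).Any(c => c != "ready"))
            zk.WaitForChange(barrier);
    }
}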
#codingexercise
Given a sorted string, find the number of occurrences of a given character in the string.
// Returns the index of some occurrence of val in the sorted string input
// between start and end, or -1 if it is not present.
int binary_search(String input, int start, int end, char val)
{
    if (start > end) return -1;
    int mid = (start + end) / 2;
    if (input[mid] == val) return mid;
    if (start == end) return -1;
    if (input[mid] < val)
        return binary_search(input, mid + 1, end, val);
    else
        return binary_search(input, start, mid, val);
}
// Counts occurrences of val in the sorted string input by locating one
// occurrence with binary search and then scanning to the run's boundaries.
int GetCount(String input, char val)
{
    int current = binary_search(input, 0, input.Length - 1, val);
    if (current == -1) return 0;
    // walk left to the first occurrence
    while (current >= 0 && input[current] == val) current--;
    current++;
    int start = current;
    // walk right to the last occurrence
    while (current <= input.Length - 1 && input[current] == val)
        current++;
    current--;
    int end = current;
    return end - start + 1;
}
Optimization: We could also remember the index returned by binary_search and resume counting to the right from there, instead of rescanning the run from its left boundary.

Wednesday, August 16, 2017

We continue discussing ZooKeeper. It is a co-ordination service with elements from group messaging, shared registers and distributed lock services. It provides an interface that guarantees wait-free operation and FIFO execution of requests from each client. Write requests across all clients are also linearizable.
We now look at some examples of primitives that are possible with ZooKeeper. These primitives exist only on the client side.
Configuration Management - ZooKeeper can be used to implement dynamic configuration in a distributed application. This is one of the basic schemes: the configuration is stored in the form of a znode. Processes start up with the full pathname of that znode, read it, and set a watch on it. If the configuration is updated, the processes are notified, and they read and apply the new configuration.
Rendezvous Node - Sometimes one part of a configuration depends on another that is not yet known; for example, the host and port of a master may be known only after the master starts. In this case clients can co-ordinate through a rendezvous node.
The node is filled in with details as and when they become available, and the workers set a watch on it so that they learn of the details before they begin their work.
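A minimal sketch of the configuration-watch pattern, again written against a hypothetical client interface (GetData with a watch, plus a blocking WaitForChange); the names here are illustrative only.

using System;

// Hypothetical minimal client interface for this sketch only.
interface IConfigClient
{
    string GetData(string path, bool watch); // read znode data, optionally leaving a watch
    void WaitForChange(string path);         // block until the watched znode changes
}

class ConfigWatcherSketch
{
    // Read the configuration znode, leaving a watch, and re-read whenever it changes.
    public static void Run(IConfigClient zk, string configPath, Action<string> apply)
    {
        while (true)
        {
            string config = zk.GetData(configPath, watch: true);
            apply(config);
            zk.WaitForChange(configPath); // woken when the configuration znode is updated
        }
    }
}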
Group Membership is another common co-ordination requirement. This primitive is implemented using ephemeral nodes. A znode is designated to represent the group. When a process that is a member of the group starts, it creates an ephemeral child node under this group znode; because the child is ephemeral, it lasts only as long as that process's session. The child node keeps information about the process that created it. If the process fails or exits, the node is automatically removed. Group membership can be enumerated simply by listing these child nodes.
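A small sketch of group membership along these lines; the IGroupClient interface and the paths are hypothetical stand-ins for a real client.

using System.Collections.Generic;

// Hypothetical minimal client interface for this sketch only.
interface IGroupClient
{
    void Create(string path, string data, bool ephemeral);
    IList<string> GetChildren(string path);
}

class GroupMembershipSketch
{
    // Join the group by creating an ephemeral child under the group znode.
    // The child disappears automatically when this process's session ends.
    public static void Join(IGroupClient zk, string groupPath, string memberName, string address)
    {
        zk.Create(groupPath + "/" + memberName, address, ephemeral: true);
    }

    // Enumerate the current members by listing the group znode's children.
    public static IList<string> Members(IGroupClient zk, string groupPath)
    {
        return zk.GetChildren(groupPath);
    }
}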
Simple Locks: ZooKeeper can also be used to implement locks. Applications implement a lock using a lock file represented by a znode. To acquire the lock, the client tries to create the designated znode with the ephemeral flag. If the create succeeds, the client holds the lock; otherwise the client reads the znode with a watch set so that it is notified if the current holder goes away. Whenever the znode is deleted, the other clients may try to acquire the lock. There are two drawbacks to this locking mechanism: first, there is a herd effect, because all waiting clients vie for the lock even though only one can acquire it; second, it only implements exclusive locking.
Locks can also be created without the herd effect. A znode is designated as the lock node. Each client does the following:
Create an ephemeral, sequential node as a child of the lock node.
Enumerate the children under the lock node.
If the created node has the lowest sequence number among the children, the client has acquired the lock and exits.
Otherwise, set a watch on the child node immediately preceding the created node.
When that watch fires, repeat from the enumeration of the children under the lock node.
To release the lock, the client simply deletes its node.
This completes the locking primitive implemented on top of ZooKeeper; a rough sketch of this recipe follows.
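The sketch below follows the steps above against a hypothetical minimal client interface; the method names, the WaitForChange helper and the "lock-<sequence>" child-name format are assumptions made for the example, not the actual client API.

using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal client interface for this sketch only.
interface ILockClient
{
    // Creates an ephemeral, sequential child and returns its name (e.g. "lock-0000000007").
    string CreateEphemeralSequential(string path);
    IList<string> GetChildren(string path);   // returns child names
    bool Exists(string path, bool watch);
    void Delete(string path);
    void WaitForChange(string path);          // blocks until the watched znode changes
}

class HerdFreeLockSketch
{
    private readonly ILockClient zk;
    private readonly string lockPath; // e.g. "/locks/resource-a"
    private string myNode;

    public HerdFreeLockSketch(ILockClient zk, string lockPath)
    {
        this.zk = zk;
        this.lockPath = lockPath;
    }

    public void Acquire()
    {
        // 1. Create our ephemeral, sequential child under the lock node.
        myNode = zk.CreateEphemeralSequential(lockPath + "/lock-");
        while (true)
        {
            // 2. Enumerate the children in sequence order.
            var children = zk.GetChildren(lockPath).OrderBy(Sequence).ToList();
            // 3. If ours is the lowest, we hold the lock.
            if (children.First() == myNode) return;
            // 4. Otherwise watch only the child immediately preceding ours,
            //    so its removal wakes up exactly one waiter.
            string predecessor = children[children.IndexOf(myNode) - 1];
            if (zk.Exists(lockPath + "/" + predecessor, watch: true))
                zk.WaitForChange(lockPath + "/" + predecessor);
            // 5. Repeat the enumeration.
        }
    }

    public void Release() => zk.Delete(lockPath + "/" + myNode);

    // Assumed child-name format: "lock-<zero-padded sequence number>".
    static int Sequence(string name) => int.Parse(name.Substring(name.LastIndexOf('-') + 1));
}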

Tuesday, August 15, 2017

We continue discussing ZooKeeper. It is a co-ordination service with elements from group messaging, shared registers and distributed lock services. It provides an interface that guarantees wait-free operation and FIFO execution of requests from each client. Write requests across all clients are also linearizable.
ZooKeeper has two basic guarantees: linearizable writes and FIFO client order. All requests that update ZooKeeper state are serializable and respect precedence, and all requests from a given client are executed in the order that they were sent by the client. As an example of how these guarantees work together, consider a new leader that must update a number of configuration znodes: it designates a path as the "ready" znode, and the other processes use the configuration only when that znode exists.
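A rough sketch of this ready-znode pattern as it might look on the leader side, against a hypothetical client interface; the paths and method names are illustrative only.

// Hypothetical minimal client interface for this sketch only.
interface IReadyClient
{
    void Delete(string path);
    void SetData(string path, string data);
    void Create(string path, string data);
}

class ReadyZnodeSketch
{
    // The leader deletes "ready", applies its configuration changes, and
    // recreates "ready" last. Because writes are linearizable and a client's
    // requests are FIFO-ordered, a process that sees "ready" exist will also
    // see all of the configuration changes that preceded its creation.
    public static void PublishConfiguration(IReadyClient zk, string dbHost, string cacheHost)
    {
        zk.Delete("/config/ready");
        zk.SetData("/config/db", dbHost);
        zk.SetData("/config/cache", cacheHost);
        zk.Create("/config/ready", "");
    }
}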
There are a few caveats with these guarantees.
If a process sees that the ready znode exists before the new leader starts to make a change, and then starts reading the configuration while the change is in progress, it could see an inconsistent state. This is solved by the ordering guarantee for notifications: the notification of the change (the deletion of the ready znode) reaches the client before it can see any of the new state.
Another consideration is that clients may have their own communication channels in addition to ZooKeeper. For example, two clients may share a configuration in ZooKeeper and also exchange messages over a side channel. One client's view of ZooKeeper may lag the other's, so a client told about an update out of band may not see it unless it issues a write before re-reading the configuration. For this purpose, ZooKeeper provides a sync request: a sync followed by a read constitutes a slow read, without the overhead of a full write. This primitive is similar to a flush.
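A small sketch of the sync-then-read pattern from the client's point of view; the ISyncClient interface here is a hypothetical stand-in, not the actual client API.

// Hypothetical minimal client interface for this sketch only.
interface ISyncClient
{
    void Sync(string path);                 // flush pending writes ahead of this client's reads
    string GetData(string path, bool watch);
}

class SyncReadSketch
{
    // When a client learns out of band that the configuration changed, it issues
    // a sync before re-reading so the read reflects all writes committed so far.
    public static string ReadLatestConfig(ISyncClient zk)
    {
        zk.Sync("/config");
        return zk.GetData("/config", watch: false);
    }
}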
Since ZooKeeper runs on several servers, it can provide liveness and durability guarantees based on a simple majority. As long as a simple majority of the servers is up and communicating, the service is available and responds successfully to change requests. Once it responds successfully to a change request, that change persists across any number of failures, as long as a quorum of servers is eventually able to recover. These guarantees cover the durability and consistency requirements of this co-ordination service.
#codingexercise
Generate palindromes less than n
// Builds a palindrome by appending the digits of 'number' in reverse.
// If isOdd is true, the middle digit is not duplicated (odd-length palindrome).
// Note: 'base' is a reserved word in C#, so the parameter is named radix.
int ReverseAndAppend(int number, int radix, bool isOdd)
{
    int n = number;
    int result = number;
    if (isOdd)
        n /= radix;   // skip the middle digit
    while (n > 0)
    {
        result = result * radix + (n % radix);
        n /= radix;
    }
    return result;
}
// Prints all palindromes less than n: first the odd-length ones (j = 1),
// then the even-length ones (j = 2).
void GeneratePalindromes(int n)
{
    for (int j = 1; j <= 2; j++)
    {
        int i = 1;
        int result = ReverseAndAppend(i, 10, j % 2 == 1);
        while (result < n)
        {
            Console.WriteLine(result);
            i++;
            result = ReverseAndAppend(i, 10, j % 2 == 1);
        }
    }
}

Monday, August 14, 2017

We continue discussing ZooKeeper. It is a co-ordination service with elements from group messaging, shared registers and distributed lock services. It provides an interface that guarantees wait-free operation and FIFO execution of requests from each client. Write requests across all clients are also linearizable.
Linearizable writes and FIFO client order together form the guarantees for ZooKeeper, and these two guarantees also interact. For example, a new leader can designate a path as the "ready" znode; the other processes use the configuration only when that znode exists.
There are a few caveats with these guarantees.
If a process sees that the ready znode exists before the new leader starts to make a change, and then starts reading the configuration while the change is in progress, it could see an inconsistent state. This is solved by the ordering guarantee for notifications: the notification of the change reaches the client before it can see any of the new state.
Ideas for source code visualization tool: https://1drv.ms/w/s!Ashlm-Nw-wnWsEOTGP5op9hPrVVe