Friday, August 25, 2017

We continue discussing ZooKeeper. It is a coordination service with elements from group messaging, shared registers, and distributed lock services. It provides an interface that guarantees wait-free properties and FIFO execution of requests from each client. Write requests across all clients are also linearized.
We were discussing the throughput of ZooKeeper when the system is saturated and various failures are injected. Throughput dipped at the failure and recovery of a follower, the failure and recovery of a different follower, the failure of the leader, the failure of two followers in the first two marks with recovery at the third mark, and the recovery of the leader. The largest dip occurred with the failure of the leader. Failure of a follower, on the other hand, is tolerated by the quorum, so throughput falls only by roughly the share of read requests that the failed follower was serving. The fast leader election algorithm mitigates the leader failure further. Finally, even when followers take more time to recover, ZooKeeper raises the throughput again by redistributing the load after recovery.
The latency of requests was also measured. Worker processes create a new node and then delete it, with the deletes issued asynchronously. The number of worker processes is varied, but a large number of nodes is attempted in each case. The requests processed per second increased with the number of workers but decreased with the number of servers. The average request latency was found to be between 1.2 ms and 1.4 ms.
A number of barriers was also executed sequentially to evaluate the behavior of primitives with ZooKeeper. For each barrier, much like a mutex, a client waits for all other clients before moving on to the next instruction, and this holds for both entry and exit. The barriers were executed one after the other. The time to process the barriers increased linearly with the number of barriers, which indicated that concurrent access to the data tree does not hamper execution. The latency also increased proportionally to the number of clients.
#codingexercise
Find the elements that share the maximum number of duplicates in a contiguous sorted sequence
Solution:
        1. For each element in the contiguous sequence
        2.        Insert the element and its count of repetitions into a dictionary
        3. Find the max count among the values in the dictionary
        4. For each key-value pair in the dictionary
                  if the value == max count
                      print the key
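
A minimal C# sketch of these steps (the method name and sample input are illustrative):

using System;
using System.Collections.Generic;
using System.Linq;

class DuplicateCounts
{
    // Prints every element whose repetition count equals the maximum
    // repetition count seen in the sorted sequence.
    static void PrintMostDuplicated(List<int> sequence)
    {
        var counts = new Dictionary<int, int>();
        foreach (var element in sequence)
        {
            if (counts.ContainsKey(element)) counts[element]++;
            else counts[element] = 1;
        }
        if (counts.Count == 0) return;
        int maxCount = counts.Values.Max();
        foreach (var pair in counts)
            if (pair.Value == maxCount)
                Console.Write("{0} ", pair.Key);
        Console.WriteLine();
    }

    static void Main()
    {
        PrintMostDuplicated(new List<int> { 1, 1, 2, 2, 2, 3, 5, 5, 5 }); // prints: 2 5
    }
}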

Thursday, August 24, 2017

We continue discussing ZooKeeper. It is a coordination service with elements from group messaging, shared registers, and distributed lock services. It provides an interface that guarantees wait-free properties and FIFO execution of requests from each client. Write requests across all clients are also linearized.
We were discussing the throughput of ZooKeeper when the system is saturated and how it changes with various injected failures. A large number of clients connected, and this number was kept the same while the number of servers was varied. This simulated a heavy load, and it was seen that increasing the number of servers had a negative impact on performance, particularly for writes. Read throughput is consistently higher than write throughput: write requests have to go through atomic broadcast, and transactions are logged to non-volatile store. Both contribute to the difference in throughput.
In production systems, some performance is traded off for reliability, especially because ZooKeeper is the source of truth for applications. More servers are used to tolerate more faults, and write throughput is recovered by partitioning the service. Load can be distributed because of ZooKeeper's relaxed consistency guarantees. If all requests are directed towards the leader, the read throughput goes down and even the write throughput is lower. This can be explained as the diminished ability of the leader to perform atomic broadcast operations while servicing all requests. In fact, atomic broadcast is the true limiting factor for ZooKeeper. To measure this, the requests are all directed towards the leader and the system is saturated. The broadcast protocol becomes CPU-bound at maximum throughput. This is not the same as the performance with all write requests because some work goes towards client communication, ACL checks, and the conversion of requests to transactions. As mentioned earlier, some performance is traded off in favor of correctness and reliability, but the authors stated that there is room for improvement by eliminating extra copies and multiple serializations and by using more efficient internal data structures.
The experiments performed by the authors also included injecting failures such as killing server processes. For this, the share of write requests was maintained at 30%. Throughput dipped at the failure and recovery of a follower, the failure and recovery of a different follower, the failure of the leader, the failure of two followers in the first two marks with recovery at the third mark, and the recovery of the leader. The largest dip occurred with the failure of the leader. Failure of a follower, on the other hand, is tolerated by the quorum, so throughput falls only by roughly the share of read requests that the failed follower was serving. The fast leader election algorithm mitigates the leader failure further. Finally, even when followers take more time to recover, ZooKeeper raises the throughput again by redistributing the load after recovery.

#codingexercise
Given a list of 1s and 0s and an integer m, find the positions of up to m zeros which, when flipped, give the maximum run of contiguous 1s.

void PrintPositions(List<int> A, int m)
{
// sliding window boundaries
int l = 0;
// best window size and its boundaries
int best = -1;
int start = 0;
int end = 0;
// count of zeros inside the window
int c = 0;
for (int r = 0; r < A.Count; r++)
{
   // grow the window to include A[r]
   if (A[r] == 0) c++;
   // shrink from the left until at most m zeros remain
   while (c > m)
   {
      if (A[l] == 0) c--;
      l++;
   }
   // record the largest window seen so far
   if (r - l + 1 > best)
   {
      best = r - l + 1;
      start = l;
      end = r;
   }
}
if (best > -1)
{
   // the zeros inside the best window are the positions to flip
   for (int i = start; i <= end; i++)
        if (A[i] == 0)
            Console.Write("{0} ", i);
   Console.WriteLine();
}
}

In the above sliding window, we can also eliminate one of the best, start, and end variables: the right boundary always satisfies end = start + best - 1, so it can be derived instead of tracked, as sketched below.
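
A compact variant along those lines (same logic as PrintPositions above, with end derived at print time):

void PrintPositionsCompact(List<int> A, int m)
{
    int l = 0, best = -1, start = 0, c = 0;
    for (int r = 0; r < A.Count; r++)
    {
        if (A[r] == 0) c++;
        while (c > m) { if (A[l] == 0) c--; l++; }
        if (r - l + 1 > best) { best = r - l + 1; start = l; }
    }
    if (best > -1)
    {
        int end = start + best - 1; // derived, not tracked
        for (int i = start; i <= end; i++)
            if (A[i] == 0) Console.Write("{0} ", i);
        Console.WriteLine();
    }
}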

Wednesday, August 23, 2017

We continue discussing ZooKeeper. It is a coordination service with elements from group messaging, shared registers, and distributed lock services. It provides an interface that guarantees wait-free properties and FIFO execution of requests from each client. Write requests across all clients are also linearized.
We were discussing the throughput of ZooKeeper when the system is saturated and how it changes with various injected failures. A large number of clients connected, and this number was kept the same while the number of servers was varied. This simulated a heavy load, and it was seen that increasing the number of servers had a negative impact on performance, particularly for writes. Read throughput is consistently higher than write throughput: write requests have to go through atomic broadcast, and transactions are logged to non-volatile store. Both contribute to the difference in throughput.

#codingexercise
Find three elements in an array that satisfy the Pythagorean theorem
Solution:
        1. Square all the elements
        2. Sort all the squares
        3. For every element from the end as item
                  Iterate from the start up to the element just before this item as candidate
                            let difference = item - candidate
                                 if difference >= candidate and difference exists between candidate and item by binary_search
                                     return the triple (candidate, difference, item)
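
A minimal C# sketch of these steps (the method name and the choice to return squared values are illustrative):

using System;
using System.Linq;

class PythagoreanTriple
{
    // Returns squared values (a, b, c) with a + b == c, or null if none exist.
    static Tuple<int, int, int> FindTriple(int[] elements)
    {
        var squares = elements.Select(x => x * x).OrderBy(x => x).ToArray();
        for (int k = squares.Length - 1; k >= 2; k--)
        {
            for (int i = 0; i < k; i++)
            {
                int difference = squares[k] - squares[i];
                // once the candidate exceeds the difference, later candidates do too
                if (difference < squares[i]) break;
                // binary-search for the difference between candidate and item
                int pos = Array.BinarySearch(squares, i + 1, k - i - 1, difference);
                if (pos >= 0)
                    return Tuple.Create(squares[i], difference, squares[k]);
            }
        }
        return null;
    }

    static void Main()
    {
        var t = FindTriple(new[] { 3, 1, 4, 6, 5 });
        Console.WriteLine(t != null ? $"{t.Item1} + {t.Item2} = {t.Item3}" : "none"); // 9 + 16 = 25
    }
}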

Software for Email Campaign updated: https://1drv.ms/w/s!Ashlm-Nw-wnWsEXQ3UmFVYv0GpFe

Tuesday, August 22, 2017

We continue discussing ZooKeeper. It is a coordination service with elements from group messaging, shared registers, and distributed lock services. It provides an interface that guarantees wait-free properties and FIFO execution of requests from each client. Write requests across all clients are also linearized.
We were discussing the durability guarantee of ZooKeeper. On every read request, a zxid is returned that relates to the last transaction seen by the server. Since the writes are all transactional, this zxid indicates a partial order of the read requests. If a client connects to a different server, that server ensures that its view of the ZooKeeper data is at least as current as that of the client by comparing its zxid with that of the client. If the client's view is more recent, the server does not reestablish the session until it has caught up. Since a majority of the ZooKeeper servers would be current before the client received the zxid, the client is guaranteed to find another server that has a recent view of the system.
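A small C# model of that reconnect check (this illustrates the rule only; the Server class and its members are assumptions, not the ZooKeeper API):

using System;

class Server
{
    // zxid of the last transaction this server has applied
    public long LastAppliedZxid { get; set; }

    // A session is (re)established only once this server's view is at
    // least as current as the view the client has already observed.
    public bool CanEstablishSession(long clientLastZxid)
    {
        return LastAppliedZxid >= clientLastZxid;
    }
}

class Demo
{
    static void Main()
    {
        long clientZxid = 42; // last zxid the client saw on its old server
        var lagging = new Server { LastAppliedZxid = 41 };
        var current = new Server { LastAppliedZxid = 43 };
        Console.WriteLine(lagging.CanEstablishSession(clientZxid)); // False: must catch up first
        Console.WriteLine(current.CanEstablishSession(clientZxid)); // True
    }
}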
Client session failures are detected using timeouts. The leader determines that the client is gone if none of the servers received anything within the timeout. The client is encouraged to send a heartbeat if there are no active requests for a long time. This is automatically done in the client library from ZooKeeper.
The evaluations done with ZooKeeper by the authors included measuring throughput when the system is saturated and how it changes with various injected failures. A large number of clients connected, and this number was kept the same while the number of servers was varied. This simulated a heavy load, and it was seen that increasing the number of servers had a negative impact on performance, particularly for writes. Read throughput is consistently higher than write throughput: write requests have to go through atomic broadcast, and transactions are logged to non-volatile store. Both contribute to the difference in throughput.

Software for Email Campaign updated: https://1drv.ms/w/s!Ashlm-Nw-wnWsEXQ3UmFVYv0GpFe

Monday, August 21, 2017

We continue discussing ZooKeeper. It is a coordination service with elements from group messaging, shared registers, and distributed lock services. It provides an interface that guarantees wait-free properties and FIFO execution of requests from each client. Write requests across all clients are also linearized.
We were discussing the components of ZooKeeper - the request processor, the atomic broadcast, and the replicated database. The request processor prepares a request for processing, which for write requests also includes coordination among the servers. If the request processing involves coordination, it is handled by an agreement protocol, which is an implementation of atomic broadcast. The changes are committed to the ZooKeeper database, which is replicated across all servers. This database is periodically snapshotted because replaying all the messages in order would take too long to recover state. During a snapshot, the ZooKeeper state is not locked, so the snapshot does not necessarily reflect the state of ZooKeeper at any single point in time. However, since the transactions are idempotent, snapshots still allow the state to be restored: the changes can be applied more than once, as long as they are applied in the same order as in the replay.
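A small C# illustration of why such fuzzy snapshots are safe with idempotent transactions (the Apply method and the delta format are modeling assumptions; they mirror the idea that a write is converted into a transaction carrying the full resulting value and version):

using System;
using System.Collections.Generic;

class FuzzySnapshotDemo
{
    // An idempotent transaction carries the full resulting value and version,
    // so applying it a second time leaves the state unchanged.
    static void Apply(Dictionary<string, (string value, int version)> tree,
                      string path, string value, int version)
    {
        tree[path] = (value, version);
    }

    static void Main()
    {
        var tree = new Dictionary<string, (string value, int version)>();
        Apply(tree, "/foo", "f2", 2); // already captured in the fuzzy snapshot
        Apply(tree, "/foo", "f2", 2); // re-applied during replay: same result
        Apply(tree, "/foo", "f3", 3); // later change, applied in order
        Console.WriteLine(tree["/foo"]); // (f3, 3)
    }
}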
We will now see how ZooKeeper provides durability. On every read request, a zxid is returned that relates to the last transaction seen by the server. Since the writes are all transactional, this zxid indicates a partial order of the read requests. All responses, including the heartbeats during idle periods, include the last zxid seen by the server that the client is connected to. If a client connects to a different server, that server ensures that its view of the ZooKeeper data is at least as current as that of the client by comparing its zxid with that of the client. If the client's view is more recent, the server does not reestablish the session until it has caught up. Since a majority of the ZooKeeper servers would be current before the client received the zxid, the client is guaranteed to find another server that has a recent view of the system. This guarantees durability. ZooKeeper also maintains a session timeout to detect client session failures. Generally clients re-establish a session. The client library for ZooKeeper sends heartbeats and switches servers if the current server is not responsive enough.
Software for Email Campaign: https://1drv.ms/w/s!Ashlm-Nw-wnWsEXQ3UmFVYv0GpFe
#codingexercise
Another method for finding the number of duplicates in a sorted contiguous sequence is similar to the earlier binary-search-based method, except that instead of traversing linearly outward from the current element, it searches for the boundaries with the previous and next distinct elements.
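
A minimal C# sketch of one reading of that idea, where binary searches locate the first and last occurrence of the value instead of a linear walk (the method names are illustrative):

using System;

class DuplicateCount
{
    // First index holding val, or -1; keeps narrowing left while input[mid] >= val.
    static int FirstIndex(string input, char val)
    {
        int lo = 0, hi = input.Length - 1, result = -1;
        while (lo <= hi)
        {
            int mid = (lo + hi) / 2;
            if (input[mid] == val) result = mid;
            if (input[mid] >= val) hi = mid - 1; else lo = mid + 1;
        }
        return result;
    }

    // Last index holding val, or -1; keeps narrowing right while input[mid] <= val.
    static int LastIndex(string input, char val)
    {
        int lo = 0, hi = input.Length - 1, result = -1;
        while (lo <= hi)
        {
            int mid = (lo + hi) / 2;
            if (input[mid] == val) result = mid;
            if (input[mid] <= val) lo = mid + 1; else hi = mid - 1;
        }
        return result;
    }

    static int GetCount(string input, char val)
    {
        int first = FirstIndex(input, val);
        return first == -1 ? 0 : LastIndex(input, val) - first + 1;
    }

    static void Main()
    {
        Console.WriteLine(GetCount("aabbbbc", 'b')); // 4
    }
}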

Sunday, August 20, 2017

We continue discussing ZooKeeper. It is a coordination service with elements from group messaging, shared registers, and distributed lock services. It provides an interface that guarantees wait-free properties and FIFO execution of requests from each client. Write requests across all clients are also linearized.
We were discussing the components of ZooKeeper - the request processor, the atomic broadcast, and the replicated database. The request processor prepares a request for processing, which for write requests also includes coordination among the servers. If the request processing involves coordination, it is handled by an agreement protocol, which is an implementation of atomic broadcast. The changes are committed to the ZooKeeper database, which is replicated across all servers. This database is periodically snapshotted because replaying all the messages in order would take too long to recover state. During a snapshot, the ZooKeeper state is not locked, so the snapshot does not necessarily reflect the state of ZooKeeper at any single point in time. However, since the transactions are idempotent, snapshots still allow the state to be restored: the changes can be applied more than once, as long as they are applied in the same order as in the replay.
Writes and reads for clients are now handled easily with the above design. Since the server processes writes in order and there is no concurrency for writes, even the notifications are strictly ordered. On every write, the notifications are sent out and cleared. Only the server that the client connects to sends notifications to that client. Read requests are handled locally: they are served directly from the in-memory database and do not involve the request processor or the atomic broadcast. Such fast reads, however, do not guarantee a precedence order for the reads, which means a read may return a stale value even though a more recent change has been committed. To alleviate this, a 'sync' primitive is provided that ensures consistency by performing the equivalent of flushing all pending writes. No broadcast is required for sync; instead, it is placed at the end of the queue of requests between the leader and the server executing the call to sync. For this to work, the follower must be sure that the leader has not changed. This can be ascertained by looking at the pending queue: if the queue is not empty, the leader is unchanged; if the queue is empty, the leader needs to issue a null transaction and order the sync after that transaction. If the null transaction fails to commit, the leader realizes it is not the leader before the follower abandons it.
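A tiny C# model of that sync ordering decision (Leader, pendingQueue, and the string-based transactions are modeling assumptions, not ZooKeeper internals):

using System;
using System.Collections.Generic;

class Leader
{
    // Requests queued between the leader and a given follower.
    private readonly Queue<string> pendingQueue = new Queue<string>();

    public void Submit(string request) => pendingQueue.Enqueue(request);

    // sync is not broadcast; it is ordered behind the pending requests.
    // An empty queue cannot prove leadership, so a null transaction is
    // committed first and the sync is ordered after it.
    public void Sync()
    {
        if (pendingQueue.Count == 0)
            pendingQueue.Enqueue("nullTransaction");
        pendingQueue.Enqueue("sync");
    }
}

class SyncDemo
{
    static void Main()
    {
        var idle = new Leader();
        idle.Sync(); // queues: nullTransaction, sync
        var busy = new Leader();
        busy.Submit("write /a");
        busy.Sync(); // queues: write /a, sync (no extra traffic needed)
    }
}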
#codingexercise
int binary_search(String input, int start, int end, char val)
{
if (start > end) return -1;
int mid = (start + end)/2;
if (input[mid] == val) return mid;
if (input[mid] < val)
    return binary_search(input, mid+1, end, val);
else
    return binary_search(input, start, mid-1, val);
}
int GetCount(String input, char val)
{
// find any one occurrence, then widen linearly to both boundaries
int current = binary_search(input, 0, input.Length - 1, val);
if (current == -1) return 0;
int mid = current;
while (current >= 0 && input[current] == val) current--;
int start = current + 1;
current = mid;
while (current <= input.Length - 1 && input[current] == val) current++;
int end = current - 1;
return end - start + 1;
}


Saturday, August 19, 2017

We continue discussing ZooKeeper. It is a coordination service with elements from group messaging, shared registers, and distributed lock services. It provides an interface that guarantees wait-free properties and FIFO execution of requests from each client. Write requests across all clients are also linearized.
We saw a few examples of primitives that are possible with ZooKeeper. These primitives exist only on the client side. They included implementing dynamic configuration in a distributed application, specifying group membership, and implementing locks and double barriers. These have been put to use in applications such as the Yahoo! web crawler, which uses ZooKeeper for its page-fetching service. Katta, a distributed indexer, uses it for coordination. Even the high-traffic Yahoo! message broker, a distributed publish-subscribe system, uses it for managing configuration, failure detection, and group membership. We discussed their usages in detail.
We next review the components in the implementation of ZooKeeper. It provides high availability by replicating the ZooKeeper data on each server. The components include: 1) the request processor, 2) atomic broadcast, and 3) the replicated database.
The request processor prepares a request for processing, which for write requests also includes coordination among the servers. If the request processing involves coordination, it is handled by an agreement protocol, which is an implementation of atomic broadcast. The changes are committed to the ZooKeeper database, which is replicated across all servers. This is an in-memory database containing the entire data tree. Each znode stores about 1 MB of data by default, and this limit is configurable. Updates are logged to disk before being applied to the in-memory database; that is, writes are written through. The servers keep a replay or write-ahead log of committed operations and take periodic snapshots of their in-memory database.
Clients connect to exactly one server each. If they perform read operations, they read the local database directly, avoiding the agreement protocol. Write requests are forwarded to the leader, which notifies the other servers with messages consisting of the proposed state changes and the agreed-upon state changes. Write requests go through a transaction, and the messaging layer is atomic, so servers do not have local replicas that diverge. A request may not be idempotent, but a transaction is. The leader converts a request into a transaction describing the state of the system after the request is applied. With the help of transactions, all requests are translated into success transactions or error transactions, and the database is kept consistent. The agreement protocol is initiated by the leader, which executes the request and broadcasts the change with Zab using a simple majority quorum. The request processing pipeline is kept full to achieve high throughput. Zab delivers the messages in order and exactly once; even if a message is redelivered during recovery, the transactions are idempotent, so applying them again is safe.