Sunday, November 15, 2015

Parallel and Concurrent Programming in NoSQL databases.

In NoSQL databases, the preferred method of parallelizing tasks is the MapReduce algorithm. It is a simple scatter-gather programming model in which all mappers behave the same way on their respective localized subgroups of data and a reducer aggregates the data from all the mappers. This applies well to algorithms that calculate sums over the entire data range.
However, here we look at dataflow parallelism, which is not as strict or simplistic and enables even more algorithms to be applied to Big Data. Dataflow processing provides a foundation for message passing and for parallelizing CPU-intensive and I/O-intensive applications that need high throughput and low latency. It also gives you explicit control over how data is buffered and moved around the system. Traditionally, we required callbacks and synchronization objects, such as locks, to coordinate tasks and access to shared data; with this programming model, we instead create dataflow objects that process data as it becomes available. You only declare how the data is handled when it becomes available and any dependencies between the data. The requirements to synchronize shared data and other maintenance activities are common across algorithms and can be handled by a library. Moreover, such a library can schedule the asynchronous arrival of data, so throughput and latency can be improved. This kind of stream processing enables algorithms that are difficult to express in summation form.
This framework consists of dataflow blocks, which are data structures that buffer and process data. There are three kinds of dataflow blocks: source blocks, target blocks and propagator blocks. A source block acts as a source of data and can be read from. A target block acts as a receiver of data and can be written to. A propagator block acts as both a source block and a target block, and can be read from and written to. Interfaces can be defined separately for sources, targets and propagators, with the propagator interface inheriting from both the source and the target interfaces. By implementing these interfaces, we define dataflow blocks that form pipelines, which are linear sequences of dataflow blocks, or networks, which are graphs of dataflow blocks. A pipeline is a simple case of a network, one where a source asynchronously propagates data to targets. As blocks are added and removed, the blocks handle all the thread-safety concerns of linking and unlinking. Just like protocol handlers, blocks can choose to accept or reject messages, as in a filtering mechanism. This enables specific data blocks to be earmarked for processing.
This programming model uses the concept of message passing, where independent components of a program communicate with one another by sending messages. One way to propagate messages is to call a post method, which acts asynchronously. Source blocks offer data to target blocks by calling the OfferMessage method. The target block responds to an offered message with an Accepted, Declined or Deferred status. When the target block later requires a message it has deferred, it calls ConsumeMessage on the source; if it decides not to take the message after all, it calls ReleaseReservation.
Dataflow blocks also support the concept of completion. A dataflow block that is in the completed state does not perform any further work. Each dataflow block has an associated task that represents the completion status of the block. Because we can wait for a task to finish, we can wait for the terminal nodes of a dataflow network to finish. The task faults with an exception on failure and completes normally otherwise. As with most tasks, dataflow blocks can be paused, resumed or canceled, which helps with robustness and reliability.
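The blocks described above correspond closely to .NET's TPL Dataflow library (System.Threading.Tasks.Dataflow). Assuming that library, a minimal two-block pipeline with completion propagation might look like this sketch:

using System;
using System.Threading.Tasks.Dataflow;

class PipelineExample
{
    static void Main()
    {
        // A propagator block that squares each input and a target block that prints it.
        var square = new TransformBlock<int, int>(n => n * n);
        var print = new ActionBlock<int>(n => Console.WriteLine(n));

        // Link into a pipeline; completion flows downstream when the source completes.
        square.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

        // Post data; the blocks buffer and process it as it becomes available.
        for (int i = 1; i <= 5; i++)
            square.Post(i);

        // Signal completion and wait on the terminal block's completion task.
        square.Complete();
        print.Completion.Wait();
    }
}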

Some dataflow blocks come predefined: for example, there are buffering blocks, execution blocks and grouping blocks, which relax the fixed count of mappers and reducers in the conventional use of MapReduce algorithms. The degree of parallelism can also be specified explicitly for these dataflow blocks, or it can be handled automatically.
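As a small illustration of the latter, again assuming TPL Dataflow, an execution block can be given an explicit degree of parallelism through ExecutionDataflowBlockOptions:

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class ParallelBlockExample
{
    static void Main()
    {
        // Allow up to four messages to be processed concurrently by this block.
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

        var worker = new ActionBlock<int>(n =>
        {
            Console.WriteLine($"item {n} on thread {Thread.CurrentThread.ManagedThreadId}");
        }, options);

        for (int i = 0; i < 20; i++)
            worker.Post(i);

        worker.Complete();
        worker.Completion.Wait();
    }
}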

Let us take an example of the summation form to compute the mean.
For example, if we take 3, 1, 5, 7, 9, the mean is 25 / 5 = 5.
If we then add 10, 15, 17, 19, 24, the mean becomes (25 + 85) / 10 = 11.
We could also arrive at this by computing the mean of the second batch, 85 / 5 = 17,
and taking (5 + 17) / 2 for the two equal-sized ranges; for ranges of unequal size we would instead weight each mean by its element count. Either way the result is 11.
In this case we are reusing the previous computation over the data and we don't have to revisit that data again.
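A minimal sketch of this reuse, assuming each mapper emits a (sum, count) pair for its partition and a reducer combines them:

using System;
using System.Collections.Generic;
using System.Linq;

class IncrementalMean
{
    // Each "mapper" contributes a (sum, count) pair for its subgroup; the "reducer"
    // combines the pairs without revisiting the original data points.
    static double CombineMeans(IEnumerable<(double Sum, long Count)> partials)
    {
        double totalSum = 0;
        long totalCount = 0;
        foreach (var (sum, count) in partials)
        {
            totalSum += sum;
            totalCount += count;
        }
        return totalSum / totalCount;
    }

    static void Main()
    {
        var batch1 = new double[] { 3, 1, 5, 7, 9 };       // sum 25, mean 5
        var batch2 = new double[] { 10, 15, 17, 19, 24 };  // sum 85, mean 17
        var partials = new[] { (batch1.Sum(), (long)batch1.Length),
                               (batch2.Sum(), (long)batch2.Length) };
        Console.WriteLine(CombineMeans(partials));         // prints 11
    }
}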

Saturday, November 14, 2015

Let us look at applying the GLMNet algorithm to big data using MapReduce, in continuation of our previous discussion.
This is one of the popular algorithms for machine learning. As with most regression analysis, more data is better. But this algorithm can work even when we have less data because it gives us methods to 1) determine whether we are overfitting and
2) hedge our answers if we are overfitting.
With ordinary least squares regression, the objective is to minimize the sum of squared errors between the actual values and our linear approximation. Coefficient shrinkage methods add a penalty on the coefficients.
This yields a family of regression solutions, from completely insensitive to the input data to unconstrained ordinary least squares.
First we build a regularized regression. We want to eliminate over-fitting because doing so tunes the model so that it gives the best performance on held-out data.
In this step, in order to avoid over-fitting, we have to exert control over the degrees of freedom in the regression. We can cut back on attributes by making a subset selection, or we can penalize the regression coefficients with shrinkage methods such as ridge regression and lasso regression.
With coefficient shrinkage, different penalties give solutions with different properties.
With more than one attribute, the coefficient penalty function has important effects on the solutions. Some choices of coefficient penalty give control over the sparseness of the solutions. Two types of coefficient penalty are most frequently used: the sum of squared coefficients and the sum of absolute values of coefficients. The GLMNet algorithm incorporates the ElasticNet penalty, which is a weighted combination of the sum of squares and the sum of absolute values.
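As a rough sketch of that penalty (the parameter names lambda and alpha and the 1/2 factor on the ridge term follow the common glmnet convention and are my assumptions, not something stated above):

using System;

class ElasticNetPenalty
{
    // A weighted combination of the lasso term (sum of absolute values) and the
    // ridge term (sum of squares). 'lambda' scales the whole penalty and 'alpha'
    // in [0, 1] mixes the two terms; names and the 1/2 factor are assumed here.
    static double Penalty(double[] coefficients, double lambda, double alpha)
    {
        double sumAbs = 0, sumSq = 0;
        foreach (var beta in coefficients)
        {
            sumAbs += Math.Abs(beta);
            sumSq += beta * beta;
        }
        return lambda * (alpha * sumAbs + (1 - alpha) * 0.5 * sumSq);
    }

    static void Main()
    {
        var coefficients = new[] { 0.5, -1.2, 0.0, 2.0 };
        Console.WriteLine(Penalty(coefficients, lambda: 0.1, alpha: 0.8));
    }
}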
Courtesy : Michael Bowles on applying GLMNet to MapReduce and PBWorks for GLMNet.
What I find interesting in these discussions is the use of coefficient shrinkage, a somewhat complicating factor, which nevertheless improves an otherwise simple regression technique. Similarly, the 'summation form' lends itself to MapReduce because different mappers can compute the sums locally for their groups and the reducer can aggregate them, which is inherently simple. However, we could take MapReduce beyond the summation form to incorporate more algorithms based on other techniques of parallelization. Such techniques include not only partitioning data and computations but also time-slicing. One example is to parallelize iterations. Currently only the reducer iterates, based on the aggregation of the previous iteration's variables from the mappers, but this could be pushed down to the mappers, with the reducer aggregating not just by sum but by other functions such as max, min or count.

Friday, November 13, 2015

Extending NoSQL Databases with User Defined document types and key values
The structure and operations on the NoSQL databases facilitate querying against values. For example, if we have documents in XML, we can translate them to JSON and store as documents or key-values in the NoSQL databases. Then a query might look like the following:
db.inventory.find( { type: "snacks" } )
This will return all the documents where the type field is “snacks”.  The corresponding map-reduce function may look like this:
db.inventory.mapReduce(
       function() { emit(this.id, this.calories); },          // map
       function(key, values) { return Array.sum(values); },   // reduce
       {
              query: { type: "snacks" },
              out: "snack_calories"
       }
)
This works well for JSON data types and values. However, we are not restricted to the built-in types. We can extend the key-values with user-defined types and values. They will just be marked differently from the built-in types. When the mapper encounters data like this, it loads the associated code to interpret the user-defined types and values. The code applies the same query operators, such as equality and comparison against values, that the mapper would have applied if the data were in native JSON format. This delegation of interpretation and execution allows NoSQL databases to be extended with forms such as computed keys and computed values.
Let us take the above example where the calories have to be computed from ingredients.
In this case, the code would look like the following
function (ingredients, calories) {
    var total_calories = 0;
    // sum the calorie contribution of each ingredient
    ingredients.forEach(function (ingredient, index) {
        total_calories += calories[index];
    });
    return total_calories;
}
While this logic for computed key-values can be written outside the database as map-reduce jobs, it can also stay as close as possible to the data it operates on and consequently be stored in the database.
Moreover, the logic can be expressed against different runtimes, and each runtime can be loaded and unloaded to execute the logic.
One advantage of having a schema for some of the data is that it brings the seamless use of structured queries to that specific data. As an example, we can even use XML data itself, given the XPath queries that can be run on it. Although we will load an XML parsing runtime for this data, it will behave the same as other data types for the overall Map-Reduce.
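For instance, here is a minimal sketch of such an XPath step in C#, using the standard System.Xml classes; the inventory markup is made up for illustration:

using System;
using System.Xml;

class XPathExample
{
    static void Main()
    {
        // A hypothetical inventory document stored as an XML value.
        var doc = new XmlDocument();
        doc.LoadXml("<inventory><item type='snacks' calories='150'/><item type='drinks' calories='90'/></inventory>");

        // The same "type = snacks" filter as the JSON query above, expressed in XPath.
        foreach (XmlNode item in doc.SelectNodes("/inventory/item[@type='snacks']"))
            Console.WriteLine(item.Attributes["calories"].Value);
    }
}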
Another example of a user-defined datatype is the tuple. Tuples are easier to understand both in terms of representation and search. Let us use an example here:
We have a tuple called 'Alias' for data about a person. This tuple consists of (known_alias, use_always, alternate_alias). The first part is text, the second is Boolean, and the third is a map<text, text>.
The person data consists of id, name, friends and status.
We could still insert data into the person using JSON as follows:
[{"id":"1","name":"{"firstname":"Berenguer", "surname": "Blasi", "alias_data":{"know_alias":"Bereng", "use_alias_always":true}}", "friends":"[{"firstname":"Sergio", "surname": "Bossa"}, {"firstname":"Maciej", "surname": "Zasada"}]"}]'
However, when we search, we can explicitly use the fields of the type as natively as those of the JSON.
There are standard query operators such as where, select, join, intersect, distinct, contains and SequenceEqual that we can apply to these tuples.
The reason tuples become easier to understand is that each field can be qualified with dot notation, and the entire data can be exploded into its individual fields with this notation, as follows:
<fields>
<field indexed="true" multiValued="false" name="id" stored="true" type="StrField"/>
<field indexed="true" multiValued="true" name="friends" stored="true" type="TupleField"/>
<field indexed="true" multiValued="false" name="friends.firstname" stored="true" type="TextField"/>
<field indexed="true" multiValued="false" name="friends.surname" stored="true" type="TextField"/>
<field indexed="true" multiValued="false" name="friends.alias_data" stored="true" type="TupleField"/>
<field indexed="true" multiValued="false" name="friends.alias_data.known_alias" stored="true" type="TextField"/>
<field indexed="true" multiValued="false" name="friends.alias_data.use_alias_always" stored="true" type="BoolField"/>
:
:
</fields>
The above example is taken from Datastax and it serves to highlight the seamless integration of tuples or User Defined types in NoSql databases.

In addition, tuples/UDTs are read and written as one single block, not on a per-field basis. Tuples/UDTs can also participate in a map-like data model although they are not exactly map values. For example, a collection of tuples/UDTs can have a type field that represents what would have been the map key. We just have to declare a UDT that includes tuples as well, and for this UDT we specify the search the same way as in a map-like data model but using dot notation for the fields. For example, we can search with {!UDT}alias.type:Bereng AND alias.use_alias_always:True

#coding question
Determine the maximum gradient in a sequence of sorted numbers
int GetMaxGradient(int[] numbers)
{
   int max = 0;
   for (int i = 1; i < numbers.Length; i++)
   {
      // gradient between adjacent elements of the sorted sequence
      int grad = Math.Abs(numbers[i] - numbers[i - 1]);
      if (grad > max) max = grad;
   }
   return max;
}

Tuesday, November 10, 2015

Let us look at a few examples of applying the summation form to a few chosen algorithms, selected because of their popularity at NIPS. The summation form is one that is suitable for MapReduce jobs.
1) Locally Weighted Linear Regression (LWLR). This is solved via the normal equations A.theta = b, where A is a matrix formed by summing components, each a weighted combination of a training vector and its transpose. The term b on the right-hand side of the normal equations is also a summation of components, each a weighted combination of a training vector and its training label. Since there are two different kinds of summation, one set of mappers computes one kind and another set computes the other. Two reducers respectively sum up the partial values for A and b, and the algorithm finally computes the solution by applying A-inverse to b. If the weights are ignored, this problem reduces to ordinary least squares.
2) Naive Bayes. Here we have to estimate conditional probabilities: the probability of a support vector component taking a value k given that a support label occurs, and given that it does not. To do so on MapReduce, a summation is done over the training data counting how often the component is k for each support label. Therefore we need different kinds of mappers: one to count the subgroup where the component is k and the support label does not occur, another where the component is k and the support label occurs, another for all records where the support label does not occur, and another for all records where it does occur. Notice that these four categories resemble the counts used in precision and recall definitions. The reducer then sums up the intermediate results to get the final values of the parameters.
3) Gaussian Discriminative Analysis (GDA). The classic GDA algorithm needs to learn four statistics: the probability that a support label occurs, the mean with the support label, the mean without the support label, and a summation over the data conditioned on the chosen support label. This method uses a normal distribution for each class or label, usually represented by the mean and standard deviation of that class. For each label we compute the probability that the data belongs to it, and the class with the highest probability is chosen. Consequently, the mappers differ for each of these subgroups. Finally, one reducer aggregates the intermediate sums and calculates the final result for the parameters.
4) K-means. Here we know how many clusters we want, so we can divide the dataset into that many subgroups. Each mapper runs through its local data, determines the closest centroid and accumulates the vector sum of points closest to each centroid. These are emitted as a key and a tuple of the partial sum of the set and the set index. The reducer aggregates the sums from all mappers and calculates the new centroids (a small sketch of this mapper and reducer appears after this list).
5) Logistic Regression (LR). In this algorithm, the hypothesis that lends itself to MapReduce is 1 / (1 + exp(-theta-transposed times x)). Learning is done by fitting theta to the training data so that the likelihood function is optimized. In each iteration of the algorithm, theta is improved. This is done with multiple mappers that sum up over their subgroups of partitioned data; the iterations also take the gradient computed previously for improving theta and use it in the current iteration. The reducer sums up the values for the gradient and the Hessian to perform the update of theta.
6) Neural Network. This uses a strategy called backpropagation. Each mapper propagates its set of data through the network. A three-layer network is used for this purpose, with two output neurons classifying the data into two categories. Each of the weights in the network is adjusted by backpropagating the error, which is used to calculate the partial gradient for each weight. The reducer then sums the partial gradients from each mapper and does a batch gradient descent to update the weights of the network.
7) Principal Component Analysis (PCA). This algorithm computes the principal eigenvectors of the covariance matrix, which is expressed with a first term that is 1/m times the summation of the support vector times the support vector transposed, minus a second term comprising the mean times the mean transposed. The latter term can also be expressed in summation form, so the covariance matrix is entirely in summation form. The sums can be mapped to different cores and the reducer can aggregate them.
8) Independent Component Analysis (ICA). This is a method where the observed data is assumed to be a linear transformation of the source data, and the goal is to identify the independent source vectors, that is, to find the unmixing matrix W. Since the transformations are linear, we can use batch gradient descent to optimize W's likelihood. Each mapper therefore independently calculates partial updates to the unmixing matrix over its local data, and they are aggregated in the reducer.
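As referenced in item 4, here is a minimal single-process sketch of the K-means mapper and reducer; the method names and the in-memory dictionaries stand in for the real MapReduce plumbing and are assumptions for illustration:

using System;
using System.Collections.Generic;

class KMeansMapReduceSketch
{
    // Mapper: for one local partition, accumulate the vector sum and count of the
    // points closest to each centroid, keyed by centroid index.
    static Dictionary<int, (double[] Sum, long Count)> Map(
        IEnumerable<double[]> localPoints, double[][] centroids)
    {
        var partials = new Dictionary<int, (double[] Sum, long Count)>();
        foreach (var point in localPoints)
        {
            int closest = 0;
            double bestDistance = double.MaxValue;
            for (int c = 0; c < centroids.Length; c++)
            {
                double distance = 0;
                for (int j = 0; j < point.Length; j++)
                    distance += (point[j] - centroids[c][j]) * (point[j] - centroids[c][j]);
                if (distance < bestDistance) { bestDistance = distance; closest = c; }
            }
            if (!partials.TryGetValue(closest, out var acc))
                acc = (new double[point.Length], 0);
            for (int j = 0; j < point.Length; j++)
                acc.Sum[j] += point[j];
            partials[closest] = (acc.Sum, acc.Count + 1);
        }
        return partials;
    }

    // Reducer: aggregate the partial sums and counts from all mappers and divide
    // to get the new centroid for each cluster.
    static double[][] Reduce(
        IEnumerable<Dictionary<int, (double[] Sum, long Count)>> mapperOutputs, int k, int dimensions)
    {
        var sums = new double[k][];
        var counts = new long[k];
        for (int c = 0; c < k; c++) sums[c] = new double[dimensions];
        foreach (var output in mapperOutputs)
            foreach (var entry in output)
            {
                for (int j = 0; j < dimensions; j++)
                    sums[entry.Key][j] += entry.Value.Sum[j];
                counts[entry.Key] += entry.Value.Count;
            }
        for (int c = 0; c < k; c++)
            if (counts[c] > 0)
                for (int j = 0; j < dimensions; j++)
                    sums[c][j] /= counts[c];
        return sums;
    }
}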
#coding exercise
How to know which side of a sorted array has a steeper gradient
void PrintSideWithSteeperGradient(int[] numbers)
{
   if (numbers.Length == 0) { Console.WriteLine("None"); return; }
   if (numbers.Length == 1) { Console.WriteLine("Both"); return; }
   int leftstart = 0;
   int leftend = numbers.length/2-1;
   if (numbers.length %2 != 0) { leftend = numbers.length/2;}
   int rightstart = numbers.length/2;
   int rightend = numbers.length -1;
   double leftgradient = 0;
   double rightgradient = 0;
   if (leftstart == leftend) leftgradient = numbers[leftstart];
   if (rightstart == rightend) rightgradient = numbers[rightend];
   // cast to double so the average gradient is not truncated by integer division
   if (leftstart < leftend) leftgradient = (double)(numbers[leftend] - numbers[leftstart]) / (leftend - leftstart);
   if (rightstart < rightend) rightgradient = (double)(numbers[rightend] - numbers[rightstart]) / (rightend - rightstart);
   if (leftgradient == rightgradient) Console.WriteLine("Both");
   else if (leftgradient > rightgradient) Console.WriteLine("Left");
   else Console.WriteLine("Right");
}


We return to the discussion of applying machine learning algorithms with MapReduce.

In Expectation Maximization, the expected pseudo count is calculated and used to update the probability, the mean and the covariance summation from the training data. The probability is computed on subgroups by having each mapper compute the expected pseudo count for its subgroup and the reducer aggregating them and dividing by m. To calculate the mean, each mapper computes the partial sum of the expected pseudo count times the support vector, as well as the sum of the pseudo counts; the reducer then aggregates the partial sums and divides. For the covariance summation, each mapper calculates the subgroup sum of the expected pseudo count times the difference of the support vector from the mean times that difference transposed. As for the mean, the sum of expected pseudo counts
for the local subgroup is also turned in by the mapper. The reducer then sums up the partial results and divides them.

In the Support Vector Machine, the linear SVM's primary goal is to minimize the squared norm of the weight vector plus a constant times the slack variables, for either hinge loss or quadratic loss, subject to the constraint that the training label times the decision value on the training vector is greater than 1 minus the slack variable. These are usually written as summation forms of the gradient and the Hessian. The mappers calculate the partial gradients and the reducer sums up the partial results to update the weight vector.
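A hedged sketch of the common pattern in these last few algorithms, where mappers compute partial gradients over their partitions and a reducer sums them and applies a batch gradient descent update; squared loss for a linear model is used here only to keep the sketch short:

using System;
using System.Collections.Generic;
using System.Linq;

class GradientSumSketch
{
    // Mapper: partial gradient of the squared loss over one data partition
    // for a linear model y ~ w . x.
    static double[] MapPartialGradient(IEnumerable<(double[] X, double Y)> partition, double[] w)
    {
        var gradient = new double[w.Length];
        foreach (var (x, y) in partition)
        {
            double residual = x.Zip(w, (xi, wi) => xi * wi).Sum() - y;
            for (int j = 0; j < w.Length; j++)
                gradient[j] += residual * x[j];
        }
        return gradient;
    }

    // Reducer: sum the partial gradients from all mappers and take one
    // batch gradient descent step on the weight vector.
    static double[] ReduceAndUpdate(IEnumerable<double[]> partialGradients, double[] w, double learningRate)
    {
        var total = new double[w.Length];
        foreach (var g in partialGradients)
            for (int j = 0; j < w.Length; j++)
                total[j] += g[j];
        var updated = new double[w.Length];
        for (int j = 0; j < w.Length; j++)
            updated[j] = w[j] - learningRate * total[j];
        return updated;
    }
}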

Monday, November 9, 2015

Today we start reviewing the paper Map-Reduce for Machine Learning on Multicore by Chu et al. In this paper they take advantage of Map-Reduce methods to scale machine learning algorithms. As we read in Bowles's discussion, summation is an integral part of Statistical Query models, and this paper too takes advantage of summation forms that can be computed on multiple CPUs using Map-Reduce methods. They show how this can be done for a variety of learning algorithms such as locally weighted linear regression, k-means, logistic regression, naive Bayes, principal component analysis, independent component analysis, Expectation Maximization, Support Vector Machines and so on. Most of these algorithms involve a summation. When an algorithm computes a sum over the data, the calculation can be distributed over multiple cores: the data is divided into as many pieces as there are cores, each CPU computes a partial result over its local data, and the results are then aggregated. In a cluster framework, a master breaks the data down to several mappers and there is one reducer that aggregates the results. This is therefore scalable horizontally to arbitrary data sizes. Some mapper and reducer functions require additional scalar information from the algorithms; to support these operations, the mapper or reducer can get this information from the query_info interface, and this can be customized for each algorithm. Moreover, if an algorithm requires feedback from previous operations, the cycle of map-reduce can be repeated over and over again.
#codingexercise
Given a dictionary with limited words. Check if the string given to you is a composite of two words which are already present in the dictionary.

bool IsComposite(List<string> dictionary, string word)
{
// try every split point of the word into a non-empty prefix and suffix
for (int i = 1; i < word.Length; i++)
{
     var word1 = word.Substring(0, i);
     var word2 = word.Substring(i);
     if (dictionary.Contains(word1) && dictionary.Contains(word2))
     {
         return true;
      }
}
return false;
}

Sunday, November 8, 2015

Find the sum of all numbers that are formed from root-to-leaf paths (code); expected time complexity O(n)
void DFSSearch(Node root, ref List<List<int>> listOfRootToLeafPaths, ref List<int> curList)
{
 if (root != null)
  {
      if (curList == null) curList = new List<int>();
      curList.Add(root.value);
      if (root.left == null && root.right == null)
      {
          // reached a leaf: record a copy of the current root-to-leaf path
          listOfRootToLeafPaths.Add(new List<int>(curList));
      }
      DFSSearch(root.left, ref listOfRootToLeafPaths, ref curList);
      DFSSearch(root.right, ref listOfRootToLeafPaths, ref curList);
      curList.RemoveAt(curList.Count - 1);
  }
}


Combinations of a string
void Combine(List<int> a, List<int> b, int start, int level, ref int Sum)
{
   for (int i = start; i < a.Count; i++)
   {
       b.Add(a[i]);
       Sum += b.ToNumber(); // helper: digits multiplied by their place order of tens
       if (i + 1 < a.Count)
            Combine(a, b, i + 1, level + 1, ref Sum);
       b.RemoveAt(b.Count - 1);
   }
}

Each combination can also have different permutations that can be added to Sum
void Permute(List<int> a, List<int> b, bool[] used, ref int Sum)
{
  if (b.Count == a.Count) { Sum += b.ToNumber(); return; }
  for (int i = 0; i < a.Count; i++)
  {
    if (used[i]) continue;
    used[i] = true;
    b.Add(a[i]);
    Permute(a, b, used, ref Sum);
    b.RemoveAt(b.Count - 1);
    used[i] = false;
  }
}


Saturday, November 7, 2015

Machine Learning tools on NoSQL databases 
MapReduce is inherently suited for operations on Big Data, and machine learning's statistical methods thrive on more and more data. Naturally, a programmer will look to implementing such methods on Big Data using MapReduce.
What is Map-Reduce ? 
MapReduce is an arrangement of tasks that enables relatively easy scaling. It includes:
  1. Hardware arrangement – nodes that are part of a cluster, can communicate, and process in parallel
  2. File system – provides distributed storage across multiple disks
  3. Software processes – run on the various CPUs in the assembly
  4. Controller – manages mapper and reducer tasks
  5. Mapper – assigns identical tasks to multiple CPUs, each running over its local data
  6. Reducer – aggregates the output from several mappers to form the end product
Mappers emit a key-value pair 
     Controllers sort key-value pairs by key 
     Reducers get pairs grouped by key 
The map and reduce functions are the only tasks that the programmer needs to write; the rest of the chores are handled by the platform.
Map-reduce can be written in Python or JavaScript depending on the NoSQL database and associated technology. For example, the following databases support Map-Reduce operations:
MongoDB – this provides a mapReduce database command 
Riak – This comes with an Erlang shell that can provide this functionality 
CouchDB – this comes with document identifiers that can be used with simple Javascript functions to MapReduce 
Machine learning methods that involve statistics usually have a summation over some expressions or term components. For example, a least squares regression requires computing the sum of the squared residuals and tries to keep it at a minimum.
This suits map-reduce very well because the mappers can find the partial sums of squared residuals while the reducer merely aggregates the partial sums into totals to complete the calculation. Many algorithms can be arranged in Statistical Query Model form. In some cases, iteration is required; each iterative step involves a map-reduce sequence.
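A tiny sketch of that division of labor, assuming each mapper sees one partition of (x, y) rows for a linear model with weights w:

using System;
using System.Collections.Generic;
using System.Linq;

class SumSquaredResidualsSketch
{
    // Mapper: partial sum of squared residuals over one local partition.
    static double MapPartialSse(IEnumerable<(double[] X, double Y)> partition, double[] w)
    {
        double partial = 0;
        foreach (var (x, y) in partition)
        {
            double predicted = x.Zip(w, (xi, wi) => xi * wi).Sum();
            partial += (y - predicted) * (y - predicted);
        }
        return partial;
    }

    // Reducer: the total is just the sum of the mappers' partial sums.
    static double ReduceSse(IEnumerable<double> partials) => partials.Sum();
}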
Let us now look at clustering techniques and how to fit them on map-reduce.
K-means clustering proceeds this way:
 Initialize :
            Pick K starting guesses for Centroid at random
 Iterate:
            Assign points to cluster whose centroid is closest
            Calculate cluster centroids
The corresponding map-reduce is as follows:
 Mapper - run through the local data and, for each point, determine the closest centroid. Accumulate the vector sum of the points closest to each centroid. The combiner emits the centroid as key and a tuple of the partial sum of the set and the set index.
Reducer - for each old centroid, aggregate the sum and n from all mappers and calculate the new centroid.
Courtesy : Michael Bowles PhD introduction to BigData