Tuesday, November 17, 2015

Previously we discussed computations that do not span the entire data set at once, even in parallel, but instead compute incrementally as and when data is made available. We took the example of computing the mean this way.
Let us take an example of the summation form to compute the mean.
For example, if we take 3, 1, 5, 7, 9, the mean is 25/5 = 5.
If we were to add 10, 15, 17, 19, 24, whose sum is 85, the mean would now become (25 + 85) / 10 = 11.
We could also arrive at this by computing the mean of the new data, 85/5 = 17, and averaging the two means, (5 + 17) / 2 = 11, since the two ranges are of equal size; otherwise we weight each mean by its element count.
In this case we are reusing the previous computation and we don't have to revisit that data again. We keep summarized information, which we adjust based on new data. Here the summary of the first data set is simply the result of the computation on it (its mean), together with its element count when the ranges differ in size.
Now we can generalize this technique so that it applies to all summation forms.
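The arithmetic above can be sketched as a small Python function. This is only an illustration: the name combine_means and the choice of (mean, count) as the summary are assumptions made for the example, not something prescribed by the discussion.

```python
def combine_means(mean_a, count_a, mean_b, count_b):
    """Merge two (mean, count) summaries without revisiting the raw data."""
    total = count_a + count_b
    # Weight each mean by the number of elements it summarizes.
    combined = (mean_a * count_a + mean_b * count_b) / total
    return combined, total


first = [3, 1, 5, 7, 9]
second = [10, 15, 17, 19, 24]

mean_first = sum(first) / len(first)      # 5.0
mean_second = sum(second) / len(second)   # 17.0

combined, count = combine_means(mean_first, len(first), mean_second, len(second))
print(combined, count)  # 11.0 10
```

Because the summary carries the count, the same function also handles ranges of unequal size.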
In the summation form, the data is already chunked into subgroups and then the computation is performed on each. The reducer merely aggregates the results. In other words, the data doesn’t overlap, so it is easier to aggregate the results. In a streaming mode, the same computations can be done as and when each non-overlapping batch of data arrives. The summaries from each data set now need to be combined, and since they represent non-overlapping data, this is easy: each summary contributes independently to the overall picture, and the reducer merely applies smoothing to the aggregated results.
For example, in a linear regression, lines may be drawn in chunks across different data as they arrive, and the reducer then merely has to sort the summaries and fit a generalized line across the data. It can take the beginning and end points of the pieces to draw the line, or find one that matches most of the slopes. Thus it is possible to aggregate results on non-overlapping data.
For overlapping data, however, the summaries are not trivial to combine because the previous summary may be completely overwritten. Instead, we look at the data points that are new in the current summary and calculate their effect on the previous slope. Since the new data points may be scattered over and beyond the previous data points, it should be easy to determine whether they affect the previous slope uniformly or non-uniformly. In the uniform case, no correction to the gradient may be needed; at most the regression line is displaced. In the non-uniform case, assuming all points contribute equally, a clockwise or anticlockwise rotation is applied depending on which end has more of the newer data points, and this corrects the previous slope. Thus we have an algorithm that uses the previous summary to create the current summary.
Furthermore, if two blocks of data points overlap partially, they are split into two non-overlapping regions and one overlapping region. The regression lines for the non-overlapping regions are preserved. The regression lines from the two contributing blocks in the overlap can simply be replaced by a line that is equidistant from both and passes through their intersection. In other words, summaries are additive too.
Since we already mentioned that all summation forms can be handled this way, and now that the summaries are additive too, we have expanded the kinds of algorithms that can be written this way.
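One way to make regression summaries additive in practice is to keep the running sums that least squares needs, rather than the fitted line itself. Note that this swaps in a different technique from the geometric displacement and rotation described above: it is a minimal Python sketch, the class name RegressionSummary and its fields are hypothetical, and it assumes the blocks being merged share no data points (a shared point would be counted twice).

```python
from dataclasses import dataclass


@dataclass
class RegressionSummary:
    """Running sums sufficient to fit y = slope * x + intercept."""
    n: int = 0
    sx: float = 0.0
    sy: float = 0.0
    sxx: float = 0.0
    sxy: float = 0.0

    def add(self, x, y):
        self.n += 1
        self.sx += x
        self.sy += y
        self.sxx += x * x
        self.sxy += x * y

    def merge(self, other):
        # Sums over disjoint blocks simply add up.
        return RegressionSummary(self.n + other.n, self.sx + other.sx,
                                 self.sy + other.sy, self.sxx + other.sxx,
                                 self.sxy + other.sxy)

    def line(self):
        # Closed-form least squares; needs at least two distinct x values.
        denom = self.n * self.sxx - self.sx ** 2
        slope = (self.n * self.sxy - self.sx * self.sy) / denom
        intercept = (self.sy - slope * self.sx) / self.n
        return slope, intercept


def summarize(points):
    summary = RegressionSummary()
    for x, y in points:
        summary.add(x, y)
    return summary


# Two blocks arriving at different times, both lying on y = 2x + 1.
earlier = summarize([(0, 1), (1, 3)])
later = summarize([(2, 5), (3, 7), (4, 9)])
print(earlier.merge(later).line())
```

The merged summary gives the same line as fitting all five points at once, without revisiting the earlier block.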

Monday, November 16, 2015

#codingquestion
A list of stations and the distances between them is given; find the shortest distance between all pairs.
Solution:
A straightforward implementation could be the Floyd-Warshall algorithm. This algorithm computes the shortest path weights in a bottom-up manner. It exploits the relationship between a pair of vertices and the shortest paths that pass through intermediary vertices. If there is no intermediary vertex, then such a path has at most one edge and its weight is the weight of that edge. Otherwise, the minimum weight is the smaller of the path from i to j and the path from i to k followed by k to j. Thus the algorithm iterates over each intermediary vertex k and, for each, over every entry of the N*N matrix to compute the shortest path weights.
Algorithm FloydWarshall(W[1..n, 1..n])
{
   D-0 is initialized with the weight matrix W; // D-0[i, i] = 0 and D-0[i, j] = infinity when there is no direct edge
   for k = 1  to n do
      for i = 1 to n do
        for j = 1 to n do
            D-k[i, j] = min(D-(k-1) [i, j],
                            D-(k-1) [i, k] + D-(k-1) [k, j])
   return D-(n)

}
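As a runnable illustration of the all-pairs shortest distance computation, here is a short Python sketch. The four-station distance matrix is made up for the example, and float("inf") marks pairs of stations with no direct link.

```python
INF = float("inf")


def floyd_warshall(weights):
    """Return the matrix of shortest distances between all pairs of vertices."""
    n = len(weights)
    dist = [row[:] for row in weights]  # copy so the input is left untouched
    for k in range(n):          # allow vertex k as an intermediary
        for i in range(n):
            for j in range(n):
                through_k = dist[i][k] + dist[k][j]
                if through_k < dist[i][j]:
                    dist[i][j] = through_k
    return dist


# Hypothetical distances between four stations.
stations = [
    [0,   4,   INF, 10],
    [4,   0,   3,   INF],
    [INF, 3,   0,   2],
    [10,  INF, 2,   0],
]

for row in floyd_warshall(stations):
    print(row)
```

Updating the matrix in place is safe here because row k and column k do not change during iteration k.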
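Another method could be to use the Bellman-Ford algorithm, which finds the shortest distances from a single source vertex to every other vertex (run it once per station to cover all pairs), and then to enumerate all paths between two vertices.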
      public static bool GetShortestPath(int[,] graph, int numVertex, int start, ref List<int> distances, ref List<int> parents)
        {
            // initialize Single Source
            for (int i = 0; i < numVertex; i++)
            {
                distances[i] = int.MaxValue;
                parents[i] = -1;
            }

            distances[start] = 0;

            var allEdges = GetAllEdges(graph, numVertex);
            for (int k = 0; k < numVertex - 1; k++)
            {
                for (int i = 0; i < allEdges.Count; i++)
                {
                    // relax
                    int sum = distances[allEdges[i].Item1] == int.MaxValue ?
                        distances[allEdges[i].Item1] :
                        distances[allEdges[i].Item1] + allEdges[i].Item3;
                    if (distances[allEdges[i].Item2] > sum)
                    {
                        distances[allEdges[i].Item2] = distances[allEdges[i].Item1] + allEdges[i].Item3;
                        parents[allEdges[i].Item2] = allEdges[i].Item1;
                    }
                }
            }

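            for (int i = 0; i < allEdges.Count; i++)
            {
                // skip unreachable sources so that int.MaxValue + weight does not overflow
                if (distances[allEdges[i].Item1] != int.MaxValue &&
                    distances[allEdges[i].Item2] > distances[allEdges[i].Item1] + allEdges[i].Item3)
                {
                    return false; // negative-weight cycle exists
                }
            }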

            return true;
        }
        public static void GetAllPaths(int[,] graph, int numVertex, int source, int destination, int threshold, ref List<int> candidatePath, ref List<int> candidateDist, ref List<List<int>> pathList, ref List<List<int>> distanceList)
        {
            if (candidatePath.Count > threshold) return;
         
            var path = new List<int>();
            var distances = new List<int>();
            GetOutboundEdges(graph, numVertex, source, ref path, ref distances);
            if (path.Contains(destination) && (candidatePath.Count == 0  || (candidatePath.Count > 0 && candidatePath.Last() != destination)))
            {
                candidatePath.Add(destination);
                candidateDist.Add(distances[path.IndexOf(destination)]);
                // List<T>.Contains compares references, so compare contents instead;
                // add to both lists together so that they stay in step
                if (!pathList.Any(p => p.SequenceEqual(candidatePath)))
                {
                    pathList.Add(new List<int>(candidatePath));
                    distanceList.Add(new List<int>(candidateDist));
                }
                candidatePath.RemoveAt(candidatePath.Count - 1);
                candidateDist.RemoveAt(candidateDist.Count - 1);
            }

            for (int i = 1; i < path.Count; i++)
            {
                // if (i != source)
                {
                    candidatePath.Add(path[i]);
                    candidateDist.Add(distances[i]);
                    GetAllPaths(graph, numVertex, path[i], destination, threshold, ref candidatePath, ref candidateDist, ref pathList, ref distanceList);
                    candidatePath.RemoveAt(candidatePath.Count - 1);
                    candidateDist.RemoveAt(candidateDist.Count - 1);
                }
            }

        }

Sunday, November 15, 2015

Parallel and Concurrent Programming in NoSQL databases.

In NoSQL databases, the preferred method of parallelizing tasks is the MapReduce algorithm. It is a simple scatter-gather programming model in which all mappers behave the same on their respective localized subgroups of data and a reducer aggregates the data from all the mappers. This applies well to algorithms that calculate sums over the entire data range.
However, we look at dataflow parallelism, which is not as strict or simplistic and enables even more algorithms to be applied to Big Data. Dataflow processing provides a foundation for message passing and for parallelizing CPU-intensive and I/O-intensive applications that need high throughput and low latency. It also gives you explicit control over how data is buffered and moved around the system. While traditionally we required callbacks and synchronization objects, such as locks, to coordinate tasks and access to shared data, with this programming model we can create dataflow objects that process the data as it becomes available. You only declare how the data is handled when it becomes available, along with any dependencies between the data. Requirements to synchronize the shared data and other maintenance activities are common across algorithms and can be handled by a library. Moreover, this library can schedule work around the asynchronous arrival of data, so throughput and latency can be improved. This kind of stream processing enables algorithms to be applied that are difficult to express in summation form.
This framework consists of dataflow blocks, which are data structures that buffer and process data. There are three kinds of dataflow blocks: source blocks, target blocks and propagator blocks. A source block acts as a source of data and can be read from. A target block acts as a receiver of data and can be written to. A propagator block acts as both a source block and a target block, and can be read from and written to. Interfaces can be defined separately for sources, targets and propagators; the propagator interface should inherit from both source and target. By implementing the interfaces, we define dataflow blocks that form pipelines, which are linear sequences of dataflow blocks, or networks, which are graphs of dataflow blocks. A pipeline is a simple case of a network, one where a source asynchronously propagates data to targets. As blocks are added and removed, the blocks handle all the thread safety of linking and unlinking. Just like protocol handlers, blocks can choose to accept or reject messages, as in a filtering mechanism. This enables specific blocks of data to be earmarked for processing.
This programming model uses the concept of message passing, where independent components of a program communicate with one another by sending messages. One way to propagate messages is to call, say, a post method, which acts asynchronously. Source blocks offer data to target blocks by calling the OfferMessage method. The target block responds to an offered message by returning a status such as Accepted, Declined or Postponed. When the target block later requires a postponed message, it calls ConsumeMessage; if it no longer needs the message, it calls ReleaseReservation.
Dataflow blocks also support the concept of completion. A dataflow block that is in the completed state does not perform any further work. Each dataflow block has an associated task that represents the completion status of the block. Because we can wait for a task to finish, we can wait for the terminal nodes of a dataflow network to finish. A task either completes successfully or reports failure by throwing an exception or returning an error. As with most tasks, dataflow blocks can be paused, resumed or canceled, thus enabling robustness and reliability.

Some dataflow blocks can be predefined. For example, we can have buffering blocks, execution blocks and grouping blocks, which relax the fixed arrangement of mappers and reducers found in the conventional use of MapReduce. The degree of parallelism can also be explicitly specified for these dataflow blocks, or it can be handled automatically.
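To make the source, propagator and target roles concrete, here is a minimal Python sketch that imitates a three-block pipeline with asyncio queues. It is an analogy rather than the dataflow library itself: the function names, the DONE sentinel used to signal completion and the bounded queues standing in for block buffers are all assumptions made for this example.

```python
import asyncio

DONE = object()  # sentinel: the upstream block has completed


async def source(out_queue, items):
    """Source block: offers each item downstream, then signals completion."""
    for item in items:
        await out_queue.put(item)
    await out_queue.put(DONE)


async def propagator(in_queue, out_queue, transform, accept):
    """Propagator block: reads, filters and transforms, then writes."""
    while True:
        item = await in_queue.get()
        if item is DONE:
            await out_queue.put(DONE)  # propagate completion
            return
        if accept(item):               # declined messages are dropped
            await out_queue.put(transform(item))


async def target(in_queue, results):
    """Target block: consumes messages until upstream completes."""
    while True:
        item = await in_queue.get()
        if item is DONE:
            return
        results.append(item)


async def run_pipeline(items):
    first = asyncio.Queue(maxsize=2)   # small buffers between blocks
    second = asyncio.Queue(maxsize=2)
    results = []
    await asyncio.gather(
        source(first, items),
        propagator(first, second, lambda x: x * x, lambda x: x % 2 == 1),
        target(second, results),
    )
    return results


print(asyncio.run(run_pipeline([1, 2, 3, 4, 5])))  # [1, 9, 25]
```

Each block only declares what to do when data arrives; waiting on the gathered tasks plays the part of waiting for the terminal block to complete.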

Let us take an example of the summation form to compute the mean.
For example, if we take 3, 1, 5, 7, 9, the mean is 25/5 = 5.
If we were to add 10, 15, 17, 19, 24, whose sum is 85, the mean would now become (25 + 85) / 10 = 11.
We could also arrive at this by computing the mean of the new data, 85/5 = 17,
and averaging the two means, (5 + 17) / 2 = 11, since the two ranges are of equal size; otherwise we weight each mean by its element count.
In this case we are reusing the previous computation of the data and we don't have to revisit that data again. 

Saturday, November 14, 2015

Let us look at applying the GLMNet algorithm to big data using MapReduce, in continuation of our previous discussions.
This is one of the popular algorithms for Machine Learning. As with most regression analysis, more data is better. But this algorithm can work even when we have less data, because it gives methods to 1) determine if we are overfitting and 2) hedge our answers if we are overfitting.
With ordinary least squares regression, the objective is to minimize the sum of squared errors between the actual values and our linear approximation. Coefficient shrinkage methods add a penalty on the coefficients.
This yields a family of regression solutions, from completely insensitive to the input data to unconstrained ordinary least squares.
First we build a regularized regression. We want to eliminate over-fitting, because a model tuned too closely to its training data performs worse on held-out data; controlling it gives the best performance on held-out data.
In this step, in order to avoid over-fitting, we have to exert control over the degrees of freedom in the regression. We cut back on attributes when we make a subset selection, and we penalize regression coefficients with coefficient shrinkage methods such as ridge regression and lasso regression.
With coefficient shrinkage, different penalties give solutions with different properties.
With more than one attribute, the coefficient penalty function has important effects on the solutions. Some choices of coefficient penalty give control over the sparseness of the solutions. Two types of coefficient penalty are most frequently used: the sum of squared coefficients and the sum of absolute values of the coefficients. The GLMNet algorithm incorporates the ElasticNet penalty, which is a weighted combination of the sum of squares and the sum of absolute values.
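The weighted combination can be written out in a few lines of Python. This sketch follows the parameterization used by the glmnet package, lambda * ((1 - alpha) / 2 * sum of squares + alpha * sum of absolute values); the function name and the sample coefficients are only illustrative.

```python
def elastic_net_penalty(coefficients, lam, alpha):
    """ElasticNet penalty: alpha = 1 gives lasso, alpha = 0 gives ridge."""
    sum_abs = sum(abs(b) for b in coefficients)
    sum_squares = sum(b * b for b in coefficients)
    return lam * (alpha * sum_abs + (1.0 - alpha) / 2.0 * sum_squares)


beta = [1.0, -2.0, 0.0]
print(elastic_net_penalty(beta, lam=0.5, alpha=1.0))  # lasso only: 1.5
print(elastic_net_penalty(beta, lam=0.5, alpha=0.0))  # ridge only: 1.25
print(elastic_net_penalty(beta, lam=2.0, alpha=0.5))  # an even mix: 5.5
```

Sweeping lam from large to small is what produces the family of solutions mentioned above, from insensitive to the data to ordinary least squares.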
Courtesy : Michael Bowles on applying GLMNet to MapReduce and PBWorks for GLMNet.
What I find interesting in these discussions is the use of coefficient shrinkage, a somewhat complicating factor, which seems to improve an otherwise simple technique of regression. Similarly, the 'summation form' lends itself to MapReduce because different mappers can compute the sums locally for their groups and the reducer can aggregate them, which is inherently simple. However, we could achieve even more sophisticated MapReduce beyond the summation form and incorporate more algorithms based on other techniques of parallelization. Such techniques include not only partitioning data and computations but also time-slicing. One such example is to parallelize iterations. Currently only the reducer iterates, based on the aggregation of the previous iteration's variables from the mappers, but this could be pushed down to the mappers, and the reducer could then aggregate not just by sum but by other functions such as max, min or count.
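As a small illustration of a reducer that aggregates by functions other than sum, here is a hedged Python sketch. The AGGREGATORS table and the map_reduce helper are hypothetical names, and the sketch assumes every chunk is non-empty so that max and min are defined locally.

```python
from functools import reduce

# For each aggregate: (what a mapper computes locally, how the reducer combines).
AGGREGATORS = {
    "sum": (sum, lambda a, b: a + b),
    "max": (max, max),
    "min": (min, min),
    "count": (len, lambda a, b: a + b),
}


def map_reduce(chunks, name):
    local, combine = AGGREGATORS[name]
    partials = [local(chunk) for chunk in chunks]  # one partial per mapper
    return reduce(combine, partials)               # the reducer's aggregation


chunks = [[3, 1, 5], [7, 9], [10, 15, 17, 19, 24]]
for name in ("sum", "max", "min", "count"):
    print(name, map_reduce(chunks, name))
```

All four aggregates work because each combine step is associative, which is the same property the summation form relies on.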

Friday, November 13, 2015

Extending NoSQL Databases with User Defined document types and key values
The structure and operations on the NoSQL databases facilitate querying against values. For example, if we have documents in XML, we can translate them to JSON and store as documents or key-values in the NoSQL databases. Then a query might look like the following:
db.inventory.find( { type: "snacks" } )
This will return all the documents where the type field is “snacks”.  The corresponding map-reduce function may look like this:
db.inventory.mapReduce(
       function() { emit(this.id, this.calories); },          // map
       function(key, values) { return Array.sum(values); },   // reduce
       {
           query: { type: "snacks" },                         // query
           out: "snack_calories"                              // output
       }
)
This works well for JSON data types and values. However, we are not restricted to the built-in types. We can extend the key values with user defined types and values. They will just be marked differently from the built-in types. When the mapper encounters data like this, it loads the associated code to interpret the user types and values. The code applies the same query operators, such as equality and comparison, against values that the mapper would have applied had the data been in native JSON format. This delegation of interpretation and execution allows the NoSQL databases to be extended in forms such as computed keys and computed values.
Let us take the above example where the calories have to be computed from ingredients.
In this case, the code would look like the following
function (ingredients, calories) {
    var total_calories = 0;
    // calories[index] holds the calories of the ingredient at the same index
    ingredients.forEach(function (ingredient, index) {
        total_calories += calories[index];
    });
    return total_calories;
}
While this logic for computed key-values can be written outside the database as map-reduce jobs, it can also stay close to the data it operates on and consequently be stored in the database.
Moreover logic can be expressed with different runtimes and each runtime can be loaded and unloaded to execute the logic.
One advantage of having a schema for some of the data is that it allows the seamless use of structured queries on that specific data. As an example, we can even use XML data itself, given the XPath queries that can be run on it. Although we will load an XML parsing runtime for this data, it will behave the same as other data types for the overall Map-Reduce.
Another example of user defined datatype is tuples. Tuples are easier to understand both in terms of representation and search. Let us use an example here:
We have a tuple called ‘Alias’ for data about a person. This tuple consists of (known_alias, use_always, alternate_alias). The first part is text, the second Boolean and the third is a map<text, text>
The person data consists of id, name, friends and status.
We could still insert data into the person using JSON as follows:
[{"id":"1","name":{"firstname":"Berenguer", "surname": "Blasi", "alias_data":{"known_alias":"Bereng", "use_alias_always":true}}, "friends":[{"firstname":"Sergio", "surname": "Bossa"}, {"firstname":"Maciej", "surname": "Zasada"}]}]
However, when we search we can use the fields of the type explicitly, just as natively as those of the JSON.
There are standard query operators of where, select, join, intersect, distinct, contains, SequenceEqual that we can apply to these tuples.
The reason tuples become easier to understand is that each field can be dot notation qualified and the entire data can be exploded into their individual fields with this notation as follows:
<fields>
<field indexed="true" multiValued="false" name="id" stored="true" type="StrField"/>
<field indexed="true" multiValued="true" name="friends" stored="true" type="TupleField"/>
<field indexed="true" multiValued="false" name="friends.firstname" stored="true" type="TextField"/>
<field indexed="true" multiValued="false" name="friends.surname" stored="true" type="TextField"/>
<field indexed="true" multiValued="false" name="friends.alias_data" stored="true" type="TupleField"/>
<field indexed="true" multiValued="false" name="friends.alias_data.known_alias" stored="true" type="TextField"/>
<field indexed="true" multiValued="false" name="friends.alias_data.use_alias_always" stored="true" type="BoolField"/>
:
:
</fields>
The above example is taken from Datastax and it serves to highlight the seamless integration of tuples or User Defined types in NoSql databases.

In addition, tuples/UDTs are read and written as one single block, not on a per-field basis. Tuples/UDTs can also participate in a map-like data model, although they are not exactly map values. For example, a collection of tuples/UDTs has a type field that represents what would have been the map key. We just have to declare a UDT that includes tuples as well, and for this UDT we specify the search the same way as in a map-like data model, but using dot notation for the fields. For example, we can search with {!UDT}alias.type:Bereng AND alias.use_alias_always:True

#coding question
Determine the maximum gradient in a sequence of sorted numbers
int GetMaxGradient(int[] numbers)
{
    int max = 0;
    // the gradient between neighbors is the absolute difference of adjacent elements
    for (int i = 1; i < numbers.Length; i++)
    {
        int grad = Math.Abs(numbers[i] - numbers[i - 1]);
        if (grad > max) max = grad;
    }
    return max;
}

Tuesday, November 10, 2015

Let us look at a few examples of applying the summation form to a few algorithms that were chosen because of their popularity at NIPS. The summation form is one that is suitable for MapReduce jobs.
1) Locally Weighted Linear Regression (LWLR): This is solved through the normal equations A.theta = b, where A is a matrix formed by summing components. Each component is a weighted product of a training vector and its transpose. The right-hand side b of the normal equations is also a summation of components, each a weighted product of a training vector and its training label. Since there are two different kinds of summation, one set of mappers computes one kind and another set of mappers computes the other. Two reducers respectively sum up the partial values for A and b, and the algorithm finally computes the solution as A-inverse times b. If the weights are ignored, this problem reduces to the case of ordinary least squares.
2) Naive Bayes: Here we have to estimate conditional probabilities from the training data: the probability that a feature takes the value k given that the label occurs, the same given that the label does not occur, and the probability of the label itself. To do so on MapReduce, a summation is done over the training data, counting how often the feature takes the value k for each label. Therefore we need different kinds of mappers: one to count the subgroup where the feature is k and the label does not occur, another to count the subgroup where the feature is k and the label occurs, another to count all samples where the label does not occur and another to count all samples where the label does occur. Notice that these four categories resemble the quantities used in the definitions of precision and recall. The reducer then sums up the intermediate results to get the final values of the parameters.
3) Gaussian Discriminative Analysis (GDA): The classic GDA algorithm needs to learn four statistics: the probability that a label occurs, the mean of the data with the label, the mean of the data without the label, and the covariance, which is a summation over the data of its deviations from the mean of its label. This method models each class/label with a normal distribution, usually represented by the mean and standard deviation of the class. For each label we compute the probability that the data belongs to it, and the class with the highest probability is chosen. Consequently, the mappers are different for each of these subgroups. Finally, one reducer aggregates the intermediate sums and calculates the final result for the parameters.
4) K-means: Here we know how many clusters we want. Therefore, we can divide the dataset into that many subgroups. Each mapper runs through its local data, determines the closest centroid for each point and accumulates the vector sum of the points closest to each centroid. These are emitted as a key and a tuple of the partial sum of the set and the set index. The reducer aggregates the sums from all mappers and calculates the new centroids.
5) Logistic Regression (LR): In this algorithm, the hypothesis that lends itself to MapReduce is 1 / (1 + exp(-theta-transposed times x)). Learning is done by fitting theta to the training data so that the likelihood function is maximized. In each iteration of the algorithm, theta is improved. This is done with multiple mappers that sum up subgroups of the partitioned data. The iterations also take the theta computed in the previous iteration and use it in the current one. The reducer sums up the values for the gradient and the Hessian to perform the update for theta.
6) Neural Network: This uses a strategy called backpropagation. Each mapper propagates its set of data through the network. A three-layer network is used for this purpose, with two output neurons classifying the data into two categories. The error is backpropagated to calculate the partial gradient for each of the weights in the network. The reducer then sums the partial gradients from each mapper and does a batch gradient descent to update the weights of the network.
7) Principal Component Analysis (PCA): This algorithm computes the principal eigenvectors of the covariance matrix, which is expressed as 1/m times the summation of each data vector times its transpose, minus a second term that is the mean times the mean transposed. The latter term can also be expressed in summation form. Therefore the covariance matrix is entirely in summation form. The sums can be mapped to different cores and the reducer can aggregate them.
8) Independent Component Analysis (ICA): This is a method where the observed data is assumed to be a linear transformation of the source data, and the method tries to identify the independent source vectors. So the goal is to find the unmixing matrix W. Since the transformations are linear, we can use batch gradient descent to optimize the likelihood of W. Each mapper therefore independently calculates the gradient contribution for its subgroup, and these are aggregated in the reducer to update W.
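To show how one of these summation forms might look in code, here is a hedged Python sketch of a single K-means iteration split into a map step and a reduce step. The function names kmeans_map and kmeans_reduce, the sample points and the starting centroids are assumptions made for the example; a real job would run the mappers on separate cores or machines.

```python
def kmeans_map(points, centroids):
    """Mapper: per centroid, accumulate the vector sum and count of its closest points."""
    partial = {}
    for point in points:
        closest = min(
            range(len(centroids)),
            key=lambda c: sum((a - b) ** 2 for a, b in zip(point, centroids[c])),
        )
        vec_sum, count = partial.get(closest, ([0.0] * len(point), 0))
        partial[closest] = ([s + a for s, a in zip(vec_sum, point)], count + 1)
    return partial


def kmeans_reduce(partials, centroids):
    """Reducer: add up the partial sums and divide to get the new centroids."""
    totals = {}
    for partial in partials:
        for idx, (vec_sum, count) in partial.items():
            acc_sum, acc_count = totals.get(idx, ([0.0] * len(vec_sum), 0))
            totals[idx] = ([s + a for s, a in zip(acc_sum, vec_sum)], acc_count + count)
    new_centroids = []
    for idx, old in enumerate(centroids):
        if idx in totals:
            vec_sum, count = totals[idx]
            new_centroids.append([s / count for s in vec_sum])
        else:
            new_centroids.append(list(old))  # no points assigned: keep the old centroid
    return new_centroids


chunks = [[(1, 1), (2, 2), (9, 9)], [(8, 8), (10, 10), (0, 0)]]
centroids = [[0, 0], [10, 10]]
partials = [kmeans_map(chunk, centroids) for chunk in chunks]
print(kmeans_reduce(partials, centroids))  # [[1.0, 1.0], [9.0, 9.0]]
```

The mappers never exchange points, only per-centroid sums and counts, which is what makes the step a summation form.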
#coding exercise
How to know which side of a sorted array has a steeper gradient
void PrintSideWithSteeperGradient(int[] numbers)
{
   if (numbers.Length <= 0) {Console.WriteLine("None"); return;}
   if (numbers.Length == 1) {Console.WriteLine("Both"); return;}
   int leftstart = 0;
   int leftend = numbers.Length/2-1;
   // for an odd length both halves share the middle element
   if (numbers.Length %2 != 0) { leftend = numbers.Length/2;}
   int rightstart = numbers.Length/2;
   int rightend = numbers.Length -1;
   double leftgradient = 0;
   double rightgradient = 0;
   if (leftstart == leftend) leftgradient = numbers[leftstart];
   if (rightstart == rightend) rightgradient = numbers[rightend];
   // cast to double so that the division is not truncated to an integer
   if (leftstart < leftend) leftgradient = (double)(numbers[leftend] - numbers[leftstart])/(leftend-leftstart);
   if (rightstart < rightend) rightgradient = (double)(numbers[rightend] - numbers[rightstart])/(rightend-rightstart);
   if (leftgradient == rightgradient) Console.WriteLine("Both");
   else if (leftgradient > rightgradient) Console.WriteLine("Left");
   else Console.WriteLine("Right");
}


We return to the discussion of applying machine learning algorithms with MapReduce.

In expectation maximization, the expected pseudo count is calculated and used to update the probability, the mean and the summation from the training data. The probability is computed on subgroups by having each mapper compute the expected pseudo count for its subgroup and the reducer aggregate these and divide by m. To calculate the mean, each mapper computes the partial sum of the expected pseudo count times the training vector, as well as the sum of the pseudo counts. The reducer then aggregates the partial sums and divides them. For the summation, each mapper calculates the subgroup sum of the expected pseudo count times the difference of the training vector from the mean times this difference transposed. As for the mean, the sum of expected pseudo counts for the local subgroup is also turned in by the mapper. The reducer then sums up the partial results and divides them.

In Support Vector Machines, the primary goal of the linear SVM is to minimize the squared norm of the weight vector plus a constant times the slack variables, for either hinge loss or quadratic loss, subject to the label times the displaced projection of the training vector on the weight vector being at least 1 minus the slack variable. The gradient and the Hessian of this objective are usually written in summation form. The mappers calculate the partial gradients and the reducer sums up the partial results to update the weight vector.

Monday, November 9, 2015

Today we start reviewing the paper Map-Reduce for Machine Learning on Multicore by Chu et al. In this paper they take advantage of Map-Reduce methods to scale Machine Learning algorithms. As we read in Bowles' discussion, summation is an integral part of Statistical Query models, and this paper too talks about taking advantage of summation forms that can be computed on multiple CPUs using Map-Reduce methods. They show how this can be done for a variety of learning algorithms such as locally weighted linear regression, k-means, logistic regression, naive Bayes, principal component analysis, independent component analysis, Expectation Maximization, Support Vector Machine etc. Most of these algorithms involve a summation. When an algorithm computes a sum over the data, the calculation can be distributed over multiple cores. The data is divided into as many pieces as there are cores. Each CPU computes a partial result over its local data. The results are then aggregated. In a cluster framework, a master breaks down the data for several mappers and there is one reducer that aggregates the results. This is therefore horizontally scalable to arbitrary data sizes. Some mapper and reducer functions require additional scalar information from the algorithms. In order to support these operations, the mapper or reducer can get this information from the query_info interface. In addition, this can be customized for each algorithm. Moreover, if some algorithms require feedback from previous operations, the cycle of map-reduce can be repeated over and over again.
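The divide, map and aggregate cycle described above can be sketched in Python as follows. This is a simplified illustration, not the framework from the paper: the function names are hypothetical, a sum of squares stands in for an algorithm's summation, and threads stand in for cores (in CPython a ProcessPoolExecutor would be the choice for CPU-bound work).

```python
from concurrent.futures import ThreadPoolExecutor


def split(data, pieces):
    """Divide the data into roughly as many chunks as there are workers."""
    size = max(1, (len(data) + pieces - 1) // pieces)
    return [data[i:i + size] for i in range(0, len(data), size)]


def mapper(chunk):
    """Partial result over local data: here, a sum of squares."""
    return sum(x * x for x in chunk)


def reducer(partials):
    """Aggregate the partial results from all mappers."""
    return sum(partials)


def map_reduce_sum_of_squares(data, workers=4):
    chunks = split(data, workers)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(mapper, chunks))
    return reducer(partials)


print(map_reduce_sum_of_squares(list(range(1, 11))))  # 385
```

An algorithm that needs feedback from a previous pass would simply call map_reduce_sum_of_squares, or its equivalent, once per iteration.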
#codingexercise
Given a dictionary with a limited number of words, check whether the string given to you is a composite of two words that are already present in the dictionary.

bool IsComposite(List<string> dictionary, string word)
{
// try every split point that leaves both parts non-empty
for (int i=1; i<word.Length; i++)
{
     var word1 = word.Substring(0, i);
     var word2 = word.Substring(i);
     if (dictionary.Contains(word1) && dictionary.Contains(word2))
     {
         return true;
      }
}
return false;
}