January 27, 2014

Parallel Computing and Columnar Storage in One Programming Language

Columnar storage works well, especially when a table has many fields (which is quite common). A query then traverses far less data than it would on row storage. Less data to traverse means a lighter I/O workload and faster queries. Without columnar storage, however, a Hadoop application spends most of its time on hard disk I/O.

Both Hive and Impala support columnar storage, but only through the basic SQL interface. For more complex data computing, it seems quite difficult to use columnar storage within the MapReduce framework.

The sample data is a big data file, sales.txt, on HDFS. The file has twenty fields and 100 million records, and its size is 14 GB. Let’s take grouping as an example to demonstrate the whole procedure: compute the total sales amount of each salesperson. The algorithm involves two fields, empID and amount.

First, look at the classic code for grouping and summarizing on row storage:


Code for summary machine:

main node












Code for node machine:

sub node







The algorithm above follows this train of thought: split a large task into forty smaller tasks according to the file's byte count; distribute them to the node machines for the initial summarization; and then summarize a second time on the summary machine. This train of thought is similar to that of MapReduce. The difference is that this pattern is simple and intuitive because the task scale can be customized, and most people can understand it easily.

As can be seen, row storage stores all the fields of a record together in one file. Therefore, whether the query uses two fields or twenty, we still have to traverse all the data, that is, the whole 14 GB file. Columnar storage is different: the data of each field is stored as a separate file. If only two fields are used in the query, you only need to read the files of those two fields, i.e. about 1.4 GB. Please note that 1.4 GB is an average value; the total volume of the data of these two fields is slightly higher than 1.4 GB.
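The 1.4 GB figure is simple proportional arithmetic. Here is a minimal Python sketch of that estimate, assuming all twenty fields are roughly the same width (the function name is hypothetical and only for illustration):

```python
def estimated_scan_gb(total_gb: float, total_fields: int, fields_used: int) -> float:
    """Estimate the data read under columnar storage, assuming equal-width fields."""
    return total_gb * fields_used / total_fields


row_scan_gb = 14.0                                # row storage: the whole file is traversed
col_scan_gb = estimated_scan_gb(14.0, 20, 2)      # columnar storage: only 2 of 20 field files
print(row_scan_gb, col_scan_gb)
```

Real fields differ in width, which is why the actual volume for empID and amount can deviate from this average.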

As mentioned above, to implement columnar storage we must first decompose the file field by field, using f.cursor() to retrieve the data and then f.export() to export it. The detailed procedure is explained at the end of this article.

Code for Grouping and summarizing once Columnar Storage is adopted:

Code for summary machine:

main node









Code for node machine:

sub node









As can be seen, the greatest change is in the code for the node machine. The A3 cell uses a [file list]: cursor() merges a group of field files to generate a cursor over a two-dimensional table. The remaining code for grouping and summarizing is the same as before.

Another change concerns task scheduling, and this stage can cause great trouble. As we know, on row storage tasks can be scheduled by the number of bytes. For example, a 14 GB file can be divided evenly into 40 segments, and each segment is assigned to a node machine as a task. But this method does not work on columnar storage: the records become misaligned across the field files, and the result will always be wrong. The right method is to divide evenly by the number of records (if you have any suggestions on dividing it evenly, please leave a comment. Thanks.). 100 million records can be allocated to 40 segments, each holding 2,500,000 records.

For example, the empID column will ultimately be divided into 40 smaller files: "empID1.dat", "empID2.dat", ..., "empID40.dat". The algorithm code is shown below:









To prevent memory overflow, the above algorithm retrieves 100,000 records from the cursor at a time and appends them to the current file. In this way, a new file is created for every 2,500,000 records. Here, A3 is the file count, and C3 is used to check whether the number of records in the current file has reached 2.5 million.
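The splitting procedure described above can be sketched in Python. This is only an illustration of the idea, not the esProc code: the function name split_column and its parameters are hypothetical, and the demo uses small numbers instead of 2,500,000 records per file and 100,000 per batch.

```python
import os
import tempfile
from itertools import islice


def split_column(values, out_dir, prefix, records_per_file, batch_size):
    """Write values into prefix1.dat, prefix2.dat, ... with records_per_file values each."""
    if records_per_file % batch_size != 0:
        raise ValueError("batch_size must divide records_per_file")
    it = iter(values)
    file_no = 1      # file counter (the role the text gives to A3)
    written = 0      # records already in the current file (the role of C3)
    paths = []
    while True:
        batch = list(islice(it, batch_size))   # only one batch is held in memory
        if not batch:
            break
        path = os.path.join(out_dir, f"{prefix}{file_no}.dat")
        if written == 0:
            paths.append(path)
        with open(path, "a", encoding="utf-8") as f:
            f.writelines(f"{v}\n" for v in batch)
        written += len(batch)
        if written >= records_per_file:        # current file is full, move to the next
            file_no += 1
            written = 0
    return paths


def count_lines(path):
    with open(path, encoding="utf-8") as f:
        return sum(1 for _ in f)


with tempfile.TemporaryDirectory() as d:
    paths = split_column(range(1000), d, "empID", records_per_file=250, batch_size=50)
    names = [os.path.basename(p) for p in paths]
    counts = [count_lines(p) for p in paths]
```

Because every field file is cut at the same record counts, segment k of empID lines up with segment k of amount, which is exactly what splitting by bytes cannot guarantee.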

Data scheduling is surprisingly troublesome, but columnar computing after splitting is quite simple.

January 16, 2014

Writing Hadoop code in an agile programming language

Hadoop is an outstanding parallel computing system whose default parallel computing mode is MapReduce. However, MapReduce is not designed specifically for parallel data computing. It is not an agile programming language either: its coding efficiency for data computing is relatively low, and it is even harder to write general-purpose algorithms in it.

esProc is an agile programming language for parallel computing; in function, esProc and MapReduce are very similar.
        
Here is an example illustrating how to develop parallel computing in Hadoop with an agile programming language. Take the common grouping algorithm in MapReduce as an example: based on the order data on HDFS, sum up the sales amount of each salesperson and find the top N salespeople. In the example code, the big data file (fileName), the field to group by (groupField), the field to summarize (sumField), the summarizing method (method), and the size of the top-N list (topN) are all parameters. In esProc, the corresponding code is shown below:
        
Agile program language code for summary machine:










Agile program language code for node machine:




        
How do we perform parallel data computing over big data? The most intuitive idea would be: decompose a task into several segments that can be computed in parallel; distribute them to the node machines for initial summarization; and then summarize a second time on the summary machine.
        
From the code above, we can see that esProc divides parallel data computing into two categories: code for the summary machine and code for the node machines. The summary machine is responsible for task scheduling: it distributes tasks to the node machines in the form of parameters, and ultimately consolidates and summarizes the results returned by the node machines. Each node machine gets the segment of the whole data specified by its parameters, and then groups and summarizes the data of this segment.
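The division of labor between the two kinds of machines can be sketched in Python. This is a single-process illustration under my own assumptions: node_task and summary_task are hypothetical names, and the real distribution over the network (done by callx in esProc) is replaced by a plain loop.

```python
from collections import defaultdict


def node_task(rows):
    """Node machine: group one segment by empID and sum the amounts."""
    partial = defaultdict(float)
    for emp_id, amount in rows:
        partial[emp_id] += amount
    return dict(partial)


def summary_task(partials):
    """Summary machine: merge the partial results and sum them a second time."""
    totals = defaultdict(float)
    for partial in partials:
        for emp_id, subtotal in partial.items():
            totals[emp_id] += subtotal
    return dict(totals)


# Two segments stand in for the pieces of the big file.
segments = [
    [("e1", 10.0), ("e2", 5.0)],
    [("e1", 2.5), ("e3", 7.0)],
]
totals = summary_task(node_task(segment) for segment in segments)
print(totals)
```

The second summarization is needed because the same empID can appear in more than one segment, as "e1" does here.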
        
Next, let’s discuss the above code in detail.

Variable definition in parallel computing
        
As can be seen from the code above, esProc code is written in cells. Each cell is identified by a unique combination of row ID and column ID. A cell name serves as a variable and needs no definition, for example, in the summary machine code:
- A2: =40
- A6: =["192.168.1.200:8281","192.168.1.201:8281","192.168.1.202:8281","192.168.1.203:8281"]
A2 and A6 are two variables representing the number of parallel computing tasks and the list of node machines respectively. Other code can reference these variables directly by cell name. For example, A3, A4, and A5 all reference A2, and A7 references A6.
        
Since a variable is simply a cell name, references between cells are intuitive and convenient. This allows a large goal to be decomposed into several simple computing steps, with the ultimate goal achieved by having each step build on the previous ones. In the above code, A8 references A7, A9 references A8, and A10 references A9. Each step solves a small problem, and step by step the goal of this example is achieved.

External parameter in parallel computing
        
In esProc, a parameter can be used as a normal parameter or as a macro. For example, in the summary machine code, fileName, groupField, sumField, and method are all external parameters:
- A1: =file(fileName).size()
- A7: =callx("groupSub.dfx",A5,A4,fileName,groupField,sumField,method;A6)
They have the following meanings:
- fileName, the name of the big data file, for example: "hdfs://192.168.1.10/sales.txt"
- groupField, the field to group by, for example: empID
- sumField, the field to summarize, for example: amount
- method, the summarizing method, for example: sum, min, max, etc.
If a parameter is enclosed in ${}, it is used as a macro, for example, in this piece of summary machine code:
- A8: =A7.merge(${groupField})
- A9: =A8.groups@o(${groupField};${method}(Amount):sumAmount)
In this case, esProc interprets the macro as code to execute instead of as a normal parameter. After expansion, the code becomes:
- A8: =A7.merge(empID)
- A9: =A8.groups@o(empID;sum(Amount):sumAmount)
        
A macro is one form of dynamic language. Compared with a parameter, a macro can be used directly as code in data computing, which is much more flexible and makes reuse very easy.
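The effect of macro expansion can be imitated with Python's standard string.Template, which happens to use the same ${name} placeholder syntax. This is only a sketch of the textual substitution step, not a description of how esProc implements macros internally:

```python
from string import Template

# The A9 formula from the summary machine code, with two macros in it.
formula = Template("=A8.groups@o(${groupField};${method}(Amount):sumAmount)")

# Substituting the parameter values yields the code that is actually executed.
expanded_sum = formula.substitute(groupField="empID", method="sum")
expanded_max = formula.substitute(groupField="empID", method="max")

print(expanded_sum)
print(expanded_max)
```

Changing only the value of method switches the whole formula from summing to taking the maximum, which is the flexibility the macro is meant to provide.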

Two-dimensional table in A10
Why does A10 deserve special discussion? Because A10 is a two-dimensional table, a type that is used frequently in parallel data computing. It has two columns, of string type and float type respectively. Its structure is like this:
     






The use of a two-dimensional table here indicates that esProc supports dynamic data types. In other words, we can organize various types of data into one variable without any extra effort to declare them. Dynamic data types not only save the effort of defining data types but are also convenient because of their expressive power. In using the above two-dimensional table, you may find that dynamic data types make big data parallel computing more convenient.
        
Besides a two-dimensional table, a dynamically typed value can also be an array. For example, with A3: =to(A2), A3 is an array whose value is [1,2,3,...,40]. Needless to say, simple values are also supported; I’ve verified data of date, string, and integer types.
        
Dynamic data types also support nested data structures. For example, the first member of an array can be a simple value, the second member an array, and the third member a two-dimensional table. This makes dynamic data types even more flexible.

Parallel computing functions for big data
In esProc, many functions are designed for big data parallel computing. For example, A3 in the above code, =to(A2), generates the array [1,2,3,...,40].
        
With this array, you can compute over each of its members directly, without loop statements. For example, A4: =A3.(long(~*A1/A2)). In this formula, the current member of A3 (represented by “~”) is multiplied by A1 and then divided by A2. Suppose A1=20000000; then the result of A4 would be: [500000, 1000000, 1500000, 2000000, ..., 20000000]
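The same computation can be written as a Python list comprehension, which makes the numbers easy to check. The function name segment_ends is hypothetical; integer floor division stands in for long(), which is equivalent here because all values are positive.

```python
from typing import List


def segment_ends(file_size: int, task_count: int) -> List[int]:
    """End position of each segment: member * file_size / task_count, as an integer."""
    return [member * file_size // task_count for member in range(1, task_count + 1)]


ends = segment_ends(20_000_000, 40)
print(ends[:4], ends[-1])
```

With A1=20000000 and A2=40, each segment is 500,000 bytes long, so the first end position is 500000 and the last is the file size itself.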
        
The official name for such a function is loop function. It is designed to make the language more agile by reducing loop statements.
        
Loop functions can be used in all kinds of big data parallel computing; even two-dimensional tables from a database are acceptable. For example, A8, A9, and A10 are loop functions acting on a two-dimensional table:
- A8: =A7.merge(${groupField})
- A9: =A8.groups@o(${groupField};${method}(Amount):sumAmount)
- A10: =A9.sort(sumAmount:-1).select(#<=10)

Parameters in the loop function
Look at the code in A10: =A9.sort(sumAmount:-1).select(#<=10)
        
sort(sumAmount:-1) sorts the two-dimensional table of A9 in descending order by the sumAmount field. select(#<=10) filters the sorted result and keeps the records whose sequence numbers (represented by #) are not greater than 10.
        
The parameters of these two functions are not fixed values but computations: they can be formulas or functions. A parameter used in this way is called a parameter formula.
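Readers more familiar with Python can think of a parameter formula as something like a lambda or a comprehension condition passed to a function. Here is a rough analogue of the A10 step, using made-up sample data; it illustrates the idea and is not a translation of esProc semantics:

```python
# Made-up stand-in for the two-dimensional table in A9: 15 salespeople.
rows = [{"empID": f"e{i}", "sumAmount": float(i * 100)} for i in range(1, 16)]

# sort(sumAmount:-1): the sort key is an expression evaluated per record.
ranked = sorted(rows, key=lambda record: record["sumAmount"], reverse=True)

# select(#<=10): the condition refers to the record's sequence number.
top10 = [record for position, record in enumerate(ranked, start=1) if position <= 10]

print([record["empID"] for record in top10])
```

In both languages the caller passes a small piece of logic rather than a constant, which is what removes the need for an explicit loop.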
        
As can be seen here, the parameter formula is another piece of agile syntax. It makes the use of parameters more flexible and function calls more convenient, and it greatly reduces the coding workload.

From the above example, we can see that esProc can be used to write Hadoop parallel computing in an agile programming language. By doing so, the code maintenance cost is greatly reduced, code reuse and data migration become more convenient, and the parallel computing mechanism brings better performance.

Official web: http://www.raqsoft.com/

January 8, 2014

Hadoop code reuse: simplifying business computing step by step

The MapReduce of Hadoop is a widely used parallel computing framework. However, its code reuse mechanism is inconvenient, and passing parameters is quite cumbersome. Far from the usual experience of simply calling a library function, I found that both the coder and the caller must keep a sizable number of precautions in mind when writing even a short piece of program to be called by others.

However, we finally found that esProc can easily realize code reuse in Hadoop. Taking a simple and understandable example of grouping and summarizing again, let’s first check out a solution with poor reusability. Suppose we need to group the order data (sales.txt) on HDFS by salesperson (empID) and compute the total sales amount of each salesperson. The esProc code is:

Code for summary machine:








Code for node machine:





esProc classifies distributed computing code into two categories: code for the summary machine and code for the node machines. The summary machine is responsible for task scheduling: it distributes tasks to the node machines in the form of parameters, and finally integrates and summarizes the computing results from the node machines. Each node machine gets the segment of the whole data specified by its parameters, and then groups and summarizes the data of this segment.

As can be seen, esProc code is intuitive and straightforward, just like the natural way of thinking. The summary machine divides a task into several segments, distributes them to the node machines for initial summarization, and then summarizes a second time on the summary machine. Another thing to note is the esProc grouping and summarizing function “groups”, which groups the two-dimensional table A1 by empID and sums up the values of the amount field. The result is given the readable name totalAmount. The whole procedure of grouping and summarizing is concise and intuitive: A1.groups(empID;sum(amount):totalAmount)

In addition, the groups function can be applied not only to a small 2D table but also to a 2D table too large to be held in memory. For example, the cursor mode is adopted in the above code.

But the above example has an obvious defect: the code is not very reusable. In the following steps, we rewrite it into a universal algorithm independent of any concrete business. The rewritten code controls the flow with parameters so that it can summarize any data file. The task can be split into an arbitrary number of segments, and the computing nodes can be specified at will. The revised code is shown below:

Code for summary machine. Four parameters are defined here: fileName, the big data file to analyze; taskNumber, the number of tasks to distribute; groupField, the field to group by; sumField, the field to summarize. In addition, the list of node machines is obtained by reading the profiles.








Code for node machine. In the revised code, four variables receive the parameters from the summary machine. Besides the starting and ending positions in the file (start and end) from the first example, there are two new ones: groupField, the field to group by, and sumField, the field to summarize.
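To make the role of the four parameters concrete, here is a Python sketch of what a parameterized node task might look like. Everything in it is my own assumption for illustration: the function name group_segment, the explicit header list, and in particular the rule that a line belongs to the segment in which its first byte falls. The source does not say how esProc aligns byte segments to line boundaries.

```python
import os
import tempfile
from collections import defaultdict


def group_segment(path, start, end, group_field, sum_field, header, sep=","):
    """Group and sum the lines whose first byte lies in [start, end)."""
    group_idx = header.index(group_field)
    sum_idx = header.index(sum_field)
    totals = defaultdict(float)
    with open(path, "rb") as f:
        if start > 0:
            # Step back one byte and discard through the next newline, so a line
            # that began before `start` is left to the previous segment.
            f.seek(start - 1)
            f.readline()
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            cols = line.decode("utf-8").rstrip("\r\n").split(sep)
            totals[cols[group_idx]] += float(cols[sum_idx])
    return dict(totals)


header = ["empID", "amount"]
with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "sales.txt")
    with open(path, "wb") as f:
        f.write(b"e1,10\ne2,5\ne1,2.5\ne3,7\n")
    size = os.path.getsize(path)
    # Byte 12 falls in the middle of the third line, which goes to the first segment.
    first = group_segment(path, 0, 12, "empID", "amount", header)
    second = group_segment(path, 12, size, "empID", "amount", header)
```

Because the field names arrive as parameters, the same function can summarize any delimited file, which is the point of the revised code.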





In esProc, it is much easier to pass and use parameters, so users can implement general-purpose grouping and summarizing with minimal modification and reuse the code easily.

In Hadoop, complicated business algorithms are mainly implemented by writing MapReduce classes. By comparison, passing and using parameters in MapReduce is much less flexible. Though it is possible to implement a flexible algorithm independent of the concrete business, it is really cumbersome. Judging from Hadoop code, the code is tightly coupled with the business. To pass parameters, a global-variable-like mechanism is required, which is not only inconvenient but also hard to understand. That’s why so many questions about MapReduce parameter passing appear on Web pages, and many people feel confused about developing universal algorithms with MapReduce.

In addition, the default separator in the above code is the comma. Users only need to add a parameter in a similar way to change it to any other commonly used symbol. In the same way, they can also easily implement the common action of filtering the data before grouping and summarizing. Please note the usage of the parameter groupField: it is used as a string parameter in cell A6 but as a macro in A8. In other words, ${groupField} is resolved as part of the formula itself, instead of as a parameter value within the formula. This is how a dynamic language works. Therefore, esProc can produce completely flexible code, for example, using a parameter to make the summary algorithm compute a sum, a count, an average, or a maximum.

“Macro” is a simple special case of dynamic language. esProc supports a more flexible and complete dynamic language system.

As you may find from the above example, esProc can implement Hadoop code reuse easily and basically achieve the goal of “Write once, run anywhere!”. Needless to say, development efficiency can be boosted dramatically.