December 17, 2013

How Hadoop can realize in-memory computing with esProc

The low efficiency of Hadoop computation is an undeniable fact. We believe one of the major reasons is that MapReduce, Hadoop's underlying computational framework, is essentially external-memory computation: data exchange is implemented through frequent reads and writes of external storage. Because file I/O is two orders of magnitude slower than memory access, the computational performance of Hadoop is unlikely to be high.

Normal users, however, usually have a small cluster of only a few dozen nodes. Such a cluster environment is relatively reliable, and the probability of faults is very low. Moreover, most real-time computations complete quite quickly in each run, so users can simply recompute on error rather than build fault tolerance into the computation itself. In this case, a parallel computing scheme like esProc, which supports both in-memory and external-memory computation, is a better choice. esProc also runs on Hadoop, and its in-memory computing gives small and medium-scale clusters much higher performance.

In the example below, we will use a typical grouping computation to illustrate how esProc implements in-memory computation on Hadoop. The goal is to summarize the sales amount in the order list by place of origin. The data come from two files on HDFS: sales.txt holds a great volume of order data, with the major fields orderID, product (product ID), and amount (order value); product.txt is much smaller, with the main fields proID (product ID) and origin (product origin).

The intuitive solution is this: on the summary machine, break sales.txt into several sections, one section per task, and allocate the tasks to the node machines for group summarization. Each node machine joins its section of sales.txt with product.txt, groups by origin, and returns its result to the summary machine, which groups and summarizes a second time.

esProc code is shown below:
Code1: Task decomposing and summarizing (summary machine)
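The esProc script itself is not reproduced here. As a rough illustration only, the following Python sketch shows the logic this step performs: break the order data into fixed-size tasks, run each task (sequentially here, standing in for dispatch to node machines), and merge the per-task group results in a second summary round. All names are illustrative, not esProc API.

```python
from collections import defaultdict

def run_summary(sales_rows, chunk_size, node_summarize):
    """Split sales_rows into tasks, run each on a 'node', merge the partials.

    node_summarize stands in for the work done remotely on a node machine:
    it takes one section of rows and returns a {origin: amount} dict.
    """
    partials = []
    for start in range(0, len(sales_rows), chunk_size):
        task = sales_rows[start:start + chunk_size]
        partials.append(node_summarize(task))   # stands in for remote dispatch

    totals = defaultdict(float)                 # second-round group and sum
    for partial in partials:
        for origin, amount in partial.items():
            totals[origin] += amount
    return dict(totals)
```

The chunk_size argument is the adjustable decomposition granularity discussed later in the article.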

Code2: Generate a global variable for the product table (node machine)
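Again as a hedged sketch rather than the actual esProc script, the idea of this step in Python: load the two needed fields of the product table into a node-level global exactly once, so every later task on the node reuses it instead of re-reading the file. The variable and function names are illustrative.

```python
# Node-level global: loaded once, then shared by every task on this node.
_PRODUCT_ORIGIN = None

def load_product_table(product_rows):
    """Read (proID, origin) pairs into memory once; later calls reuse the global."""
    global _PRODUCT_ORIGIN
    if _PRODUCT_ORIGIN is None:
        _PRODUCT_ORIGIN = {pro_id: origin for pro_id, origin in product_rows}
    return _PRODUCT_ORIGIN
```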

Code3: Associative computation and summarization by place of origin (node machine)
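A Python sketch of what this node-side step does, assuming the in-memory product table from the previous step: stream the order data chunk by chunk (cursor-style, since the order table is too large for memory), join each row to the product table, and sum amounts grouped by origin. Names and the chunked-iterator shape are illustrative.

```python
def summarize_task(sales_cursor, product_origin):
    """Stream order chunks cursor-style, join each row to the in-memory
    product table, and sum amounts grouped by place of origin."""
    sums = {}
    for chunk in sales_cursor:                   # cursor yields one chunk at a time
        for order_id, pro_id, amount in chunk:
            origin = product_origin.get(pro_id)  # in-memory foreign-key lookup
            if origin is not None:               # skip products with unknown origin
                sums[origin] = sums.get(origin, 0.0) + amount
    return sums
```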

As can be seen, esProc code follows the intuitive train of thought for the computation, and each step is implemented concisely and smoothly. Most importantly, esProc has a simple structure with nothing hidden: the actual computation is carried out step by step, strictly following the code. To optimize, users can easily modify the code at any step, for example to change the granularity of task decomposition or to specify which node machines compute.
Below, we will discuss four aspects: task decomposition, data exchange, in-memory computation, and memory sharing.

Task decomposition:
As can be seen from Code1, sales.txt is decomposed into 40 tasks according to the computational capability of the node machines, with about 1 million rows per task. With esProc, the decomposition granularity can be customized to the practical computing environment. For example, if the node machines are high-performance PC servers, 100 million rows can be processed in 10 shares of 10 million rows each; if the nodes are obsolete, outdated notebooks, the same data can be processed in ten thousand shares of ten thousand rows each. The ability to adjust task granularity freely saves scheduling cost and boosts computational performance dramatically.
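The granularity adjustment described above amounts to picking a rows-per-task figure that matches the node machines' capability. A minimal sketch (the function name is illustrative):

```python
def decompose(total_rows, rows_per_task):
    """Return (start, length) spans, one per task, sized to node capability."""
    return [(s, min(rows_per_task, total_rows - s))
            for s in range(0, total_rows, rows_per_task)]

# Powerful PC servers: few big tasks (low scheduling cost per row).
# Weak notebooks: many small tasks (each one fits the machine).
```

Fewer, larger tasks mean less scheduling overhead; smaller tasks fit weaker machines. Adjusting only this one number tunes the trade-off.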
By default, MapReduce decomposes the task to the minimum granularity to address the destabilizing factors of a large-scale cluster environment: each Map task processes one record. Although the infrastructure can be modified to customize granularity, the coding is difficult and not practical. Decomposing data this way makes faults relatively cheap to handle, but the scheduling cost of task decomposition is relatively great.

Data exchange:
In Code3, once the computation is done on the node machine, the result is not written to any file; instead it is exchanged with the summary machine directly. esProc is a scripting language that allows users to strike a balance between security and performance: those who care more about the security of intermediate results can write the data to HDFS and then exchange it, while those who care more about data exchange performance can exchange the data directly.
In MapReduce, data exchange must go through files to ensure the safety of intermediate results: even if a node machine breaks down, completed results will not disappear. In a large cluster environment, where node machines fail easily, this method is justified. However, exchanging data through files causes a great deal of disk I/O, and computational performance declines obviously.

Memory sharing:
Code2 reads the two involved fields of the product table into memory all at once, as a global variable on the node machine. Such in-memory sharing saves the time of retrieving the product table in each task, because every node machine goes through multiple rounds of computation, and each round performs multi-threaded, multi-task computation. The smaller the node scale and the more computational tasks, the more obvious the performance gain.
MapReduce does not implement such memory sharing, because it assumes computational nodes crash frequently in a large cluster environment, and the data in a crashed node's memory is meaningless; in that case it is safer to share through HDFS files directly. Since MapReduce does not support memory sharing, users must retrieve the product table from disk every time before using it, so its efficiency is two orders of magnitude worse.

In-memory computation:
As can be seen from Code3, the product table is a global variable retrieved from memory directly, while the order table is still too large to read into memory, so a cursor is used to access it. In this way, efficient in-memory associative computation is achieved. Needless to say, with finer task decomposition, each section of the order table could also be loaded into memory. esProc thus allows data to be loaded either way: the file-cursor method for large data volumes at slower speed, and in-memory loading for small data volumes at faster speed.
For MapReduce, the default external-memory computation retrieves data from files for associative and grouping computation, which is quite good for handling an unstable large-scale cluster environment. Although MapReduce adopts a memory-buffering technique at the underlying layer, it still relies heavily on disk I/O at the core, so its poor performance is unchanged. To switch to in-memory computation, users would need to change the native infrastructure of MapReduce at great development cost.

Judging from the four aspects above, we can conclude that esProc efficiently implements in-memory computation on Hadoop and is well suited to users with small and medium-scale clusters.