December 30, 2013

Parallel computing in hadoop with esProc instead of MapReduce

Hadoop is an outstanding distributed computing system. However, many users find MapReduce is too complex and too tough to manage, the response is too slow, the realtime is too poor, and the running is inefficient. In a word, it is too heavy. They are users of middle-and-small-scale cluster. They do not need such heavyweight solution. In this case, using esProc to realize the lightweight data parallel computing for Hadoop would be one of the choices.

Let’s demonstrate how esProc realizes the lightweight parallel computing for Hadoop with a typical grouping computing example below. In this example, we summarize and count the order data from a file in HDFS to compute the sales value for each sales person. Because the data volume is too large for a single computer to process, we need to speed up by distributing the computing workload.

As the “most intuitive idea”, the distributed computing procedure is to decompose the big data into several segments. Each node is assigned with a segment of data for interior grouping and summarizing. Then, the summary machine collects and merges the data computing result from each node and carries out the secondary grouping and summarizing. esProc implements this hadoop parallel computing like this:
Code for summary machine: Decomposing tasks and secondary summary.


Code for computing node: Aggregate, group, and summarize in any segment.








If using one figure to illustrate the relation between codes on summary machine and node machine, then it should be like this:









As can be seen, esProc coding follows the “intuitive ideas” for computing. Each step is implemented concisely and smoothly. Most importantly, esProc has a simple structure with no hidden specifics. The actual computing is carried out step by step by strictly following the code flow. To optimize, users can easily modify the code during running in each step. For example: Change the granularity of task-decomposing and specify the node machine for computing.
In the below sections, let’s analyze the realtime, data exchange performance, maintenance and development of the above-mentioned codes.

Analysis on Realtime

With esProc, the task-decomposing scale can be customized according to their practical computing environment. For example, if the node machine is a high performance PC Server, then the 100 million pieces of data can be processed in 10 shares, with each node responsible for 10 million pieces of data. If the computing node is an obsolete and outdated notebook, then the data can be arranged to process in one thousand shares, and each node responsible for 10 thousand pieces of data. The ability to adjust the task decomposing granularity freely will dramatically lower the cost of scheduling for task decomposing. The realtime will thus be boosted.

The cluster computing has a certain failure rate as a result of network or node machine failure. The failure would lead to unavoidable resource-consuming re-computing. The smaller each task is, the fewer resources would be consumed for re-computing. However, decomposing a great job into smaller tasks is itself resource-consuming. The more you decomposing, the higher the scheduling cost and the poorer the realtime would be. They are two ends of the scale, and you cannot strike the balance.

MapReduce is designed for the large scale cluster. Each Map task is designed to process one data by default. Such mechanism of task allocating results from the complex infrastructure and is extremely difficult to change. Therefore, MapReduce costs more on scheduling, and people may think Hadoop is poor regarding its realtime. Users of the middle-and-small-scale cluster only need to handle fewer nodes which seldom fail and always provide the satisfactory uptime. Such users are more eager to customize the scale of task-decompose freely.

Analysis on Data Exchange Performance

In the above esProce codes, users not only have the choice to determine the granularity of task-decomposing, but also the method of data exchange. Users can choose to store the data into HDFS and then exchange, or just exchange directly. In the sample code, data is exchanged directly. By doing so, the data exchange time between nodes is significantly reduced, and the overall performance is improved effectively.

The node failure that occurs during computing can be solved by means of decomposing and breaking-down the task into smaller jobs. What if the failure occurs in the next round computing after the computing of a certain node is completed? MapReduce offers a rather simple solution: Write the result to HDFS, exchange the data with files, and no more depend on the legacy computers. However, this practice is obviously quite inefficient. Directly sending the data from memory to the summary machine is much faster. That’s why users of middle-and-small scale cluster would say the performance of Hadoop is poor. In a large scale cluster environment of high failure rate, such practice is necessary for effectively addressing the unstable cluster environment.

Users of middle-and-small scale cluster seldom experience cluster failures, and consider it desirable to choose a more flexible method to exchange the data: In order to minimize the required time to exchange data, the data is exchanged directly in most cases and file exchange is only performed when necessary.

Analysis on Maintenance and Development

From the above code, we can see: The structure of esProc is simple and thus can be deployed and managed at extremely low cost. Without the components in complex relations, few errors might occur, and the troubleshooting is quite easy if any error. Owing to the simple structure, the study and development procedure of esProc is also quite easy, and the code-debugging becomes quite intuitive. In fact, esProc offers the true script debugging and thus saves the efforts of checking logs for debugging in MapReduce.

In the great scale environment, MapReduce is designed to have a complex architecture for leveraging resources efficiently and performing the computing task and complex logic programming steadfastly and effectively. Such degree of complexity is very necessary in the cluster environment of hundreds or thousands of machines. However, this also brings about the complexity of deployment. Not only the great workload in maintenance and management, but also the increasing complexity to study and develop grows significantly. Besides, the complex structure will degrade the overall computing performance. Various components are very likely to encounter error, and troubleshooting is difficult. The complex structure compromises the flexibility of users and gives rise to the development cost. For example, MapReduce is designed to process one data each time by default. Although a modified MapReduce architecture can process multiple data at one time. Such modification is relatively hard to implement for it is time-consuming and only achievable for those with high technical competence.

Since there are only the relatively small number of node machines in an environment of the middle-and-small scale cluster, the machine is not so diversified, and the computing environment is stable. The complex computing architecture as such is thus not necessary to get the computing task run smoothly.

With all the above code analyses, we can reach the conclusion that: The simply structured database and big data parallel computing frameworks such as esProc enable users of middle-and-small scale cluster to take full advantage of the inexpensive scale-out feature of Hadoop for the highly efficient lightweight parallel computing solution.

December 17, 2013

How does Hadoop realize memory computing with esProc

The low efficiency of Hadoop computation is an undeniable truth. We believe, one of the major reasons is that the underlying computational structure of MapReduce for Hadoop is basically of the external memory computation. The external memory computation implements the data exchange through the frequent external memory read/write. Because the efficiency of file I/O is two orders of magnitude lower than that of memory, the computational performance of Hadoop is unlikely high.

While for the normal users, they usually have a small size of cluster with only tens or scores of nodes. The cluster environment is relatively reliable and the fault probability is very low. Moreover, most realtime computations would complete quite quickly in each run. Users can always choose to recompute even on errors, not having to consider much about the fault tolerance during computation. In this case, using esProc and alike parallel computing scheme to offer double supports for both in- and external memory computations is a better choice. esProc is also based on Hadoop whose in-memory computing can be utilized for the middle-and-small scale cluster to have a much higher performance.

In the example below, we will use a typical example for grouping to illustrate how esProc implements the Hadoop memory computation. The computational goal is to summarize the sales amount on the order list by place of origin. The data are from two files on HDFS: In the sales.txt, there are a great volume of order data. The major fields are: orderID, product (product ID), amount (order value); The product.txt generates fewer data, and the main fields include proID (product ID), and origin (product origin).

The intuitive solution is like this: On the summary machine, break up sales.txt into several sections, and each section for one task. Allocate these tasks to node machine for summarizing in groups. Once the computation is done on the node machine, the result will be returned to summary machine for grouping and summarizing for a second time. The node machine is to associate sales.txt with product.txt for associative computation, and then group by origin.

esProc code is shown below:
Code1: The task decomposing and summerizing (summary machine)















Code2: Generate global variable for product table (node machine)




Code3: Associate computation and summarize by place of origion (node machine)






As can be seen, esProc coding follows the “intuitive train of thoughts” for computation. Each procedure is implemented concisely and smoothly. Most importantly, esProc has a simple structure with no hidden specifics. The actual computation is carried out step by step strictly following the code. To optimize the codes, users can modify the code in each step easily. For example: Change the granularity of task-decomposing and specify the node machine for computation.
In the below section, we will discuss the four sections of task-decomposing, data exchange, in-memory computation, and memory sharing.

Task decomposing:
As can be seen from code 1, the sales.txt can be decomposed into 40 tasks according to the computational capability of node machine, with about 1 million data for each task. With esProc, the task-decomposing scale can be customized according to their practical computing environment. For example, if the node machine is a high performance PC Server, then the 100 million pieces of data can be processsd in 10 shares, and each node for 10 million data. If the computational node is an obsolete and outdated notebook, then the data can be processed in one thousand shares, and each node can process ten thousands pieces of data. The ability to adjust the task-decomposing granularity freely will save the cost of scheduling for task-decomposing, and boost the computational performace dramatically.
By default, MapReduce decomposes the task to the minimum gratuity to address the destabilizing factor in the large scale cluster environment. Each Map task will process one record. Although the infranstructure can be modified for granularity customization, the coding is difficult and is not practical. Data decomposing by this way can address the fault at relatively less cost. However, the scheduling cost for task-decomposing is relatively great.

In the code3, once the computation is done in the node machine, the result is not written into any file or sent back to the summary machine. Instead, the data exchange is done on the node machine directly. esProc is a scripting language allowing users to strike a balance between the security and the performance. For those who care more for the security on the intermediate result, esProc allows them to write the data to HDFS and then exchange; For those who care more for data exchange performance, esProc allows them to exchange the data directly.
In MapReduce, the data exchange must be done through the files to ensure the safety of intermediate result. Even if the node machine is broken down, the completed result will not disappear. In an environment of large cluster, the node machine may easily encounter such fault, which justfy this method. However, file exchanging data will definitly cause a large deal of disk IO operations and the computational performances will decline obviously.


Code2 is to read the two involved fields from the product table into the memory for computation all at once, as global variables on the node machine. Such in-memory sharing will save the time to retrieve the product table in each task. This is because every node machine will go through the computation for multiple rounds, and each round will perform the multi-threads/tasks computation. The smaller the node scale and the more computational tasks, and more obvious the performance increase will be.
MapReduce does not implement such memory sharing. That is because it is assumed that the computational node will crash freaquently in the environment of large cluster, and the data obtained from the crashed memory is meaningless. In this case, it is quite safe to use the HDFS file for sharing directly. MapReduce does not support the memory sharing. Each time, users must retrieve the data from hard disk before they can use the product table. So, its efficiency is two orders of magnitude worse.

As can be seen from the code 3, the product table is the global variable retrieved from the memory directly while the order table is still too great to read into memory. We use cursor to access. By this way, the efficient memory associative computation is achieved. Needless to say, if proceeding with the decomposing task, the section of order table can also be loaded into the memory. As can be seen, esProc allows for the arbitrary way to load the data. Both the file curosr method of relatively great data volume but slow spead, and the in-memory loading method for data of small volume but faster speed are enabled.
For MapReduce, the default external memory computation is to retrieve the data from file for associative computation and grouping computation. It is quite good to handle the unstable large-scale cluster enviroment. Although the memory buffer technique is adopted at the underlyer for MapReduce, it makes no difference on its poor performance because it is still heaviliy relies on the disk IO to the core. To change to the in-memory computation, users need to change the native infrastructure of MapReduce at the great development cost.

Judging from the four aspects above, we can conclude that esProc can efficiently implement the memory computation for Hadoop, and is suitable for users of middle and small scale cluster.

December 1, 2013

Which is faster in database computing,Stored Procedure or esProc?

As we know, the stored procedure is designed to handle computations involving complex business logics.

In the past, the data structure and business logics were so simple that one SQL statement was enough to achieve user's computational goal. With the rapid growing of information industry, users frequently find that they need to achieve the increasingly complex computational goals to out-perform their competitors. To address such computations, SQL alone is far from enough. Database programmers have the additional demands regarding the judge and loop statements, branches at multiple levels, or more accurate data transverse operations, as well as decomposing an obscure goal into several clear and actionable steps correlated with complex logics. It was for all these demands that the stored procedure was introduced.

The stored procedure has so far become the preferred tools for complex data computations, playing an quite important role. However, stored procedures still cause various inconveniences. For example, lots of their functions are inconvenient - hard to debug or migrate; and there are some databases with rather weak stored procedures. These problems sometimes affect the efficiency of database developers seriously.

The inconvenient functions of stored procedure include the incomplete step-by-step computation, weak support for set-lized data computing, no sequence number can be set for the data set, and no object-reference mechanism. Let's check it out with a simple example - find the "top n best-selling products in whatsoever State" by analyzing the regional sales table. In this scenario, those inconveniences makes it quite complex to write a stored procedure.

01    create or replace package salesPkg
02    as
03             type salesCur is ref cursor;
04    end;
05    CREATE OR REPLACE PROCEDURE topPro(io_cursor OUT salesPkg.salesCur) 
06    is
07       varSql varchar2(2000);
08       tb_count integer;
09    BEGIN 
10      select count(*) into tb_count from dba_tables where table_name='TOPPROTMP';
11      if tb_count=0 then
12      strCreate:='CREATE GLOBAL TEMPORARY TABLE TOPPROTMP (
                     stateTmp NUMBER not null,
                     productTmp varchar2(10)  not null,
                     amountTmp NUMBER not null
              )
              ON COMMIT PRESERVE ROWS';
13      execute immediate strCreate;
14      end if;
15      execute immediate 'truncate table TOPPROTMP';
16      insert into TOPPROTMP(stateTmp,productTmp,amountTmp)
     select state,product,amount from stateSales a
       where not(
         (a.state,a.product) in (
           select state,product from stateSales group by state,product having count(*) > 1
         )
         and rowid not in (
           select min(rowid) from stateSales group by state,product having count(*)>1
         )
       )
     order by state,product;
17      OPEN io_cursor for
18      SELECT productTmp  FROM (
    SELECT stateTmp,productTmp,amountTmp,rankorder
    FROM (SELECT stateTmp,productTmp,amountTmp,RANK() OVER(PARTITION BY stateTmp ORDER BY amountTmp DESC) rankorder 
          FROM TOPPROTMP
         ) 
    WHERE rankorder<=10 order by stateTmp
    )
  GROUP BY productTmp 
  HAVING COUNT(*)=(SELECT COUNT(DISTINCT stateTmp ) FROM TOPPROTMP);
END;

In which, the code at line 16 is to filter the duplicates, and write the filtered data to the ”temporary table”. Since it is difficult to retrieve the distinct data directly, try this tip: find the duplicate data, then use "not" to reverse the condition, and the remaining is the distinct data. This function can be implemented by embedding two sub-queries.

For another example, the line 18 is to find the products ranking among the top 10 in whatsoever State. Firstly, use the window function to get the product rankings rankorder of each State; Secondly, filter out the top 10 products in each State; Lastly, get the products ranking among the top 10 in whatsoever State. SQL does not provide any functions to seek the intersection sets. So, to address this weak point, here is another tip: group by product so as to check if the number of a same products is equal to the number of States; if they are equal, it indicates that the product is ranking among the top 10 in every State.

Besides the inconvenient functions, the rather weak support for debug function is another inconvenience of stored procedures.

Although there are Oracle, DB2, and other databases offering debug function for their respective stored procedures in the market, their debug functions are incomplete. In running the stored procedure, no matter a SQL statement is long or short, no matter how many nested loops or the computational steps it contains, programmers can only view the result of one statement, and the intermediate procedure is completely transparent to them. It rather defeats the purpose of step-by-step debug, and compromising the benefits of running to cursor or the next step. The programmers are actually only allowed to view the cursor and the simple variables. Such variables are useful, but by no means the same important as the intermediate procedure of SQL. Another trouble is that lots of setting and preparation workloads are required to launch the debugging tools. The beginners can hardly handle it without guides.

The third inconvenience is that the stored procedure is hard to migrate. Generally speaking, SQL can be migrated with a few simple modifications. Despite the slight difference in syntax details, the SQLs from various vendors are all the supersets of the ANSI standard. However, the stored procedure is quite another thing. Migrating a stored procedure is much more complex than rewriting one because the relevant standards of various vendors differ greatly. In this situation, users have no choice but stick to one database vendor rigidly. There is not any room left for users to beat down the price if database vendors overcharge them on upgrading their servers, storage, and user license agreements.

SQL is an essential function of any database, while the stored procedure is not. Some databases only offer the relatively poor stored procedures, and the others do not provide it at all. Take the stored procedures of MySQL for example. Its functions and performances are worse than that of the MS SQL, Oracle and some other databases, and MySQL may throw many exceptions on intensive concurrency. MSSQL Compact, SQLITE, Hive, Access, and other databases do not support the stored procedures.

Obviously, the inconveniences of stored procedures have compromised the computational performance of database, and given much troubles to programmers – considerable development difficulty, inefficient development, and awkward maintenances. In addition, these inconveniences also affect the result of implementing business logics, achieving complex computational goal, and making smart business decision. Then, how to empower the stored procedure?

esProc is a database computing script specially built for addressing the complex computational goal. esProc additionally provides the advantageous intuitive grid style, step-by-step computation, professional debug features, agile syntax, complete computational system, and seamless support for the interactive computations between various databases.












esProc is a scripting tool with a grid style. With esProc, the computational logics can be laid out in a 2D space conveniently. The business algorithm can thus be interpreted in the computer language more easily. esProc advocates the step-by-step computation. To put it concretely, esProc enable users to decompose a complex goal into several simple steps in its grid, and ultimately achieve a complex goal by accomplishing those simple objectives. Designed with the "step-by-step" thoughts, a really practical debug function is introduced with esProc, including various functions like the break point, stepping, run to cursor, start, and end. Unlike the fake debugging script as SQL/SP, esProc can debug the basic steps directly and straightforwardly, needless building a specific intermediate table. The break point can be set in any position without altering the code.

esProc supports the true data type of set. A member of a set can be the data of any simple data types, records, and/or other sets. esProc supports the ordered set, which means that users can access to the set member and perform the sequence-number-related computation, for example, ranking, sorting, year-over-year comparison, and link relative ratio comparison. The set-of-set can be used to represent the equal grouping, align grouping, and enum grouping. In addition, users can operate on the individual records in the same way as they used to operate on an object. esProc can represent the complex computation more easily with its agile syntax, for example, computing the relative positions in multi-level groupings, and grouping and summarizing by a specified set.

esProc can empower the stored procedures, and ultimately boost the computational capability of database, reduce the development difficulty for programmers, improve the development efficiency, and facilitate the code maintenance and migration. esProc can implement the complex data algorithms and business logics easily.

Finally, for the above case we've discussed to conclude the inconveniences of stored procedures, let's check out the esProc solution shown below: