October 30, 2013

With esProc, Hadoop programming is not as hard as before

Hadoop is an outstanding big data solution. On one hand, its low cost and high scalability increase its popularity; on the other hand, its low development efficiency draws constant complaints from users.

Hadoop builds its big data development and computation on the MapReduce framework. Everything works well as long as the computing task is simple, but problems appear as soon as the computation gets slightly more complex, and the poor development efficiency hurts more and more as the difficulty grows. One of the commonest of these computations is the join, or "associative computing".

For example, HDFS holds two files, one with customer data and one with order data, associated through the customerID field. How do we perform the join that adds the customer name to the order list?
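
To make this concrete, the tab-separated layouts implied by the Map code below would look roughly like this (an assumption inferred from the field indexes used in the code; the post does not show the actual files):

    order.txt:    orderID    customerID    amount
    customer.txt: customerID    customerName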

The usual method is to read both source files and, in the Map phase, tag each row according to the file it came from: rows from Order get an "O" appended to the foreign key to form a combined key, and rows from Customer get a "C". After the Map phase, the data is partitioned on the key, then grouped and sorted on the combined key. Finally, the Reduce phase joins the rows and writes the output. Code along the following lines is reportedly quite common:

public static class JMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    //mark every row with "O" or "C" according to its file name
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (pathName.contains("order.txt")) { //identify an order row by file name
            String values[] = value.toString().split("\t");
            TextPair tp = new TextPair(new Text(values[1]), new Text("O")); //mark with "O"
            context.write(tp, new Text(values[0] + "\t" + values[2]));
        }
        if (pathName.contains("customer.txt")) { //identify a customer row by file name
            String values[] = value.toString().split("\t");
            TextPair tp = new TextPair(new Text(values[0]), new Text("C")); //mark with "C"
            context.write(tp, new Text(values[1]));
        }
    }
}
public static class JPartitioner extends Partitioner<TextPair, Text> {
    //partition by the first half of the key, i.e. customerID
    @Override
    public int getPartition(TextPair key, Text value, int numPartition) {
        //mask the sign bit rather than use Math.abs, which overflows for Integer.MIN_VALUE
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartition;
    }
}
public static class JComparator extends WritableComparator {
    //group by the first half of the multi-part key only
    public JComparator() {
        super(TextPair.class, true);
    }
    @SuppressWarnings("unchecked")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TextPair t1 = (TextPair) a;
        TextPair t2 = (TextPair) b;
        return t1.getFirst().compareTo(t2.getFirst());
    }
}
public static class JReduce extends Reducer<TextPair, Text, Text, Text> {
    //join and output; the "C" row sorts first, so the first value is the customer name
    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Text pid = key.getFirst();
        Iterator<Text> it = values.iterator(); //use one iterator instead of re-creating it each pass
        String desc = it.next().toString();
        while (it.hasNext()) {
            context.write(pid, new Text(it.next().toString() + "\t" + desc));
        }
    }
}
public class TextPair implements WritableComparable<TextPair> {
    //composite key made of two Text fields
    private Text first;
    private Text second;
    public TextPair() {
        set(new Text(), new Text());
    }
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }
    public TextPair(Text first, Text second) {
        set(first, second);
    }
    public void set(Text first, Text second) {
       this.first = first;
       this.second = second;
    }
    public Text getFirst() {
       return first;
    }
    public Text getSecond() {
       return second;
    }
    public void write(DataOutput out) throws IOException {
       first.write(out);
       second.write(out);
    }
    public void readFields(DataInput in) throws IOException {
       first.readFields(in);
       second.readFields(in);
    }
    public int compareTo(TextPair tp) {
       int cmp = first.compareTo(tp.first);
       if (cmp != 0) {
         return cmp;
       }
             return second.compareTo(tp.second);
    }
}
public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException {
    //job entrance
    Configuration conf = new Configuration();
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    String[] otherArgs = parser.getRemainingArgs();
    if (otherArgs.length < 3) {
        System.err.println("Usage: J <in_path_one> <in_path_two> <output>");
        System.exit(2);
    }
    Job job = new Job(conf, "J");
    job.setJarByClass(J.class);//join class
    job.setMapperClass(JMapper.class);//map class
    job.setMapOutputKeyClass(TextPair.class);//map output key class
    job.setMapOutputValueClass(Text.class);//map output value class
    job.setPartitionerClass(JPartitioner.class);//partition class
    job.setGroupingComparatorClass(JComparator.class);//grouping comparator applied after partitioning
    job.setReducerClass(JReduce.class);//reduce class
    job.setOutputKeyClass(Text.class);//reduce output key class
    job.setOutputValueClass(Text.class);//reduce output value class
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//one source file
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));//the other source file
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));//output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);//run until the job ends
}

As the code shows, because programmers cannot work with the raw data directly, they must write fairly involved code to handle the tags, bypass the native MapReduce framework, and design the association between the data sets from the bottom layer up. Handling computations this way clearly demands strong programming skills, takes considerable time, and offers no guarantee of computational efficiency. And the case above is the simplest kind of join: as you can imagine, with multi-table joins or joins embedded in complex business logic, the complexity of MapReduce code grows geometrically, and the difficulty and development cost quickly become unbearable.

In fact, a join is a common operation and by no means complex in itself. It looks difficult here because MapReduce, for all its generality, is not specialized for this kind of work. For the same reason, developing with MapReduce is also quite inefficient for ordered computations such as year-on-year comparisons and medians, and for alignment or enumeration grouping.
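
To see why "ordered" matters: a year-on-year comparison merely relates each value to the value one period earlier, which is trivial in plain Java once the data is ordered. The minimal single-machine sketch below uses made-up numbers and is neither MapReduce nor esProc code; in MapReduce, the same relation between neighboring rows requires secondary-sort machinery like the TextPair above.

public class YearOnYear {
    public static void main(String[] args) {
        //yearly sales totals, already ordered by year (hypothetical data)
        double[] sales = {120.0, 150.0, 135.0, 180.0};
        int firstYear = 2010;
        for (int i = 1; i < sales.length; i++) {
            //each row is compared with the previous row - an inter-row, order-dependent step
            double growth = sales[i] / sales[i - 1] - 1;
            System.out.printf("%d vs %d: %+.1f%%%n", firstYear + i, firstYear + i - 1, growth * 100);
        }
    }
}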

Hadoop does package Hive, Pig, and other higher-level solutions on top of MapReduce, but these are not powerful enough: they offer only fairly simple, basic queries, and business logic involving a complex procedure still requires hard coding.

Then, what can we do to boost the development efficiency for Hadoop? esProc is quite a good choice!

esProc is a pure Java parallel computing framework focused on boosting the computational capability of Hadoop and improving the development efficiency of Hadoop programmers.

For the same example, the esProc solution is shown below:

Main program:

[esProc grid script, shown as a screenshot in the original post]
Sub program:

[esProc grid script, shown as a screenshot in the original post]
As can be seen, the esProc code is intuitive and straightforward. The Main program specifies the number of tasks, prepares the parameters for each task, allocates the tasks to four computing nodes, then merges the task results and writes them to a file. The Sub program is the counterpart of the Map procedure in MapReduce: it reads a range of data, performs the join, and returns the result to Main (a rough Java analogy of this structure is sketched below). Intuitive, straightforward, and concise - that is the impression the esProc syntax leaves.
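
Since the grid screenshots are not reproduced here, the following is a rough single-machine Java analogy of the main/sub structure just described - my sketch, not esProc code: the main part splits the file range into per-task parameters, farms the tasks out in parallel, and merges the results, while each sub task joins its own slice.

import java.util.*;
import java.util.concurrent.*;

public class MainSubAnalogy {
    //the "sub program": join the order records in [begin, end] with the customer table
    static List<String> joinSlice(long begin, long end) {
        return new ArrayList<>(); //placeholder for the actual per-slice join
    }

    public static void main(String[] args) throws Exception {
        long fileSize = 1_000_000;   //stand-in for the HDFS file size
        int taskCount = 4;           //number of tasks/nodes, as in the text
        long chunk = fileSize / taskCount;
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        List<Future<List<String>>> parts = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final long begin = i * chunk + 1;
            final long end = (i + 1) * chunk;
            parts.add(pool.submit(() -> joinSlice(begin, end))); //farm out one slice per task
        }
        List<String> merged = new ArrayList<>();
        for (Future<List<String>> p : parts) {
            merged.addAll(p.get());  //merge the task results in order
        }
        pool.shutdown();
        System.out.println(merged.size() + " joined rows");
    }
}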

In fact, a join is usually not the ultimate computational goal; the real business value typically comes from the grouping, summarizing, sorting, and filtering performed after the join. In esProc, this whole procedure can be written in the same script grid, whereas MapReduce requires yet another class for it. In this respect MapReduce is clearly far less efficient than esProc.

esProc is a scripting language specialized for big data. It offers a true set data type, makes it easy to design algorithms from the user's perspective, and implements complex business logic with little effort. esProc supports ordered sets, so members of a set can be accessed arbitrarily and serial-number-related computations are straightforward. Sets of sets can represent complex grouping schemes easily, such as equal grouping, alignment grouping, and enumeration grouping. A single record can be operated on in much the same way as an object. esProc scripts are written and presented in a grid, so intermediate results can be referenced without being defined, and a complete code editing and debugging mechanism adds further convenience. In short, esProc can be regarded as a dynamic, set-oriented language with something in common with R, plus native support for distributed parallel computation in its core: esProc programmers enjoy efficient parallel computation while keeping R-like simplicity of syntax. Designed for data computing and optimized for big data processing, esProc working with HDFS can improve the development and computational efficiency of Hadoop significantly.

The MapReduce framework restricts its use to simple computations on a single data set, such as searching. That makes MapReduce a poor choice for joins, ordered computations, and similar algorithms, and a specialized computation script such as esProc is essential to improving the development efficiency of such computations in Hadoop.




October 22, 2013

Four Methods for Structured Big Data Computation

Data only has value when it takes part in computation, and big data is no exception. The computational capability over structured big data determines the range of practical big data applications. In this article, I'd like to introduce the commonest computation methods: API, Script, SQL, and SQL-like languages.
         
API: The "API" here refers to a self-contained API access method that uses neither JDBC nor ODBC. Take MapReduce as an example. MapReduce is designed to handle parallel computation cost-effectively from the very bottom layer, so it offers superior scale-out, hot-swap capability, and cost efficiency. As a Hadoop component, MapReduce is open source and backed by abundant resources.
Sample code:

//assumed to be fields of the reducer class: Text tKey = new Text(), tValue = new Text()
public void reduce(Text key, Iterator<Text> value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    double sumX = 0;
    double sumY = 0;
    int count = 0;
    String[] strValue;
    while (value.hasNext()) {
        count++;
        strValue = value.next().toString().split("\t");
        sumX += Integer.parseInt(strValue[1]);
        sumY += Integer.parseInt(strValue[2]); //Y comes from the third field
    }
    double avgX = sumX / count;
    double avgY = sumY / count;
    tKey.set("K" + key.toString().substring(1, 2));
    tValue.set(avgX + "\t" + avgY);
    output.collect(tKey, tValue);
}
Since it adopts a general-purpose programming language that is unsuited to specialized data computing, MapReduce is less capable in computation than SQL and other specialized languages, and development with it is inefficient - no wonder programmers generally complain that it is "painful". In addition, the rigid MapReduce framework results in relatively poor performance.
         
Several products use the API method, and MapReduce is the most typical among them.

Script: The "Script" here refers to a specialized script for computing. Take esProc as an example. esProc is designed to improve the computational capability of Hadoop, so in addition to inexpensive scale-out it offers high performance, great computational capability, and convenient computation across heterogeneous data sources, and it is especially suited to complex computational goals. In addition, it is a grid-style script characterized by high development efficiency and complete debugging functions.

Sample code:


     A                                                          B
1    =file("hdfs://192.168.1.200/data/sales.txt").size()       //file size
2    =10                                                       //number of tasks
3    =to(A2)                                                   //1 ~ 10, i.e. 10 tasks
4    =A3.(~*int(A1/A2))                                        //parameter list of end positions
5    =A3.((~-1)*int(A1/A2)+1)                                  //parameter list of start positions
6    =callx("groupSub.dfx",A5,A4;["192.168.1.107:8281","192.168.1.108:8281"])    //call the sub-program: 10 tasks on 2 parallel nodes
7    =A6.merge(empID)                                          //merge the task results
8    =A7.group@i(empID;~.sum(totalAmount):orderAmount,~.max(max):maxAmount,~.min(min):minAmount,~.max(max)/~.sum(quantity):avgAmount)    //final summarizing


Java users can invoke esProc results via JDBC, but only in the form of a stored procedure call, not as arbitrary SQL statements. Also, esProc is not open source. These are its two disadvantages.
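
For illustration, a call of this kind from Java might look like the sketch below. The driver class, connection URL, and script name here are assumptions made for the example (check the esProc documentation for the actual values), but the JDBC calls themselves are standard.

import java.sql.*;

public class EsprocCallDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("com.esproc.jdbc.InternalDriver");            //assumed driver class
        try (Connection con = DriverManager.getConnection("jdbc:esproc:local://"); //assumed URL
             //an esProc script is invoked like a stored procedure, not via free-form SQL
             CallableStatement st = con.prepareCall("call groupSub(?,?)")) {
            st.setInt(1, 1);       //hypothetical begin position
            st.setInt(2, 100000);  //hypothetical end position
            try (ResultSet rs = st.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }
}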
         
Scripts are widely used in MongoDB, Redis, and many other big data solutions, but those scripts are not specialized enough for computing. For example, a multi-table join in MongoDB is not only inefficient but also requires code an order of magnitude more complex than the equivalent in SQL or esProc.
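
To give a sense of the extra work, the sketch below shows the shape of a client-side hash join in plain Java with made-up data (the MongoDB driver calls for fetching the two collections are omitted): the application has to stitch the records together itself, which a SQL engine or esProc expresses in a single join.

import java.util.*;

public class ClientSideJoin {
    public static void main(String[] args) {
        //rows fetched from two collections, keyed by customerID (hypothetical data)
        Map<String, String> customers = new HashMap<>();
        customers.put("C001", "Alice");
        customers.put("C002", "Bob");
        List<String[]> orders = Arrays.asList(
            new String[]{"O1", "C001", "120"},
            new String[]{"O2", "C002", "80"});
        //the join the database would otherwise do for us
        for (String[] o : orders) {
            String name = customers.get(o[1]); //probe the hash table on the foreign key
            if (name != null) {
                System.out.println(o[0] + "\t" + name + "\t" + o[2]);
            }
        }
    }
}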

SQL: The "SQL" here refers to complete SQL/SP, i.e. ANSI 2000 and its supersets. Take Greenplum as an example: the major advantages of Greenplum SQL are powerful computing, highly efficient development, and great performance. Other advantages include the wide use of the language, low learning cost, simple maintenance, and migration possibilities - not to mention its trump card of stored procedure support for handling complex computations. In this way, business value can be conveniently extracted from big data.
Sample code:
CREATE OR REPLACE FUNCTION view.merge_emp()
RETURNS void AS $$
BEGIN
    TRUNCATE view.updated_record;
    INSERT INTO view.updated_record
        SELECT y.* FROM view.emp_edw x RIGHT OUTER JOIN emp_src y
        ON x.empid = y.empid WHERE x.empid IS NOT NULL;
    UPDATE view.emp_edw SET deptno = y.deptno, sal = y.sal
        FROM view.updated_record y WHERE view.emp_edw.empid = y.empid;
    INSERT INTO emp_edw
        SELECT y.* FROM emp_edw x RIGHT OUTER JOIN emp_src y
        ON x.empid = y.empid WHERE x.empid IS NULL;
END;
$$ LANGUAGE plpgsql;

Other databases with a similar MPP structure include Teradata, Vertica, Oracle, and IBM products. Their syntax characteristics are mostly alike, and so are their disadvantages: acquisition cost and ongoing maintenance expenses are extremely high. Charging users by data scale, the supposedly inexpensive Greenplum is actually no bargain at all - it looks more like making big money under cover of big data. Other disadvantages include awkward debugging, incompatible syntax, lengthy downtime during expansion, and awkward multi-data-source computation.

SQL-like languages: These are scripting languages that expose output interfaces like JDBC/ODBC but implement only a subset of standard SQL. Take Hive QL as an example. The greatest advantage of Hive QL is its ability to scale out cost-effectively while remaining a convenient development tool. Because it keeps SQL's syntax style, the learning cost is low, development is efficient, and maintenance is simple. In addition, Hive is a Hadoop component, and being open source is another advantage.
Sample code:

SELECT e.* FROM (
    SELECT name, salary, deductions["Federal Taxes"] AS ded,
           salary * (1 - deductions["Federal Taxes"]) AS salary_minus_fed_taxes
    FROM employees
) e
WHERE round(e.salary_minus_fed_taxes) > 70000;
         
The weak point of Hive QL is its lack of support for stored procedures. This makes it hard for Hive QL to carry out complex computations, and thus hard to deliver truly valuable results; anything slightly more complex falls back on MapReduce, with its low development efficiency. Poor performance and response time are another bane, especially in task allocation, multi-table joins, inter-row computation, multi-level queries, ordered grouping, and similar algorithms. For these reasons, it is quite difficult to build real-time big data applications on Hadoop with Hive QL.
         
There are other products with SQL-like languages - MongoDB for example - and they are weaker still than Hive.

The big data computation methods currently available come down to these four: API, Script, SQL, and SQL-like languages. May they develop steadily, and may more and more cost-effective, powerful, and practical data computing products emerge.

October 14, 2013

Why esProc is needed in Hadoop


esProc is a brand new parallel computing framework that supports reading and writing files in HDFS and is fully committed to improving the computational capability, performance, and development efficiency of Hadoop.

>Enhance Computational Capability
The computational capability of Hadoop is built on the Java language and the MapReduce parallel framework. Java is outstanding for broad, general-purpose applications, but it is not powerful enough for computation in specialized fields, and MapReduce lacks library functions for even the simplest data algorithms: there is no direct support for typical operations such as joins, sub-queries, inter-row computations, or ordered computations. Its computational capability is rather weak.

esProc is also a Java-based parallel framework, but it provides a script specific to big data computation and optimized for processing it. Working with HDFS, esProc can greatly improve the computational capability of Hadoop.

To boost computational capability, Hadoop packages Hive SQL on top of MapReduce. But the computational capability of Hive SQL is quite limited: it is just a subset of SQL, with no support for stored procedures, and it cannot complete complex data computations.
 
With a complete computational system and powerful computational capability, esProc can meet such demands effortlessly, solving complex data computations more easily than stored procedures do. esProc can also invoke computational results from Hive, improving the computational capability of Hadoop by working alongside Hive.

>Boost Computational Performance
Built on a rigid framework, MapReduce is inflexible in decomposing and allocating tasks, extremely resource-consuming, and relatively poor at real-time work. By comparison, esProc allows arbitrary task allocation; in extreme cases the time spent allocating tasks is only one ten-millionth of that required by MapReduce, and esProc's parallel performance is superior.

In MapReduce, intermediate results of cross-machine interaction must be stored in HDFS as files. That is good for fault tolerance, but it introduces a great delay. By comparison, esProc lets users choose flexibly according to the duration of the computation: intermediate results can be used directly to cut the delay, or stored in HDFS to increase fault tolerance.

MapReduce is awkward at common data computations such as multi-table joins and year-over-year or link-relative-ratio comparisons, and implementing them through MapReduce workarounds makes performance decline dramatically. By comparison, esProc supports such computations natively, and the combination of esProc and HDFS boosts the computational efficiency of Hadoop dramatically.

Hive's infrastructure is still MapReduce, which implements common algorithms such as joins at the cost of performance, usually an order of magnitude inferior to an RDB. The performance of esProc is close to, and in places superior to, an RDB, and esProc can work with Hive via JDBC to take over computational tasks with strict real-time requirements.
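
As an illustration of that JDBC channel, the sketch below reads a Hive result set from plain Java through the HiveServer2 driver; the host, port, credentials, and table are placeholders.

import java.sql.*;

public class HiveJdbcDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver"); //HiveServer2 JDBC driver
        try (Connection con = DriverManager.getConnection(
                 "jdbc:hive2://192.168.1.200:10000/default", "hive", ""); //placeholder host/user
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery(
                 "SELECT empid, sum(amount) FROM orders GROUP BY empid")) { //hypothetical table
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getDouble(2));
            }
        }
    }
}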

>Improve Development Efficiency
Even for the simplest computation, MapReduce users have to program manually, so development efficiency is low. Moreover, MapReduce demands relatively strong development skills and a heavy workload to implement joins, ordered computations, equal grouping, and year-over-year or link-relative-ratio comparisons. Hive does not support stored procedures and still has to fall back on MapReduce for even slightly more complex computations.

For common algorithms, esProc provides abundant library functions for direct use; for complex algorithmic logic, it provides an agile syntax and a professional IDE that make implementation easy. Working with HDFS and Hive, esProc can greatly boost the development efficiency of Hadoop. With true support for the set data type, esProc enables ordered sets and set-oriented groupings such as equal grouping, alignment grouping, and enumeration grouping. esProc scripts are written in a grid-style cellset, so users can reference intermediate results directly without defining anything.

The debugging facilities of MapReduce are so outdated that users can only identify errors by checking messages in the log file. By comparison, esProc supports breakpoints, step-by-step execution, run-to-cursor, start, end, and other specialized debugging functions that safeguard development efficiency.

To define the task scale arbitrarily, MapReduce users have to customize the MapReduce framework, which is not only tough but also seriously compromises development efficiency. esProc allocates tasks flexibly and arbitrarily, so its development efficiency is quite high.

esProc has all the outstanding features of Hadoop - parallel computation across multiple nodes, scale-out on inexpensive hardware, and open external interfaces. In addition, esProc renovates Hadoop with a flexible parallel framework, a script specialized for big data, agile syntax, and a professional IDE.