October 30, 2013

With esProc, Hadoop programming is not as hard as before

Hadoop is an outstanding big data solution. On one hand, its low cost and high scalability have made it popular; on the other hand, its low development efficiency draws complaints from users.

Hadoop uses the MapReduce framework for big data development and computation. Everything works well as long as the computing task is simple, but problems appear as soon as the computation becomes a little more complex, and the poor development efficiency hurts more and more as the problem gets harder. One of the most common such computations is "associative computing", that is, joining two data sets on a shared key.

For example, suppose HDFS holds 2 files, one with the customer data and one with the order data, and customerID is the field that associates them. How do we perform the associative computation that adds the customer name to each order?
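Set the distribution problem aside and the join itself is small. The following is a minimal single-machine sketch in plain Java, not part of any Hadoop or esProc API. It assumes each customer row is `customerID<TAB>name` and each order row carries the customer ID in its second field, which is the layout the Hadoop mapper below relies on; the class name `HashJoinDemo` is made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of the customer/order join (no Hadoop involved).
class HashJoinDemo {
    // customerRows: "customerID\tname"; orderRows: "field0\tcustomerID\tfield2" (assumed layout)
    static List<String> join(List<String> customerRows, List<String> orderRows) {
        // build phase: index customer names by customerID
        Map<String, String> nameById = new HashMap<>();
        for (String row : customerRows) {
            String[] c = row.split("\t");
            nameById.put(c[0], c[1]);
        }
        // probe phase: look up each order's customerID and append the customer name
        List<String> result = new ArrayList<>();
        for (String row : orderRows) {
            String[] o = row.split("\t");
            String name = nameById.get(o[1]);
            if (name != null) { // inner join: orders without a matching customer are dropped
                // same column order as the Hadoop job's output: customerID, field0, field2, name
                result.add(o[1] + "\t" + o[0] + "\t" + o[2] + "\t" + name);
            }
        }
        return result;
    }
}
```

Building a hash map on the smaller table and probing it with the larger one is the classic hash join. On a cluster, the hard part is getting rows with the same customerID onto the same machine, and that is what the MapReduce code has to arrange by hand.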

The usual method is to read both source files and process each row in Map according to the name of the file it came from. If a row comes from the order file, its foreign key (customerID) is tagged with "O" to form a combined key; if it comes from the customer file, its customerID is tagged with "C". After Map, the data is partitioned on customerID, sorted on the combined key, and grouped on customerID alone. Finally, Reduce combines each customer's name with that customer's orders and writes the result. Code like the following is typical:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class J {

    public static class JMapper extends Mapper<LongWritable, Text, TextPair, Text> {
        // tag every row with "O" or "C" according to the name of the file it came from
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            String[] values = value.toString().split("\t");
            if (pathName.contains("order.txt")) {
                // order row: field 1 is the customerID; key = (customerID, "O")
                context.write(new TextPair(new Text(values[1]), new Text("O")),
                        new Text(values[0] + "\t" + values[2]));
            } else if (pathName.contains("customer.txt")) {
                // customer row: field 0 is the customerID, field 1 the name; key = (customerID, "C")
                context.write(new TextPair(new Text(values[0]), new Text("C")),
                        new Text(values[1]));
            }
        }
    }

    public static class JPartitioner extends Partitioner<TextPair, Text> {
        // partition on customerID only, so both tags of one customer reach the same reducer
        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            // mask the sign bit; Math.abs stays negative for Integer.MIN_VALUE
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static class JComparator extends WritableComparator {
        // group reducer input on customerID only, ignoring the "C"/"O" tag
        public JComparator() {
            super(TextPair.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
        }
    }

    public static class JReduce extends Reducer<TextPair, Text, Text, Text> {
        // keys sort as (customerID, tag) and "C" < "O", so a customer row comes first in its group
        @Override
        protected void reduce(TextPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // on entry the key holds the group's first record, so check its tag before using it
            if (!"C".equals(key.getSecond().toString())) {
                return; // orders without a matching customer are dropped (inner join)
            }
            Text customerId = new Text(key.getFirst());
            Iterator<Text> it = values.iterator();
            String name = it.next().toString();
            while (it.hasNext()) {
                context.write(customerId, new Text(it.next().toString() + "\t" + name));
            }
        }
    }

    public static class TextPair implements WritableComparable<TextPair> {
        // composite key: first = customerID, second = tag ("C" or "O")
        private Text first;
        private Text second;

        public TextPair() { set(new Text(), new Text()); }
        public TextPair(String first, String second) { set(new Text(first), new Text(second)); }
        public TextPair(Text first, Text second) { set(first, second); }

        public void set(Text first, Text second) {
            this.first = first;
            this.second = second;
        }
        public Text getFirst() { return first; }
        public Text getSecond() { return second; }

        @Override
        public void write(DataOutput out) throws IOException {
            first.write(out);  // serialize both parts in a fixed order
            second.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }

        @Override
        public int compareTo(TextPair tp) {
            // full sort order: customerID first, then tag
            int cmp = first.compareTo(tp.first);
            return cmp != 0 ? cmp : second.compareTo(tp.second);
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // job entry point
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: J <in_path_one> <in_path_two> <output>");
            System.exit(2);
        }
        Job job = new Job(conf, "J");
        job.setJarByClass(J.class);                          // join class
        job.setMapperClass(JMapper.class);                   // map class
        job.setMapOutputKeyClass(TextPair.class);            // map output key class
        job.setMapOutputValueClass(Text.class);              // map output value class
        job.setPartitionerClass(JPartitioner.class);         // partition class
        job.setGroupingComparatorClass(JComparator.class);   // groups reducer input by customerID
        job.setReducerClass(JReduce.class);                  // reduce class
        job.setOutputKeyClass(Text.class);                   // reduce output key class
        job.setOutputValueClass(Text.class);                 // reduce output value class
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // one source file
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));   // the other source file
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); // output path
        System.exit(job.waitForCompletion(true) ? 0 : 1);   // run and wait for the job to end
    }
}
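To make the data flow of the job above easier to follow, here is a single-process simulation in plain Java. `ReduceSideJoinSim` is a hypothetical name, no Hadoop classes are used, and the record type needs Java 16 or later. It assumes a single reducer, so the partitioner step is omitted: sorting on the full (customerID, tag) key plays the role of the shuffle sort, and walking runs of equal customerIDs plays the role of the grouping comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Single-process simulation of the tagged reduce-side join (hypothetical, no Hadoop).
class ReduceSideJoinSim {
    // one map output record: composite key (id, tag) plus a value
    record Tagged(String id, String tag, String value) {}

    static List<String> run(List<String> customerRows, List<String> orderRows) {
        List<Tagged> shuffled = new ArrayList<>();
        // "map": tag customers with C and orders with O, keyed by customerID
        for (String r : customerRows) {
            String[] c = r.split("\t");
            shuffled.add(new Tagged(c[0], "C", c[1]));
        }
        for (String r : orderRows) {
            String[] o = r.split("\t");
            shuffled.add(new Tagged(o[1], "O", o[0] + "\t" + o[2]));
        }
        // "sort": order by the full composite key, so C precedes O within each customerID
        shuffled.sort(Comparator.comparing(Tagged::id).thenComparing(Tagged::tag));
        // "reduce": process each run of records sharing one customerID as a group
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < shuffled.size()) {
            String id = shuffled.get(i).id();
            int j = i;
            while (j < shuffled.size() && shuffled.get(j).id().equals(id)) j++;
            Tagged first = shuffled.get(i);
            if (first.tag().equals("C")) { // skip groups that have no customer record
                for (int k = i + 1; k < j; k++) {
                    out.add(id + "\t" + shuffled.get(k).value() + "\t" + first.value());
                }
            }
            i = j;
        }
        return out;
    }
}
```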


As can be seen above, programmers cannot use the raw data directly to implement the associative computation. They have to write fairly complex code to tag the records, work around the original MapReduce framework, and design the association between the data sets from the bottom layer up. Handling such computations this way clearly demands strong programming skills; it is also time-consuming, and there is no guarantee of computational efficiency. And the case above is only the simplest kind of associative computation. As you can imagine, using MapReduce for multi-table joins or for joins with complex business logic makes the complexity grow geometrically, until the difficulty and the development effort become nearly unbearable.

In fact, associative computing is common and by no means complex in itself. The apparent difficulty arises because MapReduce, while highly general, is not specialized for any particular kind of computation. For the same reason, developing with MapReduce is also quite inefficient for ordered computations such as year-on-year comparison and median, and for align grouping or enum grouping.
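To show why such computations depend on order, here is a small plain-Java sketch (a hypothetical `OrderedCalcDemo`, not esProc code). Year-on-year comparison needs the value 12 positions earlier in a chronologically sorted monthly series, and a median needs the whole data set in sorted order; both are awkward to express as independent per-key Map and Reduce calls. The 12-month offset assumes exactly one value per month with no gaps.

```java
import java.util.Arrays;

// Hypothetical helpers illustrating two order-dependent computations.
class OrderedCalcDemo {
    // monthly: values in chronological order, one per month (assumption);
    // result[i] is the growth of month i over the same month a year earlier
    static double[] yearOnYear(double[] monthly) {
        double[] growth = new double[monthly.length];
        Arrays.fill(growth, Double.NaN); // the first 12 months have no prior-year value
        for (int i = 12; i < monthly.length; i++) {
            growth[i] = (monthly[i] - monthly[i - 12]) / monthly[i - 12];
        }
        return growth;
    }

    // median of a non-empty array: needs the whole data set in sorted order
    static double median(double[] values) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }
}
```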

Although the Hadoop ecosystem offers higher-level solutions such as Hive and Pig built on MapReduce, these are not powerful enough on one hand and, on the other, support only fairly simple, basic queries. Business logic involving complex procedures still has to be hard-coded.

So what can we do to boost development efficiency on Hadoop? esProc is a good choice!

esProc is a pure Java parallel computation framework focused on extending Hadoop's capabilities and improving the development efficiency of Hadoop programmers.

Using the same example, the esProc solution is shown below:

Main program:
[Figure: esProc main program script]

Sub program:
[Figure: esProc sub program script]


As can be seen, the esProc code is intuitive and straightforward. The Main program specifies the number of tasks, prepares the parameters for each task, allocates the tasks to 4 computational nodes, and finally merges the task results and writes them to a file. The Sub program plays the role of Map in MapReduce: it takes one range of the data, performs the join on it, and returns the result to Main. Intuitive, straightforward, and concise: that is the impression esProc syntax gives us.
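Because the esProc scripts themselves are not reproduced in this text, the following sketches the same Main/Sub structure in plain Java. It is not esProc code: a fixed thread pool stands in for the 4 computational nodes, the customer table is assumed to fit in memory as a map from customerID to name, and `MainSubDemo`, `mainProgram` and `subProgram` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical Java analog of the Main/Sub structure; threads stand in for cluster nodes.
class MainSubDemo {
    // "Sub program": join one range [from, to) of the orders with the customer names
    static List<String> subProgram(List<String[]> orders, int from, int to, Map<String, String> names) {
        List<String> part = new ArrayList<>();
        for (int i = from; i < to; i++) {
            String[] o = orders.get(i); // o[1] is the customerID (assumed layout)
            String name = names.get(o[1]);
            if (name != null) part.add(o[1] + "\t" + o[0] + "\t" + o[2] + "\t" + name);
        }
        return part;
    }

    // "Main program": prepare task parameters, run tasks on `nodes` workers, merge the results
    static List<String> mainProgram(List<String[]> orders, Map<String, String> names,
                                    int tasks, int nodes) {
        ExecutorService pool = Executors.newFixedThreadPool(nodes);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            int size = orders.size();
            for (int t = 0; t < tasks; t++) {
                // parameters for task t: a contiguous, non-overlapping range of rows
                int from = (int) ((long) size * t / tasks);
                int to = (int) ((long) size * (t + 1) / tasks);
                futures.add(pool.submit(() -> subProgram(orders, from, to, names)));
            }
            List<String> merged = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                try {
                    merged.addAll(f.get()); // merge results in task order
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException("task failed", e);
                }
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```

Collecting the futures in submission order keeps the merged output in the original row order, even though the tasks may finish in any order.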

In practice, the associative computation is rarely the final goal. The real business value usually comes from the grouping and aggregation, sorting, and filtering done after the join. In esProc, this whole procedure can be written in the same script grid, whereas MapReduce requires writing additional classes for it. In terms of development efficiency, MapReduce clearly falls far behind esProc.
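As a rough illustration of the steps that typically follow a join, the sketch below takes already joined rows, totals the amount per customer, filters out small totals, and sorts the rest in descending order, all in one place with Java streams. It is not esProc syntax; the `Joined` record (customer name plus order amount), the threshold parameter and the class name are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical post-join pipeline: group and sum, filter, then sort.
class PostJoinDemo {
    // one joined row: customer name and order amount (assumed fields)
    record Joined(String customer, double amount) {}

    // total per customer, keep totals >= minTotal, largest total first
    static List<Map.Entry<String, Double>> topCustomers(List<Joined> rows, double minTotal) {
        Map<String, Double> totals = rows.stream()
                .collect(Collectors.groupingBy(Joined::customer,
                        Collectors.summingDouble(Joined::amount)));
        return totals.entrySet().stream()
                .filter(e -> e.getValue() >= minTotal)
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .collect(Collectors.toList());
    }
}
```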

esProc is a scripting language specialized for big data. It offers a true set data type, which makes it easy to design algorithms from the user's perspective and to implement customers' complex business logic. esProc also supports ordered sets, allowing arbitrary access to set members and computations based on sequence numbers. Sets of sets make it easy to express complex grouping styles such as equal grouping, align grouping, and enum grouping. A single record can be operated on in the same way as an object. esProc scripts are written and displayed in a grid, so intermediate results can be referenced without being defined, and complete code editing and debugging tools are provided. In short, esProc can be regarded as a dynamic, set-oriented language that has something in common with R, but with native support for distributed parallel computation built into its core. esProc programmers benefit from efficient parallel computation while keeping a simple, R-like syntax. Designed for data computing and optimized for big data processing, esProc working with HDFS can significantly improve both the development efficiency and the computational efficiency of Hadoop.
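The align and enum grouping mentioned above can be sketched in plain Java as follows, based on our reading of the terms: align grouping produces one group per key in a given base list, in that order, keeping empty groups; enum grouping produces one group per condition and places each record in the first condition it satisfies. Equal grouping is an ordinary group-by and is left out. The `GroupingDemo` class is hypothetical, and esProc's own functions may offer options (for example, keeping unmatched records) that this sketch does not model.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration of align grouping and enum grouping.
class GroupingDemo {
    // align grouping: one group per base key (keys assumed distinct), in base order,
    // kept even when empty; items whose key is not in the base list are discarded
    static <T, K> List<List<T>> alignGroup(List<T> items, List<K> baseKeys, Function<T, K> keyOf) {
        Map<K, Integer> slot = new HashMap<>();
        List<List<T>> groups = new ArrayList<>();
        for (K k : baseKeys) {
            slot.put(k, groups.size());
            groups.add(new ArrayList<>());
        }
        for (T item : items) {
            Integer idx = slot.get(keyOf.apply(item));
            if (idx != null) groups.get(idx).add(item);
        }
        return groups;
    }

    // enum grouping: one group per condition; each item goes to the first condition
    // it satisfies, and items satisfying none are discarded
    static <T> List<List<T>> enumGroup(List<T> items, List<Predicate<T>> conditions) {
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < conditions.size(); i++) groups.add(new ArrayList<>());
        for (T item : items) {
            for (int i = 0; i < conditions.size(); i++) {
                if (conditions.get(i).test(item)) {
                    groups.get(i).add(item);
                    break;
                }
            }
        }
        return groups;
    }
}
```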

The MapReduce framework limits it to simple computations on a single data set, such as searching, which makes it a poor choice for associative computing, ordered computations, and similar algorithms. esProc and other specialized computation scripts are essential for improving the development efficiency of such computations in Hadoop.