esProc makes it convenient to process big text files in parallel.
The following example illustrates the method.
Suppose a text file, sales.txt, holds ten million sales records. Its
main fields include SellerID, OrderDate and Amount. The task is to compute
each salesman's total amount of big orders in the past four years, where a
big order is one whose amount is above 2,000.
To do parallel processing, we must first segment the file. esProc
provides a cursor data object and related functions that segment and read big
text files conveniently. For example, file("e:/sales.txt").cursor@tz(;,
3:24) divides the file roughly into 24 segments by bytes and fetches the
third one. Dividing a file simply by bytes produces incomplete rows (that is,
incomplete records), which would need extra code to handle. Splitting the
file by rows instead means traversing rows just to locate one segment, which
defeats the efficiency expected from segment-based parallel processing.
esProc avoids both problems: it automatically rounds off each segment by
skipping the incomplete beginning row and completing the ending one, thus
ensuring the correctness of the data.
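The rounding-off rule can be illustrated outside esProc. The following is a minimal Python sketch, not esProc's implementation: the function name read_segment, the demo file name and the demo rows are all hypothetical, and header-row handling is left out. Each segment skips the row that straddles its starting offset and reads its last row to the end, so every row lands in exactly one segment.

```python
import os
import tempfile


def read_segment(path, k, n):
    """Return the complete rows of segment k (1-based) out of n byte-based segments."""
    size = os.path.getsize(path)
    start = size * (k - 1) // n
    end = size * k // n
    lines = []
    with open(path, "rb") as f:
        if start > 0:
            # Step back one byte and discard through the next newline. If start
            # sits exactly on a row boundary nothing is lost; otherwise the
            # partial row is skipped, because the previous segment completes it.
            f.seek(start - 1)
            f.readline()
        # A row belongs to this segment if it begins before the end offset;
        # the last such row is read in full even if it runs past that offset.
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            lines.append(line.rstrip(b"\r\n").decode("utf-8"))
    return lines


# Demo on a small temporary file with made-up rows (not the article's data).
rows = ["%d,2012-03-01,%d" % (i, 100 * i) for i in range(1, 11)]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sales_demo.txt")
    with open(path, "w", newline="\n") as f:
        f.write("\n".join(rows) + "\n")
    segments = [read_segment(path, k, 3) for k in range(1, 4)]

# Concatenating the segments gives back every row exactly once, in order.
rejoined = [row for seg in segments for row in seg]
```

Because each task only seeks to its own offset, no task has to scan the rows that belong to other segments.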
Once the file can be segmented, we simply process the segments in
parallel, as the following code shows:
Main Program
A1: Set the number of parallel tasks as 24,
that is, divide the file into 24 segments.
A2: Perform multithreaded computation. The
tasks are represented by to(A1), whose result is [1,2,3…24], the serial
numbers of the segments assigned to the threads. When all the threads
complete their computations, the results will be found in A2. B2-B5 are the
code within the fork statement.
B2: Read the file with a cursor. The segment
to process in the current task is determined by the parameter passed from
the main thread.
B3: Select records after the year of 2011
whose order amount is above 2,000.
B4: Group and summarize data for the
current segment.
B5: Return the result of the current thread
to A2, where the main thread resides.
A6: Merge the results of the multithreaded
tasks in A2. Below is a selection of the result table:
A7: Group and summarize the merged data to get each seller’s sales amount. The result is as follows:
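For readers without the grid in front of them, the shape of steps A1 to A7 can be approximated in Python. This is only a sketch under stated assumptions, not the esProc code: the records are made up and held in memory, the names RECORDS, CUTOFF and process_segment are hypothetical, and the cutoff date is one reading of "after the year of 2011".

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import date

# Made-up records: (SellerID, OrderDate, Amount).
RECORDS = [
    (1, date(2012, 5, 1), 2500.0),
    (2, date(2010, 7, 9), 3000.0),   # too old, filtered out
    (1, date(2013, 1, 15), 1500.0),  # not a big order, filtered out
    (2, date(2012, 2, 2), 2100.0),
    (3, date(2014, 8, 20), 4000.0),
    (1, date(2014, 3, 3), 2600.0),
]
CUTOFF = date(2011, 1, 1)  # assumption: the article's exact date filter is not shown


def process_segment(k, n):
    """Roughly B2-B5: take segment k of n, filter it, and group within the segment."""
    lo = len(RECORDS) * (k - 1) // n
    hi = len(RECORDS) * k // n
    totals = defaultdict(float)
    for seller, order_date, amount in RECORDS[lo:hi]:
        if amount > 2000 and order_date >= CUTOFF:
            totals[seller] += amount
    return sorted(totals.items())


n_tasks = 3  # A1 (the article uses 24)

# A2: one task per segment number, run on a small pool of worker threads.
with ThreadPoolExecutor(max_workers=2) as pool:
    partials = list(pool.map(lambda k: process_segment(k, n_tasks),
                             range(1, n_tasks + 1)))

# A6: combine the partial results (plain concatenation in this sketch).
merged = [pair for part in partials for pair in part]

# A7: regroup, because the same seller can appear in several segments.
final = defaultdict(float)
for seller, subtotal in merged:
    final[seller] += subtotal
result = sorted(final.items())
```

Seller 1 appears in two different segments here, which is why the second grouping pass is needed after the merge. In CPython, threads do not speed up CPU-bound work, so the sketch shows the structure of the computation rather than its performance.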
Code explanation
For a CPU with N cores, it seems natural to set N tasks. In
practice, some tasks always run faster than others, for example because the
filter keeps different amounts of data in different segments. So a common
situation is that some cores sit idle after finishing the faster tasks while
a few others are still running the slower ones. By comparison, if each core
performs multiple tasks in turn, the differences in task speed average out
and the overall run becomes more stable. In the above example, therefore,
the job is divided into 24 tasks and given to the CPU's 8 cores (the number
of threads allowed to run in parallel can be configured in the esProc
environment). On the other hand, too many segments have drawbacks of their
own. One is a decline in overall performance; the other is that the results
produced by the individual tasks, added together, grow larger and occupy
more memory.
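The averaging effect can be checked with a small simulation. The Python sketch below is a toy model with hypothetical task durations in arbitrary time units, not a measurement of esProc: workers take the next task as soon as they are free, and the function returns the time at which the last task finishes.

```python
import heapq


def makespan(durations, workers):
    """Simulate workers pulling tasks in order; return when the last task finishes."""
    free_at = [0] * workers  # time at which each worker becomes free
    heapq.heapify(free_at)
    finish = 0
    for d in durations:
        start = heapq.heappop(free_at)  # the earliest available worker takes the task
        end = start + d
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


# Hypothetical workload: one eighth of the file is five times slower to process.
coarse = [3] * 7 + [15]    # 8 tasks, one per core
fine = [1] * 21 + [5] * 3  # the same total work cut into 24 tasks

coarse_time = makespan(coarse, 8)  # 15: seven cores idle while one finishes
fine_time = makespan(fine, 8)      # 7: the slow part is spread over three cores
```

The model ignores per-task startup cost and the memory held by partial results, which are the reasons given above for not making the segments too small.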
fork simplifies development
by encapsulating the complex multithreaded computation, so
programmers can focus on the business algorithm instead of being distracted
by complicated semaphore control.
The results computed in A6 of the above main
program are already sorted by SellerID, so A7 does not need to sort
again before grouping and summarizing. @o, a
function option of groups, groups
and summarizes data efficiently without sorting.
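The idea of grouping already-sorted data without a sort can be illustrated in Python. This is an analogue rather than a description of how esProc implements @o, and the partial results are made up: sorted per-task results are merged in key order, then adjacent rows with the same SellerID are summed in a single pass.

```python
import heapq
from itertools import groupby
from operator import itemgetter

# Made-up per-task results, each already sorted by SellerID: (SellerID, subtotal).
partials = [
    [(1, 2500.0), (4, 3000.0)],
    [(2, 2100.0), (4, 2200.0)],
    [(1, 2600.0), (3, 4000.0)],
]

# Merging sorted inputs keeps the output sorted without a full sort.
merged = list(heapq.merge(*partials, key=itemgetter(0)))

# Equal keys are now adjacent, so one sequential pass is enough to group them.
totals = [
    (seller, sum(amount for _, amount in rows))
    for seller, rows in groupby(merged, key=itemgetter(0))
]
```

The single pass only works because equal keys are adjacent; on unsorted input, groupby would emit the same seller more than once.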
Further illustration:
Sometimes a text file grows to several TB, and multi-node parallel
computation on a cluster is required. esProc handles this easily because its
cursor and related functions support inexpensive scale-out file systems and
distributed file systems (DFS). The code will be
like this: =callx("sub.dfx", to(A1),A1; ["192.168.1.200:8281",
"192.168.1.201:8281", "......"]). See related documents for detailed
usage description.