Big data processing: chaining Hadoop jobs using Taverna

Processing very large data sets is a core challenge of the SCAPE project. Using the SCAPE platform and a variety of services and tools, the SCAPE Testbeds are developing solutions for real-world institutional scenarios dealing with big data.

The SCAPE platform is based on Apache Hadoop, an implementation of MapReduce, a programming model designed for the distributed processing of large datasets. I remember Alan Akbik, a SCAPE colleague from Technische Universität Berlin, saying in one of our initial meetings: “Hadoop is a hammer”. I soon realised that Hadoop is indeed a very powerful hammer, but, being a hammer, it usually solves only part of the problem posed by a real-world institutional scenario.

Not surprisingly, there is not only a hammer but a rich variety of sophisticated tools available in the Apache Hadoop ecosystem. Pig and Hive, for example, provide higher-level languages which translate processing instructions (Pig) and SQL-like queries (Hive) into one or more MapReduce jobs. One abstraction level higher, a set of jobs can be linked simply through the job objects using the Apache Hadoop API, or with Apache Oozie, which manages even mixed types of jobs (for example MapReduce, Pig, and Hive) with complex dependencies.

In the SCAPE project, the Taverna Workflow Workbench is used for orchestrating long-term preservation tools and services operating on an underlying data flow. In the following I will therefore present a simple way of chaining Hadoop jobs using Taverna’s Tool service invocation mechanism.

To be more concrete, the institutional scenario is to parse large amounts of HTML files that are part of a huge book collection, where each HTML page represents the layout and text of a corresponding book page image. These HTML files have block-level elements described by the HTML tag ‘div’. Each of them has a position, width and height representing the surrounding rectangle of a text or image block. My former colleague at the Austrian National Library, Martin Reisacher, was using the average block width of these ‘div’ elements for detecting quality issues due to cropping errors. I don’t want to go into greater depth on this issue now; let us just assume that you can make use of the average block width for some kind of further analysis. Generally, this is a good use case for demonstrating MapReduce for two reasons. First, it uses not only the Map function for parallelising the HTML parsing, but also the Reduce function for calculating the average block width. Second, it requires some data preparation before the MapReduce programming model can be applied effectively.

Back to the question of how these jobs are linked using the Taverna Workbench: the following diagram shows the Taverna workflow (also available on myExperiment) and its components used in this simple linear data flow:

First of all, dealing with lots of HTML files means that we are facing Hadoop’s “Small Files Problem”. In brief, the files we want to process are too small to be taken directly as input for the Map function. In fact, loading 1000 HTML files into HDFS (which, by the way, would require quite some time) in order to parse them in a Map function would make the Hadoop JobTracker create 1000 Map tasks. Given the task creation overhead, this would result in very poor processing performance. In short, Hadoop does not like small files; on the contrary, the larger the better.
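
To make the task-count argument concrete, here is a small back-of-the-envelope sketch in plain Java. The average file size of 50 KB is purely an assumption for illustration, and the 64 MB split size is taken as a fixed parameter; the class and method names are hypothetical and not part of the SCAPE code.

```java
// Sketch: compare the number of map tasks for many small files with the
// number of splits needed when the same bytes are bundled into one file.
class MapTaskEstimate {

    // With files smaller than one split, each file becomes its own map task.
    static long tasksForSmallFiles(long fileCount) {
        return fileCount;
    }

    // One bundled file is cut into ceil(totalBytes / splitBytes) splits.
    // Compression is ignored here to keep the arithmetic simple.
    static long tasksForBundledFile(long fileCount, long avgFileBytes, long splitBytes) {
        long totalBytes = fileCount * avgFileBytes;
        return Math.max(1, (totalBytes + splitBytes - 1) / splitBytes);
    }

    public static void main(String[] args) {
        long files = 1000;                 // number of HTML files used in the text
        long avgBytes = 50L * 1024;        // ASSUMPTION: 50 KB per HTML file
        long split = 64L * 1024 * 1024;    // 64 MB split size
        System.out.println("small files:  " + tasksForSmallFiles(files) + " map tasks");
        System.out.println("bundled file: " + tasksForBundledFile(files, avgBytes, split) + " map task(s)");
    }
}
```

Under these assumptions the 1000 files fit into a single split, so the per-task overhead is paid once instead of 1000 times.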

One approach to overcome this shortcoming is to create one large file, a so-called SequenceFile, in a first step, and subsequently load it into the distributed file system (HDFS). This is done by the HadoopSequenceFileCreator Taverna component in the figure above. The component is based on a Map function which reads HTML files directly from the file server and stores a file identifier as ‘key’ and the content as BytesWritable ‘value’ (key-value pair), as illustrated in the following figure:

As each processing node of the cluster has access to the file server, and given that each node executes several tasks simultaneously using all CPU cores of the worker nodes, the SequenceFile is created in a highly parallelised manner, limited basically by the bandwidth of the internal network (in this case SequenceFile creation is highly I/O bound). Using block compression for the sequence files, there will be less I/O traffic when running Hadoop jobs later on.
The JobTracker can then split the SequenceFile into 64 MB splits, so that each TaskTracker parses a bundle of HTML files and the task creation overhead becomes small compared to the amount of data each task processes.
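
The idea of bundling many small files into one container of key-value records can be sketched without Hadoop. The following is not the SequenceFile format and not the code of the HadoopSequenceFileCreator component; it is a simplified stand-in using only the Java standard library, with a hypothetical class name and a made-up record layout (key string, length, raw bytes).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: pack (file identifier, file content) pairs into one byte stream
// and read them back. A real job would use Hadoop's SequenceFile instead.
class RecordBundle {

    // Writes each entry as: key (UTF string), value length (int), value bytes.
    static byte[] pack(Map<String, byte[]> files) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (Map.Entry<String, byte[]> entry : files.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue().length);
                out.write(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads records until the stream is exhausted, preserving their order.
    static Map<String, byte[]> unpack(byte[] bundle) {
        Map<String, byte[]> files = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bundle))) {
            while (in.available() > 0) {
                String key = in.readUTF();
                byte[] value = new byte[in.readInt()];
                in.readFully(value);
                files.put(key, value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return files;
    }

    public static void main(String[] args) {
        Map<String, byte[]> pages = new LinkedHashMap<>();
        // Hypothetical page contents; identifiers follow the pattern used in the result table.
        pages.put("Z119585409/00000218", "<html>page 218</html>".getBytes(StandardCharsets.UTF_8));
        pages.put("Z119585409/00000219", "<html>page 219</html>".getBytes(StandardCharsets.UTF_8));
        byte[] bundle = pack(pages);
        System.out.println(unpack(bundle).keySet());
    }
}
```

The point of the layout is that a reader can process one record after another from a single large input, which is what lets one task handle a whole bundle of pages.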

Once the data is loaded into HDFS, the SequenceFileInputFormat can be used as input in the subsequent MapReduce job which parses the HTML files using the Java HTML parser Jsoup in the Map function and calculates the average block width in the Reduce function. This is done by the HadoopHocrAvBlockWidthMapReduce Taverna component.

The handover mechanism between the two jobs is established simply by the first job writing its output HDFS path to standard out, which the second job then takes as its HDFS input path. The second job only starts after the first job has completely finished.
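
The handover pattern itself can be sketched in a few lines of plain Java. This is only an illustration of the principle, not how Taverna invokes tools: each "job" below is a hypothetical stand-in method that prints a single path, and the paths are invented for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.function.BiConsumer;

// Sketch: a step reports its output location on standard out, and the
// captured text becomes the input of the next step.
class StdoutHandover {

    // Runs one step to completion and returns what it printed, trimmed.
    static String capture(String input, BiConsumer<String, PrintStream> step) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream stdout = new PrintStream(buffer, true);
        step.accept(input, stdout);
        stdout.flush();
        return new String(buffer.toByteArray()).trim();
    }

    // Stand-in for the SequenceFile creation job (hypothetical output path).
    static void sequenceFileStep(String inputDir, PrintStream stdout) {
        stdout.println(inputDir + "/sequencefile");
    }

    // Stand-in for the parsing job (hypothetical output path).
    static void parserStep(String sequenceFilePath, PrintStream stdout) {
        stdout.println(sequenceFilePath + "/avg-block-width");
    }

    public static void main(String[] args) {
        // The second step is only called once the first has returned.
        String sequenceFilePath = capture("books", StdoutHandover::sequenceFileStep);
        String resultPath = capture(sequenceFilePath, StdoutHandover::parserStep);
        System.out.println(resultPath);
    }
}
```

Because only a path string crosses the boundary between the steps, the data itself stays in HDFS and the two jobs remain independent programs.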

Let <k1, v1> be the key-value pair input of the Map function, where k1 is the identifier of the HTML file (data type: org.apache.hadoop.io.Text) and v1 is the value holding the content of the HTML file (data type: org.apache.hadoop.io.BytesWritable). A book page usually contains several block-level elements, so the Mapper writes one <k1, v2> key-value pair for each block that the parser finds. The value v2 is a string with the coordinates, width, and height of the block element.

The Reduce function then receives a <k1, list(v2)> input, so that we can iterate over the blocks of each HTML file k1 in order to calculate the average block width. The output of the Reduce function is <k1, v3>, where v3 (data type: org.apache.hadoop.io.LongWritable) is the average block width.
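
The logic of the two functions can be sketched in plain Java, without the Hadoop API and without Jsoup. Several things here are assumptions: the markup is taken to be hOCR-style, with each block written as a ‘div’ carrying title="bbox x0 y0 x1 y1"; a regular expression stands in for the HTML parser; and the average is truncated to a whole number. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the per-page logic: "map" turns one HTML page into one value
// per block, "reduce" averages the widths of all blocks of that page.
class BlockWidthSketch {

    // ASSUMPTION: blocks look like <div ... title="bbox x0 y0 x1 y1">.
    private static final Pattern DIV_BBOX = Pattern.compile(
            "<div\\b[^>]*\\btitle=\"bbox (\\d+) (\\d+) (\\d+) (\\d+)[^\"]*\"");

    // Map step: emits one string "x y width height" per block found.
    static List<String> map(String html) {
        List<String> blocks = new ArrayList<>();
        Matcher m = DIV_BBOX.matcher(html);
        while (m.find()) {
            int x0 = Integer.parseInt(m.group(1));
            int y0 = Integer.parseInt(m.group(2));
            int x1 = Integer.parseInt(m.group(3));
            int y1 = Integer.parseInt(m.group(4));
            blocks.add(x0 + " " + y0 + " " + (x1 - x0) + " " + (y1 - y0));
        }
        return blocks;
    }

    // Reduce step: average of the third field (width), truncated to a long.
    static long reduce(List<String> blocks) {
        if (blocks.isEmpty()) {
            return 0; // ASSUMPTION: pages without blocks yield 0
        }
        long sum = 0;
        for (String block : blocks) {
            sum += Long.parseLong(block.split(" ")[2]);
        }
        return sum / blocks.size();
    }

    public static void main(String[] args) {
        // Hypothetical page with two blocks of width 1000 and 800.
        String html = "<div class=\"ocr_carea\" title=\"bbox 100 50 1100 400\"></div>"
                + "<div class=\"ocr_carea\" title=\"bbox 200 500 1000 900\"></div>";
        List<String> blocks = map(html);
        System.out.println(blocks + " -> average width " + reduce(blocks));
    }
}
```

In the real job the framework groups the Mapper output by key, so the Reduce function sees all blocks of one page together; the sketch skips that grouping by handling a single page.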

Finally, the HadoopFsCat Taverna component simply writes the content of the result file to standard out. This step is only meant for demonstration on small data sets.

Job execution can be monitored both in the Taverna Workflow Workbench (if the component is gray, processing finished successfully):

and using the Hadoop Map/Reduce Administration:

where each job will be listed separately.

Let us now extend the “hOCR use case” and include image metadata (image width) of the book page images. For orchestrating the jobs, we create a workflow which uses 1) a Hadoop Streaming API component (HadoopStreamingExiftoolRead) based on a bash script for reading image metadata using Exiftool, 2) the Map/Reduce component (HadoopHocrAvBlockWidthMapReduce) presented above, and 3) Hive components for creating data tables (HiveLoad*Data) and performing queries on the result files (HiveSelect):
 


Conceptually, this workflow is intended to be a “data preparation” component which makes data available for analytic queries using Hive’s MySQL-like query language. The ‘HiveSelect’ component tests whether the data has been loaded successfully: it executes a SELECT query with a JOIN on the two tables created by Hive:

select hocr_data.identifier,hocr_data.width,exif_data.width from hocr_data inner join exif_data on hocr_data.identifier=exif_data.identifier;

Identifier             Average width   Exif width
Z119585409/00000218    1041            2210
Z119585409/00000219    826             2245
Z119585409/00000220    1122            2266
Z119585409/00000221    1092            2281
Z119585409/00000222    1026            2102
Z119585409/00000223    1046            2217
Z119585409/00000224    864             2263
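
What the inner join computes can be sketched in plain Java. This is not how Hive executes the query; it is a small in-memory stand-in that assumes each identifier occurs at most once per table, with hypothetical class and method names. The values are the first three rows of the result above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: inner join of two identifier -> width tables on the identifier.
class InnerJoinSketch {

    // Keeps only identifiers present in both tables, one output row each.
    static List<String> join(Map<String, Long> hocrWidths, Map<String, Long> exifWidths) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Long> hocr : hocrWidths.entrySet()) {
            Long exifWidth = exifWidths.get(hocr.getKey());
            if (exifWidth != null) {
                rows.add(hocr.getKey() + "\t" + hocr.getValue() + "\t" + exifWidth);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Long> hocrData = new LinkedHashMap<>();
        hocrData.put("Z119585409/00000218", 1041L);
        hocrData.put("Z119585409/00000219", 826L);
        hocrData.put("Z119585409/00000220", 1122L);

        Map<String, Long> exifData = new LinkedHashMap<>();
        exifData.put("Z119585409/00000218", 2210L);
        exifData.put("Z119585409/00000219", 2245L);
        exifData.put("Z119585409/00000220", 2266L);

        for (String row : join(hocrData, exifData)) {
            System.out.println(row);
        }
    }
}
```

A page that is missing from either table would simply not appear in the output, which is why the query is a useful check that both load steps covered the same pages.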

Using a small data set (here: one book with 815 pages) during development, we can monitor the execution time of the components involved and analyse where improvement is needed:

In this case, we can see that the Hadoop Map/Reduce job runs for about 45 seconds, and we can decide to focus on this component for improving the overall workflow runtime.

In conclusion, Taverna’s Tool Service invocation mechanism offers a simple way of linking Hadoop jobs.

The principal use of the Taverna Workbench is for demonstrating and sharing workflows during the design and development phase. As Hadoop jobs are usually long-running, it is useful that, since Taverna version 2.3, workflows can also be started from the command line for production runs on a cluster:

<taverna-install-dir>/taverna-2.3.0/executeworkflow.sh -embedded -inputvalue rootpath <path-to-input-dir> -inputvalue <job-name-prefix> -outputdir <output-dir> <path-to-workflow>/Hadoop_hOCR_parser_with_exiftool.t2flow

The Taverna workflows are available on myExperiment (workflow 1 and workflow 2) and the code for the two Hadoop jobs is available on GitHub: tb-lsdr-seqfilecreator and tb-lsdr-hocrparser.

Have fun!

By shsdev, posted in shsdev's Blog

7th Aug 2012  10:07 AM
