Introduction
MapReduce is a programming model widely used in data centers for processing large data sets in a highly parallel way. Current MapReduce systems are based on master-slave architectures that do not cope well with dynamic node participation, since they are mostly designed for conventional parallel computing platforms. In dynamic computing environments, by contrast, node churn and failures (including master failures) are likely to happen, since nodes join and leave the network at an unpredictable rate. P2P-MapReduce is a framework developed at the University of Calabria that exploits a peer-to-peer model to manage intermittent node participation, master failures and job recovery in a decentralized but effective way, so as to provide a more robust MapReduce middleware that can be effectively used in dynamic computing environments.
The P2P-MapReduce framework is currently available as an open-source research prototype. It employs the JXTA framework for managing the peer-to-peer network, and code from the Apache Hadoop project for managing the MapReduce computations.
Installation
Distribution file
The distribution file of P2P-MapReduce 1.0 contains:
- The core library of the P2P-MapReduce framework: P2PMapReduce.jar
- All the required external libraries inside the ‘lib’ directory:
Library Name | Description | Dependencies |
JXSE_2.7.jar | JXTA 2.7 core library (http://jxta.kenai.com/) | bcprov-jdk16-145.jar; h2-1.2.127.jar; org.mortbay.jetty.jar; netty-3.1.5.GA.jar; httptunnel-0.92.jar |
bcprov-jdk16-145.jar | Used by JXTA for encryption | |
h2-1.2.127.jar | Used by JXTA for the H2 implementation of the cache manager (Cm) | |
org.mortbay.jetty.jar | Required by the JXTA HTTP transport | javax.servlet.jar |
javax.servlet.jar | Required by Jetty | |
netty-3.1.5.GA.jar | Required by the JXTA TCP transport | |
httptunnel-0.92.jar | Required by JXTA | |
commons-net-ftp-2.0.jar | Used for the implementation of an FTP client | |
ftplet-api-1.0.5.jar | FTP server API | |
ftpserver-core-1.0.5.jar | FTP server API implementation | mina-core-2.0.0-RC1.jar; slf4j-api-1.5.2.jar |
mina-core-2.0.0-RC1.jar | Apache MINA core, used by the FTP server | slf4j-api-1.5.2.jar |
slf4j-api-1.5.2.jar | SLF4J logging API | slf4j-log4j12-1.5.2.jar |
slf4j-log4j12-1.5.2.jar | SLF4J binding for Log4j | log4j-1.2.14.jar |
log4j-1.2.14.jar | Apache Log4j logging library | |
- Configuration files inside the ‘conf’ directory:
- P2PMapReduce.properties: A property file for specifying some configuration parameters of P2P-MapReduce
- user.properties: FTP user configurations
- Scripts:
- startNode.sh: Simple bash script to start a new P2P-MapReduce node on a Unix/Linux platform
How to install
To install the P2P-MapReduce framework, just extract P2P-MapReduce-1.0.tar.gz into a directory of your choice, referred to in the following as <installation_dir> (an example extraction command is given after the directory tree below). The directory structure will be as follows:
- <installation_dir>
- P2PMapReduce.jar
- lib/
- <all the required external libraries>
- conf/
- <all the configuration files>
- src/
- <all the source files>
- doc/
- <Javadoc files>
- startNode.sh
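For example, on a Unix/Linux shell the archive can be extracted as follows (assuming it has been downloaded to the current directory):

# Create the installation directory and extract the distribution into it
mkdir -p <installation_dir>
tar -xzf P2P-MapReduce-1.0.tar.gz -C <installation_dir>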
Run a computing node
All the configurable framework parameters can be modified through the conf/P2PMapReduce.properties file. The most important parameters are:
Key | Description | Default value |
direct.find.waiting.time | Time (seconds) to wait for receiving query responses from the network | 3 |
periodic.publish.timeout | Time interval (seconds) to wait before publishing an updated version of a resource in the network | 60 |
adv.expiration.time | Maximum time (seconds) for which a discovered resource is considered still available | 120 |
coordinator.master.ratio | Master/slave ratio. Accepted values are in the interval [0,1). The special value 0 indicates a single master; values between 0 and 1 indicate the desired ratio between masters and slaves (e.g., 0.2 means one master for every five slaves) | 0.2 |
job.backupmanager.required | The desired number of backup masters for a single job. All non-negative values are valid; the special value -1 indicates that all the available masters must be used as backup masters | 2 |
mapreduce.job.inputsplit.size | The minimum input split size (MBytes) for MapReduce input files | 1 |
gui | Whether to start the graphical user interface | true |
logging.level | P2P-MapReduce framework logging level (see the java.util.logging package) | WARNING |
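For example, a conf/P2PMapReduce.properties file that overrides some of the defaults above might look like this (the values are purely illustrative):

# Illustrative P2P-MapReduce configuration (keys from the table above)
coordinator.master.ratio=0.2
job.backupmanager.required=2
mapreduce.job.inputsplit.size=8
gui=false
logging.level=INFO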
Start a node
To make the P2P-MapReduce system work, at least two nodes have to be started, so that there is at least one master node and one slave node. To start a P2P-MapReduce node, just execute the startNode.sh script available in <installation_dir>.
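For example, on a Unix/Linux shell, run the following in two separate terminals (or on two different machines):

# Start a node; repeat in a second terminal or on a second machine,
# so that the network contains at least one master and one slave
cd <installation_dir>
./startNode.sh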
Node GUI
When a node is started with the parameter gui=true, a graphical interface is displayed:
Figure 1
The window title indicates the node type (master (M) or slave (S)) and the node ID (e.g., D39F03). The window includes three panels: Net, Exec and Log. The Log panel shows the application log output. The Net panel (shown in Figure 1) shows network information. The upper side shows JXTA network information such as GroupID, PeerID and CoordinatorID. In the middle, two lists are shown, which contain, respectively, all the known slave nodes and all the known master nodes. Each list entry shows some node information:
- node ID
- node load
- node information version number
The Exec panel is made up of three lists showing a summary of the jobs or tasks managed by the node. If the node is a master (Figure 2), it shows the jobs managed as the primary master and those managed as a backup master. For each of them, some summary information is shown:
- job ID
- job state
- completion status (percentage) of the map and reduce phases
Figure 2
By double-clicking on one of the list entries, an information window associated with the job will be shown (Figure 3). The information includes:
- Long version of the job ID
- ID of the primary master associated with the job
- number of tasks submitted (in execution, failed and successfully completed)
- list of all the backup masters for the job
- list of the tasks submitted for the job, chronologically ordered
Figure 3
If the node is a slave (Figure 4), the list of all the managed tasks is shown.
Figure 4
Job submission
When at least one master node is available in the network, it is possible to submit a new job. To submit a job, you must write your own application that configures the job and submits it.
As an example, the canonical WordCount application will be considered.
Application environment setup
In order to correctly run your application, you must set up the application environment. First, the P2PMapReduce.jar file and all the libraries inside the ‘lib’ directory must be added to your application class path. Second, the ‘conf’ directory must be copied into your application working directory, referred to in the following as <application_working_dir>.
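For example, assuming your submitting application's main class is called WordCountApp (a hypothetical name) and that P2PMapReduce.jar and the ‘lib’ directory have been copied into <application_working_dir>, it could be launched like this on a Unix/Linux shell:

# Run the application from the working directory, so the framework
# can find the 'conf' directory (the lib/* wildcard requires Java 6+)
cd <application_working_dir>
java -cp "P2PMapReduce.jar:lib/*:." WordCountApp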
Writing Mapper and Reducer classes
To specify the map and reduce functions, you need to write two classes that extend, respectively, the Mapper and Reducer abstract classes, as in Apache Hadoop.
In the following, we report the code of the map and reduce classes used for the WordCount example, WordCountMapper and WordCountReducer:
// Required JDK imports: java.io.IOException, java.util.StringTokenizer.
// Mapper, Reducer, Context, StringWritable and IntWritable come from the
// P2P-MapReduce library (their import statements are omitted here).
public class WordCountMapper extends Mapper<Long, String, StringWritable, IntWritable> {

    private static final String separator = " \n\t\r\b\f\"\'\\.,;:'()-/[]{}+*=!?<>&|%";
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    public void map(Long key, String value, Context context)
            throws IOException, InterruptedException {
        // Split the input line into words and emit a (word, 1) pair for each of them
        StringTokenizer tokenizer = new StringTokenizer(value, separator);
        while (tokenizer.hasMoreTokens()) {
            StringWritable word = new StringWritable(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}

public class WordCountReducer extends Reducer<StringWritable, IntWritable, StringWritable, IntWritable> {

    @Override
    public void reduce(StringWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the counts emitted for this word and emit the total
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.getValue();
        }
        context.write(key, new IntWritable(sum));
    }
}
Once you have written the map and reduce classes, you must package them, together with all the other classes they require, into a single jar file, referred to in the following as <job.jar>.
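For example, if the compiled classes are in a ‘classes’ directory, the jar file can be created with the standard jar tool:

# Package the compiled map/reduce classes (and their helper classes) into job.jar
jar cf job.jar -C classes .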
Job configuration
The configuration of a MapReduce job is very similar to the Apache Hadoop job configuration, except that some classes must be replaced with their P2P-MapReduce counterparts. This is an example of a job configuration:
Job job = new Job();
job.setJobName("Word Count");

job.setInputFormatClass(TextFileInputFormat.class);
job.setOutputFormatClass(TextFileOutputFormat.class);
TextFileInputFormat.addLocalInputPath(job, "<path in local fs to input file or directory>");

job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(StringWritable.class);
job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(StringWritable.class);
job.setOutputValueClass(IntWritable.class);

job.setJar("<path in local fs to <job.jar> file>");
job.setNumReduceTasks(3);
Instead of the Hadoop TextInputFormat and TextOutputFormat classes, P2P-MapReduce provides TextFileInputFormat and TextFileOutputFormat. In particular, the TextFileInputFormat.addLocalInputPath method must be used to provide the path, on the local file system, to the job input files or directories.
Submit a job
Unlike in Hadoop, to submit a job you must create a UserNode instance.
This is the simple code that you must write to submit a Job through a UserNode:
UserNode userNode = UserNode.createUserNode();
userNode.submit(job);
Alternatively, you can use the UserNode.submit() method that takes a Runnable object as its second parameter:
UserNode userNode = UserNode.createUserNode();
Runnable actionAtCompletion = <runnable creation code>;
userNode.submit(job, actionAtCompletion);
This allows you to specify an action to be executed after the job has successfully completed. In this way you can also construct a simple chain of jobs, by submitting the next job with all the output files of the previous job as input files, like this:
UserNode userNode = UserNode.createUserNode();
Job firstJob = <firstJob creation code>;

Runnable firstJobCompletionAction = new Runnable() {
    public void run() {
        Job secondJob = <second_job_creation_code>;

        // Use the output files of the first job as input files of the second one
        FileSystemView fsv = userNode.getFileSystemView();
        File jobOutputDir = fsv.createFile(firstJob.getConfiguration().get(MRJobConfig.JOB_OUTPUT_DIR_ATTR));
        for (File outputFile : jobOutputDir.listFiles()) {
            TextFileInputFormat.addLocalInputPath(secondJob, outputFile.getPath());
        }

        Runnable secondJobCompletionAction = <runnable creation code>;
        userNode.submit(secondJob, secondJobCompletionAction);
    }
};

userNode.submit(firstJob, firstJobCompletionAction);
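Putting the previous snippets together, a minimal submitting application could look like the following sketch (the class name WordCountApp and the paths are placeholders, and the imports of the P2P-MapReduce classes are omitted):

public class WordCountApp {

    public static void main(String[] args) throws Exception {
        // Configure the WordCount job as shown in the previous sections
        Job job = new Job();
        job.setJobName("Word Count");
        job.setInputFormatClass(TextFileInputFormat.class);
        job.setOutputFormatClass(TextFileOutputFormat.class);
        TextFileInputFormat.addLocalInputPath(job, "<path to input file or directory>");
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(StringWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(StringWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setJar("<path to job.jar>");
        job.setNumReduceTasks(3);

        // Create the user node and submit the job to the network
        UserNode userNode = UserNode.createUserNode();
        userNode.submit(job);
    }
}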
Job execution
During job execution, the user node receives updates from the primary master and, in case of node failures, from the network module. On each reduce task completion, the user node is notified by the primary master and starts retrieving the reduce output into a local output directory named “<application_working_dir>/ftp/<job_id>/output”.
If the job is successfully completed, all the reduce output files will be located in this directory.
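As an illustration, the following sketch lists the reduce output files of a completed job (the job ID passed on the command line is a placeholder, and the sketch assumes it is run from <application_working_dir>):

import java.io.File;

public class PrintJobOutputs {
    public static void main(String[] args) {
        String jobId = args[0]; // placeholder: the ID of the completed job
        // Reduce outputs are stored under <application_working_dir>/ftp/<job_id>/output
        File outputDir = new File("ftp/" + jobId + "/output");
        File[] outputFiles = outputDir.listFiles();
        if (outputFiles != null) {
            for (File outputFile : outputFiles) {
                System.out.println("Reduce output file: " + outputFile.getName());
            }
        }
    }
}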
Download
The current P2P-MapReduce package (version 1.0, dated September 26, 2011) can be downloaded here:
- P2P-MapReduce framework
- TAR.GZ (7.6MB)
Copyright (C) 2008-2011 University of Calabria – Dept. of Electronics, Computer Science and Systems
How to cite
People
This project was designed and developed by Francesco De Luca, Fabrizio Marozzo, Domenico Talia and Paolo Trunfio.