
Howto: Hadoop Streaming with Ruby and Wukong

Introduction

It’s been a while since Google introduced their MapReduce framework for distributed computing on large data sets across clusters of computers. Paradigma Labs had been thinking of trying Apache Hadoop for this kind of task, and a web scraping job we had at hand seemed like the perfect candidate. I was the designated developer, and my love for Ruby on Rails led me to give Hadoop Streaming a try so I could write the scripts in Ruby instead of Java. Many will agree on the virtues of Ruby, especially considering gems like Mechanize for web scraping and Rails gems like ActiveResource for accessing REST web services and ActiveRecord for ORM with a database like MySQL.

All the howto steps after the jump.

The first step is to develop your Ruby script following the MapReduce model. Hadoop Streaming is very powerful but also complex, so I recommend using the Wukong Ruby gem as an abstraction layer; you can write your map and reduce methods easily by following the Wukong tutorial.
This is the example from that tutorial, which counts how often each word appears in a text:

word_count_example.rb

require 'wukong'
module WordCount
  class Mapper < Wukong::Streamer::LineStreamer
    # Emit each word in the line.
    def process line
      words = line.strip.split(/\W+/).reject(&:blank?)
      words.each{|word| yield [word, 1] }
    end
  end

  class Reducer < Wukong::Streamer::ListReducer
    def finalize
      yield [ key, values.map(&:last).map(&:to_i).sum ]
    end
  end
end

Wukong::Script.new(
  WordCount::Mapper,
  WordCount::Reducer
).run # Execute the script
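
Wukong also offers a local run mode that is handy for sanity-checking the script before touching the cluster; it simply pipes your mapper and reducer through a local sort instead of Hadoop. The input and output paths below are just placeholders:

ruby word_count_example.rb --run=local some_input.txt local_output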

Hadoop Setup

We will use Linux on all the hosts. In our case, one host will be the master (acting as namenode and jobtracker) and the rest will be the slaves (acting as datanodes and tasktrackers). Unless otherwise indicated, ALL the steps must be repeated on every host.

These steps were tested with specific versions and environments; please see the end of the post for a list of links with further information and troubleshooting (unfortunately, the system is complex and many things can go wrong).

To avoid interfering with other applications running on the cluster hosts, we set everything up under a dedicated hadoop system user.

AS ROOT OR USING SUDO:

Install necessary packages

apt-get install rsync openssh-client openssh-server

If your hosts don’t have resolvable names, you can edit your /etc/hosts file and add the mapping of IP addresses to names. For example, to add a host named host1 with the IP address 1.2.3.4, add this line to /etc/hosts:

1.2.3.4 host1

Create hadoop user

adduser hadoop

Create the hadoop folders and make hadoop their owner:

mkdir /usr/local/hadoop
mkdir /usr/local/hadoop-datastore
chown hadoop:hadoop /usr/local/hadoop
chown hadoop:hadoop /usr/local/hadoop-datastore

Log in AS THE HADOOP USER we have just created:

su - hadoop

Set up ssh logins without a passphrase from the master to the slaves (only on the MASTER host):
Generate keys:

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

Add the public key: run this for every slave, replacing ‘hostname’ with the name of the slave:

ssh hadoop@hostname "echo `cat ~/.ssh/id_rsa.pub` >> ~/.ssh/authorized_keys"
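
You can verify that the passwordless login works; this should open a shell on the slave without asking for a password:

ssh hadoop@hostname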

Download and install Java JRE

Download a Java JRE. In my case, the version was jre-6u25-linux-x64.bin

Add user executable permission to the installer file:

chmod 744 jre-6u25-linux-x64.bin

Run the installer (this will put the files in /home/hadoop/jre1.6.0_25):

./jre-6u25-linux-x64.bin
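
You can check that the JRE is usable (the path assumes the version above):

/home/hadoop/jre1.6.0_25/bin/java -version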

Download and install Hadoop

Download Hadoop. In my case, the version was hadoop-0.21.0.tar.gz

Unpack the file and copy the contents to the installation directory:

tar -xzf hadoop-0.21.0.tar.gz
cp -R hadoop-0.21.0/* /usr/local/hadoop

Download and install RVM

bash < <(curl -s https://rvm.beginrescueend.com/install/rvm)

If necessary, add this line to the end of your /home/hadoop/.profile to load RVM into the session:

[[ -s "/home/hadoop/.rvm/scripts/rvm" ]] && source "/home/hadoop/.rvm/scripts/rvm"  # This loads RVM into a shell session.

If it worked, this should output “rvm is a function”:

type rvm | head -1

Install Ruby 1.9.2 and make it the default

rvm install 1.9.2

rvm --default use 1.9.2

This should output something like “/home/hadoop/.rvm/rubies/ruby-1.9.2-p180/bin/ruby”

which ruby

Configure Hadoop

File /usr/local/hadoop/conf/hadoop-env.sh

Replace this:

# The java implementation to use.  Required.
# export JAVA_HOME=/usr/lib/j2sdk1.6-sun

With this:

# The java implementation to use.  Required.
export JAVA_HOME=/home/hadoop/jre1.6.0_25

And add the RVM line at the end:

[[ -s "/home/hadoop/.rvm/scripts/rvm" ]] && source "/home/hadoop/.rvm/scripts/rvm"  # This loads RVM into a shell session.

File /usr/local/hadoop/conf/core-site.xml

Replace ‘mastername’ with the name of the master host:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/local/hadoop-datastore/hadoop-${user.name}</value>
    <description>A base for other temporary directories.</description>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://mastername:54310</value>
    <description>The name of the default file system.  A URI whose
      scheme and authority determine the FileSystem implementation.  The
      uri's scheme determines the config property (fs.SCHEME.impl) naming
      the FileSystem implementation class.  The uri's authority is used to
      determine the host, port, etc. for a filesystem.</description>
  </property>
</configuration>

File /usr/local/hadoop/conf/hdfs-site.xml

Replace ‘replication_number’ with the number of replications you want for HDFS; in our case we used 3 for a cluster with 1 master host and 3 slave hosts:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>replication_number</value>
    <description>Default block replication.
      The actual number of replications can be specified when the file is created.
      The default is used if replication is not specified in create time.
    </description>
  </property>
</configuration>

File /usr/local/hadoop/conf/mapred-site.xml

Replace ‘mastername’ with the name of the master host, ‘map_number’ with 10x the number of slaves, and ‘reduce_number’ with 2x the number of slaves. For example, with 3 slaves you would set map_number to 30 and reduce_number to 6.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>mapreduce.jobtracker.address</name>
    <value>mastername:54311</value>
    <description>The host and port that the MapReduce job tracker runs
      at.  If "local", then jobs are run in-process as a single map
      and reduce task.
    </description>
  </property>
  <property>
    <name>mapred.local.dir</name>
    <value>${hadoop.tmp.dir}/mapred/local</value>
    <description>Determines where temporary MapReduce data is written. It also may be a list of directories.
    </description>
  </property>
  <property>
    <name>mapred.map.tasks</name>
    <value>map_number</value>
    <description>As a rule of thumb, use 10x the number of slaves (i.e., number of tasktrackers).
    </description>
  </property>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>reduce_number</value>
    <description>As a rule of thumb, use 2x the number of slave processors (i.e.,number of tasktrackers).
    </description>
  </property>
</configuration>

Hadoop bug fixing

In case bug #6953 in Hadoop has not been fixed yet in your version, you will need to edit these 2 files so that they look like the versions below:

File /usr/local/hadoop/bin/hdfs-config.sh
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# included in all the hdfs scripts with source command
# should not be executed directly

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

export HADOOP_HDFS_HOME="${HADOOP_HDFS_HOME:-$bin/..}"

if [ -d "${HADOOP_COMMON_HOME}" ]; then
  . "$HADOOP_COMMON_HOME"/bin/hadoop-config.sh
elif [ -d "${HADOOP_HOME}" ]; then
  . "$HADOOP_HOME"/bin/hadoop-config.sh
elif [ -e "${HADOOP_HDFS_HOME}"/bin/hadoop-config.sh ]; then
  . "$HADOOP_HDFS_HOME"/bin/hadoop-config.sh
else
  echo "Hadoop common not found."
  exit
fi

File /usr/local/hadoop/bin/mapred-config.sh
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# included in all the mapred scripts with source command
# should not be executed directly

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

export HADOOP_MAPRED_HOME="${HADOOP_MAPRED_HOME:-$bin/..}"

if [ -d "${HADOOP_COMMON_HOME}" ]; then
  . "$HADOOP_COMMON_HOME"/bin/hadoop-config.sh
elif [ -d "${HADOOP_HOME}" ]; then
  . "$HADOOP_HOME"/bin/hadoop-config.sh
elif [ -e "${HADOOP_MAPRED_HOME}"/bin/hadoop-config.sh ]; then
  . "$HADOOP_MAPRED_HOME"/bin/hadoop-config.sh
else
  echo "Hadoop common not found."
  exit
fi

Master and slaves name specification

Only on the MASTER host:
Edit the file /usr/local/hadoop/conf/masters and replace ‘localhost’ with the name of the master host.
Edit the file /usr/local/hadoop/conf/slaves and replace ‘localhost’ with the names of the slave hosts, one per line (see the example below).
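
For example, for a master named master1 and three slaves named slave1, slave2 and slave3 (these hostnames are only placeholders), the two files would look like this:

File /usr/local/hadoop/conf/masters

master1

File /usr/local/hadoop/conf/slaves

slave1
slave2
slave3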

Install the gems

Wukong
gem install wukong

Only on the MASTER host:

If pull request #4 for Wukong has not been merged yet in your version, edit the file /home/hadoop/.rvm/gems/ruby-1.9.2-p180/gems/wukong-2.0.0/lib/wukong/script/hadoop_command.rb and replace this part:

[...]
def execute_hadoop_workflow
  # Input paths join by ','
  input_paths = @input_paths.join(',')
  #
  # Use Settings[:hadoop_home] to set the path your config install.
  hadoop_commandline = [
    hadoop_runner,
    "jar #{options[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar",
    hadoop_jobconf_options,
[...]

with this:

[...]
def execute_hadoop_workflow
  jar_path = File.join('contrib', 'streaming', 'hadoop-*streaming*.jar')
  jar_path = File.join('mapred', jar_path)

  # Input paths join by ','
  input_paths = @input_paths.join(',')
  #
  # Use Settings[:hadoop_home] to set the path your config install.
  hadoop_commandline = [
    hadoop_runner,
    "jar #{File.join(options[:hadoop_home], jar_path)}",
    hadoop_jobconf_options,
[...]

Rest of the gems

Remember to install all the gems your script needs on every host, master and slaves alike. You can run “gem install name_of_the_gem” for every gem on every host, but I strongly recommend using Bundler to make it easy to manage all the gems with their corresponding versions.
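
A minimal Gemfile for the word count example might look like the sketch below (the gem list is only an illustration; add whatever gems your own script needs). Put it in your project folder and run “bundle install” on every host:

source 'https://rubygems.org'

gem 'wukong', '2.0.0'
# gem 'mechanize'       # e.g. for web scraping jobs
# gem 'activeresource'  # e.g. for REST web services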

Run the script (finally!)

ALL THESE STEPS ARE RUN ONLY ON THE MASTER HOST

Remember that you have many useful logs in /usr/local/hadoop/logs (especially the stderr and stdout files from each job in /usr/local/hadoop/logs/userlogs).

Clean temporary files and storage (BE CAREFUL, THIS WILL ERASE THE DATA IN THERE):

rm -Rf /tmp/hadoop* ; rm -Rf /usr/local/hadoop-datastore/*

Format the namenode:

/usr/local/hadoop/bin/hadoop namenode -format

Start the HDFS:

/usr/local/hadoop/bin/start-dfs.sh

Wait around 5 minutes, until you see the files in /usr/local/hadoop/logs being updated.

Upload the file with your input data to the HDFS. For example, if your file is in /home/hadoop/hadoop_input/input_file.txt

/usr/local/hadoop/bin/hadoop fs -put /home/hadoop/hadoop_input/input_file.txt /user/hadoop/hadoop_input/input_file.txt

Start the mappers-reducers:

/usr/local/hadoop/bin/start-mapred.sh

Run your Ruby Wukong script. For example, if the script is the one from the beginning of this post, located at /home/hadoop/word_count_example.rb, and you want the output folder in the HDFS to be /user/hadoop/hadoop_output

export HADOOP_HOME="/usr/local/hadoop"
ruby /home/hadoop/word_count_example.rb --run hadoop_input/input_file.txt hadoop_output

Allow “some” time for the task to run :-)

You can show the contents of the output folder with:

/usr/local/hadoop/bin/hadoop dfs -ls /user/hadoop/hadoop_output

You can show the contents of a file. For example, if the file in the HDFS is /user/hadoop/hadoop_output/part1.txt

/usr/local/hadoop/bin/hadoop dfs -cat /user/hadoop/hadoop_output/part1.txt

For more operations on the HDFS, check the Hadoop File System Shell Guide.

Stopping Hadoop

ALL THESE STEPS ARE RUN ONLY ON THE MASTER HOST

Stop the mappers-reducers:

/usr/local/hadoop/bin/stop-mapred.sh

Stop the HDFS:

/usr/local/hadoop/bin/stop-dfs.sh

Links for further information and troubleshooting

General

Hadoop Streaming
Wukong
Hadoop File System Shell Guide

Installation help

Hadoop Cluster Setup
Wukong Install Notes

Bugs and fixes

Hadoop bug #6953
Wukong pull request #4

MapReduce

Google MapReduce tutorial
MapReduce on Wikipedia
Apache Hadoop

Ruby gems for web scraping

Mechanize
Rails ActiveResource
Rails ActiveRecord

Sweeties!

Here is a treat for the brave readers who have made it this far: Google Lectures: Cluster Computing and MapReduce, with all the lecture videos, slides and related articles.

About the authors

Oscar Marín Miró

3 comments

  1. Well I really liked reading it. This subject offered by you is very helpful for good planning.

  2. I am glad you liked it. There are a couple of things that I would like to mention:

    1.- In the section “Run the script (finally!)”, the command to clean temporary files and storage must be run on the slaves as well as on the master, not only on the master. Also, beware of starting Hadoop when another Hadoop process is still running (you can check for running processes with the command ‘ps axu | grep hadoop’).

    2.- In case you are running a task that has frequent failures (such as web scraping, in our case), add these 2 properties to /usr/local/hadoop/conf/mapred-site.xml (replacing ‘allowed_failures’ with the number of attempts before killing the job):

    <property>
      <name>mapred.map.max.attempts</name>
      <value>allowed_failures</value>
      <description>Maximum number of attempts per task before killing the job.</description>
    </property>
    <property>
      <name>mapred.reduce.max.attempts</name>
      <value>allowed_failures</value>
      <description>Maximum number of attempts per task before killing the job.</description>
    </property>

    Check this for more details about Hadoop fault tolerance: http://devblog.factual.com/practical-hadoop-streaming-dealing-with-brittle-code

  3. Sally May says:

    Greetings from Brisbane. Thanks for the info. Very useful for my university assignment :)
