Saturday, December 25, 2010

How to dynamically assign reducers to a Hadoop Job in runtime

When we were working on setting our production jobs we came to a point where we needed a way to dynamically assign reducers to a Hadoop Job in runtime. Following is what we have implemented as of today.  If you are reading the blog and have a better idea to share then please leave a comment.
PROBLEM STATEMENT
We have a lot of cases where same produciton job works on different sizes of data sets, i.e. for an hour the job can process from 35GB to 400Gb data. We wanted to change the number of reducers depending on the data sets, in runtime, and also we did not want a job to hog all reducers because the grid was shared between jobs from different teams. All jobs were equally important.
SOLUTION
This is something that hadoop MR framework cannot do for you. We found that hive came up with a solution by providing a property that limits the maximum number of bytes that will be processed by one reducer hence if this property is set to 1GB and data size is 1.2 GB data then 2 reducers will be assigned to the job in runtime; if data size 10 GB data then 10 reducers will be assigned to the job in runtime.  This works great for hive because it is designed to work agnostic of the dataset. Also a big job in hive can still take all the reducers in the grid. Since we knew our data very well and also did not want a single job to take all reducers hence we decided to implement our own solution that solved our problem.
The solution is very simple. We asked all the job owners to run their jobs in the sandbox on a fixed set of data and provide us either of the following
1.       Required: Input size of the data in Mb
2.       Required: Output size of the data in Mb
3.       Required: Is the reducer calculation CPU bound or I/O bound
4.       Optional: A decimal number, the multiplier,  to fine tune the number of reducers (if not provided then 1 will be used)

OR

1.       Required: Provide fixed number of reducers (they have to be less than TotalReducersOnTheGrid/2)

For CPU bound jobs, total number of reducers were calculated as
Total Reducers = Minimum((InputSizeOfDataInMb/128Mb) * multiplier, TotalReducersOnTheGrid/2)
i.e. total number of reducers should be equal to either the Input data size divided by the HDFS block size multiplied by the multiplier or half the total number of reducers available in the grid, whichever is smaller.

For I/O bound jobs the total number of reducers were calculated using the following formula
Total Reducers = Minimum((OutputDataSize/InputDataSize) * (InputSizeOfDataInMb/128Mb) * multiplier, TotalReducersOnTheGrid/2)
The concept of multiplier was introduced to optimize the jobs when the generic formula was not enough to optimize the number of reducers for the job. We found that some jobs always required an exact number of reducers regardless of the size of data set hence we also provided the job owners a way to specify that.
This pretty much solved most of our problems.


Friday, December 17, 2010

Hadoop cluster at Ebay

I am always curious to know how other companies are installing Hadoop clusters. How are they using its ecosystem. Since Hadoop is still relatively new, there are no best practices. Every company is implementing what they think is the best infrastructure for the Hadoop Cluster.

At Hadoop NYC 2010 conference, ebay showcased there implementation of Hadoop production cluster. Following are some tidbits on ebay's implementation of Hadoop.

- JobTracker, Namenode, Zookeeper, HBase Master are all enterprise nodes running in Sun 64 bit architecture. They are running red hat linux with 72GB Ram and 4TB disks.
- There are 4000 datanodes, each running cent OS with 48 GB RAM and 10TB space
- Ganglia and Nagios are used for monitoring and alerting. Ebay is also building a custom solution to augment them.
- ETL is done using mostly Java Map Reduce programs
- Pig is used to build data pipelines
- Hive is used for AdHoc queries
- Mahout is used for Data Mining

They are toying with the idea of using Oozie to manage work flows but haven't decided to use it yet.

It looks like they are doing all the right things.

Friday, December 10, 2010

ERROR: hdfs.DFSClient: Exception in createBlockOutputStream java.io.IOException: Bad connect ack with firstBadLink

While running a job once I got the following exception


10/12/10 21:09:05 INFO hdfs.DFSClient: Exception in createBlockOutputStream java.io.IOException: Bad connect ack with firstBadLink 10.1.73.148:50010
10/12/10 21:09:05 INFO hdfs.DFSClient: Abandoning block blk_3623545154924652323_87440
10/12/10 21:09:11 INFO hdfs.DFSClient: Exception in createBlockOutputStream java.net.ConnectException: Connection refused
10/12/10 21:09:11 INFO hdfs.DFSClient: Abandoning block blk_-4726571439643867938_87441\


REASON
The error contains the IP address (10.1.73.148) of the tasktracker/datanode machine for which the exception is thrown. The exception is thrown because the datanode daemon is not running on that machine; you can check this by logging into this machine, lets use 10.1.73.148 in the example, and running command
ps -eaf | grep "DataNode" | grep -v "grep"
If no lines are returned then this means that datanode daemon is not running on 10.1.73.148.

What happened is that machine 10.1.73.148 contain a data block that is required for the job that you are trying to run. If this block is replicated on other machines and those machines are running datanode daemons then this is not a problem, Hadoop will get the data block from some other machine and continue the job but if for any reason the data block is not available on any other node then your job will fail.


RESOLUTION
Logon to 10.1.73.148 and run the following command
hadoop-daemon.sh start datanode
The above command should start the datanode daemon on 10.1.73.148. You can double check this my running command
ps -eaf | grep "DataNode" | grep -v "grep"
It should return 1 line

Thats it. Try running the job again. It should not throw exception anymore

How to see table definition (extended) in Hive

To see table definition in Hive, run command
describe table name;

To see more detailed information about the table, run command
describe extended [tablename];

Thursday, December 9, 2010

ERROR: java.lang.IllegalArgumentException: Name cannot be have a '' char

ERROR
Sometimes your Hadoop MapReduce job can fail with the following exception
java.lang.IllegalArgumentException: Name cannot be have a '' char
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:149)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:175)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:352)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:337)
at learnhadoop.MosMapper.map(MosMapper.java:38)
at learnhadoop.MosMapper.map(MosMapper.java:14)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:583)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:170)


REASON
This happens when you use MultipleOutputs class in your Hadoop Job and try to name the output file with non-alphanumeric characters (like : or - etc.)
i.e. <MultipleOutput Object>.write(KEY,VALUE,"Fruit::Mango") will throw this error because you are using colons in the output file name



RESOLUTION
Try to use only alphanumeric characters in the output filename and if you absolutely have to use some kind of delimiter, stick with dot (.)
i.e. <MultipleOutput Object>.write(KEY,VALUE,"Fruit..Mango")  will not throw this error

MultipleOutputs performance issues

MultipleOutputs class in Hadoop API provides a very neat way of separating disparate data but it comes with a performance hit.

I found that some of my production jobs slowed down after I refactored by code to use MultipleOutputs class. I did some benchmarking to ensure that its not the cluster but MultipleOutputs class that slowed my processes down.

I setup a small cluster with just 6 machines and some data
  • 1 machine running JobTracker
  • 1 machine running Namenode
  • 4 machines running Datanodes and tasktracker
  • Input data 8Gb
All machines were of same size and nothing else was running on them during benchmarking. 

Test 1: Mapper without MultipleOutputs
I created a mapper that 
  • Reads a file line by line
  • Creates output file name on the fly by taking first 3 characters of the hash of the input line. This information was not used to write output (because we are not using MultipleOutputs yet).  
  • Write the output key as input line and outputValue as NullWritable
I ran it 5 times and the median runtime was 4m 40s. 

Test 2: MultipleOutputs Mapper
Then I modified the above mapper to use the output file name and write data out using MultipleOutputs. I ran this 5 times and the median runtimes was 5m 48s. 

Based on this benchmark I found that MultipleOutputs slows down a job by almost 20%. 

This happens because more small files are created when you use MultipleOutputs class.
Say you have 50 mappers then assuming that you don't have skewed data, Test1 will always generate exactly 50 files but Test2 will generate somewhere between 50 to 1000 files (50Mappers x 20TotalPartitionsPossible) and this causes a performance hit in I/O. In my benchmark, 199 output files were generated for Test1 and 4569 output files were generated for Test2. 


Wednesday, December 8, 2010

Extended FileUtil class for Hadoop



While writing production jobs in Hadoop I identified following tasks that were required for some MapReduce jobs but were not readily available in Hadoop 0.20 API

  1. Get size of a file or directory in HDFS
    • We require this to dynamically change the number of reducers used for a job by looking at the amount of input data that the job will process
  2. Recursively remove all zero byte files from a directory in HDFS. 
    • This happens a lot when you use MultipleOutput class in reducer (impact is less when used in Mapper). A lot of times the reducer does not gets any record for which a MutipleOutput file needs to be created hence it creates a 0 byte files. These files have no use, its best to remove them after the job is finished. 
  3. Recursively get all subdirectories of a directories 
  4. Recursively get all files within a directory and its sub directories
    • By default, as of now, when Hadoop job is run, it only processes the immediate files under the input directory, any files in the subdirectories of the input path are not processed hence if you want your job to process all files under the subdirectories also then its better to create a comma delimited list of all files within the input path and submit it to the job.  

All the above tasks were implemented in the ExtendedFileUtil class. Source code can be found at
https://sites.google.com/site/hadoopandhive/home/ExtendedFileUtil.java?attredirects=0&d=1

The wrapper class on link http://hadoop-blog.blogspot.com/2010/12/java-templatesstubs-for-mapper-reducer.html contains an example of how to use ExtendedFileUtil class

How to combine small files in Hadoop

Currently Hadoop is not built to work with a lot of small files. The following are the architectural limitations of hadoop that causes this problem

  • In HDFS, all file metadata is stored in memory of the Namenode (which is most often a single big powerful machine). This means "more files=more memory". There is a limitation on the amount of memory you can add to a machine and that limits the amount of files that can be stored in Hadoop. 
  • Namenode is used heavily for all jobs that run on Hadoop. More data in the memory can slow down Namenode and might end of slowing down the job execution time (it might be insignificant for long jobs though)
  • There is a setup time required by Hadoop to run a mapper. By default, Hadoop will start minimum 1 mapper for every file in the input directory. Till Hadoop 0.20, hadoop does not lets us choose the number of mappers you want to run hence if your file is small, say 100K, then more time is wasted in Hadoop setup than actually processing the data.
There are couple different solutions to solve this problem
  • Keep an eye on all data that is entered into HDFS  from other data sources. Try to optimize the processes, that push data to HDFS, to create files of size 128Mb (block size of HDFS). 
  • If you have map reduce pipeline where output of a map reduce job become input of the next map reduce job then try to use reducers wisely in your jobs. If suppose your job uses 100 reducers and outputs files of size 10 MB each, and if the reducer computations are not CPU bound, then try to run the same job with less reducers (7-10). Remember - Hadoop creates one file for every reducer run even if reducer did not output any data. 
  • If all else fails then try to combine small files into bigger files. Media6degrees has come up with a faily good solution to combine small files in Hadoop. You can use their jar straight out. See here for more details http://www.jointhegrid.com/hadoop_filecrush/index.jsp

Java templates/stubs for Mapper Reducer and Wrapper classes

A lot of times I want to test a concept in Hadoop that requires me to quickly create a small job and run it. Every job contains minimum 3 components

  • Mapper Class
  • Reducer Class
  • Wrapper Class
The following are the templates I use to generate empty templates, just replace variable  <YOUNAME> with your class name 


-----------------------------------------------------------------------------------------------------------------------------------
MAPPER
-----------------------------------------------------------------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

/* In case you are using Multiple outputs */
//import org.apache.hadoop.io.NullWritable;
//import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class <YOUNAME>Mapper extends Mapper<LongWritable, Text, Text, Text> {
    private Configuration conf;
    private Text outputKey = new Text();
    private Text outputValue = new Text();
    private String line = null;

    /* In case you are using Multiple outputs */
    //private NullWritable outputValue = NullWritable.get();
    //private MultipleOutputs<Text, Text> contextMulti = null;

    @Override
    public void setup(Mapper.Context context) {
        this.conf = context.getConfiguration();

        /* In case you are using Multiple outputs */
        //contextMulti = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void map(LongWritable key, Text values, Context context)
            throws IOException, InterruptedException {
    }

    @Override
    public void cleanup (Mapper.Context context)throws IOException, InterruptedException {
        
        /* In case you are using Multiple outputs */
        //contextMulti.close();
    }
}

-----------------------------------------------------------------------------------------------------------------------------------
REDUCER
-----------------------------------------------------------------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/* In case you are using Multiple outputs */
//import org.apache.hadoop.io.NullWritable;
//import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class <YOUNAME>Reducer extends Reducer<Text, Text, Text, Text> {
    private Configuration conf;
    private Text outputKey = new Text();
    private Text outputValue = new Text();
    private String line = null;

    /* In case you are using Multiple outputs */
    //private NullWritable outputValue = NullWritable.get();
    //private MultipleOutputs<Text, Text> contextMulti = null;

    @Override
    public void setup(Reducer.Context context) {
        this.conf = context.getConfiguration();

        /* In case you are using Multiple outputs */
        //contextMulti = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
    }

    @Override
    public void cleanup(Reducer.Context context) {
        /* In case you are using Multiple outputs */
        //contextMulti.close();
    }
}
-----------------------------------------------------------------------------------------------------------------------------------
WRAPPER
This class uses following 2 classes 
https://sites.google.com/site/hadoopandhive/home/ExtendedFileUtil.java?attredirects=0&d=1
https://sites.google.com/site/hadoopandhive/home/StringUtil.java?attredirects=0&d=1
-----------------------------------------------------------------------------------------------------------------------------------



import StringUtil;

import ExtendedFileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import java.io.IOException;
import java.text.ParseException;


public class <YOUNAME> extends Configured implements Tool, Constants {
    private Configuration conf = null;
    private Job job = null;
    private String inputDirList = null;
    private String outputDir = null;
    private String[] filesToProcess = null;
    private int totalReducers = 0;
    private int jobRes = 0;
    private ExtendedFileUtil fileUtil = new ExtendedFileUtil();


    public static void main(String[] args) throws Exception {
        <YOUNAME> ob = new <YOUNAME>();
        int jobRes = ToolRunner.run(ob, args);        
    }


    public int run(String[] args)
            throws ClassNotFoundException, IOException, InterruptedException, ParseException {
        jobRes = readCmdArgs(args);
        if (jobRes == 0) {
            jobRes = readConfig();
        }
        if (jobRes == 0) {
            jobRes = runMrJob();
        }
        return jobRes;
    }


    private int readCmdArgs(String[] args) {
        if (args.length == 2) {
            inputDirList = args[0];
            outputDir = args[1];
        } else {
            printUsage();
            System.exit(1);
        }
        return 0;
    }


    private int readConfig() throws IOException, InterruptedException, ClassNotFoundException {
        conf = new Configuration();
        //conf.set("SET_NEW_CONFIG_NAME", SET_NEW_CONFIG_VALUE);
        job = new Job(conf);
        if ((job.getJar() == null) || (job.getJar() == "")) {
            job.setJarByClass(<YOUNAME>.class);
        }
        return 0;
    }


    private int runMrJob()
            throws IOException, InterruptedException, ClassNotFoundException {
        filesToProcess = fileUtil.getFilesOnly(inputDirList, true);
        job.setJobName("<YOUNAME>");
        TextInputFormat.addInputPaths(job, StringUtil.arrayToString(filesToProcess, ","));
        TextOutputFormat.setOutputPath(job, new Path(outputDir));
        System.out.println("Input Dir: " + inputDirList);
        System.out.println("Output Dir: " + outputDir);


        job.setMapperClass(<YOUNAME>Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);


        job.setReducerClass(<YOUNAME>Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        totalReducers = Math.round((fileUtil.size(inputDirList) / 134217728) * 0.1F);
        totalReducers = Math.max(totalReducers, 1);
        job.setNumReduceTasks(totalReducers );
        deleteOutputDirectory(outputDir);
        jobRes = job.waitForCompletion(true) ? 0 : 1;
        deleteLogsDirectory();
        fileUtil.removeAllZeroByteFiles(outputDir);
        return 0;
    }




    private int deleteOutputDirectory(String outputDir) throws IOException {
        fileUtil.removeHdfsPath(new Path(outputDir).toString());
        return 0;
    }


    private int printUsage() {
        System.out.println("USAGE: <YOUNAME> <inputDirList> <outputDir>");
        return 0;
    }


    private int deleteLogsDirectory()
            throws IOException {
        Path outputLogPath = new Path(new Path(outputDir).toString() + "/" + "_logs");
        fileUtil.removeHdfsPath(outputLogPath.toString());
        return 0;
    }
}


How configure Secondary namenode on a separate machine

If you have installed cloudera's hadoop distribution (CDH2) then you must have noticed that running command start-dfs.sh starts an instance of SecondaryNameNode process on all the datanodes. This is happening due to the way SecondaryNameNode startup is defined in file bin/start-dfs.sh. 


Scenario 1 : If you want to run your SecondaryNameNode on some other server (say sn.jeka.com) instead of the datanodes then do the following 

1. Logon to JobTracker (I am going to JobTracker because I have set variable HADOOP_MASTER in file ${HADOOP_HOME}/conf/hadoop-env.sh to point to the JobTracker hence any changes made there will be synched to your cluster) 
  • Create a new file ${HADOOP_HOME}/conf/secondarynamenode and add following line
    sn.jeka.com
  • In file ${HADOOP_HOME}/bin/start-dfs.sh, replace line
    "$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR --hosts masters start secondarynamenode
    with
    ssh $(cat $HADOOP_CONF_DIR/secondarynamenode) "${bin}/hadoop-daemon.sh --config $HADOOP_CONF_DIR --hosts secondarynamenode start secondarynamenode;exit"
  • In file ${HADOOP_HOME}/bin/stop-dfs.sh, replace line
    "$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR --hosts masters stop secondarynamenode
    with
    ssh $(cat $HADOOP_CONF_DIR/secondarynamenode) "${bin}/hadoop-daemon.sh --config $HADOOP_CONF_DIR --hosts secondarynamenode stop secondarynamenode;exit"

2.  Logon to Namenode and execute the following commands
  • ${HADOOP_HOME}/bin/stop-dfs.sh; ${HADOOP_HOME}/bin/start-dfs.sh; ${HADOOP_HOME}/bin/stop-dfs.sh; ${HADOOP_HOME}/bin/start-dfs.sh
You have to start and stop twice because in the first start, the code will be synched from JobTracker 
Thats! it. You secondary name node process will now start on the designated server, i.e. sn.jeka.com and not on the datanodes. 

Scenario 2 : If you want to run your SecondaryNameNode on the NameNode (say nn.jeka.com) itself then do the following 
Follow same steps as Scenario 1 except that replace all intances of sn.jeka.com to nn.jeka.com

Scenario 3 : If you do not want to run secondary name node at all then do the following
Follow same steps as Scenario 1 except that instead of replacing lines, delete them. 


Tuesday, December 7, 2010

How to control a Hadoop job using the web interfaces provided by the Job Tracker and Name Node

Hadoop provides a great way to manage your jobs and operating on HDFS using the web interface by setting the property webinterface.private.actions to true in file src/core/core-default.xml. 


When set to true, the web interfaces of JobTracker and NameNode may contain  actions, such as kill job, delete file, etc., that should  not be exposed to public.


Note: Enable this option only if the web interfaces for JobTracker and Name node are reachable by those who have the right authorizations. 

<property>
  <name>webinterface.private.actions</name>
  <value>false</value>
</property>

How to limit access to Job Tracker and Name node in Hadoop.

Hadoop provides properties that can be used to create an include or exclude list of hosts that are allowed to access Job Tracker and Name node.

Property to to create an include list of hosts for JobTracker in file mapred-site.xml
<property>
  <name>mapred.hosts</name>
  <value></value>
  <description>Names a file that contains the list of nodes that may
  connect to the jobtracker.  If the value is empty, all hosts are
  permitted.</description>
</property>

Property to to create an exclude list of hosts for JobTracker in file mapred-site.xml
<property>
  <name>mapred.hosts.exclude</name>
  <value></value>
  <description>Names a file that contains the list of hosts that
  should be excluded by the jobtracker.  If the value is empty, no
  hosts are excluded.</description>
</property>


Property to to create an include list of hosts for NameNode in file hdfs-site.xml
<property>
  <name>dfs.hosts</name>
  <value></value>
  <description>Names a file that contains a list of hosts that are
  permitted to connect to the namenode. The full pathname of the file
  must be specified.  If the value is empty, all hosts are
  permitted.</description>
</property>

Property to to create an exclude list of hosts for NameNode in file hdfs-site.xml
<property>
  <name>dfs.hosts.exclude</name>
  <value></value>
  <description>Names a file that contains a list of hosts that are
  not permitted to connect to the namenode.  The full pathname of the
  file must be specified.  If the value is empty, no hosts are
  excluded.</description>
</property>

Hadoop distcp error: java.lang.NumberFormatException: For input string: ""

Hadoop provide distcp command to copy data between clusters.


ERROR
When running this command I got the following error
java.lang.NumberFormatException: For input string: ""
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
        at java.lang.Integer.parseInt(Integer.java:470)
        at java.lang.Integer.parseInt(Integer.java:499)
        at org.apache.hadoop.net.NetUtils.createSocketAddr(NetUtils.java:149)
        at org.apache.hadoop.hdfs.server.namenode.NameNode.getAddress(NameNode.java:164)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:81)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1448)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:67)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:1476)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1464)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:197)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:175)
        at org.apache.hadoop.tools.DistCp.setup(DistCp.java:997)
        at org.apache.hadoop.tools.DistCp.copy(DistCp.java:650)
        at org.apache.hadoop.tools.DistCp.run(DistCp.java:857)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.tools.DistCp.main(DistCp.java:884)

REASON
This error normally happens when the port number in source or destination hdfs URI is missing.


Example: Execution of the following command will give this error
hadoop distcp hdfs://hadoop1.jeka.com:9000/user/hadoop/jeka hdfs://hadoop2.jeka.com:/user/hadoop/jeka

It happened because port in the destination URL /hdfs://hadoop2.jeka.com:/user/hadoop/jeka is missing but the colon that separates 

RESOLUTION
Add port number 9000 after the colon, i.e. the destination URI will look like hdfs://hadoop2.jeka.com:9000/user/hadoop/jeka 

or remove the colon, distcp by default looks for port 9000, hence the destination URI will look like 
hdfs://hadoop2.jeka.com/user/hadoop/jeka 

Thursday, December 2, 2010

How to check if network is the bottleneck in your hadoop cluster

We had an issue with our hadoop cluster that after a network upgrade the jobs on the cluster started running very slowly. We knew it had to do something with the network upgrade.

On further investigation we found that all the mapper in the jobs are running fine, all the performance degradation happened on reducers. This pretty much proved the point that network is the bottleneck because most mappers jobs were rack-local (and it requires very limited to almost no use of network) but the reducer phase uses network heavily for sorting and shuffling of data before it can apply the reduce function.

Wednesday, December 1, 2010

Search engine for Hadoop content, you might find it better than google

There is an excellent site to get help online for Hadoop, after the Apache wiki. The website http://search-hadoop.com lets you search Hadoop and all its subprojects  like Pig, Hive, Hbase HDFS etc.

The website aggregates, indexes, and makes searchable all content repositories for all Apache Hadoop Top Level Project (TLP) sub-projects. It indexes data from
- User lists
- Dev Lists
- Apache Hadoop Jira
- Apache Wiki
- Hadoop source code

Its a very good website. I compared some search results on this site with google and bing results and found that, for hadoop , its easier to get answers on this site then from google or bing. The search results are very well laid. See it to believe it.

Error in starting Datanode - ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible namespaceIDs in

While playing around with Hadoop setup I found that after I did something, the datanodes will not start when I run script bin/start-dfs.sh.

Following were the steps I followed to troubleshoot and fix the issue

TO FIND THE PROBLEM
  1. Login to any one of the datanodes that is not starting up (say t1.mydomain.com)
  2. Get the value of variable HADOOP_LOG_DIR in file conf/hadoop-env.sh. Say the value is /home/jeka/runtime_hadoop_data/logs
  3. Look into datanode log file /home/jeka/runtime_hadoop_data/logs/hadoop-jeka-datanode-t1.mydomain.com.log. Following error was logged
    2010-12-01 18:57:39,115 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible namespaceIDs in /home/jeka/runtime_hdfs/datanode: namenode namespaceID = 1509057607; datanode namespaceID = 1781994419
    at org.apache.hadoop.hdfs.server.datanode.DataStorage.doTransition(DataStorage.java:233)
    at org.apache.hadoop.hdfs.server.datanode.DataStorage.recoverTransitionRead(DataStorage.java:148)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.startDataNode(DataNode.java:298)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.
    (DataNode.java:216)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.makeInstance(DataNode.java:1283)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.instantiateDataNode(DataNode.java:1238)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.createDataNode(DataNode.java:1246)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.main(DataNode.java:1368)

REASON
  1. This issue happened right after I reformatted the namenode (and did not reformat datanodes). Everytime Namenode is formatted, Hadoop creates a unique namespaceID and places it in a file in the Namenode but since I did not formatted the datanodes hence datanodes still had the old namespaceID and hence the problem. 


SOLUTION 
The solution is to copy the new namespaceID from Namenode to Datanodes. Following are the steps
  1. Logon to the namenode (say nn.mydomain.com) 
  2. Stop DFS by running command bin/stop-dfs.sh
  3. Find the values of property dfs.name.dir and dfs.data.dir. This can be found in file conf/hdfs-site.xml. If missing then look for it in file src/hdfs/hdfs-default.xml. Say the value of these properties are /home/jeka/runtime_hdfs/namenode and /home/jeka/runtime_hdfs/datanode respectively
  4. Note the value of field namespaceID (this is the new Namenode namespaceID that we need to copy to all datanodesin file /home/jeka/runtime_hdfs/namenode/current/VERSION.

    In our case its 1509057607.

    For your reference, following are all the contents of this file
    #Wed Dec 01 19:05:31 UTC 2010
    namespaceID=1509057607
    cTime=0
    storageType=NAME_NODE
    layoutVersion=-18
  5. Now copy the new namespaceID,1509057607 in our case, to file /home/jeka/runtime_hdfs/datanode/current/VERSION on all datanodes by running the following command on shell prompt on Namenode

    for dn in $(cat ~/hadoop-0.20.2/conf/slaves);do ssh $dn "cat /home/jeka/runtime_hdfs/datanode/current/VERSION | sed 's/namespaceID=[0-9]*/namespaceID=1509057607/' > /home/jeka/runtime_hdfs/datanode/current/VERSION.temp; mv /home/jeka/runtime_hdfs/datanode/current/VERSION.temp /home/jeka/runtime_hdfs/datanode/current/VERSION";done

    This command will go to each and every datanode listed in file conf/slaves and change the namespaceID of the datanode to 1509057607
  6. Start DFS by running command bin/start-dfs.sh. Thats it!, all datanodes should be up and running now.


How to start and stop datanodes and tasktrackers without bringing down the cluster

Assuming that Hadoop and Java is set up on the nodes, following are the commands that can be used to start and stop datanodes and tasktrackers.

cd <hadoop_installation>/bin

- To start datanode
hadoop-daemon.sh start datanode

- To start tasktracker
hadoop-daemon.sh start tasktracker



- To stop datanode (remove the datanode from the cluster)
hadoop-daemon.sh stop datanode

- To stop tasktracker (remove tasktracker from the cluster)
hadoop-daemon.sh stop tasktracker

How to see the HDFS statistics of a Hadoop cluster

When you maintain a cluster then often you want to know the stats of HDFS, things like total capacity of HDFS, percentage of HDFS used and free, list of all data nodes that are active, list of all datanodes that are not active etc.

The following command provides this information
hadoop dfsadmin -report


If your hadoop cluster has a lot of datanodes (which most likely will be the case in a production environment) then this will be a huge report. You will have to create some type of summary report on your own.