Tuesday, November 30, 2010

For beginners, quickly install Hadoop 0.20 on Linux in cluster mode

It's very easy to set up a small Hadoop cluster on Linux for testing and development purposes. In this blog post I will demonstrate how to set up a small Hadoop 0.20 cluster in 10 easy steps. You will be able to set it up in less than an hour.

SCENARIO
In the cluster we have 5 machines as follows
#   SERVER NAME        HADOOP COMPONENT     SHORT NAME
1   jt.mydomain.com    Jobtracker           jt
2   nn.mydomain.com    Namenode             nn
3   sn.mydomain.com    Secondary Namenode   sn
4   tt1.mydomain.com   Tasktracker 1        tt1
5   tt2.mydomain.com   Tasktracker 2        tt2


STEPS

1. Download Hadoop and Java to all machines 
See the INSTALL section on  http://hadoop-blog.blogspot.com/2010/11/how-to-install-standalone-hadoop-for.html to get more details. For the purpose of this document we will assume that hadoop is installed in directory /home/${USER}/hadoop-0.20.2 and java is installed in directory /home/${USER}/jdk1.6.0_22

2. Ensure that machines in the cluster can see each other
 Set up passwordless ssh between the following machine pairs
  1. jt to nn 
  2. jt to sn
  3. jt to tt1
  4. jt to tt2
  5. nn to jt
  6. nn to sn
  7. nn to tt1
  8. nn to tt2
  9. sn to nn
  10. sn to jt

3. Set up the Namenode
On jt.mydomain.com, overwrite file /home/${USER}/hadoop-0.20.2/conf/core-site.xml with following lines
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://nn.mydomain.com/</value>
</property>
</configuration>

4. Set up path to Java, Master and Slave directories/files 
In Hadoop, the JobTracker and Namenode are called masters, and the Tasktrackers and Datanodes are called slaves. Every slave runs both a Datanode and a Tasktracker.
On jt.mydomain.com, add the following 3 lines to file /home/${USER}/hadoop-0.20.2/conf/hadoop-env.sh
export JAVA_HOME=/home/${USER}/jdk1.6.0_22
export HADOOP_SLAVES=${HADOOP_HOME}/conf/slaves
export HADOOP_MASTER=jt.mydomain.com:/home/${USER}/hadoop-0.20.2

    5. Set up Jobtracker
    On jt.mydomain.com, overwrite file /home/${USER}/hadoop-0.20.2/conf/mapred-site.xml with following lines
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
      <property>
        <name>mapred.job.tracker</name>
        <value>jt.mydomain.com:9001</value>
      </property>
    </configuration>

    6. List Masters
    On jt.mydomain.com, overwrite file /home/${USER}/hadoop-0.20.2/conf/masters with following line 
    sn.mydomain.com

    7. List Slaves
    On jt.mydomain.com, overwrite file /home/${USER}/hadoop-0.20.2/conf/slaves with following lines
    tt1.mydomain.com
    tt2.mydomain.com

    8. Format Namenode
    Format the namenode by running the following command on nn.mydomain.com
    /home/${USER}/hadoop-0.20.2/bin/hadoop namenode -format


    9. Start DFS
    On the nn.mydomain.com command prompt, run the following command to start the HDFS daemons on the Namenode and Datanodes; it will also start the Secondary Namenode
    sh /home/${USER}/hadoop-0.20.2/bin/start-dfs.sh

    10. Start MapReduce
    On jt.mydomain.com command prompt, run the following command to start MapReduce daemon on Jobtracker and tasktrackers.
    sh /home/${USER}/hadoop-0.20.2/bin/start-mapred.sh

    That's it! The Hadoop cluster is up and running now.

    NOTES

    • The cluster is defined in files slaves and masters
    • You can use IP addresses instead of host names, i.e. 10.2.3.4 instead of jt.mydomain.com
    • After you execute steps #9 and #10, you will notice that all files that were updated in steps #3-7 on jt.mydomain.com are also updated on the nn, sn, tt1 and tt2 machines. This happens because we set the property HADOOP_MASTER to jt.mydomain.com in step #4. Setting this property means: use the config files on jt.mydomain.com as the master copies and sync them to every node in the cluster.
    • Jobtracker WebUI should be up and running on http://jt.mydomain.com:50030/jobtracker.jsp
    • HDFS WebUI should be up and running on http://nn.mydomain.com:50070/dfshealth.jsp
    I also found another excellent tutorial, better than my blog post for sure. 

    Error in starting jobtracker or namenode: rsync: connection unexpectedly closed (0 bytes received so far) [receiver]

    After setting up the hadoop cluster, it is possible that you get the error rsync: connection unexpectedly closed (0 bytes received so far) [receiver] or unexplained error (code 255) at io.c(463) [receiver=2.6.8] when you try to start the mapred daemon (start-mapred.sh), the dfs daemon (start-dfs.sh) or all daemons (start-all.sh).


    This means that something is wrong with the ssh connections in the cluster. For rsync (and hadoop in cluster mode) to work, you should be able to ssh between the following hadoop components without any password or prompts.
    - Jobtracker to Tasktrackers
    - Jobtracker to Namenode
    - Namenode to Datanodes
    - Namenode to Jobtracker
    - Datanodes to Namenode
    - Tasktrackers to Jobtracker


    Once ssh is working in all six directions above, these errors should go away.

    EXAMPLE
    Suppose a hadoop cluster is composed of the following machines
    j.jeka.com : Jobtracker
    n.jeka.com : Name node
    t1.jeka.com : Datanode and Tasktracker
    t2.jeka.com : Datanode and Tasktracker

    then the following ssh connections should work
    Jobtracker to Namenode
    j.jeka.com > n.jeka.com

    Jobtracker to Tasktrackers
    j.jeka.com > t1.jeka.com
    j.jeka.com > t2.jeka.com

    Namenode to Jobtracker

    n.jeka.com > j.jeka.com

    Namenode to DataNodes
    n.jeka.com > t1.jeka.com
    n.jeka.com > t2.jeka.com

    Datanodes to NameNode

    t1.jeka.com > n.jeka.com
    t2.jeka.com > n.jeka.com

    Tasktrackers to Jobtracker

    t1.jeka.com > j.jeka.com
    t2.jeka.com > j.jeka.com


    Please note that it is not required to be able to ssh from one task tracker to another.




    Hadoop Administrator Interview Questions Part 1


    Following are some questions and answers to ask a Hadoop Administrator Interviewee

    Q1. What are the default configuration files that are used in Hadoop 
    As of 0.20 release, Hadoop supported the following read-only default configurations
    - src/core/core-default.xml
    - src/hdfs/hdfs-default.xml
    - src/mapred/mapred-default.xml

    Q2. How will you make changes to the default configuration files
    Hadoop does not recommend changing the default configuration files; instead it recommends making all site-specific changes in the following files
    - conf/core-site.xml
    - conf/hdfs-site.xml
    - conf/mapred-site.xml

    Unless explicitly turned off, Hadoop by default specifies two resources, loaded in-order from the classpath:
    - core-default.xml : Read-only defaults for hadoop.
    - core-site.xml: Site-specific configuration for a given hadoop installation.

    Hence if the same configuration is defined in both core-default.xml and core-site.xml then the value in core-site.xml (the same is true for the other 2 file pairs) is used.

    Q3. Consider a case scenario where you have set the property mapred.output.compress to true to ensure that all output files are compressed for efficient space usage on the cluster. If a cluster user does not want to compress data for a specific job then what will you recommend he do?
    Ask him to create his own configuration file, specify the configuration mapred.output.compress as false in it, and load this file as a resource in his job.

    Q4. In the above case scenario, how can you ensure that a user cannot override the configuration mapred.output.compress to false in any of his jobs
    This can be done by marking the property as final in the site configuration file; a final property cannot be overridden by user jobs
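For example, the administrator's site configuration file would carry a stanza like the following (a sketch; which site file it belongs in depends on the cluster layout):

```xml
<property>
  <name>mapred.output.compress</name>
  <value>true</value>
  <!-- "final" means user jobs cannot override this value -->
  <final>true</final>
</property>
```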

    Q5. Which of the following is the only required variable that needs to be set in file conf/hadoop-env.sh for hadoop to work
    - HADOOP_LOG_DIR
    - JAVA_HOME
    - HADOOP_CLASSPATH
    The only required variable to set is JAVA_HOME, which needs to point to the Java installation directory

    Q6. List all the daemons required to run the Hadoop cluster 
    - NameNode
    - DataNode
    - JobTracker
    - TaskTracker

    Q7. What's the default port that the jobtracker web UI listens on
    50030

    Q8. What's the default port where the dfs namenode web UI will listen
    50070

    Sunday, November 28, 2010

    Java interview questions for Hadoop developer Part 3

    Q21. Explain the difference between a class variable and an instance variable, and how they are declared in Java
    A class variable is a variable which is declared with the static modifier.
    An instance variable is a variable in a class without the static modifier.
    The main difference between a class variable and an instance variable is that memory for class variables is allocated only once, when the class is first loaded into memory. That means class variables do not depend on objects of that class: whatever the number of objects, only one copy is created, at class-loading time.
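The point can be shown with a small made-up class (CounterDemo is hypothetical, written only for illustration):

```java
public class CounterDemo {
    static int classCount = 0; // class variable: one copy, created at class loading
    int instanceCount = 0;     // instance variable: one copy per object

    CounterDemo() {
        classCount++;    // shared across all objects
        instanceCount++; // private to this object
    }

    public static void main(String[] args) {
        new CounterDemo();
        CounterDemo b = new CounterDemo();
        System.out.println(CounterDemo.classCount); // 2 - both constructors incremented it
        System.out.println(b.instanceCount);        // 1 - only b's own constructor touched it
    }
}
```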

    Q22. Since an Abstract class in Java cannot be instantiated then how can you use its non static methods 
    By extending it

    Q23. How would you make a copy of an entire Java object with its state? 
    Have this class implement Cloneable interface and call its method clone().

    Q24. Explain Encapsulation,Inheritance and Polymorphism 
    Encapsulation is a process of binding or wrapping the data and the codes that operates on the data into a single entity. This keeps the data safe from outside interface and misuse. One way to think about encapsulation is as a protective wrapper that prevents code and data from being arbitrarily accessed by other code defined outside the wrapper.
    Inheritance is the process by which one object acquires the properties of another object.
    The meaning of Polymorphism is something like one name many forms. Polymorphism enables one entity to be used as as general category for different types of actions. The specific action is determined by the exact nature of the situation. The concept of polymorphism can be explained as "one interface, multiple methods".

    Q25. Explain garbage collection?
    Garbage collection is one of the most important features of Java.
    Garbage collection is also called automatic memory management, as the JVM automatically removes unused objects from memory. A user program can't directly free an object from memory; instead it is the job of the garbage collector to automatically free the objects that are no longer referenced by the program. Every class inherits the finalize() method from java.lang.Object; the finalize() method is called by the garbage collector when it determines that no more references to the object exist. In Java, it is a good idea to explicitly assign null to a variable that is no longer in use.

    Q26. What are the similarities/differences between an abstract class and an interface?
    Differences
    - Interfaces provide a form of multiple inheritance; a class can extend only one other class.
    - Interfaces are limited to public methods and constants with no implementation. Abstract classes can have a partial implementation, protected parts, static methods, etc.
    - A class may implement several interfaces, but in the case of abstract classes, a class may extend only one abstract class.
    - Interfaces are slow, as they require extra indirection to find the corresponding method in the actual class. Abstract classes are fast.
    Similarities
    - Neither abstract classes nor interfaces can be instantiated

    Q27. What are different ways to make your class multithreaded in Java 
    There are two ways to create new kinds of threads:
    - Define a new class that extends the Thread class
    - Define a new class that implements the Runnable interface, and pass an object of that class to a Thread's constructor.
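Both ways can be sketched in a few lines (ThreadDemo and Worker are made-up names for illustration):

```java
public class ThreadDemo {
    // Way 1: extend the Thread class and override run()
    static class Worker extends Thread {
        public void run() {
            System.out.println("started by extending Thread");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Worker();

        // Way 2: implement Runnable and pass the object to a Thread's constructor
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                System.out.println("started via Runnable");
            }
        });

        t1.start();
        t2.start();
        t1.join(); // wait for both threads to finish
        t2.join();
    }
}
```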

    Q28. What do you understand by synchronization? How do you synchronize a method call in Java? How do you synchronize a block of code in Java?
    Synchronization is a process of controlling the access of shared resources by the multiple threads in such a manner that only one thread can access one resource at a time. In non synchronized multithreaded application, it is possible for one thread to modify a shared object while another thread is in the process of using or updating the object's value. Synchronization prevents such type of data corruption.
    - Synchronizing a method: Put keyword synchronized as part of the method declaration
    - Synchronizing a block of code inside a method: Put block of code in synchronized (this) { Some Code }
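A minimal sketch of both forms (SyncCounter is a made-up class for illustration):

```java
public class SyncCounter {
    private int count = 0;

    // Synchronizing a method: the synchronized keyword in the declaration
    public synchronized void increment() {
        count++;
    }

    // Synchronizing only a block of code inside a method
    public void incrementViaBlock() {
        synchronized (this) {
            count++;
        }
    }

    public int get() { return count; }

    public static void main(String[] args) throws InterruptedException {
        final SyncCounter c = new SyncCounter();
        Runnable task = new Runnable() {
            public void run() {
                for (int i = 0; i < 10000; i++) c.increment();
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // With synchronization the result is always 20000;
        // without it, lost updates could make it smaller.
        System.out.println(c.get());
    }
}
```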

    Q29. What is a transient variable?
    A transient variable can't be serialized. For example, if a variable is declared as transient in a Serializable class and the class is written to an ObjectStream, the value of the variable can't be written to the stream; instead, when the object is retrieved from the ObjectStream, the value of the variable becomes null.
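This is easy to verify with a round trip through an object stream (TransientDemo and its fields are made up for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo implements Serializable {
    String name = "kept";
    transient String secret = "dropped"; // excluded from serialization

    public static void main(String[] args) throws Exception {
        // Serialize an instance to a byte array ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(new TransientDemo());
        out.flush();

        // ... and read it back: the transient field comes back as null
        TransientDemo copy = (TransientDemo) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();
        System.out.println(copy.name);   // kept
        System.out.println(copy.secret); // null
    }
}
```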

    Q30. What is Properties class in Java. Which class does it extends? 
    The Properties class represents a persistent set of properties. The Properties can be saved to a stream or loaded from a stream. Each key and its corresponding value in the property list is a string

    Q31. Explain the concept of shallow copy vs deep copy in Java
    In the case of a shallow copy, the cloned object refers to the same objects the original object refers to, as only the object references get copied and not the referred objects themselves.
    In the case of a deep copy, a clone of the class and all objects referred to by that class is made.
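The difference shows up as soon as the original is mutated (CopyDemo is a made-up class sketching both copies by hand):

```java
import java.util.ArrayList;
import java.util.List;

public class CopyDemo {
    List<String> tags = new ArrayList<String>();

    // Shallow copy: the copy shares the same tags list as the original
    CopyDemo shallowCopy() {
        CopyDemo c = new CopyDemo();
        c.tags = this.tags;
        return c;
    }

    // Deep copy: the copy gets its own list with the same contents
    CopyDemo deepCopy() {
        CopyDemo c = new CopyDemo();
        c.tags = new ArrayList<String>(this.tags);
        return c;
    }

    public static void main(String[] args) {
        CopyDemo original = new CopyDemo();
        CopyDemo shallow = original.shallowCopy();
        CopyDemo deep = original.deepCopy();

        original.tags.add("x"); // mutate through the original
        System.out.println(shallow.tags); // [x] - the shared list sees the change
        System.out.println(deep.tags);    // [] - the deep copy is unaffected
    }
}
```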

    Q32. How can you make a shallow copy of an object in Java 
    Use clone() method inherited by Object class

    Q33. How would you make a copy of an entire Java object (deep copy) with its state? 
    Have this class implement Cloneable interface and call its method clone().

    Java interview questions for Hadoop developer Part 2

    Q11. Which of the following object oriented principles is met with method overloading in java
    - Inheritance
    - Polymorphism
    - Encapsulation

    Polymorphism

    Q12. Which of the following object oriented principles is met with method overriding in java
    - Inheritance
    - Polymorphism
    - Encapsulation

    Polymorphism

    Q13. What is the name of collection interface used to maintain unique elements 
    Set

    Q14. What access level do you need to specify in the class declaration to ensure that only classes from the same package can access it? What keyword is used to define this specifier?
    You do not need to specify any access level; Java will then use the default (package) access level, for which there is no keyword.

    Q15. What's the difference between a queue and a stack? 
    Stacks works by last-in-first-out rule (LIFO), while queues use the FIFO rule
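The two disciplines can be contrasted with java.util.ArrayDeque, which supports both (a small sketch for illustration):

```java
import java.util.ArrayDeque;

public class StackVsQueue {
    public static void main(String[] args) {
        ArrayDeque<Integer> stack = new ArrayDeque<Integer>();
        ArrayDeque<Integer> queue = new ArrayDeque<Integer>();
        for (int i = 1; i <= 3; i++) {
            stack.push(i); // stack: insert and remove at the same end
            queue.add(i);  // queue: insert at the tail, remove at the head
        }
        System.out.println(stack.pop());    // 3 - last in, first out (LIFO)
        System.out.println(queue.remove()); // 1 - first in, first out (FIFO)
    }
}
```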

    Q16. How can you write user defined exceptions in Java 
    Make your class extend Exception Class

    Q17. What is the difference between checked and unchecked exceptions in Java? Give an example of each type
    All predefined exceptions in Java are either checked exceptions or unchecked exceptions. Checked exceptions must be caught using a try..catch() block or declared using the throws clause; if you don't, compilation of the program will fail.
    - Example checked exception: ParseException
    - Example unchecked exception: ArrayIndexOutOfBoundsException

    Q18. We know that FileNotFoundException is inherited from IOException; does it matter in what order catch statements for FileNotFoundException and IOException are written?
    Yes, it does. FileNotFoundException is inherited from IOException, and an exception's subclasses have to be caught first.

    Q19. How do we find if two strings are the same or not in Java? If the answer is equals(), then why do we have to use equals? Why can't we compare strings like we compare integers?
    We use method equals() to compare the values of the Strings. We can't use == like we do for primitive types like int because == checks if two variables point at the same instance of a String object.
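A two-line demonstration (StringCompare is a made-up class name):

```java
public class StringCompare {
    public static void main(String[] args) {
        String a = new String("hadoop");
        String b = new String("hadoop");
        System.out.println(a == b);      // false - two distinct String objects
        System.out.println(a.equals(b)); // true  - same character contents
    }
}
```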

    Q20. What is "package" keyword 
    This is a way to organize files when a project consists of multiple modules. It also helps resolve naming conflicts when different packages have classes with the same names. Packages access level also allows you to protect data from being used by the non-authorized classes

    Java interview questions for Hadoop developer Part 1

    Since Hadoop and its entire eco-system are built in Java, when hiring for a hadoop developer it makes sense to test the core Java skills of the interviewee as well. Following are some questions that I have compiled that test the basic Java understanding of the candidate. I would expect any decent candidate to answer 90% of these questions.

    Q1. What is a mutable object and an immutable object
    If an object's value is changeable then we call it a mutable object (e.g., StringBuffer). If you are not allowed to change the value of an object, it is an immutable object (e.g., String, Integer, Float).
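The contrast in one short program (MutableDemo is a made-up class name):

```java
public class MutableDemo {
    public static void main(String[] args) {
        // StringBuffer is mutable: append() changes the object in place
        StringBuffer buf = new StringBuffer("ha");
        buf.append("doop");
        System.out.println(buf); // hadoop

        // String is immutable: concat() returns a new object,
        // the original is never modified
        String s = "ha";
        String t = s.concat("doop");
        System.out.println(s); // ha
        System.out.println(t); // hadoop
    }
}
```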

    Q2. What are wrapper classes in Java? Why do they exist? Give examples
    Wrapper classes are classes that allow primitive types to be accessed as objects, e.g. Integer, Float etc

    Q3. Even though garbage collection cleans memory, why can't it guarantee that a program will not run out of memory? Give an example of a case where a program will run out of memory despite garbage collection
    Because it is possible for programs to use up memory resources faster than they are garbage collected. It is also possible for programs to create objects that are not subject to garbage collection. One example is trying to load a very big file into an array.

    Q4. What is the difference between Process and Thread? 
    A process can contain multiple threads. In most multithreading operating systems, a process gets its own memory address space; a thread doesn't. Threads typically share the heap belonging to their parent process. For instance, a JVM runs in a single process in the host O/S. Threads in the JVM share the heap belonging to that process; that's why several threads may access the same object. Typically, even though they share a common heap, threads have their own stack space. This is how one thread's invocation of a method is kept separate from another's

    Q5. How can you write an infinite loop in Java
    while(true) {
    }
    OR
    for ( ; ; ){
    }

    Q6. How can you create singleton class in Java 
    Make the constructor of the class private and provide a static method to get instance of the class
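A basic lazily-initialized sketch (made synchronized so two threads cannot both create an instance; a minimal illustration, not the only correct pattern):

```java
public class Singleton {
    private static Singleton instance;

    // Private constructor: no other class can call "new Singleton()"
    private Singleton() { }

    // synchronized so two threads cannot both create an instance
    public static synchronized Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
        }
        return instance;
    }

    public static void main(String[] args) {
        // Both calls return the one and only instance
        System.out.println(Singleton.getInstance() == Singleton.getInstance()); // true
    }
}
```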

    Q7. What do keywords "this" and "super" do in Java 
    "this" is used to refer to current object. "super" is used to refer to the class extended by the current class

    Q8. What are access specifiers in Java? List all of them.
    Access specifiers are used to define the scope of variables in Java. There are four levels of access specifiers in java
    - public
    - private
    - protected
    - default

    Q9. Which of the following 3 object oriented principals does access specifiers implement in java
    - Encapsulation
    - Polymorphism
    - Inheritance

    Encapsulation

    Q10. What is method overriding and method overloading
    With overriding, you change the method's behavior for a subclass. Overloading involves having methods with the same name within the class but with different signatures
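Both can be shown side by side (OverDemo, Animal and Dog are made-up names for illustration):

```java
public class OverDemo {
    static class Animal {
        String speak() { return "..."; }

        // Overloading: same name, different signature, in the same class
        String speak(int times) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < times; i++) sb.append(speak());
            return sb.toString();
        }
    }

    static class Dog extends Animal {
        // Overriding: same signature, behavior changed in the subclass
        String speak() { return "woof"; }
    }

    public static void main(String[] args) {
        Animal a = new Dog();
        System.out.println(a.speak());  // woof - overridden version chosen at runtime
        System.out.println(a.speak(2)); // woofwoof - overloaded version, which
                                        // itself calls the overridden speak()
    }
}
```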

    Hadoop Distributed File Systems (HDFS) Interview Questions

    Q1. What is HDFS
    HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes) and provide high-throughput access to this information. Files are stored in a redundant fashion across multiple machines to ensure their durability against failure and their high availability to very parallel applications

    Q2. What does the statement "HDFS is a block-structured file system" mean
    It means that in HDFS individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity

    Q3. What does the term "replication factor" mean
    Replication factor is the number of times a file needs to be replicated in HDFS

    Q4. What is the default replication factor in HDFS
    3

    Q5. What is the typical block size of an HDFS block
    64 MB to 128 MB

    Q6. What is the benefit of having such a big block size (when compared to the block size of a linux file system like ext)
    It allows HDFS to decrease the amount of metadata storage required per file (the list of blocks per file will be smaller as the size of individual blocks increases). Furthermore, it allows for fast streaming reads of data, by keeping large amounts of data sequentially laid out on the disk

    Q7. Why is it recommended to have a few very large files instead of a lot of small files in HDFS
    This is because the Namenode contains the metadata of each and every file in HDFS, and more files means more metadata. Since the Namenode loads all the metadata into memory for speed, having a lot of files may make the metadata big enough to exceed the memory on the Namenode

    Q8. True/false question. What is the lowest granularity at which you can apply a replication factor in HDFS
    - You can choose replication factor per directory
    - You can choose replication factor per file in a directory
    - You can choose replication factor per block of a file

    - True
    - True
    - False

    Q9. What is a datanode in HDFS
    Individual machines in the HDFS cluster that hold blocks of data are called datanodes

    Q10. What is a Namenode in HDFS
    The Namenode stores all the metadata for the file system

    Q11. What alternate way does HDFS provide to recover data in case a Namenode, without backup, fails and cannot be recovered
    There is no way. If the Namenode dies and there is no backup then there is no way to recover the data

    Q12. Describe how an HDFS client will read a file in HDFS. Will it talk to the data node or the namenode? How will the data flow?
    To open a file, a client contacts the Name Node and retrieves a list of locations for the blocks that comprise the file. These locations identify the Data Nodes which hold each block. Clients then read file data directly from the Data Node servers, possibly in parallel. The Name Node is not directly involved in this bulk data transfer, keeping its overhead to a minimum.

    Q13. Using the linux command line, how will you
    - List the number of files in a HDFS directory
    - Create a directory in HDFS
    - Copy a file from your local directory to HDFS

    - hadoop fs -ls
    - hadoop fs -mkdir
    - hadoop fs -put localfile hdfsfile OR hadoop fs -copyFromLocal localfile hdfsfile

    Saturday, November 27, 2010

    How to install standalone Hadoop for development and debugging purposes

    It is very easy to set up a standalone hadoop installation for development and testing purposes. In fact, having a standalone hadoop installation on your local linux machine can be of great help in debugging issues.

    The following are the steps involved in setting up a standalone installation of Hadoop 0.20 on Java 6. Please note that setting up a Hadoop cluster is very different from setting up a standalone version.

    INSTALL
    - Get a linux machine
       - Suppose your username is jeka and home directory is /home/jeka or ~
       - Create directories ~/java and ~/hadoop

    - Download required software if required
   - Download the Java release (file jdk-6u22-linux-i586.bin) from here to directory ~/java
   - Download the Hadoop release (file hadoop-0.20.2.tar.gz) from here to directory ~/hadoop

    - Install Java
       - Make java file executable
                 chmod a+x java/jdk-6u22-linux-i586.bin
       - Install it
                ~/java/jdk-6u22-linux-i586.bin

    - Install Hadoop
       - Unzip
              gunzip ~/hadoop/hadoop-0.20.2.tar.gz
        - Untar
               tar -xvf ~/hadoop/hadoop-0.20.2.tar.gz

    That's it! The installation is done and hadoop is ready to be used, but to make life a little easier we should set up some environment variables.

    CONFIGURE
    Both Java and Hadoop provide command line clients (or executables), java and hadoop respectively. These executables can be found in the bin directory of each installation.
    - Create a file ~/.hadoop_profile and add following lines in it

    export JAVA_HOME="${HOME}/java/jdk1.6.0_22"
    export HADOOP_HOME="${HOME}/hadoop/hadoop-0.20.2"
    export PATH=${PATH}:${JAVA_HOME}/bin:${HADOOP_HOME}/bin

    Save this file and source it 
        source ~/.hadoop_profile

    Now, instead of running hadoop as ~/hadoop/hadoop-0.20.2/bin/hadoop, you can simply use it as hadoop.


    RUN YOUR FIRST HADOOP JOB
    Note: This job will run on your local machine and not HDFS
    File ~/hadoop/hadoop-0.20.2/hadoop-0.20.2-examples.jar comes with some examples. We can use the "grep" example from it.

    In the following example, we will use one of the map reduce examples to count the number of times the word "copyright" appears in the file LICENSE.txt.


     cd ~/hadoop/hadoop-0.20.2
     hadoop jar hadoop-0.20.2-examples.jar grep LICENSE.txt ~/tmp/out "copyright"
     cat ~/tmp/out/*

    Output: 4

    It's very simple to create your own jar and run it instead of using the examples jar. See blog post
    http://hadoop-blog.blogspot.com/2010/11/how-to-run-and-compile-hadoop-program.html for more details





    Friday, November 26, 2010

    Hadoop Interview questions - Part 4

    Q31. How will you write a custom partitioner for a Hadoop job
    To have hadoop use a custom partitioner you will have to do at minimum the following three things
    - Create a new class that extends the Partitioner class
    - Override the method getPartition
    - In the wrapper that runs the Map Reduce job, either
      - add the custom partitioner to the job programmatically using the method setPartitionerClass, or
      - add the custom partitioner to the job via a config file (if your wrapper reads from a config file or oozie)

    Q32. How did you debug your Hadoop code  
    There can be several ways of doing this but most common ways are
    - By using counters
    - The web interface provided by Hadoop framework

    Q33. Did you ever build a production process in Hadoop? If yes, then what was the process when your hadoop job failed for some reason?
    It's an open-ended question, but most candidates, if they have written a production job, should talk about some type of alert mechanism, e.g. an email is sent or their monitoring system sends an alert. Since Hadoop works on unstructured data, it is very important to have a good alerting system for errors, because unexpected data can very easily break the job.

    Q34. Did you ever run into a lopsided job that resulted in an out of memory error? If yes, then how did you handle it?
    This is an open-ended question, but a candidate who claims to be an intermediate developer and has worked on a large data set (10-20 GB minimum) should have run into this problem. There can be many ways to handle it, but the most common way is to alter your algorithm and break the job down into more map reduce phases, or to use a combiner if possible.

    Hadoop Interview questions - Part 3

    Q22. What is Distributed Cache in Hadoop
    Distributed Cache is a facility provided by the Map/Reduce framework to cache files (text, archives, jars and so on) needed by applications during execution of the job. The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node.

    Q23. What is the benefit of Distributed Cache? Why can't we just have the file in HDFS and have the application read it?
    This is because Distributed Cache is much faster. It copies the file to all task trackers at the start of the job. Now if a task tracker runs 10 or 100 mappers or reducers, they all use the same local copy from the distributed cache. On the other hand, if you put code in the MR job to read the file from HDFS, then every mapper will try to access it from HDFS, so if a task tracker runs 100 map tasks it will try to read this file 100 times from HDFS. Also, HDFS is not very efficient when used like this.

    Q24. What mechanism does the Hadoop framework provide to synchronize changes made in the Distributed Cache during runtime of the application?
    This is a trick question. There is no such mechanism. The Distributed Cache is by design read-only during job execution

    Q25. Have you ever used Counters in Hadoop. Give us an example scenario
    Anybody who claims to have worked on a Hadoop project is expected to have used counters

    Q26. Is it possible to provide multiple inputs to Hadoop? If yes, then how can you give multiple directories as input to a Hadoop job?
    Yes. The input format class provides methods to add multiple directories as input to a Hadoop job

    Q27. Is it possible to have Hadoop job output in multiple directories? If yes, then how?
    Yes, by using the MultipleOutputs class

    Q28. What will a hadoop job do if you try to run it with an output directory that is already present? Will it
    - overwrite it
    - warn you and continue
    - throw an exception and exit
    The hadoop job will throw an exception and exit.

    Q29. How can you set an arbitrary number of mappers to be created for a job in Hadoop
    This is a trick question. You cannot set it

    Q30. How can you set an arbitrary number of reducers to be created for a job in Hadoop
    You can either do it programmatically by using the method setNumReduceTasks in the JobConf class, or set it up as a configuration setting

    Hadoop Interview questions - Part 2

    Q11. Give an example scenario where a combiner can be used and where it cannot be used
    There can be several examples following are the most common ones
    - Scenario where you can use combiner
      Getting list of distinct words in a file

    - Scenario where you cannot use a combiner
      Calculating mean of a list of numbers 
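The mean case can be checked with plain arithmetic. If one map task sees the values {1, 2, 3} and another sees {10, 20} for the same key, a combiner that pre-computes per-split means gives the reducer the wrong answer (class name and numbers are made up for illustration):

```java
public class CombinerMean {
    static double sum(double[] values) {
        double s = 0;
        for (double v : values) s += v;
        return s;
    }

    public static void main(String[] args) {
        // Values for one key as seen by two different map tasks
        double[] split1 = {1, 2, 3};
        double[] split2 = {10, 20};

        // Correct: the reducer sees all five raw values
        double trueMean = (sum(split1) + sum(split2)) / 5.0; // 7.2

        // Wrong: a combiner collapses each split to its own mean first
        double meanOfMeans = (sum(split1) / 3.0 + sum(split2) / 2.0) / 2.0; // 8.5

        System.out.println(trueMean + " " + meanOfMeans);
    }
}
```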

    Q12. What is job tracker
    Job Tracker is the service within Hadoop that runs Map Reduce jobs on the cluster

    Q13. What are some typical functions of Job Tracker
    The following are some typical tasks of Job Tracker
    - Accepts jobs from clients
    - It talks to the NameNode to determine the location of the data
    - It locates TaskTracker nodes with available slots at or near the data
    - It submits the work to the chosen Task Tracker nodes and monitors progress of each task by receiving heartbeat signals from Task tracker 

    Q14. What is task tracker
    Task Tracker is a node in the cluster that accepts tasks like Map, Reduce and Shuffle operations - from a JobTracker 

    Q15. Whats the relationship between Jobs and Tasks in Hadoop
    One job is broken down into one or many tasks in Hadoop. 

    Q16. Suppose Hadoop spawned 100 tasks for a job and one of the tasks failed. What will hadoop do?
    It will restart the task on some other task tracker, and only if the task fails more than 4 times (the default setting, which can be changed) will it kill the job

    Q17. Hadoop achieves parallelism by dividing the tasks across many nodes, so it is possible for a few slow nodes to rate-limit the rest of the program and slow it down. What mechanism does Hadoop provide to combat this?
    Speculative Execution 

    Q18. How does speculative execution work in Hadoop?
    Job tracker makes different task trackers process same input. When tasks complete, they announce this fact to the Job Tracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the Task Trackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first. 

    Q19. Using the command line in Linux, how will you 
    - see all jobs running in the hadoop cluster
      hadoop job -list
    - kill a job
      hadoop job -kill jobid 

    Q20. What is Hadoop Streaming  
    Streaming is a generic API that allows programs written in virtually any language to be used as Hadoop Mapper and Reducer implementations 


    Q21. What is the characteristic of the streaming API that makes it flexible enough to run map reduce jobs in languages like perl, ruby, awk etc.  
    Hadoop Streaming allows you to use arbitrary programs for the Mapper and Reducer phases of a Map Reduce job by having both Mappers and Reducers receive their input on stdin and emit output (key, value) pairs on stdout.
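A minimal sketch of what such a program looks like (word count, my own example - the only contract Streaming imposes is lines in on stdin and "key<TAB>value" lines out on stdout):

```python
# Streaming-style mapper and reducer logic for word count (illustrative).
# In a real job each function would sit in its own script, reading sys.stdin
# and printing "key\tvalue" lines; the framework sorts between the stages.

def map_line(line):
    # Mapper: emit one (word, 1) pair per word on the line.
    return [(word, 1) for word in line.split()]

def reduce_pairs(sorted_pairs):
    # Reducer: sum the counts of identical keys from the sorted stream.
    totals = {}
    for key, value in sorted_pairs:
        totals[key] = totals.get(key, 0) + value
    return totals

# Simulate the map -> sort -> reduce pipeline on two input lines:
pairs = sorted(p for line in ["the cat", "the dog"] for p in map_line(line))
print(reduce_pairs(pairs))  # {'cat': 1, 'dog': 1, 'the': 2}
```

Because the interface is just stdin/stdout text, the same structure works in perl, ruby, awk, or plain shell tools like /bin/cat and wc.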

    Hadoop Interview questions - Part 1

    Q1. Name the most common InputFormats defined in Hadoop? Which one is the default ?
    Following 3 are the most common InputFormats defined in Hadoop
    - TextInputFormat
    - KeyValueInputFormat
    - SequenceFileInputFormat

    TextInputFormat is the hadoop default

    Q2. What is the difference between TextInputFormat and KeyValueInputFormat class
    TextInputFormat: It reads lines of text files and provides the offset of the line as key to the Mapper and actual line as Value to the mapper
    KeyValueInputFormat: Reads text file and parses lines into key, val pairs. Everything up to the first tab character is sent as key to the Mapper and the remainder of the line is sent as value to the mapper.
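The difference is easy to picture in a few lines of Python (a sketch of the parsing behavior only, not the actual Hadoop code):

```python
# TextInputFormat: key = byte offset of the line, value = the line itself.
def text_input_records(data):
    offset = 0
    for line in data.splitlines(keepends=True):
        yield offset, line.rstrip("\n")
        offset += len(line)

# KeyValueInputFormat: everything before the first tab is the key,
# the remainder of the line is the value.
def key_value_record(line):
    key, _tab, value = line.partition("\t")
    return key, value

print(list(text_input_records("ab\ncdef\n")))   # [(0, 'ab'), (3, 'cdef')]
print(key_value_record("user42\tclicked home")) # ('user42', 'clicked home')
```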

    Q3. What is InputSplit in Hadoop
    When a hadoop job is run, it splits input files into chunks and assigns each split to a mapper to process. This is called an Input Split 

    Q4. How is the splitting of a file invoked in the Hadoop framework  
    It is invoked by the Hadoop framework by running the getSplits() method of the InputFormat class (like FileInputFormat) defined by the user 

    Q5. Consider case scenario: In M/R system,
        - HDFS block size is 64 MB
        - Input format is FileInputFormat
        - We have 3 files of size 64K, 65Mb and 127Mb 
    then how many input splits will be made by Hadoop framework?
    Hadoop will make 5 splits as follows 
    - 1 split for the 64K file 
    - 2 splits for the 65Mb file 
    - 2 splits for the 127Mb file 

    Q6. What is the purpose of RecordReader in Hadoop
    The InputSplit has defined a slice of work, but does not describe how to access it. The RecordReader class actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper. The RecordReader instance is defined by the InputFormat 

    Q7. After the Map phase finishes, the hadoop framework does "Partitioning, Shuffle and sort". Explain what happens in this phase?
    - Partitioning
    Partitioning is the process of determining which reducer instance will receive which intermediate keys and values. Each mapper must determine for all of its output (key, value) pairs which reducer will receive them. It is necessary that for any key, regardless of which mapper instance generated it, the destination partition is the same

    - Shuffle
    After the first map tasks have completed, the nodes may still be performing several more map tasks each. But they also begin exchanging the intermediate outputs from the map tasks to where they are required by the reducers. This process of moving map outputs to the reducers is known as shuffling.

    - Sort
    Each reduce task is responsible for reducing the values associated with several intermediate keys. The set of intermediate keys on a single node is automatically sorted by Hadoop before they are presented to the Reducer 

    Q9. If no custom partitioner is defined in hadoop, then how is data partitioned before it is sent to the reducer  
    The default partitioner computes a hash value for the key and assigns the partition based on this result 
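In Java terms the default is HashPartitioner, which computes roughly (key.hashCode() & Integer.MAX_VALUE) % numPartitions. A sketch of that logic, with Python's hash() standing in for hashCode():

```python
# Sketch of default hash partitioning (Python's hash() standing in for
# Java's hashCode()): mask to keep the value non-negative, then mod by the
# number of reducers so every key lands in a valid partition.
def default_partition(key, num_reducers):
    return (hash(key) & 0x7FFFFFFF) % num_reducers

for k in ["apple", "banana", "cherry"]:
    p = default_partition(k, 4)
    assert 0 <= p < 4                      # always a valid partition
    assert p == default_partition(k, 4)    # same key -> same reducer, always
```

The important property is the second assertion: every occurrence of a key, from any mapper, lands in the same partition.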

    Q10. What is a Combiner 
    The Combiner is a "mini-reduce" process which operates only on data generated by a mapper. The Combiner will receive as input all data emitted by the Mapper instances on a given node. The output from the Combiner is then sent to the Reducers, instead of the output from the Mappers.

    Hadoop Java API version 0.21

    We use Cloudera's Hadoop distribution (cdh3). Cloudera has enhanced some java classes and made them easier to use. They have also added some good security features in Hadoop that are currently missing in the Apache Hadoop distribution.

    Here is the link to the Hadoop Java API docs

    http://archive.cloudera.com/cdh/3/hadoop-0.20.2+228/api/index.html?org/apache/hadoop/mapred/lib/package-summary.html

    Sunday, November 21, 2010

    Where to get Hadoop training

    We keep on hearing how all companies are trying to move to Hadoop to analyze big data, but even as of today, Hadoop training is very limited.

    At this time Cloudera is the only company that gives training for Hadoop. You can get more details at www.cloudera.com/hadoop-training. We took this training and found it useful to get introduced to Hadoop.

    It walks you through
    - All the concepts of Hadoop
    - The ecosystem/tools being built around hadoop, like Hive, Zookeeper, Hbase etc.
    - Writing some hive queries
    - Writing a MapReduce program

    The training is good but it definitely has a lot of room for improvement. The 3 top concepts that I would have liked the training to cover are
    - Real world example of how sorting of keys is a very powerful feature of hadoop
    - Real world example of how to use the Partitioner class
    - How to handle data skews in Hadoop

    From my own experience, a grasp of these 3 concepts means you can start using Hadoop in real world production. If you don't understand any of these 3 concepts then you will find yourself very limited in the world of Hadoop.

    Thursday, November 18, 2010

    How to use distcp in Hadoop


    DistCp (distributed copy) is a tool used for large inter/intra-cluster copying. It uses Map/Reduce to effect its distribution, error handling and recovery, and reporting. It expands a list of files and directories into input to map tasks, each of which will copy a partition of the files specified in the source list. Its Map/Reduce pedigree has endowed it with some quirks in both its semantics and execution. The purpose of this document is to offer guidance for common tasks and to elucidate its model.

    Following are some examples of distcp using the hadoop client
    hadoop distcp hdfs://SourceCluster:9000/foo/bar hdfs://DestinationCluster:9000/bar/foo

    You can get the values hdfs://SourceCluster:9000 and hdfs://DestinationCluster:9000 from property fs.default.name from file core-site.xml of your hadoop implementation.

    Following is an example of how to copy multiple directories
    hadoop distcp hdfs://SourceCluster:9000/foo/a hdfs://SourceCluster:9000/foo/b hdfs://DestinationCluster:9000/bar/foo

    Wednesday, November 17, 2010

    The Hitler parody on Hadoop that focuses on the gotcha "secondary name node"

    All the metadata of the HDFS file system is managed by a server that runs the Name Node daemon and is called the "Name Node". If this node fails, data on your whole cluster is pretty much gone. Hadoop introduced something called the Secondary Namenode, which most people think is a hot spare but which is actually more like a checkpoint node: it takes snapshots of the metadata on the Name Node.

    See the video and understand this concept in detail if you are running a hadoop cluster.
    http://vimeo.com/15782414

    Video discussion that talks about Vertica & Hadoop integration

    http://vimeo.com/16863984

    The video talks about the connector Vertica provides for Hadoop. As of Nov 2010, this connector had a lot of issues.

    Following are the cons of using the connector

    • The connector is very slow and almost unusable if you have more than a couple of GBs of data
    • If a Hadoop task that pushes data to Vertica using the connector fails halfway, then Hadoop restarts the task from scratch, but the Vertica connector is not aware of this and hence inserts duplicate data 
    • Not sure how the connector pushes data to Vertica; if you try to push more than Vertica can handle then there is no graceful degradation in performance - your Vertica cluster can pretty much go down. 

    Video tutorial to install Hadoop on your Linux cluster

    Following video shows how to install Arch Linux from the Live CD and install Hadoop on top of that
    http://vimeo.com/8615628

    Video that talks about handling big data challenges in hadoop

    A good video that talks about handling big data challenges in hadoop
    http://vimeo.com/15362922

    Tuesday, November 16, 2010

    How to set all variables that can be set in hive

    In the hive shell,
    set -v

    How to sort reducer input values in hadoop

    I recently found the need to sort by value (instead of key) in Hadoop. I've seen some comments that call this a "secondary sort". Essentially, I wanted the reducer's values iterator to be sorted. There seem to be almost no docs, tutorials, or examples (that I could find) on the net for this.

    PROBLEM STATEMENT
    Suppose we have a file with a bunch of comma/line separated letters:
    l,f,a,e,a,a,l
    f,g,b,c,b,d,f
    x,i,t,u,f,e,h
    ...etc

    We want our reducer to receive bigrams (lf, fa, ae, ea, aa, al, etc), but partitioned by the first letter, and sorted by the second (descending, as the key comparator below is written). For example, for the letter a, the reducer should receive its values in the order l, e, a (from bigrams al, ae, aa).

    This is actually somewhat difficult to do, since we want to partition by key, but sort the reducer's values iterator. The trick is to have the mapper output the whole bigram in the key, and only the second letter in the value. For the first line of the example above, the mapper would emit:

    (lf, f), (fa, a), (ae, e), (ea, a), (aa, a), (al, l)

    ...

    We can then use a custom partitioner/sorter to partition and sort according to our needs.
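Before diving into the Hadoop classes, the whole trick can be simulated in a few lines of plain Python (my own sketch; the second letter is sorted descending because that is what the key comparator in the Java job below actually does):

```python
from itertools import groupby

# Plain-Python simulation of the secondary-sort trick: the full bigram goes
# into the key, partitioning/grouping look only at the first letter, and the
# sort orders by first letter ascending, second letter descending
# (mirroring the two comparators in the Hadoop job below).

def mapper(line):
    letters = line.strip().split(",")
    for a, b in zip(letters, letters[1:]):
        yield a + b, b                       # key = bigram, value = 2nd letter

def shuffle_and_group(records):
    # "OutputKeyComparator": first letter asc, second letter desc
    ordered = sorted(records, key=lambda kv: (kv[0][0], -ord(kv[0][1])))
    # "OutputValueGroupingComparator": new reduce group when 1st letter changes
    return {first: [v for _, v in group]
            for first, group in groupby(ordered, key=lambda kv: kv[0][0])}

records = [kv for line in ["l,f,a,e,a,a,l"] for kv in mapper(line)]
print(shuffle_and_group(records)["a"])  # ['l', 'e', 'a'] - sorted descending
```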

    SORTING BY VALUE
    To sort Hadoop's mapper output by value, you need to set three settings in your JobConf:
    setPartitionerClass
    setOutputValueGroupingComparator
    setOutputKeyComparatorClass
    There are many threads that say that you can't sort by value in Hadoop. This is true. What you can do, instead, is have your mapper output all data in the key, rather than the value. Then you can use a specialized Partitioner class and two RawComparator classes to sort and partition your map output properly.

    PARTITIONER
    The first class that you need to set is a class that extends org.apache.hadoop.mapred.Partitioner. This class has a single function that determines which partition your map output should go to. The returned partition number can't go below 0, or above numPartitions - 1. Mostly, you'll want to hashCode() some portion of your key and mod it by numPartitions.

    In our example, the partitioner will partition by the first letter of the key.

    OUTPUT VALUE GROUPING COMPARATOR
    The OutputValueGroupingComparator JobConf setting takes in an org.apache.hadoop.io.RawComparator. This RawComparator does not sort a reducer's value iterator. Instead, it's used to group the sorted reducer input, so that the reducer knows when a new grouping (a new call to reduce()) starts.

    In our example, the value grouping comparator will sort by the first letter of the key.

    OUTPUT KEY COMPARATOR
    The OutputKeyComparatorClass JobConf setting also takes in a org.apache.hadoop.io.RawComparator. This RawComparator is used to sort the values iterator that the reducer gets, which is what we want. It should be noted, that although the RawComparator is used to sort the values iterator, the data that gets passed into the comparator is the mapper key output. This is the reason that we must put all data in the key as well as the value.

    A very important thing to note is that the key comparator must also enforce the value grouping comparator's rules. In our example, this means that it must first check if the first letter is equal. If it's not equal, it should return the same result as the value grouping comparator. Only if the first letter of the key is equal should we apply our value-level sorting (comparing the second letter). If you do not do this, you will break your grouping.

    In our example, the key comparator will sort by the second letter of the key.

    RUNNING THE JOB
    Now, all we need to do is run the job.


    public class SortReducerByValues {
    public static final String INPUT = "/tmp/data_in";
    public static final String OUTPUT = "/tmp/data_out";

    public static void main(String[] args) throws IOException {
    new SortReducerByValues().run();
    }

    public void run() throws IOException {
    JobConf conf = new JobConf();

    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);

    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(SortReducerByValuesMapper.class);
    conf.setReducerClass(SortReducerByValuesReducer.class);

    conf.setOutputKeyComparatorClass(SortReducerByValuesKeyComparator.class);
    conf.setOutputValueGroupingComparator(SortReducerByValuesValueGroupingComparator.class);
    conf.setPartitionerClass(SortReducerByValuesPartitioner.class);

    FileInputFormat.addInputPath(conf, new Path(INPUT));
    FileOutputFormat.setOutputPath(conf, new Path(OUTPUT));

    conf.getWorkingDirectory().getFileSystem(conf).delete(new Path(INPUT), true);
    conf.getWorkingDirectory().getFileSystem(conf).delete(new Path(OUTPUT), true);

    loadFakeData(INPUT);

    JobClient.runJob(conf).waitForCompletion();
    }

    public static final class SortReducerByValuesKeyComparator implements RawComparator {
    public int compare(byte[] text1, int start1, int length1, byte[] text2, int start2, int length2) {
    // hadoop gives you an extra byte before text data. get rid of it.
    byte[] trimmed1 = new byte[2];
    byte[] trimmed2 = new byte[2];
    System.arraycopy(text1, start1 + 1, trimmed1, 0, 2);
    System.arraycopy(text2, start2 + 1, trimmed2, 0, 2);

    char char10 = (char) trimmed1[0];
    char char20 = (char) trimmed2[0];
    char char11 = (char) trimmed1[1];
    char char21 = (char) trimmed2[1];

    // first enforce the same rules as the value grouping comparator
    // (first letter of key)
    int compare = new Character(char10).compareTo(char20);

    if (compare == 0) {
    // ONLY if we're in the same reduce aggregate should we try and
    // sort by value (second letter of key)
    return -1 * new Character(char11).compareTo(char21);
    }

    return compare;
    }

    public int compare(Text o1, Text o2) {
    // reverse the +1 since the extra text byte is not passed into
    // compare() from this function
    return compare(o1.getBytes(), 0, o1.getLength() - 1, o2.getBytes(), 0, o2.getLength() - 1);
    }
    }

    public static final class SortReducerByValuesPartitioner implements Partitioner {
    public int getPartition(Text key, Text value, int numPartitions) {
    // just partition by the first character of each key since that's
    // how we are grouping for the reducer
    return key.toString().charAt(0) % numPartitions;
    }

    public void configure(JobConf conf) {
    }
    }

    public static final class SortReducerByValuesValueGroupingComparator implements RawComparator {
    public int compare(byte[] text1, int start1, int length1, byte[] text2, int start2, int length2) {
    // look at first character of each text byte array
    return new Character((char) text1[0]).compareTo((char) text2[0]);
    }

    public int compare(Text o1, Text o2) {
    return compare(o1.getBytes(), 0, o1.getLength(), o2.getBytes(), 0, o2.getLength());
    }
    }

    protected void loadFakeData(String path) throws IOException {
    JobConf conf = new JobConf();
    Writer writer = SequenceFile.createWriter(FileSystem.get(conf), conf, new Path(path), Text.class, Text.class);

    for (int i = 0; i < 100; ++i) {
    String letterCSV = "";
    for (int j = 0; j < 10; ++j) {
    letterCSV += (char) (65 + (int) (Math.random() * 26)) + ",";
    }
    writer.append(new Text(), new Text(letterCSV.substring(0, letterCSV.length() - 1)));
    }
    writer.close();
    }

    public static final class SortReducerByValuesMapper implements Mapper {
    public void map(Text key, Text val, OutputCollector collector, Reporter reporter) throws IOException {
    String[] chars = val.toString().split(",");
    for (int i = 0; i < chars.length - 1; ++i) {
    collector.collect(new Text(chars[i] + chars[i + 1]), new Text(chars[i + 1]));
    }
    }

    public void configure(JobConf conf) {
    }

    public void close() throws IOException {
    }
    }

    public static final class SortReducerByValuesReducer implements Reducer {
    @Override
    public void reduce(Text key, Iterator values, OutputCollector collector, Reporter reporter) throws IOException {
    // values should now be in order
    String check = key + ": ";
    while (values.hasNext()) {
    check += values.next();
    }
    System.err.println(check);
    }

    public void configure(JobConf conf) {
    }

    public void close() throws IOException {
    }
    }
    }


    OUTPUT
    As you can see, the reducer input is grouped by the first letter (our logical key), and the values are sorted descending (the key comparator negates the second-letter comparison).
    AY: YWUTSSSRRQPPPPOMMKJIIIFB
    BZ: ZYYXXXWVUUURRRRQPPPPPPOONMMLLKJHGEEDDBB
    CZ: ZZZZZYYXXWVUUUTSSSRQQOOMKKKHHHGGFFDDCCCBB
    DY: YXWWSSQQPPPONMMKIHGEDDCCBB
    EW: WVUTRRRQPOOOOONMLLKKKJJHFEEDDCCBA
    FY: YXXXWVVVUUTSSRPNNNLLKJHGFFECBBBBA
    GZ: ZZYXVTSQQPOONLJIHHHFFCCCBBA
    HZ: ZYYYYXWVUUTTTRQQPOOMKJJIIIGFEDAAAA
    IY: YYYXWWVVVUTTSRRRQMKJJJIIIHGGFFEEEECBBBA
    JZ: ZZYYXXWRRRQPPOOOMLJJIIHHHHCCCBBA
    KZ: ZZYXWWVSSSSRQONMJIIHFEDB
    LZ: ZZZYYXXWUTRQQQPLKJIIIHHGGFDDCCCBBBAA
    MZ: ZZYYYWTTTSSQQQQOJIIIHGGFCCCBBAA
    NZ: ZZYYXVVUTSSSSRQPNMKIHGFFFECAA
    OZ: ZZZYYXWWUSRRPPOONNNMMLJIIHHHGGFEEEDDCCBA
    PZ: ZXXWWTSSSSSRRRQQQMMLLLKJIIIHEEDCBA
    QZ: ZZYXWWWWWWVVVUTTSSSSRRRRQQQPOOONMLKJIHHFDD
    RY: YYYXXXWVUURRQQPPOOOLLLLLLKJJJJHHHHGFFEEEDDCCCBAA
    SY: YXWWWVVVUUUTTSRRRQQQNNNMLJHHHGGGGFEDDCCCBB
    TZ: ZZYYYYXXWVTTTTSRRQPPONNNNMMLLJIIIICBB
    UZ: ZZZYXWWVVVSRRRQPPONMHHGEDCBBA
    VZ: ZYWVVUTTTQPPPOOMKIIGFEEDDCCCBB
    WX: XWWWVUTTSSSRRPNNNMMLLKKKKKJJIGEDAAAA
    XZ: ZZYXXXXSSRRQQOOMLLKKJIIIIHGFFEDDBA
    YZ: ZZZYXWWVUUUTTTSSRQPPPOONNNMLJIIFFFFEDCCCAA
    ZZ: ZYYWVVUUTSSSSRRQQQPOONMMLLLJJIIGFDBBBA

    How to write a file in HDFS using hadoop

    The following is the code to write a file to the HDFS file system

    1. Open File Write.java and paste the following code

    package org.myorg;
    import java.io.*;
    import java.util.*;
    import java.net.*;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class Write{
    public static void main (String [] args) throws Exception{
    try{
    Path pt=new Path("hdfs://jp.seka.com:9000/user/jfor/out/abc");
    FileSystem fs = FileSystem.get(new Configuration());
    BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
    // TO append data to a file, use fs.append(Path f)
    String line;
    line="Disha Dishu Daasha";
    System.out.println(line);
    br.write(line);
    br.close();
    }catch(Exception e){
    System.out.println("File not found");
    }
    }
    }


    2. Compile the code

    javac -classpath hadoop-0.20.1-dev-core.jar -d Write/ Write.java


    3. Create jar

    jar -cvf Write.jar -C Write/ .


    4. Run

    hadoop jar Write.jar org.myorg.Write

    How to write output to multiple named files in Hadoop using MultipleTextOutputFormat

    Sometimes we want our Map Reduce job to output data in named files.

    For e.g.
    Suppose you have input file that contains the following data
    Name:Dash
    Age:27
    Name:Nish
    Age:29

    This can be done in Hadoop by using MultipleTextOutputFormat class. The following is a simple example implementation of MultipleTextOutputFormat class which will read the file above and create 2 output files Name and Age

    The code is at https://sites.google.com/site/hadoopandhive/home/how-to-write-output-to-multiple-named-files-in-hadoop-using-multipletextoutputformat


    The output would be files Name and Age.
    File Name contains data
    Name Nish
    Name Dash


    File Age contains data
    Age 27
    Age 29


    Class MultiFileOutput extends MultipleTextOutputFormat. What this means is that when the reducer is ready to write out a Key/Value pair, then before writing it to a file, it passes the pair to the method generateFileNameForKeyValue. The logic to name the output file is embedded in this method (in this case the logic is to create 1 file per key). The String returned by generateFileNameForKeyValue determines the name of the file where this Key/Value pair is written.
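The routing logic is easy to simulate outside Hadoop (a plain-Python sketch of the behavior; the real class is a Java subclass of MultipleTextOutputFormat, and here a dict of lists stands in for files in the HDFS output directory):

```python
# Simulation of MultipleTextOutputFormat's routing: a filename-generating
# function decides which output file each (key, value) record goes to.

def generate_file_name_for_key_value(key, value, leaf_name):
    return key    # one file per key, e.g. "Name" or "Age"

def write_records(records):
    files = {}
    for key, value in records:
        name = generate_file_name_for_key_value(key, value, "part-00000")
        files.setdefault(name, []).append("%s\t%s" % (key, value))
    return files

out = write_records([("Name", "Dash"), ("Age", "27"),
                     ("Name", "Nish"), ("Age", "29")])
print(sorted(out))    # ['Age', 'Name'] - two named output files
```

Returning a constant from the filename function would collapse everything back into one file, which is exactly the default behavior.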

    Some known limitations of hive SQL language

    1. You cannot update data after it is inserted
    2. There is no "insert into table values ... " statement
    3. You can only load data using bulk load
    4. There is no "delete from" command
    5. You can only do bulk delete

    How to see table definition (extended) in Hive

    To see a table definition in Hive, run command
    describe [tablename];

    To see more detailed information about the table, run command
    describe extended [tablename];

    How to see list of all tables in Hive

    To see all tables in Hive, go to hive command line and type
    show tables;

    How to see query plan in Hive

    To see a query plan in Hive, prefix the query with the word explain before running it

    For example
    explain select * from table_name

    How to run Map Reduce programs within Hive

    You can run Map Reduce program within Hive using command TRANSFORM

    How to run distributed file system command (HDFS) in hive

    To run an hdfs command in the Hive shell, run the command with the prefix dfs

    For example, the command hadoop fs -ls from hive shell can be run as
    dfs -ls

    How to run and compile a Hadoop program

    Suppose you created a Java map reduce file WordCount.java. Then compiling and running the hadoop program is done in 3 steps.

    COMPILE
    javac -classpath ${HADOOP_CLASSES} -d WordCount/ WordCount.java

    CREATE JAR
    jar -cvf WordCount.jar -C WordCount/ .

    RUN
    hadoop jar WordCount.jar org.myorg.WordCount /user/tommy/test/ /user/tommy/testout

    How to use Hadoop Streaming

    RUN HADOOP STREAMING

    hadoop jar ${HADOOP_STREAMING} -input 'test/a' -output 'testout' -mapper '/bin/cat' -reducer '/usr/bin/wc -l'


    RUN HADOOP STREAMING WITH NO REDUCE TASK

    hadoop fs -rmr testout;hadoop jar ${HADOOP_STREAMING} -input 'testin' -output 'testout' -mapper '/bin/cat -n' -reducer '' -jobconf mapred.reduce.tasks=0


    HOW TO RUN A UNIX COMMAND USING COMMAND SUBSTITUTION IN STREAMING

    mawk="awk '{print NR,\$0}'"; hadoop fs -rmr testout;hadoop jar ${HADOOP_STREAMING} -input 'testin' -output 'testout' -mapper '/bin/cat -n' -reducer "${mawk}"

    How to rename a table in Apache Hive

    Run the following command to rename a table in Hive

    ALTER TABLE [old_table_name] RENAME TO [new_table_name];

    How to remove files in hadoop hdfs

    To remove files from HDFS using hadoop, run command

    hadoop fs -rmr [directory/files]

    How to read Hadoop HDFS Sequence file using Hadoop streaming

    It's very common to use the unix "cat" command with hadoop streaming, but it does not work if the underlying file is a sequence file. To get around this, run hadoop streaming with the configuration

    -inputformat SequenceFileAsTextInputFormat



    The following code will read total lines in the file that is stored as sequence file in HDFS

    HADOOP=$HADOOP_HOME/bin/hadoop
    $HADOOP jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1-dev-streaming.jar \
    -input \
    -output \
    -mapper "/bin/cat" \
    -reducer "/bin/wc -l" \
    -inputformat SequenceFileAsTextInputFormat

    How to parse apache web/http logs into Hive table

    Apache logs are analyzed a lot. It makes sense to put apache logs in Hive to take advantage of Hadoop.

    The following commands can be used to load a standard apache access log into a Hive table

    add jar ../build/contrib/hive_contrib.jar;

    CREATE TABLE apachelog (
    host STRING,
    identity STRING,
    user STRING,
    time STRING,
    request STRING,
    status STRING,
    size STRING,
    referer STRING,
    agent STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
    "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?",
    "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
    )
    STORED AS TEXTFILE;

    How to output a table or result of a query to a local file or HDFS in Hive

    The following command outputs the table to local directory
    INSERT OVERWRITE LOCAL DIRECTORY '' SELECT * FROM table_name;

    The following command outputs the table to a HDFS file
    INSERT OVERWRITE DIRECTORY '/tmp/hdfs_out' SELECT a.* FROM table_name;

    How to optimize hadoop jobs: Best Practices and thumb rules

    Hadoop provides a lot of ways to optimize jobs to make them run faster for your data, but the following are the rules of thumb that should be checked before spending more time on deeper tuning.

    - Number of mappers
    How long are your mappers running for? If they are only running for a few seconds on average, then you should see if there's a way to have fewer mappers and make them all run longer - a minute or so, as a rule of thumb. The extent to which this is possible depends on the input format you are using.

    - Number of reducers
    For maximum performance, the number of reducers should be slightly less than the number of reduce slots in the cluster. This allows the reducers to finish in one wave, and fully utilizes the cluster during the reduce phase.

    - Combiners
    Can your job take advantage of a combiner to reduce the amount of data passing through the shuffle?

    - Intermediate compression
    Job execution time can almost always benefit from enabling map output compression.

    - Custom serialization
    If you are using your own custom Writable objects, or custom comparators, then make sure you have implemented RawComparator.

    - Shuffle tweaks
    The MapReduce shuffle exposes around a dozen tuning parameters for memory management, which may help you eke out the last bit of performance.

    How to load data from an RDBMS (like Sql Server, Oracle) to Hadoop HDFS

    Use the sqoop utility

    How to get the name of the file being executed by the mapper in Hadoop Map reduce framework

    Sometimes when we are processing data in hadoop, it helps to know the name of the file chunk being processed by the mapper. The default InputFormat will provide the Mapper with (key, value) pairs where the key is the byte offset into the file, and the value is a line of text.

    To get the filename of the current input, use the following code in your mapper

    FileSplit fileSplit = (FileSplit)context.getInputSplit();
    String filename = fileSplit.getPath().getName();
    System.out.println("File name "+filename);
    System.out.println("Directory and File name"+fileSplit.getPath().toString());

    How to manage partitions in Hadoop Hive

    Create Table

    CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
      [(col_name data_type [COMMENT col_comment], ...)]
      [table_comment]
      [PARTITIONED BY (col_name data_type [col_comment], col_name data_type [COMMENT col_comment], ...)]
      [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name, ...)] INTO num_buckets BUCKETS]
      [ROW FORMAT row_format]
      [STORED AS file_format]
      [LOCATION hdfs_path]
     
    data_type
      : primitive_type
      | array_type
      | map_type
     
    primitive_type
      : TINYINT
      | INT
      | BIGINT
      | BOOLEAN
      | DOUBLE
      | STRING
     
    array_type
      : ARRAY < primitive_type > 
     
    map_type
      : MAP < primitive_type, primitive_type >
     
    row_format
      : DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char] 
            [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
      | SERDE serde_name [WITH SERDEPROPERTIES property_name=property_value, property_name=property_value, ...]
     
    file_format:
      : SEQUENCEFILE
      | TEXTFILE

    CREATE TABLE creates a table with the given name. An error is thrown if a table with the same name already exists. You can use IF NOT EXISTS to skip the error.

    EXTERNAL keyword lets you create a table and provide a LOCATION so that Hive does not use default location for this table. This comes in handy if you already have data generated.

    You can create tables with custom SerDe or using native SerDe. A native SerDe is used if ROW FORMAT is not specified or ROW FORMAT DELIMITED is specified. You can use DELIMITED clause to read delimited files. Use SERDE clause to create a table with custom SerDe. Refer to SerDe section of User Guide for more information on SerDe.

    You must specify list of columns for tables with native SerDe. Refer to Types part of the User Guide for the allowable column types. List of columns for tables with custom SerDe may be specified but Hive will query the SerDe to determine the list of columns for this table.

    Use STORED AS TEXTFILE if the data needs to be stored as plain text files. Use STORED AS SEQUENCEFILE if the data needs to be compressed. Please read more about CompressedStorage if you are planning to keep data compressed in your Hive tables.

    Partitioned tables can be created using the PARTITIONED BY clause. A table can have one or more partition columns and a separate data directory is created for each set of partition column values. Further, tables or partitions can be bucketed using CLUSTERED BY columns and data can be sorted within that bucket by SORTED BY columns. This can improve performance on certain kinds of queries.

    Table names and column names are case insensitive but SerDe and property names are case sensitive.

    Examples:

    An example statement that would create a table would be like:

    CREATE TABLE page_view(viewTime INT, userid BIGINT, 
         page_url STRING, referrer_url STRING, 
         ip STRING COMMENT 'IP Address of the User') 
     COMMENT 'This is the page view table' 
     PARTITIONED BY(dt STRING, country STRING) 
     STORED AS SEQUENCEFILE;

    The statement above creates the page_view table with viewTime, userid, page_url, referrer_url and ip columns with a comment. The table is also partitioned and data is stored in sequence files. The data in the files is assumed to be field-delimited by ctrl-A and row-delimited by newline.

    CREATE TABLE page_view(viewTime INT, userid BIGINT, 
         page_url STRING, referrer_url STRING, 
         ip STRING COMMENT 'IP Address of the User') 
     COMMENT 'This is the page view table' 
     PARTITIONED BY(dt STRING, country STRING) 
     ROW FORMAT DELIMITED 
       FIELDS TERMINATED BY '\001' 
       LINES TERMINATED BY '\012' 
    STORED AS SEQUENCEFILE;

    The above statement creates the same table as the previous one, but specifies the field and line delimiters explicitly ('\001' and '\012').

    CREATE TABLE page_view(viewTime INT, userid BIGINT, 
         page_url STRING, referrer_url STRING, 
         ip STRING COMMENT 'IP Address of the User') 
     COMMENT 'This is the page view table' 
     PARTITIONED BY(dt STRING, country STRING) 
     CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS 
     ROW FORMAT DELIMITED 
       FIELDS TERMINATED BY '\001' 
       COLLECTION ITEMS TERMINATED BY '\002' 
       MAP KEYS TERMINATED BY '\003' 
       LINES TERMINATED BY '\012' 
     STORED AS SEQUENCEFILE;

    In the example above, the page_view table is bucketed (clustered) by userid, and within each bucket the data is sorted in increasing order of viewTime. Such an organization allows the user to do efficient sampling on the clustered column, in this case userid. The sorting property allows internal operators to take advantage of the known data layout while evaluating queries, again increasing efficiency. The MAP KEYS and COLLECTION ITEMS keywords can be used if any of the columns are lists or maps.

    In all the examples so far, the data is stored under <hive.metastore.warehouse.dir>/page_view. The value of the key hive.metastore.warehouse.dir is specified in the Hive config file hive-site.xml.

    CREATE EXTERNAL TABLE page_view(viewTime INT, userid BIGINT, 
         page_url STRING, referrer_url STRING, 
         ip STRING COMMENT 'IP Address of the User', 
         country STRING COMMENT 'country of origination') 
     COMMENT 'This is the staging page view table' 
     ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' LINES TERMINATED BY '\012' 
     STORED AS TEXTFILE 
     LOCATION '<hdfs_location>';

    You can use the above statement to create a page_view table that points to any HDFS location for its data. You do, however, have to make sure that the data is delimited as specified in the query above.

    Inserting Data Into Bucketed Tables

    The CLUSTERED BY and SORTED BY creation clauses do not affect how data is inserted into a table -- only how it is read. This means that users must insert the data correctly themselves, by setting the number of reducers equal to the number of buckets and using CLUSTER BY and SORT BY clauses in their insert query.

    There is also an example of creating and populating bucketed tables.
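    As a hedged sketch, the bucketed page_view table defined above could be populated as follows. The staging table page_view_stg and the partition values are illustrative; the key points are setting the number of reducers equal to the number of buckets (32) and distributing by the bucketing column while sorting by the sort column:

    set mapred.reduce.tasks = 32;
     
    FROM page_view_stg pvs
    INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-08-08', country='us')
      SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, pvs.ip
      DISTRIBUTE BY userid
      SORT BY viewTime;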

    Drop Table

    DROP TABLE table_name

    DROP TABLE removes metadata and data for this table. The data is actually moved to the .Trash/Current directory if Trash is configured. The metadata is completely lost.
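    For example, to drop the page_view table created earlier:

    DROP TABLE page_view;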

    Drop Partitions

    ALTER TABLE table_name DROP partition_spec, partition_spec,...
     
    partition_spec:
      : PARTITION (partition_col = partition_col_value, partition_col = partition_col_value, ...)

    You can use ALTER TABLE DROP PARTITION to drop a partition for a table. This removes the data and metadata for this partition. Partition value should be quoted only if it is a string.

    alter table page_view drop partition(dt='2008-08-08', country='us')

    Alter Table Statements

    Alter table statements enable you to change the structure of an existing table. You can add columns, change serde, add table and serde properties, or rename the table itself.

    Rename Table

    ALTER TABLE table_name RENAME TO new_table_name

    This statement lets you change the name of a table to a different name. The location of the data and the names of partitions do not change! In other words, the old table name is not "freed," and writing to it will alter the "renamed" table's data!
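    For example (the new table name is illustrative):

    ALTER TABLE page_view RENAME TO page_view_archived;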

    Add/Replace Columns

    ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)

    ADD COLUMNS lets you add new columns to the end of the existing columns but before the partition columns.

    REPLACE COLUMNS removes all existing columns and adds the new set of columns. This can be done only for native tables. Native tables are those tables that are created with DynamicSerDe or MetadataTypedColumnsetSerDe serdes. Refer to SerDe section of User Guide for more information.
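    For example, adding a hypothetical session_id column to the page_view table (it is appended after ip but before the dt and country partition columns):

    ALTER TABLE page_view ADD COLUMNS (session_id STRING COMMENT 'Session id of the user');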

    Add Table Properties

    ALTER TABLE table_name SET TBLPROPERTIES table_properties
     
    table_properties:
      : (property_key = property_value, property_key = property_value, ... )

    You can use this statement to add your own metadata to a table. Currently the last_modified_user and last_modified_time properties are automatically added and managed by Hive. Users can add their own properties to this list. You can run DESCRIBE EXTENDED on the table to get this information.
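    For example, attaching a hypothetical creator property to the page_view table (note that the keyword is TBLPROPERTIES and the key/value pairs are quoted):

    ALTER TABLE page_view SET TBLPROPERTIES ('creator' = 'etl_team');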

    Add Serde Properties

    ALTER TABLE table_name SET SERDE serde_class_name [WITH SERDEPROPERTIES serde_properties]
    ALTER TABLE table_name SET SERDEPROPERTIES serde_properties
     
    serde_properties:
      : (property_key = property_value, property_key = property_value, ... )

    This statement enables you to add user-defined metadata to a table's SerDe object. The serde properties are passed to the table's SerDe when it is initialized by Hive to serialize and deserialize data, so users can store any information required by their custom SerDe here. Refer to the SerDe section of the User Guide for more information.
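    For example, pointing the table at a custom SerDe and passing it a property (the class name and property here are purely illustrative):

    ALTER TABLE page_view SET SERDE 'com.example.hive.MyCustomSerDe'
      WITH SERDEPROPERTIES ('field.delim' = ',');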

    Show/Describe Statements

    These statements provide a way to query Hive metastore for existing data and metadata accessible to this Hive system.

    Show Tables

    SHOW TABLES identifier_with_wildcards

    SHOW TABLES lists all the tables matching the given regular expression. The regular expression can contain only '*' for any character(s) or '|' for a choice. Examples are 'page_view', 'page_v*', and '*view|page*', all of which will match the 'page_view' table. Matching tables are listed in alphabetical order. It is not an error if no matching tables are found in the metastore.
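    For example, to list page_view along with any other table whose name starts with page_v:

    SHOW TABLES 'page_v*';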

    Show Partitions

    SHOW PARTITIONS table_name

    SHOW PARTITIONS lists all the existing partitions for a given table. Partitions are listed in alphabetical order.
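    For example, for the partitioned page_view table above:

    SHOW PARTITIONS page_view;

    Each partition is printed as a path-like spec, typically of the form dt=2008-08-08/country=us.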

    Describe Table/Column

    DESCRIBE [EXTENDED] table_name[DOT col_name]
    DESCRIBE [EXTENDED] table_name[DOT col_name ( [DOT field_name] | [DOT '$elem$'] | [DOT '$key$'] | [DOT '$value$'] )* ]

    DESCRIBE TABLE shows the list of columns, including partition columns, for the given table. If the EXTENDED keyword is specified, it also shows all the metadata for the table in Thrift-serialized form. This is generally only useful for debugging, not for general use.

    If a table has a complex column, you can examine the attributes of that column by specifying table_name.complex_col_name (with '$elem$' for an array element, '$key$' for a map key, and '$value$' for a map value). You can apply this recursively to explore the complex column type.
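    For example, to describe the page_view table and then drill into one of its columns:

    DESCRIBE page_view;
    DESCRIBE EXTENDED page_view.ip;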

    Describe Partition

    DESCRIBE [EXTENDED] table_name partition_spec

    This statement lists metadata for a given partition. The output is similar to that of DESCRIBE TABLE. Presently, the column information is not used while preparing plans.
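    For example, for one of the partitions of the page_view table:

    DESCRIBE EXTENDED page_view PARTITION (dt='2008-08-08', country='us');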

    Create Function

    CREATE TEMPORARY FUNCTION function_name AS class_name 

    This statement lets you create a function that is implemented by class_name. You can use this function in Hive queries for as long as the session lasts. You can use any class that is in the class path of Hive; you can add jars to the class path by executing 'ADD JAR' statements. Please refer to the CLI section of the User Guide for more information on how to add/delete files from the Hive classpath. Using this, you can register User Defined Functions (UDFs).
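    For example, registering a hypothetical UDF class from a jar (the jar path and class name are illustrative):

    ADD JAR /tmp/my_udfs.jar;
    CREATE TEMPORARY FUNCTION my_lower AS 'com.example.hive.udf.MyLower';
    SELECT my_lower(page_url) FROM page_view;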