Tuesday, April 24, 2012

Hadoop Example: Using a custom Java class as Key and Group Comparator

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyKey.java : CUSTOM KEY CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MyKey implements WritableComparable<MyKey> {
    String timestamp = null;
    String vrid = null;

    public MyKey() {
    }

    public void set(String timestamp, String vrid) {
        this.timestamp = timestamp;
        this.vrid = vrid;
    }

    ///////////////////////////////////////////// Implement Writable
    @Override
    public void write(DataOutput daout) throws IOException {
        daout.writeUTF(vrid);
        daout.writeUTF(timestamp);
    }

    ///////////////////////////////////////////// Implement Writable
    @Override
    public void readFields(DataInput dain) throws IOException {
        vrid = dain.readUTF();
        timestamp = dain.readUTF();
    }

    //////////////////////////////////////////// Implement Comparable
    @Override
    public int compareTo(MyKey ob) {
        int cmp = this.vrid.compareTo(ob.vrid);
        if (cmp != 0) {
            return cmp;
        }
        return this.timestamp.compareTo(ob.timestamp);
    }

    @Override
    public boolean equals(Object ob) {
        if (this == ob) return true;
        if (ob == null || this.getClass() != ob.getClass()) return false;

        MyKey k = (MyKey) ob;
        if (timestamp != null ? !timestamp.equals(k.timestamp) : k.timestamp != null) return false;
        if (vrid != null ? !vrid.equals(k.vrid) : k.vrid != null) return false;
        return true;
    }

    @Override
    public int hashCode() {
        // Hash on vrid only, so the default HashPartitioner sends every record
        // for a vrid to the same reducer regardless of its timestamp
        return vrid != null ? vrid.hashCode() : 0;
    }

    @Override
    public String toString() {
        return vrid + "\u0002" + timestamp;
    }
}
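
A quick way to sanity-check the Writable plumbing is a serialize/deserialize round trip. This little harness is my addition, not part of the original job; it only needs java.io and the MyKey class above:

package com.amandoon.HadoopExample;
import java.io.*;

public class MyKeyRoundTrip {
    public static void main(String[] args) throws IOException {
        MyKey in = new MyKey();
        in.set("2012-04-24 10:15:00", "vrid-42"); // hypothetical sample values

        // Serialize the key to a byte array, the same path Hadoop uses
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance
        MyKey out = new MyKey();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(in.equals(out));    // expect: true
        System.out.println(in.compareTo(out)); // expect: 0
    }
}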

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyGroupComparator.java : CUSTOM KEY CLASS GROUP COMPARATOR
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

class MyGroupComparator extends WritableComparator {

    protected MyGroupComparator() {
        super(MyKey.class, true); // true => create MyKey instances for deserialization
    }

    // Group on vrid alone so a single reduce() call sees every timestamp for that vrid
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        MyKey k1 = (MyKey) w1;
        MyKey k2 = (MyKey) w2;
        return k1.vrid.compareTo(k2.vrid);
    }
}
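
One piece the example leaves implicit is partitioning: because MyKey.hashCode() hashes only vrid, the default HashPartitioner already routes every record for a vrid to the same reducer. If you'd rather spell that out, here is a minimal sketch of an explicit partitioner (my addition, not in the original code):

package com.amandoon.HadoopExample;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<MyKey, Text> {
    @Override
    public int getPartition(MyKey key, Text value, int numPartitions) {
        // Partition on vrid alone so all timestamps for a vrid meet in one reducer
        return (key.vrid.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

If you use it, register it with job.setPartitionerClass(MyPartitioner.class).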


::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyMapper.java : MAPPER CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, MyKey, Text> {

    private MyKey ok = new MyKey();
    private Text ov = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] valarr = value.toString().split("\u0002", -1);
        if (valarr.length >= 3) { // we only use the first three fields below
            ok.set(valarr[0], valarr[1]);
            ov.set(valarr[2]); // reuse the Text instance instead of allocating one per record
            context.write(ok, ov);
        } else {
            context.getCounter("AMAN'S COUNTERS", "!!! Bad Array !!!").increment(1);
        }
    }
}
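
The post doesn't show the input, so this is the record shape I'm inferring from how the mapper indexes the split array; the field names and sample values are my guesses:

// Assumed record layout: timestamp ^B vrid ^B value [^B more fields...]
String line = "2012-04-24 10:15:00\u0002vrid-42\u0002203.0.113.7";
String[] f = line.split("\u0002", -1);
// f[0] -> timestamp (first argument to MyKey.set)
// f[1] -> vrid      (second argument to MyKey.set)
// f[2] -> the value emitted to the reducer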

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyReducer.java : REDUCER CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MyReducer extends Reducer<MyKey, Text, MyKey, Text> {

    // Identity reduce: thanks to the grouping comparator, all values for one vrid
    // arrive in a single call, ordered by MyKey.compareTo (vrid, then timestamp)
    @Override
    public void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(key, value);
        }
    }
}


::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyMr.java : RUN JOB
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;

public class MyMr extends Configured implements Tool {
    @Option(name = "--input", required = true)
    String inputdir;

    @Option(name = "--output", required = true)
    String outputdir;

    @Option(name = "--reducers")
    int reducers;

    public int run(String[] arg) throws Exception {

        Configuration conf = super.getConf();
        conf.set("mapred.input.dir", inputdir);
        conf.set("mapred.output.dir", outputdir);

        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);

        conf.set("mapreduce.map.class", "com.amandoon.HadoopExample.MyMapper");
        conf.set("mapreduce.reduce.class", "com.amandoon.HadoopExample.MyReducer");

        conf.set("mapreduce.inputformat.class", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
        conf.set("mapreduce.outputformat.class", "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat");

        conf.setInt("mapred.reduce.tasks", reducers);

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        conf.set("mapred.output.key.class", "com.amandoon.HadoopExample.MyKey");
        conf.set("mapred.output.value.class", "org.apache.hadoop.io.Text");

        conf.set("mapred.job.name", "CustomKey");
        conf.set("mapred.output.value.groupfn.class", "com.amandoon.HadoopExample.MyGroupComparator");

        Job job = new Job(conf);
        job.setJarByClass(com.amandoon.HadoopExample.MyMr.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        MyMr fc = new MyMr();
        new CmdLineParser(fc).parseArgument(args);
        System.exit(ToolRunner.run(fc, args));
    }
}

Which Hadoop property needs to be set to change the key GroupComparator?

If you have a custom key GroupComparator defined for your MR job, you can set it in your Hadoop config file like this:

<property>
    <name>mapred.output.value.groupfn.class</name>
    <value>com.amandoon.HadoopExample.MyGroupComparator</value>
</property>


OR you can set it for a job using the Job object:
jobObject.setGroupingComparatorClass(MyGroupComparator.class);
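
In context, that looks something like the following minimal sketch against the new-API Job class (class names taken from the example above):

Job job = new Job(conf, "CustomKey");
job.setJarByClass(MyMr.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(MyKey.class);
job.setOutputValueClass(Text.class);
job.setGroupingComparatorClass(MyGroupComparator.class); // same effect as the property above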

See this Hadoop example that uses a custom key GroupComparator:
http://hadoop-blog.blogspot.com/2012/04/hadoop-example-using-custom-java-class.html

Tips & techniques to write data intensive text processing task in Hadoop

Stumbled upon a good book by Jimmy Lin and Chris Dyer (University of Maryland, College Park). The book discusses various techniques used to write data intensive alogorithms using Hadoop Map Reduce.

https://docs.google.com/viewer?a=v&pid=sites&srcid=ZGVmYXVsdGRvbWFpbnxoYWRvb3BhbmRoaXZlfGd4OjZjNmEzMzlhMzlhNjdkMWQ&pli=1

It also gave me a fairly good starting point on how to approach graph algorithms using MapReduce.

Wednesday, January 25, 2012

How to compress the intermediate output of mappers and the final output of a MapReduce job in Hadoop

I was testing different ways to improve the performance of Hadoop jobs, including how much compression helps.

There are two places where you can configure a Hadoop job to use compression:

1. Compress the intermediate output of the mapper
To do this for all jobs, set it in mapred-site.xml by adding the following properties:
<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

I am compressing with GzipCodec, but you have the option to use any of the following:
- GzipCodec
- DeflateCodec
- BZip2Codec
- SnappyCodec
Each of these has its strengths and weaknesses; choose what you can live with. Also note that due to licensing differences, LZO does not ship with Hadoop. You can install it separately and use it if you'd like.

For just your job, you can set it on the Configuration object:
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.GzipCodec");


2. Compress the final output of the job
To save the final output in gzip, run your M/R job with the following code:

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, true);
TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
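
If your job writes SequenceFiles rather than text, the same idea applies, plus a record-vs-block choice; a minimal sketch using org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat:

job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// BLOCK compresses runs of records together and usually shrinks better than RECORD
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);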