Sunday, January 17, 2016

How to speed up count distinct queries in Hive

Have you ever wondered why a count distinct query on a high-cardinality column in Hive is so slow?

Let's say we have a table that contains a list of all products:

Table: Products
Columns:
  1. product_id
  2. product_name
  3. price
  4. available_in_country (this column and product_id together are unique)

Let's say this table contains 1 billion rows and around 200 million unique product_ids.

You need to find the number of distinct products in the table.
Most people will write a query like:
select count(distinct product_id) from products

This query will start a map reduce job with just one reducer, and that reducer will take a long time to finish. This is because all 1 billion rows go to a single reducer, which has to work through all 1 billion rows while maintaining the set of distinct IDs in memory.

Instead you should write the query like this:
select count(*) from (select distinct product_id from products) T

This query will run much faster because the subquery uses multiple reducers. The outer query then only has to count rows, which is mostly done on the map side with the reducer simply summing up the per-map counts, so it is really fast.

Thursday, January 14, 2016

How to do UNION ALL in hive

Suppose you have two tables:
Boys
- Name
- Age

Girls
- Name
- Age

Now let's say we want a query that returns all Boys and Girls. Most of us who come from the Oracle/SqlServer/MySql world will write the following query:

select name, age from Boys 
union all 
select name, age from Girls

This query will throw an error in Hive, and it will look like UNION ALL does not work in Hive. UNION ALL does work in Hive; the only trick is to enclose the UNION ALL queries in a subquery, i.e. write the above as:

select name, age from (
     select name, age from Boys 
     union all 
     select name, age from Girls
) BoysAndGirls

Tuesday, April 24, 2012

Hadoop Example: Using a custom Java class as Key and Group Comparator

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyKey.java : CUSTOM KEY CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MyKey implements WritableComparable<MyKey> {
    String timestamp = null;
    String vrid = null;

    public MyKey() {
    }

    public void set(String timestamp, String vrid) {
        this.timestamp = timestamp;
        this.vrid = vrid;

    }

    ///////////////////////////////////////////// Implement Writable
    @Override
    public void write(DataOutput daout) throws IOException {
        daout.writeUTF(vrid);
        daout.writeUTF(timestamp);

    }

    ///////////////////////////////////////////// Implement Writable
    @Override
    public void readFields(DataInput dain) throws IOException {
        vrid = dain.readUTF();
        timestamp = dain.readUTF();
    }

    //////////////////////////////////////////// Implement Comparable
    @Override
    public int compareTo(MyKey ob) {
        int cmp = this.vrid.compareTo(ob.vrid);
        if (cmp != 0) {
            return cmp;
        }
        return this.timestamp.compareTo(ob.timestamp);
    }

    @Override
    public boolean equals(Object ob) {
        if (ob == null || this.getClass() != ob.getClass()) return false;

        MyKey k = (MyKey) ob;
        if (k.timestamp != null && this.timestamp != null && !k.timestamp.equals(this.timestamp)) return false;
        if (k.vrid != null && this.vrid != null && !k.vrid.equals(this.vrid)) return false;
        return true;
    }

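    // Hash only on vrid so the default HashPartitioner sends every record with the same vrid to the same reducer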
    @Override
    public int hashCode() {
        int result = vrid != null ? vrid.hashCode() : 0;
        return 31 * result;

    }

    @Override
    public String toString() {
        return vrid + "\u0002" + timestamp;
    }
}

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyGroupComparator.java : CUSTOM KEY CLASS GROUP COMPARATOR
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

class MyGroupComparator extends WritableComparator {

    protected MyGroupComparator(){
        super(MyKey.class,true);
    }

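    // Group keys by vrid only; MyKey.compareTo still sorts by (vrid, timestamp), so each reduce() call sees all values for a vrid in timestamp order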
    @Override
    public int compare(WritableComparable w1, WritableComparable w2){
        MyKey k1 = (MyKey) w1;
        MyKey k2 = (MyKey) w2;
        return k1.vrid.compareTo(k2.vrid);
    }
}


::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyMapper.java : MAPPER CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, MyKey, Text> {

    private MyKey ok = new MyKey();
    private Text ov = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
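        // Input lines are \u0002-delimited; field 0 = timestamp, field 1 = vrid, field 2 = value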
        String[] valarr = value.toString().split("\u0002", -1);
        if (valarr.length > 3) {
            ok.set(valarr[0], valarr[1]);
            ov.set(valarr[2]);
            context.write(ok, ov);
        } else {
            context.getCounter("AMAN'S COUNTERS","!!! Bad Array !!!").increment(1);
        }
    }
}

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyReducer.java : REDUCER CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MyReducer extends Reducer<MyKey, Text, MyKey, Text> {

    @Override
    public void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(key, value);
        }
    }
}


::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyMr.java : RUN JOB
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;

public class MyMr extends Configured implements Tool {
    @Option(name = "--input", required = true)
    String inputdir;

    @Option(name = "--output", required = true)
    String outputdir;

    @Option(name = "--reducers")
    int reducers;

    public int run(String[] arg) throws Exception {

        Configuration conf = super.getConf();
        conf.set("mapred.input.dir", inputdir);
        conf.set("mapred.output.dir", outputdir);

        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);

        conf.set("mapreduce.map.class", "com.amandoon.HadoopExample.MyMapper");
        conf.set("mapreduce.reduce.class", "com.amandoon.HadoopExample.MyReducer");

        conf.set("mapreduce.inputformat.class", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
        conf.set("mapreduce.outputformat.class", "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat");

        conf.setInt("mapred.reduce.tasks", reducers);

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        conf.set("mapred.output.key.class", "com.amandoon.HadoopExample.MyKey");
        conf.set("mapred.output.value.class", "org.apache.hadoop.io.Text");

        conf.set("mapred.job.name", "CustomKey");
        conf.set("mapred.output.value.groupfn.class", "com.amandoon.HadoopExample.MyGroupComparator");

        Job job = new Job(conf);
        job.setJarByClass(com.amandoon.HadoopExample.MyMr.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        MyMr fc = new MyMr();
        new CmdLineParser(fc).parseArgument(args);
        ToolRunner.run(fc, args);
    }
}

Which Hadoop property needs to be set to change the key GroupComparator?

If you have a custom key GroupComparator defined for your MR job, you can set it in your Hadoop config file as:

<property>
<name>mapred.output.value.groupfn.class</name>
<value>com.amandoon.CustomKey.MyGroupComparator</value>
</property>


OR you can set it for a job using the Job object:
jobObject.setGroupingComparatorClass(MyGroupComparator.class)

See this Hadoop example that uses a custom key GroupComparator:
http://hadoop-blog.blogspot.com/2012/04/hadoop-example-using-custom-java-class.html
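
For completeness, here is a minimal driver sketch that wires the grouping comparator in through the Job API rather than through raw configuration properties. The driver class name (GroupComparatorDriver) and the command-line arguments are hypothetical; MyKey, MyMapper, MyReducer and MyGroupComparator are the classes from the example linked above.

package com.amandoon.HadoopExample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupComparatorDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "CustomKeyWithGrouping");
        job.setJarByClass(GroupComparatorDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Keys still sort by (vrid, timestamp) via MyKey.compareTo,
        // but reduce() groups are formed by vrid only.
        job.setGroupingComparatorClass(MyGroupComparator.class);

        job.setOutputKeyClass(MyKey.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}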

Tips & techniques to write data intensive text processing tasks in Hadoop

Stumbled upon a good book by Jimmy Lin and Chris Dyer (University of Maryland, College Park). The book discusses various techniques for writing data-intensive algorithms using Hadoop Map Reduce.

https://docs.google.com/viewer?a=v&pid=sites&srcid=ZGVmYXVsdGRvbWFpbnxoYWRvb3BhbmRoaXZlfGd4OjZjNmEzMzlhMzlhNjdkMWQ&pli=1

It gave me a fairly good starting point on how to approach graph algorithms using Map Reduce.

Wednesday, January 25, 2012

How to compress the intermediate output of mappers and the final output of a Map Reduce job in Hadoop

I was testing different ways to improve the performance of Hadoop jobs, including how much compression helps.

There are two places where you can configure a Hadoop job to use compression:

1. Compress the intermediate output of the mapper
To do this for all jobs, you can set it in mapred-site.xml by adding the following properties:
<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

I am compressing using GzipCodec, but you have the option to use any of the following:
- GzipCodec
- DeflateCodec
- BZip2Codec
- SnappyCodec
Each of these has its strengths and weaknesses, so choose what you can live with. Also note that due to licensing differences, LZO does not ship with Hadoop; you can install it separately and use it if you'd like.

To set this for just your job, use the Configuration object:
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.GzipCodec");


2. Compress the final output of the job
To save the final output in gzip, run your M/R job with the following code:

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, true);
TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
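
To see both settings in one place, here is a minimal, self-contained driver sketch. The class name CompressedJobDriver is hypothetical, and it relies on the default identity mapper and reducer just so the example runs on its own; in a real job you would plug in your own classes.

package com.amandoon.HadoopExample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CompressedJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // 1. Compress the intermediate map output with gzip
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.set("mapreduce.map.output.compress.codec",
                 "org.apache.hadoop.io.compress.GzipCodec");

        // Default (identity) mapper and reducer keep this sketch self-contained
        Job job = new Job(conf, "CompressionExample");
        job.setJarByClass(CompressedJobDriver.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // 2. Compress the final job output with gzip
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setCompressOutput(job, true);
        TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}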

Friday, December 9, 2011

Hadoop Cluster size deployed by CBS Interactive

Following are some details of the Hadoop production infrastructure deployed at CBS Interactive as of now:
- Total Nodes: 80
- Total Disk Capacity: 1 PB
- ETL apps were written using Hadoop Streaming (Python)

Friday, March 18, 2011

How to create a column in Hive whose name is the same as a reserved keyword

If you run the following command in Hive, it will fail because sort is a reserved keyword in Hive:

CREATE EXTERNAL TABLE aaaabc ( sort STRING  )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\002'
LINES TERMINATED BY '\n';

FAILED: Parse Error: line 1:31 mismatched input 'sort' expecting Identifier in column specification


To overcome this, run the command with sort in backticks, i.e.
CREATE EXTERNAL TABLE aaaabc ( `sort` STRING  )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\002'
LINES TERMINATED BY '\n';

Wednesday, January 26, 2011

Hive Error: FAILED: Parse Error: line 54:4 mismatched input expecting Identifier in column specification

PROBLEM
I was trying to create an external table in Hive using the following command:
CREATE EXTERNAL TABLE tetl_fact_r
(
    custid STRING,
    value STRING,
    ph STRING,
    email STRING,
    sort STRING,
    address STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\002'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/hadoop-blog/abc.text';


and got the following error

FAILED: Parse Error: line <linenum>:4 mismatched input <column name> expecting Identifier in column specification

REASON
From https://issues.cloudera.org/browse/SQOOP-37, I found that this happens because the column name sort is a reserved keyword in Hive.

SOLUTION
To be honest, I didn't spend much time finding a solution; I just renamed my field and it worked after that. The new query looked like:
CREATE EXTERNAL TABLE tetl_fact_r
(
    custid STRING,
    value STRING,
    ph STRING,
    email STRING,
    sorttype STRING,
    address STRING
)


If any of you figure out a way to make this query work without renaming the column, please leave a comment. It will be much appreciated.

Saturday, December 25, 2010

How to dynamically assign reducers to a Hadoop job at runtime

When we were setting up our production jobs we reached a point where we needed a way to dynamically assign reducers to a Hadoop job at runtime. The following is what we have implemented as of today. If you are reading this blog and have a better idea to share, please leave a comment.
PROBLEM STATEMENT
We have a lot of cases where the same production job works on data sets of very different sizes, i.e. for a given hour the job can process anywhere from 35 GB to 400 GB of data. We wanted to change the number of reducers at runtime depending on the data set, and we also did not want a job to hog all the reducers, because the grid was shared between jobs from different teams and all jobs were equally important.
SOLUTION
This is something the Hadoop MR framework cannot do for you. Hive addresses it by providing a property (hive.exec.reducers.bytes.per.reducer) that caps the number of bytes processed by one reducer: if this property is set to 1 GB and the data size is 1.2 GB, then 2 reducers are assigned to the job at runtime; if the data size is 10 GB, then 10 reducers are assigned. This works great for Hive because it is designed to be agnostic of the data set, but a big Hive job can still take all the reducers in the grid. Since we knew our data very well and did not want a single job to take all the reducers, we decided to implement our own solution.
The solution is very simple. We asked all the job owners to run their jobs in the sandbox on a fixed set of data and provide us either of the following:
1. Required: Input size of the data in MB
2. Required: Output size of the data in MB
3. Required: Whether the reducer work is CPU bound or I/O bound
4. Optional: A decimal number, the multiplier, to fine-tune the number of reducers (if not provided, 1 is used)

OR

1. Required: A fixed number of reducers (which has to be less than TotalReducersOnTheGrid/2)

For CPU-bound jobs, the total number of reducers was calculated as
Total Reducers = Minimum((InputSizeOfDataInMB / 128 MB) * multiplier, TotalReducersOnTheGrid / 2)
i.e. the total number of reducers is either the input data size divided by the HDFS block size, multiplied by the multiplier, or half the total number of reducers available on the grid, whichever is smaller.

For I/O-bound jobs, the total number of reducers was calculated using the following formula
Total Reducers = Minimum((OutputDataSize / InputDataSize) * (InputSizeOfDataInMB / 128 MB) * multiplier, TotalReducersOnTheGrid / 2)
The multiplier was introduced for jobs where the generic formula alone did not give a good number of reducers. We also found that some jobs always required an exact number of reducers regardless of the size of the data set, hence we also gave the job owners a way to specify that.
This pretty much solved most of our problems.
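
To make the calculation concrete, here is a minimal sketch of the kind of helper this boils down to. The class and parameter names (ReducerEstimator, totalGridReducers, and so on) are hypothetical and not the code we actually run; the input and output sizes are assumed to come from the sandbox measurements described above.

package com.amandoon.HadoopExample;

public class ReducerEstimator {

    // CPU bound: min((InputSizeInMB / 128 MB block size) * multiplier, TotalReducersOnTheGrid / 2)
    public static int forCpuBoundJob(long inputSizeMB, double multiplier, int totalGridReducers) {
        double estimate = (inputSizeMB / 128.0) * multiplier;
        return clamp(estimate, totalGridReducers);
    }

    // I/O bound: min((OutputSize / InputSize) * (InputSizeInMB / 128 MB) * multiplier, TotalReducersOnTheGrid / 2)
    public static int forIoBoundJob(long inputSizeMB, long outputSizeMB, double multiplier, int totalGridReducers) {
        double estimate = ((double) outputSizeMB / inputSizeMB) * (inputSizeMB / 128.0) * multiplier;
        return clamp(estimate, totalGridReducers);
    }

    // Never let a single job take more than half the reducers on the grid, and always use at least one.
    private static int clamp(double estimate, int totalGridReducers) {
        int cap = totalGridReducers / 2;
        return Math.max(1, (int) Math.min(Math.ceil(estimate), cap));
    }
}

A driver would then simply pass the result to job.setNumReduceTasks() (or set mapred.reduce.tasks, as in the example driver in the earlier post) before submitting the job.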