[转 ]Hadoop – How to do a secondary sort on values ?

关于在hadoop中,如何让reduce阶段同一个key下的values有序,一篇很好的文章,写的比《Hadoop权威指南》清楚!

转载自:

http://www.bigdataspeak.com/2013/02/hadoop-how-to-do-secondary-sort-on_25.html

The problem at hand here is that you need to work upon a sorted values set in your reducer.

Inputs were of the following form:

udid,datetime,<other-details>

Now, say we have following inputs:

udid1,1970-01-11 23:00:00,<…>

udid1,1970-01-01 23:00:00,<…>

udid1,1970-01-21 23:00:00,<…>

So, when udids are made to be key in the mappers, you would expect following input group in reducer (ignoring the rest of the input details other than datetime for clarity):

<udid1, { 1970-01-11 23:00:00, 1970-01-01 23:00:00,1970-01-21 23:00:00 } >

But we needed to sort the records as per datetime before we might process them. Hence, instead of the above, following is what was required (note that dates are sorted now) :

<udid1, { 1970-01-01 23:00:00, 1970-01-11 23:00:00,1970-01-21 23:00:00 } >

We had a very tough time sorting all the values based on a datetime in the reducer as this needed to be done in-memory and was a great performance killer. Sometimes due to skewed data, i.e. a lot of records for a particular udid, we used to get into Out-Of-Memory issues.

But, not anymore. 🙂

What we did?

We made hadoop to do this job for us. It isn’t very simple and straight forward but I will try to make as much sense as possible.

Hadoop doesn’t sort on values. Period. So, we will have to trick it. For this we will have to make our secondary sort column to be a part of the key of the mapper as well as have to leave it in the value also. Hence, a sample output from mapper would something like:

< [udid1,1970-01-11 23:00:00] , [1970-01-11 23:00:00,<other-details>>

Following four things need to be created/customized:

  1. Map output key class (I have named it CompositeKey)
  2. Partitioner class (ActualKeyPartitioner)
  3. Grouping comparator class (ActualKeyGroupingComparator)
  4. Comparator class (CompositeComparator)

The CompositeKey class is more or less self-explanatory, it would be something like following class:

2013.8.12备注:Key的compareTo方法,应该不是必须的,可以不重写!

Secondly, we need to implement our own partitioner. The reason we need to do so is that now we are emitting both udid and datetime as the key and the defaut partitioner (HashPartitioner) would then not be able to ensure that all the records related to a certain udid comes to the same reducer (partition). Hence, we need to make the partitioner to only consider the actual key part (udid) while deciding on the partition for the record. This can be done as follows:

Now, the partitioner only makes sure that the all records related to the same udid comes to a particular reducer, but it doesn’t guarantee that all of them will come in the same input group (i.e. in a single reduce() call as the list of values). In order to make sure of this, we will need to implement our own grouping comparator. We shall do similar thing as we did for the partitioner, i.e. only look at the actual key (udid) for grouping of reducer inputs. This can be done as follows:

The final thing left is the secondary sorting over datetime field. To achieve this we shall just create our own comparator which first checks for the equality of udids and only if they are equal goes on to check for the datetime field. This shall be implemented as follows and is pretty much self-explanatory:

After creating all these classes, you just need to set this up in your run() as follows:

You are done. You shall get pre-sorted values(sorted over datetime) for each udid in your reduce method.

Leave a Reply

Your email address will not be published.