问题描述
二次排序就是key之间有序,而且每个Key对应的value也是有序的;也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):
[root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0
我们期望的输出结果是
2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5
但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,Hadoop默认的输出可能如下:
2014-2 64 2015-1 21,4,3,24 2015-2 0,35,-43 2015-3 46,56 2015-4 5
解决方案
针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError
,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key
;
2、我们可以将k和v组合成新的key(可以称为composite key
),也就是((k,v), v)
3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
4、自定义分组函数,将k相同的键值对当作一个分组。
文字比较枯燥,我们来看看下面实例:
1、原始数据是
[root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0
我们将年、月组成key(natural key
),总数作为value,结果变成:
(2015-1,24) (2015-3,56) (2015-1,3) (2015-2,-43) (2015-4,5) (2015-3,46) (2014-2,64) (2015-1,4) (2015-1,21) (2015-2,35) (2015-2,0)
2、将value和key(natural key
)组成新的key(composite key
),如下:
((2015-1,24),24) ((2015-3,56),56) ((2015-1,3),3) ((2015-2,-43),-43) ((2015-4,5),5) ((2015-3,46),46) ((2014-2,64),64) ((2015-1,4),4) ((2015-1,21),21) ((2015-2,35),35) ((2015-2,0),0)
3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:
[((2014-2,64),64)] [((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,56),56),((2015-3,46),46)] [((2015-4,5),5)]
4、自定义组排序函数,结果如下:
[((2014-2,64),64)] [((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,46),46),((2015-3,56),56)] [((2015-4,5),5)]
5、自定义分组函数,结果如下:
((2014-2,64),(64)) ((2015-1,24),(3,4,21,24)) ((2015-2,35),(-43,0,35)) ((2015-3,56),(46,56)) ((2015-4,5),(5))
6、最后输出的结果就是我们要的:
2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5
代码实例
下面将贴出使用MapReduce解决这个问题的代码:
package com.iteblog; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * User: 过往记忆 * Date: 2015-08-05 * Time: 下午23:49 * bolg: * 本文地址:/archives/1415.html * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ public class Entry implements WritableComparable<Entry> { private String yearMonth; private int count; public Entry() { } @Override public int compareTo(Entry entry) { int result = this.yearMonth.compareTo(entry.getYearMonth()); if (result == 0) { result = compare(count, entry.getCount()); } return result; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(yearMonth); dataOutput.writeInt(count); } @Override public void readFields(DataInput dataInput) throws IOException { this.yearMonth = dataInput.readUTF(); this.count = dataInput.readInt(); } public String getYearMonth() { return yearMonth; } public void setYearMonth(String yearMonth) { this.yearMonth = yearMonth; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } public static int compare(int a, int b) { return a < b ? -1 : (a > b ? 1 : 0); } @Override public String toString() { return yearMonth; } }
上面就是将旧的Key(natural key
)和Value组合成新的Key(composite key
)的代码,接下来看下自定义的分区类:
package com.iteblog; import org.apache.hadoop.mapreduce.Partitioner; public class EntryPartitioner extends Partitioner<Entry, Integer> { @Override public int getPartition(Entry entry, Integer integer, int numberPartitions) { return Math.abs((entry.getYearMonth().hashCode() % numberPartitions)); } }
这个类使得natural key
相同的数据分派到同一个Reduce中。然后看下自定义分组类:
package com.iteblog; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * User: 过往记忆 * Date: 2015-08-05 * Time: 下午23:49 * bolg: * 本文地址:/archives/1415.html * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ public class EntryGroupingComparator extends WritableComparator { public EntryGroupingComparator() { super(Entry.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { Entry a1 = (Entry) a; Entry b1 = (Entry) b; return a1.getYearMonth().compareTo(b1.getYearMonth()); } }
只要是natural key
相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类
public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> { private Entry entry = new Entry(); private Text value = new Text(); @Override protected void map(LongWritable key, Text lines, Context context) throws IOException, InterruptedException { String line = lines.toString(); String[] tokens = line.split(","); // YYYY = tokens[0] // MM = tokens[1] // count = tokens[2] String yearMonth = tokens[0] + "-" + tokens[1]; int count = Integer.parseInt(tokens[2]); entry.setYearMonth(yearMonth); entry.setCount(count); value.set(tokens[2]); context.write(entry, value); } }
其实就是解析每一行的数据,然后将旧的Key(natural key
)和Value组合成新的Key(composite key
)。接下来看下Reduce类实现
public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> { @Override protected void reduce(Entry key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder builder = new StringBuilder(); for (Text value : values) { builder.append(value.toString()); builder.append(","); } context.write(key, new Text(builder.toString())); } }
builder存储的就是排序好的Value序列,最后来看看启动程序的使用:
Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(Iteblog.class); job.setJobName("SecondarySort"); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setOutputKeyClass(Entry.class); job.setOutputValueClass(Text.class); job.setMapperClass(SecondarySortMapper.class); job.setReducerClass(SecondarySortReducer.class); job.setPartitionerClass(EntryPartitioner.class); job.setGroupingComparatorClass(EntryGroupingComparator.class);
关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:
[root@iteblog.com /hadoop]# bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar com.iteblog.Main /iteblog/data.txt /iteblog/output [root@iteblog.com /hadoop]# bin/hadoop fs -cat /iteblog/output/pa* 2014-2 64, 2015-1 3,4,21,24, 2015-2 -43,0,35, 2015-3 46,56, 2015-4 5,本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Hadoop&Spark解决二次排序问题(Hadoop篇)】(https://www.iteblog.com/archives/1415.html)
如果有多个reducer的,如何保证不同分区间是有序的,有可能出现key为6经hash后散列到分区1,而key为5经hash后散列到分区2,是不是有这种情况?
二次排序就是key之间有序,而且每个Key对应的value也是有序的
还是不太懂,我的理解是将原来的key-value作为新的key再自定义比较规则可以实现二次排序,这样在同一reduce分区中是好理解的,但是如果保证经分区函数分到不同分区中的key是相对有序的。。
你可以将上面的程序测试一下呢
如果有多个reduce同时计算像这样的二次排序请问有没有好的方案
上文其实在多个Reduce的情况下也是有效的。当然,这个不是唯一的方案。