我在《Hadoop&Spark解决二次排序问题(Hadoop篇)》文章中介绍了如何在Hadoop中实现二次排序问题,今天我将介绍如何在Spark中实现。
问题描述
二次排序就是key之间有序,而且每个Key对应的value也是有序的;也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):
[root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0
我们期望的输出结果是
2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5
解决方案
在Spark中解决这种问题相对Hadoop来说是非常简单的,我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey
函数将同一个Key的所有Value全部弄到一起,然后对同一个Key的所有Value进行排序即可。
代码实例
为了简便,我直接在Spark-shell中解决这个问题,首先我们启动一个Spark-shell:
bin/spark-shell --master yarn-client --executor-memory 1g --num-executors 2 --queue iteblog --executor-cores 1 Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.6.1 /_/ Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45) Type in expressions to have them evaluated. Type :help for more information. Spark context available as sc. SQL context available as sqlContext. scala>
在里面输入以下代码即可:
scala> val file = sc.textFile("/tmp/data.txt") file: org.apache.spark.rdd.RDD[String] = /tmp/data.txt MapPartitionsRDD[1] at textFile at <console>:27 scala> val data = file.map(_.split(",")).map(item => (s"${item(0)}-${item(1)}", item(2))) data: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:29 scala> data.collect().foreach(println) (2015-1,24) (2015-3,56) (2015-1,3) (2015-2,-43) (2015-4,5) (2015-3,46) (2014-2,64) (2015-1,4) (2015-1,21) (2015-2,35) (2015-2,0) scala> val rdd = data.groupByKey rdd: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[5] at groupByKey at <console>:31 scala> rdd.collect().foreach(println) (2014-2,CompactBuffer(64)) (2015-1,CompactBuffer(24, 3, 4, 21)) (2015-2,CompactBuffer(35, 0, -43)) (2015-3,CompactBuffer(56, 46)) (2015-4,CompactBuffer(5)) scala> val result = rdd.map(item => (item._1, item._2.toList.sortWith(_.toInt<_.toInt))) result: org.apache.spark.rdd.RDD[(String, List[String])] = MapPartitionsRDD[20] at map at <console>:33 scala> result.collect.foreach(item => println(s"${item._1}\t${item._2.mkString(",")}")) 2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5
可以看出,使用Spark来解决这个问题非常地简单。上面的CompactBuffer
实现了Scala中的Seq
类,所以可以直接转换成List或者Array,然后直接使用Scala中的排序即可。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Hadoop&Spark解决二次排序问题(Spark篇)】(https://www.iteblog.com/archives/1819.html)