该函数和aggregate类似,但操作的RDD是Pair类型的。Spark 1.1.0版本才正式引入该函数。官方文档定义:
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。
函数原型
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
第一个aggregateByKey函数我们可以自定义Partitioner。除了这个参数之外,其函数声明和aggregate很类似;其他的aggregateByKey函数实现最终都是调用这个。
第二个aggregateByKey函数可以设置分区的个数(numPartitions),最终用的是HashPartitioner。
最后一个aggregateByKey实现先会判断当前RDD是否定义了分区函数,如果定义了则用当前RDD的分区;如果当前RDD并未定义分区 ,则使用HashPartitioner。
实例
/** * User: 过往记忆 * Date: 15-03-02 * Time: 上午06:30 * bolg: * 本文地址:/archives/1261 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ scala> var data = sc.parallelize(List((1,3),(1,2),(1, 4),(2,3))) scala> def seq(a:Int, b:Int) : Int ={ | println("seq: " + a + "\t " + b) | math.max(a,b) | } seq: (a: Int, b: Int)Int scala> def comb(a:Int, b:Int) : Int ={ | println("comb: " + a + "\t " + b) | a + b | } comb: (a: Int, b: Int)Int scala> data.aggregateByKey(1)(seq, comb).collect seq: 1 3 seq: 1 2 seq: 1 4 seq: 1 3 comb: 3 2 comb: 5 4 res62: Array[(Int, Int)] = Array((1,9), (2,3))
注意
细心的读者肯定发现了aggregateByKey和aggregate结果有点不一样。如果用aggregate函数对含有3、2、4三个元素的RDD进行计算,初始值为1的时候,计算的结果应该是10,而这里是9,这是因为aggregate函数中的初始值需要和reduce函数以及combine函数结合计算,而aggregateByKey中的初始值只需要和reduce函数计算,不需要和combine函数结合计算,所以导致结果有点不一样。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark函数讲解:aggregateByKey】(https://www.iteblog.com/archives/1261.html)
就改了这点var data = sc.parallelize(List((1,3),(1,2),(1, 4),(2,3)),2),不是要跨分区合并的么
为什么我跑出来的是((1,7),(2,3))