我们先来看看aggregate函数的官方文档定义:
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
函数原型
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
实例
/** * User: 过往记忆 * Date: 15-02-12 * Time: 上午08:30 * bolg: * 本文地址:/archives/1268 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ scala> def seqOP(a:Int, b:Int) : Int = { | println("seqOp: " + a + "\t" + b) | math.min(a,b) | } seqOP: (a: Int, b: Int)Int scala> def combOp(a:Int, b:Int): Int = { | println("combOp: " + a + "\t" + b) | a + b | } combOp: (a: Int, b: Int)Int scala> val z = sc. parallelize ( List (1 ,2 ,3 ,4 ,5 ,6) , 2) scala> z. aggregate(3)(seqOP, combOp) seqOp: 3 1 seqOp: 3 4 seqOp: 1 2 seqOp: 3 5 seqOp: 1 3 seqOp: 3 6 combOp: 3 1 combOp: 4 3 res20: Int = 7 scala> def seqOp(a:String, b:String) : String = { | println("seqOp: " + a + "\t" + b) | math.min(a.length , b.length ).toString | } seqOp: (a: String, b: String)String scala> def combOp(a:String, b:String) : String = { | println("combOp: " + a + "\t" + b) | a + b | } combOp: (a: String, b: String)String scala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"4567") ,2) scala> z. aggregate ("")(seqOp, combOp) seqOp: 345 seqOp: 12 seqOp: 0 4567 seqOp: 0 23 combOp: 1 combOp: 1 1 res25: String = 11 scala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"") ,2) scala> z. aggregate ("")(seqOp, combOp) seqOp: 12 seqOp: 345 seqOp: 0 23 seqOp: 0 combOp: 0 combOp: 0 1 res26: String = 01
注意
1、reduce函数和combine函数必须满足交换律(commutative)和结合律(associative)
2、从aggregate 函数的定义可知,combine函数的输出类型必须和输入的类型一致
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark函数讲解:aggregate】(https://www.iteblog.com/archives/1268.html)
你好,请问“这里拿zeroValue和分区内的数比了”分区里的数是什么时候取出来的呢?是怎么取出来并加入到seqOp中和zeroValue进行比较的?
哪位能跟我讲讲第一个aggregate到底是在干啥啊。。。
seqOp函数定义为两者取小,combOp是把最后结果合并,但是底下打印的结果为什么是那样的呢。。没看懂。。
seqOp函数对每个分区取最小值,注意:是和zeroValue比较之后的最小值。比如 分区为(4, 5, 6), zeroValue为3. 那么对该分区seqOp,就得到3。
解释一下第一个例子,sc.parallelize(List(1, 2, 3, 4, 5, 6), 2) 获得分区(part_0,List(3, 2, 1)) 和 (part_1,List(6, 5, 4))
对第一个分区seqOp, 则有
seqOp : 3 1 //这里拿zeroValue和分区内的数比了,取小的1再和分区内剩下的数比较。
seqOp : 1 2
seqOp : 1 3 //结束,得到1
对第二个分区seqOp, 则有
seqOp : 3 4 //同理,取3
seqOp : 3 5
seqOp : 3 6 //结束,得到3
最后 comOp
comOp : 3 1 // 得4
comOp : 4 3 // 得7
你好,请问“这里拿zeroValue和分区内的数比了”分区里的数是什么时候取出来的呢?是怎么取出来并加入到seqOp中和zeroValue进行比较的?
很好,谢谢