文章目录
《Spark Python API函数学习:pyspark API(1)》
《Spark Python API函数学习:pyspark API(2)》
《Spark Python API函数学习:pyspark API(3)》
《Spark Python API函数学习:pyspark API(4)》
《Spark Python API函数学习:pyspark API(2)》
《Spark Python API函数学习:pyspark API(3)》
《Spark Python API函数学习:pyspark API(4)》
Spark支持Scala、Java以及Python语言,本文将通过图片和简单例子来学习pyspark API。
countByKey
# countByKey
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
y = x.countByKey()
print(x.collect())
print(y)

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
defaultdict(<type 'int'>, {'A': 3, 'B': 2})
join
# join
x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
z = x.join(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]
leftOuterJoin
# leftOuterJoin
x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
z = x.leftOuterJoin(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('C', (4, None)), ('B', (3, 7))]
rightOuterJoin
# rightOuterJoin
x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
z = x.rightOuterJoin(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7)), ('D', (None, 5))]
partitionBy
# partitionBy
x = sc.parallelize([(0,1),(1,2),(2,3)],2)
y = x.partitionBy(numPartitions = 3, partitionFunc = lambda x: x)  # only key is passed to partitionFunc
print(x.glom().collect())
print(y.glom().collect())

[[(0, 1)], [(1, 2), (2, 3)]]
[[(0, 1)], [(1, 2)], [(2, 3)]]
combineByKey
# combineByKey
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
createCombiner = (lambda el: [(el,el**2)])
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)])  # append to aggregated
mergeComb = (lambda agg1,agg2: agg1 + agg2 )  # append agg1 with agg2
y = x.combineByKey(createCombiner,mergeVal,mergeComb)
print(x.collect())
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
aggregateByKey
# aggregateByKey
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue = []  # empty list is 'zero value' for append operation
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)])
mergeComb = (lambda agg1,agg2: agg1 + agg2 )
y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)
print(x.collect())
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
foldByKey
# foldByKey
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue = 1  # one is 'zero value' for multiplication
y = x.foldByKey(zeroValue,lambda agg,x: agg*x )  # computes cumulative product within each key
print(x.collect())
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', 60), ('B', 2)]
groupByKey
# groupByKey
x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
y = x.groupByKey()
print(x.collect())
print([(j[0],[i for i in j[1]]) for j in y.collect()])

[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
[('A', [3, 2, 1]), ('B', [5, 4])]
flatMapValues
# flatMapValues
x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
y = x.flatMapValues(lambda x: [i**2 for i in x])  # function is applied to entire value, then result is flattened
print(x.collect())
print(y.collect())

[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]
mapValues
# mapValues
x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
y = x.mapValues(lambda x: [i**2 for i in x])  # function is applied to entire value
print(x.collect())
print(y.collect())

[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', [1, 4, 9]), ('B', [16, 25])]
groupWith
# groupWith
x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])
z = sc.parallelize([('D',9),('B',(8,8))])
a = x.groupWith(y,z)
print(x.collect())
print(y.collect())
print(z.collect())
print("Result:")
for key,val in list(a.collect()):
    print(key, [list(i) for i in val])

[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
[('B', (7, 7)), ('A', 6), ('D', (5, 5))]
[('D', 9), ('B', (8, 8))]
Result:
D [[], [(5, 5)], [9]]
C [[4], [], []]
B [[(3, 3)], [(7, 7)], [(8, 8)]]
A [[2, (1, 1)], [6], []]
cogroup
# cogroup
x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',(5,5))])
z = x.cogroup(y)
print(x.collect())
print(y.collect())
for key,val in list(z.collect()):
    print(key, [list(i) for i in val])

[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
[('A', 8), ('B', 7), ('A', 6), ('D', (5, 5))]
A [[2, (1, 1)], [8, 6]]
C [[4], []]
B [[(3, 3)], [7]]
D [[], [(5, 5)]]
sampleByKey
# sampleByKey
x = sc.parallelize([('A',1),('B',2),('C',3),('B',4),('A',5)])
y = x.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2})
print(x.collect())
print(y.collect())

[('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]
[('B', 2), ('C', 3), ('B', 4)]
subtractByKey
# subtractByKey
x = sc.parallelize([('C',1),('B',2),('A',3),('A',4)])
y = sc.parallelize([('A',5),('D',6),('A',7),('D',8)])
z = x.subtractByKey(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 1), ('B', 2), ('A', 3), ('A', 4)]
[('A', 5), ('D', 6), ('A', 7), ('D', 8)]
[('C', 1), ('B', 2)]
subtract
# subtract
x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('C',8),('A',2),('D',1)])
z = x.subtract(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('C', 8), ('A', 2), ('D', 1)]
[('A', 1), ('C', 4), ('B', 3)]
keyBy
# keyBy
x = sc.parallelize([1,2,3])
y = x.keyBy(lambda x: x**2)
print(x.collect())
print(y.collect())

[1, 2, 3]
[(1, 1), (4, 2), (9, 3)]
repartition
# repartition
x = sc.parallelize([1,2,3,4,5],2)
y = x.repartition(numPartitions=3)
print(x.glom().collect())
print(y.glom().collect())

[[1, 2], [3, 4, 5]]
[[], [1, 2, 3, 4], [5]]
coalesce
# coalesce
x = sc.parallelize([1,2,3,4,5],2)
y = x.coalesce(numPartitions=1)
print(x.glom().collect())
print(y.glom().collect())

[[1, 2], [3, 4, 5]]
[[1, 2, 3, 4, 5]]
zip
# zip
x = sc.parallelize(['B','A','A'])
# zip expects x and y to have same #partitions and #elements/partition
y = x.map(lambda x: ord(x))
z = x.zip(y)
print(x.collect())
print(y.collect())
print(z.collect())

['B', 'A', 'A']
[66, 65, 65]
[('B', 66), ('A', 65), ('A', 65)]
zipWithIndex
# zipWithIndex
x = sc.parallelize(['B','A','A'],2)
y = x.zipWithIndex()
print(x.glom().collect())
print(y.collect())

[['B'], ['A', 'A']]
[('B', 0), ('A', 1), ('A', 2)]
zipWithUniqueId
# zipWithUniqueId
x = sc.parallelize(['B','A','A'],2)
y = x.zipWithUniqueId()
print(x.glom().collect())
print(y.collect())

[['B'], ['A', 'A']]
[('B', 0), ('A', 1), ('A', 3)]
PDF版下载
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark Python API函数学习:pyspark API(4)】(https://www.iteblog.com/archives/1400.html)