Action操作会真正触发转换逻辑(转换操作参见上一篇Spark RDD(一))。本文主要介绍常见的Action操作。
常见Action操作有以下9种:
- collect()
- count()
- first()
- take(n)
- reduce(func)
- takeSample(withReplacement, n, [seed])
- takeOrdered(n, [ordering])
- top(n, [ordering])
- saveAsTextFile(path)
collect()
操作
collect()
会从RDD的每一个partition中手机所有行数据并发送给driver程序。要注意不要对太多数据执行此操作,否则driver程序会报OOM。
例子:
val numberRDD = spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.collect()
输出为:
Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
count()
操作
count()
操作通过获取每一个partition中的行数并求和来获得一个RDD的行数。
例子:
val numberRDD = spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.count()
输出为:
Long = 10
first()
操作
first()
操作返回RDD中的第一行数据,通常是第一个partition的第一行数据。 当对一个空的RDD执行此操作时会包抛出异常。
例子:
val numberRDD = spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.first()
输出:
Int = 1
take(n)
操作
take(n)
操作返回RDD中的前n行数据。当第一个partition中的数据行数小于n时,它会收集下一个partition中的数据,直到收集到n行数据或最后一个partition。 如果n超过了数据集中的所有行数,它会返回所有数据。此外 take(1)
等价于first()
例子:
val numberRDD = spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.take(6)
输出为:
Array[Int] = Array(1, 2, 3, 4, 5, 6)
reduce(func)
操作
reduce(func)
操作和其他action操作不同。reduce操作通过对数据集中的每一行数据执行func
提供的操作而生成一个单独的数值。最常见的一个例子是对数据集中的整数求和。
对于func
函数来讲需要满足两点要求:
- 这个函数必须是一个二元函数,即接受两个同样类型的参数作为输入,产生一个同样类型的输出。
- 这个函数必须满足交换律和结合律。
例子:
val numberRDD = spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
//接下来定义一个func应用到reduce操作
def add(v1:Int, v2:Int) : Int = {
println(s"v1: $v1, v2: $v2 => (${v1 + v2})")
v1 + v2
}
//执行reduce
numberRDD.reduce(add)
输出为:
v1: 1, v2: 2 => (3)
v1: 6, v2: 7 => (13)
v1: 3, v2: 3 => (6)
v1: 13, v2: 8 => (21)
v1: 6, v2: 4 => (10)
v1: 10, v2: 5 => (15)
v1: 21, v2: 9 => (30)
v1: 30, v2: 10 => (40)
v1: 15, v2: 40 => (55)
res62: Int = 55
takeSample(withReplacement, n, [seed])
操作
这个action和之前介绍的一个叫sample
的转换(transformation)的操作一样。最主要的区别是takeSample
这个会把包含样本行数据的一个数组返回给driver程序。 执行这个时和collect
一样需要考虑数据量的问题,以免OOM。
takeOrdered(n, [ordering])
操作
这个action以一个确定的顺序返回n行数据。 默认的顺序是自然序,比如如果每行数据是一个整数的话,默认是升序。 如果需要以降序的顺序返回,则需要明确指定。
例子:
val numberRDD = spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.takeOrdered(4)
numberRDD.takeOrdered(4)(Ordering[Int].reverse)
输出为:
Array[Int] = Array(1, 2, 3, 4)
Array[Int] = Array(10, 9, 8, 7)
top(n, [ordering])
操作
这个action用来找出一个RDD中top k(largest)的行数据。
也就是说默认情况下是找出最大的top k. 所以默认情况下,此action的结果和takeOrdered
的结果是相反的,因为默认的takeOrdered
是升序。
例子:
val numberRDD = spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.top(4)
输出:
Array[Int] = Array(10, 9, 8, 7)
saveAsTextFile(path)
操作
不像之前的action,这个action不向driver程序返回任何数据,而是把对应的RDD行数据以string的形式写入到特定的path中。
这个action会把每一个partition的数据写入到一个单独的文件中,即有几个partition就会有几个文件。另外需要注意的是这个action需要的是一个path 名称而不是一个文件名。 如果一个path名已经存在,为了避免覆盖数据这个操作会失败。
Be First to Comment