Skip to content

Spark RDD(二) RDD的相关行为(Action)操作

Action操作会真正触发转换逻辑(转换操作参见上一篇Spark RDD(一))。本文主要介绍常见的Action操作。

常见Action操作有以下9种:

  • collect()
  • count()
  • first()
  • take(n)
  • reduce(func)
  • takeSample(withReplacement, n, [seed])
  • takeOrdered(n, [ordering])
  • top(n, [ordering])
  • saveAsTextFile(path)
  1. collect() 操作

collect() 会从RDD的每一个partition中手机所有行数据并发送给driver程序。要注意不要对太多数据执行此操作,否则driver程序会报OOM。

例子:

val numberRDD =  spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.collect()

输出为:

Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  1. count()操作

count() 操作通过获取每一个partition中的行数并求和来获得一个RDD的行数。

例子:

val numberRDD =  spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)

numberRDD.count()

输出为:

Long = 10
  1. first() 操作

first() 操作返回RDD中的第一行数据,通常是第一个partition的第一行数据。 当对一个空的RDD执行此操作时会包抛出异常。

例子:

val numberRDD =  spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)

numberRDD.first()

输出:

Int = 1
  1. take(n) 操作

take(n) 操作返回RDD中的前n行数据。当第一个partition中的数据行数小于n时,它会收集下一个partition中的数据,直到收集到n行数据或最后一个partition。 如果n超过了数据集中的所有行数,它会返回所有数据。此外 take(1) 等价于first()

例子:

val numberRDD =  spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.take(6)

输出为:

Array[Int] = Array(1, 2, 3, 4, 5, 6)
  1. reduce(func) 操作

reduce(func)操作和其他action操作不同。reduce操作通过对数据集中的每一行数据执行func提供的操作而生成一个单独的数值。最常见的一个例子是对数据集中的整数求和。
对于func 函数来讲需要满足两点要求:

  • 这个函数必须是一个二元函数,即接受两个同样类型的参数作为输入,产生一个同样类型的输出。
  • 这个函数必须满足交换律和结合律。

例子:

val numberRDD =  spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)

//接下来定义一个func应用到reduce操作

def add(v1:Int, v2:Int) : Int = {
      println(s"v1: $v1, v2: $v2 => (${v1 + v2})")
      v1 + v2
}

//执行reduce

numberRDD.reduce(add)

输出为:

v1: 1, v2: 2 => (3)
v1: 6, v2: 7 => (13)
v1: 3, v2: 3 => (6)
v1: 13, v2: 8 => (21)
v1: 6, v2: 4 => (10)
v1: 10, v2: 5 => (15)
v1: 21, v2: 9 => (30)
v1: 30, v2: 10 => (40)
v1: 15, v2: 40 => (55)
res62: Int = 55
  1. takeSample(withReplacement, n, [seed]) 操作

这个action和之前介绍的一个叫sample的转换(transformation)的操作一样。最主要的区别是takeSample这个会把包含样本行数据的一个数组返回给driver程序。 执行这个时和collect一样需要考虑数据量的问题,以免OOM。

  1. takeOrdered(n, [ordering]) 操作

这个action以一个确定的顺序返回n行数据。 默认的顺序是自然序,比如如果每行数据是一个整数的话,默认是升序。 如果需要以降序的顺序返回,则需要明确指定。

例子:

val numberRDD =  spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.takeOrdered(4)
numberRDD.takeOrdered(4)(Ordering[Int].reverse)

输出为:

Array[Int] = Array(1, 2, 3, 4)
Array[Int] = Array(10, 9, 8, 7)
  1. top(n, [ordering]) 操作

这个action用来找出一个RDD中top k(largest)的行数据。
也就是说默认情况下是找出最大的top k. 所以默认情况下,此action的结果和takeOrdered的结果是相反的,因为默认的takeOrdered是升序。

例子:

val numberRDD =  spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numberRDD.top(4)

输出:

Array[Int] = Array(10, 9, 8, 7)
  1. saveAsTextFile(path) 操作

不像之前的action,这个action不向driver程序返回任何数据,而是把对应的RDD行数据以string的形式写入到特定的path中。
这个action会把每一个partition的数据写入到一个单独的文件中,即有几个partition就会有几个文件。另外需要注意的是这个action需要的是一个path 名称而不是一个文件名。 如果一个path名已经存在,为了避免覆盖数据这个操作会失败。

Published inSpark大数据

Be First to Comment

Leave a Reply

Your email address will not be published. Required fields are marked *

Author Copyriht by BackendSite