Skip to content

Spark RDD(三) 键值对RDD相关的转换和动作操作

Last updated on June 27, 2020

本篇文章主要介绍和key/value pair RDD相关的转换(transformation)和动作(action)操作。

键值对RDD和普通RDD不同,在普通RDD中,一行数据代表一个单独的值,比如一个整数或者一个字符串。 而键值对RDD中一行数据是一个元组,在元组中第一个元素是key值,第二个元素值vaule值。 key和value的类型可以是简单类型也可以是负责的object或者另一个元组的结合。

如何创建一个key/value pair RDD呢?

val rdd = sc.parallelize(List("Spark","is","an", "amazing", "piece", "of","technology"))
val pairRDD = rdd.map(w => (w.length,w))
pairRDD.collect().foreach(println)

输出如下:

(5,Spark)
(2,is)
(2,an)
(7,amazing)
(5,piece)
(2,of)
(10,technology)

key/value pair RDD transformation操作介绍

  • groupByKey([numTasks])
  • reduceByKey(func, [numTasks])
  • sortByKey([ascending], [numTasks])
  • join(otherRDD, [numTasks])
  1. groupByKey([numTasks])
    把拥有同样key值的所有value分成一组。对于一个数据集(K,V)对来讲,返回的RDD的结构是(K, Iterable)

例子:

val rdd = sc.parallelize(List("Spark","is","an", "amazing", "piece", "of","technology"))
val pairRDD = rdd.map(w => (w.length,w))
val wordByLenRDD = pairRDD.groupByKey()
wordByLenRDD.collect().foreach(println)

输出:

(10,CompactBuffer(technology))
(2,CompactBuffer(is, an, of))
(5,CompactBuffer(Spark, piece))
(7,CompactBuffer(amazing))
  1. reduceByKey(func, [numTasks])
    这个转换操作把用于同样key的所有value根据func汇成一个单独的值。 整个过程分为两步: 分组和应用reduce 函数。
    内置的转换实现做了一些优化: 它会现在每一个partition内部做分组和应用reduce操作, 然后再跨partition做类似操作,这样一来需要跨partition移动的数据量就大大减少了。

例子:

val candyTx = sc.parallelize(List(("candy1", 5.2), ("candy2", 3.5),("candy1", 2.0), ("candy2", 6.0), ("candy3", 3.0))

val summaryTx = candyTx.reduceByKey((total, value) => total + value)

summaryTx.collect()

输出为:

(candy1,7.2)
(candy2,9.5)
(candy3,3.0)
  1. sortByKey([ascending], [numTasks])

根据行数据的key对所有行进行排序,默认情况下使用升序。如果要使用降序,需要把第一个参数设置成false
例子:

val summaryByPrice = summaryTx.map(t => (t._2, t._1)).sortByKey()
summaryByPrice.collect

输出为:

Array[(Double, String)] = Array((3.0,candy3), (7.2,candy1), (9.5,candy2))

如果要降序则指定一个参数为false

val summaryByPrice = summaryTx.map(t => (t._2, t._1)).sortByKey(false)
summaryByPrice.collect

输出:

(9.5,candy2)
(7.2,candy1)
(3.0,candy3)
  1. join(otherRDD, [numTasks])
    和数据库的join类似。 一个数据集(K, V)和另一个数据集(K, W)的join结果为(K, (V,W)).

例子:

val memberTx = sc.parallelize(List((110, 50.35), (127, 305.2), (126, 211.0),
                                    (105, 6.0),(165, 31.0), (110, 40.11)))
val memberInfo = sc.parallelize(List((110, "a"), (127, "b"), (126, "b"), (105, "a"),(165, "c")))
val memberTxInfo = memberTx.join(memberInfo)
memberTxInfo.collect().foreach(println)

输出为:

(105,(6.0,a))
(165,(31.0,c))
(110,(50.35,a))
(110,(40.11,a))
(126,(211.0,b))
(127,(305.2,b))

key/value pair RDD action操作介绍

  • countByKey()
  • collectAsMap()
  • lookup(key)
  1. countByKey()

汇总RDD中每一个key对应多少个行数据。
例子:

val candyTx = sc.parallelize(List(("candy1", 5.2), ("candy2", 3.5), ("candy1", 2.0), ("candy3", 6.0)))
candyTx.countByKey()

输出:

scala.collection.Map[String,Long] = Map(candy1 -> 2, candy2 -> 1, candy3 -> 1)
  1. collectAsMap()

collect类似收集数据作为一个map到driver程序,每一项代表一行数据。

例子:

val candyTx = sc.parallelize(List(("candy1", 5.2), ("candy2", 3.5), ("candy1", 2.0), ("candy3", 6.0)))
candyTx.collectAsMap()

输出:

scala.collection.Map[String,Double] = Map(candy2 -> 3.5, candy1 -> 2.0, candy3 -> 6.0)

从结果中可以看出两个 candy1的键结果却只包含一个candy1。这是因为如果多行数据拥有同一个键值,结果会把覆盖成一个项目。

  1. lookup(key)

这个action用来快速校验一个特殊的key是否在RDD中存在。

val candyTx = sc.parallelize(List(("candy1", 5.2), ("candy2", 3.5), ("candy1", 2.0), ("candy3", 6.0)))
candyTx.lookup("candy1")
candyTx.lookup("candy2")
candyTx.lookup("candy5")

输出为:

Seq[Double] = WrappedArray(5.2, 2.0)
Seq[Double] = WrappedArray(3.5)
Seq[Double] = WrappedArray()

如果多行数据拥有同样的key,对应行的所有value都会被返回.

Published inSpark大数据

Be First to Comment

Leave a Reply

Your email address will not be published. Required fields are marked *

Author Copyriht by BackendSite