Last updated on June 27, 2020
本篇文章主要介绍和key/value pair RDD相关的转换(transformation)和动作(action)操作。
键值对RDD和普通RDD不同,在普通RDD中,一行数据代表一个单独的值,比如一个整数或者一个字符串。 而键值对RDD中一行数据是一个元组,在元组中第一个元素是key值,第二个元素值vaule值。 key和value的类型可以是简单类型也可以是负责的object或者另一个元组的结合。
如何创建一个key/value pair RDD呢?
val rdd = sc.parallelize(List("Spark","is","an", "amazing", "piece", "of","technology"))
val pairRDD = rdd.map(w => (w.length,w))
pairRDD.collect().foreach(println)
输出如下:
(5,Spark)
(2,is)
(2,an)
(7,amazing)
(5,piece)
(2,of)
(10,technology)
key/value pair RDD transformation操作介绍
- groupByKey([numTasks])
- reduceByKey(func, [numTasks])
- sortByKey([ascending], [numTasks])
- join(otherRDD, [numTasks])
groupByKey([numTasks])
把拥有同样key值的所有value分成一组。对于一个数据集(K,V)对来讲,返回的RDD的结构是(K, Iterable)
例子:
val rdd = sc.parallelize(List("Spark","is","an", "amazing", "piece", "of","technology"))
val pairRDD = rdd.map(w => (w.length,w))
val wordByLenRDD = pairRDD.groupByKey()
wordByLenRDD.collect().foreach(println)
输出:
(10,CompactBuffer(technology))
(2,CompactBuffer(is, an, of))
(5,CompactBuffer(Spark, piece))
(7,CompactBuffer(amazing))
reduceByKey(func, [numTasks])
这个转换操作把用于同样key的所有value根据func汇成一个单独的值。 整个过程分为两步: 分组和应用reduce 函数。
内置的转换实现做了一些优化: 它会现在每一个partition内部做分组和应用reduce操作, 然后再跨partition做类似操作,这样一来需要跨partition移动的数据量就大大减少了。
例子:
val candyTx = sc.parallelize(List(("candy1", 5.2), ("candy2", 3.5),("candy1", 2.0), ("candy2", 6.0), ("candy3", 3.0))
val summaryTx = candyTx.reduceByKey((total, value) => total + value)
summaryTx.collect()
输出为:
(candy1,7.2)
(candy2,9.5)
(candy3,3.0)
sortByKey([ascending], [numTasks])
根据行数据的key对所有行进行排序,默认情况下使用升序。如果要使用降序,需要把第一个参数设置成false
。
例子:
val summaryByPrice = summaryTx.map(t => (t._2, t._1)).sortByKey()
summaryByPrice.collect
输出为:
Array[(Double, String)] = Array((3.0,candy3), (7.2,candy1), (9.5,candy2))
如果要降序则指定一个参数为false
。
val summaryByPrice = summaryTx.map(t => (t._2, t._1)).sortByKey(false)
summaryByPrice.collect
输出:
(9.5,candy2)
(7.2,candy1)
(3.0,candy3)
join(otherRDD, [numTasks])
和数据库的join类似。 一个数据集(K, V)和另一个数据集(K, W)的join结果为(K, (V,W)).
例子:
val memberTx = sc.parallelize(List((110, 50.35), (127, 305.2), (126, 211.0),
(105, 6.0),(165, 31.0), (110, 40.11)))
val memberInfo = sc.parallelize(List((110, "a"), (127, "b"), (126, "b"), (105, "a"),(165, "c")))
val memberTxInfo = memberTx.join(memberInfo)
memberTxInfo.collect().foreach(println)
输出为:
(105,(6.0,a))
(165,(31.0,c))
(110,(50.35,a))
(110,(40.11,a))
(126,(211.0,b))
(127,(305.2,b))
key/value pair RDD action操作介绍
- countByKey()
- collectAsMap()
- lookup(key)
countByKey()
汇总RDD中每一个key对应多少个行数据。
例子:
val candyTx = sc.parallelize(List(("candy1", 5.2), ("candy2", 3.5), ("candy1", 2.0), ("candy3", 6.0)))
candyTx.countByKey()
输出:
scala.collection.Map[String,Long] = Map(candy1 -> 2, candy2 -> 1, candy3 -> 1)
collectAsMap()
和collect
类似收集数据作为一个map到driver程序,每一项代表一行数据。
例子:
val candyTx = sc.parallelize(List(("candy1", 5.2), ("candy2", 3.5), ("candy1", 2.0), ("candy3", 6.0)))
candyTx.collectAsMap()
输出:
scala.collection.Map[String,Double] = Map(candy2 -> 3.5, candy1 -> 2.0, candy3 -> 6.0)
从结果中可以看出两个 candy1的键结果却只包含一个candy1。这是因为如果多行数据拥有同一个键值,结果会把覆盖成一个项目。
lookup(key)
这个action用来快速校验一个特殊的key是否在RDD中存在。
val candyTx = sc.parallelize(List(("candy1", 5.2), ("candy2", 3.5), ("candy1", 2.0), ("candy3", 6.0)))
candyTx.lookup("candy1")
candyTx.lookup("candy2")
candyTx.lookup("candy5")
输出为:
Seq[Double] = WrappedArray(5.2, 2.0)
Seq[Double] = WrappedArray(3.5)
Seq[Double] = WrappedArray()
如果多行数据拥有同样的key,对应行的所有value都会被返回.
Be First to Comment