Spark RDD (Part 1): Creating RDDs and Transformations
RDDs have several defining properties:
- Immutable
- Fault-tolerant
- A parallel data structure
- Computed in memory
- Partitioned and placed across the cluster
- A rich set of operations
RDD operations fall into two categories: transformations and actions.
A transformation is a lazy operation whose result is another RDD.
An action is the other kind of operation: it triggers the transformation logic and typically returns a result to the driver or writes the result to disk.
There are three ways to create an RDD:
(1) parallelize a collection
val stringList = Array("Spark is awesome","Spark is cool")
val stringRDD = spark.sparkContext.parallelize(stringList)
(2) from a storage system
val fileRDD = spark.sparkContext.textFile("/tmp/data.txt")
(3) from an existing RDD
Transformations
- map(func)
val allCapsRDD = stringRDD.map(line => line.toUpperCase)
allCapsRDD.collect().foreach(println)
Using a custom function:
def toUpperCase(line: String): String = { line.toUpperCase }
stringRDD.map(l => toUpperCase(l)).collect.foreach(println)
Mapping each line to a case class:
case class Contact(id: Long, name: String, email: String)
val contactData = Array("1#John Doe#jdoe@domain.com","2#Mary Jane#mjane@domain.com")
val contactDataRDD = spark.sparkContext.parallelize(contactData)
val contactRDD = contactDataRDD.map(l => {
  val contactArray = l.split("#")
  Contact(contactArray(0).toLong, contactArray(1), contactArray(2))
})
contactRDD.collect.foreach(println)
Deriving a new RDD from an existing one:
val stringLenRDD = stringRDD.map(l => l.length)
stringLenRDD.collect.foreach(println)
- flatMap(func)
val wordRDD = stringRDD.flatMap(line => line.split(" "))
wordRDD.collect().foreach(println)
The output is:
Spark
is
awesome
Spark
is
cool
The difference between map and flatMap:
stringRDD.map(line => line.split(" ")).collect      // map
stringRDD.flatMap(line => line.split(" ")).collect  // flatMap
map returns a two-dimensional array:
Array[Array[String]] = Array(Array(Spark, is, awesome), Array(Spark, is, cool))
while flatMap flattens the results into a one-dimensional array:
Array[String] = Array(Spark, is, awesome, Spark, is, cool)
- filter(func)
val awesomeLineRDD = stringRDD.filter(line => line.contains("awesome"))
awesomeLineRDD.collect
- mapPartitions(func): operates on an entire partition at a time. It is useful for expensive setup work, such as opening a database connection, which can then be performed once per partition instead of once per element.
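A minimal sketch of the pattern, assuming the same SparkSession named spark as in the examples above (the per-partition setup is only indicated by a comment here; in real code it might open a database connection):

```scala
// mapPartitions receives an Iterator over one partition's elements
// and must return an Iterator. The body runs once per partition.
val numbers = spark.sparkContext.parallelize(1 to 10, 2)
val partialSums = numbers.mapPartitions { iter =>
  // expensive per-partition setup (e.g. opening a connection) would go here
  Iterator(iter.sum)   // emit one value per partition
}
partialSums.collect()  // one partial sum per partition
```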
- mapPartitionsWithIndex(func): like mapPartitions, but the function also receives the index of the partition being processed.
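A short sketch, again assuming the SparkSession spark from the examples above; tagging each element with its partition index is a handy way to inspect how data is distributed:

```scala
// The function gets (partitionIndex, iterator) and returns an iterator.
val numbers = spark.sparkContext.parallelize(1 to 10, 2)
val tagged = numbers.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(n => s"partition $idx -> $n")
}
tagged.collect().foreach(println)
```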
- union(otherRDD)
val rdd1 = spark.sparkContext.parallelize(Array(1,2,3,4,5))
val rdd2 = spark.sparkContext.parallelize(Array(1,6,7,8))
val rdd3 = rdd1.union(rdd2)
rdd3.collect()
The result (note that union does not remove duplicates):
Array[Int] = Array(1, 2, 3, 4, 5, 1, 6, 7, 8)
- intersection(otherRDD)
val rdd1 = spark.sparkContext.parallelize(Array("One", "Two", "Three"))
val rdd2 = spark.sparkContext.parallelize(Array("two","One","threed","One"))
val rdd3 = rdd1.intersection(rdd2)
rdd3.collect()
The result (the comparison is case-sensitive, so only "One" matches):
Array[String] = Array(One)
- subtract(otherRDD)
val words = spark.sparkContext.parallelize(List("The amazing thing about spark is that it is very simple to learn"))
  .flatMap(l => l.split(" "))
  .map(w => w.toLowerCase)
val stopWords = spark.sparkContext.parallelize(List("the it is to that")).flatMap(l => l.split(" "))
val realWords = words.subtract(stopWords)
realWords.collect()
The result:
Array[String] = Array(simple, learn, amazing, spark, about, very, thing)
- distinct([numTasks])
val duplicateValueRDD = spark.sparkContext.parallelize(List("one", 1, "two", 2, "three", "one", "two", 1, 2))
duplicateValueRDD.distinct().collect
The output:
Array[Any] = Array(1, 2, two, one, three)
- sample(withReplacement, fraction, seed)
val numbers = spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
numbers.sample(true, 0.3).collect
Two example runs (the output varies from run to run because sampling is random; with withReplacement = true the same element can appear more than once):
Array[Int] = Array(1, 7, 7, 8)
Array[Int] = Array(1, 6, 6, 7, 8, 9, 10)