
Spark RDD (Part 1): RDD Creation and Transformation Operations

RDDs have several key characteristics:

  • Immutable
  • Fault-tolerant
  • Parallel data structure
  • In-memory computation
  • Data partitioning and placement
  • A rich set of operations

RDD operations fall into two categories: transformations and actions.
A transformation is a lazy operation whose result is another RDD.
An action triggers the evaluation of the accumulated transformation logic; it typically returns a result to the driver or writes the result out to storage.
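
As a minimal sketch of the difference (assuming an existing SparkSession named spark, as in the examples below):

// map is a transformation: it only records the lineage and returns immediately.
val demoRDD = spark.sparkContext.parallelize(1 to 5)
val doubledRDD = demoRDD.map(n => n * 2)   // nothing has been computed yet

// collect is an action: it triggers the computation and returns the data to the driver.
doubledRDD.collect()                       // Array(2, 4, 6, 8, 10)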

There are three ways to create an RDD:

(1) Parallelize an existing collection

val stringList = Array("Spark is awesome","Spark is cool")
val stringRDD = spark.sparkContext.parallelize(stringList)

(2) Create from a storage system

val fileRDD = spark.sparkContext.textFile("/tmp/data.txt")

(3) Create from an existing RDD, by applying a transformation to it
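
As a minimal sketch, reusing the fileRDD created in (2), applying a transformation to an existing RDD yields a new RDD:

val upperFileRDD = fileRDD.map(line => line.toUpperCase)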

Transformation Operations

  • map(func)

    val allCapsRDD = stringRDD.map(line => line.toUpperCase)
    allCapsRDD.collect().foreach(println)

    Using a custom function:

    def toUpperCase(line:String) : String = {  line.toUpperCase }
    stringRDD.map(l => toUpperCase(l)).collect.foreach(println)

    Defining a case class:

    case class Contact(id:Long, name:String, email:String)
    val contactData = Array("1#John Doe#jdoe@domain.com","2#Mary Jane#mjane@domain.com")
    val contactDataRDD = spark.sparkContext.parallelize(contactData)
    val contactRDD = contactDataRDD.map(l => {
         val contactArray = l.split("#")
         Contact(contactArray(0).toLong, contactArray(1), contactArray(2))
    })
    contactRDD.collect.foreach(println)

    Creating a new RDD from an existing RDD:

    val stringLenRDD = stringRDD.map(l => l.length)
    stringLenRDD.collect.foreach(println)
  • flatMap(func)

    val wordRDD = stringRDD.flatMap(line => line.split(" "))
    wordRDD.collect().foreach(println)

    The output is:

    Spark
    is
    awesome
    Spark
    is
    cool

    The difference between map and flatMap:

    stringRDD.map(line => line.split(" ")).collect //map
    stringRDD.flatMap(line => line.split(" ")).collect //flatMap

    map returns a two-dimensional array:

    Array[Array[String]] = Array(Array(Spark, is, awesome), Array(Spark, is, cool))

    while flatMap returns a one-dimensional array:

    Array[String] = Array(Spark, is, awesome, Spark, is, cool)
  • filter(func)

    val awesomeLineRDD = stringRDD.filter(line => line.contains("awesome"))
    awesomeLineRDD.collect
  • mapPartitions(func): operates on one partition at a time rather than on individual elements. It is useful for expensive setup work, such as opening a database connection, because the initialization runs once per partition (see the sketch below).
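
    A minimal sketch (the expensive per-partition setup is only simulated here by a local value; a real use case would be something like opening a database connection once per partition):

    val partitionedRDD = spark.sparkContext.parallelize(1 to 10, 2)
    val taggedRDD = partitionedRDD.mapPartitions { iter =>
      val setup = "partition-init"   // stand-in for setup work done once per partition
      iter.map(n => s"$setup: $n")
    }
    taggedRDD.collect().foreach(println)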

  • mapPartitionsWithIndex(func): same as mapPartitions, but the supplied function also receives the partition index (see the sketch below).
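
    A minimal sketch: the function gets the partition index along with the iterator, which helps inspect how records are distributed across partitions:

    val indexedRDD = spark.sparkContext.parallelize(1 to 10, 2)
    indexedRDD.mapPartitionsWithIndex { (idx, iter) =>
      iter.map(n => s"partition $idx -> $n")
    }.collect().foreach(println)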

  • union(otherRDD)

    val rdd1 = spark.sparkContext.parallelize(Array(1,2,3,4,5))
    val rdd2 = spark.sparkContext.parallelize(Array(1,6,7,8))
    val rdd3 = rdd1.union(rdd2)
    rdd3.collect()

    The result:

    Array[Int] = Array(1, 2, 3, 4, 5, 1, 6, 7, 8)
  • intersection(otherRDD)

    val rdd1 = spark.sparkContext.parallelize(Array("One", "Two", "Three"))
    val rdd2 = spark.sparkContext.parallelize(Array("two","One","threed","One"))
    val rdd3 = rdd1.intersection(rdd2)
    rdd3.collect()

    The result:

    Array[String] = Array(One)
  • subtract(otherRDD)

    val words = spark.sparkContext.parallelize(List("The amazing thing about spark is that it is very simple to learn")).flatMap(l => l.split(" ")).map(w => w.toLowerCase)
    val stopWords = spark.sparkContext.parallelize(List("the it is to that")).flatMap(l => l.split(" "))
    val realWords = words.subtract(stopWords)
    realWords.collect()

    The result:

    Array[String] = Array(simple, learn, amazing, spark, about, very, thing)
  • distinct([numTasks])

    val duplicateValueRDD = spark.sparkContext.parallelize(List("one", 1, "two", 2, "three", "one", "two", 1, 2))
    duplicateValueRDD.distinct().collect

    The output is:

    Array[Any] = Array(1, 2, two, one, three)
  • sample(withReplacement, fraction, seed)

    val numbers =  spark.sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
    numbers.sample(true, 0.3).collect

    Since sampling is random, the result differs from run to run; two example outputs:

    Array[Int] = Array(1, 7, 7, 8)
    Array[Int] = Array(1, 6, 6, 7, 8, 9, 10)