
Spark SQL (Part 2): DataFrame Transformations

DataFrames are immutable. Like the corresponding RDD operations, every DataFrame transformation returns a new DataFrame.

DataFrames Transformations

  • select
  • selectExpr
  • filter/where
  • distinct/dropDuplicates
  • sort/orderBy
  • limit
  • union
  • withColumn
  • withColumnRenamed
  • drop
  • sample
  • randomSplit
  • join
  • groupBy
  • describe

How to Reference a Column

Some of the transformations listed above take columns as string arguments, while others take Column objects. So before walking through the transformations, let's first look at the ways Spark SQL lets you reference a column.

Spark SQL offers five ways to reference a column:

  • "columnName" — the column name wrapped in double quotes; references the column as a string.
  • col("columnName") — the col function returns a Column instance.
  • column("columnName") — identical in effect to col.
  • $"columnName" — Scala syntactic sugar that also returns a Column instance.
  • 'columnName — another piece of Scala syntactic sugar (a symbol) that returns a Column instance.
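To see these side by side, here is a minimal sketch, assuming a spark-shell session (where the implicits needed for the `$` and symbol syntax are already in scope) and a hypothetical DataFrame `df` with a column named `columnName`:

```scala
import org.apache.spark.sql.functions.{col, column}

// All five expressions select the same column from df:
df.select("columnName")         // plain string reference
df.select(col("columnName"))    // col() returns a Column instance
df.select(column("columnName")) // identical in effect to col()
df.select($"columnName")        // Scala string-interpolator sugar for a Column
df.select('columnName)          // Scala symbol sugar for a Column
```

Outside of spark-shell you would also need `import spark.implicits._` for the last two forms to compile.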

The Transformations in Detail

First, create a DataFrame from a data source:

val movies = spark.read.parquet("<path>/chapter4/data/movies/movies.parquet")
  1. select(columns)

An example:

movies.select("movie_title","produced_year").show(5)

Output:

+-------------------+--------------+
|        movie_title| produced_year|
+-------------------+--------------+
|       Coach Carter|          2005|
|        Superman II|          1980|
|          Apollo 13|          1995|
|           Superman|          1978|
| Back to the Future|          1985|
+-------------------+--------------+

Note that when passing multiple columns to select, the references must all use the same style: you cannot mix a string reference with a Column reference in one call.

Another example: when columns are referenced as Column objects, you can use Column's built-in operators to perform arithmetic and logical computations.

movies.select('movie_title,('produced_year - ('produced_year % 10)).as("produced_decade")).show(5)

Output:

+-------------------+----------------+
|        movie_title| produced_decade|
+-------------------+----------------+
|       Coach Carter|            2000|
|        Superman II|            1980|
|          Apollo 13|            1990|
|           Superman|            1970|
| Back to the Future|            1980|
+-------------------+----------------+
  2. selectExpr(expressions)

The difference from select is that this operation accepts one or more SQL expressions, rather than column names, as its arguments.
Example 1:

movies.selectExpr("*","(produced_year - (produced_year % 10)) as decade").show(5)

Output:

+-----------------+-------------------+--------------+-------+
|       actor_name|        movie_title| produced_year| decade|
+-----------------+-------------------+--------------+-------+
|McClure, Marc (I)|       Coach Carter|          2005|   2000|
|McClure, Marc (I)|        Superman II|          1980|   1980|
|McClure, Marc (I)|          Apollo 13|          1995|   1990|
|McClure, Marc (I)|           Superman|          1978|   1970|
|McClure, Marc (I)| Back to the Future|          1985|   1980|
+-----------------+-------------------+--------------+-------+

Example 2:

movies.selectExpr("count(distinct(movie_title)) as movies","count(distinct(actor_name)) as actors").show

Output:

+-------+-------+
| movies| actors|
+-------+-------+
|   1409|   6527|
+-------+-------+
  3. filter(condition) / where(condition)

These two operations are equivalent; both filter rows based on column values.
Example 1:

movies.filter('produced_year === 2000).show(5)

Output:

+------------------+---------------------+--------------+
|        actor_name|          movie_title| produced_year|
+------------------+---------------------+--------------+
| Cooper, Chris (I)|   Me, Myself & Irene|          2000|
| Cooper, Chris (I)|          The Patriot|          2000|
|   Jolie, Angelina| Gone in Sixty Sec...|          2000|
|    Yip, Françoise|       Romeo Must Die|          2000|
|    Danner, Blythe|     Meet the Parents|          2000|
+------------------+---------------------+--------------+

Example 2: in Spark SQL the not-equal operator is =!=

movies.select("movie_title","produced_year").filter('produced_year =!= 2000).show(5)

Output:

+-------------------+--------------+
|        movie_title| produced_year|
+-------------------+--------------+
|       Coach Carter|          2005|
|        Superman II|          1980|
|          Apollo 13|          1995|
|           Superman|          1978|
| Back to the Future|          1985|
+-------------------+--------------+

As this example shows, to perform such comparisons you need to reference the column as a Column object.
Example 3: combining multiple filter conditions

movies.filter('produced_year >= 2000 && length('movie_title) < 5).show(5)

Output:

+----------------+------------+--------------+
|      actor_name| movie_title| produced_year|
+----------------+------------+--------------+
| Jolie, Angelina|        Salt|          2010|
|  Cueto, Esteban|         xXx|          2002|
|   Butters, Mike|         Saw|          2004|
|  Franko, Victor|          21|          2008|
|   Ogbonna, Chuk|        Salt|          2010|
+----------------+------------+--------------+

Example 4: another way to combine conditions is to chain multiple filter calls

movies.filter('produced_year >= 2000).filter(length('movie_title) < 5).show(5)
  4. distinct / dropDuplicates

These operations are straightforward: they remove duplicate rows.

movies.select("movie_title").distinct.selectExpr("count(movie_title) as movies").show

Output:

+------+
|movies|
+------+
|  1409|
+------+
  5. sort(columns) / orderBy(columns)

Sorts rows by the values of the specified columns.
Example 1:

val movieTitles = movies.dropDuplicates("movie_title")
                        .selectExpr("movie_title", "length(movie_title) as title_length", "produced_year")
movieTitles.sort('title_length).show(5)

Output:

+-----------+-------------+--------------+
|movie_title| title_length| produced_year|
+-----------+-------------+--------------+
|         RV|            2|          2006|
|         12|            2|          2007|
|         Up|            2|          2009|
|         X2|            2|          2003|
|         21|            2|          2008|
+-----------+-------------+--------------+

Example 2: the default sort order is ascending; descending order must be requested explicitly

movieTitles.orderBy('title_length.desc).show(5)

Output:

+---------------------+-------------+--------------+
|          movie_title| title_length| produced_year|
+---------------------+-------------+--------------+
| Borat: Cultural L...|           83|          2006|
| The Chronicles of...|           62|          2005|
| Hannah Montana & ...|           57|          2008|
| The Chronicles of...|           56|          2010|
| Istoriya pro Rich...|           56|          1997|
+---------------------+-------------+--------------+

Example 3: sorting by two columns

movieTitles.orderBy('title_length.desc, 'produced_year).show(5)

Output:

+---------------------+-------------+--------------+
|          movie_title| title_length| produced_year|
+---------------------+-------------+--------------+
| Borat: Cultural L...|           83|          2006|
| The Chronicles of...|           62|          2005|
| Hannah Montana & ...|           57|          2008|
| Istoriya pro Rich...|           56|          1997|
| The Chronicles of...|           56|          2010|
+---------------------+-------------+--------------+
  6. limit(n)

Returns the first n rows.
Example:

// first create a DataFrame with their name and associated length
val actorNameDF = movies.select("actor_name").distinct.selectExpr("*", "length(actor_name) as length")
// order names by length and retrieve the top 10
actorNameDF.orderBy('length.desc).limit(10).show

Output:

+-----------------------------+-------+
|                   actor_name| length|
+-----------------------------+-------+
| Driscoll, Timothy 'TJ' James|     28|
| Badalamenti II, Peter Donald|     28|
|  Shepard, Maridean Mansfield|     27|
|  Martino, Nicholas Alexander|     27|
|  Marshall-Fricker, Charlotte|     27|
|  Phillips, Christopher (III)|     27|
|  Pahlavi, Shah Mohammad Reza|     27|
|   Juan, The Bishop Don Magic|     26|
|   Van de Kamp Buchanan, Ryan|     26|
|   Lough Haggquist, Catherine|     26|
+-----------------------------+-------+
  7. union(otherDataFrame)

This operation takes another DataFrame as its argument and requires the two DataFrames to have the same schema.
Example:

val shortNameMovieDF = movies.where('movie_title === "12")

shortNameMovieDF.show

Output:

+---------------------+------------+---------------+
|           actor_name| movie_title| produced_year |
+---------------------+------------+---------------+
|     Efremov, Mikhail|          12|           2007|
|      Stoyanov, Yuriy|          12|           2007|
|      Gazarov, Sergey|          12|           2007|
| Verzhbitskiy, Viktor|          12|           2007|
+---------------------+------------+---------------+

Create another DataFrame:

import org.apache.spark.sql.Row
val forgottenActor = Seq(Row("Brychta, Edita", "12", 2007L))
val forgottenActorRDD = spark.sparkContext.parallelize(forgottenActor)
val forgottenActorDF = spark.createDataFrame(forgottenActorRDD, shortNameMovieDF.schema)

Perform the union:

val completeShortNameMovieDF = shortNameMovieDF.union(forgottenActorDF)
completeShortNameMovieDF.show

Output:

+---------------------+------------+--------------+
|           actor_name| movie_title| produced_year|
+---------------------+------------+--------------+
|     Efremov, Mikhail|          12|          2007|
|      Stoyanov, Yuriy|          12|          2007|
|      Gazarov, Sergey|          12|          2007|
| Verzhbitskiy, Viktor|          12|          2007|
|       Brychta, Edita|          12|          2007|
+---------------------+------------+--------------+
  8. withColumn(colName, column)

Adds a new column to a DataFrame.
Example:

movies.withColumn("decade", ('produced_year - 'produced_year % 10)).show(5)

Output:

+------------------+-------------------+--------------+-------+
|        actor_name|        movie_title| produced_year| decade|
+------------------+-------------------+--------------+-------+
| McClure, Marc (I)|       Coach Carter|          2005|   2000|
| McClure, Marc (I)|        Superman II|          1980|   1980|
| McClure, Marc (I)|          Apollo 13|          1995|   1990|
| McClure, Marc (I)|           Superman|          1978|   1970|
| McClure, Marc (I)| Back to the Future|          1985|   1980|
+------------------+-------------------+--------------+-------+

A somewhat tricky behavior: if the column name already exists, withColumn replaces the existing column's values with the new ones.
Example:

movies.withColumn("produced_year", ('produced_year - 'produced_year % 10)).show(5)

Output:

+------------------+-------------------+--------------+
|        actor_name|        movie_title| produced_year|
+------------------+-------------------+--------------+
| McClure, Marc (I)|       Coach Carter|          2000|
| McClure, Marc (I)|        Superman II|          1980|
| McClure, Marc (I)|          Apollo 13|          1990|
| McClure, Marc (I)|           Superman|          1970|
| McClure, Marc (I)| Back to the Future|          1980|
+------------------+-------------------+--------------+
  9. withColumnRenamed(existingColName, newColName)

Renames an existing column of a DataFrame.
Example:

movies.withColumnRenamed("actor_name", "actor")
      .withColumnRenamed("movie_title", "title")
      .withColumnRenamed("produced_year", "year").show(5)

Output:

+------------------+-------------------+-----+
|             actor|              title| year|
+------------------+-------------------+-----+
| McClure, Marc (I)|       Coach Carter| 2005|
| McClure, Marc (I)|        Superman II| 1980|
| McClure, Marc (I)|          Apollo 13| 1995|
| McClure, Marc (I)|           Superman| 1978|
| McClure, Marc (I)| Back to the Future| 1985|
+------------------+-------------------+-----+
  10. drop(columnName1, columnName2)

Drops the specified columns; column names that don't exist are silently ignored.
Example:

movies.drop("actor_name", "me").printSchema
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)
  11. sample(fraction) / sample(withReplacement, fraction) / sample(withReplacement, fraction, seed)

This operation randomly returns a subset of the DataFrame's rows. The number of rows returned is controlled by fraction, a proportion between 0 and 1; seed seeds the random number generator. The withReplacement flag determines whether a selected row is put back into the candidate pool: when withReplacement is true, the same row may be selected more than once.

Example 1:

// sample with no replacement and a fraction
movies.sample(false, 0.0003).show(3)

Output:

+---------------------+----------------------+--------------+
|           actor_name|           movie_title| produced_year|
+---------------------+----------------------+--------------+
|      Lewis, Clea (I)|  Ice Age: The Melt...|          2006|
|       Lohan, Lindsay|   Herbie Fully Loaded|          2005|
| Tagawa, Cary-Hiro...|       Licence to Kill|          1989|
+---------------------+----------------------+--------------+

Example 2:

// sample with replacement, a fraction and a seed
movies.sample(true, 0.0003, 123456).show(3)

Output:

+---------------------+---------------+--------------+
|           actor_name|    movie_title| produced_year|
+---------------------+---------------+--------------+
| Panzarella, Russ (V)| Public Enemies|          2009|
|         Reed, Tanoai|      Daredevil|          2003|
|         Moyo, Masasa|   Spider-Man 3|          2007|
+---------------------+---------------+--------------+
  12. randomSplit(weights)

This operation is often used to prepare data for training machine-learning models. It splits a DataFrame into one or more DataFrames according to the given weights. If the weights do not sum to 1, they are normalized to 1.

Example:

// the weights need to be an Array
val smallerMovieDFs = movies.randomSplit(Array(0.6, 0.3, 0.1))

smallerMovieDFs is now an array of DataFrames:

smallerMovieDFs(0).count + smallerMovieDFs(1).count + smallerMovieDFs(2).count
Long = 31393

And movies itself also has 31393 rows:

movies.count
Long = 31393
  13. describe(columnNames)

It is often useful to have summary statistics over your data. describe is exactly such an operation: for each specified column it returns the count, mean, standard deviation, minimum, and maximum.
Example:

movies.describe("produced_year").show

Output:

+-------+-------------------+
|summary|      produced_year|
+-------+-------------------+
|  count|              31392|
|   mean| 2002.7964449541284|
| stddev|  6.377236851493877|
|    min|               1961|
|    max|               2012|
+-------+-------------------+
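describe also accepts multiple column names, and it works on string columns as well, where min and max are lexicographic and mean/stddev come back null; a sketch:

```scala
// summary statistics for a string column and a numeric column;
// for actor_name, mean and stddev will be null
movies.describe("actor_name", "produced_year").show
```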