DataFrames are immutable, and like the corresponding RDD operations, their transformations return a new DataFrame.
DataFrames Transformations
- select
- selectExpr
- filter/where
- distinct/dropDuplicates
- sort/orderBy
- limit
- union
- withColumn
- withColumnRenamed
- drop
- sample
- randomSplit
- join
- groupBy
- describe
How to Reference a Column
Some of the transformations above take a column as a string argument, while others expect a Column object. So before walking through the transformations, let's look at how Spark SQL references a column.
There are five ways to reference a column in Spark SQL:
- "columnName": the column name wrapped in double quotes, referencing the column as a string.
- col("columnName"): the col function returns an instance of the Column class.
- column("columnName"): has exactly the same effect as col.
- $"columnName": Scala syntactic sugar that also returns a Column instance.
- 'columnName: another piece of Scala syntactic sugar that also returns a Column instance.
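The five styles can be compared side by side in a small sketch. This assumes a local SparkSession and a tiny hand-built DataFrame (illustrative only, not the movies data used below):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, column}

val spark = SparkSession.builder.master("local[*]").appName("columns").getOrCreate()
import spark.implicits._ // enables the $"..." and 'symbol syntax

val df = Seq(("Coach Carter", 2005L)).toDF("movie_title", "produced_year")

// All five references point at the same column:
df.select("movie_title")         // 1. plain string
df.select(col("movie_title"))    // 2. col(...) returns a Column
df.select(column("movie_title")) // 3. column(...) is equivalent to col
df.select($"movie_title")        // 4. Scala string-interpolator sugar
df.select('movie_title)          // 5. Scala symbol sugar
```

The four Column-producing forms are interchangeable; the string form is the most concise but, as noted below, cannot be mixed with the Column forms in a single call or combined with Column operators.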
The Transformations in Detail
First, create a DataFrame from a data source:
val movies = spark.read.parquet("<path>/chapter4/data/movies/movies.parquet")
The select(columns) transformation
An example:
movies.select("movie_title","produced_year").show(5)
Output:
+-------------------+--------------+
| movie_title| produced_year|
+-------------------+--------------+
| Coach Carter| 2005|
| Superman II| 1980|
| Apollo 13| 1995|
| Superman| 1978|
| Back to the Future| 1985|
+-------------------+--------------+
Note that when passing multiple columns to select, all of them must be referenced the same way; you cannot mix the string form with the Column form.
Another example: when columns are referenced as Column objects, you can use Column's built-in operators to perform arithmetic and logical computations.
movies.select('movie_title,('produced_year - ('produced_year % 10)).as("produced_decade")).show(5)
Output:
+-------------------+----------------+
| movie_title| produced_decade|
+-------------------+----------------+
| Coach Carter| 2000|
| Superman II| 1980|
| Apollo 13| 1990|
| Superman| 1970|
| Back to the Future| 1980|
+-------------------+----------------+
The selectExpr(expressions) transformation
Unlike select, this operation accepts one or more SQL expressions rather than column names as its arguments.
Example 1:
movies.selectExpr("*","(produced_year - (produced_year % 10)) as decade").show(5)
Output:
+-----------------+-------------------+--------------+-------+
| actor_name| movie_title| produced_year| decade|
+-----------------+-------------------+--------------+-------+
|McClure, Marc (I)| Coach Carter| 2005| 2000|
|McClure, Marc (I)| Superman II| 1980| 1980|
|McClure, Marc (I)| Apollo 13| 1995| 1990|
|McClure, Marc (I)| Superman| 1978| 1970|
|McClure, Marc (I)| Back to the Future| 1985| 1980|
+-----------------+-------------------+--------------+-------+
Example 2:
movies.selectExpr("count(distinct(movie_title)) as movies","count(distinct(actor_name)) as actors").show
Output:
+-------+-------+
| movies| actors|
+-------+-------+
| 1409| 6527|
+-------+-------+
The filter(condition) and where(condition) transformations
These two operations are equivalent; both filter rows based on column values.
Example 1:
movies.filter('produced_year === 2000).show(5)
Output:
+------------------+---------------------+--------------+
| actor_name| movie_title| produced_year|
+------------------+---------------------+--------------+
| Cooper, Chris (I)| Me, Myself & Irene| 2000|
| Cooper, Chris (I)| The Patriot| 2000|
| Jolie, Angelina| Gone in Sixty Sec...| 2000|
| Yip, Françoise| Romeo Must Die| 2000|
| Danner, Blythe| Meet the Parents| 2000|
+------------------+---------------------+--------------+
Example 2: in Spark SQL the not-equal operator is =!=
movies.select("movie_title","produced_year").filter('produced_year =!= 2000).show(5)
Output:
+-------------------+--------------+
| movie_title| produced_year|
+-------------------+--------------+
| Coach Carter| 2005|
| Superman II| 1980|
| Apollo 13| 1995|
| Superman| 1978|
| Back to the Future| 1985|
+-------------------+--------------+
As this example shows, a column must be referenced as a Column object in order to perform logical operations on it.
Example 3: filtering on multiple combined conditions
movies.filter('produced_year >= 2000 && length('movie_title) < 5).show(5)
Output:
+----------------+------------+--------------+
| actor_name| movie_title| produced_year|
+----------------+------------+--------------+
| Jolie, Angelina| Salt| 2010|
| Cueto, Esteban| xXx| 2002|
| Butters, Mike| Saw| 2004|
| Franko, Victor| 21| 2008|
| Ogbonna, Chuk| Salt| 2010|
+----------------+------------+--------------+
Example 4: another way to combine conditions is to chain multiple filter calls
movies.filter('produced_year >= 2000).filter(length('movie_title) < 5).show(5)
The distinct and dropDuplicates transformations
These operations simply remove rows whose column values are duplicated.
movies.select("movie_title").distinct.selectExpr("count(movie_title) as movies").show
Output:
+------+
|movies|
+------+
| 1409|
+------+
The sort(columns) and orderBy(columns) transformations
These sort the rows by the values of the specified columns.
Example 1:
val movieTitles = movies.dropDuplicates("movie_title")
.selectExpr("movie_title", "length(movie_title) as title_length", "produced_year")
movieTitles.sort('title_length).show(5)
Output:
+-----------+-------------+--------------+
|movie_title| title_length| produced_year|
+-----------+-------------+--------------+
| RV| 2| 2006|
| 12| 2| 2007|
| Up| 2| 2009|
| X2| 2| 2003|
| 21| 2| 2008|
+-----------+-------------+--------------+
Example 2:
Sorting is ascending by default; to sort in descending order, it must be requested explicitly.
movieTitles.orderBy('title_length.desc).show(5)
Output:
+---------------------+-------------+--------------+
| movie_title| title_length| produced_year|
+---------------------+-------------+--------------+
| Borat: Cultural L...| 83| 2006|
| The Chronicles of...| 62| 2005|
| Hannah Montana & ...| 57| 2008|
| The Chronicles of...| 56| 2010|
| Istoriya pro Rich...| 56| 1997|
+---------------------+-------------+--------------+
Example 3: sorting by two columns
movieTitles.orderBy('title_length.desc, 'produced_year).show(5)
Output:
+---------------------+-------------+--------------+
| movie_title| title_length| produced_year|
+---------------------+-------------+--------------+
| Borat: Cultural L...| 83| 2006|
| The Chronicles of...| 62| 2005|
| Hannah Montana & ...| 57| 2008|
| Istoriya pro Rich...| 56| 1997|
| The Chronicles of...| 56| 2010|
+---------------------+-------------+--------------+
The limit(n) transformation
This operation returns the first n rows.
Example:
// first create a DataFrame with their name and associated length
val actorNameDF = movies.select("actor_name").distinct.selectExpr("*", "length(actor_name) as length")
// order names by length and retrieve the top 10
actorNameDF.orderBy('length.desc).limit(10).show
Output:
+-----------------------------+-------+
| actor_name| length|
+-----------------------------+-------+
| Driscoll, Timothy 'TJ' James| 28|
| Badalamenti II, Peter Donald| 28|
| Shepard, Maridean Mansfield| 27|
| Martino, Nicholas Alexander| 27|
| Marshall-Fricker, Charlotte| 27|
| Phillips, Christopher (III)| 27|
| Pahlavi, Shah Mohammad Reza| 27|
| Juan, The Bishop Don Magic| 26|
| Van de Kamp Buchanan, Ryan| 26|
| Lough Haggquist, Catherine| 26|
+-----------------------------+-------+
The union(otherDataFrame) transformation
This operation takes another DataFrame as its argument, and it requires that the two DataFrames have the same schema.
Example:
val shortNameMovieDF = movies.where('movie_title === "12")
shortNameMovieDF.show
Output:
+---------------------+------------+---------------+
| actor_name| movie_title| produced_year |
+---------------------+------------+---------------+
| Efremov, Mikhail| 12| 2007|
| Stoyanov, Yuriy| 12| 2007|
| Gazarov, Sergey| 12| 2007|
| Verzhbitskiy, Viktor| 12| 2007|
+---------------------+------------+---------------+
Create another DataFrame:
import org.apache.spark.sql.Row
val forgottenActor = Seq(Row("Brychta, Edita", "12", 2007L))
val forgottenActorRDD = spark.sparkContext.parallelize(forgottenActor)
val forgottenActorDF = spark.createDataFrame(forgottenActorRDD, shortNameMovieDF.schema)
Perform the union:
val completeShortNameMovieDF = shortNameMovieDF.union(forgottenActorDF)
completeShortNameMovieDF.show
Output:
+---------------------+------------+--------------+
| actor_name| movie_title| produced_year|
+---------------------+------------+--------------+
| Efremov, Mikhail| 12| 2007|
| Stoyanov, Yuriy| 12| 2007|
| Gazarov, Sergey| 12| 2007|
| Verzhbitskiy, Viktor| 12| 2007|
| Brychta, Edita| 12| 2007|
+---------------------+------------+--------------+
The withColumn(colName, column) transformation
This operation adds a new column to a DataFrame.
Example:
movies.withColumn("decade", ('produced_year - 'produced_year % 10)).show(5)
Output:
+------------------+-------------------+--------------+-------+
| actor_name| movie_title| produced_year| decade|
+------------------+-------------------+--------------+-------+
| McClure, Marc (I)| Coach Carter| 2005| 2000|
| McClure, Marc (I)| Superman II| 1980| 1980|
| McClure, Marc (I)| Apollo 13| 1995| 1990|
| McClure, Marc (I)| Superman| 1978| 1970|
| McClure, Marc (I)| Back to the Future| 1985| 1980|
+------------------+-------------------+--------------+-------+
Another, somewhat tricky behavior: if the column name already exists, the values of the existing column are replaced with the new ones.
Example:
movies.withColumn("produced_year", ('produced_year - 'produced_year % 10)).show(5)
Output:
+------------------+-------------------+--------------+
| actor_name| movie_title| produced_year|
+------------------+-------------------+--------------+
| McClure, Marc (I)| Coach Carter| 2000|
| McClure, Marc (I)| Superman II| 1980|
| McClure, Marc (I)| Apollo 13| 1990|
| McClure, Marc (I)| Superman| 1970|
| McClure, Marc (I)| Back to the Future| 1980|
+------------------+-------------------+--------------+
The withColumnRenamed(existingColName, newColName) transformation
This operation renames an existing column in a DataFrame.
Example:
movies.withColumnRenamed("actor_name", "actor")
.withColumnRenamed("movie_title", "title")
.withColumnRenamed("produced_year", "year").show(5)
Output:
+------------------+-------------------+-----+
| actor| title| year|
+------------------+-------------------+-----+
| McClure, Marc (I)| Coach Carter| 2005|
| McClure, Marc (I)| Superman II| 1980|
| McClure, Marc (I)| Apollo 13| 1995|
| McClure, Marc (I)| Superman| 1978|
| McClure, Marc (I)| Back to the Future| 1985|
+------------------+-------------------+-----+
The drop(columnName1, columnName2) transformation
This operation drops the specified columns; a column name that does not exist is silently ignored.
Example:
movies.drop("actor_name", "me").printSchema
root
|-- movie_title: string (nullable = true)
|-- produced_year: long (nullable = true)
The sample(fraction), sample(fraction, seed), and sample(withReplacement, fraction, seed) transformations
This operation randomly returns a subset of the DataFrame's rows; the number of rows returned is controlled by fraction, a proportion between 0 and 1. seed is the random-number seed. The withReplacement option determines whether a selected row is put back into the candidate pool; when withReplacement is true, a given row may be selected more than once.
Example:
// sample with no replacement and a fraction
movies.sample(false, 0.0003).show(3)
Output:
+---------------------+----------------------+--------------+
| actor_name| movie_title| produced_year|
+---------------------+----------------------+--------------+
| Lewis, Clea (I)| Ice Age: The Melt...| 2006|
| Lohan, Lindsay| Herbie Fully Loaded| 2005|
| Tagawa, Cary-Hiro...| Licence to Kill| 1989|
+---------------------+----------------------+--------------+
Example 2:
// sample with replacement, a fraction and a seed
movies.sample(true, 0.0003, 123456).show(3)
Output:
+---------------------+---------------+--------------+
| actor_name| movie_title| produced_year|
+---------------------+---------------+--------------+
| Panzarella, Russ (V)| Public Enemies| 2009|
| Reed, Tanoai| Daredevil| 2003|
| Moyo, Masasa| Spider-Man 3| 2007|
+---------------------+---------------+--------------+
The randomSplit(weights) transformation
This operation is commonly used when preparing data to train machine learning models. It splits the DataFrame into one or more DataFrames according to the given weights; if the weights do not sum to 1, they are normalized so that they do.
Example:
// the weights need to be an Array
val smallerMovieDFs = movies.randomSplit(Array(0.6, 0.3, 0.1))
smallerMovieDFs is now an array of DataFrames:
smallerMovieDFs(0).count + smallerMovieDFs(1).count + smallerMovieDFs(2).count
Long = 31393
And movies itself has 31393 rows:
movies.count
Long = 31393
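The claim that weights are normalized can be checked with a quick sketch on a toy DataFrame (assuming a local SparkSession; the weights below are hypothetical and sum to 10, not 1):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("splits").getOrCreate()
import spark.implicits._

val df = (1 to 1000).toDF("n")

// Array(6, 3, 1) is normalized to Array(0.6, 0.3, 0.1) before splitting.
val splits = df.randomSplit(Array(6.0, 3.0, 1.0))

// Each row lands in exactly one split, so the counts add back up to the
// total, just as the three movie splits sum to movies.count.
assert(splits.map(_.count).sum == df.count)
```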
The describe(columnNames) transformation
It is often useful to have summary statistics about your data. describe is such an operation: for a column it returns the count, mean, standard deviation, minimum, and maximum.
Example:
movies.describe("produced_year").show
Output:
+-------+-------------------+
|summary| produced_year|
+-------+-------------------+
| count| 31392|
| mean| 2002.7964449541284|
| stddev| 6.377236851493877|
| min| 1961|
| max| 2012|
+-------+-------------------+