Last updated on June 27, 2020
聚合操作
聚合操作是大数据分析过程中一个常用的操作,用来分析汇总数据,或者生成汇总报告。
聚合通常需要对整个数据集或一个或多个列进行某种形式的分组,然后对每个组应用聚合函数,如求和、计数或求平均值。Spark提供了许多常用的聚合函数。这里介绍下基本的聚合函数和分组聚合操作函数。
首先创建一个用于demo的DataFram
val flight_summary = spark.read.format("csv").option("header","true")
.option("inferSchema","true").load("flight-summary.csv")
flight_summary.count()
Long = 4693
这里的count()
是DataFrame的一个Action。
而我们这次介绍的聚合函数中的count()
是一个function,所有的聚合函数都是延迟计算的函数
基本聚合函数
- count(col)
- countDistinct(col)
- approx_count_distinct(col)
- min(col)
- max(col)
- sum(col)
- sumDistinct(col)
- avg(col)
- skewness(col)
- kurtosis(col)
- variance(col)
- stddev(col)
- collect_list(col)
- collect_set(col)
首先创建一个DataFrame
count(col)
函数
count(col)
函数用于统计一个分组中的项目的数量。比如统计数据中某些列的数量
例子:
flight_summary.select(count("origin_airport"), count("dest_airport").as("dest_count")).show
输出为:
+----------------------+-----------+
| count(origin_airport)| dest_count|
+----------------------+-----------+
| 4693| 4693|
+----------------------+-----------+
为了易读性这里用.as
对结果列重命名,并使用show
查看结果
默认情况下count(col)
统计一个列中的条目数的时候是不包含值为null
的那些行的。
如果要统计包含值为null
的那些行,需要使用count(*)
。
具体看下面这个例子
首先数据如下:
badMoviesDF.show
+----------+--------------+--------------+
|actor_name| movie_title| produced_year|
+----------+--------------+--------------+
| null| null| 2018|
| John Doe| Awesome Movie| 2018|
| null| Awesome Movie| 2018|
| Mary Jane| Awesome Movie| 2018|
+----------+--------------+--------------+
然后执行查询
badMoviesDF.select(count("actor_name"), count("movie_title"), count("produced_year"), count("*")).show
结果如下:
+------------------+-------------------+---------------------+---------+
| count(actor_name)| count(movie_title)| count(produced_year)| count(1)|
+------------------+-------------------+---------------------+---------+
| 2| 3| 4| 4|
+------------------+-------------------+---------------------+---------+
可以看到count(col)
不会包含列值为null的那些行。
countDistinct(col)
countDistinct(col)
和count(col)
类似,从名字也可以看出countDistinct(col)
只包含值不重复的那些行。
例子
flight_summary.select(countDistinct("origin_airport"), countDistinct("dest_airport"), count("*")).show
+-------------------------------+-----------------------------+---------+
| count(DISTINCT origin_airport)| count(DISTINCT dest_airport)| count(1)|
+-------------------------------+-----------------------------+---------+
| 322| 322| 4693|
+-------------------------------+-----------------------------+---------+
从结果中可以看出其数量远小于count(col)
得到的数量。
approx_count_distinct (col, max_estimated_error=0.05)
当一个数据集很大时,精确的统计一个数据集中不重复行的数量是一个十分耗时的操作。
在某些情况下我们只需要得到一个对不重复行数进行估计的总数。
既然是估计就有误差,用户可以通过max_estimated_error
指定可以接受的误差。
例子:
flight_summary.select(count("count"),countDistinct("count"), approx_count_distinct("count", 0.05)).show
结果:
+--------------+----------------------+-----------------------------+
| count(count) | count(DISTINCT count)| approx_count_distinct(count)|
+--------------+----------------------+-----------------------------+
| 4693| 2033| 2252|
+--------------+----------------------+-----------------------------+
可以看到在4693行中实际不重复数是2033,估计得到的值是2252。
当允许的误差越大时,执行的速度就越快。
min(col), max(col)
这两个聚合函数很容易理解,就是统计列中的最小值和最大值。
例子:
flight_summary.select(min("count"), max("count")).show
结果:
+-----------+-----------+
| min(count)| max(count)|
+-----------+-----------+
| 1| 13744|
+-----------+-----------+
sum(col)
对列中所有值进行求和操作。
flight_summary.select(sum("count")).show
结果:
+-----------+
| sum(count)|
+-----------+
| 5332914|
+-----------+
sumDistinct(col)
就像名字一样,对列中不重复值进行求和操作。
flight_summary.select(sumDistinct("count")).show
结果:
+--------------------+
| sum(DISTINCT count)|
+--------------------+
| 3612257|
+--------------------+
avg(col)
求平均函数。其值等价于sum(col)/count(col)
例子:
flight_summary.select(avg("count"), (sum("count") / count("count"))).show
结果:
+-------------------+----------------------------+
| avg(count)| (sum(count) / count(count))|
+-------------------+----------------------------+
| 1136.3549968037503| 1136.3549968037503|
+-------------------+----------------------------+
variance(col), stddev(col)
计算方差和标准差函数。方差(variance)和标准差(stddev)
例子:
flight_summary.select(variance("count"), var_pop("count"), stddev("count"), stddev_pop("count")).show
结果:
+-----------------+------------------+------------------+-----------------+
| var_samp(count)| var_pop(count)|stddev_samp(count)|stddev_pop(count)|
+-----------------+------------------+------------------+-----------------+
|1879037.7571558713|1878637.3655604832| 1370.779981308405| 1370.633928355957|
+-----------------+------------------+------------------+-----------------+
分组聚合函数
分组执行聚合是一个分两步的过程。第一步是使用groupBy(col1,col2,.)
执行分组,转换,可以在其中指定按哪些列对行进行分组。与返回DataFrame的其他转换不同,这里的groupBy
转换返回RelationalGroupedDataset
类的实例,然后可以对其应用一个或多个聚合函数。
- 一个简单的在一个列上应用一个聚合的例子
flight_summary.groupBy("origin_airport").count().show(5, false)
结果:
+--------------------------------------------------+------+
| origin_airport | count|
+--------------------------------------------------+------+
|Melbourne International Airport | 1|
|San Diego International Airport (Lindbergh Field) | 46|
|Eppley Airfield | 21|
|Kahului Airport | 18|
|Austin-Bergstrom International Airport | 41|
+--------------------------------------------------+------+
- 通过两个列进行分组,并应用
count()
统计个数
flight_summary.groupBy('origin_state, 'origin_city).count
.where('origin_state === "CA").orderBy('count.desc).show(5)
结果:
+-------------+-----------------+-------+
| origin_state| origin_city| count|
+-------------+-----------------+-------+
| CA| San Francisco| 80|
| CA| Los Angeles| 80|
| CA| San Diego| 47|
| CA| Oakland| 35|
| CA| Sacramento| 27|
+-------------+-----------------+-------+
类RelationalGroupedDataset提供了一组可应用于每个子组的标准聚合函数。
它们是avg(cols)
,count()
,mean(cols)
,min(cols)
,max(cols)
和sum(cols)
。
除count()
函数外,其余所有函数都对数值列进行操作。
以上两个例子都只应用了一个聚合函数,下面举几个多个聚合函数的例子。
比如我们想同时统计一个列的行数,最大值,最小值等。
核心词是agg
函数
flight_summary.groupBy("origin_airport")
.agg(
count("count").as("count"),
min("count"), max("count"),
sum("count")
).show(5)
结果为:
+--------------------+------+-----------+-----------+-----------+
| origin_airport| count| min(count)| max(count)| sum(count)|
+--------------------+------+-----------+-----------+-----------+
|Melbourne Interna...| 1| 1332| 1332| 1332|
|San Diego Interna...| 46| 4| 6942| 70207|
| Eppley Airfield| 21| 1| 2083| 16753|
| Kahului Airport| 18| 67| 8313| 20627|
|Austin-Bergstrom ...| 41| 8| 4674| 42067|
+--------------------+------+-----------+-----------+-----------+
通过agg
函数我们同时对一个分组进行了求和,求最大值,求最小值,计数的操作。
此外agg
函数提供了另一种通过基于字符串的键值映射来表示列表达式的方法。键是列名,值是聚合函数。
比如:
flight_summary.groupBy("origin_airport")
.agg(
"count" -> "count",
"count" -> "min",
"count" -> "max",
"count" -> "sum")
.show(5)
collect_list(col)
和collect_set(col)
collect_list(col)
和collect_set(col)
用来在分组后收集特定组的所有值。
唯一的区别是collect_list(col)
返回可能包含重复值的集合。
collect_set(col)
返回仅包含唯一值的集合就像list
和set
的区别一样。例子:
val highCountDestCities =
flight_summary.where('count > 5500)
.groupBy("origin_state")
.agg(collect_list("dest_city").as("dest_cities"))
highCountDestCities.withColumn("dest_city_count", size('dest_cities)).show(5, false) # withColumn 增加一列
结果:
+-------------+--------------------------------------+----------------+
| origin_state| dest_cities | dest_city_count|
+-------------+--------------------------------------+----------------+
| AZ| [Seattle, Denver, Los Angeles] | 3|
| LA| [Atlanta] | 1|
| MN| [Denver, Chicago] | 2|
| VA| [Chicago, Boston, Atlanta] | 3|
| NV| [Denver, Los Angeles, San Francisco] | 3|
+-------------+--------------------------------------+----------------+
- Pivot 聚合操作
Pivot 的翻译很多比如旋转,透视。其单词本意是枢纽,旋转的中心点。
这里其实就是行转列的意思。
旋转是一种汇总数据的方式,方法是指定一个分类列,然后在另一列上执行聚合,以便将分类值从行转置到单独的列。关于透视的另一种思考方式是,它是一种在应用一个或多个聚合的同时将行转换为列的方法。此技术通常用于数据分析或报告。旋转过程开始于对一个或多个列进行分组,然后旋转到一个列上,最后以在一个或多个列上应用一个或多个聚合结束。
这里比较绕口,举个例子就明白了。
例子:
有个数据集如下:
case class Student(name:String, gender:String, weight:Int, graduation_year:Int)
val studentsDF = Seq(Student("John", "M", 180, 2015),
Student("Mary", "F", 110, 2015),
Student("Derek", "M", 200, 2015),
Student("Julie", "F", 109, 2015),
Student("Allison", "F", 105, 2015),
Student("kirby", "F", 115, 2016),
Student("Jeff", "M", 195, 2016)).toDF
现在我们想知道每一个毕业年份每一个性别的平均体重。
使用Pivot 操作
studentsDF.groupBy("graduation_year").pivot("gender").avg("weight").show()
+----------------+------+------+
| graduation_year| F| M|
+----------------+------+------+
| 2015| 108.0| 190.0|
| 2016| 115.0| 195.0|
+----------------+------+------+
这里和普通按年份和性别分组,然后统计平均体重的区别是什么呢?
区别就是Pivot 有行专列的功能。
下面是一个普通的按年份和性别分组,然后统计平均体重的例子
studentsDF.groupBy('graduation_year, 'gender).agg(avg('weight)).show()
结果:
+---------------+------+-----------+
|graduation_year|gender|avg(weight)|
+---------------+------+-----------+
| 2015| M| 190.0|
| 2016| M| 195.0|
| 2015| F| 108.0|
| 2016| F| 115.0|
+---------------+------+-----------+
对比输出可以看出使用Pivot后,Pivot(col)会把指定的这个列进行行专列的操作。
因为gender 有两个可能的值,所以Pivot后结果会有两列。
同样的我们也可以在Povit指定的列上同时应用多个聚合函数。
例子:
studentsDF.groupBy("graduation_year").pivot("gender")
.agg(
min("weight").as("min"),
max("weight").as("max"),
avg("weight").as("avg")
).show()
结果:
+---------------+------+------+------+------+------+------+
|graduation_year| F_min| F_max| F_avg| M_min| M_max| M_avg|
+---------------+------+------+------+------+------+------+
| 2015| 105| 110| 108.0| 180| 200| 190.0|
| 2016| 115| 115| 115.0| 195| 195| 195.0|
+---------------+------+------+------+------+------+------+
我们知道进行Pivot的列有多少不同的值,最后就会生成多少列。
当一个列的值有很多时,我们可以单独指定根据哪一个值应用聚合函数。
例子:
studentsDF.groupBy("graduation_year").pivot("gender", Seq("M"))
.agg(
min("weight").as("min"),
max("weight").as("max"),
avg("weight").as("avg")
).show()
结果:
+----------------+------+------+------+
| graduation_year| M_min| M_max| M_avg|
+----------------+------+------+------+
| 2015| 180| 200| 190.0|
| 2016| 195| 195| 195.0|
+----------------+------+------+------+
可以看出,我们只根据了gender值为M进行了聚合操作
Youre so right. Im there with you. Your blog is surely worth a read if anybody comes across it. Im lucky I did because now Ive got a whole new view of this. I didnt realise that this issue was so important and so universal. You absolutely put it in perspective for me.