Skip to content

SparkSQL 高级篇(二) 连接操作

Last updated on June 27, 2020

SparkSQL 高级篇(二) 连接操作

连接操作由连接表达式和连接类型两部分组成。
连接表达式指定了通过数据集中的哪一列进行连接。而连接类型决定了连接后的数据集包含什么内容。
SparkSQL 支持以下连接类型:
– 内连接(Inner join)
– 左外部连接(left outer join)
– 右外部连接(right outer join)
– 外连接(outer join)
– 左反连接(left anti join)
– 左半连接(left semi join)
– 笛卡尔连接(cross join)

首先创建两个数据集:
1. 员工表:

case class Employee(first_name:String, dept_no:Long)
val employeeDF = Seq( Employee("John", 31),
                      Employee("Jeff", 33),
                      Employee("Mary", 33),
                      Employee("Mandy", 34),
                      Employee("Julie", 34),
                      Employee("Kurt", null.asInstanceOf[Int])
                    ).toDF
  1. 部门表:
case class Dept(id:Long, name:String)
val deptDF = Seq( Dept(31, "Sales"),
                  Dept(33, "Engineering"),
                  Dept(34, "Finance"),
                  Dept(35, "Marketing")
                ).toDF

并把他们注册成视图以便使用SQL进行操作

employeeDF.createOrReplaceTempView("employees")
deptDF.createOrReplaceTempView("departments")

内连接(Inner join)

内连接会返回两个数据集中满足连接表达式的所有信息。

Spark中使用第一种方法是使用Spark DF的方式:

// 创建Join Expression
val joinExpression=employeeDF.col("dept_no")===deptDF.col("id")

//执行连接
employeeDF.join(deptDF, joinExpression, "inner").show

结果如下:

+----------+-------+---+-----------+
|first_name|dept_no| id|       name|
+----------+-------+---+-----------+
|      John|     31| 31|      Sales|
|      Jeff|     33| 33|Engineering|
|      Mary|     33| 33|Engineering|
|     Mandy|     34| 34|    Finance|
|     Julie|     34| 34|    Finance|
+----------+-------+---+-----------+

此外由于inner join是Spark SQL的默认连接类型,因此也可以省略
可以直接调用

employeeDF.join(deptDF, joinExpression).show

另一种比较方便的方法是直接使用sql

spark.sql("select * from employees join departments where dept_no=id").show

除了预先定义连接表达式之外:还可以直接在join这个transformation中直接写连接表达式或者在where中指定:

  1. 如果列名在两个数据集中是唯一的,可以使用简化版本的连接表达式
 employeeDF.join(deptDF, 'dept_no==='id).show
  1. 如果列名不唯一就必须通过col指定具体的DataFrame的具体列
employeeDF.join(deptDF, employeeDF.col("dept_no")===deptDF.col("id")).show
  1. 除了在join中指定还可以在通过where这个transformation中指定:
employeeDF.join(deptDF).where('dept_no==='id).show

Left Outer Join(左外连接)

Left Outer Join的结果包括了Inner Join的所有行,再加上左边数据集的所有行即使连接表达式的值是false.
对于没有匹配的那些行,来自右边数据集的列的值会被置成NULL。

通过DataFrame

employeeDF.join(deptDF, 'dept_no==='id, "left_outer").show

或者通过spark.sql

spark.sql("select * from employees left outer join departments on dept_no==id").show

结果如下:

+----------+-------+----+-----------+
|first_name|dept_no|  id|       name|
+----------+-------+----+-----------+
|      John|     31|  31|      Sales|
|      Jeff|     33|  33|Engineering|
|      Mary|     33|  33|Engineering|
|     Mandy|     34|  34|    Finance|
|     Julie|     34|  34|    Finance|
|      Kurt|      0|null|       null|
+----------+-------+----+-----------+

通过结果我们可以看到Kurt的dpet_no是0,在右边数据集departments中没有对应的id,所以来自右边数据集中的列是null。

Right Outer Join(右外连接)

Right Outer Join的结果返回所有Inner Join的结果,再加上所有来自于右边数据集中的数据。
对于右边数据集中没有匹配的那些行,对应的左边数据集的列值是null.

通过DataFrame

employeeDF.join(deptDF, 'dept_no==='id, "right_outer").show

通过sql

spark.sql("select * from employees right outer join departments on dept_no==id").show

结果为:

+----------+-------+---+-----------+
|first_name|dept_no| id|       name|
+----------+-------+---+-----------+
|      John|     31| 31|      Sales|
|      Mary|     33| 33|Engineering|
|      Jeff|     33| 33|Engineering|
|     Julie|     34| 34|    Finance|
|     Mandy|     34| 34|    Finance|
|      null|   null| 35|  Marketing|
+----------+-------+---+-----------+

Outer Join (全连接)

Outer Join返回的是左外连接和右外连接结果的并集。

通过DataFrame

employeeDF.join(deptDF, 'dept_no==='id, "outer").show

通过sql

spark.sql("select * from employees full outer join departments on dept_no==id").show

注意在sql 中类型是”full outer join”

结果为:

+----------+-------+----+-----------+
|first_name|dept_no|  id|       name|
+----------+-------+----+-----------+
|      Kurt|      0|null|       null|
|     Mandy|     34|  34|    Finance|
|     Julie|     34|  34|    Finance|
|      John|     31|  31|      Sales|
|      Jeff|     33|  33|Engineering|
|      Mary|     33|  33|Engineering|
|      null|   null|  35|  Marketing|
+----------+-------+----+-----------+

Left Anti Join(左反连接)

Left Anti Join 会返回左边数据集中和右边数据集中没有匹配的那些行,并且返回结果中只包含左边数据集中的列。

通过DataFrame

employeeDF.join(deptDF, 'dept_no === 'id, "left_anti").show

通过sql

spark.sql("select * from employees LEFT ANTI JOIN departments on dept_no == id").show

结果为:

+----------+-------+
|first_name|dept_no|
+----------+-------+
|      Kurt|      0|
+----------+-------+

Left Semi Join(左半连接)

有两种方式理解Left Semi Join:
(1) 类似于Inner Join但是返回的数据集中不包含右边数据集中的列。
(2) 是Left Anti Join的相反操作。只返回左边数据集中和右边数据集有匹配的那些行。

通过DataFrame

employeeDF.join(deptDF, 'dept_no === 'id, "left_semi").show

通过sql

spark.sql("select * from employees left semi join departments on dept_no==id").show

结果如下:

+----------+-------+
|first_name|dept_no|
+----------+-------+
|      John|     31|
|      Jeff|     33|
|      Mary|     33|
|     Mandy|     34|
|     Julie|     34|
+----------+-------+

Cross Join(aka Cartesian)

交叉连接又称笛卡尔连接。笛卡尔连接是一个十分危险的操作,因为他会把左边数据集中的每一行和右边数据集中的所有行进行连接操作。如果左边数据集有M行,右边书记有N行,笛卡尔连接结果数据集中会有M*N 行,并且包含两个数据集中的所有列。 笛卡尔连接是一个开销很大的操作,以至于在DataFrame中为笛卡尔连接单独定义了一个Transformation。
而不能通过字符串指定连接类型。

通过DataFrame

scala> deptDF.count
res19: Long = 4

 employeeDF.count
res17: Long = 6

employeeDF.crossJoin(deptDF).count
res16: Long = 24

通过Spark sql

spark.sql("select * from employees cross join departments").show

如何解决执行连接操作时有重复的列名

问题描述

首先构造一个具有重复列名的数据集。
// add a new column to deptDF with name dept_no

val deptDF2 = deptDF.withColumn("dept_no", 'id)
deptDF2.printSchema
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- dept_no: long (nullable = false)

这样的话,employeeDF和deptDF2就都有一个列叫”dept_no”。

执行连接操作

val dupNameDF=employeeDF.join(deptDF2, employeeDF.col("dept_no")===deptDF2.col("dept_no"))

查看 dupNameDF的schema

dupNameDF.printSchema
|-- first_name: string (nullable = true)
|-- dept_no: long (nullable = false)
|-- id: long (nullable = false)
|-- name: string (nullable = true)
|-- dept_no: long (nullable = false)

这样的话,如果直接引用dept_no会出现错误

 dupNameDF.select("dept_no")
org.apache.spark.sql.AnalysisException: Reference 'dept_no' is ambiguous, could be: dept_no, dept_no.;

如何解决呢?
三种方式:

(1)指定原先的DataFrame
连接后得到的DataFrame可以记住哪一个列来自于哪个DataFrame

dupNameDF.select(deptDF2.col("dept_no")).show

结果为:

+-------+
|dept_no|
+-------+
|     31|
|     33|
|     33|
|     34|
|     34|
+-------+

(2) 在执行连接操作前改名
DataFrame 可以通过withColumnRenamedTransformation改名

(3) 通过Join Transformation 自动去除重复的列名,此方法对自连接无效

val noDupNameDF = employeeDF.join(deptDF2, "dept_no")
noDupNameDF.show

结果:

+-------+----------+---+-----------+
|dept_no|first_name| id|       name|
+-------+----------+---+-----------+
|     31|      John| 31|      Sales|
|     33|      Jeff| 33|Engineering|
|     33|      Mary| 33|Engineering|
|     34|     Mandy| 34|    Finance|
|     34|     Julie| 34|    Finance|
+-------+----------+---+-----------+

注意一下这种写法,只需要指定一个列名,就会根据这个列名自动执行InnerJoin 并在结果数据集中去除重复列

noDupNameDF.printSchema

 |-- dept_no: long (nullable = false)
 |-- first_name: string (nullable = true)
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)

Join 背后的实现原理

Spark 背后两种Join策略: Shuffle Hash Join 和 Broadcast Hash Join。
具体选择哪一种策略取决于执行数据集的大小。当数据集较小时并且能够放到executor的内存中时选择Broadcast Join。否则采用Shuffle Hash Join。

(1) Shuffle Hash Join

此方法包含两步: 第一步在每个数据集中根据连接表达式中指定的列值进行hash,并把hash值相同的行分到同一个partition中。 第二步在partition中把这些具有相同hash值行的列拼接。这种操作开销很大。

(2) Broadcast Hash Join
当其中一个数据集小到可以放进内存中时这种策略才会被使用。与Shuffle Hash Join对两个数据集都执行Shuffle Hash不同, Broadcast Hash Join仅仅较小的数据集进行Shuffle Hash操作。
Broadcast Hash Join也分为两步: 第一步把较小数据集的一份拷贝分发到较大数据集中的每一个Partition。
第二步: 在每一个partition内部执行join的逻辑。

通常情况下Spark SQL会自动选择采用何种Hash Join方式,但是我们可以通过下面的方式提供使用Broadcast Hash Join的提示:

employeeDF.join(broadcast(deptDF), employeeDF.col("dept_no") === deptDF.col("id"))
Published inSpark大数据

Be First to Comment

Leave a Reply

Your email address will not be published. Required fields are marked *

Author Copyriht by BackendSite