加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Adaptive Execution 让 Spark SQL 更智能更高效

发布时间:2018-10-24 20:43:10 所属栏目:教程 来源:郭俊 Jason Guo
导读:本文转发自技术世界,原文链接 http://www.jasongj.com/spark/adaptive_execution/ 1 背景 前面《Spark SQL / Catalyst 内部原理 与 RBO》与《Spark SQL 性能优化再进一步 CBO 基于代价的优化》介绍的优化,从查询本身与目标数据的特点的角度尽可能保证了

为了解决这个问题,Spark 新增接口,一次 Shuffle Read 可以读多个 Partition 的数据。如下图所示,Task 1 通过一轮请求即可同时读取 Task 0 内 Partition 0、1 和 2 的数据,减少了网络请求数量。同时 Mapper 0 一次性读取并返回三个 Partition 的数据,相当于顺序 IO,从而提升了性能。

Adaptive Execution 让 Spark SQL 更智能更高效

Spark SQL adaptive reducer 2

由于 Adaptive Execution 的自动设置 Reducer 是由 ExchangeCoordinator 根据 Shuffle Write 统计信息决定的,因此即使在同一个 Job 中不同 Shuffle 的 Reducer 个数都可以不一样,从而使得每次 Shuffle 都尽可能最优。

上文 原有 Shuffle 的问题 一节中的例子,在启用 Adaptive Execution 后,三次 Shuffle 的 Reducer 个数从原来的全部为 3 变为 2、4、3。

Adaptive Execution 让 Spark SQL 更智能更高效

Spark SQL with adaptive Shuffle

2.4 使用与优化方法

可通过 spark.sql.adaptive.enabled=true 启用 Adaptive Execution 从而启用自动设置 Shuffle Reducer 这一特性

通过 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可设置每个 Reducer 读取的目标数据量,其单位是字节,默认值为 64 MB。上文例子中,如果将该值设置为 50 MB,最终效果仍然如上文所示,而不会将 Partition 0 的 60MB 拆分。具体原因上文已说明

3 动态调整执行计划

3.1 固定执行计划的不足

在不开启 Adaptive Execution 之前,执行计划一旦确定,即使发现后续执行计划可以优化,也不可更改。如下图所示,SortMergJoin 的 Shuffle Write 结束后,发现 Join 一方的 Shuffle 输出只有 46.9KB,仍然继续执行 SortMergeJoin

Adaptive Execution 让 Spark SQL 更智能更高效

Spark SQL with fixed DAG

此时完全可将 SortMergeJoin 变更为 BroadcastJoin 从而提高整体执行效率。

3.2 SortMergeJoin 原理

SortMergeJoin 是常用的分布式 Join 方式,它几乎可使用于所有需要 Join 的场景。但有些场景下,它的性能并不是最好的。

SortMergeJoin 的原理如下图所示

  • 将 Join 双方以 Join Key 为 Key 按照 HashPartitioner 分区,且保证分区数一致
  • Stage 0 与 Stage 1 的所有 Task 在 Shuffle Write 时,都将数据分为 5 个 Partition,并且每个 Partition 内按 Join Key 排序
  • Stage 2 启动 5 个 Task 分别去 Stage 0 与 Stage 1 中所有包含 Partition 分区数据的 Task 中取对应 Partition 的数据。(如果某个 Mapper 不包含该 Partition 的数据,则 Redcuer 无须向其发起读取请求)。
  • Stage 2 的 Task 2 分别从 Stage 0 的 Task 0、1、2 中读取 Partition 2 的数据,并且通过 MergeSort 对其进行排序
  • Stage 2 的 Task 2 分别从 Stage 1 的 Task 0、1 中读取 Partition 2 的数据,且通过 MergeSort 对其进行排序
  • Stage 2 的 Task 2 在上述两步 MergeSort 的同时,使用 SortMergeJoin 对二者进行 Join

Adaptive Execution 让 Spark SQL 更智能更高效

Spark SQL SortMergeJoin

3.3 BroadcastJoin 原理

当参与 Join 的一方足够小,可全部置于 Executor 内存中时,可使用 Broadcast 机制将整个 RDD 数据广播到每一个 Executor 中,该 Executor 上运行的所有 Task 皆可直接读取其数据。(本文中,后续配图,为了方便展示,会将整个 RDD 的数据置于 Task 框内,而隐藏 Executor)

对于大 RDD,按正常方式,每个 Task 读取并处理一个 Partition 的数据,同时读取 Executor 内的广播数据,该广播数据包含了小 RDD 的全量数据,因此可直接与每个 Task 处理的大 RDD 的部分数据直接 Join

Adaptive Execution 让 Spark SQL 更智能更高效

Spark SQL BroadcastJoin

根据 Task 内具体的 Join 实现的不同,又可分为 BroadcastHashJoin 与 BroadcastNestedLoopJoin。后文不区分这两种实现,统称为 BroadcastJoin

与 SortMergeJoin 相比,BroadcastJoin 不需要 Shuffle,减少了 Shuffle 带来的开销,同时也避免了 Shuffle 带来的数据倾斜,从而极大地提升了 Job 执行效率

同时,BroadcastJoin 带来了广播小 RDD 的开销。另外,如果小 RDD 过大,无法存于 Executor 内存中,则无法使用 BroadcastJoin

对于基础表的 Join,可在生成执行计划前,直接通过 HDFS 获取各表的大小,从而判断是否适合使用 BroadcastJoin。但对于中间表的 Join,无法提前准确判断中间表大小从而精确判断是否适合使用 BroadcastJoin

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读