加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

一篇文章看懂 Spark RDD

发布时间:2019-04-18 14:26:48 所属栏目:教程 来源:大数据进击之路
导读:1 简介 Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。它产生于 UC Berkeley AMP Lab,继承了 MapReduce 的优点,但是不同于 MapReduce 的是,Spark 可以将结果保存在内存中,一直迭代计算下去,除非遇到 shuffle 。因此 Spark 能更好的

相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD 的 Lineage 记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个 RDD 的部分分区数据丢失时,它可以通过Lineage找到丢失的父RDD的分区进行局部计算来恢复丢失的数据,这样可以节省资源提高运行效率。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

4 控制算子

控制算子有三种:cache, persist, checkpoint, 以上算子都可以将 RDD 持久化、持久化的单位是 Partition。

cache 和 persist 都是懒执行的,必须有一个 action 算子来触发他们执行。checkpoint 不仅可以将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系。

说几点区别:

  • cache 的持久化级别是 Memory_Only,就这一个。
  • persist 的持久化级别:常用的有Memory_Only 和Memory_and_Disk_2, 数字 2 表示副本数。
  • checkpoint 主要是用来做容错的。

checkpoint 的执行原理是:当 RDD 的 job 执行完毕之后,会从 finalRDD 进行回溯。当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的 RDD 做一个标记。Spark 框架会自动启动一个新的 Job ,重新计算这个 RDD 的数据,将数据持久化到 HDFS 上。根据这个原理,我们可以进行优化,对 RDD 进行 checkpoint 之前,最好先对这个 RDD 进行 cache, 这样启动新的 job 只需要将内存中的数据拷贝到 HDFS 上就可以了,节省了重新计算这一步。

5 RDD 的依赖关系

窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD 的分区。图中,map/filter/union属于第一类,对输入进行协同划分(co-partitioned)的join属于第二类。窄依赖不会产生 shuffle。

宽依赖:指子RDD的分区依赖于父RDD的所有分区,这是因为 shuffle 类操作,如图中的 groupByKey 和未经协同划分的 join。 遇到宽依赖会产生 shuffle 。

上面我们说到了 RDD 之间的依赖关系,这些依赖关系形成了一个人 DAG 有向无环图。DAG 创建完成之后,会被提交给 DAGScheduler, 它负责把 DAG 划分相互依赖的多个 stage ,划分依据就是 RDD 之间的窄宽依赖。换句话说就是,遇到一个宽依赖就划分一个 stage,每一个 stage 包含一个或多个 stask 任务。然后将这些 task 以 taskset 的方式提交给 TaskScheduler 运行。也可以说 stage 是由一组并行的 task 组成。下图很清楚的描述了 stage 的划分。​

6 Stage划分思路

接上图,Spark 划分 stage 的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个 RDD 加入该 stage 中。

因此在图中 RDD C, RDD D, RDD E, RDD F 被构建在一个 stage 中, RDD A被构建在一个单独的Stage中,而 RDD B 和 RDD G 又被构建在同一个 stage中。

另一个角度

一个 Job 会被拆分为多组 Task,每组任务被称为一个Stage就像 Map Stage,Reduce Stage。

Stage 的划分简单的说是以 shuffle 和 result 这两种类型来划分。在 Spark中有两类 task,一类是 shuffleMapTask,一类是 resultTask,第一类 task的输出是 shuffle 所需数据,第二类 task 的输出是 result,stage的划分也以此为依据,shuffle 之前的所有变换是一个 stage,shuffle之后的操作是另一个stage。

如果 job 中有多次 shuffle,那么每个 shuffle 之前都是一个 stage. 会根据 RDD 之间的依赖关系将 DAG图划分为不同的阶段,对于窄依赖,由于 partition 依赖关系的确定性,partition 的转换处理就可以在同一个线程里完成,窄依赖就被 spark 划分到同一个 stage 中,而对于宽依赖,只能等父 RDD shuffle 处理完成后,下一个 stage 才能开始接下来的计算。之所以称之为 ShuffleMapTask 是因为它需要将自己的计算结果通过 shuffle 到下一个 stage 中。

【编辑推荐】

  1. Apache Kafka与Spark Streaming的两种整合方法及其优缺点
  2. Adaptive Execution 让 Spark SQL 更智能更高效
  3. 用Spark 来做大规模图形挖掘:第一部分
  4. 大数据的技术生态?Hadoop、Hive、Spark之间是什么关系?
  5. 大数据计算框架Spark之任务调度
【责任编辑:未丽燕 TEL:(010)68476606】
点赞 0

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读