加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

为啥Spark 的Broadcast要用单例模式

发布时间:2019-06-12 01:01:24 所属栏目:教程 来源:浪院长
导读:很多用Spark Streaming 的朋友应该使用过broadcast,大多数情况下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮大家分析一下,有以下几个原因: 广播变量大多数情况下是不会变更的,使用单例模式可以减少spark streaming每次job生成执行

可以看到代码里首先会执行job生成代码

  1. graph.generateJobs(time) 
  2.  
  3. 具体代码块儿 
  4.  
  5. def generateJobs(time: Time): Seq[Job] = { 
  6.     logDebug("Generating jobs for time " + time) 
  7.     val jobs = this.synchronized { 
  8.       outputStreams.flatMap { outputStream => 
  9.         val jobOption = outputStream.generateJob(time) 
  10.         jobOption.foreach(_.setCallSite(outputStream.creationSite)) 
  11.         jobOption 
  12.       } 
  13.     } 
  14.     logDebug("Generated " + jobs.length + " jobs for time " + time) 
  15.     jobs 
  16.   } 

每个输出流都会生成一个job,输出流就类似于foreachrdd,print这些。其实内部都是ForEachDStream。所以生成的是一个job集合。

然后就会将job集合提交到线程池里去执行,这些都是在driver端完成的哦。

  1. jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  2.  
  3. 具体h函数内容 
  4. def submitJobSet(jobSet: JobSet) { 
  5.     if (jobSet.jobs.isEmpty) { 
  6.       logInfo("No jobs added for time " + jobSet.time) 
  7.     } else { 
  8.       listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) 
  9.       jobSets.put(jobSet.time, jobSet) 
  10.       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) 
  11.       logInfo("Added jobs for time " + jobSet.time) 
  12.     } 
  13.   } 

其实就是遍历生成的job集合,然后提交到线程池jobExecutor内部执行。这个也是在driver端的哦。

jobExecutor就是一个固定线程数的线程池,默认是1个线程。

  1. private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 
  2.   private val jobExecutor = 
  3.     ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") 

需要的话可以配置spark.streaming.concurrentJobs来同时提交执行多个job。

那么这种情况下,job就可以并行执行了吗?

显然不是的!

还要修改一下调度模式为Fair,详细的配置可以参考:

http://spark.apache.org/docs/2.3.3/job-scheduling.html#scheduling-within-an-application

简单的均分的话只需要

  1. conf.set("spark.scheduler.mode", "FAIR") 

然后,同时运行的job就会均分所有executor提供的资源。

这就是整个job生成的整个过程了哦。

因为Spark Streaming的任务存在Fair模式下并发的情况,所以需要在使用单例模式生成broadcast的时候要注意声明同步。

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读