加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

对Spark的那些【魔改】

发布时间:2018-08-16 08:44:35 所属栏目:教程 来源:祝威廉
导读:技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战 前言 这两年做 streamingpro 时,不可避免的需要对Spark做大量的增强。就如同我之前吐槽的,Spark大量使用了new进行对象的创建,导致里面的实现基本没有办法进行替换。 比如SparkEn

接着我们需要再封装一个LocalNonOpSerializer,

  1. class LocalNonOpSerializer(conf: SparkConf) extends Serializer with Externalizable { 
  2.   val javaS = new JavaSerializer(conf) 
  3.  
  4.   override def newInstance(): SerializerInstance = { 
  5.     new LocalNonOpSerializerInstance(javaS.newInstance()) 
  6.   } 
  7.  
  8.   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { 
  9.     javaS.writeExternal(out) 
  10.   } 
  11.  
  12.   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { 
  13.     javaS.readExternal(in) 
  14.   } 

现在,万事俱备,只欠东风了,我们怎么才能把这些代码让Spark运行起来。具体做法非常魔幻,实现一个enhance类:

  1. def enhanceSparkEnvForAPIService(session: SparkSession) = { 
  2.       val env = SparkEnv.get 
  3.    //创建一个新的WowSparkEnv对象,然后将里面的Serializer替换成我们自己的LocalNonOpSerializer 
  4.     val wowEnv = new WowSparkEnv( 
  5.  ..... 
  6.       new LocalNonOpSerializer(env.conf): Serializer, 
  7.  ....) 
  8.     // 将SparkEnv object里的实例替换成我们的 
  9.     //WowSparkEnv 
  10.     SparkEnv.set(wowEnv) 
  11.   //但是很多地方在SparkContext启动后都已经在使用之前就已经生成的SparkEnv,我们需要做些调整 
  12. //我们先把之前已经启动的LocalSchedulerBackend里的scheduer停掉 
  13.     val localScheduler = session.sparkContext.schedulerBackend.asInstanceOf[LocalSchedulerBackend] 
  14.  
  15.     val scheduler = ReflectHelper.field(localScheduler, "scheduler") 
  16.  
  17.     val totalCores = localScheduler.totalCores 
  18.     localScheduler.stop() 
  19.  
  20.   //创建一个新的LocalSchedulerBackend 
  21.     val wowLocalSchedulerBackend = new WowLocalSchedulerBackend(session.sparkContext.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], totalCores) 
  22.     wowLocalSchedulerBackend.start() 
  23.  //把SparkContext里的_schedulerBackend替换成我们的实现 
  24.     ReflectHelper.field(session.sparkContext, "_schedulerBackend", wowLocalSchedulerBackend) 
  25.   } 

完工。

其实还有很多

比如在Spark里,Python Worker默认一分钟没有被使用是会被杀死的,但是在StreamingPro里,这些python worker因为都要加载模型,所以启动成本是非常高的,杀了之后再启动就没办法忍受了,通过类似的方式进行魔改,从而使得空闲时间是可配置的。如果大家感兴趣,可以翻看StreamingPro相关代码。

【编辑推荐】

  1. 一文理清Apache Spark内存管理脉络
  2. 深度:Hadoop对Spark五大维度正面比拼报告!
  3. 机器学习实践:如何将Spark与Python结合?
  4. Spark性能优化:开发调优篇
  5. 大数据处理引擎Spark与Flink大比拼
【责任编辑:未丽燕 TEL:(010)68476606】
点赞 0

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读