接着我们需要再封装一个LocalNonOpSerializer,
- class LocalNonOpSerializer(conf: SparkConf) extends Serializer with Externalizable {
- val javaS = new JavaSerializer(conf)
-
- override def newInstance(): SerializerInstance = {
- new LocalNonOpSerializerInstance(javaS.newInstance())
- }
-
- override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
- javaS.writeExternal(out)
- }
-
- override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
- javaS.readExternal(in)
- }
- }
现在,万事俱备,只欠东风了,我们怎么才能把这些代码让Spark运行起来。具体做法非常魔幻,实现一个enhance类:
- def enhanceSparkEnvForAPIService(session: SparkSession) = {
- val env = SparkEnv.get
- //创建一个新的WowSparkEnv对象,然后将里面的Serializer替换成我们自己的LocalNonOpSerializer
- val wowEnv = new WowSparkEnv(
- .....
- new LocalNonOpSerializer(env.conf): Serializer,
- ....)
- // 将SparkEnv object里的实例替换成我们的
- //WowSparkEnv
- SparkEnv.set(wowEnv)
- //但是很多地方在SparkContext启动后都已经在使用之前就已经生成的SparkEnv,我们需要做些调整
- //我们先把之前已经启动的LocalSchedulerBackend里的scheduer停掉
- val localScheduler = session.sparkContext.schedulerBackend.asInstanceOf[LocalSchedulerBackend]
-
- val scheduler = ReflectHelper.field(localScheduler, "scheduler")
-
- val totalCores = localScheduler.totalCores
- localScheduler.stop()
-
- //创建一个新的LocalSchedulerBackend
- val wowLocalSchedulerBackend = new WowLocalSchedulerBackend(session.sparkContext.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], totalCores)
- wowLocalSchedulerBackend.start()
- //把SparkContext里的_schedulerBackend替换成我们的实现
- ReflectHelper.field(session.sparkContext, "_schedulerBackend", wowLocalSchedulerBackend)
- }
完工。
其实还有很多
比如在Spark里,Python Worker默认一分钟没有被使用是会被杀死的,但是在StreamingPro里,这些python worker因为都要加载模型,所以启动成本是非常高的,杀了之后再启动就没办法忍受了,通过类似的方式进行魔改,从而使得空闲时间是可配置的。如果大家感兴趣,可以翻看StreamingPro相关代码。 【编辑推荐】 - 一文理清Apache Spark内存管理脉络
- 深度:Hadoop对Spark五大维度正面比拼报告!
- 机器学习实践:如何将Spark与Python结合?
- Spark性能优化:开发调优篇
- 大数据处理引擎Spark与Flink大比拼
【责任编辑:未丽燕 TEL:(010)68476606】
点赞 0 (编辑:核心网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|