加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

对Spark的那些【魔改】

发布时间:2018-08-16 08:44:35 所属栏目:教程 来源:祝威廉
导读:技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战 前言 这两年做 streamingpro 时,不可避免的需要对Spark做大量的增强。就如同我之前吐槽的,Spark大量使用了new进行对象的创建,导致里面的实现基本没有办法进行替换。 比如SparkEn
副标题[/!--empirenews.page--] 技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战

前言

这两年做 streamingpro 时,不可避免的需要对Spark做大量的增强。就如同我之前吐槽的,Spark大量使用了new进行对象的创建,导致里面的实现基本没有办法进行替换。

对Spark的那些【魔改】

比如SparkEnv里有个属性叫closureSerializer,是专门做任务的序列化反序列化的,当然也负责对函数闭包的序列化反序列化。我们看看内部是怎么实现的:

  1. val serializer = instantiateClassFromConf[Serializer]( 
  2.       "spark.serializer", "org.apache.spark.serializer.JavaSerializer") 
  3.     logDebug(s"Using serializer: ${serializer.getClass}") 
  4.  
  5.     val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey) 
  6.  
  7.     val closureSerializer = new JavaSerializer(conf) 
  8.  
  9. val envInstance = new SparkEnv( 
  10. ..... 
  11.  closureSerializer, .... 

这里直接new了一个JavaSerializer,并不能做配置。如果不改源码,你没有任何办法可以替换掉掉这个实现。同理,如果我想替换掉Executor的实现,基本也是不可能的。

今年有两个大地方涉及到了对Spark的【魔改】,也就是不通过改源码,使用原有发型包,通过添加新代码的方式来对Spark进行增强。

二层RPC的支持

我们知道,在Spark里,我们只能通过Task才能touch到Executor。现有的API你是没办法直接操作到所有或者指定部分的Executor。比如,我希望所有Executor都加载一个资源文件,现在是没办法做到的。为了能够对Executor进行直接的操作,那就需要建立一个新的通讯层。那具体怎么做呢?

首先,在Driver端建立一个Backend,这个比较简单,

  1. class PSDriverBackend(sc: SparkContext) extends Logging { 
  2.  
  3.   val conf = sc.conf 
  4.   var psDriverRpcEndpointRef: RpcEndpointRef = null 
  5.  
  6.   def createRpcEnv = { 
  7.     val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER 
  8.     val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS) 
  9.     val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS) 
  10.     var port = sc.conf.getOption("spark.ps.driver.port").getOrElse("7777").toInt 
  11.     val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) { 
  12.       Some(CryptoStreamUtils.createKey(sc.conf)) 
  13.     } else { 
  14.       None 
  15.     } 
  16.     logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}") 
  17.     var createSucess = false 
  18.     var count = 0 
  19.     val env = new AtomicReference[RpcEnv]() 
  20.     while (!createSucess && count < 10) { 
  21.       try { 
  22.         env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf, 
  23.           sc.env.securityManager, clientMode = !isDriver)) 
  24.         createSucess = true 
  25.       } catch { 
  26.         case e: Exception => 
  27.           logInfo("fail to create rpcenv", e) 
  28.           count += 1 
  29.           port += 1 
  30.       } 
  31.     } 
  32.     if (env.get() == null) { 
  33.       logError(s"fail to create rpcenv finally with attemp ${count} ") 
  34.     } 
  35.     env.get() 
  36.   } 
  37.  
  38.   def start() = { 
  39.     val env = createRpcEnv 
  40.     val pSDriverBackend = new PSDriverEndpoint(sc, env) 
  41.     psDriverRpcEndpointRef = env.setupEndpoint("ps-driver-endpoint", pSDriverBackend) 
  42.   } 
  43.  

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读