加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

对Spark的那些【魔改】

发布时间:2018-08-16 08:44:35 所属栏目:教程 来源:祝威廉
导读:技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战 前言 这两年做 streamingpro 时,不可避免的需要对Spark做大量的增强。就如同我之前吐槽的,Spark大量使用了new进行对象的创建,导致里面的实现基本没有办法进行替换。 比如SparkEn

这样,你可以理解为在Driver端启动了一个PRC Server。要运行这段代码也非常简单,直接在主程序里运行即可:

  1. // parameter server should be enabled by default 
  2.     if (!params.containsKey("streaming.ps.enable") || params.get("streaming.ps.enable").toString.toBoolean) { 
  3.       logger.info("ps enabled...") 
  4.       if (ss.sparkContext.isLocal) { 
  5.         localSchedulerBackend = new LocalPSSchedulerBackend(ss.sparkContext) 
  6.         localSchedulerBackend.start() 
  7.       } else { 
  8.         logger.info("start PSDriverBackend") 
  9.         psDriverBackend = new PSDriverBackend(ss.sparkContext) 
  10.         psDriverBackend.start() 
  11.       } 
  12.     } 

这里我们需要实现local模式和cluster模式两种。

Driver启动了一个PRC Server,那么Executor端如何启动呢?Executor端似乎没有任何一个地方可以让我启动一个PRC Server? 其实有的,只是非常trick,我们知道Spark是允许自定义Metrics的,并且会调用用户实现的metric特定的方法,我们只要开发一个metric Sink,在里面启动RPC Server,骗过Spark即可。具体时下如下:

  1. class PSServiceSink(val property: Properties, val registry: MetricRegistry, 
  2.                     securityMgr: SecurityManager) extends Sink with Logging { 
  3.   def env = SparkEnv.get 
  4.  
  5.   var psDriverUrl: String = null 
  6.   var psExecutorId: String = null 
  7.   var hostname: String = null 
  8.   var cores: Int = 0 
  9.   var appId: String = null 
  10.   val psDriverPort = 7777 
  11.   var psDriverHost: String = null 
  12.   var workerUrl: Option[String] = None 
  13.   val userClassPath = new mutable.ListBuffer[URL]() 
  14.  
  15.   def parseArgs = { 
  16.     //val runtimeMxBean = ManagementFactory.getRuntimeMXBean(); 
  17.     //var argv = runtimeMxBean.getInputArguments.toList 
  18.     var argv = System.getProperty("sun.java.command").split("s+").toList 
  19.  
  20.    ..... 
  21.     psDriverHost = host 
  22.     psDriverUrl = "spark://ps-driver-endpoint@" + psDriverHost + ":" + psDriverPort 
  23.   } 
  24.  
  25.   parseArgs 
  26.  
  27.   def createRpcEnv = { 
  28.     val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER 
  29.     val bindAddress = hostname 
  30.     val advertiseAddress = "" 
  31.     val port = env.conf.getOption("spark.ps.executor.port").getOrElse("0").toInt 
  32.     val ioEncryptionKey = if (env.conf.get(IO_ENCRYPTION_ENABLED)) { 
  33.       Some(CryptoStreamUtils.createKey(env.conf)) 
  34.     } else { 
  35.       None 
  36.     } 
  37.     //logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}") 
  38.     RpcEnv.create("PSExecutorBackend", bindAddress, port, env.conf, 
  39.       env.securityManager, clientMode = !isDriver) 
  40.   } 
  41.  
  42.   override def start(): Unit = { 
  43.  
  44.     new Thread(new Runnable { 
  45.       override def run(): Unit = { 
  46.         logInfo(s"delay PSExecutorBackend 3s") 
  47.         Thread.sleep(3000) 
  48.         logInfo(s"start PSExecutor;env:${env}") 
  49.         if (env.executorId != SparkContext.DRIVER_IDENTIFIER) { 
  50.           val rpcEnv = createRpcEnv 
  51.           val pSExecutorBackend = new PSExecutorBackend(env, rpcEnv, psDriverUrl, psExecutorId, hostname, cores) 
  52.           PSExecutorBackend.executorBackend = Some(pSExecutorBackend) 
  53.           rpcEnv.setupEndpoint("ps-executor-endpoint", pSExecutorBackend) 
  54.         } 
  55.       } 
  56.     }).start() 
  57.  
  58.   } 
  59. ... 

到这里,我们就能成功启动RPC Server,并且连接上Driver中的PRC Server。现在,你就可以在不修改Spark 源码的情况下,尽情的写通讯相关的代码了,让你可以更好的控制Executor。

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读