加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 建站 > 正文

干货分享:利用Java多线程技术导入数据到Elasticsearch

发布时间:2019-07-16 16:58:18 所属栏目:建站 来源:Wooola
导读:前言 近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低。所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool

监控线程-Monitor为了计算每分钟导入Elasticsearch的数据总条数,利用监控线程,可以调整线程池的线程数的大小,以便利用多线程更快速的导入数据。

  1. public void monitorToES() { 
  2.  new Thread(() -> { 
  3.  while (true) { 
  4.  StringBuilder sb = new StringBuilder(); 
  5.  sb.append("已办表数::").append(Const.TBL.TBL_PEND_COUNT) 
  6.  .append("::已办总数::").append(Const.COUNTER.LD_P_TOTAL) 
  7.  .append("::已办入库总数::").append(Const.COUNTER.LD_P); 
  8.  sb.append("~~~~已阅表数::").append(Const.TBL.TBL_READ_COUNT); 
  9.  sb.append("::已阅总数::").append(Const.COUNTER.LD_R_TOTAL) 
  10.  .append("::已阅入库总数::").append(Const.COUNTER.LD_R); 
  11.  if (ldPrevPendCount == 0 && ldPrevReadCount == 0) { 
  12.  ldPrevPendCount = Const.COUNTER.LD_P.get(); 
  13.  ldPrevReadCount = Const.COUNTER.LD_R.get(); 
  14.  start = System.currentTimeMillis(); 
  15.  } else { 
  16.  long end = System.currentTimeMillis(); 
  17.  if ((end - start) / 1000 >= 60) { 
  18.  start = end; 
  19.  sb.append("n#########################################n"); 
  20.  sb.append("已办每分钟TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "条"); 
  21.  sb.append("::已阅每分钟TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "条"); 
  22.  ldPrevPendCount = Const.COUNTER.LD_P.get(); 
  23.  ldPrevReadCount = Const.COUNTER.LD_R.get(); 
  24.  } 
  25.  } 
  26.  System.out.println(sb.toString()); 
  27.  try { 
  28.  Thread.sleep(3000); 
  29.  } catch (InterruptedException e) { 
  30.  e.printStackTrace(); 
  31.  } 
  32.  } 
  33.  }).start(); 

初始化Elasticsearch:EsClient

  1. String cName = meta.get("cName");//es集群名字 
  2. String esNodes = meta.get("esNodes");//es集群ip节点 
  3. Settings esSetting = Settings.builder() 
  4.  .put("cluster.name", cName) 
  5.  .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群 
  6.  .put("thread_pool.search.size", 5)//增加线程池个数,暂时设为5 
  7.  .build(); 
  8. String[] nodes = esNodes.split(","); 
  9. client = new PreBuiltTransportClient(esSetting); 
  10. for (String node : nodes) { 
  11.  if (node.length() > 0) { 
  12.  String[] hostPort = node.split(":"); 
  13.  client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 
  14.  } 

初始化数据库连接

  1. conn = DriverManager.getConnection(url, user, password); 

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读