加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程 > 正文

MongoDB Stream是如何实现完美数据增量迁移的?

发布时间:2018-08-18 04:55:11 所属栏目:编程 来源:zale
导读:技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战 一、背景介绍 最近微服务架构火得不行,但本质上也只是风口上的一个热点词汇。 作为笔者的经验来说,想要应用一个新的架构需要带来的变革成本是非常高的。 尽管如此,目前还是有许

每一个变更任务会不断对topic产生写操作,触发一系列ChangeEvent产生:

  • doInsert:生成随机频道的topic后,执行insert;
  • doUpdate:随机取得一个topic,将其channel字段改为随机值,执行update;
  • doReplace:随机取得一个topic,将其channel字段改为随机值,执行replace;
  • doDelete:随机取得一个topic,执行delete。

以doUpdate为例,实现代码如下:

MongoDB Stream是如何实现完美数据增量迁移的?

启动一个全量迁移任务,将topic表中数据迁移到topic_new新表:

MongoDB Stream是如何实现完美数据增量迁移的?

在全量迁移开始前,先获得当前时刻的的最大 _id 值(可以将此值记录下来)作为终点,随后逐个完成迁移转换。

在全量迁移完成后,便开始最后一步:增量迁移。

注:增量迁移过程中,变更操作仍然在进行。

  1. final MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);  
  2. final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);  
  3. ObjectId currentId = null;  
  4. Document sort = new Document("_id", 1);  
  5. MongoCursor<Document> cursor = null;  
  6. // 批量大小  
  7. int batchSize = 100;AtomicInteger count = new AtomicInteger(0);  
  8. try {  
  9.     while (true) {  
  10.         boolean isWatchTaskStillRunning = watchFlag.getCount() > 0;  
  11.         // 按ID增量分段拉取  
  12.         if (currentId == null) {  
  13.             cursor = topicIncrCollection.find().sort(sort).limit(batchSize).iterator();  
  14.         } else {  
  15.             cursor = topicIncrCollection.find(new Document("_id", new Document("$gt", currentId)))  
  16.                     .sort(sort).limit(batchSize).iterator();  
  17.         }  
  18.         boolean hasIncrRecord = false;  
  19.         while (cursor.hasNext()) {  
  20.             hasIncrRecord = true;  
  21.             Document incrDoc = cursor.next();  
  22.             OperationType opType = OperationType.fromString(incrDoc.getString(field_op));  
  23.             ObjectId docId = incrDoc.getObjectId(field_key);  
  24.             // 记录当前ID  
  25.             currentId = incrDoc.getObjectId("_id"); 
  26.             if (opType == OperationType.DELETE) {  
  27.                 topicNewCollection.deleteOne(new Document("_id", docId));  
  28.             } else {  
  29.                 Document doc = incrDoc.get(field_data, Document.class);  
  30.                 // channel转换  
  31.                 String oldChannel = doc.getString(field_channel);  
  32.                 doc.put(field_channel, Channel.toNewName(oldChannel));  
  33.                 // 启用upsert  
  34.                 UpdateOptions options = new UpdateOptions().upsert(true);  
  35.                 topicNewCollection.replaceOne(new Document("_id", docId),  
  36.                         incrDoc.get(field_data, Document.class), options);  
  37.             }  
  38.             if (count.incrementAndGet() % 10 == 0) {  
  39.                 logger.info("IncrTransferTask progress, count: {}", count.get());  
  40.             }  
  41.         }  
  42.         // 当watch停止工作(没有更多变更),同时也没有需要处理的记录时,跳出  
  43.         if (!isWatchTaskStillRunning && !hasIncrRecord) {  
  44.             break;  
  45.         }  
  46.         sleep(200);  
  47.     } 
  48.  } catch (Exception e) {  
  49.     logger.error("IncrTransferTask ERROR", e);  

增量迁移的实现是一个不断tail的过程,利用 **_id 字段的有序特性 ** 进行分段迁移;即记录下当前处理的_id值,循环拉取在该_id值之后的记录进行处理。

增量表(topic_incr)中除了DELETE变更之外,其余的类型都保留了整个文档,因此可直接利用replace + upsert追加到新表。

最后,运行整个程序。

MongoDB Stream是如何实现完美数据增量迁移的?

查看topic表和topic_new表,发现两者数量是相同的。为了进一步确认一致性,我们对两个表的分别做一次聚合统计:

topic表

MongoDB Stream是如何实现完美数据增量迁移的?

topic_new表

MongoDB Stream是如何实现完美数据增量迁移的?

前者输出结果:

MongoDB Stream是如何实现完美数据增量迁移的?

后者输出结果:

MongoDB Stream是如何实现完美数据增量迁移的?

前后对比的结果是一致的。

五、后续优化

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读