加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

DataPipeline在大数据平台的数据流实践

发布时间:2018-11-24 08:31:49 所属栏目:大数据 来源:dockone
导读:进入大数据时代,实时作业有着越来越重要的地位。本文将从以下几个部分进行讲解DataPipeline在大数据平台的实时数据流实践。 一、企业级数据面临的主要问题和挑战 1.数据量不断攀升 随着互联网+的蓬勃发展和用户规模的急剧扩张,企业数据量也在飞速增长,
副标题[/!--empirenews.page--]

VbU77fz

进入大数据时代,实时作业有着越来越重要的地位。本文将从以下几个部分进行讲解DataPipeline在大数据平台的实时数据流实践。

一、企业级数据面临的主要问题和挑战

1.数据量不断攀升

随着互联网+的蓬勃发展和用户规模的急剧扩张,企业数据量也在飞速增长,数据的量以GB为单位,逐渐的开始以TB/GB/PB/EB,甚至ZB/YB等。同时大数据也在不断深入到金融、零售、制造等行业,发挥着越来越大的作用。

2. 数据质量的要求不断地提升

当前比较流行的AI、数据建模,对数据质量要求高。尤其在金融领域,对于数据质量的要求是非常高的。

3. 数据平台架构的复杂化

企业级应用架构的变化随着企业规模而变。规模小的企业,用户少、数据量也小,可能只需一个MySQL就搞能搞;中型企业,随着业务量的上升,这时候可能需要让主库做OLTP,备库做OLAP;当企业进入规模化,数据量非常大,原有的OLTP可能已经不能满足了,这时候我们会做一些策略,来保证OLTP和OLAP隔离,业务系统和BI系统分开互不影响,但做了隔离后同时带来了一个新的困难,数据流的实时同步的需求,这时企业就需要一个可扩展、可靠的流式传输工具。

BjyABvE

二、大数据平台上的实践案例

下图是一个典型的BI平台设计场景,以MySQL为例,DataPipeline是如何实现MySQL的SourceConnector。MySQL作为Source端时:

全量+ 增量; 全量:通过select 方式,将数据加载到kafka中; 增量:实时读取 binlog的方式;

使用binlog时需要注意开启row 模式并且image设置为 full。

1. MySQL SourceConnector 全量+增量实时同步的实现

下面是具体的实现流程图,首先开启repeatable read事务,保证在执行读锁之前的数据可以确实的读到。然后进行flush table with read lock 操作,添加一个读锁,防止这个时候有新的数据进入影响数据的读取,这时开始一个truncation with snapshot,我们可以记录当前binlog的offset 并标记一个snapshot start,这时的offset 为增量读取时开始的offset。当事务开始后可以进行全量数据的读取。record marker这时会将生成record 写到 kafka 中,然后commit 这个事务。当全量数据push完毕后我们解除读锁并且标记snapshot stop,此时全量数据已经都进入kafka了,之后从之前记录的offset开始增量数据的同步。

EjU7fqv

2. DataPipeline做了哪些优化工作

1)以往在数据同步环节都分为全量同步和增量同步,全量同步为一个批处理。在批处理时我们都是进行all or nothing的处理,但当大数据情况下一个批量会占用相当长的时间,时间越长可靠性就越难保障,所以往往会出现断掉的情况,这时一个重新处理会让很多人崩溃。DataPipeline 解决了这一痛点,通过管理数据传输时的position 来做到断点续传,这时当一个大规模的数据任务即使发生了意外,也可以重断掉的点来继续之前的任务,大大缩短了同步的时间,提高了同步的效率。

2)在同步多个任务的时候,很难平衡数据传输对源端的压力和目的端的实时性,在大数据量下的传输尤其能够体现,这时DataPipeline 在此做了大量相关测试来优化不同的连接池,开放数据传输效率的自定义化,供客户针对自己的业务系统定制合适的传输任务,对于不同种类的数据库的传输进行优化和调整,保证数据传输的高效性。

3)自定义异构数据类型的转化,往往开源类大数据传输工具如 sqoop 等,对异构数据类型的支持不够灵活,种类也不够齐全。像金融领域中对数据精度要求较高的场景,在传统数据库向大数据平台传输时造成的精度丢失是很大的一个问题。DataPipeline 对此做了更多数据类型的支持,比如hive 支持的复杂类型以及 decimal 和 timestamp 等。

3. Sink端之Hive

1)Hive的特性

Hive 内部表和外部表; 依赖HDFS; 支持事务和非事务; 多种压缩格式; 分区分桶。

2)Hive同步的问题

如何保证实时的写入? schema change了怎么办? 怎么扩展我想保存的格式? 怎么实现多种分区方式? 同步中断了怎么办? 如何保证我的数据不丢?

3)KafkaConnect HDFS 的 Hive 同步实践

使用外表:Hive外部表,能够提高写入效率,直接写HDFS,减少IO消耗,而内表会比外表多一次IO; Schema change:目前的做法是目的端根据源端的变化而变化,当有增加列删除列的情况,目标端会跟随源端改动; 目前支持的存储格式:parquet,avro ,csv 插件化的partitioner,提供多种分区方式,如 Wallclock RecordRecordField:wallclock是使用写入到hive端时的系统时间,Record使用是读取时生成record的时间,RecordField是使用用户自定义的时间戳来定义分区,未来会实现可自定义化的partitioner来满足不同的需求; Recover 机制保障中断后不会丢失数据; 使用WAL (Write-AheadLogging)机制,保证数据目的端数据一致性。

4)Recover的机制

recover 是一种恢复的机制,在数据传输的阶段往往可能出现各种不同的问题,如网络问题等等。当出现问题后我们需要恢复数据同步,那么recover是怎么保证数据正常传输不丢失呢?当recover开始的时候,获取目标文件在hdfs 上的租约,如果这时候需要读写的HDFS当前文件是被占用的,那我们需要等待它直到可以获取到租约。当我们获取到租约后就可以开始读之前写入时候的log,如果第一次会创建一个新的log,并标记一个begin,然后记录了当时的kafka offset。这时候需要清理之前遗留下来的临时数据,清理掉之后再重新开始同步直到同步结束会标记一个end。如果没有结束的话就相当于正在进行中,正在进行中每次都会提交当前同步的offset,来保证出现意外后会回滚到之前offset。

Qb6R3uB

5)WAL (Write-Ahead Logging)机制

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读