cover_image

中通异构数据同步平台:ZDTP

vhicool 科技中通
2023年08月15日 08:00

图片







一、背景

随着业务体量逐步增大,跨系统数据抽取的应用场景越来越多,而对于业务开发人员,需要实现异构数据源数据的抽取和录入,除了自己写代码实现数据抽取、转换、写入,另一个选择是使用开源的数据同步工具来做。对于手动实现数据同步,一方面增大了数据投产周期(需要经历应用发布完整的流程),另一方面每个人代码实现差异,项目维护难度增大。如果每个项目引入了各版本、技术栈的第三方数据同步工具,又无法实现数据安全管控,并且多引入一个中间件,也增大项目的复杂度,降低应用可移植性。







二、介绍

ZDTP(ZTO data transmission platform)是一个异构数据同步平台,ZDTP屏蔽各异构数据源的差异,支持数据过滤、处理与加工,并且提供状态监控、异常告警的能力。用户在ZDTP提供的SQL编辑器,写数据同步逻辑,不用关心底层的数据结构。同时容器化部署,一方面实现同步作业资源的动态扩缩容,提升物理资源利用率,另一方面增加了系统可移植性和扩展性。

1. 需要解决的问题

  • 数据格式不兼容

    由于不同系统使用的数据格式可能不同,因此在进行数据同步时需要进行数据转换和映射

  • 数据完整性

    在数据同步过程中,需要确保所有的数据都能够被正确地复制到目标系统中,并且不会发生数据损坏或丢失等问题

  • 数据版本控制

    需要支持数据重推,同步历史数据

  • 同步性能

    不同的数据系统,数据量和同步实时性有较大的差异,需要支持按需调配同步资源

    相同的数据系统,数据数据量也会随着源端系统的数据量动态变化,同步需要支持自动伸缩

2. 平台介绍

一般数据同步是用户程序直接操作底层数据,首先将数据从源数据源查出,并对数据预处理,如日期格式转换、数据打码、生成主键等,然后将数据转换为目标数据源的数据格式,调用API写入数据。ZDTP首先将物理层数据结构映射为逻辑数据源,用户面向逻辑数据源,只需要关注数据处理逻辑,这样即使将数据同步到不同的数据源类型(比如从RocketMQ到Kafka,这样的操作对于项目前期预研阶段是比较常见的),只要修改目标逻辑数据源完成数据流切换。

图片

数据同步平台能力介绍

  • 零代码

    用户使用SQL只需要描述想要什么,而不需要关注内部是如何运行,减少落地周期

  • 低数据延迟

    基于流抽取增量数据,增量数据实时性高

  • 任务动态扩缩容

    将同步任务容器化,一方面实现资源动态扩缩容,提高资源利用率,另一方面增强系统兼容性、可扩展性

  • 数据监控、告警

    对任务异常心跳、同步延迟、大事务等进行告警。数据同步进度、延迟一目了然,异常及时感知

3. 案例

下面的例子是将mysql中的user_table数据同步到RocketMQ中的主题:user_topic。其中同步作业分为两个部分:数据模型和作业SQL

图片

同步作业修改

图片







三、技术架构

在ZDTP项目成立之初,首先考虑到需要面向用户简单易用,并且具备高的同步性能和高稳定性。如何能让用户以较低的成本接入并且又有较大灵活性呢?大家在日常中想必都做过数据库(如mysql)将一张表数据迁移到另一张表的同库数据迁移,只需要写一个简单的SQL如 :INSET INTO user_bak select * from user;,只是这种数据同步是相同数据结构的,那么是不是同样使用 SQL来实现异构数据源的数据同步?

1. 技术选型-为什么选择flink

Flink作为开源的流处理和批处理框架,旨在解决大规模数据处理中的实时计算和数据流处理问题。它提供了高效、可靠和容错的流处理引擎,可用于处理实时数据流和离线批处理作业。同时支持UDF和可扩展connector,并且封装了多种API,如Stream API、Table API,有很高的灵活性和可定制性,Flink具有如下特性:

  • 低延迟

    支持事件驱动的处理,可以在毫秒级别内对数据进行处理,并且支持状态管理,以便更好地处理有状态的应用程序

  • 高吞吐

    Flink 可以同时处理批量和流式数据,并支持复杂的数据管道,可以轻松处理数据流和数据集。

  • 灵活性

    具有非常灵活的 API,可以实现多种类型的数据处理任务。它支持 Java 和 Scala 两种编程语言,还提供了 SQL 和图形处理等开发方式

  • 健壮性

    提供了可靠的容错机制,在出现故障时能够自动恢复并继续运行,从而保证数据处理的正确性和可靠性

  • 扩展性

    通过分布式计算模型实现了横向扩展,可以添加更多的节点来增加计算资源以满足需求。此外,Flink 还支持在云平台上部署,可以根据需要快速扩展计算资源

2. 架构

为了满足高可扩展性,我们将Flink集群进行容器化部署。每个同步作业都是一个小的Flink集群,这样就实现的各同步任务资源隔离,同时具备可扩展性。如果使用本地化集群部署,需要提前部署TaskManager,并预留足够多的Slot(Slot是作业执行所需的资源的最小单元),这样在就比较难做到资源快速扩容和释放。而基于容器化的集群,在Manager层,会基于每个作业需要的配置,预先计算出作业集群的规模,在启动集群时,拉取JobManager(Master进程负责集群调度)和TaskManager(工作进程负责数据处理与流转),每个TaskManager指定了Slot数据,确保所有的Slot能够分配到任务。并且在同步完成或者任务暂停,集群资源主动释放。

ZDTP架构图

图片

3. 如何解决异构数据同步痛点

3.1 数据格式转换

Flink主要有Source(数据抽取)、数据计算(Transformation)和Sink(数据写入)三个部分

  • Source:数据输入源,读取外部数据并将其转换为内部数据结构

  • Transformation:表示对数据进行计算和转换的操作,包括Map、Reduce、Filter、 KeyBy 等操作

  • Sink:数据输出目标,将数据写入外部存储

图片

首先Source根据源端的数据类型,来匹配对应的Source connector,如JDBC connector、Kafka connector等,每个connector会实现数据数据解析逻辑,JDBC通过SQL从数据库中查询数据源、Kafka使用Pull从集群拉取数据,然后将数据类型转换为Flink Data Type。Transformation根据SQL中的处理函数和数据映射关系,将数据加工、转换,并且和目标逻辑数据源字段、数据类型一一对应,并把处理好的数据传递到Sink。Sink接收到数据后,根据数据制定的序列化格式(如json、csv),调用目标数据结构的本地化API product.send(...)将序列化数据持久化

图片

3.2 数据完整性

在数据同步中,最棘手的莫过于数据不一致。在进行数据过程中,由于作业调整和数据源因素,会出现同步异常的情况。发生同步异常时,首先需要记录发生故障前一刻的现场,并且在任务恢复后,从上次中断的位置继续同步数据。ZDTP利用Flink 检查点和At-least-once语义来保证数据的完整性。实现原理是同步算子定时上报各自当前的同步状态(binlog保存GTID,kafka保存partition offset),ZDTP将这些状态保存到外部存储中,在下次任务启动时,从外部存储拉取任务状态,各算子从制定状态继续同步

  • 检查点(Checkpoint)

    定期将应用程序的状态保存到外部存储系统中,以便在故障发生时能够恢复其状态。当发生故障时,将使用最近的 Checkpoint 来恢复应用程序的状态,并从上一个 Checkpoint 处继续执行。这样可以避免数据丢失,同时也确保了数据的一致性

  • At-least-once语义

    在处理消息之前将所有传入的消息保存到耐久性存储系统中,这样,当出现故障并需要从最后一个已知的检查点重新启动处理时,可以进行恢复,以确保不会丢失任何消息

Checkpoint的执行流程如下:

  1. JobManager Checkpoint Coordinator向所有 TaskManager 的Source 算子发送barrier

  2. 当算子接收barrier到请求后,停止接收新的数据,并将所有未处理完的数据写入缓冲区。将自己的状态快照写入到持久化存储中,并将barrier传递到输出流,恢复数据处理

  3. 当算子状态快照持久化后,存储的地址通知到Coordinator

  4. 当Coordinator收到所有算子完成快照的消息后,认为此次快照制作完成,并将状态快照地址写入到持久化存储。如果没有在规定的时间收到所有算子的报告,认为此次快照制作失败


图片

3.3 数据版本控制

数据是具有时效性的,同一个数据在不同的时间的状态会发生变化,如汇率在不同的时间可能出现上下波动,这就出现了不同时间出现不同的数据版本。数据同步过程中,一般有两种:全量数据同步(最新数据快照)和增量数据同步(基于时间历史版本数据)。根据同步类型可以将数据表划分为普通表和版本表

  • 普通表

    表中的数据仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库(mysql、oracle、postgreSQL)、redis的表可以定义成普通表。通常进行实时数据查询,进行全量数据抽取

  • 版本表

    表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog (如mysql binlog)可以定义成版本表,版本表内的数据始终不会自动清理,只能通过upsert触发。通常用于追踪数据最新状态,进行增量数据抽取

图片

普通表、版本表转换

对于普通表类型的数据源,要实现增量同步追踪数据实时变化,需要将普通表转化为版本表。如将汇率表的更新时间作为事件时间,加个货币作为主键约束

图片

普通表如何转化为版本表

  1. 在源端将数据的更新数据作为事件时间

  2. 设置数据时间步长,如30m,每执行一个时间窗口,抽取数据的范围:[开始时间,开始时间+时间步长)

  3. 抽取的数据设置主键,和事件时间一同传递到下游算子,生成upsert事件

图片


ZDTP将普通表转换为版本表

对于有些数据源不具备版本表的特性而有需要增量同步数据,需要如何处理呢,如Oracle在通过Logminer无法满足抽取性能的情况下,如何实现增量同步。从上面普通表和版本表的介绍和转换可以看到,将普通表转换为版本表,只需普通表上指定主键和事件时间。对于Oracle来说,我们通过SQL查询数据,实际上是在操作Oracle 普通表,想要转换为版本表的话,我们需要在Oracle的table中指定一个主键和时间,一般表中都会指定一个主键(如ID),并且有更新时间(如update_time)。这样就可以在SQL抽取Oracle数据(SELECT ID,NAME,STATUS,UPDATE_TIME FROM ORDER WHERE UPDATE_TIME >=$start_time AND UPDATE_TIME < $start_time + $step),其中$start_time是每个时间窗口的开始时间,$step是每个窗口的长度,窗口的结束时间为:$start_time + $step,这样随着下一个窗口的开始时间为上一个窗口的结束时间,这样随着窗口的移动直到窗口的结束时间大于等于当前数据处理时间(程序当前时间)则暂停移动。将处于窗口的数据传递到下游算子,同时记录此时的窗口结束时间作为同步进度,用于作业异常恢复时,从断点继续抽取数据。最终每个窗口数据具备两个基本的元素:主键和事件时间,窗口移动将批次数据转换为数据流持续不断增量抽取数据

图片

基于版本表的增量数据同步,存在以下问题:

  • 数据冲突

    数据主键唯一约束upsert,数据在目标库不存在新增,存在则更新

  • 数据恢复

    1.基于checkpoint 周期性执行快照,并保存最新一次成功的状态快照,并存储在数据库

    2.在任务重启、异常恢复时,根据作业id,加载任务状态快照,设置数据开启位置

  • 数据回溯

    1.回溯时间、位点(kafka、rocketmq、mysql binlog)存储在数据库

    2.任务启动根据作业id,加载作业状态,设置数据开启位置

3.4 同步性能

ZDTP是基于容器化部署,提升了同步作业的水平扩展能力。主要是通过下面两个手段:

  1. 一是根据同步作业的并发要求,计算出集群的规模
  2. 二是在作业运行中时,根据实际流量需要的处理能力,实现物理资源(CPU、内存)动态伸缩

水平扩展

每个任务被划分为多个子任务,每个子任务被分配到一个 TaskManager 上执行。当需要扩展 Flink 任务时,可以增加 TaskManager 节点,以提供更多的计算资源。Flink 的 ResourceManager 将会自动检测新的节点,并将其加入到集群中。

物理资源动态伸缩

基于K8s实现TaskManager中的资源动态伸缩,在TaskManager申请时,会设置此TaskManager最少资源和资源的上线,在数据高、低峰期资源自动伸缩

ZDTP但作业集群提前计算出需要的资源和集群规模,需要多少申请多少,提高资源利用

图片

Flink通过Slot来确定集群的处理能力,每一个Slot提供一个并发度。一个同步作业需要的总Slot大小为,单个算子(Source、Transformation、Sink都是算子的一种)的最大并发度。如Source的并发度为6,Transformation的是6,Sink的是12,此时的最大并发度是12,这个作业需要12个Slot

图片


总结


ZDTP提供了异构数据同步的能力,现在使用的场景包括:Mysql分库分表、ES、消息队列、Clickhouse等数据源的单逻辑表数据同步。未来如果接入RPC框架,消费上游数据经过加工调用RPC实现上下游业务数据传递(如用户注册);接入告警,对上游(kafka、数据库等)实时数据分析发送告警;支持多表JOIN,支持数据跨数据源关联等。新增这些新的应用场景,将数据流转配置化,提升了数据的流动性。一些标准化的业务可以通过配置数据流转来实现,降低维护工作的同时,提升数据管控能力.

图片

RECOMMEND

往期干货

•中通IM测试实践

浅谈zto大数据计算与业务系统的融合

•中通数据架构治理

•ZCAT在前端APM领域的探索与应用

欢迎各位技术大佬向本公众号积极投稿,提升经验分享、信息互通的技术交流氛围,共同解决技术难题、共同进步!

图片

继续滑动看下一个
科技中通
向上滑动看下一个