本期内容:
什么是DTS
binlog介绍
数据库主从同步机制
当下主流DTS工具和对比
Maxwell的工作原理
我们封装的dts
数据传输服务(Data Transmission Service,简称DTS)帮助我们在关系型数据库、NoSQL数据库、数据仓库等数据源之间迁移数据。
DTS支持多种数据传输方式,包括数据迁移、数据同步及数据订阅。我们可以根据使用场景选择最适合的数据传输方式。
介绍DTS之前,我们先简单聊一聊binlog和数据库主从同步机制,以便更好的理解DTS的设计思想和工作原理。
MySQL的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有1%的性能损耗。binlog有两个最重要的使用场景:
其一:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。
其二:数据恢复,通过使用mysqlbinlog工具来恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。
Mysql binlog的格式有三种,分别是STATEMENT、ROW、MIXED
在配置文件中可以选择配置 binlog_format= statement | mixed | row
格式 | 级别 | 记录时机 | 优点 | 缺点 |
---|---|---|---|---|
statement | 语句级 | binlog会记录每次一执行写操作的语句 | 节省空间 | 有可能造成数据不一致① |
row | 行级 | binlog会记录每次操作后每行记录的变化 | 保持数据的绝对一致性② | 占用较大空间 |
mixed | 看情况 | 看情况③ | 节省空间,同时兼顾了一定的一致性 | 监控情况不方便 |
①由于执行时间不同可能产生的数据就不同(比如时间函数)
②因为不管sql是什么,引用了什么函数,他只记录执行后的效果
③默认还是statement,在某些情况下譬如:当函数中包含 UUID() 时、包含 AUTO_INCREMENT 字段的表被更新时、执行 INSERT DELAYED 语句时、用 UDF 时;会按照 ROW的方式进行处理
row模式下的binlog日志案例:
show master logs; -- binlog文件
show master status; -- 当前正写入的binlog文件
show binlog events IN 'mysql-bin.000331' FROM 123 LIMIT 3, 5; -- binlog详情
EVENT_TYPE | 解释 |
---|---|
QUERY_EVENT | 存储的是SQL,主要是一些与数据无关的操作 |
TABLE_MAP_EVENT | 记录了下一条事件所对应的表信息,在其中存储了数据库名和表名 |
WRITE_ROWS_EVENT | 操作类型为insert |
UPDATE_ROWS_EVENT | 操作类型为update |
DELETE_ROWS_EVENT | 操作类型为delete |
XID_EVENT | 用于标识事务提交 |
mysql的主库(master)对表的变更(除查之外)都会记录到Binlog文件中,那么从库( slave ) 的IO Thread异步地同步binlog文件并写入到本地的Replay文件,SQL Thread( slave ) 再抽取Replay文件中的SQL语句在从库进行执行,实现数据更新达到主从同步的目的。
binlog是Mysql主要的主从同步方式,是一种基于stream的异步复制(并不能做到实时同步,有一定延迟)
推荐使用Row这种格式,可以很方便的反应行级别的数据变化。
OK,介绍完binlog和数据库主从同步概念,我们就可以来聊一聊DTS了!
Canal:Canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Canal分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大。Cannal官网>>
Maxwell:Maxwell由zendesk开源,由java开发。是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。Maxwell网站>>
特色 | Cannal | Maxwell |
---|---|---|
开源方 | 阿里巴巴 | zendesk |
语言 | Java | Java |
活跃度 | 活跃 | 活跃 |
HA | 支持 | 定制 |
数据落地 | 定制 | Kafka等 |
分区 | 支持 | 不支持 |
bootstrap | 不支持 | 支持 |
数据格式 | 格式自由 | Json(固定) |
文档 | 详细 | 详细 |
随机读 | 支持 | 支持 |
我们目前的DTS服务是在Maxwell的基础上做的二次开发,那我们先来研究下Maxwell的工作原理。
Maxwell的工作原理主要逻辑就是伪装成mysql的slave,然后利用mysql的主从复制机制实现binlog的消费。
入口:com.zendesk.maxwell.Maxwell#start
首先校验数据源是否满足Maxwell规范,同时会检查Maxwell自身记录数据的库表是否符合规范。
// 1. 确保binlog打开、确保binlog为 row格式、binlog_row_image为MINIMAL报警
MaxwellMysqlStatus.ensureReplicationMysqlState(connection);
// 2. 仅读模式OFF
MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection);
// 3. gtid 全局唯一事务id
if (config.gtidMode) {
MaxwellMysqlStatus.ensureGtidMysqlState(connection);
}
//校验方法中摘取
MaxwellMysqlStatus m = new MaxwellMysqlStatus(c);
m.ensureVariableState("log_bin", "ON");
m.ensureVariableState("binlog_format", "ROW");
m.ensureVariableState("read_only", "OFF");
//if gtid
m.ensureVariableState("gtid_mode", "ON");
m.ensureVariableState("log_slave_updates", "ON");
m.ensureVariableState("enforce_gtid_consistency", "ON");
Mawell需要自己存储数据的库和表,用于记录任务的配置信息和进度、心跳等等
// 4. 确保 maxwell相关元数据表已经创建完成- 如果未创建则通过sql文件初始化创建
SchemaStoreSchema.ensureMaxwellSchema(rawConnection, this.config.databaseName);
// 5. 确保 相关表中有关键字段 没有则创建
try (Connection schemaConnection = this.context.getMaxwellConnection()) {
SchemaStoreSchema.upgradeSchemaStoreSchema(schemaConnection);
}
//如果表不存在则创建表
private static void createStoreDatabase(Connection connection, String schemaDatabaseName) throws SQLException, IOException {
LOGGER.info("Creating " + schemaDatabaseName + " database");
executeSQLInputStream(connection, SchemaStoreSchema.class.getResourceAsStream("/sql/maxwell_schema.sql"), schemaDatabaseName);
executeSQLInputStream(connection, SchemaStoreSchema.class.getResourceAsStream("/sql/maxwell_schema_bootstrap.sql"), schemaDatabaseName);
executeSQLInputStream(connection, SchemaStoreSchema.class.getResourceAsStream("/sql/maxwell_schema_heartbeats.sql"), schemaDatabaseName);
}
//如果缺少字段则创建字段 for example
if ( !getTableColumns("schemas", c).containsKey("deleted") ) {
performAlter(c, "alter table `schemas` add column deleted tinyint(1) not null default 0");
}
一般我们使用kafka消息,但Maxwell也支持其他类型的输出包括rabbitmq、redis等等
// 6. 获取 producer 会通过我们传递的参数来选择 --producer=kafka
AbstractProducer producer = this.context.getProducer();
获取Maxwell中对任务position的记录,当然,如果失败的话依次从主机交换中恢复、之前记录的位置、使用master的pos。
// 7. 获取初始化位置
Position initPosition = getInitialPosition();
logBanner(producer, initPosition);
this.context.setPosition(initPosition);
这里我们通过构建binaryLogClient 去拉取mysql log相关信息 使用的是开源的组件。
当任务启动后,BinlogConnectorEventListener.onEvent() 写向queue入数据。
// 8. 创建binlog连接复制器
this.replicator = new BinlogConnectorReplicator(
//...
);
// 9. 主服务启动 偏移量服务启动
this.context.start();
开始拉取binlog。
// 10. binlog服务启动 调用了BinaryLogClient.connect 那重要的一步>读binlog
replicator.startReplicator();
this.onReplicatorStart();
public void onEvent(Event event) {
//...
while (mustStop.get() != true) {
try {
// 向队列插入数据
if ( queue.offer(ep, 100, TimeUnit.MILLISECONDS ) ) {
break;
}
} catch (InterruptedException e) {
return;
}
}
//...
}
拉取binlog并解析,最终组装成RowMap的形式,并通过product发送。
// 11. 复制服务真正的启动
try {
replicator.runLoop();
} catch (ColumnDefCastException e) {
logColumnCastError(e);
}
我们总结一下Maxwell的执行逻辑
Maxwell生产的kafka消息数据结构:
任务列表
创建任务
获取执行器
try {
//0. 等待选主
try {
registry.waitToBeLeader(taskId, "migration");
} catch (EOFException e) {
return;
}
读取任务配置
任务配置包括数据库主机、库名、账号、密码、数据源类型、目标类型等,最终这些信息会被组装交给Maxwell去启动
//1. 读取taskConfig
Optional<MigrationTask> taskConfigOpt = taskService.get(taskId);
if (!taskConfigOpt.isPresent()) {
return;
}
taskConfig = taskConfigOpt.get();
MigrationTaskStatus taskStatus = MigrationTaskStatus.getByCode(taskConfig.getStatus());
if (taskStatus == MigrationTaskStatus.ended) {//已结束,直接退出
return;
}
//2. 获取要迁移的表和表名列名映射
List<String> tables = taskConfig.getSourceTables().isEmpty() ? Collections.emptyList() :
Arrays.asList(taskConfig.getSourceTables().split(","));
String mapping = taskConfig.getMapping();
Map<String, TableMapping> mappings = getMapping(mapping);
全量数据支持
如果需要全量数据,需要先扫表,完成之后再处理增量数据。当然我们会及时记录当前迁移位置。
//如果从暂停中恢复,读取上次迁移的位置
List<Object> pk = getLastPK(physicalSchema, physicalTable);
启动Maxwell
public void start() {
//准备Kafka-product
createKafkaTopicsIfNotExist(config.getKafkaProperties(), config.kafkaTopic, taskConfig.getSourceSchema(),
taskConfig.getSourceTables().split(","));
new Thread(() -> {
try {
maxwell.start();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}).start();
}
消费Maxwell生产的Kafka消息
如果是数据订阅的话,需要订阅方自己处理消费逻辑
//5. 开始监听kafka
this.incrementalMigrationTask = new IncrementalMigrationTask(taskConfig, mappings, taskService, maxwellYamlConfig);
incrementalMigrationTask.start();
private void push(ConsumerRecord<DTSEventKey, DTSEvent> record) {
DTSEvent value = record.value();
log.debug("row: {}", value);
String table = value.getTable();
if (task.getSourceType() != SourceType.mysql.getCode()) {
table = ShardingUtils.getLogicTable(task.getSourceSchema(), table);
}
for ( ;; ) {
try {
writeToMySQL(value, table, new ArrayList<>(record.key().getPk().keySet()));
break;
} catch (Exception exception) {
log.error("IncrementalMigrationTask.push param:record={}", record, exception);
AlertUtils.yuyinAlert(exception,task.getId(),task.getName(), "增量执行失败!!!!");
TimeUnit.SECONDS.sleep(1);
}
}
delayTimer.update(System.currentTimeMillis() - value.getTs() * 1000, TimeUnit.MILLISECONDS);
tpsMeter.mark();
}
文末撒花🎉🎉🎉
Copyright by 玩物得志研发部