不知不觉【数据迁移专题】已经进行了两期,在先前《跨越异构鸿沟,Redis 迁移同步过程中的挑战与解决方案》和《在线数据迁移,数字化时代的必修课》中,我们为大家介绍了数据迁移挑战与技术选型,并详细分享京东云自研开源的RedisSyncer 项目。本篇是系列内容第三篇,我们来聊一聊如何用RedisSyncer实现数据双向同步。
RedisSyncer是京东云自研的redis多任务同步中间件工具集,应用于redis单实例及集群同步。该工具集包括:
目前在github开源:
https://github.com/TraceNature/redissyncer-server
利用数据冲销的方式破除数据写入环。该方案的必要条件是,同步的实例或集群写入的key无冲突,即在数据中心A写入的key,不会同一时间在B中心写入相异值。
假设我们有两个redis实例redis1和redis2,再分别定义两个冲销池set1和set2,记录key及同步次数。
为什么增加第五步改变redis1->redis2增量任务行为呢?因为在第四步完成时set2中并没有从redis1->redis2的增量数据,这会造成从redis1->redis2的增量数据会转换成redis2->redis1增量数据且在本地无法被冲销,只有数据进入set1且被写入redis1后再次作为增量数据向redis2同步时才会被冲销,增加了网络开销同时redis1也增加了一次写入负载。
数据冲销方式及其缺陷
业务双写是最符合人类直觉的双向方案,同一份数据写入两个数据中心以保障数据冗余。但是在实际操作中会遇到数据写入顺序问题。
双写方案中的数据顺序问题
并发环境中同时写入同一个key的情况下,并不能保障key写入redis的顺序。造成key的结果不一致。
环境列表
主机名 | IP地址 | 部署软件或工具 |
---|---|---|
az_a1 | 10.0.0.110 | redis5.0 |
az_a1 | 10.0.0.110 | redissyncer |
az_a2 | 10.0.0.111 | redis5.0 |
az_b1 | 10.0.0.112 | redis5.0 |
az_b1 | 10.0.0.112 | redissyncer |
az_b2 | 10.0.0.113 | redis5.0 |
az_a1 代表 a 中心的 redis RW 实例;az_a2 代表 a 中心 redis RO 实例;az_b1 代表 b 中心 redis RW 实例;az_b2 代表 b 中心 redis RO 实例
分别在 az_a1、az_b1上部署 redissyncer 用于同步到对端数据中心
通过 redisdual 模拟双读客户端
1git clone https://github.com/TraceNature/redissyncer.git
2cd redissyncer
3docker-compose up -d
部署 redissyncer-cli 用于与服务器通讯
下载客户端程序
1wget https://github.com/TraceNature/redissyncer-cli/releases/download/v0. 1.0/redissyncer-cli-0.1.0-linux-amd64.tar.gz
2
3tar zxvf redissyncer-cli-0.1.0-linux-amd64.tar.gz
配置.config.yaml 文件
1# redissyncer-server 访问地址及端口
2syncserver: http://127.0.0.1:8080
3# 访问 redissyncer-server 的 token。可以通过 redissyncer-cli login 命令获得。默认用户名 admin 默认密码 123456.完整命令 redissyncer-cli login admin 123456
4token: 379F5E2BD55A4608B6A7557F0583CFC5
1{
2"sourcePassword": "redistest0102",
3"sourceRedisAddress": "10.0.0.110:16375",
4"targetRedisAddress": "10.0.0.113:16375",
5"targetPassword": "redistest0102",
6"taskName": "a1_to_b2",
7"targetRedisVersion": 5.0,
8"autostart": true,
9"afresh": true,
10"batchSize": 100
11}
启动任务
1redissyncer-cli-0.1.0-linux-amd64 -i
2redissyncer-cli> task create source synctask/a1_to_b2.json;
az_b1 配置同步任务同步到 az_a2
编辑任务文件 synctask/b1_to_a2.json
1{
2 "sourcePassword": "redistest0102",
3 "sourceRedisAddress": "10.0.0.112:16375",
4 "targetRedisAddress": "10.0.0.111:16375",
5 "targetPassword": "redistest0102",
6 "taskName": "b1_to_a2",
7 "targetRedisVersion": 5.0,
8 "autostart": true,
9 "afresh": true,
10 "batchSize": 100
11}
启动任务
1redissyncer-cli-0.1.0-linux-amd64 -i
2redissyncer-cli> task create source synctask/b1_to_a2.json;
通过redisdual 模拟redis 双读
config.yaml 文件参数详解
1# 日志配置不需改动
2zap:
3level: 'debug'
4format: 'console'
5prefix: '[redisdual]'
6director: 'log'
7link-name: 'latest_log'
8show-line: true
9encode-level: 'LowercaseColorLevelEncoder'
10# stacktrace-key: 'stacktrace'
11log-in-console: true
12
13# 执行时间间隔,单位毫秒,可以控制每次执行的间隔时长,便于观察日志
14execinterval: 1
15
16# 循环执行最大步长,当大于步长时归零;当达到步长时,重新执行写入和双读操作
17loopstep: 30
18
19# 本地key前缀,区分本地写入key和remote端写入的key
20localkeyprefix: a
21remotekeyprefix: b
22
23# redis 读写实例
24redisrw:
25db: 0
26addr: '114.67.76.82:16375'
27password: 'redistest0102'
28# redis 只读实例
29redisro:
30db: 0
31addr: '114.67.120.120:16375'
32password: 'redistest0102'
1func dual(rw *redis.Client, ro *redis.Client, key string) {
2 roResult, err := ro.Get(key).Result()
3
4 if err == nil && roResult != "" {
5 global.RSPLog.Sugar().Infof("Get key %s from redisro result is:%s ", key, roResult)
6 return
7 }
8
9 rwResult, err := rw.Get(key).Result()
10 if err != nil || rwResult == "" {
11 global.RSPLog.Sugar().Infof("key %s no result return!", key)
12 return
13 }
14
15 global.RSPLog.Sugar().Infof("Get key %s from redisrw result is: %s ", key, rwResult)
16
17}
1// -d 后台启动
2if global.RSPViper.GetBool("daemon") {
3cmd, err := background()
4if err != nil {
5panic(err)
6}
7
8//根据返回值区分父进程子进程
9if cmd != nil { //父进程
10fmt.Println("PPID: ", os.Getpid(), "; PID:", cmd.Process.Pid, "; Operating parameters: ", os.Args)
11return //父进程退出
12} else { //子进程
13fmt.Println("PID: ", os.Getpid(), "; Operating parameters: ", os.Args)
14}
15}
16
17global.RSPLog = core.Zap()
18global.RSPLog.Info("server start ... ")
19
20pidMap := make(map[string]int)
21// 记录pid
22pid := syscall.Getpid()
23pidMap["pid"] = pid
24
25pidYaml, _ := yaml.Marshal(pidMap)
26dir, err := filepath.Abs(filepath.Dir(os.Args[0]))
27if err != nil {
28panic(err)
29}
30
31if err := ioutil.WriteFile(dir+"/pid", pidYaml, 0664); err != nil {
32global.RSPLog.Sugar().Error(err)
33panic(err)
34}
35global.RSPLog.Sugar().Infof("Actual pid is %d", pid)
36
37//redis 读写实例
38redisRW := GetRedisRW()
39
40//redis 只读实例
41redisRO := GetRedisRO()
42
43//清理RW
44redisRW.FlushAll()
45
46global.RSPLog.Sugar().Info("execinterval:", global.RSPViper.GetInt("execinterval"))
47loopstep := global.RSPViper.GetInt("loopstep")
48i := 0
49for {
50if i > loopstep {
51i = 0
52}
53key := global.RSPViper.GetString("localkeyprefix") + "_key" + strconv.Itoa(i)
54redisRW.Set(key, key+"_"+strconv.FormatInt(time.Now().UnixNano(), 10), 3600*time.Second)
55dual(redisRW, redisRO, global.RSPViper.GetString("localkeyprefix")+"_key"+strconv.Itoa(i))
56dual(redisRW, redisRO, global.RSPViper.GetString("remotekeyprefix")+"_key"+strconv.Itoa(i))
57i++
58time.Sleep(time.Duration(global.RSPViper.GetInt("execinterval")) * time.Millisecond)
59}
启动redisdual 并观察日志
redisdual start
redis的双向同步方案的机制大致就是以上三种,具体生产中采用哪种方式要根据业务特性进行权衡。从数据安全和维护成本方面考虑,双读方案从运维成本来讲是最少的,且在故障发生时不会引起数据混淆。