基于flink cdc技术,在不停机情况下完成新老系统数据迁移解决方案

发布于:2024-04-19 ⋅ 阅读:(19) ⋅ 点赞:(0)

1.业务场景及痛点描述

      当前随着业务发展,由于公司是做海外业务,公司前几年搭建了官网商城,并积累了一定的用户量,然后去年,由于公司发展智能机器人业务,开发了手机APP,供用户控制机器人,也积累了一定的用户,现在公司想要APP端接入商城业务,完成官网商城和APP端用户体系体系打通,建立统一用户中心,并且业务系统接入用户中心的过程中业务系统不能停机,业务不中断。

2. 统一用户中心数据设计

   2.1 官网用户表结构(关键字段展示)
序号 字段名 字段类型 字段含义
1 id
2 email 用户邮箱,可用户登录,可以为空
4 user_login 用户账号,可用户登录
5 ....
6 ....
  2.2  APP端用户表结构(关键字段展示)
序号 字段名 字段类型 字段含义
1 id
2 email
3 app_version
4 os
5 last_login_time
6 ......
    2.3 统一用户中心表接口设计(关键字段展示)
序号 字段名 字段类型 字段含义
1 id
2 gw_user_id
3 app_user_id
4 user_login
5 email
6 app_version
7 os
8 last_login_time
9 row_key 标识当前行的唯一字段
10 ......

3. 使用flInk cdc技术,进行数据持续迁移,让业务系统改造完美融合

3.1 创建官网用户数据同步任务,代码如下:
CREATE TABLE gw_user(
	  id bigInt, 
	  user_login STRING, 
	  password STRING, 
	  email STRING, 
	  PRIMARY KEY (id) NOT ENFORCED
) comment '官网用户信息'
 WITH (  
	'connector'='mysql-cdc',
	hostname'='xxx.xx.xxx.xx',
	'port'='xx',
	'username'='xx',
	'password'='xxx',
	'scan.startup.mode'='initial',
	'database-name'='gw',
	'table-name'='user'
);

CREATE TABLE user_center_user (
  user_login STRING, 
  email STRING, 
  password STRING, 
  row_key STRING,
  PRIMARY KEY (row_key) NOT ENFORCED 
) comment 'sink app user'
 WITH ( 
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/user_center?useUnicode=true&characterEncoding=UTF-8',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'xxx',
'password' = 'xxx',
'table-name' = 'user' 
);

insert into 
    user_center_user
select 
  user_login,
  email,
  password,
  md5(user_login | email | source)
from gw_user;

3.2 创建APP用户数据同步任务,代码如下:
CREATE TABLE app_user(
	  id bigInt, 
	  email STRING, 
	  password STRING, 
	  .....
	  PRIMARY KEY (id) NOT ENFORCED
) comment 'APP用户信息'
 WITH (  
	'connector'='mysql-cdc',
	hostname'='xxx.xx.xxx.xx',
	'port'='xx',
	'username'='xx',
	'password'='xxx',
	'scan.startup.mode'='initial',
	'database-name'='app',
	'table-name'='user'
);


CREATE TABLE user_center_user (
  user_login STRING, 
  email STRING, 
  password STRING, 
  row_key STRING,
  PRIMARY KEY (row_key) NOT ENFORCED 
) comment 'sink app user'
 WITH ( 
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/user_center?useUnicode=true&characterEncoding=UTF-8',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'xxx',
'password' = 'xxx',
'table-name' = 'user' 
);

insert into 
    user_center_user
select 
  user_login,
  email,
  password,
  md5(user_login | email | source)
from app_user;

 4. 总结

    当前这里只描述了数据同步相关的设计,更多业务处理还是需要各个自己修改适配。