场景举例:
为了查询方便,目前订单表中存在很多冗余字段,例如用户昵称。但是当昵称所在的表发生变化时,很多同学可能会直接在修改昵称的地方手动调用订单接口更新昵称,这样不仅代码结构混乱,而且耦合严重。
使用说明:
下面的举例只是基于单机的简单示例,没有加任务线程池,也没有考虑并发。另外个人建议:如果同步的是重要数据,不要用这种同步方式,用不好可能存在丢数据的风险;像我上面说的那些仅用于展示、无业务用途的字段,可以用这种方式同步。
具体使用
引入maven核心依赖,其他spring等依赖可以自己添加
<!--binlog同步核心依赖-->
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.21.0</version>
</dependency>
<!--mysql连接-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
<!--mysql解析-->
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
<version>4.5</version>
</dependency>
下面举例说明具体的实现代码:用户直接实现BizTableListener接口,自定义自己想同步的表的处理逻辑即可。举例中我只放了一些核心代码,完整代码已作为附件上传
代码
1.配置数据库信息
package com.data.binlog.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Connection settings of the MySQL instance whose binlog is consumed,
 * bound from configuration properties under the {@code binlog.datasource} prefix.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Component
@ConfigurationProperties(prefix = "binlog.datasource")
@Data
public class DataSourceConfig {

    /**
     * Server address. Other classes in this project pass it as the host of the
     * binlog client and concatenate it into {@code jdbc:mysql://<url>:<port>},
     * so it is expected to be a bare host name or IP.
     */
    private String url;

    /** Server port. */
    private int port;

    /** Account used for both the binlog connection and the metadata queries. */
    private String username;

    /** Password of {@link #username}. */
    private String passwd;

    /** Database (schema) name. */
    private String db;
}
# Logging configuration file.
logging:
  config: classpath:log4j2/log4j2-dev.yml
# Binlog source; the keys below bind to DataSourceConfig (prefix "binlog.datasource").
binlog:
  datasource:
    # Bare host name or IP: the code builds "jdbc:mysql://<url>:<port>" from it.
    url: mysql的地址
    db: 数据库
    # Must be an integer (bound to an int field).
    port: 端口
    username: 用户名
    passwd: 密码
2.加载binlog配置信息
package com.data.binlog.config;
import com.data.binlog.listener.BinLogEventListener;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * Creates the {@link BinaryLogClient} that streams binlog events from the
 * configured MySQL server to {@link BinLogEventListener}.
 */
@Configuration
public class BinaryLogClientConfig {

    /** Upper bound, in milliseconds, for establishing the initial binlog connection. */
    private static final long CONNECT_TIMEOUT_MS = 10_000L;

    @Resource
    private BinLogEventListener binLogEventListener;

    @Resource
    private DataSourceConfig dataSourceConfig;

    /**
     * Builds, wires and starts the binlog client.
     *
     * <p>The client is returned as a bean (a {@code @Bean} method is expected to
     * produce an object) and is disconnected when the context shuts down.
     * {@code connect(timeout)} is used instead of the no-arg {@code connect()}:
     * the latter blocks the calling thread until the client disconnects, which
     * would stall application startup inside bean creation.
     *
     * @return the connected client
     * @throws Exception if the connection cannot be established within
     *                   {@link #CONNECT_TIMEOUT_MS}
     */
    @Bean(destroyMethod = "disconnect")
    public BinaryLogClient BinaryLog() throws Exception {
        BinaryLogClient client = new BinaryLogClient(
                dataSourceConfig.getUrl(),
                dataSourceConfig.getPort(),
                dataSourceConfig.getUsername(),
                dataSourceConfig.getPasswd());
        client.setEventDeserializer(new EventDeserializer());
        client.registerEventListener(binLogEventListener);
        // Connects on a background thread and returns once the connection is up.
        client.connect(CONNECT_TIMEOUT_MS);
        return client;
    }
}
package com.data.binlog.listener;
import cn.hutool.core.util.ObjectUtil;
import com.data.binlog.config.DataSourceConfig;
import com.data.binlog.context.TableContext;
import com.data.binlog.listener.biz.BizTableRouteProcess;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.util.TablesNamesFinder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.Serializable;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Main binlog listener.
 *
 * <p>Keeps the table-id/table-name/column caches in {@link TableContext} up to
 * date from TABLE_MAP and ALTER events, and turns UPDATE and DELETE row events
 * into {@link BinLogItem}s that are handed to {@link BizTableRouteProcess}.
 * Insert (write) events are currently not routed.
 */
@Component
@Slf4j
public class BinLogEventListener implements BinaryLogClient.EventListener {

    /** Query returning one row per column of a table, with a zero-based ordinal position. */
    private static final String COLUMN_SQL = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +
            "DATA_TYPE, ORDINAL_POSITION -1 as ORDINAL_POSITION , case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" +
            " FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ? order by ordinal_position";

    /** Dispatches row changes to the user-defined per-table listeners. */
    @Resource
    private BizTableRouteProcess bizTableRouteProcess;

    @Resource
    private DataSourceConfig dataSourceConfig;

    /**
     * Entry point called by the binlog client for every event.
     *
     * @param event the binlog event
     */
    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        log.debug("监听的事件类型:{}", eventType);

        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            handleTableMap(tableData);
        } else if (eventType == EventType.QUERY) {
            // Schema changes (DDL) arrive as QUERY events.
            QueryEventData queryData = event.getData();
            handleQuery(queryData);
        } else if (EventType.isUpdate(eventType)) {
            UpdateRowsEventData updateData = event.getData();
            handleUpdate(eventType, event.getHeader().getTimestamp(), updateData);
        } else if (EventType.isDelete(eventType)) {
            DeleteRowsEventData deleteData = event.getData();
            handleDelete(eventType, event.getHeader().getTimestamp(), deleteData);
        }
    }

    /** Records the id/name mapping of a table and loads its column metadata if not cached yet. */
    private void handleTableMap(TableMapEventData tableData) {
        String db = tableData.getDatabase();
        String table = tableData.getTable();
        String tableId = String.valueOf(tableData.getTableId());
        try {
            TableContext.getTableNameMap().put(table, tableId);
            TableContext.getTableIdMap().put(tableId, table);
            Map<String, Map<String, ColumnInfo>> tableIdColumnMap = TableContext.getTableIdColumnMap();
            // Skip the metadata query when the structure is already cached.
            if (tableIdColumnMap.get(tableId) == null) {
                tableIdColumnMap.put(tableId, getColMap(db, table));
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to load column metadata for " + db + "." + table, e);
        }
    }

    /** Drops the cached column metadata of a table when an ALTER statement is seen. */
    private void handleQuery(QueryEventData queryEventData) {
        String execSql = queryEventData.getSql();
        Statement statement;
        try {
            statement = CCJSqlParserUtil.parse(execSql);
        } catch (JSQLParserException e) {
            log.error("{} sql语句格式错误", execSql);
            return;
        }
        if (!(statement instanceof Alter)) {
            return;
        }
        // The parsed name may still carry backticks, while the cache is keyed by
        // the plain table name reported in TABLE_MAP events.
        String tableName = ((Alter) statement).getTable().getName().replace("`", "");
        String tableId = TableContext.getTableNameMap().get(tableName);
        if (tableId != null) {
            // Structure changed: evict so the next TABLE_MAP event reloads it.
            TableContext.getTableIdColumnMap().remove(tableId);
        }
        log.warn("数据结构变化 {}", tableName);
    }

    /** Builds one {@link BinLogItem} per updated row (before/after/changed columns) and routes it. */
    private void handleUpdate(EventType eventType, long timestamp, UpdateRowsEventData data) {
        String tableId = String.valueOf(data.getTableId());
        Map<String, ColumnInfo> columnInfoMap = TableContext.getTableIdColumnMap().get(tableId);
        if (columnInfoMap == null) {
            // No TABLE_MAP seen for this id (or metadata was evicted): nothing to map columns with.
            log.warn("No cached column metadata for table id {}, update event skipped", tableId);
            return;
        }
        String tableName = TableContext.getTableIdMap().get(tableId);

        for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
            Serializable[] beforeRow = row.getKey();
            Serializable[] afterRow = row.getValue();
            Map<String, Serializable> before = Maps.newHashMap();
            Map<String, Serializable> after = Maps.newHashMap();
            Map<String, Object[]> change = Maps.newHashMap();

            for (Map.Entry<String, ColumnInfo> entry : columnInfoMap.entrySet()) {
                String column = entry.getKey();
                int idx = entry.getValue().getIdx();
                if (idx < 0 || idx >= beforeRow.length || idx >= afterRow.length) {
                    // Cached metadata does not match the row layout of this event.
                    log.warn("Column {} (index {}) is outside the row of table {}, skipped", column, idx, tableName);
                    continue;
                }
                Serializable beforeValue = beforeRow[idx];
                Serializable afterValue = afterRow[idx];
                before.put(column, beforeValue);
                after.put(column, afterValue);
                if (!valuesEqual(beforeValue, afterValue)) {
                    change.put(column, new Object[]{beforeValue, afterValue});
                }
            }

            BinLogItem binLogItem = new BinLogItem();
            binLogItem.setTableName(tableName);
            binLogItem.setEventType(eventType);
            binLogItem.setTimestamp(timestamp);
            binLogItem.setBefore(before);
            binLogItem.setAfter(after);
            binLogItem.setColumnChangeMap(change);
            binLogItem.setColumnInfoMap(columnInfoMap);
            bizTableRouteProcess.route(binLogItem);
        }
        log.debug("{}", data);
    }

    /** Routes a delete event; only table name, event type and timestamp are populated. */
    private void handleDelete(EventType eventType, long timestamp, DeleteRowsEventData data) {
        BinLogItem binLogItem = new BinLogItem();
        binLogItem.setTableName(TableContext.getTableIdMap().get(String.valueOf(data.getTableId())));
        binLogItem.setEventType(eventType);
        binLogItem.setTimestamp(timestamp);
        bizTableRouteProcess.route(binLogItem);
        log.debug("{}", data);
    }

    /**
     * Compares two column values; arrays (e.g. {@code byte[]}) are compared by
     * content so an unchanged binary column is not reported as changed.
     */
    private static boolean valuesEqual(Serializable left, Serializable right) {
        return ObjectUtil.equals(left, right) || java.util.Objects.deepEquals(left, right);
    }

    /**
     * Loads the column metadata of a table from {@code INFORMATION_SCHEMA.COLUMNS}.
     *
     * <p>The connection, statement and result set are closed in all cases. SQL
     * failures are propagated instead of being swallowed, so that the caller does
     * not cache an empty map for the table. The driver is located through
     * {@link DriverManager}'s automatic driver loading.
     *
     * @param db    schema name
     * @param table table name
     * @return column name to column info; empty when the table has no visible columns
     * @throws Exception if the connection or the query fails
     */
    public Map<String, ColumnInfo> getColMap(String db, String table) throws Exception {
        String jdbcUrl = "jdbc:mysql://" + dataSourceConfig.getUrl() + ":" + dataSourceConfig.getPort();
        Map<String, ColumnInfo> map = new HashMap<>();
        try (Connection connection = DriverManager.getConnection(
                jdbcUrl, dataSourceConfig.getUsername(), dataSourceConfig.getPasswd());
             PreparedStatement ps = connection.prepareStatement(COLUMN_SQL)) {
            ps.setString(1, db);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String schema = rs.getString("TABLE_SCHEMA");
                    String tableName = rs.getString("TABLE_NAME");
                    String column = rs.getString("COLUMN_NAME");
                    int idx = rs.getInt("ORDINAL_POSITION");
                    String dataType = rs.getString("DATA_TYPE");
                    String isPKC = rs.getString("IS_PKC");
                    map.put(column, new ColumnInfo(idx, schema, tableName, column, dataType, "Y".equals(isPKC)));
                }
            }
        }
        return map;
    }
}
package com.data.binlog.listener.biz;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.data.binlog.annotation.BinLogTable;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.jdl.edu.core.common.utils.SpringContextHolder;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
/**
 * Routes binlog items to the user-defined {@link BizTableListener}s that are
 * registered for the item's table through the {@link BinLogTable} annotation.
 */
@Component
public class BizTableRouteProcess {

    /** Table name to the listeners registered for that table. */
    public static Map<String, List<BizTableListener>> tableIdColumnMap = new HashMap<>();

    // Injected but not used directly; presumably ensures the holder is initialised
    // before initRoute() runs (TODO confirm).
    @Resource
    private SpringContextHolder springContextHolder;

    /**
     * Collects every {@link BizTableListener} bean and registers it under the
     * table name declared in its {@link BinLogTable} annotation. Beans without the
     * annotation, or with an empty table name, are ignored.
     */
    @PostConstruct
    public void initRoute() {
        List<BizTableListener> listeners = SpringContextHolder.getBeanList(BizTableListener.class);
        if (listeners == null) {
            return;
        }
        for (BizTableListener listener : listeners) {
            BinLogTable table = listener.getClass().getAnnotation(BinLogTable.class);
            if (table == null || ObjectUtil.isEmpty(table.tableName())) {
                continue;
            }
            // Register the bean we already hold. Looking it up again by a name derived
            // from the class name fails whenever the bean name is not that derived name.
            tableIdColumnMap.computeIfAbsent(table.tableName(), key -> new ArrayList<>()).add(listener);
        }
    }

    /**
     * Passes the item to every listener registered for its table, in registration
     * order; does nothing when no listener is registered.
     *
     * @param binLogItem the change to dispatch
     */
    public void route(BinLogItem binLogItem) {
        List<BizTableListener> tableImplList = tableIdColumnMap.get(binLogItem.getTableName());
        if (ObjectUtil.isEmpty(tableImplList)) {
            return;
        }
        for (BizTableListener impl : tableImplList) {
            impl.listener(binLogItem);
        }
    }
}
下面是对user表变化的处理加工类,可以热插拔,直接打注解即可
package com.data.binlog.listener.biz;
import com.data.binlog.param.BinLogItem;
/**
 * Contract implemented by every custom per-table handler.
 */
public interface BizTableListener {

    /**
     * Handles one binlog item.
     *
     * @param binLogItem the item to process
     */
    void listener(BinLogItem binLogItem);
}
package com.data.binlog.listener.biz.user;
import com.data.binlog.annotation.BinLogTable;
import com.data.binlog.listener.biz.BizTableListener;
import com.data.binlog.param.BinLogItem;
import com.github.shyiko.mysql.binlog.event.EventType;
import org.springframework.stereotype.Component;
/**
 * Example listener for the {@code user} table. To listen to a table, implement
 * {@link BizTableListener} and name the table in the {@link BinLogTable} annotation.
 */
@BinLogTable(tableName = "user")
@Component
public class UserTableListener implements BizTableListener {

    /** Timestamp of the last processed item; {@code null} until the first item is handled. */
    private static Long currentTs = null;

    /**
     * Processes an item unless it is older than the last processed one.
     *
     * <p>Items with a timestamp equal to the last one are processed: every row of
     * a multi-row event carries the same event timestamp, so rejecting equal
     * timestamps would drop all rows after the first.
     *
     * @param binLogItem the change to process
     */
    @Override
    public void listener(BinLogItem binLogItem) {
        Long eventTs = binLogItem.getTimestamp();
        // Only relevant when replaying a full update: skip strictly older items.
        if (currentTs != null && eventTs != null && currentTs > eventTs) {
            System.out.println("当前消息不是最新数据");
            return;
        }
        if (EventType.isUpdate(binLogItem.getEventType())) {
            System.out.println("自己的逻辑");
        }
        currentTs = eventTs;
    }
}
总结
代码附件: https://download.csdn.net/download/zhaoyonghenghcl/89221807
上面是关于binlog同步基本代码实现,代码已分享链接,有问题随时沟通