基于binlog实现数据加工处理

发布于:2024-04-25 ⋅ 阅读:(25) ⋅ 点赞:(0)

场景举例:

为了查询方便,目前订单表中存在很多冗余字段,例如用户昵称。当昵称所在的表发生变化时,很多同学可能会直接在修改昵称的地方手动调用订单接口去更新昵称,但这样不仅代码结构混乱,而且耦合严重。

使用说明:

下面的举例过程只是基于单机的简单示例,没有加任务线程池,也没有考虑并发。另外个人建议:如果要同步的是重要数据,不要用此种同步方式,用不好可能存在丢数据的风险;像我上面说的那些仅用于展示、无业务用途的字段,可以用此方式同步。

具体使用

引入maven核心依赖,其他spring等依赖可以自行添加

 <!--binlog同步核心依赖-->
<dependency>
  <groupId>com.github.shyiko</groupId>
  <artifactId>mysql-binlog-connector-java</artifactId>
  <version>0.21.0</version>
</dependency>
 <!--mysql链接-->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>6.0.6</version>
</dependency>
<!--SQL语句解析-->
<dependency>
  <groupId>com.github.jsqlparser</groupId>
  <artifactId>jsqlparser</artifactId>
  <version>4.5</version>
</dependency>

下面举例说明具体的实现代码。用户只需实现BizTableListener接口,自定义想要同步的表的处理逻辑即可。示例中只放了核心代码,完整代码已作为附件上传。

代码

1.配置数据库信息

package com.data.binlog.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Connection settings for the MySQL instance whose binlog is consumed.
 *
 * <p>Values are bound from configuration properties under the
 * {@code binlog.datasource} prefix; accessors are generated by Lombok.
 */
@Component
@ConfigurationProperties(prefix = "binlog.datasource")
@Data
public class DataSourceConfig {

    /** MySQL address; used as the host part when connecting. */
    private String url;

    /** MySQL port. */
    private int port;

    /** Account used to connect. */
    private String username;

    /** Password for {@link #username}. */
    private String passwd;

    /** Database name from the configuration. */
    private String db;

}

logging:
  config: classpath:log4j2/log4j2-dev.yml
binlog:
  datasource:
    url: mysql的地址
    db: 数据库
    port: 端口
    username: 用户名
    passwd: 密码

2.加载binlog配置信息

package com.data.binlog.config;

import com.data.binlog.listener.BinLogEventListener;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * Builds the {@link BinaryLogClient}, wires the application's event listener into it
 * and starts replication.
 */
@Configuration
public class BinaryLogClientConfig {

    /** Maximum time, in milliseconds, to wait for the initial connection to MySQL. */
    private static final long CONNECT_TIMEOUT_MS = 10_000L;

    @Resource
    private BinLogEventListener binLogEventListener;
    @Resource
    private DataSourceConfig dataSourceConfig;

    /**
     * Creates and connects the binlog client.
     *
     * <p>The client is returned so that it is registered as a real bean (a {@code void}
     * bean method yields nothing usable) and is disconnected when the context shuts down.
     *
     * @return the connected client
     * @throws Exception if the connection cannot be established within the timeout
     */
    @Bean(destroyMethod = "disconnect")
    public BinaryLogClient BinaryLog() throws Exception {
        BinaryLogClient client = new BinaryLogClient(
                dataSourceConfig.getUrl(),
                dataSourceConfig.getPort(),
                dataSourceConfig.getUsername(),
                dataSourceConfig.getPasswd());
        client.setEventDeserializer(new EventDeserializer());
        client.registerEventListener(binLogEventListener);
        // connect() with no arguments blocks the calling thread for as long as the client is
        // connected, which would stall application startup. The timeout variant reads the
        // stream on a separate thread and returns once the connection is up.
        client.connect(CONNECT_TIMEOUT_MS);
        return client;
    }

}

package com.data.binlog.listener;

import cn.hutool.core.util.ObjectUtil;
import com.data.binlog.config.DataSourceConfig;
import com.data.binlog.context.TableContext;
import com.data.binlog.listener.biz.BizTableRouteProcess;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.util.TablesNamesFinder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.Serializable;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * binlog监听主类
 */
@Component
@Slf4j
public class BinLogEventListener implements BinaryLogClient.EventListener {
    //调用用户自定义表处理实现类
    @Resource
    private BizTableRouteProcess bizTableRouteProcess;
    @Resource
    private DataSourceConfig dataSourceConfig;

    @Override
    public void onEvent(Event event) {
        EventHeader header = event.getHeader();
        EventType eventType = header.getEventType();
        System.out.println("监听的事件类型:" + eventType);
        Map<String, Map<String, ColumnInfo>> tableIdColumnMap = TableContext.getTableIdColumnMap();
        Map<String, String> tableNameMap = TableContext.getTableNameMap();
        Map<String, String> tableIdMap = TableContext.getTableIdMap();
        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            String db = tableData.getDatabase();
            String table = tableData.getTable();
            String tableId = String.valueOf(tableData.getTableId());
            try {
                tableNameMap.put(table, tableId);
                tableIdMap.put(tableId, table);
                //如果缓存中有数据结构则直接掉过
                if (tableIdColumnMap.get(tableId) == null) {
                    Map<String, ColumnInfo> columnInfoMap = getColMap(db, table);
                    tableIdColumnMap.put(tableId, columnInfoMap);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (eventType == EventType.QUERY) {
            //更新表结构会进此逻辑
            QueryEventData queryEventData = event.getData();
            String execSql = queryEventData.getSql();
            Statement statement = null;
            try {
                statement = CCJSqlParserUtil.parse(execSql);
            } catch (JSQLParserException e) {
                log.error("{} sql语句格式错误", execSql);
                return;
            }
            //如果字段发生更新需要删除重新获取
            if (statement instanceof Alter) {
                Alter alterStatement = (Alter) statement;
                String tableName = alterStatement.getTable().getName();
                //数据结构有变化清除
                tableIdColumnMap.remove(tableNameMap.get(tableName));
                log.error("数据结构变化", tableName);
            }
        }
        if (EventType.isWrite(eventType)) {
            //获取事件体
            WriteRowsEventData data = event.getData();
        } else if (EventType.isUpdate(eventType)) {
            UpdateRowsEventData data = (UpdateRowsEventData) event.getData();
            for (Map.Entry<Serializable[], Serializable[]> mapEntry : data.getRows()) {
                Map<String, ColumnInfo> columnInfoMap = tableIdColumnMap.get(String.valueOf(data.getTableId()));
                Map<String, Serializable> before = Maps.newHashMap();
                Map<String, Serializable> after = Maps.newHashMap();
                Map<String, Object[]> change = Maps.newHashMap();
                BinLogItem binLogItem = new BinLogItem();
                binLogItem.setTableName(tableIdMap.get(String.valueOf(data.getTableId())));
                columnInfoMap.entrySet().forEach(entry -> {
                    String column = entry.getKey();
                    ColumnInfo columnInfo = entry.getValue();
                    Serializable beforeValue = mapEntry.getKey()[columnInfo.getIdx()];
                    Serializable afterValue = mapEntry.getValue()[columnInfo.getIdx()];
                    before.put(column, beforeValue);
                    after.put(column, afterValue);
                    if (!ObjectUtil.equals(beforeValue, afterValue)) {
                        change.put(column, Lists.newArrayList(beforeValue, afterValue).toArray());
                    }
                });
                binLogItem.setEventType(eventType);
                binLogItem.setTimestamp(event.getHeader().getTimestamp());
                binLogItem.setBefore(before);
                binLogItem.setAfter(after);
                binLogItem.setColumnChangeMap(change);
                binLogItem.setColumnInfoMap(columnInfoMap);
                bizTableRouteProcess.route(binLogItem);
            }
            System.out.println(data);
        } else if (EventType.isDelete(eventType)) {
            DeleteRowsEventData data = event.getData();
            BinLogItem binLogItem = new BinLogItem();
            binLogItem.setTableName(tableIdMap.get(String.valueOf(data.getTableId())));
            binLogItem.setEventType(eventType);
            binLogItem.setTimestamp(event.getHeader().getTimestamp());
            bizTableRouteProcess.route(binLogItem);
            System.out.println(data);
        }

    }

    /**
     * 获取表中所有的字段信息
     *
     * @param db
     * @param table
     * @return
     * @throws Exception
     */
    public Map<String, ColumnInfo> getColMap(String db, String table) throws Exception {
        Map<String, ColumnInfo> map = new HashMap<>();
        try {
            Class.forName("com.mysql.jdbc.Driver");
            // 保存当前注册的表的column信息
            Connection connection = DriverManager.getConnection("jdbc:mysql://" + dataSourceConfig.getUrl() + ":"
                    + dataSourceConfig.getPort(), dataSourceConfig.getUsername(), dataSourceConfig.getPasswd());// 执行sql
            String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +
                    "DATA_TYPE, ORDINAL_POSITION -1 as ORDINAL_POSITION , case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" +
                    " FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ? order by ordinal_position";
            String sql = "SELECT * FROM TENANT_DOMAIN_REL";
            PreparedStatement ps = connection.prepareStatement(preSql);
            ps.setString(1, db);
            ps.setString(2, table);
            ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                String schema = rs.getString("TABLE_SCHEMA");
                String tableName = rs.getString("TABLE_NAME");
                String column = rs.getString("COLUMN_NAME");
                int idx = rs.getInt("ORDINAL_POSITION");
                String dataType = rs.getString("DATA_TYPE");
                String isPKC = rs.getString("IS_PKC");
                ColumnInfo columnInfo = new ColumnInfo(idx, schema, tableName, column, dataType, "Y".equals(isPKC));
                map.put(column, columnInfo);
            }
            ps.close();
            rs.close();

        } catch (SQLException e) {
        }
        return map;
    }
}

package com.data.binlog.listener.biz;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.data.binlog.annotation.BinLogTable;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.jdl.edu.core.common.utils.SpringContextHolder;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;

/**
 * Routes each {@link BinLogItem} to the user-defined {@link BizTableListener}s registered
 * for its table via the {@link BinLogTable} annotation.
 */
@Component
public class BizTableRouteProcess {

    // Table name -> listeners for that table (the field name is kept for compatibility).
    public static Map<String, List<BizTableListener>> tableIdColumnMap = new HashMap<>();
    // Never read here; presumably injected so the holder is ready before initRoute runs — TODO confirm.
    @Resource
    private SpringContextHolder springContextHolder;


    /**
     * Collects every {@link BizTableListener} bean from the Spring context and indexes it by
     * the table name declared in its {@link BinLogTable} annotation.
     */
    @PostConstruct
    public void initRoute() {
        List<BizTableListener> listeners = SpringContextHolder.getBeanList(BizTableListener.class);
        if (listeners == null) {
            return;
        }
        for (BizTableListener listener : listeners) {
            BinLogTable table = findBinLogTable(listener.getClass());
            if (table == null || ObjectUtil.isEmpty(table.tableName())) {
                continue;
            }
            // Register the bean instance itself. Looking it up again by a name derived from the
            // class simple name fails for custom bean names and for proxied classes.
            tableIdColumnMap.computeIfAbsent(table.tableName(), key -> new ArrayList<>()).add(listener);
        }
    }

    /**
     * Finds {@link BinLogTable} on the class or any superclass, so that a listener wrapped in
     * a generated subclass (e.g. a CGLIB proxy) is still recognised.
     */
    private static BinLogTable findBinLogTable(Class<?> type) {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            BinLogTable table = current.getAnnotation(BinLogTable.class);
            if (table != null) {
                return table;
            }
        }
        return null;
    }


    /**
     * Passes the item to every listener registered for its table; does nothing when the
     * table has no listeners.
     *
     * @param binLogItem the change to dispatch
     */
    public void route(BinLogItem binLogItem) {
        List<BizTableListener> tableImplList = tableIdColumnMap.get(binLogItem.getTableName());
        if (ObjectUtil.isEmpty(tableImplList)) {
            return;
        }
        for (BizTableListener impl : tableImplList) {
            impl.listener(binLogItem);
        }
    }

}

下面是对user表变化的处理加工类,可以热插拔,直接打注解即可

package com.data.binlog.listener.biz;

import com.data.binlog.param.BinLogItem;

/**
 * Contract that every custom per-table handler implements.
 */
public interface BizTableListener {

    /**
     * Handles one change item for the table this listener is registered for.
     *
     * @param binLogItem the change to process
     */
    void listener(BinLogItem binLogItem);
}

package com.data.binlog.listener.biz.user;

import com.data.binlog.annotation.BinLogTable;
import com.data.binlog.listener.biz.BizTableListener;
import com.data.binlog.param.BinLogItem;
import com.github.shyiko.mysql.binlog.event.EventType;
import org.springframework.stereotype.Component;

/**
 * Example handler for the {@code user} table. To listen to a table, implement
 * {@link BizTableListener} and name the table in the {@link BinLogTable} annotation.
 */
@BinLogTable(tableName = "user")
@Component
public class UserTableListener implements BizTableListener {

    // Timestamp of the newest item handled so far; null until the first item arrives.
    private static Long currentTs = null;

    /**
     * Handles one change of the {@code user} table, ignoring items older than the newest
     * one already handled.
     *
     * @param binLogItem the change to process
     */
    @Override
    public void listener(BinLogItem binLogItem) {
        Long eventTs = binLogItem.getTimestamp();
        // Only needed for full updates. The comparison is strict: binlog timestamps have
        // one-second resolution and every row of one event shares the same timestamp, so
        // rejecting equal timestamps would drop all but the first row of a multi-row change.
        if (currentTs != null && eventTs != null && currentTs > eventTs) {
            System.out.println("当前消息不是最新数据");
            return;
        }
        if (EventType.isUpdate(binLogItem.getEventType())) {
            System.out.println("自己的逻辑");
        } else {
            // Placeholder for handling other event types.
        }
        // Keep the high-water mark when an item carries no timestamp.
        if (eventTs != null) {
            currentTs = eventTs;
        }
    }
}


总结

代码附件: https://download.csdn.net/download/zhaoyonghenghcl/89221807
上面是关于binlog同步基本代码实现,代码已分享链接,有问题随时沟通