Python使用clickhouse-local和MySQL表函数实现从MySQL到ClickHouse数据同步

发布于:2025-06-07 ⋅ 阅读:(20) ⋅ 点赞:(0)

下面是一个使用clickhouse-local和MySQL表函数实现从MySQL到ClickHouse数据同步的Python解决方案,包含全量同步、增量同步和测试用例。

此解决方案提供了生产级数据同步所需的核心功能,可根据具体场景扩展更多高级特性如:数据转换、字段映射、类型转换等。

设计思路

  1. 全量同步:首次运行时将MySQL表完整导入ClickHouse
  2. 增量同步:基于增量字段(如自增ID或时间戳)同步新增数据
  3. 状态管理:使用JSON文件记录同步位置
  4. 错误处理:完善的日志和异常处理机制
import subprocess
import json
import os
import logging
from configparser import ConfigParser

# Logging setup: INFO level and above, timestamped records, emitted through a
# stream handler created with its default stream.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_console_handler = logging.StreamHandler()
logging.basicConfig(
    format=_LOG_FORMAT,
    level=logging.INFO,
    handlers=[_console_handler],
)
# Module-level logger used by everything below.
logger = logging.getLogger(__name__)

class MySQLToClickHouseSync:
    """Copy rows from a MySQL table into a ClickHouse table via clickhouse-local.

    Data is read through ClickHouse's ``mysql(...)`` table function. The
    position reached so far (the largest ``id`` seen) is kept in a JSON state
    file so that later runs only copy rows with a larger ``id``.

    Configuration values (table names, host, credentials) come from the INI
    file and are treated as trusted, but string values are still escaped
    before being embedded in SQL so that characters such as ``'`` in a
    password do not break the query.

    NOTE(review): every query is run in its own clickhouse-local process.
    Verify that a table created by ``full_sync`` is still visible to the
    process started later by ``incremental_sync`` in your deployment.
    """

    # Executable invoked for every query.
    _CLICKHOUSE_BIN = 'clickhouse-local'

    def __init__(self, config_path='config.ini'):
        """Load the configuration and the last saved sync state.

        Args:
            config_path: Path of the INI configuration file.
        """
        self.config = self._load_config(config_path)
        self.state_file = self.config['state_file']
        self.last_state = self._load_state()

    def _load_config(self, path):
        """Read the INI file and return the settings as plain dictionaries.

        Returns:
            A dict with the keys ``'mysql'`` and ``'clickhouse'`` (each a dict
            of that section's options) and ``'state_file'`` (a string).

        Raises:
            KeyError: If a required section or option is missing. A missing
                file also ends up here, because ``ConfigParser.read`` skips
                unreadable files silently.
        """
        config = ConfigParser()
        config.read(path)
        return {
            'mysql': dict(config['mysql']),
            'clickhouse': dict(config['clickhouse']),
            'state_file': config['general']['state_file']
        }

    @staticmethod
    def _default_state():
        """Return a fresh state dict meaning "nothing has been synced yet"."""
        return {'last_id': 0, 'last_timestamp': '1970-01-01 00:00:00'}

    @staticmethod
    def _sql_literal(value):
        """Return *value* as a single-quoted SQL string literal.

        Backslashes and single quotes are backslash-escaped so the value
        cannot terminate the literal early.
        """
        escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
        return f"'{escaped}'"

    @staticmethod
    def _parse_max_id(text):
        """Convert the output of a ``SELECT max(id)`` query to an int.

        Returns:
            The integer, or ``None`` when the output is empty or not an
            integer (for example a NULL marker printed for an empty result).
        """
        try:
            return int(text.strip())
        except ValueError:
            return None

    def _mysql_source(self):
        """Build the ``mysql(...)`` table-function expression for the source table."""
        mysql = self.config['mysql']
        args = [
            f"{mysql['host']}:{mysql['port']}",
            mysql['database'],
            mysql['table'],
            mysql['user'],
            mysql['password'],
        ]
        return f"mysql({', '.join(self._sql_literal(arg) for arg in args)})"

    def _redact(self, text):
        """Return *text* with the MySQL password literal masked for logging."""
        if not isinstance(text, str):
            return text
        password = self.config.get('mysql', {}).get('password')
        if password:
            text = text.replace(self._sql_literal(password), "'***'")
        return text

    def _load_state(self):
        """Load the sync state from the state file.

        Returns:
            The stored state dict. The default state is returned when the file
            does not exist, cannot be read, is not valid JSON, or does not
            contain a JSON object.
        """
        try:
            with open(self.state_file, 'r') as f:
                state = json.load(f)
        except FileNotFoundError:
            # First run: no state has been written yet.
            return self._default_state()
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            logger.error(f"加载状态失败: {e}")
            return self._default_state()
        if not isinstance(state, dict):
            # Later code calls .get() on the state, so anything else is unusable.
            logger.error(f"加载状态失败: 状态文件内容不是JSON对象: {state!r}")
            return self._default_state()
        return state

    def _save_state(self, state):
        """Write *state* to the state file as JSON.

        Failures are logged and not raised, so a sync that already copied
        data is not aborted by a state-file problem.
        """
        try:
            with open(self.state_file, 'w') as f:
                json.dump(state, f)
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"保存状态失败: {e}")
        else:
            logger.info(f"状态已保存: {state}")

    def run_clickhouse_command(self, query):
        """Run *query* with clickhouse-local.

        Args:
            query: SQL text passed via ``--query``.

        Returns:
            ``True`` if the process exited with status 0, ``False`` if it
            exited with an error or could not be started at all.
        """
        cmd = [self._CLICKHOUSE_BIN, '--query', query]
        # The query embeds the MySQL password; never write it to the log.
        safe_cmd = [self._CLICKHOUSE_BIN, '--query', self._redact(query)]
        try:
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"命令执行失败: {safe_cmd}\n错误: {self._redact(e.stderr)}")
            return False
        except OSError as e:
            # Raised e.g. when the clickhouse-local executable is not installed.
            logger.error(f"命令执行失败: {safe_cmd}\n错误: {e}")
            return False
        logger.debug(f"命令执行成功: {safe_cmd}\n输出: {result.stdout}")
        return True

    def _fetch_max_id(self, min_exclusive=None):
        """Query ``max(id)`` of the MySQL source table.

        Args:
            min_exclusive: When given, only rows with ``id`` greater than this
                value are considered.

        Returns:
            A ``(ok, max_id)`` tuple. ``ok`` is ``False`` when the query could
            not be executed; ``max_id`` is ``None`` when the query produced no
            integer.
        """
        query = f"SELECT max(id) FROM {self._mysql_source()}"
        if min_exclusive is not None:
            query += f" WHERE id > {min_exclusive}"
        cmd = [self._CLICKHOUSE_BIN, '--query', query]
        safe_cmd = [self._CLICKHOUSE_BIN, '--query', self._redact(query)]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError as e:
            logger.error(f"命令执行失败: {safe_cmd}\n错误: {e}")
            return False, None
        if result.returncode != 0:
            logger.error(f"命令执行失败: {safe_cmd}\n错误: {self._redact(result.stderr)}")
            return False, None
        return True, self._parse_max_id(result.stdout)

    def full_sync(self):
        """Create the ClickHouse table from the whole MySQL table.

        After the copy, the current ``max(id)`` of the source is stored as the
        starting point for incremental syncs (0 when the source has no rows).

        Returns:
            ``True`` when the copy succeeded and the starting point was
            determined; ``False`` when either step failed. Returning ``False``
            for a failed ``max(id)`` lookup prevents callers from treating an
            unrecorded position as a completed sync.
        """
        ch = self.config['clickhouse']
        query = (
            f"CREATE TABLE {ch['table']} ENGINE = MergeTree ORDER BY id AS "
            f"SELECT * FROM {self._mysql_source()}"
        )

        logger.info("开始全量同步...")
        if not self.run_clickhouse_command(query):
            return False

        # NOTE(review): rows inserted into MySQL between the copy above and
        # this query are counted as synced although they were not copied.
        ok, max_id = self._fetch_max_id()
        if not ok:
            logger.error("全量同步后获取最大ID失败, 同步位置未保存")
            return False

        new_state = {'last_id': max_id if max_id is not None else 0}
        self._save_state(new_state)
        self.last_state = new_state
        logger.info("全量同步完成")
        return True

    def incremental_sync(self):
        """Copy rows whose ``id`` is greater than the last synced ``id``.

        Returns:
            ``True`` when the insert ran and the new position could be
            determined (including the "no new rows" case); ``False`` when the
            stored ``last_id`` is not an integer, the insert failed, or the
            new position could not be queried.
        """
        ch = self.config['clickhouse']
        raw_last_id = self.last_state.get('last_id', 0)
        try:
            # The value is embedded in SQL, so insist on a real integer.
            last_id = int(raw_last_id or 0)
        except (TypeError, ValueError):
            logger.error(f"状态中的last_id无效: {raw_last_id!r}")
            return False

        query = (
            f"INSERT INTO {ch['table']} "
            f"SELECT * FROM {self._mysql_source()} "
            f"WHERE id > {last_id}"
        )

        logger.info(f"开始增量同步, 最后ID: {last_id}")
        if not self.run_clickhouse_command(query):
            return False

        # NOTE(review): rows inserted into MySQL between the INSERT above and
        # this query advance the position without having been copied.
        ok, new_id = self._fetch_max_id(min_exclusive=last_id)
        if not ok:
            logger.error("增量同步后获取最大ID失败, 同步位置未更新")
            return False

        if new_id is not None and new_id > last_id:
            new_state = {'last_id': new_id}
            self._save_state(new_state)
            self.last_state = new_state
            logger.info(f"增量同步完成, 新最后ID: {new_id}")
        else:
            logger.info("没有新数据需要同步")
        return True

# 配置文件示例 (config.ini)
"""
[general]
state_file = sync_state.json

[mysql]
host = 127.0.0.1
port = 3306
database = test_db
table = source_table
user = root
password = mysqlpass

[clickhouse]
table = default.target_table
"""

if __name__ == "__main__":
    sync = MySQLToClickHouseSync()

    # First run: no position recorded yet (last_id missing or 0), so import
    # the whole table.
    if not sync.last_state.get('last_id'):
        if not sync.full_sync():
            # Do not continue with an incremental sync after a failed full
            # import; exit non-zero so schedulers can detect the failure.
            raise SystemExit(1)

    # Pick up rows added since the recorded position.
    if not sync.incremental_sync():
        raise SystemExit(1)

测试用例

import unittest
import sqlite3
from unittest.mock import patch, MagicMock
import tempfile
import os
import json

class TestMySQLToClickHouseSync(unittest.TestCase):
    """Unit tests for MySQLToClickHouseSync with subprocess.run mocked out.

    NOTE(review): assumes MySQLToClickHouseSync is available in this module's
    namespace; if the tests live in their own file it must be imported from
    the module that defines it — confirm the module name.
    """

    def setUp(self):
        """Build an in-memory config that points at a fresh temporary state file."""
        self.config = {
            'state_file': 'test_state.json',
            'mysql': {
                'host': '127.0.0.1',
                'port': '3306',
                'database': 'test_db',
                'table': 'source_table',
                'user': 'root',
                'password': 'pass'
            },
            'clickhouse': {
                'table': 'target_table'
            }
        }

        # Create a temporary state file. Only its path is needed, so close the
        # handle right away instead of leaking it for the rest of the test.
        self.state_file = tempfile.NamedTemporaryFile(delete=False)
        self.state_file.close()
        self.config['state_file'] = self.state_file.name

    def tearDown(self):
        """Remove the temporary state file."""
        os.unlink(self.state_file.name)

    def test_full_sync(self):
        """Full sync succeeds and stores the reported max id."""
        with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \
             patch.object(MySQLToClickHouseSync, '_load_state', return_value={'last_id': 0}), \
             patch('subprocess.run') as mock_run:

            # Every clickhouse-local call succeeds and prints "100".
            mock_run.return_value = MagicMock(returncode=0, stdout="100")

            sync = MySQLToClickHouseSync()
            result = sync.full_sync()

            # The command was executed and reported success.
            self.assertTrue(mock_run.called)
            self.assertTrue(result)

            # The state file now holds the new position.
            with open(self.state_file.name) as f:
                state = json.load(f)
                self.assertEqual(state['last_id'], 100)

    def test_incremental_sync(self):
        """Incremental sync advances the stored id to the new maximum."""
        # Initial state.
        with open(self.state_file.name, 'w') as f:
            json.dump({'last_id': 50}, f)

        with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \
             patch('subprocess.run') as mock_run:

            # The max-id query reports 75.
            mock_run.side_effect = [
                MagicMock(returncode=0),  # INSERT
                MagicMock(returncode=0, stdout="75")  # SELECT max(id)
            ]

            sync = MySQLToClickHouseSync()
            result = sync.incremental_sync()

            # Exactly the two expected commands ran.
            self.assertEqual(mock_run.call_count, 2)
            self.assertTrue(result)

            # The state file now holds the new position.
            with open(self.state_file.name) as f:
                state = json.load(f)
                self.assertEqual(state['last_id'], 75)

    def test_no_new_data(self):
        """With no new rows the stored position stays unchanged."""
        with open(self.state_file.name, 'w') as f:
            json.dump({'last_id': 100}, f)

        with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \
             patch('subprocess.run') as mock_run:

            # The max-id query prints nothing.
            mock_run.side_effect = [
                MagicMock(returncode=0),
                MagicMock(returncode=0, stdout="")
            ]

            sync = MySQLToClickHouseSync()
            result = sync.incremental_sync()

            self.assertTrue(result)
            # The position must not move.
            self.assertEqual(sync.last_state['last_id'], 100)

    def test_command_failure(self):
        """A failing clickhouse-local command makes full_sync return False."""
        # Imported here so the test does not depend on a module-level
        # `import subprocess` being present.
        import subprocess

        with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \
             patch('subprocess.run') as mock_run:

            mock_run.side_effect = subprocess.CalledProcessError(
                1, "cmd", output="", stderr="Error"
            )

            sync = MySQLToClickHouseSync()
            result = sync.full_sync()

            self.assertFalse(result)

if __name__ == '__main__':
    unittest.main()

使用说明

  1. 安装依赖:
# configparser 属于 Python 3 标准库,无需通过 pip 安装;只需确保已安装 clickhouse-local 并且可以在 PATH 中找到
  2. 准备配置文件 (config.ini):
[general]
state_file = sync_state.json

[mysql]
host = 127.0.0.1
port = 3306
database = your_db
table = source_table
user = root
password = your_mysql_password

[clickhouse]
table = default.target_table
  3. 创建ClickHouse表 (自动创建):
-- 首次运行时会自动创建表
-- 表结构自动从MySQL继承
  4. 运行同步:
# 首次运行(全量同步)
python sync.py

# 后续运行(增量同步)
python sync.py

关键特性

  1. 高效同步

    • 使用clickhouse-local直接管道传输,无需中间存储
    • 批量数据加载,避免逐行插入
  2. 增量同步机制

    • 基于自增ID的增量检测
    • 支持时间戳字段(需修改WHERE条件)
  3. 状态管理

    • JSON文件记录最后同步位置
    • 支持异常恢复
  4. 错误处理

    • 详细日志记录
    • 子进程错误捕获
    • 状态文件异常处理
  5. 配置驱动

    • 所有参数通过配置文件管理
    • 敏感信息与代码分离

性能优化建议

  1. 大表分批次同步
# 在全量同步中增加分页逻辑
BATCH_SIZE = 100000
for offset in range(0, total_count, BATCH_SIZE):
    query = f"SELECT * FROM ... LIMIT {BATCH_SIZE} OFFSET {offset}"
  2. 使用时间戳增量
# 修改增量查询条件
WHERE update_time > '{last_timestamp}'
  3. 并行处理
# 使用ThreadPoolExecutor并行处理不同数据分区
from concurrent.futures import ThreadPoolExecutor
  4. 压缩传输
# 在命令中添加压缩选项
clickhouse-local --query "..." | gzip | clickhouse-client --query "INSERT ..."