Linux下GCC的C++实现Hive到Snowflake数据迁移

发布于:2025-08-11 ⋅ 阅读:(15) ⋅ 点赞:(0)

程序结构

├── main.cpp
├── config.json
├── hive_export/
├── parquet_data/
├── sql_scripts/
└── logs/

核心代码实现 (main.cpp)

#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <filesystem>
#include <nlohmann/json.hpp>
#include <unistd.h>

namespace fs = std::filesystem;
using json = nlohmann::json;
using namespace std;

// 全局锁用于日志和队列同步
mutex log_mutex, queue_mutex;

// 配置文件结构
struct Config {
    string hive_jdbc;
    string hql_output_dir;
    string parquet_output_dir;
    string sql_script_dir;
    string snowflake_cfg;
    int export_threads;
    int import_threads;
};

// 日志记录函数
void log_message(const string& message, const string& log_path) {
    lock_guard<mutex> guard(log_mutex);
    ofstream log_file(log_path, ios::app);
    if (log_file) {
        time_t now = time(nullptr);
        log_file << "[" << put_time(localtime(&now), "%F %T") << "] " 
                 << message << endl;
    }
}

// 解析配置文件
Config load_config(const string& config_path) {
    ifstream config_file(config_path);
    if (!config_file) throw runtime_error("Config file not found");

    json j;
    config_file >> j;

    return {
        j["hive_jdbc"],
        j["directories"]["hql_output"],
        j["directories"]["parquet_output"],
        j["directories"]["sql_scripts"],
        j["snowflake"]["config_path"],
        j["threads"]["export"],
        j["threads"]["import"]
    };
}

// 导出Hive建表语句
void export_hql(const Config& cfg, const string& log_path) {
    string cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
                 "-e 'SHOW DATABASES;' | tail -n +2 > databases.txt";
    system(cmd.c_str());

    ifstream db_file("databases.txt");
    string db;
    while (getline(db_file, db)) {
        cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
              "-e 'USE " + db + "; SHOW TABLES;' | tail -n +2 > " + db + "_tables.txt";
        system(cmd.c_str());

        ifstream table_file(db + "_tables.txt");
        string table;
        while (getline(table_file, table)) {
            fs::path dir = fs::path(cfg.hql_output_dir) / db;
            fs::create_directories(dir);
            string hql_path = (dir / (table + ".hql")).string();

            cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
                  "-e 'USE " + db + "; SHOW CREATE TABLE " + table + ";' | "
                  "awk 'NR>2' | head -n -1 > " + hql_path;
            
            if (system(cmd.c_str()) == 0) {
                log_message("SUCCESS: Exported HQL for " + db + "." + table, log_path);
            } else {
                log_message("ERROR: Failed to export HQL for " + db + "." + table, log_path);
            }
        }
    }
}

// 导出Parquet数据(线程任务)
void export_worker(queue<string> tasks, const Config& cfg, const string& log_path) {
    while (true) {
        string task;
        {
            lock_guard<mutex> guard(queue_mutex);
            if (tasks.empty()) return;
            task = move(tasks.front());
            tasks.pop();
        }

        size_t pos = task.find('.');
        string db = task.substr(0, pos);
        string table = task.substr(pos + 1);

        fs::path out_dir = fs::path(cfg.parquet_output_dir) / db / table;
        fs::create_directories(out_dir);

        string cmd = "hive -e \"SET hive.exec.compress.output=false; "
                     "INSERT OVERWRITE DIRECTORY '" + out_dir.string() + "' "
                     "STORED AS PARQUET SELECT * FROM " + task + ";\"";
        
        if (system(cmd.c_str()) == 0) {
            log_message("SUCCESS: Exported Parquet for " + task, log_path);
        } else {
            log_message("ERROR: Failed to export Parquet for " + task, log_path);
        }
    }
}

// 多线程导出Parquet
void export_parquet(const Config& cfg, const string& log_path) {
    ifstream db_file("databases.txt");
    queue<string> tasks;
    string db;

    while (getline(db_file, db)) {
        ifstream table_file(db + "_tables.txt");
        string table;
        while (getline(table_file, table)) {
            tasks.push(db + "." + table);
        }
    }

    vector<thread> threads;
    for (int i = 0; i < cfg.export_threads; ++i) {
        threads.emplace_back(export_worker, tasks, ref(cfg), ref(log_path));
    }

    for (auto& t : threads) t.join();
}

// 执行SnowSQL脚本
void run_snowsql(const Config& cfg, const string& log_path) {
    for (const auto& entry : fs::directory_iterator(cfg.sql_script_dir)) {
        if (entry.path().extension() == ".sql") {
            string cmd = "snowsql -c " + cfg.snowflake_cfg + " -f " + entry.path().string();
            if (system(cmd.c_str()) == 0) {
                log_message("SUCCESS: Executed SQL " + entry.path().filename().string(), log_path);
            } else {
                log_message("ERROR: Failed to execute SQL " + entry.path().filename().string(), log_path);
            }
        }
    }
}

// 导入Parquet到Snowflake(线程任务)
void import_worker(queue<fs::path> tasks, const Config& cfg, const string& log_path) {
    while (true) {
        fs::path task;
        {
            lock_guard<mutex> guard(queue_mutex);
            if (tasks.empty()) return;
            task = move(tasks.front());
            tasks.pop();
        }

        string db = task.parent_path().filename();
        string table = task.stem();

        string cmd = "snowsql -c " + cfg.snowflake_cfg + " -q \""
                     "COPY INTO " + db + "." + table + " "
                     "FROM @" + cfg.parquet_output_dir + "/" + db + "/" + table + " "
                     "FILE_FORMAT = (TYPE = PARQUET);\"";
        
        if (system(cmd.c_str()) == 0) {
            log_message("SUCCESS: Imported Parquet to " + db + "." + table, log_path);
        } else {
            log_message("ERROR: Failed to import Parquet to " + db + "." + table, log_path);
        }
    }
}

// 多线程导入Parquet
void import_parquet(const Config& cfg, const string& log_path) {
    queue<fs::path> tasks;
    for (const auto& db_entry : fs::directory_iterator(cfg.parquet_output_dir)) {
        for (const auto& table_entry : fs::directory_iterator(db_entry.path())) {
            tasks.push(table_entry.path());
        }
    }

    vector<thread> threads;
    for (int i = 0; i < cfg.import_threads; ++i) {
        threads.emplace_back(import_worker, tasks, ref(cfg), ref(log_path));
    }

    for (auto& t : threads) t.join();
}

int main() {
    try {
        // 初始化配置和日志
        Config cfg = load_config("config.json");
        string log_path = "logs/transfer_" + to_string(time(nullptr)) + ".log";
        fs::create_directories("logs");

        // 执行全流程
        export_hql(cfg, log_path);
        export_parquet(cfg, log_path);
        run_snowsql(cfg, log_path);
        import_parquet(cfg, log_path);

        log_message("ALL OPERATIONS COMPLETED", log_path);
    } catch (const exception& e) {
        cerr << "CRITICAL ERROR: " << e.what() << endl;
        return 1;
    }
    return 0;
}

配置文件示例 (config.json)

{
    "hive_jdbc": "jdbc:hive2://hive-server:10000",
    "directories": {
        "hql_output": "hive_export",
        "parquet_output": "parquet_data",
        "sql_scripts": "sql_scripts"
    },
    "snowflake": {
        "config_path": "~/.snowsql/config"
    },
    "threads": {
        "export": 8,
        "import": 8
    }
}

关键功能说明

  1. HQL导出

    • 使用beeline连接Hive获取所有数据库和表
    • 数据库/表名.hql格式存储建表语句
    • 自动跳过系统表(通过tailawk过滤)
  2. Parquet导出

    • 使用Hive的INSERT OVERWRITE DIRECTORY导出为Parquet格式
    • 多线程处理不同表(线程数由配置控制)
    • 输出路径:parquet_data/数据库/表名/
  3. SnowSQL执行

    • 遍历指定目录的所有.sql文件
    • 使用snowsql -c执行配置文件中的连接
    • 支持认证文件自动加载(需预先配置)
  4. Parquet导入

    • 使用Snowflake的COPY INTO命令
    • 多线程并发导入不同表
    • 自动匹配目录结构与表名
  5. 日志系统

    • 按天分割日志文件(文件名含时间戳)
    • 记录操作类型、状态和时间
    • 线程安全的日志写入
  6. 异常处理

    • 配置文件缺失检测
    • 命令执行状态码检查
    • 目录创建失败处理
    • JSON解析异常捕获

编译与运行

  1. 安装依赖
sudo apt-get install libboost-filesystem-dev nlohmann-json3-dev
  1. 编译程序
g++ -std=c++17 -o hive2snowflake main.cpp -lboost_filesystem -lpthread
  1. 运行程序
./hive2snowflake

注意事项

  1. 需要预先配置:

    • Hive的beeline客户端
    • SnowSQL及认证配置
    • Hive表访问权限
    • Snowflake表结构匹配
  2. 性能调整:

    • 通过config.json调整线程数
    • 大表建议单独处理
    • 可添加重试机制应对网络波动
  3. 安全增强建议:

    • 配置文件加密(如使用jq解密)
    • 敏感信息使用环境变量
    • 添加操作审计日志

网站公告

今日签到

点亮在社区的每一天
去签到