Syncing Data Between Hive, Oracle, and Redis, Part 2: Syncing Local Data to Other Databases

Published: 2025-02-10

Preface
In Part 1 of this series, Syncing Data Between Hive, Oracle, and Redis: Syncing from Other Data Sources to the Local Database, we already synced data from the other sources into the local database. Now it is time to push the local data back out to the other databases.
Entry method

 public void startOut(String prdCode, int modeNum, String taskPrdCode) {
        if (CollectionUtils.isEmpty(jobListOut)) {
            logger.info("DBSync-out配置读取错误或未配置任务");
            return;
        }

        List<JobInfo> targetJobs = filterOutJob(jobListOut, true);
        printOutConfig(taskPrdCode);
        final Configuration config = generateConfiguration(targetJobs);
        final Map<String, Object> paramMap = getParamMap(taskPrdCode);

        CompletableFuture<Void>[] futures = new CompletableFuture[targetJobs.size()];
        for (int index = 0; index < futures.length; index++) {
            JobInfo jobInfo = targetJobs.get(index);
            String logTitle = "[" + codeOut + "]-" + jobInfo.getName() + " ";
            try {
                // invoked via the system-listener style: pack the call parameters into a map
                Map<String, Object> jobMap = Maps.newHashMap();
                jobMap.put("srcDbs", srcDbsOut);
                jobMap.put("destDbs", destDbsOut);
                jobMap.put("jobInfo", jobInfo);
                jobMap.put("logTitle", logTitle);

                // run each job asynchronously on the shared executor
                futures[index] = CompletableFuture.runAsync(() -> {
                    logger.info("{} running async sync to L0", Thread.currentThread().getName());

                    try {
                        String sql = generateSql(config, jobInfo.getName(), paramMap);
                        logger.info("Job {} generated srcSql: {}", jobInfo.getName(), sql);
                        jobInfo.setSrcSql(sql);
                    } catch (TemplateException | IOException e) {
                        logger.error("Job {} failed to generate srcSql", jobInfo.getName(), e);
                        return;
                    }
                    jobDBSync.executeOut(jobMap);
                }, executor);
            } catch (Exception e) {
                logger.error(logTitle + " run failed", e);
                // fill the slot so CompletableFuture.allOf below does not hit a null element
                futures[index] = CompletableFuture.completedFuture(null);
            }
        }

        try {
            // wait for all jobs to finish
            CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures);
            // maximum wait time; defaults to 2 hours
            completableFuture.get(kafkaPropertyConfig.getDayInitTimeout(), TimeUnit.HOURS);
            if (unifyPushInfo.isEnabled()) {
                logger.info("Data sync complete, notifying the platform");
                unifyPushUtil.notifyUrl();
                logger.info("Platform notification sent");
                logger.info("Sending the kafka message after a {}s delay", unifyPushInfo.getNotifyDelay());
                Thread.sleep(unifyPushInfo.getNotifyDelay() * 1000L);
            }
            if (kafkaPropertyConfig.isEnabled()) {
                logger.info("Data sync complete, sending message to kafka");
                dayInitMsgHandler.send(null, null, prdCode, modeNum);
                logger.info("Kafka message sent");
            }
        } catch (InterruptedException e) {
            logger.error("Data sync interrupted", e);
        } catch (ExecutionException e) {
            logger.error("Data sync execution error", e);
        } catch (TimeoutException e) {
            logger.error("Data sync timed out", e);
        }
    }

Explanation of the key code above
A CompletableFuture array, futures, is created to hold the asynchronous result of each job.
For each job in targetJobs, a jobMap is built containing the source databases, target databases, the job info, and a log title.
Each job is executed asynchronously with CompletableFuture.runAsync: the source SQL is generated first, then jobDBSync.executeOut performs the actual sync (see the stand-alone sketch of this fan-out/fan-in pattern below).
If scheduling a job throws an exception, the error is logged and the loop continues with the next job; the main thread then waits on all futures with CompletableFuture.allOf and a timeout.
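Below is a minimal, self-contained sketch of that fan-out/fan-in pattern. The job names, pool size, and doSync body are hypothetical placeholders, not code from the project:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FanOutFanInDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);  // shared pool, like the injected executor
        List<String> jobs = List.of("job_a", "job_b", "job_c");      // hypothetical job names

        CompletableFuture<Void>[] futures = new CompletableFuture[jobs.size()];
        for (int i = 0; i < futures.length; i++) {
            String job = jobs.get(i);
            // fan-out: each job runs asynchronously on the pool
            futures[i] = CompletableFuture.runAsync(() -> doSync(job), executor);
        }

        // fan-in: wait for every job, but never longer than 2 hours
        CompletableFuture.allOf(futures).get(2, TimeUnit.HOURS);
        executor.shutdown();
    }

    private static void doSync(String job) {  // stands in for jobDBSync.executeOut
        System.out.println(Thread.currentThread().getName() + " syncing " + job);
    }
}

In the real startOut, the lambda first generates the source SQL with generateSql before calling jobDBSync.executeOut, and the overall wait is bounded by kafkaPropertyConfig.getDayInitTimeout().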
The different sync approaches are implemented as strategies, one per target dialect.
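Both assembleSQLOut and executeSQL are marked @Override in the code below, so each dialect plugs into a common contract. The post does not show that contract, so here is a minimal sketch of what it could look like; the name DbSyncStrategy is a hypothetical placeholder, and JobInfo is the project's own job-configuration class:

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;

// Hypothetical name for the per-dialect strategy contract; the two method
// signatures mirror the @Override methods of the Redis implementation below.
public interface DbSyncStrategy {

    // Read from the source connection and assemble everything the writer needs
    // (rows, sync watermarks, destination table and key, ...).
    Map<String, Object> assembleSQLOut(String srcSql, Connection paramConnection, Connection outConnection,
                                       JobInfo jobInfo, String dialect) throws Exception;

    // Write the assembled batch to the target store (Redis, Oracle, Hive, ...).
    void executeSQL(Map<String, Object> returnMap, Connection conn) throws SQLException;
}

The Hive and Oracle variants mentioned at the end of the post would be additional implementations of the same two methods.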
For example, the Redis implementation:

@Override
    public Map<String, Object> assembleSQLOut(String srcSql, Connection paramConnection, Connection outConnection,
                                              JobInfo jobInfo, String dialect) throws Exception {
        // cached creation-time sync watermark
        Timestamp create_sync_time = create_sync_time_cache.get();
        // cached update-time sync watermark
        Timestamp up_sync_time = up_sync_time_cache.get();

        List<String> columns;

        Timestamp timestamp = null;

        List<Map<String, Object>> redisSync = Lists.newArrayList();

        // insert SQL (incremental path)
        String insertSql = null;
        // update SQL (incremental path)
        String updateSql = null;

        Map<String, Object> returnMap = Maps.newHashMap();

        Map<String, Map<String, Integer>> numMap = countMap.get() == null ? new HashMap<>() : countMap.get();
        if (numMap.get(jobInfo.getName()) == null) {
            HashMap<String, Integer> countMapIn = Maps.newHashMap();
            countMapIn.put("markNum", 0);
            numMap.put(jobInfo.getName(), countMapIn);
        } else {
            numMap.get(jobInfo.getName()).replace("markNum", 0);
        }

        //String uniqueName = this.generateString(6) + "_" + jobInfo.getName();

        String[] fields = jobInfo.getSourceTableFields().split(",");
        String[] fields_ = jobInfo.getDestTableFields().split(",");
        fields = this.trimArrayItem(fields);
        if (fields.length == 0 && columnMap.get() == null) {
            columns = this.getColumnNameList(paramConnection, jobInfo.getDestTable());
            fields = columns.toArray(fields);
            fields_ = columns.toArray(fields_);
            if (fields != null && fields.length > 0) {
                HashMap columnNameMap = new HashMap();
                columnNameMap.put(jobInfo.getName(), fields);
                columnMap.set(columnNameMap);
            }
        }
        String[] conditionFields = jobInfo.getDestTableCondition().split(",");
//        String destTable = jobInfo.getDestTable();
//        String destTableKey = jobInfo.getDestTableKey();
        conditionFields = this.trimArrayItem(conditionFields);
        QueryWrapper<DBSyncJob> DBSyncJobWrapper = new QueryWrapper<>();
        DBSyncJobWrapper.eq("JOB_NAME", jobInfo.getName());
        List<DBSyncJob> dbSyncJobs = dbSyncJobService.getBaseMapper().selectList(DBSyncJobWrapper);
        if (CollectionUtil.isNotEmpty(dbSyncJobs)) {
            if (!srcSql.contains("where") && !srcSql.contains("WHERE")) {
                srcSql += " where 1=1 ";
            }
        }
        PreparedStatement pst = null;
        ResultSet rs = null;
        long count_insert_add = 0;
        long count_update_add = 0;
        long count_all = 0;
        // cursor position within the ResultSet
        Integer countBatch = 0;
        // incremental path; currently only full sync is supported, so insertSql is always blank
        if (StringUtils.isNotBlank(insertSql)) {
            pst = paramConnection.prepareStatement(insertSql);
            rs = pst.executeQuery();
            if (fields.length == 0) {
                int columnCount = rs.getMetaData().getColumnCount();
                fields = new String[columnCount];
                fields_ = new String[columnCount];
                for (int i = 1; i <= columnCount; i++) {
                    String columnName = rs.getMetaData().getColumnName(i);
                    fields[i - 1] = columnName;
                    // split("[.]") escapes the dot; split(".") would treat it as a regex wildcard and return an empty array
                    String[] targetFields = columnName.split("[.]");
                    fields_[i - 1] = targetFields.length == 2 ? targetFields[1] : columnName; // table.column -> column
                }
            }
            while (rs.next()) {
                Map<String, Object> redisMap = Maps.newHashMap();
                for (int index = 0; index < fields.length; index++) {
                    redisMap.put(fields_[index], this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect));
                    if (StringUtils.equals(fields[index], conditionFields[0])) {
                        timestamp = getTimestamp(rs, fields[index], jobInfo);
                        create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
                    }
                    if (StringUtils.equals(fields[index], conditionFields[1])) {
                        timestamp = getTimestamp(rs, fields[index], jobInfo);
                        up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
                    }
                }
                redisSync.add(redisMap);
                count_insert_add++;
            }
            pst = paramConnection.prepareStatement(updateSql);
            rs = pst.executeQuery();
            //         StringBuffer update_sql = new StringBuffer(16);
            while (rs.next()) {
                Map<String, Object> redisMap = Maps.newHashMap();
                for (int index = 0; index < fields.length; index++) {
                    //update_sql.append(fields_[index]+" = "+this.copyValueFromSourceDb(rs.getObject(fields[index]),dialect)).append(index == (fields.length - 1) ? "" : ",");
                    redisMap.put(fields_[index], this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect));
                    if (StringUtils.equals(fields[index], conditionFields[0])) {
                        timestamp = getTimestamp(rs, fields[index], jobInfo);
                        create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
                    }
                    if (StringUtils.equals(fields[index], conditionFields[1])) {
                        timestamp = getTimestamp(rs, fields[index], jobInfo);
                        up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
                    }
                }
                redisSync.add(redisMap);
                count_update_add++;
            }
        } else {// full sync
            log.info("Job: {} - source SQL: {}", jobInfo.getName(), srcSql);
            if (rsMap.get() == null || rsMap.get().get(jobInfo.getName()) == null) {
                pst = paramConnection.prepareStatement(srcSql);
                pstMap.set(pst);
                pst.setFetchSize(5000);
                rs = pst.executeQuery();
                if (fields.length == 0) {
                    int columnCount = rs.getMetaData().getColumnCount();
                    fields = new String[columnCount];
                    fields_ = new String[columnCount];
                    for (int i = 1; i <= columnCount; i++) {
                        String columnName = rs.getMetaData().getColumnName(i);
                        fields[i - 1] = columnName;
                        String[] targetFields = columnName.split("[.]");
                        if (targetFields.length == 2) {// table.column format
                            fields_[i - 1] = targetFields[1];
                        } else {
                            fields_[i - 1] = columnName;
                        }
                    }
                    HashMap columnNameMap = new HashMap();
                    columnNameMap.put(jobInfo.getName(), fields);
                    columnMap.set(columnNameMap);
                }
                Map<String, ResultSet> rsMapIn = Maps.newHashMap();
                rsMapIn.put(jobInfo.getName(), rs);
                rsMap.set(rsMapIn);
            } else {
                rs = rsMap.get().get(jobInfo.getName());
                pst = pstMap.get();
                Map<String, String[]> stringMap = columnMap.get();
                if (stringMap != null) {
                    String[] columsNames = stringMap.get(jobInfo.getName());
                    if (columsNames != null) {
                        fields = columsNames;
                        fields_ = new String[columsNames.length];
                        for (int i = 0; i < columsNames.length; i++) {
                            String[] targetFields = columsNames[i].split("[.]");
                            if (targetFields.length == 2) {// table.column format
                                fields_[i] = targetFields[1];
                            } else {
                                fields_[i] = columsNames[i];
                            }
                        }
                    }
                }
            }
//            pst = paramConnection.prepareStatement(srcSql);
//            rs = pst.executeQuery();
            if (numMap.get(jobInfo.getName()).get("rowNum") != null && numMap.get(jobInfo.getName()).get("rowNum") > 0) {
                countBatch = numMap.get(jobInfo.getName()).get("rowNum");
//                rs.absolute(countBatch);
            }
            //        StringBuffer src_sql = new StringBuffer(16);
            while (rs.next()) {
                Map<String, Object> redisMap = Maps.newHashMap();
                for (int index = 0; index < fields.length; index++) {
                    redisMap.put(fields_[index], this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect));
                    if (conditionFields.length > 0) {
                        if (StringUtils.equals(fields[index], conditionFields[0])) {
                            timestamp = getTimestamp(rs, fields[index], jobInfo);
                            create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
                        }
                        if (StringUtils.equals(fields[index], conditionFields[1])) {
                            timestamp = getTimestamp(rs, fields[index], jobInfo);
                            up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
                        }
                    }
                }
                redisSync.add(redisMap);
                count_all++;
                countBatch++;
                if (count_all % maxRead == 0) {
                    numMap.get(jobInfo.getName()).put("rowNum", countBatch);
                    numMap.get(jobInfo.getName()).put("markNum", 1);
                    break;
                }
            }
        }

        if (rs != null && numMap.get(jobInfo.getName()).get("markNum") == 0) {
            rs.close();
        }
        if (pst != null && numMap.get(jobInfo.getName()).get("markNum") == 0) {
            pst.close();
        }
        if (count_insert_add + count_update_add + count_all > 0) {
            if (numMap.get(jobInfo.getName()).get("markNum") > 0) {
                log.info("作业:{} - 本次处理记录数:{},累计处理记录数:{}", jobInfo.getName(), count_all, countBatch);
                //缓存同步时间
                create_sync_time_cache.set(create_sync_time);
                up_sync_time_cache.set(up_sync_time);
            } else {
                log.info("作业:{} - 最后一次处理记录数:{},累计处理记录数:{}", jobInfo.getName(), count_all, countBatch);
                returnMap.put("createSyncTime", create_sync_time);
                returnMap.put("upSyncTime", up_sync_time);
                create_sync_time_cache.remove();
                up_sync_time_cache.remove();
            }
            countMap.set(numMap);
            returnMap.put("dialect", dialect);
            returnMap.put("sql", redisSync);
            returnMap.put("dbSyncJobs", dbSyncJobs);
            returnMap.put("jobName", jobInfo.getName());
            returnMap.put("destTable", jobInfo.getDestTable());
            returnMap.put("destTableKey", jobInfo.getDestTableKey());
            // for simplicity, only the row count of the current full-sync batch is recorded
            returnMap.put("count", count_all);
            return returnMap;
        }
        return new HashMap<>();
    }

The assembled result is then written to the target; for Redis, executeSQL looks like this:

@Override
    public void executeSQL(Map<String, Object> returnMap, Connection conn) throws SQLException {
        String dialect = (String) returnMap.get("dialect");
        Timestamp create_sync_time = (Timestamp) returnMap.get("createSyncTime");
        Timestamp up_sync_time = (Timestamp) returnMap.get("upSyncTime");
        List<DBSyncJob> dbSyncJobs = (List<DBSyncJob>) returnMap.get("dbSyncJobs");
        String jobName = (String) returnMap.get("jobName");
        log.info("作业:{} - 执行SQL开始...", jobName);
        String destTable = (String) returnMap.get("destTable");
        String destTableKey = (String) returnMap.get("destTableKey");
        HashOperations<String, Object, Object> hashOperations = redisTemplateWithADC.opsForHash();
        long count = 0;
        if (dialect.equals(Constants.TYPE_DB_REDIS)) {
            List<Map<String, Object>> redisSync = (List<Map<String, Object>>) returnMap.get("sql");
            Map<String, byte[]> tempData = new HashMap<>();
            for (Map<String, Object> map : redisSync) {
                map.remove("ignore_rw");//忽略特殊字段 暂时写死字段名称
                String data = JSONObject.toJSONString(map);
                if (StringUtils.isBlank(destTableKey)) {
                    tempData.put(UUID.randomUUID().toString(), data.getBytes());
                } else {
                    tempData.put(getHashKey(map, destTableKey), data.getBytes());
                }
//                log.info("write data from table "+destTable+" to redis:{}", data);
                count++;
            }
            hashOperations.putAll(destTable.toLowerCase(), tempData);
        } else {// todo: Redis also supports JDBC-style connections; confirm how to use them
            String sql = (String) returnMap.get("sql");
            Statement pst = conn.createStatement();
            String[] sqlList = sql.split(";");
            for (int index = 0; index < sqlList.length; index++) {
                pst.addBatch(sqlList[index]);
            }
            pst.executeBatch();
            conn.commit();
            pst.close();
            count++;
        }
        // record this sync run in the database
        if (count > 0) {
            DBSyncJob dbSyncJob = CollectionUtil.isNotEmpty(dbSyncJobs) ? dbSyncJobs.get(0) : new DBSyncJob();
            dbSyncJob.setDmCreatedTime(create_sync_time);
            dbSyncJob.setDmUpdatedTime(up_sync_time);
            dbSyncJob.setJobName(jobName);
            dbSyncJobService.saveOrUpdate(dbSyncJob);
        }
    }
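The rows written above land in a single Redis hash whose key is the lowercased destination table name; each hash entry is one row serialized as JSON bytes, keyed by destTableKey (or a random UUID when no key is configured). Below is a minimal sketch of how a consumer might read that hash back, assuming a RedisTemplate configured like redisTemplateWithADC; the class, method, and table names here are hypothetical, and whether a value comes back as byte[] or something else depends on the configured hash value serializer:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;

public class RedisSyncReader {

    private final RedisTemplate<String, Object> redisTemplate; // assumed to be configured like redisTemplateWithADC

    public RedisSyncReader(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Dump every synced row of one destination table, e.g. readTable("t_user")
    public void readTable(String destTable) {
        HashOperations<String, Object, Object> ops = redisTemplate.opsForHash();
        Map<Object, Object> rows = ops.entries(destTable.toLowerCase());
        for (Map.Entry<Object, Object> row : rows.entrySet()) {
            // each value was written as the JSON bytes of one source row;
            // adjust this cast if the template's hash value serializer differs
            byte[] raw = (byte[]) row.getValue();
            System.out.println(row.getKey() + " -> " + new String(raw, StandardCharsets.UTF_8));
        }
    }
}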

The HIVE and Oracle implementations are quite long, so they are not listed here one by one; follow me and send a private message to get the source code.

That wraps up Syncing Data Between Hive, Oracle, and Redis, Part 2: Syncing Local Data to Other Databases. Give it a follow so you don't miss the rest, folks!

