Preface
In part one, Syncing data between Hive, Oracle, and Redis: syncing from other data sources to the local database (click through to it), we already brought the data from the other sources into the local database. Now it is time to push the local data out to the other databases.
The entry method
public void startOut(String prdCode, int modeNum, String taskPrdCode) {
if (CollectionUtils.isEmpty(jobListOut)) {
logger.info("DBSync-out配置读取错误或未配置任务");
return;
}
List<JobInfo> targetJobs = filterOutJob(jobListOut, true);
printOutConfig(taskPrdCode);
final Configuration config = generateConfiguration(targetJobs);
final Map<String, Object> paramMap = getParamMap(taskPrdCode);
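// config carries the per-job SQL templates; paramMap supplies the parameters that generateSql uses to render each job's source SQL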
CompletableFuture<Void>[] futures = new CompletableFuture[targetJobs.size()];
for (int index = 0; index < futures.length; index++) {
JobInfo jobInfo = targetJobs.get(index);
String logTitle = "[" + codeOut + "]-" + jobInfo.getName() + " ";
try {
// invoked via the system listener
Map<String, Object> jobMap = Maps.newHashMap();
jobMap.put("srcDbs", srcDbsOut);
jobMap.put("destDbs", destDbsOut);
jobMap.put("jobInfo", jobInfo);
jobMap.put("logTitle", logTitle);
// run each job asynchronously on the shared executor
futures[index] = CompletableFuture.runAsync(() -> {
logger.info("{},执行多线程-----同步到L0", Thread.currentThread().getName());
try {
String sql = generateSql(config, jobInfo.getName(), paramMap);
logger.info("任务{} 生成srcSql: {}", jobInfo.getName(), sql);
jobInfo.setSrcSql(sql);
} catch (TemplateException | IOException e) {
logger.error("任务{} 生成srcSql失败", jobInfo.getName(), e);
return;
}
jobDBSync.executeOut(jobMap);
}, executor);
} catch (Exception e) {
logger.error(logTitle + " run failed", e);
continue;
}
}
try {
// wait for every job to finish
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures);
// maximum wait time, 2 hours by default
completableFuture.get(kafkaPropertyConfig.getDayInitTimeout(), TimeUnit.HOURS);
if (unifyPushInfo.isEnabled()) {
logger.info("数据同步完成,开始发送消息到平台");
unifyPushUtil.notifyUrl();
logger.info("消息发送到平台完成");
logger.info("延时{}s后,发送kafka消息", unifyPushInfo.getNotifyDelay());
Thread.sleep(unifyPushInfo.getNotifyDelay() * 1000);
}
if (kafkaPropertyConfig.isEnabled()) {
logger.info("数据同步完成,开始发送消息到kafka");
dayInitMsgHandler.send(null, null, prdCode, modeNum);
logger.info("消息发送到kafka完成");
}
} catch (InterruptedException e) {
logger.error("同步数据中断异常", e);
} catch (ExecutionException e) {
logger.error("同步数据执行异常", e);
} catch (TimeoutException e) {
logger.error("同步数据超时", e);
}
}
Key points in the code above
A CompletableFuture array, futures, is created to hold the asynchronous result of each job.
The code iterates over targetJobs and builds a jobMap for each job, containing the source databases, target databases, job info, and log title.
Each job is executed asynchronously with CompletableFuture.runAsync: the source SQL is generated first, then jobDBSync.executeOut performs the sync.
If a job throws during setup, the error is logged and the loop moves on to the next job. Finally, CompletableFuture.allOf waits for all jobs (bounded by the configured timeout) before the platform and Kafka notifications are sent; a stripped-down sketch of this pattern follows.
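To see the fan-out/fan-in structure in isolation, here is a minimal, self-contained sketch of the same pattern. The job list, pool size, two-hour timeout and println notifications are placeholders standing in for the real configuration, not the project's actual values:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FanOutFanInSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<String> jobs = List.of("job-a", "job-b", "job-c");

        // Fan-out: one future per job, all submitted to the same executor.
        CompletableFuture<Void>[] futures = new CompletableFuture[jobs.size()];
        for (int i = 0; i < futures.length; i++) {
            String job = jobs.get(i);
            futures[i] = CompletableFuture.runAsync(() -> {
                // In the real code: render the source SQL, then call jobDBSync.executeOut(jobMap)
                System.out.println(Thread.currentThread().getName() + " syncing " + job);
            }, executor);
        }

        try {
            // Fan-in: wait for every job, but never longer than the configured timeout.
            CompletableFuture.allOf(futures).get(2, TimeUnit.HOURS);
            // Only after all jobs finish: notify downstream (the platform and Kafka in the real code).
            System.out.println("all jobs done, sending notifications");
        } catch (TimeoutException e) {
            System.err.println("sync timed out");
        } finally {
            executor.shutdown();
        }
    }
}

The real method differs mainly in that each task renders its SQL from a template before calling jobDBSync.executeOut, and the completion notifications go to the platform and Kafka rather than stdout.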
Sync strategies for the different targets
Each target database is handled by its own sync strategy; take the Redis sync as an example.
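Both methods below carry @Override, so each target database (Redis here, Hive and Oracle elsewhere) presumably implements a shared strategy contract. The post does not show that interface; the following is only a minimal sketch of what it might look like, with the interface name SyncOutStrategy invented for illustration and the signatures copied from the overridden methods:

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;

// Hypothetical contract inferred from the two overridden methods shown in this post.
public interface SyncOutStrategy {

    // Reads one batch from the source and returns everything the execution step needs
    // (rows or SQL, sync-time watermarks, counters, destination table info, ...).
    Map<String, Object> assembleSQLOut(String srcSql, Connection paramConnection, Connection outConnection,
                                       JobInfo jobInfo, String dialect) throws Exception;

    // Writes the assembled batch to the target (a Redis hash here; a JDBC batch for other dialects).
    void executeSQL(Map<String, Object> returnMap, Connection conn) throws SQLException;
}

With that contract in mind, the Redis strategy's assembleSQLOut implementation is: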
@Override
public Map<String, Object> assembleSQLOut(String srcSql, Connection paramConnection, Connection outConnection,
JobInfo jobInfo, String dialect) throws Exception {
// creation-time sync watermark (cached from the previous batch)
Timestamp create_sync_time = create_sync_time_cache.get();
// update-time sync watermark (cached from the previous batch)
Timestamp up_sync_time = up_sync_time_cache.get();
List<String> columns;
Timestamp timestamp = null;
List<Map<String, Object>> redisSync = Lists.newArrayList();
// incremental insert SQL
String insertSql = null;
// incremental update SQL
String updateSql = null;
Map<String, Object> returnMap = Maps.newHashMap();
Map<String, Map<String, Integer>> numMap = countMap.get() == null ? new HashMap<>() : countMap.get();
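// Per-job counters cached in countMap: markNum == 1 means the current batch was cut off by maxRead and the ResultSet still has rows; rowNum records how many rows have been read so far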
if (numMap.get(jobInfo.getName()) == null) {
HashMap<String, Integer> countMapIn = Maps.newHashMap();
countMapIn.put("markNum", 0);
numMap.put(jobInfo.getName(), countMapIn);
} else {
numMap.get(jobInfo.getName()).replace("markNum", 0);
}
//String uniqueName = this.generateString(6) + "_" + jobInfo.getName();
String[] fields = jobInfo.getSourceTableFields().split(",");
String[] fields_ = jobInfo.getDestTableFields().split(",");
fields = this.trimArrayItem(fields);
if (fields.length == 0 && columnMap.get() == null) {
columns = this.getColumnNameList(paramConnection, jobInfo.getDestTable());
fields = columns.toArray(fields);
fields_ = columns.toArray(fields_);
if (fields != null && fields.length > 0) {
HashMap<String, String[]> columnNameMap = new HashMap<>();
columnNameMap.put(jobInfo.getName(), fields);
columnMap.set(columnNameMap);
}
}
String[] conditionFields = jobInfo.getDestTableCondition().split(",");
// String destTable = jobInfo.getDestTable();
// String destTableKey = jobInfo.getDestTableKey();
conditionFields = this.trimArrayItem(conditionFields);
QueryWrapper<DBSyncJob> DBSyncJobWrapper = new QueryWrapper<>();
DBSyncJobWrapper.eq("JOB_NAME", jobInfo.getName());
List<DBSyncJob> dbSyncJobs = dbSyncJobService.getBaseMapper().selectList(DBSyncJobWrapper);
if (CollectionUtil.isNotEmpty(dbSyncJobs)) {
if (!srcSql.contains("where") && !srcSql.contains("WHERE")) {
srcSql += " where 1=1 ";
}
}
PreparedStatement pst = null;
ResultSet rs = null;
long count_insert_add = 0;
long count_update_add = 0;
long count_all = 0;
// cursor position within the ResultSet
Integer countBatch = 0;
// incremental branch; currently only full sync is supported (insertSql is never assigned above, so this branch does not run)
if (StringUtils.isNotBlank(insertSql)) {
pst = paramConnection.prepareStatement(insertSql);
rs = pst.executeQuery();
if (fields.length == 0) {
int columnCount = rs.getMetaData().getColumnCount();
fields = new String[columnCount];
fields_ = new String[columnCount];
for (int i = 1; i <= columnCount; i++) {
String columnName = rs.getMetaData().getColumnName(i);
fields[i - 1] = columnName;
String[] targetFields = columnName.split("[.]"); // escape the dot: "." alone is a regex matching any character
fields_[i - 1] = targetFields.length == 2 ? targetFields[1] : columnName; // keep only the column part of "table.column"
}
}
while (rs.next()) {
Map<String, Object> redisMap = Maps.newHashMap();
for (int index = 0; index < fields.length; index++) {
redisMap.put(fields_[index], this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect));
if (StringUtils.equals(fields[index], conditionFields[0])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
}
if (StringUtils.equals(fields[index], conditionFields[1])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
}
}
redisSync.add(redisMap);
count_insert_add++;
}
pst = paramConnection.prepareStatement(updateSql);
rs = pst.executeQuery();
// StringBuffer update_sql = new StringBuffer(16);
while (rs.next()) {
Map<String, Object> redisMap = Maps.newHashMap();
for (int index = 0; index < fields.length; index++) {
//update_sql.append(fields_[index]+" = "+this.copyValueFromSourceDb(rs.getObject(fields[index]),dialect)).append(index == (fields.length - 1) ? "" : ",");
redisMap.put(fields_[index], this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect));
if (StringUtils.equals(fields[index], conditionFields[0])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
}
if (StringUtils.equals(fields[index], conditionFields[1])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
}
}
redisSync.add(redisMap);
count_update_add++;
}
} else { // full sync
log.info("作业:{} - 获取源库的sql:{}", jobInfo.getName(), srcSql);
if (rsMap.get() == null || rsMap.get().get(jobInfo.getName()) == null) {
pst = paramConnection.prepareStatement(srcSql);
pstMap.set(pst);
pst.setFetchSize(5000);
rs = pst.executeQuery();
if (fields.length == 0) {
int columnCount = rs.getMetaData().getColumnCount();
fields = new String[columnCount];
fields_ = new String[columnCount];
for (int i = 1; i <= columnCount; i++) {
String columnName = rs.getMetaData().getColumnName(i);
fields[i - 1] = columnName;
String[] targetFields = columnName.split("[.]");
if (targetFields.length == 2) { // "table.column" format
fields_[i - 1] = targetFields[1];
} else {
fields_[i - 1] = columnName;
}
}
HashMap<String, String[]> columnNameMap = new HashMap<>();
columnNameMap.put(jobInfo.getName(), fields);
columnMap.set(columnNameMap);
}
Map<String, ResultSet> rsMapIn = Maps.newHashMap();
rsMapIn.put(jobInfo.getName(), rs);
rsMap.set(rsMapIn);
} else {
rs = rsMap.get().get(jobInfo.getName());
pst = pstMap.get();
Map<String, String[]> stringMap = columnMap.get();
if (stringMap != null) {
String[] columsNames = stringMap.get(jobInfo.getName());
if (columsNames != null) {
fields = columsNames;
fields_ = new String[columsNames.length];
for (int i = 0; i < columsNames.length; i++) {
String[] targetFields = columsNames[i].split("[.]");
if (targetFields.length == 2) { // "table.column" format
fields_[i] = targetFields[1];
} else {
fields_[i] = columsNames[i];
}
}
}
}
}
// pst = paramConnection.prepareStatement(srcSql);
// rs = pst.executeQuery();
if (numMap.get(jobInfo.getName()).get("rowNum") != null && numMap.get(jobInfo.getName()).get("rowNum") > 0) {
countBatch = numMap.get(jobInfo.getName()).get("rowNum");
// rs.absolute(countBatch);
}
// StringBuffer src_sql = new StringBuffer(16);
while (rs.next()) {
Map<String, Object> redisMap = Maps.newHashMap();
for (int index = 0; index < fields.length; index++) {
redisMap.put(fields_[index], this.copyValueFromSourceDb(getObject(rs, fields[index], jobInfo), dialect));
if (conditionFields.length > 0) {
if (StringUtils.equals(fields[index], conditionFields[0])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
create_sync_time = create_sync_time == null ? timestamp : timestamp.before(create_sync_time) ? create_sync_time : timestamp;
}
if (StringUtils.equals(fields[index], conditionFields[1])) {
timestamp = getTimestamp(rs, fields[index], jobInfo);
up_sync_time = up_sync_time == null ? timestamp : timestamp.before(up_sync_time) ? up_sync_time : timestamp;
}
}
}
redisSync.add(redisMap);
count_all++;
countBatch++;
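// Cut the batch off after maxRead rows: remember the cursor position and flag the batch as unfinished so the next call picks up the same ResultSet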
if (count_all % maxRead == 0) {
numMap.get(jobInfo.getName()).put("rowNum", countBatch);
numMap.get(jobInfo.getName()).put("markNum", 1);
break;
}
}
}
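// Only release the statement and result set once the whole query has been consumed (markNum == 0); otherwise keep them open for the next batch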
if (rs != null && numMap.get(jobInfo.getName()).get("markNum") == 0) {
rs.close();
}
if (pst != null && numMap.get(jobInfo.getName()).get("markNum") == 0) {
pst.close();
}
if (count_insert_add + count_update_add + count_all > 0) {
if (numMap.get(jobInfo.getName()).get("markNum") > 0) {
log.info("作业:{} - 本次处理记录数:{},累计处理记录数:{}", jobInfo.getName(), count_all, countBatch);
// cache the watermarks for the next batch
create_sync_time_cache.set(create_sync_time);
up_sync_time_cache.set(up_sync_time);
} else {
log.info("作业:{} - 最后一次处理记录数:{},累计处理记录数:{}", jobInfo.getName(), count_all, countBatch);
returnMap.put("createSyncTime", create_sync_time);
returnMap.put("upSyncTime", up_sync_time);
create_sync_time_cache.remove();
up_sync_time_cache.remove();
}
countMap.set(numMap);
returnMap.put("dialect", dialect);
returnMap.put("sql", redisSync);
returnMap.put("dbSyncJobs", dbSyncJobs);
returnMap.put("jobName", jobInfo.getName());
returnMap.put("destTable", jobInfo.getDestTable());
returnMap.put("destTableKey", jobInfo.getDestTableKey());
// keep it simple: only report the row count of the current full-sync batch
returnMap.put("count", count_all);
return returnMap;
}
return new HashMap<>();
}
The execution step, executeSQL, is as follows
@Override
public void executeSQL(Map<String, Object> returnMap, Connection conn) throws SQLException {
String dialect = (String) returnMap.get("dialect");
Timestamp create_sync_time = (Timestamp) returnMap.get("createSyncTime");
Timestamp up_sync_time = (Timestamp) returnMap.get("upSyncTime");
List<DBSyncJob> dbSyncJobs = (List<DBSyncJob>) returnMap.get("dbSyncJobs");
String jobName = (String) returnMap.get("jobName");
log.info("作业:{} - 执行SQL开始...", jobName);
String destTable = (String) returnMap.get("destTable");
String destTableKey = (String) returnMap.get("destTableKey");
HashOperations<String, Object, Object> hashOperations = redisTemplateWithADC.opsForHash();
long count = 0;
if (dialect.equals(Constants.TYPE_DB_REDIS)) {
List<Map<String, Object>> redisSync = (List<Map<String, Object>>) returnMap.get("sql");
Map<String, byte[]> tempData = new HashMap<>();
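// Each row becomes one field of a Redis hash named after the destination table: the field key comes from destTableKey (or a random UUID when no key is configured), the value is the row serialized as JSON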
for (Map<String, Object> map : redisSync) {
map.remove("ignore_rw");//忽略特殊字段 暂时写死字段名称
String data = JSONObject.toJSONString(map);
if (StringUtils.isBlank(destTableKey)) {
tempData.put(UUID.randomUUID().toString(), data.getBytes());
} else {
tempData.put(getHashKey(map, destTableKey), data.getBytes());
}
// log.info("write data from table "+destTable+" to redis:{}", data);
count++;
}
hashOperations.putAll(destTable.toLowerCase(), tempData);
} else { // TODO: Redis over a JDBC-protocol connection; confirm how to use it
String sql = (String) returnMap.get("sql");
Statement pst = conn.createStatement();
String[] sqlList = sql.split(";");
for (int index = 0; index < sqlList.length; index++) {
pst.addBatch(sqlList[index]);
}
pst.executeBatch();
conn.commit();
pst.close();
count++;
}
// persist this run's sync watermarks to the job record
if (count > 0) {
DBSyncJob dbSyncJob = CollectionUtil.isNotEmpty(dbSyncJobs) ? dbSyncJobs.get(0) : new DBSyncJob();
dbSyncJob.setDmCreatedTime(create_sync_time);
dbSyncJob.setDmUpdatedTime(up_sync_time);
dbSyncJob.setJobName(jobName);
dbSyncJobService.saveOrUpdate(dbSyncJob);
}
}
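After executeSQL runs, each destination table lives in Redis as a single hash keyed by the lower-cased table name, with one field per row and a JSON-serialized row as the value. As a rough illustration of how such a row could be read back, assuming a RedisTemplate wired like the writer's redisTemplateWithADC and fastjson on the classpath (both assumptions; the actual configuration is not shown in the post):

import java.util.Collections;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;

public class RedisSyncReadBack {

    private final RedisTemplate<String, Object> redisTemplate;

    public RedisSyncReadBack(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Reads one synced row back from the hash named after the destination table.
    public Map<String, Object> readRow(String destTable, String keyValue) {
        HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();
        Object raw = hash.get(destTable.toLowerCase(), keyValue);
        if (raw == null) {
            return Collections.emptyMap();
        }
        // The writer stored JSON bytes; what comes back depends on the template's hash value serializer.
        String json = raw instanceof byte[] ? new String((byte[]) raw) : raw.toString();
        return JSON.parseObject(json);
    }
}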
The Hive and Oracle strategies are quite long, so they are not listed one by one here; follow me and send a private message to get the source code.
That wraps up part two, Syncing data between Hive, Oracle, and Redis: syncing local data out to other databases. Give it a follow so you don't get lost, folks!