我们从数据库卸数据到csv文件中,再加载到hdfs hive里,然后从csv临时表转换到parquet正式表里。
但如果csv某个字段含有换行符,尽管这个csv字段有双引号括起来了,但Hive还是处理成两行了。查阅了各种文档,都无法解决,于是断定hive是不支持字段里有换行符的csv了。
然后我就想了个办法,先写个脚本把csv中的换行符替换成"\n",
如下:
cat /data/dolphinscheduler/loadOraTable.sh
#!/bin/bash
if [[ $# < 5 ]]
then
echo "usage: $0 oraConnect sql hdfsRoot srcSystem bizDate"
echo "e.g.: $0 \"yonbip/yonbip@10.16.10.69:1521/orcl\" \"select * from bond_contract\" \"/dmp/biz\" \"bip\" \"2025-07-27\""
exit 1
fi
checkRst(){
if [[ $? != 0 ]]
then
echo "--- check failed"
exit 1
else
echo "--- check success"
fi
}
#解析参数
pgConnect=$1
sql=$2
dmpRoot=$3
srcSystem=$4
bizDate=$5
echo "===== got input params:"
echo "pgConnect: $pgConnect"
echo "sql: $sql"
echo "dmpRoot: $dmpRoot"
echo "srcSystem: $srcSystem"
echo "bizDate: $bizDate"
echo "===== parsed params:"
tableName=$(echo $sql | awk -F ' from ' '{print $2}' |awk -F ' ' '{print $1}')
if [ -z $tableName ]; then
tableName=$(echo $sql | awk -F ' FROM ' '{print $2}' |awk -F ' ' '{print $1}')
fi
if [[ $tableName == *.* ]]
then
tableName=$(echo $tableName | awk -F '.' '{print $2}')
fi
echo "tableName: $tableName"
echo "===== end of params"
echo "1.尝试删除残留文件"
rm -f ~/${tableName}.csv
echo "2.1 导出数据到csv文件"
pgCmd="python3 /data/dolphinscheduler/oraQueryToCsv.py \"${pgConnect}\" ${tableName} \"$sql\""
echo "command: $pgCmd"
python3 /data/dolphinscheduler/oraQueryToCsv.py "${pgConnect}" ${tableName} "$sql"
checkRst
export dCount=$(awk 'END {print NR}' ~/${tableName}.csv)
echo "Lines in file ~/${tableName}.csv: $dCount"
if (( $dCount < 1 )); then
rm -f ~/${tableName}.csv
echo "No data got from database, no need to go ahead."
exit 61
fi
echo "2.2 9999-12-31去掉时间"
sed -i 's/9999-12-31 23:59:59/9999-12-31/g' ~/${tableName}.csv
echo "2.3 字段中的换行符替换成\n"
python3 /data/dolphinscheduler/removeInColNewLine.py ~/${tableName}.csv
echo "3.尝试清除hdfs旧文件"
hdfs dfs -rm -r ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}
echo "4.尝试创建hdfs文件目录"
hdfs dfs -mkdir -p ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}
echo "5.上传本地文件到hdfs"
hdfs dfs -put ~/${tableName}.csv ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}/
checkRst
echo "6.清除本地临时文件"
rm -f ~/${tableName}.csv
cat /data/dolphinscheduler/removeInColNewLine.py
#!/usr/bin/python
# -*- coding:utf-8 -*-
#处理csv文件中换行符等特殊字符(\r\n,\n,\r,\\)
#python csv_handler.py filepath
import os
import sys
import csv
import codecs
import time
filename = sys.argv[1]
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),'[', filename.encode('unicode_escape').decode(), ']开始处理')
with open(filename, 'r') as srcFile, open(filename + '.tmp', 'w') as dstFile:
#读取csv文件的每一行
fileReader = csv.reader(srcFile, dialect='excel')
fileWriter = csv.writer(dstFile, dialect='excel')
colSet = set()
for d in list(fileReader):
for ii,dd in enumerate(d):
if dd.find('\r\n') != -1:
dd = dd.replace('\r\n', '\\\\n')
colSet.add(ii)
if dd.find('\n') != -1:
colSet.add(ii)
dd = dd.replace('\n', '\\\\n')
if dd.find('\r') != -1:
colSet.add(ii)
dd = dd.replace('\r', '\\\\n')
# if dd.find('\\') != -1:
# dd = dd.replace('\\', '')
d[ii] = dd
fileWriter.writerow(d)
if len(colSet) > 0:
print('new line cols:' + str(colSet))
dstFile.close()
srcFile.close()
#删除原文件,.tmp文件重命名为原文件
os.remove(filename)
os.rename(filename + '.tmp', filename)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),'[', filename.encode('unicode_escape').decode(), ']处理完成')
上面这个脚本在替换列中含有的换行符时,也记录下列号了,最后如果有列号,出打印出一行:
new line cols:{x,y}
如:
Lines in file ~/org_corp.csv: 534
2.2 9999-12-31去掉时间
2.3 字段中的换行符替换成\n
2025-08-14 11:40:23 [ /home/hive/org_corp.csv ]开始处理
new line cols:{2,7}
2025-08-14 11:40:23 [ /home/hive/org_corp.csv ]处理完成
3.尝试清除hdfs旧文件
Deleted /dmp/other/tmp/bip/org_corp/2025-08-13
4.尝试创建hdfs文件目录
5.上传本地文件到hdfs
--- check success
6.清除本地临时文件
然后在任务最后,从输出信息里滤出“new line cols:{2,7}”这一行,解析出列号list:2,7,输出到参数newLineColNums里面。
ret=`/data/dolphinscheduler/loadOraTable.sh "${dbConnect}" "select ${slctColums} from ${SRC_DB}.${tableName} t " "/dmp/${DMP_DB}" "${srcSystem}" "${bizDate}"`
echo $ret
colsNum=$(echo $ret |grep "new line cols:" | awk -F 'new line cols:{' '{print $2}' | awk -F '}' '{print $1}')
if (( -n $colsNum ))
then
echo "#{setValue(newLineColNums=${colsNum})}"
fi
echo "newLineColNums: $colsNum"
接着先把csv加载到临时表中,然后,从临时表抽取数据时,如果newLineColNums参数不空,则把对应的列加上replace函数,把\n替换回换行符。
任务脚本:
if [ -n "${newLineColNums}" ]
then
nlRet=`sh /data/dolphinscheduler/restoreNLFunc.sh "${newLineColNums}" "${slctColums}"`
newLineCols=`echo $nlRet | awk -F 'SQL:' '{print $2}'`
echo "parsed newLineCols: $newLineCols"
else
newLineCols="${slctColums}"
fi
hive -e "insert overwrite table ${DMP_DB}.ods_${srcSystem}_${tableName} \
SELECT ${newLineCols} \
from ${DMP_DB}.tmp_${srcSystem}_${tableName} t \
where ${tableId} != '' and ${tableId} is not null;"
基中,slctColums是列名,如"t.id, t.code,t.name, t.memo"
newLineColNums是前面任务输出的参数,如"1,2"
cat /data/dolphinscheduler/restoreNLFunc.sh
if [[ $# < 2 ]]
then
echo "usage: $0 colNums colsName"
echo "e.g.: $0 \"new line cols:{1,2}\" \"id,is_set,zl_office_id\""
exit 1
fi
#colsNum=`echo $1 | awk -F 'new line cols:{' '{print $2}' | awk -F '}' '{print $1}'`
colsNum=$1
colNames=$2
colNmArr=(${colNames//,/ })
echo "${colNmArr[@]}"
if [ -n "$colsNum" ]
then
echo "有列含有换行符:$colsNum"
colNoArr=(${colsNum//,/ })
for coln in ${colNoArr[@]}
do
echo "col: ${colNmArr[coln]}"
colNmArr[coln]="replace(${colNmArr[coln]},'\\\\n', '\\n')"
echo "col: ${colNmArr[coln]}"
done
fi
colNames=$(IFS=,; echo "${colNmArr[*]}")
echo "SQL:$colNames"
如:
sh restoreNLFunc.sh "1,2" "col1,col2,col3"
col1 col2 col3
有列含有换行符:1,2
col: col2
col: replace(col2,'\\\\n','\\n')
col: col3
col: replace(col3,'\\\\n','\\n')
SQL:col1,replace(col2,'\\\\n','\\n'),replace(col3,'\\\\n','\\n')
最后再用如下语句转到Parquet格式的正式表中,就能看到正常的换行了。
insert into table1 select col1,replace(col2,'\\\\n','\\n'),replace(col3,'\\\\n','\\n') from tmp_table1