Handling newline characters inside CSV fields when loading CSVs into Hive

Published: 2025-08-15

We unload data from a database into CSV files, load them into Hive on HDFS, and then convert the data from a CSV staging table into the final Parquet table.
But if a CSV field contains a newline character, Hive still splits the record into two rows, even though the field is enclosed in double quotes. After consulting all kinds of documentation without finding a fix, I concluded that Hive simply cannot handle CSV files with newlines inside fields.
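For illustration, here is a minimal sketch of the kind of row that triggers the split (the file name and values are made up):

import csv

# A field containing a newline gets quoted by csv.writer, but Hive
# still treats the physical line break as a record separator.
with open('demo.csv', 'w', newline='') as f:
    csv.writer(f, dialect='excel').writerow(['1', 'line1\nline2', 'ok'])

# demo.csv now holds two physical lines:
# 1,"line1
# line2",ok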
So I came up with a workaround: first run a script that replaces the newlines inside the CSV with the literal string "\n",
as follows:

cat /data/dolphinscheduler/loadOraTable.sh
#!/bin/bash
if [[ $# -lt 5 ]]
then
  echo "usage: $0 oraConnect sql hdfsRoot srcSystem bizDate"
  echo "e.g.: $0  \"yonbip/yonbip@10.16.10.69:1521/orcl\" \"select * from bond_contract\" \"/dmp/biz\" \"bip\" \"2025-07-27\""
  exit 1
fi
checkRst(){
  if [[ $? -ne 0 ]]
  then
    echo "--- check failed"
    exit 1
  else
    echo "--- check success"
  fi
}
# parse arguments
pgConnect=$1
sql=$2
dmpRoot=$3
srcSystem=$4
bizDate=$5
echo "===== got input params:"
echo "pgConnect: $pgConnect"
echo "sql: $sql"
echo "dmpRoot: $dmpRoot"
echo "srcSystem: $srcSystem"
echo "bizDate: $bizDate"
echo "===== parsed params:"
tableName=$(echo "$sql" | awk -F ' from ' '{print $2}' | awk -F ' ' '{print $1}')
if [ -z "$tableName" ]; then
  tableName=$(echo "$sql" | awk -F ' FROM ' '{print $2}' | awk -F ' ' '{print $1}')
fi
if [[ $tableName == *.* ]]
then
  tableName=$(echo $tableName | awk -F '.' '{print $2}')
fi
echo "tableName: $tableName"
echo "===== end of params"

echo "1.尝试删除残留文件"
rm -f ~/${tableName}.csv

echo "2.1 导出数据到csv文件"
pgCmd="python3 /data/dolphinscheduler/oraQueryToCsv.py \"${pgConnect}\" ${tableName} \"$sql\""
echo "command: $pgCmd"
python3 /data/dolphinscheduler/oraQueryToCsv.py "${pgConnect}" ${tableName} "$sql"
checkRst

export dCount=$(awk 'END {print NR}' ~/${tableName}.csv)
echo "Lines in file ~/${tableName}.csv: $dCount"
if (( $dCount < 1 )); then
  rm -f ~/${tableName}.csv
  echo "No data got from database, no need to go ahead."
  exit 61
fi

echo "2.2 9999-12-31去掉时间"
sed -i 's/9999-12-31 23:59:59/9999-12-31/g' ~/${tableName}.csv

echo "2.3 字段中的换行符替换成\n"
python3 /data/dolphinscheduler/removeInColNewLine.py ~/${tableName}.csv

echo "3.尝试清除hdfs旧文件"
hdfs dfs -rm -r ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}

echo "4.尝试创建hdfs文件目录"
hdfs dfs -mkdir -p ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}

echo "5.上传本地文件到hdfs"
hdfs dfs -put ~/${tableName}.csv ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}/
checkRst

echo "6.清除本地临时文件"
rm -f ~/${tableName}.csv
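
The export helper oraQueryToCsv.py that step 2.1 calls is not shown in the post. A minimal sketch of what such an exporter might look like, assuming the python-oracledb driver and the user/password@host:port/service connect-string format from the usage example (the parsing details here are assumptions):

import csv
import os
import sys

import oracledb  # assumption: any Oracle driver with a DB-API cursor works

connectStr, tableName, sql = sys.argv[1], sys.argv[2], sys.argv[3]
userPass, dsn = connectStr.split('@', 1)    # "user/pass@host:port/service"
user, password = userPass.split('/', 1)

# write the result set to ~/<tableName>.csv, as the loader script expects
outPath = os.path.join(os.path.expanduser('~'), tableName + '.csv')
with oracledb.connect(user=user, password=password, dsn=dsn) as conn:
    cursor = conn.cursor()
    cursor.execute(sql)
    with open(outPath, 'w', newline='') as f:
        writer = csv.writer(f, dialect='excel')
        for row in cursor:
            writer.writerow(row)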
cat /data/dolphinscheduler/removeInColNewLine.py
#!/usr/bin/python3
# -*- coding:utf-8 -*-

# handle newline and related special characters (\r\n, \n, \r, \\) in a csv file
# usage: python3 removeInColNewLine.py filepath

import os
import sys
import csv
import codecs
import time

filename = sys.argv[1]

print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), '[', filename.encode('unicode_escape').decode(), '] start processing')

# newline='' leaves newline handling to the csv module, so embedded
# line breaks inside quoted fields reach the reader intact
with open(filename, 'r', newline='') as srcFile, open(filename + '.tmp', 'w', newline='') as dstFile:
    # read the csv file row by row
    fileReader = csv.reader(srcFile, dialect='excel')
    fileWriter = csv.writer(dstFile, dialect='excel')
    colSet = set()
    for d in fileReader:
        for ii, dd in enumerate(d):
            # replace each newline variant with a literal \\n and record
            # the 0-based index of the affected column
            if dd.find('\r\n') != -1:
                dd = dd.replace('\r\n', '\\\\n')
                colSet.add(ii)
            if dd.find('\n') != -1:
                colSet.add(ii)
                dd = dd.replace('\n', '\\\\n')
            if dd.find('\r') != -1:
                colSet.add(ii)
                dd = dd.replace('\r', '\\\\n')
#            if dd.find('\\') != -1:
#                dd = dd.replace('\\', '')
            d[ii] = dd
        fileWriter.writerow(d)
    if len(colSet) > 0:
        print('new line cols:' + str(colSet))

# delete the original file and rename the .tmp file to the original name
os.remove(filename)
os.rename(filename + '.tmp', filename)

print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), '[', filename.encode('unicode_escape').decode(), '] finished processing')

While replacing the newlines found in a column, the script above also records the column index; at the end, if any indexes were collected, it prints one extra line:

new line cols:{x,y}
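
These column numbers are the 0-based positions that enumerate() yields for each csv.reader row, which is what lets restoreNLFunc.sh further below index its bash array directly. A quick illustration with made-up data:

import csv
import io

# the second field (index 1) contains an embedded newline
for row in csv.reader(io.StringIO('a,"x\ny",c\r\n')):
    print({i for i, v in enumerate(row) if '\n' in v or '\r' in v})  # {1}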

A sample run:

Lines in file ~/org_corp.csv: 534
2.2 Strip the time from the 9999-12-31 end-of-time timestamps
2.3 Replace newlines inside fields with \n
2025-08-14 11:40:23 [ /home/hive/org_corp.csv ] start processing
new line cols:{2,7}
2025-08-14 11:40:23 [ /home/hive/org_corp.csv ] finished processing
3. Remove old HDFS files if present
Deleted /dmp/other/tmp/bip/org_corp/2025-08-13
4. Create the HDFS directory
5. Upload the local file to HDFS
--- check success
6. Remove the local temporary file

Then, at the end of the task, filter the line "new line cols:{2,7}" out of the captured output, parse the column-index list 2,7 from it, and publish it through the output parameter newLineColNums.

ret=`/data/dolphinscheduler/loadOraTable.sh "${dbConnect}" "select ${slctColums} from ${SRC_DB}.${tableName} t " "/dmp/${DMP_DB}" "${srcSystem}" "${bizDate}"`
echo "$ret"
colsNum=$(echo "$ret" | grep "new line cols:" | awk -F 'new line cols:{' '{print $2}' | awk -F '}' '{print $1}')
if [ -n "$colsNum" ]
then
  echo "#{setValue(newLineColNums=${colsNum})}"
fi
echo "newLineColNums: $colsNum"

Next, load the CSV into the staging table as before. Then, when pulling data out of the staging table, if the newLineColNums parameter is not empty, wrap the corresponding columns in a replace function that turns the \n back into a real newline.
Task script:

if [ -n "${newLineColNums}" ]
then
  nlRet=`sh /data/dolphinscheduler/restoreNLFunc.sh "${newLineColNums}" "${slctColums}"`
  newLineCols=`echo $nlRet | awk -F 'SQL:' '{print $2}'`
  echo "parsed newLineCols: $newLineCols"
else
  newLineCols="${slctColums}"
fi
hive -e "insert overwrite table ${DMP_DB}.ods_${srcSystem}_${tableName} \
SELECT ${newLineCols} \
from ${DMP_DB}.tmp_${srcSystem}_${tableName} t \
where ${tableId} != '' and ${tableId} is not null;"

Here, slctColums holds the column names, e.g. "t.id, t.code, t.name, t.memo",
and newLineColNums is the parameter output by the previous task, e.g. "1,2".

cat /data/dolphinscheduler/restoreNLFunc.sh
if [[ $# -lt 2 ]]
then
  echo "usage: $0 colNums colsName"
  echo "e.g.: $0 \"1,2\" \"id,is_set,zl_office_id\""
  exit 1
fi

#colsNum=`echo $1 | awk -F 'new line cols:{' '{print $2}' | awk -F '}' '{print $1}'`
colsNum=$1
colNames=$2
colNmArr=(${colNames//,/ })
echo "${colNmArr[@]}"

if [ -n "$colsNum" ]
then
  echo "有列含有换行符:$colsNum"
  colNoArr=(${colsNum//,/ })
  for coln in ${colNoArr[@]}
  do
    echo "col: ${colNmArr[coln]}"
    colNmArr[coln]="replace(${colNmArr[coln]},'\\\\n', '\\n')"
    echo "col: ${colNmArr[coln]}"
  done
fi
colNames=$(IFS=,; echo "${colNmArr[*]}")
echo "SQL:$colNames"

For example:

sh restoreNLFunc.sh "1,2" "col1,col2,col3"
col1 col2 col3
columns containing newlines: 1,2
col: col2
col: replace(col2,'\\\\n','\\n')
col: col3
col: replace(col3,'\\\\n','\\n')
SQL:col1,replace(col2,'\\\\n','\\n'),replace(col3,'\\\\n','\\n')

Finally, a statement like the one below moves the data into the final Parquet table, and the newlines come out correctly.

insert into table1 select col1,replace(col2,'\\\\n','\\n'),replace(col3,'\\\\n','\\n') from tmp_table1
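
The layered escaping is easy to get wrong, so the round trip of the replacement itself can be sanity-checked outside Hive in a few lines (this sketch ignores the extra unescaping the CSV SerDe may apply when the staging table is read):

original = 'line1\nline2'
escaped = original.replace('\n', '\\\\n')   # what removeInColNewLine.py writes
restored = escaped.replace('\\\\n', '\n')   # what the final replace() undoes
assert restored == original
print(escaped)   # line1\\nline2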
