当前位置: 首页 > news >正文

hive加载csv中字段含有换行符的处理方法

我们从数据库卸数据到csv文件中,再加载到hdfs hive里,然后从csv临时表转换到parquet正式表里。
但如果csv某个字段含有换行符,尽管这个csv字段有双引号括起来了,但Hive还是处理成两行了。查阅了各种文档,都无法解决,于是断定hive是不支持字段里有换行符的csv了。
然后我就想了个办法,先写个脚本把csv中的换行符替换成"\n",
如下:

cat /data/dolphinscheduler/loadOraTable.sh
#!/bin/bash
if [[ $# < 5 ]]
thenecho "usage: $0 oraConnect sql hdfsRoot srcSystem bizDate"echo "e.g.: $0  \"yonbip/yonbip@10.16.10.69:1521/orcl\" \"select * from bond_contract\" \"/dmp/biz\" \"bip\" \"2025-07-27\""exit 1
fi
checkRst(){if [[ $? != 0 ]]thenecho "--- check failed"exit 1elseecho "--- check success"fi
}
#解析参数
pgConnect=$1
sql=$2
dmpRoot=$3
srcSystem=$4
bizDate=$5
echo "===== got input params:"
echo "pgConnect: $pgConnect"
echo "sql: $sql"
echo "dmpRoot: $dmpRoot"
echo "srcSystem: $srcSystem"
echo "bizDate: $bizDate"
echo "===== parsed params:"
tableName=$(echo $sql | awk -F ' from ' '{print $2}' |awk -F ' ' '{print $1}')
if [ -z $tableName ]; thentableName=$(echo $sql | awk -F ' FROM ' '{print $2}' |awk -F ' ' '{print $1}')
fi
if [[ $tableName == *.* ]]
thentableName=$(echo $tableName | awk -F '.' '{print $2}')
fi
echo "tableName: $tableName"
echo "===== end of params"echo "1.尝试删除残留文件"
rm -f ~/${tableName}.csvecho "2.1 导出数据到csv文件"
pgCmd="python3 /data/dolphinscheduler/oraQueryToCsv.py \"${pgConnect}\" ${tableName} \"$sql\""
echo "command: $pgCmd"
python3 /data/dolphinscheduler/oraQueryToCsv.py "${pgConnect}" ${tableName} "$sql"
checkRstexport dCount=$(awk 'END {print NR}' ~/${tableName}.csv)
echo "Lines in file ~/${tableName}.csv: $dCount"
if (( $dCount < 1 )); thenrm -f ~/${tableName}.csvecho "No data got from database, no need to go ahead."exit 61
fiecho "2.2 9999-12-31去掉时间"
sed -i 's/9999-12-31 23:59:59/9999-12-31/g' ~/${tableName}.csvecho "2.3 字段中的换行符替换成\n"
python3 /data/dolphinscheduler/removeInColNewLine.py ~/${tableName}.csvecho "3.尝试清除hdfs旧文件"
hdfs dfs -rm -r ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}echo "4.尝试创建hdfs文件目录"
hdfs dfs -mkdir -p ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}echo "5.上传本地文件到hdfs"
hdfs dfs -put ~/${tableName}.csv ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}/
checkRstecho "6.清除本地临时文件"
rm -f ~/${tableName}.csv
cat /data/dolphinscheduler/removeInColNewLine.py
#!/usr/bin/python
# -*- coding:utf-8 -*-#处理csv文件中换行符等特殊字符(\r\n,\n,\r,\\)
#python csv_handler.py filepathimport os
import sys
import csv
import codecs
import timefilename = sys.argv[1]print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),'[', filename.encode('unicode_escape').decode(), ']开始处理')with open(filename, 'r') as srcFile, open(filename + '.tmp', 'w') as dstFile:#读取csv文件的每一行fileReader = csv.reader(srcFile, dialect='excel')fileWriter = csv.writer(dstFile, dialect='excel')colSet = set()for d in list(fileReader):for ii,dd in enumerate(d):if dd.find('\r\n') != -1:dd = dd.replace('\r\n', '\\\\n')colSet.add(ii)if dd.find('\n') != -1:colSet.add(ii)dd = dd.replace('\n', '\\\\n')if dd.find('\r') != -1:colSet.add(ii)dd = dd.replace('\r', '\\\\n')
#            if dd.find('\\') != -1:
#                dd = dd.replace('\\', '')d[ii] = ddfileWriter.writerow(d)if len(colSet) > 0:print('new line cols:' + str(colSet))dstFile.close()srcFile.close()#删除原文件,.tmp文件重命名为原文件
os.remove(filename)
os.rename(filename + '.tmp', filename)print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),'[', filename.encode('unicode_escape').decode(), ']处理完成')

上面这个脚本在替换列中含有的换行符时,也记录下列号了,最后如果有列号,出打印出一行:

new line cols:{x,y}

如:

Lines in file ~/org_corp.csv: 534
2.2 9999-12-31去掉时间
2.3 字段中的换行符替换成\n
2025-08-14 11:40:23 [ /home/hive/org_corp.csv ]开始处理
new line cols:{2,7}
2025-08-14 11:40:23 [ /home/hive/org_corp.csv ]处理完成
3.尝试清除hdfs旧文件
Deleted /dmp/other/tmp/bip/org_corp/2025-08-13
4.尝试创建hdfs文件目录
5.上传本地文件到hdfs
--- check success
6.清除本地临时文件

然后在任务最后,从输出信息里滤出“new line cols:{2,7}”这一行,解析出列号list,输出到参数newLineColNums里面。

ret=`/data/dolphinscheduler/loadOraTable.sh "${dbConnect}" "select ${slctColums} from ${SRC_DB}.${tableName} t " "/dmp/${DMP_DB}" "${srcSystem}" "${bizDate}"`
echo $ret
colsNum=$(echo $ret |grep "new line cols:" | awk -F 'new line cols:{' '{print $2}' | awk -F '}' '{print $1}')
if (( -n $colsNum ))
thenecho "#{setValue(newLineColNums=${colsNum})}"
fi
echo "newLineColNums: $colsNum"

加载到临时表中,从临时表抽取数据时,如果newLineColNums参数不空,则把对应的列加上replace函数,把\n替换回换行符。
任务脚本:

if [ -n "${newLineColNums}" ]
thennlRet=`sh /data/dolphinscheduler/restoreNLFunc.sh "${newLineColNums}" "${slctColums}"`newLineCols=`echo $nlRet | awk -F 'SQL:' '{print $2}'`echo "parsed newLineCols: $newLineCols"
elsenewLineCols="${slctColums}"
fi
hive -e "insert overwrite table ${DMP_DB}.ods_${srcSystem}_${tableName} \
SELECT ${newLineCols} \
from ${DMP_DB}.tmp_${srcSystem}_${tableName} t \
where ${tableId} != '' and ${tableId} is not null;"

基中,slctColums是列名,如"t.id, t.code,t.name, t.memo"
newLineColNums是前面任务输出的参数,如"2,3"

cat /data/dolphinscheduler/restoreNLFunc.sh
if [[ $# < 2 ]]
thenecho "usage: $0 colNums colsName"echo "e.g.: $0 \"new line cols:{1,2}\" \"id,is_set,zl_office_id\""exit 1
fi#colsNum=`echo $1 | awk -F 'new line cols:{' '{print $2}' | awk -F '}' '{print $1}'`
colsNum=$1
colNames=$2
colNmArr=(${colNames//,/ })
echo "${colNmArr[@]}"if [ -n "$colsNum" ]
thenecho "有列含有换行符:$colsNum"colNoArr=(${colsNum//,/ })for coln in ${colNoArr[@]}doecho "col: ${colNmArr[coln]}"colNmArr[coln]="replace(${colNmArr[coln]},'\\\\n', '\\n')"echo "col: ${colNmArr[coln]}"done
fi
colNames=$(IFS=,; echo "${colNmArr[*]}")
echo "SQL:$colNames"

如:

sh restoreNLFunc.sh "1,2" "col1,col2,col3"
col1 col2 col3
有列含有换行符:1,2
col: col2
col: replace(col2,'1','2')
col: col3
col: replace(col3,'1','2')
SQL:col1,replace(col2,'1','2'),replace(col3,'1','2')

最后再parquet表中,看到正常的换行了。

http://www.dtcms.com/a/331254.html

相关文章:

  • Java设计模式之《原型模式》--深、浅copy
  • 17 ABP Framework 项目模板
  • Origin绘制正态分布直方图+累积概率图|科研论文图表教程(附数据格式模板)
  • JS的学习6
  • 目标检测-动手学计算机视觉12
  • Redis入门到实战教程,深度透析redis
  • Promise 对象作用及使用场景
  • 实验室的样本是否安全?如何确保实验数据的准确性和可靠性?
  • 京东【自主售后】物流信息获取_影刀RPA源码解读
  • 如何写出更清晰易读的布尔逻辑判断?
  • 企业智脑正在构建企业第二大脑,四大场景引擎驱动数字化转型新范式
  • 异步同步,阻塞非阻塞,reactor/proactor
  • android 升级AGP版本后部分so文件变大
  • 记录JetPack组件用法及原理
  • c语言中堆和栈的区别
  • Mybatis学习笔记(二)
  • Python学习-----3.基础语法(2)
  • Linux面试题及详细答案 120道(1-15)-- 基础概念
  • Linux下的软件编程——framebuffer(文件操作的应用)
  • 初识CNN01——认识CNN
  • 计算机组成原理20250814
  • 网络通信---Axios
  • 在线进销存系统高效管理网站源码搭建可二开
  • 考研408《计算机组成原理》复习笔记,第三章(7)——虚拟存储器
  • 考公VS考研,拼哪个性价比高?
  • 什么是域名抢注?域名抢注常见问题汇总
  • 图书商城小程序怎么做?实体书店如何在微信小程序上卖书?
  • 使用vllm运行智谱GLM-4.5V视觉语言模型推理服务
  • 如何使用 AI 大语言模型解决生活中的实际小事情?
  • 数据结构——线性表(链表,力扣简单篇)