linux 之 shell脚本实现SFTP下载、上传文件、执行sftp命令
需求
需求方通过sftp不定时的上传一批用户(SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv),需要我们从这些用户中找出满足条件的用户。然后把这些结果用户通过文件的形式上传到ftp。
环境说明
ip1能连接hive库环境,不能连接sftp。
ip2不能连接hive库环境,能连接sftp。
ip1和ip2是共享盘,能同时访问公共目录。
目录规划
源文件名: SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv (例:SBXDS_ACC_M_test001_20240201103828.csv)
结果文件名: WTF_YBZ_DSGS_任务id.csv (例:WTF_YBZ_DSGS_test001.csv)
本地路径: /date/localPath/SBXDS_ACC_M
sftp下载路径(源文件):/ftpdata/preciseUpload
sftp上传路径(结果文件):/ftpdata/orderringlist
处理逻辑
1.通过sftp获取preciseUpload目录下,当天的 SBXDS_ACC_M_*_YYYYMMDD*.csv 文件名,并写入本地文件SBXDS_ACC_M.txt
2.比对SBXDS_ACC_M.txt里面的文件是否在SBXDS_ACC_M目录下存在,不存在则将文件名写入SBXDS_ACC_M_NEWFILES.txt。
3.获取SBXDS_ACC_M_NEWFILE.txt里面的文件,并写入hive表 dods_ftp_target_users partition(task_id)。
4.目标用户与通信圈用户匹配,跑出结果数据dapp_ftp_calling_txq_users partition(task_id)。
5.结果数据写入文件WTF_YBZ_DSGS_task_id.csv,并上传sftp
6.为了防止文件正在传输,内容未传输完,需求方就把文件取走了,导致漏数据的情况。在文件上传之后,对文件进行重命名处理。
hive表
-- 目标用户hive表 create table ods_target_users( phone_no string ) -- 结果数据hive表 create table dapp_target_result_users( phone_no string, msg_info string )
脚本执行顺序
sh ftp_getTodayFiles.sh 20240201 # ip1执行,文件处理
sh ftp_getTxq_users.sh # ip2 执行,跑SQL
sh ftp_sftpPutFiles.sh # ip1 执行,文件处理
脚本
#!/bin/bash # ftp_getTodayFiles.sh # ip2 执行 # 调用方式 sh ftp_getTodayFiles.sh 20240201 # 定义函数,获取文件名称 function funcGetTodayFilesName(){ dateNo=$1 PORT=22 HOST="ip1" USERNAME="uname1" PASSWORD="123456" LOCALDIR="/date/localPath/SBXDS_ACC_M" SFTPDIR="/ftpdata/preciseUpload" FILENAME="SBXDS_ACC_M_*_$dateNo*" cd $LOCALDIR /usr/bin/expect << EOF spawn sftp -P $PORT $USERNAME@$HOST expect "*Password*" send "$PASSWORD\r" expect "*#" send "cd $SFTPDIR\r" expect "*#" send "ls -1 $FILENAME\r" expect eof EOF } # 定义函数,sftp获取文件 function funcSftpGetFile(){ FILENAME=$1 PORT=22 HOST="ip1" USERNAME="uname1" PASSWORD="123456" LOCALDIR="/date/localPath/SBXDS_ACC_M" SFTPDIR="/ftpdata/preciseUpload" cd $LOCALDIR /usr/bin/expect << EOF spawn sftp -P $PORT $USERNAME@$HOST expect "*Password*" send "$PASSWORD\r" expect "*#" send "cd $SFTPDIR\r" expect "*#" send "get $FILENAME \r" expect eof EOF } # 调用函数,获取文件名称 dateNo=$1 LOCALDIR="/date/localPath/SBXDS_ACC_M" cd $LOCALDIR echo "开始获取sftp目录下当天文件名称SBXDS_ACC_M_*_$dateNo*......" funcGetTodayFilesName $dateNo|grep ".csv" > SBXDS_ACC_M_FILES.txt echo "获取sftp目录下当天文件名称结束" dayfiles=$(cat SBXDS_ACC_M_FILES.txt|wc -l) if [ $dayfiles -eq 0 ]; then echo "sftp今日无文件。" exit else echo "当日所有文件:" cat SBXDS_ACC_M_FILES.txt fi echo "清空文件 SBXDS_ACC_M_NEWFILES.txt" echo "判断本地是否存在文件,不存在则将文件名写入文件SBXDS_ACC_M_NEWFILES.txt" > SBXDS_ACC_M_NEWFILES.txt for filename in $(cat SBXDS_ACC_M_FILES.txt|sed 's/\r$//') do if [ -e $filename ]; then echo "$filename 已存在" else echo "$filename 新文件" echo $filename >> SBXDS_ACC_M_NEWFILES.txt fi done newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l) if [ $newfiles -eq 0 ]; then echo "无新增文件。" exit else echo "文件名写入SBXDS_ACC_M_NEWFILES.txt结束" echo "当日新增文件$newfiles个:" cat SBXDS_ACC_M_NEWFILES.txt fi # 调用函数,sftp获取文件 echo "开始下载文件...." LOCALDIR="/date/localPath/SBXDS_ACC_M" cd $LOCALDIR for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//') do echo "下载$filename..." funcSftpGetFile $filename done echo "下载文件结束,文件名如下:" cat SBXDS_ACC_M_NEWFILES.txt # 验证 SBXDS_ACC_M_NEWFILES.txt newfiles2=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l) if [ $newfiles2 -eq 0 ]; then echo "文件异常!!!" exit 1 fi
#!/bin/bash # ftp_getTxq_users.sh # ip1 执行 # 把SBXDS_ACC_M_NEWFILE.txt里面的文件写入hive表 dods_ftp_target_users partition(task_id)。 # 匹配通信圈用户 dapp_ftp_txq_users # 结果数据导入文件WTF_YBZ_DSGS_task_id # 调用格式: sh ftp_getTxq_users.sh 20240114 dateNo=$1 LOCALDIR="/date/localPath/SBXDS_ACC_M" cd $LOCALDIR newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l) if [ $newfiles -eq 0 ]; then echo "无新增文件。" exit fi fileList=() for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//') do task_id=$(echo $filename|cut -d'_' -f4) echo "文件名: $filename,任务ID: $task_id" echo "执行命令:hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id" hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id if [ $? -eq 0 ]; then echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id" hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id else echo "分区 $task_id 已存在。" echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id" hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id fi # 匹配通信圈用户 hive -e" MSCK REPAIR TABLE dods_ftp_target_users; insert overwrite table dapp_ftp_txq_users partition(task_id='${task_id}') select a.phone_no, b.msg_info from dods_ftp_target_users a, dods_msg_info b where a.phone_no=b.phone_no and a.task_id='${task_id}' and b.deal_day='${dateNo}' group by a.phone_no, b.msg_info " # 结果数据导入文件 hive -e" select concat_ws(',',phone_no,msg_info) from dapp_ftp_txq_users where task_id = '${task_id}' " > WTF_YBZ_DSGS_${task_id} # 文件记录数 file_rows=$(cat WTF_YBZ_DSGS_${task_id}|wc -l) msgs="源文件$filename的结果文件WTF_YBZ_DSGS_${task_id},记录数$file_rows。" fileList+=$msgs done echo "************************* 执行成功 *************************" for msg in ${fileList[*]} do echo $msg done
#!/bin/bash # ftp_sftpPutFiles.sh # ip2 执行 # 调用格式 sh ftp_sftpPutFiles.sh # 定义函数,sftp上传文件 function sftpPutFiles(){ FILENAME=$1 PORT=22 HOST="ip1" USERNAME="uname1" PASSWORD="123456" LOCALDIR="/date/localPath/SBXDS_ACC_M" SFTPDIR="/ftpdata/orderringlist" cd $LOCALDIR /usr/bin/expect << EOF spawn sftp -P $PORT $USERNAME@$HOST expect "*Password*" send "$PASSWORD\r" expect "*#" send "cd $SFTPDIR\r" expect "*#" send "put $FILENAME \r" expect "*#" send "rename $FILENAME ${FILENAME}.csv \r" expect eof EOF } # 调用函数,sftp上传文件 LOCALDIR="/date/localPath/SBXDS_ACC_M" cd $LOCALDIR newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l) if [ $newfiles -eq 0 ]; then echo "无新增文件。" exit fi echo "开始sftp上传文件...." for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//') do task_id=$(echo $filename|cut -d'_' -f4) echo "上传WTF_YBZ_DSGS_${task_id}..." sftpPutFiles WTF_YBZ_DSGS_${task_id} done echo "*************** sftp上传文件结束 ***************"
热门相关:冲浪同学会 花样女鬼 刀尖 甜蜜隐婚:老公大人,宠上瘾 我和超级大佬隐婚了