当前位置: 首页 > news >正文

FTP定时推拉数据思考

目录

  • 需求
  • 思考
    • 1. 使用临时文件
    • 2. 判断文件完整性
    • 3. 判断是否重复
    • 4. 定时任务频率
    • 5. 上游未到等待
    • 6. 日志与删除
  • 代码示例
    • 拉取
    • 推送

虽然没有什么经验也不太情愿,最近尝试做了一些业务化的FTP数据推拉工作,一些思考记录在此。

需求

将数据文件通过FTP协议定时从src服务器传输到dst服务器。如果程序运行在src服务器上,即为推送;如果程序运行在dst服务器上,即为拉取。

思考

1. 使用临时文件

由于数据推拉写入硬盘需要时间,所以推拉写入过程中,数据文件在硬盘上是不完整的,若此时下游程序来读取便会出问题。

可以先写入临时文件,完成后再重命名为目标文件名。

2. 判断文件完整性

拉取数据后重命名前需要判断文件完整性。

推送数据前需要判断文件完整性。

3. 判断是否重复

除了判断上游数据是否到位,还要判断上游数据是否变化(被覆盖)。判断方式可以为时间戳和文件大小。

采用过不管三七二十一只要上游数据存在就推拉一遍的定时任务方案,后来发现不太合适。除了重复推拉浪费资源时间外,dst服务器上的文件时间戳会被重复更新,这样不好判断最早到位时间,出问题了不方便blame。

4. 定时任务频率

由于担心上游数据没有准时到,试过在某个时间段内密集定时启动几次程序,后来发现这么做并不好。

如果受网络之类的问题导致某次推拉数据用时大于与下一次定时任务的间隔,那下一次任务启动时可能会和这次发生对同一个文件写入的冲突。当然如果如思考1中所说使用了不同临时文件名可以回避这一冲突,但仍然有重复传输以及第一次成功时间戳被覆盖的问题。

所以定时任务的间隔不宜过低,最好保证两次启动运行时间不要重叠。

5. 上游未到等待

当程序启动时,若上游数据还未到,在程序内采用适当的等待重试比直接退出指望下一次定时任务更好。等待可以sleep。用循环实现重试里比使用递归更容易控制重试次数。等待重试的方案同样要注意最好保证两次启动运行时间不要重叠。

6. 日志与删除

有时间的话还是要实现日志功能,将运行日志按天记录在一个日志路径下,便于判断传输用时以及定时任务调试。

定时删除包含日志的过期数据。判断文件修改时间比根据文件名里的时间更方便合理。

代码示例

整理简化了拉取和推送两个python示例,两者略有区别。

拉取

from ftplib import FTP_TLS, FTP, error_perm
from datetime import datetime, timedelta
from glob import glob
from os.path import getmtime, exists, getsize, dirname
from os import remove, makedirs, getpid
from shutil import move
from time import sleep
import tarfile
from contextlib import contextmanager
import logging
from logging.config import dictConfig
import tracebackdef is_complete(filepath):"""判断文件完整性。以tar为例"""try:with tarfile.open(filepath, 'r:*') as tar:members = tar.getmembers()if members:return Trueelse:return Falseexcept (tarfile.TarError, IOError) as e:return Falsedef make_filename():"""构造每次启动要传输的文件名"""now = datetime.now()if now.hour < 7:t = (now - timedelta(hours=7)).strftime('%Y%m%d20')elif 7 <= now.hour < 19:t = now.strftime('%Y%m%d08.tar')elif now.hour >= 19:t = now.strftime('%Y%m%d20.tar')return t@contextmanager
def get_ftp():"""连接ftp"""ftp = FTP_TLS()ftp.connect('666.666.666.666', 21)ftp.login('user', 'passwd')ftp.prot_p()# ftp = FTP()# ftp.set_pasv(True)# ftp.connect('666.666.666.666', 21)# ftp.login('user', 'passwd')yield ftpftp.quit()def get_src_mtime_size(path_src):"""获取ftp上指定文件的修改时间和大小"""try:with get_ftp() as ftp:mtime = ftp.sendcmd('MDTM ' + path_src).split(' ')[1]mtime = datetime.strptime(mtime, '%Y%m%d%H%M%S')size = ftp.size(path_src)except error_perm as e:mtime, size = None, Nonereturn mtime, sizedef get_dst_mtime_size(path_dst):"""获取保存到指定路径文件的修改时间和大小"""mtime, size = None, Noneif exists(path_dst):mtime = datetime.fromtimestamp(getmtime(path_dst))size = getsize(path_dst)return mtime, sizedef get_logger():path_log = f'/path/to/log/{datetime.now():%Y%m%d}.log'makedirs(dirname(path_log), exist_ok=True)dictConfig({'version': 1,'disable_existing_loggers': False,'formatters': {'a': {'format': f'%(asctime)s pid={getpid()} %(levelname)s %(message)s','datefmt': '%Y-%m-%dT%H:%M:%S'}},'handlers': {'a': {'class': 'logging.FileHandler','filename': path_log,'formatter': 'a','level': 'INFO'}},'loggers': {'a': {'handlers': ['a'], 'level': 'INFO'}},})logger = logging.getLogger('a')return loggerlogger = get_logger()def pull():"""拉取数据"""file_name = make_filename()logger.info(f'开始下载{file_name}')# 要保存的文件路径path_dst = f'/path/to/dst/{file_name}'# 上游ftp数据路径path_src = f'/path/to/src/{file_name}'# 临时文件路径path_temp = f'/path/to/dst/tempfile{datetime.now():%Y%m%d%H%M%S}.tar'mtime_dst, size_dst = get_dst_mtime_size(path_dst)# 尝试10次,每次失败等待60秒重试for retry in range(10):mtime_src, size_src = get_src_mtime_size(path_src)if mtime_src is None:logger.info('上游数据未找到')sleep(60)elif ((mtime_dst is None) or(mtime_src - mtime_dst > timedelta(minutes=2)) or(size_dst != size_src)):with get_ftp() as ftp:with open(path_temp, 'wb') as f:ftp.retrbinary(f'RETR {path_src}', f.write, blocksize=1048576)if is_complete(path_temp):move(path_temp, path_dst)logger.info(f'下载完成{file_name}')breakelse:logger.info('上游数据不完整')remove(path_temp)else:logger.info(f'跳过下载{file_name}')breakdef delete():"""删除修改时间2天前的文件"""now = datetime.now()paths = [*glob('/path/to/dst/*.tar'), *glob('/path/to/log/*.log')]for path in paths:last_mtime = datetime.fromtimestamp(getmtime(path))if now - last_mtime > timedelta(days=2):remove(path)logger.info(f'删除{path}')def main():try:pull()delete()except Exception as e:logger.error(traceback.format_exc())if __name__ == '__main__':main()

推送

from ftplib import FTP_TLS, FTP, error_perm
from datetime import datetime, timedelta
from glob import glob
from os.path import getmtime, exists, getsize, dirname
from os import remove, makedirs, getpid
from shutil import move
from time import sleep
import tarfile
from contextlib import contextmanager
import logging
from logging.config import dictConfig
import tracebackdef is_complete(filepath):"""判断文件完整性。以tar为例"""try:with tarfile.open(filepath, 'r:*') as tar:members = tar.getmembers()if members:return Trueelse:return Falseexcept (tarfile.TarError, IOError) as e:return Falsedef make_filename():"""构造每次启动要传输的文件名"""now = datetime.now()if now.hour < 7:t = (now - timedelta(hours=7)).strftime('%Y%m%d20')elif 7 <= now.hour < 19:t = now.strftime('%Y%m%d08.tar')elif now.hour >= 19:t = now.strftime('%Y%m%d20.tar')return t@contextmanager
def get_ftp():"""连接ftp"""ftp = FTP_TLS()ftp.connect('888.888.888.888', 21)ftp.login('user', 'passwd')ftp.prot_p()# ftp = FTP()# ftp.set_pasv(True)# ftp.connect('888.888.888.888', 21)# ftp.login('user', 'passwd')yield ftpftp.quit()def get_dst_mtime_size(path_dst):"""获取ftp上指定文件的修改时间和大小"""try:with get_ftp() as ftp:mtime = ftp.sendcmd('MDTM ' + path_dst).split(' ')[1]mtime = datetime.strptime(mtime, '%Y%m%d%H%M%S')size = ftp.size(path_dst)except error_perm as e:mtime, size = None, Nonereturn mtime, sizedef get_src_mtime_size(path_src):"""获取要推送的文件的修改时间和大小"""mtime, size = None, Noneif exists(path_src):mtime = datetime.fromtimestamp(getmtime(path_src))size = getsize(path_src)return mtime, sizedef get_logger():path_log = f'/path/to/log/{datetime.now():%Y%m%d}.log'makedirs(dirname(path_log), exist_ok=True)dictConfig({'version': 1,'disable_existing_loggers': False,'formatters': {'a': {'format': f'%(asctime)s pid={getpid()} %(levelname)s %(message)s','datefmt': '%Y-%m-%dT%H:%M:%S'}},'handlers': {'a': {'class': 'logging.FileHandler','filename': path_log,'formatter': 'a','level': 'INFO'}},'loggers': {'a': {'handlers': ['a'], 'level': 'INFO'}},})logger = logging.getLogger('a')return loggerlogger = get_logger()def push():"""推送数据"""file_name = make_filename()logger.info(f'开始推送{file_name}')# 要推送到的ftp的文件路径path_dst = f'/path/to/dst/{file_name}'# 要推送的数据路径path_src = f'/path/to/src/{file_name}'# 临时文件路径path_temp = f'/path/to/dst/tempfile{datetime.now():%Y%m%d%H%M%S}.tar'mtime_dst, size_dst = get_dst_mtime_size(path_dst)# 尝试10次,每次失败等待60秒重试for retry in range(10):mtime_src, size_src = get_src_mtime_size(path_src)if mtime_src is None:logger.info('要推送的数据未找到')sleep(60)elif not is_complete(path_src):logger.info('要推送的数据不完整')sleep(60)elif ((mtime_dst is None) or(mtime_src - mtime_dst > timedelta(minutes=2)) or(size_dst != size_src)):with get_ftp() as ftp:with open(path_src, 'rb') as f:ftp.storbinary(f'SROR {path_temp}', f, blocksize=1048576)ftp.rename(path_temp, path_dst)logger.info(f'推送完成{file_name}')breakelse:logger.info(f'跳过推送{file_name}')breakdef delete():"""删除修改时间2天前的文件"""now = datetime.now()paths = [*glob('/path/to/src/*.tar'), *glob('/path/to/log/*.log')]for path in paths:last_mtime = datetime.fromtimestamp(getmtime(path))if now - last_mtime > timedelta(days=2):remove(path)logger.info(f'删除{path}')def main():try:push()delete()except Exception as e:logger.error(traceback.format_exc())if __name__ == '__main__':main()
http://www.dtcms.com/a/333802.html

相关文章:

  • 深入理解 Python 闭包:从原理到实践
  • AI - MCP 协议(一)
  • NY232NY236美光固态闪存NY240NY241
  • Dummy步进电机驱动使用和相关问题
  • 疏老师-python训练营-Day46通道注意力(SE注意力)
  • 高通vendor app访问文件
  • 【使用三化总结大模型基础概念】
  • 淘宝/天猫店铺商品搜索利器:taobao.item_search_shop API返回值详解
  • 【秋招笔试】2025.08.15饿了么秋招机考-第一题
  • 嵌入式linux学习 -- 进程和线程
  • CIAIE 2025上海汽车内外饰展观察:从美学到功能的产业跃迁
  • Redis 启动时出现 “Bad file format reading the append only file“ 错误
  • 【万字精讲】 左枝清减·右枝丰盈:C++构筑的二叉搜索森林
  • office2016常见故障解决方法
  • 第七十一章:AI的“个性定制服务”:微调 LLM vs 微调 Diffusion 模型——谁是“魔改之王”?
  • 展览讯息易天邀您共赴第26届中国国际光电博览会
  • AI创业公司分析:Paloma
  • 网络通讯核心知识
  • AI的拜师学艺,模型蒸馏技术
  • 标注工具label-studio保姆级配置教程
  • MySQL主从集群
  • 软件开发过程中的维护活动
  • Effective C++ 条款42:了解 typename 的双重含义
  • 大模型幻觉涉及的违约责任探讨
  • Chrome插件开发全指南
  • K 近邻算法(KNN)及其应用解析
  • strings命令和findstr命令验证iso文件中ntkrnlmp.exe系统版本
  • 昇腾AI自学Day1-- 深度学习基础工具与数学
  • Lecture 9: Concurrency 2
  • AAAI爆款:目标检测新范式,模块化设计封神之作