python(42) : 监听本地文件夹上传到服务器指定目录
1.前言
服务器部署http服务, 本地代码定时扫描或收到点击弹窗按钮扫描文件内容, 初次扫描记录时间到本地, 再次扫描查询更新时间大于记录时间的文件或者记录中不存在的文件夹, 通过http服务上传文件到服务指定目录或者创建文件夹, 扫描支持配置忽略条件。
配置信息示例:
# 监听的文件夹和远程服务器路径
config = {r"F:\test\api": r"/home/test/api",r"F:\test\worker": r"/home/test/worker",
}
# 远程服务器地址
remote_host = f"http://192.168.1.2" # 是否是调试模式, 调试模式下不进行上传
is_debug = False
2.服务器http服务
# -*- coding: utf-8 -*-
"""
Flask HTTP接口服务器
提供文件上传和目录创建功能
"""import os
import argparse
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
import logging
from datetime import datetime
from flask_cors import CORS# pip install flask flask-cors -i https://mirrors.aliyun.com/pypi/simple/ requests# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)app = Flask(__name__)
CORS(app) # 支持跨域# 允许所有文件类型上传
ALLOW_ALL_FILES = Truedef ensure_directory_exists(directory_path):"""确保目录存在,如果不存在则创建"""try:os.makedirs(directory_path, exist_ok=True)logger.info(f"✅ 目录创建成功: {directory_path}")return Trueexcept Exception as e:logger.error(f"❌ 目录创建失败: {directory_path}, 错误: {str(e)}")return False@app.route('/health', methods=['GET'])
def health_check():"""健康检查接口"""return jsonify({'status': 'healthy','timestamp': datetime.now().isoformat(),'message': 'HTTP文件上传服务器运行正常'})@app.route('/file_sync/create_folder', methods=['POST'])
def create_folder():"""创建文件夹接口"""try:data = request.get_json()if not data or 'folder_path' not in data:return jsonify({'success': False,'error': '缺少folder_path参数'}), 400folder_path = data['folder_path']# 构建完整的服务器路径full_path = os.path.join(UPLOAD_FOLDER, folder_path.lstrip('/'))# 确保路径安全full_path = os.path.normpath(full_path)if not full_path.startswith(UPLOAD_FOLDER):return jsonify({'success': False,'error': '路径不安全,不允许访问基础目录之外的文件'}), 400# 创建目录if ensure_directory_exists(full_path):return jsonify({'success': True,'message': f'文件夹创建成功: {folder_path}','full_path': full_path})else:return jsonify({'success': False,'error': f'文件夹创建失败: {folder_path}'}), 500except Exception as e:logger.error(f"创建文件夹异常: {str(e)}")return jsonify({'success': False,'error': f'服务器内部错误: {str(e)}'}), 500@app.route('/file_sync/upload_file', methods=['POST'])
def upload_file():"""文件上传接口 - 支持完整路径"""try:# 检查是否有文件if 'file' not in request.files:return jsonify({'success': False,'error': '没有找到文件'}), 400file = request.files['file']if file.filename == '':return jsonify({'success': False,'error': '没有选择文件'}), 400# 获取目标路径 - 支持完整路径或相对路径target_path = request.form.get('target_path', '')if not target_path:return jsonify({'success': False,'error': '缺少target_path参数'}), 400# 判断是否为完整路径if target_path.startswith('/'):# 完整路径,直接使用full_path = target_pathelse:# 相对路径,拼接到基础目录full_path = os.path.join(UPLOAD_FOLDER, target_path.lstrip('/'))# 确保路径安全full_path = os.path.normpath(full_path)if not full_path.startswith(UPLOAD_FOLDER):return jsonify({'success': False,'error': '路径不安全,不允许访问基础目录之外的文件'}), 400# 确保目标目录存在target_dir = os.path.dirname(full_path)if not ensure_directory_exists(target_dir):return jsonify({'success': False,'error': f'目标目录创建失败: {target_dir}'}), 500# 保存文件filename = secure_filename(file.filename)if not filename:filename = os.path.basename(full_path)# 如果full_path是文件路径,使用它;否则使用目录+文件名if os.path.splitext(full_path)[1]: # 如果full_path有扩展名,说明是文件路径save_path = full_pathelse:save_path = os.path.join(full_path, filename)file.save(save_path)logger.info(f"✅ 文件上传成功: {save_path}")return jsonify({'success': True,'message': f'文件上传成功: {target_path}','full_path': save_path,'file_size': os.path.getsize(save_path)})except Exception as e:logger.error(f"文件上传异常: {str(e)}")return jsonify({'success': False,'error': f'服务器内部错误: {str(e)}'}), 500@app.route('/file_sync/batch_upload', methods=['POST'])
def batch_upload():"""批量上传接口"""try:data = request.get_json()if not data or 'files' not in data:return jsonify({'success': False,'error': '缺少files参数'}), 400results = []for file_info in data['files']:if 'file_path' not in file_info or 'target_path' not in file_info:results.append({'file_path': file_info.get('file_path', 'unknown'),'success': False,'error': '缺少必要参数'})continue# 这里需要客户端提供文件内容,实际实现可能需要调整# 暂时返回模拟结果results.append({'file_path': file_info['file_path'],'target_path': file_info['target_path'],'success': True,'message': '批量上传功能待实现'})return jsonify({'success': True,'message': '批量上传完成','results': results})except Exception as e:logger.error(f"批量上传异常: {str(e)}")return jsonify({'success': False,'error': f'服务器内部错误: {str(e)}'}), 500@app.errorhandler(404)
def not_found(error):return jsonify({'success': False,'error': '接口不存在'}), 404@app.errorhandler(500)
def internal_error(error):return jsonify({'success': False,'error': '服务器内部错误'}), 500def main():"""主函数"""parser = argparse.ArgumentParser(description='HTTP文件上传服务器')parser.add_argument('--port', '-p', type=int, default=5100,help='服务器端口,默认5100')parser.add_argument('--host', default='0.0.0.0',help='服务器主机地址,默认0.0.0.0')parser.add_argument('--upload-folder', '-u', default='/home/test',help='上传目录,默认/home/test')parser.add_argument('--debug', '-d', action='store_true',help='启用调试模式')args = parser.parse_args()# 设置全局配置global UPLOAD_FOLDERUPLOAD_FOLDER = args.upload_folder# 确保上传目录存在ensure_directory_exists(UPLOAD_FOLDER)logger.info(f"🚀 HTTP文件上传服务器启动")logger.info(f"📁 上传目录: {UPLOAD_FOLDER}")logger.info(f"🌐 服务地址: http://{args.host}:{args.port}")app.run(host=args.host, port=args.port, debug=args.debug)if __name__ == '__main__':main()
3.本地监听脚本(二者都可用, 看场景)
3.1.定时扫描
# -*- coding: utf-8 -*-
from ast import main
import os
import sys
import time
from pathlib import Path
import fnmatch
import pickle
from loguru import logger
import requests# pip install loguru requests -i https://mirrors.aliyun.com/pypi/simple/ requests # 配置loguru日志
logger.remove() # 移除默认处理器# 确保logs目录存在
os.makedirs("logs", exist_ok=True)logger.add(sys.stdout,format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",level="INFO",colorize=True
)
logger.add("logs/file_watcher_{time:YYYY-MM-DD}.log",format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",level="DEBUG",rotation="1 day",retention="30 days",compression="zip"
)# 默认排除规则
DEFAULT_EXCLUDE_PATTERNS = {"files": ["*.pyc", # Python字节码文件"*.pyo", # Python优化字节码文件"__pycache__", # Python缓存目录"*.log", # 日志文件"*.tmp", # 临时文件"*.temp", # 临时文件"*.swp", # Vim交换文件"*.swo", # Vim交换文件"*~", # 备份文件".DS_Store", # macOS系统文件"Thumbs.db", # Windows缩略图文件"*.pid", # 进程ID文件".git*", # Git相关文件".vscode", # VS Code配置".idea", # IntelliJ IDEA配置"node_modules", # Node.js依赖".env", # 环境变量文件"*.bak", # 备份文件"*.orig", # 原始文件备份"__init__.py", # 初始化文件"file_timestamps.pkl" # 时间戳记录文件本身],"folders": ["__pycache__", # Python缓存目录".git", # Git仓库".vscode", # VS Code配置".idea", # IntelliJ IDEA配置"node_modules", # Node.js依赖".pytest_cache", # pytest缓存".coverage", # 覆盖率文件"logs", # 日志目录"tmp", # 临时目录"temp", # 临时目录"cache", # 缓存目录".cache" # 缓存目录]
}class ExcludeManager:"""排除规则管理器"""def __init__(self):self.exclude_patterns = {"files": [], "folders": []}self.exclude_patterns['files'].extend(DEFAULT_EXCLUDE_PATTERNS['files'])self.exclude_patterns['folders'].extend(DEFAULT_EXCLUDE_PATTERNS['folders'])def should_exclude(self, file_path):"""检查文件是否应该被排除"""file_name = os.path.basename(file_path)if os.path.isdir(file_path):for pattern in self.exclude_patterns['folders']:if fnmatch.fnmatch(file_name, pattern):return Trueelse:# 检查文件名模式for pattern in self.exclude_patterns['files']:if fnmatch.fnmatch(file_name, pattern):return True# 检查路径中的文件夹模式path_parts = Path(file_path).partsfor part in path_parts:for pattern in self.exclude_patterns['folders']:if fnmatch.fnmatch(part, pattern):return Truereturn Falsedef list_exclude_patterns(self):"""列出所有排除规则"""logger.info("📋 当前排除规则:")logger.info("文件:")for pattern in self.exclude_patterns['files']:logger.info(f" - {pattern}")logger.info("文件夹:")for pattern in self.exclude_patterns['folders']:logger.info(f" - {pattern}")class FileTimestampManager:"""文件时间戳管理器"""def __init__(self, timestamp_file="file_timestamps.pkl"):self.timestamp_file = timestamp_fileself.timestamps = self.load_timestamps()logger.success(f"\n✅ 加载时间戳记录成功: {self.timestamp_file}")def load_timestamps(self):"""加载时间戳记录"""if os.path.exists(self.timestamp_file):try:with open(self.timestamp_file, 'rb') as f:return pickle.load(f)except Exception as e:logger.error(f"⚠️ 加载时间戳记录失败: {e}")return {} return {}def save_timestamps(self):"""保存时间戳记录"""try:with open(self.timestamp_file, 'wb') as f:pickle.dump(self.timestamps, f)except Exception as e:logger.error(f"⚠️ 保存时间戳记录失败: {e}")def get_file_timestamp(self, file_path):"""获取文件修改时间"""try:return os.path.getmtime(file_path)except Exception:return 0def should_upload_file(self, file_path):"""检查文件是否需要上传"""current_timestamp = self.get_file_timestamp(file_path)recorded_timestamp = self.timestamps.get(file_path, None)if recorded_timestamp is None:return False# 如果当前时间戳大于记录的时间戳,则需要上传if current_timestamp > recorded_timestamp:self.timestamps[file_path] = current_timestampreturn Truereturn Falsedef should_create_folder(self, folder_path):"""检查文件夹是否需要创建"""# 文件夹的标识符,添加前缀区分文件和文件夹folder_key = f"FOLDER:{folder_path}"current_timestamp = self.get_file_timestamp(folder_path)recorded_timestamp = self.timestamps.get(folder_key, None)if recorded_timestamp is None :# 首次检测到文件夹self.timestamps[folder_key] = current_timestampreturn False# 如果当前时间戳大于记录的时间戳,说明文件夹被修改过if current_timestamp > recorded_timestamp:self.timestamps[folder_key] = current_timestampreturn Truereturn Falsedef update_folder_timestamp(self, folder_path):"""更新文件夹时间戳记录"""folder_key = f"FOLDER:{folder_path}"self.timestamps[folder_key] = self.get_file_timestamp(folder_path)def update_timestamp(self, file_path):"""更新文件时间戳记录"""self.timestamps[file_path] = self.get_file_timestamp(file_path)class FileUploader:"""文件上传器 - HTTP接口版本"""def __init__(self, remote_host):self.remote_host = remote_hostdef upload_file(self, file_path, remote_path):if is_debug:logger.success(f"✅ 调试模式-文件上传成功: {remote_path}")return True"""通过HTTP接口上传文件到服务器 - 支持完整路径"""try:# 检查文件是否存在if not os.path.exists(file_path):logger.error(f"❌ 文件不存在: {file_path}")return False# 准备上传数据 - 直接传递完整路径with open(file_path, 'rb') as f:files = {'file': f}data = {'target_path': remote_path} # 直接传递完整路径# 发送HTTP请求print(f"{self.remote_host}/file_sync/upload_file")response = requests.post(f"{self.remote_host}/file_sync/upload_file",files=files,data=data,timeout=60 # 60秒超时)# 检查响应if response.status_code == 200:result = response.json()if result.get('success'):rel_path = os.path.relpath(file_path, os.path.dirname(file_path))logger.success(f"\n✅ 上传成功: {file_path}")return Trueelse:rel_path = os.path.relpath(file_path, os.path.dirname(file_path))logger.error(f"❌ 上传失败: {rel_path}")logger.error(f"错误信息: {result.get('error', '未知错误')}")return Falseelse:rel_path = os.path.relpath(file_path, os.path.dirname(file_path))logger.error(f"❌ 上传失败: {rel_path}")logger.error(f"HTTP状态码: {response.status_code}")logger.error(f"响应内容: {response.text}")return Falseexcept requests.exceptions.Timeout:logger.error(f"❌ 上传超时: {file_path}")return Falseexcept requests.exceptions.ConnectionError:logger.error(f"❌ 连接失败: {self.remote_host}")return Falseexcept Exception as e:logger.exception(f"❌ 上传异常: {file_path}, 错误: {str(e)}")return Falsedef create_folder(self, folder_path, remote_path):if is_debug:logger.success(f"✅ 调试模式-文件夹创建成功: {remote_path}")return True"""通过HTTP接口在服务器上创建文件夹"""try:# 构建远程路径(去掉开头的斜杠)remote_folder_path = remote_path.lstrip('/')# 准备请求数据data = {'folder_path': remote_folder_path}# 发送HTTP请求response = requests.post(f"{self.remote_host}/file_sync/create_folder",json=data,timeout=30 # 30秒超时)# 检查响应if response.status_code == 200:result = response.json()if result.get('success'):logger.success(f"✅ 文件夹创建成功: {remote_folder_path}")return Trueelse:logger.error(f"❌ 文件夹创建失败: {remote_folder_path}")logger.error(f"错误信息: {result.get('error', '未知错误')}")return Falseelse:logger.error(f"❌ 文件夹创建失败: {remote_folder_path}")logger.error(f"HTTP状态码: {response.status_code}")logger.error(f"响应内容: {response.text}")return Falseexcept requests.exceptions.Timeout:logger.error(f"❌ 创建文件夹超时: {remote_folder_path}")return Falseexcept requests.exceptions.ConnectionError:logger.error(f"❌ 连接失败: {self.remote_host}")return Falseexcept Exception as e:logger.exception(f"❌ 创建文件夹异常: {folder_path}, 错误: {str(e)}")return Falseclass FileWatcher:"""文件扫描器(轮询方式)"""def __init__(self, remote_host, config=None, exclude_manager=None, interval=3):self.source_path = Noneself.remote_path = Noneself.remote_host = remote_hostself.config = configself.exclude_manager = exclude_manager or ExcludeManager()self.interval = intervalself.timestamp_manager = FileTimestampManager()self.uploader = FileUploader(remote_host)self.running = Falsedef scan_folder(self, source_path,remote_path):self.source_path = source_pathself.remote_path = remote_path"""递归扫描文件夹"""# logger.info(f"🔍 递归扫描文件夹: {folder_name}")folder_path = source_pathif not os.path.exists(folder_path):logger.warning(f"⚠️ 文件夹不存在: {folder_path}")returnlogger.debug(f"🔍 扫描文件夹: {folder_path}")file_count = 0upload_count = 0folder_count = 0create_folder_count = 0# 首先检查根文件夹是否需要创建if self.timestamp_manager.should_create_folder(folder_path):rel_path = os.path.relpath(folder_path, source_path)remote_folder_path = os.path.join(remote_path, rel_path).replace("\\", "/")logger.info(f"📁 准备创建根文件夹: {rel_path}")if self.uploader.create_folder(folder_path, remote_folder_path):create_folder_count += 1for root, dirs, files in os.walk(folder_path):# 排除不需要的文件夹dirs[:] = [d for d in dirs if not self.exclude_manager.should_exclude(os.path.join(root, d))]# 检查每个子文件夹是否需要创建for dir_name in dirs:dir_path = os.path.join(root, dir_name)folder_count += 1if self.timestamp_manager.should_create_folder(dir_path):# 计算相对路径用于创建文件夹rel_path = os.path.relpath(dir_path, source_path)remote_folder_path = os.path.join(remote_path, rel_path).replace("\\", "/")logger.info(f"📁 准备创建文件夹: {rel_path}")if self.uploader.create_folder(dir_path, remote_folder_path):create_folder_count += 1else:# 更新文件夹时间戳记录self.timestamp_manager.update_folder_timestamp(dir_path)# 处理文件for file in files:file_path = os.path.join(root, file)if self.exclude_manager.should_exclude(file_path):logger.debug(f"🚫 排除文件: {file}")continuefile_count += 1if self.timestamp_manager.should_upload_file(file_path):# 计算相对路径用于上传rel_path = os.path.relpath(file_path, source_path)remote_file_path = os.path.join(remote_path, rel_path).replace("\\", "/")#logger.info(f"📤 准备上传: {rel_path}")if self.uploader.upload_file(file_path, remote_file_path):upload_count += 1self.timestamp_manager.update_timestamp(file_path)#logger.info(f"\n📊 扫描完成: {folder_path} - 检查文件: {file_count}, 上传文件: {upload_count}, 检查文件夹: {folder_count}, 创建文件夹: {create_folder_count}")def start_watching(self):"""开始扫描"""logger.info(f"\n🚀 开始扫描文件夹: {self.source_path}")logger.info(f"\n📡 目标服务器: {self.remote_host}:{self.remote_path}")logger.info(f"\n📁 扫描目录: {', '.join(self.config)}")logger.info(f"\n⏱️ 轮询间隔: {self.interval}秒")logger.info("=" * 60)self.running = Truetry:count = 1while self.running:st =time.time()logger.info(f"🔍 开始执行第{count}次扫描")for source_path, remote_path in self.config.items():self.scan_folder(source_path,remote_path)# 保存时间戳记录self.timestamp_manager.save_timestamps()# 等待下次检查et = time.time()logger.info(f"🔍 第{count}次扫描完成,耗时{round(et-st, 2)}秒")time.sleep(self.interval)count += 1except KeyboardInterrupt:logger.warning("\n🛑 停止扫描...")self.stop_watching()def stop_watching(self):"""停止扫描"""self.running = Falseself.timestamp_manager.save_timestamps()logger.info("✅ 扫描已停止")def main():exclude_manager = ExcludeManager()# 创建并启动扫描器watcher = FileWatcher(remote_host, config,exclude_manager,interval)watcher.start_watching()if __name__ == "__main__":# 监听的文件夹和远程服务器路径config = {r"F:\test\api": r"/home/test/api",r"F:\test\worker": r"/home/test/worker",}# 是否是调试模式, 调试模式下不进行上传is_debug = False # 远程服务器地址remote_host = f"http://192.168.1.2" # 轮询间隔interval = 5logger.info("\n🎯 文件扫描上传服务器启动")logger.info(f"\n🖥️ 远程服务器: {remote_host}")logger.info(f"\n📁 目标文件夹: {config}")main()
3.2.按钮更新
# -*- coding: utf-8 -*-import os
import sys
import time
from pathlib import Path
import fnmatch
import pickle
from loguru import logger
import requests
import tkinter as tk
import win32gui
import win32con# pip install loguru requests pywin32 -i https://mirrors.aliyun.com/pypi/simple/ requests# 配置loguru日志
logger.remove() # 移除默认处理器# 确保logs目录存在
os.makedirs("logs", exist_ok=True)
update_count = 0logger.add(sys.stdout,format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",level="INFO",colorize=True
)
logger.add("logs/file_watcher_{time:YYYY-MM-DD}.log",format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",level="DEBUG",rotation="1 day",retention="30 days",compression="zip"
)# 默认排除规则
DEFAULT_EXCLUDE_PATTERNS = {"files": ["*.pyc", # Python字节码文件"*.pyo", # Python优化字节码文件"__pycache__", # Python缓存目录"*.log", # 日志文件"*.tmp", # 临时文件"*.temp", # 临时文件"*.swp", # Vim交换文件"*.swo", # Vim交换文件"*~", # 备份文件".DS_Store", # macOS系统文件"Thumbs.db", # Windows缩略图文件"*.pid", # 进程ID文件".git*", # Git相关文件".vscode", # VS Code配置".idea", # IntelliJ IDEA配置"node_modules", # Node.js依赖".env", # 环境变量文件"*.bak", # 备份文件"*.orig", # 原始文件备份"__init__.py", # 初始化文件"file_timestamps.pkl" # 时间戳记录文件本身],"folders": ["__pycache__", # Python缓存目录".git", # Git仓库".vscode", # VS Code配置".idea", # IntelliJ IDEA配置"node_modules", # Node.js依赖".pytest_cache", # pytest缓存".coverage", # 覆盖率文件"logs", # 日志目录"tmp", # 临时目录"temp", # 临时目录"cache", # 缓存目录".cache" # 缓存目录]
}class TestTaskbarIcon:def __init__(self):# 注册一个窗口类wc = win32gui.WNDCLASS()hinst = wc.hInstance = win32gui.GetModuleHandle(None)wc.lpszClassName = "PythonTaskbarDemo"wc.lpfnWndProc = {win32con.WM_DESTROY: self.OnDestroy, }classAtom = win32gui.RegisterClass(wc)style = win32con.WS_OVERLAPPED | win32con.WS_SYSMENUself.hwnd = win32gui.CreateWindow(classAtom, "Taskbar Demo", style,0, 0, win32con.CW_USEDEFAULT, win32con.CW_USEDEFAULT,0, 0, hinst, None)hicon = win32gui.LoadIcon(0, win32con.IDI_APPLICATION)nid = (self.hwnd, 0, win32gui.NIF_ICON, win32con.WM_USER + 20, hicon, "Demo")win32gui.Shell_NotifyIcon(win32gui.NIM_ADD, nid)def showMsg(self, title, msg):# 原作者使用Shell_NotifyIconA方法代替包装后的Shell_NotifyIcon方法# 据称是不能win32gui structure, 我稀里糊涂搞出来了.# 具体对比原代码.nid = (self.hwnd, # 句柄0, # 托盘图标IDwin32gui.NIF_INFO, # 标识0, # 回调消息ID0, # 托盘图标句柄"TestMessage", # 图标字符串msg, # 气球提示字符串0, # 提示的显示时间title, # 提示标题win32gui.NIIF_INFO # 提示用到的图标)win32gui.Shell_NotifyIcon(win32gui.NIM_MODIFY, nid)def OnDestroy(self, hwnd, msg, wparam, lparam):nid = (self.hwnd, 0)win32gui.Shell_NotifyIcon(win32gui.NIM_DELETE, nid)win32gui.PostQuitMessage(0) # Terminate the app.t = TestTaskbarIcon()class ExcludeManager:"""排除规则管理器"""def __init__(self):self.exclude_patterns = {"files": [], "folders": []}self.exclude_patterns['files'].extend(DEFAULT_EXCLUDE_PATTERNS['files'])self.exclude_patterns['folders'].extend(DEFAULT_EXCLUDE_PATTERNS['folders'])def should_exclude(self, file_path):"""检查文件是否应该被排除"""file_name = os.path.basename(file_path)if os.path.isdir(file_path):for pattern in self.exclude_patterns['folders']:if fnmatch.fnmatch(file_name, pattern):return Trueelse:# 检查文件名模式for pattern in self.exclude_patterns['files']:if fnmatch.fnmatch(file_name, pattern):return True# 检查路径中的文件夹模式path_parts = Path(file_path).partsfor part in path_parts:for pattern in self.exclude_patterns['folders']:if fnmatch.fnmatch(part, pattern):return Truereturn Falsedef list_exclude_patterns(self):"""列出所有排除规则"""logger.info("📋 当前排除规则:")logger.info("文件:")for pattern in self.exclude_patterns['files']:logger.info(f" - {pattern}")logger.info("文件夹:")for pattern in self.exclude_patterns['folders']:logger.info(f" - {pattern}")class FileTimestampManager:"""文件时间戳管理器"""def __init__(self, timestamp_file="file_timestamps.pkl"):self.timestamp_file = timestamp_fileself.timestamps = self.load_timestamps()logger.success(f"\n✅ 加载时间戳记录成功: {self.timestamp_file}")def load_timestamps(self):"""加载时间戳记录"""if os.path.exists(self.timestamp_file):try:with open(self.timestamp_file, 'rb') as f:return pickle.load(f)except Exception as e:logger.error(f"⚠️ 加载时间戳记录失败: {e}")return {} return {}def save_timestamps(self):"""保存时间戳记录"""try:with open(self.timestamp_file, 'wb') as f:pickle.dump(self.timestamps, f)except Exception as e:logger.error(f"⚠️ 保存时间戳记录失败: {e}")def get_file_timestamp(self, file_path):"""获取文件修改时间"""try:return os.path.getmtime(file_path)except Exception:return 0def should_upload_file(self, file_path):"""检查文件是否需要上传"""current_timestamp = self.get_file_timestamp(file_path)recorded_timestamp = self.timestamps.get(file_path, None)if recorded_timestamp is None:return False# 如果当前时间戳大于记录的时间戳,则需要上传if current_timestamp > recorded_timestamp:self.timestamps[file_path] = current_timestampreturn Truereturn Falsedef should_create_folder(self, folder_path):"""检查文件夹是否需要创建"""# 文件夹的标识符,添加前缀区分文件和文件夹folder_key = f"FOLDER:{folder_path}"current_timestamp = self.get_file_timestamp(folder_path)recorded_timestamp = self.timestamps.get(folder_key, None)if recorded_timestamp is None :# 首次检测到文件夹self.timestamps[folder_key] = current_timestampreturn False# 如果当前时间戳大于记录的时间戳,说明文件夹被修改过if current_timestamp > recorded_timestamp:self.timestamps[folder_key] = current_timestampreturn Truereturn Falsedef update_folder_timestamp(self, folder_path):"""更新文件夹时间戳记录"""folder_key = f"FOLDER:{folder_path}"self.timestamps[folder_key] = self.get_file_timestamp(folder_path)def update_timestamp(self, file_path):"""更新文件时间戳记录"""self.timestamps[file_path] = self.get_file_timestamp(file_path)class FileUploader:"""文件上传器 - HTTP接口版本"""def __init__(self, remote_host):self.remote_host = remote_hostdef upload_file(self, file_path, remote_path):if is_debug:logger.success(f"✅ 调试模式-文件上传成功: {remote_path}")return True"""通过HTTP接口上传文件到服务器 - 支持完整路径"""try:# 检查文件是否存在if not os.path.exists(file_path):logger.error(f"❌ 文件不存在: {file_path}")return False# 准备上传数据 - 直接传递完整路径with open(file_path, 'rb') as f:files = {'file': f}data = {'target_path': remote_path} # 直接传递完整路径# 发送HTTP请求print(f"{self.remote_host}/file_sync/upload_file")response = requests.post(f"{self.remote_host}/file_sync/upload_file",files=files,data=data,timeout=60 # 60秒超时)# 检查响应if response.status_code == 200:result = response.json()if result.get('success'):rel_path = os.path.relpath(file_path, os.path.dirname(file_path))logger.success(f"\n✅ 上传成功: \n=> {file_path}\n=> {remote_path}")return Trueelse:rel_path = os.path.relpath(file_path, os.path.dirname(file_path))logger.error(f"❌ 上传失败: {rel_path}")logger.error(f"错误信息: {result.get('error', '未知错误')}")return Falseelse:rel_path = os.path.relpath(file_path, os.path.dirname(file_path))logger.error(f"❌ 上传失败: {rel_path}")logger.error(f"HTTP状态码: {response.status_code}")logger.error(f"响应内容: {response.text}")return Falseexcept requests.exceptions.Timeout:logger.error(f"❌ 上传超时: {file_path}")return Falseexcept requests.exceptions.ConnectionError:logger.error(f"❌ 连接失败: {self.remote_host}")return Falseexcept Exception as e:logger.exception(f"❌ 上传异常: {file_path}, 错误: {str(e)}")return Falsedef create_folder(self, folder_path, remote_path):if is_debug:logger.success(f"✅ 调试模式-文件夹创建成功: {remote_path}")return True"""通过HTTP接口在服务器上创建文件夹"""try:# 构建远程路径(去掉开头的斜杠)remote_folder_path = remote_path.lstrip('/')# 准备请求数据data = {'folder_path': remote_folder_path}# 发送HTTP请求response = requests.post(f"{self.remote_host}/file_sync/create_folder",json=data,timeout=30 # 30秒超时)# 检查响应if response.status_code == 200:result = response.json()if result.get('success'):logger.success(f"✅ 文件夹创建成功: {remote_folder_path}")return Trueelse:logger.error(f"❌ 文件夹创建失败: {remote_folder_path}")logger.error(f"错误信息: {result.get('error', '未知错误')}")return Falseelse:logger.error(f"❌ 文件夹创建失败: {remote_folder_path}")logger.error(f"HTTP状态码: {response.status_code}")logger.error(f"响应内容: {response.text}")return Falseexcept requests.exceptions.Timeout:logger.error(f"❌ 创建文件夹超时: {remote_folder_path}")return Falseexcept requests.exceptions.ConnectionError:logger.error(f"❌ 连接失败: {self.remote_host}")return Falseexcept Exception as e:logger.exception(f"❌ 创建文件夹异常: {folder_path}, 错误: {str(e)}")return Falseclass FileWatcher:"""文件扫描器(轮询方式)"""def __init__(self, remote_host, config=None, exclude_manager=None):self.source_path = Noneself.remote_path = Noneself.remote_host = remote_hostself.config = configself.exclude_manager = exclude_manager or ExcludeManager()self.timestamp_manager = FileTimestampManager()self.uploader = FileUploader(remote_host)self.running = Falsedef scan_folder(self, source_path,remote_path):"""递归扫描文件夹"""# logger.info(f"🔍 递归扫描文件夹: {folder_name}")folder_path = source_pathself.source_path = folder_pathself.remote_path = remote_pathif not os.path.exists(folder_path):logger.warning(f"⚠️ 文件夹不存在: {folder_path}")returnlogger.debug(f"🔍 扫描文件夹: {folder_path}")file_count = 0upload_count = 0folder_count = 0create_folder_count = 0global update_count# 首先检查根文件夹是否需要创建if self.timestamp_manager.should_create_folder(folder_path):rel_path = os.path.relpath(folder_path, self.source_path)remote_folder_path = os.path.join(self.remote_path, rel_path).replace("\\", "/")logger.info(f"📁 准备创建根文件夹: {rel_path}")if self.uploader.create_folder(folder_path, remote_folder_path):create_folder_count += 1for root, dirs, files in os.walk(folder_path):# 排除不需要的文件夹dirs[:] = [d for d in dirs if not self.exclude_manager.should_exclude(os.path.join(root, d))]# 检查每个子文件夹是否需要创建for dir_name in dirs:dir_path = os.path.join(root, dir_name)folder_count += 1if self.timestamp_manager.should_create_folder(dir_path):# 计算相对路径用于创建文件夹rel_path = os.path.relpath(dir_path, self.source_path)remote_folder_path = os.path.join(self.remote_path, rel_path).replace("\\", "/")logger.info(f"📁 准备创建文件夹: {rel_path}")if self.uploader.create_folder(dir_path, remote_folder_path):create_folder_count += 1else:# 更新文件夹时间戳记录self.timestamp_manager.update_folder_timestamp(dir_path)# 处理文件for file in files:file_path = os.path.join(root, file)if self.exclude_manager.should_exclude(file_path):logger.debug(f"🚫 排除文件: {file}")continuefile_count += 1if self.timestamp_manager.should_upload_file(file_path):# 计算相对路径用于上传rel_path = os.path.relpath(file_path, self.source_path)remote_file_path = os.path.join(self.remote_path, rel_path).replace("\\", "/")#logger.info(f"📤 准备上传: {rel_path}")if self.uploader.upload_file(file_path, remote_file_path):update_count += 1upload_count += 1self.timestamp_manager.update_timestamp(file_path)#logger.info(f"\n📊 扫描完成: {folder_path} - 检查文件: {file_count}, 上传文件: {upload_count}, 检查文件夹: {folder_count}, 创建文件夹: {create_folder_count}")def watching(self):"""开始扫描"""st =time.time()logger.info(f"🔍 开始执行扫描")global update_countupdate_count = 0for source_path, remote_path in self.config.items():self.scan_folder(source_path,remote_path)# 保存时间戳记录self.timestamp_manager.save_timestamps()if update_count > 0:t.showMsg("", f"已成功上传【{update_count}】个文件")# 等待下次检查et = time.time()logger.info(f"🔍 扫描完成,耗时{round(et-st, 2)}秒")def stop_watching(self):"""停止扫描"""self.running = Falseself.timestamp_manager.save_timestamps()logger.info("✅ 扫描已停止")def on_button_click():watcher.watching()if __name__ == "__main__":# 监听的文件夹和远程服务器路径config = {r"F:\test\api": r"/home/test/api",r"F:\test\worker": r"/home/test/worker",}# 是否是调试模式, 调试模式下不进行上传is_debug = False # 远程服务器地址remote_host = f"http://192.168.1.2" logger.info("\n🎯 文件扫描上传服务器启动")logger.info(f"\n️ 远程服务器: {remote_host}")logger.info(f"\n📁 目标文件夹: {config}")exclude_manager = ExcludeManager()# 创建并启动扫描器watcher = FileWatcher(remote_host, config,exclude_manager)update_count = 0root = tk.Tk()root.title("")root.geometry("200x100") # 设置初始窗口大小# 🔔 设置窗口置顶(始终在最前面)root.attributes('-topmost', True)# 可选:强制窗口获取焦点# root.focus_force()# 创建按钮,并填充整个窗口button = tk.Button(root,text="执行",font=("宋体", 16),command=on_button_click,bg="#CDBBF9", #fg="white", # 白色文字(可选)relief='flat' # 去掉按钮边框立体感)# 使用 pack 并填充整个窗口button.pack(fill='both', expand=True, padx=10, pady=10)# 启动 GUI 主循环root.mainloop()