【开源解析】:Python打造专业级USB安全弹出工具(附完整源码)
🔥 【开源解析】:Python打造专业级USB安全弹出工具(附完整源码)
🌈 个人主页:创客白泽 - CSDN博客
🔥 系列专栏:🐍《Python开源项目实战》
💡 热爱不止于代码,热情源自每一个灵感闪现的夜晚。愿以开源之火,点亮前行之路。
🐋 希望大家多多支持,我们一起进步!
👍 🎉如果文章对你有帮助的话,欢迎 点赞 👍🏻 评论 💬 收藏 ⭐️ 加关注+💗分享给更多人哦
📌 概述:为什么需要专业USB弹出工具?
在日常使用计算机时,我们经常会遇到"该设备正在使用中,无法安全移除"的烦人提示。传统解决方法要么是暴力拔插(可能损坏数据),要么是反复尝试弹出(效率低下)。本文将介绍如何使用Python开发一个专业级USB安全弹出工具,它能够:
- 智能检测占用USB设备的进程
- 自动终止顽固进程
- 深度解锁驱动器
- 安全弹出硬件设备
- 系统托盘快捷操作
相比Windows自带的弹出功能,我们的工具具有进程可视化、强制解锁、操作日志等高级特性,是IT技术人员和普通用户的实用利器。
🛠️ 功能全景图
功能模块 | 实现技术 | 特色亮点 |
---|---|---|
驱动器检测 | ctypes.windll.kernel32 | 实时刷新可移动设备列表 |
进程扫描 | psutil 库 | 全量扫描+精准定位 |
进程终止 | win32process | 权限提升处理 |
卷解锁 | win32file IOCTL控制 | 底层磁盘操作 |
设备弹出 | IOCTL_STORAGE_EJECT_MEDIA | 硬件级控制 |
GUI界面 | PyQt5 | 专业级交互体验 |
系统托盘 | QSystemTrayIcon | 后台常驻+快捷操作 |
🎨 效果展示
主界面截图
进程检测效果
[14:25:33] 🔍 正在获取进程列表...
[14:25:34] 📊 找到 156 个进程,正在扫描...
[14:25:37] ⚠️ 找到 2 个锁定进程:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[14:25:37] 🆔 PID: 1234
[14:25:37] 📛 名称: explorer.exe
[14:25:37] 📂 路径: C:\Windows\explorer.exe
[14:25:37] 💻 命令: explorer /select,D:\test.docx
[14:25:37] 👤 用户: DESKTOP-Admin
[14:25:37] 📊 状态: running
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
系统托盘菜单
🧰 开发环境准备
必备组件
pip install pywin32 psutil PyQt5 ctypes
特别说明
本程序需要管理员权限运行,因为涉及:
- 进程终止操作
- 底层磁盘控制
- 硬件设备管理
🏗️ 核心代码解析
1. 驱动器检测机制
def get_removable_drives(self):"""获取所有可移动驱动器"""drives = []bitmask = ctypes.windll.kernel32.GetLogicalDrives()for letter in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':if bitmask & 1:drive_type = ctypes.windll.kernel32.GetDriveTypeW(f"{letter}:\\")if drive_type == win32con.DRIVE_REMOVABLE:drives.append(f"{letter}:")bitmask >>= 1return drives
关键技术点:
GetLogicalDrives()
获取所有逻辑驱动器位掩码GetDriveTypeW()
判断驱动器类型- 位运算遍历26个字母驱动器
2. 进程扫描引擎
def find_locking_processes(self):# 获取进程列表(约150-200个系统进程)processes = list(psutil.process_iter(['pid', 'name', 'exe', 'cmdline']))# 双重检测机制for proc in processes:# 检测1:打开的文件句柄for item in proc.open_files():if item.path.lower().startswith(drive_path):locking_processes.append(proc.info)# 检测2:工作目录try:cwd = proc.cwd()if cwd and cwd.lower().startswith(drive_path):locking_processes.append(proc.info)
3. 底层解锁三连击
# 1. 锁定卷(禁止写入)
win32file.DeviceIoControl(h_volume,FSCTL_LOCK_VOLUME, # 控制码0x00090018None, None, None
)# 2. 卸载文件系统
win32file.DeviceIoControl(h_volume,FSCTL_DISMOUNT_VOLUME, # 控制码0x00090020None, None, None
)# 3. 物理弹出
win32file.DeviceIoControl(h_volume,IOCTL_STORAGE_EJECT_MEDIA, # 控制码0x2D4808None, None, None
)
4. PyQt5多线程处理
class WorkerThread(QThread):update_progress = pyqtSignal(str, int, int) # 进度更新信号def run(self):try:if self.operation_type == 'find':self.find_locking_processes()elif self.operation_type == 'unlock_and_eject':self.unlock_and_eject_drive()except Exception as e:self.log_message(f"线程错误: {str(e)}")
🚀 使用教程
基本操作流程
- 启动程序(自动获取管理员权限)
- 从列表选择目标USB驱动器
- 点击"查找占用进程"分析问题
- 点击"解除占用并弹出"安全移除
高级技巧
- 托盘快捷操作:右键系统图标直接选择驱动器
- 自动刷新:每5秒自动更新驱动器列表
- 日志分析:查看完整的操作记录和错误信息
💾 完整源码下载
完整项目源码:
import ctypes
import sys
import win32api
import win32file
import win32con
import win32process
import psutil
import threading
from datetime import datetime
from time import sleep
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QListWidget, QPushButton, QLabel, QProgressBar, QTextEdit,QSystemTrayIcon, QMenu, QMessageBox, QStyle, QFrame, QAction,QDialog, QVBoxLayout, QHBoxLayout)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt5.QtGui import QIcon, QFont, QPalette, QColor# Define constants
try:from winioctlcon import FSCTL_LOCK_VOLUME, FSCTL_DISMOUNT_VOLUME, IOCTL_STORAGE_EJECT_MEDIA
except ImportError:FSCTL_LOCK_VOLUME = 0x00090018FSCTL_DISMOUNT_VOLUME = 0x00090020IOCTL_STORAGE_EJECT_MEDIA = 0x2D4808class EjectProgressDialog(QDialog):"""自定义进度对话框"""def __init__(self, parent=None, drive_letter=""):super().__init__(parent)self.setWindowTitle("安全弹出USB驱动器")self.setWindowIcon(QIcon.fromTheme('drive-removable-media'))self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint)self.setFixedSize(300, 120)layout = QVBoxLayout()self.setLayout(layout)# 标题标签self.title_label = QLabel(f"正在安全弹出 {drive_letter}:...")self.title_label.setAlignment(Qt.AlignCenter)layout.addWidget(self.title_label)# 进度条self.progress_bar = QProgressBar()self.progress_bar.setRange(0, 100)self.progress_bar.setTextVisible(False)layout.addWidget(self.progress_bar)# 状态标签self.status_label = QLabel("准备解除占用...")self.status_label.setAlignment(Qt.AlignCenter)layout.addWidget(self.status_label)# 取消按钮self.cancel_btn = QPushButton("取消")self.cancel_btn.clicked.connect(self.reject)btn_layout = QHBoxLayout()btn_layout.addStretch()btn_layout.addWidget(self.cancel_btn)btn_layout.addStretch()layout.addLayout(btn_layout)# 设置样式self.setStyleSheet("""QDialog {background-color: #f5f5f5;}QLabel {font-size: 12px;}QProgressBar {border: 1px solid #ccc;border-radius: 3px;text-align: center;height: 12px;}QProgressBar::chunk {background-color: #4CAF50;width: 10px;}""")def update_progress(self, text, value, max_value):"""更新进度显示"""self.progress_bar.setMaximum(max_value)self.progress_bar.setValue(value)self.status_label.setText(text)class WorkerThread(QThread):update_progress = pyqtSignal(str, int, int)update_process_text = pyqtSignal(str)operation_complete = pyqtSignal()show_message = pyqtSignal(str, str, str) # title, message, icondef __init__(self, drive_letter, operation_type):super().__init__()self.drive_letter = drive_letterself.operation_type = operation_type # 'find', 'unlock_and_eject'self.running = Truedef run(self):try:if self.operation_type == 'find':self.find_locking_processes()elif self.operation_type == 'unlock_and_eject':self.unlock_and_eject_drive()except Exception as e:self.log_message(f"线程错误: {str(e)}")finally:self.operation_complete.emit()def get_timestamp(self):"""获取当前时间戳,格式为[HH:MM:SS]"""return datetime.now().strftime("[%H:%M:%S]")def log_message(self, message):"""记录带时间戳的消息"""timestamp = self.get_timestamp()self.update_process_text.emit(f"{timestamp} {message}\n")def find_locking_processes(self):"""查找锁定驱动器的进程"""self.log_message("🔍 正在获取进程列表...")self.update_progress.emit("🔍 正在获取进程列表...", 0, 100)try:processes = list(psutil.process_iter(['pid', 'name', 'exe', 'cmdline', 'username', 'status']))total = len(processes)except Exception as e:self.log_message(f"❌ 获取进程列表失败: {str(e)}")returnself.log_message(f"📊 找到 {total} 个进程,正在扫描...")self.update_progress.emit(f"🔎 正在扫描 0/{total} 进程", 0, total)locking_processes = []drive_path = f"{self.drive_letter}:\\".lower()for i, proc in enumerate(processes):if not self.running:self.log_message("⏹ 用户取消操作")breakself.update_progress.emit(f"🔎 正在扫描 {i+1}/{total}: {proc.name()}", i+1, total)try:# 检查打开的文件for item in proc.open_files():if not self.running:breakif item.path.lower().startswith(drive_path):locking_processes.append({'pid': proc.pid,'name': proc.name(),'exe': proc.exe(),'cmdline': ' '.join(proc.cmdline()),'username': proc.username(),'status': proc.status()})break# 检查工作目录try:cwd = proc.cwd()if cwd and cwd.lower().startswith(drive_path):locking_processes.append({'pid': proc.pid,'name': proc.name(),'exe': proc.exe(),'cmdline': ' '.join(proc.cmdline()),'username': proc.username(),'status': proc.status()})except (psutil.AccessDenied, psutil.NoSuchProcess):passexcept (psutil.AccessDenied, psutil.NoSuchProcess, psutil.ZombieProcess):continueif not self.running:returnif not locking_processes:self.log_message("✅ 未找到锁定进程")else:self.log_message(f"⚠️ 找到 {len(locking_processes)} 个锁定进程:")self.update_process_text.emit("━" * 80 + "\n")for proc in locking_processes:self.update_process_text.emit(f"{self.get_timestamp()} 🆔 PID: {proc['pid']}\n"f"{self.get_timestamp()} 📛 名称: {proc['name']}\n"f"{self.get_timestamp()} 📂 路径: {proc['exe']}\n"f"{self.get_timestamp()} 💻 命令: {proc['cmdline']}\n"f"{self.get_timestamp()} 👤 用户: {proc['username']}\n"f"{self.get_timestamp()} 📊 状态: {proc['status']}\n""━" * 80 + "\n")self.update_progress.emit("✅ 扫描完成", total, total)def unlock_and_eject_drive(self):"""解除占用并弹出驱动器"""self.log_message("🔓 准备解除占用并弹出...")self.update_progress.emit("🔓 准备解除占用并弹出...", 0, 4)# 1. 查找并关闭锁定进程self.log_message("🔍 正在查找锁定进程...")self.update_progress.emit("🔍 正在查找锁定进程...", 1, 4)locking_processes = self.get_locking_processes()if locking_processes:self.log_message(f"⚠️ 找到 {len(locking_processes)} 个锁定进程,尝试关闭...")for proc in locking_processes:if not self.running:self.log_message("⏹ 用户取消操作")breaktry:p = psutil.Process(proc['pid'])p.terminate()self.log_message(f"✅ 已终止进程: {proc['name']} (PID: {proc['pid']})")except Exception as e:self.log_message(f"❌ 终止 {proc['name']} (PID: {proc['pid']}) 失败: {str(e)}")if not self.running:return# 2. 标准解锁方法self.log_message("🔓 正在解除占用...")self.update_progress.emit("🔓 正在解除占用...", 2, 4)drive_path = f"\\\\.\\{self.drive_letter}:"try:h_volume = win32file.CreateFile(drive_path,win32con.GENERIC_READ | win32con.GENERIC_WRITE,win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,None,win32con.OPEN_EXISTING,0,None)if h_volume == win32file.INVALID_HANDLE_VALUE:self.show_message.emit("错误", "无法打开驱动器", "critical")returntry:# 锁定卷win32file.DeviceIoControl(h_volume,FSCTL_LOCK_VOLUME,None,None,None)# 卸载卷win32file.DeviceIoControl(h_volume,FSCTL_DISMOUNT_VOLUME,None,None,None)# 3. 弹出媒体self.log_message("🚀 正在弹出驱动器...")self.update_progress.emit("🚀 正在弹出驱动器...", 3, 4)win32file.DeviceIoControl(h_volume,IOCTL_STORAGE_EJECT_MEDIA,None,None,None)message = f"✅ 成功解除占用并弹出 {self.drive_letter}:,现在可以安全移除设备"self.show_message.emit("成功", message, "information")self.log_message(message)except Exception as e:error_msg = f"❌ 解除占用并弹出失败: {str(e)}"self.show_message.emit("错误", error_msg, "critical")self.log_message(error_msg)finally:win32file.CloseHandle(h_volume)except Exception as e:error_msg = f"❌ 操作失败: {str(e)}"self.show_message.emit("错误", error_msg, "critical")self.log_message(error_msg)self.update_progress.emit("✅ 操作完成", 4, 4)def get_locking_processes(self):"""获取所有锁定驱动器的进程"""locking_processes = []drive_path = f"{self.drive_letter}:\\".lower()try:processes = list(psutil.process_iter(['pid', 'name', 'exe', 'cmdline', 'username', 'status']))total = len(processes)for i, proc in enumerate(processes):if not self.running:breaktry:# 检查打开的文件for item in proc.open_files():if not self.running:breakif item.path.lower().startswith(drive_path):locking_processes.append({'pid': proc.pid,'name': proc.name(),'exe': proc.exe(),'cmdline': ' '.join(proc.cmdline()),'username': proc.username(),'status': proc.status()})break# 检查工作目录try:cwd = proc.cwd()if cwd and cwd.lower().startswith(drive_path):locking_processes.append({'pid': proc.pid,'name': proc.name(),'exe': proc.exe(),'cmdline': ' '.join(proc.cmdline()),'username': proc.username(),'status': proc.status()})except (psutil.AccessDenied, psutil.NoSuchProcess):passexcept (psutil.AccessDenied, psutil.NoSuchProcess, psutil.ZombieProcess):continueexcept Exception as e:self.log_message(f"❌ 获取进程列表失败: {str(e)}")return locking_processesdef stop(self):"""停止线程"""self.running = Falseclass USBEjectorPro(QMainWindow):def __init__(self):super().__init__()self.worker_thread = Noneself.running = Falseself.progress_dialog = Noneself.setWindowTitle("💾 USB 安全弹出")self.setGeometry(100, 100, 520, 569) # Reduced height since we removed log view# 设置窗口图标self.setWindowIcon(QIcon.fromTheme('drive-removable-media'))# 创建系统托盘图标self.create_system_tray()self.init_ui()self.refresh_drives()# 自动刷新计时器self.refresh_timer = QTimer(self)self.refresh_timer.timeout.connect(self.refresh_drives)self.refresh_timer.start(5000) # 每5秒刷新一次def init_ui(self):"""初始化主界面"""main_widget = QWidget()self.setCentralWidget(main_widget)layout = QVBoxLayout()main_widget.setLayout(layout)# 标题title_label = QLabel("💾 USB 安全弹出专业版")title_label.setStyleSheet("font-size: 18px; font-weight: bold;")title_label.setAlignment(Qt.AlignCenter)layout.addWidget(title_label)# 驱动器列表drive_group = QWidget()drive_layout = QVBoxLayout()drive_group.setLayout(drive_layout)drive_label = QLabel("💾 可移动驱动器")drive_label.setStyleSheet("font-weight: bold;")drive_layout.addWidget(drive_label)self.drive_list = QListWidget()self.drive_list.setStyleSheet("""QListWidget {font-family: monospace;border: 1px solid #c0c0c0;border-radius: 4px;padding: 2px;}""")drive_layout.addWidget(self.drive_list)layout.addWidget(drive_group)# 按钮button_group = QWidget()button_layout = QHBoxLayout()button_group.setLayout(button_layout)self.refresh_btn = QPushButton("🔄 手动刷新")self.refresh_btn.clicked.connect(self.refresh_drives)self.find_btn = QPushButton("🔍 查找占用进程")self.find_btn.clicked.connect(self.start_find_processes)self.unlock_eject_btn = QPushButton("🔓 解除占用并弹出")self.unlock_eject_btn.clicked.connect(self.start_unlock_and_eject)button_layout.addWidget(self.refresh_btn)button_layout.addWidget(self.find_btn)button_layout.addWidget(self.unlock_eject_btn)layout.addWidget(button_group)# 进度条self.progress_label = QLabel("🟢 准备就绪")layout.addWidget(self.progress_label)self.progress_bar = QProgressBar()self.progress_bar.setRange(0, 100)self.progress_bar.setTextVisible(True)layout.addWidget(self.progress_bar)# 进程信息 (现在也包含日志信息)process_group = QWidget()process_layout = QVBoxLayout()process_group.setLayout(process_layout)process_label = QLabel("📊 进程信息与日志")process_label.setStyleSheet("font-weight: bold;")process_layout.addWidget(process_label)self.process_text = QTextEdit()self.process_text.setReadOnly(True)self.process_text.setStyleSheet("""QTextEdit {font-family: monospace;border: 1px solid #c0c0c0;border-radius: 4px;padding: 2px;}""")process_layout.addWidget(self.process_text)layout.addWidget(process_group)# 设置按钮样式self.set_button_styles()def set_button_styles(self):"""设置按钮自定义样式"""button_style = """QPushButton {padding: 8px;border-radius: 4px;font-weight: bold;border: 1px solid #a0a0a0;min-width: 80px;}QPushButton:hover {background-color: #e0e0e0;}QPushButton:pressed {background-color: #d0d0d0;border: 1px solid #808080;}QPushButton:disabled {color: #a0a0a0;background-color: #f0f0f0;border: 1px solid #c0c0c0;}"""# 为每个按钮设置不同的背景色self.refresh_btn.setStyleSheet(button_style + "background-color: #e6f3ff;")self.find_btn.setStyleSheet(button_style + "background-color: #fff2cc;")self.unlock_eject_btn.setStyleSheet(button_style + "background-color: #e6ffe6;")def create_system_tray(self):"""创建系统托盘图标和菜单"""self.tray_icon = QSystemTrayIcon(self)# 设置托盘图标if QSystemTrayIcon.isSystemTrayAvailable():# 使用系统内置图标icon = self.style().standardIcon(QStyle.SP_DriveCDIcon)self.tray_icon.setIcon(icon)# 创建托盘菜单self.tray_menu = QMenu()# 添加显示主窗口选项show_action = QAction("显示主窗口", self)show_action.triggered.connect(self.show_normal)self.tray_menu.addAction(show_action)# 添加分隔线self.tray_menu.addSeparator()# 添加USB驱动器弹出菜单self.usb_menu = QMenu("安全弹出USB驱动器")self.tray_menu.addMenu(self.usb_menu)# 添加分隔线self.tray_menu.addSeparator()# 添加退出选项exit_action = QAction("退出", self)exit_action.triggered.connect(self.safe_exit)self.tray_menu.addAction(exit_action)# 设置托盘菜单self.tray_icon.setContextMenu(self.tray_menu)# 连接托盘图标点击事件self.tray_icon.activated.connect(self.tray_icon_clicked)# 只有在系统支持托盘图标时才显示if QSystemTrayIcon.isSystemTrayAvailable():self.tray_icon.show()# 初始化USB驱动器菜单self.update_usb_tray_menu()def update_usb_tray_menu(self):"""更新托盘菜单中的USB驱动器列表"""self.usb_menu.clear()drives = self.get_removable_drives()if not drives:action = QAction("没有可移动驱动器", self)action.setEnabled(False)self.usb_menu.addAction(action)returnfor drive in drives:volume_name = self.get_volume_name(drive)action = QAction(f"{drive} - {volume_name}", self)action.setData(drive) # 存储驱动器字母action.triggered.connect(lambda checked, d=drive: self.tray_eject_drive(d))self.usb_menu.addAction(action)def tray_eject_drive(self, drive):"""从托盘菜单弹出驱动器"""if self.running:QMessageBox.warning(self, "警告", "已有操作正在进行")returndrive_letter = drive[0].upper()# 创建进度对话框self.progress_dialog = EjectProgressDialog(self, drive_letter)self.progress_dialog.rejected.connect(self.cancel_eject)# 显示对话框self.progress_dialog.show()# 开始弹出操作self.start_tray_eject(drive_letter)def start_tray_eject(self, drive_letter):"""开始从托盘弹出驱动器"""self.process_text.clear()self.log_message(f"🔓 (托盘操作) 准备解除占用并弹出 {drive_letter}:...")self.running = Trueself.worker_thread = WorkerThread(drive_letter, 'unlock_and_eject')self.worker_thread.update_progress.connect(self.update_tray_progress)self.worker_thread.update_process_text.connect(self.process_text.append)self.worker_thread.operation_complete.connect(self.tray_eject_complete)self.worker_thread.show_message.connect(self.show_message)self.worker_thread.start()def update_tray_progress(self, text, value, max_value):"""更新托盘操作的进度对话框"""if self.progress_dialog:self.progress_dialog.update_progress(text, value, max_value)def cancel_eject(self):"""取消弹出操作"""if self.worker_thread and self.worker_thread.isRunning():self.worker_thread.stop()self.running = Falseif self.progress_dialog:self.progress_dialog.close()self.log_message("⏹ 用户取消操作")def tray_eject_complete(self):"""托盘弹出操作完成"""self.running = False# 关闭进度对话框if self.progress_dialog:self.progress_dialog.close()self.progress_dialog = None# 更新托盘菜单self.update_usb_tray_menu()# 刷新驱动器列表QTimer.singleShot(1000, self.refresh_drives)def tray_icon_clicked(self, reason):"""处理托盘图标点击事件"""if reason == QSystemTrayIcon.Trigger: # 单击if self.isVisible():self.hide()else:self.show_normal()elif reason == QSystemTrayIcon.Context: # 右键self.update_usb_tray_menu() # 更新USB驱动器菜单def show_normal(self):"""正常显示窗口"""self.show()self.setWindowState(self.windowState() & ~Qt.WindowMinimized | Qt.WindowActive)self.activateWindow()def closeEvent(self, event):"""重写关闭事件以最小化到托盘"""if self.tray_icon.isVisible():self.hide()event.ignore()def refresh_drives(self):"""刷新可移动驱动器列表"""self.drive_list.clear()drives = self.get_removable_drives()for drive in drives:volume_name = self.get_volume_name(drive)self.drive_list.addItem(f"{drive} - {volume_name}")# 同时更新托盘菜单self.update_usb_tray_menu()def get_removable_drives(self):"""获取所有可移动驱动器"""drives = []bitmask = ctypes.windll.kernel32.GetLogicalDrives()for letter in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':if bitmask & 1:drive_type = ctypes.windll.kernel32.GetDriveTypeW(f"{letter}:\\")if drive_type == win32con.DRIVE_REMOVABLE:drives.append(f"{letter}:")bitmask >>= 1return drivesdef get_volume_name(self, drive):"""获取驱动器的卷名"""try:volume_name = win32api.GetVolumeInformation(f"{drive}\\")[0]return volume_name if volume_name else "无标签"except:return "无法访问"def start_find_processes(self):"""开始查找锁定进程"""if self.running:QMessageBox.warning(self, "警告", "已有操作正在进行")returnselected_items = self.drive_list.selectedItems()if not selected_items:QMessageBox.warning(self, "警告", "请先选择一个驱动器")returndrive = selected_items[0].text().split()[0]drive_letter = drive[0].upper()self.process_text.clear()self.log_message(f"🔍 准备查找锁定 {drive} 的进程...")self.set_buttons_enabled(False)self.running = Trueself.worker_thread = WorkerThread(drive_letter, 'find')self.worker_thread.update_progress.connect(self.update_progress)self.worker_thread.update_process_text.connect(self.process_text.append)self.worker_thread.operation_complete.connect(self.operation_complete)self.worker_thread.show_message.connect(self.show_message)self.worker_thread.start()def start_unlock_and_eject(self):"""开始解除占用并弹出驱动器"""if self.running:QMessageBox.warning(self, "警告", "已有操作正在进行")returnselected_items = self.drive_list.selectedItems()if not selected_items:QMessageBox.warning(self, "警告", "请先选择一个驱动器")returndrive = selected_items[0].text().split()[0]drive_letter = drive[0].upper()self.process_text.clear()self.log_message(f"🔓 准备解除占用并弹出 {drive}...")self.set_buttons_enabled(False)self.running = Trueself.worker_thread = WorkerThread(drive_letter, 'unlock_and_eject')self.worker_thread.update_progress.connect(self.update_progress)self.worker_thread.update_process_text.connect(self.process_text.append)self.worker_thread.operation_complete.connect(self.operation_complete)self.worker_thread.show_message.connect(self.show_message)self.worker_thread.start()def get_timestamp(self):"""获取当前时间戳,格式为[HH:MM:SS]"""return datetime.now().strftime("[%H:%M:%S]")def update_progress(self, text, value=None, max_value=None):"""更新进度显示"""self.progress_label.setText(text)if value is not None and max_value is not None:self.progress_bar.setMaximum(max_value)self.progress_bar.setValue(value)def log_message(self, message):"""记录日志消息到进程信息窗口"""timestamp = self.get_timestamp()self.process_text.append(f"{timestamp} {message}")def show_message(self, title, message, icon_type):"""显示消息框"""if icon_type == "information":QMessageBox.information(self, title, message)elif icon_type == "warning":QMessageBox.warning(self, title, message)elif icon_type == "critical":QMessageBox.critical(self, title, message)else:QMessageBox.information(self, title, message)def operation_complete(self):"""处理操作完成"""self.running = Falseself.set_buttons_enabled(True)self.update_progress("🟢 准备就绪")# 操作后自动刷新驱动器QTimer.singleShot(1000, self.refresh_drives)def set_buttons_enabled(self, enabled):"""启用或禁用按钮"""self.refresh_btn.setEnabled(enabled)self.find_btn.setEnabled(enabled)self.unlock_eject_btn.setEnabled(enabled)def safe_exit(self):"""安全退出应用程序"""if self.running:reply = QMessageBox.question(self, "确认退出", "有操作正在进行,确定要退出吗?",QMessageBox.Yes | QMessageBox.No)if reply == QMessageBox.No:returnself.tray_icon.hide()QApplication.quit()def main():# 检查平台if sys.platform != "win32":print("本程序仅支持Windows系统")sys.exit(1)# 检查管理员权限try:is_admin = ctypes.windll.shell32.IsUserAnAdmin()except:is_admin = Falseif not is_admin:# 尝试以管理员身份重新启动ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(sys.argv), None, 1)sys.exit(0)# 检查psutiltry:import psutilexcept ImportError:print("需要psutil库。请安装: pip install psutil")sys.exit(1)app = QApplication(sys.argv)# 设置应用程序样式app.setStyle('Fusion')# 设置应用程序字体font = QFont()font.setFamily('Microsoft YaHei')font.setPointSize(9)app.setFont(font)window = USBEjectorPro()window.show()sys.exit(app.exec_())if __name__ == "__main__":main()
项目结构:
USB_Ejector_Pro/
├── main.py # 主程序入口
├── requirements.txt # 依赖库列表
├── assets/ # 资源文件
│ ├── icon.ico # 程序图标
│ └── screenshot.png # 截图
└── README.md # 使用说明
🎯 技术深度剖析
1. Windows IOCTL控制原理
IOCTL(Input/Output Control)是Windows提供的设备控制接口,我们的程序使用了三类关键控制码:
控制码 | 值 | 功能说明 |
---|---|---|
FSCTL_LOCK_VOLUME | 0x00090018 | 独占锁定卷 |
FSCTL_DISMOUNT_VOLUME | 0x00090020 | 卸载文件系统 |
IOCTL_STORAGE_EJECT_MEDIA | 0x2D4808 | 物理弹出设备 |
2. 进程终止的权限问题
普通进程无法终止系统关键进程,我们的解决方案:
- 启动时检查管理员权限
- 通过
ShellExecuteW
请求UAC提权 - 使用
win32process.TerminateProcess
强制终止
3. PyQt5的多线程模型
GUI线程与工作线程分离的关键点:
- 使用
QThread
而非Python原生线程 - 通过
pyqtSignal
实现线程间通信 - 进度对话框的模态处理
📊 性能优化建议
- 进程扫描加速:缓存系统进程列表,增量更新
- 异常处理增强:对僵尸进程的特殊处理
- 日志系统改进:增加日志分级和文件输出
- 多语言支持:使用Qt的翻译系统
🔮 未来扩展方向
- 网络驱动器支持:扩展对远程存储设备的处理
- 批量操作:同时处理多个USB设备
- 硬件诊断:检测USB接口电压/电流
- 移动端适配:开发Android版本
📝 总结
本文详细介绍了一个专业级USB安全弹出工具的开发全过程,关键技术包括:
- Windows底层设备控制API的使用
- PyQt5构建现代化GUI界面
- 多线程编程在GUI程序中的应用
- 系统托盘程序的开发技巧
这个工具不仅解决了实际痛点,更展示了Python在系统编程方面的强大能力。读者可以根据自身需求进一步扩展功能,比如增加自动备份、磁盘修复等高级特性。
最后提醒:操作存储设备有风险,重要数据请提前备份!
转载请保留出处:CSDN技术博客
作者:创客白泽
更新日期:2025年6月12日
更多技术文章请访问:博客主页