【开源工具】Python打造智能IP监控系统:邮件告警+可视化界面+配置持久化
🌐【开源工具】Python打造智能IP监控系统:邮件告警+可视化界面+配置持久化
🌈 个人主页:创客白泽 - CSDN博客
🔥 系列专栏:🐍《Python开源项目实战》
💡 热爱不止于代码,热情源自每一个灵感闪现的夜晚。愿以开源之火,点亮前行之路。
👍 如果觉得这篇文章有帮助,欢迎您一键三连,分享给更多人哦
📌 概述:为什么需要IP监控系统?
在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求。传统的人工巡检方式效率低下,而商业监控工具又往往价格昂贵。本文将带你用Python从零打造一个高可用IP监控系统,具备以下核心功能:
- ✅ 多目标监控:同时监测多个IP+端口组合状态
- ✅ 智能告警:异常状态自动触发邮件通知(支持多收件人)
- ✅ 可视化界面:基于Tkinter的现代化UI,操作直观
- ✅ 配置持久化:所有设置自动保存,重启不丢失
- ✅ 断线重试机制:避免网络抖动导致的误报
🔍 技术栈:Python 3 + Tkinter + smtplib + socket + 多线程
🛠️ 使用步骤说明
1. 环境准备
# 所需库(Python内置,无需额外安装)
import tkinter
import threading
import smtplib
import socket
2. 系统部署
-
下载完整代码(文末提供)
-
配置
config.json
(首次运行会自动生成) -
运行主程序:
python ip_monitor.py
3. 核心功能配置
📝 目标配置
- 通过表格形式管理监控目标
- 复选框控制是否启用监测
- 支持双击编辑现有条目
📧 邮件设置
服务商 | SMTP地址 | 端口 | 授权码获取方式 |
---|---|---|---|
网易163 | smtp.163.com | 465/994 | 邮箱设置→POP3/SMTP服务 |
QQ邮箱 | smtp.qq.com | 465 | 设置→账户→POP3服务 |
Gmail | smtp.gmail.com | 587 | Google账号→应用密码 |
# 配置示例(支持SSL/TLS)
SMTP服务器: smtp.163.com:465
邮箱账户: yourname@163.com
授权码: xxxxxx # 需在邮箱设置中获取
接收邮箱: admin@company.com,backup@company.com
⚠️ 注意事项:
- 必须开启SMTP服务
- 部分邮箱需要使用授权码而非密码
- Gmail需开启"低安全性应用访问"
⚙️ 监控参数
检测模式说明
纯IP检测模式
- 留空端口字段
- 使用ICMP协议Ping检测
- 适用场景:网络设备监控
组合检测模式
# 同时验证IP可达性和端口开放状态
if ping_success and port_open:return ONLINE
参数名 | 默认值 | 说明 |
---|---|---|
监测间隔 | 10秒 | 两次检测的时间间隔 |
超时时间 | 2秒 | 判定离线的超时阈值 |
重试次数 | 3次 | 连续失败次数触发告警 |
重试间隔 | 5秒 | 失败后的快速重试间隔 |
🖥️ 系统效果展示
实时监控面板
邮件告警示例
主题:[告警] IP状态变更: 192.168.1.1:80 - 离线
IP地址: 192.168.1.1
端口: 80
备注: 主数据库服务器
状态变更为: 离线
检测时间: 2023-08-20 14:30:45
日志记录
[2023-08-20 14:30:45] 检测到 192.168.1.1:80 状态变更为离线
[2023-08-20 14:31:00] 已发送告警邮件给3个收件人
[2023-08-20 15:00:00] 已发送每日状态报告
💻 核心源码解析
1. 状态检测引擎
def check_target(self, ip, port=None, timeout=2):"""智能检测IP/端口状态"""try:if port: # 端口检测模式with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:s.settimeout(timeout)s.connect((ip, port))return IPStatus.ONLINEelse: # 纯Ping模式# 使用ICMP协议实现Pingsock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)sock.settimeout(timeout)sock.connect((ip, 1))return IPStatus.ONLINEexcept socket.timeout:return IPStatus.OFFLINEexcept Exception:return IPStatus.UNKNOWN
2. 多线程监控架构
class MonitorThread(threading.Thread):def __init__(self, app):super().__init__(daemon=True)self.app = appself.running = Truedef run(self):while self.running:# 1. 执行所有目标的检测# 2. 触发状态变更通知# 3. 智能休眠控制CPU占用time.sleep(self.calculate_sleep_time())def stop(self):self.running = False
3. 配置持久化实现
def save_config(self):"""JSON格式保存所有配置"""config = {'targets': [self.tree.item(item, 'values')for item in self.tree.get_children()],'email_settings': {'smtp': self.smtp_entry.get(),'user': self.user_entry.get(),'pass': self.pass_entry.get(),'receivers': self.receiver_entry.get()}}with open('config.json', 'w') as f:json.dump(config, f, indent=2)
🚀 高级功能扩展建议
1. 微信/钉钉机器人告警
# 示例:企业微信机器人API
import requests
def send_wechat_alert(message):webhook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx"requests.post(webhook, json={"text": {"content": message}})
2. 数据库存储历史记录
# 使用SQLite记录状态变化
import sqlite3
conn = sqlite3.connect('monitor.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS status_log(ip TEXT, port INT, status TEXT, check_time TIMESTAMP)''')
3. 可视化图表展示
# 使用Matplotlib绘制可用率曲线
plt.plot(dates, availability_rates)
plt.title('月度服务可用率')
plt.ylabel('百分比(%)')
plt.savefig('report.png')
📝 总结与资源下载
项目亮点
- 工业级可靠性:断线重试+智能休眠机制
- 开箱即用:无需复杂配置,5分钟快速部署
- 高度可扩展:代码结构清晰,便于二次开发
完整源码下载
👉
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import threading
import time
import smtplib
import ipaddress
import re
import json
import socket
from email.message import EmailMessage
from enum import Enumclass IPStatus(Enum):ONLINE = 1OFFLINE = 2UNKNOWN = 3class IPMonitorApp(tk.Tk):def __init__(self):super().__init__()self.title("IP状态监测系统")self.geometry("1300x850")try:self.iconbitmap('monitor.ico')except:passself.style = ttk.Style()self.configure_style()self.stop_event = threading.Event()self.monitor_thread = Noneself.ip_status = {}self.next_report_time = self.calculate_next_report_time()self.create_widgets()self.load_config()def configure_style(self):self.style.theme_create('ipmonitor', parent='alt', settings={'TFrame': {'configure': {'background': '#f5f5f5'}},'TLabelFrame': {'configure': {'background': '#f5f5f5','foreground': '#1e3d59','font': ('微软雅黑', 10, 'bold')}},'TLabel': {'configure': {'background': '#f5f5f5','foreground': '#1e3d59','font': ('微软雅黑', 10)}},'TButton': {'configure': {'background': '#1e3d59','foreground': 'white','font': ('微软雅黑', 10),'padding': 5},'map': {'background': [('active', '#3a6ea5')],'foreground': [('disabled', '#888888')]}},'TCheckbutton': {'configure': {'background': '#f5f5f5','font': ('微软雅黑', 10)}}})self.style.theme_use('ipmonitor')def create_widgets(self):# 主容器main_frame = ttk.Frame(self, padding=10)main_frame.grid(row=0, column=0, sticky="nsew")self.grid_columnconfigure(0, weight=1)self.grid_rowconfigure(0, weight=1)# 配置区域config_frame = ttk.Frame(main_frame)config_frame.grid(row=0, column=0, sticky="nsew")# IP配置区域ip_frame = ttk.LabelFrame(config_frame, text="监测目标配置", padding=10)ip_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")ip_frame.columnconfigure(0, weight=1)# IP列表Treeviewself.ip_tree = ttk.Treeview(ip_frame, columns=("check", "ip", "port", "remark"), show="headings",height=8,selectmode="browse")self.ip_tree.grid(row=0, column=0, sticky="nsew")# 配置列self.ip_tree.heading("check", text="检测")self.ip_tree.heading("ip", text="IP地址")self.ip_tree.heading("port", text="端口")self.ip_tree.heading("remark", text="备注")self.ip_tree.column("check", width=50, anchor="center")self.ip_tree.column("ip", width=150, anchor="w")self.ip_tree.column("port", width=80, anchor="center")self.ip_tree.column("remark", width=250, anchor="w")# 添加复选框self.ip_tree.tag_configure("checked", background="#e6f7ff")self.ip_tree.tag_configure("unchecked", background="#f5f5f5")# 编辑区域edit_frame = ttk.Frame(ip_frame)edit_frame.grid(row=1, column=0, sticky="ew", pady=(5, 0))ttk.Label(edit_frame, text="IP:").grid(row=0, column=0, sticky="e")self.ip_entry = ttk.Entry(edit_frame, width=18)self.ip_entry.grid(row=0, column=1, padx=2, sticky="w")ttk.Label(edit_frame, text="端口:").grid(row=0, column=2, sticky="e")self.port_entry = ttk.Entry(edit_frame, width=8)self.port_entry.grid(row=0, column=3, padx=2, sticky="w")ttk.Label(edit_frame, text="备注:").grid(row=0, column=4, sticky="e")self.remark_entry = ttk.Entry(edit_frame, width=20)self.remark_entry.grid(row=0, column=5, padx=2, sticky="w")# 操作按钮btn_frame = ttk.Frame(edit_frame)btn_frame.grid(row=0, column=6, padx=5)ttk.Button(btn_frame, text="添加", command=self.add_ip).grid(row=0, column=0, padx=2)ttk.Button(btn_frame, text="更新", command=self.update_ip).grid(row=0, column=1, padx=2)ttk.Button(btn_frame, text="删除", command=self.remove_ip).grid(row=0, column=2, padx=2)# 邮件配置区域mail_frame = ttk.LabelFrame(config_frame, text="邮件通知配置", padding=10)mail_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")# 邮件配置项mail_config = [("SMTP服务器:端口", "smtp_server", "smtp.163.com:465"),("邮箱账户", "email_user", "yourname@163.com"),("邮箱授权码", "email_pass", ""),("接收邮箱", "email_receiver", "多个邮箱用逗号分隔")]for i, (label, attr, ph) in enumerate(mail_config):ttk.Label(mail_frame, text=label).grid(row=i, column=0, sticky="e", pady=3)entry = ttk.Entry(mail_frame, width=25)entry.grid(row=i, column=1, padx=5, pady=3, sticky="ew")setattr(self, attr+"_entry", entry)ttk.Label(mail_frame, text=ph, foreground="#888888").grid(row=i, column=2, sticky="w", padx=5)# 监控参数param_frame = ttk.LabelFrame(config_frame, text="监控参数", padding=10)param_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(5, 0))params = [("监测间隔(秒):", "interval", "10"),("超时(秒):", "timeout", "2"),("重试次数:", "retry", "3"),("重试间隔(秒):", "retry_interval", "5")]for i, (label, attr, default) in enumerate(params):ttk.Label(param_frame, text=label).grid(row=0, column=i*2, sticky="e")entry = ttk.Entry(param_frame, width=8)entry.insert(0, default)entry.grid(row=0, column=i*2+1, padx=5, sticky="w")setattr(self, attr+"_entry", entry)# 控制按钮ctrl_frame = ttk.Frame(config_frame)ctrl_frame.grid(row=2, column=0, columnspan=2, pady=(10, 0), sticky="e")ttk.Button(ctrl_frame, text="开始监测", command=self.start_monitoring).grid(row=0, column=0, padx=5)ttk.Button(ctrl_frame, text="停止监测", command=self.stop_monitoring, state=tk.DISABLED).grid(row=0, column=1, padx=5)self.start_btn = ctrl_frame.grid_slaves(row=0, column=0)[0]self.stop_btn = ctrl_frame.grid_slaves(row=0, column=1)[0]ttk.Button(ctrl_frame, text="保存配置", command=self.save_config).grid(row=0, column=2, padx=5)ttk.Button(ctrl_frame, text="清空日志", command=self.clear_logs).grid(row=0, column=3, padx=5)# 状态监控区域status_frame = ttk.LabelFrame(main_frame, text="状态监控", padding=10)status_frame.grid(row=1, column=0, sticky="nsew", pady=(5, 0))self.status_tree = ttk.Treeview(status_frame, columns=("ip", "port", "remark", "status", "last_check"), show="headings",height=8)self.status_tree.grid(row=0, column=0, sticky="nsew")# 状态列配置for col, width in [("ip", 150), ("port", 80), ("remark", 200), ("status", 100), ("last_check", 180)]:self.status_tree.column(col, width=width, anchor="center")self.status_tree.heading(col, text=col)# 日志区域log_frame = ttk.LabelFrame(main_frame, text="系统日志", padding=10)log_frame.grid(row=2, column=0, sticky="nsew", pady=(5, 0))self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, font=('微软雅黑', 9), bg='white', fg='#333333',height=10)self.log_text.pack(fill=tk.BOTH, expand=True)# 配置权重main_frame.columnconfigure(0, weight=1)main_frame.rowconfigure(1, weight=1)main_frame.rowconfigure(2, weight=1)config_frame.columnconfigure(1, weight=1)ip_frame.rowconfigure(0, weight=1)ip_frame.columnconfigure(0, weight=1)status_frame.rowconfigure(0, weight=1)status_frame.columnconfigure(0, weight=1)# 绑定事件self.ip_tree.bind("<Button-1>", self.on_tree_click)self.ip_tree.bind("<Double-1>", self.on_tree_double_click)def on_tree_click(self, event):"""处理树状图点击事件"""region = self.ip_tree.identify("region", event.x, event.y)if region == "cell":column = self.ip_tree.identify_column(event.x)item = self.ip_tree.identify_row(event.y)if column == "#1": # 复选框列current_val = self.ip_tree.item(item, "values")[0]new_val = "✓" if current_val == "" else ""values = list(self.ip_tree.item(item, "values"))values[0] = new_valself.ip_tree.item(item, values=values, tags=("checked" if new_val else "unchecked"))def on_tree_double_click(self, event):"""处理树状图双击事件"""item = self.ip_tree.selection()if item:values = self.ip_tree.item(item, "values")self.ip_entry.delete(0, tk.END)self.ip_entry.insert(0, values[1])self.port_entry.delete(0, tk.END)self.port_entry.insert(0, values[2])self.remark_entry.delete(0, tk.END)self.remark_entry.insert(0, values[3])def add_ip(self):"""添加IP到列表"""ip = self.ip_entry.get().strip()port = self.port_entry.get().strip()remark = self.remark_entry.get().strip()if not ip:messagebox.showwarning("错误", "IP地址不能为空")returnif not self.is_valid_ip(ip):messagebox.showwarning("错误", "无效的IP地址格式")returnif port:try:port = int(port)if not (1 <= port <= 65535):messagebox.showwarning("错误", "端口必须在1-65535之间")returnexcept ValueError:messagebox.showwarning("错误", "端口必须是数字")returnself.ip_tree.insert("", tk.END, values=("✓", ip, port, remark), tags=("checked",))# 清空输入框self.ip_entry.delete(0, tk.END)self.port_entry.delete(0, tk.END)self.remark_entry.delete(0, tk.END)def update_ip(self):"""更新选中的IP"""item = self.ip_tree.selection()if not item:messagebox.showwarning("错误", "请先选择要更新的项")returnip = self.ip_entry.get().strip()port = self.port_entry.get().strip()remark = self.remark_entry.get().strip()if not ip:messagebox.showwarning("错误", "IP地址不能为空")returnif not self.is_valid_ip(ip):messagebox.showwarning("错误", "无效的IP地址格式")returnif port:try:port = int(port)if not (1 <= port <= 65535):messagebox.showwarning("错误", "端口必须在1-65535之间")returnexcept ValueError:messagebox.showwarning("错误", "端口必须是数字")return# 保留原来的复选框状态current_check = self.ip_tree.item(item, "values")[0]self.ip_tree.item(item, values=(current_check, ip, port, remark))def remove_ip(self):"""删除选中的IP"""items = self.ip_tree.selection()if not items:messagebox.showwarning("错误", "请先选择要删除的项")returnfor item in items:self.ip_tree.delete(item)def is_valid_ip(self, ip):"""验证IP地址格式"""try:ipaddress.ip_address(ip)return Trueexcept ValueError:return Falsedef start_monitoring(self):"""开始监控"""if not self.validate_inputs():return# 获取要监控的目标targets = []for item in self.ip_tree.get_children():check, ip, port, remark = self.ip_tree.item(item, "values")if check == "✓": # 只监控选中的项targets.append((ip, int(port) if port else None, remark))if not targets:messagebox.showwarning("错误", "没有选中要监控的目标")return# 初始化状态字典self.ip_status = {}for ip, port, remark in targets:self.ip_status[(ip, port)] = {"status": IPStatus.UNKNOWN,"last_notified": None,"remark": remark,"retries": 0,"next_retry_time": 0,"last_check": "从未检测"}# 更新状态显示self.update_status_display()# 更新按钮状态self.start_btn.config(state=tk.DISABLED)self.stop_btn.config(state=tk.NORMAL)self.stop_event.clear()# 获取监控参数interval = int(self.interval_entry.get())timeout = int(self.timeout_entry.get())max_retries = int(self.retry_entry.get())retry_interval = int(self.retry_interval_entry.get())# 创建监控线程self.monitor_thread = threading.Thread(target=self.monitor_targets,args=(interval, timeout, max_retries, retry_interval),daemon=True)self.monitor_thread.start()self.log_message("监控已启动")def stop_monitoring(self):"""停止监控"""self.stop_event.set()self.start_btn.config(state=tk.NORMAL)self.stop_btn.config(state=tk.DISABLED)self.log_message("监控已停止")def monitor_targets(self, interval, timeout, max_retries, retry_interval):"""监控主循环"""while not self.stop_event.is_set():current_time = time.time()# 每日报告检查if current_time >= self.next_report_time:self.send_daily_report()self.next_report_time = self.calculate_next_report_time()# 检查所有目标for (ip, port), data in list(self.ip_status.items()):if self.stop_event.is_set():breakif current_time < data['next_retry_time']:continue# 执行检测if port: # 如果有端口则检测端口current_status = self.check_port(ip, port, timeout)else: # 否则只ping IPcurrent_status = self.ping_ip(ip, timeout)previous_status = data["status"]data["last_check"] = time.strftime("%Y-%m-%d %H:%M:%S")# 处理状态变化if current_status == IPStatus.OFFLINE:if data['retries'] < max_retries:data['retries'] += 1data['next_retry_time'] = current_time + retry_intervalself.log_message(f"{ip}:{port or '无端口'} 检测失败,正在进行第 {data['retries']} 次重试...")else:if previous_status != current_status:self.send_alert(ip, port, data['remark'], current_status)data["status"] = current_statusdata['retries'] = 0data['next_retry_time'] = current_time + intervalelse:if previous_status != current_status:self.send_alert(ip, port, data['remark'], current_status)data["status"] = current_statusdata['retries'] = 0data['next_retry_time'] = current_time + interval# 更新UIself.after(0, self.update_status_display)# 计算睡眠时间next_checks = [data['next_retry_time'] for data in self.ip_status.values()]next_check = min(next_checks) if next_checks else current_time + intervalsleep_time = max(min(next_check - time.time(), interval), 0.1)time.sleep(sleep_time)def ping_ip(self, ip, timeout):"""Ping IP地址"""try:# 使用socket创建ICMP pingsock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)sock.settimeout(timeout)sock.connect((ip, 1)) # 端口号不重要return IPStatus.ONLINEexcept socket.timeout:return IPStatus.OFFLINEexcept Exception:return IPStatus.UNKNOWNfinally:try:sock.close()except:passdef check_port(self, ip, port, timeout):"""检查端口"""try:with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:s.settimeout(timeout)s.connect((ip, port))return IPStatus.ONLINEexcept socket.timeout:return IPStatus.OFFLINEexcept ConnectionRefusedError:return IPStatus.OFFLINEexcept Exception as e:self.log_message(f"检测异常: {str(e)}")return IPStatus.UNKNOWNdef update_status_display(self):"""更新状态显示"""# 清空现有显示for item in self.status_tree.get_children():self.status_tree.delete(item)# 添加新状态for (ip, port), data in self.ip_status.items():status = data["status"]status_text = "在线" if status == IPStatus.ONLINE else "离线" if status == IPStatus.OFFLINE else "未知"status_color = "green" if status == IPStatus.ONLINE else "red" if status == IPStatus.OFFLINE else "orange"item = self.status_tree.insert("", tk.END, values=(ip, port if port else "无", data["remark"], status_text, data["last_check"]))self.status_tree.tag_configure(status_color, foreground=status_color)self.status_tree.item(item, tags=(status_color,))def send_alert(self, ip, port, remark, status):"""发送状态变更通知"""status_text = "在线" if status == IPStatus.ONLINE else "离线"subject = f"IP状态变更: {remark or ip}:{port if port else '无端口'} - {status_text}"content = "\n".join([f"IP地址: {ip}",f"端口: {port if port else '无'}",f"备注: {remark or '无备注信息'}",f"状态变更为: {status_text}",f"检测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"])try:self.send_email(subject, content)self.log_message(f"已发送状态变更通知: {ip}:{port if port else '无'} -> {status_text}")except Exception as e:self.log_message(f"邮件发送失败: {str(e)}")def send_email(self, subject, content):"""发送邮件"""smtp_server = self.smtp_server_entry.get().strip()user = self.email_user_entry.get().strip()password = self.email_pass_entry.get().strip()receiver_str = self.email_receiver_entry.get().strip()if not all([smtp_server, user, password, receiver_str]):raise Exception("邮件配置不完整")receivers = [addr.strip() for addr in receiver_str.split(',') if addr.strip()]msg = EmailMessage()msg['Subject'] = subjectmsg['From'] = usermsg['To'] = receiversmsg.set_content(content)server, port = smtp_server.split(":")port = int(port)try:if port == 465:with smtplib.SMTP_SSL(server, port) as smtp:smtp.login(user, password)smtp.send_message(msg)else:with smtplib.SMTP(server, port) as smtp:smtp.starttls()smtp.login(user, password)smtp.send_message(msg)except Exception as e:raise Exception(f"SMTP错误: {str(e)}")def send_daily_report(self):"""发送每日报告"""report_lines = []for (ip, port), data in self.ip_status.items():status = data["status"]status_text = "在线" if status == IPStatus.ONLINE else "离线" if status == IPStatus.OFFLINE else "未知"report_lines.append(f"IP: {ip}:{port if port else '无'} 备注: {data['remark']} 状态: {status_text}")current_time = time.strftime("%Y-%m-%d %H:%M:%S")content = "每日IP状态报告\n\n" + "\n".join(report_lines) + f"\n\n报告时间: {current_time}"subject = "每日IP状态报告"try:self.send_email(subject, content)self.log_message("已发送每日状态报告")except Exception as e:self.log_message(f"发送每日报告失败: {str(e)}")def calculate_next_report_time(self):"""计算下次报告时间"""now = time.localtime()today_10am = time.mktime((now.tm_year, now.tm_mon, now.tm_mday, 10, 0, 0, 0, 0, -1))current_time = time.time()return today_10am + 86400 if current_time >= today_10am else today_10amdef log_message(self, message):"""记录日志"""timestamp = time.strftime("%Y-%m-%d %H:%M:%S")log_line = f"[{timestamp}] {message}\n"self.log_text.insert(tk.END, log_line)self.log_text.see(tk.END)with open("monitor.log", "a", encoding="utf-8") as f:f.write(log_line)def clear_logs(self):"""清空日志"""self.log_text.delete(1.0, tk.END)def validate_inputs(self):"""验证输入"""# 检查邮件配置smtp_server = self.smtp_server_entry.get().strip()user = self.email_user_entry.get().strip()password = self.email_pass_entry.get().strip()receiver_str = self.email_receiver_entry.get().strip()if not smtp_server:messagebox.showwarning("错误", "SMTP服务器不能为空")return Falseif ":" not in smtp_server:messagebox.showwarning("错误", "SMTP服务器格式应为 host:port")return Falseif not user:messagebox.showwarning("错误", "邮箱账户不能为空")return Falseif not password:messagebox.showwarning("错误", "邮箱授权码不能为空")return Falseif not receiver_str:messagebox.showwarning("错误", "接收邮箱不能为空")return False# 验证监控参数try:interval = int(self.interval_entry.get())timeout = int(self.timeout_entry.get())retries = int(self.retry_entry.get())retry_interval = int(self.retry_interval_entry.get())if interval < 5:messagebox.showwarning("错误", "监测间隔不能小于5秒")return Falseif timeout < 1:messagebox.showwarning("错误", "超时时间不能小于1秒")return Falseif retries < 0:messagebox.showwarning("错误", "重试次数不能为负数")return Falseif retry_interval < 1:messagebox.showwarning("错误", "重试间隔不能小于1秒")return Falseexcept ValueError:messagebox.showwarning("错误", "请输入有效的数字")return Falsereturn Truedef save_config(self):"""保存配置"""config = {"targets": [self.ip_tree.item(item, "values")for item in self.ip_tree.get_children()],"smtp_server": self.smtp_server_entry.get(),"email_user": self.email_user_entry.get(),"email_pass": self.email_pass_entry.get(),"email_receiver": self.email_receiver_entry.get(),"interval": self.interval_entry.get(),"timeout": self.timeout_entry.get(),"retry": self.retry_entry.get(),"retry_interval": self.retry_interval_entry.get()}try:with open("config.json", "w", encoding="utf-8") as f:json.dump(config, f, indent=2)self.log_message("配置已保存")messagebox.showinfo("成功", "配置已保存到config.json")except Exception as e:self.log_message(f"保存配置失败: {str(e)}")messagebox.showerror("错误", f"保存配置失败: {str(e)}")def load_config(self):"""加载配置"""try:with open("config.json", encoding="utf-8") as f:config = json.load(f)# 加载目标for item in self.ip_tree.get_children():self.ip_tree.delete(item)for values in config.get("targets", []):self.ip_tree.insert("", tk.END, values=values, tags=("checked" if values[0] == "✓" else "unchecked"))# 加载邮件配置self.smtp_server_entry.delete(0, tk.END)self.smtp_server_entry.insert(0, config.get("smtp_server", ""))self.email_user_entry.delete(0, tk.END)self.email_user_entry.insert(0, config.get("email_user", ""))self.email_pass_entry.delete(0, tk.END)self.email_pass_entry.insert(0, config.get("email_pass", ""))self.email_receiver_entry.delete(0, tk.END)self.email_receiver_entry.insert(0, config.get("email_receiver", ""))# 加载监控参数self.interval_entry.delete(0, tk.END)self.interval_entry.insert(0, config.get("interval", "10"))self.timeout_entry.delete(0, tk.END)self.timeout_entry.insert(0, config.get("timeout", "2"))self.retry_entry.delete(0, tk.END)self.retry_entry.insert(0, config.get("retry", "3"))self.retry_interval_entry.delete(0, tk.END)self.retry_interval_entry.insert(0, config.get("retry_interval", "5"))self.log_message("配置已加载")except FileNotFoundError:self.log_message("未找到配置文件,使用默认配置")except Exception as e:self.log_message(f"加载配置失败: {str(e)}")if __name__ == "__main__":app = IPMonitorApp()app.mainloop()
🎯 适合人群:网络管理员、运维工程师、Python中级开发者
如果觉得项目有用,欢迎Star🌟和Fork!你的支持是我持续优化的动力~
💬 常见问题解答
Q:如何修改每日报告发送时间?
A:修改calculate_next_report_time()
方法中的小时数(默认10:00)
Q:支持监控IPv6地址吗?
A:当前版本需要稍作修改,建议使用ipaddress
库进行验证
Q:最大支持监控多少个IP?
A:理论上无限制,但建议不超过100个以保证性能
📚 延伸阅读:
- 我的专栏[🐍《Python开源项目实战》]