Python 实战:内网渗透中的信息收集自动化脚本(10)
用途限制声明,本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具,严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果,作者及发布平台不承担任何责任。渗透测试涉及复杂技术操作,可能对目标系统造成数据损坏、服务中断等风险。读者需充分评估技术能力与潜在后果,在合法合规前提下谨慎实践。
最近在开发一款内网自动化的渗透工具,没有多少创新,但是也是从使用工具到开发工具的一个重要的转折点了,这里我主要介绍的是信息收集模块和AI分析模块,在这款工具我引用了AI工具,只是简单的信息分析和进行对话交流,下面介绍一下这两个模块,这里只是简单分享一下思路,代码无法使用的,也不是最终代码,其它模块还在开发中,下面是信息收集的模块
import time
import os
from command_utils import CommandUtils# 系统命令集(Windows/Linux)
commands = {"Windows": {"网络信息": "ipconfig /all","用户列表": "net user","本地管理员组": "net localgroup administrators","进程列表": "tasklist","计划任务": "schtasks /query /fo list","开放端口": "netstat -ano"},"Linux": {"网络信息": "ip addr && cat /etc/resolv.conf","用户列表": "cat /etc/passwd && cat /etc/group","sudo权限用户": "grep 'sudo' /etc/group","进程列表": "ps aux","计划任务": "crontab -l && ls -la /etc/cron.d","开放端口": "netstat -tulpn"}
}def collect_system_info(target_ip, os_type, folder):"""收集系统信息并保存到统一目录:param folder: 统一结果文件夹路径(从GUI传入)"""if os_type not in commands:print(f"不支持的系统类型:{os_type},无法收集信息")returnprint(f"\n开始收集{os_type}系统信息(保存到:{folder})...")for category, cmd in commands[os_type].items():print(f"正在执行:{category}(命令:{cmd})")# 执行命令cmd_result = CommandUtils.execute_command(cmd)output = cmd_result["output"] if cmd_result["success"] else cmd_result["error"]# 保存到统一目录(文件名:分类.txt)file_path = os.path.join(folder, f"{category}.txt")try:with open(file_path, "w", encoding="utf-8") as f:f.write(f"目标IP:{target_ip}\n执行命令:{cmd}\n执行时间:{time.strftime('%Y-%m-%d %H:%M:%S')}\n\n{output}")print(f"✅ {category}已保存到:{file_path}")except Exception as e:print(f"❌ 保存{category}失败:{str(e)}")print(f"\n所有系统信息已保存至统一目录:{folder}")if __name__ == "__main__":# 本地测试用(实际使用由GUI调用)target_ip = input("请输入目标IP地址:")time_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())folder = os.path.join("results_history", f"{time_str}_{target_ip}_system_info")os.makedirs(folder, exist_ok=True)# 识别系统类型print("\n正在识别系统类型...")os_info = CommandUtils.detect_os_type(target_ip)os_type = os_info["os_type"]ping_result = os_info["ping_result"]print(f"识别结果:{os_type}")# 保存系统类型with open(os.path.join(folder, "系统类型.txt"), "w", encoding="utf-8") as f:f.write(f"目标IP:{target_ip}\n系统类型:{os_type}\nping结果:{ping_result}\n识别时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")# 收集系统信息collect_system_info(target_ip, os_type, folder)
1. 导入模块部分
import time
import os
from command_utils import CommandUtils
time
:用于处理时间相关操作(如获取当前时间、格式化时间)os
:用于处理文件和目录操作(如创建文件夹、拼接路径)CommandUtils
:自定义的命令工具类(推测包含执行系统命令、检测系统类型等功能)
2. 系统命令集定义
commands = {"Windows": {"网络信息": "ipconfig /all","用户列表": "net user","本地管理员组": "net localgroup administrators","进程列表": "tasklist","计划任务": "schtasks /query /fo list","开放端口": "netstat -ano"},"Linux": {"网络信息": "ip addr && cat /etc/resolv.conf","用户列表": "cat /etc/passwd && cat /etc/group","sudo权限用户": "grep 'sudo' /etc/group","进程列表": "ps aux","计划任务": "crontab -l && ls -la /etc/cron.d","开放端口": "netstat -tulpn"}
}
- 这是一个嵌套字典,按系统类型(Windows/Linux)分类存储需要执行的命令
- 键是信息类别(如 "网络信息"、"用户列表"),值是对应系统下获取该信息的命令
- 不同系统获取同一类信息的命令不同(如 Windows 用
ipconfig
,Linux 用ip addr
)
3. 核心函数:collect_system_info
def collect_system_info(target_ip, os_type, folder):"""收集系统信息并保存到统一目录:param folder: 统一结果文件夹路径(从GUI传入)"""# 检查系统类型是否支持if os_type not in commands:print(f"不支持的系统类型:{os_type},无法收集信息")returnprint(f"\n开始收集{os_type}系统信息(保存到:{folder})...")# 遍历当前系统的所有命令for category, cmd in commands[os_type].items():print(f"正在执行:{category}(命令:{cmd})")# 执行命令(调用自定义工具类的方法)cmd_result = CommandUtils.execute_command(cmd)# 提取命令输出(成功取output,失败取error)output = cmd_result["output"] if cmd_result["success"] else cmd_result["error"]# 拼接保存文件的路径(文件夹+类别.txt)file_path = os.path.join(folder, f"{category}.txt")try:# 写入文件(包含目标IP、命令、时间、输出结果)with open(file_path, "w", encoding="utf-8") as f:f.write(f"目标IP:{target_ip}\n执行命令:{cmd}\n执行时间:{time.strftime('%Y-%m-%d %H:%M:%S')}\n\n{output}")print(f"✅ {category}已保存到:{file_path}")except Exception as e:print(f"❌ 保存{category}失败:{str(e)}")print(f"\n所有系统信息已保存至统一目录:{folder}")
- 功能:根据系统类型执行对应命令,将结果保存到指定文件夹
- 流程:
- 检查系统类型是否在支持列表中(Windows/Linux)
- 循环遍历该系统的所有命令(如 "网络信息" 对应的命令)
- 调用
CommandUtils.execute_command
执行命令,获取结果 - 将结果写入以 "类别.txt" 命名的文件(包含详细元信息:目标 IP、命令、时间等)
- 处理文件写入的异常(如权限问题),并打印成功 / 失败信息
4. 主程序入口
if __name__ == "__main__":# 本地测试用(实际使用由GUI调用)target_ip = input("请输入目标IP地址:")# 生成包含时间和IP的文件夹名称(确保唯一性)time_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())folder = os.path.join("results_history", f"{time_str}_{target_ip}_system_info")# 创建文件夹(如果已存在则不报错)os.makedirs(folder, exist_ok=True)# 识别系统类型(调用自定义工具类的方法)print("\n正在识别系统类型...")os_info = CommandUtils.detect_os_type(target_ip)os_type = os_info["os_type"]ping_result = os_info["ping_result"]print(f"识别结果:{os_type}")# 保存系统类型信息到文件with open(os.path.join(folder, "系统类型.txt"), "w", encoding="utf-8") as f:f.write(f"目标IP:{target_ip}\n系统类型:{os_type}\nping结果:{ping_result}\n识别时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")# 调用核心函数收集系统信息collect_system_info(target_ip, os_type, folder)
- 功能:提供本地测试入口(实际使用时由 GUI 调用)
- 流程:
- 获取用户输入的目标 IP
- 生成唯一的结果文件夹(路径包含时间戳和 IP,避免重名)
- 调用
CommandUtils.detect_os_type
检测目标系统类型(Windows/Linux) - 将系统类型信息保存到 "系统类型.txt"
- 调用
collect_system_info
函数开始收集详细系统信息
接下来是AI分析模块,主要思路是将信息收集模块转到AI分析模块进行信息分析,代码如下
import os
import re
import logging
from datetime import datetime
import json
from typing import Dict, List, Optional, Generator
from command_utils import AIUtils# 配置日志,便于调试分析过程
logging.basicConfig(filename="ai_analyzer.log",level=logging.INFO,format="%(asctime)s - %(levelname)s - %(message)s"
)class AIAnalyzer:# 风险等级关键字映射(用于后续结果解析)RISK_LEVELS = {"高危": 3, "中危": 2, "低危": 1}# 在 ai_analyzer.py 的 AIAnalyzer 类中添加以下静态方法@staticmethoddef get_initial_messages(result_folder: str) -> (Optional[List[Dict]], str):"""生成初始对话消息(基于结果文件夹数据)"""# 读取结果文件夹数据data, msg = AIAnalyzer.read_module_data(result_folder)if not data:return None, f"读取数据失败:{msg}"# 生成初始分析promptinitial_prompt = AIAnalyzer.generate_initial_prompt(data)# 新增:校验prompt是否为空if not initial_prompt.strip(): # 排除纯空白字符串return None, "生成的初始分析提示为空,请检查是否有有效数据"# 构造初始对话消息(确保content非空)messages = [{"role": "user","content": initial_prompt.strip() # 移除首尾空白}]return messages, "成功生成初始消息"@staticmethoddef analyze(messages: List[Dict]) -> Dict:"""调用AI工具进行分析(基于消息列表)返回:AI分析结果(包含success、stream/error字段)"""# 复用AIUtils中的AI交互逻辑return AIUtils.analyze_with_ai(messages)@staticmethoddef read_module_data(result_folder: str) -> (Optional[Dict], str):"""读取统一文件夹下的所有模块数据(系统信息、nmap结果等)增强:添加大文件处理、数据分类和日志记录"""data: Dict = {"files": {}, "metadata": {}} # 结构化存储:文件数据+元信息if not os.path.exists(result_folder):err_msg = f"统一结果目录不存在: {result_folder}"logging.error(err_msg)return None, err_msg# 记录目录元信息data["metadata"]["folder_path"] = result_folderdata["metadata"]["collect_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")data["metadata"]["file_count"] = 0try:for filename in os.listdir(result_folder):file_path = os.path.join(result_folder, filename)if not os.path.isfile(file_path):continue # 跳过子目录# 按文件类型分类(系统信息/扫描结果/交互历史)file_type = "other"if filename.startswith("系统"):file_type = "system_info"elif filename.startswith("nmap"):file_type = "scan_result"elif "AI交互" in filename:file_type = "chat_history"# 读取文件内容(大文件限制读取前2000字符,避免内存占用过高)try:with open(file_path, "r", encoding="utf-8", errors="ignore") as f:content = f.read(2000) # 限制读取长度if os.path.getsize(file_path) > 2000:content += "\n[文件内容过长,已截断显示]"except Exception as e:err_msg = f"读取文件 {filename} 失败: {str(e)}"logging.warning(err_msg)content = f"[读取失败: {str(e)}]"# 存储文件数据(含类型标识)data["files"][filename] = {"type": file_type,"content": content,"size": os.path.getsize(file_path)}data["metadata"]["file_count"] += 1logging.info(f"成功读取目录数据,共{data['metadata']['file_count']}个文件")return data, "成功"except Exception as e:err_msg = f"目录数据读取失败: {str(e)}"logging.error(err_msg)return None, err_msg@staticmethoddef _extract_key_info(data: Dict) -> Dict:"""提取关键信息(辅助方法),减少冗余数据干扰AI分析"""key_info = {"os_type": "未知","open_ports": [],"users": [],"critical_processes": [],"risk_hints": []}# 从系统类型文件提取操作系统信息for fname, fdata in data["files"].items():if fdata["type"] == "system_info" and "系统类型" in fname:os_match = re.search(r"系统类型:(.+?)\n", fdata["content"])if os_match:key_info["os_type"] = os_match.group(1).strip()# 从nmap结果提取开放端口elif fdata["type"] == "scan_result":port_matches = re.findall(r"(\d+/tcp)\s+open\s+(.+?)\s", fdata["content"])key_info["open_ports"] = [f"{p} ({s})" for p, s in port_matches]# 从系统信息提取用户列表elif fdata["type"] == "system_info" and "用户" in fname:user_matches = re.findall(r"用户名:(.+?)\s", fdata["content"])key_info["users"] = list(set(user_matches)) # 去重# 生成风险提示(基于关键信息)if "22/tcp" in [p.split()[0] for p in key_info["open_ports"]]:key_info["risk_hints"].append("SSH端口(22)开放,需确认是否使用强认证")if len(key_info["users"]) > 5:key_info["risk_hints"].append(f"发现{len(key_info['users'])}个用户,需检查是否有冗余账号")return key_info@staticmethoddef generate_initial_prompt(data: Dict) -> str:key_info = AIAnalyzer._extract_key_info(data)# 生成文件列表摘要file_summary = "\n".join([f"- {fname}({fdata['type']},{fdata['size']}字节)"for fname, fdata in data["files"].items()])# 确保提示词至少包含基础内容(避免空值)if not file_summary and not key_info:return "请基于提供的系统数据(可能为空),生成安全分析报告。"# 正常生成提示词return f"""基于以下收集到的目标系统数据,生成一份安全分析报告:1. 关键信息摘要:- 操作系统类型:{key_info['os_type']}- 开放端口:{key_info['open_ports'] if key_info['open_ports'] else '无'}- 风险提示:{key_info['risk_hints'] if key_info['risk_hints'] else '无'}2. 可用文件列表:{file_summary if file_summary else '无有效文件'}请分析潜在风险、漏洞点,并给出修复建议。""".strip()
1. 导入模块与日志配置
import os
import re
import logging
from datetime import datetime
import json
from typing import Dict, List, Optional, Generator
from command_utils import AIUtils# 配置日志,便于调试分析过程
logging.basicConfig(filename="ai_analyzer.log",level=logging.INFO,format="%(asctime)s - %(levelname)s - %(message)s"
)
- 导入的模块:
os
/re
:处理文件路径和正则匹配(提取关键信息)logging
:记录日志(便于追踪分析过程中的问题)datetime
:处理时间(记录数据收集时间)typing
:提供类型提示(增强代码可读性和 IDE 支持)AIUtils
:自定义 AI 工具类(提供与 AI 交互的底层能力)
- 日志配置:指定日志文件为
ai_analyzer.log
,级别为INFO
(记录一般信息),格式包含时间、级别和消息内容。
2. AIAnalyzer
类定义与基础属性
class AIAnalyzer:# 风险等级关键字映射(用于后续结果解析)RISK_LEVELS = {"高危": 3, "中危": 2, "低危": 1}
RISK_LEVELS
:静态字典,将风险等级文字(如 "高危")映射为数字(3/2/1),方便后续对风险等级进行量化处理(如排序、统计)。
3. get_initial_messages
方法
@staticmethod
def get_initial_messages(result_folder: str) -> (Optional[List[Dict]], str):"""生成初始对话消息(基于结果文件夹数据)"""# 读取结果文件夹数据data, msg = AIAnalyzer.read_module_data(result_folder)if not data:return None, f"读取数据失败:{msg}"# 生成初始分析promptinitial_prompt = AIAnalyzer.generate_initial_prompt(data)# 新增:校验prompt是否为空if not initial_prompt.strip(): # 排除纯空白字符串return None, "生成的初始分析提示为空,请检查是否有有效数据"# 构造初始对话消息(确保content非空)messages = [{"role": "user","content": initial_prompt.strip() # 移除首尾空白}]return messages, "成功生成初始消息"
- 功能:生成给 AI 的初始对话消息(用户角色的提示词)。
- 流程:
- 调用
read_module_data
读取结果文件夹中的数据。 - 若读取失败,返回错误信息;若成功,调用
generate_initial_prompt
生成提示词。 - 校验提示词是否有效(非空),无效则返回错误。
- 构造标准格式的消息列表(包含
role
和content
),返回消息和成功提示。
- 调用
4. analyze
方法
@staticmethod
def analyze(messages: List[Dict]) -> Dict:"""调用AI工具进行分析(基于消息列表)返回:AI分析结果(包含success、stream/error字段)"""# 复用AIUtils中的AI交互逻辑return AIUtils.analyze_with_ai(messages)
- 功能:作为与 AI 交互的入口,接收对话消息列表,调用底层工具进行分析。
- 逻辑:直接复用
AIUtils.analyze_with_ai
方法(隐藏了 AI 调用的细节,如 API 请求、参数处理等),返回 AI 的分析结果(通常包含是否成功、分析内容或错误信息)。
5. read_module_data
方法
@staticmethod
def read_module_data(result_folder: str) -> (Optional[Dict], str):"""读取统一文件夹下的所有模块数据(系统信息、nmap结果等)增强:添加大文件处理、数据分类和日志记录"""data: Dict = {"files": {}, "metadata": {}} # 结构化存储:文件数据+元信息if not os.path.exists(result_folder):err_msg = f"统一结果目录不存在: {result_folder}"logging.error(err_msg)return None, err_msg# 记录目录元信息data["metadata"]["folder_path"] = result_folderdata["metadata"]["collect_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")data["metadata"]["file_count"] = 0try:for filename in os.listdir(result_folder):file_path = os.path.join(result_folder, filename)if not os.path.isfile(file_path):continue # 跳过子目录# 按文件类型分类(系统信息/扫描结果/交互历史)file_type = "other"if filename.startswith("系统"):file_type = "system_info"elif filename.startswith("nmap"):file_type = "scan_result"elif "AI交互" in filename:file_type = "chat_history"# 读取文件内容(大文件限制读取前2000字符,避免内存占用过高)try:with open(file_path, "r", encoding="utf-8", errors="ignore") as f:content = f.read(2000) # 限制读取长度if os.path.getsize(file_path) > 2000:content += "\n[文件内容过长,已截断显示]"except Exception as e:err_msg = f"读取文件 {filename} 失败: {str(e)}"logging.warning(err_msg)content = f"[读取失败: {str(e)}]"# 存储文件数据(含类型标识)data["files"][filename] = {"type": file_type,"content": content,"size": os.path.getsize(file_path)}data["metadata"]["file_count"] += 1logging.info(f"成功读取目录数据,共{data['metadata']['file_count']}个文件")return data, "成功"except Exception as e:err_msg = f"目录数据读取失败: {str(e)}"logging.error(err_msg)return None, err_msg
- 功能:读取结果文件夹中的所有文件数据,进行结构化处理和分类。
- 核心逻辑:
- 初始化数据结构:
files
存储文件详情,metadata
存储目录元信息(路径、收集时间、文件数)。 - 检查文件夹是否存在,不存在则记录错误并返回。
- 遍历文件夹中的文件,跳过子目录,按文件名规则分类(系统信息 / 扫描结果 / 交互历史)。
- 读取文件内容(大文件截断为前 2000 字符,避免内存溢出),处理读取错误(记录日志并标记失败)。
- 将文件信息(类型、内容、大小)存入
files
,更新文件计数,最终返回结构化数据。
- 初始化数据结构:
6. _extract_key_info
方法
@staticmethod
def _extract_key_info(data: Dict) -> Dict:"""提取关键信息(辅助方法),减少冗余数据干扰AI分析"""key_info = {"os_type": "未知","open_ports": [],"users": [],"critical_processes": [],"risk_hints": []}# 从系统类型文件提取操作系统信息for fname, fdata in data["files"].items():if fdata["type"] == "system_info" and "系统类型" in fname:os_match = re.search(r"系统类型:(.+?)\n", fdata["content"])if os_match:key_info["os_type"] = os_match.group(1).strip()# 从nmap结果提取开放端口elif fdata["type"] == "scan_result":port_matches = re.findall(r"(\d+/tcp)\s+open\s+(.+?)\s", fdata["content"])key_info["open_ports"] = [f"{p} ({s})" for p, s in port_matches]# 从系统信息提取用户列表elif fdata["type"] == "system_info" and "用户" in fname:user_matches = re.findall(r"用户名:(.+?)\s", fdata["content"])key_info["users"] = list(set(user_matches)) # 去重# 生成风险提示(基于关键信息)if "22/tcp" in [p.split()[0] for p in key_info["open_ports"]]:key_info["risk_hints"].append("SSH端口(22)开放,需确认是否使用强认证")if len(key_info["users"]) > 5:key_info["risk_hints"].append(f"发现{len(key_info['users'])}个用户,需检查是否有冗余账号")return key_info
- 功能:从
read_module_data
返回的原始数据中提取关键信息(减少冗余,突出重点),为生成 AI 提示词做准备。 - 提取逻辑:
- 初始化关键信息结构(操作系统、开放端口、用户列表等)。
- 遍历文件数据,通过正则表达式提取:
- 从系统类型文件提取操作系统类型(如 Windows/Linux)。
- 从扫描结果文件提取开放端口(如
22/tcp (ssh)
)。 - 从用户信息文件提取用户名(去重处理)。
- 基于提取的信息生成初步风险提示(如 SSH 端口开放、用户数量过多)。
7. generate_initial_prompt
方法
@staticmethod
def generate_initial_prompt(data: Dict) -> str:key_info = AIAnalyzer._extract_key_info(data)# 生成文件列表摘要file_summary = "\n".join([f"- {fname}({fdata['type']},{fdata['size']}字节)"for fname, fdata in data["files"].items()])# 确保提示词至少包含基础内容(避免空值)if not file_summary and not key_info:return "请基于提供的系统数据(可能为空),生成安全分析报告。"# 正常生成提示词return f"""基于以下收集到的目标系统数据,生成一份安全分析报告:1. 关键信息摘要:- 操作系统类型:{key_info['os_type']}- 开放端口:{key_info['open_ports'] if key_info['open_ports'] else '无'}- 风险提示:{key_info['risk_hints'] if key_info['risk_hints'] else '无'}2. 可用文件列表:{file_summary if file_summary else '无有效文件'}请分析潜在风险、漏洞点,并给出修复建议。""".strip()
- 功能:生成给 AI 的初始提示词(指导 AI 进行安全分析的指令)。
- 逻辑:
- 调用
_extract_key_info
获取关键信息摘要。 - 生成文件列表摘要(包含文件名、类型、大小)。
- 组合成自然语言提示词,要求 AI 基于提供的信息分析潜在风险、漏洞点并给出修复建议。
- 处理边界情况(如无数据时,生成基础提示词避免空输入)。
- 调用
以下是工具的一部分情况
信息收集中我集成了nmap工具,系统命令所收集的信息都放在一个文档中
然后再看看AI分析模块
能够进行信息的分析,也能够进行简单的交互,目前其功能还在完善当中,还有很多的瑕疵。