当前位置: 首页 > news >正文

Python 实战:内网渗透中的信息收集自动化脚本(10)

用途限制声明,本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具,严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果,作者及发布平台不承担任何责任。渗透测试涉及复杂技术操作,可能对目标系统造成数据损坏、服务中断等风险。读者需充分评估技术能力与潜在后果,在合法合规前提下谨慎实践。

最近在开发一款内网自动化的渗透工具,没有多少创新,但是也是从使用工具到开发工具的一个重要的转折点了,这里我主要介绍的是信息收集模块和AI分析模块,在这款工具我引用了AI工具,只是简单的信息分析和进行对话交流,下面介绍一下这两个模块,这里只是简单分享一下思路,代码无法使用的,也不是最终代码,其它模块还在开发中,下面是信息收集的模块

import time
import os
from command_utils import CommandUtils# 系统命令集(Windows/Linux)
commands = {"Windows": {"网络信息": "ipconfig /all","用户列表": "net user","本地管理员组": "net localgroup administrators","进程列表": "tasklist","计划任务": "schtasks /query /fo list","开放端口": "netstat -ano"},"Linux": {"网络信息": "ip addr && cat /etc/resolv.conf","用户列表": "cat /etc/passwd && cat /etc/group","sudo权限用户": "grep 'sudo' /etc/group","进程列表": "ps aux","计划任务": "crontab -l && ls -la /etc/cron.d","开放端口": "netstat -tulpn"}
}def collect_system_info(target_ip, os_type, folder):"""收集系统信息并保存到统一目录:param folder: 统一结果文件夹路径(从GUI传入)"""if os_type not in commands:print(f"不支持的系统类型:{os_type},无法收集信息")returnprint(f"\n开始收集{os_type}系统信息(保存到:{folder})...")for category, cmd in commands[os_type].items():print(f"正在执行:{category}(命令:{cmd})")# 执行命令cmd_result = CommandUtils.execute_command(cmd)output = cmd_result["output"] if cmd_result["success"] else cmd_result["error"]# 保存到统一目录(文件名:分类.txt)file_path = os.path.join(folder, f"{category}.txt")try:with open(file_path, "w", encoding="utf-8") as f:f.write(f"目标IP:{target_ip}\n执行命令:{cmd}\n执行时间:{time.strftime('%Y-%m-%d %H:%M:%S')}\n\n{output}")print(f"✅ {category}已保存到:{file_path}")except Exception as e:print(f"❌ 保存{category}失败:{str(e)}")print(f"\n所有系统信息已保存至统一目录:{folder}")if __name__ == "__main__":# 本地测试用(实际使用由GUI调用)target_ip = input("请输入目标IP地址:")time_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())folder = os.path.join("results_history", f"{time_str}_{target_ip}_system_info")os.makedirs(folder, exist_ok=True)# 识别系统类型print("\n正在识别系统类型...")os_info = CommandUtils.detect_os_type(target_ip)os_type = os_info["os_type"]ping_result = os_info["ping_result"]print(f"识别结果:{os_type}")# 保存系统类型with open(os.path.join(folder, "系统类型.txt"), "w", encoding="utf-8") as f:f.write(f"目标IP:{target_ip}\n系统类型:{os_type}\nping结果:{ping_result}\n识别时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")# 收集系统信息collect_system_info(target_ip, os_type, folder)

1. 导入模块部分

import time
import os
from command_utils import CommandUtils
  • time:用于处理时间相关操作(如获取当前时间、格式化时间)
  • os:用于处理文件和目录操作(如创建文件夹、拼接路径)
  • CommandUtils:自定义的命令工具类(推测包含执行系统命令、检测系统类型等功能)

2. 系统命令集定义

commands = {"Windows": {"网络信息": "ipconfig /all","用户列表": "net user","本地管理员组": "net localgroup administrators","进程列表": "tasklist","计划任务": "schtasks /query /fo list","开放端口": "netstat -ano"},"Linux": {"网络信息": "ip addr && cat /etc/resolv.conf","用户列表": "cat /etc/passwd && cat /etc/group","sudo权限用户": "grep 'sudo' /etc/group","进程列表": "ps aux","计划任务": "crontab -l && ls -la /etc/cron.d","开放端口": "netstat -tulpn"}
}
  • 这是一个嵌套字典,按系统类型(Windows/Linux)分类存储需要执行的命令
  • 键是信息类别(如 "网络信息"、"用户列表"),值是对应系统下获取该信息的命令
  • 不同系统获取同一类信息的命令不同(如 Windows 用ipconfig,Linux 用ip addr

3. 核心函数:collect_system_info

def collect_system_info(target_ip, os_type, folder):"""收集系统信息并保存到统一目录:param folder: 统一结果文件夹路径(从GUI传入)"""# 检查系统类型是否支持if os_type not in commands:print(f"不支持的系统类型:{os_type},无法收集信息")returnprint(f"\n开始收集{os_type}系统信息(保存到:{folder})...")# 遍历当前系统的所有命令for category, cmd in commands[os_type].items():print(f"正在执行:{category}(命令:{cmd})")# 执行命令(调用自定义工具类的方法)cmd_result = CommandUtils.execute_command(cmd)# 提取命令输出(成功取output,失败取error)output = cmd_result["output"] if cmd_result["success"] else cmd_result["error"]# 拼接保存文件的路径(文件夹+类别.txt)file_path = os.path.join(folder, f"{category}.txt")try:# 写入文件(包含目标IP、命令、时间、输出结果)with open(file_path, "w", encoding="utf-8") as f:f.write(f"目标IP:{target_ip}\n执行命令:{cmd}\n执行时间:{time.strftime('%Y-%m-%d %H:%M:%S')}\n\n{output}")print(f"✅ {category}已保存到:{file_path}")except Exception as e:print(f"❌ 保存{category}失败:{str(e)}")print(f"\n所有系统信息已保存至统一目录:{folder}")
  • 功能:根据系统类型执行对应命令,将结果保存到指定文件夹
  • 流程:
    1. 检查系统类型是否在支持列表中(Windows/Linux)
    2. 循环遍历该系统的所有命令(如 "网络信息" 对应的命令)
    3. 调用CommandUtils.execute_command执行命令,获取结果
    4. 将结果写入以 "类别.txt" 命名的文件(包含详细元信息:目标 IP、命令、时间等)
    5. 处理文件写入的异常(如权限问题),并打印成功 / 失败信息

4. 主程序入口

if __name__ == "__main__":# 本地测试用(实际使用由GUI调用)target_ip = input("请输入目标IP地址:")# 生成包含时间和IP的文件夹名称(确保唯一性)time_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())folder = os.path.join("results_history", f"{time_str}_{target_ip}_system_info")# 创建文件夹(如果已存在则不报错)os.makedirs(folder, exist_ok=True)# 识别系统类型(调用自定义工具类的方法)print("\n正在识别系统类型...")os_info = CommandUtils.detect_os_type(target_ip)os_type = os_info["os_type"]ping_result = os_info["ping_result"]print(f"识别结果:{os_type}")# 保存系统类型信息到文件with open(os.path.join(folder, "系统类型.txt"), "w", encoding="utf-8") as f:f.write(f"目标IP:{target_ip}\n系统类型:{os_type}\nping结果:{ping_result}\n识别时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")# 调用核心函数收集系统信息collect_system_info(target_ip, os_type, folder)
  • 功能:提供本地测试入口(实际使用时由 GUI 调用)
  • 流程:
    1. 获取用户输入的目标 IP
    2. 生成唯一的结果文件夹(路径包含时间戳和 IP,避免重名)
    3. 调用CommandUtils.detect_os_type检测目标系统类型(Windows/Linux)
    4. 将系统类型信息保存到 "系统类型.txt"
    5. 调用collect_system_info函数开始收集详细系统信息

接下来是AI分析模块,主要思路是将信息收集模块转到AI分析模块进行信息分析,代码如下

import os
import re
import logging
from datetime import datetime
import json
from typing import Dict, List, Optional, Generator
from command_utils import AIUtils# 配置日志,便于调试分析过程
logging.basicConfig(filename="ai_analyzer.log",level=logging.INFO,format="%(asctime)s - %(levelname)s - %(message)s"
)class AIAnalyzer:# 风险等级关键字映射(用于后续结果解析)RISK_LEVELS = {"高危": 3, "中危": 2, "低危": 1}# 在 ai_analyzer.py 的 AIAnalyzer 类中添加以下静态方法@staticmethoddef get_initial_messages(result_folder: str) -> (Optional[List[Dict]], str):"""生成初始对话消息(基于结果文件夹数据)"""# 读取结果文件夹数据data, msg = AIAnalyzer.read_module_data(result_folder)if not data:return None, f"读取数据失败:{msg}"# 生成初始分析promptinitial_prompt = AIAnalyzer.generate_initial_prompt(data)# 新增:校验prompt是否为空if not initial_prompt.strip():  # 排除纯空白字符串return None, "生成的初始分析提示为空,请检查是否有有效数据"# 构造初始对话消息(确保content非空)messages = [{"role": "user","content": initial_prompt.strip()  # 移除首尾空白}]return messages, "成功生成初始消息"@staticmethoddef analyze(messages: List[Dict]) -> Dict:"""调用AI工具进行分析(基于消息列表)返回:AI分析结果(包含success、stream/error字段)"""# 复用AIUtils中的AI交互逻辑return AIUtils.analyze_with_ai(messages)@staticmethoddef read_module_data(result_folder: str) -> (Optional[Dict], str):"""读取统一文件夹下的所有模块数据(系统信息、nmap结果等)增强:添加大文件处理、数据分类和日志记录"""data: Dict = {"files": {}, "metadata": {}}  # 结构化存储:文件数据+元信息if not os.path.exists(result_folder):err_msg = f"统一结果目录不存在: {result_folder}"logging.error(err_msg)return None, err_msg# 记录目录元信息data["metadata"]["folder_path"] = result_folderdata["metadata"]["collect_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")data["metadata"]["file_count"] = 0try:for filename in os.listdir(result_folder):file_path = os.path.join(result_folder, filename)if not os.path.isfile(file_path):continue  # 跳过子目录# 按文件类型分类(系统信息/扫描结果/交互历史)file_type = "other"if filename.startswith("系统"):file_type = "system_info"elif filename.startswith("nmap"):file_type = "scan_result"elif "AI交互" in filename:file_type = "chat_history"# 读取文件内容(大文件限制读取前2000字符,避免内存占用过高)try:with open(file_path, "r", encoding="utf-8", errors="ignore") as f:content = f.read(2000)  # 限制读取长度if os.path.getsize(file_path) > 2000:content += "\n[文件内容过长,已截断显示]"except Exception as e:err_msg = f"读取文件 {filename} 失败: {str(e)}"logging.warning(err_msg)content = f"[读取失败: {str(e)}]"# 存储文件数据(含类型标识)data["files"][filename] = {"type": file_type,"content": content,"size": os.path.getsize(file_path)}data["metadata"]["file_count"] += 1logging.info(f"成功读取目录数据,共{data['metadata']['file_count']}个文件")return data, "成功"except Exception as e:err_msg = f"目录数据读取失败: {str(e)}"logging.error(err_msg)return None, err_msg@staticmethoddef _extract_key_info(data: Dict) -> Dict:"""提取关键信息(辅助方法),减少冗余数据干扰AI分析"""key_info = {"os_type": "未知","open_ports": [],"users": [],"critical_processes": [],"risk_hints": []}# 从系统类型文件提取操作系统信息for fname, fdata in data["files"].items():if fdata["type"] == "system_info" and "系统类型" in fname:os_match = re.search(r"系统类型:(.+?)\n", fdata["content"])if os_match:key_info["os_type"] = os_match.group(1).strip()# 从nmap结果提取开放端口elif fdata["type"] == "scan_result":port_matches = re.findall(r"(\d+/tcp)\s+open\s+(.+?)\s", fdata["content"])key_info["open_ports"] = [f"{p} ({s})" for p, s in port_matches]# 从系统信息提取用户列表elif fdata["type"] == "system_info" and "用户" in fname:user_matches = re.findall(r"用户名:(.+?)\s", fdata["content"])key_info["users"] = list(set(user_matches))  # 去重# 生成风险提示(基于关键信息)if "22/tcp" in [p.split()[0] for p in key_info["open_ports"]]:key_info["risk_hints"].append("SSH端口(22)开放,需确认是否使用强认证")if len(key_info["users"]) > 5:key_info["risk_hints"].append(f"发现{len(key_info['users'])}个用户,需检查是否有冗余账号")return key_info@staticmethoddef generate_initial_prompt(data: Dict) -> str:key_info = AIAnalyzer._extract_key_info(data)# 生成文件列表摘要file_summary = "\n".join([f"- {fname}({fdata['type']},{fdata['size']}字节)"for fname, fdata in data["files"].items()])# 确保提示词至少包含基础内容(避免空值)if not file_summary and not key_info:return "请基于提供的系统数据(可能为空),生成安全分析报告。"# 正常生成提示词return f"""基于以下收集到的目标系统数据,生成一份安全分析报告:1. 关键信息摘要:- 操作系统类型:{key_info['os_type']}- 开放端口:{key_info['open_ports'] if key_info['open_ports'] else '无'}- 风险提示:{key_info['risk_hints'] if key_info['risk_hints'] else '无'}2. 可用文件列表:{file_summary if file_summary else '无有效文件'}请分析潜在风险、漏洞点,并给出修复建议。""".strip()

1. 导入模块与日志配置

import os
import re
import logging
from datetime import datetime
import json
from typing import Dict, List, Optional, Generator
from command_utils import AIUtils# 配置日志,便于调试分析过程
logging.basicConfig(filename="ai_analyzer.log",level=logging.INFO,format="%(asctime)s - %(levelname)s - %(message)s"
)
  • 导入的模块:
    • os/re:处理文件路径和正则匹配(提取关键信息)
    • logging:记录日志(便于追踪分析过程中的问题)
    • datetime:处理时间(记录数据收集时间)
    • typing:提供类型提示(增强代码可读性和 IDE 支持)
    • AIUtils:自定义 AI 工具类(提供与 AI 交互的底层能力)
  • 日志配置:指定日志文件为ai_analyzer.log,级别为INFO(记录一般信息),格式包含时间、级别和消息内容。

2. AIAnalyzer类定义与基础属性

class AIAnalyzer:# 风险等级关键字映射(用于后续结果解析)RISK_LEVELS = {"高危": 3, "中危": 2, "低危": 1}
  • RISK_LEVELS:静态字典,将风险等级文字(如 "高危")映射为数字(3/2/1),方便后续对风险等级进行量化处理(如排序、统计)。

3. get_initial_messages方法

@staticmethod
def get_initial_messages(result_folder: str) -> (Optional[List[Dict]], str):"""生成初始对话消息(基于结果文件夹数据)"""# 读取结果文件夹数据data, msg = AIAnalyzer.read_module_data(result_folder)if not data:return None, f"读取数据失败:{msg}"# 生成初始分析promptinitial_prompt = AIAnalyzer.generate_initial_prompt(data)# 新增:校验prompt是否为空if not initial_prompt.strip():  # 排除纯空白字符串return None, "生成的初始分析提示为空,请检查是否有有效数据"# 构造初始对话消息(确保content非空)messages = [{"role": "user","content": initial_prompt.strip()  # 移除首尾空白}]return messages, "成功生成初始消息"
  • 功能:生成给 AI 的初始对话消息(用户角色的提示词)。
  • 流程:
    1. 调用read_module_data读取结果文件夹中的数据。
    2. 若读取失败,返回错误信息;若成功,调用generate_initial_prompt生成提示词。
    3. 校验提示词是否有效(非空),无效则返回错误。
    4. 构造标准格式的消息列表(包含rolecontent),返回消息和成功提示。

4. analyze方法

@staticmethod
def analyze(messages: List[Dict]) -> Dict:"""调用AI工具进行分析(基于消息列表)返回:AI分析结果(包含success、stream/error字段)"""# 复用AIUtils中的AI交互逻辑return AIUtils.analyze_with_ai(messages)
  • 功能:作为与 AI 交互的入口,接收对话消息列表,调用底层工具进行分析。
  • 逻辑:直接复用AIUtils.analyze_with_ai方法(隐藏了 AI 调用的细节,如 API 请求、参数处理等),返回 AI 的分析结果(通常包含是否成功、分析内容或错误信息)。

5. read_module_data方法

@staticmethod
def read_module_data(result_folder: str) -> (Optional[Dict], str):"""读取统一文件夹下的所有模块数据(系统信息、nmap结果等)增强:添加大文件处理、数据分类和日志记录"""data: Dict = {"files": {}, "metadata": {}}  # 结构化存储:文件数据+元信息if not os.path.exists(result_folder):err_msg = f"统一结果目录不存在: {result_folder}"logging.error(err_msg)return None, err_msg# 记录目录元信息data["metadata"]["folder_path"] = result_folderdata["metadata"]["collect_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")data["metadata"]["file_count"] = 0try:for filename in os.listdir(result_folder):file_path = os.path.join(result_folder, filename)if not os.path.isfile(file_path):continue  # 跳过子目录# 按文件类型分类(系统信息/扫描结果/交互历史)file_type = "other"if filename.startswith("系统"):file_type = "system_info"elif filename.startswith("nmap"):file_type = "scan_result"elif "AI交互" in filename:file_type = "chat_history"# 读取文件内容(大文件限制读取前2000字符,避免内存占用过高)try:with open(file_path, "r", encoding="utf-8", errors="ignore") as f:content = f.read(2000)  # 限制读取长度if os.path.getsize(file_path) > 2000:content += "\n[文件内容过长,已截断显示]"except Exception as e:err_msg = f"读取文件 {filename} 失败: {str(e)}"logging.warning(err_msg)content = f"[读取失败: {str(e)}]"# 存储文件数据(含类型标识)data["files"][filename] = {"type": file_type,"content": content,"size": os.path.getsize(file_path)}data["metadata"]["file_count"] += 1logging.info(f"成功读取目录数据,共{data['metadata']['file_count']}个文件")return data, "成功"except Exception as e:err_msg = f"目录数据读取失败: {str(e)}"logging.error(err_msg)return None, err_msg
  • 功能:读取结果文件夹中的所有文件数据,进行结构化处理和分类。
  • 核心逻辑:
    1. 初始化数据结构:files存储文件详情,metadata存储目录元信息(路径、收集时间、文件数)。
    2. 检查文件夹是否存在,不存在则记录错误并返回。
    3. 遍历文件夹中的文件,跳过子目录,按文件名规则分类(系统信息 / 扫描结果 / 交互历史)。
    4. 读取文件内容(大文件截断为前 2000 字符,避免内存溢出),处理读取错误(记录日志并标记失败)。
    5. 将文件信息(类型、内容、大小)存入files,更新文件计数,最终返回结构化数据。

6. _extract_key_info方法

@staticmethod
def _extract_key_info(data: Dict) -> Dict:"""提取关键信息(辅助方法),减少冗余数据干扰AI分析"""key_info = {"os_type": "未知","open_ports": [],"users": [],"critical_processes": [],"risk_hints": []}# 从系统类型文件提取操作系统信息for fname, fdata in data["files"].items():if fdata["type"] == "system_info" and "系统类型" in fname:os_match = re.search(r"系统类型:(.+?)\n", fdata["content"])if os_match:key_info["os_type"] = os_match.group(1).strip()# 从nmap结果提取开放端口elif fdata["type"] == "scan_result":port_matches = re.findall(r"(\d+/tcp)\s+open\s+(.+?)\s", fdata["content"])key_info["open_ports"] = [f"{p} ({s})" for p, s in port_matches]# 从系统信息提取用户列表elif fdata["type"] == "system_info" and "用户" in fname:user_matches = re.findall(r"用户名:(.+?)\s", fdata["content"])key_info["users"] = list(set(user_matches))  # 去重# 生成风险提示(基于关键信息)if "22/tcp" in [p.split()[0] for p in key_info["open_ports"]]:key_info["risk_hints"].append("SSH端口(22)开放,需确认是否使用强认证")if len(key_info["users"]) > 5:key_info["risk_hints"].append(f"发现{len(key_info['users'])}个用户,需检查是否有冗余账号")return key_info
  • 功能:从read_module_data返回的原始数据中提取关键信息(减少冗余,突出重点),为生成 AI 提示词做准备。
  • 提取逻辑:
    1. 初始化关键信息结构(操作系统、开放端口、用户列表等)。
    2. 遍历文件数据,通过正则表达式提取:
      • 从系统类型文件提取操作系统类型(如 Windows/Linux)。
      • 从扫描结果文件提取开放端口(如22/tcp (ssh))。
      • 从用户信息文件提取用户名(去重处理)。
    3. 基于提取的信息生成初步风险提示(如 SSH 端口开放、用户数量过多)。

7. generate_initial_prompt方法

@staticmethod
def generate_initial_prompt(data: Dict) -> str:key_info = AIAnalyzer._extract_key_info(data)# 生成文件列表摘要file_summary = "\n".join([f"- {fname}({fdata['type']},{fdata['size']}字节)"for fname, fdata in data["files"].items()])# 确保提示词至少包含基础内容(避免空值)if not file_summary and not key_info:return "请基于提供的系统数据(可能为空),生成安全分析报告。"# 正常生成提示词return f"""基于以下收集到的目标系统数据,生成一份安全分析报告:1. 关键信息摘要:- 操作系统类型:{key_info['os_type']}- 开放端口:{key_info['open_ports'] if key_info['open_ports'] else '无'}- 风险提示:{key_info['risk_hints'] if key_info['risk_hints'] else '无'}2. 可用文件列表:{file_summary if file_summary else '无有效文件'}请分析潜在风险、漏洞点,并给出修复建议。""".strip()
  • 功能:生成给 AI 的初始提示词(指导 AI 进行安全分析的指令)。
  • 逻辑:
    1. 调用_extract_key_info获取关键信息摘要。
    2. 生成文件列表摘要(包含文件名、类型、大小)。
    3. 组合成自然语言提示词,要求 AI 基于提供的信息分析潜在风险、漏洞点并给出修复建议。
    4. 处理边界情况(如无数据时,生成基础提示词避免空输入)。

以下是工具的一部分情况

信息收集中我集成了nmap工具,系统命令所收集的信息都放在一个文档中

然后再看看AI分析模块

能够进行信息的分析,也能够进行简单的交互,目前其功能还在完善当中,还有很多的瑕疵。

http://www.dtcms.com/a/394354.html

相关文章:

  • leetcode 206 反转链表
  • AI智能的网站SEO优化服务商
  • 生产者客户端
  • Puppeteer 在爬取电商 JavaScript 页面的使用
  • 2015/12 JLPT听力原文 问题四
  • 【设计模式】备忘录模式
  • STM32_07_按键
  • 基于迁移学习和SqueezeNet的滚动轴承故障诊断(MATLAB)
  • 实战项目(十二:《AI画质增强与LED驱动控制:一场关于‘创造’与‘还原’的对话》):从LED冬奥会、奥运会及春晚等大屏,到手机小屏,快来挖一挖里面都有什么
  • 开发避坑指南(52):IDEA 2025.1.3 顶部显示类完整路径的设置方法
  • 安装Qt新之后出现两本帮助手册
  • Rust_2025:阶段1:day7.2unsafe , 链接相关
  • 【论文速递】2025年第15周(Apr-06-12)(Robotics/Embodied AI/LLM)
  • 设计模式简单说明:责任链与规则树
  • 自动备份脚本 mysql_hourly_backup.sh
  • SuperGLUE:自然语言理解的挑战与进步
  • 线程安全的单例模式、自旋锁,以及读者写者问题
  • U盘长期插在电脑上的影响
  • Windows 系统部署 PaddleOCR —— 基于 EPGF 架构
  • 数据一致性指的是什么?如何实现数据一致性?
  • 初识消息队列的世界
  • Python快速入门专业版(三十八):Python字典:键值对结构的增删改查与进阶用法
  • SpringCloudOAuth2+JWT:微服务统⼀认证方案
  • LeetCode 分类刷题:2517. 礼盒的最大甜蜜度
  • 深度学习优化器进阶:从SGD到AdamW,不同优化器的适用场景
  • C++ 之 【C++的IO流】
  • truffle学习笔记
  • 现代循环神经网络
  • vlc播放NV12原始视频数据
  • ThinkPHP8学习篇(七):数据库(三)