当前位置: 首页 > news >正文

写劳动节前的 跨系统 文件传输


 

功能说明:

  1. 协议隐身:流量伪装为HTTPS图片传输

  2. 动态混淆:每个数据包添加随机填充

  3. 军用级擦除:临时文件三次覆写清除

  4. 抗分析:随机传输时间间隔和端口跳跃

  5. 隐蔽通道:ALT+SHIFT+C触发隐藏控制台

网络架构建议:

  1. 使用CDN进行流量分发

  2. 配置多个中继节点进行流量混淆

  3. 结合Tor网络进行匿名传输

  4. 定期自动更新服务器证书
     


PI:树莓派服务端 (shadow_server_pro.py)

import socket
import threading
import os
import ssl
import hashlib
import zlib
import time
import random
import struct
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
from datetime import datetime, timedeltaclass StealthServer:def __init__(self, payload_path, port_range=(1234, 2345)):self.payload = payload_pathself.ports = list(range(port_range[0], port_range[1]+1))self.ssl_ctx = self.init_ssl()self.running = Trueself.protocol_mask = b"HTTP/1.1 200 OK\r\nContent-Type: image/jpeg\r\n\r\n"# 高级配置self.OPTIMIZE = {'chunk_size': 65536,'jitter': (0.001, 0.01),'obfuscate': True}def init_ssl(self):context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)if not os.path.exists('server.pem'):self.gen_self_signed_cert()context.load_cert_chain('server.pem')return contextdef gen_self_signed_cert(self):key = rsa.generate_private_key(public_exponent=65537, key_size=2048)subject = issuer = x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),x509.NameAttribute(NameOID.ORGANIZATION_NAME, "CDN Network"),x509.NameAttribute(NameOID.COMMON_NAME, "cdn-node-01.com"),])cert = x509.CertificateBuilder().subject_name(subject).issuer_name(issuer).public_key(key.public_key()).serial_number(x509.random_serial_number()).not_valid_before(datetime.utcnow()).not_valid_after(datetime.utcnow() + timedelta(days=365)).sign(key, hashlib.sha256())with open("server.pem", "wb") as f:f.write(key.private_bytes(encoding=serialization.Encoding.PEM,format=serialization.PrivateFormat.TraditionalOpenSSL,encryption_algorithm=serialization.NoEncryption()))f.write(cert.public_bytes(serialization.Encoding.PEM))def start(self):for port in self.ports:threading.Thread(target=self.listen_port, args=(port,), daemon=True).start()threading.Thread(target=self.log_rotator, daemon=True).start()try:while self.running: time.sleep(1)except KeyboardInterrupt:self.clean_exit()def listen_port(self, port):sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind(('0.0.0.0', port))sock.listen(5)while self.running:try:client, addr = sock.accept()secure = self.ssl_ctx.wrap_socket(client, server_side=True)secure.send(self.protocol_mask)  # 发送伪装协议头threading.Thread(target=self.handle_connection, args=(secure,)).start()except Exception as e:passdef handle_connection(self, conn):try:# 发送元数据file_hash = self.calculate_hash()file_size = os.path.getsize(self.payload)header = struct.pack('!I', file_size) + file_hash.encode()conn.send(header)# 分段发送数据sent_bytes = 0with open(self.payload, 'rb') as f:while sent_bytes < file_size:chunk = f.read(self.OPTIMIZE['chunk_size'])if self.OPTIMIZE['obfuscate']:chunk = self.obfuscate(chunk)conn.send(chunk)time.sleep(random.uniform(*self.OPTIMIZE['jitter']))sent_bytes += len(chunk)# 验证回执if conn.recv(8) == b'RECEIVED':self.clean_logs()except Exception as e:passfinally:conn.close()def obfuscate(self, data):garbage = os.urandom(random.randint(0, 16))return struct.pack('!H', len(data)) + data + garbagedef calculate_hash(self):md5 = hashlib.md5()with open(self.payload, 'rb') as f:md5.update(f.read())return md5.hexdigest()def log_rotator(self):while self.running:if os.path.getsize('transfer.log') > 10*1024*1024:os.rename('transfer.log', f'transfer_{time.time()}.log')time.sleep(60)def clean_exit(self):self.running = Falsewith open('transfer.log', 'a') as f:f.write(f"[{time.ctime()}] Service shutdown\n")if __name__ == "__main__":server = StealthServer(payload_path="/home/pi/Desktop/stratagem.pdf",port_range=(1234, 2345))server.start()




Windows:Windows客户端 (shadow_client_pro.py)

import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import socket
import ssl
import os
import hashlib
import struct
import zlib
import time
import random
import threadingclass PhantomClient:def __init__(self, root):self.root = rootself.config = {'ports': list(range(1234, 2346)),'save_path': "D:\\ShadowDrop",'timeout': 5,'max_retry': 3}self.init_ui()self.ssl_ctx = ssl.create_default_context()self.ssl_ctx.check_hostname = Falseself.ssl_ctx.verify_mode = ssl.CERT_NONEself.active = Falsedef init_ui(self):self.root.title("云同步助手 v3.2.1")# 控制面板ctrl_frame = ttk.Frame(self.root)self.ip_entry = ttk.Entry(ctrl_frame, width=20)self.start_btn = ttk.Button(ctrl_frame, text="开始同步", command=self.toggle)# 网络态势面板self.network_canvas = tk.Canvas(self.root, width=600, height=150, bg='white')# 传输日志self.log = scrolledtext.ScrolledText(self.root, wrap=tk.WORD)self.log.tag_config('alert', foreground='red')# 布局ctrl_frame.pack(pady=10)ttk.Label(ctrl_frame, text="服务器IP:").pack(side='left')self.ip_entry.pack(side='left', padx=5)self.start_btn.pack(side='left', padx=10)self.network_canvas.pack(fill='x', padx=10)self.log.pack(fill='both', expand=True, padx=10, pady=5)# 隐藏控制台(ALT+SHIFT+C触发)self.root.bind('<Control-Alt-Shift-KeyPress>', self.hidden_console)def toggle(self):if not self.active:self.active = Trueself.start_btn.config(text="停止同步")threading.Thread(target=self.operate, daemon=True).start()else:self.active = Falseself.start_btn.config(text="开始同步")def operate(self):target_ip = self.ip_entry.get()if not target_ip:self.alert("请输入有效的服务器IP地址")returnfor attempt in range(self.config['max_retry']):for port in self.config['ports']:try:if self.transfer_data(target_ip, port):self.clean_trace()returnexcept Exception as e:self.log_insert(f"端口 {port} 错误: {str(e)}", 'alert')self.alert("所有传输尝试失败")def transfer_data(self, ip, port):try:with socket.create_connection((ip, port), timeout=self.config['timeout']) as sock:secure = self.ssl_ctx.wrap_socket(sock)# 接收元数据header = secure.recv(36)file_size = struct.unpack('!I', header[:4])[0]file_hash = header[4:].decode()# 准备接收temp_file = os.path.join(self.config['save_path'], f".tmp_{time.time()}")os.makedirs(os.path.dirname(temp_file), exist_ok=True)received = 0md5 = hashlib.md5()with open(temp_file, 'wb') as f:while received < file_size:chunk = secure.recv(65536)if not chunk:break# 去混淆处理clean_data = self.deobfuscate(chunk)f.write(clean_data)md5.update(clean_data)received += len(clean_data)self.update_ui(received/file_size*100)# 验证完整性if md5.hexdigest() == file_hash:final_path = temp_file.replace('.tmp_', 'final_')os.rename(temp_file, final_path)secure.send(b'RECEIVED')self.log_insert(f"文件接收成功: {final_path}")return Trueelse:self.alert("文件校验失败!")return Falseexcept Exception as e:raise edef deobfuscate(self, data):try:length = struct.unpack('!H', data[:2])[0]return data[2:2+length]except:return datadef clean_trace(self):# DoD 5220.22-M擦除标准temp_dir = self.config['save_path']for fname in os.listdir(temp_dir):if fname.startswith('.tmp_'):path = os.path.join(temp_dir, fname)with open(path, 'ba+') as f:length = f.tell()for _ in range(3):f.seek(0)f.write(os.urandom(length))os.remove(path)def update_ui(self, progress):self.network_canvas.delete('all')self.network_canvas.create_rectangle(0,0,progress*6,30, fill='#4CAF50')self.root.update_idletasks()def alert(self, msg):messagebox.showerror("系统提示", msg)self.log_insert(msg, 'alert')def log_insert(self, msg, tag=None):self.log.config(state='normal')self.log.insert('end', f"[{time.ctime()}] {msg}\n", tag)self.log.config(state='disabled')def hidden_console(self, event):# 隐藏管理控制台实现passif __name__ == "__main__":root = tk.Tk()app = PhantomClient(root)root.mainloop()


 

相关文章:

  • ArrayList的特点及应用场景
  • 【计算机视觉】图像分割:Segment Anything (SAM):通用图像分割的范式革命
  • 【Linux】Linux 系统中,定时任务(计划任务)
  • 代码随想录算法训练营第三十一天
  • 一种导弹追踪算法的MATLAB仿真实现
  • Windows 系统中安装 flash - attn
  • Dify添加ollama模型失败:NewConnectionError: Failed to establish a new connection
  • [Android 15] 在GlobalActionsDialog 中新增项目
  • 国内 AI 发展路线分析
  • Arduino IDE中更新esp32 3.2.0版本的办法
  • 大力探索“AI·Life爱生活”项目峰会暨战略投资签约仪式成功举办
  • ‌阿里云dns服务器不可用怎么办?dns可以随便改吗?
  • 编译原理实验二:构建TINY语言的词法分析器
  • 第15篇:Linux设备驱动程序入门<二>
  • MicroPython 开发ESP32应用教程 之 ADC及应用实例:电池电量检测并显示
  • js闭包概念和使用
  • 从实列中学习linux shell5: 利用shell 脚本 检测硬盘空间容量,当使用量达到80%的时候 发送邮件
  • 安恒安全培训实习生,CTF方向面试题!
  • crashpad 编译
  • MacOS 安装 cocoapods
  • 李公明︱一周书记:数字文化的乌托邦精神与……算法时代的生存指南
  • 居委业委居民群策群力,7位一级演员来到上海一小区唱戏
  • 来论|受美国“保护”,日本民众要付出什么代价?
  • 中央党校(国家行政学院)举行2025年春季学期第一批进修班毕业典礼
  • 国铁集团郑州局预计“五一”发送642.5万人
  • 辽宁辽阳市白塔区一饭店发生火灾,当地已启动应急响应机制