当前位置: 首页 > news >正文

Python 进程间通信:TCP安全加密数据传输

最近在写安全方面的程序,有需求,就做了这些TCP加密数据传输类。
utils.safeUtils的内容详见:

  • SafeObj:Python 高安全性加密数据容器类-CSDN博客
  • SafeKey:Python 高安全性加密密码容器类-CSDN博客

如有任何问题或漏洞欢迎批评指正!不胜感激!

# 代码

1. 服务端

import sys
import tracebackimport os
import socket
from threading import Thread
from typing import Callable, Any
import secrets
import struct
from datetime import datetime as dt,timedelta as tdel
from utils.safeUtils import SafeKey,combine_keys,SafeObj,secure_eraseclass Server:def __init__(self, *,sendCallback: Callable[[str, int, bytes], bytes],host: str = "localhost",port: int = 0,errCallback: Callable[[Exception], Any] = lambda err: print(traceback.format_exc())):self.srv = socket.socket()self._host = hostself._port = portself._sendCallback = sendCallbackself._errCallback = errCallbackself._close = Falseself.srv.bind((host, port))self.srv.listen(16)self.srv.settimeout(1)self._mainThread = Thread(None, self._main, "TCP-Server_Main", daemon=True)self._mainThread.start()@propertydef host(self):return self._host@propertydef port(self):return self._portdef _main(self):while not self._close:try:conn, addr = self.srv.accept()except socket.timeout:continuetry:data = b""while block := conn.recv(1024):data += blockcontent = self._sendCallback(addr[0],addr[1],data)conn.send(content)except socket.error as err:self._errCallback(err)finally:conn.close()def close(self):self.srv.close()self._close = Trueself._mainThread.join()class CryptServer(Server):def __init__(self,*args,**kwargs):self.nonce = []super().__init__(*args,**kwargs)print(self.srv.getsockname())def _main(self):def ATTACK_ALERT(o = None,ts = None,n = None,c = None,r = None,s = True,t = "REPLAY or MAN-IN-THE-MIDDLE ATTACK"):L, R = CF.LIGHTWHITE_EX + CS.BRIGHT, CS.RESET_ALLerr = f'''{CB.BLUE}{L}!!! [SECURITY ALERT] !!!: {t}{R}\n{CB.RED}{L}{"▰" *22} ATTACK {"▰" * 22}{R}\n{"TIMESTAMP(Now/Response): " if s else "ATTACK Count: "}{("/".join(x.strftime("%Y-%m-%dT%H:%M:%S") for x in (o, ts))) if s else str(c)}\n{"NONCE: " if s else "RAM Tamper: "}{n.hex() if s else str(r)}\n{CB.RED}{L}{"▰" * 20} SECURITY ALERT {"▰" * 20}{R}'''.replace(" " * 4, "")os.popen("msg * SECURITY ALERT: ************ has been ATTACKED");raise RuntimeError(err)last_nonce = len(self.nonce)while not self._close:try:conn, addr = self.srv.accept()print(addr)except socket.timeout:continuekey, safeData, cliKey, mainKey, content, data = (None for _ in range(6)) # 初始化(finally要用)try:key = SafeKey(secrets.token_bytes(128))                       # 每次会话的唯一公钥key.use_password(conn.sendall,t = bytes)                                   # 向客户端发送公钥cliKey = conn.recv(128)                                       # 读取客户端私钥ts = conn.recv(20).decode()                                   # [安全校验] 读取时间戳(%Y%m%d%H%M%S%f)ts = dt.strptime(ts,"%Y%m%d%H%M%S%f")                 # [安全校验] 格式化时间戳n = conn.recv(12)                                             # [安全校验] 读取会话唯一 nonce# [安全校验] 检测是否可能为重放if abs((o:=dt.now())-ts).total_seconds()>1or n in self.nonce:ATTACK_ALERT(o = o,ts = ts,n = n)else:self.nonce.append(n)mainKey = combine_keys(key.get_password(bytes),cliKey)            # 使用公钥和私钥派生主密钥secure_erase(cliKey)                                              # 擦除客户端私钥del key,cliKeylength = struct.unpack("!I",conn.recv(4))[0]              # 读取数据长度data = conn.recv(length)safeData = SafeObj.from_encrypted_package(data,mainKey)           # 获取SafeObj对象content = self._sendCallback(addr[0],addr[1],safeData.get_data()) # 从处理函数获取数据encrypted_package = SafeObj(content,mainKey).get_encrypted_package()length = len(encrypted_package)conn.sendall(struct.pack("!I",length))conn.sendall(encrypted_package)                                                 # 发送加密数据secure_erase(bytearray(encrypted_package))                                   # 安全擦除所有中间数据secure_erase(bytearray(data))secure_erase(bytearray(content))print(mainKey.get_password(bytes),"\n",length,"\n",data,"\n",encrypted_package)except Exception as err:self._errCallback(err)finally:conn.close()try:del keyexcept NameError:passtry:del cliKeyexcept NameError:passdel safeData, mainKey, content, datad = len(self.nonce) - last_nonceif d < 0 or d > 10:ATTACK_ALERT(c = d,r = d < 0,t = "EXPLOSIVE ATTACK or RAM TAMPERING")last_nonce = len(self.nonce)conn.close()print("close:",addr)if __name__ == '__main__':def send(host,port,content):return secrets.token_bytes(256)srv = CryptServer(sendCallback = send)srv._mainThread.join()

2. 客户端

import socket
import sys
import timeimport secrets
from datetime import datetime
import struct
from utils.safeUtils import SafeKey, combine_keys, SafeObj, secure_eraseclass Client:def __init__(self):self.cli = socket.socket()self.cli.settimeout(10)def connect(self,host:str,port:int):self.cli.connect((host,port))def request(self,data:bytes) -> bytes:self.cli.send(data)return self.cli.recv(1024)def close(self):self.cli.close()class CryptClient(Client):def __init__(self):super().__init__()def request(self, data: bytes) -> bytes:"""安全加密请求流程:1. 接收服务端公钥2. 生成并发送客户端私钥3. 发送时间戳和随机nonce4. 派生主密钥5. 加密并发送请求数据6. 接收并解密响应参数:data: 要发送的原始数据(字节类型)返回:bytes: 解密后的响应数据"""# 生成客户端私钥(128字节安全随机令牌)cli_key = secrets.token_bytes(128)# 1. 接收服务端公钥(128字节)srv_key = self.cli.recv(128)if len(srv_key) != 128:raise ConnectionError("无效的公钥长度")# 2. 发送客户端私钥self.cli.send(cli_key)# 3. 生成并发送安全参数# 时间戳(精确到微秒,20字节字符串)timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")[:20]# 随机nonce(12字节)nonce = secrets.token_bytes(12)self.cli.send(timestamp.encode('utf-8'))  # 发送时间戳self.cli.send(nonce)  # 发送随机noncetry:# 4. 派生主密钥(组合服务端公钥和客户端私钥)main_key = combine_keys(srv_key, cli_key)# 5. 加密请求数据# 创建安全对象加密数据safe_request = SafeObj(data, main_key)# 获取完整加密包(包含盐/nonce/标签/加密数据)encrypted_package = safe_request.get_encrypted_package()# 发送长度length = len(encrypted_package)self.cli.sendall(struct.pack("!I",length))# 发送加密数据包self.cli.sendall(encrypted_package)# 6. 接收服务端响应# 接收数据长度length = struct.unpack("!I",self.cli.recv(4))[0]response = self.cli.recv(length)# 解密响应数据safe_response = SafeObj.from_encrypted_package(response, main_key)return safe_response.get_data()finally:# 安全清除所有敏感数据secure_erase(srv_key)secure_erase(cli_key)secure_erase(nonce)if 'main_key' in locals():del main_keyif 'safe_request' in locals():del safe_requestif 'safe_response' in locals():del safe_responseif __name__ == '__main__':data = b"hello world!" * 100cli = CryptClient()cli.connect("localhost", YOU_SERVER_PORT)print(cli.request(data))cli.close()

3. utils.safeUtils

import ctypes
import mmap
import os
import pickle
import random
import secrets
import sys
import hashlib
import hmac
import time
from typing import Any, Optional, Callable, Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.hkdf import HKDFfrom utils.Infoget import infogetclass PasswordError(Exception): passclass SafeKey:passclass CryptConfig:erase_count = 7PBKDF2HMAC_iterations = 1000  # 建议1000-100000def secure_erase(buffer) -> None:"""安全清除内存内容(支持多种类型)"""if buffer is None:returntry:if isinstance(buffer, (bytes, bytearray)):# 多次覆盖内存mutable = bytearray(buffer)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)# 最后填充零for i in range(len(mutable)):mutable[i] = 0# 防止编译器优化if len(mutable) > 0:ctypes.memset(ctypes.addressof(ctypes.c_char.from_buffer(mutable)), 0, len(mutable))elif isinstance(buffer, mmap.mmap):# 处理内存映射对象buffer.seek(0)data = buffer.read()mutable = bytearray(data)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0buffer.seek(0)buffer.write(mutable)elif isinstance(buffer, str):# 字符串清除 - 创建可变副本mutable = bytearray(buffer.encode('utf-8'))for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0except:passdef combine_keys(key1: bytes, key2: bytes) -> SafeKey:"""安全混合两个字节密钥派生主密钥并返回SafeKey对象参数:key1: 第一个输入密钥 (字节类型)key2: 第二个输入密钥 (字节类型)返回:SafeKey: 包含派生主密钥的安全对象步骤:1. 验证输入密钥有效性2. 使用HKDF混合派生密钥3. 创建SafeKey对象4. 安全清除中间密钥"""# 1. 验证输入密钥if not key1 or len(key1) == 0:raise ValueError("key1 不能为空")if not key2 or len(key2) == 0:raise ValueError("key2 不能为空")try:# 2. 创建可变密钥副本用于安全处理mutable_key1 = bytearray(key1)mutable_key2 = bytearray(key2)# 3. 拼接密钥作为HKDF输入combined_key = mutable_key1 + mutable_key2# 4. 使用HKDF派生主密钥hkdf = HKDF(algorithm=hashes.SHA3_512(),length=32,  # 输出32字节密钥(AES-256兼容)salt=None,  # 不使用盐(输入密钥已足够随机)info=b"SafeKey-Derivation",  # 上下文信息backend=default_backend())# 派生密钥derived_key = hkdf.derive(combined_key)# 5. 创建SafeKey对象safekey = SafeKey(derived_key)return safekeyfinally:# 6. 安全清除所有中间密钥(确保即使异常也执行)if 'mutable_key1' in locals():secure_erase(mutable_key1)  # 使用SafeKey中的安全擦除方法if 'mutable_key2' in locals():secure_erase(mutable_key2)if 'combined_key' in locals():secure_erase(combined_key)if 'derived_key' in locals():secure_erase(derived_key)class SafeKey:def __init__(self, password: Union[str,bytes,bytearray]):"""加固版绝对安全的密钥存储 - 单次使用"""# 1. 将密码转换为可变字节数组if isinstance(password,str):password_bytes = bytearray(password.encode('utf-8'))elif isinstance(password,bytes):password_bytes = bytearray(password)elif isinstance(password,bytearray):password_bytes = passwordelse:password_type = type(password)del passwordraise TypeError(f"unsupported password type: '{password_type}'")# 2. 生成随机内部主密钥 (32字节)self._internal_master_key = secrets.token_bytes(32)# 3. 生成随机noncenonce = secrets.token_bytes(12)# 4. 使用AES-GCM加密密码cipher = Cipher(algorithms.AES(self._internal_master_key),modes.GCM(nonce),backend=default_backend())encryptor = cipher.encryptor()encrypted = encryptor.update(password_bytes) + encryptor.finalize()# 5. 创建加密数据包并添加HMAC校验encrypted_package = nonce + encryptor.tag + encryptedhmac_digest = hmac.new(self._internal_master_key,encrypted_package,hashlib.sha3_256).digest()self._encrypted_data = hmac_digest + encrypted_package# 6. 安全清除原始密码secure_erase(password_bytes)del password_bytesdel password  # 删除引用# 7. 锁定内部主密钥和加密数据在内存中self._locked_master_key = self._lock_in_memory(self._internal_master_key)self._locked_encrypted_data = self._lock_in_memory(self._encrypted_data)# 8. 清除原始密钥引用secure_erase(self._internal_master_key)self._internal_master_key = Nonesecure_erase(self._encrypted_data)self._encrypted_data = None# 9. 标记对象状态self._is_active = Trueself._password_ref = None# 10. 反调试保护self._check_security_environment()def _lock_in_memory(self, data: bytes) -> mmap.mmap:"""将数据安全锁定在内存中,防止交换到磁盘"""size = len(data)# 创建共享内存区域if sys.platform == 'win32':shm = mmap.mmap(-1, size, access=mmap.ACCESS_WRITE)else:# 类Unix系统内存保护PROT_READ = 1PROT_WRITE = 2PROT_READWRITE = PROT_READ | PROT_WRITEshm = mmap.mmap(-1, size, prot=PROT_READWRITE)# 复制数据到锁定内存shm.write(data)# 锁定内存防止交换if not self._mlock_memory(shm, size):# 锁定失败时立即清除并终止secure_erase(shm)shm.close()self._terminate_with_error("内存锁定失败")# 安全清除原始数据secure_erase(data)return shmdef _mlock_memory(self, shm: mmap.mmap, size: int) -> bool:"""锁定内存防止交换到磁盘(跨平台实现)"""try:if sys.platform == 'win32':# Windows内存锁定return ctypes.windll.kernel32.VirtualLock(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(shm))),size) != 0else:# Unix系统内存锁定libc = ctypes.CDLL(None)return libc.mlock(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(shm))),size) == 0except:return Falsedef _check_security_environment(self):"""检查安全环境,防御调试和内存转储"""# 1. 检测调试器if self._is_debugger_present():self._terminate_with_error("检测到调试器 - 安全终止")# 2. 检测虚拟机(可选)if self._is_running_in_vm():self._terminate_with_error("虚拟机环境不安全 - 安全终止")def _is_debugger_present(self) -> bool:"""检测调试器存在"""try:# Windows调试器检测if sys.platform == 'win32':kernel32 = ctypes.windll.kernel32return kernel32.IsDebuggerPresent() != 0# Linux调试器检测elif sys.platform.startswith('linux'):try:with open('/proc/self/status', 'r') as status_file:for line in status_file:if line.startswith('TracerPid:'):tracer_pid = int(line.split(':')[1].strip())return tracer_pid != 0except:pass# 检查LD_PRELOAD劫持if 'LD_PRELOAD' in os.environ:return Trueexcept:passreturn Falsedef _is_running_in_vm(self) -> bool:"""检测是否在虚拟机中运行"""try:# 简单虚拟机检测if sys.platform == 'win32':import winregwith winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,r"SYSTEM\CurrentControlSet\Control\SystemInformation") as key:system_manufacturer = winreg.QueryValueEx(key, "SystemManufacturer")[0]system_product = winreg.QueryValueEx(key, "SystemProductName")[0]vm_indicators = ["VMware", "Virtual", "Xen", "QEMU", "KVM"]return any(indicator in system_manufacturer or indicator in system_product for indicator in vm_indicators)elif sys.platform.startswith('linux'):# 检查DMI信息dmi_files = ['/sys/class/dmi/id/product_name','/sys/class/dmi/id/sys_vendor']for dmi_file in dmi_files:if os.path.exists(dmi_file):with open(dmi_file, 'r') as f:content = f.read().lower()if 'vmware' in content or 'virtual' in content or 'qemu' in content:return Trueexcept:passreturn Falsedef _terminate_with_error(self, message: str):"""安全终止程序并显示错误消息"""infoget(f"安全错误: {message}",type = "err")# 尝试安全清除内存if hasattr(self, '_locked_master_key'):secure_erase(self._locked_master_key)if hasattr(self, '_locked_encrypted_data'):secure_erase(self._locked_encrypted_data)# 立即终止程序os._exit(1)def __enter__(self):"""进入上下文时获取密码访问对象"""if not self._is_active:raise RuntimeError("SafeKey 已销毁")# 返回自身而不是直接解密密码return selfdef get_password(self,t:type = str) -> Union[str,bytes]:"""安全获取密码(使用后立即清除)"""if not self._is_active:raise RuntimeError("SafeKey 已销毁")# 1. 从锁定内存中获取加密数据self._locked_encrypted_data.seek(0)full_package = self._locked_encrypted_data.read()# 验证数据包长度if len(full_package) < 32 + 12 + 16:  # HMAC(32) + nonce(12) + tag(16)self._terminate_with_error("加密数据包损坏")# 2. 分离HMAC和加密数据hmac_digest = full_package[:32]encrypted_package = full_package[32:]# 3. 从锁定内存中获取主密钥self._locked_master_key.seek(0)master_key = self._locked_master_key.read()# 4. 验证HMACexpected_hmac = hmac.new(master_key,encrypted_package,hashlib.sha3_256).digest()if not hmac.compare_digest(hmac_digest, expected_hmac):self._terminate_with_error("加密数据完整性校验失败")# 5. 解包加密数据nonce = encrypted_package[:12]tag = encrypted_package[12:28]ciphertext = encrypted_package[28:]# 6. 使用AES-GCM解密cipher = Cipher(algorithms.AES(master_key),modes.GCM(nonce, tag),backend=default_backend())decryptor = cipher.decryptor()decrypted = decryptor.update(ciphertext) + decryptor.finalize()# 7. 创建可变密码副本password = decrypted.decode("utf-8") if t == str else decrypted# 8. 安全清除临时数据secure_erase(full_package)secure_erase(master_key)secure_erase(decrypted)return passworddef use_password(self, callback: callable,t = str) -> None:"""安全使用密码(自动清除)"""password = self.get_password(t)try:callback(password)finally:# 确保密码被清除if 'password' in locals():secure_erase(bytearray(password))def __exit__(self, exc_type, exc_value, traceback):"""退出上下文时销毁所有敏感数据"""# 安全清除并销毁密码引用if self._password_ref:secure_erase(self._password_ref)try:self._password_ref.close()except:passself._password_ref = None# 安全清除并销毁主密钥if self._locked_master_key:secure_erase(self._locked_master_key)try:self._locked_master_key.close()except:passself._locked_master_key = None# 安全清除并销毁加密数据if self._locked_encrypted_data:secure_erase(self._locked_encrypted_data)try:self._locked_encrypted_data.close()except:passself._locked_encrypted_data = None# 标记对象为已销毁self._is_active = Falsedef __del__(self):"""析构时确保内存安全清除"""if hasattr(self, '_is_active') and self._is_active:self.__exit__(None, None, None)class SafeObj:def __init__(self, data: Any, key: Union[str, 'SafeKey']):"""安全数据保护对象:param data: 需要保护的任意类型数据:param key: 用于加密的密钥(字符串或SafeKey对象)"""# 初始化安全属性self._key = keyself._encrypted_data: bytes = b''self._salt = os.urandom(16)self._nonce = os.urandom(12)self._tag: bytes = b''self._is_active = True# 加密数据self._encrypt(data)def _secure_memzero(self, buffer) -> None:"""安全清除内存内容"""if buffer is None:returntry:if isinstance(buffer, (bytes, bytearray)):# 多次覆盖内存mutable = bytearray(buffer)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)# 最后填充零for i in range(len(mutable)):mutable[i] = 0# 防止编译器优化if len(mutable) > 0:ctypes.memset(ctypes.addressof(ctypes.c_char.from_buffer(mutable)), 0, len(mutable))elif isinstance(buffer, mmap.mmap):# 处理内存映射对象buffer.seek(0)data = buffer.read()mutable = bytearray(data)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0buffer.seek(0)buffer.write(mutable)except (ValueError, TypeError, OSError):passdef _get_key_bytes(self) -> bytes:"""从密钥源获取字节形式密钥"""if isinstance(self._key, str):# 字符串密钥直接使用return self._key.encode()elif hasattr(self._key, 'get_password'):# SafeKey对象 - 安全获取密码return self._key.get_password(bytes)else:raise TypeError("不支持的密钥类型")def _derive_key(self, key_length: int = 32) -> bytes:"""派生加密密钥"""# 获取原始密钥字节key_bytes = self._get_key_bytes()# 派生加密密钥kdf = PBKDF2HMAC(algorithm=hashes.SHA3_512(),length=key_length,salt=self._salt,iterations=CryptConfig.PBKDF2HMAC_iterations,backend=default_backend())derived_key = kdf.derive(key_bytes)# 安全清除中间数据self._secure_memzero(key_bytes)return derived_keydef _encrypt(self, data: Any) -> None:"""加密数据并存储在内存中"""# 序列化数据serialized = pickle.dumps(data)try:# 派生加密密钥derived_key = self._derive_key()# 加密数据cipher = Cipher(algorithms.AES(derived_key),modes.GCM(self._nonce),backend=default_backend())encryptor = cipher.encryptor()self._encrypted_data = encryptor.update(serialized) + encryptor.finalize()self._tag = encryptor.tagfinally:# 安全清除中间数据self._secure_memzero(serialized)if "derived_key" in locals():self._secure_memzero(derived_key)def get_data(self) -> Any:"""安全获取解密后的原始数据- 返回原始数据类型- 调用者负责安全处理返回的数据"""if not self._is_active:raise RuntimeError("SafeObj 已销毁")# 派生解密密钥derived_key = self._derive_key()try:# 解密数据cipher = Cipher(algorithms.AES(derived_key),modes.GCM(self._nonce, self._tag),backend=default_backend())decryptor = cipher.decryptor()decrypted = decryptor.update(self._encrypted_data) + decryptor.finalize()# 反序列化数据data = pickle.loads(decrypted)return datafinally:# 安全清除中间数据self._secure_memzero(derived_key)self._secure_memzero(decrypted)def use_data(self, callback: Callable[[Any], None]) -> None:"""安全使用数据(自动清除):param callback: 回调函数,接受解密后的数据作为参数"""data = Nonetry:data = self.get_data()callback(data)finally:# 安全清除数据(如果可能)if data is not None:self._secure_erase_recursive(data)def _secure_erase_recursive(self, obj: Any) -> None:"""递归安全清除数据结构中的敏感内容"""try:if isinstance(obj, (bytes, bytearray)):self._secure_memzero(obj)elif isinstance(obj, str):# 字符串不可变,创建可变副本后清除mutable = bytearray(obj.encode('utf-8'))self._secure_memzero(mutable)elif isinstance(obj, dict):for key, value in obj.items():self._secure_erase_recursive(value)self._secure_erase_recursive(key)elif isinstance(obj, (list, tuple, set)):for item in obj:self._secure_erase_recursive(item)elif hasattr(obj, '__dict__'):# 处理自定义对象for attr in vars(obj):self._secure_erase_recursive(getattr(obj, attr))except:pass@propertydef encrypted_data(self) -> bytes:"""获取加密后的数据(不含盐和nonce)"""return self._encrypted_datadef get_encrypted_package(self) -> bytes:"""获取完整加密包(包含盐、nonce、标签和加密数据)"""return self._salt + self._nonce + self._tag + self._encrypted_data@classmethoddef from_encrypted_package(cls, encrypted_package: bytes, key: Union[str, 'SafeKey']) -> 'SafeObj':"""从加密包创建SafeObj实例"""if len(encrypted_package) < 16 + 12 + 16:raise ValueError("无效的加密包")# 解包数据salt = encrypted_package[:16]nonce = encrypted_package[16:28]tag = encrypted_package[28:44]encrypted_data = encrypted_package[44:]# 创建虚拟实例instance = cls.__new__(cls)instance._key = keyinstance._salt = saltinstance._nonce = nonceinstance._tag = taginstance._encrypted_data = encrypted_datainstance._is_active = Truereturn instancedef __del__(self):"""析构时确保内存安全清除"""if self._is_active:self._is_active = False# 安全清除所有敏感数据self._secure_memzero(self._salt)self._secure_memzero(self._nonce)self._secure_memzero(self._tag)self._secure_memzero(self._encrypted_data)

# 分析

1. 连接步骤流程

初始化阶段:
        服务端启动时生成随机公钥(128字节),绑定端口监听连接。
        客户端连接服务端后,服务端立即发送公钥。
密钥交换阶段:
        客户端:
                生成随机私钥(128字节)并发送给服务端。
                发送精确到微秒的当前时间戳(20字节)和随机 nonce(12字节)。
        服务端:
                校验时间戳(与当前时间差≤1秒)和 nonce(是否重复),防止重放攻击
                组合服务端公钥 + 客户端私钥,派生主密钥(HKDF-SHA3-512)
数据传输阶段:
        客户端:
                用主密钥加密数据(AES-GCM + PBKDF2HMAC-SHA3-512)。
                发送加密数据包(盐 + nonce + 认证标签 + 密文)。
        服务端:
                用主密钥解密数据,处理业务逻辑。
                加密响应数据并返回相同结构的加密包。
连接终止:
        双方安全擦除会话密钥、临时密钥等敏感数据
        关闭连接。


2. 加密方式

密钥派生:
        主密钥派生:
                服务端公钥(128B) + 客户端私钥(128B) → HKDF-SHA3-512 → 32字节主密钥。
        数据加密密钥派生:
                主密钥 + 随机盐(16B) → PBKDF2HMAC-SHA3-512 → 32字节数据密钥。
数据加密:
        算法:
                AES-256-GCM(认证加密)。
        数据包结构:
                16B盐 + 12B nonce + 16B认证标签 + 变长密文。
完整性保护:
        时间戳 + nonce 防重放。
        GCM模式提供数据完整性和来源认证。


3. 安全设计

前向安全性:
        每次会话使用临时密钥(公钥/私钥对),会话结束立即擦除。
抗重放攻击:
        时间戳校验(1秒窗口) + nonce唯一性检测(服务端维护nonce列表)。
内存安全:
        敏感数据擦除:密钥、中间值用随机数据覆盖7次(secure_erase)。
        内存锁定:密钥存储在不可交换的内存区域(mlock)。
环境安全检测:
        反调试:检查调试器附加(IsDebuggerPresent/TracerPid)。
        反Hook:验证关键函数地址完整性。
        反虚拟机:检测VM特征(注册表/DMI信息)。
密钥管理:
        SafeKey对象:密钥全程加密存储,仅在使用时短暂解密。
        单次使用:密钥解密后立即擦除,不可复用。


4. 不足

长连接支持(暂未实现):
        当前设计每次请求新建连接,可扩展为k复用连接的会话模式。

速度(本机测试最大速度仅2Mbps = 256KB/s):

        加密操作大量耗时,CPU密集型操作。

适用性:
        仅支持单次、慢速通信(进程间加密数据传输),不能用于远程通信

可能的漏洞:

        中间人 (MITM) 攻击(当前无身份验证)
        内存耗尽攻击(恶意请求使服务器nonce列表无限增长)
        时间窗口攻击(攻击者可快速重放合法请求)
        内存安全残留风险(Python内部对象清除延时)
        拒绝服务 (DoS) 漏洞(Nonce 洪水、密钥派生消耗、连接耗尽等)
        侧信道攻击(时间、内存、物理监听等)


制作不易,感谢大家的支持!如有任何问题或建议欢迎评论!

http://www.dtcms.com/a/286660.html

相关文章:

  • H3CNE小小综合实验
  • 模拟数据生成---使用NGS数据模拟软件VarBen
  • SLM343CK-DG Sillumin数明半导体高性能LED驱动芯片 抗干扰+耐高温 车载照明专用
  • 二叉树(建立 + 遍历 + 拓展)
  • 外部DLL创建及使用
  • 灵巧手(具身智能入门十一)
  • if (a == 1 a == 2 a == 3)返回true的问题思考
  • NVIDIA 驱动安装失败问题排查与解决(含离线 GCC 工具链安装全过程)
  • MySQL组内拼接group_concat函数
  • MyUI会员排名VcMember组件文档
  • Java与Vue技术搭建的SRM招标采购管理系统,提供源码,涵盖招标、投标、评标全流程,助力企业高效规范采购管理
  • spring-cloud微服务部署-feign服务间调用
  • NFS读写性能评估与优化指南(下)
  • 二叉搜索树:高效的查找结构
  • 自学力扣:最长连续序列
  • python-pptx 的layout 布局
  • CCF编程能力等级认证GESP—C++1级—20250628
  • 扫地机器人,需要回归第一性原理
  • Docker安装教程
  • Visual Studio C++编译器优化等级详解:配置、原理与编码实践
  • 第七章 愿景07 实习小宇
  • LLC电源设计专题--详细讲解
  • Web开发 02
  • 贪吃蛇(C++实现)
  • 美客多跨境电商平台怎么开店?美客多入驻门槛有哪些?
  • 目标框的位置以及大小的分布
  • 进入当前正在运行的 Docker 容器
  • 应急响应-Windows资源监视器
  • 易用性强短视频矩阵平台源头开发商推荐
  • leetcode:单词接龙[图广搜][无权图找最短路径]