当前位置: 首页 > news >正文

导出wireshark的FLV RAW数据并进行分析

import struct
import json
import argparse
from collections import namedtuple
from typing import List, Tuple, Dict, Optional, Any

# Data structures (immutable records) produced by the parser.

# The 9-byte FLV file header: 'FLV' signature, version, A/V flags, DataOffset.
FLVHeader = namedtuple('FLVHeader', ['signature', 'version', 'flags', 'header_size'])

# One FLV tag header plus the PreviousTagSize field that precedes it.
FLVTagHeader = namedtuple('FLVTagHeader', [
    'prev_tag_size', 'tag_type', 'data_size', 'timestamp', 'timestamp_ext', 'stream_id'
])

# Envelope of an onMetaData script tag: the two leading AMF0 values.
MetadataHeader = namedtuple('MetadataHeader', [
    'AMF1_type', 'size', 'onMetadata', "AMF2_type", 'arr_size'
])

# Well-known onMetaData fields; anything unrecognized lands in custom_fields.
MetadataArray = namedtuple('MetadataArray', [
    'duration', 'width', 'height', 'videodatarate', 'framerate', 'videocodecid', 'audiodatarate',
    'audiosamplerate', 'audiosamplesize', 'stereo', 'audiocodecid', 'encoder', 'filesize', "custom_fields"
])

# Decoded audio-tag header; the AAC-specific fields are None for other codecs.
AudioHeader = namedtuple('AudioHeader', [
    'format', 'rate', 'size', 'type', 'aac_packet_type', 'audio_object_type',
    'sampling_index', 'channel_config'
])

# Decoded video-tag header; the AVC-specific fields are None for other codecs.
VideoHeader = namedtuple('VideoHeader', [
    'frame_type', 'codec_id', 'avc_packet_type', 'composition_time'
])

# One H.264 NAL unit extracted from an AVC NALU packet.
NALUnit = namedtuple('NALUnit', ['type', 'ref_idc', 'size', 'data'])

# Constant lookup tables: FLV / H.264 / AMF0 numeric codes -> readable names.

# FLV SoundFormat codes (upper nibble of the audio tag header byte).
AUDIO_FORMATS = {
    0: "Linear PCM, platform endian",
    1: "ADPCM",
    2: "MP3",
    3: "Linear PCM, little endian",
    4: "Nellymoser 16kHz mono",
    5: "Nellymoser 8kHz mono",
    6: "Nellymoser",
    7: "G.711 A-law",
    8: "G.711 mu-law",
    10: "AAC",
    11: "Speex",
    14: "MP3 8kHz",
    15: "Device-specific sound"
}

# FLV SoundRate codes (bits 2-3 of the audio tag header byte).
SAMPLE_RATES = {
    0: "5.5kHz",
    1: "11kHz",
    2: "22kHz",
    3: "44kHz"
}

# FLV video FrameType codes (upper nibble of the video tag header byte).
FRAME_TYPES = {
    1: "Keyframe",
    2: "Inter frame",
    3: "Disposable inter frame",
    4: "Generated keyframe",
    5: "Video info/command frame"
}

# FLV video CodecID codes (lower nibble of the video tag header byte).
CODEC_IDS = {
    1: "JPEG",
    2: "Sorenson H.263",
    3: "Screen video",
    4: "On2 VP6",
    5: "On2 VP6 with alpha",
    6: "Screen video v2",
    7: "AVC"
}

# AVCPacketType values inside an AVC (CodecID 7) video tag.
AVC_PACKET_TYPES = {
    0: "AVC sequence header",
    1: "AVC NALU",
    2: "AVC end of sequence"
}

# H.264 nal_unit_type values (lower 5 bits of the NAL unit header byte).
NALU_TYPES = {
    1: "Coded slice of a non-IDR picture",
    5: "Coded slice of an IDR picture",
    6: "Supplemental enhancement information (SEI)",
    7: "Sequence parameter set",
    8: "Picture parameter set",
    9: "Access unit delimiter",
    10: "End of sequence",
    11: "End of stream",
    12: "Filler data",
    13: "Sequence parameter set extension",
    14: "Prefix NAL unit",
    15: "Subset sequence parameter set",
    19: "Coded slice of an auxiliary coded picture without partitioning",
    20: "Coded slice extension"
}

# H.264 nal_ref_idc priorities (bits 5-6 of the NAL unit header byte).
NALU_REF_IDC = {
    0: "Disposable",
    1: "Lowest",
    2: "Low",
    3: "High"
}

# AMF0 value-type markers used in FLV script-data tags.
SCRIPT_DATA_VALUE = {
    0: "Number",
    1: "Boolean",
    2: "String",
    3: "Object",
    4: "MovieClip (reserved, not supported)",
    5: "Null",
    6: "Undefined",
    7: "Reference",
    8: "ECMA array",
    9: "Object end marker",
    10: "Strict array",
    11: "Date",
    12: "Long string"
}

class FLVProcessor:
    """Parse an FLV byte stream into a header plus a list of tags.

    The input may be a bare FLV file or an FLV wrapped in an HTTP/1.1
    response (optionally chunk-encoded); the wrapper is stripped
    automatically during parse(). Results can be dumped via save_as_json().
    """

    def __init__(self, input_file: str):
        """Remember the source path and reset all parse state.

        Args:
            input_file: path of the raw FLV capture to parse.
        """
        # Path of the capture; its bytes are loaded lazily by read_file().
        self.input_file = input_file
        # Entire file contents, filled on demand.
        self.raw_data: bytes = b''
        # Parsed 9-byte FLV header, set by parse().
        self.header: Optional[FLVHeader] = None
        # One (kind, FLVTagHeader, parsed_payload, raw_bytes) entry per tag.
        self.tags: List[Tuple] = []

def read_file(self) -> None:
    """Load the entire input file into memory as ``self.raw_data``."""
    with open(self.input_file, 'rb') as source:
        self.raw_data = source.read()

def _process_http_response(self, data: bytes) -> bytes:
#处理HTTP响应,自动判断是否为chunk编码
if not data.startswith(b'HTTP/1.1 200 OK'):
return data

# 查找HTTP头结束位置
http_header_end = data.find(b'\r\n\r\n') + 4
if http_header_end < 4:  # 没有找到完整的HTTP头
return data

# 检查是否为chunk编码
headers = data[:http_header_end-4].split(b'\r\n')
is_chunked = any(b'Transfer-Encoding: chunked' in h for h in headers)

# 提取主体数据
body_data = data[http_header_end:]

# 如果是chunk编码,则解码
if is_chunked:
return self._decode_chunked_data(body_data)
return body_data

def _decode_chunked_data(self, data: bytes) -> bytes:
#解码HTTP chunk传输编码的数据
result = b''
pos = 0
while pos < len(data):
# 查找块大小行结束位置
chunk_size_end = data.find(b'\r\n', pos)
if chunk_size_end == -1:
break

# 解析块大小(16进制)
chunk_size_str = data[pos:chunk_size_end].decode('ascii').strip()
try:
chunk_size = int(chunk_size_str, 16)
except ValueError:
break

# 0大小的块表示结束
if chunk_size == 0:
break

# 移动到块数据开始位置
chunk_start = chunk_size_end + 2
chunk_end = chunk_start + chunk_size

# 检查是否有足够的空间
if chunk_end > len(data):
break

# 添加块数据到结果
result += data[chunk_start:chunk_end]

# 移动到下一个块开始位置(跳过CRLF)
pos = chunk_end + 2

return result

def parse(self) -> None:
    """Parse the FLV stream: file header first, then every tag until EOF.

    Populates ``self.header`` and appends one entry per tag to
    ``self.tags`` as ``(kind, FLVTagHeader, parsed_payload, raw_bytes)``
    where kind is 'audio', 'video', 'metadata' or 'unknown'.

    Raises:
        ValueError: if the data does not start with the 'FLV' signature.
    """
    if not self.raw_data:
        self.read_file()

    # Strip a possible HTTP response envelope (and chunked encoding).
    flv_data = self._process_http_response(self.raw_data)

    if not flv_data.startswith(b'FLV'):
        raise ValueError("不是有效的FLV文件, 签名不匹配")

    self.header = self._parse_flv_header(flv_data[:9])
    # Fix: honour the header's DataOffset field instead of hard-coding 9;
    # fall back to 9 when the stored offset is obviously bogus (< 9).
    pos = self.header.header_size if self.header.header_size >= 9 else 9

    # Each iteration needs 4 (PreviousTagSize) + 11 (tag header) bytes.
    # Fix: '<=' so a zero-payload tag at EOF is still parsed ('<' skipped it).
    while pos + 15 <= len(flv_data):
        prev_tag_size = struct.unpack('>I', flv_data[pos:pos+4])[0]
        pos += 4

        # Tag header: type(1) size(3) timestamp(3) timestamp-ext(1) stream-id(3).
        tag_type = flv_data[pos]
        data_size = struct.unpack('>I', b'\x00' + flv_data[pos+1:pos+4])[0]
        timestamp = struct.unpack('>I', flv_data[pos+4:pos+7] + b'\x00')[0] >> 8
        timestamp_ext = flv_data[pos+7]
        stream_id = struct.unpack('>I', b'\x00' + flv_data[pos+8:pos+11])[0]

        tag_header = FLVTagHeader(
            prev_tag_size=prev_tag_size,
            tag_type=tag_type,
            data_size=data_size,
            timestamp=timestamp,
            timestamp_ext=timestamp_ext,
            stream_id=stream_id
        )
        pos += 11

        # Bail out rather than reading past the end of the buffer.
        if pos + data_size > len(flv_data):
            print(f"警告: 数据大小({data_size})超出文件范围,终止解析")
            break

        tag_data = flv_data[pos:pos+data_size]
        pos += data_size

        # Dispatch on tag type (8 = audio, 9 = video, 18 = script data).
        if tag_type == 8:
            audio_info = self._parse_audio_tag(tag_data)
            self.tags.append(('audio', tag_header, audio_info, tag_data))
        elif tag_type == 9:
            video_info, nal_units, video_body = self._parse_video_tag(tag_data)
            self.tags.append(('video', tag_header, (video_info, nal_units, video_body), tag_data))
        elif tag_type == 18:
            metadata_info, metadata_array = self._parse_flv_metadate(tag_data)
            self.tags.append(('metadata', tag_header, (metadata_info, metadata_array), tag_data))
        else:
            self.tags.append(('unknown', tag_header, tag_data, tag_data))

def _parse_flv_header(self, header_data: bytes) -> FLVHeader:
    """Decode the 9-byte FLV file header.

    Layout: 3-byte 'FLV' signature, 1-byte version, 1-byte A/V flags,
    4-byte big-endian DataOffset (header size).
    """
    signature_bytes, version, flags, header_size = struct.unpack('>3sBBI', header_data[:9])
    return FLVHeader(signature_bytes.decode('ascii'), version, flags, header_size)

    def _parse_flv_metadate(self , data: bytes) -> Tuple[Optional[MetadataHeader], Optional[MetadataArray]]:
#解析Metadata
if not data:
return None, None
pos = 0

AMF1_type = data[pos]
pos += 1

        # 字符串长度
name_size = struct.unpack('>H', data[pos:pos+2])[0]
pos += 2

# 检查剩余数据是否足够
if len(data) < pos + name_size + 1:
return None, None

        # 提取字符串
onMetadata = data[pos:pos+name_size].decode('utf-8')
pos += name_size

        # AMF2类型
AMF2_type = data[pos]
pos += 1

        metadata_dict = {}

        if AMF2_type == 0x08:  # 0x08 ECMA数组
arr_size = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
metadata_dict, pos = self._parse_amf_object(data, pos)

        elif AMF2_type == 0x03:  # 0x03 对象
metadata_dict, pos = self._parse_amf_object(data, pos)
arr_size = len(metadata_dict)

        elif AMF2_type == 0x0A:  # 0x0A 严格数组
arr_size = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
metadata_dict, pos = self._parse_amf_strict_array(data, pos, arr_size)

        elif AMF2_type == 0x05:  # 0x05 Null
pass  # metadata_dict保持为空

        elif AMF2_type == 0x0B:  # 0x0B Date
timestamp = struct.unpack('>d', data[pos:pos+8])[0]
pos += 8
timezone = struct.unpack('>h', data[pos:pos+2])[0]
pos += 2
metadata_dict["date"] = f"Date({timestamp}, tz={timezone})"

        elif AMF2_type == 0x0C:  # 0x0C 长字符串
str_size = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
metadata_dict["long_string"] = data[pos:pos+str_size].decode('utf-8', errors='replace')
pos += str_size

        elif AMF2_type in [0x06, 0x0D]:  # 0x06 或 0x0D
metadata_dict["unsupported"] = None

        else:  # 其他未实现类型(如0x04 MovieClip)
raise ValueError(f"不支持的AMF2类型: {hex(AMF2_type)}")

        header = MetadataHeader(
AMF1_type=AMF1_type,
size=name_size,
onMetadata=onMetadata,
AMF2_type=AMF2_type,
arr_size=arr_size
)

        standard_data = {}
custom_fields = {}
standard_fields = {
'duration': 'duration',
'width': 'width',
'height': 'height',
'videodatarate': 'videodatarate',
'framerate': 'framerate',
'videocodecid': 'videocodecid',
'audiodatarate': 'audiodatarate',
'audiosamplerate': 'audiosamplerate',
'audiosamplesize': 'audiosamplesize',
'stereo': 'stereo',
'audiocodecid': 'audiocodecid',
'encoder': 'encoder',
'filesize': 'filesize'
}

        for key_in_data, mapped_key in standard_fields.items():
if key_in_data in metadata_dict:
standard_data[mapped_key] = metadata_dict[key_in_data]

        for key, value in metadata_dict.items():
if key not in standard_fields:
custom_fields[key] = value

        metadata = MetadataArray(
duration=standard_data.get('duration'),
width=standard_data.get('width'),
height=standard_data.get('height'),
videodatarate=standard_data.get('videodatarate'),
framerate=standard_data.get('framerate'),
videocodecid=standard_data.get('videocodecid'),
audiodatarate=standard_data.get('audiodatarate'),
audiosamplerate=standard_data.get('audiosamplerate'),
audiosamplesize=standard_data.get('audiosamplesize'),
stereo=standard_data.get('stereo'),
audiocodecid=standard_data.get('audiocodecid'),
encoder=standard_data.get('encoder'),
filesize=standard_data.get('filesize'),
custom_fields = custom_fields
)
return header, metadata


def _parse_amf_value(self, data: bytes, pos: int) -> Tuple[Any, int]:
#单值
value_type = data[pos]
pos += 1

        if value_type == 0x00:  # Double
value = struct.unpack('>d', data[pos:pos+8])[0]
pos += 8
elif value_type == 0x01:  # Boolean
value = bool(data[pos])
pos += 1
elif value_type == 0x02:  # String
str_size = struct.unpack('>H', data[pos:pos+2])[0]
pos += 2
value = data[pos:pos+str_size].decode('latin-1')
pos += str_size
elif value_type == 0x03:  # Object
value, pos = self._parse_amf_object(data, pos)
elif value_type == AMF_NULL:  # Null
value = None
elif value_type == 0x06:  # Undefined
value = "undefined"
elif value_type == 0x08:  # ECMA Array
arr_size = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
value, pos = self._parse_amf_object(data, pos)
elif value_type == 0x0A:  # Strict Array
arr_size = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
value, pos = self._parse_amf_strict_array(data, pos, arr_size)
elif value_type == 0X0B:  # Date
timestamp = struct.unpack('>d', data[pos:pos+8])[0]
pos += 8
timezone = struct.unpack('>h', data[pos:pos+2])[0]
pos += 2
value = f"Date({timestamp}, tz={timezone})"
elif value_type == 0X0C:  # Long String
str_size = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
value = data[pos:pos+str_size].decode('latin-1')
pos += str_size
else:
raise ValueError(f"Unknown AMF type: {hex(value_type)}")

        return value, pos

    def _parse_amf_object(self, data: bytes, pos: int) -> Tuple[Dict[str, Any], int]:
#object
obj = {}
while pos < len(data):
# 检查对象结束标记(0x000009)
if data[pos:pos+3] == b'\x00\x00\x09':
pos += 3
break

            # 解析键名
key_size = struct.unpack('>H', data[pos:pos+2])[0]
pos += 2
key = data[pos:pos+key_size].decode('latin-1')
pos += key_size

            # 解析值
value, pos = self._parse_amf_value(data, pos)
obj[key] = value

        return obj, pos

    def _parse_amf_strict_array(self, data: bytes, pos: int, size: int) -> Tuple[Dict[str, Any], int]:
#strict array
arr = {}
for i in range(size):
value, pos = self._parse_amf_value(data, pos)
arr[str(i)] = value
return arr, pos


def _parse_audio_tag(self, data: bytes) -> Optional[AudioHeader]:
    """Parse an FLV audio tag body into an AudioHeader.

    The first byte packs sound format / rate / sample size / channel type.
    For AAC (format 10) the second byte is the AACPacketType, and a
    sequence header (type 0) carries an AudioSpecificConfig whose first
    two bytes are decoded here.

    Returns:
        AudioHeader, or None for an empty tag body.
    """
    if not data:
        return None

    header_byte = data[0]
    sound_format = (header_byte & 0xF0) >> 4  # renamed: avoid shadowing builtin format()
    rate = (header_byte & 0x0C) >> 2
    size = (header_byte & 0x02) >> 1
    audio_type = header_byte & 0x01

    aac_packet_type = None
    audio_object_type = None
    sampling_index = None
    channel_config = None

    if sound_format == 10 and len(data) > 1:  # AAC
        aac_packet_type = data[1]
        if aac_packet_type == 0:  # AAC sequence header -> AudioSpecificConfig
            audio_specific_config = data[2:]
            # Fix: the bit extraction below reads two bytes; the old
            # truthiness check allowed a 1-byte config and crashed with
            # IndexError on audio_specific_config[1].
            if len(audio_specific_config) >= 2:
                audio_object_type = (audio_specific_config[0] & 0xF8) >> 3
                sampling_index = ((audio_specific_config[0] & 0x07) << 1) | ((audio_specific_config[1] & 0x80) >> 7)
                channel_config = (audio_specific_config[1] & 0x78) >> 3

    return AudioHeader(
        format=sound_format,
        rate=rate,
        size=size,
        type=audio_type,
        aac_packet_type=aac_packet_type,
        audio_object_type=audio_object_type,
        sampling_index=sampling_index,
        channel_config=channel_config
    )

def _parse_video_tag(self, data: bytes) -> Tuple[Optional[VideoHeader], List[NALUnit], Optional[bytes]]:
    """Parse an FLV video tag body.

    Returns:
        (VideoHeader, nal_units, video_body) where nal_units is populated
        only for AVC NALU packets and video_body is the codec payload
        (None for an empty tag or a short AVC tag).
    """
    if not data:
        return None, [], None

    header_byte = data[0]
    frame_type = (header_byte & 0xF0) >> 4
    codec_id = header_byte & 0x0F

    avc_packet_type = None
    composition_time = None
    nal_units = []
    video_body = None

    if codec_id == 7:  # AVC / H.264
        if len(data) > 4:
            avc_packet_type = data[1]
            raw_ct = struct.unpack('>I', b'\x00' + data[2:5])[0]
            # Fix: CompositionTime is SI24 (signed 24-bit) in the FLV spec;
            # sign-extend instead of keeping the raw unsigned value.
            composition_time = raw_ct - 0x1000000 if raw_ct >= 0x800000 else raw_ct

            if avc_packet_type == 0:    # AVC sequence header
                video_body = data[5:]
            elif avc_packet_type == 1:  # AVC NALU: 4-byte length-prefixed NAL units
                pos = 5
                # Fix: '<=' so a NAL unit ending exactly at the buffer edge
                # is still read (the old '<' dropped it).
                while pos + 4 <= len(data):
                    nalu_size = struct.unpack('>I', data[pos:pos+4])[0]
                    pos += 4
                    if pos + nalu_size > len(data):
                        break  # truncated NAL unit

                    nalu_data = data[pos:pos+nalu_size]
                    pos += nalu_size

                    if nalu_data:
                        nalu_header = nalu_data[0]
                        nal_units.append(NALUnit(
                            type=nalu_header & 0x1F,           # nal_unit_type
                            ref_idc=(nalu_header & 0x60) >> 5,  # nal_ref_idc
                            size=nalu_size,
                            data=nalu_data
                        ))
                video_body = data[5:pos]
            else:                       # e.g. AVC end of sequence
                video_body = data[5:]
    else:
        # Non-AVC codecs: everything after the 1-byte header is payload.
        video_body = data[1:]

    return VideoHeader(
        frame_type=frame_type,
        codec_id=codec_id,
        avc_packet_type=avc_packet_type,
        composition_time=composition_time
    ), nal_units, video_body

def save_as_json(self, output_path: str) -> None:
    """Serialize the parse results (header, statistics, per-tag details) to JSON.

    Args:
        output_path: destination path; written as UTF-8 with non-ASCII preserved.

    Raises:
        ValueError: if parse() has not been called yet.
    """
    if not self.header:
        raise ValueError("未解析FLV文件, 请先调用parse()方法")

    # Top-level report: file identity, FLV header, tag statistics, tag list.
    result = {
        'file': self.input_file,
        'header': {
            'signature': self.header.signature,
            'version': self.header.version,
            'flags': self.header.flags,
            'header_size': self.header.header_size
        },
        'statistics': {
            'total_tags': len(self.tags),
            'audio_tags': sum(1 for tag in self.tags if tag[0] == 'audio'),
            'video_tags': sum(1 for tag in self.tags if tag[0] == 'video'),
            'metadata_tags': sum(1 for tag in self.tags if tag[0] == 'metadata'),
            'unknown_tags': sum(1 for tag in self.tags if tag[0] == 'unknown')
        },
        'tags': []
    }

    for tag in self.tags:
        tag_type, tag_header, tag_data, raw_data = tag
        tag_dict = {
            'prev_tag_size': tag_header.prev_tag_size,
            'type': tag_type,
            'data_size': tag_header.data_size,
            'timestamp': tag_header.timestamp,
            'timestamp_extended': tag_header.timestamp_ext,
            'stream_id': tag_header.stream_id,
            'details': {}
        }

        if tag_type == 'audio':
            # tag_data is the AudioHeader built in _parse_audio_tag.
            # NOTE(review): it is None for an empty audio tag body, which
            # would raise AttributeError below — confirm that cannot occur.
            audio_header = tag_data
            tag_dict['details'] = {
                'format': audio_header.format,
                'format_description': AUDIO_FORMATS.get(audio_header.format, "Unknown"),
                'sample_rate': audio_header.rate,
                'sample_rate_description': SAMPLE_RATES.get(audio_header.rate, "Unknown"),
                'sample_size': audio_header.size,
                'channels': audio_header.type,
                'aac_packet_type': audio_header.aac_packet_type,
                'audio_object_type': audio_header.audio_object_type,
                'sampling_index': audio_header.sampling_index,
                'channel_config': audio_header.channel_config
            }
        elif tag_type == 'video':
            # tag_data is (VideoHeader, [NALUnit], payload bytes).
            # NOTE(review): VideoHeader is None for an empty video tag body —
            # same AttributeError risk as above.
            video_header, nal_units, video_body = tag_data
            tag_dict['details'] = {
                'frame_type': video_header.frame_type,
                'frame_type_description': FRAME_TYPES.get(video_header.frame_type, "Unknown"),
                'codec_id': video_header.codec_id,
                'codec_description': CODEC_IDS.get(video_header.codec_id, "Unknown"),
                'avc_packet_type': video_header.avc_packet_type,
                'avc_packet_type_description': AVC_PACKET_TYPES.get(video_header.avc_packet_type, "Unknown"),
                'composition_time': video_header.composition_time,
                'nal_units_count': len(nal_units),
                'video_body_size': len(video_body) if video_body else 0,
                # Raw NALU bytes are intentionally omitted; only type/size info.
                'nal_units': [{
                    'type': unit.type,
                    'type_description': NALU_TYPES.get(unit.type, "Unknown"),
                    'ref_idc': unit.ref_idc,
                    'ref_idc_description': NALU_REF_IDC.get(unit.ref_idc, "Unknown"),
                    'size': unit.size
                } for unit in nal_units]
            }
        elif tag_type == 'metadata':
            # tag_data is (MetadataHeader, MetadataArray).
            # NOTE(review): _parse_flv_metadate can return (None, None); the
            # attribute accesses below would then raise AttributeError.
            # NOTE(review): direct SCRIPT_DATA_VALUE[...] indexing raises
            # KeyError for type codes outside 0-12; .get() would be safer.
            metadata_info, metadata_array = tag_data
            tag_dict['details'] = {
                "AMF1_type": metadata_info.AMF1_type,
                "AMF1_type_description": SCRIPT_DATA_VALUE[metadata_info.AMF1_type],
                "size":metadata_info.size,
                # Key is the AMF type name (e.g. "String") -> the event name.
                SCRIPT_DATA_VALUE[metadata_info.AMF1_type]: metadata_info.onMetadata,
                "AMF2_type": metadata_info.AMF2_type,
                "AMF2_type_description": SCRIPT_DATA_VALUE[metadata_info.AMF2_type],
                SCRIPT_DATA_VALUE[metadata_info.AMF2_type] + " size": metadata_info.arr_size,
                "Metadata_array_data": [{
                    "duration": metadata_array.duration,
                    "width": metadata_array.width,
                    "height": metadata_array.height,
                    "videodatarate": metadata_array.videodatarate,
                    "framerate": metadata_array.framerate,
                    "videocodecid": metadata_array.videocodecid,
                    "audiodatarate": metadata_array.audiodatarate,
                    "audiosamplerate": metadata_array.audiosamplerate,
                    "audiosamplesize": metadata_array.audiosamplesize,
                    "stereo": metadata_array.stereo,
                    "audiocodecid": metadata_array.audiocodecid,
                    "encoder": metadata_array.encoder,
                    "filesize": metadata_array.filesize,
                    **metadata_array.custom_fields
                }]
            }

        result['tags'].append(tag_dict)

    # ensure_ascii=False keeps non-ASCII metadata human-readable in the file.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)
    print(f"已保存JSON解析结果到: {output_path}")

def main():
    """Command-line entry point: parse an FLV capture, optionally dump JSON."""
    arg_parser = argparse.ArgumentParser(
        description='FLV文件解析与保存工具 - 自动处理HTTP和chunk编码 兼容AMF解析',
        formatter_class=argparse.RawTextHelpFormatter
    )
    arg_parser.add_argument('input', help='输入FLV文件路径')
    arg_parser.add_argument('--save-json', metavar='OUTPUT', help='将解析结果保存为JSON')
    args = arg_parser.parse_args()

    # Any parsing or serialization failure is reported, not propagated.
    try:
        processor = FLVProcessor(args.input)
        processor.parse()

        if args.save_json:
            processor.save_as_json(args.save_json)
        else:
            print("未指定输出操作, 使用python process.py input.raw --save-json out.json")
    except Exception as e:
        print(f"处理文件时出错: {str(e)}")


if __name__ == "__main__":
    main()

运行:

python3 flv.py cap.raw --save-json flv-ana.json

http://www.dtcms.com/a/354613.html

相关文章:

  • 第13集 当您的USB设备不在已实测支持列表,如何让TOS-WLink支持您的USB设备--答案Wireshark USB抓包
  • [数据结构] ArrayList与顺序表(下)
  • indexDB快速上手
  • 2015考研数学(二)真题
  • 让模糊物体变清晰的视频AI:快速提升画质指南
  • 51c大模型~合集175
  • pcl_案例2 叶片与根茎的分离
  • Redis发布订阅:实时消息系统的极简解决方案
  • MyBatis延迟加载
  • 云计算学习100天-第29天
  • Node.js 的模块化规范是什么?CommonJS 和 ES6 模块有什么区别?
  • Python DELL Logo
  • day1 ———C++———变量和字符串的使用
  • AI驱动企业数字化转型:解码未来三年的智能化变革密码
  • STAGEWISE实战指南:从集成到使用的完整解决方案
  • AI在商业领域的多元应用:从写作助手到精准运营,解锁AI商业工具新价值
  • 流程控制语句(3)
  • 操作系统中的死锁是什么意思
  • 农行广西区分行携手广西专精特新商会共探金融赋能专精特新企业新路径
  • 用KPI导航数字化转型:制造企业如何科学评估系统上线成效
  • 流程控制语句(2)
  • Java网络编程(UDP, TCP, HTTP)
  • 【Linux基础知识系列:第一百一十五篇】使用gzip与bzip2进行压缩
  • 从首次测试到采购40个机器人:Junior kühlk如何自动化协作机械臂矩阵
  • Linux学习-基于TCP实现群聊
  • 医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(三)
  • windows下查看别的服务器的端口是否通
  • [光学原理与应用-319]:激光器光路设计的主要输出文件的形式和内容
  • 解构与重构:“真人不露相,露相非真人” 的存在论新解 —— 论 “真在” 的行为表达本质
  • 一文读懂:用PyTorch从零搭建一个Transformer模型