当前位置: 首页 > news >正文

实现自己的AI视频监控系统-第一章-视频拉流与解码3

文章目录

  • 前言
  • 一、H.264编码解码简要流程
    • 1. 编码器工作流程
    • 2. 解码器工作流程
  • 二、未覆盖的H.264核心知识点
  • 本小节的作用与意义
  • 总结
  • 下期预告


前言


写在最前面:这一小节是我非常不想写的一小节,一方面是因为编码解码是特别特别复杂的事情,另外一方面,这一小节也并不是实现AI视频监控系统的核心部分,我们可以选择合适的组件实现相应的功能。但是,有开始便有结束,我希望构建一个完整的工程体系,面临问题时,大家伙可以从这一部分获得相应的启发。以下是真正的前言部分:


大家好,上一节我们详细介绍了拉流的完整过程,今天将简要介绍解码和编码的内容。这块内容涉及的知识点很多,包括:帧内预测、帧间预测、运动估计、DCT变换、量化、熵编码等核心技术。由于体系复杂,针对H264/H265的编解码知识足够单开几本书去撰写,由于我们的重心在于AI视频监控系统的实现,我将舍去这些深奥的数学原理和算法细节,以一个简化的代码为例,深入浅出地介绍H.264编解码的基本流程。


一、H.264编码解码简要流程

  • 本章节使用的测试图片如下:
    在这里插入图片描述

  • 编码和解码的测试代码如下(建议从编码器工作流程开始阅读,再返回来看该代码):
    我采用宏块内的均值作为宏块信息值,这本身是不符合H264编解码的定义,但是整个流程基本符合H264的编解码逻辑!

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import structclass SimpleH264Encoder:"""简化的H.264编码器(仅用于演示)"""# 修改宏块大小为128x128MACROBLOCK_SIZE = 8def __init__(self, quality=50):self.quality = max(1, min(100, quality))self.qp = int(51 * (100 - self.quality) / 100)  # 量化参数def encode_image(self, image_array):"""将图像数组编码为简化的H.264格式参数:image_array: 形状为(height, width, 3)的numpy数组返回:bytes: 编码后的字节数据"""height, width, channels = image_array.shape# 确保图像尺寸是宏块大小的倍数mb_width = (width + self.MACROBLOCK_SIZE - 1) // self.MACROBLOCK_SIZEmb_height = (height + self.MACROBLOCK_SIZE - 1) // self.MACROBLOCK_SIZEpadded_width = mb_width * self.MACROBLOCK_SIZEpadded_height = mb_height * self.MACROBLOCK_SIZE# 如果需要,填充图像if padded_width != width or padded_height != height:padded_image = np.zeros((padded_height, padded_width, 3), dtype=np.uint8)padded_image[:height, :width] = image_arrayimage_array = padded_image# 创建编码数据缓冲区encoded_data = bytearray()# 1. 添加SPS (序列参数集)sps = self._generate_sps(padded_width, padded_height)encoded_data.extend(b'\x00\x00\x01')  # NAL单元起始码encoded_data.extend(sps)# 2. 添加PPS (图像参数集)pps = self._generate_pps()encoded_data.extend(b'\x00\x00\x01')  # NAL单元起始码encoded_data.extend(pps)# 3. 添加IDR帧 (关键帧)idr_frame = self._encode_idr_frame(image_array)encoded_data.extend(b'\x00\x00\x01')  # NAL单元起始码encoded_data.extend(idr_frame)return bytes(encoded_data)def _generate_sps(self, width, height):"""生成简化的SPS"""# SPS NAL头 (类型7, 重要性3)nal_header = bytes([0x67])# 简化的SPS数据sps_data = bytearray()# 图像尺寸 (以宏块为单位)mb_width = width // self.MACROBLOCK_SIZEmb_height = height // self.MACROBLOCK_SIZE# 添加宽度和高度信息sps_data.extend(struct.pack('>HH', mb_width, mb_height))# 添加量化参数sps_data.append(self.qp)return nal_header + self._add_emulation_prevention_bytes(sps_data)def _generate_pps(self):"""生成简化的PPS"""# PPS NAL头 (类型8, 重要性3)nal_header = bytes([0x68])# 简化的PPS数据pps_data = bytearray()# 添加一些基本参数pps_data.append(0x00)  # 参数集IDpps_data.append(0x00)  # 序列参数集IDreturn nal_header + self._add_emulation_prevention_bytes(pps_data)def _encode_idr_frame(self, image_array):"""编码IDR帧"""# IDR NAL头 (类型5, 重要性3)nal_header = bytes([0x65])# 简化的帧数据frame_data = bytearray()height, width, channels = image_array.shape# 添加帧类型标识 (0表示IDR帧)frame_data.append(0x00)# 对每个颜色通道进行简单的"编码"for c in range(3):  # R, G, B通道channel_data = image_array[:, :, c]# 将图像分割为宏块for y in range(0, height, self.MACROBLOCK_SIZE):for x in range(0, width, self.MACROBLOCK_SIZE):macroblock = channel_data[y:y + self.MACROBLOCK_SIZE, x:x + self.MACROBLOCK_SIZE]# 计算宏块的平均值 (非常简化的"编码")avg_value = int(np.mean(macroblock))# 添加宏块数据frame_data.append(avg_value)return nal_header + self._add_emulation_prevention_bytes(frame_data)def _add_emulation_prevention_bytes(self, data):"""添加防竞争字节 (0x03)"""result = bytearray()i = 0while i < len(data):# 检查是否有需要插入防竞争字节的位置if i < len(data) - 2 and data[i] == 0x00 and data[i + 1] == 0x00 and data[i + 2] in [0x00, 0x01, 0x02,0x03]:result.extend(data[i:i + 2])result.append(0x03)  # 插入防竞争字节result.append(data[i + 2])i += 3else:result.append(data[i])i += 1return bytes(result)class SimpleH264Decoder:"""简化的H.264解码器(仅用于演示)"""# 修改宏块大小为128x128MACROBLOCK_SIZE = 128def __init__(self):self.width = 0self.height = 0self.qp = 26self.decoded_image = Nonedef decode(self, encoded_data):"""解码H.264数据"""print(f"开始解码,数据长度: {len(encoded_data)} 字节")# 查找所有NAL单元起始位置start_positions = []pos = 0while pos < len(encoded_data) - 3:if (encoded_data[pos] == 0x00 andencoded_data[pos + 1] == 0x00 andencoded_data[pos + 2] == 0x01):start_positions.append(pos)pos += 3else:pos += 1print(f"找到 {len(start_positions)} 个NAL单元起始码")# 如果没有找到起始码,尝试直接解析整个数据if not start_positions:print("警告: 未找到NAL单元起始码,尝试直接解析数据")self._parse_nal_unit(encoded_data)return self.decoded_image# 提取并处理每个NAL单元for i, start in enumerate(start_positions):# 确定NAL单元结束位置if i < len(start_positions) - 1:end = start_positions[i + 1]else:end = len(encoded_data)# 提取NAL单元数据 (跳过起始码)nal_unit = encoded_data[start + 3:end]# 解析NAL单元self._parse_nal_unit(nal_unit)return self.decoded_imagedef _parse_nal_unit(self, nal_data):"""解析NAL单元"""if not nal_data:print("警告: 空的NAL单元")return# 获取NAL单元类型nal_type = nal_data[0] & 0x1Fprint(f"解析NAL单元,类型: {nal_type}")# 移除NAL头nal_data = nal_data[1:]# 移除防竞争字节nal_data = self._remove_emulation_prevention_bytes(nal_data)# 处理不同类型的NAL单元if nal_type == 7:  # SPSself._parse_sps(nal_data)elif nal_type == 8:  # PPSself._parse_pps(nal_data)elif nal_type == 5:  # IDR帧self._decode_idr_frame(nal_data)else:print(f"警告: 未知的NAL单元类型 {nal_type}")def _parse_sps(self, sps_data):"""解析SPS"""if len(sps_data) < 5:print(f"错误: SPS数据长度不足: {len(sps_data)}")returntry:# 解析图像尺寸 (以宏块为单位)mb_width, mb_height = struct.unpack('>HH', sps_data[:4])self.width = mb_width * self.MACROBLOCK_SIZEself.height = mb_height * self.MACROBLOCK_SIZE# 解析量化参数if len(sps_data) > 4:self.qp = sps_data[4]print(f"解析SPS: 图像尺寸={self.width}x{self.height}, QP={self.qp}")except Exception as e:print(f"解析SPS时出错: {e}")def _parse_pps(self, pps_data):"""解析PPS"""# 在这个简化实现中,PPS没有包含重要信息print(f"解析PPS: 数据长度={len(pps_data)}")def _decode_idr_frame(self, frame_data):"""解码IDR帧"""if self.width == 0 or self.height == 0:print("错误: 尚未解析SPS,无法确定图像尺寸")return# 跳过帧类型标识if len(frame_data) > 0 and frame_data[0] == 0x00:frame_data = frame_data[1:]else:print("警告: 帧类型标识不是0x00")# 计算宏块数量mb_width = self.width // self.MACROBLOCK_SIZEmb_height = self.height // self.MACROBLOCK_SIZEtotal_macroblocks = mb_width * mb_heightexpected_data_length = total_macroblocks * 3  # 3个颜色通道# 检查数据长度是否足够if len(frame_data) < expected_data_length:print(f"警告: 帧数据不完整,期望{expected_data_length}字节,实际{len(frame_data)}字节")# 尝试使用可用数据继续解码frame_data = frame_data.ljust(expected_data_length, b'\x00')# 创建图像数组self.decoded_image = np.zeros((self.height, self.width, 3), dtype=np.uint8)# 解码每个颜色通道for c in range(3):  # R, G, B通道start_idx = c * total_macroblocksend_idx = (c + 1) * total_macroblocksif end_idx > len(frame_data):channel_data = frame_data[start_idx:] + b'\x00' * (end_idx - len(frame_data))else:channel_data = frame_data[start_idx:end_idx]# 填充每个宏块mb_idx = 0for y in range(0, self.height, self.MACROBLOCK_SIZE):for x in range(0, self.width, self.MACROBLOCK_SIZE):# 获取当前宏块的值if mb_idx < len(channel_data):value = channel_data[mb_idx]mb_idx += 1else:value = 0  # 默认值# 填充整个宏块self.decoded_image[y:y + self.MACROBLOCK_SIZE, x:x + self.MACROBLOCK_SIZE, c] = valueprint(f"成功解码IDR帧: 图像尺寸={self.width}x{self.height}")def _remove_emulation_prevention_bytes(self, data):"""移除防竞争字节 (0x03)"""result = bytearray()i = 0while i < len(data):# 检查是否有需要移除防竞争字节的位置if (i < len(data) - 2 anddata[i] == 0x00 anddata[i + 1] == 0x00 anddata[i + 2] == 0x03):result.extend(data[i:i + 2])  # 添加两个0x00i += 3  # 跳过0x03else:result.append(data[i])i += 1return bytes(result)def load_and_prepare_image(image_path, max_size=512):"""加载并准备图像"""# 加载图像image = Image.open(image_path)# 调整图像大小(如果太大)if max(image.width, image.height) > max_size:ratio = max_size / max(image.width, image.height)new_width = int(image.width * ratio)new_height = int(image.height * ratio)image = image.resize((new_width, new_height), Image.LANCZOS)# 确保图像尺寸是宏块大小的倍数macroblock_size = 128  # 与编码器中的宏块大小一致width = (image.width + macroblock_size - 1) // macroblock_size * macroblock_sizeheight = (image.height + macroblock_size - 1) // macroblock_size * macroblock_sizeif width != image.width or height != image.height:image = image.resize((width, height), Image.LANCZOS)# 转换为numpy数组image_array = np.array(image)# 确保是3通道图像if len(image_array.shape) == 2:  # 灰度图像image_array = np.stack([image_array] * 3, axis=-1)elif image_array.shape[2] == 4:  # RGBA图像image_array = image_array[:, :, :3]return image_arraydef display_images(original, decoded, title1="Original Image", title2="Decoded Image"):"""显示原始图像和解码后的图像"""fig, axes = plt.subplots(1, 2, figsize=(12, 6))axes[0].imshow(original)axes[0].set_title(title1)axes[0].axis('off')axes[1].imshow(decoded)axes[1].set_title(title2)axes[1].axis('off')plt.tight_layout()plt.show()def main():"""主函数"""# 图像路径 (替换为您自己的图像路径)image_path = r"path_to_img.jpg"  # 或者使用其他格式try:# 加载和准备图像print("加载图像...")original_image = load_and_prepare_image(image_path)print(f"图像尺寸: {original_image.shape[1]}x{original_image.shape[0]}")# 创建编码器print("创建编码器...")encoder = SimpleH264Encoder(quality=80)# 编码图像print("编码图像...")encoded_data = encoder.encode_image(original_image)print(f"编码数据大小: {len(encoded_data)} 字节")print(f"压缩比: {original_image.nbytes / len(encoded_data):.2f}:1")# 创建解码器print("创建解码器...")decoder = SimpleH264Decoder()# 解码数据print("解码数据...")decoded_image = decoder.decode(encoded_data)if decoded_image is not None:# 显示结果print("显示结果...")display_images(original_image, decoded_image)else:print("解码失败")except Exception as e:print(f"发生错误: {e}")import tracebacktraceback.print_exc()# 如果没有图像文件,创建一个示例图像print("创建示例图像...")width, height = 256, 256  # 使用更大的尺寸以适应128x128宏块original_image = np.zeros((height, width, 3), dtype=np.uint8)# 创建一个简单的渐变图像for y in range(height):for x in range(width):r = int(255 * x / width)g = int(255 * y / height)b = int(255 * (x + y) / (width + height))original_image[y, x] = [r, g, b]# 编码和解码encoder = SimpleH264Encoder(quality=80)encoded_data = encoder.encode_image(original_image)decoder = SimpleH264Decoder()decoded_image = decoder.decode(encoded_data)# 显示结果display_images(original_image, decoded_image, "Generated Image", "Decoded Image")if __name__ == "__main__":main()

  • 大家可以调节宏块的大小来看不同的编解码结构,测试结果如下所示:
  • 宏块大小16*16
    在这里插入图片描述

  • 宏块大小8*8
    在这里插入图片描述

  • 宏块大小4*4
    在这里插入图片描述

  • 宏块大小2*2
    在这里插入图片描述

1. 编码器工作流程

原始图像
图像预处理
生成SPS参数集
生成PPS参数集
编码IDR帧
NAL单元封装
添加起始码
输出H.264码流

编码过程详解
在我们的简化代码中,编码过程主要分为三个步骤:

  1. 生成SPS(序列参数集)
    SPS包含了视频序列的全局参数,如图像尺寸、编码档次等信息。

    def _generate_sps(self, width, height):# 计算以宏块为单位的图像尺寸mb_width = width // self.MACROBLOCK_SIZEmb_height = height // self.MACROBLOCK_SIZE# 打包基本参数sps_data = bytearray()sps_data.extend(struct.pack('>HH', mb_width, mb_height))sps_data.append(self.qp)  # 量化参数return bytes([0x67]) + self._add_emulation_prevention_bytes(sps_data)
    
  2. 生成PPS(图像参数集)
    PPS包含了一帧图像或者若干帧图像的公共参数。

  3. 编码IDR帧(关键帧)
    IDR帧是可以独立解码的关键帧,我们的简化实现使用宏块平均值作为编码策略:

    def _encode_idr_frame(self, image_array):frame_data = bytearray()frame_data.append(0x00)  # 帧类型标识# 对每个颜色通道分别处理for c in range(3):  # R, G, B三个通道channel_data = image_array[:, :, c]# 将图像划分为宏块并处理for y in range(0, height, self.MACROBLOCK_SIZE):for x in range(0, width, self.MACROBLOCK_SIZE):macroblock = channel_data[y:y+self.MACROBLOCK_SIZE, x:x+self.MACROBLOCK_SIZE]avg_value = int(np.mean(macroblock))  # 计算宏块平均值frame_data.append(avg_value)  # 存储平均值return bytes([0x65]) + self._add_emulation_prevention_bytes(frame_data)
    

2. 解码器工作流程

SPS
PPS
IDR帧
H.264码流
解析起始码
提取NAL单元
判断NAL类型
解析序列参数
解析图像参数
解码帧数据
图像重建
显示解码图像

解码是编码的逆过程,主要包括:

  1. 解析NAL单元

    def decode(self, encoded_data):# 查找所有的起始码(0x000001)start_positions = []pos = 0while pos < len(encoded_data) - 3:if (encoded_data[pos] == 0x00 andencoded_data[pos + 1] == 0x00 andencoded_data[pos + 2] == 0x01):start_positions.append(pos)pos += 3else:pos += 1
    
  2. 根据NAL类型处理不同数据

    def _parse_nal_unit(self, nal_data):nal_type = nal_data[0] & 0x1F  # 获取NAL类型if nal_type == 7:  # SPSself._parse_sps(nal_data[1:])elif nal_type == 8:  # PPSself._parse_pps(nal_data[1:])elif nal_type == 5:  # IDR帧self._decode_idr_frame(nal_data[1:])
    
  3. 图像重建

    def _decode_idr_frame(self, frame_data):# 根据SPS中的图像尺寸创建空白图像self.decoded_image = np.zeros((self.height, self.width, 3), dtype=np.uint8)# 对每个颜色通道进行重建for c in range(3):# 获取该通道的所有宏块数据channel_data = frame_data[c*total_macroblocks:(c+1)*total_macroblocks]# 将平均值填充到整个宏块mb_idx = 0for y in range(0, self.height, self.MACROBLOCK_SIZE):for x in range(0, self.width, self.MACROBLOCK_SIZE):value = channel_data[mb_idx]self.decoded_image[y:y+self.MACROBLOCK_SIZE, x:x+self.MACROBLOCK_SIZE, c] = valuemb_idx += 1
    

二、未覆盖的H.264核心知识点

虽然我们的简化代码演示了基本流程,但真实的H264标准包含更多复杂技术:

  1. 预测编码技术

    • 帧内预测:利用当前帧内已编码区域预测当前块,有9种预测模式

    • 帧间预测:利用已编码参考帧进行运动补偿预测

    • 运动估计:寻找最佳匹配块的运动矢量

    • 多参考帧:可以使用多个先前帧作为参考

  2. 变换与量化

    • 整数DCT变换:将残差数据从空间域变换到频域

    • 量化过程:通过量化矩阵减少高频系数精度

    • 自适应量化:根据内容特性调整量化强度

  3. 熵编码

    • CAVLC(上下文自适应变长编码):用于残差系数编码

    • CABAC(上下文自适应二进制算术编码):更高效的无损压缩

  4. 码流结构

    • Slice划分:一帧图像可以划分为多个片(Slice)

    • NAL单元分层:参数集、片数据、SEI信息等分层结构

    • 参考帧管理:DPB(解码图像缓冲区)管理

  5. 高级特性

    • B帧编码:双向预测帧,提高压缩效率

    • 加权预测:对参考帧进行加权组合

    • 去块效应滤波:减少块边界视觉伪影

    • SP/SI帧:特殊用途的切换帧

  6. 性能优化技术

    • 率失真优化:平衡编码质量和比特率

    • 码率控制:根据信道条件调整编码参数

    • 并行处理:Slice级和帧级并行编码


本小节的作用与意义

在实际监控应用中,计算资源往往是有限的——特别是在嵌入式设备、移动终端或需要同时处理多路视频的场景中。通过了解H.264中I帧(IDR)、P帧和B帧的特性差异,我们可以设计出高效的计算资源分配策略,在保证监控基本需求的同时大幅降低计算负担

  1. 帧类型特性与计算需求对比
  • I帧(关键帧/IDR帧)

    • 特性:包含完整的图像信息,独立编码,不依赖其他帧

    • 计算需求:编码需要大量计算,解码相对简单

    • 数据量:占用空间最大,但提供完整的图像信息

  • P帧(预测帧)

    • 特性:只存储与前一帧的差异信息

    • 计算需求:编码和解码都需要运动补偿计算

    • 数据量:中等,约为I帧的1/3-1/2

  • B帧(双向预测帧)

    • 特性:参考前后帧的差异信息

    • 计算需求:编码和解码计算最复杂,需要缓存多帧

    • 数据量:最小,约为I帧的1/5-1/3

  1. 解码流程对比
  • 以下是满帧解码流程示意图
I帧
P帧
B帧
开始
接收码流
解析NAL单元
判断帧类型
解码I帧
解码P帧
解码B帧
输出帧
继续下一帧?
结束
  • 关键帧解码示意图
I帧
P帧
B帧
开始
接收码流
解析NAL单元
判断帧类型
解码I帧并存储
计算资源充足?
计算资源充足?
解码P帧
使用最后I帧替代
解码B帧
使用最后I帧替代
输出帧
继续下一帧?
结束

总结

通过优先处理H.264关键帧,我们可以在有限的计算资源下实现有效的监控功能。这种策略基于监控场景的特殊性:对画面连续性的要求相对较低,而对关键信息获取的能力要求较高。

在实际应用中,我们可以根据设备能力、网络条件和监控需求动态调整编解码策略,在资源约束和功能完整性之间找到最佳平衡点。这种基于关键帧优先的优化的不仅适用于监控场景,也可以扩展到视频会议、远程教育等其他对实时性要求较高的视频应用领域。

正是通过对编解码原理的了解,我们才能设计出这种既节约资源又满足实际需求的智能视频处理方案。


下期预告

  • 拉流解码模块的设计
  • 拉流解码资源对比
http://www.dtcms.com/a/342590.html

相关文章:

  • JavaWeb前端03(Ajax概念及在前端开发时应用)
  • Windows下,将本地视频转化成rtsp推流的方法
  • 高效处理NetCDF文件经纬度转换:一个纯CDO驱动的Bash脚本详解
  • GitHub 热榜项目 - 日榜(2025-08-21)
  • 009.Redis Predixy + Sentinel 架构
  • 深度卷积神经网络AlexNet
  • 【NVIDIA-B200】生产报错 Test CUDA failure common.cu:1035 ‘system not yet initialized‘
  • Docker 搭建 Gitlab 实现自动部署Vue项目
  • NW755NW776美光固态闪存NW863NX595
  • 【永洪BI】报告脚本-JavaScript使用【完整版】
  • Vue 项目中父子传值使用Vuex异步数据不更新问题
  • Postman来做API安全测试:身份验证缺陷漏洞测试
  • 药品追溯码(溯源码)采集系统(二):门诊发药后端
  • 【Linux系统】进程信号:信号的产生和保存
  • 使用EasyExcel 导出复杂的合并单元格
  • 第四届中国高校机器人实验教学创新大赛团队参赛总结
  • selenium一些进阶方法如何使用
  • 大模型0基础开发入门与实践:第11章 进阶:LangChain与外部工具调用
  • 打破传统课程模式,IP变现的创新玩法 | 创客匠人
  • 从零开始学 Selenium:浏览器驱动、元素定位与实战技巧
  • 微服务:现代软件架构的主流范式
  • Linux mmap内存映射
  • 集中式负载均衡 vs. 分布式负载均衡
  • 【赵渝强老师】Redis Cluster分布式集群
  • #千问海报大赛
  • 订单簿动力学与深度学习模型的融合大单识别与短期市场价格波动预测
  • Java多线程编程基础篇
  • 多级缓存一致性矩阵:ABP vNext 下的旁路 / 写穿 / 写回组合实战
  • Qt的moveToThread使用
  • SQL-leetcode—3451. 查找无效的 IP 地址