
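Chapter 3 · Database Management and Video Path Retrieval

In the video analysis system, the database module is responsible for:

  1. fetching the list of videos waiting to be processed;

  2. storing video analysis results;

  3. recording which videos have already been processed, so that none is analyzed twice;

  4. gathering video paths from several tables and converting them into paths that Linux machines can access.

Below we walk through the VideoDB class in detail.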


3.1 Database Connection and Initialization

```python
import json
from typing import List, Tuple

import pymysql


class VideoDB:
    def __init__(self, host="your_db_host", port=3306, user="your_user",
                 password="your_password", database="your_database"):
        # Connection settings; replace the placeholders with your own values
        self.db_config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
            "charset": "utf8mb4",
        }

    def _get_connection(self):
        """Open a new database connection."""
        return pymysql.connect(**self.db_config)
```

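Notes:

  • All sensitive information (IP, account, password) has been replaced with placeholders; substitute your own configuration before use;

  • _get_connection opens a new connection on every call. A pymysql connection should not be shared between threads, so giving each call its own connection keeps the class safe to use from multiple threads.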
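Because every method opens its own connection, each one must remember to commit, roll back on failure and close. One way to centralize that bookkeeping is a small context manager. db_session below is a hypothetical helper, not part of the author's VideoDB; a throwaway SQLite file stands in for MySQL so the sketch runs anywhere. With pymysql you would pass `vdb._get_connection` as the factory.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def db_session(connect: Callable[[], Any]) -> Iterator[Any]:
    """Open one connection, commit on success, roll back on error, always close."""
    conn = connect()
    try:
        yield conn
        conn.commit()    # reached only if the with-block raised nothing
    except Exception:
        conn.rollback()  # discard this session's uncommitted writes
        raise
    finally:
        conn.close()     # never leak the connection


# Demo: a throwaway SQLite file stands in for the MySQL server.
db_file = os.path.join(tempfile.mkdtemp(), "demo.db")


def connect() -> sqlite3.Connection:
    return sqlite3.connect(db_file)


with db_session(connect) as conn:
    conn.execute("CREATE TABLE video_tasks (id INTEGER PRIMARY KEY, video_path TEXT, status TEXT)")
    conn.execute("INSERT INTO video_tasks (video_path, status) VALUES (?, 'pending')", ("/mnt/nas/a.mp4",))

try:
    with db_session(connect) as conn:
        conn.execute("INSERT INTO video_tasks (video_path, status) VALUES (?, 'pending')", ("/mnt/nas/b.mp4",))
        raise RuntimeError("simulated analysis failure")  # triggers the rollback
except RuntimeError:
    pass

with db_session(connect) as conn:
    count = conn.execute("SELECT COUNT(*) FROM video_tasks").fetchone()[0]
print(count)  # 1: the failed session's insert was rolled back
```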


3.2 Fetching Pending Videos

```python
def get_pending_videos(self, limit: int = 10) -> List[Tuple[int, str]]:
    """Fetch videos awaiting analysis; returns [(id, video_path), ...]."""
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, video_path FROM video_tasks WHERE status='pending' LIMIT %s",
                (limit,),
            )
            # fetchall() returns a tuple of tuples; convert it to match the type hint
            return list(cursor.fetchall())
    finally:
        conn.close()
```

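Notes:

  • status='pending' marks a video that has not been analyzed yet;

  • the method returns a list, which the analysis program splits into batches and dispatches to Ray Workers.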
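A minimal sketch of cutting the list returned by get_pending_videos into fixed-size batches before dispatch. batched is a hypothetical helper written for this illustration (Python 3.12 also ships itertools.batched, which yields tuples instead of lists).

```python
from typing import Iterator, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def batched(rows: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items, preserving order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


# Rows shaped like the output of get_pending_videos(): (id, video_path)
pending: List[Tuple[int, str]] = [(i, f"/mnt/nas/video_{i}.mp4") for i in range(1, 8)]
batches = list(batched(pending, 3))
print([len(b) for b in batches])  # [3, 3, 1]
```

With Ray, each batch would typically become one remote task, for example `[analyze_batch.remote(b) for b in batches]` followed by `ray.get(...)`, where analyze_batch is a hypothetical `@ray.remote` function.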


3.3 Updating Video Status

```python
def update_video_status(self, video_id: int, status: str):
    """Update a video's status.

    status: pending / processing / done / failed
    """
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "UPDATE video_tasks SET status=%s WHERE id=%s",
                (status, video_id),
            )
        conn.commit()
    finally:
        conn.close()
```

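Notes:

  • Update the status as soon as each video has been handled, so that no video is analyzed twice or missed;

  • The separate statuses make it easy to monitor the system's progress.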
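Setting the status afterwards does not by itself stop two workers that both called get_pending_videos from picking the same row. One common remedy, sketched here as an assumption rather than the author's code, is a conditional (compare-and-set) UPDATE that only succeeds while the row is still pending: the worker whose UPDATE changes exactly one row owns the task. try_claim is a hypothetical helper, and SQLite stands in for MySQL (`?` placeholders instead of `%s`).

```python
import sqlite3


def try_claim(conn: sqlite3.Connection, video_id: int) -> bool:
    """Move one task from 'pending' to 'processing' in a single statement.

    Returns True only if this call changed the row; a second caller sees
    rowcount == 0 because the status is no longer 'pending'.
    """
    cur = conn.execute(
        "UPDATE video_tasks SET status='processing' WHERE id=? AND status='pending'",
        (video_id,),
    )
    conn.commit()
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE video_tasks (id INTEGER PRIMARY KEY, video_path TEXT, status TEXT)")
conn.execute("INSERT INTO video_tasks (id, video_path, status) VALUES (1, '/mnt/nas/a.mp4', 'pending')")
conn.commit()

first = try_claim(conn, 1)   # this worker wins the task
second = try_claim(conn, 1)  # a duplicate claim is rejected
print(first, second)  # True False
```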


3.4 Saving Analysis Results

```python
def save_video_result(self, result: dict):
    """Insert or update one analysis result.

    result = {
        "video_id": "xxx",
        "video_path": "/full/path/to/video.mp4",
        "duration": 123.45,  # seconds
        "info": {
            "0": {"age": 25, "emotion": "happy", "emotion_richness": 0.67, "hand_richness": 0.32},
            "1": {"age": 30, "emotion": "sad", "emotion_richness": 0.45, "hand_richness": 0.21},
        },
    }
    """
    sql = """
        INSERT INTO video_sft_results (video_id, video_path, duration, info)
        VALUES (%s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            video_path=VALUES(video_path),
            duration=VALUES(duration),
            info=VALUES(info)
    """
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, (
                result["video_id"],
                result["video_path"],
                result.get("duration"),
                json.dumps(result.get("info")),  # serialize info to a JSON string
            ))
        conn.commit()
    finally:
        conn.close()
```

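Notes:

  • info is stored as JSON, which makes later statistics and visualization easier;

  • ON DUPLICATE KEY UPDATE overwrites the existing row instead of inserting a duplicate. It only takes effect if video_sft_results has a PRIMARY KEY or UNIQUE index covering the inserted key (presumably video_id).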
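To see the upsert behavior without a MySQL server, here is a sketch using SQLite's equivalent syntax, `INSERT ... ON CONFLICT(...) DO UPDATE` (requires SQLite 3.24 or later). The table layout mirrors the article's columns but the PRIMARY KEY on video_id is an assumption; save_result is a hypothetical helper.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# video_id must be UNIQUE (here PRIMARY KEY) for the upsert to detect a duplicate;
# MySQL's ON DUPLICATE KEY UPDATE has the same requirement.
conn.execute(
    "CREATE TABLE video_sft_results ("
    " video_id TEXT PRIMARY KEY, video_path TEXT, duration REAL, info TEXT)"
)

UPSERT = """
INSERT INTO video_sft_results (video_id, video_path, duration, info)
VALUES (?, ?, ?, ?)
ON CONFLICT(video_id) DO UPDATE SET
    video_path = excluded.video_path,
    duration   = excluded.duration,
    info       = excluded.info
"""


def save_result(conn: sqlite3.Connection, result: dict) -> None:
    conn.execute(UPSERT, (
        result["video_id"],
        result["video_path"],
        result.get("duration"),
        # ensure_ascii=False keeps non-ASCII labels readable in the stored JSON
        json.dumps(result.get("info"), ensure_ascii=False),
    ))
    conn.commit()


result = {"video_id": "v1", "video_path": "/mnt/nas/a.mp4", "duration": 12.5,
          "info": {"0": {"age": 25, "emotion": "happy"}}}
save_result(conn, result)
save_result(conn, {**result, "duration": 13.0})  # same key: row updated, not duplicated

rows = conn.execute("SELECT video_id, duration, info FROM video_sft_results").fetchall()
print(len(rows), rows[0][1], json.loads(rows[0][2])["0"]["emotion"])  # 1 13.0 happy
```

On MySQL 8.0.20 and later, the VALUES() function inside ON DUPLICATE KEY UPDATE is deprecated in favor of a row alias (`VALUES (...) AS new ... duration = new.duration`); the original form still works but may emit a warning.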


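3.5 Fetching Server Video Paths and Converting Them to Linux Paths

```python
def fetch_vcfile_server_path(self):
    """Fetch vc_file.new_path, convert it to a /mnt/nas/ Linux path,
    and drop videos that have already been processed."""
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            # The IS NOT NULL guard matters: one NULL in the subquery makes NOT IN match no rows
            sql = """
                SELECT vf.new_path
                FROM vc_file vf
                WHERE vf.id NOT IN (
                    SELECT s.original_video_id
                    FROM al_video_segment_info s
                    WHERE s.original_video_id IS NOT NULL
                )
                AND vf.type = 1
                AND vf.width > 1080
                AND vf.audio_bitrate IS NOT NULL
            """
            cursor.execute(sql)
            paths = [row[0].strip().replace('\\', '/') for row in cursor.fetchall() if row[0]]

        # Convert to Linux paths *before* de-duplicating,
        # because processed_SFT_video stores the converted /mnt/nas/ paths
        prefix = '//172.16.177.70/tzvchain/'
        linux_paths = ['/mnt/nas/' + p[len(prefix):] if p.startswith(prefix) else p
                       for p in paths]

        # Exclude videos that have already been processed
        with conn.cursor() as cursor2:
            cursor2.execute("SELECT video_path FROM processed_SFT_video")
            existing = {row[0] for row in cursor2.fetchall()}

        return [p for p in linux_paths if p not in existing]
    finally:
        conn.close()
```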
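The `NOT IN (subquery)` filter has a well-known SQL pitfall: if the subquery returns even one NULL, `x NOT IN (...)` evaluates to UNKNOWN for every row that is not an explicit match, so the whole query returns nothing. The sketch below reproduces this with SQLite and toy data (MySQL follows the same three-valued logic) and shows `NOT EXISTS` as an alternative that ignores NULLs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE vc_file (id INTEGER PRIMARY KEY, new_path TEXT);
CREATE TABLE al_video_segment_info (original_video_id INTEGER);
INSERT INTO vc_file VALUES (1, 'a.mp4'), (2, 'b.mp4'), (3, 'c.mp4');
-- video 1 is already segmented; one segment row has a NULL original_video_id
INSERT INTO al_video_segment_info VALUES (1), (NULL);
""")

not_in = conn.execute("""
    SELECT new_path FROM vc_file
    WHERE id NOT IN (SELECT original_video_id FROM al_video_segment_info)
    ORDER BY id
""").fetchall()

not_exists = conn.execute("""
    SELECT vf.new_path FROM vc_file vf
    WHERE NOT EXISTS (
        SELECT 1 FROM al_video_segment_info s WHERE s.original_video_id = vf.id
    )
    ORDER BY vf.id
""").fetchall()

print(not_in)      # []
print(not_exists)  # [('b.mp4',), ('c.mp4',)]
```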

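Notes:

  • Windows/UNC network paths are automatically rewritten to the Linux mount point /mnt/nas/;

  • videos that have already been processed are excluded, so none is analyzed twice;

  • the resulting paths are handed to Ray Workers for batch analysis.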
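The conversion rule can be isolated into a pure function and tested without a database. This is a sketch: to_linux_path and pending_paths are hypothetical names, and the prefix/mount pair is the one used in the article's code. Two details matter: only the leading prefix is rewritten, and filtering happens after conversion because the processed table holds Linux paths.

```python
from typing import Iterable, List, Set

# Mapping taken from the article: the NAS share is mounted at /mnt/nas/ on workers.
UNC_PREFIX = "//172.16.177.70/tzvchain/"
MOUNT_POINT = "/mnt/nas/"


def to_linux_path(raw: str, unc_prefix: str = UNC_PREFIX, mount: str = MOUNT_POINT) -> str:
    """Normalize a Windows/UNC path and map the share prefix to the local mount."""
    p = raw.strip().replace("\\", "/")
    # Rewrite only a leading prefix, never a match elsewhere in the path
    if p.startswith(unc_prefix):
        p = mount + p[len(unc_prefix):]
    return p


def pending_paths(raw_paths: Iterable[str], processed: Set[str]) -> List[str]:
    """Convert first, then filter against the set of already-processed Linux paths."""
    converted = (to_linux_path(r) for r in raw_paths if r)
    return [p for p in converted if p not in processed]


raw = [r"\\172.16.177.70\tzvchain\2024\a.mp4", r" \\172.16.177.70\tzvchain\2024\b.mp4 ", ""]
done = {"/mnt/nas/2024/a.mp4"}
print(pending_paths(raw, done))  # ['/mnt/nas/2024/b.mp4']
```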


3.6 Recording Processed Videos

```python
def insert_processed_video(self, video_path: str):
    """Record a video path as processed."""
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT IGNORE INTO processed_SFT_video (video_path) VALUES (%s)",
                (video_path,),
            )
        conn.commit()
    finally:
        conn.close()
```

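Notes:

  • Each time a video has been processed, its path is recorded in the processed_SFT_video table, which keeps the pipeline idempotent;

  • INSERT IGNORE turns a duplicate insert into a no-op instead of an error; this relies on a PRIMARY KEY or UNIQUE index on video_path.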
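A runnable sketch of the same idempotent insert, using SQLite's `INSERT OR IGNORE` as a stand-in for MySQL's `INSERT IGNORE`. The UNIQUE constraint is an assumption about the schema; without it, every insert succeeds and duplicates pile up. mark_processed is a hypothetical helper that also reports whether the path was new.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The UNIQUE constraint is what makes "ignore" meaningful.
conn.execute("CREATE TABLE processed_SFT_video (video_path TEXT NOT NULL UNIQUE)")


def mark_processed(conn: sqlite3.Connection, video_path: str) -> bool:
    """Record a processed path; return True if it was new, False if already present."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_SFT_video (video_path) VALUES (?)",
        (video_path,),
    )
    conn.commit()
    return cur.rowcount == 1


print(mark_processed(conn, "/mnt/nas/a.mp4"))  # True
print(mark_processed(conn, "/mnt/nas/a.mp4"))  # False (duplicate silently skipped)
print(conn.execute("SELECT COUNT(*) FROM processed_SFT_video").fetchone()[0])  # 1
```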


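✅ Summary

With the VideoDB module we have implemented:

  1. video task management (pending, processing, done, failed);

  2. storage of video analysis results (a JSON info column that can hold multiple metrics);

  3. path retrieval and conversion, so that distributed Workers can reach the video files;

  4. a de-duplication mechanism that keeps analysis tasks idempotent.

This gives the Ray-based distributed video analysis a solid foundation for data access and management.

The complete code is as follows: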

```python
import json
from typing import List, Set, Tuple

import pymysql


class VideoDB:
    def __init__(self, host="127.0.0.1", port=3306, user="db", password="123456", database="db"):
        self.db_config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
            "charset": "utf8mb4",
        }

    def _get_connection(self):
        """Open a new database connection."""
        return pymysql.connect(**self.db_config)

    def get_pending_videos(self, limit: int = 10) -> List[Tuple[int, str]]:
        """Fetch videos awaiting analysis; returns [(id, video_path), ...]."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT id, video_path FROM video_tasks WHERE status='pending' LIMIT %s",
                    (limit,),
                )
                return list(cursor.fetchall())
        finally:
            conn.close()

    def update_video_status(self, video_id: int, status: str):
        """Update a video's status: pending / processing / done / failed."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("UPDATE video_tasks SET status=%s WHERE id=%s", (status, video_id))
            conn.commit()
        finally:
            conn.close()

    def get_existing_video_path(self) -> Set[str]:
        """Return every video_path already in video_sft_results, as a set for fast duplicate checks."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT video_path FROM video_sft_results")
                return {row[0] for row in cursor.fetchall()}
        finally:
            conn.close()

    def add_video(self, video_path: str):
        """Insert a new video task with status 'pending'."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "INSERT INTO video_tasks (video_path, status) VALUES (%s, 'pending')",
                    (video_path,),
                )
            conn.commit()
        finally:
            conn.close()

    def save_video_result(self, result: dict):
        """Insert or update one analysis result in video_sft_results.

        result = {
            "video_id": "xxx",
            "video_path": "/full/path/to/video.mp4",
            "duration": 123.45,  # seconds
            "info": {
                "0": {"age": 25, "emotion": "happy", "emotion_richness": 0.67, "hand_richness": 0.32},
                "1": {"age": 30, "emotion": "sad", "emotion_richness": 0.45, "hand_richness": 0.21},
            },
        }
        """
        sql = """
            INSERT INTO video_sft_results (video_id, video_path, duration, info)
            VALUES (%s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                video_path=VALUES(video_path),
                duration=VALUES(duration),
                info=VALUES(info)
        """
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, (
                    result["video_id"],
                    result["video_path"],
                    result.get("duration"),
                    json.dumps(result.get("info")),  # store info as a JSON string
                ))
            conn.commit()
        finally:
            conn.close()

    def fetch_server_paths(self):
        """Read server_path from al_video_segment_info for segments whose
        source video in al_video_info has volume_level > -91."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                sql = """
                    SELECT s.server_path
                    FROM al_video_segment_info s
                    INNER JOIN al_video_info v ON s.original_video_id = v.video_id
                    WHERE v.volume_level > -91
                """
                # sql = """
                #     SELECT s.server_path
                #     FROM al_video_segment_info s
                #     INNER JOIN al_video_info v ON s.original_video_id = v.video_id
                #     WHERE v.volume_level > -91
                #     AND s.server_path NOT IN (
                #         SELECT r.video_path FROM video_sft_results r
                #     )
                # """
                cursor.execute(sql)
                return [row[0] for row in cursor.fetchall() if row[0]]
        finally:
            conn.close()

    def fetch_server_path(self):
        """Read every non-empty server_path from al_video_segment_info."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT server_path FROM al_video_segment_info")
                return [row[0] for row in cursor.fetchall() if row[0]]
        finally:
            conn.close()

    def fetch_vcfile_server_path(self):
        """Fetch qualifying vc_file.new_path values and convert them to /mnt/nas/ Linux paths."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                # IS NOT NULL guard: one NULL in the subquery would make NOT IN match nothing
                sql = """
                    SELECT vf.new_path
                    FROM vc_file vf
                    WHERE vf.id NOT IN (
                        SELECT s.original_video_id
                        FROM al_video_segment_info s
                        WHERE s.original_video_id IS NOT NULL
                    )
                    AND vf.type = 1
                    AND vf.width > 1080
                    AND vf.audio_bitrate IS NOT NULL
                """
                cursor.execute(sql)
                paths = [row[0] for row in cursor.fetchall() if row[0]]
            print("paths:", len(paths))

            # Rewrite the paths in Python
            linux_paths = []
            for p in paths:
                # Trim surrounding whitespace
                p = p.strip()
                # Turn every backslash into a forward slash
                p = p.replace('\\', '/')
                # Only now check the prefix (the path now looks like //172.16.177.70/tzvchain/...)
                if p.startswith('//172.16.177.70/tzvchain/'):
                    p = p.replace('//172.16.177.70/tzvchain/', '/mnt/nas/', 1)
                linux_paths.append(p)

            # Exclude paths already recorded in processed_SFT_video
            with conn.cursor() as cursor2:
                cursor2.execute("SELECT video_path FROM processed_SFT_video")
                existing = {row[0] for row in cursor2.fetchall()}
            final_paths = [p for p in linux_paths if p not in existing]
            print("final_paths:", len(final_paths))
            return final_paths
        finally:
            conn.close()

    def insert_processed_video(self, video_path: str):
        """Record a video path as processed."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "INSERT IGNORE INTO processed_SFT_video (video_path) VALUES (%s)",
                    (video_path,),
                )
            conn.commit()
        finally:
            conn.close()

    def save_vcfile_video_result(self, result: dict):
        """Same `result` shape as save_video_result, but writes to vc_sft_results."""
        sql = """
            INSERT INTO vc_sft_results (video_id, video_path, duration, info)
            VALUES (%s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                video_path=VALUES(video_path),
                duration=VALUES(duration),
                info=VALUES(info)
        """
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, (
                    result["video_id"],
                    result["video_path"],
                    result.get("duration"),
                    json.dumps(result.get("info")),  # store info as a JSON string
                ))
            conn.commit()
        finally:
            conn.close()
```
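How the methods are meant to be chained is not shown in this chapter; the sketch below is one plausible driver loop, assuming a worker calls fetch_vcfile_server_path, analyzes each path, saves the result and then marks the path as processed. InMemoryVideoDB is a hypothetical stand-in exposing the same method names so the loop runs without MySQL, and analyze is a placeholder for the real decoding and model pipeline.

```python
from typing import Dict, List, Set


class InMemoryVideoDB:
    """Hypothetical stand-in with the same method names as VideoDB."""

    def __init__(self, source_paths: List[str]):
        self.source_paths = source_paths
        self.processed: Set[str] = set()
        self.results: Dict[str, dict] = {}

    def fetch_vcfile_server_path(self) -> List[str]:
        return [p for p in self.source_paths if p not in self.processed]

    def save_vcfile_video_result(self, result: dict) -> None:
        self.results[result["video_id"]] = result  # upsert keyed by video_id

    def insert_processed_video(self, video_path: str) -> None:
        self.processed.add(video_path)  # adding to a set is naturally idempotent


def analyze(path: str) -> dict:
    """Placeholder for the real analysis; returns a result in the article's shape."""
    return {"video_id": path.rsplit("/", 1)[-1], "video_path": path,
            "duration": None, "info": {}}


def run_once(db) -> int:
    """Process every pending path once; return how many were analyzed."""
    count = 0
    for path in db.fetch_vcfile_server_path():
        result = analyze(path)
        db.save_vcfile_video_result(result)  # write the result first...
        db.insert_processed_video(path)      # ...then mark the path as done
        count += 1
    return count


db = InMemoryVideoDB(["/mnt/nas/a.mp4", "/mnt/nas/b.mp4"])
print(run_once(db), run_once(db))  # 2 0 (the second pass finds nothing left)
```

Saving the result before marking the path means a crash between the two steps leads to re-analysis, which the upsert absorbs, rather than a path marked done with no stored result.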

The next chapter covers the VideoDecoder decoding module.

http://www.dtcms.com/a/465273.html
