第三章 · 数据库管理与视频路径获取
在视频分析系统中,数据库模块负责:
获取待处理的视频列表;
存储视频分析结果;
记录已处理的视频,防止重复分析;
支持从多表获取路径、转换为 Linux 可访问路径。
下面我们以 `VideoDB` 类为例,详细讲解。
3.1 数据库连接与初始化
import pymysql
from typing import List, Tuple
import json


class VideoDB:
    """Data-access wrapper that talks to a MySQL database through pymysql."""

    def __init__(self, host="your_db_host", port=3306, user="your_user",
                 password="your_password", database="your_database"):
        """Store the connection settings; no connection is opened here.

        The default values are placeholders and must be replaced with real
        settings before any query can succeed.

        Args:
            host: Database server host name or address.
            port: Database server TCP port.
            user: Login name.
            password: Login password.
            database: Schema selected after connecting.
        """
        self.db_config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
            "charset": "utf8mb4",
        }

    def _get_connection(self):
        """Open and return a new pymysql connection built from ``self.db_config``.

        A fresh connection is created on every call; the caller is
        responsible for closing it.
        """
        return pymysql.connect(**self.db_config)
说明:
所有敏感信息(IP、账号、密码)已替换为占位符,实际使用时请替换成自己的配置;
`_get_connection` 每次调用都返回一个新的连接,避免在多个线程/Worker 之间共享同一个连接对象;调用方用完后需要自行关闭连接。
3.2 获取待处理视频
def get_pending_videos(self, limit: int = 10) -> List[Tuple[int, str]]:
    """Return up to ``limit`` rows of ``video_tasks`` whose status is 'pending'.

    Args:
        limit: Maximum number of rows to fetch.

    Returns:
        A list of ``(id, video_path)`` tuples; empty when nothing is pending.
    """
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, video_path FROM video_tasks WHERE status='pending' LIMIT %s",
                (limit,),
            )
            # fetchall() may hand back a tuple; the declared return type is a list.
            return list(cursor.fetchall())
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
说明:
`status='pending'` 表示视频尚未分析;返回列表,供分析程序批量分发到 Ray Worker。
3.3 更新视频状态
def update_video_status(self, video_id: int, status: str):
    """Set the ``status`` column of one ``video_tasks`` row and commit.

    Args:
        video_id: Value matched against ``video_tasks.id``.
        status: New status. The intended values are ``pending``,
            ``processing``, ``done`` and ``failed``; the value is not
            validated here.
    """
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "UPDATE video_tasks SET status=%s WHERE id=%s",
                (status, video_id),
            )
        conn.commit()
    finally:
        # Close even when the statement raises so the connection is not leaked.
        conn.close()
说明:
每处理完一个视频,及时更新状态,防止重复分析或遗漏;
支持多种状态,方便监控系统进度。
3.4 保存分析结果
def save_video_result(self, result: dict):
    """Insert or update one row of ``video_sft_results`` and commit.

    Expected shape of ``result``::

        {
            "video_id": "xxx",
            "video_path": "/full/path/to/video.mp4",
            "duration": 123.45,
            "info": {
                "0": {"age": 25, "emotion": "happy",
                      "emotion_richness": 0.67, "hand_richness": 0.32},
                "1": {"age": 30, "emotion": "sad",
                      "emotion_richness": 0.45, "hand_richness": 0.21},
            },
        }

    ``video_id`` and ``video_path`` are required. ``duration`` defaults to
    ``None``. ``info`` is serialized with ``json.dumps``, so a missing
    ``info`` is stored as the JSON text ``null``.

    Raises:
        KeyError: If ``video_id`` or ``video_path`` is missing.
    """
    # Build the parameters first so a malformed ``result`` fails before a
    # connection is opened.
    params = (
        result["video_id"],
        result["video_path"],
        result.get("duration", None),
        json.dumps(result.get("info")),
    )
    sql = """
        INSERT INTO video_sft_results (video_id, video_path, duration, info)
        VALUES (%s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            video_path=VALUES(video_path),
            duration=VALUES(duration),
            info=VALUES(info)
    """
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
        conn.commit()
    finally:
        # Close even when the statement raises so the connection is not leaked.
        conn.close()
说明:
分析结果中的 `info` 字段以 JSON 格式存储,便于后续统计或可视化;
使用 `ON DUPLICATE KEY UPDATE` 避免重复写入(前提是表上存在主键或唯一键,例如 `video_id`)。
3.5 获取服务器视频路径并转换 Linux 路径
def fetch_vcfile_server_path(self, unc_prefix='//172.16.177.70/tzvchain/',
                             linux_prefix='/mnt/nas/'):
    """Return Linux paths of ``vc_file`` videos that are not yet processed.

    Reads ``vc_file.new_path`` for rows that have ``type = 1``,
    ``width > 1080``, a non-NULL ``audio_bitrate`` and whose ``id`` does not
    appear in ``al_video_segment_info.original_video_id``. Each path is
    stripped, its backslashes are turned into forward slashes, and a
    leading ``unc_prefix`` is replaced by ``linux_prefix``. Paths already
    listed in ``processed_SFT_video.video_path`` are dropped.

    The comparison against ``processed_SFT_video`` is made on the converted
    path, so entries recorded in ``/mnt/nas/...`` form are actually excluded.

    Args:
        unc_prefix: Share prefix (forward-slash form) to rewrite.
        linux_prefix: Mount point substituted for ``unc_prefix``.

    Returns:
        A list of converted paths, in query order.
    """
    # NOTE(review): NOT IN matches nothing if the subquery yields a NULL
    # original_video_id -- confirm that column cannot be NULL.
    sql = """
        SELECT vf.new_path
        FROM vc_file vf
        WHERE vf.id NOT IN (
            SELECT s.original_video_id
            FROM al_video_segment_info s
        )
        AND vf.type = 1
        AND vf.width > 1080
        AND vf.audio_bitrate IS NOT NULL
    """
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            raw_paths = [row[0] for row in cursor.fetchall() if row[0]]
        with conn.cursor() as cursor:
            cursor.execute("SELECT video_path FROM processed_SFT_video")
            processed = {row[0] for row in cursor.fetchall()}
    finally:
        conn.close()

    final_paths = []
    for raw in raw_paths:
        path = raw.strip().replace('\\', '/')
        # Rewrite only a leading share prefix, never a match in mid-path.
        if path.startswith(unc_prefix):
            path = linux_prefix + path[len(unc_prefix):]
        if path and path not in processed:
            final_paths.append(path)
    return final_paths
说明:
自动将 Windows/UNC 网络路径替换为 Linux 路径 `/mnt/nas/`;
排除已经处理的视频,避免重复分析;
提供给 Ray Worker 批量分析使用。
3.6 插入已处理视频
def insert_processed_video(self, video_path: str):
    """Record ``video_path`` in ``processed_SFT_video`` and commit.

    The statement is ``INSERT IGNORE``, so the insert itself is written to
    tolerate a path that is already present.

    Args:
        video_path: Path stored in the ``video_path`` column.
    """
    conn = self._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT IGNORE INTO processed_SFT_video (video_path) VALUES (%s)",
                (video_path,),
            )
        conn.commit()
    finally:
        conn.close()
说明:
每处理完一个视频,记录到 `processed_SFT_video` 表,保证幂等性;
使用 `INSERT IGNORE` 防止重复插入报错。
✅ 小结
通过 `VideoDB` 模块,我们实现了:
视频任务管理(待处理、处理中、已完成、失败);
视频分析结果存储(JSON info,支持多指标);
路径获取与转换,保证分布式 Worker 能访问视频文件;
防重复机制,保证分析任务幂等。
这为 Ray 分布式视频分析 提供了坚实的数据支持和管理能力。
完整代码如下:
import pymysql
from typing import List, Tuple
import statistics
from collections import Counter
import json


class VideoDB:
    """MySQL access layer for the video-analysis pipeline, built on pymysql.

    Every public method obtains its own connection through
    ``_get_connection`` and closes it before returning, including when a
    statement raises.
    """

    # Shared by save_video_result / save_vcfile_video_result; only the
    # target table differs between the two.
    _UPSERT_RESULT_SQL = """
        INSERT INTO {table} (video_id, video_path, duration, info)
        VALUES (%s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            video_path=VALUES(video_path),
            duration=VALUES(duration),
            info=VALUES(info)
    """

    def __init__(self, host="127.0.0.1", port=3306, user="db",
                 password="123456", database="db"):
        """Store the connection settings; no connection is opened here.

        Args:
            host: Database server host name or address.
            port: Database server TCP port.
            user: Login name.
            password: Login password.
            database: Schema selected after connecting.
        """
        self.db_config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
            "charset": "utf8mb4",
        }

    def _get_connection(self):
        """Open and return a new pymysql connection from ``self.db_config``."""
        return pymysql.connect(**self.db_config)

    def _fetch_all(self, sql, args=None):
        """Run one SELECT on a fresh connection and return ``fetchall()``."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, args)
                return cursor.fetchall()
        finally:
            conn.close()

    def _execute_and_commit(self, sql, args=None):
        """Run one write statement on a fresh connection and commit it."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, args)
            conn.commit()
        finally:
            conn.close()

    def _upsert_result(self, table, result):
        """Insert or update one analysis-result row in ``table``.

        ``table`` is always one of this class's own constant table names.
        """
        # Build the parameters first so a malformed ``result`` fails before
        # a connection is opened.
        params = (
            result["video_id"],
            result["video_path"],
            result.get("duration", None),
            json.dumps(result.get("info")),
        )
        self._execute_and_commit(
            self._UPSERT_RESULT_SQL.format(table=table), params
        )

    def get_pending_videos(self, limit: int = 10) -> List[Tuple[int, str]]:
        """Return up to ``limit`` ``(id, video_path)`` rows with status 'pending'.

        Args:
            limit: Maximum number of rows to fetch.

        Returns:
            A list of ``(id, video_path)`` tuples from ``video_tasks``.
        """
        rows = self._fetch_all(
            "SELECT id, video_path FROM video_tasks WHERE status='pending' LIMIT %s",
            (limit,),
        )
        # fetchall() may hand back a tuple; the declared return type is a list.
        return list(rows)

    def update_video_status(self, video_id: int, status: str):
        """Set ``video_tasks.status`` for the row whose ``id`` is ``video_id``.

        Args:
            video_id: Value matched against ``video_tasks.id``.
            status: New status. The intended values are ``pending``,
                ``processing``, ``done`` and ``failed``; not validated here.
        """
        self._execute_and_commit(
            "UPDATE video_tasks SET status=%s WHERE id=%s",
            (status, video_id),
        )

    def get_existing_video_path(self):
        """Return the set of ``video_path`` values stored in ``video_sft_results``.

        A set is returned so callers can test membership quickly.
        """
        rows = self._fetch_all("SELECT video_path FROM video_sft_results")
        return {row[0] for row in rows}

    def add_video(self, video_path: str):
        """Insert ``video_path`` into ``video_tasks`` with status 'pending'."""
        self._execute_and_commit(
            "INSERT INTO video_tasks (video_path, status) VALUES (%s, 'pending')",
            (video_path,),
        )

    def save_video_result(self, result: dict):
        """Insert or update one row of ``video_sft_results``.

        Expected shape of ``result``::

            {
                "video_id": "xxx",
                "video_path": "/full/path/to/video.mp4",
                "duration": 123.45,  # seconds
                "info": {
                    "0": {"age": 25, "emotion": "happy",
                          "emotion_richness": 0.67, "hand_richness": 0.32},
                    "1": {"age": 30, "emotion": "sad",
                          "emotion_richness": 0.45, "hand_richness": 0.21},
                },
            }

        ``duration`` defaults to ``None``; ``info`` is stored as a JSON string.

        Raises:
            KeyError: If ``video_id`` or ``video_path`` is missing.
        """
        self._upsert_result("video_sft_results", result)

    def fetch_server_paths(self):
        """Return segment ``server_path`` values whose source video is not silent.

        Joins ``al_video_segment_info`` to ``al_video_info`` on
        ``original_video_id = video_id`` and keeps rows with
        ``volume_level > -91``. Empty or NULL paths are dropped.
        """
        sql = """
            SELECT s.server_path
            FROM al_video_segment_info s
            INNER JOIN al_video_info v ON s.original_video_id = v.video_id
            WHERE v.volume_level > -91
        """
        return [row[0] for row in self._fetch_all(sql) if row[0]]

    def fetch_server_path(self):
        """Return every non-empty ``server_path`` in ``al_video_segment_info``."""
        sql = """
            SELECT server_path
            FROM al_video_segment_info
        """
        return [row[0] for row in self._fetch_all(sql) if row[0]]

    def fetch_vcfile_server_path(self, unc_prefix='//172.16.177.70/tzvchain/',
                                 linux_prefix='/mnt/nas/'):
        """Return Linux paths of ``vc_file`` videos that are not yet processed.

        Reads ``vc_file.new_path`` for rows that have ``type = 1``,
        ``width > 1080``, a non-NULL ``audio_bitrate`` and whose ``id`` does
        not appear in ``al_video_segment_info.original_video_id``. Each path
        is stripped, its backslashes become forward slashes, and a leading
        ``unc_prefix`` is replaced by ``linux_prefix``. Converted paths
        already present in ``processed_SFT_video.video_path`` are dropped.

        The two counts are printed to stdout as progress information.

        Args:
            unc_prefix: Share prefix (forward-slash form) to rewrite.
            linux_prefix: Mount point substituted for ``unc_prefix``.

        Returns:
            A list of converted paths, in query order.
        """
        # NOTE(review): NOT IN matches nothing if the subquery yields a NULL
        # original_video_id -- confirm that column cannot be NULL.
        sql = """
            SELECT vf.new_path
            FROM vc_file vf
            WHERE vf.id NOT IN (
                SELECT s.original_video_id
                FROM al_video_segment_info s
            )
            AND vf.type = 1
            AND vf.width > 1080
            AND vf.audio_bitrate IS NOT NULL
        """
        # Both reads share one connection, as in the original flow.
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql)
                paths = [row[0] for row in cursor.fetchall() if row[0]]
            print("paths:", len(paths))
            with conn.cursor() as cursor:
                cursor.execute("SELECT video_path FROM processed_SFT_video")
                existing = {row[0] for row in cursor.fetchall()}
        finally:
            conn.close()

        final_paths = []
        for raw in paths:
            # Normalize separators before testing the prefix: the stored
            # value may use backslashes.
            path = raw.strip().replace('\\', '/')
            if path.startswith(unc_prefix):
                path = linux_prefix + path[len(unc_prefix):]
            if path and path not in existing:
                final_paths.append(path)
        print("final_paths:", len(final_paths))
        return final_paths

    def insert_processed_video(self, video_path: str):
        """Record ``video_path`` in ``processed_SFT_video`` using INSERT IGNORE."""
        sql = """
            INSERT IGNORE INTO processed_SFT_video (video_path)
            VALUES (%s)
        """
        self._execute_and_commit(sql, (video_path,))

    def save_vcfile_video_result(self, result: dict):
        """Insert or update one row of ``vc_sft_results``.

        ``result`` has the same shape as for ``save_video_result``:
        required ``video_id`` and ``video_path``, optional ``duration``
        (seconds) and ``info`` (stored as a JSON string).

        Raises:
            KeyError: If ``video_id`` or ``video_path`` is missing.
        """
        self._upsert_result("vc_sft_results", result)
下一章将更新 VideoDecoder 解码模块。