Python 从 SQLite 数据库中批量提取图像数据
Python 从 SQLite 数据库中批量提取图像数据
flyfish
本文实现了一个可扩展的 SQLite 图像导出工具:它能够自动检测图像格式、跳过数据前缀字节,并将数据库中的二进制图像数据导出为文件系统中的标准图像文件。
import os
import sqlite3
from typing import Dict, List, Tuple


class SQLiteImageExporter:
    """Export image BLOBs stored in a SQLite database to individual files.

    Rows are read from the ``content`` table (columns ``id``, ``title`` and
    ``data``). For each row the image format is guessed from the bytes, any
    bytes preceding a JPEG start marker are dropped, and the result is
    written to ``output_dir`` under a name derived from the title.
    """

    # Signatures that must appear at offset 0. They are tested before the
    # JPEG scan: compressed payloads routinely contain the bytes FF D8
    # somewhere, so scanning first would misreport e.g. a PNG as JPEG and
    # then cut off its header.
    _LEADING_SIGNATURES: Tuple[Tuple[bytes, str], ...] = (
        (b'\x89PNG\r\n\x1a\n', 'PNG'),
        (b'GIF87a', 'GIF'),
        (b'GIF89a', 'GIF'),
        (b'BM', 'BMP'),
        (b'II*\x00', 'TIFF'),
        (b'MM\x00*', 'TIFF'),
        (b'%PDF-', 'PDF'),
    )

    # JPEG start-of-image marker plus the FF that opens the next marker
    # segment; requiring the third byte rejects stray FF D8 pairs.
    _JPEG_SOI = b'\xFF\xD8\xFF'

    # Characters allowed to survive in an output file name.
    _SAFE_FILENAME_CHARS = frozenset(
        'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ._-'
    )

    def __init__(self, db_path: str, output_dir: str):
        """Remember the paths and make sure the output directory exists.

        Args:
            db_path: Path of the SQLite database file to read.
            output_dir: Directory that receives the exported files; created
                if it does not exist yet.
        """
        self.db_path = db_path
        self.output_dir = output_dir
        # Number of successfully exported images per detected format.
        self.format_stats = {}
        self.format_to_ext = {
            'JPEG': '.jpg', 'JFIF': '.jpg', 'PNG': '.png',
            'GIF': '.gif', 'BMP': '.bmp', 'TIFF': '.tiff',
            'WEBP': '.webp', 'ICO': '.ico', 'PDF': '.pdf', 'EPS': '.eps'
        }
        os.makedirs(output_dir, exist_ok=True)

    def detect_image_format(self, data: bytes) -> str:
        """Guess the format of ``data`` from its signature bytes.

        Formats with a fixed leading signature (PNG, GIF, BMP, TIFF, PDF,
        WEBP) are recognised only at offset 0. JPEG data may be preceded by
        arbitrary prefix bytes, so its start marker is searched for anywhere
        in the blob once no leading signature matched. ICO and EPS have
        extensions in ``format_to_ext`` but are not detected here.

        Args:
            data: Raw bytes of one stored image.

        Returns:
            One of ``'PNG'``, ``'GIF'``, ``'BMP'``, ``'TIFF'``, ``'PDF'``,
            ``'WEBP'``, ``'JFIF'``, ``'JPEG'`` or ``'UNKNOWN'``.
        """
        # Debug aid: show the first 20 bytes of the blob.
        print(f"数据前20字节: {data[:20].hex()}")

        for signature, name in self._LEADING_SIGNATURES:
            if data.startswith(signature):
                return name
        # WEBP is a RIFF container: "RIFF", 4 size bytes, then "WEBP".
        if data[:4] == b'RIFF' and data[8:12] == b'WEBP':
            return 'WEBP'

        jpeg_start = data.find(self._JPEG_SOI)
        if jpeg_start == -1:
            return 'UNKNOWN'

        print(f"找到JPEG起始标记,位置: {jpeg_start}")
        # JFIF is a JPEG whose first segment is APP0 (FF E0) tagged "JFIF".
        header = data[jpeg_start:jpeg_start + 50]
        if header.startswith(b'\xFF\xD8\xFF\xE0') and b'JFIF' in header:
            return 'JFIF'
        return 'JPEG'

    def sanitize_filename(self, title: str, image_id: int) -> str:
        """Reduce ``title`` to ASCII letters, digits, space, ``.``, ``_``, ``-``.

        Args:
            title: Title taken from the database row; ``None`` and non-string
                values are tolerated.
            image_id: Row id, used to build a fallback name.

        Returns:
            The filtered title with surrounding whitespace removed, or
            ``image_<image_id>`` when nothing usable remains.
        """
        # A NULL title column arrives as None; treat it like an empty title
        # instead of failing the whole row.
        raw_title = '' if title is None else str(title)
        safe_title = ''.join(
            c for c in raw_title if c in self._SAFE_FILENAME_CHARS
        ).strip()
        return safe_title if safe_title else f"image_{image_id}"

    def process_image_data(self, img_format: str, data: bytes) -> bytes:
        """Drop the bytes that precede the JPEG start marker.

        Args:
            img_format: Format name as returned by ``detect_image_format``.
            data: Raw bytes of the stored image.

        Returns:
            For ``'JPEG'``/``'JFIF'``, ``data`` starting at the first JPEG
            start marker; ``data`` unchanged for every other format or when
            no marker is present.
        """
        if img_format not in ('JPEG', 'JFIF'):
            return data
        jpeg_start = data.find(self._JPEG_SOI)
        return data[jpeg_start:] if jpeg_start > 0 else data

    def get_extension(self, img_format: str) -> str:
        """Return the file extension for ``img_format``, ``'.bin'`` if unknown."""
        return self.format_to_ext.get(img_format, '.bin')

    def _unique_filename(self, stem: str, ext: str, image_id: int,
                         used_names: set) -> str:
        """Return a file name not yet handed out during this export run.

        The plain ``stem + ext`` is preferred; on a clash the row id is
        appended, and a counter after that if the name is still taken.
        Names are compared case-insensitively so that two rows cannot
        overwrite each other on case-insensitive file systems.
        """
        candidate = f"{stem}{ext}"
        if candidate.casefold() in used_names:
            candidate = f"{stem}_{image_id}{ext}"
        counter = 1
        while candidate.casefold() in used_names:
            candidate = f"{stem}_{image_id}_{counter}{ext}"
            counter += 1
        used_names.add(candidate.casefold())
        return candidate

    def _export_row(self, image_id: int, title: str, data: bytes,
                    used_names: set) -> None:
        """Detect, clean up and write a single row; raises on any failure."""
        if not isinstance(data, (bytes, bytearray)):
            # NULL or TEXT values cannot be images; fail with a clear reason.
            raise TypeError(
                f"data column holds {type(data).__name__}, expected a BLOB"
            )

        img_format = self.detect_image_format(data)
        safe_title = self.sanitize_filename(title, image_id)
        processed_data = self.process_image_data(img_format, data)
        ext = self.get_extension(img_format)

        file_name = self._unique_filename(safe_title, ext, image_id, used_names)
        file_path = os.path.join(self.output_dir, file_name)
        with open(file_path, 'wb') as f:
            f.write(processed_data)

        # Count the row only after the file was written successfully.
        self.format_stats[img_format] = self.format_stats.get(img_format, 0) + 1
        print(f"成功导出: {file_path} (格式: {img_format})")

    def export_images(self) -> Dict[str, int]:
        """Export every row of the ``content`` table to ``output_dir``.

        A failure on one row is reported and does not stop the remaining
        rows. Database errors and other unexpected errors are printed rather
        than raised.

        Returns:
            ``format_stats``: detected format name mapped to the number of
            images written in that format.
        """
        try:
            print(f"开始从 {self.db_path} 导出图像...")

            # sqlite3's connection context manager only commits or rolls
            # back; close explicitly so the file handle is released.
            conn = sqlite3.connect(self.db_path)
            try:
                rows = conn.execute(
                    "SELECT id, title, data FROM content"
                ).fetchall()
            finally:
                conn.close()

            if not rows:
                print("未找到图像数据")
                return self.format_stats

            print(f"找到 {len(rows)} 个图像记录")

            used_names = set()
            for image_id, title, data in rows:
                try:
                    self._export_row(image_id, title, data, used_names)
                except Exception as e:
                    # Best effort: report the row and carry on with the rest.
                    print(f"导出失败 (ID:{image_id}, 标题:{title}): {str(e)}")

            # Print the per-format summary.
            print("\n导出统计:")
            for fmt, count in self.format_stats.items():
                print(f" {fmt}: {count} 个")

            return self.format_stats

        except sqlite3.Error as e:
            print(f"数据库错误: {str(e)}")
            return self.format_stats
        except Exception as e:
            print(f"未知错误: {str(e)}")
            return self.format_stats


# Usage example
if __name__ == "__main__":
    # Example run: read images from the database file below and write them
    # into a directory relative to the current working directory.
    DATABASE_PATH = "/home/1.db"
    OUTPUT_DIRECTORY = "exported_images"

    SQLiteImageExporter(DATABASE_PATH, OUTPUT_DIRECTORY).export_images()