当前位置: 首页 > news >正文

【python】基于pycharm的海康相机SDK二次开发

海康威视二次开发相机管理

在这里插入图片描述

这段代码基于python开发的,用了opencv的一些库函数。实现了一个完整的海康机器人相机管理工具,支持多相机连接、参数配置、图像采集和实时显示功能。目前USB相机测试无误,除了丢一些包。
在这里插入图片描述
在这里插入图片描述

1. 主要类结构

HKCameraManager

这是整个系统的核心类,负责管理所有相机的生命周期和操作。
全局可调参数

# 相机参数配置
EXPOSURE_MODE = 0  # 曝光模式:0:关闭;1:一次;2:自动曝光
EXPOSURE_TIME = 40000 # 曝光时间
GAIN_VALUE = 10 #增益值
ReverseX_enable = True # 水平翻转
ReverseY_enable = True # 垂直翻转
#图像显示大小
scale_width = 0.2  # 宽度缩放因子
scale_height = 0.2  # 高度缩放因子
PacketSizeLog = True  # 启用丢包信息检测
主要属性:
  • cameras: 字典,存储所有已连接相机的信息和句柄
  • _last_error: 记录最后一次错误信息
  • _running: 字典,记录每个相机的运行状态
  • _lock: 线程锁,保证线程安全
  • _display_threads: 字典,存储每个相机的显示线程
  • _fps: 字典,记录每个相机的帧率
 def __init__(self):"""初始化相机管理器"""self.cameras: Dict[int, Dict] = {}  # 存储所有相机实例和信息self._last_error: str = ""self._running = {}  # 每个相机的运行状态self._lock = threading.Lock()self._display_threads = {}  # 每个相机的显示线程self._fps = {}  # 每个相机的FPS

2. 核心功能流程

2.1 设备枚举

  • 通过enumerate_devices()方法枚举所有可用设备
  • 支持GigE和USB两种接口类型的相机
  • 返回设备列表,包含型号、序列号、IP地址等信息
    def enumerate_devices(self) -> Optional[List[dict]]:"""枚举所有可用设备"""try:# 设置要枚举的设备类型tlayer_type = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE# 初始化设备列表结构体device_list = MV_CC_DEVICE_INFO_LIST()memset(byref(device_list), 0, sizeof(device_list))# 创建临时相机实例用于枚举temp_cam = MvCamera()# 枚举设备ret = temp_cam.MV_CC_EnumDevices(tlayer_type, device_list)if ret != 0:self._log_error("枚举设备", ret)return None# 检查找到的设备数量if device_list.nDeviceNum == 0:print("未检测到任何相机设备")return []devices = []for i in range(device_list.nDeviceNum):# 获取设备信息指针device_info = cast(device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents# 根据传输层类型处理设备信息if device_info.nTLayerType == MV_GIGE_DEVICE:# GigE设备device_data = {'model': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chModelName).decode('utf-8'),'serial': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8'),'ip': ".".join(map(str, device_info.SpecialInfo.stGigEInfo.nCurrentIp)),'type': 'GigE','index': i}elif device_info.nTLayerType == MV_USB_DEVICE:# USB设备# 修正USB设备信息获取方式usb_info = device_info.SpecialInfo.stUsb3VInfo# 使用ctypes的string_at函数获取字符串model_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')device_data = {'model': model_name.strip('\x00'),'serial': serial_num.strip('\x00'),'type': 'USB','index': i}else:continuedevices.append(device_data)return devicesexcept Exception as e:self._last_error = f"枚举设备时发生异常: {str(e)}"print(self._last_error)import tracebacktraceback.print_exc()  # 打印完整的错误堆栈return None

2.2 相机连接

  • connect_camera()方法连接指定索引的相机
  • 步骤:
    1. 检查相机是否已连接
    2. 枚举设备并选择指定索引的设备
    3. 创建相机句柄
    4. 打开设备
    5. 配置相机参数(曝光、增益等)
    6. 开始采集图像
    7. 存储相机信息到字典中
    def connect_camera(self, device_index: int) -> bool:"""连接指定索引的相机设备"""try:with self._lock:if device_index in self.cameras and self.cameras[device_index]['connected']:print(f"相机 {device_index} 已连接")return True# 枚举设备tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()memset(byref(deviceList), 0, sizeof(deviceList))# 实例化相机cam = MvCamera()# 枚举设备ret = cam.MV_CC_EnumDevices(tlayerType, deviceList)if ret != 0:self._log_error("枚举设备", ret)return Falseif deviceList.nDeviceNum == 0:self._last_error = "未找到任何设备"print(self._last_error)return Falseif device_index >= deviceList.nDeviceNum:self._last_error = f"设备索引超出范围,最大可用索引: {deviceList.nDeviceNum - 1}"print(self._last_error)return False# 选择指定设备stDeviceList = cast(deviceList.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents# 创建句柄ret = cam.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != MV_OK:self._log_error("创建句柄", ret)return False# 获取设备信息if stDeviceList.nTLayerType == MV_GIGE_DEVICE:model_name = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chModelName).decode('utf-8')serial_num = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8')ip_addr = ".".join(map(str, stDeviceList.SpecialInfo.stGigEInfo.nCurrentIp))device_type = 'GigE'print(f"正在连接设备 {device_index}: {model_name} (SN: {serial_num}, IP: {ip_addr}。GiGe)")else:usb_info = stDeviceList.SpecialInfo.stUsb3VInfomodel_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')ip_addr = Nonedevice_type = 'USB'print(f"正在连接设备 {device_index}: {model_name} (SN: {serial_num}, USB-3.0)")# 打开相机ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != MV_OK:# 特别处理USB相机连接问题if stDeviceList.nTLayerType == MV_USB_DEVICE:# 尝试设置USB传输大小(海康USB相机常见问题)ret = cam.MV_CC_SetIntValue("TransferSize", 0x100000)if ret == MV_OK:ret = cam.MV_CC_SetIntValue("NumTransferBuffers", 8)if ret == MV_OK:ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:self._log_error("打开设备", ret)return False# 配置相机参数if not self._configure_camera(cam):cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 开始取流ret = cam.MV_CC_StartGrabbing()if ret != 0:self._log_error("开始取流", ret)cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 存储相机信息 - 确保所有必要字段都正确设置self.cameras[device_index] = {'handle': cam,'model': model_name.strip('\x00') if isinstance(model_name, str) else model_name,'serial': serial_num.strip('\x00') if isinstance(serial_num, str) else serial_num,'type': device_type,'ip': ip_addr,'connected': True,  # 确保连接状态正确设置为True'frame_count': 0,'last_frame_time': time.time()}# 初始化FPS计数器self._fps[device_index] = 0print(f"相机 {device_index} 连接成功: {model_name} (SN: {serial_num})")return Trueexcept Exception as e:self._last_error = f"连接相机时发生异常: {str(e)}"print(self._last_error)if 'cam' in locals():cam.MV_CC_DestroyHandle()return False

2.3 相机参数配置

  • _configure_camera()私有方法处理相机参数配置
  • 可配置项:
    • 触发模式(连续采集)
    • 曝光模式(手动/自动)
    • 增益设置
    • 图像翻转(水平/垂直)
    def _configure_camera(self, cam: MvCamera) -> bool:"""配置相机参数"""try:# 设置触发方式为连续采集ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)if ret != 0:self._log_error("设置触发模式", ret)return False# 设置曝光模式match EXPOSURE_MODE:case 0:  # 手动设置参数ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF)if ret != 0:print("警告: 关闭自动曝光设置失败,将采用自动曝光")# 设置曝光时间exposure = float(EXPOSURE_TIME)ret = cam.MV_CC_SetFloatValue("ExposureTime", exposure)if ret != 0:raise RuntimeError(f"Set ExposureTime failed with error {ret}")case 1:  # 一次曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_ONCE)if ret != 0:print("警告: 一次曝光设置失败,将继续使用手动曝光")case 2:  # 自动曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)if ret != 0:print("警告: 自动曝光设置失败,将继续使用手动曝光")# 设置增益ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_OFF)if ret != 0:print("警告: 手动增益设置失败,将采用自动增益")gain_val = float(GAIN_VALUE)ret = cam.MV_CC_SetFloatValue("Gain", gain_val)if ret != 0:raise RuntimeError(f"Set gain failed with error {ret}")# 设置水平翻转flip = c_int(1 if ReverseX_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseX", flip)if ret != 0:raise RuntimeError(f"Set horizontal flip failed with error {ret}")print(f"Horizontal flip {'enabled' if ReverseX_enable else 'disabled'}")# 设置垂直翻转flip = c_int(1 if ReverseY_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseY", flip)if ret != 0:raise RuntimeError(f"Set vertical flip failed with error {ret}")print(f"Vertical flip {'enabled' if ReverseY_enable else 'disabled'}")return Trueexcept Exception as e:self._last_error = f"配置相机时发生异常: {str(e)}"print(self._last_error)return False

2.4 图像获取

  • get_image()方法获取指定相机的图像
  • 步骤:
    1. 获取图像缓冲区
    2. 复制图像数据
    3. 根据像素类型处理图像数据
    4. 转换为灰度图像
    5. 释放图像缓冲区
    6. 更新帧统计信息
    def get_image(self, device_index: int, timeout: int = 300) -> Optional[Tuple[np.ndarray, np.ndarray]]:"""获取指定相机的图像并返回原始图像和灰度图像"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:self._last_error = f"相机 {device_index} 未连接"print(self._last_error)return Nonecam = self.cameras[device_index]['handle']try:# 初始化帧输出结构stOutFrame = MV_FRAME_OUT()memset(byref(stOutFrame), 0, sizeof(stOutFrame))# 获取图像ret = cam.MV_CC_GetImageBuffer(stOutFrame, timeout)if ret != 0:self._log_error(f"相机 {device_index} 获取图像", ret)return None# 获取图像信息frame_info = stOutFrame.stFrameInfonPayloadSize = frame_info.nFrameLenpData = stOutFrame.pBufAddr# 打印调试信息# print(f"相机 {device_index} 图像信息: "#       f"Width={frame_info.nWidth}, Height={frame_info.nHeight}, "#       f"PixelType={frame_info.enPixelType}, Size={nPayloadSize}")# 复制图像数据data_buf = (c_ubyte * nPayloadSize)()cdll.msvcrt.memcpy(byref(data_buf), pData, nPayloadSize)# 转换为numpy数组temp = np.frombuffer(data_buf, dtype=np.uint8)# 获取图像参数width = frame_info.nWidthheight = frame_info.nHeightpixel_type = frame_info.enPixelType# 根据像素类型处理图像img = self._process_image_data(temp, width, height, pixel_type)if img is None:if PacketSizeLog:print(f"相机 {device_index} 图像处理失败 - 数据大小: {len(temp)}, "f"预期大小: {width * height * (3 if pixel_type in [PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed] else 1)}")cam.MV_CC_FreeImageBuffer(stOutFrame)return None# 转换为灰度图像if len(img.shape) == 2:  # 已经是灰度图像gray = img.copy()else:gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 释放图像缓存cam.MV_CC_FreeImageBuffer(stOutFrame)# 更新帧统计信息self.cameras[device_index]['frame_count'] += 1self.cameras[device_index]['last_frame_time'] = time.time()return img, grayexcept Exception as e:self._last_error = f"相机 {device_index} 获取图像时发生异常: {str(e)}"print(self._last_error)if 'stOutFrame' in locals():cam.MV_CC_FreeImageBuffer(stOutFrame)return None

2.5 图像显示

  • start_display()启动相机实时显示
  • 为每个相机创建独立的显示线程
  • 显示线程中:
    • 循环获取图像
    • 计算并显示FPS
    • 显示图像到窗口
    • 处理用户按键(ESC退出)
    def start_display(self,device_index: int) -> bool:"""启动所有已连接相机的实时显示"""with self._lock:  # 添加线程锁# 检查相机是否已连接if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相机 {device_index} 未连接,无法启动显示")return Falseif device_index in self._running and self._running[device_index]:print(f"相机 {device_index} 显示已启动")return True# 设置运行标志self._running[device_index] = True# 创建并启动显示线程display_thread = threading.Thread(target=self._display_thread,args=(device_index,),daemon=True)self._display_threads[device_index] = display_threaddisplay_thread.start()print(f"相机 {device_index} 显示线程已启动")return True

2.6 断开连接

  • disconnect_camera()断开单个相机连接
  • disconnect_all()断开所有相机连接
  • 释放所有资源
    def disconnect_camera(self, device_index: int) -> bool:"""断开指定相机的连接"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相机 {device_index} 未连接")return Truecam = self.cameras[device_index]['handle']try:success = True# 停止取流ret = cam.MV_CC_StopGrabbing()if ret != 0:self._log_error(f"相机 {device_index} 停止取流", ret)success = False# 关闭设备ret = cam.MV_CC_CloseDevice()if ret != 0:self._log_error(f"相机 {device_index} 关闭设备", ret)success = False# 销毁句柄ret = cam.MV_CC_DestroyHandle()if ret != 0:self._log_error(f"相机 {device_index} 销毁句柄", ret)success = Falseif success:print(f"相机 {device_index} 已成功断开连接")self.cameras[device_index]['connected'] = False# 从字典中移除相机del self.cameras[device_index]# 停止显示线程if device_index in self._running:self._running[device_index] = Falsereturn successexcept Exception as e:self._last_error = f"断开相机 {device_index} 连接时发生异常: {str(e)}"print(self._last_error)return Falsedef disconnect_all(self) -> None:"""断开所有相机的连接"""self.stop_display()  # 先停止所有显示for device_index in list(self.cameras.keys()):if self.cameras[device_index]['connected']:self.disconnect_camera(device_index)

3. 目前程序的一些功能和特点如下

  1. 多线程支持

    • 每个相机的显示使用独立线程
    • 使用线程锁保证线程安全
  2. 错误处理

    • 详细的错误日志记录
    • 异常捕获和处理
      在这里插入图片描述
  3. 相机参数配置

    • 支持多种曝光模式
    • 可配置增益、翻转等参数
  4. 图像处理

    • 支持多种像素格式(Mono8, RGB8, BGR8等)
    • 自动处理数据大小不匹配的情况
    • 图像缩放显示
  5. 性能监控

    • 实时计算和显示FPS
    • 帧计数统计

4. 完整代码如下

from HK_Camera.MvCameraControl_class import *
from ctypes import *
from typing import Optional, Tuple, List, Dict
import time
import cv2
import numpy as np
import threading# 相机参数配置
EXPOSURE_MODE = 0  # 曝光模式:0:关闭;1:一次;2:自动曝光
EXPOSURE_TIME = 40000 # 曝光时间
GAIN_VALUE = 10 #增益值
ReverseX_enable = True # 水平翻转
ReverseY_enable = True # 垂直翻转
#图像显示大小
scale_width = 0.2  # 宽度缩放因子
scale_height = 0.2  # 高度缩放因子
PacketSizeLog = True  # 启用丢包信息检测class HKCameraManager:def __init__(self):"""初始化相机管理器"""self.cameras: Dict[int, Dict] = {}  # 存储所有相机实例和信息self._last_error: str = ""self._running = {}  # 每个相机的运行状态self._lock = threading.Lock()self._display_threads = {}  # 每个相机的显示线程self._fps = {}  # 每个相机的FPS@propertydef last_error(self) -> str:"""获取最后一次错误信息"""return self._last_errordef _log_error(self, operation: str, ret: int) -> None:"""记录错误日志"""self._last_error = f"{operation}失败,错误码: 0x{ret:x}"print(self._last_error)def enumerate_devices(self) -> Optional[List[dict]]:"""枚举所有可用设备"""try:# 设置要枚举的设备类型tlayer_type = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE# 初始化设备列表结构体device_list = MV_CC_DEVICE_INFO_LIST()memset(byref(device_list), 0, sizeof(device_list))# 创建临时相机实例用于枚举temp_cam = MvCamera()# 枚举设备ret = temp_cam.MV_CC_EnumDevices(tlayer_type, device_list)if ret != 0:self._log_error("枚举设备", ret)return None# 检查找到的设备数量if device_list.nDeviceNum == 0:print("未检测到任何相机设备")return []devices = []for i in range(device_list.nDeviceNum):# 获取设备信息指针device_info = cast(device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents# 根据传输层类型处理设备信息if device_info.nTLayerType == MV_GIGE_DEVICE:# GigE设备device_data = {'model': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chModelName).decode('utf-8'),'serial': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8'),'ip': ".".join(map(str, device_info.SpecialInfo.stGigEInfo.nCurrentIp)),'type': 'GigE','index': i}elif device_info.nTLayerType == MV_USB_DEVICE:# USB设备# 修正USB设备信息获取方式usb_info = device_info.SpecialInfo.stUsb3VInfo# 使用ctypes的string_at函数获取字符串model_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')device_data = {'model': model_name.strip('\x00'),'serial': serial_num.strip('\x00'),'type': 'USB','index': i}else:continuedevices.append(device_data)return devicesexcept Exception as e:self._last_error = f"枚举设备时发生异常: {str(e)}"print(self._last_error)import tracebacktraceback.print_exc()  # 打印完整的错误堆栈return Nonedef connect_camera(self, device_index: int) -> bool:"""连接指定索引的相机设备"""try:with self._lock:if device_index in self.cameras and self.cameras[device_index]['connected']:print(f"相机 {device_index} 已连接")return True# 枚举设备tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()memset(byref(deviceList), 0, sizeof(deviceList))# 实例化相机cam = MvCamera()# 枚举设备ret = cam.MV_CC_EnumDevices(tlayerType, deviceList)if ret != 0:self._log_error("枚举设备", ret)return Falseif deviceList.nDeviceNum == 0:self._last_error = "未找到任何设备"print(self._last_error)return Falseif device_index >= deviceList.nDeviceNum:self._last_error = f"设备索引超出范围,最大可用索引: {deviceList.nDeviceNum - 1}"print(self._last_error)return False# 选择指定设备stDeviceList = cast(deviceList.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents# 创建句柄ret = cam.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != MV_OK:self._log_error("创建句柄", ret)return False# 获取设备信息if stDeviceList.nTLayerType == MV_GIGE_DEVICE:model_name = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chModelName).decode('utf-8')serial_num = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8')ip_addr = ".".join(map(str, stDeviceList.SpecialInfo.stGigEInfo.nCurrentIp))device_type = 'GigE'print(f"正在连接设备 {device_index}: {model_name} (SN: {serial_num}, IP: {ip_addr}。GiGe)")else:usb_info = stDeviceList.SpecialInfo.stUsb3VInfomodel_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')ip_addr = Nonedevice_type = 'USB'print(f"正在连接设备 {device_index}: {model_name} (SN: {serial_num}, USB-3.0)")# 打开相机ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != MV_OK:# 特别处理USB相机连接问题if stDeviceList.nTLayerType == MV_USB_DEVICE:# 尝试设置USB传输大小(海康USB相机常见问题)ret = cam.MV_CC_SetIntValue("TransferSize", 0x100000)if ret == MV_OK:ret = cam.MV_CC_SetIntValue("NumTransferBuffers", 8)if ret == MV_OK:ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:self._log_error("打开设备", ret)return False# 配置相机参数if not self._configure_camera(cam):cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 开始取流ret = cam.MV_CC_StartGrabbing()if ret != 0:self._log_error("开始取流", ret)cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 存储相机信息 - 确保所有必要字段都正确设置self.cameras[device_index] = {'handle': cam,'model': model_name.strip('\x00') if isinstance(model_name, str) else model_name,'serial': serial_num.strip('\x00') if isinstance(serial_num, str) else serial_num,'type': device_type,'ip': ip_addr,'connected': True,  # 确保连接状态正确设置为True'frame_count': 0,'last_frame_time': time.time()}# 初始化FPS计数器self._fps[device_index] = 0print(f"相机 {device_index} 连接成功: {model_name} (SN: {serial_num})")return Trueexcept Exception as e:self._last_error = f"连接相机时发生异常: {str(e)}"print(self._last_error)if 'cam' in locals():cam.MV_CC_DestroyHandle()return Falsedef _configure_camera(self, cam: MvCamera) -> bool:"""配置相机参数"""try:# 设置触发方式为连续采集ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)if ret != 0:self._log_error("设置触发模式", ret)return False# 设置曝光模式match EXPOSURE_MODE:case 0:  # 手动设置参数ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF)if ret != 0:print("警告: 关闭自动曝光设置失败,将采用自动曝光")# 设置曝光时间exposure = float(EXPOSURE_TIME)ret = cam.MV_CC_SetFloatValue("ExposureTime", exposure)if ret != 0:raise RuntimeError(f"Set ExposureTime failed with error {ret}")case 1:  # 一次曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_ONCE)if ret != 0:print("警告: 一次曝光设置失败,将继续使用手动曝光")case 2:  # 自动曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)if ret != 0:print("警告: 自动曝光设置失败,将继续使用手动曝光")# 设置增益ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_OFF)if ret != 0:print("警告: 手动增益设置失败,将采用自动增益")gain_val = float(GAIN_VALUE)ret = cam.MV_CC_SetFloatValue("Gain", gain_val)if ret != 0:raise RuntimeError(f"Set gain failed with error {ret}")# 设置水平翻转flip = c_int(1 if ReverseX_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseX", flip)if ret != 0:raise RuntimeError(f"Set horizontal flip failed with error {ret}")print(f"Horizontal flip {'enabled' if ReverseX_enable else 'disabled'}")# 设置垂直翻转flip = c_int(1 if ReverseY_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseY", flip)if ret != 0:raise RuntimeError(f"Set vertical flip failed with error {ret}")print(f"Vertical flip {'enabled' if ReverseY_enable else 'disabled'}")return Trueexcept Exception as e:self._last_error = f"配置相机时发生异常: {str(e)}"print(self._last_error)return Falsedef get_image(self, device_index: int, timeout: int = 300) -> Optional[Tuple[np.ndarray, np.ndarray]]:"""获取指定相机的图像并返回原始图像和灰度图像"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:self._last_error = f"相机 {device_index} 未连接"print(self._last_error)return Nonecam = self.cameras[device_index]['handle']try:# 初始化帧输出结构stOutFrame = MV_FRAME_OUT()memset(byref(stOutFrame), 0, sizeof(stOutFrame))# 获取图像ret = cam.MV_CC_GetImageBuffer(stOutFrame, timeout)if ret != 0:self._log_error(f"相机 {device_index} 获取图像", ret)return None# 获取图像信息frame_info = stOutFrame.stFrameInfonPayloadSize = frame_info.nFrameLenpData = stOutFrame.pBufAddr# 打印调试信息# print(f"相机 {device_index} 图像信息: "#       f"Width={frame_info.nWidth}, Height={frame_info.nHeight}, "#       f"PixelType={frame_info.enPixelType}, Size={nPayloadSize}")# 复制图像数据data_buf = (c_ubyte * nPayloadSize)()cdll.msvcrt.memcpy(byref(data_buf), pData, nPayloadSize)# 转换为numpy数组temp = np.frombuffer(data_buf, dtype=np.uint8)# 获取图像参数width = frame_info.nWidthheight = frame_info.nHeightpixel_type = frame_info.enPixelType# 根据像素类型处理图像img = self._process_image_data(temp, width, height, pixel_type)if img is None:if PacketSizeLog:print(f"相机 {device_index} 图像处理失败 - 数据大小: {len(temp)}, "f"预期大小: {width * height * (3 if pixel_type in [PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed] else 1)}")cam.MV_CC_FreeImageBuffer(stOutFrame)return None# 转换为灰度图像if len(img.shape) == 2:  # 已经是灰度图像gray = img.copy()else:gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 释放图像缓存cam.MV_CC_FreeImageBuffer(stOutFrame)# 更新帧统计信息self.cameras[device_index]['frame_count'] += 1self.cameras[device_index]['last_frame_time'] = time.time()return img, grayexcept Exception as e:self._last_error = f"相机 {device_index} 获取图像时发生异常: {str(e)}"print(self._last_error)if 'stOutFrame' in locals():cam.MV_CC_FreeImageBuffer(stOutFrame)return Nonedef _process_image_data(self, data: np.ndarray, width: int, height: int, pixel_type: int) -> Optional[np.ndarray]:"""根据像素类型处理原始图像数据"""try:if PacketSizeLog:# 首先检查数据大小是否匹配预期expected_size = width * heightif pixel_type in [PixelType_Gvsp_Mono8, PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed]:if pixel_type == PixelType_Gvsp_Mono8:expected_size = width * heightelse:expected_size = width * height * 3if len(data) != expected_size:print(f"警告: 数据大小不匹配 (预期: {expected_size}, 实际: {len(data)}), 尝试自动处理")# 尝试自动计算正确的高度if pixel_type == PixelType_Gvsp_Mono8:actual_height = len(data) // widthif actual_height * width == len(data):return data.reshape((actual_height, width))else:actual_height = len(data) // (width * 3)if actual_height * width * 3 == len(data):img = data.reshape((actual_height, width, 3))if pixel_type == PixelType_Gvsp_RGB8_Packed:return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)return imgreturn None# 正常处理流程if pixel_type == PixelType_Gvsp_Mono8:return data.reshape((height, width))elif pixel_type == PixelType_Gvsp_RGB8_Packed:img = data.reshape((height, width, 3))return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)elif pixel_type == PixelType_Gvsp_BGR8_Packed:return data.reshape((height, width, 3))elif pixel_type in [PixelType_Gvsp_Mono10, PixelType_Gvsp_Mono12]:# 对于10位或12位图像,需要进行位转换img = data.view(np.uint16)img = (img >> (pixel_type - PixelType_Gvsp_Mono8)).astype(np.uint8)return img.reshape((height, width))else:self._last_error = f"不支持的像素格式: {pixel_type}"print(self._last_error)return Noneexcept Exception as e:self._last_error = f"图像处理错误: {str(e)}"if PacketSizeLog:print(self._last_error)return Nonedef disconnect_camera(self, device_index: int) -> bool:"""断开指定相机的连接"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相机 {device_index} 未连接")return Truecam = self.cameras[device_index]['handle']try:success = True# 停止取流ret = cam.MV_CC_StopGrabbing()if ret != 0:self._log_error(f"相机 {device_index} 停止取流", ret)success = False# 关闭设备ret = cam.MV_CC_CloseDevice()if ret != 0:self._log_error(f"相机 {device_index} 关闭设备", ret)success = False# 销毁句柄ret = cam.MV_CC_DestroyHandle()if ret != 0:self._log_error(f"相机 {device_index} 销毁句柄", ret)success = Falseif success:print(f"相机 {device_index} 已成功断开连接")self.cameras[device_index]['connected'] = False# 从字典中移除相机del self.cameras[device_index]# 停止显示线程if device_index in self._running:self._running[device_index] = Falsereturn successexcept Exception as e:self._last_error = f"断开相机 {device_index} 连接时发生异常: {str(e)}"print(self._last_error)return False
###########################图像视频流显示部分################################################def start_display(self,device_index: int) -> bool:"""启动所有已连接相机的实时显示"""with self._lock:  # 添加线程锁# 检查相机是否已连接if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相机 {device_index} 未连接,无法启动显示")return Falseif device_index in self._running and self._running[device_index]:print(f"相机 {device_index} 显示已启动")return True# 设置运行标志self._running[device_index] = True# 创建并启动显示线程display_thread = threading.Thread(target=self._display_thread,args=(device_index,),daemon=True)self._display_threads[device_index] = display_threaddisplay_thread.start()print(f"相机 {device_index} 显示线程已启动")return Truedef stop_display(self, device_index: int = None) -> None:"""停止指定相机的显示或停止所有相机显示"""if device_index is None:# 停止所有显示for idx in list(self._running.keys()):self._running[idx] = Falsefor idx, thread in self._display_threads.items():if thread.is_alive():thread.join()self._display_threads.clear()cv2.destroyAllWindows()else:# 停止指定相机显示if device_index in self._running:self._running[device_index] = Falseif device_index in self._display_threads:if self._display_threads[device_index].is_alive():self._display_threads[device_index].join()del self._display_threads[device_index]cv2.destroyWindow(f"Camera {device_index}")def _display_thread(self, device_index: int) -> None:"""单个相机的显示线程"""frame_count = 0last_time = time.time()window_name = f"Camera {device_index}"def _window_exists(window_name):"""检查OpenCV窗口是否存在"""try:return cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) >= 0except:return Falsetry:while self._running.get(device_index, False):try:# 获取图像result = self.get_image(device_index)if result is None:if PacketSizeLog:print(f"相机 {device_index} 获取图像超时")time.sleep(0.1)continueimg, _ = result# 计算FPSframe_count += 1current_time = time.time()if current_time - last_time >= 1.0:self._fps[device_index] = frame_count / (current_time - last_time)frame_count = 0last_time = current_time# 在图像上显示信息info = f"Cam {device_index} | {self.cameras[device_index]['model']} | FPS: {self._fps[device_index]:.1f}"cv2.putText(img, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)resized_image_by_scale = cv2.resize(img, None, fx=scale_width, fy=scale_height)# 显示图像cv2.imshow(window_name, resized_image_by_scale)# 检查按键key = cv2.waitKey(1) & 0xFFif key == 27:  # ESC键退出self._running[device_index] = Falsebreakexcept Exception as e:print(f"相机 {device_index} 显示线程异常: {str(e)}")time.sleep(0.1)finally:# 线程结束时清理if _window_exists(window_name):cv2.destroyWindow(window_name)print(f"相机 {device_index} 显示线程已停止")def disconnect_all(self) -> None:"""断开所有相机的连接"""self.stop_display()  # 先停止所有显示for device_index in list(self.cameras.keys()):if self.cameras[device_index]['connected']:self.disconnect_camera(device_index)def __del__(self):"""析构函数,确保资源释放"""self.disconnect_all()def main():# 创建相机管理器cam_manager = HKCameraManager()# 枚举设备devices = cam_manager.enumerate_devices()if not devices:print("未找到任何相机设备")returnprint("找到以下相机设备:")for i, dev in enumerate(devices):# 根据设备类型显示不同信息if dev['type'] == 'GigE':print(f"{i}: {dev['model']} (SN: {dev['serial']}, IP: {dev['ip']})")else:  # USB设备print(f"{i}: {dev['model']} (SN: {dev['serial']}, Type: USB)")# 先连接所有相机for i in range(len(devices)):if not cam_manager.connect_camera(i):print(f"无法连接相机 {i}")continue  # 即使一个相机连接失败,也继续尝试其他相机# 确认连接状态后再启动显示for i in range(len(devices)):if i in cam_manager.cameras and cam_manager.cameras[i]['connected']:cam_manager.start_display(i)try:# 主线程等待while any(cam_manager._running.values()):time.sleep(0.1)except KeyboardInterrupt:print("用户中断...")finally:# 清理资源cam_manager.disconnect_all()print("程序退出")if __name__ == "__main__":main()

5. 写在最后

目前程序还有一些未增加的功能,后续会增加补充

  1. 相机参数动态调整功能
  2. 图像保存功能
  3. 支持更多像素格式
  4. 网络相机重连机制
  5. 日志系统替代print输出

相关文章:

  • 美团NoCode设计网站的尝试经验分享
  • 打卡第42天:简单CNN
  • 游戏日志统计操作次数前三的用户
  • Linux日志分割压缩实战指南
  • 手写RPC框架<四> 负载均衡
  • 不同厂商保障UEFI/BIOS安全的技术与机制详解
  • 界面控件DevExpress WPF v24.2新版亮点:报表等组件功能升级
  • thinkphp 一个系统在同一个域名下,一个文件夹下如何区分多站点——穷人的精致规划——仙盟创梦IDE
  • MyBatis实战指南(六)自动映射
  • 债券与股票:投资市场的两大基石
  • 用 OpenSSL 库实现 3DES(三重DES)加密
  • SSL错误无法建立安全连接
  • 三数之和-力扣
  • Koji构建系统宏定义注入与Tag体系解析
  • Bright Data网页抓取工具实战:BOSS直聘爬虫 + PandasAI分析洞察前端岗位市场趋势
  • 西安java面经1
  • Node.js Conf 配置库要点分析 和 使用注意事项
  • 云原生安全实践:CI/CD流水线集成DAST工具
  • 【Lua热更新知识】学习一 Lua语法学习
  • Delphi 获取 XP系统 mac地址
  • 偃师做网站/免费换友情链接
  • 网站pc端和手机端分离怎么做/排名公式
  • 300m空间够用吗 wordpress/百度竞价优化软件
  • python做网站显示表格/网络营销推广专家
  • 郑州 网站建设 东区/网店网络推广方案
  • 办公家具网站模版/中国足彩网竞彩推荐