python共享内存实际案例,传输opencv frame
主进程程序
send.py
import cv2
import numpy as np
from multiprocessing import shared_memory, resource_trackercap = cv2.VideoCapture(0)
if not cap.isOpened():print("无法打开 RTSP 流,请检查地址、网络连接或 GStreamer 配置。")
else:# 创建共享内存shm_name = 'shared_frame'frame_shape = (360, 640, 3) # 假设视频帧的大小为 480x640shm = shared_memory.SharedMemory(create=True, size=np.prod(frame_shape) * np.dtype(np.uint8).itemsize, name=shm_name)frame_buffer = np.ndarray(frame_shape, dtype=np.uint8, buffer=shm.buf)try:while True:ret, frame = cap.read()if ret:# 将frame写入共享内存frame = cv2.resize(frame, (640, 360))np.copyto(frame_buffer, frame)cv2.namedWindow("RTSP Stream", cv2.WINDOW_NORMAL)cv2.imshow("RTSP Stream", frame)if cv2.waitKey(1) & 0xFF == ord('q'):breakelse:print("无法读取视频帧")breakfinally:cap.release()cv2.destroyAllWindows()shm.close()shm.unlink()
receive.py
import cv2
import numpy as np
from multiprocessing import shared_memory, resource_tracker# 共享内存的名称
shm_name = 'shared_frame'
frame_shape = (360, 640, 3) # 假设视频帧的大小为 480x640# 连接到共享内存
shm = shared_memory.SharedMemory(name=shm_name)
# 注销资源跟踪器,避免receive异常终止,关闭共享内存
resource_tracker.unregister(shm._name, 'shared_memory')
frame_buffer = np.ndarray(frame_shape, dtype=np.uint8, buffer=shm.buf)try:while True:# 从共享内存中读取frameframe = np.copy(frame_buffer)cv2.namedWindow("Shared Memory Frame", cv2.WINDOW_NORMAL)cv2.imshow("Shared Memory Frame", frame)if cv2.waitKey(1) & 0xFF == ord('q'):break
finally:cv2.destroyAllWindows()shm.close()
终端1启动
send.py
终端2启动 receive.py
ps:使用共享内存的方式,可以实现容器内和容器外程序的通讯,且延迟极低。通过该方式可以实现容器内做推理,容器外做GUI显示