将你的旧手机变成监控摄像头(Python + OpenCV)
目录
- 将你的旧手机变成监控摄像头(Python + OpenCV)
- 1. 引言
- 2. 系统架构与工作原理
- 2.1 整体系统架构
- 2.2 视频流传输原理
- 3. 环境配置与依赖安装
- 3.1 手机端配置
- 3.1.1 Android手机配置
- 3.1.2 iPhone手机配置
- 3.2 PC端环境配置
- 3.2.1 安装Python环境
- 3.2.2 验证安装
- 4. 基础视频流捕获
- 4.1 简单的视频流捕获程序
- 4.2 支持多种流格式的增强版捕获器
- 5. 运动检测功能实现
- 5.1 基于帧差分的运动检测
- 6. 高级功能:人脸识别与物体检测
- 6.1 人脸识别集成
- 7. 完整监控系统实现
- 8. 性能优化与故障排除
- 8.1 性能优化技巧
- 8.2 常见问题与解决方案
- 9. 安全与隐私考虑
- 10. 扩展功能与未来改进
- 10.1 可能的扩展功能
- 10.2 技术改进方向
- 11. 总结
『宝藏代码胶囊开张啦!』—— 我的 CodeCapsule 来咯!✨写代码不再头疼!我的新站点 CodeCapsule 主打一个 “白菜价”+“量身定制”!无论是卡脖子的毕设/课设/文献复现,需要灵光一现的算法改进,还是想给项目加个“外挂”,这里都有便宜又好用的代码方案等你发现!低成本,高适配,助你轻松通关!速来围观 👉 CodeCapsule官网
将你的旧手机变成监控摄像头(Python + OpenCV)
1. 引言
在科技快速发展的今天,我们手中往往堆积着不少被淘汰的旧手机。这些设备虽然无法跟上最新款手机的性能,但它们仍然具备完好的摄像头、处理器和网络连接功能。据统计,全球每年有超过1.5亿部智能手机被闲置或丢弃,这不仅是资源的浪费,也对环境造成了压力。
与此同时,家庭和办公场所对安防监控的需求日益增长。传统的监控摄像头价格昂贵,安装复杂,而且可能存在隐私泄露的风险。利用旧手机搭建监控系统,不仅成本低廉,还能充分发挥闲置设备的剩余价值。
本文将详细介绍如何使用Python和OpenCV将旧手机改造成功能完善的监控摄像头,实现实时监控、运动检测、人脸识别、自动录像等高级功能。这个方案具有以下优势:
- 成本极低:利用闲置设备,无需额外硬件投资
- 灵活性强:可根据需求自定义各种监控功能
- 隐私安全:数据存储在本地,避免云端隐私泄露风险
- 易于扩展:基于Python生态系统,方便添加新功能
2. 系统架构与工作原理
2.1 整体系统架构
整个监控系统由三个主要部分组成:手机端视频流服务器、PC端处理程序、以及可选的云端通知服务。
2.2 视频流传输原理
手机摄像头视频流通过IP摄像头应用转换成RTSP或HTTP流,PC端使用OpenCV捕获这些视频流并进行处理。整个过程基于客户端-服务器架构:
- 手机端:运行IP摄像头应用,将摄像头数据编码为H.264/H.265格式
- 网络传输:通过WiFi传输视频流数据
- PC端:接收并解码视频流,应用计算机视觉算法进行分析
视频流的传输可以使用以下公式表示:
Video Stream=∑t=0T(Encode(Framet)+Transmit(Packett)+Decode(Datat))\text{Video Stream} = \sum_{t=0}^{T} \left( \text{Encode}( \text{Frame}_t ) + \text{Transmit}( \text{Packet}_t ) + \text{Decode}( \text{Data}_t ) \right) Video Stream=t=0∑T(Encode(Framet)+Transmit(Packett)+Decode(Datat))
其中每个帧的处理时间为:
tprocessing=tcapture+tprocess+tdisplayt_{\text{processing}} = t_{\text{capture}} + t_{\text{process}} + t_{\text{display}} tprocessing=tcapture+tprocess+tdisplay
3. 环境配置与依赖安装
3.1 手机端配置
3.1.1 Android手机配置
-
安装IP摄像头应用
- 推荐应用:IP Webcam(免费,功能丰富)
- 替代方案:DroidCam、Alfred Camera
-
配置步骤:
- 下载并安装IP Webcam应用
- 打开应用,向下滚动到"服务器"部分
- 点击"启动服务器"按钮
- 记下显示的IP地址和端口号(通常是
http://192.168.x.x:8080)
-
高级设置:
- 视频质量:建议设置为720p以平衡质量与性能
- 帧率:15-30fps
- 音频:根据需要开启或关闭
3.1.2 iPhone手机配置
- 安装应用:使用iVCam或EpocCam
- 确保手机和电脑在同一WiFi网络下
- 启动应用并记下连接信息
3.2 PC端环境配置
3.2.1 安装Python环境
# 创建虚拟环境(推荐)
python -m venv surveillance_env
source surveillance_env/bin/activate # Linux/Mac
# 或
surveillance_env\Scripts\activate # Windows# 安装核心依赖
pip install opencv-python
pip install numpy
pip install pillow
pip install requests
pip install smtplib # 用于邮件通知
pip install twilio # 用于短信通知(可选)
3.2.2 验证安装
创建测试脚本验证环境配置:
# test_environment.py
import cv2
import numpy as np
import sysdef test_environment():"""测试环境配置是否正常"""print("Python版本:", sys.version)print("OpenCV版本:", cv2.__version__)print("NumPy版本:", np.__version__)# 测试OpenCV基本功能try:# 创建测试图像test_image = np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8)# 测试图像处理gray = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY)blurred = cv2.GaussianBlur(gray, (5, 5), 0)print("✓ OpenCV图像处理功能正常")print("✓ NumPy数组操作正常")print("环境测试通过!")except Exception as e:print(f"环境测试失败: {e}")if __name__ == "__main__":test_environment()
4. 基础视频流捕获
4.1 简单的视频流捕获程序
让我们从最基本的视频流捕获开始,这是一个验证连接和基础功能的关键步骤。
# basic_stream.py
import cv2
import numpy as np
import timeclass BasicCameraStream:"""基础摄像头流捕获类"""def __init__(self, stream_url):"""初始化摄像头流参数:stream_url (str): 视频流URL"""self.stream_url = stream_urlself.cap = Noneself.is_connected = Falsedef connect(self, timeout=30):"""连接到视频流参数:timeout (int): 连接超时时间(秒)返回:bool: 连接是否成功"""print(f"尝试连接到: {self.stream_url}")self.cap = cv2.VideoCapture(self.stream_url)start_time = time.time()while not self.is_connected and (time.time() - start_time) < timeout:ret, frame = self.cap.read()if ret and frame is not None:self.is_connected = Trueprint("连接成功!")breaktime.sleep(0.1)return self.is_connecteddef read_frame(self):"""读取一帧图像返回:tuple: (success, frame)"""if not self.is_connected:return False, Noneret, frame = self.cap.read()return ret, framedef display_stream(self, window_name="监控画面"):"""显示实时视频流参数:window_name (str): 窗口名称"""if not self.connect():print("连接失败,请检查URL和网络连接")returnprint("按 'q' 键退出显示")frame_count = 0start_time = time.time()while True:ret, frame = self.read_frame()if not ret:print("读取帧失败")break# 计算并显示FPSframe_count += 1elapsed_time = time.time() - start_timeif elapsed_time > 0:fps = frame_count / elapsed_timecv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)# 显示帧cv2.imshow(window_name, frame)# 按'q'退出if cv2.waitKey(1) & 0xFF == ord('q'):breakself.release()cv2.destroyAllWindows()def release(self):"""释放资源"""if self.cap is not None:self.cap.release()self.is_connected = False# 使用示例
if __name__ == "__main__":# 常见的视频流URL格式stream_urls = [# IP Webcam默认URL"http://192.168.1.100:8080/video",# MJPEG流"http://192.168.1.100:8080/videofeed",# RTSP流(某些应用使用)"rtsp://192.168.1.100:8080/h264_ulaw.sdp"]# 替换为你的手机IP和端口your_phone_ip = "192.168.1.100" # 修改为实际IPstream_url = f"http://{your_phone_ip}:8080/video"stream = BasicCameraStream(stream_url)stream.display_stream()
4.2 支持多种流格式的增强版捕获器
# enhanced_stream.py
import cv2
import time
import threading
from queue import Queue
import urllib.request
import urllib.errorclass EnhancedCameraStream:"""增强版摄像头流捕获类,支持多种协议和自动重连"""def __init__(self, stream_url, buffer_size=128, timeout=10):"""初始化增强摄像头流参数:stream_url (str): 视频流URLbuffer_size (int): 帧缓冲区大小timeout (int): 连接超时时间(秒)"""self.stream_url = stream_urlself.buffer_size = buffer_sizeself.timeout = timeoutself.frame_queue = Queue(maxsize=buffer_size)self.running = Falseself.thread = Noneself.current_frame = Noneself.frame_count = 0self.last_frame_time = 0self.fps = 0def start(self):"""开始捕获视频流"""if self.running:print("视频流已经在运行中")returnself.running = Trueself.thread = threading.Thread(target=self._capture_frames)self.thread.daemon = Trueself.thread.start()print("视频流捕获已启动")def stop(self):"""停止捕获视频流"""self.running = Falseif self.thread is not None:self.thread.join(timeout=5)print("视频流捕获已停止")def _capture_frames(self):"""在单独线程中捕获帧"""cap = cv2.VideoCapture(self.stream_url)# 设置缓冲大小以减少延迟cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)last_success_time = time.time()while self.running:ret, frame = cap.read()if ret and frame is not None:# 更新成功时间last_success_time = time.time()# 计算FPScurrent_time = time.time()if self.last_frame_time > 0:self.fps = 1.0 / (current_time - self.last_frame_time)self.last_frame_time = current_time# 更新当前帧self.current_frame = frame.copy()self.frame_count += 1# 将帧放入队列(如果队列已满,移除最旧的帧)if self.frame_queue.full():try:self.frame_queue.get_nowait()except:passself.frame_queue.put(frame)else:# 检查是否需要重连if time.time() - last_success_time > self.timeout:print("视频流中断,尝试重新连接...")cap.release()time.sleep(2)cap = cv2.VideoCapture(self.stream_url)last_success_time = time.time()time.sleep(0.001) # 小延迟以避免过度占用CPUcap.release()def read(self):"""读取当前帧返回:tuple: (success, frame)"""if self.current_frame is None:return False, Nonereturn True, self.current_frame.copy()def get_frame_from_queue(self, timeout=1.0):"""从队列获取帧参数:timeout (float): 超时时间返回:frame or None: 获取到的帧"""try:return self.frame_queue.get(timeout=timeout)except:return Nonedef is_connected(self):"""检查是否连接成功"""return self.current_frame is not None and self.fps > 0def get_status(self):"""获取流状态"""return {'connected': self.is_connected(),'fps': self.fps,'frame_count': self.frame_count,'queue_size': self.frame_queue.qsize()}def test_connection(ip_address, port=8080):"""测试与手机摄像头的连接参数:ip_address (str): 手机IP地址port (int): 端口号返回:bool: 连接是否成功"""test_urls = [f"http://{ip_address}:{port}/video",f"http://{ip_address}:{port}/videofeed", f"http://{ip_address}:{port}"]for url in test_urls:try:response = urllib.request.urlopen(url, timeout=5)if response.getcode() == 200:print(f"✓ 连接成功: {url}")return urlexcept urllib.error.URLError:continueexcept Exception as e:continueprint("✗ 所有连接尝试都失败")return None# 使用示例
if __name__ == "__main__":# 测试连接phone_ip = "192.168.1.100" # 替换为实际IPworking_url = test_connection(phone_ip)if working_url:stream = EnhancedCameraStream(working_url)stream.start()# 等待连接建立time.sleep(3)try:while True:success, frame = stream.read()if success:status = stream.get_status()cv2.putText(frame, f"FPS: {status['fps']:.1f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)cv2.putText(frame, f"Frames: {status['frame_count']}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)cv2.imshow("Enhanced Stream", frame)if cv2.waitKey(1) & 0xFF == ord('q'):breakfinally:stream.stop()cv2.destroyAllWindows()
5. 运动检测功能实现
运动检测是监控系统的核心功能,它能够在画面发生变化时自动触发录像或报警。
5.1 基于帧差分的运动检测
# motion_detector.py
import cv2
import numpy as np
import time
from datetime import datetime
import osclass MotionDetector:"""运动检测器类"""def __init__(self, min_area=500, threshold=25, blur_kernel=(5, 5)):"""初始化运动检测器参数:min_area (int): 最小运动区域面积(像素)threshold (int): 二值化阈值blur_kernel (tuple): 高斯模糊核大小"""self.min_area = min_areaself.threshold = thresholdself.blur_kernel = blur_kernel# 状态变量self.previous_frame = Noneself.motion_detected = Falseself.motion_start_time = Noneself.motion_counter = 0def preprocess_frame(self, frame):"""预处理帧参数:frame: 输入帧返回:处理后的灰度帧"""# 转换为灰度图gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)# 应用高斯模糊减少噪声blurred = cv2.GaussianBlur(gray, self.blur_kernel, 0)return blurreddef detect_motion(self, current_frame):"""检测运动参数:current_frame: 当前帧返回:tuple: (has_motion, contours, processed_frame)"""# 预处理当前帧processed_frame = self.preprocess_frame(current_frame)# 如果没有前一帧,初始化并返回无运动if self.previous_frame is None:self.previous_frame = processed_framereturn False, [], processed_frame# 计算当前帧与前一帧的绝对差frame_delta = cv2.absdiff(self.previous_frame, processed_frame)# 二值化差分图像thresh = cv2.threshold(frame_delta, self.threshold, 255, cv2.THRESH_BINARY)[1]# 膨胀二值图像以填充孔洞thresh = cv2.dilate(thresh, None, iterations=2)# 查找轮廓contours, _ = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)# 过滤小轮廓significant_contours = []motion_detected = Falsefor contour in contours:if cv2.contourArea(contour) > self.min_area:significant_contours.append(contour)motion_detected = True# 更新前一帧self.previous_frame = processed_frame# 更新运动状态self.update_motion_status(motion_detected)return motion_detected, significant_contours, processed_framedef update_motion_status(self, current_motion):"""更新运动状态计数器"""if current_motion:self.motion_counter += 1if self.motion_counter >= 3: # 连续3帧检测到运动才确认self.motion_detected = Trueif self.motion_start_time is None:self.motion_start_time = time.time()else:self.motion_counter = max(0, self.motion_counter - 1)if self.motion_counter == 0:self.motion_detected = Falseself.motion_start_time = Nonedef draw_motion_areas(self, frame, contours):"""在帧上绘制运动区域参数:frame: 原始帧contours: 运动轮廓列表返回:绘制了运动区域的帧"""output_frame = frame.copy()for contour in contours:# 计算边界框(x, y, w, h) = cv2.boundingRect(contour)# 绘制边界框cv2.rectangle(output_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)# 添加标签cv2.putText(output_frame, "Motion", (x, y - 10),cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)return output_framedef get_motion_duration(self):"""获取运动持续时间(秒)"""if self.motion_start_time is not None:return time.time() - self.motion_start_timereturn 0class MotionRecordingSystem:"""运动触发的录像系统"""def __init__(self, output_dir="recordings", pre_motion_buffer=30, post_motion_buffer=30):"""初始化录像系统参数:output_dir (str): 录像保存目录pre_motion_buffer (int): 运动前缓冲帧数post_motion_buffer (int): 运动后缓冲帧数"""self.output_dir = output_dirself.pre_motion_buffer = pre_motion_bufferself.post_motion_buffer = post_motion_buffer# 创建输出目录os.makedirs(output_dir, exist_ok=True)# 状态变量self.is_recording = Falseself.frame_buffer = []self.video_writer = Noneself.recording_start_time = Nonedef start_recording(self, frame, fps=20.0):"""开始录像"""if self.is_recording:return# 生成文件名timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")filename = os.path.join(self.output_dir, f"motion_{timestamp}.avi")# 获取帧尺寸height, width = frame.shape[:2]# 初始化视频写入器fourcc = cv2.VideoWriter_fourcc(*'XVID')self.video_writer = cv2.VideoWriter(filename, fourcc, fps, (width, height))# 写入缓冲帧for buffered_frame in self.frame_buffer:self.video_writer.write(buffered_frame)self.is_recording = Trueself.recording_start_time = time.time()print(f"开始录像: {filename}")def stop_recording(self):"""停止录像"""if self.is_recording and self.video_writer is not None:self.video_writer.release()self.video_writer = Noneself.is_recording = Falseduration = time.time() - self.recording_start_timeprint(f"停止录像,时长: {duration:.2f}秒")def process_frame(self, frame, motion_detected):"""处理帧并管理录像参数:frame: 当前帧motion_detected (bool): 是否检测到运动"""# 维护帧缓冲区self.frame_buffer.append(frame.copy())if len(self.frame_buffer) > self.pre_motion_buffer:self.frame_buffer.pop(0)# 录像逻辑if motion_detected:if not self.is_recording:self.start_recording(frame)# 写入当前帧if self.video_writer is not None:self.video_writer.write(frame)elif self.is_recording:# 运动结束,检查是否需要停止录像if len(self.frame_buffer) >= self.post_motion_buffer:self.stop_recording()# 运动检测演示
def demo_motion_detection(stream_url):"""运动检测演示函数"""# 初始化组件stream = EnhancedCameraStream(stream_url)detector = MotionDetector(min_area=1000)recorder = MotionRecordingSystem()stream.start()print("运动检测系统启动")print("按 'q' 退出,按 'r' 重置背景帧")try:while True:success, frame = stream.read()if not success:time.sleep(0.1)continue# 检测运动motion_detected, contours, processed_frame = detector.detect_motion(frame)# 绘制运动区域if motion_detected:frame = detector.draw_motion_areas(frame, contours)# 显示运动信息duration = detector.get_motion_duration()cv2.putText(frame, f"Motion: {duration:.1f}s", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)# 处理录像recorder.process_frame(frame, motion_detected)# 显示状态信息status = "RECORDING" if recorder.is_recording else "Monitoring"cv2.putText(frame, f"Status: {status}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)# 显示帧cv2.imshow("Motion Detection", frame)# 键盘输入处理key = cv2.waitKey(1) & 0xFFif key == ord('q'):breakelif key == ord('r'):detector.previous_frame = Noneprint("背景帧已重置")finally:if recorder.is_recording:recorder.stop_recording()stream.stop()cv2.destroyAllWindows()if __name__ == "__main__":# 使用示例stream_url = "http://192.168.1.100:8080/video" # 替换为实际URLdemo_motion_detection(stream_url)
6. 高级功能:人脸识别与物体检测
6.1 人脸识别集成
# face_detection.py
import cv2
import numpy as np
import os
import timeclass FaceDetector:"""人脸检测器类"""def __init__(self, model_path=None, confidence_threshold=0.5):"""初始化人脸检测器参数:model_path (str): 模型文件路径confidence_threshold (float): 置信度阈值"""self.confidence_threshold = confidence_threshold# 加载人脸检测模型if model_path and os.path.exists(model_path):self.net = cv2.dnn.readNetFromTensorflow(model_path)self.model_loaded = Trueelse:# 使用OpenCV内置的Haar级联分类器self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')self.model_loaded = Falsedef detect_faces(self, frame):"""检测人脸参数:frame: 输入帧返回:list: 人脸边界框列表 [(x, y, w, h), ...]"""if self.model_loaded:return self._detect_faces_dnn(frame)else:return self._detect_faces_haar(frame)def _detect_faces_haar(self, frame):"""使用Haar级联检测人脸"""gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)# 检测人脸faces = self.face_cascade.detectMultiScale(gray,scaleFactor=1.1,minNeighbors=5,minSize=(30, 30),flags=cv2.CASCADE_SCALE_IMAGE)return facesdef _detect_faces_dnn(self, frame):"""使用DNN模型检测人脸"""(h, w) = frame.shape[:2]# 构建blobblob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0, (300, 300),(104.0, 177.0, 123.0))# 通过网络前向传播self.net.setInput(blob)detections = self.net.forward()faces = []# 遍历检测结果for i in range(0, detections.shape[2]):confidence = detections[0, 0, i, 2]# 过滤弱检测if confidence > self.confidence_threshold:# 计算边界框坐标box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])(startX, startY, endX, endY) = box.astype("int")# 确保边界框在图像尺寸内startX = max(0, startX)startY = max(0, startY)endX = min(w, endX)endY = min(h, endY)faces.append((startX, startY, endX - startX, endY - startY))return facesdef draw_faces(self, frame, faces):"""在帧上绘制人脸边界框参数:frame: 原始帧faces: 人脸边界框列表返回:绘制了人脸边界框的帧"""output_frame = frame.copy()for (x, y, w, h) in faces:# 绘制边界框cv2.rectangle(output_frame, (x, y), (x + w, y + h), (255, 0, 0), 2)# 添加标签cv2.putText(output_frame, "Face", (x, y - 10),cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)return output_frameclass AdvancedSurveillanceSystem:"""高级监控系统:集成运动检测和人脸识别"""def __init__(self, stream_url):"""初始化高级监控系统参数:stream_url (str): 视频流URL"""self.stream_url = stream_url# 初始化各个组件self.stream = EnhancedCameraStream(stream_url)self.motion_detector = MotionDetector(min_area=800)self.face_detector = FaceDetector()self.recorder = MotionRecordingSystem()# 统计信息self.stats = {'total_frames': 0,'motion_events': 0,'face_detections': 0,'start_time': time.time()}def start(self):"""启动监控系统"""self.stream.start()print("高级监控系统启动")try:while True:success, frame = self.stream.read()if not success:time.sleep(0.1)continueself.stats['total_frames'] += 1# 运动检测motion_detected, motion_contours, _ = self.motion_detector.detect_motion(frame)if motion_detected:self.stats['motion_events'] += 1frame = self.motion_detector.draw_motion_areas(frame, motion_contours)# 人脸检测(只在检测到运动时进行,以节省计算资源)faces = []if motion_detected:faces = self.face_detector.detect_faces(frame)if faces:self.stats['face_detections'] += 1frame = self.face_detector.draw_faces(frame, faces)# 录像管理self.recorder.process_frame(frame, motion_detected)# 显示统计信息frame = self._draw_statistics(frame)# 显示帧cv2.imshow("Advanced Surveillance", frame)# 键盘控制key = cv2.waitKey(1) & 0xFFif key == ord('q'):breakelif key == ord('r'):self.motion_detector.previous_frame = Noneprint("背景帧已重置")elif key == ord('s'):self._save_snapshot(frame)finally:self.stop()def _draw_statistics(self, frame):"""在帧上绘制统计信息"""# 计算运行时间run_time = time.time() - self.stats['start_time']fps = self.stats['total_frames'] / run_time if run_time > 0 else 0# 绘制统计信息stats_text = [f"FPS: {fps:.1f}",f"Motion Events: {self.stats['motion_events']}",f"Face Detections: {self.stats['face_detections']}",f"Run Time: {run_time:.0f}s"]for i, text in enumerate(stats_text):y_position = 30 + i * 25cv2.putText(frame, text, (10, y_position), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)# 录像状态status = "RECORDING" if self.recorder.is_recording else "MONITORING"color = (0, 0, 255) if self.recorder.is_recording else (0, 255, 0)cv2.putText(frame, f"Status: {status}", (frame.shape[1] - 200, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)return framedef _save_snapshot(self, frame):"""保存快照"""timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")filename = os.path.join(self.recorder.output_dir, f"snapshot_{timestamp}.jpg")cv2.imwrite(filename, frame)print(f"快照已保存: {filename}")def stop(self):"""停止监控系统"""if self.recorder.is_recording:self.recorder.stop_recording()self.stream.stop()cv2.destroyAllWindows()# 打印最终统计run_time = time.time() - self.stats['start_time']print(f"\n监控系统运行统计:")print(f"总运行时间: {run_time:.0f}秒")print(f"处理帧数: {self.stats['total_frames']}")print(f"运动事件: {self.stats['motion_events']}")print(f"人脸检测: {self.stats['face_detections']}")# 使用示例
if __name__ == "__main__":stream_url = "http://192.168.1.100:8080/video" # 替换为实际URLsystem = AdvancedSurveillanceSystem(stream_url)system.start()
7. 完整监控系统实现
下面是一个完整的监控系统实现,集成了所有功能并提供用户友好的界面。
# complete_surveillance_system.py
import cv2
import numpy as np
import time
import threading
import os
import json
from datetime import datetime
import smtplib
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart
from email.mime.base import MimeBase
from email import encodersclass CompleteSurveillanceSystem:"""完整的监控系统"""def __init__(self, config_file="config.json"):"""初始化完整监控系统参数:config_file (str): 配置文件路径"""self.load_config(config_file)self.initialize_components()self.running = Falsedef load_config(self, config_file):"""加载配置文件"""default_config = {"stream_url": "http://192.168.1.100:8080/video","output_dir": "surveillance_recordings","min_motion_area": 800,"motion_threshold": 25,"face_detection": True,"recording": {"pre_buffer": 30,"post_buffer": 30,"fps": 20},"notifications": {"enabled": False,"email": {"smtp_server": "smtp.gmail.com","smtp_port": 587,"username": "your_email@gmail.com","password": "your_password","to_email": "recipient@gmail.com"}},"display": {"show_fps": True,"show_stats": True,"window_width": 800,"window_height": 600}}if os.path.exists(config_file):with open(config_file, 'r') as f:self.config = json.load(f)print("配置文件加载成功")else:self.config = default_configself.save_config(config_file)print("使用默认配置,请编辑 config.json 文件")def save_config(self, config_file):"""保存配置文件"""with open(config_file, 'w') as f:json.dump(self.config, f, indent=4)def initialize_components(self):"""初始化所有组件"""# 创建输出目录os.makedirs(self.config["output_dir"], exist_ok=True)# 初始化视频流self.stream = EnhancedCameraStream(self.config["stream_url"])# 初始化运动检测器self.motion_detector = MotionDetector(min_area=self.config["min_motion_area"],threshold=self.config["motion_threshold"])# 初始化人脸检测器if self.config["face_detection"]:self.face_detector = FaceDetector()else:self.face_detector = None# 初始化录像系统self.recorder = MotionRecordingSystem(output_dir=self.config["output_dir"],pre_motion_buffer=self.config["recording"]["pre_buffer"],post_motion_buffer=self.config["recording"]["post_buffer"])# 统计信息self.stats = {'start_time': time.time(),'total_frames': 0,'motion_events': 0,'face_detections': 0,'recordings': 0}# 事件日志self.event_log = []def send_notification(self, event_type, details):"""发送通知参数:event_type (str): 事件类型details (str): 事件详情"""if not self.config["notifications"]["enabled"]:returntry:email_config = self.config["notifications"]["email"]# 创建邮件msg = MimeMultipart()msg['From'] = email_config["username"]msg['To'] = email_config["to_email"]msg['Subject'] = f"Surveillance Alert: {event_type}"body = f"""Surveillance System AlertEvent Type: {event_type}Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}Details: {details}This is an automated message from your surveillance system."""msg.attach(MimeText(body, 'plain'))# 发送邮件server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])server.starttls()server.login(email_config["username"], email_config["password"])server.send_message(msg)server.quit()print(f"通知已发送: {event_type}")except Exception as e:print(f"发送通知失败: {e}")def log_event(self, event_type, details):"""记录事件参数:event_type (str): 事件类型details (str): 事件详情"""event = {'timestamp': datetime.now().isoformat(),'type': event_type,'details': details}self.event_log.append(event)# 保存到文件log_file = os.path.join(self.config["output_dir"], "events.json")with open(log_file, 'w') as f:json.dump(self.event_log, f, indent=2)def process_frame(self, frame):"""处理单帧"""self.stats['total_frames'] += 1# 运动检测motion_detected, motion_contours, _ = self.motion_detector.detect_motion(frame)if motion_detected:self.stats['motion_events'] += 1frame = self.motion_detector.draw_motion_areas(frame, motion_contours)# 记录运动事件if self.stats['motion_events'] % 10 == 1: # 避免过多记录self.log_event("motion", f"Motion detected with {len(motion_contours)} areas")# 人脸检测faces = []if self.face_detector and motion_detected:faces = self.face_detector.detect_faces(frame)if faces:self.stats['face_detections'] += 1frame = self.face_detector.draw_faces(frame, faces)# 发送人脸检测通知self.send_notification("Face Detected", f"{len(faces)} face(s) detected")self.log_event("face_detection", f"{len(faces)} face(s) detected")# 录像管理was_recording = self.recorder.is_recordingself.recorder.process_frame(frame, motion_detected)if self.recorder.is_recording and not was_recording:self.stats['recordings'] += 1self.send_notification("Recording Started", "Motion-triggered recording started")self.log_event("recording_start", "Motion-triggered recording")return frame, motion_detected, facesdef draw_overlay(self, frame, motion_detected, faces):"""在帧上绘制叠加信息"""overlay = frame.copy()# 显示统计信息if self.config["display"]["show_stats"]:run_time = time.time() - self.stats['start_time']fps = self.stats['total_frames'] / run_time if run_time > 0 else 0stats = [f"Time: {datetime.now().strftime('%H:%M:%S')}",f"FPS: {fps:.1f}",f"Motion: {self.stats['motion_events']}",f"Faces: {self.stats['face_detections']}",f"Recordings: {self.stats['recordings']}"]for i, text in enumerate(stats):y_pos = 30 + i * 25cv2.putText(overlay, text, (10, y_pos),cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)# 显示状态指示器status_color = (0, 0, 255) if motion_detected else (0, 255, 0)status_text = "ALERT" if motion_detected else "NORMAL"cv2.putText(overlay, status_text, (overlay.shape[1] - 120, 30),cv2.FONT_HERSHEY_SIMPLEX, 0.8, status_color, 2)# 录像状态if self.recorder.is_recording:cv2.putText(overlay, "REC", (overlay.shape[1] - 50, 70),cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)return overlaydef start(self):"""启动监控系统"""print("启动完整监控系统...")print("控制命令:")print(" q - 退出")print(" r - 重置背景帧")print(" s - 保存快照")print(" p - 暂停/继续")print(" n - 切换通知开关")self.stream.start()self.running = Truepaused = Falsetry:while self.running:if not paused:success, frame = self.stream.read()if success:# 处理帧processed_frame, motion_detected, faces = self.process_frame(frame)# 添加叠加信息display_frame = self.draw_overlay(processed_frame, motion_detected, faces)# 调整显示尺寸if (self.config["display"]["window_width"] > 0 and self.config["display"]["window_height"] > 0):display_frame = cv2.resize(display_frame,(self.config["display"]["window_width"], self.config["display"]["window_height"]))# 显示帧cv2.imshow("Complete Surveillance System", display_frame)# 键盘输入处理key = cv2.waitKey(1) & 0xFFif key == ord('q'):breakelif key == ord('r'):self.motion_detector.previous_frame = Noneprint("背景帧已重置")elif key == ord('s'):self._save_snapshot(frame if success else None)elif key == ord('p'):paused = not pausedprint("系统已暂停" if paused else "系统已继续")elif key == ord('n'):self.config["notifications"]["enabled"] = not self.config["notifications"]["enabled"]status = "启用" if self.config["notifications"]["enabled"] else "禁用"print(f"通知功能已{status}")time.sleep(0.01) # 小延迟以减少CPU占用except KeyboardInterrupt:print("系统被用户中断")finally:self.stop()def _save_snapshot(self, frame):"""保存快照"""if frame is not None:timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")filename = os.path.join(self.config["output_dir"], f"snapshot_{timestamp}.jpg")cv2.imwrite(filename, frame)print(f"快照已保存: {filename}")else:print("无法保存快照:无有效帧")def stop(self):"""停止监控系统"""self.running = Falseif self.recorder.is_recording:self.recorder.stop_recording()self.stream.stop()cv2.destroyAllWindows()# 生成最终报告self.generate_report()print("监控系统已停止")def generate_report(self):"""生成运行报告"""run_time = time.time() - self.stats['start_time']report = {'session_start': datetime.fromtimestamp(self.stats['start_time']).isoformat(),'session_end': datetime.now().isoformat(),'total_duration_seconds': run_time,'total_frames_processed': self.stats['total_frames'],'average_fps': self.stats['total_frames'] / run_time if run_time > 0 else 0,'motion_events': self.stats['motion_events'],'face_detections': self.stats['face_detections'],'recordings_made': self.stats['recordings'],'events_logged': len(self.event_log)}# 保存报告report_file = os.path.join(self.config["output_dir"], f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")with open(report_file, 'w') as f:json.dump(report, f, indent=2)print(f"运行报告已保存: {report_file}")return report# 主程序入口
if __name__ == "__main__":# 创建默认配置文件(如果不存在)if not os.path.exists("config.json"):system = CompleteSurveillanceSystem()print("请编辑 config.json 文件配置您的监控系统,然后重新运行程序。")else:system = CompleteSurveillanceSystem()system.start()
8. 性能优化与故障排除
8.1 性能优化技巧
-
调整视频流参数:
- 降低分辨率(720p通常足够)
- 减少帧率(15-20fps)
- 使用MJPEG编码而非H.264
-
优化处理流程:
- 只在检测到运动时进行人脸识别
- 使用多线程处理不同的任务
- 合理设置检测间隔
-
内存管理:
- 及时释放不再使用的资源
- 使用帧缓冲区限制内存使用
- 定期清理临时文件
8.2 常见问题与解决方案
问题1:视频流连接失败
- 检查手机和电脑是否在同一网络
- 验证IP地址和端口号
- 检查防火墙设置
问题2:高CPU使用率
- 降低处理分辨率
- 减少检测频率
- 使用硬件加速(如果可用)
问题3:误报太多
- 调整运动检测阈值
- 增加最小检测区域
- 使用更复杂的背景减除算法
问题4:延迟过高
- 使用有线网络连接
- 优化视频编码设置
- 减少处理流水线的复杂度
9. 安全与隐私考虑
在使用监控系统时,安全和隐私是至关重要的考虑因素:
-
网络安全:
- 使用WPA2/WPA3加密的WiFi网络
- 定期更改路由器密码
- 考虑使用VPN进行远程访问
-
数据保护:
- 录像文件本地存储,不上传至云端
- 加密敏感录像文件
- 定期清理旧录像
-
隐私合规:
- 只在私人财产范围内使用
- 告知家庭成员或访客监控的存在
- 遵守当地隐私法律法规
10. 扩展功能与未来改进
10.1 可能的扩展功能
- 远程访问:通过Web界面远程查看监控画面
- 云存储集成:将重要录像备份到云存储
- 智能分析:使用深度学习模型进行更准确的行为分析
- 多摄像头支持:同时监控多个位置的摄像头
- 移动应用:开发手机App接收实时通知
10.2 技术改进方向
-
算法优化:
- 使用YOLO等现代目标检测算法
- 实现行人重识别功能
- 添加异常行为检测
-
系统架构:
- 微服务架构便于扩展
- 容器化部署
- 分布式处理
11. 总结
本文详细介绍了如何将旧手机改造成功能完整的监控摄像头系统。通过Python和OpenCV,我们实现了:
- 基础视频流捕获:稳定地从手机摄像头获取视频流
- 运动检测:智能识别画面中的运动变化
- 人脸识别:检测和标记画面中的人脸
- 自动录像:运动触发的高效录像系统
- 通知系统:及时的事件通知机制
- 完整系统集成:所有功能集于一体的监控解决方案
这个方案不仅成本低廉,而且高度可定制,可以根据具体需求进行调整和扩展。旧手机因此获得了新的生命,成为了一个功能强大的安防设备。
随着技术的不断发展,我们还可以继续为这个系统添加更多智能功能,使其更加智能、高效和易用。希望本文能够为您提供一个良好的起点,开启您的DIY智能监控之旅。
注意:在实际部署监控系统时,请务必遵守当地法律法规,尊重他人隐私,并确保系统的安全性。
