JavaCV + Spring 实现高效 RTSP 视频流帧缓存与管理
基于FFmpeg的RTSP视频流缓存系统设计与实现
前言
在视频监控、直播等应用场景中,RTSP视频流的处理是一个常见需求。本文将介绍一个基于Spring Boot和FFmpeg的RTSP视频流缓存系统,该系统能够高效地管理多个RTSP流,提供实时帧数据访问,并具备完善的资源管理和内存控制机制。
系统架构概述
本系统采用组件化设计,通过@Component
注解将缓存组件集成到Spring容器中。核心功能包括:
- RTSP视频流的连接和帧抓取
- 最新帧数据的内存缓存
- 基于时间的自动清理机制
- 完善的资源管理和内存监控
核心技术栈
- Spring Boot - 应用框架
- JavaCV - Java版FFmpeg封装库
- FFmpeg - 视频处理核心库
- Lombok - 代码简化工具
- Java Concurrency - 多线程并发处理
关键设计特性
1. 多映射管理机制
系统采用多个ConcurrentHashMap来管理不同维度的数据:
// 缓存最新帧的字节流
private final Map<String, byte[]> cacheMap = new ConcurrentHashMap<>();
// 管理FFmpeg抓取器实例
private final Map<String, FFmpegFrameGrabber> grabberMap = new ConcurrentHashMap<>();
// 跟踪RTSP流运行状态
private final Map<String, Boolean> rtspIsRunning = new ConcurrentHashMap<>();
// 记录最后访问时间
private final Map<String, Instant> lastAccessTime = new ConcurrentHashMap<>();
// 记录流启动时间
private final Map<String, Instant> startTime = new ConcurrentHashMap<>();
// 管理帧转换器
private final Map<String, Java2DFrameConverter> converterMap = new ConcurrentHashMap<>();
2. 双重超时控制策略
系统实现了两种超时控制机制:
空闲超时:10秒无访问自动清理
最大运行时间:2小时强制重启,防止长时间运行导致的内存泄露
private static final long IDLE_TIMEOUT_MS = 10 * 1000; // 10秒空闲超时
private static final long MAX_RUNTIME_MS = 2 * 60 * 60 * 1000; // 2小时最大运行时间
3. 线程池配置优化
采用自定义ThreadPoolExecutor配置,针对视频流处理场景优化:
private volatile ExecutorService frameExecutor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new ThreadPoolExecutor.DiscardPolicy()
);
配置说明:
核心线程数:3个
最大线程数:6个
线程空闲时间:60秒
队列容量:100个任务
拒绝策略:丢弃策略(适合实时视频流)
核心功能实现
1. 系统初始化
@PostConstruct
public void init() {// 预加载FFmpeg native库try {org.bytedeco.javacpp.Loader.load(org.bytedeco.ffmpeg.global.avutil.class);org.bytedeco.javacpp.Loader.load(org.bytedeco.ffmpeg.global.avcodec.class);org.bytedeco.javacpp.Loader.load(org.bytedeco.ffmpeg.global.avformat.class);log.info("FFmpeg native 库加载成功");} catch (Exception e) {log.error("FFmpeg native 库加载失败", e);}// 启动定时清理任务cleanupExecutor.scheduleWithFixedDelay(this::cleanupExpiredStreams, 5, 5, TimeUnit.SECONDS);// 启动内存监控cleanupExecutor.scheduleWithFixedDelay(this::logMemoryUsage, 30, 30, TimeUnit.SECONDS);
}
2. RTSP流初始化系统对FFmpeg参数进行了针对性优化:
private void initGrabber(String rtspUrl) {FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(rtspUrl);grabber.setOption("rtsp_transport", "tcp"); // 使用TCP传输grabber.setOption("stimeout", "5000000"); // 5秒连接超时grabber.setOption("max_delay", "2000000"); // 2秒最大延迟grabber.setOption("buffer_size", "1024"); // 减少缓冲区大小grabber.setFrameRate(8); // 8帧每秒grabber.setImageWidth(640); // 固定分辨率grabber.setImageHeight(360);grabber.start();
}
3. 帧处理循环采用高效的帧处理机制,确保资源及时释放:
private void processFrames(FFmpegFrameGrabber grabber, String rtspUrl, Java2DFrameConverter converter) {try {while (Boolean.TRUE.equals(rtspIsRunning.get(rtspUrl))) {Frame frame = null;BufferedImage image = null;try {frame = grabber.grabKeyFrame();if (frame == null || frame.image == null) {Thread.sleep(50); // 避免空转continue;}image = converter.convert(frame);if (image != null) {byte[] imageBytes = convertImageToBytes(image);if (imageBytes != null) {cacheMap.put(rtspUrl, imageBytes);}}} finally {// 确保资源释放if (image != null) {image.flush();image = null;}if (frame != null) {frame.close();frame = null;}}}} finally {// 清理所有相关资源cleanupResources(rtspUrl, converter, grabber);}
}
4. 智能清理机制
系统提供了基于时间的智能清理策略:
private void cleanupExpiredStreams() {Instant now = Instant.now();for (Map.Entry<String, Instant> entry : lastAccessTime.entrySet()) {String rtspUrl = entry.getKey();Instant lastAccess = entry.getValue();Instant start = startTime.get(rtspUrl);// 检查空闲超时boolean isIdleTimeout = now.toEpochMilli() - lastAccess.toEpochMilli() > IDLE_TIMEOUT_MS;// 检查最大运行时间boolean isMaxRuntimeExceeded = start != null &&now.toEpochMilli() - start.toEpochMilli() > MAX_RUNTIME_MS;if (isIdleTimeout || isMaxRuntimeExceeded) {shutdownStream(rtspUrl);}}
}
内存管理优化
1. 内存监控
系统集成了JVM内存监控功能:
private void logMemoryUsage() {MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();long heapUsed = memoryBean.getHeapMemoryUsage().getUsed() / 1024 / 1024;long heapMax = memoryBean.getHeapMemoryUsage().getMax() / 1024 / 1024;long nonHeapUsed = memoryBean.getNonHeapMemoryUsage().getUsed() / 1024 / 1024;log.info("内存使用情况 - 堆内存: {}/{}MB, 非堆内存: {}MB, 活跃RTSP流: {}",heapUsed, heapMax, nonHeapUsed, rtspIsRunning.size());
}
2. 图像压缩优化
采用JPEG格式压缩,平衡图像质量和内存占用:
private byte[] convertImageToBytes(BufferedImage image) {if (image == null) return null;try (ByteArrayOutputStream bas = new ByteArrayOutputStream()) {ImageIO.write(image, "jpg", bas);return bas.toByteArray();} catch (Exception e) {log.error("图像转换为字节数组失败: {}", e.getMessage());return null;}
}
API接口设计
1. 启动RTSP流
public void startRtspThread(String rtspUrl) {Boolean isRunning = rtspIsRunning.get(rtspUrl);if (Objects.nonNull(isRunning) && isRunning) {return; // 已在运行中}initGrabber(rtspUrl);lastAccessTime.put(rtspUrl, Instant.now());
}
2. 获取最新帧数据
public byte[] getLatestFrameBytes(String rtspUrl) {lastAccessTime.put(rtspUrl, Instant.now());byte[] imageBytes = cacheMap.get(rtspUrl);if (imageBytes == null) {log.debug("RTSP {} 当前无可用帧数据", rtspUrl);}return imageBytes;
}
3. 状态监控
提供详细的状态监控信息:
public Map<String, Object> getStatusInfo() {Map<String, Object> status = new ConcurrentHashMap<>();Instant now = Instant.now();for (String rtspUrl : rtspIsRunning.keySet()) {Map<String, Object> streamInfo = new ConcurrentHashMap<>();// 运行状态、访问时间、运行时长等信息streamInfo.put("isRunning", rtspIsRunning.get(rtspUrl));streamInfo.put("hasCachedData", cacheMap.containsKey(rtspUrl));// ... 更多状态信息status.put(rtspUrl, streamInfo);}return status;
}
性能特点与优化
- 内存优化策略
固定分辨率:640x360降低内存占用
关键帧抓取:只抓取关键帧减少处理负载
即时资源释放:Frame和BufferedImage使用后立即释放
定期垃圾回收:在资源清理后主动触发GC - 并发性能
无锁设计:使用ConcurrentHashMap避免同步开销
异步处理:帧抓取在独立线程池中执行
弹性线程池:根据负载动态调整线程数量 - 稳定性保障
异常处理:完善的异常捕获和恢复机制
资源清理:确保所有native资源正确释放
超时控制:防止僵死连接占用资源
使用示例
@RestController
public class VideoController {@Autowiredprivate RtspFrameCache rtspFrameCache;@GetMapping("/video/start")public String startVideo(@RequestParam String rtspUrl) {rtspFrameCache.startRtspThread(rtspUrl);return "视频流启动成功";}@GetMapping("/video/frame")public ResponseEntity<byte[]> getFrame(@RequestParam String rtspUrl) {byte[] frameBytes = rtspFrameCache.getLatestFrameBytes(rtspUrl);if (frameBytes != null) {return ResponseEntity.ok().contentType(MediaType.IMAGE_JPEG).body(frameBytes);}return ResponseEntity.notFound().build();}@GetMapping("/video/status")public Map<String, Object> getStatus() {return rtspFrameCache.getStatusInfo();}
}
附全代码
package com.weixin.wt.cache;import lombok.extern.slf4j.Slf4j;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.Frame;
import org.bytedeco.javacv.Java2DFrameConverter;
import org.springframework.stereotype.Component;import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
import javax.imageio.ImageIO;@Slf4j
@Component
public class RtspFrameCache {// 只缓存最新一帧的字节流private final Map<String, byte[]> cacheMap = new ConcurrentHashMap<>();// 添加到类成员private final Map<String, FFmpegFrameGrabber> grabberMap = new ConcurrentHashMap<>();//存储rtsp运行状态private final Map<String, Boolean> rtspIsRunning = new ConcurrentHashMap<>();// 存储最后访问时间 控制一段时间不需要运行的线程private final Map<String, Instant> lastAccessTime = new ConcurrentHashMap<>();// 存储启动时间 控制长时间运行的线程private final Map<String, Instant> startTime = new ConcurrentHashMap<>();// 存储转换器,确保正确关闭private final Map<String, Java2DFrameConverter> converterMap = new ConcurrentHashMap<>();// 10秒无访问超时private static final long IDLE_TIMEOUT_MS = 10 * 1000;// 2小时最大运行时间private static final long MAX_RUNTIME_MS = 2 * 60 * 60 * 1000;private volatile ExecutorService frameExecutor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new ThreadPoolExecutor.DiscardPolicy());// 定时清理任务执行器private final ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor();@PostConstructpublic void init() {// 显式加载FFmpeg native库,防止找不到jniavutil等问题try {org.bytedeco.javacpp.Loader.load(org.bytedeco.ffmpeg.global.avutil.class);org.bytedeco.javacpp.Loader.load(org.bytedeco.ffmpeg.global.avcodec.class);org.bytedeco.javacpp.Loader.load(org.bytedeco.ffmpeg.global.avformat.class);log.info("FFmpeg native 库加载成功");} catch (Exception e) {log.error("FFmpeg native 库加载失败", e);}// 启动定时清理任务,每5秒检查一次cleanupExecutor.scheduleWithFixedDelay(this::cleanupExpiredStreams, 5, 5, TimeUnit.SECONDS);// 启动内存监控任务,每30秒检查一次cleanupExecutor.scheduleWithFixedDelay(this::logMemoryUsage, 30, 30, TimeUnit.SECONDS);log.info("RTSP帧缓存清理任务已启动");}@PreDestroypublic void destroy() {shutdownAll();if (!cleanupExecutor.isShutdown()) {cleanupExecutor.shutdown();try {if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) {cleanupExecutor.shutdownNow();}} catch (InterruptedException e) {cleanupExecutor.shutdownNow();Thread.currentThread().interrupt();}}}/*** 记录内存使用情况*/private void logMemoryUsage() {MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();long heapUsed = memoryBean.getHeapMemoryUsage().getUsed() / 1024 / 1024; // MBlong heapMax = memoryBean.getHeapMemoryUsage().getMax() / 1024 / 1024; // MBlong nonHeapUsed = memoryBean.getNonHeapMemoryUsage().getUsed() / 1024 / 1024; // MBlog.info("内存使用情况 - 堆内存: {}/{}MB, 非堆内存: {}MB, 活跃RTSP流: {}",heapUsed, heapMax, nonHeapUsed, rtspIsRunning.size());}/*** 清理过期的RTSP流*/private void cleanupExpiredStreams() {Instant now = Instant.now();for (Map.Entry<String, Instant> entry : lastAccessTime.entrySet()) {String rtspUrl = entry.getKey();Instant lastAccess = entry.getValue();Instant start = startTime.get(rtspUrl);// 检查是否超过10秒无访问boolean isIdleTimeout = now.toEpochMilli() - lastAccess.toEpochMilli() > IDLE_TIMEOUT_MS;// 检查是否超过2小时运行时间boolean isMaxRuntimeExceeded = start != null &&now.toEpochMilli() - start.toEpochMilli() > MAX_RUNTIME_MS;if (isIdleTimeout) {log.info("RTSP流 {} 将被清理,原因: {}", rtspUrl, "超过10秒无访问");shutdownStream(rtspUrl);}if (isMaxRuntimeExceeded) {log.info("RTSP流 {} 将被清理,原因: {}", rtspUrl, "运行时间超过2小时");shutdownStreamByMaxTime(rtspUrl);}}}/*** 关闭指定的RTSP流*/private void shutdownStream(String rtspUrl) {// 标记为非运行状态rtspIsRunning.put(rtspUrl, Boolean.FALSE);// 释放转换器资源 remove获取资源后 移除 可以资源释放Java2DFrameConverter converter = converterMap.remove(rtspUrl);if (converter != null) {try {converter.close();log.info("RTSP {} converter已关闭", rtspUrl);} catch (Exception e) {log.warn("RTSP {} 关闭converter异常: {}", rtspUrl, e.getMessage());}}// 释放grabber资源FFmpegFrameGrabber grabber = grabberMap.remove(rtspUrl);if (grabber != null) {try {grabber.stop();grabber.release();log.info("RTSP {} grabber已释放", rtspUrl);} catch (Exception e) {log.warn("RTSP {} 释放grabber异常: {}", rtspUrl, e.getMessage());}}// 清理相关数据cacheMap.remove(rtspUrl);lastAccessTime.remove(rtspUrl);startTime.remove(rtspUrl);// 强制垃圾回收System.gc();log.info("RTSP流 {} 已完全清理", rtspUrl);}/*** 关闭指定的RTSP流*/private void shutdownStreamByMaxTime(String rtspUrl) {// 标记为非运行状态rtspIsRunning.put(rtspUrl, Boolean.FALSE);// 释放转换器资源Java2DFrameConverter converter = converterMap.remove(rtspUrl);if (converter != null) {try {converter.close();log.info("RTSP {} converter已关闭", rtspUrl);} catch (Exception e) {log.warn("RTSP {} 关闭converter异常: {}", rtspUrl, e.getMessage());}}// 释放grabber资源FFmpegFrameGrabber grabber = grabberMap.remove(rtspUrl);if (grabber != null) {try {grabber.stop();grabber.release();log.info("RTSP {} grabber已释放", rtspUrl);} catch (Exception e) {log.warn("RTSP {} 释放grabber异常: {}", rtspUrl, e.getMessage());}}lastAccessTime.remove(rtspUrl);startTime.remove(rtspUrl);// 强制垃圾回收System.gc();log.info("RTSP流 {} 已完全清理", rtspUrl);}public void startRtspThread(String rtspUrl) {Boolean b = rtspIsRunning.get(rtspUrl);//如果不为空 且为true 正在运行中if (Objects.nonNull(b) && b) {return;} else {initGrabber(rtspUrl);}// 更新最后访问时间lastAccessTime.put(rtspUrl, Instant.now());}private void initGrabber(String rtspUrl) {try {FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(rtspUrl);grabber.setOption("rtsp_transport", "tcp");grabber.setOption("stimeout", "5000000");grabber.setOption("max_delay", "2000000");// 减少缓冲区大小以降低内存使用grabber.setOption("buffer_size", "1024");grabber.setOption("max_delay", "1000000");grabber.setFrameRate(8);grabber.setImageWidth(640);grabber.setImageHeight(360);log.info("RTSP {} 正在初始化连接...", rtspUrl);grabber.start();// 创建转换器并存储Java2DFrameConverter converter = new Java2DFrameConverter();converterMap.put(rtspUrl, converter);//启动一个线程进行处理frameExecutor.execute(() -> processFrames(grabber, rtspUrl, converter));rtspIsRunning.put(rtspUrl, Boolean.TRUE);grabberMap.put(rtspUrl, grabber);// 记录启动时间startTime.put(rtspUrl, Instant.now());} catch (Exception e) {log.error("初始化RTSP抓取器失败: {}", e.getMessage());rtspIsRunning.put(rtspUrl, Boolean.FALSE);}}private void processFrames(FFmpegFrameGrabber grabber, String rtspUrl, Java2DFrameConverter converter) {log.info("RTSP {} 启动成功, 开始抓取最新帧数据!", rtspUrl);try {while (Boolean.TRUE.equals(rtspIsRunning.get(rtspUrl))) {Frame frame = null;BufferedImage image = null;try {frame = grabber.grabKeyFrame();if (frame == null || frame.image == null) {// 添加短暂休眠减少CPU使用Thread.sleep(50);continue;}image = converter.convert(frame);if (image != null) {byte[] imageBytes = convertImageToBytes(image);if (imageBytes != null) {cacheMap.put(rtspUrl, imageBytes);}}} catch (Exception e) {log.warn("RTSP {} 获取帧数据时发生异常: {}", rtspUrl, e.getMessage());try {Thread.sleep(1000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}} finally {// 确保释放所有资源if (image != null) {image.flush();image = null; // 显式置空}if (frame != null) {try {frame.close();} catch (Exception e) {log.warn("释放frame异常: {}", e.getMessage());}frame = null; // 显式置空}}}} finally {// 确保converter被正确关闭try {converter.close();log.info("RTSP {} converter已关闭", rtspUrl);} catch (Exception e) {log.warn("关闭converter异常: {}", e.getMessage());}// 清理grabber(如果还在map中)FFmpegFrameGrabber g = grabberMap.remove(rtspUrl);if (g != null) {try {g.stop();log.info("RTSP {} grabber已stop", rtspUrl);} catch (Exception e) {log.warn("RTSP {} stop异常: {}", rtspUrl, e.getMessage());}try {g.release();log.info("RTSP {} grabber已release", rtspUrl);} catch (Exception e) {log.warn("RTSP {} release异常: {}", rtspUrl, e.getMessage());}}// 清理转换器converterMap.remove(rtspUrl);log.info("RTSP {} 帧抓取线程结束", rtspUrl);}}//直接返回byte[]public byte[] getLatestFrameBytes(String rtspUrl) {// 更新最后访问时间lastAccessTime.put(rtspUrl, Instant.now());byte[] imageBytes = cacheMap.get(rtspUrl);if (imageBytes == null) {log.debug("RTSP {} 当前无可用帧数据", rtspUrl);}return imageBytes;}private byte[] convertImageToBytes(BufferedImage image) {if (image == null) {return null;}try (ByteArrayOutputStream bas = new ByteArrayOutputStream()) {// 直接使用ImageIO,内存开销最小ImageIO.write(image, "jpg", bas);return bas.toByteArray();} catch (Exception e) {log.error("ImageIO图像转换为字节数组失败: {}", e.getMessage());return null;}}public void shutdownAll() {log.info("准备关闭所有RTSP流...");// 标记所有为非运行状态,让线程主动退出rtspIsRunning.replaceAll((k, v) -> false);// 关闭所有转换器for (Map.Entry<String, Java2DFrameConverter> entry : converterMap.entrySet()) {String url = entry.getKey();Java2DFrameConverter converter = entry.getValue();try {log.info("关闭转换器:{}", url);if (converter != null) {converter.close();}} catch (Exception e) {log.warn("关闭转换器 {} 时异常: {}", url, e.getMessage());}}// 显式释放所有grabber资源for (Map.Entry<String, FFmpegFrameGrabber> entry : grabberMap.entrySet()) {String url = entry.getKey();FFmpegFrameGrabber grabber = entry.getValue();try {log.info("释放 RTSP 流:{}", url);if (grabber != null) {grabber.stop();grabber.release();}} catch (Exception e) {log.warn("关闭 RTSP 流 {} 时异常: {}", url, e.getMessage());}}// 清理所有数据grabberMap.clear();converterMap.clear();cacheMap.clear();rtspIsRunning.clear();lastAccessTime.clear();startTime.clear();// 强制垃圾回收System.gc();log.info("所有RTSP流已关闭并资源释放完毕。");}/*** 获取RTSP流状态信息*/public Map<String, Object> getStatusInfo() {Map<String, Object> status = new ConcurrentHashMap<>();Instant now = Instant.now();for (String rtspUrl : rtspIsRunning.keySet()) {Map<String, Object> streamInfo = new ConcurrentHashMap<>();// 运行状态Boolean isRunning = rtspIsRunning.get(rtspUrl);streamInfo.put("isRunning", isRunning);// 最后访问时间Instant lastAccess = lastAccessTime.get(rtspUrl);if (lastAccess != null) {long idleTimeMs = now.toEpochMilli() - lastAccess.toEpochMilli();streamInfo.put("lastAccessTime", lastAccess.toString());streamInfo.put("idleTimeSeconds", idleTimeMs / 1000);streamInfo.put("isIdleTimeout", idleTimeMs > IDLE_TIMEOUT_MS);}// 启动时间Instant start = startTime.get(rtspUrl);if (start != null) {long runtimeMs = now.toEpochMilli() - start.toEpochMilli();streamInfo.put("startTime", start.toString());streamInfo.put("runtimeSeconds", runtimeMs / 1000);streamInfo.put("isMaxRuntimeExceeded", runtimeMs > MAX_RUNTIME_MS);}// 是否有缓存数据streamInfo.put("hasCachedData", cacheMap.containsKey(rtspUrl));status.put(rtspUrl, streamInfo);}return status;}
}
总结
本RTSP视频流缓存系统通过精心设计的架构和优化策略,实现了高性能、低内存占用的视频流处理能力。注意使用版本在低版本测试中 会出现非堆内存溢出的情况,可以使用最新版本
<dependency><groupId>org.bytedeco</groupId><artifactId>javacv</artifactId><version>1.5.11</version>
</dependency><dependency><groupId>org.bytedeco</groupId><artifactId>javacv-platform</artifactId><version>1.5.11</version>
</dependency>