当前位置: 首页 > news >正文

环形缓冲区实现共享内存

共享内存的问题在于如果发送进程频繁写数据,而读取进程没来得及读,会导致共享内存里的数据被覆盖,共享内存(shm_open + mmap)只是提供了一块所有进程都能访问的公共地址空间,它本身不带任何缓存、消息队列、版本控制的机制。操作系统不会帮你保存“历史数据”,也不会阻止另一个进程覆盖你写的内容。

这意味着:

  • 多个写者同时写 → 数据竞争(race condition);

  • 写者频繁写而读者没跟上 → 前面写的内容会被新的覆盖;

  • 除非你自己加同步机制缓冲结构,否则共享内存就只是“一块裸内存”。

使用 环形缓冲区(ring buffer)

原理是:

  • 分配一块较大的共享内存;
  • 定义头尾指针(head, tail);
  • 写者往尾部写数据;
  • 读者从头部读数据;
  • 用原子操作或信号量控制 head/tail

这种方式可以:
✅ 支持多个连续消息;
✅ 避免频繁 mmap
✅ 控制覆盖策略(例如“满了丢最旧”或“阻塞等待”)。

//shared_memory.h
#include <iostream>
#include <cstring>
#include <sys/ipc.h>
#include <sys/mman.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <semaphore.h>
#include <sys/types.h>
#include <vector>
#include <atomic>typedef struct img_data{size_t lengh;std::vector<char> data;
}IMG_DATA;class SharedMemory{
public:struct shared_header{std::atomic<size_t> head_;std::atomic<size_t> tail_;size_t buffer_size;unsigned char memory_[1];};SharedMemory(const std::string &name, size_t size, bool create=true);~SharedMemory();//get shared memory addressvoid * get_memory() const;//notify other process data is ready.void notify();void wait();int wait_until_timedout(int milliseconds);size_t get_size() const;bool write(const void *data, size_t len);void write_data(size_t tail, const void * data, size_t len);ssize_t read(void * out, size_t maxlen);
private:std::string name_;size_t size_;shared_header * header_;int shm_fd_;sem_t *semaphore_;std::string sem_name_;bool creator_;
private:void create_shared_memory(size_t size);void attach_shared_memory();
};

源文件实现

#include <iostream>
#include <cstring>
#include <sys/ipc.h>
#include <sys/mman.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <semaphore.h>
#include <sys/types.h>
#include <vector>
#include <atomic>typedef struct img_data{size_t lengh;std::vector<char> data;
}IMG_DATA;class SharedMemory{
public:struct shared_header{std::atomic<size_t> head_;std::atomic<size_t> tail_;size_t buffer_size;unsigned char memory_[1];};SharedMemory(const std::string &name, size_t size, bool create=true);~SharedMemory();//get shared memory addressvoid * get_memory() const;//notify other process data is ready.void notify();void wait();int wait_until_timedout(int milliseconds);size_t get_size() const;bool write(const void *data, size_t len);void write_data(size_t tail, const void * data, size_t len);ssize_t read(void * out, size_t maxlen);
private:std::string name_;size_t size_;shared_header * header_;int shm_fd_;sem_t *semaphore_;std::string sem_name_;bool creator_;
private:void create_shared_memory(size_t size);void attach_shared_memory();
};uos@uos-PC:/software/code/test/tls_server/ipc_demo$ 
uos@uos-PC:/software/code/test/tls_server/ipc_demo$ 
uos@uos-PC:/software/code/test/tls_server/ipc_demo$ cat shared_memory.cpp
#include "shared_memory.h"SharedMemory::SharedMemory(const std::string& name, size_t size, bool create): name_(name), size_(sizeof(shared_header) + size - 1), header_(nullptr), shm_fd_(-1), semaphore_(nullptr) {creator_ = create;// 创建或获取共享内存if (create) {create_shared_memory(size);} else {attach_shared_memory();}header_ = (shared_header*)mmap(nullptr, size_, PROT_READ|PROT_WRITE, MAP_SHARED, shm_fd_, 0);if(header_ == MAP_FAILED){throw std::runtime_error(std::string("mmap failed: ")+strerror(errno));}if(creator_){header_->head_.store(0);header_->tail_.store(0);header_->buffer_size = size;}
}void SharedMemory::create_shared_memory(size_t size) {std::string shm_name = "/" + name_;sem_name_ = "/sem_"+name_;shm_unlink(shm_name.c_str());shm_fd_ = shm_open(shm_name.c_str(), O_CREAT|O_RDWR, 0666);if(shm_fd_ == -1){std::cerr << "shm_open failed: " << strerror(errno) << std::endl;throw std::runtime_error("shm_open failed");}if(ftruncate(shm_fd_,size) == -1){throw std::runtime_error("ftruncate failed");}sem_unlink(sem_name_.c_str());// 创建信号量用于同步semaphore_ = sem_open(sem_name_.c_str(), O_CREAT, 0644, 0);if (semaphore_ == SEM_FAILED) {std::cerr << "sem_open failed: " << strerror(errno) << std::endl;throw std::runtime_error("sem_open failed");}
}void SharedMemory::attach_shared_memory() {std::string shm_name = "/" + name_;shm_fd_ = shm_open(shm_name.c_str(), O_RDWR, 666);if(shm_fd_ == -1){std::cerr << "shm_open failed: " << strerror(errno) << std::endl;throw std::runtime_error("shm_open failed");}// 获取信号量std::string semName = "/sem_" + name_;semaphore_ = sem_open(semName.c_str(), 0);if (semaphore_ == SEM_FAILED) {std::cerr << "sem_open failed: " << strerror(errno) << std::endl;throw std::runtime_error("sem_open failed");}
}SharedMemory::~SharedMemory() {if (header_ != nullptr && header_ != (void*)-1) {munmap(header_, size_);}if(shm_fd_!=-1){close(shm_fd_);}if (semaphore_ != nullptr && semaphore_ != SEM_FAILED) {sem_close(semaphore_);}if(creator_){std::string shm_name = "/" + name_;shm_unlink(shm_name.c_str());sem_unlink(sem_name_.c_str());}
}void* SharedMemory::get_memory() const {return header_->memory_;
}void SharedMemory::notify() {sem_post(semaphore_);
}int SharedMemory::wait_until_timedout(int milliseconds) {int return_status = 0;struct timespec ts;ts.tv_sec = milliseconds / 1000; //sts.tv_nsec = (milliseconds %1000) * 1000000; //nsint ret = sem_timedwait(semaphore_, &ts);if(ret!=0){return_status = errno;}return return_status;
}
void SharedMemory::wait(){sem_wait(semaphore_);
}
size_t SharedMemory::get_size() const {return header_->buffer_size;
}bool SharedMemory::write(const void *data, size_t len){if(len + sizeof(size_t) > header_->buffer_size){std::cerr<<"message too large for buffer\n";return false;}while(true){size_t head = header_->head_.load(std::memory_order_acquire);size_t tail = header_->tail_.load(std::memory_order_acquire);size_t used = (tail >= head)?(tail - head):(header_->buffer_size - head + tail);size_t free_space = header_->buffer_size -used - 1; //reserve 1byte for empty or full flagif(free_space < len + sizeof(size_t)){//mem cache is full, waittingusleep(1000);continue;}write_data(tail, data, len);return true;}return true;
}void SharedMemory::write_data(size_t tail, const void * data, size_t len){size_t data_len = static_cast<size_t>(len);//write lenif(tail + sizeof(size_t) <= header_->buffer_size){memcpy((unsigned char*)header_->memory_ + tail, &data_len, sizeof(size_t));}else{size_t first = header_->buffer_size - tail;memcpy((unsigned char*)header_->memory_+tail, &data_len, first);memcpy((unsigned char*)header_->memory_, &data_len + first, sizeof(size_t) - first);}tail = (tail + sizeof(size_t)) % header_->buffer_size;//write dataif(tail + len <= header_->buffer_size){memcpy((unsigned char*)header_->memory_ + tail, data, len);}else{size_t first = header_->buffer_size - tail;memcpy((unsigned char*)header_->memory_+tail, data, first);memcpy((unsigned char*)header_->memory_, (unsigned char*)data + first, len - first);}tail = (tail + len) % header_->buffer_size;header_->tail_.store(tail, std::memory_order_release);notify();
}ssize_t SharedMemory::read(void * out, size_t maxlen){size_t head = header_->head_.load(std::memory_order_acquire);size_t tail = header_->tail_.load(std::memory_order_acquire);if(tail == head){return 0;}uint32_t data_len;memcpy(&data_len, (unsigned char*)header_->memory_+head, sizeof(size_t));head = (head + sizeof(size_t)) % header_->buffer_size;if(data_len > maxlen) return -1;if(head + data_len <= header_->buffer_size){memcpy(out, (unsigned char*)header_->memory_+head, data_len);}else{size_t first = header_->buffer_size - head;memcpy(out, (unsigned char*)header_->memory_+head,first);memcpy((unsigned char*)out+first, header_->memory_, data_len - first);}header_->head_.store((head + data_len)%header_->buffer_size, std::memory_order_release);return data_len;
}

查看系统支持的消息队列最大长度,以及每条消息的长度

uos@uos-PC:~$ cat /proc/sys/fs/mqueue/msg_max
10    
uos@uos-PC:~$ cat /proc/sys/fs/mqueue/msgsize_max
8192

查看共享内存内容

hexdump -C /dev/shm/my_shm | less
00000000  4e 00 00 00 00 00 00 00  4e 00 00 00 00 00 00 00  |N.......N.......|
00000010  00 00 a0 00 00 00 00 00  12 00 00 00 00 00 00 00  |................|
00000020  74 68 69 73 20 69 73 20  61 20 70 69 63 74 75 72  |this is a pictur|
00000030  65 2e 12 00 00 00 00 00  00 00 74 68 69 73 20 69  |e.........this i|
00000040  73 20 61 20 70 69 63 74  75 72 65 2e 12 00 00 00  |s a picture.....|
00000050  00 00 00 00 74 68 69 73  20 69 73 20 61 20 70 69  |....this is a pi|
00000060  63 74 75 72 65 2e 00 00  00 00 00 00 00 00 00 00  |cture...........|
00000070  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|

对应的数据结构

    struct shared_header{std::atomic<size_t> head_;std::atomic<size_t> tail_;size_t buffer_size;unsigned char memory_[1];};

可以看到head_为4e 00 00 00 00 00 00 00,tail_为4e 00 00 00 00 00 00 00,buffer_size为00 00 a0 00 00 00 00 00,每次存数据格式为长度+值,12 00 00 00 00 00 00 00为长度,74 68 69 73 20 69 73 20 61 20 70 69 63 74 75 72 65 2e 为值。tail和head相等表示数据已读完。

http://www.dtcms.com/a/544255.html

相关文章:

  • Spring AI 搭建 RAG 个人知识库
  • 网站地址是什么用淘宝域名做网站什么效果
  • SPSSAU「质量控制」模块:从可视化监控到过程优化,一站式搞定质量难题
  • 基于健康指标的自动驾驶全系统运行时安全分析方法
  • 从 0 到 1 开发 Rust 分布式日志服务:高吞吐设计 + 存储优化,支撑千万级日志采集
  • 如何做好网站的推广工作成都百度爱采购
  • [无人机sdk] Open Protocol | 协议包构造验证
  • 【Vscode】解决ssh远程开发时Formatter失效的问题
  • TCP 如何保证传输的可靠性?
  • 亲子娱乐升级!Docker 电视盒子 ADB 安装助手,儿童 APP 一键装满电视
  • Microsoft 365 Copilot 扩展至应用和工作流构建功能
  • 【Latex】本地部署latex+vscode
  • 注册中心(环境隔离、分级模型、Eureka)、远程调用负载均衡、服务保护原理分析
  • 有没有专门做建筑造价的私单网站网站开发风险
  • LSTM模型做二分类(PyTorch实现)
  • Linux 文件变动监控工具:原理、设计与实用指南(C/C++代码实现)
  • 建站之星怎么用做视频解析网站犯法吗
  • LibreTV无广告观影实测:聚合全网资源,远程访问家庭影院新方案!
  • 仓颉中的 UTF-8 编码处理:从 DFA 解码、错误策略到流式与字素迭代的工程实战
  • 【React】打卡笔记,入门学习02:react-router
  • Latex 转 word 在线
  • 【OD刷题笔记】- 可以组成网络的服务器
  • 《算法闯关指南:优选算法--前缀和》--27.寻找数组的中心下标,28.除自身以外数组的乘积
  • linux arm64平台上协议栈发包报文长度溢出导致系统挂死举例
  • 深入理解 Rust `HashMap` 的哈希算法与冲突解决机制
  • 彩票网站开发做一个网站价格
  • 《C++ 继承》三大面向对象编程——继承:派生类构造、多继承、菱形虚拟继承概要
  • 医疗AI白箱编程:从理论到实践指南(代码部分)
  • Spring Cache 多级缓存中 hash 类型 Redis 缓存的自定义实现与核心功能
  • 福州建设人才市场网站山西网站推广