环形缓冲区实现共享内存
共享内存的问题在于如果发送进程频繁写数据,而读取进程没来得及读,会导致共享内存里的数据被覆盖,共享内存(shm_open + mmap)只是提供了一块所有进程都能访问的公共地址空间,它本身不带任何缓存、消息队列、版本控制的机制。操作系统不会帮你保存“历史数据”,也不会阻止另一个进程覆盖你写的内容。
这意味着:
-
多个写者同时写 → 数据竞争(race condition);
-
写者频繁写而读者没跟上 → 前面写的内容会被新的覆盖;
-
除非你自己加同步机制或缓冲结构,否则共享内存就只是“一块裸内存”。
使用 环形缓冲区(ring buffer)
原理是:
- 分配一块较大的共享内存;
- 定义头尾指针(
head,tail); - 写者往尾部写数据;
- 读者从头部读数据;
- 用原子操作或信号量控制 head/tail
这种方式可以:
✅ 支持多个连续消息;
✅ 避免频繁 mmap;
✅ 控制覆盖策略(例如“满了丢最旧”或“阻塞等待”)。
//shared_memory.h
#include <iostream>
#include <cstring>
#include <sys/ipc.h>
#include <sys/mman.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <semaphore.h>
#include <sys/types.h>
#include <vector>
#include <atomic>typedef struct img_data{size_t lengh;std::vector<char> data;
}IMG_DATA;class SharedMemory{
public:struct shared_header{std::atomic<size_t> head_;std::atomic<size_t> tail_;size_t buffer_size;unsigned char memory_[1];};SharedMemory(const std::string &name, size_t size, bool create=true);~SharedMemory();//get shared memory addressvoid * get_memory() const;//notify other process data is ready.void notify();void wait();int wait_until_timedout(int milliseconds);size_t get_size() const;bool write(const void *data, size_t len);void write_data(size_t tail, const void * data, size_t len);ssize_t read(void * out, size_t maxlen);
private:std::string name_;size_t size_;shared_header * header_;int shm_fd_;sem_t *semaphore_;std::string sem_name_;bool creator_;
private:void create_shared_memory(size_t size);void attach_shared_memory();
};
源文件实现
#include <iostream>
#include <cstring>
#include <sys/ipc.h>
#include <sys/mman.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <semaphore.h>
#include <sys/types.h>
#include <vector>
#include <atomic>typedef struct img_data{size_t lengh;std::vector<char> data;
}IMG_DATA;class SharedMemory{
public:struct shared_header{std::atomic<size_t> head_;std::atomic<size_t> tail_;size_t buffer_size;unsigned char memory_[1];};SharedMemory(const std::string &name, size_t size, bool create=true);~SharedMemory();//get shared memory addressvoid * get_memory() const;//notify other process data is ready.void notify();void wait();int wait_until_timedout(int milliseconds);size_t get_size() const;bool write(const void *data, size_t len);void write_data(size_t tail, const void * data, size_t len);ssize_t read(void * out, size_t maxlen);
private:std::string name_;size_t size_;shared_header * header_;int shm_fd_;sem_t *semaphore_;std::string sem_name_;bool creator_;
private:void create_shared_memory(size_t size);void attach_shared_memory();
};uos@uos-PC:/software/code/test/tls_server/ipc_demo$
uos@uos-PC:/software/code/test/tls_server/ipc_demo$
uos@uos-PC:/software/code/test/tls_server/ipc_demo$ cat shared_memory.cpp
#include "shared_memory.h"SharedMemory::SharedMemory(const std::string& name, size_t size, bool create): name_(name), size_(sizeof(shared_header) + size - 1), header_(nullptr), shm_fd_(-1), semaphore_(nullptr) {creator_ = create;// 创建或获取共享内存if (create) {create_shared_memory(size);} else {attach_shared_memory();}header_ = (shared_header*)mmap(nullptr, size_, PROT_READ|PROT_WRITE, MAP_SHARED, shm_fd_, 0);if(header_ == MAP_FAILED){throw std::runtime_error(std::string("mmap failed: ")+strerror(errno));}if(creator_){header_->head_.store(0);header_->tail_.store(0);header_->buffer_size = size;}
}void SharedMemory::create_shared_memory(size_t size) {std::string shm_name = "/" + name_;sem_name_ = "/sem_"+name_;shm_unlink(shm_name.c_str());shm_fd_ = shm_open(shm_name.c_str(), O_CREAT|O_RDWR, 0666);if(shm_fd_ == -1){std::cerr << "shm_open failed: " << strerror(errno) << std::endl;throw std::runtime_error("shm_open failed");}if(ftruncate(shm_fd_,size) == -1){throw std::runtime_error("ftruncate failed");}sem_unlink(sem_name_.c_str());// 创建信号量用于同步semaphore_ = sem_open(sem_name_.c_str(), O_CREAT, 0644, 0);if (semaphore_ == SEM_FAILED) {std::cerr << "sem_open failed: " << strerror(errno) << std::endl;throw std::runtime_error("sem_open failed");}
}void SharedMemory::attach_shared_memory() {std::string shm_name = "/" + name_;shm_fd_ = shm_open(shm_name.c_str(), O_RDWR, 666);if(shm_fd_ == -1){std::cerr << "shm_open failed: " << strerror(errno) << std::endl;throw std::runtime_error("shm_open failed");}// 获取信号量std::string semName = "/sem_" + name_;semaphore_ = sem_open(semName.c_str(), 0);if (semaphore_ == SEM_FAILED) {std::cerr << "sem_open failed: " << strerror(errno) << std::endl;throw std::runtime_error("sem_open failed");}
}SharedMemory::~SharedMemory() {if (header_ != nullptr && header_ != (void*)-1) {munmap(header_, size_);}if(shm_fd_!=-1){close(shm_fd_);}if (semaphore_ != nullptr && semaphore_ != SEM_FAILED) {sem_close(semaphore_);}if(creator_){std::string shm_name = "/" + name_;shm_unlink(shm_name.c_str());sem_unlink(sem_name_.c_str());}
}void* SharedMemory::get_memory() const {return header_->memory_;
}void SharedMemory::notify() {sem_post(semaphore_);
}int SharedMemory::wait_until_timedout(int milliseconds) {int return_status = 0;struct timespec ts;ts.tv_sec = milliseconds / 1000; //sts.tv_nsec = (milliseconds %1000) * 1000000; //nsint ret = sem_timedwait(semaphore_, &ts);if(ret!=0){return_status = errno;}return return_status;
}
void SharedMemory::wait(){sem_wait(semaphore_);
}
size_t SharedMemory::get_size() const {return header_->buffer_size;
}bool SharedMemory::write(const void *data, size_t len){if(len + sizeof(size_t) > header_->buffer_size){std::cerr<<"message too large for buffer\n";return false;}while(true){size_t head = header_->head_.load(std::memory_order_acquire);size_t tail = header_->tail_.load(std::memory_order_acquire);size_t used = (tail >= head)?(tail - head):(header_->buffer_size - head + tail);size_t free_space = header_->buffer_size -used - 1; //reserve 1byte for empty or full flagif(free_space < len + sizeof(size_t)){//mem cache is full, waittingusleep(1000);continue;}write_data(tail, data, len);return true;}return true;
}void SharedMemory::write_data(size_t tail, const void * data, size_t len){size_t data_len = static_cast<size_t>(len);//write lenif(tail + sizeof(size_t) <= header_->buffer_size){memcpy((unsigned char*)header_->memory_ + tail, &data_len, sizeof(size_t));}else{size_t first = header_->buffer_size - tail;memcpy((unsigned char*)header_->memory_+tail, &data_len, first);memcpy((unsigned char*)header_->memory_, &data_len + first, sizeof(size_t) - first);}tail = (tail + sizeof(size_t)) % header_->buffer_size;//write dataif(tail + len <= header_->buffer_size){memcpy((unsigned char*)header_->memory_ + tail, data, len);}else{size_t first = header_->buffer_size - tail;memcpy((unsigned char*)header_->memory_+tail, data, first);memcpy((unsigned char*)header_->memory_, (unsigned char*)data + first, len - first);}tail = (tail + len) % header_->buffer_size;header_->tail_.store(tail, std::memory_order_release);notify();
}ssize_t SharedMemory::read(void * out, size_t maxlen){size_t head = header_->head_.load(std::memory_order_acquire);size_t tail = header_->tail_.load(std::memory_order_acquire);if(tail == head){return 0;}uint32_t data_len;memcpy(&data_len, (unsigned char*)header_->memory_+head, sizeof(size_t));head = (head + sizeof(size_t)) % header_->buffer_size;if(data_len > maxlen) return -1;if(head + data_len <= header_->buffer_size){memcpy(out, (unsigned char*)header_->memory_+head, data_len);}else{size_t first = header_->buffer_size - head;memcpy(out, (unsigned char*)header_->memory_+head,first);memcpy((unsigned char*)out+first, header_->memory_, data_len - first);}header_->head_.store((head + data_len)%header_->buffer_size, std::memory_order_release);return data_len;
}
查看系统支持的消息队列最大长度,以及每条消息的长度
uos@uos-PC:~$ cat /proc/sys/fs/mqueue/msg_max
10
uos@uos-PC:~$ cat /proc/sys/fs/mqueue/msgsize_max
8192
查看共享内存内容
hexdump -C /dev/shm/my_shm | less
00000000 4e 00 00 00 00 00 00 00 4e 00 00 00 00 00 00 00 |N.......N.......|
00000010 00 00 a0 00 00 00 00 00 12 00 00 00 00 00 00 00 |................|
00000020 74 68 69 73 20 69 73 20 61 20 70 69 63 74 75 72 |this is a pictur|
00000030 65 2e 12 00 00 00 00 00 00 00 74 68 69 73 20 69 |e.........this i|
00000040 73 20 61 20 70 69 63 74 75 72 65 2e 12 00 00 00 |s a picture.....|
00000050 00 00 00 00 74 68 69 73 20 69 73 20 61 20 70 69 |....this is a pi|
00000060 63 74 75 72 65 2e 00 00 00 00 00 00 00 00 00 00 |cture...........|
00000070 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
对应的数据结构
struct shared_header{std::atomic<size_t> head_;std::atomic<size_t> tail_;size_t buffer_size;unsigned char memory_[1];};
可以看到head_为4e 00 00 00 00 00 00 00,tail_为4e 00 00 00 00 00 00 00,buffer_size为00 00 a0 00 00 00 00 00,每次存数据格式为长度+值,12 00 00 00 00 00 00 00为长度,74 68 69 73 20 69 73 20 61 20 70 69 63 74 75 72 65 2e 为值。tail和head相等表示数据已读完。
