Windows下利用boost库与Windows Api 实现共享内存
1、Boost库实现生产者与消费者
头文件与数据结构
// shared_data.h
#ifndef SHARED_DATA_H
#define SHARED_DATA_H#include <string>// 共享数据结构
struct SharedData {int message_id;char message[256];bool data_ready;bool stop_requested;SharedData() : message_id(0), data_ready(false), stop_requested(false) {memset(message, 0, sizeof(message));}
};#endif生产者进程
// producer.cpp
#include <iostream>
#include <string>
#include <boost/interprocess/managed_windows_shared_memory.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include "shared_data.h"using namespace boost::interprocess;int main() {try {std::cout << "=== 生产者进程启动 ===" << std::endl;// 创建或打开共享内存(1MB大小)managed_windows_shared_memory segment(create_only, // 创建模式"DemoSharedMemory", // 共享内存名称1024 * 1024 // 1MB大小);// 创建同步对象named_mutex mutex(create_only, "DemoMutex");named_condition condition(create_only, "DemoCondition");// 在共享内存中构造共享数据对象SharedData* shared_data = segment.construct<SharedData>("SharedData")();std::cout << "共享内存创建成功,等待消费者连接..." << std::endl;int message_count = 0;std::string input;while (true) {std::cout << "\n请输入要发送的消息 (输入 'quit' 退出): ";std::getline(std::cin, input);if (input == "quit") {// 通知消费者退出scoped_lock<named_mutex> lock(mutex);shared_data->stop_requested = true;condition.notify_one();std::cout << "已发送退出信号" << std::endl;break;}// 写入数据到共享内存{scoped_lock<named_mutex> lock(mutex);// 等待消费者读取上一批数据while (shared_data->data_ready) {condition.wait(lock);}// 准备新数据shared_data->message_id = ++message_count;strncpy(shared_data->message, input.c_str(), sizeof(shared_data->message) - 1);shared_data->message[sizeof(shared_data->message) - 1] = '\0';shared_data->data_ready = true;std::cout << "发送消息 [" << shared_data->message_id << "]: " << shared_data->message << std::endl;// 通知消费者有新数据condition.notify_one();}}// 等待一段时间让消费者处理退出信号Sleep(2000);// 清理资源segment.destroy<SharedData>("SharedData");std::cout << "生产者进程结束" << std::endl;}catch (const interprocess_exception& ex) {std::cerr << "进程间异常: " << ex.what() << std::endl;return 1;}catch (const std::exception& ex) {std::cerr << "标准异常: " << ex.what() << std::endl;return 1;}return 0;
}消费者进程
// consumer.cpp
#include <iostream>
#include <boost/interprocess/managed_windows_shared_memory.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include "shared_data.h"using namespace boost::interprocess;int main() {try {std::cout << "=== 消费者进程启动 ===" << std::endl;// 尝试打开共享内存(等待生产者创建)std::cout << "等待连接到生产者..." << std::endl;managed_windows_shared_memory segment;int retry_count = 0;const int max_retries = 10;while (retry_count < max_retries) {try {segment = managed_windows_shared_memory(open_only, "DemoSharedMemory");std::cout << "成功连接到共享内存" << std::endl;break;}catch (const interprocess_exception&) {retry_count++;std::cout << "尝试连接 " << retry_count << "/" << max_retries << "..." << std::endl;Sleep(1000);}}if (retry_count >= max_retries) {std::cerr << "无法连接到共享内存,请先启动生产者进程" << std::endl;return 1;}// 打开同步对象named_mutex mutex(open_only, "DemoMutex");named_condition condition(open_only, "DemoCondition");// 查找共享数据SharedData* shared_data = segment.find<SharedData>("SharedData").first;if (!shared_data) {std::cerr << "在共享内存中找不到数据对象" << std::endl;return 1;}std::cout << "消费者就绪,开始接收消息..." << std::endl;// 主循环while (true) {scoped_lock<named_mutex> lock(mutex);// 等待新数据或退出信号while (!shared_data->data_ready && !shared_data->stop_requested) {condition.wait(lock);}// 检查退出信号if (shared_data->stop_requested) {std::cout << "收到退出信号,消费者进程结束" << std::endl;break;}// 处理数据if (shared_data->data_ready) {std::cout << "接收消息 [" << shared_data->message_id << "]: " << shared_data->message << std::endl;// 标记数据已处理shared_data->data_ready = false;// 通知生产者可以发送新数据condition.notify_one();}}}catch (const interprocess_exception& ex) {std::cerr << "进程间异常: " << ex.what() << std::endl;return 1;}catch (const std::exception& ex) {std::cerr << "标准异常: " << ex.what() << std::endl;return 1;}return 0;
}说明:
1. 共享内存管理
managed_windows_shared_memory segment(create_only, "DemoSharedMemory", 1024 * 1024);
create_only:仅创建,如果已存在则失败open_only:仅打开,如果不存在则失败自动管理内存分配和对象生命周期
2. 对象构造
SharedData* shared_data = segment.construct<SharedData>("SharedData")();在共享内存中构造对象
自动调用构造函数
通过名称标识对象
3. 同步机制
named_mutex mutex(create_only, "DemoMutex"); named_condition condition(create_only, "DemoCondition");
named_mutex:命名互斥锁,跨进程同步named_condition:命名条件变量,用于线程间通信scoped_lock:RAII锁管理,自动加锁和解锁
重要函数说明:
在 Boost.Interprocess 中,segment.find<SharedData>("SharedData") 返回的是一个 std::pair 对象
find 方法的返回值
// find 方法的签名类似于: std::pair<T*, std::size_t> find(const char* name);
返回值是一个包含两个成员的 std::pair:
.first: 指向找到的对象的指针 (T*).second: 找到的对象数量 (std::size_t)
2、Windows API 实现
数据结构定义
#include <windows.h>
#include <iostream>
#include <string>
#include <vector>#define BUFFER_SIZE 10
#define SHARED_MEM_NAME L"Local\\ProducerConsumerSharedMem"
#define MUTEX_NAME L"Local\\ProducerConsumerMutex"
#define EMPTY_SEM_NAME L"Local\\EmptySemaphore"
#define FULL_SEM_NAME L"Local\\FullSemaphore"struct SharedData {int buffer[BUFFER_SIZE];int in; // 生产者位置int out; // 消费者位置int count; // 当前产品数量bool producerFinished;
};生产者实现
class Producer {
private:HANDLE hMapFile;HANDLE hMutex;HANDLE hEmptySem;HANDLE hFullSem;SharedData* pSharedData;public:Producer() : hMapFile(NULL), hMutex(NULL), hEmptySem(NULL), hFullSem(NULL), pSharedData(nullptr) {}~Producer() {Cleanup();}bool Initialize() {// 创建共享内存hMapFile = CreateFileMapping(INVALID_HANDLE_VALUE,NULL,PAGE_READWRITE,0,sizeof(SharedData),SHARED_MEM_NAME);if (hMapFile == NULL) {std::cerr << "CreateFileMapping failed: " << GetLastError() << std::endl;return false;}// 映射共享内存pSharedData = static_cast<SharedData*>(MapViewOfFile(hMapFile,FILE_MAP_ALL_ACCESS,0, 0, sizeof(SharedData)));if (pSharedData == NULL) {std::cerr << "MapViewOfFile failed: " << GetLastError() << std::endl;return false;}// 如果是第一次创建,初始化共享数据if (GetLastError() != ERROR_ALREADY_EXISTS) {pSharedData->in = 0;pSharedData->out = 0;pSharedData->count = 0;pSharedData->producerFinished = false;}// 创建同步对象hMutex = CreateMutex(NULL, FALSE, MUTEX_NAME);hEmptySem = CreateSemaphore(NULL, BUFFER_SIZE, BUFFER_SIZE, EMPTY_SEM_NAME);hFullSem = CreateSemaphore(NULL, 0, BUFFER_SIZE, FULL_SEM_NAME);if (hMutex == NULL || hEmptySem == NULL || hFullSem == NULL) {std::cerr << "Create sync objects failed" << std::endl;return false;}return true;}void Produce(int item) {// 等待空槽位WaitForSingleObject(hEmptySem, INFINITE);// 进入临界区WaitForSingleObject(hMutex, INFINITE);// 生产物品pSharedData->buffer[pSharedData->in] = item;pSharedData->in = (pSharedData->in + 1) % BUFFER_SIZE;pSharedData->count++;std::cout << "Produced: " << item << " [Count: " << pSharedData->count << "]" << std::endl;// 离开临界区ReleaseMutex(hMutex);// 通知有可用产品ReleaseSemaphore(hFullSem, 1, NULL);}void FinishProduction() {WaitForSingleObject(hMutex, INFINITE);pSharedData->producerFinished = true;ReleaseMutex(hMutex);// 唤醒所有等待的消费者ReleaseSemaphore(hFullSem, BUFFER_SIZE, NULL);}private:void Cleanup() {if (pSharedData) {UnmapViewOfFile(pSharedData);pSharedData = nullptr;}if (hMapFile) {CloseHandle(hMapFile);hMapFile = NULL;}if (hMutex) {CloseHandle(hMutex);hMutex = NULL;}if (hEmptySem) {CloseHandle(hEmptySem);hEmptySem = NULL;}if (hFullSem) {CloseHandle(hFullSem);hFullSem = NULL;}}
};消费者实现
class Consumer {
private:HANDLE hMapFile;HANDLE hMutex;HANDLE hEmptySem;HANDLE hFullSem;SharedData* pSharedData;public:Consumer() : hMapFile(NULL), hMutex(NULL), hEmptySem(NULL), hFullSem(NULL), pSharedData(nullptr) {}~Consumer() {Cleanup();}bool Initialize() {// 打开共享内存hMapFile = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, SHARED_MEM_NAME);if (hMapFile == NULL) {std::cerr << "OpenFileMapping failed: " << GetLastError() << std::endl;return false;}// 映射共享内存pSharedData = static_cast<SharedData*>(MapViewOfFile(hMapFile,FILE_MAP_ALL_ACCESS,0, 0, sizeof(SharedData)));if (pSharedData == NULL) {std::cerr << "MapViewOfFile failed: " << GetLastError() << std::endl;return false;}// 打开同步对象hMutex = OpenMutex(MUTEX_ALL_ACCESS, FALSE, MUTEX_NAME);hEmptySem = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, EMPTY_SEM_NAME);hFullSem = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, FULL_SEM_NAME);if (hMutex == NULL || hEmptySem == NULL || hFullSem == NULL) {std::cerr << "Open sync objects failed" << std::endl;return false;}return true;}int Consume() {// 等待有产品可用DWORD result = WaitForSingleObject(hFullSem, 5000); // 5秒超时if (result == WAIT_TIMEOUT) {return -1;}// 进入临界区WaitForSingleObject(hMutex, INFINITE);// 检查生产者是否已完成if (pSharedData->producerFinished && pSharedData->count == 0) {ReleaseMutex(hMutex);return -1; // 生产结束且无产品}// 消费物品int item = pSharedData->buffer[pSharedData->out];pSharedData->out = (pSharedData->out + 1) % BUFFER_SIZE;pSharedData->count--;std::cout << "Consumed: " << item << " [Count: " << pSharedData->count << "]" << std::endl;// 离开临界区ReleaseMutex(hMutex);// 通知有空槽位ReleaseSemaphore(hEmptySem, 1, NULL);return item;}bool ShouldContinue() {WaitForSingleObject(hMutex, INFINITE);bool continueConsuming = !(pSharedData->producerFinished && pSharedData->count == 0);ReleaseMutex(hMutex);return continueConsuming;}private:void Cleanup() {if (pSharedData) {UnmapViewOfFile(pSharedData);pSharedData = nullptr;}if (hMapFile) {CloseHandle(hMapFile);hMapFile = NULL;}if (hMutex) {CloseHandle(hMutex);hMutex = NULL;}if (hEmptySem) {CloseHandle(hEmptySem);hEmptySem = NULL;}if (hFullSem) {CloseHandle(hFullSem);hFullSem = NULL;}}
};