C++单头文件实现windows进程间通信(基于命名管道)
goodtrailer/win-pipe: Single-file C++ library for Windows named pipes IPC.
参考Github开源项目,并补充了双向管道的实现。
本文介绍了一个名为win-pipe的Windows专用命名管道库,主要包含接收器(receiver)、发送器(sender)和双向管道(DuplexPipe)三个核心组件。接收器通过异步线程处理消息接收并调用回调函数,发送器提供简单的消息发送功能,双向管道则封装了二者实现双向通信。该库提供了线程安全的消息处理机制,支持管道连接管理、错误处理和资源自动释放。适合需要进程间通信的Windows应用程序开发。
win-pipe.h 需要进行进程间通信的进程,包含此头文件即可
/** ISC License** Copyright (c) 2021 Alden Wu** Permission to use, copy, modify, and/or distribute this software for any* purpose with or without fee is hereby granted, provided that the above* copyright notice and this permission notice appear in all copies.** THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.*/#pragma once#ifndef _WIN32
#error "win-pipe is a Windows only library."
#endif#include <algorithm>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <vector>#include <iostream>#include <Windows.h>namespace win_pipe {namespace details {static inline std::string format_name(const std::string& name){std::string formatted = R"(\\.\pipe\)";formatted += name;return formatted;}struct handle_deleter {void operator()(HANDLE handle){if (handle != NULL && handle != INVALID_HANDLE_VALUE)CloseHandle(handle);}};using unique_handle = std::unique_ptr<void, handle_deleter>;}using callback_t = std::function<void(uint8_t*, size_t)>;// -------------------------------------------------------------------[ receiverclass receiver {public:/// <summary>/// Default constructor. Does nothing. No pipe is opened/created, and no/// read thread is started./// <para/>/// Note: remember that move constructor exists. This constructor is mainly/// meant for use with containers which require a default constructor./// </summary>receiver() = default;receiver(const std::string& name, callback_t callback){m_param = std::make_unique<thread_param>();std::string pipe_name{ details::format_name(name) };m_param->pipe.reset(CreateNamedPipeA(pipe_name.c_str(),PIPE_ACCESS_INBOUND | FILE_FLAG_FIRST_PIPE_INSTANCE,PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,1, 1024, 1024, NMPWAIT_USE_DEFAULT_WAIT, NULL));if (m_param->pipe.get() == INVALID_HANDLE_VALUE) {std::string msg{ "Pipe creation failed: " };msg += std::to_string(GetLastError());throw std::runtime_error(msg);}m_param->callback = callback;m_param->event.reset(CreateEventA(NULL, TRUE, FALSE, NULL));m_thread.reset(CreateThread(NULL, NULL, thread, m_param.get(), 0, NULL));}receiver(receiver&&) noexcept = default;~receiver(){if (m_param)SetEvent(m_param->event.get());CancelSynchronousIo(m_thread.get());WaitForSingleObject(m_thread.get(), INFINITE);}receiver& operator=(receiver&&) noexcept = default;void set_callback(callback_t callback){if (!m_param)return;std::lock_guard<std::mutex> lock{ m_param->callback_mutex };m_param->callback = callback;}private:static DWORD WINAPI thread(LPVOID lp){auto* param = reinterpret_cast<thread_param*>(lp);auto pipe = param->pipe.get();auto event = param->event.get();auto& callback = param->callback;auto& callback_mutex = param->callback_mutex;std::vector<uint8_t> buffer(1024);while (WaitForSingleObject(event, 1) == WAIT_TIMEOUT) {ConnectNamedPipe(pipe, NULL);while (WaitForSingleObject(event, 1) == WAIT_TIMEOUT) {DWORD bytes_read = 0;if (!ReadFile(pipe, buffer.data(), (DWORD)buffer.size(), &bytes_read, NULL)) {if (GetLastError() != ERROR_MORE_DATA)break;DWORD leftover = 0;PeekNamedPipe(pipe, NULL, NULL, NULL, NULL, &leftover);buffer.resize(bytes_read + leftover);DWORD more_bytes_read = 0;ReadFile(pipe, buffer.data() + bytes_read, leftover, &more_bytes_read, NULL);bytes_read += more_bytes_read;}std::lock_guard<std::mutex> lock{ callback_mutex };callback(buffer.data(), (size_t)bytes_read);}DisconnectNamedPipe(pipe);}return TRUE;}private:struct thread_param {details::unique_handle pipe;details::unique_handle event;std::mutex callback_mutex;callback_t callback;};private:std::unique_ptr<thread_param> m_param;details::unique_handle m_thread;};// ---------------------------------------------------------------------[ senderclass sender {public:/// <summary>/// Default constructor. Does nothing. Cannot actually write to a pipe./// <para />/// Note: remember that move constructor exists. This constructor is mainly/// meant for use with containers which require a default constructor./// </summary>sender() = default;sender(const std::string& name): m_name{ details::format_name(name) }{}sender(sender&&) noexcept = default;sender& operator=(sender&&) noexcept = default;bool send(const void* buffer, DWORD size){if (WriteFile(m_pipe.get(), buffer, size, NULL, NULL) == FALSE) {DWORD error = GetLastError();switch (error) {case ERROR_INVALID_HANDLE:case ERROR_PIPE_NOT_CONNECTED:connect();break;default:return false;}if (WriteFile(m_pipe.get(), buffer, size, NULL, NULL) == FALSE)return false;}FlushFileBuffers(m_pipe.get());return true;}private:void connect(){// In order to CloseHandle before CreateFile, you need to destroy// what's inside the unique_ptr by either calling reset() or assigning// it nullptr.m_pipe = nullptr;m_pipe.reset(CreateFileA(m_name.c_str(), GENERIC_WRITE,FILE_SHARE_READ, NULL, OPEN_ALWAYS, NULL,NULL));}private:details::unique_handle m_pipe;std::string m_name;};class DuplexPipe {public:using Callback = std::function<void(uint8_t*, size_t)>;DuplexPipe(const std::string& self_name, const std::string& peer_name, Callback recv_callback): self_name_(self_name),peer_name_(peer_name),recv_callback_(recv_callback),running_(false){}void start() {// 启动receiver线程receiver_ = std::make_unique<win_pipe::receiver>(self_name_.c_str(), recv_callback_);running_ = true;recv_thread_ = std::thread([this]() {while (running_) {// 让receiver持续工作(如需阻塞可略过此循环)std::this_thread::sleep_for(std::chrono::milliseconds(100));}});// sender可以随时使用,不需要线程sender_ = std::make_unique<win_pipe::sender>(peer_name_.c_str());}void stop() {running_ = false;if (recv_thread_.joinable()) recv_thread_.join();receiver_.reset();sender_.reset();}void send(const uint8_t* data, size_t size) {if (sender_) sender_->send(data, size);}void send(const std::string& str) {if (sender_)sender_->send((uint8_t*)str.data(), str.size());}~DuplexPipe() {stop();}private:std::string self_name_;std::string peer_name_;Callback recv_callback_;std::unique_ptr<win_pipe::receiver> receiver_;std::unique_ptr<win_pipe::sender> sender_;std::thread recv_thread_;std::atomic<bool> running_;};}
进程A main.cpp
#include "win_pipe.h"
#include <iostream>
#include <thread>
#include <chrono>
#include <string>int main() {std::cout << "进程A启动..." << std::endl;try {// 进程A:监听管道 "pipe_a",向管道 "pipe_b" 发送数据win_pipe::DuplexPipe pipe("pipe_a", // 自身监听的管道名"pipe_b", // 对端管道名[](uint8_t* data, size_t size) {// 接收数据的回调函数std::string message(reinterpret_cast<char*>(data), size);std::cout << "[进程A 收到]: " << message << std::endl;});pipe.start();std::cout << "进程A管道已启动" << std::endl;// 发送一些测试消息for (int i = 1; i <= 5; ++i) {std::string message = "Hello from Process A, message #" + std::to_string(i);pipe.send(message);std::cout << "[进程A 发送]: " << message << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2));}// 等待一段时间让进程B有机会回复std::this_thread::sleep_for(std::chrono::seconds(10));pipe.stop();std::cout << "进程A结束" << std::endl;} catch (const std::exception& e) {std::cerr << "进程A错误: " << e.what() << std::endl;return 1;}return 0;
}
进程B main.cpp
#include "win_pipe.h"
#include <iostream>
#include <thread>
#include <chrono>
#include <string>int main() {std::cout << "进程B启动..." << std::endl;try {// 进程B:监听管道 "pipe_b",向管道 "pipe_a" 发送数据win_pipe::DuplexPipe pipe("pipe_b", // 自身监听的管道名"pipe_a", // 对端管道名 [](uint8_t* data, size_t size) {// 接收数据的回调函数std::string message(reinterpret_cast<char*>(data), size);std::cout << "[进程B 收到]: " << message << std::endl;// 可以在这里添加回复逻辑});pipe.start();std::cout << "进程B管道已启动" << std::endl;// 发送回复消息for (int i = 1; i <= 3; ++i) {std::string message = "Reply from Process B, reply #" + std::to_string(i);pipe.send(message);std::cout << "[进程B 发送]: " << message << std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));}// 保持运行以接收更多消息std::this_thread::sleep_for(std::chrono::seconds(15));pipe.stop();std::cout << "进程B结束" << std::endl;} catch (const std::exception& e) {std::cerr << "进程B错误: " << e.what() << std::endl;return 1;}return 0;
}