成都专业网站制作网站广告sdk接入
文章目录
- 🚀 基于Linux管道通信的进程池设计与实现
- 一、项目概述
- 二、完整代码实现
- 1. 📁 任务定义头文件(Task.h)
- 2. 💻 主程序(ProcessPool.cc)
- 三、🔧 关键技术解析
- 1. 管道生命周期管理
- 2. 进程池初始化流程
- 3. 任务分发机制
🚀 基于Linux管道通信的进程池设计与实现
一、项目概述
本文实现了一个基于匿名管道的多进程任务处理系统,核心功能包括:
- 进程池管理:动态创建/销毁子进程 🛠️
- 任务分发:通过管道发送控制指令 📨
- 资源回收:自动清理僵尸进程 ♻️
- 错误处理:管道断裂信号处理 🛑
二、完整代码实现
1. 📁 任务定义头文件(Task.h)
#pragma once
#include <iostream>
#include <functional>
#include <vector>using func = std::function<void()>;// 🏭 任务工厂函数
func createLogTask() {return []{ std::cout << "🪵 [LOG] Updating system logs" << std::endl;};
}func createRenderTask() {return []{ std::cout << "🖥️ [RENDER] Refreshing display" << std::endl;};
}// 📥 加载任务到全局容器
void loadTasks(std::vector<func>* tasks) {tasks->push_back(createLogTask());tasks->push_back(createRenderTask());
}
2. 💻 主程序(ProcessPool.cc)
#include "Task.h"
#include <unistd.h>
#include <vector>
#include <sys/wait.h>
#include <signal.h>const int WORKER_NUM = 4;
std::vector<func> g_tasks;// 📡 通信管道封装
struct Channel {int write_fd; // 父进程写端pid_t pid; // 子进程PIDstd::string name;Channel(int fd, pid_t p, const std::string& n): write_fd(fd), pid(p), name(n) {}
};// 🔄 子进程工作循环
void workerMain() {while(true) {int cmd = 0;ssize_t n = read(STDIN_FILENO, &cmd, sizeof(cmd));if(n <= 0) { // 管道关闭或错误std::cerr << "🔌 Worker " << getpid() << " exiting" << std::endl;break;}if(cmd >= 0 && cmd < g_tasks.size()) {std::cout << "👷 Worker " << getpid() << " executing task " << cmd << std::endl;g_tasks[cmd](); // 执行注册任务}}exit(EXIT_SUCCESS);
}// 🏗️ 初始化进程池
std::vector<Channel> createWorkers() {std::vector<Channel> workers;std::vector<int> old_fds;for(int i=0; i<WORKER_NUM; ++i) {int pipefd[2];pipe(pipefd); // 创建新管道pid_t pid = fork();if(pid == 0) { // 子进程for(int fd : old_fds) close(fd); // 关闭旧管道close(pipefd[1]); // 关闭写端dup2(pipefd[0], STDIN_FILENO); // 重定向标准输入close(pipefd[0]);workerMain(); // 永不返回}// 父进程close(pipefd[0]); // 关闭读端workers.emplace_back(pipefd[1], pid, "worker-"+std::to_string(i));old_fds.push_back(pipefd[1]);}return workers;
}// 📨 发送控制指令
void dispatchTasks(const std::vector<Channel>& workers) {for(const auto& w : workers) {int cmd = rand() % g_tasks.size();if(write(w.write_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {perror("❌ Write task failed");}}
}// 🧹 清理资源
void cleanupWorkers(const std::vector<Channel>& workers) {for(const auto& w : workers) {close(w.write_fd);waitpid(w.pid, nullptr, 0);}
}int main() {signal(SIGPIPE, SIG_IGN); // 忽略管道断裂信号loadTasks(&g_tasks);auto workers = createWorkers();dispatchTasks(workers);sleep(1); // 等待任务完成cleanupWorkers(workers);return 0;
}
三、🔧 关键技术解析
1. 管道生命周期管理
// 👨💻 父进程
int pipefd[2];
pipe(pipefd);
close(pipefd[0]); // 只保留写端// 👶 子进程
close(pipefd[1]);
dup2(pipefd[0], STDIN_FILENO);
close(pipefd[0]);
设计要点:
- 每个Worker独占一个管道 🚪
- 父进程持有所有写端文件描述符 🔑
- 子进程通过标准输入读取指令 📥
2. 进程池初始化流程
graph TDA[主进程] --> B[🛠️ 创建管道1]B --> C[👶 fork子进程1]C --> D[🔀 重定向输入]D --> E[🔄 进入工作循环]A --> F[🛠️ 创建管道2]F --> G[👶 fork子进程2]G --> H[🗑️ 关闭旧管道]
3. 任务分发机制
void dispatchTasks(const std::vector<Channel>& workers) {for(const auto& w : workers) {int cmd = rand() % g_tasks.size();write(w.write_fd, &cmd, sizeof(cmd));}
}
工作流程:
- 🎲 随机选择任务编号
- 📨 通过管道发送整型指令
- 👷 Worker解析指令执行对应任务