当前位置: 首页 > news >正文

Linux 匿名管道实现进程池

文章目录

  • 🚀 基于Linux管道通信的进程池设计与实现
    • 一、项目概述
    • 二、完整代码实现
      • 1. 📁 任务定义头文件(Task.h)
      • 2. 💻 主程序(ProcessPool.cc)
    • 三、🔧 关键技术解析
      • 1. 管道生命周期管理
      • 2. 进程池初始化流程
      • 3. 任务分发机制

在这里插入图片描述

🚀 基于Linux管道通信的进程池设计与实现


一、项目概述

本文实现了一个基于匿名管道的多进程任务处理系统,核心功能包括:

  • 进程池管理:动态创建/销毁子进程 🛠️
  • 任务分发:通过管道发送控制指令 📨
  • 资源回收:自动清理僵尸进程 ♻️
  • 错误处理:管道断裂信号处理 🛑

二、完整代码实现

1. 📁 任务定义头文件(Task.h)

#pragma once
#include <iostream>
#include <functional>
#include <vector>

using func = std::function<void()>;

// 🏭 任务工厂函数
func createLogTask() {
    return []{ 
        std::cout << "🪵 [LOG] Updating system logs" << std::endl;
    };
}

func createRenderTask() {
    return []{ 
        std::cout << "🖥️ [RENDER] Refreshing display" << std::endl;
    };
}

// 📥 加载任务到全局容器
void loadTasks(std::vector<func>* tasks) {
    tasks->push_back(createLogTask());
    tasks->push_back(createRenderTask());
}

2. 💻 主程序(ProcessPool.cc)

#include "Task.h"
#include <unistd.h>
#include <vector>
#include <sys/wait.h>
#include <signal.h>

const int WORKER_NUM = 4;
std::vector<func> g_tasks;

// 📡 通信管道封装
struct Channel {
    int write_fd;       // 父进程写端
    pid_t pid;          // 子进程PID
    std::string name;
    
    Channel(int fd, pid_t p, const std::string& n)
        : write_fd(fd), pid(p), name(n) {}
};

// 🔄 子进程工作循环
void workerMain() {
    while(true) {
        int cmd = 0;
        ssize_t n = read(STDIN_FILENO, &cmd, sizeof(cmd));
        
        if(n <= 0) { // 管道关闭或错误
            std::cerr << "🔌 Worker " << getpid() << " exiting" << std::endl;
            break;
        }
        
        if(cmd >= 0 && cmd < g_tasks.size()) {
            std::cout << "👷 Worker " << getpid() 
                     << " executing task " << cmd << std::endl;
            g_tasks[cmd](); // 执行注册任务
        }
    }
    exit(EXIT_SUCCESS);
}

// 🏗️ 初始化进程池
std::vector<Channel> createWorkers() {
    std::vector<Channel> workers;
    std::vector<int> old_fds;

    for(int i=0; i<WORKER_NUM; ++i) {
        int pipefd[2];
        pipe(pipefd); // 创建新管道

        pid_t pid = fork();
        if(pid == 0) { // 子进程
            for(int fd : old_fds) close(fd); // 关闭旧管道
            close(pipefd[1]); // 关闭写端
            
            dup2(pipefd[0], STDIN_FILENO); // 重定向标准输入
            close(pipefd[0]);
            
            workerMain(); // 永不返回
        }
        
        // 父进程
        close(pipefd[0]); // 关闭读端
        workers.emplace_back(pipefd[1], pid, "worker-"+std::to_string(i));
        old_fds.push_back(pipefd[1]);
    }
    return workers;
}

// 📨 发送控制指令
void dispatchTasks(const std::vector<Channel>& workers) {
    for(const auto& w : workers) {
        int cmd = rand() % g_tasks.size();
        if(write(w.write_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
            perror("❌ Write task failed");
        }
    }
}

// 🧹 清理资源
void cleanupWorkers(const std::vector<Channel>& workers) {
    for(const auto& w : workers) {
        close(w.write_fd);
        waitpid(w.pid, nullptr, 0);
    }
}

int main() {
    signal(SIGPIPE, SIG_IGN); // 忽略管道断裂信号
    loadTasks(&g_tasks);
    
    auto workers = createWorkers();
    dispatchTasks(workers);
    
    sleep(1); // 等待任务完成
    cleanupWorkers(workers);
    return 0;
}

三、🔧 关键技术解析

1. 管道生命周期管理

// 👨💻 父进程
int pipefd[2];
pipe(pipefd);
close(pipefd[0]); // 只保留写端

// 👶 子进程
close(pipefd[1]);
dup2(pipefd[0], STDIN_FILENO);
close(pipefd[0]);

设计要点

  • 每个Worker独占一个管道 🚪
  • 父进程持有所有写端文件描述符 🔑
  • 子进程通过标准输入读取指令 📥

2. 进程池初始化流程

graph TD
    A[主进程] --> B[🛠️ 创建管道1]
    B --> C[👶 fork子进程1]
    C --> D[🔀 重定向输入]
    D --> E[🔄 进入工作循环]
    A --> F[🛠️ 创建管道2]
    F --> G[👶 fork子进程2]
    G --> H[🗑️ 关闭旧管道]

3. 任务分发机制

void dispatchTasks(const std::vector<Channel>& workers) {
    for(const auto& w : workers) {
        int cmd = rand() % g_tasks.size();
        write(w.write_fd, &cmd, sizeof(cmd));
    }
}

工作流程

  1. 🎲 随机选择任务编号
  2. 📨 通过管道发送整型指令
  3. 👷 Worker解析指令执行对应任务

相关文章:

  • 【瞎折腾/Dify】使用docker离线部署Dify
  • 基于RWA 与 AI-Agent 协同的企业数字化生态构建
  • LLM自动化评测
  • 系统思考:客户价值
  • 1700. 无法吃午餐的学生数量
  • 图的存储、DFS、BFS
  • 基于YOLO11深度学习的舌苔舌象检测识别与诊断系统【python源码+Pyqt5界面+数据集+训练代码】
  • Unity 和 Python 的连接(通过SocketIO)附源码
  • 【栈数据结构应用解析:常见算法题详细解答】—— Leetcode
  • 【春招笔试】2025.03.13-蚂蚁春招笔试题
  • 933. 最近的请求次数
  • 10个数据收集相关DeepSeek提示词
  • 机器学习神经网络中的损失函数表达的是什么意思
  • 基于SpringBoot + Vue 的房屋租赁系统
  • Java---JavaSpringMVC解析(1)
  • 【eNSP实战】配置NAPT(含动态NAT)
  • Java入职篇(2)——开发流程以及专业术语
  • 【C++项目】从零实现RPC框架「二」:项⽬设计
  • C++之文字修仙小游戏
  • 原理-事件循环
  • 建筑瞭望|从黄浦江畔趸船改造看航运设施的升级与利用
  • CBA官方对孙铭徽罚款3万、广厦投资人楼明停赛2场罚款5万
  • 广东缉捕1名象牙走私潜逃非洲“红通”逃犯
  • 烤肉店从泔水桶内捞出肉串再烤?西安未央区市监局:停业整顿
  • 中国科学院院士、我国航天液体火箭技术专家朱森元逝世
  • 乌拉圭前总统何塞·穆希卡去世