当前位置: 首页 > news >正文

一个简化版的进程内通信库实现

以下是一个简化版的进程内通信库实现,支持类似ZeroMQ的inproc模式,包含多线程通信和基本的PUSH/PULL模式:

c
include <pthread.h>
include <stdlib.h>
include <string.h>
include <stdint.h>
include <sys/queue.h>

// --------------------- 数据结构定义 ---------------------
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    void *data;
    size_t size;
    TAILQ_ENTRY(msg_entry) entries;
} msg_entry;

TAILQ_HEAD(msg_queue, msg_entry);

typedef struct {
    char *endpoint;
    struct msg_queue queue;
    pthread_mutex_t lock;
    pthread_cond_t cond;
} endpoint_t;

typedef struct {
    endpoint_t *endpoints;
    size_t num_endpoints;
} ipc_context;

typedef enum {
    IPC_PUSH,
    IPC_PULL
} socket_type;

typedef struct {
    socket_type type;
    endpoint_t *bind_endpoint;
    endpoint_t *connect_endpoint;
} ipc_socket;

// --------------------- 全局上下文 ---------------------
static ipc_context g_ctx = {0};

// --------------------- 核心函数实现 ---------------------

// 初始化上下文
int ipc_ctx_init() {
    g_ctx.endpoints = NULL;
    g_ctx.num_endpoints = 0;
    return 0;
}

// 创建socket
ipc_socket* ipc_socket_create(socket_type type) {
    ipc_socket *s = malloc(sizeof(ipc_socket));
    s->type = type;
    s->bind_endpoint = NULL;
    s->connect_endpoint = NULL;
    return s;
}

// 绑定端点
int ipc_bind(ipc_socket *s, const char *endpoint) {
    for (size_t i = 0; i < g_ctx.num_endpoints; i++) {
        if (strcmp(g_ctx.endpointsi.endpoint, endpoint) == 0) {
            s->bind_endpoint = &g_ctx.endpointsi;
            return 0;
        }
    }

    endpoint_t *ep = realloc(g_ctx.endpoints, (g_ctx.num_endpoints + 1) * sizeof(endpoint_t));
    if (!ep) return -1;
    
    epg_ctx.num_endpoints.endpoint = strdup(endpoint);
    TAILQ_INIT(&epg_ctx.num_endpoints.queue);
    pthread_mutex_init(&epg_ctx.num_endpoints.lock, NULL);
    pthread_cond_init(&epg_ctx.num_endpoints.cond, NULL);
    
    s->bind_endpoint = &epg_ctx.num_endpoints;
    g_ctx.endpoints = ep;
    g_ctx.num_endpoints++;
    return 0;
}

// 连接端点
int ipc_connect(ipc_socket *s, const char *endpoint) {
    for (size_t i = 0; i < g_ctx.num_endpoints; i++) {
        if (strcmp(g_ctx.endpointsi.endpoint, endpoint) == 0) {
            s->connect_endpoint = &g_ctx.endpointsi;
            return 0;
        }
    }
    return -1; // 端点不存在需先绑定
}

// 发送消息
int ipc_send(ipc_socket *s, const void *data, size_t size, int flags) {
    if (s->type != IPC_PUSH || !s->bind_endpoint) return -1;

    msg_entry *entry = malloc(sizeof(msg_entry));
    entry->data = malloc(size);
    memcpy(entry->data, data, size);
    entry->size = size;

    pthread_mutex_lock(&s->bind_endpoint->lock);
    TAILQ_INSERT_TAIL(&s->bind_endpoint->queue, entry, entries);
    pthread_cond_signal(&s->bind_endpoint->cond);
    pthread_mutex_unlock(&s->bind_endpoint->lock);
    return 0;
}

// 接收消息
int ipc_recv(ipc_socket *s, void data, size_t *size, int flags) {
    if (s->type != IPC_PULL || !s->connect_endpoint) return -1;

    pthread_mutex_lock(&s->connect_endpoint->lock);
    while (TAILQ_EMPTY(&s->connect_endpoint->queue)) {
        pthread_cond_wait(&s->connect_endpoint->cond, &s->connect_endpoint->lock);
    }

    msg_entry *entry = TAILQ_FIRST(&s->connect_endpoint->queue);
    *data = entry->data;
    *size = entry->size;
    TAILQ_REMOVE(&s->connect_endpoint->queue, entry, entries);
    pthread_mutex_unlock(&s->connect_endpoint->lock);
    
    free(entry);
    return 0;
}

// 清理资源
void ipc_cleanup() {
    for (size_t i = 0; i < g_ctx.num_endpoints; i++) {
        free(g_ctx.endpointsi.endpoint);
        pthread_mutex_destroy(&g_ctx.endpointsi.lock);
        pthread_cond_destroy(&g_ctx.endpointsi.cond);
    }
    free(g_ctx.endpoints);
}

 使用示例
c
include <stdio.h>
include <unistd.h>

void* sender_thread(void *arg) {
    ipc_socket *push_socket = ipc_socket_create(IPC_PUSH);
    ipc_bind(push_socket, "inproc://test");
    
    const char *msg = "Hello from thread!";
    ipc_send(push_socket, msg, strlen(msg)+1, 0);
    return NULL;
}

void* receiver_thread(void *arg) {
    ipc_socket *pull_socket = ipc_socket_create(IPC_PULL);
    ipc_connect(pull_socket, "inproc://test");
    
    void *data;
    size_t size;
    ipc_recv(pull_socket, &data, &size, 0);
    printf("Received: %s\n", (char*)data);
    free(data);
    return NULL;
}

int main() {
    ipc_ctx_init();
    
    pthread_t t1, t2;
    pthread_create(&t1, NULL, sender_thread, NULL);
    pthread_create(&t2, NULL, receiver_thread, NULL);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    
    ipc_cleanup();
    return 0;
}

核心特性说明

  1. 轻量级进程内通信
    仅依赖POSIX线程库,适用于多线程通信场景。

  2. PUSH/PULL模式

    • PUSH端绑定端点后发送消息
    • PULL端连接端点后接收消息
  3. 零拷贝支持
    实际实现中可优化为引用计数,此处为简化版使用内存拷贝。

  4. 线程安全队列
    使用互斥锁和条件变量实现生产者-消费者模型。

扩展方向
5. 性能优化

  • 使用无锁队列(如CAS操作)
  • 实现真正的零拷贝(共享内存+引用计数)
  1. 更多通信模式
   c
   typedef enum {
       IPC_PUSH,
       IPC_PULL,
       IPC_PUB,
       IPC_SUB,
       IPC_REQ,
       IPC_REP
   } socket_type;
  1. 超时机制
   c
   int ipc_recv(ipc_socket *s, void data, size_t *size, int timeout_ms) {
       // 使用pthread_cond_timedwait实现超时
   }
  1. 多端点管理
    • 实现动态端点发现
    • 支持通配符订阅(如ZeroMQ的"topic"机制)

该实现约200行代码,体现了ZeroMQ的核心思想,可根据需求进一步扩展功能。

相关文章:

  • 深入理解Java中的static关键字及其内存原理
  • 《云原生技术:DeepSeek分布式推理的效能倍增器》
  • Git系列之git checkout
  • 发起请求的步骤
  • Spring Boot整合WebSocket
  • 基于深度学习的中文文本情感分析系统
  • EasyTwin全新体验 | 春启新章,智焕新生
  • 隧道定向号角喇叭为隧道安全保驾护航
  • Microsof Visual Studio Code 安装教程(中文设置)
  • 计算机网络基础知识(web漏洞解析与攻防实战)
  • stm32week6
  • linux docker相关指令
  • Centos磁盘扩容
  • 企业招聘能力提升之道:突破困境,精准纳才
  • Synology 部署的 WordPress 無法升級至最新版本時,可以透過以下改良版指南進行排查和解決。
  • springboot3.X 无法解析parameter参数问题
  • git规范提交之commitizen conventional-changelog-cli 安装
  • 使用Python和p5.js创建的迷你游戏示例,该游戏包含多个屏幕和动画,满足在画布上显示图像、使用键盘命令移动图像
  • [GHCTF 2025]SQL??? 【sqlite注入】
  • java中过滤器
  • 见微知沪|科学既要勇攀高峰,又要放低身段
  • 美国失去最后一个AAA评级,资产价格怎么走?美股或将触及天花板
  • 南宁一学校发生伤害案件,警方通报:嫌疑人死亡,2人受伤
  • 【社论】城市更新,始终以人为核心
  • 马上评|训斥打骂女儿致死,无暴力应是“管教”底线
  • 坚持吃素,是不是就不会得高血脂了?