一个简化版的进程内通信库实现
以下是一个简化版的进程内通信库实现,支持类似ZeroMQ的inproc
模式,包含多线程通信和基本的PUSH/PULL模式:
c
include <pthread.h>
include <stdlib.h>
include <string.h>
include <stdint.h>
include <sys/queue.h>
// --------------------- 数据结构定义 ---------------------
typedef struct {
pthread_mutex_t lock;
pthread_cond_t cond;
void *data;
size_t size;
TAILQ_ENTRY(msg_entry) entries;
} msg_entry;
TAILQ_HEAD(msg_queue, msg_entry);
typedef struct {
char *endpoint;
struct msg_queue queue;
pthread_mutex_t lock;
pthread_cond_t cond;
} endpoint_t;
typedef struct {
endpoint_t *endpoints;
size_t num_endpoints;
} ipc_context;
typedef enum {
IPC_PUSH,
IPC_PULL
} socket_type;
typedef struct {
socket_type type;
endpoint_t *bind_endpoint;
endpoint_t *connect_endpoint;
} ipc_socket;
// --------------------- 全局上下文 ---------------------
static ipc_context g_ctx = {0};
// --------------------- 核心函数实现 ---------------------
// 初始化上下文
int ipc_ctx_init() {
g_ctx.endpoints = NULL;
g_ctx.num_endpoints = 0;
return 0;
}
// 创建socket
ipc_socket* ipc_socket_create(socket_type type) {
ipc_socket *s = malloc(sizeof(ipc_socket));
s->type = type;
s->bind_endpoint = NULL;
s->connect_endpoint = NULL;
return s;
}
// 绑定端点
int ipc_bind(ipc_socket *s, const char *endpoint) {
for (size_t i = 0; i < g_ctx.num_endpoints; i++) {
if (strcmp(g_ctx.endpointsi.endpoint, endpoint) == 0) {
s->bind_endpoint = &g_ctx.endpointsi;
return 0;
}
}
endpoint_t *ep = realloc(g_ctx.endpoints, (g_ctx.num_endpoints + 1) * sizeof(endpoint_t));
if (!ep) return -1;
epg_ctx.num_endpoints.endpoint = strdup(endpoint);
TAILQ_INIT(&epg_ctx.num_endpoints.queue);
pthread_mutex_init(&epg_ctx.num_endpoints.lock, NULL);
pthread_cond_init(&epg_ctx.num_endpoints.cond, NULL);
s->bind_endpoint = &epg_ctx.num_endpoints;
g_ctx.endpoints = ep;
g_ctx.num_endpoints++;
return 0;
}
// 连接端点
int ipc_connect(ipc_socket *s, const char *endpoint) {
for (size_t i = 0; i < g_ctx.num_endpoints; i++) {
if (strcmp(g_ctx.endpointsi.endpoint, endpoint) == 0) {
s->connect_endpoint = &g_ctx.endpointsi;
return 0;
}
}
return -1; // 端点不存在需先绑定
}
// 发送消息
int ipc_send(ipc_socket *s, const void *data, size_t size, int flags) {
if (s->type != IPC_PUSH || !s->bind_endpoint) return -1;
msg_entry *entry = malloc(sizeof(msg_entry));
entry->data = malloc(size);
memcpy(entry->data, data, size);
entry->size = size;
pthread_mutex_lock(&s->bind_endpoint->lock);
TAILQ_INSERT_TAIL(&s->bind_endpoint->queue, entry, entries);
pthread_cond_signal(&s->bind_endpoint->cond);
pthread_mutex_unlock(&s->bind_endpoint->lock);
return 0;
}
// 接收消息
int ipc_recv(ipc_socket *s, void data, size_t *size, int flags) {
if (s->type != IPC_PULL || !s->connect_endpoint) return -1;
pthread_mutex_lock(&s->connect_endpoint->lock);
while (TAILQ_EMPTY(&s->connect_endpoint->queue)) {
pthread_cond_wait(&s->connect_endpoint->cond, &s->connect_endpoint->lock);
}
msg_entry *entry = TAILQ_FIRST(&s->connect_endpoint->queue);
*data = entry->data;
*size = entry->size;
TAILQ_REMOVE(&s->connect_endpoint->queue, entry, entries);
pthread_mutex_unlock(&s->connect_endpoint->lock);
free(entry);
return 0;
}
// 清理资源
void ipc_cleanup() {
for (size_t i = 0; i < g_ctx.num_endpoints; i++) {
free(g_ctx.endpointsi.endpoint);
pthread_mutex_destroy(&g_ctx.endpointsi.lock);
pthread_cond_destroy(&g_ctx.endpointsi.cond);
}
free(g_ctx.endpoints);
}
使用示例
c
include <stdio.h>
include <unistd.h>
void* sender_thread(void *arg) {
ipc_socket *push_socket = ipc_socket_create(IPC_PUSH);
ipc_bind(push_socket, "inproc://test");
const char *msg = "Hello from thread!";
ipc_send(push_socket, msg, strlen(msg)+1, 0);
return NULL;
}
void* receiver_thread(void *arg) {
ipc_socket *pull_socket = ipc_socket_create(IPC_PULL);
ipc_connect(pull_socket, "inproc://test");
void *data;
size_t size;
ipc_recv(pull_socket, &data, &size, 0);
printf("Received: %s\n", (char*)data);
free(data);
return NULL;
}
int main() {
ipc_ctx_init();
pthread_t t1, t2;
pthread_create(&t1, NULL, sender_thread, NULL);
pthread_create(&t2, NULL, receiver_thread, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
ipc_cleanup();
return 0;
}
核心特性说明
-
轻量级进程内通信
仅依赖POSIX线程库,适用于多线程通信场景。 -
PUSH/PULL模式
- PUSH端绑定端点后发送消息
- PULL端连接端点后接收消息
-
零拷贝支持
实际实现中可优化为引用计数,此处为简化版使用内存拷贝。 -
线程安全队列
使用互斥锁和条件变量实现生产者-消费者模型。
扩展方向
5. 性能优化
- 使用无锁队列(如CAS操作)
- 实现真正的零拷贝(共享内存+引用计数)
- 更多通信模式
c
typedef enum {
IPC_PUSH,
IPC_PULL,
IPC_PUB,
IPC_SUB,
IPC_REQ,
IPC_REP
} socket_type;
- 超时机制
c
int ipc_recv(ipc_socket *s, void data, size_t *size, int timeout_ms) {
// 使用pthread_cond_timedwait实现超时
}
- 多端点管理
- 实现动态端点发现
- 支持通配符订阅(如ZeroMQ的"topic"机制)
该实现约200行代码,体现了ZeroMQ的核心思想,可根据需求进一步扩展功能。