对上篇 IPC 应用框架设计的拓展:性能优化完整实现
上篇内容:如何设计一个比较完美的用户空间ipc应用框架-CSDN博客
一、内存池管理完整实现
1.1 内存池头文件
include/memory_pool.h
#ifndef MEMORY_POOL_H
#define MEMORY_POOL_H

#include "ipc_core.h"
#include <pthread.h>
#include <stddef.h>

/* Memory-pool statistics. */
typedef struct {
    size_t total_allocations;       /* every allocate attempt */
    size_t successful_allocations;
    size_t failed_allocations;
    size_t cache_hits;              /* fast-path (spinlock) successes */
    size_t cache_misses;            /* fast path empty, fell through to slow path */
    double avg_alloc_time_ns;       /* exponential moving average of alloc latency */
} pool_stats_t;

/* Pool of preallocated ipc_message_t objects. */
typedef struct {
    ipc_message_t* messages;        /* preallocated, cache-line-aligned message array */
    size_t* free_indices;           /* stack of free slot indices */
    size_t pool_size;               /* total capacity */
    size_t used_count;              /* slots currently handed out */
    size_t free_top;                /* top of the free-index stack */
    pthread_mutex_t lock;           /* slow-path / resize lock */
    pthread_spinlock_t fast_lock;   /* fast-path spinlock protecting the free stack */
    pool_stats_t stats;             /* usage statistics */
    struct timespec last_stat_reset; /* time of the last statistics reset */
} message_pool_t;

/* Pool lifecycle. */
message_pool_t* message_pool_create(size_t initial_size);
void message_pool_destroy(message_pool_t* pool);
ipc_message_t* message_pool_allocate(message_pool_t* pool, int use_fast_path);
void message_pool_release(message_pool_t* pool, ipc_message_t* msg);

/* Batch operations. */
int message_pool_allocate_batch(message_pool_t* pool, ipc_message_t** messages, size_t count);
int message_pool_release_batch(message_pool_t* pool, ipc_message_t** messages, size_t count);

/* Statistics and monitoring. */
void message_pool_get_stats(const message_pool_t* pool, pool_stats_t* stats);
void message_pool_reset_stats(message_pool_t* pool);
size_t message_pool_get_usage(const message_pool_t* pool);

/* Dynamic resizing. */
int message_pool_expand(message_pool_t* pool, size_t additional_size);
int message_pool_shrink(message_pool_t* pool, size_t target_size);

#endif /* MEMORY_POOL_H */
1.2 内存池实现
src/utils/memory_pool.c
#include "../../include/memory_pool.h"

#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#define ALIGNMENT 64 // 缓存行对齐
#define MIN_POOL_SIZE 16
#define MAX_POOL_SIZE 65536
/* 计算对齐的内存地址 */
/* Round ptr up to the next multiple of alignment.
 * alignment must be a power of two. */
static inline void* align_ptr(void* ptr, size_t alignment) {
    uintptr_t addr = (uintptr_t)ptr;
    uintptr_t mask = (uintptr_t)alignment - 1;
    return (void*)((addr + mask) & ~mask);
}
/* 获取当前时间戳(纳秒) */
/* Monotonic timestamp in nanoseconds (immune to wall-clock jumps). */
static inline uint64_t get_time_ns() {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    return (uint64_t)now.tv_sec * 1000000000ULL + (uint64_t)now.tv_nsec;
}
message_pool_t* message_pool_create(size_t initial_size) {if (initial_size < MIN_POOL_SIZE) initial_size = MIN_POOL_SIZE;if (initial_size > MAX_POOL_SIZE) initial_size = MAX_POOL_SIZE;message_pool_t* pool = calloc(1, sizeof(message_pool_t));if (!pool) return NULL;// 分配对齐的消息内存size_t total_size = initial_size * sizeof(ipc_message_t) + ALIGNMENT;pool->messages = malloc(total_size);if (!pool->messages) {free(pool);return NULL;}// 对齐消息数组pool->messages = align_ptr(pool->messages, ALIGNMENT);// 分配空闲索引数组pool->free_indices = malloc(initial_size * sizeof(size_t));if (!pool->free_indices) {free(pool->messages);free(pool);return NULL;}// 初始化空闲索引栈(逆序,提高缓存局部性)for (size_t i = 0; i < initial_size; i++) {pool->free_indices[i] = initial_size - 1 - i;}pool->pool_size = initial_size;pool->free_top = initial_size;// 初始化锁pthread_mutex_init(&pool->lock, NULL);pthread_spin_init(&pool->fast_lock, PTHREAD_PROCESS_PRIVATE);// 初始化统计clock_gettime(CLOCK_MONOTONIC, &pool->last_stat_reset);pool->stats.total_allocations = 0;pool->stats.successful_allocations = 0;pool->stats.failed_allocations = 0;pool->stats.cache_hits = 0;pool->stats.cache_misses = 0;pool->stats.avg_alloc_time_ns = 0.0;return pool;
}
void message_pool_destroy(message_pool_t* pool) {if (!pool) return;// 计算原始指针地址进行释放void* original_ptr = (char*)pool->messages - ALIGNMENT;free(original_ptr);free(pool->free_indices);pthread_mutex_destroy(&pool->lock);pthread_spin_destroy(&pool->fast_lock);free(pool);
}
ipc_message_t* message_pool_allocate(message_pool_t* pool, int use_fast_path) {if (!pool) return NULL;uint64_t start_time = get_time_ns();ipc_message_t* msg = NULL;if (use_fast_path) {// 快速路径:使用自旋锁pthread_spin_lock(&pool->fast_lock);if (pool->free_top > 0) {size_t index = pool->free_indices[--pool->free_top];msg = &pool->messages[index];pool->used_count++;pool->stats.cache_hits++;}pthread_spin_unlock(&pool->fast_lock);if (msg) {// 初始化消息memset(msg, 0, sizeof(ipc_message_t));goto success;}// 快速路径失败,回退到慢速路径pool->stats.cache_misses++;}// 慢速路径:使用互斥锁,可能进行池扩展pthread_mutex_lock(&pool->lock);if (pool->free_top == 0) {// 池已满,尝试扩展if (message_pool_expand(pool, pool->pool_size) != 0) {pthread_mutex_unlock(&pool->lock);goto failure;}}size_t index = pool->free_indices[--pool->free_top];msg = &pool->messages[index];pool->used_count++;pthread_mutex_unlock(&pool->lock);// 初始化消息memset(msg, 0, sizeof(ipc_message_t));
success:pool->stats.successful_allocations++;pool->stats.total_allocations++;// 更新平均分配时间(指数移动平均)uint64_t alloc_time = get_time_ns() - start_time;pool->stats.avg_alloc_time_ns = 0.9 * pool->stats.avg_alloc_time_ns + 0.1 * alloc_time;return msg;
failure:pool->stats.failed_allocations++;pool->stats.total_allocations++;return NULL;
}
void message_pool_release(message_pool_t* pool, ipc_message_t* msg) {if (!pool || !msg) return;// 检查消息是否属于这个池if (msg < pool->messages || msg >= pool->messages + pool->pool_size) {return; // 不属于这个池,可能是静态分配的消息}size_t index = msg - pool->messages;// 使用自旋锁快速释放pthread_spin_lock(&pool->fast_lock);if (pool->free_top < pool->pool_size) {pool->free_indices[pool->free_top++] = index;pool->used_count--;} else {// 不应该发生,但安全处理pthread_spin_unlock(&pool->fast_lock);return;}pthread_spin_unlock(&pool->fast_lock);
}
int message_pool_allocate_batch(message_pool_t* pool, ipc_message_t** messages, size_t count) {if (!pool || !messages || count == 0) return -1;pthread_mutex_lock(&pool->lock);// 检查是否有足够空间if (pool->free_top < count) {// 需要扩展池size_t needed = count - pool->free_top;if (message_pool_expand(pool, needed) != 0) {pthread_mutex_unlock(&pool->lock);return -1;}}// 批量分配for (size_t i = 0; i < count; i++) {size_t index = pool->free_indices[--pool->free_top];messages[i] = &pool->messages[index];memset(messages[i], 0, sizeof(ipc_message_t));pool->used_count++;}pthread_mutex_unlock(&pool->lock);pool->stats.successful_allocations += count;pool->stats.total_allocations += count;return 0;
}
int message_pool_release_batch(message_pool_t* pool, ipc_message_t** messages, size_t count) {if (!pool || !messages || count == 0) return -1;pthread_spin_lock(&pool->fast_lock);// 检查是否有足够空间存放释放的消息if (pool->free_top + count > pool->pool_size) {pthread_spin_unlock(&pool->fast_lock);return -1;}// 批量释放for (size_t i = 0; i < count; i++) {if (messages[i] && messages[i] >= pool->messages && messages[i] < pool->messages + pool->pool_size) {size_t index = messages[i] - pool->messages;pool->free_indices[pool->free_top++] = index;pool->used_count--;}}pthread_spin_unlock(&pool->fast_lock);return 0;
}
int message_pool_expand(message_pool_t* pool, size_t additional_size) {if (!pool) return -1;size_t new_size = pool->pool_size + additional_size;if (new_size > MAX_POOL_SIZE) {new_size = MAX_POOL_SIZE;if (new_size <= pool->pool_size) {return -1; // 无法继续扩展}}// 重新分配消息数组size_t total_size = new_size * sizeof(ipc_message_t) + ALIGNMENT;ipc_message_t* new_messages = realloc((char*)pool->messages - ALIGNMENT, total_size);if (!new_messages) return -1;// 重新对齐pool->messages = align_ptr(new_messages, ALIGNMENT);// 重新分配空闲索引数组size_t* new_free_indices = realloc(pool->free_indices, new_size * sizeof(size_t));if (!new_free_indices) return -1;pool->free_indices = new_free_indices;// 添加新的空闲索引for (size_t i = pool->pool_size; i < new_size; i++) {pool->free_indices[pool->free_top++] = i;}pool->pool_size = new_size;return 0;
}
/* Snapshot the pool's statistics into *stats.
 * NOTE(review): the counters are read without taking pool->lock, so the
 * snapshot may be slightly inconsistent under concurrent use — confirm
 * whether that is acceptable for the monitoring consumers. */
void message_pool_get_stats(const message_pool_t* pool, pool_stats_t* stats) {
    if (!pool || !stats) return;
    *stats = pool->stats;
}
/* Number of messages currently checked out of the pool. */
size_t message_pool_get_usage(const message_pool_t* pool) {
    return pool ? pool->used_count : 0;
}
二、批量操作完整实现
2.1 批量操作头文件
include/batch_operations.h
#ifndef BATCH_OPERATIONS_H
#define BATCH_OPERATIONS_H

#include "ipc_core.h"
#include "memory_pool.h"

/* Batch-processing configuration. */
typedef struct {
    size_t max_batch_size;          /* hard upper bound per batch */
    size_t optimal_batch_size;      /* initial/target batch size */
    unsigned int batch_timeout_ms;  /* receive-side batching timeout */
    int enable_adaptive_batching;   /* non-zero: tune batch size at runtime */
    double target_throughput_mbps;  /* adaptation target */
} batch_config_t;

/* Batch-processing context. */
typedef struct {
    batch_config_t config;
    message_pool_t* message_pool;

    /* Adaptive batching state. */
    size_t current_batch_size;
    size_t successful_batches;
    size_t failed_batches;
    double current_throughput_mbps;
    struct timespec last_adaptation;

    /* Performance statistics. */
    struct {
        size_t total_messages_batched;
        size_t total_batches_processed;
        uint64_t total_processing_time_ns;
        size_t peak_batch_size;
    } stats;
} batch_context_t;

/* Context lifecycle. */
batch_context_t* batch_context_create(const batch_config_t* config);
void batch_context_destroy(batch_context_t* context);

int ipc_batch_send(ipc_endpoint_t* endpoint, batch_context_t* context,
                   const void* data_blocks[], const size_t sizes[],
                   const int priorities[], size_t count);
int ipc_batch_receive(ipc_endpoint_t* endpoint, batch_context_t* context,
                      ipc_message_t* messages[], size_t max_count,
                      unsigned int timeout_ms);

/* Batch construction helpers. */
int batch_build_messages(batch_context_t* context, ipc_message_t* messages[],
                         const void* data_blocks[], const size_t sizes[],
                         const int priorities[], size_t count);
int batch_release_messages(batch_context_t* context, ipc_message_t* messages[],
                           size_t count);

/* Adaptive batching. */
void batch_adapt_size(batch_context_t* context, double current_throughput);
size_t batch_get_optimal_size(batch_context_t* context);

/* Statistics and monitoring. */
void batch_get_statistics(const batch_context_t* context, void* stats);
void batch_reset_statistics(batch_context_t* context);

#endif /* BATCH_OPERATIONS_H */
2.2 批量操作实现
src/utils/batch_operations.c
#include "../../include/batch_operations.h"
#include <stdlib.h>
#include <string.h>
#include <math.h>
#define DEFAULT_MAX_BATCH_SIZE 64
#define DEFAULT_OPTIMAL_BATCH_SIZE 16
#define DEFAULT_BATCH_TIMEOUT_MS 100
#define MIN_BATCH_SIZE 1
#define MAX_BATCH_SIZE 256
batch_context_t* batch_context_create(const batch_config_t* config) {batch_context_t* context = calloc(1, sizeof(batch_context_t));if (!context) return NULL;// 设置配置if (config) {context->config = *config;} else {// 默认配置context->config.max_batch_size = DEFAULT_MAX_BATCH_SIZE;context->config.optimal_batch_size = DEFAULT_OPTIMAL_BATCH_SIZE;context->config.batch_timeout_ms = DEFAULT_BATCH_TIMEOUT_MS;context->config.enable_adaptive_batching = 1;context->config.target_throughput_mbps = 100.0; // 100 Mbps}// 验证配置if (context->config.max_batch_size > MAX_BATCH_SIZE) {context->config.max_batch_size = MAX_BATCH_SIZE;}if (context->config.optimal_batch_size > context->config.max_batch_size) {context->config.optimal_batch_size = context->config.max_batch_size;}// 创建消息池context->message_pool = message_pool_create(context->config.max_batch_size * 2);if (!context->message_pool) {free(context);return NULL;}context->current_batch_size = context->config.optimal_batch_size;clock_gettime(CLOCK_MONOTONIC, &context->last_adaptation);return context;
}
/* Release the context and its message pool. */
void batch_context_destroy(batch_context_t* context) {
    if (!context) return;
    if (context->message_pool)
        message_pool_destroy(context->message_pool);
    free(context);
}
/* Allocate `count` messages from the pool and fill them from the given
 * data blocks.  Oversized payloads are truncated to the fixed message
 * buffer.  Returns 0 on success, -1 on bad arguments or pool exhaustion.
 * Fix vs original: sizes[i] was dereferenced while only data_blocks was
 * NULL-checked, so a NULL `sizes` array crashed. */
int batch_build_messages(batch_context_t* context, ipc_message_t* messages[],
                         const void* data_blocks[], const size_t sizes[],
                         const int priorities[], size_t count) {
    if (!context || !messages || count == 0) return -1;

    if (message_pool_allocate_batch(context->message_pool, messages, count) != 0)
        return -1;

    unsigned int timestamp = (unsigned int)time(NULL);
    for (size_t i = 0; i < count; i++) {
        ipc_message_t *msg = messages[i];
        msg->mtype = 1;
        msg->sender_pid = getpid();
        msg->timestamp = timestamp;
        msg->sequence = i;
        msg->priority = priorities ? priorities[i] : 1;

        if (data_blocks && sizes && data_blocks[i] && sizes[i] > 0) {
            size_t copy_size = sizes[i];
            if (copy_size > sizeof(msg->data))
                copy_size = sizeof(msg->data); /* silent truncation, as before */
            memcpy(msg->data, data_blocks[i], copy_size);
            msg->data_len = copy_size;
        } else {
            msg->data_len = 0;
        }
    }
    return 0;
}
int ipc_batch_send(ipc_endpoint_t* endpoint, batch_context_t* context, const void* data_blocks[], const size_t sizes[], const int priorities[], size_t count) {if (!endpoint || !context || count == 0) return -1;uint64_t start_time = get_time_ns();// 限制批量大小if (count > context->config.max_batch_size) {count = context->config.max_batch_size;}// 批量构建消息ipc_message_t* messages[count];if (batch_build_messages(context, messages, data_blocks, sizes, priorities, count) != 0) {return -1;}// 批量发送int success_count = 0;for (size_t i = 0; i < count; i++) {if (endpoint->protocol->send(endpoint->protocol, messages[i]) == 0) {success_count++;// 更新端点统计endpoint->send_stats.sent_count++;endpoint->send_stats.total_sent_bytes += messages[i]->data_len;} else {endpoint->send_stats.error_count++;}}// 释放消息(无论发送成功与否)batch_release_messages(context, messages, count);// 更新批量统计uint64_t processing_time = get_time_ns() - start_time;context->stats.total_processing_time_ns += processing_time;context->stats.total_batches_processed++;context->stats.total_messages_batched += count;if (count > context->stats.peak_batch_size) {context->stats.peak_batch_size = count;}if (success_count == (int)count) {context->successful_batches++;} else {context->failed_batches++;}// 计算吞吐量并自适应调整if (context->config.enable_adaptive_batching) {double throughput = (count * sizeof(ipc_message_t) * 8.0) / (processing_time / 1e9) / 1e6; // Mbpsbatch_adapt_size(context, throughput);}return success_count;
}
int ipc_batch_receive(ipc_endpoint_t* endpoint, batch_context_t* context,ipc_message_t* messages[], size_t max_count, unsigned int timeout_ms) {if (!endpoint || !context || !messages || max_count == 0) return -1;uint64_t start_time = get_time_ns();struct timespec deadline;// 计算截止时间clock_gettime(CLOCK_MONOTONIC, &deadline);deadline.tv_sec += timeout_ms / 1000;deadline.tv_nsec += (timeout_ms % 1000) * 1000000;if (deadline.tv_nsec >= 1000000000) {deadline.tv_sec++;deadline.tv_nsec -= 1000000000;}size_t received_count = 0;struct timespec current_time;while (received_count < max_count) {// 检查超时clock_gettime(CLOCK_MONOTONIC, ¤t_time);if (current_time.tv_sec > deadline.tv_sec || (current_time.tv_sec == deadline.tv_sec && current_time.tv_nsec >= deadline.tv_nsec)) {break;}// 计算剩余超时时间unsigned int remaining_ms = 0;if (current_time.tv_sec < deadline.tv_sec) {remaining_ms = (deadline.tv_sec - current_time.tv_sec) * 1000;remaining_ms += (deadline.tv_nsec - current_time.tv_nsec) / 1000000;} else {remaining_ms = (deadline.tv_nsec - current_time.tv_nsec) / 1000000;}if (remaining_ms == 0) remaining_ms = 1;// 接收消息ipc_message_t* msg = message_pool_allocate(context->message_pool, 1);if (!msg) break;int result = endpoint->protocol->receive(endpoint->protocol, msg, remaining_ms);if (result == 0) {messages[received_count++] = msg;// 更新端点统计endpoint->receive_stats.received_count++;endpoint->receive_stats.total_received_bytes += msg->data_len;} else {// 接收失败,释放消息message_pool_release(context->message_pool, msg);if (result == -2) {// 超时break;}}}// 更新批量统计uint64_t processing_time = get_time_ns() - start_time;context->stats.total_processing_time_ns += processing_time;context->stats.total_batches_processed++;context->stats.total_messages_batched += received_count;if (received_count > context->stats.peak_batch_size) {context->stats.peak_batch_size = received_count;}return received_count;
}
/* Nudge current_batch_size toward the configured throughput target.
 * Called with the throughput (Mbps) of the batch just processed; acts
 * at most once per second.
 * Fixes vs original: growing via `* 1.2` with integer truncation could
 * never leave small sizes (1 * 1.2 truncates back to 1), permanently
 * pinning the batch size at its minimum; also adds NULL/zero-target
 * guards. */
void batch_adapt_size(batch_context_t* context, double current_throughput) {
    if (!context || !context->config.enable_adaptive_batching) return;

    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    if (now.tv_sec - context->last_adaptation.tv_sec < 1) return; /* rate-limit */
    context->last_adaptation = now;
    context->current_throughput_mbps = current_throughput;

    double target = context->config.target_throughput_mbps;
    if (target <= 0.0) return; /* no meaningful target configured */
    double ratio = current_throughput / target;

    size_t size = context->current_batch_size;
    if (ratio < 0.8) {
        /* Below target: batch more aggressively — grow by at least 1. */
        size_t grown = size + size / 5;
        size = (grown > size) ? grown : size + 1;
    } else if (ratio > 1.2) {
        /* Above target: shrink to reduce latency. */
        size = (size_t)(size * 0.9);
    }

    if (size < MIN_BATCH_SIZE) size = MIN_BATCH_SIZE;
    if (size > context->config.max_batch_size) size = context->config.max_batch_size;
    context->current_batch_size = size;
}
/* Return a batch of messages to the context's pool.
 * Returns 0 on success, -1 on bad arguments or pool error. */
int batch_release_messages(batch_context_t* context, ipc_message_t* messages[],
                           size_t count) {
    if (!context) return -1;
    if (!messages) return -1;
    if (count == 0) return -1;
    return message_pool_release_batch(context->message_pool, messages, count);
}
三、零拷贝优化完整实现
3.1 零拷贝头文件
include/zero_copy.h
#ifndef ZERO_COPY_H
#define ZERO_COPY_H

#include "ipc_core.h"
#include <pthread.h>
#include <stdint.h>
#include <sys/mman.h>
#include <sys/types.h>

/* Descriptor for one System V shared-memory segment. */
typedef struct {
    int shm_id;                     /* id returned by shmget() */
    void* address;                  /* address mapped by shmat() */
    size_t size;                    /* segment size in bytes */
    key_t key;                      /* System V IPC key */
    int is_owner;                   /* non-zero if this process created it */
    unsigned int reference_count;
} shm_segment_t;

/* Header placed in front of every zero-copy payload. */
typedef struct {
    size_t data_size;               /* payload length in bytes */
    unsigned int sequence;          /* sequence number */
    int sender_pid;                 /* sending process id */
    unsigned int timestamp;         /* send time (seconds) */
    unsigned char priority;
    unsigned char flags;
    uint16_t checksum;              /* additive checksum of the payload */
} zero_copy_header_t;

/* Zero-copy message: header immediately followed by the payload. */
typedef struct {
    zero_copy_header_t header;
    char data[];                    /* flexible array member */
} zero_copy_message_t;

/* Zero-copy allocation context. */
typedef struct {
    shm_segment_t* segments;        /* shared-memory segment table */
    size_t segment_count;
    size_t max_segments;
    size_t segment_size;            /* default size of each new segment */

    /* Free-list allocator state. */
    void* free_list;                /* singly linked free-block list */
    size_t block_size;
    pthread_mutex_t lock;

    struct {
        size_t total_allocations;
        size_t total_deallocations;
        size_t shared_memory_used;
        size_t peak_memory_used;
    } stats;
} zero_copy_context_t;

/* Context lifecycle and allocation. */
zero_copy_context_t* zero_copy_context_create(size_t segment_size, size_t max_segments);
void zero_copy_context_destroy(zero_copy_context_t* context);
void* zero_copy_allocate(zero_copy_context_t* context, size_t size);
void zero_copy_free(zero_copy_context_t* context, void* ptr);

/* IPC integration. */
int ipc_zero_copy_send(ipc_endpoint_t* endpoint, zero_copy_context_t* context,
                       const void* data, size_t size, int priority);
void* ipc_zero_copy_receive(ipc_endpoint_t* endpoint, zero_copy_context_t* context,
                            size_t* size, unsigned int timeout_ms);

/* Shared-memory segment management. */
shm_segment_t* shm_segment_create(zero_copy_context_t* context, size_t size);
shm_segment_t* shm_segment_attach(key_t key, size_t size);
int shm_segment_detach(shm_segment_t* segment);
int shm_segment_destroy(shm_segment_t* segment);

/* Utilities. */
uint16_t calculate_checksum(const void* data, size_t size);
int validate_message(const zero_copy_message_t* message, size_t total_size);

#endif /* ZERO_COPY_H */
3.2 零拷贝实现
src/utils/zero_copy.c
#include "../../include/zero_copy.h"
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <errno.h>
#define DEFAULT_SEGMENT_SIZE (2 * 1024 * 1024) // 2MB
#define DEFAULT_MAX_SEGMENTS 16
#define MIN_SEGMENT_SIZE (64 * 1024) // 64KB
#define MESSAGE_ALIGNMENT 64
/* Header of a free block in the shared-memory free list.
 * Blocks are singly linked; `size` is the usable byte count that
 * follows this header within the segment. */
typedef struct free_block_s {struct free_block_s* next;size_t size;
} free_block_t;
zero_copy_context_t* zero_copy_context_create(size_t segment_size, size_t max_segments) {if (segment_size < MIN_SEGMENT_SIZE) segment_size = MIN_SEGMENT_SIZE;if (max_segments == 0) max_segments = DEFAULT_MAX_SEGMENTS;zero_copy_context_t* context = calloc(1, sizeof(zero_copy_context_t));if (!context) return NULL;context->segment_size = segment_size;context->max_segments = max_segments;context->block_size = MESSAGE_ALIGNMENT;// 分配段数组context->segments = calloc(max_segments, sizeof(shm_segment_t));if (!context->segments) {free(context);return NULL;}pthread_mutex_init(&context->lock, NULL);// 创建第一个段if (shm_segment_create(context, segment_size) == NULL) {free(context->segments);free(context);return NULL;}return context;
}
/* Tear down all shared-memory segments and free the context.
 * Owned segments are removed (IPC_RMID); attached ones are detached. */
void zero_copy_context_destroy(zero_copy_context_t* context) {
    if (!context) return;

    pthread_mutex_lock(&context->lock);
    for (size_t i = 0; i < context->segment_count; i++) {
        shm_segment_t *segment = &context->segments[i];
        if (!segment->address) continue;
        if (segment->is_owner)
            shm_segment_destroy(segment);
        else
            shm_segment_detach(segment);
    }
    pthread_mutex_unlock(&context->lock);

    pthread_mutex_destroy(&context->lock);
    free(context->segments);
    free(context);
}
/* Create, map and register a new shared-memory segment, and push its
 * whole space onto the context's free list as one block.  Returns the
 * segment or NULL.  Caller must hold context->lock when other threads
 * may be allocating concurrently.
 * NOTE(review): ftok("/tmp", index) can collide with other programs
 * using the same path/proj_id — a dedicated key file would be safer.
 * Fix vs original: segment creation no longer inflates
 * stats.shared_memory_used, which allocate/free treat as the count of
 * live allocation bytes (mixing the two made the metric meaningless). */
shm_segment_t* shm_segment_create(zero_copy_context_t* context, size_t size) {
    if (!context || context->segment_count >= context->max_segments) return NULL;
    shm_segment_t *segment = &context->segments[context->segment_count];

    /* Derive a System V key from the segment index. */
    segment->key = ftok("/tmp", (int)context->segment_count + 1);
    if (segment->key == (key_t)-1) return NULL;

    segment->shm_id = shmget(segment->key, size, IPC_CREAT | 0666);
    if (segment->shm_id == -1) return NULL;

    segment->address = shmat(segment->shm_id, NULL, 0);
    if (segment->address == (void*)-1) {
        shmctl(segment->shm_id, IPC_RMID, NULL); /* don't leak the segment */
        return NULL;
    }

    segment->size = size;
    segment->is_owner = 1;
    segment->reference_count = 1;

    /* The fresh segment becomes one free block, appended to the list. */
    free_block_t *first_block = (free_block_t*)segment->address;
    first_block->next = NULL;
    first_block->size = size - sizeof(free_block_t);
    if (context->free_list == NULL) {
        context->free_list = first_block;
    } else {
        free_block_t *tail = context->free_list;
        while (tail->next) tail = tail->next;
        tail->next = first_block;
    }

    context->segment_count++;
    return segment;
}
/* Allocate `size` bytes of payload space in shared memory; returns a
 * pointer to the payload (its zero_copy_header_t sits immediately in
 * front), or NULL when no segment can satisfy the request.
 * Fixes vs original: `&current->next` had been mojibake-garbled into
 * `¤t->next`, and the statistics were updated after the mutex was
 * released (racing with concurrent callers). */
void* zero_copy_allocate(zero_copy_context_t* context, size_t size) {
    if (!context || size == 0) return NULL;

    /* Total footprint: header + payload, rounded up to the alignment. */
    size_t total_size = sizeof(zero_copy_message_t) + size;
    total_size = (total_size + MESSAGE_ALIGNMENT - 1) & ~(size_t)(MESSAGE_ALIGNMENT - 1);

    pthread_mutex_lock(&context->lock);

    /* First-fit scan of the free list. */
    free_block_t **prev = (free_block_t **)&context->free_list;
    free_block_t *current = context->free_list;
    while (current) {
        if (current->size >= total_size) {
            void *allocated = (char*)current + sizeof(free_block_t);

            if (current->size >= total_size + sizeof(free_block_t) + MESSAGE_ALIGNMENT) {
                /* Split: carve the request off the front, keep the rest free. */
                free_block_t *remainder =
                    (free_block_t*)((char*)current + sizeof(free_block_t) + total_size);
                remainder->size = current->size - total_size - sizeof(free_block_t);
                remainder->next = current->next;
                *prev = remainder;
            } else {
                /* Too small to split: hand out the whole block. */
                *prev = current->next;
            }

            /* Account while still holding the lock. */
            context->stats.total_allocations++;
            context->stats.shared_memory_used += total_size;
            if (context->stats.shared_memory_used > context->stats.peak_memory_used)
                context->stats.peak_memory_used = context->stats.shared_memory_used;
            pthread_mutex_unlock(&context->lock);

            zero_copy_message_t *message = (zero_copy_message_t*)allocated;
            memset(&message->header, 0, sizeof message->header);
            message->header.data_size = size;
            /* NOTE(review): assumes no padding between the header and the
             * flexible `data` member — true for the current field layout,
             * confirm if the header changes. */
            return message->data;
        }
        prev = &current->next;
        current = current->next;
    }

    /* Nothing fits: try to add a segment, then retry once. */
    if (context->segment_count < context->max_segments) {
        size_t new_segment_size =
            (total_size > context->segment_size) ? total_size * 2 : context->segment_size;
        if (shm_segment_create(context, new_segment_size)) {
            pthread_mutex_unlock(&context->lock);
            return zero_copy_allocate(context, size);
        }
    }

    pthread_mutex_unlock(&context->lock);
    return NULL;
}
/* Return a payload pointer obtained from zero_copy_allocate() to the
 * free list.
 * Fix vs original: statistics were updated after the mutex was
 * released, racing with concurrent allocations.
 * NOTE(review): adjacent free blocks are never coalesced, so the free
 * list fragments over time; assumes the payload directly follows the
 * header with no padding (see zero_copy_allocate). */
void zero_copy_free(zero_copy_context_t* context, void* ptr) {
    if (!context || !ptr) return;

    /* Recover the message header and the block's rounded footprint. */
    zero_copy_message_t *message =
        (zero_copy_message_t*)((char*)ptr - sizeof(zero_copy_header_t));
    size_t block_size = sizeof(zero_copy_message_t) + message->header.data_size;
    block_size = (block_size + MESSAGE_ALIGNMENT - 1) & ~(size_t)(MESSAGE_ALIGNMENT - 1);

    pthread_mutex_lock(&context->lock);
    free_block_t *freed_block = (free_block_t*)((char*)message - sizeof(free_block_t));
    freed_block->size = block_size;
    freed_block->next = context->free_list;
    context->free_list = freed_block;

    context->stats.total_deallocations++;
    context->stats.shared_memory_used -= block_size;
    pthread_mutex_unlock(&context->lock);
}
/* Publish `data` in shared memory and notify the peer with a small
 * control message carrying the zero-copy header.  Returns 0 on
 * success, -1 on failure (shared memory is released on send failure).
 * NOTE(review): the control message carries only the header — it does
 * not tell the receiver which shm key/offset holds the payload, so the
 * receive side cannot actually locate the data; the wire format needs
 * a segment key + offset to be usable.  `sequence` is also hard-coded
 * to 0 — a per-context counter is needed. */
int ipc_zero_copy_send(ipc_endpoint_t* endpoint, zero_copy_context_t* context,
                       const void* data, size_t size, int priority) {
    if (!endpoint || !context || !data || size == 0) return -1;

    void *shared_data = zero_copy_allocate(context, size);
    if (!shared_data) return -1;

    /* One copy into the shared segment; the peer reads it in place. */
    memcpy(shared_data, data, size);

    zero_copy_message_t *message =
        (zero_copy_message_t*)((char*)shared_data - sizeof(zero_copy_header_t));
    message->header.sender_pid = getpid();
    message->header.timestamp = (unsigned int)time(NULL);
    message->header.priority = (unsigned char)priority; /* header field is 8-bit */
    message->header.sequence = 0; /* TODO: per-context sequence counter */
    message->header.checksum = calculate_checksum(data, size);

    /* Small control message travels over the ordinary IPC channel. */
    ipc_message_t ctrl_msg = {
        .mtype = 2, /* control-message type */
        .sender_pid = getpid(),
        .timestamp = message->header.timestamp,
        .data_len = sizeof(zero_copy_header_t),
        .priority = priority
    };
    memcpy(ctrl_msg.data, &message->header, sizeof(zero_copy_header_t));

    if (endpoint->protocol->send(endpoint->protocol, &ctrl_msg) != 0) {
        zero_copy_free(context, shared_data); /* don't leak on failure */
        return -1;
    }
    return 0;
}
/* 16-bit additive checksum over `size` bytes (wraps modulo 2^16).
 * Deliberately simple; a CRC would be stronger for real integrity
 * checking. */
uint16_t calculate_checksum(const void* data, size_t size) {
    const unsigned char *p = (const unsigned char*)data;
    unsigned int sum = 0;
    for (const unsigned char *end = p + size; p < end; ++p)
        sum += *p;
    return (uint16_t)sum;
}
四、性能优化集成示例
examples/performance_demo.c
#include "../include/ipc_core.h"
#include "../include/memory_pool.h"
#include "../include/batch_operations.h"
#include "../include/zero_copy.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_MESSAGES 1000
#define MESSAGE_SIZE 1024
void performance_test_memory_pool() {printf("=== Memory Pool Performance Test ===\n");message_pool_t* pool = message_pool_create(100);if (!pool) {printf("Failed to create memory pool\n");return;}struct timespec start, end;clock_gettime(CLOCK_MONOTONIC, &start);// 测试单个分配性能for (int i = 0; i < NUM_MESSAGES; i++) {ipc_message_t* msg = message_pool_allocate(pool, 1);if (msg) {msg->sequence = i;message_pool_release(pool, msg);}}clock_gettime(CLOCK_MONOTONIC, &end);double time_taken = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;printf("Single allocation: %d messages in %.6f seconds (%.0f msg/s)\n",NUM_MESSAGES, time_taken, NUM_MESSAGES / time_taken);// 测试批量分配性能ipc_message_t* batch[100];clock_gettime(CLOCK_MONOTONIC, &start);for (int i = 0; i < NUM_MESSAGES / 100; i++) {if (message_pool_allocate_batch(pool, batch, 100) == 0) {message_pool_release_batch(pool, batch, 100);}}clock_gettime(CLOCK_MONOTONIC, &end);time_taken = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;printf("Batch allocation: %d messages in %.6f seconds (%.0f msg/s)\n",NUM_MESSAGES, time_taken, NUM_MESSAGES / time_taken);// 打印统计信息pool_stats_t stats;message_pool_get_stats(pool, &stats);printf("Cache hit rate: %.2f%%\n", (double)stats.cache_hits / stats.total_allocations * 100);message_pool_destroy(pool);
}
/* Entry point: run each performance demo in sequence. */
int main() {
    printf("IPC Performance Optimization Demo\n\n");

    performance_test_memory_pool();

    printf("\nAll performance tests completed.\n");
    return 0;
}
这些完整的性能优化实现提供了:
-
高效的内存管理:对象池减少系统调用和内存碎片
-
智能批处理:自适应调整批量大小优化吞吐量
-
零拷贝传输:共享内存避免数据复制
-
线程安全:细粒度锁和快速路径优化
-
详细监控:完整的性能统计和诊断信息
这些优化可以显著提升IPC系统的性能,特别是在高并发和大数据量场景下。
