当前位置: 首页 > news >正文

Linux 消息队列接收与处理线程实现

下面是一个完整的 C 语言实现,创建一个线程来接收消息队列中的数据,缓存到队列中,然后逐个处理。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <unistd.h>// 消息结构体
typedef struct {long mtype;             // 消息类型char mtext[1024];      // 消息内容
} Message;// 数据节点结构
typedef struct DataNode {Message message;struct DataNode *next;
} DataNode;// 线程共享数据结构
typedef struct {pthread_mutex_t lock;   // 互斥锁pthread_cond_t cond;    // 条件变量DataNode *head;         // 队列头指针DataNode *tail;         // 队列尾指针int msgq_id;            // 消息队列IDint running;            // 线程运行标志
} ThreadData;// 初始化线程共享数据
int init_thread_data(ThreadData *data, int msgq_id) {data->head = NULL;data->tail = NULL;data->msgq_id = msgq_id;data->running = 1;if (pthread_mutex_init(&data->lock, NULL) {perror("pthread_mutex_init failed");return -1;}if (pthread_cond_init(&data->cond, NULL)) {perror("pthread_cond_init failed");pthread_mutex_destroy(&data->lock);return -1;}return 0;
}// 清理线程共享数据
void cleanup_thread_data(ThreadData *data) {pthread_mutex_lock(&data->lock);// 清空队列DataNode *current = data->head;while (current != NULL) {DataNode *temp = current;current = current->next;free(temp);}data->head = NULL;data->tail = NULL;pthread_mutex_unlock(&data->lock);pthread_mutex_destroy(&data->lock);pthread_cond_destroy(&data->cond);
}// 添加消息到队列
void enqueue_message(ThreadData *data, const Message *msg) {DataNode *node = (DataNode *)malloc(sizeof(DataNode));if (!node) {perror("malloc failed");return;}node->message = *msg;node->next = NULL;pthread_mutex_lock(&data->lock);if (data->tail == NULL) {data->head = node;data->tail = node;} else {data->tail->next = node;data->tail = node;}// 通知有数据到达pthread_cond_signal(&data->cond);pthread_mutex_unlock(&data->lock);
}// 从队列获取消息
int dequeue_message(ThreadData *data, Message *msg) {pthread_mutex_lock(&data->lock);// 等待队列不为空while (data->head == NULL && data->running) {pthread_cond_wait(&data->cond, &data->lock);}if (!data->running) {pthread_mutex_unlock(&data->lock);return -1;}DataNode *node = data->head;*msg = node->message;data->head = node->next;if (data->head == NULL) {data->tail = NULL;}pthread_mutex_unlock(&data->lock);free(node);return 0;
}// 处理消息的函数
void process_message(const Message *msg) {printf("Processing message type %ld: %s\n", msg->mtype, msg->mtext);// 这里添加实际的消息处理逻辑
}// 消息处理线程函数
void *process_thread_func(void *arg) {ThreadData *data = (ThreadData *)arg;Message msg;while (1) {if (dequeue_message(data, &msg) {break; // 线程终止}process_message(&msg);}return NULL;
}// 消息接收线程函数
void *receive_thread_func(void *arg) {ThreadData *data = (ThreadData *)arg;Message msg;while (data->running) {// 接收消息 (阻塞方式)ssize_t len = msgrcv(data->msgq_id, &msg, sizeof(msg.mtext), 0, 0);if (len == -1) {if (errno == EINTR) {continue; // 被信号中断,继续}perror("msgrcv failed");break;}msg.mtext[len] = '\0'; // 确保字符串结束// 将消息加入队列enqueue_message(data, &msg);}// 通知处理线程退出pthread_mutex_lock(&data->lock);pthread_cond_signal(&data->cond);pthread_mutex_unlock(&data->lock);return NULL;
}int main() {// 创建或获取消息队列key_t key = ftok(".", 'a');if (key == -1) {perror("ftok failed");return 1;}int msgq_id = msgget(key, IPC_CREAT | 0666);if (msgq_id == -1) {perror("msgget failed");return 1;}printf("Message queue created with id: %d\n", msgq_id);// 初始化线程共享数据ThreadData thread_data;if (init_thread_data(&thread_data, msgq_id)) {return 1;}// 创建接收线程和处理线程pthread_t receive_thread, process_thread;if (pthread_create(&receive_thread, NULL, receive_thread_func, &thread_data)) {perror("pthread_create receive_thread failed");cleanup_thread_data(&thread_data);return 1;}if (pthread_create(&process_thread, NULL, process_thread_func, &thread_data)) {perror("pthread_create process_thread failed");thread_data.running = 0;pthread_join(receive_thread, NULL);cleanup_thread_data(&thread_data);return 1;}// 主线程等待用户输入退出printf("Press Enter to exit...\n");getchar();// 设置停止标志thread_data.running = 0;// 唤醒可能正在等待的线程pthread_mutex_lock(&thread_data.lock);pthread_cond_signal(&thread_data.cond);pthread_mutex_unlock(&thread_data.lock);// 等待线程结束pthread_join(receive_thread, NULL);pthread_join(process_thread, NULL);// 清理资源cleanup_thread_data(&thread_data);// 删除消息队列 (可选)if (msgctl(msgq_id, IPC_RMID, NULL) == -1) {perror("msgctl IPC_RMID failed");}return 0;
}

代码说明
1. 消息队列结构:
◦ 使用标准的 System V 消息队列
◦ 消息类型为 Message 结构体
2. 线程安全队列:
◦ 使用链表实现 FIFO 队列
◦ 使用互斥锁 (pthread_mutex_t) 保护共享数据
◦ 使用条件变量 (pthread_cond_t) 实现生产者-消费者模型
3. 线程设计:
◦ 接收线程: 从消息队列接收消息并放入处理队列
◦ 处理线程: 从处理队列取出消息并处理
4. 同步机制:
◦ 当队列为空时,处理线程等待条件变量
◦ 当有新消息到达时,接收线程通知条件变量
5. 优雅退出:
◦ 设置 running 标志位控制线程退出
◦ 确保资源正确释放
编译与运行
编译命令:
gcc -o msgq_processor msgq_processor.c -lpthread
运行:
./msgq_processor
扩展建议
1. 可以添加更复杂的错误处理机制
2. 可以增加队列最大长度限制
3. 可以添加统计功能,如处理消息数量统计
4. 可以根据需要修改消息处理函数 process_message() 的实现
这个实现提供了基本的框架,你可以根据实际需求进行修改和扩展。

http://www.dtcms.com/a/278863.html

相关文章:

  • 【HTTP版本演变】
  • 考完数通,能转云计算/安全方向吗?转型路径与拓展路线分析
  • Elasticsearch9.x核心架构概述
  • Redis7持久化
  • 【postgresql数据库实现表的树结构查询】
  • 项目进度中间节点缺失,如何精细化分解任务
  • MIPI DSI(三) MIPI DSI 物理层和 D-PHY
  • 《大数据技术原理与应用》实验报告三 熟悉HBase常用操作
  • 现代数据平台能力地图:如何构建未来数据平台的核心能力体系
  • 为什么ER-GNSS/MINS-01能重新定义高精度导航?
  • vscode 源码编译
  • 创客匠人:创始人 IP 打造的系统化思维,是知识变现的底层逻辑
  • 【图像处理基石】什么是色盲仿真技术?
  • VUE export import
  • JS基础快速入门(详细版)
  • 【InnoDB磁盘结构3】撤销表空间,Undo日志
  • 力扣 30 天 JavaScript 挑战 第一题笔记
  • 智慧教育平台电子教材下载器:暑期超车,一键获取全版本教材,打造高效学习新体验
  • Git LFS 操作处理Github上传大文件操作记录
  • 终端安全最佳实践
  • sshpass原理详解及自动化运维实践
  • Docker Desktop 挂载本地Win系统配置指南:Redis/MySQL/RabbitMQ持久化与自启设置
  • Kmeams聚类算法详解
  • CSS手写题
  • 精密模具冷却孔内轮廓检测方法探究 —— 激光频率梳 3D 轮廓检测
  • Redis单线程详解
  • H2 与高斯数据库兼容性解决方案:虚拟表与类型处理
  • Ai问答之空间站星等
  • MMKV 存储json list数据(kotlin)
  • Spring Boot 设置滚动日志logback