当前位置: 首页 > news >正文

linux下实现System V消息队列实现任意结构体传输

以下是一个实现,可以发送和接收任意类型的结构体消息,而不仅限于特定的CustomMsg类型:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>// 通用消息结构体模板
// 要求: 所有消息结构体的第一个字段必须是long类型的mtype
#define MSG_HEADER long mtype// 消息队列配置
#define MSG_QUEUE_KEY 0x1234  // 自定义消息队列键值// 错误处理宏
#define ERROR_EXIT(msg) do { perror(msg); exit(EXIT_FAILURE); } while(0)// 创建消息队列
int create_message_queue(key_t key) {int msgid;// 创建消息队列 (IPC_CREAT | IPC_EXCL | 0666)if ((msgid = msgget(key, IPC_CREAT | IPC_EXCL | 0666)) == -1) {if (errno == EEXIST) {// 如果已存在,则直接获取msgid = msgget(key, 0666);if (msgid == -1) {ERROR_EXIT("msgget failed to get existing queue");}} else {ERROR_EXIT("msgget failed to create new queue");}}return msgid;
}// 发送任意结构体消息
// msg: 指向包含MSG_HEADER的结构体指针
// msg_size: 结构体总大小
void send_struct_message(int msgid, void *msg, size_t msg_size) {// 计算实际数据大小 (减去mtype的大小)size_t data_size = msg_size - sizeof(long);if (msgsnd(msgid, msg, data_size, IPC_NOWAIT) == -1) {ERROR_EXIT("msgsnd failed");}printf("Sent %zu byte message (type %ld)\n", msg_size, ((long*)msg)[0]);
}// 接收任意结构体消息
// msg: 指向包含MSG_HEADER的结构体指针
// msg_size: 结构体总大小
// msg_type: 期望接收的消息类型
void receive_struct_message(int msgid, void *msg, size_t msg_size, long msg_type) {// 计算实际数据大小 (减去mtype的大小)size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, 0);if (bytes == -1) {ERROR_EXIT("msgrcv failed");}printf("Received %zd byte message (type %ld)\n", bytes + sizeof(long), ((long*)msg)[0]);
}// 删除消息队列
void remove_message_queue(int msgid) {if (msgctl(msgid, IPC_RMID, NULL) == -1) {if (errno != EIDRM) { // 忽略已删除的错误perror("msgctl IPC_RMID failed");}}
}// ===================== 使用示例 =====================// 示例消息结构体1
typedef struct {MSG_HEADER;  // 必须作为第一个字段int sensor_id;float temperature;float humidity;unsigned long timestamp;
} SensorData;// 示例消息结构体2
typedef struct {MSG_HEADER;  // 必须作为第一个字段char device_name[16];int state;int error_code;float voltage;float current;
} DeviceStatus;// 示例消息结构体3
typedef struct {MSG_HEADER;  // 必须作为第一个字段short x;short y;short z;unsigned char accuracy;
} MotionData;int main() {int msgid = create_message_queue(MSG_QUEUE_KEY);pid_t pid = fork();if (pid < 0) {ERROR_EXIT("fork failed");}if (pid > 0) { // 父进程 - 发送者// 给接收者时间启动sleep(1);// 发送SensorData消息SensorData sensor_msg = {.mtype = 1,.sensor_id = 101,.temperature = 25.6f,.humidity = 45.7f,.timestamp = 1234567890};send_struct_message(msgid, &sensor_msg, sizeof(SensorData));// 发送DeviceStatus消息DeviceStatus device_msg = {.mtype = 2,.device_name = "Main Controller",.state = 1,.error_code = 0,.voltage = 3.3f,.current = 0.75f};send_struct_message(msgid, &device_msg, sizeof(DeviceStatus));// 发送MotionData消息MotionData motion_msg = {.mtype = 3,.x = 1024,.y = -512,.z = 256,.accuracy = 95};send_struct_message(msgid, &motion_msg, sizeof(MotionData));// 等待接收者处理sleep(1);// 删除消息队列remove_message_queue(msgid);} else { // 子进程 - 接收者// 接收SensorData消息SensorData recv_sensor;receive_struct_message(msgid, &recv_sensor, sizeof(SensorData), 1);printf("SensorData: ID=%d, Temp=%.1f°C, Hum=%.1f%%, Time=%lu\n",recv_sensor.sensor_id, recv_sensor.temperature,recv_sensor.humidity, recv_sensor.timestamp);// 接收DeviceStatus消息DeviceStatus recv_device;receive_struct_message(msgid, &recv_device, sizeof(DeviceStatus), 2);printf("DeviceStatus: Name='%s', State=%d, Error=%d, V=%.2fV, I=%.2fA\n",recv_device.device_name, recv_device.state,recv_device.error_code, recv_device.voltage, recv_device.current);// 接收MotionData消息MotionData recv_motion;receive_struct_message(msgid, &recv_motion, sizeof(MotionData), 3);printf("MotionData: X=%d, Y=%d, Z=%d, Accuracy=%d%%\n",recv_motion.x, recv_motion.y, recv_motion.z, recv_motion.accuracy);}return EXIT_SUCCESS;
}
  1. 消息队列限制

    • 使用msgctl(IPC_STAT)检查队列状态

    • 监控队列使用情况,避免溢出

这个实现提供了高度灵活的消息传递机制,适用于各种嵌入式场景,从简单的传感器数据采集到复杂的设备控制命令,都可以通过定义适当的结构体来实现高效通信。

=========================阻塞和非阻塞接收方式===============================

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <stdbool.h>// 通用消息头要求
#define MSG_HEADER long mtype// 消息队列配置
#define MSG_QUEUE_KEY 0x1234// 错误处理宏
#define ERROR_EXIT(msg) do { perror(msg); exit(EXIT_FAILURE); } while(0)// 创建消息队列
int create_message_queue(key_t key) {int msgid = msgget(key, IPC_CREAT | 0666);if (msgid == -1) {ERROR_EXIT("msgget failed");}return msgid;
}// 发送任意结构体消息(阻塞)
void send_struct_message(int msgid, void *msg, size_t msg_size) {size_t data_size = msg_size - sizeof(long);if (msgsnd(msgid, msg, data_size, 0) == -1) {  // 阻塞发送ERROR_EXIT("msgsnd failed");}printf("Sent %zu byte message (type %ld)\n", msg_size, ((long*)msg)[0]);
}// 阻塞接收任意结构体消息
bool receive_struct_message_blocking(int msgid, void *msg, size_t msg_size, long msg_type) {size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, 0);if (bytes == -1) {return false;}printf("Received %zd byte message (type %ld) [blocking]\n", bytes + sizeof(long), ((long*)msg)[0]);return true;
}// 非阻塞接收任意结构体消息
bool receive_struct_message_nonblocking(int msgid, void *msg, size_t msg_size, long msg_type) {size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, IPC_NOWAIT);if (bytes == -1) {if (errno == ENOMSG) {// 没有消息不是错误,只是需要重试return false;}ERROR_EXIT("msgrcv failed");}printf("Received %zd byte message (type %ld) [non-blocking]\n", bytes + sizeof(long), ((long*)msg)[0]);return true;
}// 带超时的接收(混合模式)
bool receive_struct_message_timeout(int msgid, void *msg, size_t msg_size, long msg_type, int timeout_sec) {for (int i = 0; i < timeout_sec * 10; i++) {if (receive_struct_message_nonblocking(msgid, msg, msg_size, msg_type)) {return true;}// 等待100ms后重试usleep(100 * 1000);}return false;
}// 删除消息队列
void remove_message_queue(int msgid) {if (msgctl(msgid, IPC_RMID, NULL) == -1 && errno != EIDRM) {perror("msgctl IPC_RMID failed");}
}// ===================== 使用示例 =====================typedef struct {MSG_HEADER;int counter;char data[64];
} TestMessage;int main() {int msgid = create_message_queue(MSG_QUEUE_KEY);pid_t pid = fork();if (pid < 0) {ERROR_EXIT("fork failed");}if (pid > 0) { // 父进程 - 发送者sleep(1);  // 等待接收者准备// 发送3条消息for (int i = 1; i <= 3; i++) {TestMessage msg = {.mtype = 1,.counter = i,.data = "Blocking test"};send_struct_message(msgid, &msg, sizeof(TestMessage));sleep(1);}// 发送快速连续消息for (int i = 4; i <= 6; i++) {TestMessage msg = {.mtype = 2,.counter = i,.data = "Non-blocking test"};send_struct_message(msgid, &msg, sizeof(TestMessage));}// 等待接收者处理sleep(2);remove_message_queue(msgid);} else { // 子进程 - 接收者// 1. 阻塞接收演示printf("=== Blocking Receive Test ===\n");for (int i = 0; i < 3; i++) {TestMessage msg;if (receive_struct_message_blocking(msgid, &msg, sizeof(TestMessage), 1)) {printf("Blocking received: counter=%d, data=%s\n", msg.counter, msg.data);}}// 2. 非阻塞接收演示printf("\n=== Non-blocking Receive Test ===\n");int received = 0;while (received < 3) {TestMessage msg;if (receive_struct_message_nonblocking(msgid, &msg, sizeof(TestMessage), 2)) {printf("Non-blocking received: counter=%d, data=%s\n", msg.counter, msg.data);received++;} else {printf("No message available, doing other work...\n");sleep(1); // 模拟其他工作}}// 3. 超时接收演示printf("\n=== Timeout Receive Test ===\n");TestMessage msg;if (receive_struct_message_timeout(msgid, &msg, sizeof(TestMessage), 3, 2)) {printf("Received message within timeout\n");} else {printf("Timeout waiting for message (type 3)\n");}}return EXIT_SUCCESS;
}

http://www.dtcms.com/a/319738.html

相关文章:

  • 具身智能,正在翻越三座大山
  • 计算机毕业设计java疫情开放下的新冠信息共享平台 基于Java的社区疫情防控人员流动管理系统 疫情防控期间社区人员动态管理系统
  • 范数的定义、分类与 MATLAB 应用实践
  • 解决React白板应用中的画布内容丢失问题
  • 3363. 最多可收集的水果数目
  • 关键字 - 第二讲
  • Spring AI + Redis:构建高效AI应用缓存方案
  • 【物联网】基于树莓派的物联网开发【25】——树莓派安装Grafana与Influxdb无缝集成
  • 在 Linux 系统上安装 Docker 的步骤如下(以 Ubuntu/Debian为例)
  • 前缀和
  • 简洁明了的讲明什么是哈希(hash)函数
  • [激光原理与应用-170]:测量仪器 - 能量型 - 光功率计的工作原理与内部功能模块组成
  • 【第7话:相机模型3】自动驾驶IPM图像投影拼接技术详解及代码示例
  • 直连微软,下载速度达18M/S
  • Mysql 单行函数 聚合函数
  • MySQL聚簇索引与非聚簇索引详解
  • 北京企业数据防泄漏指南:5款适合北方市场的安全加密工具评测
  • 【华为机试】332. 重新安排行程
  • MySQL——黑马
  • STM32U5 周期性异常复位问题分析
  • 【MyQSL】库 表—基操
  • 性能优化——GPU的影响
  • [C++20]协程:语义、调度与异步 | Reactor 模式
  • Kafka原理--主题、分区、消费者的关系
  • windows内核研究(内存管理-线性地址的管理)
  • 【PHP 中的 `use` 关键字完全指南】
  • Linux图文理解进程
  • fiddler实用用法,抓包内容导入到apipos
  • 数据库管理系统:入门需要了解的内容
  • Modbus核心参数,调试工具,接线注意事项