当前位置: 首页 > news >正文

mq_timedreceive系统调用及示例

mq_timedreceive函数详解

1. 函数介绍

mq_timedreceive函数是Linux系统中用于在指定时间内从POSIX消息队列接收消息的函数。它是mq_receive函数的增强版本,支持超时控制。可以把mq_timedreceive想象成一个"限时消息接收器",它能够在指定的时间内尝试接收消息,如果超时则返回错误。
这个函数特别适用于需要控制接收等待时间的场景,比如实时系统、服务器应用或需要避免无限期阻塞的程序。

使用场景:

  • 实时系统的消息接收
  • 服务器程序的请求处理
  • 避免无限期阻塞的接收操作
  • 超时控制的网络应用
  • 高可用性系统中的消息处理

2. 函数原型

#include <mqueue.h>
#include <time.h>ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);

3. 功能

mq_timedreceive函数的主要功能是在指定的绝对超时时间内从消息队列接收消息。如果队列为空且在超时时间内没有消息到达,则返回错误。

4. 参数

  • mqdes: 消息队列描述符

    • 类型:mqd_t
    • 含义:已打开的消息队列描述符
  • msg_ptr: 消息缓冲区指针

    • 类型:char*
    • 含义:指向存储接收消息的缓冲区
  • msg_len: 缓冲区大小

    • 类型:size_t
    • 含义:消息缓冲区的大小(字节数)
  • msg_prio: 消息优先级指针

    • 类型:unsigned int*
    • 含义:指向存储消息优先级的变量(可为NULL)
  • abs_timeout: 绝对超时时间

    • 类型:const struct timespec*
    • 含义:绝对超时时间(基于CLOCK_REALTIME)

5. 返回值

  • 成功: 返回接收到的消息字节数
  • 失败: 返回-1,并设置errno错误码
    • EAGAIN:超时时间内没有消息可接收
    • EBADF:无效的消息队列描述符
    • EINTR:被信号中断
    • EINVAL:参数无效
    • EMSGSIZE:缓冲区太小
    • ETIMEDOUT:超时

6. 相似函数或关联函数

  • mq_receive(): 接收消息(阻塞)
  • mq_send(): 发送消息
  • mq_timedsend(): 限时发送消息
  • clock_gettime(): 获取当前时间
  • pthread_cond_timedwait(): 限时条件等待

7. 示例代码

示例1:基础mq_timedreceive使用 - 超时控制接收

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>// 创建消息队列
mqd_t create_test_queue(const char* name) {struct mq_attr attr = {.mq_flags = 0,.mq_maxmsg = 10,.mq_msgsize = 256,.mq_curmsgs = 0};mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);if (mq == (mqd_t)-1) {perror("创建消息队列失败");return -1;}printf("创建测试队列: %s\n", name);return mq;
}// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {perror("获取当前时间失败");return -1;}abs_timeout->tv_sec += seconds;return 0;
}int main() {printf("=== 基础mq_timedreceive使用示例 ===\n");const char* queue_name = "/receive_test_queue";// 创建测试队列mqd_t mq = create_test_queue(queue_name);if (mq == -1) {exit(EXIT_FAILURE);}// 演示mq_timedreceive超时(空队列)printf("1. 演示空队列超时接收:\n");struct timespec abs_timeout;if (calculate_absolute_timeout(&abs_timeout, 3) == -1) {  // 3秒超时mq_close(mq);mq_unlink(queue_name);exit(EXIT_FAILURE);}char buffer[256];unsigned int priority;printf("从空队列接收消息(3秒超时):\n");clock_t start_time = clock();ssize_t result = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);clock_t end_time = clock();double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;if (result == -1) {if (errno == ETIMEDOUT) {printf("✗ 接收超时 (耗时: %.2f 秒)\n", elapsed_time);} else if (errno == EAGAIN) {printf("✗ 暂无消息: %s\n", strerror(errno));} else {printf("✗ 接收失败: %s\n", strerror(errno));}} else {buffer[result] = '\0';printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);}// 发送一些测试消息printf("\n2. 发送测试消息:\n");const char* messages[] = {"第一条测试消息","第二条测试消息","第三条测试消息"};for (int i = 0; i < 3; i++) {unsigned int priorities[] = {1, 5, 3};if (mq_send(mq, messages[i], strlen(messages[i]), priorities[i]) == -1) {perror("发送消息失败");} else {printf("发送: %s (优先级: %u)\n", messages[i], priorities[i]);}}// 演示成功的mq_timedreceiveprintf("\n3. 演示成功接收消息:\n");if (calculate_absolute_timeout(&abs_timeout, 5) == 0) {  // 5秒超时printf("接收消息(队列有消息):\n");// 接收所有消息while (1) {ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);if (bytes_received > 0) {buffer[bytes_received] = '\0';printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);} else {if (errno == ETIMEDOUT || errno == EAGAIN) {printf("无更多消息可接收\n");break;} else {printf("接收失败: %s\n", strerror(errno));break;}}}}// 演示缓冲区大小处理printf("\n4. 缓冲区大小处理演示:\n");// 发送一条长消息char long_message[200];memset(long_message, 'A', sizeof(long_message) - 1);long_message[sizeof(long_message) - 1] = '\0';if (mq_send(mq, long_message, strlen(long_message), 0) == 0) {printf("发送长消息成功\n");// 使用过小的缓冲区接收char small_buffer[50];if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {ssize_t bytes_received = mq_timedreceive(mq, small_buffer, sizeof(small_buffer), NULL, &abs_timeout);if (bytes_received == -1) {if (errno == EMSGSIZE) {printf("✗ 缓冲区太小 (预期错误)\n");} else {printf("✗ 其他错误: %s\n", strerror(errno));}} else {small_buffer[bytes_received] = '\0';printf("✓ 接收到截断消息: %s (%zd 字节)\n", small_buffer, bytes_received);}}// 使用足够大的缓冲区接收char large_buffer[256];if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {ssize_t bytes_received = mq_timedreceive(mq, large_buffer, sizeof(large_buffer), NULL, &abs_timeout);if (bytes_received > 0) {large_buffer[bytes_received] = '\0';printf("✓ 接收到完整消息 (%zd 字节)\n", bytes_received);}}}// 演示优先级接收printf("\n5. 优先级接收演示:\n");// 发送不同优先级的消息struct {const char* message;unsigned int priority;} priority_messages[] = {{"低优先级消息", 1},{"中优先级消息", 5},{"高优先级消息", 10},{"最高优先级消息", 15}};for (int i = 0; i < 4; i++) {if (mq_send(mq, priority_messages[i].message, strlen(priority_messages[i].message), priority_messages[i].priority) == 0) {printf("发送: %s (优先级: %u)\n", priority_messages[i].message, priority_messages[i].priority);}}// 接收消息(应该按优先级顺序接收)printf("按优先级顺序接收消息:\n");if (calculate_absolute_timeout(&abs_timeout, 3) == 0) {for (int i = 0; i < 4; i++) {ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);if (bytes_received > 0) {buffer[bytes_received] = '\0';printf("接收: %s (优先级: %u)\n", buffer, priority);}}}// 清理资源printf("\n6. 清理资源:\n");mq_close(mq);mq_unlink(queue_name);printf("队列已清理\n");printf("\n=== 基础mq_timedreceive演示完成 ===\n");return 0;
}

示例2:服务器应用中的超时消息处理

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>#define MAX_REQUESTS 100
#define REQUEST_SIZE 512volatile sig_atomic_t server_running = 1;// 服务器请求结构
typedef struct {char client_id[32];char request_data[256];time_t timestamp;int request_id;
} server_request_t;// 服务器响应结构
typedef struct {int request_id;char response_data[256];int status;time_t timestamp;
} server_response_t;// 信号处理函数
void signal_handler(int sig) {printf("服务器收到停止信号 %d\n", sig);server_running = 0;
}// 创建服务器队列
mqd_t create_server_queues(const char* request_queue, const char* response_queue) {struct mq_attr request_attr = {.mq_flags = 0,.mq_maxmsg = 20,.mq_msgsize = sizeof(server_request_t),.mq_curmsgs = 0};struct mq_attr response_attr = {.mq_flags = 0,.mq_maxmsg = 20,.mq_msgsize = sizeof(server_response_t),.mq_curmsgs = 0};// 创建请求队列mqd_t req_mq = mq_open(request_queue, O_CREAT | O_RDONLY, 0644, &request_attr);if (req_mq == (mqd_t)-1) {perror("创建请求队列失败");return -1;}// 创建响应队列mqd_t resp_mq = mq_open(response_queue, O_CREAT | O_WRONLY, 0644, &response_attr);if (resp_mq == (mqd_t)-1) {perror("创建响应队列失败");mq_close(req_mq);return -1;}printf("服务器队列创建成功:\n");printf("  请求队列: %s\n", request_queue);printf("  响应队列: %s\n", response_queue);return req_mq;  // 返回请求队列描述符
}// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {perror("获取当前时间失败");return -1;}long seconds = milliseconds / 1000;long nanoseconds = (milliseconds % 1000) * 1000000;abs_timeout->tv_sec += seconds;abs_timeout->tv_nsec += nanoseconds;if (abs_timeout->tv_nsec >= 1000000000) {abs_timeout->tv_sec++;abs_timeout->tv_nsec -= 1000000000;}return 0;
}// 处理服务器请求
void process_server_requests(mqd_t req_mq, mqd_t resp_mq) {printf("服务器开始处理请求...\n");int processed_requests = 0;time_t last_status_time = time(NULL);while (server_running) {struct timespec abs_timeout;if (calculate_relative_timeout(&abs_timeout, 1000) == -1) {  // 1秒超时continue;}server_request_t request;unsigned int priority;ssize_t bytes_received = mq_timedreceive(req_mq, (char*)&request, sizeof(request), &priority, &abs_timeout);if (bytes_received > 0) {// 处理请求printf("处理请求 #%d 来自客户端 %s\n", request.request_id, request.client_id);// 模拟处理时间usleep(100000);  // 0.1秒// 构造响应server_response_t response;response.request_id = request.request_id;snprintf(response.response_data, sizeof(response.response_data), "请求 #%d 已处理完成", request.request_id);response.status = 200;response.timestamp = time(NULL);// 发送响应if (mq_send(resp_mq, (char*)&response, sizeof(response), priority) == 0) {printf("响应已发送: 请求 #%d\n", request.request_id);processed_requests++;} else {printf("发送响应失败: %s\n", strerror(errno));}} else if (errno == ETIMEDOUT || errno == EAGAIN) {// 超时或无消息,继续循环} else {printf("接收请求失败: %s\n", strerror(errno));if (errno != EINTR) {break;}}// 定期显示状态time_t current_time = time(NULL);if (current_time - last_status_time >= 5) {printf("服务器状态: 已处理 %d 个请求\n", processed_requests);last_status_time = current_time;}}printf("服务器停止,总共处理 %d 个请求\n", processed_requests);
}// 客户端模拟器
void client_simulator(const char* request_queue, const char* response_queue, int client_id) {printf("客户端 %d 启动\n", client_id);// 打开队列mqd_t req_mq = mq_open(request_queue, O_WRONLY);mqd_t resp_mq = mq_open(response_queue, O_RDONLY);if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {perror("客户端打开队列失败");if (req_mq != (mqd_t)-1) mq_close(req_mq);if (resp_mq != (mqd_t)-1) mq_close(resp_mq);exit(EXIT_FAILURE);}srand(time(NULL) + client_id);// 发送请求for (int i = 0; i < 5; i++) {server_request_t request;snprintf(request.client_id, sizeof(request.client_id), "Client_%d", client_id);snprintf(request.request_data, sizeof(request.request_data), "请求数据_%d", i + 1);request.timestamp = time(NULL);request.request_id = client_id * 100 + i + 1;unsigned int priority = rand() % 10;if (mq_send(req_mq, (char*)&request, sizeof(request), priority) == 0) {printf("客户端 %d 发送请求 #%d\n", client_id, request.request_id);} else {printf("客户端 %d 发送请求失败: %s\n", client_id, strerror(errno));continue;}// 等待响应struct timespec abs_timeout;if (calculate_relative_timeout(&abs_timeout, 3000) == 0) {  // 3秒超时server_response_t response;ssize_t bytes_received = mq_timedreceive(resp_mq, (char*)&response, sizeof(response), NULL, &abs_timeout);if (bytes_received > 0) {printf("客户端 %d 收到响应: %s (状态: %d)\n", client_id, response.response_data, response.status);} else if (errno == ETIMEDOUT) {printf("客户端 %d 等待响应超时\n", client_id);} else {printf("客户端 %d 接收响应失败: %s\n", client_id, strerror(errno));}}sleep(1);  // 客户端间隔}mq_close(req_mq);mq_close(resp_mq);printf("客户端 %d 完成\n", client_id);
}int main() {printf("=== 服务器应用超时消息处理示例 ===\n");const char* request_queue = "/server_requests";const char* response_queue = "/server_responses";// 设置信号处理signal(SIGINT, signal_handler);signal(SIGTERM, signal_handler);// 启动服务器进程pid_t server_pid = fork();if (server_pid == 0) {// 服务器进程mqd_t req_mq = mq_open(request_queue, O_RDONLY);mqd_t resp_mq = mq_open(response_queue, O_WRONLY);if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {perror("服务器打开队列失败");exit(EXIT_FAILURE);}process_server_requests(req_mq, resp_mq);mq_close(req_mq);mq_close(resp_mq);exit(EXIT_SUCCESS);}// 等待服务器启动sleep(1);// 启动多个客户端进程pid_t clients[3];for (int i = 0; i < 3; i++) {clients[i] = fork();if (clients[i] == 0) {client_simulator(request_queue, response_queue, i + 1);exit(EXIT_SUCCESS);}}// 等待客户端完成for (int i = 0; i < 3; i++) {waitpid(clients[i], NULL, 0);}// 停止服务器server_running = 0;sleep(2);waitpid(server_pid, NULL, 0);// 清理队列mq_unlink(request_queue);mq_unlink(response_queue);printf("\n=== 服务器应用演示完成 ===\n");return 0;
}

编译和运行

# 编译示例(需要链接实时库)
gcc -o mq_unlink_example1 mq_unlink_example1.c -lrt
gcc -o mq_timedsend_example1 mq_timedsend_example1.c -lrt
gcc -o mq_timedsend_example2 mq_timedsend_example2.c -lrt
gcc -o mq_timedreceive_example1 mq_timedreceive_example1.c -lrt
gcc -o mq_timedreceive_example2 mq_timedreceive_example2.c -lrt# 运行示例
./mq_unlink_example1
./mq_timedsend_example1
./mq_timedsend_example2
./mq_timedreceive_example1
./mq_timedreceive_example2

重要注意事项

  1. 权限要求: 需要适当的文件系统权限来创建和访问消息队列
  2. 名称规范: 消息队列名称必须以’/'开头
  3. 超时时间: 使用绝对时间而非相对时间
  4. 资源管理: 必须正确关闭队列描述符和删除队列
  5. 错误处理: 必须检查返回值并处理各种错误情况
  6. 线程安全: 消息队列操作是线程安全的
  7. 系统限制: 受系统消息队列数量和大小限制

最佳实践

1. 资源清理: 及时关闭队列描述符和删除不需要的队列
2. 超时设置: 合理设置超时时间以避免无限期等待
3. 错误处理: 完善的错误处理和恢复机制
4. 优先级使用: 合理使用消息优先级
5. 缓冲区管理: 确保缓冲区大小足够
6. 信号处理: 正确处理信号中断
7. 性能监控: 监控队列性能和系统资源使用

通过这些示例,你可以理解POSIX消息队列相关函数在进程间通信方面的强大功能,它们为Linux系统提供了高效、可靠的IPC机制,特别适用于实时系统、服务器应用和分布式系统。

http://www.dtcms.com/a/316353.html

相关文章:

  • 工业设备远程监控的 “颠覆性突破”:边缘计算网关让千里之外如在眼前
  • 【图像算法 - 09】基于深度学习的烟雾检测:从算法原理到工程实现,完整实战指南
  • 16核32G硬件服务器租用需要多少钱
  • 【Redis初阶】------单线程模型
  • Next.js SSR 实战:构建高性能新闻网站
  • C++中的泛型算法(三)
  • 智慧城市SaaS平台|市容环卫管理系统
  • 【PHP】对数据库操作:获取数据表,导出数据结构,根据条件生成SQL语句,根据条件导出SQL文件
  • nordic通过j-link rtt viewer打印日志
  • Unknown initial character set index ‘255’,Kettle连接MySQL数据库常见错误及解决方案大全
  • 心念之球:在意识的天空下
  • Gemini CLI最近更新
  • GitLab:一站式 DevOps 平台的全方位解析
  • 笔记学习杂记
  • fastgpt本地运行起来的 服务配置
  • iptables 里INPUT、OUTPUT、FORWARD 三个链(Chain)详解
  • 编程算法:技术创新与业务增长的核心引擎
  • 如何在虚拟机(Linux)安装Qt5.15.2
  • STM32 外设驱动模块一:LED 模块
  • 第13届蓝桥杯Scratch_选拔赛_初级组_真题2021年10月23日
  • 基于MATLAB实现的频域模态参数识别方法
  • SpringAI:AI基本概念
  • 基于ARM+FPGA多通道超声信号采集与传输系统设计
  • PCIe Base Specification解析(六)
  • 五、逐波限流保护电路-硬件部分
  • 从零搭建Cloud Alibaba (下) Sentinel篇
  • VUE-第二季-02
  • Sentinel全面实战指南
  • 制作一款打飞机游戏85:Hyper模式
  • JavaScript:Proxy 代理