当前位置: 首页 > news >正文

mq_timedsend系统调用及示例

mq_timedsend函数详解

1. 函数介绍

mq_timedsend函数是Linux系统中用于在指定时间内发送消息到POSIX消息队列的函数。它是mq_send函数的增强版本,支持超时控制。可以把mq_timedsend想象成一个"限时消息发送器",它能够在指定的时间内尝试发送消息,如果超时则返回错误。
这个函数特别适用于需要控制发送等待时间的场景,比如实时系统或需要避免无限期阻塞的应用程序。

使用场景:

  • 实时系统的消息发送
  • 避免无限期阻塞的发送操作
  • 超时控制的网络应用
  • 高可用性系统中的消息处理。

2. 函数原型

#include <mqueue.h>
#include <time.h>int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);

3. 功能

mq_timedsend函数的主要功能是在指定的绝对超时时间内发送消息到消息队列。如果队列已满且在超时时间内无法发送,则返回错误。。

4. 参数

  • mqdes: 消息队列描述符

    • 类型:mqd_t
    • 含义:已打开的消息队列描述符
  • msg_ptr: 消息内容指针

    • 类型:const char*
    • 含义:指向要发送的消息内容
  • msg_len: 消息长度

    • 类型:size_t
    • 含义:消息内容的字节数
  • msg_prio: 消息优先级

    • 类型:unsigned int
    • 含义:消息的优先级(0-32767)
  • abs_timeout: 绝对超时时间

    • 类型:const struct timespec*
    • 含义:绝对超时时间(基于CLOCK_REALTIME)

5. 返回值

  • 成功: 返回0
  • 失败: 返回-1,并设置errno错误码
    • EAGAIN:超时时间内无法发送消息
    • EBADF:无效的消息队列描述符
    • EINTR:被信号中断
    • EINVAL:参数无效
    • EMSGSIZE:消息大小超过队列限制
    • ETIMEDOUT:超时

6. 相似函数或关联函数

  • mq_send(): 发送消息(阻塞)
  • mq_receive(): 接收消息
  • mq_timedreceive(): 限时接收消息
  • clock_gettime(): 获取当前时间
  • pthread_cond_timedwait(): 限时条件等待

7. 示例代码

示例1:基础mq_timedsend使用 - 超时控制发送

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>// 创建带限制的消息队列
mqd_t create_limited_queue(const char* name) {struct mq_attr attr = {.mq_flags = 0,.mq_maxmsg = 2,      // 很小的队列容量.mq_msgsize = 128,.mq_curmsgs = 0};mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);if (mq == (mqd_t)-1) {perror("创建消息队列失败");return -1;}printf("创建限制队列: %s (容量: %ld)\n", name, attr.mq_maxmsg);return mq;
}// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {perror("获取当前时间失败");return -1;}abs_timeout->tv_sec += seconds;return 0;
}int main() {printf("=== 基础mq_timedsend使用示例 ===\n");const char* queue_name = "/timed_queue";// 创建限制队列mqd_t mq = create_limited_queue(queue_name);if (mq == -1) {exit(EXIT_FAILURE);}// 填满队列printf("1. 填满队列:\n");for (int i = 0; i < 2; i++) {char message[64];snprintf(message, sizeof(message), "填充消息 %d", i + 1);if (mq_send(mq, message, strlen(message), 0) == -1) {perror("发送填充消息失败");} else {printf("发送: %s\n", message);}}// 显示队列状态struct mq_attr attr;if (mq_getattr(mq, &attr) == 0) {printf("队列当前消息数: %ld/%ld\n", attr.mq_curmsgs, attr.mq_maxmsg);}// 演示mq_timedsend超时printf("\n2. 演示mq_timedsend超时:\n");struct timespec abs_timeout;if (calculate_absolute_timeout(&abs_timeout, 3) == -1) {  // 3秒超时mq_close(mq);mq_unlink(queue_name);exit(EXIT_FAILURE);}char test_message[] = "超时测试消息";printf("尝试发送消息(队列已满,3秒超时):\n");clock_t start_time = clock();int result = mq_timedsend(mq, test_message, strlen(test_message), 0, &abs_timeout);clock_t end_time = clock();double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;if (result == 0) {printf("✓ 消息发送成功\n");} else {if (errno == ETIMEDOUT) {printf("✗ 发送超时 (耗时: %.2f 秒)\n", elapsed_time);} else if (errno == EAGAIN) {printf("✗ 队列满,无法发送: %s\n", strerror(errno));} else {printf("✗ 发送失败: %s\n", strerror(errno));}}// 演示成功的mq_timedsendprintf("\n3. 演示成功的mq_timedsend:\n");// 先接收一条消息,为发送腾出空间char buffer[128];ssize_t bytes_received = mq_receive(mq, buffer, sizeof(buffer), NULL);if (bytes_received > 0) {buffer[bytes_received] = '\0';printf("接收消息为发送腾出空间: %s\n", buffer);}// 现在队列有空间了if (calculate_absolute_timeout(&abs_timeout, 5) == 0) {  // 5秒超时char success_message[] = "成功发送的消息";printf("发送消息(队列有空间):\n");if (mq_timedsend(mq, success_message, strlen(success_message), 5, &abs_timeout) == 0) {printf("✓ 消息发送成功 (优先级: 5)\n");} else {printf("✗ 发送失败: %s\n", strerror(errno));}}// 演示不同超时时间的效果printf("\n4. 不同超时时间演示:\n");// 立即超时(过去的时间)struct timespec past_time = {0, 0};char immediate_message[] = "立即超时消息";printf("使用过去时间作为超时(立即返回):\n");if (mq_timedsend(mq, immediate_message, strlen(immediate_message), 0, &past_time) == -1) {if (errno == ETIMEDOUT) {printf("✓ 立即超时 (预期行为)\n");} else {printf("✗ 其他错误: %s\n", strerror(errno));}}// 长时间超时if (calculate_absolute_timeout(&abs_timeout, 10) == 0) {  // 10秒超时char long_timeout_message[] = "长超时消息";printf("使用长超时时间:\n");if (mq_timedsend(mq, long_timeout_message, strlen(long_timeout_message), 1, &abs_timeout) == 0) {printf("✓ 长超时发送成功\n");} else {printf("✗ 长超时发送失败: %s\n", strerror(errno));}}// 清理资源printf("\n5. 清理资源:\n");mq_close(mq);mq_unlink(queue_name);printf("队列已清理\n");printf("\n=== 基础mq_timedsend演示完成 ===\n");return 0;
}

示例2:实时系统中的超时消息发送

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>#define MAX_MESSAGES 10
#define MESSAGE_SIZE 256volatile sig_atomic_t stop_flag = 0;// 信号处理函数
void signal_handler(int sig) {printf("收到信号 %d,准备停止...\n", sig);stop_flag = 1;
}// 创建实时消息队列
mqd_t create_realtime_queue(const char* name) {struct mq_attr attr = {.mq_flags = 0,.mq_maxmsg = 5,.mq_msgsize = MESSAGE_SIZE,.mq_curmsgs = 0};mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);if (mq == (mqd_t)-1) {perror("创建实时队列失败");return -1;}printf("创建实时队列: %s\n", name);return mq;
}// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {perror("获取当前时间失败");return -1;}// 转换毫秒到秒和纳秒long seconds = milliseconds / 1000;long nanoseconds = (milliseconds % 1000) * 1000000;abs_timeout->tv_sec += seconds;abs_timeout->tv_nsec += nanoseconds;// 处理纳秒溢出if (abs_timeout->tv_nsec >= 1000000000) {abs_timeout->tv_sec++;abs_timeout->tv_nsec -= 1000000000;}return 0;
}// 实时消息发送器
void realtime_message_sender(mqd_t mq, const char* sender_name) {printf("实时发送器 %s 启动\n", sender_name);srand(time(NULL));int message_count = 0;while (!stop_flag && message_count < MAX_MESSAGES) {char message[MESSAGE_SIZE];snprintf(message, sizeof(message), "%s: 实时消息 %d", sender_name, message_count + 1);// 随机超时时间(10-100毫秒)int timeout_ms = 10 + rand() % 91;struct timespec abs_timeout;if (calculate_relative_timeout(&abs_timeout, timeout_ms) == -1) {continue;}// 使用mq_timedsend发送消息unsigned int priority = rand() % 10;int result = mq_timedsend(mq, message, strlen(message), priority, &abs_timeout);if (result == 0) {printf("[%s] 发送成功: %s (优先级: %u, 超时: %dms)\n", sender_name, message, priority, timeout_ms);} else {if (errno == ETIMEDOUT) {printf("[%s] 发送超时: %s (超时: %dms)\n", sender_name, message, timeout_ms);} else if (errno == EAGAIN) {printf("[%s] 队列满,发送失败: %s\n", sender_name, message);} else {printf("[%s] 发送错误: %s (%s)\n", sender_name, message, strerror(errno));}}message_count++;usleep(500000);  // 0.5秒间隔}printf("实时发送器 %s 完成\n", sender_name);
}// 消息接收器
void message_receiver(mqd_t mq, const char* receiver_name) {printf("消息接收器 %s 启动\n", receiver_name);char buffer[MESSAGE_SIZE];ssize_t bytes_received;unsigned int priority;int received_count = 0;while (!stop_flag && received_count < MAX_MESSAGES * 2) {struct timespec abs_timeout;if (calculate_relative_timeout(&abs_timeout, 2000) == -1) {  // 2秒超时continue;}bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);if (bytes_received > 0) {buffer[bytes_received] = '\0';printf("[%s] 接收: %s (优先级: %u)\n", receiver_name, buffer, priority);received_count++;} else if (errno == ETIMEDOUT) {printf("[%s] 接收超时\n", receiver_name);} else if (errno == EAGAIN) {printf("[%s] 暂无消息\n", receiver_name);usleep(100000);  // 0.1秒后重试} else {printf("[%s] 接收错误: %s\n", receiver_name, strerror(errno));break;}}printf("消息接收器 %s 完成,接收 %d 条消息\n", receiver_name, received_count);
}int main() {printf("=== 实时系统超时消息发送示例 ===\n");const char* queue_name = "/realtime_queue";// 设置信号处理signal(SIGINT, signal_handler);signal(SIGTERM, signal_handler);// 创建实时队列mqd_t mq = create_realtime_queue(queue_name);if (mq == -1) {exit(EXIT_FAILURE);}// 启动发送器和接收器pid_t sender1 = fork();if (sender1 == 0) {realtime_message_sender(mq, "发送器1");exit(EXIT_SUCCESS);}pid_t sender2 = fork();if (sender2 == 0) {realtime_message_sender(mq, "发送器2");exit(EXIT_SUCCESS);}pid_t receiver = fork();if (receiver == 0) {message_receiver(mq, "接收器");exit(EXIT_SUCCESS);}// 主进程等待一段时间后发送停止信号printf("系统运行中... 按Ctrl+C停止或等待30秒\n");int elapsed = 0;while (elapsed < 30 && !stop_flag) {sleep(1);elapsed++;// 定期显示队列状态if (elapsed % 5 == 0) {struct mq_attr attr;if (mq_getattr(mq, &attr) == 0) {printf("队列状态: %ld/%ld 消息\n", attr.mq_curmsgs, attr.mq_maxmsg);}}}// 发送停止信号stop_flag = 1;printf("发送停止信号...\n");// 等待所有子进程完成waitpid(sender1, NULL, 0);waitpid(sender2, NULL, 0);waitpid(receiver, NULL, 0);// 清理资源mq_close(mq);mq_unlink(queue_name);printf("系统已停止,资源已清理\n");printf("\n=== 实时系统演示完成 ===\n");return 0;
}
http://www.dtcms.com/a/316207.html

相关文章:

  • Lua语言程序设计1:基础知识、数值、字符串与表
  • DDOS攻击和CC攻击对服务器的伤害有哪些?
  • 蘑兔音乐:音乐创作的神奇钥匙​
  • AI产品经理手册(Ch9-11)AI Product Manager‘s Handbook学习笔记
  • Linux系统交叉编译:依赖、构建与实践
  • makefile的使用与双向链表
  • 使用YOLOv8-gpu训练自己的数据集并预测
  • 多传感器融合
  • 2025暑期作业
  • 企业如何用现代数仓架构挖掘新业务盈利点?AllData产品从目标、路径、结果给出答案
  • 分布式文件系统06-分布式中间件弹性扩容与rebalance冲平衡
  • 集成学习与随机森林:从原理到实践指南
  • 解决VScode无法打开本地文件夹及远程连接后无反应的问题
  • Maven和Gradle在构建项目上的区别
  • 范式集团与海博思创成立合资公司,杀入“AI+储能”赛道
  • 机器学习之KNN、贝叶斯与决策树算法
  • 【题解】P3172 [CQOI2015] 选数(倍数莫反做法)
  • 深圳多奥500KG磁力锁(DAIC-MJ-500S)技术解析与产品优势报告,应用到门禁系统坚若磐石!
  • 计算机网络 第2章通信基础(竟成)
  • Pycaita二次开发基础代码解析:参数化模板创建与设计表驱动建模
  • 【Java面试题】注解,异常相关知识
  • Go语言的gRPC教程-错误处理
  • Android AppSearch 深度解析:现代应用搜索架构与实践
  • Elasticsearch向量库
  • 【web应用】前后端分离项目基本框架组成:Vue + Spring Boot 最佳实践指南
  • 深度解析 TCP 三次握手与四次挥手:从原理到 HTTP/HTTPS 的应用
  • 微服务—OpenFeign
  • Spring中七种Propagation类的事务属性详解
  • 研发团队看板协作中的自动化实践:集成CI/CD与任务流转
  • 007TG洞察:高效运营Telegram私域流量:技术挑战与自动化解决方案探索