当前位置: 首页 > news >正文

可信网站值得做吗网站中怎么做下载链接

可信网站值得做吗,网站中怎么做下载链接,学平面设计去哪个机构,wordpress 显示文章固定链接Redis Stream 消息队列详解及 PHP 实现 Redis Stream 作为消息队列的基本概念 Redis Stream 是 Redis 5.0 引入的一种数据结构,专门用于实现高性能消息队列系统。它提供了以下核心特性: 消息持久化:消息被持久保存在内存中消费者组&#x…

Redis Stream 消息队列详解及 PHP 实现

Redis Stream 作为消息队列的基本概念

Redis Stream 是 Redis 5.0 引入的一种数据结构,专门用于实现高性能消息队列系统。它提供了以下核心特性:

  1. 消息持久化:消息被持久保存在内存中
  2. 消费者组:支持多个消费者组同时消费消息
  3. 消息确认机制:确保消息被正确处理
  4. 消息回溯:支持按时间范围查询历史消息
  5. 高性能:支持每秒数十万条消息的读写

关键方法解析

1. XADD - 添加消息到流

XADD stream_name * key1 value1 key2 value2 ...
  • 向指定的流添加新消息
  • * 表示自动生成消息ID(格式为 <毫秒时间戳>-<序列号>
  • 消息以键值对形式存储

2. XGROUP - 消费者组管理

XGROUP CREATE stream_name group_name id [MKSTREAM]
  • 创建消费者组
  • id 参数指定起始位置:
    • $:从新消息开始消费
    • 0:从流开头消费所有消息
  • MKSTREAM:如果流不存在则自动创建

3. XREADGROUP - 消费者组读取消息

XREADGROUP GROUP group_name consumer_name COUNT n STREAMS stream_name >
  • 消费者组中的消费者读取消息
  • >:表示只读取未分配给其他消费者的新消息
  • COUNT n:指定一次读取的最大消息数

4. XACK - 确认消息处理完成

XACK stream_name group_name message_id
  • 消费者处理完消息后发送确认
  • 被确认的消息会从消费者组的待处理列表中移除
  • 确保消息不会被重复处理

PHP 实现 Redis Stream 消息队列

安装依赖

composer require predis/predis

完整实现代码

<?php
require 'vendor/autoload.php';use Predis\Client;class RedisStreamQueue {private $redis;private $streamName;private $groupName;public function __construct($streamName, $groupName) {$this->redis = new Client(['scheme' => 'tcp','host'   => '127.0.0.1','port'   => 6379,]);$this->streamName = $streamName;$this->groupName = $groupName;// 确保消费者组存在$this->createConsumerGroup();}private function createConsumerGroup() {try {$this->redis->xgroup('CREATE', $this->streamName, $this->groupName, '0', 'MKSTREAM');} catch (\Exception $e) {// 消费者组可能已存在,忽略错误}}// 生产者:添加消息public function produce($message) {$messageId = $this->redis->xadd($this->streamName, '*', ['data' => json_encode($message),'timestamp' => microtime(true)]);echo "Produced message ID: $messageId\n";return $messageId;}// 消费者:处理消息public function consume($consumerName, $timeout = 5000) {// 读取消息(阻塞方式)$messages = $this->redis->xreadgroup('GROUP', $this->groupName, $consumerName,'COUNT', 1,'BLOCK', $timeout,'STREAMS', $this->streamName, '>');if (!$messages) {return null;}// 解析消息$stream = $messages[0];$messageId = $stream[1][0][0];$messageData = json_decode($stream[1][0][1]['data'], true);echo "Consumer '$consumerName' received message ID: $messageId\n";return ['id' => $messageId,'data' => $messageData,'ack' => function() use ($messageId) {$this->acknowledge($messageId);}];}// 确认消息处理完成public function acknowledge($messageId) {$this->redis->xack($this->streamName, $this->groupName, [$messageId]);echo "Acknowledged message ID: $messageId\n";}// 查看待处理消息public function pendingMessages() {return $this->redis->xpending($this->streamName, $this->groupName);}
}// 使用示例
$queue = new RedisStreamQueue('order_queue', 'order_processing_group');// 生产者添加消息
if ($argv[1] === 'producer') {$orderId = uniqid('order_');$orderData = ['id' => $orderId,'product' => 'Laptop','quantity' => 1,'price' => 999.99,'customer' => 'john.doe@example.com'];$messageId = $queue->produce($orderData);echo "Produced order: $orderId\n";// 消费者处理消息
} elseif ($argv[1] === 'consumer') {$consumerName = isset($argv[2]) ? $argv[2] : 'consumer_1';echo "Consumer '$consumerName' started. Waiting for messages...\n";while (true) {$message = $queue->consume($consumerName);if ($message) {// 模拟消息处理echo "Processing order: {$message['data']['id']}\n";sleep(rand(1, 3)); // 模拟处理时间// 确认消息处理完成$message['ack']();echo "Order processed: {$message['data']['id']}\n\n";}}
} else {echo "Usage:\n";echo "  php redis_queue.php producer\n";echo "  php redis_queue.php consumer [consumer_name]\n";
}

使用说明

1. 启动生产者

php redis_queue.php producer
  • 每次执行会生成一个模拟订单消息
  • 输出示例:
    Produced message ID: 1716733794340-0
    Produced order: order_664d4d622c7e5
    

2. 启动消费者

php redis_queue.php consumer consumer_1
  • 消费者会持续监听并处理消息
  • 输出示例:
    Consumer 'consumer_1' started. Waiting for messages...
    Consumer 'consumer_1' received message ID: 1716733794340-0
    Processing order: order_664d4d622c7e5
    Acknowledged message ID: 1716733794340-0
    Order processed: order_664d4d622c7e5
    

3. 关键功能说明

  1. 消息生产

    • 使用 XADD 命令添加消息
    • 消息内容以 JSON 格式存储
  2. 消息消费

    • 使用 XREADGROUP 从消费者组读取消息
    • 阻塞方式等待消息(默认5秒超时)
  3. 消息确认

    • 处理完成后使用 XACK 确认消息
    • 确保消息不会重复处理
  4. 消费者组

    • 支持多个消费者同时处理消息
    • 每个消费者处理不同的消息
  5. 容错机制

    • 未确认的消息会被重新分配给其他消费者
    • 使用 XPENDING 可以查看待处理消息

实际应用场景

  1. 订单处理系统:如示例所示,处理电商订单
  2. 异步任务队列:处理耗时任务(邮件发送、图片处理)
  3. 事件驱动架构:微服务间的事件通知
  4. 实时数据处理:日志收集与分析
  5. 消息广播:向多个消费者组广播消息

Redis Stream 提供了一种轻量级但功能强大的消息队列解决方案,特别适合需要高性能和简单部署的场景。

http://www.dtcms.com/a/609891.html

相关文章:

  • 在 UniApp 中为小程序实现视频播放记录功能
  • 嗑一下Vue3 生态新插件
  • 31、【Ubuntu】【远程开发】内网穿透:反向隧道建立(三)
  • ubuntu20.04下使用D435i实时运行ORB-SLAM3
  • 网站建设哪便宜wordpress建手机版6
  • 东莞如何搭建网站建设wordpress视频压缩
  • Rust 宏:深入理解与高效使用
  • 基于异质专家协同一致性学习的弱监督红外 - 可见光行人重识别
  • 挂载配置文件以Docker启动Redis服务
  • 网站被墙怎么做跳转深圳龙岗个人网站建设
  • 标准输入输出stdio和JSON-RPC
  • 免费seo网站推荐一下软件手机网站建立教程
  • 有哪些网站可以用常州小程序开发报价
  • Python自动化浏览器操作与定时任务实战指南
  • web中国民族文化展示网站4页面
  • 【剑斩OFFER】算法的暴力美学——【模板】前缀和
  • php网站建设考试新品发布会的作用
  • 视频模板网站推荐建筑装饰网站模板
  • PyCharm 软件关联 GitHub 账户
  • 中东核心支付方式
  • 2025数维杯秋季赛赛中陪跑助攻进行中
  • 人工智能基础知识笔记二十:构建一个简单的Agent+MCP Server
  • 零基础入门C语言之贪吃蛇的实现
  • Origin将2D普通的XPS曲线图升级为三维XPS瀑布图
  • 【C++】哈希表算法习题
  • 上传网站页面打不开怎么办莆田网站制作企业
  • Kotlin协程Flow流buffer缓冲批量任务或数据,条件筛选任务或数据
  • BuildingAI 控制台智能体菜单和页面功能PRD
  • 球机与云台摄像机的差异解析
  • Opencv(十二):图像矫正