当前位置: 首页 > news >正文

基于Kafka的延迟队列

实现原理

通过topic区分不同的延迟时长,每个topic对于一个延迟,比如 topic100 仅存储延迟 100ms 的消息,topic1000 仅存储延迟 1s 的消息,依次类推。

在这里插入图片描述

生产消息时,消息需按延迟时长投递到对应的topic。消费消息时,检查消息的时间,如果未到达延迟时长,则sleep剩余的时长后再处理。这样就简单的实现了基于kafka的延迟队列。死信队列,可作为一种特殊的延迟队列,比如延迟 3600000ms 的处理。

消费者实现

package mainimport ("context""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定义每个topic对应的延迟时间(ms)
var topicDelayConfig = map[string]time.Duration{"delay-100ms":  100 * time.Millisecond,"delay-200ms":  200 * time.Millisecond,"delay-500ms":  500 * time.Millisecond,"delay-1000ms": 1000 * time.Millisecond,
}type delayConsumerHandler struct {// 可以添加必要的依赖,如业务处理器等
}func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {logrus.Info("延迟队列消费者初始化完成")return nil
}func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {logrus.Info("延迟队列消费者清理完成")return nil
}// ConsumeClaim 处理分区消息,实现延迟逻辑
func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {topic := claim.Topic()delay, exists := topicDelayConfig[topic]if !exists {logrus.Errorf("topic %s 未配置延迟时间,跳过消费", topic)// 标记所有消息为已消费,避免重复处理for range claim.Messages() {sess.MarkMessage(msg, "")}return nil}// 按顺序处理消息(假设消息时间有序)for msg := range claim.Messages() {// 检查会话是否已关闭(如重平衡发生)select {case <-sess.Context().Done():logrus.Info("会话已关闭,停止消费")return nildefault:}// 计算需要延迟的时间// 消息应该被处理的时间 = 消息产生时间 + 主题延迟时间produceTime := msg.TimestampprocessTime := produceTime.Add(delay)now := time.Now()// 如果当前时间未到处理时间,计算需要休眠的时间if now.Before(processTime) {sleepDuration := processTime.Sub(now)logrus.Debugf("消息需要延迟处理,topic=%s, offset=%d, 需等待 %v (产生时间: %v, 预计处理时间: %v)",topic, msg.Offset, sleepDuration, produceTime, processTime,)// 休眠期间监听会话关闭信号,避免阻塞重平衡select {case <-sess.Context().Done():logrus.Info("休眠期间会话关闭,停止消费")return nilcase <-time.After(sleepDuration):// 休眠完成,继续处理}}// 延迟时间已到,处理消息h.processMessage(msg)// 标记消息为已消费sess.MarkMessage(msg, "")}return nil
}// 实际业务处理逻辑
func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {logrus.Infof("处理延迟消息,topic=%s, partition=%d, offset=%d, key=%s, value=%s, 产生时间=%v",msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,)// 这里添加实际的业务处理代码
}// 初始化消费者示例
func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 指定Kafka版本config.Consumer.Return.Errors = trueconfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange// 确保消息的Timestamp是创建时间(需要Kafka broker配置支持)config.Consumer.Fetch.Min = 1config.Consumer.Fetch.Default = 1024 * 1024return sarama.NewConsumerGroup(brokers, groupID, config)
}func main() {brokers := []string{"localhost:9092"}groupID := "delay-queue-group"topics := []string{"delay-100ms", "delay-200ms", "delay-500ms", "delay-1000ms"}consumer, err := newDelayConsumer(brokers, groupID)if err != nil {logrus.Fatalf("创建消费者失败: %v", err)}defer consumer.Close()handler := &delayConsumerHandler{}ctx := context.Background()// 持续消费for {if err := consumer.Consume(ctx, topics, handler); err != nil {logrus.Errorf("消费出错: %v", err)// 简单重试逻辑time.Sleep(5 * time.Second)}}
}

生产者实现

package mainimport ("errors""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定义允许的延迟时长(毫秒)及其对应的Topic
var allowedDelays = map[time.Duration]string{100 * time.Millisecond:  "delay-100ms",200 * time.Millisecond:  "delay-200ms",500 * time.Millisecond:  "delay-500ms",1000 * time.Millisecond: "delay-1000ms",// 可根据需要添加更多允许的延迟时长
}// DelayProducer 延迟消息生产者
type DelayProducer struct {producer sarama.SyncProducer
}// NewDelayProducer 创建延迟消息生产者实例
func NewDelayProducer(brokers []string) (*DelayProducer, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 匹配Kafka版本config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 3config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer(brokers, config)if err != nil {return nil, err}return &DelayProducer{producer: producer,}, nil
}// SendDelayMessage 发送延迟消息
// 参数:
//   - key: 消息键
//   - value: 消息内容
//   - delay: 延迟时长
// 返回:
//   - 消息的分区和偏移量
//   - 错误信息(若延迟不合法或发送失败)
func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {// 1. 校验延迟时长是否合法topic, ok := allowedDelays[delay]if !ok {return 0, 0, errors.New("invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms")}// 2. 创建消息,设置当前时间为消息时间戳(供消费者计算延迟)msg := &sarama.ProducerMessage{Topic:     topic,Key:       sarama.ByteEncoder(key),Value:     sarama.ByteEncoder(value),Timestamp: time.Now(), // 记录消息发送时间,用于消费者计算处理时间}// 3. 发送消息partition, offset, err = p.producer.SendMessage(msg)if err != nil {logrus.Errorf("发送延迟消息失败: %v, 延迟时长: %v", err, delay)return 0, 0, err}logrus.Infof("发送延迟消息成功, topic: %s, 分区: %d, 偏移量: %d, 延迟时长: %v",topic, partition, offset, delay)return partition, offset, nil
}// Close 关闭生产者
func (p *DelayProducer) Close() error {return p.producer.Close()
}// 使用示例
func main() {// 初始化生产者producer, err := NewDelayProducer([]string{"localhost:9092"})if err != nil {logrus.Fatalf("初始化生产者失败: %v", err)}defer producer.Close()// 发送合法延迟消息_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("这是一条延迟消息"),100*time.Millisecond, // 合法延迟)if err != nil {logrus.Error("发送消息失败:", err)}// 尝试发送非法延迟消息(会被拒绝)_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("这是一条非法延迟消息"),300*time.Millisecond, // 不允许的延迟)if err != nil {logrus.Error("发送消息失败:", err) // 会输出非法延迟的错误}
}
http://www.dtcms.com/a/353756.html

相关文章:

  • 身份证号校验码算法
  • C++中类继承的意义
  • PMP项目管理知识点-⑮预测型项目概念辨析
  • 【Kafka】项目整合使用案例
  • 瑞芯微开发工具Linux Linux_Upgrade_Tool使用方法(镜像烧录)
  • Python 比较huggingface_hub库的hf_hub_download函数和snapshot_download函数
  • 在 .NET 8.0 中实现 JWT 刷新令牌
  • 密钥管理服务KMS介绍
  • 遗传算法:模拟自然选择的优化智慧
  • 可编辑69页PPT | 某手机品牌主数据治理项目案例
  • 神经网络学习笔记12——高效卷积神经网络架构MobileNet
  • Origin 2024 安装包下载与安装教程
  • 【算法速成课1 | 题解】洛谷P3366 【模板】最小生成树 MST(Prim Kruskal)
  • 深度学习入门:神经网络基础知识
  • YOLO11实战 第006期-基于yolo11-seg的香蕉种植园语义分割实战文档(yolo格式数据免费获取)
  • MDK-5.4.2 集成 Compiler 5 编译器
  • 基于SpringBoot的协同过滤余弦函数的美食推荐系统(爬虫Python)的设计与实现
  • 数据结构:堆(Heap)
  • 生成式AI的引擎室:深入剖析LLM内存管理与调度
  • 【解锁Photonics for AI:系统学习光学神经网络与超表面设计,成就下一代光芯片工程师】
  • python - js的引入方式、注释变量、数据类型、强制转换、自动类型转换、js运算符、分支结构、函数
  • Nginx单端口代理多个前后端服务的完整配置指南
  • 【雅思019】Canceling an appointment
  • 数据结构——算法设计的基本思想(穷举、递归、分治等)
  • 【自用】JavaSE--junit单元测试、反射、注解、动态代理
  • FreeRTOS 常见面试题与核心知识点详解
  • Redis数据持久化——RDB快照和Aof日志追加
  • 8.28 模拟
  • 从易用性的角度来看,哪个ETL平台比较好用?
  • MySQL-数据类型