基于Kafka的延迟队列
实现原理
通过topic区分不同的延迟时长,每个topic对于一个延迟,比如 topic100 仅存储延迟 100ms 的消息,topic1000 仅存储延迟 1s 的消息,依次类推。
生产消息时,消息需按延迟时长投递到对应的topic。消费消息时,检查消息的时间,如果未到达延迟时长,则sleep剩余的时长后再处理。这样就简单的实现了基于kafka的延迟队列。死信队列,可作为一种特殊的延迟队列,比如延迟 3600000ms 的处理。
消费者实现
package mainimport ("context""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定义每个topic对应的延迟时间(ms)
var topicDelayConfig = map[string]time.Duration{"delay-100ms": 100 * time.Millisecond,"delay-200ms": 200 * time.Millisecond,"delay-500ms": 500 * time.Millisecond,"delay-1000ms": 1000 * time.Millisecond,
}type delayConsumerHandler struct {// 可以添加必要的依赖,如业务处理器等
}func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {logrus.Info("延迟队列消费者初始化完成")return nil
}func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {logrus.Info("延迟队列消费者清理完成")return nil
}// ConsumeClaim 处理分区消息,实现延迟逻辑
func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {topic := claim.Topic()delay, exists := topicDelayConfig[topic]if !exists {logrus.Errorf("topic %s 未配置延迟时间,跳过消费", topic)// 标记所有消息为已消费,避免重复处理for range claim.Messages() {sess.MarkMessage(msg, "")}return nil}// 按顺序处理消息(假设消息时间有序)for msg := range claim.Messages() {// 检查会话是否已关闭(如重平衡发生)select {case <-sess.Context().Done():logrus.Info("会话已关闭,停止消费")return nildefault:}// 计算需要延迟的时间// 消息应该被处理的时间 = 消息产生时间 + 主题延迟时间produceTime := msg.TimestampprocessTime := produceTime.Add(delay)now := time.Now()// 如果当前时间未到处理时间,计算需要休眠的时间if now.Before(processTime) {sleepDuration := processTime.Sub(now)logrus.Debugf("消息需要延迟处理,topic=%s, offset=%d, 需等待 %v (产生时间: %v, 预计处理时间: %v)",topic, msg.Offset, sleepDuration, produceTime, processTime,)// 休眠期间监听会话关闭信号,避免阻塞重平衡select {case <-sess.Context().Done():logrus.Info("休眠期间会话关闭,停止消费")return nilcase <-time.After(sleepDuration):// 休眠完成,继续处理}}// 延迟时间已到,处理消息h.processMessage(msg)// 标记消息为已消费sess.MarkMessage(msg, "")}return nil
}// 实际业务处理逻辑
func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {logrus.Infof("处理延迟消息,topic=%s, partition=%d, offset=%d, key=%s, value=%s, 产生时间=%v",msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,)// 这里添加实际的业务处理代码
}// 初始化消费者示例
func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 指定Kafka版本config.Consumer.Return.Errors = trueconfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange// 确保消息的Timestamp是创建时间(需要Kafka broker配置支持)config.Consumer.Fetch.Min = 1config.Consumer.Fetch.Default = 1024 * 1024return sarama.NewConsumerGroup(brokers, groupID, config)
}func main() {brokers := []string{"localhost:9092"}groupID := "delay-queue-group"topics := []string{"delay-100ms", "delay-200ms", "delay-500ms", "delay-1000ms"}consumer, err := newDelayConsumer(brokers, groupID)if err != nil {logrus.Fatalf("创建消费者失败: %v", err)}defer consumer.Close()handler := &delayConsumerHandler{}ctx := context.Background()// 持续消费for {if err := consumer.Consume(ctx, topics, handler); err != nil {logrus.Errorf("消费出错: %v", err)// 简单重试逻辑time.Sleep(5 * time.Second)}}
}
生产者实现
package mainimport ("errors""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定义允许的延迟时长(毫秒)及其对应的Topic
var allowedDelays = map[time.Duration]string{100 * time.Millisecond: "delay-100ms",200 * time.Millisecond: "delay-200ms",500 * time.Millisecond: "delay-500ms",1000 * time.Millisecond: "delay-1000ms",// 可根据需要添加更多允许的延迟时长
}// DelayProducer 延迟消息生产者
type DelayProducer struct {producer sarama.SyncProducer
}// NewDelayProducer 创建延迟消息生产者实例
func NewDelayProducer(brokers []string) (*DelayProducer, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 匹配Kafka版本config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 3config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer(brokers, config)if err != nil {return nil, err}return &DelayProducer{producer: producer,}, nil
}// SendDelayMessage 发送延迟消息
// 参数:
// - key: 消息键
// - value: 消息内容
// - delay: 延迟时长
// 返回:
// - 消息的分区和偏移量
// - 错误信息(若延迟不合法或发送失败)
func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {// 1. 校验延迟时长是否合法topic, ok := allowedDelays[delay]if !ok {return 0, 0, errors.New("invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms")}// 2. 创建消息,设置当前时间为消息时间戳(供消费者计算延迟)msg := &sarama.ProducerMessage{Topic: topic,Key: sarama.ByteEncoder(key),Value: sarama.ByteEncoder(value),Timestamp: time.Now(), // 记录消息发送时间,用于消费者计算处理时间}// 3. 发送消息partition, offset, err = p.producer.SendMessage(msg)if err != nil {logrus.Errorf("发送延迟消息失败: %v, 延迟时长: %v", err, delay)return 0, 0, err}logrus.Infof("发送延迟消息成功, topic: %s, 分区: %d, 偏移量: %d, 延迟时长: %v",topic, partition, offset, delay)return partition, offset, nil
}// Close 关闭生产者
func (p *DelayProducer) Close() error {return p.producer.Close()
}// 使用示例
func main() {// 初始化生产者producer, err := NewDelayProducer([]string{"localhost:9092"})if err != nil {logrus.Fatalf("初始化生产者失败: %v", err)}defer producer.Close()// 发送合法延迟消息_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("这是一条延迟消息"),100*time.Millisecond, // 合法延迟)if err != nil {logrus.Error("发送消息失败:", err)}// 尝试发送非法延迟消息(会被拒绝)_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("这是一条非法延迟消息"),300*time.Millisecond, // 不允许的延迟)if err != nil {logrus.Error("发送消息失败:", err) // 会输出非法延迟的错误}
}