当前位置: 首页 > news >正文

Kafka4.1.0 队列模式尝鲜

背景

kafka一直以来的消费模型是一个topic下多个partition,每个partition有一个消费者。其中partition的数量决定了消费者的并发度。在KIP-932 的提案中提出了队列模型,在4.0.0版本引入在4.1.0版本为预览版。

使用docker启动

我们使用docker启动一个开启了预览版本的broker。本文使用的native-image

docker run -d  \-p 9092:9092 \--name kafka410 \-e KAFKA_NODE_ID=1 \-e KAFKA_PROCESS_ROLES=broker,controller \-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \-e KAFKA_NUM_PARTITIONS=3 \-e KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer,share \-e KAFKA_UNSTABLE_API_VERSIONS_ENABLE=true \-e KAFKA_GROUP_SHARE_ENABLE=true \-e KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR=1 \apache/kafka-native:4.1.1-rc1
  • KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer,share 在rebalance协议中支持share协议
  • KAFKA_UNSTABLE_API_VERSIONS_ENABLE 解锁未stable的版本限制
  • KAFKA_GROUP_SHARE_ENABLE 开启共享组消费者模式
  • KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR 共享组(队列)模式使用一个topic来保存消费进度,这个是此topic的副本因子

使用spring-kafka消费

  • spring boot 使用的4.0.0-rc2
  • 并添加spring-kafka
    • implementation 'org.springframework.boot:spring-boot-starter-kafka'
package cc.sofast.practice.kafkagroupconsumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ShareKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultShareConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ShareConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;import java.util.*;/*** @author wxl*/
@Configuration
public class ShareConsumerConfig {private static final Logger log = LoggerFactory.getLogger(ShareConsumerConfig.class);@Beanpublic ShareConsumerFactory<String, String> shareConsumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);return new DefaultShareConsumerFactory<>(props);}@Beanpublic ShareKafkaMessageListenerContainer<String, String> imageProcessingContainer(ShareConsumerFactory<String, String> shareConsumerFactory) {ContainerProperties containerProps = new ContainerProperties("image-processing");containerProps.setGroupId("image-processors");ShareKafkaMessageListenerContainer<String, String> container =new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);container.setConcurrency(10);container.setupMessageListener(new MessageListener<String, String>() {@Overridepublic void onMessage(ConsumerRecord<String, String> record) {System.out.println("Received on " + Thread.currentThread().getName() + ": Key: " + record.key() + " Val: " + record.value());// Implicit ACCEPT when method completes successfully}});return container;}@Beanpublic CommandLineRunner commandLineRunner(KafkaTemplate<String, String> template) {return new CommandLineRunner() {@Overridepublic void run(String... args) throws Exception {new Timer().schedule(new TimerTask() {@Overridepublic void run() {template.send("image-processing", UUID.randomUUID().toString(), "image-2-" + UUID.randomUUID().toString());}}, 3000, 1000);}};}
}

参考

  • https://strimzi.io/blog/2025/08/20/queues-for-kafka/
  • https://oso.sh/blog/kafka-queues-in-apache-kafka-4-0-via-share-groups/
  • https://hub.docker.com/r/apache/kafka-native
  • https://spring.io/blog/2025/10/14/introducing-spring-kafka-share-consumer
  • https://www.instaclustr.com/blog/apache-kafka-4-0-share-groups-what-you-need-to-know-about-queues-for-kafka/
http://www.dtcms.com/a/560580.html

相关文章:

  • transformer记录一(输入步骤讲解)
  • 做生存分析的网站有哪些网站背景怎么弄
  • Tomcat 新手避坑指南:环境配置 + 启动问题 + 乱码解决全流程
  • 整理、分类、总结与介绍Vue前端开发日常常用的第三方库/框架/插件-收藏
  • 第九天~在Arxml中定义一对XCP-PDU用于测量标定
  • Tomcat 配置问题速查表
  • 第九天~AUTOSAR网络管理NM-PDU详解:在Arxml中定义唤醒节点的NM-PDU
  • 在centos 7上配置FIP服务器的详细教程!!!
  • 做网站三网多少钱wordpress 贴吧主题
  • 无锡网站建设营销型诸城公司做网站
  • 【Docker】容器网络探索(二):实战理解 host 网络
  • 《数据结构风云》:二叉树遍历的底层思维>递归与迭代的双重视角
  • Java EE初阶 --多线程2
  • 论文精读(七):结合大语言模型和领域知识库的证券规则规约方法
  • Linux shell sed 命令基础
  • 选 Redis Stream 还是传统 MQ?队列选型全攻略(适用场景、优缺点与实践建议)
  • 【JVM】详解 Java内存模型(JMM)
  • 做网站工作室广告网站建设
  • 小语种网站制作广州网站建设哪里有
  • 广州学做网站上饶网站建设多少钱
  • GO写的http服务,清空cookie
  • 响应式企业网站模板望京网站建设公司
  • 最新聊天记录做图网站ip软件点击百度竞价推广
  • 关于学校网站建设申请报告深圳市网络seo推广价格
  • 公司网站后台怎么上传图片百度西安分公司地址
  • Go语言设计模式:组合模式详解
  • 南昌市住房和城乡建设网站检测网站是否正常
  • 自建网站费用营销推广的主要方法
  • 罗田做网站一个人看的在线观看视频免费下载
  • 网站 首页 栏目 内容wordpress发文章