当前位置: 首页 > news >正文

Kafka 4.0 五大 API 选型指南、依赖坐标、上手示例与最佳实践

一、怎么选:五大 API 的“场景分工”

  • Producer API:把事件写进某个 Topic。——“我产生日志、订单、埋点、传感器数据,要写进 Kafka。”
  • Consumer API:从 Topic 拉取事件并消费处理。——“我做风控、告警、入库、实时特征,订阅并处理。”
  • Streams API:把输入 Topic 的数据变换输出 Topic(过滤、聚合、窗口、Join、状态)。——“我做实时计算/微服务,既读又写,还要有状态。”
  • Connect API:以连接器持续同步外部系统 ↔ Kafka。——“我想把 MySQL/对象存储/ES 等批量接入/导出,少写代码甚至零代码。”
  • Admin API:管理与巡检主题、Broker、ACL 等对象。——“我需要创建/修改 Topic、查状态、运维脚本化。”

Kafka 的功能通过与语言无关的协议提供,多语言客户端很多;但仅 Java 客户端由主项目维护,其余为独立开源实现。

二、依赖坐标(Maven)

统一用 4.0.0。Gradle 可自行换写法。

Producer / Consumer / Admin(同一坐标):

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version>
</dependency>

Streams(Java/Scala 通用核心库):

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>4.0.0</version>
</dependency>

Streams(Scala 2.13 可选 DSL 封装):

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams-scala_2.13</artifactId><version>4.0.0</version>
</dependency>

三、Producer API:最小可用写入

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class DemoProducer {public static void main(String[] args) {Properties p = new Properties();p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 生产建议p.put(ProducerConfig.ACKS_CONFIG, "all");               // 强一致写入p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 幂等// Kafka 4.0 默认 linger.ms=5,批量更高效try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {ProducerRecord<String,String> rec = new ProducerRecord<>("quickstart-events", "key1", "hello kafka");producer.send(rec, (md, ex) -> {if (ex != null) ex.printStackTrace();else System.out.printf("OK topic=%s partition=%d offset=%d%n", md.topic(), md.partition(), md.offset());});producer.flush();}}
}

要点

  • acks=all + 幂等(enable.idempotence=true)是可靠写的默认组合。
  • 4.0 默认 linger.ms=5,通常更高吞吐且延迟不劣

四、Consumer API:订阅与拉取

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;public class DemoConsumer {public static void main(String[] args) {Properties p = new Properties();p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());p.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 新组从头开始try (KafkaConsumer<String, String> c = new KafkaConsumer<>(p)) {c.subscribe(List.of("quickstart-events"));while (true) {// 4.0 用 poll(Duration),不再使用 poll(long)ConsumerRecords<String,String> recs = c.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> r : recs) {System.out.printf("key=%s value=%s offset=%d%n", r.key(), r.value(), r.offset());}// 手动提交示例c.commitAsync();}}}
}

要点

  • 4.0 不再提供 poll(long),请使用 poll(Duration)
  • 消费组并行度 = 分区数;按需扩展分区或消费者实例。

五、Streams API:把 Topic 当作“输入表/输出表”

一个最小单词计数拓扑(输入 input-topic → 输出 output-topic):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;public class WordCountApp {public static void main(String[] args) {Properties p = new Properties();p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();KStream<String,String> text = builder.stream("input-topic");KTable<String,Long> counts = text.flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+"))).groupBy((k, word) -> word).count();counts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));KafkaStreams streams = new KafkaStreams(builder.build(), p);streams.start();Runtime.getRuntime().addShutdownHook(new Thread(streams::close));}
}

要点

  • Streams 把 Kafka 作为“日志 + 状态快照”的存储,支持窗口、聚合、Join、事务(恰好一次) 等。
  • 4.0 清理了过时 API;常用 DSL 基本不受影响。

六、Connect API:零/少代码联通外部系统

多数场景无需编写自定义连接器:直接使用预构建 Source/Sink 即可(如 JDBC、对象存储、ES、BigQuery…)。

典型启动(Standalone 示例):

bin/connect-standalone.sh \config/connect-standalone.properties \config/connect-file-source.properties \config/connect-file-sink.properties

建议

  • 生产使用 分布式模式(多 worker、弹性容错)。
  • 设计好 DLT(死信主题) 与重试策略,避免“毒数据”卡死。
  • 有强 Schema 需求:结合 Schema Registry 管控演进。

七、Admin API:脚本化管理运维

与 Producer/Consumer 使用同一 kafka-clients 依赖。

创建 Topic(示例)

import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;public class CreateTopicDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties p = new Properties();p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");try (Admin admin = Admin.create(p)) {NewTopic t = new NewTopic("demo-admin-topic", 3, (short)1);admin.createTopics(List.of(t)).all().get();System.out.println("Created.");}}
}

修改配置(4.0 用 incrementalAlterConfigs)

ConfigResource res = new ConfigResource(ConfigResource.Type.TOPIC, "demo-admin-topic");
AlterConfigOp op = new AlterConfigOp(new ConfigEntry("min.insync.replicas", "2"),AlterConfigOp.OpType.SET);
admin.incrementalAlterConfigs(Map.of(res, List.of(op))).all().get();

八、从“单点调用”到“端到端应用”的组合拳

  1. 数据进入:业务服务 → Producer APIorders.created

  2. 实时处理Streams API 读取 orders.created,聚合出用户维度统计 → user.order.stats

  3. 下游分发

    • Consumer API:风控服务订阅 orders.created 实时判定
    • Connect:把 user.order.stats Sink 到 OLAP/湖仓
  4. 运维管理Admin API 创建/巡检 Topic、配额、ACL

九、Kafka 4.0 相关注意事项(API 侧)

  • 仅 Java 客户端为官方维护;其他语言(Go、Python、.NET、C/C++…)社区实现良多,但请留意版本兼容与协议支持。
  • Consumer:使用 poll(Duration)committed(TopicPartition) 等旧签名在 4.0 已移除对应变体(请用集合参数形式)。
  • Admin:使用 incrementalAlterConfigs,不要再用已移除的 alterConfigs
  • Producer:默认 linger.ms=5;幂等与事务结合使用时注意 max.in.flight.requests.per.connection 的设置。

十、落地清单(Checklist)

  • 统一依赖版本到 4.0.0kafka-clients / kafka-streams / kafka-streams-scala_2.13
  • Producer:acks=all、幂等、压测确认批量参数(linger.msbatch.size
  • Consumer:poll(Duration) 改造、消费组并行度与重试/DLT
  • Streams:状态存储大小与容错、窗口语义(事件时间/水位线)
  • Connect:选对 Source/Sink、分布式部署、DLT/重试、监控指标
  • Admin:脚本化创建/变更 Topic 与配额、滚动发布流程与回滚方案
http://www.dtcms.com/a/354766.html

相关文章:

  • AI智能教育新实践:从作业批改到薄弱项定位,构建个性化学习新路径
  • 深入理解QLabel:Qt中的文本与图像显示控件
  • 云计算学习100天-第30天
  • LaunchScreen是啥?AppDelegate是啥?SceneDelegate是啥?ContentView又是啥?Main.storyboard是啥?
  • 生成式 AI 的 “魔法”:以 GPT 为例,拆解大语言模型(LLM)的训练与推理过程
  • Java线程池深度解析:从原理到实战的完整指南
  • ABAP - CPI - pass header parameter and filter parameter to odata service
  • 【C语言】函数栈帧的创建与销毁
  • 引入资源即针对于不同的屏幕尺寸,调用不同的css文件
  • 开发避坑指南(41):Vue3 提示框proxy.$modal.msgSuccess()提示文本换行解决方案
  • 腾讯混元开源视频拟音模型,破解 AI 视频 “无声” 难题
  • vscode 远程ssh登录免手动输入密码
  • 20.22 QLoRA微调实战:中文语音识别数据准备全流程解密
  • 音合成之二十四 微软VibeVoice语音合成模型
  • 2025通用证书研究:方法论、岗位映射与四证对比
  • 【Bluedroid】A2DP Source设备音频数据读取机制分析(btif_a2dp_source_read_callback)
  • Unity 打包 iOS,Xcode 构建并上传 App Store
  • Java 大视界 -- 基于 Java 的大数据实时流处理在智能电网分布式电源接入与电力系统稳定性维护中的应用(404)
  • mac中进行适用于IOS的静态库构建
  • 【大前端】React Native 调用 Android、iOS 原生能力封装
  • 基于FPGA的情绪感知系统设计方案:心理健康监测应用(五)
  • Ckman部署clickhouse
  • Qt基础_xiaozuo
  • Groovy集合常用简洁语法
  • linux mysql 数据库启动异常问题记录
  • KafKa学习笔记
  • AT_abc407_e [ABC407E] Most Valuable Parentheses
  • 前端开发中的CSS变量管理:实现缓存与响应式更新
  • 从 WPF 到 Avalonia 的迁移系列实战篇3:ResourceDictionary资源与样式的差异与迁移技巧
  • CuTe C++ 简介01,从示例开始