当前位置: 首页 > news >正文

kafka+spring cloud stream 发送接收消息

方案 1:使用旧版 @StreamListener(适用于 Spring Cloud Stream <= 2.x)

1. 添加依赖(pom.xml

<!-- Spring Cloud Stream + Kafka Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2. 定义消息通道接口

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyChannels {
    String INPUT = "myInput";

    @Input(INPUT)
    SubscribableChannel input();
}

3. 使用 @StreamListener 监听

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(MyChannels.class) // 绑定消息通道
public class KafkaStreamListener {

    @StreamListener(MyChannels.INPUT)
    public void handleMessage(String message) {
        System.out.println("Received via @StreamListener: " + message);
    }
}

4. 配置 application.properties

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定输入通道到 Kafka Topic
spring.cloud.stream.bindings.myInput.destination=my-topic
spring.cloud.stream.bindings.myInput.group=my-group
spring.cloud.stream.bindings.myInput.content-type=text/plain

方案 2:新版函数式编程模型(推荐,Spring Cloud Stream >= 3.x)

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class KafkaStreamListener {

    @Bean
    public Consumer<String> myInput() {
        return message -> {
            System.out.println("Received via Function: " + message);
        };
    }
}

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定函数到 Kafka Topic
spring.cloud.stream.bindings.myInput-in-0.destination=my-topic
spring.cloud.stream.bindings.myInput-in-0.group=my-group
spring.cloud.stream.bindings.myInput-in-0.content-type=text/plain

生产者代码示例(发送消息)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private StreamBridge streamBridge;

    public void sendMessage(String topic, String message) {
        streamBridge.send(topic, message);
    }
}

测试步骤

  1. 启动 Kafka:确保 Kafka 和 Zookeeper 服务运行。

  2. 创建 Topic

    kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

  3. 发送消息

    kafkaProducer.sendMessage("my-topic", "Hello Kafka Stream!");

  4. 查看消费者日志

    Received via @StreamListener: Hello Kafka Stream! // 或 Received via Function: Hello Kafka Stream!


常见问题

  1. 版本兼容性

    • Spring Cloud Stream 3.x 后需使用函数式编程。

    • 检查 Spring Boot 版本与 Spring Cloud Stream 的匹配关系(如 Spring Boot 2.6.x + Spring Cloud 2021.x)。

  2. 绑定配置

    • 函数式模型中,绑定名称格式为 <functionName>-in-<index>(如 myInput-in-0)。

  3. 序列化配置

    • 若传递 JSON 对象,需配置 content-type=application/json 并添加 Jackson 依赖。


总结

  • 旧版:使用 @StreamListener + 通道接口(适合遗留代码升级)。

  • 新版:推荐函数式编程模型(更简洁,符合现代 Spring 设计)。

  • 根据实际 Spring Cloud Stream 版本选择方案!

相关文章:

  • 华为OD机试真题-相对开音节-OD统一考试(E卷)
  • Meterpreter之getsystem命令提权原理详解
  • Zotero 快速参考文献导出(特定期刊引用)
  • 区块链相关方法-波士顿矩阵 (BCG Matrix)
  • Codes 开源免费研发项目管理平台 2025年第一个大版本3.0.0 版本发布及创新的轻IPD实现
  • 在LangFlow中集成OpenAI Compatible API类型的大语言模型
  • 不同类型的网站选择不同的服务器,那么应该怎么选择服务器呢?
  • STM32-心知天气项目
  • python包重要修改
  • 如何把windows机器作为SSH客户端免密登录
  • Markdown使用方法文字版解读
  • 数据表的存储过程和函数介绍
  • OpenBMC:BmcWeb app.run
  • tortoiseGit的使用和上传拉取
  • 使用docker开发镜像编译
  • pytest下allure
  • 医院安全(不良)事件上报系统源码,基于Laravel8开发,依托其优雅的语法与强大的扩展能力
  • 更换node版本
  • LLM:Agent
  • 【多模态处理篇四】【DeepSeek跨模态检索:联合嵌入空间构建方案 】
  • 从近200件文物文献里,回望光华大学建校百年
  • 特朗普公开“怼”库克:苹果不应在印度生产手机
  • 高新波任西安电子科技大学校长
  • 外企聊营商|武田制药:知识产权保护助创新药研发
  • 92岁上海交大退休教师捐赠百万元给学校,其父也曾设奖学金
  • 为什么越来越多景区,把C位留给了书店?