当前位置: 首页 > news >正文

kafka学习

目录

    • 下载
    • 启动
      • 使用zookeeper启动
      • 使用Kraft启动
    • 使用
      • 创建主题
    • 在springboot中使用
      • 生产者发送消息后

下载

直接到官网下载:https://kafka.apache.org/
然后解压
在这里插入图片描述
在这里插入图片描述
config里面是配置
logs里面是日志

启动

使用zookeeper启动

kafka运行在zookeeper上,先启动zookeeper,再启动kafka

  1. 先启动zookeeper,kafka里面内置了zookeeper,不需要我们再去下载,直接使用内置的就可以
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

可能报错:命令语法不正确。
解决方法:只需要让文件夹名字短一点就行在这里插入图片描述

出现这样结果就是成功启动
在这里插入图片描述

  1. 然后启动kafka
bin\windows\kafka-server-start.bat config\server.properties

如果这次启动失败了,关掉zookeeper重新启动 某位大佬的解决方法

使用Kraft启动

  1. 生成uuid
    在这里插入图片描述

  2. 格式化目录
    在这里插入图片描述

  3. 启动
    在这里插入图片描述
    也可以自定义集群id,在“格式化目录”时,把uuid换成自己定义的集群id就行

注意路径的变化

使用

创建主题

D:\MY\kafka\kafka3.7\bin\windows>kafka-topics.bat --create --topic dello --bootstrap-server localhost:9092
Created topic dello.

查看主题:

D:\MY\kafka\kafka3.7\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
dello

在springboot中使用

导入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

kafka是一个消息中间件,在生产者和消费者中间,所以需要配置3类资料:

  1. kafka自己的资料
  2. 生产者
  3. 消费者

配置文件中设置:

#自己的地址
spring.kafka.bootstrap-servers=localhost:9092
  1. 新建一个生产者的类:
@Component
public class EventProducer {

    //前面导入依赖+配置文件就会自动装配kafka
    @Resource
    public KafkaTemplate<String,String> kafkaTemplate;
	//发送事件,在hello主题上发一个i am a banana消息
    public void sentEvent(){
        kafkaTemplate.send("hello","i am a banana");
    }
    
    public void sentEvent02(){
        Message<String> message = MessageBuilder.withPayload("i am a banana too").setHeader(KafkaHeaders.TOPIC,
                "hello02").build();
        kafkaTemplate.send(message);
    }
    
    public void sentEvent03(){
        //里面放一些信息,消费者可以接收到
        Headers headers = new RecordHeaders();
        headers.add("color","yellow".getBytes(StandardCharsets.UTF_8));
        
        ProducerRecord producerRecord = new ProducerRecord<>(
                "hello03",0,System.currentTimeMillis(),"who","banana",headers
        );
        kafkaTemplate.send(producerRecord);
    }
    
    //使用这个发送方法,要在配置文件中新增默认topic的配置
    //spring.kafka.template.default-topic=default-topic
    public void sentEvent04(){
        kafkaTemplate.sendDefault("banana");
    }
	
}

不同的send方法本质都是把消息转换成 ProducerRecord 形式

进行测试:

@SpringBootTest
class Base01ApplicationTests {

    @Autowired
    private EventProducer eventProducer;
    @Test
    void test01() {
        eventProducer.sentEvent();
    }

}

不报错就是成功了

  1. 新建一个消费者的类:
//被初始化之后就一直在监听,默认监听到新发来的消息
@Component
public class EventConsumer {
    //这会开辟一个线程一直执行监听工作
    @KafkaListener(topics = "hello", groupId = "hello-group")
    public void onEvent(String event){
        System.out.println("监听到:" + event);
    }
}

运行启动类,开启监听线程,然后运行测试类,发送一个消息,结果:

监听到:i am a banana

在这里插入图片描述
如果想让一个新的消费者组groupId 开始消费当前主题,使用earliest就能读取到历史消息,(只限于新的消费者组,之前没消费过这个主题的)

生产者发送消息后

每一个send方法的返回结果都是CompletableFuture<SendResult<K, V>>

    public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
        return this.observeSend(producerRecord);
    }

发送消息是异步的,即:生产者发完消息就去干别的事情,但是马上就能拿到CompletableFutureCompletableFuture里面没有内容,他表示的是未来的消息,如果消息成功发送了,CompletableFuture里面才会有内容

拿到CompletableFuture里面内容的方法有:

  1. 阻塞方式拿到:里面的get()方法是阻塞的
    public void sentEvent04(){
        CompletableFuture<SendResult<String, String>> completableFuture = kafkaTemplate.sendDefault("banana");
        try {
            SendResult<String, String> sendResult = completableFuture.get();
            if(sendResult.getRecordMetadata() != null){
                System.out.println("消息发送成功:"+sendResult.getRecordMetadata().toString());
            }
        }  catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

在这里插入图片描述

  1. 非阻塞方式:thenAccept()方法注册回调函数,发送完成就执行这个回调函数,回调函数的返回结果还是CompletableFuture类型,可以继续执行回调函数
    public void sentEvent05(){
        CompletableFuture<SendResult<String, String>> completableFuture = kafkaTemplate.sendDefault("banana");
        try {
            completableFuture.thenAccept((t) -> {
                if(t.getRecordMetadata() != null){
                    System.out.println("消息发送成功:"+t.getRecordMetadata().toString());
                }
            }).exceptionally((t) -> {
                t.printStackTrace();
                return null;
            });

        }  catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

相关文章:

  • 登录验证码的接口实习,uuid,code.
  • 数据结构十五、排序
  • 【计算机网络编码与调制】
  • 2025年- G28-Lc102-973. K 个距离原点最近的点--java版
  • 适合开发点餐系统的PHP开源框架要具备哪些优势?
  • Java设计模式之迭代器模式
  • 强化学习与智能决策:基本原理、算法及应用
  • @JSONField(serialize = false)序列化过程中排除特定字段
  • 从零构建大语言模型全栈开发指南:第二部分:模型架构设计与实现-2.2.3实战案例:在笔记本电脑上运行轻量级LLM
  • NLP高频面试题(十六)——deepspeed原理
  • 记一次线上环境JAR冲突导致程序报错org.springframework.web.util.NestedServletException
  • v-model 总结
  • Java后端API限流秘籍:高并发的防护伞与实战指南
  • unittest自动化测试实战
  • 嵌入式硬件工程师从小白到入门-PCB绘制(二)
  • Qt跨平台文件传输系统开发全解:TCP/IP协议+多线程架构
  • 观成科技:海莲花利用MST投递远控木马
  • K8s的网络
  • Go环境相关理解
  • MySQL 中,分库分表机制和分表分库策略
  • wordpress学生/广东做seo的公司
  • 海报素材网站推荐/抖音代运营收费详细价格
  • 独立做网站需要学习什么/体验营销
  • 魅力网络营销公司/网站搜索优化技巧
  • 网站怎样做能排名靠前/西安seo优化工作室
  • 用vs2015做网站教程/关键字参数