当前位置: 首页 > wzjs >正文

网上拿手工做的网站电商网站对比表格

网上拿手工做的网站,电商网站对比表格,南头英文网站建设,自己在线制作logo免费广告招牌[Java实战]Spring Boot整合Kafka:高吞吐量消息系统实战(二十七) 一、引言 Apache Kafka作为一款高吞吐量、低延迟的分布式消息队列系统,广泛应用于实时数据处理、日志收集和事件驱动架构。结合Spring Boot的自动化配置能力&…

[Java实战]Spring Boot整合Kafka:高吞吐量消息系统实战(二十七)

一、引言

Apache Kafka作为一款高吞吐量、低延迟的分布式消息队列系统,广泛应用于实时数据处理、日志收集和事件驱动架构。结合Spring Boot的自动化配置能力,可以快速搭建高性能消息系统。本文将从环境搭建、代码实现、原理分析到测试优化,全面解析Spring Boot与Kafka的整合实践。

二、环境准备

1. Kafka安装与启动

  1. 下载Kafka:从Apache Kafka官网下载最新版本(推荐3.x+)。
  2. 启动Zookeeper(Kafka依赖):
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  3. 启动Kafka服务
    bin/kafka-server-start.sh config/server.properties
    

2. 创建Topic

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

说明:手动创建Topic可指定分区数(如3),提升并发处理能力。

三、环境准备(docker)

1. 使用Docker快速启动Kafka

通过Docker可以快速部署Kafka服务,无需手动安装依赖,步骤如下:

  1. 创建docker-compose.yml文件
    在项目根目录下新建文件,内容如下:
    version: '3'
    services:zookeeper:image: docker.1ms.run/confluentinc/cp-zookeeper:7.4.0ports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: docker.1ms.run/confluentinc/cp-kafka:7.4.0ports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.231.132:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"  # 禁止自动创建Topicdepends_on:- zookeeper
    

    关键配置说明

    • KAFKA_ADVERTISED_LISTENERS: 确保客户端能通过localhost:9092访问Kafka。
  • KAFKA_AUTO_CREATE_TOPICS_ENABLE: 设为false避免自动创建Topic,推荐手动控制。
  1. 启动Kafka服务
    执行以下命令启动服务:
    docker-compose up -d#停掉
    #docker-compose down
    

2. 创建Topic

通过Docker执行命令创建Topic:

docker exec -it kafka-kafka-1 kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

注意

  • kafka-kafka-1为容器名称(根据实际名称调整)。
  • --partitions 3指定分区数,提升并发处理能力。

3.安装成功截图

在这里插入图片描述

四、Spring Boot项目搭建

1. 添加依赖

pom.xml中引入Spring Kafka:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2. 配置文件

application.yml配置Kafka连接及序列化方式:

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

关键参数auto-offset-reset: earliest确保消费者从最早消息开始消费。

五、代码实现

1. 生产者配置

@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;// 发送消息(支持回调)public void sendMessage(String topic, String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(result -> {System.out.println("发送成功: " + result.getRecordMetadata().offset());}, ex -> {System.out.println("发送失败: " + ex.getMessage());});}
}

高级特性:回调机制可监控消息发送状态。

2. 消费者配置

@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void consume(String message) {System.out.println("接收到消息: " + message);// 业务处理逻辑}
}

批量消费:通过设置spring.kafka.consumer.max-poll-records可支持批量处理。

3.测试结果

KafkaController编写:

@RestController
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducer;@PostMapping("/send")public ResponseEntity<String> sendMs(@RequestBody String request) {kafkaProducer.sendMessage("my-topic","你好");return ResponseEntity.ok("ok");}
}

测试结果:

在这里插入图片描述

六、原理分析

1. Spring Kafka核心组件

  • KafkaTemplate:封装生产者操作,支持异步发送和事务管理。
  • @KafkaListener:基于监听器模式,自动创建消费者并订阅Topic。
  • ConsumerFactory/ProducerFactory:工厂类管理Kafka客户端配置。

2. 高吞吐量优化

  • 生产者端:调整batch.size(批次大小)和linger.ms(等待时间)提升批量发送效率。
  • 消费者端:增加分区数、配置多线程消费(ConcurrentKafkaListenerContainerFactory)。

七、高级特性

1. 自定义分区策略

实现Partitioner接口,指定消息路由规则:

public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区逻辑(如按Key哈希)return key.hashCode() % cluster.partitionCountForTopic(topic);}
}

配置文件中指定分区器:

spring:kafka:producer:properties:partitioner.class: com.example.CustomPartitioner

2. 事务支持

通过KafkaTransactionManager实现事务消息:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;public void sendInTransaction() {kafkaTemplate.executeInTransaction(operations -> {operations.send("topic1", "Message1");operations.send("topic2", "Message2");return null;});
}

八、测试步骤

1. 单元测试(使用嵌入式Kafka)

添加测试依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope>
</dependency>

编写测试类:

@SpringBootTest
@EmbeddedKafka(topics = "test-topic")
public class KafkaTest {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Testpublic void testSendAndReceive() {kafkaTemplate.send("test-topic", "Hello Kafka");// 通过监听器验证消息接收}
}

说明:嵌入式Kafka无需外部服务,适合CI/CD环境。

九、总结

本文从环境搭建到代码实现,结合Spring Boot与Kafka的高吞吐量特性,实现了消息系统的快速开发。通过自定义分区、事务支持和批量消费等高级功能,可进一步优化系统性能。实际应用中需根据业务场景调整参数,并借助监控工具(如Kafka Manager)持续优化。

参考文档

  • Spring Kafka官方文档
  • Apache Kafka架构解析

希望本教程对您有帮助,请点赞❤️收藏⭐关注支持!欢迎在评论区留言交流技术细节!

http://www.dtcms.com/wzjs/105662.html

相关文章:

  • 工商企业查询网seo免费推广软件
  • 北京云网站建设网站交换链接的常见形式
  • 织梦网站首页空白谷歌paypal官网入口
  • 保定专业网站建设免费seo排名软件
  • 静态网页做的网站怎么发到网上互联网推广员是做什么的
  • 阿里网站域名要购卖吗91关键词排名
  • 无需下载即可观看网页宁波seo的公司联系方式
  • 石家庄+外贸网站建设公司推广通
  • 江西合创建设工程有限公司 网站网站统计平台
  • 手机文章网站源码微信广告推广价格表
  • wordpress 安装 服务器 系统网站关键词怎么优化排名
  • 做那种的视频网站有哪些seo网络推广员招聘
  • wordpress主题存放目录山东seo多少钱
  • 网站上线多久才能百度网上推广用什么平台推广最好
  • 可以跟关键词密度过高的网站交换友情链接吗百度拍照搜索
  • 政府门户网站建设规范最近的时事新闻
  • 光纤做网站 移动不能访问电信东莞网站建设优化诊断
  • 什么网站做新产品代理搜索引擎营销的内容和层次有哪些
  • 哪个网站有手工活做免费发布平台
  • 网站规划建设心得与体会百度产品推广
  • 余姚 做网站杭州旺道企业服务有限公司
  • 西安大网站建设公司网站怎么优化关键词快速提升排名
  • 做网站很麻烦吗域名注册查询软件
  • 黄岛做网站哪家好长沙免费建站网络营销
  • 免费制作简历广州seo软件
  • 上海 设计网站建设微软优化大师
  • 电商网站建设的内容企业网站建设方案论文
  • 网站建设柚子网络科技怎么样徐州百度seo排名优化
  • 云南网站建设模块网络营销推广微信hyhyk1效果好
  • 汕头集团做网站方案域名收录查询