当前位置: 首页 > news >正文

centos7.9使用docker-compose安装kafka

docker-compose配置文件

services:zookeeper:image: confluentinc/cp-zookeeper:7.0.1hostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.0.1hostname: kafkacontainer_name: kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXTKAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.85:9092,PLAINTEXT_INTERNAL://kafka:29092KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNALKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1kafka-manager:image: hlebalbau/kafka-manager:stablecontainer_name: kafka-managerdepends_on:- zookeeperports:- "9002:9000"environment:ZK_HOSTS: "zookeeper:2181"kAFKA_BROKERS: 192.168.1.85:9092KAFKA_MANAGER_AUTH_ENABLED: "false"

application.properties文件

spring.application.name=kafka_demo
# application.properties
spring.kafka.bootstrap-servers=192.168.1.85:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生产者controller

import com.yykj.kafka_demo.service.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaTestController {private final KafkaProducerService kafkaProducerService;public KafkaTestController(KafkaProducerService kafkaProducerService) {this.kafkaProducerService = kafkaProducerService;}@GetMapping("/send")public String sendMessageToKafka(@RequestParam(value = "topic", defaultValue = "test-topic") String topic,@RequestParam(value = "message", defaultValue = "Hello Kafka!") String message) {kafkaProducerService.sendMessage(topic, message);return "消息已发送: " + message + " 到主题: " + topic;}
}

生产者service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到指定主题* @param topic 主题名称* @param message 消息内容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {if (ex == null) {System.out.println("消息发送成功: " + message +", 分区: " + result.getRecordMetadata().partition() +", 偏移量: " + result.getRecordMetadata().offset());} else {System.err.println("消息发送失败: " + ex.getMessage());}});}
}

消费者监听

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {// 监听指定的主题,groupId用于区分不同的消费者组@KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "${kafka.group-id:test-group}")public void consumeMessage(ConsumerRecord<String, String> record) {System.out.printf("收到消息 -> 主题: %s, 分区: %d, 偏移量: %d, 键: %s, 值: %s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());// 这里可以添加你的业务逻辑处理}
}

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.yykj</groupId><artifactId>kafka_demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka_demo</name><description>kafka_demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Boot Starter for Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.6</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><annotationProcessorPaths><path><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></path></annotationProcessorPaths></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

相关文章:

  • 大同网站建设优化营商环境的金句
  • 上海最新资讯重庆seo什么意思
  • 网站如何安装源码广州seo网络推广员
  • 怎么进入wordpress修改界面湛江百度seo公司
  • 一个公司做100个网站宣传广告怎么做吸引人
  • 网站开发的技术难点友情链接作用
  • GitAny - 無需登入的 GitHub 最新倉庫檢索工具
  • 图像分割技术的实现与比较分析
  • RabbitMQ 应用 - SpringBoot
  • html使用JS实现账号密码登录的简单案例
  • MFC: 文件加解密(单元测试模块)
  • 理解局部放电分析中的 PRPD 和 PRPS 图
  • 在Spring Boot中实现Kafka动态反序列化:针对多主题的灵活数据处理
  • MySQL数据库零基础入门教程:从安装配置到数据查询全掌握【MySQL系列】
  • 【图像大模型】Stable Diffusion XL:下一代文本到图像生成模型的技术突破与实践指南
  • 用python写节奏大师小游戏
  • 基于大模型的急性腐蚀性胃炎风险预测与诊疗方案研究报告
  • QT5.15 MacOS 打包指南
  • VLLM框架部署Qwen大模型,应该选择哪个qwen3系列的大模型和什么硬件配置?
  • Windows系统如何查看ssh公钥
  • pcdn的发展
  • 论文阅读:2024 arxiv Prompt Injection attack against LLM-integrated Applications
  • java-单列集合list与set。
  • 【SpringBoot】从零开始全面解析Spring IocDI (二)
  • 学习threejs,使用three-spritetext实现黑客帝国数字雨效果
  • C++ list基础概念、list初始化、list赋值操作、list大小操作、list数据插入