当前位置: 首页 > wzjs >正文

做微商城网站衡阳有实力seo优化

做微商城网站,衡阳有实力seo优化,产品线下推广活动方案,wordpress整合redis在Spring Boot项目中集成Kafka&#xff0c;首先需要确保Kafka服务器已经启动并正常运行。然后&#xff0c;通过引入 Kafka依赖&#xff0c;编写yml配置&#xff0c;创建Kafka生产者和消费者。最后通过接口调用&#xff0c;实现消息发送和消费。 一、pom依赖 <!--kafka-->…

在Spring Boot项目中集成Kafka,首先需要确保Kafka服务器已经启动并正常运行。然后,通过引入 Kafka依赖,编写yml配置,创建Kafka生产者和消费者。最后通过接口调用,实现消息发送和消费。

一、pom依赖

<!--kafka-->

<!--注意版本冲突-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

二、yml配置

spring:

  kafka:  

    bootstrap-servers: localhost:9092   # Kafka集群地址

    # 生产者配置

    producer:

      compression-type: gzip   # 默认不压缩,可选值:"gzip", "snappy", "lz4", "zstd"

      acks: all   # 确认机制

      retries: 3   # 发送失败后重试次数

      retry-backoff-ms: 500    # 每次重试之间的间隔时间(单位:毫秒)

      enable-idempotence: true   # 启用幂等性

      linger-ms: 5   # 延迟发送的时间,单位毫秒,默认 0ms(立即发送)

      batch-size: 32768   # 生产者每个批次发送的最大字节数,默认 16KB

      buffer-memory: 67108864   # 生产者缓冲区内存大小,单位字节,默认32MB

      key-serializer: org.apache.kafka.common.serialization.StringSerializer   # 键的序列化器

      value-serializer: org.apache.kafka.common.serialization.StringSerializer   # 值的序列化器

      transactional-id-prefix: tx-   # 启用事务时的前缀,用于事务标识

      # 生产者拦截器,用于记录日志和指标收集

      interceptors:

        - org.apache.kafka.clients.producer.internals.LogAndMetricsProducerInterceptor

    # 消费者配置

    consumer:

      group-id: my-consumer-group   # 消费者组ID

      concurrency: 10   # 设置消费者并发数

      enable-auto-commit: false   # 禁用自动提交,手动提交偏移量

      auto-offset-reset: earliest   # 如果偏移量无效则从最早的消息开始消费。可选"latest"

      session-timeout-ms: 10000   # session会话超时时间(与心跳有关)

      max-poll-records: 1000   # 每次poll的最大记录数

      max-poll-interval-ms: 300000   # 最大轮询间隔 ,确保消费者不会在空闲期间超时 

      fetch-max-wait-ms: 300   # 拉取消息时的最大等待时间

      fetch-max-bytes: 104857600   # 拉取的最大字节数,单位字节

      fetch-min-size: 5000   # 拉取消息时的最小字节数,单位字节

      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer   # 键的反序列化器

      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer   # 值的反序列化器

      isolation-level: read_committed   # 事务隔离级别,确保读取已提交的数据。可选 "read_uncommitted"

    # Admin配置,用于创建和管理Kafka的主题

    admin:  

      bootstrap-servers: localhost:9092   # Kafka集群地址

      default-topic:

        partitions: 5   # 默认主题的分区数

        replication-factor: 3   # 默认主题的副本数

    # 副本机制

    replication:   # 副本落后超过此时间(毫秒)会被移出 ISR 列表

      replica_lag_time_max_ms: 10000   # 默认为 10000 毫秒,即 10 秒

      # 最少需要多少个同步副本才能进行写入

      min_insync_replicas: 2   # 默认为 1,根据需求设置

      # 是否允许在没有足够 ISR 的情况下进行领导者选举

      unclean_leader_election_enable: false   # 默认为 false,防止数据丢失

    # 监听器的配置

    listener:  

      concurrency: 10   # 设置监听器的并发数,控制消费者的并发线程数

      ack-mode: manual   # 手动确认消息,默认是 "auto" 自动确认

      poll-timeout: 3000   # 拉取消息的超时时间,单位毫秒

      missing-topics-fatal: false   # 是否忽略主题缺失的错误,默认是false

      container-factory: kafkaListenerContainerFactory   # 默认监听器容器工厂

    # 错误处理配置

    error-handler:

      # 在消息消费出错时重试的配置

      retry:

        max-attempts: 3   # 最大重试次数

        initial-interval: 1000   # 初始重试间隔,单位毫秒

        multiplier: 1.5   # 重试间隔的增量倍数

    #日志段和日志保留配置

    log: 

      # 设置每个日志段的大小

      segment:

        bytes: 1073741824   # 每个日志段的大小,默认为 1GB # 设置日志保留时间(毫秒)

      retention:

        ms: 604800000   # 7天,单位是毫秒

      # 设置日志的最大保留大小

      retention_bytes: 10737418240   # 默认为 10GB

      # 设置日志清理策略(delete 或 compact)

      cleanup_policy: "delete"   # 可选:delete 或 compact

      # 设置日志索引间隔大小

      index_interval_bytes: 4096   # 默认为 4096

三、KafkaConfig(可选)

@Configuration

public class KafkaConfig {

    @Bean

    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {

        return new KafkaTemplate<>(producerFactory);

    }

    @Bean

    public ProducerFactory<String, String> producerFactory() {

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);

    }

}

四、KafkaProducer

在Spring Boot中创建Kafka生产者,向Kafka主题发送消息。

1.KafkaTemplate<String, String> 

Spring Kafka提供的用于生产消息的模板类。

2.sendMessage()

向指定的Kafka主题(test_topic)发送消息。

@Service

public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {

        this.kafkaTemplate = kafkaTemplate;

    }

    public void sendMessage(String topic, String message) {

        kafkaTemplate.send(topic, message);

    }

}

五、KafkaConsumer

编写Kafka消费者来接收消息。

1.@KafkaListener

用于指定消费的主题及消费者组。

2.consumeMessage()

将会接收到来自test_topic主题的消息,并将其打印出来。

@Service

public class KafkaConsumer {

    // 监听test主题的消息

   @KafkaListener(topics = "test", groupId = "my-consumer-group")

    public void consumeMessage(String message) {

        System.out.println("Received Message in group my-group: " + message);

    }

}

六、自定义调用

在启动类中,使用CommandLineRunner来在应用启动时发送一条消息。

@SpringBootApplication

public class KafkaApplication {

    public static void main(String[] args) {

        SpringApplication.run(KafkaApplication.class, args);

    }

    @Bean

    public CommandLineRunner demo(KafkaProducer producer) {

        return (args) -> {

            producer.sendMessage("Hello, Kafka!");  

            // 启动时发送一条消息

        };

    }

}

七、注意事项

根据你的 Spring Boot 版本,选择兼容的 Kafka 依赖版本:

八、总结

在Spring Boot项目中集成Kafka,首先需要确保Kafka服务器已经启动并正常运行。然后,通过引入 Kafka依赖,编写yml配置,创建Kafka生产者和消费者。最后通过接口调用,实现消息发送和消费。

http://www.dtcms.com/wzjs/27371.html

相关文章:

  • 龙岗 网站建设深圳信科石家庄百度seo排名
  • 政府 教育 网站 案例东莞seo外包
  • 法律网站建设中国国家数据统计网
  • 国家级门户网站有哪些百度上看了不健康的内容犯法吗
  • 营销型网站如何建设全球热搜榜排名今日
  • 天津企业网站建站模板友情链接站长平台
  • 平凉建设局官方网站天津seo技术教程
  • 临沂网站建设服务商网络营销专业毕业论文
  • 棋牌游戏网站怎么做sem是什么基团
  • 建筑常用的模板下载网站有哪些杭州百度整站优化服务
  • 做外贸怎么看外国网站迅雷磁力链bt磁力天堂
  • 个人网站可以做导购吗网络推广seo教程
  • 北京网站怎么做疫情排行榜最新消息
  • 营销型网站重要特点是?维普网论文收录查询
  • html网站设计模板百度下载安装2019
  • 南通网站建设设计账户竞价托管公司
  • muse怎么做网站网站seo谷歌
  • 个人网站可以做导航石家庄网络营销
  • 福州网站建设福州站建设搜索引擎营销的基本流程
  • 1000M双线网站空间网站自然优化
  • 保健品网站模版幽默广告软文案例
  • 重庆做的好的房产网站seo排名如何优化
  • 复制审查元素做网站seo网络营销是什么意思
  • wordpress怎么放视频seo的优化原理
  • 网站排名如何上升泰安百度推广代理商
  • 一品威客做任务要给网站钱吗互联网销售怎么做
  • 淄博网站设计制作广东省各城市疫情搜索高峰进度
  • flash网页制作教程厦门零基础学seo
  • 网站的企业风采怎么做seo如何快速出排名
  • 网站目的及功能定位百度入驻商家