当前位置: 首页 > wzjs >正文

中国旅游网站建设现状及发展趋势分析关键词优化工具互点

中国旅游网站建设现状及发展趋势分析,关键词优化工具互点,wordpress mysql主机名,网站如何免费推广一、Kafka消费者提交Offset的策略 Kafka消费者提交Offset的策略有 自动提交Offset: 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息…

一、Kafka消费者提交Offset的策略

Kafka消费者提交Offset的策略有

  1. 自动提交Offset:
    1. 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。
    2. 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了
  2. 手动提交Offset
    1. 消费者在消费消息时/后,再提交offset,在消费者中实现
    2. 手动提交Offset分为:手动同步提交、手动异步提交
  3. 什么是Offset
    1. 参考文章:Linux:【Kafka三】组件介绍

二、自动提交策略

        Kafka消费者默认是自动提交Offset的策略

        可设置自动提交的时间间隔

package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Description: kafka消费者消费消息,自动提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerAutoSubmitOffset {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 自动提交:默认配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));// 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时// 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}}}
}

上述代码中的如下代码是自动提交策略的相关设置 

        // 设置消费者offset的提交方式// 自动提交:默认配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

三、手动提交策略

3.1、手动同步提交策略

        手动同步提交,会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后,才开始执行消费者中后续的代码。

        因为使用异步提交容易丢失消息,固一般使用同步提交,在同步提交后不要再做其他逻辑处理。

package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Description: kafka消费者消费消息,手动同步提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerMauSubmitOffset {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 手动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//  业务逻辑处理for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}// 当for循环业务逻辑处理结束以后,再手动提交offset// 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。// 一般使用同步提交,在同步提交后不再做其他逻辑处理consumer.commitAsync();// do anything}}
}

3.2、手动异步提交策略

        异步提交,不会在提交offset代码处阻塞,即消费者提交了offset后,不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法,供kafka集群回调,来告诉消费者提交offset的结果。

package com.demo.lxb.kafka;import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;/*** @Description: kafka消费者消费消息,手动异步提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerMauSubmitOffset2 {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 手动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}// 异步提交,不影响后续的内容。// new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e != null){// 可将提交失败的消息记录到日志System.out.println("记录提交offset失败的消息到日志");System.out.println("消费者提交offset抛出异常:" + Arrays.toString(e.getStackTrace()));System.out.println("消费者提交offset异常的消息信息:" + JSONObject.toJSONString(map));}}});// 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。//do anything}}
}

http://www.dtcms.com/wzjs/67321.html

相关文章:

  • 建程网信息可靠吗seo运营推广
  • 红安建设局网站宁波seo搜索优化费用
  • 网站建设市场规模seo文章排名优化
  • asp网站有哪些如何在网上做销售推广
  • 蓝色清爽网站百度推广客服电话多少
  • 公安网站后台管理系统服装店营销策划方案
  • 重庆建站公司价钱足球联赛排名
  • 专做美容师招聘网站郑州seo排名优化公司
  • 最新网站建设语言郑州网站建设推广优化
  • 郴州疫情最新消息今天封城了重庆电子商务seo
  • 长沙优秀网站建设seo的方法有哪些
  • 中山网站建设推广看广告收益最高的软件
  • 做百度收录比较好的网站收录网站是什么意思
  • 做推广哪些网站好磁力链最佳的搜索引擎
  • wordpress 文档导入seo综合查询站长工具关键词
  • 网站数据库太大搬家还原500错误乐陵seo优化
  • 做的网站名百度搜索指数和资讯指数
  • 招聘网站咋做爱战网关键词查询网站
  • 做问答营销的网站有哪些fifa世界排名最新
  • 企业邮箱的登录方式网站怎么优化排名
  • 网站开发费用报价百度新闻头条
  • 网站建设服b站推广app大全
  • 传统网站怎么做前端模块网站有哪些平台
  • 网站建设岗位廉政风险防控百度百度百度一下
  • 锤子手机网站模板湖南专业关键词优化服务水平
  • 百度站长平台删站新东方考研班收费价格表
  • 收钱码合并的网站怎么做长沙自动seo
  • 商城型网站建设代理加盟全国疫情地区查询最新
  • 综合性网站模板快速收录域名
  • 淘宝客怎么做的网站如何注册属于自己的网站