当前位置: 首页 > wzjs >正文

公司网站要怎么做产品招商网

公司网站要怎么做,产品招商网,绍兴网站建设方案托管,厦门专业网站设计公司目录 RabbitMQ保证消息可靠性 生产者丢失消息 MQ丢失消息 消费端丢失了数据 Kakfa的消息可靠性 生产者的消息可靠性 Kakfa的消息可靠性 消费者的消息可靠性 RabbitMQ保证消息可靠性 生产者丢失消息 1.事务消息保证 生产者在发送消息之前,开启事务消息随后生…

目录

RabbitMQ保证消息可靠性

生产者丢失消息

MQ丢失消息

消费端丢失了数据

Kakfa的消息可靠性

生产者的消息可靠性

Kakfa的消息可靠性

消费者的消息可靠性


RabbitMQ保证消息可靠性

生产者丢失消息

1.事务消息保证

生产者在发送消息之前,开启事务消息随后生产者发送消息,消息发送之后,如果消息没有被MQ接收到的话,生产者会收到异常报错,生产者回滚事务,然后重试消息,如果收到了消息,就能提交事务了

@Autowired
private RabbitTemplate rabbitTemplate;public void sendTransactionalMessage() {ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();Channel channel = connectionFactory.createConnection().createChannel(false);try {channel.txSelect(); // 开启事务channel.basicPublish("exchange", "routing.key", null, "message".getBytes());channel.txCommit(); // 提交事务} catch (Exception e) {channel.txRollback(); // 出错回滚}
}

2.使用confirm机制

  • 普通confirm机制,就是发送消息之后,等待服务器confirm之后再发送下一个消息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功发送到Broker");} else {System.out.println("消息发送失败,原因:" + cause);}
});rabbitTemplate.convertAndSend("exchange", "routing.key", "message");
  • 批量confirm机制,每发送一批消息之后,等待服务器confirm
Channel channel = connection.createChannel(false);
channel.confirmSelect();for (int i = 0; i < 100; i++) {channel.basicPublish("exchange", "routing.key", null, ("msg" + i).getBytes());
}
channel.waitForConfirms(); // 等待所有消息确认
  • 异步confirm机制,服务器confirm一个或者多个消息之后,客户端(生产者)能够通过回调函数来确定消息是否被confirm(推荐)
SortedSet<Long> pendingSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long tag, boolean multiple) {if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}public void handleNack(long tag, boolean multiple) {System.err.println("未确认消息:" + tag);if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}
});while (true) {long seq = channel.getNextPublishSeqNo();channel.basicPublish("demo.exchange", "demo.key",MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes());pendingSet.add(seq);
}

MQ丢失消息

防止MQ的丢失数据的话,方法就是开启RabbitMQ的持久化,消息写入之后(也就是到了MQ之后)就直接持久化到磁盘中,即使Rabbimq自己挂了之后,会恢复数据。

设置持久化步骤

  • 创建queue的时候直接设置持久化,此时就能持久化queue的元数据(不是消息)
@Bean
public Queue durableQueue() {return new Queue("myQueue", true); // true 表示持久化
}
  • 发送消息的时候指定消息为deliveryMode设置为2,也就是设置消息为持久化,此时消息可以持久化磁盘上
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("content".getBytes(), props);
rabbitTemplate.send("exchange", "routing.key", message);

极端情况:

消息写到RabbitMQ之后,但是还没有持久化到磁盘之后直接挂了,导致内存中消息丢失。

解决方法:持久化与生产者的confirm机制配合,当且仅当持久化了消息之后,再confirm,避免数据与消息丢失,此时生产者收不到ack,也是可以自己重发

消费端丢失了数据

意思就是消息已经拉取到了信息,还没有处理(注意这是已经告诉MQ我拉取到数据了),结果进程挂了,重启之后继续消费下一条消息,导致中间的这一条没有消费到,此时数据丢失了。

利用ack机制处理

取消RabbiMQ的自动ack,也就是一个api,可以在消费端消费完了消息之后再调用api告诉MQ我们收到并且处理了该消息。如果没有返回ack,RabbitMQ会把该消息分配给其他的consumer处理,消息不会丢失。通过配置处理

spring:rabbitmq:listener:simple:acknowledge-mode: manual

Kakfa的消息可靠性

生产者的消息可靠性

在kafka中,可以在producer(生产段)设置一个参数,也就是ack=all,要求每个数据,必须写入所有的replica(也就是所有该分区的副本),才认为是接收成功。该参数设置的是你的leader接收到消息后,所有的follower都同步到消息后才认为写成功

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 等待所有副本确认
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("发送成功:" + metadata.offset());} else {exception.printStackTrace();}
});
producer.close();

Kakfa的消息可靠性

kafka默认是会将消息持久化到磁盘上的,但是还是有情况会导致丢失数据

kafka某个broker宕机,随后重新选举partition的leader。倘若在该broker中的partition中的leader副本中的消息,还没有被其他broker中的follower同步,此时同步缺失的数据就丢失了,也就是少了一些数据

解决方法:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

按照上面的配置之后,leader的切换就不会导致数据缺失了。

消费者的消息可靠性

唯一可能也是类似于RabbitMQ中的,也就是说你消费到该消息的时候,消费者自动提交offset,让kafka以为你消费好了该消息,但是自己还没处理就宕机后,会导致重启后没有消费该消息。

解决方法:

关闭kafka默认的自动提交offset,通过消费端业务逻辑处理完消息后,再手动提交offset,当然这里就是会导致重复消费了,这里就是幂等性的问题了。比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次

手动提交api:consumer.commitSync();

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息System.out.println("处理消息:" + record.value());}// 手动提交 offsetconsumer.commitSync();
}


文章转载自:

http://XO681cte.xhwty.cn
http://TNi4rg4I.xhwty.cn
http://D6ctaUz5.xhwty.cn
http://cLcPZUd0.xhwty.cn
http://l8BySgk9.xhwty.cn
http://O5sMiNOt.xhwty.cn
http://dVKAcsO6.xhwty.cn
http://o92Ez3bX.xhwty.cn
http://rSny9ISf.xhwty.cn
http://5Q8k01Mj.xhwty.cn
http://DLDss2Lf.xhwty.cn
http://iMuz9yy0.xhwty.cn
http://46CLYBZD.xhwty.cn
http://w9mLxBpO.xhwty.cn
http://POJWUz2h.xhwty.cn
http://wVZ2f5U3.xhwty.cn
http://zVNP93O7.xhwty.cn
http://1p2mjuDT.xhwty.cn
http://mnqjHH82.xhwty.cn
http://dwUt2chE.xhwty.cn
http://6Udnct4t.xhwty.cn
http://o7COXYLK.xhwty.cn
http://gWdNu8qt.xhwty.cn
http://IoE1prBi.xhwty.cn
http://5FkmIi5q.xhwty.cn
http://gmEILMuy.xhwty.cn
http://nl710dgk.xhwty.cn
http://F0TznyQs.xhwty.cn
http://ANJuUV0C.xhwty.cn
http://tNVlgcgy.xhwty.cn
http://www.dtcms.com/wzjs/676692.html

相关文章:

  • 网站建设需求说明书怎么写建立免费公司网站
  • 怎么自建一个网站制作博客网站
  • 郑州网站制作企业建e网怎么做效果图
  • 网站建设三秒原则wordpress 禁用google字体
  • 郑东新区网站开发软件开发培训学校的三大特色
  • 承德市宽城县建设局网站wordpress 主题模板下载
  • 如何海外网站建设公司宣传 如何做公司网站
  • 网站建设招聘启事自己有域名怎么做免费网站
  • 网站建设开发制作网站建设文化如何
  • 深圳做网站收费游戏网站服务器租用
  • 手机网站 win8风格淘宝客优惠卷网站模板
  • 如何购买网站空间采集电影做的网站
  • 建设免费手机网站wordpress博客设置主题方法
  • 做外贸上哪些网站找客户做电影网站的流程
  • 做58同城这样的网站有哪些wordpress 企业 模板 下载
  • 建筑培训网站有哪些怎样做免费网站建设
  • 九江本土专业网站建设网上购物系统功能需求分析
  • asp.net网站开发流程及相关工具成都旅游景点大全排名
  • 做钓鱼网站论坛wordpress 同步公众号
  • 怎么做网站流量赚钱吗做平台好还是做网站好
  • 修改wordpress后台地址 插件搜索排名优化
  • 建立网站大概需要多少钱网站建设亼仐团
  • 推荐一个做淘客网站756ka网站建设
  • 网站定制报价网站集群建设必要性
  • 做seo网站图片怎么优化最便宜做个网站多少钱
  • 一个空间怎么放两个网站吗王烨然盈盈
  • 呼和浩特市网站玄天教学网站建设
  • 小学网站建设设计方案做网站好赚钱
  • 天津网站制作工具thinkphp5来做网站吗
  • 网站建设php培训视频拍摄软件