当前位置: 首页 > wzjs >正文

论坛网站制作费用虞城seo代理地址

论坛网站制作费用,虞城seo代理地址,网店美工的意义与发展,网站优化需要做什么Kafka消息0丢失实战 当你用Kafka处理业务时,是否担心过消息神秘失踪?下面将从SpringBoot整合实战出发,穿透生产者→Broker→消费者全链路 1、 消息丢失的三大场景 场景1:生产者自信发送 // 致命陷阱代码示例 Bean public Pro…

Kafka消息0丢失实战

当你用Kafka处理业务时,是否担心过消息神秘失踪?下面将从SpringBoot整合实战出发,穿透生产者→Broker→消费者全链路

1、 消息丢失的三大场景

场景1:生产者自信发送

// 致命陷阱代码示例

@Bean
public ProducerFactory<String, String> producerFactory() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.ACKS_CONFIG, "0"); // 发后即忘configs.put(ProducerConfig.RETRIES_CONFIG, 0); // 不重试return new DefaultKafkaProducerFactory<>(configs);
}

// 异步发送忘记回调

kafkaTemplate.send("order-topic", orderId, json).addCallback(result -> logger.info("发送成功"),  // 成功日志ex -> logger.error("发送失败")     // 错误吞没
);

场景2:Broker的死亡错觉

# 危险配置示范
auto.create.topics.enable=true     # 自动创建主题埋雷
unclean.leader.election.enable=true # 允许脏选举
min.insync.replicas=1             # 单副本存活即工作

场景3:消费者的自信提交

// 问题配置

@KafkaListener(topics = "order-topic")
public void handle(Order order) {try {paymentService.process(order);  // 处理耗时操作} finally {// 没有手动提交偏移量!}
}

// 错误配置:自动提交间隔过长

spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=5000

2、生产者端的钢铁防线

1. 同步发送+重试策略(金融级防护)

@Bean
public KafkaTemplate<String, String> reliableKafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.ACKS_CONFIG, "all"); // 必须所有副本确认configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 防止乱序configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
}

// 发送模板

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("order-topic", key, value);
future.get(10, TimeUnit.SECONDS); // 同步等待确认

2. 事务消息(分布式事务防护)

@Bean
public KafkaTransactionManager<String, String> transactionManager() {return new KafkaTransactionManager<>(producerFactory());
}
@Transactional
public void processOrder(Order order) {paymentService.charge(order);kafkaTemplate.send("payment-topic", order.getId(), order.toJson());inventoryService.reduceStock(order); 
}

3. 监控指标看护

// 注册监控指标

metrics.addMetric("producer-error-rate", (tags) -> {return producer.metrics().get("record-error-rate").value();
});

3、Broker集群的堡垒计划

1. 存活确认矩阵

# broker关键配置
unclean.leader.election.enable=false    # 禁止脏选举
min.insync.replicas=2                  # 至少2个副本确认
default.replication.factor=3           # 默认3副本
log.flush.interval.messages=10000      # 刷盘策略
log.flush.interval.ms=1000

2. ISR机制源码解析

// Kafka源码片段(Partition.scala)

def inSyncReplicas = {val leaderLogEndOffset = localLogOrException.logEndOffsetremoteReplicas.filter { replica =>replica.logEndOffset >= leaderLogEndOffset &&(time.milliseconds - replica.lastCaughtUpTimeMs) < replicaLagTimeMaxMs}
}

3. 磁盘防护策略

# 使用JBOD而不是RAID(Kafka最佳实践)
log.dirs=/data/kafka/log1,/data/kafka/log2,/data/kafka/log3
# 监控脚本示例
df -h | grep /data/kafka | awk '{if ($5 > 85) print "ALERT: "$6" usage over 85%"}'

4、消费者端的终极防御

1. 手动提交+死信队列

@KafkaListener(topics = "order-topic", groupId = "payment-group")
public void listen(ConsumerRecord<String, String> record,Acknowledgment ack,Consumer<String, String> consumer) {try {paymentService.process(record.value());ack.acknowledge(); // 手动提交} catch (Exception ex) {// 记录原始消息到死信队列kafkaTemplate.send("order-dlq", record.key(), record.value());// 重置偏移量到5秒前consumer.seek(record.topic(), record.partition(), record.offset() - 1);}
}

2. 消费者组反脆弱模式

# 关键配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.isolation.level=read_committed

3. 消费延迟监控

// 计算消费延迟

long lag = record.timestamp() - System.currentTimeMillis();
metrics.recordGauge("consumer.lag", lag);

5、全链路防护方案

1. 端到端校验设计

// 消息指纹校验

public class MessageWrapper {private String payload;private String checksum; // SHA256(payload + salt)
}

// 生产者端

String salt = "kafka-secure-2023";
String checksum = DigestUtils.sha256Hex(payload + salt);
template.send("topic", new MessageWrapper(payload, checksum));

// 消费者端

if (!DigestUtils.sha256Hex(message.getPayload() + salt).equals(message.getChecksum())) {throw new InvalidMessageException();
}

2. 混沌工程测试用例

@SpringBootTest
public class KafkaChaosTest {@Autowiredprivate KafkaChaosRunner chaosRunner;@Testpublic void testNetworkPartition() {chaosRunner.networkPartition("kafka-broker1", Duration.ofMinutes(5));// 验证消息不丢失}
}

3. 消息轨迹追踪

// 使用OpenTelemetry实现

Span sendSpan = tracer.spanBuilder("kafka.send").setAttribute("message.key", key).startSpan();
try (Scope scope = sendSpan.makeCurrent()) {kafkaTemplate.send("topic", key, value);
} finally {sendSpan.end();
}

配置核查清单
✅ 生产者acks=all且开启幂等
✅ broker禁用unclean leader选举
✅ 消费者关闭自动提交
✅ 事务消息开启read_committed
✅ 监控Producer/Consumer/Broker关键指标

http://www.dtcms.com/wzjs/166624.html

相关文章:

  • 手机网站进不去怎么解决江门百度seo公司
  • 网站做301将重定向到新域名在哪里推广比较好
  • 扬中网站优化公司北京seo关键词排名优化软件
  • 做门户网站需要准备什么我的百度网盘登录入口
  • 网站收录就是没排名网络推广公司主要做什么
  • 网站建设中高低端区别百度网站禁止访问怎么解除
  • 免费网站源码博客南宁百度推广seo
  • 医院网站建设价格海东地区谷歌seo网络优化
  • 网站跳出率是什么意思互联网推广方案怎么写
  • 音乐网站的制作巨量引擎广告投放平台登录入口
  • 正邦做网站吗创建网址链接
  • 北京市建设城乡建设委员会网站推广网站平台
  • 沧州企业网站制作的天津百度seo排名优化
  • 一级a做爰片免费网站性恔东莞做网站推广
  • 灵璧做网站的公司互联网金融营销案例
  • 山东广饶建设银行网站西安seo诊断
  • 潍坊微信网站开发品牌推广方案怎么写
  • 做计算机网站有哪些功能石家庄疫情最新消息
  • 网站开发客户端网站你应该明白我的意思吗
  • 登录深圳住房和建设局网站凡科网站官网
  • 广州网站app制作公司优就业seo
  • 做游戏网站选服务器常见的网站推广方法
  • php动态网站开发 项目教程成都网站关键词推广优化
  • 广州做网站公司哪家好产品互联网营销推广
  • 建设厅网站怎样刷身份证推广网站文案
  • 在线网站建设者直销怎么做才最快成功
  • 佛山制作网站公司广东网站优化公司
  • 网站上线前准备方案做网站的软件
  • asp个人网站源码手机百度网盘登录入口
  • 网站建设与运营实践考试黑帽seo365t技术