当前位置: 首页 > news >正文

在工商局网站怎么做清算app市场分析

在工商局网站怎么做清算,app市场分析,关于室内设计的网站有哪些,网站上的地图怎么做Kafka 是一个高性能的分布式消息系统,但消费者重启、偏移量(offset)未正确提交或网络问题可能导致重复消费。API 幂等性设计则用于防止重复操作带来的副作用。本文从 Kafka 重复消费和 API 幂等性两个方面提供解决方案,重点深入探…

Kafka 是一个高性能的分布式消息系统,但消费者重启、偏移量(offset)未正确提交或网络问题可能导致重复消费。API 幂等性设计则用于防止重复操作带来的副作用。本文从 Kafka 重复消费和 API 幂等性两个方面提供解决方案,重点深入探讨 事务性偏移量管理 如何实现精确一次消费(exactly-once),并结合其他方法确保消息可靠性和一致性。

1. Kafka 重复消费问题

Kafka 的重复消费问题通常由以下原因引发:消费者异常退出导致偏移量未提交、网络抖动、消费者组再平衡(rebalance)等。以下是解决重复消费的几种方法,重点聚焦事务性偏移量管理。

1.1 启用消费者幂等性

  • 手动提交偏移量

    • 设置 enable.auto.commit=false,在消息处理成功后手动提交偏移量(commitSynccommitAsync),确保消费与业务处理一致,减少重复消费风险。
    • commitSync:同步提交,阻塞直到 Broker 确认,适合高一致性场景,但可能降低吞吐量。
    • commitAsync:异步提交,非阻塞,适合高吞吐场景,但需通过回调(OffsetCommitCallback)监控提交失败并重试,以避免偏移量丢失导致重复消费。
    • 示例:
      Properties props = new Properties();
      props.put("enable.auto.commit", "false");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("my-topic"));
      while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息processRecord(record);}consumer.commitSync(); // 同步提交偏移量
      }
      
  • 事务性消费(重点:事务性偏移量管理)

    • 核心原理:通过 Kafka 的事务机制,将消息生产、消费和偏移量提交绑定在一个原子操作中,确保消息只被处理一次(exactly-once)。这依赖于生产者事务(transactional.id)和消费者隔离级别(isolation.level=read_committed)。
    • 事务性偏移量管理的实现
      • 生产者事务:生产者配置 transactional.idenable.idempotence=true,通过 initTransactions()beginTransaction()commitTransaction() 等操作管理事务。生产者使用 sendOffsetsToTransaction() 将消费者偏移量纳入事务,确保偏移量提交与消息写入原子性一致。
      • 消费者隔离级别:消费者设置 isolation.level=read_committed,只读取已提交的事务消息,未提交或回滚的消息对消费者不可见。
      • 偏移量存储:消费者偏移量存储在 Kafka 内部主题 __consumer_offsets 中,事务性提交通过生产者的事务机制记录,确保偏移量与消息处理同步。
  • 代码示例:

    public class TransUse {public static void main(String[] args) {Consumer<String, String> consumer = createConsumer();Producer<String, String> producer = createProduceer();// 初始化事务producer.initTransactions();while(true) {try {// 1. 开启事务producer.beginTransaction();// 2. 定义Map结构,用于保存分区对应的offsetMap<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();// 2. 拉取消息ConsumerRecords<String, String> records = consumer.poll(2000);for (ConsumerRecord<String, String> record : records) {// 3. 保存偏移量offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));// 4. 进行转换处理String[] fields = record.value().split(",");fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";String message = fields[0] + "," + fields[1] + "," + fields[2];// 5. 生产消息到dwd_userproducer.send(new ProducerRecord<>("dwd_user", message));}// 6. 提交偏移量到事务producer.sendOffsetsToTransaction(offsetCommits, "ods_user");// 7. 提交事务producer.commitTransaction();} catch (Exception e) {// 8. 放弃事务producer.abortTransaction();}}}// 1. 创建消费者public static Consumer<String, String> createConsumer() {// 1. 创建Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "node-1:9092");props.setProperty("group.id", "ods_user");props.put("isolation.level","read_committed");props.setProperty("enable.auto.commit", "false");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 2. 创建Kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅要消费的主题consumer.subscribe(Arrays.asList("ods_user"));return consumer;}// 2. 创建生产者public static Producer<String, String> createProduceer() {// 1. 创建生产者配置Properties props = new Properties();props.put("bootstrap.servers", "node-1:9092");props.put("transactional.id", "dwd_user");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建生产者Producer<String, String> producer = new KafkaProducer<>(props);return producer;}}
    
    • 深入理解事务性偏移量管理
      • 原子性:事务性偏移量提交将消息写入、业务处理和偏移量提交绑定在一个事务中,确保三者要么全成功,要么全失败。例如,若消费者处理消息后数据库操作失败,事务回滚,偏移量不会提交,消费者可重新消费。
      • 去重机制:Broker 根据 transactional.id 和序列号(Sequence Number)对生产者消息去重,防止重复写入。消费者通过 read_committed 隔离级别避免读取未提交消息。
      • 偏移量持久化:偏移量记录在 __consumer_offsets 主题中,事务性提交通过事务协调器(Transaction Coordinator)管理,确保偏移量与消息一致。
      • 故障恢复:消费者重启后,从 __consumer_offsets 中读取最后提交的偏移量开始消费。由于事务性提交保证偏移量与消息处理一致,不会重复消费。
    • 适用场景
      • 金融系统:如支付、转账,确保每笔交易只处理一次。
      • 订单处理:防止重复创建订单。
      • 数据同步:确保数据从源到目标的精确一次传递。
    • 性能考量
      • 事务增加日志写入和协调开销,适合高一致性场景。
      • 建议保持事务范围短,避免长时间占用资源。
    • 版本要求:Kafka 0.11.0+ 支持事务,推荐 2.0+ 版本以获得更稳定的事务支持。

1.2 业务层去重

  • 方法:在消息中添加唯一标识(如消息ID、业务ID),消费者端通过数据库(如 Redis、MySQL)或内存记录已处理的消息ID,消费前检查是否重复。
  • 数据库表结构示例
    CREATE TABLE consumed_messages (message_id VARCHAR(64) PRIMARY KEY,consume_time TIMESTAMP
    );
    
    消费时查询 message_id 是否存在,若存在则跳过。
  • Redis 实现
    if (redis.exists(messageId)) {return; // 跳过重复消息
    }
    // 处理消息
    processMessage(message);
    redis.set(messageId, "processed", EXPIRE_TIME_SECONDS);
    
  • 优势:简单易实现,适合无事务支持的旧版本 Kafka 或非严格 exactly-once 场景。
  • 局限:增加存储和查询开销,需定期清理去重记录。

1.3 偏移量管理

  • 可靠提交
    • 使用 commitSync() 确保偏移量提交成功,适合高一致性场景。
    • 使用 commitAsync() 提高吞吐量,但需通过回调监控失败并重试:
      consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("Commit failed: " + exception);// 重试或记录日志}
      });
      
  • 外部存储
    • 将偏移量存储到外部系统(如 Redis、ZooKeeper),异常恢复时从外部读取正确偏移量。
    • 示例(Redis):
      redis.set("consumer:group:offset", offset);
      
  • 注意:外部存储需保证一致性,可能增加复杂度,事务性偏移量管理更推荐。

1.4 消费者组优化

  • 唯一消费者组ID:确保 group.id 唯一,避免多个消费者组重复消费同一分区。
  • 配置超时参数
    • session.timeout.ms:建议 10-20 秒(如 10000ms),避免消费者因网络延迟被踢出组。
    • max.poll.interval.ms:建议 5-10 分钟(如 300000ms),适应消息处理耗时,避免超时触发再平衡。
    • 示例:
      props.put("session.timeout.ms", "10000");
      props.put("max.poll.interval.ms", "300000");
      
  • 监控再平衡:通过日志或 JMX 指标检查再平衡频率,优化参数以减少偏移量混乱。

2. API 幂等消费问题

API 幂等性确保多次调用同一 API 产生相同结果,防止重复操作的副作用。结合 Kafka,解决方法如下:

2.1 Kafka 生产者幂等性

  • 配置
    • 设置 enable.idempotence=true,Kafka 自动为消息分配序列号和分区标识,Broker 端去重。
    • 配置 retries=5acks=-1,确保消息可靠投递:
      props.put("enable.idempotence", "true");
      props.put("retries", "5");
      props.put("acks", "all");
      
  • 作用:生产者重试不会导致消息重复写入,Broker 根据序列号去重。

2.2 API 层幂等设计

  • 唯一请求ID
    • 为每个 API 请求生成唯一 ID(如 UUID),服务端用 Redis 或数据库记录已处理请求。
    • 示例(Redis):
      if (redis.exists(requestId)) {return cachedResult;
      }
      redis.set(requestId, result, EXPIRE_TIME_SECONDS);
      
  • 数据库约束
    • 使用唯一约束(如订单号)防止重复插入:
      CREATE TABLE orders (order_id VARCHAR(64) PRIMARY KEY,amount DECIMAL,create_time TIMESTAMP
      );
      
      插入时捕获唯一约束异常并返回。

2.3 结合 Kafka 事务

  • 方法:使用事务性生产者(transactional.id),将 API 操作(如数据库写入)和消息发送绑定在同一事务中,确保原子性。
  • 示例
    producer.initTransactions();
    producer.beginTransaction();
    try {producer.send(new ProducerRecord<>("topic", message));db.save(order); // 数据库操作producer.commitTransaction();
    } catch (Exception e) {producer.abortTransaction();throw e;
    }
    
  • 作用:事务失败时,消息和数据库操作均回滚,避免不一致。

3. 综合建议

  • 短事务:尽量减少事务范围(如仅包含必要操作),降低资源占用。
  • 分布式锁:在分布式系统中,使用 Redis 或 ZooKeeper 实现锁,防止并发重复处理。
  • 监控与日志:记录消息ID、处理时间等日志,便于排查重复消费问题。
  • 超时与重试:设置合理超时(如 request.timeout.ms)和重试次数(如 retries),避免无限重试。

4. 注意事项

  • 性能与一致性权衡
    • Redis 适合高性能去重,数据库适合强一致性场景。
    • 事务性机制增加开销,适合高一致性需求场景(如金融、订单)。
  • Kafka 版本:exactly-once 语义需 Kafka 0.11.0+,推荐 2.0+。
  • 清理去重记录:设置 Redis 过期时间或定期清理数据库记录,避免存储膨胀。
  • Broker 配置
    • min.insync.replicas=2:确保 acks=-1 的可靠性。
    • transaction.state.log.replication.factor=3:事务日志高可用。
    • num.partitions__consumer_offsets__transaction_state):建议 ≥50,提高并发性。

5. 深入理解事务性偏移量管理的优势

  • 一致性:事务性偏移量提交确保消息处理、偏移量更新和外部操作(如数据库写入)原子性一致,消除了重复消费和消息丢失的风险。
  • 容错性:消费者重启后,从 __consumer_offsets 中读取最后提交的偏移量,确保从正确位置继续消费。
  • 可扩展性:事务机制支持分布式环境,生产者和消费者可跨节点协作,适合复杂系统。
  • Broker 支持
    • 事务协调器(Transaction Coordinator)管理事务状态,存储在 __transaction_state 主题。
    • Broker 去重机制(基于 transactional.id 和序列号)防止重复写入。
  • 实现复杂度
    • 需要生产者和消费者协同配置(transactional.idisolation.level)。
    • 事务性偏移量提交通常由生产者通过 sendOffsetsToTransaction() 完成,消费者仅需确保 read_committed 和手动提交。

6. 总结

通过事务性偏移量管理,Kafka 结合生产者事务(transactional.idenable.idempotence=trueacks=-1)和消费者配置(isolation.level=read_committedenable.auto.commit=false),实现消息从生产到消费的精确一次语义。事务性偏移量提交将消息写入、业务处理和偏移量更新绑定在一个原子事务中,确保不重复、不丢失。结合业务层去重、偏移量管理和消费者组优化,可进一步提升系统可靠性。Broker 端通过事务协调器和内部主题(__consumer_offsets__transaction_state)支持事务性机制,确保高一致性场景下的可靠投递和消费。

http://www.dtcms.com/a/514826.html

相关文章:

  • 土木毕业设计代做网站什么网站可以做市场分析呢
  • 小企业门户网站建设网站开发的基本过程
  • 微网站平台微网站建设方案网络优化工程师
  • 软件下载网站如何履行网站建设要学习什么
  • 微信商城网站凡科网下载
  • 网站地址栏图标怎么做平面设计主要做什么内容
  • 湖北洲天建设集团有限公司网站九牧全球市场地位
  • 广东建设厅网站网站建设费如何账务处理
  • 东网站建设wordpress留言发送邮件
  • 网站开发是编程吗石家庄seo培训
  • 给一个网站做需求分析广州网络营销十年乐云seo
  • 兰州做it网站运营的怎么样今天江苏最新新闻
  • 平和网站建设猎头公司好做吗
  • 网站建设尢首先金手指东莞网上推广
  • 济南网站建设公司网站建设dw 什么软件
  • app购物网站建设vi设计是平面设计吗
  • 网页设计与网站建设连接数据库手机网站标准字体大小
  • 做家乡网站需要哪些内容湘潭做网站价格 磐石网络
  • 龙潭古镇网站建设网站选择语言怎么做
  • 中国建设工程招标官方网站营销外包公司
  • 网站后台使用什么做的网站建设中应注意的问题
  • 百度做网站吗舆情信息
  • 自考都到哪个网站找题做wordpress 关闭插件更新
  • 长沙网络公司网站网站建设留言板的实现
  • 小城镇建设网站的观点html5网站图标
  • 科技有限公司网站建设策划书白云区做网站
  • 北京网络推广兰州网站优化推广
  • asp做网站很少自己如何做棋牌网站
  • 58同城网站建设深圳丽丽亚wordpress编辑器主题
  • wordpress商城网站wordpress 显示空白