当前位置: 首页 > news >正文

kafka如何保证数据不丢失

下面我将使用 Python 代码示例,从生产者、集群和消费者三个层面详细讲解 Kafka 如何保证数据不丢失。我们将使用kafka-python库来实现相关功能。

一、生产者层面的数据不丢失保证

生产者通过配置确认机制、重试策略和幂等性来确保数据不丢失。

from kafka import KafkaProducer
from kafka.errors import KafkaError
import timedef create_safe_producer():# 配置生产者属性producer = KafkaProducer(bootstrap_servers=['localhost:9092'],# 确保所有ISR中的副本确认消息acks='all',# 失败重试次数retries=3,# 重试间隔(毫秒)retry_backoff_ms=1000,# 开启幂等性,防止重复发送enable_idempotence=True,# 限制未确认请求数量,保证顺序max_in_flight_requests_per_connection=1,# 序列化器value_serializer=lambda v: str(v).encode('utf-8'))return producerdef send_message_safely(producer, topic, message):try:# 发送消息并等待确认(同步发送)future = producer.send(topic, message)# 等待服务器响应record_metadata = future.get(timeout=10)print(f"消息发送成功 - 主题: {record_metadata.topic}, "f"分区: {record_metadata.partition}, "f"偏移量: {record_metadata.offset}")return Trueexcept KafkaError as e:print(f"消息发送失败: {str(e)}")# 这里可以添加自定义的重试逻辑或持久化失败的消息return Falseexcept Exception as e:print(f"发送过程中发生错误: {str(e)}")return Falseif __name__ == "__main__":producer = create_safe_producer()topic = "safe_topic"try:# 发送测试消息for i in range(5):message = f"这是第{i+1}条需要确保不丢失的消息"success = send_message_safely(producer, topic, message)if not success:print(f"消息 '{message}' 发送失败,已记录待后续处理")time.sleep(1)finally:# 确保所有缓冲消息都被发送producer.flush()producer.close()

关键配置说明:

  • acks='all':最安全的配置,消息需被所有同步副本确认
  • retries=3:发送失败时自动重试 3 次
  • enable_idempotence=True:开启幂等性,确保重试不会导致消息重复
  • 同步发送:通过future.get()等待结果,确保知道消息是否发送成功

二、集群层面的数据不丢失保证

Kafka 集群通过副本机制和 ISR(同步副本集)来保证数据不丢失。

1. 集群配置(server.properties)

# 每个broker的唯一标识
broker.id=0# 日志存储路径
log.dirs=/tmp/kafka-logs# 确保数据不丢失的关键配置
default.replication.factor=3  # 新主题默认副本数
min.insync.replicas=2         # 最小同步副本数,与生产者acks=all配合# 副本同步配置
replica.lag.time.max.ms=30000  # 副本同步滞后的最大时间# 禁止非ISR副本成为领导者,避免数据丢失
unclean.leader.election.enable=false# 日志保留策略
log.retention.hours=168  # 日志保留时间# Zookeeper连接
zookeeper.connect=localhost:2181

2. 创建高可用主题(Python 代码)

from kafka.admin import KafkaAdminClient, NewTopicdef create_safe_topic():# 连接到Kafka集群admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092",client_id='topic_creator')# 定义主题配置,确保高可用topic_name = "safe_topic"num_partitions = 3  # 分区数replication_factor = 3  # 每个分区的副本数# 创建主题topic_list = [NewTopic(name=topic_name,num_partitions=num_partitions,replication_factor=replication_factor,# 额外配置configs={'min.insync.replicas': '2'  # 此主题的最小同步副本数})]try:# 创建主题admin_client.create_topics(new_topics=topic_list, validate_only=False)print(f"主题 '{topic_name}' 创建成功,分区数: {num_partitions}, 副本数: {replication_factor}")except Exception as e:print(f"创建主题失败: {str(e)}")finally:admin_client.close()if __name__ == "__main__":create_safe_topic()

关键配置说明:

  • default.replication.factor=3:每个分区默认有 3 个副本,分布在不同 broker 上
  • min.insync.replicas=2:与生产者 acks='all' 配合,确保至少 2 个副本确认接收消息
  • unclean.leader.election.enable=false:防止非同步副本成为领导者,避免数据丢失

三、消费者层面的数据不丢失保证

消费者通过手动提交偏移量和异常处理来确保数据不丢失。

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import timedef create_safe_consumer(group_id):# 配置消费者属性consumer = KafkaConsumer('safe_topic',bootstrap_servers=['localhost:9092'],group_id=group_id,# 禁用自动提交偏移量enable_auto_commit=False,# 没有偏移量时从最早的消息开始消费auto_offset_reset='earliest',# 反序列化器value_deserializer=lambda m: m.decode('utf-8'),# 拉取超时时间consumer_timeout_ms=10000)return consumerdef process_message(message):"""处理消息的业务逻辑"""# 模拟处理时间time.sleep(0.5)print(f"处理消息: {message}")# 这里可以添加实际的业务逻辑# 如果处理失败,可以抛出异常# if some_condition:#     raise Exception("处理失败")return Truedef consume_messages_safely(consumer):try:while True:# 拉取消息messages = consumer.poll(timeout_ms=1000)if not messages:continueall_processed = True# 处理每个分区的消息for partition, records in messages.items():for record in records:try:# 处理消息success = process_message(record.value)if not success:all_processed = Falseprint(f"消息处理失败: {record.value}")except Exception as e:all_processed = Falseprint(f"处理消息时发生错误: {str(e)}, 消息: {record.value}")# 可以将失败的消息发送到死信队列# send_to_dead_letter_queue(record)# 只有所有消息都处理成功后才提交偏移量if all_processed:consumer.commit()print("偏移量已提交")else:print("部分消息处理失败,不提交偏移量")except KafkaError as e:print(f"消费过程中发生Kafka错误: {str(e)}")except Exception as e:print(f"消费过程中发生错误: {str(e)}")finally:consumer.close()if __name__ == "__main__":consumer = create_safe_consumer("safe_consumer_group")consume_messages_safely(consumer)

关键配置说明:

  • enable_auto_commit=False:禁用自动提交,由应用控制何时提交偏移量
  • auto_offset_reset='earliest':无偏移量时从最早消息开始消费
  • 手动提交:只有当所有消息处理成功后才调用consumer.commit()
  • 异常处理:捕获处理过程中的异常,确保失败时不提交偏移量

总结

Kafka 保证数据不丢失需要三个层面的协同工作:

  1. 生产者:通过acks='all'等待所有同步副本确认,设置重试机制,并使用同步发送确保消息成功投递
  2. 集群:通过多副本机制,设置合理的副本数和最小同步副本数,防止非同步副本成为领导者
  3. 消费者:通过手动提交偏移量,确保消息处理成功后再提交,并妥善处理异常情况

这三个层面的配置相互配合,才能构建一个可靠的 Kafka 系统,确保数据在各种异常情况下都不会丢失。

http://www.dtcms.com/a/297156.html

相关文章:

  • 机器学习中knn的详细知识点
  • Linux725 磁盘阵列RAID0 RAID1
  • OneCode3.0 Gallery 组件前后端映射机制:从注解配置到前端渲染的完整链路
  • 应用代码解释
  • 从零开始的云计算生活——番外6,使用zabbix对中间件监控
  • pycharm安装教程-PyCharm2023安装详细步骤【MAC版】【安装包自取】
  • Spring 策略模式实现
  • 配置Mac/Linux终端启动执行脚本
  • 电子电子架构 --- 软件项目的开端:裁剪
  • 现代 C++ 开发工作流(VSCode / Cursor)
  • ubuntu/centos系统ping 不通域名的解决方案
  • 清理DNS缓存
  • Ubuntu 环境下创建并启动一个 MediaMTX 的 systemd 服务
  • 缓存HDC内容用于后续Direct2D绘制.
  • 数据仓库深度探索系列 | 开篇:开启数仓建设新征程
  • 验证回文串-leetcode
  • Nginx简单介绍
  • 【STM32】Keil + FreeRTOS + HAL DMA + UART 空闲中断 接收异常
  • 【矩阵专题】Leetcode48.旋转图像(Hot100)
  • leetcode_122 买卖股票的最佳时机II
  • STM32与ADS1220实现多通道数据采集的完整分析和源程序
  • Comfyui中Upscale Image By 几种放大方法的区别
  • Java研学-RabbitMQ(三)
  • Centos7安装rabbitmq
  • RabbitMQ—HAProxy负载均衡
  • React性能优化终极指南:memo、useCallback、useMemo全解析
  • Ubuntu22 上,用C++ gSoap 创建一个简单的webservice
  • NineData 数据库 DevOps 全面支持 GaussDB,国产化管理再升级!
  • Spring Boot 自动装配底层源码实现详解
  • 国产DevOps平台Gitee:如何重塑中国企业研发效能新格局