
The rdkafka Handbook: From Bronze to King in Asynchronous Messaging

Introduction

rdkafka is an Apache Kafka client library for Rust, built on top of librdkafka (a high-performance Kafka client library written in C). rdkafka provides the full set of functionality needed to interact with a Kafka cluster, including producers, consumers, and admin operations.

This document explains how to perform common Kafka operations with rdkafka and provides clear code examples for each of them.

Table of Contents

  1. Installation and Configuration
  2. Basic Concepts
  3. Producer
  4. Consumer
  5. Admin
  6. Advanced Features
  7. Error Handling
  8. Performance Optimization
  9. Best Practices

Installation and Configuration

Adding the Dependency

Add the rdkafka dependency to your Cargo.toml:

[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build", "ssl", "sasl"] }
tokio = { version = "1.0", features = ["full"] }

System Dependencies

On Windows, the following dependencies are required:

  1. GNU toolchain: install MinGW via MSYS2

    • Download and install MSYS2: https://www.msys2.org/
    • Update the packages: pacman -Syu
    • Install the toolchain: pacman -S --needed base-devel mingw-w64-x86_64-toolchain
  2. CMake: download and install from https://cmake.org/download/

  3. Optional dependencies

    • zlib: compression support
    • libssl-dev: SSL support
    • libsasl2-dev: SASL support
    • libzstd-dev: Zstandard compression support

On Linux, you can install them with the package manager:

# Ubuntu/Debian
sudo apt-get install build-essential cmake libssl-dev libsasl2-dev libzstd-dev

# CentOS/RHEL
sudo yum install gcc-c++ cmake openssl-devel cyrus-sasl-devel libzstd-devel
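Before going further, it can help to compile a minimal program that only constructs a client configuration, which exercises the native librdkafka build. This is a small sketch; the broker address localhost:9092 is only a placeholder, and creating the producer does not require the broker to be running.

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn main() {
    // Creating a producer links against librdkafka, so this doubles as a
    // build smoke test for the cmake/native toolchain setup above.
    let _producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder address
        .create()
        .expect("failed to create producer - check the librdkafka build");

    println!("rdkafka producer created successfully");
}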

Basic Concepts

Kafka Architecture

  • Producer: a client that publishes messages to Kafka brokers
  • Consumer: a client that reads messages from Kafka brokers
  • Consumer Group: a group of consumers that share the consumption of topics
  • Broker: a single Kafka server
  • Topic: a category of messages; conceptually similar to a queue
  • Partition: for scalability, a topic can be split into multiple partitions
  • Replica: each partition of a topic has a number of replicas
  • Leader: the "primary" replica of a partition; producers send data to it
  • Follower: a "secondary" replica that continuously replicates data from the leader

rdkafka Components

rdkafka provides the following main components (a minimal creation sketch follows the list):

  • FutureProducer: asynchronous producer
  • BaseProducer: low-level, poll-driven producer
  • StreamConsumer: stream-based consumer
  • BaseConsumer: basic consumer
  • AdminClient: admin client for managing a Kafka cluster
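All of these components are created from the same ClientConfig builder: you set librdkafka configuration keys and then request the concrete client type you need from create(). A minimal sketch, where the broker address and group id are placeholders:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;
use rdkafka::producer::FutureProducer;

fn build_clients() {
    // The same builder produces different client types depending on the
    // type annotation used with create().
    let _producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("producer creation failed");

    let _consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example-group") // consumers require a group id
        .create()
        .expect("consumer creation failed");
}

The following sections show each component in detail.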

Producer

Creating a Producer

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn create_producer(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("acks", "all")
        .create()
        .expect("Producer creation error")
}

Sending Messages (Asynchronous)

use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_async(
    producer: &FutureProducer,
    topic: &str,
    key: Option<&str>,
    payload: Option<&[u8]>,
) -> Result<(), KafkaError> {
    // Key and payload are optional; attach them only when present.
    let mut record = FutureRecord::<str, [u8]>::to(topic);
    if let Some(k) = key {
        record = record.key(k);
    }
    if let Some(p) = payload {
        record = record.payload(p);
    }

    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message: {}", e);
            Err(e)
        }
    }
}

Sending Messages (Synchronous)

use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

fn send_message_sync(
    producer: &BaseProducer,
    topic: &str,
    key: Option<&str>,
    payload: Option<&[u8]>,
) -> Result<(), KafkaError> {
    let mut record = BaseRecord::<str, [u8]>::to(topic);
    if let Some(k) = key {
        record = record.key(k);
    }
    if let Some(p) = payload {
        record = record.payload(p);
    }

    // send only enqueues the message; errors are returned together with the record.
    producer.send(record).map_err(|(e, _)| e)?;

    // Make sure all queued messages have actually been delivered.
    let _ = producer.flush(Duration::from_secs(1));
    Ok(())
}

Sending Messages in Batches

use futures::future::try_join_all;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_messages_batch(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(Option<&str>, Option<&[u8]>)>,
) -> Result<(), KafkaError> {
    // Start one delivery future per message and wait for all of them.
    let futures = messages.into_iter().map(|(key, payload)| {
        let mut record = FutureRecord::<str, [u8]>::to(topic);
        if let Some(k) = key {
            record = record.key(k);
        }
        if let Some(p) = payload {
            record = record.payload(p);
        }
        producer.send(record, Duration::from_secs(0))
    });

    match try_join_all(futures).await {
        Ok(_) => {
            println!("All messages sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send some messages: {}", e);
            Err(e)
        }
    }
}

Sending Messages with Headers

use rdkafka::error::KafkaError;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_with_headers(
    producer: &FutureProducer,
    topic: &str,
    key: Option<&str>,
    payload: Option<&[u8]>,
    headers: Vec<(String, Vec<u8>)>,
) -> Result<(), KafkaError> {
    // Build the header list; insert() consumes and returns the OwnedHeaders value.
    let mut owned_headers = OwnedHeaders::new();
    for (header_key, header_value) in &headers {
        owned_headers = owned_headers.insert(Header {
            key: header_key.as_str(),
            value: Some(header_value),
        });
    }

    let mut record = FutureRecord::<str, [u8]>::to(topic).headers(owned_headers);
    if let Some(k) = key {
        record = record.key(k);
    }
    if let Some(p) = payload {
        record = record.payload(p);
    }

    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message with headers sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message with headers: {}", e);
            Err(e)
        }
    }
}

Sending to a Specific Partition

use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_to_partition(
    producer: &FutureProducer,
    topic: &str,
    partition: i32,
    key: Option<&str>,
    payload: Option<&[u8]>,
) -> Result<(), KafkaError> {
    let mut record = FutureRecord::<str, [u8]>::to(topic).partition(partition);
    if let Some(k) = key {
        record = record.key(k);
    }
    if let Some(p) = payload {
        record = record.payload(p);
    }

    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message sent to partition {} successfully", partition);
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message to partition {}: {}", partition, e);
            Err(e)
        }
    }
}

Consumer

Creating a Consumer

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};

fn create_consumer(brokers: &str, group_id: &str, topics: &[&str]) -> StreamConsumer {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");

    consumer
        .subscribe(topics)
        .expect("Can't subscribe to specified topics");

    consumer
}

Basic Consumer (Synchronous)

use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
use rdkafka::message::Message;
use std::time::Duration;

fn consume_messages_sync(consumer: &BaseConsumer, timeout_ms: u64) {
    loop {
        match consumer.poll(Duration::from_millis(timeout_ms)) {
            Some(Ok(msg)) => match msg.payload() {
                Some(payload) => {
                    println!("Received message: {:?}", payload);
                    // Commit the offset manually.
                    consumer
                        .commit_message(&msg, CommitMode::Sync)
                        .expect("Failed to commit message");
                }
                None => {
                    println!("Received empty message");
                }
            },
            Some(Err(e)) => {
                eprintln!("Error while consuming: {}", e);
            }
            None => {
                // Timed out, no message available.
            }
        }
    }
}

Stream Consumer (Asynchronous)

use futures::StreamExt;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Message;

async fn consume_messages_stream(consumer: &StreamConsumer) {
    let mut message_stream = consumer.stream();

    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    println!("Received message: {:?}", payload);
                    // Commit the offset manually.
                    consumer
                        .commit_message(&msg, CommitMode::Async)
                        .expect("Failed to commit message");
                }
                None => {
                    println!("Received empty message");
                }
            },
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
}

Consuming from a Specific Partition and Offset

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;
use rdkafka::{Offset, TopicPartitionList};
use std::time::Duration;

fn consume_from_offset(consumer: &BaseConsumer, topic: &str, partition: i32, offset: i64) {
    // Assign the partition explicitly, starting at the given offset.
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(topic, partition, Offset::Offset(offset))
        .expect("Failed to add partition offset");
    consumer.assign(&tpl).expect("Failed to assign partition");

    loop {
        match consumer.poll(Duration::from_millis(100)) {
            Some(Ok(msg)) => match msg.payload() {
                Some(payload) => {
                    println!(
                        "Received message from partition {} at offset {}: {:?}",
                        partition,
                        msg.offset(),
                        payload
                    );
                }
                None => {
                    println!("Received empty message");
                }
            },
            Some(Err(e)) => {
                eprintln!("Error while consuming: {}", e);
            }
            None => {
                // Timed out, no message available.
            }
        }
    }
}

Getting the Consumer Position

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::Offset;

fn get_consumer_position(consumer: &BaseConsumer, topic: &str, partition: i32) -> i64 {
    // position() reports the current offsets for the consumer's assignment.
    let position = consumer
        .position()
        .expect("Failed to get consumer position");

    for elem in position.elements() {
        if elem.topic() == topic && elem.partition() == partition {
            if let Offset::Offset(offset) = elem.offset() {
                return offset;
            }
        }
    }
    -1
}

Getting Watermark Offsets

use rdkafka::consumer::{BaseConsumer, Consumer};
use std::time::Duration;

fn get_watermark_offsets(consumer: &BaseConsumer, topic: &str, partition: i32) -> (i64, i64) {
    // fetch_watermarks returns the (low, high) watermark offsets of the partition.
    consumer
        .fetch_watermarks(topic, partition, Duration::from_secs(1))
        .expect("Failed to get watermark offsets")
}

Consumer Group Management

use rdkafka::consumer::{Consumer, ConsumerGroupMetadata, StreamConsumer};

fn get_consumer_group_metadata(consumer: &StreamConsumer) -> ConsumerGroupMetadata {
    // group_metadata() returns None until the consumer has an active group session.
    consumer
        .group_metadata()
        .expect("Failed to get consumer group metadata")
}

Admin

Creating an Admin Client

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

fn create_admin_client(brokers: &str) -> AdminClient<DefaultClientContext> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Admin client creation failed")
}

Creating a Topic

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::time::Duration;

async fn create_topic(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    num_partitions: i32,
    replication_factor: i32,
) -> Result<(), KafkaError> {
    let new_topic = NewTopic::new(
        topic_name,
        num_partitions,
        TopicReplication::Fixed(replication_factor),
    );
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));

    // create_topics returns one result per topic; only the overall call is checked here.
    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => {
            println!("Topic '{}' created successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Configuration Parameters When Creating a Topic

When creating a Kafka topic you can set many configuration parameters beyond the basic partition count and replication factor. These parameters are applied through the set method of NewTopic.

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::collections::HashMap;
use std::time::Duration;

async fn create_topic_with_configs(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    num_partitions: i32,
    replication_factor: i32,
    configs: &HashMap<String, String>,
) -> Result<(), KafkaError> {
    let mut new_topic = NewTopic::new(
        topic_name,
        num_partitions,
        TopicReplication::Fixed(replication_factor),
    );

    // Apply the topic-level configuration parameters.
    for (key, value) in configs {
        new_topic = new_topic.set(key, value);
    }

    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));

    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => {
            println!("Topic '{}' created successfully with custom configs", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Topic Configuration Parameters in Detail

The most commonly used parameters that can be set when creating a Kafka topic are listed below.

1. Log cleanup policy parameters

  • cleanup.policy (default: delete) — log cleanup policy that decides how old data is handled. Values: delete (delete by time or size), compact (log compaction), compact,delete (compaction and deletion combined).
  • delete.retention.ms (default: 86400000, i.e. 24 hours) — how long delete tombstones are retained on compacted topics. Value: positive integer (milliseconds).
  • min.compaction.lag.ms (default: 0) — minimum time a message stays uncompacted (only relevant when cleanup.policy includes compact). Value: positive integer (milliseconds).
  • max.compaction.lag.ms (default: 9223372036854775807) — maximum time a message may remain ineligible for compaction (only relevant when cleanup.policy includes compact). Value: positive integer (milliseconds).
  • min.cleanable.dirty.ratio (default: 0.5) — ratio of uncleaned ("dirty") log that triggers cleaning. Value: float between 0 and 1.

2. Log retention parameters

  • retention.ms (default: 604800000, i.e. 7 days) — how long messages are kept before becoming eligible for deletion. Value: positive integer (milliseconds); -1 means never delete.
  • retention.bytes (default: -1) — maximum size a partition's log may grow to before old segments are deleted. Value: positive integer (bytes); -1 means unlimited.
  • retention.check.interval.ms (default: 300000, i.e. 5 minutes) — how often the retention policy is checked. Value: positive integer (milliseconds).
  • segment.ms (default: 604800000, i.e. 7 days) — time after which a new log segment is rolled even if the current one is not full. Value: positive integer (milliseconds).
  • segment.bytes (default: 1073741824, i.e. 1 GB) — maximum size of a single log segment. Value: positive integer (bytes).
  • segment.index.bytes (default: 10485760, i.e. 10 MB) — maximum size of a segment's index file. Value: positive integer (bytes).
  • segment.jitter.ms (default: 0) — maximum random jitter added to the segment roll time, to avoid rolling many segments at once. Value: positive integer (milliseconds).

3. Message size and timestamp parameters

  • max.message.bytes (default: 1048588, about 1 MB) — maximum size of a single message (batch). Value: positive integer (bytes).
  • message.timestamp.type (default: CreateTime) — message timestamp type. Values: CreateTime (producer-assigned creation time), LogAppendTime (broker append time).
  • message.timestamp.difference.max.ms (default: 9223372036854775807) — maximum allowed difference between the message timestamp and the broker's timestamp. Value: positive integer (milliseconds).
  • message.downconversion.enable (default: true) — whether message-format down-conversion for older consumers is enabled. Values: true, false.

4. Log flush and sync parameters

  • flush.messages (default: 9223372036854775807) — number of messages after which the log is forced to disk. Value: positive integer; -1 disables it.
  • flush.ms (default: 9223372036854775807) — time interval after which the log is forced to disk. Value: positive integer (milliseconds); -1 disables it.
  • unclean.leader.election.enable (default: false) — whether an out-of-sync replica may be elected leader at the risk of data loss. Values: true, false.

5. Index and cache parameters

  • index.interval.bytes (default: 4096, i.e. 4 KB) — number of bytes written between index entries. Value: positive integer (bytes).
  • preallocate (default: false) — whether log segment files are preallocated on disk. Values: true, false.

6. Compression parameters

  • compression.type (default: producer) — message compression type. Values: producer (keep whatever the producer used), none, gzip, snappy, lz4, zstd.

7. Replica and leader parameters

  • min.insync.replicas (default: 1) — minimum number of in-sync replicas that must acknowledge a write when the producer uses acks=all. Value: positive integer (1 ≤ value ≤ replication factor).
  • leader.replication.throttled.replicas (default: empty) — list of replicas whose replication is throttled on the leader side. Value: a list of partition:broker entries, for example 0:1,1:2.

8. Remote log storage parameters

  • remote.log.storage.enable (default: false) — whether remote (tiered) log storage is enabled. Values: true, false.
  • local.log.retention.ms (default: -2) — local log retention time; -2 means fall back to retention.ms. Value: positive integer (milliseconds) or -2.
  • local.log.retention.bytes (default: -2) — local log retention size; -2 means fall back to retention.bytes. Value: positive integer (bytes) or -2.

Example: Creating a Topic with Custom Configuration

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::collections::HashMap;
use std::time::Duration;

async fn create_topic_with_example_configs(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(), KafkaError> {
    let mut configs = HashMap::new();
    // Use both deletion and compaction as the cleanup policy.
    configs.insert("cleanup.policy".to_string(), "delete,compact".to_string());
    // Keep messages for 3 days.
    configs.insert("retention.ms".to_string(), "259200000".to_string());
    // Limit a single log segment to 512 MB.
    configs.insert("segment.bytes".to_string(), "536870912".to_string());
    // Allow messages up to 5 MB.
    configs.insert("max.message.bytes".to_string(), "5242880".to_string());
    // Use the broker append time as the message timestamp.
    configs.insert("message.timestamp.type".to_string(), "LogAppendTime".to_string());
    // Compress messages with lz4.
    configs.insert("compression.type".to_string(), "lz4".to_string());
    // Require at least 2 in-sync replicas for acks=all writes.
    configs.insert("min.insync.replicas".to_string(), "2".to_string());

    let mut new_topic = NewTopic::new(topic_name, 6, TopicReplication::Fixed(3));

    // Apply the configuration.
    for (key, value) in &configs {
        new_topic = new_topic.set(key, value);
    }

    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(10)));

    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => {
            println!("Topic '{}' created successfully with custom configs", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Deleting a Topic

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::time::Duration;

async fn delete_topic(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(), KafkaError> {
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));

    match admin_client.delete_topics(&[topic_name], &options).await {
        Ok(_) => {
            println!("Topic '{}' deleted successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to delete topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Listing All Topics

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::time::Duration;

fn list_topics(
    admin_client: &AdminClient<DefaultClientContext>,
) -> Result<Vec<String>, KafkaError> {
    // Topic names come from the cluster metadata exposed by the underlying client.
    let metadata = admin_client
        .inner()
        .fetch_metadata(None, Duration::from_secs(5))?;

    let topics = metadata
        .topics()
        .iter()
        .map(|topic| topic.name().to_string())
        .collect();

    Ok(topics)
}

Getting Topic Details

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use rdkafka::types::RDKafkaErrorCode;
use std::time::Duration;

fn get_topic_details(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(i32, i32), KafkaError> {
    let metadata = admin_client
        .inner()
        .fetch_metadata(Some(topic_name), Duration::from_secs(5))?;

    for topic in metadata.topics() {
        if topic.name() == topic_name {
            let num_partitions = topic.partitions().len() as i32;
            let replication_factor = topic
                .partitions()
                .first()
                .map(|p| p.replicas().len() as i32)
                .unwrap_or(0);
            return Ok((num_partitions, replication_factor));
        }
    }

    Err(KafkaError::MetadataFetch(RDKafkaErrorCode::UnknownTopicOrPartition))
}

Altering Topic Configuration

use rdkafka::admin::{AdminClient, AdminOptions, AlterConfig, ResourceSpecifier};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::time::Duration;

async fn alter_topic_config(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    config_updates: Vec<(String, String)>,
) -> Result<(), KafkaError> {
    // Configuration resources are addressed with a ResourceSpecifier.
    let mut alter_config = AlterConfig::new(ResourceSpecifier::Topic(topic_name));
    for (key, value) in &config_updates {
        alter_config = alter_config.set(key, value);
    }

    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));

    match admin_client.alter_configs(&[alter_config], &options).await {
        Ok(_) => {
            println!("Topic '{}' config updated successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to update topic '{}' config: {}", topic_name, e);
            Err(e)
        }
    }
}

Creating Partitions

use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::time::Duration;

async fn create_partitions(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    new_total_count: usize,
) -> Result<(), KafkaError> {
    // new_total_count is the desired total number of partitions, not the number to add.
    let new_partitions = NewPartitions::new(topic_name, new_total_count);
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));

    match admin_client.create_partitions(&[new_partitions], &options).await {
        Ok(_) => {
            println!("Partitions for topic '{}' created successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create partitions for topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}

Listing Consumer Groups

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::time::Duration;

// rdkafka's AdminClient has no dedicated "list groups" call; group information
// is available through the group-list metadata of the underlying client.
fn list_consumer_groups(
    admin_client: &AdminClient<DefaultClientContext>,
) -> Result<Vec<String>, KafkaError> {
    let group_list = admin_client
        .inner()
        .fetch_group_list(None, Duration::from_secs(5))?;

    let groups = group_list
        .groups()
        .iter()
        .map(|group| group.name().to_string())
        .collect();

    Ok(groups)
}

Describing a Consumer Group

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::time::Duration;

fn describe_consumer_group(
    admin_client: &AdminClient<DefaultClientContext>,
    group_id: &str,
) -> Result<(), KafkaError> {
    // Fetch group metadata for a single group by name.
    let group_list = admin_client
        .inner()
        .fetch_group_list(Some(group_id), Duration::from_secs(5))?;

    for group in group_list.groups() {
        println!("Group ID: {}", group.name());
        println!("State: {}", group.state());
        println!("Protocol: {} ({})", group.protocol(), group.protocol_type());

        for member in group.members() {
            println!("Member ID: {}", member.id());
            println!("Client ID: {}", member.client_id());
            println!("Host: {}", member.client_host());
            // member.assignment() exposes the raw assignment bytes, if any.
        }
    }

    Ok(())
}

Deleting a Consumer Group

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::KafkaError;
use std::time::Duration;

async fn delete_consumer_group(
    admin_client: &AdminClient<DefaultClientContext>,
    group_id: &str,
) -> Result<(), KafkaError> {
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));

    // delete_groups removes the committed offsets and metadata of the given groups.
    match admin_client.delete_groups(&[group_id], &options).await {
        Ok(_) => {
            println!("Consumer group '{}' deleted successfully", group_id);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to delete consumer group '{}': {}", group_id, e);
            Err(e)
        }
    }
}

Advanced Features

Transactions

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use std::time::Duration;

// rdkafka has no separate transactional producer type: a regular producer becomes
// transactional once "transactional.id" is configured, and the transaction calls
// are provided by the Producer trait.
fn create_transactional_producer(brokers: &str, transactional_id: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", transactional_id)
        .set("enable.idempotence", "true")
        .create()
        .expect("Transactional producer creation failed")
}

async fn send_messages_in_transaction(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(Option<&str>, Option<&[u8]>)>,
) -> Result<(), KafkaError> {
    // Initialize transactions (required once per producer).
    producer.init_transactions(Duration::from_secs(5))?;

    // Begin the transaction.
    producer.begin_transaction()?;

    // Send the messages inside the transaction.
    for (key, payload) in messages {
        let mut record = FutureRecord::<str, [u8]>::to(topic);
        if let Some(k) = key {
            record = record.key(k);
        }
        if let Some(p) = payload {
            record = record.payload(p);
        }
        producer
            .send(record, Duration::from_secs(0))
            .await
            .map_err(|(e, _)| e)?;
    }

    // Commit the transaction.
    producer.commit_transaction(Duration::from_secs(5))?;

    Ok(())
}

Exactly-Once Semantics

use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::{Offset, TopicPartitionList};
use std::time::Duration;

async fn exactly_once_processing(
    input_topic: &str,
    output_topic: &str,
    brokers: &str,
    group_id: &str,
    transactional_id: &str,
) -> Result<(), KafkaError> {
    // Transactional producer.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", transactional_id)
        .set("enable.idempotence", "true")
        .create()
        .expect("Transactional producer creation failed");

    // Consumer that only reads committed messages.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.auto.commit", "false")
        .set("isolation.level", "read_committed")
        .create()
        .expect("Consumer creation failed");

    consumer
        .subscribe(&[input_topic])
        .expect("Can't subscribe to input topic");

    // Initialize transactions once.
    producer.init_transactions(Duration::from_secs(5))?;

    let mut message_stream = consumer.stream();

    while let Some(result) = message_stream.next().await {
        match result {
            Ok(input_msg) => {
                // One transaction per processed message.
                producer.begin_transaction()?;

                // Process the message.
                let processed_payload = process_message(input_msg.payload().unwrap_or(&[]));

                // Send the processed message, keeping the original key.
                let mut record =
                    FutureRecord::to(output_topic).payload(processed_payload.as_slice());
                if let Some(key) = input_msg.key() {
                    record = record.key(key);
                }
                producer
                    .send(record, Duration::from_secs(0))
                    .await
                    .map_err(|(e, _)| e)?;

                // Add the consumed offset to the transaction.
                let mut offsets = TopicPartitionList::new();
                offsets.add_partition_offset(
                    input_msg.topic(),
                    input_msg.partition(),
                    Offset::Offset(input_msg.offset() + 1),
                )?;
                producer.send_offsets_to_transaction(
                    &offsets,
                    &consumer.group_metadata().expect("no consumer group metadata"),
                    Duration::from_secs(5),
                )?;

                // Commit the transaction.
                producer.commit_transaction(Duration::from_secs(5))?;
            }
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }

    Ok(())
}

fn process_message(input: &[u8]) -> Vec<u8> {
    // Implement the processing logic here.
    // Example: convert the message to upper case.
    input.to_ascii_uppercase()
}

Custom Partitioner

use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

// librdkafka performs partition selection itself and can be switched between its
// built-in partitioners with the "partitioner" configuration property (for example
// "murmur2_random"). A simple way to apply fully custom partitioning logic from
// Rust is to compute the partition yourself and set it explicitly on each record,
// as sketched below.
fn pick_partition(key: Option<&[u8]>, partition_count: i32) -> i32 {
    match key {
        // With a key: hash the key bytes.
        Some(key) => {
            let hash = key
                .iter()
                .fold(0usize, |acc, &x| acc.wrapping_add(x as usize));
            (hash % partition_count as usize) as i32
        }
        // Without a key: fall back to a simple time-based round robin.
        None => {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            (now % partition_count as u64) as i32
        }
    }
}

async fn send_with_custom_partitioning(
    producer: &FutureProducer,
    topic: &str,
    partition_count: i32,
    key: &[u8],
    payload: &[u8],
) -> Result<(), KafkaError> {
    let partition = pick_partition(Some(key), partition_count);
    let record = FutureRecord::to(topic)
        .partition(partition)
        .key(key)
        .payload(payload);

    producer
        .send(record, Duration::from_secs(0))
        .await
        .map(|_| ())
        .map_err(|(e, _)| e)
}

Custom Serializer

use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::time::Duration;

#[derive(Serialize, Deserialize, Debug)]
struct MyData {
    id: u32,
    name: String,
    timestamp: i64,
}

trait MessageSerializer<T> {
    fn serialize(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>;
}

trait MessageDeserializer<T> {
    fn deserialize(&self, bytes: &[u8]) -> Result<T, Box<dyn Error>>;
}

struct JsonSerializer;

impl MessageSerializer<MyData> for JsonSerializer {
    fn serialize(&self, data: &MyData) -> Result<Vec<u8>, Box<dyn Error>> {
        Ok(serde_json::to_vec(data)?)
    }
}

struct JsonDeserializer;

impl MessageDeserializer<MyData> for JsonDeserializer {
    fn deserialize(&self, bytes: &[u8]) -> Result<MyData, Box<dyn Error>> {
        Ok(serde_json::from_slice(bytes)?)
    }
}

async fn send_serialized_message(
    producer: &FutureProducer,
    topic: &str,
    key: Option<&str>,
    data: &MyData,
    serializer: &impl MessageSerializer<MyData>,
) -> Result<(), Box<dyn Error>> {
    let payload = serializer.serialize(data)?;

    let mut record = FutureRecord::<str, Vec<u8>>::to(topic).payload(&payload);
    if let Some(k) = key {
        record = record.key(k);
    }

    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Serialized message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send serialized message: {}", e);
            Err(Box::new(e))
        }
    }
}

Message Interceptors

use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::message::Message;
use rdkafka::producer::{DeliveryResult, ProducerContext, ThreadedProducer};
use rdkafka::TopicPartitionList;

// Logs every delivery report. Delivery callbacks fire for producers that poll
// themselves (BaseProducer / ThreadedProducer); the FutureProducer consumes
// delivery reports internally to resolve its futures.
struct LoggingProducerContext;

impl ClientContext for LoggingProducerContext {}

impl ProducerContext for LoggingProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, delivery_result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) {
        match delivery_result {
            Ok(message) => {
                println!(
                    "Message delivered to topic {}, partition {} at offset {}",
                    message.topic(),
                    message.partition(),
                    message.offset()
                );
            }
            Err((e, message)) => {
                eprintln!(
                    "Failed to deliver message to topic {}, partition {}: {}",
                    message.topic(),
                    message.partition(),
                    e
                );
            }
        }
    }
}

// Logs rebalances and offset commits. ConsumerContext has no per-message hook;
// per-message logging belongs in the consume loop itself.
struct LoggingConsumerContext;

impl ClientContext for LoggingConsumerContext {}

impl ConsumerContext for LoggingConsumerContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        println!("Pre-rebalance: {:?}", rebalance);
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        println!("Post-rebalance: {:?}", rebalance);
    }

    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
        println!("Offsets committed: {:?} ({:?})", offsets, result);
    }
}

fn create_producer_with_interceptor(brokers: &str) -> ThreadedProducer<LoggingProducerContext> {
    // ThreadedProducer polls in a background thread, so delivery callbacks fire
    // without manual poll() calls.
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create_with_context(LoggingProducerContext)
        .expect("Producer creation error")
}

fn create_consumer_with_interceptor(
    brokers: &str,
    group_id: &str,
    topics: &[&str],
) -> StreamConsumer<LoggingConsumerContext> {
    let consumer: StreamConsumer<LoggingConsumerContext> = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create_with_context(LoggingConsumerContext)
        .expect("Consumer creation failed");

    consumer
        .subscribe(topics)
        .expect("Can't subscribe to specified topics");

    consumer
}

Message Filtering

use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;

async fn filter_messages(
    consumer: &StreamConsumer,
    filter_fn: impl Fn(&[u8]) -> bool + Send + Sync + 'static,
) {
    let mut message_stream = consumer.stream();

    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    if filter_fn(payload) {
                        println!("Filtered message: {:?}", payload);
                        // Handle the messages that match the filter here.
                    }
                }
                None => {
                    println!("Received empty message");
                }
            },
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
}

// Example filter function.
fn example_filter(payload: &[u8]) -> bool {
    // Only process messages containing a specific string.
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    payload_str.contains("important")
}

Message Transformation

use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn transform_messages(
    input_consumer: &StreamConsumer,
    output_producer: &FutureProducer,
    output_topic: &str,
    transform_fn: impl Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
) {
    let mut message_stream = input_consumer.stream();

    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    // Transform the message.
                    let transformed_payload = transform_fn(payload);

                    // Send the transformed message, keeping the original key.
                    let mut record =
                        FutureRecord::to(output_topic).payload(transformed_payload.as_slice());
                    if let Some(key) = msg.key() {
                        record = record.key(key);
                    }

                    match output_producer.send(record, Duration::from_secs(0)).await {
                        Ok(_) => {
                            println!("Transformed message sent successfully");
                        }
                        Err((e, _)) => {
                            eprintln!("Failed to send transformed message: {}", e);
                        }
                    }
                }
                None => {
                    println!("Received empty message");
                }
            },
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
}

// Example transformation function.
fn example_transform(payload: &[u8]) -> Vec<u8> {
    // Convert the message to upper case.
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    payload_str.to_uppercase().into_bytes()
}

Error Handling

Basic Error Handling

use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use std::time::Duration;

async fn send_message_with_error_handling(
    producer: &FutureProducer,
    topic: &str,
    key: Option<&str>,
    payload: Option<&[u8]>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut record = FutureRecord::<str, [u8]>::to(topic);
    if let Some(k) = key {
        record = record.key(k);
    }
    if let Some(p) = payload {
        record = record.payload(p);
    }

    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            // Production errors carry an RDKafkaErrorCode describing the cause.
            match &e {
                KafkaError::MessageProduction(RDKafkaErrorCode::BrokerTransportFailure) => {
                    eprintln!("Broker transport failure: {}", e);
                    // Reconnect or fall back to another broker.
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                    eprintln!("Producer queue is full: {}", e);
                    // Wait a while and retry.
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge) => {
                    eprintln!("Message size too large: {}", e);
                    // Compress the message or reduce its size.
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::UnknownTopicOrPartition) => {
                    eprintln!("Unknown topic or partition: {}", e);
                    // Create the topic or check the topic name.
                }
                _ => {
                    eprintln!("Failed to send message: {}", e);
                }
            }
            Err(Box::new(e))
        }
    }
}

Retry Mechanism

use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
use tokio::time::sleep;

async fn send_message_with_retry(
    producer: &FutureProducer,
    topic: &str,
    key: Option<&str>,
    payload: Option<&[u8]>,
    max_retries: u32,
    retry_delay: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut retries = 0;

    loop {
        // The record is consumed by send, so it is rebuilt on every attempt.
        let mut record = FutureRecord::<str, [u8]>::to(topic);
        if let Some(k) = key {
            record = record.key(k);
        }
        if let Some(p) = payload {
            record = record.payload(p);
        }

        match producer.send(record, Duration::from_secs(0)).await {
            Ok(_) => {
                println!("Message sent successfully");
                return Ok(());
            }
            Err((e, _)) => {
                retries += 1;
                if retries >= max_retries {
                    eprintln!("Failed to send message after {} retries: {}", max_retries, e);
                    return Err(Box::new(e));
                }
                eprintln!(
                    "Failed to send message (attempt {}/{}): {}, retrying in {:?}...",
                    retries, max_retries, e, retry_delay
                );
                sleep(retry_delay).await;
            }
        }
    }
}

Dead Letter Queue Handling

use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn dead_letter_queue_handler(
    main_consumer: &StreamConsumer,
    dlq_producer: &FutureProducer,
    dlq_topic: &str,
    error_handler: impl Fn(&[u8]) -> bool + Send + Sync + 'static,
) {
    let mut message_stream = main_consumer.stream();

    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    // Try to process the message.
                    if error_handler(payload) {
                        // Processing succeeded.
                        println!("Message processed successfully");
                    } else {
                        // Processing failed: forward the message to the dead letter queue.
                        let mut record = FutureRecord::to(dlq_topic).payload(payload);
                        if let Some(key) = msg.key() {
                            record = record.key(key);
                        }

                        match dlq_producer.send(record, Duration::from_secs(0)).await {
                            Ok(_) => {
                                println!("Failed message sent to dead letter queue");
                            }
                            Err((e, _)) => {
                                eprintln!("Failed to send message to dead letter queue: {}", e);
                            }
                        }
                    }
                }
                None => {
                    println!("Received empty message");
                }
            },
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }
}

// Example handler: returns true when processing succeeds.
fn example_error_handler(payload: &[u8]) -> bool {
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    // Simulate a processing failure whenever the message contains "error".
    !payload_str.contains("error")
}

Performance Optimization

Batch Processing

use futures::future::try_join_all;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn batch_send_messages(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(Option<&str>, Option<&[u8]>)>,
    batch_size: usize,
) -> Result<(), KafkaError> {
    // Send the messages in chunks, waiting for each chunk to be delivered.
    for chunk in messages.chunks(batch_size) {
        let futures = chunk.iter().map(|(key, payload)| {
            let mut record = FutureRecord::<str, [u8]>::to(topic);
            if let Some(k) = *key {
                record = record.key(k);
            }
            if let Some(p) = *payload {
                record = record.payload(p);
            }
            producer.send(record, Duration::from_secs(0))
        });

        match try_join_all(futures).await {
            Ok(_) => {
                println!("Batch of {} messages sent successfully", chunk.len());
            }
            Err((e, _)) => {
                eprintln!("Failed to send batch of messages: {}", e);
                return Err(e);
            }
        }
    }

    Ok(())
}

Concurrent Asynchronous Processing

use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use std::sync::Arc;
use tokio::task;

async fn concurrent_message_processing(
    consumer: &StreamConsumer,
    concurrency_level: usize,
    process_fn: impl Fn(&[u8]) + Send + Sync + 'static,
) {
    // Share the processing function between spawned tasks.
    let process_fn = Arc::new(process_fn);
    let mut message_stream = consumer.stream();
    let mut tasks = Vec::with_capacity(concurrency_level);

    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    let payload = payload.to_vec();
                    let process_fn = Arc::clone(&process_fn);

                    // Crude back-pressure: when the concurrency limit is reached,
                    // wait for one in-flight task to finish.
                    if tasks.len() >= concurrency_level {
                        if let Some(task_handle) = tasks.pop() {
                            let _ = task_handle.await;
                        }
                    }

                    // Spawn a new processing task.
                    tasks.push(task::spawn(async move {
                        process_fn(&payload);
                    }));
                }
                None => {
                    println!("Received empty message");
                }
            },
            Err(e) => {
                eprintln!("Error while consuming: {}", e);
            }
        }
    }

    // Wait for the remaining tasks to finish.
    for task_handle in tasks {
        let _ = task_handle.await;
    }
}

// Example processing function.
fn example_process_fn(payload: &[u8]) {
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    println!("Processing message: {}", payload_str);
    // Simulate time-consuming work.
    std::thread::sleep(std::time::Duration::from_millis(100));
}

Connection Pool Management

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct KafkaConnectionPool {
    // FutureProducer is cheaply cloneable; StreamConsumer is not, so it is shared via Arc.
    producers: Arc<Mutex<HashMap<String, FutureProducer>>>,
    consumers: Arc<Mutex<HashMap<String, Arc<StreamConsumer>>>>,
    brokers: String,
}

impl KafkaConnectionPool {
    fn new(brokers: &str) -> Self {
        Self {
            producers: Arc::new(Mutex::new(HashMap::new())),
            consumers: Arc::new(Mutex::new(HashMap::new())),
            brokers: brokers.to_string(),
        }
    }

    fn get_producer(&self, client_id: &str) -> FutureProducer {
        let mut producers = self.producers.lock().unwrap();

        if let Some(producer) = producers.get(client_id) {
            return producer.clone();
        }

        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", self.brokers.as_str())
            .set("client.id", client_id)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Producer creation error");

        producers.insert(client_id.to_string(), producer.clone());
        producer
    }

    fn get_consumer(&self, group_id: &str, topics: &[&str]) -> Arc<StreamConsumer> {
        let key = format!("{}:{}", group_id, topics.join(","));
        let mut consumers = self.consumers.lock().unwrap();

        if let Some(consumer) = consumers.get(&key) {
            return Arc::clone(consumer);
        }

        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", self.brokers.as_str())
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .create()
            .expect("Consumer creation failed");

        consumer
            .subscribe(topics)
            .expect("Can't subscribe to specified topics");

        let consumer = Arc::new(consumer);
        consumers.insert(key, Arc::clone(&consumer));
        consumer
    }
}

Buffer Optimization

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use std::time::Duration;

// A producer cannot be reconfigured after creation, so the batching options are
// applied to a copy of the base configuration before the producer is built.
async fn optimized_send_messages(
    base_config: &ClientConfig,
    topic: &str,
    messages: Vec<(Option<&str>, Option<&[u8]>)>,
    linger_ms: u32,
    batch_size: usize,
) -> Result<(), KafkaError> {
    // Configure the producer for batched sending.
    let mut config = base_config.clone();
    config.set("linger.ms", linger_ms.to_string());
    config.set("batch.size", batch_size.to_string());

    let optimized_producer: FutureProducer = config
        .create()
        .expect("Optimized producer creation error");

    // Enqueue the messages.
    for (key, payload) in messages {
        let mut record = FutureRecord::<str, [u8]>::to(topic);
        if let Some(k) = key {
            record = record.key(k);
        }
        if let Some(p) = payload {
            record = record.payload(p);
        }

        if let Err((e, _)) = optimized_producer.send(record, Duration::from_secs(0)).await {
            eprintln!("Failed to send message: {}", e);
            return Err(e);
        }
    }

    // Flush the buffer so that every queued message is delivered.
    let _ = optimized_producer.flush(Duration::from_secs(5));

    Ok(())
}

Best Practices

Resource Management

use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;
use std::sync::Arc;

struct KafkaClient {
    producer: Arc<FutureProducer>,
    consumer: Arc<StreamConsumer>,
    admin: Arc<AdminClient<DefaultClientContext>>,
}

impl KafkaClient {
    fn new(brokers: &str, group_id: &str, topics: &[&str]) -> Self {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Producer creation error");

        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", brokers)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .create()
            .expect("Consumer creation failed");

        consumer
            .subscribe(topics)
            .expect("Can't subscribe to specified topics");

        let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .create()
            .expect("Admin client creation failed");

        Self {
            producer: Arc::new(producer),
            consumer: Arc::new(consumer),
            admin: Arc::new(admin),
        }
    }

    fn producer(&self) -> Arc<FutureProducer> {
        self.producer.clone()
    }

    fn consumer(&self) -> Arc<StreamConsumer> {
        self.consumer.clone()
    }

    fn admin(&self) -> Arc<AdminClient<DefaultClientContext>> {
        self.admin.clone()
    }
}

// Usage example
async fn use_kafka_client() {
    let brokers = "localhost:9092";
    let group_id = "my-group";
    let topics = &["my-topic"];

    let kafka_client = KafkaClient::new(brokers, group_id, topics);

    // Use the producer...
    let _producer = kafka_client.producer();
    // ... send messages with the producer

    // Use the consumer...
    let _consumer = kafka_client.consumer();
    // ... receive messages with the consumer

    // Use the admin client...
    let _admin = kafka_client.admin();
    // ... perform admin operations
}

Configuration Management

use serde::{Deserialize, Serialize};
use std::env;

#[derive(Debug, Serialize, Deserialize)]
struct KafkaConfig {
    brokers: String,
    group_id: String,
    topics: Vec<String>,
    producer_config: ProducerConfig,
    consumer_config: ConsumerConfig,
    admin_config: AdminConfig,
}

#[derive(Debug, Serialize, Deserialize)]
struct ProducerConfig {
    linger_ms: u32,
    batch_size: usize,
    message_timeout_ms: u32,
    acks: String,
    retries: u32,
}

#[derive(Debug, Serialize, Deserialize)]
struct ConsumerConfig {
    session_timeout_ms: u32,
    auto_commit: bool,
    auto_offset_reset: String,
    max_poll_interval_ms: u32,
}

#[derive(Debug, Serialize, Deserialize)]
struct AdminConfig {
    request_timeout_ms: u32,
}

impl KafkaConfig {
    fn from_env() -> Result<Self, env::VarError> {
        Ok(Self {
            brokers: env::var("KAFKA_BROKERS")?,
            group_id: env::var("KAFKA_GROUP_ID")?,
            topics: env::var("KAFKA_TOPICS")?
                .split(',')
                .map(|s| s.to_string())
                .collect(),
            producer_config: ProducerConfig {
                linger_ms: env::var("KAFKA_PRODUCER_LINGER_MS")?.parse().unwrap_or(10),
                batch_size: env::var("KAFKA_PRODUCER_BATCH_SIZE")?.parse().unwrap_or(16384),
                message_timeout_ms: env::var("KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS")?
                    .parse()
                    .unwrap_or(5000),
                acks: env::var("KAFKA_PRODUCER_ACKS")?,
                retries: env::var("KAFKA_PRODUCER_RETRIES")?.parse().unwrap_or(3),
            },
            consumer_config: ConsumerConfig {
                session_timeout_ms: env::var("KAFKA_CONSUMER_SESSION_TIMEOUT_MS")?
                    .parse()
                    .unwrap_or(10000),
                auto_commit: env::var("KAFKA_CONSUMER_AUTO_COMMIT")?.parse().unwrap_or(false),
                auto_offset_reset: env::var("KAFKA_CONSUMER_AUTO_OFFSET_RESET")?,
                max_poll_interval_ms: env::var("KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS")?
                    .parse()
                    .unwrap_or(300000),
            },
            admin_config: AdminConfig {
                request_timeout_ms: env::var("KAFKA_ADMIN_REQUEST_TIMEOUT_MS")?
                    .parse()
                    .unwrap_or(5000),
            },
        })
    }

    fn to_producer_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", self.brokers.as_str());
        config.set("linger.ms", self.producer_config.linger_ms.to_string());
        config.set("batch.size", self.producer_config.batch_size.to_string());
        config.set(
            "message.timeout.ms",
            self.producer_config.message_timeout_ms.to_string(),
        );
        config.set("acks", self.producer_config.acks.as_str());
        config.set("retries", self.producer_config.retries.to_string());
        config
    }

    fn to_consumer_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", self.brokers.as_str());
        config.set("group.id", self.group_id.as_str());
        config.set(
            "session.timeout.ms",
            self.consumer_config.session_timeout_ms.to_string(),
        );
        config.set(
            "enable.auto.commit",
            self.consumer_config.auto_commit.to_string(),
        );
        config.set(
            "auto.offset.reset",
            self.consumer_config.auto_offset_reset.as_str(),
        );
        // Note: librdkafka has no "max.poll.records"; the closest knob is
        // "max.poll.interval.ms", which bounds the time between poll calls.
        config.set(
            "max.poll.interval.ms",
            self.consumer_config.max_poll_interval_ms.to_string(),
        );
        config
    }

    fn to_admin_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", self.brokers.as_str());
        config.set(
            "request.timeout.ms",
            self.admin_config.request_timeout_ms.to_string(),
        );
        config
    }
}

// Usage example
async fn use_config() -> Result<(), Box<dyn std::error::Error>> {
    // Load the configuration from environment variables.
    let kafka_config = KafkaConfig::from_env()?;

    // Create the producer.
    let producer: rdkafka::producer::FutureProducer = kafka_config.to_producer_config().create()?;

    // Create the consumer.
    let consumer: rdkafka::consumer::StreamConsumer = kafka_config.to_consumer_config().create()?;

    // Create the admin client.
    let admin: rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext> =
        kafka_config.to_admin_config().create()?;

    // Use the clients...
    let _ = (producer, consumer, admin);
    Ok(())
}

Monitoring and Metrics

use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::{Message, ToBytes};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

struct KafkaMetrics {
    messages_produced: u64,
    messages_consumed: u64,
    bytes_produced: u64,
    bytes_consumed: u64,
    produce_errors: u64,
    consume_errors: u64,
    last_update: Instant,
}

impl KafkaMetrics {
    fn new() -> Self {
        Self {
            messages_produced: 0,
            messages_consumed: 0,
            bytes_produced: 0,
            bytes_consumed: 0,
            produce_errors: 0,
            consume_errors: 0,
            last_update: Instant::now(),
        }
    }

    fn record_message_produced(&mut self, size: usize) {
        self.messages_produced += 1;
        self.bytes_produced += size as u64;
        self.last_update = Instant::now();
    }

    fn record_message_consumed(&mut self, size: usize) {
        self.messages_consumed += 1;
        self.bytes_consumed += size as u64;
        self.last_update = Instant::now();
    }

    fn record_produce_error(&mut self) {
        self.produce_errors += 1;
        self.last_update = Instant::now();
    }

    fn record_consume_error(&mut self) {
        self.consume_errors += 1;
        self.last_update = Instant::now();
    }

    fn print_metrics(&self) {
        println!("Kafka Metrics:");
        println!("  Messages produced: {}", self.messages_produced);
        println!("  Messages consumed: {}", self.messages_consumed);
        println!("  Bytes produced: {}", self.bytes_produced);
        println!("  Bytes consumed: {}", self.bytes_consumed);
        println!("  Produce errors: {}", self.produce_errors);
        println!("  Consume errors: {}", self.consume_errors);
        println!("  Last update: {:?} ago", self.last_update.elapsed());
    }
}

struct MonitoredProducer {
    producer: FutureProducer,
    metrics: Arc<Mutex<KafkaMetrics>>,
}

impl MonitoredProducer {
    fn new(producer: FutureProducer, metrics: Arc<Mutex<KafkaMetrics>>) -> Self {
        Self { producer, metrics }
    }

    async fn send<K, P>(
        &self,
        record: FutureRecord<'_, K, P>,
        timeout: Duration,
    ) -> Result<(), KafkaError>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        let size = record.payload.map(|p| p.to_bytes().len()).unwrap_or(0);

        match self.producer.send(record, timeout).await {
            Ok(_) => {
                self.metrics.lock().unwrap().record_message_produced(size);
                Ok(())
            }
            Err((e, _)) => {
                self.metrics.lock().unwrap().record_produce_error();
                Err(e)
            }
        }
    }
}

struct MonitoredConsumer {
    consumer: StreamConsumer,
    metrics: Arc<Mutex<KafkaMetrics>>,
}

impl MonitoredConsumer {
    fn new(consumer: StreamConsumer, metrics: Arc<Mutex<KafkaMetrics>>) -> Self {
        Self { consumer, metrics }
    }

    async fn consume(&self) {
        let mut message_stream = self.consumer.stream();

        while let Some(result) = message_stream.next().await {
            match result {
                Ok(msg) => {
                    let size = msg.payload().map(|p| p.len()).unwrap_or(0);
                    self.metrics.lock().unwrap().record_message_consumed(size);
                    // Process the message here...
                }
                Err(e) => {
                    self.metrics.lock().unwrap().record_consume_error();
                    eprintln!("Error while consuming: {}", e);
                }
            }
        }
    }
}

// Usage example
async fn use_monitored_clients() {
    let metrics = Arc::new(Mutex::new(KafkaMetrics::new()));

    // Create the producer.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");
    let monitored_producer = MonitoredProducer::new(producer, metrics.clone());

    // Create the consumer.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "my-group")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    consumer
        .subscribe(&["my-topic"])
        .expect("Can't subscribe to specified topics");
    let monitored_consumer = MonitoredConsumer::new(consumer, metrics.clone());

    // Run the consumer in the background.
    tokio::spawn(async move {
        monitored_consumer.consume().await;
    });

    // Print the metrics periodically.
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;
            metrics.lock().unwrap().print_metrics();
        }
    });

    // Send messages through monitored_producer.send(...) here...
    let _ = monitored_producer;
}

Complete Example

use futures::StreamExt;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Kafka configuration.
    let brokers = "localhost:9092";
    let topic = "example-topic";
    let group_id = "example-group";

    // Create the admin client.
    let admin_client: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Admin client creation failed");

    // Create the topic (if it does not exist yet).
    let new_topic = NewTopic::new(topic, 3, TopicReplication::Fixed(1));
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => println!("Topic '{}' created or already exists", topic),
        Err(e) => eprintln!("Failed to create topic '{}': {}", topic, e),
    }

    // Create the producer.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("acks", "all")
        .create()
        .expect("Producer creation error");

    // Create the consumer.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    consumer
        .subscribe(&[topic])
        .expect("Can't subscribe to specified topics");

    // Run the consumer in a background task.
    let consumer_handle = tokio::spawn(async move {
        let mut message_stream = consumer.stream();
        while let Some(result) = message_stream.next().await {
            match result {
                Ok(msg) => match msg.payload() {
                    Some(payload) => {
                        let payload_str = std::str::from_utf8(payload).unwrap_or("");
                        println!("Received message: {}", payload_str);
                        // Commit the offset manually.
                        consumer
                            .commit_message(&msg, CommitMode::Async)
                            .expect("Failed to commit message");
                    }
                    None => println!("Received empty message"),
                },
                Err(e) => eprintln!("Error while consuming: {}", e),
            }
        }
    });

    // Send a few messages.
    for i in 0..10 {
        let key = format!("key-{}", i);
        let payload = format!("Message {}", i);
        let record = FutureRecord::to(topic).key(&key).payload(&payload);

        match producer.send(record, Duration::from_secs(0)).await {
            Ok(_) => println!("Message {} sent successfully", i),
            Err((e, _)) => eprintln!("Failed to send message {}: {}", i, e),
        }

        tokio::time::sleep(Duration::from_millis(500)).await;
    }

    // Give the consumer time to process everything.
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Stop the consumer task.
    consumer_handle.abort();

    println!("Example completed");
    Ok(())
}

Summary

This document has covered how to use rdkafka for the most common Kafka operations, including:

  1. Producer operations: sending messages (synchronously and asynchronously), batch sending, messages with headers, sending to a specific partition, and more.
  2. Consumer operations: basic consumption, stream-based consumption, consuming from a specific partition and offset, and reading consumer positions and watermark offsets.
  3. Admin operations: creating and deleting topics, listing topics, fetching topic details, altering topic configuration, creating partitions, and managing consumer groups.
  4. Advanced features: transactions, exactly-once semantics, custom partitioning, custom serializers, interceptors, and message filtering and transformation.
  5. Error handling: basic error handling, retry mechanisms, and dead letter queues.
  6. Performance optimization: batching, concurrent asynchronous processing, connection pooling, and buffer tuning.
  7. Best practices: resource management, configuration management, and monitoring and metrics.

By following the examples and best practices in this document, you can use rdkafka effectively to interact with a Kafka cluster from Rust and build high-performance, reliable message-processing systems.

References

  • rdkafka GitHub repository
  • Apache Kafka official documentation
  • librdkafka official documentation
  • Asynchronous Programming in Rust (book)

License

This document is released under the MIT License.

