The rdkafka Handbook: From Bronze to King Tier in the Art of Async Message Slaying

Introduction

rdkafka is an Apache Kafka client library for Rust, built on top of librdkafka (a high-performance Kafka client library written in C). rdkafka provides the full set of functionality for interacting with a Kafka cluster, including producers, consumers, and admin operations.

This document walks through common Kafka operations with rdkafka and provides clear code examples for each.
Table of Contents

- Installation and Configuration
- Basic Concepts
- Producer
- Consumer
- Admin
- Advanced Features
- Error Handling
- Performance Optimization
- Best Practices
Installation and Configuration

Adding the dependency

Add rdkafka to your Cargo.toml:

```toml
[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build", "ssl", "sasl"] }
tokio = { version = "1.0", features = ["full"] }
```
System dependencies

On Windows, the following dependencies are required:

- GNU toolchain: install MinGW via MSYS2
  - Download and install MSYS2: https://www.msys2.org/
  - Update the package database: `pacman -Syu`
  - Install the toolchain: `pacman -S --needed base-devel mingw-w64-x86_64-toolchain`
- CMake: download and install from https://cmake.org/download/
- Other optional dependencies:
  - zlib: compression support
  - libssl-dev: SSL support
  - libsasl2-dev: SASL support
  - libzstd-dev: Zstandard compression support

On Linux, install them with your package manager:

```bash
# Ubuntu/Debian
sudo apt-get install build-essential cmake libssl-dev libsasl2-dev libzstd-dev

# CentOS/RHEL
sudo yum install gcc-c++ cmake openssl-devel cyrus-sasl-devel libzstd-devel
```
Basic Concepts

Kafka architecture

- Producer: a client that publishes messages to Kafka brokers
- Consumer: a client that reads messages from Kafka brokers
- Consumer Group: a group of consumers that cooperate to consume one or more topics
- Broker: a single Kafka server
- Topic: a category of messages, conceptually similar to a queue
- Partition: a topic is split into multiple partitions for scalability
- Replica: each partition of a topic has a number of replicas
- Leader: the "primary" replica of a partition; producers send data to the leader
- Follower: a "secondary" replica of a partition that continuously replicates data from the leader
rdkafka components

rdkafka provides the following main components:

- FutureProducer: asynchronous, future-based producer
- BaseProducer: low-level producer that requires manual polling
- StreamConsumer: stream-based asynchronous consumer
- BaseConsumer: low-level, poll-based consumer
- AdminClient: admin client for managing a Kafka cluster
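All of these clients are built through the same ClientConfig builder: set librdkafka properties as key/value strings, then call create() (or create_with_context()) with the desired client type. A minimal sketch, assuming a broker at localhost:9092 (the topic and group names are placeholders):

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;

fn build_clients() {
    // The target client type is chosen by the type annotation on create().
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("producer creation failed");

    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example-group")
        .create()
        .expect("consumer creation failed");

    consumer.subscribe(&["example-topic"]).expect("subscribe failed");
    let _ = producer; // used in the sections below
}
```

The same pattern is used throughout the rest of this document; only the configuration keys differ between producers, consumers, and admin clients.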
Producer

Creating a producer

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn create_producer(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("acks", "all")
        .create()
        .expect("Producer creation error")
}
```
Sending messages (asynchronously)

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_async(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
) -> Result<(), rdkafka::error::KafkaError> {
    let record = FutureRecord::to(topic).key(key).payload(payload);
    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message: {}", e);
            Err(e)
        }
    }
}
```
Sending messages (synchronously)

```rust
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

fn send_message_sync(
    producer: &BaseProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
) -> Result<(), rdkafka::error::KafkaError> {
    let record = BaseRecord::to(topic).key(key).payload(payload);
    // BaseProducer::send only enqueues the message; failures are reported as (error, record).
    producer.send(record).map_err(|(e, _)| e)?;
    // Make sure all queued messages are actually delivered.
    let _ = producer.flush(Duration::from_secs(1));
    Ok(())
}
```
Sending messages in batches

```rust
use futures::future::try_join_all;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_messages_batch(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(&str, &[u8])>,
) -> Result<(), rdkafka::error::KafkaError> {
    let futures = messages.into_iter().map(|(key, payload)| {
        let record = FutureRecord::to(topic).key(key).payload(payload);
        producer.send(record, Duration::from_secs(0))
    });
    match try_join_all(futures).await {
        Ok(_) => {
            println!("All messages sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send some messages: {}", e);
            Err(e)
        }
    }
}
```
Sending messages with headers

```rust
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_with_headers(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
    headers: Vec<(String, Vec<u8>)>,
) -> Result<(), rdkafka::error::KafkaError> {
    let mut owned_headers = OwnedHeaders::new();
    for (header_key, header_value) in &headers {
        owned_headers = owned_headers.insert(Header {
            key: header_key.as_str(),
            value: Some(header_value),
        });
    }
    let record = FutureRecord::to(topic)
        .key(key)
        .payload(payload)
        .headers(owned_headers);
    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message with headers sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message with headers: {}", e);
            Err(e)
        }
    }
}
```
Sending to a specific partition

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_to_partition(
    producer: &FutureProducer,
    topic: &str,
    partition: i32,
    key: &str,
    payload: &[u8],
) -> Result<(), rdkafka::error::KafkaError> {
    let record = FutureRecord::to(topic)
        .partition(partition)
        .key(key)
        .payload(payload);
    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message sent to partition {} successfully", partition);
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send message to partition {}: {}", partition, e);
            Err(e)
        }
    }
}
```
Consumer

Creating a consumer

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};

fn create_consumer(brokers: &str, group_id: &str, topics: &[&str]) -> StreamConsumer {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    consumer
        .subscribe(topics)
        .expect("Can't subscribe to specified topics");
    consumer
}
```
Base consumer (synchronous)

```rust
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
use rdkafka::message::Message;
use std::time::Duration;

fn consume_messages_sync(consumer: &BaseConsumer, timeout_ms: u64) {
    loop {
        match consumer.poll(Duration::from_millis(timeout_ms)) {
            Some(Ok(msg)) => match msg.payload() {
                Some(payload) => {
                    println!("Received message: {:?}", payload);
                    // Commit the offset manually
                    consumer
                        .commit_message(&msg, CommitMode::Sync)
                        .expect("Failed to commit message");
                }
                None => println!("Received empty message"),
            },
            Some(Err(e)) => eprintln!("Error while consuming: {}", e),
            None => {
                // Timed out, no message available
            }
        }
    }
}
```
Stream consumer (asynchronous)

```rust
use futures::StreamExt;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Message;

async fn consume_messages_stream(consumer: &StreamConsumer) {
    let mut message_stream = consumer.stream();
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    println!("Received message: {:?}", payload);
                    // Commit the offset manually
                    consumer
                        .commit_message(&msg, CommitMode::Async)
                        .expect("Failed to commit message");
                }
                None => println!("Received empty message"),
            },
            Err(e) => eprintln!("Error while consuming: {}", e),
        }
    }
}
```
Consuming from a specific partition and offset

```rust
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;
use rdkafka::{Offset, TopicPartitionList};
use std::time::Duration;

fn consume_from_offset(consumer: &BaseConsumer, topic: &str, partition: i32, offset: i64) {
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(topic, partition, Offset::Offset(offset))
        .expect("Failed to add partition offset");
    consumer.assign(&tpl).expect("Failed to assign partition");
    loop {
        match consumer.poll(Duration::from_millis(100)) {
            Some(Ok(msg)) => match msg.payload() {
                Some(payload) => println!(
                    "Received message from partition {} at offset {}: {:?}",
                    partition,
                    msg.offset(),
                    payload
                ),
                None => println!("Received empty message"),
            },
            Some(Err(e)) => eprintln!("Error while consuming: {}", e),
            None => {
                // Timed out, no message available
            }
        }
    }
}
```
Getting the consumer position

```rust
use rdkafka::consumer::{BaseConsumer, Consumer};

fn get_consumer_position(consumer: &BaseConsumer, topic: &str, partition: i32) -> i64 {
    // position() returns the current position for every assigned partition.
    let position = consumer.position().expect("Failed to get consumer position");
    for elem in position.elements() {
        if elem.topic() == topic && elem.partition() == partition {
            return elem.offset().to_raw().unwrap_or(-1);
        }
    }
    -1
}
```
Getting watermark offsets

```rust
use rdkafka::consumer::{BaseConsumer, Consumer};
use std::time::Duration;

fn get_watermark_offsets(consumer: &BaseConsumer, topic: &str, partition: i32) -> (i64, i64) {
    consumer
        .fetch_watermarks(topic, partition, Duration::from_secs(1))
        .expect("Failed to get watermark offsets")
}
```
Consumer group metadata

```rust
use rdkafka::consumer::{Consumer, ConsumerGroupMetadata, StreamConsumer};

fn get_consumer_group_metadata(consumer: &StreamConsumer) -> ConsumerGroupMetadata {
    consumer
        .group_metadata()
        .expect("Failed to get consumer group metadata")
}
```
Admin

Creating an admin client

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

fn create_admin_client(brokers: &str) -> AdminClient<DefaultClientContext> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Admin client creation failed")
}
```
Creating a topic

```rust
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn create_topic(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    num_partitions: i32,
    replication_factor: i32,
) -> Result<(), rdkafka::error::KafkaError> {
    let new_topic = NewTopic::new(
        topic_name,
        num_partitions,
        TopicReplication::Fixed(replication_factor),
    );
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => {
            println!("Topic '{}' created successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}
```
Topic parameters at creation time

When creating a Kafka topic you can set many configuration parameters beyond the basic partition count and replication factor. These parameters are applied through the set method of NewTopic.

```rust
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use std::collections::HashMap;
use std::time::Duration;

async fn create_topic_with_configs(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    num_partitions: i32,
    replication_factor: i32,
    configs: &HashMap<String, String>,
) -> Result<(), rdkafka::error::KafkaError> {
    let mut new_topic = NewTopic::new(
        topic_name,
        num_partitions,
        TopicReplication::Fixed(replication_factor),
    );
    // Apply the topic configuration parameters
    for (key, value) in configs {
        new_topic = new_topic.set(key, value);
    }
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => {
            println!("Topic '{}' created successfully with custom configs", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}
```
Topic configuration parameters in detail

The following are commonly used topic-level parameters and what they mean:

1. Log cleanup policy

| Parameter | Default | Description | Valid values |
|---|---|---|---|
| cleanup.policy | delete | Log cleanup policy; decides how old data is handled | delete (delete by time or size), compact (log compaction), compact,delete (both) |
| delete.retention.ms | 86400000 (24 hours) | How long delete tombstones are retained (relevant when cleanup.policy includes compact) | positive integer (ms) |
| min.compaction.lag.ms | 0 | Minimum time a message stays uncompacted in the log (only when cleanup.policy includes compact) | positive integer (ms) |
| max.compaction.lag.ms | 9223372036854775807 | Maximum time a message may remain ineligible for compaction (only when cleanup.policy includes compact) | positive integer (ms) |
| min.cleanable.dirty.ratio | 0.5 | Ratio of uncleaned ("dirty") log that triggers cleaning once exceeded | float between 0 and 1 |
2. Log retention

| Parameter | Default | Description | Valid values |
|---|---|---|---|
| retention.ms | 604800000 (7 days) | How long messages are retained; older messages are deleted | positive integer (ms), -1 = never delete |
| retention.bytes | -1 | Maximum size the topic may grow to before old messages are deleted | positive integer (bytes), -1 = unlimited |
| retention.check.interval.ms | 300000 (5 minutes) | How often the retention policy is checked | positive integer (ms) |
| segment.ms | 604800000 (7 days) | Time after which a new log segment is rolled | positive integer (ms) |
| segment.bytes | 1073741824 (1GB) | Maximum size of a single log segment | positive integer (bytes) |
| segment.index.bytes | 10485760 (10MB) | Maximum size of the log index file | positive integer (bytes) |
| segment.jitter.ms | 0 | Maximum random jitter added to the segment roll time, to avoid rolling many segments at once | positive integer (ms) |
3. Message size and timestamps

| Parameter | Default | Description | Valid values |
|---|---|---|---|
| max.message.bytes | 1048588 (~1MB) | Maximum size of a single message | positive integer (bytes) |
| message.timestamp.type | CreateTime | Which timestamp is stored with a message | CreateTime (when the message was created), LogAppendTime (when it was appended to the log) |
| message.timestamp.difference.max.ms | 9223372036854775807 | Maximum allowed difference between the message timestamp and the broker timestamp | positive integer (ms) |
| message.downconversion.enable | true | Whether message format down-conversion for older consumers is enabled | true, false |
4. Log flushing and sync

| Parameter | Default | Description | Valid values |
|---|---|---|---|
| flush.messages | 9223372036854775807 | Number of messages after which the log is flushed to disk | positive integer, -1 = disabled |
| flush.ms | 9223372036854775807 | Time interval after which the log is flushed to disk | positive integer (ms), -1 = disabled |
| unclean.leader.election.enable | false | Whether an out-of-sync replica may be elected leader, at the risk of data loss | true, false |
5. Index and preallocation

| Parameter | Default | Description | Valid values |
|---|---|---|---|
| index.interval.bytes | 4096 (4KB) | Number of bytes between entries in the offset index | positive integer (bytes) |
| preallocate | false | Whether log segment files are preallocated on disk | true, false |

6. Compression

| Parameter | Default | Description | Valid values |
|---|---|---|---|
| compression.type | producer | Compression codec used for the topic | producer (keep whatever codec the producer used), none, gzip, snappy, lz4, zstd |
7. Replicas and leadership

| Parameter | Default | Description | Valid values |
|---|---|---|---|
| min.insync.replicas | 1 | Minimum number of replicas that must acknowledge a write when the producer uses acks=all | positive integer (1 ≤ value ≤ replication factor) |
| leader.replication.throttled.replicas | (none) | Replicas whose replication is throttled on the leader side | list of partition:replica pairs, e.g. 0:1,1:2 |

8. Remote (tiered) log storage

| Parameter | Default | Description | Valid values |
|---|---|---|---|
| remote.log.storage.enable | false | Whether remote (tiered) log storage is enabled for the topic | true, false |
| local.log.retention.ms | -2 | Local log retention time; -2 means fall back to retention.ms | positive integer (ms), or -2 to use retention.ms |
| local.log.retention.bytes | -2 | Local log retention size; -2 means fall back to retention.bytes | positive integer (bytes), or -2 to use retention.bytes |
Example: creating a topic with custom configs

```rust
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use std::collections::HashMap;
use std::time::Duration;

async fn create_topic_with_example_configs(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let mut configs = HashMap::new();
    // Enable both deletion and compaction
    configs.insert("cleanup.policy".to_string(), "delete,compact".to_string());
    // Retain messages for 3 days
    configs.insert("retention.ms".to_string(), "259200000".to_string());
    // Limit a single log segment to 512MB
    configs.insert("segment.bytes".to_string(), "536870912".to_string());
    // Allow messages up to 5MB
    configs.insert("max.message.bytes".to_string(), "5242880".to_string());
    // Use the log append time as the message timestamp
    configs.insert("message.timestamp.type".to_string(), "LogAppendTime".to_string());
    // Compress with lz4
    configs.insert("compression.type".to_string(), "lz4".to_string());
    // Require at least 2 in-sync replicas for acks=all writes
    configs.insert("min.insync.replicas".to_string(), "2".to_string());

    let mut new_topic = NewTopic::new(topic_name, 6, TopicReplication::Fixed(3));
    for (key, value) in &configs {
        new_topic = new_topic.set(key, value);
    }
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(10)));
    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => {
            println!("Topic '{}' created successfully with custom configs", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}
```
Deleting a topic

```rust
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn delete_topic(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    match admin_client.delete_topics(&[topic_name], &options).await {
        Ok(_) => {
            println!("Topic '{}' deleted successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to delete topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}
```
Listing all topics

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

fn list_topics(
    admin_client: &AdminClient<DefaultClientContext>,
) -> Result<Vec<String>, rdkafka::error::KafkaError> {
    // Topic listing goes through the regular metadata API on the underlying client.
    let metadata = admin_client
        .inner()
        .fetch_metadata(None, Duration::from_secs(5))?;
    let topics = metadata
        .topics()
        .iter()
        .map(|topic| topic.name().to_string())
        .collect();
    Ok(topics)
}
```
Getting topic details

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use std::time::Duration;

fn get_topic_details(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
) -> Result<(i32, i32), KafkaError> {
    let metadata = admin_client
        .inner()
        .fetch_metadata(Some(topic_name), Duration::from_secs(5))?;
    for topic in metadata.topics() {
        if topic.name() == topic_name {
            let num_partitions = topic.partitions().len() as i32;
            let replication_factor = topic
                .partitions()
                .first()
                .map(|p| p.replicas().len() as i32)
                .unwrap_or(0);
            return Ok((num_partitions, replication_factor));
        }
    }
    Err(KafkaError::MetadataFetch(RDKafkaErrorCode::UnknownTopicOrPartition))
}
```
Altering topic configuration

```rust
use rdkafka::admin::{AdminClient, AdminOptions, AlterConfig, ResourceSpecifier};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn alter_topic_config(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    config_updates: Vec<(String, String)>,
) -> Result<(), rdkafka::error::KafkaError> {
    let mut alter_config = AlterConfig::new(ResourceSpecifier::Topic(topic_name));
    for (key, value) in &config_updates {
        alter_config = alter_config.set(key, value);
    }
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    match admin_client.alter_configs(&[alter_config], &options).await {
        Ok(_) => {
            println!("Topic '{}' config updated successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to update topic '{}' config: {}", topic_name, e);
            Err(e)
        }
    }
}
```
Adding partitions

```rust
use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn create_partitions(
    admin_client: &AdminClient<DefaultClientContext>,
    topic_name: &str,
    new_total_count: usize,
) -> Result<(), rdkafka::error::KafkaError> {
    // new_total_count is the desired total number of partitions, not the number to add.
    let new_partitions = NewPartitions::new(topic_name, new_total_count);
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    match admin_client.create_partitions(&[new_partitions], &options).await {
        Ok(_) => {
            println!("Partitions for topic '{}' created successfully", topic_name);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to create partitions for topic '{}': {}", topic_name, e);
            Err(e)
        }
    }
}
```
Listing consumer groups

In current rdkafka versions the group list is fetched through the underlying client rather than a dedicated admin call:

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

fn list_consumer_groups(
    admin_client: &AdminClient<DefaultClientContext>,
) -> Result<Vec<String>, rdkafka::error::KafkaError> {
    let group_list = admin_client
        .inner()
        .fetch_group_list(None, Duration::from_secs(5))?;
    Ok(group_list
        .groups()
        .iter()
        .map(|group| group.name().to_string())
        .collect())
}
```
Describing a consumer group

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

fn describe_consumer_group(
    admin_client: &AdminClient<DefaultClientContext>,
    group_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let group_list = admin_client
        .inner()
        .fetch_group_list(Some(group_id), Duration::from_secs(5))?;
    for group in group_list.groups() {
        println!("Group ID: {}", group.name());
        println!("State: {}", group.state());
        println!("Protocol: {} ({})", group.protocol(), group.protocol_type());
        for member in group.members() {
            println!("  Member ID: {}", member.id());
            println!("  Client ID: {}", member.client_id());
            println!("  Host: {}", member.client_host());
            // member.assignment() returns the raw assignment bytes of the group
            // protocol; decoding them into topic/partition pairs is protocol specific.
        }
    }
    Ok(())
}
```
Deleting a consumer group

```rust
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use std::time::Duration;

async fn delete_consumer_group(
    admin_client: &AdminClient<DefaultClientContext>,
    group_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    match admin_client.delete_groups(&[group_id], &options).await {
        Ok(_) => {
            println!("Consumer group '{}' deleted successfully", group_id);
            Ok(())
        }
        Err(e) => {
            eprintln!("Failed to delete consumer group '{}': {}", group_id, e);
            Err(e)
        }
    }
}
```
Advanced Features

Transactions

Transactions in rdkafka are driven through the Producer trait on a regular producer that has transactional.id set; there is no separate transactional producer type.

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use std::time::Duration;

fn create_transactional_producer(brokers: &str, transactional_id: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", transactional_id)
        .set("enable.idempotence", "true")
        .create()
        .expect("Transactional producer creation failed")
}

async fn send_messages_in_transaction(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(&str, &[u8])>,
) -> Result<(), rdkafka::error::KafkaError> {
    // Initialize transactions (needed once per producer instance)
    producer.init_transactions(Duration::from_secs(5))?;
    // Begin the transaction
    producer.begin_transaction()?;
    // Produce the messages inside the transaction
    for (key, payload) in messages {
        let record = FutureRecord::to(topic).key(key).payload(payload);
        if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
            // Abort on failure so the transaction does not stay open
            producer.abort_transaction(Duration::from_secs(5))?;
            return Err(e);
        }
    }
    // Commit the transaction
    producer.commit_transaction(Duration::from_secs(5))?;
    Ok(())
}
```
Exactly-Once Semantics

```rust
use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::{Offset, TopicPartitionList};
use std::time::Duration;

async fn exactly_once_processing(
    input_topic: &str,
    output_topic: &str,
    brokers: &str,
    group_id: &str,
    transactional_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
    // Transactional producer
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", transactional_id)
        .set("enable.idempotence", "true")
        .create()
        .expect("Transactional producer creation failed");

    // Consumer that only sees committed messages
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.auto.commit", "false")
        .set("isolation.level", "read_committed")
        .create()
        .expect("Consumer creation failed");
    consumer
        .subscribe(&[input_topic])
        .expect("Can't subscribe to input topic");

    // Initialize transactions once
    producer.init_transactions(Duration::from_secs(5))?;

    let mut message_stream = consumer.stream();
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(input_msg) => {
                producer.begin_transaction()?;

                // Process the message
                let processed_payload = process_message(input_msg.payload().unwrap_or(&[]));

                // Produce the processed message
                let record = FutureRecord::to(output_topic)
                    .key(input_msg.key().unwrap_or(&[]))
                    .payload(&processed_payload);
                if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
                    producer.abort_transaction(Duration::from_secs(5))?;
                    return Err(e);
                }

                // Commit the consumer offset as part of the same transaction
                let mut offsets = TopicPartitionList::new();
                offsets.add_partition_offset(
                    input_msg.topic(),
                    input_msg.partition(),
                    Offset::Offset(input_msg.offset() + 1),
                )?;
                producer.send_offsets_to_transaction(
                    &offsets,
                    &consumer.group_metadata().expect("missing group metadata"),
                    Duration::from_secs(5),
                )?;

                // Commit the transaction
                producer.commit_transaction(Duration::from_secs(5))?;
            }
            Err(e) => eprintln!("Error while consuming: {}", e),
        }
    }
    Ok(())
}

fn process_message(input: &[u8]) -> Vec<u8> {
    // Message processing logic goes here.
    // Example: simply uppercase the ASCII payload.
    input.to_ascii_uppercase()
}
```
Custom partitioning

rdkafka delegates partitioning to librdkafka. You can pick one of the built-in partitioners with the partitioner configuration property (for example murmur2_random or consistent_random), or take full control at the application level by computing a partition yourself and setting it explicitly on each record:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

fn create_producer_with_partitioner(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        // Select one of librdkafka's built-in partitioners
        .set("partitioner", "murmur2_random")
        .create()
        .expect("Producer creation error")
}

// Application-level partitioning: hash the key onto one of `partition_count` partitions.
fn pick_partition(key: Option<&[u8]>, partition_count: i32) -> i32 {
    match key {
        Some(key) => {
            let hash: usize = key.iter().fold(0usize, |acc, &b| acc.wrapping_add(b as usize));
            (hash % partition_count as usize) as i32
        }
        None => {
            // No key: spread messages over partitions based on the current time
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            (now % partition_count as u64) as i32
        }
    }
}

async fn send_with_custom_partition(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
    partition_count: i32,
) -> Result<(), rdkafka::error::KafkaError> {
    let partition = pick_partition(Some(key.as_bytes()), partition_count);
    let record = FutureRecord::to(topic)
        .partition(partition)
        .key(key)
        .payload(payload);
    producer
        .send(record, Duration::from_secs(0))
        .await
        .map(|_| ())
        .map_err(|(e, _)| e)
}
```
Custom serializers

This example additionally requires the serde and serde_json crates in Cargo.toml.

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::time::Duration;

#[derive(Serialize, Deserialize, Debug)]
struct MyData {
    id: u32,
    name: String,
    timestamp: i64,
}

trait MessageSerializer<T> {
    fn serialize(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>;
}

trait MessageDeserializer<T> {
    fn deserialize(&self, bytes: &[u8]) -> Result<T, Box<dyn Error>>;
}

struct JsonSerializer;

impl MessageSerializer<MyData> for JsonSerializer {
    fn serialize(&self, data: &MyData) -> Result<Vec<u8>, Box<dyn Error>> {
        Ok(serde_json::to_vec(data)?)
    }
}

struct JsonDeserializer;

impl MessageDeserializer<MyData> for JsonDeserializer {
    fn deserialize(&self, bytes: &[u8]) -> Result<MyData, Box<dyn Error>> {
        Ok(serde_json::from_slice(bytes)?)
    }
}

async fn send_serialized_message(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    data: &MyData,
    serializer: &impl MessageSerializer<MyData>,
) -> Result<(), Box<dyn Error>> {
    let payload = serializer.serialize(data)?;
    let record = FutureRecord::to(topic).key(key).payload(&payload);
    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Serialized message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            eprintln!("Failed to send serialized message: {}", e);
            Err(Box::new(e))
        }
    }
}
```
Message interceptors (client contexts)

Delivery and commit callbacks are implemented through custom contexts. Note that the delivery callback of a ProducerContext is only invoked for BaseProducer/ThreadedProducer; FutureProducer reports delivery results through the returned future instead.

```rust
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, ConsumerContext, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::message::Message;
use rdkafka::producer::{BaseProducer, DeliveryResult, ProducerContext};
use rdkafka::TopicPartitionList;

struct LoggingProducerContext;

impl ClientContext for LoggingProducerContext {}

impl ProducerContext for LoggingProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, delivery_result: &DeliveryResult<'_>, _opaque: Self::DeliveryOpaque) {
        match delivery_result {
            Ok(message) => println!(
                "Message delivered to topic {}, partition {} at offset {}",
                message.topic(),
                message.partition(),
                message.offset()
            ),
            Err((e, message)) => eprintln!(
                "Failed to deliver message to topic {}, partition {}: {}",
                message.topic(),
                message.partition(),
                e
            ),
        }
    }
}

struct LoggingConsumerContext;

impl ClientContext for LoggingConsumerContext {}

impl ConsumerContext for LoggingConsumerContext {
    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
        println!("Offsets committed ({:?}): {:?}", result, offsets);
    }
}

fn create_producer_with_interceptor(brokers: &str) -> BaseProducer<LoggingProducerContext> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create_with_context(LoggingProducerContext)
        .expect("Producer creation error")
}

fn create_consumer_with_interceptor(
    brokers: &str,
    group_id: &str,
    topics: &[&str],
) -> StreamConsumer<LoggingConsumerContext> {
    let consumer: StreamConsumer<LoggingConsumerContext> = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create_with_context(LoggingConsumerContext)
        .expect("Consumer creation failed");
    consumer
        .subscribe(topics)
        .expect("Can't subscribe to specified topics");
    consumer
}
```
Message filtering

```rust
use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;

async fn filter_messages(
    consumer: &StreamConsumer,
    filter_fn: impl Fn(&[u8]) -> bool + Send + Sync + 'static,
) {
    let mut message_stream = consumer.stream();
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    if filter_fn(payload) {
                        println!("Filtered message: {:?}", payload);
                        // Handle the matching message here
                    }
                }
                None => println!("Received empty message"),
            },
            Err(e) => eprintln!("Error while consuming: {}", e),
        }
    }
}

// Example filter: only process messages containing a specific string
fn example_filter(payload: &[u8]) -> bool {
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    payload_str.contains("important")
}
```
Message transformation

```rust
use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn transform_messages(
    input_consumer: &StreamConsumer,
    output_producer: &FutureProducer,
    output_topic: &str,
    transform_fn: impl Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
) {
    let mut message_stream = input_consumer.stream();
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    // Transform the message
                    let transformed_payload = transform_fn(payload);
                    // Produce the transformed message
                    let record = FutureRecord::to(output_topic)
                        .key(msg.key().unwrap_or(&[]))
                        .payload(&transformed_payload);
                    match output_producer.send(record, Duration::from_secs(0)).await {
                        Ok(_) => println!("Transformed message sent successfully"),
                        Err((e, _)) => eprintln!("Failed to send transformed message: {}", e),
                    }
                }
                None => println!("Received empty message"),
            },
            Err(e) => eprintln!("Error while consuming: {}", e),
        }
    }
}

// Example transform: uppercase the payload
fn example_transform(payload: &[u8]) -> Vec<u8> {
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    payload_str.to_uppercase().into_bytes()
}
```
Error Handling

Basic error handling

```rust
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn send_message_with_error_handling(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
) -> Result<(), Box<dyn std::error::Error>> {
    let record = FutureRecord::to(topic).key(key).payload(payload);
    match producer.send(record, Duration::from_secs(0)).await {
        Ok(_) => {
            println!("Message sent successfully");
            Ok(())
        }
        Err((e, _)) => {
            match &e {
                KafkaError::MessageProduction(RDKafkaErrorCode::BrokerTransportFailure) => {
                    eprintln!("Broker transport failure: {}", e);
                    // Consider reconnecting or switching to a backup broker
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                    eprintln!("Producer queue is full: {}", e);
                    // Consider waiting a moment and retrying
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge) => {
                    eprintln!("Message size too large: {}", e);
                    // Consider compressing or shrinking the message
                }
                KafkaError::MessageProduction(RDKafkaErrorCode::UnknownTopicOrPartition) => {
                    eprintln!("Unknown topic or partition: {}", e);
                    // Consider creating the topic or checking the topic name
                }
                _ => eprintln!("Failed to send message: {}", e),
            }
            Err(Box::new(e))
        }
    }
}
```
Retry logic

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
use tokio::time::sleep;

async fn send_message_with_retry(
    producer: &FutureProducer,
    topic: &str,
    key: &str,
    payload: &[u8],
    max_retries: u32,
    retry_delay: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut retries = 0;
    loop {
        let record = FutureRecord::to(topic).key(key).payload(payload);
        match producer.send(record, Duration::from_secs(0)).await {
            Ok(_) => {
                println!("Message sent successfully");
                return Ok(());
            }
            Err((e, _)) => {
                retries += 1;
                if retries >= max_retries {
                    eprintln!("Failed to send message after {} retries: {}", max_retries, e);
                    return Err(Box::new(e));
                }
                eprintln!(
                    "Failed to send message (attempt {}/{}): {}, retrying in {:?}...",
                    retries, max_retries, e, retry_delay
                );
                sleep(retry_delay).await;
            }
        }
    }
}
```
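A fixed delay is the simplest policy; in practice an exponential backoff with a cap is common. A small helper sketch (plain std, not part of rdkafka) that could replace the fixed retry_delay above:

```rust
use std::time::Duration;

// Exponential backoff: base * 2^attempt, capped at `max`.
fn backoff_delay(base: Duration, attempt: u32, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt.min(16));
    base.saturating_mul(factor).min(max)
}
```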
Dead letter queue handling

```rust
use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn dead_letter_queue_handler(
    main_consumer: &StreamConsumer,
    dlq_producer: &FutureProducer,
    dlq_topic: &str,
    handler: impl Fn(&[u8]) -> bool + Send + Sync + 'static,
) {
    let mut message_stream = main_consumer.stream();
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    // Try to process the message
                    if handler(payload) {
                        println!("Message processed successfully");
                    } else {
                        // Processing failed: forward the message to the dead letter queue
                        let record = FutureRecord::to(dlq_topic)
                            .key(msg.key().unwrap_or(&[]))
                            .payload(payload);
                        match dlq_producer.send(record, Duration::from_secs(0)).await {
                            Ok(_) => println!("Failed message sent to dead letter queue"),
                            Err((e, _)) => {
                                eprintln!("Failed to send message to dead letter queue: {}", e)
                            }
                        }
                    }
                }
                None => println!("Received empty message"),
            },
            Err(e) => eprintln!("Error while consuming: {}", e),
        }
    }
}

// Example handler: processing fails whenever the message contains "error"
fn example_error_handler(payload: &[u8]) -> bool {
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    !payload_str.contains("error")
}
```
Performance Optimization

Batching

```rust
use futures::future::try_join_all;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn batch_send_messages(
    producer: &FutureProducer,
    topic: &str,
    messages: Vec<(&str, &[u8])>,
    batch_size: usize,
) -> Result<(), rdkafka::error::KafkaError> {
    for chunk in messages.chunks(batch_size) {
        let futures = chunk.iter().map(|(key, payload)| {
            let record = FutureRecord::to(topic).key(*key).payload(*payload);
            producer.send(record, Duration::from_secs(0))
        });
        match try_join_all(futures).await {
            Ok(_) => println!("Batch of {} messages sent successfully", chunk.len()),
            Err((e, _)) => {
                eprintln!("Failed to send batch of messages: {}", e);
                return Err(e);
            }
        }
    }
    Ok(())
}
```
Concurrent asynchronous processing

```rust
use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use std::sync::Arc;
use tokio::task;

async fn concurrent_message_processing(
    consumer: &StreamConsumer,
    concurrency_level: usize,
    process_fn: impl Fn(&[u8]) + Send + Sync + 'static,
) {
    let process_fn = Arc::new(process_fn);
    let mut message_stream = consumer.stream();
    let mut tasks = Vec::with_capacity(concurrency_level);
    while let Some(result) = message_stream.next().await {
        match result {
            Ok(msg) => match msg.payload() {
                Some(payload) => {
                    let payload = payload.to_vec();
                    let process_fn = Arc::clone(&process_fn);
                    // If the concurrency limit is reached, wait for the oldest task to finish
                    if tasks.len() >= concurrency_level {
                        let oldest = tasks.remove(0);
                        let _ = oldest.await;
                    }
                    // Spawn a new processing task
                    tasks.push(task::spawn(async move {
                        process_fn(&payload);
                    }));
                }
                None => println!("Received empty message"),
            },
            Err(e) => eprintln!("Error while consuming: {}", e),
        }
    }
    // Wait for the remaining tasks to finish
    for task in tasks {
        let _ = task.await;
    }
}

// Example processing function
fn example_process_fn(payload: &[u8]) {
    let payload_str = std::str::from_utf8(payload).unwrap_or("");
    println!("Processing message: {}", payload_str);
    // Simulate time-consuming work
    std::thread::sleep(std::time::Duration::from_millis(100));
}
```
Connection pool management

StreamConsumer is not cloneable, so cached consumers are shared through Arc; FutureProducer is internally reference counted and can be cloned cheaply.

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct KafkaConnectionPool {
    producers: Arc<Mutex<HashMap<String, FutureProducer>>>,
    consumers: Arc<Mutex<HashMap<String, Arc<StreamConsumer>>>>,
    brokers: String,
}

impl KafkaConnectionPool {
    fn new(brokers: &str) -> Self {
        Self {
            producers: Arc::new(Mutex::new(HashMap::new())),
            consumers: Arc::new(Mutex::new(HashMap::new())),
            brokers: brokers.to_string(),
        }
    }

    fn get_producer(&self, client_id: &str) -> FutureProducer {
        let mut producers = self.producers.lock().unwrap();
        if let Some(producer) = producers.get(client_id) {
            return producer.clone();
        }
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", self.brokers.as_str())
            .set("client.id", client_id)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Producer creation error");
        producers.insert(client_id.to_string(), producer.clone());
        producer
    }

    fn get_consumer(&self, group_id: &str, topics: &[&str]) -> Arc<StreamConsumer> {
        let key = format!("{}:{}", group_id, topics.join(","));
        let mut consumers = self.consumers.lock().unwrap();
        if let Some(consumer) = consumers.get(&key) {
            return Arc::clone(consumer);
        }
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", self.brokers.as_str())
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .create()
            .expect("Consumer creation failed");
        consumer
            .subscribe(topics)
            .expect("Can't subscribe to specified topics");
        let consumer = Arc::new(consumer);
        consumers.insert(key, Arc::clone(&consumer));
        consumer
    }
}
```
Buffer tuning

A producer's configuration cannot be changed after it has been created, so buffering parameters such as linger.ms and batch.size are set when the producer is built:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use std::time::Duration;

async fn optimized_send_messages(
    brokers: &str,
    topic: &str,
    messages: Vec<(&str, &[u8])>,
    linger_ms: u32,
    batch_size: usize,
) -> Result<(), rdkafka::error::KafkaError> {
    // Build a producer tuned for batching
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("linger.ms", linger_ms.to_string())
        .set("batch.size", batch_size.to_string())
        .create()
        .expect("Optimized producer creation error");

    // Enqueue the messages
    for (key, payload) in messages {
        let record = FutureRecord::to(topic).key(key).payload(payload);
        if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
            eprintln!("Failed to send message: {}", e);
            return Err(e);
        }
    }

    // Flush the buffers so every queued message is delivered before returning
    let _ = producer.flush(Duration::from_secs(5));
    Ok(())
}
```
Best Practices

Resource management

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;
use std::sync::Arc;

struct KafkaClient {
    producer: Arc<FutureProducer>,
    consumer: Arc<StreamConsumer>,
    admin: Arc<AdminClient<DefaultClientContext>>,
}

impl KafkaClient {
    fn new(brokers: &str, group_id: &str, topics: &[&str]) -> Self {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Producer creation error");

        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", brokers)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .create()
            .expect("Consumer creation failed");
        consumer
            .subscribe(topics)
            .expect("Can't subscribe to specified topics");

        let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .create()
            .expect("Admin client creation failed");

        Self {
            producer: Arc::new(producer),
            consumer: Arc::new(consumer),
            admin: Arc::new(admin),
        }
    }

    fn producer(&self) -> Arc<FutureProducer> {
        Arc::clone(&self.producer)
    }

    fn consumer(&self) -> Arc<StreamConsumer> {
        Arc::clone(&self.consumer)
    }

    fn admin(&self) -> Arc<AdminClient<DefaultClientContext>> {
        Arc::clone(&self.admin)
    }
}

// Usage example
async fn use_kafka_client() {
    let brokers = "localhost:9092";
    let group_id = "my-group";
    let topics = &["my-topic"];
    let kafka_client = KafkaClient::new(brokers, group_id, topics);

    let _producer = kafka_client.producer(); // ... produce messages
    let _consumer = kafka_client.consumer(); // ... consume messages
    let _admin = kafka_client.admin();       // ... run admin operations
}
```
Configuration management

```rust
use serde::{Deserialize, Serialize};
use std::env;

#[derive(Debug, Serialize, Deserialize)]
struct KafkaConfig {
    brokers: String,
    group_id: String,
    topics: Vec<String>,
    producer_config: ProducerConfig,
    consumer_config: ConsumerConfig,
    admin_config: AdminConfig,
}

#[derive(Debug, Serialize, Deserialize)]
struct ProducerConfig {
    linger_ms: u32,
    batch_size: usize,
    message_timeout_ms: u32,
    acks: String,
    retries: u32,
}

#[derive(Debug, Serialize, Deserialize)]
struct ConsumerConfig {
    session_timeout_ms: u32,
    auto_commit: bool,
    auto_offset_reset: String,
    max_poll_interval_ms: u32,
}

#[derive(Debug, Serialize, Deserialize)]
struct AdminConfig {
    request_timeout_ms: u32,
}

impl KafkaConfig {
    fn from_env() -> Result<Self, env::VarError> {
        // Helper: read an optional variable, falling back to a default.
        fn var_or<T: std::str::FromStr>(name: &str, default: T) -> T {
            env::var(name).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
        }
        Ok(Self {
            brokers: env::var("KAFKA_BROKERS")?,
            group_id: env::var("KAFKA_GROUP_ID")?,
            topics: env::var("KAFKA_TOPICS")?
                .split(',')
                .map(|s| s.to_string())
                .collect(),
            producer_config: ProducerConfig {
                linger_ms: var_or("KAFKA_PRODUCER_LINGER_MS", 10),
                batch_size: var_or("KAFKA_PRODUCER_BATCH_SIZE", 16384),
                message_timeout_ms: var_or("KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS", 5000),
                acks: env::var("KAFKA_PRODUCER_ACKS").unwrap_or_else(|_| "all".to_string()),
                retries: var_or("KAFKA_PRODUCER_RETRIES", 3),
            },
            consumer_config: ConsumerConfig {
                session_timeout_ms: var_or("KAFKA_CONSUMER_SESSION_TIMEOUT_MS", 10000),
                auto_commit: var_or("KAFKA_CONSUMER_AUTO_COMMIT", false),
                auto_offset_reset: env::var("KAFKA_CONSUMER_AUTO_OFFSET_RESET")
                    .unwrap_or_else(|_| "earliest".to_string()),
                max_poll_interval_ms: var_or("KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS", 300000),
            },
            admin_config: AdminConfig {
                request_timeout_ms: var_or("KAFKA_ADMIN_REQUEST_TIMEOUT_MS", 5000),
            },
        })
    }

    fn to_producer_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", self.brokers.as_str());
        config.set("linger.ms", self.producer_config.linger_ms.to_string());
        config.set("batch.size", self.producer_config.batch_size.to_string());
        config.set("message.timeout.ms", self.producer_config.message_timeout_ms.to_string());
        config.set("acks", self.producer_config.acks.as_str());
        config.set("retries", self.producer_config.retries.to_string());
        config
    }

    fn to_consumer_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", self.brokers.as_str());
        config.set("group.id", self.group_id.as_str());
        config.set("session.timeout.ms", self.consumer_config.session_timeout_ms.to_string());
        config.set("enable.auto.commit", self.consumer_config.auto_commit.to_string());
        config.set("auto.offset.reset", self.consumer_config.auto_offset_reset.as_str());
        config.set("max.poll.interval.ms", self.consumer_config.max_poll_interval_ms.to_string());
        config
    }

    fn to_admin_config(&self) -> rdkafka::config::ClientConfig {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", self.brokers.as_str());
        config.set("request.timeout.ms", self.admin_config.request_timeout_ms.to_string());
        config
    }
}

// Usage example
async fn use_config() -> Result<(), Box<dyn std::error::Error>> {
    // Load the configuration from environment variables
    let kafka_config = KafkaConfig::from_env()?;
    // Build the clients
    let _producer: rdkafka::producer::FutureProducer = kafka_config.to_producer_config().create()?;
    let _consumer: rdkafka::consumer::StreamConsumer = kafka_config.to_consumer_config().create()?;
    let _admin: rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext> =
        kafka_config.to_admin_config().create()?;
    // Use the clients...
    Ok(())
}
```
Monitoring and metrics

```rust
use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

struct KafkaMetrics {
    messages_produced: u64,
    messages_consumed: u64,
    bytes_produced: u64,
    bytes_consumed: u64,
    produce_errors: u64,
    consume_errors: u64,
    last_update: Instant,
}

impl KafkaMetrics {
    fn new() -> Self {
        Self {
            messages_produced: 0,
            messages_consumed: 0,
            bytes_produced: 0,
            bytes_consumed: 0,
            produce_errors: 0,
            consume_errors: 0,
            last_update: Instant::now(),
        }
    }

    fn record_message_produced(&mut self, size: usize) {
        self.messages_produced += 1;
        self.bytes_produced += size as u64;
        self.last_update = Instant::now();
    }

    fn record_message_consumed(&mut self, size: usize) {
        self.messages_consumed += 1;
        self.bytes_consumed += size as u64;
        self.last_update = Instant::now();
    }

    fn record_produce_error(&mut self) {
        self.produce_errors += 1;
        self.last_update = Instant::now();
    }

    fn record_consume_error(&mut self) {
        self.consume_errors += 1;
        self.last_update = Instant::now();
    }

    fn print_metrics(&self) {
        println!("Kafka Metrics:");
        println!("  Messages produced: {}", self.messages_produced);
        println!("  Messages consumed: {}", self.messages_consumed);
        println!("  Bytes produced: {}", self.bytes_produced);
        println!("  Bytes consumed: {}", self.bytes_consumed);
        println!("  Produce errors: {}", self.produce_errors);
        println!("  Consume errors: {}", self.consume_errors);
        println!("  Time since last update: {:?}", self.last_update.elapsed());
    }
}

struct MonitoredProducer {
    producer: FutureProducer,
    metrics: Arc<Mutex<KafkaMetrics>>,
}

impl MonitoredProducer {
    fn new(producer: FutureProducer, metrics: Arc<Mutex<KafkaMetrics>>) -> Self {
        Self { producer, metrics }
    }

    async fn send(
        &self,
        record: FutureRecord<'_, [u8], [u8]>,
        timeout: Duration,
    ) -> Result<(), rdkafka::error::KafkaError> {
        let size = record.payload.map(|p| p.len()).unwrap_or(0);
        match self.producer.send(record, timeout).await {
            Ok(_) => {
                self.metrics.lock().unwrap().record_message_produced(size);
                Ok(())
            }
            Err((e, _)) => {
                self.metrics.lock().unwrap().record_produce_error();
                Err(e)
            }
        }
    }
}

struct MonitoredConsumer {
    consumer: StreamConsumer,
    metrics: Arc<Mutex<KafkaMetrics>>,
}

impl MonitoredConsumer {
    fn new(consumer: StreamConsumer, metrics: Arc<Mutex<KafkaMetrics>>) -> Self {
        Self { consumer, metrics }
    }

    async fn consume(&self) {
        let mut message_stream = self.consumer.stream();
        while let Some(result) = message_stream.next().await {
            match result {
                Ok(msg) => {
                    let size = msg.payload().map(|p| p.len()).unwrap_or(0);
                    self.metrics.lock().unwrap().record_message_consumed(size);
                    // Process the message...
                }
                Err(e) => {
                    self.metrics.lock().unwrap().record_consume_error();
                    eprintln!("Error while consuming: {}", e);
                }
            }
        }
    }
}

// Usage example
async fn use_monitored_clients() {
    let metrics = Arc::new(Mutex::new(KafkaMetrics::new()));

    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");
    let _monitored_producer = MonitoredProducer::new(producer, metrics.clone());

    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "my-group")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    consumer
        .subscribe(&["my-topic"])
        .expect("Can't subscribe to specified topics");
    let monitored_consumer = MonitoredConsumer::new(consumer, metrics.clone());

    // Run the consumer in the background
    tokio::spawn(async move {
        monitored_consumer.consume().await;
    });

    // Print the metrics periodically
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;
            metrics.lock().unwrap().print_metrics();
        }
    });

    // Use the monitored producer to send messages...
}
```
Complete example

```rust
use futures::StreamExt;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Kafka configuration
    let brokers = "localhost:9092";
    let topic = "example-topic";
    let group_id = "example-group";

    // Create the admin client
    let admin_client: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Admin client creation failed");

    // Create the topic if it does not exist
    let new_topic = NewTopic::new(topic, 3, TopicReplication::Fixed(1));
    let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
    match admin_client.create_topics(&[new_topic], &options).await {
        Ok(_) => println!("Topic '{}' created or already exists", topic),
        Err(e) => eprintln!("Failed to create topic '{}': {}", topic, e),
    }

    // Create the producer
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("acks", "all")
        .create()
        .expect("Producer creation error");

    // Create the consumer
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    consumer.subscribe(&[topic]).expect("Can't subscribe to specified topics");

    // Run the consumer in the background
    let consumer_handle = tokio::spawn(async move {
        let mut message_stream = consumer.stream();
        while let Some(result) = message_stream.next().await {
            match result {
                Ok(msg) => match msg.payload() {
                    Some(payload) => {
                        let payload_str = std::str::from_utf8(payload).unwrap_or("");
                        println!("Received message: {}", payload_str);
                        // Commit the offset manually
                        consumer
                            .commit_message(&msg, CommitMode::Async)
                            .expect("Failed to commit message");
                    }
                    None => println!("Received empty message"),
                },
                Err(e) => eprintln!("Error while consuming: {}", e),
            }
        }
    });

    // Send a few messages
    for i in 0..10 {
        let key = format!("key-{}", i);
        let payload = format!("Message {}", i);
        let record = FutureRecord::to(topic).key(&key).payload(payload.as_bytes());
        match producer.send(record, Duration::from_secs(0)).await {
            Ok(_) => println!("Message {} sent successfully", i),
            Err((e, _)) => eprintln!("Failed to send message {}: {}", i, e),
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    }

    // Give the consumer time to process everything
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Stop the consumer task
    consumer_handle.abort();

    println!("Example completed");
    Ok(())
}
```
Summary

This document has covered how to use rdkafka for a wide range of Kafka operations, including:

- Producer operations: sending messages (sync/async), batch sending, messages with headers, sending to a specific partition, and more.
- Consumer operations: basic consumption, stream-based consumption, consuming from a specific partition and offset, reading the consumer position and watermark offsets, and more.
- Admin operations: creating/deleting topics, listing topics, fetching topic details, altering topic configuration, adding partitions, and managing consumer groups.
- Advanced features: transactions, exactly-once semantics, custom partitioning, custom serializers, client-context interceptors, and message filtering and transformation.
- Error handling: basic error handling, retries, and dead letter queues.
- Performance optimization: batching, concurrent processing, connection pooling, and buffer tuning.
- Best practices: resource management, configuration management, and monitoring/metrics.

By following the examples and practices in this document, you can use rdkafka effectively to interact with Kafka clusters from your Rust applications and build high-performance, reliable message processing systems.
References

- rdkafka GitHub repository
- Apache Kafka official documentation
- librdkafka official documentation
- Asynchronous Programming in Rust (the async book)
License

This document is released under the MIT License.