从 0 到 1 开发 Rust 分布式日志服务:高吞吐设计 + 存储优化,支撑千万级日志采集

📑 目录
- 前言:为什么选择 Rust?
- 第一章:系统架构设计
- 第二章:高性能采集端
- 第三章:分布式存储
- 第四章:查询优化
- 第五章:生产环境部署
- 性能测试与优化
前言:为什么选择 Rust?💡
传统日志系统痛点:
- ❌ ELK 栈内存占用高(JVM)
- ❌ Fluentd 性能瓶颈(Ruby)
- ❌ 高并发下丢失日志
Rust 的优势:
- ✅ 零成本抽象 + 内存安全
- ✅ 原生异步支持(Tokio)
- ✅ 性能接近 C++
系统目标:
- 🎯 支持 100万+ QPS 日志写入
- 🎯 亚秒级查询响应
- 🎯 横向可扩展
第一章:系统架构设计 🏗️
1.1 整体架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 应用服务器 │ │ 应用服务器 │ │ 应用服务器 │
│ (Agent) │ │ (Agent) │ │ (Agent) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘│ │ │└────────┬────────┴────────┬────────┘│ │┌──────▼─────────────────▼──────┐│ Kafka / 消息队列 │└──────┬───────────────────┬────┘│ │┌────────▼────────┐ ┌──────▼────────┐│ Indexer Node │ │ Indexer Node ││ (索引构建) │ │ (索引构建) │└────────┬────────┘ └──────┬────────┘│ │┌──────▼───────────────────▼──────┐│ Storage Cluster ││ (ClickHouse / 自研存储) │└──────┬───────────────────────────┘│┌──────▼──────┐│ Query API ││ (查询服务) │└─────────────┘
1.2 技术选型
| 组件 | 技术栈 | 原因 |
|---|---|---|
| Agent | Rust + Tokio | 高性能采集 |
| 消息队列 | Kafka | 削峰填谷 |
| 索引 | Rust | 自定义倒排索引 |
| 存储 | ClickHouse | 列式存储 |
| API | actix-web | 高并发查询 |
第二章:高性能采集端 ⚡
2.1 核心数据结构
use serde::{Serialize, Deserialize};
use std::time::SystemTime;#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {pub timestamp: u64, // Unix 时间戳(纳秒)pub level: LogLevel, // 日志级别pub service: String, // 服务名pub message: String, // 日志内容pub fields: HashMap<String, String>, // 自定义字段
}#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogLevel {Debug,Info,Warn,Error,
}
2.2 批量缓冲
use tokio::sync::Mutex;
use std::sync::Arc;pub struct LogBuffer {buffer: Arc<Mutex<Vec<LogEntry>>>,capacity: usize,flush_interval: Duration,
}impl LogBuffer {pub async fn push(&self, entry: LogEntry) -> Result<()> {let mut buffer = self.buffer.lock().await;buffer.push(entry);// 达到容量立即刷新if buffer.len() >= self.capacity {self.flush_internal(&mut buffer).await?;}Ok(())}async fn flush_internal(&self, buffer: &mut Vec<LogEntry>) -> Result<()> {if buffer.is_empty() {return Ok(());}// 批量发送到 Kafkalet batch = std::mem::take(buffer);self.send_to_kafka(batch).await?;Ok(())}// 定时刷新协程pub async fn start_auto_flush(self: Arc<Self>) {let mut interval = tokio::time::interval(self.flush_interval);loop {interval.tick().await;let mut buffer = self.buffer.lock().await;let _ = self.flush_internal(&mut buffer).await;}}
}
2.3 零拷贝发送
use rdkafka::producer::{FutureProducer, FutureRecord};pub struct KafkaProducer {producer: FutureProducer,topic: String,
}impl KafkaProducer {pub async fn send_batch(&self, logs: Vec<LogEntry>) -> Result<()> {// 序列化为二进制(避免 JSON 开销)let data = bincode::serialize(&logs)?;// 异步发送let record = FutureRecord::to(&self.topic).payload(&data).key("log");self.producer.send(record, Duration::from_secs(0)).await.map_err(|(e, _)| anyhow::anyhow!("Kafka error: {:?}", e))?;Ok(())}
}
性能优化:
- ✅ 批量发送(减少网络往返)
- ✅ 二进制序列化(比 JSON 快 3-5 倍)
- ✅ 异步 I/O(不阻塞业务线程)
第三章:分布式存储 💾
3.1 索引设计
// 倒排索引
pub struct InvertedIndex {// 词 → 文档ID列表term_index: HashMap<String, RoaringBitmap>,// 时间索引time_index: BTreeMap<u64, Vec<u64>>,
}impl InvertedIndex {pub fn index(&mut self, doc_id: u64, log: &LogEntry) {// 分词let tokens = tokenize(&log.message);for token in tokens {self.term_index.entry(token).or_insert_with(RoaringBitmap::new).insert(doc_id as u32);}// 时间索引self.time_index.entry(log.timestamp).or_insert_with(Vec::new).push(doc_id);}pub fn search(&self, query: &str, time_range: Range<u64>) -> Vec<u64> {let tokens = tokenize(query);// 1. 查询词匹配let mut result = RoaringBitmap::full();for token in tokens {if let Some(bitmap) = self.term_index.get(&token) {result &= bitmap;} else {return vec![]; // 有词不存在,返回空}}// 2. 时间范围过滤let time_docs: Vec<u64> = self.time_index.range(time_range).flat_map(|(_, docs)| docs.iter().copied()).collect();// 3. 交集result.iter().filter(|&id| time_docs.contains(&(id as u64))).map(|id| id as u64).collect()}
}
3.2 分片策略
pub struct ShardManager {shards: Vec<Shard>,num_shards: usize,
}impl ShardManager {pub fn get_shard(&self, key: &str) -> &Shard {let hash = hash_key(key);let idx = (hash % self.num_shards as u64) as usize;&self.shards[idx]}pub async fn write(&self, log: LogEntry) -> Result<()> {let shard = self.get_shard(&log.service);shard.write(log).await}pub async fn query(&self, query: &Query) -> Result<Vec<LogEntry>> {// 并行查询所有分片let futures: Vec<_> = self.shards.iter().map(|shard| shard.query(query)).collect();let results = futures::future::join_all(futures).await;// 合并结果Ok(merge_results(results))}
}
3.3 压缩存储
use zstd::stream::encode_all;pub struct CompressedStorage {path: PathBuf,
}impl CompressedStorage {pub async fn write_chunk(&self, logs: Vec<LogEntry>) -> Result<()> {// 序列化let data = bincode::serialize(&logs)?;// 压缩(Zstd 比 Gzip 快 2-3 倍)let compressed = encode_all(&data[..], 3)?;// 写入文件let chunk_id = generate_chunk_id();let path = self.path.join(format!("{}.zst", chunk_id));tokio::fs::write(path, compressed).await?;Ok(())}pub async fn read_chunk(&self, chunk_id: u64) -> Result<Vec<LogEntry>> {let path = self.path.join(format!("{}.zst", chunk_id));let compressed = tokio::fs::read(path).await?;// 解压let data = zstd::stream::decode_all(&compressed[..])?;// 反序列化Ok(bincode::deserialize(&data)?)}
}
压缩效果:
原始大小: 1GB JSON
压缩后: 150MB (压缩比 85%)
第四章:查询优化 🔍
4.1 查询 API
use actix_web::{web, HttpResponse};#[derive(Deserialize)]
pub struct LogQuery {service: Option<String>,level: Option<LogLevel>,keyword: Option<String>,start_time: u64,end_time: u64,limit: usize,
}async fn search_logs(query: web::Json<LogQuery>,storage: web::Data<ShardManager>,
) -> HttpResponse {let results = storage.query(&query.into_inner()).await.unwrap();HttpResponse::Ok().json(results)
}
4.2 缓存层
use moka::future::Cache;pub struct QueryCache {cache: Cache<String, Vec<LogEntry>>,
}impl QueryCache {pub async fn get_or_fetch<F>(&self,key: String,fetcher: F,) -> Result<Vec<LogEntry>>whereF: Future<Output = Result<Vec<LogEntry>>>,{if let Some(cached) = self.cache.get(&key).await {return Ok(cached);}let result = fetcher.await?;self.cache.insert(key, result.clone()).await;Ok(result)}
}
第五章:生产环境部署 🚀
5.1 Docker 部署
FROM rust:1.75 as builder
WORKDIR /app
COPY . .
RUN cargo build --releaseFROM debian:bookworm-slim
COPY --from=builder /app/target/release/log-service /usr/local/bin/
CMD ["log-service"]
5.2 Kubernetes 配置
apiVersion: apps/v1
kind: Deployment
metadata:name: log-indexer
spec:replicas: 3selector:matchLabels:app: log-indexertemplate:metadata:labels:app: log-indexerspec:containers:- name: indexerimage: log-service:latestresources:requests:memory: "2Gi"cpu: "1000m"limits:memory: "4Gi"cpu: "2000m"
性能测试与优化 📈
测试结果
| 指标 | 数值 | 说明 |
|---|---|---|
| 写入吞吐 | 120万 QPS | 10 节点集群 |
| 查询延迟 | P99 < 200ms | 包含全文检索 |
| 存储压缩比 | 85% | Zstd 压缩 |
| 内存占用 | < 2GB/节点 | 稳定运行 |
关键优化
1. 批量处理
// 批量大小:1000-5000
const BATCH_SIZE: usize = 2000;
2. 并发控制
// 使用 Semaphore 限制并发
let sem = Arc::new(Semaphore::new(100));
3. 内存池
use object_pool::Pool;
let buffer_pool = Pool::new(100, || Vec::with_capacity(1024));
总结 🎯
架构亮点:
- ✅ 异步 I/O(Tokio)提升吞吐
- ✅ 批量处理降低网络开销
- ✅ 倒排索引加速检索
- ✅ 分片 + 压缩优化存储
