Rust + PostgreSQL:deadpool 和 diesel 数据库连接池实战(四)
我使用Rust开发RTMate的想法是实现一个提供Websocket连接服务的平台,免去大家自建Websocket服务。在这当中自然要对不同的租户以及不同租户下的客户端进行管理,对客户端需要认证、连接登记、统计。所以我选择PostgreSQL作为底层数据库。操作数据库需要使用数据库连接,数据库连接的创建对机器性能的消耗较昂贵的,自然要使用连接池来管理数据库连接了。Rust 语言中同步与异步并存,在Rust 当中如何构建一个 PostgreSQL 数据库连接池呢?在项目中使用ORM 框架又如何使用连接池中的数据库连接呢?本文来一探究竟。
GitHub地址:RTMate
在开发 RTMate 过程中,意识到每次查询都建立新的数据库连接会带来性能问题,于是开始研究 Rust 里的数据库连接池方案。试了几个库之后,最终选择了 deadpool + diesel 的组合。这里记录下使用过程中遇到的问题和解决方案。
diesel 基础用法回顾
选择 diesel 主要是看中它的类型安全,编译时就能发现 SQL 字段类型错误,比运行时报错强多了。而且 crates.io 官方都在用,应该比较靠谱。
基本用法就不详细说了,官方文档写得挺清楚:https://diesel.rs/guides/getting-started.html
简单回顾下核心概念:
1. Schema 定义(自动生成)
用 diesel CLI 生成 schema,直接读数据库表结构:
diesel print-schema
比如这个简单的 posts 表:
CREATE TABLE posts (id SERIAL PRIMARY KEY,title VARCHAR NOT NULL,body TEXT NOT NULL,published BOOLEAN NOT NULL DEFAULT FALSE
)
生成的 schema 长这样:
// @generated automatically by Diesel CLI.diesel::table! {posts (id) {id -> Int4,title -> Varchar,body -> Text,published -> Bool,}
}
2. Model 定义
use diesel::prelude::*;#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::posts)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Post {pub id: i32,pub title: String,pub body: String,pub published: bool,
}
3. 基础连接和查询
最原始的用法是每次都建连接:
use diesel::prelude::*;
use dotenvy::dotenv;
use std::env;pub fn establish_connection() -> PgConnection {dotenv().ok();let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");PgConnection::establish(&database_url).unwrap_or_else(|_| panic!("Error connecting to {}", database_url))
}
然后就可以直接查询了:
use self::models::*;
use diesel::prelude::*;
use diesel_demo::*;fn main() {use self::schema::posts::dsl::*;let connection = &mut establish_connection();let results = posts.filter(published.eq(true)).limit(5).select(Post::as_select()).load(connection).expect("Error loading posts");println!("Displaying {} posts", results.len());for post in results {println!("{}", post.title);println!("-----------\n");println!("{}", post.body);}
}
连接池的必要性
上面这种每次建连接的方式在开发阶段还行,但生产环境绝对不行。PostgreSQL 创建连接的开销比较大,而且有连接数限制。
每次建立连接都需要经过 TCP 握手、身份验证、参数协商等步骤,在高并发场景下会成为性能瓶颈。而且数据库服务器的最大连接数是有限的,如果每个请求都创建新连接,很容易耗尽连接资源。
所以必须用连接池,让连接可以复用,避免重复的连接创建开销。
deadpool + diesel 连接池实战
研究了几个连接池方案,最终选择 deadpool。主要原因:
- 原生支持异步
- 有专门的 deadpool_diesel 包
- 配置简单,文档还算清楚
先看看我项目中的实际代码,这是客户端连接记录表的定义:
Schema:
diesel::table! {rt_client_connection (id) {id -> Int8,app_id -> Int8,#[max_length = 100]rt_app -> Varchar,#[max_length = 100]client_id -> Varchar,#[max_length = 100]connect_token -> Varchar,used -> Bool,created_time -> Nullable<Timestamptz>,expire_time -> Nullable<Timestamptz>,}
}
Model:
#[derive(Queryable, Selectable,Deserialize, Serialize, Debug)]
#[diesel(table_name = rt_client_connection)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct RtClientConnection {pub id: i64,pub app_id: i64,pub rt_app: String,pub client_id: String,pub connect_token: String,pub used: bool,pub created_time: Option<DateTime<Utc>>,pub expire_time: Option<DateTime<Utc>>,
}
连接池封装:
use deadpool_diesel::Runtime;
use deadpool_diesel::postgres::BuildError;
use deadpool_diesel::postgres::Manager;
use deadpool_diesel::postgres::Pool;
use deadpool_diesel::Timeouts;
use deadpool_diesel::postgres::Object;#[derive(Clone)]
pub struct DataSource {pool: deadpool_diesel::Pool<deadpool_diesel::Manager<diesel::PgConnection>>,
}impl DataSource {pub async fn new() -> anyhow::Result<Self> {dotenvy::dotenv().map_err(PoolError::Env)?;let db_config = DbConfig::from_env().map_err(|e| anyhow::anyhow!("Failed to load database configuration: {}", e))?;let manager = Manager::new(db_config.database_url, Runtime::Tokio1);let pool: deadpool_diesel::Pool<deadpool_diesel::Manager<diesel::PgConnection>> = Pool::builder(manager).max_size(db_config.max_connections).timeouts(db_config.timeouts).build().map_err(PoolError::PoolBuildError)?;Ok(DataSource { pool })}pub async fn get_connection(&self) -> anyhow::Result<Object> {let conn = self.pool.get().await?;Ok(conn)}
}
插入操作:
async fn save_connect_token(&self, new_connection: NewRtClientConnection) -> anyhow::Result<()> {let pg_connection = self.data_source.get_connection().await?;pg_connection.interact(move |conn: &mut diesel::PgConnection| {diesel::insert_into(conn_dsl::rt_client_connection).values(&new_connection).execute(conn)}).await.map_err(|e| anyhow::anyhow!("Insert rt app new_connection failed: {}", e))??;Ok(())
}
关键点解析
最开始用这个 interact 方法时完全搞不懂,为什么要这么绕?直接用连接不行吗?
后来才明白原因:
后来才明白原因:
get_connection()从池里拿连接是异步的- diesel 操作本身是同步的,不能直接在异步上下文中使用
interact()把同步操作丢到专门的线程池执行,返回 Future- 用
move把数据移动到线程里,避免生命周期问题
那个双重 ? 也踩过坑:第一个 ? 处理 interact 自身的错误(比如线程池炸了),第二个 ? 处理 diesel 操作的错误(比如 SQL 执行失败)。
查询操作
/// 根据 connect_token 查询还未创建成功的 RtClientConnection token
async fn get_rt_client_connection_by_token(&self, query_connect_token: &str) -> anyhow::Result<Option<RtClientConnection>> {let pg_connection = self.get_connection().await?;let connect_token_query = query_connect_token.to_owned();use rtmate_common::schema::rt_client_connection::dsl::*;let result = pg_connection.interact(move |conn: &mut diesel::PgConnection| {rt_client_connection.filter(connect_token.eq(connect_token_query)).filter(used.eq(false)).select(RtClientConnection::as_select()).first::<RtClientConnection>(conn).optional()}).await.map_err(|e| anyhow::anyhow!("Query failed: {}", e))??;Ok(result)
}
注意这里用了 query_connect_token.to_owned(),因为要把数据移动到另一个线程,不能用引用。
连接池配置
这块配置我调了好久,主要是通过环境变量加载配置:
#[derive(Debug, Deserialize)]
struct DbConfig {pub database_url: String,#[serde(default = "default_max_connections")]pub max_connections: usize,#[serde(default = "default_connect_timeout")]pub connect_timeout: u64,#[serde(rename = "test_query", default = "default_test_query")]pub test_query: String,#[serde(flatten)]pub timeouts: Timeouts,
}fn default_max_connections() -> usize {5
}fn default_connect_timeout() -> u64 {10
}fn default_test_query() -> String {"SELECT 1".to_string()
}impl DbConfig {pub fn from_env() -> anyhow::Result<Self, ConfigError> {config::Config::builder().add_source(config::Environment::default()).build().unwrap().try_deserialize()}
}// 创建连接池
let manager = Manager::new(db_config.database_url, Runtime::Tokio1);
let pool: deadpool_diesel::Pool<deadpool_diesel::Manager<diesel::PgConnection>> = Pool::builder(manager).max_size(db_config.max_connections).timeouts(db_config.timeouts).build().map_err(PoolError::PoolBuildError)?;
几个重要参数:
max_size:最大连接数,根据数据库配置和并发量调整(项目中默认是5)timeouts:包含各种超时配置的结构体,通过环境变量配置database_url:数据库连接URL,从环境变量读取Runtime::Tokio1:指定异步运行时为Tokio
注意项目中使用了配置结构体和环境变量,而不是硬编码参数值。这样更灵活,可以根据不同环境调整配置。
小结
deadpool + diesel 这套组合确实不错:
- 性能提升明显,特别是高并发场景
- diesel 的类型安全很实用,编译期就能发现 SQL 问题
- deadpool 的异步支持让整个架构更现代化
- 配置相对简单,文档也还行
如果你也在用 Rust 写需要数据库的服务,可以试试这个组合。虽然学习成本比直接用连接稍高一些,但性能收益还是很值得的。
