OceanBase在.NET开发操作指南
文章目录
- OceanBase与.NET开发概述
- OceanBase特点
- .NET开发选择
- 环境配置与连接管理
- 安装驱动
- 基础连接配置
- 连接池配置
- 多租户连接配置
- 基础CRUD操作
- 创建表
- 插入数据
- 查询数据
- 更新数据
- 删除数据
- 事务处理
- 基础事务示例
- 分布式事务处理
- 批量操作与性能优化
- 批量插入
- 使用MySqlBulkLoader
- 批量更新
- 存储过程与函数调用
- 创建存储过程
- 调用存储过程
- 使用输出参数
- 数据类型映射与处理
- .NET与OceanBase类型映射
- 特殊类型处理
- 高级查询技术
- 分页查询
- 窗口函数
- 全文检索
- JSON数据处理
- JSON查询与修改
- JSON数组处理
- 分布式事务处理
- 跨分区事务
- 两阶段提交(2PC)模式
- 性能监控与调优
- 查询执行计划分析
- 性能监控查询
- 连接池监控
- 异常处理与日志记录
- 常见异常处理
- 重试机制实现
- 安全最佳实践
- 参数化查询防止SQL注入
- 使用最小权限原则
- 敏感数据加密
- ORM集成
- 使用Dapper进行数据访问
- 使用Entity Framework Core
- 实战案例
- 电商系统数据访问层实现
- 报表生成服务
- 总结
OceanBase与.NET开发概述
OceanBase是阿里巴巴集团自主研发的分布式关系型数据库,具有高可用、高性能、高扩展等特点。在.NET环境下开发OceanBase应用,主要通过ADO.NET接口或ORM框架实现。
OceanBase特点
- 分布式架构:支持水平扩展,数据自动分片
- 高可用性:基于Paxos协议的多副本一致性
- 兼容性:兼容MySQL协议,支持大部分MySQL语法
- HTAP能力:同时支持OLTP和OLAP场景
.NET开发选择
- ADO.NET:使用MySQL官方提供的Connector/NET
- ORM框架:Entity Framework Core、Dapper等
- 第三方驱动:如MySqlConnector等替代驱动
环境配置与连接管理
安装驱动
# 使用NuGet安装MySQL官方驱动
Install-Package MySql.Data
基础连接配置
using MySql.Data.MySqlClient;string connectionString = "server=obproxy.mydomain.com;port=2883;user=root;password=yourpassword;database=test;";
using (var connection = new MySqlConnection(connectionString))
{try{connection.Open();Console.WriteLine("连接OceanBase成功!");}catch (Exception ex){Console.WriteLine($"连接失败: {ex.Message}");}
}
连接池配置
string connectionString = "server=obproxy.mydomain.com;port=2883;user=root;password=yourpassword;database=test;" +"Pooling=true;MinimumPoolSize=5;MaximumPoolSize=100;ConnectionTimeout=30;";// 高级连接池配置示例
var builder = new MySqlConnectionStringBuilder(connectionString)
{Pooling = true,MinimumPoolSize = 10,MaximumPoolSize = 200,ConnectionIdleTimeout = 300,ConnectionLifeTime = 1800
};
多租户连接配置
OceanBase支持多租户架构,连接时需要指定租户:
string tenantConnectionString = "server=obproxy.mydomain.com;port=2883;user=user@tenant;password=password;database=test;";
基础CRUD操作
创建表
using (var connection = new MySqlConnection(connectionString))
{connection.Open();var createTableSql = @"CREATE TABLE IF NOT EXISTS employees (id BIGINT PRIMARY KEY,name VARCHAR(100) NOT NULL,email VARCHAR(100) UNIQUE,salary DECIMAL(15,2),hire_date DATETIME,department_id INT,INDEX idx_department (department_id),INDEX idx_name (name)) PARTITION BY HASH(id) PARTITIONS 16;";using (var command = new MySqlCommand(createTableSql, connection)){command.ExecuteNonQuery();Console.WriteLine("表创建成功");}
}
插入数据
// 单条插入
var insertSql = "INSERT INTO employees (id, name, email, salary, hire_date, department_id) " +"VALUES (@id, @name, @email, @salary, @hire_date, @department_id)";using (var command = new MySqlCommand(insertSql, connection))
{command.Parameters.AddWithValue("@id", 1);command.Parameters.AddWithValue("@name", "张三");command.Parameters.AddWithValue("@email", "zhangsan@example.com");command.Parameters.AddWithValue("@salary", 15000.50m);command.Parameters.AddWithValue("@hire_date", DateTime.Now);command.Parameters.AddWithValue("@department_id", 101);int rowsAffected = command.ExecuteNonQuery();Console.WriteLine($"插入了 {rowsAffected} 行数据");
}
查询数据
// 基础查询
var query = "SELECT id, name, email, salary, hire_date FROM employees WHERE department_id = @deptId";using (var command = new MySqlCommand(query, connection))
{command.Parameters.AddWithValue("@deptId", 101);using (var reader = command.ExecuteReader()){while (reader.Read()){Console.WriteLine($"ID: {reader["id"]}, 姓名: {reader["name"]}, " +$"邮箱: {reader["email"]}, 薪资: {reader.GetDecimal(3)}");}}
}
更新数据
var updateSql = "UPDATE employees SET salary = salary * 1.1 WHERE department_id = @deptId";using (var command = new MySqlCommand(updateSql, connection))
{command.Parameters.AddWithValue("@deptId", 101);int rowsUpdated = command.ExecuteNonQuery();Console.WriteLine($"更新了 {rowsUpdated} 行数据");
}
删除数据
var deleteSql = "DELETE FROM employees WHERE id = @id";using (var command = new MySqlCommand(deleteSql, connection))
{command.Parameters.AddWithValue("@id", 1);int rowsDeleted = command.ExecuteNonQuery();Console.WriteLine($"删除了 {rowsDeleted} 行数据");
}
事务处理
基础事务示例
using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var transaction = connection.BeginTransaction()){try{// 插入员工记录var insertEmp = "INSERT INTO employees (id, name, salary) VALUES (1001, '李四', 20000)";using (var cmd1 = new MySqlCommand(insertEmp, connection, transaction)){cmd1.ExecuteNonQuery();}// 更新部门预算var updateBudget = "UPDATE departments SET budget = budget - 20000 WHERE id = 10";using (var cmd2 = new MySqlCommand(updateBudget, connection, transaction)){int affected = cmd2.ExecuteNonQuery();if (affected == 0){throw new Exception("部门不存在,回滚事务");}}transaction.Commit();Console.WriteLine("事务提交成功");}catch (Exception ex){transaction.Rollback();Console.WriteLine($"事务回滚: {ex.Message}");}}
}
分布式事务处理
OceanBase支持分布式事务,可通过XA协议实现:
// XA事务示例
using (var connection = new MySqlConnection(connectionString))
{connection.Open();// 开启XA事务using (var xaStart = new MySqlCommand("XA START 'transaction1'", connection)){xaStart.ExecuteNonQuery();}try{// 执行多个SQL操作using (var cmd1 = new MySqlCommand("INSERT INTO table1 VALUES (1, 'data1')", connection)){cmd1.ExecuteNonQuery();}using (var cmd2 = new MySqlCommand("UPDATE table2 SET value = 'new' WHERE id = 1", connection)){cmd2.ExecuteNonQuery();}// 准备阶段using (var xaEnd = new MySqlCommand("XA END 'transaction1'", connection))using (var xaPrepare = new MySqlCommand("XA PREPARE 'transaction1'", connection)){xaEnd.ExecuteNonQuery();xaPrepare.ExecuteNonQuery();}// 提交阶段using (var xaCommit = new MySqlCommand("XA COMMIT 'transaction1'", connection)){xaCommit.ExecuteNonQuery();Console.WriteLine("分布式事务提交成功");}}catch (Exception){// 回滚using (var xaRollback = new MySqlCommand("XA ROLLBACK 'transaction1'", connection)){xaRollback.ExecuteNonQuery();Console.WriteLine("分布式事务回滚");}throw;}
}
批量操作与性能优化
批量插入
// 使用Bulk Insert提高性能
var insertSql = "INSERT INTO employees (id, name, email, salary, hire_date, department_id) " +"VALUES (@id, @name, @email, @salary, @hire_date, @department_id)";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var transaction = connection.BeginTransaction())using (var command = new MySqlCommand(insertSql, connection, transaction)){// 添加参数command.Parameters.Add("@id", MySqlDbType.Int64);command.Parameters.Add("@name", MySqlDbType.VarChar);command.Parameters.Add("@email", MySqlDbType.VarChar);command.Parameters.Add("@salary", MySqlDbType.Decimal);command.Parameters.Add("@hire_date", MySqlDbType.DateTime);command.Parameters.Add("@department_id", MySqlDbType.Int32);// 批量添加数据for (int i = 0; i < 1000; i++){command.Parameters["@id"].Value = 2000 + i;command.Parameters["@name"].Value = $"员工_{i}";command.Parameters["@email"].Value = $"user_{i}@company.com";command.Parameters["@salary"].Value = 5000 + (i % 10) * 1000;command.Parameters["@hire_date"].Value = DateTime.Now.AddDays(-i);command.Parameters["@department_id"].Value = 100 + (i % 5);command.ExecuteNonQuery();}transaction.Commit();Console.WriteLine("批量插入完成");}
}
使用MySqlBulkLoader
// 创建CSV文件
File.WriteAllText("employees.csv", "1,张三,zhangsan@example.com,15000.50,2023-01-15,101\n" +"2,李四,lisi@example.com,18000.00,2022-11-20,102");using (var connection = new MySqlConnection(connectionString))
{connection.Open();var bulkLoader = new MySqlBulkLoader(connection){TableName = "employees",FieldTerminator = ",",LineTerminator = "\n",FileName = "employees.csv",NumberOfLinesToSkip = 0,Columns = { "id", "name", "email", "salary", "hire_date", "department_id" }};int count = bulkLoader.Load();Console.WriteLine($"通过BulkLoader插入了 {count} 行数据");
}
批量更新
// 使用CASE WHEN实现批量更新
var updateSql = @"
UPDATE employees
SET salary = CASE idWHEN @id1 THEN @salary1WHEN @id2 THEN @salary2WHEN @id3 THEN @salary3
END
WHERE id IN (@id1, @id2, @id3)";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(updateSql, connection)){command.Parameters.AddWithValue("@id1", 101);command.Parameters.AddWithValue("@salary1", 20000);command.Parameters.AddWithValue("@id2", 102);command.Parameters.AddWithValue("@salary2", 22000);command.Parameters.AddWithValue("@id3", 103);command.Parameters.AddWithValue("@salary3", 25000);int rowsUpdated = command.ExecuteNonQuery();Console.WriteLine($"批量更新了 {rowsUpdated} 行数据");}
}
存储过程与函数调用
创建存储过程
var createProcSql = @"
CREATE PROCEDURE increase_salary(IN dept_id INT, IN increase_percent DECIMAL(5,2))
BEGINUPDATE employees SET salary = salary * (1 + increase_percent / 100)WHERE department_id = dept_id;SELECT COUNT(*) AS affected_rows FROM employees WHERE department_id = dept_id;
END";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(createProcSql, connection)){command.ExecuteNonQuery();Console.WriteLine("存储过程创建成功");}
}
调用存储过程
using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand("increase_salary", connection)){command.CommandType = CommandType.StoredProcedure;command.Parameters.AddWithValue("@dept_id", 101);command.Parameters.AddWithValue("@increase_percent", 5.0m);using (var reader = command.ExecuteReader()){while (reader.Read()){Console.WriteLine($"影响了 {reader["affected_rows"]} 行数据");}}}
}
使用输出参数
var createProcWithOutput = @"
CREATE PROCEDURE get_employee_stats(IN dept_id INT, OUT avg_salary DECIMAL(15,2), OUT max_salary DECIMAL(15,2))
BEGINSELECT AVG(salary), MAX(salary) INTO avg_salary, max_salaryFROM employees WHERE department_id = dept_id;
END";// 先创建存储过程
using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(createProcWithOutput, connection)){command.ExecuteNonQuery();}
}// 调用带输出参数的存储过程
using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand("get_employee_stats", connection)){command.CommandType = CommandType.StoredProcedure;command.Parameters.AddWithValue("@dept_id", 101);var avgSalaryParam = new MySqlParameter("@avg_salary", MySqlDbType.Decimal){Direction = ParameterDirection.Output,Precision = 15,Scale = 2};command.Parameters.Add(avgSalaryParam);var maxSalaryParam = new MySqlParameter("@max_salary", MySqlDbType.Decimal){Direction = ParameterDirection.Output,Precision = 15,Scale = 2};command.Parameters.Add(maxSalaryParam);command.ExecuteNonQuery();Console.WriteLine($"部门101的平均薪资: {avgSalaryParam.Value}, 最高薪资: {maxSalaryParam.Value}");}
}
数据类型映射与处理
.NET与OceanBase类型映射
.NET类型 | OceanBase类型 | MySqlDbType枚举 |
---|---|---|
int | INT | Int32 |
long | BIGINT | Int64 |
decimal | DECIMAL | Decimal |
float | FLOAT | Float |
double | DOUBLE | Double |
string | VARCHAR, CHAR | VarChar, String |
DateTime | DATETIME | DateTime |
bool | TINYINT(1) | Bool |
byte[] | BLOB | Blob |
特殊类型处理
JSON类型处理
// 创建包含JSON列的表
var createTableWithJson = @"
CREATE TABLE IF NOT EXISTS products (id BIGINT PRIMARY KEY,name VARCHAR(100),attributes JSON,created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)";// 插入JSON数据
var insertJson = "INSERT INTO products (id, name, attributes) VALUES (1, '智能手机', @attributes)";using (var connection = new MySqlConnection(connectionString))
{connection.Open();// 创建表using (var cmdCreate = new MySqlCommand(createTableWithJson, connection)){cmdCreate.ExecuteNonQuery();}// 插入JSON数据using (var cmdInsert = new MySqlCommand(insertJson, connection)){var json = new {color = "黑色",memory = "128GB",camera = "48MP",features = new[] { "防水", "面部识别" }};cmdInsert.Parameters.AddWithValue("@attributes", JsonConvert.SerializeObject(json));cmdInsert.ExecuteNonQuery();}// 查询JSON数据using (var cmdQuery = new MySqlCommand("SELECT id, name, attributes->'$.color' AS color FROM products", connection))using (var reader = cmdQuery.ExecuteReader()){while (reader.Read()){Console.WriteLine($"产品: {reader["name"]}, 颜色: {reader["color"]}");}}
}
枚举类型处理
// 枚举处理示例
public enum EmployeeStatus { Active, OnLeave, Terminated }var insertWithEnum = "INSERT INTO employees (id, name, status) VALUES (@id, @name, @status)";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(insertWithEnum, connection)){command.Parameters.AddWithValue("@id", 1001);command.Parameters.AddWithValue("@name", "王五");command.Parameters.AddWithValue("@status", EmployeeStatus.Active.ToString());command.ExecuteNonQuery();}
}
高级查询技术
分页查询
// OceanBase分页查询(兼容MySQL语法)
public List<Employee> GetEmployeesPaged(int pageNumber, int pageSize, string sortColumn, bool sortAsc)
{var employees = new List<Employee>();using (var connection = new MySqlConnection(connectionString)){connection.Open();var offset = (pageNumber - 1) * pageSize;var sortDirection = sortAsc ? "ASC" : "DESC";var query = $@"SELECT id, name, email, salary, hire_date, department_idFROM employeesORDER BY {sortColumn} {sortDirection}LIMIT {pageSize} OFFSET {offset}";using (var command = new MySqlCommand(query, connection))using (var reader = command.ExecuteReader()){while (reader.Read()){employees.Add(new Employee{Id = reader.GetInt64("id"),Name = reader.GetString("name"),Email = reader.IsDBNull("email") ? null : reader.GetString("email"),Salary = reader.GetDecimal("salary"),HireDate = reader.GetDateTime("hire_date"),DepartmentId = reader.GetInt32("department_id")});}}}return employees;
}
窗口函数
// 使用窗口函数计算排名
var windowQuery = @"
SELECT id, name, salary,department_id,RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) AS dept_salary_rank,salary - LAG(salary, 1, 0) OVER (PARTITION BY department_id ORDER BY salary) AS salary_diff
FROM employees
ORDER BY department_id, salary DESC";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(windowQuery, connection))using (var reader = command.ExecuteReader()){Console.WriteLine("ID\t姓名\t薪资\t部门\t部门排名\t薪资差");while (reader.Read()){Console.WriteLine($"{reader["id"]}\t{reader["name"]}\t{reader["salary"]}\t" +$"{reader["department_id"]}\t{reader["dept_salary_rank"]}\t" +$"{reader["salary_diff"]}");}}
}
全文检索
// 创建全文索引
var createFulltextIndex = @"
ALTER TABLE employees
ADD FULLTEXT INDEX ft_name_email (name, email)";// 全文检索查询
var fulltextQuery = @"
SELECT id, name, email, MATCH(name, email) AGAINST(@searchTerm IN NATURAL LANGUAGE MODE) AS relevance
FROM employees
WHERE MATCH(name, email) AGAINST(@searchTerm IN NATURAL LANGUAGE MODE)
ORDER BY relevance DESC";using (var connection = new MySqlConnection(connectionString))
{connection.Open();// 创建全文索引using (var cmdIndex = new MySqlCommand(createFulltextIndex, connection)){cmdIndex.ExecuteNonQuery();}// 执行全文检索using (var cmdSearch = new MySqlCommand(fulltextQuery, connection)){cmdSearch.Parameters.AddWithValue("@searchTerm", "张经理 OR 总监");using (var reader = cmdSearch.ExecuteReader()){while (reader.Read()){Console.WriteLine($"匹配结果: {reader["name"]} ({reader["email"]}), 相关度: {reader["relevance"]}");}}}
}
JSON数据处理
OceanBase支持丰富的JSON函数,可以高效处理JSON数据。
JSON查询与修改
// JSON数据查询与修改示例
var jsonOperations = @"
-- 插入JSON数据
INSERT INTO products (id, name, attributes)
VALUES (2, '笔记本电脑', JSON_OBJECT('brand', 'Dell', 'cpu', 'i7', 'ram', '16GB'));-- 查询JSON属性
SELECT id, name, JSON_EXTRACT(attributes, '$.brand') AS brand FROM products;-- 修改JSON属性
UPDATE products
SET attributes = JSON_SET(attributes, '$.ram', '32GB', '$.storage', '1TB SSD')
WHERE id = 2;-- 查询修改后的JSON
SELECT id, name, attributes FROM products WHERE id = 2;";using (var connection = new MySqlConnection(connectionString))
{connection.Open();// 执行多条JSON操作using (var command = new MySqlCommand(jsonOperations, connection)){using (var reader = command.ExecuteReader()){do{while (reader.Read()){if (reader.FieldCount == 3){Console.WriteLine($"产品: {reader["name"]}, 品牌: {reader["brand"]}");}else if (reader.FieldCount == 2){Console.WriteLine($"产品: {reader["name"]}, 完整属性: {reader["attributes"]}");}}} while (reader.NextResult());}}
}
JSON数组处理
// JSON数组处理示例
var jsonArrayQuery = @"
-- 创建包含JSON数组的表
CREATE TABLE IF NOT EXISTS orders (id BIGINT PRIMARY KEY,customer_id BIGINT,items JSON,order_date DATETIME
);-- 插入包含数组的JSON数据
INSERT INTO orders (id, customer_id, items, order_date)
VALUES (1, 1001, JSON_ARRAY(JSON_OBJECT('product_id', 101, 'quantity', 2, 'price', 5999),JSON_OBJECT('product_id', 205, 'quantity', 1, 'price', 1299)),NOW());-- 查询JSON数组
SELECT id,customer_id,JSON_LENGTH(items) AS item_count,JSON_EXTRACT(items, '$[0].product_id') AS first_product_id,JSON_SEARCH(items, 'one', 205, NULL, '$[*].product_id') AS item_path
FROM orders;";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(jsonArrayQuery, connection)){using (var reader = command.ExecuteReader()){while (reader.Read()){Console.WriteLine($"订单ID: {reader["id"]}, 商品数量: {reader["item_count"]}, " +$"第一个商品ID: {reader["first_product_id"]}, " +$"商品205路径: {reader["item_path"]}");}}}
}
分布式事务处理
OceanBase作为分布式数据库,提供了强大的分布式事务支持。
跨分区事务
// 跨分区事务示例
using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var transaction = connection.BeginTransaction()){try{// 操作分区表1var updateAccount = @"UPDATE accounts SET balance = balance - 1000 WHERE account_id = 'A1001'";using (var cmd1 = new MySqlCommand(updateAccount, connection, transaction)){int affected = cmd1.ExecuteNonQuery();if (affected == 0){throw new Exception("账户A1001不存在或余额不足");}}// 操作分区表2var insertTransaction = @"INSERT INTO transactions (txn_id, from_account, to_account, amount, txn_time)VALUES (UUID(), 'A1001', 'B2001', 1000, NOW())";using (var cmd2 = new MySqlCommand(insertTransaction, connection, transaction)){cmd2.ExecuteNonQuery();}// 操作分区表3var updateTargetAccount = @"UPDATE accounts SET balance = balance + 1000 WHERE account_id = 'B2001'";using (var cmd3 = new MySqlCommand(updateTargetAccount, connection, transaction)){int affected = cmd3.ExecuteNonQuery();if (affected == 0){throw new Exception("目标账户B2001不存在");}}transaction.Commit();Console.WriteLine("分布式事务提交成功");}catch (Exception ex){transaction.Rollback();Console.WriteLine($"分布式事务回滚: {ex.Message}");throw;}}
}
两阶段提交(2PC)模式
// 两阶段提交示例
using (var connection = new MySqlConnection(connectionString))
{connection.Open();// 第一阶段:准备using (var cmdPrepare1 = new MySqlCommand("XA START 'txn123'", connection))using (var cmdPrepare2 = new MySqlCommand("UPDATE accounts SET balance = balance - 500 WHERE account_id = 'A1001'", connection))using (var cmdPrepare3 = new MySqlCommand("XA END 'txn123'", connection))using (var cmdPrepare4 = new MySqlCommand("XA PREPARE 'txn123'", connection)){cmdPrepare1.ExecuteNonQuery();cmdPrepare2.ExecuteNonQuery();cmdPrepare3.ExecuteNonQuery();cmdPrepare4.ExecuteNonQuery();}try{// 第二阶段:提交using (var cmdCommit = new MySqlCommand("XA COMMIT 'txn123'", connection)){cmdCommit.ExecuteNonQuery();Console.WriteLine("两阶段提交完成");}}catch (Exception){// 如果提交失败,尝试回滚using (var cmdRollback = new MySqlCommand("XA ROLLBACK 'txn123'", connection)){cmdRollback.ExecuteNonQuery();Console.WriteLine("两阶段提交失败,已回滚");}throw;}
}
性能监控与调优
查询执行计划分析
// 获取查询执行计划
var explainQuery = "EXPLAIN SELECT * FROM employees WHERE department_id = 101 AND salary > 10000";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(explainQuery, connection))using (var reader = command.ExecuteReader()){Console.WriteLine("执行计划分析结果:");Console.WriteLine("ID\tSELECT_TYPE\tTABLE\tTYPE\tPOSSIBLE_KEYS\tKEY\tKEY_LEN\tREF\tROWS\tEXTRA");while (reader.Read()){Console.WriteLine($"{reader["id"]}\t{reader["select_type"]}\t" +$"{reader["table"]}\t{reader["type"]}\t" +$"{reader["possible_keys"]}\t{reader["key"]}\t" +$"{reader["key_len"]}\t{reader["ref"]}\t" +$"{reader["rows"]}\t{reader["Extra"]}");}}
}
性能监控查询
// 查询OceanBase性能视图
var performanceQueries = @"
-- 查看当前会话
SHOW PROCESSLIST;-- 查看表统计信息
SHOW TABLE STATUS LIKE 'employees';-- 查看索引统计信息
SHOW INDEX FROM employees;-- 查看OceanBase系统视图
SELECT * FROM oceanbase.v$sql_audit ORDER BY request_time DESC LIMIT 10;";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(performanceQueries, connection)){using (var reader = command.ExecuteReader()){do{Console.WriteLine($"--- {command.CommandText.Substring(0, 30)}... ---");for (int i = 0; i < reader.FieldCount; i++){Console.Write($"{reader.GetName(i)}\t");}Console.WriteLine();while (reader.Read()){for (int i = 0; i < reader.FieldCount; i++){Console.Write($"{reader[i]}\t");}Console.WriteLine();}Console.WriteLine();} while (reader.NextResult());}}
}
连接池监控
// 获取连接池状态
var poolStats = typeof(MySqlConnection).GetProperty("PoolStats", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static);if (poolStats != null)
{var stats = poolStats.GetValue(null);Console.WriteLine("连接池状态:");Console.WriteLine($"Number of pools: {stats.GetType().GetProperty("NumberOfPools").GetValue(stats)}");Console.WriteLine($"Available connections: {stats.GetType().GetProperty("Available").GetValue(stats)}");Console.WriteLine($"Connections in use: {stats.GetType().GetProperty("InUse").GetValue(stats)}");
}
异常处理与日志记录
常见异常处理
try
{using (var connection = new MySqlConnection(connectionString)){connection.Open();var query = "SELECT * FROM non_existing_table";using (var command = new MySqlCommand(query, connection)){command.ExecuteReader();}}
}
catch (MySqlException ex)
{switch (ex.Number){case 1045: // 访问被拒绝Console.WriteLine("数据库访问被拒绝,请检查用户名和密码");break;case 1049: // 未知数据库Console.WriteLine($"指定的数据库不存在: {ex.Message}");break;case 1146: // 表不存在Console.WriteLine($"表不存在: {ex.Message}");// 尝试创建表或提示用户break;case 1213: // 死锁Console.WriteLine("发生死锁,请重试操作");break;case 2006: // 服务器已断开连接Console.WriteLine("与数据库的连接已断开,尝试重新连接");break;case 2013: // 查询期间丢失连接Console.WriteLine("查询期间连接丢失,请检查网络或重试");break;default:Console.WriteLine($"数据库错误({ex.Number}): {ex.Message}");break;}// 记录完整异常信息LogError(ex);
}
catch (Exception ex)
{Console.WriteLine($"发生意外错误: {ex.Message}");LogError(ex);
}void LogError(Exception ex)
{// 实际项目中可以使用Log4Net、NLog等日志框架string logMessage = $"[{DateTime.Now}] ERROR: {ex.GetType().Name}\n" +$"Message: {ex.Message}\n" +$"Stack Trace:\n{ex.StackTrace}\n";if (ex is MySqlException mySqlEx){logMessage += $"MySQL Error Number: {mySqlEx.Number}\n";}File.AppendAllText("db_errors.log", logMessage + new string('-', 50) + "\n");
}
重试机制实现
public T ExecuteWithRetry<T>(Func<T> operation, int maxRetries = 3, int delayMs = 1000)
{int retryCount = 0;while (true){try{return operation();}catch (MySqlException ex) when (IsTransientError(ex) && retryCount < maxRetries){retryCount++;Console.WriteLine($"遇到临时错误,正在进行第 {retryCount} 次重试: {ex.Message}");Thread.Sleep(delayMs * retryCount); // 指数退避}catch (Exception){throw;}}
}private bool IsTransientError(MySqlException ex)
{// 定义需要重试的错误代码int[] transientErrorNumbers = { 1213, 1205, 2006, 2013, 1040, 1317 };// 连接超时或死锁等临时性错误return transientErrorNumbers.Contains(ex.Number) || ex.Message.Contains("deadlock") ||ex.Message.Contains("timeout");
}// 使用重试机制
var result = ExecuteWithRetry(() => {using (var connection = new MySqlConnection(connectionString)){connection.Open();using (var cmd = new MySqlCommand("SELECT COUNT(*) FROM employees", connection)){return Convert.ToInt32(cmd.ExecuteScalar());}}
});
Console.WriteLine($"员工总数: {result}");
安全最佳实践
参数化查询防止SQL注入
// 不安全的拼接SQL方式 - 容易受到SQL注入攻击
var unsafeQuery = $"SELECT * FROM users WHERE username = '{userInput}' AND password = '{passwordInput}'";// 安全的参数化查询方式
var safeQuery = "SELECT * FROM users WHERE username = @username AND password = @password";using (var connection = new MySqlConnection(connectionString))
{connection.Open();using (var command = new MySqlCommand(safeQuery, connection)){command.Parameters.AddWithValue("@username", userInput);command.Parameters.AddWithValue("@password", passwordInput);using (var reader = command.ExecuteReader()){if (reader.HasRows){Console.WriteLine("登录成功");}else{Console.WriteLine("用户名或密码错误");}}}
}
使用最小权限原则
// 为应用创建专用用户并授予最小必要权限
var createAppUser = @"
-- 创建只读用户
CREATE USER 'app_readonly'@'%' IDENTIFIED BY 'StrongPassword123!';
GRANT SELECT ON mydb.* TO 'app_readonly'@'%';-- 创建读写用户
CREATE USER 'app_readwrite'@'%' IDENTIFIED BY 'EvenStrongerPassword456!';
GRANT SELECT, INSERT, UPDATE, DELETE ON mydb.* TO 'app_readwrite'@'%';-- 创建仅能访问特定表的用户
CREATE USER 'report_user'@'%' IDENTIFIED BY 'ReportPassword789!';
GRANT SELECT ON mydb.sales_data TO 'report_user'@'%';
GRANT SELECT ON mydb.products TO 'report_user'@'%';";using (var connection = new MySqlConnection(connectionString))
{connection.Open();foreach (var statement in createAppUser.Split(';')){if (!string.IsNullOrWhiteSpace(statement)){using (var command = new MySqlCommand(statement, connection)){command.ExecuteNonQuery();}}}Console.WriteLine("安全用户创建完成");
}
敏感数据加密
// 使用AES加密敏感数据
public static string Encrypt(string plainText, string key)
{using (Aes aes = Aes.Create()){aes.Key = Encoding.UTF8.GetBytes(key);aes.IV = new byte[16]; // 在实际应用中应使用随机IVICryptoTransform encryptor = aes.CreateEncryptor(aes.Key, aes.IV);using (MemoryStream ms = new MemoryStream())using (CryptoStream cs = new CryptoStream(ms, encryptor, CryptoStreamMode.Write)){using (StreamWriter sw = new StreamWriter(cs)){sw.Write(plainText);}return Convert.ToBase64String(ms.ToArray());}}
}// 在数据库中存储加密数据
var insertSensitiveData = "INSERT INTO customers (id, name, encrypted_credit_card) VALUES (@id, @name, @card)";using (var connection = new MySqlConnection(connectionString))
{connection.Open();string creditCardNumber = "4111111111111111";string encryptionKey = "my-secret-key-123"; // 实际应用中应从安全的地方获取using (var command = new MySqlCommand(insertSensitiveData, connection)){command.Parameters.AddWithValue("@id", 1001);command.Parameters.AddWithValue("@name", "张三");command.Parameters.AddWithValue("@card", Encrypt(creditCardNumber, encryptionKey));command.ExecuteNonQuery();Console.WriteLine("加密数据存储完成");}
}
ORM集成
使用Dapper进行数据访问
// 安装Dapper
// Install-Package Dapperpublic class Employee
{public long Id { get; set; }public string Name { get; set; }public string Email { get; set; }public decimal Salary { get; set; }public DateTime HireDate { get; set; }public int DepartmentId { get; set; }
}using Dapper;// 查询示例
using (var connection = new MySqlConnection(connectionString))
{connection.Open();// 简单查询var employees = connection.Query<Employee>("SELECT * FROM employees WHERE department_id = @deptId",new { deptId = 101 });foreach (var emp in employees){Console.WriteLine($"{emp.Id}: {emp.Name}, {emp.Salary:C}");}// 多映射查询var sql = @"SELECT e.*, d.name AS department_name FROM employees e JOIN departments d ON e.department_id = d.idWHERE e.salary > @minSalary";var results = connection.Query<Employee, string, dynamic>(sql,(employee, departmentName) => {return new { Employee = employee, Department = departmentName };},new { minSalary = 10000 },splitOn: "department_name");foreach (var item in results){Console.WriteLine($"{item.Employee.Name} 在 {item.Department} 部门");}// 执行存储过程var parameters = new DynamicParameters();parameters.Add("@dept_id", 101);parameters.Add("@increase_percent", 5.0m);parameters.Add("@affected_rows", dbType: DbType.Int32, direction: ParameterDirection.Output);connection.Execute("increase_salary", parameters, commandType: CommandType.StoredProcedure);Console.WriteLine($"影响了 {parameters.Get<int>("@affected_rows")} 行数据");
}
使用Entity Framework Core
// 安装EF Core和相关包
// Install-Package Microsoft.EntityFrameworkCore
// Install-Package Pomelo.EntityFrameworkCore.MySqlpublic class AppDbContext : DbContext
{public DbSet<Employee> Employees { get; set; }public DbSet<Department> Departments { get; set; }protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder){optionsBuilder.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString),options => {options.EnableRetryOnFailure(maxRetryCount: 5,maxRetryDelay: TimeSpan.FromSeconds(30),errorNumbersToAdd: null);});}protected override void OnModelCreating(ModelBuilder modelBuilder){// 配置实体modelBuilder.Entity<Employee>(entity =>{entity.ToTable("employees");entity.HasKey(e => e.Id);entity.Property(e => e.Name).IsRequired().HasMaxLength(100);entity.Property(e => e.Email).HasMaxLength(100);entity.Property(e => e.Salary).HasColumnType("DECIMAL(15,2)");entity.HasIndex(e => e.DepartmentId).HasDatabaseName("idx_department");entity.HasOne<Department>().WithMany().HasForeignKey(e => e.DepartmentId);});modelBuilder.Entity<Department>(entity =>{entity.ToTable("departments");entity.HasKey(d => d.Id);entity.Property(d => d.Name).IsRequired().HasMaxLength(50);});}
}// 使用EF Core进行CRUD操作
using (var context = new AppDbContext())
{// 添加数据var newDept = new Department { Name = "研发部" };context.Departments.Add(newDept);var newEmp = new Employee { Name = "赵六", Email = "zhaoliu@example.com",Salary = 18000,HireDate = DateTime.Now,Department = newDept};context.Employees.Add(newEmp);context.SaveChanges();// 查询数据var highSalaryEmps = context.Employees.Where(e => e.Salary > 15000).Include(e => e.Department).ToList();foreach (var emp in highSalaryEmps){Console.WriteLine($"{emp.Name} 在 {emp.Department?.Name} 部门,薪资 {emp.Salary:C}");}// 更新数据var empToUpdate = context.Employees.Find(1001L);if (empToUpdate != null){empToUpdate.Salary *= 1.1m;context.SaveChanges();}// 删除数据var empToDelete = context.Employees.Find(1002L);if (empToDelete != null){context.Employees.Remove(empToDelete);context.SaveChanges();}
}
实战案例
电商系统数据访问层实现
public interface IProductRepository
{Task<Product> GetByIdAsync(long id);Task<IEnumerable<Product>> SearchAsync(string keyword, int page, int pageSize);Task<long> AddAsync(Product product);Task<bool> UpdateAsync(Product product);Task<bool> DeleteAsync(long id);Task<bool> UpdateStockAsync(long productId, int quantityChange);
}public class ProductRepository : IProductRepository
{private readonly string _connectionString;public ProductRepository(string connectionString){_connectionString = connectionString;}public async Task<Product> GetByIdAsync(long id){using (var connection = new MySqlConnection(_connectionString)){await connection.OpenAsync();var sql = @"SELECT p.*, c.name AS category_name FROM products pLEFT JOIN categories c ON p.category_id = c.idWHERE p.id = @id";var product = await connection.QueryFirstOrDefaultAsync<Product>(sql, new { id });return product;}}public async Task<IEnumerable<Product>> SearchAsync(string keyword, int page, int pageSize){using (var connection = new MySqlConnection(_connectionString)){await connection.OpenAsync();var offset = (page - 1) * pageSize;var sql = @"SELECT p.*, c.name AS category_name FROM products pLEFT JOIN categories c ON p.category_id = c.idWHERE p.name LIKE @keyword OR p.description LIKE @keywordORDER BY p.idLIMIT @pageSize OFFSET @offset";var products = await connection.QueryAsync<Product>(sql, new { keyword = $"%{keyword}%", pageSize, offset });return products;}}public async Task<long> AddAsync(Product product){using (var connection = new MySqlConnection(_connectionString)){await connection.OpenAsync();var sql = @"INSERT INTO products (name, description, price, stock, category_id, created_at, updated_at)VALUES (@Name, @Description, @Price, @Stock, @CategoryId, NOW(), NOW());SELECT LAST_INSERT_ID();";var id = await connection.ExecuteScalarAsync<long>(sql, product);return id;}}public async Task<bool> UpdateAsync(Product product){using (var connection = new MySqlConnection(_connectionString)){await connection.OpenAsync();var sql = @"UPDATE products SET name = @Name,description = @Description,price = @Price,stock = @Stock,category_id = @CategoryId,updated_at = NOW()WHERE id = @Id";var affected = await connection.ExecuteAsync(sql, product);return affected > 0;}}public async Task<bool> DeleteAsync(long id){using (var connection = new MySqlConnection(_connectionString)){await connection.OpenAsync();var sql = "DELETE FROM products WHERE id = @id";var affected = await connection.ExecuteAsync(sql, new { id });return affected > 0;}}public async Task<bool> UpdateStockAsync(long productId, int quantityChange){using (var connection = new MySqlConnection(_connectionString)){await connection.OpenAsync();// 使用事务确保数据一致性using (var transaction = await connection.BeginTransactionAsync()){try{// 检查库存是否足够var currentStock = await connection.ExecuteScalarAsync<int>("SELECT stock FROM products WHERE id = @productId FOR UPDATE",new { productId }, transaction);if (currentStock + quantityChange < 0){throw new InvalidOperationException("库存不足");}// 更新库存var sql = @"UPDATE products SET stock = stock + @quantityChange, updated_at = NOW()WHERE id = @productId";var affected = await connection.ExecuteAsync(sql, new { productId, quantityChange }, transaction);await transaction.CommitAsync();return affected > 0;}catch{await transaction.RollbackAsync();throw;}}}}
}
报表生成服务
public class ReportService
{private readonly string _connectionString;public ReportService(string connectionString){_connectionString = connectionString;}public async Task<SalesReport> GenerateSalesReport(DateTime startDate, DateTime endDate){using (var connection = new MySqlConnection(_connectionString)){await connection.OpenAsync();var report = new SalesReport{StartDate = startDate,EndDate = endDate,GenerationDate = DateTime.Now};// 获取总销售额var totalSalesSql = @"SELECT IFNULL(SUM(amount), 0) FROM orders WHERE order_date BETWEEN @startDate AND @endDate AND status = 'completed'";report.TotalSales = await connection.ExecuteScalarAsync<decimal>(totalSalesSql, new { startDate, endDate });// 按产品类别统计var byCategorySql = @"SELECT c.name AS category,COUNT(o.id) AS order_count,SUM(o.amount) AS total_amountFROM orders oJOIN order_items oi ON o.id = oi.order_idJOIN products p ON oi.product_id = p.idJOIN categories c ON p.category_id = c.idWHERE o.order_date BETWEEN @startDate AND @endDateAND o.status = 'completed'GROUP BY c.id, c.nameORDER BY total_amount DESC";report.SalesByCategory = (await connection.QueryAsync<CategorySales>(byCategorySql, new { startDate, endDate })).ToList();// 按地区统计var byRegionSql = @"SELECT r.name AS region,COUNT(o.id) AS order_count,SUM(o.amount) AS total_amountFROM orders oJOIN customers c ON o.customer_id = c.idJOIN regions r ON c.region_id = r.idWHERE o.order_date BETWEEN @startDate AND @endDateAND o.status = 'completed'GROUP BY r.id, r.nameORDER BY total_amount DESC";report.SalesByRegion = (await connection.QueryAsync<RegionSales>(byRegionSql, new { startDate, endDate })).ToList();// 销售趋势(按天)var trendSql = @"SELECT DATE(order_date) AS day,COUNT(id) AS order_count,SUM(amount) AS daily_salesFROM ordersWHERE order_date BETWEEN @startDate AND @endDateAND status = 'completed'GROUP BY DATE(order_date)ORDER BY day";report.DailyTrends = (await connection.QueryAsync<DailySalesTrend>(trendSql, new { startDate, endDate })).ToList();return report;}}public async Task<byte[]> ExportSalesReportToExcel(DateTime startDate, DateTime endDate){var report = await GenerateSalesReport(startDate, endDate);using (var package = new ExcelPackage()){var worksheet = package.Workbook.Worksheets.Add("销售报表");// 添加报表标题worksheet.Cells["A1"].Value = "销售报表";worksheet.Cells["A1:D1"].Merge = true;worksheet.Cells["A1"].Style.Font.Bold = true;worksheet.Cells["A1"].Style.Font.Size = 16;// 添加日期范围worksheet.Cells["A2"].Value = $"日期范围: {startDate:yyyy-MM-dd} 至 {endDate:yyyy-MM-dd}";worksheet.Cells["A2:D2"].Merge = true;// 添加生成日期worksheet.Cells["A3"].Value = $"生成时间: {report.GenerationDate:yyyy-MM-dd HH:mm:ss}";worksheet.Cells["A3:D3"].Merge = true;// 添加总销售额worksheet.Cells["A5"].Value = "总销售额:";worksheet.Cells["B5"].Value = report.TotalSales;worksheet.Cells["B5"].Style.Numberformat.Format = "¥#,##0.00";// 按类别添加销售数据worksheet.Cells["A7"].Value = "按产品类别统计";worksheet.Cells["A7:C7"].Merge = true;worksheet.Cells["A7"].Style.Font.Bold = true;int row = 8;worksheet.Cells[row, 1].Value = "类别";worksheet.Cells[row, 2].Value = "订单数";worksheet.Cells[row, 3].Value = "销售额";worksheet.Cells[row, 1, row, 3].Style.Font.Bold = true;foreach (var item in report.SalesByCategory){row++;worksheet.Cells[row, 1].Value = item.Category;worksheet.Cells[row, 2].Value = item.OrderCount;worksheet.Cells[row, 3].Value = item.TotalAmount;worksheet.Cells[row, 3].Style.Numberformat.Format = "¥#,##0.00";}// 按地区添加销售数据row += 2;worksheet.Cells[row, 1].Value = "按地区统计";worksheet.Cells[row, 3].Merge = true;worksheet.Cells[row, 1].Style.Font.Bold = true;row++;worksheet.Cells[row, 1].Value = "地区";worksheet.Cells[row, 2].Value = "订单数";worksheet.Cells[row, 3].Value = "销售额";worksheet.Cells[row, 1, row, 3].Style.Font.Bold = true;foreach (var item in report.SalesByRegion){row++;worksheet.Cells[row, 1].Value = item.Region;worksheet.Cells[row, 2].Value = item.OrderCount;worksheet.Cells[row, 3].Value = item.TotalAmount;worksheet.Cells[row, 3].Style.Numberformat.Format = "¥#,##0.00";}// 自动调整列宽worksheet.Cells[worksheet.Dimension.Address].AutoFitColumns();return package.GetAsByteArray();}}
}public class SalesReport
{public DateTime StartDate { get; set; }public DateTime EndDate { get; set; }public DateTime GenerationDate { get; set; }public decimal TotalSales { get; set; }public List<CategorySales> SalesByCategory { get; set; }public List<RegionSales> SalesByRegion { get; set; }public List<DailySalesTrend> DailyTrends { get; set; }
}public class CategorySales
{public string Category { get; set; }public int OrderCount { get; set; }public decimal TotalAmount { get; set; }
}public class RegionSales
{public string Region { get; set; }public int OrderCount { get; set; }public decimal TotalAmount { get; set; }
}public class DailySalesTrend
{public DateTime Day { get; set; }public int OrderCount { get; set; }public decimal DailySales { get; set; }
}
总结
本文详细介绍了在.NET环境下开发OceanBase应用程序的各个方面,包括:
- 基础操作:连接管理、CRUD操作、事务处理
- 性能优化:批量操作、连接池、查询优化
- 高级特性:JSON处理、分布式事务、存储过程
- 安全实践:参数化查询、权限控制、数据加密
- ORM集成:Dapper和Entity Framework Core的使用
- 实战案例:电商系统数据访问层和报表服务的实现
OceanBase作为一款高性能分布式数据库,与.NET生态系统的结合可以构建出强大、可靠的企业级应用。通过合理利用OceanBase的特性和.NET的强大功能,开发者可以构建出高性能、高可用的分布式应用系统。
在实际开发中,建议根据具体业务场景选择合适的访问方式,对于性能要求高的场景可以使用原生ADO.NET或Dapper,对于需要快速开发的场景可以使用Entity Framework Core。同时,要特别注意分布式环境下的事务处理和性能优化,充分利用OceanBase的分布式特性。