Go Web 编程快速入门 19 - 附录C:事务与 CRUD(含最佳实践)
数据库事务是确保数据一致性和完整性的重要机制。本章将深入介绍 Go 语言中的事务处理,以及如何实现高质量的 CRUD(创建、读取、更新、删除)操作,并提供生产环境中的最佳实践。
1. 事务基础概念
1.1 ACID 特性
事务必须满足 ACID 特性:
- 原子性(Atomicity): 事务中的所有操作要么全部成功,要么全部失败
- 一致性(Consistency): 事务执行前后,数据库必须处于一致状态
- 隔离性(Isolation): 并发执行的事务之间不能相互干扰
- 持久性(Durability): 事务一旦提交,其结果必须永久保存
1.2 事务隔离级别
package databaseimport ("database/sql""fmt"
)// IsolationLevel 事务隔离级别
type IsolationLevel intconst (ReadUncommitted IsolationLevel = iotaReadCommittedRepeatableReadSerializable
)// String 返回隔离级别的字符串表示
func (il IsolationLevel) String() string {switch il {case ReadUncommitted:return "READ UNCOMMITTED"case ReadCommitted:return "READ COMMITTED"case RepeatableRead:return "REPEATABLE READ"case Serializable:return "SERIALIZABLE"default:return "UNKNOWN"}
}// TransactionManager 事务管理器
type TransactionManager struct {db *sql.DB
}// NewTransactionManager 创建事务管理器
func NewTransactionManager(db *sql.DB) *TransactionManager {return &TransactionManager{db: db}
}// BeginWithIsolation 以指定隔离级别开始事务
func (tm *TransactionManager) BeginWithIsolation(level IsolationLevel) (*sql.Tx, error) {tx, err := tm.db.Begin()if err != nil {return nil, fmt.Errorf("开始事务失败: %w", err)}// 设置隔离级别_, err = tx.Exec(fmt.Sprintf("SET TRANSACTION ISOLATION LEVEL %s", level.String()))if err != nil {tx.Rollback()return nil, fmt.Errorf("设置隔离级别失败: %w", err)}return tx, nil
}
2. 基本事务操作
2.1 事务的创建和管理
package databaseimport ("context""database/sql""fmt""time"
)// Transaction 事务包装器
type Transaction struct {tx *sql.Txcommitted boolrolledBack bool
}// NewTransaction 创建新事务
func NewTransaction(db *sql.DB) (*Transaction, error) {tx, err := db.Begin()if err != nil {return nil, fmt.Errorf("开始事务失败: %w", err)}return &Transaction{tx: tx}, nil
}// NewTransactionWithContext 使用上下文创建事务
func NewTransactionWithContext(ctx context.Context, db *sql.DB) (*Transaction, error) {tx, err := db.BeginTx(ctx, nil)if err != nil {return nil, fmt.Errorf("开始事务失败: %w", err)}return &Transaction{tx: tx}, nil
}// NewTransactionWithOptions 使用选项创建事务
func NewTransactionWithOptions(ctx context.Context, db *sql.DB, opts *sql.TxOptions) (*Transaction, error) {tx, err := db.BeginTx(ctx, opts)if err != nil {return nil, fmt.Errorf("开始事务失败: %w", err)}return &Transaction{tx: tx}, nil
}// Commit 提交事务
func (t *Transaction) Commit() error {if t.committed {return fmt.Errorf("事务已经提交")}if t.rolledBack {return fmt.Errorf("事务已经回滚")}err := t.tx.Commit()if err != nil {return fmt.Errorf("提交事务失败: %w", err)}t.committed = truereturn nil
}// Rollback 回滚事务
func (t *Transaction) Rollback() error {if t.committed {return fmt.Errorf("事务已经提交,无法回滚")}if t.rolledBack {return nil // 已经回滚,不需要重复操作}err := t.tx.Rollback()if err != nil {return fmt.Errorf("回滚事务失败: %w", err)}t.rolledBack = truereturn nil
}// Exec 执行语句
func (t *Transaction) Exec(query string, args ...interface{}) (sql.Result, error) {if t.committed || t.rolledBack {return nil, fmt.Errorf("事务已经结束")}return t.tx.Exec(query, args...)
}// Query 执行查询
func (t *Transaction) Query(query string, args ...interface{}) (*sql.Rows, error) {if t.committed || t.rolledBack {return nil, fmt.Errorf("事务已经结束")}return t.tx.Query(query, args...)
}// QueryRow 执行单行查询
func (t *Transaction) QueryRow(query string, args ...interface{}) *sql.Row {return t.tx.QueryRow(query, args...)
}// Prepare 预编译语句
func (t *Transaction) Prepare(query string) (*sql.Stmt, error) {if t.committed || t.rolledBack {return nil, fmt.Errorf("事务已经结束")}return t.tx.Prepare(query)
}// IsActive 检查事务是否活跃
func (t *Transaction) IsActive() bool {return !t.committed && !t.rolledBack
}
2.2 事务执行器
package databaseimport ("context""database/sql""fmt""time"
)// TransactionExecutor 事务执行器
type TransactionExecutor struct {db *sql.DBtimeout time.Duration
}// NewTransactionExecutor 创建事务执行器
func NewTransactionExecutor(db *sql.DB, timeout time.Duration) *TransactionExecutor {return &TransactionExecutor{db: db,timeout: timeout,}
}// Execute 执行事务
func (te *TransactionExecutor) Execute(fn func(*sql.Tx) error) error {return te.ExecuteWithContext(context.Background(), fn)
}// ExecuteWithContext 使用上下文执行事务
func (te *TransactionExecutor) ExecuteWithContext(ctx context.Context, fn func(*sql.Tx) error) error {// 设置超时if te.timeout > 0 {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, te.timeout)defer cancel()}// 开始事务tx, err := te.db.BeginTx(ctx, nil)if err != nil {return fmt.Errorf("开始事务失败: %w", err)}// 确保事务会被处理defer func() {if p := recover(); p != nil {tx.Rollback()panic(p) // 重新抛出 panic} else if err != nil {tx.Rollback()} else {err = tx.Commit()}}()// 执行事务函数err = fn(tx)return err
}// ExecuteWithRetry 带重试的事务执行
func (te *TransactionExecutor) ExecuteWithRetry(ctx context.Context,fn func(*sql.Tx) error,maxRetries int,retryDelay time.Duration,
) error {var lastErr errorfor attempt := 0; attempt <= maxRetries; attempt++ {if attempt > 0 {// 等待重试延迟select {case <-ctx.Done():return ctx.Err()case <-time.After(retryDelay):}}err := te.ExecuteWithContext(ctx, fn)if err == nil {return nil}lastErr = err// 检查是否是可重试的错误if !isRetryableError(err) {break}}return fmt.Errorf("事务执行失败,已重试 %d 次: %w", maxRetries, lastErr)
}// isRetryableError 检查错误是否可重试
func isRetryableError(err error) bool {// 这里可以根据具体的数据库错误类型来判断// 例如:死锁、连接超时等errStr := err.Error()// PostgreSQL 死锁错误if contains(errStr, "deadlock detected") {return true}// 连接错误if contains(errStr, "connection") {return true}// 超时错误if contains(errStr, "timeout") {return true}return false
}// contains 检查字符串是否包含子字符串
func contains(s, substr string) bool {return len(s) >= len(substr) && (s == substr || (len(s) > len(substr) && (s[:len(substr)] == substr || s[len(s)-len(substr):] == substr || indexOf(s, substr) >= 0)))
}// indexOf 查找子字符串位置
func indexOf(s, substr string) int {for i := 0; i <= len(s)-len(substr); i++ {if s[i:i+len(substr)] == substr {return i}}return -1
}
3. CRUD 操作实现
3.1 用户管理 CRUD
package databaseimport ("database/sql""fmt""time"
)// User 用户模型
type User struct {ID int `json:"id"`Username string `json:"username"`Email string `json:"email"`Password string `json:"-"` // 不在 JSON 中显示Age int `json:"age"`IsActive bool `json:"is_active"`CreatedAt time.Time `json:"created_at"`UpdatedAt time.Time `json:"updated_at"`
}// CreateUserRequest 创建用户请求
type CreateUserRequest struct {Username string `json:"username" validate:"required,min=3,max=50"`Email string `json:"email" validate:"required,email"`Password string `json:"password" validate:"required,min=6"`Age int `json:"age" validate:"min=1,max=150"`
}// UpdateUserRequest 更新用户请求
type UpdateUserRequest struct {Username *string `json:"username,omitempty" validate:"omitempty,min=3,max=50"`Email *string `json:"email,omitempty" validate:"omitempty,email"`Age *int `json:"age,omitempty" validate:"omitempty,min=1,max=150"`IsActive *bool `json:"is_active,omitempty"`
}// UserRepository 用户仓库接口
type UserRepository interface {Create(req *CreateUserRequest) (*User, error)GetByID(id int) (*User, error)GetByEmail(email string) (*User, error)Update(id int, req *UpdateUserRequest) (*User, error)Delete(id int) errorList(limit, offset int) ([]*User, error)Count() (int, error)
}// userRepository 用户仓库实现
type userRepository struct {db *sql.DBte *TransactionExecutor
}// NewUserRepository 创建用户仓库
func NewUserRepository(db *sql.DB) UserRepository {return &userRepository{db: db,te: NewTransactionExecutor(db, 30*time.Second),}
}// Create 创建用户
func (ur *userRepository) Create(req *CreateUserRequest) (*User, error) {var user *Usererr := ur.te.Execute(func(tx *sql.Tx) error {// 检查用户名是否已存在var exists boolerr := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE username = $1)", req.Username).Scan(&exists)if err != nil {return fmt.Errorf("检查用户名失败: %w", err)}if exists {return fmt.Errorf("用户名已存在: %s", req.Username)}// 检查邮箱是否已存在err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE email = $1)", req.Email).Scan(&exists)if err != nil {return fmt.Errorf("检查邮箱失败: %w", err)}if exists {return fmt.Errorf("邮箱已存在: %s", req.Email)}// 插入新用户query := `INSERT INTO users (username, email, password, age, is_active, created_at, updated_at)VALUES ($1, $2, $3, $4, $5, $6, $7)RETURNING id, username, email, age, is_active, created_at, updated_at`now := time.Now()user = &User{}err = tx.QueryRow(query,req.Username,req.Email,req.Password, // 在实际应用中应该先加密req.Age,true, // 默认激活now,now,).Scan(&user.ID,&user.Username,&user.Email,&user.Age,&user.IsActive,&user.CreatedAt,&user.UpdatedAt,)if err != nil {return fmt.Errorf("插入用户失败: %w", err)}return nil})if err != nil {return nil, err}return user, nil
}// GetByID 根据ID获取用户
func (ur *userRepository) GetByID(id int) (*User, error) {query := `SELECT id, username, email, age, is_active, created_at, updated_atFROM usersWHERE id = $1`user := &User{}err := ur.db.QueryRow(query, id).Scan(&user.ID,&user.Username,&user.Email,&user.Age,&user.IsActive,&user.CreatedAt,&user.UpdatedAt,)if err != nil {if err == sql.ErrNoRows {return nil, fmt.Errorf("用户不存在: ID=%d", id)}return nil, fmt.Errorf("查询用户失败: %w", err)}return user, nil
}// GetByEmail 根据邮箱获取用户
func (ur *userRepository) GetByEmail(email string) (*User, error) {query := `SELECT id, username, email, age, is_active, created_at, updated_atFROM usersWHERE email = $1`user := &User{}err := ur.db.QueryRow(query, email).Scan(&user.ID,&user.Username,&user.Email,&user.Age,&user.IsActive,&user.CreatedAt,&user.UpdatedAt,)if err != nil {if err == sql.ErrNoRows {return nil, fmt.Errorf("用户不存在: Email=%s", email)}return nil, fmt.Errorf("查询用户失败: %w", err)}return user, nil
}// Update 更新用户
func (ur *userRepository) Update(id int, req *UpdateUserRequest) (*User, error) {var user *Usererr := ur.te.Execute(func(tx *sql.Tx) error {// 检查用户是否存在var exists boolerr := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", id).Scan(&exists)if err != nil {return fmt.Errorf("检查用户存在性失败: %w", err)}if !exists {return fmt.Errorf("用户不存在: ID=%d", id)}// 构建动态更新语句setParts := []string{}args := []interface{}{}argIndex := 1if req.Username != nil {// 检查用户名是否已被其他用户使用var existingID interr := tx.QueryRow("SELECT id FROM users WHERE username = $1 AND id != $2", *req.Username, id).Scan(&existingID)if err != sql.ErrNoRows {if err == nil {return fmt.Errorf("用户名已被使用: %s", *req.Username)}return fmt.Errorf("检查用户名失败: %w", err)}setParts = append(setParts, fmt.Sprintf("username = $%d", argIndex))args = append(args, *req.Username)argIndex++}if req.Email != nil {// 检查邮箱是否已被其他用户使用var existingID interr := tx.QueryRow("SELECT id FROM users WHERE email = $1 AND id != $2", *req.Email, id).Scan(&existingID)if err != sql.ErrNoRows {if err == nil {return fmt.Errorf("邮箱已被使用: %s", *req.Email)}return fmt.Errorf("检查邮箱失败: %w", err)}setParts = append(setParts, fmt.Sprintf("email = $%d", argIndex))args = append(args, *req.Email)argIndex++}if req.Age != nil {setParts = append(setParts, fmt.Sprintf("age = $%d", argIndex))args = append(args, *req.Age)argIndex++}if req.IsActive != nil {setParts = append(setParts, fmt.Sprintf("is_active = $%d", argIndex))args = append(args, *req.IsActive)argIndex++}if len(setParts) == 0 {return fmt.Errorf("没有要更新的字段")}// 添加 updated_atsetParts = append(setParts, fmt.Sprintf("updated_at = $%d", argIndex))args = append(args, time.Now())argIndex++// 添加 WHERE 条件args = append(args, id)query := fmt.Sprintf(`UPDATE users SET %sWHERE id = $%dRETURNING id, username, email, age, is_active, created_at, updated_at`,joinStrings(setParts, ", "), argIndex)user = &User{}err = tx.QueryRow(query, args...).Scan(&user.ID,&user.Username,&user.Email,&user.Age,&user.IsActive,&user.CreatedAt,&user.UpdatedAt,)if err != nil {return fmt.Errorf("更新用户失败: %w", err)}return nil})if err != nil {return nil, err}return user, nil
}// Delete 删除用户
func (ur *userRepository) Delete(id int) error {return ur.te.Execute(func(tx *sql.Tx) error {// 检查用户是否存在var exists boolerr := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", id).Scan(&exists)if err != nil {return fmt.Errorf("检查用户存在性失败: %w", err)}if !exists {return fmt.Errorf("用户不存在: ID=%d", id)}// 删除用户result, err := tx.Exec("DELETE FROM users WHERE id = $1", id)if err != nil {return fmt.Errorf("删除用户失败: %w", err)}rowsAffected, err := result.RowsAffected()if err != nil {return fmt.Errorf("获取影响行数失败: %w", err)}if rowsAffected == 0 {return fmt.Errorf("库存不足或商品不存在")}return nil}, 3, time.Second)
}// 4. 使用上下文控制超时
func (tbp *TransactionBestPractices) GoodExample_ContextTimeout() error {ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()tx, err := tbp.db.BeginTx(ctx, nil)if err != nil {return fmt.Errorf("开始事务失败: %w", err)}defer func() {if p := recover(); p != nil {tx.Rollback()panic(p)} else if err != nil {tx.Rollback()} else {err = tx.Commit()}}()// 使用上下文执行查询_, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - 100 WHERE id = 1")if err != nil {return fmt.Errorf("更新账户失败: %w", err)}return nil
}// 5. 批量操作优化
func (tbp *TransactionBestPractices) GoodExample_BatchOperations(userIDs []int) error {if len(userIDs) == 0 {return nil}return tbp.executeTransaction(func(tx *sql.Tx) error {// 使用预编译语句进行批量操作stmt, err := tx.Prepare("UPDATE users SET last_login = $1 WHERE id = $2")if err != nil {return fmt.Errorf("预编译语句失败: %w", err)}defer stmt.Close()now := time.Now()for _, userID := range userIDs {_, err := stmt.Exec(now, userID)if err != nil {return fmt.Errorf("更新用户 %d 失败: %w", userID, err)}}return nil})
}// executeTransaction 执行事务的通用方法
func (tbp *TransactionBestPractices) executeTransaction(fn func(*sql.Tx) error) error {tx, err := tbp.db.Begin()if err != nil {return fmt.Errorf("开始事务失败: %w", err)}defer func() {if p := recover(); p != nil {tx.Rollback()panic(p)} else if err != nil {tx.Rollback()} else {err = tx.Commit()}}()err = fn(tx)return err
}// executeTransactionWithRetry 带重试的事务执行
func (tbp *TransactionBestPractices) executeTransactionWithRetry(fn func(*sql.Tx) error,maxRetries int,retryDelay time.Duration,
) error {var lastErr errorfor attempt := 0; attempt <= maxRetries; attempt++ {if attempt > 0 {time.Sleep(retryDelay)}err := tbp.executeTransaction(fn)if err == nil {return nil}lastErr = err// 检查是否是可重试的错误if !isRetryableError(err) {break}log.Printf("事务执行失败,第 %d 次重试: %v", attempt+1, err)}return fmt.Errorf("事务执行失败,已重试 %d 次: %w", maxRetries, lastErr)
}
5.2 性能优化技巧
package databaseimport ("database/sql""fmt""strings""time"
)// PerformanceOptimizer 性能优化器
type PerformanceOptimizer struct {db *sql.DB
}// NewPerformanceOptimizer 创建性能优化器
func NewPerformanceOptimizer(db *sql.DB) *PerformanceOptimizer {return &PerformanceOptimizer{db: db}
}// 1. 使用批量插入替代逐条插入
func (po *PerformanceOptimizer) BatchInsertUsers(users []User) error {if len(users) == 0 {return nil}// 构建批量插入语句valueStrings := make([]string, 0, len(users))valueArgs := make([]interface{}, 0, len(users)*4)for i, user := range users {valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d)", i*4+1, i*4+2, i*4+3, i*4+4))valueArgs = append(valueArgs, user.Username, user.Email, user.Age, time.Now())}query := fmt.Sprintf("INSERT INTO users (username, email, age, created_at) VALUES %s",strings.Join(valueStrings, ","))tx, err := po.db.Begin()if err != nil {return fmt.Errorf("开始事务失败: %w", err)}defer func() {if err != nil {tx.Rollback()} else {err = tx.Commit()}}()_, err = tx.Exec(query, valueArgs...)if err != nil {return fmt.Errorf("批量插入失败: %w", err)}return nil
}// 2. 使用 UPSERT 操作
func (po *PerformanceOptimizer) UpsertUser(user *User) error {query := `INSERT INTO users (username, email, age, created_at, updated_at)VALUES ($1, $2, $3, $4, $5)ON CONFLICT (email)DO UPDATE SETusername = EXCLUDED.username,age = EXCLUDED.age,updated_at = EXCLUDED.updated_atRETURNING id`now := time.Now()err := po.db.QueryRow(query, user.Username, user.Email, user.Age, now, now).Scan(&user.ID)if err != nil {return fmt.Errorf("UPSERT 用户失败: %w", err)}return nil
}// 3. 分页查询优化
func (po *PerformanceOptimizer) GetUsersWithCursorPagination(cursor int, limit int) ([]*User, error) {// 使用游标分页替代 OFFSET,性能更好query := `SELECT id, username, email, age, is_active, created_at, updated_atFROM usersWHERE id > $1ORDER BY idLIMIT $2`rows, err := po.db.Query(query, cursor, limit)if err != nil {return nil, fmt.Errorf("查询用户失败: %w", err)}defer rows.Close()var users []*Userfor rows.Next() {user := &User{}err := rows.Scan(&user.ID,&user.Username,&user.Email,&user.Age,&user.IsActive,&user.CreatedAt,&user.UpdatedAt,)if err != nil {return nil, fmt.Errorf("扫描用户数据失败: %w", err)}users = append(users, user)}return users, rows.Err()
}// 4. 使用索引优化查询
func (po *PerformanceOptimizer) CreateOptimalIndexes() error {indexes := []string{"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email ON users(email)","CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_username ON users(username)","CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_created_at ON users(created_at)","CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_active_created ON users(is_active, created_at) WHERE is_active = true","CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_user_status ON orders(user_id, status)","CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_accounts ON transactions(from_account, to_account)",}for _, indexSQL := range indexes {_, err := po.db.Exec(indexSQL)if err != nil {return fmt.Errorf("创建索引失败: %s, 错误: %w", indexSQL, err)}}return nil
}// 5. 连接池优化
func (po *PerformanceOptimizer) OptimizeConnectionPool() {// 设置最大打开连接数po.db.SetMaxOpenConns(25)// 设置最大空闲连接数po.db.SetMaxIdleConns(5)// 设置连接最大生存时间po.db.SetConnMaxLifetime(5 * time.Minute)// 设置连接最大空闲时间po.db.SetConnMaxIdleTime(1 * time.Minute)
}
5.3 监控和调试
package databaseimport ("context""database/sql""fmt""log""time"
)// TransactionMonitor 事务监控器
type TransactionMonitor struct {db *sql.DB
}// NewTransactionMonitor 创建事务监控器
func NewTransactionMonitor(db *sql.DB) *TransactionMonitor {return &TransactionMonitor{db: db}
}// TransactionStats 事务统计信息
type TransactionStats struct {TotalTransactions int64 `json:"total_transactions"`SuccessfulTransactions int64 `json:"successful_transactions"`FailedTransactions int64 `json:"failed_transactions"`AverageExecutionTime time.Duration `json:"average_execution_time"`LongestTransaction time.Duration `json:"longest_transaction"`ShortestTransaction time.Duration `json:"shortest_transaction"`
}// MonitoredTransaction 被监控的事务
type MonitoredTransaction struct {ID stringStartTime time.TimeEndTime time.TimeSuccess boolError errorDuration time.Duration
}var (transactionStats = &TransactionStats{}transactionLog = make([]*MonitoredTransaction, 0, 1000)
)// ExecuteWithMonitoring 执行带监控的事务
func (tm *TransactionMonitor) ExecuteWithMonitoring(ctx context.Context,name string,fn func(*sql.Tx) error,
) error {startTime := time.Now()txID := fmt.Sprintf("%s_%d", name, startTime.UnixNano())log.Printf("[事务开始] ID: %s, 名称: %s", txID, name)tx, err := tm.db.BeginTx(ctx, nil)if err != nil {tm.recordTransaction(txID, startTime, time.Now(), false, err)return fmt.Errorf("开始事务失败: %w", err)}defer func() {endTime := time.Now()duration := endTime.Sub(startTime)if p := recover(); p != nil {tx.Rollback()err := fmt.Errorf("事务 panic: %v", p)tm.recordTransaction(txID, startTime, endTime, false, err)log.Printf("[事务 Panic] ID: %s, 耗时: %v, 错误: %v", txID, duration, err)panic(p)} else if err != nil {tx.Rollback()tm.recordTransaction(txID, startTime, endTime, false, err)log.Printf("[事务回滚] ID: %s, 耗时: %v, 错误: %v", txID, duration, err)} else {commitErr := tx.Commit()if commitErr != nil {err = commitErrtm.recordTransaction(txID, startTime, endTime, false, err)log.Printf("[事务提交失败] ID: %s, 耗时: %v, 错误: %v", txID, duration, err)} else {tm.recordTransaction(txID, startTime, endTime, true, nil)log.Printf("[事务成功] ID: %s, 耗时: %v", txID, duration)}}}()err = fn(tx)return err
}// recordTransaction 记录事务信息
func (tm *TransactionMonitor) recordTransaction(id string,startTime, endTime time.Time,success bool,err error,
) {duration := endTime.Sub(startTime)// 记录到事务日志transaction := &MonitoredTransaction{ID: id,StartTime: startTime,EndTime: endTime,Success: success,Error: err,Duration: duration,}// 简单的环形缓冲区if len(transactionLog) >= 1000 {transactionLog = transactionLog[1:]}transactionLog = append(transactionLog, transaction)// 更新统计信息transactionStats.TotalTransactions++if success {transactionStats.SuccessfulTransactions++} else {transactionStats.FailedTransactions++}// 更新执行时间统计if transactionStats.TotalTransactions == 1 {transactionStats.AverageExecutionTime = durationtransactionStats.LongestTransaction = durationtransactionStats.ShortestTransaction = duration} else {// 计算平均执行时间totalDuration := transactionStats.AverageExecutionTime * time.Duration(transactionStats.TotalTransactions-1)transactionStats.AverageExecutionTime = (totalDuration + duration) / time.Duration(transactionStats.TotalTransactions)if duration > transactionStats.LongestTransaction {transactionStats.LongestTransaction = duration}if duration < transactionStats.ShortestTransaction {transactionStats.ShortestTransaction = duration}}
}// GetTransactionStats 获取事务统计信息
func (tm *TransactionMonitor) GetTransactionStats() *TransactionStats {return transactionStats
}// GetRecentTransactions 获取最近的事务记录
func (tm *TransactionMonitor) GetRecentTransactions(limit int) []*MonitoredTransaction {if limit <= 0 || limit > len(transactionLog) {limit = len(transactionLog)}start := len(transactionLog) - limitreturn transactionLog[start:]
}// GetFailedTransactions 获取失败的事务
func (tm *TransactionMonitor) GetFailedTransactions(limit int) []*MonitoredTransaction {var failed []*MonitoredTransactionfor i := len(transactionLog) - 1; i >= 0 && len(failed) < limit; i-- {if !transactionLog[i].Success {failed = append(failed, transactionLog[i])}}return failed
}// GetSlowTransactions 获取慢事务
func (tm *TransactionMonitor) GetSlowTransactions(threshold time.Duration, limit int) []*MonitoredTransaction {var slow []*MonitoredTransactionfor i := len(transactionLog) - 1; i >= 0 && len(slow) < limit; i-- {if transactionLog[i].Duration > threshold {slow = append(slow, transactionLog[i])}}return slow
}// AnalyzePerformance 分析性能
func (tm *TransactionMonitor) AnalyzePerformance() {stats := tm.GetTransactionStats()log.Printf("=== 事务性能分析 ===")log.Printf("总事务数: %d", stats.TotalTransactions)log.Printf("成功事务数: %d", stats.SuccessfulTransactions)log.Printf("失败事务数: %d", stats.FailedTransactions)log.Printf("成功率: %.2f%%", float64(stats.SuccessfulTransactions)/float64(stats.TotalTransactions)*100)log.Printf("平均执行时间: %v", stats.AverageExecutionTime)log.Printf("最长执行时间: %v", stats.LongestTransaction)log.Printf("最短执行时间: %v", stats.ShortestTransaction)// 分析慢事务slowTransactions := tm.GetSlowTransactions(time.Second, 10)if len(slowTransactions) > 0 {log.Printf("=== 慢事务分析 (>1s) ===")for _, tx := range slowTransactions {log.Printf("事务 %s: 耗时 %v", tx.ID, tx.Duration)}}// 分析失败事务failedTransactions := tm.GetFailedTransactions(10)if len(failedTransactions) > 0 {log.Printf("=== 失败事务分析 ===")for _, tx := range failedTransactions {log.Printf("事务 %s: 错误 %v", tx.ID, tx.Error)}}
}
6. 完整示例
6.1 电商系统事务处理
package mainimport ("database/sql""fmt""log""time"_ "github.com/lib/pq"
)func main() {// 连接数据库db, err := sql.Open("postgres", "postgres://user:password@localhost/ecommerce?sslmode=disable")if err != nil {log.Fatal("连接数据库失败:", err)}defer db.Close()// 创建表err = createTables(db)if err != nil {log.Fatal("创建表失败:", err)}// 初始化服务userRepo := NewUserRepository(db)orderService := NewOrderService(db)accountService := NewAccountService(db)monitor := NewTransactionMonitor(db)// 示例:创建用户user, err := userRepo.Create(&CreateUserRequest{Username: "john_doe",Email: "john@example.com",Password: "hashed_password",Age: 30,})if err != nil {log.Printf("创建用户失败: %v", err)} else {log.Printf("创建用户成功: %+v", user)}// 示例:创建订单(带监控)err = monitor.ExecuteWithMonitoring(context.Background(),"create_order",func(tx *sql.Tx) error {// 这里可以调用订单服务的方法// 为了简化,直接在这里写逻辑return nil},)if err != nil {log.Printf("创建订单失败: %v", err)}// 分析性能monitor.AnalyzePerformance()
}// createTables 创建数据库表
func createTables(db *sql.DB) error {tables := []string{`CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY,username VARCHAR(50) UNIQUE NOT NULL,email VARCHAR(100) UNIQUE NOT NULL,password VARCHAR(255) NOT NULL,age INTEGER,is_active BOOLEAN DEFAULT true,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)`,`CREATE TABLE IF NOT EXISTS products (id SERIAL PRIMARY KEY,name VARCHAR(100) NOT NULL,price DECIMAL(10,2) NOT NULL,stock INTEGER NOT NULL DEFAULT 0,is_active BOOLEAN DEFAULT true,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)`,`CREATE TABLE IF NOT EXISTS orders (id SERIAL PRIMARY KEY,user_id INTEGER REFERENCES users(id),total_amount DECIMAL(10,2) NOT NULL,status VARCHAR(20) DEFAULT 'pending',created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)`,`CREATE TABLE IF NOT EXISTS order_items (id SERIAL PRIMARY KEY,order_id INTEGER REFERENCES orders(id),product_id INTEGER REFERENCES products(id),quantity INTEGER NOT NULL,price DECIMAL(10,2) NOT NULL,subtotal DECIMAL(10,2) NOT NULL)`,`CREATE TABLE IF NOT EXISTS accounts (id SERIAL PRIMARY KEY,user_id INTEGER REFERENCES users(id),balance DECIMAL(15,2) DEFAULT 0.00,currency VARCHAR(3) DEFAULT 'USD',created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)`,`CREATE TABLE IF NOT EXISTS transactions (id SERIAL PRIMARY KEY,from_account INTEGER REFERENCES accounts(id),to_account INTEGER REFERENCES accounts(id),amount DECIMAL(15,2) NOT NULL,currency VARCHAR(3) NOT NULL,description TEXT,status VARCHAR(20) DEFAULT 'pending',created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)`,}for _, table := range tables {_, err := db.Exec(table)if err != nil {return fmt.Errorf("创建表失败: %w", err)}}return nil
}
7. 总结
本章详细介绍了 Go 语言中的事务处理和 CRUD 操作的最佳实践:
7.1 核心概念
- ACID 特性:原子性、一致性、隔离性、持久性
- 事务隔离级别:读未提交、读已提交、可重复读、串行化
- 锁机制:行锁、表锁、共享锁、排他锁
7.2 最佳实践
- 保持事务简短:只包含必要的数据库操作
- 正确的锁定顺序:避免死锁
- 适当的错误处理:区分可重试和不可重试的错误
- 使用上下文控制超时:避免长时间阻塞
- 批量操作优化:提高性能
7.3 性能优化
- 使用批量插入替代逐条插入
- 使用 UPSERT 操作
- 游标分页替代 OFFSET
- 创建合适的索引
- 优化连接池配置
7.4 监控和调试
- 事务执行时间监控
- 失败事务分析
- 慢事务识别
- 性能统计分析
通过遵循这些最佳实践,可以构建高性能、可靠的数据库应用程序,确保数据的一致性和完整性。
}
if rowsAffected == 0 {return fmt.Errorf("删除用户失败,没有行被影响")}return nil
})
}
// List 获取用户列表
func (ur *userRepository) List(limit, offset int) ([]*User, error) {
query := SELECT id, username, email, age, is_active, created_at, updated_at FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2
rows, err := ur.db.Query(query, limit, offset)
if err != nil {return nil, fmt.Errorf("查询用户列表失败: %w", err)
}
defer rows.Close()var users []*User
for rows.Next() {user := &User{}err := rows.Scan(&user.ID,&user.Username,&user.Email,&user.Age,&user.IsActive,&user.CreatedAt,&user.UpdatedAt,)if err != nil {return nil, fmt.Errorf("扫描用户数据失败: %w", err)}users = append(users, user)
}if err = rows.Err(); err != nil {return nil, fmt.Errorf("遍历用户数据失败: %w", err)
}return users, nil
}
// Count 获取用户总数
func (ur userRepository) Count() (int, error) {
var count int
err := ur.db.QueryRow("SELECT COUNT() FROM users").Scan(&count)
if err != nil {
return 0, fmt.Errorf(“查询用户总数失败: %w”, err)
}
return count, nil
}
// joinStrings 连接字符串切片
func joinStrings(strs []string, sep string) string {
if len(strs) == 0 {
return “”
}
if len(strs) == 1 {
return strs[0]
}
result := strs[0]
for i := 1; i < len(strs); i++ {result += sep + strs[i]
}
return result
}
### 3.2 批量操作```go
package databaseimport ("database/sql""fmt"
)// BatchUserOperations 批量用户操作
type BatchUserOperations struct {db *sql.DBte *TransactionExecutor
}// NewBatchUserOperations 创建批量用户操作
func NewBatchUserOperations(db *sql.DB) *BatchUserOperations {return &BatchUserOperations{db: db,te: NewTransactionExecutor(db, 60*time.Second), // 批量操作可能需要更长时间}
}// BatchCreate 批量创建用户
func (buo *BatchUserOperations) BatchCreate(requests []*CreateUserRequest) ([]*User, error) {if len(requests) == 0 {return nil, fmt.Errorf("请求列表不能为空")}var users []*Usererr := buo.te.Execute(func(tx *sql.Tx) error {// 预编译插入语句stmt, err := tx.Prepare(`INSERT INTO users (username, email, password, age, is_active, created_at, updated_at)VALUES ($1, $2, $3, $4, $5, $6, $7)RETURNING id, username, email, age, is_active, created_at, updated_at`)if err != nil {return fmt.Errorf("预编译插入语句失败: %w", err)}defer stmt.Close()// 检查重复的用户名和邮箱usernames := make(map[string]bool)emails := make(map[string]bool)for i, req := range requests {if usernames[req.Username] {return fmt.Errorf("第 %d 个请求中的用户名重复: %s", i+1, req.Username)}if emails[req.Email] {return fmt.Errorf("第 %d 个请求中的邮箱重复: %s", i+1, req.Email)}usernames[req.Username] = trueemails[req.Email] = true}// 检查数据库中是否已存在for username := range usernames {var exists boolerr := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE username = $1)", username).Scan(&exists)if err != nil {return fmt.Errorf("检查用户名失败: %w", err)}if exists {return fmt.Errorf("用户名已存在: %s", username)}}for email := range emails {var exists boolerr := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE email = $1)", email).Scan(&exists)if err != nil {return fmt.Errorf("检查邮箱失败: %w", err)}if exists {return fmt.Errorf("邮箱已存在: %s", email)}}// 批量插入now := time.Now()for i, req := range requests {user := &User{}err := stmt.QueryRow(req.Username,req.Email,req.Password,req.Age,true,now,now,).Scan(&user.ID,&user.Username,&user.Email,&user.Age,&user.IsActive,&user.CreatedAt,&user.UpdatedAt,)if err != nil {return fmt.Errorf("插入第 %d 个用户失败: %w", i+1, err)}users = append(users, user)}return nil})if err != nil {return nil, err}return users, nil
}// BatchUpdate 批量更新用户
func (buo *BatchUserOperations) BatchUpdate(updates []struct {ID intReq *UpdateUserRequest
}) error {if len(updates) == 0 {return fmt.Errorf("更新列表不能为空")}return buo.te.Execute(func(tx *sql.Tx) error {for i, update := range updates {// 检查用户是否存在var exists boolerr := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", update.ID).Scan(&exists)if err != nil {return fmt.Errorf("检查第 %d 个用户存在性失败: %w", i+1, err)}if !exists {return fmt.Errorf("第 %d 个用户不存在: ID=%d", i+1, update.ID)}// 构建更新语句(这里简化处理,实际应用中可以优化)setParts := []string{}args := []interface{}{}argIndex := 1if update.Req.Username != nil {setParts = append(setParts, fmt.Sprintf("username = $%d", argIndex))args = append(args, *update.Req.Username)argIndex++}if update.Req.Email != nil {setParts = append(setParts, fmt.Sprintf("email = $%d", argIndex))args = append(args, *update.Req.Email)argIndex++}if update.Req.Age != nil {setParts = append(setParts, fmt.Sprintf("age = $%d", argIndex))args = append(args, *update.Req.Age)argIndex++}if update.Req.IsActive != nil {setParts = append(setParts, fmt.Sprintf("is_active = $%d", argIndex))args = append(args, *update.Req.IsActive)argIndex++}if len(setParts) == 0 {continue // 跳过没有更新字段的记录}// 添加 updated_atsetParts = append(setParts, fmt.Sprintf("updated_at = $%d", argIndex))args = append(args, time.Now())argIndex++// 添加 WHERE 条件args = append(args, update.ID)query := fmt.Sprintf("UPDATE users SET %s WHERE id = $%d", joinStrings(setParts, ", "), argIndex)result, err := tx.Exec(query, args...)if err != nil {return fmt.Errorf("更新第 %d 个用户失败: %w", i+1, err)}rowsAffected, err := result.RowsAffected()if err != nil {return fmt.Errorf("获取第 %d 个用户影响行数失败: %w", i+1, err)}if rowsAffected == 0 {return fmt.Errorf("第 %d 个用户更新失败,没有行被影响", i+1)}}return nil})
}// BatchDelete 批量删除用户
func (buo *BatchUserOperations) BatchDelete(ids []int) error {if len(ids) == 0 {return fmt.Errorf("ID列表不能为空")}return buo.te.Execute(func(tx *sql.Tx) error {// 检查所有用户是否存在for i, id := range ids {var exists boolerr := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", id).Scan(&exists)if err != nil {return fmt.Errorf("检查第 %d 个用户存在性失败: %w", i+1, err)}if !exists {return fmt.Errorf("第 %d 个用户不存在: ID=%d", i+1, id)}}// 批量删除for i, id := range ids {result, err := tx.Exec("DELETE FROM users WHERE id = $1", id)if err != nil {return fmt.Errorf("删除第 %d 个用户失败: %w", i+1, err)}rowsAffected, err := result.RowsAffected()if err != nil {return fmt.Errorf("获取第 %d 个用户影响行数失败: %w", i+1, err)}if rowsAffected == 0 {return fmt.Errorf("第 %d 个用户删除失败,没有行被影响", i+1)}}return nil})
}
4. 复杂事务场景
4.1 转账操作示例
package databaseimport ("database/sql""fmt""time"
)// Account 账户模型
type Account struct {ID int `json:"id"`UserID int `json:"user_id"`Balance float64 `json:"balance"`Currency string `json:"currency"`CreatedAt time.Time `json:"created_at"`UpdatedAt time.Time `json:"updated_at"`
}// Transaction 交易记录
type TransactionRecord struct {ID int `json:"id"`FromAccount int `json:"from_account"`ToAccount int `json:"to_account"`Amount float64 `json:"amount"`Currency string `json:"currency"`Description string `json:"description"`Status string `json:"status"`CreatedAt time.Time `json:"created_at"`
}// TransferRequest 转账请求
type TransferRequest struct {FromAccountID int `json:"from_account_id" validate:"required"`ToAccountID int `json:"to_account_id" validate:"required"`Amount float64 `json:"amount" validate:"required,gt=0"`Description string `json:"description"`
}// AccountService 账户服务
type AccountService struct {db *sql.DBte *TransactionExecutor
}// NewAccountService 创建账户服务
func NewAccountService(db *sql.DB) *AccountService {return &AccountService{db: db,te: NewTransactionExecutor(db, 30*time.Second),}
}// Transfer 转账操作
func (as *AccountService) Transfer(req *TransferRequest) (*TransactionRecord, error) {if req.FromAccountID == req.ToAccountID {return nil, fmt.Errorf("不能向同一账户转账")}var txRecord *TransactionRecorderr := as.te.Execute(func(tx *sql.Tx) error {// 1. 锁定并获取源账户信息var fromAccount Accounterr := tx.QueryRow(`SELECT id, user_id, balance, currency, created_at, updated_atFROM accountsWHERE id = $1FOR UPDATE`, req.FromAccountID).Scan(&fromAccount.ID,&fromAccount.UserID,&fromAccount.Balance,&fromAccount.Currency,&fromAccount.CreatedAt,&fromAccount.UpdatedAt,)if err != nil {if err == sql.ErrNoRows {return fmt.Errorf("源账户不存在: ID=%d", req.FromAccountID)}return fmt.Errorf("获取源账户失败: %w", err)}// 2. 锁定并获取目标账户信息var toAccount Accounterr = tx.QueryRow(`SELECT id, user_id, balance, currency, created_at, updated_atFROM accountsWHERE id = $1FOR UPDATE`, req.ToAccountID).Scan(&toAccount.ID,&toAccount.UserID,&toAccount.Balance,&toAccount.Currency,&toAccount.CreatedAt,&toAccount.UpdatedAt,)if err != nil {if err == sql.ErrNoRows {return fmt.Errorf("目标账户不存在: ID=%d", req.ToAccountID)}return fmt.Errorf("获取目标账户失败: %w", err)}// 3. 检查货币类型if fromAccount.Currency != toAccount.Currency {return fmt.Errorf("货币类型不匹配: %s != %s", fromAccount.Currency, toAccount.Currency)}// 4. 检查余额if fromAccount.Balance < req.Amount {return fmt.Errorf("余额不足: 当前余额 %.2f,转账金额 %.2f", fromAccount.Balance, req.Amount)}// 5. 创建交易记录err = tx.QueryRow(`INSERT INTO transactions (from_account, to_account, amount, currency, description, status, created_at)VALUES ($1, $2, $3, $4, $5, $6, $7)RETURNING id, from_account, to_account, amount, currency, description, status, created_at`,req.FromAccountID,req.ToAccountID,req.Amount,fromAccount.Currency,req.Description,"processing",time.Now(),).Scan(&txRecord.ID,&txRecord.FromAccount,&txRecord.ToAccount,&txRecord.Amount,&txRecord.Currency,&txRecord.Description,&txRecord.Status,&txRecord.CreatedAt,)if err != nil {return fmt.Errorf("创建交易记录失败: %w", err)}// 6. 更新源账户余额_, err = tx.Exec(`UPDATE accounts SET balance = balance - $1, updated_at = $2WHERE id = $3`,req.Amount, time.Now(), req.FromAccountID)if err != nil {return fmt.Errorf("更新源账户余额失败: %w", err)}// 7. 更新目标账户余额_, err = tx.Exec(`UPDATE accounts SET balance = balance + $1, updated_at = $2WHERE id = $3`,req.Amount, time.Now(), req.ToAccountID)if err != nil {return fmt.Errorf("更新目标账户余额失败: %w", err)}// 8. 更新交易状态为成功_, err = tx.Exec(`UPDATE transactions SET status = $1WHERE id = $2`,"completed", txRecord.ID)if err != nil {return fmt.Errorf("更新交易状态失败: %w", err)}txRecord.Status = "completed"return nil})if err != nil {// 如果事务失败,尝试更新交易记录状态为失败if txRecord != nil && txRecord.ID > 0 {as.db.Exec("UPDATE transactions SET status = $1 WHERE id = $2", "failed", txRecord.ID)}return nil, err}return txRecord, nil
}// GetAccountBalance 获取账户余额
func (as *AccountService) GetAccountBalance(accountID int) (*Account, error) {account := &Account{}err := as.db.QueryRow(`SELECT id, user_id, balance, currency, created_at, updated_atFROM accountsWHERE id = $1`, accountID).Scan(&account.ID,&account.UserID,&account.Balance,&account.Currency,&account.CreatedAt,&account.UpdatedAt,)if err != nil {if err == sql.ErrNoRows {return nil, fmt.Errorf("账户不存在: ID=%d", accountID)}return nil, fmt.Errorf("获取账户信息失败: %w", err)}return account, nil
}// GetTransactionHistory 获取交易历史
func (as *AccountService) GetTransactionHistory(accountID int, limit, offset int) ([]*TransactionRecord, error) {query := `SELECT id, from_account, to_account, amount, currency, description, status, created_atFROM transactionsWHERE from_account = $1 OR to_account = $1ORDER BY created_at DESCLIMIT $2 OFFSET $3`rows, err := as.db.Query(query, accountID, limit, offset)if err != nil {return nil, fmt.Errorf("查询交易历史失败: %w", err)}defer rows.Close()var transactions []*TransactionRecordfor rows.Next() {tx := &TransactionRecord{}err := rows.Scan(&tx.ID,&tx.FromAccount,&tx.ToAccount,&tx.Amount,&tx.Currency,&tx.Description,&tx.Status,&tx.CreatedAt,)if err != nil {return nil, fmt.Errorf("扫描交易记录失败: %w", err)}transactions = append(transactions, tx)}return transactions, rows.Err()
}
4.2 订单处理系统
package databaseimport ("database/sql""fmt""time"
)// Order 订单模型
type Order struct {ID int `json:"id"`UserID int `json:"user_id"`TotalAmount float64 `json:"total_amount"`Status string `json:"status"`CreatedAt time.Time `json:"created_at"`UpdatedAt time.Time `json:"updated_at"`
}// OrderItem 订单项
type OrderItem struct {ID int `json:"id"`OrderID int `json:"order_id"`ProductID int `json:"product_id"`Quantity int `json:"quantity"`Price float64 `json:"price"`Subtotal float64 `json:"subtotal"`
}// Product 产品模型
type Product struct {ID int `json:"id"`Name string `json:"name"`Price float64 `json:"price"`Stock int `json:"stock"`IsActive bool `json:"is_active"`
}// CreateOrderRequest 创建订单请求
type CreateOrderRequest struct {UserID int `json:"user_id" validate:"required"`Items []struct {ProductID int `json:"product_id" validate:"required"`Quantity int `json:"quantity" validate:"required,gt=0"`} `json:"items" validate:"required,min=1"`
}// OrderService 订单服务
type OrderService struct {db *sql.DBte *TransactionExecutor
}// NewOrderService 创建订单服务
func NewOrderService(db *sql.DB) *OrderService {return &OrderService{db: db,te: NewTransactionExecutor(db, 60*time.Second),}
}// CreateOrder 创建订单
func (os *OrderService) CreateOrder(req *CreateOrderRequest) (*Order, []*OrderItem, error) {var order *Ordervar orderItems []*OrderItemerr := os.te.Execute(func(tx *sql.Tx) error {// 1. 验证用户存在var userExists boolerr := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", req.UserID).Scan(&userExists)if err != nil {return fmt.Errorf("检查用户存在性失败: %w", err)}if !userExists {return fmt.Errorf("用户不存在: ID=%d", req.UserID)}// 2. 验证产品并锁定库存var totalAmount float64productMap := make(map[int]*Product)for i, item := range req.Items {var product Producterr := tx.QueryRow(`SELECT id, name, price, stock, is_activeFROM productsWHERE id = $1FOR UPDATE`, item.ProductID).Scan(&product.ID,&product.Name,&product.Price,&product.Stock,&product.IsActive,)if err != nil {if err == sql.ErrNoRows {return fmt.Errorf("第 %d 个商品不存在: ID=%d", i+1, item.ProductID)}return fmt.Errorf("获取第 %d 个商品信息失败: %w", i+1, err)}if !product.IsActive {return fmt.Errorf("第 %d 个商品已下架: %s", i+1, product.Name)}if product.Stock < item.Quantity {return fmt.Errorf("第 %d 个商品库存不足: %s (库存: %d, 需要: %d)", i+1, product.Name, product.Stock, item.Quantity)}productMap[item.ProductID] = &producttotalAmount += product.Price * float64(item.Quantity)}// 3. 创建订单err = tx.QueryRow(`INSERT INTO orders (user_id, total_amount, status, created_at, updated_at)VALUES ($1, $2, $3, $4, $5)RETURNING id, user_id, total_amount, status, created_at, updated_at`,req.UserID,totalAmount,"pending",time.Now(),time.Now(),).Scan(&order.ID,&order.UserID,&order.TotalAmount,&order.Status,&order.CreatedAt,&order.UpdatedAt,)if err != nil {return fmt.Errorf("创建订单失败: %w", err)}// 4. 创建订单项并更新库存for i, item := range req.Items {product := productMap[item.ProductID]subtotal := product.Price * float64(item.Quantity)// 创建订单项var orderItem OrderItemerr := tx.QueryRow(`INSERT INTO order_items (order_id, product_id, quantity, price, subtotal)VALUES ($1, $2, $3, $4, $5)RETURNING id, order_id, product_id, quantity, price, subtotal`,order.ID,item.ProductID,item.Quantity,product.Price,subtotal,).Scan(&orderItem.ID,&orderItem.OrderID,&orderItem.ProductID,&orderItem.Quantity,&orderItem.Price,&orderItem.Subtotal,)if err != nil {return fmt.Errorf("创建第 %d 个订单项失败: %w", i+1, err)}orderItems = append(orderItems, &orderItem)// 更新库存result, err := tx.Exec(`UPDATE products SET stock = stock - $1WHERE id = $2`,item.Quantity, item.ProductID)if err != nil {return fmt.Errorf("更新第 %d 个商品库存失败: %w", i+1, err)}rowsAffected, err := result.RowsAffected()if err != nil {return fmt.Errorf("获取第 %d 个商品库存更新影响行数失败: %w", i+1, err)}if rowsAffected == 0 {return fmt.Errorf("第 %d 个商品库存更新失败", i+1)}}return nil})if err != nil {return nil, nil, err}return order, orderItems, nil
}// CancelOrder 取消订单
func (os *OrderService) CancelOrder(orderID int) error {return os.te.Execute(func(tx *sql.Tx) error {// 1. 获取订单信息var order Ordererr := tx.QueryRow(`SELECT id, user_id, total_amount, status, created_at, updated_atFROM ordersWHERE id = $1FOR UPDATE`, orderID).Scan(&order.ID,&order.UserID,&order.TotalAmount,&order.Status,&order.CreatedAt,&order.UpdatedAt,)if err != nil {if err == sql.ErrNoRows {return fmt.Errorf("订单不存在: ID=%d", orderID)}return fmt.Errorf("获取订单信息失败: %w", err)}// 2. 检查订单状态if order.Status != "pending" {return fmt.Errorf("订单状态不允许取消: %s", order.Status)}// 3. 获取订单项rows, err := tx.Query(`SELECT product_id, quantityFROM order_itemsWHERE order_id = $1`, orderID)if err != nil {return fmt.Errorf("获取订单项失败: %w", err)}defer rows.Close()var orderItems []struct {ProductID intQuantity int}for rows.Next() {var item struct {ProductID intQuantity int}err := rows.Scan(&item.ProductID, &item.Quantity)if err != nil {return fmt.Errorf("扫描订单项失败: %w", err)}orderItems = append(orderItems, item)}if err = rows.Err(); err != nil {return fmt.Errorf("遍历订单项失败: %w", err)}// 4. 恢复库存for i, item := range orderItems {result, err := tx.Exec(`UPDATE products SET stock = stock + $1WHERE id = $2`,item.Quantity, item.ProductID)if err != nil {return fmt.Errorf("恢复第 %d 个商品库存失败: %w", i+1, err)}rowsAffected, err := result.RowsAffected()if err != nil {return fmt.Errorf("获取第 %d 个商品库存恢复影响行数失败: %w", i+1, err)}if rowsAffected == 0 {return fmt.Errorf("第 %d 个商品库存恢复失败", i+1)}}// 5. 更新订单状态_, err = tx.Exec(`UPDATE orders SET status = $1, updated_at = $2WHERE id = $3`,"cancelled", time.Now(), orderID)if err != nil {return fmt.Errorf("更新订单状态失败: %w", err)}return nil})
}// GetOrderWithItems 获取订单及其项目
func (os *OrderService) GetOrderWithItems(orderID int) (*Order, []*OrderItem, error) {var order *Ordervar orderItems []*OrderItemerr := os.te.Execute(func(tx *sql.Tx) error {// 获取订单信息order = &Order{}err := tx.QueryRow(`SELECT id, user_id, total_amount, status, created_at, updated_atFROM ordersWHERE id = $1`, orderID).Scan(&order.ID,&order.UserID,&order.TotalAmount,&order.Status,&order.CreatedAt,&order.UpdatedAt,)if err != nil {if err == sql.ErrNoRows {return fmt.Errorf("订单不存在: ID=%d", orderID)}return fmt.Errorf("获取订单信息失败: %w", err)}// 获取订单项rows, err := tx.Query(`SELECT id, order_id, product_id, quantity, price, subtotalFROM order_itemsWHERE order_id = $1`, orderID)if err != nil {return fmt.Errorf("获取订单项失败: %w", err)}defer rows.Close()for rows.Next() {item := &OrderItem{}err := rows.Scan(&item.ID,&item.OrderID,&item.ProductID,&item.Quantity,&item.Price,&item.Subtotal,)if err != nil {return fmt.Errorf("扫描订单项失败: %w", err)}orderItems = append(orderItems, item)}return rows.Err()})if err != nil {return nil, nil, err}return order, orderItems, nil
}
5. 事务最佳实践
5.1 事务设计原则
package databaseimport ("context""database/sql""fmt""log""time"
)// TransactionBestPractices 事务最佳实践示例
type TransactionBestPractices struct {db *sql.DB
}// NewTransactionBestPractices 创建事务最佳实践示例
func NewTransactionBestPractices(db *sql.DB) *TransactionBestPractices {return &TransactionBestPractices{db: db}
}// 1. 保持事务简短
func (tbp *TransactionBestPractices) GoodExample_ShortTransaction() error {return tbp.executeTransaction(func(tx *sql.Tx) error {// 只包含必要的数据库操作_, err := tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")if err != nil {return err}_, err = tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")return err})
}// 反例:事务中包含耗时操作
func (tbp *TransactionBestPractices) BadExample_LongTransaction() error {return tbp.executeTransaction(func(tx *sql.Tx) error {// 数据库操作_, err := tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")if err != nil {return err}// 错误:在事务中进行耗时的外部调用// time.Sleep(5 * time.Second) // 模拟外部API调用// sendNotification() // 发送通知_, err = tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")return err})
}// 2. 正确的锁定顺序
func (tbp *TransactionBestPractices) GoodExample_LockOrdering(fromID, toID int, amount float64) error {// 确保锁定顺序一致,避免死锁firstID, secondID := fromID, toIDif fromID > toID {firstID, secondID = toID, fromID}return tbp.executeTransaction(func(tx *sql.Tx) error {// 按照固定顺序锁定账户var balance1, balance2 float64err := tx.QueryRow("SELECT balance FROM accounts WHERE id = $1 FOR UPDATE", firstID).Scan(&balance1)if err != nil {return err}err = tx.QueryRow("SELECT balance FROM accounts WHERE id = $1 FOR UPDATE", secondID).Scan(&balance2)if err != nil {return err}// 执行转账逻辑if fromID == firstID {if balance1 < amount {return fmt.Errorf("余额不足")}_, err = tx.Exec("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, fromID)if err != nil {return err}_, err = tx.Exec("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, toID)} else {if balance2 < amount {return fmt.Errorf("余额不足")}_, err = tx.Exec("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, fromID)if err != nil {return err}_, err = tx.Exec("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, toID)}return err})
}// 3. 适当的错误处理
func (tbp *TransactionBestPractices) GoodExample_ErrorHandling() error {return tbp.executeTransactionWithRetry(func(tx *sql.Tx) error {// 业务逻辑result, err := tx.Exec("UPDATE products SET stock = stock - 1 WHERE id = 1 AND stock > 0")if err != nil {return fmt.Errorf("更新库存失败: %w", err)}rowsAffected, err := result.RowsAffected()if err != nil {return fmt.Errorf("获取影响行数失败: %w", err)
