【Go】并发编程的核心思想 CSP 模型
概述
Go 的 CSP(Communicating Sequential Processes)模型 是其并发编程的核心思想,源自 Tony Hoare 1978 年提出的理论。Go 通过 goroutine 和 channel 实现了 CSP 模型,强调:
“Do not communicate by sharing memory; instead, share memory by communicating.”
—Rob Pike
即:不要通过共享内存来通信,而应通过通信来共享内存。
核心组件
- Goroutine:轻量级协程(用户态线程),由 Go runtime 调度,启动成本极低(初始栈仅 2KB),可轻松构建成千上万个。
- Channel:goroutine 之间的通信管道,类型安全、支持双向/单向,天然支持同步和阻塞控制。
CSP 关键特点
- 顺序性:每个 goroutine 内部是顺序执行的(sequential),避免了传统多线程中复杂的锁逻辑。
- 通信驱动协同:goroutine 之间不直接共享变量,而是通过 channel 传递数据(值语义),实现解耦与同步。
- 无锁并发:避免 mutex 竞态,用 channel 的阻塞/非阻塞操作协调执行顺序。
- 可组合性:channel 可作为参数传递、返回值,甚至 channel of channels,支持构建复杂并发拓扑(如 pipeline、 fan-in/fan-out).
代码样例
获取 token,业务场景:获取服务端 token,例如 微信公众号 token、钉钉api token 等
特征
- token 需要从第三方服务获取
- token 存在过期,过期自动调用第三方服务刷新
- 多个并发请求时:
- 只允许一个 goroutine 去刷新(避免“惊群效应”);
- 其他请求等待新 token 或复用已刷新的;
- 支持优雅关闭、超时、重试、日志监控。
CSP 模型设计思路
我们要把 token 生命周期管理 封装成一个独立的 “Token Manager” goroutine, 它:
- 拥有 token 状态(current token + expire time) -> 状态私有化,天然线程安全;
- 通过 channel 接收外部 “获取 token 请求”;
- 通过 channel 返回 token 或错误;
- 内部顺序执行判断、刷新、缓存逻辑 -> 无锁、可推理。
其他业务 goroutine 只需:发请求 -> 等结果,完全不关系刷新细节。
代码实现
TokenManager 代码
package mainimport ()// TokenResult 封装返回结果(返回 token 或 错误)
type TokenResult struct{Token stringErr error
}// TokenRequest 请求结构(添加上下文、返回结果)
type TokenRequest struct {Ctx context.ContextResp chan<- TokenResult
}// TokenManager 封装所有状态和逻辑
type TokenManager struct {reqCh chan TokenRequest // 外部请求通道 refresh chan struct{} // 内部刷新触发cancel context.CancelFunc // 关于关闭closed atomic.Bool // 原子标志// 内部状态currentToken stringexpiredAt time.Time
}func NewTokenManager() *TokenManager{ctx, cancel := context.WithCancel(context.Background())tm := &TokenManager{reqCh: make(chan TokenRequest,100), // 缓存防压垮refresh: make(chan struct{}, 1), // 防重复触发cancel: cancel,}go tm.run(ctx)return tm
}func (tm *TokenManager) GetToken(ctx context.Context)(string, error) {if tm.closed.Load() {return "", errors.New("token manage closed")}respCh := make(chan TokenResult, 1)req := TokenResult{Ctx: ctx, Resp: respCh}select {case tm.reqCh <- req: // 请求入队case <- ctx.Done():return "", ctx.Error()}select {case res := <-respCh:return res.Token, res.Errcase <-ctx.Done:return "", ctx.Error()}
}func (tm *TokenManager) Close(){tm.cancel()tm.closed.Store(true)
}func (tm *TokenManager) run(ctx context.Context) {for {select {case <-ctx.Done():log.Println("TokenManager shutting down")return case req := <-tm.reqCh:if tm.isValid() {req.Resp <- TokenResult{Token: tm.currentToken}continue}select {case tm.refresh <- struct{}{}:tm.doRefreshAndReply(req)default:tm.enqueue(req)}case <-tm.refresh:// 实际刷新逻辑已在 doRefreshAndReply 中处理}}
}// isValid 判断 token 是否还能用(预留 5s 缓冲)
func (tm *TokenManager) isValid() bool {return tm.currentToken != "" && time.Now().Before(tm.expireAt.Add(-5*time.Second))
}// doRefreshAndReply 执行刷新,并通知所有等待者
func (tm *TokenManager) doRefreshAndReply(firstReq TokenRequest) {// 🙋 这里是唯一执行刷新的地方!无竞争newToken, expire, err := tm.callThirdPartyAPI()if err != nil {// 刷新失败:通知所有等待者(包括 firstReq)tm.broadcastError(err)firstReq.Resp <- TokenResult{Err: err}return}// 刷新成功:更新状态tm.currentToken = newTokentm.expireAt = expire// 通知 firstReqfirstReq.Resp <- TokenResult{Token: newToken}// 通知其他等待者(若有)tm.broadcastToken(newToken)
}// callThirdPartyAPI 模拟调用第三方服务(替换为你的真实逻辑)
func (tm *TokenManager) callThirdPartyAPI() (token string, expire time.Time, err error) {log.Println("Calling 3rd-party API to refresh token...")// 模拟网络延迟time.Sleep(200 * time.Millisecond)// 实际中这里可能是:// resp, err := http.Post(...); json.Unmarshal(...); ...return "new_jwt_token_abc123", time.Now().Add(10*time.Minute), nil
}// ===== 等待队列支持(避免多个请求重复刷新)=====var waiterQueue []TokenRequest // 简单 slice;高并发可用 channel 或 sync.Condfunc (tm *TokenManager) enqueueWaiter(req TokenRequest) {waiterQueue = append(waiterQueue, req)
}func (tm *TokenManager) broadcastToken(token string) {for _, req := range waiterQueue {select {case req.Resp <- TokenResult{Token: token}:case <-req.Ctx.Done():// 请求已取消,跳过}}waiterQueue = waiterQueue[:0] // 清空
}func (tm *TokenManager) broadcastError(err error) {for _, req := range waiterQueue {select {case req.Resp <- TokenResult{Err: err}:case <-req.Ctx.Done():}}waiterQueue = waiterQueue[:0]
}
业务调用
func main() {tm := NewTokenManager()defer tm.Close()// 模拟 10 个并发请求var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {defer wg.Done()ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)defer cancel()token, err := tm.GetToken(ctx)if err != nil {log.Printf("[Worker %d] Error: %v", id, err)return}log.Printf("[Worker %d] Got token: %.10s...", id, token)}(i)}wg.Wait()
}
这个 demo 涉及的 “CSP” 模型
| 特性 | 实现方式 | 好处 |
|---|---|---|
| 状态私有 | currentToken, expireAt 仅 run() 访问 | 无锁、无竞态 |
| 通信代替共享 | 业务 → reqCh → TokenManager → respCh → 业务 | 解耦、可测试 |
| 顺序执行核心逻辑 | doRefreshAndReply 是单一 goroutine 中的顺序流 | 易推理、易加日志/监控 |
| 天然防惊群 | refresh channel + select default | 避免 100 个 goroutine 同时刷 token |
| 可观测性友好 | 可在 callThirdPartyAPI 周围加 metrics/log/trace | 运维友好(契合你 DBA/运维经验) |
