当前位置: 首页 > news >正文

【Go】并发编程的核心思想 CSP 模型

概述

Go 的 CSP(Communicating Sequential Processes)模型 是其并发编程的核心思想,源自 Tony Hoare 1978 年提出的理论。Go 通过 goroutine 和 channel 实现了 CSP 模型,强调:

“Do not communicate by sharing memory; instead, share memory by communicating.”
—Rob Pike

即:不要通过共享内存来通信,而应通过通信来共享内存


核心组件

  • Goroutine:轻量级协程(用户态线程),由 Go runtime 调度,启动成本极低(初始栈仅 2KB),可轻松构建成千上万个。
  • Channel:goroutine 之间的通信管道,类型安全、支持双向/单向,天然支持同步和阻塞控制。

CSP 关键特点

  • 顺序性:每个 goroutine 内部是顺序执行的(sequential),避免了传统多线程中复杂的锁逻辑。
  • 通信驱动协同:goroutine 之间不直接共享变量,而是通过 channel 传递数据(值语义),实现解耦与同步。
  • 无锁并发:避免 mutex 竞态,用 channel 的阻塞/非阻塞操作协调执行顺序。
  • 可组合性:channel 可作为参数传递、返回值,甚至 channel of channels,支持构建复杂并发拓扑(如 pipeline、 fan-in/fan-out).

代码样例

获取 token,业务场景:获取服务端 token,例如 微信公众号 token、钉钉api token 等

特征
  1. token 需要从第三方服务获取
  2. token 存在过期,过期自动调用第三方服务刷新
  3. 多个并发请求时:
    • 只允许一个 goroutine 去刷新(避免“惊群效应”);
    • 其他请求等待新 token 或复用已刷新的;
  4. 支持优雅关闭、超时、重试、日志监控。

CSP 模型设计思路

我们要把 token 生命周期管理 封装成一个独立的 “Token Manager” goroutine, 它:

  • 拥有 token 状态(current token + expire time) -> 状态私有化,天然线程安全;
  • 通过 channel 接收外部 “获取 token 请求”;
  • 通过 channel 返回 token 或错误;
  • 内部顺序执行判断、刷新、缓存逻辑 -> 无锁、可推理。
    其他业务 goroutine 只需:发请求 -> 等结果,完全不关系刷新细节。

代码实现

TokenManager 代码

package mainimport ()// TokenResult 封装返回结果(返回 token 或 错误)
type TokenResult struct{Token stringErr error
}// TokenRequest 请求结构(添加上下文、返回结果)
type TokenRequest struct {Ctx context.ContextResp chan<- TokenResult
}// TokenManager 封装所有状态和逻辑
type TokenManager struct {reqCh chan TokenRequest    // 外部请求通道 refresh chan struct{}      // 内部刷新触发cancel context.CancelFunc  // 关于关闭closed atomic.Bool 				 // 原子标志// 内部状态currentToken stringexpiredAt time.Time
}func NewTokenManager() *TokenManager{ctx, cancel := context.WithCancel(context.Background())tm := &TokenManager{reqCh: make(chan TokenRequest,100), // 缓存防压垮refresh: make(chan struct{}, 1), // 防重复触发cancel: cancel,}go tm.run(ctx)return tm
}func (tm *TokenManager) GetToken(ctx context.Context)(string, error) {if tm.closed.Load() {return "", errors.New("token manage closed")}respCh := make(chan TokenResult, 1)req := TokenResult{Ctx: ctx, Resp: respCh}select {case tm.reqCh <- req: // 请求入队case <- ctx.Done():return "", ctx.Error()}select {case res := <-respCh:return res.Token, res.Errcase <-ctx.Done:return "", ctx.Error()}
}func (tm *TokenManager) Close(){tm.cancel()tm.closed.Store(true)
}func (tm *TokenManager) run(ctx context.Context) {for {select {case <-ctx.Done():log.Println("TokenManager shutting down")return case req := <-tm.reqCh:if tm.isValid() {req.Resp <- TokenResult{Token: tm.currentToken}continue}select {case tm.refresh <- struct{}{}:tm.doRefreshAndReply(req)default:tm.enqueue(req)}case <-tm.refresh:// 实际刷新逻辑已在 doRefreshAndReply 中处理}}
}// isValid 判断 token 是否还能用(预留 5s 缓冲)
func (tm *TokenManager) isValid() bool {return tm.currentToken != "" && time.Now().Before(tm.expireAt.Add(-5*time.Second))
}// doRefreshAndReply 执行刷新,并通知所有等待者
func (tm *TokenManager) doRefreshAndReply(firstReq TokenRequest) {// 🙋 这里是唯一执行刷新的地方!无竞争newToken, expire, err := tm.callThirdPartyAPI()if err != nil {// 刷新失败:通知所有等待者(包括 firstReq)tm.broadcastError(err)firstReq.Resp <- TokenResult{Err: err}return}// 刷新成功:更新状态tm.currentToken = newTokentm.expireAt = expire// 通知 firstReqfirstReq.Resp <- TokenResult{Token: newToken}// 通知其他等待者(若有)tm.broadcastToken(newToken)
}// callThirdPartyAPI 模拟调用第三方服务(替换为你的真实逻辑)
func (tm *TokenManager) callThirdPartyAPI() (token string, expire time.Time, err error) {log.Println("Calling 3rd-party API to refresh token...")// 模拟网络延迟time.Sleep(200 * time.Millisecond)// 实际中这里可能是:// resp, err := http.Post(...); json.Unmarshal(...); ...return "new_jwt_token_abc123", time.Now().Add(10*time.Minute), nil
}// ===== 等待队列支持(避免多个请求重复刷新)=====var waiterQueue []TokenRequest // 简单 slice;高并发可用 channel 或 sync.Condfunc (tm *TokenManager) enqueueWaiter(req TokenRequest) {waiterQueue = append(waiterQueue, req)
}func (tm *TokenManager) broadcastToken(token string) {for _, req := range waiterQueue {select {case req.Resp <- TokenResult{Token: token}:case <-req.Ctx.Done():// 请求已取消,跳过}}waiterQueue = waiterQueue[:0] // 清空
}func (tm *TokenManager) broadcastError(err error) {for _, req := range waiterQueue {select {case req.Resp <- TokenResult{Err: err}:case <-req.Ctx.Done():}}waiterQueue = waiterQueue[:0]
}

业务调用

func main() {tm := NewTokenManager()defer tm.Close()// 模拟 10 个并发请求var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {defer wg.Done()ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)defer cancel()token, err := tm.GetToken(ctx)if err != nil {log.Printf("[Worker %d] Error: %v", id, err)return}log.Printf("[Worker %d] Got token: %.10s...", id, token)}(i)}wg.Wait()
}
这个 demo 涉及的 “CSP” 模型
特性实现方式好处
状态私有currentToken, expireAtrun() 访问无锁、无竞态
通信代替共享业务 → reqCh → TokenManager → respCh → 业务解耦、可测试
顺序执行核心逻辑doRefreshAndReply 是单一 goroutine 中的顺序流易推理、易加日志/监控
天然防惊群refresh channel + select default避免 100 个 goroutine 同时刷 token
可观测性友好可在 callThirdPartyAPI 周围加 metrics/log/trace运维友好(契合你 DBA/运维经验)
http://www.dtcms.com/a/593067.html

相关文章:

  • 《Vue项目开发实战》第四章:组件封装--ToolBar
  • Redis拒绝策略
  • iphone Delta模拟器如何从夸克网盘导入游戏ROM 附游戏资源下载
  • 专业网站建设平台网站建设功能评估表
  • 做农业网站怎么赚钱58同城北京网站建设
  • 如何在命令行中调用Dev-C++的编译器?
  • C语言自定义类型:联合体与枚举
  • 在线网站推广工具WordPress 付费下载阅读
  • Windows2008 如何禁用FSO?
  • 了解一下LSTM:长短期记忆网络(改进的RNN)
  • 【微服务 - easy视频 | day03】服务与服务之间的调用
  • 网站建设定做mvc网站建设的实验报告
  • CMP(类Cloudera CMP 7 404版华为Kunpeng)告别CDH/CDP,拥抱自主可控的新时代
  • 生成ios钱包pkpass文件
  • Paimon——官网阅读:理解文件
  • 做网站应该学什么语言网站建设裕鸿国际
  • Xenium数据分析 | 使用Xenium Ranger重新分析数据
  • MySQL 8.0 新特性详解:窗口函数,开启数据分析的潘多拉魔盒
  • 基于模板匹配的数字和大写字母识别(Matlab)
  • 网站编程赚钱企业门户网站 php
  • 网站有什么到期网站空间哪里买
  • LeetCode 分类刷题:2816. 翻倍以链表形式表示的数字
  • 一文掌握,soular安装与配置
  • Whole-Body Control——双足机器人全身控制技术 论文阅读笔记
  • LeetCode hot100:240 搜索二维矩阵 II:三种解法对比
  • Wireshark笔记-DNS流程与数据包解析
  • SRv6论文阅读
  • 做电子烟外贸网站有哪些建设银行亚洲官方网站
  • 11.9 LeetCode 题目汇总与解题思路
  • leetcode 707 设计链表