当前位置: 首页 > news >正文

【golang】基于redis zset实现并行流量控制(计数锁)

在业务开发中,有时需要对某个操作在整个集群中限制并发度,例如限制大模型对话的并行数。基于redis zset实现计数锁,做个笔记。

关键词:并行流量控制、计数锁

package redisutilimport ("context""fmt""math""time""github.com/go-redis/redis/v9"
)// AcquireZSetLock 借助redis zset数据结构实现分布式计数锁。可用于计数任务运行数,防止超限。返回值:zset大小、释放锁的函数、错误信息
func AcquireZSetLock(ctx context.Context, c redis.Client, key string, element string, zsetMaxSize int,expiresIn time.Duration, syncWait time.Duration) (int, func() error, error) {ctx, cancel := context.WithTimeout(ctx, syncWait)defer cancel()for i := 0; ; i++ {select {case <-ctx.Done(): // 接到取消信号,按插入失败处理return -1, func() error { return nil }, ctx.Err()default:}size, err := insertElementToZsetLock(ctx, c, key, element, zsetMaxSize, expiresIn)if err != nil {second := 0.4 + 0.6*math.Exp(-0.17*float64(i)) // f(i=0) = 1.0; f(i=10) = 0.5096,即第10次就会衰减到0.5096秒second = max(second, 0.5)                      // 最小间隔0.5秒,防止过于频繁的请求time.Sleep(time.Duration(second*1000) * time.Millisecond)}releaseFunc := func() error {result, err := c.ZRem(context.Background(), key, element).Result()if err != nil {return fmt.Errorf("redis zrem error: %v. return=%d", err, result)}return nil}return size, releaseFunc, nil}
}// insertElementToZsetLock 插入元素到zset,并删除已过期的元素
func insertElementToZsetLock(ctx context.Context, c redis.Client, key string, element string, zsetMaxSize int, expiresIn time.Duration) (int, error) {luaScript := `local zsetName = KEYS[1]local memberName = ARGV[1]local currentTime = tonumber(ARGV[2])local deadTime = tonumber(ARGV[3])local sizeLimit = tonumber(ARGV[4])-- 删除已过期的元素redis.call("ZREMRANGEBYSCORE", zsetName, "-inf", currentTime)-- 获取集合的大小local setSize = redis.call('ZCard', zsetName)-- 如果集合大小小于限制值,则添加元素,并返回集合大小if setSize < sizeLimit thenredis.call('ZAdd', zsetName, deadTime, memberName)local expireTime = deadTime - currentTimeif expireTime > 0 thenredis.call('EXPIRE', zsetName, expireTime)endreturn setSize+1endreturn -1`currentTime := time.Now().Unix()deadTime := time.Now().Add(expiresIn).Unix() // 过期时间 Unix秒ret, err := c.Do(ctx, "EVAL", luaScript, 1, key, element, currentTime, deadTime, zsetMaxSize).Result()if err != nil {return -1, err}if ret.(int64) < 0 {return zsetMaxSize, fmt.Errorf("zset size reach max size: %d", zsetMaxSize)}return int(ret.(int64)), nil
}

使用示例:

size, release, err := AcquireZSetLock(ctx, client, key, element, 10, 10*time.Second, 3*time.Second)
defer release()
if err != nil {fmt.Println(err)
}
http://www.dtcms.com/a/318722.html

相关文章:

  • InfluxDB 集群部署与高可用方案(一)
  • C基础 15_day
  • 从代码学习LLM - llama3 PyTorch版
  • css优化、提升性能方法都有哪些?
  • Nacos机制
  • 【图像处理基石】什么是数字高程模型?如何使用数字高程模型?
  • 进阶向:AI聊天机器人(NLP+DeepSeek API)
  • 双馈和永磁风机构网型跟网型联合一次调频并入同步机电网,参与系统一次调频,虚拟惯量下垂,虚拟同步机VSG控制matlab/simulink
  • 202506 电子学会青少年等级考试机器人六级实际操作真题
  • PCB工艺-四层板制作流程(简单了解下)
  • 小实验--继电器定时开闭
  • TrustZone技术详解————这篇是AI写的包括图
  • 贝叶斯算法中的参数调优
  • RK3568下用 Qt Charts 实现曲线数据展示
  • python---getsizeof和asizeof的区别
  • 17.Linux :selinux
  • LMS/NLMS最小均值算法:双麦克风降噪
  • CentOS8.5安装19c单机告警及处理
  • 碳纳米管的原子精度制造——展望
  • 福彩双色球第2025090期篮球号码分析
  • docker启动出现Error response from daemon: Container的问题【已解决】
  • 容器化运维工具(2)Kubernetes 详细教程(含图解)
  • 开发避坑指南(18): SpringBoot环境变量配置错误:占位符解析失败解决方案
  • 【数据结构与算法-Day 12】深入浅出栈:从“后进先出”原理到数组与链表双实现
  • 奔图P2500NW打印机加碳粉方法
  • 《Transformer黑魔法Mask与Softmax、Attention的关系:一个-∞符号如何让AI学会“选择性失明“》
  • 深入理解 qRegisterMetaType<T>()
  • DAY32打卡
  • 字符输入流—read方法
  • Kotlin Native调用C curl