当前位置: 首页 > news >正文

Python实现限流算法

1. 常见的限流算法

限流是在软件开发和系统设计中常用的技术,用于控制某个时间段内的请求量,以此保护系统的稳定性和可用性。以下是几种常见的限流算法:

1.1 固定窗口计数器算法 (Fixed Window Counter)

固定窗口计数器算法将时间分割成固定大小的窗口,每个窗口独立计算请求的数量。当请求数量超过在该窗口允许的最大请求数时,新的请求会被拒绝,直到下一个窗口开始。

优点:实现简单,性能好。
缺点:边界效应问题(窗口切换时可能会出现瞬间的请求高峰)。

1.2. 滑动窗口日志算法 (Sliding Window Log)

滑动窗口算法是固定窗口算法的一个改进版本,它记录每个请求的确切时间戳。通过计算当前时间向前推一个窗口长度的时间内的所有请求,来确定是否达到了限流条件。

优点:平滑请求分布,减少边界效应。
缺点:需要更复杂的数据结构和计算。

1.3. 漏桶算法 (Leaky Bucket)

漏桶算法 metaphorically 将所有传入的请求放入一个固定容量的桶中,请求以恒定的速率从桶中“漏出”。如果桶满了,新的请求将被拒绝。

优点:输出速率恒定,可以平滑突发流量。
缺点:对于突发流量的响应不够灵活。

1.4. 令牌桶算法 (Token Bucket)

令牌桶算法为每个请求提供一个令牌,令牌以固定的速率生成并存储在桶中。如果桶中有足够的令牌,请求可以继续;如果没有,请求将被拒绝。令牌桶可以允许一定程度的突发流量,因为桶中可以积累令牌。

优点:允许一定程度的突发流量,比漏桶更灵活。
缺点:实现相对复杂。

1.5. 响应速率限制 (Rate Limiting with Queues)

通过使用队列来控制任务的执行速率。系统按照设定的速率从队列中取出并处理请求。如果队列满了,新的请求可以根据策略被拒绝或排队等待。

优点:可以更细致地控制请求的处理。
缺点:可能需要更复杂的队列管理和超时处理。

2. 应用场景

  • 固定窗口和滑动窗口:适用于需要简单快速实现的场景。
  • 漏桶和令牌桶:适用于需要平滑处理高峰流量的场景,如视频流服务。
  • 响应速率限制:适用于后端服务,如API服务器,需要控制处理速率,防止过载。

选择合适的限流算法取决于具体的应用场景、系统架构以及预期的流量模式。每种算法都有其优势和局限性,理解这些可以帮助设计更健壯和高效的系统。

3. Python实现滑动窗口日志算法

Redis 作为数据库、缓存、消息中间件等,是单线程的,又在内存中操作,所以速度极快。因其原子操作等特性,使用 Redis 实现一个限流工具是十分方便的。以下是使用Python实现一种限流算法的代码。

#!/usr/bin/env python
# coding=utf-8

import typing
import math
import time
import functools
import redis


redis_conf = {"host": "127.0.0.1", "port": 6379, "db": 0, "socket_timeout": 10}
__pool = redis.ConnectionPool(encoding='utf-8', decode_responses=True, **redis_conf)
throttler_redis = redis.StrictRedis(connection_pool=__pool)


class ThrottlingException(Exception):
    pass


def sliding_window_log_throttler(
    fn: typing.Optional[typing.Callable] = None,
    threshold: int = 10,
    duration: int = 1,
    lock_timeout: int = 3,
) -> typing.Callable:
    """
    :param fn:
    :param threshold: 单个窗口内的限流次数
    :param duration: 限制的窗口时间段,单位秒; 建议取值[1,60],否则计算有int溢出的风险
    :param lock_timeout: 资源锁的超时时间,单位秒; 设置值应大于fn函数执行耗时
    :return:
    """
    if not callable(fn):
        return functools.partial(
            sliding_window_log_throttler,
            threshold=threshold,
            duration=duration,
            lock_timeout=lock_timeout,
        )

    @functools.wraps(fn)
    def _wrapper(*args: tuple, **kwargs: dict) -> typing.Any:
        # 函数路径作为key
        method_location = "{}.{}".format(fn.__module__, fn.__qualname__)
        wait_key = f"throttling:wait:{method_location}"
        lock_key = f"throttling:lock:{method_location}"

        get_lock = False
        try:

            if lock_timeout > 0:
                # 加锁是为了解决并发竞争资源的问题
                get_lock = throttler_redis.set(lock_key, 1, ex=lock_timeout, nx=True)
                if not get_lock:
                    try:
                        throttler_redis.delete(wait_key)
                        # 阻塞式等待下一个锁:若是超过窗口时间还未能获得锁,则表示超过限流
                        throttler_redis.blpop(wait_key, timeout=duration)
                        get_lock = throttler_redis.set(lock_key, 1, ex=lock_timeout, nx=True)
                    except Exception:
                        pass
                if not get_lock:
                    raise ThrottlingException("未竞争到资源")

            # 按照窗口滑动计算限流
            key = f"throttling:method:{method_location}"
            # 精确度, 微秒μs,精度越高越不容易受并发影响
            multiple = 1000000
            # 计算时间戳
            now_ts = math.floor(time.time() * multiple)
            # 时间窗口左边界
            old_ts = now_ts - (duration * multiple)
            # 删除时间窗口之前的数据
            throttler_redis.zremrangebyscore(key, 0, old_ts)
            # 获取窗口内的行为数量
            count = throttler_redis.zcard(key)

            get_succ = count == 0 or count < threshold
            if get_succ:
                # 记录行为; 只要保证score的唯一性即可,这里使用毫秒时间戳作为唯一值
                # mapping构造:{member: score} ,score是用于排序的数值
                create = throttler_redis.zadd(key, {now_ts: now_ts})
                if create == 0:
                    # 时间戳并发重复,数据只是被覆盖,并未新增;未获得令牌
                    get_succ = False

            if not get_succ:
                raise ThrottlingException(f"超过限流限制: {threshold}")

        finally:
            if get_lock:
                throttler_redis.delete(lock_key)
                throttler_redis.lpush(wait_key, 1)

        # 执行函数
        ret = fn(*args, **kwargs)
        return ret

    return _wrapper

使用测试函数验证,输出结果:

In [188]: @sliding_window_log_throttler(threshold=3)
     ...: def demo(a, b):
     ...:     return a + b
     ...:

In [189]: for i in range(10):
     ...:     print(i)
     ...:     demo(1,i)
     ...:
0
1
2
3
---------------------------------------------------------------------------
ThrottlingException                       Traceback (most recent call last)
<ipython-input-189-e36de3c39367> in <module>
      1 for i in range(10):
      2     print(i)
----> 3     demo(1,i)
      4

<ipython-input-174-782ccecce3f9> in _wrapper(*args, **kwargs)
     67
     68             if not get_succ:
---> 69                 raise ThrottlingException(f"超过限流限制: {threshold}")
     70
     71         finally:

ThrottlingException: 超过限流限制: 3

相关文章:

  • 使用-v选项查看编译器详细搜索路径(g++示例)g++ -v -c main.cpp 发现自定义路径没有被包含怎么办
  • 如何用Deepseek制作流程图?
  • 数据结构---堆栈和列
  • 77.HarmonyOS NEXT ImageViewerView 组件深度剖析: Swiper容器与懒加载深度解析
  • 智慧加油站小程序数据库设计文档
  • Redis 的应用场景
  • 提升HDFS存储和读取效率
  • Json实现深拷贝的缺点
  • 2.5 python接口编程
  • Java File 类详解
  • Wireshark 抓包全解析:从数据捕获到报文分析
  • 2024年12月CCF-GESP编程能力等级认证C++编程二级真题解析
  • 深入探索人工智能(AI):分类、应用与案例分析
  • 基于模态特定因子的高效低秩多模态融合方法解析
  • 基于Python的天气预报数据可视化分析系统-Flask+html
  • python从邮件中提取链接中的符号为什么会变成amp; 解决办法
  • 华为手机助手输入连接码时光标乱跳
  • 本地部署Spark集群
  • Markdig:强大的 .NET Markdown 解析器详解
  • 003_快乐数
  • 莱布雷希特专栏:古典乐坛边缘人
  • 持续降雨存在落石风险,贵州黄果树景区水帘洞将封闭至6月初
  • 上海公办小学验证今起开始,下周一和周二分区进行民办摇号
  • 多个“首次”!上市公司重大资产重组新规落地
  • “9+2”复式票,浦东购彩者拿下体彩大乐透1153万头奖
  • 李强:把做强国内大循环作为推动经济行稳致远的战略之举