【限流器设计】固定窗口计数法
前言
在高并发系统或接口服务中,频繁的请求可能导致系统资源被快速耗尽,甚至造成服务不可用。为了保证系统的稳定性和响应能力,我们需要对接口调用进行限流,即控制单位时间内的最大请求次数。
本文将以 C++ 实现的限流器为例,讲解如何通过固定窗口计数法来实现高精度、易维护的限流机制。
核心目标
控制调用频率:每单位时间允许的最大接口调用次数;
保证稳定性:避免短时间突发流量;
便于监控:记录调用情况方便调试。
算法设计
本文使用 固定窗口计数法(Fixed Window Counter)实现限流。
设计方案
时间区间 | 调用次数 | 内部控制值 | 说明 |
---|---|---|---|
第1s | 1 | 0 → 1 | callCount=1,窗口开始计时 |
2 | 1 → 2 | callCount=2 | |
3 | 2 → 3 | callCount=3,触发限流,等待剩余窗口时间,计数重置 | |
第2s | 4 | 3 → 1 | 新窗口开始,callCount=1 |
5 | 4 → 2 | callCount=2 | |
6 | 5 → 3 | callCount=3,触发限流,等待剩余窗口时间,计数重置 | |
第3s | 7 | 6 → 1 | 新窗口开始,callCount=1 |
... | ... | ... | ... |
伪代码:
Wait() {now = 当前时间if (m_windowStartNs== 0)初始化窗口起始时间if (m_callCount == m_maxCalls - 1) {elapsed = now - m_windowStartNsif (elapsed < m_windowNs){sleep(窗口剩余时间)}重置计数器和窗口起始时间} m_callCount++;
}
C++代码示例
#include <iostream>
#include <vector>
#include <ctime>
#include <iomanip>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>#define RATE_LIMITER_DEBUG 1// ===================== 高精度时间戳工具 =====================
namespace HighTimestamp
{// 获取单调递增纳秒时间inline uint64_t GetTimeNano(){struct timespec t;clock_gettime(CLOCK_MONOTONIC, &t);return static_cast<uint64_t>(t.tv_sec) * 1000000000ULL + t.tv_nsec;}
}
// ==========================================================// ===================== 异步日志器 =====================
class AsyncLogger
{
public:AsyncLogger(): m_stop(false){m_thread = std::thread([this]{ Process(); });}~AsyncLogger(){{std::lock_guard<std::mutex> lock(m_mutex);m_stop = true;}m_cv.notify_all();if (m_thread.joinable()){m_thread.join();}}// 往队列中添加日志void Log(const std::string &msg){{std::lock_guard<std::mutex> lock(m_mutex);m_queue.push(msg);}m_cv.notify_one();}private:void Process(){while (true){std::unique_lock<std::mutex> lock(m_mutex);m_cv.wait(lock, [this]{ return !m_queue.empty() || m_stop; });while (!m_queue.empty()){std::string msg = m_queue.front();m_queue.pop();lock.unlock(); // 减小锁粒度std::cout << msg << std::endl;lock.lock();}if (m_stop && m_queue.empty())break;}}std::queue<std::string> m_queue;std::mutex m_mutex;std::condition_variable m_cv;std::thread m_thread;bool m_stop;
};
// ==========================================================// 模拟 send 函数
enum SendResult
{OK,RETRY,ERROR
};
SendResult Send(int) { return OK; }// ===================== 限流器 =====================
class RateLimiter
{
public:RateLimiter(uint32_t maxCalls, uint64_t windowNs, AsyncLogger *logger): m_maxCalls(maxCalls), m_windowNs(windowNs), m_logger(logger){}// 调用前等待,保证限流void Wait(){uint64_t now = HighTimestamp::GetTimeNano();if (m_windowStartNs == 0) // 首次则需要初始化窗口开始时间{m_windowStartNs = now;}if (m_callCount == m_maxCalls - 1){uint64_t elapsed = now - m_windowStartNs;if (elapsed < m_windowNs) // 说明窗口还未满,需要sleep{
#if 0 // 打印调用m_maxCalls次的实际耗时m_logger->Log("[信息] 达到调用次数= " + std::to_string(m_callCount) + ",实际耗时=" + std::to_string(elapsed / 1e6) + " ms,等待剩余时间...");
#endifuint64_t sleepNs = m_windowNs - elapsed;m_ts.tv_sec = sleepNs / 1000000000ULL;m_ts.tv_nsec = sleepNs % 1000000000ULL;nanosleep(&m_ts, nullptr);}else // 说明窗口已占满甚至多占,不需要sleep{
#if RATE_LIMITER_DEBUGm_logger->Log("[提示] 窗口已满,但速率低于限制,无需 sleep");
#endif}#if RATE_LIMITER_DEBUGuint64_t realElapsed = HighTimestamp::GetTimeNano() - m_windowStartNs;double rate = (double)m_callCount * 1e9 / realElapsed;m_logger->Log("[统计] 调用次数=" + std::to_string(m_callCount) +", 耗时=" + std::to_string(realElapsed / 1e6) +" ms, 实际速率=" + std::to_string(rate) + " 次/秒");
#endifm_callCount = 0; // 重置窗口计数m_windowStartNs = HighTimestamp::GetTimeNano(); // 重置窗口开始时间}++m_callCount; // 增加计数}private:uint32_t m_maxCalls = 0; // 每窗口最大调用次数uint64_t m_windowNs = 0; // 窗口长度(纳秒)uint32_t m_callCount = 0; // 当前窗口已调用次数uint64_t m_windowStartNs = 0; // 当前窗口开始时间AsyncLogger *m_logger = nullptr; // 异步日志器struct timespec m_ts{};
};
// ==========================================================int main()
{static const int32_t vecSize = 1200000;std::vector<int> vec(vecSize, 42); // 大数据量测试const uint32_t MAX_CALLS = 10000; // 每窗口最大调用次数const uint64_t WINDOW_NS = 1000000000ULL; // 窗口长度 1 秒AsyncLogger logger;RateLimiter limiter(MAX_CALLS, WINDOW_NS, &logger);uint32_t totalCalls = 0;uint64_t startTime = HighTimestamp::GetTimeNano();for (uint32_t i = 0; i < vec.size(); ++i){while (true){SendResult err = Send(vec[i]);if (err == OK){limiter.Wait();++totalCalls;break;}else if (err == RETRY){continue;}else{logger.Log("Send error! err =" + std::to_string(err));break;}}}uint64_t now = HighTimestamp::GetTimeNano();uint64_t elapsed = now - startTime;logger.Log("测试结束,总调用次数=" + std::to_string(totalCalls) + "\n" +"数据量=" + std::to_string(vec.size()) + "\n" +"实际耗时=" + std::to_string(elapsed / 1e6) + " ms\n" +"平均速率=" + std::to_string(totalCalls / (elapsed / 1e9)) + " 次/秒");return 0;
}
输出:
总结
固定窗口计数法是一种简单、高效的限流方案,适用于大部分接口调用控制场景。它结合高精度时间和异步日志器,兼顾了性能与可观测性。