SpringBoot默认并发处理(Tomcat)、项目限流详解
SpringBoot默认并发处理
在 Spring Boot 项目中,默认情况下,同一时间能处理的请求数由内嵌的 Tomcat 服务器的线程池配置决定。
默认并发处理能力
请求处理流程
- 请求到达:新请求首先进入 TCP 连接队列(最大 max-connections)。
- 线程分配:
如果有空闲线程,立即处理请求。 - 如果所有线程繁忙,请求进入等待队列(最多 accept-count 个)。
队列满时:若等待队列已满,直接返回 Connection refused 错误(HTTP 503)。
配置调整
在 application.properties 或 application.yml 中修改默认值:
# 调整 Tomcat 线程池参数
server.tomcat.threads.max=500 # 提高最大工作线程数
server.tomcat.accept-count=200 # 增大等待队列长度
server.tomcat.max-connections=10000 # 增加 TCP 连接队列容量
限流业务场景
在短时间内,接口承载成千上万的请求,首先要考虑程序的并发性。大流量会直接将系统打垮,无法对外提供服务。那为了防止出现这种情况最常见的解决方案之一就是限流,当请求达到一定的并发数或速率,就进行等待、排队、降级、拒绝服务等。在 Spring Boot 中,常见的限流算法包括 计数器算法、漏桶算法、令牌桶算法,以及 Resilience4j 和 Spring Cloud Gateway 的集成方案。
常见限流算法
固定窗口计数器限流
原理
在固定时间窗口(如1秒)内统计请求数,超过阈值则拒绝请求。
缺点:存在窗口临界突发问题(如窗口切换时请求翻倍)。
实现示例
@Component
public class FixedWindowCounter {private final AtomicInteger counter = new AtomicInteger(0);private final int limit = 100; // 每秒100个请求@Scheduled(fixedRate = 1000) // 每秒重置计数器public void resetCounter() {counter.set(0);}public boolean allowRequest() {return counter.incrementAndGet() <= limit;}
}// 在Controller中使用
@RestController
public class ApiController {@Autowiredprivate FixedWindowCounter counter;@GetMapping("/api")public ResponseEntity<String> handleRequest() {if (counter.allowRequest()) {return ResponseEntity.ok("Success");} else {return ResponseEntity.status(429).body("Too Many Requests");}}
}
滑动窗口计数器限流
原理
将时间窗口划分为多个小窗口(如1秒分为10个100ms的窗口),统计最近N个小窗口的总请求数。
优点:平滑过渡,减少临界问题。
实现示例
@Component
public class SlidingWindowCounter {private final LinkedList<Long> timestamps = new LinkedList<>();private final int windowSize = 1000; // 时间窗口1秒private final int limit = 100; // 窗口内最多100个请求public synchronized boolean allowRequest() {long now = System.currentTimeMillis();// 移除超出时间窗口的旧记录while (!timestamps.isEmpty() && now - timestamps.getFirst() > windowSize) {timestamps.removeFirst();}if (timestamps.size() < limit) {timestamps.addLast(now);return true;}return false;}
}
漏桶算法(Leaky Bucket)
漏桶算法原理
漏桶算法是一种流量整形技术,用于控制请求的处理速率,确保系统平稳处理流量。其核心思想如下:
桶容量(Capacity):桶的最大请求容量,超过则拒绝新请求。
恒定漏出速率(Leak Rate):以固定速率处理请求(如每秒处理10个请求)。
请求处理流程:
请求到达时,若桶未满,加入队列等待处理。
桶满时,新请求被拒绝。
漏桶以恒定速率从队列取出请求处理。
漏桶算法示例
- 漏桶的容量和漏出速率
// application.yml
leaky-bucket:capacity: 10 # 桶容量(最多容纳10个请求)leak-rate: 1000 # 漏出速率(每1000毫秒处理1个请求)// LeakyBucketConfig
@Configuration
@ConfigurationProperties(prefix = "leaky-bucket")
@Data
public class LeakyBucketConfig {private int capacity; // 桶容量private long leakRate; // 漏出速率(毫秒)
}
- 实现漏桶算法
@Component
public class LeakyBucket {private final Queue<Request> bucket = new ConcurrentLinkedQueue<>();private final int capacity;private final long leakRate;public LeakyBucket(LeakyBucketConfig config) {this.capacity = config.getCapacity();this.leakRate = config.getLeakRate();this.initLeakTask();}// 初始化漏出任务private void initLeakTask() {ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();scheduler.scheduleAtFixedRate(this::leak, 0, leakRate, TimeUnit.MILLISECONDS);}// 处理漏出请求private void leak() {if (!bucket.isEmpty()) {Request request = bucket.poll();request.process();}}// 尝试加入请求public boolean tryAddRequest(Request request) {if (bucket.size() < capacity) {return bucket.offer(request);}return false;}
}
- 请求处理类
public class Request {private final String data;public Request(String data) {this.data = data;}public void process() {System.out.println("处理请求: " + data + ",时间: " + new Date());}
}
- controller
@RestController
public class ApiController {private final LeakyBucket leakyBucket;public ApiController(LeakyBucket leakyBucket) {this.leakyBucket = leakyBucket;}@PostMapping("/api")public ResponseEntity<String> handleRequest(@RequestBody String data) {Request request = new Request(data);if (leakyBucket.tryAddRequest(request)) {return ResponseEntity.ok("请求已接受");} else {return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body("请求过多,请稍后重试");}}
}
漏桶算法适用性总结
异步限流:漏桶算法的设计初衷是异步处理,适合需要流量整形的场景(如API限流、消息队列消费)。
同步适配:通过阻塞或Future可强制同步,但会牺牲性能,不推荐在高并发场景使用。
替代方案:若需同步限流,可考虑 计数器算法 或 信号量(如Semaphore)。
结合Future模拟同步示例:
public class SyncLeakyBucket {private final BlockingQueue<CompletableFuture<String>> queue = new LinkedBlockingQueue<>(10);private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public SyncLeakyBucket() {// 每秒处理1个请求scheduler.scheduleAtFixedRate(() -> {CompletableFuture<String> future = queue.poll();if (future != null) {future.complete("处理结果");}}, 0, 1, TimeUnit.SECONDS);}// 同步调用:阻塞等待结果public String processSync(String data) throws InterruptedException, ExecutionException {CompletableFuture<String> future = new CompletableFuture<>();if (queue.offer(future)) {return future.get(); // 阻塞直到结果返回} else {throw new RuntimeException("请求被拒绝");}}
}// 使用示例
SyncLeakyBucket bucket = new SyncLeakyBucket();
String result = bucket.processSync("data"); // 同步阻塞
令牌桶算法(Token Bucket)
令牌桶算法原理
令牌桶算法通过以下机制控制请求速率:
令牌生成:以固定速率(如每秒10个)向桶中添加令牌。
桶容量:桶中最多存储的令牌数(如100个),超出部分丢弃。
请求处理:请求到达时尝试获取令牌,获取成功则处理,否则拒绝。
突发处理:桶内积累的令牌允许一次性消耗,支持突发流量。
实现示例
- 创建令牌桶组件
@Component
public class TokenBucket {private final int capacity; // 桶容量private final int refillRate; // 每秒填充的令牌数private AtomicInteger tokens; // 当前令牌数private long lastRefillTime; // 上次填充时间public TokenBucket(@Value("${token-bucket.capacity:100}") int capacity,@Value("${token-bucket.refill-rate:10}") int refillRate) {this.capacity = capacity;this.refillRate = refillRate;this.tokens = new AtomicInteger(capacity);this.lastRefillTime = System.currentTimeMillis();}// 尝试获取令牌public boolean tryAcquire() {refillTokens(); // 先补充令牌while (true) {int current = tokens.get();if (current <= 0) {return false;}if (tokens.compareAndSet(current, current - 1)) {return true;}}}// 补充令牌(线程安全)private synchronized void refillTokens() {long now = System.currentTimeMillis();long elapsedTime = now - lastRefillTime;int tokensToAdd = (int) (elapsedTime * refillRate / 1000);if (tokensToAdd > 0) {tokens.set(Math.min(tokens.get() + tokensToAdd, capacity));lastRefillTime = now;}}
}
- controller
@RestController
public class ApiController {private final TokenBucket tokenBucket;public ApiController(TokenBucket tokenBucket) {this.tokenBucket = tokenBucket;}@GetMapping("/api")public ResponseEntity<String> handleRequest() {if (tokenBucket.tryAcquire()) {return ResponseEntity.ok("请求处理成功");} else {return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body("请求过多,请稍后重试");}}
}
令牌桶算法 vs 漏桶算法
令牌桶通过以下方式支持突发流量:
令牌积累:当请求较少时,未使用的令牌会在桶中积累(最多达到桶容量)。
突发消耗:当突发请求到达时,可一次性消耗所有积累的令牌,处理多个请求。
常见限流实现
Guava 的 RateLimiter(单机限流)
基于令牌桶算法,适合单机限流。
优点:简单易用,适合单机场景。
缺点:不支持分布式环境。
- 添加依赖
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.0.1-jre</version>
</dependency>
- 在Controller中使用RateLimiter:
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class DemoController {// 每秒允许2个请求private final RateLimiter rateLimiter = RateLimiter.create(2.0);@GetMapping("/api/resource")public String getResource() {if (rateLimiter.tryAcquire()) { // 尝试获取令牌return "Success";} else {return "Too many requests!";}}
}
基于 AOP + 自定义注解(单机限流)
通过AOP拦截方法调用,实现限流逻辑。
优点:灵活,可自定义限流规则。
缺点:单机限流,计数器未持久化。
- 定义限流注解:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RateLimit {int value(); // 允许的请求数int timeWindow() default 60; // 时间窗口(秒)
}
- aop切面
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;@Aspect
@Component
public class RateLimitAspect {private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();private final ConcurrentHashMap<String, Long> timestamps = new ConcurrentHashMap<>();@Around("@annotation(rateLimit)")public Object limit(ProceedingJoinPoint pjp, RateLimit rateLimit) throws Throwable {String key = pjp.getSignature().toLongString();int limit = rateLimit.value();long window = rateLimit.timeWindow() * 1000L;long now = System.currentTimeMillis();synchronized (this) {if (timestamps.getOrDefault(key, 0L) < now - window) {counters.put(key, new AtomicInteger(0));timestamps.put(key, now);}AtomicInteger count = counters.get(key);if (count != null && count.incrementAndGet() > limit) {throw new RuntimeException("Rate limit exceeded");}}return pjp.proceed();}
}
- Controller
@RateLimit(value = 10, timeWindow = 60) // 60秒内允许10次请求
@GetMapping("/limited-api")
public String limitedApi() {return "Processed";
}
Redis + Lua 脚本(分布式限流)
利用Redis的原子性操作实现分布式限流。
优点:支持分布式环境,高并发安全。
缺点:依赖Redis,需要维护脚本。
固定窗口计数器示例:
- redis依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- Lua脚本 rate_limiter.lua
-- KEYS[1] = 限流键, ARGV[1] = 窗口时间(秒), ARGV[2] = 最大请求数
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])local current = redis.call('GET', key)
if current and tonumber(current) >= limit thenreturn 0 -- 超过阈值,拒绝请求
elseredis.call('INCR', key)if tonumber(current) == 0 then -- 首次设置过期时间redis.call('EXPIRE', key, window)endreturn 1 -- 允许请求
end
- 在Service中调用Lua脚本
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;
import java.util.Collections;@Service
public class RateLimitService {private final RedisTemplate<String, String> redisTemplate;private final RedisScript<Long> rateLimiterScript;public RateLimitService(RedisTemplate<String, String> redisTemplate) {this.redisTemplate = redisTemplate;this.rateLimiterScript = RedisScript.of(new ClassPathResource("rate_limiter.lua"), Long.class);}public boolean allowRequest(String key, int limit, int windowSec) {Long result = redisTemplate.execute(rateLimiterScript,Collections.singletonList(key), // 限流键 列表 String.valueOf(windowSec) // ARGV 参数String.valueOf(limit) // ARGV 参数);return result != null && result == 1;}
}
- Controller
@GetMapping("/distributed-api")
public String distributedApi() {String key = "user:123:api"; // 根据用户或接口生成唯一keyif (rateLimitService.allowRequest(key, 60, 100)) {return "Success";} else {return "Too many requests!";}
}
使用 Sentinel(生产级方案)
通过阿里开源的Sentinel实现流量控制、熔断降级。
优点:功能强大,支持动态规则配置。
缺点:需要额外部署控制台。
- 添加依赖
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId><version>2021.0.5.0</version>
</dependency>
- 配置资源点
@GetMapping("/sentinel-api")
@SentinelResource(value = "sentinelApi", blockHandler = "blockHandler")
public String sentinelApi() {return "Success";
}public String blockHandler(BlockException ex) {return "Blocked by Sentinel";
}
- 在Sentinel控制台配置规则(QPS=2)
// 代码初始化规则(可选)
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("sentinelApi");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setCount(2);
rules.add(rule);
FlowRuleManager.loadRules(rules);
Spring Cloud Gateway 限流(网关层限流)
适用于微服务架构,在网关层统一限流。
优点:网关层统一管控,适合微服务。
缺点:需部署网关和Redis。
- 添加依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
- 配置路由规则(application.yml)
spring:cloud:gateway:routes:- id: my_routeuri: http://localhost:8080predicates:- Path=/api/**filters:- name: RequestRateLimiterargs:redis-rate-limiter.replenishRate: 1 # 每秒1个令牌redis-rate-limiter.burstCapacity: 2 # 桶容量key-resolver: "#{@userKeyResolver}"
- 定义KeyResolver(按用户限流)
@Bean
public KeyResolver userKeyResolver() {return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}