内容安全优化:基于Redis实现分级反爬虫策略
摘要:基于 Redis 实现分级反爬虫策略,提升系统安全性。
分级题目反爬虫策略
需求分析
对于以内容为卖点的网站(尤其是知识付费网站),网站上的数据可能被爬虫程序大规模抓取,导致竞争对手获得不当优势或者数据被滥用。而且恶意爬虫的频繁请求也会导致服务器的负载增加,影响正常用户的访问体验。
所以我们需要一些反爬虫的措施,来保护我们的系统和内容。
方案设计
反爬虫的手段
反爬的手段,包括使用协议条款、业务层处理、接入专业的反爬监测系统、设置溯源手段等等。
💡 有读者可能会想到 "限流"。限流确实可以用来减缓爬虫的请求频率,通过限制每个用户或 IP 在一定时间内的请求数量来降低对系统的压力。然而,限流的核心目的是保护系统的可用性和性能,防止系统因瞬时过载而崩溃。它更像是一种流量控制措施,而不是专门的防爬策略。
如何反爬虫,是一门艺术,无法做到 100% 防御,但是可以 根据实际业务定制 一些特殊的限制逻辑、处理逻辑、追溯手段。即使防不住爬虫,但我们可以通过日志等途径定位到盗取内容或影响我们系统的用户,进行封号限制或追责。这也是本文中要带大家实践的内容。
多级反爬虫策略
对于我们的面试刷题网站,假设用户要浏览题目的答案(或者其他要保护的内容),那么 必须要先登录,这样才能够进行追溯,可以简单统计用户访问频率来限制数据的爬取。
频率限制为多少呢?
通过分析我们的系统使用情况、再加上人为主观判断:正常用户浏览一道面试题的时间是分钟级别的。我们稍微放宽一点,默认同一用户每分钟最多能够获取 10 道题目。
建议采用分级反爬虫策略,先告警、再采取强制措施,可以有效减少误封的风险:
- 如果每分钟超过 10 道题目,则给管理员发送告警,比如发送邮件或者短信。
- 如果每分钟超过 20 道题目,则直接将账号踢下线,且进行封号操作。(或者限制一段时间无法访问)
具体的策略大家可以针对实际情况自己定制。
那么如何统计用户访问题目的频率呢?有下面 3 种方案:
统计访问频率 - 结合已有系统
在本项目之前的教程中,我们学习了 Hotkey 热 key 探测系统和 Sentinel 流控系统,这些系统的实现关键都在于如何统计一段时间内的调用频率。
所以我们完全可以利用这些系统帮我们进行统计,比如在 Hotkey 中配置热 key 的规则(业务前缀 + 用户 id 作为 key),如果发现一个 key 成为了热 key,就表示该 key 对应的用户访问题目过多,就可以进行后续处理。
或者在 Sentinel 中配置热点参数限流策略,如果某用户访问题目过多,该用户后续访问时会抛出流量阻塞异常,就可以进行后续处理。
这种方案的优点是利用专业的系统进行统计,更精准更可靠;缺点就是要依靠第三方依赖保证统计的可用性,万一 Sentinel 或 Hotkey 挂了,统计也失效了。
统计访问频率 - 基于本地计数器(单机)
如果项目采用单机部署,可以使用 LongAdder
进行高并发的计数操作。
LongAdder 是 Java 中并发包提供的一个计数器,它扩展了 AtomicLong,主要通过分段(分散热点)来减少多线程竞争,能够在高并发情况下提供比 AtomicLong 更高的性能。
其工作原理是将计数值分散到多个独立的变量中,并且每个线程只更新其中一个变量,这样可以减少多个线程同时访问同一个变量的竞争。当需要获取总值时,会将这些变量的值汇总起来。
假设我们需要统计每个用户每分钟的访问次数,可以通过以下方式实现:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;public class RequestRateLimiter {// 用于存储每个用户的访问次数计数器private ConcurrentHashMap<String, LongAdder> userRequestCounts = new ConcurrentHashMap<>();// 统计的时间间隔(比如每分钟重置一次计数)private final long interval;public RequestRateLimiter(long intervalInSeconds) {this.interval = intervalInSeconds;startResetTask();}/*** 每次用户访问时调用此方法* @param userId 用户的唯一标识符*/public void recordRequest(String userId) {// 获取或者初始化用户的访问计数器userRequestCounts.computeIfAbsent(userId, key -> new LongAdder()).increment();}/*** 获取用户的当前访问次数* @param userId 用户的唯一标识符* @return 用户的访问次数*/public long getRequestCount(String userId) {return userRequestCounts.getOrDefault(userId, new LongAdder()).sum();}/*** 定期重置每个用户的访问计数器*/private void startResetTask() {// 定时任务,每隔指定的时间间隔重置计数new Thread(() -> {while (true) {try {TimeUnit.SECONDS.sleep(interval); // 等待指定的时间间隔} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}// 重置每个用户的计数器userRequestCounts.clear();}}).start();}public static void main(String[] args) {RequestRateLimiter limiter = new RequestRateLimiter(60); // 每分钟重置一次// 模拟用户请求limiter.recordRequest("user1");limiter.recordRequest("user1");System.out.println("User1's current request count: " + limiter.getRequestCount("user1")); // 输出 2}
}
上述代码中,我们通过定时任务,每隔指定的时间间隔重置计数。
统计访问频率 - 基于 Redis 统计(分布式)
如果项目多机分布式部署,那么就需要利用一个独立的持久化存储来统计访问频率。
分布式存储 Redis 的 string 结构支持 incr 累加操作,可以对每个用户分钟(或其他时间精度)级别的访问次数进行累加统计。
1)设计 Redis 键值对
要能区分出用户和时间窗,示例 key 为:user:access:{userId}:{timestamp_in_minutes}
- {userId} 是用户 ID。
- {timestamp_in_minutes} 是当前的分钟级时间戳,即将当前时间戳转化为分钟,这样每分钟的访问都会被统计到一个 key 中。
每个 key 的 value,就是该用户在这分钟内的访问次数。
2)Redis 操作逻辑
对 Redis 的操作包括设置 key、给 key 增加计数、给 key 设置过期时间。
听起来并不难,但很多同学可能会使用如下的代码:
//假设使用 jedis 客户端
// 使用 Redis 的 INCR 操作增加当前秒的访问次数
jedis.incr(redisKey);// 设置过期时间(TTL),例如只保存60秒的数据
jedis.expire(redisKey, 60); // 60秒后自动过期
然而,这种方法存在潜在的设计陷阱:incr 和 expire 是两个独立的操作。如果你在高并发情况下调用 incr() 之后发生上下文切换(比如另一个线程执行操作),可能会导致两个问题:
- 过期时间重置:如果在高并发场景下,多次调用 incr() 后又多次调用 expire(),可能会不断重置该 key 的过期时间,导致这个 key 永远不会过期。(当前场景不会,但是连续性访问的场景就有可能发生)
- 非原子操作:由于 incr() 和 expire() 是独立操作,在并发情况下,两个线程都可能先判断 key 不存在,然后各自执行 set 操作,导致计数逻辑出错。
所以我们要确保计数和过期时间的操作是原子性的,可以使用 Redis 的 Lua 脚本来完成。如果 key 不存在,则初始化并设置过期时间,否则只进行计数。
示例脚本如下:
String luaScript = "if redis.call('exists', KEYS[1]) == 1 then " +" return redis.call('incr', KEYS[1]); " +"else " +" redis.call('set', KEYS[1], 1); " +" redis.call('expire', KEYS[1], 180); " + // 设置 180 秒过期时间" return 1; " +"end";
这里一定要给 Redis key 设置过期时间!因为统计超过一分钟,之前的数据就没什么用了。我选择 180 秒作为过期时间的目的是,给当前时间窗口(如 60 秒)多留一点冗余时间,以防止意外情况下需要用到历史记录,这是一个习惯。
对于我们的项目,主要的目的是反爬虫,而不是应对高并发大流量的请求,所以不需要结合 Sentinel 或 Hotkey 去精确统计流量。自主实现固定时间窗口(1 分钟)的访问频率统计就足够了。为了便于项目扩展为分布式,使用 Redis 方案来实现。
后端开发
1 通用计数器
可以在 manager 包中封装一个计数器类,作为通用能力,便于复用。代码如下:
@Slf4j
@Service
public class CounterManager {@Resourceprivate RedissonClient redissonClient;/*** 增加并返回计数,默认统计一分钟内的计数结果** @param key 缓存键* @return*/public long incrAndGetCounter(String key) {return incrAndGetCounter(key, 1, TimeUnit.MINUTES);}/*** 增加并返回计数** @param key 缓存键* @param timeInterval 时间间隔* @param timeUnit 时间间隔单位* @return*/public long incrAndGetCounter(String key, int timeInterval, TimeUnit timeUnit) {int expirationTimeInSeconds;switch (timeUnit) {case SECONDS:expirationTimeInSeconds = timeInterval;break;case MINUTES:expirationTimeInSeconds = timeInterval * 60;break;case HOURS:expirationTimeInSeconds = timeInterval * 60 * 60;break;default:throw new IllegalArgumentException("Unsupported TimeUnit. Use SECONDS, MINUTES, or HOURS.");}return incrAndGetCounter(key, timeInterval, timeUnit, expirationTimeInSeconds);}/*** 增加并返回计数** @param key 缓存键* @param timeInterval 时间间隔* @param timeUnit 时间间隔单位* @param expirationTimeInSeconds 计数器缓存过期时间* @return*/public long incrAndGetCounter(String key, int timeInterval, TimeUnit timeUnit, int expirationTimeInSeconds) {if (StrUtil.isBlank(key)) {return 0;}// 根据时间粒度生成 redisKeylong timeFactor;switch (timeUnit) {case SECONDS:timeFactor = Instant.now().getEpochSecond() / timeInterval;break;case MINUTES:timeFactor = Instant.now().getEpochSecond() / 60 / timeInterval;break;case HOURS:timeFactor = Instant.now().getEpochSecond() / 3600 / timeInterval;break;default:throw new IllegalArgumentException("Unsupported TimeUnit. Use SECONDS, MINUTES, or HOURS.");}String redisKey = key + ":" + timeFactor;// Lua 脚本String luaScript ="if redis.call('exists', KEYS[1]) == 1 then " +" return redis.call('incr', KEYS[1]); " +"else " +" redis.call('set', KEYS[1], 1); " +" redis.call('expire', KEYS[1], ARGV[1]); " +" return 1; " +"end";// 执行 Lua 脚本RScript script = redissonClient.getScript(IntegerCodec.INSTANCE);Object countObj = script.eval(RScript.Mode.READ_WRITE,luaScript,RScript.ReturnType.INTEGER,Collections.singletonList(redisKey), expirationTimeInSeconds);return (long) countObj;}
}
其中,通过 Java 8 的 java.time.Instant
类获取 {timestamp_in_minutes}:
// getEpochSecond 获取秒级时间戳,除以 60 为分钟,除以 3600 为小时
long timestampInMinutes = Instant.now().getEpochSecond() / 60;
通过 Redisson 提交 Lua 脚本到 Redis 执行:
RScript script = redissonClient.getScript(IntegerCodec.INSTANCE);
Object countObj = script.eval(RScript.Mode.READ_WRITE,luaScript,RScript.ReturnType.INTEGER,Collections.singletonList(redisKey), expirationTimeInSeconds);
2 检测爬虫逻辑
针对我们的需求,编写一个检测爬虫并分级处理的方法。每次请求题目时,都会调用该方法。
代码如下:
@Resource
private CounterManager counterManager;/*** 检测爬虫** @param loginUserId*/
private void crawlerDetect(long loginUserId) {// 调用多少次时告警final int WARN_COUNT = 10;// 超过多少次封号final int BAN_COUNT = 20;// 拼接访问 keyString key = String.format("user:access:%s", loginUserId);// 一分钟内访问次数,180 秒过期long count = counterManager.incrAndGetCounter(key, 1, TimeUnit.MINUTES, 180);// 是否封号if (count > BAN_COUNT) {// 踢下线StpUtil.kickout(loginUserId);// 封号User updateUser = new User();updateUser.setId(loginUserId);updateUser.setUserRole("ban");userService.updateById(updateUser);throw new BusinessException(ErrorCode.NO_AUTH_ERROR, "访问太频繁,已被封号");}// 是否告警if (count == WARN_COUNT) {// 可以改为向管理员发送邮件通知throw new BusinessException(110, "警告访问太频繁");}
}
3 使用检测爬虫方法
在 getQuestionVOById 接口中调用检测爬虫的方法即可:
@GetMapping("/get/vo")
public BaseResponse<QuestionVO> getQuestionVOById(long id, HttpServletRequest request) {ThrowUtils.throwIf(id <= 0, ErrorCode.PARAMS_ERROR);// 检测和处置爬虫User loginUser = userService.getLoginUser(request);crawlerDetect(loginUser.getId());// 查询数据库Question question = questionService.getById(id);ThrowUtils.throwIf(question == null, ErrorCode.NOT_FOUND_ERROR);// 获取封装类return ResultUtils.success(questionService.getQuestionVO(question, request));
}
大功告成!