redisTemplate简单实现幂等性校验
@Service
public class MessageConsumerService {private final StringRedisTemplate redisTemplate;private static final String PROCESSED_MSG_KEY = "processed_msgs";private static final long EXPIRE_TIME = 7; // 过期时间(天)public MessageConsumerService(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public void processMessage(String msgId, String payload) {// 1. 原子性检查消息是否已处理 + 添加到已处理集合;设置过期时间,避免集合无限增长Boolean added = redisTemplate.opsForSet().add(PROCESSED_MSG_KEY, msgId,7, TimeUnit.DAYS);// 2. 如果返回 null 或 false,表示消息已存在,直接返回if (added == null || !added) {log.info("消息已处理,跳过: {}", msgId);return;}try {// 4. 处理实际业务逻辑doProcess(payload);} catch (Exception e) {// 5. 业务处理失败时,需移除已处理标记(可选)redisTemplate.opsForSet().remove(PROCESSED_MSG_KEY, msgId);throw e;}}private void doProcess(String payload) {// 业务逻辑实现log.info("处理消息: {}", payload);}
}
幂等性校验的关键要素
- 唯一消息 ID
消息发送方必须生成全局唯一的 msgId
常见实现方式:UUID、雪花算法(Snowflake)、业务主键哈希 - 原子性操作
必须保证 检查存在性 和 标记已处理 是原子操作
推荐使用 Redis 的 Lua 脚本或 SADD 命令(返回值可判断是否新增) - 过期时间设置
避免集合无限增长,占用过多内存
过期时间应大于消息可能的最大重试时间窗口(如 7 天) - 异常处理
业务处理失败时,是否需要回滚已处理标记?
若选择回滚,需保证失败时原子性移除标记
若不回滚,可能导致消息永久不被处理(需结合死信队列)