当前位置: 首页 > news >正文

Redis 7.0中5种新特性及实战应用

Redis 7.0引入了多项革命性的新特性,不仅在性能和可靠性方面有所提升,更在功能和使用体验上有了质的飞跃。本文将介绍Redis 7.0的五大关键新特性,可以根据实际情况利用Redis 7.0的强大功能,构建更高效、更可靠的应用系统。

特性一:Redis Functions(函数存储)

技术原理

Redis Functions是Redis 7.0引入的重量级特性,它允许开发者将Lua脚本作为命名函数存储在Redis服务器中。与传统的EVAL命令不同,Redis Functions支持创建库(Library)的概念,可以将相关功能的函数组织在一起,提供更好的可管理性。

关键优势:

  • 函数持久化存储在Redis中,无需每次连接时重新加载
  • 支持函数版本管理和库的概念
  • 提供更好的权限控制和可观测性
  • 减少网络传输开销,提高执行效率

实现示例

创建和注册函数库
# 创建一个简单的计数器函数库
FUNCTION LOAD "
#!lua name=mycounterredis.register_function('incr_by_and_get', function(keys, args)local key = keys[1]local increment = tonumber(args[1])local result = redis.call('INCRBY', key, increment)return result
end)redis.register_function('get_and_incr_by', function(keys, args)local key = keys[1]local increment = tonumber(args[1])local current = tonumber(redis.call('GET', key)) or 0redis.call('INCRBY', key, increment)return current
end)
"
调用函数
# 使用FCALL命令调用函数
FCALL incr_by_and_get 1 my_counter 5
# 返回递增后的值,比如 5FCALL get_and_incr_by 1 my_counter 3
# 返回递增前的值,比如 5(然后递增到 8)
Java客户端示例
@Service
public class RedisCounterService {private final StringRedisTemplate redisTemplate;public RedisCounterService(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public void initializeCounterFunctions() {String functionScript = """#!lua name=mycounterredis.register_function('incr_by_and_get', function(keys, args)local key = keys[1]local increment = tonumber(args[1])local result = redis.call('INCRBY', key, increment)return resultend)redis.register_function('get_and_incr_by', function(keys, args)local key = keys[1]local increment = tonumber(args[1])local current = tonumber(redis.call('GET', key)) or 0redis.call('INCRBY', key, increment)return currentend)""";try {redisTemplate.execute((RedisCallback<Object>) connection -> {connection.serverCommands().functionLoad(functionScript);return null;});} catch (Exception e) {// 处理已存在的情况if (e.getMessage().contains("already exists")) {log.info("Counter functions already loaded");} else {throw e;}}}public Long incrementAndGet(String counterKey, int increment) {return redisTemplate.execute((RedisCallback<Long>) connection -> (Long) connection.serverCommands().functionCall("incr_by_and_get", Arrays.asList(counterKey), Arrays.asList(String.valueOf(increment))));}public Long getAndIncrement(String counterKey, int increment) {return redisTemplate.execute((RedisCallback<Long>) connection -> (Long) connection.serverCommands().functionCall("get_and_incr_by", Arrays.asList(counterKey), Arrays.asList(String.valueOf(increment))));}
}

实际应用场景

1. 原子计数器与限流器

函数库可以实现复杂的计数逻辑,如分布式限流器、访问频率控制等。

#!lua name=ratelimiter-- 令牌桶限流算法
redis.register_function('acquire_token', function(keys, args)local key = keys[1]local capacity = tonumber(args[1])local rate = tonumber(args[2])local requested = tonumber(args[3]) or 1local now = tonumber(redis.call('TIME')[1])-- 获取当前桶状态local bucket = redis.call('HMGET', key, 'last_refill', 'tokens')local last_refill = tonumber(bucket[1]) or nowlocal tokens = tonumber(bucket[2]) or capacity-- 计算令牌补充local elapsed = now - last_refilllocal new_tokens = math.min(capacity, tokens + elapsed * rate)-- 尝试获取令牌if new_tokens >= requested thennew_tokens = new_tokens - requestedredis.call('HMSET', key, 'last_refill', now, 'tokens', new_tokens)return 1 -- 成功elseredis.call('HMSET', key, 'last_refill', now, 'tokens', new_tokens)return 0 -- 失败end
end)
2. 复杂业务逻辑封装

电商场景中的下单流程,涉及库存检查、价格计算、订单创建等多个步骤:

#!lua name=ordersystem-- 下单流程
redis.register_function('create_order', function(keys, args)local product_key = keys[1]local order_key = keys[2]local product_id = args[1]local quantity = tonumber(args[2])local user_id = args[3]-- 检查库存local stock = tonumber(redis.call('HGET', product_key, 'stock'))if not stock or stock < quantity thenreturn {err = "Insufficient stock"}end-- 获取价格local price = tonumber(redis.call('HGET', product_key, 'price'))if not price thenreturn {err = "Product price not found"}end-- 创建订单local order_id = redis.call('INCR', 'order:id:counter')local total = price * quantity-- 减库存redis.call('HINCRBY', product_key, 'stock', -quantity)-- 保存订单local order_data = {id = order_id,user_id = user_id,product_id = product_id,quantity = quantity,price = price,total = total,status = "created",create_time = redis.call('TIME')[1]}redis.call('HMSET', order_key .. order_id, 'id', order_data.id,'user_id', order_data.user_id,'product_id', order_data.product_id,'quantity', order_data.quantity,'price', order_data.price,'total', order_data.total,'status', order_data.status,'create_time', order_data.create_time)-- 添加到用户订单列表redis.call('SADD', 'user:' .. user_id .. ':orders', order_id)return {order_id = order_id,total = total}
end)
3. 数据一致性保证

在需要保证多个操作原子性的场景中特别有用,如积分兑换:

#!lua name=pointsystem-- 积分兑换
redis.register_function('redeem_points', function(keys, args)local user_points_key = keys[1]local reward_key = keys[2]local user_rewards_key = keys[3]local user_id = args[1]local reward_id = args[2]local required_points = tonumber(args[3])-- 检查用户积分local current_points = tonumber(redis.call('GET', user_points_key)) or 0if current_points < required_points thenreturn {success = false, reason = "Insufficient points"}end-- 检查奖励是否有效local reward_exists = redis.call('EXISTS', reward_key)if reward_exists == 0 thenreturn {success = false, reason = "Reward not found"}end-- 扣减积分redis.call('DECRBY', user_points_key, required_points)-- 记录兑换历史local redeem_id = redis.call('INCR', 'redeem:id:counter')redis.call('HMSET', 'redeem:' .. redeem_id,'user_id', user_id,'reward_id', reward_id,'points', required_points,'time', redis.call('TIME')[1])-- 添加到用户奖励列表redis.call('SADD', user_rewards_key, reward_id)return {success = true,redeem_id = redeem_id,remaining_points = current_points - required_points}
end)

最佳实践

  1. 功能分组:按业务功能将相关函数组织到同一个库中,提高代码可维护性
  2. 版本管理:为函数库添加版本信息,便于升级和回滚
  3. 错误处理:在Lua函数中添加完善的错误处理逻辑
  4. 权限控制:结合ACL限制函数的调用权限
  5. 单一职责:每个函数保持功能单一,避免过于复杂的逻辑

特性二:分片发布/订阅(Sharded Pub/Sub)

技术原理

Redis 7.0引入了分片发布/订阅功能,这是对传统Pub/Sub模型的重要增强。传统的Pub/Sub在集群环境下存在效率和可扩展性问题,因为消息需要在所有节点间广播。分片Pub/Sub通过将频道分布到特定的节点,实现了更高效的消息传递。

关键优势:

  • 消息只在特定节点处理,减少网络开销
  • 频道数据和订阅信息只存储在特定节点,降低内存使用
  • 更好的可扩展性,适合大规模Redis集群
  • 避免了全局广播带来的性能问题

实现示例

Redis命令
# 订阅分片频道
SSUBSCRIBE news.sports# 向分片频道发布消息
SPUBLISH news.sports "Team A won the championship"# 退订分片频道
SUNSUBSCRIBE news.sports
Java实现
@Service
public class ShardedPubSubService {private final RedisTemplate<String, String> redisTemplate;private final Map<String, MessageListener> subscriptions = new ConcurrentHashMap<>();public ShardedPubSubService(RedisTemplate<String, String> redisTemplate) {this.redisTemplate = redisTemplate;}public void publish(String channel, String message) {redisTemplate.execute((RedisCallback<Long>) connection -> {// 使用底层连接直接执行SPUBLISH命令return connection.execute("SPUBLISH", channel.getBytes(), message.getBytes());});}public void subscribe(String channel, Consumer<String> messageHandler) {MessageListener listener = (message, pattern) -> messageHandler.accept(new String(message.getBody()));RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisTemplate.getConnectionFactory());container.afterPropertiesSet();// 注册分片订阅container.addMessageListener(listener, new PatternTopic("__shard__:" + channel));// 存储引用以便后续取消订阅subscriptions.put(channel, listener);}public void unsubscribe(String channel) {MessageListener listener = subscriptions.remove(channel);if (listener != null) {redisTemplate.execute((RedisCallback<Void>) connection -> {connection.execute("SUNSUBSCRIBE", channel.getBytes());return null;});}}
}

实际应用场景

1. 地理位置感知的消息推送

在基于地理位置的应用中,可以使用分片Pub/Sub向特定区域的用户推送消息:

@Service
public class LocationBasedNotificationService {private final ShardedPubSubService pubSubService;private final UserLocationService locationService;// 发送区域通知public void sendAreaNotification(String areaCode, String message) {String channel = "location.area." + areaCode;pubSubService.publish(channel, message);}// 用户订阅自己所在区域的通知public void subscribeUserToAreaNotifications(String userId) {String userArea = locationService.getUserAreaCode(userId);String channel = "location.area." + userArea;pubSubService.subscribe(channel, message -> {// 处理接收到的区域通知notifyUser(userId, message);});}
}
2. 实时聊天系统

分片Pub/Sub非常适合大规模聊天应用,减轻服务器负担:

@Service
public class ChatService {private final ShardedPubSubService pubSubService;// 发送聊天消息public void sendChatMessage(String roomId, ChatMessage message) {String channel = "chat.room." + roomId;String messageJson = objectMapper.writeValueAsString(message);pubSubService.publish(channel, messageJson);}// 用户加入聊天室public void joinChatRoom(String userId, String roomId) {String channel = "chat.room." + roomId;// 订阅聊天室消息pubSubService.subscribe(channel, messageJson -> {ChatMessage message = objectMapper.readValue(messageJson, ChatMessage.class);// 将消息发送到用户WebSocketwebSocketService.sendToUser(userId, message);});// 发送加入通知ChatMessage joinMessage = new ChatMessage("system", userId + " joined the room", System.currentTimeMillis());pubSubService.publish(channel, objectMapper.writeValueAsString(joinMessage));}
}
3. 分布式系统状态同步

使用分片Pub/Sub实现微服务间的高效状态同步:

@Service
public class SystemStateManager {private final ShardedPubSubService pubSubService;@PostConstructpublic void init() {// 订阅配置更新pubSubService.subscribe("system.config", this::handleConfigUpdate);// 订阅服务状态变更pubSubService.subscribe("system.service." + getCurrentServiceName(), this::handleServiceCommand);}// 发布配置变更public void publishConfigChange(String configKey, String configValue) {ConfigChangeEvent event = new ConfigChangeEvent(configKey, configValue, System.currentTimeMillis());pubSubService.publish("system.config", objectMapper.writeValueAsString(event));}// 发送服务指令public void sendServiceCommand(String serviceName, String command, Map<String, Object> params) {ServiceCommand cmd = new ServiceCommand(command, params, System.currentTimeMillis());pubSubService.publish("system.service." + serviceName, objectMapper.writeValueAsString(cmd));}private void handleConfigUpdate(String message) {ConfigChangeEvent event = objectMapper.readValue(message, ConfigChangeEvent.class);// 更新本地配置configManager.updateConfig(event.getKey(), event.getValue());}private void handleServiceCommand(String message) {ServiceCommand command = objectMapper.readValue(message, ServiceCommand.class);// 执行命令commandExecutor.execute(command.getCommand(), command.getParams());}
}

最佳实践

  1. 频道命名规范:使用层次化的命名方式(如"category.subcategory.id")
  2. 消息序列化:使用JSON或其他格式序列化消息,便于跨语言使用
  3. 错误处理:在订阅处理程序中添加异常处理逻辑
  4. 合理分片:根据业务特性合理设计频道分布
  5. 组合传统Pub/Sub:某些需要全局广播的场景可以继续使用传统Pub/Sub

特性三:多部分AOF(Multi-part AOF)

技术原理

Redis 7.0对AOF(Append Only File)持久化机制进行了重大改进,引入了多部分AOF文件结构。传统AOF是单一文件,在重写时会导致磁盘压力和性能波动。新的多部分AOF由基础文件(base files)和增量文件(incremental files)组成,提供更高效的持久化机制。

关键优势:

  • 减少AOF重写期间的磁盘I/O压力
  • 降低内存使用峰值
  • 更快的重写过程,减少性能波动
  • 可靠性提升,减少数据丢失风险

配置示例

# redis.conf 配置# 启用AOF持久化
appendonly yes# 使用新的多部分AOF格式
aof-use-rdb-preamble yes# 设置AOF目录
dir /data/redis# 文件名前缀 (Redis 7.0新增)
appendfilename "appendonly.aof"# AOF自动重写
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

工作原理详解

多部分AOF的工作流程如下:

  1. 初始化:Redis在启动时创建一个基础(base)AOF文件和一个增量(incr)AOF文件
  2. 命令记录:新的写入命令追加到增量AOF文件中
  3. 重写触发:当满足重写条件时,Redis创建一个新的基础文件
  4. 文件管理:重写完成后,历史增量文件被清理,新的增量文件开始接收命令

文件命名:

  • 基础文件:appendonly_base.aof
  • 增量文件:appendonly_incr.aof.{seq}
  • 清单文件:appendonly.manifest.json(包含所有AOF文件的信息)

实际应用场景

1. 高写入量数据库优化

对于高频写入的Redis实例,多部分AOF可以显著减少写入峰值带来的性能波动:

# 针对高写入量的配置优化# 使用更激进的AOF fsync策略,减少数据丢失风险
appendfsync everysec# 优化内存使用
aof-rewrite-incremental-fsync yes# 增加重写阈值,减少重写频率
auto-aof-rewrite-percentage 200
auto-aof-rewrite-min-size 128mb
2. 快速恢复方案实现

利用多部分AOF特性实现更快的数据恢复策略:

@Service
public class RedisPersistenceManager {@Value("${redis.data.dir}")private String redisDataDir;// 执行AOF分析public AOFAnalysisResult analyzeAOF() {File manifestFile = new File(redisDataDir, "appendonly.manifest.json");if (!manifestFile.exists()) {return new AOFAnalysisResult(false, "Manifest file not found");}// 解析清单文件AOFManifest manifest = objectMapper.readValue(manifestFile, AOFManifest.class);// 分析AOF文件信息long totalSize = 0;long commandCount = 0;File baseFile = new File(redisDataDir, manifest.getBaseAofName());totalSize += baseFile.length();for (String incrFile : manifest.getIncrAofNames()) {File f = new File(redisDataDir, incrFile);totalSize += f.length();commandCount += countCommands(f);}return new AOFAnalysisResult(true, "Base: " + manifest.getBaseAofName() + ", Incremental: " + manifest.getIncrAofNames().size() + ", Total size: " + formatSize(totalSize) + ", Commands: " + commandCount);}// 手动触发AOF重写public void triggerAofRewrite() {redisTemplate.execute((RedisCallback<String>) connection -> {connection.serverCommands().bgrewriteaof();return null;});}
}
3. 系统升级与迁移策略

在系统升级或Redis迁移场景中,利用多部分AOF简化流程:

@Service
public class RedisMigrationService {// 准备Redis迁移public MigrationPlan prepareMigration(String sourceRedisUrl, String targetRedisUrl) {MigrationPlan plan = new MigrationPlan();// 1. 分析源Redis AOF状态AOFAnalysisResult aofAnalysis = analyzeSourceRedisAOF(sourceRedisUrl);plan.setAofAnalysis(aofAnalysis);// 2. 如果未使用多部分AOF,建议升级if (!aofAnalysis.isMultiPartAof()) {plan.addStep("Enable multi-part AOF on source Redis");}// 3. 触发AOF重写以创建干净的基础文件plan.addStep("Trigger AOF rewrite to create clean base file");// 4. 设置数据传输步骤plan.addStep("Copy base AOF file to target server");plan.addStep("Start target Redis with base file");plan.addStep("Continue copying incremental files during migration");return plan;}// 执行迁移过程中的增量同步public void syncIncrementalAOF(String sourceRedisDataDir, String targetRedisDataDir) {// 读取源Redis的清单文件AOFManifest sourceManifest = readManifest(sourceRedisDataDir);// 读取目标Redis的清单文件AOFManifest targetManifest = readManifest(targetRedisDataDir);// 找出目标缺少的增量文件List<String> filesToSync = new ArrayList<>();for (String incrFile : sourceManifest.getIncrAofNames()) {if (!targetManifest.getIncrAofNames().contains(incrFile)) {filesToSync.add(incrFile);}}// 同步缺少的文件for (String file : filesToSync) {copyFile(new File(sourceRedisDataDir, file), new File(targetRedisDataDir, file));}// 更新目标Redis的清单文件writeManifest(targetRedisDataDir, sourceManifest);}
}

最佳实践

  1. 磁盘规划:为AOF文件分配足够的磁盘空间,预留增长空间
  2. 监控AOF状态:定期检查AOF文件大小和重写频率
  3. 备份策略:将AOF文件纳入常规备份计划
  4. fsync策略选择:根据数据重要性和性能需求选择合适的fsync策略
  5. 与RDB结合:在某些场景下同时启用RDB快照,提供额外保护

特性四:访问控制列表(ACL)增强

技术原理

Redis 7.0对ACL(Access Control List)系统进行了重要增强,提供了更精细的权限控制机制。新增的ACL功能包括Pub/Sub频道权限控制、KEYS命令的模式匹配权限,以及针对Redis Functions的权限管理。

关键增强:

  • 支持对Pub/Sub频道的读写权限控制
  • 能够定义KEYS命令可查询的键模式
  • 对Redis Functions的调用权限控制
  • 改进的权限继承和组合模式

实现示例

ACL规则配置
# 创建只能访问特定前缀键且只读的用户
ACL SETUSER readonly ON >secret123 ~product:* +get +scan +keys +zrange +hgetall# 创建有Pub/Sub特定频道权限的用户
ACL SETUSER publisher ON >pubpassword ~notification:* +@all &channel:notifications:*# 创建可以调用特定函数的用户
ACL SETUSER func_user ON >funcpass ~* +@all %f:mycounter:incr_by_and_get# 使用键模式限制keys命令
ACL SETUSER admin ON >adminpass ~* +@all %K~user:*
Java配置示例
@Configuration
public class RedisSecurityConfig {@Beanpublic RedisConnectionFactory redisConnectionFactory() {LettuceConnectionFactory factory = new LettuceConnectionFactory();factory.setUsername("app_user");factory.setPassword("app_password");return factory;}@Bean@Profile("admin")public RedisSecurityService redisSecurityService(StringRedisTemplate redisTemplate) {return new RedisSecurityService(redisTemplate);}
}@Service
public class RedisSecurityService {private final StringRedisTemplate redisTemplate;public RedisSecurityService(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public void createReadOnlyUser(String username, String password, List<String> keyPatterns) {StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER ").append(username).append(" ON >").append(password);// 添加键模式for (String pattern : keyPatterns) {aclCommand.append(" ~").append(pattern);}// 添加只读命令权限aclCommand.append(" +get +mget +scan +keys +hget +hgetall +zrange +scard +smembers +lrange +info");redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));}public void createFunctionUser(String username, String password, String libraryName, List<String> functions) {StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER ").append(username).append(" ON >").append(password).append(" ~*"); // 允许访问所有键// 添加基本命令权限aclCommand.append(" +@read +@write");// 添加函数调用权限for (String function : functions) {aclCommand.append(" %f:").append(libraryName).append(":").append(function);}redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));}public void createPubSubUser(String username, String password, List<String> channelPatterns, boolean publishOnly) {StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER ").append(username).append(" ON >").append(password).append(" ~*"); // 通常Pub/Sub用户不需要键访问权限,但根据需要可以调整// 添加Pub/Sub权限if (publishOnly) {aclCommand.append(" +publish");} else {aclCommand.append(" +publish +subscribe +psubscribe");}// 添加频道权限for (String pattern : channelPatterns) {aclCommand.append(" &").append(pattern);}redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));}public List<Map<String, Object>> listUsers() {List<Object> result = (List<Object>) redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute("ACL LIST"));List<Map<String, Object>> users = new ArrayList<>();if (result != null) {for (Object userInfo : result) {String[] parts = userInfo.toString().split(" ");Map<String, Object> user = new HashMap<>();user.put("username", parts[1]);// 解析权限信息List<String> flags = new ArrayList<>();List<String> commands = new ArrayList<>();List<String> keys = new ArrayList<>();List<String> channels = new ArrayList<>();List<String> functions = new ArrayList<>();for (int i = 2; i < parts.length; i++) {String part = parts[i];if (part.startsWith("+") || part.startsWith("-")) {commands.add(part);} else if (part.startsWith("~")) {keys.add(part);} else if (part.startsWith("&")) {channels.add(part);} else if (part.startsWith("%")) {functions.add(part);} else if (part.equals("on") || part.equals("off")) {flags.add(part);}}user.put("flags", flags);user.put("commands", commands);user.put("keys", keys);user.put("channels", channels);user.put("functions", functions);users.add(user);}}return users;}
}

实际应用场景

1. 多租户SaaS应用

为不同租户创建隔离的Redis访问权限:

@Service
public class TenantRedisManager {private final RedisSecurityService redisSecurityService;public void setupNewTenant(String tenantId) {// 为租户创建只能访问自己数据的用户String username = "tenant_" + tenantId;String password = generateSecurePassword();// 键模式限制List<String> keyPatterns = Arrays.asList("tenant:" + tenantId + ":*","shared:public:*");// 创建用户redisSecurityService.createReadOnlyUser(username, password, keyPatterns);// 保存凭证(安全存储)credentialManager.storeTenantRedisCredentials(tenantId, username, password);// 创建具有写入权限的用户String adminUsername = "tenant_" + tenantId + "_admin";String adminPassword = generateSecurePassword();StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER ").append(adminUsername).append(" ON >").append(adminPassword);// 添加键模式for (String pattern : keyPatterns) {aclCommand.append(" ~").append(pattern);}// 添加完整读写权限aclCommand.append(" +@all");// 执行ACL命令redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));// 保存管理员凭证credentialManager.storeTenantRedisAdminCredentials(tenantId, adminUsername, adminPassword);}
}
2. 消息系统中的发布者与订阅者分离

在消息系统中创建不同角色的用户:

@Service
public class NotificationSystemManager {private final RedisSecurityService redisSecurityService;// 设置消息发布者public void setupPublisherAccount(String system, String password) {List<String> channels = Arrays.asList("notifications:" + system + ":*");redisSecurityService.createPubSubUser("publisher_" + system, password, channels, true  // 只有发布权限);}// 设置订阅者public void setupSubscriberAccount(String subscriberId, String password, List<String> systems) {List<String> channels = systems.stream().map(system -> "notifications:" + system + ":*").collect(Collectors.toList());StringBuilder aclCommand = new StringBuilder();aclCommand.append("ACL SETUSER subscriber_").append(subscriberId).append(" ON >").append(password).append(" ~*")  // 不需要键访问权限.append(" +subscribe +psubscribe");  // 只有订阅权限// 添加频道权限for (String channel : channels) {aclCommand.append(" &").append(channel);}redisTemplate.execute((RedisCallback<Object>) connection -> connection.execute(aclCommand.toString()));}
}
3. API网关限流功能

为API网关创建有限制的Redis Functions调用权限:

@Service
public class ApiGatewayRateLimiterService {private final RedisSecurityService redisSecurityService;private final StringRedisTemplate adminRedisTemplate;@PostConstructpublic void setupRateLimiter() {// 部署限流函数库String rateLimiterScript = """#!lua name=ratelimiterredis.register_function('check_rate_limit', function(keys, args)local key = keys[1]local limit = tonumber(args[1])local window = tonumber(args[2])local current = redis.call('INCR', key)if current == 1 thenredis.call('EXPIRE', key, window)endreturn current <= limitend)""";adminRedisTemplate.execute((RedisCallback<Object>) connection -> connection.serverCommands().functionLoad(rateLimiterScript));// 为API网关创建专用用户String gatewayUser = "api_gateway";String gatewayPassword = secureRandomPassword();List<String> functions = Arrays.asList("check_rate_limit");redisSecurityService.createFunctionUser(gatewayUser, gatewayPassword, "ratelimiter", functions);// 安全保存凭证configService.saveApiGatewayRedisCredentials(gatewayUser, gatewayPassword);}
}

最佳实践

  1. 最小权限原则:为每个用户仅分配必要的权限
  2. 密码复杂性:使用强密码,并定期轮换
  3. 功能分割:按照功能角色创建不同的用户,避免单一用户拥有过多权限
  4. 监控ACL操作:记录和审计ACL变更
  5. 分层权限模型:实现权限继承和组合,简化管理
  6. 定期审核:定期检查和清理不再使用的账户

特性五:客户端缓存增强

技术原理

Redis 7.0对客户端缓存(Client-side Caching)进行了增强,使其更加实用和高效。客户端缓存允许Redis客户端在本地缓存数据,通过服务器通知机制在数据变更时使缓存失效。Redis 7.0添加了对集群环境和哈希子字段的支持,扩展了这一功能的应用范围。

关键优势:

  • 减少网络请求和Redis服务器负载
  • 降低读取操作延迟
  • 支持分片集群环境的客户端缓存
  • 支持哈希字段级别的缓存控制
  • 改进的内存效率和缓存追踪

实现原理

客户端缓存有两种模式:

  1. 跟踪模式:Redis服务器记录每个客户端缓存的键,当键变更时发送通知
  2. 广播模式:服务器广播所有键的变更,客户端根据自己的缓存内容决定是否使缓存失效

Redis 7.0中的改进包括:

  • 集群环境中的缓存一致性
  • 哈希字段级别的追踪
  • 优化的内存使用

实现示例

使用Lettuce客户端实现
@Configuration
public class RedisClientCacheConfig {@Beanpublic ClientResources clientResources() {return ClientResources.builder().build();}@Beanpublic RedisClient redisClient(ClientResources clientResources) {return RedisClient.create(clientResources, RedisURI.create("redis://localhost:6379"));}@Beanpublic StatefulRedisConnection<String, String> connection(RedisClient redisClient) {return redisClient.connect();}@Beanpublic RedisCommands<String, String> redisCommands(StatefulRedisConnection<String, String> connection) {return connection.sync();}
}@Service
public class CachingRedisClient {private final RedisCommands<String, String> redis;private final StatefulRedisConnection<String, String> connection;private final Map<String, String> localCache = new ConcurrentHashMap<>();private final Set<String> trackedKeys = ConcurrentHashMap.newKeySet();public CachingRedisClient(RedisCommands<String, String> redis, StatefulRedisConnection<String, String> connection) {this.redis = redis;this.connection = connection;setupClientCaching();}private void setupClientCaching() {// 设置失效通知处理器connection.addListener(message -> {if (message instanceof PushMessage) {PushMessage pushMessage = (PushMessage) message;if ("invalidate".equals(pushMessage.getType())) {List<Object> invalidations = pushMessage.getContent();// 处理失效通知processInvalidations(invalidations);}}});// 启用客户端缓存,使用跟踪模式redis.clientTracking(ClientTrackingArgs.Builder.enabled().bcast().prefixes("user:", "product:").optIn());}public String get(String key) {// 先检查本地缓存String cachedValue = localCache.get(key);if (cachedValue != null) {return cachedValue;}// 本地缓存未命中,从Redis获取String value = redis.get(key);if (value != null) {// 存入本地缓存localCache.put(key, value);trackedKeys.add(key);}return value;}public void set(String key, String value) {// 更新Redisredis.set(key, value);// 更新本地缓存localCache.put(key, value);trackedKeys.add(key);}private void processInvalidations(List<Object> invalidations) {if (invalidations.size() >= 2) {String invalidationType = new String((byte[]) invalidations.get(0));if ("key".equals(invalidationType)) {// 单个键失效String key = new String((byte[]) invalidations.get(1));localCache.remove(key);trackedKeys.remove(key);} else if ("prefix".equals(invalidationType)) {// 前缀失效String prefix = new String((byte[]) invalidations.get(1));// 移除所有匹配前缀的缓存项Iterator<Map.Entry<String, String>> it = localCache.entrySet().iterator();while (it.hasNext()) {String key = it.next().getKey();if (key.startsWith(prefix)) {it.remove();trackedKeys.remove(key);}}}}}// 手动使缓存失效public void invalidateCache(String key) {localCache.remove(key);trackedKeys.remove(key);}// 获取缓存统计信息public Map<String, Object> getCacheStats() {Map<String, Object> stats = new HashMap<>();stats.put("cacheSize", localCache.size());stats.put("trackedKeysCount", trackedKeys.size());// 简单的统计信息Map<String, Integer> prefixCounts = new HashMap<>();for (String key : localCache.keySet()) {String prefix = key.split(":")[0] + ":";prefixCounts.put(prefix, prefixCounts.getOrDefault(prefix, 0) + 1);}stats.put("prefixCounts", prefixCounts);return stats;}
}

哈希字段级别缓存

@Service
public class HashFieldCachingService {private final RedisCommands<String, String> redis;private final StatefulRedisConnection<String, String> connection;private final Map<String, Map<String, String>> hashCache = new ConcurrentHashMap<>();public HashFieldCachingService(RedisCommands<String, String> redis, StatefulRedisConnection<String, String> connection) {this.redis = redis;this.connection = connection;setupClientCaching();}private void setupClientCaching() {// 设置失效通知处理器connection.addListener(message -> {if (message instanceof PushMessage) {PushMessage pushMessage = (PushMessage) message;if ("invalidate".equals(pushMessage.getType())) {List<Object> invalidations = pushMessage.getContent();// 处理失效通知processInvalidations(invalidations);}}});// 启用客户端缓存,使用跟踪模式redis.clientTracking(ClientTrackingArgs.Builder.enabled().prefixes("user:", "product:").optIn());}// 获取哈希字段public String hget(String key, String field) {// 先检查本地缓存Map<String, String> cachedHash = hashCache.get(key);if (cachedHash != null && cachedHash.containsKey(field)) {return cachedHash.get(field);}// 本地缓存未命中,从Redis获取String value = redis.hget(key, field);if (value != null) {// 存入本地缓存cachedHash = hashCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>());cachedHash.put(field, value);}return value;}// 获取整个哈希public Map<String, String> hgetall(String key) {// 先检查本地缓存是否有完整哈希Map<String, String> cachedHash = hashCache.get(key);// 如果不存在或者不确定是否完整,从Redis获取Map<String, String> redisHash = redis.hgetall(key);if (!redisHash.isEmpty()) {// 更新本地缓存hashCache.put(key, new ConcurrentHashMap<>(redisHash));return redisHash;}return cachedHash != null ? cachedHash : new HashMap<>();}// 设置哈希字段public void hset(String key, String field, String value) {// 更新Redisredis.hset(key, field, value);// 更新本地缓存Map<String, String> cachedHash = hashCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>());cachedHash.put(field, value);}private void processInvalidations(List<Object> invalidations) {if (invalidations.size() >= 2) {String invalidationType = new String((byte[]) invalidations.get(0));if ("key".equals(invalidationType)) {// 单个键失效String key = new String((byte[]) invalidations.get(1));hashCache.remove(key);} else if ("prefix".equals(invalidationType)) {// 前缀失效String prefix = new String((byte[]) invalidations.get(1));// 移除所有匹配前缀的缓存项hashCache.keySet().removeIf(key -> key.startsWith(prefix));}}}
}

实际应用场景

1. 用户配置文件管理

在需要频繁读取用户个人信息但写入较少的场景中:

@Service
public class UserProfileService {private final CachingRedisClient redisClient;// 获取用户资料public UserProfile getUserProfile(String userId) {String cacheKey = "user:" + userId + ":profile";// 利用客户端缓存获取数据String profileJson = redisClient.get(cacheKey);if (profileJson != null) {return objectMapper.readValue(profileJson, UserProfile.class);}return null;}// 更新用户资料public void updateUserProfile(String userId, UserProfile profile) {String cacheKey = "user:" + userId + ":profile";// 序列化为JSONString profileJson = objectMapper.writeValueAsString(profile);// 更新Redis,客户端缓存会自动更新redisClient.set(cacheKey, profileJson);// 记录审计日志logProfileUpdate(userId, profile);}// 批量获取多个用户资料public Map<String, UserProfile> getUserProfiles(List<String> userIds) {Map<String, UserProfile> results = new HashMap<>();for (String userId : userIds) {UserProfile profile = getUserProfile(userId);if (profile != null) {results.put(userId, profile);}}return results;}
}
2. 产品目录展示

电商平台中的产品信息缓存:

@Service
public class ProductCatalogService {private final HashFieldCachingService hashCache;// 获取产品基本信息public Product getProductBasicInfo(String productId) {String key = "product:" + productId;// 获取基本信息字段String name = hashCache.hget(key, "name");String price = hashCache.hget(key, "price");String category = hashCache.hget(key, "category");if (name != null && price != null) {Product product = new Product();product.setId(productId);product.setName(name);product.setPrice(Double.parseDouble(price));product.setCategory(category);return product;}return null;}// 获取产品完整信息public ProductDetails getProductDetails(String productId) {String key = "product:" + productId;// 获取完整哈希Map<String, String> productData = hashCache.hgetall(key);if (productData.isEmpty()) {return null;}// 构建产品详情对象ProductDetails details = new ProductDetails();details.setId(productId);details.setName(productData.get("name"));details.setPrice(Double.parseDouble(productData.get("price")));details.setCategory(productData.get("category"));details.setDescription(productData.get("description"));details.setBrand(productData.get("brand"));// 处理可选字段if (productData.containsKey("stock")) {details.setStock(Integer.parseInt(productData.get("stock")));}if (productData.containsKey("rating")) {details.setRating(Double.parseDouble(productData.get("rating")));}// 处理图片列表if (productData.containsKey("images")) {details.setImages(Arrays.asList(productData.get("images").split(",")));}return details;}// 更新产品价格public void updateProductPrice(String productId, double newPrice) {String key = "product:" + productId;hashCache.hset(key, "price", String.valueOf(newPrice));// 记录价格变更日志logPriceChange(productId, newPrice);}
}
3. 分布式配置管理

管理应用配置并实时同步更新:

@Service
public class DistributedConfigService {private final CachingRedisClient redisClient;private final Map<String, Map<String, ConfigValue>> configCache = new ConcurrentHashMap<>();// 获取配置值public String getConfigValue(String application, String key) {String cacheKey = "config:" + application + ":" + key;// 使用客户端缓存获取值String value = redisClient.get(cacheKey);if (value != null) {// 解析JSON值ConfigValue configValue = objectMapper.readValue(value, ConfigValue.class);return configValue.getValue();}return null;}// 更新配置值public void setConfigValue(String application, String key, String value) {String cacheKey = "config:" + application + ":" + key;// 创建带版本的配置值对象ConfigValue configValue = new ConfigValue();configValue.setValue(value);configValue.setVersion(System.currentTimeMillis());configValue.setUpdatedBy(getCurrentUser());// 序列化为JSONString valueJson = objectMapper.writeValueAsString(configValue);// 更新RedisredisClient.set(cacheKey, valueJson);// 发布配置变更事件publishConfigChangeEvent(application, key, value);}// 获取应用的所有配置public Map<String, String> getAllConfig(String application) {// 使用SCAN命令查找所有应用配置键Set<String> configKeys = scanKeys("config:" + application + ":*");Map<String, String> config = new HashMap<>();for (String fullKey : configKeys) {String key = fullKey.substring(("config:" + application + ":").length());String value = getConfigValue(application, key);if (value != null) {config.put(key, value);}}return config;}
}

最佳实践

  1. 选择合适的缓存模式:根据数据访问模式选择追踪或广播模式
  2. 控制缓存粒度:对于频繁变动的数据,考虑使用更细粒度的缓存
  3. 缓存大小管理:使用LRU或其他策略控制本地缓存大小
  4. 设置过期策略:为本地缓存设置合理的过期时间
  5. 优雅处理失效通知:实现健壮的失效处理逻辑
  6. 监控缓存效率:跟踪缓存命中率和内存使用情况

总结

Redis 7.0通过这五大核心特性:Redis Functions、分片Pub/Sub、多部分AOF、ACL增强以及客户端缓存优化,显著提升了Redis的功能性、性能和可靠性。

相关文章:

  • AI Agent 入门指南:从 LLM 到智能体
  • 人工智能如何革新数据可视化领域?探索未来趋势
  • 【编程干货】本地用 Ollama + LLaMA 3 实现 Model Context Protocol(MCP)对话服务
  • 手撕算法(1)
  • Python-map从基础到进阶
  • 42 python http之urllib库
  • Vue 自定义指令输入校验过滤
  • 【前缀和】矩阵区域和
  • Hadoop架构再探讨
  • 【MongoDB篇】MongoDB的聚合框架!
  • python刷题笔记:三目运算符的写法
  • 高等数学第五章---定积分(§5.1定积分的概念、性质和应用)
  • 【HFP】蓝牙语音通信高级功能解析:快速拨号与呼叫等待协议实现
  • 【日撸 Java 三百行】Day 4(条件语句实战——闰年问题)
  • ORACLE EBS 12.1 启用https 简单策略
  • STM32H743单片机实现ADC+DMA多通道检测+事件组
  • nut-list和nut-swipe搭配:nut-cell侧滑定义无法冒泡打开及bug(含代码、案例、截图)
  • 继电器负载知识
  • 内存的位运算
  • Dify - Stable Diffusion
  • 公募基金改革八大要点:建立浮动管理费收取机制、降低规模排名考核权重
  • 央行、证监会:科技创新债券含公司债券、企业债券、非金融企业债务融资工具等
  • 巴基斯坦所有主要城市宣布进入紧急状态,学校和教育机构停课
  • 青年与城市共成长,第六届上海创新创业青年50人论坛将举办
  • 欧盟官员:欧盟酝酿对美关税政策反制措施,包含所有选项
  • 两个灵魂,一支画笔,意大利艺术伴侣的上海灵感之旅