Redis面试
redis大Key怎么处理
1、拆分大Key,按业务维度拆分
案例:比如key=userId,value=用户全部信息
改进:用户信息拆成不同的Key,每个Key存的内容不一样。
// 用户基础信息 → user:base:{id}
// 用户扩展信息 → user:ext:{id}
// 用户统计信息 → user:stats:{id}
2、使用合适的数据结构
public class DataStructureOptimization {// 不良设计:用String存储大型JSONredis.set("product:123", "{...几万字的JSON...}");// 优化设计:用Hash存储,按需获取字段public void optimizeProductStorage() {// 拆分为多个Hash字段//hset的三个参数:key: 哈希表的键、field: 字段名、value: 字段值redis.hset("product:123", "base_info", "{基本信息}");redis.hset("product:123", "description", "{商品描述}");redis.hset("product:123", "specs", "{规格参数}");// 查询的时候,按需获取,避免传输整个大对象String baseInfo = redis.hget("product:123", "base_info");}
}
3、设置过期时间和淘汰策略
# 针对可能成为的大Key,设置TTL
SET large:cache:data "value" EX 3600 # 1小时后过期# 配置适当的内存淘汰策略
CONFIG SET maxmemory-policy allkeys-lru
问题
分布式锁:保障设置锁和释放锁是同一个线程。
1、设置锁时,在值中存入唯一标识(例如UUID、线程ID等),确保每个锁的值为唯一。
2、释放锁时,先比较当前锁的值是否与设置时存入的值一致,如果一致,则删除锁;否则,不删除。
加锁
public class RedisDistributedLock {private Jedis jedis;public String tryLock(String lockKey, long expireTime) {// 生成唯一标识String lockValue = UUID.randomUUID().toString();// 使用SET命令,并设置过期时间String result = jedis.set(lockKey, lockValue, "NX", "PX", expireTime);if ("OK".equals(result)) {return lockValue; // 获取锁成功,返回唯一标识}return null; // 获取锁失败}
}
释放锁
//为什么使用Lua,主要是为了判断redis的值是否匹配,同时做删除动作。
public class RedisDistributedLock {// 释放锁的Lua脚本,其中KEYS[1]就是key,ARGV[1]就是value。private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";public boolean unlock(String lockKey, String lockValue) {// 使用Lua脚本释放锁,确保原子性Object result = jedis.eval(UNLOCK_SCRIPT, Collections.singletonList(lockKey), Collections.singletonList(lockValue));// 如果返回1,表示删除成功;返回0,表示锁的值不匹配或锁已不存在return Long.valueOf(1).equals(result);}
}
使用示例
public class Example {public void businessMethod() {String lockKey = "order:123";String lockValue = null;try {// 尝试获取锁lockValue = redisLock.tryLock(lockKey, 30000);if (lockValue != null) {// 执行业务代码// ...}} finally {if (lockValue != null) {// 释放锁redisLock.unlock(lockKey, lockValue);}}}
}
关键点说明
1、唯一标识:使用UUID生成锁的值,确保每个线程的锁值唯一。
2、原子性加锁:使用SET命令的NX和PX选项,保证设置值和过期时间的原子性。不至于设置锁后,处理失败,导致一直不释放。
3、原子性解锁:使用Lua脚本,将比较和删除操作原子化,避免误删。因为有个查询比较和和删除的动作。
注意事项:
过期时间:设置合理的过期时间,防止业务执行时间过长导致锁自动释放而业务还在执行的问题。
可重入:上述锁不可重入,如需可重入,需记录重入次数,可以使用Redis的Hash结构,并在Lua脚本中处理重入逻辑。
集群环境:在Redis集群环境下,需要考虑故障转移带来的问题,此时可以使用Redlock算法。
问题:怎么实现,当获取锁失败,怎么实现超时退出,因为redis是不支持设置超时时间的。
原理:通过while循环获取锁,超时结束循环。
expireMs过去时间,waitMs等待超时时间。
public boolean tryLock(String key, long expireMs, long waitMs) throws InterruptedException {String clientId = UUID();long deadline = System.currentTimeMillis() + waitMs;//计算等待截止时间。while (System.currentTimeMillis() < deadline) {try (Jedis jedis = jedisPool.getResource()) {String result = jedis.set(key, clientId, "NX", "PX", expireMs);if ("OK".equals(result)) {// 记录锁信息storeLockInfo(key, clientId, expireMs);return true;}// 锁被占用,等待重试几秒,后来在尝试获取。Thread.sleep(Math.min(100, waitMs / 10));}}return false;
}
问题:如果拿到锁,任务还没执行完,key过期了怎么办。也就是key续期。
原理:看门狗自动续期。通过延迟线程池。比如:5秒钟执行一次Task,每次过期时间+15秒。
startWatchdog(key,value,过期时间);public class LockWatchdog {private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);private final Map<String, ScheduledFuture<?>> renewalTasks = new ConcurrentHashMap<>();private void startWatchdog(String key, String clientId, long expireMs) {// 在锁过期前1/3时间开始续期long renewalInterval = expireMs * 2 / 3;ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {if (!renewLock(key, clientId, expireMs)) { //获取锁,获取成功说明设置过期时间成功。// 续期失败,可能是锁已释放或丢失stopWatchdog(key);System.err.println("锁续期失败: " + key);}}, renewalInterval, renewalInterval, TimeUnit.MILLISECONDS);renewalTasks.put(key, future); //往延迟线程中丢一个Task进去,包含(首次延迟多少秒,每次间隔多少秒秒执行一次,直到task执行完。)也就是说可以续期多次。只要是没执行完,就会续期。}//获取锁,如果value一样,说明获取成功,并设置过期时间,通过LUA实现原子性。private boolean renewLock(String key, String clientId, long expireMs) {try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +" return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else " +" return 0 " +"end";Object result=jedis.eval(script,Collections.singletonList(key), Arrays.asList(clientId, String.valueOf(expireMs)));return Long.valueOf(1).equals(result);} catch (Exception e) {return false;}}
}
问题:如果要支持重入,怎么实现。可重入锁实现
原理:通过ThreadLocal,记录当前线程重入次数。释放也是一样。
public class ReentrantRedisLock {private final ThreadLocal<Map<String, Integer>> reentrantCount = ThreadLocal.withInitial(HashMap::new);public boolean tryLock(String key, long expireMs) {Map<String, Integer> counts = reentrantCount.get();Integer count = counts.get(key);if (count != null && count > 0) {// 重入:增加计数counts.put(key, count + 1);return true;}// 首次获取锁if (acquireNewLock(key, expireMs)) {counts.put(key, 1);return true;}return false;}public boolean unlock(String key) {Map<String, Integer> counts = reentrantCount.get();Integer currentCount = counts.get(key);if (currentCount == null || currentCount <= 0) {throw new IllegalStateException("未持有锁: " + key);}if (currentCount > 1) {// 重入次数减1,不释放Redis锁counts.put(key, currentCount - 1);return true;} else {// 最后一次,释放Redis锁counts.remove(key);return doUnlock(key);}}
}
问题:多实例分布式锁可重入实现
原理:通过redis中的Hash来实现,多个线程抢同一个Key,如果抢到Map中的Key就是实例Ip+线程ID+重入次数。
# 锁的存储结构:Hash类型
Key: lock:order:1234
Field: 实例ID:线程ID:重入次数
Value: 当前持有者信息 + 时间戳
# 示例:
key = lock:order:1234
1) "192.168.1.100:8080-123-1" # 字段:实例-线程-重入次数
2) "{\"clientId\":\"192.168.1.100:8080-123\",\"timestamp\":1634567890000}"
