Redisson功能完整指南
目录
- 分布式锁和同步工具
- 分布式数据结构
- 原子操作和计数器
- 分布式对象和缓存
- 消息和发布订阅
- 高级数据结构
- 分布式服务
- 事务和批处理
- 地理空间数据
- 时间序列数据
一、分布式锁和同步工具
1.1 RLock(可重入分布式锁)
@Service
public class InventoryService {@Autowiredprivate RedissonClient redissonClient;public boolean reduceInventory(String productId, int quantity) {RLock lock = redissonClient.getLock("inventory:lock:" + productId);try {if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {int currentInventory = getCurrentInventory(productId);if (currentInventory >= quantity) {updateInventory(productId, currentInventory - quantity);return true;}return false;}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}return false;}
}
1.2 RReadWriteLock(分布式读写锁)
@Service
public class DocumentService {@Autowiredprivate RedissonClient redissonClient;public String readDocument(String documentId) {RReadWriteLock rwLock = redissonClient.getReadWriteLock("document:lock:" + documentId);RLock readLock = rwLock.readLock();try {if (readLock.tryLock(10, TimeUnit.SECONDS)) {return performDocumentRead(documentId);}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (readLock.isHeldByCurrentThread()) {readLock.unlock();}}return null;}public boolean updateDocument(String documentId, String content) {RReadWriteLock rwLock = redissonClient.getReadWriteLock("document:lock:" + documentId);RLock writeLock = rwLock.writeLock();try {if (writeLock.tryLock(10, TimeUnit.SECONDS)) {performDocumentWrite(documentId, content);return true;}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (writeLock.isHeldByCurrentThread()) {writeLock.unlock();}}return false;}
}
1.3 RFairLock(公平分布式锁)
@Service
public class TicketBookingService {@Autowiredprivate RedissonClient redissonClient;public boolean bookTicket(String eventId, String userId) {RFairLock fairLock = redissonClient.getFairLock("ticket:booking:" + eventId);try {if (fairLock.tryLock(30, 60, TimeUnit.SECONDS)) {int availableTickets = getAvailableTickets(eventId);if (availableTickets > 0) {reserveTicket(eventId, userId);updateAvailableTickets(eventId, availableTickets - 1);return true;}return false;}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (fairLock.isHeldByCurrentThread()) {fairLock.unlock();}}return false;}
}
1.4 RSemaphore(分布式信号量)
@Service
public class DownloadService {@Autowiredprivate RedissonClient redissonClient;public boolean downloadFile(String fileId) {RSemaphore semaphore = redissonClient.getSemaphore("download:limit");semaphore.trySetPermits(5); try {if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {try {performDownload(fileId);return true;} finally {semaphore.release();}}return false;} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}
}
二、分布式数据结构
2.1 RMap(分布式Map)
@Service
public class UserSessionService {@Autowiredprivate RedissonClient redissonClient;public void saveUserSession(String userId, UserSession session) {RMap<String, UserSession> sessionMap = redissonClient.getMap("user:sessions");sessionMap.put(userId, session);sessionMap.expire(Duration.ofHours(2)); }public UserSession getUserSession(String userId) {RMap<String, UserSession> sessionMap = redissonClient.getMap("user:sessions");return sessionMap.get(userId);}public void removeUserSession(String userId) {RMap<String, UserSession> sessionMap = redissonClient.getMap("user:sessions");sessionMap.remove(userId);}
}
2.2 RQueue(分布式队列)
@Service
public class OrderProcessingService {@Autowiredprivate RedissonClient redissonClient;public void submitOrder(Order order) {RQueue<Order> orderQueue = redissonClient.getQueue("orders:processing");orderQueue.offer(order);}@Scheduled(fixedDelay = 1000)public void processOrders() {RQueue<Order> orderQueue = redissonClient.getQueue("orders:processing");Order order = orderQueue.poll();if (order != null) {processOrder(order);}}
}
2.3 RDelayedQueue(延时队列)
@Service
public class DelayedNotificationService {@Autowiredprivate RedissonClient redissonClient;public void scheduleNotification(String message, int delayMinutes) {RQueue<String> destinationQueue = redissonClient.getQueue("notifications:destination");RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(destinationQueue);delayedQueue.offer(message, delayMinutes, TimeUnit.MINUTES);}@Scheduled(fixedDelay = 5000)public void processNotifications() {RQueue<String> destinationQueue = redissonClient.getQueue("notifications:destination");String message = destinationQueue.poll();if (message != null) {sendNotification(message);}}
}
三、原子操作和计数器
3.1 RAtomicLong(原子长整型)
@Service
public class StatisticsService {@Autowiredprivate RedissonClient redissonClient;public long incrementPageViews(String pageId) {RAtomicLong pageViews = redissonClient.getAtomicLong("page:views:" + pageId);return pageViews.incrementAndGet();}public long getPageViews(String pageId) {RAtomicLong pageViews = redissonClient.getAtomicLong("page:views:" + pageId);return pageViews.get();}public void resetPageViews(String pageId) {RAtomicLong pageViews = redissonClient.getAtomicLong("page:views:" + pageId);pageViews.set(0);}
}
3.2 RLongAdder(长整型累加器)
@Service
public class HighConcurrencyCounterService {@Autowiredprivate RedissonClient redissonClient;public void incrementCounter(String eventType) {RLongAdder counter = redissonClient.getLongAdder("events:" + eventType);counter.increment();}public long getEventCount(String eventType) {RLongAdder counter = redissonClient.getLongAdder("events:" + eventType);return counter.sum();}
}
四、分布式对象和缓存
4.1 RBucket(分布式对象桶)
@Service
public class ConfigurationService {@Autowiredprivate RedissonClient redissonClient;public void saveConfig(String key, Object config) {RBucket<Object> bucket = redissonClient.getBucket("config:" + key);bucket.set(config, Duration.ofDays(1)); }@SuppressWarnings("unchecked")public <T> T getConfig(String key, Class<T> type) {RBucket<T> bucket = redissonClient.getBucket("config:" + key);return bucket.get();}public boolean configExists(String key) {RBucket<Object> bucket = redissonClient.getBucket("config:" + key);return bucket.isExists();}
}
4.2 RMapCache(带过期时间的Map)
@Service
public class TokenService {@Autowiredprivate RedissonClient redissonClient;public void storeToken(String userId, String token, Duration expiry) {RMapCache<String, String> tokenCache = redissonClient.getMapCache("user:tokens");tokenCache.put(userId, token, expiry.toMillis(), TimeUnit.MILLISECONDS);}public String getToken(String userId) {RMapCache<String, String> tokenCache = redissonClient.getMapCache("user:tokens");return tokenCache.get(userId);}public void invalidateToken(String userId) {RMapCache<String, String> tokenCache = redissonClient.getMapCache("user:tokens");tokenCache.remove(userId);}
}
五、消息和发布订阅
5.1 RTopic(主题发布订阅)
@Service
public class EventPublisher {@Autowiredprivate RedissonClient redissonClient;public void publishEvent(String eventType, Object eventData) {RTopic topic = redissonClient.getTopic("events:" + eventType);topic.publish(eventData);}
}@Component
public class EventSubscriber {@Autowiredprivate RedissonClient redissonClient;@PostConstructpublic void subscribeToEvents() {RTopic topic = redissonClient.getTopic("events:user");topic.addListener(String.class, (channel, message) -> {System.out.println("收到用户事件: " + message);handleUserEvent(message);});}
}
六、高级数据结构
6.1 RBloomFilter(布隆过滤器)
@Service
public class DuplicateCheckService {@Autowiredprivate RedissonClient redissonClient;@PostConstructpublic void initBloomFilter() {RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("duplicate:check");bloomFilter.tryInit(1000000L, 0.03);}public boolean mightContain(String item) {RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("duplicate:check");return bloomFilter.contains(item);}public void addItem(String item) {RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("duplicate:check");bloomFilter.add(item);}
}
6.2 RRateLimiter(分布式限流器)
@Service
public class ApiRateLimiter {@Autowiredprivate RedissonClient redissonClient;public boolean isAllowed(String apiKey) {RRateLimiter rateLimiter = redissonClient.getRateLimiter("api:limit:" + apiKey);rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);return rateLimiter.tryAcquire(1);}
}
Redisson功能总结表
分布式锁类型对比
锁类型 | 特点 | 适用场景 |
---|
RLock | 可重入、支持超时 | 一般业务操作 |
RReadWriteLock | 读写分离 | 读多写少场景 |
RFairLock | 公平获取锁 | 避免线程饥饿 |
RMultiLock | 同时锁定多个资源 | 防止死锁的资源操作 |
RRedLock | 高安全性 | 关键业务操作 |
RSpinLock | 减少线程切换 | 短时间持有的操作 |
RFencedLock | 版本控制 | 需要检测锁有效性 |
数据结构特点
- RMap: 分布式HashMap,支持过期时间
- RQueue: 分布式队列,支持阻塞操作
- RDelayedQueue: 延时队列,适用于定时任务
- RAtomicLong: 原子计数器,支持高并发
- RBloomFilter: 布隆过滤器,快速去重检测
- RRateLimiter: 限流器,API限流控制
- RGeo: 地理位置数据,支持距离计算
- RTimeSeries: 时间序列数据,适用于监控指标
最佳实践建议
- 选择合适的锁类型:根据业务场景选择最适合的锁
- 设置合理的超时时间:避免死锁和长时间等待
- 正确释放资源:使用try-finally确保锁被释放
- 监控性能指标:关注锁的竞争情况和持有时间
- 合理使用缓存:根据数据访问模式选择缓存策略
七、分布式服务
7.1 RExecutorService(分布式执行器)
@Service
public class DistributedTaskService {@Autowiredprivate RedissonClient redissonClient;public void executeDistributedTask() {RExecutorService executor = redissonClient.getExecutorService("distributed:tasks");executor.submit(() -> {System.out.println("执行分布式任务: " + Thread.currentThread().getName());performBusinessLogic();return "任务完成";});}
}
八、事务和批处理
8.1 RBatch(批处理操作)
@Service
public class BatchOperationService {@Autowiredprivate RedissonClient redissonClient;public void batchUpdateUsers(List<User> users) {RBatch batch = redissonClient.createBatch();for (User user : users) {RMapAsync<String, User> userMapAsync = batch.getMap("users");userMapAsync.putAsync(user.getId(), user);RAtomicLongAsync counterAsync = batch.getAtomicLong("user:count");counterAsync.incrementAndGetAsync();}BatchResult<?> result = batch.execute();System.out.println("批处理完成,操作数: " + result.getResponses().size());}
}
九、地理空间数据
9.1 RGeo(地理空间数据结构)
@Service
public class LocationService {@Autowiredprivate RedissonClient redissonClient;public void addLocation(String locationId, double longitude, double latitude) {RGeo<String> geo = redissonClient.getGeo("locations");geo.add(longitude, latitude, locationId);}public List<String> findNearbyLocations(double longitude, double latitude, double radius) {RGeo<String> geo = redissonClient.getGeo("locations");return geo.radius(longitude, latitude, radius, GeoUnit.KILOMETERS);}public Double getDistance(String location1, String location2) {RGeo<String> geo = redissonClient.getGeo("locations");return geo.dist(location1, location2, GeoUnit.KILOMETERS);}
}
十、时间序列数据
10.1 RTimeSeries(时间序列)
@Service
public class MetricsService {@Autowiredprivate RedissonClient redissonClient;public void recordMetric(String metricName, double value) {RTimeSeries<String, Double> timeSeries = redissonClient.getTimeSeries(metricName);timeSeries.add(System.currentTimeMillis(), value);}public Map<Long, Double> getMetricRange(String metricName, long fromTime, long toTime) {RTimeSeries<String, Double> timeSeries = redissonClient.getTimeSeries(metricName);return timeSeries.range(fromTime, toTime);}
}
高级锁特性示例
锁的高级特性
@Service
public class AdvancedLockService {@Autowiredprivate RedissonClient redissonClient;public void demonstrateAdvancedFeatures(String resourceId) {RLock lock = redissonClient.getLock("advanced:lock:" + resourceId);try {boolean isLocked = lock.isLocked();boolean isHeldByCurrentThread = lock.isHeldByCurrentThread();long remainTimeToLive = lock.remainTimeToLive();if (lock.tryLock(10, 300, TimeUnit.SECONDS)) {recursiveOperation(resourceId, 3);}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}private void recursiveOperation(String resourceId, int depth) {if (depth <= 0) return;RLock lock = redissonClient.getLock("advanced:lock:" + resourceId);try {if (lock.tryLock(5, TimeUnit.SECONDS)) {System.out.println("递归深度: " + depth + ", 持有计数: " + lock.getHoldCount());Thread.sleep(100);recursiveOperation(resourceId, depth - 1);}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}
}
CMS系统综合应用示例
页面管理综合服务
@Service
public class EnhancedPageManagementService {@Autowiredprivate RedissonClient redissonClient;public void managePageLifecycle(Long pageId) {RLock lock = redissonClient.getLock("page:manage:" + pageId);try {if (lock.tryLock(10, TimeUnit.SECONDS)) {RAtomicLong processCount = redissonClient.getAtomicLong("page:process:count");processCount.incrementAndGet();RSet<Long> activePages = redissonClient.getSet("pages:active");activePages.add(pageId);RMapCache<String, Object> configCache = redissonClient.getMapCache("page:config");configCache.put("page:" + pageId, getPageConfig(pageId), Duration.ofHours(1));RTopic statusTopic = redissonClient.getTopic("page:lifecycle:events");statusTopic.publish(Map.of("pageId", pageId, "action", "managed"));RQueue<Long> processQueue = redissonClient.getQueue("pages:process:queue");processQueue.offer(pageId);}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}
}