JAVA如何实现Redis同步
在Java中实现多个Redis实例之间的数据同步,主要有四种主流方案。
一、基于双写的同步策略(可重试)
这种方式简单,但需要处理错误和一致性。我们可以使用事务或补偿机制来尽量保证一致性,但无法完全保证(分布式事务的难题)。
import org.springframework.data.redis.connection.RedisConnectionFailureException;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Logger;public class RetryableDualWriteRedisSync {private static final Logger log = Logger.getLogger(RetryableDualWriteRedisSync.class.getName());private static final int MAX_RETRIES = 3;private static final long INITIAL_DELAY_MS = 100;private final RedisTemplate<String, Object> primaryRedis;private final RedisTemplate<String, Object> secondaryRedis;private final RedisTemplate<String, Object> tertiaryRedis;private final ExecutorService executorService;public RetryableDualWriteRedisSync(RedisTemplate<String, Object> primaryRedis,RedisTemplate<String, Object> secondaryRedis,RedisTemplate<String, Object> tertiaryRedis,ExecutorService executorService) {this.primaryRedis = primaryRedis;this.secondaryRedis = secondaryRedis;this.tertiaryRedis = tertiaryRedis;this.executorService = executorService;}public RetryableDualWriteRedisSync(RedisTemplate<String, Object> primaryRedis,RedisTemplate<String, Object> secondaryRedis,RedisTemplate<String, Object> tertiaryRedis) {this(primaryRedis, secondaryRedis, tertiaryRedis, Executors.newFixedThreadPool(3));}@Transactionalpublic void syncSet(String key, Object value) {// 主库同步写入executeWithRetry(primaryRedis, redis -> redis.opsForValue().set(key, value), "Primary");// 从库异步写入(带重试)CompletableFuture.runAsync(() -> executeWithRetry(secondaryRedis, redis -> redis.opsForValue().set(key, value), "Secondary"),executorService).exceptionally(e -> {log.error("Secondary Redis最终写入失败", e);return null;});// 第三个库异步写入(带重试)CompletableFuture.runAsync(() -> executeWithRetry(tertiaryRedis, redis -> redis.opsForValue().set(key, value), "Tertiary"),executorService).exceptionally(e -> {log.error("Tertiary Redis最终写入失败", e);return null;});}public void syncDelete(String key) {// 主库同步删除executeWithRetry(primaryRedis, redis -> redis.delete(key), "Primary");// 从库异步删除(带重试)CompletableFuture.runAsync(() -> executeWithRetry(secondaryRedis, redis -> redis.delete(key), "Secondary"),executorService).exceptionally(e -> {log.error("Secondary Redis最终删除失败", e);return null;});// 第三个库异步删除(带重试)CompletableFuture.runAsync(() -> executeWithRetry(tertiaryRedis, redis -> redis.delete(key), "Tertiary"),executorService).exceptionally(e -> {log.error("Tertiary Redis最终删除失败", e);return null;});}private void executeWithRetry(RedisTemplate<String, Object> redis,Consumer<RedisTemplate<String, Object>> operation,String redisName) {int attempts = 0;while (attempts < MAX_RETRIES) {try {operation.accept(redis);log.info("{} Redis操作成功", redisName);return;} catch (RedisConnectionFailureException e) {attempts++;long delay = INITIAL_DELAY_MS * (long) Math.pow(2, attempts - 1);log.warn("{} Redis连接失败,重试 {}/{},等待 {}ms", redisName, attempts, MAX_RETRIES, delay);try {Thread.sleep(delay);} catch (InterruptedException ie) {Thread.currentThread().interrupt();log.warning("重试等待被中断");break;}} catch (Exception e) {log.error("{} Redis操作非连接异常,不再重试", redisName, e);break;}}log.error("{} Redis操作失败,达到最大重试次数", redisName);// 可添加降级策略或事件通知}public void shutdown() {if (executorService != null && !executorService.isShutdown()) {executorService.shutdown();}}
}
调用示例:
// 配置和初始化
@Configuration
public class RedisConfig {@Beanpublic RetryableDualWriteRedisSync redisSync(RedisTemplate<String, Object> primaryRedis,RedisTemplate<String, Object> secondaryRedis,RedisTemplate<String, Object> tertiaryRedis) {return new RetryableDualWriteRedisSync(primaryRedis, secondaryRedis, tertiaryRedis);}
}// 使用
@Service
public class MyService {@Autowiredprivate RetryableDualWriteRedisSync redisSync;public void updateData(String key, Object value) {// 更新数据库逻辑...// 同步到多个 RedisredisSync.syncSet(key, value);}
}
二、基于Redis Pub/Sub的同步方案(实时性强)
基于 Redis 的发布 / 订阅(Pub/Sub)机制实现数据同步是一种更可靠的方案,可以替代之前的双写模式,减少直接操作多个 Redis 实例带来的复杂性和一致性问题。下面是一个基于 Redis Pub/Sub 的同步方案实现:
架构设计:
+-------------+ +---------------+ +---------------+
| 业务服务 | | Redis 主节点 | | Redis 从节点 |
| | | | | |
| writeData()|--->| publish(key) |--->| subscribe() |
| | | | | updateLocal |
+-------------+ +---------------+ +---------------+
代码实现:
1.消息发布者(主服务)
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;@Service
public class RedisMessagePublisher {private static final String CHANNEL = "redis:sync-channel";private final RedisTemplate<String, Object> redisTemplate;public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate) {this.redisTemplate = redisTemplate;}public void publishSyncMessage(String key, Object value) {// 构建同步消息RedisSyncMessage message = new RedisSyncMessage(key, value, System.currentTimeMillis());// 发布消息到频道redisTemplate.convertAndSend(CHANNEL, message);// 在主库执行写入(也可以在发布前执行)redisTemplate.opsForValue().set(key, value);}
}// 消息实体类
class RedisSyncMessage {private String key;private Object value;private long timestamp;// 构造方法、Getter/Setter 略
}
2.消息订阅者(从服务)
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;@Component
public class RedisMessageSubscriber implements MessageListener {private final RedisTemplate<String, Object> localRedisTemplate;public RedisMessageSubscriber(RedisTemplate<String, Object> localRedisTemplate) {this.localRedisTemplate = localRedisTemplate;}@Overridepublic void onMessage(Message message, byte[] pattern) {// 反序列化消息RedisSyncMessage syncMessage = (RedisSyncMessage) localRedisTemplate.getValueSerializer().deserialize(message.getBody());// 更新本地 RedislocalRedisTemplate.opsForValue().set(syncMessage.getKey(), syncMessage.getValue());// 记录同步日志log.info("同步 Redis 数据: key={}, timestamp={}", syncMessage.getKey(), syncMessage.getTimestamp());}
}
3.配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;@Configuration
public class RedisPubSubConfig {private static final String CHANNEL = "redis:sync-channel";@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listenerAdapter, new ChannelTopic(CHANNEL));return container;}@Beanpublic MessageListenerAdapter listenerAdapter(RedisMessageSubscriber subscriber) {return new MessageListenerAdapter(subscriber, "onMessage");}
}
4.业务服务集成
@Service
public class BusinessService {@Autowiredprivate RedisMessagePublisher redisPublisher;@Transactionalpublic void updateData(String key, Object value) {// 1. 更新数据库databaseService.update(key, value);// 2. 发布 Redis 同步消息redisPublisher.publishSyncMessage(key, value);}
}
相比于第一种方式,这种方式存在诸多优势,其一,解耦多实例操作:主服务只需操作一个 Redis 实例并发布消息,从服务独立订阅并更新本地数据,降低了代码复杂度;其二,可靠性提升,Redis Pub/Sub 保证消息按顺序发送和接收,可结合 Redis Stream 实现持久化消息队列,进一步提高可靠性,从节点故障恢复后可重新订阅丢失的消息;其三,可轻松添加新的从节点,无需修改主服务代码。
但需要注意的是,其一,Redis Pub/Sub 默认不保证消息持久化,若从节点在发布时离线,可能丢失消息,可改用 Redis Stream 或结合持久化存储(如数据库)记录同步状态;其二,从节点更新存在延迟,需业务层接受 “最终一致性”,而非强一致性;其三,订阅者需处理可能的异常(如 Redis 连接失败),并实现重试机制。
三、基于消息队列的同步方案(较可靠)
我们可以将需要同步的数据变更作为消息发送到消息队列,然后由多个消费者分别写入各自的Redis。这里我们以RabbitMQ为例来实现。
架构设计:
+-------------+ +---------------+ +---------------+ +---------------+
| 业务服务 | | RabbitMQ | | 消费者服务 | | Redis 从节点 |
| | | (Exchange) | | | | |
| writeData()|--->| publish(msg) |--->| consume(msg) |--->| updateRedis |
+-------------+ +---------------+ +---------------+ +---------------+
代码实现:
1.消息生产者(业务服务)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;@Service
public class RedisSyncProducer {private static final String EXCHANGE = "redis.sync.exchange";private static final String ROUTING_KEY = "redis.sync.key";private final RabbitTemplate rabbitTemplate;public RedisSyncProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendSyncMessage(String key, Object value, OperationType operationType) {RedisSyncMessage message = new RedisSyncMessage(UUID.randomUUID().toString(), // 唯一消息IDkey,value,System.currentTimeMillis(),operationType);// 发送消息到 RabbitMQrabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, message);// 在主库执行写入(也可放在发送前,根据业务需求调整)primaryRedis.opsForValue().set(key, value);}
}// 消息实体类(与之前类似)
@Data
@AllArgsConstructor
@NoArgsConstructor
class RedisSyncMessage {private String messageId;private String key;private Object value;private long timestamp;private OperationType operationType;
}enum OperationType {SET, DELETE
}
2.消息消费者(同步服务)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class RedisSyncConsumer {private final RedisTemplate<String, Object> redisTemplate;private final ProcessedMessageService processedMessageService; // 幂等性服务public RedisSyncConsumer(RedisTemplate<String, Object> redisTemplate,ProcessedMessageService processedMessageService) {this.redisTemplate = redisTemplate;this.processedMessageService = processedMessageService;}@RabbitListener(queues = "redis.sync.queue")public void consume(RedisSyncMessage message) {// 1. 幂等性检查if (processedMessageService.isProcessed(message.getMessageId())) {return;}try {// 2. 根据操作类型执行 Redis 操作switch (message.getOperationType()) {case SET:redisTemplate.opsForValue().set(message.getKey(), message.getValue());break;case DELETE:redisTemplate.delete(message.getKey());break;}// 3. 标记消息为已处理processedMessageService.markAsProcessed(message.getMessageId());} catch (Exception e) {// 4. 异常处理(可重试或记录失败)log.error("处理 Redis 同步消息失败: {}", message, e);throw new RuntimeException("Redis 同步失败", e);}}
}
3.RabbitMQ配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String EXCHANGE = "redis.sync.exchange";public static final String QUEUE = "redis.sync.queue";public static final String ROUTING_KEY = "redis.sync.key";// 声明交换机@Beanpublic DirectExchange redisSyncExchange() {return new DirectExchange(EXCHANGE, true, false);}// 声明队列@Beanpublic Queue redisSyncQueue() {// 持久化队列return new Queue(QUEUE, true, false, false);}// 绑定队列和交换机@Beanpublic Binding binding() {return BindingBuilder.bind(redisSyncQueue()).to(redisSyncExchange()).with(ROUTING_KEY);}
}
4.消费者配置(重试与异常处理)
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;@Configuration
public class RabbitMQConsumerConfig {@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory =new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 配置重试策略(3次重试,指数退避)RetryTemplate retryTemplate = new RetryTemplate();ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setInitialInterval(500); // 初始间隔500msbackOffPolicy.setMultiplier(2.0); // 每次重试间隔翻倍backOffPolicy.setMaxInterval(5000); // 最大间隔5秒SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3); // 最多重试3次retryTemplate.setRetryPolicy(retryPolicy);retryTemplate.setBackOffPolicy(backOffPolicy);factory.setRetryTemplate(retryTemplate);factory.setRecoveryCallback(new RejectAndDontRequeueRecoverer()); // 重试失败后拒绝消息// 手动确认模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}
}
5.幂等性保障
@Service
public class ProcessedMessageService {private final RedisTemplate<String, String> redisTemplate;private static final String PROCESSED_MSG_KEY = "processed:redis-sync:";public ProcessedMessageService(RedisTemplate<String, String> redisTemplate) {this.redisTemplate = redisTemplate;}// 判断消息是否已处理public boolean isProcessed(String messageId) {return redisTemplate.opsForValue().get(PROCESSED_MSG_KEY + messageId) != null;}// 标记消息为已处理(设置过期时间)public void markAsProcessed(String messageId) {redisTemplate.opsForValue().set(PROCESSED_MSG_KEY + messageId, "processed", 24, TimeUnit.HOURS);}
}
6.RabbitMQ高级配置(可选)
(1)私信队列配置
当消息消费失败且达到最大重试次数后,将消息发送到死信队列,便于后续人工处理:
@Configuration
public class RabbitMQDLQConfig {public static final String DLQ_QUEUE = "redis.sync.dlq.queue";public static final String DLQ_EXCHANGE = "redis.sync.dlq.exchange";public static final String DLQ_ROUTING_KEY = "redis.sync.dlq.key";// 声明死信交换机@Beanpublic DirectExchange dlqExchange() {return new DirectExchange(DLQ_EXCHANGE, true, false);}// 声明死信队列@Beanpublic Queue dlqQueue() {return new Queue(DLQ_QUEUE, true, false, false);}// 绑定死信队列和死信交换机@Beanpublic Binding dlqBinding() {return BindingBuilder.bind(dlqQueue()).to(dlqExchange()).with(DLQ_ROUTING_KEY);}// 在原队列配置中添加死信队列参数@Beanpublic Queue redisSyncQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLQ_EXCHANGE);args.put("x-dead-letter-routing-key", DLQ_ROUTING_KEY);return new Queue(QUEUE, true, false, false, args);}
}
(2)消息持久化配置
确保消息不会因 RabbitMQ 重启而丢失:
// 在生产者发送消息时设置持久化
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, message, message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;
});
通过 RabbitMQ 实现 Redis 同步,可获得更高的可靠性和更灵活的路由机制,适合对数据一致性要求较高的业务场景。
四、使用Redis Replica(官方推荐)
使用 Redis 主从复制(Redis Replica)实现数据同步是 Redis 原生支持的方案,通过配置主节点(Master)和从节点(Replica),让从节点自动复制主节点的数据,无需额外开发同步逻辑,适合对数据一致性和可靠性有基础要求的场景。
数据同步流程:
- 从节点启动时,会向主节点发送
SYNC
命令(Redis 2.8+ 用PSYNC
优化增量同步)。 - 主节点生成 RDB 快照并发送给从节点,从节点加载快照完成全量同步。
- 之后主节点的所有写操作(如
SET
、DEL
等)会通过 复制积压缓冲区(replication backlog) 异步发送给从节点,从节点执行相同命令完成增量同步,保持数据一致。
特点:
- 原生支持:无需额外中间件,Redis 内置实现。
- 读写分离:主节点负责写操作,从节点负责读操作,分担主节点压力。
- 高可用基础:结合哨兵(Sentinel)可实现主节点故障时自动切换到从节点,提升可用性。
配置Redis主从复制:
1.主节点(Master)配置(redis.conf)
主节点默认无需特殊配置(默认允许从节点连接),可按需设置密码或绑定 IP:
# 主节点端口(默认6379)
port 6379
# 绑定IP(允许远程连接可设为 0.0.0.0)
bind 0.0.0.0
# 密码(若需认证)
requirepass "master_password"
# 开启AOF持久化(可选,增强数据安全性)
appendonly yes
2.从节点(Replica)配置(redis.conf)
# 从节点端口(如6380)
port 6380
# 绑定IP
bind 0.0.0.0
# 配置主节点地址和端口
replicaof 127.0.0.1 6379
# 主节点密码(若主节点设置了requirepass)
masterauth "master_password"
# 从节点只读(默认开启,避免从节点写入数据)
replica-read-only yes
3.启动与验证
- 先启动主节点:
redis-server redis-master.conf
- 再启动从节点:
redis-server redis-replica.conf
- 验证主从关系:
连接主节点执行info replication
,查看connected_slaves
字段是否包含从节点信息;
连接从节点执行info replication
,查看role
是否为slave
,master_host
和master_port
是否正确。
结合业务代码的使用方式:
1.区分读写操作
在代码中通过不同的 RedisTemplate
连接主从节点,主节点负责写,从节点负责读:
@Configuration
public class RedisReplicaConfig {// 主节点(写操作)@Bean("masterRedisTemplate")public RedisTemplate<String, Object> masterRedisTemplate() {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(masterConnectionFactory());// 序列化配置(省略)return template;}@Beanpublic RedisConnectionFactory masterConnectionFactory() {RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();config.setHostName("主节点IP");config.setPort(6379);config.setPassword(RedisPassword.of("master_password"));return new LettuceConnectionFactory(config);}// 从节点(读操作)@Bean("replicaRedisTemplate")public RedisTemplate<String, Object> replicaRedisTemplate() {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(replicaConnectionFactory());// 序列化配置(省略)return template;}@Beanpublic RedisConnectionFactory replicaConnectionFactory() {RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();config.setHostName("从节点IP");config.setPort(6380);config.setPassword(RedisPassword.of("master_password")); // 与主节点密码一致return new LettuceConnectionFactory(config);}
}
2.业务层使用
@Service
public class BusinessService {@Autowired@Qualifier("masterRedisTemplate")private RedisTemplate<String, Object> masterRedis; // 写操作@Autowired@Qualifier("replicaRedisTemplate")private RedisTemplate<String, Object> replicaRedis; // 读操作// 写操作(走主节点)public void setData(String key, Object value) {masterRedis.opsForValue().set(key, value);}// 读操作(走从节点)public Object getData(String key) {return replicaRedis.opsForValue().get(key);}
}
3.哨兵配置(可选)
生产环境建议搭配哨兵(Sentinel)使用,配置示例:
# 哨兵配置(sentinel.conf)
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2 # 监控主节点,2个哨兵认为主节点宕机则触发故障转移
sentinel down-after-milliseconds mymaster 30000 # 30秒未响应视为宕机
sentinel auth-pass mymaster master_password # 主节点密码
优势:
- 原生高效:Redis 主从复制基于内存快照和命令传播,性能损耗低,同步延迟小(毫秒级)。
- 高可用性:结合哨兵(Sentinel)可实现自动故障转移(主节点宕机后,哨兵选举从节点升为主节点)。
- 读写分离:从节点分担读压力,提升系统整体吞吐量。
- 部署简单:无需额外中间件,仅通过配置文件即可实现,运维成本低。
五、总结
Redis 主从复制(Replica)是最原生、最高效的 Redis 同步方案,适合中小规模系统或对部署复杂度敏感的场景。若需更高可靠性(如跨机房同步)或强一致性,可结合消息队列方案作为补充(如主节点写入后发送消息到队列,从节点消费消息校验数据)。
方案选型建议:
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
双写 | 同步实例少(<3) | 实现简单、延迟低 | 一致性难保证 |
Pub/Sub | 实时性要求高 | 低延迟、无需中间件 | 可靠性较低 |
消息队列 | 大规模集群 | 高可靠、支持重试 | 架构复杂 |
Redis Replica | 主从架构 | 官方支持、高效 | 单向同步 |
Redis Replica+消息队列 | 复杂环境 | 灵活、可靠 | 实现复杂 |