MongoDB 读写分离中实现强制主读注解
前言
在 MongoDB 的应用场景中,经常会出现更新数据后立即读取的情况。当启用读写分离时,默认的读操作会优先走从节点,但由于主从同步存在延迟,可能导致读取到的数据不是最新版本。虽然 MongoDB 官方 Java 驱动支持为单次操作指定读偏好(ReadPreference)来解决这个问题,但在使用 Spring Data MongoDB 框架时,我并未找到开箱即用的注解式方案。因此,本文对这一问题的解决方案进行记录。
实现原理
系统同时创建两个 MongoTemplate:一个使用 primary 读偏好(只读主库),一个使用 secondaryPreferred 读偏好(优先读从库);
再通过 ProxyFactory 创建一个代理模板作为默认注入的 mongoTemplate;
每次调用数据库方法时,代理会先检查 MongoRoutingContext 中的线程标记:若标记为主读则使用主库连接对象,否则使用从库连接对象;
由自定义注解 @MongoPrimaryRead 标记需要强制走主读的方法,配合 AOP 在方法执行前设置线程标记、执行结束后清除标记,再由代理对象路由到主库连接对象。
连接示例
在连接串中添加 readPreference=secondaryPreferred,开启 MongoDB 的从节点优先读偏好:
spring.data.mongodb.uri=mongodb://root2:root%123456@192.168.0.1:27017,192.168.0.2:27017/xxxx?authSource=admin&replicaSet=rs0&readPreference=secondaryPreferred
代码实现
自定义注解 MongoPrimaryRead
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;import java.lang.annotation.ElementType;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 强制MongoDB操作使用主节点*/
/**
 * Marker annotation for methods whose MongoDB operations must be served by the
 * primary node.
 *
 * <p>The annotation carries no attributes. It is retained at runtime and may only
 * be placed on methods.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface MongoPrimaryRead {
}
切面
import com.xxxx.common.utils.MongoRoutingContext;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;@Aspect
@Component
public class MongoPrimaryReadAspect {

    /**
     * Marks the current thread as "read from primary" for the duration of a method
     * annotated with {@code @MongoPrimaryRead}.
     *
     * <p>Only the outermost annotated call sets and clears the thread flag. If the
     * flag is already set when this advice runs (for example because an annotated
     * method calls another annotated method), the advice just proceeds and leaves
     * the flag alone, so the enclosing call keeps routing to the primary after the
     * inner call returns.
     *
     * <p>NOTE(review): the pointcut uses the simple name {@code MongoPrimaryRead};
     * this presumably relies on the annotation living in the same package as this
     * aspect. Use the fully qualified name if that is not the case.
     *
     * @param pjp the intercepted method invocation
     * @return whatever the intercepted method returns
     * @throws Throwable whatever the intercepted method throws
     */
    @Around("@annotation(MongoPrimaryRead)")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        if (MongoRoutingContext.usePrimary()) {
            // The flag belongs to an enclosing call; clearing it here would switch
            // the rest of that call back to the default read preference.
            return pjp.proceed();
        }

        MongoRoutingContext.setUsePrimary();
        try {
            return pjp.proceed();
        } finally {
            // Always reset so the flag is not left set on this thread after the call.
            MongoRoutingContext.clear();
        }
    }
}
线程标记工具类
/**
 * Holds a per-thread flag that tells the routing code whether MongoDB operations
 * on the current thread should use the primary.
 */
public class MongoRoutingContext {

    /** No value stored for a thread means "primary not requested". */
    private static final ThreadLocal<Boolean> PRIMARY_FLAG = new ThreadLocal<>();

    /** Requests primary routing for the current thread. */
    public static void setUsePrimary() {
        PRIMARY_FLAG.set(Boolean.TRUE);
    }

    /**
     * Reports whether primary routing is currently requested on this thread.
     *
     * @return {@code true} only if {@link #setUsePrimary()} was called on this
     *         thread and {@link #clear()} has not been called since
     */
    public static boolean usePrimary() {
        final Boolean flag = PRIMARY_FLAG.get();
        return flag != null && flag;
    }

    /** Removes the flag from the current thread. */
    public static void clear() {
        PRIMARY_FLAG.remove();
    }
}
代理类配置
import com.xxxx.common.utils.MongoRoutingContext;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.mongodb.core.MongoTemplate;import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 用途:路由 MongoTemplate 使用主实例或从实例(添加注解走主节点:@MongoPrimaryRead)
 * 说明:
 * - 明确创建 defaultMongoClient / primaryMongoClient(各自设置 readPreference)
 * - 基于两个 client 创建 defaultMongoTemplate / primaryMongoTemplate
 * - 基于 defaultMongoTemplate 创建 routing 代理(@Primary,name="mongoTemplate")
 * - 代理内部使用 MethodHandle 缓存来加速委派调用
 *
 * 将此类加入项目后,业务中仍然可以直接 @Autowired MongoTemplate mongoTemplate;
 * 默认行为为 secondaryPreferred,方法上加 @MongoPrimaryRead(由 AOP 设置 ThreadLocal 标记)时会走 primary。
 */
@Configuration
public class MongoRoutingConfiguration {// 缓存已绑定的 MethodHandle(分别为 default/primary delegate 使用)private final ConcurrentHashMap<Method, MethodHandle> defaultHandleCache = new ConcurrentHashMap<>();private final ConcurrentHashMap<Method, MethodHandle> primaryHandleCache = new ConcurrentHashMap<>();@Autowiredprivate MongoProperties mongoProperties;private String getDatabaseName() {ConnectionString cs = new ConnectionString(mongoProperties.determineUri());String db = cs.getDatabase();if (db == null || db.isEmpty()) {throw new IllegalArgumentException("MongoDB URI must contain database name!");}return db;}@Bean(destroyMethod = "close", name = "defaultMongoClient")public MongoClient defaultMongoClient() {ConnectionString cs = new ConnectionString(mongoProperties.determineUri());MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(cs).readPreference(ReadPreference.secondaryPreferred()).build();return MongoClients.create(settings);}@Bean(destroyMethod = "close", name = "primaryMongoClient")public MongoClient primaryMongoClient() {ConnectionString cs = new ConnectionString(mongoProperties.determineUri());MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(cs).readPreference(ReadPreference.primary()).build();return MongoClients.create(settings);}@Bean(name = "defaultMongoTemplate")public MongoTemplate defaultMongoTemplate(@Qualifier("defaultMongoClient") MongoClient defaultMongoClient) {return new MongoTemplate(defaultMongoClient, getDatabaseName());}@Bean(name = "primaryMongoTemplate")public MongoTemplate primaryMongoTemplate(@Qualifier("primaryMongoClient") MongoClient primaryMongoClient) {return new MongoTemplate(primaryMongoClient, getDatabaseName());}/*** routing proxy:将其标为 @Primary 并命名为 "mongoTemplate"(业务中的 @Autowired MongoTemplate* 将注入此代理),代理在运行时根据 MongoRoutingContext.usePrimary() 选择 delegate。*/@Bean(name = "mongoTemplate")@Primarypublic MongoTemplate routingMongoTemplate(@Qualifier("defaultMongoTemplate") 
MongoTemplate defaultMongoTemplate,@Qualifier("primaryMongoTemplate") MongoTemplate primaryMongoTemplate) {ProxyFactory pf = new ProxyFactory(defaultMongoTemplate);pf.setProxyTargetClass(true);MethodInterceptor mi = new MethodInterceptor() {private final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {boolean usePrimary = MongoRoutingContext.usePrimary();MongoTemplate delegate = usePrimary ? primaryMongoTemplate : defaultMongoTemplate;Method method = invocation.getMethod();ConcurrentHashMap<Method, MethodHandle> cache = usePrimary ? primaryHandleCache : defaultHandleCache;// 使用 MethodHandle 缓存来加速委派调用MethodHandle handle = cache.get(method);if (handle == null) {try {MethodHandle unbound = LOOKUP.unreflect(method);handle = unbound.bindTo(delegate);MethodHandle prev = cache.putIfAbsent(method, handle);if (prev != null) handle = prev;} catch (IllegalAccessException ex) {// 反射兜底try {return method.invoke(delegate, invocation.getArguments());} catch (Exception e) {throw e.getCause() == null ? e : e.getCause();}}}try {return handle.invokeWithArguments(invocation.getArguments());} catch (Throwable t) {throw t;}}};pf.addAdvice(mi);return (MongoTemplate) pf.getProxy();}
}
至此,便可使用自定义注解 @MongoPrimaryRead 标记需要强制走主读的方法,从而实现强制主读。
实现功能的方式有千千万,我这里的实现思路可能不够规范,各路大神如果发现错误或有更好的思路,请不吝赐教。