当前位置: 首页 > news >正文

MongoDB 读写分离中 实现强制走主读注解

前言

在 MongoDB 的应用场景中,经常会出现更新数据后立即读取的情况。当启用读写分离时,默认的读操作会走从节点,但由于主从同步存在延迟,可能导致读取的数据不是最新版本。虽然 MongoDB 官方 JDBC 驱动提供了相关注解来解决这个问题,但在使用 Spring Data MongoDB 框架时,我并未找到类似的官方实现方案。因此,本文将对这一问题的解决方案进行记录。

实现原理

系统同时创建两个 MongoTemplate,一个连接主库(primary),一个连接从库(secondaryPreferred);
再通过 ProxyFactory 创建一个代理模板作为默认注入的 mongoTemplate
每次调用数据库方法时,代理会先检查 MongoRoutingContext 中的线程标记:若标记为主读则使用主库连接对象,否则使用从库连接对象;
由自定义注解 @MongoPrimaryRead 标记需要强制走主读的方法, 配合 AOP 在方法执行前做标记,再由代理对象路由到主库连接对象

连接示例

readPreference=secondaryPreferred 开启mongo从读偏好

spring.data.mongodb.uri=mongodb://root2:root%123456@192.168.0.1:27017,192.168.0.2:27017/xxxx?authSource=admin&replicaSet=rs0&readPreference=secondaryPreferred

代码实现

自定义注解 MongoPrimaryRead

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;import java.lang.annotation.ElementType;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 强制MongoDB操作使用主节点*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MongoPrimaryRead {
}

切面


import com.xxxx.common.utils.MongoRoutingContext;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;@Aspect
@Component
public class MongoPrimaryReadAspect {@Around("@annotation(MongoPrimaryRead)")public Object around(ProceedingJoinPoint pjp) throws Throwable {try {MongoRoutingContext.setUsePrimary();return pjp.proceed();} finally {MongoRoutingContext.clear();}}
}

线程标记工具类

public class MongoRoutingContext {private static final ThreadLocal<Boolean> USE_PRIMARY = new ThreadLocal<>();public static void setUsePrimary() { USE_PRIMARY.set(Boolean.TRUE); }public static boolean usePrimary() { return Boolean.TRUE.equals(USE_PRIMARY.get()); }public static void clear() { USE_PRIMARY.remove(); }
}

代理类配置


import com.xxxx.common.utils.MongoRoutingContext;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.mongodb.core.MongoTemplate;import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;/*** 用途:路由MongoTemplate使用主实例或从实例(添加注解走主节点:@MongoPrimaryRead)* 说明:*  - 明确创建 defaultMongoClient / primaryMongoClient(各自设置 readPreference)*  - 基于两个 client 创建 defaultMongoTemplate / primaryMongoTemplate*  - 基于 defaultMongoTemplate 创建 routing 代理(@Primary,name="mongoTemplate")*  - 代理内部使用 MethodHandle 缓存来加速委派调用**  将此类加入项目后,业务中仍然可以直接 @Autowired MongoTemplate mongoTemplate;*  默认行为为 secondaryPreferred,方法上加 ThreadLocal 控制(如 @PrimaryRead + AOP)时会走 primary。*/
@Configuration
public class MongoRoutingConfiguration {// 缓存已绑定的 MethodHandle(分别为 default/primary delegate 使用)private final ConcurrentHashMap<Method, MethodHandle> defaultHandleCache = new ConcurrentHashMap<>();private final ConcurrentHashMap<Method, MethodHandle> primaryHandleCache = new ConcurrentHashMap<>();@Autowiredprivate MongoProperties mongoProperties;private String getDatabaseName() {ConnectionString cs = new ConnectionString(mongoProperties.determineUri());String db = cs.getDatabase();if (db == null || db.isEmpty()) {throw new IllegalArgumentException("MongoDB URI must contain database name!");}return db;}@Bean(destroyMethod = "close", name = "defaultMongoClient")public MongoClient defaultMongoClient() {ConnectionString cs = new ConnectionString(mongoProperties.determineUri());MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(cs).readPreference(ReadPreference.secondaryPreferred()).build();return MongoClients.create(settings);}@Bean(destroyMethod = "close", name = "primaryMongoClient")public MongoClient primaryMongoClient() {ConnectionString cs = new ConnectionString(mongoProperties.determineUri());MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(cs).readPreference(ReadPreference.primary()).build();return MongoClients.create(settings);}@Bean(name = "defaultMongoTemplate")public MongoTemplate defaultMongoTemplate(@Qualifier("defaultMongoClient") MongoClient defaultMongoClient) {return new MongoTemplate(defaultMongoClient, getDatabaseName());}@Bean(name = "primaryMongoTemplate")public MongoTemplate primaryMongoTemplate(@Qualifier("primaryMongoClient") MongoClient primaryMongoClient) {return new MongoTemplate(primaryMongoClient, getDatabaseName());}/*** routing proxy:将其标为 @Primary 并命名为 "mongoTemplate"(业务中的 @Autowired MongoTemplate* 将注入此代理),代理在运行时根据 MongoRoutingContext.usePrimary() 选择 delegate。*/@Bean(name = "mongoTemplate")@Primarypublic MongoTemplate routingMongoTemplate(@Qualifier("defaultMongoTemplate") MongoTemplate defaultMongoTemplate,@Qualifier("primaryMongoTemplate") MongoTemplate primaryMongoTemplate) {ProxyFactory pf = new ProxyFactory(defaultMongoTemplate);pf.setProxyTargetClass(true);MethodInterceptor mi = new MethodInterceptor() {private final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {boolean usePrimary = MongoRoutingContext.usePrimary();MongoTemplate delegate = usePrimary ? primaryMongoTemplate : defaultMongoTemplate;Method method = invocation.getMethod();ConcurrentHashMap<Method, MethodHandle> cache = usePrimary ? primaryHandleCache : defaultHandleCache;//  使用 MethodHandle 缓存来加速委派调用MethodHandle handle = cache.get(method);if (handle == null) {try {MethodHandle unbound = LOOKUP.unreflect(method);handle = unbound.bindTo(delegate);MethodHandle prev = cache.putIfAbsent(method, handle);if (prev != null) handle = prev;} catch (IllegalAccessException ex) {// 反射兜底try {return method.invoke(delegate, invocation.getArguments());} catch (Exception e) {throw e.getCause() == null ? e : e.getCause();}}}try {return handle.invokeWithArguments(invocation.getArguments());} catch (Throwable t) {throw t;}}};pf.addAdvice(mi);return (MongoTemplate) pf.getProxy();}
}

至此便可通过使用自定义注解 @MongoPrimaryRead 对需要强制走主读的方法进行标记。从而实现强制主读。

实现的功能的方式有千千万,我这边实现的路子可能不够规范,各路大神如果发现有错误或更好的思路请不吝赐教。

http://www.dtcms.com/a/470407.html

相关文章:

  • Java-146 深入浅出 MongoDB 数据插入、批量写入、BSON 格式与逻辑查询and or not操作指南
  • EasyExcel实现普通导入导出以及按模板导出excel文件
  • ubuntu 24.10安装MongoDB
  • 开源新经济:Web4.0时代的社区激励模型
  • NXP iMX8MM ARM 平台 Weston RDP 远程桌面部署测试
  • 低代码的系统化演进:从工具逻辑到平台架构的技术解读
  • 告别“时间战“:清北AI原创学习力模型,开启教育效率革命
  • 东莞市电商网站建设做室内概念图的网站
  • PowerShell 递归目录文件名冲突检查脚本(不区分大小写)
  • STM32项目分享:基于STM32的泳池防溺水检测手环
  • 权威解析GEO优化:如何提升品牌在AI搜索中的曝光?
  • C语言与Java语言编译过程及文件类型
  • 基于SpringBoot的农产品(商城)销售系统
  • 有名的网站建设wordpress博客站模板下载
  • 网站打不开如何解决深圳企业网站建设服务中心
  • 专业的论坛网站建设开发wordpress静态化
  • hive的一些优化配置
  • 做网站一屏一屏的盖州网站建设
  • 佳木斯建设工程交易中心网站在龙港网站哪里做
  • 工具收集 - ContextMenuManager 右键管理
  • 【软件设计师中级】计算机组成与结构(六):系统性能评测与可靠性基础 - 衡量计算机的“尺子“与“保险“
  • 当游戏NPC有了“灵魂”,网易伏羲解码游戏智能交互场景新实践
  • 热更新:移动应用的“空中加油”技术-详解什么是热更新?-优雅草卓伊凡卓伊凡的挑战
  • 【GD32】软、硬件I2C对比
  • YMODEM 协议介绍以及通信流程分析和Lua语言实现
  • 视频直播点播平台EasyDSS如何助力餐饮行业实现“明厨亮灶”直播?
  • 通过网站做外贸广告公司有哪些
  • 关于网站建设的好处seo搜索优化邵阳
  • 百家号淄博圻谷网站建设做网站页面一般用什么软件
  • CCF-GESP 等级考试 2024年3月认证C++三级真题解析