当前位置: 首页 > news >正文

java-Milvus 连接池(多key)与自定义端点监听设计

文章目录

  • 前言
    • java-Milvus 连接池(多key)与自定义端点监听设计
      • 1. MilvusClientV2Pool 是什么
      • 2. MilvusClientV2Pool中的key
      • 3. 连接池设计
      • 4. 连接池暴露给 Actuator
      • 5. Milvus连接池与自定义 Milvus 连接池端点demo

前言

  如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
  而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!


java-Milvus 连接池(多key)与自定义端点监听设计

1. MilvusClientV2Pool 是什么

MilvusClientV2Pool 是 Milvus 官方 Java SDK 提供的一个 连接池管理类,用于管理和复用 Milvus 连接客户端(MilvusClientV2 实例)

2. MilvusClientV2Pool中的key

MilvusClientV2Pool 是一个多键(multi-key)连接池管理器,这里的 key 是用来区分和管理不同“连接组”的标识符。

你可以把它理解为“连接池里的子池”的名字或分类标签,每个 key 对应一组单独的连接资源(即一批可用的 MilvusClientV2 实例),你调用 pool.getClient(key) 时,会从对应 key 的连接子池里获取连接。

3. 连接池设计

admin-module 与 insert-module 公用一个连接池,search-module 单独一个连接池
划分的好处:

模块连接池说明
admin-module + insert-module共用一个池都偏向写操作、管理操作,不太频繁。写入时延和并发要求可控。
search-module独立池查询操作频繁,对并发吞吐和时延更敏感,需要独立池保证稳定性。

4. 连接池暴露给 Actuator

添加依赖:

<!-- 添加 Spring Boot Actuator -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

启用端点配置:

management:endpoints:web:exposure:include: "*"

Spring Security 配置修改:

/**
* @description: TODO
* @author 杨镇宇
* @date 2024/7/12 16:01
* @version 1.0
*/
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {@Overrideprotected void configure(HttpSecurity http) throws Exception {http.authorizeRequests().antMatchers("/api/**").permitAll().antMatchers("/swagger-ui.html","/doc.html", "/webjars/**", "/swagger-ui/**","/swagger-resources/**", "/v2/api-docs").permitAll().antMatchers("/actuator/**").permitAll()  // ✅ 添加这行:放行 actuator.anyRequest().authenticated().and().httpBasic().and().exceptionHandling().authenticationEntryPoint((request, response, authException) -> {System.out.println("Authentication failed: " + authException.getMessage());response.setContentType("application/json;charset=UTF-8");response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);response.getWriter().write("{\"error\":\"Authentication Failed\"}");}).and().csrf().disable();}}

5. Milvus连接池与自定义 Milvus 连接池端点demo

添加依赖:

<dependency><groupId>io.milvus</groupId><artifactId>milvus-sdk-java</artifactId><version>2.5.9</version>
</dependency>

注意的是Milvus SDK 本身并不主动声明所有运行必须的依赖,尤其是:

  • protobuf-java
  • grpc-*
  • commons-pool2
  • 其他 Milvus SDK 内部依赖的底层库

所以需要处理一下:
参考https://central.sonatype.com/artifact/io.milvus/milvus-sdk-java/2.5.9
的Maven POM File
在这里插入图片描述

修改所有运行必须的依赖。

下面是我修改后的:

<dependency><groupId>io.milvus</groupId><artifactId>milvus-sdk-java</artifactId><version>2.5.9</version><exclusions><exclusion><artifactId>commons-pool2</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>protobuf-java</artifactId><groupId>com.google.protobuf</groupId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.12.0</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.25.5</version>
</dependency>

代码:
Milvus链接池bean配置:

package org.example.milvus.config;import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.pool.PoolConfig;
import io.milvus.v2.client.ConnectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;/*** admin-module 与 insert-module 公用一个连接池,search-module 单独一个连接池*/
@Configuration
@Slf4j
public class MilvusConfig {private final MilvusProperties milvusProperties;public MilvusConfig(MilvusProperties milvusProperties){this.milvusProperties = milvusProperties;}/*** 创建并返回一个 ConnectConfig 实例,用于配置连接到 Milvus 的参数。* 该配置包括连接 URI 和认证 Token,是建立与 Milvus 服务连接的基础。** @return 返回一个初始化好的 ConnectConfig 对象*/@Beanpublic ConnectConfig connectConfig() {return ConnectConfig.builder().uri(milvusProperties.getUri())  // 设置 Milvus 服务的连接地址.token(milvusProperties.getToken())            // 设置访问 Milvus 的认证 Token.build();                         // 构建并返回 ConnectConfig 实例}/*** 创建并返回一个 PoolConfig 实例,用于配置连接池参数。* 此配置定义了连接池中每个 key 的最大空闲连接数、最大总连接数,* 整体连接池的最大连接数,以及连接等待和回收的相关策略。** @return 返回一个初始化好的 PoolConfig 对象*/private static PoolConfig createBasePoolConfig (){return PoolConfig.builder().maxIdlePerKey(10)                  // 每个 key 的最大空闲连接数.maxTotalPerKey(20)                // 每个 key 的最大总连接数.maxTotal(50)                       // 连接池整体的最大连接数.maxBlockWaitDuration(Duration.ofSeconds(3))  // 最大阻塞等待时间(3秒).minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒).build();                            // 构建并返回 PoolConfig 实例}/*** 创建并返回一个 MilvusClientV2Pool 实例,用于管理与 Milvus 的连接池。* 该方法使用指定的 PoolConfig 和 ConnectConfig 配置来初始化连接池。** @param connectConfig 连接配置对象,包含连接 Milvus 所需的 URI 和 Token 等信息* @return 返回一个初始化好的 MilvusClientV2Pool 对象* @throws ClassNotFoundException 如果类未找到,抛出该异常* @throws NoSuchMethodException 如果方法不存在,抛出该异常*/@Bean(name = "searchMilvusPool")public MilvusClientV2Pool searchPool( ConnectConfig connectConfig) throws ClassNotFoundException, NoSuchMethodException {PoolConfig poolConfig = PoolConfig.builder().maxIdlePerKey(10)                  // 每个 key 的最大空闲连接数.maxTotalPerKey(20)                // 每个 key 的最大总连接数.maxTotal(50)                       // 连接池整体的最大连接数.maxBlockWaitDuration(Duration.ofSeconds(3))  // 最大阻塞等待时间(3秒).minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒).build();                            // 构建并返回 PoolConfig 实例return new MilvusClientV2Pool(poolConfig,connectConfig);}/*** 创建并返回一个用于插入操作的 MilvusClientV2Pool 实例。* 该方法使用指定的 PoolConfig 和 ConnectConfig 配置来初始化连接池,* 主要用于管理与 Milvus 服务进行插入操作时的连接资源。** @param connectConfig 连接配置对象,包含连接 Milvus 所需的 URI 和 Token 等信息* @return 返回一个初始化好的 MilvusClientV2Pool 对象* @throws ClassNotFoundException 如果类未找到,抛出该异常* @throws NoSuchMethodException 如果方法不存在,抛出该异常*/@Bean(name = "commonMilvusPool")public MilvusClientV2Pool insertPool( ConnectConfig connectConfig) throws ClassNotFoundException, NoSuchMethodException {PoolConfig poolConfig = PoolConfig.builder().maxIdlePerKey(10)                  // 每个 key 的最大空闲连接数.maxTotalPerKey(20)                // 每个 key 的最大总连接数.maxTotal(50)                       // 连接池整体的最大连接数.maxBlockWaitDuration(Duration.ofSeconds(3))  // 最大阻塞等待时间(3秒).minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒).build();                            // 构建并返回 PoolConfig 实例return new MilvusClientV2Pool(poolConfig,connectConfig);}}
package org.example.milvus.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/**
* @author 杨镇宇
* @date 2025/7/4 10:23
* @version 1.0
*/
@Component
@Data
@ConfigurationProperties(prefix = "milvus")
public class MilvusProperties {/*** 设置 Milvus 服务的连接地址*/private String uri = "http://localhost:19530";/*** 设置访问 Milvus 的认证 Token*/private String token = "root:Milvus";}

yml配置:

milvus:uri: http://localhost:19530token: root:Milvus

Milvus 连接池端点:

package org.example.milvus.config;import com.google.common.collect.Maps;
import io.milvus.pool.MilvusClientV2Pool;
import org.example.milvus.model.MilvusClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;/**
* @author 杨镇宇
* @date 2025/7/4 17:13
* @version 1.0
*/
@Component
@Endpoint(id = "milvusPool")
public class MilvusPoolEndpoint {private final MilvusClientV2Pool searchMilvusPool;private final MilvusClientV2Pool connectConfig;// 构造函数注入public MilvusPoolEndpoint(@Qualifier("searchMilvusPool") MilvusClientV2Pool searchMilvusPool,@Qualifier("commonMilvusPool") MilvusClientV2Pool commonMilvusPool) {this.searchMilvusPool = searchMilvusPool;this.connectConfig = commonMilvusPool;}@ReadOperationpublic Map<String, Object> milvusPoolStats() {Map<String, Object> result = Maps.newHashMap();List<String> commonKeys = Arrays.asList(MilvusClient.ADMIN_MODULE, MilvusClient.INSERT_MODULE);List<String> searchKeys = Collections.singletonList(MilvusClient.SEARCH_MODULE);for (String key : searchKeys) {result.put("searchPool_activeCount_" + key, searchMilvusPool.getActiveClientNumber(key));result.put("searchPool_idleCount_" + key, searchMilvusPool.getIdleClientNumber(key));}for (String key : commonKeys) {result.put("commonPool_activeCount_" + key, connectConfig.getActiveClientNumber(key));result.put("commonPool_idleCount_" + key, connectConfig.getIdleClientNumber(key));}return result;}
}

search-module 这个key的客户端:

package org.example.milvus.model;
/**
* @author 杨镇宇
* @date 2025/7/4 17:15
* @version 1.0
*/public interface MilvusClient {String ADMIN_MODULE = "admin-module";String INSERT_MODULE = "insert-module";String SEARCH_MODULE = "search-module";
}
package org.example.milvus.model;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.pool.MilvusClientV2Pool;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* @author 杨镇宇
* @date 2025/7/4 16:44
* @version 1.0
*/@Component
public class MilvusSearchClient implements MilvusClient{private final MilvusClientV2 client;private final MilvusClientV2Pool pool;public MilvusSearchClient(@Qualifier("searchMilvusPool") MilvusClientV2Pool pool) {this.pool = pool;this.client = pool.getClient(SEARCH_MODULE);}public MilvusClientV2 getClient() {return client;}@PreDestroypublic void close() {pool.returnClient(SEARCH_MODULE, client);}
}

admin-module + insert-module 这两个key的客户端:

package org.example.milvus.model;import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.v2.client.MilvusClientV2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;/**
* @author 杨镇宇
* @date 2025/7/4 16:45
* @version 1.0
*/@Component
public class MilvusInsertClient implements MilvusClient{private final MilvusClientV2 client;private final MilvusClientV2Pool pool;public MilvusInsertClient(@Qualifier("commonMilvusPool") MilvusClientV2Pool pool) {this.pool = pool;this.client = pool.getClient(INSERT_MODULE);}public MilvusClientV2 getClient() {return client;}@PreDestroypublic void close() {pool.returnClient(INSERT_MODULE, client);}
}
package org.example.milvus.model;import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.v2.client.MilvusClientV2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;/**
* @author 杨镇宇
* @date 2025/7/4 16:45
* @version 1.0
*/
@Component
public class MilvusAdminClient implements MilvusClient{private final MilvusClientV2 client;private final MilvusClientV2Pool pool;public MilvusAdminClient(@Qualifier("commonMilvusPool") MilvusClientV2Pool pool) {this.pool = pool;this.client = pool.getClient(ADMIN_MODULE);}public MilvusClientV2 getClient() {return client;}@PreDestroypublic void close() {pool.returnClient(ADMIN_MODULE, client);}
}

效果:
https://127.0.0.1:13145/actuator/milvusPool
在这里插入图片描述

字段名意义
searchPool_activeCount_search-module = 1当前 searchMilvusPool 池中 search-module 正在使用的连接数为 1
searchPool_idleCount_search-module = 0当前 searchMilvusPool 池中 search-module 空闲连接数为 0
commonPool_activeCount_admin-module = 1当前 commonMilvusPool 池中 admin-module 正在使用的连接数为 1
commonPool_idleCount_admin-module = 0当前 commonMilvusPool 池中 admin-module 空闲连接数为 0
commonPool_activeCount_insert-module = 1当前 commonMilvusPool 池中 insert-module 正在使用的连接数为 1
commonPool_idleCount_insert-module = 0当前 commonMilvusPool 池中 insert-module 空闲连接数为 0
http://www.dtcms.com/a/268631.html

相关文章:

  • C++开源项目—2048.cpp
  • 部署MongoDB
  • 接口漏洞怎么抓?Fiddler 中文版 + Postman + Wireshark 实战指南
  • 记录一个关于Maven配置TSF的报错问题
  • 基于 Three.js 开发三维引擎-02动态圆柱墙体实现
  • Python中50个常用的内置函数(2/2)
  • 剑指offer第2版:动态规划+记忆化搜索
  • 回溯题解——子集【LeetCode】输入的视角(选或不选)
  • YOLOv11模型轻量化挑战:边缘计算设备部署优化方案
  • FastAPI依赖注入:构建高可维护API的核心理念与实战
  • Modbus_TCP 客户端低版本指令(归档)
  • Hadoop 分布式存储与计算框架详解
  • Web后端开发-请求响应
  • NLP:文本特征处理和回译数据增强法
  • Mac-右键用 VS Code 打开文件夹
  • 【Echarts】“折线+柱状”实现双图表-家庭用电量可视化【文章附完整代码】
  • 泛微虚拟视图-数据虚拟化集成
  • 从库函数到API接口,深挖不同语言背后的“封装”与“调用”思想
  • pytest通过pytest_runtest_makereport添加失败截图到Allure报告中
  • 常见问题与最佳实践——AI教你学Docker
  • 1-Kafka介绍及常见应用场景
  • 学习基于springboot秒杀系统-环境配置(接口封装,mybatis,mysql,redis(Linux))
  • 2025年全国青少年信息素养大赛图形化(Scratch)编程小学低年级组初赛样题答案+解析
  • 登山第二十六梯:单目3D检测一切——一只眼看世界
  • 【C++开源库使用】使用libcurl开源库发送url请求(http请求)去下载用户头像文件(附完整源码)
  • 【R语言】 在读取 CSV 或 Excel 文件时的标准输出
  • 自定义简单线性回归模型
  • 【AI大模型】神经网络反向传播:核心原理与完整实现
  • 电脑电压过高的影响与风险分析
  • 轨迹优化 | 基于激光雷达的欧氏距离场ESDF地图构建(附ROS C++仿真)