java-Milvus 连接池(多key)与自定义端点监听设计
文章目录
- 前言
- java-Milvus 连接池(多key)与自定义端点监听设计
- 1. MilvusClientV2Pool 是什么
- 2. MilvusClientV2Pool中的key
- 3. 连接池设计
- 4. 连接池暴露给 Actuator
- 5. Milvus连接池与自定义 Milvus 连接池端点demo
前言
如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
java-Milvus 连接池(多key)与自定义端点监听设计
1. MilvusClientV2Pool 是什么
MilvusClientV2Pool 是 Milvus 官方 Java SDK 提供的一个 连接池管理类,用于管理和复用 Milvus 连接客户端(MilvusClientV2 实例)
2. MilvusClientV2Pool中的key
MilvusClientV2Pool 是一个多键(multi-key)连接池管理器,这里的 key 是用来区分和管理不同“连接组”的标识符。
你可以把它理解为“连接池里的子池”的名字或分类标签,每个 key 对应一组单独的连接资源(即一批可用的 MilvusClientV2 实例),你调用 pool.getClient(key) 时,会从对应 key 的连接子池里获取连接。
3. 连接池设计
admin-module 与 insert-module 公用一个连接池,search-module 单独一个连接池
划分的好处:
模块 | 连接池 | 说明 |
---|---|---|
admin-module + insert-module | 共用一个池 | 都偏向写操作、管理操作,不太频繁。写入时延和并发要求可控。 |
search-module | 独立池 | 查询操作频繁,对并发吞吐和时延更敏感,需要独立池保证稳定性。 |
4. 连接池暴露给 Actuator
添加依赖:
<!-- 添加 Spring Boot Actuator -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
启用端点配置:
management:endpoints:web:exposure:include: "*"
Spring Security 配置修改:
/**
* @description: TODO
* @author 杨镇宇
* @date 2024/7/12 16:01
* @version 1.0
*/
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {@Overrideprotected void configure(HttpSecurity http) throws Exception {http.authorizeRequests().antMatchers("/api/**").permitAll().antMatchers("/swagger-ui.html","/doc.html", "/webjars/**", "/swagger-ui/**","/swagger-resources/**", "/v2/api-docs").permitAll().antMatchers("/actuator/**").permitAll() // ✅ 添加这行:放行 actuator.anyRequest().authenticated().and().httpBasic().and().exceptionHandling().authenticationEntryPoint((request, response, authException) -> {System.out.println("Authentication failed: " + authException.getMessage());response.setContentType("application/json;charset=UTF-8");response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);response.getWriter().write("{\"error\":\"Authentication Failed\"}");}).and().csrf().disable();}}
5. Milvus连接池与自定义 Milvus 连接池端点demo
添加依赖:
<dependency><groupId>io.milvus</groupId><artifactId>milvus-sdk-java</artifactId><version>2.5.9</version>
</dependency>
注意的是Milvus SDK 本身并不主动声明所有运行必须的依赖,尤其是:
- protobuf-java
- grpc-*
- commons-pool2
- 其他 Milvus SDK 内部依赖的底层库
所以需要处理一下:
参考https://central.sonatype.com/artifact/io.milvus/milvus-sdk-java/2.5.9
的Maven POM File
修改所有运行必须的依赖。
下面是我修改后的:
<dependency><groupId>io.milvus</groupId><artifactId>milvus-sdk-java</artifactId><version>2.5.9</version><exclusions><exclusion><artifactId>commons-pool2</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>protobuf-java</artifactId><groupId>com.google.protobuf</groupId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.12.0</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.25.5</version>
</dependency>
代码:
Milvus链接池bean配置:
package org.example.milvus.config;import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.pool.PoolConfig;
import io.milvus.v2.client.ConnectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;/*** admin-module 与 insert-module 公用一个连接池,search-module 单独一个连接池*/
@Configuration
@Slf4j
public class MilvusConfig {private final MilvusProperties milvusProperties;public MilvusConfig(MilvusProperties milvusProperties){this.milvusProperties = milvusProperties;}/*** 创建并返回一个 ConnectConfig 实例,用于配置连接到 Milvus 的参数。* 该配置包括连接 URI 和认证 Token,是建立与 Milvus 服务连接的基础。** @return 返回一个初始化好的 ConnectConfig 对象*/@Beanpublic ConnectConfig connectConfig() {return ConnectConfig.builder().uri(milvusProperties.getUri()) // 设置 Milvus 服务的连接地址.token(milvusProperties.getToken()) // 设置访问 Milvus 的认证 Token.build(); // 构建并返回 ConnectConfig 实例}/*** 创建并返回一个 PoolConfig 实例,用于配置连接池参数。* 此配置定义了连接池中每个 key 的最大空闲连接数、最大总连接数,* 整体连接池的最大连接数,以及连接等待和回收的相关策略。** @return 返回一个初始化好的 PoolConfig 对象*/private static PoolConfig createBasePoolConfig (){return PoolConfig.builder().maxIdlePerKey(10) // 每个 key 的最大空闲连接数.maxTotalPerKey(20) // 每个 key 的最大总连接数.maxTotal(50) // 连接池整体的最大连接数.maxBlockWaitDuration(Duration.ofSeconds(3)) // 最大阻塞等待时间(3秒).minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒).build(); // 构建并返回 PoolConfig 实例}/*** 创建并返回一个 MilvusClientV2Pool 实例,用于管理与 Milvus 的连接池。* 该方法使用指定的 PoolConfig 和 ConnectConfig 配置来初始化连接池。** @param connectConfig 连接配置对象,包含连接 Milvus 所需的 URI 和 Token 等信息* @return 返回一个初始化好的 MilvusClientV2Pool 对象* @throws ClassNotFoundException 如果类未找到,抛出该异常* @throws NoSuchMethodException 如果方法不存在,抛出该异常*/@Bean(name = "searchMilvusPool")public MilvusClientV2Pool searchPool( ConnectConfig connectConfig) throws ClassNotFoundException, NoSuchMethodException {PoolConfig poolConfig = PoolConfig.builder().maxIdlePerKey(10) // 每个 key 的最大空闲连接数.maxTotalPerKey(20) // 每个 key 的最大总连接数.maxTotal(50) // 连接池整体的最大连接数.maxBlockWaitDuration(Duration.ofSeconds(3)) // 最大阻塞等待时间(3秒).minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒).build(); // 构建并返回 PoolConfig 实例return new MilvusClientV2Pool(poolConfig,connectConfig);}/*** 创建并返回一个用于插入操作的 MilvusClientV2Pool 实例。* 该方法使用指定的 PoolConfig 和 ConnectConfig 配置来初始化连接池,* 主要用于管理与 Milvus 服务进行插入操作时的连接资源。** @param connectConfig 连接配置对象,包含连接 Milvus 所需的 URI 和 Token 等信息* @return 返回一个初始化好的 MilvusClientV2Pool 对象* @throws ClassNotFoundException 如果类未找到,抛出该异常* @throws NoSuchMethodException 如果方法不存在,抛出该异常*/@Bean(name = "commonMilvusPool")public MilvusClientV2Pool insertPool( ConnectConfig connectConfig) throws ClassNotFoundException, NoSuchMethodException {PoolConfig poolConfig = PoolConfig.builder().maxIdlePerKey(10) // 每个 key 的最大空闲连接数.maxTotalPerKey(20) // 每个 key 的最大总连接数.maxTotal(50) // 连接池整体的最大连接数.maxBlockWaitDuration(Duration.ofSeconds(3)) // 最大阻塞等待时间(3秒).minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒).build(); // 构建并返回 PoolConfig 实例return new MilvusClientV2Pool(poolConfig,connectConfig);}}
package org.example.milvus.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/**
* @author 杨镇宇
* @date 2025/7/4 10:23
* @version 1.0
*/
@Component
@Data
@ConfigurationProperties(prefix = "milvus")
public class MilvusProperties {/*** 设置 Milvus 服务的连接地址*/private String uri = "http://localhost:19530";/*** 设置访问 Milvus 的认证 Token*/private String token = "root:Milvus";}
yml配置:
milvus:uri: http://localhost:19530token: root:Milvus
Milvus 连接池端点:
package org.example.milvus.config;import com.google.common.collect.Maps;
import io.milvus.pool.MilvusClientV2Pool;
import org.example.milvus.model.MilvusClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;/**
* @author 杨镇宇
* @date 2025/7/4 17:13
* @version 1.0
*/
@Component
@Endpoint(id = "milvusPool")
public class MilvusPoolEndpoint {private final MilvusClientV2Pool searchMilvusPool;private final MilvusClientV2Pool connectConfig;// 构造函数注入public MilvusPoolEndpoint(@Qualifier("searchMilvusPool") MilvusClientV2Pool searchMilvusPool,@Qualifier("commonMilvusPool") MilvusClientV2Pool commonMilvusPool) {this.searchMilvusPool = searchMilvusPool;this.connectConfig = commonMilvusPool;}@ReadOperationpublic Map<String, Object> milvusPoolStats() {Map<String, Object> result = Maps.newHashMap();List<String> commonKeys = Arrays.asList(MilvusClient.ADMIN_MODULE, MilvusClient.INSERT_MODULE);List<String> searchKeys = Collections.singletonList(MilvusClient.SEARCH_MODULE);for (String key : searchKeys) {result.put("searchPool_activeCount_" + key, searchMilvusPool.getActiveClientNumber(key));result.put("searchPool_idleCount_" + key, searchMilvusPool.getIdleClientNumber(key));}for (String key : commonKeys) {result.put("commonPool_activeCount_" + key, connectConfig.getActiveClientNumber(key));result.put("commonPool_idleCount_" + key, connectConfig.getIdleClientNumber(key));}return result;}
}
search-module 这个key的客户端:
package org.example.milvus.model;
/**
* @author 杨镇宇
* @date 2025/7/4 17:15
* @version 1.0
*/public interface MilvusClient {String ADMIN_MODULE = "admin-module";String INSERT_MODULE = "insert-module";String SEARCH_MODULE = "search-module";
}
package org.example.milvus.model;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.pool.MilvusClientV2Pool;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* @author 杨镇宇
* @date 2025/7/4 16:44
* @version 1.0
*/@Component
public class MilvusSearchClient implements MilvusClient{private final MilvusClientV2 client;private final MilvusClientV2Pool pool;public MilvusSearchClient(@Qualifier("searchMilvusPool") MilvusClientV2Pool pool) {this.pool = pool;this.client = pool.getClient(SEARCH_MODULE);}public MilvusClientV2 getClient() {return client;}@PreDestroypublic void close() {pool.returnClient(SEARCH_MODULE, client);}
}
admin-module + insert-module 这两个key的客户端:
package org.example.milvus.model;import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.v2.client.MilvusClientV2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;/**
* @author 杨镇宇
* @date 2025/7/4 16:45
* @version 1.0
*/@Component
public class MilvusInsertClient implements MilvusClient{private final MilvusClientV2 client;private final MilvusClientV2Pool pool;public MilvusInsertClient(@Qualifier("commonMilvusPool") MilvusClientV2Pool pool) {this.pool = pool;this.client = pool.getClient(INSERT_MODULE);}public MilvusClientV2 getClient() {return client;}@PreDestroypublic void close() {pool.returnClient(INSERT_MODULE, client);}
}
package org.example.milvus.model;import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.v2.client.MilvusClientV2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;/**
* @author 杨镇宇
* @date 2025/7/4 16:45
* @version 1.0
*/
@Component
public class MilvusAdminClient implements MilvusClient{private final MilvusClientV2 client;private final MilvusClientV2Pool pool;public MilvusAdminClient(@Qualifier("commonMilvusPool") MilvusClientV2Pool pool) {this.pool = pool;this.client = pool.getClient(ADMIN_MODULE);}public MilvusClientV2 getClient() {return client;}@PreDestroypublic void close() {pool.returnClient(ADMIN_MODULE, client);}
}
效果:
https://127.0.0.1:13145/actuator/milvusPool
字段名 | 意义 |
---|---|
searchPool_activeCount_search-module = 1 | 当前 searchMilvusPool 池中 search-module 正在使用的连接数为 1 |
searchPool_idleCount_search-module = 0 | 当前 searchMilvusPool 池中 search-module 空闲连接数为 0 |
commonPool_activeCount_admin-module = 1 | 当前 commonMilvusPool 池中 admin-module 正在使用的连接数为 1 |
commonPool_idleCount_admin-module = 0 | 当前 commonMilvusPool 池中 admin-module 空闲连接数为 0 |
commonPool_activeCount_insert-module = 1 | 当前 commonMilvusPool 池中 insert-module 正在使用的连接数为 1 |
commonPool_idleCount_insert-module = 0 | 当前 commonMilvusPool 池中 insert-module 空闲连接数为 0 |