java-Milvus 连接池(多key)与自定义端点监听设计

发布于:2025-07-08 ⋅ 阅读:(21) ⋅ 点赞:(0)

前言

  如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
  而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!


java-Milvus 连接池(多key)与自定义端点监听设计

1. MilvusClientV2Pool 是什么

MilvusClientV2Pool 是 Milvus 官方 Java SDK 提供的一个 连接池管理类,用于管理和复用 Milvus 连接客户端(MilvusClientV2 实例)

2. MilvusClientV2Pool中的key

MilvusClientV2Pool 是一个多键(multi-key)连接池管理器,这里的 key 是用来区分和管理不同“连接组”的标识符。

你可以把它理解为“连接池里的子池”的名字或分类标签,每个 key 对应一组单独的连接资源(即一批可用的 MilvusClientV2 实例),你调用 pool.getClient(key) 时,会从对应 key 的连接子池里获取连接。

3. 连接池设计

admin-module 与 insert-module 公用一个连接池,search-module 单独一个连接池
划分的好处:

模块 连接池 说明
admin-module + insert-module 共用一个池 都偏向写操作、管理操作,不太频繁。写入时延和并发要求可控。
search-module 独立池 查询操作频繁,对并发吞吐和时延更敏感,需要独立池保证稳定性。

4. 连接池暴露给 Actuator

添加依赖:

<!-- 添加 Spring Boot Actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

启用端点配置:

management:
  endpoints:
    web:
      exposure:
        include: "*"

Spring Security 配置修改:

/**
* @description: TODO
* @author 杨镇宇
* @date 2024/7/12 16:01
* @version 1.0
*/
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {


    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
                .authorizeRequests()
                .antMatchers("/api/**").permitAll()
                .antMatchers("/swagger-ui.html","/doc.html", "/webjars/**", "/swagger-ui/**",
                        "/swagger-resources/**", "/v2/api-docs").permitAll()
                .antMatchers("/actuator/**").permitAll()  // ✅ 添加这行:放行 actuator
                .anyRequest().authenticated()
                .and()
                .httpBasic()
                .and()
                .exceptionHandling()
                .authenticationEntryPoint((request, response, authException) -> {
                    System.out.println("Authentication failed: " + authException.getMessage());
                    response.setContentType("application/json;charset=UTF-8");
                    response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
                    response.getWriter().write("{\"error\":\"Authentication Failed\"}");
                })
                .and()
                .csrf().disable();
    }

}

5. Milvus连接池与自定义 Milvus 连接池端点demo

添加依赖:

<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.5.9</version>
</dependency>

注意的是Milvus SDK 本身并不主动声明所有运行必须的依赖,尤其是:

  • protobuf-java
  • grpc-*
  • commons-pool2
  • 其他 Milvus SDK 内部依赖的底层库

所以需要处理一下:
参考https://central.sonatype.com/artifact/io.milvus/milvus-sdk-java/2.5.9
的Maven POM File
在这里插入图片描述

修改所有运行必须的依赖。

下面是我修改后的:



<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.5.9</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-pool2</artifactId>
            <groupId>org.apache.commons</groupId>
        </exclusion>
        <exclusion>
            <artifactId>protobuf-java</artifactId>
            <groupId>com.google.protobuf</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.25.5</version>
</dependency>

代码:
Milvus链接池bean配置:

package org.example.milvus.config;


import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.pool.PoolConfig;
import io.milvus.v2.client.ConnectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




import java.time.Duration;




/**
 * admin-module 与 insert-module 公用一个连接池,search-module 单独一个连接池
 */
@Configuration
@Slf4j
public class MilvusConfig {




    private final MilvusProperties milvusProperties;
    public MilvusConfig(MilvusProperties milvusProperties){
        this.milvusProperties = milvusProperties;
    }
    /**
     * 创建并返回一个 ConnectConfig 实例,用于配置连接到 Milvus 的参数。
     * 该配置包括连接 URI 和认证 Token,是建立与 Milvus 服务连接的基础。
     *
     * @return 返回一个初始化好的 ConnectConfig 对象
     */
    @Bean
    public ConnectConfig connectConfig() {
        return ConnectConfig.builder()
                .uri(milvusProperties.getUri())  // 设置 Milvus 服务的连接地址
                .token(milvusProperties.getToken())            // 设置访问 Milvus 的认证 Token
                .build();                         // 构建并返回 ConnectConfig 实例
    }




    /**
     * 创建并返回一个 PoolConfig 实例,用于配置连接池参数。
     * 此配置定义了连接池中每个 key 的最大空闲连接数、最大总连接数,
     * 整体连接池的最大连接数,以及连接等待和回收的相关策略。
     *
     * @return 返回一个初始化好的 PoolConfig 对象
     */
    private static PoolConfig createBasePoolConfig (){
        return PoolConfig.builder()
                .maxIdlePerKey(10)                  // 每个 key 的最大空闲连接数
                .maxTotalPerKey(20)                // 每个 key 的最大总连接数
                .maxTotal(50)                       // 连接池整体的最大连接数
                .maxBlockWaitDuration(Duration.ofSeconds(3))  // 最大阻塞等待时间(3秒)
                .minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒)
                .build();                            // 构建并返回 PoolConfig 实例
    }




    /**
     * 创建并返回一个 MilvusClientV2Pool 实例,用于管理与 Milvus 的连接池。
     * 该方法使用指定的 PoolConfig 和 ConnectConfig 配置来初始化连接池。
     *
     * @param connectConfig 连接配置对象,包含连接 Milvus 所需的 URI 和 Token 等信息
     * @return 返回一个初始化好的 MilvusClientV2Pool 对象
     * @throws ClassNotFoundException 如果类未找到,抛出该异常
     * @throws NoSuchMethodException 如果方法不存在,抛出该异常
     */
    @Bean(name = "searchMilvusPool")
    public MilvusClientV2Pool searchPool( ConnectConfig connectConfig) throws ClassNotFoundException, NoSuchMethodException {
        PoolConfig poolConfig = PoolConfig.builder()
                .maxIdlePerKey(10)                  // 每个 key 的最大空闲连接数
                .maxTotalPerKey(20)                // 每个 key 的最大总连接数
                .maxTotal(50)                       // 连接池整体的最大连接数
                .maxBlockWaitDuration(Duration.ofSeconds(3))  // 最大阻塞等待时间(3秒)
                .minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒)
                .build();                            // 构建并返回 PoolConfig 实例
        return new MilvusClientV2Pool(
                poolConfig,
                connectConfig
        );
    }




    /**
     * 创建并返回一个用于插入操作的 MilvusClientV2Pool 实例。
     * 该方法使用指定的 PoolConfig 和 ConnectConfig 配置来初始化连接池,
     * 主要用于管理与 Milvus 服务进行插入操作时的连接资源。
     *
     * @param connectConfig 连接配置对象,包含连接 Milvus 所需的 URI 和 Token 等信息
     * @return 返回一个初始化好的 MilvusClientV2Pool 对象
     * @throws ClassNotFoundException 如果类未找到,抛出该异常
     * @throws NoSuchMethodException 如果方法不存在,抛出该异常
     */
    @Bean(name = "commonMilvusPool")
    public MilvusClientV2Pool insertPool( ConnectConfig connectConfig) throws ClassNotFoundException, NoSuchMethodException {
        PoolConfig poolConfig = PoolConfig.builder()
                .maxIdlePerKey(10)                  // 每个 key 的最大空闲连接数
                .maxTotalPerKey(20)                // 每个 key 的最大总连接数
                .maxTotal(50)                       // 连接池整体的最大连接数
                .maxBlockWaitDuration(Duration.ofSeconds(3))  // 最大阻塞等待时间(3秒)
                .minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒)
                .build();                            // 构建并返回 PoolConfig 实例
        return new MilvusClientV2Pool(
                poolConfig,
                connectConfig
        );
    }


}
package org.example.milvus.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
* @author 杨镇宇
* @date 2025/7/4 10:23
* @version 1.0
*/
@Component
@Data
@ConfigurationProperties(prefix = "milvus")
public class MilvusProperties {

    /**
     * 设置 Milvus 服务的连接地址
     */
    private String uri = "http://localhost:19530";

    /**
     * 设置访问 Milvus 的认证 Token
     */
    private String token = "root:Milvus";


}

yml配置:

milvus:
  uri: http://localhost:19530
  token: root:Milvus

Milvus 连接池端点:

package org.example.milvus.config;

import com.google.common.collect.Maps;
import io.milvus.pool.MilvusClientV2Pool;
import org.example.milvus.model.MilvusClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;


/**
* @author 杨镇宇
* @date 2025/7/4 17:13
* @version 1.0
*/
@Component
@Endpoint(id = "milvusPool")
public class MilvusPoolEndpoint {
    private final MilvusClientV2Pool searchMilvusPool;
    private final MilvusClientV2Pool connectConfig;

    // 构造函数注入
    public MilvusPoolEndpoint(
            @Qualifier("searchMilvusPool") MilvusClientV2Pool searchMilvusPool,
            @Qualifier("commonMilvusPool") MilvusClientV2Pool commonMilvusPool) {
        this.searchMilvusPool = searchMilvusPool;
        this.connectConfig = commonMilvusPool;
    }

    @ReadOperation
    public Map<String, Object> milvusPoolStats() {
        Map<String, Object> result = Maps.newHashMap();
        List<String> commonKeys = Arrays.asList(MilvusClient.ADMIN_MODULE, MilvusClient.INSERT_MODULE);
        List<String> searchKeys = Collections.singletonList(MilvusClient.SEARCH_MODULE);

        for (String key : searchKeys) {
            result.put("searchPool_activeCount_" + key, searchMilvusPool.getActiveClientNumber(key));
            result.put("searchPool_idleCount_" + key, searchMilvusPool.getIdleClientNumber(key));
        }

        for (String key : commonKeys) {
            result.put("commonPool_activeCount_" + key, connectConfig.getActiveClientNumber(key));
            result.put("commonPool_idleCount_" + key, connectConfig.getIdleClientNumber(key));
        }

        return result;
    }
}

search-module 这个key的客户端:

package org.example.milvus.model;
/**
* @author 杨镇宇
* @date 2025/7/4 17:15
* @version 1.0
*/

public interface MilvusClient {
    String ADMIN_MODULE = "admin-module";
    String INSERT_MODULE = "insert-module";
    String SEARCH_MODULE = "search-module";
}
package org.example.milvus.model;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.pool.MilvusClientV2Pool;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* @author 杨镇宇
* @date 2025/7/4 16:44
* @version 1.0
*/

@Component
public class MilvusSearchClient implements MilvusClient{

    private final MilvusClientV2 client;
    private final MilvusClientV2Pool pool;

    public MilvusSearchClient(@Qualifier("searchMilvusPool") MilvusClientV2Pool pool) {
        this.pool = pool;
        this.client = pool.getClient(SEARCH_MODULE);
    }
    public MilvusClientV2 getClient() {
        return client;
    }
    @PreDestroy
    public void close() {
        pool.returnClient(SEARCH_MODULE, client);
    }
}

admin-module + insert-module 这两个key的客户端:

package org.example.milvus.model;

import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.v2.client.MilvusClientV2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;

/**
* @author 杨镇宇
* @date 2025/7/4 16:45
* @version 1.0
*/


@Component
public class MilvusInsertClient implements MilvusClient{

    private final MilvusClientV2 client;
    private final MilvusClientV2Pool pool;

    public MilvusInsertClient(@Qualifier("commonMilvusPool") MilvusClientV2Pool pool) {
        this.pool = pool;
        this.client = pool.getClient(INSERT_MODULE);
    }

    public MilvusClientV2 getClient() {
        return client;
    }

    @PreDestroy
    public void close() {
        pool.returnClient(INSERT_MODULE, client);
    }
}

package org.example.milvus.model;

import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.v2.client.MilvusClientV2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;


/**
* @author 杨镇宇
* @date 2025/7/4 16:45
* @version 1.0
*/
@Component
public class MilvusAdminClient implements MilvusClient{


    private final MilvusClientV2 client;
    private final MilvusClientV2Pool pool;

    public MilvusAdminClient(@Qualifier("commonMilvusPool") MilvusClientV2Pool pool) {
        this.pool = pool;
        this.client = pool.getClient(ADMIN_MODULE);
    }

    public MilvusClientV2 getClient() {
        return client;
    }

    @PreDestroy
    public void close() {
        pool.returnClient(ADMIN_MODULE, client);
    }
}

效果:
https://127.0.0.1:13145/actuator/milvusPool
在这里插入图片描述

字段名 意义
searchPool_activeCount_search-module = 1 当前 searchMilvusPool 池中 search-module 正在使用的连接数为 1
searchPool_idleCount_search-module = 0 当前 searchMilvusPool 池中 search-module 空闲连接数为 0
commonPool_activeCount_admin-module = 1 当前 commonMilvusPool 池中 admin-module 正在使用的连接数为 1
commonPool_idleCount_admin-module = 0 当前 commonMilvusPool 池中 admin-module 空闲连接数为 0
commonPool_activeCount_insert-module = 1 当前 commonMilvusPool 池中 insert-module 正在使用的连接数为 1
commonPool_idleCount_insert-module = 0 当前 commonMilvusPool 池中 insert-module 空闲连接数为 0

网站公告

今日签到

点亮在社区的每一天
去签到