Milvus 数据操作全解析:从连接到混合查询

发布于:2025-08-29 ⋅ 阅读:(20) ⋅ 点赞:(0)

1. 连接 Milvus

import random
from datetime import datetime
from time import sleep

from pymilvus import MilvusClient, AnnSearchRequest, RRFRanker

# client = MilvusClient(
#     uri="http://localhost:19530",
#     token="root:Milvus"
# )


from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility

# Register the "default" connection alias against a local Milvus server.
connections.connect("default", host="localhost", port=19530)

# Sanity-check the connection by asking the server for its version.
server_version = utility.get_server_version()
print("服务器版本:", server_version)

2. 创建包含多种标量和向量字段的集合

def create_collection():
    """Create the ``full_example_collection`` collection and return it.

    The schema consists of, in order: an explicit INT64 primary key
    (``auto_id=False``), two float-vector fields (128 and 64 dimensions),
    and the scalar fields (strings, a float price, an integer rating, a
    boolean flag, an integer timestamp and a VARCHAR array of tags).
    The collection is created on the ``default`` connection with
    ``num_shards=2``.
    """
    collection_name = "full_example_collection"

    primary_key = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False)

    vector_fields = [
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
        FieldSchema(name="keyword_embedding", dtype=DataType.FLOAT_VECTOR, dim=64),
    ]

    scalar_fields = [
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="subcategory", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="price", dtype=DataType.FLOAT),
        FieldSchema(name="rating", dtype=DataType.INT64),
        FieldSchema(name="is_active", dtype=DataType.BOOL),
        # Stored as an integer timestamp.
        FieldSchema(name="created_time", dtype=DataType.INT64),
        FieldSchema(name="tags", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=100, max_length=32),
    ]

    # Field order matters: column-based inserts elsewhere follow this order.
    schema = CollectionSchema(
        [primary_key, *vector_fields, *scalar_fields],
        description="完整的标量和向量操作示例",
    )

    collection = Collection(
        name=collection_name,
        schema=schema,
        using='default',
        num_shards=2,
    )

    print(f"集合 '{collection_name}' 创建成功")
    return collection

3. 获取集合

def get_collection():
    """Open ``full_example_collection``, print a short summary, load and return it.

    The summary lists the collection name, whether IDs are auto-generated,
    the primary-key field name and all field names.
    """
    coll = Collection(name="full_example_collection", using="default")

    # Print basic collection information.
    print("集合名称:", coll.name)
    schema = coll.schema
    print("是否自动 ID:", schema.auto_id)
    print("主键字段:", coll.primary_field.name)
    field_names = [field.name for field in schema.fields]
    print("所有字段:", field_names)

    # A collection must be loaded into memory before it can be searched.
    coll.load()
    return coll


# Open and load the collection once at module level; the search, query,
# update, delete and statistics helpers in this file read this global
# `collection` instead of taking it as a parameter.
collection = get_collection()

4. 创建索引

def create_index(collection: Collection):
    """Create the vector and scalar indexes on ``collection``.

    Vector fields get an IVF_FLAT index (``embedding`` with L2 and
    nlist=128, ``keyword_embedding`` with inner product and nlist=64).
    The commonly filtered scalar fields get an INVERTED index, which is
    used to speed up filter expressions.
    """
    # (field name, metric type, nlist) for each vector field.
    vector_specs = (
        ("embedding", "L2", 128),
        ("keyword_embedding", "IP", 64),  # IP = inner-product similarity
    )
    for field, metric, nlist in vector_specs:
        collection.create_index(
            field_name=field,
            index_params={
                "index_type": "IVF_FLAT",
                "metric_type": metric,
                "params": {"nlist": nlist},
            },
        )

    print("向量索引创建完成")

    # Scalar indexes (the original notes these need Milvus 2.3+).
    inverted_index = {"index_type": "INVERTED"}
    for field in ("category", "subcategory", "is_active", "rating"):
        collection.create_index(field_name=field, index_params=inverted_index)

    print("标量索引创建完成")

5. 插入数据

def generate_complex_data(num_entities):
    """Return ``num_entities`` randomly generated product records.

    Each record is a dict with the keys ``id``, ``embedding`` (128 floats),
    ``keyword_embedding`` (64 floats), ``title``, ``content``, ``category``,
    ``subcategory``, ``price``, ``rating``, ``is_active``, ``created_time``
    and ``tags``. IDs run from 0 to ``num_entities - 1``.
    """
    categories = ["电子产品", "服装", "食品", "图书", "家居"]
    subcategories = ["手机", "电脑", "上衣", "零食", "小说"]
    tag_pool = ["热门", "新品", "促销", "推荐", "限量"]

    def build_record(idx):
        # Draw the random values in a fixed order so a seeded run is reproducible.
        content_vector = [random.random() for _ in range(128)]
        keyword_vector = [random.random() for _ in range(64)]
        category = random.choice(categories)
        subcategory = random.choice(subcategories)
        price = round(random.uniform(10.0, 1000.0), 2)
        rating = random.randint(1, 5)
        is_active = random.choice([True, False])
        # Current time in seconds, shifted by up to +/- 1,000,000 seconds.
        created_time = int(datetime.now().timestamp()) + random.randint(-1000000, 1000000)
        # One to three distinct tags.
        tags = random.sample(tag_pool, random.randint(1, 3))

        return {
            "id": idx,
            "embedding": content_vector,
            "keyword_embedding": keyword_vector,
            "title": f"商品标题_{idx}",
            "content": f"这是第{idx}个商品的详细描述内容,包含丰富的信息...",
            "category": category,
            "subcategory": subcategory,
            "price": price,
            "rating": rating,
            "is_active": is_active,
            "created_time": created_time,
            "tags": tags,
        }

    return [build_record(idx) for idx in range(num_entities)]


def insert_collection(collection):
    """Generate 1000 random records and insert them into ``collection``.

    The row dicts are pivoted into one list per field (column-based
    layout), in the order listed in ``column_order``, before being passed
    to ``collection.insert``. The number of returned primary keys is printed.
    """
    records = generate_complex_data(1000)

    column_order = (
        "id",
        "embedding",
        "keyword_embedding",
        "title",
        "content",
        "category",
        "subcategory",
        "price",
        "rating",
        "is_active",
        "created_time",
        "tags",
    )
    # One inner list per field, each holding that field's value for every record.
    columns = [[record[name] for record in records] for name in column_order]

    insert_result = collection.insert(columns)
    print(f"插入了 {len(insert_result.primary_keys)} 条复杂记录")

6. 单向量查询

def single_vector_search_data(query_vector, top_k=10, filter_expr=None):
    """Run a single-vector ANN search on the ``embedding`` field.

    Args:
        query_vector: The query vector; it is wrapped in a one-element list.
        top_k: Number of most similar hits to return.
        filter_expr: Optional scalar filter expression (e.g. ``price > 100``).

    Returns:
        Whatever ``collection.search`` returns for the single query.

    Notes on the search parameters:
        metric_type selects the distance measure:
            "L2": Euclidean distance (smaller means more similar)
            "IP": inner product (larger means more similar)
            "COSINE": cosine similarity
        nprobe applies to IVF-type indexes: the number of nearest cluster
        centres inspected per search.
            - larger value -> higher recall, slower search
            - smaller value -> faster search, lower recall
        Valid range: 1 <= nprobe <= nlist.
        Rule of thumb: 10%-50% of nlist, e.g. nlist=128 -> nprobe=10-64.
    """
    return collection.search(
        data=[query_vector],  # list of query vectors
        anns_field="embedding",  # vector field to search
        # L2 distance, probing 10 cluster centres per search.
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=top_k,  # number of hits to return
        expr=filter_expr,  # optional scalar filter
        output_fields=["id", "title", "category", "price", "rating"],  # scalar fields to return
    )


def single_vector_search():
    """Search with a random 128-dim vector and print the top 5 hits."""
    query_vector = [random.random() for _ in range(128)]
    search_results = single_vector_search_data(query_vector, top_k=5)

    print("向量搜索结果:")
    # Only one query vector was sent, so the hits are in the first result set.
    for rank, hit in enumerate(search_results[0], start=1):
        print(f"排名 {rank}: ID={hit.id}, 标题={hit.entity.get('title')}, "
              f"分类={hit.entity.get('category')}, 价格={hit.entity.get('price')}")

7. 多向量查询

def muli_vector_hybrid_search_data(query_embedding, keyword_embedding, top_k=10):
    """Hybrid search combining the content vector and the keyword vector.

    Two ANN requests are built, one per vector field, each asking for
    ``top_k * 2`` candidates. ``collection.hybrid_search`` then merges them
    with an ``RRFRanker`` and returns the best ``top_k``.

    Args:
        query_embedding: Query vector for the ``embedding`` field.
        keyword_embedding: Query vector for the ``keyword_embedding`` field.
        top_k: Number of merged hits to return.
    """
    # Each request asks for twice as many candidates as will be returned.
    candidate_limit = top_k * 2

    # Content-vector request (L2 distance).
    content_request = AnnSearchRequest(
        data=[query_embedding],
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=candidate_limit,
    )

    # Keyword-vector request (inner product).
    keyword_request = AnnSearchRequest(
        data=[keyword_embedding],
        anns_field="keyword_embedding",
        param={"metric_type": "IP", "params": {"nprobe": 10}},
        limit=candidate_limit,
    )

    return collection.hybrid_search(
        reqs=[content_request, keyword_request],
        rerank=RRFRanker(),
        limit=top_k,
        output_fields=["id", "title", "category", "price"],
    )


def muli_vector_hybrid_search():
    """Run a hybrid search with random query vectors and print the top 5 hits."""
    # Dimensions match the two vector fields: 128 for content, 64 for keywords.
    content_query = [random.random() for _ in range(128)]
    keyword_query = [random.random() for _ in range(64)]
    hybrid_results = muli_vector_hybrid_search_data(content_query, keyword_query, top_k=5)

    print("\n混合搜索结果:")
    for rank, hit in enumerate(hybrid_results[0], start=1):
        print(f"排名 {rank}: ID={hit.id}, 标题={hit.entity.get('title')}")

8. 标量查询

def scalar_query(expr, limit=100):
    """Run a scalar (non-vector) query and return the matching rows.

    Args:
        expr: Filter expression passed to ``collection.query``.
        limit: Maximum number of rows to return (default 100).

    Returns:
        The result of ``collection.query`` with the fields ``id``, ``title``,
        ``category``, ``price``, ``rating`` and ``is_active``.
    """
    returned_fields = ["id", "title", "category", "price", "rating", "is_active"]
    return collection.query(expr=expr, output_fields=returned_fields, limit=limit)


# --- Simple scalar queries ---
def simple_scalar_query():
    """Run a series of basic scalar queries and print how many rows each returned.

    Each printed number is ``len()`` of what ``scalar_query`` returned, so
    it cannot exceed that function's ``limit`` (default 100).
    """

    def report(label, expr):
        # Print "<label>: <number of rows returned for expr>".
        print(f"{label}: {len(scalar_query(expr))}")

    print("\n--- 标量查询示例 --- :")

    # 1. Equality
    report("电子产品数量", 'category == "电子产品"')

    # 2. Range
    report("价格在100-500之间的商品数量", 'price >= 100.0 and price <= 500.0')

    # 3. Several conditions combined
    report("高评分服装商品数量", 'category == "服装" and rating >= 4 and is_active == True')

    # 4. Array field membership
    report("热门商品数量", 'array_contains(tags, "热门")')

    # 5. Time range: rows created within the last seven days
    week_in_seconds = 7 * 24 * 3600
    one_week_ago = int(datetime.now().timestamp()) - week_in_seconds
    report("一周内创建的商品数量", f'created_time >= {one_week_ago}')


# --- Complex scalar queries ---
def complex_scalar_query():
    """Run more advanced scalar queries and print how many rows each returned.

    Covers ``in`` lists, ``like`` prefix matching, ``array_contains_any``
    and a combined and/or filter. Each printed number is ``len()`` of what
    ``scalar_query`` returned, so it cannot exceed that function's
    ``limit`` (default 100).
    """

    print("\n--- 复杂标量查询示例 --- :")
    # 1. IN list
    results = scalar_query('category in ["电子产品", "服装"]')
    print(f"电子产品和服装商品数量: {len(results)}")

    # 2. LIKE (prefix match)
    results = scalar_query('title like "商品标题_1%"')
    print(f"标题以'商品标题_1'开头的商品数量: {len(results)}")

    # 3. Array contains any of several elements
    results = scalar_query('array_contains_any(tags, ["热门", "推荐"])')
    print(f"热门或推荐商品数量: {len(results)}")

    # 4. Combined condition.
    # `and` binds tighter than `or`, so without the outer parentheses the
    # `is_active == True` test would apply only to the clothing clause and
    # inactive electronics would still match. The two category clauses are
    # grouped explicitly so the active flag filters both.
    complex_expr = (
        '((category == "电子产品" and price > 200.0) or '
        '(category == "服装" and rating >= 4)) and '
        'is_active == True'
    )
    results = scalar_query(complex_expr)
    print(f"复杂条件查询结果数量: {len(results)}")


def vector_search_with_scalar_filter(query_vector, filter_expr, top_k=10):
    """Vector search on ``embedding`` restricted by a scalar filter.

    Args:
        query_vector: The query vector; it is wrapped in a one-element list.
        filter_expr: Scalar filter expression applied to the search.
        top_k: Number of hits to return.

    Returns:
        Whatever ``collection.search`` returns for the single query.
    """
    return collection.search(
        data=[query_vector],
        anns_field="embedding",
        # L2 distance, probing 10 cluster centres per search.
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=top_k,
        expr=filter_expr,
        output_fields=["id", "title", "category", "price", "rating"],
    )

9. 组合查询(标量+向量)

def vector_search_with_scalar():
    """Run a filtered vector search with a random query and print the top 5 hits.

    The filter keeps electronics priced under 500 with a rating of at least 3.
    """
    query_vector = [random.random() for _ in range(128)]
    filter_expr = 'category == "电子产品" and price < 500.0 and rating >= 3'

    filtered_results = vector_search_with_scalar_filter(query_vector, filter_expr, top_k=5)

    print("\n=== 带标量过滤的向量搜索结果 ===")
    for rank, hit in enumerate(filtered_results[0], start=1):
        print(f"排名 {rank}: ID={hit.id}, 标题={hit.entity.get('title')}, "
              f"价格={hit.entity.get('price')}, 评分={hit.entity.get('rating')}")

10. 数据更新

def update_record(record_id, update_fields):
    """Update scalar and/or vector fields of one record.

    The current row is read in full, the values in ``update_fields`` are
    overlaid on it, and the merged row is written back with a single
    ``upsert``. The earlier delete-then-insert sequence could lose the
    record if the insert failed after the delete had gone through.

    Args:
        record_id: Primary-key value of the record to update.
        update_fields: Mapping of field name to new value. ``id`` and
            ``created_time`` are always taken from the stored row.

    Returns:
        True if the record was found and rewritten; False if no record
        with ``record_id`` exists (a message is printed in that case).
    """
    # 1. Read the full current row so untouched fields can be preserved.
    expr = f"id == {record_id}"
    original_data = collection.query(
        expr=expr,
        output_fields=["id", "embedding", "keyword_embedding", "title", "content",
                       "category", "subcategory", "price", "rating", "is_active",
                       "created_time", "tags"]
    )

    if not original_data:
        print(f"记录 {record_id} 不存在")
        return False

    original = original_data[0]

    # 2. Build the replacement row in column layout (one single-element list
    #    per field, in schema order), keeping every field that is not updated.
    new_data = [
        [original['id']],
        [update_fields.get('embedding', original['embedding'])],
        [update_fields.get('keyword_embedding', original.get('keyword_embedding', [0] * 64))],
        [update_fields.get('title', original['title'])],
        [update_fields.get('content', original['content'])],
        [update_fields.get('category', original['category'])],
        [update_fields.get('subcategory', original['subcategory'])],
        [update_fields.get('price', original['price'])],
        [update_fields.get('rating', original['rating'])],
        [update_fields.get('is_active', original['is_active'])],
        [original['created_time']],  # creation time is never changed
        [update_fields.get('tags', original.get('tags', []))]
    ]

    # 3. Replace the stored row in one call instead of delete + insert.
    collection.upsert(new_data)

    return True


# Update example (scalar and vector fields)
def update():
    """Update record 10 and print its state before and after.

    The record is queried first; if it does not exist the function prints
    a message and returns instead of failing on an empty result. After a
    successful update the function waits two seconds and queries again.
    """
    shown_fields = ["id", "title", "price", "rating", "tags"]

    # Show the record as it is before the update.
    updated_data_pre = collection.query(
        expr="id == 10",
        output_fields=shown_fields
    )
    if not updated_data_pre:
        # Nothing to update; indexing [0] here would raise IndexError.
        print("记录 10 不存在")
        return
    print("\n更新前的数据:", updated_data_pre[0])

    # Fields to change.
    update_fields = {
        'title': '更新后的商品标题003',
        'price': 199.99,
        'rating': 4,
        'tags': ['热门', '新品', '促销', '限时']
    }
    success = update_record(record_id=10, update_fields=update_fields)
    if success:
        # Wait before reading back — presumably to let the write become
        # visible to queries; TODO confirm against the consistency level in use.
        sleep(2)
        print("\n记录更新成功")
        # Verify the update by reading the record back.
        updated_data = collection.query(
            expr="id == 10",
            output_fields=shown_fields
        )
        if updated_data:
            print("\n更新后的数据:", updated_data[0])
        else:
            print("\n更新后的记录暂未查询到")

11. 数据删除


def delete_by_conditions(expr):
    """Delete every record matching the scalar expression ``expr``.

    Returns:
        The result object returned by ``collection.delete``.
    """
    return collection.delete(expr=expr)


# Delete examples
def delete():
    """Run several example deletions and print how many records each removed.

    The count is read from ``delete_count`` on the returned mutation
    result. ``primary_keys`` is not a reliable count for deletes issued by
    filter expression, because the keys of the removed rows are not
    necessarily echoed back to the client.
    """
    print("\n=== 删除操作示例 ===")

    # 1. Delete a single record by primary key.
    delete_result = delete_by_conditions("id == 50")
    print(f"\n删除ID为50的记录数: {delete_result.delete_count}")

    # 2. Delete records matching a combined condition.
    delete_result = delete_by_conditions('category == "食品" and price < 50.0')
    print(f"\n删除价格低于50的食品记录数: {delete_result.delete_count}")

    # 3. Delete records in several categories.
    delete_result = delete_by_conditions('category in ["图书", "家居"]')
    print(f"\n删除图书和家居记录数: {delete_result.delete_count}")

    # 4. Delete records created more than 30 days ago.
    old_time = int(datetime.now().timestamp()) - 30 * 24 * 3600
    delete_result = delete_by_conditions(f'created_time < {old_time}')
    print(f"\n删除30天前的记录数: {delete_result.delete_count}")

12. 统计与聚合

def get_collection_stats():
    """Print collection statistics: total entities, per-category and per-rating counts.

    The per-group counts are obtained with ``count(*)`` so the server does
    the counting, instead of transferring every matching ``id`` to the
    client and measuring the length of the result list.
    """

    def count_matching(expr):
        # A count(*) query returns a single row of the form {"count(*)": N}.
        rows = collection.query(expr=expr, output_fields=["count(*)"])
        return rows[0]["count(*)"]

    print("=== 集合统计信息 ===")
    print(f"总实体数: {collection.num_entities}")

    # Number of products per category.
    categories = ["电子产品", "服装", "食品", "图书", "家居"]
    for category in categories:
        count = count_matching(f'category == "{category}"')
        print(f"{category}分类商品数: {count}")

    # Rating distribution (1 to 5 stars).
    for rating in range(1, 6):
        count = count_matching(f'rating == {rating}')
        print(f"{rating}星评分商品数: {count}")

13. main方法

if __name__ == '__main__':
    # Script entry point. Every step below except the last is commented out;
    # uncomment the ones you want to run.
    # Create the collection
    # create_collection()
    # Create the indexes
    # create_index(collection)
    # Insert data
    # insert_collection(collection)
    # Single-vector search
    # single_vector_search()
    # Hybrid (multi-vector) search
    # muli_vector_hybrid_search()
    # Scalar queries
    # simple_scalar_query()
    # Complex scalar queries
    # complex_scalar_query()
    # Combined query (scalar filter + vector search)
    # vector_search_with_scalar()
    # Update data
    # update()
    # Delete
    # delete()
    # Statistics and aggregation
    # get_collection_stats()
    # Release resources
    # NOTE(review): cleanup() is not defined in the visible part of this file — confirm it exists before enabling.
    # cleanup()
    # Performance tuning and best practices
    # NOTE(review): performance_tips() is not defined in the visible part of this file;
    # if it is missing, this call raises NameError — confirm.
    performance_tips()


网站公告

今日签到

点亮在社区的每一天
去签到