当前位置: 首页 > news >正文

milvus实战-基于Ollama+bge-large-zh搭建嵌入模型,fastAPI提供http服务将PDF文件写入milvus向量库

0 环境准备

  • ollama已部署嵌入模型quentinz/bge-large-zh-v1.5:latest
  • 已安装miniconda环境
  • 具备科学上网条件(docker安装milvus时需要)

1 milvus安装

1.1 启动Docker Desktop

windows环境下,docker拉取镜像需要先启动Docker  Desktop。否则拉取镜像时会报错,无法拉取镜像。

1.2 下载milvus的docker-compose.yml

在powershell输入以下命令

Invoke-WebRequest https://github.com/milvus-io/milvus/releases/download/v2.4.15/milvus-standalone-docker-compose.yml -OutFile docker-compose.yml

1.3 启动milvus

docker compose up -d

2 开发环境准备

2.1 创建python环境

        通过conda命令创建python环境,保持买个项目的python环境独立,防止项目之间包冲突,方便管理项目依赖。

conda create -n LangchainDemo python=3.10

2.2 pycharm创建项目

  1. 解释器类型:选择自定义环境
  2. 环境:选择现有
  3. 类型:选择conda
  4. 环境:选择上一步创建的环境

2.3 激活python环境

 conda activate LangchainDemo

2.4 安装项目依赖包

        安装项目必要的依赖,包含fastapi、milvus、pdfplumber、ollama等。pdfpy解析可能存在乱码,选用pdfplumber效果更佳。

pip install fastapi uvicorn pymilvus python-multipart pdfplumber ollama

3 程序实现

3.1 导入依赖包

import os
import uuid
import asyncio
import pdfplumber
from fastapi import FastAPI, UploadFile, File
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from pymilvus.orm import utility
from tenacity import retry, stop_after_attempt
import ollama

3.2 定义FastAPI

app = FastAPI()

3.3 定义文本切割逻辑

        使用pdfplumber打开pdf文件,按自然段落分割文本,设置默认500字符一个分块,且有100个字符重叠。


def extract_text_with_pdfnumber(pdf_path):
    """使用pdfplumber提取PDF文本(保留段落结构)[2]()[5]()"""
    with pdfplumber.open(pdf_path) as pdf:
        text = []
        for page in pdf.pages:
            # 按自然段落分割文本
            paragraphs = page.extract_text()
            text.append(paragraphs.replace('\n', ''))
        return '\n\n'.join(text)

def chunk_with_overlap(text, chunk_size=500, overlap=100):
    """带重叠的分块策略[1]()"""
    chunks = []
    words = text.split()
    start_idx = 0
    while start_idx < len(words):
        end_idx = start_idx + chunk_size
        chunk = ' '.join(words[start_idx:end_idx])
        chunks.append(chunk)
        start_idx = end_idx - overlap  #  设置重叠部分

        # 处理末尾不足的情况
        if end_idx < len(words):
            break

    return chunks

3.4 构建嵌入模型

        连接ollama部署的嵌入模型,bge-large-zh对中文字符处理较好。设置调用嵌入模型失败时可重试3次。


@retry(stop=stop_after_attempt(3))
async def generate_embeddings(text):
    """使用Ollama生成文本嵌入"""
    loop = asyncio.get_event_loop()

    return await loop.run_in_executor(None,
                                      lambda: ollama.Client(host='http://localhost:11434').embeddings(
                                          model="quentinz/bge-large-zh-v1.5:latest", prompt=text)['embedding']
           

3.5 连接milvus

connections.connect("default", host="localhost", port="19530")
collection_name = "pdf_documents"

3.6 构建milvus collection

        定义pdf文本存储的collection的schema,对应数据库的表和字段。

if not utility.has_collection(collection_name):
    # 创建集合
    fields = [
        FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=64),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=20000),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
        FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256)
    ]
    schema = CollectionSchema(fields=fields, description="pdf_documents")
    collection = Collection(name=collection_name, schema=schema)

    # 创建索引
    index_params = {
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": 128}
    }
    collection.create_index("vector", index_params)
else:
    collection = Collection(collection_name)

注意:bge-large-zh只能支持到1024维度,500字符对应content的大于需要20000的长度。如果前面修改了嵌入模型或者分块大小,此处也需要调整。

3.7 定义http上传pdf文件处理流程

@app.post("/upload_pdf")
async def upload_pdf(file: UploadFile = File(...),
                     chunk_size=500,
                     overlap=100):
    print(f"开始上传文件《{file.filename}》")
    """上传PDF文件"""
    try:
        #  临时保存文件
        temp_path = f"temp_{uuid.uuid4()}.pdf"
        with open(temp_path, "wb") as f:
            # 流式写入文件
            while chunk := await file.read(1024):
                f.write(chunk)

        # 解析PDF
        text = extract_text_with_pdfnumber(temp_path)
        os.remove(temp_path)

        # 分块处理
        chunks = chunk_with_overlap(text, chunk_size, overlap)

        # 批量生成嵌入
        embeddings = []
        for chunk in chunks:
            embeddings.append(await generate_embeddings(chunk))

        # 构建插入数据
        entities = [
            {
                "id": str(uuid.uuid4()),
                "content": chunk,
                "vector": emb,
                "source": file.filename
            } for chunk, emb in zip(chunks, embeddings)
        ]

        batch_size = 100
        for i in range(0, len(entities), batch_size):
            insert_result = collection.insert(entities[i:i+batch_size])

        collection.flush()
        return {"status": "success", "chunks_processed": len(chunks)}

    except Exception as e:
        return {"error": str(e)}, 500

3.8 实现查询milvus逻辑

@app.get("/search")
async def semantic_search(query: str, top_k=5):
    query_embedding = await generate_embeddings(query)
    search_params = {"metric_type": "L2", "params": {"nprobe": 10}}

    # 加载集合到内存中
    collection.load()

    results = collection.search(
        data=[query_embedding],
        anns_field="vector",
        param=search_params,
        limit=top_k,
        output_fields=["content", "source"]
    )
    return [{"score": hit.score, "metadata": hit.entity.to_dict()} for hit in results[0]]

3.9 启动http服务

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8321)

4 测试

        建议使用apifox请求http接口。

4.1 测试上传PDF文件解析入库

        用post请求,在body标签页选择form-data填写file参数,参数类型选择file,然后上传文件。上传成功后返回success。

4.2 查询milvus测试

        用get请求,在param中填写query字段,并填写需要查询的内容,如下图:

附录一:完整代码示例

import os
import uuid
import asyncio
import pdfplumber
from fastapi import FastAPI, UploadFile, File
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from pymilvus.orm import utility
from tenacity import retry, stop_after_attempt
import ollama

app = FastAPI()

def extract_text_with_pdfnumber(pdf_path):
    """使用pdfplumber提取PDF文本(保留段落结构)[2]()[5]()"""
    with pdfplumber.open(pdf_path) as pdf:
        text = []
        for page in pdf.pages:
            # 按自然段落分割文本
            paragraphs = page.extract_text()
            text.append(paragraphs.replace('\n', ''))
        return '\n\n'.join(text)

def chunk_with_overlap(text, chunk_size=500, overlap=100):
    """带重叠的分块策略[1]()"""
    chunks = []
    words = text.split()
    start_idx = 0
    while start_idx < len(words):
        end_idx = start_idx + chunk_size
        chunk = ' '.join(words[start_idx:end_idx])
        chunks.append(chunk)
        start_idx = end_idx - overlap  #  设置重叠部分

        # 处理末尾不足的情况
        if end_idx < len(words):
            break

    return chunks

@retry(stop=stop_after_attempt(3))
async def generate_embeddings(text):
    """使用Ollama生成文本嵌入"""
    loop = asyncio.get_event_loop()

    return await loop.run_in_executor(None,
                                      lambda: ollama.Client(host='http://localhost:11434').embeddings(
                                          model="quentinz/bge-large-zh-v1.5:latest", prompt=text)['embedding']
                                      )

connections.connect("default", host="localhost", port="19530")
collection_name = "pdf_documents"

# 检查集合是否存在,如果存在则删除
# if utility.has_collection(collection_name):
#     collection = Collection(collection_name)
#     collection.drop()

if not utility.has_collection(collection_name):
    # 创建集合
    fields = [
        FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=64),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=20000),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
        FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256)
    ]
    schema = CollectionSchema(fields=fields, description="pdf_documents")
    collection = Collection(name=collection_name, schema=schema)

    # 创建索引
    index_params = {
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": 128}
    }
    collection.create_index("vector", index_params)
else:
    collection = Collection(collection_name)

@app.post("/upload_pdf")
async def upload_pdf(file: UploadFile = File(...),
                     chunk_size=500,
                     overlap=100):
    print(f"开始上传文件《{file.filename}》")
    """上传PDF文件"""
    try:
        #  临时保存文件
        temp_path = f"temp_{uuid.uuid4()}.pdf"
        with open(temp_path, "wb") as f:
            # 流式写入文件
            while chunk := await file.read(1024):
                f.write(chunk)

        # 解析PDF
        text = extract_text_with_pdfnumber(temp_path)
        os.remove(temp_path)

        # 分块处理
        chunks = chunk_with_overlap(text, chunk_size, overlap)

        # 批量生成嵌入
        embeddings = []
        for chunk in chunks:
            embeddings.append(await generate_embeddings(chunk))

        # 构建插入数据
        entities = [
            {
                "id": str(uuid.uuid4()),
                "content": chunk,
                "vector": emb,
                "source": file.filename
            } for chunk, emb in zip(chunks, embeddings)
        ]

        batch_size = 100
        for i in range(0, len(entities), batch_size):
            insert_result = collection.insert(entities[i:i+batch_size])

        collection.flush()
        return {"status": "success", "chunks_processed": len(chunks)}

    except Exception as e:
        return {"error": str(e)}, 500


@app.get("/search")
async def semantic_search(query: str, top_k=5):
    query_embedding = await generate_embeddings(query)
    search_params = {"metric_type": "L2", "params": {"nprobe": 10}}

    # 加载集合到内存中
    collection.load()

    results = collection.search(
        data=[query_embedding],
        anns_field="vector",
        param=search_params,
        limit=top_k,
        output_fields=["content", "source"]
    )
    return [{"score": hit.score, "metadata": hit.entity.to_dict()} for hit in results[0]]


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8321)

附录二:错误处理

        apifox请求http接口填写接口地址时,如果代码路径search或者upload_pdf后面有"/"斜杠则在apifox请求时也要加"/"斜杠,否则会报错307 Temporary Redirect,Expected boundary character 45, got 8 at index 2,如下:

此时错误的请求,url最后没有斜杠"/"

此时正确的请求url应该最后有斜杠"/",如下:

相关文章:

  • 算法·动态规划·入门
  • Parsing error: Unexpected token, expected “,“
  • 矩阵可相似对角化
  • 深入分析和讲解虚拟化技术原理
  • 洛谷 [语言月赛 202503] 题解(C++)
  • vlan路由间配置
  • 飞牛-NAS风扇速度设置
  • 1、双指针法
  • 自由学习记录(46)
  • UE4学习笔记 FPS游戏制作11 把枪提出为对象
  • 2025.3.23机器学习笔记:文献阅读
  • soft回归用内置函数
  • 软考-高项,知识点一览八 整合管理
  • CUDA Lazy Loading:优化GPU程序初始化与内存使用的利器
  • 【蓝桥杯】12111暖气冰场(多源BFS 或者 二分)
  • ‘闭包‘, ‘装饰器‘及其应用场景
  • 西门子200smart之modbus_TCP(做从站与第三方设备)通讯
  • 从头开始学C语言第二十九天——指针数组
  • JavaScript-日期对象与节点操作详解
  • Apache Flink技术原理深入解析:任务执行流程全景图
  • 最高法、证监会:常态化开展证券纠纷代表人诉讼,降低投资者维权成本
  • 苏轼“胡为适南海”?
  • 蚊媒传染病、手足口病……上海疾控发布近期防病提示
  • 科技部等七部门:优先支持取得关键核心技术突破的科技型企业上市融资
  • 阿尔巴尼亚执政党连续第四次赢得议会选举,反对党此前雇用特朗普竞选经理
  • 云南大理铁路枢纽工程建设取得两大进展,预计明年建成