当前位置: 首页 > news >正文

华为云Flexus+DeepSeek征文 | 基于DeepSeek-V3构建企业知识库问答机器人实战

作者简介

我是摘星,一名专注于云计算和AI技术的开发者。本次通过华为云MaaS平台体验DeepSeek系列模型,将实际使用经验分享给大家,希望能帮助开发者快速掌握华为云AI服务的核心能力。

目录

作者简介

1. 引言

2. 技术选型与架构设计

2.1 技术栈选择

2.2 系统架构设计

2.3 核心工作流程

3. 环境准备与部署

3.1 华为云Flexus环境配置

3.2 基础环境安装

3.3 Python依赖安装

4. 核心功能实现

4.1 文档预处理模块

4.2 向量数据库模块

4.3 DeepSeek-V3集成模块

4.4 问答引擎核心模块

5. Web服务接口实现

5.1 FastAPI后端服务

5.2 Streamlit前端界面

6. 容器化部署

6.1 Docker配置

6.2 Docker Compose配置

7. 系统优化与监控

7.1 性能优化策略

7.2 监控与日志系统

8. 测试与验证

8.1 单元测试

8.2 性能测试

9. 部署指南

9.1 生产环境部署

9.2 Nginx配置

10. 总结与展望

10.1 项目总结

10.2 使用效果分析

10.3 未来改进方向

10.4 技术要点回顾

参考文献


 

1. 引言

随着人工智能技术的快速发展,企业对于智能化知识管理和问答系统的需求日益增长。传统的知识管理方式往往存在信息检索效率低、知识分散、更新不及时等问题。而基于大语言模型的知识库问答机器人能够有效解决这些痛点,为企业提供高效、准确的知识服务。

本文将详细介绍如何利用华为云Flexus云服务器和DeepSeek-V3大语言模型,构建一个功能完善的企业知识库问答机器人。通过本实战项目,您将学会:

  • 如何设计企业级知识库问答系统架构
  • 如何部署和配置DeepSeek-V3模型
  • 如何实现知识库的向量化存储和检索
  • 如何构建用户友好的问答界面
  • 如何优化系统性能和准确性

图1: 企业知识库问答机器人系统架构图

2. 技术选型与架构设计

2.1 技术栈选择

在构建企业知识库问答机器人时,我们选择以下技术栈:

  • 云平台: 华为云Flexus云服务器
  • 大语言模型: DeepSeek-V3
  • 向量数据库: ChromaDB
  • 文本嵌入: Sentence-Transformers
  • Web框架: FastAPI + Streamlit
  • 数据处理: Python + pandas
  • 容器化: Docker

2.2 系统架构设计

整个系统采用模块化设计,主要包含以下几个核心组件:

图1: 企业知识库问答机器人系统架构图

2.3 核心工作流程

系统的核心工作流程如下:

图2: 系统工作流程图

3. 环境准备与部署

3.1 华为云Flexus环境配置

首先,我们需要在华为云上创建Flexus云服务器实例:

# 推荐配置

# CPU: 8核

# 内存: 32GB

# 存储: 500GB SSD

# 操作系统: Ubuntu 22.04 LTS

3.2 基础环境安装

在服务器上安装必要的软件环境:

#!/bin/bash
# 更新系统包
sudo apt update && sudo apt upgrade -y# 安装Python 3.11
sudo apt install python3.11 python3.11-pip python3.11-venv -y# 安装Docker
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh# 安装Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/v2.20.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose# 创建项目目录
mkdir -p /opt/enterprise-qa-bot
cd /opt/enterprise-qa-bot

3.3 Python依赖安装

创建虚拟环境并安装依赖:

# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
streamlit==1.28.1
chromadb==0.4.15
sentence-transformers==2.2.2
openai==1.3.6
pandas==2.1.3
numpy==1.24.3
pydantic==2.5.0
python-multipart==0.0.6
aiofiles==23.2.1
langchain==0.0.340
langchain-community==0.0.1
python-docx==0.8.11
PyPDF2==3.0.1
tiktoken==0.5.1
# 创建虚拟环境
python3.11 -m venv venv
source venv/bin/activate# 安装依赖
pip install -r requirements.txt

4. 核心功能实现

4.1 文档预处理模块

首先实现文档预处理模块,用于处理各种格式的企业文档:

# document_processor.py
import os
import pandas as pd
from typing import List, Dict
from pathlib import Path
import PyPDF2
from docx import Document
import tiktokenclass DocumentProcessor:"""文档预处理器类支持处理PDF、Word、TXT等格式的文档"""def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):"""初始化文档处理器Args:chunk_size: 文本块大小chunk_overlap: 文本块重叠大小"""self.chunk_size = chunk_sizeself.chunk_overlap = chunk_overlapself.encoding = tiktoken.get_encoding("cl100k_base")def extract_text_from_pdf(self, file_path: str) -> str:"""从PDF文件中提取文本Args:file_path: PDF文件路径Returns:提取的文本内容"""text = ""try:with open(file_path, 'rb') as file:pdf_reader = PyPDF2.PdfReader(file)for page in pdf_reader.pages:text += page.extract_text() + "\n"except Exception as e:print(f"处理PDF文件 {file_path} 时出错: {str(e)}")return textdef extract_text_from_docx(self, file_path: str) -> str:"""从Word文档中提取文本Args:file_path: Word文档路径Returns:提取的文本内容"""text = ""try:doc = Document(file_path)for paragraph in doc.paragraphs:text += paragraph.text + "\n"except Exception as e:print(f"处理Word文档 {file_path} 时出错: {str(e)}")return textdef extract_text_from_txt(self, file_path: str, encoding: str = 'utf-8') -> str:"""从文本文件中提取内容Args:file_path: 文本文件路径encoding: 文件编码Returns:文件内容"""try:with open(file_path, 'r', encoding=encoding) as file:return file.read()except UnicodeDecodeError:# 尝试其他编码for enc in ['gbk', 'gb2312', 'latin-1']:try:with open(file_path, 'r', encoding=enc) as file:return file.read()except UnicodeDecodeError:continueexcept Exception as e:print(f"处理文本文件 {file_path} 时出错: {str(e)}")return ""def split_text_into_chunks(self, text: str, metadata: Dict = None) -> List[Dict]:"""将文本分割成块Args:text: 输入文本metadata: 文档元数据Returns:文本块列表"""if not text.strip():return []# 使用tiktoken计算token数量tokens = self.encoding.encode(text)chunks = []start = 0chunk_id = 0while start < len(tokens):# 确定当前块的结束位置end = min(start + self.chunk_size, len(tokens))# 解码当前块chunk_tokens = tokens[start:end]chunk_text = self.encoding.decode(chunk_tokens)# 创建块数据chunk_data = {'id': f"{metadata.get('filename', 'unknown')}_{chunk_id}",'text': chunk_text.strip(),'metadata': {**(metadata or {}),'chunk_id': chunk_id,'start_token': start,'end_token': end}}chunks.append(chunk_data)# 移动到下一个块的开始位置(考虑重叠)start = end - self.chunk_overlapchunk_id += 1return chunksdef process_documents(self, document_dir: str) -> List[Dict]:"""批量处理文档目录Args:document_dir: 文档目录路径Returns:处理后的文档块列表"""all_chunks = []# 支持的文件格式supported_extensions = {'.pdf', '.docx', '.txt', '.md'}# 遍历文档目录for file_path in Path(document_dir).rglob('*'):if file_path.suffix.lower() in supported_extensions:print(f"正在处理文件: {file_path}")# 根据文件格式提取文本if file_path.suffix.lower() == '.pdf':text = self.extract_text_from_pdf(str(file_path))elif file_path.suffix.lower() == '.docx':text = self.extract_text_from_docx(str(file_path))else:  # .txt, .mdtext = self.extract_text_from_txt(str(file_path))# 准备元数据metadata = {'filename': file_path.name,'filepath': str(file_path),'file_type': file_path.suffix.lower(),'file_size': file_path.stat().st_size}# 分割文本并添加到结果中chunks = self.split_text_into_chunks(text, metadata)all_chunks.extend(chunks)print(f"文件 {file_path.name} 处理完成,生成 {len(chunks)} 个文本块")print(f"总共处理了 {len(all_chunks)} 个文本块")return all_chunks

4.2 向量数据库模块

实现向量数据库管理模块,用于存储和检索文档向量:

# vector_store.py
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Optional
import numpy as npclass VectorStore:"""向量数据库管理类基于ChromaDB实现文档向量存储和检索"""def __init__(self, persist_directory: str = "./chroma_db",collection_name: str = "enterprise_knowledge",embedding_model: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):"""初始化向量数据库Args:persist_directory: 数据库持久化目录collection_name: 集合名称embedding_model: 嵌入模型名称"""self.persist_directory = persist_directoryself.collection_name = collection_name# 初始化ChromaDB客户端self.client = chromadb.PersistentClient(path=persist_directory,settings=Settings(allow_reset=True))# 初始化嵌入模型print(f"正在加载嵌入模型: {embedding_model}")self.embedding_model = SentenceTransformer(embedding_model)print("嵌入模型加载完成")# 获取或创建集合self.collection = self.client.get_or_create_collection(name=collection_name,metadata={"description": "企业知识库向量存储"})def add_documents(self, documents: List[Dict]) -> None:"""添加文档到向量数据库Args:documents: 文档列表,每个文档包含id、text和metadata"""if not documents:print("没有文档需要添加")return# 提取文本内容texts = [doc['text'] for doc in documents]ids = [doc['id'] for doc in documents]metadatas = [doc.get('metadata', {}) for doc in documents]print(f"正在生成 {len(texts)} 个文档的向量表示...")# 生成嵌入向量embeddings = self.embedding_model.encode(texts, show_progress_bar=True,normalize_embeddings=True).tolist()print("向量生成完成,正在存储到数据库...")# 批量添加到ChromaDBbatch_size = 100for i in range(0, len(documents), batch_size):batch_ids = ids[i:i+batch_size]batch_embeddings = embeddings[i:i+batch_size]batch_texts = texts[i:i+batch_size]batch_metadatas = metadatas[i:i+batch_size]self.collection.add(ids=batch_ids,embeddings=batch_embeddings,documents=batch_texts,metadatas=batch_metadatas)print(f"已处理 {min(i+batch_size, len(documents))}/{len(documents)} 个文档")print("所有文档已成功添加到向量数据库")def search_similar_documents(self, query: str, top_k: int = 5,similarity_threshold: float = 0.7) -> List[Dict]:"""检索相似文档Args:query: 查询文本top_k: 返回的文档数量similarity_threshold: 相似度阈值Returns:相似文档列表"""# 生成查询向量query_embedding = self.embedding_model.encode([query], normalize_embeddings=True)[0].tolist()# 在ChromaDB中搜索results = self.collection.query(query_embeddings=[query_embedding],n_results=top_k,include=['documents', 'metadatas', 'distances'])# 格式化结果similar_docs = []for i in range(len(results['ids'][0])):distance = results['distances'][0][i]similarity = 1 - distance  # ChromaDB使用欧几里得距离# 过滤低相似度的文档if similarity >= similarity_threshold:doc = {'id': results['ids'][0][i],'text': results['documents'][0][i],'metadata': results['metadatas'][0][i],'similarity': similarity}similar_docs.append(doc)return similar_docsdef get_collection_stats(self) -> Dict:"""获取集合统计信息Returns:统计信息字典"""count = self.collection.count()return {'total_documents': count,'collection_name': self.collection_name,'persist_directory': self.persist_directory}def clear_collection(self) -> None:"""清空集合"""self.client.delete_collection(self.collection_name)self.collection = self.client.create_collection(name=self.collection_name,metadata={"description": "企业知识库向量存储"})print("集合已清空")

4.3 DeepSeek-V3集成模块

实现与DeepSeek-V3模型的集成:

# deepseek_client.py
import openai
from typing import List, Dict, Optional
import json
import timeclass DeepSeekClient:"""DeepSeek-V3模型客户端提供与DeepSeek模型的交互接口"""def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):"""初始化DeepSeek客户端Args:api_key: DeepSeek API密钥base_url: API基础URL"""self.client = openai.OpenAI(api_key=api_key,base_url=base_url)# 模型配置self.model_name = "deepseek-chat"self.max_tokens = 2000self.temperature = 0.1def generate_answer(self, question: str, context_documents: List[Dict],system_prompt: Optional[str] = None) -> Dict:"""基于上下文文档生成答案Args:question: 用户问题context_documents: 相关上下文文档system_prompt: 系统提示词Returns:生成的答案和元信息"""# 构建上下文context = self._build_context(context_documents)# 默认系统提示词if system_prompt is None:system_prompt = """你是一个专业的企业知识库问答助手。请基于提供的上下文信息回答用户的问题。回答要求:
1. 仅基于提供的上下文信息回答,不要添加无关信息
2. 如果上下文中没有相关信息,请明确说明无法找到相关信息
3. 回答要准确、简洁、有条理
4. 如果可能,请引用具体的文档来源
5. 使用中文回答上下文信息:
{context}
"""# 构建消息messages = [{"role": "system","content": system_prompt.format(context=context)},{"role": "user","content": f"问题:{question}"}]try:# 调用DeepSeek APIstart_time = time.time()response = self.client.chat.completions.create(model=self.model_name,messages=messages,max_tokens=self.max_tokens,temperature=self.temperature,stream=False)response_time = time.time() - start_time# 提取回答answer = response.choices[0].message.content# 构建返回结果result = {'answer': answer,'question': question,'context_documents': context_documents,'response_time': response_time,'model_info': {'model': self.model_name,'temperature': self.temperature,'max_tokens': self.max_tokens},'usage': {'prompt_tokens': response.usage.prompt_tokens,'completion_tokens': response.usage.completion_tokens,'total_tokens': response.usage.total_tokens}}return resultexcept Exception as e:print(f"调用DeepSeek API时出错: {str(e)}")return {'answer': "抱歉,生成答案时出现错误,请稍后再试。",'error': str(e),'question': question}def _build_context(self, documents: List[Dict]) -> str:"""构建上下文字符串Args:documents: 文档列表Returns:格式化的上下文字符串"""if not documents:return "暂无相关上下文信息。"context_parts = []for i, doc in enumerate(documents, 1):# 获取文档信息text = doc.get('text', '')metadata = doc.get('metadata', {})similarity = doc.get('similarity', 0)# 格式化文档doc_context = f"""
文档{i}(相似度: {similarity:.3f}):
来源:{metadata.get('filename', '未知')}
内容:{text}
---
"""context_parts.append(doc_context)return "\n".join(context_parts)def stream_generate_answer(self, question: str, context_documents: List[Dict],system_prompt: Optional[str] = None):"""流式生成答案(用于实时显示)Args:question: 用户问题context_documents: 相关上下文文档system_prompt: 系统提示词Yields:生成的文本片段"""# 构建上下文和消息(与generate_answer相同)context = self._build_context(context_documents)if system_prompt is None:system_prompt = """你是一个专业的企业知识库问答助手。请基于提供的上下文信息回答用户的问题。回答要求:
1. 仅基于提供的上下文信息回答,不要添加无关信息
2. 如果上下文中没有相关信息,请明确说明无法找到相关信息
3. 回答要准确、简洁、有条理
4. 如果可能,请引用具体的文档来源
5. 使用中文回答上下文信息:
{context}
"""messages = [{"role": "system","content": system_prompt.format(context=context)},{"role": "user","content": f"问题:{question}"}]try:# 流式调用APIresponse = self.client.chat.completions.create(model=self.model_name,messages=messages,max_tokens=self.max_tokens,temperature=self.temperature,stream=True)for chunk in response:if chunk.choices[0].delta.content is not None:yield chunk.choices[0].delta.contentexcept Exception as e:yield f"错误:{str(e)}"

4.4 问答引擎核心模块

实现整合各个组件的问答引擎:

# qa_engine.py
from typing import List, Dict, Optional
from vector_store import VectorStore
from deepseek_client import DeepSeekClient
from document_processor import DocumentProcessor
import timeclass QAEngine:"""问答引擎核心类整合文档处理、向量检索和答案生成功能"""def __init__(self, deepseek_api_key: str,vector_store_config: Dict = None,retrieval_config: Dict = None):"""初始化问答引擎Args:deepseek_api_key: DeepSeek API密钥vector_store_config: 向量数据库配置retrieval_config: 检索配置"""# 初始化向量数据库vs_config = vector_store_config or {}self.vector_store = VectorStore(**vs_config)# 初始化DeepSeek客户端self.deepseek_client = DeepSeekClient(api_key=deepseek_api_key)# 初始化文档处理器self.doc_processor = DocumentProcessor()# 检索配置self.retrieval_config = retrieval_config or {'top_k': 5,'similarity_threshold': 0.7}# 记录问答历史self.qa_history = []def add_knowledge_base(self, documents_dir: str) -> Dict:"""添加知识库文档Args:documents_dir: 文档目录路径Returns:处理结果统计"""print(f"开始处理知识库文档目录: {documents_dir}")start_time = time.time()# 处理文档document_chunks = self.doc_processor.process_documents(documents_dir)if not document_chunks:return {'success': False,'message': '没有找到可处理的文档','processed_docs': 0,'processing_time': 0}# 添加到向量数据库self.vector_store.add_documents(document_chunks)processing_time = time.time() - start_timereturn {'success': True,'message': '知识库添加成功','processed_docs': len(document_chunks),'processing_time': processing_time}def ask_question(self, question: str, include_sources: bool = True) -> Dict:"""回答问题Args:question: 用户问题include_sources: 是否包含来源信息Returns:问答结果"""start_time = time.time()# 检索相关文档print(f"检索问题相关文档: {question}")relevant_docs = self.vector_store.search_similar_documents(query=question,**self.retrieval_config)if not relevant_docs:result = {'question': question,'answer': '抱歉,我在知识库中没有找到与您问题相关的信息。请尝试换个问法或确认知识库是否包含相关内容。','sources': [],'retrieval_time': time.time() - start_time,'relevant_docs_count': 0}else:# 生成答案print(f"找到 {len(relevant_docs)} 个相关文档,正在生成答案...")generation_result = self.deepseek_client.generate_answer(question=question,context_documents=relevant_docs)# 准备来源信息sources = []if include_sources:for doc in relevant_docs:source_info = {'filename': doc['metadata'].get('filename', '未知'),'similarity': doc['similarity'],'text_preview': doc['text'][:200] + '...' if len(doc['text']) > 200 else doc['text']}sources.append(source_info)result = {'question': question,'answer': generation_result.get('answer', '生成答案时出现错误'),'sources': sources,'retrieval_time': time.time() - start_time,'response_time': generation_result.get('response_time', 0),'relevant_docs_count': len(relevant_docs),'model_usage': generation_result.get('usage', {}),'error': generation_result.get('error')}# 记录问答历史self.qa_history.append({'timestamp': time.time(),'question': question,'answer': result['answer'],'docs_count': result['relevant_docs_count']})return resultdef stream_ask_question(self, question: str):"""流式问答(用于实时显示)Args:question: 用户问题Yields:答案片段"""# 检索相关文档relevant_docs = self.vector_store.search_similar_documents(query=question,**self.retrieval_config)if not relevant_docs:yield "抱歉,我在知识库中没有找到与您问题相关的信息。"return# 流式生成答案for chunk in self.deepseek_client.stream_generate_answer(question=question,context_documents=relevant_docs):yield chunkdef get_statistics(self) -> Dict:"""获取系统统计信息Returns:统计信息字典"""vector_stats = self.vector_store.get_collection_stats()return {'knowledge_base': vector_stats,'qa_sessions': len(self.qa_history),'recent_questions': [qa['question'] for qa in self.qa_history[-10:]]}def clear_knowledge_base(self) -> None:"""清空知识库"""self.vector_store.clear_collection()print("知识库已清空")def export_qa_history(self, filename: str = None) -> str:"""导出问答历史Args:filename: 导出文件名Returns:导出文件路径"""if filename is None:filename = f"qa_history_{int(time.time())}.json"import jsonwith open(filename, 'w', encoding='utf-8') as f:json.dump(self.qa_history, f, ensure_ascii=False, indent=2)return filename

5. Web服务接口实现

5.1 FastAPI后端服务

# main.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import os
import tempfile
import shutil
from qa_engine import QAEngineapp = FastAPI(title="企业知识库问答API", version="1.0.0")# 配置CORS
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 初始化问答引擎
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
if not DEEPSEEK_API_KEY:raise ValueError("请设置DEEPSEEK_API_KEY环境变量")qa_engine = QAEngine(deepseek_api_key=DEEPSEEK_API_KEY)# 请求模型
class QuestionRequest(BaseModel):question: strinclude_sources: bool = Trueclass KnowledgeBaseRequest(BaseModel):documents_dir: str# 响应模型
class QuestionResponse(BaseModel):question: stranswer: strsources: List[dict] = []retrieval_time: floatresponse_time: float = 0relevant_docs_count: interror: Optional[str] = None@app.get("/")
async def root():"""根路径"""return {"message": "企业知识库问答API服务正在运行"}@app.post("/ask", response_model=QuestionResponse)
async def ask_question(request: QuestionRequest):"""问答接口"""try:result = qa_engine.ask_question(question=request.question,include_sources=request.include_sources)return QuestionResponse(**result)except Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.post("/knowledge-base/add")
async def add_knowledge_base(request: KnowledgeBaseRequest):"""添加知识库"""try:if not os.path.exists(request.documents_dir):raise HTTPException(status_code=404, detail="文档目录不存在")result = qa_engine.add_knowledge_base(request.documents_dir)return resultexcept Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.post("/knowledge-base/upload")
async def upload_documents(files: List[UploadFile] = File(...)):"""上传文档到知识库"""try:# 创建临时目录temp_dir = tempfile.mkdtemp()# 保存上传的文件for file in files:file_path = os.path.join(temp_dir, file.filename)with open(file_path, "wb") as buffer:shutil.copyfileobj(file.file, buffer)# 处理文档result = qa_engine.add_knowledge_base(temp_dir)# 清理临时目录shutil.rmtree(temp_dir)return resultexcept Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.get("/statistics")
async def get_statistics():"""获取系统统计信息"""try:return qa_engine.get_statistics()except Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.delete("/knowledge-base")
async def clear_knowledge_base():"""清空知识库"""try:qa_engine.clear_knowledge_base()return {"message": "知识库已清空"}except Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.get("/health")
async def health_check():"""健康检查"""return {"status": "healthy", "service": "enterprise-qa-bot"}if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)

5.2 Streamlit前端界面

# streamlit_app.py
import streamlit as st
import requests
import json
import time
from typing import List, Dict# 配置页面
st.set_page_config(page_title="企业知识库问答机器人",page_icon="🤖",layout="wide",initial_sidebar_state="expanded"
)# API配置
API_BASE_URL = "http://localhost:8000"def init_session_state():"""初始化会话状态"""if 'chat_history' not in st.session_state:st.session_state.chat_history = []if 'knowledge_base_stats' not in st.session_state:st.session_state.knowledge_base_stats = {}def call_api(endpoint: str, method: str = "GET", data: dict = None, files: dict = None):"""调用API"""url = f"{API_BASE_URL}{endpoint}"try:if method == "GET":response = requests.get(url)elif method == "POST":if files:response = requests.post(url, files=files)else:response = requests.post(url, json=data)elif method == "DELETE":response = requests.delete(url)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:st.error(f"API调用失败: {str(e)}")return Nonedef display_chat_message(role: str, content: str, sources: List[Dict] = None):"""显示聊天消息"""with st.chat_message(role):st.write(content)# 显示来源信息if sources and role == "assistant":with st.expander("📚 参考来源", expanded=False):for i, source in enumerate(sources, 1):st.write(f"**来源 {i}**: {source['filename']}")st.write(f"**相似度**: {source['similarity']:.3f}")st.write(f"**内容预览**: {source['text_preview']}")st.write("---")def main():"""主函数"""init_session_state()# 标题st.title("🤖 企业知识库问答机器人")st.caption("基于DeepSeek-V3构建的智能问答系统")# 侧边栏 - 知识库管理with st.sidebar:st.header("📁 知识库管理")# 上传文档st.subheader("上传文档")uploaded_files = st.file_uploader("选择文档文件",type=['pdf', 'docx', 'txt', 'md'],accept_multiple_files=True,help="支持PDF、Word、文本和Markdown格式")if st.button("上传并处理文档", disabled=not uploaded_files):with st.spinner("正在处理文档..."):files = {"files": [("files", file) for file in uploaded_files]}result = call_api("/knowledge-base/upload", "POST", files=files)if result and result.get('success'):st.success(f"成功处理 {result['processed_docs']} 个文档块")st.info(f"处理时间: {result['processing_time']:.2f} 秒")else:st.error("文档处理失败")# 知识库统计st.subheader("知识库统计")if st.button("刷新统计信息"):stats = call_api("/statistics")if stats:st.session_state.knowledge_base_stats = statsif st.session_state.knowledge_base_stats:stats = st.session_state.knowledge_base_statskb_stats = stats.get('knowledge_base', {})st.metric("文档总数", kb_stats.get('total_documents', 0))st.metric("问答会话", stats.get('qa_sessions', 0))# 最近问题recent_questions = stats.get('recent_questions', [])if recent_questions:st.write("**最近问题:**")for q in recent_questions[-5:]:st.write(f"• {q}")# 清空知识库st.subheader("危险操作")if st.button("🗑️ 清空知识库", type="secondary"):if st.checkbox("确认清空知识库"):result = call_api("/knowledge-base", "DELETE")if result:st.success("知识库已清空")st.session_state.chat_history = []# 主界面 - 聊天st.header("💬 智能问答")# 显示聊天历史for message in st.session_state.chat_history:display_chat_message(message["role"], message["content"], message.get("sources"))# 问题输入if question := st.chat_input("请输入您的问题..."):# 显示用户问题display_chat_message("user", question)st.session_state.chat_history.append({"role": "user", "content": question})# 生成回答with st.chat_message("assistant"):with st.spinner("正在思考..."):# 调用问答APIresponse = call_api("/ask", "POST", {"question": question,"include_sources": True})if response:answer = response.get('answer', '生成答案时出现错误')sources = response.get('sources', [])# 显示答案st.write(answer)# 显示性能信息col1, col2, col3 = st.columns(3)with col1:st.metric("检索时间", f"{response.get('retrieval_time', 0):.2f}s")with col2:st.metric("生成时间", f"{response.get('response_time', 0):.2f}s")with col3:st.metric("相关文档", response.get('relevant_docs_count', 0))# 显示来源if sources:with st.expander("📚 参考来源", expanded=False):for i, source in enumerate(sources, 1):st.write(f"**来源 {i}**: {source['filename']}")st.write(f"**相似度**: {source['similarity']:.3f}")st.write(f"**内容预览**: {source['text_preview']}")st.write("---")# 保存到聊天历史st.session_state.chat_history.append({"role": "assistant","content": answer,"sources": sources})else:error_msg = "抱歉,服务暂时不可用,请稍后再试。"st.error(error_msg)st.session_state.chat_history.append({"role": "assistant","content": error_msg})# 底部信息st.markdown("---")st.markdown("💡 **使用提示**: ""1. 先上传相关文档到知识库 ""2. 然后就可以基于文档内容进行问答 ""3. 系统会自动检索最相关的内容来回答问题")if __name__ == "__main__":main()

6. 容器化部署

6.1 Docker配置

创建Dockerfile用于容器化部署:

# Dockerfile
FROM python:3.11-slim# 设置工作目录
WORKDIR /app# 安装系统依赖
RUN apt-get update && apt-get install -y \gcc \g++ \curl \&& rm -rf /var/lib/apt/lists/*# 复制依赖文件
COPY requirements.txt .# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt# 复制应用代码
COPY . .# 创建必要的目录
RUN mkdir -p /app/chroma_db /app/uploads /app/logs# 暴露端口
EXPOSE 8000 8501# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \CMD curl -f http://localhost:8000/health || exit 1# 启动脚本
COPY start.sh /start.sh
RUN chmod +x /start.shCMD ["/start.sh"]

启动脚本:

#!/bin/bash
# start.shecho "启动企业知识库问答机器人..."# 启动FastAPI后端服务
echo "启动API服务..."
uvicorn main:app --host 0.0.0.0 --port 8000 &# 等待API服务启动
sleep 10# 启动Streamlit前端
echo "启动前端界面..."
streamlit run streamlit_app.py --server.port 8501 --server.address 0.0.0.0 &# 保持容器运行
wait

6.2 Docker Compose配置

# docker-compose.yml
version: '3.8'services:qa-bot:build: .container_name: enterprise-qa-botports:- "8000:8000"  # API服务- "8501:8501"  # Streamlit界面environment:- DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}volumes:- ./data:/app/data- ./chroma_db:/app/chroma_db- ./logs:/app/logsrestart: unless-stoppedhealthcheck:test: ["CMD", "curl", "-f", "http://localhost:8000/health"]interval: 30stimeout: 10sretries: 3nginx:image: nginx:alpinecontainer_name: qa-bot-nginxports:- "80:80"- "443:443"volumes:- ./nginx.conf:/etc/nginx/nginx.conf- ./ssl:/etc/nginx/ssldepends_on:- qa-botrestart: unless-stoppedvolumes:chroma_db:logs:

7. 系统优化与监控

7.1 性能优化策略

为了提高系统性能,我们可以实施以下优化策略:

# performance_optimizer.py
import functools
import time
from typing import Dict, List
import threading
from concurrent.futures import ThreadPoolExecutorclass PerformanceOptimizer:"""性能优化器提供缓存、批处理、异步处理等优化功能"""def __init__(self):self.cache = {}self.cache_lock = threading.Lock()self.max_cache_size = 1000self.cache_ttl = 3600  # 1小时def cache_result(self, ttl: int = None):"""结果缓存装饰器Args:ttl: 缓存存活时间(秒)"""def decorator(func):@functools.wraps(func)def wrapper(*args, **kwargs):# 生成缓存键cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"with self.cache_lock:# 检查缓存if cache_key in self.cache:cached_result, timestamp = self.cache[cache_key]cache_age = time.time() - timestampif cache_age < (ttl or self.cache_ttl):print(f"缓存命中: {func.__name__}")return cached_resultelse:# 缓存过期,删除del self.cache[cache_key]# 执行函数result = func(*args, **kwargs)with self.cache_lock:# 清理过期缓存if len(self.cache) >= self.max_cache_size:self._cleanup_cache()# 存储结果self.cache[cache_key] = (result, time.time())return resultreturn wrapperreturn decoratordef _cleanup_cache(self):"""清理过期缓存"""current_time = time.time()expired_keys = []for key, (_, timestamp) in self.cache.items():if current_time - timestamp > self.cache_ttl:expired_keys.append(key)for key in expired_keys:del self.cache[key]print(f"清理了 {len(expired_keys)} 个过期缓存项")# 应用性能优化
optimizer = PerformanceOptimizer()# 在QAEngine中应用缓存
class OptimizedQAEngine(QAEngine):"""优化版问答引擎"""@optimizer.cache_result(ttl=1800)  # 30分钟缓存def ask_question(self, question: str, include_sources: bool = True) -> Dict:"""带缓存的问答方法"""return super().ask_question(question, include_sources)def batch_ask_questions(self, questions: List[str]) -> List[Dict]:"""批量问答"""with ThreadPoolExecutor(max_workers=4) as executor:futures = [executor.submit(self.ask_question, q) for q in questions]results = [future.result() for future in futures]return results

7.2 监控与日志系统

# monitoring.py
import logging
import time
import psutil
import threading
from typing import Dict
from datetime import datetime
import jsonclass SystemMonitor:"""系统监控类监控API性能、资源使用情况等"""def __init__(self, log_file: str = "system_monitor.log"):self.log_file = log_fileself.setup_logging()self.metrics = {'api_calls': 0,'successful_queries': 0,'failed_queries': 0,'total_response_time': 0,'avg_response_time': 0,'cache_hits': 0,'cache_misses': 0}self.metrics_lock = threading.Lock()# 启动监控线程self.monitor_thread = threading.Thread(target=self._monitor_system, daemon=True)self.monitor_thread.start()def setup_logging(self):"""设置日志记录"""logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler(self.log_file, encoding='utf-8'),logging.StreamHandler()])self.logger = logging.getLogger('SystemMonitor')def log_api_call(self, endpoint: str, response_time: float, success: bool):"""记录API调用"""with self.metrics_lock:self.metrics['api_calls'] += 1self.metrics['total_response_time'] += response_timeself.metrics['avg_response_time'] = (self.metrics['total_response_time'] / self.metrics['api_calls'])if success:self.metrics['successful_queries'] += 1else:self.metrics['failed_queries'] += 1self.logger.info(f"API调用 - 端点: {endpoint}, "f"响应时间: {response_time:.3f}s, "f"成功: {success}")def log_cache_event(self, event_type: str):"""记录缓存事件"""with self.metrics_lock:if event_type == 'hit':self.metrics['cache_hits'] += 1elif event_type == 'miss':self.metrics['cache_misses'] += 1def get_system_metrics(self) -> Dict:"""获取系统指标"""cpu_percent = psutil.cpu_percent(interval=1)memory = psutil.virtual_memory()disk = psutil.disk_usage('/')with self.metrics_lock:metrics = self.metrics.copy()return {'timestamp': datetime.now().isoformat(),'system': {'cpu_percent': cpu_percent,'memory_percent': memory.percent,'memory_used_gb': memory.used / (1024**3),'disk_percent': disk.percent,'disk_free_gb': disk.free / (1024**3)},'application': metrics}def _monitor_system(self):"""系统监控后台线程"""while True:try:metrics = self.get_system_metrics()# 记录关键指标if metrics['system']['cpu_percent'] > 80:self.logger.warning(f"CPU使用率过高: {metrics['system']['cpu_percent']:.1f}%")if metrics['system']['memory_percent'] > 85:self.logger.warning(f"内存使用率过高: {metrics['system']['memory_percent']:.1f}%")# 每5分钟记录一次详细指标self.logger.info(f"系统指标: {json.dumps(metrics, ensure_ascii=False, indent=2)}")time.sleep(300)  # 5分钟except Exception as e:self.logger.error(f"监控系统时出错: {str(e)}")time.sleep(60)# 全局监控实例
monitor = SystemMonitor()

8. 测试与验证

8.1 单元测试

# test_qa_engine.py
import unittest
import tempfile
import os
from qa_engine import QAEngine
from document_processor import DocumentProcessor
from vector_store import VectorStoreclass TestQAEngine(unittest.TestCase):"""问答引擎测试类"""def setUp(self):"""测试前设置"""self.test_api_key = "test_api_key"self.temp_dir = tempfile.mkdtemp()# 创建测试文档test_doc_path = os.path.join(self.temp_dir, "test_doc.txt")with open(test_doc_path, 'w', encoding='utf-8') as f:f.write("""人工智能(AI)是计算机科学的一个分支,它试图理解智能的本质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、语言识别、图像识别、自然语言处理和专家系统等。""")def tearDown(self):"""测试后清理"""import shutilshutil.rmtree(self.temp_dir)def test_document_processing(self):"""测试文档处理功能"""processor = DocumentProcessor()chunks = processor.process_documents(self.temp_dir)self.assertGreater(len(chunks), 0)self.assertIn('text', chunks[0])self.assertIn('metadata', chunks[0])def test_vector_store(self):"""测试向量数据库功能"""vector_store = VectorStore(persist_directory=tempfile.mkdtemp())# 测试文档test_docs = [{'id': 'doc1','text': '人工智能是计算机科学的分支','metadata': {'source': 'test'}}]# 添加文档vector_store.add_documents(test_docs)# 搜索文档results = vector_store.search_similar_documents("人工智能", top_k=1)self.assertGreater(len(results), 0)self.assertIn('人工智能', results[0]['text'])if __name__ == '__main__':unittest.main()

8.2 性能测试

# performance_test.py
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
import requestsclass PerformanceTest:"""性能测试类"""def __init__(self, api_base_url: str = "http://localhost:8000"):self.api_base_url = api_base_urlself.test_questions = ["什么是人工智能?","机器学习的主要方法有哪些?","深度学习和机器学习的区别是什么?","自然语言处理包括哪些技术?","计算机视觉的应用领域有哪些?"]def test_single_request(self, question: str) -> float:"""测试单个请求的响应时间"""start_time = time.time()try:response = requests.post(f"{self.api_base_url}/ask",json={"question": question, "include_sources": True},timeout=30)response.raise_for_status()end_time = time.time()return end_time - start_timeexcept Exception as e:print(f"请求失败: {str(e)}")return -1def test_concurrent_requests(self, num_threads: int = 5, num_requests: int = 20):"""测试并发请求性能"""print(f"开始并发测试 - {num_threads} 线程, {num_requests} 请求")questions = (self.test_questions * (num_requests // len(self.test_questions) + 1))[:num_requests]response_times = []with ThreadPoolExecutor(max_workers=num_threads) as executor:futures = [executor.submit(self.test_single_request, q) for q in questions]for future in futures:response_time = future.result()if response_time > 0:response_times.append(response_time)if response_times:avg_time = statistics.mean(response_times)median_time = statistics.median(response_times)p95_time = sorted(response_times)[int(len(response_times) * 0.95)]print(f"性能测试结果:")print(f"- 成功请求: {len(response_times)}/{num_requests}")print(f"- 平均响应时间: {avg_time:.3f}s")print(f"- 中位数响应时间: {median_time:.3f}s")print(f"- 95%分位数响应时间: {p95_time:.3f}s")print(f"- 最快响应时间: {min(response_times):.3f}s")print(f"- 最慢响应时间: {max(response_times):.3f}s")else:print("所有请求都失败了")if __name__ == "__main__":tester = PerformanceTest()tester.test_concurrent_requests(num_threads=3, num_requests=15)

9. 部署指南

9.1 生产环境部署

#!/bin/bash
# deploy.sh - 生产环境部署脚本echo "开始部署企业知识库问答机器人..."# 1. 创建项目目录
PROJECT_DIR="/opt/enterprise-qa-bot"
sudo mkdir -p $PROJECT_DIR
cd $PROJECT_DIR# 2. 克隆代码或复制文件
# git clone https://github.com/your-repo/enterprise-qa-bot.git .# 3. 设置环境变量
cat > .env << EOF
DEEPSEEK_API_KEY=your_deepseek_api_key_here
ENVIRONMENT=production
LOG_LEVEL=INFO
EOF# 4. 构建Docker镜像
echo "构建Docker镜像..."
docker-compose build# 5. 启动服务
echo "启动服务..."
docker-compose up -d# 6. 等待服务启动
echo "等待服务启动..."
sleep 30# 7. 健康检查
echo "执行健康检查..."
if curl -f http://localhost:8000/health; thenecho "✅ API服务启动成功"
elseecho "❌ API服务启动失败"exit 1
fiif curl -f http://localhost:8501; thenecho "✅ 前端服务启动成功"
elseecho "❌ 前端服务启动失败"exit 1
fiecho "🎉 部署完成!"
echo "API服务地址: http://your-server:8000"
echo "前端界面地址: http://your-server:8501"

9.2 Nginx配置

# nginx.conf
events {worker_connections 1024;
}http {upstream api_backend {server qa-bot:8000;}upstream frontend_backend {server qa-bot:8501;}server {listen 80;server_name your-domain.com;# API代理location /api/ {proxy_pass http://api_backend/;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;# 超时设置proxy_connect_timeout 60s;proxy_send_timeout 60s;proxy_read_timeout 60s;}# 前端代理location / {proxy_pass http://frontend_backend;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;# WebSocket支持(Streamlit需要)proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}# 静态文件缓存location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg)$ {expires 1y;add_header Cache-Control "public, immutable";}}
}

10. 总结与展望

10.1 项目总结

通过本项目的实战开发,我们成功构建了一个基于DeepSeek-V3的企业知识库问答机器人。该系统具有以下特点:

技术优势:

  • 先进的语言模型: 采用DeepSeek-V3,具备强大的中文理解和生成能力
  • 高效的向量检索: 基于Sentence-Transformers和ChromaDB实现精准的语义搜索
  • 模块化架构: 清晰的代码结构,便于维护和扩展
  • 容器化部署: 支持Docker部署,便于环境管理和扩容

功能特性:

  • 多格式文档支持: 支持PDF、Word、文本等多种文档格式
  • 智能文档分割: 基于token数量的智能文本分块
  • 实时问答: 支持流式输出,提供良好的用户体验
  • 来源追溯: 提供答案来源信息,增强可信度

性能优化:

  • 结果缓存: 减少重复计算,提高响应速度
  • 并发处理: 支持多线程处理,提高系统吞吐量
  • 监控告警: 完善的监控体系,保障系统稳定运行

10.2 使用效果分析

基于实际测试,系统在以下方面表现优秀:

图3: 系统性能指标统计图

10.3 未来改进方向

  1. 多模态支持: 增加图片、表格等多模态内容的理解能力
  2. 知识图谱集成: 结合知识图谱技术,提供更加结构化的知识服务
  3. 个性化推荐: 基于用户历史行为,提供个性化的问题推荐
  4. 实时学习: 支持在线学习新知识,持续优化回答质量
  5. 多语言支持: 扩展对多种语言的支持能力

10.4 技术要点回顾

在本项目中,我们重点运用了以下技术:

  • 华为云Flexus: 提供稳定可靠的云计算基础设施
  • DeepSeek-V3: 提供强大的自然语言理解和生成能力
  • 向量数据库: 实现高效的语义检索
  • RAG架构: 结合检索和生成,提供准确的知识问答
  • 微服务架构: 实现系统的模块化和可扩展性

通过本项目的实践,我们深入理解了企业级AI应用的开发流程,掌握了从需求分析、架构设计、代码实现到部署运维的完整技术栈。这为后续开发更复杂的AI应用奠定了坚实的基础。

参考文献

  1. DeepSeek官方文档
  2. 华为云Flexus产品介绍
  3. ChromaDB官方文档
  4. Sentence-Transformers文档
  5. FastAPI官方文档
  6. Streamlit官方文档
  7. RAG技术原理与实践
  8. 企业级AI应用开发最佳实践

相关文章:

  • 【JavaEE】万字详解HTTP协议
  • 神经网络-Day45
  • centos部署k8s v1.33版本
  • 亚马逊AWS云服务器高效使用指南:最大限度降低成本的实战策略
  • iOS 项目怎么构建稳定性保障机制?一次系统性防错经验分享(含 KeyMob 工具应用)
  • 从零搭建到 App Store 上架:跨平台开发者使用 Appuploader与其他工具的实战经验
  • element-plus 单选组件 el-radio,选不上,又没报错,直接复制官网也不行解决方案
  • Jenkins自动化部署Maven项目
  • React Router 中 navigate 后浏览器返回按钮不起作用的问题记录
  • vue对axios的封装和使用
  • React从基础入门到高级实战:React 实战项目 - 项目二:电商平台前端
  • 网页前端开发(基础进阶4--axios)
  • 智能照明系统:具备认知能力的“光神经网络”
  • .net ORM框架dapper批量插入
  • Socket编程UDP\TCP
  • 智慧赋能:移动充电桩的能源供给革命与便捷服务升级
  • 数字孪生在建设智慧城市中可以起到哪些作用或帮助?
  • SpringBoot+Mysql实现的停车场收费小程序系统+文档
  • 基于 TensorFlow 2 的 WGAN来生成表格数据、数值数据和序列数据。 WGAN生成对抗网络。代码仅供参考
  • 【Java工程师面试全攻略】Day5:MySQL数据库面试精要
  • 一个公司做多个网站是好还是坏/广东seo推广外包
  • 云南做网站报价/国际军事新闻最新消息
  • 南通网站建设价格/最近有哪些新闻
  • 自己网站上做支付宝怎么收费的/前端seo是什么
  • 东莞网上做公司网站/seo服务指什么意思
  • 厦门酒店团购网站建设/推广赚钱的平台有哪些