"""
Created on Sun Oct 12 21:09:04 2025@author: 18309
"""import requests
import json
import os
import logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s'
)
RAGFLOW_SERVER = "http://IP:端口"
API_KEY = "ragflow-U5N************Mm"
HEADERS = {"Authorization": f"Bearer {API_KEY}","Content-Type": "application/json"
}import os
def get_all_filenames(folder_path):"""获取指定文件夹下的所有文件名称参数:folder_path: 目标文件夹路径返回:字典,包含文件列表和子文件夹列表"""if not os.path.exists(folder_path):print(f"错误:文件夹 '{folder_path}' 不存在")return Noneif not os.path.isdir(folder_path):print(f"错误:'{folder_path}' 不是一个文件夹")return Nonefile_names = []subfolder_names = []for entry in os.listdir(folder_path):entry_full_path = os.path.join(folder_path, entry)if os.path.isfile(entry_full_path):file_names.append(entry)elif os.path.isdir(entry_full_path):subfolder_names.append(entry)return {"files": file_names,"subfolders": subfolder_names}
def get_knowledge_bases():"""获取所有知识库列表"""url = f"{RAGFLOW_SERVER}/api/v1/datasets"try:response = requests.get(url, headers=HEADERS)response.raise_for_status()result = response.json()if isinstance(result, list):return resultelif isinstance(result, dict) and "data" in result:return result["data"]else:logging.error("知识库列表响应结构不符合预期")return []except requests.exceptions.RequestException as e:logging.error(f"获取知识库失败: {str(e)}")return []
def create_knowledge_base(name, description=""):"""创建新的知识库,返回知识库ID或None"""url = f"{RAGFLOW_SERVER}/api/v1/datasets"payload = {"name": name,"description": description}try:response = requests.post(url,headers=HEADERS,data=json.dumps(payload))if response.status_code == 200:result = response.json()if "id" in result:return result["id"]elif "data" in result and "id" in result["data"]:return result["data"]["id"]else:logging.error("创建知识库响应结构异常")return Noneelif response.status_code == 409:logging.error(f"知识库 '{name}' 已存在")return Noneelse:logging.error(f"创建失败,状态码: {response.status_code}, 响应: {response.text}")return Noneexcept requests.exceptions.RequestException as e:logging.error(f"创建知识库请求异常: {str(e)}")return None
def upload_and_parse_file(kb_id, file_path, language="zh"):"""上传文件到指定知识库并解析,返回文档ID或None"""if not os.path.exists(file_path):logging.error(f"文件不存在: {file_path}")return Noneif not os.path.isfile(file_path):logging.error(f"{file_path} 不是有效的文件")return Noneurl = f"{RAGFLOW_SERVER}/api/v1/datasets/{kb_id}/documents"upload_headers = {"Authorization": f"Bearer {API_KEY}"}try:with open(file_path, 'rb') as f:files = {"file": (os.path.basename(file_path), f)}data = {"language": language}response = requests.post(url,headers=upload_headers,files=files,data=data)response.raise_for_status()result = response.json()if "id" in result:return result["id"]elif "data" in result and isinstance(result["data"], list) and len(result["data"]) > 0:return result["data"][0]["id"]else:logging.error("文件上传响应结构不符合预期")return Noneexcept requests.exceptions.RequestException as e:logging.error(f"文件上传失败: {str(e)}")return Noneexcept Exception as e:logging.error(f"处理文件时出错: {str(e)}")return None
if __name__ == "__main__":target_folder = "D:/WORK/目标文件" result = get_all_filenames(target_folder)file_lst = result['files']logging.info("获取所有知识库...")knowledge_bases = get_knowledge_bases()if knowledge_bases:logging.info(f"发现 {len(knowledge_bases)} 个知识库:")for kb in knowledge_bases:logging.info(f" - 名称: {kb.get('name')}, ID: {kb.get('id')}")else:logging.info("没有找到现有知识库")target_kb_name = "【成果库】-test"target_kb_id = Noneif knowledge_bases:for kb in knowledge_bases:if kb.get('name') == target_kb_name:target_kb_id = kb.get('id')breakif not target_kb_id:logging.info(f"创建新知识库: {target_kb_name}")target_kb_id = create_knowledge_base(name=target_kb_name,description="存储科技项目历年成果")if not target_kb_id:logging.error("无法创建目标知识库,程序退出")exit(1)else:logging.info(f"知识库创建成功,ID: {target_kb_id}")file_full_path = []for file in file_lst:file_path = target_folder + '/' + filefile_full_path.append(file_path)files_to_upload = file_full_pathfor file_path in files_to_upload:if os.path.exists(file_path):logging.info(f"正在上传并解析: {file_path}")doc_id = upload_and_parse_file(target_kb_id, file_path)if doc_id:logging.info(f"文件处理成功,文档ID: {doc_id}")else:logging.warning(f"文件 {file_path} 处理失败")else:logging.warning(f"文件 {file_path} 不存在,跳过")logging.info("所有操作完成")