当前位置: 首页 > news >正文

深度剖析:Dify+Sanic+Vue+ECharts 搭建 Text2SQL 项目 sanic-web 的 Debug 实战

目录

  • 项目背景介绍
    • sanic-web
  • Dify\_service handle\_think\_tag报错NoneType
    • 问题描述
    • debug
  • Dify调用不成功,一直转圈圈
    • 问题描述
    • debug
  • 前端markdown格式只显示前5页
    • 问题描述
    • debug
      • 1. 修改代码
      • 2.重新构建1.1.3镜像
      • 3.更新sanic-web/docker/docker-compose.yaml
      • 4. 重新部署
  • Dify超时60秒,服务器报错
    • 问题描述
    • debug

项目背景介绍

sanic-web

项目地址:https://github.com/apconw/sanic-web

一个轻量级、支持全链路且易于二次开发的大模型应用项目(Large Model Data Assistant) 支持DeepSeek/Qwen2.5等大模型 基于 Dify 、Ollama&Vllm、Sanic 和 Text2SQL 📊 等技术构建的一站式大模型应用开发项目,采用 Vue3、TypeScript 和 Vite 5 打造现代UI。它支持通过 ECharts 📈 实现基于大模型的数据图形化问答,具备处理 CSV 文件 📂 表格问答的能力。同时,能方便对接第三方开源 RAG 系统 检索系统 🌐等,以支持广泛的通用知识问答。

这个项目可以作为text2sql的经典案例,通过自然语言来访问业务数据库,最终使用echarts图表可视化展示分析数据。使用了独立开发的web页面,对于前端小伙伴来说也比较友好,这完全可以作为一个AI智能助手的Demo实现。
在这里插入图片描述

Dify_service handle_think_tag报错NoneType

问题描述

在这里插入图片描述

debug

修改services/dify_service.py/handle_think_tag代码,如下:

    @staticmethodasync def handle_think_tag(answer):"""处理<think>标签内的内容:param answer""""""处理<think>标签内的内容,或JSON格式的thoughts字段:param answer"""think_content = ""remaining_content = answer# 会遇到answer可能不能解析到,先尝试解析为JSONtry:data = json.loads(answer)if isinstance(data, dict) and "thoughts" in data:think_content = data["thoughts"]remaining_content = answerreturn think_content, remaining_contentexcept Exception:pass# 再尝试正则提取<think>标签match = re.search(r"<think>(.*?)</think>", answer, re.DOTALL)if match:think_content = match.group(1)remaining_content = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()return think_content, remaining_content# 如果都没有,返回空return "", answer

Dify调用不成功,一直转圈圈

问题描述

在这里插入图片描述

debug

检查本地dify的端口号,修改sanic-web/docker/docker-compose.yaml中dify端口号到本地端口号,比如原端口号是18000,修改成80。

  chat-service:image: apconw/sanic-web:1.1.2container_name: sanic-webenvironment:- ENV=test- DIFY_SERVER_URL=http://host.docker.internal:80- DIFY_DATABASE_QA_API_KEY=app-AXDUw8TtcY7N6TMGHkPaC4VF- MINIO_ENDPOINT=host.docker.internal:19000- MINIO_ACCESS_KEY=sIR5eeDkiwoo779yNJbw- MiNIO_SECRET_KEY=MreuQ3aC1ymHJeo3QfzSg7aPz7PqlxeOw39nZUdEports:- "8088:8088"extra_hosts:- "host.docker.internal:host-gateway"

前端markdown格式只显示前5页

问题描述

在这里插入图片描述

debug

1. 修改代码

修改web/src/components/MarkdownPreview/MarkdownTable.vue第39行-40行代码,将:data="pagedTableData"改为:data=“tableData”,并移除:pagination="pagination"属性:

<template><div style="background-color: #ffffff"><n-cardtitle="表格"embeddedbordered:content-style="{ 'background-color': '#ffffff' }":header-style="{color: '#26244c',height: '10px','background-color': '#f0effe','text-align': 'left','font-size': '14px','font-family': 'PMingLiU'}":footer-style="{color: '#666','background-color': '#ffffff','text-align': 'left','font-size': '14px','font-family': 'PMingLiU'}"><divstyle="display: flex;justify-content: space-between;margin-bottom: 10px;"></div><n-data-tablestyle="height: 550px;width: 850px;margin: 0px 10px;background-color: #ffffff;":columns="columns":data="tableData":max-height="550"virtual-scrollvirtual-scroll-x:scroll-x="scrollX":min-row-height="minRowHeight":height-for-row="heightForRow"virtual-scroll-header:header-height="48"/><template #footer>数据来源: 大模型生成的数据, 以上信息仅供参考</template></n-card></div>
</template>

2.重新构建1.1.3镜像

# 进入web的docker目录
cd docker
# 查看原始Dockerfile ,这步也可以省略
cat Dockerfile
# 使用原始Dockerfile构建新镜像 
docker build -t apconw/chat-vue3-mvp:1.1.3 -f Dockerfile ..

3.更新sanic-web/docker/docker-compose.yaml

services:chat-web:image: apconw/chat-vue3-mvp:1.1.3  # 更新为新版本container_name: chat-vue3-mvpports:- "8081:80"extra_hosts:- "host.docker.internal:host-gateway"depends_on:- chat-service

4. 重新部署

docker-compose down
docker-compose up -d

Dify超时60秒,服务器报错

问题描述

2025/05/09 08:16:19 [error] 20#20: *1 upstream timed out (110: Operation timed out) while reading response header from upstream, client: 192.168.65.1, server: localhost, request: “POST /sanic/dify/get_answer HTTP/1.1”, upstream: “http://192.168.65.254:8088/dify/get_answer”, host: “localhost:8081”, referrer: “http://localhost:8081/chat”


192.168.65.1 - - [09/May/2025:08:16:19 +0000] “POST /sanic/dify/get_answer HTTP/1.1” 504 497 “http://localhost:8081/chat” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36” “-”


192.168.65.1 - - [09/May/2025:08:16:19 +0000] “POST /sanic/dify/get_dify_suggested HTTP/1.1” 200 63 “http://localhost:8081/chat” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36” “-”

debug

修改dify_service.py中的DiFyRequest,添加心跳机制:

  1. 添加心跳机制:
  • 新增send_heartbeat方法发送SSE心跳
  • 在处理请求过程中每10秒发送一次心跳
  • 在开始、结束和错误处理时也发送心跳
  1. 增强错误处理:
  • 添加send_error_message方法,统一错误信息发送
  • 完善异常处理,确保错误信息能发送到客户端
  1. 增加超时时间:
  • 将aiohttp的超时设置从2分钟增加到5分钟
  1. 确保连接正确关闭:
  • 在finally块中确保发送最后的心跳消息
  • 确保调用res_end方法正确关闭连接

class DiFyRequest:"""DiFy操作服务类"""def __init__(self):passasync def exec_query(self, res):"""执行查询并处理流式响应"""try:# 获取请求体内容 从res流对象获取request-bodyreq_body_content = res.request.body# 将字节流解码为字符串body_str = req_body_content.decode("utf-8")req_obj = json.loads(body_str)logging.info(f"query param: {body_str}")# str(uuid.uuid4())chat_id = req_obj.get("chat_id")qa_type = req_obj.get("qa_type")#  使用正则表达式移除所有空白字符(包括空格、制表符、换行符等)query = req_obj.get("query")cleaned_query = re.sub(r"\s+", "", query)# 获取登录用户信息token = res.request.headers.get("Authorization")if not token:raise MyException(SysCodeEnum.c_401)if token.startswith("Bearer "):token = token.split(" ")[1]# 封装问答上下文信息qa_context = QaContext(token, cleaned_query, chat_id)# 判断请求类别app_key = self._get_authorization_token(qa_type)# 构建请求参数dify_service_url, body_params, headers = self._build_request(chat_id, cleaned_query, app_key, qa_type)# 收集流式输出结果t02_answer_data = []# 收集业务数据流式输出结果t04_answer_data = {}# 发送初始连接消息await self.send_heartbeat(res, "开始处理请求")# 心跳计时器last_heartbeat = time.time()async with aiohttp.ClientSession(read_bufsize=1024 * 16) as session:async with session.post(dify_service_url,headers=headers,json=body_params,timeout=aiohttp.ClientTimeout(total=60 * 5),  # 增加到5分钟超时) as response:logging.info(f"dify response status: {response.status}")if response.status == 200:data_type = ""bus_data = ""while True:# 发送心跳保持连接current_time = time.time()if current_time - last_heartbeat > 10:  # 每10秒发送一次心跳await self.send_heartbeat(res, "处理中...")last_heartbeat = current_timereader = response.contentreader._high_water = 10 * 1024 * 1024  # 设置为10MBchunk = await reader.readline()if not chunk:# 发送最后的心跳await self.send_heartbeat(res, "读取数据完成")breakstr_chunk = chunk.decode("utf-8")# 处理数据块if str_chunk.startswith("data"):# 更新最后心跳时间last_heartbeat = time.time()str_data = str_chunk[5:]data_json = json.loads(str_data)event_name = data_json.get("event")conversation_id = data_json.get("conversation_id")message_id = data_json.get("message_id")task_id = data_json.get("task_id")# 处理消息事件...# 这里保留原有的事件处理逻辑if DiFyCodeEnum.MESSAGE.value[0] == event_name:answer = data_json.get("answer")if answer and answer.startswith("dify_"):event_list = answer.split("_")if event_list[1] == "0":# 输出开始data_type = event_list[2]if data_type == DataTypeEnum.ANSWER.value[0]:await self.send_message(res,answer,{"data": {"messageType": "begin"}, "dataType": data_type},)elif event_list[1] == "1":# 输出结束data_type = event_list[2]if data_type == DataTypeEnum.ANSWER.value[0]:await self.send_message(res,answer,{"data": {"messageType": "end"}, "dataType": data_type},)# 输出业务数据elif bus_data and data_type == DataTypeEnum.BUS_DATA.value[0]:res_data = process(json.loads(bus_data)["data"])await self.send_message(res,answer,{"data": res_data, "dataType": data_type},)t04_answer_data = {"data": res_data, "dataType": data_type}data_type = ""elif len(data_type) > 0:# 这里输出 t02之间的内容if data_type == DataTypeEnum.ANSWER.value[0]:await self.send_message(res,answer,{"data": {"messageType": "continue", "content": answer}, "dataType": data_type},)t02_answer_data.append(answer)# 这里设置业务数据if data_type == DataTypeEnum.BUS_DATA.value[0]:bus_data = answerelif DiFyCodeEnum.MESSAGE_ERROR.value[0] == event_name:# 输出异常情况日志error_msg = data_json.get("message")logging.error(f"Error 调用dify失败错误信息: {data_json}")await res.write("data:"+ json.dumps({"data": {"messageType": "error", "content": "调用失败请查看dify日志,错误信息: " + error_msg},"dataType": DataTypeEnum.ANSWER.value[0],},ensure_ascii=False,)+ "\n\n")elif DiFyCodeEnum.MESSAGE_END.value[0] == event_name:t02_message_json = {"data": {"messageType": "continue", "content": "".join(t02_answer_data)},"dataType": DataTypeEnum.ANSWER.value[0],}print(t02_message_json)if t02_message_json:await self._save_message(t02_message_json, qa_context, conversation_id, message_id, task_id, qa_type)if t04_answer_data:await self._save_message(t04_answer_data, qa_context, conversation_id, message_id, task_id, qa_type)t02_answer_data = []t04_answer_data = {}except Exception as e:logging.error(f"Error during get_answer: {e}")traceback.print_exception(e)# 发送错误信息await self.send_error_message(res, f"处理请求出错: {str(e)}")return {"error": str(e)}  # 返回错误信息作为字典finally:# 确保连接正确关闭await self.send_heartbeat(res, "请求处理完成")await self.res_end(res)async def send_heartbeat(self, res, message="心跳"):"""发送心跳信息保持连接活跃"""try:await res.write(f"data:{json.dumps({'heartbeat': True, 'message': message}, ensure_ascii=False)}\n\n")except Exception as e:logging.error(f"发送心跳失败: {e}")async def send_error_message(self, res, error_message):"""发送错误信息"""try:await res.write("data:"+ json.dumps({"data": {"messageType": "error", "content": error_message},"dataType": DataTypeEnum.ANSWER.value[0],},ensure_ascii=False,)+ "\n\n")except Exception as e:logging.error(f"发送错误信息失败: {e}")@staticmethodasync def handle_think_tag(answer):"""处理<think>标签内的内容:param answer""""""处理<think>标签内的内容,或JSON格式的thoughts字段:param answer"""think_content = ""remaining_content = answer# 会遇到answer可能不能解析到,先尝试解析为JSONtry:data = json.loads(answer)if isinstance(data, dict) and "thoughts" in data:think_content = data["thoughts"]remaining_content = answerreturn think_content, remaining_contentexcept Exception:pass# 再尝试正则提取<think>标签match = re.search(r"<think>(.*?)</think>", answer, re.DOTALL)if match:think_content = match.group(1)remaining_content = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()return think_content, remaining_content# 如果都没有,返回空return "", answer@staticmethodasync def _save_message(message, qa_context, conversation_id, message_id, task_id, qa_type):"""保存消息记录并发送SSE数据:param message::param qa_context::param conversation_id::param message_id::param task_id::param qa_type::return:"""# 保存用户问答记录 1.保存用户问题 2.保存用户答案 t02 和 t04if "content" in message["data"]:await add_question_record(qa_context.token, conversation_id, message_id, task_id, qa_context.chat_id, qa_context.question, message, "", qa_type)elif message["dataType"] == DataTypeEnum.BUS_DATA.value[0]:await add_question_record(qa_context.token, conversation_id, message_id, task_id, qa_context.chat_id, qa_context.question, "", message, qa_type)async def send_message(self, response, answer, message):"""SSE 格式发送数据,每一行以 data: 开头"""if answer.lstrip().startswith("<think>"):# 处理deepseek模型思考过程样式think_content, remaining_content = await self.handle_think_tag(answer)# 发送<think>标签内的内容message = {"data": {"messageType": "continue", "content": "> " + think_content.replace("\n", "") + "\n\n" + remaining_content},"dataType": "t02",}await response.write("data:" + json.dumps(message, ensure_ascii=False) + "\n\n")else:await response.write("data:" + json.dumps(message, ensure_ascii=False) + "\n\n")@staticmethodasync def res_begin(res, chat_id):""":param res::param chat_id::return:"""await res.write("data:"+ json.dumps({"data": {"id": chat_id},"dataType": DataTypeEnum.TASK_ID.value[0],})+ "\n\n")@staticmethodasync def res_end(res):""":param res::return:"""await res.write("data:"+ json.dumps({"data": "DONE","dataType": DataTypeEnum.STREAM_END.value[0],})+ "\n\n")@staticmethoddef _build_request(chat_id, query, app_key, qa_type):"""构建请求参数:param chat_id: 对话id:param app_key: api key:param query: 用户问题:param qa_type: 问答类型:return:"""# 通用问答时,使用上次会话id 实现多轮对话效果conversation_id = ""if qa_type == DiFyAppEnum.COMMON_QA.value[0]:qa_record = query_user_qa_record(chat_id)if qa_record and len(qa_record) > 0:conversation_id = qa_record[0]["conversation_id"]body_params = {"query": query,"inputs": {"qa_type": qa_type},"response_mode": "streaming","conversation_id": conversation_id,"user": "abc-123",}headers = {"Content-Type": "application/json","Authorization": f"Bearer {app_key}",}dify_service_url = DiFyRestApi.build_url(DiFyRestApi.DIFY_REST_CHAT)return dify_service_url, body_params, headers@staticmethoddef _get_authorization_token(qa_type: str):"""根据请求类别获取api/token固定走一个dify流app-IzudxfuN8uO2bvuCpUHpWhvH master分支默认的数据问答key:param qa_type:return:"""# 遍历枚举成员并检查第一个元素是否与测试字符串匹配for member in DiFyAppEnum:if member.value[0] == qa_type:return os.getenv("DIFY_DATABASE_QA_API_KEY")else:raise ValueError(f"问答类型 '{qa_type}' 不支持")

相关文章:

  • 【Unity】用事件广播的方式实现游戏暂停,简单且实用!
  • 单元化架构
  • <PLC><视觉><机器人>基于海康威视视觉检测和UR机械臂,如何实现N点标定?
  • IEEE 列表会议第五届机器人、自动化与智能控制国际会议
  • 软件架构风格系列(4):事件驱动架构
  • Python打卡 DAY 27
  • 大模型在数据分析领域的研究综述
  • CSS:颜色的三种表示方式
  • 学习以任务为中心的潜动作,随地采取行动
  • Servlet 深度解析:生命周期、请求响应与状态管理
  • PCIe数据采集系统详解
  • JAVA:线程调度器与时间分片的技术指南
  • 数据通信原理 光纤通信 期末速成
  • Android minSdk从21升级24后SO库异常
  • linux防火墙
  • 单序列双指针---初阶篇
  • 原生小程序+springboot+vue+协同过滤算法的音乐推荐系统(源码+论文+讲解+安装+部署+调试)
  • 文件上传Ⅲ
  • css:倒影倾斜效果
  • OpenAI与微软洽谈新融资及IPO,Instagram因TikTok流失四成用户
  • 特朗普称即将与伊朗达成核协议,外交部:中方愿继续发挥建设性作用
  • 李成钢出席中国与《数字经济伙伴关系协定》成员部级会议
  • 北京警方:海淀发生小客车刮碰行人事故4人受伤,肇事司机已被查获
  • 李公明︱一周书记:当前科学观中的盲点、危机与……人类命运
  • 车载抬头显示爆发在即?业内:凭借市场和产业链优势,国内供应商实现反超
  • 女外交官郑璇已任中国驻莫桑比克大使