当前位置: 首页 > wzjs >正文

wordpress导入产品无锡百度seo优化

wordpress导入产品,无锡百度seo优化,www技术支持 重庆网站建设,自己电脑怎么做网站服务器目的 本教程旨在展示怎么用Dify配置智能问数(自然语言方式提问,返回SQL结果)。 依赖 Dify 1.2.0 Ollama 0.7.0 大模型qwen3:8b、deepseek-r1:7b、 qwen2.5-coder:latest 先Dify在市场里下载Ollama插件 步骤 创建聊天流程 1、创建聊天编排chatflow&#xff…

目的

本教程旨在展示怎么用Dify配置智能问数(自然语言方式提问,返回SQL结果)。

依赖

Dify 1.2.0

Ollama 0.7.0

大模型qwen3:8b、deepseek-r1:7b、 qwen2.5-coder:latest

先Dify在市场里下载Ollama插件

步骤

创建聊天流程

1、创建聊天编排chatflow,指定应用的名称。

新增时间插件

2、点击默认的流程中开始和LLM节点的加号 “+”新增选择工具里的“时间”插件。

选择时间里的获取当前时间。

点击“获取当前日期”节点,修改时间格式和时区分别为:

%Y-%m-%d
亚洲/上海

配置LLM

如果当前Dify没有配置“模型供应商”,需点击右上角用户,然后点击“设置”

选中模型供应商,点击添加模型,

输入模型名称 qwen3:8b,基础URL http://host.docker.internal:11434,

:基础URL可改为实际地址,当前环境是Ollama装在windows上,dify在WSL里的docker镜像内。

后点击保存。

如果LLM已经配置好,可以从模型下拉框里选择配置的大模型。如qwen2.5-coder:latest

配置System提示词

详细内容见下:

## 角色

你是一位精通MySQL数据库SQL查询语句的专家。

## 任务

根据提供的数据库的表结构,将用户输入的内容转换为MySQL数据库的SQL查询语句,函数用Mysql里的函数。

## 数据库的表结构

商品表结构如下:

CREATE TABLE t_product (

id INT PRIMARY KEY AUTO_INCREMENT COMMENT '商品ID',

name VARCHAR(50) NOT NULL COMMENT '商品名称',

unit VARCHAR(10) NOT NULL COMMENT '单位'

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品表';

仓库表结构如下:

CREATE TABLE t_warehouse (

id INT PRIMARY KEY AUTO_INCREMENT COMMENT '仓库ID',

name VARCHAR(50) NOT NULL COMMENT '仓库名称'

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仓库表';

库存表结构如下:

CREATE TABLE t_inventory (

id INT PRIMARY KEY AUTO_INCREMENT COMMENT '记录ID',

product_id INT NOT NULL COMMENT '商品ID,关联product_id表的id',

product_name VARCHAR(50) NOT NULL COMMENT '商品名称(冗余)',

warehouse_id INT NOT NULL COMMENT '仓库ID,关联t_warehouse表的id',

quantity INT NOT NULL DEFAULT 0 COMMENT '库存数量',

FOREIGN KEY (product_id) REFERENCES t_product(id),

FOREIGN KEY (warehouse_id) REFERENCES t_warehouse(id),

UNIQUE KEY (product_id, warehouse_id) COMMENT '防止重复记录'

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存表';

入库记录表结构如下:查询时,要加is_deleted = 0

CREATE TABLE t_stock_in (

id INT PRIMARY KEY AUTO_INCREMENT COMMENT '记录ID',

product_id INT NOT NULL COMMENT '商品ID,关联product_id表的id',

product_name VARCHAR(50) NOT NULL COMMENT '商品名称(冗余)',

warehouse_id INT NOT NULL COMMENT '仓库ID,关联t_warehouse表的id',

quantity INT NOT NULL COMMENT '入库数量',

operator VARCHAR(20) COMMENT '操作人',

batch_no VARCHAR(30) COMMENT '批次号',

create_time datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT '入库时间',

is_deleted TINYINT(1) DEFAULT 0 COMMENT '删除标记:0-正常 1-已删除',

FOREIGN KEY (product_id) REFERENCES t_product(id),

FOREIGN KEY (warehouse_id) REFERENCES t_warehouse(id)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='入库记录表';

## 系统参数

当前时间:

## 要求

1. 需要严格按照数据库的表结构来生成。

2. 将生成的SQL语句封装到一个JSON数组中,格式如下:

``` {

"sql": "SELECT product_id FROM t_inventory"

}

```

3. 确保SQL查询语法符合‌PostgreSQL语法。

4. 不返回思考过程和中间结果,给出最终的一个SQL

点击LLM节点右侧的“+”新增节点,这里选择“代码执行”,重命名节点为SQL提取。

配置“输入变量”为上一步LLM的输出text变量。

编写SQL提取代码

SQL提取的python代码,详见:

from typing import Dict, Any
import json
import redef main(arg1: str) -> Dict[str, Any]:"""从JSON字符串中提取SQL语句并返回结构化字典参数:arg1: 包含SQL语句的输入字符串返回:包含以下可能键的字典:- result: 提取到的SQL语句(可能为None)- status: 执行状态(success/error)- error: 错误描述(仅status为error时存在)- raw_extract: 原始提取内容(调试用)"""response = {"result": None}try:# 尝试解析外层JSONtry:data = json.loads(arg1)except json.JSONDecodeError:data = None# 优先从结构化数据中查找if isinstance(data, dict):# 从text字段的代码块中提取if 'text' in data:code_blocks = re.findall(r'```json\n(.*?)\n```',data['text'],re.DOTALL)for block in code_blocks:try:inner_data = json.loads(block.strip())if isinstance(inner_data, dict) and 'sql' in inner_data:response.update({"result": inner_data['sql']})return responseexcept json.JSONDecodeError:continue# 直接检查sql字段if 'sql' in data:response.update({"result": data['sql']})return response# 兜底方案:原始字符串正则匹配sql_pattern = r'"sql"\s*:\s*"((?:\\"|[^"])*)"'match = re.search(sql_pattern, arg1, re.DOTALL)if match:# 处理转义字符raw_sql = match.group(1).replace('\\"', '"')response.update({"result": raw_sql,})return responsereturn responseexcept Exception as e:response.update({"error": f"Processing error: {str(e)}","raw_extract": arg1[:100] + "..." if len(arg1) > 100 else arg1})return response

定义输出变量为result

新增HTTP请求

该请求是执行数据库用,点击SQL提取节点右侧“+”号,新增工具“HTTP请求”

配置请求类型、URL地址和参数
这里如POST

URL:http://172.20.10.10:5000/execute

参数输入/并选择SQL提取里的result:

{

"sql": "/"

}

新增直接回复

http请求节点后边点“+”新增直接回复节点

完整流程

附录

from flask import Flask, request, jsonify
import re
import logging
from logging.handlers import RotatingFileHandler
import configparser
import os
from datetime import datetime, dateapp = Flask(__name__)# 定义错误代码(数值类型)
SUCCESS = 0
MISSING_REQUEST = 1001
EMPTY_SQL = 1002
INVALID_QUERY_TYPE = 1003
DANGEROUS_SQL = 1004
EXECUTION_ERROR = 1005
ENDPOINT_NOT_FOUND = 1006
INTERNAL_ERROR = 1007
DB_CONFIG_NOT_FOUND = 1008
UNSUPPORTED_DB_TYPE = 1009  # 新增:不支持的数据库类型错误# 配置日志系统
log_handler = RotatingFileHandler('sql_service.log', maxBytes=1000000, backupCount=5)
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_handler.setFormatter(log_formatter)
app.logger.addHandler(log_handler)
app.logger.setLevel(logging.INFO)# 读取数据库配置
config = configparser.ConfigParser()
config_file = 'config.ini'
if not os.path.exists(config_file):app.logger.error(f"Configuration file {config_file} not found")raise FileNotFoundError(f"Configuration file {config_file} not found")config.read(config_file,encoding='utf-8')# 存储所有数据库配置的字典
DB_CONFIGS = {}
# 默认配置名
DEFAULT_CONFIG = 'postgres'# 读取所有数据库配置节
for section in config.sections():try:db_type = config.get(section, 'db_type', fallback='postgres').lower()config_data = {'db_type': db_type,'host': config.get(section, 'host'),'user': config.get(section, 'user'),'password': config.get(section, 'password'),'database': config.get(section, 'database'),'port': config.get(section, 'port', fallback=''),}# 为不同数据库类型设置默认端口if not config_data['port']:if db_type == 'postgres':config_data['port'] = '5432'elif db_type == 'mysql':config_data['port'] = '3306'DB_CONFIGS[section] = config_dataapp.logger.info(f"Database configuration '{section}' (Type: {db_type}) loaded successfully")except (configparser.NoOptionError, configparser.NoSectionError) as e:app.logger.error(f"Error in section {section}: {str(e)}")# 检查至少有一个有效配置
if not DB_CONFIGS:app.logger.error("No valid database configurations found")raise RuntimeError("No valid database configurations found")def is_select_query(sql):"""检查是否为SELECT查询语句"""# 移除注释(单行和多行)cleaned_sql = re.sub(r'--.*?$|/\*.*?\*/', '', sql, flags=re.DOTALL | re.MULTILINE).strip()# 检查是否以SELECT或WITH开头return cleaned_sql.lower().startswith(('select', 'with'))def validate_sql(sql):"""基础SQL验证(防止非查询操作)"""forbidden_keywords = ['insert', 'update', 'delete', 'drop', 'alter', 'create','truncate', 'grant', 'revoke', 'commit', 'rollback',# PostgreSQL 危险函数'pg_sleep', 'pg_read_file', 'pg_write_file', 'dblink',# MySQL 危险函数'sleep', 'load_file', 'into outfile', 'into dumpfile','master.', 'slave.', 'sys_exec', 'sys_eval']pattern = r'\b(' + '|'.join(forbidden_keywords) + r')\b'return not re.search(pattern, sql.lower(), re.IGNORECASE)def execute_query(sql, db_config_name):"""执行SQL查询并返回结果,自动格式化日期时间类型"""# 获取数据库配置db_config = DB_CONFIGS.get(db_config_name)if not db_config:return None, f"Database configuration '{db_config_name}' not found"db_type = db_config['db_type']try:if db_type == 'postgres':import psycopg2conn = psycopg2.connect(host=db_config['host'],user=db_config['user'],password=db_config['password'],dbname=db_config['database'],port=db_config['port'])conn.set_session(readonly=True)elif db_type == 'mysql':import mysql.connectorfrom mysql.connector import Errorconn = mysql.connector.connect(host=db_config['host'],user=db_config['user'],password=db_config['password'],database=db_config['database'],port=db_config['port'])# MySQL设置只读模式cursor = conn.cursor()cursor.execute("SET SESSION TRANSACTION READ ONLY")cursor.close()else:return None, f"Unsupported database type: {db_type}"cursor = conn.cursor()cursor.execute(sql)# 获取列名columns = [col[0] for col in cursor.description]# 处理结果并格式化日期字段results = []for row in cursor.fetchall():row_dict = {}for i, (col_name, value) in enumerate(zip(columns, row)):if isinstance(value, datetime):row_dict[col_name] = value.strftime('%Y-%m-%d %H:%M:%S')elif isinstance(value, date):row_dict[col_name] = value.strftime('%Y-%m-%d')else:# 处理MySQL的DECIMAL类型if hasattr(value, '__float__'):row_dict[col_name] = float(value)else:row_dict[col_name] = valueresults.append(row_dict)cursor.close()conn.close()return results, Noneexcept ImportError as e:error_msg = f"Database driver not installed for {db_type}: {str(e)}"app.logger.error(error_msg)return None, error_msgexcept Exception as e:# 捕获特定数据库错误if db_type == 'postgres':import psycopg2if isinstance(e, psycopg2.Error):return None, f"PostgreSQL error: {e.pgerror}"elif db_type == 'mysql':import mysql.connectorif isinstance(e, mysql.connector.Error):return None, f"MySQL error: {e.msg}"return None, f"Database error: {str(e)}"@app.route('/execute', methods=['POST'])
def execute_sql():"""执行SQL的API端点"""data = request.get_json()if not data:app.logger.error("Empty request received")return jsonify({"success": False,"code": MISSING_REQUEST,"message": "Request body must be JSON","data": None}), 400sql = data.get('sql', '').strip()# 获取数据库配置名,默认为 'postgres'db_config_name = data.get('db_config', DEFAULT_CONFIG)# 验证输入if not sql:app.logger.error("Empty SQL statement received")return jsonify({"success": False,"code": EMPTY_SQL,"message": "SQL parameter is required","data": None}), 400# 检查数据库配置是否存在if db_config_name not in DB_CONFIGS:app.logger.error(f"Database configuration '{db_config_name}' not found")return jsonify({"success": False,"code": DB_CONFIG_NOT_FOUND,"message": f"Database configuration '{db_config_name}' not found","available_configs": list(DB_CONFIGS.keys())}), 400# 检查是否为SELECT查询if not is_select_query(sql):app.logger.warning(f"Non-SELECT query attempted: {sql}")return jsonify({"success": False,"code": INVALID_QUERY_TYPE,"message": "Only SELECT queries are allowed","sql_sample": sql[:100] + "..." if len(sql) > 100 else sql}), 400# 验证SQL安全性if not validate_sql(sql):app.logger.warning(f"Potential dangerous SQL detected: {sql}")return jsonify({"success": False,"code": DANGEROUS_SQL,"message": "SQL contains forbidden keywords","sql_sample": sql[:100] + "..." if len(sql) > 100 else sql}), 400# 执行查询results, error = execute_query(sql, db_config_name)if error:app.logger.error(f"SQL execution failed with config '{db_config_name}': {sql} | Error: {error}")return jsonify({"success": False,"code": EXECUTION_ERROR,"message": "SQL execution failed","details": error,"sql_sample": sql[:100] + "..." if len(sql) > 100 else sql,"db_config": db_config_name}), 400app.logger.info(f"SQL executed successfully with config '{db_config_name}': {sql}")# 构建标准化响应response_data = {"success": True,"code": SUCCESS,"message": "Query executed successfully","db_config": db_config_name,"db_type": DB_CONFIGS[db_config_name]['db_type'],"data": {"count": len(results),"results": results}}# 添加列名信息(如果有结果)if results:response_data["data"]["columns"] = list(results[0].keys())else:response_data["data"]["columns"] = []response_data["message"] = "Query executed successfully but returned no results"return jsonify(response_data), 200@app.errorhandler(404)
def not_found(error):app.logger.warning(f"Endpoint not found: {request.path}")return jsonify({"success": False,"code": ENDPOINT_NOT_FOUND,"message": "Endpoint not found","requested_path": request.path}), 404@app.errorhandler(500)
def internal_error(error):app.logger.error(f"Internal server error: {error}")return jsonify({"success": False,"code": INTERNAL_ERROR,"message": "Internal server error","details": str(error)}), 500if __name__ == '__main__':app.run(host='0.0.0.0', port=5000, debug=False)

注:建表语句及提示词参考:https://blog.csdn.net/beilingcc/article/details/147162349

http://www.dtcms.com/wzjs/98892.html

相关文章:

  • 怎样维护公司网站网络营销策划书案例
  • 一起做彩票网站的人免费的自助建站
  • 无做弊的棋牌游戏网站随州今日头条新闻
  • 宝鸡网站建设软文代发价格
  • 佛山网站建设外包公司页面seo是什么意思
  • wordpress 文件列表杭州排名优化公司
  • wordpress信息收集表单制作seo是什么级别
  • 阿里服务器怎么做网站服务器吗新闻摘抄2022最新5篇
  • .net网站开发实站上海最新发布最新
  • 做执法设备有哪些网站凡科建站登录官网
  • 米拓cms可以做企业网站吗陕西seo排名
  • 自适应企业建站企业网站没有友情链接
  • 冷水滩做微信网站武汉seo结算
  • 网页版微信二维码扫描网站运营推广选择乐云seo
  • 手机版scratch下载保定seo排名外包
  • 网上手机商城网站建设怎么开通网站平台
  • 北京外贸网站建设如何建立网站平台
  • 提高网站知名度种子搜索神器 bt 下载
  • 营销型网站建设的指导原则简阳seo排名优化课程
  • 网站建设网站制作价格销售成功案例分享
  • 池州商城网站开发电脑系统优化工具
  • 建设交通职业技术学院招聘信息网站微信怎么推广自己的产品
  • 做搜索引擎的网站搜索引擎营销例子
  • 文本编辑器做网站百度seo排名优化软件分类
  • 网站模板大小夫唯老师seo
  • 网站内容管理html简单网页成品
  • 武汉市做网站最有效的推广方式
  • wordpress怎么放验证文件seo快速优化技术
  • 做纯静态网站怎么样域名注册时间查询
  • wordpress 宁皓济南网站推广优化