1. Background
AI agents (workflows / multi-agent systems) all rely on a large set of tools to efficiently complete compute-intensive tasks with deterministic results. The well-known notebook/JupyterLab stack is a decent choice.
There is also a capable, open-source, all-in-one platform for online Python development and deployment: DataFlux-Func. It offers production-grade features such as Python microservice development/deployment, scheduled task execution, elastic scaling, authentication, and virtual paths, so it can serve as an API platform or data-processing platform for small-scale production applications.
GitHub repository:
https://github.com/GuanceCloud/dataflux-func.git
2. Installation (docker-compose)
Reference documentation:
https://func.guance.com/#/doc-index
The project provides an automated installer (based on Docker Swarm): just run the bash commands from the docs and installation happens automatically. A Helm chart is also provided for production-grade Kubernetes installs.
Since I need to install under Windows, here is a docker compose based method.
2.1 Download the images (pick a working directory first)
./download-portable.sh --arch=x86_64
This downloads several image archives (used in the next step): dataflux-func.tar.gz, mysql.tar.gz, redis.tar.gz.
The docker-compose.yaml file contents are as follows:
* (Mini setup: there is only 1 worker; to scale out workers, set the scale parameter > 1.)
* Adjust to taste: the host shared directories, passwords, ports, and so on.
version: '3.8'

services:
  # MYSQL START
  mysql:
    image: pubrepo.jiagouyun.com/dataflux-func/mysql:5.7.26
    labels:
      - mysql
    logging:
      driver: json-file
      options:
        max-size: 1m
        max-file: 10
    networks:
      - datafluxfunc
    volumes:
      - "D:/dockerDFFVol/mysql:/var/lib/mysql"
    environment:
      MYSQL_ROOT_PASSWORD: 2fd9179461546968
      MYSQL_DATABASE: dataflux_func
    command: --tls-version=TLSv1.2 --innodb-large-prefix=on --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --performance-schema=off --table-open-cache=400 --default-authentication-plugin=mysql_native_password
  # MYSQL END

  # REDIS START
  redis:
    image: pubrepo.jiagouyun.com/dataflux-func/redis:5.0.7
    labels:
      - redis
    logging:
      driver: json-file
      options:
        max-size: 1m
        max-file: 10
    networks:
      - datafluxfunc
    volumes:
      - "D:/dockerDFFVol/redis:/data"
    command: --stop-writes-on-bgsave-error yes
  # REDIS END

  # WORKER START
  worker:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - worker
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: 10m
        max-file: 10
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0  # replaces the Swarm-specific variable reference
    command: ./run-worker.sh
    scale: 1  # replaces deploy.replicas
  # WORKER END

  beat:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - beat
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: 1m
        max-file: 10
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0  # replaces the Swarm-specific variable reference
      redis_host: redis
    command: ./run-beat.sh

  server:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - server
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: 10m
        max-file: 10
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0  # replaces the Swarm-specific variable reference
    ports:
      - "9000:8088"
    command: ./run-server.sh

networks:
  datafluxfunc:
    driver: bridge  # replaces the overlay driver
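Before starting, it helps to create the host bind-mount directories referenced above. Docker Desktop usually creates missing directories itself, but doing it up front avoids surprises; the paths below are the ones used in the compose file:

mkdir D:\dockerDFFVol\mysql
mkdir D:\dockerDFFVol\redis
mkdir D:\dockerDFFVol\data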
2.2 Import the images, start the services, and initialize
docker load < dataflux-func.tar.gz
docker load < mysql.tar.gz
docker load < redis.tar.gz
After importing:
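You can confirm the three images landed with docker images; the repository/tag pairs in the comment below are the ones referenced in the compose file:

docker images
# expect, among others:
#   pubrepo.jiagouyun.com/dataflux-func/dataflux-func   6.2.3
#   pubrepo.jiagouyun.com/dataflux-func/mysql           5.7.26
#   pubrepo.jiagouyun.com/dataflux-func/redis           5.0.7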
Start the stack:
docker-compose -p dff_6_2_3 up -d
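To confirm that all five services (mysql, redis, worker, beat, server) came up, and to tail the server log if anything looks off:

docker-compose -p dff_6_2_3 ps
docker-compose -p dff_6_2_3 logs -f server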
Visit http://localhost:9000; a configuration initialization page appears.
* Fill in the remaining configuration here by creating a user-config.yaml file as follows.
SECRET: 4xKQ1iaLwMnC72Jn
MYSQL_HOST: mysql
MYSQL_PORT: 3306
MYSQL_USER: root
MYSQL_PASSWORD: 2fd9179461546968  # must match MYSQL_ROOT_PASSWORD in docker-compose.yaml
MYSQL_DATABASE: dataflux_func
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_DATABASE: 5
REDIS_PASSWORD: ''
REDIS_USE_TLS: false
LOG_LEVEL: WARNING
LOG_FILE_FORMAT: text
WEB_BASE_URL: ''
DB_ENGINE: mysql
_IS_INSTALLED: true
Place it in the host's shared data directory (D:/dockerDFFVol/data in the compose file above), then restart the stack as sketched below. After the restart, visit http://localhost:9000 again and click the initialize button; the system completes initialization automatically.
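A minimal restart, reusing the compose project name from the start command:

docker-compose -p dff_6_2_3 restart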
2.3 First login
(Use the default credentials: admin/admin.)
3. Online microservice development/deployment
Create, edit, and debug scripts online. With syntax highlighting it is pleasant to use, no worse than a notebook.
When the script works, click Publish. (This is even better than a notebook: the published version is separate from the editing version, so further edits do not affect the live service.)
In the management page, use Sync API to create a public microservice API: just select the function.
For usage, the system provides debug examples for both POST and GET; copy one, tweak the parameters, and run it, as in the sketch below. (In production these can double as a ping that verifies service health.)
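As a concrete sketch, a published function like the one below could back the Addition Calculator tool defined in the next section. The function name plus, its parameters, and the returned dict shape are my assumptions, chosen to match that schema; only the @DFF.API decorator is the platform's actual mechanism (the appendix shows a real usage):

@DFF.API('Add two numbers')
def plus(x, y):
    '''Add two numbers; returns a dict shaped like the schema in section 4.'''
    # NOTE: name, parameters, and return shape are illustrative assumptions
    # matching the Addition Calculator OpenAPI schema below.
    return {'result': float(x) + float(y)}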
4. Calling the tool from an agent
Create a tool in Dify:
I usually ask a large model such as DeepSeek to generate the schema; many frameworks also ship code-to-schema conversion tools:
{"openapi": "3.1.0","info": {"title": "Addition Calculator","description": "A simple tool that adds two numbers together.","version": "v1.0.0"},"servers": [{"url": "http://host.docker.internal:5000"}],"paths": {"/plus": {"post": {"description": "Adds two numbers together and returns the sum","operationId": "AddNumbers","requestBody": {"required": true,"content": {"application/json": {"schema": {"type": "object","properties": {"x": {"type": "number","description": "First number to add","example": 5},"y": {"type": "number","description": "Second number to add","example": 3}},"required": ["x", "y"]}}}},"responses": {"200": {"description": "Successful addition","content": {"application/json": {"schema": {"type": "object","properties": {"result": {"type": "number","description": "The sum of x and y"}}}}}},"400": {"description": "Bad request","content": {"application/json": {"schema": {"type": "object","properties": {"error": {"type": "string","example": "Missing fields or invalid numbers"}}}}}},"500": {"description": "Calculation error","content": {"application/json": {"schema": {"type": "object","properties": {"error": {"type": "string","example": "Error during calculation"}}}}}}},"deprecated": false}}},"components": {"schemas": {"AdditionRequest": {"type": "object","properties": {"x": {"type": "number","description": "First number to add"},"y": {"type": "number","description": "Second number to add"}},"required": ["x", "y"]},"AdditionResponse": {"type": "object","properties": {"result": {"type": "number","description": "The sum of x and y"}}},"AdditionError": {"type": "object","properties": {"error": {"type": "string","description": "Error message"}}}}}
}
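Before wiring the tool into Dify, you may want to sanity-check the endpoint by hand. A sketch, assuming the service behind the schema is published on host port 5000 (which is what the http://host.docker.internal:5000 servers entry implies; that hostname only resolves from inside containers, so from the host use localhost):

curl -X POST "http://localhost:5000/plus" \
  -H "Content-Type: application/json" \
  -d '{"x": 5, "y": 3}'
# expected, per the schema: {"result": 8}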
5. Data development platform
You can install whatever libraries you need (the appendix code uses mysql-connector-python):
Ask a large model to write a utility for a database-table sync task:
* (The generated code is quite robust and needed no changes; in my experiment I had only misremembered a password and changed just that. The code is in the appendix.)
* More tools can be written the same way: cross-database SQL-to-table loaders, same-database select/insert helpers, and so on.
Configure a scheduled job (a few clicks in the UI; a crontab expression such as */1 * * * * would run it every minute):
To verify: drop the synced table in the target database, wait about a minute for the next scheduled run, then check the task logs to confirm the sync fired.
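In SQL terms, the check amounts to the sketch below, run against the target database (e02); the region table name comes from the test function in the appendix:

-- Drop the synced table, then wait for the next scheduled run
DROP TABLE region;
-- About a minute later the table should have been recreated and refilled:
SELECT COUNT(*) FROM region;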
Appendix
Data sync code reference
The MySQLSync utility class was written entirely by a large model and ran without problems.
To turn it into a scheduled task, you only need to write a DFF method that calls the utility class.
import mysql.connector
from mysql.connector import errorcode
import time
from threading import Lock


class MySQLSync:
    def __init__(self, source_config, target_config):
        self.source_config = source_config
        self.target_config = target_config
        self.lock = Lock()

    def _get_connection(self, config):
        """Create a pooled database connection."""
        try:
            return mysql.connector.connect(
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                pool_name=f"{config['database']}_pool",
                pool_size=3)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Wrong user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(f"Connection error: {err}")
            raise

    def full_sync(self, table_name):
        """Fully sync one table's data from source to target."""
        source_conn = None
        target_conn = None
        try:
            with self.lock:
                source_conn = self._get_connection(self.source_config)
                target_conn = self._get_connection(self.target_config)
                source_cursor = source_conn.cursor(dictionary=True)
                target_cursor = target_conn.cursor()

                # Fetch the source table structure
                source_cursor.execute(f"SHOW CREATE TABLE {table_name}")
                create_table_sql = source_cursor.fetchone()['Create Table']

                # Recreate the table in the target database
                target_cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
                target_cursor.execute(create_table_sql)

                # Page through the source data
                page_size = 1000
                offset = 0
                total_rows = 0
                while True:
                    source_cursor.execute(
                        f"SELECT * FROM {table_name} LIMIT {offset}, {page_size}")
                    rows = source_cursor.fetchall()
                    if not rows:
                        break

                    # Build the batch insert statement
                    columns = list(rows[0].keys())
                    placeholders = ['%s'] * len(columns)
                    insert_sql = f"""
                        INSERT INTO {table_name} ({','.join(columns)})
                        VALUES ({','.join(placeholders)})
                    """

                    # Execute the batch insert
                    target_cursor.executemany(
                        insert_sql,
                        [tuple(row.values()) for row in rows])
                    total_rows += len(rows)
                    offset += page_size
                    print(f"Synced {total_rows} rows so far...")

                target_conn.commit()
                print(f"Full sync done, {total_rows} rows in total")
        except Exception as e:
            print(f"Sync failed: {str(e)}")
            if target_conn:
                target_conn.rollback()
        finally:
            for conn in [source_conn, target_conn]:
                if conn and conn.is_connected():
                    conn.close()

    def incremental_sync(self, table_name):
        """Incremental sync (binlog-based, via a canal server)."""
        try:
            # Use the canal-python client to listen to the binlog
            from canal.client import Client
            from canal.protocol import EntryProtocol_pb2

            client = Client()
            client.connect(
                host=self.source_config['host'],
                port=11111)  # canal server port
            client.check_valid(
                username=self.source_config['user'].encode(),
                password=self.source_config['password'].encode())
            client.subscribe(
                client_id=b'1001',
                destination=b'example',
                filter=bytes(f'{self.source_config["database"]}\\.{table_name}', 'utf-8'))

            print(f"Listening for changes on table {table_name}...")
            while True:
                message = client.get(100)
                entries = message['entries']
                for entry in entries:
                    if entry.entryType in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                                           EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                        continue
                    row_change = EntryProtocol_pb2.RowChange()
                    row_change.MergeFromString(entry.storeValue)
                    event_type = row_change.eventType
                    header = entry.header

                    with self.lock:
                        target_conn = self._get_connection(self.target_config)
                        target_cursor = target_conn.cursor()
                        try:
                            for row in row_change.rowDatas:
                                if event_type == EntryProtocol_pb2.EventType.INSERT:
                                    columns = [col.name for col in row.afterColumns]
                                    values = [col.value for col in row.afterColumns]
                                    placeholders = ['%s'] * len(columns)
                                    insert_sql = f"""
                                        INSERT INTO {table_name} ({','.join(columns)})
                                        VALUES ({','.join(placeholders)})
                                    """
                                    target_cursor.execute(insert_sql, values)
                                elif event_type == EntryProtocol_pb2.EventType.UPDATE:
                                    set_clause = []
                                    where_clause = []
                                    params = []
                                    for col in row.afterColumns:
                                        set_clause.append(f"{col.name}=%s")
                                        params.append(col.value)
                                        if col.updated:
                                            where_clause.append(f"{col.name}=%s")
                                            params.append(row.beforeColumns[int(col.index)].value)
                                    update_sql = f"""
                                        UPDATE {table_name}
                                        SET {','.join(set_clause)}
                                        WHERE {' AND '.join(where_clause)}
                                    """
                                    target_cursor.execute(update_sql, params)
                                elif event_type == EntryProtocol_pb2.EventType.DELETE:
                                    where_clause = []
                                    params = []
                                    for col in row.beforeColumns:
                                        where_clause.append(f"{col.name}=%s")
                                        params.append(col.value)
                                    delete_sql = f"""
                                        DELETE FROM {table_name}
                                        WHERE {' AND '.join(where_clause)}
                                    """
                                    target_cursor.execute(delete_sql, params)
                            target_conn.commit()
                            print(f"Processed {len(row_change.rowDatas)} change records")
                        except Exception as e:
                            target_conn.rollback()
                            print(f"Failed to apply changes: {str(e)}")
                        finally:
                            if target_conn and target_conn.is_connected():
                                target_conn.close()
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("Stopped listening")
        except Exception as e:
            print(f"Incremental sync error: {str(e)}")


@DFF.API('Sync table task', category='bigdata', tags=['bigdata', 'simple'], cache_result=300, timeout=3600)
def syncTable(tbl_name):
    '''
    Fully sync the given table from the source database to the target database.
    '''
    # Source and target database configuration
    source_config = {
        'host': 'host.docker.internal',
        'port': 33306,
        'user': 'readonly_user',
        'password': 'change-me',
        'database': 'e01'
    }
    target_config = {
        'host': 'host.docker.internal',
        'port': 33306,
        'user': 'root',
        'password': 'change-me',
        'database': 'e02'
    }
    sync = MySQLSync(source_config, target_config)
    # Run the full sync
    sync.full_sync(tbl_name)


# Test function; no decorator needed
def test_plus():
    syncTable('region')