20250409 - Big Data - Python Data Processing / API Platform (DFF)
1. Background
AI agents (workflows / multi-agent setups) depend on a large set of tools to complete compute-intensive tasks with deterministic results efficiently. The well-known notebook/JupyterLab stack is a decent choice for this.
DataFlux-Func is a capable, open-source, all-in-one platform for developing and deploying Python online. It provides production-grade features such as Python microservice development and deployment, scheduled task execution, elastic worker scaling, authentication, and virtual paths, so it can serve as an API platform / data processing platform for small-scale production applications.
GitHub repository:
https://github.com/GuanceCloud/dataflux-func.git
2. Installation (docker-compose)
Reference documentation:
https://func.guance.com/#/doc-index
The project provides an automated installer (based on Docker Swarm): just run the bash commands from the documentation and installation completes automatically. A Helm chart is also provided for Kubernetes (production-grade) installs.
Since I needed to run it on Windows, this article uses a docker compose installation instead.
2.1 Download the images (pick a working directory first)
./download-portable.sh --arch=x86_64
This downloads several images.
The docker-compose.yaml file content is as follows:
* (Mini setup with a single worker; to scale out workers, set the scale parameter to a value > 1.)
* Please change the host shared directories, passwords, ports, etc. to your own values.
version: '3.8'

services:
  # MYSQL START
  mysql:
    image: pubrepo.jiagouyun.com/dataflux-func/mysql:5.7.26
    labels:
      - mysql
    logging:
      driver: json-file
      options:
        max-size: "1m"
        max-file: "10"
    networks:
      - datafluxfunc
    volumes:
      - "D:/dockerDFFVol/mysql:/var/lib/mysql"
    environment:
      MYSQL_ROOT_PASSWORD: 2fd9179461546968
      MYSQL_DATABASE: dataflux_func
    command: --tls-version=TLSv1.2 --innodb-large-prefix=on --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --performance-schema=off --table-open-cache=400 --default-authentication-plugin=mysql_native_password
  # MYSQL END

  # REDIS START
  redis:
    image: pubrepo.jiagouyun.com/dataflux-func/redis:5.0.7
    labels:
      - redis
    logging:
      driver: json-file
      options:
        max-size: "1m"
        max-file: "10"
    networks:
      - datafluxfunc
    volumes:
      - "D:/dockerDFFVol/redis:/data"
    command: --stop-writes-on-bgsave-error yes
  # REDIS END

  # WORKER START
  worker:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - worker
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: "10m"
        max-file: "10"
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0  # changed from the Swarm-specific variable reference
    command: ./run-worker.sh
    scale: 1  # replaces deploy.replicas
  # WORKER END

  beat:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - beat
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: "1m"
        max-file: "10"
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0  # changed from the Swarm-specific variable reference
      redis_host: redis
    command: ./run-beat.sh

  server:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - server
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: "10m"
        max-file: "10"
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0  # changed from the Swarm-specific variable reference
    ports:
      - "9000:8088"
    command: ./run-server.sh

networks:
  datafluxfunc:
    driver: bridge  # replaces the overlay driver
2.2 Import the images, start the services, and initialize
docker load < dataflux-func.tar.gz
docker load < mysql.tar.gz
docker load < redis.tar.gz
After importing:
Start the stack:
docker-compose -p dff_6_2_3 up -d
Open http://localhost:9000; a configuration/initialization page appears.
* To complete the configuration, create a user-config.yaml file with the content below.
SECRET: 4xKQ1iaLwMnC72Jn
MYSQL_HOST: mysql
MYSQL_PORT: 3306
MYSQL_USER: root
MYSQL_PASSWORD: 2fd9179461546968  # must match MYSQL_ROOT_PASSWORD in docker-compose.yaml
MYSQL_DATABASE: dataflux_func
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_DATABASE: 5
REDIS_PASSWORD: ''
REDIS_USE_TLS: false
LOG_LEVEL: WARNING
LOG_FILE_FORMAT: text
WEB_BASE_URL: ''
DB_ENGINE: mysql
_IS_INSTALLED: true
Place the file in the shared directory on the host (the one mounted to /data):
Restart the stack (for example, docker-compose -p dff_6_2_3 restart), then open http://localhost:9000 again. Click the initialize button and the system completes initialization automatically.
2.3 Log in
(Default credentials: admin/admin)
3. Online microservice development/deployment
Scripts are created, edited, and debugged online. Syntax highlighting is good; the experience is on par with a notebook.
When the script works, click Publish (here it is even better than a notebook: the published version is kept separate from the editing version, so further edits do not affect the live service).
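As a quick illustration, a minimal script might look like the sketch below. The function name and arguments are placeholders of my own; the @DFF.API decorator usage mirrors the example in the appendix.
# Minimal DFF function sketch: an addition endpoint (names are illustrative).
@DFF.API('Add two numbers', category='demo', tags=['demo', 'simple'])
def plus(x, y):
    '''
    Add two numbers.
    x and y are numbers; the result is their sum.
    '''
    return float(x) + float(y)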
On the management page, go to Sync API and create a public microservice API; just pick the function and you are done.
For usage, the system provides ready-made call examples (both POST and GET); copy one, adjust the parameters, and run it. (In production this also works as a ping to verify service health.)
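For reference, a call from Python might look roughly like the sketch below. The URL is a placeholder (copy the real address from the Sync API page), and the "kwargs" envelope follows the call examples the platform shows there; verify against those examples.
import requests

# Placeholder URL: use the address shown on the Sync API page instead.
url = 'http://localhost:9000/api/v1/sync/xxxxxxxx'
# Function arguments are passed inside a "kwargs" object.
resp = requests.post(url, json={'kwargs': {'x': 5, 'y': 3}}, timeout=10)
print(resp.status_code, resp.text)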
4. Calling the tool from an agent
Create a tool in Dify:
I usually ask a large model such as DeepSeek to generate the schema; many frameworks also provide tools that turn code into such definitions:
{
"openapi": "3.1.0",
"info": {
"title": "Addition Calculator",
"description": "A simple tool that adds two numbers together.",
"version": "v1.0.0"
},
"servers": [
{
"url": "http://host.docker.internal:5000"
}
],
"paths": {
"/plus": {
"post": {
"description": "Adds two numbers together and returns the sum",
"operationId": "AddNumbers",
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"x": {
"type": "number",
"description": "First number to add",
"example": 5
},
"y": {
"type": "number",
"description": "Second number to add",
"example": 3
}
},
"required": ["x", "y"]
}
}
}
},
"responses": {
"200": {
"description": "Successful addition",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"result": {
"type": "number",
"description": "The sum of x and y"
}
}
}
}
}
},
"400": {
"description": "Bad request",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"error": {
"type": "string",
"example": "Missing fields or invalid numbers"
}
}
}
}
}
},
"500": {
"description": "Calculation error",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"error": {
"type": "string",
"example": "Error during calculation"
}
}
}
}
}
}
},
"deprecated": false
}
}
},
"components": {
"schemas": {
"AdditionRequest": {
"type": "object",
"properties": {
"x": {
"type": "number",
"description": "First number to add"
},
"y": {
"type": "number",
"description": "Second number to add"
}
},
"required": ["x", "y"]
},
"AdditionResponse": {
"type": "object",
"properties": {
"result": {
"type": "number",
"description": "The sum of x and y"
}
}
},
"AdditionError": {
"type": "object",
"properties": {
"error": {
"type": "string",
"description": "Error message"
}
}
}
}
}
}
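Before registering the tool in Dify, it is worth confirming that the address in servers.url really behaves as the schema describes. In the stack above the DFF server is published on port 9000, so adjust the URL and path to the address of your actual published API. A minimal check, assuming the endpoint is reachable as written in the schema, could look like this:
import requests

# Exercise the endpoint declared in the schema above; the URL and path are
# whatever your published API actually uses.
resp = requests.post('http://host.docker.internal:5000/plus',
                     json={'x': 5, 'y': 3}, timeout=10)
print(resp.status_code, resp.text)  # expect 200 and {"result": 8}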
5. Data development platform
You can install whatever libraries you need:
Have a large model write a tool method for a database table sync task:
* (The generated method is quite capable and needed no changes; during the experiment I had only misremembered a password and changed just that. The code is in the appendix.)
* More tools can be written the same way: cross-database SQL-to-table loaders, same-database select/insert helpers, and so on (a rough sketch follows below).
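For instance, a same-database select/insert helper could be sketched roughly as follows. The connection settings, names, and SQL handling here are illustrative only and would need hardening for real use.
import mysql.connector

@DFF.API('Copy rows via SQL', category='bigdata', tags=['bigdata', 'simple'])
def copy_rows(select_sql, target_table):
    '''
    Run select_sql and insert the resulting rows into target_table
    in the same database. Connection settings are placeholders.
    '''
    conn = mysql.connector.connect(
        host='host.docker.internal', port=33306,
        user='root', password='CHANGE_ME', database='e02')
    cur = conn.cursor()
    cur.execute(select_sql)
    rows = cur.fetchall()
    if rows:
        # Assumes the SELECT column order matches the target table definition.
        placeholders = ','.join(['%s'] * len(rows[0]))
        cur.executemany(f"INSERT INTO {target_table} VALUES ({placeholders})", rows)
        conn.commit()
    conn.close()
    return len(rows)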
Configure a scheduled job (a few clicks...)
Verify:
First drop the target table.
Wait one minute:
Check the logs:
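If you prefer checking from code rather than a GUI client, a quick query against the target database also confirms the result. The settings below mirror target_config in the appendix; the password is a placeholder.
import mysql.connector

# Confirm that the scheduled sync recreated the table in the target database.
conn = mysql.connector.connect(
    host='host.docker.internal', port=33306,
    user='root', password='CHANGE_ME', database='e02')
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM region")
print('rows in e02.region:', cur.fetchone()[0])
conn.close()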
Appendix
Data sync code reference
The MySQLSync utility class was written entirely by a large model and ran without problems.
To turn it into a scheduled task, you only need to write a DFF function that calls the utility class's method.
import mysql.connector
from mysql.connector import errorcode
import time
from threading import Lock


class MySQLSync:
    def __init__(self, source_config, target_config):
        self.source_config = source_config
        self.target_config = target_config
        self.lock = Lock()

    def _get_connection(self, config):
        """Create a pooled database connection"""
        try:
            return mysql.connector.connect(
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                pool_name=f"{config['database']}_pool",
                pool_size=3
            )
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Invalid username or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(f"Connection error: {err}")
            raise
    def full_sync(self, table_name):
        """Fully synchronize one table's data"""
        source_conn = None
        target_conn = None
        try:
            with self.lock:
                source_conn = self._get_connection(self.source_config)
                target_conn = self._get_connection(self.target_config)
                source_cursor = source_conn.cursor(dictionary=True)
                target_cursor = target_conn.cursor()
                # Fetch the source table definition
                source_cursor.execute(f"SHOW CREATE TABLE {table_name}")
                create_table_sql = source_cursor.fetchone()['Create Table']
                # Recreate the table in the target database
                target_cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
                target_cursor.execute(create_table_sql)
                # Page through the source data
                page_size = 1000
                offset = 0
                total_rows = 0
                while True:
                    source_cursor.execute(f"SELECT * FROM {table_name} LIMIT {offset}, {page_size}")
                    rows = source_cursor.fetchall()
                    if not rows:
                        break
                    # Build the batch insert statement
                    columns = list(rows[0].keys())
                    placeholders = ['%s'] * len(columns)
                    insert_sql = f"""
                        INSERT INTO {table_name} ({','.join(columns)})
                        VALUES ({','.join(placeholders)})
                    """
                    # Execute the batch insert
                    target_cursor.executemany(
                        insert_sql,
                        [tuple(row.values()) for row in rows]
                    )
                    total_rows += len(rows)
                    offset += page_size
                    print(f"Synced {total_rows} rows so far...")
                target_conn.commit()
                print(f"Full sync complete: {total_rows} rows synced")
        except Exception as e:
            print(f"Sync failed: {str(e)}")
            if target_conn:
                target_conn.rollback()
        finally:
            for conn in [source_conn, target_conn]:
                if conn and conn.is_connected():
                    conn.close()
    def incremental_sync(self, table_name):
        """Incremental sync (binlog-based, via Canal)"""
        try:
            # Use the canal-python client to listen to the binlog
            from canal.client import Client
            from canal.protocol import EntryProtocol_pb2

            client = Client()
            client.connect(
                host=self.source_config['host'],
                port=11111  # Canal server port
            )
            client.check_valid(
                username=self.source_config['user'].encode(),
                password=self.source_config['password'].encode()
            )
            client.subscribe(
                client_id=b'1001',
                destination=b'example',
                filter=bytes(f'{self.source_config["database"]}\\.{table_name}', 'utf-8')
            )
            print(f"Listening for changes on table {table_name}...")
            while True:
                message = client.get(100)
                entries = message['entries']
                for entry in entries:
                    if entry.entryType in [
                        EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                        EntryProtocol_pb2.EntryType.TRANSACTIONEND
                    ]:
                        continue
                    row_change = EntryProtocol_pb2.RowChange()
                    row_change.MergeFromString(entry.storeValue)
                    event_type = row_change.eventType
                    header = entry.header
                    with self.lock:
                        target_conn = self._get_connection(self.target_config)
                        target_cursor = target_conn.cursor()
                        try:
                            for row in row_change.rowDatas:
                                if event_type == EntryProtocol_pb2.EventType.INSERT:
                                    columns = [col.name for col in row.afterColumns]
                                    values = [col.value for col in row.afterColumns]
                                    placeholders = ['%s'] * len(columns)
                                    insert_sql = f"""
                                        INSERT INTO {table_name} ({','.join(columns)})
                                        VALUES ({','.join(placeholders)})
                                    """
                                    target_cursor.execute(insert_sql, values)
                                elif event_type == EntryProtocol_pb2.EventType.UPDATE:
                                    set_clause = []
                                    where_clause = []
                                    params = []
                                    for col in row.afterColumns:
                                        set_clause.append(f"{col.name}=%s")
                                        params.append(col.value)
                                        if col.updated:
                                            where_clause.append(f"{col.name}=%s")
                                            params.append(row.beforeColumns[int(col.index)].value)
                                    update_sql = f"""
                                        UPDATE {table_name}
                                        SET {','.join(set_clause)}
                                        WHERE {' AND '.join(where_clause)}
                                    """
                                    target_cursor.execute(update_sql, params)
                                elif event_type == EntryProtocol_pb2.EventType.DELETE:
                                    where_clause = []
                                    params = []
                                    for col in row.beforeColumns:
                                        where_clause.append(f"{col.name}=%s")
                                        params.append(col.value)
                                    delete_sql = f"""
                                        DELETE FROM {table_name}
                                        WHERE {' AND '.join(where_clause)}
                                    """
                                    target_cursor.execute(delete_sql, params)
                            target_conn.commit()
                            print(f"Processed {len(row_change.rowDatas)} change records")
                        except Exception as e:
                            target_conn.rollback()
                            print(f"Failed to apply change: {str(e)}")
                        finally:
                            if target_conn and target_conn.is_connected():
                                target_conn.close()
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("Stopped listening")
        except Exception as e:
            print(f"Incremental sync error: {str(e)}")
@DFF.API('Sync table task', category='bigdata', tags=['bigdata', 'simple'], cache_result=300, timeout=3600)
def syncTable(tbl_name):
    '''
    Synchronize one table from the source database to the target database.
    tbl_name is the name of the table to fully sync.
    '''
    # Source and target database configuration
    source_config = {
        'host': 'host.docker.internal',
        'port': 33306,
        'user': 'readonly_user',
        'password': 'CHANGE_ME',
        'database': 'e01'
    }
    target_config = {
        'host': 'host.docker.internal',
        'port': 33306,
        'user': 'root',
        'password': 'CHANGE_ME',
        'database': 'e02'
    }
    sync = MySQLSync(source_config, target_config)
    # Run a full sync
    sync.full_sync(tbl_name)


# Test functions do not need the decorator
def test_sync_table():
    syncTable('region')
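Note that incremental_sync is not wired into the scheduled task above: it depends on the optional canal-python package and a Canal server listening on port 11111 of the source host, and it blocks while tailing the binlog. A rough sketch of starting it manually, reusing the same configs as in syncTable, might be:
# Hypothetical manual start of the binlog-based incremental sync.
sync = MySQLSync(source_config, target_config)  # same configs as in syncTable
sync.incremental_sync('region')                 # blocks, applying changes to the target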