当前位置: 首页 > news >正文

20250409-大数据-python数据处理平台/接口平台(DFF)

1. 背景

AI智能体(工作流/多智能体)都以来大量的工具才能高效完成计算密集及结果确定的任务。知名的notebook/jupyterlab是不错的选择。

这里有个综合能力强且开源的python在线开发部署一体平台DataFlux-Func,同时具有python微服务开发/部署,定时任务调度,可弹性扩容,安全鉴权,虚拟路径等生产级能力,可以作为接口平台/数据处理平台,满足小规模的产品应用需求。开源。

github代码库:

https://github.com/GuanceCloud/dataflux-func.git
 

2.安装(docker-compose)

参考文档:

https://func.guance.com/#/doc-index

官方提供了自动安装(基于docker swarm)只用按文档执行bash命令就自动安装了。同时提供了helm用于k8s安装(生产级)。

因为需要window下安装,这里提供docker compose的安装方法。

2.1 下载镜像(先找个目录)

./download-portable.sh --arch=x86_64

会有下载几个镜像

图片中的docker-compose.yaml, 文件内容如下:

* (mini版,worker只有1个,如果需要扩容worker,可修改scale参数>1)

* 请修改:宿主机的共享目录,密码端口等,改成自己喜欢的。

version: '3.8'

services:
  # MYSQL START
  mysql:
    image: pubrepo.jiagouyun.com/dataflux-func/mysql:5.7.26
    labels:
      - mysql
    logging:
      driver: json-file
      options:
        max-size: 1m
        max-file: 10
    networks:
      - datafluxfunc
    volumes:
      - "D:/dockerDFFVol/mysql:/var/lib/mysql"
    environment:
      MYSQL_ROOT_PASSWORD: 2fd9179461546968
      MYSQL_DATABASE: dataflux_func
    command: --tls-version=TLSv1.2 --innodb-large-prefix=on --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --performance-schema=off --table-open-cache=400 --default-authentication-plugin=mysql_native_password
  # MYSQL END

  # REDIS START
  redis:
    image: pubrepo.jiagouyun.com/dataflux-func/redis:5.0.7
    labels:
      - redis
    logging:
      driver: json-file
      options:
        max-size: 1m
        max-file: 10
    networks:
      - datafluxfunc
    volumes:
      - "D:/dockerDFFVol/redis:/data"
    command: --stop-writes-on-bgsave-error yes
  # REDIS END

  # WORKER START
  worker:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - worker
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: 10m
        max-file: 10
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0 # 修改了Swarm特有的变量引用方式
    command: ./run-worker.sh
    scale: 1  # 替换了deploy.replicas
  # WORKER END

  beat:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - beat
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: 1m
        max-file: 10
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0 # 修改了Swarm特有的变量引用方式
      redis_host: redis
    command: ./run-beat.sh

  server:
    image: pubrepo.jiagouyun.com/dataflux-func/dataflux-func:6.2.3
    labels:
      - server
    volumes:
      - "D:/dockerDFFVol/data:/data"
    logging:
      driver: json-file
      options:
        max-size: 10m
        max-file: 10
    networks:
      - datafluxfunc
    environment:
      HOST_HOSTNAME: 0.0.0.0 # 修改了Swarm特有的变量引用方式
    ports:
      - "9000:8088"
    command: ./run-server.sh

networks:
  datafluxfunc:
    driver: bridge  # 替换了overlay驱动

2.2 导入镜像启动服务初始化

docker load < dataflux-func.tar.gz
docker load < mysql.tar.gz
docker load < redis.tar.gz

导入后:

启动:

docker-compose -p dff_6_2_3 up -d

访问:http://localhost:9000,有个配置初始化页面。

* 这里补齐一些配置,创建use-config.yaml文件,如下。

SECRET: 4xKQ1iaLwMnC72Jn
MYSQL_HOST: mysql
MYSQL_PORT: 3306
MYSQL_USER: root
MYSQL_PASSWORD: root
MYSQL_DATABASE: dataflux_func
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_DATABASE: 5
REDIS_PASSWORD: ''
REDIS_USE_TLS: false
LOG_LEVEL: WARNING
LOG_FILE_FORMAT: text
WEB_BASE_URL: ''
DB_ENGINE: mysql
_IS_INSTALLED: true

放到宿主机的共享目录下:

重启以下,访问:http://localhost:9000。点击初始化按钮,系统自动完成初始化。

2.3 正式登录

(用默认密码:admin/admin)

3. 微服务在线开发/部署

在线创建脚本,编辑,调试。语法高亮,好用,不亚于notebook。

没问题就点发布(这里比notebook还要好,发布后版本与编辑版本分离;编辑修改不影响在线的服务)

管理也去-同步API,创建一个公开的微服务API。选一下就行了。

用示例,系统给了调式的例子(post/get)都有,拷贝修改参数跑下。(生产环境,可以作为ping,验证服务状态)

4. 智能体调用工具

dify创建个工具:

我一般让deepseek等大模型帮忙生成,很多框架也有代码转定义的工具:

{
  "openapi": "3.1.0",
  "info": {
    "title": "Addition Calculator",
    "description": "A simple tool that adds two numbers together.",
    "version": "v1.0.0"
  },
  "servers": [
    {
      "url": "http://host.docker.internal:5000"
    }
  ],
  "paths": {
    "/plus": {
      "post": {
        "description": "Adds two numbers together and returns the sum",
        "operationId": "AddNumbers",
        "requestBody": {
          "required": true,
          "content": {
            "application/json": {
              "schema": {
                "type": "object",
                "properties": {
                  "x": {
                    "type": "number",
                    "description": "First number to add",
                    "example": 5
                  },
                  "y": {
                    "type": "number",
                    "description": "Second number to add",
                    "example": 3
                  }
                },
                "required": ["x", "y"]
              }
            }
          }
        },
        "responses": {
          "200": {
            "description": "Successful addition",
            "content": {
              "application/json": {
                "schema": {
                  "type": "object",
                  "properties": {
                    "result": {
                      "type": "number",
                      "description": "The sum of x and y"
                    }
                  }
                }
              }
            }
          },
          "400": {
            "description": "Bad request",
            "content": {
              "application/json": {
                "schema": {
                  "type": "object",
                  "properties": {
                    "error": {
                      "type": "string",
                      "example": "Missing fields or invalid numbers"
                    }
                  }
                }
              }
            }
          },
          "500": {
            "description": "Calculation error",
            "content": {
              "application/json": {
                "schema": {
                  "type": "object",
                  "properties": {
                    "error": {
                      "type": "string",
                      "example": "Error during calculation"
                    }
                  }
                }
              }
            }
          }
        },
        "deprecated": false
      }
    }
  },
  "components": {
    "schemas": {
      "AdditionRequest": {
        "type": "object",
        "properties": {
          "x": {
            "type": "number",
            "description": "First number to add"
          },
          "y": {
            "type": "number",
            "description": "Second number to add"
          }
        },
        "required": ["x", "y"]
      },
      "AdditionResponse": {
        "type": "object",
        "properties": {
          "result": {
            "type": "number",
            "description": "The sum of x and y"
          }
        }
      },
      "AdditionError": {
        "type": "object",
        "properties": {
          "error": {
            "type": "string",
            "description": "Error message"
          }
        }
      }
    }
  }
}

5. 数据开发平台

可以安装你想要的库:

让大模型开发个同步数据库表任务工具方法:

*(方法写的很强大,而且不需要改,实验中密码记错了只改了密码;代码写在最后)

*  可以写更多跨库SQL到表的工具;同库select insert工具等。

配置一个定时调度(点点点...)

验证:

先删除表

等1分钟:

可以查看日志:

附录

数据同步代码参考

MySQLSync工具类完全由大模型写完,运行无问题。

将其转化为定时任务,只用写个DFF的方法引用工具类的方法就好了。

import mysql.connector
from mysql.connector import errorcode
import time
from threading import Lock

class MySQLSync:
    def __init__(self, source_config, target_config):
        self.source_config = source_config
        self.target_config = target_config
        self.lock = Lock()

    def _get_connection(self, config):
        """创建数据库连接池"""
        try:
            return mysql.connector.connect(
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                pool_name=f"{config['database']}_pool",
                pool_size=3
            )
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("用户名或密码错误")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("数据库不存在")
            else:
                print(f"连接错误: {err}")
            raise

    def full_sync(self, table_name):
        """全量同步表数据"""
        try:
            with self.lock:
                source_conn = self._get_connection(self.source_config)
                target_conn = self._get_connection(self.target_config)

                source_cursor = source_conn.cursor(dictionary=True)
                target_cursor = target_conn.cursor()

                # 获取源表结构
                source_cursor.execute(f"SHOW CREATE TABLE {table_name}")
                create_table_sql = source_cursor.fetchone()['Create Table']

                # 在目标库创建表(如果不存在)
                target_cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
                target_cursor.execute(create_table_sql)

                # 分页查询源数据
                page_size = 1000
                offset = 0
                total_rows = 0

                while True:
                    source_cursor.execute(f"SELECT * FROM {table_name} LIMIT {offset}, {page_size}")
                    rows = source_cursor.fetchall()

                    if not rows:
                        break

                    # 构建批量插入语句
                    columns = list(rows[0].keys())
                    placeholders = ['%s'] * len(columns)
                    insert_sql = f"""
                        INSERT INTO {table_name} ({','.join(columns)})
                        VALUES ({','.join(placeholders)})
                    """

                    # 执行批量插入
                    target_cursor.executemany(
                        insert_sql,
                        [tuple(row.values()) for row in rows]
                    )

                    total_rows += len(rows)
                    offset += page_size
                    print(f"已同步 {total_rows} 行数据...")

                target_conn.commit()
                print(f"全量同步完成,共同步 {total_rows} 行数据")

        except Exception as e:
            print(f"同步失败: {str(e)}")
            if 'target_conn' in locals():
                target_conn.rollback()
        finally:
            for conn in [source_conn, target_conn]:
                if conn and conn.is_connected():
                    conn.close()

    def incremental_sync(self, table_name):
        """增量同步(基于binlog)"""
        try:
            # 使用canal-python客户端监听binlog
            from canal.client import Client
            from canal.protocol import EntryProtocol_pb2

            client = Client()
            client.connect(
                host=self.source_config['host'],
                port=11111  # canal服务端端口
            )
            client.check_valid(
                username=self.source_config['user'].encode(),
                password=self.source_config['password'].encode()
            )
            client.subscribe(
                client_id=b'1001',
                destination=b'example',
                filter=bytes(f'{self.source_config["database"]}\\.{table_name}', 'utf-8')
            )

            print(f"开始监听 {table_name} 表的变更...")

            while True:
                message = client.get(100)
                entries = message['entries']

                for entry in entries:
                    if entry.entryType in [
                        EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                        EntryProtocol_pb2.EntryType.TRANSACTIONEND
                    ]:
                        continue

                    row_change = EntryProtocol_pb2.RowChange()
                    row_change.MergeFromString(entry.storeValue)

                    event_type = row_change.eventType
                    header = entry.header

                    with self.lock:
                        target_conn = self._get_connection(self.target_config)
                        target_cursor = target_conn.cursor()

                        try:
                            for row in row_change.rowDatas:
                                if event_type == EntryProtocol_pb2.EventType.INSERT:
                                    columns = [col.name for col in row.afterColumns]
                                    values = [col.value for col in row.afterColumns]
                                    placeholders = ['%s'] * len(columns)

                                    insert_sql = f"""
                                        INSERT INTO {table_name} ({','.join(columns)})
                                        VALUES ({','.join(placeholders)})
                                    """
                                    target_cursor.execute(insert_sql, values)

                                elif event_type == EntryProtocol_pb2.EventType.UPDATE:
                                    set_clause = []
                                    where_clause = []
                                    params = []

                                    for col in row.afterColumns:
                                        set_clause.append(f"{col.name}=%s")
                                        params.append(col.value)
                                        if col.updated:
                                            where_clause.append(f"{col.name}=%s")
                                            params.append(row.beforeColumns[int(col.index)].value)

                                    update_sql = f"""
                                        UPDATE {table_name}
                                        SET {','.join(set_clause)}
                                        WHERE {' AND '.join(where_clause)}
                                    """
                                    target_cursor.execute(update_sql, params)

                                elif event_type == EntryProtocol_pb2.EventType.DELETE:
                                    where_clause = []
                                    params = []

                                    for col in row.beforeColumns:
                                        where_clause.append(f"{col.name}=%s")
                                        params.append(col.value)

                                    delete_sql = f"""
                                        DELETE FROM {table_name}
                                        WHERE {' AND '.join(where_clause)}
                                    """
                                    target_cursor.execute(delete_sql, params)

                            target_conn.commit()
                            print(f"已处理 {len(row_change.rowDatas)} 条变更记录")

                        except Exception as e:
                            target_conn.rollback()
                            print(f"处理变更失败: {str(e)}")
                        finally:
                            if target_conn and target_conn.is_connected():
                                target_conn.close()

                time.sleep(0.1)

        except KeyboardInterrupt:
            print("停止监听")
        except Exception as e:
            print(f"增量同步异常: {str(e)}")



@DFF.API('同步表任务', category='bigdata', tags=['bigdata', 'simple'], cache_result=300, timeout=3600)
def syncTable(tbl_name):
    '''
    两数相加

    输入参数 x, y 均为数字类型,返回结果为两者之和
    '''
    # 配置源库和目标库
    source_config = {
        'host': 'host.docker.internal',
        'port': 33306,
        'user': 'readonly_user',
        'password': '改我',
        'database': 'e01'
    }

    target_config = {
        'host': 'host.docker.internal',
        'port': 33306,
        'user': 'root',
        'password': '改我',
        'database': 'e02'
    }

    sync = MySQLSync(source_config, target_config)

    # 执行全量同步
    sync.full_sync(tbl_name)



# 测试函数不需要装饰器
def test_plus():
    syncTable('region')

http://www.dtcms.com/a/122509.html

相关文章:

  • 基于cartographer 1.0.0 不使用ros 使用激光雷达数据和IMU数据融合实现的建图
  • Redis的Spring客户端的使用
  • Android系统深度定制:源码级拦截adb install的完整解决方案
  • Windows 11 家庭中文版 安装docker desktop 无法开启自启动问题处理
  • matlab内置的git软件版本管理功能
  • CSS AI 通义灵码 VSCode插件安装与功能详解
  • MySQL学习笔记十四
  • 安徽京准:NTP网络时钟服务器功能及同步模式的介绍
  • oracle将varchar2 转为clob类型存储。 oracle不支持直接使用sql,将 varchar2 到clob的类型转换,需要下面操作
  • Java + WebAssembly 2025:如何用Rust优化高性能Web应用?
  • proteus OLED12864仿真
  • centos 安装python3.9.9
  • Jupyter Lab 无法启动 Kernel 问题排查与解决总结
  • 山东大学软件学院项目创新实训开发日志(8)之数据库建表
  • 从响应式编程到未来架构革命:解锁高并发时代的底层思维范式
  • MySQL日期时间类型详解:DATE、TIME和DATETIME的用法与区别
  • 【Ansible自动化运维】二、Playbook 深入探究:构建复杂自动化流程
  • idea插件:AICommit,智能生成Git提交信息
  • 停车场管理系统带万字文档基于Springboot+Vue的前后端分离停车场管理系统Springboot项目java项目java课程设计java毕业设计
  • Open Scene Graph 3D到2D坐标转换
  • 【数据库原理及安全实验】实验二 数据库的语句操作
  • 【软件测试】自动化测试框架Pytest + Selenium的使用
  • Ubuntu 24.04启用root账户
  • Hi168云平台部署Ansible学习环境
  • Mysql(继续更新)
  • linux入门三:Linux 编辑器
  • 查看手机在线状态,保障设备安全运行
  • js chrome 插件,下载微博视频
  • 树和图论【详细整理,简单易懂!】(C++实现 蓝桥杯速查)
  • Python | 第十三章 | 多态 | 魔术方法 | 静态方法 | 抽象类