[n8n] 工作流数据库管理SQLite | 数据访问层-REST API服务
第3章:工作流数据库管理
在前两章中,我们完成了工作流文件的扫描与分析,现在需要将这些结构化数据高效存储。
工作流数据库管理模块如同智能图书馆系统,为海量工作流提供组织化存储与快速检索能力。
核心架构
可以将数据库理解为,别人帮我们设计好了的存储数据的数据结构,我们调用api即可
前文传送:
[自动化Adapt] 父子事件| 冗余过滤 | SQLite | SQLAlchemy | 会话工厂 | Alembic
[Meetily后端框架] 多模型-Pydantic AI 代理-统一抽象 | SQLite管理
技术选型
采用SQLite轻量级数据库,通过单文件实现完整数据库功能,具备以下特性:
- 零配置部署
- 事务支持(ACID兼容)
- 标准SQL语法
- 嵌入式运行无需服务
数据模型设计
数据库表workflows
包含关键字段:
CREATE TABLE workflows
(id INTEGER PRIMARY KEY,filename TEXT UNIQUE NOT NULL, -- 原始文件名name TEXT NOT NULL, -- 友好名称trigger_type TEXT, -- 触发器类型integrations TEXT, -- 集成服务列表(JSON格式)node_count INTEGER, -- 节点数量file_hash TEXT, -- 文件哈希值analyzed_at TIMESTAMP -- 分析时间
)
核心功能实现
1. 数据库初始化
def init_database():conn = sqlite3.connect('workflows.db')conn.execute("PRAGMA journal_mode=WAL") # 启用预写式日志conn.execute("""CREATE TABLE IF NOT EXISTS workflows (...)""")return conn
2. 数据更新机制
采用INSERT OR REPLACE
实现智能更新:
def upsert_workflow(conn, workflow):conn.execute("""INSERT OR REPLACE INTO workflows VALUES (?,?,?,?,?,?,?,?)""", [workflow['filename'],workflow['name'],workflow['trigger_type'],json.dumps(workflow['integrations']), # 列表转JSONworkflow['node_count'],workflow['file_hash'],datetime.now()])
3. 复合查询示例
支持多条件检索:
def search_workflows(trigger_type=None, min_nodes=None):query = "SELECT * FROM workflows WHERE 1=1"params = []if trigger_type:query += " AND trigger_type = ?"params.append(trigger_type)if min_nodes:query += " AND node_count >= ?"params.append(min_nodes)return conn.execute(query, params).fetchall()
性能优化
- 索引加速:为高频查询字段创建索引
CREATE INDEX idx_trigger ON workflows(trigger_type);
CREATE INDEX idx_nodes ON workflows(node_count);
- 批量事务:减少IO操作
with conn:for workflow in workflow_batch:upsert_workflow(conn, workflow)
- 内存缓存:热点数据缓存策略
@lru_cache(maxsize=100)
def get_workflow(filename):return conn.execute("SELECT * FROM workflows WHERE filename=?", [filename]).fetchone()
总结
工作流数据库管理系统通过:
- 轻量级SQLite存储
- 智能的Upsert机制
- 复合查询支持
- 多维度性能优化
为工作流管理系统构建了坚实的数据基础。下一章将介绍如何实现高效检索:全文检索(FTS)集成
第4章:REST API服务
在前三章中,我们构建了工作流扫描、分析和存储系统。
本章将介绍REST API服务,它如同智能数据网关,为外部应用提供标准化访问接口。
核心架构
技术栈
- Express.js:轻量级Node.js Web框架
- RESTful设计:符合资源化API设计规范
- JSON数据格式:通用结构化数据交换格式
接口设计
端点 | 方法 | 描述 | 示例 |
---|---|---|---|
/api/stats | GET | 获取全局统计信息 | curl localhost:8000/api/stats |
/api/workflows | GET | 工作流搜索 | curl "localhost:8000/api/workflows?q=gmail" |
/api/workflows/:file | GET | 获取特定工作流详情 | curl localhost:8000/api/workflows/MyFlow.json |
核心功能实现
1. 服务初始化
const express = require('express');
const app = express();
app.use(express.json()); // 启用JSON解析// 数据库连接
const db = require('./database');
db.initDatabase().then(() => {app.listen(8000, () => console.log('API服务已启动'));
});
2. 统计接口
app.get('/api/stats', async (req, res) => {const stats = await db.getStats();res.json({total: stats.total,active: stats.active,byTrigger: stats.byTrigger});
});
3. 搜索接口
app.get('/api/workflows', async (req, res) =>
{const { q, trigger, page=1, size=20 } = req.query;const results = await db.searchWorkflows(q, trigger, page, size);res.json({data: results.workflows,pagination: {page: parseInt(page),pageSize: parseInt(size),total: results.total}});
});
4. 文件下载
const path = require('path');
const fs = require('fs');app.get('/api/workflows/:file/download', (req, res) =>
{const filePath = path.join('workflows', req.params.file);if (fs.existsSync(filePath)) {res.download(filePath); // 自动处理文件流} else {res.status(404).json({ error: '文件不存在' });}
});
高级特性
1. 请求验证中间件
function validateSearch(req, res, next)
{if (req.query.page && isNaN(req.query.page)) {return res.status(400).json({ error: '页码参数无效' });}next(); // 验证通过
}app.get('/api/workflows', validateSearch, (req, res) => {// 处理逻辑
});
2. 响应压缩
const compression = require('compression');
app.use(compression()); // 自动压缩JSON响应
3. 速率限制
const rateLimit = require('express-rate-limit');
const limiter = rateLimit({windowMs: 15 * 60 * 1000, // 15分钟max: 100 // 每个IP限制100次请求
});
app.use('/api/', limiter);
性能优化
- 连接池管理:
复用数据库连接
- 缓存控制:添加
ETag
和Cache-Control
头 - 异步处理:非阻塞IO操作
- 集群模式:利用
Node.js集群
模块
总结
REST API服务通过:
- 标准化的端点设计
- 健壮的参数处理
- 高效的数据传输
- 完善的安全措施
为工作流管理系统提供可靠的数据访问层
。下一章将介绍增强型搜索能力:全文检索集成