[n8n] 工作流数据库管理SQLite | 数据访问层-REST API服务

发布于:2025-08-29 ⋅ 阅读:(15) ⋅ 点赞:(0)

第3章:工作流数据库管理

在前两章中,我们完成了工作流文件的扫描与分析,现在需要将这些结构化数据高效存储。

工作流数据库管理模块如同智能图书馆系统,为海量工作流提供组织化存储与快速检索能力

核心架构

可以将数据库理解为,别人帮我们设计好了的存储数据的数据结构,我们调用api即可

前文传送:

[自动化Adapt] 父子事件| 冗余过滤 | SQLite | SQLAlchemy | 会话工厂 | Alembic

[Meetily后端框架] 多模型-Pydantic AI 代理-统一抽象 | SQLite管理

技术选型

采用SQLite轻量级数据库,通过单文件实现完整数据库功能,具备以下特性:

  • 零配置部署
  • 事务支持(ACID兼容)
  • 标准SQL语法
  • 嵌入式运行无需服务

数据模型设计

数据库表workflows包含关键字段:

CREATE TABLE workflows 
(
    id INTEGER PRIMARY KEY,
    filename TEXT UNIQUE NOT NULL,  -- 原始文件名
    name TEXT NOT NULL,             -- 友好名称
    trigger_type TEXT,              -- 触发器类型
    integrations TEXT,              -- 集成服务列表(JSON格式)
    node_count INTEGER,             -- 节点数量
    file_hash TEXT,                 -- 文件哈希值
    analyzed_at TIMESTAMP           -- 分析时间
)

核心功能实现

1. 数据库初始化

def init_database():
    conn = sqlite3.connect('workflows.db')
    conn.execute("PRAGMA journal_mode=WAL")  # 启用预写式日志
    conn.execute("""
        CREATE TABLE IF NOT EXISTS workflows (...)
    """)
    return conn

2. 数据更新机制

采用INSERT OR REPLACE实现智能更新:

def upsert_workflow(conn, workflow):
    conn.execute("""
        INSERT OR REPLACE INTO workflows VALUES (
            ?,?,?,?,?,?,?,?
        )
    """, [
        workflow['filename'],
        workflow['name'],
        workflow['trigger_type'],
        json.dumps(workflow['integrations']),  # 列表转JSON
        workflow['node_count'],
        workflow['file_hash'],
        datetime.now()
    ])

3. 复合查询示例

支持多条件检索

def search_workflows(trigger_type=None, min_nodes=None):
    query = "SELECT * FROM workflows WHERE 1=1"
    params = []
    
    if trigger_type:
        query += " AND trigger_type = ?"
        params.append(trigger_type)
        
    if min_nodes:
        query += " AND node_count >= ?"
        params.append(min_nodes)
        
    return conn.execute(query, params).fetchall()

性能优化

  1. 索引加速:为高频查询字段创建索引
CREATE INDEX idx_trigger ON workflows(trigger_type);
CREATE INDEX idx_nodes ON workflows(node_count);
  1. 批量事务:减少IO操作
with conn:
    for workflow in workflow_batch:
        upsert_workflow(conn, workflow)
  1. 内存缓存:热点数据缓存策略
@lru_cache(maxsize=100)
def get_workflow(filename):
    return conn.execute("SELECT * FROM workflows WHERE filename=?", [filename]).fetchone()

总结

工作流数据库管理系统通过:

  1. 轻量级SQLite存储
  2. 智能的Upsert机制
  3. 复合查询支持
  4. 多维度性能优化

为工作流管理系统构建了坚实的数据基础。下一章将介绍如何实现高效检索:全文检索(FTS)集成


第4章:REST API服务

在前三章中,我们构建了工作流扫描、分析和存储系统。

本章将介绍REST API服务,它如同智能数据网关,为外部应用提供标准化访问接口。

核心架构

技术栈

  • Express.js:轻量级Node.js Web框架
  • RESTful设计:符合资源化API设计规范
  • JSON数据格式:通用结构化数据交换格式

接口设计

端点 方法 描述 示例
/api/stats GET 获取全局统计信息 curl localhost:8000/api/stats
/api/workflows GET 工作流搜索 curl "localhost:8000/api/workflows?q=gmail"
/api/workflows/:file GET 获取特定工作流详情 curl localhost:8000/api/workflows/MyFlow.json

核心功能实现

1. 服务初始化

const express = require('express');
const app = express();
app.use(express.json()); // 启用JSON解析

// 数据库连接
const db = require('./database');
db.initDatabase().then(() => {
    app.listen(8000, () => console.log('API服务已启动'));
});

2. 统计接口

app.get('/api/stats', async (req, res) => {
    const stats = await db.getStats();
    res.json({
        total: stats.total,
        active: stats.active,
        byTrigger: stats.byTrigger
    });
});

3. 搜索接口

app.get('/api/workflows', async (req, res) => 
{
    const { q, trigger, page=1, size=20 } = req.query;
    const results = await db.searchWorkflows(q, trigger, page, size);
    res.json(
    {
        data: results.workflows,
        pagination: 
        {
            page: parseInt(page),
            pageSize: parseInt(size),
            total: results.total
        }
    });
});

4. 文件下载

const path = require('path');
const fs = require('fs');

app.get('/api/workflows/:file/download', (req, res) => 
{
    const filePath = path.join('workflows', req.params.file);
    if (fs.existsSync(filePath)) 
    {
        res.download(filePath); // 自动处理文件流
    } 
    else 
    {
        res.status(404).json({ error: '文件不存在' });
    }
});

高级特性

1. 请求验证中间件

function validateSearch(req, res, next) 
{
    if (req.query.page && isNaN(req.query.page)) 
    {
        return res.status(400).json({ error: '页码参数无效' });
    }
    next(); // 验证通过
}

app.get('/api/workflows', validateSearch, (req, res) => {
    // 处理逻辑
});

2. 响应压缩

const compression = require('compression');
app.use(compression()); // 自动压缩JSON响应

3. 速率限制

const rateLimit = require('express-rate-limit');
const limiter = rateLimit({
    windowMs: 15 * 60 * 1000, // 15分钟
    max: 100 // 每个IP限制100次请求
});
app.use('/api/', limiter);

性能优化

  1. 连接池管理复用数据库连接
  2. 缓存控制:添加ETagCache-Control
  3. 异步处理:非阻塞IO操作
  4. 集群模式:利用Node.js集群模块

总结

REST API服务通过:

  1. 标准化的端点设计
  2. 健壮的参数处理
  3. 高效的数据传输
  4. 完善的安全措施

为工作流管理系统提供可靠的数据访问层。下一章将介绍增强型搜索能力全文检索集成


网站公告

今日签到

点亮在社区的每一天
去签到