Coze源码分析-资源库-删除插件-后端源码-数据存储与安全

发布于:2025-09-10 ⋅ 阅读:(104) ⋅ 点赞:(0)

7. 数据存储层

7.1 数据库表结构

plugin_draft 表设计

文件位置:helm/charts/opencoze/files/mysql/schema.sql

真实DDL结构

CREATE TABLE IF NOT EXISTS `plugin_draft` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `space_id` bigint NOT NULL COMMENT 'space id',
  `developer_id` bigint NOT NULL COMMENT 'developer id',
  `plugin_type` int NOT NULL COMMENT 'plugin type',
  `icon_uri` varchar(255) NOT NULL COMMENT 'icon uri',
  `server_url` varchar(255) NOT NULL COMMENT 'server url',
  `app_id` bigint NOT NULL COMMENT 'app id',
  `manifest` json NULL COMMENT 'plugin manifest',
  `openapi_doc` json NULL COMMENT 'openapi document',
  `created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
  `updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
  PRIMARY KEY (`id`),
  INDEX `idx_developer_id` (`developer_id`),
  INDEX `idx_space_id` (`space_id`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'plugin_draft';
tool_draft 表设计

真实DDL结构

CREATE TABLE IF NOT EXISTS `tool_draft` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `plugin_id` bigint NOT NULL COMMENT 'plugin id',
  `sub_url` varchar(255) NOT NULL COMMENT 'sub url',
  `method` varchar(10) NOT NULL COMMENT 'http method',
  `activated_status` int NOT NULL COMMENT 'activated status',
  `debug_status` int NOT NULL COMMENT 'debug status',
  `operation` json NULL COMMENT 'operation info',
  `created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
  `updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
  PRIMARY KEY (`id`),
  INDEX `idx_plugin_id` (`plugin_id`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'tool_draft';

表结构特点

  1. 关联设计:plugin_draft和tool_draft通过plugin_id关联,支持级联删除
  2. 空间隔离:通过 space_id 实现多租户数据隔离
  3. JSON存储manifestopenapi_doc使用JSON类型,支持复杂结构数据
  4. 状态管理:tool_draft表包含激活状态和调试状态字段
  5. 索引优化:在关键查询字段上建立索引,优化查询性能
  6. 字符集:使用 utf8mb4_0900_ai_ci 排序规则,支持完整的Unicode字符集

plugin_draft字段详解

  • id:自增主键,唯一标识每个插件
  • space_id:工作空间ID,实现租户级别的数据隔离
  • developer_id:开发者用户ID,用于权限控制和查询优化
  • plugin_type:插件类型标识
  • icon_uri:插件图标URI
  • server_url:插件服务器URL
  • app_id:关联的应用ID
  • manifest:插件清单文件,JSON格式
  • openapi_doc:OpenAPI文档,JSON格式
  • created_at/updated_at:毫秒级时间戳,记录创建和更新时间

tool_draft字段详解

  • id:自增主键,唯一标识每个工具
  • plugin_id:关联的插件ID,支持级联删除
  • sub_url:工具的子URL路径
  • method:HTTP方法(GET、POST等)
  • activated_status:激活状态
  • debug_status:调试状态
  • operation:操作信息,JSON格式
  • created_at/updated_at:毫秒级时间戳,记录创建和更新时间

7.2 ElasticSearch 索引架构

coze_resource 统一索引

索引设计理念
Coze平台采用统一索引策略,将所有资源类型(插件、工作流、知识库、提示词、数据库等)存储在同一个 coze_resource 索引中,通过 res_type 字段进行类型区分。

插件在索引中的映射

{
  "mappings": {
    "properties": {
      "res_id": {
        "type": "long",
        "description": "资源ID,对应plugin_draft.id"
      },
      "res_type": {
        "type": "integer", 
        "description": "资源类型,插件为1"
      },
      "name": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        },
        "description": "插件名称,支持全文搜索和精确匹配"
      },
      "owner_id": {
        "type": "long",
        "description": "所有者ID,对应developer_id"
      },
      "space_id": {
        "type": "long",
        "description": "工作空间ID"
      },
      "plugin_type": {
        "type": "integer",
        "description": "插件类型"
      },
      "server_url": {
        "type": "keyword",
        "description": "插件服务器URL"
      },
      "create_time": {
        "type": "long",
        "description": "创建时间戳(毫秒)"
      },
      "update_time": {
        "type": "long",
        "description": "更新时间戳(毫秒)"
      }
    }
  }
}

资源类型常量定义

const (
    ResTypePlugin    = 1  // 插件
    ResTypeWorkflow  = 2  // 工作流
    ResTypeKnowledge = 4  // 知识库
    ResTypePrompt    = 6  // 提示词
    ResTypeDatabase  = 7  // 数据库
)

7.3 数据同步机制

事件驱动的删除同步架构

删除同步流程

  1. 删除操作触发:插件删除操作触发删除领域事件
  2. 事件发布:通过事件总线发布 ResourceDomainEvent 删除事件
  3. 事件处理resourceHandlerImpl 监听并处理删除事件
  4. 索引清理:将删除操作同步到ElasticSearch,移除相关索引
  5. 关联清理:同时清理插件关联的工具索引数据

删除同步核心代码

// 资源删除事件处理器
type resourceHandlerImpl struct {
    esClient es.Client
    logger   logs.Logger
}

// 处理插件删除领域事件
func (r *resourceHandlerImpl) HandlePluginDeleteEvent(ctx context.Context, event *entity.ResourceDomainEvent) error {
    if event.OpType != entity.Deleted {
        return fmt.Errorf("invalid operation type for delete handler: %v", event.OpType)
    }
    
    // 记录删除操作日志
    r.logger.InfoCtx(ctx, "Processing plugin delete event", 
        "plugin_id", event.ResID,
        "space_id", event.SpaceID,
        "operator_id", event.OperatorID)
    
    return r.deleteFromIndex(ctx, event.ResID)
}

// 从索引中删除插件
func (r *resourceHandlerImpl) deleteFromIndex(ctx context.Context, pluginID int64) error {
    indexName := "coze_resource"
    docID := conv.Int64ToStr(pluginID)
    
    // 执行索引删除
    err := r.esClient.Delete(ctx, indexName, docID)
    if err != nil {
        r.logger.ErrorCtx(ctx, "Failed to delete plugin from index", 
            "plugin_id", pluginID, "error", err)
        return fmt.Errorf("delete plugin from ES index failed: %w", err)
    }
    
    // 验证删除结果
    exists, checkErr := r.esClient.Exists(ctx, indexName, docID)
    if checkErr != nil {
        r.logger.WarnCtx(ctx, "Failed to verify deletion", 
            "plugin_id", pluginID, "error", checkErr)
    } else if exists {
        r.logger.ErrorCtx(ctx, "Plugin still exists in index after deletion", 
            "plugin_id", pluginID)
        return fmt.Errorf("plugin deletion verification failed")
    }
    
    r.logger.InfoCtx(ctx, "Successfully deleted plugin from index", 
        "plugin_id", pluginID)
    return nil
}

7.4 插件删除操作存储层设计原则

插件删除数据一致性保证
  1. 删除一致性:采用事件驱动模式,保证MySQL删除和ElasticSearch索引清理的最终一致性
  2. 删除幂等性:插件删除操作支持重试,避免重复删除导致的异常
  3. 删除事务边界:插件数据库删除操作和删除事件发布在同一事务中,保证原子性
  4. 删除验证:插件删除完成后验证数据确实被移除,确保删除操作的完整性
  5. 级联删除:确保插件删除时同步删除关联的工具数据,维护数据完整性
插件删除性能优化策略
  1. 删除索引优化:基于插件主键ID的删除操作,具有最佳性能
  2. 批量删除:支持批量删除插件操作,减少数据库和ES的操作次数
  3. 异步删除处理:插件删除事件处理采用异步模式,不阻塞删除主流程
  4. 删除缓存清理:及时清理插件相关缓存,避免删除后的脏数据
  5. 分批级联删除:工具数据采用分批删除策略,避免大量工具删除时的性能问题
插件删除操作扩展性考虑
  1. 分片删除:支持按 space_id 进行分片删除,提高大规模插件删除的效率
  2. 删除队列:使用消息队列处理插件删除事件,支持高并发删除场景
  3. 删除监控:独立的插件删除操作监控,及时发现删除异常
  4. 多表协调:协调plugin_draft、tool_draft、plugin、tool等多表的删除操作
插件删除安全保障
  1. 权限验证:严格的插件删除权限验证,确保只有授权用户可以删除
  2. 删除审计:完整的插件删除操作审计日志,支持删除行为追踪
  3. 删除确认:重要插件删除前的二次确认机制
  4. 删除恢复:虽然是物理删除,但通过备份支持插件数据恢复
  5. 级联验证:删除插件前验证关联的工具数据,确保级联删除的完整性
  6. 引用检查:删除前检查插件是否被工作流等其他资源引用

7.5 插件删除操作监控和运维

插件删除操作监控
// 插件删除操作监控指标
type PluginDeleteMetrics struct {
    PluginDeleteSuccessCount int64         // 插件删除成功次数
    PluginDeleteFailureCount int64         // 插件删除失败次数
    PluginDeleteLatency      time.Duration // 插件删除操作延迟
    LastPluginDeleteTime     time.Time     // 最后插件删除时间
    PluginIndexCleanupCount  int64         // 插件索引清理次数
    PluginDeleteEventCount   int64         // 插件删除事件处理次数
    ToolCascadeDeleteCount   int64         // 工具级联删除次数
    PluginDeleteQueueSize    int64         // 插件删除队列大小
}

// 插件删除监控指标收集
func (r *resourceHandlerImpl) collectPluginDeleteMetrics(ctx context.Context, startTime time.Time, pluginID int64, err error) {
    latency := time.Since(startTime)
    if err != nil {
        metrics.PluginDeleteFailureCount++
        log.ErrorCtx(ctx, "plugin delete failed", 
            "plugin_id", pluginID, "error", err, "latency", latency)
    } else {
        metrics.PluginDeleteSuccessCount++
        metrics.PluginDeleteLatency = latency
        metrics.LastPluginDeleteTime = time.Now()
        log.InfoCtx(ctx, "plugin delete succeeded", 
            "plugin_id", pluginID, "latency", latency)
    }
}

// 插件删除操作健康检查
func (r *resourceHandlerImpl) pluginDeleteHealthCheck(ctx context.Context) error {
    // 检查数据库连接
    if err := r.db.Ping(); err != nil {
        return fmt.Errorf("database connection failed: %w", err)
    }
    
    // 检查ES连接
    if _, err := r.esClient.Ping(ctx); err != nil {
        return fmt.Errorf("elasticsearch connection failed: %w", err)
    }
    
    // 检查插件删除队列状态
    if queueSize := r.getPluginDeleteQueueSize(); queueSize > 1000 {
        return fmt.Errorf("plugin delete queue size too large: %d", queueSize)
    }
    
    // 检查级联删除状态
    if cascadeErrors := r.getCascadeDeleteErrors(); len(cascadeErrors) > 10 {
        return fmt.Errorf("too many cascade delete errors: %d", len(cascadeErrors))
    }
    
    return nil
}
插件删除数据质量保证
  1. 删除一致性检查:定期验证MySQL和ElasticSearch中插件删除数据的一致性
  2. 删除完整性验证:确保插件删除操作完全清理了相关数据和索引
  3. 级联删除验证:验证插件删除时工具数据的级联删除完整性
  4. 删除异常恢复:提供插件删除失败的重试和修复机制
  5. 删除性能监控:监控插件删除操作性能,及时发现和解决性能问题
  6. 删除审计追踪:完整记录插件删除操作的执行过程和结果
  7. 多表一致性:确保plugin_draft、tool_draft、plugin、tool等多表删除的一致性

8. 插件删除安全和权限验证机制

8.1 插件删除身份认证

JWT Token验证

  • 删除插件的所有API请求都需要携带有效的JWT Token
  • Token包含用户ID、工作空间权限等关键信息
  • 通过中间件统一验证Token的有效性和完整性
// 插件删除身份验证中间件
func PluginDeleteAuthMiddleware() app.HandlerFunc {
    return func(c context.Context, ctx *app.RequestContext) {
        token := ctx.GetHeader("Authorization")
        if token == nil {
            ctx.JSON(401, gin.H{"error": "删除插件需要登录认证"})
            ctx.Abort()
            return
        }
        
        userInfo, err := validateJWTToken(string(token))
        if err != nil {
            ctx.JSON(401, gin.H{"error": "Token无效,无法删除插件"})
            ctx.Abort()
            return
        }
        
        // 验证用户是否有删除插件的权限
        if !userInfo.HasPluginDeletePermission {
            ctx.JSON(403, gin.H{"error": "用户无删除插件权限"})
            ctx.Abort()
            return
        }
        
        ctx.Set("user_id", userInfo.UserID)
        ctx.Set("space_id", userInfo.SpaceID)
        ctx.Set("operator_id", userInfo.UserID)
        ctx.Next()
    }
}

8.2 插件删除工作空间权限控制

空间隔离机制

  • 每个用户只能删除其所属工作空间中的插件
  • 通过 space_id 字段实现插件删除权限隔离
  • 在插件删除操作中强制验证空间权限
// 插件删除工作空间权限验证
func (s *PluginApplicationService) validatePluginDeleteSpacePermission(ctx context.Context, pluginID int64) error {
    userSpaceID := ctx.Value("space_id").(int64)
    
    // 获取插件信息以验证空间权限
    plugin, err := s.DomainSVC.GetPluginResource(ctx, pluginID)
    if err != nil {
        return fmt.Errorf("获取插件信息失败: %w", err)
    }
    
    if plugin.SpaceID != userSpaceID {
        return errors.New("无权限删除该工作空间的插件")
    }
    
    // 检查工作空间是否允许删除插件
    spaceConfig, err := s.spaceService.GetSpaceConfig(ctx, userSpaceID)
    if err != nil {
        return err
    }
    
    if !spaceConfig.AllowPluginDeletion {
        return errors.New("该工作空间不允许删除插件")
    }
    
    return nil
}

8.3 插件删除资源级权限验证

插件删除所有权验证

  • 严格验证用户是否为插件的创建者
  • 只有创建者才能删除自己创建的插件
  • 通过 creator_id 进行严格的所有权判断
// 插件删除权限验证
func (s *PluginApplicationService) validatePluginDeletePermission(ctx context.Context, pluginID int64) error {
    userID := ctx.Value("user_id").(int64)
    
    // 获取插件信息
    plugin, err := s.DomainSVC.GetPluginResource(ctx, pluginID)
    if err != nil {
        if errors.Is(err, gorm.ErrRecordNotFound) {
            return errorx.New(errno.ErrPluginNotFoundCode, errorx.KV("plugin_id", pluginID))
        }
        return fmt.Errorf("获取插件信息失败: %w", err)
    }
    
    // 严格检查是否为创建者(只有创建者可以删除)
    if plugin.CreatorID != userID {
        return errorx.New(errno.ErrPluginPermissionDeniedCode, 
            errorx.KV("msg", "只有创建者可以删除插件"),
            errorx.KV("plugin_id", pluginID),
            errorx.KV("creator_id", plugin.CreatorID),
            errorx.KV("user_id", userID))
    }
    
    // 检查插件状态是否允许删除
    if plugin.Status == entity.PluginStatusDeleted {
        return errorx.New(errno.ErrPluginAlreadyDeletedCode, 
            errorx.KV("plugin_id", pluginID))
    }
    
    // 检查是否有其他依赖关系阻止删除
    hasReferences, err := s.checkPluginReferences(ctx, pluginID)
    if err != nil {
        return fmt.Errorf("检查插件引用关系失败: %w", err)
    }
    
    if hasReferences {
        return errorx.New(errno.ErrPluginHasReferencesCode, 
            errorx.KV("plugin_id", pluginID),
            errorx.KV("msg", "插件正在被其他资源引用,无法删除"))
    }
    
    return nil
}

// 检查插件引用关系
func (s *PluginApplicationService) checkPluginReferences(ctx context.Context, pluginID int64) (bool, error) {
    // 检查是否被工作流引用
    workflowRefs, err := s.workflowService.GetPluginReferences(ctx, pluginID)
    if err != nil {
        return false, err
    }
    
    if len(workflowRefs) > 0 {
        return true, nil
    }
    
    // 检查是否被其他插件引用
    pluginRefs, err := s.DomainSVC.GetPluginReferences(ctx, pluginID)
    if err != nil {
        return false, err
    }
    
    return len(pluginRefs) > 0, nil
}

8.4 插件删除API访问控制

删除请求频率限制

  • 实现基于用户的插件删除频率限制
  • 防止恶意批量删除插件
  • 支持不同用户等级的差异化删除限流策略

删除操作安全验证

  • 严格验证删除请求的合法性
  • 防止误删除和恶意删除攻击
  • 使用多重安全检查机制
  • 级联删除安全验证,确保关联工具数据的完整性
// 插件删除参数验证
func validatePluginDeleteRequest(req *plugin.DeletePluginRequest) error {
    if req.PluginID <= 0 {
        return errors.New("无效的插件ID")
    }
    
    // 验证删除确认标识
    if !req.ConfirmDelete {
        return errors.New("删除操作需要确认")
    }
    
    // 验证删除原因(可选)
    if req.DeleteReason != "" && len(req.DeleteReason) > 500 {
        return errors.New("删除原因长度不能超过500字符")
    }
    
    // 验证是否强制删除(忽略引用检查)
    if req.ForceDelete && !req.AdminConfirm {
        return errors.New("强制删除需要管理员确认")
    }
    
    return nil
}

// 插件删除操作安全检查
func (s *PluginApplicationService) validatePluginDeleteSafety(ctx context.Context, pluginID int64) error {
    userID := ctx.Value("user_id").(int64)
    
    // 检查用户插件删除频率限制
    deleteCount, err := s.getUserPluginDeleteCount(ctx, userID, time.Now().Add(-24*time.Hour))
    if err != nil {
        return fmt.Errorf("检查插件删除频率失败: %w", err)
    }
    
    if deleteCount >= 20 { // 24小时内最多删除20个插件
        return errorx.New(errno.ErrPluginDeleteRateLimitCode, 
            errorx.KV("user_id", userID),
            errorx.KV("delete_count", deleteCount))
    }
    
    // 检查插件是否为重要资源
    isImportant, err := s.checkPluginImportance(ctx, pluginID)
    if err != nil {
        return fmt.Errorf("检查插件重要性失败: %w", err)
    }
    
    if isImportant {
        // 重要插件需要额外确认
        return s.validateImportantPluginDeletion(ctx, pluginID)
    }
    
    // 检查插件工具数量
    toolCount, err := s.getPluginToolCount(ctx, pluginID)
    if err != nil {
        return fmt.Errorf("检查插件工具数量失败: %w", err)
    }
    
    if toolCount > 10 {
        // 工具数量较多的插件需要特殊确认
        return s.validateLargePluginDeletion(ctx, pluginID, toolCount)
    }
    
    return nil
}

// 检查插件重要性
func (s *PluginApplicationService) checkPluginImportance(ctx context.Context, pluginID int64) (bool, error) {
    // 检查插件使用频率
    usageCount, err := s.getPluginUsageCount(ctx, pluginID, time.Now().Add(-30*24*time.Hour))
    if err != nil {
        return false, err
    }
    
    // 使用次数超过50次认为是重要插件
    if usageCount > 50 {
        return true, nil
    }
    
    // 检查是否被标记为重要
    plugin, err := s.DomainSVC.GetPluginResource(ctx, pluginID)
    if err != nil {
        return false, err
    }
    
    // 检查插件是否已发布
    if plugin.Status == entity.PluginStatusPublished {
        return true, nil
    }
    
    return plugin.IsImportant, nil
}

// 重要插件删除验证
func (s *PluginApplicationService) validateImportantPluginDeletion(ctx context.Context, pluginID int64) error {
    // 重要插件删除需要管理员权限或特殊确认
    userID := ctx.Value("user_id").(int64)
    
    isAdmin, err := s.userService.IsSpaceAdmin(ctx, userID)
    if err != nil {
        return err
    }
    
    if !isAdmin {
        return errorx.New(errno.ErrImportantPluginDeleteCode, 
            errorx.KV("plugin_id", pluginID),
            errorx.KV("msg", "重要插件删除需要管理员权限"))
    }
    
    return nil
}

// 大型插件删除验证
func (s *PluginApplicationService) validateLargePluginDeletion(ctx context.Context, pluginID int64, toolCount int) error {
    userID := ctx.Value("user_id").(int64)
    
    // 记录大型插件删除操作
    logs.CtxWarnf(ctx, "User attempting to delete large plugin: userID=%d, pluginID=%d, toolCount=%d", 
        userID, pluginID, toolCount)
    
    // 检查是否有足够权限删除大型插件
    hasPermission, err := s.userService.HasLargePluginDeletePermission(ctx, userID)
    if err != nil {
        return err
    }
    
    if !hasPermission {
        return errorx.New(errno.ErrLargePluginDeleteCode, 
            errorx.KV("plugin_id", pluginID),
            errorx.KV("tool_count", toolCount),
            errorx.KV("msg", "删除大型插件需要特殊权限"))
    }
    
    return nil
}

微信公众号

今日签到

点亮在社区的每一天
去签到