前言
本文将深入分析Coze Studio项目中用户登录后点击"资源库"功能的后端实现,通过源码解读来理解整个资源库管理系统的架构设计和技术实现。
项目架构概览
整体架构设计
Coze Studio后端采用了经典的分层架构模式,将资源库功能划分为以下几个核心层次:
┌─────────────────────────────────────────────────────────────┐
│ IDL接口定义层 │
│ ┌─────────────┐ ┌───────────────── ┐ ┌─────────────┐ │
│ │ base.thrift │ │openapiauth.thrift│ │ api.thrift │ │
│ └─────────────┘ └───────────────── ┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ API网关层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Handler │ │ Router │ │ Middleware │ │
│ │ 处理器 │ │ 路由 │ │ 中间件 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 应用服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ SearchApplicationService │ │
│ │ LibraryResourceList │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 领域服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Search Plugin workflow │ │
│ │ Knowledge prompt database │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 数据访问层 │
│ ┌─────────────────────────────┐ ┌─────────────────────────┐
│ │ pluginDraftDAO │ │ RepositoryImpl │
│ │ KnowledgeDAO │ │ WorkflowDAO │
│ │ PromptDAO │ │ OnlineImpl │
│ └─────────────────────────────┘ └─────────────────────────┘
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 基础设施层 │
│ ┌─ ─ ─── ─── ── ─ ─ ─┐ │
│ │ gorm.DB │ │
│ │ es.Client │ │
│ └── ─ ── ─── ── ── ─ ┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 存储服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ MySQL数据库 │ │
│ │ ElasticSearch数据库 │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
1. IDL接口定义层
IDL基础类型定义(base.thrift)
文件位置:idl/base.thrift
核心代码:
namespace py base
namespace go base
namespace java com.bytedance.thrift.base
struct TrafficEnv {
1: bool Open = false,
2: string Env = "" ,
}
struct Base {
1: string LogID = "",
2: string Caller = "",
3: string Addr = "",
4: string Client = "",
5: optional TrafficEnv TrafficEnv ,
6: optional map<string,string> Extra ,
}
struct BaseResp {
1: string StatusMessage = "",
2: i32 StatusCode = 0 ,
3: optional map<string,string> Extra ,
}
文件作用:
定义了Coze Studio项目中所有接口的基础数据结构,作为其他IDL文件的依赖基础。
资源库查询接口定义(resource.thrift)
文件位置:idl/resource/resource.thrift
当Coze用户登录平台后点击"资源库"时,前端会调用LibraryResourceList
接口来获取资源库中的资源列表。该接口支持多种筛选条件,包括资源类型、发布状态、创建者等。
LibraryResourceList接口定义
LibraryResourceListResponse LibraryResourceList(1: LibraryResourceListRequest request)
(api.post='/api/plugin_api/library_resource_list', api.category="resource", api.gen_path="resource", agw.preserve_base="true")
请求结构体定义
struct LibraryResourceListRequest {
1 : optional i32 user_filter , // 是否为当前用户创建,0-不过滤,1-当前用户
2 : optional list<resource_common.ResType> res_type_filter , // 资源类型过滤 [4,1] 0表示不过滤
3 : optional string name , // 资源名称
4 : optional resource_common.PublishStatus publish_status_filter, // 发布状态,0-不过滤,1-未发布,2-已发布
5 : required i64 space_id (agw.js_conv="str", api.js_conv="true"), // 用户空间ID
7 : optional i32 size , // 单次读取数据条数,默认10,最大100
9 : optional string cursor , // 游标,用于分页,默认0,首次请求可不传,后续请求需要带上次返回的cursor
10 : optional list<string> search_keys , // 指定自定义搜索使用的字段,不填默认只name匹配
11 : optional bool is_get_imageflow , // res_type_filter为[2 workflow]时是否需要返回图像审核
255: base.Base Base ,
}
响应结构体定义
struct LibraryResourceListResponse {
1 : i64 code ,
2 : string msg ,
3 : list<resource_common.ResourceInfo> resource_list, // 资源列表
5 : optional string cursor , // 下次请求的游标
6 : bool has_more , // 是否还有数据可拉取
255: required base.BaseResp BaseResp ,
}
核心数据结构 - ResourceInfo
struct ResourceInfo{
1 : optional i64 ResID , // 资源ID
2 : optional ResType ResType , // 资源类型
3 : optional i32 ResSubType , // 资源子类型
4 : optional string Name , // 资源名称
5 : optional string Desc , // 资源描述
6 : optional string Icon , // 资源图标,完整url
7 : optional i64 CreatorID , // 资源创建者ID
8 : optional string CreatorAvatar , // 创建者头像
9 : optional string CreatorName , // 创建者名称
10: optional string UserName , // 创建者用户名
11: optional PublishStatus PublishStatus , // 资源发布状态
12: optional i32 BizResStatus , // 业务资源状态
13: optional bool CollaborationEnable , // 是否启用多人编辑
14: optional i64 EditTime , // 最后编辑时间
15: optional i64 SpaceID , // 资源所属空间ID
16: optional map<string,string> BizExtend , // 业务扩展信息
17: optional list<ResourceAction> Actions , // 可执行的操作按钮
18: optional bool DetailDisable , // 是否禁止进入详情页
19: optional bool DelFlag , // 删除标识
}
接口作用说明:
- 主要功能: 获取用户空间下的资源库资源列表
- 支持的资源类型: Plugin(插件)、Workflow(工作流)、Knowledge(知识库)、Database(数据库)、UI组件、Prompt、Variable(变量)、Voice(语音)、Imageflow(图像流)
- 筛选能力: 支持按资源类型、发布状态、创建者、名称等多维度筛选
- 分页支持: 使用cursor游标方式实现高效分页
- 操作权限: 返回每个资源可执行的操作列表(复制、删除、编辑等)
IDL主API服务聚合文件(api.thrift)
文件位置:idl/api.thrift
该文件是整个Coze项目的API服务聚合入口点,负责将所有业务模块的IDL服务定义统一聚合,为代码生成工具提供完整的服务接口定义。
核心代码:
include "./resource/resource.thrift"
namespace go coze
// 资源库核心服务聚合
service ResourceService extends resource.ResourceService {}
// 其他业务服务聚合
资源库接口聚合说明:
通过 service ResourceService extends resource.ResourceService {}
聚合定义,api.thrift将resource.thrift中定义的所有资源库相关接口统一暴露,包括:
- LibraryResourceList:获取资源库资源列表
- LibraryResourceDetail:获取资源库资源详情
- LibraryResourceCreate:创建资源库资源
- LibraryResourceUpdate:更新资源库资源
- LibraryResourceDelete:删除资源库资源
2. API网关层
接口定义-resource.go文件详细分析
文件位置:backend\api\model\resource\resource.go
核心代码:
type ResourceService interface {
LibraryResourceList(ctx context.Context, request *LibraryResourceListRequest) (r *LibraryResourceListResponse, err error)
}
LibraryResourceList接口模型定义
当Coze用户登录平台后点击"资源库"时,前端会调用LibraryResourceList
接口来获取资源库中的资源列表。该接口支持多种筛选条件,包括资源类型、发布状态、创建者等。
LibraryResourceListRequest 请求结构体:
type LibraryResourceListRequest struct {
// Whether created by the current user, 0 - unfiltered, 1 - current user
UserFilter *int32 `thrift:"user_filter,1,optional" form:"user_filter" json:"user_filter,omitempty" query:"user_filter"`
// [4,1] 0 means do not filter
ResTypeFilter []common.ResType `thrift:"res_type_filter,2,optional" form:"res_type_filter" json:"res_type_filter,omitempty" query:"res_type_filter"`
// name
Name *string `thrift:"name,3,optional" form:"name" json:"name,omitempty" query:"name"`
// Published status, 0 - unfiltered, 1 - unpublished, 2 - published
PublishStatusFilter *common.PublishStatus `thrift:"publish_status_filter,4,optional" form:"publish_status_filter" json:"publish_status_filter,omitempty" query:"publish_status_filter"`
// User's space ID
SpaceID int64 `thrift:"space_id,5,required" form:"space_id,required" json:"space_id,string,required" query:"space_id,required"`
// The number of data bars read at one time, the default is 10, and the maximum is 100.
Size *int32 `thrift:"size,7,optional" form:"size" json:"size,omitempty" query:"size"`
// Cursor, used for paging, default 0, the first request can not be passed, subsequent requests need to bring the last returned cursor
Cursor *string `thrift:"cursor,9,optional" form:"cursor" json:"cursor,omitempty" query:"cursor"`
// The field used to specify the custom search, do not fill in the default only name matches, eg [] string {name, custom} matches the name and custom fields full_text
SearchKeys []string `thrift:"search_keys,10,optional" form:"search_keys" json:"search_keys,omitempty" query:"search_keys"`
// Do you need to return image review when the res_type_filter is [2 workflow]
IsGetImageflow *bool `thrift:"is_get_imageflow,11,optional" form:"is_get_imageflow" json:"is_get_imageflow,omitempty" query:"is_get_imageflow"`
Base *base.Base `thrift:"Base,255" form:"Base" json:"Base" query:"Base"`
}
LibraryResourceListResponse 响应结构体:
type LibraryResourceListResponse struct {
Code int64 `thrift:"code,1" form:"code" json:"code" query:"code"`
Msg string `thrift:"msg,2" form:"msg" json:"msg" query:"msg"`
ResourceList []*common.ResourceInfo `thrift:"resource_list,3" form:"resource_list" json:"resource_list" query:"resource_list"`
// Cursor, the cursor for the next request
Cursor *string `thrift:"cursor,5,optional" form:"cursor" json:"cursor,omitempty" query:"cursor"`
// Is there still data to be pulled?
HasMore bool `thrift:"has_more,6" form:"has_more" json:"has_more" query:"has_more"`
BaseResp *base.BaseResp `thrift:"BaseResp,255,required" form:"BaseResp,required" json:"BaseResp,required" query:"BaseResp,required"`
}
接口功能说明
业务功能:
- 资源库列表获取:获取用户空间内的所有资源库资源,支持分页查询
- 多维度筛选:支持按资源类型、发布状态、创建者、名称等条件筛选
- 搜索功能:支持按名称和自定义字段进行全文搜索
- 分页支持:使用游标分页机制,支持高效的大数据量分页
- 权限控制:基于用户身份和空间权限进行访问控制
技术特性:
- 类型安全:使用强类型定义确保数据一致性
- 多格式支持:支持thrift、form、json、query等多种序列化格式
- 可选字段:使用optional标记支持向后兼容
- 统一响应:遵循统一的响应格式规范
- 游标分页:使用cursor机制实现高效分页,避免深度分页性能问题
文件作用:
由thriftgo自动生成的Go代码文件,基于IDL定义生成对应的Go结构体和接口,提供类型安全的API模型。该文件实现了完整的Thrift RPC通信机制,包括客户端调用、服务端处理、序列化/反序列化等功能,确保了分布式服务间的可靠通信。
资源库接口处理器实现
文件位置:backend/api/handler/coze/resource_service.go
该文件包含了用户登录后点击资源库功能的所有核心API接口处理器,主要负责处理资源库资源的查询、复制、管理等功能。
核心代码:
// LibraryResourceList .
// @router /api/plugin_api/library_resource_list [POST]
func LibraryResourceList(ctx context.Context, c *app.RequestContext) {
var err error
var req resource.LibraryResourceListRequest
err = c.BindAndValidate(&req)
if err != nil {
invalidParamRequestResponse(c, err.Error())
return
}
if req.SpaceID <= 0 {
invalidParamRequestResponse(c, "space_id is invalid")
return
}
if req.GetSize() > 100 {
invalidParamRequestResponse(c, "size is too large")
return
}
resp, err := search.SearchSVC.LibraryResourceList(ctx, &req)
if err != nil {
internalServerErrorResponse(ctx, c, err)
return
}
c.JSON(consts.StatusOK, resp)
}
实现功能:
- 参数验证:验证请求参数的有效性,包括SpaceID和Size的范围检查
- 请求绑定:使用Hertz框架的BindAndValidate方法自动绑定和验证请求参数
- 业务调用:调用搜索服务层的LibraryResourceList方法获取资源列表
- 错误处理:统一的错误处理机制,包括参数错误和内部服务错误
- 响应返回:以JSON格式返回标准化的响应结果
参数校验逻辑:
- SpaceID校验:必须大于0,确保是有效的空间ID
- Size限制:最大不超过100,防止单次查询数据量过大影响性能
- 自动绑定:利用struct tag自动完成参数绑定和基础验证
路由注册实现-api.go文件详细分析
文件位置:backend/api/router/coze/api.go
核心代码:
// Code generated by hertz generator. DO NOT EDIT.
func Register(r *server.Hertz) {
root := r.Group("/", rootMw()...)
{
_api := root.Group("/api", _apiMw()...)
{
_plugin_api := _api.Group("/plugin_api", _plugin_apiMw()...)
_plugin_api.POST("/library_resource_list", append(_libraryresourcelistMw(), coze.LibraryResourceList)...)
// ... 其他资源相关路由
}
}
}
文件作用:
此文件是Coze Studio后端的核心路由注册文件,由hertz generator自动生成,负责将所有HTTP API接口路由与对应的处理函数进行绑定和注册。该文件构建了完整的RESTful API路由树结构。对于资源库模块,构建了层次化的路由结构:
/api/plugin_api/library_resource_list [POST]
├── rootMw() # 根级中间件
├── _apiMw() # API组中间件
├── _plugin_apiMw() # 插件API组中间件
├── _libraryresourcelistMw() # 资源库列表接口中间件
└── coze.LibraryResourceList # 处理函数
中间件系统-middleware.go文件详细分析
文件位置:backend/api/router/coze/middleware.go
核心代码:
func _plugin_apiMw() []app.HandlerFunc {
// 插件API模块中间件
return nil
}
func _libraryresourcelistMw() []app.HandlerFunc {
// 资源库列表查询接口专用中间件
return nil
}
func _projectresourcelistMw() []app.HandlerFunc {
// 项目资源列表查询接口专用中间件
return nil
}
func _resourcecopydispatchMw() []app.HandlerFunc {
// 资源复制分发接口专用中间件
return nil
}
文件作用:
- 中间件函数定义:为资源库模块的每个路由组和特定路由提供中间件挂载点
- 路由层级管理:按照路由的层级结构组织中间件函数,支持三层中间件架构
- 开发者扩展接口:提供统一的接口供开发者添加自定义中间件逻辑,如认证、鉴权、限流、日志记录等
- 粒度化控制:支持从模块级别到接口级别的细粒度中间件控制
- 功能扩展:可在此处添加资源访问权限检查、请求日志记录、性能监控等功能
API网关层Restful接口路由-Coze+Hertz
Hertz为每个HTTP方法维护独立的路由树,通过分组路由的方式构建层次化的API结构。对于资源库列表查询接口的完整路由链路:
/api/plugin_api/library_resource_list [POST]
├── rootMw() # 根级中间件(全局认证、CORS等)
├── _apiMw() # API组中间件(API版本控制、通用验证)
├── _plugin_apiMw() # 插件API组中间件(插件相关权限检查)
├── _libraryresourcelistMw() # 接口级中间件(资源库特定逻辑)
└── coze.LibraryResourceList # 处理函数
这种设计的优势:
- 层次化管理:不同层级的中间件处理不同的关注点,职责清晰
- 可扩展性:每个层级都可以独立添加中间件,不影响其他层级
- 性能优化:中间件按需执行,避免不必要的开销
- POST请求支持:专门处理POST请求的JSON数据绑定和验证
- 资源库管理:专门为资源库功能设计的路由结构,支持复杂的资源操作
- 统一错误处理:在中间件层面实现统一的错误处理和响应格式化
- 安全控制:多层级的安全检查,确保资源访问的安全性
3. 应用服务层
SearchApplicationService初始化
文件位置:backend/application/search/resource_search.go
和 backend/application/search/init.go
SearchApplicationService是搜索应用服务层的核心组件,负责处理项目和资源的搜索、获取、收藏等业务逻辑,是连接API层和领域层的重要桥梁。
服务结构定义
文件位置:backend/application/search/resource_search.go
// SearchApplicationService 搜索应用服务,处理项目和资源搜索的核心业务逻辑
var SearchSVC = &SearchApplicationService{}
type SearchApplicationService struct {
*ServiceComponents // 嵌入服务组件依赖
DomainSVC search.Search // 搜索领域服务
}
// 资源类型到默认图标的映射
var resType2iconURI = map[common.ResType]string{
common.ResType_Plugin: consts.DefaultPluginIcon,
common.ResType_Workflow: consts.DefaultWorkflowIcon,
common.ResType_Knowledge: consts.DefaultDatasetIcon,
common.ResType_Prompt: consts.DefaultPromptIcon,
common.ResType_Database: consts.DefaultDatabaseIcon,
}
服务组件依赖
文件位置:backend/application/search/init.go
// ServiceComponents 定义搜索服务所需的所有依赖组件
type ServiceComponents struct {
DB *gorm.DB // 数据库连接
Cache cache.Cmdable // 缓存服务
TOS storage.Storage // 对象存储服务
ESClient es.Client // Elasticsearch客户端
ProjectEventBus ProjectEventBus // 项目事件总线
ResourceEventBus ResourceEventBus // 资源事件总线
SingleAgentDomainSVC singleagent.SingleAgent // 单智能体领域服务
APPDomainSVC app.AppService // APP领域服务
KnowledgeDomainSVC knowledge.Knowledge // 知识库领域服务
PluginDomainSVC service.PluginService // 插件领域服务
WorkflowDomainSVC workflow.Service // 工作流领域服务
UserDomainSVC user.User // 用户领域服务
ConnectorDomainSVC connector.Connector // 连接器领域服务
PromptDomainSVC prompt.Prompt // 提示词领域服务
DatabaseDomainSVC database.Database // 数据库领域服务
}
服务初始化实现
文件位置:backend/application/search/init.go
// InitService 初始化搜索应用服务,注入所有依赖并设置消息队列消费者
func InitService(ctx context.Context, s *ServiceComponents) (*SearchApplicationService, error) {
// 创建搜索领域服务
searchDomainSVC := search.NewDomainService(ctx, s.ESClient)
// 注入依赖到全局服务实例
SearchSVC.DomainSVC = searchDomainSVC
SearchSVC.ServiceComponents = s
// 设置项目搜索消费者
searchConsumer := search.NewProjectHandler(ctx, s.ESClient)
logs.Infof("start search domain consumer...")
nameServer := os.Getenv(consts.MQServer)
// 注册项目事件消费者
err := eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicApp, consts.RMQConsumeGroupApp, searchConsumer)
if err != nil {
return nil, fmt.Errorf("register search consumer failed, err=%w", err)
}
// 设置资源搜索消费者
searchResourceConsumer := search.NewResourceHandler(ctx, s.ESClient)
// 注册资源事件消费者
err = eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicResource, consts.RMQConsumeGroupResource, searchResourceConsumer)
if err != nil {
return nil, fmt.Errorf("register search consumer failed, err=%w", err)
}
return SearchSVC, nil
}
// 事件总线类型别名
type (
ResourceEventBus = search.ResourceEventBus
ProjectEventBus = search.ProjectEventBus
)
// NewResourceEventBus 创建资源事件总线
func NewResourceEventBus(p eventbus.Producer) search.ResourceEventBus {
return search.NewResourceEventBus(p)
}
// NewProjectEventBus 创建项目事件总线
func NewProjectEventBus(p eventbus.Producer) search.ProjectEventBus {
return search.NewProjectEventBus(p)
}
服务初始化特点:
- 依赖注入:通过ServiceComponents结构体注入15个不同的领域服务,实现完整的业务功能支持
- Elasticsearch集成:使用ES客户端提供强大的全文搜索和索引功能
- 事件驱动架构:集成项目和资源事件总线,支持异步事件处理和数据同步
- 消息队列消费者:自动注册项目和资源的MQ消费者,实现实时数据更新
- 多领域服务协调:整合智能体、APP、知识库、插件、工作流等多个领域服务
- 存储服务集成:支持数据库持久化、缓存加速和对象存储
资源库查询服务核心实现
资源库列表获取功能
文件位置:backend/application/search/resource_search.go
// LibraryResourceList 获取资源库列表
func (s *SearchApplicationService) LibraryResourceList(ctx context.Context, req *resource.LibraryResourceListRequest) (resp *resource.LibraryResourceListResponse, err error) {
// 权限验证
userID := ctxutil.GetUIDFromCtx(ctx)
if userID == nil {
return nil, errorx.New(errno.ErrSearchPermissionCode, errorx.KV("msg", "session required"))
}
// 构建资源搜索请求
searchReq := &entity.SearchResourcesRequest{
SpaceID: req.GetSpaceID(),
OwnerID: 0,
Name: req.GetName(),
ResTypeFilter: req.GetResTypeFilter(),
PublishStatusFilter: req.GetPublishStatusFilter(),
SearchKeys: req.GetSearchKeys(),
Cursor: req.GetCursor(),
Limit: req.GetSize(),
}
// 设置用户过滤条件
if req.IsSetUserFilter() && req.GetUserFilter() > 0 {
searchReq.OwnerID = ptr.From(userID)
}
// 调用领域服务搜索资源
searchResp, err := s.DomainSVC.SearchResources(ctx, searchReq)
if err != nil {
return nil, err
}
// 并发处理资源数据封装
lock := sync.Mutex{}
tasks := taskgroup.NewUninterruptibleTaskGroup(ctx, 10)
resources := make([]*common.ResourceInfo, len(searchResp.Data))
// 并发处理除第一个资源外的所有资源
if len(searchResp.Data) > 1 {
for idx := range searchResp.Data[1:] {
index := idx + 1
v := searchResp.Data[index]
tasks.Go(func() error {
ri, err := s.packResource(ctx, v)
if err != nil {
logs.CtxErrorf(ctx, "[LibraryResourceList] packResource failed, will ignore resID: %d, Name : %s, resType: %d, err: %v",
v.ResID, v.GetName(), v.ResType, err)
return err
}
lock.Lock()
defer lock.Unlock()
resources[index] = ri
return nil
})
}
}
// 同步处理第一个资源
if len(searchResp.Data) != 0 {
ri, err := s.packResource(ctx, searchResp.Data[0])
if err != nil {
return nil, err
}
lock.Lock()
resources[0] = ri
lock.Unlock()
}
// 等待并发任务完成
err = tasks.Wait()
if err != nil {
return nil, err
}
// 过滤空资源
filterResource := make([]*common.ResourceInfo, 0)
for _, res := range resources {
if res == nil {
continue
}
filterResource = append(filterResource, res)
}
return &resource.LibraryResourceListResponse{
Code: 0,
ResourceList: filterResource,
Cursor: ptr.Of(searchResp.NextCursor),
HasMore: searchResp.HasMore,
}, nil
}
代码作用:
- 1.权限验证 :检查用户是否已登录,未登录则返回权限错误
- 2.资源搜索 :根据请求参数(空间ID、资源名称、类型、发布状态)搜索资源
- 3.用户过滤 :支持按当前用户创建的资源进行过滤
- 4.并发处理 :使用协程池并发处理资源数据封装,提高性能
- 5.分页支持 :通过游标机制实现分页查询
代码功能:
在用户登录后点击“资源库”的场景中会调用(LibraryResourceList 方法),系统会:
- 首先通过搜索服务获取资源列表: searchResp, err := s.DomainSVC.SearchResources(ctx, searchReq)
- 然后 循环遍历 每个资源,为每个资源调用 s.packResource(ctx, v)
- 在 packResource 中,为每个资源创建对应的 packer 并调用 GetDataInfo获取具体资源的信息。
func (s *SearchApplicationService) packResource(ctx context.Context, doc *entity.ResourceDocument) (*common.ResourceInfo, error) {
ri := &common.ResourceInfo{
ResID: ptr.Of(doc.ResID),
ResType: ptr.Of(doc.ResType),
Name: doc.Name,
SpaceID: doc.SpaceID,
CreatorID: doc.OwnerID,
ResSubType: doc.ResSubType,
PublishStatus: doc.PublishStatus,
EditTime: ptr.Of(doc.GetUpdateTime() / 1000),
}
if doc.BizStatus != nil {
ri.BizResStatus = ptr.Of(int32(*doc.BizStatus))
}
packer, err := NewResourcePacker(doc.ResID, doc.ResType, s.ServiceComponents)
if err != nil {
return nil, errorx.Wrapf(err, "NewResourcePacker failed")
}
ri = s.packUserInfo(ctx, ri, doc.GetOwnerID())
ri.Actions = packer.GetActions(ctx)
data, err := packer.GetDataInfo(ctx)
if err != nil {
logs.CtxWarnf(ctx, "[packResource] GetDataInfo failed, resID: %d, Name : %s, resType: %d, err: %v",
doc.ResID, doc.GetName(), doc.ResType, err)
ri.Icon = ptr.Of(s.getResourceDefaultIconURL(ctx, doc.ResType))
return ri, nil // Warn : weak dependency data
}
ri.BizResStatus = data.status
ri.Desc = data.desc
ri.Icon = ternary.IFElse(len(data.iconURL) > 0,
&data.iconURL, ptr.Of(s.getResourceIconURL(ctx, data.iconURI, doc.ResType)))
ri.BizExtend = map[string]string{
"url": ptr.From(ri.Icon),
}
return ri, nil
}
代码功能:
packResource 方法是 资源数据封装器 ,负责将原始的资源文档数据转换为前端可用的资源信息结构。具体功能包括:
- 基础信息封装
- 将 entity.ResourceDocument 转换为 common.ResourceInfo
- 设置资源ID、类型、名称、空间ID、创建者ID等基础字段
- 处理编辑时间(转换为秒级时间戳)
- 资源打包器创建
- 调用 NewResourcePacker 根据资源类型创建对应的打包器:
- Plugin打包器 :处理插件资源
- Workflow打包器 :处理工作流资源
- Knowledge打包器 :处理知识库资源
- Prompt打包器 :处理提示词资源
- Database打包器 :处理数据库资源
- 用户信息封装
- 调用 packUserInfo 获取创建者的用户信息(姓名、头像)
- 设置默认用户头像(如果用户头像为空)
- 操作权限设置
- 通过 packer.GetActions(ctx) 获取资源可执行的操作列表
- 不同资源类型有不同的操作权限(编辑、删除、复制等)
- 详细信息获取
- 调用 packer.GetDataInfo(ctx) 获取资源的详细信息:
- 图标信息 :iconURI、iconURL
- 描述信息 :资源描述
- 状态信息 :业务状态
- 如果获取失败,使用默认图标并记录警告日志
- 图标处理
- 优先使用资源的自定义图标URL
- 如果没有自定义图标,则使用默认图标
- 将图标URL添加到扩展信息中
- 设计特点
- 容错性强 :即使获取详细信息失败,也会返回基础信息
- 类型安全 :通过不同的打包器处理不同类型的资源
- 可扩展性 :新增资源类型只需添加对应的打包器
- 性能优化 :在并发场景中被调用,支持高并发处理
这个方法是资源库系统中的核心数据转换组件,确保了前端能够获得完整、格式化的资源信息。
GetDataInfo详解
文件位置:backend\application\search\resource_pack.go
接口定义核心代码:
type ResourcePacker interface {
GetDataInfo(ctx context.Context) (*dataInfo, error)
}
数据结构定义:
type dataInfo struct {
iconURI *string // 资源图标URI
iconURL string // 资源图标完整URL
desc *string // 资源描述
status *int32 // 资源状态(仅知识库使用)
}
插件资源GetDataInfo实现:
func (p *pluginPacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {
plugin, err := p.appContext.PluginDomainSVC.GetDraftPlugin(ctx, p.resID)
if err != nil {
return nil, err
}
iconURL, err := p.appContext.TOS.GetObjectUrl(ctx, plugin.GetIconURI())
if err != nil {
logs.CtxWarnf(ctx, "get icon url failed with '%s', err=%v", plugin.GetIconURI(), err)
}
return &dataInfo{
iconURI: ptr.Of(plugin.GetIconURI()),
iconURL: iconURL,
desc: ptr.Of(plugin.GetDesc()),
}, nil
}
工作流资源GetDataInfo实现:
func (w *workflowPacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {
info, err := w.appContext.WorkflowDomainSVC.Get(ctx, &vo.GetPolicy{
ID: w.resID,
MetaOnly: true,
})
if err != nil {
return nil, err
}
return &dataInfo{
iconURI: &info.IconURI,
iconURL: info.IconURL,
desc: &info.Desc,
}, nil
}
知识库资源GetDataInfo实现:
func (k *knowledgePacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {
res, err := k.appContext.KnowledgeDomainSVC.GetKnowledgeByID(ctx, &service.GetKnowledgeByIDRequest{
KnowledgeID: k.resID,
})
if err != nil {
return nil, err
}
kn := res.Knowledge
return &dataInfo{
iconURI: ptr.Of(kn.IconURI),
iconURL: kn.IconURL,
desc: ptr.Of(kn.Description),
status: ptr.Of(int32(kn.Status)),
}, nil
}
提示词资源GetDataInfo实现:
func (p *promptPacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {
pInfo, err := p.appContext.PromptDomainSVC.GetPromptResource(ctx, p.resID)
if err != nil {
return nil, err
}
return &dataInfo{
iconURI: nil, // prompt don't have custom icon
iconURL: "",
desc: &pInfo.Description,
}, nil
}
数据库资源GetDataInfo实现:
func (d *databasePacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {
listResp, err := d.appContext.DatabaseDomainSVC.MGetDatabase(ctx, &dbservice.MGetDatabaseRequest{Basics: []*database.DatabaseBasic{
{
ID: d.resID,
TableType: table.TableType_OnlineTable,
},
}})
if err != nil {
return nil, err
}
if len(listResp.Databases) == 0 {
return nil, fmt.Errorf("online database not found, id: %d", d.resID)
}
return &dataInfo{
iconURI: ptr.Of(listResp.Databases[0].IconURI),
iconURL: listResp.Databases[0].IconURL,
desc: ptr.Of(listResp.Databases[0].TableDesc),
}, nil
}
功能说明:
- GetDataInfo方法作用:获取不同类型资源的详细信息,包括图标、描述和状态等
- 多态实现:通过ResourcePacker接口实现多态,支持Plugin、Workflow、Knowledge、Prompt、Database五种资源类型
- 数据封装:将各种资源的详细信息统一封装到dataInfo结构体中
- 错误处理:每个实现都包含完善的错误处理机制,确保系统稳定性
- 图标处理:支持自定义图标URI和完整URL的获取,提示词资源不支持自定义图标
- 状态管理:知识库资源额外支持状态信息的获取和管理
- 工厂模式:通过NewResourcePacker工厂函数根据资源类型创建相应的实现
4. 领域服务层
搜索领域
搜索领域服务接口
文件位置:backend\domain\search\service\service.go
核心代码:
package service
import (
"context"
"github.com/coze-dev/coze-studio/backend/domain/search/entity"
)
type ResourceEventBus interface {
PublishResources(ctx context.Context, event *entity.ResourceDomainEvent) error
}
type Search interface {
SearchResources(ctx context.Context, req *entity.SearchResourcesRequest) (resp *entity.SearchResourcesResponse, err error)
}
搜索领域服务实现-业务接口
文件位置:backend\domain\search\service\search.go
核心代码:
package service
import (
"context"
"strconv"
"github.com/bytedance/sonic"
model "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/search"
searchEntity "github.com/coze-dev/coze-studio/backend/domain/search/entity"
"github.com/coze-dev/coze-studio/backend/infra/contract/es"
"github.com/coze-dev/coze-studio/backend/pkg/lang/conv"
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-studio/backend/pkg/logs"
)
var searchInstance *searchImpl
func NewDomainService(ctx context.Context, e es.Client) Search {
return &searchImpl{
esClient: e,
}
}
type searchImpl struct {
esClient es.Client
}
func (s *searchImpl) SearchResources(ctx context.Context, req *searchEntity.SearchResourcesRequest) (resp *searchEntity.SearchResourcesResponse, err error) {
logs.CtxDebugf(ctx, "[SearchResources] search : %s", conv.DebugJsonToStr(req))
searchReq := &es.Request{
Query: &es.Query{
Bool: &es.BoolQuery{},
},
}
result, err := s.esClient.Search(ctx, resourceIndexName, searchReq)
if err != nil {
return nil, err
}
hits := result.Hits.Hits
hasMore := func() bool {
if len(hits) > reqLimit {
return true
}
return false
}()
if hasMore {
hits = hits[:reqLimit]
}
docs := make([]*searchEntity.ResourceDocument, 0, len(hits))
for _, hit := range hits {
doc := &searchEntity.ResourceDocument{}
if err = sonic.Unmarshal(hit.Source_, doc); err != nil {
return nil, err
}
docs = append(docs, doc)
}
nextCursor := ""
if len(docs) > 0 {
nextCursor = formatResourceNextCursor(req.OrderFiledName, docs[len(docs)-1])
}
if nextCursor == "" {
hasMore = false
}
var total *int64
if result.Hits.Total != nil {
total = ptr.Of(result.Hits.Total.Value)
}
resp = &searchEntity.SearchResourcesResponse{
Data: docs,
TotalHits: total,
HasMore: hasMore,
NextCursor: nextCursor,
}
return resp, nil
}
代码功能:
该方法是Coze Studio资源搜索的核心实现,主要功能包括:
- 多维度筛选 :支持按应用ID、空间ID、资源名称、所有者ID、资源类型、发布状态等条件筛选
- ElasticSearch查询 :构建复杂的ES Bool查询,支持精确匹配和模糊搜索
- 分页支持 :支持基于页码和游标的两种分页方式
- 排序功能 :支持按指定字段排序,默认按更新时间排序
- 结果处理 :将ES查询结果反序列化为ResourceDocument对象,并计算分页信息
该方法位于领域服务层,是资源库功能的核心搜索引擎。
插件领域
插件领域服务接口
文件位置:backend/domain/plugin/service/service.go
插件领域服务接口定义了插件管理的核心业务能力,包括草稿插件管理、在线插件发布、工具管理、Agent工具绑定等功能。
type PluginService interface {
// Draft Plugin
GetDraftPlugin(ctx context.Context, pluginID int64) (plugin *entity.PluginInfo, err error)
}
核心接口功能:
- 草稿插件管理:创建、获取、更新、删除草稿状态的插件
- 在线插件管理:发布插件到线上环境,获取在线插件信息
- 工具管理:管理插件中的具体工具,支持OpenAPI文档转换
- Agent工具绑定:将插件工具绑定到Agent,支持工具执行
- OAuth认证:处理插件的OAuth授权流程
插件领域服务实现-业务接口
文件位置:backend/domain/plugin/service/service_impl.go
和backend\domain\plugin\service\plugin_draft.go
插件服务实现类包含了所有插件相关业务逻辑的具体实现,依赖于仓储层进行数据持久化。
type Components struct {
IDGen idgen.IDGenerator
DB *gorm.DB
OSS storage.Storage
PluginRepo repository.PluginRepository
ToolRepo repository.ToolRepository
OAuthRepo repository.OAuthRepository
}
func NewService(components *Components) PluginService {
impl := &pluginServiceImpl{
db: components.DB,
oss: components.OSS,
pluginRepo: components.PluginRepo,
toolRepo: components.ToolRepo,
oauthRepo: components.OAuthRepo,
httpCli: resty.New(),
}
initOnce.Do(func() {
ctx := context.Background()
safego.Go(ctx, func() {
impl.processOAuthAccessToken(ctx)
})
})
return impl
}
type pluginServiceImpl struct {
db *gorm.DB
oss storage.Storage
pluginRepo repository.PluginRepository
toolRepo repository.ToolRepository
oauthRepo repository.OAuthRepository
httpCli *resty.Client
}
func (p *pluginServiceImpl) GetDraftPlugin(ctx context.Context, pluginID int64) (plugin *entity.PluginInfo, err error) {
pl, exist, err := p.pluginRepo.GetDraftPlugin(ctx, pluginID)
if err != nil {
return nil, errorx.Wrapf(err, "GetDraftPlugin failed, pluginID=%d", pluginID)
}
if !exist {
return nil, errorx.New(errno.ErrPluginRecordNotFound)
}
return pl, nil
}
实现特点:
- 依赖注入:通过Components结构体注入所需的基础设施组件
- 仓储模式:使用Repository模式进行数据访问抽象
- HTTP客户端:集成Resty HTTP客户端用于外部API调用
- 异步处理:使用safego进行OAuth令牌的异步处理
- 单例初始化:使用sync.Once确保初始化逻辑只执行一次
工作流领域
工作流领域服务接口
文件位置:backend/domain/workflow/interface.go
type Service interface {
Get(ctx context.Context, policy *vo.GetPolicy) (*entity.Workflow, error)
}
接口功能分析:
- 工作流管理:Create、Save、Get、MGet、Delete、UpdateMeta等基础CRUD操作
工作流领域服务实现-业务接口
文件位置:backend/domain/workflow/service/service_impl.go
type impl struct {
repo workflow.Repository
*asToolImpl
*executableImpl
*conversationImpl
}
func NewWorkflowService(repo workflow.Repository) workflow.Service {
return &impl{
repo: repo,
asToolImpl: &asToolImpl{
repo: repo,
},
executableImpl: &executableImpl{
repo: repo,
},
conversationImpl: &conversationImpl{repo: repo},
}
}
func (i *impl) Get(ctx context.Context, policy *vo.GetPolicy) (*entity.Workflow, error) {
return i.repo.GetEntity(ctx, policy)
}
工作流领域服务实现-业务实体
文件位置:backend/domain/workflow/entity/workflow.go
type Workflow struct {
ID int64
CommitID string
*vo.Meta
*vo.CanvasInfo
*vo.DraftMeta
*vo.VersionMeta
}
func (w *Workflow) GetBasic() *WorkflowBasic {
var version string
if w.VersionMeta != nil {
version = w.VersionMeta.Version
}
return &WorkflowBasic{
ID: w.ID,
Version: version,
SpaceID: w.SpaceID,
AppID: w.AppID,
CommitID: w.CommitID,
}
}
func (w *Workflow) GetLatestVersion() string {
if w.LatestPublishedVersion == nil {
return ""
}
return *w.LatestPublishedVersion
}
type WorkflowBasic struct {
ID int64
Version string
SpaceID int64
AppID *int64
CommitID string
}
type CopyWorkflowFromAppToLibraryResult struct {
WorkflowIDVersionMap map[int64]IDVersionPair
ValidateIssues []*vo.ValidateIssue
CopiedWorkflows []*Workflow
}
实体设计特点:
- 组合结构:Workflow实体组合了Meta、CanvasInfo、DraftMeta、VersionMeta
- 版本管理:支持草稿版本和发布版本的管理
- 基础信息:WorkflowBasic提供工作流的核心标识信息
- 复制结果:CopyWorkflowFromAppToLibraryResult封装复制操作的完整结果
- 方法封装:提供GetBasic、GetLatestVersion等便捷方法
- 空值处理:对可能为空的字段进行安全处理
知识库领域
知识库领域服务接口
文件位置:backend/domain/knowledge/service/interface.go
知识库领域服务接口定义了知识库管理的核心业务能力,包括知识库管理、文档管理、切片管理、检索功能等。
type Knowledge interface {
GetKnowledgeByID(ctx context.Context, request *GetKnowledgeByIDRequest) (response *GetKnowledgeByIDResponse, err error)
}
核心接口功能:
- 知识库管理:创建、获取、更新、删除、复制知识库,支持知识库在应用和资源库间移动
知识库领域服务实现-业务接口
文件位置:backend/domain/knowledge/service/knowledge.go
知识库服务实现类包含了所有知识库相关业务逻辑的具体实现,依赖于仓储层进行数据持久化。
type KnowledgeSVCConfig struct {
DB *gorm.DB // required
IDGen idgen.IDGenerator // required
RDB rdb.RDB // Required: Form storage
Producer eventbus.Producer // Required: Document indexing process goes through mq asynchronous processing
SearchStoreManagers []searchstore.Manager // Required: Vector/Full Text
ParseManager parser.Manager // Optional: document segmentation and processing capability, default builtin parser
Storage storage.Storage // required: oss
ModelFactory chatmodel.Factory // Required: Model factory
Rewriter messages2query.MessagesToQuery // Optional: Do not overwrite when not configured
Reranker rerank.Reranker // Optional: default rrf when not configured
NL2Sql nl2sql.NL2SQL // Optional: Not supported by default when not configured
EnableCompactTable *bool // Optional: Table data compression, default true
OCR ocr.OCR // Optional: ocr, ocr function is not available when not provided
CacheCli cache.Cmdable // Optional: cache implementation
}
type knowledgeSVC struct {
knowledgeRepo repository.KnowledgeRepo
documentRepo repository.KnowledgeDocumentRepo
sliceRepo repository.KnowledgeDocumentSliceRepo
reviewRepo repository.KnowledgeDocumentReviewRepo
modelFactory chatmodel.Factory
idgen idgen.IDGenerator
rdb rdb.RDB
producer eventbus.Producer
searchStoreManagers []searchstore.Manager
parseManager parser.Manager
rewriter messages2query.MessagesToQuery
reranker rerank.Reranker
storage storage.Storage
nl2Sql nl2sql.NL2SQL
cacheCli cache.Cmdable
enableCompactTable bool // Table data compression
}
func (k *knowledgeSVC) GetKnowledgeByID(ctx context.Context, request *GetKnowledgeByIDRequest) (response *GetKnowledgeByIDResponse, err error) {
if request == nil || request.KnowledgeID == 0 {
return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "request is empty"))
}
kn, err := k.knowledgeRepo.GetByID(ctx, request.KnowledgeID)
if err != nil {
return nil, errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
}
if kn == nil || kn.ID == 0 {
return nil, errorx.New(errno.ErrKnowledgeNotExistCode, errorx.KV("msg", "knowledge not found"))
}
knEntity, err := k.fromModelKnowledge(ctx, kn)
if err != nil {
return nil, err
}
return &GetKnowledgeByIDResponse{
Knowledge: knEntity,
}, nil
}
知识库领域服务实现-业务实体
文件位置:backend/domain/knowledge/entity/knowledge.go
type Knowledge struct {
*knowledge.Knowledge
}
type WhereKnowledgeOption struct {
KnowledgeIDs []int64
AppID *int64
SpaceID *int64
Name *string // Exact match
Status []int32
UserID *int64
Query *string // fuzzy match
Page *int
PageSize *int
Order *Order
OrderType *OrderType
FormatType *int64
}
type OrderType int32
const (
OrderTypeAsc OrderType = 1
OrderTypeDesc OrderType = 2
)
type Order int32
const (
OrderCreatedAt Order = 1
OrderUpdatedAt Order = 2
)
实体设计特点:
- 组合结构:Knowledge实体组合了crossdomain中的Knowledge结构
- 查询选项:WhereKnowledgeOption提供了丰富的查询条件支持
- 排序支持:支持按创建时间、更新时间等字段排序
- 分页查询:内置分页查询支持
- 多维筛选:支持按应用ID、空间ID、状态、格式类型等多维度筛选
- 模糊搜索:支持按名称精确匹配和模糊搜索
提示词领域
提示词领域服务接口
文件位置:backend/domain/prompt/service/prompt.go
提示词领域服务接口定义了提示词管理的核心业务能力,包括提示词资源的创建、获取、更新、删除以及官方提示词模板的管理。
type Prompt interface {
GetPromptResource(ctx context.Context, promptID int64) (*entity.PromptResource, error)
}
核心接口功能:
- 提示词资源管理:创建、获取、更新、删除用户自定义的提示词资源
提示词领域服务实现-业务接口
文件位置:backend/domain/prompt/service/prompt_impl.go
提示词服务实现类包含了所有提示词相关业务逻辑的具体实现,依赖于仓储层进行数据持久化。
type promptService struct {
Repo repository.PromptRepository
}
func NewService(repo repository.PromptRepository) Prompt {
return &promptService{
Repo: repo,
}
}
func (s *promptService) GetPromptResource(ctx context.Context, promptID int64) (*entity.PromptResource, error) {
return s.Repo.GetPromptResource(ctx, promptID)
}
实现特点:
- 依赖注入:通过Repository接口注入数据访问能力
- 仓储模式:使用Repository模式进行数据访问抽象
- 官方模板:集成官方提示词模板管理功能
- 搜索过滤:支持按关键词搜索和过滤提示词
- 数据复制:使用深拷贝确保数据安全性
- 错误处理:统一的错误处理和传播机制
提示词领域服务实现-业务实体
文件位置:backend/domain/prompt/entity/promot_resource.go
type PromptResource struct {
ID int64
SpaceID int64
Name string
Description string
PromptText string
Status int32
CreatorID int64
CreatedAt int64
UpdatedAt int64
}
实体设计特点:
- 基础信息:包含ID、名称、描述等基本属性
- 内容存储:PromptText字段存储完整的提示词内容
- 权限管理:SpaceID和CreatorID支持多租户和权限控制
- 状态管理:Status字段支持提示词的状态管理
- 时间追踪:CreatedAt和UpdatedAt支持创建和更新时间追踪
- 简洁设计:实体结构简洁明了,专注于提示词核心属性
数据库领域
数据库领域负责管理Coze Studio中的数据库资源,包括草稿数据库和在线数据库的创建、更新、删除、查询等核心功能。该领域支持结构化数据存储,为Agent提供数据查询和操作能力。
数据库领域服务接口
文件位置:backend/domain/memory/database/service/database.go
数据库领域服务接口定义了数据库管理的核心业务能力,包括数据库CRUD操作、记录管理、Agent绑定、SQL执行等功能。
type Database interface {
// 数据库基础操作
MGetDatabase(ctx context.Context, req *MGetDatabaseRequest) (*MGetDatabaseResponse, error)
}
核心接口功能:
- 数据库管理:创建、更新、删除、批量获取数据库
- 记录操作:增删改查数据库记录,支持分页查询
- 模板和导入:支持Excel模板生成和数据导入
- SQL执行:支持自定义SQL查询和操作
- Agent集成:将数据库绑定到Agent,支持智能查询
- 发布管理:将草稿数据库发布到线上环境
数据库领域服务实现-业务接口
文件位置:backend/domain/memory/database/service/database_impl.go
数据库服务实现类包含了所有数据库相关业务逻辑的具体实现,依赖于仓储层进行数据持久化。
MGetDatabase方法实现:
func (d databaseService) MGetDatabase(ctx context.Context, req *MGetDatabaseRequest) (*MGetDatabaseResponse, error) {
if len(req.Basics) == 0 {
return &MGetDatabaseResponse{
Databases: []*entity2.Database{},
}, nil
}
onlineID2NeedSysFields := make(map[int64]bool)
draftID2NeedSysFields := make(map[int64]bool)
uniqueOnlineIDs := make([]int64, 0)
uniqueDraftIDs := make([]int64, 0)
idMap := make(map[int64]bool)
for _, basic := range req.Basics {
if !idMap[basic.ID] {
idMap[basic.ID] = true
if basic.TableType == table.TableType_OnlineTable {
uniqueOnlineIDs = append(uniqueOnlineIDs, basic.ID)
onlineID2NeedSysFields[basic.ID] = basic.NeedSysFields
} else {
uniqueDraftIDs = append(uniqueDraftIDs, basic.ID)
draftID2NeedSysFields[basic.ID] = basic.NeedSysFields
}
}
}
// 批量获取在线数据库和草稿数据库
onlineDatabases, err := d.onlineDAO.MGet(ctx, uniqueOnlineIDs)
if err != nil {
return nil, fmt.Errorf("batch get database info failed: %v", err)
}
draftDatabases, err := d.draftDAO.MGet(ctx, uniqueDraftIDs)
if err != nil {
return nil, fmt.Errorf("batch get database info failed: %v", err)
}
// 处理系统字段和图标URL
for _, onlineDatabase := range onlineDatabases {
if needSys, ok := onlineID2NeedSysFields[onlineDatabase.ID]; ok && needSys {
if onlineDatabase.FieldList == nil {
onlineDatabase.FieldList = make([]*database.FieldItem, 0, 3)
}
onlineDatabase.FieldList = append(onlineDatabase.FieldList,
physicaltable.GetDisplayCreateTimeField(),
physicaltable.GetDisplayUidField(),
physicaltable.GetDisplayIDField())
}
if onlineDatabase.IconURI != "" {
objURL, uRrr := d.storage.GetObjectUrl(ctx, onlineDatabase.IconURI)
if uRrr == nil {
onlineDatabase.IconURL = objURL
}
}
}
databases := make([]*entity2.Database, 0)
databases = append(databases, onlineDatabases...)
databases = append(databases, draftDatabases...)
return &MGetDatabaseResponse{
Databases: databases,
}, nil
}
业务逻辑特点:
- 双表管理:同时维护草稿表和在线表,支持开发和生产环境分离
- 物理表创建:自动创建对应的物理数据库表结构
- 事务保证:使用数据库事务确保数据一致性
- 字段管理:支持自定义字段和系统字段的管理
- 图标处理:自动处理图标URI到URL的转换
- 批量操作:支持批量获取数据库信息,提高查询效率
数据库领域服务实现-业务实体
文件位置:backend/domain/memory/database/entity/database.go
数据库领域实体定义了数据库相关的核心数据结构和业务对象。
type Database = database.Database
// DatabaseFilter Database filter criteria
type DatabaseFilter struct {
CreatorID *int64
SpaceID *int64
TableName *string
AppID *int64
}
// Pagination pagination
type Pagination struct {
Total int64
Limit int
Offset int
}
type TableSheet struct {
SheetID int64
HeaderLineIdx int64
StartLineIdx int64
}
type TableReaderMeta struct {
TosMaxLine int64
SheetId int64
HeaderLineIdx int64
StartLineIdx int64
ReaderMethod database.TableReadDataMethod
ReadLineCnt int64
Schema []*knowledge.DocTableColumn
}
type ExcelExtraInfo struct {
Sheets []*knowledge.DocTableSheet
ExtensionName string // extension
FileSize int64 // file size
SourceFileID int64
TosURI string
}
type ColumnInfo struct {
ColumnType knowledge.ColumnType
ContainsEmptyValue bool
}
实体设计特点:
- 数据库实体:复用跨域模型中的Database定义,保持一致性
- 过滤条件:DatabaseFilter支持多维度的数据库查询过滤
- 分页支持:Pagination实体支持分页查询功能
- 表格处理:TableSheet和TableReaderMeta支持Excel/CSV文件导入
- 元数据管理:ExcelExtraInfo包含文件的详细元数据信息
- 列信息:ColumnInfo描述数据库列的类型和约束信息
数据库领域核心功能:
- 结构化存储:为Agent提供结构化数据存储能力
- 数据导入:支持从Excel/CSV文件批量导入数据
- SQL查询:支持自定义SQL查询和数据分析
- Agent集成:与Agent深度集成,支持智能数据查询
- 版本管理:通过草稿和在线环境实现数据版本管理
- 权限控制:支持基于用户和应用的数据访问控制
5. 数据访问层
插件仓储
PluginRepo仓储接口定义
文件位置:backend\domain\plugin\repository\plugin_repository.go
type PluginRepository interface {
GetDraftPlugin(ctx context.Context, pluginID int64, opts ...PluginSelectedOptions) (plugin *entity.PluginInfo, exist bool, err error)
}
PluginRepo仓储实现
文件位置:backend\domain\plugin\repository\plugin_impl.go
func (p *pluginRepoImpl) GetDraftPlugin(ctx context.Context, pluginID int64, opts ...PluginSelectedOptions) (plugin *entity.PluginInfo, exist bool, err error) {
var opt *dal.PluginSelectedOption
if len(opts) > 0 {
opt = &dal.PluginSelectedOption{}
for _, o := range opts {
o(opt)
}
}
return p.pluginDraftDAO.Get(ctx, pluginID, opt)
}
数据模型与查询
文件位置:backend\domain\plugin\internal\dal\plugin_draft.go
func (a *PluginDraftDAO) Get(ctx context.Context, pluginID int64) (plugin *entity.PluginInfo, exist bool, err error) {
table := a.query.PluginDraft
res, err := table.WithContext(ctx).
Where(table.ID.Eq(pluginID)).
First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, false, nil
}
return nil, false, err
}
plugin = pluginDraftPO(*res).ToDO()
return plugin, true, nil
}
代码功能:
- 数据库查询 - 使用GORM查询框架,从 PluginDraft 表中根据ID查找记录
- 错误处理 - 区分处理两种情况:
- 记录不存在:返回 (nil, false, nil)
- 其他数据库错误:返回 (nil, false, err)
- 数据转换 - 将数据库持久化对象(PO)转换为领域实体(DO)
- pluginDraftPO(*res).ToDO() 执行PO到DO的转换
- 成功返回 - 找到记录时返回 (plugin, true, nil)
- 失败返回 - 未找到记录时返回 (nil, false, nil)
文件依赖关系:
数据库表结构 (schema.sql)(plugin_draft表)
↓ gen_orm_query.go
模型文件 (model/plugin_draft.gen.go) - 生成模型
↓
查询文件 (query/plugin_draft.gen.go) - 依赖对应模型
↓
统一入口 (query/gen.go) - 依赖所有查询文件
工作流仓储
workflowRepo仓储接口定义
文件位置:backend\domain\workflow\interface.go
type Repository interface {
GetEntity(ctx context.Context, policy *vo.GetPolicy) (*entity.Workflow, error)
}
workflowRepo仓储实现
文件位置:backend\domain\workflow\internal\repo\repository.go
type RepositoryImpl struct {
idgen.IDGenerator
query *query.Query
redis cache.Cmdable
tos storage.Storage
einoCompose.CheckPointStore
workflow.InterruptEventStore
workflow.CancelSignalStore
workflow.ExecuteHistoryStore
builtinModel cm.BaseChatModel
workflow.WorkflowConfig
workflow.Suggester
}
func (r *RepositoryImpl) GetEntity(ctx context.Context, policy *vo.GetPolicy) (_ *entity.Workflow, err error) {
defer func() {
if err != nil {
err = vo.WrapIfNeeded(errno.ErrDatabaseError, err)
}
}()
meta, err := r.GetMeta(ctx, policy.ID)
if err != nil {
return nil, err
}
if policy.MetaOnly {
return &entity.Workflow{
ID: policy.ID,
Meta: meta,
}, nil
}
}
func (r *RepositoryImpl) GetMeta(ctx context.Context, id int64) (_ *vo.Meta, err error) {
defer func() {
if err != nil {
err = vo.WrapIfNeeded(errno.ErrDatabaseError, err)
}
}()
meta, err := r.query.WorkflowMeta.WithContext(ctx).Debug().Where(r.query.WorkflowMeta.ID.Eq(id)).First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, vo.WrapError(errno.ErrWorkflowNotFound, fmt.Errorf("workflow meta not found for ID %d: %w", id, err),
errorx.KV("id", strconv.FormatInt(id, 10)))
}
return nil, fmt.Errorf("failed to get workflow meta for ID %d: %w", id, err)
}
return r.convertMeta(ctx, meta)
}
数据模型与查询
文件依赖关系:
数据库表结构 (schema.sql)(workflow_meta表)
↓ gen_orm_query.go
模型文件 (model/workflow_meta.gen.go) - 生成模型
↓
查询文件 (query/workflow_meta.gen.go) - 依赖对应模型
↓
统一入口 (query/gen.go) - 依赖所有查询文件
知识库仓储
knowledgeRepo仓储接口定义
文件位置:backend\domain\knowledge\repository\repository.go
type KnowledgeRepo interface {
GetByID(ctx context.Context, id int64) (*model.Knowledge, error)
}
knowledgeRepo仓储实现
文件位置:backend\domain\knowledge\internal\dal\dao\knowledge.go
func (dao *KnowledgeDAO) GetByID(ctx context.Context, id int64) (*model.Knowledge, error) {
k := dao.Query.Knowledge
knowledge, err := k.WithContext(ctx).Where(k.ID.Eq(id)).First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return knowledge, nil
}
数据模型与查询
文件依赖关系:
数据库表结构 (schema.sql)(knowledge表)
↓ gen_orm_query.go
模型文件 (model/knowledge.gen.go) - 生成模型
↓
查询文件 (query/knowledge.gen.go) - 依赖对应模型
↓
统一入口 (query/gen.go) - 依赖所有查询文件
提示词仓储
promptRepo仓储接口定义
文件位置:backend\domain\prompt\repository\repository.go
type PromptRepository interface {
GetPromptResource(ctx context.Context, promptID int64) (*entity.PromptResource, error)
}
promptRepo仓储实现
文件位置:backend\domain\prompt\internal\dal\prompt_resource.go
func (d *PromptDAO) GetPromptResource(ctx context.Context, promptID int64) (*entity.PromptResource, error) {
promptModel := query.PromptResource
promptWhere := []gen.Condition{
promptModel.ID.Eq(promptID),
}
promptResource, err := promptModel.WithContext(ctx).Where(promptWhere...).First()
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errorx.WrapByCode(err, errno.ErrPromptDataNotFoundCode)
}
if err != nil {
return nil, errorx.WrapByCode(err, errno.ErrPromptGetCode)
}
do := d.promptResourcePO2DO(promptResource)
return do, nil
}
数据模型与查询
文件依赖关系:
数据库表结构 (schema.sql)(prompt_resource表)
↓ gen_orm_query.go
模型文件 (model/prompt_resource.gen.go) - 生成模型
↓
查询文件 (query/prompt_resource.gen.go) - 依赖对应模型
↓
统一入口 (query/gen.go) - 依赖所有查询文件
数据库仓储
databaseRepo仓储接口定义
文件位置:backend\domain\memory\database\repository\repository.go
type OnlineDAO interface {
MGet(ctx context.Context, ids []int64) ([]*entity.Database, error)
}
databaseRepo仓储实现
文件位置:backend\domain\memory\database\repository\repository.go
func (o *OlineImpl) MGet(ctx context.Context, ids []int64) ([]*entity.Database, error) {
if len(ids) == 0 {
return []*entity.Database{}, nil
}
res := o.query.OnlineDatabaseInfo
// Query undeleted records with IDs in the given list
records, err := res.WithContext(ctx).
Where(res.ID.In(ids...)).
Find()
if err != nil {
return nil, fmt.Errorf("batch query online database failed: %v", err)
}
// Build returns results
databases := make([]*entity.Database, 0, len(records))
for _, info := range records {
db := &entity.Database{
ID: info.ID,
SpaceID: info.SpaceID,
CreatorID: info.CreatorID,
IconURI: info.IconURI,
AppID: info.AppID,
DraftID: &info.RelatedDraftID,
OnlineID: &info.ID,
IsVisible: info.IsVisible == 1,
PromptDisabled: info.PromptDisabled == 1,
TableName: info.TableName_,
TableDesc: info.TableDesc,
FieldList: info.TableField,
Status: table.BotTableStatus_Online,
ActualTableName: info.PhysicalTableName,
RwMode: table.BotTableRWMode(info.RwMode),
CreatedAtMs: info.CreatedAt,
UpdatedAtMs: info.UpdatedAt,
}
databases = append(databases, db)
}
return databases, nil
}
数据模型与查询
文件依赖关系:
数据库表结构 (schema.sql)(online_database_info表)
↓ gen_orm_query.go
模型文件 (model/online_database_info.gen.go) - 生成模型
↓
查询文件 (query/online_database_info.gen.go) - 依赖对应模型
↓
统一入口 (query/gen.go) - 依赖所有查询文件
6. 基础设施层
database.go文件详解
文件位置:backend/infra/contract/orm/database.go
核心代码:
package orm
import (
"gorm.io/gorm"
)
type DB = gorm.DB
文件作用:
- 定义了 type DB = gorm.DB ,为 GORM 数据库对象提供类型别名
- 作为契约层(Contract),为上层提供统一的数据库接口抽象
- 便于后续可能的数据库实现替换(如从 MySQL 切换到 PostgreSQL)
mysql.go文件详解
文件位置:backend/infra/impl/mysql/mysql.go
核心代码:
package mysql
import (
"fmt"
"os"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
func New() (*gorm.DB, error) {
dsn := os.Getenv("MYSQL_DSN")
db, err := gorm.Open(mysql.Open(dsn))
if err != nil {
return nil, fmt.Errorf("mysql open, dsn: %s, err: %w", dsn, err)
}
return db, nil
}
文件作用:
- 定义了 New() 函数,负责建立 GORM MySQL 数据库连接
- 使用环境变量 MYSQL_DSN 配置数据库连接字符串
- 返回 *gorm.DB 实例,作为整个应用的数据库连接对象
- 后端服务启动时,调用 mysql.New() 初始化数据库连接
main.go → application.Init() → appinfra.Init() → mysql.New()
ElasticSearch架构设计
Contract 层(接口定义)
backend/infra/contract/es/
目录定义了 ElasticSearch 的抽象接口:
es.go
: 定义了核心接口Client
接口:包含Search
、Create
、Update
、Delete
、CreateIndex
等方法Types
接口:定义属性类型创建方法BulkIndexer
接口:批量操作接口
model.go
: 定义数据模型Request
:搜索请求结构体,包含查询条件、分页、排序等Response
:搜索响应结构体,包含命中结果和元数据Hit
:单个搜索结果BulkIndexerItem
:批量操作项
query.go
: 定义查询相关结构Query
:查询结构体,支持多种查询类型QueryType
常量:equal
、match
、multi_match
、not_exists
、contains
、in
BoolQuery
:布尔查询,支持must
、should
、filter
、must_not
- 各种查询构造函数:
NewEqualQuery
、NewMatchQuery
等
Implementation 层(具体实现)
backend/infra/impl/es/
目录提供了具体实现:
es_impl.go
: 工厂方法New()
函数根据环境变量ES_VERSION
选择 ES7 或 ES8 实现- 类型别名导出,统一接口
es7.go
: ElasticSearch 7.x 实现es7Client
结构体实现Client
接口- 使用
github.com/elastic/go-elasticsearch/v7
官方客户端 Search
方法将抽象查询转换为 ES7 格式的 JSON 查询query2ESQuery
方法处理查询类型转换
es8.go
: ElasticSearch 8.x 实现es8Client
结构体实现Client
接口- 使用
github.com/elastic/go-elasticsearch/v8
官方客户端 - 使用类型化 API,更加类型安全
Search
方法使用 ES8 的 typed API
查询执行流程
- 业务层调用:
backend/domain/search/service/search.go
中的SearchProjects
方法 - 构建查询:创建
es.Request
对象,设置查询条件、排序、分页等 - 执行查询:调用
s.esClient.Search(ctx, projectIndexName, searchReq)
- 版本适配:根据
ES_VERSION
环境变量,自动选择 ES7 或 ES8 实现 - 查询转换:
- ES7:将抽象查询转换为 JSON 格式
- ES8:将抽象查询转换为类型化结构体
- 结果处理:将 ES 响应转换为统一的
Response
结构体
索引使用:
- 项目索引:
projectIndexName = "project_draft"
存储项目草稿信息 - 资源索引:
resourceIndexName = "coze_resource"
存储各类资源信息
设计优势
- 版本兼容:同时支持 ES7 和 ES8,通过环境变量切换
- 接口统一:业务代码无需关心具体 ES 版本
- 类型安全:ES8 使用类型化 API,减少运行时错误
- 查询抽象:提供统一的查询构建方式,支持复杂的布尔查询
- 易于扩展:新增查询类型只需在 contract 层定义,impl 层实现
这种设计模式体现了依赖倒置原则,业务层依赖抽象接口而非具体实现,使得系统更加灵活和可维护。
7. 数据存储层
数据库表结构
文件位置:docker/volumes/mysql/schema.sql
-- 插件草稿表
CREATE TABLE IF NOT EXISTS `plugin_draft` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',
`creator_id` bigint(20) NOT NULL COMMENT '插件创建者ID',
`name` varchar(255) NOT NULL COMMENT '插件名称',
`description` text COMMENT '插件描述',
`icon_uri` varchar(255) DEFAULT NULL COMMENT '插件图标URI',
`plugin_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '插件类型',
`config` json DEFAULT NULL COMMENT '插件配置信息',
`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',
`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',
`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',
PRIMARY KEY (`id`),
KEY `idx_space_id` (`space_id`),
KEY `idx_creator_id` (`creator_id`),
KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='插件草稿表';
-- 工作流元数据表
CREATE TABLE IF NOT EXISTS `workflow_meta` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',
`creator_id` bigint(20) NOT NULL COMMENT '工作流创建者ID',
`name` varchar(255) NOT NULL COMMENT '工作流名称',
`description` text COMMENT '工作流描述',
`icon_uri` varchar(255) DEFAULT NULL COMMENT '工作流图标URI',
`workflow_config` json DEFAULT NULL COMMENT '工作流配置信息',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '工作流状态',
`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',
`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',
`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',
PRIMARY KEY (`id`),
KEY `idx_space_id` (`space_id`),
KEY `idx_creator_id` (`creator_id`),
KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工作流元数据表';
-- 知识库表
CREATE TABLE IF NOT EXISTS `knowledge` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',
`creator_id` bigint(20) NOT NULL COMMENT '知识库创建者ID',
`name` varchar(255) NOT NULL COMMENT '知识库名称',
`description` text COMMENT '知识库描述',
`icon_uri` varchar(255) DEFAULT NULL COMMENT '知识库图标URI',
`knowledge_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '知识库类型',
`config` json DEFAULT NULL COMMENT '知识库配置信息',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '知识库状态',
`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',
`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',
`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',
PRIMARY KEY (`id`),
KEY `idx_space_id` (`space_id`),
KEY `idx_creator_id` (`creator_id`),
KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='知识库表';
-- 提示词资源表
CREATE TABLE IF NOT EXISTS `prompt_resource` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',
`creator_id` bigint(20) NOT NULL COMMENT '提示词创建者ID',
`name` varchar(255) NOT NULL COMMENT '提示词名称',
`description` text COMMENT '提示词描述',
`content` text NOT NULL COMMENT '提示词内容',
`category` varchar(100) DEFAULT NULL COMMENT '提示词分类',
`tags` json DEFAULT NULL COMMENT '提示词标签',
`version` varchar(50) DEFAULT '1.0' COMMENT '提示词版本',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '提示词状态',
`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',
`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',
`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',
PRIMARY KEY (`id`),
KEY `idx_space_id` (`space_id`),
KEY `idx_creator_id` (`creator_id`),
KEY `idx_category` (`category`),
KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='提示词资源表';
-- 在线数据库信息表
CREATE TABLE IF NOT EXISTS `online_database_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',
`creator_id` bigint(20) NOT NULL COMMENT '数据库创建者ID',
`name` varchar(255) NOT NULL COMMENT '数据库名称',
`description` text COMMENT '数据库描述',
`database_type` varchar(50) NOT NULL COMMENT '数据库类型',
`connection_config` json DEFAULT NULL COMMENT '数据库连接配置',
`schema_info` json DEFAULT NULL COMMENT '数据库模式信息',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '数据库状态',
`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',
`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',
`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',
PRIMARY KEY (`id`),
KEY `idx_space_id` (`space_id`),
KEY `idx_creator_id` (`creator_id`),
KEY `idx_database_type` (`database_type`),
KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='在线数据库信息表';
ElasticSearch 索引结构
索引名称:coze_resource
用途:统一存储所有类型的资源信息,包括插件(plugin_draft)、工作流(workflow_meta)、知识库(knowledge)、提示词(prompt_resource)和数据库(online_database_info)等资源的元数据。
索引结构:
{
"mappings": {
"properties": {
"res_id": {
"type": "long",
"description": "资源ID,唯一标识"
},
"res_type": {
"type": "integer",
"description": "资源类型:1=Plugin, 2=Workflow, 3=Imageflow, 4=Knowledge, 5=UI, 6=Prompt, 7=Database, 8=Variable, 9=Voice"
},
"res_sub_type": {
"type": "integer",
"description": "资源子类型,可选"
},
"name": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"description": "资源名称,支持全文搜索"
},
"owner_id": {
"type": "long",
"description": "资源所有者ID"
},
"space_id": {
"type": "long",
"description": "工作空间ID"
},
"app_id": {
"type": "long",
"description": "关联的应用ID,可选"
},
"biz_status": {
"type": "long",
"description": "业务状态"
},
"publish_status": {
"type": "integer",
"description": "发布状态"
},
"create_time": {
"type": "long",
"description": "创建时间戳(毫秒)"
},
"update_time": {
"type": "long",
"description": "更新时间戳(毫秒)"
},
"publish_time": {
"type": "long",
"description": "发布时间戳(毫秒)"
}
}
}
}
资源类型映射:
数据库表 | 资源类型 | ResType值 | 说明 |
---|---|---|---|
plugin_draft | Plugin | 1 | 插件资源 |
workflow_meta | Workflow | 2 | 工作流资源 |
knowledge | Knowledge | 4 | 知识库资源 |
prompt_resource | Prompt | 6 | 提示词资源 |
online_database_info | Database | 7 | 数据库资源 |
索引特点:
- 统一存储:所有类型的资源都存储在同一个索引中,通过
res_type
字段区分 - 高效搜索:支持按资源类型、名称、所有者、工作空间等多维度搜索
- 实时同步:通过事件总线机制,数据库变更会实时同步到ElasticSearch
- 权限隔离:通过
space_id
和owner_id
实现工作空间和用户级别的权限隔离 - 状态管理:支持资源的业务状态和发布状态管理
搜索功能:
- 全文搜索:支持按资源名称进行全文搜索
- 精确匹配:支持按资源ID、类型、所有者等进行精确匹配
- 范围查询:支持按时间范围、状态范围等进行查询
- 排序功能:支持按创建时间、更新时间、发布时间等排序
- 分页查询:支持高效的分页查询功能
数据同步机制:
通过 ResourceDomainEvent
事件和 resourceHandlerImpl
处理器实现数据库到ElasticSearch的实时同步:
// 资源索引名称
const resourceIndexName = "coze_resource"
// 资源事件处理
func (s *resourceHandlerImpl) indexResource(ctx context.Context, opType entity.OpType, r *entity.ResourceDocument) error {
switch opType {
case entity.Created:
return s.esClient.Create(ctx, resourceIndexName, conv.Int64ToStr(r.ResID), r)
case entity.Updated:
return s.esClient.Update(ctx, resourceIndexName, conv.Int64ToStr(r.ResID), r)
case entity.Deleted:
return s.esClient.Delete(ctx, resourceIndexName, conv.Int64ToStr(r.ResID))
}
return fmt.Errorf("unexpected op type: %v", opType)
}
使用场景:
- 资源搜索:在工作空间中搜索各类资源
- 资源列表:获取用户或工作空间的资源列表
- 资源推荐:基于使用频率和相关性推荐资源
- 权限控制:基于用户权限过滤可访问的资源
- 统计分析:资源使用情况统计和分析
8. 安全和权限验证机制
8.1 用户身份认证
JWT Token验证:
- 所有API请求都需要携带有效的JWT Token
- Token包含用户ID、工作空间权限等关键信息
- 通过中间件统一验证Token的有效性和完整性
// 用户身份验证中间件
func AuthMiddleware() app.HandlerFunc {
return func(c context.Context, ctx *app.RequestContext) {
token := ctx.GetHeader("Authorization")
if token == nil {
ctx.JSON(401, gin.H{"error": "未授权访问"})
ctx.Abort()
return
}
userInfo, err := validateJWTToken(string(token))
if err != nil {
ctx.JSON(401, gin.H{"error": "Token无效"})
ctx.Abort()
return
}
ctx.Set("user_id", userInfo.UserID)
ctx.Set("space_id", userInfo.SpaceID)
ctx.Next()
}
}
8.2 工作空间权限控制
空间隔离机制:
- 每个用户只能访问其所属工作空间的资源
- 通过
space_id
字段实现数据隔离 - 在数据库查询和ElasticSearch搜索中都强制添加空间过滤条件
// 工作空间权限验证
func (s *SearchApplicationService) validateSpacePermission(ctx context.Context, spaceID int64) error {
userSpaceID := ctx.Value("space_id").(int64)
if userSpaceID != spaceID {
return errors.New("无权限访问该工作空间资源")
}
return nil
}
8.3 资源级权限验证
资源所有权验证:
- 验证用户是否为资源的创建者或有权限访问者
- 支持资源的读取、编辑、删除等不同级别权限
- 通过
owner_id
和权限表进行权限判断
权限矩阵:
资源类型 | 创建者权限 | 工作空间成员权限 | 访客权限 |
---|---|---|---|
插件 | 读写删除 | 读取 | 无 |
工作流 | 读写删除 | 读取、复制 | 无 |
知识库 | 读写删除 | 读取 | 无 |
提示词 | 读写删除 | 读取、使用 | 无 |
数据库 | 读写删除 | 读取 | 无 |
8.4 API访问控制
请求频率限制:
- 实现基于用户和IP的请求频率限制
- 防止恶意请求和资源滥用
- 支持不同API端点的差异化限流策略
参数验证:
- 严格验证所有输入参数的格式和范围
- 防止SQL注入、XSS等安全攻击
- 使用白名单机制验证资源类型等枚举值
// 参数验证示例
func validateSearchRequest(req *resource.SearchResourceRequest) error {
if req.SpaceID <= 0 {
return errors.New("无效的工作空间ID")
}
if len(req.Query) > 100 {
return errors.New("搜索关键词过长")
}
for _, resType := range req.ResTypeFilter {
if !isValidResourceType(resType) {
return errors.New("无效的资源类型")
}
}
return nil
}
9. 错误处理和日志记录
9.1 分层错误处理机制
错误分类体系:
// 错误类型定义
type ErrorType int
const (
// 业务错误
ErrBusiness ErrorType = iota + 1000
ErrResourceNotFound
ErrPermissionDenied
ErrInvalidParameter
// 系统错误
ErrSystem ErrorType = iota + 2000
ErrDatabaseConnection
ErrElasticSearchTimeout
ErrServiceUnavailable
// 网络错误
ErrNetwork ErrorType = iota + 3000
ErrRequestTimeout
ErrConnectionRefused
)
错误处理流程:
- 捕获阶段:在各层级捕获具体错误
- 包装阶段:添加上下文信息和错误码
- 记录阶段:根据错误级别记录日志
- 响应阶段:返回用户友好的错误信息
9.2 统一错误响应格式
// 统一错误响应结构
type ErrorResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Details string `json:"details,omitempty"`
TraceID string `json:"trace_id"`
}
// 错误处理中间件
func ErrorHandlerMiddleware() app.HandlerFunc {
return func(c context.Context, ctx *app.RequestContext) {
defer func() {
if err := recover(); err != nil {
traceID := ctx.GetString("trace_id")
logs.CtxErrorf(c, "Panic recovered: %v, traceID: %s", err, traceID)
ctx.JSON(500, ErrorResponse{
Code: 5000,
Message: "服务器内部错误",
TraceID: traceID,
})
}
}()
ctx.Next()
}
}
9.3 日志记录策略
日志级别定义:
- DEBUG:详细的调试信息,包括参数值、中间结果
- INFO:关键业务流程信息,如用户操作、资源访问
- WARN:潜在问题警告,如性能异常、参数异常
- ERROR:错误信息,包括业务错误和系统错误
- FATAL:严重错误,可能导致服务不可用
结构化日志格式:
// 日志记录示例
func (s *SearchApplicationService) SearchResources(ctx context.Context, req *resource.SearchResourceRequest) {
traceID := generateTraceID()
ctx = context.WithValue(ctx, "trace_id", traceID)
// 记录请求开始
logs.CtxInfof(ctx, "SearchResources started, userID=%d, spaceID=%d, query=%s, traceID=%s",
req.UserID, req.SpaceID, req.Query, traceID)
startTime := time.Now()
defer func() {
duration := time.Since(startTime)
logs.CtxInfof(ctx, "SearchResources completed, duration=%dms, traceID=%s",
duration.Milliseconds(), traceID)
}()
// 业务逻辑处理...
}
日志内容规范:
- 请求日志:记录用户ID、工作空间ID、请求参数、TraceID
- 业务日志:记录关键业务操作、数据变更、权限验证结果
- 性能日志:记录接口响应时间、数据库查询时间、ES查询时间
- 错误日志:记录错误堆栈、上下文信息、影响范围
9.4 监控和告警
关键指标监控:
- 接口性能:响应时间、QPS、错误率
- 资源使用:数据库连接数、ES查询延迟、内存使用率
- 业务指标:资源搜索成功率、用户活跃度、资源访问频次
告警策略:
- 错误率告警:当错误率超过5%时触发告警
- 性能告警:当接口响应时间超过2秒时触发告警
- 资源告警:当数据库连接数超过80%时触发告警
10. 资源列表查询流程图
10.1 LibraryResourceList接口完整调用流程
用户登录 Coze 平台点击"资源库"场景的后端处理流程:
用户点击"资源库" → 前端发起请求 → API网关路由 → Handler处理 → 业务服务层 → 数据查询层 → 响应返回
↓ ↓ ↓ ↓ ↓ ↓ ↓
前端路由跳转 HTTP POST请求 路由匹配 参数验证 权限检查 ES查询 JSON响应
↓ ↓ ↓ ↓ ↓ ↓ ↓
/space/:id/library /api/plugin_api/ Handler 请求绑定 用户身份 coze_ 资源列表
library_resource 函数调用 参数校验 Session resource 数据返回
_list LibraryResourceList 验证 索引查询
↓
SearchApplicationService
↓
构建ES查询条件
↓
执行资源搜索
↓
资源数据打包
↓
返回分页结果
10.2 详细流程说明
1. API网关层(路由处理)
文件位置:backend/api/handler/coze/resource_service.go
// @router /api/plugin_api/library_resource_list [POST]
func LibraryResourceList(ctx context.Context, c *app.RequestContext) {
var err error
var req resource.LibraryResourceListRequest
// 1. 请求参数绑定和验证
err = c.BindAndValidate(&req)
if err != nil {
invalidParamRequestResponse(c, err.Error())
return
}
// 2. 业务参数校验
if req.SpaceID <= 0 {
invalidParamRequestResponse(c, "space_id is invalid")
return
}
if req.GetSize() > 100 {
invalidParamRequestResponse(c, "size is too large")
return
}
// 3. 调用业务服务
resp, err := search.SearchSVC.LibraryResourceList(ctx, &req)
if err != nil {
internalServerErrorResponse(ctx, c, err)
return
}
// 4. 返回JSON响应
c.JSON(consts.StatusOK, resp)
}
处理步骤:
- 路由匹配:
POST /api/plugin_api/library_resource_list
- 参数绑定:将HTTP请求体绑定到
LibraryResourceListRequest
结构体 - 参数验证:验证
space_id
有效性,限制size
不超过100 - 服务调用:调用搜索服务的
LibraryResourceList
方法 - 响应返回:返回JSON格式的响应数据
2. 业务服务层(SearchApplicationService)
文件位置:backend/application/search/resource_search.go
func (s *SearchApplicationService) LibraryResourceList(ctx context.Context, req *resource.LibraryResourceListRequest) (resp *resource.LibraryResourceListResponse, err error) {
// 1. 用户身份验证
userID := ctxutil.GetUIDFromCtx(ctx)
if userID == nil {
return nil, errorx.New(errno.ErrSearchPermissionCode, errorx.KV("msg", "session required"))
}
// 2. 构建搜索请求
searchReq := &entity.SearchResourcesRequest{
SpaceID: req.GetSpaceID(),
OwnerID: 0, // 资源库查询不限制所有者
Name: req.GetName(),
ResTypeFilter: req.GetResTypeFilter(),
PublishStatusFilter: req.GetPublishStatusFilter(),
SearchKeys: req.GetSearchKeys(),
Cursor: req.GetCursor(),
Limit: req.GetSize(),
}
// 3. 执行资源搜索
searchResp, err := s.searchDomainService.SearchResources(ctx, searchReq)
if err != nil {
return nil, err
}
// 4. 资源数据打包
resourceList, err := s.packResources(ctx, searchResp.ResourceDocuments, req.GetIsGetImageflow())
if err != nil {
return nil, err
}
// 5. 构建响应
resp = &resource.LibraryResourceListResponse{
Code: 0,
Msg: "success",
ResourceList: resourceList,
Cursor: searchResp.NextCursor,
HasMore: searchResp.HasMore,
}
return resp, nil
}
核心功能:
- 身份验证:从上下文中提取用户ID,验证用户登录状态
- 权限检查:验证用户对指定工作空间的访问权限
- 查询构建:构建ElasticSearch查询条件
- 数据打包:调用资源打包器处理查询结果
- 响应组装:构建标准化的响应数据结构
3. 领域服务层(资源搜索)
核心功能:
- ES查询构建:根据过滤条件构建ElasticSearch查询DSL
- 索引查询:在
coze_resource
索引中执行搜索 - 结果处理:处理搜索结果,提取资源文档
查询条件包括:
space_id
:工作空间ID(必需)res_type_filter
:资源类型过滤(Plugin、Workflow、Knowledge、Prompt、Database)publish_status_filter
:发布状态过滤name
:资源名称模糊匹配search_keys
:自定义搜索字段
4. 数据库操作层
ElasticSearch查询:
- 索引:
coze_resource
- 查询类型:复合查询(bool query)
- 过滤条件:
term
:精确匹配(space_id、res_type等)match
:模糊匹配(name字段)range
:范围查询(时间等)
- 排序:按
update_time_ms
降序 - 分页:使用
search_after
游标分页
5. 资源打包器(ProjectPackager)
打包流程:
func (s *SearchApplicationService) packResources(ctx context.Context, docs []*entity.ResourceDocument, isGetImageflow bool) ([]*common.ResourceInfo, error) {
var resourceList []*common.ResourceInfo
for _, doc := range docs {
// 1. 基础信息转换
resourceInfo := &common.ResourceInfo{
ResID: doc.GetResID(),
ResType: doc.GetResType(),
Name: doc.GetName(),
CreateTimeMS: doc.GetCreateTimeMS(),
UpdateTimeMS: doc.GetUpdateTimeMS(),
PublishStatus: doc.GetPublishStatus(),
}
// 2. 根据资源类型调用对应打包器
switch doc.GetResType() {
case common.ResType_Plugin:
err := s.packPluginResource(ctx, resourceInfo, doc)
case common.ResType_Workflow:
err := s.packWorkflowResource(ctx, resourceInfo, doc, isGetImageflow)
case common.ResType_Knowledge:
err := s.packKnowledgeResource(ctx, resourceInfo, doc)
case common.ResType_Prompt:
err := s.packPromptResource(ctx, resourceInfo, doc)
case common.ResType_Database:
err := s.packDatabaseResource(ctx, resourceInfo, doc)
}
// 3. 设置资源图标
resourceInfo.IconURL = s.getResourceIconURL(doc.GetResType())
resourceList = append(resourceList, resourceInfo)
}
return resourceList, nil
}
打包内容:
- 基础信息:资源ID、名称、类型、状态、时间等
- 类型特定信息:根据资源类型补充专有字段
- 扩展信息:图标URL、描述、标签等
- 权限信息:用户对资源的操作权限
6. 响应数据结构
LibraryResourceListResponse:
type LibraryResourceListResponse struct {
Code int64 `json:"code"` // 响应码
Msg string `json:"msg"` // 响应消息
ResourceList []*common.ResourceInfo `json:"resource_list"` // 资源列表
Cursor string `json:"cursor"` // 下一页游标
HasMore bool `json:"has_more"` // 是否有更多数据
BaseResp *base.BaseResp `json:"base_resp"` // 基础响应信息
}
ResourceInfo结构:
- 基础信息:ID、名称、类型、状态、时间等
- 扩展信息:图标URL、描述、标签等
- 类型特定信息:根据资源类型补充的专有字段
10.3 关键时间节点
阶段 | 预期耗时 | 超时阈值 | 说明 |
---|---|---|---|
参数验证 | <5ms | 50ms | 请求参数校验 |
身份验证 | <10ms | 100ms | Session验证 |
权限验证 | <20ms | 200ms | 工作空间权限查询 |
ES查询 | <100ms | 2000ms | ElasticSearch搜索 |
资源打包 | <50ms | 500ms | 数据转换和补充 |
总耗时 | <200ms | 3000ms | 端到端响应时间 |
10.4 技术特点
- 统一索引管理:所有资源类型统一存储在
coze_resource
索引中 - 灵活查询条件:支持多维度过滤和搜索
- 游标分页:使用ElasticSearch的
search_after
实现高效分页 - 资源类型打包:针对不同资源类型提供专门的数据打包逻辑
- 权限控制:基于用户Session和工作空间权限进行访问控制
11. 核心技术特点
11.1 分层架构设计
清晰的职责分离:
- API层:负责HTTP请求处理、参数验证、响应格式化
- 应用层:负责业务逻辑编排、服务组合、事务管理
- 领域层:负责核心业务逻辑、实体定义、业务规则
- 基础设施层:负责数据访问、外部服务集成、技术实现
依赖倒置原则:
- 高层模块不依赖低层模块,都依赖于抽象
- 通过接口定义实现解耦,便于测试和扩展
- 支持不同存储引擎和搜索引擎的灵活切换
11.2 ElasticSearch搜索引擎
统一索引设计:
- 所有资源类型统一存储在
coze_resource
索引中 - 通过
res_type
字段区分不同资源类型 - 支持全文搜索、精确匹配、范围查询等多种搜索模式
实时数据同步:
- 基于事件驱动架构实现数据库到ES的实时同步
- 支持增量更新,避免全量重建索引
- 异步处理机制确保主业务流程不受影响
高性能查询优化:
- 合理的字段映射和分析器配置
- 支持分页查询和游标查询
- 查询结果缓存和预加载机制
11.3 事件驱动架构
异步事件处理:
- 使用消息队列实现事件的异步处理
- 支持事件的重试和死信队列机制
- 确保数据最终一致性
事件类型定义:
type OpType string
const (
Created OpType = "created" // 资源创建事件
Updated OpType = "updated" // 资源更新事件
Deleted OpType = "deleted" // 资源删除事件
)
事件处理流程:
- 数据库操作完成后发布领域事件
- 事件总线接收并路由事件到相应处理器
- 处理器更新ElasticSearch索引
- 记录处理结果和异常信息
11.4 多数据源集成
MySQL关系型数据库:
- 存储结构化的资源元数据
- 支持ACID事务保证数据一致性
- 通过索引优化查询性能
ElasticSearch搜索引擎:
- 提供强大的全文搜索能力
- 支持复杂的聚合查询和统计分析
- 水平扩展能力强,支持大数据量
数据一致性保证:
- 以MySQL为主数据源,ES为搜索引擎
- 通过事件机制保证数据最终一致性
- 支持数据校验和修复机制
11.5 安全和权限控制
多层次权限验证:
- 用户身份认证(JWT Token)
- 工作空间权限控制(Space级别隔离)
- 资源级权限验证(Owner权限)
数据隔离机制:
- 通过
space_id
实现工作空间级别的数据隔离 - 在所有查询中强制添加空间过滤条件
- 防止跨空间的数据泄露
11.6 可观测性设计
全链路追踪:
- 为每个请求生成唯一的TraceID
- 在日志中记录TraceID便于问题排查
- 支持分布式链路追踪
结构化日志:
- 统一的日志格式和字段定义
- 支持日志聚合和分析
- 不同级别的日志记录策略
性能监控:
- 接口响应时间监控
- 数据库和ES查询性能监控
- 系统资源使用率监控
12. 总结
12.1 架构优势
Coze资源库后端采用了现代化的微服务架构设计,具有以下显著优势:
1. 高可扩展性
- 分层架构设计使得各层职责清晰,便于独立扩展
- ElasticSearch的水平扩展能力支持大规模数据处理
- 事件驱动架构支持异步处理,提高系统吞吐量
2. 高可用性
- 多数据源设计提供数据冗余和故障转移能力
- 异步事件处理确保主业务流程的稳定性
- 完善的错误处理和重试机制
3. 高性能
- ElasticSearch提供毫秒级的搜索响应
- 合理的索引设计和查询优化
- 缓存机制减少重复查询开销
4. 高安全性
- 多层次的权限验证机制
- 工作空间级别的数据隔离
- 完善的参数验证和安全防护
12.2 技术亮点
1. 统一资源索引设计
- 将不同类型的资源统一存储在一个ES索引中
- 通过资源类型字段实现逻辑分离
- 简化了索引管理和跨资源类型搜索
2. 事件驱动的数据同步
- 基于领域事件实现数据库到ES的实时同步
- 保证了数据的最终一致性
- 支持事件重放和数据修复
3. 分层的错误处理机制
- 统一的错误分类和处理流程
- 用户友好的错误信息返回
- 完善的日志记录和监控告警
通过以上的架构设计和技术实现,Coze资源库后端为用户提供了高效、安全、可靠的资源管理和搜索服务,为AI应用开发提供了强有力的基础设施支撑。