6. 基础设施层
基础设施层为知识库创建功能提供底层技术支撑,包括数据存储、缓存、消息队列、文档处理、向量化等核心服务。
6.1 数据存储服务
6.1.1 MySQL数据库
文件位置: backend/infra/rdb/mysql.go
// MySQLConfig MySQL配置
type MySQLConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Database string `yaml:"database"`
MaxOpenConns int `yaml:"max_open_conns"`
MaxIdleConns int `yaml:"max_idle_conns"`
MaxLifetime int `yaml:"max_lifetime"`
}
// NewMySQLConnection 创建MySQL连接
func NewMySQLConnection(config *MySQLConfig) (*gorm.DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
config.Username, config.Password, config.Host, config.Port, config.Database)
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
NamingStrategy: schema.NamingStrategy{
SingularTable: true,
},
})
if err != nil {
return nil, fmt.Errorf("连接MySQL失败: %w", err)
}
sqlDB, err := db.DB()
if err != nil {
return nil, fmt.Errorf("获取SQL DB失败: %w", err)
}
// 设置连接池参数
sqlDB.SetMaxOpenConns(config.MaxOpenConns)
sqlDB.SetMaxIdleConns(config.MaxIdleConns)
sqlDB.SetConnMaxLifetime(time.Duration(config.MaxLifetime) * time.Second)
return db, nil
}
6.1.2 Redis缓存
文件位置: backend/infra/cache/redis.go
// RedisConfig Redis配置
type RedisConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Password string `yaml:"password"`
DB int `yaml:"db"`
PoolSize int `yaml:"pool_size"`
}
// NewRedisClient 创建Redis客户端
func NewRedisClient(config *RedisConfig) *redis.Client {
rdb := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", config.Host, config.Port),
Password: config.Password,
DB: config.DB,
PoolSize: config.PoolSize,
})
return rdb
}
// KnowledgeCacheManager 知识库缓存管理器
type KnowledgeCacheManager struct {
redisClient *redis.Client
localCache *cache.Cache
}
func (c *KnowledgeCacheManager) SetKnowledge(ctx context.Context, knowledge *model.Knowledge) error {
// 1. 序列化知识库数据
data, err := json.Marshal(knowledge)
if err != nil {
return fmt.Errorf("序列化知识库数据失败: %w", err)
}
// 2. 设置Redis缓存
cacheKey := fmt.Sprintf("knowledge:%d", knowledge.ID)
err = c.redisClient.Set(ctx, cacheKey, data, time.Hour).Err()
if err != nil {
return fmt.Errorf("设置Redis缓存失败: %w", err)
}
// 3. 设置本地缓存
c.localCache.Set(cacheKey, knowledge, time.Hour)
return nil
}
6.2 文档处理服务
6.2.1 文档解析器
文件位置: backend/infra/document/parser.go
// DocumentParser 文档解析器接口
type DocumentParser interface {
Parse(ctx context.Context, file io.Reader, fileType string) (*ParseResult, error)
SupportedTypes() []string
}
// ParseResult 解析结果
type ParseResult struct {
Content string `json:"content"`
Metadata map[string]string `json:"metadata"`
Sections []*Section `json:"sections"`
WordCount int `json:"word_count"`
}
// Section 文档章节
type Section struct {
Title string `json:"title"`
Content string `json:"content"`
Level int `json:"level"`
}
// UniversalDocumentParser 通用文档解析器
type UniversalDocumentParser struct {
parsers map[string]DocumentParser
}
func (p *UniversalDocumentParser) Parse(ctx context.Context, file io.Reader, fileType string) (*ParseResult, error) {
parser, exists := p.parsers[fileType]
if !exists {
return nil, fmt.Errorf("不支持的文件类型: %s", fileType)
}
result, err := parser.Parse(ctx, file, fileType)
if err != nil {
return nil, fmt.Errorf("解析文档失败: %w", err)
}
return result, nil
}
6.2.2 文档分片器
文件位置: backend/infra/document/splitter.go
// DocumentSplitter 文档分片器
type DocumentSplitter struct {
maxChunkSize int
overlapSize int
separators []string
}
// SplitDocument 分割文档
func (s *DocumentSplitter) SplitDocument(ctx context.Context, content string) ([]*DocumentSlice, error) {
var slices []*DocumentSlice
// 1. 按段落分割
paragraphs := strings.Split(content, "\n\n")
var currentSlice strings.Builder
var currentSize int
for _, paragraph := range paragraphs {
paragraphSize := len(paragraph)
// 2. 检查是否需要创建新分片
if currentSize+paragraphSize > s.maxChunkSize && currentSize > 0 {
// 创建当前分片
slice := &DocumentSlice{
Content: currentSlice.String(),
WordCount: currentSize,
Index: len(slices),
}
slices = append(slices, slice)
// 重置当前分片
currentSlice.Reset()
currentSize = 0
// 添加重叠内容
if s.overlapSize > 0 {
overlapContent := s.getOverlapContent(slice.Content, s.overlapSize)
currentSlice.WriteString(overlapContent)
currentSize = len(overlapContent)
}
}
// 3. 添加段落到当前分片
if currentSize > 0 {
currentSlice.WriteString("\n\n")
currentSize += 2
}
currentSlice.WriteString(paragraph)
currentSize += paragraphSize
}
// 4. 处理最后一个分片
if currentSize > 0 {
slice := &DocumentSlice{
Content: currentSlice.String(),
WordCount: currentSize,
Index: len(slices),
}
slices = append(slices, slice)
}
return slices, nil
}
// DocumentSlice 文档分片
type DocumentSlice struct {
Content string `json:"content"`
WordCount int `json:"word_count"`
Index int `json:"index"`
Vector []float32 `json:"vector,omitempty"`
}
6.3 向量化服务
6.3.1 向量化引擎
文件位置: backend/infra/embedding/engine.go
// EmbeddingEngine 向量化引擎接口
type EmbeddingEngine interface {
Embed(ctx context.Context, texts []string) ([][]float32, error)
GetDimension() int
GetModel() string
}
// OpenAIEmbeddingEngine OpenAI向量化引擎
type OpenAIEmbeddingEngine struct {
client *openai.Client
model string
dimension int
}
func (e *OpenAIEmbeddingEngine) Embed(ctx context.Context, texts []string) ([][]float32, error) {
// 1. 构建请求
req := openai.EmbeddingRequest{
Input: texts,
Model: openai.EmbeddingModel(e.model),
}
// 2. 调用OpenAI API
resp, err := e.client.CreateEmbeddings(ctx, req)
if err != nil {
return nil, fmt.Errorf("调用OpenAI向量化API失败: %w", err)
}
// 3. 提取向量数据
vectors := make([][]float32, len(resp.Data))
for i, embedding := range resp.Data {
vectors[i] = make([]float32, len(embedding.Embedding))
for j, val := range embedding.Embedding {
vectors[i][j] = float32(val)
}
}
return vectors, nil
}
6.4 向量存储服务
6.4.1 Milvus向量数据库
文件位置: backend/infra/searchstore/milvus/client.go
// MilvusClient Milvus客户端
type MilvusClient struct {
client milvus.Client
config *MilvusConfig
}
// MilvusConfig Milvus配置
type MilvusConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Database string `yaml:"database"`
}
// CreateCollection 创建集合
func (c *MilvusClient) CreateCollection(ctx context.Context, collectionName string, dimension int) error {
// 1. 定义字段
fields := []*entity.Field{
{
Name: "id",
DataType: entity.FieldTypeInt64,
PrimaryKey: true,
AutoID: false,
},
{
Name: "knowledge_id",
DataType: entity.FieldTypeInt64,
},
{
Name: "document_id",
DataType: entity.FieldTypeInt64,
},
{
Name: "slice_id",
DataType: entity.FieldTypeInt64,
},
{
Name: "content",
DataType: entity.FieldTypeVarChar,
TypeParams: map[string]string{
"max_length": "65535",
},
},
{
Name: "vector",
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
"dim": fmt.Sprintf("%d", dimension),
},
},
}
// 2. 创建集合
schema := &entity.Schema{
CollectionName: collectionName,
Description: "知识库向量集合",
Fields: fields,
}
err := c.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)
if err != nil {
return fmt.Errorf("创建Milvus集合失败: %w", err)
}
// 3. 创建索引
indexParam := entity.NewIndexIvfFlat(entity.L2, 1024)
err = c.client.CreateIndex(ctx, collectionName, "vector", indexParam, false)
if err != nil {
return fmt.Errorf("创建向量索引失败: %w", err)
}
return nil
}
// InsertVectors 插入向量
func (c *MilvusClient) InsertVectors(ctx context.Context, collectionName string, data *VectorData) error {
// 1. 准备数据
ids := make([]int64, len(data.IDs))
knowledgeIDs := make([]int64, len(data.IDs))
documentIDs := make([]int64, len(data.IDs))
sliceIDs := make([]int64, len(data.IDs))
contents := make([]string, len(data.IDs))
vectors := make([][]float32, len(data.IDs))
for i, item := range data.Items {
ids[i] = item.ID
knowledgeIDs[i] = item.KnowledgeID
documentIDs[i] = item.DocumentID
sliceIDs[i] = item.SliceID
contents[i] = item.Content
vectors[i] = item.Vector
}
// 2. 构建列数据
columns := []entity.Column{
entity.NewColumnInt64("id", ids),
entity.NewColumnInt64("knowledge_id", knowledgeIDs),
entity.NewColumnInt64("document_id", documentIDs),
entity.NewColumnInt64("slice_id", sliceIDs),
entity.NewColumnVarChar("content", contents),
entity.NewColumnFloatVector("vector", dimension, vectors),
}
// 3. 插入数据
_, err := c.client.Insert(ctx, collectionName, "", columns...)
if err != nil {
return fmt.Errorf("插入向量数据失败: %w", err)
}
return nil
}
6.5 消息队列服务
6.5.1 事件总线
文件位置: backend/infra/eventbus/eventbus.go
// EventBus 事件总线接口
type EventBus interface {
Publish(ctx context.Context, topic string, event interface{}) error
Subscribe(topic string, handler EventHandler) error
Start(ctx context.Context) error
Stop() error
}
// EventHandler 事件处理器
type EventHandler func(ctx context.Context, event interface{}) error
// KafkaEventBus Kafka事件总线
type KafkaEventBus struct {
producer sarama.SyncProducer
consumer sarama.ConsumerGroup
config *KafkaConfig
handlers map[string][]EventHandler
}
// KafkaConfig Kafka配置
type KafkaConfig struct {
Brokers []string `yaml:"brokers"`
GroupID string `yaml:"group_id"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}
// Publish 发布事件
func (k *KafkaEventBus) Publish(ctx context.Context, topic string, event interface{}) error {
// 1. 序列化事件
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("序列化事件失败: %w", err)
}
// 2. 构建消息
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(data),
Headers: []sarama.RecordHeader{
{
Key: []byte("event_type"),
Value: []byte(reflect.TypeOf(event).Name()),
},
{
Key: []byte("timestamp"),
Value: []byte(fmt.Sprintf("%d", time.Now().Unix())),
},
},
}
// 3. 发送消息
_, _, err = k.producer.SendMessage(msg)
if err != nil {
return fmt.Errorf("发送Kafka消息失败: %w", err)
}
return nil
}
6.6 搜索服务
6.6.1 ElasticSearch
文件位置: backend/infra/es/client.go
// ESClient ElasticSearch客户端
type ESClient struct {
client *elasticsearch.Client
config *ESConfig
}
// ESConfig ElasticSearch配置
type ESConfig struct {
Addresses []string `yaml:"addresses"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Index string `yaml:"index"`
}
// CreateKnowledgeIndex 创建知识库索引
func (c *ESClient) CreateKnowledgeIndex(ctx context.Context, indexName string) error {
// 1. 定义索引映射
mapping := map[string]interface{}{
"mappings": map[string]interface{}{
"properties": map[string]interface{}{
"knowledge_id": map[string]interface{}{
"type": "long",
},
"name": map[string]interface{}{
"type": "text",
"analyzer": "ik_max_word",
},
"description": map[string]interface{}{
"type": "text",
"analyzer": "ik_max_word",
},
"content": map[string]interface{}{
"type": "text",
"analyzer": "ik_max_word",
},
"space_id": map[string]interface{}{
"type": "long",
},
"creator_id": map[string]interface{}{
"type": "long",
},
"created_at": map[string]interface{}{
"type": "date",
},
"status": map[string]interface{}{
"type": "integer",
},
},
},
"settings": map[string]interface{}{
"number_of_shards": 1,
"number_of_replicas": 1,
"analysis": map[string]interface{}{
"analyzer": map[string]interface{}{
"ik_max_word": map[string]interface{}{
"type": "ik_max_word",
"tokenizer": "ik_max_word",
},
},
},
},
}
// 2. 创建索引
mappingJSON, _ := json.Marshal(mapping)
req := esapi.IndicesCreateRequest{
Index: indexName,
Body: strings.NewReader(string(mappingJSON)),
}
res, err := req.Do(ctx, c.client)
if err != nil {
return fmt.Errorf("创建ES索引失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("创建ES索引失败: %s", res.String())
}
return nil
}
6.7 配置管理
6.7.1 配置中心
文件位置: backend/infra/config/config.go
// Config 应用配置
type Config struct {
Server ServerConfig `yaml:"server"`
Database DatabaseConfig `yaml:"database"`
Redis RedisConfig `yaml:"redis"`
Kafka KafkaConfig `yaml:"kafka"`
Milvus MilvusConfig `yaml:"milvus"`
ES ESConfig `yaml:"elasticsearch"`
Embedding EmbeddingConfig `yaml:"embedding"`
}
// ServerConfig 服务器配置
type ServerConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Mode string `yaml:"mode"`
}
// DatabaseConfig 数据库配置
type DatabaseConfig struct {
MySQL MySQLConfig `yaml:"mysql"`
}
// EmbeddingConfig 向量化配置
type EmbeddingConfig struct {
Provider string `yaml:"provider"`
Model string `yaml:"model"`
APIKey string `yaml:"api_key"`
Dimension int `yaml:"dimension"`
}
// LoadConfig 加载配置
func LoadConfig(configPath string) (*Config, error) {
// 1. 读取配置文件
data, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("读取配置文件失败: %w", err)
}
// 2. 解析YAML配置
var config Config
err = yaml.Unmarshal(data, &config)
if err != nil {
return nil, fmt.Errorf("解析配置文件失败: %w", err)
}
// 3. 环境变量覆盖
err = envconfig.Process("", &config)
if err != nil {
return nil, fmt.Errorf("处理环境变量失败: %w", err)
}
return &config, nil
}
6.8 基础设施层总结
基础设施层为知识库创建功能提供了完整的技术支撑:
- 数据存储: MySQL主数据库 + Redis缓存
- 文档处理: 多格式文档解析 + 智能分片
- 向量化: OpenAI/本地模型向量化
- 向量存储: Milvus向量数据库
- 搜索引擎: ElasticSearch全文搜索
- 消息队列: Kafka事件驱动
- 配置管理: 统一配置中心
这些基础设施服务通过依赖注入的方式集成到上层业务逻辑中,确保了系统的可扩展性和可维护性。
7. 数据存储层
7.1 数据库表结构
knowledge_base 表设计
文件位置:helm/charts/opencoze/files/mysql/schema.sql
真实DDL结构:
CREATE TABLE IF NOT EXISTS `knowledge_base` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'knowledge base id',
`space_id` bigint NOT NULL COMMENT 'space id',
`creator_id` bigint NOT NULL COMMENT 'creator user id',
`name` varchar(255) NOT NULL COMMENT 'knowledge base name',
`description` text NULL COMMENT 'knowledge base description',
`icon_uri` varchar(255) NULL COMMENT 'icon uri',
`status` int NOT NULL DEFAULT 1 COMMENT 'status: 1-active, 2-deleted',
`embedding_model` varchar(100) NOT NULL COMMENT 'embedding model name',
`chunk_size` int NOT NULL DEFAULT 1000 COMMENT 'document chunk size',
`chunk_overlap` int NOT NULL DEFAULT 200 COMMENT 'chunk overlap size',
`document_count` int NOT NULL DEFAULT 0 COMMENT 'total document count',
`total_size` bigint NOT NULL DEFAULT 0 COMMENT 'total storage size in bytes',
`settings` json NULL COMMENT 'knowledge base settings',
`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
PRIMARY KEY (`id`),
INDEX `idx_creator_id` (`creator_id`),
INDEX `idx_space_id` (`space_id`),
INDEX `idx_status` (`status`),
INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_base';
knowledge_document 表设计
真实DDL结构:
CREATE TABLE IF NOT EXISTS `knowledge_document` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'document id',
`knowledge_base_id` bigint NOT NULL COMMENT 'knowledge base id',
`name` varchar(255) NOT NULL COMMENT 'document name',
`file_type` varchar(50) NOT NULL COMMENT 'file type: pdf, txt, docx, etc',
`file_size` bigint NOT NULL COMMENT 'file size in bytes',
`file_path` varchar(500) NOT NULL COMMENT 'file storage path',
`content_hash` varchar(64) NOT NULL COMMENT 'content hash for deduplication',
`chunk_count` int NOT NULL DEFAULT 0 COMMENT 'total chunk count',
`processing_status` int NOT NULL DEFAULT 1 COMMENT 'processing status: 1-pending, 2-processing, 3-completed, 4-failed',
`error_message` text NULL COMMENT 'error message if processing failed',
`metadata` json NULL COMMENT 'document metadata',
`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
PRIMARY KEY (`id`),
INDEX `idx_knowledge_base_id` (`knowledge_base_id`),
INDEX `idx_processing_status` (`processing_status`),
INDEX `idx_content_hash` (`content_hash`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_document';
knowledge_chunk 表设计
真实DDL结构:
CREATE TABLE IF NOT EXISTS `knowledge_chunk` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'chunk id',
`knowledge_base_id` bigint NOT NULL COMMENT 'knowledge base id',
`document_id` bigint NOT NULL COMMENT 'document id',
`chunk_index` int NOT NULL COMMENT 'chunk index in document',
`content` text NOT NULL COMMENT 'chunk content',
`content_hash` varchar(64) NOT NULL COMMENT 'content hash',
`token_count` int NOT NULL DEFAULT 0 COMMENT 'token count',
`embedding_vector` json NULL COMMENT 'embedding vector data',
`metadata` json NULL COMMENT 'chunk metadata',
`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
PRIMARY KEY (`id`),
INDEX `idx_knowledge_base_id` (`knowledge_base_id`),
INDEX `idx_document_id` (`document_id`),
INDEX `idx_content_hash` (`content_hash`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_chunk';
表结构特点:
- 关联设计:knowledge_base、knowledge_document和knowledge_chunk通过外键关联,支持级联查询
- 空间隔离:通过
space_id
实现多租户数据隔离 - JSON存储:
settings
、metadata
和embedding_vector
使用JSON类型,支持复杂结构数据 - 状态管理:knowledge_document表包含处理状态字段,支持异步处理流程
- 索引优化:在关键查询字段上建立索引,优化查询性能
- 字符集:使用
utf8mb4_0900_ai_ci
排序规则,支持完整的Unicode字符集 - 向量存储:支持嵌入向量的JSON存储,便于语义搜索
- 去重机制:通过content_hash实现内容去重
knowledge_base字段详解:
id
:自增主键,唯一标识每个知识库space_id
:工作空间ID,实现租户级别的数据隔离creator_id
:创建者用户ID,用于权限控制和查询优化name
:知识库名称description
:知识库描述信息icon_uri
:知识库图标URIstatus
:知识库状态(1-活跃,2-已删除)embedding_model
:嵌入模型名称chunk_size
:文档分块大小chunk_overlap
:分块重叠大小document_count
:文档总数total_size
:总存储大小(字节)settings
:知识库设置,JSON格式created_at
/updated_at
:毫秒级时间戳,记录创建和更新时间
knowledge_document字段详解:
id
:自增主键,唯一标识每个文档knowledge_base_id
:关联的知识库IDname
:文档名称file_type
:文件类型(pdf、txt、docx等)file_size
:文件大小(字节)file_path
:文件存储路径content_hash
:内容哈希,用于去重chunk_count
:分块总数processing_status
:处理状态(1-待处理,2-处理中,3-已完成,4-失败)error_message
:处理失败时的错误信息metadata
:文档元数据,JSON格式created_at
/updated_at
:毫秒级时间戳,记录创建和更新时间
knowledge_chunk字段详解:
id
:自增主键,唯一标识每个分块knowledge_base_id
:关联的知识库IDdocument_id
:关联的文档IDchunk_index
:在文档中的分块索引content
:分块内容content_hash
:内容哈希token_count
:令牌数量embedding_vector
:嵌入向量数据,JSON格式metadata
:分块元数据,JSON格式created_at
/updated_at
:毫秒级时间戳,记录创建和更新时间
7.2 ElasticSearch 索引架构
coze_resource 统一索引
索引设计理念:
Coze平台采用统一索引策略,将所有资源类型(插件、工作流、知识库、提示词、数据库等)存储在同一个 coze_resource
索引中,通过 res_type
字段进行类型区分。
知识库在索引中的映射:
{
"mappings": {
"properties": {
"res_id": {
"type": "long",
"description": "资源ID,对应knowledge_base.id"
},
"res_type": {
"type": "integer",
"description": "资源类型,知识库为4"
},
"name": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"description": "知识库名称,支持全文搜索和精确匹配"
},
"description": {
"type": "text",
"analyzer": "standard",
"description": "知识库描述,支持全文搜索"
},
"owner_id": {
"type": "long",
"description": "所有者ID,对应creator_id"
},
"space_id": {
"type": "long",
"description": "工作空间ID"
},
"embedding_model": {
"type": "keyword",
"description": "嵌入模型名称"
},
"document_count": {
"type": "integer",
"description": "文档数量"
},
"total_size": {
"type": "long",
"description": "总存储大小"
},
"status": {
"type": "integer",
"description": "知识库状态"
},
"create_time": {
"type": "long",
"description": "创建时间戳(毫秒)"
},
"update_time": {
"type": "long",
"description": "更新时间戳(毫秒)"
}
}
}
}
knowledge_content 内容索引
知识库内容专用索引:
{
"mappings": {
"properties": {
"chunk_id": {
"type": "long",
"description": "分块ID"
},
"knowledge_base_id": {
"type": "long",
"description": "知识库ID"
},
"document_id": {
"type": "long",
"description": "文档ID"
},
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"description": "分块内容,支持中文分词"
},
"embedding_vector": {
"type": "dense_vector",
"dims": 1536,
"description": "嵌入向量,用于语义搜索"
},
"metadata": {
"type": "object",
"description": "分块元数据"
},
"token_count": {
"type": "integer",
"description": "令牌数量"
}
}
}
}
资源类型常量定义:
const (
ResTypePlugin = 1 // 插件
ResTypeWorkflow = 2 // 工作流
ResTypeKnowledge = 4 // 知识库
ResTypePrompt = 6 // 提示词
ResTypeDatabase = 7 // 数据库
)
7.3 数据同步机制
事件驱动的创建同步架构
创建同步流程:
- 创建操作触发:知识库创建操作触发创建领域事件
- 事件发布:通过事件总线发布
ResourceDomainEvent
创建事件 - 事件处理:
resourceHandlerImpl
监听并处理创建事件 - 索引建立:将创建操作同步到ElasticSearch,建立相关索引
- 向量存储:同时在向量数据库中创建知识库向量空间
创建同步核心代码:
// 资源创建事件处理器
type resourceHandlerImpl struct {
esClient es.Client
vectorClient vector.Client
logger logs.Logger
}
// 处理知识库创建领域事件
func (r *resourceHandlerImpl) HandleKnowledgeCreateEvent(ctx context.Context, event *entity.ResourceDomainEvent) error {
if event.OpType != entity.Created {
return fmt.Errorf("invalid operation type for create handler: %v", event.OpType)
}
// 记录创建操作日志
r.logger.InfoCtx(ctx, "Processing knowledge base create event",
"knowledge_base_id", event.ResID,
"space_id", event.SpaceID,
"operator_id", event.OperatorID)
// 创建ES索引
if err := r.createResourceIndex(ctx, event); err != nil {
return fmt.Errorf("create resource index failed: %w", err)
}
// 创建向量空间
if err := r.createVectorSpace(ctx, event); err != nil {
r.logger.WarnCtx(ctx, "Failed to create vector space",
"knowledge_base_id", event.ResID, "error", err)
// 向量空间创建失败不阻塞主流程
}
return nil
}
// 在索引中创建知识库
func (r *resourceHandlerImpl) createResourceIndex(ctx context.Context, event *entity.ResourceDomainEvent) error {
indexName := "coze_resource"
docID := conv.Int64ToStr(event.ResID)
// 构建索引文档
document := map[string]interface{}{
"res_id": event.ResID,
"res_type": 4, // 知识库类型
"name": event.Name,
"description": event.Description,
"owner_id": event.OperatorID,
"space_id": event.SpaceID,
"embedding_model": event.EmbeddingModel,
"document_count": 0,
"total_size": 0,
"status": 1,
"create_time": event.CreateTime,
"update_time": event.UpdateTime,
}
// 执行索引创建
err := r.esClient.Create(ctx, indexName, docID, document)
if err != nil {
r.logger.ErrorCtx(ctx, "Failed to create knowledge base index",
"knowledge_base_id", event.ResID, "error", err)
return fmt.Errorf("create knowledge base ES index failed: %w", err)
}
// 验证创建结果
exists, checkErr := r.esClient.Exists(ctx, indexName, docID)
if checkErr != nil {
r.logger.WarnCtx(ctx, "Failed to verify creation",
"knowledge_base_id", event.ResID, "error", checkErr)
} else if !exists {
r.logger.ErrorCtx(ctx, "Knowledge base index not found after creation",
"knowledge_base_id", event.ResID)
return fmt.Errorf("knowledge base creation verification failed")
}
r.logger.InfoCtx(ctx, "Successfully created knowledge base index",
"knowledge_base_id", event.ResID)
return nil
}
// 创建向量空间
func (r *resourceHandlerImpl) createVectorSpace(ctx context.Context, event *entity.ResourceDomainEvent) error {
spaceName := fmt.Sprintf("kb_%d", event.ResID)
// 创建向量集合
err := r.vectorClient.CreateCollection(ctx, &vector.CreateCollectionRequest{
CollectionName: spaceName,
Dimension: 1536, // OpenAI embedding维度
MetricType: "COSINE",
Description: fmt.Sprintf("Vector space for knowledge base %d", event.ResID),
})
if err != nil {
return fmt.Errorf("create vector collection failed: %w", err)
}
r.logger.InfoCtx(ctx, "Successfully created vector space",
"knowledge_base_id", event.ResID, "collection_name", spaceName)
return nil
}
7.4 知识库创建操作存储层设计原则
知识库创建数据一致性保证
- 创建一致性:采用事件驱动模式,保证MySQL创建和ElasticSearch索引建立的最终一致性
- 创建幂等性:知识库创建操作支持重试,避免重复创建导致的数据冲突
- 创建事务边界:知识库数据库创建操作和创建事件发布在同一事务中,保证原子性
- 创建验证:知识库创建完成后验证数据确实被正确存储,确保创建操作的完整性
- 向量空间创建:确保知识库创建时同步创建向量存储空间,维护数据完整性
知识库创建性能优化策略
- 创建索引优化:基于知识库主键ID的创建操作,具有最佳性能
- 批量创建:支持批量创建知识库操作,减少数据库和ES的操作次数
- 异步创建处理:知识库创建事件处理采用异步模式,不阻塞创建主流程
- 创建缓存预热:创建后及时预热知识库相关缓存,提高后续访问性能
- 分批向量创建:向量空间采用分批创建策略,避免大量向量创建时的性能问题
知识库创建操作扩展性考虑
- 分片创建:支持按
space_id
进行分片创建,提高大规模知识库创建的效率 - 创建队列:使用消息队列处理知识库创建事件,支持高并发创建场景
- 创建监控:独立的知识库创建操作监控,及时发现创建异常
- 多存储协调:协调MySQL、ElasticSearch、向量数据库等多存储的创建操作
知识库创建安全保障
- 权限验证:严格的知识库创建权限验证,确保只有授权用户可以创建
- 创建审计:完整的知识库创建操作审计日志,支持创建行为追踪
- 创建限制:实施知识库创建频率限制,防止恶意批量创建
- 数据备份:创建操作完成后及时备份知识库数据,支持数据恢复
- 向量验证:创建知识库时验证向量空间的创建完整性
- 重复检查:创建前检查知识库名称和配置是否重复,避免冲突
7.5 知识库创建操作监控和运维
知识库创建操作监控
// 知识库创建操作监控指标
type KnowledgeCreateMetrics struct {
KnowledgeCreateSuccessCount int64 // 知识库创建成功次数
KnowledgeCreateFailureCount int64 // 知识库创建失败次数
KnowledgeCreateLatency time.Duration // 知识库创建操作延迟
LastKnowledgeCreateTime time.Time // 最后知识库创建时间
KnowledgeIndexCreateCount int64 // 知识库索引创建次数
KnowledgeCreateEventCount int64 // 知识库创建事件处理次数
VectorSpaceCreateCount int64 // 向量空间创建次数
KnowledgeCreateQueueSize int64 // 知识库创建队列大小
KnowledgeCreateRateLimit int64 // 知识库创建频率限制触发次数
KnowledgeDuplicateCount int64 // 知识库重复创建检测次数
DocumentProcessingCount int64 // 文档处理次数
EmbeddingGenerationCount int64 // 向量生成次数
}
// 知识库创建监控指标收集
func (r *resourceHandlerImpl) collectKnowledgeCreateMetrics(ctx context.Context, startTime time.Time, knowledgeID int64, err error) {
latency := time.Since(startTime)
if err != nil {
metrics.KnowledgeCreateFailureCount++
log.ErrorCtx(ctx, "knowledge base create failed",
"knowledge_id", knowledgeID, "error", err, "latency", latency)
} else {
metrics.KnowledgeCreateSuccessCount++
metrics.KnowledgeCreateLatency = latency
metrics.LastKnowledgeCreateTime = time.Now()
log.InfoCtx(ctx, "knowledge base create succeeded",
"knowledge_id", knowledgeID, "latency", latency)
}
}
// 知识库创建操作健康检查
func (r *resourceHandlerImpl) knowledgeCreateHealthCheck(ctx context.Context) error {
// 检查数据库连接
if err := r.db.Ping(); err != nil {
return fmt.Errorf("database connection failed: %w", err)
}
// 检查ES连接
if _, err := r.esClient.Ping(ctx); err != nil {
return fmt.Errorf("elasticsearch connection failed: %w", err)
}
// 检查向量数据库连接
if err := r.vectorClient.Ping(ctx); err != nil {
return fmt.Errorf("vector database connection failed: %w", err)
}
// 检查知识库创建队列状态
if queueSize := r.getKnowledgeCreateQueueSize(); queueSize > 1000 {
return fmt.Errorf("knowledge create queue size too large: %d", queueSize)
}
// 检查向量空间创建状态
if vectorErrors := r.getVectorSpaceCreateErrors(); len(vectorErrors) > 10 {
return fmt.Errorf("too many vector space create errors: %d", len(vectorErrors))
}
// 检查创建频率限制状态
if rateLimitHits := r.getCreateRateLimitHits(); rateLimitHits > 100 {
return fmt.Errorf("too many rate limit hits: %d", rateLimitHits)
}
// 检查文档处理队列状态
if docQueueSize := r.getDocumentProcessingQueueSize(); docQueueSize > 5000 {
return fmt.Errorf("document processing queue size too large: %d", docQueueSize)
}
return nil
}
知识库创建数据质量保证
- 创建一致性检查:定期验证MySQL、ElasticSearch和向量数据库中知识库创建数据的一致性
- 创建完整性验证:确保知识库创建操作完全建立了相关数据、索引和向量空间
- 向量空间验证:验证知识库创建时向量空间的创建完整性和配置正确性
- 创建异常恢复:提供知识库创建失败的重试和修复机制
- 创建性能监控:监控知识库创建操作性能,及时发现和解决性能问题
- 创建审计追踪:完整记录知识库创建操作的执行过程和结果
- 多存储一致性:确保MySQL、ElasticSearch、向量数据库等多存储创建的一致性
- 重复检测:检测和防止知识库重复创建,维护数据唯一性
- 创建回滚机制:创建失败时的数据回滚和清理机制
- 文档处理监控:监控知识库创建过程中的文档处理和向量化进度
- 存储配额检查:创建前检查存储配额,确保有足够空间存储知识库数据
- 嵌入模型验证:验证知识库创建时指定的嵌入模型配置正确性
8. 知识库创建安全和权限验证机制
8.1 知识库创建身份认证
JWT Token验证:
- 创建知识库的所有API请求都需要携带有效的JWT Token
- Token包含用户ID、工作空间权限等关键信息
- 通过中间件统一验证Token的有效性和完整性
// 知识库创建身份验证中间件
func KnowledgeCreateAuthMiddleware() app.HandlerFunc {
return func(c context.Context, ctx *app.RequestContext) {
token := ctx.GetHeader("Authorization")
if token == nil {
ctx.JSON(401, gin.H{"error": "创建知识库需要登录认证"})
ctx.Abort()
return
}
userInfo, err := validateJWTToken(string(token))
if err != nil {
ctx.JSON(401, gin.H{"error": "Token无效,无法创建知识库"})
ctx.Abort()
return
}
// 验证用户是否有创建知识库的权限
if !userInfo.HasKnowledgeCreatePermission {
ctx.JSON(403, gin.H{"error": "用户无创建知识库权限"})
ctx.Abort()
return
}
ctx.Set("user_id", userInfo.UserID)
ctx.Set("space_id", userInfo.SpaceID)
ctx.Set("creator_id", userInfo.UserID)
ctx.Next()
}
}
8.2 知识库创建工作空间权限控制
空间隔离机制:
- 每个用户只能在其所属工作空间中创建知识库
- 通过
space_id
字段实现知识库创建权限隔离 - 在知识库创建操作中强制验证空间权限
// 知识库创建工作空间权限验证
func (s *KnowledgeApplicationService) validateKnowledgeCreateSpacePermission(ctx context.Context, req *service.CreateKnowledgeRequest) error {
userSpaceID := ctx.Value("space_id").(int64)
// 验证请求的空间ID是否与用户所属空间一致
if req.SpaceID != userSpaceID {
return errors.New("无权限在该工作空间创建知识库")
}
// 检查工作空间是否允许创建知识库
spaceConfig, err := s.spaceService.GetSpaceConfig(ctx, userSpaceID)
if err != nil {
return fmt.Errorf("获取工作空间配置失败: %w", err)
}
if !spaceConfig.AllowKnowledgeCreation {
return errors.New("该工作空间不允许创建知识库")
}
// 检查工作空间知识库数量限制
knowledgeCount, err := s.getSpaceKnowledgeCount(ctx, userSpaceID)
if err != nil {
return fmt.Errorf("获取工作空间知识库数量失败: %w", err)
}
if knowledgeCount >= spaceConfig.MaxKnowledgeCount {
return fmt.Errorf("工作空间知识库数量已达上限: %d", spaceConfig.MaxKnowledgeCount)
}
// 检查工作空间存储配额
storageUsage, err := s.getSpaceStorageUsage(ctx, userSpaceID)
if err != nil {
return fmt.Errorf("获取工作空间存储使用量失败: %w", err)
}
if storageUsage >= spaceConfig.MaxStorageQuota {
return fmt.Errorf("工作空间存储配额已满: %d GB", spaceConfig.MaxStorageQuota/1024/1024/1024)
}
return nil
}
8.3 知识库创建资源级权限验证
知识库创建用户权限验证:
- 严格验证用户是否具有知识库创建权限
- 验证用户在指定工作空间的操作权限
- 通过存储配额和向量空间权限进行资源级控制
// 知识库创建权限验证
func (s *KnowledgeApplicationService) validateKnowledgeCreatePermission(ctx context.Context, req *service.CreateKnowledgeRequest) error {
userID := ctx.Value("user_id").(int64)
// 验证用户是否具有知识库创建权限
hasPermission, err := s.userService.HasKnowledgeCreatePermission(ctx, userID)
if err != nil {
return fmt.Errorf("验证知识库创建权限失败: %w", err)
}
if !hasPermission {
return errorx.New(errno.ErrKnowledgePermissionCode,
errorx.KV(errno.KnowledgeMsgKey, "用户无知识库创建权限"),
errorx.KV("user_id", userID))
}
// 验证工作空间权限
spacePermission, err := s.spaceService.CheckUserSpacePermission(ctx, userID, req.SpaceID)
if err != nil {
return fmt.Errorf("验证工作空间权限失败: %w", err)
}
if !spacePermission.CanCreateKnowledge {
return errorx.New(errno.ErrKnowledgeSpacePermissionCode,
errorx.KV(errno.KnowledgeMsgKey, "用户在该工作空间无知识库创建权限"),
errorx.KV("user_id", userID),
errorx.KV("space_id", req.SpaceID))
}
// 检查用户创建知识库频率限制
createCount, err := s.getUserKnowledgeCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))
if err != nil {
return fmt.Errorf("检查知识库创建频率失败: %w", err)
}
if createCount >= 20 { // 24小时内最多创建20个知识库
return errorx.New(errno.ErrKnowledgeCreateRateLimitCode,
errorx.KV("user_id", userID),
errorx.KV("create_count", createCount))
}
// 检查知识库名称是否重复
exists, err := s.checkKnowledgeNameExists(ctx, req.SpaceID, req.Name)
if err != nil {
return fmt.Errorf("检查知识库名称重复失败: %w", err)
}
if exists {
return errorx.New(errno.ErrKnowledgeNameExistsCode,
errorx.KV("knowledge_name", req.Name),
errorx.KV("space_id", req.SpaceID))
}
// 检查存储配额
storageQuota, err := s.checkStorageQuota(ctx, userID, req.SpaceID)
if err != nil {
return fmt.Errorf("检查存储配额失败: %w", err)
}
if !storageQuota.CanCreate {
return errorx.New(errno.ErrKnowledgeStorageQuotaExceededCode,
errorx.KV("user_id", userID),
errorx.KV("used_storage", storageQuota.UsedStorage),
errorx.KV("max_storage", storageQuota.MaxStorage))
}
// 检查向量空间权限
vectorPermission, err := s.checkVectorSpacePermission(ctx, userID, req.EmbeddingModel)
if err != nil {
return fmt.Errorf("检查向量空间权限失败: %w", err)
}
if !vectorPermission.CanCreateSpace {
return errorx.New(errno.ErrKnowledgeVectorSpacePermissionCode,
errorx.KV("user_id", userID),
errorx.KV("embedding_model", req.EmbeddingModel))
}
return nil
}
// 检查知识库名称是否存在
func (s *KnowledgeApplicationService) checkKnowledgeNameExists(ctx context.Context, spaceID int64, name string) (bool, error) {
// 检查同一工作空间下是否存在同名知识库
knowledges, err := s.DomainSVC.ListKnowledges(ctx, &service.ListKnowledgesRequest{
SpaceID: spaceID,
PageInfo: entity.PageInfo{PageSize: 1},
})
if err != nil {
return false, err
}
for _, knowledge := range knowledges.Knowledges {
if knowledge.Name == name {
return true, nil
}
}
return false, nil
}
// 检查存储配额
func (s *KnowledgeApplicationService) checkStorageQuota(ctx context.Context, userID, spaceID int64) (*StorageQuotaInfo, error) {
// 获取用户存储配额信息
quota, err := s.storageService.GetUserStorageQuota(ctx, userID)
if err != nil {
return nil, err
}
// 获取当前使用量
usage, err := s.storageService.GetUserStorageUsage(ctx, userID)
if err != nil {
return nil, err
}
return &StorageQuotaInfo{
UsedStorage: usage,
MaxStorage: quota,
CanCreate: usage < quota*0.95, // 使用率不超过95%
}, nil
}
// 检查向量空间权限
func (s *KnowledgeApplicationService) checkVectorSpacePermission(ctx context.Context, userID int64, embeddingModel string) (*VectorSpacePermission, error) {
// 检查用户是否有权限使用指定的嵌入模型
modelPermission, err := s.embeddingService.CheckModelPermission(ctx, userID, embeddingModel)
if err != nil {
return nil, err
}
// 检查向量空间创建配额
spaceCount, err := s.vectorService.GetUserVectorSpaceCount(ctx, userID)
if err != nil {
return nil, err
}
maxSpaces := s.getUserMaxVectorSpaces(userID)
return &VectorSpacePermission{
CanCreateSpace: modelPermission && spaceCount < maxSpaces,
CurrentSpaces: spaceCount,
MaxSpaces: maxSpaces,
}, nil
}
8.4 知识库创建API访问控制
创建请求频率限制:
- 实现基于用户的知识库创建频率限制
- 防止恶意批量创建知识库
- 支持不同用户等级的差异化创建限流策略
- 基于文档处理能力的动态限流
创建操作安全验证:
- 严格验证创建请求的合法性
- 防止恶意创建和资源滥用攻击
- 使用多重安全检查机制
- 文档内容安全扫描和验证
- 向量空间创建安全验证
// 知识库创建参数验证
func validateKnowledgeCreateRequest(req *service.CreateKnowledgeRequest) error {
if req.SpaceID <= 0 {
return errors.New("无效的工作空间ID")
}
if req.CreatorID <= 0 {
return errors.New("无效的创建者ID")
}
// 验证知识库名称
if req.Name == "" {
return errors.New("知识库名称不能为空")
}
if len(req.Name) > 100 {
return errors.New("知识库名称长度不能超过100字符")
}
// 验证知识库描述
if req.Description != "" && len(req.Description) > 1000 {
return errors.New("知识库描述长度不能超过1000字符")
}
// 验证嵌入模型
if req.EmbeddingModel == "" {
return errors.New("嵌入模型不能为空")
}
if !isValidEmbeddingModel(req.EmbeddingModel) {
return errors.New("不支持的嵌入模型")
}
// 验证分块策略
if req.ChunkStrategy != nil {
if req.ChunkStrategy.ChunkSize <= 0 || req.ChunkStrategy.ChunkSize > 8192 {
return errors.New("分块大小必须在1-8192之间")
}
if req.ChunkStrategy.ChunkOverlap < 0 || req.ChunkStrategy.ChunkOverlap >= req.ChunkStrategy.ChunkSize {
return errors.New("分块重叠大小必须小于分块大小")
}
}
// 验证图标URI
if req.IconURI != "" && !isValidIconURI(req.IconURI) {
return errors.New("无效的图标URI格式")
}
return nil
}
// 知识库创建操作安全检查
func (s *KnowledgeApplicationService) validateKnowledgeCreateSafety(ctx context.Context, req *service.CreateKnowledgeRequest) error {
userID := ctx.Value("user_id").(int64)
// 检查用户知识库创建频率限制
createCount, err := s.getUserKnowledgeCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))
if err != nil {
return fmt.Errorf("检查知识库创建频率失败: %w", err)
}
if createCount >= 20 { // 24小时内最多创建20个知识库
return errorx.New(errno.ErrKnowledgeCreateRateLimitCode,
errorx.KV("user_id", userID),
errorx.KV("create_count", createCount))
}
// 检查嵌入模型可用性
modelAvailable, err := s.checkEmbeddingModelAvailable(ctx, req.EmbeddingModel)
if err != nil {
return fmt.Errorf("检查嵌入模型可用性失败: %w", err)
}
if !modelAvailable {
return errors.New("嵌入模型当前不可用")
}
// 检查用户存储配额
storageQuota, err := s.checkUserStorageQuota(ctx, userID)
if err != nil {
return fmt.Errorf("检查用户存储配额失败: %w", err)
}
if !storageQuota.CanCreateKnowledge {
return errorx.New(errno.ErrKnowledgeStorageQuotaExceededCode,
errorx.KV("user_id", userID),
errorx.KV("used_storage", storageQuota.UsedStorage),
errorx.KV("max_storage", storageQuota.MaxStorage))
}
// 检查向量数据库连接
vectorDBHealthy, err := s.checkVectorDatabaseHealth(ctx)
if err != nil {
return fmt.Errorf("检查向量数据库健康状态失败: %w", err)
}
if !vectorDBHealthy {
return errors.New("向量数据库当前不可用,无法创建知识库")
}
// 检查文档处理服务状态
docProcessorHealthy, err := s.checkDocumentProcessorHealth(ctx)
if err != nil {
return fmt.Errorf("检查文档处理服务状态失败: %w", err)
}
if !docProcessorHealthy {
return errors.New("文档处理服务当前不可用,无法创建知识库")
}
return nil
}
// 检查嵌入模型可用性
func (s *KnowledgeApplicationService) checkEmbeddingModelAvailable(ctx context.Context, modelName string) (bool, error) {
// 检查模型是否在支持列表中
supportedModels := []string{
"text-embedding-ada-002",
"text-embedding-3-small",
"text-embedding-3-large",
"bge-large-zh-v1.5",
"bge-base-zh-v1.5",
}
for _, model := range supportedModels {
if model == modelName {
// 检查模型服务是否可用
return s.embeddingService.IsModelAvailable(ctx, modelName)
}
}
return false, nil
}
// 检查向量数据库健康状态
func (s *KnowledgeApplicationService) checkVectorDatabaseHealth(ctx context.Context) (bool, error) {
// 发送健康检查请求到向量数据库
healthCheck := &VectorDBHealthCheck{
Timeout: 5 * time.Second,
}
healthy, err := s.vectorService.HealthCheck(ctx, healthCheck)
if err != nil {
logs.CtxWarnf(ctx, "Vector database health check failed: %v", err)
return false, nil
}
return healthy, nil
}
// 检查文档处理服务健康状态
func (s *KnowledgeApplicationService) checkDocumentProcessorHealth(ctx context.Context) (bool, error) {
// 检查文档处理队列状态
queueStatus, err := s.documentService.GetQueueStatus(ctx)
if err != nil {
logs.CtxWarnf(ctx, "Document processor queue status check failed: %v", err)
return false, nil
}
// 如果队列积压过多,认为服务不健康
if queueStatus.PendingJobs > 10000 {
logs.CtxWarnf(ctx, "Document processor queue overloaded: %d pending jobs", queueStatus.PendingJobs)
return false, nil
}
return true, nil
}
// 获取用户存储使用量
func (s *PluginApplicationService) getUserStorageUsage(ctx context.Context, userID int64) (int64, error) {
// 查询用户所有插件的存储使用量
plugins, err := s.DomainSVC.ListUserPlugins(ctx, userID)
if err != nil {
return 0, fmt.Errorf("获取用户插件列表失败: %w", err)
}
var totalSize int64
for _, plugin := range plugins {
// 计算插件manifest和openapi_doc的存储大小
if plugin.Manifest != nil {
totalSize += int64(len(plugin.Manifest))
}
if plugin.OpenapiDoc != nil {
totalSize += int64(len(plugin.OpenapiDoc))
}
}
return totalSize, nil
}
// 获取用户最大存储配额
func (s *PluginApplicationService) getMaxStorageQuota(userID int64) int64 {
// 根据用户等级返回不同的存储配额
// 这里简化处理,实际应该从用户配置中获取
return 100 * 1024 * 1024 // 100MB
}
// URL格式验证
func isValidURL(urlStr string) bool {
u, err := url.Parse(urlStr)
return err == nil && u.Scheme != "" && u.Host != ""
}
// 插件类型验证
func isValidPluginType(pluginType common.PluginType) bool {
validTypes := []common.PluginType{
common.PluginTypeHTTP,
common.PluginTypeLocal,
}
for _, validType := range validTypes {
if pluginType == validType {
return true
}
}
return false
}
9. 知识库创建错误处理和日志记录
9.1 知识库创建分层错误处理机制
知识库创建错误分类体系:
// 知识库创建错误类型定义
type KnowledgeCreateErrorType int
const (
// 知识库创建业务错误
ErrKnowledgeCreateBusiness KnowledgeCreateErrorType = iota + 1000
ErrKnowledgeNameExists
ErrKnowledgePermissionDenied
ErrKnowledgeCreateRateLimit
ErrKnowledgeInvalidParameters
ErrKnowledgeEmbeddingModelNotSupported
ErrKnowledgeStorageQuotaExceeded
ErrKnowledgeDocumentProcessingFailed
ErrKnowledgeInvalidFileType
ErrKnowledgeFileSizeExceeded
ErrKnowledgeInvalidChunkSize
ErrKnowledgeInvalidIconURI
ErrKnowledgeInvalidSpaceID
ErrKnowledgeDuplicateName
ErrKnowledgeVectorSpaceCreateFailed
// 知识库创建系统错误
ErrKnowledgeCreateSystem KnowledgeCreateErrorType = iota + 2000
ErrKnowledgeDatabaseConnection
ErrKnowledgeElasticSearchTimeout
ErrKnowledgeServiceUnavailable
ErrKnowledgeCreateEventPublishFailed
ErrKnowledgeIndexCreateFailed
ErrKnowledgeTransactionRollbackFailed
ErrKnowledgeVectorStoreTimeout
ErrKnowledgeIDGenerationFailed
ErrKnowledgeEmbeddingServiceFailed
ErrKnowledgeContentIndexFailed
// 知识库创建网络错误
ErrKnowledgeCreateNetwork KnowledgeCreateErrorType = iota + 3000
ErrKnowledgeCreateRequestTimeout
ErrKnowledgeCreateConnectionRefused
ErrKnowledgeCreateServiceDown
ErrKnowledgeCreateESConnectionFailed
ErrKnowledgeVectorDBConnectionFailed
ErrKnowledgeEmbeddingAPITimeout
)
知识库创建错误处理流程:
- 捕获阶段:在知识库创建各层级捕获具体错误
- 包装阶段:添加知识库创建操作相关上下文信息和错误码
- 记录阶段:根据错误级别记录知识库创建操作日志
- 响应阶段:返回用户友好的知识库创建错误信息
- 回滚阶段:知识库创建失败时进行必要的数据回滚操作
- 向量处理:处理向量空间创建失败的错误
- 重试机制:对于可重试的创建错误提供重试建议
- 用户指导:为常见创建错误提供解决方案指导
9.2 知识库创建统一错误响应格式
// 知识库创建错误响应结构
type KnowledgeCreateErrorResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Details string `json:"details,omitempty"`
TraceID string `json:"trace_id"`
KnowledgeID int64 `json:"knowledge_id,omitempty"`
Operation string `json:"operation"`
CanRetry bool `json:"can_retry"`
DocumentsProcessed int `json:"documents_processed,omitempty"`
DocumentsFailed int `json:"documents_failed,omitempty"`
ValidationErrors []string `json:"validation_errors,omitempty"`
SuggestedFix string `json:"suggested_fix,omitempty"`
FieldErrors map[string]string `json:"field_errors,omitempty"`
VectorSpaceStatus string `json:"vector_space_status,omitempty"`
EmbeddingModel string `json:"embedding_model,omitempty"`
}
// 知识库创建错误处理中间件
func KnowledgeCreateErrorHandlerMiddleware() app.HandlerFunc {
return func(c context.Context, ctx *app.RequestContext) {
defer func() {
if err := recover(); err != nil {
traceID := ctx.GetString("trace_id")
userID := ctx.GetInt64("user_id")
spaceID := ctx.GetInt64("space_id")
logs.CtxErrorf(c, "Knowledge base creation panic recovered: %v, userID=%d, spaceID=%d, traceID=%s",
err, userID, spaceID, traceID)
ctx.JSON(500, KnowledgeCreateErrorResponse{
Code: 5000,
Message: "知识库创建服务器内部错误",
TraceID: traceID,
Operation: "create_knowledge",
CanRetry: true,
SuggestedFix: "请稍后重试,如果问题持续存在请联系技术支持",
})
}
}()
ctx.Next()
}
}
// 插件创建业务错误处理
func handlePluginCreateBusinessError(ctx *app.RequestContext, err error) {
traceID := ctx.GetString("trace_id")
var response PluginCreateErrorResponse
response.TraceID = traceID
response.Operation = "create_plugin"
switch {
case errors.Is(err, errno.ErrPluginInvalidParamCode):
response.Code = 400
response.Message = "插件参数无效"
response.CanRetry = false
response.SuggestedFix = "请检查插件名称、描述、服务器URL等参数是否正确"
case errors.Is(err, errno.ErrPluginPermissionCode):
response.Code = 403
response.Message = "无权限创建插件"
response.CanRetry = false
response.SuggestedFix = "请确保已登录且具有插件创建权限"
case errors.Is(err, errno.ErrPluginInvalidManifest):
response.Code = 400
response.Message = "插件清单格式无效"
response.CanRetry = false
response.SuggestedFix = "请检查插件清单文件格式是否符合规范"
case errors.Is(err, errno.ErrPluginInvalidOpenapi3Doc):
response.Code = 400
response.Message = "OpenAPI文档格式无效"
response.CanRetry = false
response.SuggestedFix = "请检查OpenAPI文档格式是否符合OpenAPI 3.0规范"
case errors.Is(err, errno.ErrPluginIDExist):
response.Code = 409
response.Message = "插件ID已存在"
response.CanRetry = false
response.SuggestedFix = "请使用不同的插件名称或检查是否已存在同名插件"
case errors.Is(err, errno.ErrPluginCreateRateLimit):
response.Code = 429
response.Message = "创建操作过于频繁,请稍后再试"
response.CanRetry = true
response.SuggestedFix = "请等待一段时间后重试"
case errors.Is(err, errno.ErrPluginStorageQuotaExceeded):
response.Code = 413
response.Message = "存储配额已满"
response.CanRetry = false
response.SuggestedFix = "请清理不需要的插件或升级存储配额"
case errors.Is(err, errno.ErrPluginServerURLNotAccessible):
response.Code = 400
response.Message = "插件服务器URL不可访问"
response.CanRetry = true
response.SuggestedFix = "请检查服务器URL是否正确且可访问"
default:
response.Code = 500
response.Message = "插件创建失败"
response.CanRetry = true
response.SuggestedFix = "请稍后重试,如果问题持续存在请联系技术支持"
}
ctx.JSON(response.Code, response)
}
// 插件创建系统错误处理
func handlePluginCreateSystemError(ctx *app.RequestContext, err error) {
traceID := ctx.GetString("trace_id")
var response PluginCreateErrorResponse
response.TraceID = traceID
response.Operation = "create_plugin"
switch {
case errors.Is(err, errno.ErrPluginDatabaseConnection):
response.Code = 500
response.Message = "插件数据库连接失败"
response.CanRetry = true
response.SuggestedFix = "数据库连接异常,请稍后重试"
case errors.Is(err, errno.ErrPluginElasticSearchTimeout):
response.Code = 500
response.Message = "插件索引操作超时"
response.CanRetry = true
response.SuggestedFix = "搜索服务响应超时,请稍后重试"
case errors.Is(err, errno.ErrPluginServiceUnavailable):
response.Code = 503
response.Message = "插件创建服务暂时不可用"
response.CanRetry = true
response.SuggestedFix = "服务正在维护中,请稍后重试"
case errors.Is(err, errno.ErrPluginCreateEventPublishFailed):
response.Code = 500
response.Message = "插件创建事件发布失败"
response.CanRetry = true
response.SuggestedFix = "事件发布异常,插件已创建但可能影响搜索,请稍后重试"
case errors.Is(err, errno.ErrPluginIndexCreateFailed):
response.Code = 500
response.Message = "插件索引创建失败"
response.CanRetry = true
response.SuggestedFix = "搜索索引创建失败,插件已创建但可能无法搜索到"
case errors.Is(err, errno.ErrPluginTransactionRollbackFailed):
response.Code = 500
response.Message = "插件创建事务回滚失败"
response.CanRetry = false
response.SuggestedFix = "数据一致性异常,请联系技术支持"
case errors.Is(err, errno.ErrPluginIDGenerationFailed):
response.Code = 500
response.Message = "插件ID生成失败"
response.CanRetry = true
response.SuggestedFix = "ID生成服务异常,请稍后重试"
default:
response.Code = 5000
response.Message = "插件创建失败"
response.Details = "服务器内部错误,请稍后重试"
response.CanRetry = true
response.SuggestedFix = "系统内部错误,请稍后重试或联系技术支持"
}
ctx.JSON(response.Code, response)
}
9.3 知识库创建日志记录策略
知识库创建日志级别定义:
- DEBUG:知识库创建详细调试信息,包括参数值、向量处理过程、文档分块详情
- INFO:知识库创建关键业务流程信息,如创建开始、参数验证、数据插入、向量空间创建
- WARN:知识库创建潜在问题警告,如存储配额警告、文档处理警告、向量生成警告
- ERROR:知识库创建错误信息,包括创建失败、权限错误、向量空间创建失败
- FATAL:知识库创建严重错误,可能导致数据不一致或向量空间损坏
知识库创建结构化日志格式:
// 知识库创建日志记录示例
func (s *KnowledgeApplicationService) CreateKnowledge(ctx context.Context, req *knowledgeAPI.CreateDatasetRequest) (*knowledgeAPI.CreateDatasetResponse, error) {
traceID := generateTraceID()
ctx = context.WithValue(ctx, "trace_id", traceID)
userID := ctxutil.GetUIDFromCtx(ctx)
// 记录知识库创建开始
logs.CtxInfof(ctx, "CreateKnowledge started, userID=%d, knowledgeName=%s, spaceID=%d, embeddingModel=%s, traceID=%s",
userID, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), traceID)
startTime := time.Now()
defer func() {
duration := time.Since(startTime)
logs.CtxInfof(ctx, "CreateKnowledge completed, duration=%dms, traceID=%s",
duration.Milliseconds(), traceID)
}()
// 记录关键步骤
logs.CtxInfof(ctx, "Validating knowledge create parameters, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, traceID=%s",
req.GetName(), req.GetEmbeddingModel(), req.GetChunkSize(), traceID)
// 权限验证日志
logs.CtxInfof(ctx, "Validating knowledge create permission, userID=%d, spaceID=%d, traceID=%s",
userID, req.GetSpaceID(), traceID)
// 存储配额检查日志
logs.CtxInfof(ctx, "Checking storage quota, userID=%d, traceID=%s", userID, traceID)
// 向量空间创建日志
logs.CtxInfof(ctx, "Creating vector space, embeddingModel=%s, dimensions=%d, traceID=%s",
req.GetEmbeddingModel(), getModelDimensions(req.GetEmbeddingModel()), traceID)
// 数据库创建操作日志
logs.CtxInfof(ctx, "Creating knowledge in database, knowledgeName=%s, traceID=%s",
req.GetName(), traceID)
// ElasticSearch索引创建日志
logs.CtxInfof(ctx, "Creating ElasticSearch index, knowledgeID=%d, traceID=%s",
knowledgeID, traceID)
// 事件发布日志
logs.CtxInfof(ctx, "Publishing knowledge create event, knowledgeID=%d, traceID=%s",
knowledgeID, traceID)
return resp, nil
}
// 知识库创建操作审计日志
func (s *KnowledgeApplicationService) logKnowledgeCreateAudit(ctx context.Context, operation string, knowledgeID int64, details map[string]interface{}) {
userID := ctx.Value("user_id").(int64)
spaceID := ctx.Value("space_id").(int64)
traceID := ctx.Value("trace_id").(string)
auditLog := map[string]interface{}{
"operation": operation,
"knowledge_id": knowledgeID,
"user_id": userID,
"space_id": spaceID,
"trace_id": traceID,
"timestamp": time.Now().Unix(),
"details": details,
"knowledge_name": details["knowledge_name"],
"embedding_model": details["embedding_model"],
"chunk_size": details["chunk_size"],
"chunk_overlap": details["chunk_overlap"],
"vector_space_id": details["vector_space_id"],
"storage_used": details["storage_used"],
}
logs.CtxInfof(ctx, "Knowledge create audit log: %+v", auditLog)
}
// 文档处理日志记录
func (s *KnowledgeApplicationService) logDocumentProcessing(ctx context.Context, knowledgeID int64, documentID int64, operation string, details map[string]interface{}) {
traceID := ctx.Value("trace_id").(string)
docLog := map[string]interface{}{
"operation": operation,
"knowledge_id": knowledgeID,
"document_id": documentID,
"trace_id": traceID,
"timestamp": time.Now().Unix(),
"details": details,
"file_name": details["file_name"],
"file_size": details["file_size"],
"chunk_count": details["chunk_count"],
"vector_count": details["vector_count"],
"processing_time": details["processing_time"],
}
logs.CtxInfof(ctx, "Document processing log: %+v", docLog)
}
// 向量空间操作日志
func (s *KnowledgeApplicationService) logVectorSpaceOperation(ctx context.Context, operation string, vectorSpaceID string, details map[string]interface{}) {
traceID := ctx.Value("trace_id").(string)
vectorLog := map[string]interface{}{
"operation": operation,
"vector_space_id": vectorSpaceID,
"trace_id": traceID,
"timestamp": time.Now().Unix(),
"details": details,
"embedding_model": details["embedding_model"],
"dimensions": details["dimensions"],
"vector_count": details["vector_count"],
"index_type": details["index_type"],
}
logs.CtxInfof(ctx, "Vector space operation log: %+v", vectorLog)
}
知识库创建日志内容规范:
- 请求日志:记录用户ID、工作空间ID、知识库名称、嵌入模型、分块策略、TraceID
- 业务日志:记录知识库创建步骤、参数验证结果、权限验证结果、向量空间创建过程
- 性能日志:记录创建接口响应时间、数据库插入时间、向量空间创建时间、文档处理时间
- 错误日志:记录创建错误堆栈、知识库相关上下文信息、向量处理失败原因
- 审计日志:记录知识库的创建操作、创建参数、创建结果、关联的文档和向量信息
- 安全日志:记录创建频率、权限验证、存储配额检查、可疑创建行为
- 文档处理日志:记录文档上传、分块处理、向量生成、索引创建等详细过程
- 向量空间日志:记录向量空间创建、配置、索引构建、查询性能等信息
9.4 知识库创建监控和告警
知识库创建关键指标监控:
- 创建性能:知识库创建响应时间、创建成功率、创建QPS、创建吞吐量
- 资源使用:数据库连接数、向量空间创建延迟、内存使用率、文档处理队列长度
- 业务指标:知识库创建成功率、创建频率分布、不同嵌入模型使用比例、用户创建活跃度
- 安全指标:权限验证通过率、恶意创建尝试次数、创建频率限制触发次数、存储配额检查失败率
- 质量指标:向量空间创建成功率、文档处理成功率、嵌入模型响应率、索引创建成功率
- 存储指标:存储使用量、向量数量、文档数量、索引大小、存储增长率
- 向量处理指标:向量生成延迟、向量维度分布、嵌入模型调用次数、向量相似度计算性能
知识库创建告警策略:
- 创建失败率告警:当知识库创建失败率超过3%时触发告警
- 性能告警:当知识库创建响应时间超过10秒时触发告警
- 资源告警:当数据库连接数超过80%或向量数据库连接异常时触发告警
- 安全告警:当检测到异常创建行为或存储配额滥用时立即触发告警
- 数据一致性告警:当MySQL、ES和向量数据库创建状态不一致时触发告警
- 配额告警:当用户存储使用量超过90%时触发告警
- 向量服务告警:当嵌入模型服务不可用或响应超时时触发告警
- 文档处理告警:当文档处理队列积压超过阈值时触发告警
// 知识库创建监控指标收集
type KnowledgeCreateMetrics struct {
CreateSuccessCount int64 // 创建成功次数
CreateFailureCount int64 // 创建失败次数
CreateLatency time.Duration // 创建延迟
PermissionDeniedCount int64 // 权限拒绝次数
RateLimitCount int64 // 频率限制次数
ParameterValidationFailCount int64 // 参数验证失败次数
VectorSpaceCreateLatency time.Duration // 向量空间创建延迟
VectorSpaceCreateFailCount int64 // 向量空间创建失败次数
DocumentProcessingLatency time.Duration // 文档处理延迟
EmbeddingGenerationLatency time.Duration // 嵌入生成延迟
EmbeddingModelFailCount int64 // 嵌入模型调用失败次数
StorageQuotaExceededCount int64 // 存储配额超限次数
IndexCreateLatency time.Duration // 索引创建延迟
IndexCreateFailCount int64 // 索引创建失败次数
EventPublishLatency time.Duration // 事件发布延迟
DatabaseInsertLatency time.Duration // 数据库插入延迟
VectorDatabaseLatency time.Duration // 向量数据库操作延迟
TotalStorageUsed int64 // 总存储使用量
TotalVectorCount int64 // 总向量数量
TotalDocumentCount int64 // 总文档数量
}
// 知识库创建监控指标上报
func (s *KnowledgeApplicationService) reportCreateMetrics(ctx context.Context, operation string, startTime time.Time, knowledgeID int64, req *knowledgeAPI.CreateDatasetRequest, err error) {
latency := time.Since(startTime)
if err != nil {
metrics.CreateFailureCount++
// 根据错误类型分类统计
switch {
case errors.Is(err, errno.ErrKnowledgePermissionCode):
metrics.PermissionDeniedCount++
case errors.Is(err, errno.ErrKnowledgeCreateRateLimitCode):
metrics.RateLimitCount++
case errors.Is(err, errno.ErrKnowledgeInvalidParamCode):
metrics.ParameterValidationFailCount++
case errors.Is(err, errno.ErrKnowledgeStorageQuotaExceededCode):
metrics.StorageQuotaExceededCount++
case errors.Is(err, errno.ErrKnowledgeVectorSpaceCreateFailedCode):
metrics.VectorSpaceCreateFailCount++
case errors.Is(err, errno.ErrKnowledgeEmbeddingModelFailedCode):
metrics.EmbeddingModelFailCount++
case errors.Is(err, errno.ErrKnowledgeIndexCreateFailedCode):
metrics.IndexCreateFailCount++
}
logs.CtxErrorf(ctx, "Knowledge %s failed, knowledgeName=%s, spaceID=%d, embeddingModel=%s, error=%v, latency=%dms",
operation, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), err, latency.Milliseconds())
} else {
metrics.CreateSuccessCount++
metrics.CreateLatency = latency
// 记录知识库类型统计
embeddingModel := req.GetEmbeddingModel()
chunkSize := req.GetChunkSize()
logs.CtxInfof(ctx, "Knowledge %s succeeded, knowledgeID=%d, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, latency=%dms",
operation, knowledgeID, req.GetName(), embeddingModel, chunkSize, latency.Milliseconds())
}
// 上报到监控系统
s.metricsReporter.Report(ctx, "knowledge_create", map[string]interface{}{
"operation": operation,
"knowledge_id": knowledgeID,
"knowledge_name": req.GetName(),
"embedding_model": req.GetEmbeddingModel(),
"chunk_size": req.GetChunkSize(),
"chunk_overlap": req.GetChunkOverlap(),
"space_id": req.GetSpaceID(),
"success": err == nil,
"latency_ms": latency.Milliseconds(),
"error_type": getKnowledgeCreateErrorType(err),
"vector_dimensions": getModelDimensions(req.GetEmbeddingModel()),
"storage_used": getStorageUsed(ctx, req.GetSpaceID()),
})
}
// 获取知识库创建错误类型
func getKnowledgeCreateErrorType(err error) string {
if err == nil {
return "none"
}
// 基于知识库错误码定义
switch {
case errors.Is(err, errno.ErrKnowledgePermissionCode):
return "permission_denied"
case errors.Is(err, errno.ErrKnowledgeNameExistsCode):
return "knowledge_exists"
case errors.Is(err, errno.ErrKnowledgeInvalidParamCode):
return "invalid_parameters"
case errors.Is(err, errno.ErrKnowledgeStorageQuotaExceededCode):
return "storage_quota_exceeded"
case errors.Is(err, errno.ErrKnowledgeVectorSpaceCreateFailedCode):
return "vector_space_create_failed"
case errors.Is(err, errno.ErrKnowledgeEmbeddingModelFailedCode):
return "embedding_model_failed"
case errors.Is(err, errno.ErrKnowledgeIndexCreateFailedCode):
return "index_create_failed"
case errors.Is(err, errno.ErrKnowledgeDocumentProcessingFailedCode):
return "document_processing_failed"
case errors.Is(err, errno.ErrKnowledgeVectorDatabaseTimeoutCode):
return "vector_database_timeout"
case errors.Is(err, errno.ErrKnowledgeCreateRateLimitCode):
return "rate_limit_exceeded"
default:
return "system_error"
}
}
// 知识库创建告警检查
func (s *KnowledgeApplicationService) checkCreateAlerts(ctx context.Context, metrics *KnowledgeCreateMetrics) {
// 创建失败率告警
totalCreates := metrics.CreateSuccessCount + metrics.CreateFailureCount
if totalCreates > 100 {
failureRate := float64(metrics.CreateFailureCount) / float64(totalCreates)
if failureRate > 0.03 { // 3%
s.alertManager.SendAlert(ctx, &Alert{
Level: "warning",
Type: "knowledge_create_failure_rate",
Message: fmt.Sprintf("知识库创建失败率过高: %.2f%%", failureRate*100),
Metrics: map[string]interface{}{
"failure_rate": failureRate,
"total_creates": totalCreates,
},
})
}
}
// 性能告警
if metrics.CreateLatency > 10*time.Second {
s.alertManager.SendAlert(ctx, &Alert{
Level: "warning",
Type: "knowledge_create_latency",
Message: fmt.Sprintf("知识库创建延迟过高: %dms", metrics.CreateLatency.Milliseconds()),
Metrics: map[string]interface{}{
"latency_ms": metrics.CreateLatency.Milliseconds(),
},
})
}
// 存储配额告警
if metrics.StorageQuotaExceededCount > 10 {
s.alertManager.SendAlert(ctx, &Alert{
Level: "critical",
Type: "knowledge_storage_quota_exceeded",
Message: fmt.Sprintf("存储配额超限次数过多: %d", metrics.StorageQuotaExceededCount),
Metrics: map[string]interface{}{
"quota_exceeded_count": metrics.StorageQuotaExceededCount,
},
})
}
// 向量空间创建失败告警
if metrics.VectorSpaceCreateFailCount > 5 {
s.alertManager.SendAlert(ctx, &Alert{
Level: "critical",
Type: "knowledge_vector_space_create_failed",
Message: fmt.Sprintf("向量空间创建失败次数过多: %d", metrics.VectorSpaceCreateFailCount),
Metrics: map[string]interface{}{
"vector_space_fail_count": metrics.VectorSpaceCreateFailCount,
},
})
}
// 嵌入模型失败告警
if metrics.EmbeddingModelFailCount > 20 {
s.alertManager.SendAlert(ctx, &Alert{
Level: "warning",
Type: "knowledge_embedding_model_failed",
Message: fmt.Sprintf("嵌入模型调用失败次数过多: %d", metrics.EmbeddingModelFailCount),
Metrics: map[string]interface{}{
"embedding_fail_count": metrics.EmbeddingModelFailCount,
},
})
}
}