分词和同义词

发布于:2025-07-16 ⋅ 阅读:(20) ⋅ 点赞:(0)

一、简介

自定义分词库和同义词库是弥合 “标准通用语言” 与 **“特定领域语言”**之间语义鸿沟的关键桥梁。其根本目的是让计算机能像领域专家一样,更精准地理解和处理特定场景下的文本,从而提升搜索、推荐、分析等应用的智能化水平
在这里插入图片描述

1.1 分词库 (Custom Segmentation Dictionary)

核心目的:确保特定词语的完整性,避免被错误切分,保障语义的最小单元正确。

如果一个专有名词被错误地切分开,它所代表的完整含义就会丢失,后续的所有分析都将基于错误的基础。

具体应用场景与目的:

  1. 识别专有名词,保护领域知识完整性
    1. 企业/产品/品牌名:
      1. 例: “蓝鲸智云”、“织云平台”、“特斯拉Model Y”
      2. 目的: 如果不加干预,“蓝鲸智云”可能会被切成 蓝鲸 / 智云,失去了品牌专属性。在做舆情分析、竞品分析时,无法准确捕捉到对特定产品的讨论。
    2. 行业术语/私域黑话:
      1. 例: 电商行业的“动销率”、“客单价”;金融行业的“量化对冲”、“滚动收益”;游戏行业的“开黑”、“Gank”。
      2. 目的: 这些是行业内沟通的基础。错误切分(如 量化 / 对冲)会导致专业文档检索失败,也无法对行业报告、用户评论进行有效的文本挖掘。
    3. 人名/地名/组织机构名:
      1. 例: “张三丰”、“朝阳区群众”、“数据智能部”。
      2. 目的: 保证实体识别(NER)的准确性。例如,在分析客服聊天记录时,需要准确识别出是哪个部门或哪位同事被提及。
  2. 收录网络新词与热词,跟上时代语言变化
    1. 例: “元宇宙”、“打工人”、“YYDS”、“绝绝子”、“破防了”。
    2. 目的: 新词热词的出现速度远快于通用词库的更新速度。及时收录这些词,才能准确理解用户在社交媒体、评论区的情绪和讨论焦点,做好舆情监控和用户画像。
  3. 修正原生分词的常见错误
    1. 例: “上海市长江大桥”
      1. 错误切分: 上海 / 市长 / 江大桥
      2. 正确切分: 上海市 / 长江大桥
    2. 目的: 提升基础分词的准确率。很多有歧义的词组组合,可以通过自定义词典强制其正确切分,这是提升整体NLP任务效果的基础。

1.2 同义词库 (Custom Synonym Dictionary)

核心目的:将不同说法但含义相同的词语关联起来,统一语义表达,扩大召回并简化分析。

用户在表达同一个意思时,会使用不同的词语。同义词库就是为了抹平这种表达差异,让机器知道“A”和“B”其实在说同一件事。

具体应用场景与目的:

  1. 提升搜索召回率与用户体验
    1. 例: 电脑 = 计算机 = PC;笔记本 = 手提电脑;土豆 = 马铃薯 = 洋芋。
    2. 目的: 当用户搜索“笔记本”时,系统也能将包含“手提电脑”的商品或文章返回给用户,避免因用词不同而搜不到结果,极大提升了搜索的全面性(召回率)。
  2. 统一公司内部术语与简称
    1. 例: 人工智能 = AI;北京大学 = 北大;用户增长部 = 增长部。
    2. 目的: 在知识库或内部系统中,员工可能使用简称进行搜索。通过同义词库,可以确保无论输入全称还是简称,都能定位到同一份文档或数据。
  3. 兼容用户输入错误与习惯
    1. 例: 罗技 = 罗计(错别字);帐号 = 账号(异形词)。
    2. 目的: 增强系统的鲁棒性,即使用户输入了常见的错别字或使用了不同的书写习惯,系统依然能正确理解其意图。
  4. 统一数据分析与统计口径
    1. 例: 在客服质检场景中,bug = 缺陷 = 问题 = 故障。
    2. 目的: 在进行工单分类、问题根因分析时,可以将这些同义词映射为同一个标签(如“产品缺陷”)。这样,在统计“产品缺陷”类问题的数量时,不会因为用词不同而产生遗漏,保证了数据报表的准确性。

1.3 前置知识

1.3.1 基于词典的分词方法

这是最主流和有效的分词范式。其核心思想是:将一个句子与一个足够大的词典进行匹配,切分出所有在词典中存在的词语。因此,词典的质量和覆盖度直接决定了分词效果的上限。

1.3.2 Trie树

  • 什么是Trie树? 它是一种树形结构,每个节点代表一个字符,从根节点到某个节点的路径就构成一个词。
  • 为什么选择Trie树?
    • 极速前缀查询: 它能以 O(L) 的时间复杂度(L为待查字符串长度)判断一个字符串是否存在于词典中或是否为某个词的前缀。这是实现高效分词算法(如最大匹配法)的基础。
    • 空间高效: 共享公共前缀的词语可以节省存储空间。

1.3.3 词典的构建与管理

词典文件通常采用简单的文本格式,每一行代表一个词条,可包含以下信息:

词语 词频 词性 (以空格或制表符分隔)

# dict.txt 示例
蓝鲸智云 1000 a_brand
智能运维 800 n
解决方案 1200 n
  • 加载与缓存: 首次启动时,程序会读取文本词典来构建Trie树。这个过程可能耗时较长。因此,一个关键的优化是:在首次构建后,将内存中的Trie树 序列化 为一个二进制文件(例如 .trie 缓存)。后续启动时,直接从该二进制文件加载,速度会提升几个数量级。
  • 词频对数化: 直接使用原始词频可能导致高频词与低频词权重差距过大。通常会对其进行 对数平滑 处理,使其分布更均匀,便于后续的评分计算。

二、分词

一个完整的分词流程包含 预处理、核心切分、歧义解决 等步骤。

2.1 文本预处理:标准化的第一步

为了消除格式差异,在分词前必须对输入文本进行标准化:

  • 大小写转换: 将所有英文字母统一转为小写。
  • 全角/半角转换: 将全角字符(如 ,、A)转换为半角(如 ,、A)。
  • 繁简转换: 将繁体中文统一转换为简体。

2.2 基础算法:双向最大匹配法 (Bi-directional Maximum Matching)

这是基于词典的最经典的分词算法。

  • 正向最大匹配 (Forward Maximum Matching, FMM): 从左到右扫描句子,每次都试图匹配出当前位置开始的最长词语。

  • 反向最大匹配 (Backward Maximum Matching, BMM): 从右到左扫描句子,每次都试图匹配出当前位置结束的最长词语。
    示例: 待分词句为 “北京大学城”,词典中有 “北京”、“大学”、“北京大学”、“大学城”。

  • FMM 结果: 北京大学 / 城

  • BMM 结果: 北京 / 大学城
    当 FMM 和 BMM 的结果不一致时,就产生了 分词歧义

3. 解决分词歧义:全路径搜索与评分机制

这是分词器最核心、最能体现其“智能”的部分。当检测到歧义时,我们需要一套更复杂的机制来决定哪种切分方式最优。

  • 步骤一:识别歧义片段
    通过比较双向最大匹配的结果,可以定位出产生分歧的文本片段(如上例中的 “北京大学城”)。
  • 步骤二:搜索所有可能的分词路径
    针对该歧义片段,使用 深度优先搜索(DFS)或类似算法,并结合Trie树,找出所有可能的分词组合。
    • 对于 “北京大学城”,可能的路径有:
      • [北京, 大学, 城]
      • [北京大学, 城]
      • [北京, 大学城]
  • 步骤三:设计评分函数,选择最佳路径
    为每条分词路径计算一个综合得分,选择得分最高的作为最终结果。一个好的评分函数通常会考虑以下因素:
    • 平均词频: 由更常见(词频更高)的词组成的路径得分更高。
    • 词语长度: 倾向于由更长的词组成的路径(例如,选择 北京大学 而不是 北京 和 大学)。这被称为“最少分词数”原则。
    • 词语数量惩罚: 对切分得过于零碎的路径进行适当降权,以避免无意义的短词组合。
      通过这套机制,分词器能够像人一样,基于统计信息和语言学规则,在多种可能性中做出最合理的选择。

三、同义词

3.1 核心价值:同义词为何如此重要?

在搜索引擎或问答系统中,同义词的核心价值在于弥合用户表达与文档表达之间的“词汇鸿沟”。用户的自然语言表达往往存在多样性,而数据源(如文档、文章、知识库)中的措辞可能与用户用词存在差异,常见的原因包括:

  • 同义异形:用户搜索“电脑”,文档中可能使用“计算机”。
  • 中英文混杂:用户输入“人工智能”,文档使用“AI”。
  • 俗称与学名:用户查询“土豆”,文档为“马铃薯”。
  • 概念别称:一个产品或概念存在多个说法,如“自动驾驶”与“无人驾驶”。
  • 缩写与全称:如“PM2.5”与“可吸入颗粒物”。
    如果不引入同义词扩展,系统仅基于字面匹配,极易导致召回率(Recall)不足,无法覆盖潜在相关的结果。同义词体系可以显著提升检索效果,扩大召回范围的同时兼顾精准度

3.2 同义词应策略

为了充分发挥同义词的价值,需要采用了一套分层、加权的策略。总体分成以下三个阶段:

3.2.1 意图切分——从用户问题到核心概念

接收到用户原始问题后,首先进行意图切分,提取最具价值的核心概念。

  • 文本预处理
    • 中英文分隔:中英文之间添加空格,便于后续 token 处理。
    • 标点清理:对特殊字符和标点进行统一处理,如 [ :|\r\n\t,,。??/!!&^%%(){}<>]+` 统统替换为空格。
    • 弱意图词过滤:移除常见疑问修饰词,例如“请问、什么样的、哪家、how、what、do、please”等,以聚焦核心主题。
  • 核心概念提取
    • 基于空白字符切分词语。
    • 对连续英文(非专有名词)进行组合。例如:
用户输入:“我想了解 iPhone 15 Pro 的信息”
核心概念提取结果:“iPhone 15 Pro”
*  **推荐权重**:核心概念本身权重设为 **5**,在整体计算时保持核心主导地位。

3.2.2 分层查找—— 区分主次,精细召回

对核心概念执行分层查找,根据同义词的重要性区分主次:

  • 第一层:概念级同义词
    • 定义:对完整核心概念(如 "iPhone 15 Pro")寻找整体替代说法。
    • 查找方式:lookup(“iPhone 15 Pro”) → 返回 [“苹果15 Pro”, “苹果15Pro”]。
    • 应用目的:整体替换,最大程度捕获用户原始意图的等价表述。
    • 推荐权重0.7
  • 第二层:关键词级同义词
    • 定义:将核心概念拆分成关键词(weights 方法内部完成),对每个词寻找同义词。
    • 查找方式
lookup("iPhone") → ["苹果手机"]
lookup("Pro") → ["专业版"]
  • 应用目的:在粒度更细的层面提供召回补充,对文档中存在部分概念匹配时提升召回率。
  • 推荐权重0.2

3.2.3 多维度加权 —— 构建最终召回得分

在最终召回得分计算中,除了同义词层次的基础权重(核心概念、概念同义词、关键词同义词)外,还引入以下补充权重:

  • NER(命名实体识别)权重
    • 作用:不同实体类别对召回重要性有不同影响,NER 体系赋予差异化加权。
  • 词性(POSTag)权重
    • 动词、副词、代词等对召回效果贡献较低,设置权重衰减(如 r, c, d → 0.3)。
    • 实词如名词、地名保持较高权重(如 ns, nt, n → 2-3)。
  • 词频/逆文档频率(TF/IDF)加权
    • 高频词影响力下调,低频但重要词上调。

3.3 综合得分计算公式(示例)

一个词的最终召回得分可简单描述为:

Final_Score = Base_Weight * NER_Weight * POSTag_Weight * IDF_Score

其中:

  • Base_Weight 来自不同层次(核心词 5,概念级 0.7,关键词级 0.2)
  • NER_Weight, POSTag_Weight 来自词性表和实体类别表
  • IDF_Score 在函数中动态计算

四、附录

4.1 需要管理的文件

  • 分词库
    • huqie.txt (线下维护,文件形式存在代码库中)
    • 公司级别 huqie-qifu (线上维护,超管可修改)
    • 用户自定义分词库,整个系统共用一份(空间用户可修改)
  • ner (命名实体)
    • ragflow内置的ner.json (线下维护,文件形式存在代码库中)
    • 用户自定义命名实体,整个系统共用一份(空间用户可修改)
  • synon (同义词)
    • ragflow内置的同义词 (丢弃
    • 用户自定义同义词(空间用户可修改)
  • term.freq(术语 频率 )(感觉可以删除掉,跟分词表的词频有点重复
    • 用户自定义术语(空间用户可修改)

五、附录

4.1 分词表格式

ragflow分词表目前用的是huqie,下载地址:https://huggingface.co/InfiniFlow/huqie/tree/main

词频越高,说明这个词越普通,得分越低。

文件格式:分词 词频 词性

金童云商 3 nr
青禾服装 3 nr
救济灾民 3 l
左移 17 nr
低速 176 d
雨果网 3 nr
钢小二 3 nr

词性表:

类型 词性 权重 备注
功能词 r 0.3 代词(pronoun)如:我、你、他
c 连词(conjunction)如:和、但是、而且
d 副词(adverb)如:很、非常、不
名词 n 2 普通名词(general noun)如:电脑、机器、算法
nt 3 机构名(noun of organization)
ns 地名(noun of location)
数字 [0-9-]+ 2 数字形式(或者编号),视为有信息量的实体。
其它 1 动词、形容词或者其它未知词性

词频对应idf得分分布

这里f值范围是1-1000000。第一列f为词库中的原始词频的最小值,实际区间为 [f, next_f)

f(原始词频) F(词频离散值/分桶) df(分桶后的词频) idf(逆文档频率分数)
1 -13 2 6.6021
2 -12 6 6.1871
4 -11 17 5.757
11 -10 45 5.342
28 -9 123 4.9084
75 -8 335 4.4744
204 -7 912 4.0401
554 -6 2479 3.6066
1504 -5 6738 3.1741
4087 -4 18316 2.7443
11109 -3 49787 2.3219
30198 -2 135335 1.9185
82085 -1 367879 1.5585
223131 0 1000000 1.2788

4.2 命名实体识别格式(NER)

Named Entity Recognition,也可以称为专有名词。以下为ragflow中定义的格式。

文件格式:

{
  "上官": "firstnm",
  "山子股份": "stock",
  "傻逼": "toxic"
}

实体类型:

类型 权重 含义
toxic 2 敏感/风险词
func 1 功能词 / 语法词 如“的”、“是”、“怎么”
corp 3 公司或组织名
loca 3 地点/地名
sch 3 学校或教育机构
stock 3 股票或证券名称
firstnm 1 人名首字母或常见姓名

4.3 同义词格式

同义词当前为JSON格式存储,key为用户输入问题中包含的词,值为该词的同义词表

{
  "骑士乳业": "832786",
  "832786": "骑士乳业",
  "官微": ["公众号", "官方号", "官方微信"]
}

4.4 IDF 公式(标准形式)

IDF(Inverse Document Frequency,逆文档频率)是衡量一个词在整个语料库中“稀有程度”的指标,核心思想是:出现频率越低的词越有区分度,应该赋予更高的权重

IDF(w) = log ( N / (1 + DF(w)) )

  • N:语料库中的总文档数

  • DF(w):包含词 w 的文档数(Document Frequency)

  • 1 + DF(w):+1 是平滑处理,防止除以零
    简化解释:

  • 出现得多(常见词,如“的”,“是”),DF 大 → IDF 小

  • 出现得少(专业词,如“iPhone 15 Pro”,“ChatGPT”),DF 小 → IDF 大
    ES6.x之前默认用的是TF-IDF,7.x及之后默认使用了BM25算法,核心思想仍然包含IDF概念,会额外关注词频(某个 term t 在文档 d 中出现的次数)和当前文档d的长度、索引中所有文档的平均长度的影响。

RAGFlow的代码实现中,IDF公式修改成了:

  • idf1(权重0.3):math.log10(10 + ((N - freq(t) + 0.5) / (freq(t) + 0.5)))
    • t为query分词后的token,N固定为10000000
    • freq(t):
      • 如果t为2位及以上的数字,则为3
      • 如果是词库的词,则计算得分:int(math.exp(F) * 1000000 + 0.5),这是离散化设计的精度损失,本质上是对词频做了分 bin,词频只要落在同一个 log 区间,最终 score 就一样。
      • 如果t是字符.空格,则是300
      • 对t继续分词,取最小得分/6,最少为30。
  • idf2(权重0.7):math.log10(10 + ((N - DF(t) + 0.5) / (DF(t) + 0.5)))
    • t为query分词后的token,N固定为1000000000
    • df(t):
      • [0-9. -]{2,},则为5
      • 如果在term.freq(rag/res/)文件中词,取它的词频 + 3
      • [a-z. -]+,则为300
      • 继续分词,取min(df(sub_t)) / 6,最小值为3
      • 其它情况返回3

4.5 分词实现

"""
pip install datrie nltk hanziconv -i https://mirrors.aliyun.com/pypi/simple
Windows平台还需要去安装一个wheel:https://github.com/liuzhenghua/datrie/releases/tag/v202503170859
"""
import logging
import copy
import datrie
import math
import os
import re
import string
from collections import namedtuple
from hanziconv import HanziConv
from nltk import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] - %(message)s')
# 定义词典条目结构,方便访问
DictEntry = namedtuple('DictEntry', ['word', 'frequency', 'pos_tag'])
# 定义分词路径中的一个节点
TokenNode = namedtuple('TokenNode', ['token', 'score', 'pos_tag'])

def is_chinese(char):
    """判断一个字符是否为中文字符"""
    return '\u4e00' <= char <= '\u9fa5'

class CustomTokenizer:
    """
    一个高性能、可定制的中英混合文本分词器。
    该分词器使用Trie树存储词典,并通过结合多种分词策略来解决歧义,
    特别适合需要处理领域专有词汇的场景。
    """
    # 评分模型中的常数,用于惩罚切分过碎的结果
    PATH_LENGTH_BIAS = 30
    # 词频计算的基数分母,用于对数平滑
    FREQUENCY_DENOMINATOR = 1000000
    # 用于切分中英文/数字混合文本的正则表达式
    SPLIT_PATTERN = re.compile(r"([a-zA-Z0-9\._+-]+)")
    def __init__(self, dict_path: str, debug=False):
        """
        初始化分词器。
        Args:
            dict_path (str): 主词典文件的路径 (例如 'data/dict.txt')。
        """
        self.dict_path = dict_path
        self.trie = None
        self.DEBUG = debug
        self.SPLIT_CHAR = r"([ ,\.<>/?;:'\[\]\\`!@#$%^&*\(\)\{\}\|_+=《》,。?、;‘’:“”【】~!¥%……()——-]+|[a-zA-Z0-9,\.-]+)"
        self.stemmer = PorterStemmer()
        self.lemmatizer = WordNetLemmatizer()
        self._initialize()
    def _initialize(self):
        """
        核心初始化逻辑:加载词典并构建Trie树。
        优先从缓存加载,若缓存不存在或加载失败,则从原始文本文件构建。
        """
        triecache_path = self.dict_path + ".trie"
        # 尝试从缓存文件加载Trie树
        if os.path.exists(triecache_path):
            try:
                logging.info(f"正在从缓存文件加载Trie树: {triecache_path}")
                self.trie = datrie.Trie.load(triecache_path)
                logging.info("Trie树加载成功。")
                return
            except Exception as e:
                logging.warning(f"从缓存加载Trie树失败: {e}。将从原始词典重新构建。")
        # 如果缓存加载失败或不存在,则从头构建
        logging.info("未找到Trie缓存或加载失败,开始从原始词典文件构建...")
        self.trie = datrie.Trie(string.printable)
        self._load_dictionary(self.dict_path)
    def _get_key(self, word: str) -> str:
        """将词语转换为Trie树中存储的键(小写UTF-8),encode再转str是为了兼容中文字符,b''需要去除。"""
        return str(word.lower().encode("utf-8"))[2:-1]
    def _get_rkey(self, word: str) -> str:
        """存入反向key"""
        return str(("DD" + word[::-1].lower()).encode("utf-8"))[2:-1]
    def _load_dictionary(self, file_path: str):
        """
        从文本文件加载词典,构建或更新Trie树,并创建缓存。
        """
        logging.info(f"开始从词典文件构建Trie树: {file_path}")
        try:
            with open(file_path, "r", encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    parts = re.split(r"[ \t]", line)
                    word, freq, pos_tag = parts[0], int(parts[1]), parts[2]
                    # 对词频进行对数平滑处理,避免悬殊过大
                    log_freq = int(math.log(max(1, freq) / self.FREQUENCY_DENOMINATOR) + 0.5)
                    key = self._get_key(word)
                    rkey = self._get_rkey(word)
                    # 仅在当前词不存在或新词频率更高时更新
                    if key not in self.trie or self.trie[key][0] < log_freq:
                        self.trie[key] = (log_freq, pos_tag)
                        self.trie[rkey] = 1
            # 构建完成后,保存Trie树到缓存文件
            triecache_path = file_path + ".trie"
            logging.info(f"Trie树构建完成,正在保存缓存至: {triecache_path}")
            self.trie.save(triecache_path)
        except Exception as e:
            logging.error(f"构建Trie树失败: {file_path}", exc_info=e)
    def add_dictionary(self, file_path: str):
        """
        动态添加用户自定义词典。新词会补充到现有Trie树中。
        """
        self._load_dictionary(file_path)
    def _preprocess(self, text: str) -> str:
        """对输入文本进行一系列标准化预处理。"""
        text = re.sub(r"\W+", " ", text)  # 非字母数字替换为空格
        # 1. 全角转半角
        r_string = ""
        for char in text:
            inside_code = ord(char)
            if inside_code == 0x3000:
                inside_code = 0x0020
            else:
                inside_code -= 0xfee0
            if not (0x0020 <= inside_code <= 0x7e):
                r_string += char
            else:
                r_string += chr(inside_code)
        text = r_string
        # 2. 转换为小写
        text = text.lower()
        # 3. 繁简转换
        text = HanziConv.toSimplified(text)
        return text
    def _score_path(self, path: list[TokenNode]) -> tuple[list[str], float]:
        """
        为一条分词路径计算综合得分。
        得分越高,代表该分词路径越优。
        评分策略:
        1. 平均词频得分 (avg_freq_score): 路径中所有词的平均(对数)词频。
        2. 词长分布得分 (long_word_ratio): 路径中长词(长度>1)的比例,鼓励长词。
        3. 路径长度惩罚 (length_penalty): 对切分过于零碎(词数太多)的路径进行惩罚。
        """
        if not path:
            return [], -1.0
        num_tokens = len(path)
        total_freq_score = sum(node.score for node in path)
        num_long_words = sum(1 for node in path if len(node.token) > 1)
        avg_freq_score = total_freq_score / num_tokens
        long_word_ratio = num_long_words / num_tokens
        length_penalty = self.PATH_LENGTH_BIAS / num_tokens
        # 最终得分为三者加权(此处为简单相加)
        final_score = avg_freq_score + long_word_ratio + length_penalty
        tokens = [node.token for node in path]
        logging.debug(f"路径: {tokens}, 最终得分: {final_score:.2f} (AvgFreq: {avg_freq_score:.2f}, "
                      f"LongWordRatio: {long_word_ratio:.2f}, Penalty: {length_penalty:.2f})")
        return tokens, final_score
    def score_(self, token_freq_tags: list[tuple[str, tuple[int, str]]]) -> tuple[list[str], float]:
        """
        计算分词得分,返回分词列表和得分。
        得分公式:score = B / num_tokens + long_token_ratio + total_frequency
        """
        base_score = 30
        total_frequency = 0
        long_token_count = 0
        tokens = []
        for token, (freq, tag) in token_freq_tags:
            total_frequency += freq
            if len(token) >= 2:
                long_token_count += 1
            tokens.append(token)
        num_tokens = len(tokens)
        long_token_ratio = long_token_count / num_tokens if num_tokens > 0 else 0
        score = (base_score / num_tokens) + long_token_ratio + total_frequency if num_tokens > 0 else 0
        logging.debug(
            f"[SC] tokens={tokens}, count={num_tokens}, long_ratio={long_token_ratio:.4f}, freq_sum={total_frequency}, score={score:.4f}")
        return tokens, score
    def max_forward(self, text):
        """
        使用最大正向匹配算法对 text 分词,返回最佳切分及对应得分。
        """
        segments = []
        start = 0
        while start < len(text):
            end = start + 1
            current_substr = text[start:end]
            # 尽可能向右扩展子串,直到不再匹配 trie 前缀
            while end < len(text) and self.trie.has_keys_with_prefix(self._get_key(current_substr)):
                end += 1
                current_substr = text[start:end]
            # 回退:如果最后一个子串不在词典里,逐步缩短
            while end - 1 > start and self._get_key(current_substr) not in self.trie:
                end -= 1
                current_substr = text[start:end]
            key = self._get_key(current_substr)
            if key in self.trie:
                score_info = self.trie[key]
            else:
                score_info = (0, '')  # 默认分数为 0,标签为空
            segments.append((current_substr, score_info))
            start = end  # 移动起始指针
        # 根据 segments 计算最终得分和分词序列
        return self.score_(segments)
    def _max_backward(self, text: str) -> tuple[list[str], float]:
        """
        使用最大向后匹配进行分词,返回分词结果及其得分。
        """
        token_freq_list = []
        start_idx = len(text) - 1
        while start_idx >= 0:
            end_idx = start_idx + 1
            substring = text[start_idx:end_idx]
            # 尝试向前扩展匹配
            while start_idx > 0 and self.trie.has_keys_with_prefix(self._get_rkey(substring)):
                start_idx -= 1
                substring = text[start_idx:end_idx]
            # 如果无法匹配完整词,逐步缩短直到找到匹配
            while start_idx + 1 < end_idx and self._get_key(substring) not in self.trie:
                start_idx += 1
                substring = text[start_idx:end_idx]
            # 加入匹配词或单字
            if self._get_key(substring) in self.trie:
                freq_tag = self.trie[self._get_key(substring)]
            else:
                freq_tag = (0, '')
            token_freq_list.append((substring, freq_tag))
            # 移动到下一个分词位置
            start_idx -= 1
        # 因为是从后往前扫描,最终结果需要反转
        tokens, score = self.score_(token_freq_list[::-1])
        return tokens, score
    def _split_by_lang(self, line):
        """
            将输入文本按语言(中文/非中文)分段。
            返回一个列表,每个元素是 (文本片段, 是否中文) 的元组。
            """
        result = []
        blocks = re.split(self.SPLIT_CHAR, line)
        for block in blocks:
            if not block:
                continue
            start = 0
            end = 1
            current_is_chinese = is_chinese(block[start])
            while end < len(block):
                next_is_chinese = is_chinese(block[end])
                if next_is_chinese == current_is_chinese:
                    end += 1
                else:
                    result.append((block[start:end], current_is_chinese))
                    start = end
                    end = start + 1
                    current_is_chinese = next_is_chinese
            # 处理最后一段
            if start < len(block):
                result.append((block[start:end], current_is_chinese))
        return result
    def _dfs(self, text, start_idx, current_tokens, all_tokenizations, depth=0, memo_cache=None):
        if memo_cache is None:
            memo_cache = {}
        MAX_DEPTH = 10
        if depth > MAX_DEPTH:
            if start_idx < len(text):
                remaining = "".join(text[start_idx:])
                all_tokenizations.append(current_tokens + [(remaining, (-12, ''))])
            return start_idx
        state_key = (start_idx, len(current_tokens))
        if state_key in memo_cache:
            return memo_cache[state_key]
        if start_idx >= len(text):
            all_tokenizations.append(current_tokens)
            memo_cache[state_key] = start_idx
            return start_idx
        max_reach = start_idx
        # 检查是否有重复字符块
        if start_idx < len(text) - 4:
            char = text[start_idx]
            if all(text[start_idx + i] == char for i in range(5)):
                end_idx = start_idx
                while end_idx < len(text) and text[end_idx] == char:
                    end_idx += 1
                mid_idx = start_idx + min(10, end_idx - start_idx)
                token = "".join(text[start_idx:mid_idx])
                token_key = self._get_key(token)
                token_info = self.trie.get(token_key, (-12, ''))
                max_reach = max(max_reach, self._dfs(
                    text, mid_idx, current_tokens + [(token, token_info)],
                    all_tokenizations, depth + 1, memo_cache
                ))
                memo_cache[state_key] = max_reach
                return max_reach
        # 启发式:是否可以跳一个字符
        next_start = start_idx + 1
        if start_idx + 2 <= len(text):
            t1 = "".join(text[start_idx:start_idx + 1])
            t2 = "".join(text[start_idx:start_idx + 2])
            if self.trie.has_keys_with_prefix(self._get_key(t1)) and not self.trie.has_keys_with_prefix(self._get_key(t2)):
                next_start = start_idx + 2
        # 连续单字合并启发式
        if len(current_tokens) >= 3 and all(len(tok[0]) == 1 for tok in current_tokens[-3:]):
            t_merge = current_tokens[-1][0] + "".join(text[start_idx:start_idx + 1])
            if self.trie.has_keys_with_prefix(self._get_key(t_merge)):
                next_start = start_idx + 2
        # 正常匹配
        for end_idx in range(next_start, len(text) + 1):
            token = "".join(text[start_idx:end_idx])
            token_key = self._get_key(token)
            if end_idx > start_idx + 1 and not self.trie.has_keys_with_prefix(token_key):
                break
            if token_key in self.trie:
                token_info = self.trie[token_key]
                max_reach = max(max_reach, self._dfs(
                    text, end_idx, current_tokens + [(token, token_info)],
                    all_tokenizations, depth + 1, memo_cache
                ))
        if max_reach > start_idx:
            memo_cache[state_key] = max_reach
            return max_reach
        # fallback单字
        single_char = "".join(text[start_idx:start_idx + 1])
        token_info = self.trie.get(self._get_key(single_char), (-12, ''))
        result = self._dfs(text, start_idx + 1, current_tokens + [(single_char, token_info)],
                           all_tokenizations, depth + 1, memo_cache)
        memo_cache[state_key] = result
        return result
    def _sort_tokens(self, tokenizations):
        """
        对多个分词方案进行打分排序,得分高的优先返回。
        """
        scored_results = []
        for token_seq in tokenizations:
            tokens, score = self.score_(token_seq)
            scored_results.append((tokens, score))
        # 按照得分从高到低排序
        return sorted(scored_results, key=lambda x: x[1], reverse=True)
    def freq(self, tk):
        k = self._get_key(tk)
        if k not in self.trie:
            return 0
        return int(math.exp(self.trie[k][0]) * self.FREQUENCY_DENOMINATOR + 0.5)
    def _merge(self, token_str):
        """
        合并分词结果:如果连续token拼接后是高频词,则优先合并。
        """
        merged_tokens = []
        tokens = re.sub(r"\s+", " ", token_str).strip().split()
        start_idx = 0
        while start_idx < len(tokens):
            end_idx = start_idx + 1
            # 尝试扩展窗口到 [start_idx:end_idx)
            for i in range(start_idx + 2, min(len(tokens) + 1, start_idx + 5)):
                candidate = "".join(tokens[start_idx:i])
                if re.search(self.SPLIT_CHAR, candidate) and self.freq(candidate):
                    end_idx = i
            merged_tokens.append("".join(tokens[start_idx:end_idx]))
            start_idx = end_idx
        # return " ".join(merged_tokens)
        return merged_tokens
    def tokenize(self, text: str) -> list[str]:
        """
        对输入的文本进行分词,返回分词结果列表。
        主要流程:
        1. 预处理文本。
        2. 按中英文/数字块进行切分。
        3. 对中文块应用智能分词算法,对其他块进行简单处理。
        4. 合并结果。
        """
        if not text:
            return []
        processed_text = self._preprocess(text)
        # 根据语言分割文本
        lang_segments = self._split_by_lang(processed_text)
        tokens = []
        for segment, is_chinese in lang_segments:
            if not is_chinese:
                # 英文分词 + 词形还原 + stemming
                english_tokens = word_tokenize(segment)
                processed = [self.stemmer.stem(self.lemmatizer.lemmatize(tok)) for tok in english_tokens]
                tokens.extend(processed)
                continue
            # 中文处理
            if len(segment) < 2 or re.match(r"[a-z\.-]+$", segment) or re.match(r"[0-9\.-]+$", segment):
                tokens.append(segment)
                continue
            # 中文用正向和反向分词
            forward_tokens, forward_score = self.max_forward(segment)
            backward_tokens, backward_score = self._max_backward(segment)
            if self.DEBUG:
                logging.debug("[FW] {} {}".format(forward_tokens, forward_score))
                logging.debug("[BW] {} {}".format(backward_tokens, backward_score))
            # 对 forward 和 backward 结果进行对齐
            f_idx, b_idx, f_base, b_base = 0, 0, 0, 0
            common_prefix_len = 0
            while (f_idx + common_prefix_len < len(forward_tokens) and
                   b_idx + common_prefix_len < len(backward_tokens) and
                   forward_tokens[f_idx + common_prefix_len] == backward_tokens[b_idx + common_prefix_len]):
                common_prefix_len += 1
            if common_prefix_len > 0:
                tokens.append(" ".join(forward_tokens[f_idx: f_idx + common_prefix_len]))
            f_base = f_idx + common_prefix_len
            b_base = b_idx + common_prefix_len
            f_idx = f_base + 1
            b_idx = b_base + 1
            # 对剩余的部分进行对齐处理
            while b_idx < len(backward_tokens) and f_idx < len(forward_tokens):
                b_sub = "".join(backward_tokens[b_base:b_idx])
                f_sub = "".join(forward_tokens[f_base:f_idx])
                if b_sub != f_sub:
                    # 谁短谁继续扩展
                    if len(b_sub) > len(f_sub):
                        f_idx += 1
                    else:
                        b_idx += 1
                    continue
                # 如果子串相等,但单词粒度不同
                if backward_tokens[b_idx] != forward_tokens[f_idx]:
                    b_idx += 1
                    f_idx += 1
                    continue
                # 对不同分词组合进行 DFS 搜索
                dfs_candidates = []
                self._dfs("".join(forward_tokens[f_base:f_idx]), 0, [], dfs_candidates)
                best_sequence = self._sort_tokens(dfs_candidates)[0][0]
                tokens.append(" ".join(best_sequence))
                # 找相同的前缀部分
                same_len = 1
                while (b_idx + same_len < len(backward_tokens) and
                       f_idx + same_len < len(forward_tokens) and
                       backward_tokens[b_idx + same_len] == forward_tokens[f_idx + same_len]):
                    same_len += 1
                tokens.append(" ".join(forward_tokens[f_idx: f_idx + same_len]))
                b_base = b_idx + same_len
                f_base = f_idx + same_len
                b_idx = b_base + 1
                f_idx = f_base + 1
            # 收尾:剩余的尾部一致
            if b_base < len(backward_tokens):
                assert f_base < len(forward_tokens)
                assert "".join(backward_tokens[b_base:]) == "".join(forward_tokens[f_base:])
                dfs_candidates = []
                self._dfs("".join(forward_tokens[f_base:]), 0, [], dfs_candidates)
                best_sequence = self._sort_tokens(dfs_candidates)[0][0]
                tokens.append(" ".join(best_sequence))
        # 合并、去除多余空格
        # return tokens
        merged_tokens = " ".join(tokens)
        # logging.debug("[TKS] {}".format(self.merge_(merged_tokens)))
        return self._merge(merged_tokens)

#### A.2 使用示例
if __name__ == '__main__':
    # --- 准备环境 ---
    # 1. 创建一个临时目录和词典文件用于演示
    data_dir = "temp_dict_data"
    os.makedirs(data_dir, exist_ok=True)
    dict_file_path = os.path.join(data_dir, "main_dict.txt")
    # 2. 写入主词典内容
    main_dict_content = """
北京 100000 n
大学 80000 n
北京大学 5000 n_org
大学城 3000 n_loc
智能 90000 adj
运维 60000 n
智能运维 6000 n_tech
解决方案 2000 n_prod
平台 70000 n
蓝鲸 100 n_brand
"""
    with open(dict_file_path, "w", encoding="utf-8") as f:
        f.write(main_dict_content.strip())
    # --- 开始使用分词器 ---
    print("=" * 20 + " 1. 初始化分词器 " + "=" * 20)
    # 首次初始化会从txt文件构建,并生成.trie缓存
    tokenizer = CustomTokenizer(dict_path=dict_file_path)
    print("\n" + "=" * 20 + " 2. 测试分词(歧义句) " + "=" * 20)
    ambiguous_text = "北京大学城"
    tokens = tokenizer.tokenize(ambiguous_text)
    print(f"输入: '{ambiguous_text}'")
    print(f"分词结果: {tokens}")  # 期望: ['北京', '大学城'] 或 ['北京大学', '城'] 取决于词频和评分模型
    print("\n" + "=" * 20 + " 3. 测试中英混合文本 " + "=" * 20)
    mixed_text = "我司的蓝鲸智云平台是领先的AIOps智能运维解决方案"
    tokens = tokenizer.tokenize(mixed_text)
    print(f"输入: '{mixed_text}'")
    print(f"分词结果: {tokens}")  # 期望: ['我司的', '蓝鲸', '智云', '平台', '是', '领先', '的', 'aiops', '智能运维', '解决方案']
    # --- 动态添加用户词典 ---
    print("\n" + "=" * 20 + " 4. 动态添加用户词典 " + "=" * 20)
    user_dict_path = os.path.join(data_dir, "user_dict.txt")
    user_dict_content = """
蓝鲸智云 9999 n_brand
AIOps 9999 n_tech
我司 100 phrase
"""
    with open(user_dict_path, "w", encoding="utf-8") as f:
        f.write(user_dict_content.strip())
    tokenizer.add_dictionary(user_dict_path)
    print("用户词典已添加。")
    print("\n" + "=" * 20 + " 5. 再次测试中英混合文本 " + "=" * 20)
    tokens_after_add = tokenizer.tokenize(mixed_text)
    print(f"输入: '{mixed_text}'")
    print(
        f"分词结果 (添加词典后): {tokens_after_add}")  # 期望: ['我司的', '蓝鲸智云', '平台', '是', '领先', '的', 'aiops', '智能运维', '解决方案']
    # --- 清理临时文件 ---
    import shutil
    shutil.rmtree(data_dir)
    print(f"\n演示完成,已清理临时目录: {data_dir}")


网站公告

今日签到

点亮在社区的每一天
去签到