本文将带你从零实现一个基于音频Token化的角色语音识别系统,完整复现原神角色语音分类任务,包含数据处理、模型训练和推理全流程。
音频波形通过滑动窗口转换为数值Token序列的过程
一、为什么需要音频Token化?
传统音频处理通常依赖MFCC、频谱图等特征,但这些方法:
- 丢失原始波形细节
- 难以与现代Transformer架构无缝集成
- 特征工程复杂度高
创新思路:将原始音频波形直接转换为离散Token序列,使音频能像文本一样被语言模型处理。这种方法:
- 保留完整波形信息
- 兼容预训练语言模型架构
- 简化特征工程流程
二、核心数据处理:wav_to_token函数详解
def wav_to_token(path):
# 1. 音频标准化:零均值单位方差
sr, audio = wavfile.read(path)
left = (audio - np.mean(audio)) / np.std(audio)
# 2. 生成参考信号(关键创新点)
right = np.linspace(0.1, sr*np.pi, left.size)
# 3. 滑动窗口特征提取
sim_list = []
slope_list = []
for i in range(0, len(left) - 400, 400):
x, y = left[i:i + 1200], right[i:i + 1200]
# 确保窗口长度一致
if len(x) > len(y): x = x[:len(y)]
# 线性回归拟合波形趋势
slope, intercept = linear_regression(x, y)
# 计算拟合质量(余弦相似度)
sim_list.append(cosine_similarity(slope * x + intercept, y))
slope_list.append(slope)
# 4. Token量化
sim_list = min_max_normalize(sim_list) * 2**6 # 64级量化
slope_list = min_max_normalize(slope_list) * 2**7 # 128级量化
sim_list = sim_list.astype(np.int16)
slope_list = slope_list.astype(np.int16)
# 5. 生成最终Token
res = sim_list * slope_list # 特征融合
return res[res != 0] # 去除零值
关键设计解析:
双特征融合机制:
sim_list
:衡量原始波形与线性趋势的匹配度(0-63)slope_list
:表征波形局部斜率变化(0-127)- 通过乘法融合创建64×128=8192种独特Token
参考信号设计:
right = np.linspace(0.1, sr*np.pi, left.size)
生成线性增长的参考信号,使回归斜率能反映波形变化速率,避免绝对数值干扰
滑动窗口参数:
- 窗口大小:1200样本(400步长)
- 采样率影响:44.1kHz下窗口≈27ms(语音处理常用帧长)
可视化Token生成过程
# 在wav_to_token函数中添加
plt.figure(figsize=(12, 8))
plt.subplot(2,1,1)
plt.plot(left[:2400], 'b', label='原始波形')
plt.plot(right[:2400], 'r--', label='参考信号')
plt.legend()
plt.subplot(2,1,2)
plt.plot(sim_list, 'g', label='相似度特征')
plt.plot(slope_list, 'm', label='斜率特征')
plt.legend()
plt.savefig('token_generation.png', dpi=150)
三、词汇表构建:音频Token与文本标记的融合
# 特殊标记
voc = ["<|pad|>", "<|im_start|>", "<|im_end|>", "<|wav|>"]
# 添加角色名称(从目录名提取)
voc += [i.split("\\")[-1] for i in dirs] # 如"胡桃", "钟离"
# 添加音频Token空间(0-8197)
voc += [str(i) for i in range(8198)]
# 创建ID映射
voc_x2id = {v: i for i, v in enumerate(voc)}
voc_id2x = {i: v for i, v in enumerate(voc)}
词汇表示例:
Token类型 | 示例 | ID | 用途 |
---|---|---|---|
特殊标记 | `< | wav | >` |
角色名称 | 胡桃 |
12 | 分类目标 |
音频Token | 1245 |
20 | 波形特征 |
四、数据集构建:序列化音频样本
# 样本结构:[起始标记] + [音频Token序列] + [角色ID] + [结束标记]
data_set = []
for path in paths:
tokens = wav_to_token(path)
token_idx = [voc_x2id[str(i)] for i in tokens]
role_id = voc_x2id[path.split("\\")[-2]] # 从路径提取角色名
# 完整序列
sample = [voc_x2id["<|im_start|>"]] + token_idx + [role_id] + [voc_x2id["<|im_end|>"]]
data_set.append(sample)
序列化示例:
[3, 1245, 78, 309, ..., 8192, 12, 4]
↑ ↑ ↑ ↑
| | | |
起始 音频特征 胡桃 结束
未来展望:
- 扩展到语音识别任务(添加字符级Token)
- 结合对比学习提升特征表示
- 部署到移动端实现实时角色识别
技术启示:当我们将音频视为“可读的文本”时,语音处理就变成了自然语言处理——这是通往统一多模态模型的重要一步。
附录:关键函数完整实现
import numpy as np
import pandas as pd
from tqdm import tqdm
from shua2 import wav_to_token
from glob import glob
from model3 import SamOut
# from system_model import SamOut
import torch
from torch import nn, optim
import time
torch.manual_seed(42)
np.random.seed(42)
dirs=glob("C:/Users/dfy918/Downloads/yuanshen_zip/yuanshen_zip/*")
paths = np.hstack([glob(dir+"/wav/*.wav") for dir in dirs])
voc = set([str(i) for i in range(8198)])
# voc = ["<|pad|>", "<|im_start|>", "<|im_end|>", "<|wav|>", "<|cat|>", "<|cow|>", "<|dog|>", "<|pig|>"]+[i.split("\\")[-1] for i in dirs] + list(voc)
voc = ["<|pad|>","<|im_start|>","<|im_end|>", "<|wav|>"]+[i.split("\\")[-1] for i in dirs] + list(voc)
voc_x2id = {v: i for i, v in enumerate(voc)}
voc_id2x = {i: v for i, v in enumerate(voc)}
voc_size = len(voc)
# data_set = pd.read_pickle("wav.pkl")
data_set = []
for path in tqdm(paths):
tokens = wav_to_token(path)
token_idx = []
for i in tokens.astype("str").tolist():
token_idx.append(voc_x2id[i])
data_set.append([1] + token_idx + [voc_x2id[path.split("\\")[-2].split("/")[0]]]+[2])
pd.to_pickle(data_set, "wav.pkl")
np.random.shuffle(data_set)
train_data_set = data_set[:int(len(data_set) * 0.8)]
val_data_set = data_set[int(len(data_set) * 0.8):]
num_layers = 2
hidden_size = 2 ** 6 * num_layers
num_heads = num_layers
learning_rate = 0.001
batch_size = 32
num_epochs = 1000
model = SamOut(voc_size=voc_size, hidden_size=hidden_size, num_heads=num_heads, num_layers=num_layers)
params = 0
for i in model.parameters():
if i.shape != torch.Size([]):
params += i.numel()
print(f"Total parameters: {params}")
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
start_time = time.time()
for epoch in range(num_epochs):
# 训练阶段
np.random.shuffle(train_data_set)
model.train()
bar=tqdm(range(0, len(train_data_set), batch_size))
for i in bar:
batch_data = train_data_set[i:i + batch_size]
# 填充批次使所有序列长度相同
max_len = max(len(seq) for seq in batch_data)
padded_batch = []
for seq in batch_data:
padded_seq = seq + [0] * (max_len - len(seq)) # 使用0(<|pad|>)进行填充
padded_batch.append(padded_seq)
data = torch.tensor(padded_batch, dtype=torch.long)
input_tensor = data[:, :-1]
target_tensor = data[:, 1:]
output, _ = model(input_tensor)
output = output.reshape(-1, voc_size)
target_tensor = target_tensor.reshape(-1)
loss = criterion(output, target_tensor)
optimizer.zero_grad()
loss.backward()
optimizer.step()
bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
# 验证阶段
model.eval()
val_loss = 0
correct = []
total = 0
with torch.no_grad():
for i in range(0, len(val_data_set), batch_size):
batch_data = val_data_set[i:i + batch_size]
max_len = max(len(seq) for seq in batch_data)
padded_batch = []
for seq in batch_data:
padded_seq = seq + [0] * (max_len - len(seq)) # 使用0(<|pad|>)进行填充
padded_batch.append(padded_seq)
data = torch.tensor(padded_batch, dtype=torch.long)
input_tensor = data[:, :-1]
target_tensor = data[:, 1:]
output, _ = model(input_tensor)
acc=np.mean((torch.argmax(output,-1)==target_tensor).numpy())
correct.append(acc)
output = output.reshape(-1, voc_size)
target_tensor_flat = target_tensor.reshape(-1)
# 计算验证损失
val_loss += criterion(output, target_tensor_flat).item()
# 计算准确率 - 只关注倒数第二个token(类别标记)
# 获取每个序列的倒数第二个位置
avg_val_loss = val_loss / (len(val_data_set) / batch_size)
acc = np.mean(correct)
print(
f'Epoch [{epoch + 1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}, Val Acc: {acc:.4f}, Time: {time.time() - start_time:.2f}s')
print("Training complete.")
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
# 中文显示支持
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = False
def cosine_similarity(a, b):
"""计算余弦相似度"""
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def linear_regression(x, y):
"""最小二乘法线性回归"""
n = len(x)
sum_x, sum_y = np.sum(x), np.sum(y)
sum_xy = np.sum(x * y)
sum_x2 = np.sum(x ** 2)
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
intercept = (sum_y - slope * sum_x) / n
return slope, intercept
def min_max_normalize(data):
min_val = np.min(data)
max_val = np.max(data)
return (data - min_val) / (max_val - min_val + 10 ** -8)
def wav_to_token(path):
# 数据读取
sr, audio = wavfile.read(path)
left = (audio - np.mean(audio)) / np.std(audio)
right = np.linspace(0.1,sr*np.pi,left.size)
# 滑动窗口分析
sim_list = []
slope_list = []
for i in range(0, len(left) - 400, 400):
x, y = left[i:i + 1200], right[i:i + 1200]
if len(x) > len(y): x = x[:len(y)]
slope, intercept = linear_regression(x, y)
sim_list.append(cosine_similarity(slope * x + intercept, y))
slope_list.append(slope)
# 可视化
sim_list = np.array(sim_list)
slope_list = np.array(slope_list)
# token 化
sim_list = min_max_normalize(sim_list) * 2 ** 6
slope_list = min_max_normalize(slope_list) * 2 ** 7
sim_list = sim_list.astype(np.int16)
slope_list = slope_list.astype(np.int16)
res = sim_list * slope_list
return res[res != 0]
# 归一化 的最大值最小值应该是 全局考虑而不是 一段考虑 最好是精度极限