从音频到Token:构建原神角色语音识别模型的完整实践
本文将带你从零实现一个基于音频Token化的角色语音识别系统,完整复现原神角色语音分类任务,包含数据处理、模型训练和推理全流程。
音频波形通过滑动窗口转换为数值Token序列的过程
一、为什么需要音频Token化?
传统音频处理通常依赖MFCC、频谱图等特征,但这些方法:
- 丢失原始波形细节
- 难以与现代Transformer架构无缝集成
- 特征工程复杂度高
创新思路:将原始音频波形直接转换为离散Token序列,使音频能像文本一样被语言模型处理。这种方法:
- 保留完整波形信息
- 兼容预训练语言模型架构
- 简化特征工程流程
二、核心数据处理:wav_to_token函数详解
def wav_to_token(path):# 1. 音频标准化:零均值单位方差sr, audio = wavfile.read(path)left = (audio - np.mean(audio)) / np.std(audio)# 2. 生成参考信号(关键创新点)right = np.linspace(0.1, sr*np.pi, left.size)# 3. 滑动窗口特征提取sim_list = []slope_list = []for i in range(0, len(left) - 400, 400):x, y = left[i:i + 1200], right[i:i + 1200]# 确保窗口长度一致if len(x) > len(y): x = x[:len(y)]# 线性回归拟合波形趋势slope, intercept = linear_regression(x, y)# 计算拟合质量(余弦相似度)sim_list.append(cosine_similarity(slope * x + intercept, y))slope_list.append(slope)# 4. Token量化sim_list = min_max_normalize(sim_list) * 2**6 # 64级量化slope_list = min_max_normalize(slope_list) * 2**7 # 128级量化sim_list = sim_list.astype(np.int16)slope_list = slope_list.astype(np.int16)# 5. 生成最终Tokenres = sim_list * slope_list # 特征融合return res[res != 0] # 去除零值
关键设计解析:
-
双特征融合机制:
sim_list
:衡量原始波形与线性趋势的匹配度(0-63)slope_list
:表征波形局部斜率变化(0-127)- 通过乘法融合创建64×128=8192种独特Token
-
参考信号设计:
right = np.linspace(0.1, sr*np.pi, left.size)
生成线性增长的参考信号,使回归斜率能反映波形变化速率,避免绝对数值干扰
-
滑动窗口参数:
- 窗口大小:1200样本(400步长)
- 采样率影响:44.1kHz下窗口≈27ms(语音处理常用帧长)
可视化Token生成过程
# 在wav_to_token函数中添加
plt.figure(figsize=(12, 8))
plt.subplot(2,1,1)
plt.plot(left[:2400], 'b', label='原始波形')
plt.plot(right[:2400], 'r--', label='参考信号')
plt.legend()plt.subplot(2,1,2)
plt.plot(sim_list, 'g', label='相似度特征')
plt.plot(slope_list, 'm', label='斜率特征')
plt.legend()
plt.savefig('token_generation.png', dpi=150)
三、词汇表构建:音频Token与文本标记的融合
# 特殊标记
voc = ["<|pad|>", "<|im_start|>", "<|im_end|>", "<|wav|>"]# 添加角色名称(从目录名提取)
voc += [i.split("\\")[-1] for i in dirs] # 如"胡桃", "钟离"# 添加音频Token空间(0-8197)
voc += [str(i) for i in range(8198)]# 创建ID映射
voc_x2id = {v: i for i, v in enumerate(voc)}
voc_id2x = {i: v for i, v in enumerate(voc)}
词汇表示例:
Token类型 | 示例 | ID | 用途 |
---|---|---|---|
特殊标记 | `< | wav | >` |
角色名称 | 胡桃 | 12 | 分类目标 |
音频Token | 1245 | 20 | 波形特征 |
四、数据集构建:序列化音频样本
# 样本结构:[起始标记] + [音频Token序列] + [角色ID] + [结束标记]
data_set = []
for path in paths:tokens = wav_to_token(path)token_idx = [voc_x2id[str(i)] for i in tokens]role_id = voc_x2id[path.split("\\")[-2]] # 从路径提取角色名# 完整序列sample = [voc_x2id["<|im_start|>"]] + token_idx + [role_id] + [voc_x2id["<|im_end|>"]]data_set.append(sample)
序列化示例:
[3, 1245, 78, 309, ..., 8192, 12, 4] ↑ ↑ ↑ ↑| | | |
起始 音频特征 胡桃 结束
未来展望:
- 扩展到语音识别任务(添加字符级Token)
- 结合对比学习提升特征表示
- 部署到移动端实现实时角色识别
技术启示:当我们将音频视为“可读的文本”时,语音处理就变成了自然语言处理——这是通往统一多模态模型的重要一步。
附录:关键函数完整实现
import numpy as np
import pandas as pd
from tqdm import tqdmfrom shua2 import wav_to_token
from glob import glob
from model3 import SamOut
# from system_model import SamOut
import torch
from torch import nn, optim
import timetorch.manual_seed(42)
np.random.seed(42)
dirs=glob("C:/Users/dfy918/Downloads/yuanshen_zip/yuanshen_zip/*")paths = np.hstack([glob(dir+"/wav/*.wav") for dir in dirs])
voc = set([str(i) for i in range(8198)])# voc = ["<|pad|>", "<|im_start|>", "<|im_end|>", "<|wav|>", "<|cat|>", "<|cow|>", "<|dog|>", "<|pig|>"]+[i.split("\\")[-1] for i in dirs] + list(voc)
voc = ["<|pad|>","<|im_start|>","<|im_end|>", "<|wav|>"]+[i.split("\\")[-1] for i in dirs] + list(voc)
voc_x2id = {v: i for i, v in enumerate(voc)}
voc_id2x = {i: v for i, v in enumerate(voc)}
voc_size = len(voc)
# data_set = pd.read_pickle("wav.pkl")
data_set = []
for path in tqdm(paths):tokens = wav_to_token(path)token_idx = []for i in tokens.astype("str").tolist():token_idx.append(voc_x2id[i])data_set.append([1] + token_idx + [voc_x2id[path.split("\\")[-2].split("/")[0]]]+[2])
pd.to_pickle(data_set, "wav.pkl")
np.random.shuffle(data_set)train_data_set = data_set[:int(len(data_set) * 0.8)]
val_data_set = data_set[int(len(data_set) * 0.8):]
num_layers = 2
hidden_size = 2 ** 6 * num_layers
num_heads = num_layers
learning_rate = 0.001
batch_size = 32
num_epochs = 1000model = SamOut(voc_size=voc_size, hidden_size=hidden_size, num_heads=num_heads, num_layers=num_layers)
params = 0
for i in model.parameters():if i.shape != torch.Size([]):params += i.numel()
print(f"Total parameters: {params}")criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)start_time = time.time()
for epoch in range(num_epochs):# 训练阶段np.random.shuffle(train_data_set)model.train()bar=tqdm(range(0, len(train_data_set), batch_size))for i in bar:batch_data = train_data_set[i:i + batch_size]# 填充批次使所有序列长度相同max_len = max(len(seq) for seq in batch_data)padded_batch = []for seq in batch_data:padded_seq = seq + [0] * (max_len - len(seq)) # 使用0(<|pad|>)进行填充padded_batch.append(padded_seq)data = torch.tensor(padded_batch, dtype=torch.long)input_tensor = data[:, :-1]target_tensor = data[:, 1:]output, _ = model(input_tensor)output = output.reshape(-1, voc_size)target_tensor = target_tensor.reshape(-1)loss = criterion(output, target_tensor)optimizer.zero_grad()loss.backward()optimizer.step()bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')# 验证阶段model.eval()val_loss = 0correct = []total = 0with torch.no_grad():for i in range(0, len(val_data_set), batch_size):batch_data = val_data_set[i:i + batch_size]max_len = max(len(seq) for seq in batch_data)padded_batch = []for seq in batch_data:padded_seq = seq + [0] * (max_len - len(seq)) # 使用0(<|pad|>)进行填充padded_batch.append(padded_seq)data = torch.tensor(padded_batch, dtype=torch.long)input_tensor = data[:, :-1]target_tensor = data[:, 1:]output, _ = model(input_tensor)acc=np.mean((torch.argmax(output,-1)==target_tensor).numpy())correct.append(acc)output = output.reshape(-1, voc_size)target_tensor_flat = target_tensor.reshape(-1)# 计算验证损失val_loss += criterion(output, target_tensor_flat).item()# 计算准确率 - 只关注倒数第二个token(类别标记)# 获取每个序列的倒数第二个位置avg_val_loss = val_loss / (len(val_data_set) / batch_size)acc = np.mean(correct)print(f'Epoch [{epoch + 1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}, Val Acc: {acc:.4f}, Time: {time.time() - start_time:.2f}s')print("Training complete.")
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile# 中文显示支持
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = Falsedef cosine_similarity(a, b):"""计算余弦相似度"""return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))def linear_regression(x, y):"""最小二乘法线性回归"""n = len(x)sum_x, sum_y = np.sum(x), np.sum(y)sum_xy = np.sum(x * y)sum_x2 = np.sum(x ** 2)slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)intercept = (sum_y - slope * sum_x) / nreturn slope, interceptdef min_max_normalize(data):min_val = np.min(data)max_val = np.max(data)return (data - min_val) / (max_val - min_val + 10 ** -8)def wav_to_token(path):# 数据读取sr, audio = wavfile.read(path)left = (audio - np.mean(audio)) / np.std(audio)right = np.linspace(0.1,sr*np.pi,left.size)# 滑动窗口分析sim_list = []slope_list = []for i in range(0, len(left) - 400, 400):x, y = left[i:i + 1200], right[i:i + 1200]if len(x) > len(y): x = x[:len(y)]slope, intercept = linear_regression(x, y)sim_list.append(cosine_similarity(slope * x + intercept, y))slope_list.append(slope)# 可视化sim_list = np.array(sim_list)slope_list = np.array(slope_list)# token 化sim_list = min_max_normalize(sim_list) * 2 ** 6slope_list = min_max_normalize(slope_list) * 2 ** 7sim_list = sim_list.astype(np.int16)slope_list = slope_list.astype(np.int16)res = sim_list * slope_listreturn res[res != 0]# 归一化 的最大值最小值应该是 全局考虑而不是 一段考虑 最好是精度极限