One article to explain super-simple offline speech synthesis (TTS) and offline speech recognition deployment
This article covers only two lightweight options:
piper for speech synthesis, vosk for speech recognition.
Both are simple to deploy; the results are passable rather than great.
Speech synthesis
Recommended: piper (I haven't used the others)
Installation
On Linux:
pip install piper-tts
Download the model (63 MB)
For Chinese, download the two files zh_CN-huayan-medium.onnx and zh_CN-huayan-medium.onnx.json and put them in the same directory.
The voice sounds nice, but spaces and punctuation are not handled and there are no pauses; the lack of a better Chinese model is a pity.
If you only target Windows, the built-in Microsoft TTS sounds better than this (see the sketch below).
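For comparison, here is a minimal, unverified sketch of driving the Windows voices from Python through the third-party pyttsx3 package (my addition, not part of the original setup; assumes pip install pyttsx3, which uses SAPI5 on Windows):

import pyttsx3  # assumption: pip install pyttsx3; backed by SAPI5 voices on Windows

engine = pyttsx3.init()
# Write the synthesized speech to a file instead of playing it through the speakers
engine.save_to_file('今年前5个月,我国货物贸易进出口总值17.94万亿元', 'ms_tts.wav')
engine.runAndWait()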
Model download links:
https://hf-mirror.com/rhasspy/piper-voices
https://hf-mirror.com/rhasspy/piper-voices/tree/main/zh/zh_CN/huayan/medium
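For reference, a small download sketch using only the Python standard library; the /resolve/main/... paths below are my assumption, derived from the tree URL above, so verify them against the repository layout before relying on this:

import urllib.request

# Assumed direct-download URLs on the hf-mirror, derived from the tree URL above
BASE = "https://hf-mirror.com/rhasspy/piper-voices/resolve/main/zh/zh_CN/huayan/medium/"
for name in ("zh_CN-huayan-medium.onnx", "zh_CN-huayan-medium.onnx.json"):
    urllib.request.urlretrieve(BASE + name, name)  # save both files into the current directory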
Usage
This writes the audio file 66.wav to the current directory:
echo '今年前5个月,我国货物贸易进出口总值17.94万亿元' | piper --model ./zh_CN-huayan-medium.onnx --output_file 66.wav
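Besides the CLI, the piper-tts package also ships a Python API. The sketch below is unverified and assumes the PiperVoice interface from piper-tts 1.2.x; check it against your installed version:

import wave
from piper.voice import PiperVoice  # assumption: piper-tts 1.2.x Python API

# Assumption: the matching .onnx.json config next to the model is picked up automatically;
# pass its path explicitly if your version requires it.
voice = PiperVoice.load("./zh_CN-huayan-medium.onnx")
with wave.open("66.wav", "wb") as wav_file:
    voice.synthesize("今年前5个月,我国货物贸易进出口总值17.94万亿元", wav_file)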
Integration
The code below has not been verified.
import subprocess
import os
import asyncio
import uuid

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel

app = FastAPI()

# Model path and output directory (use an absolute path for the output)
# MODEL_PATH = './model/zh_CN-huayan-x_low.onnx'
MODEL_PATH = './model/zh_CN-huayan-medium.onnx'
OUTPUT_DIR = os.path.abspath('./output/')

# Make sure the output directory exists
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)


class SynthesizeRequest(BaseModel):
    text: str
    mode: str = 'sync'  # 'sync' or 'async'


# Synchronous TTS generation
def synthesize_text_sync(text: str, output_file: str):
    try:
        # Run piper and feed the text through stdin (avoids shell quoting issues)
        result = subprocess.run(
            ['piper', '--model', MODEL_PATH, '--output_file', output_file],
            input=text.encode('utf-8'),
            check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # Print piper's output (stdout and stderr)
        print(f"piper stdout: {result.stdout.decode()}")
        print(f"piper stderr: {result.stderr.decode()}")
        if not os.path.exists(output_file):
            raise FileNotFoundError(f"Audio file was not generated: {output_file}")
    except subprocess.CalledProcessError as e:
        raise HTTPException(status_code=500, detail=f"Failed to generate speech: {str(e)}")


# Asynchronous TTS generation
async def synthesize_text_async(text: str, output_file: str):
    try:
        # Run piper asynchronously, again feeding the text through stdin
        process = await asyncio.create_subprocess_exec(
            'piper', '--model', MODEL_PATH, '--output_file', output_file,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate(input=text.encode('utf-8'))
        # Print piper's output (stdout and stderr)
        print(f"piper stdout: {stdout.decode()}")
        print(f"piper stderr: {stderr.decode()}")
        if process.returncode != 0 or not os.path.exists(output_file):
            raise FileNotFoundError(f"Audio file was not generated: {output_file}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to generate speech: {str(e)}")


@app.post("/synthesize/")
async def synthesize(request: SynthesizeRequest):
    text = request.text
    if not text:
        raise HTTPException(status_code=400, detail="Text is required")

    # Build the output path with a unique file name to avoid clashes between concurrent requests
    # output_file = os.path.join(OUTPUT_DIR, 'welcome.wav')
    unique_id = str(uuid.uuid4())
    output_file = os.path.join(OUTPUT_DIR, f"{unique_id}.wav")

    # Pick the synchronous or asynchronous variant depending on the requested mode
    if request.mode == 'sync':
        synthesize_text_sync(text, output_file)
    elif request.mode == 'async':
        await synthesize_text_async(text, output_file)
    else:
        raise HTTPException(status_code=400, detail="Invalid mode, must be 'sync' or 'async'")

    # Make sure the file exists before returning it
    if not os.path.exists(output_file):
        raise HTTPException(status_code=500, detail=f"Failed to generate audio file: {output_file}")

    # Return the audio file
    return FileResponse(output_file, media_type='audio/wav')
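To exercise the endpoint, a minimal client sketch using the requests package; the host, port, and module name (app.py, hence uvicorn app:app) are assumptions about how you run the service:

# Start the service first, e.g.:  uvicorn app:app --host 0.0.0.0 --port 8000
# (assumes the code above is saved as app.py)
import requests

resp = requests.post(
    "http://localhost:8000/synthesize/",
    json={"text": "今年前5个月,我国货物贸易进出口总值17.94万亿元", "mode": "sync"},
)
resp.raise_for_status()
with open("out.wav", "wb") as f:
    f.write(resp.content)  # the endpoint returns the wav file directly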
Speech recognition
Use vosk.
1. Install Vosk via NuGet.
2. Download a Chinese model from the official site (only 42 MB): https://alphacephei.com/vosk/models
The 42 MB model's accuracy is fairly mediocre, but at least it is simple to use.
There is also a 1.3 GB model; I haven't tested it, but it should be better.
using System;
using System.IO;
using Vosk;

/// <summary>
/// Speech to text.
/// Download a model from the official site: https://alphacephei.com/vosk/models
/// Unzip the model and put it under the project directory.
/// </summary>
public class VoskDemo
{
    public static void DemoBytes(Model model)
    {
        // Demo byte buffer: basic speech recognition
        VoskRecognizer rec = new VoskRecognizer(model, 16000.0f);
        rec.SetMaxAlternatives(0);
        rec.SetWords(true);
        using(Stream source = File.OpenRead("test.wav")) {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0) {
                if (rec.AcceptWaveform(buffer, bytesRead)) {
                    Console.WriteLine(rec.Result());
                } else {
                    Console.WriteLine(rec.PartialResult());
                }
            }
        }
        Console.WriteLine(rec.FinalResult());
    }

    public static void DemoFloats(Model model)
    {
        // Demo float array: streaming audio
        VoskRecognizer rec = new VoskRecognizer(model, 16000.0f);
        using(Stream source = File.OpenRead("test.wav")) {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0) {
                // Convert 16-bit PCM samples to floats
                float[] fbuffer = new float[bytesRead / 2];
                for (int i = 0, n = 0; i < fbuffer.Length; i++, n += 2) {
                    fbuffer[i] = BitConverter.ToInt16(buffer, n);
                }
                if (rec.AcceptWaveform(fbuffer, fbuffer.Length)) {
                    Console.WriteLine(rec.Result());
                } else {
                    Console.WriteLine(rec.PartialResult());
                }
            }
        }
        Console.WriteLine(rec.FinalResult());
    }

    public static void DemoSpeaker(Model model)
    {
        // Output speakers: speaker identification
        SpkModel spkModel = new SpkModel("model-spk");
        VoskRecognizer rec = new VoskRecognizer(model, 16000.0f);
        rec.SetSpkModel(spkModel);
        using(Stream source = File.OpenRead("test.wav")) {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0) {
                if (rec.AcceptWaveform(buffer, bytesRead)) {
                    Console.WriteLine(rec.Result());
                } else {
                    Console.WriteLine(rec.PartialResult());
                }
            }
        }
        Console.WriteLine(rec.FinalResult());
    }

    public static void Test()
    {
        // You can set to -1 to disable logging messages
        Vosk.Vosk.SetLogLevel(0);
        Model model = new Model("vosk-model-small-cn-0.22"); // model directory
        //DemoBytes(model);
        //DemoFloats(model);
        DemoSpeaker(model);
    }
}
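If you would rather stay in Python to pair with the piper service above, the vosk pip package uses the same model. A minimal sketch, assuming pip install vosk, the same vosk-model-small-cn-0.22 directory, and a 16 kHz 16-bit mono PCM test.wav:

import wave
import json
from vosk import Model, KaldiRecognizer  # assumption: pip install vosk

model = Model("vosk-model-small-cn-0.22")   # model directory, as in the C# demo
wf = wave.open("test.wav", "rb")            # expects 16 kHz, 16-bit, mono PCM
rec = KaldiRecognizer(model, wf.getframerate())

while True:
    data = wf.readframes(4000)
    if len(data) == 0:
        break
    if rec.AcceptWaveform(data):
        print(json.loads(rec.Result()).get("text", ""))    # finalized segment
print(json.loads(rec.FinalResult()).get("text", ""))       # whatever audio remains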