一、利用创建好的对象来调用音频服务
上周从上图的getaudiocode()方法进去感受了一下底层小智的构造如何实现。所以用一个codec来接收我们所构造的音频对象。
下来是用构造好的音频对象来调用音频初始化服务Initialize,因为启动函数Application函数的类中有audio_servicez_所以可以进行调用。
这段初始化代码的核心作用是:
1绑定并启动音频编解码器
2配置音频数据流的格式和处理流程
3按需初始化音频处理器和唤醒词检测模块
4设置好各类回调,保证音频事件能及时通知到主程序
5创建定时器,自动管理音频硬件电源
void AudioService::Initialize(AudioCodec* codec) {
// 保存传入的音频编解码器指针
codec_ = codec;
// 启动音频编解码器,准备采集和播放
codec_->Start();
/* 初始化 Opus 解码器和编码器 */
// 创建 Opus 解码器,采样率与输出一致,单声道,帧长为 OPUS_FRAME_DURATION_MS
opus_decoder_ = std::make_unique<OpusDecoderWrapper>(codec->output_sample_rate(), 1, OPUS_FRAME_DURATION_MS);
// 创建 Opus 编码器,采样率固定为 16kHz,单声道,帧长为 OPUS_FRAME_DURATION_MS
opus_encoder_ = std::make_unique<OpusEncoderWrapper>(16000, 1, OPUS_FRAME_DURATION_MS);
// 设置编码复杂度为最低,节省算力
opus_encoder_->SetComplexity(0);
// 如果输入采样率不是 16kHz,则配置重采样器,将输入音频转换为 16kHz
if (codec->input_sample_rate() != 16000) {
input_resampler_.Configure(codec->input_sample_rate(), 16000);
reference_resampler_.Configure(codec->input_sample_rate(), 16000);
}
// 根据编译配置选择不同的音频处理器(如带有回声消除的AFE,或无处理的空实现)
#if CONFIG_USE_AUDIO_PROCESSOR
audio_processor_ = std::make_unique<AfeAudioProcessor>();
#else
audio_processor_ = std::make_unique<NoAudioProcessor>();
#endif
// 根据编译配置选择不同的唤醒词检测算法
#if CONFIG_USE_AFE_WAKE_WORD
wake_word_ = std::make_unique<AfeWakeWord>();
#elif CONFIG_USE_ESP_WAKE_WORD
wake_word_ = std::make_unique<EspWakeWord>();
#elif CONFIG_USE_CUSTOM_WAKE_WORD
wake_word_ = std::make_unique<CustomWakeWord>();
#else
wake_word_ = nullptr;
#endif
// 设置音频处理器的输出回调,当有处理好的音频输出时,推入编码队列
audio_processor_->OnOutput([this](std::vector<int16_t>&& data) {
PushTaskToEncodeQueue(kAudioTaskTypeEncodeToSendQueue, std::move(data));
});
// 设置语音活动检测(VAD)回调,检测到说话状态变化时,更新状态并通知外部
audio_processor_->OnVadStateChange([this](bool speaking) {
voice_detected_ = speaking;
if (callbacks_.on_vad_change) {
callbacks_.on_vad_change(speaking);
}
});
// 如果启用了唤醒词检测,设置唤醒词检测回调,检测到唤醒词时通知外部
if (wake_word_) {
wake_word_->OnWakeWordDetected([this](const std::string& wake_word) {
if (callbacks_.on_wake_word_detected) {
callbacks_.on_wake_word_detected(wake_word);
}
});
}
// 创建音频电源管理定时器,定期检查音频输入/输出是否需要关闭以省电
esp_timer_create_args_t audio_power_timer_args = {
.callback = [](void* arg) {
AudioService* audio_service = (AudioService*)arg;
audio_service->CheckAndUpdateAudioPowerState();
},
.arg = this,
.dispatch_method = ESP_TIMER_TASK,
.name = "audio_power_timer",
.skip_unhandled_events = true,
};
esp_timer_create(&audio_power_timer_args, &audio_power_timer_);
}
二、启动音频服务
经过上部分的初始化,配置好了音频的编解码器,以及处理时对于音频的要求(不符合要求的要重新采样为符合要求的格式),还包括唤醒词的检测、提取和回调。
启动流程(Start)
1标记服务未停止
service_stopped_ = false;
让各任务知道服务正在运行。
2清除音频相关事件位
xEventGroupClearBits(...)
确保音频输入、唤醒词、音频处理等任务可以正常启动。
3启动音频电源管理定时器
esp_timer_start_periodic(...)
每秒检查一次音频硬件的电源状态,自动省电。
4启动音频输入任务
xTaskCreatePinnedToCore 或 xTaskCreate
创建音频采集任务,负责从麦克风采集音频数据。
5启动音频输出任务
xTaskCreate
创建音频播放任务,负责将音频数据输出到扬声器。
6启动 Opus 编解码任务
xTaskCreate
创建音频编解码任务,负责音频数据的编码(发送)和解码(播放)。
void AudioService::Start() {
// 标记服务未停止
service_stopped_ = false;
// 清除音频相关的事件位,确保任务可以正常启动
xEventGroupClearBits(event_group_, AS_EVENT_AUDIO_TESTING_RUNNING | AS_EVENT_WAKE_WORD_RUNNING | AS_EVENT_AUDIO_PROCESSOR_RUNNING);
// 启动音频电源管理定时器,每秒检查一次音频硬件电源状态
esp_timer_start_periodic(audio_power_timer_, 1000000);
/* 启动音频输入任务 */
#if CONFIG_USE_AUDIO_PROCESSOR
// 如果使用音频处理器,任务绑定到指定内核
xTaskCreatePinnedToCore([](void* arg) {
AudioService* audio_service = (AudioService*)arg;
audio_service->AudioInputTask();
vTaskDelete(NULL);
}, "audio_input", 2048 * 3, this, 8, &audio_input_task_handle_, 1);
#else
// 不使用音频处理器,普通方式创建任务
xTaskCreate([](void* arg) {
AudioService* audio_service = (AudioService*)arg;
audio_service->AudioInputTask();
vTaskDelete(NULL);
}, "audio_input", 2048 * 3, this, 8, &audio_input_task_handle_);
#endif
/* 启动音频输出任务 */
xTaskCreate([](void* arg) {
AudioService* audio_service = (AudioService*)arg;
audio_service->AudioOutputTask();
vTaskDelete(NULL);
}, "audio_output", 4096, this, 3, &audio_output_task_handle_);
/* 启动 Opus 编解码任务 */
xTaskCreate([](void* arg) {
AudioService* audio_service = (AudioService*)arg;
audio_service->OpusCodecTask();
vTaskDelete(NULL);
}, "opus_codec", 4096 * 7, this, 2, &opus_codec_task_handle_);
}
三、音频回调服务
下来回到Application函数内,下一步执行下图这一模块:
首先定义一个callbacks对象,他的类型如下:
AudioServiceCallbacks 是一个回调函数集合,用于让外部(比如主应用 Application)能够“订阅”音频服务(AudioService)中的各种事件。当音频服务内部发生特定事件时,会自动调用这些回调,通知外部进行相应处理。
struct AudioServiceCallbacks { std::function<void(void)> on_send_queue_available; std::function<void(const std::string&)> on_wake_word_detected; std::function<void(bool)> on_vad_change; std::function<void(void)> on_audio_testing_queue_full; };
让主程序通过事件组机制,能够及时响应音频服务中的关键事件,实现音频事件的异步通知和处理。
每个成员的含义
- on_send_queue_available
类型:std::function<void(void)>
说明:当音频发送队列有可用数据时触发。比如可以通知主程序“可以发送音频数据到服务器了”。
- on_wake_word_detected
类型:std::function<void(const std::string&)>
说明:当检测到唤醒词(如“小智”)时触发。参数是检测到的唤醒词内容。
- on_vad_change
类型:std::function<void(bool)>
说明:当语音活动检测(VAD)状态发生变化时触发。参数 bool 表示当前是否有人在说话(true=正在说话,false=静音)。
- on_audio_testing_queue_full
类型:std::function<void(void)>
说明:当音频测试队列已满时触发。一般用于调试或测试场景。
异步和函数回调的区别?
方面 | 异步执行 | 自动回调 |
---|---|---|
是否并发 | 是,任务后台运行 | 不一定,回调是响应机制 |
主体是谁 | 程序发起的异步任务 | 异步任务完成后执行的函数 |
控制权 | 主程序不阻塞,控制权立即返回 | 控制权在回调被触发时才回到你手里 |
是否依赖异步 | 异步通常搭配回调使用 | 回调常用在异步任务,但也可用于同步场景 |
举个例子 | setTimeout() 不会阻塞主线程 |
setTimeout(fn, 1000) 中的 fn 是回调 |
四、音频服务具体功能
分别了解下列三个核心任务函数:
- AudioInputTask():音频采集
- AudioOutputTask():音频播放
- OpusCodecTask():音频编解码
// 音频输入任务,运行在一个 FreeRTOS 任务中
void AudioService::AudioInputTask() {
while (true) {
// 等待音频相关事件触发:测试模式、唤醒词检测、通用音频处理
EventBits_t bits = xEventGroupWaitBits(
event_group_,
AS_EVENT_AUDIO_TESTING_RUNNING |
AS_EVENT_WAKE_WORD_RUNNING |
AS_EVENT_AUDIO_PROCESSOR_RUNNING,
pdFALSE, // 不清除标志位
pdFALSE, // 任意一个事件即可返回
portMAX_DELAY // 无限等待
);
// 如果服务已经停止,则退出任务
if (service_stopped_) {
break;
}
// 若麦克风需要预热,延迟一段时间后继续下一轮循环
if (audio_input_need_warmup_) {
audio_input_need_warmup_ = false;
vTaskDelay(pdMS_TO_TICKS(120)); // 延迟 120ms
continue;
}
/** ==========================
* 音频测试处理逻辑(如按下 BOOT 录音)
* ========================== */
if (bits & AS_EVENT_AUDIO_TESTING_RUNNING) {
// 判断测试队列是否已满(按最大时长判断)
if (audio_testing_queue_.size() >= AUDIO_TESTING_MAX_DURATION_MS / OPUS_FRAME_DURATION_MS) {
ESP_LOGW(TAG, "Audio testing queue is full, stopping audio testing");
EnableAudioTesting(false); // 自动关闭测试
continue;
}
// 准备读取一帧音频数据(例如 20ms × 16000Hz)
std::vector<int16_t> data;
int samples = OPUS_FRAME_DURATION_MS * 16000 / 1000;
// 如果成功读取音频数据
if (ReadAudioData(data, 16000, samples)) {
// 若为双声道,仅保留左声道数据(变为单声道)
if (codec_->input_channels() == 2) {
auto mono_data = std::vector<int16_t>(data.size() / 2);
for (size_t i = 0, j = 0; i < mono_data.size(); ++i, j += 2) {
mono_data[i] = data[j];
}
data = std::move(mono_data);
}
// 推送数据到测试编码队列
PushTaskToEncodeQueue(kAudioTaskTypeEncodeToTestingQueue, std::move(data));
continue; // 当前处理完毕,回到等待下一次事件
}
}
/** ==========================
* 唤醒词检测处理逻辑
* ========================== */
if (bits & AS_EVENT_WAKE_WORD_RUNNING) {
std::vector<int16_t> data;
int samples = wake_word_->GetFeedSize(); // 获取所需帧长度
// 若帧长度有效且成功读取数据
if (samples > 0 && ReadAudioData(data, 16000, samples)) {
wake_word_->Feed(data); // 投喂唤醒词检测器
continue;
}
}
/** ==========================
* 通用音频处理逻辑
* ========================== */
if (bits & AS_EVENT_AUDIO_PROCESSOR_RUNNING) {
std::vector<int16_t> data;
int samples = audio_processor_->GetFeedSize(); // 获取处理器需要的数据大小
// 若帧有效且数据读取成功
if (samples > 0 && ReadAudioData(data, 16000, samples)) {
audio_processor_->Feed(std::move(data)); // 投喂音频处理器
continue;
}
}
// 如果没有任何已知事件被处理到,这通常是逻辑错误
ESP_LOGE(TAG, "Should not be here, bits: %lx", bits);
break; // 退出任务
}
// 最后,任务退出时打印警告日志
ESP_LOGW(TAG, "Audio input task stopped");
}
void AudioService::AudioInputTask() {
while (true) {
EventBits_t bits = xEventGroupWaitBits(event_group_, AS_EVENT_AUDIO_TESTING_RUNNING |
AS_EVENT_WAKE_WORD_RUNNING | AS_EVENT_AUDIO_PROCESSOR_RUNNING,
pdFALSE, pdFALSE, portMAX_DELAY);
if (service_stopped_) {
break;
}
if (audio_input_need_warmup_) {
audio_input_need_warmup_ = false;
vTaskDelay(pdMS_TO_TICKS(120));
continue;
}
/* Used for audio testing in NetworkConfiguring mode by clicking the BOOT button */
if (bits & AS_EVENT_AUDIO_TESTING_RUNNING) {
if (audio_testing_queue_.size() >= AUDIO_TESTING_MAX_DURATION_MS / OPUS_FRAME_DURATION_MS) {
ESP_LOGW(TAG, "Audio testing queue is full, stopping audio testing");
EnableAudioTesting(false);
continue;
}
std::vector<int16_t> data;
int samples = OPUS_FRAME_DURATION_MS * 16000 / 1000;
if (ReadAudioData(data, 16000, samples)) {
// If input channels is 2, we need to fetch the left channel data
if (codec_->input_channels() == 2) {
auto mono_data = std::vector<int16_t>(data.size() / 2);
for (size_t i = 0, j = 0; i < mono_data.size(); ++i, j += 2) {
mono_data[i] = data[j];
}
data = std::move(mono_data);
}
PushTaskToEncodeQueue(kAudioTaskTypeEncodeToTestingQueue, std::move(data));
continue;
}
}
/* Feed the wake word */
if (bits & AS_EVENT_WAKE_WORD_RUNNING) {
std::vector<int16_t> data;
int samples = wake_word_->GetFeedSize();
if (samples > 0) {
if (ReadAudioData(data, 16000, samples)) {
wake_word_->Feed(data);
continue;
}
}
}
/* Feed the audio processor */
if (bits & AS_EVENT_AUDIO_PROCESSOR_RUNNING) {
std::vector<int16_t> data;
int samples = audio_processor_->GetFeedSize();
if (samples > 0) {
if (ReadAudioData(data, 16000, samples)) {
audio_processor_->Feed(std::move(data));
continue;
}
}
}
ESP_LOGE(TAG, "Should not be here, bits: %lx", bits);
break;
}
ESP_LOGW(TAG, "Audio input task stopped");
}
void AudioService::AudioOutputTask() {
while (true) {
// 加锁等待播放队列非空或服务停止信号
std::unique_lock<std::mutex> lock(audio_queue_mutex_);
// 如果队列为空且服务未停止,则阻塞等待条件变量触发
audio_queue_cv_.wait(lock, [this]() {
return !audio_playback_queue_.empty() || service_stopped_;
});
// 如果检测到服务已经停止,则退出任务
if (service_stopped_) {
break;
}
// 从播放队列取出一个音频任务(前移出队)
auto task = std::move(audio_playback_queue_.front());
audio_playback_queue_.pop_front();
// 通知等待的线程队列已发生变化(唤醒可能的生产者)
audio_queue_cv_.notify_all();
// 解锁互斥量,开始进行播放处理
lock.unlock();
// 如果音频输出尚未启用,则启用输出并启动功耗监测定时器
if (!codec_->output_enabled()) {
codec_->EnableOutput(true);
esp_timer_start_periodic(audio_power_timer_, AUDIO_POWER_CHECK_INTERVAL_MS * 1000);
}
// 将 PCM 数据输出到音频设备
codec_->OutputData(task->pcm);
// 更新时间戳记录为最近一次输出时间
last_output_time_ = std::chrono::steady_clock::now();
// 播放计数器 +1,用于调试/统计
debug_statistics_.playback_count++;
#if CONFIG_USE_SERVER_AEC
// 若启用了服务器端 AEC,并且任务中包含有效时间戳,则记录该时间戳
if (task->timestamp > 0) {
lock.lock(); // 重新加锁以保护 timestamp_queue_
timestamp_queue_.push_back(task->timestamp);
}
#endif
}
// 最后,任务退出时打印日志
ESP_LOGW(TAG, "Audio output task stopped");
}
void AudioService::OpusCodecTask() {
while (true) {
// 加锁并等待条件满足:
// - 服务已停止
// - 编码队列非空 且 发送队列未满
// - 解码队列非空 且 播放队列未满
std::unique_lock<std::mutex> lock(audio_queue_mutex_);
audio_queue_cv_.wait(lock, [this]() {
return service_stopped_ ||
(!audio_encode_queue_.empty() && audio_send_queue_.size() < MAX_SEND_PACKETS_IN_QUEUE) ||
(!audio_decode_queue_.empty() && audio_playback_queue_.size() < MAX_PLAYBACK_TASKS_IN_QUEUE);
});
// 若服务已停止,则退出任务
if (service_stopped_) {
break;
}
/** ========================
* 解码逻辑
* ======================== */
if (!audio_decode_queue_.empty() && audio_playback_queue_.size() < MAX_PLAYBACK_TASKS_IN_QUEUE) {
// 取出一个待解码数据包
auto packet = std::move(audio_decode_queue_.front());
audio_decode_queue_.pop_front();
audio_queue_cv_.notify_all();
lock.unlock(); // 解锁以便其他线程访问队列
// 构造新的播放任务
auto task = std::make_unique<AudioTask>();
task->type = kAudioTaskTypeDecodeToPlaybackQueue;
task->timestamp = packet->timestamp;
// 设置解码参数
SetDecodeSampleRate(packet->sample_rate, packet->frame_duration);
// 解码数据
if (opus_decoder_->Decode(std::move(packet->payload), task->pcm)) {
// 如果解码后的采样率不一致,则重采样
if (opus_decoder_->sample_rate() != codec_->output_sample_rate()) {
int target_size = output_resampler_.GetOutputSamples(task->pcm.size());
std::vector<int16_t> resampled(target_size);
output_resampler_.Process(task->pcm.data(), task->pcm.size(), resampled.data());
task->pcm = std::move(resampled);
}
// 加锁并推送到播放队列
lock.lock();
audio_playback_queue_.push_back(std::move(task));
audio_queue_cv_.notify_all();
} else {
// 解码失败
ESP_LOGE(TAG, "Failed to decode audio");
lock.lock();
}
debug_statistics_.decode_count++;
}
/** ========================
* 编码逻辑
* ======================== */
if (!audio_encode_queue_.empty() && audio_send_queue_.size() < MAX_SEND_PACKETS_IN_QUEUE) {
auto task = std::move(audio_encode_queue_.front());
audio_encode_queue_.pop_front();
audio_queue_cv_.notify_all();
lock.unlock(); // 解锁以进行编码
// 构建音频流数据包
auto packet = std::make_unique<AudioStreamPacket>();
packet->frame_duration = OPUS_FRAME_DURATION_MS;
packet->sample_rate = 16000;
packet->timestamp = task->timestamp;
// 编码 PCM 数据
if (!opus_encoder_->Encode(std::move(task->pcm), packet->payload)) {
ESP_LOGE(TAG, "Failed to encode audio");
continue;
}
// 根据任务类型,推送到不同队列
if (task->type == kAudioTaskTypeEncodeToSendQueue) {
{
std::lock_guard<std::mutex> lock(audio_queue_mutex_);
audio_send_queue_.push_back(std::move(packet));
}
// 通知有新的可发送数据
if (callbacks_.on_send_queue_available) {
callbacks_.on_send_queue_available();
}
} else if (task->type == kAudioTaskTypeEncodeToTestingQueue) {
std::lock_guard<std::mutex> lock(audio_queue_mutex_);
audio_testing_queue_.push_back(std::move(packet));
}
debug_statistics_.encode_count++;
lock.lock(); // 重新加锁以进入下一轮循环
}
}
// 任务退出时记录日志
ESP_LOGW(TAG, "Opus codec task stopped");
}