Compare commits

...

2 Commits

Author SHA1 Message Date
Terrence
f39c112970 Refactor audio processing to enhance thread safety and state management
- Implement early return checks in Feed methods of AfeAudioProcessor, AfeWakeWord, CustomWakeWord, and EspWakeWord to prevent processing when not running.
- Introduce std::atomic for running state in CustomWakeWord and EspWakeWord to ensure thread-safe access.
- Consolidate input buffer management with mutex locks to avoid race conditions during Stop and Feed operations.
2026-02-04 12:17:16 +08:00
Terrence
f7e258979e Enhance audio processing and wake word detection
- Set task priority in Application::Run to improve responsiveness.
- Log detected wake words with their state in HandleWakeWordDetectedEvent.
- Streamline audio feeding in AudioService to handle both wake word and audio processor events.
- Implement input buffering in AfeAudioProcessor, AfeWakeWord, CustomWakeWord, and EspWakeWord to manage audio data more efficiently.
- Clear input buffers on stop to prevent residual data issues.
2026-02-04 12:04:19 +08:00
11 changed files with 148 additions and 58 deletions

View File

@@ -777,7 +777,9 @@ void Application::HandleWakeWordDetectedEvent() {
}
auto state = GetDeviceState();
auto wake_word = audio_service_.GetLastWakeWord();
ESP_LOGI(TAG, "Wake word detected: %s (state: %d)", wake_word.c_str(), (int)state);
if (state == kDeviceStateIdle) {
audio_service_.EncodeWakeWord();
auto wake_word = audio_service_.GetLastWakeWord();
@@ -793,8 +795,23 @@ void Application::HandleWakeWordDetectedEvent() {
}
// Channel already opened, continue directly
ContinueWakeWordInvoke(wake_word);
} else if (state == kDeviceStateSpeaking) {
} else if (state == kDeviceStateSpeaking || state == kDeviceStateListening) {
AbortSpeaking(kAbortReasonWakeWordDetected);
// Clear send queue to avoid sending residues to server
while (audio_service_.PopPacketFromSendQueue());
if (state == kDeviceStateListening) {
auto mode = aec_mode_ == kAecOff ? kListeningModeAutoStop : kListeningModeRealtime;
protocol_->SendStartListening(mode);
audio_service_.ResetDecoder();
audio_service_.PlaySound(Lang::Sounds::OGG_POPUP);
// Re-enable wake word detection as it was stopped by the detection itself
audio_service_.EnableWakeWordDetection(true);
} else {
// Play popup sound and start listening again
play_popup_on_listening_ = true;
SetListeningMode(aec_mode_ == kAecOff ? kListeningModeAutoStop : kListeningModeRealtime);
}
} else if (state == kDeviceStateActivating) {
// Restart the activation check if the wake word is detected during activation
SetDeviceState(kDeviceStateIdle);
@@ -822,6 +839,9 @@ void Application::ContinueWakeWordInvoke(const std::string& wake_word) {
}
// Set the chat state to wake word detected
protocol_->SendWakeWordDetected(wake_word);
// Set flag to play popup sound after state changes to listening
play_popup_on_listening_ = true;
SetListeningMode(aec_mode_ == kAecOff ? kListeningModeAutoStop : kListeningModeRealtime);
#else
// Set flag to play popup sound after state changes to listening
@@ -859,7 +879,7 @@ void Application::HandleStateChangedEvent() {
display->SetEmotion("neutral");
// Make sure the audio processor is running
if (!audio_service_.IsAudioProcessorRunning()) {
if (play_popup_on_listening_ || !audio_service_.IsAudioProcessorRunning()) {
// For auto mode, wait for playback queue to be empty before enabling voice processing
// This prevents audio truncation when STOP arrives late due to network jitter
if (listening_mode_ == kListeningModeAutoStop) {
@@ -869,9 +889,14 @@ void Application::HandleStateChangedEvent() {
// Send the start listening command
protocol_->SendStartListening(listening_mode_);
audio_service_.EnableVoiceProcessing(true);
audio_service_.EnableWakeWordDetection(false);
}
// TODO: Should use a Kconfig option to enable/disable wake word detection in listening mode
if (true) {
// Always ensure wake word detection state in listening
audio_service_.EnableWakeWordDetection(audio_service_.IsAfeWakeWord());
}
// Play popup sound after ResetDecoder (in EnableVoiceProcessing) has been called
if (play_popup_on_listening_) {
play_popup_on_listening_ = false;

View File

@@ -265,27 +265,18 @@ void AudioService::AudioInputTask() {
}
}
/* Feed the wake word */
if (bits & AS_EVENT_WAKE_WORD_RUNNING) {
/* Feed the wake word and/or audio processor */
if (bits & (AS_EVENT_WAKE_WORD_RUNNING | AS_EVENT_AUDIO_PROCESSOR_RUNNING)) {
int samples = 160; // 10ms
std::vector<int16_t> data;
int samples = wake_word_->GetFeedSize();
if (samples > 0) {
if (ReadAudioData(data, 16000, samples)) {
if (ReadAudioData(data, 16000, samples)) {
if (bits & AS_EVENT_WAKE_WORD_RUNNING) {
wake_word_->Feed(data);
continue;
}
}
}
/* Feed the audio processor */
if (bits & AS_EVENT_AUDIO_PROCESSOR_RUNNING) {
std::vector<int16_t> data;
int samples = audio_processor_->GetFeedSize();
if (samples > 0) {
if (ReadAudioData(data, 16000, samples)) {
if (bits & AS_EVENT_AUDIO_PROCESSOR_RUNNING) {
audio_processor_->Feed(std::move(data));
continue;
}
continue;
}
}

View File

@@ -92,7 +92,18 @@ void AfeAudioProcessor::Feed(std::vector<int16_t>&& data) {
if (afe_data_ == nullptr) {
return;
}
afe_iface_->feed(afe_data_, data.data());
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
// Check running state inside lock to avoid TOCTOU race with Stop()
if (!IsRunning()) {
return;
}
input_buffer_.insert(input_buffer_.end(), data.begin(), data.end());
size_t chunk_size = afe_iface_->get_feed_chunksize(afe_data_) * codec_->input_channels();
while (input_buffer_.size() >= chunk_size) {
afe_iface_->feed(afe_data_, input_buffer_.data());
input_buffer_.erase(input_buffer_.begin(), input_buffer_.begin() + chunk_size);
}
}
void AfeAudioProcessor::Start() {
@@ -101,9 +112,12 @@ void AfeAudioProcessor::Start() {
void AfeAudioProcessor::Stop() {
xEventGroupClearBits(event_group_, PROCESSOR_RUNNING);
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
if (afe_data_ != nullptr) {
afe_iface_->reset_buffer(afe_data_);
}
input_buffer_.clear();
}
bool AfeAudioProcessor::IsRunning() {

View File

@@ -9,6 +9,7 @@
#include <string>
#include <vector>
#include <functional>
#include <mutex>
#include "audio_processor.h"
#include "audio_codec.h"
@@ -37,6 +38,8 @@ private:
AudioCodec* codec_ = nullptr;
int frame_samples_ = 0;
bool is_speaking_ = false;
std::vector<int16_t> input_buffer_;
std::mutex input_buffer_mutex_;
std::vector<int16_t> output_buffer_;
void AudioProcessorTask();

View File

@@ -3,6 +3,7 @@
#include <vector>
#include <functional>
#include <atomic>
#include "audio_processor.h"
#include "audio_codec.h"
@@ -27,7 +28,7 @@ private:
int frame_samples_ = 0;
std::function<void(std::vector<int16_t>&& data)> output_callback_;
std::function<void(bool speaking)> vad_state_change_callback_;
bool is_running_ = false;
std::atomic<bool> is_running_ = false;
};
#endif

View File

@@ -99,19 +99,30 @@ void AfeWakeWord::Start() {
void AfeWakeWord::Stop() {
xEventGroupClearBits(event_group_, DETECTION_RUNNING_EVENT);
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
if (afe_data_ != nullptr) {
afe_iface_->reset_buffer(afe_data_);
}
input_buffer_.clear();
}
/**
 * Buffer incoming PCM samples and pass them to the AFE in whole feed chunks.
 *
 * Samples are appended to input_buffer_; every complete chunk of
 * get_feed_chunksize() * input_channels() samples is then fed to the AFE.
 * Samples that do not fill a complete chunk stay buffered for the next call.
 *
 * The call is a no-op when the AFE instance does not exist or when
 * DETECTION_RUNNING_EVENT is not set.
 *
 * @param data Samples to append. The chunk size is scaled by
 *             codec_->input_channels(), so the data is presumably interleaved
 *             across the codec input channels — confirm against the caller.
 */
void AfeWakeWord::Feed(const std::vector<int16_t>& data) {
    if (afe_data_ == nullptr) {
        return;
    }

    std::lock_guard<std::mutex> lock(input_buffer_mutex_);
    // Check running state inside lock to avoid TOCTOU race with Stop()
    if (!(xEventGroupGetBits(event_group_) & DETECTION_RUNNING_EVENT)) {
        return;
    }

    // Samples reach the AFE only through input_buffer_. The caller's slice is
    // not guaranteed to be exactly one feed chunk long, so feeding `data`
    // directly would both duplicate the audio and hand the AFE a buffer of
    // the wrong length.
    input_buffer_.insert(input_buffer_.end(), data.begin(), data.end());

    const size_t chunk_size = afe_iface_->get_feed_chunksize(afe_data_) * codec_->input_channels();
    if (chunk_size == 0) {
        // A zero chunk size would make the loop below spin forever.
        return;
    }

    // Feed every complete chunk, then drop them all with a single erase
    // instead of shifting the remaining samples once per chunk.
    size_t consumed = 0;
    while (input_buffer_.size() - consumed >= chunk_size) {
        afe_iface_->feed(afe_data_, input_buffer_.data() + consumed);
        consumed += chunk_size;
    }
    input_buffer_.erase(input_buffer_.begin(), input_buffer_.begin() + consumed);
}
size_t AfeWakeWord::GetFeedSize() {

View File

@@ -44,6 +44,8 @@ private:
std::function<void(const std::string& wake_word)> wake_word_detected_callback_;
AudioCodec* codec_ = nullptr;
std::string last_detected_wake_word_;
std::vector<int16_t> input_buffer_;
std::mutex input_buffer_mutex_;
TaskHandle_t wake_word_encode_task_ = nullptr;
StaticTask_t* wake_word_encode_task_buffer_ = nullptr;

View File

@@ -138,49 +138,64 @@ void CustomWakeWord::Start() {
void CustomWakeWord::Stop() {
running_ = false;
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
input_buffer_.clear();
}
void CustomWakeWord::Feed(const std::vector<int16_t>& data) {
if (multinet_model_data_ == nullptr || !running_) {
if (multinet_model_data_ == nullptr) {
return;
}
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
// Check running state inside lock to avoid TOCTOU race with Stop()
if (!running_) {
return;
}
esp_mn_state_t mn_state;
// If input channels is 2, we need to fetch the left channel data
if (codec_->input_channels() == 2) {
auto mono_data = std::vector<int16_t>(data.size() / 2);
for (size_t i = 0, j = 0; i < mono_data.size(); ++i, j += 2) {
mono_data[i] = data[j];
for (size_t i = 0; i < data.size(); i += 2) {
input_buffer_.push_back(data[i]);
}
StoreWakeWordData(mono_data);
mn_state = multinet_->detect(multinet_model_data_, const_cast<int16_t*>(mono_data.data()));
} else {
StoreWakeWordData(data);
mn_state = multinet_->detect(multinet_model_data_, const_cast<int16_t*>(data.data()));
input_buffer_.insert(input_buffer_.end(), data.begin(), data.end());
}
if (mn_state == ESP_MN_STATE_DETECTING) {
return;
} else if (mn_state == ESP_MN_STATE_DETECTED) {
esp_mn_results_t *mn_result = multinet_->get_results(multinet_model_data_);
for (int i = 0; i < mn_result->num && running_; i++) {
ESP_LOGI(TAG, "Custom wake word detected: command_id=%d, string=%s, prob=%f",
mn_result->command_id[i], mn_result->string, mn_result->prob[i]);
auto& command = commands_[mn_result->command_id[i] - 1];
if (command.action == "wake") {
last_detected_wake_word_ = command.text;
running_ = false;
if (wake_word_detected_callback_) {
wake_word_detected_callback_(last_detected_wake_word_);
int chunksize = multinet_->get_samp_chunksize(multinet_model_data_);
while (input_buffer_.size() >= chunksize) {
std::vector<int16_t> chunk(input_buffer_.begin(), input_buffer_.begin() + chunksize);
StoreWakeWordData(chunk);
esp_mn_state_t mn_state = multinet_->detect(multinet_model_data_, chunk.data());
if (mn_state == ESP_MN_STATE_DETECTED) {
esp_mn_results_t *mn_result = multinet_->get_results(multinet_model_data_);
for (int i = 0; i < mn_result->num && running_; i++) {
ESP_LOGI(TAG, "Custom wake word detected: command_id=%d, string=%s, prob=%f",
mn_result->command_id[i], mn_result->string, mn_result->prob[i]);
auto& command = commands_[mn_result->command_id[i] - 1];
if (command.action == "wake") {
last_detected_wake_word_ = command.text;
running_ = false;
input_buffer_.clear();
if (wake_word_detected_callback_) {
wake_word_detected_callback_(last_detected_wake_word_);
}
}
}
multinet_->clean(multinet_model_data_);
} else if (mn_state == ESP_MN_STATE_TIMEOUT) {
ESP_LOGD(TAG, "Command word detection timeout, cleaning state");
multinet_->clean(multinet_model_data_);
}
multinet_->clean(multinet_model_data_);
} else if (mn_state == ESP_MN_STATE_TIMEOUT) {
ESP_LOGD(TAG, "Command word detection timeout, cleaning state");
multinet_->clean(multinet_model_data_);
if (!running_) {
break;
}
input_buffer_.erase(input_buffer_.begin(), input_buffer_.begin() + chunksize);
}
}

View File

@@ -53,6 +53,8 @@ private:
AudioCodec* codec_ = nullptr;
std::string last_detected_wake_word_;
std::atomic<bool> running_ = false;
std::vector<int16_t> input_buffer_;
std::mutex input_buffer_mutex_;
TaskHandle_t wake_word_encode_task_ = nullptr;
StaticTask_t* wake_word_encode_task_buffer_ = nullptr;

View File

@@ -54,21 +54,44 @@ void EspWakeWord::Start() {
void EspWakeWord::Stop() {
running_ = false;
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
input_buffer_.clear();
}
void EspWakeWord::Feed(const std::vector<int16_t>& data) {
if (wakenet_data_ == nullptr || !running_) {
if (wakenet_data_ == nullptr) {
return;
}
int res = wakenet_iface_->detect(wakenet_data_, (int16_t *)data.data());
if (res > 0) {
last_detected_wake_word_ = wakenet_iface_->get_word_name(wakenet_data_, res);
running_ = false;
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
// Check running state inside lock to avoid TOCTOU race with Stop()
if (!running_) {
return;
}
if (wake_word_detected_callback_) {
wake_word_detected_callback_(last_detected_wake_word_);
if (codec_->input_channels() == 2) {
for (size_t i = 0; i < data.size(); i += 2) {
input_buffer_.push_back(data[i]);
}
} else {
input_buffer_.insert(input_buffer_.end(), data.begin(), data.end());
}
int chunksize = wakenet_iface_->get_samp_chunksize(wakenet_data_);
while (input_buffer_.size() >= chunksize) {
int res = wakenet_iface_->detect(wakenet_data_, input_buffer_.data());
if (res > 0) {
last_detected_wake_word_ = wakenet_iface_->get_word_name(wakenet_data_, res);
running_ = false;
input_buffer_.clear();
if (wake_word_detected_callback_) {
wake_word_detected_callback_(last_detected_wake_word_);
}
break;
}
input_buffer_.erase(input_buffer_.begin(), input_buffer_.begin() + chunksize);
}
}

View File

@@ -9,6 +9,7 @@
#include <vector>
#include <functional>
#include <atomic>
#include <mutex>
#include "audio_codec.h"
#include "wake_word.h"
@@ -37,6 +38,8 @@ private:
std::function<void(const std::string& wake_word)> wake_word_detected_callback_;
std::string last_detected_wake_word_;
std::vector<int16_t> input_buffer_;
std::mutex input_buffer_mutex_;
};
#endif