forked from xiaozhi/xiaozhi-esp32
Refactor audio processing to enhance thread safety and state management
- Implement early return checks in Feed methods of AfeAudioProcessor, AfeWakeWord, CustomWakeWord, and EspWakeWord to prevent processing when not running. - Introduce std::atomic for running state in CustomWakeWord and EspWakeWord to ensure thread-safe access. - Consolidate input buffer management with mutex locks to avoid race conditions during Stop and Feed operations.
This commit is contained in:
@@ -89,11 +89,15 @@ size_t AfeAudioProcessor::GetFeedSize() {
|
||||
}
|
||||
|
||||
void AfeAudioProcessor::Feed(std::vector<int16_t>&& data) {
|
||||
if (afe_data_ == nullptr || !IsRunning()) {
|
||||
if (afe_data_ == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
// Check running state inside lock to avoid TOCTOU race with Stop()
|
||||
if (!IsRunning()) {
|
||||
return;
|
||||
}
|
||||
input_buffer_.insert(input_buffer_.end(), data.begin(), data.end());
|
||||
size_t chunk_size = afe_iface_->get_feed_chunksize(afe_data_) * codec_->input_channels();
|
||||
while (input_buffer_.size() >= chunk_size) {
|
||||
@@ -108,11 +112,11 @@ void AfeAudioProcessor::Start() {
|
||||
|
||||
void AfeAudioProcessor::Stop() {
|
||||
xEventGroupClearBits(event_group_, PROCESSOR_RUNNING);
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
if (afe_data_ != nullptr) {
|
||||
afe_iface_->reset_buffer(afe_data_);
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
input_buffer_.clear();
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
#include <vector>
|
||||
#include <functional>
|
||||
#include <atomic>
|
||||
|
||||
#include "audio_processor.h"
|
||||
#include "audio_codec.h"
|
||||
@@ -27,7 +28,7 @@ private:
|
||||
int frame_samples_ = 0;
|
||||
std::function<void(std::vector<int16_t>&& data)> output_callback_;
|
||||
std::function<void(bool speaking)> vad_state_change_callback_;
|
||||
bool is_running_ = false;
|
||||
std::atomic<bool> is_running_ = false;
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -99,20 +99,24 @@ void AfeWakeWord::Start() {
|
||||
|
||||
void AfeWakeWord::Stop() {
|
||||
xEventGroupClearBits(event_group_, DETECTION_RUNNING_EVENT);
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
if (afe_data_ != nullptr) {
|
||||
afe_iface_->reset_buffer(afe_data_);
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
input_buffer_.clear();
|
||||
}
|
||||
|
||||
void AfeWakeWord::Feed(const std::vector<int16_t>& data) {
|
||||
if (afe_data_ == nullptr || !(xEventGroupGetBits(event_group_) & DETECTION_RUNNING_EVENT)) {
|
||||
if (afe_data_ == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
// Check running state inside lock to avoid TOCTOU race with Stop()
|
||||
if (!(xEventGroupGetBits(event_group_) & DETECTION_RUNNING_EVENT)) {
|
||||
return;
|
||||
}
|
||||
input_buffer_.insert(input_buffer_.end(), data.begin(), data.end());
|
||||
size_t chunk_size = afe_iface_->get_feed_chunksize(afe_data_) * codec_->input_channels();
|
||||
while (input_buffer_.size() >= chunk_size) {
|
||||
|
||||
@@ -138,11 +138,19 @@ void CustomWakeWord::Start() {
|
||||
|
||||
void CustomWakeWord::Stop() {
|
||||
running_ = false;
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
input_buffer_.clear();
|
||||
}
|
||||
|
||||
void CustomWakeWord::Feed(const std::vector<int16_t>& data) {
|
||||
if (multinet_model_data_ == nullptr || !running_) {
|
||||
if (multinet_model_data_ == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
// Check running state inside lock to avoid TOCTOU race with Stop()
|
||||
if (!running_) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ private:
|
||||
std::string last_detected_wake_word_;
|
||||
std::atomic<bool> running_ = false;
|
||||
std::vector<int16_t> input_buffer_;
|
||||
std::mutex input_buffer_mutex_;
|
||||
|
||||
TaskHandle_t wake_word_encode_task_ = nullptr;
|
||||
StaticTask_t* wake_word_encode_task_buffer_ = nullptr;
|
||||
|
||||
@@ -54,11 +54,19 @@ void EspWakeWord::Start() {
|
||||
|
||||
void EspWakeWord::Stop() {
|
||||
running_ = false;
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
input_buffer_.clear();
|
||||
}
|
||||
|
||||
void EspWakeWord::Feed(const std::vector<int16_t>& data) {
|
||||
if (wakenet_data_ == nullptr || !running_) {
|
||||
if (wakenet_data_ == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(input_buffer_mutex_);
|
||||
// Check running state inside lock to avoid TOCTOU race with Stop()
|
||||
if (!running_) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include <vector>
|
||||
#include <functional>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
|
||||
#include "audio_codec.h"
|
||||
#include "wake_word.h"
|
||||
@@ -38,6 +39,7 @@ private:
|
||||
std::function<void(const std::string& wake_word)> wake_word_detected_callback_;
|
||||
std::string last_detected_wake_word_;
|
||||
std::vector<int16_t> input_buffer_;
|
||||
std::mutex input_buffer_mutex_;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user