aaaaaaaaaaaaaaa / accelerator /src /accelerator_core.cpp
arifather51's picture
Upload 28 files
a57f260 verified
//
// SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
// SPDX-License-Identifier: Apache-2.0
//
#include "accelerator_core.hpp"
#include <chrono>
#include <cstring>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <signal.h>
namespace pocket_tts_accelerator {
static AcceleratorCore* global_accelerator_instance = nullptr;
static volatile sig_atomic_t last_received_signal = 0;
static void signal_handler_function(int signal_number) {
last_received_signal = signal_number;
if (global_accelerator_instance != nullptr) {
global_accelerator_instance->shutdown();
}
}
AcceleratorCore::AcceleratorCore(const AcceleratorConfiguration& configuration)
: config(configuration)
, is_initialized(false)
, should_shutdown(false) {
}
AcceleratorCore::~AcceleratorCore() {
shutdown();
}
bool AcceleratorCore::initialize() {
if (is_initialized.load()) {
return true;
}
log_message("Initializing Pocket TTS Accelerator...");
memory_pool = std::make_unique<MemoryPool>(config.memory_pool_size_bytes);
log_message("Memory pool initialized with " + std::to_string(config.memory_pool_size_bytes / (1024 * 1024)) + " MB");
thread_pool = std::make_unique<ThreadPool>(config.number_of_worker_threads);
log_message("Thread pool initialized with " + std::to_string(config.number_of_worker_threads) + " worker threads");
audio_processor = std::make_unique<AudioProcessor>(*memory_pool);
log_message("Audio processor initialized");
ipc_handler = std::make_unique<IpcHandler>(config.ipc_socket_path);
log_message("IPC handler created for socket: " + config.ipc_socket_path);
register_all_command_handlers();
ipc_handler->set_shutdown_callback([this]() {
this->shutdown();
});
if (!ipc_handler->start_server()) {
log_message("ERROR: Failed to start IPC server");
return false;
}
log_message("IPC server started successfully");
global_accelerator_instance = this;
setup_signal_handlers();
is_initialized.store(true);
log_message("Pocket TTS Accelerator initialized successfully");
return true;
}
void AcceleratorCore::run() {
if (!is_initialized.load()) {
log_message("ERROR: Accelerator not initialized");
return;
}
log_message("Accelerator running and waiting for commands...");
while (!should_shutdown.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (last_received_signal != 0) {
log_message("Received signal: " + std::to_string(last_received_signal));
last_received_signal = 0;
}
}
log_message("Accelerator main loop exited");
}
// Stops the IPC server, shuts down the thread pool and resets the memory pool,
// in that order, then marks the accelerator as no longer initialized.
// Only the first call does any work; later calls return immediately.
void AcceleratorCore::shutdown() {
    const bool shutdown_already_requested = should_shutdown.exchange(true);
    if (shutdown_already_requested) {
        return;
    }

    log_message("Shutting down Pocket TTS Accelerator...");

    if (ipc_handler != nullptr) {
        ipc_handler->stop_server();
        log_message("IPC server stopped");
    }

    if (thread_pool != nullptr) {
        thread_pool->shutdown();
        log_message("Thread pool shut down");
    }

    if (memory_pool != nullptr) {
        memory_pool->reset_pool();
        log_message("Memory pool reset");
    }

    is_initialized.store(false);
    log_message("Pocket TTS Accelerator shut down complete");
}
// True while the accelerator is initialized and no shutdown has been requested.
bool AcceleratorCore::is_running() const {
    if (!is_initialized.load()) {
        return false;
    }
    const bool shutdown_requested = should_shutdown.load();
    return !shutdown_requested;
}
// Returns a short human-readable state: "Not initialized", "Shutting down" or
// "Running". The initialization check takes precedence over the shutdown check.
std::string AcceleratorCore::get_status_string() const {
    const char* status_text = "Running";
    if (!is_initialized.load()) {
        status_text = "Not initialized";
    } else if (should_shutdown.load()) {
        status_text = "Shutting down";
    }
    return status_text;
}
// Builds the default configuration: 2 worker threads, a 64 MiB memory pool,
// the socket at /tmp/pocket_tts_accelerator.sock and verbose logging enabled.
AcceleratorConfiguration AcceleratorCore::get_default_configuration() {
    constexpr int bytes_per_mebibyte = 1024 * 1024;

    AcceleratorConfiguration defaults;
    defaults.enable_verbose_logging = true;
    defaults.ipc_socket_path = "/tmp/pocket_tts_accelerator.sock";
    defaults.memory_pool_size_bytes = 64 * bytes_per_mebibyte;
    defaults.number_of_worker_threads = 2;
    return defaults;
}
void AcceleratorCore::register_all_command_handlers() {
ipc_handler->register_command_handler(
CommandType::PING,
[this](const std::vector<std::uint8_t>& payload) {
return this->handle_ping_command(payload);
}
);
ipc_handler->register_command_handler(
CommandType::PROCESS_AUDIO,
[this](const std::vector<std::uint8_t>& payload) {
return this->handle_process_audio_command(payload);
}
);
ipc_handler->register_command_handler(
CommandType::CONVERT_TO_MONO,
[this](const std::vector<std::uint8_t>& payload) {
return this->handle_convert_to_mono_command(payload);
}
);
ipc_handler->register_command_handler(
CommandType::CONVERT_TO_PCM,
[this](const std::vector<std::uint8_t>& payload) {
return this->handle_convert_to_pcm_command(payload);
}
);
ipc_handler->register_command_handler(
CommandType::RESAMPLE_AUDIO,
[this](const std::vector<std::uint8_t>& payload) {
return this->handle_resample_audio_command(payload);
}
);
ipc_handler->register_command_handler(
CommandType::GET_MEMORY_STATS,
[this](const std::vector<std::uint8_t>& payload) {
return this->handle_get_memory_stats_command(payload);
}
);
ipc_handler->register_command_handler(
CommandType::CLEAR_MEMORY_POOL,
[this](const std::vector<std::uint8_t>& payload) {
return this->handle_clear_memory_pool_command(payload);
}
);
log_message("All command handlers registered");
}
// Installs signal_handler_function for SIGINT and SIGTERM, in that order.
void AcceleratorCore::setup_signal_handlers() {
    const int handled_signals[] = {SIGINT, SIGTERM};
    for (const int signal_number : handled_signals) {
        signal(signal_number, signal_handler_function);
    }
}
// Handles PING. Replies "PONG" for an empty payload, otherwise "PONG:" followed
// by the payload bytes echoed back unchanged.
std::vector<std::uint8_t> AcceleratorCore::handle_ping_command(const std::vector<std::uint8_t>& payload) {
    if (payload.empty()) {
        log_message("Received PING command");
        const std::string plain_reply = "PONG";
        return std::vector<std::uint8_t>(plain_reply.begin(), plain_reply.end());
    }

    const std::string echoed_text(payload.begin(), payload.end());
    log_message("Received PING command with payload: " + echoed_text);

    const std::string echo_reply = "PONG:" + echoed_text;
    return std::vector<std::uint8_t>(echo_reply.begin(), echo_reply.end());
}
// Handles PROCESS_AUDIO.
//
// The payload must hold at least one ProcessAudioRequest. The processing itself
// runs as a thread-pool task and this call waits for it to finish.
//
// Returns "SUCCESS:<output path>" on success, or "ERROR:<message>" when the
// payload is too small or processing fails.
std::vector<std::uint8_t> AcceleratorCore::handle_process_audio_command(const std::vector<std::uint8_t>& payload) {
    log_message("Received PROCESS_AUDIO command with payload size: " + std::to_string(payload.size()) + " bytes");
    if (payload.size() < sizeof(ProcessAudioRequest)) {
        std::string error_message = "ERROR:Invalid payload size, expected " + std::to_string(sizeof(ProcessAudioRequest)) + " bytes";
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    ProcessAudioRequest request;
    std::memcpy(&request, payload.data(), sizeof(ProcessAudioRequest));

    // The request bytes come straight from the IPC peer, so a path field is not
    // guaranteed to contain a terminating NUL. Stop at the first NUL or at the
    // end of the field, whichever comes first, instead of scanning past it.
    // NOTE(review): assumes the path members are fixed-size char arrays.
    const auto read_path_field = [](const auto& field) {
        const void* terminator = std::memchr(field, '\0', sizeof(field));
        const std::size_t length = (terminator != nullptr)
            ? static_cast<std::size_t>(static_cast<const char*>(terminator) - field)
            : sizeof(field);
        return std::string(field, length);
    };
    const std::string input_path = read_path_field(request.input_file_path);
    const std::string output_path = read_path_field(request.output_file_path);

    log_message("Processing audio from: " + input_path + " to: " + output_path);
    auto future_result = thread_pool->submit_task([this, input_path, output_path]() {
        return this->audio_processor->process_audio_for_voice_cloning(input_path, output_path);
    });
    AudioProcessingResult result = future_result.get();
    if (result.success) {
        log_message("Audio processing completed successfully");
        std::string success_message = "SUCCESS:" + output_path;
        return std::vector<std::uint8_t>(success_message.begin(), success_message.end());
    } else {
        log_message("Audio processing failed: " + result.error_message);
        std::string error_message = "ERROR:" + result.error_message;
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
}
// Handles CONVERT_TO_MONO.
//
// Reads the input WAV file, converts it to mono and writes the result as a
// 1-channel, 16-bit WAV file at the output path.
//
// Returns "SUCCESS:<output path>" on success, or "ERROR:<message>" when the
// payload is too small or reading, converting or writing fails.
std::vector<std::uint8_t> AcceleratorCore::handle_convert_to_mono_command(const std::vector<std::uint8_t>& payload) {
    log_message("Received CONVERT_TO_MONO command with payload size: " + std::to_string(payload.size()) + " bytes");
    if (payload.size() < sizeof(ProcessAudioRequest)) {
        std::string error_message = "ERROR:Invalid payload size, expected " + std::to_string(sizeof(ProcessAudioRequest)) + " bytes";
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    ProcessAudioRequest request;
    std::memcpy(&request, payload.data(), sizeof(ProcessAudioRequest));

    // The request bytes come straight from the IPC peer, so a path field is not
    // guaranteed to contain a terminating NUL. Stop at the first NUL or at the
    // end of the field, whichever comes first, instead of scanning past it.
    // NOTE(review): assumes the path members are fixed-size char arrays.
    const auto read_path_field = [](const auto& field) {
        const void* terminator = std::memchr(field, '\0', sizeof(field));
        const std::size_t length = (terminator != nullptr)
            ? static_cast<std::size_t>(static_cast<const char*>(terminator) - field)
            : sizeof(field);
        return std::string(field, length);
    };
    const std::string input_path = read_path_field(request.input_file_path);
    const std::string output_path = read_path_field(request.output_file_path);

    log_message("Converting to mono from: " + input_path + " to: " + output_path);
    AudioData audio_data = audio_processor->read_wav_file(input_path);
    if (!audio_data.is_valid) {
        std::string error_message = "ERROR:" + audio_data.error_message;
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    AudioProcessingResult result = audio_processor->convert_to_mono(audio_data);
    if (!result.success) {
        std::string error_message = "ERROR:" + result.error_message;
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    AudioData output_audio;
    output_audio.samples = std::move(result.processed_samples);
    output_audio.sample_rate = result.output_sample_rate;
    output_audio.number_of_channels = 1;
    output_audio.bits_per_sample = 16;
    output_audio.is_valid = true;
    if (!audio_processor->write_wav_file(output_path, output_audio)) {
        std::string error_message = "ERROR:Failed to write output file";
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    log_message("Mono conversion completed successfully");
    std::string success_message = "SUCCESS:" + output_path;
    return std::vector<std::uint8_t>(success_message.begin(), success_message.end());
}
// Handles CONVERT_TO_PCM.
//
// Reads the input WAV file, down-mixes it to mono when it has more than one
// channel, and writes the result as a 1-channel, 16-bit WAV file at the output
// path.
//
// Returns "SUCCESS:<output path>" on success, or "ERROR:<message>" when the
// payload is too small or reading, converting or writing fails.
std::vector<std::uint8_t> AcceleratorCore::handle_convert_to_pcm_command(const std::vector<std::uint8_t>& payload) {
    log_message("Received CONVERT_TO_PCM command with payload size: " + std::to_string(payload.size()) + " bytes");
    if (payload.size() < sizeof(ProcessAudioRequest)) {
        std::string error_message = "ERROR:Invalid payload size, expected " + std::to_string(sizeof(ProcessAudioRequest)) + " bytes";
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    ProcessAudioRequest request;
    std::memcpy(&request, payload.data(), sizeof(ProcessAudioRequest));

    // The request bytes come straight from the IPC peer, so a path field is not
    // guaranteed to contain a terminating NUL. Stop at the first NUL or at the
    // end of the field, whichever comes first, instead of scanning past it.
    // NOTE(review): assumes the path members are fixed-size char arrays.
    const auto read_path_field = [](const auto& field) {
        const void* terminator = std::memchr(field, '\0', sizeof(field));
        const std::size_t length = (terminator != nullptr)
            ? static_cast<std::size_t>(static_cast<const char*>(terminator) - field)
            : sizeof(field);
        return std::string(field, length);
    };
    const std::string input_path = read_path_field(request.input_file_path);
    const std::string output_path = read_path_field(request.output_file_path);

    log_message("Converting to PCM from: " + input_path + " to: " + output_path);
    AudioData audio_data = audio_processor->read_wav_file(input_path);
    if (!audio_data.is_valid) {
        std::string error_message = "ERROR:" + audio_data.error_message;
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    AudioData mono_audio;
    if (audio_data.number_of_channels > 1) {
        log_message("Input has " + std::to_string(audio_data.number_of_channels) + " channels, converting to mono");
        AudioProcessingResult mono_result = audio_processor->convert_to_mono(audio_data);
        if (!mono_result.success) {
            std::string error_message = "ERROR:" + mono_result.error_message;
            return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
        }
        mono_audio.samples = std::move(mono_result.processed_samples);
        mono_audio.sample_rate = mono_result.output_sample_rate;
    } else {
        // Already single-channel: reuse the samples as they were read.
        mono_audio.samples = std::move(audio_data.samples);
        mono_audio.sample_rate = audio_data.sample_rate;
    }
    mono_audio.number_of_channels = 1;
    mono_audio.bits_per_sample = 16;
    mono_audio.is_valid = true;
    if (!audio_processor->write_wav_file(output_path, mono_audio)) {
        std::string error_message = "ERROR:Failed to write output file";
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    log_message("PCM conversion completed successfully");
    std::string success_message = "SUCCESS:" + output_path;
    return std::vector<std::uint8_t>(success_message.begin(), success_message.end());
}
// Handles RESAMPLE_AUDIO.
//
// Reads the input WAV file, resamples it to the request's target sample rate
// and writes the result as a 16-bit WAV file with the input's channel count.
//
// Returns "SUCCESS:<output path>" on success, or "ERROR:<message>" when the
// payload is too small or reading, resampling or writing fails.
std::vector<std::uint8_t> AcceleratorCore::handle_resample_audio_command(const std::vector<std::uint8_t>& payload) {
    log_message("Received RESAMPLE_AUDIO command with payload size: " + std::to_string(payload.size()) + " bytes");
    if (payload.size() < sizeof(ProcessAudioRequest)) {
        std::string error_message = "ERROR:Invalid payload size, expected " + std::to_string(sizeof(ProcessAudioRequest)) + " bytes";
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    ProcessAudioRequest request;
    std::memcpy(&request, payload.data(), sizeof(ProcessAudioRequest));

    // The request bytes come straight from the IPC peer, so a path field is not
    // guaranteed to contain a terminating NUL. Stop at the first NUL or at the
    // end of the field, whichever comes first, instead of scanning past it.
    // NOTE(review): assumes the path members are fixed-size char arrays.
    const auto read_path_field = [](const auto& field) {
        const void* terminator = std::memchr(field, '\0', sizeof(field));
        const std::size_t length = (terminator != nullptr)
            ? static_cast<std::size_t>(static_cast<const char*>(terminator) - field)
            : sizeof(field);
        return std::string(field, length);
    };
    const std::string input_path = read_path_field(request.input_file_path);
    const std::string output_path = read_path_field(request.output_file_path);
    const std::uint32_t target_sample_rate = request.target_sample_rate;

    log_message("Resampling audio from: " + input_path + " to: " + output_path + " at " + std::to_string(target_sample_rate) + " Hz");
    AudioData audio_data = audio_processor->read_wav_file(input_path);
    if (!audio_data.is_valid) {
        std::string error_message = "ERROR:" + audio_data.error_message;
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    AudioProcessingResult result = audio_processor->resample_audio(audio_data, target_sample_rate);
    if (!result.success) {
        std::string error_message = "ERROR:" + result.error_message;
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    AudioData output_audio;
    output_audio.samples = std::move(result.processed_samples);
    output_audio.sample_rate = result.output_sample_rate;
    output_audio.number_of_channels = audio_data.number_of_channels;
    output_audio.bits_per_sample = 16;
    output_audio.is_valid = true;
    if (!audio_processor->write_wav_file(output_path, output_audio)) {
        std::string error_message = "ERROR:Failed to write output file";
        return std::vector<std::uint8_t>(error_message.begin(), error_message.end());
    }
    log_message("Resampling completed successfully");
    std::string success_message = "SUCCESS:" + output_path;
    return std::vector<std::uint8_t>(success_message.begin(), success_message.end());
}
// Handles GET_MEMORY_STATS.
//
// Queries the memory pool for its allocated bytes, used bytes and block count,
// logs them, and returns the raw bytes of a MemoryStatsResponse holding those
// values. The request payload is only used for the log line.
std::vector<std::uint8_t> AcceleratorCore::handle_get_memory_stats_command(const std::vector<std::uint8_t>& payload) {
    log_message("Received GET_MEMORY_STATS command with payload size: " + std::to_string(payload.size()) + " bytes");
    // Value-initialize: the struct is copied byte-for-byte into the response,
    // so any member not assigned below must not be left indeterminate.
    MemoryStatsResponse stats{};
    stats.total_allocated_bytes = memory_pool->get_total_allocated_bytes();
    stats.total_used_bytes = memory_pool->get_total_used_bytes();
    stats.block_count = memory_pool->get_block_count();
    log_message("Memory stats - Allocated: " + std::to_string(stats.total_allocated_bytes) +
                " bytes, Used: " + std::to_string(stats.total_used_bytes) +
                " bytes, Blocks: " + std::to_string(stats.block_count));
    std::vector<std::uint8_t> response(sizeof(MemoryStatsResponse));
    std::memcpy(response.data(), &stats, sizeof(MemoryStatsResponse));
    return response;
}
// Handles CLEAR_MEMORY_POOL.
//
// Asks the memory pool to drop its unused blocks and reports how many blocks
// and bytes disappeared, measured as the difference between the pool's counters
// before and after the call. The request payload is only used for the log line.
//
// Returns "SUCCESS:Freed <blocks> blocks (<bytes> bytes)".
std::vector<std::uint8_t> AcceleratorCore::handle_clear_memory_pool_command(const std::vector<std::uint8_t>& payload) {
    log_message("Received CLEAR_MEMORY_POOL command with payload size: " + std::to_string(payload.size()) + " bytes");
    const std::size_t blocks_before = memory_pool->get_block_count();
    const std::size_t allocated_before = memory_pool->get_total_allocated_bytes();
    memory_pool->clear_unused_blocks();
    const std::size_t blocks_after = memory_pool->get_block_count();
    const std::size_t allocated_after = memory_pool->get_total_allocated_bytes();
    // The counters are unsigned. If the pool grew between the two readings
    // (for example through an allocation made on another thread), a plain
    // subtraction would wrap around to a huge value, so report zero instead.
    const std::size_t blocks_freed = (blocks_before > blocks_after) ? blocks_before - blocks_after : 0;
    const std::size_t bytes_freed = (allocated_before > allocated_after) ? allocated_before - allocated_after : 0;
    log_message("Memory pool cleared - Freed " + std::to_string(blocks_freed) +
                " blocks (" + std::to_string(bytes_freed) + " bytes)");
    std::string success_message = "SUCCESS:Freed " + std::to_string(blocks_freed) +
                                  " blocks (" + std::to_string(bytes_freed) + " bytes)";
    return std::vector<std::uint8_t>(success_message.begin(), success_message.end());
}
// Handles SHUTDOWN. The payload, when present, is treated as a free-text
// reason; it is logged and echoed back in the reply. shutdown() is called
// before the reply is built.
//
// Returns "SUCCESS:Shutting down", with " (reason: <reason>)" appended when a
// reason was supplied.
std::vector<std::uint8_t> AcceleratorCore::handle_shutdown_command(const std::vector<std::uint8_t>& payload) {
    const std::string reason_text(payload.begin(), payload.end());
    const bool has_reason = !reason_text.empty();

    if (has_reason) {
        log_message("Received SHUTDOWN command with reason: " + reason_text);
    } else {
        log_message("Received SHUTDOWN command");
    }

    shutdown();

    std::string reply_text = "SUCCESS:Shutting down";
    if (has_reason) {
        reply_text += " (reason: " + reason_text + ")";
    }
    return std::vector<std::uint8_t>(reply_text.begin(), reply_text.end());
}
// Writes one line to standard output in the form
// "[YYYY-MM-DD HH:MM:SS.mmm] [ACCELERATOR] <message>", using local time.
// Does nothing unless verbose logging is enabled in the configuration.
void AcceleratorCore::log_message(const std::string& message) const {
    if (!config.enable_verbose_logging) {
        return;
    }

    const auto current_time = std::chrono::system_clock::now();
    const std::time_t whole_seconds = std::chrono::system_clock::to_time_t(current_time);
    // Sub-second remainder, printed as the three digits after the seconds.
    const auto millisecond_part =
        std::chrono::duration_cast<std::chrono::milliseconds>(current_time.time_since_epoch()) % 1000;

    std::tm local_time;
    localtime_r(&whole_seconds, &local_time);

    std::ostringstream formatted_timestamp;
    formatted_timestamp << std::put_time(&local_time, "%Y-%m-%d %H:%M:%S")
                        << '.' << std::setfill('0') << std::setw(3) << millisecond_part.count();

    std::cout << "[" << formatted_timestamp.str() << "] [ACCELERATOR] " << message << std::endl;
}
}