// // SPDX-FileCopyrightText: Hadad // SPDX-License-Identifier: Apache-2.0 // #include "accelerator_core.hpp" #include #include #include #include #include #include #include namespace pocket_tts_accelerator { static AcceleratorCore* global_accelerator_instance = nullptr; static volatile sig_atomic_t last_received_signal = 0; static void signal_handler_function(int signal_number) { last_received_signal = signal_number; if (global_accelerator_instance != nullptr) { global_accelerator_instance->shutdown(); } } AcceleratorCore::AcceleratorCore(const AcceleratorConfiguration& configuration) : config(configuration) , is_initialized(false) , should_shutdown(false) { } AcceleratorCore::~AcceleratorCore() { shutdown(); } bool AcceleratorCore::initialize() { if (is_initialized.load()) { return true; } log_message("Initializing Pocket TTS Accelerator..."); memory_pool = std::make_unique(config.memory_pool_size_bytes); log_message("Memory pool initialized with " + std::to_string(config.memory_pool_size_bytes / (1024 * 1024)) + " MB"); thread_pool = std::make_unique(config.number_of_worker_threads); log_message("Thread pool initialized with " + std::to_string(config.number_of_worker_threads) + " worker threads"); audio_processor = std::make_unique(*memory_pool); log_message("Audio processor initialized"); ipc_handler = std::make_unique(config.ipc_socket_path); log_message("IPC handler created for socket: " + config.ipc_socket_path); register_all_command_handlers(); ipc_handler->set_shutdown_callback([this]() { this->shutdown(); }); if (!ipc_handler->start_server()) { log_message("ERROR: Failed to start IPC server"); return false; } log_message("IPC server started successfully"); global_accelerator_instance = this; setup_signal_handlers(); is_initialized.store(true); log_message("Pocket TTS Accelerator initialized successfully"); return true; } void AcceleratorCore::run() { if (!is_initialized.load()) { log_message("ERROR: Accelerator not initialized"); return; } log_message("Accelerator running and waiting for commands..."); while (!should_shutdown.load()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (last_received_signal != 0) { log_message("Received signal: " + std::to_string(last_received_signal)); last_received_signal = 0; } } log_message("Accelerator main loop exited"); } void AcceleratorCore::shutdown() { if (should_shutdown.exchange(true)) { return; } log_message("Shutting down Pocket TTS Accelerator..."); if (ipc_handler) { ipc_handler->stop_server(); log_message("IPC server stopped"); } if (thread_pool) { thread_pool->shutdown(); log_message("Thread pool shut down"); } if (memory_pool) { memory_pool->reset_pool(); log_message("Memory pool reset"); } is_initialized.store(false); log_message("Pocket TTS Accelerator shut down complete"); } bool AcceleratorCore::is_running() const { return is_initialized.load() && !should_shutdown.load(); } std::string AcceleratorCore::get_status_string() const { if (!is_initialized.load()) { return "Not initialized"; } if (should_shutdown.load()) { return "Shutting down"; } return "Running"; } AcceleratorConfiguration AcceleratorCore::get_default_configuration() { AcceleratorConfiguration default_config; default_config.number_of_worker_threads = 2; default_config.memory_pool_size_bytes = 64 * 1024 * 1024; default_config.ipc_socket_path = "/tmp/pocket_tts_accelerator.sock"; default_config.enable_verbose_logging = true; return default_config; } void AcceleratorCore::register_all_command_handlers() { ipc_handler->register_command_handler( CommandType::PING, [this](const std::vector& payload) { return this->handle_ping_command(payload); } ); ipc_handler->register_command_handler( CommandType::PROCESS_AUDIO, [this](const std::vector& payload) { return this->handle_process_audio_command(payload); } ); ipc_handler->register_command_handler( CommandType::CONVERT_TO_MONO, [this](const std::vector& payload) { return this->handle_convert_to_mono_command(payload); } ); ipc_handler->register_command_handler( CommandType::CONVERT_TO_PCM, [this](const std::vector& payload) { return this->handle_convert_to_pcm_command(payload); } ); ipc_handler->register_command_handler( CommandType::RESAMPLE_AUDIO, [this](const std::vector& payload) { return this->handle_resample_audio_command(payload); } ); ipc_handler->register_command_handler( CommandType::GET_MEMORY_STATS, [this](const std::vector& payload) { return this->handle_get_memory_stats_command(payload); } ); ipc_handler->register_command_handler( CommandType::CLEAR_MEMORY_POOL, [this](const std::vector& payload) { return this->handle_clear_memory_pool_command(payload); } ); log_message("All command handlers registered"); } void AcceleratorCore::setup_signal_handlers() { signal(SIGINT, signal_handler_function); signal(SIGTERM, signal_handler_function); } std::vector AcceleratorCore::handle_ping_command(const std::vector& payload) { std::string payload_content; if (!payload.empty()) { payload_content = std::string(payload.begin(), payload.end()); log_message("Received PING command with payload: " + payload_content); } else { log_message("Received PING command"); } std::string response_message = "PONG"; if (!payload_content.empty()) { response_message += ":" + payload_content; } return std::vector(response_message.begin(), response_message.end()); } std::vector AcceleratorCore::handle_process_audio_command(const std::vector& payload) { log_message("Received PROCESS_AUDIO command with payload size: " + std::to_string(payload.size()) + " bytes"); if (payload.size() < sizeof(ProcessAudioRequest)) { std::string error_message = "ERROR:Invalid payload size, expected " + std::to_string(sizeof(ProcessAudioRequest)) + " bytes"; return std::vector(error_message.begin(), error_message.end()); } ProcessAudioRequest request; std::memcpy(&request, payload.data(), sizeof(ProcessAudioRequest)); std::string input_path(request.input_file_path); std::string output_path(request.output_file_path); log_message("Processing audio from: " + input_path + " to: " + output_path); auto future_result = thread_pool->submit_task([this, input_path, output_path]() { return this->audio_processor->process_audio_for_voice_cloning(input_path, output_path); }); AudioProcessingResult result = future_result.get(); if (result.success) { log_message("Audio processing completed successfully"); std::string success_message = "SUCCESS:" + output_path; return std::vector(success_message.begin(), success_message.end()); } else { log_message("Audio processing failed: " + result.error_message); std::string error_message = "ERROR:" + result.error_message; return std::vector(error_message.begin(), error_message.end()); } } std::vector AcceleratorCore::handle_convert_to_mono_command(const std::vector& payload) { log_message("Received CONVERT_TO_MONO command with payload size: " + std::to_string(payload.size()) + " bytes"); if (payload.size() < sizeof(ProcessAudioRequest)) { std::string error_message = "ERROR:Invalid payload size, expected " + std::to_string(sizeof(ProcessAudioRequest)) + " bytes"; return std::vector(error_message.begin(), error_message.end()); } ProcessAudioRequest request; std::memcpy(&request, payload.data(), sizeof(ProcessAudioRequest)); std::string input_path(request.input_file_path); std::string output_path(request.output_file_path); log_message("Converting to mono from: " + input_path + " to: " + output_path); AudioData audio_data = audio_processor->read_wav_file(input_path); if (!audio_data.is_valid) { std::string error_message = "ERROR:" + audio_data.error_message; return std::vector(error_message.begin(), error_message.end()); } AudioProcessingResult result = audio_processor->convert_to_mono(audio_data); if (!result.success) { std::string error_message = "ERROR:" + result.error_message; return std::vector(error_message.begin(), error_message.end()); } AudioData output_audio; output_audio.samples = std::move(result.processed_samples); output_audio.sample_rate = result.output_sample_rate; output_audio.number_of_channels = 1; output_audio.bits_per_sample = 16; output_audio.is_valid = true; if (!audio_processor->write_wav_file(output_path, output_audio)) { std::string error_message = "ERROR:Failed to write output file"; return std::vector(error_message.begin(), error_message.end()); } log_message("Mono conversion completed successfully"); std::string success_message = "SUCCESS:" + output_path; return std::vector(success_message.begin(), success_message.end()); } std::vector AcceleratorCore::handle_convert_to_pcm_command(const std::vector& payload) { log_message("Received CONVERT_TO_PCM command with payload size: " + std::to_string(payload.size()) + " bytes"); if (payload.size() < sizeof(ProcessAudioRequest)) { std::string error_message = "ERROR:Invalid payload size, expected " + std::to_string(sizeof(ProcessAudioRequest)) + " bytes"; return std::vector(error_message.begin(), error_message.end()); } ProcessAudioRequest request; std::memcpy(&request, payload.data(), sizeof(ProcessAudioRequest)); std::string input_path(request.input_file_path); std::string output_path(request.output_file_path); log_message("Converting to PCM from: " + input_path + " to: " + output_path); AudioData audio_data = audio_processor->read_wav_file(input_path); if (!audio_data.is_valid) { std::string error_message = "ERROR:" + audio_data.error_message; return std::vector(error_message.begin(), error_message.end()); } AudioData mono_audio; if (audio_data.number_of_channels > 1) { log_message("Input has " + std::to_string(audio_data.number_of_channels) + " channels, converting to mono"); AudioProcessingResult mono_result = audio_processor->convert_to_mono(audio_data); if (!mono_result.success) { std::string error_message = "ERROR:" + mono_result.error_message; return std::vector(error_message.begin(), error_message.end()); } mono_audio.samples = std::move(mono_result.processed_samples); mono_audio.sample_rate = mono_result.output_sample_rate; } else { mono_audio.samples = std::move(audio_data.samples); mono_audio.sample_rate = audio_data.sample_rate; } mono_audio.number_of_channels = 1; mono_audio.bits_per_sample = 16; mono_audio.is_valid = true; if (!audio_processor->write_wav_file(output_path, mono_audio)) { std::string error_message = "ERROR:Failed to write output file"; return std::vector(error_message.begin(), error_message.end()); } log_message("PCM conversion completed successfully"); std::string success_message = "SUCCESS:" + output_path; return std::vector(success_message.begin(), success_message.end()); } std::vector AcceleratorCore::handle_resample_audio_command(const std::vector& payload) { log_message("Received RESAMPLE_AUDIO command with payload size: " + std::to_string(payload.size()) + " bytes"); if (payload.size() < sizeof(ProcessAudioRequest)) { std::string error_message = "ERROR:Invalid payload size, expected " + std::to_string(sizeof(ProcessAudioRequest)) + " bytes"; return std::vector(error_message.begin(), error_message.end()); } ProcessAudioRequest request; std::memcpy(&request, payload.data(), sizeof(ProcessAudioRequest)); std::string input_path(request.input_file_path); std::string output_path(request.output_file_path); std::uint32_t target_sample_rate = request.target_sample_rate; log_message("Resampling audio from: " + input_path + " to: " + output_path + " at " + std::to_string(target_sample_rate) + " Hz"); AudioData audio_data = audio_processor->read_wav_file(input_path); if (!audio_data.is_valid) { std::string error_message = "ERROR:" + audio_data.error_message; return std::vector(error_message.begin(), error_message.end()); } AudioProcessingResult result = audio_processor->resample_audio(audio_data, target_sample_rate); if (!result.success) { std::string error_message = "ERROR:" + result.error_message; return std::vector(error_message.begin(), error_message.end()); } AudioData output_audio; output_audio.samples = std::move(result.processed_samples); output_audio.sample_rate = result.output_sample_rate; output_audio.number_of_channels = audio_data.number_of_channels; output_audio.bits_per_sample = 16; output_audio.is_valid = true; if (!audio_processor->write_wav_file(output_path, output_audio)) { std::string error_message = "ERROR:Failed to write output file"; return std::vector(error_message.begin(), error_message.end()); } log_message("Resampling completed successfully"); std::string success_message = "SUCCESS:" + output_path; return std::vector(success_message.begin(), success_message.end()); } std::vector AcceleratorCore::handle_get_memory_stats_command(const std::vector& payload) { log_message("Received GET_MEMORY_STATS command with payload size: " + std::to_string(payload.size()) + " bytes"); MemoryStatsResponse stats; stats.total_allocated_bytes = memory_pool->get_total_allocated_bytes(); stats.total_used_bytes = memory_pool->get_total_used_bytes(); stats.block_count = memory_pool->get_block_count(); log_message("Memory stats - Allocated: " + std::to_string(stats.total_allocated_bytes) + " bytes, Used: " + std::to_string(stats.total_used_bytes) + " bytes, Blocks: " + std::to_string(stats.block_count)); std::vector response(sizeof(MemoryStatsResponse)); std::memcpy(response.data(), &stats, sizeof(MemoryStatsResponse)); return response; } std::vector AcceleratorCore::handle_clear_memory_pool_command(const std::vector& payload) { log_message("Received CLEAR_MEMORY_POOL command with payload size: " + std::to_string(payload.size()) + " bytes"); std::size_t blocks_before = memory_pool->get_block_count(); std::size_t allocated_before = memory_pool->get_total_allocated_bytes(); memory_pool->clear_unused_blocks(); std::size_t blocks_after = memory_pool->get_block_count(); std::size_t allocated_after = memory_pool->get_total_allocated_bytes(); std::size_t blocks_freed = blocks_before - blocks_after; std::size_t bytes_freed = allocated_before - allocated_after; log_message("Memory pool cleared - Freed " + std::to_string(blocks_freed) + " blocks (" + std::to_string(bytes_freed) + " bytes)"); std::string success_message = "SUCCESS:Freed " + std::to_string(blocks_freed) + " blocks (" + std::to_string(bytes_freed) + " bytes)"; return std::vector(success_message.begin(), success_message.end()); } std::vector AcceleratorCore::handle_shutdown_command(const std::vector& payload) { std::string shutdown_reason; if (!payload.empty()) { shutdown_reason = std::string(payload.begin(), payload.end()); log_message("Received SHUTDOWN command with reason: " + shutdown_reason); } else { log_message("Received SHUTDOWN command"); } shutdown(); std::string success_message = "SUCCESS:Shutting down"; if (!shutdown_reason.empty()) { success_message += " (reason: " + shutdown_reason + ")"; } return std::vector(success_message.begin(), success_message.end()); } void AcceleratorCore::log_message(const std::string& message) const { if (config.enable_verbose_logging) { auto now = std::chrono::system_clock::now(); std::time_t time_t_now = std::chrono::system_clock::to_time_t(now); auto milliseconds = std::chrono::duration_cast( now.time_since_epoch() ) % 1000; std::tm time_info; localtime_r(&time_t_now, &time_info); std::ostringstream timestamp_stream; timestamp_stream << std::put_time(&time_info, "%Y-%m-%d %H:%M:%S"); timestamp_stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count(); std::cout << "[" << timestamp_stream.str() << "] [ACCELERATOR] " << message << std::endl; } } }