#
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
#

import os
import socket
import struct
import subprocess
import tempfile
import threading
import time
from typing import Optional, Tuple, Dict, Any

from config import (
    ACCELERATOR_SOCKET_PATH,
    ACCELERATOR_BINARY_PATH,
    ACCELERATOR_WORKER_THREADS,
    ACCELERATOR_MEMORY_POOL_MB
)

# Sentinel placed first in every request header and expected first in every
# response header; a response carrying any other value is rejected.
PROTOCOL_MAGIC_NUMBER = 0x50545453

# Command codes written into the request header's command field.
COMMAND_PING = 0
COMMAND_PROCESS_AUDIO = 1
COMMAND_CONVERT_TO_MONO = 2
COMMAND_CONVERT_TO_PCM = 3
COMMAND_RESAMPLE_AUDIO = 4
COMMAND_GET_MEMORY_STATS = 5
COMMAND_CLEAR_MEMORY_POOL = 6
COMMAND_SHUTDOWN = 7

# Status codes read from the response header. Only RESPONSE_SUCCESS is
# compared against in this module; the others are kept for callers.
RESPONSE_SUCCESS = 0
RESPONSE_ERROR_INVALID_COMMAND = 1
RESPONSE_ERROR_FILE_NOT_FOUND = 2
RESPONSE_ERROR_PROCESSING_FAILED = 3
RESPONSE_ERROR_MEMORY_ALLOCATION = 4
RESPONSE_ERROR_INTERNAL = 5

# Request header: magic, command type, payload length, request id.
REQUEST_HEADER_FORMAT = "=IIII"
# Response header: magic, status code, payload length, request id.
RESPONSE_HEADER_FORMAT = "=IIII"
REQUEST_HEADER_SIZE = struct.calcsize(REQUEST_HEADER_FORMAT)
RESPONSE_HEADER_SIZE = struct.calcsize(RESPONSE_HEADER_FORMAT)

# Audio request payload: input path, output path (512 bytes each),
# target sample rate, option flags.
PROCESS_AUDIO_REQUEST_FORMAT = "=512s512sII"
PROCESS_AUDIO_REQUEST_SIZE = struct.calcsize(PROCESS_AUDIO_REQUEST_FORMAT)

# Memory stats payload: total allocated, total used, block count.
MEMORY_STATS_RESPONSE_FORMAT = "=QQQ"
MEMORY_STATS_RESPONSE_SIZE = struct.calcsize(MEMORY_STATS_RESPONSE_FORMAT)

# Handle of the daemon process spawned by start_accelerator_daemon(), if any.
accelerator_process_handle = None
accelerator_process_lock = threading.Lock()

# Process-wide request id counter shared by every AcceleratorClient.
request_id_counter = 0
request_id_lock = threading.Lock()


class AcceleratorClient:
    """Client for the accelerator daemon's Unix-domain-socket protocol.

    Every command opens a fresh connection, sends one header plus optional
    payload, reads one response, and closes the connection.
    """

    def __init__(self, socket_path: str = ACCELERATOR_SOCKET_PATH):
        """Remember the socket path and set the default timeouts (seconds)."""
        self.socket_path = socket_path
        self.connection_timeout = 5.0
        self.read_timeout = 30.0

    def is_connected(self) -> bool:
        """Return True when a ping yields a payload starting with b"PONG"."""
        try:
            response = self.send_ping()
            return response is not None and response.startswith(b"PONG")
        except Exception:
            return False

    def send_ping(self) -> Optional[bytes]:
        """Send COMMAND_PING and return the raw response payload, or None."""
        return self._send_command(COMMAND_PING, b"")

    def process_audio(
        self,
        input_file_path: str,
        output_file_path: str,
        target_sample_rate: int = 0,
        options_flags: int = 0
    ) -> Tuple[bool, str]:
        """Send COMMAND_PROCESS_AUDIO for the given files.

        Returns:
            ``(success, message)`` as produced by the response parser.
        """
        payload = self._pack_process_audio_request(
            input_file_path,
            output_file_path,
            target_sample_rate,
            options_flags
        )
        response = self._send_command(COMMAND_PROCESS_AUDIO, payload)
        return self._parse_status_response(response)

    def convert_to_mono(
        self,
        input_file_path: str,
        output_file_path: str
    ) -> Tuple[bool, str]:
        """Send COMMAND_CONVERT_TO_MONO; sample rate and flags are sent as 0.

        Returns:
            ``(success, message)`` as produced by the response parser.
        """
        payload = self._pack_process_audio_request(
            input_file_path,
            output_file_path,
            0,
            0
        )
        response = self._send_command(COMMAND_CONVERT_TO_MONO, payload)
        return self._parse_status_response(response)

    def convert_to_pcm(
        self,
        input_file_path: str,
        output_file_path: str
    ) -> Tuple[bool, str]:
        """Send COMMAND_CONVERT_TO_PCM; sample rate and flags are sent as 0.

        Returns:
            ``(success, message)`` as produced by the response parser.
        """
        payload = self._pack_process_audio_request(
            input_file_path,
            output_file_path,
            0,
            0
        )
        response = self._send_command(COMMAND_CONVERT_TO_PCM, payload)
        return self._parse_status_response(response)

    def resample_audio(
        self,
        input_file_path: str,
        output_file_path: str,
        target_sample_rate: int
    ) -> Tuple[bool, str]:
        """Send COMMAND_RESAMPLE_AUDIO with the requested sample rate.

        Returns:
            ``(success, message)`` as produced by the response parser.
        """
        payload = self._pack_process_audio_request(
            input_file_path,
            output_file_path,
            target_sample_rate,
            0
        )
        response = self._send_command(COMMAND_RESAMPLE_AUDIO, payload)
        return self._parse_status_response(response)

    def get_memory_stats(self) -> Optional[Dict[str, int]]:
        """Query COMMAND_GET_MEMORY_STATS and decode the three counters.

        Returns:
            A dict with ``total_allocated_bytes``, ``total_used_bytes`` and
            ``block_count``, or None when there is no response or it is
            shorter than MEMORY_STATS_RESPONSE_SIZE.
        """
        response = self._send_command(COMMAND_GET_MEMORY_STATS, b"")
        if response is None or len(response) < MEMORY_STATS_RESPONSE_SIZE:
            return None
        total_allocated, total_used, block_count = struct.unpack(
            MEMORY_STATS_RESPONSE_FORMAT,
            response[:MEMORY_STATS_RESPONSE_SIZE]
        )
        return {
            "total_allocated_bytes": total_allocated,
            "total_used_bytes": total_used,
            "block_count": block_count
        }

    def clear_memory_pool(self) -> bool:
        """Send COMMAND_CLEAR_MEMORY_POOL.

        Returns True whenever ``_send_command`` returned a payload object;
        note that an error status with a non-empty payload also counts.
        """
        response = self._send_command(COMMAND_CLEAR_MEMORY_POOL, b"")
        return response is not None

    def shutdown_accelerator(self) -> bool:
        """Send COMMAND_SHUTDOWN.

        Returns True whenever ``_send_command`` returned a payload object;
        note that an error status with a non-empty payload also counts.
        """
        response = self._send_command(COMMAND_SHUTDOWN, b"")
        return response is not None

    def _get_next_request_id(self) -> int:
        """Return the next process-wide request id.

        The id is packed as an unsigned 32-bit field, so the counter wraps
        at 2**32 instead of growing until struct.pack starts failing.
        """
        global request_id_counter
        with request_id_lock:
            request_id_counter = (request_id_counter + 1) & 0xFFFFFFFF
            return request_id_counter

    @staticmethod
    def _parse_status_response(response: Optional[bytes]) -> Tuple[bool, str]:
        """Turn a raw text response into ``(success, message)``.

        ``None`` maps to a communication failure. A ``SUCCESS:`` prefix
        yields ``(True, rest)``, an ``ERROR:`` prefix yields
        ``(False, rest)``, and anything else is reported as a failure with
        the whole text as the message.
        """
        if response is None:
            return False, "Failed to communicate with accelerator"
        response_string = response.decode("utf-8", errors="ignore")
        if response_string.startswith("SUCCESS:"):
            return True, response_string[8:]
        if response_string.startswith("ERROR:"):
            return False, response_string[6:]
        return False, response_string

    def _pack_process_audio_request(
        self,
        input_path: str,
        output_path: str,
        target_sample_rate: int,
        options_flags: int
    ) -> bytes:
        """Pack the fixed-size audio request payload.

        Each path is UTF-8 encoded and truncated to 511 bytes so that the
        512-byte field always ends in at least one NUL; ``512s`` pads the
        remainder with NUL bytes. Longer paths are silently truncated.
        """
        input_path_bytes = input_path.encode("utf-8")[:511]
        output_path_bytes = output_path.encode("utf-8")[:511]
        return struct.pack(
            PROCESS_AUDIO_REQUEST_FORMAT,
            input_path_bytes,
            output_path_bytes,
            target_sample_rate,
            options_flags
        )

    def _send_command(
        self,
        command_type: int,
        payload: bytes
    ) -> Optional[bytes]:
        """Perform one request/response exchange over a new connection.

        Returns:
            The response payload (possibly empty) on RESPONSE_SUCCESS; on
            any other status, the payload if non-empty, else None. None is
            also returned on connection errors, timeouts, a bad magic
            number, or a mismatched request id.
        """
        try:
            # The context manager closes the socket on every exit path,
            # including exceptions raised by connect() or sendall().
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client_socket:
                client_socket.settimeout(self.connection_timeout)
                client_socket.connect(self.socket_path)

                request_id = self._get_next_request_id()
                request_header = struct.pack(
                    REQUEST_HEADER_FORMAT,
                    PROTOCOL_MAGIC_NUMBER,
                    command_type,
                    len(payload),
                    request_id
                )
                client_socket.sendall(request_header)
                if payload:
                    client_socket.sendall(payload)

                client_socket.settimeout(self.read_timeout)
                response_header_data = self._receive_exactly(
                    client_socket, RESPONSE_HEADER_SIZE
                )
                if response_header_data is None:
                    return None

                magic_number, status_code, payload_size, response_request_id = struct.unpack(
                    RESPONSE_HEADER_FORMAT,
                    response_header_data
                )
                if magic_number != PROTOCOL_MAGIC_NUMBER:
                    return None
                if response_request_id != request_id:
                    return None

                response_payload = b""
                if payload_size > 0:
                    response_payload = self._receive_exactly(
                        client_socket, payload_size
                    )
                    if response_payload is None:
                        return None

            if status_code != RESPONSE_SUCCESS:
                return response_payload if response_payload else None
            return response_payload
        except Exception:
            # Callers rely on None for every failure mode, so nothing is
            # allowed to propagate from the exchange.
            return None

    def _receive_exactly(
        self,
        client_socket: socket.socket,
        num_bytes: int
    ) -> Optional[bytes]:
        """Read exactly ``num_bytes`` from the socket.

        Returns None if the peer closes early or a socket error/timeout
        occurs; returns ``b""`` when ``num_bytes`` is not positive.
        """
        chunks = []
        remaining_bytes = num_bytes
        while remaining_bytes > 0:
            try:
                chunk = client_socket.recv(remaining_bytes)
            except OSError:
                # Covers socket.error and socket.timeout.
                return None
            if not chunk:
                return None
            chunks.append(chunk)
            remaining_bytes -= len(chunk)
        return b"".join(chunks)


def is_accelerator_available() -> bool:
    """Return True if the socket path exists and the daemon answers a ping."""
    if not os.path.exists(ACCELERATOR_SOCKET_PATH):
        return False
    client = AcceleratorClient()
    return client.is_connected()


def start_accelerator_daemon() -> bool:
    """Spawn the accelerator binary unless our previous child is still alive.

    After spawning, polls for up to 50 x 0.1 s until the daemon answers a
    ping. Returns False if the binary is missing or spawning fails.
    """
    global accelerator_process_handle
    with accelerator_process_lock:
        if (
            accelerator_process_handle is not None
            and accelerator_process_handle.poll() is None
        ):
            return True
        if not os.path.exists(ACCELERATOR_BINARY_PATH):
            return False
        try:
            accelerator_process_handle = subprocess.Popen(
                [
                    ACCELERATOR_BINARY_PATH,
                    "--socket", ACCELERATOR_SOCKET_PATH,
                    "--threads", str(ACCELERATOR_WORKER_THREADS),
                    "--memory", str(ACCELERATOR_MEMORY_POOL_MB)
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True
            )
            for _ in range(50):
                time.sleep(0.1)
                if is_accelerator_available():
                    return True
            return is_accelerator_available()
        except Exception:
            return False


def stop_accelerator_daemon() -> bool:
    """Ask the daemon to shut down, then terminate our child if still alive.

    Always returns True.
    """
    global accelerator_process_handle
    with accelerator_process_lock:
        if is_accelerator_available():
            try:
                client = AcceleratorClient()
                client.shutdown_accelerator()
                # Give the daemon a moment to exit on its own.
                time.sleep(0.5)
            except Exception:
                # Best effort only; the terminate/kill path below follows.
                pass
        if accelerator_process_handle is not None:
            if accelerator_process_handle.poll() is None:
                try:
                    accelerator_process_handle.terminate()
                    accelerator_process_handle.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    accelerator_process_handle.kill()
                    accelerator_process_handle.wait()
            accelerator_process_handle = None
        return True


def process_audio_with_accelerator(
    input_file_path: str,
    output_file_path: str
) -> Tuple[bool, str]:
    """Run ``process_audio`` via a default client if the daemon is reachable."""
    if not is_accelerator_available():
        return False, "Accelerator not available"
    client = AcceleratorClient()
    return client.process_audio(input_file_path, output_file_path)


def convert_to_mono_with_accelerator(
    input_file_path: str,
    output_file_path: str
) -> Tuple[bool, str]:
    """Run ``convert_to_mono`` via a default client if the daemon is reachable."""
    if not is_accelerator_available():
        return False, "Accelerator not available"
    client = AcceleratorClient()
    return client.convert_to_mono(input_file_path, output_file_path)


def convert_to_pcm_with_accelerator(
    input_file_path: str,
    output_file_path: str
) -> Tuple[bool, str]:
    """Run ``convert_to_pcm`` via a default client if the daemon is reachable."""
    if not is_accelerator_available():
        return False, "Accelerator not available"
    client = AcceleratorClient()
    return client.convert_to_pcm(input_file_path, output_file_path)


def get_accelerator_memory_stats() -> Optional[Dict[str, int]]:
    """Return the daemon's memory stats, or None if it is not reachable."""
    if not is_accelerator_available():
        return None
    client = AcceleratorClient()
    return client.get_memory_stats()