Spaces:
Runtime error
Runtime error
| # | |
| # SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org> | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| import os | |
| import socket | |
| import struct | |
| import subprocess | |
| import tempfile | |
| import threading | |
| import time | |
| from typing import Optional, Tuple, Dict, Any | |
| from config import ( | |
| ACCELERATOR_SOCKET_PATH, | |
| ACCELERATOR_BINARY_PATH, | |
| ACCELERATOR_WORKER_THREADS, | |
| ACCELERATOR_MEMORY_POOL_MB | |
| ) | |
| PROTOCOL_MAGIC_NUMBER = 0x50545453 | |
| COMMAND_PING = 0 | |
| COMMAND_PROCESS_AUDIO = 1 | |
| COMMAND_CONVERT_TO_MONO = 2 | |
| COMMAND_CONVERT_TO_PCM = 3 | |
| COMMAND_RESAMPLE_AUDIO = 4 | |
| COMMAND_GET_MEMORY_STATS = 5 | |
| COMMAND_CLEAR_MEMORY_POOL = 6 | |
| COMMAND_SHUTDOWN = 7 | |
| RESPONSE_SUCCESS = 0 | |
| RESPONSE_ERROR_INVALID_COMMAND = 1 | |
| RESPONSE_ERROR_FILE_NOT_FOUND = 2 | |
| RESPONSE_ERROR_PROCESSING_FAILED = 3 | |
| RESPONSE_ERROR_MEMORY_ALLOCATION = 4 | |
| RESPONSE_ERROR_INTERNAL = 5 | |
| REQUEST_HEADER_FORMAT = "=IIII" | |
| RESPONSE_HEADER_FORMAT = "=IIII" | |
| REQUEST_HEADER_SIZE = struct.calcsize(REQUEST_HEADER_FORMAT) | |
| RESPONSE_HEADER_SIZE = struct.calcsize(RESPONSE_HEADER_FORMAT) | |
| PROCESS_AUDIO_REQUEST_FORMAT = "=512s512sII" | |
| PROCESS_AUDIO_REQUEST_SIZE = struct.calcsize(PROCESS_AUDIO_REQUEST_FORMAT) | |
| MEMORY_STATS_RESPONSE_FORMAT = "=QQQ" | |
| MEMORY_STATS_RESPONSE_SIZE = struct.calcsize(MEMORY_STATS_RESPONSE_FORMAT) | |
| accelerator_process_handle = None | |
| accelerator_process_lock = threading.Lock() | |
| request_id_counter = 0 | |
| request_id_lock = threading.Lock() | |
| class AcceleratorClient: | |
| def __init__(self, socket_path: str = ACCELERATOR_SOCKET_PATH): | |
| self.socket_path = socket_path | |
| self.connection_timeout = 5.0 | |
| self.read_timeout = 30.0 | |
| def is_connected(self) -> bool: | |
| try: | |
| response = self.send_ping() | |
| return response is not None and response.startswith(b"PONG") | |
| except Exception: | |
| return False | |
| def send_ping(self) -> Optional[bytes]: | |
| return self._send_command(COMMAND_PING, b"") | |
| def process_audio( | |
| self, | |
| input_file_path: str, | |
| output_file_path: str, | |
| target_sample_rate: int = 0, | |
| options_flags: int = 0 | |
| ) -> Tuple[bool, str]: | |
| payload = self._pack_process_audio_request( | |
| input_file_path, | |
| output_file_path, | |
| target_sample_rate, | |
| options_flags | |
| ) | |
| response = self._send_command(COMMAND_PROCESS_AUDIO, payload) | |
| if response is None: | |
| return False, "Failed to communicate with accelerator" | |
| response_string = response.decode("utf-8", errors="ignore") | |
| if response_string.startswith("SUCCESS:"): | |
| return True, response_string[8:] | |
| elif response_string.startswith("ERROR:"): | |
| return False, response_string[6:] | |
| else: | |
| return False, response_string | |
| def convert_to_mono( | |
| self, | |
| input_file_path: str, | |
| output_file_path: str | |
| ) -> Tuple[bool, str]: | |
| payload = self._pack_process_audio_request( | |
| input_file_path, | |
| output_file_path, | |
| 0, | |
| 0 | |
| ) | |
| response = self._send_command(COMMAND_CONVERT_TO_MONO, payload) | |
| if response is None: | |
| return False, "Failed to communicate with accelerator" | |
| response_string = response.decode("utf-8", errors="ignore") | |
| if response_string.startswith("SUCCESS:"): | |
| return True, response_string[8:] | |
| elif response_string.startswith("ERROR:"): | |
| return False, response_string[6:] | |
| else: | |
| return False, response_string | |
| def convert_to_pcm( | |
| self, | |
| input_file_path: str, | |
| output_file_path: str | |
| ) -> Tuple[bool, str]: | |
| payload = self._pack_process_audio_request( | |
| input_file_path, | |
| output_file_path, | |
| 0, | |
| 0 | |
| ) | |
| response = self._send_command(COMMAND_CONVERT_TO_PCM, payload) | |
| if response is None: | |
| return False, "Failed to communicate with accelerator" | |
| response_string = response.decode("utf-8", errors="ignore") | |
| if response_string.startswith("SUCCESS:"): | |
| return True, response_string[8:] | |
| elif response_string.startswith("ERROR:"): | |
| return False, response_string[6:] | |
| else: | |
| return False, response_string | |
| def resample_audio( | |
| self, | |
| input_file_path: str, | |
| output_file_path: str, | |
| target_sample_rate: int | |
| ) -> Tuple[bool, str]: | |
| payload = self._pack_process_audio_request( | |
| input_file_path, | |
| output_file_path, | |
| target_sample_rate, | |
| 0 | |
| ) | |
| response = self._send_command(COMMAND_RESAMPLE_AUDIO, payload) | |
| if response is None: | |
| return False, "Failed to communicate with accelerator" | |
| response_string = response.decode("utf-8", errors="ignore") | |
| if response_string.startswith("SUCCESS:"): | |
| return True, response_string[8:] | |
| elif response_string.startswith("ERROR:"): | |
| return False, response_string[6:] | |
| else: | |
| return False, response_string | |
| def get_memory_stats(self) -> Optional[Dict[str, int]]: | |
| response = self._send_command(COMMAND_GET_MEMORY_STATS, b"") | |
| if response is None or len(response) < MEMORY_STATS_RESPONSE_SIZE: | |
| return None | |
| total_allocated, total_used, block_count = struct.unpack( | |
| MEMORY_STATS_RESPONSE_FORMAT, | |
| response[:MEMORY_STATS_RESPONSE_SIZE] | |
| ) | |
| return { | |
| "total_allocated_bytes": total_allocated, | |
| "total_used_bytes": total_used, | |
| "block_count": block_count | |
| } | |
| def clear_memory_pool(self) -> bool: | |
| response = self._send_command(COMMAND_CLEAR_MEMORY_POOL, b"") | |
| return response is not None | |
| def shutdown_accelerator(self) -> bool: | |
| response = self._send_command(COMMAND_SHUTDOWN, b"") | |
| return response is not None | |
| def _get_next_request_id(self) -> int: | |
| global request_id_counter | |
| with request_id_lock: | |
| request_id_counter += 1 | |
| return request_id_counter | |
| def _pack_process_audio_request( | |
| self, | |
| input_path: str, | |
| output_path: str, | |
| target_sample_rate: int, | |
| options_flags: int | |
| ) -> bytes: | |
| input_path_bytes = input_path.encode("utf-8")[:511] + b"\x00" | |
| output_path_bytes = output_path.encode("utf-8")[:511] + b"\x00" | |
| input_path_padded = input_path_bytes.ljust(512, b"\x00") | |
| output_path_padded = output_path_bytes.ljust(512, b"\x00") | |
| return struct.pack( | |
| PROCESS_AUDIO_REQUEST_FORMAT, | |
| input_path_padded, | |
| output_path_padded, | |
| target_sample_rate, | |
| options_flags | |
| ) | |
| def _send_command( | |
| self, | |
| command_type: int, | |
| payload: bytes | |
| ) -> Optional[bytes]: | |
| try: | |
| client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
| client_socket.settimeout(self.connection_timeout) | |
| client_socket.connect(self.socket_path) | |
| request_id = self._get_next_request_id() | |
| request_header = struct.pack( | |
| REQUEST_HEADER_FORMAT, | |
| PROTOCOL_MAGIC_NUMBER, | |
| command_type, | |
| len(payload), | |
| request_id | |
| ) | |
| client_socket.sendall(request_header) | |
| if payload: | |
| client_socket.sendall(payload) | |
| client_socket.settimeout(self.read_timeout) | |
| response_header_data = self._receive_exactly(client_socket, RESPONSE_HEADER_SIZE) | |
| if response_header_data is None: | |
| client_socket.close() | |
| return None | |
| magic_number, status_code, payload_size, response_request_id = struct.unpack( | |
| RESPONSE_HEADER_FORMAT, | |
| response_header_data | |
| ) | |
| if magic_number != PROTOCOL_MAGIC_NUMBER: | |
| client_socket.close() | |
| return None | |
| if response_request_id != request_id: | |
| client_socket.close() | |
| return None | |
| response_payload = b"" | |
| if payload_size > 0: | |
| response_payload = self._receive_exactly(client_socket, payload_size) | |
| if response_payload is None: | |
| client_socket.close() | |
| return None | |
| client_socket.close() | |
| if status_code != RESPONSE_SUCCESS: | |
| return response_payload if response_payload else None | |
| return response_payload | |
| except socket.timeout: | |
| return None | |
| except socket.error: | |
| return None | |
| except Exception: | |
| return None | |
| def _receive_exactly( | |
| self, | |
| client_socket: socket.socket, | |
| num_bytes: int | |
| ) -> Optional[bytes]: | |
| received_data = b"" | |
| remaining_bytes = num_bytes | |
| while remaining_bytes > 0: | |
| try: | |
| chunk = client_socket.recv(remaining_bytes) | |
| if not chunk: | |
| return None | |
| received_data += chunk | |
| remaining_bytes -= len(chunk) | |
| except socket.timeout: | |
| return None | |
| except socket.error: | |
| return None | |
| return received_data | |
| def is_accelerator_available() -> bool: | |
| if not os.path.exists(ACCELERATOR_SOCKET_PATH): | |
| return False | |
| client = AcceleratorClient() | |
| return client.is_connected() | |
| def start_accelerator_daemon() -> bool: | |
| global accelerator_process_handle | |
| with accelerator_process_lock: | |
| if accelerator_process_handle is not None: | |
| if accelerator_process_handle.poll() is None: | |
| return True | |
| if not os.path.exists(ACCELERATOR_BINARY_PATH): | |
| return False | |
| try: | |
| accelerator_process_handle = subprocess.Popen( | |
| [ | |
| ACCELERATOR_BINARY_PATH, | |
| "--socket", ACCELERATOR_SOCKET_PATH, | |
| "--threads", str(ACCELERATOR_WORKER_THREADS), | |
| "--memory", str(ACCELERATOR_MEMORY_POOL_MB) | |
| ], | |
| stdout=subprocess.DEVNULL, | |
| stderr=subprocess.DEVNULL, | |
| start_new_session=True | |
| ) | |
| for attempt_index in range(50): | |
| time.sleep(0.1) | |
| if is_accelerator_available(): | |
| return True | |
| return is_accelerator_available() | |
| except Exception: | |
| return False | |
| def stop_accelerator_daemon() -> bool: | |
| global accelerator_process_handle | |
| with accelerator_process_lock: | |
| if is_accelerator_available(): | |
| try: | |
| client = AcceleratorClient() | |
| client.shutdown_accelerator() | |
| time.sleep(0.5) | |
| except Exception: | |
| pass | |
| if accelerator_process_handle is not None: | |
| if accelerator_process_handle.poll() is None: | |
| try: | |
| accelerator_process_handle.terminate() | |
| accelerator_process_handle.wait(timeout=5) | |
| except subprocess.TimeoutExpired: | |
| accelerator_process_handle.kill() | |
| accelerator_process_handle.wait() | |
| accelerator_process_handle = None | |
| return True | |
| def process_audio_with_accelerator( | |
| input_file_path: str, | |
| output_file_path: str | |
| ) -> Tuple[bool, str]: | |
| if not is_accelerator_available(): | |
| return False, "Accelerator not available" | |
| client = AcceleratorClient() | |
| return client.process_audio(input_file_path, output_file_path) | |
| def convert_to_mono_with_accelerator( | |
| input_file_path: str, | |
| output_file_path: str | |
| ) -> Tuple[bool, str]: | |
| if not is_accelerator_available(): | |
| return False, "Accelerator not available" | |
| client = AcceleratorClient() | |
| return client.convert_to_mono(input_file_path, output_file_path) | |
| def convert_to_pcm_with_accelerator( | |
| input_file_path: str, | |
| output_file_path: str | |
| ) -> Tuple[bool, str]: | |
| if not is_accelerator_available(): | |
| return False, "Accelerator not available" | |
| client = AcceleratorClient() | |
| return client.convert_to_pcm(input_file_path, output_file_path) | |
| def get_accelerator_memory_stats() -> Optional[Dict[str, int]]: | |
| if not is_accelerator_available(): | |
| return None | |
| client = AcceleratorClient() | |
| return client.get_memory_stats() |