arifather51's picture
Upload 28 files
a57f260 verified
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import os
import socket
import struct
import subprocess
import tempfile
import threading
import time
from typing import Optional, Tuple, Dict, Any
from config import (
ACCELERATOR_SOCKET_PATH,
ACCELERATOR_BINARY_PATH,
ACCELERATOR_WORKER_THREADS,
ACCELERATOR_MEMORY_POOL_MB
)
PROTOCOL_MAGIC_NUMBER = 0x50545453
COMMAND_PING = 0
COMMAND_PROCESS_AUDIO = 1
COMMAND_CONVERT_TO_MONO = 2
COMMAND_CONVERT_TO_PCM = 3
COMMAND_RESAMPLE_AUDIO = 4
COMMAND_GET_MEMORY_STATS = 5
COMMAND_CLEAR_MEMORY_POOL = 6
COMMAND_SHUTDOWN = 7
RESPONSE_SUCCESS = 0
RESPONSE_ERROR_INVALID_COMMAND = 1
RESPONSE_ERROR_FILE_NOT_FOUND = 2
RESPONSE_ERROR_PROCESSING_FAILED = 3
RESPONSE_ERROR_MEMORY_ALLOCATION = 4
RESPONSE_ERROR_INTERNAL = 5
REQUEST_HEADER_FORMAT = "=IIII"
RESPONSE_HEADER_FORMAT = "=IIII"
REQUEST_HEADER_SIZE = struct.calcsize(REQUEST_HEADER_FORMAT)
RESPONSE_HEADER_SIZE = struct.calcsize(RESPONSE_HEADER_FORMAT)
PROCESS_AUDIO_REQUEST_FORMAT = "=512s512sII"
PROCESS_AUDIO_REQUEST_SIZE = struct.calcsize(PROCESS_AUDIO_REQUEST_FORMAT)
MEMORY_STATS_RESPONSE_FORMAT = "=QQQ"
MEMORY_STATS_RESPONSE_SIZE = struct.calcsize(MEMORY_STATS_RESPONSE_FORMAT)
accelerator_process_handle = None
accelerator_process_lock = threading.Lock()
request_id_counter = 0
request_id_lock = threading.Lock()
class AcceleratorClient:
def __init__(self, socket_path: str = ACCELERATOR_SOCKET_PATH):
self.socket_path = socket_path
self.connection_timeout = 5.0
self.read_timeout = 30.0
def is_connected(self) -> bool:
try:
response = self.send_ping()
return response is not None and response.startswith(b"PONG")
except Exception:
return False
def send_ping(self) -> Optional[bytes]:
return self._send_command(COMMAND_PING, b"")
def process_audio(
self,
input_file_path: str,
output_file_path: str,
target_sample_rate: int = 0,
options_flags: int = 0
) -> Tuple[bool, str]:
payload = self._pack_process_audio_request(
input_file_path,
output_file_path,
target_sample_rate,
options_flags
)
response = self._send_command(COMMAND_PROCESS_AUDIO, payload)
if response is None:
return False, "Failed to communicate with accelerator"
response_string = response.decode("utf-8", errors="ignore")
if response_string.startswith("SUCCESS:"):
return True, response_string[8:]
elif response_string.startswith("ERROR:"):
return False, response_string[6:]
else:
return False, response_string
def convert_to_mono(
self,
input_file_path: str,
output_file_path: str
) -> Tuple[bool, str]:
payload = self._pack_process_audio_request(
input_file_path,
output_file_path,
0,
0
)
response = self._send_command(COMMAND_CONVERT_TO_MONO, payload)
if response is None:
return False, "Failed to communicate with accelerator"
response_string = response.decode("utf-8", errors="ignore")
if response_string.startswith("SUCCESS:"):
return True, response_string[8:]
elif response_string.startswith("ERROR:"):
return False, response_string[6:]
else:
return False, response_string
def convert_to_pcm(
self,
input_file_path: str,
output_file_path: str
) -> Tuple[bool, str]:
payload = self._pack_process_audio_request(
input_file_path,
output_file_path,
0,
0
)
response = self._send_command(COMMAND_CONVERT_TO_PCM, payload)
if response is None:
return False, "Failed to communicate with accelerator"
response_string = response.decode("utf-8", errors="ignore")
if response_string.startswith("SUCCESS:"):
return True, response_string[8:]
elif response_string.startswith("ERROR:"):
return False, response_string[6:]
else:
return False, response_string
def resample_audio(
self,
input_file_path: str,
output_file_path: str,
target_sample_rate: int
) -> Tuple[bool, str]:
payload = self._pack_process_audio_request(
input_file_path,
output_file_path,
target_sample_rate,
0
)
response = self._send_command(COMMAND_RESAMPLE_AUDIO, payload)
if response is None:
return False, "Failed to communicate with accelerator"
response_string = response.decode("utf-8", errors="ignore")
if response_string.startswith("SUCCESS:"):
return True, response_string[8:]
elif response_string.startswith("ERROR:"):
return False, response_string[6:]
else:
return False, response_string
def get_memory_stats(self) -> Optional[Dict[str, int]]:
response = self._send_command(COMMAND_GET_MEMORY_STATS, b"")
if response is None or len(response) < MEMORY_STATS_RESPONSE_SIZE:
return None
total_allocated, total_used, block_count = struct.unpack(
MEMORY_STATS_RESPONSE_FORMAT,
response[:MEMORY_STATS_RESPONSE_SIZE]
)
return {
"total_allocated_bytes": total_allocated,
"total_used_bytes": total_used,
"block_count": block_count
}
def clear_memory_pool(self) -> bool:
response = self._send_command(COMMAND_CLEAR_MEMORY_POOL, b"")
return response is not None
def shutdown_accelerator(self) -> bool:
response = self._send_command(COMMAND_SHUTDOWN, b"")
return response is not None
def _get_next_request_id(self) -> int:
global request_id_counter
with request_id_lock:
request_id_counter += 1
return request_id_counter
def _pack_process_audio_request(
self,
input_path: str,
output_path: str,
target_sample_rate: int,
options_flags: int
) -> bytes:
input_path_bytes = input_path.encode("utf-8")[:511] + b"\x00"
output_path_bytes = output_path.encode("utf-8")[:511] + b"\x00"
input_path_padded = input_path_bytes.ljust(512, b"\x00")
output_path_padded = output_path_bytes.ljust(512, b"\x00")
return struct.pack(
PROCESS_AUDIO_REQUEST_FORMAT,
input_path_padded,
output_path_padded,
target_sample_rate,
options_flags
)
def _send_command(
self,
command_type: int,
payload: bytes
) -> Optional[bytes]:
try:
client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client_socket.settimeout(self.connection_timeout)
client_socket.connect(self.socket_path)
request_id = self._get_next_request_id()
request_header = struct.pack(
REQUEST_HEADER_FORMAT,
PROTOCOL_MAGIC_NUMBER,
command_type,
len(payload),
request_id
)
client_socket.sendall(request_header)
if payload:
client_socket.sendall(payload)
client_socket.settimeout(self.read_timeout)
response_header_data = self._receive_exactly(client_socket, RESPONSE_HEADER_SIZE)
if response_header_data is None:
client_socket.close()
return None
magic_number, status_code, payload_size, response_request_id = struct.unpack(
RESPONSE_HEADER_FORMAT,
response_header_data
)
if magic_number != PROTOCOL_MAGIC_NUMBER:
client_socket.close()
return None
if response_request_id != request_id:
client_socket.close()
return None
response_payload = b""
if payload_size > 0:
response_payload = self._receive_exactly(client_socket, payload_size)
if response_payload is None:
client_socket.close()
return None
client_socket.close()
if status_code != RESPONSE_SUCCESS:
return response_payload if response_payload else None
return response_payload
except socket.timeout:
return None
except socket.error:
return None
except Exception:
return None
def _receive_exactly(
self,
client_socket: socket.socket,
num_bytes: int
) -> Optional[bytes]:
received_data = b""
remaining_bytes = num_bytes
while remaining_bytes > 0:
try:
chunk = client_socket.recv(remaining_bytes)
if not chunk:
return None
received_data += chunk
remaining_bytes -= len(chunk)
except socket.timeout:
return None
except socket.error:
return None
return received_data
def is_accelerator_available() -> bool:
if not os.path.exists(ACCELERATOR_SOCKET_PATH):
return False
client = AcceleratorClient()
return client.is_connected()
def start_accelerator_daemon() -> bool:
global accelerator_process_handle
with accelerator_process_lock:
if accelerator_process_handle is not None:
if accelerator_process_handle.poll() is None:
return True
if not os.path.exists(ACCELERATOR_BINARY_PATH):
return False
try:
accelerator_process_handle = subprocess.Popen(
[
ACCELERATOR_BINARY_PATH,
"--socket", ACCELERATOR_SOCKET_PATH,
"--threads", str(ACCELERATOR_WORKER_THREADS),
"--memory", str(ACCELERATOR_MEMORY_POOL_MB)
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True
)
for attempt_index in range(50):
time.sleep(0.1)
if is_accelerator_available():
return True
return is_accelerator_available()
except Exception:
return False
def stop_accelerator_daemon() -> bool:
global accelerator_process_handle
with accelerator_process_lock:
if is_accelerator_available():
try:
client = AcceleratorClient()
client.shutdown_accelerator()
time.sleep(0.5)
except Exception:
pass
if accelerator_process_handle is not None:
if accelerator_process_handle.poll() is None:
try:
accelerator_process_handle.terminate()
accelerator_process_handle.wait(timeout=5)
except subprocess.TimeoutExpired:
accelerator_process_handle.kill()
accelerator_process_handle.wait()
accelerator_process_handle = None
return True
def process_audio_with_accelerator(
input_file_path: str,
output_file_path: str
) -> Tuple[bool, str]:
if not is_accelerator_available():
return False, "Accelerator not available"
client = AcceleratorClient()
return client.process_audio(input_file_path, output_file_path)
def convert_to_mono_with_accelerator(
input_file_path: str,
output_file_path: str
) -> Tuple[bool, str]:
if not is_accelerator_available():
return False, "Accelerator not available"
client = AcceleratorClient()
return client.convert_to_mono(input_file_path, output_file_path)
def convert_to_pcm_with_accelerator(
input_file_path: str,
output_file_path: str
) -> Tuple[bool, str]:
if not is_accelerator_available():
return False, "Accelerator not available"
client = AcceleratorClient()
return client.convert_to_pcm(input_file_path, output_file_path)
def get_accelerator_memory_stats() -> Optional[Dict[str, int]]:
if not is_accelerator_available():
return None
client = AcceleratorClient()
return client.get_memory_stats()