|
|
import os |
|
|
import requests |
|
|
import logging |
|
|
import shutil |
|
|
import subprocess |
|
|
from urllib.parse import urlparse |
|
|
from typing import List, Dict, Any, Optional |
|
|
from AgentF5TTSChunk import AgentF5TTS |
|
|
|
|
|
logger = logging.getLogger("services.tts") |
|
|
|
|
|
def get_audio_duration(file_path: str) -> float: |
|
|
"""Get duration of audio file using ffprobe.""" |
|
|
try: |
|
|
cmd = [ |
|
|
'ffprobe', '-v', 'error', '-show_entries', 'format=duration', |
|
|
'-of', 'default=noprint_wrappers=1:nokey=1', file_path |
|
|
] |
|
|
output = subprocess.check_output(cmd).decode().strip() |
|
|
return float(output) |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to get duration for {file_path}: {e}") |
|
|
return 0.0 |
|
|
|
|
|
class TTSService: |
|
|
def __init__(self, config: Dict[str, Any]): |
|
|
self.config = config['tts'] |
|
|
self.voices_dir = self.config['voices_dir'] |
|
|
self.output_dir = self.config['output_dir'] |
|
|
|
|
|
|
|
|
os.makedirs(self.voices_dir, exist_ok=True) |
|
|
os.makedirs(self.output_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
logger.info("Loading F5-TTS Model...") |
|
|
try: |
|
|
self.agent = AgentF5TTS( |
|
|
ckpt_file=self.config['checkpoint_file'], |
|
|
device=self.config.get('device', 'cuda') |
|
|
) |
|
|
logger.info("F5-TTS Model Loaded successfully.") |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to load F5-TTS Model: {e}") |
|
|
raise e |
|
|
|
|
|
def _get_extension_from_url(self, url: str) -> str: |
|
|
parsed = urlparse(url) |
|
|
path = parsed.path |
|
|
ext = os.path.splitext(path)[1] |
|
|
if not ext: |
|
|
return ".wav" |
|
|
return ext |
|
|
|
|
|
def _download_file(self, url: str, path: str): |
|
|
response = requests.get(url, stream=True, timeout=30) |
|
|
response.raise_for_status() |
|
|
with open(path, 'wb') as f: |
|
|
for chunk in response.iter_content(chunk_size=8192): |
|
|
f.write(chunk) |
|
|
|
|
|
def prepare_voices(self, character_voices: List[Dict[str, str]]) -> Dict[str, Dict[str, str]]: |
|
|
""" |
|
|
Ensure all reference voices are available locally. |
|
|
Returns a map of character_name -> {'path': local_file_path, 'text': ref_text} |
|
|
""" |
|
|
voice_map = {} |
|
|
|
|
|
for cv in character_voices: |
|
|
char_name = cv.get('character') |
|
|
voice_id = cv.get('id') |
|
|
url = cv.get('timbre_url') |
|
|
text = cv.get('timbre_text', "") |
|
|
|
|
|
if not voice_id: |
|
|
continue |
|
|
|
|
|
|
|
|
ext = ".wav" |
|
|
if url: |
|
|
ext = self._get_extension_from_url(url) |
|
|
|
|
|
filename = f"{voice_id}{ext}" |
|
|
local_path = os.path.join(self.voices_dir, filename) |
|
|
|
|
|
|
|
|
if not os.path.exists(local_path): |
|
|
if url: |
|
|
try: |
|
|
logger.info(f"Downloading voice {voice_id}") |
|
|
self._download_file(url, local_path) |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to download voice {voice_id}: {e}") |
|
|
continue |
|
|
else: |
|
|
logger.warning(f"Voice {voice_id} missing locally and no URL.") |
|
|
continue |
|
|
|
|
|
if os.path.exists(local_path): |
|
|
voice_data = {'path': local_path, 'text': text} |
|
|
if char_name: |
|
|
voice_map[char_name] = voice_data |
|
|
voice_map[str(voice_id)] = voice_data |
|
|
|
|
|
return voice_map |
|
|
|
|
|
def process_task(self, task: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Process a TTS generation task. |
|
|
Returns dictionary containing list of generated audio segments with metadata. |
|
|
""" |
|
|
task_id = task['task_id'] |
|
|
data = task['data'] |
|
|
|
|
|
character_voices = data.get('character_voice', []) |
|
|
content = data.get('content', []) |
|
|
|
|
|
if not content: |
|
|
raise ValueError("No content provided.") |
|
|
|
|
|
|
|
|
voice_map = self.prepare_voices(character_voices) |
|
|
|
|
|
|
|
|
task_out_dir = os.path.join(self.output_dir, task_id) |
|
|
os.makedirs(task_out_dir, exist_ok=True) |
|
|
|
|
|
segments_metadata = [] |
|
|
|
|
|
|
|
|
logger.info(f"Starting inference for {len(content)} segments") |
|
|
|
|
|
for idx, segment in enumerate(content): |
|
|
char_name = segment.get('character') |
|
|
text = segment.get('translation') |
|
|
start_time = segment.get('start', 0.0) |
|
|
end_time = segment.get('end', 0.0) |
|
|
|
|
|
if not text: |
|
|
continue |
|
|
|
|
|
|
|
|
original_duration = max(0.0, end_time - start_time) |
|
|
|
|
|
voice_data = voice_map.get(char_name) |
|
|
|
|
|
if not voice_data: |
|
|
logger.warning(f"Segment {idx}: No voice for '{char_name}'. Skipping.") |
|
|
continue |
|
|
|
|
|
ref_audio_path = voice_data['path'] |
|
|
ref_audio_text = voice_data['text'] |
|
|
|
|
|
out_filename = f"{idx:04d}.wav" |
|
|
out_path = os.path.join(task_out_dir, out_filename) |
|
|
|
|
|
try: |
|
|
self.agent.infer( |
|
|
ref_file=ref_audio_path, |
|
|
ref_text=ref_audio_text, |
|
|
gen_text=text, |
|
|
file_wave=out_path, |
|
|
remove_silence=self.config.get('remove_silence', True), |
|
|
speed=self.config.get('speed', 1.0) |
|
|
) |
|
|
|
|
|
if os.path.exists(out_path): |
|
|
gen_duration = get_audio_duration(out_path) |
|
|
|
|
|
segments_metadata.append({ |
|
|
'index': idx, |
|
|
'path': out_path, |
|
|
'start_time': start_time, |
|
|
'end_time': end_time, |
|
|
'original_duration': original_duration, |
|
|
'gen_duration': gen_duration |
|
|
}) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Inference failed for segment {idx}: {e}") |
|
|
|
|
|
if not segments_metadata: |
|
|
raise Exception("No audio generated.") |
|
|
|
|
|
return { |
|
|
'task_id': task_id, |
|
|
'segments': segments_metadata, |
|
|
'task_dir': task_out_dir, |
|
|
'hook_url': data.get('hook_url'), |
|
|
'video_url': data.get('video_url'), |
|
|
'priority': task.get('priority', 3) |
|
|
} |
|
|
|