| from fastapi import FastAPI, Request, Response
|
| from fastapi.responses import StreamingResponse
|
| import gradio as gr
|
| import uvicorn
|
| import base64
|
| import torch
|
| import os
|
| import tempfile
|
| import traceback
|
| import json
|
| import time
|
| import torchaudio
|
| import gc
|
| import sys
|
| import types
|
| import logging
|
| from threading import Thread, Lock
|
| from huggingface_hub import snapshot_download
|
|
|
|
|
# Quiet down chatty third-party loggers and the ctranslate2 runtime.
for _noisy_logger in ("transformers", "TTS"):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)
os.environ["CT2_VERBOSE"] = "0"
|
|
|
|
|
# Compatibility shim: newer torchaudio releases dropped the
# `torchaudio.backend` submodule — presumably some dependency here still
# imports `torchaudio.backend.common.AudioMetaData` (TODO confirm which).
# Register synthetic modules in sys.modules so those imports resolve.
if "torchaudio.backend" not in sys.modules:
    backend = types.ModuleType("torchaudio.backend")
    common = types.ModuleType("torchaudio.backend.common")
    # Prefer the real AudioMetaData when this torchaudio build still has it.
    try: common.AudioMetaData = torchaudio.AudioMetaData
    except AttributeError:
        # Last resort: bare placeholder class so imports/attribute access work.
        class AudioMetaData: pass
        common.AudioMetaData = AudioMetaData
    backend.common = common
    sys.modules["torchaudio.backend"] = backend
    sys.modules["torchaudio.backend.common"] = common
|
|
|
# Some torchaudio builds ship without `info`; provide a minimal WAV-only
# replacement when it is missing so metadata probes do not crash.
if not hasattr(torchaudio, "info"):
    def mock_info(filepath, **kwargs):
        """Minimal stand-in for torchaudio.info supporting PCM WAV files.

        Returns a SimpleNamespace mimicking torchaudio's AudioMetaData
        fields; unreadable/non-WAV files get generic defaults.
        """
        from types import SimpleNamespace
        import wave
        try:
            with wave.open(filepath, "rb") as f:
                return SimpleNamespace(
                    sample_rate=f.getframerate(),
                    num_frames=f.getnframes(),
                    num_channels=f.getnchannels(),
                    bits_per_sample=f.getsampwidth() * 8,
                    encoding="PCM_S",
                )
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are not swallowed.
            # NOTE(review): fallback omits bits_per_sample/encoding, matching
            # the original behavior — confirm callers tolerate that.
            return SimpleNamespace(sample_rate=48000, num_frames=0, num_channels=1)
    torchaudio.info = mock_info
|
|
|
# Patch torchaudio.load: on builds that delegate decoding to the optional
# torchcodec package, fall back to soundfile decoding when it is absent.
try:
    _orig_load = torchaudio.load

    def patched_load(filepath, *args, **kwargs):
        """torchaudio.load wrapper with a soundfile fallback.

        Only an ImportError whose message mentions "torchcodec" triggers
        the fallback; any other ImportError propagates unchanged.
        Returns (tensor, sample_rate) shaped (channels, frames) like
        torchaudio.
        """
        try:
            return _orig_load(filepath, *args, **kwargs)
        except ImportError as e:
            if "torchcodec" not in str(e).lower():
                raise  # unrelated ImportError: keep the original traceback
            import soundfile as sf
            data, samplerate = sf.read(filepath)
            tensor = torch.from_numpy(data).float()
            # soundfile yields (frames,) or (frames, channels);
            # torchaudio convention is (channels, frames).
            if tensor.ndim == 1:
                tensor = tensor.unsqueeze(0)
            else:
                tensor = tensor.T
            return tensor, samplerate

    torchaudio.load = patched_load
except Exception:
    # Best effort: leave torchaudio.load untouched if patching fails.
    pass
|
|
|
|
|
| import chatterbox_utils
|
| from faster_whisper import WhisperModel
|
| from TTS.api import TTS
|
| from df.enhance import init_df
|
| import deep_translator
|
|
|
try:
    import spaces
except ImportError:
    # Not running on HF Spaces: provide a no-op stand-in so the
    # @spaces.GPU(...) decorator usage below keeps working.
    class spaces:
        @staticmethod
        def GPU(duration=60, f=None):
            """Identity decorator factory mirroring spaces.GPU's signature."""
            if f is not None:
                return f
            return lambda fn: fn
|
|
|
|
|
|
|
|
|
# Accept the Coqui TTS license non-interactively (needed for XTTS downloads).
os.environ["COQUI_TOS_AGREED"] = "1"

# Lazily-populated model registry shared across requests. "translate" holds
# the sentinel string "active" once ready (see activate_gpu_models); the
# other slots hold real model objects or None.
MODELS = {"stt": None, "translate": None, "tts": None, "denoiser": None}

# Warmup bookkeeping, mutated under WARMUP_LOCK (warmup runs on a daemon thread).
WARMUP_STATUS = {"complete": False, "in_progress": False}
WARMUP_LOCK = Lock()
|
|
|
def activate_gpu_models(action):
    """v97: Stability-First Activation.

    Ensure the models needed for `action` ("stt", "tts", "s2st", ...) are
    loaded and, where possible, resident on the GPU, falling back to CPU
    variants when GPU initialization fails. Mutates the module-level
    MODELS registry in place; never raises.
    """
    global MODELS, WARMUP_STATUS
    # After a completed warmup the weights are cached locally, so restrict
    # reloads to local files (faster, offline-safe).
    local_only = WARMUP_STATUS["complete"]

    if action in ["stt", "s2st"]:
        stt_on_gpu = False
        try:
            stt_on_gpu = MODELS["stt"] is not None and MODELS["stt"].model.device == "cuda"
        except Exception:  # narrowed from bare `except:`
            pass
        if not stt_on_gpu:
            print("ποΈ [v97] Activating Whisper on GPU (Stability Mode)...")
            try:
                # Drop the CPU instance before allocating the GPU one.
                # Assign None instead of `del` so the "stt" key always exists.
                if MODELS["stt"]:
                    MODELS["stt"] = None
                gc.collect(); torch.cuda.empty_cache()
                MODELS["stt"] = WhisperModel(
                    "large-v3",
                    device="cuda",
                    compute_type="float16",
                    local_files_only=local_only
                )
            except Exception as e:
                print(f"β οΈ Whisper GPU failed: {e}. Falling back to CPU.")
                MODELS["stt"] = WhisperModel("large-v3", device="cpu", compute_type="int8", local_files_only=True)

    if action in ["tts", "s2st"]:
        tts_on_gpu = False
        try:
            curr = str(next(MODELS["tts"].synthesizer.tts_model.parameters()).device)
            tts_on_gpu = "cuda" in curr
        except Exception:  # narrowed from bare `except:`
            pass
        if MODELS["tts"] is None or not tts_on_gpu:
            print("π [v97] Activating XTTS-v2 (GPU)...")
            try:
                if MODELS["tts"] is None:
                    MODELS["tts"] = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2", gpu=True)
                else:
                    MODELS["tts"].to("cuda")
            except Exception as e:
                print(f"β οΈ XTTS GPU failed: {e}. Staying on CPU.")
                if MODELS["tts"] is None:
                    MODELS["tts"] = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2", gpu=False)

    # Chatterbox follows overall CUDA availability regardless of action.
    chatterbox_utils.load_chatterbox(device="cuda" if torch.cuda.is_available() else "cpu")

    # Optional denoiser: best-effort, failure leaves it disabled.
    if MODELS["denoiser"] is None:
        try:
            MODELS["denoiser"] = init_df()
        except Exception:
            pass
    # Translation is a stateless web call; mark it ready with a sentinel.
    if MODELS["translate"] is None:
        MODELS["translate"] = "active"
|
|
|
def release_gpu_models():
    """v97: Clean Exit Handoff.

    Move models back to CPU and free CUDA memory before the GPU session
    ends. Best-effort: every step is wrapped so this never raises.
    """
    global MODELS
    print("π§Ή [v97] Releasing resources...")
    try:
        # Whisper is rebuilt as a fresh CPU instance rather than moved in
        # place (the original code does the same — presumably the model
        # object is not movable across devices).
        if MODELS["stt"] and MODELS["stt"].model.device == "cuda":
            del MODELS["stt"]
            MODELS["stt"] = WhisperModel("large-v3", device="cpu", compute_type="int8", local_files_only=True)
        if MODELS["tts"]:
            try:
                MODELS["tts"].to("cpu")
            except Exception:  # narrowed from bare `except:`
                pass
        chatterbox_utils.load_chatterbox(device="cpu")
    except Exception:  # narrowed from bare `except:`
        pass
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
|
|
|
def warmup_task():
    """Silent Warmup (Resident RAM).

    Pre-load CPU copies of the heavy models once, on a background thread,
    so the first real request only pays the CPU->GPU hop. The lock-guarded
    flag check makes concurrent triggers run this at most once.
    """
    global WARMUP_STATUS
    with WARMUP_LOCK:
        if WARMUP_STATUS["complete"] or WARMUP_STATUS["in_progress"]:
            return
        WARMUP_STATUS["in_progress"] = True
    print("\nπ₯ --- SILENT WARMUP STARTED (v97) ---")
    try:
        MODELS["stt"] = WhisperModel("large-v3", device="cpu", compute_type="int8")
        MODELS["tts"] = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2", gpu=False)
        chatterbox_utils.warmup_chatterbox()
        WARMUP_STATUS["complete"] = True
        # Fixed: this literal was split across physical lines in the source.
        print("β --- SYSTEM WARM --- \n")
    except Exception:
        # Was a silent bare `except: pass`; at least surface the failure.
        traceback.print_exc()
    finally:
        WARMUP_STATUS["in_progress"] = False
|
|
|
def _stt_logic(request_dict):
    """Transcribe base64-encoded audio from `request_dict`.

    Expects keys: "file" (base64 WAV bytes, required) and "lang"
    (optional language hint passed straight to Whisper).
    Returns {"text": ...} on success or {"error": ...} on missing input
    (previously a missing "file" raised an uncaught TypeError).
    """
    b64_audio = request_dict.get("file")
    if not b64_audio:
        return {"error": "Missing 'file' audio payload"}
    audio_bytes = base64.b64decode(b64_audio)
    lang = request_dict.get("lang")
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        f.write(audio_bytes)
        temp_path = f.name
    try:
        # beam_size=1: greedy decoding, trading a little accuracy for latency.
        segments, _ = MODELS["stt"].transcribe(temp_path, language=lang, beam_size=1)
        return {"text": " ".join([s.text for s in segments]).strip()}
    finally:
        if os.path.exists(temp_path):
            os.unlink(temp_path)
|
|
|
def _translate_logic(text, target_lang):
    """Translate `text` into `target_lang` via Google, auto-detecting the source language."""
    translator = deep_translator.GoogleTranslator(source='auto', target=target_lang)
    return translator.translate(text)
|
|
|
def _tts_logic(text, lang, speaker_wav_b64):
    """Synthesize speech for `text`.

    Uses XTTS-v2 when `lang` maps to one of its supported languages,
    otherwise falls back to Chatterbox. `speaker_wav_b64` (optional
    base64 WAV) is the voice-cloning reference; the XTTS path falls back
    to a bundled "default_speaker.wav" when absent.

    Returns {"audio": <base64 wav>} or {"error": <message>}.
    """
    if not text or not text.strip():
        return {"error": "Input empty"}
    # Languages natively supported by XTTS-v2.
    XTTS_MAP = {"en": "en", "de": "de", "fr": "fr", "es": "es", "it": "it", "pl": "pl", "pt": "pt", "tr": "tr", "ru": "ru", "nl": "nl", "cs": "cs", "ar": "ar", "hu": "hu", "ko": "ko", "hi": "hi", "zh": "zh-cn"}
    # Normalize e.g. "EN-us" -> "en".
    clean_lang = lang.strip().lower().split('-')[0]
    # (The original's extra `or ("zh-cn" if clean_lang == "zh" ...)` was
    # redundant: "zh" is already in the map.)
    mapped_lang = XTTS_MAP.get(clean_lang)

    if mapped_lang:
        # --- XTTS-v2 path ---
        if speaker_wav_b64:
            sb = base64.b64decode(speaker_wav_b64)
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                f.write(sb)
                speaker_wav_path = f.name
        else:
            speaker_wav_path = "default_speaker.wav"
        output_path = None
        try:
            # Reserve an output filename; the context closes the handle so
            # tts_to_file can write to it.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as output_file:
                output_path = output_file.name
            MODELS["tts"].tts_to_file(text=text, language=mapped_lang, file_path=output_path, speaker_wav=speaker_wav_path)
            with open(output_path, "rb") as f:
                return {"audio": base64.b64encode(f.read()).decode()}
        finally:
            # Never delete the bundled default speaker file.
            if "default_speaker" not in speaker_wav_path and os.path.exists(speaker_wav_path):
                os.unlink(speaker_wav_path)
            if output_path and os.path.exists(output_path):
                os.unlink(output_path)

    # --- Chatterbox fallback for languages XTTS does not cover ---
    temp_ref = None
    try:
        if speaker_wav_b64:
            sb = base64.b64decode(speaker_wav_b64)
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                f.write(sb)
                temp_ref = f.name
        audio_bytes = chatterbox_utils.run_chatterbox_inference(text, clean_lang, speaker_wav_path=temp_ref)
        return {"audio": base64.b64encode(audio_bytes).decode()}
    except Exception as e:
        return {"error": f"TTS Failure: {str(e)}"}
    finally:
        # Fixed: the reference file previously leaked when inference raised
        # (cleanup only ran on the success path).
        if temp_ref and os.path.exists(temp_ref):
            os.unlink(temp_ref)
|
|
|
@spaces.GPU(duration=150)
def core_process(request_dict):
    """Dispatch one API request to the matching pipeline stage.

    Supported actions: "stt", "translate", "tts", "s2st" (speech-to-speech:
    stt -> translate -> tts) and "health". GPU models are activated up
    front and always released on exit, even on failure.
    """
    action = request_dict.get("action")
    started = time.time()
    print(f"--- [v97] π GPU SESSION: {action} ---")
    activate_gpu_models(action)
    try:
        if action == "stt":
            res = _stt_logic(request_dict)
        elif action == "translate":
            translated_text = _translate_logic(request_dict.get("text"), request_dict.get("target_lang", "en"))
            res = {"translated": translated_text}
        elif action == "tts":
            res = _tts_logic(request_dict.get("text"), request_dict.get("lang"), request_dict.get("speaker_wav"))
        elif action == "s2st":
            # Chain the three stages: transcribe, translate, re-synthesize.
            stt_res = _stt_logic({"file": request_dict.get("file"), "lang": request_dict.get("source_lang")})
            translated = _translate_logic(stt_res.get("text", ""), request_dict.get("target_lang"))
            tts_res = _tts_logic(translated, request_dict.get("target_lang"), request_dict.get("speaker_wav"))
            res = {"text": stt_res.get("text"), "translated": translated, "audio": tts_res.get("audio")}
        elif action == "health":
            res = {"status": "awake"}
        else:
            res = {"error": f"Unknown action: {action}"}
    finally:
        print(f"--- [v97] β¨ END: {action} ({time.time()-started:.2f}s) ---")
        release_gpu_models()
    return res
|
|
|
# FastAPI application; a Gradio UI is mounted over it further below.
app = FastAPI()

@app.on_event("startup")
async def startup_event():
    # Run warmup on a daemon thread so the server serves requests
    # (e.g. /health) immediately instead of blocking on model loads.
    Thread(target=warmup_task, daemon=True).start()
|
|
|
@app.post("/api/v1/process")
async def api_process(request: Request):
    """Main JSON endpoint: the request body is handed straight to core_process."""
    try:
        payload = await request.json()
        return core_process(payload)
    except Exception as e:
        # Any failure (bad JSON or pipeline error) becomes an error payload.
        return {"error": str(e)}
|
|
|
@app.get("/health")
def health():
    """Liveness probe; also reports whether warmup has completed."""
    return {
        "status": "ok",
        "warm": WARMUP_STATUS["complete"],
        "time": time.ctime(),
    }
|
|
|
@app.post("/api/v1/clear_cache")
async def clear_cache():
    """Release GPU models and sweep leftover temp audio files.

    Returns {"status": "success"} or {"status": "error", "message": ...}.
    """
    try:
        release_gpu_models()
        temp_dir = tempfile.gettempdir()
        for name in os.listdir(temp_dir):
            # NOTE(review): "tm" is presumably meant to catch tempfile's
            # default "tmp" prefix — confirm before changing it.
            if name.endswith(".wav") or name.startswith("tm"):
                try:
                    os.unlink(os.path.join(temp_dir, name))
                except OSError:
                    # Narrowed from bare `except:`; skip files owned by
                    # other processes or already removed.
                    pass
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
def gradio_fn(req_json):
    """Gradio text adapter: JSON string in, JSON string out."""
    try:
        request_dict = json.loads(req_json)
        result = core_process(request_dict)
        return json.dumps(result)
    except Exception as e:
        return json.dumps({"error": str(e)})
|
|
|
# Minimal Gradio UI (raw JSON in/out) mounted at the web root; the FastAPI
# routes defined above (/api/v1/*, /health) remain reachable alongside it.
demo = gr.Interface(fn=gradio_fn, inputs="text", outputs="text", title="π AI Engine")
app = gr.mount_gradio_app(app, demo, path="/")
|
|
|
# Entrypoint: serve the combined FastAPI+Gradio app on port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level="error")
|
|
|