Spaces:
Sleeping
Sleeping
| import os | |
| import subprocess | |
| import tempfile | |
| import uuid | |
| from typing import Optional, List | |
| import requests | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
# One FastAPI application hosts every route defined in this file.
app = FastAPI(title="Piper + FFmpeg (Single App)")

# Voice models are cached on local disk so repeated requests do not trigger
# repeat downloads (the original note says HF storage is ephemeral).
# The cache location can be overridden through the MODEL_DIR environment variable.
MODEL_DIR = os.environ.get("MODEL_DIR", "/tmp/piper_models")
os.makedirs(MODEL_DIR, exist_ok=True)
| # ----------------------------- | |
| # English-only voice catalog | |
| # ----------------------------- | |
# Every catalog entry follows one upstream layout:
#   <base>/<language>/<locale>/<speaker>/medium/<locale>-<speaker>-medium.onnx[.json]
# so the table is generated from (locale, speaker) pairs instead of being
# spelled out URL by URL.
_PIPER_VOICES_BASE = "https://huggingface.co/rhasspy/piper-voices/resolve/main"

# Order matters only for readers iterating the dict; it mirrors the catalog order.
_CATALOG_SPEAKERS = (
    ("en_US", "amy"),
    ("en_US", "ryan"),
    ("en_GB", "alan"),
    ("en_GB", "sarah"),
    ("en_AU", "nat"),
    ("en_US", "joe"),
    ("en_US", "kathleen"),
    ("en_US", "danny"),
    ("en_GB", "jenny"),
    ("en_US", "lessac"),
)


def _voice_urls(locale: str, speaker: str) -> dict:
    """Return the ``{"onnx": ..., "json": ...}`` URL pair for one medium-quality voice."""
    language = locale.split("_", 1)[0]
    stem = f"{locale}-{speaker}-medium"
    onnx_url = f"{_PIPER_VOICES_BASE}/{language}/{locale}/{speaker}/medium/{stem}.onnx"
    return {"onnx": onnx_url, "json": f"{onnx_url}.json"}


# Maps a public voice id ("<locale>-<speaker>") to its model and config URLs.
# NOTE(review): these URLs are taken as given and have not been checked against
# the upstream repository -- confirm each voice is published at "medium" quality.
VOICE_CATALOG = {
    f"{locale}-{speaker}": _voice_urls(locale, speaker)
    for locale, speaker in _CATALOG_SPEAKERS
}
def _download(url: str, path: str, timeout: int = 180):
    """Download ``url`` to ``path`` without ever exposing a partial file.

    The body is streamed in chunks to a uniquely named sibling file and then
    moved into place with ``os.replace``. Callers that use "does the file
    exist?" as a cache check therefore never see a truncated download, and a
    large model is not held in memory in one piece.

    Args:
        url: HTTP(S) location to fetch.
        path: Destination file path; replaced atomically on success.
        timeout: Timeout in seconds handed to ``requests.get``.

    Raises:
        requests.RequestException: On connection errors or a non-2xx status.
        OSError: If the temporary file cannot be written or moved.
    """
    # Unique suffix so two concurrent downloads of the same target do not
    # write into each other's temporary file.
    tmp_path = f"{path}.{uuid.uuid4().hex}.part"
    try:
        with requests.get(url, timeout=timeout, stream=True) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp name no longer exists; on any
        # failure this removes the leftover partial file.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def ensure_voice_model(voice_id: str) -> str:
    """Make sure the model for ``voice_id`` is cached locally and return its path.

    The ``.onnx`` model is downloaded into ``MODEL_DIR`` when it is not already
    present. The companion ``.onnx.json`` config is fetched on a best-effort
    basis: a failure there is logged and does not abort the request.

    Args:
        voice_id: A key of ``VOICE_CATALOG``.

    Returns:
        Filesystem path of the cached ``.onnx`` model.

    Raises:
        HTTPException: 404 when ``voice_id`` is not in the catalog.
    """
    import logging

    entry = VOICE_CATALOG.get(voice_id)
    if entry is None:
        raise HTTPException(404, f"Unknown voiceId: {voice_id}")

    # voice_id has been validated against the catalog, so it is safe to use
    # as a file-name component here.
    onnx_path = os.path.join(MODEL_DIR, f"{voice_id}.onnx")
    json_path = os.path.join(MODEL_DIR, f"{voice_id}.onnx.json")

    if not os.path.exists(onnx_path):
        _download(entry["onnx"], onnx_path)

    if "json" in entry and not os.path.exists(json_path):
        try:
            _download(entry["json"], json_path)
        except (requests.RequestException, OSError) as exc:
            # Still best-effort, but leave a trace instead of failing silently.
            logging.getLogger(__name__).warning(
                "could not download config for voice %s: %s", voice_id, exc
            )

    return onnx_path
def _cleanup_files(paths: List[str]):
    """Delete each existing file in ``paths``, ignoring every failure.

    Empty or ``None`` entries and paths that do not exist are skipped. Any
    exception raised while checking or removing a path is swallowed, so this
    is safe to run as a fire-and-forget cleanup step.
    """
    for candidate in paths:
        try:
            if not candidate or not os.path.exists(candidate):
                continue
            os.remove(candidate)
        except Exception:
            # Best-effort cleanup: one bad path must not stop the rest.
            continue
| # ----------------------------- | |
| # Routes | |
| # ----------------------------- | |
# NOTE(review): no route decorator is visible in this view of the file;
# presumably this handler is registered for GET "/" -- confirm.
def root():
    """Return a small service descriptor that lists the available endpoints."""
    endpoints = ["/docs", "/health", "/voices", "/tts", "/render"]
    return dict(ok=True, service="piper_ffmpeg", endpoints=endpoints)
def health():
    """Report whether the external ``ffmpeg`` and ``piper`` executables are on PATH.

    The lookup uses ``shutil.which`` against this process's own PATH, which is
    the same search path ``subprocess.run(["ffmpeg", ...])`` uses. No shell is
    spawned, so the check does not depend on ``bash`` being installed.

    Returns:
        ``{"ok": True, "ffmpeg": <bool>, "piper": <bool>}``.
    """
    import shutil

    def has(cmd):
        return shutil.which(cmd) is not None

    return {"ok": True, "ffmpeg": has("ffmpeg"), "piper": has("piper")}
def voices():
    """Return the catalog's voice ids in sorted order as ``{"voices": [...]}``."""
    # Iterating a dict yields its keys, so sorted() needs no list()/keys() wrapper.
    return {"voices": sorted(VOICE_CATALOG)}
| # ----------------------------- | |
| # TTS (Piper -> mp3/wav -> returns file) | |
| # ----------------------------- | |
class TTSReq(BaseModel):
    # Request body for the text-to-speech handler.

    # Caller-chosen job identifier; used in temp-file names and the download filename.
    jobId: str

    # Voice to synthesize with; must be a key of VOICE_CATALOG.
    voiceId: str

    # Text to speak; the handler rejects blank or whitespace-only text.
    text: str

    # Output format: "mp3" or "wav".
    format: str = "mp3"

    # Synthesis tuning values.
    # NOTE(review): presumably these correspond to Piper's length/noise
    # options -- confirm how the handler consumes them.
    length_scale: float = 0.95
    noise_scale: float = 0.75
    noise_w: float = 0.80

    # Effects toggle.
    # NOTE(review): no consumer of this flag is visible in this block -- confirm.
    fx: bool = True
# NOTE(review): no route decorator is visible in this view of the file;
# presumably this handler is registered for POST "/tts" -- confirm.
def tts(req: TTSReq, background_tasks: BackgroundTasks):
    """Synthesize ``req.text`` with Piper and return the audio as a file response.

    Piper always writes a WAV file; for ``format == "mp3"`` the WAV is then
    converted with FFmpeg. The temporary files live directly under ``/tmp``
    (not in an auto-deleting ``TemporaryDirectory``) and are removed by a
    background task after the response has been sent.

    Args:
        req: Request payload. ``length_scale``, ``noise_scale`` and ``noise_w``
            are forwarded to the Piper command line.
        background_tasks: FastAPI background task queue used for cleanup.

    Returns:
        A ``FileResponse`` streaming the generated WAV or MP3 file.

    Raises:
        HTTPException: 400 for blank text or an unsupported format, 404 for an
            unknown voice (raised by ``ensure_voice_model``), 500 when Piper
            or FFmpeg fails.
    """
    if not req.text.strip():
        raise HTTPException(400, "text required")

    # Validate the format up front so a bad request does not pay for a model
    # download and a full synthesis run before being rejected.
    fmt = req.format.lower().strip()
    if fmt not in ("mp3", "wav"):
        raise HTTPException(400, "format must be mp3 or wav")

    model_path = ensure_voice_model(req.voiceId)

    # jobId is caller-controlled: keep only characters that are safe inside a
    # single path component (no separators, no "..") and cap the length.
    safe_job = "".join(
        ch if ch.isalnum() or ch in "-_" else "_" for ch in req.jobId
    )[:64]
    uid = uuid.uuid4().hex
    wav_path = f"/tmp/tts_{safe_job}_{uid}.wav"
    mp3_path = f"/tmp/tts_{safe_job}_{uid}.mp3"

    # 1) Piper reads the text on stdin and writes a WAV file.
    # NOTE(review): req.fx is not applied anywhere in this handler -- confirm
    # whether an effects chain is still expected.
    synth = subprocess.run(
        [
            "piper",
            "--model", model_path,
            "--output_file", wav_path,
            "--length_scale", str(req.length_scale),
            "--noise_scale", str(req.noise_scale),
            "--noise_w", str(req.noise_w),
        ],
        input=req.text,
        text=True,
        capture_output=True,
    )
    if synth.returncode != 0 or not os.path.exists(wav_path):
        _cleanup_files([wav_path, mp3_path])
        raise HTTPException(500, f"piper failed: {synth.stderr[-1500:]}")

    # 2) Optionally convert to MP3.
    if fmt == "wav":
        out_path = wav_path
        media_type = "audio/wav"
        cleanup = [wav_path]
    else:
        conv = subprocess.run(
            ["ffmpeg", "-y", "-i", wav_path, "-codec:a", "libmp3lame", "-q:a", "3", mp3_path],
            capture_output=True,
            text=True,
        )
        if conv.returncode != 0 or not os.path.exists(mp3_path):
            _cleanup_files([wav_path, mp3_path])
            raise HTTPException(500, f"ffmpeg mp3 convert failed: {conv.stderr[-1500:]}")
        out_path = mp3_path
        media_type = "audio/mpeg"
        cleanup = [wav_path, mp3_path]

    # Delete the temp files only after the response body has been sent.
    background_tasks.add_task(_cleanup_files, cleanup)
    return FileResponse(
        path=out_path,
        media_type=media_type,
        filename=f"{req.jobId}_{req.voiceId}.{fmt}",
    )
| # ----------------------------- | |
| # Render (FFmpeg “wonders” -> returns mp4) | |
| # ----------------------------- | |
class RenderReq(BaseModel):
    # Request body for the video render handler.

    # Caller-chosen job identifier; used in the work-directory name and the
    # download filename.
    jobId: str

    # Look to apply: shorts_hypercut | story_cinematic | facts_clean.
    preset: str = "shorts_hypercut"

    # URLs of the source clips, concatenated in the order given.
    clipUrls: List[str]

    # Optional URL of a voice track to use as the output audio.
    voiceUrl: Optional[str] = None

    # Optional SRT caption text to burn into the video.
    srt: Optional[str] = None
# NOTE(review): no route decorator is visible in this view of the file;
# presumably this handler is registered for POST "/render" -- confirm.
def render(req: RenderReq, background_tasks: BackgroundTasks):
    """Download clips, concatenate and restyle them with FFmpeg, and return an MP4.

    Steps: fetch every clip into a private work directory, concatenate them
    (stream copy first, re-encode as a fallback), optionally write captions,
    apply the preset's filter chain, optionally replace the audio with a
    loudness-normalized voice track, and return the result. The work directory
    is removed after the response is sent, or immediately on failure.

    Args:
        req: Render request (clip URLs, preset, optional voice URL and SRT text).
        background_tasks: FastAPI background task queue used for cleanup.

    Returns:
        A ``FileResponse`` streaming the rendered MP4.

    Raises:
        HTTPException: 400 when ``clipUrls`` is empty; 500 when a download or
            an FFmpeg step fails.
    """
    import shutil

    if not req.clipUrls:
        raise HTTPException(400, "clipUrls required")

    # jobId is caller-controlled. It must never reach a shell or be used as a
    # raw path component, so keep only safe characters and let mkdtemp create
    # a unique private directory.
    safe_job = "".join(
        ch if ch.isalnum() or ch in "-_" else "_" for ch in req.jobId
    )[:64]
    work_dir = tempfile.mkdtemp(prefix=f"render_{safe_job}_")

    clips_dir = os.path.join(work_dir, "clips")
    concat_txt = os.path.join(work_dir, "concat.txt")
    merged = os.path.join(work_dir, "merged.mp4")
    # Internal name only; the client-facing filename is set on the response.
    final_mp4 = os.path.join(work_dir, "final.mp4")

    try:
        os.makedirs(clips_dir, exist_ok=True)

        # Download clips and build the concat demuxer list.
        # NOTE(review): clipUrls/voiceUrl are fetched server-side with no host
        # restriction -- confirm this service is not reachable by untrusted callers.
        with open(concat_txt, "w", encoding="utf-8") as listing:
            for i, url in enumerate(req.clipUrls):
                clip_path = os.path.join(clips_dir, f"clip_{i}.mp4")
                _download(url, clip_path, timeout=120)
                listing.write(f"file '{clip_path}'\n")

        # Concatenate: try a fast stream copy, fall back to a re-encode when
        # the clips do not share compatible codecs/parameters.
        r1 = subprocess.run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_txt, "-c", "copy", merged],
            capture_output=True, text=True,
        )
        if r1.returncode != 0:
            r2 = subprocess.run(
                ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_txt,
                 "-c:v", "libx264", "-crf", "19", "-preset", "veryfast",
                 "-c:a", "aac", "-b:a", "128k", merged],
                capture_output=True, text=True,
            )
            if r2.returncode != 0:
                raise HTTPException(500, f"concat failed: {r2.stderr[-1500:]}")

        # Caption file (optional).
        srt_path = None
        if req.srt:
            srt_path = os.path.join(work_dir, "captions.srt")
            with open(srt_path, "w", encoding="utf-8") as w:
                w.write(req.srt)

        # Filter stack: 1080x1920 crop plus a mild grade, then preset extras.
        vf = "scale=1080:1920:force_original_aspect_ratio=increase,crop=1080:1920,setsar=1,eq=contrast=1.08:saturation=1.12,unsharp=5:5:0.7:5:5:0.0,fps=30"
        if req.preset == "story_cinematic":
            vf += ",vignette=PI/6,eq=contrast=1.10:saturation=1.05"
        elif req.preset == "facts_clean":
            vf += ",eq=contrast=1.06:saturation=1.06"
        if srt_path:
            # srt_path is built only from the sanitized work directory, so it
            # contains none of the characters the filter syntax treats specially.
            vf += f",subtitles={srt_path}:force_style='FontName=Arial,FontSize=20,Outline=3,Shadow=1,Alignment=2'"

        if req.voiceUrl:
            voice_path = os.path.join(work_dir, "voice.mp3")
            _download(req.voiceUrl, voice_path, timeout=120)
            cmd = [
                "ffmpeg", "-y", "-i", merged, "-i", voice_path,
                "-filter_complex", f"[0:v]{vf}[v];[1:a]loudnorm=I=-16:TP=-1.5:LRA=11[a]",
                "-map", "[v]", "-map", "[a]",
                "-shortest", "-movflags", "+faststart", "-pix_fmt", "yuv420p",
                final_mp4,
            ]
        else:
            cmd = ["ffmpeg", "-y", "-i", merged, "-vf", vf, "-movflags", "+faststart", "-pix_fmt", "yuv420p", final_mp4]

        rr = subprocess.run(cmd, capture_output=True, text=True)
        if rr.returncode != 0 or not os.path.exists(final_mp4):
            raise HTTPException(500, f"render failed: {rr.stderr[-1500:]}")

        # Remove the work directory only after the response has been sent.
        background_tasks.add_task(shutil.rmtree, work_dir, ignore_errors=True)
        return FileResponse(path=final_mp4, media_type="video/mp4", filename=f"{req.jobId}_{req.preset}.mp4")
    except HTTPException:
        # Nothing will be served, so clean up right away.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise
    except Exception as e:
        shutil.rmtree(work_dir, ignore_errors=True)
        raise HTTPException(500, str(e)) from e