"""
ClearWave AI — API Space (FastAPI only)
Handles /api/health and /api/process-url

Audio enhancement : Cleanvoice API (noise, fillers, stutters, silences, breaths)
Transcription     : Groq Whisper large-v3 (primary) / faster-whisper (fallback)
Translation       : NLLB-200-1.3B (primary) / Google Translate (fallback)
Summary           : Extractive (position-scored)
"""

import os
import sys
import json
import time
import shutil
import tempfile
import logging

import requests
import cloudinary
import cloudinary.uploader
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware

# ── Cloudinary config ─────────────────────────────────────────────────────────
cloudinary.config(
    cloud_name=os.environ.get("CLOUD_NAME"),
    api_key=os.environ.get("API_KEY"),
    api_secret=os.environ.get("API_SECRET"),
)

# ── Cleanvoice config ─────────────────────────────────────────────────────────
CLEANVOICE_API_KEY = os.environ.get("CLEANVOICE_API_KEY")
CLEANVOICE_BASE = "https://api.cleanvoice.ai/v2"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Imported after logging is configured, preserving the original statement order.
from transcriber import Transcriber
from translator import Translator

transcriber = Transcriber()
translator = Translator()

app = FastAPI(title="ClearWave AI API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


# ══════════════════════════════════════════════════════════════════════════════
# CLEANVOICE HELPER
# ══════════════════════════════════════════════════════════════════════════════
def cleanvoice_enhance(audio_path: str, out_dir: str,
                       opt_fillers: bool = True,
                       opt_stutters: bool = True,
                       opt_silences: bool = True,
                       opt_breaths: bool = True,
                       opt_mouth: bool = True) -> dict:
    """
    Full Cleanvoice enhancement pipeline:
      1. Upload audio file   → get signed URL
      2. Submit edit job     → configure which features to enable
      3. Poll until done     → max 36 attempts × 10s = 6 minutes
      4. Download result     → save to out_dir

    Args:
        audio_path:   Path of the local audio file to upload.
        out_dir:      Existing directory the enhanced file is written into.
        opt_fillers:  Sent as the ``remove_filler_words`` config flag.
        opt_stutters: Sent as the ``remove_stutters`` config flag.
        opt_silences: Sent as the ``remove_silence`` config flag.
        opt_breaths:  Sent as the ``remove_breathing`` config flag.
        opt_mouth:    Sent as the ``remove_mouth_sounds`` config flag.

    Returns:
        {"audio_path": str, "stats": dict}. The stats echo which options were
        requested ("yes"/"no" flags, and the raw bool for ``breaths_reduced``);
        they are not counts reported by Cleanvoice.

    Raises:
        RuntimeError: API key missing, a response lacks the expected URL/ID,
            the job reports "error"/"failed", or polling times out.
        requests.HTTPError: any HTTP call returns an error status.
    Both are raised so run_pipeline() can catch and report them.
    """
    if not CLEANVOICE_API_KEY:
        raise RuntimeError("CLEANVOICE_API_KEY is not set in HF Space secrets.")

    headers = {"X-API-Key": CLEANVOICE_API_KEY}

    # ── Step 1: Upload ────────────────────────────────────────────────────────
    logger.info("[Cleanvoice] Uploading audio...")
    with open(audio_path, "rb") as f:
        up_resp = requests.post(
            f"{CLEANVOICE_BASE}/uploads",
            headers=headers,
            files={"file": (os.path.basename(audio_path), f)},
            timeout=120,
        )
    up_resp.raise_for_status()
    up_data = up_resp.json()  # parse the body once and reuse it
    file_url = up_data.get("url") or up_data.get("signedUrl")
    if not file_url:
        raise RuntimeError(f"Cleanvoice upload gave no URL: {up_data}")
    logger.info(f"[Cleanvoice] Upload done → {file_url[:60]}...")

    # ── Step 2: Submit edit job ───────────────────────────────────────────────
    # Cleanvoice config flags — map your pipeline options to Cleanvoice features
    config = {
        "enhance_speech": True,               # always on — core noise removal
        "remove_filler_words": opt_fillers,   # um, uh, like, basically...
        "remove_stutters": opt_stutters,      # word repetitions
        "remove_silence": opt_silences,       # long pauses
        "remove_breathing": opt_breaths,      # breath sounds
        "remove_mouth_sounds": opt_mouth,     # clicks, pops, smacks
    }
    logger.info(f"[Cleanvoice] Submitting edit job with config: {config}")
    edit_resp = requests.post(
        f"{CLEANVOICE_BASE}/edits",
        headers={**headers, "Content-Type": "application/json"},
        json={"input": {"files": [file_url], "config": config}},
        timeout=30,
    )
    edit_resp.raise_for_status()
    edit_data = edit_resp.json()
    edit_id = edit_data.get("id") or edit_data.get("editId")
    if not edit_id:
        raise RuntimeError(f"Cleanvoice edit job gave no ID: {edit_data}")
    logger.info(f"[Cleanvoice] Edit job submitted → id={edit_id}")

    # ── Step 3: Poll until done ───────────────────────────────────────────────
    max_attempts = 36  # 36 × 10s = 6 minutes max
    for attempt in range(1, max_attempts + 1):
        time.sleep(10)
        status_resp = requests.get(
            f"{CLEANVOICE_BASE}/edits/{edit_id}",
            headers=headers,
            timeout=15,
        )
        status_resp.raise_for_status()
        status_data = status_resp.json()
        status = status_data.get("status", "unknown")
        logger.info(f"[Cleanvoice] Poll {attempt}/{max_attempts} → status={status}")

        if status in ("error", "failed"):
            raise RuntimeError(
                f"Cleanvoice job failed: {status_data.get('message', status_data)}"
            )
        if status != "completed":
            continue  # still processing — keep polling

        # Grab the output URL — try common key names
        output = status_data.get("output") or {}
        enhanced_dl = (
            output.get("url")
            or output.get("downloadUrl")
            or status_data.get("downloadUrl")
        )
        if not enhanced_dl:
            raise RuntimeError(
                f"Cleanvoice completed but no download URL: {status_data}"
            )

        # ── Step 4: Download enhanced audio ──────────────────────────────────
        logger.info(f"[Cleanvoice] Downloading result from {enhanced_dl[:60]}...")
        dl = requests.get(enhanced_dl, timeout=120)
        dl.raise_for_status()

        # Preserve original extension if possible, default to .mp3
        ext = os.path.splitext(enhanced_dl.split("?")[0])[-1] or ".mp3"
        out_path = os.path.join(out_dir, f"cleanvoice_enhanced{ext}")
        with open(out_path, "wb") as f:
            f.write(dl.content)
        logger.info(f"[Cleanvoice] ✅ Enhanced audio saved → {out_path}")

        return {
            "audio_path": out_path,
            "stats": {
                "noise_method": "Cleanvoice API",
                "fillers_removed": "yes" if opt_fillers else "no",
                "stutters_removed": "yes" if opt_stutters else "no",
                "silences_removed_sec": "yes" if opt_silences else "no",
                "breaths_reduced": opt_breaths,
                "mouth_sounds_removed": "yes" if opt_mouth else "no",
            },
        }

    raise RuntimeError(
        f"Cleanvoice timed out after {max_attempts * 10}s (edit_id={edit_id})"
    )


# ══════════════════════════════════════════════════════════════════════════════
# PIPELINE
# ══════════════════════════════════════════════════════════════════════════════
def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
                 opt_fillers=True, opt_stutters=True, opt_silences=True,
                 opt_breaths=True, opt_mouth=True):
    """
    Run enhancement → transcription → translation → summary/upload, yielding
    a progress dict before each step and a final result dict.

    This is a generator. Every yielded dict has "status" ("processing",
    "done" or "error") and "message"; "processing"/"done" dicts also carry
    "step". The "done" dict additionally holds "transcript", "translation",
    "summary", "enhancedAudio" (Cloudinary URL, or None if the upload failed)
    and "stats".

    Failure handling:
      * Cleanvoice failure is logged and the original audio is used instead.
      * Cloudinary failure is logged and "enhancedAudio" is None.
      * Any other exception is logged and reported as one "error" dict.

    The scratch directory created here is always removed when the generator
    finishes or is closed early. The caller keeps ownership of audio_path.
    """
    started = time.monotonic()
    out_dir = tempfile.mkdtemp()
    stats = {}
    word_segs = []

    try:
        # ── Step 1: Cleanvoice — full audio enhancement ───────────────────────
        yield {"status": "processing", "step": 1,
               "message": "Step 1/4 — Enhancing audio with Cleanvoice..."}
        try:
            result = cleanvoice_enhance(
                audio_path, out_dir,
                opt_fillers=opt_fillers,
                opt_stutters=opt_stutters,
                opt_silences=opt_silences,
                opt_breaths=opt_breaths,
                opt_mouth=opt_mouth,
            )
            clean1 = result["audio_path"]
            stats = result["stats"]
            logger.info("[Pipeline] Cleanvoice enhancement complete")
        except Exception as e:
            # Cleanvoice failed — log it and continue with original audio
            logger.error(f"[Pipeline] Cleanvoice failed: {e} — using original audio")
            clean1 = audio_path
            stats = {
                "noise_method": f"Cleanvoice failed: {e}",
                "fillers_removed": 0,
                "stutters_removed": 0,
                "silences_removed_sec": 0,
                "breaths_reduced": False,
                "mouth_sounds_removed": 0,
            }

        # ── Step 2: Transcribe ────────────────────────────────────────────────
        yield {"status": "processing", "step": 2,
               "message": "Step 2/4 — Transcribing..."}
        transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang)
        word_segs = transcriber._last_segments
        logger.info(
            f"[Pipeline] Transcription done: {len(transcript.split())} words, "
            f"lang={detected_lang}"
        )

        # ── Step 3: Translate ─────────────────────────────────────────────────
        translation = transcript
        tl_method = "same language"
        if tgt_lang != "auto" and detected_lang != tgt_lang:
            yield {"status": "processing", "step": 3,
                   "message": "Step 3/4 — Translating..."}
            translation, tl_method = translator.translate(
                transcript, detected_lang, tgt_lang
            )
            logger.info(f"[Pipeline] Translation done via {tl_method}")
        else:
            yield {"status": "processing", "step": 3,
                   "message": "Step 3/4 — Skipping translation (same language)..."}

        # ── Step 4: Summarize + upload to Cloudinary ──────────────────────────
        yield {"status": "processing", "step": 4,
               "message": "Step 4/4 — Summarizing & uploading..."}
        summary = translator.summarize(transcript)
        try:
            upload_result = cloudinary.uploader.upload(
                clean1,
                resource_type="video",  # Cloudinary uses "video" for audio files
                folder="clearwave_enhanced",
            )
            enhanced_url = upload_result["secure_url"]
            logger.info(f"[Pipeline] Cloudinary upload done: {enhanced_url}")
        except Exception as e:
            logger.error(f"[Pipeline] Cloudinary upload failed: {e}")
            enhanced_url = None

        # ── Done ──────────────────────────────────────────────────────────────
        yield {
            "status": "done",
            "step": 4,
            "message": "Done!",
            "transcript": transcript,
            "translation": translation,
            "summary": summary,
            "enhancedAudio": enhanced_url,
            "stats": {
                "language": detected_lang.upper(),
                "noise_method": stats.get("noise_method", "Cleanvoice API"),
                "fillers_removed": stats.get("fillers_removed", 0),
                "stutters_removed": stats.get("stutters_removed", 0),
                "silences_removed_sec": stats.get("silences_removed_sec", 0),
                "breaths_reduced": stats.get("breaths_reduced", False),
                "mouth_sounds_removed": stats.get("mouth_sounds_removed", 0),
                "transcription_method": t_method,
                "translation_method": tl_method,
                # Wall-clock seconds since the pipeline started (was always 0).
                "processing_sec": round(time.monotonic() - started, 2),
                "word_segments": len(word_segs),
                "transcript_words": len(transcript.split()),
            },
        }
    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)
        yield {"status": "error", "message": f"Error: {str(e)}"}
    finally:
        # Also runs when the consumer closes the generator early (e.g. client
        # disconnect), so the scratch directory never outlives the request.
        shutil.rmtree(out_dir, ignore_errors=True)


# ══════════════════════════════════════════════════════════════════════════════
# ROUTES
# ══════════════════════════════════════════════════════════════════════════════
@app.get("/api/health")
async def health():
    """Liveness probe: always returns a static OK payload."""
    return JSONResponse({"status": "ok", "service": "ClearWave AI API"})


@app.post("/api/process-url")
async def process_url(request: Request):
    """
    Download the audio at ``audioUrl`` and stream pipeline progress as
    Server-Sent Events.

    JSON body keys: ``audioUrl`` (required), ``audioId``, ``srcLang``,
    ``tgtLang``, ``optFillers``, ``optStutters``, ``optSilences``,
    ``optBreaths``, ``optMouth``.

    Returns a 400 JSON error when the body is not a JSON object or
    ``audioUrl`` is missing. Otherwise returns a ``text/event-stream``
    response. Each event is ``data: <json>`` and pipeline events have
    ``audioId`` attached.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError (and bad encodings) are ValueError subclasses.
        return JSONResponse({"error": "Request body must be valid JSON"},
                            status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Request body must be a JSON object"},
                            status_code=400)

    audio_url = data.get("audioUrl")
    audio_id = data.get("audioId", "")
    src_lang = data.get("srcLang", "auto")
    tgt_lang = data.get("tgtLang", "te")
    opt_fillers = data.get("optFillers", True)
    opt_stutters = data.get("optStutters", True)
    opt_silences = data.get("optSilences", True)
    opt_breaths = data.get("optBreaths", True)
    opt_mouth = data.get("optMouth", True)

    if not audio_url:
        return JSONResponse({"error": "audioUrl is required"}, status_code=400)

    # NOTE(review): audio_url comes straight from the request body and is
    # fetched server-side without any host restriction — consider an
    # allow-list if this endpoint is reachable by untrusted clients.

    def sse(obj):
        """Format one dict as a Server-Sent Events data frame."""
        sys.stdout.flush()
        return "data: " + json.dumps(obj) + "\n\n"

    # Deliberately a plain (sync) generator: everything below is blocking
    # (requests, time.sleep polling). StreamingResponse iterates sync
    # generators in a threadpool, so the event loop stays responsive.
    def generate():
        yield sse({"status": "processing", "step": 0,
                   "message": "Downloading audio..."})
        tmp_path = None
        try:
            try:
                with requests.get(audio_url, timeout=60, stream=True) as resp:
                    resp.raise_for_status()
                    suffix = ".wav" if "wav" in audio_url.lower() else ".mp3"
                    with tempfile.NamedTemporaryFile(delete=False,
                                                     suffix=suffix) as tmp:
                        tmp_path = tmp.name
                        downloaded = 0
                        total = int(resp.headers.get("content-length", 0))
                        last_pct = -1
                        for chunk in resp.iter_content(chunk_size=65536):
                            if not chunk:
                                continue
                            tmp.write(chunk)
                            downloaded += len(chunk)
                            if total:
                                pct = int(downloaded * 100 / total)
                                # Emit only on change, not once per chunk.
                                if pct != last_pct:
                                    last_pct = pct
                                    yield sse({"status": "processing", "step": 0,
                                               "message": "Downloading... " + str(pct) + "%"})
            except Exception as e:
                yield sse({"status": "error",
                           "message": "Download failed: " + str(e)})
                return

            for result in run_pipeline(tmp_path, src_lang, tgt_lang,
                                       opt_fillers, opt_stutters, opt_silences,
                                       opt_breaths, opt_mouth):
                result["audioId"] = audio_id
                yield sse(result)
        finally:
            # Best-effort cleanup on success, failure and early disconnect.
            if tmp_path:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )