""" ClearWave AI — HuggingFace Spaces Gradio UI + FastAPI routes BACKGROUND JOB SYSTEM: - POST /api/process-url → returns {jobId} instantly (no timeout) - GET /api/job/{jobId} → poll for progress / result - Jobs run in background threads — handles 1hr+ audio safely - Job results stored in memory for 1 hour then auto-cleaned - Gradio UI uses same background thread approach """ import os import uuid import json import base64 import tempfile import logging import subprocess import threading import time import requests import numpy as np import gradio as gr logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) from denoiser import Denoiser from transcriber import Transcriber from translator import Translator denoiser = Denoiser() transcriber = Transcriber() translator = Translator() LANGUAGES_DISPLAY = { "Auto Detect": "auto", "English": "en", "Telugu": "te", "Hindi": "hi", "Tamil": "ta", "Kannada": "kn", "Spanish": "es", "French": "fr", "German": "de", "Japanese": "ja", "Chinese": "zh", "Arabic": "ar", } OUT_LANGS = {k: v for k, v in LANGUAGES_DISPLAY.items() if k != "Auto Detect"} # ══════════════════════════════════════════════════════════════════════ # JOB STORE — in-memory job registry # ══════════════════════════════════════════════════════════════════════ _jobs: dict = {} _jobs_lock = threading.Lock() JOB_TTL_SEC = 3600 # keep results for 1 hour def _new_job() -> str: job_id = str(uuid.uuid4()) with _jobs_lock: _jobs[job_id] = { "status": "queued", "step": 0, "message": "Queued...", "result": None, "created_at": time.time(), } return job_id def _update_job(job_id: str, **kwargs): with _jobs_lock: if job_id in _jobs: _jobs[job_id].update(kwargs) def _get_job(job_id: str) -> dict: with _jobs_lock: return dict(_jobs.get(job_id, {})) def _cleanup_loop(): """Remove jobs older than JOB_TTL_SEC — runs every 5 minutes.""" while True: time.sleep(300) now = time.time() with _jobs_lock: expired = [k for k, v in _jobs.items() if now - v.get("created_at", 0) > JOB_TTL_SEC] for k in expired: del _jobs[k] if expired: logger.info(f"[Jobs] Cleaned {len(expired)} expired jobs") threading.Thread(target=_cleanup_loop, daemon=True).start() # ══════════════════════════════════════════════════════════════════════ # AUDIO FORMAT CONVERTER # ══════════════════════════════════════════════════════════════════════ def convert_to_wav(audio_path: str) -> str: if audio_path is None: return audio_path ext = os.path.splitext(audio_path)[1].lower() if ext in [".wav", ".mp3", ".flac", ".ogg", ".aac"]: return audio_path try: converted = audio_path + "_converted.wav" result = subprocess.run([ "ffmpeg", "-y", "-i", audio_path, "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", converted ], capture_output=True) if result.returncode == 0 and os.path.exists(converted): logger.info(f"Converted {ext} → .wav") return converted except Exception as e: logger.warning(f"Conversion error: {e}") return audio_path # ══════════════════════════════════════════════════════════════════════ # CORE PIPELINE # ══════════════════════════════════════════════════════════════════════ def run_pipeline(audio_path, src_lang="auto", tgt_lang="te", opt_fillers=True, opt_stutters=True, opt_silences=True, opt_breaths=True, opt_mouth=True, job_id=None): def progress(step, message): update = {"status": "processing", "step": step, "message": message} if job_id: _update_job(job_id, **update) return update out_dir = tempfile.mkdtemp() try: yield progress(1, "⏳ Step 1/5 — Denoising...") denoise1 = denoiser.process( audio_path, out_dir, remove_fillers=False, remove_stutters=False, remove_silences=opt_silences, remove_breaths=opt_breaths, remove_mouth_sounds=opt_mouth, word_segments=None, ) clean1 = denoise1['audio_path'] stats = denoise1['stats'] yield progress(2, "⏳ Step 2/5 — Transcribing...") transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang) word_segs = transcriber._last_segments if (opt_fillers or opt_stutters) and word_segs: yield progress(3, "⏳ Step 3/5 — Removing fillers & stutters...") import soundfile as sf audio_data, sr = sf.read(clean1) if audio_data.ndim == 2: audio_data = audio_data.mean(axis=1) audio_data = audio_data.astype(np.float32) if opt_fillers: audio_data, n_f = denoiser._remove_fillers(audio_data, sr, word_segs) stats['fillers_removed'] = n_f transcript = denoiser.clean_transcript_fillers(transcript) if opt_stutters: audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs) stats['stutters_removed'] = n_s sf.write(clean1, audio_data, sr, subtype="PCM_24") else: stats['fillers_removed'] = 0 stats['stutters_removed'] = 0 translation = transcript tl_method = "same language" if tgt_lang != "auto" and detected_lang != tgt_lang: yield progress(4, "⏳ Step 4/5 — Translating...") translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang) yield progress(5, "⏳ Step 5/5 — Summarizing...") summary = translator.summarize(transcript) with open(clean1, "rb") as f: enhanced_b64 = base64.b64encode(f.read()).decode("utf-8") result = { "status": "done", "step": 5, "message": "✅ Done!", "transcript": transcript, "translation": translation, "summary": summary, "audioPath": clean1, "enhancedAudio": enhanced_b64, "stats": { "language": detected_lang.upper(), "noise_method": stats.get("noise_method", "noisereduce"), "fillers_removed": stats.get("fillers_removed", 0), "stutters_removed": stats.get("stutters_removed", 0), "silences_removed_sec": stats.get("silences_removed_sec", 0), "breaths_reduced": stats.get("breaths_reduced", False), "mouth_sounds_removed": stats.get("mouth_sounds_removed", 0), "transcription_method": t_method, "translation_method": tl_method, "processing_sec": stats.get("processing_sec", 0), "word_segments": len(word_segs), "transcript_words": len(transcript.split()), }, } if job_id: _update_job(job_id, status="done", step=5, message="✅ Done!", result=result) yield result except Exception as e: logger.error(f"Pipeline failed: {e}", exc_info=True) err = {"status": "error", "message": f"❌ Error: {str(e)}"} if job_id: _update_job(job_id, **err) yield err # ══════════════════════════════════════════════════════════════════════ # BACKGROUND WORKER # ══════════════════════════════════════════════════════════════════════ def _run_job_in_background(job_id, audio_path, src_lang, tgt_lang, opt_fillers, opt_stutters, opt_silences, opt_breaths, opt_mouth): try: for _ in run_pipeline( audio_path, src_lang, tgt_lang, opt_fillers, opt_stutters, opt_silences, opt_breaths, opt_mouth, job_id=job_id ): pass except Exception as e: _update_job(job_id, status="error", message=f"❌ {e}") finally: try: os.unlink(audio_path) except Exception: pass # ══════════════════════════════════════════════════════════════════════ # GRADIO UI # ══════════════════════════════════════════════════════════════════════ def process_audio_gradio(audio_path, in_lang_name, out_lang_name, opt_fillers, opt_stutters, opt_silences, opt_breaths, opt_mouth): if audio_path is None: yield ("❌ Please upload an audio file.", "", "", None, "", "") return if isinstance(audio_path, dict): audio_path = audio_path.get("name") or audio_path.get("path", "") audio_path = convert_to_wav(audio_path) src_lang = LANGUAGES_DISPLAY.get(in_lang_name, "auto") tgt_lang = LANGUAGES_DISPLAY.get(out_lang_name, "te") # Start background job job_id = _new_job() threading.Thread( target=_run_job_in_background, args=(job_id, audio_path, src_lang, tgt_lang, opt_fillers, opt_stutters, opt_silences, opt_breaths, opt_mouth), daemon=True, ).start() # Poll and stream progress to Gradio UI while True: time.sleep(2) job = _get_job(job_id) if not job: yield ("❌ Job not found.", "", "", None, "", "") return status = job.get("status") message = job.get("message", "Processing...") if status in ("queued", "downloading", "processing"): yield (message, "", "", None, "", "") elif status == "done": result = job.get("result", {}) s = result.get("stats", {}) stats_str = "\n".join([ f"🎙️ Language : {s.get('language','?')}", f"🔊 Noise method : {s.get('noise_method','?')}", f"🗑️ Fillers : {s.get('fillers_removed', 0)} removed", f"🔁 Stutters : {s.get('stutters_removed', 0)} removed", f"🤫 Silences : {s.get('silences_removed_sec', 0):.1f}s removed", f"📝 Transcription : {s.get('transcription_method','?')}", f"🌐 Translation : {s.get('translation_method','?')}", f"⏱️ Total time : {s.get('processing_sec', 0):.1f}s", ]) yield (result.get("message", "✅ Done!"), result.get("transcript", ""), result.get("translation", ""), result.get("audioPath"), stats_str, result.get("summary", "")) return elif status == "error": yield (job.get("message", "❌ Error"), "", "", None, "Failed.", "") return with gr.Blocks(title="ClearWave AI") as demo: gr.Markdown("# 🎵 ClearWave AI\n### Professional Audio Enhancement — handles 1hr+ audio!") with gr.Row(): with gr.Column(scale=1): audio_in = gr.File( label="📁 Upload Audio (MP3, WAV, MPEG, MP4, AAC, OGG, FLAC, AMR...)", file_types=[ ".mp3", ".wav", ".mpeg", ".mpg", ".mp4", ".m4a", ".aac", ".ogg", ".flac", ".opus", ".webm", ".amr", ".wma", ".aiff", ".aif", ".midi", ".mid", ], ) with gr.Row(): in_lang = gr.Dropdown(label="Input Language", choices=list(LANGUAGES_DISPLAY.keys()), value="Auto Detect") out_lang = gr.Dropdown(label="Output Language", choices=list(OUT_LANGS.keys()), value="Telugu") gr.Markdown("### 🎛️ Options") with gr.Row(): opt_fillers = gr.Checkbox(label="🗑️ Remove Fillers", value=True) opt_stutters = gr.Checkbox(label="🔁 Remove Stutters", value=True) with gr.Row(): opt_silences = gr.Checkbox(label="🤫 Remove Silences", value=True) opt_breaths = gr.Checkbox(label="💨 Reduce Breaths", value=True) with gr.Row(): opt_mouth = gr.Checkbox(label="👄 Reduce Mouth Sounds", value=True) process_btn = gr.Button("🚀 Process Audio", variant="primary", size="lg") status_out = gr.Textbox(label="Status", interactive=False, lines=2) with gr.Column(scale=2): with gr.Tabs(): with gr.Tab("📝 Text"): with gr.Row(): transcript_out = gr.Textbox(label="Transcript", lines=14) translation_out = gr.Textbox(label="Translation", lines=14) with gr.Tab("🔊 Clean Audio"): clean_audio_out = gr.Audio(label="Enhanced Audio", type="filepath") with gr.Tab("📊 Stats"): stats_out = gr.Textbox(label="Stats", lines=12, interactive=False) with gr.Tab("📋 Summary"): summary_out = gr.Textbox(label="Summary", lines=14) process_btn.click( fn=process_audio_gradio, inputs=[audio_in, in_lang, out_lang, opt_fillers, opt_stutters, opt_silences, opt_breaths, opt_mouth], outputs=[status_out, transcript_out, translation_out, clean_audio_out, stats_out, summary_out] ) # ══════════════════════════════════════════════════════════════════════ # API ROUTES # ══════════════════════════════════════════════════════════════════════ import json as _json from fastapi import Request as _Request from fastapi.responses import JSONResponse as _JSONResponse @demo.app.get("/api/health") async def api_health(): return _JSONResponse({ "status": "ok", "service": "ClearWave AI on HuggingFace", "jobs_active": len(_jobs), }) @demo.app.post("/api/process-url") async def api_process_url(request: _Request): """ Instantly returns a jobId. Client polls GET /api/job/{jobId} for progress and result. No timeout issues — works for 1hr+ audio. """ data = await request.json() if "data" in data and isinstance(data["data"], dict): data = data["data"] audio_url = data.get("audioUrl") audio_id = data.get("audioId", "") src_lang = data.get("srcLang", "auto") tgt_lang = data.get("tgtLang", "te") opt_fillers = data.get("optFillers", True) opt_stutters = data.get("optStutters", True) opt_silences = data.get("optSilences", True) opt_breaths = data.get("optBreaths", True) opt_mouth = data.get("optMouth", True) if not audio_url: return _JSONResponse({"error": "audioUrl is required"}, status_code=400) job_id = _new_job() _update_job(job_id, status="downloading", message="Downloading audio...") def _download_and_run(): tmp_path = None audio_path = None try: # Download resp = requests.get(audio_url, timeout=300, stream=True) resp.raise_for_status() url_lower = audio_url.lower() if "wav" in url_lower: suffix = ".wav" elif "mpeg" in url_lower: suffix = ".mpeg" elif "mp4" in url_lower: suffix = ".mp4" elif "m4a" in url_lower: suffix = ".m4a" else: suffix = ".mp3" tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) tmp_path = tmp.name downloaded = 0 total = int(resp.headers.get("content-length", 0)) for chunk in resp.iter_content(chunk_size=65536): if chunk: tmp.write(chunk) downloaded += len(chunk) if total: pct = int(downloaded * 100 / total) _update_job(job_id, status="downloading", message=f"Downloading... {pct}%") tmp.close() # Convert format audio_path = convert_to_wav(tmp_path) # Run pipeline for _ in run_pipeline( audio_path, src_lang, tgt_lang, opt_fillers, opt_stutters, opt_silences, opt_breaths, opt_mouth, job_id=job_id ): pass # Tag result with audioId with _jobs_lock: if job_id in _jobs and _jobs[job_id].get("result"): _jobs[job_id]["result"]["audioId"] = audio_id except Exception as e: logger.error(f"Job {job_id} failed: {e}", exc_info=True) _update_job(job_id, status="error", message=f"❌ Error: {str(e)}") finally: for p in [tmp_path, audio_path]: try: if p and os.path.exists(p): os.unlink(p) except Exception: pass threading.Thread(target=_download_and_run, daemon=True).start() return _JSONResponse({ "jobId": job_id, "audioId": audio_id, "status": "queued", "pollUrl": f"/api/job/{job_id}", "message": "Job started! Poll pollUrl for progress.", }) @demo.app.get("/api/job/{job_id}") async def api_get_job(job_id: str): """ Poll this to get job progress. When status=done, result contains full transcript/translation/audio. """ job = _get_job(job_id) if not job: return _JSONResponse({"error": "Job not found"}, status_code=404) response = { "jobId": job_id, "status": job.get("status"), "step": job.get("step", 0), "message": job.get("message", ""), } if job.get("status") == "done": response["result"] = job.get("result", {}) return _JSONResponse(response) @demo.app.get("/api/jobs") async def api_list_jobs(): """List all active jobs.""" with _jobs_lock: summary = { k: {"status": v["status"], "step": v.get("step", 0), "message": v.get("message", "")} for k, v in _jobs.items() } return _JSONResponse({"jobs": summary, "total": len(summary)}) logger.info("✅ Routes: /api/health, /api/process-url, /api/job/{id}, /api/jobs") if __name__ == "__main__": demo.launch()