Spaces:
Running
Running
| """ | |
| ClearWave AI — HuggingFace Spaces | |
| Gradio UI + FastAPI routes | |
| BACKGROUND JOB SYSTEM: | |
| - POST /api/process-url → returns {jobId} instantly (no timeout) | |
| - GET /api/job/{jobId} → poll for progress / result | |
| - Jobs run in background threads — handles 1hr+ audio safely | |
| - Job results stored in memory for 1 hour then auto-cleaned | |
| - Gradio UI uses same background thread approach | |
| """ | |
| import os | |
| import uuid | |
| import json | |
| import base64 | |
| import tempfile | |
| import logging | |
| import subprocess | |
| import threading | |
| import time | |
| import requests | |
| import numpy as np | |
| import gradio as gr | |
# Root logging config + module-level logger (Spaces surfaces stdout/stderr).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Project-local pipeline stages (defined elsewhere in this repo).
from denoiser import Denoiser
from transcriber import Transcriber
from translator import Translator

# Pipeline stages are instantiated once at import time and shared by every
# request/background thread — presumably these wrappers are thread-safe and
# load model weights up front; TODO confirm against their implementations.
denoiser = Denoiser()
transcriber = Transcriber()
translator = Translator()
# Display-name → ISO 639-1 code mapping, used both for the Gradio dropdowns
# and for the src/tgt language arguments passed into the pipeline.
LANGUAGES_DISPLAY = {
    "Auto Detect": "auto",
    "English": "en", "Telugu": "te",
    "Hindi": "hi", "Tamil": "ta",
    "Kannada": "kn", "Spanish": "es",
    "French": "fr", "German": "de",
    "Japanese": "ja", "Chinese": "zh",
    "Arabic": "ar",
}
# Output-language choices: same mapping minus "Auto Detect" (a concrete
# target language is always required on the output side).
OUT_LANGS = {k: v for k, v in LANGUAGES_DISPLAY.items() if k != "Auto Detect"}
| # ══════════════════════════════════════════════════════════════════════ | |
| # JOB STORE — in-memory job registry | |
| # ══════════════════════════════════════════════════════════════════════ | |
# In-memory job registry: job_id → {"status", "step", "message", "result",
# "created_at"}.  All reads/writes go through _jobs_lock; entries are
# expired by the background cleanup loop after JOB_TTL_SEC.
_jobs: dict = {}
_jobs_lock = threading.Lock()
JOB_TTL_SEC = 3600  # keep results for 1 hour
def _new_job() -> str:
    """Register a fresh job in the queued state and return its id."""
    new_id = str(uuid.uuid4())
    record = {
        "status": "queued",
        "step": 0,
        "message": "Queued...",
        "result": None,
        "created_at": time.time(),
    }
    with _jobs_lock:
        _jobs[new_id] = record
    return new_id
def _update_job(job_id: str, **kwargs):
    """Merge *kwargs* into an existing job record; no-op for unknown ids."""
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is not None:
            record.update(kwargs)
def _get_job(job_id: str) -> dict:
    """Return a snapshot of a job record ({} when the id is unknown).

    NOTE(review): the copy is shallow — the nested "result" dict is still
    shared with the worker thread; callers only read it, so this is OK today.
    """
    with _jobs_lock:
        snapshot = _jobs.get(job_id, {})
        return dict(snapshot)
def _cleanup_loop():
    """Expire jobs older than JOB_TTL_SEC — wakes up every 5 minutes."""
    while True:
        time.sleep(300)
        cutoff = time.time() - JOB_TTL_SEC
        with _jobs_lock:
            stale = [jid for jid, rec in _jobs.items()
                     if rec.get("created_at", 0) < cutoff]
            for jid in stale:
                del _jobs[jid]
        if stale:
            logger.info(f"[Jobs] Cleaned {len(stale)} expired jobs")

# Daemon thread: dies with the process, never blocks shutdown.
threading.Thread(target=_cleanup_loop, daemon=True).start()
| # ══════════════════════════════════════════════════════════════════════ | |
| # AUDIO FORMAT CONVERTER | |
| # ══════════════════════════════════════════════════════════════════════ | |
def convert_to_wav(audio_path: str) -> str:
    """Ensure *audio_path* is in a format the downstream pipeline can read.

    Formats readable natively (.wav/.mp3/.flac/.ogg/.aac) are returned
    unchanged.  Anything else is transcoded with ffmpeg to 16 kHz mono
    16-bit PCM wav.  On any failure the original path is returned so the
    pipeline can still attempt to read it (best-effort behavior).

    Args:
        audio_path: path to the uploaded/downloaded audio, or None.
    Returns:
        Path to a readable audio file (possibly a new "*_converted.wav"),
        or the input unchanged (including None passthrough).
    """
    if audio_path is None:
        return audio_path
    ext = os.path.splitext(audio_path)[1].lower()
    if ext in (".wav", ".mp3", ".flac", ".ogg", ".aac"):
        return audio_path
    try:
        converted = audio_path + "_converted.wav"
        result = subprocess.run([
            "ffmpeg", "-y", "-i", audio_path,
            "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", converted
        ], capture_output=True)
        if result.returncode == 0 and os.path.exists(converted):
            logger.info(f"Converted {ext} → .wav")
            return converted
        # FIX: previously ffmpeg's captured stderr was silently discarded,
        # making conversion failures undiagnosable.  Log the tail of it.
        logger.warning("ffmpeg failed (rc=%s): %s", result.returncode,
                       result.stderr.decode("utf-8", errors="replace")[-500:])
    except Exception as e:
        # e.g. ffmpeg binary missing — fall through to the original path.
        logger.warning(f"Conversion error: {e}")
    return audio_path
| # ══════════════════════════════════════════════════════════════════════ | |
| # CORE PIPELINE | |
| # ══════════════════════════════════════════════════════════════════════ | |
def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
                 opt_fillers=True, opt_stutters=True, opt_silences=True,
                 opt_breaths=True, opt_mouth=True, job_id=None):
    """Full enhance → transcribe → translate → summarize pipeline (generator).

    Yields progress dicts ({status, step, message}) as each stage starts and
    finally either a complete result dict (status="done") or an error dict
    (status="error").  When *job_id* is given, every update is mirrored into
    the shared job store so HTTP pollers see the same progress.

    Args:
        audio_path: path to an audio file readable by the denoiser.
        src_lang: ISO 639-1 code or "auto" for detection.
        tgt_lang: translation target ISO code.
        opt_fillers/opt_stutters/opt_silences/opt_breaths/opt_mouth:
            per-feature cleanup toggles.
        job_id: optional job-store id to mirror progress into.
    """
    def progress(step, message):
        # Build one progress payload and mirror it to the job store.
        update = {"status": "processing", "step": step, "message": message}
        if job_id:
            _update_job(job_id, **update)
        return update
    # NOTE(review): this temp dir is never removed — the done-result keeps
    # pointing at the cleaned file inside it, so eager cleanup would break
    # the audioPath download; relies on OS tempdir hygiene.
    out_dir = tempfile.mkdtemp()
    try:
        yield progress(1, "⏳ Step 1/5 — Denoising...")
        # First pass: noise/silence/breath/mouth cleanup only.  Filler and
        # stutter removal needs word timestamps, so it waits until after
        # transcription (step 3).
        denoise1 = denoiser.process(
            audio_path, out_dir,
            remove_fillers=False, remove_stutters=False,
            remove_silences=opt_silences, remove_breaths=opt_breaths,
            remove_mouth_sounds=opt_mouth, word_segments=None,
        )
        clean1 = denoise1['audio_path']
        stats = denoise1['stats']
        yield progress(2, "⏳ Step 2/5 — Transcribing...")
        transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang)
        # Word-level timestamps from the last transcription run; may be
        # None/empty when the backend can't produce them (guarded below).
        word_segs = transcriber._last_segments
        if (opt_fillers or opt_stutters) and word_segs:
            yield progress(3, "⏳ Step 3/5 — Removing fillers & stutters...")
            import soundfile as sf
            audio_data, sr = sf.read(clean1)
            if audio_data.ndim == 2:
                # Downmix stereo to mono before word-aligned editing.
                audio_data = audio_data.mean(axis=1)
            audio_data = audio_data.astype(np.float32)
            if opt_fillers:
                audio_data, n_f = denoiser._remove_fillers(audio_data, sr, word_segs)
                stats['fillers_removed'] = n_f
                transcript = denoiser.clean_transcript_fillers(transcript)
            if opt_stutters:
                audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs)
                stats['stutters_removed'] = n_s
            # Overwrite the cleaned file in place with the edited audio.
            sf.write(clean1, audio_data, sr, subtype="PCM_24")
        else:
            stats['fillers_removed'] = 0
            stats['stutters_removed'] = 0
        translation = transcript
        tl_method = "same language"
        # Only translate when a concrete target differs from the detected
        # source; otherwise the transcript passes through untranslated.
        if tgt_lang != "auto" and detected_lang != tgt_lang:
            yield progress(4, "⏳ Step 4/5 — Translating...")
            translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)
        yield progress(5, "⏳ Step 5/5 — Summarizing...")
        summary = translator.summarize(transcript)
        # NOTE(review): base64 of the whole cleaned file is held in the job
        # store for up to 1 hour — memory-heavy for 1hr+ audio; confirm
        # callers actually need the inline copy in addition to audioPath.
        with open(clean1, "rb") as f:
            enhanced_b64 = base64.b64encode(f.read()).decode("utf-8")
        result = {
            "status": "done",
            "step": 5,
            "message": "✅ Done!",
            "transcript": transcript,
            "translation": translation,
            "summary": summary,
            "audioPath": clean1,
            "enhancedAudio": enhanced_b64,
            "stats": {
                "language": detected_lang.upper(),
                "noise_method": stats.get("noise_method", "noisereduce"),
                "fillers_removed": stats.get("fillers_removed", 0),
                "stutters_removed": stats.get("stutters_removed", 0),
                "silences_removed_sec": stats.get("silences_removed_sec", 0),
                "breaths_reduced": stats.get("breaths_reduced", False),
                "mouth_sounds_removed": stats.get("mouth_sounds_removed", 0),
                "transcription_method": t_method,
                "translation_method": tl_method,
                "processing_sec": stats.get("processing_sec", 0),
                # FIX: word_segs may be None (the step-3 guard above treats
                # it as optional) — len(None) raised TypeError here.
                "word_segments": len(word_segs or []),
                "transcript_words": len(transcript.split()),
            },
        }
        if job_id:
            _update_job(job_id, status="done", step=5,
                        message="✅ Done!", result=result)
        yield result
    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)
        err = {"status": "error", "message": f"❌ Error: {str(e)}"}
        if job_id:
            _update_job(job_id, **err)
        yield err
| # ══════════════════════════════════════════════════════════════════════ | |
| # BACKGROUND WORKER | |
| # ══════════════════════════════════════════════════════════════════════ | |
def _run_job_in_background(job_id, audio_path, src_lang, tgt_lang,
                           opt_fillers, opt_stutters, opt_silences,
                           opt_breaths, opt_mouth):
    """Drain run_pipeline in a worker thread.

    Progress and the final result land in the job store via job_id; the
    uploaded/converted audio file is deleted afterwards (best-effort).
    """
    try:
        pipeline = run_pipeline(
            audio_path, src_lang, tgt_lang,
            opt_fillers, opt_stutters, opt_silences,
            opt_breaths, opt_mouth, job_id=job_id,
        )
        # The generator does all the work; discard its progress payloads —
        # pollers read them from the job store instead.
        for _update in pipeline:
            pass
    except Exception as e:
        _update_job(job_id, status="error", message=f"❌ {e}")
    finally:
        try:
            os.unlink(audio_path)
        except Exception:
            pass
| # ══════════════════════════════════════════════════════════════════════ | |
| # GRADIO UI | |
| # ══════════════════════════════════════════════════════════════════════ | |
def process_audio_gradio(audio_path, in_lang_name, out_lang_name,
                         opt_fillers, opt_stutters, opt_silences,
                         opt_breaths, opt_mouth):
    """Gradio streaming handler.

    Starts the pipeline in a background thread, then polls the job store
    every 2 s, yielding 6-tuples for (status, transcript, translation,
    clean audio path, stats text, summary).
    """
    blank = ("", "", None, "", "")
    if audio_path is None:
        yield ("❌ Please upload an audio file.", *blank)
        return
    # gr.File may hand us a dict-like payload instead of a plain path.
    if isinstance(audio_path, dict):
        audio_path = audio_path.get("name") or audio_path.get("path", "")
    audio_path = convert_to_wav(audio_path)
    src_lang = LANGUAGES_DISPLAY.get(in_lang_name, "auto")
    tgt_lang = LANGUAGES_DISPLAY.get(out_lang_name, "te")
    # Kick off the background job.
    job_id = _new_job()
    worker = threading.Thread(
        target=_run_job_in_background,
        args=(job_id, audio_path, src_lang, tgt_lang,
              opt_fillers, opt_stutters, opt_silences,
              opt_breaths, opt_mouth),
        daemon=True,
    )
    worker.start()
    # Poll the job store and stream progress back to the UI.
    while True:
        time.sleep(2)
        job = _get_job(job_id)
        if not job:
            yield ("❌ Job not found.", *blank)
            return
        status = job.get("status")
        if status == "done":
            result = job.get("result", {})
            s = result.get("stats", {})
            stats_lines = [
                f"🎙️ Language : {s.get('language','?')}",
                f"🔊 Noise method : {s.get('noise_method','?')}",
                f"🗑️ Fillers : {s.get('fillers_removed', 0)} removed",
                f"🔁 Stutters : {s.get('stutters_removed', 0)} removed",
                f"🤫 Silences : {s.get('silences_removed_sec', 0):.1f}s removed",
                f"📝 Transcription : {s.get('transcription_method','?')}",
                f"🌐 Translation : {s.get('translation_method','?')}",
                f"⏱️ Total time : {s.get('processing_sec', 0):.1f}s",
            ]
            yield (result.get("message", "✅ Done!"),
                   result.get("transcript", ""),
                   result.get("translation", ""),
                   result.get("audioPath"),
                   "\n".join(stats_lines),
                   result.get("summary", ""))
            return
        if status == "error":
            yield (job.get("message", "❌ Error"), "", "", None, "Failed.", "")
            return
        if status in ("queued", "downloading", "processing"):
            yield (job.get("message", "Processing..."), *blank)
# Gradio UI layout: left column = upload + language + cleanup options,
# right column = tabbed results.  `demo` is the app object launched below.
with gr.Blocks(title="ClearWave AI") as demo:
    gr.Markdown("# 🎵 ClearWave AI\n### Professional Audio Enhancement — handles 1hr+ audio!")
    with gr.Row():
        with gr.Column(scale=1):
            # gr.File (not gr.Audio) so exotic containers pass through and
            # get converted by convert_to_wav before processing.
            audio_in = gr.File(
                label="📁 Upload Audio (MP3, WAV, MPEG, MP4, AAC, OGG, FLAC, AMR...)",
                file_types=[
                    ".mp3", ".wav", ".mpeg", ".mpg", ".mp4", ".m4a",
                    ".aac", ".ogg", ".flac", ".opus", ".webm", ".amr",
                    ".wma", ".aiff", ".aif", ".midi", ".mid",
                ],
            )
            with gr.Row():
                in_lang = gr.Dropdown(label="Input Language",
                                      choices=list(LANGUAGES_DISPLAY.keys()),
                                      value="Auto Detect")
                out_lang = gr.Dropdown(label="Output Language",
                                       choices=list(OUT_LANGS.keys()),
                                       value="Telugu")
            gr.Markdown("### 🎛️ Options")
            # Cleanup toggles map 1:1 onto run_pipeline's opt_* parameters.
            with gr.Row():
                opt_fillers = gr.Checkbox(label="🗑️ Remove Fillers", value=True)
                opt_stutters = gr.Checkbox(label="🔁 Remove Stutters", value=True)
            with gr.Row():
                opt_silences = gr.Checkbox(label="🤫 Remove Silences", value=True)
                opt_breaths = gr.Checkbox(label="💨 Reduce Breaths", value=True)
            with gr.Row():
                opt_mouth = gr.Checkbox(label="👄 Reduce Mouth Sounds", value=True)
            process_btn = gr.Button("🚀 Process Audio", variant="primary", size="lg")
            status_out = gr.Textbox(label="Status", interactive=False, lines=2)
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.Tab("📝 Text"):
                    with gr.Row():
                        transcript_out = gr.Textbox(label="Transcript", lines=14)
                        translation_out = gr.Textbox(label="Translation", lines=14)
                with gr.Tab("🔊 Clean Audio"):
                    clean_audio_out = gr.Audio(label="Enhanced Audio", type="filepath")
                with gr.Tab("📊 Stats"):
                    stats_out = gr.Textbox(label="Stats", lines=12, interactive=False)
                with gr.Tab("📋 Summary"):
                    summary_out = gr.Textbox(label="Summary", lines=14)
    # Generator handler: each yielded 6-tuple streams into these outputs.
    process_btn.click(
        fn=process_audio_gradio,
        inputs=[audio_in, in_lang, out_lang, opt_fillers, opt_stutters,
                opt_silences, opt_breaths, opt_mouth],
        outputs=[status_out, transcript_out, translation_out,
                 clean_audio_out, stats_out, summary_out]
    )
| # ══════════════════════════════════════════════════════════════════════ | |
| # API ROUTES | |
| # ══════════════════════════════════════════════════════════════════════ | |
| import json as _json | |
| from fastapi import Request as _Request | |
| from fastapi.responses import JSONResponse as _JSONResponse | |
async def api_health():
    """Liveness probe: service identity plus current in-memory job count."""
    # NOTE(review): len(_jobs) is read without _jobs_lock — fine for a
    # best-effort health metric under CPython's GIL.
    payload = {
        "status": "ok",
        "service": "ClearWave AI on HuggingFace",
        "jobs_active": len(_jobs),
    }
    return _JSONResponse(payload)
async def api_process_url(request: _Request):
    """
    Instantly returns a jobId.
    Client polls GET /api/job/{jobId} for progress and result.
    No timeout issues — works for 1hr+ audio.
    """
    data = await request.json()
    # Unwrap a {"data": {...}} envelope (Gradio-style clients) if present.
    if "data" in data and isinstance(data["data"], dict):
        data = data["data"]
    # Request fields; all cleanup options default to enabled.
    audio_url = data.get("audioUrl")
    audio_id = data.get("audioId", "")
    src_lang = data.get("srcLang", "auto")
    tgt_lang = data.get("tgtLang", "te")
    opt_fillers = data.get("optFillers", True)
    opt_stutters = data.get("optStutters", True)
    opt_silences = data.get("optSilences", True)
    opt_breaths = data.get("optBreaths", True)
    opt_mouth = data.get("optMouth", True)
    if not audio_url:
        return _JSONResponse({"error": "audioUrl is required"}, status_code=400)
    job_id = _new_job()
    _update_job(job_id, status="downloading", message="Downloading audio...")
    def _download_and_run():
        # Worker closure: download → convert → run pipeline → tag → cleanup.
        tmp_path = None
        audio_path = None
        try:
            # Download
            resp = requests.get(audio_url, timeout=300, stream=True)
            resp.raise_for_status()
            # NOTE(review): suffix sniffing by substring is fragile — a URL
            # containing "wav"/"mp4" anywhere (not just its extension)
            # matches; consider parsing the URL path's extension instead.
            url_lower = audio_url.lower()
            if "wav" in url_lower: suffix = ".wav"
            elif "mpeg" in url_lower: suffix = ".mpeg"
            elif "mp4" in url_lower: suffix = ".mp4"
            elif "m4a" in url_lower: suffix = ".m4a"
            else: suffix = ".mp3"
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
            tmp_path = tmp.name
            downloaded = 0
            total = int(resp.headers.get("content-length", 0))
            for chunk in resp.iter_content(chunk_size=65536):
                if chunk:
                    tmp.write(chunk)
                    downloaded += len(chunk)
                    if total:
                        # NOTE(review): updates the job store (taking the
                        # lock) on every 64 KiB chunk — consider throttling
                        # to whole-percent changes for large files.
                        pct = int(downloaded * 100 / total)
                        _update_job(job_id, status="downloading",
                                    message=f"Downloading... {pct}%")
            tmp.close()
            # Convert format
            audio_path = convert_to_wav(tmp_path)
            # Run pipeline — drain the generator; progress lands in the
            # job store via job_id.
            for _ in run_pipeline(
                audio_path, src_lang, tgt_lang,
                opt_fillers, opt_stutters, opt_silences,
                opt_breaths, opt_mouth, job_id=job_id
            ):
                pass
            # Tag result with audioId so the caller can correlate it.
            with _jobs_lock:
                if job_id in _jobs and _jobs[job_id].get("result"):
                    _jobs[job_id]["result"]["audioId"] = audio_id
        except Exception as e:
            logger.error(f"Job {job_id} failed: {e}", exc_info=True)
            _update_job(job_id, status="error", message=f"❌ Error: {str(e)}")
        finally:
            # Best-effort deletion of the raw download and converted copy.
            # convert_to_wav may return tmp_path unchanged, in which case
            # both names point at one file and the second unlink is a no-op.
            for p in [tmp_path, audio_path]:
                try:
                    if p and os.path.exists(p):
                        os.unlink(p)
                except Exception:
                    pass
    threading.Thread(target=_download_and_run, daemon=True).start()
    # Respond immediately; the worker thread carries on in the background.
    return _JSONResponse({
        "jobId": job_id,
        "audioId": audio_id,
        "status": "queued",
        "pollUrl": f"/api/job/{job_id}",
        "message": "Job started! Poll pollUrl for progress.",
    })
async def api_get_job(job_id: str):
    """Poll endpoint for job progress.

    Returns 404 for unknown ids; otherwise status/step/message, plus the
    full result payload (transcript/translation/audio) once status=done.
    """
    job = _get_job(job_id)
    if not job:
        return _JSONResponse({"error": "Job not found"}, status_code=404)
    payload = {
        "jobId": job_id,
        "status": job.get("status"),
        "step": job.get("step", 0),
        "message": job.get("message", ""),
    }
    # Attach the heavyweight result only when the job has finished.
    if job.get("status") == "done":
        payload["result"] = job.get("result", {})
    return _JSONResponse(payload)
async def api_list_jobs():
    """List every job currently in memory (status/step/message only)."""
    with _jobs_lock:
        overview = {}
        for jid, rec in _jobs.items():
            overview[jid] = {
                "status": rec["status"],
                "step": rec.get("step", 0),
                "message": rec.get("message", ""),
            }
    return _JSONResponse({"jobs": overview, "total": len(overview)})
# NOTE(review): no app.mount/add_api_route calls are visible in this chunk —
# confirm the async handlers above are actually registered elsewhere; this
# line only logs the intended route list at import time.
logger.info("✅ Routes: /api/health, /api/process-url, /api/job/{id}, /api/jobs")

if __name__ == "__main__":
    # Local/Spaces entry point: serve the Gradio app defined above.
    demo.launch()