Spaces:
Sleeping
Sleeping
| """ | |
ClearWave AI - API Space (FastAPI only)

BACKGROUND JOB SYSTEM:
- POST /api/process-url -> returns {jobId} immediately, so no request timeouts
- GET  /api/job/{jobId} -> poll for progress and the final result
- GET  /api/jobs        -> list every job currently held in memory (debug)
- Jobs run in background threads, so long (1hr+) recordings never block a request
- Results are kept in memory for about 1 hour, then cleaned up automatically
| """ | |
| import os | |
| import uuid | |
| import json | |
| import tempfile | |
| import logging | |
| import threading | |
| import time | |
| import requests | |
| import numpy as np | |
| import subprocess | |
| import cloudinary | |
| import cloudinary.uploader | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
# Cloudinary credentials are read from the Space's environment variables;
# any variable that is not set is passed through as None.
cloudinary.config(
    **{
        option: os.environ.get(env_var)
        for option, env_var in (
            ("cloud_name", "CLOUD_NAME"),
            ("api_key", "API_KEY"),
            ("api_secret", "API_SECRET"),
        )
    }
)

# Module-wide logging: INFO and above, under this module's logger name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| from denoiser import Denoiser | |
| from transcriber import Transcriber | |
| from translator import Translator | |
# Pipeline components are created once, at import time, and shared by every
# background job that run_pipeline executes.
denoiser = Denoiser()
transcriber = Transcriber()
translator = Translator()

app = FastAPI(title="ClearWave AI API")

# CORS is fully open: any origin may call any method with any header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
# ----------------------------------------------------------------------
# JOB STORE
# ----------------------------------------------------------------------
# Age in seconds (measured from a job's "created_at") after which the
# background cleanup thread may drop the job.
JOB_TTL_SEC = 3600

# In-memory job registry: job id (a UUID4 string) -> job record dict.
# Request handlers, worker threads and the cleanup thread all touch it, so
# reads and writes are done while holding _jobs_lock.
_jobs: dict = {}
_jobs_lock = threading.Lock()
def _new_job() -> str:
    """Register a fresh job in the "queued" state and return its id.

    Returns:
        A new UUID4 string that keys the record in ``_jobs``.
    """
    job_id = str(uuid.uuid4())
    record = dict(
        status="queued",
        step=0,
        message="Queued...",
        result=None,
        created_at=time.time(),  # used by the cleanup thread for expiry
    )
    with _jobs_lock:
        _jobs[job_id] = record
    return job_id
def _update_job(job_id: str, **kwargs):
    """Merge ``kwargs`` into the record for ``job_id``.

    Ids that are not (or no longer) in the store are silently ignored.
    """
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is not None:
            record.update(kwargs)
def _get_job(job_id: str) -> dict:
    """Return a shallow copy of a job's record, or ``{}`` for an unknown id.

    Only the top-level dict is copied; nested values (e.g. ``result``) are
    shared with the stored record.
    """
    with _jobs_lock:
        if job_id not in _jobs:
            return {}
        return dict(_jobs[job_id])
# Job statuses after which no worker thread updates the record again.
_TERMINAL_JOB_STATUSES = frozenset({"done", "error"})


def _expire_jobs(now: float) -> int:
    """Drop finished jobs older than ``JOB_TTL_SEC`` and return how many went.

    Only jobs whose status is terminal ("done" or "error") are eligible. A job
    that is still queued/downloading/processing is kept regardless of age.
    Otherwise a long-running job would be evicted mid-run: its worker's
    progress writes would be ignored and polls would start returning 404.

    Age is measured from ``created_at``. A job that finishes more than
    ``JOB_TTL_SEC`` after creation is therefore removed at the next sweep.

    Args:
        now: Current time as returned by ``time.time()``.

    Returns:
        Number of job records removed.
    """
    with _jobs_lock:
        expired = [
            job_id
            for job_id, job in _jobs.items()
            if job.get("status") in _TERMINAL_JOB_STATUSES
            and now - job.get("created_at", 0) > JOB_TTL_SEC
        ]
        for job_id in expired:
            del _jobs[job_id]
    return len(expired)


def _cleanup_loop(interval_sec: float = 300):
    """Run forever: every ``interval_sec`` seconds, expire old finished jobs."""
    while True:
        time.sleep(interval_sec)
        removed = _expire_jobs(time.time())
        if removed:
            # Logged after the lock is released so log I/O never stalls
            # request handlers waiting on the job store.
            logger.info(f"[Jobs] Cleaned {removed} expired jobs")


threading.Thread(target=_cleanup_loop, daemon=True).start()
# ----------------------------------------------------------------------
# AUDIO FORMAT CONVERTER
# ----------------------------------------------------------------------
def convert_to_wav(audio_path: str) -> str:
    """Transcode audio to WAV with ffmpeg unless its format is on the passthrough list.

    Files whose extension (case-insensitive) is .wav, .mp3, .flac, .ogg or
    .aac are returned untouched. Anything else is converted to 16 kHz mono
    16-bit PCM WAV, written next to the input as ``<audio_path>_converted.wav``.

    Args:
        audio_path: Path of the local audio file, or None.

    Returns:
        The converted file's path on success. Otherwise ``audio_path`` itself:
        when it is None, already a passthrough format, or when ffmpeg is
        missing or fails. In the failure case no partial output is left behind.
    """
    if audio_path is None:
        return audio_path
    ext = os.path.splitext(audio_path)[1].lower()
    if ext in [".wav", ".mp3", ".flac", ".ogg", ".aac"]:
        return audio_path

    converted = audio_path + "_converted.wav"
    try:
        result = subprocess.run([
            "ffmpeg", "-y", "-i", audio_path,
            "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", converted
        ], capture_output=True)
        if result.returncode == 0 and os.path.exists(converted):
            logger.info(f"Converted {ext} β .wav")
            return converted
        # Surface ffmpeg's own reason so a later decode failure on the
        # unconverted file can be traced back to this step.
        stderr_tail = (result.stderr or b"").decode("utf-8", errors="replace")[-500:]
        logger.warning(
            f"ffmpeg exited with code {result.returncode} for {audio_path}: {stderr_tail}"
        )
    except Exception as e:
        logger.warning(f"Conversion error: {e}")

    # Falling back to the original file: remove any half-written output,
    # since callers only clean up the path this function returns.
    try:
        if os.path.exists(converted):
            os.unlink(converted)
    except OSError as e:
        logger.warning(f"Could not remove partial conversion {converted}: {e}")
    return audio_path
# ----------------------------------------------------------------------
# CORE PIPELINE
# ----------------------------------------------------------------------
def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
                 opt_fillers=True, opt_stutters=True, opt_silences=True,
                 opt_breaths=True, opt_mouth=True, job_id=None):
    """Run the five-step ClearWave pipeline as a generator of status dicts.

    Steps:
        1. Denoise (silence / breath / mouth-sound options applied here).
        2. Transcribe.
        3. Optionally cut fillers and stutters using the transcriber's word
           segments.
        4. Translate when the detected language differs from ``tgt_lang``.
        5. Summarise, then upload the cleaned audio to Cloudinary.

    Each intermediate yield is ``{"status": "processing", "step", "message"}``.
    The final yield is either the full result dict (``status == "done"``) or
    an error dict (``status == "error"``). When ``job_id`` is given, every
    update is also written to the job store.

    Args:
        audio_path: Local path of the input audio.
        src_lang: Source-language hint passed to the transcriber.
        tgt_lang: Translation target; "auto" disables translation.
        opt_fillers, opt_stutters: Cut filler words / stutters (step 3).
        opt_silences, opt_breaths, opt_mouth: Forwarded to the denoiser.
        job_id: Optional job-store id to mirror progress into.

    Intermediate files go into a private temp directory that is deleted when
    the generator finishes. The cleaned audio is only retained through the
    Cloudinary upload.
    """
    import shutil

    def progress(step, message):
        # Build an intermediate status and mirror it into the job store.
        update = {"status": "processing", "step": step, "message": message}
        if job_id:
            _update_job(job_id, **update)
        return update

    out_dir = tempfile.mkdtemp()
    try:
        yield progress(1, "Step 1/5 β Denoising...")
        # Fillers/stutters are not removed here; they are cut in step 3
        # using the word segments produced by transcription.
        denoise1 = denoiser.process(
            audio_path, out_dir,
            remove_fillers=False, remove_stutters=False,
            remove_silences=opt_silences, remove_breaths=opt_breaths,
            remove_mouth_sounds=opt_mouth, word_segments=None,
        )
        clean1 = denoise1["audio_path"]
        stats = denoise1["stats"]

        yield progress(2, "Step 2/5 β Transcribing...")
        transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang)
        word_segs = transcriber._last_segments
        if word_segs is None:
            # len(word_segs) is reported in the stats below; None would crash it.
            word_segs = []

        if (opt_fillers or opt_stutters) and word_segs:
            yield progress(3, "Step 3/5 β Removing fillers & stutters...")
            import soundfile as sf
            audio_data, sr = sf.read(clean1)
            if audio_data.ndim == 2:
                # Downmix multi-channel audio to mono before cutting.
                audio_data = audio_data.mean(axis=1)
            audio_data = audio_data.astype(np.float32)
            if opt_fillers:
                audio_data, n_f = denoiser._remove_fillers(audio_data, sr, word_segs)
                stats["fillers_removed"] = n_f
                transcript = denoiser.clean_transcript_fillers(transcript)
            if opt_stutters:
                audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs)
                stats["stutters_removed"] = n_s
            clean_wav = os.path.join(out_dir, "clean_step3.wav")
            sf.write(clean_wav, audio_data, sr, format="WAV", subtype="PCM_24")
            clean1 = clean_wav
        else:
            stats["fillers_removed"] = 0
            stats["stutters_removed"] = 0

        translation = transcript
        tl_method = "same language"
        if tgt_lang != "auto" and detected_lang != tgt_lang:
            yield progress(4, "Step 4/5 β Translating...")
            translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)

        yield progress(5, "Step 5/5 β Summarizing...")
        summary = translator.summarize(transcript)

        # Upload to Cloudinary (best effort: a failure leaves enhancedAudio None).
        enhanced_url = None
        try:
            upload_result = cloudinary.uploader.upload(
                clean1,
                resource_type="video",
                folder="clearwave_enhanced",
            )
            enhanced_url = upload_result["secure_url"]
            logger.info(f"Uploaded to Cloudinary: {enhanced_url}")
        except Exception as e:
            logger.error(f"Cloudinary upload failed: {e}")

        result = {
            "status": "done",
            "step": 5,
            "message": "Done!",
            "transcript": transcript,
            "translation": translation,
            "summary": summary,
            "enhancedAudio": enhanced_url,
            "stats": {
                "language": detected_lang.upper(),
                "noise_method": stats.get("noise_method", "noisereduce"),
                "fillers_removed": stats.get("fillers_removed", 0),
                "stutters_removed": stats.get("stutters_removed", 0),
                "silences_removed_sec": stats.get("silences_removed_sec", 0),
                "breaths_reduced": stats.get("breaths_reduced", False),
                "mouth_sounds_removed": stats.get("mouth_sounds_removed", 0),
                "transcription_method": t_method,
                "translation_method": tl_method,
                "processing_sec": stats.get("processing_sec", 0),
                "word_segments": len(word_segs),
                "transcript_words": len(transcript.split()),
            },
        }
        if job_id:
            _update_job(job_id, status="done", step=5,
                        message="Done!", result=result)
        yield result
    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)
        err = {"status": "error", "message": f"Error: {str(e)}"}
        if job_id:
            _update_job(job_id, **err)
        yield err
    finally:
        # Intermediate WAVs are only needed until the upload above. Without
        # this every job would leave its scratch directory behind in /tmp.
        shutil.rmtree(out_dir, ignore_errors=True)
# ----------------------------------------------------------------------
# ROUTES
# ----------------------------------------------------------------------
async def health():
    """Liveness probe: service name plus the number of jobs held in memory.

    ``jobs_active`` counts every stored job, including finished ones that
    have not been cleaned up yet.
    """
    payload = dict(
        status="ok",
        service="ClearWave AI API",
        jobs_active=len(_jobs),
    )
    return JSONResponse(payload)
async def process_url(request: Request):
    """Submit an audio URL for background processing and return a job id at once.

    JSON body:
        audioUrl (required): http(s) URL of the audio to download.
        audioId: Opaque client id echoed back in the response and the result.
        srcLang / tgtLang: Defaults "auto" / "te".
        optFillers, optStutters, optSilences, optBreaths, optMouth:
            Pipeline toggles, all default True.

    The download and pipeline run in a daemon thread. Poll
    ``GET /api/job/{jobId}`` for progress and the final result.

    Returns:
        200 with ``jobId`` and ``pollUrl``. Returns 400 when the body is not
        a JSON object, or ``audioUrl`` is missing or not an http(s) URL.
    """
    try:
        data = await request.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError
        return JSONResponse({"error": "Request body must be valid JSON"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Request body must be a JSON object"}, status_code=400)

    audio_url = data.get("audioUrl")
    audio_id = data.get("audioId", "")
    src_lang = data.get("srcLang", "auto")
    tgt_lang = data.get("tgtLang", "te")
    opt_fillers = data.get("optFillers", True)
    opt_stutters = data.get("optStutters", True)
    opt_silences = data.get("optSilences", True)
    opt_breaths = data.get("optBreaths", True)
    opt_mouth = data.get("optMouth", True)

    if not audio_url:
        return JSONResponse({"error": "audioUrl is required"}, status_code=400)
    # requests' default adapters only handle http:// and https://. Reject
    # anything else now instead of creating a job that is certain to fail.
    if (not isinstance(audio_url, str)
            or not audio_url.strip().lower().startswith(("http://", "https://"))):
        return JSONResponse({"error": "audioUrl must be an http(s) URL"}, status_code=400)

    job_id = _new_job()
    _update_job(job_id, status="downloading", message="Downloading audio...")

    def _download_and_run():
        """Worker: stream the URL to a temp file, convert, run the pipeline, clean up."""
        tmp_path = None
        audio_path = None
        try:
            # Context managers close the HTTP connection and the temp file
            # handle even when the download fails part-way.
            with requests.get(audio_url, timeout=300, stream=True) as resp:
                resp.raise_for_status()
                suffix = _guess_audio_suffix(audio_url)
                total = int(resp.headers.get("content-length", 0))
                downloaded = 0
                last_pct = -1
                with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                    tmp_path = tmp.name
                    for chunk in resp.iter_content(chunk_size=65536):
                        if not chunk:
                            continue
                        tmp.write(chunk)
                        downloaded += len(chunk)
                        if total:
                            pct = int(downloaded * 100 / total)
                            # Touch the shared store only when the percentage moves.
                            if pct != last_pct:
                                last_pct = pct
                                _update_job(job_id, status="downloading",
                                            message=f"Downloading... {pct}%")

            audio_path = convert_to_wav(tmp_path)
            for _ in run_pipeline(
                audio_path, src_lang, tgt_lang,
                opt_fillers, opt_stutters, opt_silences,
                opt_breaths, opt_mouth, job_id=job_id
            ):
                pass

            with _jobs_lock:
                job = _jobs.get(job_id)
                if job and job.get("result"):
                    # Publish a new dict instead of mutating the one pollers may
                    # already hold (and be serialising) via _get_job's copy.
                    job["result"] = {**job["result"], "audioId": audio_id}
        except Exception as e:
            logger.error(f"Job {job_id} failed: {e}", exc_info=True)
            _update_job(job_id, status="error", message=f"Error: {str(e)}")
        finally:
            for p in [tmp_path, audio_path]:
                try:
                    if p and os.path.exists(p):
                        os.unlink(p)
                except OSError as cleanup_err:
                    logger.warning(f"Job {job_id}: could not remove {p}: {cleanup_err}")

    threading.Thread(target=_download_and_run, daemon=True).start()
    return JSONResponse({
        "jobId": job_id,
        "audioId": audio_id,
        "status": "queued",
        "pollUrl": f"/api/job/{job_id}",
        "message": "Job started! Poll pollUrl every 3-5 seconds for progress.",
    })


# Extensions accepted verbatim from the end of a URL path when naming the
# downloaded temp file. Anything outside convert_to_wav's passthrough list
# (.wav/.mp3/.flac/.ogg/.aac) is then transcoded by ffmpeg.
_URL_AUDIO_SUFFIXES = frozenset({
    ".wav", ".mp3", ".m4a", ".mp4", ".mpeg",
    ".flac", ".ogg", ".aac", ".webm", ".opus",
})


def _guess_audio_suffix(audio_url: str) -> str:
    """Choose the temp-file suffix for a downloaded audio URL.

    The URL path's own extension wins when it is a known media extension.
    Otherwise, look for a format hint ("wav", "mpeg", "mp4", "m4a", in that
    order) in the last path segment and the query string only. Host and
    folder names are skipped so that e.g. "clearwave" is not read as ".wav".
    The default is ".mp3".
    """
    from urllib.parse import urlparse

    parsed = urlparse(audio_url.strip())
    path_ext = os.path.splitext(parsed.path)[1].lower()
    if path_ext in _URL_AUDIO_SUFFIXES:
        return path_ext
    hint = f"{os.path.basename(parsed.path)}?{parsed.query}".lower()
    for needle, suffix in (("wav", ".wav"), ("mpeg", ".mpeg"),
                           ("mp4", ".mp4"), ("m4a", ".m4a")):
        if needle in hint:
            return suffix
    return ".mp3"
async def get_job(job_id: str):
    """Report a job's progress, and its result once it has finished.

    ``status`` is one of: queued | downloading | processing | done | error.
    ``result`` is included only when status is "done". Unknown or
    already-cleaned-up ids get a 404.
    """
    job = _get_job(job_id)
    if not job:
        return JSONResponse({"error": "Job not found"}, status_code=404)

    status = job.get("status")
    body = dict(
        jobId=job_id,
        status=status,
        step=job.get("step", 0),
        message=job.get("message", ""),
    )
    if status == "done":
        body["result"] = job.get("result", {})
    return JSONResponse(body)
async def list_jobs():
    """Debug view: status, step and message of every job currently in memory."""
    jobs_view = {}
    with _jobs_lock:
        for job_id, job in _jobs.items():
            jobs_view[job_id] = {
                "status": job["status"],
                "step": job.get("step", 0),
                "message": job.get("message", ""),
            }
    return JSONResponse({"jobs": jobs_view, "total": len(jobs_view)})