""" ClearWave AI — API Space (FastAPI only) BACKGROUND JOB SYSTEM: - POST /api/process-url → returns {jobId} instantly — NO timeout issues - GET /api/job/{jobId} → poll for progress and final result - GET /api/jobs → list all active jobs (debug) - Jobs run in background threads — handles 1hr+ audio safely - Results stored in memory for 1 hour then auto-cleaned """ import os import uuid import json import tempfile import logging import threading import time import requests import numpy as np import subprocess import cloudinary import cloudinary.uploader from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware cloudinary.config( cloud_name = os.environ.get("CLOUD_NAME"), api_key = os.environ.get("API_KEY"), api_secret = os.environ.get("API_SECRET"), ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) from denoiser import Denoiser from transcriber import Transcriber from translator import Translator denoiser = Denoiser() transcriber = Transcriber() translator = Translator() app = FastAPI(title="ClearWave AI API") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # ══════════════════════════════════════════════════════════════════════ # JOB STORE # ══════════════════════════════════════════════════════════════════════ _jobs: dict = {} _jobs_lock = threading.Lock() JOB_TTL_SEC = 3600 def _new_job() -> str: job_id = str(uuid.uuid4()) with _jobs_lock: _jobs[job_id] = { "status": "queued", "step": 0, "message": "Queued...", "result": None, "created_at": time.time(), } return job_id def _update_job(job_id: str, **kwargs): with _jobs_lock: if job_id in _jobs: _jobs[job_id].update(kwargs) def _get_job(job_id: str) -> dict: with _jobs_lock: return dict(_jobs.get(job_id, {})) def _cleanup_loop(): while True: time.sleep(300) now = time.time() with _jobs_lock: expired = [k for k, v in _jobs.items() if now - v.get("created_at", 0) > JOB_TTL_SEC] for k in expired: del _jobs[k] if expired: logger.info(f"[Jobs] Cleaned {len(expired)} expired jobs") threading.Thread(target=_cleanup_loop, daemon=True).start() # ══════════════════════════════════════════════════════════════════════ # AUDIO FORMAT CONVERTER # ══════════════════════════════════════════════════════════════════════ def convert_to_wav(audio_path: str) -> str: if audio_path is None: return audio_path ext = os.path.splitext(audio_path)[1].lower() if ext in [".wav", ".mp3", ".flac", ".ogg", ".aac"]: return audio_path try: converted = audio_path + "_converted.wav" result = subprocess.run([ "ffmpeg", "-y", "-i", audio_path, "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", converted ], capture_output=True) if result.returncode == 0 and os.path.exists(converted): logger.info(f"Converted {ext} → .wav") return converted except Exception as e: logger.warning(f"Conversion error: {e}") return audio_path # ══════════════════════════════════════════════════════════════════════ # CORE PIPELINE # ══════════════════════════════════════════════════════════════════════ def run_pipeline(audio_path, src_lang="auto", tgt_lang="te", opt_fillers=True, opt_stutters=True, opt_silences=True, opt_breaths=True, opt_mouth=True, job_id=None): def progress(step, message): update = {"status": "processing", "step": step, "message": message} if job_id: _update_job(job_id, **update) return update out_dir = tempfile.mkdtemp() try: yield progress(1, "Step 1/5 — Denoising...") denoise1 = denoiser.process( audio_path, out_dir, remove_fillers=False, remove_stutters=False, remove_silences=opt_silences, remove_breaths=opt_breaths, remove_mouth_sounds=opt_mouth, word_segments=None, ) clean1 = denoise1["audio_path"] stats = denoise1["stats"] yield progress(2, "Step 2/5 — Transcribing...") transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang) word_segs = transcriber._last_segments if (opt_fillers or opt_stutters) and word_segs: yield progress(3, "Step 3/5 — Removing fillers & stutters...") import soundfile as sf audio_data, sr = sf.read(clean1) if audio_data.ndim == 2: audio_data = audio_data.mean(axis=1) audio_data = audio_data.astype(np.float32) if opt_fillers: audio_data, n_f = denoiser._remove_fillers(audio_data, sr, word_segs) stats["fillers_removed"] = n_f transcript = denoiser.clean_transcript_fillers(transcript) if opt_stutters: audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs) stats["stutters_removed"] = n_s clean_wav = os.path.join(out_dir, "clean_step3.wav") sf.write(clean_wav, audio_data, sr, format="WAV", subtype="PCM_24") clean1 = clean_wav else: stats["fillers_removed"] = 0 stats["stutters_removed"] = 0 translation = transcript tl_method = "same language" if tgt_lang != "auto" and detected_lang != tgt_lang: yield progress(4, "Step 4/5 — Translating...") translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang) yield progress(5, "Step 5/5 — Summarizing...") summary = translator.summarize(transcript) # Upload to Cloudinary enhanced_url = None try: upload_result = cloudinary.uploader.upload( clean1, resource_type="video", folder="clearwave_enhanced", ) enhanced_url = upload_result["secure_url"] logger.info(f"Uploaded to Cloudinary: {enhanced_url}") except Exception as e: logger.error(f"Cloudinary upload failed: {e}") result = { "status": "done", "step": 5, "message": "Done!", "transcript": transcript, "translation": translation, "summary": summary, "enhancedAudio": enhanced_url, "stats": { "language": detected_lang.upper(), "noise_method": stats.get("noise_method", "noisereduce"), "fillers_removed": stats.get("fillers_removed", 0), "stutters_removed": stats.get("stutters_removed", 0), "silences_removed_sec": stats.get("silences_removed_sec", 0), "breaths_reduced": stats.get("breaths_reduced", False), "mouth_sounds_removed": stats.get("mouth_sounds_removed", 0), "transcription_method": t_method, "translation_method": tl_method, "processing_sec": stats.get("processing_sec", 0), "word_segments": len(word_segs), "transcript_words": len(transcript.split()), }, } if job_id: _update_job(job_id, status="done", step=5, message="Done!", result=result) yield result except Exception as e: logger.error(f"Pipeline failed: {e}", exc_info=True) err = {"status": "error", "message": f"Error: {str(e)}"} if job_id: _update_job(job_id, **err) yield err # ══════════════════════════════════════════════════════════════════════ # ROUTES # ══════════════════════════════════════════════════════════════════════ @app.get("/api/health") async def health(): return JSONResponse({ "status": "ok", "service": "ClearWave AI API", "jobs_active": len(_jobs), }) @app.post("/api/process-url") async def process_url(request: Request): """ Submit audio for processing. Returns jobId immediately — no timeout issues. Poll GET /api/job/{jobId} for progress and result. """ data = await request.json() audio_url = data.get("audioUrl") audio_id = data.get("audioId", "") src_lang = data.get("srcLang", "auto") tgt_lang = data.get("tgtLang", "te") opt_fillers = data.get("optFillers", True) opt_stutters = data.get("optStutters", True) opt_silences = data.get("optSilences", True) opt_breaths = data.get("optBreaths", True) opt_mouth = data.get("optMouth", True) if not audio_url: return JSONResponse({"error": "audioUrl is required"}, status_code=400) job_id = _new_job() _update_job(job_id, status="downloading", message="Downloading audio...") def _download_and_run(): tmp_path = None audio_path = None try: resp = requests.get(audio_url, timeout=300, stream=True) resp.raise_for_status() url_lower = audio_url.lower() if "wav" in url_lower: suffix = ".wav" elif "mpeg" in url_lower: suffix = ".mpeg" elif "mp4" in url_lower: suffix = ".mp4" elif "m4a" in url_lower: suffix = ".m4a" else: suffix = ".mp3" tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) tmp_path = tmp.name downloaded = 0 total = int(resp.headers.get("content-length", 0)) for chunk in resp.iter_content(chunk_size=65536): if chunk: tmp.write(chunk) downloaded += len(chunk) if total: pct = int(downloaded * 100 / total) _update_job(job_id, status="downloading", message=f"Downloading... {pct}%") tmp.close() audio_path = convert_to_wav(tmp_path) for _ in run_pipeline( audio_path, src_lang, tgt_lang, opt_fillers, opt_stutters, opt_silences, opt_breaths, opt_mouth, job_id=job_id ): pass with _jobs_lock: if job_id in _jobs and _jobs[job_id].get("result"): _jobs[job_id]["result"]["audioId"] = audio_id except Exception as e: logger.error(f"Job {job_id} failed: {e}", exc_info=True) _update_job(job_id, status="error", message=f"Error: {str(e)}") finally: for p in [tmp_path, audio_path]: try: if p and os.path.exists(p): os.unlink(p) except Exception: pass threading.Thread(target=_download_and_run, daemon=True).start() return JSONResponse({ "jobId": job_id, "audioId": audio_id, "status": "queued", "pollUrl": f"/api/job/{job_id}", "message": "Job started! Poll pollUrl every 3-5 seconds for progress.", }) @app.get("/api/job/{job_id}") async def get_job(job_id: str): """ Poll for job status and result. status: queued | downloading | processing | done | error result: available only when status=done """ job = _get_job(job_id) if not job: return JSONResponse({"error": "Job not found"}, status_code=404) response = { "jobId": job_id, "status": job.get("status"), "step": job.get("step", 0), "message": job.get("message", ""), } if job.get("status") == "done": response["result"] = job.get("result", {}) return JSONResponse(response) @app.get("/api/jobs") async def list_jobs(): """List all active jobs — useful for debugging.""" with _jobs_lock: summary = { k: {"status": v["status"], "step": v.get("step", 0), "message": v.get("message", "")} for k, v in _jobs.items() } return JSONResponse({"jobs": summary, "total": len(summary)})