# clearwave-api / main.py
# Last change: "Update main.py" — commit 88439ab (verified), by testingfaces
"""
ClearWave AI β€” API Space (FastAPI only)
BACKGROUND JOB SYSTEM:
- POST /api/process-url β†’ returns {jobId} instantly β€” NO timeout issues
- GET /api/job/{jobId} β†’ poll for progress and final result
- GET /api/jobs β†’ list all active jobs (debug)
- Jobs run in background threads β€” handles 1hr+ audio safely
- Results stored in memory for 1 hour then auto-cleaned
"""
import json
import logging
import os
import shutil
import subprocess
import tempfile
import threading
import time
import uuid

import cloudinary
import cloudinary.uploader
import numpy as np
import requests
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# Cloudinary hosts the enhanced audio produced by the pipeline. Credentials
# come from environment variables (Space secrets); if unset, the upload in
# run_pipeline fails and is logged, but processing still completes.
cloudinary.config(
    cloud_name = os.environ.get("CLOUD_NAME"),
    api_key = os.environ.get("API_KEY"),
    api_secret = os.environ.get("API_SECRET"),
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Heavy ML components, imported after logging is configured so their startup
# output is visible. One shared instance of each serves every job.
# NOTE(review): background jobs call these from multiple threads — confirm
# the Denoiser/Transcriber/Translator implementations are thread-safe.
from denoiser import Denoiser
from transcriber import Transcriber
from translator import Translator

denoiser = Denoiser()
transcriber = Transcriber()
translator = Translator()

app = FastAPI(title="ClearWave AI API")

# Wide-open CORS: this API is called directly from a separately-hosted frontend.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ══════════════════════════════════════════════════════════════════════
# JOB STORE
# ══════════════════════════════════════════════════════════════════════
_jobs: dict = {}
_jobs_lock = threading.Lock()
JOB_TTL_SEC = 3600
def _new_job() -> str:
job_id = str(uuid.uuid4())
with _jobs_lock:
_jobs[job_id] = {
"status": "queued",
"step": 0,
"message": "Queued...",
"result": None,
"created_at": time.time(),
}
return job_id
def _update_job(job_id: str, **kwargs):
    """Merge the given fields into a job record; silently no-op if unknown."""
    with _jobs_lock:
        entry = _jobs.get(job_id)
        if entry is not None:
            entry.update(kwargs)
def _get_job(job_id: str) -> dict:
    """Return a shallow snapshot of a job record ({} if the id is unknown)."""
    with _jobs_lock:
        snapshot = _jobs.get(job_id, {})
        return dict(snapshot)
def _cleanup_loop():
    """Background reaper: every 5 minutes, drop jobs older than JOB_TTL_SEC."""
    while True:
        time.sleep(300)
        cutoff = time.time() - JOB_TTL_SEC
        with _jobs_lock:
            stale = [jid for jid, rec in _jobs.items()
                     if rec.get("created_at", 0) < cutoff]
            for jid in stale:
                del _jobs[jid]
        if stale:
            logger.info(f"[Jobs] Cleaned {len(stale)} expired jobs")


# Daemon thread: dies with the process, never blocks shutdown.
threading.Thread(target=_cleanup_loop, daemon=True).start()
# ══════════════════════════════════════════════════════════════════════
# AUDIO FORMAT CONVERTER
# ══════════════════════════════════════════════════════════════════════
def convert_to_wav(audio_path: str) -> str:
    """Convert unsupported containers (e.g. .mp4/.m4a/.webm) to 16 kHz mono WAV.

    Formats the downstream pipeline already decodes directly are returned
    untouched. On any conversion failure the ORIGINAL path is returned so
    processing can still be attempted on the raw file.

    Args:
        audio_path: local file path, or None (passed straight through).
    Returns:
        Path to a playable audio file — either a new "<path>_converted.wav"
        or the unchanged input path.
    """
    if audio_path is None:
        return audio_path
    ext = os.path.splitext(audio_path)[1].lower()
    # These formats are decodable by the pipeline as-is — skip ffmpeg entirely.
    if ext in (".wav", ".mp3", ".flac", ".ogg", ".aac"):
        return audio_path
    converted = audio_path + "_converted.wav"
    try:
        result = subprocess.run([
            "ffmpeg", "-y", "-i", audio_path,
            "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", converted
        ], capture_output=True)
        if result.returncode == 0 and os.path.exists(converted):
            logger.info(f"Converted {ext} → .wav")
            return converted
        # Fix: surface ffmpeg's own diagnostics instead of failing silently.
        logger.warning(
            f"ffmpeg exited with {result.returncode}: "
            f"{result.stderr.decode(errors='ignore')[-500:]}"
        )
    except Exception as e:
        logger.warning(f"Conversion error: {e}")
    # Fix: don't leave a partial/empty output file behind after a failure.
    try:
        if os.path.exists(converted):
            os.unlink(converted)
    except OSError:
        pass
    return audio_path
# ══════════════════════════════════════════════════════════════════════
# CORE PIPELINE
# ══════════════════════════════════════════════════════════════════════
def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
                 opt_fillers=True, opt_stutters=True, opt_silences=True,
                 opt_breaths=True, opt_mouth=True, job_id=None):
    """Full enhancement pipeline: denoise → transcribe → filler/stutter
    removal → translate → summarize → upload.

    Generator. Yields one progress dict per step and finally either the
    result dict (status="done") or an error dict (status="error"). When
    job_id is given, every update is also mirrored into the job store so
    HTTP pollers can follow along.

    Args:
        audio_path: local path to the (already format-converted) input audio.
        src_lang:   source language code or "auto" for detection.
        tgt_lang:   target translation language, or "auto" to skip.
        opt_*:      per-cleanup-stage toggles.
        job_id:     optional job-store id to report progress/result into.
    """
    def progress(step, message):
        # Mirror the update into the job store (if attached) and hand it back
        # so the caller of the generator sees it too.
        update = {"status": "processing", "step": step, "message": message}
        if job_id:
            _update_job(job_id, **update)
        return update

    out_dir = tempfile.mkdtemp()
    try:
        # Step 1: broadband denoise plus optional silence/breath/mouth cleanup.
        # Filler/stutter removal needs word timestamps, so it waits for step 3.
        yield progress(1, "Step 1/5 — Denoising...")
        denoise1 = denoiser.process(
            audio_path, out_dir,
            remove_fillers=False, remove_stutters=False,
            remove_silences=opt_silences, remove_breaths=opt_breaths,
            remove_mouth_sounds=opt_mouth, word_segments=None,
        )
        clean1 = denoise1["audio_path"]
        stats = denoise1["stats"]

        # Step 2: transcription (also sets word-level segments on the
        # transcriber and reports the detected language).
        yield progress(2, "Step 2/5 — Transcribing...")
        transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang)
        word_segs = transcriber._last_segments  # may be None/empty — guarded below

        # Step 3: timestamp-guided removal of fillers and stutters.
        if (opt_fillers or opt_stutters) and word_segs:
            yield progress(3, "Step 3/5 — Removing fillers & stutters...")
            import soundfile as sf
            audio_data, sr = sf.read(clean1)
            if audio_data.ndim == 2:
                audio_data = audio_data.mean(axis=1)  # downmix stereo → mono
            audio_data = audio_data.astype(np.float32)
            if opt_fillers:
                audio_data, n_f = denoiser._remove_fillers(audio_data, sr, word_segs)
                stats["fillers_removed"] = n_f
                transcript = denoiser.clean_transcript_fillers(transcript)
            if opt_stutters:
                audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs)
                stats["stutters_removed"] = n_s
            clean_wav = os.path.join(out_dir, "clean_step3.wav")
            sf.write(clean_wav, audio_data, sr, format="WAV", subtype="PCM_24")
            clean1 = clean_wav
        else:
            stats["fillers_removed"] = 0
            stats["stutters_removed"] = 0

        # Step 4: translate only when a concrete target differs from detection.
        translation = transcript
        tl_method = "same language"
        if tgt_lang != "auto" and detected_lang != tgt_lang:
            yield progress(4, "Step 4/5 — Translating...")
            translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)

        # Step 5: summary of the (original-language) transcript.
        yield progress(5, "Step 5/5 — Summarizing...")
        summary = translator.summarize(transcript)

        # Upload the enhanced audio; failure is non-fatal (URL stays None).
        enhanced_url = None
        try:
            upload_result = cloudinary.uploader.upload(
                clean1,
                resource_type="video",  # Cloudinary files audio under "video"
                folder="clearwave_enhanced",
            )
            enhanced_url = upload_result["secure_url"]
            logger.info(f"Uploaded to Cloudinary: {enhanced_url}")
        except Exception as e:
            logger.error(f"Cloudinary upload failed: {e}")

        result = {
            "status": "done",
            "step": 5,
            "message": "Done!",
            "transcript": transcript,
            "translation": translation,
            "summary": summary,
            "enhancedAudio": enhanced_url,
            "stats": {
                "language": detected_lang.upper(),
                "noise_method": stats.get("noise_method", "noisereduce"),
                "fillers_removed": stats.get("fillers_removed", 0),
                "stutters_removed": stats.get("stutters_removed", 0),
                "silences_removed_sec": stats.get("silences_removed_sec", 0),
                "breaths_reduced": stats.get("breaths_reduced", False),
                "mouth_sounds_removed": stats.get("mouth_sounds_removed", 0),
                "transcription_method": t_method,
                "translation_method": tl_method,
                "processing_sec": stats.get("processing_sec", 0),
                # Fix: word_segs can be None when timestamps are unavailable
                # (step 3 already treats that as an expected state); the
                # original len(word_segs) raised TypeError here.
                "word_segments": len(word_segs) if word_segs else 0,
                "transcript_words": len(transcript.split()),
            },
        }
        if job_id:
            _update_job(job_id, status="done", step=5,
                        message="Done!", result=result)
        yield result
    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)
        err = {"status": "error", "message": f"Error: {str(e)}"}
        if job_id:
            _update_job(job_id, **err)
        yield err
    finally:
        # Fix: the scratch dir (intermediate + enhanced WAVs) was never
        # deleted, so long-running Spaces slowly filled /tmp. Safe to remove
        # here: the enhanced audio has already been uploaded to Cloudinary.
        shutil.rmtree(out_dir, ignore_errors=True)
# ══════════════════════════════════════════════════════════════════════
# ROUTES
# ══════════════════════════════════════════════════════════════════════
@app.get("/api/health")
async def health():
    """Liveness probe: reports the service name and current job count."""
    payload = {
        "status": "ok",
        "service": "ClearWave AI API",
        "jobs_active": len(_jobs),
    }
    return JSONResponse(payload)
@app.post("/api/process-url")
async def process_url(request: Request):
    """
    Submit audio for processing.
    Returns jobId immediately — no timeout issues.
    Poll GET /api/job/{jobId} for progress and result.

    Body (JSON): audioUrl (required), audioId, srcLang, tgtLang,
    optFillers/optStutters/optSilences/optBreaths/optMouth (all default True).
    """
    data = await request.json()
    audio_url = data.get("audioUrl")
    audio_id = data.get("audioId", "")
    src_lang = data.get("srcLang", "auto")
    tgt_lang = data.get("tgtLang", "te")
    opt_fillers = data.get("optFillers", True)
    opt_stutters = data.get("optStutters", True)
    opt_silences = data.get("optSilences", True)
    opt_breaths = data.get("optBreaths", True)
    opt_mouth = data.get("optMouth", True)
    if not audio_url:
        return JSONResponse({"error": "audioUrl is required"}, status_code=400)
    job_id = _new_job()
    _update_job(job_id, status="downloading", message="Downloading audio...")

    def _download_and_run():
        # Runs on a background thread: downloads the source audio, feeds it
        # through run_pipeline, then removes every temp file it created.
        tmp_path = None
        audio_path = None
        try:
            # Guess a file suffix from the URL so decoders get a format hint.
            url_lower = audio_url.lower()
            if "wav" in url_lower:
                suffix = ".wav"
            elif "mpeg" in url_lower:
                suffix = ".mpeg"
            elif "mp4" in url_lower:
                suffix = ".mp4"
            elif "m4a" in url_lower:
                suffix = ".m4a"
            else:
                suffix = ".mp3"
            # Fix: close the streamed HTTP response and the temp-file handle
            # even when an exception fires mid-download — the original left
            # both open (only the temp *path* was unlinked in finally).
            with requests.get(audio_url, timeout=300, stream=True) as resp:
                resp.raise_for_status()
                total = int(resp.headers.get("content-length", 0))
                downloaded = 0
                with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                    tmp_path = tmp.name
                    for chunk in resp.iter_content(chunk_size=65536):
                        if chunk:
                            tmp.write(chunk)
                            downloaded += len(chunk)
                            if total:
                                pct = int(downloaded * 100 / total)
                                _update_job(job_id, status="downloading",
                                            message=f"Downloading... {pct}%")
            audio_path = convert_to_wav(tmp_path)
            # Drain the generator; progress and the final result are written
            # into the job store via job_id.
            for _ in run_pipeline(
                audio_path, src_lang, tgt_lang,
                opt_fillers, opt_stutters, opt_silences,
                opt_breaths, opt_mouth, job_id=job_id
            ):
                pass
            # Attach the caller's correlation id to the finished result.
            with _jobs_lock:
                if job_id in _jobs and _jobs[job_id].get("result"):
                    _jobs[job_id]["result"]["audioId"] = audio_id
        except Exception as e:
            logger.error(f"Job {job_id} failed: {e}", exc_info=True)
            _update_job(job_id, status="error", message=f"Error: {str(e)}")
        finally:
            # Best-effort cleanup of the download and converted copies.
            for p in [tmp_path, audio_path]:
                try:
                    if p and os.path.exists(p):
                        os.unlink(p)
                except Exception:
                    pass

    threading.Thread(target=_download_and_run, daemon=True).start()
    return JSONResponse({
        "jobId": job_id,
        "audioId": audio_id,
        "status": "queued",
        "pollUrl": f"/api/job/{job_id}",
        "message": "Job started! Poll pollUrl every 3-5 seconds for progress.",
    })
@app.get("/api/job/{job_id}")
async def get_job(job_id: str):
    """
    Poll for job status and result.
    status: queued | downloading | processing | done | error
    result: available only when status=done
    """
    snapshot = _get_job(job_id)
    if not snapshot:
        return JSONResponse({"error": "Job not found"}, status_code=404)
    body = {
        "jobId": job_id,
        "status": snapshot.get("status"),
        "step": snapshot.get("step", 0),
        "message": snapshot.get("message", ""),
    }
    if snapshot.get("status") == "done":
        body["result"] = snapshot.get("result", {})
    return JSONResponse(body)
@app.get("/api/jobs")
async def list_jobs():
    """List all active jobs — useful for debugging."""
    with _jobs_lock:
        summary = {}
        for jid, rec in _jobs.items():
            summary[jid] = {
                "status": rec["status"],
                "step": rec.get("step", 0),
                "message": rec.get("message", ""),
            }
    return JSONResponse({"jobs": summary, "total": len(summary)})