clearwave-ai / app.py
testingfaces's picture
Update app.py
a229c78 verified
"""
ClearWave AI — HuggingFace Spaces
Gradio UI + FastAPI routes
BACKGROUND JOB SYSTEM:
- POST /api/process-url → returns {jobId} instantly (no timeout)
- GET /api/job/{jobId} → poll for progress / result
- Jobs run in background threads — handles 1hr+ audio safely
- Job results stored in memory for 1 hour then auto-cleaned
- Gradio UI uses same background thread approach
"""
import os
import uuid
import json
import base64
import tempfile
import logging
import subprocess
import threading
import time
import requests
import numpy as np
import gradio as gr
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from denoiser import Denoiser
from transcriber import Transcriber
from translator import Translator
denoiser = Denoiser()
transcriber = Transcriber()
translator = Translator()
LANGUAGES_DISPLAY = {
"Auto Detect": "auto",
"English": "en", "Telugu": "te",
"Hindi": "hi", "Tamil": "ta",
"Kannada": "kn", "Spanish": "es",
"French": "fr", "German": "de",
"Japanese": "ja", "Chinese": "zh",
"Arabic": "ar",
}
OUT_LANGS = {k: v for k, v in LANGUAGES_DISPLAY.items() if k != "Auto Detect"}
# ══════════════════════════════════════════════════════════════════════
# JOB STORE — in-memory job registry
# ══════════════════════════════════════════════════════════════════════
_jobs: dict = {}
_jobs_lock = threading.Lock()
JOB_TTL_SEC = 3600 # keep results for 1 hour
def _new_job() -> str:
job_id = str(uuid.uuid4())
with _jobs_lock:
_jobs[job_id] = {
"status": "queued",
"step": 0,
"message": "Queued...",
"result": None,
"created_at": time.time(),
}
return job_id
def _update_job(job_id: str, **kwargs):
with _jobs_lock:
if job_id in _jobs:
_jobs[job_id].update(kwargs)
def _get_job(job_id: str) -> dict:
with _jobs_lock:
return dict(_jobs.get(job_id, {}))
def _cleanup_loop():
"""Remove jobs older than JOB_TTL_SEC — runs every 5 minutes."""
while True:
time.sleep(300)
now = time.time()
with _jobs_lock:
expired = [k for k, v in _jobs.items()
if now - v.get("created_at", 0) > JOB_TTL_SEC]
for k in expired:
del _jobs[k]
if expired:
logger.info(f"[Jobs] Cleaned {len(expired)} expired jobs")
threading.Thread(target=_cleanup_loop, daemon=True).start()
# ══════════════════════════════════════════════════════════════════════
# AUDIO FORMAT CONVERTER
# ══════════════════════════════════════════════════════════════════════
def convert_to_wav(audio_path: str) -> str:
if audio_path is None:
return audio_path
ext = os.path.splitext(audio_path)[1].lower()
if ext in [".wav", ".mp3", ".flac", ".ogg", ".aac"]:
return audio_path
try:
converted = audio_path + "_converted.wav"
result = subprocess.run([
"ffmpeg", "-y", "-i", audio_path,
"-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", converted
], capture_output=True)
if result.returncode == 0 and os.path.exists(converted):
logger.info(f"Converted {ext} → .wav")
return converted
except Exception as e:
logger.warning(f"Conversion error: {e}")
return audio_path
# ══════════════════════════════════════════════════════════════════════
# CORE PIPELINE
# ══════════════════════════════════════════════════════════════════════
def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
opt_fillers=True, opt_stutters=True, opt_silences=True,
opt_breaths=True, opt_mouth=True, job_id=None):
def progress(step, message):
update = {"status": "processing", "step": step, "message": message}
if job_id:
_update_job(job_id, **update)
return update
out_dir = tempfile.mkdtemp()
try:
yield progress(1, "⏳ Step 1/5 — Denoising...")
denoise1 = denoiser.process(
audio_path, out_dir,
remove_fillers=False, remove_stutters=False,
remove_silences=opt_silences, remove_breaths=opt_breaths,
remove_mouth_sounds=opt_mouth, word_segments=None,
)
clean1 = denoise1['audio_path']
stats = denoise1['stats']
yield progress(2, "⏳ Step 2/5 — Transcribing...")
transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang)
word_segs = transcriber._last_segments
if (opt_fillers or opt_stutters) and word_segs:
yield progress(3, "⏳ Step 3/5 — Removing fillers & stutters...")
import soundfile as sf
audio_data, sr = sf.read(clean1)
if audio_data.ndim == 2:
audio_data = audio_data.mean(axis=1)
audio_data = audio_data.astype(np.float32)
if opt_fillers:
audio_data, n_f = denoiser._remove_fillers(audio_data, sr, word_segs)
stats['fillers_removed'] = n_f
transcript = denoiser.clean_transcript_fillers(transcript)
if opt_stutters:
audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs)
stats['stutters_removed'] = n_s
sf.write(clean1, audio_data, sr, subtype="PCM_24")
else:
stats['fillers_removed'] = 0
stats['stutters_removed'] = 0
translation = transcript
tl_method = "same language"
if tgt_lang != "auto" and detected_lang != tgt_lang:
yield progress(4, "⏳ Step 4/5 — Translating...")
translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)
yield progress(5, "⏳ Step 5/5 — Summarizing...")
summary = translator.summarize(transcript)
with open(clean1, "rb") as f:
enhanced_b64 = base64.b64encode(f.read()).decode("utf-8")
result = {
"status": "done",
"step": 5,
"message": "✅ Done!",
"transcript": transcript,
"translation": translation,
"summary": summary,
"audioPath": clean1,
"enhancedAudio": enhanced_b64,
"stats": {
"language": detected_lang.upper(),
"noise_method": stats.get("noise_method", "noisereduce"),
"fillers_removed": stats.get("fillers_removed", 0),
"stutters_removed": stats.get("stutters_removed", 0),
"silences_removed_sec": stats.get("silences_removed_sec", 0),
"breaths_reduced": stats.get("breaths_reduced", False),
"mouth_sounds_removed": stats.get("mouth_sounds_removed", 0),
"transcription_method": t_method,
"translation_method": tl_method,
"processing_sec": stats.get("processing_sec", 0),
"word_segments": len(word_segs),
"transcript_words": len(transcript.split()),
},
}
if job_id:
_update_job(job_id, status="done", step=5,
message="✅ Done!", result=result)
yield result
except Exception as e:
logger.error(f"Pipeline failed: {e}", exc_info=True)
err = {"status": "error", "message": f"❌ Error: {str(e)}"}
if job_id:
_update_job(job_id, **err)
yield err
# ══════════════════════════════════════════════════════════════════════
# BACKGROUND WORKER
# ══════════════════════════════════════════════════════════════════════
def _run_job_in_background(job_id, audio_path, src_lang, tgt_lang,
opt_fillers, opt_stutters, opt_silences,
opt_breaths, opt_mouth):
try:
for _ in run_pipeline(
audio_path, src_lang, tgt_lang,
opt_fillers, opt_stutters, opt_silences,
opt_breaths, opt_mouth, job_id=job_id
):
pass
except Exception as e:
_update_job(job_id, status="error", message=f"❌ {e}")
finally:
try:
os.unlink(audio_path)
except Exception:
pass
# ══════════════════════════════════════════════════════════════════════
# GRADIO UI
# ══════════════════════════════════════════════════════════════════════
def process_audio_gradio(audio_path, in_lang_name, out_lang_name,
opt_fillers, opt_stutters, opt_silences,
opt_breaths, opt_mouth):
if audio_path is None:
yield ("❌ Please upload an audio file.", "", "", None, "", "")
return
if isinstance(audio_path, dict):
audio_path = audio_path.get("name") or audio_path.get("path", "")
audio_path = convert_to_wav(audio_path)
src_lang = LANGUAGES_DISPLAY.get(in_lang_name, "auto")
tgt_lang = LANGUAGES_DISPLAY.get(out_lang_name, "te")
# Start background job
job_id = _new_job()
threading.Thread(
target=_run_job_in_background,
args=(job_id, audio_path, src_lang, tgt_lang,
opt_fillers, opt_stutters, opt_silences,
opt_breaths, opt_mouth),
daemon=True,
).start()
# Poll and stream progress to Gradio UI
while True:
time.sleep(2)
job = _get_job(job_id)
if not job:
yield ("❌ Job not found.", "", "", None, "", "")
return
status = job.get("status")
message = job.get("message", "Processing...")
if status in ("queued", "downloading", "processing"):
yield (message, "", "", None, "", "")
elif status == "done":
result = job.get("result", {})
s = result.get("stats", {})
stats_str = "\n".join([
f"🎙️ Language : {s.get('language','?')}",
f"🔊 Noise method : {s.get('noise_method','?')}",
f"🗑️ Fillers : {s.get('fillers_removed', 0)} removed",
f"🔁 Stutters : {s.get('stutters_removed', 0)} removed",
f"🤫 Silences : {s.get('silences_removed_sec', 0):.1f}s removed",
f"📝 Transcription : {s.get('transcription_method','?')}",
f"🌐 Translation : {s.get('translation_method','?')}",
f"⏱️ Total time : {s.get('processing_sec', 0):.1f}s",
])
yield (result.get("message", "✅ Done!"),
result.get("transcript", ""),
result.get("translation", ""),
result.get("audioPath"),
stats_str,
result.get("summary", ""))
return
elif status == "error":
yield (job.get("message", "❌ Error"), "", "", None, "Failed.", "")
return
with gr.Blocks(title="ClearWave AI") as demo:
gr.Markdown("# 🎵 ClearWave AI\n### Professional Audio Enhancement — handles 1hr+ audio!")
with gr.Row():
with gr.Column(scale=1):
audio_in = gr.File(
label="📁 Upload Audio (MP3, WAV, MPEG, MP4, AAC, OGG, FLAC, AMR...)",
file_types=[
".mp3", ".wav", ".mpeg", ".mpg", ".mp4", ".m4a",
".aac", ".ogg", ".flac", ".opus", ".webm", ".amr",
".wma", ".aiff", ".aif", ".midi", ".mid",
],
)
with gr.Row():
in_lang = gr.Dropdown(label="Input Language",
choices=list(LANGUAGES_DISPLAY.keys()),
value="Auto Detect")
out_lang = gr.Dropdown(label="Output Language",
choices=list(OUT_LANGS.keys()),
value="Telugu")
gr.Markdown("### 🎛️ Options")
with gr.Row():
opt_fillers = gr.Checkbox(label="🗑️ Remove Fillers", value=True)
opt_stutters = gr.Checkbox(label="🔁 Remove Stutters", value=True)
with gr.Row():
opt_silences = gr.Checkbox(label="🤫 Remove Silences", value=True)
opt_breaths = gr.Checkbox(label="💨 Reduce Breaths", value=True)
with gr.Row():
opt_mouth = gr.Checkbox(label="👄 Reduce Mouth Sounds", value=True)
process_btn = gr.Button("🚀 Process Audio", variant="primary", size="lg")
status_out = gr.Textbox(label="Status", interactive=False, lines=2)
with gr.Column(scale=2):
with gr.Tabs():
with gr.Tab("📝 Text"):
with gr.Row():
transcript_out = gr.Textbox(label="Transcript", lines=14)
translation_out = gr.Textbox(label="Translation", lines=14)
with gr.Tab("🔊 Clean Audio"):
clean_audio_out = gr.Audio(label="Enhanced Audio", type="filepath")
with gr.Tab("📊 Stats"):
stats_out = gr.Textbox(label="Stats", lines=12, interactive=False)
with gr.Tab("📋 Summary"):
summary_out = gr.Textbox(label="Summary", lines=14)
process_btn.click(
fn=process_audio_gradio,
inputs=[audio_in, in_lang, out_lang, opt_fillers, opt_stutters,
opt_silences, opt_breaths, opt_mouth],
outputs=[status_out, transcript_out, translation_out,
clean_audio_out, stats_out, summary_out]
)
# ══════════════════════════════════════════════════════════════════════
# API ROUTES
# ══════════════════════════════════════════════════════════════════════
import json as _json
from fastapi import Request as _Request
from fastapi.responses import JSONResponse as _JSONResponse
@demo.app.get("/api/health")
async def api_health():
return _JSONResponse({
"status": "ok",
"service": "ClearWave AI on HuggingFace",
"jobs_active": len(_jobs),
})
@demo.app.post("/api/process-url")
async def api_process_url(request: _Request):
"""
Instantly returns a jobId.
Client polls GET /api/job/{jobId} for progress and result.
No timeout issues — works for 1hr+ audio.
"""
data = await request.json()
if "data" in data and isinstance(data["data"], dict):
data = data["data"]
audio_url = data.get("audioUrl")
audio_id = data.get("audioId", "")
src_lang = data.get("srcLang", "auto")
tgt_lang = data.get("tgtLang", "te")
opt_fillers = data.get("optFillers", True)
opt_stutters = data.get("optStutters", True)
opt_silences = data.get("optSilences", True)
opt_breaths = data.get("optBreaths", True)
opt_mouth = data.get("optMouth", True)
if not audio_url:
return _JSONResponse({"error": "audioUrl is required"}, status_code=400)
job_id = _new_job()
_update_job(job_id, status="downloading", message="Downloading audio...")
def _download_and_run():
tmp_path = None
audio_path = None
try:
# Download
resp = requests.get(audio_url, timeout=300, stream=True)
resp.raise_for_status()
url_lower = audio_url.lower()
if "wav" in url_lower: suffix = ".wav"
elif "mpeg" in url_lower: suffix = ".mpeg"
elif "mp4" in url_lower: suffix = ".mp4"
elif "m4a" in url_lower: suffix = ".m4a"
else: suffix = ".mp3"
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
tmp_path = tmp.name
downloaded = 0
total = int(resp.headers.get("content-length", 0))
for chunk in resp.iter_content(chunk_size=65536):
if chunk:
tmp.write(chunk)
downloaded += len(chunk)
if total:
pct = int(downloaded * 100 / total)
_update_job(job_id, status="downloading",
message=f"Downloading... {pct}%")
tmp.close()
# Convert format
audio_path = convert_to_wav(tmp_path)
# Run pipeline
for _ in run_pipeline(
audio_path, src_lang, tgt_lang,
opt_fillers, opt_stutters, opt_silences,
opt_breaths, opt_mouth, job_id=job_id
):
pass
# Tag result with audioId
with _jobs_lock:
if job_id in _jobs and _jobs[job_id].get("result"):
_jobs[job_id]["result"]["audioId"] = audio_id
except Exception as e:
logger.error(f"Job {job_id} failed: {e}", exc_info=True)
_update_job(job_id, status="error", message=f"❌ Error: {str(e)}")
finally:
for p in [tmp_path, audio_path]:
try:
if p and os.path.exists(p):
os.unlink(p)
except Exception:
pass
threading.Thread(target=_download_and_run, daemon=True).start()
return _JSONResponse({
"jobId": job_id,
"audioId": audio_id,
"status": "queued",
"pollUrl": f"/api/job/{job_id}",
"message": "Job started! Poll pollUrl for progress.",
})
@demo.app.get("/api/job/{job_id}")
async def api_get_job(job_id: str):
"""
Poll this to get job progress.
When status=done, result contains full transcript/translation/audio.
"""
job = _get_job(job_id)
if not job:
return _JSONResponse({"error": "Job not found"}, status_code=404)
response = {
"jobId": job_id,
"status": job.get("status"),
"step": job.get("step", 0),
"message": job.get("message", ""),
}
if job.get("status") == "done":
response["result"] = job.get("result", {})
return _JSONResponse(response)
@demo.app.get("/api/jobs")
async def api_list_jobs():
"""List all active jobs."""
with _jobs_lock:
summary = {
k: {"status": v["status"], "step": v.get("step", 0),
"message": v.get("message", "")}
for k, v in _jobs.items()
}
return _JSONResponse({"jobs": summary, "total": len(summary)})
logger.info("✅ Routes: /api/health, /api/process-url, /api/job/{id}, /api/jobs")
if __name__ == "__main__":
demo.launch()