testingfaces commited on
Commit
88439ab
Β·
verified Β·
1 Parent(s): 9716505

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +207 -55
main.py CHANGED
@@ -1,22 +1,30 @@
1
  """
2
  ClearWave AI β€” API Space (FastAPI only)
3
- Handles /api/health and /api/process-url
4
- No Gradio, no routing conflicts.
 
 
 
 
 
5
  """
6
 
7
  import os
 
8
  import json
9
  import tempfile
10
  import logging
 
 
11
  import requests
12
  import numpy as np
 
13
  import cloudinary
14
  import cloudinary.uploader
15
  from fastapi import FastAPI, Request
16
- from fastapi.responses import StreamingResponse, JSONResponse
17
  from fastapi.middleware.cors import CORSMiddleware
18
 
19
- # Cloudinary config β€” set these in your HF Space secrets
20
  cloudinary.config(
21
  cloud_name = os.environ.get("CLOUD_NAME"),
22
  api_key = os.environ.get("API_KEY"),
@@ -35,7 +43,6 @@ transcriber = Transcriber()
35
  translator = Translator()
36
 
37
  app = FastAPI(title="ClearWave AI API")
38
-
39
  app.add_middleware(
40
  CORSMiddleware,
41
  allow_origins=["*"],
@@ -44,14 +51,92 @@ app.add_middleware(
44
  )
45
 
46
  # ══════════════════════════════════════════════════════════════════════
47
- # PIPELINE
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  # ══════════════════════════════════════════════════════════════════════
49
  def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
50
  opt_fillers=True, opt_stutters=True, opt_silences=True,
51
- opt_breaths=True, opt_mouth=True):
 
 
 
 
 
 
 
52
  out_dir = tempfile.mkdtemp()
53
  try:
54
- yield {"status": "processing", "step": 1, "message": "Step 1/5 β€” Denoising..."}
55
  denoise1 = denoiser.process(
56
  audio_path, out_dir,
57
  remove_fillers=False, remove_stutters=False,
@@ -61,14 +146,13 @@ def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
61
  clean1 = denoise1["audio_path"]
62
  stats = denoise1["stats"]
63
 
64
- yield {"status": "processing", "step": 2, "message": "Step 2/5 β€” Transcribing..."}
65
  transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang)
66
  word_segs = transcriber._last_segments
67
 
68
  if (opt_fillers or opt_stutters) and word_segs:
69
- yield {"status": "processing", "step": 3, "message": "Step 3/5 β€” Removing fillers & stutters..."}
70
  import soundfile as sf
71
- # Read the denoised audio β€” soundfile can read both WAV and MP3
72
  audio_data, sr = sf.read(clean1)
73
  if audio_data.ndim == 2:
74
  audio_data = audio_data.mean(axis=1)
@@ -80,10 +164,9 @@ def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
80
  if opt_stutters:
81
  audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs)
82
  stats["stutters_removed"] = n_s
83
- # Write to a fresh .wav β€” PCM_24 is WAV-only, never write to .mp3 path
84
  clean_wav = os.path.join(out_dir, "clean_step3.wav")
85
  sf.write(clean_wav, audio_data, sr, format="WAV", subtype="PCM_24")
86
- clean1 = clean_wav # downstream steps (Cloudinary upload) use this
87
  else:
88
  stats["fillers_removed"] = 0
89
  stats["stutters_removed"] = 0
@@ -91,28 +174,26 @@ def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
91
  translation = transcript
92
  tl_method = "same language"
93
  if tgt_lang != "auto" and detected_lang != tgt_lang:
94
- yield {"status": "processing", "step": 4, "message": "Step 4/5 β€” Translating..."}
95
  translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)
96
 
97
- yield {"status": "processing", "step": 5, "message": "Step 5/5 β€” Summarizing..."}
98
  summary = translator.summarize(transcript)
99
 
100
- # Upload enhanced audio to Cloudinary β€” returns a URL instead of base64.
101
- # This keeps the done SSE event tiny (~200 bytes) instead of ~700KB,
102
- # which was causing the JSON to be split across 85+ TCP chunks.
103
  try:
104
  upload_result = cloudinary.uploader.upload(
105
  clean1,
106
- resource_type = "video", # Cloudinary uses "video" for audio
107
- folder = "clearwave_enhanced",
108
  )
109
  enhanced_url = upload_result["secure_url"]
110
- logger.info(f"Enhanced audio uploaded: {enhanced_url}")
111
  except Exception as e:
112
  logger.error(f"Cloudinary upload failed: {e}")
113
- enhanced_url = None
114
 
115
- yield {
116
  "status": "done",
117
  "step": 5,
118
  "message": "Done!",
@@ -135,9 +216,18 @@ def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
135
  "transcript_words": len(transcript.split()),
136
  },
137
  }
 
 
 
 
 
 
138
  except Exception as e:
139
  logger.error(f"Pipeline failed: {e}", exc_info=True)
140
- yield {"status": "error", "message": f"Error: {str(e)}"}
 
 
 
141
 
142
 
143
  # ══════════════════════════════════════════════════════════════════════
@@ -145,11 +235,20 @@ def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
145
  # ══════════════════════════════════════════════════════════════════════
146
  @app.get("/api/health")
147
  async def health():
148
- return JSONResponse({"status": "ok", "service": "ClearWave AI API"})
 
 
 
 
149
 
150
 
151
  @app.post("/api/process-url")
152
  async def process_url(request: Request):
 
 
 
 
 
153
  data = await request.json()
154
  audio_url = data.get("audioUrl")
155
  audio_id = data.get("audioId", "")
@@ -164,20 +263,24 @@ async def process_url(request: Request):
164
  if not audio_url:
165
  return JSONResponse({"error": "audioUrl is required"}, status_code=400)
166
 
167
- async def generate():
168
- import sys
169
-
170
- def sse(obj):
171
- sys.stdout.flush()
172
- return "data: " + json.dumps(obj) + "\n\n"
173
-
174
- yield sse({"status": "processing", "step": 0, "message": "Downloading audio..."})
175
 
 
 
 
176
  try:
177
- resp = requests.get(audio_url, timeout=60, stream=True)
178
  resp.raise_for_status()
179
- suffix = ".wav" if "wav" in audio_url.lower() else ".mp3"
180
- tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
 
 
 
 
 
 
 
181
  downloaded = 0
182
  total = int(resp.headers.get("content-length", 0))
183
  for chunk in resp.iter_content(chunk_size=65536):
@@ -186,26 +289,75 @@ async def process_url(request: Request):
186
  downloaded += len(chunk)
187
  if total:
188
  pct = int(downloaded * 100 / total)
189
- yield sse({"status": "processing", "step": 0,
190
- "message": "Downloading... " + str(pct) + "%"})
191
  tmp.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
  except Exception as e:
193
- yield sse({"status": "error", "message": "Download failed: " + str(e)})
194
- return
 
 
 
 
 
 
 
195
 
196
- for result in run_pipeline(tmp.name, src_lang, tgt_lang,
197
- opt_fillers, opt_stutters, opt_silences,
198
- opt_breaths, opt_mouth):
199
- result["audioId"] = audio_id
200
- yield sse(result)
201
 
202
- try:
203
- os.unlink(tmp.name)
204
- except Exception:
205
- pass
206
-
207
- return StreamingResponse(
208
- generate(),
209
- media_type="text/event-stream",
210
- headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
211
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
  ClearWave AI β€” API Space (FastAPI only)
3
+
4
+ BACKGROUND JOB SYSTEM:
5
+ - POST /api/process-url β†’ returns {jobId} instantly β€” NO timeout issues
6
+ - GET /api/job/{jobId} β†’ poll for progress and final result
7
+ - GET /api/jobs β†’ list all active jobs (debug)
8
+ - Jobs run in background threads β€” handles 1hr+ audio safely
9
+ - Results stored in memory for 1 hour then auto-cleaned
10
  """
11
 
12
  import os
13
+ import uuid
14
  import json
15
  import tempfile
16
  import logging
17
+ import threading
18
+ import time
19
  import requests
20
  import numpy as np
21
+ import subprocess
22
  import cloudinary
23
  import cloudinary.uploader
24
  from fastapi import FastAPI, Request
25
+ from fastapi.responses import JSONResponse
26
  from fastapi.middleware.cors import CORSMiddleware
27
 
 
28
  cloudinary.config(
29
  cloud_name = os.environ.get("CLOUD_NAME"),
30
  api_key = os.environ.get("API_KEY"),
 
43
  translator = Translator()
44
 
45
  app = FastAPI(title="ClearWave AI API")
 
46
  app.add_middleware(
47
  CORSMiddleware,
48
  allow_origins=["*"],
 
51
  )
52
 
53
  # ══════════════════════════════════════════════════════════════════════
54
+ # JOB STORE
55
+ # ══════════════════════════════════════════════════════════════════════
56
+ _jobs: dict = {}
57
+ _jobs_lock = threading.Lock()
58
+ JOB_TTL_SEC = 3600
59
+
60
+
61
+ def _new_job() -> str:
62
+ job_id = str(uuid.uuid4())
63
+ with _jobs_lock:
64
+ _jobs[job_id] = {
65
+ "status": "queued",
66
+ "step": 0,
67
+ "message": "Queued...",
68
+ "result": None,
69
+ "created_at": time.time(),
70
+ }
71
+ return job_id
72
+
73
+
74
+ def _update_job(job_id: str, **kwargs):
75
+ with _jobs_lock:
76
+ if job_id in _jobs:
77
+ _jobs[job_id].update(kwargs)
78
+
79
+
80
+ def _get_job(job_id: str) -> dict:
81
+ with _jobs_lock:
82
+ return dict(_jobs.get(job_id, {}))
83
+
84
+
85
+ def _cleanup_loop():
86
+ while True:
87
+ time.sleep(300)
88
+ now = time.time()
89
+ with _jobs_lock:
90
+ expired = [k for k, v in _jobs.items()
91
+ if now - v.get("created_at", 0) > JOB_TTL_SEC]
92
+ for k in expired:
93
+ del _jobs[k]
94
+ if expired:
95
+ logger.info(f"[Jobs] Cleaned {len(expired)} expired jobs")
96
+
97
+
98
+ threading.Thread(target=_cleanup_loop, daemon=True).start()
99
+
100
+
101
# ══════════════════════════════════════════════════════════════════════
# AUDIO FORMAT CONVERTER
# ══════════════════════════════════════════════════════════════════════
def convert_to_wav(audio_path: str) -> str:
    """Best-effort conversion of *audio_path* to 16 kHz mono PCM WAV.

    Formats the downstream pipeline already reads (wav/mp3/flac/ogg/aac)
    are passed through untouched.  Anything else (e.g. .m4a, .mp4, .mpeg
    from the downloader) is run through ffmpeg.  On any failure the
    original path is returned so the pipeline can still try it as-is.
    """
    if audio_path is None:
        return audio_path
    ext = os.path.splitext(audio_path)[1].lower()
    # Natively supported downstream — no conversion needed.
    if ext in (".wav", ".mp3", ".flac", ".ogg", ".aac"):
        return audio_path
    try:
        converted = audio_path + "_converted.wav"
        result = subprocess.run([
            "ffmpeg", "-y", "-i", audio_path,
            "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", converted,
        ], capture_output=True)
        if result.returncode == 0 and os.path.exists(converted):
            logger.info(f"Converted {ext} → .wav")
            return converted
        # BUG FIX: a non-zero ffmpeg exit previously fell through silently;
        # surface the captured stderr so failures are diagnosable.
        logger.warning(
            f"ffmpeg conversion failed (rc={result.returncode}): "
            f"{result.stderr.decode(errors='replace')[:300]}"
        )
    except Exception as e:
        logger.warning(f"Conversion error: {e}")
    return audio_path
122
+
123
+
124
+ # ══════════════════════════════════════════════════════════════════════
125
+ # CORE PIPELINE
126
  # ══════════════════════════════════════════════════════════════════════
127
  def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
128
  opt_fillers=True, opt_stutters=True, opt_silences=True,
129
+ opt_breaths=True, opt_mouth=True, job_id=None):
130
+
131
+ def progress(step, message):
132
+ update = {"status": "processing", "step": step, "message": message}
133
+ if job_id:
134
+ _update_job(job_id, **update)
135
+ return update
136
+
137
  out_dir = tempfile.mkdtemp()
138
  try:
139
+ yield progress(1, "Step 1/5 β€” Denoising...")
140
  denoise1 = denoiser.process(
141
  audio_path, out_dir,
142
  remove_fillers=False, remove_stutters=False,
 
146
  clean1 = denoise1["audio_path"]
147
  stats = denoise1["stats"]
148
 
149
+ yield progress(2, "Step 2/5 β€” Transcribing...")
150
  transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang)
151
  word_segs = transcriber._last_segments
152
 
153
  if (opt_fillers or opt_stutters) and word_segs:
154
+ yield progress(3, "Step 3/5 β€” Removing fillers & stutters...")
155
  import soundfile as sf
 
156
  audio_data, sr = sf.read(clean1)
157
  if audio_data.ndim == 2:
158
  audio_data = audio_data.mean(axis=1)
 
164
  if opt_stutters:
165
  audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs)
166
  stats["stutters_removed"] = n_s
 
167
  clean_wav = os.path.join(out_dir, "clean_step3.wav")
168
  sf.write(clean_wav, audio_data, sr, format="WAV", subtype="PCM_24")
169
+ clean1 = clean_wav
170
  else:
171
  stats["fillers_removed"] = 0
172
  stats["stutters_removed"] = 0
 
174
  translation = transcript
175
  tl_method = "same language"
176
  if tgt_lang != "auto" and detected_lang != tgt_lang:
177
+ yield progress(4, "Step 4/5 β€” Translating...")
178
  translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)
179
 
180
+ yield progress(5, "Step 5/5 β€” Summarizing...")
181
  summary = translator.summarize(transcript)
182
 
183
+ # Upload to Cloudinary
184
+ enhanced_url = None
 
185
  try:
186
  upload_result = cloudinary.uploader.upload(
187
  clean1,
188
+ resource_type="video",
189
+ folder="clearwave_enhanced",
190
  )
191
  enhanced_url = upload_result["secure_url"]
192
+ logger.info(f"Uploaded to Cloudinary: {enhanced_url}")
193
  except Exception as e:
194
  logger.error(f"Cloudinary upload failed: {e}")
 
195
 
196
+ result = {
197
  "status": "done",
198
  "step": 5,
199
  "message": "Done!",
 
216
  "transcript_words": len(transcript.split()),
217
  },
218
  }
219
+
220
+ if job_id:
221
+ _update_job(job_id, status="done", step=5,
222
+ message="Done!", result=result)
223
+ yield result
224
+
225
  except Exception as e:
226
  logger.error(f"Pipeline failed: {e}", exc_info=True)
227
+ err = {"status": "error", "message": f"Error: {str(e)}"}
228
+ if job_id:
229
+ _update_job(job_id, **err)
230
+ yield err
231
 
232
 
233
  # ══════════════════════════════════════════════════════════════════════
 
235
  # ══════════════════════════════════════════════════════════════════════
236
@app.get("/api/health")
async def health():
    """Liveness probe; also reports how many jobs are currently stored."""
    payload = {
        "status": "ok",
        "service": "ClearWave AI API",
        "jobs_active": len(_jobs),
    }
    return JSONResponse(payload)
243
 
244
 
245
  @app.post("/api/process-url")
246
  async def process_url(request: Request):
247
+ """
248
+ Submit audio for processing.
249
+ Returns jobId immediately β€” no timeout issues.
250
+ Poll GET /api/job/{jobId} for progress and result.
251
+ """
252
  data = await request.json()
253
  audio_url = data.get("audioUrl")
254
  audio_id = data.get("audioId", "")
 
263
  if not audio_url:
264
  return JSONResponse({"error": "audioUrl is required"}, status_code=400)
265
 
266
+ job_id = _new_job()
267
+ _update_job(job_id, status="downloading", message="Downloading audio...")
 
 
 
 
 
 
268
 
269
+ def _download_and_run():
270
+ tmp_path = None
271
+ audio_path = None
272
  try:
273
+ resp = requests.get(audio_url, timeout=300, stream=True)
274
  resp.raise_for_status()
275
+ url_lower = audio_url.lower()
276
+ if "wav" in url_lower: suffix = ".wav"
277
+ elif "mpeg" in url_lower: suffix = ".mpeg"
278
+ elif "mp4" in url_lower: suffix = ".mp4"
279
+ elif "m4a" in url_lower: suffix = ".m4a"
280
+ else: suffix = ".mp3"
281
+
282
+ tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
283
+ tmp_path = tmp.name
284
  downloaded = 0
285
  total = int(resp.headers.get("content-length", 0))
286
  for chunk in resp.iter_content(chunk_size=65536):
 
289
  downloaded += len(chunk)
290
  if total:
291
  pct = int(downloaded * 100 / total)
292
+ _update_job(job_id, status="downloading",
293
+ message=f"Downloading... {pct}%")
294
  tmp.close()
295
+
296
+ audio_path = convert_to_wav(tmp_path)
297
+
298
+ for _ in run_pipeline(
299
+ audio_path, src_lang, tgt_lang,
300
+ opt_fillers, opt_stutters, opt_silences,
301
+ opt_breaths, opt_mouth, job_id=job_id
302
+ ):
303
+ pass
304
+
305
+ with _jobs_lock:
306
+ if job_id in _jobs and _jobs[job_id].get("result"):
307
+ _jobs[job_id]["result"]["audioId"] = audio_id
308
+
309
  except Exception as e:
310
+ logger.error(f"Job {job_id} failed: {e}", exc_info=True)
311
+ _update_job(job_id, status="error", message=f"Error: {str(e)}")
312
+ finally:
313
+ for p in [tmp_path, audio_path]:
314
+ try:
315
+ if p and os.path.exists(p):
316
+ os.unlink(p)
317
+ except Exception:
318
+ pass
319
 
320
+ threading.Thread(target=_download_and_run, daemon=True).start()
 
 
 
 
321
 
322
+ return JSONResponse({
323
+ "jobId": job_id,
324
+ "audioId": audio_id,
325
+ "status": "queued",
326
+ "pollUrl": f"/api/job/{job_id}",
327
+ "message": "Job started! Poll pollUrl every 3-5 seconds for progress.",
328
+ })
329
+
330
+
331
@app.get("/api/job/{job_id}")
async def get_job(job_id: str):
    """
    Poll for job status and result.
    status: queued | downloading | processing | done | error
    result: available only when status=done
    """
    job = _get_job(job_id)
    if not job:
        return JSONResponse({"error": "Job not found"}, status_code=404)

    payload = dict(
        jobId=job_id,
        status=job.get("status"),
        step=job.get("step", 0),
        message=job.get("message", ""),
    )
    # Attach the full result payload only once the job has finished.
    if job.get("status") == "done":
        payload["result"] = job.get("result", {})

    return JSONResponse(payload)
352
+
353
+
354
@app.get("/api/jobs")
async def list_jobs():
    """Debug endpoint: summarize every job currently in the store."""
    with _jobs_lock:
        summary = {}
        for job_id, rec in _jobs.items():
            summary[job_id] = {
                "status": rec["status"],
                "step": rec.get("step", 0),
                "message": rec.get("message", ""),
            }
    return JSONResponse({"jobs": summary, "total": len(summary)})