Devang1290 committed on
Commit
0147f31
Β·
1 Parent(s): e15f0bd

feat: smart priority queue for TTS with lazy frontend pull architecture

Browse files
Files changed (2) hide show
  1. backend/services/database.py +22 -2
  2. hf_app.py +127 -91
backend/services/database.py CHANGED
@@ -73,6 +73,7 @@ class DatabaseManager:
73
  "content": article_data.get('content', ''),
74
  "summary": article_data.get('summary', ''),
75
  "audio_url": article_data.get('audio_url', ''),
 
76
  "published_at": article_data.get('published_date'),
77
  "scraped_at": article_data.get('scraped_at'),
78
  "summary_generated_at": article_data.get('summary_generated_at')
@@ -94,9 +95,25 @@ class DatabaseManager:
94
  logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
95
  return False
96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  def update_audio_url(self, article_id: str, audio_url: str) -> bool:
98
  """
99
- Updates the audio_url for a specific article in the articles table.
100
  Called progressively as each TTS clip finishes generating.
101
  """
102
  if not self.supabase:
@@ -104,7 +121,10 @@ class DatabaseManager:
104
 
105
  try:
106
  self.supabase.table("articles").update(
107
- {"audio_url": audio_url}
 
 
 
108
  ).eq("id", article_id).execute()
109
  return True
110
  except Exception as e:
 
73
  "content": article_data.get('content', ''),
74
  "summary": article_data.get('summary', ''),
75
  "audio_url": article_data.get('audio_url', ''),
76
+ "audio_status": article_data.get('audio_status', 'queued'),
77
  "published_at": article_data.get('published_date'),
78
  "scraped_at": article_data.get('scraped_at'),
79
  "summary_generated_at": article_data.get('summary_generated_at')
 
95
  logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
96
  return False
97
 
98
+ def update_audio_status(self, article_id: str, status: str) -> bool:
99
+ """
100
+ Updates the progressive audio_status ('queued', 'generating', 'ready').
101
+ """
102
+ if not self.supabase:
103
+ return False
104
+
105
+ try:
106
+ self.supabase.table("articles").update(
107
+ {"audio_status": status}
108
+ ).eq("id", article_id).execute()
109
+ return True
110
+ except Exception as e:
111
+ logger.error(f"Error updating audio_status for {article_id}: {str(e)}")
112
+ return False
113
+
114
  def update_audio_url(self, article_id: str, audio_url: str) -> bool:
115
  """
116
+ Updates the audio_url and sets status to 'ready'.
117
  Called progressively as each TTS clip finishes generating.
118
  """
119
  if not self.supabase:
 
121
 
122
  try:
123
  self.supabase.table("articles").update(
124
+ {
125
+ "audio_url": audio_url,
126
+ "audio_status": "ready"
127
+ }
128
  ).eq("id", article_id).execute()
129
  return True
130
  except Exception as e:
hf_app.py CHANGED
@@ -1,11 +1,12 @@
1
  """
2
- News-Whisper On-Demand Search API (v2 β€” Two-Phase Response)
3
- ===========================================================
4
- Phase 1 (Synchronous): Scrape β†’ Groq Summarize (top 5) β†’ Insert to Supabase β†’ Return articles
5
- Phase 2 (Background): Kokoro TTS per article β†’ Upload to Cloudinary β†’ Update audio_url in Supabase
6
-
7
- The frontend subscribes to Supabase Realtime and progressively unlocks
8
- Play buttons as each audio_url changes from null to a Cloudinary URL.
 
9
  """
10
 
11
  import sys
@@ -13,12 +14,15 @@ import os
13
  import json
14
  import subprocess
15
  import time
 
16
  from pathlib import Path
17
  from typing import List, Dict, Optional
18
  from datetime import datetime, timezone
 
19
 
20
  from fastapi import FastAPI, BackgroundTasks, HTTPException
21
  from fastapi.responses import RedirectResponse
 
22
  from pydantic import BaseModel, Field
23
 
24
  # Ensure project root is in path
@@ -30,19 +34,62 @@ from backend.services.database import DatabaseManager
30
  from backend.services.cloud import upload_file
31
  from backend.common.paths import get_project_root, sanitize_query_folder, find_latest_json
32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  app = FastAPI(
34
- title="News-Whisper On-Demand API",
35
- description=(
36
- "Two-phase search API for News Whisper.\n\n"
37
- "**Phase 1:** Returns article summaries in ~5 seconds.\n\n"
38
- "**Phase 2:** Generates audio in the background (~65s). "
39
- "Subscribe to Supabase Realtime to get progressive audio updates."
40
- ),
41
- version="2.0.0",
42
  )
43
 
44
- db = DatabaseManager()
45
-
46
 
47
  # ─────────────────────────────────────────────
48
  # Request / Response Models
@@ -61,6 +108,7 @@ class ArticleResponse(BaseModel):
61
  url: str
62
  author: str
63
  audio_url: Optional[str] = None
 
64
 
65
  class SearchResponse(BaseModel):
66
  status: str
@@ -68,6 +116,11 @@ class SearchResponse(BaseModel):
68
  articles: List[ArticleResponse] = []
69
  audio_pending: bool = False
70
 
 
 
 
 
 
71
 
72
  # ─────────────────────────────────────────────
73
  # Endpoints
@@ -81,17 +134,15 @@ def root_redirect():
81
 
82
  @app.get("/health")
83
  def health_check():
84
- """Keep-alive endpoint. Pinged by GitHub Actions to prevent the HF Space from sleeping."""
85
- return {"status": "alive"}
86
 
87
 
88
  @app.post("/search", response_model=SearchResponse)
89
- def search(req: SearchRequest, background_tasks: BackgroundTasks):
90
  """
91
- Triggers the on-demand search pipeline.
92
-
93
- **Phase 1 (sync, ~5s):** Scrapes articles, summarizes top 5 via Groq, inserts into Supabase.
94
- **Phase 2 (async, ~65s):** Generates Kokoro TTS audio for each article and progressively updates Supabase.
95
  """
96
  if req.language not in ["english", "hindi"]:
97
  raise HTTPException(status_code=400, detail="Language must be 'english' or 'hindi'")
@@ -105,7 +156,6 @@ def search(req: SearchRequest, background_tasks: BackgroundTasks):
105
  print(f"SEARCH REQUEST: '{query}' ({language})")
106
  print(f"{'='*80}\n")
107
 
108
- # ── Phase 1: Synchronous β€” Scrape + Summarize + Insert ────────────────────
109
  try:
110
  articles = _phase1_scrape_and_summarize(query, language, req.pages, req.no_dedup)
111
  except Exception as e:
@@ -117,15 +167,19 @@ def search(req: SearchRequest, background_tasks: BackgroundTasks):
117
  status="empty",
118
  message=f"No articles found for '{query}'.",
119
  articles=[],
120
- audio_pending=False,
121
  )
122
 
123
- # ── Phase 2: Async β€” TTS in background ────────────────────────────────────
124
- background_tasks.add_task(_phase2_generate_audio, articles, language, query)
 
 
 
 
 
125
 
126
  return SearchResponse(
127
  status="ready",
128
- message=f"Found {len(articles)} articles for '{query}'. Audio is generating in the background.",
129
  articles=[
130
  ArticleResponse(
131
  id=a.get("id", ""),
@@ -134,6 +188,7 @@ def search(req: SearchRequest, background_tasks: BackgroundTasks):
134
  url=a.get("url", ""),
135
  author=a.get("author", ""),
136
  audio_url=None,
 
137
  )
138
  for a in articles
139
  ],
@@ -141,25 +196,36 @@ def search(req: SearchRequest, background_tasks: BackgroundTasks):
141
  )
142
 
143
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
  # ─────────────────────────────────────────────
145
- # Phase 1: Scrape + Groq Summarize + Insert
146
  # ─────────────────────────────────────────────
147
 
148
- def _phase1_scrape_and_summarize(
149
- query: str, language: str, pages: int, no_dedup: bool
150
- ) -> List[Dict]:
151
- """
152
- Runs synchronously:
153
- 1. Scrape articles via subprocess (reuses existing scraper)
154
- 2. Summarize top 5 via Groq API
155
- 3. Insert articles into Supabase (audio_url = null)
156
- """
157
  t0 = time.monotonic()
158
  project_root = get_project_root()
159
  scraper_script = project_root / "backend" / "web_scraping" / "news_scrape.py"
160
  safe_query = sanitize_query_folder(query)
161
 
162
- # ── Step 1: Scrape ────────────────────────────────────────────────────────
163
  print(f"[Phase 1] Step 1/3: Scraping articles...")
164
  result = subprocess.run(
165
  [sys.executable, str(scraper_script), f"--{language}", "--search", query, "--pages", str(max(1, pages))],
@@ -171,7 +237,6 @@ def _phase1_scrape_and_summarize(
171
  print(f"[Phase 1] Scraper stderr: {result.stderr[-500:]}")
172
  raise RuntimeError("Web scraping failed")
173
 
174
- # Find the scraped JSON
175
  scraped_dir = project_root / "articles" / language / "search_queries" / safe_query
176
  latest_json = find_latest_json(scraped_dir)
177
  if not latest_json:
@@ -180,74 +245,45 @@ def _phase1_scrape_and_summarize(
180
  with open(latest_json, "r", encoding="utf-8") as f:
181
  articles = json.load(f)
182
 
183
- print(f"[Phase 1] Scraped {len(articles)} articles in {time.monotonic() - t0:.1f}s")
184
-
185
- # ── Step 2: Groq Summarize (top 5) ────────────────────────────────────────
186
  print(f"[Phase 1] Step 2/3: Summarizing top 5 via Groq...")
187
- t1 = time.monotonic()
188
  summarized = summarize_with_groq(articles, language, max_articles=5)
189
- print(f"[Phase 1] Summarized {len(summarized)} articles via Groq in {time.monotonic() - t1:.1f}s")
190
-
191
  if not summarized:
192
  raise RuntimeError("Groq summarization returned empty results")
193
 
194
- # ── Step 3: Insert into Supabase (audio_url = null) ───────────────────────
195
  print(f"[Phase 1] Step 3/3: Inserting into Supabase...")
196
  for article in summarized:
197
- article["audio_url"] = "" # Will be updated by Phase 2
 
198
  db.insert_article(article)
199
 
200
- total = time.monotonic() - t0
201
- print(f"[Phase 1] βœ… Complete in {total:.1f}s β€” {len(summarized)} articles ready")
202
  return summarized
203
 
204
 
205
- # ─────────────────────────────────────────────
206
- # Phase 2: TTS Generation (Background)
207
- # ─────────────────────────────────────────────
 
208
 
209
- def _phase2_generate_audio(articles: List[Dict], language: str, query: str):
210
- """
211
- Runs in the background after the HTTP response is sent.
212
- Generates Kokoro TTS for each article and progressively updates Supabase.
213
- """
214
- print(f"\n[Phase 2] Starting TTS generation for {len(articles)} articles...")
215
- t0 = time.monotonic()
216
- safe_query = sanitize_query_folder(query)
217
 
218
  try:
219
- # Import TTS module
220
- from backend.text_to_speech.tts import generate_audio
221
- from backend.services.delivery import DeliveryService
222
 
223
- delivery = DeliveryService()
224
- output_dir = delivery.get_audio_output_dir(language, query, is_search=True)
 
225
 
226
- # Generate audio for all articles
227
- articles_with_audio = generate_audio(articles, language, output_dir)
228
-
229
- # Upload each audio to Cloudinary and update Supabase progressively
230
  timestamp = delivery._get_timestamp_folder()
231
- parent_folder = "search_queries"
232
  safe_target = query.replace(" ", "_").lower()
 
233
 
234
- for article in articles_with_audio:
235
- article_id = article.get("id")
236
- local_audio = article.get("local_audio_path")
237
-
238
- if local_audio and os.path.exists(local_audio):
239
- cloud_folder = f"audios/{language}/{parent_folder}/{safe_target}/{timestamp}"
240
- audio_url = upload_file(local_audio, cloud_folder, resource_type="auto")
241
-
242
- if audio_url:
243
- # Progressive update: frontend sees this via Supabase Realtime
244
- db.update_audio_url(article_id, audio_url)
245
- print(f"[Phase 2] βœ… Audio ready for {article_id}: {audio_url[:80]}...")
246
-
247
- total = time.monotonic() - t0
248
- print(f"[Phase 2] βœ… TTS complete in {total:.1f}s β€” all audio uploaded")
249
-
250
  except Exception as e:
251
- print(f"[Phase 2] ❌ TTS generation failed: {e}")
252
- import traceback
253
- traceback.print_exc()
 
1
  """
2
+ News-Whisper On-Demand Search API (v3 β€” Lazy/Pull Generation)
3
+ =============================================================
4
+ Phase 1: Sync scrape + Groq Summarize (Top 5) -> Insert to DB as 'queued'.
5
+ Phase 2: Add Top 2 to PriorityQueue (background preload).
6
+ Phase 3: Frontend calls `POST /generate-audio` on scroll/click to jump queue.
7
+
8
+ Includes a single asyncio daemon worker to guarantee Kokoro TTS runs sequentially
9
+ without memory leaks or CPU race conditions on Hugging Face Spaces.
10
  """
11
 
12
  import sys
 
14
  import json
15
  import subprocess
16
  import time
17
+ import asyncio
18
  from pathlib import Path
19
  from typing import List, Dict, Optional
20
  from datetime import datetime, timezone
21
+ import contextlib
22
 
23
  from fastapi import FastAPI, BackgroundTasks, HTTPException
24
  from fastapi.responses import RedirectResponse
25
+ from fastapi.concurrency import run_in_threadpool
26
  from pydantic import BaseModel, Field
27
 
28
  # Ensure project root is in path
 
34
  from backend.services.cloud import upload_file
35
  from backend.common.paths import get_project_root, sanitize_query_folder, find_latest_json
36
 
37
# ─────────────────────────────────────────────
# Global TTS Queue & Worker
# ─────────────────────────────────────────────
db = DatabaseManager()               # shared persistence layer (Supabase-backed)
tts_queue = asyncio.PriorityQueue()  # holds (priority, enqueue_time, task_data) tuples
worker_task = None                   # handle to the daemon task; set in lifespan()
43
+
44
async def tts_worker():
    """Background worker that continuously processes TTS requests one at a time.

    Pulls `(priority, timestamp, task_data)` tuples from the shared
    `tts_queue` (lowest priority value first), marks the article as
    'generating', runs the heavy synchronous TTS pipeline in a thread
    pool, then writes the resulting URL (or a 'failed' status) to the DB.

    Bug fix: `task_done()` is now only reachable after a successful
    `get()`. Previously it sat in a `finally` that also covered
    `await tts_queue.get()`, so a cancellation while idle (the common
    shutdown case) called `task_done()` with no matching `get()`,
    raising ValueError instead of exiting cleanly.
    """
    print("[Queue Worker] Started listening for TTS tasks.")
    while True:
        try:
            priority, _enqueued_at, task_data = await tts_queue.get()
        except asyncio.CancelledError:
            # Shutdown requested while waiting for work — exit cleanly.
            break

        try:
            article = task_data["article"]
            language = task_data["language"]
            query = task_data["query"]
            article_id = article.get("id")

            print(f"\n[Queue Worker] Generating audio for {article_id} (Priority {priority})")
            db.update_audio_status(article_id, "generating")

            # Run heavy TTS in threadpool to prevent blocking the async event loop
            audio_url = await run_in_threadpool(_sync_generate_single_audio, article, language, query)

            if audio_url:
                db.update_audio_url(article_id, audio_url)  # Sets status to 'ready'
                print(f"[Queue Worker] ✅ Audio ready for {article_id}: {audio_url}")
            else:
                db.update_audio_status(article_id, "failed")
                print(f"[Queue Worker] ❌ Audio failed for {article_id}")
        except asyncio.CancelledError:
            # Cancelled mid-task: still account for the dequeued item, then stop.
            break
        except Exception as e:
            print(f"[Queue Worker] ❌ Task failed: {e}")
        finally:
            # Matches exactly one successful get() per iteration.
            tts_queue.task_done()
74
+
75
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the TTS worker's lifetime alongside the FastAPI app.

    Startup: spawn the single daemon worker that drains `tts_queue`.
    Shutdown: cancel the worker AND await it, so the cancellation is
    actually delivered and the task is not left pending when the event
    loop closes (the previous version only called `.cancel()` without
    awaiting, which can leak a pending task and emit warnings).
    """
    global worker_task
    worker_task = asyncio.create_task(tts_worker())
    yield
    if worker_task:
        worker_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await worker_task
84
+
85
+
86
  app = FastAPI(
87
+ title="News-Whisper Intelligent API",
88
+ description="Search API with Lazy Audio Generation and Priority Queueing.",
89
+ version="3.0.0",
90
+ lifespan=lifespan,
 
 
 
 
91
  )
92
 
 
 
93
 
94
  # ─────────────────────────────────────────────
95
  # Request / Response Models
 
108
  url: str
109
  author: str
110
  audio_url: Optional[str] = None
111
+ audio_status: str = "queued"
112
 
113
  class SearchResponse(BaseModel):
114
  status: str
 
116
  articles: List[ArticleResponse] = []
117
  audio_pending: bool = False
118
 
119
class GenerateAudioRequest(BaseModel):
    # Payload for POST /generate-audio (frontend lazy-pull).
    article: dict   # full article record as returned by /search; must carry an 'id'
    language: str   # lowercased by the endpoint before queueing
    query: str      # original search query; stripped by the endpoint
123
+
124
 
125
  # ─────────────────────────────────────────────
126
  # Endpoints
 
134
 
135
  @app.get("/health")
136
  def health_check():
137
+ """Keep-alive endpoint."""
138
+ return {"status": "alive", "queue_size": tts_queue.qsize()}
139
 
140
 
141
  @app.post("/search", response_model=SearchResponse)
142
+ def search(req: SearchRequest):
143
  """
144
+ Phase 1: Scrape & Groq Summarize (Top 5). Returns in ~5s.
145
+ Phase 2: Silently adds Top 2 to the background TTS Priority Queue (priority=10).
 
 
146
  """
147
  if req.language not in ["english", "hindi"]:
148
  raise HTTPException(status_code=400, detail="Language must be 'english' or 'hindi'")
 
156
  print(f"SEARCH REQUEST: '{query}' ({language})")
157
  print(f"{'='*80}\n")
158
 
 
159
  try:
160
  articles = _phase1_scrape_and_summarize(query, language, req.pages, req.no_dedup)
161
  except Exception as e:
 
167
  status="empty",
168
  message=f"No articles found for '{query}'.",
169
  articles=[],
 
170
  )
171
 
172
+ # ── Phase 2: Preload Top 2 silently ───────────────────────────────────────
173
+ for article in articles[:2]:
174
+ tts_queue.put_nowait((10, time.monotonic(), {
175
+ "article": dict(article),
176
+ "language": language,
177
+ "query": query
178
+ }))
179
 
180
  return SearchResponse(
181
  status="ready",
182
+ message=f"Found {len(articles)} articles for '{query}'. Preloading audio...",
183
  articles=[
184
  ArticleResponse(
185
  id=a.get("id", ""),
 
188
  url=a.get("url", ""),
189
  author=a.get("author", ""),
190
  audio_url=None,
191
+ audio_status="queued"
192
  )
193
  for a in articles
194
  ],
 
196
  )
197
 
198
 
199
+ @app.post("/generate-audio", status_code=202)
200
+ def generate_audio_on_demand(req: GenerateAudioRequest):
201
+ """
202
+ Phase 3: Frontend Pull.
203
+ Adds a specific article to the FRONT of the queue (priority=1).
204
+ Used when a user scrolls an article into view or explicitly clicks it.
205
+ """
206
+ article_id = req.article.get("id")
207
+ print(f"[API] Priority Generation requested for: {article_id}")
208
+
209
+ tts_queue.put_nowait((1, time.monotonic(), {
210
+ "article": req.article,
211
+ "language": req.language.lower(),
212
+ "query": req.query.strip()
213
+ }))
214
+
215
+ return {"status": "queued", "message": f"Article {article_id} added to priority queue"}
216
+
217
+
218
  # ─────────────────────────────────────────────
219
+ # Pipeline Logic
220
  # ─────────────────────────────────────────────
221
 
222
+ def _phase1_scrape_and_summarize(query: str, language: str, pages: int, no_dedup: bool) -> List[Dict]:
223
+ """Sync Scrape + Groq API + Database Insert."""
 
 
 
 
 
 
 
224
  t0 = time.monotonic()
225
  project_root = get_project_root()
226
  scraper_script = project_root / "backend" / "web_scraping" / "news_scrape.py"
227
  safe_query = sanitize_query_folder(query)
228
 
 
229
  print(f"[Phase 1] Step 1/3: Scraping articles...")
230
  result = subprocess.run(
231
  [sys.executable, str(scraper_script), f"--{language}", "--search", query, "--pages", str(max(1, pages))],
 
237
  print(f"[Phase 1] Scraper stderr: {result.stderr[-500:]}")
238
  raise RuntimeError("Web scraping failed")
239
 
 
240
  scraped_dir = project_root / "articles" / language / "search_queries" / safe_query
241
  latest_json = find_latest_json(scraped_dir)
242
  if not latest_json:
 
245
  with open(latest_json, "r", encoding="utf-8") as f:
246
  articles = json.load(f)
247
 
 
 
 
248
  print(f"[Phase 1] Step 2/3: Summarizing top 5 via Groq...")
 
249
  summarized = summarize_with_groq(articles, language, max_articles=5)
250
+
 
251
  if not summarized:
252
  raise RuntimeError("Groq summarization returned empty results")
253
 
 
254
  print(f"[Phase 1] Step 3/3: Inserting into Supabase...")
255
  for article in summarized:
256
+ article["audio_url"] = ""
257
+ article["audio_status"] = "queued"
258
  db.insert_article(article)
259
 
260
+ print(f"[Phase 1] βœ… Complete in {time.monotonic() - t0:.1f}s β€” {len(summarized)} articles ready")
 
261
  return summarized
262
 
263
 
264
def _sync_generate_single_audio(article: Dict, language: str, query: str) -> Optional[str]:
    """Run Kokoro TTS for one article and upload the clip to Cloudinary.

    Executed inside a threadpool by the queue worker. Returns the
    Cloudinary URL on success, or None when generation produced no
    local file or any step raised.
    """
    # Imported lazily so the heavy TTS stack loads only when needed.
    from backend.text_to_speech.tts import generate_audio
    from backend.services.delivery import DeliveryService

    delivery = DeliveryService()
    output_dir = delivery.get_audio_output_dir(language, query, is_search=True)

    try:
        results = generate_audio([article], language, output_dir)
        if not results:
            return None

        local_path = results[0].get("local_audio_path")
        if not local_path or not os.path.exists(local_path):
            return None

        timestamp = delivery._get_timestamp_folder()
        safe_target = query.replace(" ", "_").lower()
        cloud_folder = f"audios/{language}/search_queries/{safe_target}/{timestamp}"

        return upload_file(local_path, cloud_folder, resource_type="auto")
    except Exception as e:
        print(f"[_sync_generate_single_audio] Error: {e}")
        return None