Devang1290 commited on
Commit
e15f0bd
·
1 Parent(s): d50eed9

feat: two-phase response - Groq summarization + async TTS with progressive Supabase updates

Browse files
backend/services/database.py CHANGED
@@ -93,3 +93,20 @@ class DatabaseManager:
93
  except Exception as e:
94
  logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
95
  return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
93
  except Exception as e:
94
  logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
95
  return False
96
+
97
+ def update_audio_url(self, article_id: str, audio_url: str) -> bool:
98
+ """
99
+ Updates the audio_url for a specific article in the articles table.
100
+ Called progressively as each TTS clip finishes generating.
101
+ """
102
+ if not self.supabase:
103
+ return False
104
+
105
+ try:
106
+ self.supabase.table("articles").update(
107
+ {"audio_url": audio_url}
108
+ ).eq("id", article_id).execute()
109
+ return True
110
+ except Exception as e:
111
+ logger.error(f"Error updating audio_url for {article_id}: {str(e)}")
112
+ return False
backend/summarize_articles/groq_summarizer.py ADDED
@@ -0,0 +1,155 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Groq-based Summarizer for On-Demand Search
3
+ ============================================
4
+ Uses Groq API (llama-3.3-70b) for fast summarization (~3s for 5 articles)
5
+ instead of local t5-small CPU inference (~100s for 15 articles).
6
+
7
+ This module is used ONLY for the on-demand search API.
8
+ The batch feed pipeline continues using t5-small for CI determinism.
9
+ """
10
+
11
+ import os
12
+ import sys
13
+ import time
14
+ from pathlib import Path
15
+ from typing import List, Dict, Optional
16
+ from datetime import datetime, timezone
17
+
18
+ sys.path.append(str(Path(__file__).resolve().parent.parent))
19
+ from core.logger import logger
20
+ from core.config import config
21
+
22
+ try:
23
+ from groq import Groq
24
+ except ImportError:
25
+ Groq = None
26
+
27
+
28
+ _groq_client = None
29
+
30
+
31
def _get_groq_client() -> Optional[object]:
    """Return the module-level Groq client, creating it on first use.

    Returns None (after logging an error) when the API key is absent or
    the groq package is not installed; a later call retries creation.
    """
    global _groq_client
    if _groq_client is not None:
        return _groq_client

    api_key = config.GROQ_API_KEY or os.getenv("GROQ_API_KEY")
    if not api_key:
        logger.error("GROQ_API_KEY is not set. Cannot use Groq summarization.")
        return None
    if Groq is None:
        logger.error("groq package not installed.")
        return None

    _groq_client = Groq(api_key=api_key)
    return _groq_client
44
+
45
+
46
def _summarize_one_english(client, title: str, content: str) -> str:
    """Produce a 2-3 sentence English broadcast-style summary of one article via Groq."""
    system_prompt = (
        "You are a professional news anchor. Summarize the given news article "
        "into a smooth, natural 2-3 sentence broadcast script. "
        "Keep it concise and informative. Write in a conversational tone suitable "
        "for text-to-speech. Output ONLY the summary text, nothing else."
    )
    # Cap article body at 3000 chars to stay within prompt budget.
    user_prompt = f"Title: {title}\n\nArticle:\n{content[:3000]}"

    completion = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.3,
        max_tokens=300,
    )
    return completion.choices[0].message.content.strip()
69
+
70
+
71
def _summarize_one_hindi(client, title: str, content: str) -> str:
    """Produce a 2-3 sentence Hindi broadcast-style summary of one article via Groq."""
    system_prompt = (
        "You are a Hindi news anchor. Summarize the given Hindi news article "
        "into a natural, smooth 2-3 sentence broadcast script in Hindi. "
        "Use simple words. Write all numbers in Hindi words (e.g. ΰ€¦ΰ€Έ, ΰ€Έΰ€Ύΰ€€). "
        "Output ONLY the Hindi summary text, nothing else, no quotes."
    )
    # Cap article body at 3000 chars to stay within prompt budget.
    user_prompt = f"ΰ€Άΰ₯€ΰ€°ΰ₯ΰ€·ΰ€•: {title}\n\nΰ€²ΰ₯‡ΰ€–:\n{content[:3000]}"

    completion = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.3,
        # Hindi output tends to use more tokens than English (300 there).
        max_tokens=500,
    )
    return completion.choices[0].message.content.strip()
94
+
95
+
96
def summarize_with_groq(
    articles: List[Dict],
    language: str,
    max_articles: int = 5,
) -> List[Dict]:
    """
    Summarize articles using Groq API.

    Args:
        articles: List of scraped article dicts with 'title' and 'content' keys.
        language: 'english' or 'hindi'
        max_articles: Maximum number of articles to process (default: 5)

    Returns:
        List of article dicts with 'summary' field added and 'summarized'
        set to True/False. Articles whose Groq call fails are kept with a
        truncated-content fallback summary rather than dropped. Returns an
        empty list if the Groq client is unavailable. Input dicts are
        mutated in place.
    """
    client = _get_groq_client()
    if client is None:
        logger.error("Groq client unavailable. Returning empty results.")
        return []

    # Take only top N articles
    articles = articles[:max_articles]
    logger.info(f"Summarizing {len(articles)} articles via Groq ({language})...")

    summarize_fn = _summarize_one_hindi if language == "hindi" else _summarize_one_english
    processed = []

    for idx, article in enumerate(articles, 1):
        title = article.get("title", "Untitled")
        content = article.get("content", "")

        if not content:
            logger.warning(f"[{idx}/{len(articles)}] Skipped (no content): {title[:50]}")
            continue

        try:
            # Re-encode the title defensively so consoles with a narrow
            # encoding cannot break the log call.
            safe_title = title[:50].encode("utf-8", errors="replace").decode("utf-8")
            logger.info(f"[{idx}/{len(articles)}] Summarizing: {safe_title}...")

            summary = summarize_fn(client, title, content)
            article["summary"] = summary
            article["summarized"] = True
            article["summary_generated_at"] = datetime.now(timezone.utc).isoformat()
            processed.append(article)

            # Small delay to respect Groq rate limits (30 RPM free tier)
            if idx < len(articles):
                time.sleep(2)

        except Exception as e:
            logger.error(f"Groq summarization failed for article {article.get('id')}: {e}")
            # Include the article without a real summary rather than dropping it.
            # Fix: only append an ellipsis when the content was actually truncated,
            # so short articles are not suffixed with a misleading "...".
            article["summary"] = content[:500] + ("..." if len(content) > 500 else "")
            article["summarized"] = False
            processed.append(article)

    logger.success(f"Groq summarization complete: {sum(1 for a in processed if a.get('summarized'))} / {len(processed)} articles.")
    return processed
hf_app.py CHANGED
@@ -1,64 +1,253 @@
 
 
 
 
 
 
 
 
 
 
1
  import sys
2
  import os
 
 
 
 
 
 
 
3
  from fastapi import FastAPI, BackgroundTasks, HTTPException
4
- from pydantic import BaseModel
5
- from typing import Optional
6
 
7
- # Ensure project root is in path so we can import main
8
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 
9
 
10
- from main import process_search
 
 
 
11
 
12
  app = FastAPI(
13
  title="News-Whisper On-Demand API",
14
- description="Asynchronous API for running the News Whisper pipeline on-demand.",
15
- version="1.0.0"
 
 
 
 
 
16
  )
17
 
 
 
 
 
 
 
 
18
  class SearchRequest(BaseModel):
19
- query: str
20
- language: str # "english" or "hindi"
21
- pages: Optional[int] = 1
22
- no_dedup: Optional[bool] = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
  @app.get("/health")
25
  def health_check():
26
- """
27
- Keep-alive endpoint.
28
- A GET request here prevents the Hugging Face space from sleeping after 48h.
29
- """
30
  return {"status": "alive"}
31
 
32
- def run_pipeline(language: str, query: str, no_dedup: bool, pages: int):
33
- """
34
- Wrapper function to execute the heavy ML pipeline in the background.
35
- """
36
- try:
37
- print(f"Starting background search task: '{query}' ({language})")
38
- success = process_search(language, query, no_dedup, pages)
39
- if not success:
40
- print(f"❌ Background pipeline failed for query: {query}")
41
- else:
42
- print(f"βœ… Background pipeline succeeded for query: {query}")
43
- except Exception as e:
44
- print(f"❌ Exception during pipeline execution: {e}")
45
 
46
- @app.post("/search")
47
- def trigger_search(req: SearchRequest, background_tasks: BackgroundTasks):
48
  """
49
- Triggers the on-demand search pipeline asynchronously.
 
 
 
50
  """
51
  if req.language not in ["english", "hindi"]:
52
  raise HTTPException(status_code=400, detail="Language must be 'english' or 'hindi'")
53
-
54
  if not req.query.strip():
55
  raise HTTPException(status_code=400, detail="Search query cannot be empty")
56
 
57
- # Add the heavy pipeline to the BackgroundTasks queue
58
- # This allows FastAPI to return a 202 Accepted instantly while the TTS runs.
59
- background_tasks.add_task(run_pipeline, req.language, req.query, req.no_dedup, req.pages)
60
 
61
- return {
62
- "status": "processing",
63
- "message": f"Search for '{req.query}' in {req.language} has been queued. Listen to Supabase Realtime for the final output."
64
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ News-Whisper On-Demand Search API (v2 β€” Two-Phase Response)
3
+ ===========================================================
4
+ Phase 1 (Synchronous): Scrape β†’ Groq Summarize (top 5) β†’ Insert to Supabase β†’ Return articles
5
+ Phase 2 (Background): Kokoro TTS per article β†’ Upload to Cloudinary β†’ Update audio_url in Supabase
6
+
7
+ The frontend subscribes to Supabase Realtime and progressively unlocks
8
+ Play buttons as each audio_url changes from null to a Cloudinary URL.
9
+ """
10
+
11
  import sys
12
  import os
13
+ import json
14
+ import subprocess
15
+ import time
16
+ from pathlib import Path
17
+ from typing import List, Dict, Optional
18
+ from datetime import datetime, timezone
19
+
20
  from fastapi import FastAPI, BackgroundTasks, HTTPException
21
+ from fastapi.responses import RedirectResponse
22
+ from pydantic import BaseModel, Field
23
 
24
+ # Ensure project root is in path
25
+ PROJECT_ROOT = Path(__file__).parent.resolve()
26
+ sys.path.append(str(PROJECT_ROOT))
27
 
28
+ from backend.summarize_articles.groq_summarizer import summarize_with_groq
29
+ from backend.services.database import DatabaseManager
30
+ from backend.services.cloud import upload_file
31
+ from backend.common.paths import get_project_root, sanitize_query_folder, find_latest_json
32
 
33
  app = FastAPI(
34
  title="News-Whisper On-Demand API",
35
+ description=(
36
+ "Two-phase search API for News Whisper.\n\n"
37
+ "**Phase 1:** Returns article summaries in ~5 seconds.\n\n"
38
+ "**Phase 2:** Generates audio in the background (~65s). "
39
+ "Subscribe to Supabase Realtime to get progressive audio updates."
40
+ ),
41
+ version="2.0.0",
42
  )
43
 
44
+ db = DatabaseManager()
45
+
46
+
47
+ # ─────────────────────────────────────────────
48
+ # Request / Response Models
49
+ # ─────────────────────────────────────────────
50
+
51
class SearchRequest(BaseModel):
    """Request body for POST /search."""
    query: str = Field(..., description="Search term", json_schema_extra={"examples": ["cricket"]})
    language: str = Field(..., description="Language: 'english' or 'hindi'", json_schema_extra={"examples": ["english"]})
    pages: Optional[int] = Field(1, description="Number of search result pages to scrape")
    no_dedup: Optional[bool] = Field(False, description="Skip duplicate article checking")
56
+
57
class ArticleResponse(BaseModel):
    """One summarized article as returned by Phase 1 of /search."""
    id: str
    title: str
    summary: str
    url: str
    author: str
    # Filled in later by the background TTS task; always None in the
    # initial /search response.
    audio_url: Optional[str] = None
64
+
65
class SearchResponse(BaseModel):
    """Envelope returned by POST /search: status, message, and Phase 1 articles."""
    status: str
    message: str
    articles: List[ArticleResponse] = []
    # True while Phase 2 TTS generation is still running in the background.
    audio_pending: bool = False
70
+
71
+
72
+ # ─────────────────────────────────────────────
73
+ # Endpoints
74
+ # ─────────────────────────────────────────────
75
+
76
@app.get("/", include_in_schema=False)
def root_redirect():
    """Send visitors hitting the bare root path to the Swagger UI."""
    return RedirectResponse("/docs")
80
+
81
 
82
  @app.get("/health")
83
  def health_check():
84
+ """Keep-alive endpoint. Pinged by GitHub Actions to prevent the HF Space from sleeping."""
 
 
 
85
  return {"status": "alive"}
86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
 
88
@app.post("/search", response_model=SearchResponse)
def search(req: SearchRequest, background_tasks: BackgroundTasks):
    """
    Triggers the on-demand search pipeline.

    **Phase 1 (sync, ~5s):** Scrapes articles, summarizes top 5 via Groq, inserts into Supabase.
    **Phase 2 (async, ~65s):** Generates Kokoro TTS audio for each article and progressively updates Supabase.

    Raises 400 for an unknown language or empty query; 500 if Phase 1 fails.
    """
    # Fix: normalize BEFORE validating. Previously "English" / "HINDI" were
    # rejected with a 400 even though the pipeline lowercases the language
    # anyway; this accepts any casing while rejecting the same bad inputs.
    query = req.query.strip()
    language = req.language.strip().lower()

    if language not in ["english", "hindi"]:
        raise HTTPException(status_code=400, detail="Language must be 'english' or 'hindi'")
    if not query:
        raise HTTPException(status_code=400, detail="Search query cannot be empty")

    print(f"\n{'='*80}")
    print(f"SEARCH REQUEST: '{query}' ({language})")
    print(f"{'='*80}\n")

    # Phase 1: synchronous scrape + Groq summarize + Supabase insert.
    try:
        articles = _phase1_scrape_and_summarize(query, language, req.pages, req.no_dedup)
    except Exception as e:
        print(f"❌ Phase 1 failed: {e}")
        raise HTTPException(status_code=500, detail=f"Pipeline failed: {str(e)}")

    if not articles:
        return SearchResponse(
            status="empty",
            message=f"No articles found for '{query}'.",
            articles=[],
            audio_pending=False,
        )

    # Phase 2: TTS runs after this response is sent; the frontend watches
    # Supabase Realtime for each audio_url update.
    background_tasks.add_task(_phase2_generate_audio, articles, language, query)

    return SearchResponse(
        status="ready",
        message=f"Found {len(articles)} articles for '{query}'. Audio is generating in the background.",
        articles=[
            ArticleResponse(
                id=a.get("id", ""),
                title=a.get("title", ""),
                summary=a.get("summary", ""),
                url=a.get("url", ""),
                author=a.get("author", ""),
                audio_url=None,
            )
            for a in articles
        ],
        audio_pending=True,
    )
142
+
143
+
144
+ # ─────────────────────────────────────────────
145
+ # Phase 1: Scrape + Groq Summarize + Insert
146
+ # ─────────────────────────────────────────────
147
+
148
def _phase1_scrape_and_summarize(
    query: str, language: str, pages: int, no_dedup: bool
) -> List[Dict]:
    """
    Runs synchronously:
      1. Scrape articles via subprocess (reuses existing scraper)
      2. Summarize top 5 via Groq API
      3. Insert articles into Supabase (audio_url = null)

    Raises RuntimeError when scraping, output discovery, or summarization
    fails; subprocess.TimeoutExpired if the scraper exceeds 60s.

    NOTE(review): no_dedup is accepted but never forwarded to the scraper —
    confirm whether the scraper CLI supports a dedup flag.
    """
    t0 = time.monotonic()
    project_root = get_project_root()
    scraper_script = project_root / "backend" / "web_scraping" / "news_scrape.py"
    safe_query = sanitize_query_folder(query)

    # ── Step 1: Scrape ────────────────────────────────────────────────────────
    print("[Phase 1] Step 1/3: Scraping articles...")
    # Fix: `pages` comes from an Optional[int] field, so a client sending
    # "pages": null previously crashed max(1, None) with a TypeError.
    page_count = max(1, pages or 1)
    result = subprocess.run(
        [sys.executable, str(scraper_script), f"--{language}", "--search", query, "--pages", str(page_count)],
        capture_output=True,
        text=True,
        timeout=60,
    )
    if result.returncode != 0:
        # Tail of stderr only, to keep logs readable.
        print(f"[Phase 1] Scraper stderr: {result.stderr[-500:]}")
        raise RuntimeError("Web scraping failed")

    # Find the scraped JSON written by the scraper subprocess.
    scraped_dir = project_root / "articles" / language / "search_queries" / safe_query
    latest_json = find_latest_json(scraped_dir)
    if not latest_json:
        raise RuntimeError(f"No scraped articles found in {scraped_dir}")

    with open(latest_json, "r", encoding="utf-8") as f:
        articles = json.load(f)

    print(f"[Phase 1] Scraped {len(articles)} articles in {time.monotonic() - t0:.1f}s")

    # ── Step 2: Groq Summarize (top 5) ────────────────────────────────────────
    print("[Phase 1] Step 2/3: Summarizing top 5 via Groq...")
    t1 = time.monotonic()
    summarized = summarize_with_groq(articles, language, max_articles=5)
    print(f"[Phase 1] Summarized {len(summarized)} articles via Groq in {time.monotonic() - t1:.1f}s")

    if not summarized:
        raise RuntimeError("Groq summarization returned empty results")

    # ── Step 3: Insert into Supabase (audio_url = null) ───────────────────────
    print("[Phase 1] Step 3/3: Inserting into Supabase...")
    for article in summarized:
        article["audio_url"] = ""  # Will be updated by Phase 2
        db.insert_article(article)

    total = time.monotonic() - t0
    print(f"[Phase 1] βœ… Complete in {total:.1f}s β€” {len(summarized)} articles ready")
    return summarized
203
+
204
+
205
+ # ─────────────────────────────────────────────
206
+ # Phase 2: TTS Generation (Background)
207
+ # ─────────────────────────────────────────────
208
+
209
def _phase2_generate_audio(articles: List[Dict], language: str, query: str):
    """
    Runs in the background after the HTTP response is sent.
    Generates Kokoro TTS for each article, uploads each clip to Cloudinary,
    and progressively writes its URL into Supabase via db.update_audio_url
    so the frontend (listening on Supabase Realtime) can unlock Play
    buttons one article at a time.

    Never raises: failures are logged with a traceback and swallowed,
    since there is no caller left to handle them.
    """
    print(f"\n[Phase 2] Starting TTS generation for {len(articles)} articles...")
    t0 = time.monotonic()

    try:
        # Imported lazily so the heavy TTS stack is not loaded at API startup.
        from backend.text_to_speech.tts import generate_audio
        from backend.services.delivery import DeliveryService

        delivery = DeliveryService()
        output_dir = delivery.get_audio_output_dir(language, query, is_search=True)

        # Generate audio for all articles
        articles_with_audio = generate_audio(articles, language, output_dir)

        # Upload each audio to Cloudinary and update Supabase progressively.
        # NOTE(review): _get_timestamp_folder is a private DeliveryService
        # helper — consider exposing a public accessor.
        timestamp = delivery._get_timestamp_folder()
        parent_folder = "search_queries"
        safe_target = query.replace(" ", "_").lower()

        for article in articles_with_audio:
            article_id = article.get("id")
            local_audio = article.get("local_audio_path")

            if not (local_audio and os.path.exists(local_audio)):
                # Fix: previously a missing clip was skipped silently,
                # leaving audio_url empty with no trace in the logs.
                print(f"[Phase 2] WARNING: no local audio for {article_id}; skipping upload")
                continue

            cloud_folder = f"audios/{language}/{parent_folder}/{safe_target}/{timestamp}"
            audio_url = upload_file(local_audio, cloud_folder, resource_type="auto")

            if audio_url:
                # Progressive update: frontend sees this via Supabase Realtime
                db.update_audio_url(article_id, audio_url)
                print(f"[Phase 2] βœ… Audio ready for {article_id}: {audio_url[:80]}...")

        total = time.monotonic() - t0
        print(f"[Phase 2] βœ… TTS complete in {total:.1f}s β€” all audio uploaded")

    except Exception as e:
        print(f"[Phase 2] ❌ TTS generation failed: {e}")
        import traceback
        traceback.print_exc()