Spaces:
Sleeping
Sleeping
| """ | |
| News-Whisper On-Demand Search API (v3 β Lazy/Pull Generation) | |
| ============================================================= | |
| Phase 1: Sync scrape + Groq Summarize (Top 5) -> Insert to DB as 'queued'. | |
| Phase 2: Add Top 2 to PriorityQueue (background preload). | |
| Phase 3: Frontend calls `POST /generate-audio` on scroll/click to jump queue. | |
| Includes a single asyncio daemon worker to guarantee Kokoro TTS runs sequentially | |
| without memory leaks or CPU race conditions on Hugging Face Spaces. | |
| """ | |
| import sys | |
| import os | |
| import json | |
| import subprocess | |
| import time | |
| import asyncio | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| from datetime import datetime, timezone | |
| import contextlib | |
| from fastapi import FastAPI, BackgroundTasks, HTTPException | |
| from fastapi.responses import RedirectResponse | |
| from fastapi.concurrency import run_in_threadpool | |
| from pydantic import BaseModel, Field | |
| # Ensure project root is in path | |
| PROJECT_ROOT = Path(__file__).parent.resolve() | |
| sys.path.append(str(PROJECT_ROOT)) | |
| from backend.summarize_articles.groq_summarizer import summarize_with_groq | |
| from backend.services.database import DatabaseManager | |
| from backend.services.cloud import upload_file | |
| from backend.common.paths import get_project_root, sanitize_query_folder, find_latest_json | |
# ---------------------------------------------
# Global TTS Queue & Worker
# ---------------------------------------------
# Shared Supabase-backed persistence layer (articles + query cache).
db = DatabaseManager()
# Tasks are (priority, monotonic-timestamp, payload) tuples: lower priority
# number runs first; the timestamp breaks ties FIFO among equal priorities.
tts_queue = asyncio.PriorityQueue()
# Handle to the single background worker task; set in lifespan() on startup.
worker_task: Optional[asyncio.Task] = None
async def tts_worker():
    """Background worker that processes queued TTS requests one at a time.

    Consumes (priority, timestamp, payload) tuples from the global
    ``tts_queue``: priority 1 (user pull) jumps ahead of priority 10
    (background preload). The heavy Kokoro TTS call is pushed into a
    threadpool so the event loop stays responsive. Runs until cancelled
    by the app's lifespan shutdown.

    Fix: ``task_done()`` is now only called after an item was actually
    fetched. Previously the ``finally`` block also fired when ``get()``
    itself was cancelled (nothing dequeued), raising a spurious
    ``ValueError: task_done() called too many times`` during shutdown.
    """
    print("[Queue Worker] Started listening for TTS tasks.")
    while True:
        # Fetch outside the processing try-block so cancellation while idle
        # exits cleanly without a mismatched task_done() call.
        try:
            priority, _timestamp, task_data = await tts_queue.get()
        except asyncio.CancelledError:
            break
        try:
            article = task_data["article"]
            language = task_data["language"]
            query = task_data["query"]
            article_id = article.get("id")
            print(f"\n[Queue Worker] Generating audio for {article_id} (Priority {priority})")
            db.update_audio_status(article_id, "generating")
            # Run heavy TTS in threadpool to prevent blocking the async event loop
            audio_url = await run_in_threadpool(_sync_generate_single_audio, article, language, query)
            if audio_url:
                db.update_audio_url(article_id, audio_url)  # Sets status to 'ready'
                print(f"[Queue Worker] β Audio ready for {article_id}: {audio_url}")
            else:
                db.update_audio_status(article_id, "failed")
                print(f"[Queue Worker] β Audio failed for {article_id}")
        except asyncio.CancelledError:
            break  # finally below still balances the get() we completed
        except Exception as e:
            # Best-effort worker: log and keep consuming; never let one bad
            # task kill the loop.
            print(f"[Queue Worker] β Task failed: {e}")
        finally:
            tts_queue.task_done()
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    """App lifespan: start the TTS worker on startup, tear it down on shutdown.

    FastAPI's ``lifespan=`` parameter expects an async context manager
    factory, so the generator must be wrapped with
    ``@contextlib.asynccontextmanager`` (bare async generators are only
    accepted via a deprecated Starlette shim). On shutdown the worker is
    cancelled *and awaited* so its CancelledError is consumed instead of
    surfacing as a "task was destroyed but it is pending" warning.
    """
    # Startup
    global worker_task
    worker_task = asyncio.create_task(tts_worker())
    try:
        yield
    finally:
        # Shutdown
        if worker_task:
            worker_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await worker_task
# FastAPI application instance. The lifespan handler starts/stops the
# background TTS queue worker; interactive docs are served at /docs.
app = FastAPI(
    title="News-Whisper Intelligent API",
    description="Search API with Lazy Audio Generation and Priority Queueing.",
    version="3.0.0",
    lifespan=lifespan,
)
# ---------------------------------------------
# Request / Response Models
# ---------------------------------------------
class SearchRequest(BaseModel):
    """Body of a search request: what to scrape and in which language."""

    query: str = Field(..., description="Search term", json_schema_extra={"examples": ["cricket"]})
    language: str = Field(..., description="Language: 'english' or 'hindi'", json_schema_extra={"examples": ["english"]})
    # Number of search result pages the scraper subprocess should fetch.
    pages: Optional[int] = Field(1, description="Number of search result pages to scrape")
    # When True, the Supabase query-cache lookup is skipped entirely.
    no_dedup: Optional[bool] = Field(False, description="Skip duplicate article checking")
class ArticleResponse(BaseModel):
    """Single summarized article as returned to the frontend."""

    id: str
    title: str
    summary: str
    url: str
    author: str
    # Cloud (Cloudinary) URL of the generated audio; None until TTS finishes.
    audio_url: Optional[str] = None
    # Lifecycle: 'queued' -> 'generating' -> 'ready' | 'failed'.
    audio_status: str = "queued"
class SearchResponse(BaseModel):
    """Envelope for search results; status is 'ready', 'cache_hit' or 'empty'."""

    status: str
    message: str
    articles: List[ArticleResponse] = []
    # True while at least one article's audio has not reached 'ready'.
    audio_pending: bool = False
class GenerateAudioRequest(BaseModel):
    """Body for the on-demand (priority) audio generation endpoint."""

    # Full article dict as previously returned by search; expected to carry 'id'.
    article: dict
    language: str
    query: str
# ---------------------------------------------
# Endpoints
# ---------------------------------------------
@app.get("/")
def root_redirect():
    """Redirect root to Swagger docs.

    Fix: the function was never registered as a route (no decorator), so
    the app exposed no endpoint at "/".
    """
    return RedirectResponse(url="/docs")
@app.get("/health")
def health_check():
    """Keep-alive endpoint reporting liveness and current TTS queue depth.

    Fix: the function was never registered as a route (no decorator).
    NOTE(review): the "/health" path is assumed — confirm against whatever
    URL the frontend/uptime pinger actually calls.
    """
    return {"status": "alive", "queue_size": tts_queue.qsize()}
@app.post("/search", response_model=SearchResponse)
def search(req: SearchRequest):
    """
    Phase 1: Scrape & Groq Summarize (Top 5). Returns in ~5s.
    Phase 2: Silently adds Top 2 to the background TTS Priority Queue (priority=10).
    Now with Supabase Query Caching (Instant return for repeat searches).

    Fixes: registered as a POST route (no decorator existed), and language
    validation is now case-insensitive (previously 'English' was rejected
    because the raw value was checked before lowercasing).
    """
    # Normalize before validating so 'English' / ' HINDI ' are accepted.
    language = req.language.strip().lower()
    if language not in ("english", "hindi"):
        raise HTTPException(status_code=400, detail="Language must be 'english' or 'hindi'")
    query = req.query.strip()
    if not query:
        raise HTTPException(status_code=400, detail="Search query cannot be empty")
    print(f"\n{'='*80}")
    print(f"SEARCH REQUEST: '{query}' ({language})")
    print(f"{'='*80}\n")

    # -- Phase 0: Check Supabase Query Cache ---------------------------------
    if not req.no_dedup:
        cached_articles = db.check_query_cache(query, language)
        if cached_articles:
            print(f"[Phase 0] β Cache Hit for '{query}'! Returning instantly.")
            # Even on a cache hit, we should ensure the Top 2 are in the TTS
            # queue if their audio hasn't finished generating yet.
            for art in cached_articles[:2]:
                if art.get("audio_status") == "queued":
                    tts_queue.put_nowait((10, time.monotonic(), {
                        "article": dict(art),
                        "language": language,
                        "query": query,
                    }))
            return SearchResponse(
                status="cache_hit",
                message=f"Cache hit! Found {len(cached_articles)} articles for '{query}'.",
                articles=[ArticleResponse(**a) for a in cached_articles],
                audio_pending=any(a.get("audio_status") != "ready" for a in cached_articles),
            )

    # -- Phase 1: Not cached. Scrape + Summarize + Insert --------------------
    try:
        articles = _phase1_scrape_and_summarize(query, language, req.pages, req.no_dedup)
    except Exception as e:
        print(f"β Phase 1 failed: {e}")
        # Chain the cause so the original traceback survives into the logs.
        raise HTTPException(status_code=500, detail=f"Pipeline failed: {str(e)}") from e
    if not articles:
        return SearchResponse(
            status="empty",
            message=f"No articles found for '{query}'.",
            articles=[],
        )

    # -- Phase 2: Preload Top 2 silently (priority 10 = background) ----------
    for article in articles[:2]:
        tts_queue.put_nowait((10, time.monotonic(), {
            "article": dict(article),
            "language": language,
            "query": query,
        }))
    return SearchResponse(
        status="ready",
        message=f"Found {len(articles)} articles for '{query}'. Preloading audio...",
        articles=[
            ArticleResponse(
                id=a.get("id", ""),
                title=a.get("title", ""),
                summary=a.get("summary", ""),
                url=a.get("url", ""),
                author=a.get("author", ""),
                audio_url=None,
                audio_status="queued",
            )
            for a in articles
        ],
        audio_pending=True,
    )
@app.post("/generate-audio")
def generate_audio_on_demand(req: GenerateAudioRequest):
    """
    Phase 3: Frontend Pull.
    Adds a specific article to the FRONT of the queue (priority=1).
    Used when a user scrolls an article into view or explicitly clicks it.

    Fix: registered as the `POST /generate-audio` route described in the
    module docstring — the function previously had no route decorator.
    """
    article_id = req.article.get("id")
    print(f"[API] Priority Generation requested for: {article_id}")
    # Priority 1 beats the background preload tasks (priority 10); the
    # monotonic timestamp preserves FIFO order among equal priorities.
    tts_queue.put_nowait((1, time.monotonic(), {
        "article": req.article,
        "language": req.language.lower(),
        "query": req.query.strip(),
    }))
    return {"status": "queued", "message": f"Article {article_id} added to priority queue"}
# ---------------------------------------------
# Pipeline Logic
# ---------------------------------------------
def _phase1_scrape_and_summarize(query: str, language: str, pages: int, no_dedup: bool) -> List[Dict]:
    """Sync Scrape + Groq API + Database Insert + Update Cache.

    Runs the scraper as a subprocess, summarizes the top 5 hits via Groq,
    inserts them into Supabase with audio_status='queued', and records the
    article ids in the query cache.

    Raises:
        RuntimeError: scraper failed, produced no JSON, or Groq returned empty.
        subprocess.TimeoutExpired: scraper exceeded the 60s hard cap.

    Note: `no_dedup` is accepted for interface compatibility but unused here —
    the cache check happens in the caller.
    """
    t0 = time.monotonic()
    project_root = get_project_root()
    scraper_script = project_root / "backend" / "web_scraping" / "news_scrape.py"
    safe_query = sanitize_query_folder(query)

    print(f"[Phase 1] Step 1/4: Scraping articles...")
    # `pages` is Optional[int] upstream: guard against None before max()/str(),
    # which previously raised TypeError when a client sent pages=null.
    page_count = max(1, pages or 1)
    result = subprocess.run(
        [sys.executable, str(scraper_script), f"--{language}", "--search", query, "--pages", str(page_count)],
        capture_output=True,
        text=True,
        timeout=60,  # hard cap so a hung scraper can't wedge the request thread
    )
    if result.returncode != 0:
        print(f"[Phase 1] Scraper stderr: {result.stderr[-500:]}")
        raise RuntimeError("Web scraping failed")

    scraped_dir = project_root / "articles" / language / "search_queries" / safe_query
    latest_json = find_latest_json(scraped_dir)
    if not latest_json:
        raise RuntimeError(f"No scraped articles found in {scraped_dir}")
    with open(latest_json, "r", encoding="utf-8") as f:
        articles = json.load(f)

    print(f"[Phase 1] Step 2/4: Summarizing top 5 via Groq...")
    summarized = summarize_with_groq(articles, language, max_articles=5)
    if not summarized:
        raise RuntimeError("Groq summarization returned empty results")

    print(f"[Phase 1] Step 3/4: Inserting into Supabase...")
    article_ids = []
    for article in summarized:
        article["audio_url"] = ""
        article["audio_status"] = "queued"  # TTS worker flips this later
        article_ids.append(article.get("id"))
        db.insert_article(article)

    print(f"[Phase 1] Step 4/4: Updating Query Cache...")
    db.write_query_cache(query, language, article_ids)
    print(f"[Phase 1] β Complete in {time.monotonic() - t0:.1f}s β {len(summarized)} articles ready")
    return summarized
def _sync_generate_single_audio(article: Dict, language: str, query: str) -> Optional[str]:
    """Runs Kokoro TTS for a single article and uploads to Cloudinary.

    Returns the public Cloudinary URL on success, or None when generation,
    the local file, or the upload fails (errors are logged, never raised).
    """
    import re
    import hashlib
    from backend.text_to_speech.tts import generate_audio
    from backend.services.delivery import DeliveryService

    delivery = DeliveryService()
    output_dir = delivery.get_audio_output_dir(language, query, is_search=True)
    try:
        results = generate_audio([article], language, output_dir)
        if not results:
            return None
        local_path = results[0].get("local_audio_path")
        if not local_path or not os.path.exists(local_path):
            return None

        stamp = delivery._get_timestamp_folder()
        # Cloudinary rejects non-ASCII characters in public IDs: collapse the
        # query to underscore-separated alphanumerics, then append a short md5
        # hash of the original query to guarantee uniqueness.
        slug = re.sub(r'_+', '_', re.sub(r'[^a-zA-Z0-9]', '_', query.lower())).strip('_')
        digest = hashlib.md5(query.encode('utf-8')).hexdigest()[:6]
        slug = f"{slug}_{digest}" if slug else f"query_{digest}"
        cloud_folder = f"audios/{language}/search_queries/{slug}/{stamp}"
        return upload_file(local_path, cloud_folder, resource_type="auto")
    except Exception as e:
        print(f"[_sync_generate_single_audio] Error: {e}")
        return None