"""
News-Whisper On-Demand Search API (v3 - Lazy/Pull Generation)
=============================================================
Phase 1: Sync scrape + Groq Summarize (Top 5) -> Insert to DB as 'queued'.
Phase 2: Add Top 2 to PriorityQueue (background preload).
Phase 3: Frontend calls `POST /generate-audio` on scroll/click to jump queue.
A single asyncio worker task drains the queue so that only one Kokoro TTS job
runs at a time, keeping memory and CPU usage bounded on Hugging Face Spaces.
"""
import asyncio
import hashlib
import json
import os
import re
import subprocess
import sys
import time
import contextlib
from pathlib import Path
from typing import Dict, List, Optional
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.responses import RedirectResponse
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel, Field
# Ensure project root is in path
PROJECT_ROOT = Path(__file__).parent.resolve()
sys.path.append(str(PROJECT_ROOT))
from backend.summarize_articles.groq_summarizer import summarize_with_groq
from backend.services.database import DatabaseManager
from backend.services.cloud import upload_file
from backend.common.paths import get_project_root, sanitize_query_folder, find_latest_json
# ─────────────────────────────────────────────
# Global TTS Queue & Worker
# ─────────────────────────────────────────────
db = DatabaseManager()
tts_queue = asyncio.PriorityQueue()
worker_task = None
main_loop: Optional[asyncio.AbstractEventLoop] = None
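# Queue items are (priority, enqueue_time, task_data) tuples: lower numbers run
# first, and the monotonic timestamp breaks ties so dict payloads are never compared.
def enqueue_tts(priority: int, article: Dict, language: str, query: str) -> None:
    """Thread-safe enqueue helper. The sync endpoints below run in FastAPI's
    threadpool, and asyncio queues are not thread-safe, so items are handed to
    the event loop via call_soon_threadsafe instead of being pushed directly."""
    item = (priority, time.monotonic(), {"article": article, "language": language, "query": query})
    if main_loop is not None:
        main_loop.call_soon_threadsafe(tts_queue.put_nowait, item)
    else:
        # Fallback for calls made before startup completes (best-effort only).
        tts_queue.put_nowait(item)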
async def tts_worker():
    """Background worker that processes queued TTS requests one at a time."""
    print("[Queue Worker] Started listening for TTS tasks.")
    while True:
        # get() is awaited outside the task try-block so that a cancel while
        # idle does not trigger a spurious task_done() in the finally clause.
        try:
            priority, _ts, task_data = await tts_queue.get()
        except asyncio.CancelledError:
            break
        try:
            article = task_data["article"]
            language = task_data["language"]
            query = task_data["query"]
            article_id = article.get("id")
            print(f"\n[Queue Worker] Generating audio for {article_id} (Priority {priority})")
            db.update_audio_status(article_id, "generating")
            # Run heavy TTS in a threadpool to avoid blocking the async event loop
            audio_url = await run_in_threadpool(_sync_generate_single_audio, article, language, query)
            if audio_url:
                db.update_audio_url(article_id, audio_url)  # Sets status to 'ready'
                print(f"[Queue Worker] ✅ Audio ready for {article_id}: {audio_url}")
            else:
                db.update_audio_status(article_id, "failed")
                print(f"[Queue Worker] ❌ Audio failed for {article_id}")
        except asyncio.CancelledError:
            break
        except Exception as e:
            print(f"[Queue Worker] ❌ Task failed: {e}")
        finally:
            tts_queue.task_done()
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: capture the running loop (for thread-safe enqueues) and start the worker.
    global worker_task, main_loop
    main_loop = asyncio.get_running_loop()
    worker_task = asyncio.create_task(tts_worker())
    yield
    # Shutdown: cancel the worker and wait for it to exit cleanly.
    if worker_task:
        worker_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await worker_task
app = FastAPI(
title="News-Whisper Intelligent API",
description="Search API with Lazy Audio Generation and Priority Queueing.",
version="3.0.0",
lifespan=lifespan,
)
# ─────────────────────────────────────────────
# Request / Response Models
# ─────────────────────────────────────────────
class SearchRequest(BaseModel):
query: str = Field(..., description="Search term", json_schema_extra={"examples": ["cricket"]})
language: str = Field(..., description="Language: 'english' or 'hindi'", json_schema_extra={"examples": ["english"]})
pages: Optional[int] = Field(1, description="Number of search result pages to scrape")
no_dedup: Optional[bool] = Field(False, description="Skip duplicate article checking")
class ArticleResponse(BaseModel):
id: str
title: str
summary: str
url: str
author: str
audio_url: Optional[str] = None
audio_status: str = "queued"
class SearchResponse(BaseModel):
status: str
message: str
articles: List[ArticleResponse] = []
audio_pending: bool = False
class GenerateAudioRequest(BaseModel):
article: dict
language: str
query: str
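# Audio lifecycle per article: 'queued' -> 'generating' -> 'ready' | 'failed'.
# A representative /search response body (values illustrative):
# {
#   "status": "ready",
#   "message": "Found 5 articles for 'cricket'. Preloading audio...",
#   "articles": [{"id": "...", "title": "...", "audio_status": "queued", ...}],
#   "audio_pending": true
# }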
# ─────────────────────────────────────────────
# Endpoints
# ─────────────────────────────────────────────
@app.get("/", include_in_schema=False)
def root_redirect():
"""Redirect root to Swagger docs."""
return RedirectResponse(url="/docs")
@app.get("/health")
def health_check():
"""Keep-alive endpoint."""
return {"status": "alive", "queue_size": tts_queue.qsize()}
@app.post("/search", response_model=SearchResponse)
def search(req: SearchRequest):
"""
Phase 1: Scrape & Groq Summarize (Top 5). Returns in ~5s.
Phase 2: Silently adds Top 2 to the background TTS Priority Queue (priority=10).
Now with Supabase Query Caching (Instant return for repeat searches).
"""
if req.language not in ["english", "hindi"]:
raise HTTPException(status_code=400, detail="Language must be 'english' or 'hindi'")
if not req.query.strip():
raise HTTPException(status_code=400, detail="Search query cannot be empty")
query = req.query.strip()
language = req.language.lower()
print(f"\n{'='*80}")
print(f"SEARCH REQUEST: '{query}' ({language})")
print(f"{'='*80}\n")
# ── Phase 0: Check Supabase Query Cache ──────────────────────────────────
if not req.no_dedup:
cached_articles = db.check_query_cache(query, language)
if cached_articles:
print(f"[Phase 0] βœ… Cache Hit for '{query}'! Returning instantly.")
# Even on a cache hit, we should ensure the Top 2 are in the TTS queue
# if their audio hasn't finished generating yet.
            for art in cached_articles[:2]:
                if art.get("audio_status") == "queued":
                    enqueue_tts(10, dict(art), language, query)
return SearchResponse(
status="cache_hit",
message=f"Cache hit! Found {len(cached_articles)} articles for '{query}'.",
articles=[ArticleResponse(**a) for a in cached_articles],
audio_pending=any(a.get("audio_status") != "ready" for a in cached_articles),
)
# ── Phase 1: Not cached. Scrape + Summarize + Insert ────────────────────
try:
        articles = _phase1_scrape_and_summarize(query, language, req.pages or 1, req.no_dedup)
except Exception as e:
print(f"❌ Phase 1 failed: {e}")
raise HTTPException(status_code=500, detail=f"Pipeline failed: {str(e)}")
if not articles:
return SearchResponse(
status="empty",
message=f"No articles found for '{query}'.",
articles=[],
)
# ── Phase 2: Preload Top 2 silently ───────────────────────────────────────
    for article in articles[:2]:
        enqueue_tts(10, dict(article), language, query)
return SearchResponse(
status="ready",
message=f"Found {len(articles)} articles for '{query}'. Preloading audio...",
articles=[
ArticleResponse(
id=a.get("id", ""),
title=a.get("title", ""),
summary=a.get("summary", ""),
url=a.get("url", ""),
author=a.get("author", ""),
audio_url=None,
audio_status="queued"
)
for a in articles
],
audio_pending=True,
)
@app.post("/generate-audio", status_code=202)
def generate_audio_on_demand(req: GenerateAudioRequest):
"""
Phase 3: Frontend Pull.
Adds a specific article to the FRONT of the queue (priority=1).
Used when a user scrolls an article into view or explicitly clicks it.
"""
article_id = req.article.get("id")
print(f"[API] Priority Generation requested for: {article_id}")
    enqueue_tts(1, req.article, req.language.lower(), req.query.strip())
return {"status": "queued", "message": f"Article {article_id} added to priority queue"}
# ─────────────────────────────────────────────
# Pipeline Logic
# ─────────────────────────────────────────────
def _phase1_scrape_and_summarize(query: str, language: str, pages: int, no_dedup: bool) -> List[Dict]:
"""Sync Scrape + Groq API + Database Insert + Update Cache."""
t0 = time.monotonic()
project_root = get_project_root()
scraper_script = project_root / "backend" / "web_scraping" / "news_scrape.py"
safe_query = sanitize_query_folder(query)
print(f"[Phase 1] Step 1/4: Scraping articles...")
result = subprocess.run(
[sys.executable, str(scraper_script), f"--{language}", "--search", query, "--pages", str(max(1, pages))],
capture_output=True,
text=True,
timeout=60,
)
if result.returncode != 0:
print(f"[Phase 1] Scraper stderr: {result.stderr[-500:]}")
raise RuntimeError("Web scraping failed")
scraped_dir = project_root / "articles" / language / "search_queries" / safe_query
latest_json = find_latest_json(scraped_dir)
if not latest_json:
raise RuntimeError(f"No scraped articles found in {scraped_dir}")
with open(latest_json, "r", encoding="utf-8") as f:
articles = json.load(f)
print(f"[Phase 1] Step 2/4: Summarizing top 5 via Groq...")
summarized = summarize_with_groq(articles, language, max_articles=5)
if not summarized:
raise RuntimeError("Groq summarization returned empty results")
print(f"[Phase 1] Step 3/4: Inserting into Supabase...")
article_ids = []
for article in summarized:
article["audio_url"] = ""
article["audio_status"] = "queued"
article_ids.append(article.get("id"))
db.insert_article(article)
print(f"[Phase 1] Step 4/4: Updating Query Cache...")
db.write_query_cache(query, language, article_ids)
print(f"[Phase 1] βœ… Complete in {time.monotonic() - t0:.1f}s β€” {len(summarized)} articles ready")
return summarized
def _sync_generate_single_audio(article: Dict, language: str, query: str) -> Optional[str]:
"""Runs Kokoro TTS for a single article and uploads to Cloudinary."""
from backend.text_to_speech.tts import generate_audio
from backend.services.delivery import DeliveryService
delivery = DeliveryService()
output_dir = delivery.get_audio_output_dir(language, query, is_search=True)
try:
articles_with_audio = generate_audio([article], language, output_dir)
if not articles_with_audio:
return None
local_audio = articles_with_audio[0].get("local_audio_path")
if not local_audio or not os.path.exists(local_audio):
return None
timestamp = delivery._get_timestamp_folder()
        # Cloudinary rejects non-ASCII characters in public IDs, so replace
        # everything outside [a-zA-Z0-9] with '_' and append a short hash of
        # the original query so distinct queries stay unique.
safe_target = re.sub(r'[^a-zA-Z0-9]', '_', query.lower())
safe_target = re.sub(r'_+', '_', safe_target).strip('_')
query_hash = hashlib.md5(query.encode('utf-8')).hexdigest()[:6]
if not safe_target:
safe_target = f"query_{query_hash}"
else:
safe_target = f"{safe_target}_{query_hash}"
cloud_folder = f"audios/{language}/search_queries/{safe_target}/{timestamp}"
return upload_file(local_audio, cloud_folder, resource_type="auto")
except Exception as e:
print(f"[_sync_generate_single_audio] Error: {e}")
return None
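if __name__ == "__main__":
    # Local development entrypoint (a convenience sketch; assumes `uvicorn` is
    # installed, since Hugging Face Spaces normally launches the app itself).
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)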