Spaces:
Sleeping
Sleeping
Devang1290
feat: phase 3 - supabase query cache for instant top 5 results on repeat searches
import datetime
import sys
from pathlib import Path
from typing import Optional

sys.path.append(str(Path(__file__).resolve().parent.parent.parent))

from backend.core.logger import logger
from backend.core.config import config

try:
    from supabase import create_client, Client
except ImportError:
    # supabase is optional; DatabaseManager degrades to a no-op when it is absent.
    pass
class DatabaseManager:
    """Thin wrapper around the Supabase client.

    Responsibilities:
      * filtering already-processed articles via the ``registry`` table,
      * persisting processed articles (``articles`` + ``registry``),
      * progressive audio-status updates while TTS clips are generated,
      * a per-day search-query cache mapping a query to its ranked article IDs.

    Every method degrades gracefully (no-op or passthrough) when the Supabase
    client is unavailable, so callers never need to guard for a missing backend.
    """

    def __init__(self):
        """Create the Supabase client from config, or disable DB operations."""
        url = config.SUPABASE_URL
        key = config.SUPABASE_KEY
        if not url or not key:
            logger.warning("Supabase URL or Key missing. Database operations disabled.")
            self.supabase = None
        else:
            try:
                self.supabase = create_client(url, key)
            except Exception as e:
                # Also covers NameError when the supabase package failed to import.
                logger.error(f"Failed to initialize Supabase client: {e}")
                self.supabase = None

    def filter_unprocessed(self, articles: list) -> list:
        """Return only the articles whose IDs are not in the ``registry`` table.

        Best-effort: returns the input unchanged when the DB is disabled, the
        list is empty, no article carries an ``id``, or the lookup fails.

        Args:
            articles: list of article dicts; each is expected to carry an 'id'.

        Returns:
            The subset of ``articles`` not yet registered (original order kept).
        """
        if not self.supabase or not articles:
            return articles
        try:
            article_ids = [a.get('id') for a in articles if a.get('id')]
            if not article_ids:
                return articles
            response = self.supabase.table("registry").select("id").in_("id", article_ids).execute()
            existing_ids = {item['id'] for item in response.data}
            new_articles = [a for a in articles if a.get('id') not in existing_ids]
            if existing_ids:
                logger.info(f"Filtered {len(existing_ids)} previously processed articles.")
            return new_articles
        except Exception as e:
            # On any backend error, fail open: let the pipeline reprocess.
            logger.error(f"Error checking registry: {str(e)}")
            return articles

    def insert_article(self, article_data: dict) -> bool:
        """Upsert a processed article into ``articles`` and mark it completed in ``registry``.

        Args:
            article_data: dict with at least an 'id'; other known keys
                (category, title, author, url, content, summary, audio_url,
                audio_status, published_date, scraped_at, summary_generated_at)
                are copied with empty-string / None defaults.

        Returns:
            True on success, False when the DB is disabled, 'id' is missing,
            or either upsert raises.
        """
        if not self.supabase:
            return False
        try:
            article_id = article_data.get('id')
            if not article_id:
                return False
            article_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "author": article_data.get('author', ''),
                "url": article_data.get('url', ''),
                "content": article_data.get('content', ''),
                "summary": article_data.get('summary', ''),
                "audio_url": article_data.get('audio_url', ''),
                # 'queued' is the initial state of the progressive audio pipeline.
                "audio_status": article_data.get('audio_status', 'queued'),
                "published_at": article_data.get('published_date'),
                "scraped_at": article_data.get('scraped_at'),
                "summary_generated_at": article_data.get('summary_generated_at')
            }
            registry_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "status": "completed"
            }
            self.supabase.table("articles").upsert(article_record).execute()
            self.supabase.table("registry").upsert(registry_record).execute()
            return True
        except Exception as e:
            logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
            return False

    def update_audio_status(self, article_id: str, status: str) -> bool:
        """Update the progressive ``audio_status`` ('queued', 'generating', 'ready').

        Returns:
            True on success, False when the DB is disabled or the update raises.
        """
        if not self.supabase:
            return False
        try:
            self.supabase.table("articles").update(
                {"audio_status": status}
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_status for {article_id}: {str(e)}")
            return False

    def update_audio_url(self, article_id: str, audio_url: str) -> bool:
        """Store ``audio_url`` and flip ``audio_status`` to 'ready'.

        Called progressively as each TTS clip finishes generating.

        Returns:
            True on success, False when the DB is disabled or the update raises.
        """
        if not self.supabase:
            return False
        try:
            self.supabase.table("articles").update(
                {
                    "audio_url": audio_url,
                    "audio_status": "ready"
                }
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_url for {article_id}: {str(e)}")
            return False

    def check_query_cache(self, query: str, language: str) -> Optional[list]:
        """Return today's cached ranked articles for (query, language), or None.

        The query is normalized (stripped, lowercased) to match how
        ``write_query_cache`` stores it. Only rows whose ``cache_date`` is the
        current UTC day count as a hit, so yesterday's results are never
        served stale (the previous implementation returned the latest row
        regardless of date).

        Returns:
            The cached articles restored to their original ranked order, or
            None on miss / disabled DB / any backend error.
        """
        if not self.supabase:
            return None
        try:
            query = query.strip().lower()
            today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
            # 1. Look up today's cache entry; newest first in case of duplicates.
            res = self.supabase.table("query_cache")\
                .select("article_ids")\
                .eq("query_text", query)\
                .eq("language", language)\
                .eq("cache_date", today)\
                .order("created_at", desc=True)\
                .limit(1)\
                .execute()
            if not res.data:
                return None
            article_ids = res.data[0].get("article_ids", [])
            if not article_ids:
                return None
            # 2. Fetch the actual articles.
            art_res = self.supabase.table("articles").select("*").in_("id", article_ids).execute()
            # 3. Restore the original sorted order (Top 5 priority); IDs whose
            #    article row has since disappeared are silently dropped.
            article_map = {a["id"]: a for a in art_res.data}
            cached_articles = [article_map[aid] for aid in article_ids if aid in article_map]
            if cached_articles:
                logger.info(f"Cache hit! Restored {len(cached_articles)} articles for '{query}'")
                return cached_articles
            return None
        except Exception as e:
            logger.error(f"Error reading query cache for '{query}': {e}")
            return None

    def write_query_cache(self, query: str, language: str, article_ids: list) -> bool:
        """Save the ranked top article IDs for a normalized search query.

        Args:
            query: raw user query; stored stripped and lowercased.
            language: language code the search was run in.
            article_ids: ranked article IDs to cache (order is preserved).

        Returns:
            True on success, False when the DB is disabled, ``article_ids`` is
            empty, or the upsert raises.
        """
        if not self.supabase or not article_ids:
            return False
        try:
            query = query.strip().lower()
            # Stamp with the current UTC day so reads can reject stale entries.
            today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
            record = {
                "query_text": query,
                "language": language,
                "cache_date": today,
                "article_ids": article_ids
            }
            self.supabase.table("query_cache").upsert(record).execute()
            logger.info(f"Cached top {len(article_ids)} articles for '{query}'")
            return True
        except Exception as e:
            logger.error(f"Error writing query cache for '{query}': {e}")
            return False