"""Supabase-backed persistence layer.

Handles the article registry (dedup), article storage, progressive audio
status updates, and a per-day search-query cache. Every public method
degrades gracefully: when the Supabase client could not be initialized
(missing credentials or missing SDK), reads return their "no data" value
and writes return False instead of raising.
"""

import datetime
import sys
from pathlib import Path
from typing import Optional

# Make the project root importable when this module is run as a script.
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))

from backend.core.logger import logger
from backend.core.config import config

try:
    from supabase import create_client, Client
except ImportError:
    # The supabase SDK is an optional dependency. Mark it absent explicitly
    # instead of `pass`, so __init__ can report a clear reason rather than
    # hitting a NameError on create_client later.
    create_client = None
    Client = None


class DatabaseManager:
    """Thin wrapper around the Supabase client for all DB operations."""

    def __init__(self):
        url = config.SUPABASE_URL
        key = config.SUPABASE_KEY
        if create_client is None:
            logger.warning("supabase package not installed. Database operations disabled.")
            self.supabase = None
        elif not url or not key:
            logger.warning("Supabase URL or Key missing. Database operations disabled.")
            self.supabase = None
        else:
            try:
                self.supabase = create_client(url, key)
            except Exception as e:
                logger.error(f"Failed to initialize Supabase client: {e}")
                self.supabase = None

    def filter_unprocessed(self, articles: list) -> list:
        """Return only the articles not already present in the registry table.

        Articles without an 'id' key are passed through untouched. On any
        DB error (or when the client is disabled) the full input list is
        returned so the pipeline keeps running — duplicates are preferable
        to dropped articles.
        """
        if not self.supabase or not articles:
            return articles
        try:
            article_ids = [a.get('id') for a in articles if a.get('id')]
            if not article_ids:
                return articles
            response = self.supabase.table("registry").select("id").in_("id", article_ids).execute()
            existing_ids = {item['id'] for item in response.data}
            new_articles = [a for a in articles if a.get('id') not in existing_ids]
            if existing_ids:
                logger.info(f"Filtered {len(existing_ids)} previously processed articles.")
            return new_articles
        except Exception as e:
            logger.error(f"Error checking registry: {str(e)}")
            return articles

    def insert_article(self, article_data: dict) -> bool:
        """Upsert a processed article into the articles and registry tables.

        Returns True on success, False when the client is disabled, the
        article has no id, or either upsert fails. The registry row marks
        the article as "completed" for future dedup.
        """
        if not self.supabase:
            return False
        try:
            article_id = article_data.get('id')
            if not article_id:
                return False
            article_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "author": article_data.get('author', ''),
                "url": article_data.get('url', ''),
                "content": article_data.get('content', ''),
                "summary": article_data.get('summary', ''),
                "audio_url": article_data.get('audio_url', ''),
                "audio_status": article_data.get('audio_status', 'queued'),
                "published_at": article_data.get('published_date'),
                "scraped_at": article_data.get('scraped_at'),
                "summary_generated_at": article_data.get('summary_generated_at'),
            }
            registry_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "status": "completed",
            }
            self.supabase.table("articles").upsert(article_record).execute()
            self.supabase.table("registry").upsert(registry_record).execute()
            return True
        except Exception as e:
            logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
            return False

    def update_audio_status(self, article_id: str, status: str) -> bool:
        """Update the progressive audio_status ('queued', 'generating', 'ready').

        Returns True on success, False on error or when DB is disabled.
        """
        if not self.supabase:
            return False
        try:
            self.supabase.table("articles").update(
                {"audio_status": status}
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_status for {article_id}: {str(e)}")
            return False

    def update_audio_url(self, article_id: str, audio_url: str) -> bool:
        """Set the article's audio_url and mark its status 'ready'.

        Called progressively as each TTS clip finishes generating.
        Returns True on success, False on error or when DB is disabled.
        """
        if not self.supabase:
            return False
        try:
            self.supabase.table("articles").update(
                {
                    "audio_url": audio_url,
                    "audio_status": "ready",
                }
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_url for {article_id}: {str(e)}")
            return False

    def check_query_cache(self, query: str, language: str) -> Optional[list]:
        """Return today's cached articles for a search query, or None.

        Looks up the most recent query_cache row written *today* (matching
        what write_query_cache stores in cache_date), fetches the referenced
        articles, and restores their original priority order.
        """
        if not self.supabase:
            return None
        try:
            query = query.strip().lower()
            # Only serve entries cached today; previously this returned the
            # newest row from ANY day, silently serving stale results.
            today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
            res = self.supabase.table("query_cache")\
                .select("article_ids")\
                .eq("query_text", query)\
                .eq("language", language)\
                .eq("cache_date", today)\
                .order("created_at", desc=True)\
                .limit(1)\
                .execute()
            if not res.data:
                return None
            article_ids = res.data[0].get("article_ids", [])
            if not article_ids:
                return None
            # Fetch the actual articles referenced by the cache entry.
            art_res = self.supabase.table("articles").select("*").in_("id", article_ids).execute()
            # Restore the original sorted order (top-N priority); articles
            # deleted since caching are skipped.
            article_map = {a["id"]: a for a in art_res.data}
            cached_articles = [article_map[aid] for aid in article_ids if aid in article_map]
            if cached_articles:
                logger.info(f"Cache hit! Restored {len(cached_articles)} articles for '{query}'")
                return cached_articles
            return None
        except Exception as e:
            logger.error(f"Error reading query cache for '{query}': {e}")
            return None

    def write_query_cache(self, query: str, language: str, article_ids: list) -> bool:
        """Save the resulting top article IDs for a search query.

        Stamps the record with today's UTC date so check_query_cache can
        limit hits to same-day entries. Returns True on success.
        """
        if not self.supabase or not article_ids:
            return False
        try:
            query = query.strip().lower()
            today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
            record = {
                "query_text": query,
                "language": language,
                "cache_date": today,
                "article_ids": article_ids,
            }
            # NOTE(review): upsert conflict target depends on the table's
            # primary key / unique constraint — confirm it dedups on
            # (query_text, language, cache_date) rather than always inserting.
            self.supabase.table("query_cache").upsert(record).execute()
            logger.info(f"Cached top {len(article_ids)} articles for '{query}'")
            return True
        except Exception as e:
            logger.error(f"Error writing query cache for '{query}': {e}")
            return False