# feat: phase 3 - supabase query cache for instant top 5 results on repeat searches
# (commit 0b32eb4, author Devang1290)
import sys
from pathlib import Path
from typing import Optional
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
from backend.core.logger import logger
from backend.core.config import config
try:
from supabase import create_client, Client
except ImportError:
pass
class DatabaseManager:
    """Persistence layer backed by Supabase.

    Wraps three tables:
      - ``articles``   : full processed article records (content, summary, audio).
      - ``registry``   : lightweight dedup index of already-processed article ids.
      - ``query_cache``: per-day mapping of a search query to its top article ids.

    Every method degrades gracefully when the client is unavailable (missing
    credentials, missing ``supabase`` package, or init failure): reads fall back
    to pass-through / ``None`` and writes report ``False`` instead of raising.
    """

    def __init__(self):
        # Credentials come from backend config; if either is missing the
        # manager runs in disabled mode (self.supabase is None).
        url = config.SUPABASE_URL
        key = config.SUPABASE_KEY
        if not url or not key:
            logger.warning("Supabase URL or Key missing. Database operations disabled.")
            self.supabase = None
        else:
            try:
                self.supabase = create_client(url, key)
            except Exception as e:
                # NameError lands here too when the `supabase` package is not
                # installed (the top-of-file import is guarded by ImportError).
                logger.error(f"Failed to initialize Supabase client: {e}")
                self.supabase = None

    @staticmethod
    def _today_utc() -> str:
        """Return the current UTC date as 'YYYY-MM-DD' — the cache-day key."""
        import datetime
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

    def filter_unprocessed(self, articles: list) -> list:
        """Return only the articles whose ids are not yet in the registry table.

        Best-effort: on any error, or when the client is disabled, the full
        input list is returned unchanged so the pipeline keeps running.
        """
        if not self.supabase or not articles:
            return articles
        try:
            article_ids = [a.get('id') for a in articles if a.get('id')]
            if not article_ids:
                # Nothing identifiable to check against the registry.
                return articles
            response = self.supabase.table("registry").select("id").in_("id", article_ids).execute()
            existing_ids = {item['id'] for item in response.data}
            new_articles = [a for a in articles if a.get('id') not in existing_ids]
            if len(existing_ids) > 0:
                logger.info(f"Filtered {len(existing_ids)} previously processed articles.")
            return new_articles
        except Exception as e:
            logger.error(f"Error checking registry: {str(e)}")
            return articles

    def insert_article(self, article_data: dict) -> bool:
        """Upsert a processed article into ``articles`` and mark it in ``registry``.

        Returns True on success, False when the client is disabled, the
        article has no id, or either upsert fails.
        """
        if not self.supabase:
            return False
        try:
            article_id = article_data.get('id')
            if not article_id:
                return False
            article_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "author": article_data.get('author', ''),
                "url": article_data.get('url', ''),
                "content": article_data.get('content', ''),
                "summary": article_data.get('summary', ''),
                "audio_url": article_data.get('audio_url', ''),
                # 'queued' is the initial state of the progressive TTS pipeline.
                "audio_status": article_data.get('audio_status', 'queued'),
                "published_at": article_data.get('published_date'),
                "scraped_at": article_data.get('scraped_at'),
                "summary_generated_at": article_data.get('summary_generated_at')
            }
            registry_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "status": "completed"
            }
            self.supabase.table("articles").upsert(article_record).execute()
            self.supabase.table("registry").upsert(registry_record).execute()
            return True
        except Exception as e:
            logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
            return False

    def update_audio_status(self, article_id: str, status: str) -> bool:
        """
        Updates the progressive audio_status ('queued', 'generating', 'ready').
        """
        if not self.supabase:
            return False
        try:
            self.supabase.table("articles").update(
                {"audio_status": status}
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_status for {article_id}: {str(e)}")
            return False

    def update_audio_url(self, article_id: str, audio_url: str) -> bool:
        """
        Updates the audio_url and sets status to 'ready'.
        Called progressively as each TTS clip finishes generating.
        """
        if not self.supabase:
            return False
        try:
            self.supabase.table("articles").update(
                {
                    "audio_url": audio_url,
                    "audio_status": "ready"
                }
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_url for {article_id}: {str(e)}")
            return False

    def check_query_cache(self, query: str, language: str) -> Optional[list]:
        """
        Checks if a search query was already served today.
        Returns the cached articles in order, or None if no cache exists.
        """
        if not self.supabase:
            return None
        try:
            query = query.strip().lower()
            # 1. Look up today's cache entry. write_query_cache stamps each row
            #    with cache_date, so filtering on it here prevents a stale row
            #    from a previous day being served as a hit (previously only the
            #    newest row was taken, regardless of its day).
            res = self.supabase.table("query_cache")\
                .select("article_ids")\
                .eq("query_text", query)\
                .eq("language", language)\
                .eq("cache_date", self._today_utc())\
                .order("created_at", desc=True)\
                .limit(1)\
                .execute()
            if not res.data:
                return None
            article_ids = res.data[0].get("article_ids", [])
            if not article_ids:
                return None
            # 2. Fetch the actual articles referenced by the cache row.
            art_res = self.supabase.table("articles").select("*").in_("id", article_ids).execute()
            # 3. Restore the original sorted order (Top 5 priority) — the DB
            #    returns rows in arbitrary order, the cached id list is ranked.
            article_map = {a["id"]: a for a in art_res.data}
            cached_articles = [article_map[aid] for aid in article_ids if aid in article_map]
            if cached_articles:
                logger.info(f"Cache hit! Restored {len(cached_articles)} articles for '{query}'")
                return cached_articles
            return None
        except Exception as e:
            logger.error(f"Error reading query cache for '{query}': {e}")
            return None

    def write_query_cache(self, query: str, language: str, article_ids: list) -> bool:
        """
        Saves the resulting top article IDs for a search query.
        """
        if not self.supabase or not article_ids:
            return False
        try:
            query = query.strip().lower()
            record = {
                "query_text": query,
                "language": language,
                # Day-scoped key; check_query_cache filters on this same value.
                "cache_date": self._today_utc(),
                "article_ids": article_ids
            }
            self.supabase.table("query_cache").upsert(record).execute()
            logger.info(f"Cached top {len(article_ids)} articles for '{query}'")
            return True
        except Exception as e:
            logger.error(f"Error writing query cache for '{query}': {e}")
            return False