import sys
from pathlib import Path
from typing import Optional
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
from backend.core.logger import logger
from backend.core.config import config
try:
from supabase import create_client, Client
except ImportError:
pass
class DatabaseManager:
    """Thin persistence layer over Supabase for articles, the dedupe registry,
    and a per-day search-query cache.

    Degrades gracefully: if credentials are missing or client creation fails,
    ``self.supabase`` is ``None`` and every method becomes a no-op returning a
    safe default (input passthrough, ``False``, or ``None``).
    """

    def __init__(self):
        url = config.SUPABASE_URL
        key = config.SUPABASE_KEY
        if not url or not key:
            logger.warning("Supabase URL or Key missing. Database operations disabled.")
            self.supabase = None
        else:
            try:
                # NameError (failed `supabase` import) is also caught here,
                # since NameError subclasses Exception.
                self.supabase = create_client(url, key)
            except Exception as e:
                logger.error(f"Failed to initialize Supabase client: {e}")
                self.supabase = None

    @staticmethod
    def _today_utc() -> str:
        """Return the current UTC date as 'YYYY-MM-DD' (query-cache day bucket)."""
        import datetime
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

    def filter_unprocessed(self, articles: list) -> list:
        """
        Takes a list of articles and returns only those that haven't been processed
        (i.e., not present in the registry table).

        On any lookup error (or with no client), the full input list is returned
        so the pipeline keeps running rather than dropping articles.
        """
        if not self.supabase or not articles:
            return articles
        try:
            article_ids = [a.get('id') for a in articles if a.get('id')]
            if not article_ids:
                return articles
            response = self.supabase.table("registry").select("id").in_("id", article_ids).execute()
            existing_ids = {item['id'] for item in response.data}
            # Articles without an 'id' are intentionally kept (never filtered).
            new_articles = [a for a in articles if a.get('id') not in existing_ids]
            if existing_ids:
                logger.info(f"Filtered {len(existing_ids)} previously processed articles.")
            return new_articles
        except Exception as e:
            logger.error(f"Error checking registry: {str(e)}")
            return articles

    def insert_article(self, article_data: dict) -> bool:
        """
        Inserts processed article into articles and registry tables.

        Upserts keyed on the article id; returns True on success, False if the
        client is unavailable, the id is missing, or either write fails.
        """
        if not self.supabase:
            return False
        try:
            article_id = article_data.get('id')
            if not article_id:
                return False
            article_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "author": article_data.get('author', ''),
                "url": article_data.get('url', ''),
                "content": article_data.get('content', ''),
                "summary": article_data.get('summary', ''),
                "audio_url": article_data.get('audio_url', ''),
                "audio_status": article_data.get('audio_status', 'queued'),
                "published_at": article_data.get('published_date'),
                "scraped_at": article_data.get('scraped_at'),
                "summary_generated_at": article_data.get('summary_generated_at')
            }
            registry_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "status": "completed"
            }
            self.supabase.table("articles").upsert(article_record).execute()
            self.supabase.table("registry").upsert(registry_record).execute()
            return True
        except Exception as e:
            logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
            return False

    def update_audio_status(self, article_id: str, status: str) -> bool:
        """
        Updates the progressive audio_status ('queued', 'generating', 'ready').

        Returns True on success, False if the client is unavailable or the
        update fails.
        """
        if not self.supabase:
            return False
        try:
            self.supabase.table("articles").update(
                {"audio_status": status}
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_status for {article_id}: {str(e)}")
            return False

    def update_audio_url(self, article_id: str, audio_url: str) -> bool:
        """
        Updates the audio_url and sets status to 'ready'.
        Called progressively as each TTS clip finishes generating.
        """
        if not self.supabase:
            return False
        try:
            self.supabase.table("articles").update(
                {
                    "audio_url": audio_url,
                    "audio_status": "ready"
                }
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_url for {article_id}: {str(e)}")
            return False

    def check_query_cache(self, query: str, language: str) -> Optional[list]:
        """
        Checks if a search query was already served today.
        Returns the cached articles in order, or None if no cache exists.
        """
        if not self.supabase:
            return None
        try:
            query = query.strip().lower()
            # 1. Look up today's cache entry only. Filtering on cache_date is
            # required — without it, the newest row from ANY day would match
            # and a repeated query would be served stale results forever.
            res = self.supabase.table("query_cache")\
                .select("article_ids")\
                .eq("query_text", query)\
                .eq("language", language)\
                .eq("cache_date", self._today_utc())\
                .order("created_at", desc=True)\
                .limit(1)\
                .execute()
            if not res.data:
                return None
            article_ids = res.data[0].get("article_ids", [])
            if not article_ids:
                return None
            # 2. Fetch the actual articles
            art_res = self.supabase.table("articles").select("*").in_("id", article_ids).execute()
            # 3. Restore the original sorted order (Top 5 priority); silently
            # drop ids whose article row has since disappeared.
            article_map = {a["id"]: a for a in art_res.data}
            cached_articles = [article_map[aid] for aid in article_ids if aid in article_map]
            if cached_articles:
                logger.info(f"Cache hit! Restored {len(cached_articles)} articles for '{query}'")
                return cached_articles
            return None
        except Exception as e:
            logger.error(f"Error reading query cache for '{query}': {e}")
            return None

    def write_query_cache(self, query: str, language: str, article_ids: list) -> bool:
        """
        Saves the resulting top article IDs for a search query.

        Keyed by normalized query text, language and today's UTC date, so the
        cache naturally rolls over at midnight UTC.
        """
        if not self.supabase or not article_ids:
            return False
        try:
            query = query.strip().lower()
            record = {
                "query_text": query,
                "language": language,
                "cache_date": self._today_utc(),
                "article_ids": article_ids
            }
            self.supabase.table("query_cache").upsert(record).execute()
            logger.info(f"Cached top {len(article_ids)} articles for '{query}'")
            return True
        except Exception as e:
            logger.error(f"Error writing query cache for '{query}': {e}")
            return False