File size: 7,505 Bytes
2cb327c
 
0b32eb4
2cb327c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0147f31
2cb327c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e15f0bd
0147f31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e15f0bd
 
0147f31
e15f0bd
 
 
 
 
 
 
0147f31
 
 
 
e15f0bd
 
 
 
 
0b32eb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import sys
from pathlib import Path
from typing import Optional

sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
from backend.core.logger import logger
from backend.core.config import config

try:
    from supabase import create_client, Client
except ImportError:
    # Optional dependency is missing. Bind explicit placeholders instead of
    # silently passing: a later call to create_client then fails inside
    # DatabaseManager.__init__'s guarded try/except ("NoneType not callable")
    # rather than raising an opaque NameError.
    create_client = None
    Client = None

class DatabaseManager:
    """Persistence layer backed by Supabase.

    Tables used:
      - ``articles``:    full article records (content, summary, audio state).
      - ``registry``:    lightweight de-duplication ledger of processed IDs.
      - ``query_cache``: per-day cache of top article IDs for a search query.

    Degrades gracefully: when credentials are missing or client creation
    fails, ``self.supabase`` is ``None`` and every method becomes a no-op
    returning its neutral value (the input list / ``False`` / ``None``).
    """

    def __init__(self):
        url = config.SUPABASE_URL
        key = config.SUPABASE_KEY

        if not url or not key:
            logger.warning("Supabase URL or Key missing. Database operations disabled.")
            self.supabase = None
        else:
            try:
                self.supabase = create_client(url, key)
            except Exception as e:
                logger.error(f"Failed to initialize Supabase client: {e}")
                self.supabase = None

    @staticmethod
    def _today_utc() -> str:
        """Return the current UTC date as 'YYYY-MM-DD' (the cache partition key)."""
        import datetime
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

    def filter_unprocessed(self, articles: list) -> list:
        """Return only the articles whose IDs are not yet in the registry table.

        Articles lacking an 'id' key are always kept. Fails open: on any
        database error the original list is returned unchanged so the
        pipeline keeps running.
        """
        if not self.supabase or not articles:
            return articles

        try:
            article_ids = [a.get('id') for a in articles if a.get('id')]
            if not article_ids:
                return articles

            response = self.supabase.table("registry").select("id").in_("id", article_ids).execute()
            existing_ids = {item['id'] for item in response.data}

            new_articles = [a for a in articles if a.get('id') not in existing_ids]

            # Log how many articles were actually dropped, not how many
            # registry rows matched (they differ when the input has
            # duplicate IDs).
            removed = len(articles) - len(new_articles)
            if removed > 0:
                logger.info(f"Filtered {removed} previously processed articles.")

            return new_articles

        except Exception as e:
            logger.error(f"Error checking registry: {str(e)}")
            return articles

    def insert_article(self, article_data: dict) -> bool:
        """Upsert a processed article into ``articles`` and mark it completed
        in ``registry``.

        Returns True on success; False when the client is unavailable, the
        article has no 'id', or a database error occurs.
        """
        if not self.supabase:
            return False

        try:
            article_id = article_data.get('id')
            if not article_id:
                return False

            article_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "author": article_data.get('author', ''),
                "url": article_data.get('url', ''),
                "content": article_data.get('content', ''),
                "summary": article_data.get('summary', ''),
                "audio_url": article_data.get('audio_url', ''),
                "audio_status": article_data.get('audio_status', 'queued'),
                "published_at": article_data.get('published_date'),
                "scraped_at": article_data.get('scraped_at'),
                "summary_generated_at": article_data.get('summary_generated_at')
            }

            registry_record = {
                "id": article_id,
                "category": article_data.get('category', ''),
                "title": article_data.get('title', ''),
                "status": "completed"
            }

            # Upserts keep retries idempotent: re-processing the same ID
            # overwrites instead of duplicating.
            self.supabase.table("articles").upsert(article_record).execute()
            self.supabase.table("registry").upsert(registry_record).execute()

            return True

        except Exception as e:
            logger.error(f"Error inserting article {article_data.get('id')}: {str(e)}")
            return False

    def update_audio_status(self, article_id: str, status: str) -> bool:
        """Set the progressive audio_status ('queued', 'generating', 'ready').

        Returns True on success, False otherwise.
        """
        if not self.supabase:
            return False

        try:
            self.supabase.table("articles").update(
                {"audio_status": status}
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_status for {article_id}: {str(e)}")
            return False

    def update_audio_url(self, article_id: str, audio_url: str) -> bool:
        """Set the audio_url and flip status to 'ready'.

        Called progressively as each TTS clip finishes generating.
        Returns True on success, False otherwise.
        """
        if not self.supabase:
            return False

        try:
            self.supabase.table("articles").update(
                {
                    "audio_url": audio_url,
                    "audio_status": "ready"
                }
            ).eq("id", article_id).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating audio_url for {article_id}: {str(e)}")
            return False

    def check_query_cache(self, query: str, language: str) -> Optional[list]:
        """Return today's cached articles for a search query, in their
        original ranked order, or None when no cache entry exists.

        Fix: the lookup is restricted to rows whose cache_date is today's
        UTC date (matching write_query_cache), so a stale entry from a
        previous day is never served.
        """
        if not self.supabase:
            return None

        try:
            query = query.strip().lower()

            # 1. Most recent cache row written today for this query/language.
            res = self.supabase.table("query_cache")\
                .select("article_ids")\
                .eq("query_text", query)\
                .eq("language", language)\
                .eq("cache_date", self._today_utc())\
                .order("created_at", desc=True)\
                .limit(1)\
                .execute()

            if not res.data:
                return None

            article_ids = res.data[0].get("article_ids", [])
            if not article_ids:
                return None

            # 2. Fetch the actual articles.
            art_res = self.supabase.table("articles").select("*").in_("id", article_ids).execute()

            # 3. Restore the original sorted order (Top 5 priority); articles
            # deleted since caching are silently skipped.
            article_map = {a["id"]: a for a in art_res.data}
            cached_articles = [article_map[aid] for aid in article_ids if aid in article_map]

            if cached_articles:
                logger.info(f"Cache hit! Restored {len(cached_articles)} articles for '{query}'")
                return cached_articles
            return None

        except Exception as e:
            logger.error(f"Error reading query cache for '{query}': {e}")
            return None

    def write_query_cache(self, query: str, language: str, article_ids: list) -> bool:
        """Save the resulting top article IDs for a search query under
        today's UTC date.

        Returns True on success; False when the client is unavailable,
        article_ids is empty, or a database error occurs.
        """
        if not self.supabase or not article_ids:
            return False

        try:
            query = query.strip().lower()

            record = {
                "query_text": query,
                "language": language,
                "cache_date": self._today_utc(),
                "article_ids": article_ids
            }

            self.supabase.table("query_cache").upsert(record).execute()
            logger.info(f"Cached top {len(article_ids)} articles for '{query}'")
            return True
        except Exception as e:
            logger.error(f"Error writing query cache for '{query}': {e}")
            return False