greeta commited on
Commit
5ed4248
·
verified ·
1 Parent(s): 6401b56

Upload supabase_client.py

Browse files
Files changed (1) hide show
  1. supabase_client.py +264 -0
supabase_client.py ADDED
@@ -0,0 +1,264 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ REST client for Supabase/PostgREST.
3
+ Uses the service key directly and avoids the Python SDK dependency tree.
4
+ """
5
+
6
+ from __future__ import annotations
7
+
8
+ from typing import Dict, List, Optional
9
+ import logging
10
+
11
+ import httpx
12
+
13
+ logging.basicConfig(level=logging.INFO)
14
+ logger = logging.getLogger(__name__)
15
+
16
+
17
+ class SupabaseClient:
18
+ """Minimal async client for the `fipi_tasks` table."""
19
+
20
+ ALLOWED_TASK_FIELDS = {
21
+ "title",
22
+ "content",
23
+ "source_url",
24
+ "task_type",
25
+ "images",
26
+ "variants",
27
+ "task_number",
28
+ "source_kind",
29
+ "task_guid",
30
+ "rubert_analysis",
31
+ "scraped_at",
32
+ }
33
+
34
+ def __init__(self, url: str, key: str):
35
+ self.base_url = f"{url.rstrip('/')}/rest/v1"
36
+ self.table_name = "fipi_tasks"
37
+ self.headers = {
38
+ "apikey": key,
39
+ "Authorization": f"Bearer {key}",
40
+ "Content-Type": "application/json",
41
+ }
42
+
43
+ async def _request(
44
+ self,
45
+ method: str,
46
+ path: str,
47
+ *,
48
+ params: Optional[Dict[str, str]] = None,
49
+ json: Optional[Dict | List[Dict]] = None,
50
+ prefer: Optional[str] = None,
51
+ ) -> List[Dict]:
52
+ headers = dict(self.headers)
53
+ if prefer:
54
+ headers["Prefer"] = prefer
55
+
56
+ async with httpx.AsyncClient(timeout=30.0) as client:
57
+ response = await client.request(
58
+ method,
59
+ f"{self.base_url}/{path}",
60
+ params=params,
61
+ json=json,
62
+ headers=headers,
63
+ )
64
+ response.raise_for_status()
65
+
66
+ if not response.content:
67
+ return []
68
+
69
+ data = response.json()
70
+ return data if isinstance(data, list) else [data]
71
+
72
+ def _prepare_task_payload(self, task: Dict) -> Dict:
73
+ payload = {
74
+ key: value
75
+ for key, value in task.items()
76
+ if key in self.ALLOWED_TASK_FIELDS
77
+ }
78
+ payload.setdefault("task_type", "other")
79
+ payload.setdefault("images", [])
80
+ payload.setdefault("variants", [])
81
+ return payload
82
+
83
+ def _has_enrichment_changes(self, existing: Dict, incoming: Dict) -> bool:
84
+ for field in ("task_number", "source_kind", "task_guid"):
85
+ if incoming.get(field) and not existing.get(field):
86
+ return True
87
+
88
+ incoming_variants = incoming.get("variants") or []
89
+ existing_variants = existing.get("variants") or []
90
+ if incoming_variants and not existing_variants:
91
+ return True
92
+
93
+ incoming_images = incoming.get("images") or []
94
+ existing_images = existing.get("images") or []
95
+ if incoming_images and not existing_images:
96
+ return True
97
+
98
+ return False
99
+
100
+ async def is_available(self) -> bool:
101
+ try:
102
+ await self._request(
103
+ "GET",
104
+ self.table_name,
105
+ params={"select": "id", "limit": "1"},
106
+ )
107
+ return True
108
+ except Exception as e:
109
+ logger.error("Supabase availability check failed: %s", e)
110
+ return False
111
+
112
+ async def insert_task(self, task: Dict) -> Optional[Dict]:
113
+ try:
114
+ existing = await self.get_task_by_url(task.get("source_url", ""))
115
+ if existing:
116
+ if self._has_enrichment_changes(existing, task):
117
+ logger.info("Updating existing task metadata: %s", task.get("source_url"))
118
+ return await self.update_task(existing["id"], task)
119
+
120
+ logger.info("Task already exists: %s", task.get("source_url"))
121
+ return None
122
+
123
+ result = await self._request(
124
+ "POST",
125
+ self.table_name,
126
+ json=self._prepare_task_payload(task),
127
+ prefer="return=representation",
128
+ )
129
+ return result[0] if result else None
130
+ except httpx.HTTPStatusError as e:
131
+ detail = e.response.text if e.response is not None else str(e)
132
+ logger.error("Error inserting task: %s", detail)
133
+ return None
134
+ except Exception as e:
135
+ logger.error("Error inserting task: %s", e)
136
+ return None
137
+
138
+ async def insert_tasks_batch(self, tasks: List[Dict]) -> List[Dict]:
139
+ saved = []
140
+ for task in tasks:
141
+ result = await self.insert_task(task)
142
+ if result:
143
+ saved.append(result)
144
+ logger.info("Saved %s of %s tasks", len(saved), len(tasks))
145
+ return saved
146
+
147
+ async def get_task_by_id(self, task_id: int) -> Optional[Dict]:
148
+ try:
149
+ result = await self._request(
150
+ "GET",
151
+ self.table_name,
152
+ params={"select": "*", "id": f"eq.{task_id}"},
153
+ )
154
+ return result[0] if result else None
155
+ except Exception as e:
156
+ logger.error("Error getting task by id: %s", e)
157
+ return None
158
+
159
+ async def get_task_by_url(self, url: str) -> Optional[Dict]:
160
+ if not url:
161
+ return None
162
+ try:
163
+ result = await self._request(
164
+ "GET",
165
+ self.table_name,
166
+ params={"select": "*", "source_url": f"eq.{url}"},
167
+ )
168
+ return result[0] if result else None
169
+ except Exception as e:
170
+ logger.error("Error getting task by url: %s", e)
171
+ return None
172
+
173
+ async def get_latest_tasks(self, limit: int = 10) -> List[Dict]:
174
+ try:
175
+ return await self._request(
176
+ "GET",
177
+ self.table_name,
178
+ params={
179
+ "select": "*",
180
+ "order": "scraped_at.desc",
181
+ "limit": str(limit),
182
+ },
183
+ )
184
+ except Exception as e:
185
+ logger.error("Error getting latest tasks: %s", e)
186
+ return []
187
+
188
+ async def get_all_tasks(self) -> List[Dict]:
189
+ try:
190
+ return await self._request(
191
+ "GET",
192
+ self.table_name,
193
+ params={"select": "*", "order": "scraped_at.desc"},
194
+ )
195
+ except Exception as e:
196
+ logger.error("Error getting all tasks: %s", e)
197
+ return []
198
+
199
+ async def search_tasks(self, query: str) -> List[Dict]:
200
+ try:
201
+ escaped_query = query.replace(",", " ").replace("(", " ").replace(")", " ")
202
+ pattern = f"*{escaped_query}*"
203
+ or_filter = f"(title.ilike.{pattern},content.ilike.{pattern})"
204
+ return await self._request(
205
+ "GET",
206
+ self.table_name,
207
+ params={
208
+ "select": "*",
209
+ "or": or_filter,
210
+ },
211
+ )
212
+ except Exception as e:
213
+ logger.error("Error searching tasks: %s", e)
214
+ return []
215
+
216
+ async def get_tasks_by_type(self, task_type: str) -> List[Dict]:
217
+ try:
218
+ return await self._request(
219
+ "GET",
220
+ self.table_name,
221
+ params={"select": "*", "task_type": f"eq.{task_type}"},
222
+ )
223
+ except Exception as e:
224
+ logger.error("Error getting tasks by type: %s", e)
225
+ return []
226
+
227
+ async def update_task(self, task_id: int, updates: Dict) -> Optional[Dict]:
228
+ try:
229
+ result = await self._request(
230
+ "PATCH",
231
+ self.table_name,
232
+ params={"id": f"eq.{task_id}"},
233
+ json=self._prepare_task_payload(updates),
234
+ prefer="return=representation",
235
+ )
236
+ return result[0] if result else None
237
+ except Exception as e:
238
+ logger.error("Error updating task: %s", e)
239
+ return None
240
+
241
+ async def delete_task(self, task_id: int) -> bool:
242
+ try:
243
+ await self._request(
244
+ "DELETE",
245
+ self.table_name,
246
+ params={"id": f"eq.{task_id}"},
247
+ prefer="return=representation",
248
+ )
249
+ return True
250
+ except Exception as e:
251
+ logger.error("Error deleting task: %s", e)
252
+ return False
253
+
254
+ async def get_stats(self) -> Dict:
255
+ try:
256
+ all_tasks = await self.get_all_tasks()
257
+ stats = {"total": len(all_tasks), "by_type": {}}
258
+ for task in all_tasks:
259
+ task_type = task.get("task_type", "unknown")
260
+ stats["by_type"][task_type] = stats["by_type"].get(task_type, 0) + 1
261
+ return stats
262
+ except Exception as e:
263
+ logger.error("Error getting stats: %s", e)
264
+ return {"total": 0, "by_type": {}}