greeta committed on
Commit
1e8e39a
·
verified ·
1 Parent(s): 31e848a

Upload supabase_client.py

Browse files
Files changed (1) hide show
  1. supabase_client.py +240 -0
supabase_client.py ADDED
@@ -0,0 +1,240 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ REST client for Supabase/PostgREST.
3
+ Uses the service key directly and avoids the Python SDK dependency tree.
4
+ """
5
+
6
+ from __future__ import annotations
7
+
8
+ from typing import Dict, List, Optional
9
+ import logging
10
+
11
+ import httpx
12
+
13
+ logging.basicConfig(level=logging.INFO)  # runs at import time; requests INFO-level root logging
14
+ logger = logging.getLogger(__name__)  # module-level logger used by every SupabaseClient method
15
+
16
+
17
class SupabaseClient:
    """Minimal async client for the `fipi_tasks` table.

    Requests go to the PostgREST endpoint ``<url>/rest/v1`` through ``httpx``.
    Public methods never raise on request failure: they log the error and
    return an empty result (``None``, ``[]``, ``False`` or zeroed stats).
    """

    # Columns that may be written. Any other key in a task/update dict is
    # dropped before the payload is sent.
    ALLOWED_TASK_FIELDS = {
        "title",
        "content",
        "source_url",
        "task_type",
        "images",
        "variants",
        "rubert_analysis",
        "scraped_at",
    }

    # Characters that delimit conditions inside an ``or=(...)`` filter; they
    # are replaced by spaces in user-supplied search text.
    _SEARCH_RESERVED = str.maketrans({",": " ", "(": " ", ")": " "})

    def __init__(self, url: str, key: str):
        """Store the REST base URL and the auth headers built from ``key``.

        Args:
            url: Project URL; a trailing slash is stripped before
                ``/rest/v1`` is appended.
            key: API key, sent both as ``apikey`` and as a Bearer token.
        """
        self.base_url = f"{url.rstrip('/')}/rest/v1"
        self.table_name = "fipi_tasks"
        self.headers = {
            "apikey": key,
            "Authorization": f"Bearer {key}",
            "Content-Type": "application/json",
        }

    async def _request(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, str]] = None,
        json: Optional[Dict | List[Dict]] = None,
        prefer: Optional[str] = None,
    ) -> List[Dict]:
        """Send one HTTP request and return the decoded rows.

        A new ``httpx.AsyncClient`` (30 s timeout) is opened for every call.

        Args:
            method: HTTP verb.
            path: Path appended to ``self.base_url``.
            params: Query-string parameters.
            json: JSON request body.
            prefer: Value for the ``Prefer`` header, if any.

        Returns:
            The JSON body as a list; a non-list body is wrapped in a
            one-element list and an empty body yields ``[]``.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx response.
        """
        # Copy so a per-request header never leaks into the shared dict.
        headers = dict(self.headers)
        if prefer:
            headers["Prefer"] = prefer

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.request(
                method,
                f"{self.base_url}/{path}",
                params=params,
                json=json,
                headers=headers,
            )
        response.raise_for_status()

        if not response.content:
            return []

        data = response.json()
        return data if isinstance(data, list) else [data]

    def _prepare_task_payload(
        self, task: Dict, *, apply_defaults: bool = True
    ) -> Dict:
        """Return a copy of ``task`` restricted to ``ALLOWED_TASK_FIELDS``.

        Args:
            task: Arbitrary mapping of column names to values.
            apply_defaults: When true (the default, used for inserts), fill
                in ``task_type="other"``, ``images=[]`` and ``variants=[]``
                if they are missing. Pass ``False`` for partial updates so
                absent columns are left untouched.
        """
        payload = {
            key: value
            for key, value in task.items()
            if key in self.ALLOWED_TASK_FIELDS
        }
        if apply_defaults:
            payload.setdefault("task_type", "other")
            payload.setdefault("images", [])
            payload.setdefault("variants", [])
        return payload

    @staticmethod
    def _build_search_filter(query: str) -> str:
        """Build the ``or`` filter matching ``query`` in title or content.

        Commas and parentheses in ``query`` are replaced by spaces so they
        cannot break out of the filter expression.
        """
        escaped_query = query.translate(SupabaseClient._SEARCH_RESERVED)
        pattern = f"*{escaped_query}*"
        return f"(title.ilike.{pattern},content.ilike.{pattern})"

    async def is_available(self) -> bool:
        """Return ``True`` if a one-row ``id`` query on the table succeeds."""
        try:
            await self._request(
                "GET",
                self.table_name,
                params={"select": "id", "limit": "1"},
            )
            return True
        except Exception as e:
            logger.error("Supabase availability check failed: %s", e)
            return False

    async def insert_task(self, task: Dict) -> Optional[Dict]:
        """Insert ``task`` unless a row with the same ``source_url`` exists.

        The existence check and the insert are two separate requests.

        Returns:
            The inserted row, or ``None`` if the task already exists, the
            server returned no row, or the request failed.
        """
        try:
            existing = await self.get_task_by_url(task.get("source_url", ""))
            if existing:
                logger.info("Task already exists: %s", task.get("source_url"))
                return None

            result = await self._request(
                "POST",
                self.table_name,
                json=self._prepare_task_payload(task),
                prefer="return=representation",
            )
            return result[0] if result else None
        except httpx.HTTPStatusError as e:
            # Prefer the response body: it carries the server's error detail.
            detail = e.response.text if e.response is not None else str(e)
            logger.error("Error inserting task: %s", detail)
            return None
        except Exception as e:
            logger.error("Error inserting task: %s", e)
            return None

    async def insert_tasks_batch(self, tasks: List[Dict]) -> List[Dict]:
        """Insert tasks one at a time and return the rows that were saved."""
        saved = []
        for task in tasks:
            result = await self.insert_task(task)
            if result:
                saved.append(result)
        logger.info("Saved %s of %s tasks", len(saved), len(tasks))
        return saved

    async def get_task_by_id(self, task_id: int) -> Optional[Dict]:
        """Return the row whose ``id`` equals ``task_id``, or ``None``."""
        try:
            result = await self._request(
                "GET",
                self.table_name,
                params={"select": "*", "id": f"eq.{task_id}"},
            )
            return result[0] if result else None
        except Exception as e:
            logger.error("Error getting task by id: %s", e)
            return None

    async def get_task_by_url(self, url: str) -> Optional[Dict]:
        """Return the first row whose ``source_url`` equals ``url``.

        Returns ``None`` without sending a request when ``url`` is empty,
        and also when nothing matches or the request fails.
        """
        if not url:
            return None
        try:
            result = await self._request(
                "GET",
                self.table_name,
                params={"select": "*", "source_url": f"eq.{url}"},
            )
            return result[0] if result else None
        except Exception as e:
            logger.error("Error getting task by url: %s", e)
            return None

    async def get_latest_tasks(self, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` rows ordered by ``scraped_at`` descending."""
        try:
            return await self._request(
                "GET",
                self.table_name,
                params={
                    "select": "*",
                    "order": "scraped_at.desc",
                    "limit": str(limit),
                },
            )
        except Exception as e:
            logger.error("Error getting latest tasks: %s", e)
            return []

    async def get_all_tasks(self) -> List[Dict]:
        """Return the rows of an unfiltered query, newest ``scraped_at`` first."""
        try:
            return await self._request(
                "GET",
                self.table_name,
                params={"select": "*", "order": "scraped_at.desc"},
            )
        except Exception as e:
            logger.error("Error getting all tasks: %s", e)
            return []

    async def search_tasks(self, query: str) -> List[Dict]:
        """Return rows whose title or content contains ``query`` (``ilike``)."""
        try:
            return await self._request(
                "GET",
                self.table_name,
                params={
                    "select": "*",
                    "or": self._build_search_filter(query),
                },
            )
        except Exception as e:
            logger.error("Error searching tasks: %s", e)
            return []

    async def get_tasks_by_type(self, task_type: str) -> List[Dict]:
        """Return every row whose ``task_type`` equals ``task_type``."""
        try:
            return await self._request(
                "GET",
                self.table_name,
                params={"select": "*", "task_type": f"eq.{task_type}"},
            )
        except Exception as e:
            logger.error("Error getting tasks by type: %s", e)
            return []

    async def update_task(self, task_id: int, updates: Dict) -> Optional[Dict]:
        """Patch the row with ``id == task_id`` using the allowed ``updates``.

        Only the columns present in ``updates`` are sent; insert-time
        defaults are NOT applied, so a partial update leaves the other
        columns untouched. If ``updates`` contains no allowed column, no
        request is made.

        Returns:
            The updated row, or ``None`` if nothing was updated or the
            request failed.
        """
        try:
            payload = self._prepare_task_payload(updates, apply_defaults=False)
            if not payload:
                logger.warning(
                    "No updatable fields supplied for task %s", task_id
                )
                return None

            result = await self._request(
                "PATCH",
                self.table_name,
                params={"id": f"eq.{task_id}"},
                json=payload,
                prefer="return=representation",
            )
            return result[0] if result else None
        except Exception as e:
            logger.error("Error updating task: %s", e)
            return None

    async def delete_task(self, task_id: int) -> bool:
        """Delete the row with ``id == task_id``.

        Returns:
            ``True`` only if the server reported at least one deleted row;
            ``False`` if nothing matched or the request failed.
        """
        try:
            deleted = await self._request(
                "DELETE",
                self.table_name,
                params={"id": f"eq.{task_id}"},
                # Ask for the deleted rows back so a no-op can be detected.
                prefer="return=representation",
            )
            return bool(deleted)
        except Exception as e:
            logger.error("Error deleting task: %s", e)
            return False

    async def get_stats(self) -> Dict:
        """Return ``{"total": int, "by_type": {task_type: count}}``.

        Only the ``task_type`` column is fetched. Rows whose ``task_type``
        is null are counted under ``"unknown"``. On failure the zeroed
        structure ``{"total": 0, "by_type": {}}`` is returned.
        """
        try:
            rows = await self._request(
                "GET",
                self.table_name,
                params={"select": "task_type"},
            )
            by_type: Dict = {}
            for row in rows:
                task_type = row.get("task_type")
                if task_type is None:
                    task_type = "unknown"
                by_type[task_type] = by_type.get(task_type, 0) + 1
            return {"total": len(rows), "by_type": by_type}
        except Exception as e:
            logger.error("Error getting stats: %s", e)
            return {"total": 0, "by_type": {}}