""" REST client for Supabase/PostgREST. Uses the service key directly and avoids the Python SDK dependency tree. """ from __future__ import annotations from typing import Dict, List, Optional import logging import httpx logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SupabaseClient: """Minimal async client for the `fipi_tasks` table.""" ALLOWED_TASK_FIELDS = { "title", "content", "source_url", "task_type", "images", "variants", "task_number", "source_kind", "task_guid", "rubert_analysis", "scraped_at", } def __init__(self, url: str, key: str): self.base_url = f"{url.rstrip('/')}/rest/v1" self.table_name = "fipi_tasks" self.headers = { "apikey": key, "Authorization": f"Bearer {key}", "Content-Type": "application/json", } async def _request( self, method: str, path: str, *, params: Optional[Dict[str, str]] = None, json: Optional[Dict | List[Dict]] = None, prefer: Optional[str] = None, ) -> List[Dict]: headers = dict(self.headers) if prefer: headers["Prefer"] = prefer async with httpx.AsyncClient(timeout=30.0) as client: response = await client.request( method, f"{self.base_url}/{path}", params=params, json=json, headers=headers, ) response.raise_for_status() if not response.content: return [] data = response.json() return data if isinstance(data, list) else [data] def _prepare_task_payload(self, task: Dict) -> Dict: payload = { key: value for key, value in task.items() if key in self.ALLOWED_TASK_FIELDS } payload.setdefault("task_type", "other") payload.setdefault("images", []) payload.setdefault("variants", []) return payload def _has_enrichment_changes(self, existing: Dict, incoming: Dict) -> bool: for field in ("task_number", "source_kind", "task_guid"): if incoming.get(field) and not existing.get(field): return True incoming_variants = incoming.get("variants") or [] existing_variants = existing.get("variants") or [] if incoming_variants and not existing_variants: return True incoming_images = incoming.get("images") or [] existing_images = existing.get("images") or [] if incoming_images and not existing_images: return True return False async def is_available(self) -> bool: try: await self._request( "GET", self.table_name, params={"select": "id", "limit": "1"}, ) return True except Exception as e: logger.error("Supabase availability check failed: %s", e) return False async def insert_task(self, task: Dict) -> Optional[Dict]: try: existing = await self.get_task_by_url(task.get("source_url", "")) if existing: if self._has_enrichment_changes(existing, task): logger.info("Updating existing task metadata: %s", task.get("source_url")) return await self.update_task(existing["id"], task) logger.info("Task already exists: %s", task.get("source_url")) return None result = await self._request( "POST", self.table_name, json=self._prepare_task_payload(task), prefer="return=representation", ) return result[0] if result else None except httpx.HTTPStatusError as e: detail = e.response.text if e.response is not None else str(e) logger.error("Error inserting task: %s", detail) return None except Exception as e: logger.error("Error inserting task: %s", e) return None async def insert_tasks_batch(self, tasks: List[Dict]) -> List[Dict]: saved = [] for task in tasks: result = await self.insert_task(task) if result: saved.append(result) logger.info("Saved %s of %s tasks", len(saved), len(tasks)) return saved async def get_task_by_id(self, task_id: int) -> Optional[Dict]: try: result = await self._request( "GET", self.table_name, params={"select": "*", "id": f"eq.{task_id}"}, ) return result[0] if result else None except Exception as e: logger.error("Error getting task by id: %s", e) return None async def get_task_by_url(self, url: str) -> Optional[Dict]: if not url: return None try: result = await self._request( "GET", self.table_name, params={"select": "*", "source_url": f"eq.{url}"}, ) return result[0] if result else None except Exception as e: logger.error("Error getting task by url: %s", e) return None async def get_latest_tasks(self, limit: int = 10) -> List[Dict]: try: return await self._request( "GET", self.table_name, params={ "select": "*", "order": "scraped_at.desc", "limit": str(limit), }, ) except Exception as e: logger.error("Error getting latest tasks: %s", e) return [] async def get_all_tasks(self) -> List[Dict]: try: return await self._request( "GET", self.table_name, params={"select": "*", "order": "scraped_at.desc"}, ) except Exception as e: logger.error("Error getting all tasks: %s", e) return [] async def search_tasks(self, query: str) -> List[Dict]: try: escaped_query = query.replace(",", " ").replace("(", " ").replace(")", " ") pattern = f"*{escaped_query}*" or_filter = f"(title.ilike.{pattern},content.ilike.{pattern})" return await self._request( "GET", self.table_name, params={ "select": "*", "or": or_filter, }, ) except Exception as e: logger.error("Error searching tasks: %s", e) return [] async def get_tasks_by_type(self, task_type: str) -> List[Dict]: try: return await self._request( "GET", self.table_name, params={"select": "*", "task_type": f"eq.{task_type}"}, ) except Exception as e: logger.error("Error getting tasks by type: %s", e) return [] async def update_task(self, task_id: int, updates: Dict) -> Optional[Dict]: try: result = await self._request( "PATCH", self.table_name, params={"id": f"eq.{task_id}"}, json=self._prepare_task_payload(updates), prefer="return=representation", ) return result[0] if result else None except Exception as e: logger.error("Error updating task: %s", e) return None async def delete_task(self, task_id: int) -> bool: try: await self._request( "DELETE", self.table_name, params={"id": f"eq.{task_id}"}, prefer="return=representation", ) return True except Exception as e: logger.error("Error deleting task: %s", e) return False async def get_stats(self) -> Dict: try: all_tasks = await self.get_all_tasks() stats = {"total": len(all_tasks), "by_type": {}} for task in all_tasks: task_type = task.get("task_type", "unknown") stats["by_type"][task_type] = stats["by_type"].get(task_type, 0) + 1 return stats except Exception as e: logger.error("Error getting stats: %s", e) return {"total": 0, "by_type": {}}