Spaces:
Running
Running
| """ | |
| Supabase database client | |
| """ | |
| from typing import Any, Dict, List, Optional | |
| from contextlib import asynccontextmanager | |
| from supabase import create_client, Client | |
| from postgrest.exceptions import APIError | |
| from app.config import settings | |
| from app.utils.logging import get_logger | |
| logger = get_logger("database") | |
| class DatabaseClient: | |
| """Supabase database client wrapper""" | |
| _instance = None | |
| _client: Optional[Client] = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| return cls._instance | |
| def connect(self) -> Client: | |
| """Initialize Supabase client""" | |
| if self._client is None: | |
| try: | |
| self._client = create_client( | |
| settings.supabase_url, | |
| settings.supabase_service_key or settings.supabase_key | |
| ) | |
| logger.info("Supabase client initialized") | |
| except Exception as e: | |
| logger.error("Failed to initialize Supabase client", error=str(e)) | |
| raise | |
| return self._client | |
| def client(self) -> Client: | |
| """Get Supabase client""" | |
| if self._client is None: | |
| return self.connect() | |
| return self._client | |
| def table(self): | |
| """Get table builder""" | |
| return self.client.table | |
| async def fetch_one( | |
| self, | |
| table: str, | |
| filters: Optional[Dict[str, Any]] = None | |
| ) -> Optional[Dict[str, Any]]: | |
| """Fetch single record""" | |
| try: | |
| query = self.table(table) | |
| if filters: | |
| for key, value in filters.items(): | |
| query = query.eq(key, value) | |
| result = await query.execute() | |
| if result.data and len(result.data) > 0: | |
| return result.data[0] | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error fetching from {table}", error=str(e)) | |
| return None | |
| async def fetch_many( | |
| self, | |
| table: str, | |
| filters: Optional[Dict[str, Any]] = None, | |
| order_by: Optional[str] = None, | |
| ascending: bool = True, | |
| limit: int = 100, | |
| offset: int = 0 | |
| ) -> List[Dict[str, Any]]: | |
| """Fetch multiple records""" | |
| try: | |
| query = self.table(table) | |
| if filters: | |
| for key, value in filters.items(): | |
| query = query.eq(key, value) | |
| if order_by: | |
| query = query.order(order_by, desc=not ascending) | |
| result = await query.limit(limit).offset(offset).execute() | |
| return result.data or [] | |
| except Exception as e: | |
| logger.error(f"Error fetching from {table}", error=str(e)) | |
| return [] | |
| async def insert( | |
| self, | |
| table: str, | |
| data: Dict[str, Any] | List[Dict[str, Any]] | |
| ) -> Optional[Dict[str, Any] | List[Dict[str, Any]]]: | |
| """Insert record(s)""" | |
| try: | |
| result = await self.table(table).insert(data).execute() | |
| return result.data | |
| except Exception as e: | |
| logger.error(f"Error inserting into {table}", error=str(e)) | |
| return None | |
| async def update( | |
| self, | |
| table: str, | |
| data: Dict[str, Any], | |
| filters: Dict[str, Any] | |
| ) -> Optional[List[Dict[str, Any]]]: | |
| """Update record(s)""" | |
| try: | |
| query = self.table(table) | |
| for key, value in filters.items(): | |
| query = query.eq(key, value) | |
| result = await query.update(data).execute() | |
| return result.data | |
| except Exception as e: | |
| logger.error(f"Error updating {table}", error=str(e)) | |
| return None | |
| async def delete( | |
| self, | |
| table: str, | |
| filters: Dict[str, Any] | |
| ) -> bool: | |
| """Delete record(s)""" | |
| try: | |
| query = self.table(table) | |
| for key, value in filters.items(): | |
| query = query.eq(key, value) | |
| await query.delete().execute() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error deleting from {table}", error=str(e)) | |
| return False | |
| async def rpc( | |
| self, | |
| function_name: str, | |
| params: Optional[Dict[str, Any]] = None | |
| ) -> Any: | |
| """Call stored procedure""" | |
| try: | |
| result = await self.client.rpc(function_name, params or {}).execute() | |
| return result.data | |
| except Exception as e: | |
| logger.error(f"Error calling RPC {function_name}", error=str(e)) | |
| return None | |
| # Global database instance | |
| db = DatabaseClient() | |