| import json | |
| import os | |
| from typing import Any, Dict, List, Optional | |
| from supabase import Client, create_client | |
| _SUPABASE_CLIENT: Optional[Client] = None | |
| def _get_client() -> Client: | |
| """Create (or reuse) a Supabase client for database interactions.""" | |
| global _SUPABASE_CLIENT | |
| if _SUPABASE_CLIENT is None: | |
| url = os.getenv("SUPABASE_URL") | |
| service_role_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY") | |
| if not url or not service_role_key: | |
| raise RuntimeError( | |
| "SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY must be set for database access." | |
| ) | |
| _SUPABASE_CLIENT = create_client(url, service_role_key) | |
| return _SUPABASE_CLIENT | |
| def _execute(query): | |
| response = query.execute() | |
| if response.error: | |
| raise RuntimeError(f"Supabase error: {response.error.message}") | |
| return response | |
| def _ensure_user_profile(user_id: str) -> None: | |
| client = _get_client() | |
| _execute( | |
| client.table("user_profiles").upsert({"user_id": user_id}, on_conflict="user_id") | |
| ) | |
| def get_user_profile(user_id: str) -> Optional[Dict[str, Any]]: | |
| client = _get_client() | |
| response = _execute( | |
| client.table("user_profiles") | |
| .select("user_id, name, preferences, personality_summary, last_updated, created_at") | |
| .eq("user_id", user_id) | |
| .limit(1) | |
| ) | |
| if not response.data: | |
| return None | |
| record = response.data[0] | |
| if isinstance(record.get("preferences"), str): | |
| try: | |
| record["preferences"] = json.loads(record["preferences"]) | |
| except json.JSONDecodeError: | |
| record["preferences"] = {} | |
| return record | |
| def update_user_profile( | |
| user_id: str, | |
| *, | |
| name: Optional[str] = None, | |
| preferences: Optional[str] = None, | |
| personality_summary: Optional[str] = None, | |
| ) -> None: | |
| updates: Dict[str, Any] = {} | |
| if name is not None: | |
| updates["name"] = name | |
| if preferences is not None: | |
| updates["preferences"] = preferences | |
| if personality_summary is not None: | |
| updates["personality_summary"] = personality_summary | |
| if not updates: | |
| return | |
| client = _get_client() | |
| _ensure_user_profile(user_id) | |
| _execute(client.table("user_profiles").update(updates).eq("user_id", user_id)) | |
| def save_conversation(user_id: str, user_message: str, ai_response: str) -> str: | |
| client = _get_client() | |
| _ensure_user_profile(user_id) | |
| response = _execute( | |
| client.table("conversations").insert( | |
| { | |
| "user_id": user_id, | |
| "user_message": user_message, | |
| "ai_response": ai_response, | |
| } | |
| ) | |
| ) | |
| inserted = response.data[0] | |
| return str(inserted.get("id")) | |
| def get_recent_conversations(user_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]: | |
| client = _get_client() | |
| query = ( | |
| client.table("conversations") | |
| .select("user_message, ai_response, created_at") | |
| .eq("user_id", user_id) | |
| .order("created_at", desc=True) | |
| ) | |
| if limit is not None: | |
| query = query.limit(limit) | |
| response = _execute(query) | |
| return response.data or [] | |
| def get_conversation_history(user_id: str) -> List[Dict[str, Any]]: | |
| client = _get_client() | |
| response = _execute( | |
| client.table("conversations") | |
| .select("user_message, ai_response, created_at") | |
| .eq("user_id", user_id) | |
| .order("created_at", desc=False) | |
| ) | |
| return response.data or [] | |
| def count_user_messages(user_id: str) -> int: | |
| client = _get_client() | |
| response = _execute( | |
| client.table("conversations") | |
| .select("id", count="exact") | |
| .eq("user_id", user_id) | |
| ) | |
| return response.count or 0 | |
| def update_user_profile_summary(user_id: str, summary: str) -> None: | |
| update_user_profile(user_id, personality_summary=summary) | |
| def get_user_embeddings(user_id: str) -> List[Dict[str, Any]]: | |
| client = _get_client() | |
| response = _execute( | |
| client.table("embeddings") | |
| .select("text, embedding") | |
| .eq("user_id", user_id) | |
| .order("created_at", desc=True) | |
| ) | |
| items: List[Dict[str, Any]] = [] | |
| for record in response.data or []: | |
| embedding = record.get("embedding") | |
| if isinstance(embedding, str): | |
| try: | |
| embedding = json.loads(embedding) | |
| except json.JSONDecodeError: | |
| embedding = [] | |
| items.append({"text": record.get("text", ""), "embedding": embedding}) | |
| return items | |
| def add_embedding(user_id: str, text: str, embedding: List[float]) -> None: | |
| client = _get_client() | |
| _ensure_user_profile(user_id) | |
| _execute( | |
| client.table("embeddings").insert( | |
| { | |
| "user_id": user_id, | |
| "text": text, | |
| "embedding": embedding, | |
| } | |
| ) | |
| ) | |