File size: 4,893 Bytes
015dbc8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
import json
import os
from typing import Any, Dict, List, Optional
from supabase import Client, create_client
_SUPABASE_CLIENT: Optional[Client] = None
def _get_client() -> Client:
"""Create (or reuse) a Supabase client for database interactions."""
global _SUPABASE_CLIENT
if _SUPABASE_CLIENT is None:
url = os.getenv("SUPABASE_URL")
service_role_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
if not url or not service_role_key:
raise RuntimeError(
"SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY must be set for database access."
)
_SUPABASE_CLIENT = create_client(url, service_role_key)
return _SUPABASE_CLIENT
def _execute(query):
response = query.execute()
if response.error:
raise RuntimeError(f"Supabase error: {response.error.message}")
return response
def _ensure_user_profile(user_id: str) -> None:
client = _get_client()
_execute(
client.table("user_profiles").upsert({"user_id": user_id}, on_conflict="user_id")
)
def get_user_profile(user_id: str) -> Optional[Dict[str, Any]]:
client = _get_client()
response = _execute(
client.table("user_profiles")
.select("user_id, name, preferences, personality_summary, last_updated, created_at")
.eq("user_id", user_id)
.limit(1)
)
if not response.data:
return None
record = response.data[0]
if isinstance(record.get("preferences"), str):
try:
record["preferences"] = json.loads(record["preferences"])
except json.JSONDecodeError:
record["preferences"] = {}
return record
def update_user_profile(
user_id: str,
*,
name: Optional[str] = None,
preferences: Optional[str] = None,
personality_summary: Optional[str] = None,
) -> None:
updates: Dict[str, Any] = {}
if name is not None:
updates["name"] = name
if preferences is not None:
updates["preferences"] = preferences
if personality_summary is not None:
updates["personality_summary"] = personality_summary
if not updates:
return
client = _get_client()
_ensure_user_profile(user_id)
_execute(client.table("user_profiles").update(updates).eq("user_id", user_id))
def save_conversation(user_id: str, user_message: str, ai_response: str) -> str:
client = _get_client()
_ensure_user_profile(user_id)
response = _execute(
client.table("conversations").insert(
{
"user_id": user_id,
"user_message": user_message,
"ai_response": ai_response,
}
)
)
inserted = response.data[0]
return str(inserted.get("id"))
def get_recent_conversations(user_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
client = _get_client()
query = (
client.table("conversations")
.select("user_message, ai_response, created_at")
.eq("user_id", user_id)
.order("created_at", desc=True)
)
if limit is not None:
query = query.limit(limit)
response = _execute(query)
return response.data or []
def get_conversation_history(user_id: str) -> List[Dict[str, Any]]:
client = _get_client()
response = _execute(
client.table("conversations")
.select("user_message, ai_response, created_at")
.eq("user_id", user_id)
.order("created_at", desc=False)
)
return response.data or []
def count_user_messages(user_id: str) -> int:
client = _get_client()
response = _execute(
client.table("conversations")
.select("id", count="exact")
.eq("user_id", user_id)
)
return response.count or 0
def update_user_profile_summary(user_id: str, summary: str) -> None:
update_user_profile(user_id, personality_summary=summary)
def get_user_embeddings(user_id: str) -> List[Dict[str, Any]]:
client = _get_client()
response = _execute(
client.table("embeddings")
.select("text, embedding")
.eq("user_id", user_id)
.order("created_at", desc=True)
)
items: List[Dict[str, Any]] = []
for record in response.data or []:
embedding = record.get("embedding")
if isinstance(embedding, str):
try:
embedding = json.loads(embedding)
except json.JSONDecodeError:
embedding = []
items.append({"text": record.get("text", ""), "embedding": embedding})
return items
def add_embedding(user_id: str, text: str, embedding: List[float]) -> None:
client = _get_client()
_ensure_user_profile(user_id)
_execute(
client.table("embeddings").insert(
{
"user_id": user_id,
"text": text,
"embedding": embedding,
}
)
)
|