atr0p05's picture
Upload 291 files
8a682b5 verified
"""
Database utilities for the AI Agent system
Provides vector store access and database connection management
"""
import os
import logging
from typing import Optional, Any
from supabase import create_client, Client
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Lazily populated module-level singletons. Both start as None and are
# filled in on first use by get_supabase_client() and get_vector_store().
_supabase_client: Optional[Client] = None
_vector_store: Optional[Any] = None
def get_supabase_client() -> Client:
    """Return the shared Supabase client, building it on the first call.

    The connection settings are read from the ``SUPABASE_URL`` and
    ``SUPABASE_KEY`` environment variables. Once built, the client is kept
    in the module-level ``_supabase_client`` and reused by later calls.

    Raises:
        ValueError: If either environment variable is unset or empty.
    """
    global _supabase_client

    # Fast path: a client already exists, hand it back.
    if _supabase_client is not None:
        return _supabase_client

    supabase_url = os.getenv("SUPABASE_URL")
    supabase_key = os.getenv("SUPABASE_KEY")
    if not (supabase_url and supabase_key):
        raise ValueError("SUPABASE_URL and SUPABASE_KEY environment variables are required")

    _supabase_client = create_client(supabase_url, supabase_key)
    logger.info("Supabase client initialized")
    return _supabase_client
def get_vector_store():
    """Return the shared vector store, initializing it on first use.

    The store is obtained by running ``initialize_supabase_enhanced()`` from
    the sibling ``database_enhanced`` module to completion and reading the
    ``'vector_store'`` entry of the mapping it returns. A non-``None`` result
    is cached in the module-level ``_vector_store``; after a failed attempt
    the cache stays empty, so the next call tries again.

    Returns:
        The vector store object, or ``None`` when it could not be
        initialized (the reason is logged as a warning).
    """
    global _vector_store
    if _vector_store is not None:
        return _vector_store

    try:
        # Imported inside the try so that an import failure is reported
        # through the warning below instead of being raised to the caller.
        from .database_enhanced import initialize_supabase_enhanced
        import asyncio

        # asyncio.run() cannot be nested inside a running loop. Check up
        # front so we fail with a clear message and without creating a
        # coroutine object that would never be awaited.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No loop is running in this thread: safe to proceed.
        else:
            raise RuntimeError(
                "cannot initialize synchronously from inside a running event loop"
            )

        # asyncio.run() creates a private loop, runs the coroutine, cancels
        # leftover tasks and closes the loop. Unlike a manual
        # new_event_loop()/set_event_loop()/close() sequence, it does not
        # leave a *closed* loop installed as this thread's current loop.
        # NOTE(review): asyncio.run() requires a coroutine object; this
        # assumes initialize_supabase_enhanced is an `async def` - confirm.
        components = asyncio.run(initialize_supabase_enhanced())
        _vector_store = components.get('vector_store')
    except Exception as e:
        logger.warning("Could not initialize enhanced vector store: %s", e)
        _vector_store = None
    return _vector_store
class SupabaseLogHandler(logging.Handler):
    """Logging handler that stores each record as a row in the ``logs`` table.

    Also provides :meth:`log_interaction` for recording a user/assistant
    exchange in the ``interactions`` table.
    """

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, client: "Client"):
        """Remember the client used for all inserts.

        Args:
            client: Object exposing ``table(name).insert(row).execute()``.
        """
        super().__init__()
        self.client = client
        # True while emit() is in the middle of an insert; see emit().
        self._emitting = False

    def emit(self, record):
        """Insert one log record into the ``logs`` table.

        Errors raised while building or inserting the row never propagate;
        they are routed to ``logging.Handler.handleError``.

        Args:
            record: The ``logging.LogRecord`` to persist.
        """
        # The insert may itself produce log records (for example from the
        # client's transport layer). If those reach this handler, each one
        # would trigger another insert, recursing without bound. Drop any
        # record that arrives while an insert is already in progress.
        # logging.Handler.handle() serializes emit() with the handler's
        # re-entrant lock, so when called through handle() only a nested
        # call on the same thread can observe the flag set.
        if getattr(self, '_emitting', False):
            return
        self._emitting = True
        try:
            log_entry = {
                'level': record.levelname,
                'message': record.getMessage(),
                'module': record.module,
                'function': record.funcName,
                'line': record.lineno,
                # Seconds since the epoch as a float (LogRecord.created).
                'timestamp': record.created,
            }
            self.client.table('logs').insert(log_entry).execute()
        except Exception:
            # Standard hook for emit() failures: never raises into the
            # application, and honours logging.raiseExceptions.
            self.handleError(record)
        finally:
            self._emitting = False

    def log_interaction(self, session_id: str, user_message: str, assistant_response: str):
        """Insert one user/assistant exchange into the ``interactions`` table.

        Failures are logged at error level and not raised.

        Args:
            session_id: Identifier stored in the ``session_id`` field.
            user_message: Text stored in the ``user_message`` field.
            assistant_response: Text stored in the ``assistant_response`` field.
        """
        try:
            interaction = {
                'session_id': session_id,
                'user_message': user_message,
                'assistant_response': assistant_response,
                # NOTE(review): the literal string 'now()' is sent as the
                # value; presumably the database resolves it to the current
                # time for this column - confirm against the table schema.
                'timestamp': 'now()',
            }
            self.client.table('interactions').insert(interaction).execute()
        except Exception as e:
            logger.error("Failed to log interaction: %s", e)
def create_tables():
    """Run the remote procedures that set up the database tables.

    Calls the ``create_logs_table`` and ``create_interactions_table`` RPCs,
    in that order, through the shared Supabase client. Any failure is logged
    at error level and swallowed; nothing is returned either way.
    """
    setup_rpcs = ('create_logs_table', 'create_interactions_table')
    try:
        supabase = get_supabase_client()
        # Logs table first, then interactions table.
        for rpc_name in setup_rpcs:
            supabase.rpc(rpc_name).execute()
        logger.info("Database tables created successfully")
    except Exception as e:
        logger.error(f"Failed to create tables: {e}")
def health_check() -> dict:
    """Probe the database and report whether it is reachable.

    Returns:
        ``{'status': 'healthy', 'connection': 'ok', 'tables': 'accessible'}``
        when a one-row read from the ``logs`` table succeeds, otherwise
        ``{'status': 'unhealthy', 'error': <message>}`` carrying the text of
        the exception that was raised.
    """
    try:
        # Only success or failure of the query matters, so the response
        # itself is discarded.
        get_supabase_client().table('logs').select('count').limit(1).execute()
    except Exception as exc:
        return {
            'status': 'unhealthy',
            'error': str(exc),
        }
    return {
        'status': 'healthy',
        'connection': 'ok',
        'tables': 'accessible',
    }