""" Supabase client for the Memory System Replaces PostgreSQL with hosted Supabase database """ import os from supabase import create_client, Client from typing import List, Dict, Any, Optional import logging from datetime import datetime import json from dotenv import load_dotenv # Load environment variables load_dotenv() logger = logging.getLogger(__name__) class SupabaseManager: def __init__(self): self.supabase_url = os.getenv('SUPABASE_URL') self.supabase_key = os.getenv('SUPABASE_ANON_KEY') if not self.supabase_url or not self.supabase_key: logger.error("Supabase credentials not found in environment variables") logger.error("Please set SUPABASE_URL and SUPABASE_ANON_KEY") self.client = None return try: self.client: Client = create_client(self.supabase_url, self.supabase_key) logger.info("Supabase client initialized successfully") except Exception as e: logger.error(f"Failed to initialize Supabase client: {e}") self.client = None def is_connected(self) -> bool: """Check if Supabase client is properly initialized""" return self.client is not None async def create_user(self, phone_number: str, name: str = None) -> Optional[Dict]: """Create or get existing user - bypass RLS by disabling user table dependency""" if not self.is_connected(): logger.error("Supabase client not connected") return None try: # Skip user table due to RLS issues - just return mock user # Store phone number directly in memory_nodes.user_id field instead return { 'id': phone_number, # Use phone as ID directly 'phone_number': phone_number, 'name': name or phone_number } except Exception as e: logger.error(f"Error creating/getting user: {e}") return None async def store_memory_node(self, user_phone: str, layer: int, fact_type: str, content: str, concluded_fact: str, confidence: float, evidence: List[Dict], extraction_method: str = 'initial', parent_update_id: str = None) -> Optional[str]: """Store a memory node with evidence and return its ID""" if not self.is_connected(): logger.error("Supabase client not connected") return None try: # Get user info (bypasses RLS) user = await self.create_user(user_phone) if not user: logger.error(f"Failed to create/get user: {user_phone}") return None # Store memory node with existing schema (use user_id field) node_data = { 'user_id': user['id'], # This will be the phone number 'layer': layer, 'fact_type': fact_type, 'content': content, 'concluded_fact': concluded_fact, 'confidence': confidence, 'status': 'pending', 'evidence': evidence, # Store as JSONB 'extraction_method': extraction_method, 'parent_update_id': parent_update_id } result = self.client.table('memory_nodes').insert(node_data).execute() if result.data: logger.info(f"Stored memory node for user {user_phone}: {concluded_fact}") return result.data[0]['id'] else: logger.error(f"Failed to store memory node for user {user_phone}") return None except Exception as e: logger.error(f"Error storing memory node: {e}") return None async def get_all_users(self) -> List[str]: """Get all unique user phone numbers""" if not self.is_connected(): return [] try: result = self.client.table('memory_nodes').select('user_id').execute() # Get unique phone numbers from user_id field phones = list(set([node['user_id'] for node in result.data])) return phones except Exception as e: logger.error(f"Error getting users: {e}") return [] async def get_user_summary(self, user_phone: str) -> Dict: """Get memory statistics for a specific user""" if not self.is_connected(): logger.warning("Supabase not connected, returning empty summary") return {} try: logger.info(f"Getting user summary for: {user_phone}") # Get all memory nodes for this user (using user_id with phone value) result = self.client.table('memory_nodes').select('status, layer').eq('user_id', user_phone).execute() logger.info(f"Found {len(result.data)} memory nodes for {user_phone}") total_nodes = len(result.data) approved_nodes = len([n for n in result.data if n['status'] == 'approved']) pending_nodes = len([n for n in result.data if n['status'] == 'pending']) rejected_nodes = len([n for n in result.data if n['status'] == 'rejected']) # Layer distribution layers = {} for node in result.data: layer_key = f"Layer{node['layer']}" if layer_key not in layers: layers[layer_key] = 0 layers[layer_key] += 1 summary = { 'total_nodes': total_nodes, 'approved_nodes': approved_nodes, 'pending_nodes': pending_nodes, 'rejected_nodes': rejected_nodes, 'layers': layers } logger.info(f"Summary for {user_phone}: {summary}") return summary except Exception as e: logger.error(f"Error getting user summary for {user_phone}: {e}") import traceback traceback.print_exc() return {} except Exception as e: logger.error(f"Error getting user summary: {e}") return {} async def get_user_memory_graph(self, user_phone: str, layer: Optional[int] = None) -> List[Dict]: """Get memory facts for a user, optionally filtered by layer""" if not self.is_connected(): return [] try: # Build query using user_id (which stores the phone number) query = self.client.table('memory_nodes').select('*').eq('user_id', user_phone) if layer: query = query.eq('layer', layer) result = query.order('created_at', desc=True).execute() # Format for frontend memory_facts = [] for node in result.data: memory_facts.append({ 'id': node['id'], 'layer': f"Layer{node['layer']}", 'fact_type': node['fact_type'], 'conclusion': node['concluded_fact'], 'confidence': node['confidence'], 'status': node['status'], 'evidence': node.get('evidence', []), 'created_at': node['created_at'], 'reviewed_at': node.get('reviewed_at'), 'reviewed_by': node.get('reviewed_by') }) return memory_facts except Exception as e: logger.error(f"Error getting user memory: {e}") return [] async def get_pending_updates(self, limit: int = 50, layer: Optional[str] = None) -> List[Dict]: """Get pending memory updates for ops review, optionally filtered by layer""" if not self.is_connected(): return [] try: # Build query with optional layer filter query = self.client.table('memory_nodes').select('*').eq('status', 'pending') # Add layer filter if specified (layer comes as "Layer1", "Layer2", etc.) if layer: layer_num = int(layer.replace('Layer', '')) query = query.eq('layer', layer_num) result = query.order('created_at', desc=True).limit(limit).execute() # Format for ops review interface pending_updates = [] for node in result.data: pending_updates.append({ 'id': node['id'], 'user_id': node['user_id'], # Use user_id field 'layer': f"Layer{node['layer']}", 'fact_type': node['fact_type'], 'conclusion': node['concluded_fact'], 'confidence': node['confidence'], 'evidence': node.get('evidence', []), 'created_at': node['created_at'], 'status': 'pending' }) return pending_updates except Exception as e: logger.error(f"Error getting pending updates: {e}") return [] async def approve_update(self, update_id: str, reviewed_by: str) -> bool: """Approve a pending memory update""" if not self.is_connected(): return False try: result = self.client.table('memory_nodes').update({ 'status': 'approved', 'reviewed_by': reviewed_by, 'reviewed_at': datetime.utcnow().isoformat() }).eq('id', update_id).execute() success = len(result.data) > 0 if success: logger.info(f"Approved update {update_id} by {reviewed_by}") return success except Exception as e: logger.error(f"Error approving update: {e}") return False async def reject_update(self, update_id: str, reviewed_by: str) -> bool: """Reject a pending memory update and mark for reprocessing""" if not self.is_connected(): return False try: result = self.client.table('memory_nodes').update({ 'status': 'rejected', 'needs_reprocess': True, # Flag for reprocessing 'reviewed_by': reviewed_by, 'reviewed_at': datetime.utcnow().isoformat() }).eq('id', update_id).execute() success = len(result.data) > 0 if success: logger.info(f"Rejected update {update_id} by {reviewed_by} - marked for reprocessing") return success except Exception as e: logger.error(f"Error rejecting update: {e}") return False async def get_system_stats(self) -> Dict: """Get overall system statistics""" if not self.is_connected(): return {} try: # Get total users users_result = self.client.table('users').select('id').execute() total_users = len(users_result.data) # Get memory node statistics nodes_result = self.client.table('memory_nodes').select('status, layer').execute() total_facts = len(nodes_result.data) approved_facts = len([n for n in nodes_result.data if n['status'] == 'approved']) pending_facts = len([n for n in nodes_result.data if n['status'] == 'pending']) rejected_facts = len([n for n in nodes_result.data if n['status'] == 'rejected']) # Calculate acceptance rate total_reviewed = approved_facts + rejected_facts acceptance_rate = (approved_facts / total_reviewed * 100) if total_reviewed > 0 else 0 # Layer distribution layer_distribution = {} for node in nodes_result.data: layer_key = f"Layer{node['layer']}" if layer_key not in layer_distribution: layer_distribution[layer_key] = 0 layer_distribution[layer_key] += 1 return { 'total_users': total_users, 'total_facts': total_facts, 'approved_facts': approved_facts, 'pending_facts': pending_facts, 'rejected_facts': rejected_facts, 'acceptance_rate': round(acceptance_rate, 1), 'layer_distribution': layer_distribution } except Exception as e: logger.error(f"Error getting system stats: {e}") return {} async def get_rejected_items_for_reprocessing(self, limit: int = 50) -> List[Dict]: """Get rejected memory nodes that need reprocessing""" if not self.is_connected(): return [] try: result = self.client.table('memory_nodes').select(''' *, users!inner(phone_number) ''').eq('status', 'rejected').eq('needs_reprocess', True).order('created_at', desc=True).limit(limit).execute() # Format for reprocessing rejected_items = [] for node in result.data: rejected_items.append({ 'id': node['id'], 'user_id': node['users']['phone_number'], 'layer': node['layer'], 'fact_type': node['fact_type'], 'content': node['content'], 'concluded_fact': node['concluded_fact'], 'confidence': node['confidence'], 'evidence': node.get('evidence', []), 'created_at': node['created_at'], 'rejected_at': node.get('reviewed_at'), 'parent_update_id': node.get('parent_update_id') }) return rejected_items except Exception as e: logger.error(f"Error getting rejected items: {e}") return [] async def mark_reprocessing_complete(self, update_id: str) -> bool: """Mark a rejected item as reprocessed (no longer needs reprocessing)""" if not self.is_connected(): return False try: result = self.client.table('memory_nodes').update({ 'needs_reprocess': False }).eq('id', update_id).execute() success = len(result.data) > 0 if success: logger.info(f"Marked {update_id} as reprocessed") return success except Exception as e: logger.error(f"Error marking reprocessing complete: {e}") return False async def get_user_contexts_excluding_evidence(self, user_phone: str, excluded_evidence: List[str]) -> List[Dict]: """Get all contexts for a user excluding specific message IDs (for reprocessing)""" if not self.is_connected(): return [] try: # Get user user_result = self.client.table('users').select('id').eq('phone_number', user_phone).execute() if not user_result.data: return [] # Get all memory nodes for this user to extract their evidence result = self.client.table('memory_nodes').select('evidence').eq('user_id', user_result.data[0]['id']).execute() all_contexts = [] for node in result.data: evidence = node.get('evidence', []) for evidence_item in evidence: # Only include contexts that don't contain excluded message IDs message_id = evidence_item.get('message_id') if message_id and message_id not in excluded_evidence: all_contexts.append(evidence_item) return all_contexts except Exception as e: logger.error(f"Error getting user contexts: {e}") return [] def log_extraction_fallback(self, user_phone: str, layer: int, fact_type: str, concluded_fact: str, confidence: float, evidence: List[Dict]): """Fallback logging when Supabase is unavailable""" log_entry = { 'timestamp': datetime.utcnow().isoformat(), 'user_phone': user_phone, 'layer': layer, 'fact_type': fact_type, 'concluded_fact': concluded_fact, 'confidence': confidence, 'evidence': evidence, 'status': 'pending', 'source': 'supabase_fallback' } try: with open('memory_extractions.log', 'a', encoding='utf-8') as f: f.write(json.dumps(log_entry, ensure_ascii=False) + '\n') except Exception as e: logger.error(f"Failed to write fallback log: {e}") async def is_file_processed(self, user_id: str) -> bool: """Check if a JSON file has already been processed""" if not self.is_connected(): return False try: result = self.client.table('processed_files').select('*').eq('user_id', user_id).execute() return len(result.data) > 0 except Exception as e: logger.error(f"Error checking if file processed: {e}") return False async def mark_file_processed(self, user_id: str, total_nodes_extracted: int) -> bool: """Mark a JSON file as processed""" if not self.is_connected(): return False try: data = { 'user_id': user_id, 'processed_at': datetime.now().isoformat(), 'total_nodes_extracted': total_nodes_extracted, 'status': 'completed' } # Use upsert to handle re-processing scenarios result = self.client.table('processed_files').upsert(data).execute() if result.data: logger.info(f"Marked {user_id} as processed with {total_nodes_extracted} nodes") return True return False except Exception as e: logger.error(f"Error marking file as processed: {e}") return False async def mark_file_unprocessed(self, user_id: str) -> bool: """Mark a JSON file as unprocessed (for reprocessing)""" if not self.is_connected(): return False try: result = self.client.table('processed_files').delete().eq('user_id', user_id).execute() logger.info(f"Marked {user_id} as unprocessed for reprocessing") return True except Exception as e: logger.error(f"Error marking file as unprocessed: {e}") return False # Global instance supabase_manager = SupabaseManager()