"""
Analytics Store for tenant-level analytics logging.

Tracks:
- Tool usage (RAG, Web, Admin, LLM)
- LLM token counts and latency
- RAG recall/precision indicators
- Red-flag violations
- Per-tenant query volume
"""

import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

try:
    from supabase import Client, create_client

    SUPABASE_AVAILABLE = True
except Exception as e:
    # ImportError is the usual case, but the supabase package can also fail at
    # import time for other reasons (e.g. websockets.asyncio issues), so any
    # Exception is caught. Logged at debug level only: running without
    # supabase is expected in some deployments, and AnalyticsStore.__init__
    # raises a clear error if it is actually needed.
    Client = None  # type: ignore
    create_client = None  # type: ignore
    SUPABASE_AVAILABLE = False
    logger.debug(f"Supabase import failed: {e}")

# Name of the SQL setup script referenced in warnings and error messages.
_SQL_SCRIPT_NAME = "supabase_analytics_tables.sql"


class AnalyticsStore:
    """
    Analytics logging with a Supabase-only backend.

    - Requires SUPABASE_URL and SUPABASE_SERVICE_KEY to be configured
    - All data is saved to Supabase (no SQLite fallback)
    - Raises errors if Supabase is not available
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        use_supabase: Optional[bool] = None,
        auto_create_tables: bool = False,
    ):
        """
        Initialize AnalyticsStore with a Supabase-only backend.

        Args:
            db_path: Ignored (kept for backward compatibility, not used).
            use_supabase: Ignored (Supabase is always used).
            auto_create_tables: If True, run the best-effort table check that
                also logs where the SQL setup script is. Tables are NOT created
                automatically; run the SQL script in Supabase.

        Raises:
            RuntimeError: If the supabase package is unavailable, credentials
                are missing, or the client cannot be created.
        """
        self._tables_verified = False
        self.supabase_client: Optional[Client] = None

        # Require Supabase - no fallback
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_SERVICE_KEY")

        if not SUPABASE_AVAILABLE:
            raise RuntimeError(
                "Supabase package not installed. Install with: pip install supabase"
            )
        if not supabase_url or not supabase_key:
            raise RuntimeError(
                "Supabase credentials required. Set SUPABASE_URL and SUPABASE_SERVICE_KEY."
            )

        self.use_supabase = True  # Always True - no fallback
        self._init_supabase(auto_create_tables)

    def _init_supabase(self, auto_create_tables: bool):
        """
        Create the Supabase client and verify the analytics tables.

        Table verification failures only produce warnings; inserts are left to
        fail loudly later.

        Raises:
            RuntimeError: If credentials are missing or client creation fails.
        """
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
        if not supabase_url or not supabase_key:
            raise RuntimeError(
                "Supabase credentials are required!\n"
                "Set SUPABASE_URL and SUPABASE_SERVICE_KEY in your .env file."
            )

        # Logical event kind -> Supabase table name.
        self.table_names = {
            "tool_usage": "tool_usage_events",
            "redflags": "redflag_violations",
            "rag_search": "rag_search_events",
            "agent_query": "agent_query_events",
        }

        # Only client creation is wrapped: the table checks below never raise,
        # and wrapping them previously turned unrelated errors into a
        # misleading "Failed to initialize Supabase client" message.
        try:
            self.supabase_client = create_client(supabase_url, supabase_key)
        except Exception as exc:
            logger.error(f"❌ Failed to initialize Supabase client for analytics: {exc}")
            raise RuntimeError(
                f"Failed to initialize Supabase client: {exc}. "
                "Verify SUPABASE_URL and SUPABASE_SERVICE_KEY are correct."
            ) from exc

        if auto_create_tables:
            self._ensure_supabase_tables()
        else:
            self._quick_table_check()

        if not self._tables_verified:
            logger.warning(
                "⚠️ Supabase analytics tables not verified. "
                "Data inserts may fail. Run supabase_analytics_tables.sql in Supabase SQL Editor."
            )

    def _quick_table_check(self):
        """Set ``_tables_verified`` to whether every expected table answers a select."""
        if not self.supabase_client:
            return
        try:
            for table in self.table_names.values():
                # PostgREST select throws if table missing
                self.supabase_client.table(table).select("id").limit(1).execute()
            self._tables_verified = True
        except Exception:
            # Deliberately best-effort: an unverified state is reported by callers.
            self._tables_verified = False

    def _ensure_supabase_tables(self):
        """
        Best-effort table check that points at the SQL setup script.

        Actual table creation should be done by running
        supabase_analytics_tables.sql (mentioned in README + SUPABASE_SETUP).
        """
        self._quick_table_check()
        if self._tables_verified:
            return

        # parents[3] is treated as the repo root (per the message below).
        # Guard against shallow install paths, where indexing would raise IndexError.
        parents = Path(__file__).resolve().parents
        sql_file = parents[3] / _SQL_SCRIPT_NAME if len(parents) > 3 else None
        if sql_file is not None and sql_file.exists():
            logger.warning(
                "⚠️ Supabase analytics tables not verified. Run the SQL script: %s",
                sql_file,
            )
        else:
            logger.warning(
                "⚠️ Supabase analytics tables not verified. Missing %s in repo root.",
                _SQL_SCRIPT_NAME,
            )

    # ------------------------------------------------------------------
    # Logging helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _now_ts() -> int:
        """Return the current Unix time in whole seconds."""
        return int(time.time())

    def _serialize_tools(self, tools_used: Optional[List[str]]) -> Optional[str]:
        """JSON-encode a non-empty tool list; return None for empty/None."""
        if tools_used:
            return json.dumps(tools_used)
        return None

    def _serialize_metadata(self, metadata: Optional[Dict[str, Any]]):
        """JSON-encode a non-empty metadata dict; return None for empty/None."""
        if metadata:
            return json.dumps(metadata)
        return None

    def _require_client(self, action: str) -> None:
        """
        Raise if the Supabase client is not set.

        Args:
            action: Phrase completing "Cannot <action>." in the error message.

        Raises:
            RuntimeError: If ``self.supabase_client`` is falsy.
        """
        if not self.supabase_client:
            raise RuntimeError(f"Supabase client not initialized. Cannot {action}.")

    def _supabase_insert(self, table: str, payload: Dict[str, Any]):
        """
        Insert one row into a Supabase table.

        Raises:
            RuntimeError: If the client is missing or the insert fails.
        """
        self._require_client(f"insert into {table}")

        # Check if tables are verified (warn if not)
        if not self._tables_verified:
            logger.warning(
                "Supabase tables not verified. Insert to %s may fail. "
                "Run supabase_analytics_tables.sql in Supabase SQL Editor.",
                table,
            )

        try:
            response = self.supabase_client.table(table).insert(payload).execute()
        except Exception as exc:
            error_msg = str(exc)
            logger.error(
                f"❌ Supabase insert failed for table '{table}': {error_msg}\n"
                f"   Payload keys: {list(payload.keys())}\n"
                f"   Tenant ID: {payload.get('tenant_id', 'N/A')}\n"
                f"   This may indicate:\n"
                f"   1. Table '{table}' does not exist in Supabase\n"
                f"   2. Missing columns or schema mismatch\n"
                f"   3. RLS (Row Level Security) policy blocking insert\n"
                f"   4. Invalid Supabase credentials\n"
                f"   Solution: Run supabase_analytics_tables.sql in Supabase SQL Editor"
            )
            # Re-raise - no SQLite fallback
            raise RuntimeError(
                f"Failed to insert into Supabase table '{table}': {error_msg}"
            ) from exc

        # A single-dict payload writes one row; fall back to 1 when the response
        # carries no row data.
        inserted = len(response.data) if response.data else 1
        logger.debug("Successfully inserted into %s: %s row(s)", table, inserted)

    def _supabase_simple_select(
        self,
        table: str,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
        limit: Optional[int] = None,
        order_desc: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Select a tenant's rows ordered by ``timestamp``, in a single request.

        Args:
            table: Supabase table name.
            tenant_id: Value matched against the ``tenant_id`` column.
            since_timestamp: If given, keep rows with ``timestamp >= since_timestamp``.
            limit: If given, cap the number of rows returned.
            order_desc: Newest-first when True, oldest-first otherwise.

        Returns:
            The row dicts, or ``[]`` when the client is not initialized.
        """
        if not self.supabase_client:
            return []
        query = (
            self.supabase_client.table(table)
            .select("*")
            .eq("tenant_id", tenant_id)
            .order("timestamp", desc=order_desc)
        )
        if since_timestamp is not None:
            query = query.gte("timestamp", since_timestamp)
        if limit is not None:
            query = query.limit(limit)
        response = query.execute()
        return response.data or []

    def _supabase_fetch_all(
        self,
        table: str,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
        batch_size: int = 1000,
    ) -> List[Dict[str, Any]]:
        """
        Fetch every row for a tenant, paging with ``range`` (used for aggregations).

        Args:
            table: Supabase table name.
            tenant_id: Value matched against the ``tenant_id`` column.
            since_timestamp: If given, keep rows with ``timestamp >= since_timestamp``.
            batch_size: Rows requested per page. A short page ends the loop.

        Returns:
            All matching rows in ascending ``timestamp`` order, or ``[]`` when
            the client is not initialized.
        """
        # NOTE(review): if the server caps responses below batch_size
        # (PostgREST max-rows), a capped page looks "short" and ends paging
        # early. Rows with equal timestamps may also shift between pages.
        # Confirm both against the deployment.
        if not self.supabase_client:
            return []

        rows: List[Dict[str, Any]] = []
        start = 0
        while True:
            query = (
                self.supabase_client.table(table)
                .select("*")
                .eq("tenant_id", tenant_id)
            )
            if since_timestamp is not None:
                query = query.gte("timestamp", since_timestamp)
            response = (
                query.order("timestamp", desc=False)
                .range(start, start + batch_size - 1)
                .execute()
            )
            batch = response.data or []
            rows.extend(batch)
            if len(batch) < batch_size:
                return rows
            start += batch_size

    def log_tool_usage(
        self,
        tenant_id: str,
        tool_name: str,
        latency_ms: Optional[int] = None,
        tokens_used: Optional[int] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None
    ):
        """
        Log a tool usage event to Supabase.

        Raises:
            RuntimeError: If the client is missing or the insert fails.
        """
        self._require_client("log tool usage")

        payload = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "tool_name": tool_name,
            "timestamp": self._now_ts(),
            "latency_ms": latency_ms,
            "tokens_used": tokens_used,
            "success": success,
            "error_message": error_message,
            "metadata": metadata,
        }
        self._supabase_insert(self.table_names["tool_usage"], payload)

    def log_redflag_violation(
        self,
        tenant_id: str,
        rule_id: str,
        rule_pattern: str,
        severity: str,
        matched_text: str,
        confidence: Optional[float] = None,
        message_preview: Optional[str] = None,
        user_id: Optional[str] = None
    ):
        """
        Log a red-flag violation to Supabase.

        ``message_preview`` is truncated to its first 200 characters.

        Raises:
            RuntimeError: If the client is missing or the insert fails.
        """
        self._require_client("log redflag violation")

        payload = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "rule_id": rule_id,
            "rule_pattern": rule_pattern,
            "severity": severity,
            "matched_text": matched_text,
            "confidence": confidence,
            "message_preview": message_preview[:200] if message_preview else None,
            "timestamp": self._now_ts(),
        }
        self._supabase_insert(self.table_names["redflags"], payload)

    def log_rag_search(
        self,
        tenant_id: str,
        query: str,
        hits_count: int,
        avg_score: Optional[float] = None,
        top_score: Optional[float] = None,
        latency_ms: Optional[int] = None
    ):
        """
        Log a RAG search event with quality metrics to Supabase.

        ``query`` is truncated to its first 500 characters.

        Raises:
            RuntimeError: If the client is missing or the insert fails.
        """
        self._require_client("log RAG search")

        payload = {
            "tenant_id": tenant_id,
            "query": query[:500],
            "hits_count": hits_count,
            "avg_score": avg_score,
            "top_score": top_score,
            "timestamp": self._now_ts(),
            "latency_ms": latency_ms,
        }
        self._supabase_insert(self.table_names["rag_search"], payload)

    def log_agent_query(
        self,
        tenant_id: str,
        message_preview: str,
        intent: Optional[str] = None,
        tools_used: Optional[List[str]] = None,
        total_tokens: Optional[int] = None,
        total_latency_ms: Optional[int] = None,
        success: bool = True,
        user_id: Optional[str] = None
    ):
        """
        Log an agent query event (overall query tracking) to Supabase.

        ``message_preview`` is truncated to its first 200 characters.

        Raises:
            RuntimeError: If the client is missing or the insert fails.
        """
        self._require_client("log agent query")

        payload = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "message_preview": message_preview[:200],
            "intent": intent,
            "tools_used": tools_used,
            "total_tokens": total_tokens,
            "total_latency_ms": total_latency_ms,
            "success": success,
            "timestamp": self._now_ts(),
        }
        self._supabase_insert(self.table_names["agent_query"], payload)

    def get_tool_usage_stats(
        self, tenant_id: str, since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Aggregate tool usage per tool name for a tenant.

        Returns:
            ``{tool_name: {"count", "avg_latency_ms", "total_tokens",
            "success_count", "error_count"}}``. ``avg_latency_ms`` is averaged
            only over rows that recorded a latency (0.0 if none did). Rows
            without a tool name are skipped.

        Raises:
            RuntimeError: If the client is not initialized.
        """
        self._require_client("get tool usage stats")

        rows = self._supabase_fetch_all(
            self.table_names["tool_usage"], tenant_id, since_timestamp
        )

        stats: Dict[str, Dict[str, Any]] = {}
        # tool_name -> [latency sum, number of rows that recorded a latency]
        latency_totals: Dict[str, List[int]] = {}
        for row in rows:
            tool_name = row.get("tool_name")
            if not tool_name:
                continue
            entry = stats.setdefault(
                tool_name,
                {
                    "count": 0,
                    "avg_latency_ms": 0.0,
                    "total_tokens": 0,
                    "success_count": 0,
                    "error_count": 0,
                },
            )
            entry["count"] += 1
            entry["total_tokens"] += row.get("tokens_used") or 0
            if row.get("success"):
                entry["success_count"] += 1
            else:
                entry["error_count"] += 1

            # A missing latency means "not measured", not 0 ms.
            latency = row.get("latency_ms")
            if latency is not None:
                totals = latency_totals.setdefault(tool_name, [0, 0])
                totals[0] += latency
                totals[1] += 1

        for tool_name, (latency_sum, samples) in latency_totals.items():
            stats[tool_name]["avg_latency_ms"] = round(latency_sum / samples, 2)

        return stats

    def get_redflag_violations(
        self, tenant_id: str, limit: int = 50, since_timestamp: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Return up to ``limit`` red-flag violations for a tenant, newest first.

        Raises:
            RuntimeError: If the client is not initialized.
        """
        self._require_client("get redflag violations")
        return self._supabase_simple_select(
            self.table_names["redflags"],
            tenant_id,
            since_timestamp=since_timestamp,
            limit=limit,
            order_desc=True,
        )

    def get_activity_summary(
        self, tenant_id: str, since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Summarize a tenant's query activity.

        Returns:
            ``total_queries``, ``active_users`` (distinct non-empty user ids),
            ``redflag_count``, and ``last_query``. ``last_query`` is the newest
            query time as a local-time ISO string, or None.

        Raises:
            RuntimeError: If the client is not initialized.
        """
        self._require_client("get activity summary")

        queries = self._supabase_fetch_all(
            self.table_names["agent_query"], tenant_id, since_timestamp
        )
        redflags = self._supabase_fetch_all(
            self.table_names["redflags"], tenant_id, since_timestamp
        )

        active_users = {row["user_id"] for row in queries if row.get("user_id")}
        last_query_ts = max(
            ((row.get("timestamp") or 0) for row in queries), default=0
        )

        return {
            "total_queries": len(queries),
            "active_users": len(active_users),
            "redflag_count": len(redflags),
            "last_query": datetime.fromtimestamp(last_query_ts).isoformat() if last_query_ts else None,
        }

    def get_activity_records(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Return agent-query activity records, oldest first (for heatmap visualization).

        Rows without a timestamp are skipped. Numeric timestamps are converted
        to local-time ISO strings; other values are passed through ``str()``.
        ``message_preview`` is cut to 100 characters, and a NULL preview
        becomes ``""``.

        Raises:
            RuntimeError: If the client is not initialized.
        """
        self._require_client("get activity records")

        # Fetch agent query events (these represent user queries/activity)
        queries = self._supabase_simple_select(
            self.table_names["agent_query"],
            tenant_id,
            since_timestamp=since_timestamp,
            limit=limit,
            order_desc=False,  # Chronological order for heatmap
        )

        activities = []
        for record in queries:
            timestamp = record.get("timestamp")
            if not timestamp:
                continue
            if isinstance(timestamp, (int, float)):
                timestamp_iso = datetime.fromtimestamp(timestamp).isoformat()
            else:
                timestamp_iso = str(timestamp)
            activities.append({
                "timestamp": timestamp_iso,
                "created_at": timestamp_iso,  # Alias for compatibility
                "type": "query",
                "user_id": record.get("user_id"),
                # The column may be NULL; `.get(key, "")` would return None here.
                "message_preview": (record.get("message_preview") or "")[:100],
            })

        return activities

    def get_rag_quality_metrics(
        self, tenant_id: str, since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Compute average RAG search quality metrics for a tenant.

        Missing hits/scores count as 0. ``avg_latency_ms`` is averaged only
        over searches that recorded a latency (0.0 if none did).

        Returns:
            ``total_searches``, ``avg_hits_per_search``, ``avg_score``,
            ``avg_top_score`` and ``avg_latency_ms``; all zeros when there are
            no searches.

        Raises:
            RuntimeError: If the client is not initialized.
        """
        self._require_client("get RAG quality metrics")

        rows = self._supabase_fetch_all(
            self.table_names["rag_search"], tenant_id, since_timestamp
        )

        if not rows:
            return {
                "total_searches": 0,
                "avg_hits_per_search": 0,
                "avg_score": 0,
                "avg_top_score": 0,
                "avg_latency_ms": 0,
            }

        total_searches = len(rows)
        avg_hits = sum(row.get("hits_count") or 0 for row in rows) / total_searches
        avg_avg_score = sum(row.get("avg_score") or 0 for row in rows) / total_searches
        avg_top_score = sum(row.get("top_score") or 0 for row in rows) / total_searches

        # A missing latency means "not measured", not 0 ms.
        latencies = [row["latency_ms"] for row in rows if row.get("latency_ms") is not None]
        avg_latency = sum(latencies) / len(latencies) if latencies else 0.0

        return {
            "total_searches": total_searches,
            "avg_hits_per_search": round(avg_hits, 2),
            "avg_score": round(avg_avg_score, 3),
            "avg_top_score": round(avg_top_score, 3),
            "avg_latency_ms": round(avg_latency, 2),
        }