# "Spaces: Sleeping / Sleeping" — Hugging Face Spaces status banner captured
# with the page scrape; not part of the module source.
"""
Analytics Store for tenant-level analytics logging
Tracks:
- Tool usage (RAG, Web, Admin, LLM)
- LLM token counts and latency
- RAG recall/precision indicators
- Red-flag violations
- Per-tenant query volume
"""
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    from supabase import Client, create_client

    SUPABASE_AVAILABLE = True
except Exception as e:
    # Broad catch is deliberate: besides ImportError, `import supabase` can fail
    # with other exceptions (e.g. websockets.asyncio issues) in some deployments.
    # NOTE: the original `except (ImportError, Exception)` tuple was redundant —
    # ImportError is a subclass of Exception.
    Client = None  # type: ignore
    SUPABASE_AVAILABLE = False
    # Only log at debug level to avoid noise - this is expected in some deployments.
    # (Duplicate `import logging` removed; it is already imported above.)
    logging.getLogger(__name__).debug(f"Supabase import failed: {e}")

logger = logging.getLogger(__name__)
class AnalyticsStore:
    """
    Analytics logging with Supabase-only backend.

    Persists tool-usage events, red-flag violations, RAG search quality
    metrics and per-tenant agent queries, and exposes read-side helpers
    that aggregate them in Python.

    - Requires SUPABASE_URL and SUPABASE_SERVICE_KEY to be configured
    - All data is saved to Supabase (no SQLite fallback)
    - Raises errors if Supabase is not available
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        use_supabase: Optional[bool] = None,
        auto_create_tables: bool = False,
    ):
        """
        Initialize AnalyticsStore with Supabase-only backend.

        Args:
            db_path: Ignored (kept for backward compatibility, not used)
            use_supabase: Ignored (always uses Supabase if available)
            auto_create_tables: If True, attempt to create tables if missing

        Raises:
            RuntimeError: If Supabase credentials are missing or invalid
        """
        # Set to True only after a successful probe of all expected tables.
        self._tables_verified: bool = False
        self.supabase_client: Optional[Client] = None
        # Require Supabase - no fallback
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
        if not SUPABASE_AVAILABLE:
            raise RuntimeError(
                "Supabase package not installed. Install with: pip install supabase"
            )
        if not supabase_url or not supabase_key:
            raise RuntimeError(
                "Supabase credentials required. Set SUPABASE_URL and SUPABASE_SERVICE_KEY."
            )
        self.use_supabase = True  # Always True - no fallback
        self._init_supabase(auto_create_tables)

    def _init_supabase(self, auto_create_tables: bool) -> None:
        """Initialize Supabase client. Raises error if initialization fails.

        Args:
            auto_create_tables: When True, run the best-effort table check that
                prints setup hints; otherwise only probe tables and warn.

        Raises:
            RuntimeError: If credentials are missing or client creation fails.
        """
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
        if not supabase_url or not supabase_key:
            raise RuntimeError(
                "Supabase credentials are required!\n"
                "Set SUPABASE_URL and SUPABASE_SERVICE_KEY in your .env file."
            )
        try:
            self.supabase_client = create_client(supabase_url, supabase_key)
            # Logical event name -> physical Supabase table name.
            self.table_names: Dict[str, str] = {
                "tool_usage": "tool_usage_events",
                "redflags": "redflag_violations",
                "rag_search": "rag_search_events",
                "agent_query": "agent_query_events",
            }
            if auto_create_tables:
                self._ensure_supabase_tables()
            else:
                self._quick_table_check()
            if not self._tables_verified:
                logger.warning(
                    "⚠️ Supabase analytics tables not verified. "
                    "Data inserts may fail. Run supabase_analytics_tables.sql in Supabase SQL Editor."
                )
        except Exception as exc:
            logger.error(f"❌ Failed to initialize Supabase client for analytics: {exc}")
            raise RuntimeError(
                f"Failed to initialize Supabase client: {exc}. "
                "Verify SUPABASE_URL and SUPABASE_SERVICE_KEY are correct."
            ) from exc

    def _quick_table_check(self) -> None:
        """Verify that all expected Supabase tables exist.

        Sets ``self._tables_verified``; never raises — any failure simply
        leaves the flag False.
        """
        if not self.supabase_client:
            return
        try:
            for table in self.table_names.values():
                # PostgREST select throws if table missing
                self.supabase_client.table(table).select("id").limit(1).execute()
            self._tables_verified = True
        except Exception:
            self._tables_verified = False

    def _ensure_supabase_tables(self) -> None:
        """
        Best-effort table check. Actual table creation should be done by running
        supabase_analytics_tables.sql (mentioned in README + SUPABASE_SETUP).

        Prints (intentionally console-facing, setup-time hints) the location of
        the SQL script when tables are missing.
        """
        self._quick_table_check()
        if not self._tables_verified:
            # parents[3] assumes this file sits three directories below the
            # repo root — TODO confirm against actual project layout.
            sql_file = (
                Path(__file__).resolve().parents[3] / "supabase_analytics_tables.sql"
            )
            print("⚠️ Supabase analytics tables not verified.")
            if sql_file.exists():
                print(f"   Run the SQL script: {sql_file}")
            else:
                print("   Missing supabase_analytics_tables.sql in repo root.")

    # ------------------------------------------------------------------
    # Logging helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _now_ts() -> int:
        """Return the current Unix timestamp in whole seconds.

        BUG FIX: this was originally a zero-argument instance method, so every
        ``self._now_ts()`` call in the log_* methods raised
        ``TypeError: _now_ts() takes 0 positional arguments but 1 was given``.
        It is now a staticmethod, callable both on instances and on the class.
        """
        return int(time.time())

    def _serialize_tools(self, tools_used: Optional[List[str]]) -> Optional[str]:
        """JSON-encode a tool list; empty/None lists serialize to None.

        NOTE(review): currently unused — log_agent_query passes the raw list
        through to Supabase. Kept for backward compatibility.
        """
        if tools_used:
            return json.dumps(tools_used)
        return None

    def _serialize_metadata(self, metadata: Optional[Dict[str, Any]]) -> Optional[str]:
        """JSON-encode a metadata dict; empty/None dicts serialize to None.

        NOTE(review): currently unused — log_tool_usage passes the raw dict
        through to Supabase. Kept for backward compatibility.
        """
        if metadata:
            return json.dumps(metadata)
        return None

    def _supabase_insert(self, table: str, payload: Dict[str, Any]) -> None:
        """Insert data into Supabase table. Raises error if insert fails.

        Args:
            table: Physical Supabase table name.
            payload: Column -> value mapping for the new row.

        Raises:
            RuntimeError: If the client is not initialized or the insert fails.
        """
        if not self.supabase_client:
            raise RuntimeError(f"Supabase client not initialized. Cannot insert into {table}.")
        # Check if tables are verified (warn if not)
        if not self._tables_verified:
            logger.warning(
                f"Supabase tables not verified. Insert to {table} may fail. "
                f"Run supabase_analytics_tables.sql in Supabase SQL Editor."
            )
        try:
            response = self.supabase_client.table(table).insert(payload).execute()
            logger.debug(f"Successfully inserted into {table}: {len(response.data) if response.data else 1} row(s)")
        except Exception as exc:
            error_msg = str(exc)
            logger.error(
                f"❌ Supabase insert failed for table '{table}': {error_msg}\n"
                f"   Payload keys: {list(payload.keys())}\n"
                f"   Tenant ID: {payload.get('tenant_id', 'N/A')}\n"
                f"   This may indicate:\n"
                f"   1. Table '{table}' does not exist in Supabase\n"
                f"   2. Missing columns or schema mismatch\n"
                f"   3. RLS (Row Level Security) policy blocking insert\n"
                f"   4. Invalid Supabase credentials\n"
                f"   Solution: Run supabase_analytics_tables.sql in Supabase SQL Editor"
            )
            # Re-raise - no SQLite fallback
            raise RuntimeError(
                f"Failed to insert into Supabase table '{table}': {error_msg}"
            ) from exc

    def _supabase_simple_select(
        self,
        table: str,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
        limit: Optional[int] = None,
        order_desc: bool = False,
    ) -> List[Dict[str, Any]]:
        """Select rows for a tenant, ordered by timestamp, optionally limited.

        Args:
            table: Physical Supabase table name.
            tenant_id: Tenant whose rows are fetched.
            since_timestamp: Inclusive lower bound on the Unix `timestamp` column.
            limit: Maximum number of rows to return (None = server default).
            order_desc: If True, newest first; otherwise oldest first.

        Returns:
            List of row dicts ([] when the client is missing or no rows match).
        """
        if not self.supabase_client:
            return []
        query = (
            self.supabase_client.table(table)
            .select("*")
            .eq("tenant_id", tenant_id)
            .order("timestamp", desc=order_desc)
        )
        if since_timestamp is not None:
            query = query.gte("timestamp", since_timestamp)
        if limit is not None:
            query = query.limit(limit)
        response = query.execute()
        return response.data or []

    def _supabase_fetch_all(
        self,
        table: str,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch all rows for a tenant (used for aggregations).

        Pages through results in batches of 1000 via PostgREST `range` until a
        short batch signals the end of the result set.
        """
        if not self.supabase_client:
            return []
        rows: List[Dict[str, Any]] = []
        start = 0
        batch_size = 1000
        while True:
            query = (
                self.supabase_client.table(table)
                .select("*")
                .eq("tenant_id", tenant_id)
                .order("timestamp", desc=False)
                .range(start, start + batch_size - 1)
            )
            if since_timestamp is not None:
                query = query.gte("timestamp", since_timestamp)
            response = query.execute()
            batch = response.data or []
            rows.extend(batch)
            if len(batch) < batch_size:
                break
            start += batch_size
        return rows

    def log_tool_usage(
        self,
        tenant_id: str,
        tool_name: str,
        latency_ms: Optional[int] = None,
        tokens_used: Optional[int] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None
    ) -> None:
        """Log a tool usage event to Supabase.

        Raises:
            RuntimeError: If the client is not initialized or the insert fails.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot log tool usage.")
        payload = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "tool_name": tool_name,
            "timestamp": self._now_ts(),
            "latency_ms": latency_ms,
            "tokens_used": tokens_used,
            "success": success,
            "error_message": error_message,
            # Raw dict is passed through; Supabase column presumably JSON/JSONB.
            "metadata": metadata,
        }
        self._supabase_insert(self.table_names["tool_usage"], payload)

    def log_redflag_violation(
        self,
        tenant_id: str,
        rule_id: str,
        rule_pattern: str,
        severity: str,
        matched_text: str,
        confidence: Optional[float] = None,
        message_preview: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> None:
        """Log a red-flag violation to Supabase.

        ``message_preview`` is truncated to 200 characters before storage.

        Raises:
            RuntimeError: If the client is not initialized or the insert fails.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot log redflag violation.")
        truncated_message = message_preview[:200] if message_preview else None
        payload = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "rule_id": rule_id,
            "rule_pattern": rule_pattern,
            "severity": severity,
            "matched_text": matched_text,
            "confidence": confidence,
            "message_preview": truncated_message,
            "timestamp": self._now_ts(),
        }
        self._supabase_insert(self.table_names["redflags"], payload)

    def log_rag_search(
        self,
        tenant_id: str,
        query: str,
        hits_count: int,
        avg_score: Optional[float] = None,
        top_score: Optional[float] = None,
        latency_ms: Optional[int] = None
    ) -> None:
        """Log a RAG search event with quality metrics to Supabase.

        The query text is truncated to 500 characters before storage.

        Raises:
            RuntimeError: If the client is not initialized or the insert fails.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot log RAG search.")
        trimmed_query = query[:500]
        payload = {
            "tenant_id": tenant_id,
            "query": trimmed_query,
            "hits_count": hits_count,
            "avg_score": avg_score,
            "top_score": top_score,
            "timestamp": self._now_ts(),
            "latency_ms": latency_ms,
        }
        self._supabase_insert(self.table_names["rag_search"], payload)

    def log_agent_query(
        self,
        tenant_id: str,
        message_preview: str,
        intent: Optional[str] = None,
        tools_used: Optional[List[str]] = None,
        total_tokens: Optional[int] = None,
        total_latency_ms: Optional[int] = None,
        success: bool = True,
        user_id: Optional[str] = None
    ) -> None:
        """Log an agent query event (overall query tracking) to Supabase.

        ``message_preview`` is truncated to 200 characters before storage.

        Raises:
            RuntimeError: If the client is not initialized or the insert fails.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot log agent query.")
        truncated_message = message_preview[:200]
        payload = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "message_preview": truncated_message,
            "intent": intent,
            # Raw list is passed through; Supabase column presumably JSON/array.
            "tools_used": tools_used,
            "total_tokens": total_tokens,
            "total_latency_ms": total_latency_ms,
            "success": success,
            "timestamp": self._now_ts(),
        }
        self._supabase_insert(self.table_names["agent_query"], payload)

    def get_tool_usage_stats(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get tool usage statistics for a tenant from Supabase.

        Returns:
            Mapping tool_name -> {count, avg_latency_ms, total_tokens,
            success_count, error_count}.

        Raises:
            RuntimeError: If the client is not initialized.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot get tool usage stats.")
        rows = self._supabase_fetch_all(
            self.table_names["tool_usage"], tenant_id, since_timestamp
        )
        # Supabase aggregation (computed in Python)
        stats: Dict[str, Dict[str, Any]] = {}
        for row in rows:
            tool_name = row.get("tool_name")
            if not tool_name:
                continue
            entry = stats.setdefault(
                tool_name,
                {
                    "count": 0,
                    "avg_latency_ms": 0.0,
                    "total_tokens": 0,
                    "success_count": 0,
                    "error_count": 0,
                },
            )
            entry["count"] += 1
            # avg_latency_ms first accumulates the SUM; converted to the mean below.
            latency = row.get("latency_ms") or 0
            entry["avg_latency_ms"] += latency
            entry["total_tokens"] += row.get("tokens_used") or 0
            if row.get("success"):
                entry["success_count"] += 1
            else:
                entry["error_count"] += 1
        for tool_name, entry in stats.items():
            if entry["count"]:
                entry["avg_latency_ms"] = round(entry["avg_latency_ms"] / entry["count"], 2)
        return stats

    def get_redflag_violations(
        self,
        tenant_id: str,
        limit: int = 50,
        since_timestamp: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Get recent red-flag violations for a tenant from Supabase.

        Returns newest-first rows, at most ``limit``.

        Raises:
            RuntimeError: If the client is not initialized.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot get redflag violations.")
        return self._supabase_simple_select(
            self.table_names["redflags"],
            tenant_id,
            since_timestamp=since_timestamp,
            limit=limit,
            order_desc=True,
        )

    def get_activity_summary(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get activity summary for a tenant from Supabase.

        Returns:
            Dict with total_queries, active_users (distinct non-null user_ids),
            redflag_count, and last_query (naive local-time ISO string or None).

        Raises:
            RuntimeError: If the client is not initialized.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot get activity summary.")
        queries = self._supabase_fetch_all(
            self.table_names["agent_query"], tenant_id, since_timestamp
        )
        redflags = self._supabase_fetch_all(
            self.table_names["redflags"], tenant_id, since_timestamp
        )
        total_queries = len(queries)
        active_users = len({row["user_id"] for row in queries if row.get("user_id")})
        last_query_ts = max((row.get("timestamp") or 0) for row in queries) if queries else None
        redflag_count = len(redflags)
        return {
            "total_queries": total_queries,
            "active_users": active_users,
            "redflag_count": redflag_count,
            # NOTE(review): fromtimestamp() yields naive local time — confirm
            # callers expect local time rather than UTC.
            "last_query": datetime.fromtimestamp(last_query_ts).isoformat()
            if last_query_ts
            else None,
        }

    def get_activity_records(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Get individual activity records (agent queries) with timestamps for heatmap visualization.

        Returns:
            Chronologically ordered dicts with timestamp/created_at (ISO),
            type ("query"), user_id, and a 100-char message_preview.

        Raises:
            RuntimeError: If the client is not initialized.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot get activity records.")
        # Fetch agent query events (these represent user queries/activity)
        queries = self._supabase_simple_select(
            self.table_names["agent_query"],
            tenant_id,
            since_timestamp=since_timestamp,
            limit=limit,
            order_desc=False,  # Chronological order for heatmap
        )
        # Format records for frontend consumption
        activities = []
        for query in queries:
            timestamp = query.get("timestamp")
            if timestamp:
                # Convert timestamp to ISO format if it's a Unix timestamp
                if isinstance(timestamp, (int, float)):
                    timestamp_iso = datetime.fromtimestamp(timestamp).isoformat()
                else:
                    timestamp_iso = str(timestamp)
                activities.append({
                    "timestamp": timestamp_iso,
                    "created_at": timestamp_iso,  # Alias for compatibility
                    "type": "query",
                    "user_id": query.get("user_id"),
                    "message_preview": query.get("message_preview", "")[:100],
                })
        return activities

    def get_rag_quality_metrics(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get RAG quality metrics (recall/precision indicators) from Supabase.

        Returns:
            Dict with total_searches, avg_hits_per_search, avg_score,
            avg_top_score, avg_latency_ms (zeros when no rows exist).

        Raises:
            RuntimeError: If the client is not initialized.
        """
        if not self.supabase_client:
            raise RuntimeError("Supabase client not initialized. Cannot get RAG quality metrics.")
        rows = self._supabase_fetch_all(
            self.table_names["rag_search"], tenant_id, since_timestamp
        )
        if not rows:
            return {
                "total_searches": 0,
                "avg_hits_per_search": 0,
                "avg_score": 0,
                "avg_top_score": 0,
                "avg_latency_ms": 0,
            }
        total_searches = len(rows)
        # Missing/None values count as 0 toward the averages.
        avg_hits = sum(row.get("hits_count") or 0 for row in rows) / total_searches
        avg_avg_score = sum(row.get("avg_score") or 0 for row in rows) / total_searches
        avg_top_score = sum(row.get("top_score") or 0 for row in rows) / total_searches
        avg_latency = sum(row.get("latency_ms") or 0 for row in rows) / total_searches
        return {
            "total_searches": total_searches,
            "avg_hits_per_search": round(avg_hits, 2),
            "avg_score": round(avg_avg_score, 3),
            "avg_top_score": round(avg_top_score, 3),
            "avg_latency_ms": round(avg_latency, 2),
        }