"""
Analytics Store for tenant-level analytics logging

Tracks:
- Tool usage (RAG, Web, Admin, LLM)
- LLM token counts and latency
- RAG recall/precision indicators
- Red-flag violations
- Per-tenant query volume
"""

import json
import os
import sqlite3
import time
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple

try:
    from supabase import Client, create_client

    SUPABASE_AVAILABLE = True
except ImportError:
    Client = None  # type: ignore
    create_client = None  # type: ignore
    SUPABASE_AVAILABLE = False


class AnalyticsStore:
    """
    Analytics logging with dual-backend support.

    - Uses Supabase when SUPABASE_URL/SUPABASE_SERVICE_KEY are configured
    - Falls back to local SQLite (data/analytics.db) otherwise

    Every ``log_*`` method writes one event row; every ``get_*`` method
    returns plain dicts/lists aggregated for a single tenant.
    """

    # DDL executed (idempotently) whenever the SQLite backend is initialised.
    # Indexes are created separately because SQLite does not support inline
    # INDEX clauses inside CREATE TABLE.
    _SCHEMA_STATEMENTS: Tuple[str, ...] = (
        """
        CREATE TABLE IF NOT EXISTS tool_usage_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            tenant_id TEXT NOT NULL,
            user_id TEXT,
            tool_name TEXT NOT NULL,
            timestamp INTEGER NOT NULL,
            latency_ms INTEGER,
            tokens_used INTEGER,
            success BOOLEAN DEFAULT 1,
            error_message TEXT,
            metadata TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS redflag_violations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            tenant_id TEXT NOT NULL,
            user_id TEXT,
            rule_id TEXT NOT NULL,
            rule_pattern TEXT,
            severity TEXT NOT NULL,
            matched_text TEXT,
            confidence REAL,
            message_preview TEXT,
            timestamp INTEGER NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS rag_search_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            tenant_id TEXT NOT NULL,
            query TEXT NOT NULL,
            hits_count INTEGER,
            avg_score REAL,
            top_score REAL,
            timestamp INTEGER NOT NULL,
            latency_ms INTEGER
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS agent_query_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            tenant_id TEXT NOT NULL,
            user_id TEXT,
            message_preview TEXT,
            intent TEXT,
            tools_used TEXT,
            total_tokens INTEGER,
            total_latency_ms INTEGER,
            success BOOLEAN DEFAULT 1,
            timestamp INTEGER NOT NULL
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_tool_usage_tenant_timestamp
        ON tool_usage_events(tenant_id, timestamp)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_redflag_tenant_timestamp
        ON redflag_violations(tenant_id, timestamp)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_rag_search_tenant_timestamp
        ON rag_search_events(tenant_id, timestamp)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_agent_query_tenant_timestamp
        ON agent_query_events(tenant_id, timestamp)
        """,
    )

    def __init__(
        self,
        db_path: Optional[str] = None,
        use_supabase: Optional[bool] = None,
        auto_create_tables: bool = False,
    ):
        """
        Select and initialise the storage backend.

        Args:
            db_path: SQLite file to use. When ``None`` the default
                ``data/analytics.db`` location is used. Also used when the
                Supabase backend was requested but could not be initialised.
            use_supabase: Force (``True``) or forbid (``False``) Supabase.
                ``None`` auto-detects from the SUPABASE_URL and
                SUPABASE_SERVICE_KEY environment variables plus package
                availability.
            auto_create_tables: When using Supabase, run the table check
                that prints setup hints if the tables cannot be verified.
        """
        self.use_supabase = use_supabase
        self._tables_verified = False
        self.supabase_client: Optional[Client] = None
        # Remembered so a failed Supabase init falls back to the SQLite file
        # the caller asked for instead of silently using the default one.
        self._fallback_db_path = db_path

        if self.use_supabase is None:
            supabase_url = os.getenv("SUPABASE_URL")
            supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
            self.use_supabase = bool(
                supabase_url and supabase_key and SUPABASE_AVAILABLE
            )

        if self.use_supabase:
            self._init_supabase(auto_create_tables)
        else:
            self._init_sqlite(db_path)

    # ------------------------------------------------------------------
    # Backend initialisation
    # ------------------------------------------------------------------
    def _init_sqlite(self, db_path: Optional[str]):
        """Initialize SQLite database path + schema."""
        if db_path is None:
            root_dir = Path(__file__).resolve().parents[3]
            data_dir = root_dir / "data"
            data_dir.mkdir(parents=True, exist_ok=True)
            self.db_path = data_dir / "analytics.db"
        else:
            self.db_path = Path(db_path)
        self._init_db()

    def _fall_back_to_sqlite(self):
        """Disable Supabase and initialise SQLite with the caller's db_path."""
        self.use_supabase = False
        self.supabase_client = None
        self._init_sqlite(getattr(self, "_fallback_db_path", None))

    def _init_supabase(self, auto_create_tables: bool):
        """
        Create the Supabase client and check the analytics tables.

        Any failure (missing credentials, missing package, client error)
        prints a warning and falls back to the SQLite backend.
        """
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
        if not supabase_url or not supabase_key:
            print("⚠️ Supabase credentials missing. Falling back to SQLite for analytics.")
            self._fall_back_to_sqlite()
            return

        try:
            if not SUPABASE_AVAILABLE:
                # Explicit check instead of relying on a NameError from the
                # failed import being swallowed by the handler below.
                raise RuntimeError("supabase package is not installed")
            self.supabase_client = create_client(supabase_url, supabase_key)
            self.table_names = {
                "tool_usage": "tool_usage_events",
                "redflags": "redflag_violations",
                "rag_search": "rag_search_events",
                "agent_query": "agent_query_events",
            }
            if auto_create_tables:
                self._ensure_supabase_tables()
            else:
                self._quick_table_check()
        except Exception as exc:  # pragma: no cover - defensive logging
            print(f"⚠️ Failed to initialize Supabase client for analytics: {exc}")
            self._fall_back_to_sqlite()

    def _quick_table_check(self):
        """Verify that all expected Supabase tables exist."""
        if not self.supabase_client:
            return
        try:
            for table in self.table_names.values():
                # PostgREST select throws if table missing
                self.supabase_client.table(table).select("id").limit(1).execute()
            self._tables_verified = True
        except Exception:
            # Best-effort probe: any failure just means "not verified".
            self._tables_verified = False

    def _ensure_supabase_tables(self):
        """
        Best-effort table check.

        Actual table creation should be done by running
        supabase_analytics_tables.sql (mentioned in README + SUPABASE_SETUP).
        """
        self._quick_table_check()
        if self._tables_verified:
            return
        sql_file = (
            Path(__file__).resolve().parents[3] / "supabase_analytics_tables.sql"
        )
        print("⚠️ Supabase analytics tables not verified.")
        if sql_file.exists():
            print(f" Run the SQL script: {sql_file}")
        else:
            print(" Missing supabase_analytics_tables.sql in repo root.")

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """
        Yield a SQLite connection that is committed and then closed.

        ``with sqlite3.connect(...)`` on its own only commits or rolls back;
        it never closes the connection, so each call would leak a handle.
        Rows are returned as ``sqlite3.Row`` so they support access by name.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            with conn:  # commit on success, roll back on error
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Initialize database tables for analytics."""
        with self._connect() as conn:
            for statement in self._SCHEMA_STATEMENTS:
                conn.execute(statement)

    # ------------------------------------------------------------------
    # Logging helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _now_ts() -> int:
        """Return the current Unix time in whole seconds."""
        return int(time.time())

    @staticmethod
    def _since_filter(since_timestamp: Optional[int]) -> Tuple[str, List[Any]]:
        """Return the SQL suffix and bind values for an optional lower bound."""
        if since_timestamp is None:
            return "", []
        return " AND timestamp >= ?", [since_timestamp]

    @staticmethod
    def _mean(values: Iterable[Any]) -> float:
        """Average the non-``None`` values (like SQL ``AVG``); 0 if none."""
        present = [value for value in values if value is not None]
        return sum(present) / len(present) if present else 0

    def _serialize_tools(self, tools_used: Optional[List[str]]) -> Optional[str]:
        """JSON-encode a non-empty tool list; ``None`` for empty or missing."""
        if tools_used:
            return json.dumps(tools_used)
        return None

    def _serialize_metadata(self, metadata: Optional[Dict[str, Any]]):
        """JSON-encode non-empty metadata; ``None`` for empty or missing."""
        if metadata:
            return json.dumps(metadata)
        return None

    def _sqlite_insert(self, table: str, record: Dict[str, Any]) -> None:
        """
        Insert one row into ``table`` using the keys of ``record`` as columns.

        ``table`` and the column names are only ever supplied by this class
        (never by callers), so interpolating them into the statement is safe;
        all values are bound as parameters.
        """
        columns = ", ".join(record)
        placeholders = ", ".join("?" for _ in record)
        with self._connect() as conn:
            conn.execute(
                f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
                tuple(record.values()),
            )

    def _supabase_insert(self, table: str, payload: Dict[str, Any]):
        """Insert one row into a Supabase table; failures are only printed."""
        if not self.supabase_client:
            return
        try:
            self.supabase_client.table(table).insert(payload).execute()
        except Exception as exc:  # pragma: no cover - logging only
            print(f"❌ Supabase insert failed for {table}: {exc}")

    def _supabase_simple_select(
        self,
        table: str,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
        limit: Optional[int] = None,
        order_desc: bool = False,
    ) -> List[Dict[str, Any]]:
        """Select a tenant's rows ordered by timestamp, optionally bounded."""
        if not self.supabase_client:
            return []
        query = self.supabase_client.table(table).select("*").eq("tenant_id", tenant_id)
        if since_timestamp is not None:
            query = query.gte("timestamp", since_timestamp)
        query = query.order("timestamp", desc=order_desc)
        if limit is not None:
            query = query.limit(limit)
        response = query.execute()
        return response.data or []

    def _supabase_fetch_all(
        self,
        table: str,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch all rows for a tenant (used for aggregations)."""
        if not self.supabase_client:
            return []

        rows: List[Dict[str, Any]] = []
        start = 0
        batch_size = 1000
        # NOTE(review): pages are ordered by timestamp only, which is not
        # unique; rows sharing a timestamp could shift across page boundaries.
        while True:
            query = self.supabase_client.table(table).select("*").eq("tenant_id", tenant_id)
            if since_timestamp is not None:
                query = query.gte("timestamp", since_timestamp)
            query = query.order("timestamp", desc=False).range(
                start, start + batch_size - 1
            )
            batch = query.execute().data or []
            rows.extend(batch)
            # A short page means the result set is exhausted.
            if len(batch) < batch_size:
                break
            start += batch_size
        return rows

    # ------------------------------------------------------------------
    # Event logging
    # ------------------------------------------------------------------
    def log_tool_usage(
        self,
        tenant_id: str,
        tool_name: str,
        latency_ms: Optional[int] = None,
        tokens_used: Optional[int] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None
    ):
        """Log a tool usage event."""
        record: Dict[str, Any] = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "tool_name": tool_name,
            "timestamp": self._now_ts(),
            "latency_ms": latency_ms,
            "tokens_used": tokens_used,
            "success": success,
            "error_message": error_message,
            "metadata": metadata,
        }
        if self.use_supabase and self.supabase_client:
            self._supabase_insert(self.table_names["tool_usage"], record)
            return

        # SQLite stores the flag as 0/1 and the metadata as a JSON string.
        record["success"] = 1 if success else 0
        record["metadata"] = self._serialize_metadata(metadata)
        self._sqlite_insert("tool_usage_events", record)

    def log_redflag_violation(
        self,
        tenant_id: str,
        rule_id: str,
        rule_pattern: str,
        severity: str,
        matched_text: str,
        confidence: Optional[float] = None,
        message_preview: Optional[str] = None,
        user_id: Optional[str] = None
    ):
        """Log a red-flag violation (message preview capped at 200 chars)."""
        record: Dict[str, Any] = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "rule_id": rule_id,
            "rule_pattern": rule_pattern,
            "severity": severity,
            "matched_text": matched_text,
            "confidence": confidence,
            "message_preview": message_preview[:200] if message_preview else None,
            "timestamp": self._now_ts(),
        }
        if self.use_supabase and self.supabase_client:
            self._supabase_insert(self.table_names["redflags"], record)
            return
        self._sqlite_insert("redflag_violations", record)

    def log_rag_search(
        self,
        tenant_id: str,
        query: str,
        hits_count: int,
        avg_score: Optional[float] = None,
        top_score: Optional[float] = None,
        latency_ms: Optional[int] = None
    ):
        """Log a RAG search event with quality metrics (query capped at 500 chars)."""
        record: Dict[str, Any] = {
            "tenant_id": tenant_id,
            "query": query[:500],
            "hits_count": hits_count,
            "avg_score": avg_score,
            "top_score": top_score,
            "timestamp": self._now_ts(),
            "latency_ms": latency_ms,
        }
        if self.use_supabase and self.supabase_client:
            self._supabase_insert(self.table_names["rag_search"], record)
            return
        self._sqlite_insert("rag_search_events", record)

    def log_agent_query(
        self,
        tenant_id: str,
        message_preview: str,
        intent: Optional[str] = None,
        tools_used: Optional[List[str]] = None,
        total_tokens: Optional[int] = None,
        total_latency_ms: Optional[int] = None,
        success: bool = True,
        user_id: Optional[str] = None
    ):
        """Log an agent query event (overall query tracking)."""
        # The column is nullable, so tolerate a missing preview instead of
        # raising TypeError on the slice.
        preview = message_preview[:200] if message_preview is not None else None
        record: Dict[str, Any] = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "message_preview": preview,
            "intent": intent,
            "tools_used": tools_used,
            "total_tokens": total_tokens,
            "total_latency_ms": total_latency_ms,
            "success": success,
            "timestamp": self._now_ts(),
        }
        if self.use_supabase and self.supabase_client:
            self._supabase_insert(self.table_names["agent_query"], record)
            return

        # SQLite stores the tool list as a JSON string and the flag as 0/1.
        record["tools_used"] = self._serialize_tools(tools_used)
        record["success"] = 1 if success else 0
        self._sqlite_insert("agent_query_events", record)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def get_tool_usage_stats(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Get tool usage statistics for a tenant.

        Returns a dict keyed by tool name; each value has ``count``,
        ``avg_latency_ms``, ``total_tokens``, ``success_count`` and
        ``error_count``. Rows without a latency are excluded from the
        average on both backends.
        """
        if self.use_supabase and self.supabase_client:
            rows = self._supabase_fetch_all(
                self.table_names["tool_usage"], tenant_id, since_timestamp
            )
            return self._aggregate_tool_usage(rows)

        clause, extra = self._since_filter(since_timestamp)
        query = f"""
            SELECT
                tool_name,
                COUNT(*) as count,
                AVG(latency_ms) as avg_latency_ms,
                SUM(tokens_used) as total_tokens,
                SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count
            FROM tool_usage_events
            WHERE tenant_id = ?{clause}
            GROUP BY tool_name
        """
        with self._connect() as conn:
            rows = conn.execute(query, [tenant_id, *extra]).fetchall()

        return {
            row["tool_name"]: {
                "count": row["count"],
                "avg_latency_ms": round(row["avg_latency_ms"] or 0, 2),
                "total_tokens": row["total_tokens"] or 0,
                "success_count": row["success_count"],
                "error_count": row["count"] - row["success_count"],
            }
            for row in rows
        }

    def _aggregate_tool_usage(
        self, rows: List[Dict[str, Any]]
    ) -> Dict[str, Dict[str, Any]]:
        """Group raw tool-usage rows by tool name (Supabase path, in Python)."""
        grouped: Dict[str, List[Dict[str, Any]]] = {}
        for row in rows:
            tool_name = row.get("tool_name")
            if tool_name:
                grouped.setdefault(tool_name, []).append(row)

        stats: Dict[str, Dict[str, Any]] = {}
        for tool_name, events in grouped.items():
            successes = sum(1 for event in events if event.get("success"))
            stats[tool_name] = {
                "count": len(events),
                # Mirror SQL AVG(): rows with no latency do not count.
                "avg_latency_ms": round(
                    self._mean(event.get("latency_ms") for event in events), 2
                ),
                "total_tokens": sum(event.get("tokens_used") or 0 for event in events),
                "success_count": successes,
                "error_count": len(events) - successes,
            }
        return stats

    def get_redflag_violations(
        self,
        tenant_id: str,
        limit: int = 50,
        since_timestamp: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Get recent red-flag violations for a tenant, newest first."""
        if self.use_supabase and self.supabase_client:
            return self._supabase_simple_select(
                self.table_names["redflags"],
                tenant_id,
                since_timestamp=since_timestamp,
                limit=limit,
                order_desc=True,
            )

        clause, extra = self._since_filter(since_timestamp)
        query = f"""
            SELECT * FROM redflag_violations
            WHERE tenant_id = ?{clause}
            ORDER BY timestamp DESC LIMIT ?
        """
        with self._connect() as conn:
            rows = conn.execute(query, [tenant_id, *extra, limit]).fetchall()
        return [dict(row) for row in rows]

    def get_activity_summary(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Get activity summary for a tenant.

        Returns ``total_queries``, ``active_users`` (distinct non-null user
        ids), ``redflag_count`` and ``last_query`` (ISO-8601 string built
        with ``datetime.fromtimestamp``, or ``None``).
        """
        if self.use_supabase and self.supabase_client:
            queries = self._supabase_fetch_all(
                self.table_names["agent_query"], tenant_id, since_timestamp
            )
            redflags = self._supabase_fetch_all(
                self.table_names["redflags"], tenant_id, since_timestamp
            )
            user_ids = {row["user_id"] for row in queries if row.get("user_id")}
            last_query_ts = (
                max((row.get("timestamp") or 0) for row in queries)
                if queries
                else None
            )
            return {
                "total_queries": len(queries),
                "active_users": len(user_ids),
                "redflag_count": len(redflags),
                "last_query": datetime.fromtimestamp(last_query_ts).isoformat() if last_query_ts else None,
            }

        clause, extra = self._since_filter(since_timestamp)
        scoped_params = [tenant_id, *extra]
        with self._connect() as conn:
            total_queries = conn.execute(
                "SELECT COUNT(*) as total FROM agent_query_events"
                f" WHERE tenant_id = ?{clause}",
                scoped_params,
            ).fetchone()["total"]

            # Active users (unique user_ids in the period)
            active_users = conn.execute(
                "SELECT COUNT(DISTINCT user_id) as active_users"
                " FROM agent_query_events"
                f" WHERE tenant_id = ? AND user_id IS NOT NULL{clause}",
                scoped_params,
            ).fetchone()["active_users"]

            # NOTE(review): this is the tenant's all-time latest query and
            # ignores since_timestamp, whereas the Supabase branch only looks
            # at rows inside the window. Confirm which one is intended.
            last_query_ts = conn.execute(
                "SELECT MAX(timestamp) as last_query"
                " FROM agent_query_events WHERE tenant_id = ?",
                [tenant_id],
            ).fetchone()["last_query"]

            redflag_count = conn.execute(
                "SELECT COUNT(*) as count FROM redflag_violations"
                f" WHERE tenant_id = ?{clause}",
                scoped_params,
            ).fetchone()["count"]

        return {
            "total_queries": total_queries,
            "active_users": active_users or 0,
            "redflag_count": redflag_count,
            "last_query": datetime.fromtimestamp(last_query_ts).isoformat() if last_query_ts else None,
        }

    def get_rag_quality_metrics(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Get RAG quality metrics (recall/precision indicators).

        Returns ``total_searches`` plus the averages ``avg_hits_per_search``,
        ``avg_score``, ``avg_top_score`` and ``avg_latency_ms``. Missing
        values are ignored in each average on both backends.
        """
        if self.use_supabase and self.supabase_client:
            rows = self._supabase_fetch_all(
                self.table_names["rag_search"], tenant_id, since_timestamp
            )
            if not rows:
                return {
                    "total_searches": 0,
                    "avg_hits_per_search": 0,
                    "avg_score": 0,
                    "avg_top_score": 0,
                    "avg_latency_ms": 0,
                }
            return {
                "total_searches": len(rows),
                "avg_hits_per_search": round(
                    self._mean(row.get("hits_count") for row in rows), 2
                ),
                "avg_score": round(self._mean(row.get("avg_score") for row in rows), 3),
                "avg_top_score": round(
                    self._mean(row.get("top_score") for row in rows), 3
                ),
                "avg_latency_ms": round(
                    self._mean(row.get("latency_ms") for row in rows), 2
                ),
            }

        clause, extra = self._since_filter(since_timestamp)
        query = f"""
            SELECT
                COUNT(*) as total_searches,
                AVG(hits_count) as avg_hits,
                AVG(avg_score) as avg_avg_score,
                AVG(top_score) as avg_top_score,
                AVG(latency_ms) as avg_latency_ms
            FROM rag_search_events
            WHERE tenant_id = ?{clause}
        """
        with self._connect() as conn:
            row = conn.execute(query, [tenant_id, *extra]).fetchone()

        return {
            "total_searches": row["total_searches"] or 0,
            "avg_hits_per_search": round(row["avg_hits"] or 0, 2),
            "avg_score": round(row["avg_avg_score"] or 0, 3),
            "avg_top_score": round(row["avg_top_score"] or 0, 3),
            "avg_latency_ms": round(row["avg_latency_ms"] or 0, 2),
        }