""" Analytics Store for tenant-level analytics logging Tracks: - Tool usage (RAG, Web, Admin, LLM) - LLM token counts and latency - RAG recall/precision indicators - Red-flag violations - Per-tenant query volume """ import sqlite3 import json import time from pathlib import Path from typing import List, Dict, Any, Optional from datetime import datetime class AnalyticsStore: """ SQLite-backed store for analytics logging. Provides tenant-level analytics for tool usage, tokens, latency, and violations. """ def __init__(self, db_path: Optional[str] = None): if db_path is None: root_dir = Path(__file__).resolve().parents[3] data_dir = root_dir / "data" data_dir.mkdir(parents=True, exist_ok=True) self.db_path = data_dir / "analytics.db" else: self.db_path = Path(db_path) self._init_db() def _init_db(self): """Initialize database tables for analytics.""" with sqlite3.connect(self.db_path) as conn: # Tool usage events table conn.execute(""" CREATE TABLE IF NOT EXISTS tool_usage_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL, user_id TEXT, tool_name TEXT NOT NULL, timestamp INTEGER NOT NULL, latency_ms INTEGER, tokens_used INTEGER, success BOOLEAN DEFAULT 1, error_message TEXT, metadata TEXT ) """) # Red-flag violations table conn.execute(""" CREATE TABLE IF NOT EXISTS redflag_violations ( id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL, user_id TEXT, rule_id TEXT NOT NULL, rule_pattern TEXT, severity TEXT NOT NULL, matched_text TEXT, confidence REAL, message_preview TEXT, timestamp INTEGER NOT NULL ) """) # RAG search events with quality metrics conn.execute(""" CREATE TABLE IF NOT EXISTS rag_search_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL, query TEXT NOT NULL, hits_count INTEGER, avg_score REAL, top_score REAL, timestamp INTEGER NOT NULL, latency_ms INTEGER ) """) # Agent query events (overall query tracking) conn.execute(""" CREATE TABLE IF NOT EXISTS agent_query_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL, user_id TEXT, message_preview TEXT, intent TEXT, tools_used TEXT, total_tokens INTEGER, total_latency_ms INTEGER, success BOOLEAN DEFAULT 1, timestamp INTEGER NOT NULL ) """) # Create indexes separately (SQLite doesn't support inline INDEX in CREATE TABLE) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_tool_usage_tenant_timestamp ON tool_usage_events(tenant_id, timestamp) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_redflag_tenant_timestamp ON redflag_violations(tenant_id, timestamp) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_rag_search_tenant_timestamp ON rag_search_events(tenant_id, timestamp) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_agent_query_tenant_timestamp ON agent_query_events(tenant_id, timestamp) """) conn.commit() def log_tool_usage( self, tenant_id: str, tool_name: str, latency_ms: Optional[int] = None, tokens_used: Optional[int] = None, success: bool = True, error_message: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, user_id: Optional[str] = None ): """Log a tool usage event.""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT INTO tool_usage_events (tenant_id, user_id, tool_name, timestamp, latency_ms, tokens_used, success, error_message, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( tenant_id, user_id, tool_name, int(time.time()), latency_ms, tokens_used, 1 if success else 0, error_message, json.dumps(metadata) if metadata else None )) conn.commit() def log_redflag_violation( self, tenant_id: str, rule_id: str, rule_pattern: str, severity: str, matched_text: str, confidence: Optional[float] = None, message_preview: Optional[str] = None, user_id: Optional[str] = None ): """Log a red-flag violation.""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT INTO redflag_violations (tenant_id, user_id, rule_id, rule_pattern, severity, matched_text, confidence, message_preview, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( tenant_id, user_id, rule_id, rule_pattern, severity, matched_text, confidence, message_preview[:200] if message_preview else None, int(time.time()) )) conn.commit() def log_rag_search( self, tenant_id: str, query: str, hits_count: int, avg_score: Optional[float] = None, top_score: Optional[float] = None, latency_ms: Optional[int] = None ): """Log a RAG search event with quality metrics.""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT INTO rag_search_events (tenant_id, query, hits_count, avg_score, top_score, timestamp, latency_ms) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( tenant_id, query[:500], # Limit query length hits_count, avg_score, top_score, int(time.time()), latency_ms )) conn.commit() def log_agent_query( self, tenant_id: str, message_preview: str, intent: Optional[str] = None, tools_used: Optional[List[str]] = None, total_tokens: Optional[int] = None, total_latency_ms: Optional[int] = None, success: bool = True, user_id: Optional[str] = None ): """Log an agent query event (overall query tracking).""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT INTO agent_query_events (tenant_id, user_id, message_preview, intent, tools_used, total_tokens, total_latency_ms, success, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( tenant_id, user_id, message_preview[:200], intent, json.dumps(tools_used) if tools_used else None, total_tokens, total_latency_ms, 1 if success else 0, int(time.time()) )) conn.commit() def get_tool_usage_stats( self, tenant_id: str, since_timestamp: Optional[int] = None ) -> Dict[str, Any]: """Get tool usage statistics for a tenant.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row query = """ SELECT tool_name, COUNT(*) as count, AVG(latency_ms) as avg_latency_ms, SUM(tokens_used) as total_tokens, SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count FROM tool_usage_events WHERE tenant_id = ? """ params = [tenant_id] if since_timestamp: query += " AND timestamp >= ?" params.append(since_timestamp) query += " GROUP BY tool_name" cursor = conn.execute(query, params) rows = cursor.fetchall() stats = {} for row in rows: tool_name = row["tool_name"] stats[tool_name] = { "count": row["count"], "avg_latency_ms": round(row["avg_latency_ms"] or 0, 2), "total_tokens": row["total_tokens"] or 0, "success_count": row["success_count"], "error_count": row["count"] - row["success_count"] } return stats def get_redflag_violations( self, tenant_id: str, limit: int = 50, since_timestamp: Optional[int] = None ) -> List[Dict[str, Any]]: """Get recent red-flag violations for a tenant.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row query = """ SELECT * FROM redflag_violations WHERE tenant_id = ? """ params = [tenant_id] if since_timestamp: query += " AND timestamp >= ?" params.append(since_timestamp) query += " ORDER BY timestamp DESC LIMIT ?" params.append(limit) cursor = conn.execute(query, params) rows = cursor.fetchall() return [dict(row) for row in rows] def get_activity_summary( self, tenant_id: str, since_timestamp: Optional[int] = None ) -> Dict[str, Any]: """Get activity summary for a tenant.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row # Total queries query = "SELECT COUNT(*) as total FROM agent_query_events WHERE tenant_id = ?" params = [tenant_id] if since_timestamp: query += " AND timestamp >= ?" params.append(since_timestamp) total_queries = conn.execute(query, params).fetchone()["total"] # Active users (unique user_ids in the period) query = """ SELECT COUNT(DISTINCT user_id) as active_users FROM agent_query_events WHERE tenant_id = ? AND user_id IS NOT NULL """ params = [tenant_id] if since_timestamp: query += " AND timestamp >= ?" params.append(since_timestamp) active_users = conn.execute(query, params).fetchone()["active_users"] # Last query timestamp query = """ SELECT MAX(timestamp) as last_query FROM agent_query_events WHERE tenant_id = ? """ last_query_ts = conn.execute(query, [tenant_id]).fetchone()["last_query"] # Red-flag count query = "SELECT COUNT(*) as count FROM redflag_violations WHERE tenant_id = ?" params = [tenant_id] if since_timestamp: query += " AND timestamp >= ?" params.append(since_timestamp) redflag_count = conn.execute(query, params).fetchone()["count"] return { "total_queries": total_queries, "active_users": active_users or 0, "redflag_count": redflag_count, "last_query": datetime.fromtimestamp(last_query_ts).isoformat() if last_query_ts else None } def get_rag_quality_metrics( self, tenant_id: str, since_timestamp: Optional[int] = None ) -> Dict[str, Any]: """Get RAG quality metrics (recall/precision indicators).""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row query = """ SELECT COUNT(*) as total_searches, AVG(hits_count) as avg_hits, AVG(avg_score) as avg_avg_score, AVG(top_score) as avg_top_score, AVG(latency_ms) as avg_latency_ms FROM rag_search_events WHERE tenant_id = ? """ params = [tenant_id] if since_timestamp: query += " AND timestamp >= ?" params.append(since_timestamp) row = conn.execute(query, params).fetchone() return { "total_searches": row["total_searches"] or 0, "avg_hits_per_search": round(row["avg_hits"] or 0, 2), "avg_score": round(row["avg_avg_score"] or 0, 3), "avg_top_score": round(row["avg_top_score"] or 0, 3), "avg_latency_ms": round(row["avg_latency_ms"] or 0, 2) }