Spaces:
Sleeping
Sleeping
"""
Analytics Store for tenant-level analytics logging

Tracks:
- Tool usage (RAG, Web, Admin, LLM)
- LLM token counts and latency
- RAG recall/precision indicators
- Red-flag violations
- Per-tenant query volume
"""
import sqlite3
import json
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
class AnalyticsStore:
    """
    SQLite-backed store for analytics logging.

    Provides tenant-level analytics for tool usage, tokens, latency, and
    violations. Each public ``log_*`` method inserts one row; each ``get_*``
    method aggregates or lists rows for a single tenant, optionally limited
    to events at or after ``since_timestamp`` (Unix seconds).

    A fresh connection is opened per call, so instances are safe to share
    across short-lived callers without holding a connection open.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Open the analytics database, creating tables on first use.

        Args:
            db_path: Path to the SQLite file. When None, defaults to
                ``<project_root>/data/analytics.db`` and creates the
                ``data`` directory if missing.
        """
        if db_path is None:
            # NOTE(review): parents[3] assumes this module sits exactly three
            # directories below the project root — confirm if the file moves.
            root_dir = Path(__file__).resolve().parents[3]
            data_dir = root_dir / "data"
            data_dir.mkdir(parents=True, exist_ok=True)
            self.db_path = data_dir / "analytics.db"
        else:
            self.db_path = Path(db_path)
        self._init_db()

    def _init_db(self):
        """Create the four event tables and their indexes if absent.

        Idempotent: every statement uses ``IF NOT EXISTS``, so calling this
        against an existing database is a no-op.
        """
        with sqlite3.connect(self.db_path) as conn:
            # Tool usage events table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tool_usage_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tenant_id TEXT NOT NULL,
                    user_id TEXT,
                    tool_name TEXT NOT NULL,
                    timestamp INTEGER NOT NULL,
                    latency_ms INTEGER,
                    tokens_used INTEGER,
                    success BOOLEAN DEFAULT 1,
                    error_message TEXT,
                    metadata TEXT
                )
            """)
            # Red-flag violations table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS redflag_violations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tenant_id TEXT NOT NULL,
                    user_id TEXT,
                    rule_id TEXT NOT NULL,
                    rule_pattern TEXT,
                    severity TEXT NOT NULL,
                    matched_text TEXT,
                    confidence REAL,
                    message_preview TEXT,
                    timestamp INTEGER NOT NULL
                )
            """)
            # RAG search events with quality metrics
            conn.execute("""
                CREATE TABLE IF NOT EXISTS rag_search_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tenant_id TEXT NOT NULL,
                    query TEXT NOT NULL,
                    hits_count INTEGER,
                    avg_score REAL,
                    top_score REAL,
                    timestamp INTEGER NOT NULL,
                    latency_ms INTEGER
                )
            """)
            # Agent query events (overall query tracking)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS agent_query_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tenant_id TEXT NOT NULL,
                    user_id TEXT,
                    message_preview TEXT,
                    intent TEXT,
                    tools_used TEXT,
                    total_tokens INTEGER,
                    total_latency_ms INTEGER,
                    success BOOLEAN DEFAULT 1,
                    timestamp INTEGER NOT NULL
                )
            """)
            # Create indexes separately (SQLite doesn't support inline INDEX
            # in CREATE TABLE). All lookups filter on (tenant_id, timestamp).
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_tool_usage_tenant_timestamp
                ON tool_usage_events(tenant_id, timestamp)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_redflag_tenant_timestamp
                ON redflag_violations(tenant_id, timestamp)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_rag_search_tenant_timestamp
                ON rag_search_events(tenant_id, timestamp)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_agent_query_tenant_timestamp
                ON agent_query_events(tenant_id, timestamp)
            """)
            conn.commit()

    def log_tool_usage(
        self,
        tenant_id: str,
        tool_name: str,
        latency_ms: Optional[int] = None,
        tokens_used: Optional[int] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None
    ):
        """Log a tool usage event.

        Args:
            tenant_id: Tenant the event belongs to.
            tool_name: Name of the tool invoked (e.g. RAG, Web, Admin, LLM).
            latency_ms: Wall-clock latency of the call, if measured.
            tokens_used: LLM tokens consumed, if applicable.
            success: Whether the call succeeded (stored as 1/0).
            error_message: Error text on failure, if any.
            metadata: Arbitrary JSON-serializable extras, stored as JSON text.
            user_id: Optional end-user identifier.
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO tool_usage_events
                (tenant_id, user_id, tool_name, timestamp, latency_ms, tokens_used, success, error_message, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                tenant_id,
                user_id,
                tool_name,
                int(time.time()),
                latency_ms,
                tokens_used,
                1 if success else 0,
                error_message,
                json.dumps(metadata) if metadata else None
            ))
            conn.commit()

    def log_redflag_violation(
        self,
        tenant_id: str,
        rule_id: str,
        rule_pattern: str,
        severity: str,
        matched_text: str,
        confidence: Optional[float] = None,
        message_preview: Optional[str] = None,
        user_id: Optional[str] = None
    ):
        """Log a red-flag violation.

        Args:
            tenant_id: Tenant the violation belongs to.
            rule_id: Identifier of the rule that fired.
            rule_pattern: Pattern the rule matched on.
            severity: Severity label for the rule.
            matched_text: The text that triggered the match.
            confidence: Optional match confidence score.
            message_preview: Offending message; truncated to 200 chars.
            user_id: Optional end-user identifier.
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO redflag_violations
                (tenant_id, user_id, rule_id, rule_pattern, severity, matched_text, confidence, message_preview, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                tenant_id,
                user_id,
                rule_id,
                rule_pattern,
                severity,
                matched_text,
                confidence,
                # Cap stored preview length; full message never persisted.
                message_preview[:200] if message_preview else None,
                int(time.time())
            ))
            conn.commit()

    def log_rag_search(
        self,
        tenant_id: str,
        query: str,
        hits_count: int,
        avg_score: Optional[float] = None,
        top_score: Optional[float] = None,
        latency_ms: Optional[int] = None
    ):
        """Log a RAG search event with quality metrics.

        Args:
            tenant_id: Tenant the search belongs to.
            query: Search query; truncated to 500 chars before storage.
            hits_count: Number of hits returned.
            avg_score: Mean relevance score of the hits, if computed.
            top_score: Best relevance score of the hits, if computed.
            latency_ms: Search latency, if measured.
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO rag_search_events
                (tenant_id, query, hits_count, avg_score, top_score, timestamp, latency_ms)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                tenant_id,
                query[:500],  # Limit query length
                hits_count,
                avg_score,
                top_score,
                int(time.time()),
                latency_ms
            ))
            conn.commit()

    def log_agent_query(
        self,
        tenant_id: str,
        message_preview: str,
        intent: Optional[str] = None,
        tools_used: Optional[List[str]] = None,
        total_tokens: Optional[int] = None,
        total_latency_ms: Optional[int] = None,
        success: bool = True,
        user_id: Optional[str] = None
    ):
        """Log an agent query event (overall query tracking).

        Args:
            tenant_id: Tenant the query belongs to.
            message_preview: User message; truncated to 200 chars.
            intent: Classified intent label, if any.
            tools_used: Tool names invoked; stored as a JSON array.
            total_tokens: Total LLM tokens consumed across the query.
            total_latency_ms: End-to-end latency of the query.
            success: Whether the query succeeded (stored as 1/0).
            user_id: Optional end-user identifier.
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO agent_query_events
                (tenant_id, user_id, message_preview, intent, tools_used, total_tokens, total_latency_ms, success, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                tenant_id,
                user_id,
                message_preview[:200],
                intent,
                json.dumps(tools_used) if tools_used else None,
                total_tokens,
                total_latency_ms,
                1 if success else 0,
                int(time.time())
            ))
            conn.commit()

    def get_tool_usage_stats(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get tool usage statistics for a tenant, grouped by tool.

        Args:
            tenant_id: Tenant to aggregate for.
            since_timestamp: If given, only count events at/after this
                Unix timestamp. ``0`` is a valid value (epoch onward).

        Returns:
            Mapping of tool name -> dict with ``count``, ``avg_latency_ms``
            (rounded, 0 when no latencies recorded), ``total_tokens``,
            ``success_count`` and ``error_count``.
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            query = """
                SELECT
                    tool_name,
                    COUNT(*) as count,
                    AVG(latency_ms) as avg_latency_ms,
                    SUM(tokens_used) as total_tokens,
                    SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count
                FROM tool_usage_events
                WHERE tenant_id = ?
            """
            params = [tenant_id]
            # Identity check: a truthiness test would drop the filter for 0.
            if since_timestamp is not None:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            query += " GROUP BY tool_name"
            cursor = conn.execute(query, params)
            rows = cursor.fetchall()
            stats = {}
            for row in rows:
                tool_name = row["tool_name"]
                stats[tool_name] = {
                    "count": row["count"],
                    # AVG/SUM are NULL when every input is NULL; coerce to 0.
                    "avg_latency_ms": round(row["avg_latency_ms"] or 0, 2),
                    "total_tokens": row["total_tokens"] or 0,
                    "success_count": row["success_count"],
                    "error_count": row["count"] - row["success_count"]
                }
            return stats

    def get_redflag_violations(
        self,
        tenant_id: str,
        limit: int = 50,
        since_timestamp: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Get recent red-flag violations for a tenant, newest first.

        Args:
            tenant_id: Tenant to list violations for.
            limit: Maximum number of rows returned.
            since_timestamp: If given, only include violations at/after
                this Unix timestamp.

        Returns:
            List of row dicts (all columns of ``redflag_violations``).
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            query = """
                SELECT * FROM redflag_violations
                WHERE tenant_id = ?
            """
            params = [tenant_id]
            if since_timestamp is not None:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            query += " ORDER BY timestamp DESC LIMIT ?"
            params.append(limit)
            cursor = conn.execute(query, params)
            rows = cursor.fetchall()
            return [dict(row) for row in rows]

    def get_activity_summary(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get an activity summary for a tenant.

        Args:
            tenant_id: Tenant to summarize.
            since_timestamp: If given, restricts query/user/violation counts
                to events at/after this Unix timestamp. ``last_query`` is
                always the latest event regardless of the window.

        Returns:
            Dict with ``total_queries``, ``active_users`` (distinct non-null
            user_ids), ``redflag_count``, and ``last_query`` (ISO-8601 local
            time string, or None when the tenant has no queries).
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            # Total queries
            query = "SELECT COUNT(*) as total FROM agent_query_events WHERE tenant_id = ?"
            params = [tenant_id]
            if since_timestamp is not None:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            total_queries = conn.execute(query, params).fetchone()["total"]
            # Active users (unique user_ids in the period)
            query = """
                SELECT COUNT(DISTINCT user_id) as active_users
                FROM agent_query_events
                WHERE tenant_id = ? AND user_id IS NOT NULL
            """
            params = [tenant_id]
            if since_timestamp is not None:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            active_users = conn.execute(query, params).fetchone()["active_users"]
            # Last query timestamp (deliberately unfiltered by the window)
            query = """
                SELECT MAX(timestamp) as last_query
                FROM agent_query_events
                WHERE tenant_id = ?
            """
            last_query_ts = conn.execute(query, [tenant_id]).fetchone()["last_query"]
            # Red-flag count
            query = "SELECT COUNT(*) as count FROM redflag_violations WHERE tenant_id = ?"
            params = [tenant_id]
            if since_timestamp is not None:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            redflag_count = conn.execute(query, params).fetchone()["count"]
            return {
                "total_queries": total_queries,
                "active_users": active_users or 0,
                "redflag_count": redflag_count,
                # NOTE(review): fromtimestamp() yields naive local time —
                # confirm consumers expect local rather than UTC.
                "last_query": datetime.fromtimestamp(last_query_ts).isoformat() if last_query_ts else None
            }

    def get_rag_quality_metrics(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get RAG quality metrics (recall/precision indicators).

        Args:
            tenant_id: Tenant to aggregate for.
            since_timestamp: If given, only include searches at/after this
                Unix timestamp.

        Returns:
            Dict with ``total_searches``, ``avg_hits_per_search``,
            ``avg_score``, ``avg_top_score`` and ``avg_latency_ms`` —
            averages rounded and coerced to 0 when no data exists.
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            query = """
                SELECT
                    COUNT(*) as total_searches,
                    AVG(hits_count) as avg_hits,
                    AVG(avg_score) as avg_avg_score,
                    AVG(top_score) as avg_top_score,
                    AVG(latency_ms) as avg_latency_ms
                FROM rag_search_events
                WHERE tenant_id = ?
            """
            params = [tenant_id]
            if since_timestamp is not None:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            row = conn.execute(query, params).fetchone()
            return {
                "total_searches": row["total_searches"] or 0,
                "avg_hits_per_search": round(row["avg_hits"] or 0, 2),
                "avg_score": round(row["avg_avg_score"] or 0, 3),
                "avg_top_score": round(row["avg_top_score"] or 0, 3),
                "avg_latency_ms": round(row["avg_latency_ms"] or 0, 2)
            }