# IntegraChat / backend/api/storage/analytics_store.py
# Last commit c509b44 by nothingworry: "working Tenant ID" (14.3 kB)
"""
Analytics Store for tenant-level analytics logging
Tracks:
- Tool usage (RAG, Web, Admin, LLM)
- LLM token counts and latency
- RAG retrieval-quality indicators (hit counts and scores, as recall/precision proxies)
- Red-flag violations
- Per-tenant query volume
"""
import json
import sqlite3
import time
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Iterator, Optional
class AnalyticsStore:
    """
    SQLite-backed store for analytics logging.

    Provides tenant-level analytics for tool usage, tokens, latency, RAG search
    quality, red-flag violations and overall query volume.

    Every public method opens its own short-lived connection to ``self.db_path``
    and closes it before returning, so an instance holds no open database
    handle between calls.
    """

    # Maximum stored length for free-text message previews.
    _MESSAGE_PREVIEW_MAX = 200
    # Maximum stored length for raw RAG query strings.
    _QUERY_MAX = 500

    def __init__(self, db_path: Optional[str] = None):
        """Create the store and ensure the schema exists.

        Args:
            db_path: Path to the SQLite database file. When omitted,
                ``<root>/data/analytics.db`` is used, where ``<root>`` is the
                fourth ancestor directory of this module
                (``Path(__file__).resolve().parents[3]``). The ``data``
                directory is created if it does not exist.
        """
        if db_path is None:
            root_dir = Path(__file__).resolve().parents[3]
            data_dir = root_dir / "data"
            data_dir.mkdir(parents=True, exist_ok=True)
            self.db_path = data_dir / "analytics.db"
        else:
            self.db_path = Path(db_path)
        self._init_db()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed/rolled back and then closed.

        ``sqlite3.Connection`` used as a context manager only manages the
        transaction; it does NOT close the connection. Closing explicitly
        avoids leaking one open handle per call until garbage collection.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    @staticmethod
    def _add_since_filter(
        query: str,
        params: List[Any],
        since_timestamp: Optional[int],
    ) -> str:
        """Append ``AND timestamp >= ?`` (and its parameter) when a bound is given.

        Args:
            query: SQL text that already contains a ``WHERE`` clause.
            params: Positional parameter list; mutated in place.
            since_timestamp: Inclusive lower bound in epoch seconds, or None.

        Returns:
            The (possibly extended) query text.
        """
        if since_timestamp is None:
            return query
        params.append(since_timestamp)
        return query + " AND timestamp >= ?"

    def _init_db(self):
        """Create the analytics tables and their indexes if they are missing."""
        statements = (
            # Tool usage events table
            """
            CREATE TABLE IF NOT EXISTS tool_usage_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tenant_id TEXT NOT NULL,
                user_id TEXT,
                tool_name TEXT NOT NULL,
                timestamp INTEGER NOT NULL,
                latency_ms INTEGER,
                tokens_used INTEGER,
                success BOOLEAN DEFAULT 1,
                error_message TEXT,
                metadata TEXT
            )
            """,
            # Red-flag violations table
            """
            CREATE TABLE IF NOT EXISTS redflag_violations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tenant_id TEXT NOT NULL,
                user_id TEXT,
                rule_id TEXT NOT NULL,
                rule_pattern TEXT,
                severity TEXT NOT NULL,
                matched_text TEXT,
                confidence REAL,
                message_preview TEXT,
                timestamp INTEGER NOT NULL
            )
            """,
            # RAG search events with quality metrics
            """
            CREATE TABLE IF NOT EXISTS rag_search_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tenant_id TEXT NOT NULL,
                query TEXT NOT NULL,
                hits_count INTEGER,
                avg_score REAL,
                top_score REAL,
                timestamp INTEGER NOT NULL,
                latency_ms INTEGER
            )
            """,
            # Agent query events (overall query tracking)
            """
            CREATE TABLE IF NOT EXISTS agent_query_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tenant_id TEXT NOT NULL,
                user_id TEXT,
                message_preview TEXT,
                intent TEXT,
                tools_used TEXT,
                total_tokens INTEGER,
                total_latency_ms INTEGER,
                success BOOLEAN DEFAULT 1,
                timestamp INTEGER NOT NULL
            )
            """,
            # Indexes are created separately (SQLite doesn't support inline
            # INDEX in CREATE TABLE); every read filters on tenant + time.
            """
            CREATE INDEX IF NOT EXISTS idx_tool_usage_tenant_timestamp
            ON tool_usage_events(tenant_id, timestamp)
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_redflag_tenant_timestamp
            ON redflag_violations(tenant_id, timestamp)
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_rag_search_tenant_timestamp
            ON rag_search_events(tenant_id, timestamp)
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_agent_query_tenant_timestamp
            ON agent_query_events(tenant_id, timestamp)
            """,
        )
        with self._connect() as conn:
            for statement in statements:
                conn.execute(statement)

    def log_tool_usage(
        self,
        tenant_id: str,
        tool_name: str,
        latency_ms: Optional[int] = None,
        tokens_used: Optional[int] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None
    ):
        """Log a tool usage event, timestamped with the current epoch second.

        ``metadata`` is stored as JSON text; an empty/None dict is stored as NULL.
        """
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO tool_usage_events
                (tenant_id, user_id, tool_name, timestamp, latency_ms, tokens_used, success, error_message, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                tenant_id,
                user_id,
                tool_name,
                int(time.time()),
                latency_ms,
                tokens_used,
                1 if success else 0,
                error_message,
                json.dumps(metadata) if metadata else None
            ))

    def log_redflag_violation(
        self,
        tenant_id: str,
        rule_id: str,
        rule_pattern: str,
        severity: str,
        matched_text: str,
        confidence: Optional[float] = None,
        message_preview: Optional[str] = None,
        user_id: Optional[str] = None
    ):
        """Log a red-flag violation.

        ``message_preview`` is truncated to ``_MESSAGE_PREVIEW_MAX`` characters;
        an empty/None preview is stored as NULL.
        """
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO redflag_violations
                (tenant_id, user_id, rule_id, rule_pattern, severity, matched_text, confidence, message_preview, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                tenant_id,
                user_id,
                rule_id,
                rule_pattern,
                severity,
                matched_text,
                confidence,
                message_preview[:self._MESSAGE_PREVIEW_MAX] if message_preview else None,
                int(time.time())
            ))

    def log_rag_search(
        self,
        tenant_id: str,
        query: str,
        hits_count: int,
        avg_score: Optional[float] = None,
        top_score: Optional[float] = None,
        latency_ms: Optional[int] = None
    ):
        """Log a RAG search event with quality metrics.

        ``query`` is truncated to ``_QUERY_MAX`` characters before storage.
        """
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO rag_search_events
                (tenant_id, query, hits_count, avg_score, top_score, timestamp, latency_ms)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                tenant_id,
                query[:self._QUERY_MAX],  # Limit query length
                hits_count,
                avg_score,
                top_score,
                int(time.time()),
                latency_ms
            ))

    def log_agent_query(
        self,
        tenant_id: str,
        message_preview: str,
        intent: Optional[str] = None,
        tools_used: Optional[List[str]] = None,
        total_tokens: Optional[int] = None,
        total_latency_ms: Optional[int] = None,
        success: bool = True,
        user_id: Optional[str] = None
    ):
        """Log an agent query event (overall query tracking).

        ``message_preview`` is truncated to ``_MESSAGE_PREVIEW_MAX`` characters
        (None is stored as NULL rather than raising). ``tools_used`` is stored
        as JSON text; an empty/None list is stored as NULL.
        """
        preview = (
            message_preview[:self._MESSAGE_PREVIEW_MAX]
            if message_preview is not None
            else None
        )
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO agent_query_events
                (tenant_id, user_id, message_preview, intent, tools_used, total_tokens, total_latency_ms, success, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                tenant_id,
                user_id,
                preview,
                intent,
                json.dumps(tools_used) if tools_used else None,
                total_tokens,
                total_latency_ms,
                1 if success else 0,
                int(time.time())
            ))

    def get_tool_usage_stats(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get per-tool usage statistics for a tenant.

        Returns:
            Mapping of tool name to ``count``, ``avg_latency_ms`` (rounded to 2
            places, 0 when unknown), ``total_tokens`` (0 when unknown),
            ``success_count`` and ``error_count``. Empty if no events match.
        """
        query = """
            SELECT
                tool_name,
                COUNT(*) as count,
                AVG(latency_ms) as avg_latency_ms,
                SUM(tokens_used) as total_tokens,
                SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count
            FROM tool_usage_events
            WHERE tenant_id = ?
        """
        params: List[Any] = [tenant_id]
        query = self._add_since_filter(query, params, since_timestamp)
        query += " GROUP BY tool_name"
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, params).fetchall()
        return {
            row["tool_name"]: {
                "count": row["count"],
                "avg_latency_ms": round(row["avg_latency_ms"] or 0, 2),
                "total_tokens": row["total_tokens"] or 0,
                "success_count": row["success_count"],
                "error_count": row["count"] - row["success_count"]
            }
            for row in rows
        }

    def get_redflag_violations(
        self,
        tenant_id: str,
        limit: int = 50,
        since_timestamp: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Get up to ``limit`` most recent red-flag violations for a tenant.

        Returns:
            One dict per row (all ``redflag_violations`` columns), newest first.
        """
        query = """
            SELECT * FROM redflag_violations
            WHERE tenant_id = ?
        """
        params: List[Any] = [tenant_id]
        query = self._add_since_filter(query, params, since_timestamp)
        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, params).fetchall()
        return [dict(row) for row in rows]

    def get_activity_summary(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get an activity summary for a tenant.

        Returns:
            ``total_queries``, ``active_users`` (distinct non-NULL user_ids) and
            ``redflag_count`` honour ``since_timestamp``. ``last_query`` is the
            latest agent query over ALL time (it ignores ``since_timestamp``),
            formatted with ``datetime.fromtimestamp(...).isoformat()`` — i.e. a
            naive server-local time — or None when the tenant has no queries.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row

            # Total queries
            params: List[Any] = [tenant_id]
            query = self._add_since_filter(
                "SELECT COUNT(*) as total FROM agent_query_events WHERE tenant_id = ?",
                params,
                since_timestamp,
            )
            total_queries = conn.execute(query, params).fetchone()["total"]

            # Active users (unique user_ids in the period)
            params = [tenant_id]
            query = self._add_since_filter(
                """
                SELECT COUNT(DISTINCT user_id) as active_users
                FROM agent_query_events
                WHERE tenant_id = ? AND user_id IS NOT NULL
                """,
                params,
                since_timestamp,
            )
            active_users = conn.execute(query, params).fetchone()["active_users"]

            # Last query timestamp (all time, deliberately unfiltered)
            last_query_ts = conn.execute(
                """
                SELECT MAX(timestamp) as last_query
                FROM agent_query_events
                WHERE tenant_id = ?
                """,
                [tenant_id],
            ).fetchone()["last_query"]

            # Red-flag count
            params = [tenant_id]
            query = self._add_since_filter(
                "SELECT COUNT(*) as count FROM redflag_violations WHERE tenant_id = ?",
                params,
                since_timestamp,
            )
            redflag_count = conn.execute(query, params).fetchone()["count"]

        return {
            "total_queries": total_queries,
            "active_users": active_users or 0,
            "redflag_count": redflag_count,
            "last_query": datetime.fromtimestamp(last_query_ts).isoformat() if last_query_ts else None
        }

    def get_rag_quality_metrics(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get RAG quality metrics (recall/precision proxies) for a tenant.

        Returns:
            ``total_searches`` plus averages of hit count (2 places), average
            score and top score (3 places), and latency (2 places); each
            average is 0 when there is no data.
        """
        query = """
            SELECT
                COUNT(*) as total_searches,
                AVG(hits_count) as avg_hits,
                AVG(avg_score) as avg_avg_score,
                AVG(top_score) as avg_top_score,
                AVG(latency_ms) as avg_latency_ms
            FROM rag_search_events
            WHERE tenant_id = ?
        """
        params: List[Any] = [tenant_id]
        query = self._add_since_filter(query, params, since_timestamp)
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(query, params).fetchone()
        return {
            "total_searches": row["total_searches"] or 0,
            "avg_hits_per_search": round(row["avg_hits"] or 0, 2),
            "avg_score": round(row["avg_avg_score"] or 0, 3),
            "avg_top_score": round(row["avg_top_score"] or 0, 3),
            "avg_latency_ms": round(row["avg_latency_ms"] or 0, 2)
        }