IntegraChat / backend /api /storage /analytics_store.py
nothingworry's picture
Migrate admin rules and analytics to Supabase
611e2c1
raw
history blame
26.6 kB
"""
Analytics Store for tenant-level analytics logging
Tracks:
- Tool usage (RAG, Web, Admin, LLM)
- LLM token counts and latency
- RAG recall/precision indicators
- Red-flag violations
- Per-tenant query volume
"""
import json
import os
import sqlite3
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
    from supabase import Client, create_client
    SUPABASE_AVAILABLE = True
except ImportError:
    # Define BOTH imported names so any accidental use without the package
    # fails inside the defensive try/except in _init_supabase (TypeError on
    # calling None) instead of raising a confusing NameError.
    Client = None  # type: ignore
    create_client = None  # type: ignore
    SUPABASE_AVAILABLE = False
class AnalyticsStore:
"""
Analytics logging with dual-backend support.
- Uses Supabase when SUPABASE_URL/SUPABASE_SERVICE_KEY are configured
- Falls back to local SQLite (data/analytics.db) otherwise
"""
    def __init__(
        self,
        db_path: Optional[str] = None,
        use_supabase: Optional[bool] = None,
        auto_create_tables: bool = False,
    ):
        """Create the store and select a backend.

        Args:
            db_path: Explicit SQLite file path; when None the default
                <repo_root>/data/analytics.db is used (SQLite backend only).
            use_supabase: Force a backend choice; None auto-detects from the
                SUPABASE_URL / SUPABASE_SERVICE_KEY environment variables.
            auto_create_tables: Passed through to _init_supabase; triggers the
                fuller table verification that prints setup guidance.
        """
        self.use_supabase = use_supabase
        self._tables_verified = False
        self.supabase_client: Optional[Client] = None
        # Auto-detect: use Supabase only when both credentials are set AND
        # the supabase client library imported successfully.
        if self.use_supabase is None:
            supabase_url = os.getenv("SUPABASE_URL")
            supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
            self.use_supabase = bool(
                supabase_url and supabase_key and SUPABASE_AVAILABLE
            )
        if self.use_supabase:
            self._init_supabase(auto_create_tables)
        else:
            self._init_sqlite(db_path)
def _init_sqlite(self, db_path: Optional[str]):
"""Initialize SQLite database path + schema."""
if db_path is None:
root_dir = Path(__file__).resolve().parents[3]
data_dir = root_dir / "data"
data_dir.mkdir(parents=True, exist_ok=True)
self.db_path = data_dir / "analytics.db"
else:
self.db_path = Path(db_path)
self._init_db()
    def _init_supabase(self, auto_create_tables: bool):
        """Create the Supabase client and verify the analytics tables.

        On any failure (missing credentials, client construction error) the
        store downgrades itself to the SQLite backend instead of raising.

        Args:
            auto_create_tables: If True, run the fuller table check that
                prints SQL-script guidance; otherwise do a quick existence
                probe only.
        """
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
        if not supabase_url or not supabase_key:
            print("⚠️ Supabase credentials missing. Falling back to SQLite for analytics.")
            self.use_supabase = False
            self._init_sqlite(None)
            return
        try:
            self.supabase_client = create_client(supabase_url, supabase_key)
            # Logical event name -> actual Supabase/Postgres table name.
            self.table_names = {
                "tool_usage": "tool_usage_events",
                "redflags": "redflag_violations",
                "rag_search": "rag_search_events",
                "agent_query": "agent_query_events",
            }
            if auto_create_tables:
                self._ensure_supabase_tables()
            else:
                self._quick_table_check()
        except Exception as exc:  # pragma: no cover - defensive logging
            print(f"⚠️ Failed to initialize Supabase client for analytics: {exc}")
            self.use_supabase = False
            self._init_sqlite(None)
def _quick_table_check(self):
"""Verify that all expected Supabase tables exist."""
if not self.supabase_client:
return
try:
for table in self.table_names.values():
# PostgREST select throws if table missing
self.supabase_client.table(table).select("id").limit(1).execute()
self._tables_verified = True
except Exception:
self._tables_verified = False
    def _ensure_supabase_tables(self):
        """
        Best-effort table check. Actual table creation should be done by running
        supabase_analytics_tables.sql (mentioned in README + SUPABASE_SETUP).

        This method never creates tables itself; it only verifies existence
        and prints guidance pointing at the SQL script when verification
        fails.
        """
        self._quick_table_check()
        if not self._tables_verified:
            # NOTE(review): assumes this file lives 3 levels below the repo
            # root (backend/api/storage/) — confirm if the layout changes.
            sql_file = (
                Path(__file__).resolve().parents[3] / "supabase_analytics_tables.sql"
            )
            print("⚠️ Supabase analytics tables not verified.")
            if sql_file.exists():
                print(f" Run the SQL script: {sql_file}")
            else:
                print(" Missing supabase_analytics_tables.sql in repo root.")
def _init_db(self):
"""Initialize database tables for analytics."""
with sqlite3.connect(self.db_path) as conn:
# Tool usage events table
conn.execute(
"""
CREATE TABLE IF NOT EXISTS tool_usage_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tenant_id TEXT NOT NULL,
user_id TEXT,
tool_name TEXT NOT NULL,
timestamp INTEGER NOT NULL,
latency_ms INTEGER,
tokens_used INTEGER,
success BOOLEAN DEFAULT 1,
error_message TEXT,
metadata TEXT
)
"""
)
# Red-flag violations table
conn.execute(
"""
CREATE TABLE IF NOT EXISTS redflag_violations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tenant_id TEXT NOT NULL,
user_id TEXT,
rule_id TEXT NOT NULL,
rule_pattern TEXT,
severity TEXT NOT NULL,
matched_text TEXT,
confidence REAL,
message_preview TEXT,
timestamp INTEGER NOT NULL
)
"""
)
# RAG search events with quality metrics
conn.execute(
"""
CREATE TABLE IF NOT EXISTS rag_search_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tenant_id TEXT NOT NULL,
query TEXT NOT NULL,
hits_count INTEGER,
avg_score REAL,
top_score REAL,
timestamp INTEGER NOT NULL,
latency_ms INTEGER
)
"""
)
# Agent query events (overall query tracking)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS agent_query_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tenant_id TEXT NOT NULL,
user_id TEXT,
message_preview TEXT,
intent TEXT,
tools_used TEXT,
total_tokens INTEGER,
total_latency_ms INTEGER,
success BOOLEAN DEFAULT 1,
timestamp INTEGER NOT NULL
)
"""
)
# Create indexes separately (SQLite doesn't support inline INDEX in CREATE TABLE)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_tool_usage_tenant_timestamp
ON tool_usage_events(tenant_id, timestamp)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_redflag_tenant_timestamp
ON redflag_violations(tenant_id, timestamp)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_rag_search_tenant_timestamp
ON rag_search_events(tenant_id, timestamp)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_agent_query_tenant_timestamp
ON agent_query_events(tenant_id, timestamp)
"""
)
conn.commit()
    # ------------------------------------------------------------------
    # Logging helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _now_ts() -> int:
        """Current Unix timestamp, truncated to whole seconds."""
        return int(time.time())
def _serialize_tools(self, tools_used: Optional[List[str]]) -> Optional[str]:
if tools_used:
return json.dumps(tools_used)
return None
def _serialize_metadata(self, metadata: Optional[Dict[str, Any]]):
if metadata:
return json.dumps(metadata)
return None
    def _supabase_insert(self, table: str, payload: Dict[str, Any]):
        """Best-effort insert of one row into a Supabase table.

        Failures are printed and swallowed so analytics logging can never
        break the request path.  No-op when no client is configured.
        """
        if not self.supabase_client:
            return
        try:
            self.supabase_client.table(table).insert(payload).execute()
        except Exception as exc:  # pragma: no cover - logging only
            print(f"❌ Supabase insert failed for {table}: {exc}")
    def _supabase_simple_select(
        self,
        table: str,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
        limit: Optional[int] = None,
        order_desc: bool = False,
    ) -> List[Dict[str, Any]]:
        """Fetch rows for one tenant from a Supabase table.

        Args:
            table: Actual Supabase table name.
            tenant_id: Tenant whose rows to return.
            since_timestamp: Optional inclusive lower bound on ``timestamp``.
            limit: Optional maximum number of rows to return.
            order_desc: Sort newest-first when True.

        Returns:
            List of row dicts; empty when no client is configured.

        NOTE(review): unlike _supabase_insert, errors raised by ``execute()``
        are not caught here and will propagate to the caller — confirm that
        is intended.
        """
        if not self.supabase_client:
            return []
        query = (
            self.supabase_client.table(table)
            .select("*")
            .eq("tenant_id", tenant_id)
            .order("timestamp", desc=order_desc)
        )
        if since_timestamp is not None:
            query = query.gte("timestamp", since_timestamp)
        if limit is not None:
            query = query.limit(limit)
        response = query.execute()
        return response.data or []
    def _supabase_fetch_all(
        self,
        table: str,
        tenant_id: str,
        since_timestamp: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch all rows for a tenant (used for aggregations).

        Pages through the table in batches of 1000 using PostgREST
        ``range(start, end)`` (end index inclusive) until a short batch
        signals the last page.  Returns an empty list when no client is
        configured.
        """
        if not self.supabase_client:
            return []
        rows: List[Dict[str, Any]] = []
        start = 0
        batch_size = 1000
        while True:
            query = (
                self.supabase_client.table(table)
                .select("*")
                .eq("tenant_id", tenant_id)
                .order("timestamp", desc=False)
                .range(start, start + batch_size - 1)
            )
            if since_timestamp is not None:
                query = query.gte("timestamp", since_timestamp)
            response = query.execute()
            batch = response.data or []
            rows.extend(batch)
            # A batch shorter than batch_size means we reached the last page.
            if len(batch) < batch_size:
                break
            start += batch_size
        return rows
def log_tool_usage(
self,
tenant_id: str,
tool_name: str,
latency_ms: Optional[int] = None,
tokens_used: Optional[int] = None,
success: bool = True,
error_message: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
user_id: Optional[str] = None
):
"""Log a tool usage event."""
if self.use_supabase and self.supabase_client:
payload = {
"tenant_id": tenant_id,
"user_id": user_id,
"tool_name": tool_name,
"timestamp": self._now_ts(),
"latency_ms": latency_ms,
"tokens_used": tokens_used,
"success": success,
"error_message": error_message,
"metadata": metadata,
}
self._supabase_insert(self.table_names["tool_usage"], payload)
return
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO tool_usage_events
(tenant_id, user_id, tool_name, timestamp, latency_ms, tokens_used, success, error_message, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
tenant_id,
user_id,
tool_name,
self._now_ts(),
latency_ms,
tokens_used,
1 if success else 0,
error_message,
self._serialize_metadata(metadata),
),
)
conn.commit()
def log_redflag_violation(
self,
tenant_id: str,
rule_id: str,
rule_pattern: str,
severity: str,
matched_text: str,
confidence: Optional[float] = None,
message_preview: Optional[str] = None,
user_id: Optional[str] = None
):
"""Log a red-flag violation."""
truncated_message = message_preview[:200] if message_preview else None
if self.use_supabase and self.supabase_client:
payload = {
"tenant_id": tenant_id,
"user_id": user_id,
"rule_id": rule_id,
"rule_pattern": rule_pattern,
"severity": severity,
"matched_text": matched_text,
"confidence": confidence,
"message_preview": truncated_message,
"timestamp": self._now_ts(),
}
self._supabase_insert(self.table_names["redflags"], payload)
return
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO redflag_violations
(tenant_id, user_id, rule_id, rule_pattern, severity, matched_text, confidence, message_preview, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
tenant_id,
user_id,
rule_id,
rule_pattern,
severity,
matched_text,
confidence,
truncated_message,
self._now_ts(),
),
)
conn.commit()
def log_rag_search(
self,
tenant_id: str,
query: str,
hits_count: int,
avg_score: Optional[float] = None,
top_score: Optional[float] = None,
latency_ms: Optional[int] = None
):
"""Log a RAG search event with quality metrics."""
trimmed_query = query[:500]
if self.use_supabase and self.supabase_client:
payload = {
"tenant_id": tenant_id,
"query": trimmed_query,
"hits_count": hits_count,
"avg_score": avg_score,
"top_score": top_score,
"timestamp": self._now_ts(),
"latency_ms": latency_ms,
}
self._supabase_insert(self.table_names["rag_search"], payload)
return
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO rag_search_events
(tenant_id, query, hits_count, avg_score, top_score, timestamp, latency_ms)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
tenant_id,
trimmed_query,
hits_count,
avg_score,
top_score,
self._now_ts(),
latency_ms,
),
)
conn.commit()
def log_agent_query(
self,
tenant_id: str,
message_preview: str,
intent: Optional[str] = None,
tools_used: Optional[List[str]] = None,
total_tokens: Optional[int] = None,
total_latency_ms: Optional[int] = None,
success: bool = True,
user_id: Optional[str] = None
):
"""Log an agent query event (overall query tracking)."""
truncated_message = message_preview[:200]
serialized_tools = self._serialize_tools(tools_used)
if self.use_supabase and self.supabase_client:
payload = {
"tenant_id": tenant_id,
"user_id": user_id,
"message_preview": truncated_message,
"intent": intent,
"tools_used": tools_used,
"total_tokens": total_tokens,
"total_latency_ms": total_latency_ms,
"success": success,
"timestamp": self._now_ts(),
}
self._supabase_insert(self.table_names["agent_query"], payload)
return
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO agent_query_events
(tenant_id, user_id, message_preview, intent, tools_used, total_tokens, total_latency_ms, success, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
tenant_id,
user_id,
truncated_message,
intent,
serialized_tools,
total_tokens,
total_latency_ms,
1 if success else 0,
self._now_ts(),
),
)
conn.commit()
def get_tool_usage_stats(
self,
tenant_id: str,
since_timestamp: Optional[int] = None
) -> Dict[str, Any]:
"""Get tool usage statistics for a tenant."""
if self.use_supabase and self.supabase_client:
rows = self._supabase_fetch_all(
self.table_names["tool_usage"], tenant_id, since_timestamp
)
else:
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
query = """
SELECT
tool_name,
COUNT(*) as count,
AVG(latency_ms) as avg_latency_ms,
SUM(tokens_used) as total_tokens,
SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count
FROM tool_usage_events
WHERE tenant_id = ?
"""
params = [tenant_id]
if since_timestamp:
query += " AND timestamp >= ?"
params.append(since_timestamp)
query += " GROUP BY tool_name"
cursor = conn.execute(query, params)
rows = cursor.fetchall()
stats = {}
for row in rows:
tool_name = row["tool_name"]
stats[tool_name] = {
"count": row["count"],
"avg_latency_ms": round(row["avg_latency_ms"] or 0, 2),
"total_tokens": row["total_tokens"] or 0,
"success_count": row["success_count"],
"error_count": row["count"] - row["success_count"],
}
return stats
# Supabase aggregation (computed in Python)
stats: Dict[str, Dict[str, Any]] = {}
for row in rows:
tool_name = row.get("tool_name")
if not tool_name:
continue
entry = stats.setdefault(
tool_name,
{
"count": 0,
"avg_latency_ms": 0.0,
"total_tokens": 0,
"success_count": 0,
"error_count": 0,
},
)
entry["count"] += 1
latency = row.get("latency_ms") or 0
entry["avg_latency_ms"] += latency
entry["total_tokens"] += row.get("tokens_used") or 0
if row.get("success"):
entry["success_count"] += 1
else:
entry["error_count"] += 1
for tool_name, entry in stats.items():
if entry["count"]:
entry["avg_latency_ms"] = round(entry["avg_latency_ms"] / entry["count"], 2)
return stats
def get_redflag_violations(
self,
tenant_id: str,
limit: int = 50,
since_timestamp: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Get recent red-flag violations for a tenant."""
if self.use_supabase and self.supabase_client:
rows = self._supabase_simple_select(
self.table_names["redflags"],
tenant_id,
since_timestamp=since_timestamp,
limit=limit,
order_desc=True,
)
return rows
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
query = """
SELECT * FROM redflag_violations
WHERE tenant_id = ?
"""
params = [tenant_id]
if since_timestamp:
query += " AND timestamp >= ?"
params.append(since_timestamp)
query += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
cursor = conn.execute(query, params)
rows = cursor.fetchall()
return [dict(row) for row in rows]
    def get_activity_summary(
        self,
        tenant_id: str,
        since_timestamp: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get activity summary for a tenant.

        Returns:
            Dict with ``total_queries``, ``active_users``, ``redflag_count``
            and ``last_query`` (ISO-8601 string, or None when no queries).

        NOTE(review): the SQLite branch computes ``last_query`` over ALL
        time, while the Supabase branch derives it only from the
        ``since_timestamp``-filtered rows — confirm which behavior is
        intended.  Also, ``datetime.fromtimestamp`` yields a naive
        local-timezone datetime.
        """
        if self.use_supabase and self.supabase_client:
            queries = self._supabase_fetch_all(
                self.table_names["agent_query"], tenant_id, since_timestamp
            )
            redflags = self._supabase_fetch_all(
                self.table_names["redflags"], tenant_id, since_timestamp
            )
            total_queries = len(queries)
            # Unique non-null user_ids within the (optionally filtered) window.
            active_users = len({row["user_id"] for row in queries if row.get("user_id")})
            last_query_ts = max((row.get("timestamp") or 0) for row in queries) if queries else None
            redflag_count = len(redflags)
            return {
                "total_queries": total_queries,
                "active_users": active_users,
                "redflag_count": redflag_count,
                "last_query": datetime.fromtimestamp(last_query_ts).isoformat()
                if last_query_ts
                else None,
            }
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            # Total queries
            query = "SELECT COUNT(*) as total FROM agent_query_events WHERE tenant_id = ?"
            params = [tenant_id]
            if since_timestamp:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            total_queries = conn.execute(query, params).fetchone()["total"]
            # Active users (unique user_ids in the period)
            query = """
                SELECT COUNT(DISTINCT user_id) as active_users
                FROM agent_query_events
                WHERE tenant_id = ? AND user_id IS NOT NULL
            """
            params = [tenant_id]
            if since_timestamp:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            active_users = conn.execute(query, params).fetchone()["active_users"]
            # Last query timestamp (all time — deliberately not limited by
            # since_timestamp here; see NOTE in docstring)
            query = """
                SELECT MAX(timestamp) as last_query
                FROM agent_query_events
                WHERE tenant_id = ?
            """
            last_query_ts = conn.execute(query, [tenant_id]).fetchone()["last_query"]
            # Red-flag count
            query = "SELECT COUNT(*) as count FROM redflag_violations WHERE tenant_id = ?"
            params = [tenant_id]
            if since_timestamp:
                query += " AND timestamp >= ?"
                params.append(since_timestamp)
            redflag_count = conn.execute(query, params).fetchone()["count"]
            return {
                "total_queries": total_queries,
                "active_users": active_users or 0,
                "redflag_count": redflag_count,
                "last_query": datetime.fromtimestamp(last_query_ts).isoformat()
                if last_query_ts
                else None,
            }
def get_rag_quality_metrics(
self,
tenant_id: str,
since_timestamp: Optional[int] = None
) -> Dict[str, Any]:
"""Get RAG quality metrics (recall/precision indicators)."""
if self.use_supabase and self.supabase_client:
rows = self._supabase_fetch_all(
self.table_names["rag_search"], tenant_id, since_timestamp
)
if not rows:
return {
"total_searches": 0,
"avg_hits_per_search": 0,
"avg_score": 0,
"avg_top_score": 0,
"avg_latency_ms": 0,
}
total_searches = len(rows)
avg_hits = sum(row.get("hits_count") or 0 for row in rows) / total_searches
avg_avg_score = sum(row.get("avg_score") or 0 for row in rows) / total_searches
avg_top_score = sum(row.get("top_score") or 0 for row in rows) / total_searches
avg_latency = sum(row.get("latency_ms") or 0 for row in rows) / total_searches
return {
"total_searches": total_searches,
"avg_hits_per_search": round(avg_hits, 2),
"avg_score": round(avg_avg_score, 3),
"avg_top_score": round(avg_top_score, 3),
"avg_latency_ms": round(avg_latency, 2),
}
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
query = """
SELECT
COUNT(*) as total_searches,
AVG(hits_count) as avg_hits,
AVG(avg_score) as avg_avg_score,
AVG(top_score) as avg_top_score,
AVG(latency_ms) as avg_latency_ms
FROM rag_search_events
WHERE tenant_id = ?
"""
params = [tenant_id]
if since_timestamp:
query += " AND timestamp >= ?"
params.append(since_timestamp)
row = conn.execute(query, params).fetchone()
return {
"total_searches": row["total_searches"] or 0,
"avg_hits_per_search": round(row["avg_hits"] or 0, 2),
"avg_score": round(row["avg_avg_score"] or 0, 3),
"avg_top_score": round(row["avg_top_score"] or 0, 3),
"avg_latency_ms": round(row["avg_latency_ms"] or 0, 2),
}