IntegraChat / backend / api / storage / analytics_store.py
nothingworry's picture
feat: Add LLM rule explanations, real-time visualizations, and fix analytics permissions
adf80ee
raw
history blame
19.5 kB
"""
Analytics Store for tenant-level analytics logging
Tracks:
- Tool usage (RAG, Web, Admin, LLM)
- LLM token counts and latency
- RAG recall/precision indicators
- Red-flag violations
- Per-tenant query volume
"""
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
from supabase import Client, create_client
SUPABASE_AVAILABLE = True
except ImportError:
Client = None # type: ignore
SUPABASE_AVAILABLE = False
logger = logging.getLogger(__name__)
class AnalyticsStore:
"""
Analytics logging with Supabase-only backend.
- Requires SUPABASE_URL and SUPABASE_SERVICE_KEY to be configured
- All data is saved to Supabase (no SQLite fallback)
- Raises errors if Supabase is not available
"""
def __init__(
self,
db_path: Optional[str] = None,
use_supabase: Optional[bool] = None,
auto_create_tables: bool = False,
):
"""
Initialize AnalyticsStore with Supabase-only backend.
Args:
db_path: Ignored (kept for backward compatibility, not used)
use_supabase: Ignored (always uses Supabase if available)
auto_create_tables: If True, attempt to create tables if missing
Raises:
RuntimeError: If Supabase credentials are missing or invalid
"""
self._tables_verified = False
self.supabase_client: Optional[Client] = None
# Require Supabase - no fallback
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
if not SUPABASE_AVAILABLE:
raise RuntimeError(
"Supabase package not installed. Install with: pip install supabase\n"
"AnalyticsStore requires Supabase - SQLite fallback has been removed."
)
if not supabase_url or not supabase_key:
raise RuntimeError(
"Supabase credentials are required!\n"
"Set SUPABASE_URL and SUPABASE_SERVICE_KEY in your .env file.\n"
"AnalyticsStore requires Supabase - SQLite fallback has been removed."
)
self.use_supabase = True # Always True - no fallback
self._init_supabase(auto_create_tables)
def _init_supabase(self, auto_create_tables: bool):
"""Initialize Supabase client. Raises error if initialization fails."""
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
if not supabase_url or not supabase_key:
raise RuntimeError(
"Supabase credentials are required!\n"
"Set SUPABASE_URL and SUPABASE_SERVICE_KEY in your .env file."
)
try:
self.supabase_client = create_client(supabase_url, supabase_key)
self.table_names = {
"tool_usage": "tool_usage_events",
"redflags": "redflag_violations",
"rag_search": "rag_search_events",
"agent_query": "agent_query_events",
}
if auto_create_tables:
self._ensure_supabase_tables()
else:
self._quick_table_check()
if not self._tables_verified:
logger.warning(
"⚠️ Supabase analytics tables not verified. "
"Data inserts may fail. Run supabase_analytics_tables.sql in Supabase SQL Editor."
)
except Exception as exc:
logger.error(f"❌ Failed to initialize Supabase client for analytics: {exc}")
raise RuntimeError(
f"Failed to initialize Supabase client: {exc}\n"
"Make sure SUPABASE_URL and SUPABASE_SERVICE_KEY are correct.\n"
"AnalyticsStore requires Supabase - SQLite fallback has been removed."
) from exc
def _quick_table_check(self):
"""Verify that all expected Supabase tables exist."""
if not self.supabase_client:
return
try:
for table in self.table_names.values():
# PostgREST select throws if table missing
self.supabase_client.table(table).select("id").limit(1).execute()
self._tables_verified = True
except Exception:
self._tables_verified = False
def _ensure_supabase_tables(self):
"""
Best-effort table check. Actual table creation should be done by running
supabase_analytics_tables.sql (mentioned in README + SUPABASE_SETUP).
"""
self._quick_table_check()
if not self._tables_verified:
sql_file = (
Path(__file__).resolve().parents[3] / "supabase_analytics_tables.sql"
)
print("⚠️ Supabase analytics tables not verified.")
if sql_file.exists():
print(f" Run the SQL script: {sql_file}")
else:
print(" Missing supabase_analytics_tables.sql in repo root.")
# ------------------------------------------------------------------
# Logging helpers
# ------------------------------------------------------------------
@staticmethod
def _now_ts() -> int:
return int(time.time())
def _serialize_tools(self, tools_used: Optional[List[str]]) -> Optional[str]:
if tools_used:
return json.dumps(tools_used)
return None
def _serialize_metadata(self, metadata: Optional[Dict[str, Any]]):
if metadata:
return json.dumps(metadata)
return None
def _supabase_insert(self, table: str, payload: Dict[str, Any]):
"""Insert data into Supabase table. Raises error if insert fails."""
if not self.supabase_client:
raise RuntimeError(f"Supabase client not initialized. Cannot insert into {table}.")
# Check if tables are verified (warn if not)
if not self._tables_verified:
logger.warning(
f"Supabase tables not verified. Insert to {table} may fail. "
f"Run supabase_analytics_tables.sql in Supabase SQL Editor."
)
try:
response = self.supabase_client.table(table).insert(payload).execute()
logger.debug(f"Successfully inserted into {table}: {len(response.data) if response.data else 1} row(s)")
except Exception as exc:
error_msg = str(exc)
logger.error(
f"❌ Supabase insert failed for table '{table}': {error_msg}\n"
f" Payload keys: {list(payload.keys())}\n"
f" Tenant ID: {payload.get('tenant_id', 'N/A')}\n"
f" This may indicate:\n"
f" 1. Table '{table}' does not exist in Supabase\n"
f" 2. Missing columns or schema mismatch\n"
f" 3. RLS (Row Level Security) policy blocking insert\n"
f" 4. Invalid Supabase credentials\n"
f" Solution: Run supabase_analytics_tables.sql in Supabase SQL Editor"
)
# Re-raise - no SQLite fallback
raise RuntimeError(
f"Failed to insert into Supabase table '{table}': {error_msg}\n"
"AnalyticsStore requires Supabase - SQLite fallback has been removed."
) from exc
def _supabase_simple_select(
self,
table: str,
tenant_id: str,
since_timestamp: Optional[int] = None,
limit: Optional[int] = None,
order_desc: bool = False,
) -> List[Dict[str, Any]]:
if not self.supabase_client:
return []
query = (
self.supabase_client.table(table)
.select("*")
.eq("tenant_id", tenant_id)
.order("timestamp", desc=order_desc)
)
if since_timestamp is not None:
query = query.gte("timestamp", since_timestamp)
if limit is not None:
query = query.limit(limit)
response = query.execute()
return response.data or []
def _supabase_fetch_all(
self,
table: str,
tenant_id: str,
since_timestamp: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""Fetch all rows for a tenant (used for aggregations)."""
if not self.supabase_client:
return []
rows: List[Dict[str, Any]] = []
start = 0
batch_size = 1000
while True:
query = (
self.supabase_client.table(table)
.select("*")
.eq("tenant_id", tenant_id)
.order("timestamp", desc=False)
.range(start, start + batch_size - 1)
)
if since_timestamp is not None:
query = query.gte("timestamp", since_timestamp)
response = query.execute()
batch = response.data or []
rows.extend(batch)
if len(batch) < batch_size:
break
start += batch_size
return rows
def log_tool_usage(
self,
tenant_id: str,
tool_name: str,
latency_ms: Optional[int] = None,
tokens_used: Optional[int] = None,
success: bool = True,
error_message: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
user_id: Optional[str] = None
):
"""Log a tool usage event to Supabase."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot log tool usage.")
payload = {
"tenant_id": tenant_id,
"user_id": user_id,
"tool_name": tool_name,
"timestamp": self._now_ts(),
"latency_ms": latency_ms,
"tokens_used": tokens_used,
"success": success,
"error_message": error_message,
"metadata": metadata,
}
self._supabase_insert(self.table_names["tool_usage"], payload)
def log_redflag_violation(
self,
tenant_id: str,
rule_id: str,
rule_pattern: str,
severity: str,
matched_text: str,
confidence: Optional[float] = None,
message_preview: Optional[str] = None,
user_id: Optional[str] = None
):
"""Log a red-flag violation to Supabase."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot log redflag violation.")
truncated_message = message_preview[:200] if message_preview else None
payload = {
"tenant_id": tenant_id,
"user_id": user_id,
"rule_id": rule_id,
"rule_pattern": rule_pattern,
"severity": severity,
"matched_text": matched_text,
"confidence": confidence,
"message_preview": truncated_message,
"timestamp": self._now_ts(),
}
self._supabase_insert(self.table_names["redflags"], payload)
def log_rag_search(
self,
tenant_id: str,
query: str,
hits_count: int,
avg_score: Optional[float] = None,
top_score: Optional[float] = None,
latency_ms: Optional[int] = None
):
"""Log a RAG search event with quality metrics to Supabase."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot log RAG search.")
trimmed_query = query[:500]
payload = {
"tenant_id": tenant_id,
"query": trimmed_query,
"hits_count": hits_count,
"avg_score": avg_score,
"top_score": top_score,
"timestamp": self._now_ts(),
"latency_ms": latency_ms,
}
self._supabase_insert(self.table_names["rag_search"], payload)
def log_agent_query(
self,
tenant_id: str,
message_preview: str,
intent: Optional[str] = None,
tools_used: Optional[List[str]] = None,
total_tokens: Optional[int] = None,
total_latency_ms: Optional[int] = None,
success: bool = True,
user_id: Optional[str] = None
):
"""Log an agent query event (overall query tracking) to Supabase."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot log agent query.")
truncated_message = message_preview[:200]
payload = {
"tenant_id": tenant_id,
"user_id": user_id,
"message_preview": truncated_message,
"intent": intent,
"tools_used": tools_used,
"total_tokens": total_tokens,
"total_latency_ms": total_latency_ms,
"success": success,
"timestamp": self._now_ts(),
}
self._supabase_insert(self.table_names["agent_query"], payload)
def get_tool_usage_stats(
self,
tenant_id: str,
since_timestamp: Optional[int] = None
) -> Dict[str, Any]:
"""Get tool usage statistics for a tenant from Supabase."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot get tool usage stats.")
rows = self._supabase_fetch_all(
self.table_names["tool_usage"], tenant_id, since_timestamp
)
# Supabase aggregation (computed in Python)
stats: Dict[str, Dict[str, Any]] = {}
for row in rows:
tool_name = row.get("tool_name")
if not tool_name:
continue
entry = stats.setdefault(
tool_name,
{
"count": 0,
"avg_latency_ms": 0.0,
"total_tokens": 0,
"success_count": 0,
"error_count": 0,
},
)
entry["count"] += 1
latency = row.get("latency_ms") or 0
entry["avg_latency_ms"] += latency
entry["total_tokens"] += row.get("tokens_used") or 0
if row.get("success"):
entry["success_count"] += 1
else:
entry["error_count"] += 1
for tool_name, entry in stats.items():
if entry["count"]:
entry["avg_latency_ms"] = round(entry["avg_latency_ms"] / entry["count"], 2)
return stats
def get_redflag_violations(
self,
tenant_id: str,
limit: int = 50,
since_timestamp: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Get recent red-flag violations for a tenant from Supabase."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot get redflag violations.")
return self._supabase_simple_select(
self.table_names["redflags"],
tenant_id,
since_timestamp=since_timestamp,
limit=limit,
order_desc=True,
)
def get_activity_summary(
self,
tenant_id: str,
since_timestamp: Optional[int] = None
) -> Dict[str, Any]:
"""Get activity summary for a tenant from Supabase."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot get activity summary.")
queries = self._supabase_fetch_all(
self.table_names["agent_query"], tenant_id, since_timestamp
)
redflags = self._supabase_fetch_all(
self.table_names["redflags"], tenant_id, since_timestamp
)
total_queries = len(queries)
active_users = len({row["user_id"] for row in queries if row.get("user_id")})
last_query_ts = max((row.get("timestamp") or 0) for row in queries) if queries else None
redflag_count = len(redflags)
return {
"total_queries": total_queries,
"active_users": active_users,
"redflag_count": redflag_count,
"last_query": datetime.fromtimestamp(last_query_ts).isoformat()
if last_query_ts
else None,
}
def get_activity_records(
self,
tenant_id: str,
since_timestamp: Optional[int] = None,
limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Get individual activity records (agent queries) with timestamps for heatmap visualization."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot get activity records.")
# Fetch agent query events (these represent user queries/activity)
queries = self._supabase_simple_select(
self.table_names["agent_query"],
tenant_id,
since_timestamp=since_timestamp,
limit=limit,
order_desc=False, # Chronological order for heatmap
)
# Format records for frontend consumption
activities = []
for query in queries:
timestamp = query.get("timestamp")
if timestamp:
# Convert timestamp to ISO format if it's a Unix timestamp
if isinstance(timestamp, (int, float)):
timestamp_iso = datetime.fromtimestamp(timestamp).isoformat()
else:
timestamp_iso = str(timestamp)
activities.append({
"timestamp": timestamp_iso,
"created_at": timestamp_iso, # Alias for compatibility
"type": "query",
"user_id": query.get("user_id"),
"message_preview": query.get("message_preview", "")[:100],
})
return activities
def get_rag_quality_metrics(
self,
tenant_id: str,
since_timestamp: Optional[int] = None
) -> Dict[str, Any]:
"""Get RAG quality metrics (recall/precision indicators) from Supabase."""
if not self.supabase_client:
raise RuntimeError("Supabase client not initialized. Cannot get RAG quality metrics.")
rows = self._supabase_fetch_all(
self.table_names["rag_search"], tenant_id, since_timestamp
)
if not rows:
return {
"total_searches": 0,
"avg_hits_per_search": 0,
"avg_score": 0,
"avg_top_score": 0,
"avg_latency_ms": 0,
}
total_searches = len(rows)
avg_hits = sum(row.get("hits_count") or 0 for row in rows) / total_searches
avg_avg_score = sum(row.get("avg_score") or 0 for row in rows) / total_searches
avg_top_score = sum(row.get("top_score") or 0 for row in rows) / total_searches
avg_latency = sum(row.get("latency_ms") or 0 for row in rows) / total_searches
return {
"total_searches": total_searches,
"avg_hits_per_search": round(avg_hits, 2),
"avg_score": round(avg_avg_score, 3),
"avg_top_score": round(avg_top_score, 3),
"avg_latency_ms": round(avg_latency, 2),
}