nothingworry's picture
feat: add RBAC enforcement for MCP tools and API endpoints
b65ef75
raw
history blame
5.08 kB
from fastapi import APIRouter, Header, HTTPException, Query
from typing import Optional
from datetime import datetime, timedelta
from ..storage.analytics_store import AnalyticsStore
from ..utils.access_control import require_api_permission
# Router that the analytics endpoints in this module register on via
# ``@router.get(...)``.
router = APIRouter()
# Module-level analytics store, constructed once at import time with no
# arguments and shared by every endpoint handler in this module.
analytics_store = AnalyticsStore()
@router.get("/overview")
async def analytics_overview(
    x_tenant_id: Optional[str] = Header(None),
    days: int = Query(30, description="Number of days to look back"),
    x_user_role: str = Header("viewer")
):
    """
    Return an overview of analytics for the dashboard.

    Combines the store's tool-usage stats, activity summary and RAG quality
    metrics for one tenant into a single payload.

    Args:
        x_tenant_id: Tenant identifier taken from the request header.
        days: Look-back window in days. A falsy value (0) disables the time
            filter, i.e. the store is queried with ``since_timestamp=None``.
        x_user_role: Caller role taken from the request header; defaults to
            "viewer" when the header is absent.

    Returns:
        A dict with "tenant_id" and an "overview" dict holding
        "total_queries", "tool_usage", "redflag_count", "active_users",
        "last_query" and "rag_quality".

    Raises:
        HTTPException: 400 when the tenant header is missing or empty, or
            when ``days`` is so large that the window start cannot be
            represented as a timestamp.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    # Permission check is delegated to the access-control helper; how it
    # rejects a role is defined there, not in this module.
    require_api_permission(x_user_role, "view_analytics")

    since_timestamp = None
    if days:
        # ``days`` comes straight from the query string and is unbounded, so
        # the date arithmetic can overflow; report that as a client error
        # instead of letting it surface as an HTTP 500.
        try:
            since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp())
        except (OverflowError, OSError) as exc:
            raise HTTPException(
                status_code=400,
                detail="Invalid 'days' value: look-back window is out of range",
            ) from exc

    tool_usage = analytics_store.get_tool_usage_stats(x_tenant_id, since_timestamp)
    activity = analytics_store.get_activity_summary(x_tenant_id, since_timestamp)
    rag_quality = analytics_store.get_rag_quality_metrics(x_tenant_id, since_timestamp)

    return {
        "tenant_id": x_tenant_id,
        "overview": {
            "total_queries": activity["total_queries"],
            "tool_usage": tool_usage,
            "redflag_count": activity["redflag_count"],
            "active_users": activity["active_users"],
            "last_query": activity["last_query"],
            "rag_quality": rag_quality
        }
    }
@router.get("/tool-usage")
async def analytics_tool_usage(
    x_tenant_id: Optional[str] = Header(None),
    days: int = Query(30, description="Number of days to look back"),
    x_user_role: str = Header("viewer")
):
    """
    Return per-tool usage statistics for one tenant.

    Args:
        x_tenant_id: Tenant identifier taken from the request header.
        days: Look-back window in days. A falsy value (0) disables the time
            filter, i.e. the store is queried with ``since_timestamp=None``.
        x_user_role: Caller role taken from the request header; defaults to
            "viewer" when the header is absent.

    Returns:
        A dict with "tenant_id", "tool_usage" (whatever the analytics store
        returns for this tenant and window) and "period_days" (the ``days``
        value that was requested).

    Raises:
        HTTPException: 400 when the tenant header is missing or empty, or
            when ``days`` is so large that the window start cannot be
            represented as a timestamp.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    # Permission check is delegated to the access-control helper.
    require_api_permission(x_user_role, "view_analytics")

    since_timestamp = None
    if days:
        # ``days`` is unbounded client input; overflow in the date arithmetic
        # is reported as a 400 rather than an unhandled 500.
        try:
            since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp())
        except (OverflowError, OSError) as exc:
            raise HTTPException(
                status_code=400,
                detail="Invalid 'days' value: look-back window is out of range",
            ) from exc

    tool_usage = analytics_store.get_tool_usage_stats(x_tenant_id, since_timestamp)

    return {
        "tenant_id": x_tenant_id,
        "tool_usage": tool_usage,
        "period_days": days
    }
@router.get("/redflags")
async def analytics_redflags(
    x_tenant_id: Optional[str] = Header(None),
    limit: int = Query(50, description="Maximum number of violations to return"),
    days: int = Query(30, description="Number of days to look back"),
    x_user_role: str = Header("viewer")
):
    """
    Return red-flag violations recorded for one tenant.

    Each violation that carries a usable "timestamp" value is annotated in
    place with a "timestamp_iso" string.

    Args:
        x_tenant_id: Tenant identifier taken from the request header.
        limit: Maximum number of violations, passed through to the store.
        days: Look-back window in days. A falsy value (0) disables the time
            filter, i.e. the store is queried with ``since_timestamp=None``.
        x_user_role: Caller role taken from the request header; defaults to
            "viewer" when the header is absent.

    Returns:
        A dict with "tenant_id", "redflags" (the store's records) and
        "count" (the number of records returned).

    Raises:
        HTTPException: 400 when the tenant header is missing or empty, or
            when ``days`` is so large that the window start cannot be
            represented as a timestamp.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    # Permission check is delegated to the access-control helper.
    require_api_permission(x_user_role, "view_analytics")

    since_timestamp = None
    if days:
        # ``days`` is unbounded client input; overflow in the date arithmetic
        # is reported as a 400 rather than an unhandled 500.
        try:
            since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp())
        except (OverflowError, OSError) as exc:
            raise HTTPException(
                status_code=400,
                detail="Invalid 'days' value: look-back window is out of range",
            ) from exc

    redflags = analytics_store.get_redflag_violations(x_tenant_id, limit, since_timestamp)

    # Convert timestamps to ISO format. A record whose "timestamp" is present
    # but None is skipped instead of crashing the whole response.
    # NOTE(review): fromtimestamp() without a tz argument yields naive
    # server-local time, so the ISO string carries no UTC offset.
    for violation in redflags:
        if "timestamp" in violation and violation["timestamp"] is not None:
            violation["timestamp_iso"] = datetime.fromtimestamp(violation["timestamp"]).isoformat()

    return {
        "tenant_id": x_tenant_id,
        "redflags": redflags,
        "count": len(redflags)
    }
@router.get("/activity")
async def analytics_activity(
    x_tenant_id: Optional[str] = Header(None),
    days: int = Query(30, description="Number of days to look back"),
    x_user_role: str = Header("viewer")
):
    """
    Return general activity statistics for one tenant.

    Args:
        x_tenant_id: Tenant identifier taken from the request header.
        days: Look-back window in days. A falsy value (0) disables the time
            filter, i.e. the store is queried with ``since_timestamp=None``.
        x_user_role: Caller role taken from the request header; defaults to
            "viewer" when the header is absent.

    Returns:
        A dict with "tenant_id", "activity" (the store's activity summary
        for this tenant and window) and "period_days" (the ``days`` value
        that was requested).

    Raises:
        HTTPException: 400 when the tenant header is missing or empty, or
            when ``days`` is so large that the window start cannot be
            represented as a timestamp.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    # Permission check is delegated to the access-control helper.
    require_api_permission(x_user_role, "view_analytics")

    since_timestamp = None
    if days:
        # ``days`` is unbounded client input; overflow in the date arithmetic
        # is reported as a 400 rather than an unhandled 500.
        try:
            since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp())
        except (OverflowError, OSError) as exc:
            raise HTTPException(
                status_code=400,
                detail="Invalid 'days' value: look-back window is out of range",
            ) from exc

    activity = analytics_store.get_activity_summary(x_tenant_id, since_timestamp)

    return {
        "tenant_id": x_tenant_id,
        "activity": activity,
        "period_days": days
    }
@router.get("/rag-quality")
async def analytics_rag_quality(
    x_tenant_id: Optional[str] = Header(None),
    days: int = Query(30, description="Number of days to look back"),
    x_user_role: str = Header("viewer")
):
    """
    Return RAG quality metrics for one tenant.

    Args:
        x_tenant_id: Tenant identifier taken from the request header.
        days: Look-back window in days. A falsy value (0) disables the time
            filter, i.e. the store is queried with ``since_timestamp=None``.
        x_user_role: Caller role taken from the request header; defaults to
            "viewer" when the header is absent.

    Returns:
        A dict with "tenant_id", "rag_quality" (the store's RAG quality
        metrics for this tenant and window) and "period_days" (the ``days``
        value that was requested).

    Raises:
        HTTPException: 400 when the tenant header is missing or empty, or
            when ``days`` is so large that the window start cannot be
            represented as a timestamp.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    # Permission check is delegated to the access-control helper.
    require_api_permission(x_user_role, "view_analytics")

    since_timestamp = None
    if days:
        # ``days`` is unbounded client input; overflow in the date arithmetic
        # is reported as a 400 rather than an unhandled 500.
        try:
            since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp())
        except (OverflowError, OSError) as exc:
            raise HTTPException(
                status_code=400,
                detail="Invalid 'days' value: look-back window is out of range",
            ) from exc

    rag_quality = analytics_store.get_rag_quality_metrics(x_tenant_id, since_timestamp)

    return {
        "tenant_id": x_tenant_id,
        "rag_quality": rag_quality,
        "period_days": days
    }