Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Header, HTTPException, Query | |
| from pydantic import BaseModel | |
| from typing import List, Optional, Dict, Any | |
| from datetime import datetime, timedelta | |
| from backend.api.storage.rules_store import RulesStore | |
| from backend.api.storage.analytics_store import AnalyticsStore | |
| router = APIRouter() | |
| rules_store = RulesStore() | |
| analytics_store = AnalyticsStore() | |
| class RulePayload(BaseModel): | |
| rule: str | |
| pattern: Optional[str] = None # Regex pattern | |
| severity: Optional[str] = "medium" # low, medium, high, critical | |
| description: Optional[str] = None | |
| enabled: Optional[bool] = True | |
| class BulkRulePayload(BaseModel): | |
| rules: List[str] | |
| def get_rules_for_tenant(tenant_id: str) -> List[str]: | |
| return rules_store.get_rules(tenant_id) | |
| async def get_redflag_rules( | |
| x_tenant_id: str = Header(None), | |
| detailed: bool = Query(False, description="Return full rule metadata including pattern and severity") | |
| ): | |
| """ | |
| Returns all red-flag rules for this tenant. | |
| Set detailed=true to get full metadata including regex patterns and severity levels. | |
| """ | |
| if not x_tenant_id: | |
| raise HTTPException(status_code=400, detail="Missing tenant ID") | |
| if detailed: | |
| rules = rules_store.get_rules_detailed(x_tenant_id) | |
| return { | |
| "tenant_id": x_tenant_id, | |
| "rules": rules, | |
| "count": len(rules) | |
| } | |
| else: | |
| rules = get_rules_for_tenant(x_tenant_id) | |
| return { | |
| "tenant_id": x_tenant_id, | |
| "rules": rules, | |
| "count": len(rules) | |
| } | |
| async def add_redflag_rule( | |
| payload: Optional[RulePayload] = None, | |
| rule: Optional[str] = None, | |
| x_tenant_id: str = Header(None) | |
| ): | |
| """ | |
| Adds a new red-flag rule to this tenant with optional regex pattern and severity. | |
| Accepts either JSON body or query parameter ?rule=... | |
| JSON body supports: rule, pattern (regex), severity (low/medium/high/critical), description, enabled | |
| """ | |
| if not x_tenant_id: | |
| raise HTTPException(status_code=400, detail="Missing tenant ID") | |
| rule_value = payload.rule if payload else rule | |
| if not rule_value: | |
| raise HTTPException(status_code=400, detail="Missing rule text") | |
| rule_value = rule_value.strip() | |
| if not rule_value: | |
| raise HTTPException(status_code=400, detail="Rule cannot be empty") | |
| # Extract optional parameters if payload provided | |
| pattern = payload.pattern if payload else None | |
| severity = payload.severity if payload else "medium" | |
| description = payload.description if payload else None | |
| enabled = payload.enabled if payload else True | |
| # Validate severity | |
| if severity not in ["low", "medium", "high", "critical"]: | |
| severity = "medium" | |
| rules_store.add_rule( | |
| x_tenant_id, | |
| rule_value, | |
| pattern=pattern, | |
| severity=severity, | |
| description=description, | |
| enabled=enabled | |
| ) | |
| rules = get_rules_for_tenant(x_tenant_id) | |
| return { | |
| "tenant_id": x_tenant_id, | |
| "added_rule": rule_value, | |
| "pattern": pattern or rule_value, | |
| "severity": severity, | |
| "description": description or rule_value, | |
| "rules": rules | |
| } | |
| async def add_redflag_rules_bulk( | |
| payload: BulkRulePayload, | |
| x_tenant_id: str = Header(None) | |
| ): | |
| """ | |
| Adds multiple rules in one call. | |
| """ | |
| if not x_tenant_id: | |
| raise HTTPException(status_code=400, detail="Missing tenant ID") | |
| if not payload.rules: | |
| raise HTTPException(status_code=400, detail="No rules provided") | |
| cleaned = [rule.strip() for rule in payload.rules if rule.strip()] | |
| added = rules_store.add_rules_bulk(x_tenant_id, cleaned) | |
| rules = get_rules_for_tenant(x_tenant_id) | |
| return { | |
| "tenant_id": x_tenant_id, | |
| "added_rules": added, | |
| "rules": rules | |
| } | |
| async def delete_redflag_rule( | |
| rule: str, | |
| x_tenant_id: str = Header(None) | |
| ): | |
| """ | |
| Deletes a red-flag rule for this tenant. | |
| """ | |
| if not x_tenant_id: | |
| raise HTTPException(status_code=400, detail="Missing tenant ID") | |
| deleted = rules_store.delete_rule(x_tenant_id, rule) | |
| if not deleted: | |
| raise HTTPException(status_code=404, detail="Rule not found") | |
| rules = get_rules_for_tenant(x_tenant_id) | |
| return { | |
| "tenant_id": x_tenant_id, | |
| "deleted_rule": rule, | |
| "rules": rules | |
| } | |
| async def get_violations( | |
| x_tenant_id: str = Header(None), | |
| limit: int = Query(50, description="Maximum number of violations to return"), | |
| days: int = Query(30, description="Number of days to look back") | |
| ): | |
| """ | |
| Returns red-flag violations for this tenant. | |
| Includes rule details, severity, confidence, and timestamps. | |
| """ | |
| if not x_tenant_id: | |
| raise HTTPException(status_code=400, detail="Missing tenant ID") | |
| since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp()) if days else None | |
| violations = analytics_store.get_redflag_violations(x_tenant_id, limit, since_timestamp) | |
| # Convert timestamps to ISO format | |
| for violation in violations: | |
| if "timestamp" in violation: | |
| violation["timestamp_iso"] = datetime.fromtimestamp(violation["timestamp"]).isoformat() | |
| return { | |
| "tenant_id": x_tenant_id, | |
| "violations": violations, | |
| "count": len(violations), | |
| "period_days": days | |
| } | |
| async def get_tool_logs( | |
| x_tenant_id: str = Header(None), | |
| tool_name: Optional[str] = Query(None, description="Filter by tool name (rag, web, admin, llm)"), | |
| days: int = Query(7, description="Number of days to look back"), | |
| limit: int = Query(100, description="Maximum number of logs to return") | |
| ): | |
| """ | |
| Returns detailed tool usage logs for this tenant. | |
| Includes every tool call with timestamp, latency, tokens, and success/error status. | |
| """ | |
| if not x_tenant_id: | |
| raise HTTPException(status_code=400, detail="Missing tenant ID") | |
| # For now, return aggregated stats. Full log querying would require extending AnalyticsStore | |
| since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp()) if days else None | |
| tool_stats = analytics_store.get_tool_usage_stats(x_tenant_id, since_timestamp) | |
| # Filter by tool if specified | |
| if tool_name: | |
| tool_stats = {tool_name: tool_stats.get(tool_name)} if tool_name in tool_stats else {} | |
| return { | |
| "tenant_id": x_tenant_id, | |
| "tool_usage": tool_stats, | |
| "period_days": days | |
| } | |
| async def list_tenants(): | |
| """ | |
| Lists all tenants (placeholder - would need tenant management table). | |
| For demo purposes, returns info about available tenant data. | |
| """ | |
| # Placeholder implementation - in production, this would query a tenants table | |
| return { | |
| "tenants": [], | |
| "message": "Tenant management not fully implemented. Use tenant_id in headers for multi-tenant operations." | |
| } | |
| async def create_tenant( | |
| tenant_id: str, | |
| metadata: Optional[Dict[str, Any]] = None | |
| ): | |
| """ | |
| Creates a new tenant (placeholder - would need tenant management table). | |
| """ | |
| # Placeholder implementation | |
| return { | |
| "tenant_id": tenant_id, | |
| "status": "created", | |
| "message": "Tenant management not fully implemented. Tenant IDs are created on first use." | |
| } | |
| async def delete_tenant(tenant_id: str): | |
| """ | |
| Deletes a tenant and all associated data (placeholder). | |
| WARNING: This would delete all rules, analytics, and documents for the tenant. | |
| """ | |
| # Placeholder implementation | |
| return { | |
| "tenant_id": tenant_id, | |
| "status": "deleted", | |
| "message": "Tenant deletion not fully implemented. This would delete all tenant data." | |
| } | |