Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| from fastapi import APIRouter, Header, HTTPException, Query, UploadFile, File | |
| from pydantic import BaseModel | |
| from typing import List, Optional, Dict, Any | |
| from datetime import datetime, timedelta | |
| from backend.api.storage.rules_store import RulesStore | |
| from backend.api.storage.analytics_store import AnalyticsStore | |
| from backend.api.services.rule_enhancer import RuleEnhancer | |
| from backend.api.services.document_ingestion import extract_text_from_file_bytes | |
| from ..utils.access_control import require_api_permission | |
# FastAPI router for the admin rule-management endpoints in this module.
router = APIRouter()
logger = logging.getLogger(__name__)
from dotenv import load_dotenv
# Load environment variables from a local .env file, if present, before the
# stores below read their configuration.
load_dotenv()
# Initialize stores (table creation disabled by default to avoid blocking startup)
rules_store = RulesStore(auto_create_table=False)
rule_enhancer = RuleEnhancer()
# The analytics backend is constructed lazily (see _get_analytics_store) so a
# missing/misconfigured Supabase setup cannot block module import.
_analytics_store: Optional[AnalyticsStore] = None
# Operators can disable analytics outright via the ANALYTICS_DISABLED env var.
_analytics_disabled = os.getenv("ANALYTICS_DISABLED", "").lower() in {"1", "true", "yes"}
# Latched to True after a failed init so we do not retry on every request.
_analytics_failed = False
def _get_analytics_store() -> Optional[AnalyticsStore]:
    """Lazily build and cache the module-wide AnalyticsStore.

    Returns None when analytics is disabled via ANALYTICS_DISABLED, or when a
    previous construction attempt failed (we never retry within this process).
    """
    global _analytics_store, _analytics_failed

    if _analytics_disabled or _analytics_failed:
        return None

    if _analytics_store is None:
        try:
            _analytics_store = AnalyticsStore()
        except RuntimeError as exc:
            # Expected when Supabase credentials are missing.
            logger.warning("Admin analytics disabled: %s", exc)
            _analytics_failed = True
            _analytics_store = None
        except Exception as exc:  # pragma: no cover - unexpected failures
            logger.debug("Admin analytics unexpected init failure: %s", exc)
            _analytics_failed = True
            _analytics_store = None

    return _analytics_store
def _get_analytics_or_503() -> AnalyticsStore:
    """Return the analytics store, or abort the request with HTTP 503."""
    analytics = _get_analytics_store()
    if not analytics:
        raise HTTPException(
            status_code=503,
            detail="Analytics is disabled or not configured (Supabase credentials missing).",
        )
    return analytics
def _log_backend_status_once() -> None:
    """Report which storage backends are active, at most once per process.

    The "already logged" latch lives on the function object itself so repeated
    imports/calls stay silent.
    """
    if getattr(_log_backend_status_once, "_already_logged", False):
        return

    messages = []
    if rules_store.use_supabase:
        messages.append("✅ RulesStore: Using Supabase backend")
    else:
        messages.append("⚠️ RulesStore: Using SQLite backend (set SUPABASE_URL + SUPABASE_SERVICE_KEY to use Supabase)")

    analytics = _get_analytics_store()
    if analytics is None:
        messages.append("⚠️ AnalyticsStore: Disabled (Supabase not configured)")
    elif analytics.use_supabase:
        messages.append("✅ AnalyticsStore: Using Supabase backend")
    else:
        messages.append("⚠️ AnalyticsStore: Using fallback backend")

    for message in messages:
        print(message)

    _log_backend_status_once._already_logged = True  # type: ignore[attr-defined]


# Emit the backend status report at import time.
_log_backend_status_once()
class RulePayload(BaseModel):
    """Request body for creating a single red-flag rule."""
    rule: str
    pattern: Optional[str] = None  # Regex pattern
    severity: Optional[str] = "medium"  # low, medium, high, critical
    description: Optional[str] = None
    enabled: Optional[bool] = True
class BulkRulePayload(BaseModel):
    """Request body for creating multiple red-flag rules in one call."""
    rules: List[str]
def get_rules_for_tenant(tenant_id: str) -> List[str]:
    """Return the list of rule strings stored for *tenant_id*."""
    return rules_store.get_rules(tenant_id)
async def get_redflag_rules(
    x_tenant_id: str = Header(None),
    detailed: bool = Query(False, description="Return full rule metadata including pattern and severity")
):
    """
    Returns all red-flag rules for this tenant.
    Set detailed=true to get full metadata including regex patterns and severity levels.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # The two modes differ only in what "rules" contains; the envelope is shared.
    if detailed:
        rules = rules_store.get_rules_detailed(x_tenant_id)
    else:
        rules = get_rules_for_tenant(x_tenant_id)

    return {
        "tenant_id": x_tenant_id,
        "rules": rules,
        "count": len(rules),
    }
async def add_redflag_rule(
    payload: Optional[RulePayload] = None,
    rule: Optional[str] = None,
    x_tenant_id: str = Header(None),
    enhance: bool = Query(True, description="Use LLM to enhance the rule before saving"),
    x_user_role: str = Header("viewer")
):
    """
    Adds a new red-flag rule to this tenant.

    Flow:
    1. Fetch existing rules for context
    2. Use LLM to analyze and enhance the rule (identify edge cases, improve pattern)
    3. Save enhanced rule to database

    Accepts either JSON body or query parameter ?rule=...
    JSON body supports: rule, pattern (regex), severity (low/medium/high/critical),
    description, enabled. Explicitly provided body fields override the LLM output.

    Raises 400 on a missing tenant header or empty rule text.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    require_api_permission(x_user_role, "manage_rules")

    rule_value = payload.rule if payload else rule
    if not rule_value:
        raise HTTPException(status_code=400, detail="Missing rule text")
    rule_value = rule_value.strip()
    if not rule_value:
        raise HTTPException(status_code=400, detail="Rule cannot be empty")

    # Step 1: Get existing rules for context
    existing_rules = rules_store.get_rules(x_tenant_id)

    # Step 2: Enhance rule using LLM (if enhance=True and LLM available)
    enhanced_data = None
    if enhance:
        try:
            enhanced_data = await rule_enhancer.enhance_rule(
                rule_value,
                existing_rules=existing_rules if existing_rules else None,
                context=payload.description if payload and payload.description else None
            )
        except Exception as e:
            # If enhancement fails, continue with the original rule rather than
            # failing the whole request.
            logger.warning("Rule enhancement failed: %s, using original rule", e)
            enhanced_data = None

    # Step 3: Use enhanced data if available, otherwise use provided values
    if enhanced_data:
        final_rule = enhanced_data["rule"]
        final_pattern = enhanced_data["pattern"]
        final_severity = enhanced_data["severity"]
        final_description = enhanced_data["description"]
        explanation = enhanced_data.get("explanation", "")
        examples = enhanced_data.get("examples", [])
        missing_patterns = enhanced_data.get("missing_patterns", [])
        edge_cases = enhanced_data.get("edge_cases", [])
        improvements = enhanced_data.get("improvements", [])
    else:
        # Use provided values or defaults
        final_rule = rule_value
        final_pattern = payload.pattern if payload else None
        final_severity = payload.severity if payload else "medium"
        final_description = payload.description if payload else None
        explanation = ""
        examples = []
        missing_patterns = []
        edge_cases = []
        improvements = []

    # Override with explicit values if provided (user has final say)
    if payload:
        # Only fields the client actually sent count as explicit overrides.
        # Without this check, the pydantic default severity ("medium") is
        # truthy and would silently clobber the LLM-suggested severity on
        # every JSON request.
        provided = (
            payload.model_fields_set  # pydantic v2
            if hasattr(payload, "model_fields_set")
            else payload.__fields_set__  # pydantic v1
        )
        if payload.pattern:
            final_pattern = payload.pattern
        if payload.severity and "severity" in provided:
            final_severity = payload.severity
        if payload.description:
            final_description = payload.description

    # Validate severity; fall back to "medium" for anything unrecognized.
    if final_severity not in ["low", "medium", "high", "critical"]:
        final_severity = "medium"
    enabled = payload.enabled if payload else True

    # Step 4: Save to database
    rules_store.add_rule(
        x_tenant_id,
        final_rule,
        pattern=final_pattern,
        severity=final_severity,
        description=final_description,
        enabled=enabled
    )

    rules = get_rules_for_tenant(x_tenant_id)
    return {
        "tenant_id": x_tenant_id,
        "original_rule": rule_value,
        "added_rule": final_rule,
        "pattern": final_pattern or final_rule,
        "severity": final_severity,
        "description": final_description or final_rule,
        "enhanced": enhanced_data is not None,
        "explanation": explanation,
        "examples": examples,
        "missing_patterns": missing_patterns,
        "edge_cases": edge_cases,
        "improvements": improvements,
        "rules": rules
    }
async def add_redflag_rules_bulk(
    payload: BulkRulePayload,
    x_tenant_id: str = Header(None),
    enhance: bool = Query(True, description="Use LLM to enhance rules before saving"),
    x_user_role: str = Header("viewer")
):
    """
    Adds multiple rules in one call.

    Flow:
    1. Fetch existing rules for context
    2. Use LLM to enhance each rule (identify edge cases, improve patterns)
    3. Save enhanced rules to database

    Raises 400 when the tenant header is missing, when no rules are supplied,
    or when every supplied line is blank or a comment.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    require_api_permission(x_user_role, "manage_rules")
    if not payload.rules:
        raise HTTPException(status_code=400, detail="No rules provided")

    # Filter out comment lines (starting with #) and empty lines
    cleaned = [
        rule.strip()
        for rule in payload.rules
        if rule.strip() and not rule.strip().startswith("#")
    ]
    # Previously an all-comment payload silently returned an empty success
    # response; reject it explicitly, matching the file-upload endpoint.
    if not cleaned:
        raise HTTPException(status_code=400, detail="No valid rules provided (after filtering comments)")

    # Step 1: Get existing rules for context
    existing_rules = rules_store.get_rules(x_tenant_id)

    # Step 2: Enhance rules using LLM (with chunk protection)
    enhanced_rules_data = []
    if enhance:
        try:
            # Process rules in chunks to avoid timeout
            # Frontend already chunks to 5, but backend adds extra safety
            CHUNK_SIZE = 5
            for i in range(0, len(cleaned), CHUNK_SIZE):
                chunk = cleaned[i:i + CHUNK_SIZE]
                try:
                    chunk_enhanced = await rule_enhancer.enhance_rules_bulk(
                        chunk,
                        existing_rules=existing_rules if existing_rules else None
                    )
                    enhanced_rules_data.extend(chunk_enhanced)
                except Exception as e:
                    logger.warning("Chunk %d enhancement failed: %s", i // CHUNK_SIZE + 1, e)
                    # Fall back to unenhanced metadata for this chunk only so
                    # positions stay aligned with `cleaned`.
                    for rule in chunk:
                        enhanced_rules_data.append({
                            "rule": rule,
                            "pattern": rule,
                            "description": rule,
                            "severity": "medium",
                            "edge_cases": [],
                            "improvements": ["Enhancement failed for this chunk"],
                            "keywords": []
                        })
        except Exception as e:
            logger.warning("Bulk rule enhancement failed: %s, using original rules", e)
            enhanced_rules_data = []

    # Step 3: Save enhanced rules to database
    added = []
    enhancement_summary = []
    for i, rule in enumerate(cleaned):
        try:
            if enhanced_rules_data and i < len(enhanced_rules_data):
                enhanced = enhanced_rules_data[i]
                success = rules_store.add_rule(
                    x_tenant_id,
                    enhanced["rule"],
                    pattern=enhanced["pattern"],
                    severity=enhanced["severity"],
                    description=enhanced["description"],
                    enabled=True
                )
                if success:
                    added.append(enhanced["rule"])
                    if enhanced.get("improvements") and "failed" not in str(enhanced.get("improvements", [])).lower():
                        enhancement_summary.append(f"{enhanced['rule']}: {', '.join(enhanced['improvements'][:2])}")
            else:
                # Fallback to original rule
                success = rules_store.add_rule(x_tenant_id, rule)
                if success:
                    added.append(rule)
        except Exception as e:
            # Continue with the next rule rather than aborting the batch.
            logger.warning("Error saving rule %d: %s", i + 1, e)
            continue

    rules = get_rules_for_tenant(x_tenant_id)
    return {
        "tenant_id": x_tenant_id,
        "added_rules": added,
        "enhanced": len(enhanced_rules_data) > 0,
        "enhancement_summary": enhancement_summary,
        "rules": rules
    }
async def delete_redflag_rule(
    rule: str,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Deletes a red-flag rule for this tenant.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    require_api_permission(x_user_role, "manage_rules")

    if not rules_store.delete_rule(x_tenant_id, rule):
        raise HTTPException(status_code=404, detail="Rule not found")

    return {
        "tenant_id": x_tenant_id,
        "deleted_rule": rule,
        "rules": get_rules_for_tenant(x_tenant_id),
    }
async def get_violations(
    x_tenant_id: str = Header(None),
    limit: int = Query(50, description="Maximum number of violations to return"),
    days: int = Query(30, description="Number of days to look back")
):
    """
    Returns red-flag violations for this tenant.
    Includes rule details, severity, confidence, and timestamps.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    since_timestamp = None
    if days:
        since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp())

    analytics = _get_analytics_or_503()
    violations = analytics.get_redflag_violations(x_tenant_id, limit, since_timestamp)

    # Attach a human-readable ISO-8601 timestamp alongside the raw epoch value.
    for entry in violations:
        if "timestamp" in entry:
            entry["timestamp_iso"] = datetime.fromtimestamp(entry["timestamp"]).isoformat()

    return {
        "tenant_id": x_tenant_id,
        "violations": violations,
        "count": len(violations),
        "period_days": days,
    }
async def get_tool_logs(
    x_tenant_id: str = Header(None),
    tool_name: Optional[str] = Query(None, description="Filter by tool name (rag, web, admin, llm)"),
    days: int = Query(7, description="Number of days to look back"),
    limit: int = Query(100, description="Maximum number of logs to return")
):
    """
    Returns detailed tool usage logs for this tenant.
    Includes every tool call with timestamp, latency, tokens, and success/error status.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # For now, return aggregated stats. Full log querying would require extending AnalyticsStore
    since_timestamp = None
    if days:
        since_timestamp = int((datetime.now() - timedelta(days=days)).timestamp())

    analytics = _get_analytics_or_503()
    tool_stats = analytics.get_tool_usage_stats(x_tenant_id, since_timestamp)

    # Filter down to a single tool when requested; unknown tools yield {}.
    if tool_name:
        if tool_name in tool_stats:
            tool_stats = {tool_name: tool_stats.get(tool_name)}
        else:
            tool_stats = {}

    return {
        "tenant_id": x_tenant_id,
        "tool_usage": tool_stats,
        "period_days": days,
    }
def _fallback_enhancement(rule: str) -> Dict[str, Any]:
    """Build the metadata dict used when LLM enhancement fails for a rule."""
    return {
        "rule": rule,
        "pattern": rule,
        "description": rule,
        "severity": "medium",
        "edge_cases": [],
        "improvements": ["Enhancement failed for this chunk"],
        "keywords": []
    }


async def _enhance_rules_in_chunks(rule_texts: List[str], existing_rules: List[str]) -> List[Dict[str, Any]]:
    """Enhance rules via the LLM in chunks of 5; fall back per-chunk on failure.

    Output order matches *rule_texts* (assuming enhance_rules_bulk returns one
    entry per input rule), so callers can zip the two by index.
    """
    enhanced: List[Dict[str, Any]] = []
    CHUNK_SIZE = 5  # keep each LLM call small to avoid timeouts
    for i in range(0, len(rule_texts), CHUNK_SIZE):
        chunk = rule_texts[i:i + CHUNK_SIZE]
        try:
            enhanced.extend(await rule_enhancer.enhance_rules_bulk(
                chunk,
                existing_rules=existing_rules if existing_rules else None
            ))
        except Exception as e:
            logger.warning("Chunk %d enhancement failed: %s", i // CHUNK_SIZE + 1, e)
            enhanced.extend(_fallback_enhancement(rule) for rule in chunk)
    return enhanced


def _save_rules(tenant_id: str, rule_texts: List[str], enhanced_rules_data: List[Dict[str, Any]]):
    """Persist each rule (with enhanced metadata when available).

    Returns (added, enhancement_summary); failures on individual rules are
    logged and skipped so the rest of the batch still saves.
    """
    added: List[str] = []
    enhancement_summary: List[str] = []
    for i, rule in enumerate(rule_texts):
        try:
            if enhanced_rules_data and i < len(enhanced_rules_data):
                enhanced = enhanced_rules_data[i]
                success = rules_store.add_rule(
                    tenant_id,
                    enhanced["rule"],
                    pattern=enhanced["pattern"],
                    severity=enhanced["severity"],
                    description=enhanced["description"],
                    enabled=True
                )
                if success:
                    added.append(enhanced["rule"])
                    if enhanced.get("improvements") and "failed" not in str(enhanced.get("improvements", [])).lower():
                        enhancement_summary.append(f"{enhanced['rule']}: {', '.join(enhanced['improvements'][:2])}")
            else:
                # No enhancement available: save the original rule text.
                success = rules_store.add_rule(tenant_id, rule)
                if success:
                    added.append(rule)
        except Exception as e:
            logger.warning("Error saving rule %d: %s", i + 1, e)
            continue
    return added, enhancement_summary


async def upload_rules_from_file(
    file: UploadFile = File(...),
    x_tenant_id: str = Header(None),
    enhance: bool = Query(True, description="Use LLM to enhance rules before saving"),
    x_user_role: str = Header("viewer")
):
    """
    Upload rules from a file (TXT, PDF, DOC, DOCX, MD).

    Extracts text from the file, parses one rule per non-comment line,
    optionally enhances them via LLM, and saves them for the tenant.

    Raises 400 for a missing tenant, unsupported/empty file, or no usable
    rules; unexpected failures surface as 500.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    require_api_permission(x_user_role, "manage_rules")
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    file_ext = file.filename.split('.')[-1].lower() if '.' in file.filename else ''
    if file_ext not in ['txt', 'pdf', 'doc', 'docx', 'md']:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type: {file_ext}. Supported: TXT, PDF, DOC, DOCX, MD"
        )

    try:
        # Read file bytes
        file_bytes = await file.read()
        if not file_bytes:
            raise HTTPException(status_code=400, detail="File is empty")

        # Extract text from file
        try:
            extracted_text = extract_text_from_file_bytes(file_bytes, file.filename)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

        if not extracted_text or not extracted_text.strip():
            raise HTTPException(status_code=400, detail="No text could be extracted from file")

        # Parse rules (filter comments and empty lines)
        rules = [
            line.strip()
            for line in extracted_text.splitlines()
            if line.strip() and not line.strip().startswith("#")
        ]
        if not rules:
            raise HTTPException(status_code=400, detail="No valid rules found in file (after filtering comments)")

        # Get existing rules for context
        existing_rules = rules_store.get_rules(x_tenant_id)

        # Enhance rules using LLM
        enhanced_rules_data: List[Dict[str, Any]] = []
        if enhance:
            try:
                enhanced_rules_data = await _enhance_rules_in_chunks(rules, existing_rules)
            except Exception as e:
                logger.warning("Bulk rule enhancement failed: %s, using original rules", e)
                enhanced_rules_data = []

        # Save rules to database
        added, enhancement_summary = _save_rules(x_tenant_id, rules, enhanced_rules_data)

        rules_list = get_rules_for_tenant(x_tenant_id)

        # Collect explanations, examples, and missing patterns from enhanced rules
        explanations_data = [
            {
                "rule": enhanced.get("rule", rules[i]),
                "explanation": enhanced.get("explanation", ""),
                "examples": enhanced.get("examples", []),
                "missing_patterns": enhanced.get("missing_patterns", [])
            }
            for i, enhanced in enumerate(enhanced_rules_data)
            if i < len(rules)
        ]

        return {
            "tenant_id": x_tenant_id,
            "filename": file.filename,
            "added_rules": added,
            "total_extracted": len(rules),
            "enhanced": len(enhanced_rules_data) > 0,
            "enhancement_summary": enhancement_summary,
            "explanations": explanations_data,
            "rules": rules_list
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}") from e
async def create_supabase_table(x_tenant_id: str = Header(None)):
    """
    Create the admin_rules table in Supabase if it doesn't exist.
    This endpoint can be called after startup to set up the table.
    Always returns a status dict; errors are reported, never raised.
    """
    try:
        if not rules_store.use_supabase:
            return {
                "status": "not_applicable",
                "message": "Using SQLite, not Supabase. Table creation not needed."
            }

        if rules_store.create_table_if_needed():
            return {
                "status": "success",
                "message": "Table created successfully",
                "table": "admin_rules"
            }

        return {
            "status": "manual_required",
            "message": "Automatic table creation failed. Please run SQL manually.",
            "instructions": "Go to Supabase Dashboard → SQL Editor → Run supabase_admin_rules_table.sql"
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }
async def list_tenants():
    """
    Lists all tenants.

    Placeholder: a real implementation would query a tenant-management table.
    For demo purposes this only reports that tenant data is keyed by header.
    """
    response = {
        "tenants": [],
        "message": "Tenant management not fully implemented. Use tenant_id in headers for multi-tenant operations."
    }
    return response
async def create_tenant(
    tenant_id: str,
    metadata: Optional[Dict[str, Any]] = None
):
    """
    Creates a new tenant.

    Placeholder: tenant records are not persisted anywhere yet; tenant IDs
    simply come into existence on first use.
    """
    response: Dict[str, Any] = {
        "tenant_id": tenant_id,
        "status": "created",
        "message": "Tenant management not fully implemented. Tenant IDs are created on first use.",
    }
    return response
async def delete_tenant(tenant_id: str):
    """
    Deletes a tenant and all associated data.

    Placeholder: nothing is actually removed yet. A real implementation would
    purge the tenant's rules, analytics, and documents — hence the warning.
    """
    return dict(
        tenant_id=tenant_id,
        status="deleted",
        message="Tenant deletion not fully implemented. This would delete all tenant data.",
    )