"""
Admin UI Backend Endpoints
Administrative controls for system oversight and review management

Features:
- Review queue management
- System configuration
- User management (placeholder)
- Performance monitoring dashboard
- Compliance reporting interface
- Model versioning controls

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""

from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional

from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel

from monitoring_service import get_monitoring_service
from model_versioning import get_versioning_system
from production_logging import get_medical_logger
from compliance_reporting import get_compliance_system

# Create admin router
admin_router = APIRouter(prefix="/admin", tags=["admin"])


# ================================
# REQUEST/RESPONSE MODELS
# ================================

class ReviewQueueItem(BaseModel):
    """A single document waiting in the manual review queue."""
    item_id: str
    document_id: str
    document_type: str
    confidence_score: float
    risk_level: str
    created_at: str
    assigned_to: Optional[str] = None  # reviewer id, None while unassigned
    priority: str  # "critical", "high", "medium", "low"


class ReviewAction(BaseModel):
    """Request body for acting on a review queue item."""
    item_id: str
    reviewer_id: str
    action: str  # "approve", "reject", "escalate"
    comments: Optional[str] = None


class SystemConfiguration(BaseModel):
    """Tunable system settings exposed through the admin API."""
    error_threshold: float = 0.05
    cache_size_mb: int = 1000
    cache_ttl_hours: int = 24
    alert_email: Optional[str] = None


class ModelDeployment(BaseModel):
    """Request body for deploying a model version."""
    model_id: str
    version: str
    set_active: bool = False


# ================================
# REVIEW QUEUE ENDPOINTS
# ================================

# In-memory review queue (in production, use database)
review_queue: List[ReviewQueueItem] = []

# Priority labels reported in queue summaries, in display order.
_PRIORITY_LEVELS = ("critical", "high", "medium", "low")

# Accepted review actions mapped to the past-tense word used in the response
# message. Appending "d" to the action only works for "approve"/"escalate"
# and would produce "rejectd" for "reject".
_REVIEW_ACTION_PAST_TENSE = {
    "approve": "approved",
    "reject": "rejected",
    "escalate": "escalated",
}


@admin_router.get("/review-queue")
async def get_review_queue(
    priority: Optional[str] = None,
    status: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get current review queue

    Args:
        priority: When given, only items with exactly this priority are listed.
        status: Accepted for API compatibility but currently ignored;
            ReviewQueueItem carries no status field to filter on.

    Returns:
        Total and filtered item counts, the filtered items as dicts, and a
        per-priority summary computed over the whole (unfiltered) queue.
    """
    filtered_queue = review_queue
    if priority:
        filtered_queue = [item for item in filtered_queue if item.priority == priority]

    # One pass over the queue; Counter yields 0 for priorities with no items.
    priority_counts = Counter(item.priority for item in review_queue)

    return {
        "total_items": len(review_queue),
        "filtered_items": len(filtered_queue),
        "queue": [item.dict() for item in filtered_queue],
        "summary": {level: priority_counts[level] for level in _PRIORITY_LEVELS}
    }


@admin_router.post("/review-queue/action")
async def submit_review_action(action: ReviewAction) -> Dict[str, Any]:
    """
    Submit review action (approve/reject/escalate)

    The action is written to the medical logger and the compliance audit
    trail. Approved and rejected items are removed from the queue; escalated
    items stay in it.

    Raises:
        HTTPException: 404 if the item is not in the queue, 400 if the action
            is not one of "approve", "reject" or "escalate".
    """
    # Find item in queue
    item = next((i for i in review_queue if i.item_id == action.item_id), None)
    if not item:
        raise HTTPException(status_code=404, detail="Review item not found")

    # Reject unknown actions before anything is written to the audit trail.
    past_tense = _REVIEW_ACTION_PAST_TENSE.get(action.action)
    if past_tense is None:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid review action: {action.action}"
        )

    # Log review action
    logger = get_medical_logger()
    logger.info(
        f"Review action: {action.action} on {action.item_id}",
        user_id=action.reviewer_id,
        document_id=item.document_id,
        details={"action": action.action, "comments": action.comments}
    )

    # Log to compliance system
    compliance = get_compliance_system()
    compliance.log_audit_event(
        user_id=action.reviewer_id,
        event_type="REVIEW",
        resource=f"document:{item.document_id}",
        action=action.action.upper(),
        ip_address="internal",
        details={"item_id": action.item_id, "comments": action.comments}
    )

    # Remove from queue if approved or rejected
    if action.action in ["approve", "reject"]:
        review_queue.remove(item)

    return {
        "success": True,
        "action": action.action,
        "item_id": action.item_id,
        "message": f"Review {past_tense} successfully"
    }


@admin_router.post("/review-queue/assign")
async def assign_review(
    item_id: str,
    reviewer_id: str
) -> Dict[str, Any]:
    """
    Assign review to a reviewer

    Raises:
        HTTPException: 404 if no queue item has the given item_id.
    """
    item = next((i for i in review_queue if i.item_id == item_id), None)
    if not item:
        raise HTTPException(status_code=404, detail="Review item not found")

    item.assigned_to = reviewer_id

    return {
        "success": True,
        "item_id": item_id,
        "assigned_to": reviewer_id
    }


# ================================
# MONITORING DASHBOARD ENDPOINTS
# ================================

@admin_router.get("/dashboard")
async def get_admin_dashboard() -> Dict[str, Any]:
    """
    Get comprehensive admin dashboard data

    Combines the monitoring, versioning and compliance dashboards with a
    short summary of the in-memory review queue.
    """
    monitoring = get_monitoring_service()
    versioning = get_versioning_system()
    compliance = get_compliance_system()

    return {
        "timestamp": datetime.utcnow().isoformat(),
        "system_health": monitoring.get_system_health(),
        "performance_dashboard": monitoring.get_performance_dashboard(),
        "model_inventory": versioning.get_system_status(),
        "compliance_dashboard": compliance.get_compliance_dashboard(),
        "review_queue_summary": {
            "total_items": len(review_queue),
            "critical_items": sum(1 for i in review_queue if i.priority == "critical"),
            "unassigned_items": sum(1 for i in review_queue if not i.assigned_to)
        }
    }


@admin_router.get("/metrics/performance")
async def get_performance_metrics(
    window_minutes: int = 60
) -> Dict[str, Any]:
    """
    Get detailed performance metrics

    Args:
        window_minutes: Window passed to the latency tracker for each stage.
    """
    monitoring = get_monitoring_service()

    # Get statistics for key stages
    stages = ["pdf_processing", "classification", "model_routing", "synthesis"]
    performance_data = {
        stage: monitoring.latency_tracker.get_stage_statistics(stage, window_minutes)
        for stage in stages
    }

    error_summary = monitoring.error_monitor.get_error_summary()

    return {
        "window_minutes": window_minutes,
        "latency_by_stage": performance_data,
        "error_summary": error_summary,
        "timestamp": datetime.utcnow().isoformat()
    }


@admin_router.get("/metrics/cache")
async def get_cache_metrics() -> Dict[str, Any]:
    """Get statistics and recommendations for the versioning input cache"""
    versioning = get_versioning_system()
    cache_stats = versioning.input_cache.get_statistics()

    return {
        "cache_statistics": cache_stats,
        "recommendations": _generate_cache_recommendations(cache_stats),
        "timestamp": datetime.utcnow().isoformat()
    }


# ================================
# MODEL MANAGEMENT ENDPOINTS
# ================================

@admin_router.get("/models/inventory")
async def get_model_inventory() -> Dict[str, Any]:
    """Get complete model inventory with model and version totals"""
    versioning = get_versioning_system()
    inventory = versioning.model_registry.get_model_inventory()

    return {
        "inventory": inventory,
        "summary": {
            "total_models": len(inventory),
            "total_versions": sum(data["total_versions"] for data in inventory.values())
        },
        "timestamp": datetime.utcnow().isoformat()
    }


@admin_router.post("/models/deploy")
async def deploy_model_version(deployment: ModelDeployment) -> Dict[str, Any]:
    """
    Deploy a model version

    Marks the version active when `set_active` is true, then invalidates
    cached entries for that version.

    Raises:
        HTTPException: 400 carrying the underlying error message if the
            registry or cache call fails.
    """
    versioning = get_versioning_system()

    try:
        if deployment.set_active:
            versioning.model_registry.set_active_version(
                deployment.model_id,
                deployment.version
            )

        # Invalidate cache for this model
        # NOTE(review): runs whether or not the version was activated,
        # mirroring rollback_model - confirm this is the intended scope.
        versioning.input_cache.invalidate_model_version(deployment.version)

        return {
            "success": True,
            "model_id": deployment.model_id,
            "version": deployment.version,
            "active": deployment.set_active,
            "message": f"Model {deployment.model_id} v{deployment.version} deployed"
        }
    except Exception as exc:
        # Keep the original cause attached for server-side tracebacks.
        raise HTTPException(status_code=400, detail=str(exc)) from exc


@admin_router.post("/models/rollback")
async def rollback_model(
    model_id: str,
    version: str
) -> Dict[str, Any]:
    """
    Rollback to a previous model version

    Raises:
        HTTPException: 404 if the registry reports the rollback as
            unsuccessful.
    """
    versioning = get_versioning_system()

    success = versioning.model_registry.rollback_to_version(model_id, version)
    if not success:
        raise HTTPException(status_code=404, detail="Model version not found")

    # Invalidate cache
    versioning.input_cache.invalidate_model_version(version)

    return {
        "success": True,
        "model_id": model_id,
        "rolled_back_to": version,
        "message": f"Rolled back {model_id} to v{version}"
    }


@admin_router.get("/models/compare")
async def compare_model_versions(
    model_id: str,
    version1: str,
    version2: str,
    metric: str = "accuracy"
) -> Dict[str, Any]:
    """Compare two versions of a model on the given metric"""
    versioning = get_versioning_system()
    return versioning.model_registry.compare_versions(
        model_id, version1, version2, metric
    )


# ================================
# COMPLIANCE ENDPOINTS
# ================================

@admin_router.get("/compliance/hipaa-report")
async def get_hipaa_report(
    days: int = 30
) -> Dict[str, Any]:
    """Generate HIPAA compliance report for the last `days` days"""
    compliance = get_compliance_system()

    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=days)

    return compliance.generate_hipaa_report(start_date, end_date)


@admin_router.get("/compliance/gdpr-report")
async def get_gdpr_report(
    days: int = 30
) -> Dict[str, Any]:
    """Generate GDPR compliance report for the last `days` days"""
    compliance = get_compliance_system()

    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=days)

    return compliance.generate_gdpr_report(start_date, end_date)


@admin_router.get("/compliance/quality-metrics")
async def get_quality_metrics(
    days: int = 30
) -> Dict[str, Any]:
    """Get clinical quality metrics"""
    compliance = get_compliance_system()
    return compliance.generate_quality_metrics_report(days)


@admin_router.get("/compliance/security-incidents")
async def get_security_incidents(
    days: int = 30
) -> Dict[str, Any]:
    """Get security incidents report"""
    compliance = get_compliance_system()
    return compliance.generate_security_incidents_report(days)


# ================================
# SYSTEM CONFIGURATION ENDPOINTS
# ================================

# In-memory configuration (in production, use database)
system_config = SystemConfiguration()


@admin_router.get("/config")
async def get_system_configuration() -> SystemConfiguration:
    """Get current system configuration"""
    return system_config


@admin_router.post("/config")
async def update_system_configuration(
    config: SystemConfiguration
) -> Dict[str, Any]:
    """Replace the in-memory system configuration and log the new values"""
    global system_config
    system_config = config

    logger = get_medical_logger()
    logger.info(
        "System configuration updated",
        details=config.dict()
    )

    return {
        "success": True,
        "config": config.dict(),
        "message": "System configuration updated"
    }


# ================================
# ALERTS MANAGEMENT
# ================================

@admin_router.get("/alerts")
async def get_active_alerts(
    level: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get active system alerts

    Args:
        level: Optional alert level name, matched case-insensitively by
            upper-casing it before the AlertLevel lookup.

    Raises:
        HTTPException: 400 if `level` is not a valid AlertLevel value.
    """
    monitoring = get_monitoring_service()

    from monitoring_service import AlertLevel

    alert_level = None
    if level:
        try:
            alert_level = AlertLevel(level.upper())
        except ValueError as exc:
            # An unknown level is a client error, not a server failure.
            raise HTTPException(
                status_code=400,
                detail=f"Invalid alert level: {level}"
            ) from exc

    alerts = monitoring.alert_manager.get_active_alerts(level=alert_level)
    summary = monitoring.alert_manager.get_alert_summary()

    return {
        "active_alerts": [a.to_dict() for a in alerts],
        "summary": summary,
        "timestamp": datetime.utcnow().isoformat()
    }


@admin_router.post("/alerts/{alert_id}/resolve")
async def resolve_alert(alert_id: str) -> Dict[str, Any]:
    """Resolve an active alert"""
    monitoring = get_monitoring_service()
    monitoring.alert_manager.resolve_alert(alert_id)

    return {
        "success": True,
        "alert_id": alert_id,
        "message": "Alert resolved"
    }


# ================================
# CACHE MANAGEMENT ENDPOINTS
# ================================

@admin_router.get("/cache/statistics")
async def get_cache_statistics() -> Dict[str, Any]:
    """
    Get comprehensive cache statistics

    Returns cache performance metrics including:
    - Hit/miss rates
    - Memory usage
    - Entry count
    - Eviction statistics
    """
    monitoring = get_monitoring_service()
    cache_stats = monitoring.get_cache_statistics()

    return {
        "statistics": cache_stats,
        "recommendations": _generate_cache_recommendations_v2(cache_stats),
        "timestamp": datetime.utcnow().isoformat()
    }


@admin_router.get("/cache/entries")
async def list_cache_entries(limit: int = 100) -> Dict[str, Any]:
    """
    List cache entries with metadata

    Args:
        limit: Maximum number of entries to return (default: 100)
    """
    monitoring = get_monitoring_service()
    entries = monitoring.cache_service.list_entries(limit=limit)

    return {
        "entries": entries,
        "total_shown": len(entries),
        "timestamp": datetime.utcnow().isoformat()
    }


@admin_router.get("/cache/entry/{key}")
async def get_cache_entry_info(key: str) -> Dict[str, Any]:
    """
    Get detailed information about a specific cache entry

    Args:
        key: Cache key (SHA256 fingerprint)

    Raises:
        HTTPException: 404 if the cache service has no entry for the key.
    """
    monitoring = get_monitoring_service()
    entry_info = monitoring.cache_service.get_entry_info(key)

    if entry_info is None:
        raise HTTPException(status_code=404, detail="Cache entry not found")

    return entry_info


@admin_router.post("/cache/invalidate/{key}")
async def invalidate_cache_entry(key: str) -> Dict[str, Any]:
    """
    Invalidate a specific cache entry

    Args:
        key: Cache key (SHA256 fingerprint)

    Raises:
        HTTPException: 404 if the cache service reports nothing invalidated.
    """
    monitoring = get_monitoring_service()
    success = monitoring.cache_service.invalidate(key)

    if not success:
        raise HTTPException(status_code=404, detail="Cache entry not found")

    return {
        "success": True,
        "key": key,
        "message": "Cache entry invalidated"
    }


@admin_router.post("/cache/clear")
async def clear_cache() -> Dict[str, Any]:
    """
    Clear all cache entries

    Clears both the versioning input cache and the monitoring cache service.

    WARNING: This will clear all cached data and may temporarily impact performance
    """
    # A single handler owns this path: a second handler registered on the
    # same method and path would never be reached, because the router
    # dispatches to the first matching route.
    versioning = get_versioning_system()
    versioning.input_cache.clear()

    monitoring = get_monitoring_service()
    monitoring.cache_service.clear()

    return {
        "success": True,
        "message": "Cache cleared successfully",
        "timestamp": datetime.utcnow().isoformat()
    }


# ================================
# HELPER FUNCTIONS
# ================================

def _generate_cache_recommendations_v2(stats: Dict[str, Any]) -> List[str]:
    """
    Generate cache optimization recommendations based on statistics

    Args:
        stats: Cache statistics. Reads "hit_rate" (a 0-1 fraction),
            "memory_usage_mb", "max_memory_mb", "evictions" and
            "total_entries"; missing keys fall back to defaults.

    Returns:
        Human-readable recommendations; never empty.
    """
    recommendations = []

    hit_rate = stats.get("hit_rate", 0.0)
    memory_usage = stats.get("memory_usage_mb", 0.0)
    max_memory = stats.get("max_memory_mb", 512)
    evictions = stats.get("evictions", 0)
    total_entries = stats.get("total_entries", 0)

    # Hit rate recommendations
    if hit_rate < 0.5:
        recommendations.append(f"Low cache hit rate ({hit_rate*100:.1f}%). Consider increasing cache size or TTL.")
    elif hit_rate > 0.8:
        recommendations.append(f"Excellent cache hit rate ({hit_rate*100:.1f}%). Cache performing optimally.")

    # Memory recommendations (guard against a zero or negative capacity)
    utilization = (memory_usage / max_memory) * 100 if max_memory > 0 else 0
    if utilization > 90:
        recommendations.append(f"Cache near capacity ({utilization:.1f}% used). Consider increasing max cache size.")

    # Eviction recommendations
    if total_entries > 0 and evictions > total_entries * 0.1:
        recommendations.append(f"High eviction rate ({evictions} evictions). Increase cache size to improve performance.")

    # Default message
    if not recommendations:
        recommendations.append("Cache performing within normal parameters.")

    return recommendations


def _generate_cache_recommendations(stats: Dict[str, Any]) -> List[str]:
    """
    Generate cache optimization recommendations

    Args:
        stats: Cache statistics. Must contain "hit_rate_percent",
            "utilization_percent", "evictions" and "total_requests".

    Returns:
        Human-readable recommendations; never empty.

    Raises:
        KeyError: If any of the required keys is missing from `stats`.
    """
    recommendations = []

    if stats["hit_rate_percent"] < 50:
        recommendations.append("Low cache hit rate. Consider increasing cache size or TTL.")

    if stats["utilization_percent"] > 90:
        recommendations.append("Cache near capacity. Consider increasing max cache size.")

    if stats["evictions"] > stats["total_requests"] * 0.1:
        recommendations.append("High eviction rate. Increase cache size to improve performance.")

    if not recommendations:
        recommendations.append("Cache performing optimally.")

    return recommendations