Spaces:
Sleeping
Sleeping
"""
Admin UI Backend Endpoints

Administrative controls for system oversight and review management.

Features:
- Review queue management
- System configuration
- User management (placeholder)
- Performance monitoring dashboard
- Compliance reporting interface
- Model versioning controls

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
| from fastapi import APIRouter, HTTPException, Depends | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime, timedelta | |
| from pydantic import BaseModel | |
| from monitoring_service import get_monitoring_service | |
| from model_versioning import get_versioning_system | |
| from production_logging import get_medical_logger | |
| from compliance_reporting import get_compliance_system | |
# Router for the administrative API; its routes live under the "/admin" prefix
admin_router = APIRouter(
    prefix="/admin",
    tags=["admin"],
)
| # ================================ | |
| # REQUEST/RESPONSE MODELS | |
| # ================================ | |
class ReviewQueueItem(BaseModel):
    """One entry of the review queue.

    ``assigned_to`` defaults to ``None``; every other field is required.
    """

    item_id: str
    document_id: str
    document_type: str
    confidence_score: float
    risk_level: str
    created_at: str
    assigned_to: Optional[str] = None
    # Expected values: "critical", "high", "medium", "low"
    priority: str
class ReviewAction(BaseModel):
    """Request body describing a decision taken on a review queue item."""

    item_id: str
    reviewer_id: str
    # Expected values: "approve", "reject", "escalate"
    action: str
    comments: Optional[str] = None
class SystemConfiguration(BaseModel):
    """Tunable system settings; every field has a default."""

    error_threshold: float = 0.05
    cache_size_mb: int = 1000
    cache_ttl_hours: int = 24
    alert_email: Optional[str] = None
class ModelDeployment(BaseModel):
    """Request body for deploying a model version.

    ``set_active`` defaults to ``False``.
    """

    model_id: str
    version: str
    set_active: bool = False
| # ================================ | |
| # REVIEW QUEUE ENDPOINTS | |
| # ================================ | |
# Module-level, in-memory review queue shared by the endpoints below
# (in production, use a database).
review_queue: List[ReviewQueueItem] = []
async def get_review_queue(
    priority: Optional[str] = None,
    status: Optional[str] = None
) -> Dict[str, Any]:
    """Return the review queue, optionally narrowed to a single priority.

    Args:
        priority: When truthy, only items whose ``priority`` equals this
            value are listed under ``"queue"``.
        status: Accepted but not used by this function.

    Returns:
        A dict holding the size of the whole queue, the size of the
        filtered selection, the selected items serialised with ``.dict()``,
        and a per-priority tally computed over the whole (unfiltered) queue.
    """
    def tally(level: str) -> int:
        # Number of queued items carrying the given priority label.
        return sum(1 for entry in review_queue if entry.priority == level)

    if priority:
        selected = [entry for entry in review_queue if entry.priority == priority]
    else:
        selected = review_queue

    per_priority = {
        level: tally(level)
        for level in ("critical", "high", "medium", "low")
    }

    return {
        "total_items": len(review_queue),
        "filtered_items": len(selected),
        "queue": [entry.dict() for entry in selected],
        "summary": per_priority,
    }
async def submit_review_action(action: ReviewAction) -> Dict[str, Any]:
    """Record a reviewer's decision (approve/reject/escalate) on a queued item.

    The decision is written to the medical logger and to the compliance
    audit log. Approved and rejected items are removed from the queue;
    escalated items stay in it.

    Args:
        action: The review decision, including the target item id, the
            reviewer id and optional comments.

    Returns:
        A dict confirming the action, the item id and a human-readable
        message.

    Raises:
        HTTPException: 404 when no queued item has ``action.item_id``;
            400 when ``action.action`` is not a supported action.
    """
    # Past-tense wording for the response message. Simply appending "d" to
    # the raw action yields "rejectd" for "reject".
    past_tense = {
        "approve": "approved",
        "reject": "rejected",
        "escalate": "escalated",
    }

    # Find item in queue
    item = next((i for i in review_queue if i.item_id == action.item_id), None)
    if item is None:
        raise HTTPException(status_code=404, detail="Review item not found")

    # Refuse unknown actions before anything is logged, so the audit trail
    # never records a decision that had no effect.
    if action.action not in past_tense:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported review action: {action.action}"
        )

    # Log review action
    logger = get_medical_logger()
    logger.info(
        f"Review action: {action.action} on {action.item_id}",
        user_id=action.reviewer_id,
        document_id=item.document_id,
        details={"action": action.action, "comments": action.comments}
    )

    # Log to compliance system
    compliance = get_compliance_system()
    compliance.log_audit_event(
        user_id=action.reviewer_id,
        event_type="REVIEW",
        resource=f"document:{item.document_id}",
        action=action.action.upper(),
        ip_address="internal",
        details={"item_id": action.item_id, "comments": action.comments}
    )

    # Remove from queue if approved or rejected; escalations remain queued
    if action.action in ("approve", "reject"):
        review_queue.remove(item)

    return {
        "success": True,
        "action": action.action,
        "item_id": action.item_id,
        "message": f"Review {past_tense[action.action]} successfully"
    }
async def assign_review(
    item_id: str,
    reviewer_id: str
) -> Dict[str, Any]:
    """Attach a reviewer to the first queued item matching ``item_id``.

    Args:
        item_id: Identifier of the queue item to assign.
        reviewer_id: Identifier stored in the item's ``assigned_to`` field.

    Returns:
        A dict confirming the item id and the reviewer it was assigned to.

    Raises:
        HTTPException: 404 when no queued item has the given id.
    """
    for entry in review_queue:
        if entry.item_id == item_id:
            entry.assigned_to = reviewer_id
            return {
                "success": True,
                "item_id": item_id,
                "assigned_to": reviewer_id
            }

    raise HTTPException(status_code=404, detail="Review item not found")
| # ================================ | |
| # MONITORING DASHBOARD ENDPOINTS | |
| # ================================ | |
async def get_admin_dashboard() -> Dict[str, Any]:
    """Assemble the data shown on the admin dashboard.

    Returns:
        A dict combining a UTC timestamp, the monitoring service's health
        and performance views, the versioning system's status, the
        compliance dashboard, and a short summary of the review queue.
    """
    monitoring = get_monitoring_service()
    versioning = get_versioning_system()
    compliance = get_compliance_system()

    generated_at = datetime.utcnow().isoformat()
    health = monitoring.get_system_health()
    performance = monitoring.get_performance_dashboard()
    models = versioning.get_system_status()
    compliance_view = compliance.get_compliance_dashboard()

    critical_count = sum(1 for entry in review_queue if entry.priority == "critical")
    # An item counts as unassigned whenever ``assigned_to`` is falsy.
    unassigned_count = sum(1 for entry in review_queue if not entry.assigned_to)

    return {
        "timestamp": generated_at,
        "system_health": health,
        "performance_dashboard": performance,
        "model_inventory": models,
        "compliance_dashboard": compliance_view,
        "review_queue_summary": {
            "total_items": len(review_queue),
            "critical_items": critical_count,
            "unassigned_items": unassigned_count
        }
    }
async def get_performance_metrics(
    window_minutes: int = 60
) -> Dict[str, Any]:
    """Collect latency statistics per pipeline stage plus an error summary.

    Args:
        window_minutes: Window passed to the latency tracker for every
            stage (default: 60).

    Returns:
        A dict with the requested window, the statistics keyed by stage
        name, the error monitor's summary and a UTC timestamp.
    """
    monitoring = get_monitoring_service()

    # Stages whose latency statistics are reported
    stage_names = ("pdf_processing", "classification", "model_routing", "synthesis")
    latency_by_stage = {
        name: monitoring.latency_tracker.get_stage_statistics(name, window_minutes)
        for name in stage_names
    }

    errors = monitoring.error_monitor.get_error_summary()

    return {
        "window_minutes": window_minutes,
        "latency_by_stage": latency_by_stage,
        "error_summary": errors,
        "timestamp": datetime.utcnow().isoformat()
    }
async def get_cache_metrics() -> Dict[str, Any]:
    """Report statistics of the versioning system's input cache.

    Returns:
        A dict with the raw cache statistics, the recommendations derived
        from them by ``_generate_cache_recommendations`` and a UTC
        timestamp.
    """
    input_cache = get_versioning_system().input_cache
    statistics = input_cache.get_statistics()
    advice = _generate_cache_recommendations(statistics)

    return {
        "cache_statistics": statistics,
        "recommendations": advice,
        "timestamp": datetime.utcnow().isoformat()
    }
| # ================================ | |
| # MODEL MANAGEMENT ENDPOINTS | |
| # ================================ | |
async def get_model_inventory() -> Dict[str, Any]:
    """Return the model registry's inventory with aggregate counts.

    Returns:
        A dict with the inventory itself, a summary holding the number of
        inventory entries and the sum of their ``"total_versions"`` values,
        and a UTC timestamp.
    """
    registry = get_versioning_system().model_registry
    inventory = registry.get_model_inventory()

    model_count = len(inventory)
    version_count = 0
    for details in inventory.values():
        version_count += details["total_versions"]

    return {
        "inventory": inventory,
        "summary": {
            "total_models": model_count,
            "total_versions": version_count
        },
        "timestamp": datetime.utcnow().isoformat()
    }
async def deploy_model_version(deployment: ModelDeployment) -> Dict[str, Any]:
    """Deploy a model version, optionally making it the active one.

    When ``deployment.set_active`` is true the model registry is asked to
    activate the requested version. Cache entries for
    ``deployment.version`` are then invalidated.

    NOTE(review): the cache invalidation is assumed to run whether or not
    ``set_active`` is set — confirm against the intended behaviour.

    Args:
        deployment: Model id, version and whether to activate it.

    Returns:
        A dict confirming the model id, version, activation flag and a
        human-readable message.

    Raises:
        HTTPException: 400 carrying the message of any exception raised
            while activating the version or invalidating the cache. The
            original exception is chained as the cause.
    """
    versioning = get_versioning_system()

    try:
        if deployment.set_active:
            versioning.model_registry.set_active_version(
                deployment.model_id,
                deployment.version
            )

        # Invalidate cache for this model
        versioning.input_cache.invalidate_model_version(deployment.version)
    except Exception as e:
        # Chain the cause so the original traceback is kept for debugging.
        raise HTTPException(status_code=400, detail=str(e)) from e

    return {
        "success": True,
        "model_id": deployment.model_id,
        "version": deployment.version,
        "active": deployment.set_active,
        "message": f"Model {deployment.model_id} v{deployment.version} deployed"
    }
async def rollback_model(
    model_id: str,
    version: str
) -> Dict[str, Any]:
    """Roll a model back to an earlier version and invalidate its cache.

    Args:
        model_id: Identifier of the model to roll back.
        version: Version to roll back to.

    Returns:
        A dict confirming the model id and the version rolled back to.

    Raises:
        HTTPException: 404 when the registry reports the rollback as
            unsuccessful (falsy return value).
    """
    versioning = get_versioning_system()

    rolled_back = versioning.model_registry.rollback_to_version(model_id, version)
    if rolled_back:
        # Invalidate cache entries for the version now in effect
        versioning.input_cache.invalidate_model_version(version)
        return {
            "success": True,
            "model_id": model_id,
            "rolled_back_to": version,
            "message": f"Rolled back {model_id} to v{version}"
        }

    raise HTTPException(status_code=404, detail="Model version not found")
async def compare_model_versions(
    model_id: str,
    version1: str,
    version2: str,
    metric: str = "accuracy"
) -> Dict[str, Any]:
    """Compare two versions of a model on a single metric.

    Args:
        model_id: Identifier of the model.
        version1: First version to compare.
        version2: Second version to compare.
        metric: Metric name forwarded to the registry (default:
            "accuracy").

    Returns:
        Whatever the model registry's ``compare_versions`` returns.
    """
    registry = get_versioning_system().model_registry
    return registry.compare_versions(model_id, version1, version2, metric)
| # ================================ | |
| # COMPLIANCE ENDPOINTS | |
| # ================================ | |
async def get_hipaa_report(
    days: int = 30
) -> Dict[str, Any]:
    """Generate a HIPAA compliance report for the trailing ``days`` days.

    Args:
        days: Length of the reporting period, ending at the current UTC
            time (default: 30).

    Returns:
        The report produced by the compliance system.
    """
    compliance = get_compliance_system()

    period_end = datetime.utcnow()
    period_start = period_end - timedelta(days=days)

    return compliance.generate_hipaa_report(period_start, period_end)
async def get_gdpr_report(
    days: int = 30
) -> Dict[str, Any]:
    """Generate a GDPR compliance report for the trailing ``days`` days.

    Args:
        days: Length of the reporting period, ending at the current UTC
            time (default: 30).

    Returns:
        The report produced by the compliance system.
    """
    compliance = get_compliance_system()

    period_end = datetime.utcnow()
    period_start = period_end - timedelta(days=days)

    return compliance.generate_gdpr_report(period_start, period_end)
async def get_quality_metrics(
    days: int = 30
) -> Dict[str, Any]:
    """Fetch the clinical quality metrics report.

    Args:
        days: Value forwarded to the compliance system's report generator
            (default: 30).

    Returns:
        The report produced by the compliance system.
    """
    return get_compliance_system().generate_quality_metrics_report(days)
async def get_security_incidents(
    days: int = 30
) -> Dict[str, Any]:
    """Fetch the security incidents report.

    Args:
        days: Value forwarded to the compliance system's report generator
            (default: 30).

    Returns:
        The report produced by the compliance system.
    """
    return get_compliance_system().generate_security_incidents_report(days)
| # ================================ | |
| # SYSTEM CONFIGURATION ENDPOINTS | |
| # ================================ | |
# Module-level, in-memory configuration initialised with the model defaults
# (in production, use a database).
system_config = SystemConfiguration()
async def get_system_configuration() -> SystemConfiguration:
    """Return the configuration object currently held in ``system_config``."""
    current = system_config
    return current
async def update_system_configuration(
    config: SystemConfiguration
) -> Dict[str, Any]:
    """Replace the module-level configuration and log the change.

    Args:
        config: The new configuration; it replaces ``system_config`` as a
            whole.

    Returns:
        A dict confirming the update and echoing the new configuration as
        a plain dict.
    """
    global system_config
    system_config = config

    medical_logger = get_medical_logger()
    medical_logger.info("System configuration updated", details=config.dict())

    response: Dict[str, Any] = {"success": True}
    response["config"] = config.dict()
    response["message"] = "System configuration updated"
    return response
async def clear_cache() -> Dict[str, Any]:
    """Clear every entry of the versioning system's input cache.

    NOTE: a second function named ``clear_cache`` is defined further down
    in this module (it clears the monitoring service's cache) and rebinds
    this module-level name.

    Returns:
        A dict confirming that the cache was cleared.
    """
    input_cache = get_versioning_system().input_cache
    input_cache.clear()

    return {"success": True, "message": "Cache cleared successfully"}
| # ================================ | |
| # ALERTS MANAGEMENT | |
| # ================================ | |
async def get_active_alerts(
    level: Optional[str] = None
) -> Dict[str, Any]:
    """Get active system alerts, optionally restricted to one level.

    Args:
        level: Alert level name; it is upper-cased before being converted
            to an ``AlertLevel``. When falsy, no level filter is applied.

    Returns:
        A dict with the active alerts (each converted via ``to_dict()``),
        the alert manager's summary and a UTC timestamp.

    Raises:
        HTTPException: 400 when ``level`` does not name a known alert
            level.
    """
    monitoring = get_monitoring_service()
    from monitoring_service import AlertLevel

    alert_level = None
    if level:
        try:
            alert_level = AlertLevel(level.upper())
        except ValueError as exc:
            # Assumes AlertLevel is an enum.Enum, whose value lookup raises
            # ValueError for unknown values — confirm. Report a client
            # error instead of letting it surface as a server error.
            raise HTTPException(
                status_code=400,
                detail=f"Unknown alert level: {level}"
            ) from exc

    alerts = monitoring.alert_manager.get_active_alerts(level=alert_level)
    summary = monitoring.alert_manager.get_alert_summary()

    return {
        "active_alerts": [a.to_dict() for a in alerts],
        "summary": summary,
        "timestamp": datetime.utcnow().isoformat()
    }
async def resolve_alert(alert_id: str) -> Dict[str, Any]:
    """Ask the alert manager to resolve the given alert.

    The alert manager's return value is not inspected; the response always
    reports success once the call returns.

    Args:
        alert_id: Identifier of the alert to resolve.

    Returns:
        A dict confirming the alert id that was resolved.
    """
    alert_manager = get_monitoring_service().alert_manager
    alert_manager.resolve_alert(alert_id)

    return {"success": True, "alert_id": alert_id, "message": "Alert resolved"}
| # ================================ | |
| # CACHE MANAGEMENT ENDPOINTS | |
| # ================================ | |
async def get_cache_statistics() -> Dict[str, Any]:
    """
    Report the monitoring service's cache statistics.

    Returns:
        A dict with the statistics obtained from the monitoring service,
        the recommendations derived from them by
        ``_generate_cache_recommendations_v2`` and a UTC timestamp.
    """
    statistics = get_monitoring_service().get_cache_statistics()
    advice = _generate_cache_recommendations_v2(statistics)

    return {
        "statistics": statistics,
        "recommendations": advice,
        "timestamp": datetime.utcnow().isoformat()
    }
async def list_cache_entries(limit: int = 100) -> Dict[str, Any]:
    """
    List cache entries as reported by the monitoring service's cache.

    Args:
        limit: Maximum number of entries requested from the cache service
            (default: 100)

    Returns:
        A dict with the entries, how many were returned and a UTC
        timestamp.
    """
    cache_service = get_monitoring_service().cache_service
    listed = cache_service.list_entries(limit=limit)

    return {
        "entries": listed,
        "total_shown": len(listed),
        "timestamp": datetime.utcnow().isoformat()
    }
async def get_cache_entry_info(key: str) -> Dict[str, Any]:
    """
    Look up the details of one cache entry.

    Args:
        key: Cache key (SHA256 fingerprint)

    Returns:
        The entry information returned by the cache service.

    Raises:
        HTTPException: 404 when the cache service returns ``None`` for
            the key.
    """
    cache_service = get_monitoring_service().cache_service
    details = cache_service.get_entry_info(key)

    if details is not None:
        return details

    raise HTTPException(status_code=404, detail="Cache entry not found")
async def invalidate_cache_entry(key: str) -> Dict[str, Any]:
    """
    Invalidate one cache entry.

    Args:
        key: Cache key (SHA256 fingerprint)

    Returns:
        A dict confirming the key that was invalidated.

    Raises:
        HTTPException: 404 when the cache service reports the
            invalidation as unsuccessful (falsy return value).
    """
    cache_service = get_monitoring_service().cache_service

    if cache_service.invalidate(key):
        return {
            "success": True,
            "key": key,
            "message": "Cache entry invalidated"
        }

    raise HTTPException(status_code=404, detail="Cache entry not found")
async def clear_cache() -> Dict[str, Any]:
    """
    Clear every entry of the monitoring service's cache.

    WARNING: This will clear all cached data and may temporarily impact performance

    NOTE: an earlier function in this module is also named ``clear_cache``
    (it clears the versioning system's input cache); this later definition
    is the one the module-level name ends up bound to.

    Returns:
        A dict confirming the operation, with a UTC timestamp.
    """
    cache_service = get_monitoring_service().cache_service
    cache_service.clear()

    cleared_at = datetime.utcnow().isoformat()
    return {
        "success": True,
        "message": "All cache entries cleared",
        "timestamp": cleared_at
    }
| # ================================ | |
| # HELPER FUNCTIONS | |
| # ================================ | |
| def _generate_cache_recommendations_v2(stats: Dict[str, Any]) -> List[str]: | |
| """Generate cache optimization recommendations based on statistics""" | |
| recommendations = [] | |
| hit_rate = stats.get("hit_rate", 0.0) | |
| memory_usage = stats.get("memory_usage_mb", 0.0) | |
| max_memory = stats.get("max_memory_mb", 512) | |
| evictions = stats.get("evictions", 0) | |
| total_entries = stats.get("total_entries", 0) | |
| # Hit rate recommendations | |
| if hit_rate < 0.5: | |
| recommendations.append(f"Low cache hit rate ({hit_rate*100:.1f}%). Consider increasing cache size or TTL.") | |
| elif hit_rate > 0.8: | |
| recommendations.append(f"Excellent cache hit rate ({hit_rate*100:.1f}%). Cache performing optimally.") | |
| # Memory recommendations | |
| utilization = (memory_usage / max_memory) * 100 if max_memory > 0 else 0 | |
| if utilization > 90: | |
| recommendations.append(f"Cache near capacity ({utilization:.1f}% used). Consider increasing max cache size.") | |
| # Eviction recommendations | |
| if total_entries > 0 and evictions > total_entries * 0.1: | |
| recommendations.append(f"High eviction rate ({evictions} evictions). Increase cache size to improve performance.") | |
| # Default message | |
| if not recommendations: | |
| recommendations.append("Cache performing within normal parameters.") | |
| return recommendations | |
| def _generate_cache_recommendations(stats: Dict[str, Any]) -> List[str]: | |
| """Generate cache optimization recommendations""" | |
| recommendations = [] | |
| if stats["hit_rate_percent"] < 50: | |
| recommendations.append("Low cache hit rate. Consider increasing cache size or TTL.") | |
| if stats["utilization_percent"] > 90: | |
| recommendations.append("Cache near capacity. Consider increasing max cache size.") | |
| if stats["evictions"] > stats["total_requests"] * 0.1: | |
| recommendations.append("High eviction rate. Increase cache size to improve performance.") | |
| if not recommendations: | |
| recommendations.append("Cache performing optimally.") | |
| return recommendations | |