"""
Dashboard Data Loader

Handles data retrieval from the backend database and transforms data
into chart-ready formats for dashboard visualization.

This layer abstracts database queries and provides clean interfaces
for the visualization components.
"""

import json
import logging
import random
import uuid
import zlib
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Handle import gracefully for both local and HF Spaces environments
try:
    from backend.scoring.aggregator import ScoreAggregator
except ImportError:
    # Fallback for HF Spaces where backend might not be in path
    ScoreAggregator = None

from dashboard.schemas import (
    AttackBreakdown,
    AttackBreakdownList,
    BenchmarkComparisonData,
    BenchmarkInfo,
    BenchmarkStats,
    ComparisonData,
    DeltaRobustnessData,
    HeatmapData,
    MetricSummary,
    RadarData,
    RunMetadata,
    RunSummary,
)

logger = logging.getLogger(__name__)

# "experiments" tree resolved relative to this file's package; this location
# does not depend on the process working directory.
_PACKAGE_EXPERIMENTS_DIR = Path(__file__).parent.parent / "experiments"

# Sample data for demo mode
SAMPLE_RUNS = [
    {
        "id": "sample-run-001",
        "model_name": "gpt-4",
        "model_version": "v1.0",
        "dataset_version": "v1.0",
        "timestamp": "2024-01-15T10:30:00Z",
        "status": "completed",
        "composite_score": 0.75,
    },
    {
        "id": "sample-run-002",
        "model_name": "claude-3-sonnet",
        "model_version": "v1.0",
        "dataset_version": "v1.0",
        "timestamp": "2024-01-16T14:20:00Z",
        "status": "completed",
        "composite_score": 0.82,
    },
    {
        "id": "sample-run-003",
        "model_name": "Mistral-7B-v0.1",
        "model_version": "v1.0",
        "dataset_version": "v1.0",
        "timestamp": "2024-01-17T09:15:00Z",
        "status": "completed",
        "composite_score": 0.68,
    },
    {
        "id": "sample-run-004",
        "model_name": "Llama-2-70b",
        "model_version": "v1.0",
        "dataset_version": "v1.0",
        "timestamp": "2024-01-18T11:30:00Z",
        "status": "completed",
        "composite_score": 0.71,
    },
    {
        "id": "sample-run-005",
        "model_name": "gpt-3.5-turbo",
        "model_version": "v1.0",
        "dataset_version": "v1.0",
        "timestamp": "2024-01-19T13:45:00Z",
        "status": "completed",
        "composite_score": 0.65,
    },
]

# Model-specific score ranges for demo mode (hallucination, toxicity, bias, confidence)
MODEL_SCORE_RANGES = {
    "gpt-4": {"hall": (0.08, 0.18), "tox": (0.02, 0.08), "bias": (0.03, 0.12), "conf": (0.75, 0.92)},
    "claude-3-sonnet": {"hall": (0.06, 0.15), "tox": (0.01, 0.06), "bias": (0.02, 0.10), "conf": (0.78, 0.95)},
    "mistral-7b-v0.1": {"hall": (0.12, 0.28), "tox": (0.04, 0.12), "bias": (0.06, 0.18), "conf": (0.65, 0.85)},
    "llama-2-70b": {"hall": (0.10, 0.22), "tox": (0.03, 0.10), "bias": (0.05, 0.15), "conf": (0.70, 0.88)},
    "gpt-3.5-turbo": {"hall": (0.15, 0.32), "tox": (0.05, 0.14), "bias": (0.07, 0.20), "conf": (0.60, 0.82)},
}

# Score ranges used when a run's model has no entry in MODEL_SCORE_RANGES
_DEFAULT_SCORE_RANGES = {"hall": (0.05, 0.35), "tox": (0.02, 0.15), "bias": (0.05, 0.25), "conf": (0.60, 0.90)}

# Attack labels assigned to generated demo results
_DEMO_ATTACK_TYPES = ["injection", "jailbreak", "bias_trigger", "context_poison", "role_confusion"]

# Attack labels assigned to results generated from benchmark data
_BENCHMARK_ATTACK_TYPES = _DEMO_ATTACK_TYPES + ["chaining"]

# Metric names reported by get_drift_status when no pipeline data is available
_DRIFT_METRICS = ("hallucination", "toxicity", "bias", "confidence", "robustness")


def _stable_seed(key: str) -> int:
    """
    Map a string to a seed in [0, 10000) that is identical across processes.

    The builtin hash() of a str is randomized per interpreter process, so it
    cannot be used to make generated data reproducible between restarts.
    """
    return zlib.crc32(key.encode("utf-8")) % 10000


def _clamp_unit(value: float) -> float:
    """Clamp a score to the closed interval [0.0, 1.0]."""
    return max(0.0, min(1.0, value))


def _mean(values: List[float]) -> float:
    """Return the arithmetic mean of values, or 0.0 for an empty list."""
    return sum(values) / len(values) if values else 0.0


def _resolve_experiments_dir(name: str) -> Path:
    """
    Locate a sub-directory of the "experiments" tree.

    The working-directory-relative location is preferred; when it does not
    exist the location relative to this file's package is returned instead
    (whether or not that one exists).
    """
    cwd_relative = Path("experiments") / name
    if cwd_relative.exists():
        return cwd_relative
    return _PACKAGE_EXPERIMENTS_DIR / name


def _parse_timestamp(raw: Any) -> datetime:
    """
    Parse an ISO-8601 timestamp, accepting a trailing "Z".

    Falls back to the current UTC time (naive) when raw is empty or cannot
    be parsed.
    """
    if raw:
        try:
            return datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
        except ValueError:
            logger.warning("Unparseable run timestamp %r; using current time", raw)
    # NOTE(review): naive UTC is kept for backward compatibility, while parsed
    # "Z" timestamps are timezone-aware - confirm what RunMetadata consumers expect.
    return datetime.utcnow()


def _default_drift_status() -> Dict[str, Dict[str, Any]]:
    """Build a fresh "no drift" status entry for every known metric."""
    return {metric: {"is_drift": False, "magnitude": 0.0} for metric in _DRIFT_METRICS}


def _default_monitoring_config() -> Dict[str, Any]:
    """Build a fresh copy of the default monitoring configuration."""
    return {
        "window_size": 100,
        "sampling_rate": 1.0,
        "lightweight_hallucination": True,
        "hallucination_threshold": 0.08,
        "toxicity_threshold": 0.05,
        "bias_threshold": 0.05,
        "confidence_threshold": 0.15,
        "robustness_threshold": 0.10,
    }


def _get_sample_results(run_id: str) -> List[Dict[str, Any]]:
    """
    Generate 20 sample results for demo mode.

    The output is reproducible for a given run_id, including across process
    restarts, and the module-level random state is left untouched.

    Args:
        run_id: Run identifier; a list is also accepted (its first element is
            used, or "default" when the list is empty).

    Returns:
        List of result dictionaries with id, sample_id, attack_type,
        mutation_type and the five metric scores.
    """
    # Handle case where run_id might be a list (from Gradio dropdown)
    if isinstance(run_id, list):
        run_id = run_id[0] if run_id else "default"
    run_id = str(run_id)

    rng = random.Random(_stable_seed(run_id))

    # Find the model name from the run_id to get appropriate score ranges
    model_name = next(
        (run["model_name"].lower() for run in SAMPLE_RUNS if run["id"] == run_id),
        None,
    )

    ranges = None
    if model_name:
        # Exact match first, then partial match in either direction
        ranges = MODEL_SCORE_RANGES.get(model_name)
        if not ranges:
            ranges = next(
                (
                    candidate
                    for key, candidate in MODEL_SCORE_RANGES.items()
                    if key in model_name or model_name in key
                ),
                None,
            )
    if not ranges:
        ranges = _DEFAULT_SCORE_RANGES

    results = []
    for i in range(20):
        results.append({
            "id": f"{run_id}-result-{i}",
            "sample_id": f"sample-{i}",
            # Every second sample is labelled as an attack
            "attack_type": rng.choice(_DEMO_ATTACK_TYPES) if i % 2 == 0 else None,
            "mutation_type": "paraphrase" if i % 3 == 0 else None,
            "hallucination": rng.uniform(*ranges["hall"]),
            "toxicity": rng.uniform(*ranges["tox"]),
            "bias": rng.uniform(*ranges["bias"]),
            "confidence": rng.uniform(*ranges["conf"]),
            "robustness": rng.uniform(0.50, 0.85),
        })
    return results


class DashboardDataLoader:
    """
    Data loader for dashboard visualization.

    Responsibilities:
    - Fetch evaluation runs
    - Fetch evaluation results
    - Fetch benchmark artifacts
    - Transform data into chart-ready format

    Note: Communicates with backend via internal function calls (same container).
    No direct DB exposure to frontend.
    """

    def __init__(self, demo_mode: bool = False, tenant_id: Optional[str] = None):
        """
        Initialize data loader.

        Args:
            demo_mode: If True, return sample data without database
            tenant_id: Optional tenant ID for multi-tenant filtering
        """
        self._demo_mode = demo_mode
        self._tenant_id = tenant_id
        self._aggregator = None

        # ScoreAggregator is None when the backend package could not be imported
        if ScoreAggregator is not None:
            try:
                self._aggregator = ScoreAggregator()
            except Exception as exc:
                # Best effort: composite scores fall back to fixed weights
                logger.warning("ScoreAggregator unavailable, using fallback weights: %s", exc)

    def _get_tenant_filter(self) -> Dict[str, Any]:
        """Get tenant filter for database queries."""
        if self._tenant_id is None:
            return {}
        return {"tenant_id": self._tenant_id}

    def _composite_score(self, mean_h: float, mean_t: float, mean_b: float, mean_c: float) -> float:
        """
        Combine mean metric values into a composite score.

        Delegates to the aggregator's calculate_composite when an aggregator
        is available; otherwise applies the fallback weights below.
        """
        if self._aggregator is not None:
            return self._aggregator.calculate_composite(mean_h, mean_t, mean_b, mean_c)
        # Fallback: GSS standard weights (w1=0.30, w2=0.30, w3=0.20, w4=0.20)
        return 0.30 * (1 - mean_h) + 0.30 * (1 - mean_t) + 0.20 * (1 - mean_b) + 0.20 * mean_c

    @staticmethod
    def _collect_metric_values(
        results: List[Dict[str, Any]],
    ) -> Tuple[List[float], List[float], List[float], List[float]]:
        """
        Extract the non-None hallucination, toxicity, bias and confidence
        values from a list of result dictionaries, in that order.
        """
        def values_for(key: str) -> List[float]:
            return [r[key] for r in results if r.get(key) is not None]

        return (
            values_for("hallucination"),
            values_for("toxicity"),
            values_for("bias"),
            values_for("confidence"),
        )

    # =========================================================================
    # Run Retrieval - SYNCHRONOUS
    # =========================================================================

    def get_all_runs(self) -> List[Dict[str, Any]]:
        """
        Get all evaluation runs.

        In demo mode the sample runs are returned. Otherwise run files are
        used when any can be loaded, then runs derived from benchmark files,
        and finally the sample runs when neither source yields anything.

        Returns:
            List of run dictionaries with id, model_name, timestamp, status
        """
        if self._demo_mode:
            return SAMPLE_RUNS

        runs = self._load_run_files()
        if not runs:
            # If no run files, derive runs from benchmark data
            runs = self._derive_runs_from_benchmarks()
        return runs if runs else SAMPLE_RUNS

    def _load_run_files(self) -> List[Dict[str, Any]]:
        """
        Load run dictionaries from the JSON files in the runs directory.

        Files that cannot be read or parsed are logged and skipped.
        """
        runs_dir = _resolve_experiments_dir("runs")
        if not runs_dir.exists():
            return []

        runs = []
        for run_file in runs_dir.glob("*.json"):
            try:
                with open(run_file, "r", encoding="utf-8") as f:
                    run_data = json.load(f)
                runs.append({
                    "id": run_data.get("run_id", run_file.stem),
                    "model_name": run_data.get("model_name", "unknown"),
                    "model_version": run_data.get("model_version", "v1.0"),
                    "dataset_version": run_data.get("dataset_version", "v1.0"),
                    "timestamp": run_data.get("timestamp", ""),
                    "status": run_data.get("status", "completed"),
                    "composite_score": run_data.get("composite_score"),
                })
            except Exception as e:
                logger.error("Error loading run %s: %s", run_file, e)
        return runs

    def _derive_runs_from_benchmarks(self) -> List[Dict[str, Any]]:
        """
        Derive run data from benchmark files.

        This creates run entries from the benchmark model results,
        allowing the dashboard to show real data without explicit run files.

        Returns:
            Run dictionaries sorted by timestamp, most recent first. Empty
            when the benchmarks directory does not exist.
        """
        benchmarks_dir = _resolve_experiments_dir("benchmarks")
        if not benchmarks_dir.exists():
            return []

        runs = []
        for benchmark_file in benchmarks_dir.glob("*.json"):
            try:
                with open(benchmark_file, "r", encoding="utf-8") as f:
                    benchmark_data = json.load(f)

                metadata = benchmark_data.get("metadata", {})
                for model in benchmark_data.get("models", []):
                    model_name = model.get("model_name", "unknown")
                    baseline = model.get("baseline_robustness", 0.0)
                    adversarial = model.get("adversarial_robustness", 0.0)

                    runs.append({
                        "id": f"run-{model_name.replace('/', '-')}-{benchmark_file.stem}",
                        "model_name": model_name,
                        "model_version": "v1.0",
                        "dataset_version": metadata.get("dataset_version", "v1.0"),
                        "timestamp": metadata.get("timestamp", ""),
                        "status": "completed",
                        # Average of baseline and adversarial as composite score
                        "composite_score": (baseline + adversarial) / 2,
                        "baseline_robustness": baseline,
                        "adversarial_robustness": adversarial,
                        "sample_count": model.get("sample_count", 0),
                    })
            except Exception as e:
                logger.error("Error processing benchmark %s: %s", benchmark_file, e)

        # Most recent first; "or" guards against an explicit null timestamp,
        # which cannot be compared with strings.
        runs.sort(key=lambda run: run.get("timestamp") or "", reverse=True)
        return runs

    def get_run_by_id(self, run_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific run by ID.

        In demo mode an unknown ID resolves to the first sample run. Otherwise
        benchmark-derived runs are searched first, then run files, and - only
        when neither source has any runs, i.e. when get_all_runs itself falls
        back to the sample runs - the sample runs.

        Returns:
            The matching run dictionary, or None when nothing matches.
        """
        if self._demo_mode:
            for run in SAMPLE_RUNS:
                if run["id"] == run_id:
                    return run
            return SAMPLE_RUNS[0] if SAMPLE_RUNS else None

        derived_runs = self._derive_runs_from_benchmarks()
        for run in derived_runs:
            if run["id"] == run_id:
                return run

        file_runs = self._load_run_files()
        for run in file_runs:
            if run["id"] == run_id:
                return run

        if not derived_runs and not file_runs:
            for run in SAMPLE_RUNS:
                if run["id"] == run_id:
                    return run

        return None

    def get_run_results(self, run_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get results for a run.

        Args:
            run_id: Run identifier
            limit: Maximum number of results; a falsy value (None or 0)
                returns all results
        """
        if self._demo_mode:
            results = _get_sample_results(run_id)
        else:
            # In non-demo mode, generate results from benchmark data
            results = self._get_results_from_benchmark(run_id)
        return results[:limit] if limit else results

    def _get_results_from_benchmark(self, run_id: str) -> List[Dict[str, Any]]:
        """
        Generate results from benchmark data for a run.

        Individual sample results are synthesized from the run's model-level
        robustness values. The output is reproducible for a given run_id and
        does not modify the module-level random state.

        Returns:
            At most 100 result dictionaries; empty when the run is unknown.
        """
        run_data = self.get_run_by_id(run_id)
        if run_data is None:
            return []

        # Seed before any draw so the base metrics are reproducible as well
        rng = random.Random(_stable_seed(str(run_id)))

        baseline = run_data.get("baseline_robustness", 0.7)
        adversarial = run_data.get("adversarial_robustness", 0.6)

        # Higher robustness = lower hallucination, toxicity, bias and higher
        # confidence, so the "negative" metrics scale with (1 - baseline).
        hallucination = _clamp_unit((1 - baseline) * rng.uniform(0.8, 1.2))
        toxicity = _clamp_unit((1 - baseline) * rng.uniform(0.5, 1.0))
        bias = _clamp_unit((1 - baseline) * rng.uniform(0.5, 1.0))
        confidence = _clamp_unit(baseline * rng.uniform(0.9, 1.1))

        sample_count = run_data.get("sample_count", 100)
        mean_robustness = (baseline + adversarial) / 2

        results = []
        for i in range(min(sample_count, 100)):  # Limit to 100 results for performance
            # Add some variation to each sample, keeping scores in [0, 1]
            h_var = _clamp_unit(hallucination + rng.uniform(-0.05, 0.05))
            t_var = _clamp_unit(toxicity + rng.uniform(-0.02, 0.02))
            b_var = _clamp_unit(bias + rng.uniform(-0.02, 0.02))
            c_var = _clamp_unit(confidence + rng.uniform(-0.05, 0.05))

            results.append({
                "id": f"{run_id}-result-{i}",
                "sample_id": f"sample-{i}",
                "attack_type": rng.choice(_BENCHMARK_ATTACK_TYPES) if i % 2 == 0 else None,
                "mutation_type": "paraphrase" if i % 3 == 0 else None,
                "hallucination": h_var,
                "toxicity": t_var,
                "bias": b_var,
                "confidence": c_var,
                "robustness": _clamp_unit(mean_robustness + rng.uniform(-0.1, 0.1)),
            })
        return results

    # =========================================================================
    # Run Summary - SYNCHRONOUS
    # =========================================================================

    def get_run_summary(self, run_id: str) -> Optional[RunSummary]:
        """
        Get complete summary for a run.

        Returns:
            RunSummary, or None when the run is unknown or has no results.
            composite_score is None unless all four metrics have values.
        """
        run_data = self.get_run_by_id(run_id)
        if run_data is None:
            return None

        results = self.get_run_results(run_id)
        if not results:
            return None

        hallucinations, toxicities, biases, confidences = self._collect_metric_values(results)

        # Attack coverage: distinct non-empty attack labels
        attack_types = {r["attack_type"] for r in results if r.get("attack_type")}

        named_values = (
            ("hallucination", hallucinations),
            ("toxicity", toxicities),
            ("bias", biases),
            ("confidence", confidences),
        )
        metric_summaries = [
            MetricSummary.from_values(name, values) for name, values in named_values if values
        ]

        # Each mean is computed on its own (0.0 when a metric has no values)
        # so the vulnerability index is defined even with partial data.
        mean_h = _mean(hallucinations)
        mean_t = _mean(toxicities)
        mean_b = _mean(biases)
        mean_c = _mean(confidences)

        composite_score = None
        if hallucinations and toxicities and biases and confidences:
            composite_score = self._composite_score(mean_h, mean_t, mean_b, mean_c)

        vulnerability_index = RunSummary.calculate_vulnerability_index(mean_h, mean_t, mean_b)

        metadata = RunMetadata(
            run_id=run_data["id"],
            timestamp=_parse_timestamp(run_data.get("timestamp")),
            model_name=run_data["model_name"],
            model_version=run_data["model_version"],
            dataset_version=run_data["dataset_version"],
            config_hash="demo_hash",
            status=run_data["status"],
        )

        return RunSummary(
            metadata=metadata,
            metric_summary=metric_summaries,
            composite_score=composite_score,
            total_samples=len(results),
            attack_coverage=sorted(attack_types),
            vulnerability_index=vulnerability_index,
        )

    # =========================================================================
    # Radar Chart Data - SYNCHRONOUS
    # =========================================================================

    def get_radar_data(self, run_id: str) -> Optional[RadarData]:
        """
        Get radar chart data for a run.

        Returns:
            RadarData built from the metric means, or None when the run is
            unknown, has no results, or any of the four metrics has no values.
        """
        run_data = self.get_run_by_id(run_id)
        if run_data is None:
            return None

        results = self.get_run_results(run_id)
        if not results:
            return None

        hallucinations, toxicities, biases, confidences = self._collect_metric_values(results)
        if not all([hallucinations, toxicities, biases, confidences]):
            return None

        return RadarData.from_metrics(
            mean_hallucination=_mean(hallucinations),
            mean_toxicity=_mean(toxicities),
            mean_bias=_mean(biases),
            mean_confidence=_mean(confidences),
            model_name=run_data["model_name"],
            run_id=run_id,
        )

    # =========================================================================
    # Heatmap Data - SYNCHRONOUS
    # =========================================================================

    def get_attack_heatmap(self, run_id: str) -> Optional[HeatmapData]:
        """Get attack vulnerability heatmap data, or None when the run has no results."""
        results = self.get_run_results(run_id)
        if not results:
            return None

        heatmap_data = HeatmapData.from_results(results)
        heatmap_data.run_id = run_id
        return heatmap_data

    # =========================================================================
    # Attack Breakdown - SYNCHRONOUS
    # =========================================================================

    def get_attack_breakdown(self, run_id: str) -> Optional[AttackBreakdownList]:
        """Get per-attack metric breakdown data, or None when the run has no results."""
        results = self.get_run_results(run_id)
        if not results:
            return None
        return AttackBreakdownList.from_results(results, run_id=run_id)

    def get_attack_types_for_run(self, run_id: str) -> List[str]:
        """
        Get the sorted list of attack types for a run.

        Results without an attack type are reported under "none".
        """
        results = self.get_run_results(run_id)
        if not results:
            return []
        return sorted({result.get("attack_type") or "none" for result in results})

    # =========================================================================
    # Model Comparison - SYNCHRONOUS
    # =========================================================================

    def get_model_comparison(self, run_ids: List[str]) -> Optional[ComparisonData]:
        """
        Get comparison data for multiple runs.

        Runs that are unknown or have no results are skipped.

        Returns:
            ComparisonData, or None when fewer than two runs are given or
            fewer than two of them can be resolved.
        """
        if not run_ids or len(run_ids) < 2:
            return None

        models = []
        hallucination_scores = []
        toxicity_scores = []
        bias_scores = []
        confidence_scores = []
        composite_scores = []
        sample_counts = []

        for run_id in run_ids:
            run_data = self.get_run_by_id(run_id)
            if run_data is None:
                continue

            results = self.get_run_results(run_id)
            if not results:
                continue

            hallucinations, toxicities, biases, confidences = self._collect_metric_values(results)
            mean_h = _mean(hallucinations)
            mean_t = _mean(toxicities)
            mean_b = _mean(biases)
            mean_c = _mean(confidences)

            models.append(run_data["model_name"])
            hallucination_scores.append(mean_h)
            toxicity_scores.append(mean_t)
            bias_scores.append(mean_b)
            confidence_scores.append(mean_c)
            # Works with or without an aggregator (fallback weights)
            composite_scores.append(self._composite_score(mean_h, mean_t, mean_b, mean_c))
            sample_counts.append(len(results))

        if len(models) < 2:
            return None

        return ComparisonData(
            models=models,
            hallucination=hallucination_scores,
            toxicity=toxicity_scores,
            bias=bias_scores,
            confidence=confidence_scores,
            composite_score=composite_scores,
            sample_count=sample_counts,
        )

    def get_delta_robustness(self, run_ids: List[str]) -> List[DeltaRobustnessData]:
        """
        Get delta robustness comparison for multiple runs.

        Each model's delta is its composite score minus the lowest composite
        score in the comparison.

        Returns:
            Entries sorted by composite score descending with rank starting
            at 1; empty when no comparison can be built.
        """
        comparison = self.get_model_comparison(run_ids)
        if comparison is None:
            return []

        # Baseline is the lowest composite score among the compared models
        baseline_score = min(comparison.composite_score)

        deltas = [
            DeltaRobustnessData(
                model_name=model,
                delta_robustness=score - baseline_score,
                composite_score=score,
                rank=position,
            )
            for position, (model, score) in enumerate(
                zip(comparison.models, comparison.composite_score), start=1
            )
        ]

        # Sort by composite score descending, then renumber the ranks
        deltas.sort(key=lambda entry: entry.composite_score, reverse=True)
        for position, entry in enumerate(deltas, start=1):
            entry.rank = position

        return deltas

    # =========================================================================
    # Benchmark Artifacts - SYNCHRONOUS
    # =========================================================================

    def _get_benchmark_path(self, benchmark_id: str) -> Path:
        """Get the file path for a benchmark artifact."""
        # Resolved relative to this file so it does not depend on the working
        # directory (local development and HuggingFace Spaces).
        return _PACKAGE_EXPERIMENTS_DIR / "benchmarks" / f"{benchmark_id}.json"

    def list_benchmarks(self) -> List[BenchmarkInfo]:
        """
        List all available benchmarks, most recent first.

        Benchmark files that cannot be loaded are logged and skipped.
        """
        benchmarks = []

        base_dir = _PACKAGE_EXPERIMENTS_DIR / "benchmarks"
        if not base_dir.exists():
            logger.warning("Benchmarks directory does not exist: %s", base_dir)
            return benchmarks

        for json_file in base_dir.glob("*.json"):
            benchmark_id = json_file.stem
            try:
                with open(json_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                benchmarks.append(BenchmarkInfo.from_json(benchmark_id, data))
            except Exception as e:
                logger.error("Error loading benchmark %s: %s", benchmark_id, e)

        # Sort by timestamp descending (most recent first)
        benchmarks.sort(key=lambda info: info.timestamp, reverse=True)
        return benchmarks

    def get_benchmark_comparison(self, benchmark_id: str) -> Optional[BenchmarkComparisonData]:
        """
        Get benchmark comparison data for multiple models.

        Returns:
            BenchmarkComparisonData, or None when the benchmark file is
            missing or cannot be loaded.
        """
        benchmark_path = self._get_benchmark_path(benchmark_id)
        if not benchmark_path.exists():
            logger.warning("Benchmark not found: %s", benchmark_path)
            return None

        try:
            with open(benchmark_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            comparison = BenchmarkComparisonData.from_json(benchmark_id, data)

            # Log benchmark view
            logger.info(
                "DASHBOARD_VIEW_BENCHMARK benchmark_id=%s model_count=%s",
                benchmark_id,
                comparison.total_models,
            )
            return comparison
        except Exception as e:
            logger.error("Error loading benchmark %s: %s", benchmark_id, e)
            return None

    def get_benchmark_stats(self, benchmark_id: str) -> Optional[BenchmarkStats]:
        """Get statistical summary for a benchmark, or None when it cannot be loaded."""
        comparison = self.get_benchmark_comparison(benchmark_id)
        if comparison is None:
            return None

        stats = BenchmarkStats.from_comparison_data(benchmark_id, comparison)
        logger.info(
            "DASHBOARD_COMPARE_MODELS benchmark_id=%s model_count=%s",
            benchmark_id,
            stats.total_models,
        )
        return stats

    # =========================================================================
    # Monitoring Data - SYNCHRONOUS
    # =========================================================================

    def get_monitoring_trends(
        self,
        model_version: Optional[str] = None,
        window_size: int = 50,
    ) -> Dict[str, Any]:
        """
        Get monitoring trend data for dashboard visualization.

        Sample data is returned in demo mode and also when the monitoring
        pipeline cannot be reached.

        Args:
            model_version: Accepted for interface compatibility; currently
                not applied as a filter
            window_size: Number of data points to return

        Returns:
            Dictionary with trend data for all metrics
        """
        if self._demo_mode:
            return self._get_sample_monitoring_trends(window_size)

        # In production, try to get from monitoring pipeline
        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            dashboard_data = pipeline.get_dashboard_data(trend_length=window_size)

            return {
                "timestamps": [ts.isoformat() for ts in dashboard_data.timestamps],
                "robustness": dashboard_data.robustness_trend,
                "hallucination": dashboard_data.hallucination_trend,
                "toxicity": dashboard_data.toxicity_trend,
                "bias": dashboard_data.bias_trend,
                "confidence": dashboard_data.confidence_trend,
                "rolling_robustness": dashboard_data.rolling_robustness,
                "rolling_hallucination": dashboard_data.rolling_hallucination,
                "rolling_toxicity": dashboard_data.rolling_toxicity,
                "rolling_confidence": dashboard_data.rolling_confidence,
            }
        except Exception as e:
            logger.error("Error getting monitoring trends: %s", e)
            return self._get_sample_monitoring_trends(window_size)

    def get_active_alerts(
        self,
        model_version: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Get active alerts for dashboard display.

        Sample alerts are returned in demo mode and also when the monitoring
        pipeline cannot be reached.

        Args:
            model_version: Accepted for interface compatibility; currently
                not applied as a filter

        Returns:
            Dictionary with "alerts" (list of alert dicts) and "total"
        """
        if self._demo_mode:
            return self._get_sample_alerts()

        # In production, try to get from monitoring pipeline
        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            alerts = pipeline.get_active_alerts()

            def plain(value: Any) -> Any:
                # Enum-like objects expose .value; anything else is stringified
                return value.value if hasattr(value, "value") else str(value)

            alert_list = [
                {
                    "id": alert.id,
                    "alert_type": plain(alert.alert_type),
                    "severity": plain(alert.severity),
                    "model_version": alert.model_version,
                    "metric_name": alert.metric_name,
                    "baseline_value": alert.baseline_value,
                    "current_value": alert.current_value,
                    "drift_magnitude": alert.drift_magnitude,
                    "threshold": alert.threshold,
                    "timestamp": (
                        alert.timestamp.isoformat()
                        if hasattr(alert.timestamp, "isoformat")
                        else str(alert.timestamp)
                    ),
                    "is_resolved": alert.is_resolved,
                }
                for alert in alerts
            ]

            return {
                "alerts": alert_list,
                "total": len(alert_list),
            }
        except Exception as e:
            logger.error("Error getting active alerts: %s", e)
            return self._get_sample_alerts()

    def get_drift_status(
        self,
        model_version: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Get current drift detection status.

        A "no drift" status for every metric is returned in demo mode and
        also when the monitoring pipeline cannot be reached.

        Args:
            model_version: Accepted for interface compatibility; currently
                not applied as a filter

        Returns:
            Dictionary with drift status for each metric
        """
        if self._demo_mode:
            return _default_drift_status()

        # In production, try to get from monitoring pipeline
        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            dashboard_data = pipeline.get_dashboard_data()

            drift_status = {}
            for metric_name, drift_result in dashboard_data.drift_status.items():
                severity = drift_result.severity
                drift_status[metric_name] = {
                    "is_drift": drift_result.is_drift_detected,
                    "magnitude": drift_result.drift_magnitude,
                    "baseline": drift_result.baseline_value,
                    "current": drift_result.live_value,
                    "threshold": drift_result.threshold,
                    "severity": severity.value if hasattr(severity, "value") else str(severity),
                }
            return drift_status
        except Exception as e:
            logger.error("Error getting drift status: %s", e)
            return _default_drift_status()

    def get_monitoring_config(self) -> Dict[str, Any]:
        """
        Get monitoring configuration.

        The default configuration is returned in demo mode and also when the
        monitoring pipeline cannot be reached.

        Returns:
            Dictionary with monitoring config
        """
        if self._demo_mode:
            return _default_monitoring_config()

        # In production, try to get from monitoring pipeline
        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            config = get_monitoring_pipeline().config

            return {
                "window_size": config.window_size,
                "sampling_rate": config.sampling_rate,
                "lightweight_hallucination": config.lightweight_hallucination,
                "hallucination_threshold": config.hallucination_threshold,
                "toxicity_threshold": config.toxicity_threshold,
                "bias_threshold": config.bias_threshold,
                "confidence_threshold": config.confidence_threshold,
                "robustness_threshold": config.robustness_threshold,
            }
        except Exception as e:
            logger.error("Error getting monitoring config: %s", e)
            return _default_monitoring_config()

    # =========================================================================
    # Sample Data Helpers
    # =========================================================================

    def _get_sample_monitoring_trends(self, window_size: int = 50) -> Dict[str, Any]:
        """
        Generate sample monitoring trends for demo mode.

        Metric values are reproducible (fixed seed 42) and the module-level
        random state is left untouched. Timestamps are one minute apart and
        end one minute before the current UTC time. Rolling values average
        the last 10 points, or all points when fewer are available.
        """
        rng = random.Random(42)

        base_time = datetime.utcnow()
        timestamps = [
            (base_time - timedelta(minutes=window_size - i)).isoformat()
            for i in range(window_size)
        ]

        def series(center: float, spread: float) -> List[float]:
            return [center + rng.uniform(-spread, spread) for _ in range(window_size)]

        # Draw order is fixed so the generated values stay stable
        robustness = series(0.7, 0.1)
        hallucination = series(0.15, 0.05)
        toxicity = series(0.08, 0.03)
        bias = series(0.05, 0.02)
        confidence = series(0.75, 0.1)

        return {
            "timestamps": timestamps,
            "robustness": robustness,
            "hallucination": hallucination,
            "toxicity": toxicity,
            "bias": bias,
            "confidence": confidence,
            # Divide by the real tail length so short windows are not deflated
            "rolling_robustness": _mean(robustness[-10:]),
            "rolling_hallucination": _mean(hallucination[-10:]),
            "rolling_toxicity": _mean(toxicity[-10:]),
            "rolling_confidence": _mean(confidence[-10:]),
        }

    def _get_sample_alerts(self) -> Dict[str, Any]:
        """Generate three sample alerts, timestamped relative to the current UTC time."""
        base_time = datetime.utcnow()

        sample_alerts = [
            {
                "id": "alert-001",
                "alert_type": "hallucination_drift",
                "severity": "high",
                "model_version": "gpt-4-v1",
                "metric_name": "hallucination",
                "baseline_value": 0.15,
                "current_value": 0.28,
                "drift_magnitude": 0.13,
                "threshold": 0.08,
                "timestamp": (base_time - timedelta(minutes=5)).isoformat(),
                "is_resolved": False,
            },
            {
                "id": "alert-002",
                "alert_type": "toxicity_drift",
                "severity": "medium",
                "model_version": "gpt-4-v1",
                "metric_name": "toxicity",
                "baseline_value": 0.05,
                "current_value": 0.12,
                "drift_magnitude": 0.07,
                "threshold": 0.05,
                "timestamp": (base_time - timedelta(minutes=15)).isoformat(),
                "is_resolved": False,
            },
            {
                "id": "alert-003",
                "alert_type": "confidence_collapse",
                "severity": "low",
                "model_version": "gpt-4-v1",
                "metric_name": "confidence",
                "baseline_value": 0.80,
                "current_value": 0.68,
                "drift_magnitude": 0.12,
                "threshold": 0.15,
                "timestamp": (base_time - timedelta(minutes=30)).isoformat(),
                "is_resolved": False,
            },
        ]

        return {
            "alerts": sample_alerts,
            "total": len(sample_alerts),
        }


# =============================================================================
# Factory Functions
# =============================================================================


def get_data_loader(demo_mode: bool = True) -> DashboardDataLoader:
    """
    Get a DashboardDataLoader instance.

    Args:
        demo_mode: If True, return sample data without database

    Returns:
        DashboardDataLoader instance
    """
    return DashboardDataLoader(demo_mode=demo_mode)