| """ |
| Dashboard Data Loader |
| |
| Handles data retrieval from the backend database and transforms |
| data into chart-ready formats for dashboard visualization. |
| |
| This layer abstracts database queries and provides clean interfaces |
| for the visualization components. |
| """ |
|
|
| import logging |
| import uuid |
| from typing import Any, Dict, List, Optional |
|
|
| |
| try: |
| from backend.scoring.aggregator import ScoreAggregator |
| except ImportError: |
| |
| ScoreAggregator = None |
|
|
| import json |
| from pathlib import Path |
|
|
| from dashboard.schemas import ( |
| AttackBreakdown, |
| AttackBreakdownList, |
| BenchmarkComparisonData, |
| BenchmarkInfo, |
| BenchmarkStats, |
| ComparisonData, |
| DeltaRobustnessData, |
| HeatmapData, |
| MetricSummary, |
| RadarData, |
| RunMetadata, |
| RunSummary, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
# Built-in evaluation runs served in demo mode and used as a fallback when no
# run or benchmark files can be loaded. Every sample run shares the same
# model/dataset version and status; only id, model, timestamp and score vary.
SAMPLE_RUNS = [
    {
        "id": sample_id,
        "model_name": sample_model,
        "model_version": "v1.0",
        "dataset_version": "v1.0",
        "timestamp": sample_timestamp,
        "status": "completed",
        "composite_score": sample_score,
    }
    for sample_id, sample_model, sample_timestamp, sample_score in (
        ("sample-run-001", "gpt-4", "2024-01-15T10:30:00Z", 0.75),
        ("sample-run-002", "claude-3-sonnet", "2024-01-16T14:20:00Z", 0.82),
        ("sample-run-003", "Mistral-7B-v0.1", "2024-01-17T09:15:00Z", 0.68),
        ("sample-run-004", "Llama-2-70b", "2024-01-18T11:30:00Z", 0.71),
        ("sample-run-005", "gpt-3.5-turbo", "2024-01-19T13:45:00Z", 0.65),
    )
]
|
|
|
|
| |
# Per-model (low, high) bounds used when drawing demo metric values.
# Keys are lower-cased model names; "hall"/"tox"/"bias"/"conf" are the
# hallucination, toxicity, bias and confidence ranges respectively.
MODEL_SCORE_RANGES = {
    "gpt-4": dict(
        hall=(0.08, 0.18),
        tox=(0.02, 0.08),
        bias=(0.03, 0.12),
        conf=(0.75, 0.92),
    ),
    "claude-3-sonnet": dict(
        hall=(0.06, 0.15),
        tox=(0.01, 0.06),
        bias=(0.02, 0.10),
        conf=(0.78, 0.95),
    ),
    "mistral-7b-v0.1": dict(
        hall=(0.12, 0.28),
        tox=(0.04, 0.12),
        bias=(0.06, 0.18),
        conf=(0.65, 0.85),
    ),
    "llama-2-70b": dict(
        hall=(0.10, 0.22),
        tox=(0.03, 0.10),
        bias=(0.05, 0.15),
        conf=(0.70, 0.88),
    ),
    "gpt-3.5-turbo": dict(
        hall=(0.15, 0.32),
        tox=(0.05, 0.14),
        bias=(0.07, 0.20),
        conf=(0.60, 0.82),
    ),
}
|
|
def _get_sample_results(run_id: str) -> List[Dict[str, Any]]:
    """Generate 20 reproducible sample results for demo mode.

    Args:
        run_id: Identifier of the run. A list/tuple is tolerated (its first
            element is used, or "default" when empty) and any other value is
            converted with ``str``.

    Returns:
        A list of 20 result dicts with ``id``, ``sample_id``, ``attack_type``,
        ``mutation_type`` and the ``hallucination`` / ``toxicity`` / ``bias`` /
        ``confidence`` / ``robustness`` scores. Scores are drawn from the
        ranges in ``MODEL_SCORE_RANGES`` for the run's model when the run is
        one of ``SAMPLE_RUNS``; otherwise generic ranges are used.
    """
    import random
    import zlib

    if isinstance(run_id, (list, tuple)):
        run_id = run_id[0] if run_id else "default"
    run_id = str(run_id)

    # The built-in hash() of a str is salted per process, so it cannot seed
    # reproducible data across restarts; CRC32 of the id is stable. A private
    # Random instance also leaves the global RNG state untouched.
    rng = random.Random(zlib.crc32(run_id.encode("utf-8")) % 10000)

    model_name = next(
        (run["model_name"].lower() for run in SAMPLE_RUNS if run["id"] == run_id),
        None,
    )

    ranges = None
    if model_name:
        ranges = MODEL_SCORE_RANGES.get(model_name)
        if not ranges:
            # No exact match: accept the first entry whose key contains, or is
            # contained in, the model name.
            ranges = next(
                (
                    candidate
                    for key, candidate in MODEL_SCORE_RANGES.items()
                    if key in model_name or model_name in key
                ),
                None,
            )

    if not ranges:
        # Generic ranges for unknown runs/models.
        ranges = {
            "hall": (0.05, 0.35),
            "tox": (0.02, 0.15),
            "bias": (0.05, 0.25),
            "conf": (0.60, 0.90),
        }

    attack_types = ["injection", "jailbreak", "bias_trigger", "context_poison", "role_confusion"]
    results = []

    for i in range(20):
        results.append({
            "id": f"{run_id}-result-{i}",
            "sample_id": f"sample-{i}",
            # Every second sample is adversarial, every third is a paraphrase.
            "attack_type": rng.choice(attack_types) if i % 2 == 0 else None,
            "mutation_type": "paraphrase" if i % 3 == 0 else None,
            "hallucination": rng.uniform(*ranges["hall"]),
            "toxicity": rng.uniform(*ranges["tox"]),
            "bias": rng.uniform(*ranges["bias"]),
            "confidence": rng.uniform(*ranges["conf"]),
            "robustness": rng.uniform(0.50, 0.85),
        })

    return results
|
|
|
|
class DashboardDataLoader:
    """
    Data loader for dashboard visualization.

    Responsibilities:
    - Fetch evaluation runs
    - Fetch evaluation results
    - Fetch benchmark artifacts
    - Transform data into chart-ready format

    In demo mode every accessor serves built-in sample data. Otherwise runs
    are read from JSON files under ``experiments/runs`` (falling back to runs
    derived from ``experiments/benchmarks``) and per-sample results are
    synthesized from each run's model-level metrics.

    Note: Communicates with backend via internal function calls (same container).
    No direct DB exposure to frontend.
    """

    # Metrics aggregated per run, in the order they are reported.
    _METRIC_NAMES = ("hallucination", "toxicity", "bias", "confidence")

    def __init__(self, demo_mode: bool = False, tenant_id: Optional[str] = None):
        """
        Initialize data loader.

        Args:
            demo_mode: If True, return sample data without database
            tenant_id: Optional tenant ID for multi-tenant filtering
        """
        self._demo_mode = demo_mode
        self._tenant_id = tenant_id

        # The aggregator is optional: when the backend package is missing or
        # its constructor fails, composite scores use the built-in formula.
        self._aggregator = None
        if ScoreAggregator is not None:
            try:
                self._aggregator = ScoreAggregator()
            except Exception as exc:
                logger.warning(
                    "ScoreAggregator unavailable, using built-in composite formula: %s",
                    exc,
                )

    def _get_tenant_filter(self) -> Dict[str, Any]:
        """Get tenant filter for database queries (empty when no tenant is set)."""
        if self._tenant_id is None:
            return {}
        return {"tenant_id": self._tenant_id}

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _clamp01(value: float) -> float:
        """Clamp ``value`` into the closed interval [0.0, 1.0]."""
        return max(0.0, min(1.0, value))

    @staticmethod
    def _mean(values: List[float]) -> float:
        """Return the arithmetic mean of ``values`` (0.0 for an empty list)."""
        return sum(values) / len(values) if values else 0.0

    @staticmethod
    def _metric_values(results: List[Dict[str, Any]], metric: str) -> List[float]:
        """Collect the non-None values of ``metric`` from result dicts."""
        return [r[metric] for r in results if r.get(metric) is not None]

    @staticmethod
    def _enum_text(value: Any) -> str:
        """Return ``value.value`` for enum-like objects, else ``str(value)``."""
        return value.value if hasattr(value, "value") else str(value)

    @staticmethod
    def _parse_timestamp(raw: Any) -> Any:
        """
        Parse an ISO-8601 run timestamp into a ``datetime``.

        A trailing ``Z`` is accepted as UTC. Missing or unparseable values
        fall back to the current time in UTC.
        """
        from datetime import datetime, timezone

        if raw:
            try:
                return datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
            except ValueError:
                logger.warning("Unparseable run timestamp %r; using current time", raw)
        return datetime.now(timezone.utc)

    def _composite_score(self, mean_h: float, mean_t: float, mean_b: float, mean_c: float) -> float:
        """
        Combine mean metrics into one composite score.

        Delegates to the ScoreAggregator when one is available; otherwise
        uses the built-in weighting (30% hallucination, 30% toxicity,
        20% bias, each inverted, plus 20% confidence).
        """
        if self._aggregator is not None:
            return self._aggregator.calculate_composite(mean_h, mean_t, mean_b, mean_c)
        return (
            0.30 * (1 - mean_h)
            + 0.30 * (1 - mean_t)
            + 0.20 * (1 - mean_b)
            + 0.20 * mean_c
        )

    @staticmethod
    def _benchmarks_dir() -> Path:
        """Directory holding benchmark artifacts, located relative to this file."""
        return Path(__file__).parent.parent / "experiments" / "benchmarks"

    @staticmethod
    def _default_drift_status() -> Dict[str, Any]:
        """Drift status reporting no drift for every tracked metric."""
        return {
            metric: {"is_drift": False, "magnitude": 0.0}
            for metric in ("hallucination", "toxicity", "bias", "confidence", "robustness")
        }

    @staticmethod
    def _default_monitoring_config() -> Dict[str, Any]:
        """Monitoring configuration used in demo mode and as error fallback."""
        return {
            "window_size": 100,
            "sampling_rate": 1.0,
            "lightweight_hallucination": True,
            "hallucination_threshold": 0.08,
            "toxicity_threshold": 0.05,
            "bias_threshold": 0.05,
            "confidence_threshold": 0.15,
            "robustness_threshold": 0.10,
        }

    # ------------------------------------------------------------------
    # Runs
    # ------------------------------------------------------------------

    def get_all_runs(self) -> List[Dict[str, Any]]:
        """
        Get all evaluation runs.

        Returns:
            List of run dictionaries with id, model_name, timestamp, status.
            Outside demo mode the sources are tried in order: run files,
            runs derived from benchmarks, then the built-in sample runs.
        """
        if self._demo_mode:
            return SAMPLE_RUNS

        runs = self._load_runs_from_files()
        if not runs:
            runs = self._derive_runs_from_benchmarks()

        return runs if runs else SAMPLE_RUNS

    def _load_runs_from_files(self) -> List[Dict[str, Any]]:
        """Load run entries from ``experiments/runs/*.json``, skipping unreadable files."""
        runs = []
        # NOTE(review): resolved against the current working directory, unlike
        # _benchmarks_dir(), which is relative to this file - confirm intended.
        runs_dir = Path("experiments/runs")

        if not runs_dir.exists():
            return runs

        for run_file in runs_dir.glob("*.json"):
            try:
                with open(run_file, "r", encoding="utf-8") as f:
                    run_data = json.load(f)
                runs.append({
                    "id": run_data.get("run_id", run_file.stem),
                    "model_name": run_data.get("model_name", "unknown"),
                    "model_version": run_data.get("model_version", "v1.0"),
                    "dataset_version": run_data.get("dataset_version", "v1.0"),
                    "timestamp": run_data.get("timestamp", ""),
                    "status": run_data.get("status", "completed"),
                    "composite_score": run_data.get("composite_score"),
                })
            except Exception as e:
                # Best effort: one broken file must not hide the other runs.
                logger.error("Error loading run %s: %s", run_file, e)

        return runs

    def _derive_runs_from_benchmarks(self) -> List[Dict[str, Any]]:
        """
        Derive run data from benchmark files.

        This creates one run entry per model listed in each
        ``experiments/benchmarks/*.json`` file, allowing the dashboard to show
        real data without explicit run files. The composite score is the mean
        of the model's baseline and adversarial robustness.

        Returns:
            Run dictionaries sorted by timestamp, newest first.
        """
        # NOTE(review): resolved against the current working directory, unlike
        # _benchmarks_dir(), which is relative to this file - confirm intended.
        benchmarks_dir = Path("experiments/benchmarks")
        if not benchmarks_dir.exists():
            return []

        runs = []
        for benchmark_file in benchmarks_dir.glob("*.json"):
            try:
                with open(benchmark_file, "r", encoding="utf-8") as f:
                    benchmark_data = json.load(f)

                metadata = benchmark_data.get("metadata", {})

                for model in benchmark_data.get("models", []):
                    model_name = model.get("model_name", "unknown")
                    baseline = model.get("baseline_robustness", 0.0)
                    adversarial = model.get("adversarial_robustness", 0.0)

                    runs.append({
                        "id": f"run-{model_name.replace('/', '-')}-{benchmark_file.stem}",
                        "model_name": model_name,
                        "model_version": "v1.0",
                        "dataset_version": metadata.get("dataset_version", "v1.0"),
                        "timestamp": metadata.get("timestamp", ""),
                        "status": "completed",
                        "composite_score": (baseline + adversarial) / 2,
                        "baseline_robustness": baseline,
                        "adversarial_robustness": adversarial,
                        "sample_count": model.get("sample_count", 0),
                    })
            except Exception as e:
                # Best effort: skip the rest of a malformed benchmark file.
                logger.error("Error processing benchmark %s: %s", benchmark_file, e)

        # A null timestamp sorts as the empty string instead of raising.
        runs.sort(key=lambda run: str(run.get("timestamp") or ""), reverse=True)
        return runs

    def get_run_by_id(self, run_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific run by ID.

        In demo mode an unknown ID falls back to the first sample run.
        Otherwise the runs reported by ``get_all_runs`` are searched first, so
        every listed run can be resolved, then the benchmark-derived runs.

        Returns:
            The run dictionary, or None when no run matches.
        """
        if self._demo_mode:
            for run in SAMPLE_RUNS:
                if run["id"] == run_id:
                    return run
            return SAMPLE_RUNS[0] if SAMPLE_RUNS else None

        for run in self.get_all_runs():
            if run.get("id") == run_id:
                return run

        # Run files take precedence in get_all_runs(), so benchmark-derived
        # ids still need their own lookup.
        for run in self._derive_runs_from_benchmarks():
            if run["id"] == run_id:
                return run

        return None

    def get_run_results(self, run_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get results for a run.

        Args:
            run_id: Run identifier.
            limit: Maximum number of results; None or 0 returns all of them.
        """
        if self._demo_mode:
            results = _get_sample_results(run_id)
        else:
            results = self._get_results_from_benchmark(run_id)
        return results[:limit] if limit else results

    def _get_results_from_benchmark(self, run_id: str) -> List[Dict[str, Any]]:
        """
        Generate results from benchmark data for a run.

        Per-sample results are synthesized from the run's model-level metrics.
        Returns an empty list when the run is unknown.
        """
        run_data = self.get_run_by_id(run_id)
        if run_data is None:
            return []
        return self._synthesize_results(run_id, run_data)

    @staticmethod
    def _synthesize_results(run_id: str, run_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Build reproducible per-sample results from model-level robustness.

        Args:
            run_id: Run identifier; also seeds the random generator.
            run_data: Run dictionary; ``baseline_robustness`` (default 0.7),
                ``adversarial_robustness`` (default 0.6) and ``sample_count``
                (default 100, capped at 100) are read from it.

        Returns:
            One result dict per sample, with all scores clamped to [0, 1].
        """
        import random
        import zlib

        # Seed before the first draw, and with a stable digest: hash() of a
        # str is salted per process, so it cannot give reproducible data. A
        # private Random also leaves the global RNG state untouched.
        rng = random.Random(zlib.crc32(str(run_id).encode("utf-8")))
        clamp = DashboardDataLoader._clamp01

        baseline = run_data.get("baseline_robustness", 0.7)
        adversarial = run_data.get("adversarial_robustness", 0.6)

        # Model-level estimates: lower baseline robustness means higher
        # hallucination/toxicity/bias and lower confidence.
        hallucination = clamp((1 - baseline) * rng.uniform(0.8, 1.2))
        toxicity = clamp((1 - baseline) * rng.uniform(0.5, 1.0))
        bias = clamp((1 - baseline) * rng.uniform(0.5, 1.0))
        confidence = clamp(baseline * rng.uniform(0.9, 1.1))

        sample_count = run_data.get("sample_count", 100)
        sample_count = 100 if sample_count is None else int(sample_count)

        attack_types = [
            "injection", "jailbreak", "bias_trigger",
            "context_poison", "role_confusion", "chaining",
        ]
        results = []

        for i in range(max(0, min(sample_count, 100))):
            # Small per-sample jitter around the model-level estimates.
            h_var = clamp(hallucination + rng.uniform(-0.05, 0.05))
            t_var = clamp(toxicity + rng.uniform(-0.02, 0.02))
            b_var = clamp(bias + rng.uniform(-0.02, 0.02))
            c_var = clamp(confidence + rng.uniform(-0.05, 0.05))

            results.append({
                "id": f"{run_id}-result-{i}",
                "sample_id": f"sample-{i}",
                "attack_type": rng.choice(attack_types) if i % 2 == 0 else None,
                "mutation_type": "paraphrase" if i % 3 == 0 else None,
                "hallucination": h_var,
                "toxicity": t_var,
                "bias": b_var,
                "confidence": c_var,
                "robustness": clamp((baseline + adversarial) / 2 + rng.uniform(-0.1, 0.1)),
            })

        return results

    # ------------------------------------------------------------------
    # Run summary
    # ------------------------------------------------------------------

    def get_run_summary(self, run_id: str) -> Optional["RunSummary"]:
        """
        Get complete summary for a run.

        Returns:
            A RunSummary, or None when the run is unknown or has no results.
            The composite score is only set when all four metrics have data;
            metrics without data count as 0.0 in the vulnerability index.
        """
        run_data = self.get_run_by_id(run_id)
        if run_data is None:
            return None

        results = self.get_run_results(run_id)
        if not results:
            return None

        values_by_metric = {
            name: self._metric_values(results, name) for name in self._METRIC_NAMES
        }
        # Means are computed per metric so a single empty metric cannot leave
        # the others undefined.
        means = {name: self._mean(values) for name, values in values_by_metric.items()}

        metric_summaries = [
            MetricSummary.from_values(name, values)
            for name, values in values_by_metric.items()
            if values
        ]

        composite_score = None
        if all(values_by_metric.values()):
            composite_score = self._composite_score(
                means["hallucination"],
                means["toxicity"],
                means["bias"],
                means["confidence"],
            )

        vulnerability_index = RunSummary.calculate_vulnerability_index(
            means["hallucination"],
            means["toxicity"],
            means["bias"],
        )

        attack_coverage = sorted(
            {r["attack_type"] for r in results if r.get("attack_type")}
        )

        metadata = RunMetadata(
            run_id=run_data["id"],
            timestamp=self._parse_timestamp(run_data.get("timestamp")),
            model_name=run_data["model_name"],
            model_version=run_data["model_version"],
            dataset_version=run_data["dataset_version"],
            config_hash="demo_hash",
            status=run_data["status"],
        )

        return RunSummary(
            metadata=metadata,
            metric_summary=metric_summaries,
            composite_score=composite_score,
            total_samples=len(results),
            attack_coverage=attack_coverage,
            vulnerability_index=vulnerability_index,
        )

    # ------------------------------------------------------------------
    # Radar chart
    # ------------------------------------------------------------------

    def get_radar_data(self, run_id: str) -> Optional["RadarData"]:
        """
        Get radar chart data for a run.

        Returns:
            RadarData built from the mean of each metric, or None when the
            run is unknown, has no results, or any metric has no values.
        """
        run_data = self.get_run_by_id(run_id)
        if run_data is None:
            return None

        results = self.get_run_results(run_id)
        if not results:
            return None

        values_by_metric = {
            name: self._metric_values(results, name) for name in self._METRIC_NAMES
        }
        if not all(values_by_metric.values()):
            return None

        return RadarData.from_metrics(
            mean_hallucination=self._mean(values_by_metric["hallucination"]),
            mean_toxicity=self._mean(values_by_metric["toxicity"]),
            mean_bias=self._mean(values_by_metric["bias"]),
            mean_confidence=self._mean(values_by_metric["confidence"]),
            model_name=run_data["model_name"],
            run_id=run_id,
        )

    # ------------------------------------------------------------------
    # Attack views
    # ------------------------------------------------------------------

    def get_attack_heatmap(self, run_id: str) -> Optional["HeatmapData"]:
        """Get attack vulnerability heatmap data (None when the run has no results)."""
        results = self.get_run_results(run_id)
        if not results:
            return None

        heatmap_data = HeatmapData.from_results(results)
        heatmap_data.run_id = run_id
        return heatmap_data

    def get_attack_breakdown(self, run_id: str) -> Optional["AttackBreakdownList"]:
        """Get per-attack metric breakdown data (None when the run has no results)."""
        results = self.get_run_results(run_id)
        if not results:
            return None

        return AttackBreakdownList.from_results(results, run_id=run_id)

    def get_attack_types_for_run(self, run_id: str) -> List[str]:
        """
        Get the sorted list of attack types for a run.

        Results without an attack type are reported as ``"none"``.
        """
        results = self.get_run_results(run_id)
        if not results:
            return []

        return sorted({result.get("attack_type") or "none" for result in results})

    # ------------------------------------------------------------------
    # Model comparison
    # ------------------------------------------------------------------

    def get_model_comparison(self, run_ids: List[str]) -> Optional["ComparisonData"]:
        """
        Get comparison data for multiple runs.

        Runs that are unknown or have no results are skipped.

        Returns:
            ComparisonData with one entry per usable run, or None when fewer
            than two runs are given or usable.
        """
        if not run_ids or len(run_ids) < 2:
            return None

        models = []
        scores = {name: [] for name in self._METRIC_NAMES}
        composite_scores = []
        sample_counts = []

        for run_id in run_ids:
            run_data = self.get_run_by_id(run_id)
            if run_data is None:
                continue

            results = self.get_run_results(run_id)
            if not results:
                continue

            models.append(run_data["model_name"])

            means = {
                name: self._mean(self._metric_values(results, name))
                for name in self._METRIC_NAMES
            }
            for name, value in means.items():
                scores[name].append(value)

            # Uses the built-in formula when no aggregator is available
            # instead of dereferencing a None aggregator.
            composite_scores.append(
                self._composite_score(
                    means["hallucination"],
                    means["toxicity"],
                    means["bias"],
                    means["confidence"],
                )
            )
            sample_counts.append(len(results))

        if len(models) < 2:
            return None

        return ComparisonData(
            models=models,
            hallucination=scores["hallucination"],
            toxicity=scores["toxicity"],
            bias=scores["bias"],
            confidence=scores["confidence"],
            composite_score=composite_scores,
            sample_count=sample_counts,
        )

    def get_delta_robustness(self, run_ids: List[str]) -> List["DeltaRobustnessData"]:
        """
        Get delta robustness comparison for multiple runs.

        The delta of each model is its composite score minus the lowest
        composite score in the comparison.

        Returns:
            Entries sorted by composite score (highest first) with 1-based
            ranks, or an empty list when no comparison is available.
        """
        comparison = self.get_model_comparison(run_ids)
        if comparison is None:
            return []

        baseline_score = min(comparison.composite_score)

        # sorted() is stable, so equal scores keep their input order.
        ranked = sorted(
            zip(comparison.models, comparison.composite_score),
            key=lambda pair: pair[1],
            reverse=True,
        )

        return [
            DeltaRobustnessData(
                model_name=model,
                delta_robustness=score - baseline_score,
                composite_score=score,
                rank=position,
            )
            for position, (model, score) in enumerate(ranked, start=1)
        ]

    # ------------------------------------------------------------------
    # Benchmarks
    # ------------------------------------------------------------------

    def _get_benchmark_path(self, benchmark_id: str) -> Path:
        """Get the file path for a benchmark artifact."""
        return self._benchmarks_dir() / f"{benchmark_id}.json"

    def list_benchmarks(self) -> List["BenchmarkInfo"]:
        """List all available benchmarks, newest first; unreadable files are skipped."""
        benchmarks = []
        base_dir = self._benchmarks_dir()

        if not base_dir.exists():
            logger.warning("Benchmarks directory does not exist: %s", base_dir)
            return benchmarks

        for json_file in base_dir.glob("*.json"):
            benchmark_id = json_file.stem
            try:
                with open(json_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                benchmarks.append(BenchmarkInfo.from_json(benchmark_id, data))
            except Exception as e:
                logger.error("Error loading benchmark %s: %s", benchmark_id, e)
                continue

        benchmarks.sort(key=lambda info: info.timestamp, reverse=True)
        return benchmarks

    def get_benchmark_comparison(self, benchmark_id: str) -> Optional["BenchmarkComparisonData"]:
        """
        Get benchmark comparison data for multiple models.

        Returns:
            The parsed comparison, or None when the benchmark file is missing,
            unreadable, or ``benchmark_id`` points outside the benchmarks
            directory.
        """
        benchmark_path = self._get_benchmark_path(benchmark_id)

        # Reject ids containing path separators or absolute paths so a caller
        # cannot read JSON files outside the benchmarks directory.
        if benchmark_path.parent != self._benchmarks_dir():
            logger.warning("Invalid benchmark id: %r", benchmark_id)
            return None

        if not benchmark_path.exists():
            logger.warning("Benchmark not found: %s", benchmark_path)
            return None

        try:
            with open(benchmark_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            comparison = BenchmarkComparisonData.from_json(benchmark_id, data)

            logger.info(
                "DASHBOARD_VIEW_BENCHMARK benchmark_id=%s model_count=%s",
                benchmark_id,
                comparison.total_models,
            )
            return comparison
        except Exception as e:
            logger.error("Error loading benchmark %s: %s", benchmark_id, e)
            return None

    def get_benchmark_stats(self, benchmark_id: str) -> Optional["BenchmarkStats"]:
        """Get statistical summary for a benchmark (None when it cannot be loaded)."""
        comparison = self.get_benchmark_comparison(benchmark_id)
        if comparison is None:
            return None

        stats = BenchmarkStats.from_comparison_data(benchmark_id, comparison)

        logger.info(
            "DASHBOARD_COMPARE_MODELS benchmark_id=%s model_count=%s",
            benchmark_id,
            stats.total_models,
        )
        return stats

    # ------------------------------------------------------------------
    # Monitoring
    # ------------------------------------------------------------------

    def get_monitoring_trends(
        self,
        model_version: Optional[str] = None,
        window_size: int = 50,
    ) -> Dict[str, Any]:
        """
        Get monitoring trend data for dashboard visualization.

        Args:
            model_version: Optional model version to filter by (currently unused)
            window_size: Number of data points to return

        Returns:
            Dictionary with trend data for all metrics. Sample trends are
            returned in demo mode and when the monitoring pipeline fails.
        """
        if self._demo_mode:
            return self._get_sample_monitoring_trends(window_size)

        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            dashboard_data = pipeline.get_dashboard_data(trend_length=window_size)

            return {
                "timestamps": [ts.isoformat() for ts in dashboard_data.timestamps],
                "robustness": dashboard_data.robustness_trend,
                "hallucination": dashboard_data.hallucination_trend,
                "toxicity": dashboard_data.toxicity_trend,
                "bias": dashboard_data.bias_trend,
                "confidence": dashboard_data.confidence_trend,
                "rolling_robustness": dashboard_data.rolling_robustness,
                "rolling_hallucination": dashboard_data.rolling_hallucination,
                "rolling_toxicity": dashboard_data.rolling_toxicity,
                "rolling_confidence": dashboard_data.rolling_confidence,
            }
        except Exception as e:
            logger.error("Error getting monitoring trends: %s", e)
            return self._get_sample_monitoring_trends(window_size)

    def get_active_alerts(
        self,
        model_version: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Get active alerts for dashboard display.

        Args:
            model_version: Optional model version to filter by (currently unused)

        Returns:
            Dictionary with the alert list under ``alerts`` and its length
            under ``total``. Sample alerts are returned in demo mode and when
            the monitoring pipeline fails.
        """
        if self._demo_mode:
            return self._get_sample_alerts()

        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()

            alert_list = [
                {
                    "id": alert.id,
                    "alert_type": self._enum_text(alert.alert_type),
                    "severity": self._enum_text(alert.severity),
                    "model_version": alert.model_version,
                    "metric_name": alert.metric_name,
                    "baseline_value": alert.baseline_value,
                    "current_value": alert.current_value,
                    "drift_magnitude": alert.drift_magnitude,
                    "threshold": alert.threshold,
                    "timestamp": (
                        alert.timestamp.isoformat()
                        if hasattr(alert.timestamp, "isoformat")
                        else str(alert.timestamp)
                    ),
                    "is_resolved": alert.is_resolved,
                }
                for alert in pipeline.get_active_alerts()
            ]

            return {
                "alerts": alert_list,
                "total": len(alert_list),
            }
        except Exception as e:
            logger.error("Error getting active alerts: %s", e)
            return self._get_sample_alerts()

    def get_drift_status(
        self,
        model_version: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Get current drift detection status.

        Args:
            model_version: Optional model version to filter by (currently unused)

        Returns:
            Dictionary with drift status for each metric. A "no drift" status
            is returned in demo mode and when the monitoring pipeline fails.
        """
        if self._demo_mode:
            return self._default_drift_status()

        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            dashboard_data = pipeline.get_dashboard_data()

            return {
                metric_name: {
                    "is_drift": drift_result.is_drift_detected,
                    "magnitude": drift_result.drift_magnitude,
                    "baseline": drift_result.baseline_value,
                    "current": drift_result.live_value,
                    "threshold": drift_result.threshold,
                    "severity": self._enum_text(drift_result.severity),
                }
                for metric_name, drift_result in dashboard_data.drift_status.items()
            }
        except Exception as e:
            logger.error("Error getting drift status: %s", e)
            return self._default_drift_status()

    def get_monitoring_config(self) -> Dict[str, Any]:
        """
        Get monitoring configuration.

        Returns:
            Dictionary with monitoring config. Built-in defaults are returned
            in demo mode and when the monitoring pipeline fails.
        """
        if self._demo_mode:
            return self._default_monitoring_config()

        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            config = get_monitoring_pipeline().config

            return {
                "window_size": config.window_size,
                "sampling_rate": config.sampling_rate,
                "lightweight_hallucination": config.lightweight_hallucination,
                "hallucination_threshold": config.hallucination_threshold,
                "toxicity_threshold": config.toxicity_threshold,
                "bias_threshold": config.bias_threshold,
                "confidence_threshold": config.confidence_threshold,
                "robustness_threshold": config.robustness_threshold,
            }
        except Exception as e:
            logger.error("Error getting monitoring config: %s", e)
            return self._default_monitoring_config()

    # ------------------------------------------------------------------
    # Sample monitoring data
    # ------------------------------------------------------------------

    def _get_sample_monitoring_trends(self, window_size: int = 50) -> Dict[str, Any]:
        """
        Generate sample monitoring trends for demo mode.

        Args:
            window_size: Number of one-minute data points ending at "now".

        Returns:
            Per-metric series plus rolling averages over the last (up to) ten
            points; the averages are 0.0 when the window is empty.
        """
        import random
        from datetime import datetime, timedelta, timezone

        # A private generator with a fixed seed keeps the series reproducible
        # without reseeding the process-wide RNG.
        rng = random.Random(42)

        # Naive UTC, so the ISO strings carry no offset suffix.
        base_time = datetime.now(timezone.utc).replace(tzinfo=None)
        timestamps = [
            (base_time - timedelta(minutes=window_size - i)).isoformat()
            for i in range(window_size)
        ]

        robustness = [0.7 + rng.uniform(-0.1, 0.1) for _ in range(window_size)]
        hallucination = [0.15 + rng.uniform(-0.05, 0.05) for _ in range(window_size)]
        toxicity = [0.08 + rng.uniform(-0.03, 0.03) for _ in range(window_size)]
        bias = [0.05 + rng.uniform(-0.02, 0.02) for _ in range(window_size)]
        confidence = [0.75 + rng.uniform(-0.1, 0.1) for _ in range(window_size)]

        def rolling(series: List[float]) -> float:
            # Divide by the real tail length so windows shorter than ten
            # points still yield a true average.
            tail = series[-10:]
            return sum(tail) / len(tail) if tail else 0.0

        return {
            "timestamps": timestamps,
            "robustness": robustness,
            "hallucination": hallucination,
            "toxicity": toxicity,
            "bias": bias,
            "confidence": confidence,
            "rolling_robustness": rolling(robustness),
            "rolling_hallucination": rolling(hallucination),
            "rolling_toxicity": rolling(toxicity),
            "rolling_confidence": rolling(confidence),
        }

    def _get_sample_alerts(self) -> Dict[str, Any]:
        """Generate three unresolved sample alerts for demo mode."""
        from datetime import datetime, timedelta, timezone

        # Naive UTC, so the ISO strings carry no offset suffix.
        base_time = datetime.now(timezone.utc).replace(tzinfo=None)

        sample_alerts = [
            {
                "id": "alert-001",
                "alert_type": "hallucination_drift",
                "severity": "high",
                "model_version": "gpt-4-v1",
                "metric_name": "hallucination",
                "baseline_value": 0.15,
                "current_value": 0.28,
                "drift_magnitude": 0.13,
                "threshold": 0.08,
                "timestamp": (base_time - timedelta(minutes=5)).isoformat(),
                "is_resolved": False,
            },
            {
                "id": "alert-002",
                "alert_type": "toxicity_drift",
                "severity": "medium",
                "model_version": "gpt-4-v1",
                "metric_name": "toxicity",
                "baseline_value": 0.05,
                "current_value": 0.12,
                "drift_magnitude": 0.07,
                "threshold": 0.05,
                "timestamp": (base_time - timedelta(minutes=15)).isoformat(),
                "is_resolved": False,
            },
            {
                "id": "alert-003",
                "alert_type": "confidence_collapse",
                "severity": "low",
                "model_version": "gpt-4-v1",
                "metric_name": "confidence",
                "baseline_value": 0.80,
                "current_value": 0.68,
                "drift_magnitude": 0.12,
                "threshold": 0.15,
                "timestamp": (base_time - timedelta(minutes=30)).isoformat(),
                "is_resolved": False,
            },
        ]

        return {
            "alerts": sample_alerts,
            "total": len(sample_alerts),
        }
|
|
|
| |
| |
| |
|
|
|
|
def get_data_loader(
    demo_mode: bool = True,
    tenant_id: Optional[str] = None,
) -> DashboardDataLoader:
    """
    Get a DashboardDataLoader instance.

    Args:
        demo_mode: If True, return sample data without database
        tenant_id: Optional tenant ID for multi-tenant filtering; passed
            through to the loader unchanged

    Returns:
        DashboardDataLoader instance
    """
    return DashboardDataLoader(demo_mode=demo_mode, tenant_id=tenant_id)
|
|