| """
|
| Dashboard Data Loader
|
|
|
| Handles data retrieval from the backend database and transforms
|
| data into chart-ready formats for dashboard visualization.
|
|
|
| This layer abstracts database queries and provides clean interfaces
|
| for the visualization components.
|
| """
|
|
|
| import logging
|
| import uuid
|
| from typing import Any, Dict, List, Optional
|
|
|
| from backend.scoring.aggregator import ScoreAggregator
|
|
|
| import json
|
| from pathlib import Path
|
|
|
| from dashboard.schemas import (
|
| AttackBreakdown,
|
| AttackBreakdownList,
|
| BenchmarkComparisonData,
|
| BenchmarkInfo,
|
| BenchmarkStats,
|
| ComparisonData,
|
| DeltaRobustnessData,
|
| HeatmapData,
|
| MetricSummary,
|
| RadarData,
|
| RunMetadata,
|
| RunSummary,
|
| )
|
|
|
logger = logging.getLogger(__name__)


# Demo-mode fixture rows: (run id, model name, ISO-8601 UTC timestamp, composite score).
# Every sample run shares the same model/dataset version and status, so only the
# varying columns are listed here and the full dictionaries are expanded below.
_SAMPLE_RUN_ROWS = (
    ("sample-run-001", "gpt-4", "2024-01-15T10:30:00Z", 0.75),
    ("sample-run-002", "claude-3-sonnet", "2024-01-16T14:20:00Z", 0.82),
    ("sample-run-003", "Mistral-7B-v0.1", "2024-01-17T09:15:00Z", 0.68),
    ("sample-run-004", "Llama-2-70b", "2024-01-18T11:30:00Z", 0.71),
    ("sample-run-005", "gpt-3.5-turbo", "2024-01-19T13:45:00Z", 0.65),
)

# Sample evaluation runs served when no real run data is available.
SAMPLE_RUNS = [
    {
        "id": sample_id,
        "model_name": sample_model,
        "model_version": "v1.0",
        "dataset_version": "v1.0",
        "timestamp": sample_timestamp,
        "status": "completed",
        "composite_score": sample_score,
    }
    for sample_id, sample_model, sample_timestamp, sample_score in _SAMPLE_RUN_ROWS
]
|
|
|
|
|
|
|
# Per-model (low, high) bounds used to draw synthetic demo scores.
# Keys are lower-cased model names; "hall", "tox", "bias" and "conf" hold the
# bounds for the hallucination, toxicity, bias and confidence scores respectively.
MODEL_SCORE_RANGES = {
    "gpt-4": {
        "hall": (0.08, 0.18),
        "tox": (0.02, 0.08),
        "bias": (0.03, 0.12),
        "conf": (0.75, 0.92),
    },
    "claude-3-sonnet": {
        "hall": (0.06, 0.15),
        "tox": (0.01, 0.06),
        "bias": (0.02, 0.10),
        "conf": (0.78, 0.95),
    },
    "mistral-7b-v0.1": {
        "hall": (0.12, 0.28),
        "tox": (0.04, 0.12),
        "bias": (0.06, 0.18),
        "conf": (0.65, 0.85),
    },
    "llama-2-70b": {
        "hall": (0.10, 0.22),
        "tox": (0.03, 0.10),
        "bias": (0.05, 0.15),
        "conf": (0.70, 0.88),
    },
    "gpt-3.5-turbo": {
        "hall": (0.15, 0.32),
        "tox": (0.05, 0.14),
        "bias": (0.07, 0.20),
        "conf": (0.60, 0.82),
    },
}
|
|
|
def _get_sample_results(run_id: str) -> List[Dict[str, Any]]:
    """
    Generate reproducible sample results for demo mode.

    Twenty synthetic result rows are produced. Score bounds come from
    MODEL_SCORE_RANGES when the run id matches an entry in SAMPLE_RUNS
    (exact lower-cased model name first, then a substring match in either
    direction); otherwise a generic set of bounds is used.

    Args:
        run_id: Run identifier. A list is tolerated (its first element is
            used, or "default" when the list is empty) and any other value
            is coerced with str().

    Returns:
        List of 20 result dictionaries with id, sample_id, attack_type,
        mutation_type, hallucination, toxicity, bias, confidence and
        robustness keys.
    """
    import random
    import zlib

    if isinstance(run_id, list):
        run_id = run_id[0] if run_id else "default"
    run_id = str(run_id)

    # Seed a private generator from a stable checksum of the run id.
    # hash() of a str is salted per interpreter process, so it would yield
    # different "sample" data after every restart, and random.seed() would
    # also reset the process-wide generator for every other caller.
    rng = random.Random(zlib.crc32(run_id.encode("utf-8")))

    model_name = next(
        (run["model_name"].lower() for run in SAMPLE_RUNS if run["id"] == run_id),
        None,
    )

    ranges = None
    if model_name:
        ranges = MODEL_SCORE_RANGES.get(model_name)
        if not ranges:
            # No exact match: accept the first entry whose key contains the
            # model name or is contained in it.
            ranges = next(
                (
                    candidate
                    for key, candidate in MODEL_SCORE_RANGES.items()
                    if key in model_name or model_name in key
                ),
                None,
            )

    if not ranges:
        # Generic bounds for unknown runs/models.
        ranges = {"hall": (0.05, 0.35), "tox": (0.02, 0.15), "bias": (0.05, 0.25), "conf": (0.60, 0.90)}

    attack_types = ["injection", "jailbreak", "bias_trigger", "context_poison", "role_confusion"]
    results = []

    for i in range(20):
        results.append({
            "id": f"{run_id}-result-{i}",
            "sample_id": f"sample-{i}",
            # Every second row carries an attack type, every third a mutation.
            "attack_type": rng.choice(attack_types) if i % 2 == 0 else None,
            "mutation_type": "paraphrase" if i % 3 == 0 else None,
            "hallucination": rng.uniform(*ranges["hall"]),
            "toxicity": rng.uniform(*ranges["tox"]),
            "bias": rng.uniform(*ranges["bias"]),
            "confidence": rng.uniform(*ranges["conf"]),
            "robustness": rng.uniform(0.50, 0.85),
        })

    return results
|
|
|
|
|
class DashboardDataLoader:
    """
    Data loader for dashboard visualization.

    Responsibilities:
    - Fetch evaluation runs
    - Fetch evaluation results
    - Fetch benchmark artifacts
    - Transform data into chart-ready format
    - Expose monitoring trends, alerts, drift status and configuration

    Note: Communicates with backend via internal function calls (same container).
    No direct DB exposure to frontend.
    """

    def __init__(self, demo_mode: bool = True, tenant_id: Optional[str] = None):
        """
        Initialize data loader.

        Args:
            demo_mode: If True, return sample data without database
            tenant_id: Optional tenant ID for multi-tenant filtering
        """
        self._demo_mode = demo_mode
        self._tenant_id = tenant_id
        self._aggregator = ScoreAggregator()

    def _get_tenant_filter(self) -> Dict[str, Any]:
        """Get tenant filter for database queries (empty when no tenant is set)."""
        if self._tenant_id is None:
            return {}
        return {"tenant_id": self._tenant_id}

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _metric_values(results: List[Dict[str, Any]], key: str) -> List[float]:
        """Collect the values stored under *key*, skipping rows where it is missing or None."""
        return [row[key] for row in results if row.get(key) is not None]

    @staticmethod
    def _mean(values: List[float]) -> float:
        """Return the arithmetic mean of *values*, or 0.0 for an empty list."""
        return sum(values) / len(values) if values else 0.0

    @staticmethod
    def _plain_value(item: Any) -> Any:
        """Return ``item.value`` when the attribute exists, else ``str(item)``."""
        return item.value if hasattr(item, "value") else str(item)

    @staticmethod
    def _default_drift_status() -> Dict[str, Any]:
        """Build the "no drift" status returned in demo mode and on errors."""
        return {
            metric: {"is_drift": False, "magnitude": 0.0}
            for metric in ("hallucination", "toxicity", "bias", "confidence", "robustness")
        }

    @staticmethod
    def _default_monitoring_config() -> Dict[str, Any]:
        """Build the monitoring configuration returned in demo mode and on errors."""
        return {
            "window_size": 100,
            "sampling_rate": 1.0,
            "lightweight_hallucination": True,
            "hallucination_threshold": 0.08,
            "toxicity_threshold": 0.05,
            "bias_threshold": 0.05,
            "confidence_threshold": 0.15,
            "robustness_threshold": 0.10,
        }

    @staticmethod
    def _benchmarks_dir() -> Path:
        """Directory holding benchmark artifacts, resolved relative to this file."""
        return Path(__file__).parent.parent / "experiments" / "benchmarks"

    # ------------------------------------------------------------------
    # Runs and results
    # ------------------------------------------------------------------

    def get_all_runs(self) -> List[Dict[str, Any]]:
        """
        Get all evaluation runs.

        In demo mode the SAMPLE_RUNS fixture is returned. Otherwise every
        ``*.json`` file under ``experiments/runs`` is read; files that fail to
        load are logged and skipped, and SAMPLE_RUNS is returned when no run
        could be loaded.

        Returns:
            List of run dictionaries with id, model_name, model_version,
            dataset_version, timestamp, status and composite_score
        """
        if self._demo_mode:
            return SAMPLE_RUNS

        runs = []
        # NOTE(review): this path is relative to the current working directory,
        # whereas benchmarks are resolved relative to this file - confirm intent.
        runs_dir = Path("experiments/runs")

        if runs_dir.exists():
            # Sorted so the listing does not depend on filesystem order.
            for run_file in sorted(runs_dir.glob("*.json")):
                try:
                    # Explicit UTF-8: the platform default encoding may differ.
                    with open(run_file, "r", encoding="utf-8") as f:
                        run_data = json.load(f)
                    runs.append({
                        "id": run_data.get("run_id", run_file.stem),
                        "model_name": run_data.get("model_name", "unknown"),
                        "model_version": run_data.get("model_version", "v1.0"),
                        "dataset_version": run_data.get("dataset_version", "v1.0"),
                        "timestamp": run_data.get("timestamp", ""),
                        "status": run_data.get("status", "completed"),
                        "composite_score": run_data.get("composite_score"),
                    })
                except Exception as e:
                    # Best effort: one unreadable file must not hide the others.
                    logger.error("Error loading run %s: %s", run_file, e)

        return runs if runs else SAMPLE_RUNS

    def get_run_by_id(self, run_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific run by ID.

        In demo mode an unknown ID falls back to the first sample run (None
        only when there are no sample runs). Outside demo mode this always
        returns None.
        """
        if self._demo_mode:
            for run in SAMPLE_RUNS:
                if run["id"] == run_id:
                    return run
            return SAMPLE_RUNS[0] if SAMPLE_RUNS else None

        # NOTE(review): runs loaded by get_all_runs() are not searched here -
        # confirm whether non-demo lookup is intentionally unimplemented.
        return None

    def get_run_results(self, run_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get results for a run.

        Args:
            run_id: Run identifier
            limit: Maximum number of results to return; None means no limit

        Returns:
            List of result dictionaries (always empty outside demo mode)
        """
        if self._demo_mode:
            results = _get_sample_results(run_id)
            # Compare against None so that limit=0 yields no results instead
            # of being mistaken for "no limit".
            return results[:limit] if limit is not None else results
        return []

    # ------------------------------------------------------------------
    # Per-run views
    # ------------------------------------------------------------------

    def get_run_summary(self, run_id: str) -> Optional[RunSummary]:
        """
        Get complete summary for a run.

        Returns:
            RunSummary, or None when the run or its results are unavailable.
            composite_score is None unless all four metrics have values.
        """
        run_data = self.get_run_by_id(run_id)
        if run_data is None:
            return None

        results = self.get_run_results(run_id)
        if not results:
            return None

        hallucinations = self._metric_values(results, "hallucination")
        toxicities = self._metric_values(results, "toxicity")
        biases = self._metric_values(results, "bias")
        confidences = self._metric_values(results, "confidence")

        attack_types = {r["attack_type"] for r in results if r.get("attack_type")}

        # One summary per metric that actually has values.
        metric_summaries = [
            MetricSummary.from_values(name, values)
            for name, values in (
                ("hallucination", hallucinations),
                ("toxicity", toxicities),
                ("bias", biases),
                ("confidence", confidences),
            )
            if values
        ]

        # Means are computed unconditionally (0.0 for a missing metric) so the
        # vulnerability index below never reads an unassigned variable when
        # only some of the metrics are present.
        mean_h = self._mean(hallucinations)
        mean_t = self._mean(toxicities)
        mean_b = self._mean(biases)
        mean_c = self._mean(confidences)

        composite_score = None
        if hallucinations and toxicities and biases and confidences:
            composite_score = self._aggregator.calculate_composite(
                mean_h, mean_t, mean_b, mean_c
            )

        vulnerability_index = RunSummary.calculate_vulnerability_index(
            mean_h,
            mean_t,
            mean_b,
        )

        from datetime import datetime, timezone

        raw_timestamp = run_data.get("timestamp")
        if raw_timestamp:
            # fromisoformat() on older Pythons rejects a trailing "Z".
            timestamp = datetime.fromisoformat(raw_timestamp.replace("Z", "+00:00"))
        else:
            # Timezone-aware fallback, matching the "+00:00" form parsed above.
            timestamp = datetime.now(timezone.utc)

        metadata = RunMetadata(
            run_id=run_data["id"],
            timestamp=timestamp,
            model_name=run_data["model_name"],
            model_version=run_data["model_version"],
            dataset_version=run_data["dataset_version"],
            config_hash="demo_hash",
            status=run_data["status"],
        )

        return RunSummary(
            metadata=metadata,
            metric_summary=metric_summaries,
            composite_score=composite_score,
            total_samples=len(results),
            attack_coverage=sorted(attack_types),
            vulnerability_index=vulnerability_index,
        )

    def get_radar_data(self, run_id: str) -> Optional[RadarData]:
        """
        Get radar chart data for a run.

        Returns None when the run or its results are unavailable, or when any
        of the four metrics has no values.
        """
        run_data = self.get_run_by_id(run_id)
        if run_data is None:
            return None

        results = self.get_run_results(run_id)
        if not results:
            return None

        hallucinations = self._metric_values(results, "hallucination")
        toxicities = self._metric_values(results, "toxicity")
        biases = self._metric_values(results, "bias")
        confidences = self._metric_values(results, "confidence")

        if not all([hallucinations, toxicities, biases, confidences]):
            return None

        return RadarData.from_metrics(
            mean_hallucination=self._mean(hallucinations),
            mean_toxicity=self._mean(toxicities),
            mean_bias=self._mean(biases),
            mean_confidence=self._mean(confidences),
            model_name=run_data["model_name"],
            run_id=run_id,
        )

    def get_attack_heatmap(self, run_id: str) -> Optional[HeatmapData]:
        """Get attack vulnerability heatmap data (None when the run has no results)."""
        results = self.get_run_results(run_id)
        if not results:
            return None

        heatmap_data = HeatmapData.from_results(results)
        heatmap_data.run_id = run_id
        return heatmap_data

    def get_attack_breakdown(self, run_id: str) -> Optional[AttackBreakdownList]:
        """Get per-attack metric breakdown data (None when the run has no results)."""
        results = self.get_run_results(run_id)
        if not results:
            return None

        return AttackBreakdownList.from_results(results, run_id=run_id)

    def get_attack_types_for_run(self, run_id: str) -> List[str]:
        """
        Get list of attack types for a run.

        Results without an attack type are reported as "none".

        Returns:
            Sorted list of distinct attack type names
        """
        results = self.get_run_results(run_id)
        if not results:
            return []

        return sorted({result.get("attack_type") or "none" for result in results})

    # ------------------------------------------------------------------
    # Cross-run comparison
    # ------------------------------------------------------------------

    def get_model_comparison(self, run_ids: List[str]) -> Optional[ComparisonData]:
        """
        Get comparison data for multiple runs.

        Runs that cannot be found or have no results are skipped. A metric
        with no values contributes a mean of 0.0.

        Returns:
            ComparisonData, or None when fewer than two runs are usable
        """
        if not run_ids or len(run_ids) < 2:
            return None

        models = []
        hallucination_scores = []
        toxicity_scores = []
        bias_scores = []
        confidence_scores = []
        composite_scores = []
        sample_counts = []

        for run_id in run_ids:
            run_data = self.get_run_by_id(run_id)
            if run_data is None:
                continue

            results = self.get_run_results(run_id)
            if not results:
                continue

            mean_h = self._mean(self._metric_values(results, "hallucination"))
            mean_t = self._mean(self._metric_values(results, "toxicity"))
            mean_b = self._mean(self._metric_values(results, "bias"))
            mean_c = self._mean(self._metric_values(results, "confidence"))

            models.append(run_data["model_name"])
            hallucination_scores.append(mean_h)
            toxicity_scores.append(mean_t)
            bias_scores.append(mean_b)
            confidence_scores.append(mean_c)
            composite_scores.append(
                self._aggregator.calculate_composite(mean_h, mean_t, mean_b, mean_c)
            )
            sample_counts.append(len(results))

        if len(models) < 2:
            return None

        return ComparisonData(
            models=models,
            hallucination=hallucination_scores,
            toxicity=toxicity_scores,
            bias=bias_scores,
            confidence=confidence_scores,
            composite_score=composite_scores,
            sample_count=sample_counts,
        )

    def get_delta_robustness(self, run_ids: List[str]) -> List[DeltaRobustnessData]:
        """
        Get delta robustness comparison for multiple runs.

        Each model's delta is its composite score minus the lowest composite
        score in the comparison. Entries are ordered by composite score,
        highest first, and ranked from 1.

        Returns:
            Ranked list of DeltaRobustnessData (empty when no comparison is available)
        """
        comparison = self.get_model_comparison(run_ids)
        if comparison is None:
            return []

        baseline_score = min(comparison.composite_score)

        # sorted() is stable, so tied scores keep their original relative order.
        ranked = sorted(
            zip(comparison.models, comparison.composite_score),
            key=lambda pair: pair[1],
            reverse=True,
        )

        return [
            DeltaRobustnessData(
                model_name=model,
                delta_robustness=score - baseline_score,
                composite_score=score,
                rank=rank,
            )
            for rank, (model, score) in enumerate(ranked, start=1)
        ]

    # ------------------------------------------------------------------
    # Benchmarks
    # ------------------------------------------------------------------

    def _get_benchmark_path(self, benchmark_id: str) -> Path:
        """Get the file path for a benchmark artifact."""
        # NOTE(review): benchmark_id is interpolated into the file name as-is;
        # validate it upstream if it can come from untrusted input.
        return self._benchmarks_dir() / f"{benchmark_id}.json"

    def list_benchmarks(self) -> List[BenchmarkInfo]:
        """
        List all available benchmarks.

        Benchmark files that fail to load are logged and skipped.

        Returns:
            BenchmarkInfo entries sorted by timestamp, newest first
        """
        benchmarks = []
        base_dir = self._benchmarks_dir()

        if not base_dir.exists():
            logger.warning("Benchmarks directory does not exist: %s", base_dir)
            return benchmarks

        for json_file in base_dir.glob("*.json"):
            benchmark_id = json_file.stem
            try:
                with open(json_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                benchmarks.append(BenchmarkInfo.from_json(benchmark_id, data))
            except Exception as e:
                logger.error("Error loading benchmark %s: %s", benchmark_id, e)

        benchmarks.sort(key=lambda x: x.timestamp, reverse=True)
        return benchmarks

    def get_benchmark_comparison(self, benchmark_id: str) -> Optional[BenchmarkComparisonData]:
        """
        Get benchmark comparison data for multiple models.

        Returns:
            BenchmarkComparisonData, or None when the artifact is missing or
            cannot be loaded
        """
        benchmark_path = self._get_benchmark_path(benchmark_id)

        if not benchmark_path.exists():
            logger.warning("Benchmark not found: %s", benchmark_path)
            return None

        try:
            with open(benchmark_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            comparison = BenchmarkComparisonData.from_json(benchmark_id, data)

            logger.info(
                "DASHBOARD_VIEW_BENCHMARK benchmark_id=%s model_count=%s",
                benchmark_id,
                comparison.total_models,
            )

            return comparison
        except Exception as e:
            logger.error("Error loading benchmark %s: %s", benchmark_id, e)
            return None

    def get_benchmark_stats(self, benchmark_id: str) -> Optional[BenchmarkStats]:
        """Get statistical summary for a benchmark (None when it cannot be loaded)."""
        comparison = self.get_benchmark_comparison(benchmark_id)
        if comparison is None:
            return None

        stats = BenchmarkStats.from_comparison_data(benchmark_id, comparison)

        logger.info(
            "DASHBOARD_COMPARE_MODELS benchmark_id=%s model_count=%s",
            benchmark_id,
            stats.total_models,
        )

        return stats

    # ------------------------------------------------------------------
    # Monitoring
    # ------------------------------------------------------------------

    def get_monitoring_trends(
        self,
        model_version: Optional[str] = None,
        window_size: int = 50,
    ) -> Dict[str, Any]:
        """
        Get monitoring trend data for dashboard visualization.

        Sample trends are returned in demo mode and whenever the monitoring
        pipeline cannot be queried.

        Args:
            model_version: Optional model version to filter by (currently unused)
            window_size: Number of data points to return

        Returns:
            Dictionary with trend data for all metrics
        """
        if self._demo_mode:
            return self._get_sample_monitoring_trends(window_size)

        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            dashboard_data = pipeline.get_dashboard_data(trend_length=window_size)

            return {
                "timestamps": [ts.isoformat() for ts in dashboard_data.timestamps],
                "robustness": dashboard_data.robustness_trend,
                "hallucination": dashboard_data.hallucination_trend,
                "toxicity": dashboard_data.toxicity_trend,
                "bias": dashboard_data.bias_trend,
                "confidence": dashboard_data.confidence_trend,
                "rolling_robustness": dashboard_data.rolling_robustness,
                "rolling_hallucination": dashboard_data.rolling_hallucination,
                "rolling_toxicity": dashboard_data.rolling_toxicity,
                "rolling_confidence": dashboard_data.rolling_confidence,
            }
        except Exception as e:
            logger.error("Error getting monitoring trends: %s", e)
            return self._get_sample_monitoring_trends(window_size)

    def get_active_alerts(
        self,
        model_version: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Get active alerts for dashboard display.

        Sample alerts are returned in demo mode and whenever the monitoring
        pipeline cannot be queried.

        Args:
            model_version: Optional model version to filter by (currently unused)

        Returns:
            Dictionary with "alerts" (list of alert dictionaries) and "total"
        """
        if self._demo_mode:
            return self._get_sample_alerts()

        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            alerts = pipeline.get_active_alerts()

            alert_list = [
                {
                    "id": alert.id,
                    "alert_type": self._plain_value(alert.alert_type),
                    "severity": self._plain_value(alert.severity),
                    "model_version": alert.model_version,
                    "metric_name": alert.metric_name,
                    "baseline_value": alert.baseline_value,
                    "current_value": alert.current_value,
                    "drift_magnitude": alert.drift_magnitude,
                    "threshold": alert.threshold,
                    "timestamp": (
                        alert.timestamp.isoformat()
                        if hasattr(alert.timestamp, "isoformat")
                        else str(alert.timestamp)
                    ),
                    "is_resolved": alert.is_resolved,
                }
                for alert in alerts
            ]

            return {
                "alerts": alert_list,
                "total": len(alert_list),
            }
        except Exception as e:
            logger.error("Error getting active alerts: %s", e)
            return self._get_sample_alerts()

    def get_drift_status(
        self,
        model_version: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Get current drift detection status.

        A "no drift" status for every metric is returned in demo mode and
        whenever the monitoring pipeline cannot be queried.

        Args:
            model_version: Optional model version to filter by (currently unused)

        Returns:
            Dictionary with drift status for each metric
        """
        if self._demo_mode:
            return self._default_drift_status()

        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            dashboard_data = pipeline.get_dashboard_data()

            return {
                metric_name: {
                    "is_drift": drift_result.is_drift_detected,
                    "magnitude": drift_result.drift_magnitude,
                    "baseline": drift_result.baseline_value,
                    "current": drift_result.live_value,
                    "threshold": drift_result.threshold,
                    "severity": self._plain_value(drift_result.severity),
                }
                for metric_name, drift_result in dashboard_data.drift_status.items()
            }
        except Exception as e:
            logger.error("Error getting drift status: %s", e)
            return self._default_drift_status()

    def get_monitoring_config(self) -> Dict[str, Any]:
        """
        Get monitoring configuration.

        Built-in defaults are returned in demo mode and whenever the
        monitoring pipeline cannot be queried.

        Returns:
            Dictionary with monitoring config
        """
        if self._demo_mode:
            return self._default_monitoring_config()

        try:
            from backend.monitoring.pipeline import get_monitoring_pipeline

            pipeline = get_monitoring_pipeline()
            config = pipeline.config

            return {
                "window_size": config.window_size,
                "sampling_rate": config.sampling_rate,
                "lightweight_hallucination": config.lightweight_hallucination,
                "hallucination_threshold": config.hallucination_threshold,
                "toxicity_threshold": config.toxicity_threshold,
                "bias_threshold": config.bias_threshold,
                "confidence_threshold": config.confidence_threshold,
                "robustness_threshold": config.robustness_threshold,
            }
        except Exception as e:
            logger.error("Error getting monitoring config: %s", e)
            return self._default_monitoring_config()

    # ------------------------------------------------------------------
    # Demo fixtures
    # ------------------------------------------------------------------

    def _get_sample_monitoring_trends(self, window_size: int = 50) -> Dict[str, Any]:
        """
        Generate sample monitoring trends for demo mode.

        The metric series are reproducible (fixed seed); timestamps are one
        minute apart and end one minute before the current UTC time.

        Args:
            window_size: Number of data points per series

        Returns:
            Dictionary with timestamps, one series per metric, and rolling
            averages over the last (up to) 10 points of each series
        """
        import random
        from datetime import datetime, timedelta, timezone

        # Private generator: same sequence as seeding the module-level one
        # with 42, without resetting the process-wide random state.
        rng = random.Random(42)

        # Naive UTC "now", so the ISO strings carry no offset suffix.
        base_time = datetime.now(timezone.utc).replace(tzinfo=None)
        timestamps = [
            (base_time - timedelta(minutes=window_size - i)).isoformat()
            for i in range(window_size)
        ]

        robustness = [0.7 + rng.uniform(-0.1, 0.1) for _ in range(window_size)]
        hallucination = [0.15 + rng.uniform(-0.05, 0.05) for _ in range(window_size)]
        toxicity = [0.08 + rng.uniform(-0.03, 0.03) for _ in range(window_size)]
        bias = [0.05 + rng.uniform(-0.02, 0.02) for _ in range(window_size)]
        confidence = [0.75 + rng.uniform(-0.1, 0.1) for _ in range(window_size)]

        def rolling_average(series: List[float]) -> float:
            # Divide by the number of points actually present so windows
            # shorter than 10 are not under-reported; empty series give 0.0.
            tail = series[-10:]
            return sum(tail) / len(tail) if tail else 0.0

        return {
            "timestamps": timestamps,
            "robustness": robustness,
            "hallucination": hallucination,
            "toxicity": toxicity,
            "bias": bias,
            "confidence": confidence,
            "rolling_robustness": rolling_average(robustness),
            "rolling_hallucination": rolling_average(hallucination),
            "rolling_toxicity": rolling_average(toxicity),
            "rolling_confidence": rolling_average(confidence),
        }

    def _get_sample_alerts(self) -> Dict[str, Any]:
        """
        Generate sample alerts for demo mode.

        Returns:
            Dictionary with three unresolved sample alerts (timestamped 5, 15
            and 30 minutes before the current UTC time) and their count
        """
        from datetime import datetime, timedelta, timezone

        # Naive UTC "now", so the ISO strings carry no offset suffix.
        base_time = datetime.now(timezone.utc).replace(tzinfo=None)

        sample_alerts = [
            {
                "id": "alert-001",
                "alert_type": "hallucination_drift",
                "severity": "high",
                "model_version": "gpt-4-v1",
                "metric_name": "hallucination",
                "baseline_value": 0.15,
                "current_value": 0.28,
                "drift_magnitude": 0.13,
                "threshold": 0.08,
                "timestamp": (base_time - timedelta(minutes=5)).isoformat(),
                "is_resolved": False,
            },
            {
                "id": "alert-002",
                "alert_type": "toxicity_drift",
                "severity": "medium",
                "model_version": "gpt-4-v1",
                "metric_name": "toxicity",
                "baseline_value": 0.05,
                "current_value": 0.12,
                "drift_magnitude": 0.07,
                "threshold": 0.05,
                "timestamp": (base_time - timedelta(minutes=15)).isoformat(),
                "is_resolved": False,
            },
            {
                "id": "alert-003",
                "alert_type": "confidence_collapse",
                "severity": "low",
                "model_version": "gpt-4-v1",
                "metric_name": "confidence",
                "baseline_value": 0.80,
                "current_value": 0.68,
                "drift_magnitude": 0.12,
                "threshold": 0.15,
                "timestamp": (base_time - timedelta(minutes=30)).isoformat(),
                "is_resolved": False,
            },
        ]

        return {
            "alerts": sample_alerts,
            "total": len(sample_alerts),
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_data_loader(demo_mode: bool = True, tenant_id: Optional[str] = None) -> DashboardDataLoader:
    """
    Get a DashboardDataLoader instance.

    Args:
        demo_mode: If True, return sample data without database
        tenant_id: Optional tenant ID forwarded to the loader for
            multi-tenant filtering; defaults to None (no tenant)

    Returns:
        DashboardDataLoader instance
    """
    return DashboardDataLoader(demo_mode=demo_mode, tenant_id=tenant_id)
|
|
|