ALM-2 / backend /scoring /visualization.py
ACA050's picture
Upload 520 files
2ed8996 verified
"""
Visualization Support for AegisLM Scoring System.
Provides calibration curve data, reliability graph data,
and other visualization-ready data for advanced metrics.
"""
import numpy as np
from typing import List, Dict, Any, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dataclass
class CalibrationCurvePoint:
    """A single point of a calibration curve.

    NOTE(review): nothing in the visible part of this module constructs or
    reads this class, so the field descriptions below are inferred from the
    field names only -- confirm against the code that produces these points.

    Attributes:
        confidence_bin: Confidence value identifying the bin.
        accuracy: Accuracy associated with the bin.
        sample_count: Number of samples behind the point.
        ideal_confidence: Reference value for a perfectly calibrated model.
        calibration_error: Calibration error attributed to the point.
    """

    confidence_bin: float
    accuracy: float
    sample_count: int
    ideal_confidence: float
    calibration_error: float
@dataclass
class ReliabilityDiagramData:
    """Chart-ready series for a reliability diagram / calibration curve.

    The list fields are parallel: entry ``i`` of each list describes the same
    confidence bin.

    Attributes:
        bins: Representative confidence value of each bin (x axis).
        accuracies: Observed accuracy in each bin.
        confidences: Confidence value reported for each bin.
        sample_counts: Number of samples that fell into each bin.
        ideal_line: Reference values for a perfectly calibrated model.
        ece: Expected Calibration Error summarising the whole diagram.
    """

    bins: List[float]
    accuracies: List[float]
    confidences: List[float]
    sample_counts: List[int]
    ideal_line: List[float]
    ece: float
@dataclass
class ConsistencyHeatmapData:
    """Chart-ready payload for a consistency heatmap.

    Attributes:
        matrix: Grid of cell values, one inner list per row.
        labels: Axis labels for the grid.
        title: Title to display above the heatmap.
        color_scale: Name of the colour scale the renderer should use
            (for example ``"RdYlBu"``).
    """

    matrix: List[List[float]]
    labels: List[str]
    title: str
    color_scale: str
@dataclass
class MetricsTimeSeries:
    """Parallel series describing how the scoring metrics evolve over time.

    Entry ``i`` of every list belongs to the same observation.

    Attributes:
        timestamps: Time of each observation.
        calibration_scores: Calibration score per observation.
        reliability_scores: Reliability score per observation.
        consistency_scores: Consistency score per observation.
        overall_scores: Overall score per observation.
    """

    timestamps: List[datetime]
    calibration_scores: List[float]
    reliability_scores: List[float]
    consistency_scores: List[float]
    overall_scores: List[float]
class ScoringVisualizer:
"""
Visualization support for scoring system metrics.
Generates chart-ready data for calibration curves,
reliability diagrams, and other advanced metrics visualizations.
"""
def __init__(self):
"""Initialize scoring visualizer."""
self.color_palette = [
"#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd",
"#8c564b", "#e377c2", "#7f7f7f", "#bcbd22", "#17becf"
]
def generate_calibration_curve_data(
    self,
    confidences: List[float],
    correctness: List[bool],
    n_bins: int = 10
) -> ReliabilityDiagramData:
    """
    Generate data for calibration curve visualization.

    Samples are grouped into ``n_bins`` equal-width confidence bins over
    [0, 1]. Each bin covers ``(lower, upper]``; the first bin additionally
    includes its lower edge so that a confidence of exactly 0.0 is counted
    instead of silently falling outside every bin.

    The Expected Calibration Error is the sample-weighted mean of
    ``|accuracy - mean confidence|`` per bin, i.e. it compares each bin's
    accuracy with the average confidence actually observed in that bin
    (the same values returned in ``confidences``), not with the bin centre.

    Args:
        confidences: List of confidence scores, expected in [0, 1]. Values
            outside that range fall into no bin but still count toward the
            total used to weight the ECE.
        correctness: List of correctness indicators, parallel to
            ``confidences``.
        n_bins: Number of bins for calibration curve.

    Returns:
        ReliabilityDiagramData: Calibration curve data. All series are empty
        and ``ece`` is 0.0 when the inputs are empty, have different
        lengths, or ``n_bins`` is smaller than 1. Empty bins report an
        accuracy of 0.0 and the bin centre as their confidence.
    """
    if len(confidences) != len(correctness) or not confidences or n_bins < 1:
        return ReliabilityDiagramData(
            bins=[],
            accuracies=[],
            confidences=[],
            sample_counts=[],
            ideal_line=[],
            ece=0.0
        )

    # Convert once instead of rebuilding the arrays for every bin.
    confidence_values = np.asarray(confidences, dtype=float)
    correct_values = np.asarray(correctness, dtype=float)

    bin_edges = np.linspace(0.0, 1.0, n_bins + 1)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2

    bin_accuracies = []
    bin_confidences = []
    bin_counts = []
    for i in range(n_bins):
        lower = bin_edges[i]
        upper = bin_edges[i + 1]
        in_bin = (confidence_values > lower) & (confidence_values <= upper)
        if i == 0:
            # Close the first bin on the left so confidence == 0.0 is kept.
            in_bin |= confidence_values == lower

        count = int(np.sum(in_bin))
        if count > 0:
            # Store plain Python floats so the result serialises cleanly.
            bin_accuracies.append(float(np.mean(correct_values[in_bin])))
            bin_confidences.append(float(np.mean(confidence_values[in_bin])))
        else:
            bin_accuracies.append(0.0)
            bin_confidences.append(float(bin_centers[i]))
        bin_counts.append(count)

    # ECE compares accuracy with the mean confidence of each bin; empty bins
    # carry a count of 0 and therefore contribute nothing.
    ece = self._calculate_ece_from_bins(
        bin_confidences, bin_accuracies, bin_counts, len(confidences)
    )

    centers = bin_centers.tolist()
    return ReliabilityDiagramData(
        bins=centers,
        accuracies=bin_accuracies,
        confidences=bin_confidences,
        sample_counts=bin_counts,
        # Perfect calibration: accuracy equals confidence at every bin.
        ideal_line=list(centers),
        ece=float(ece)
    )
def generate_reliability_graph_data(
    self,
    reliability_metrics: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Build the three chart payloads shown on the reliability graph.

    Args:
        reliability_metrics: Reliability analysis metrics. The entry
            ``"reliability_data"`` is read as a list of points, each
            providing ``"confidence_avg"``, ``"accuracy"`` and
            ``"sample_count"``.

    Returns:
        Dict[str, Any]: A dict with the keys ``"reliability_diagram"``,
        ``"confidence_distribution"`` and ``"issue_breakdown"``. Every
        series is empty when no reliability data is available.
    """
    points = reliability_metrics.get("reliability_data", [])

    # Nothing to plot: hand back the same structure with empty series.
    if not points:
        return {
            "reliability_diagram": {
                "bins": [],
                "accuracies": [],
                "confidences": [],
                "sample_counts": []
            },
            "confidence_distribution": {"labels": [], "values": []},
            "issue_breakdown": {"labels": [], "values": []}
        }

    def column(key):
        # One value per reliability point, in input order.
        return [point[key] for point in points]

    # Both the bin positions and the confidence series come from the
    # per-point average confidence.
    diagram = {
        "bins": column("confidence_avg"),
        "accuracies": column("accuracy"),
        "confidences": column("confidence_avg"),
        "sample_counts": column("sample_count")
    }

    distribution = self._generate_confidence_distribution(points)
    issues = self._generate_issue_breakdown(reliability_metrics)

    return {
        "reliability_diagram": diagram,
        "confidence_distribution": distribution,
        "issue_breakdown": issues
    }
def generate_consistency_heatmap_data(
    self,
    consistency_results: List[Any]
) -> ConsistencyHeatmapData:
    """
    Build a square similarity matrix over the given consistency results.

    Args:
        consistency_results: List of consistency test results.

    Returns:
        ConsistencyHeatmapData: Heatmap with one row and one column per
        result, labelled ``"Test 1"``, ``"Test 2"``, ... . The diagonal is
        fixed at 1.0; every other cell holds the pairwise similarity of the
        two results. With no results the matrix and labels are empty.
    """
    if not consistency_results:
        return ConsistencyHeatmapData(
            matrix=[],
            labels=[],
            title="Consistency Heatmap",
            color_scale="RdYlBu"
        )

    # A result compared with itself is perfectly consistent by definition,
    # so the diagonal skips the similarity computation.
    similarity_grid = [
        [
            1.0 if row_pos == col_pos
            else self._calculate_test_similarity(row_result, col_result)
            for col_pos, col_result in enumerate(consistency_results)
        ]
        for row_pos, row_result in enumerate(consistency_results)
    ]

    axis_labels = [
        f"Test {position}"
        for position in range(1, len(consistency_results) + 1)
    ]

    return ConsistencyHeatmapData(
        matrix=similarity_grid,
        labels=axis_labels,
        title="Response Consistency Heatmap",
        color_scale="RdYlBu"
    )
def generate_metrics_time_series(
    self,
    metrics_history: List[Dict[str, Any]]
) -> MetricsTimeSeries:
    """
    Generate time series data for metrics visualization.

    Args:
        metrics_history: List of historical metrics data. Each entry may
            provide ``"timestamp"`` (an ISO-8601 string or an
            already-built value, which is passed through unchanged) and the
            scores ``"calibration_score"``, ``"reliability_score"``,
            ``"consistency_score"`` and ``"overall_quality_score"``.

    Returns:
        MetricsTimeSeries: Time series data with one element per history
        entry in every list. Missing scores default to 0.5; entries without
        a timestamp are stamped with the current UTC time as a naive
        ``datetime``.

    Raises:
        ValueError: If a string timestamp is not valid ISO-8601.
    """
    # Imported locally: the module header only imports ``datetime`` itself.
    from datetime import timezone

    timestamps = []
    calibration_scores = []
    reliability_scores = []
    consistency_scores = []
    overall_scores = []

    for entry in metrics_history:
        if "timestamp" in entry:
            timestamp = entry["timestamp"]
            if isinstance(timestamp, str):
                # ``fromisoformat`` only understands a trailing "Z" from
                # Python 3.11 on; an explicit offset parses on every version.
                if timestamp.endswith("Z"):
                    timestamp = timestamp[:-1] + "+00:00"
                timestamp = datetime.fromisoformat(timestamp)
        else:
            # ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping
            # tzinfo keeps the naive-UTC value that it used to return.
            timestamp = datetime.now(timezone.utc).replace(tzinfo=None)
        timestamps.append(timestamp)

        # 0.5 is the neutral placeholder for a score that was not recorded.
        calibration_scores.append(entry.get("calibration_score", 0.5))
        reliability_scores.append(entry.get("reliability_score", 0.5))
        consistency_scores.append(entry.get("consistency_score", 0.5))
        overall_scores.append(entry.get("overall_quality_score", 0.5))

    return MetricsTimeSeries(
        timestamps=timestamps,
        calibration_scores=calibration_scores,
        reliability_scores=reliability_scores,
        consistency_scores=consistency_scores,
        overall_scores=overall_scores
    )
def generate_advanced_metrics_dashboard(
    self,
    advanced_metrics: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Arrange advanced metrics into the sections shown on the dashboard.

    Args:
        advanced_metrics: Advanced metrics data.

    Returns:
        Dict[str, Any]: Dashboard data with the sections ``"overview"``,
        ``"detailed_metrics"``, ``"breakdowns"`` and ``"recommendations"``.
        Missing numeric metrics default to 0.0, a missing grade to
        ``"N/A"``, missing breakdowns to ``{}`` and missing suggestions to
        ``[]``.
    """
    lookup = advanced_metrics.get

    # The dashboard shows the overall quality score under a shorter name.
    overview = {
        "overall_score": lookup("overall_quality_score", 0.0),
        "quality_grade": lookup("quality_grade", "N/A"),
    }
    for score_name in ("calibration_score", "reliability_score", "consistency_score"):
        overview[score_name] = lookup(score_name, 0.0)

    detailed_metrics = {
        metric_name: lookup(metric_name, 0.0)
        for metric_name in (
            "confidence_quality",
            "prediction_stability",
            "response_coherence",
        )
    }

    breakdown_sources = (
        ("calibration", "calibration_breakdown"),
        ("reliability", "reliability_breakdown"),
        ("consistency", "consistency_breakdown"),
    )
    breakdowns = {
        section: lookup(source_key, {})
        for section, source_key in breakdown_sources
    }

    return {
        "overview": overview,
        "detailed_metrics": detailed_metrics,
        "breakdowns": breakdowns,
        "recommendations": lookup("improvement_suggestions", [])
    }
def _calculate_ece_from_bins(
self,
bin_centers: List[float],
bin_accuracies: List[float],
bin_counts: List[int],
total_samples: int
) -> float:
"""
Calculate Expected Calibration Error from bin data.
Args:
bin_centers: Bin center values
bin_accuracies: Bin accuracies
bin_counts: Bin sample counts
total_samples: Total number of samples
Returns:
float: Expected Calibration Error
"""
ece = 0.0
for i in range(len(bin_centers)):
if bin_counts[i] > 0:
weight = bin_counts[i] / total_samples
error = abs(bin_accuracies[i] - bin_centers[i])
ece += weight * error
return ece
def _generate_confidence_distribution(
self,
reliability_data: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Generate confidence distribution data.
Args:
reliability_data: Reliability data points
Returns:
Dict[str, Any]: Confidence distribution data
"""
# Group by confidence ranges
confidence_ranges = {
"0.0-0.2": 0,
"0.2-0.4": 0,
"0.4-0.6": 0,
"0.6-0.8": 0,
"0.8-1.0": 0
}
for point in reliability_data:
confidence = point["confidence_avg"]
count = point["sample_count"]
if confidence <= 0.2:
confidence_ranges["0.0-0.2"] += count
elif confidence <= 0.4:
confidence_ranges["0.2-0.4"] += count
elif confidence <= 0.6:
confidence_ranges["0.4-0.6"] += count
elif confidence <= 0.8:
confidence_ranges["0.6-0.8"] += count
else:
confidence_ranges["0.8-1.0"] += count
return {
"labels": list(confidence_ranges.keys()),
"values": list(confidence_ranges.values())
}
def _generate_issue_breakdown(
self,
reliability_metrics: Dict[str, Any]
) -> Dict[str, Any]:
"""
Generate issue breakdown data.
Args:
reliability_metrics: Reliability metrics
Returns:
Dict[str, Any]: Issue breakdown data
"""
issues = reliability_metrics.get("issues", [])
# Count issue types
issue_counts = {}
for issue in issues:
issue_str = str(issue)
issue_counts[issue_str] = issue_counts.get(issue_str, 0) + 1
return {
"labels": list(issue_counts.keys()),
"values": list(issue_counts.values())
}
def _calculate_test_similarity(
self,
test1: Any,
test2: Any
) -> float:
"""
Calculate similarity between two test results.
Args:
test1: First test result
test2: Second test result
Returns:
float: Similarity score (0-1)
"""
try:
# Extract consistency scores
score1 = getattr(test1, 'consistency_score', 0.5)
score2 = getattr(test2, 'consistency_score', 0.5)
# Simple similarity based on consistency scores
similarity = 1.0 - abs(score1 - score2)
return max(0.0, min(1.0, similarity))
except Exception:
return 0.5
def export_chart_data(
    self,
    chart_type: str,
    data: Dict[str, Any],
    format: str = "json"
) -> Dict[str, Any]:
    """
    Export chart data in specified format.

    Args:
        chart_type: Type of chart
        data: Chart data
        format: Export format ('json', 'csv', 'chartjs')

    Returns:
        Dict[str, Any]: Exported data. For ``'json'`` the chart data is
        wrapped together with the chart type and an ``export_timestamp``
        (naive UTC, ISO-8601); ``'chartjs'`` and ``'csv'`` return the
        output of the matching converter.

    Raises:
        ValueError: If ``format`` is not one of the supported formats.
    """
    if format == "json":
        # Imported locally: the module header only imports ``datetime``.
        from datetime import timezone

        # ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping
        # tzinfo keeps the timestamp string free of a UTC offset, exactly as
        # it was emitted before.
        exported_at = datetime.now(timezone.utc).replace(tzinfo=None)
        return {
            "chart_type": chart_type,
            "data": data,
            "export_timestamp": exported_at.isoformat(),
            "format": "json"
        }
    if format == "chartjs":
        return self._convert_to_chartjs_format(chart_type, data)
    if format == "csv":
        return self._convert_to_csv_format(chart_type, data)
    raise ValueError(f"Unsupported export format: {format}")
def _convert_to_chartjs_format(
self,
chart_type: str,
data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Convert data to Chart.js format.
Args:
chart_type: Chart type
data: Raw data
Returns:
Dict[str, Any]: Chart.js formatted data
"""
if chart_type == "calibration_curve":
return {
"type": "line",
"data": {
"labels": data.get("bins", []),
"datasets": [
{
"label": "Actual Calibration",
"data": data.get("accuracies", []),
"borderColor": self.color_palette[0],
"backgroundColor": self.color_palette[0] + "20",
"borderWidth": 2
},
{
"label": "Ideal Calibration",
"data": data.get("ideal_line", []),
"borderColor": self.color_palette[1],
"backgroundColor": self.color_palette[1] + "20",
"borderWidth": 2,
"borderDash": [5, 5]
}
]
},
"options": {
"responsive": True,
"plugins": {
"title": {
"display": True,
"text": "Calibration Curve"
}
},
"scales": {
"x": {
"title": {
"display": True,
"text": "Confidence"
}
},
"y": {
"title": {
"display": True,
"text": "Accuracy"
},
"min": 0,
"max": 1
}
}
}
}
elif chart_type == "reliability_diagram":
return {
"type": "bar",
"data": {
"labels": data.get("bins", []),
"datasets": [
{
"label": "Accuracy",
"data": data.get("accuracies", []),
"backgroundColor": self.color_palette[0] + "80",
"borderColor": self.color_palette[0],
"borderWidth": 1
}
]
},
"options": {
"responsive": True,
"plugins": {
"title": {
"display": True,
"text": "Reliability Diagram"
}
},
"scales": {
"x": {
"title": {
"display": True,
"text": "Confidence Bins"
}
},
"y": {
"title": {
"display": True,
"text": "Accuracy"
},
"min": 0,
"max": 1
}
}
}
}
else:
return {"error": f"Unsupported chart type: {chart_type}"}
def _convert_to_csv_format(
self,
chart_type: str,
data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Convert data to CSV format.
Args:
chart_type: Chart type
data: Raw data
Returns:
Dict[str, Any]: CSV formatted data
"""
if chart_type == "calibration_curve":
headers = ["Bin", "Accuracy", "Confidence", "Ideal"]
rows = []
bins = data.get("bins", [])
accuracies = data.get("accuracies", [])
confidences = data.get("confidences", [])
ideal_line = data.get("ideal_line", [])
for i in range(len(bins)):
rows.append([
bins[i],
accuracies[i],
confidences[i],
ideal_line[i]
])
return {
"headers": headers,
"rows": rows,
"format": "csv"
}
else:
return {"error": f"CSV format not supported for chart type: {chart_type}"}
# Module-wide visualizer, created once when the module is imported.
scoring_visualizer: ScoringVisualizer = ScoringVisualizer()


def get_scoring_visualizer() -> ScoringVisualizer:
    """
    Return the shared module-level ``ScoringVisualizer``.

    Returns:
        ScoringVisualizer: The same global instance on every call.
    """
    return scoring_visualizer