Spaces:
Paused
Paused
"""
Agent Evaluation Exporter

Exports annotations in a structured format optimized for agent evaluation,
producing per-trace aggregated scores, label distributions, and corpus-level
aggregates. The export writes into the output directory:

- ``agent_evaluation.json``: the full structured export (format below).
- ``agent_evaluation_summary.csv``: one row per trace with the majority label
  of each categorical schema and the mean of each numeric schema.

Per-trace summaries depend on the schema's ``annotation_type``: radio/select
schemas get a label distribution, likert/slider/number schemas get numeric
statistics, multiselect schemas get per-label counts, multirate schemas are
summarized per rated item (``per_item``), and text schemas pass the raw
``responses`` through.

Output format (values are illustrative):
{
    "summary": {
        "total_traces": 10,
        "total_annotators": 3,
        "annotators": ["alice", "bob", "carol"],
        "schemas_evaluated": ["efficiency", "mast_errors", "task_success", ...]
    },
    "per_trace": [
        {
            "trace_id": "trace_001",
            "annotator_count": 3,
            "annotations": {
                "task_success": {"distribution": {"success": 2, "partial": 1},
                                 "majority": "success", "agreement": 0.667},
                "efficiency": {"mean": 4.333, "std": 0.577, "min": 4.0,
                               "max": 5.0, "values": [4.0, 5.0, 4.0]},
                "mast_errors": {"counts": {"no_errors": 3}, "total_annotations": 3}
            }
        }
    ],
    "aggregate": {
        "task_success": {
            "rates": {"success_rate": 0.7, "partial_rate": 0.2, "failure_rate": 0.1},
            "total_distribution": {"success": 21, "partial": 6, "failure": 3},
            "majority_distribution": {"success": 7, "partial": 2, "failure": 1}
        },
        "efficiency": {"overall_mean": 3.8, "overall_std": 0.9, "num_traces": 10},
        "mast_errors": {"total_distribution": {"no_errors": 25, "step_repetition": 3, ...}}
    }
}
"""
import csv
import io
import json
import logging
import os
from collections import Counter, defaultdict
from typing import Dict, List, Any, Optional, Tuple

from .base import BaseExporter, ExportContext, ExportResult

logger = logging.getLogger(__name__)
class AgentEvalExporter(BaseExporter):
    """
    Exporter for agent trace evaluation annotations.

    Annotations are grouped by trace (``instance_id``), each schema's labels
    are aggregated according to the schema's ``annotation_type``, corpus-level
    aggregates are computed, and two files are written into the output
    directory: ``agent_evaluation.json`` (structured output for evaluation
    dashboards and leaderboard computation) and
    ``agent_evaluation_summary.csv`` (one row per trace).
    """

    format_name = "agent_eval"
    description = "Agent evaluation export with aggregated scores and error distributions"
    file_extensions = [".json"]

    # annotation_type values that share an aggregation strategy.
    _CATEGORICAL_TYPES = ("radio", "select")
    _NUMERIC_TYPES = ("likert", "slider", "number")

    def export(self, context: ExportContext, output_path: str,
               options: Optional[dict] = None) -> ExportResult:
        """
        Aggregate ``context.annotations`` and write the JSON and CSV outputs.

        Args:
            context: Export context. ``annotations`` (dicts read for
                ``instance_id``, ``user_id`` and ``labels``) and ``schemas``
                (dicts read for ``name`` and ``annotation_type``) are used.
            output_path: Directory to write into; created if missing.
            options: Currently unused; accepted for interface compatibility.

        Returns:
            A successful ``ExportResult`` listing the written files, any
            warnings and basic stats; or a failed one carrying the error
            message. Exceptions are logged with traceback, not propagated.
        """
        options = options or {}  # reserved; no options are read yet
        files_written: List[str] = []
        warnings: List[str] = []
        try:
            trace_annotations = self._group_by_trace(context.annotations)
            schema_map = {s["name"]: s for s in context.schemas}

            # Labels for schemas missing from context.schemas are kept as raw
            # per-trace values but never reach the aggregate section or the
            # CSV, so tell the caller instead of dropping them silently.
            unknown_schemas = {
                schema_name
                for anns in trace_annotations.values()
                for ann in anns
                for schema_name in self._labels_of(ann)
                if schema_name not in schema_map
            }
            if unknown_schemas:
                warnings.append(
                    "Labels for schemas not defined in context.schemas were exported "
                    "as raw values and excluded from aggregates: "
                    + ", ".join(str(name) for name in self._sorted_mixed(unknown_schemas))
                )

            # Stable trace order; tolerates mixed-type instance IDs.
            per_trace_results = [
                self._aggregate_trace(trace_id, trace_annotations[trace_id], schema_map)
                for trace_id in self._sorted_mixed(trace_annotations)
            ]
            aggregate = self._compute_aggregate(per_trace_results, schema_map)

            all_annotators = {
                ann.get("user_id", "unknown")
                for anns in trace_annotations.values()
                for ann in anns
            }
            summary = {
                "total_traces": len(trace_annotations),
                "total_annotators": len(all_annotators),
                "annotators": self._sorted_mixed(all_annotators),
                "schemas_evaluated": sorted(schema_map.keys()),
            }
            output = {
                "summary": summary,
                "per_trace": per_trace_results,
                "aggregate": aggregate,
            }

            os.makedirs(output_path, exist_ok=True)
            output_file = os.path.join(output_path, "agent_evaluation.json")
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(output, f, indent=2, ensure_ascii=False)
            files_written.append(output_file)

            # Per-trace CSV for quick spreadsheet analysis (always created, so
            # files_written never lists a file that does not exist).
            csv_file = os.path.join(output_path, "agent_evaluation_summary.csv")
            self._write_summary_csv(csv_file, per_trace_results, schema_map)
            files_written.append(csv_file)

            return ExportResult(
                success=True,
                format_name=self.format_name,
                files_written=files_written,
                warnings=warnings,
                stats={
                    "total_traces": len(trace_annotations),
                    "total_annotations": sum(len(a) for a in trace_annotations.values()),
                    "total_annotators": len(all_annotators),
                },
            )
        except Exception as e:
            # Top-level boundary: report failure through ExportResult, but keep
            # the traceback in the log for debugging.
            logger.exception("Agent eval export failed: %s", e)
            return ExportResult(
                success=False,
                format_name=self.format_name,
                errors=[str(e)],
            )

    def can_export(self, context: ExportContext) -> Tuple[bool, str]:
        """Return ``(ok, reason)``; exporting needs both annotations and schemas."""
        if not context.annotations:
            return False, "No annotations to export"
        if not context.schemas:
            return False, "No annotation schemas defined"
        return True, ""

    @staticmethod
    def _sorted_mixed(items) -> list:
        """Sort *items* naturally, falling back to string order if they are not mutually comparable."""
        items = list(items)
        try:
            return sorted(items)
        except TypeError:
            # e.g. int and str IDs mixed, or a None user_id
            return sorted(items, key=str)

    @staticmethod
    def _labels_of(annotation: dict) -> dict:
        """Return the annotation's ``labels`` mapping, or {} when it is missing, None or not a dict."""
        labels = annotation.get("labels")
        return labels if isinstance(labels, dict) else {}

    @staticmethod
    def _to_float(value: Any) -> Optional[float]:
        """
        Convert *value* to a finite float, or return None if that is not possible.

        Non-finite results (e.g. from the string "NaN") are rejected: they
        would poison means and be written by ``json.dump`` as non-standard
        ``NaN``/``Infinity`` tokens.
        """
        import math  # function-scope: only this helper needs it

        try:
            number = float(value)
        except (TypeError, ValueError, OverflowError):
            return None
        return number if math.isfinite(number) else None

    def _group_by_trace(self, annotations: List[dict]) -> Dict[str, List[dict]]:
        """Group annotations by ``instance_id`` (trace ID); missing IDs group under "unknown"."""
        grouped = defaultdict(list)
        for ann in annotations:
            grouped[ann.get("instance_id", "unknown")].append(ann)
        return dict(grouped)

    def _aggregate_trace(self, trace_id: str, annotations: List[dict],
                         schema_map: Dict[str, dict]) -> dict:
        """
        Aggregate all annotations of a single trace.

        Returns:
            ``trace_id``, ``annotator_count`` (distinct ``user_id`` values,
            missing ones counted as "unknown") and ``annotations`` mapping each
            labelled schema to its type-specific summary. Annotations whose
            ``labels`` are missing or not a dict contribute no labels.
        """
        schema_values: Dict[Any, list] = defaultdict(list)
        for ann in annotations:
            for schema_name, value in self._labels_of(ann).items():
                schema_values[schema_name].append(value)

        aggregated = {}
        for schema_name, values in schema_values.items():
            schema_type = schema_map.get(schema_name, {}).get("annotation_type", "")
            aggregated[schema_name] = self._aggregate_schema_values(schema_type, values)

        return {
            "trace_id": trace_id,
            "annotator_count": len({a.get("user_id", "unknown") for a in annotations}),
            "annotations": aggregated,
        }

    def _aggregate_schema_values(self, schema_type: str, values: List) -> dict:
        """Dispatch one schema's raw values to the summarizer for *schema_type*."""
        if schema_type in self._CATEGORICAL_TYPES:
            return self._aggregate_categorical(values)
        if schema_type in self._NUMERIC_TYPES:
            return self._aggregate_numeric(values)
        if schema_type == "multiselect":
            return self._aggregate_multiselect(values)
        if schema_type == "multirate":
            return self._aggregate_multirate(values)
        if schema_type == "text":
            return {"responses": values}
        # Unknown / undeclared schema type: pass the raw values through.
        return {"values": values}

    def _aggregate_categorical(self, values: List) -> dict:
        """
        Aggregate categorical (radio/select) annotations.

        Each value is either a plain label (stringified) or a ``{label: score}``
        dict, for which the label with the highest numeric score is used
        (non-numeric scores count as 0). ``None`` and empty dicts are treated
        as "no answer" and skipped.

        Returns:
            ``distribution`` (label -> count), ``majority`` (most frequent
            label, earliest-seen wins ties; "" if nothing was counted) and
            ``agreement`` (majority count / counted answers; 0 if none).
        """
        flat_values = []
        for value in values:
            if value is None:
                continue
            if isinstance(value, dict):
                if value:
                    flat_values.append(
                        max(value, key=lambda label: self._to_float(value[label]) or 0.0)
                    )
            else:
                flat_values.append(str(value))

        distribution = dict(Counter(flat_values))
        if not flat_values:
            return {"distribution": distribution, "majority": "", "agreement": 0}

        majority = max(distribution, key=distribution.get)
        return {
            "distribution": distribution,
            "majority": majority,
            "agreement": distribution[majority] / len(flat_values),
        }

    def _aggregate_numeric(self, values: List) -> dict:
        """
        Aggregate numeric (likert/slider/number) annotations.

        Numbers and numeric strings are used directly; for a dict value every
        numeric entry is used. Non-numeric and non-finite entries are ignored.

        Returns:
            ``mean``, ``std`` (sample standard deviation; 0 for one value),
            ``min``, ``max`` and the parsed ``values``; or
            ``{"mean": None, "values": <raw values>}`` if nothing was numeric.
        """
        numeric_values: List[float] = []
        for value in values:
            if isinstance(value, dict):
                candidates = list(value.values())
            elif isinstance(value, (int, float, str)):
                candidates = [value]
            else:
                continue  # None, lists, ... carry no usable score
            for candidate in candidates:
                number = self._to_float(candidate)
                if number is not None:
                    numeric_values.append(number)

        if not numeric_values:
            return {"mean": None, "values": values}

        n = len(numeric_values)
        mean = sum(numeric_values) / n
        # Sample variance (N-1). With a single value every deviation is 0,
        # so max(..., 1) only guards against dividing by zero.
        variance = sum((x - mean) ** 2 for x in numeric_values) / max(n - 1, 1)
        return {
            "mean": round(mean, 3),
            "std": round(variance ** 0.5, 3),
            "min": min(numeric_values),
            "max": max(numeric_values),
            "values": numeric_values,
        }

    def _aggregate_multiselect(self, values: List) -> dict:
        """
        Aggregate multiselect annotations.

        Accepts ``{label: selected}`` dicts (truthy means selected), lists of
        selected labels, or a single non-empty label string. A label is
        counted at most once per annotation, so each count is at most
        ``total_annotations`` (the number of values, selected or not).
        """
        counts: Counter = Counter()
        for value in values:
            if isinstance(value, dict):
                selected = [label for label, flag in value.items() if flag]
            elif isinstance(value, list):
                selected = value
            elif isinstance(value, str) and value:
                selected = [value]
            else:
                selected = []
            # dict.fromkeys de-duplicates while keeping first-seen order.
            counts.update(dict.fromkeys(selected, 1))
        return {
            "counts": dict(counts),
            "total_annotations": len(values),
        }

    def _aggregate_multirate(self, values: List) -> dict:
        """
        Aggregate multirate annotations (``{item: rating}`` dicts).

        Returns:
            ``{"per_item": {item: summary}}`` where ``values`` lists each
            rating (as float when numeric, raw otherwise) and ``mean`` is
            present only when at least one finite numeric rating exists.
        """
        ratings_by_item: Dict[Any, list] = defaultdict(list)
        numeric_by_item: Dict[Any, List[float]] = defaultdict(list)
        for value in values:
            if not isinstance(value, dict):
                continue
            for item_name, rating in value.items():
                number = self._to_float(rating)
                if number is None:
                    ratings_by_item[item_name].append(rating)
                else:
                    ratings_by_item[item_name].append(number)
                    numeric_by_item[item_name].append(number)

        per_item = {}
        for item_name, ratings in ratings_by_item.items():
            numeric = numeric_by_item.get(item_name)
            if numeric:
                per_item[item_name] = {
                    "mean": round(sum(numeric) / len(numeric), 3),
                    "values": ratings,
                }
            else:
                per_item[item_name] = {"values": ratings}
        return {"per_item": per_item}

    def _compute_aggregate(self, per_trace_results: List[dict],
                           schema_map: Dict[str, dict]) -> dict:
        """
        Compute corpus-level statistics for every declared schema whose type
        supports it (categorical, numeric, multiselect); other types are omitted.
        """
        global_aggregators = {}
        for schema_type in self._CATEGORICAL_TYPES:
            global_aggregators[schema_type] = self._aggregate_categorical_global
        for schema_type in self._NUMERIC_TYPES:
            global_aggregators[schema_type] = self._aggregate_numeric_global
        global_aggregators["multiselect"] = self._aggregate_multiselect_global

        aggregate = {}
        for schema_name, schema_config in schema_map.items():
            aggregator = global_aggregators.get(schema_config.get("annotation_type", ""))
            if aggregator is not None:
                aggregate[schema_name] = aggregator(per_trace_results, schema_name)
        return aggregate

    def _aggregate_categorical_global(self, results: List[dict], schema_name: str) -> dict:
        """
        Compute global label rates for a categorical schema.

        ``rates`` and ``total_distribution`` pool every individual answer
        across traces; ``majority_distribution`` counts per-trace majorities.
        """
        all_majorities = []
        total_dist: Counter = Counter()
        for result in results:
            ann = result.get("annotations", {}).get(schema_name, {})
            if "majority" in ann:
                all_majorities.append(ann["majority"])
            total_dist.update(ann.get("distribution", {}))

        total = sum(total_dist.values())
        rates = {
            f"{label}_rate": round(count / total, 3) if total > 0 else 0
            for label, count in total_dist.items()
        }
        return {
            "rates": rates,
            "total_distribution": dict(total_dist),
            "majority_distribution": dict(Counter(all_majorities)),
        }

    def _aggregate_numeric_global(self, results: List[dict], schema_name: str) -> dict:
        """
        Compute global stats for a numeric schema as a mean of per-trace means
        (each trace weighted equally), with the sample std across traces.
        """
        all_means = []
        for result in results:
            ann = result.get("annotations", {}).get(schema_name, {})
            if ann.get("mean") is not None:
                all_means.append(ann["mean"])

        if not all_means:
            # Same keys as the non-empty case so consumers need no special-casing.
            return {"overall_mean": None, "overall_std": None, "num_traces": 0}

        n = len(all_means)
        overall_mean = sum(all_means) / n
        variance = sum((x - overall_mean) ** 2 for x in all_means) / max(n - 1, 1)
        return {
            "overall_mean": round(overall_mean, 3),
            "overall_std": round(variance ** 0.5, 3),
            "num_traces": n,
        }

    def _aggregate_multiselect_global(self, results: List[dict], schema_name: str) -> dict:
        """Sum per-trace multiselect label counts across all traces."""
        total_counts: Counter = Counter()
        for result in results:
            ann = result.get("annotations", {}).get(schema_name, {})
            total_counts.update(ann.get("counts", {}))
        return {"total_distribution": dict(total_counts)}

    def _write_summary_csv(self, csv_path: str, per_trace_results: List[dict],
                           schema_map: Dict[str, dict]) -> None:
        """
        Write a summary CSV with one row per trace.

        Columns are ``trace_id``, ``annotator_count``, then
        ``<schema>_majority`` for categorical schemas and ``<schema>_mean``
        for numeric schemas, in first-seen order. Missing or None values are
        written as empty cells. With no traces, only the header is written.
        """
        # header -> (schema_name, field in the per-trace summary); tracked
        # explicitly rather than re-parsed from the header text.
        derived_columns: Dict[str, Tuple[Any, str]] = {}
        for result in per_trace_results:
            for schema_name in result.get("annotations", {}):
                schema_type = schema_map.get(schema_name, {}).get("annotation_type", "")
                if schema_type in self._CATEGORICAL_TYPES:
                    derived_columns.setdefault(f"{schema_name}_majority", (schema_name, "majority"))
                elif schema_type in self._NUMERIC_TYPES:
                    derived_columns.setdefault(f"{schema_name}_mean", (schema_name, "mean"))

        # csv module handles quoting/escaping of arbitrary labels.
        with open(csv_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["trace_id", "annotator_count", *derived_columns])
            for result in per_trace_results:
                annotations = result.get("annotations", {})
                row = [result["trace_id"], str(result["annotator_count"])]
                for schema_name, field in derived_columns.values():
                    cell = annotations.get(schema_name, {}).get(field)
                    row.append("" if cell is None else str(cell))
                writer.writerow(row)