Spaces:
Paused
Paused
"""
Coding Agent Evaluation Exporter

Exports coding agent annotations in formats useful for training:
- PRM (Process Reward Model): per-step reward signals
- DPO/RLHF preference format: chosen/rejected trace pairs
- SWE-bench compatible evaluation results
- Code review format: structured review data
"""
| import json | |
| import os | |
| import logging | |
| from typing import Dict, List, Any, Optional, Tuple | |
| from .base import BaseExporter, ExportContext, ExportResult | |
| logger = logging.getLogger(__name__) | |
class CodingEvalExporter(BaseExporter):
    """Export coding agent annotations for ML training pipelines.

    Each export type is written to its own JSON Lines file inside the
    output directory:

    - ``prm``         -> ``prm_training_data.jsonl`` (per-step rewards)
    - ``preference``  -> ``preference_pairs.jsonl`` (pairwise A/B choices)
    - ``swebench``    -> ``swebench_results.jsonl`` (binary resolved flag)
    - ``code_review`` -> ``code_reviews.jsonl`` (verdict, comments, ratings)

    A file is only kept when at least one record was written to it.
    """

    format_name = "coding_eval"
    description = "Coding agent evaluation data (PRM, DPO, SWE-bench, code review)"
    file_extensions = [".jsonl", ".json"]

    # Label strings recognised as a task outcome, mapped to the binary
    # "resolved" flag. Partial outcomes map to False because the SWE-bench
    # result is binary.
    _RESOLVED_BY_LABEL = {
        "success": True,
        "resolved": True,
        "correct": True,
        "failure": False,
        "unresolved": False,
        "incorrect": False,
        "partial": False,
        "partially_resolved": False,
    }

    def export(self, context: ExportContext, output_path: str,
               options: Optional[dict] = None) -> ExportResult:
        """Write every requested export type into the ``output_path`` directory.

        Args:
            context: Export context providing ``annotations`` (and ``items``
                for the preference export).
            output_path: Directory to write into; created if missing.
            options: Optional settings. ``options["types"]`` selects which
                export types to run; when absent or ``None`` all four run.

        Returns:
            An ``ExportResult`` listing the files written and, per written
            file, a record count under ``prm_instances``,
            ``preference_pairs``, ``swebench_results`` or ``code_reviews``.
        """
        options = options or {}
        export_types = options.get("types")
        if export_types is None:
            # An explicit None would otherwise fail on the membership tests.
            export_types = ["prm", "preference", "swebench", "code_review"]

        os.makedirs(output_path, exist_ok=True)

        # (requested type, stats key, exporter) in the fixed output order.
        exporters = (
            ("prm", "prm_instances", self._export_prm),
            ("preference", "preference_pairs", self._export_preference),
            ("swebench", "swebench_results", self._export_swebench),
            ("code_review", "code_reviews", self._export_code_review),
        )

        files_written = []
        stats = {}
        for type_name, stat_key, run_export in exporters:
            if type_name not in export_types:
                continue
            path, count = run_export(context, output_path)
            if path:
                files_written.append(path)
                stats[stat_key] = count

        return ExportResult(
            success=True,
            format_name=self.format_name,
            files_written=files_written,
            warnings=[],
            stats=stats,
        )

    def can_export(self, context: ExportContext) -> Tuple[bool, str]:
        """Report whether ``context`` holds data this exporter can use.

        Returns:
            ``(True, "")`` when there are annotations and at least one schema
            whose ``annotation_type`` is ``process_reward``, ``code_review``,
            ``pairwise`` or ``radio``; otherwise ``(False, reason)``.
        """
        if not context.annotations:
            return False, "No annotations to export"

        schema_types = {s.get("annotation_type") for s in context.schemas}
        if not schema_types & {"process_reward", "code_review", "pairwise", "radio"}:
            return False, "No coding evaluation schemas found (process_reward, code_review, pairwise, radio)"
        return True, ""

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _write_jsonl(output_dir: str, filename: str, records) -> Tuple[Optional[str], int]:
        """Write ``records`` (an iterable of dicts) as JSON Lines.

        The file is opened as UTF-8 explicitly because records are serialised
        with ``ensure_ascii=False``; relying on the locale encoding can fail
        or produce non-UTF-8 output for non-ASCII text.

        Returns:
            ``(path, count)`` when at least one record was written, otherwise
            ``(None, 0)`` after removing the empty file.
        """
        output_path = os.path.join(output_dir, filename)
        count = 0
        with open(output_path, "w", encoding="utf-8") as f:
            for record in records:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
                count += 1
        if count == 0:
            os.remove(output_path)
            return None, 0
        return output_path, count

    @staticmethod
    def _iter_label_values(ann):
        """Yield the ``"label"`` entry of every dict-valued schema in ``ann``.

        A missing, ``None`` or non-dict ``labels`` field yields nothing.
        """
        labels = ann.get("labels") or {}
        if not isinstance(labels, dict):
            return
        for value in labels.values():
            if isinstance(value, dict):
                yield value.get("label", "")

    @staticmethod
    def _parse_json_object(label_val) -> Optional[dict]:
        """Return ``label_val`` decoded as a JSON object, or ``None``.

        ``None`` is returned for non-string input, invalid JSON, and JSON
        whose top-level value is not an object.
        """
        if not isinstance(label_val, str):
            return None
        try:
            parsed = json.loads(label_val)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    # ------------------------------------------------------------------
    # Record generators (one per export type)
    # ------------------------------------------------------------------

    def _prm_records(self, context: ExportContext):
        """Yield one PRM record per label holding a JSON object with a ``steps`` list."""
        for ann in context.annotations:
            instance_id = ann.get("instance_id", "")
            for label_val in self._iter_label_values(ann):
                parsed = self._parse_json_object(label_val)
                if parsed is None:
                    continue
                steps = parsed.get("steps")
                if not isinstance(steps, list):
                    continue
                if not all(isinstance(step, dict) for step in steps):
                    # Dropping only the bad steps would silently change the
                    # reward sequence, so skip the whole record instead.
                    logger.warning(
                        "Skipping PRM annotation for instance %r: "
                        "step entries must be JSON objects",
                        instance_id,
                    )
                    continue
                record = {
                    "instance_id": instance_id,
                    "annotator": ann.get("user_id", ""),
                    "steps": [
                        # Fall back to list position / zero reward when absent.
                        {"index": step.get("index", i), "reward": step.get("reward", 0)}
                        for i, step in enumerate(steps)
                    ],
                }
                if "mode" in parsed:
                    record["mode"] = parsed["mode"]
                yield record

    def _preference_records(self, context: ExportContext):
        """Yield one preference record per label that is ``A``/``B`` (any case)."""
        for ann in context.annotations:
            instance_id = ann.get("instance_id", "")
            for label_val in self._iter_label_values(ann):
                # Pairwise annotations store "A" or "B".
                if not isinstance(label_val, str) or label_val not in ("A", "B", "a", "b"):
                    continue
                # The prompt comes from the annotated item, when available.
                item_data = context.items.get(instance_id) or {}
                prompt = item_data.get("task_description", item_data.get("text", ""))
                yield {
                    "instance_id": instance_id,
                    "prompt": prompt,
                    "chosen": label_val.upper(),
                    "annotator": ann.get("user_id", ""),
                }

    def _swebench_records(self, context: ExportContext):
        """Yield one resolved/unresolved record per annotation with an outcome label."""
        for ann in context.annotations:
            resolved = None
            # When several schemas carry an outcome label, the last one wins.
            for label_val in self._iter_label_values(ann):
                if isinstance(label_val, str) and label_val in self._RESOLVED_BY_LABEL:
                    resolved = self._RESOLVED_BY_LABEL[label_val]
            if resolved is None:
                continue
            yield {
                "instance_id": ann.get("instance_id", ""),
                "resolved": resolved,
                "annotator": ann.get("user_id", ""),
            }

    def _code_review_records(self, context: ExportContext):
        """Yield one review record per label holding a JSON object with ``verdict`` or ``comments``."""
        for ann in context.annotations:
            instance_id = ann.get("instance_id", "")
            for label_val in self._iter_label_values(ann):
                parsed = self._parse_json_object(label_val)
                if parsed is None:
                    continue
                if "verdict" not in parsed and "comments" not in parsed:
                    continue
                yield {
                    "instance_id": instance_id,
                    "annotator": ann.get("user_id", ""),
                    "verdict": parsed.get("verdict", ""),
                    "comments": parsed.get("comments", []),
                    "file_ratings": parsed.get("file_ratings", {}),
                }

    # ------------------------------------------------------------------
    # Per-format entry points
    # ------------------------------------------------------------------

    def _export_prm(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export PRM training data to ``prm_training_data.jsonl``.

        Returns ``(path, count)``, or ``(None, 0)`` when nothing was written.
        """
        path, count = self._write_jsonl(
            output_dir, "prm_training_data.jsonl", self._prm_records(context)
        )
        if path:
            logger.info("Exported %d PRM records to %s", count, path)
        return path, count

    def _export_preference(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export DPO/RLHF preference pairs from pairwise annotations.

        Writes ``preference_pairs.jsonl``. Returns ``(path, count)``, or
        ``(None, 0)`` when nothing was written.
        """
        path, count = self._write_jsonl(
            output_dir, "preference_pairs.jsonl", self._preference_records(context)
        )
        if path:
            logger.info("Exported %d preference pairs to %s", count, path)
        return path, count

    def _export_swebench(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export SWE-bench compatible evaluation results.

        Writes ``swebench_results.jsonl``. Returns ``(path, count)``, or
        ``(None, 0)`` when nothing was written.
        """
        path, count = self._write_jsonl(
            output_dir, "swebench_results.jsonl", self._swebench_records(context)
        )
        if path:
            logger.info("Exported %d SWE-bench results to %s", count, path)
        return path, count

    def _export_code_review(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export structured code review data to ``code_reviews.jsonl``.

        Returns ``(path, count)``, or ``(None, 0)`` when nothing was written.
        """
        path, count = self._write_jsonl(
            output_dir, "code_reviews.jsonl", self._code_review_records(context)
        )
        if path:
            logger.info("Exported %d code reviews to %s", count, path)
        return path, count