| """ |
| Coding Agent Evaluation Exporter |
| |
| Exports coding agent annotations in formats useful for training: |
| - PRM (Process Reward Model): per-step reward signals |
| - DPO/RLHF preference format: the chosen side (A or B) of each pairwise comparison |
| - SWE-bench compatible evaluation results |
| - Code review format: structured review data |
| """ |
|
|
| import json |
| import os |
| import logging |
| from typing import Dict, List, Any, Optional, Tuple |
|
|
| from .base import BaseExporter, ExportContext, ExportResult |
|
|
# Module-level logger; the exporter logs how many records each output file received.
logger = logging.getLogger(__name__)
|
|
|
|
class CodingEvalExporter(BaseExporter):
    """Export coding agent annotations for ML training pipelines.

    ``export`` treats ``output_path`` as a directory and writes up to four
    JSONL files into it, one per requested export type:

    - ``prm_training_data.jsonl``: per-step reward records
    - ``preference_pairs.jsonl``: the chosen side (A/B) of pairwise labels
    - ``swebench_results.jsonl``: resolved / not-resolved verdicts
    - ``code_reviews.jsonl``: structured code review records

    A file is only kept when at least one record was produced for it.
    """

    format_name = "coding_eval"
    description = "Coding agent evaluation data (PRM, DPO, SWE-bench, code review)"
    file_extensions = [".jsonl", ".json"]

    # Label values accepted as a pairwise choice; they are emitted upper-cased.
    _PREFERENCE_LABELS = ("A", "B", "a", "b")

    # Outcome label -> "resolved" flag. Partial fixes count as not resolved.
    _RESOLUTION_LABELS = {
        "success": True,
        "resolved": True,
        "correct": True,
        "failure": False,
        "unresolved": False,
        "incorrect": False,
        "partial": False,
        "partially_resolved": False,
    }

    def export(self, context: ExportContext, output_path: str,
               options: Optional[dict] = None) -> ExportResult:
        """Write every requested export type into the ``output_path`` directory.

        Args:
            context: Export context; ``annotations`` (and, for preference
                pairs, ``items``) are read from it.
            output_path: Directory that receives the JSONL files. It is
                created when missing.
            options: Optional settings. ``options["types"]`` selects a subset
                of ``"prm"``, ``"preference"``, ``"swebench"`` and
                ``"code_review"``; all four are exported when it is absent.
                A comma- or whitespace-separated string is also accepted.

        Returns:
            An ``ExportResult`` with ``success=True``, the paths of the files
            that received at least one record, per-type record counts in
            ``stats``, and a warning for each unknown export type or when
            nothing was written at all.
        """
        options = options or {}

        # (export type, writer method, stats key) in output order.
        exporters = (
            ("prm", self._export_prm, "prm_instances"),
            ("preference", self._export_preference, "preference_pairs"),
            ("swebench", self._export_swebench, "swebench_results"),
            ("code_review", self._export_code_review, "code_reviews"),
        )
        known_types = [type_name for type_name, _, _ in exporters]

        requested = options.get("types")
        if requested is None:
            requested = known_types
        elif isinstance(requested, str):
            # Accept "prm" or "prm,preference" as well as a list of names.
            requested = requested.replace(",", " ").split()

        files_written: List[str] = []
        warnings: List[str] = []
        stats: Dict[str, int] = {}

        for type_name in requested:
            if type_name not in known_types:
                warnings.append(f"Unknown export type ignored: {type_name!r}")

        os.makedirs(output_path, exist_ok=True)

        for type_name, writer, stat_key in exporters:
            if type_name not in requested:
                continue
            path, count = writer(context, output_path)
            if path:
                files_written.append(path)
                stats[stat_key] = count

        if not files_written:
            warnings.append("No coding evaluation records found; no files were written")

        return ExportResult(
            success=True,
            format_name=self.format_name,
            files_written=files_written,
            warnings=warnings,
            stats=stats,
        )

    def can_export(self, context: ExportContext) -> Tuple[bool, str]:
        """Report whether ``context`` holds anything this exporter can use.

        Returns:
            ``(True, "")`` when there are annotations and at least one schema
            whose ``annotation_type`` is ``process_reward``, ``code_review``,
            ``pairwise`` or ``radio``; otherwise ``(False, reason)``.
        """
        if not context.annotations:
            return False, "No annotations to export"

        # ``schemas`` may be None/empty; treat that as "no schema types".
        schema_types = {s.get("annotation_type") for s in (context.schemas or [])}
        relevant = schema_types & {"process_reward", "code_review", "pairwise", "radio"}
        if not relevant:
            return False, "No coding evaluation schemas found (process_reward, code_review, pairwise, radio)"

        return True, ""

    @staticmethod
    def _label_values(ann: Dict[str, Any]) -> List[Any]:
        """Return the ``label`` value of every dict-shaped entry in ``ann["labels"]``.

        A missing, null or non-dict ``labels`` field yields an empty list.
        Entries without a ``label`` key contribute ``""``.
        """
        labels = ann.get("labels") or {}
        if not isinstance(labels, dict):
            return []
        return [
            value.get("label", "")
            for value in labels.values()
            if isinstance(value, dict)
        ]

    @classmethod
    def _parsed_label_dicts(cls, ann: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return every string label of ``ann`` that decodes to a JSON object."""
        decoded = []
        for raw in cls._label_values(ann):
            if not isinstance(raw, str):
                continue
            try:
                obj = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if isinstance(obj, dict):
                decoded.append(obj)
        return decoded

    @staticmethod
    def _write_jsonl(output_path: str, records: List[Dict[str, Any]]) -> int:
        """Write ``records`` as UTF-8 JSON lines and return how many were written.

        With no records nothing is written, and a file left at ``output_path``
        by an earlier export is removed so it cannot pass for current output.
        """
        if not records:
            try:
                os.remove(output_path)
            except FileNotFoundError:
                pass
            return 0

        # Explicit UTF-8: records are dumped with ensure_ascii=False, so the
        # platform default encoding must not be relied on.
        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(
                json.dumps(record, ensure_ascii=False) + "\n" for record in records
            )
        return len(records)

    def _export_prm(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export PRM training data to ``prm_training_data.jsonl``.

        One record is produced per label whose JSON payload is an object with
        a list under ``"steps"``. Each record holds ``instance_id``,
        ``annotator`` and a ``steps`` list of ``{"index", "reward"}`` entries,
        plus ``mode`` when the payload has one. Step entries that are not
        objects are skipped; a step without ``index`` gets its list position
        and a step without ``reward`` gets ``0``.

        Returns:
            ``(path, count)``, or ``(None, 0)`` when no record was produced.
        """
        output_path = os.path.join(output_dir, "prm_training_data.jsonl")
        records: List[Dict[str, Any]] = []

        for ann in context.annotations or []:
            for parsed in self._parsed_label_dicts(ann):
                steps = parsed.get("steps")
                if not isinstance(steps, list):
                    continue

                record = {
                    "instance_id": ann.get("instance_id", ""),
                    "annotator": ann.get("user_id", ""),
                    "steps": [
                        {"index": step.get("index", i), "reward": step.get("reward", 0)}
                        for i, step in enumerate(steps)
                        if isinstance(step, dict)
                    ],
                }
                if "mode" in parsed:
                    record["mode"] = parsed["mode"]
                records.append(record)

        count = self._write_jsonl(output_path, records)
        if count == 0:
            return None, 0

        logger.info("Exported %d PRM records to %s", count, output_path)
        return output_path, count

    def _export_preference(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export pairwise preference choices to ``preference_pairs.jsonl``.

        One record is produced per label equal to ``A``/``B`` (either case).
        Each record holds ``instance_id``, ``prompt`` (the item's
        ``task_description``, else its ``text``, else ``""``), ``chosen``
        (upper-cased label) and ``annotator``.

        Returns:
            ``(path, count)``, or ``(None, 0)`` when no record was produced.
        """
        output_path = os.path.join(output_dir, "preference_pairs.jsonl")
        records: List[Dict[str, Any]] = []

        for ann in context.annotations or []:
            instance_id = ann.get("instance_id", "")

            for label in self._label_values(ann):
                if not isinstance(label, str) or label not in self._PREFERENCE_LABELS:
                    continue

                # Item data is optional; fall back to an empty prompt.
                item_data = (context.items or {}).get(instance_id) or {}
                if isinstance(item_data, dict):
                    prompt = item_data.get("task_description", item_data.get("text", ""))
                else:
                    prompt = ""

                records.append({
                    "instance_id": instance_id,
                    "prompt": prompt,
                    "chosen": label.upper(),
                    "annotator": ann.get("user_id", ""),
                })

        count = self._write_jsonl(output_path, records)
        if count == 0:
            return None, 0

        logger.info("Exported %d preference pairs to %s", count, output_path)
        return output_path, count

    def _export_swebench(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export resolved / not-resolved verdicts to ``swebench_results.jsonl``.

        At most one record is produced per annotation. Labels are mapped
        through ``_RESOLUTION_LABELS``; when several labels of one annotation
        are recognised, the last one wins. Annotations without a recognised
        label are skipped.

        Returns:
            ``(path, count)``, or ``(None, 0)`` when no record was produced.
        """
        output_path = os.path.join(output_dir, "swebench_results.jsonl")
        records: List[Dict[str, Any]] = []

        for ann in context.annotations or []:
            resolved = None
            for label in self._label_values(ann):
                # The str check also keeps unhashable labels (lists, dicts)
                # away from the dict lookup.
                if isinstance(label, str) and label in self._RESOLUTION_LABELS:
                    resolved = self._RESOLUTION_LABELS[label]

            if resolved is None:
                continue

            records.append({
                "instance_id": ann.get("instance_id", ""),
                "resolved": resolved,
                "annotator": ann.get("user_id", ""),
            })

        count = self._write_jsonl(output_path, records)
        if count == 0:
            return None, 0

        logger.info("Exported %d SWE-bench results to %s", count, output_path)
        return output_path, count

    def _export_code_review(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export structured code review data to ``code_reviews.jsonl``.

        One record is produced per label whose JSON payload is an object
        containing ``verdict`` or ``comments``. Each record holds
        ``instance_id``, ``annotator``, ``verdict`` (default ``""``),
        ``comments`` (default ``[]``) and ``file_ratings`` (default ``{}``).

        Returns:
            ``(path, count)``, or ``(None, 0)`` when no record was produced.
        """
        output_path = os.path.join(output_dir, "code_reviews.jsonl")
        records: List[Dict[str, Any]] = []

        for ann in context.annotations or []:
            for parsed in self._parsed_label_dicts(ann):
                if "verdict" not in parsed and "comments" not in parsed:
                    continue

                records.append({
                    "instance_id": ann.get("instance_id", ""),
                    "annotator": ann.get("user_id", ""),
                    "verdict": parsed.get("verdict", ""),
                    "comments": parsed.get("comments", []),
                    "file_ratings": parsed.get("file_ratings", {}),
                })

        count = self._write_jsonl(output_path, records)
        if count == 0:
            return None, 0

        logger.info("Exported %d code reviews to %s", count, output_path)
        return output_path, count
|
|