File size: 9,727 Bytes
aceb1b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""
Coding Agent Evaluation Exporter

Exports coding agent annotations in formats useful for training:
- PRM (Process Reward Model): per-step reward signals
- DPO/RLHF preference format: chosen/rejected trace pairs
- SWE-bench compatible evaluation results
- Code review format: structured review data
"""

import json
import os
import logging
from typing import Dict, List, Any, Optional, Tuple

from .base import BaseExporter, ExportContext, ExportResult

logger = logging.getLogger(__name__)


class CodingEvalExporter(BaseExporter):
    """Export coding agent annotations for ML training pipelines."""

    format_name = "coding_eval"
    description = "Coding agent evaluation data (PRM, DPO, SWE-bench, code review)"
    file_extensions = [".jsonl", ".json"]

    def export(self, context: ExportContext, output_path: str,
               options: Optional[dict] = None) -> ExportResult:
        options = options or {}
        export_types = options.get("types", ["prm", "preference", "swebench", "code_review"])
        files_written = []
        warnings = []
        stats = {}

        os.makedirs(output_path, exist_ok=True)

        if "prm" in export_types:
            path, count = self._export_prm(context, output_path)
            if path:
                files_written.append(path)
                stats["prm_instances"] = count

        if "preference" in export_types:
            path, count = self._export_preference(context, output_path)
            if path:
                files_written.append(path)
                stats["preference_pairs"] = count

        if "swebench" in export_types:
            path, count = self._export_swebench(context, output_path)
            if path:
                files_written.append(path)
                stats["swebench_results"] = count

        if "code_review" in export_types:
            path, count = self._export_code_review(context, output_path)
            if path:
                files_written.append(path)
                stats["code_reviews"] = count

        return ExportResult(
            success=True,
            format_name=self.format_name,
            files_written=files_written,
            warnings=warnings,
            stats=stats,
        )

    def can_export(self, context: ExportContext) -> Tuple[bool, str]:
        if not context.annotations:
            return False, "No annotations to export"

        # Check for relevant schema types
        schema_types = {s.get("annotation_type") for s in context.schemas}
        relevant = schema_types & {"process_reward", "code_review", "pairwise", "radio"}
        if not relevant:
            return False, "No coding evaluation schemas found (process_reward, code_review, pairwise, radio)"

        return True, ""

    def _export_prm(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export PRM training data."""
        output_path = os.path.join(output_dir, "prm_training_data.jsonl")
        count = 0

        with open(output_path, "w") as f:
            for ann in context.annotations:
                instance_id = ann.get("instance_id", "")
                labels = ann.get("labels", {})

                for schema_name, value in labels.items():
                    if not isinstance(value, dict):
                        continue
                    label_val = value.get("label", "")
                    if not isinstance(label_val, str):
                        continue

                    try:
                        parsed = json.loads(label_val)
                    except (json.JSONDecodeError, TypeError):
                        continue

                    if not isinstance(parsed, dict) or "steps" not in parsed:
                        continue

                    steps = parsed["steps"]
                    if not isinstance(steps, list):
                        continue

                    record = {
                        "instance_id": instance_id,
                        "annotator": ann.get("user_id", ""),
                        "steps": [
                            {"index": s.get("index", i), "reward": s.get("reward", 0)}
                            for i, s in enumerate(steps)
                        ],
                    }
                    if "mode" in parsed:
                        record["mode"] = parsed["mode"]

                    f.write(json.dumps(record, ensure_ascii=False) + "\n")
                    count += 1

        if count == 0:
            os.remove(output_path)
            return None, 0

        logger.info(f"Exported {count} PRM records to {output_path}")
        return output_path, count

    def _export_preference(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export DPO/RLHF preference pairs from pairwise annotations."""
        output_path = os.path.join(output_dir, "preference_pairs.jsonl")
        count = 0

        with open(output_path, "w") as f:
            for ann in context.annotations:
                instance_id = ann.get("instance_id", "")
                labels = ann.get("labels", {})

                for schema_name, value in labels.items():
                    if not isinstance(value, dict):
                        continue

                    label_val = value.get("label", "")
                    # Pairwise annotations store "A" or "B"
                    if label_val not in ("A", "B", "a", "b"):
                        continue

                    # Get the instance data to extract prompt
                    item_data = context.items.get(instance_id, {})
                    prompt = item_data.get("task_description", item_data.get("text", ""))

                    record = {
                        "instance_id": instance_id,
                        "prompt": prompt,
                        "chosen": label_val.upper(),
                        "annotator": ann.get("user_id", ""),
                    }
                    f.write(json.dumps(record, ensure_ascii=False) + "\n")
                    count += 1

        if count == 0:
            os.remove(output_path)
            return None, 0

        logger.info(f"Exported {count} preference pairs to {output_path}")
        return output_path, count

    def _export_swebench(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export SWE-bench compatible evaluation results."""
        output_path = os.path.join(output_dir, "swebench_results.jsonl")
        count = 0

        with open(output_path, "w") as f:
            for ann in context.annotations:
                instance_id = ann.get("instance_id", "")
                labels = ann.get("labels", {})

                # Look for task_success or similar radio annotation
                resolved = None
                for schema_name, value in labels.items():
                    if not isinstance(value, dict):
                        continue
                    label_val = value.get("label", "")
                    if label_val in ("success", "resolved", "correct"):
                        resolved = True
                    elif label_val in ("failure", "unresolved", "incorrect"):
                        resolved = False
                    elif label_val in ("partial", "partially_resolved"):
                        resolved = False  # SWE-bench is binary

                if resolved is not None:
                    record = {
                        "instance_id": instance_id,
                        "resolved": resolved,
                        "annotator": ann.get("user_id", ""),
                    }
                    f.write(json.dumps(record, ensure_ascii=False) + "\n")
                    count += 1

        if count == 0:
            os.remove(output_path)
            return None, 0

        logger.info(f"Exported {count} SWE-bench results to {output_path}")
        return output_path, count

    def _export_code_review(self, context: ExportContext, output_dir: str) -> Tuple[Optional[str], int]:
        """Export structured code review data."""
        output_path = os.path.join(output_dir, "code_reviews.jsonl")
        count = 0

        with open(output_path, "w") as f:
            for ann in context.annotations:
                instance_id = ann.get("instance_id", "")
                labels = ann.get("labels", {})

                for schema_name, value in labels.items():
                    if not isinstance(value, dict):
                        continue
                    label_val = value.get("label", "")
                    if not isinstance(label_val, str):
                        continue

                    try:
                        parsed = json.loads(label_val)
                    except (json.JSONDecodeError, TypeError):
                        continue

                    if not isinstance(parsed, dict):
                        continue

                    # Check for code review structure
                    if "verdict" in parsed or "comments" in parsed:
                        record = {
                            "instance_id": instance_id,
                            "annotator": ann.get("user_id", ""),
                            "verdict": parsed.get("verdict", ""),
                            "comments": parsed.get("comments", []),
                            "file_ratings": parsed.get("file_ratings", {}),
                        }
                        f.write(json.dumps(record, ensure_ascii=False) + "\n")
                        count += 1

        if count == 0:
            os.remove(output_path)
            return None, 0

        logger.info(f"Exported {count} code reviews to {output_path}")
        return output_path, count