| import json |
| import re |
| from pathlib import Path |
| from typing import Iterable, List, Tuple |
|
|
| from tqdm import tqdm |
|
|
|
|
# Resolve all paths relative to this script so the converter works from any CWD.
PROJECT_ROOT = Path(__file__).resolve().parent
INPUT_FILES = [
    PROJECT_ROOT / "apps" / "train.jsonl",
    PROJECT_ROOT / "apps" / "test.jsonl",
]
OUTPUT_FILE = PROJECT_ROOT / "data" / "raw" / "apps.jsonl"


# Filtering knobs applied to each candidate solution.
MAX_SOLUTIONS_PER_PROBLEM = 2  # cap on rows emitted per input problem
MIN_RESPONSE_CHARS = 20  # drop trivially short solutions
MAX_RESPONSE_TOKENS = 3000  # whitespace-word count used as a rough token proxy
# Heuristic filter: a solution must contain at least one code-like construct
# (def/class/for-in/while/return/import or brace/semicolon punctuation).
CODE_HINT_RE = re.compile(
    r"(\bdef\s+\w+\s*\(|\bclass\s+\w+|\bfor\s+\w+\s+in\b|\bwhile\b|[{;}]|\breturn\b|\bimport\b)",
    re.IGNORECASE,
)
|
|
|
| def _normalize_text(value: str) -> str: |
| return value.strip() |
|
|
|
|
| def _parse_solutions(raw_solutions) -> List[str]: |
| if raw_solutions is None: |
| return [] |
| if isinstance(raw_solutions, list): |
| return [str(x) for x in raw_solutions if x is not None] |
| if isinstance(raw_solutions, str): |
| raw_solutions = raw_solutions.strip() |
| if not raw_solutions: |
| return [] |
| try: |
| parsed = json.loads(raw_solutions) |
| if isinstance(parsed, list): |
| return [str(x) for x in parsed if x is not None] |
| if isinstance(parsed, str): |
| return [parsed] |
| return [] |
| except json.JSONDecodeError: |
| return [raw_solutions] |
| return [] |
|
|
|
|
def _is_code_like(text: str) -> bool:
    """Heuristically decide whether *text* looks like source code."""
    return CODE_HINT_RE.search(text) is not None
|
|
|
|
| def _iter_jsonl(path: Path) -> Iterable[dict]: |
| with path.open("r", encoding="utf-8", errors="ignore") as f: |
| for line in f: |
| line = line.strip() |
| if not line: |
| continue |
| try: |
| obj = json.loads(line) |
| except json.JSONDecodeError: |
| continue |
| if isinstance(obj, dict): |
| yield obj |
|
|
|
|
def convert_apps_dataset(input_files: List[Path], output_file: Path) -> Tuple[int, int, int]:
    """Convert APPS-style JSONL files into instruction/response training rows.

    Each input row must have a non-empty ``question`` and at least one
    solution that survives the length and code-likeness filters; up to
    ``MAX_SOLUTIONS_PER_PROBLEM`` solutions are written per problem.
    Missing input files are skipped silently (best-effort behavior).

    Args:
        input_files: JSONL files to read (one JSON object per line).
        output_file: destination JSONL file; parent dirs are created.

    Returns:
        ``(total_input_samples, valid_output_samples, skipped_samples)``
        where ``valid_output_samples`` counts emitted rows (solutions)
        while the other two count input problems.
    """
    output_file.parent.mkdir(parents=True, exist_ok=True)

    total_input_samples = 0
    valid_output_samples = 0
    skipped_samples = 0

    with output_file.open("w", encoding="utf-8") as out_f:
        for input_path in input_files:
            if not input_path.exists():
                continue

            for item in tqdm(_iter_jsonl(input_path), desc=f"apps:{input_path.name}", unit="rows"):
                total_input_samples += 1

                # BUGFIX: ``question`` may be present but null; str(None)
                # would yield the literal "None", which passes the emptiness
                # check below and would emit a garbage training row.
                question = _normalize_text(str(item.get("question") or ""))
                if not question:
                    skipped_samples += 1
                    continue

                all_solutions = _parse_solutions(item.get("solutions"))
                if not all_solutions:
                    skipped_samples += 1
                    continue

                usable = 0
                for raw_solution in all_solutions:
                    solution = _normalize_text(raw_solution)
                    if not solution:
                        continue
                    if len(solution) < MIN_RESPONSE_CHARS:
                        continue
                    # Whitespace word count is a cheap proxy for tokenizer
                    # length; real token counts may differ.
                    if len(solution.split()) > MAX_RESPONSE_TOKENS:
                        continue
                    if not _is_code_like(solution):
                        continue

                    row = {
                        "instruction": f"Solve the following problem:\n{question}",
                        "response": solution,
                    }
                    out_f.write(json.dumps(row, ensure_ascii=False) + "\n")
                    valid_output_samples += 1
                    usable += 1
                    if usable >= MAX_SOLUTIONS_PER_PROBLEM:
                        break

                # Problem contributed no usable solution -> count as skipped.
                if usable == 0:
                    skipped_samples += 1

    return total_input_samples, valid_output_samples, skipped_samples
|
|
|
|
if __name__ == "__main__":
    # Run the conversion, then report summary statistics to stdout.
    stats = convert_apps_dataset(INPUT_FILES, OUTPUT_FILE)
    n_input, n_valid, n_skipped = stats
    print(f"Output: {OUTPUT_FILE}")
    print(f"Total input samples: {n_input}")
    print(f"Valid output samples: {n_valid}")
    print(f"Skipped samples: {n_skipped}")
    print("APPS dataset ready for training pipeline")
|
|