"""Convert APPS JSONL dumps into instruction/response JSONL rows.

Each input row is expected to be a JSON object with a ``question`` field and a
``solutions`` field.  For every problem, up to ``MAX_SOLUTIONS_PER_PROBLEM``
solutions that pass the length and "looks like code" filters are written to the
output file as ``{"instruction": ..., "response": ...}`` objects, one per line.
"""

import json
import re
from pathlib import Path
from typing import Iterable, List, Tuple

try:
    from tqdm import tqdm
except ImportError:  # The progress bar is cosmetic; keep the converter usable without it.

    def tqdm(iterable, **_kwargs):
        """Pass-through stand-in used when tqdm is not installed."""
        return iterable


PROJECT_ROOT = Path(__file__).resolve().parent
INPUT_FILES = [
    PROJECT_ROOT / "apps" / "train.jsonl",
    PROJECT_ROOT / "apps" / "test.jsonl",
]
OUTPUT_FILE = PROJECT_ROOT / "data" / "raw" / "apps.jsonl"

# At most this many accepted solutions are written per input problem.
MAX_SOLUTIONS_PER_PROBLEM = 2
# Solutions shorter than this many characters (after stripping) are rejected.
MIN_RESPONSE_CHARS = 20
# Upper bound on whitespace-separated words in a solution (not model tokens).
MAX_RESPONSE_TOKENS = 3000

# Loose heuristic: a solution counts as code if it contains any of these hints.
CODE_HINT_RE = re.compile(
    r"(\bdef\s+\w+\s*\(|\bclass\s+\w+|\bfor\s+\w+\s+in\b|\bwhile\b|[{;}]|\breturn\b|\bimport\b)",
    re.IGNORECASE,
)


def _normalize_text(value: str) -> str:
    """Return *value* with leading and trailing whitespace removed."""
    return value.strip()


def _parse_solutions(raw_solutions) -> List[str]:
    """Coerce the ``solutions`` field of a row into a list of strings.

    Accepted shapes:

    * ``None`` -> ``[]``
    * a list -> its non-``None`` items, each converted with ``str``
    * a string holding a JSON list -> same treatment as a list
    * a string holding a JSON string -> a one-element list
    * a string holding any other valid JSON value -> ``[]``
    * a string that is not valid JSON -> treated as a single raw solution
    * anything else -> ``[]``
    """
    if raw_solutions is None:
        return []
    if isinstance(raw_solutions, list):
        return [str(x) for x in raw_solutions if x is not None]
    if not isinstance(raw_solutions, str):
        return []

    text = raw_solutions.strip()
    if not text:
        return []
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON at all: assume the field is one plain-text solution.
        return [text]

    if isinstance(parsed, list):
        return [str(x) for x in parsed if x is not None]
    if isinstance(parsed, str):
        return [parsed]
    return []


def _is_code_like(text: str) -> bool:
    """Return True if *text* contains at least one ``CODE_HINT_RE`` match."""
    return bool(CODE_HINT_RE.search(text))


def _iter_jsonl(path: Path) -> Iterable[dict]:
    """Yield every JSON object found in the JSONL file at *path*.

    Blank lines, lines that fail to parse as JSON, and lines whose top-level
    value is not an object are skipped.  Undecodable bytes are dropped
    (``errors="ignore"``) so a few bad bytes do not abort the whole file.
    """
    with path.open("r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(obj, dict):
                yield obj


def convert_apps_dataset(input_files: List[Path], output_file: Path) -> Tuple[int, int, int]:
    """Filter the given JSONL files and write instruction/response rows.

    Args:
        input_files: JSONL files to read, in order.  Paths that do not exist
            are skipped.
        output_file: Destination JSONL file.  Its parent directory is created
            if needed and any existing file is overwritten.

    Returns:
        A ``(total_input_samples, valid_output_samples, skipped_samples)``
        tuple.  ``total_input_samples`` counts parsed input objects,
        ``valid_output_samples`` counts rows written (possibly several per
        input object), and ``skipped_samples`` counts input objects that
        produced no row.
    """
    output_file.parent.mkdir(parents=True, exist_ok=True)

    total_input_samples = 0
    valid_output_samples = 0
    skipped_samples = 0

    with output_file.open("w", encoding="utf-8") as out_f:
        for input_path in input_files:
            if not input_path.exists():
                continue

            for item in tqdm(_iter_jsonl(input_path), desc=f"apps:{input_path.name}", unit="rows"):
                total_input_samples += 1

                # A JSON null must count as "no question"; str(None) would
                # otherwise yield the literal prompt text "None".
                raw_question = item.get("question")
                question = "" if raw_question is None else _normalize_text(str(raw_question))
                if not question:
                    skipped_samples += 1
                    continue

                all_solutions = _parse_solutions(item.get("solutions"))
                if not all_solutions:
                    skipped_samples += 1
                    continue

                usable = 0
                for raw_solution in all_solutions:
                    solution = _normalize_text(raw_solution)
                    if not solution:
                        continue
                    if len(solution) < MIN_RESPONSE_CHARS:
                        continue
                    if len(solution.split()) > MAX_RESPONSE_TOKENS:
                        continue
                    if not _is_code_like(solution):
                        continue

                    row = {
                        "instruction": f"Solve the following problem:\n{question}",
                        "response": solution,
                    }
                    out_f.write(json.dumps(row, ensure_ascii=False) + "\n")
                    valid_output_samples += 1
                    usable += 1
                    if usable >= MAX_SOLUTIONS_PER_PROBLEM:
                        break

                # Every candidate solution was filtered out for this problem.
                if usable == 0:
                    skipped_samples += 1

    return total_input_samples, valid_output_samples, skipped_samples


if __name__ == "__main__":
    total_input, valid_output, skipped = convert_apps_dataset(INPUT_FILES, OUTPUT_FILE)
    print(f"Output: {OUTPUT_FILE}")
    print(f"Total input samples: {total_input}")
    print(f"Valid output samples: {valid_output}")
    print(f"Skipped samples: {skipped}")
    print("APPS dataset ready for training pipeline")