# mindi-backup / apps_converter.py
# Mindigenous
# Sync latest workspace state: data/scripts updates and archive cleanup
# 5ae3e12
import json
import re
from pathlib import Path
from typing import Iterable, List, Tuple
from tqdm import tqdm
# Directory containing this script; all input/output paths are relative to it.
PROJECT_ROOT = Path(__file__).resolve().parent
# APPS source splits, expected as JSONL files next to this script.
INPUT_FILES = [
PROJECT_ROOT / "apps" / "train.jsonl",
PROJECT_ROOT / "apps" / "test.jsonl",
]
# Converted instruction/response rows are written here (one JSON object per line).
OUTPUT_FILE = PROJECT_ROOT / "data" / "raw" / "apps.jsonl"
# At most this many solutions are emitted per problem.
MAX_SOLUTIONS_PER_PROBLEM = 2
# Solutions shorter than this many characters are dropped as too trivial.
MIN_RESPONSE_CHARS = 20
# Solutions longer than this are dropped.
# NOTE(review): "tokens" here means whitespace-split words (len(s.split())),
# not model tokenizer tokens — the two can differ substantially.
MAX_RESPONSE_TOKENS = 3000
# Cheap heuristic for "looks like source code": function/class definitions,
# loops, braces/semicolons, return, or import keywords.
CODE_HINT_RE = re.compile(
r"(\bdef\s+\w+\s*\(|\bclass\s+\w+|\bfor\s+\w+\s+in\b|\bwhile\b|[{;}]|\breturn\b|\bimport\b)",
re.IGNORECASE,
)
def _normalize_text(value: str) -> str:
return value.strip()
def _parse_solutions(raw_solutions) -> List[str]:
if raw_solutions is None:
return []
if isinstance(raw_solutions, list):
return [str(x) for x in raw_solutions if x is not None]
if isinstance(raw_solutions, str):
raw_solutions = raw_solutions.strip()
if not raw_solutions:
return []
try:
parsed = json.loads(raw_solutions)
if isinstance(parsed, list):
return [str(x) for x in parsed if x is not None]
if isinstance(parsed, str):
return [parsed]
return []
except json.JSONDecodeError:
return [raw_solutions]
return []
def _is_code_like(text: str) -> bool:
    """Heuristically decide whether *text* contains any telltale code construct."""
    return CODE_HINT_RE.search(text) is not None
def _iter_jsonl(path: Path) -> Iterable[dict]:
with path.open("r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(obj, dict):
yield obj
def convert_apps_dataset(input_files: List[Path], output_file: Path) -> Tuple[int, int, int]:
    """Convert APPS-style JSONL splits into instruction/response training rows.

    Each input row needs a non-empty ``question`` and at least one solution
    that survives the filters (length bounds, code-likeness). At most
    ``MAX_SOLUTIONS_PER_PROBLEM`` rows are written per problem. Missing input
    files are skipped without error.

    Args:
        input_files: JSONL files to read, processed in order.
        output_file: Destination JSONL path; parent dirs are created,
            existing content is overwritten.

    Returns:
        ``(total_input_samples, valid_output_samples, skipped_samples)`` —
        rows read, rows written, and input rows that produced no output.
    """
    output_file.parent.mkdir(parents=True, exist_ok=True)
    total_input_samples = 0
    valid_output_samples = 0
    skipped_samples = 0
    with output_file.open("w", encoding="utf-8") as out_f:
        for input_path in input_files:
            if not input_path.exists():
                continue
            for item in tqdm(_iter_jsonl(input_path), desc=f"apps:{input_path.name}", unit="rows"):
                total_input_samples += 1
                # Bug fix: guard an explicit JSON null question. The previous
                # str(item.get("question", "")) turned None into the truthy
                # string "None", so such rows were emitted instead of skipped.
                raw_question = item.get("question")
                question = _normalize_text(str(raw_question)) if raw_question is not None else ""
                if not question:
                    skipped_samples += 1
                    continue
                all_solutions = _parse_solutions(item.get("solutions"))
                if not all_solutions:
                    skipped_samples += 1
                    continue
                usable = 0
                for raw_solution in all_solutions:
                    solution = _normalize_text(raw_solution)
                    # Filters: non-empty, within length bounds, looks like code.
                    if not solution:
                        continue
                    if len(solution) < MIN_RESPONSE_CHARS:
                        continue
                    # "Tokens" = whitespace-split words, not tokenizer tokens.
                    if len(solution.split()) > MAX_RESPONSE_TOKENS:
                        continue
                    if not _is_code_like(solution):
                        continue
                    row = {
                        "instruction": f"Solve the following problem:\n{question}",
                        "response": solution,
                    }
                    out_f.write(json.dumps(row, ensure_ascii=False) + "\n")
                    valid_output_samples += 1
                    usable += 1
                    if usable >= MAX_SOLUTIONS_PER_PROBLEM:
                        break
                # A row whose every solution was filtered out counts as skipped.
                if usable == 0:
                    skipped_samples += 1
    return total_input_samples, valid_output_samples, skipped_samples
if __name__ == "__main__":
    # Run the conversion and report a short summary to stdout.
    total_input, valid_output, skipped = convert_apps_dataset(INPUT_FILES, OUTPUT_FILE)
    summary = [
        f"Output: {OUTPUT_FILE}",
        f"Total input samples: {total_input}",
        f"Valid output samples: {valid_output}",
        f"Skipped samples: {skipped}",
        "APPS dataset ready for training pipeline",
    ]
    for line in summary:
        print(line)