| import argparse |
| import json |
| import os |
| import re |
| import sqlite3 |
| from collections import Counter |
| from pathlib import Path |
| from typing import Dict, Iterable, List, Optional |
|
|
| from datasets import load_dataset, load_from_disk |
| from tqdm import tqdm |
|
|
| from dataset_cleaner import build_balanced_dataset, clean_record |
| from dataset_formatter import build_instruction_sample |
| from utils import ensure_dirs, setup_logger |
|
|
|
|
# Working-directory-relative layout used by the build pipeline.
RAW_DIR = Path("data") / "raw"  # one JSONL file per fetched source
FINAL_DIR = Path("data") / "final"  # rebalanced training output
FINAL_TRAIN = FINAL_DIR.joinpath("train.jsonl")
LOG_DIR = Path("logs")
|
|
|
|
def _safe_get(item: Dict[str, object], keys: Iterable[str]) -> str:
    """Return ``str()`` of the first truthy value found under ``keys`` in ``item``.

    Keys are tried in order; missing keys and falsy values (``None``, ``""``, ``0``,
    empty containers) are skipped. Returns ``""`` when nothing qualifies.
    """
    for candidate in map(item.get, keys):
        if candidate:
            return str(candidate)
    return ""
|
|
|
|
def _write_jsonl(path: Path, rows: Iterable[Dict[str, str]]) -> int:
    """Write ``rows`` to ``path`` as UTF-8 JSON Lines and return how many were written.

    Parent directories are created and any existing file is overwritten. A row is
    written only when both its ``instruction`` and ``response`` values are truthy;
    other rows are silently dropped. Non-ASCII text is written verbatim.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    written = 0
    with path.open("w", encoding="utf-8") as handle:
        for row in rows:
            if row.get("instruction") and row.get("response"):
                handle.write(json.dumps(row, ensure_ascii=False) + "\n")
                written += 1
    return written
|
|
|
|
def _iter_jsonl(path: Path) -> Iterable[Dict[str, object]]:
    """Yield each JSON object from the JSON Lines file at ``path``.

    Blank lines and lines that are not valid JSON are skipped. Lines whose JSON value
    is not an object (e.g. a bare list, string or number) are skipped as well. Callers
    use ``.get`` and item assignment on the yielded records, which would raise on
    non-dict values.
    """
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(obj, dict):
                yield obj
|
|
|
|
def _source_to_category(source_name: str) -> str:
    """Classify a source name as ``instruction``, ``problem`` or ``structured``.

    Matching is case-insensitive substring search. Instruction-dataset markers are
    checked before problem/contest markers, and any name matching neither is
    ``structured``.
    """
    lowered = source_name.lower()
    rules = (
        ("instruction", ("codealpaca", "evol", "ultrachat", "openhermes", "orca")),
        (
            "problem",
            (
                "leetcode",
                "contest",
                "problem",
                "mbpp",
                "humaneval",
                "apps",
                "codeforces",
                "codesearchnet_problem",
            ),
        ),
    )
    for category, markers in rules:
        if any(marker in lowered for marker in markers):
            return category
    return "structured"
|
|
|
|
def _decode_text(value) -> str:
    """Coerce ``value`` to text.

    ``str`` is returned unchanged, ``None`` becomes ``""``, ``bytes`` are decoded as
    UTF-8 with undecodable bytes dropped, and anything else goes through ``str()``.
    """
    if isinstance(value, str):
        return value
    if value is None:
        return ""
    return value.decode("utf-8", errors="ignore") if isinstance(value, bytes) else str(value)
|
|
|
|
def _extract_solution_from_code_contests(item: Dict[str, object]) -> str:
    """Return a single solution string from a CodeContests-style record.

    * ``item["solutions"]`` as a dict with a ``solution`` list: the first entry that
      looks like Python (contains ``def``, ``import`` or ``print(``) wins, otherwise the
      first entry is used.
    * ``item["solutions"]`` as a non-empty list: its first entry.
    * Otherwise: the first truthy ``solution`` / ``answer`` / ``code`` field, or ``""``.
    """
    python_hint = re.compile(r"\bdef\b|\bimport\b|\bprint\(")
    sols = item.get("solutions")
    if isinstance(sols, list):
        if sols:
            return _decode_text(sols[0])
    elif isinstance(sols, dict):
        entries = sols.get("solution")
        if isinstance(entries, list):
            decoded = (_decode_text(entry) for entry in entries)
            pythonic = next((text for text in decoded if python_hint.search(text)), None)
            if pythonic is not None:
                return pythonic
            if entries:
                return _decode_text(entries[0])
    return _safe_get(item, ["solution", "answer", "code"])
|
|
|
|
def _extract_many_code_contests_solutions(item: Dict[str, object], max_per_problem: int = 6) -> List[str]:
    """Return up to ``max_per_problem`` distinct solutions from a CodeContests record.

    Entries are read from ``item["solutions"]["solution"]`` when that is a list. They
    are stripped, and empty strings and duplicates are dropped, keeping first-seen
    order. If nothing usable is found, the single solution chosen by
    ``_extract_solution_from_code_contests`` is used instead (when non-empty).
    """
    found: List[str] = []
    sols = item.get("solutions")
    entries = sols.get("solution") if isinstance(sols, dict) else None
    if isinstance(entries, list):
        for entry in entries:
            text = _decode_text(entry).strip()
            if text and text not in found:
                found.append(text)
                if len(found) >= max_per_problem:
                    break
    if not found:
        fallback = _extract_solution_from_code_contests(item).strip()
        if fallback:
            found.append(fallback)
    return found
|
|
|
|
def _extract_many_apps_solutions(item: Dict[str, object], max_per_problem: int = 5) -> List[str]:
    """Collect up to ``max_per_problem`` distinct, non-empty solutions from an APPS record.

    The fields ``solutions``, ``solution``, ``answer`` and ``code`` are visited in that
    order. Each field may hold:

    * a list of solutions;
    * a dict whose list-valued entries hold solutions (other entries are ignored);
    * a string containing a JSON-encoded list of strings. The Hugging Face
      ``codeparrot/apps`` dataset stores ``solutions`` this way. Such a string is
      expanded into its elements instead of being kept as one JSON blob, and ``"[]"``
      yields nothing;
    * any other single value, used as one solution.

    Solutions are stripped; empty and duplicate entries are skipped and first-seen
    order is kept.
    """
    out: List[str] = []

    def add(raw) -> bool:
        # Append one candidate; return True once the cap has been reached.
        text = (raw if isinstance(raw, str) else _decode_text(raw)).strip()
        if text and text not in out:
            out.append(text)
        return len(out) >= max_per_problem

    for key in ("solutions", "solution", "answer", "code"):
        val = item.get(key)
        if val is None:
            continue
        if isinstance(val, bytes):
            val = _decode_text(val)
        if isinstance(val, str) and val.lstrip().startswith("["):
            # Only treat the string as an encoded list if it parses as a list of strings;
            # otherwise it is ordinary code that happens to start with "[".
            try:
                parsed = json.loads(val)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, list) and all(isinstance(p, str) for p in parsed):
                val = parsed

        if isinstance(val, list):
            candidates = val
        elif isinstance(val, dict):
            candidates = [y for x in val.values() if isinstance(x, list) for y in x]
        else:
            candidates = [val]

        for cand in candidates:
            if add(cand):
                return out
    return out
|
|
|
|
def _collect_code_candidates(value, out: List[str], max_per_problem: int) -> None:
    """Recursively append distinct, non-empty code strings found in ``value`` to ``out``.

    ``out`` is mutated in place and never grows beyond ``max_per_problem`` entries.

    * ``str`` / ``bytes``: stripped and appended if non-empty and not already present.
    * ``list``: each element is visited in order.
    * ``dict``: if any code-bearing key (``solution``, ``solutions``, ``code``,
      ``answer``, ``python``, ``cpp``, ``java``, ``javascript``) holds a truthy value,
      only those keys are visited. This keeps sibling metadata such as ``language``,
      ``verdict`` or an explanation from being collected as "code". Otherwise every
      value is visited.
    * anything else (``None``, numbers, ...) is ignored.
    """
    if value is None or len(out) >= max_per_problem:
        return
    if isinstance(value, (str, bytes)):
        text = (value if isinstance(value, str) else _decode_text(value)).strip()
        if text and text not in out:
            out.append(text)
        return
    if isinstance(value, list):
        children = value
    elif isinstance(value, dict):
        code_keys = ("solution", "solutions", "code", "answer", "python", "cpp", "java", "javascript")
        preferred = [value[k] for k in code_keys if value.get(k)]
        # Fall back to arbitrary values only when no code-bearing key is populated.
        children = preferred or list(value.values())
    else:
        return
    for child in children:
        _collect_code_candidates(child, out, max_per_problem)
        if len(out) >= max_per_problem:
            return
|
|
|
|
def _extract_many_generic_solutions(
    item: Dict[str, object],
    *,
    max_per_problem: int = 6,
) -> List[str]:
    """Gather up to ``max_per_problem`` distinct code strings from common solution fields.

    Top-level fields are searched in priority order (``solutions``, ``solution``,
    ``code``, ``answer``, then per-language ``python`` / ``cpp`` / ``java`` /
    ``javascript``). Each field is walked recursively by ``_collect_code_candidates``.
    """
    collected: List[str] = []
    fields = ("solutions", "solution", "code", "answer", "python", "cpp", "java", "javascript")
    for field in fields:
        if len(collected) >= max_per_problem:
            break
        _collect_code_candidates(item.get(field), collected, max_per_problem)
    return collected
|
|
|
|
def _compute_targets(target_size: int, min_problem_samples: int) -> Dict[str, int]:
    """Split ``target_size`` into instruction / structured / problem sample targets.

    The nominal split is 60% / 30% / 10% (fractions truncated). The problem target is
    then raised to at least ``min_problem_samples``. The rest is shared between
    instruction and structured in a 60:30 ratio, and structured absorbs the rounding
    remainder.

    Raises:
        RuntimeError: if the problem target exceeds ``target_size`` (for non-negative
            sizes, when ``min_problem_samples > target_size``).
    """
    nominal_problem = target_size - int(target_size * 0.60) - int(target_size * 0.30)
    problem_target = max(nominal_problem, min_problem_samples)
    remainder = target_size - problem_target
    if remainder < 0:
        raise RuntimeError(
            f"Invalid target sizing: min_problem_samples={min_problem_samples} exceeds "
            f"target_size={target_size}."
        )
    instruction_target = int(remainder * (60.0 / 90.0))
    return {
        "instruction": instruction_target,
        "structured": remainder - instruction_target,
        "problem": problem_target,
    }
|
|
|
|
def _scan_raw_into_shards(
    raw_paths: List[Path],
    shard_paths: Dict[str, Path],
    dedupe_db: Path,
    *,
    min_tokens: int,
    max_tokens: int,
) -> Counter:
    """Clean, exact-dedupe and split raw JSONL records into per-category shard files.

    Records missing ``_source`` / ``_category`` get defaults derived from the raw file
    name. Each record is passed through ``clean_record`` and deduplicated on the
    SHA-256 of ``instruction||response`` using a SQLite table at ``dedupe_db``. The
    survivor is appended to the shard for its category. Returns the number of rows
    written per category. The SQLite connection is closed even if scanning fails.
    """
    import hashlib

    shard_counts: Counter = Counter()
    conn = sqlite3.connect(str(dedupe_db))
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("CREATE TABLE IF NOT EXISTS seen_hashes (h TEXT PRIMARY KEY)")

        def is_dup(instruction: str, response: str) -> bool:
            # Record the pair's hash; a primary-key collision means it was seen before.
            h = hashlib.sha256(f"{instruction}||{response}".encode("utf-8")).hexdigest()
            try:
                conn.execute("INSERT INTO seen_hashes(h) VALUES (?)", (h,))
            except sqlite3.IntegrityError:
                return True
            return False

        with (
            shard_paths["instruction"].open("w", encoding="utf-8") as f_inst,
            shard_paths["structured"].open("w", encoding="utf-8") as f_struct,
            shard_paths["problem"].open("w", encoding="utf-8") as f_prob,
        ):
            writers = {
                "instruction": f_inst,
                "structured": f_struct,
                "problem": f_prob,
            }
            for raw_path in raw_paths:
                if not raw_path.exists():
                    continue
                src_default = raw_path.stem
                for rec in tqdm(_iter_jsonl(raw_path), desc=f"rebalance_scan:{raw_path.name}", unit="rows"):
                    if "_source" not in rec:
                        rec["_source"] = src_default
                    if "_category" not in rec:
                        rec["_category"] = _source_to_category(src_default)
                    cleaned = clean_record(rec, min_tokens=min_tokens, max_tokens=max_tokens)
                    if cleaned is None:
                        continue
                    if is_dup(cleaned["instruction"], cleaned["response"]):
                        continue
                    cat = cleaned["_category"]
                    if cat not in writers:
                        # Unknown category label: re-derive it from the source name.
                        cat = _source_to_category(cleaned.get("_source", ""))
                    line_obj = {
                        "instruction": cleaned["instruction"],
                        "response": cleaned["response"],
                        "_source": cleaned["_source"],
                        "_category": cat,
                    }
                    writers[cat].write(json.dumps(line_obj, ensure_ascii=False) + "\n")
                    shard_counts[cat] += 1
        conn.commit()
    finally:
        conn.close()
    return shard_counts


def rebalance_final_dataset(
    *,
    raw_paths: List[Path],
    output_path: Path,
    target_size: int,
    min_tokens: int,
    max_tokens: int,
    min_problem_samples: int,
    logger,
) -> Dict[str, object]:
    """Rewrite ``output_path`` from the raw JSONL files using fixed per-category quotas.

    Steps:
      1. Clean and exact-dedupe every raw record into per-category shard files under
         ``<output dir>/_rebalance_tmp`` (stale shards and dedupe DB files are removed
         first).
      2. Compute category targets with ``_compute_targets``.
      3. For each category, copy rows from its shard in file order until the target is
         met. Rows whose source contains ``codesearchnet_problem_fallback`` count as
         synthetic problems and are capped at 30% of the problem target. A short
         non-problem category is topped up by re-reading its shard from the start
         (upsampling); the problem category is never upsampled.

    Only ``instruction`` and ``response`` are written to ``output_path``.

    Returns:
        Stats dict with total samples, average whitespace-token length, per-source and
        per-category counts, instruction vs. raw-converted percentages, the targets,
        and real/synthetic problem counts and percentage.

    Raises:
        RuntimeError: from ``_compute_targets`` when the problem minimum exceeds
            ``target_size``.
    """
    tmp_dir = output_path.parent / "_rebalance_tmp"
    ensure_dirs([tmp_dir])

    shard_paths = {
        "instruction": tmp_dir / "instruction.jsonl",
        "structured": tmp_dir / "structured.jsonl",
        "problem": tmp_dir / "problem.jsonl",
    }
    for p in shard_paths.values():
        if p.exists():
            p.unlink()

    # Start from an empty dedupe DB, including SQLite's WAL/SHM side files.
    dedupe_db = tmp_dir / "rebalance_seen.sqlite"
    for stale in (
        dedupe_db,
        dedupe_db.with_name(dedupe_db.name + "-wal"),
        dedupe_db.with_name(dedupe_db.name + "-shm"),
    ):
        if stale.exists():
            stale.unlink()

    shard_counts = _scan_raw_into_shards(
        raw_paths,
        shard_paths,
        dedupe_db,
        min_tokens=min_tokens,
        max_tokens=max_tokens,
    )

    targets = _compute_targets(target_size=target_size, min_problem_samples=min_problem_samples)
    logger.info("Rebalance targets: %s (available=%s)", targets, dict(shard_counts))

    source_breakdown = Counter()
    category_breakdown = Counter()
    total_tokens = 0
    total_samples = 0
    problem_real_count = 0
    problem_synthetic_count = 0
    max_synth_problem = int(targets["problem"] * 0.30)

    def copy_rows(cat: str, needed: int, out_f) -> int:
        """Make one pass over the ``cat`` shard, writing up to ``needed`` rows; return the count."""
        nonlocal total_samples, total_tokens, problem_real_count, problem_synthetic_count
        shard = shard_paths[cat]
        if needed <= 0 or not shard.exists():
            return 0
        written = 0
        with shard.open("r", encoding="utf-8") as f:
            for line in f:
                if written >= needed:
                    break
                obj = json.loads(line)
                src = obj.get("_source", "unknown")
                is_problem_synth = cat == "problem" and "codesearchnet_problem_fallback" in src
                if is_problem_synth and problem_synthetic_count >= max_synth_problem:
                    continue
                out_f.write(
                    json.dumps(
                        {"instruction": obj["instruction"], "response": obj["response"]},
                        ensure_ascii=False,
                    )
                    + "\n"
                )
                written += 1
                total_samples += 1
                category_breakdown[cat] += 1
                source_breakdown[src] += 1
                if cat == "problem":
                    if is_problem_synth:
                        problem_synthetic_count += 1
                    else:
                        problem_real_count += 1
                # Rough length estimate: whitespace-separated tokens.
                total_tokens += len((obj["instruction"] + " " + obj["response"]).split())
        return written

    def upsample_shard(cat: str, needed: int, out_f) -> int:
        """Re-read the ``cat`` shard repeatedly until ``needed`` extra rows are written."""
        written = 0
        while written < needed:
            got = copy_rows(cat, needed - written, out_f)
            if got == 0:  # shard is empty or every row is filtered out
                break
            written += got
        return written

    with output_path.open("w", encoding="utf-8") as out_f:
        for cat in ("instruction", "structured", "problem"):
            want = targets[cat]
            got = copy_rows(cat, want, out_f)
            if got >= want:
                continue
            if cat == "problem":
                logger.warning(
                    "Category %s shortfall: need=%d got=%d (no upsampling allowed for problem).",
                    cat,
                    want,
                    got,
                )
            else:
                upsampled = upsample_shard(cat, want - got, out_f)
                logger.warning(
                    "Category %s shortfall: need=%d got=%d upsampled=%d",
                    cat,
                    want,
                    got,
                    upsampled,
                )

    inst = category_breakdown["instruction"]
    struct = category_breakdown["structured"]
    problem = category_breakdown["problem"]
    instruction_vs_raw = {
        "instruction_pct": round(100.0 * inst / max(total_samples, 1), 2),
        "raw_converted_pct": round(100.0 * (struct + problem) / max(total_samples, 1), 2),
    }
    avg_len = round(total_tokens / max(total_samples, 1), 2)

    return {
        "total_samples": total_samples,
        "avg_length_tokens": avg_len,
        "source_breakdown": dict(source_breakdown),
        "category_breakdown": dict(category_breakdown),
        "instruction_vs_raw_ratio": instruction_vs_raw,
        "targets": targets,
        "problem_real_count": problem_real_count,
        "problem_synthetic_count": problem_synthetic_count,
        "problem_synthetic_pct": round(
            100.0 * problem_synthetic_count / max(problem_real_count + problem_synthetic_count, 1), 2
        ),
    }
|
|
|
|
def _try_load_dataset(candidates: List[Dict[str, object]], logger):
    """Return the first dataset that ``load_dataset(**candidate)`` loads successfully.

    Candidates are tried in order and every failure is logged as a warning. If all of
    them fail, the exception from the last attempt is re-raised. An empty
    ``candidates`` list raises ``RuntimeError``.
    """
    if not candidates:
        raise RuntimeError("No dataset candidates provided.")
    failure: Optional[Exception] = None
    for kwargs in candidates:
        try:
            dataset = load_dataset(**kwargs)
            logger.info("Loaded dataset: %s", kwargs)
        except Exception as exc:
            logger.warning("Dataset load failed for %s: %s", kwargs, exc)
            failure = exc
            continue
        return dataset
    raise failure
|
|
|
|
def fetch_instruction_codealpaca(raw_path: Path, limit: int, logger) -> int:
    """Fetch CodeAlpaca instruction/response pairs into ``raw_path`` as JSONL.

    Tries ``sahil2801/CodeAlpaca-20k`` first, then ``HuggingFaceH4/CodeAlpaca_20K``.
    A non-empty ``input`` field is appended to the instruction under an ``Input:``
    header. Rows without an instruction or an output are skipped *before* counting
    toward ``limit``, so unusable rows do not use up the budget. This matches the
    other instruction fetchers.

    Returns the number of rows written.
    """
    ds = _try_load_dataset(
        [
            {"path": "sahil2801/CodeAlpaca-20k", "split": "train"},
            {"path": "HuggingFaceH4/CodeAlpaca_20K", "split": "train"},
        ],
        logger,
    )

    def rows():
        emitted = 0
        for item in tqdm(ds, desc="codealpaca", unit="rows"):
            if emitted >= limit:
                break
            instruction = _safe_get(item, ["instruction"])
            inp = _safe_get(item, ["input"])
            output = _safe_get(item, ["output", "response", "answer"])
            if inp:
                instruction = f"{instruction}\n\nInput:\n{inp}".strip()
            if not instruction or not output:
                continue
            emitted += 1
            yield build_instruction_sample(
                instruction=instruction,
                response=output,
                source="codealpaca",
                category="instruction",
            )

    return _write_jsonl(raw_path, rows())
|
|
|
|
def fetch_instruction_evol(raw_path: Path, limit: int, logger) -> int:
    """Fetch Evol-Instruct-style code instruction pairs into ``raw_path`` as JSONL.

    Candidate datasets are tried in order: Evol-Instruct-Code-80k, then
    WizardCoder-Evol-Instruct-V2, then Magicoder-OSS-Instruct. The instruction is read
    from ``instruction`` / ``prompt`` / ``question`` / ``problem``, and the answer from
    ``output`` / ``response`` / ``answer`` / ``solution``. The ``problem`` /
    ``solution`` names are meant to cover the Magicoder layout (per its dataset card —
    verify). A non-empty ``input`` field is appended under an ``Input:`` header. Rows
    without an instruction or answer are skipped before counting toward ``limit``.

    Returns the number of rows written.
    """
    ds = _try_load_dataset(
        [
            {"path": "nickrosh/Evol-Instruct-Code-80k-v1", "split": "train"},
            {"path": "WizardLMTeam/WizardCoder-Evol-Instruct-V2-196k", "split": "train"},
            {"path": "ise-uiuc/Magicoder-OSS-Instruct-75K", "split": "train"},
        ],
        logger,
    )

    def rows():
        emitted = 0
        for item in tqdm(ds, desc="evol_instruct_code", unit="rows"):
            if emitted >= limit:
                break
            instruction = _safe_get(item, ["instruction", "prompt", "question", "problem"])
            inp = _safe_get(item, ["input"])
            output = _safe_get(item, ["output", "response", "answer", "solution"])
            if inp:
                instruction = f"{instruction}\n\nInput:\n{inp}".strip()
            if not instruction or not output:
                continue
            emitted += 1
            yield build_instruction_sample(
                instruction=instruction,
                response=output,
                source="evol_instruct_code",
                category="instruction",
            )

    return _write_jsonl(raw_path, rows())
|
|
|
|
def fetch_instruction_ultrachat_code(raw_path: Path, limit: int, logger) -> int:
    """Fetch code-related UltraChat turns into ``raw_path`` as JSONL.

    For each conversation, the first user/human message and the first assistant/gpt
    reply after it form one sample. A sample is kept only if that pair mentions a
    code-related term at the start of a word. Word-start matching avoids false
    positives from plain substring search, such as "api" inside "capital" or
    "therapist", or "code" inside "encode". At most ``limit`` samples are produced.

    Returns the number of rows written.
    """
    ds = _try_load_dataset(
        [
            {"path": "HuggingFaceH4/ultrachat_200k", "split": "train_sft"},
            {"path": "stingning/ultrachat", "split": "train"},
        ],
        logger,
    )
    code_terms = ("python", "javascript", "typescript", "java", "code", "api", "backend", "frontend")
    code_re = re.compile(r"\b(?:" + "|".join(map(re.escape, code_terms)) + r")")

    def rows():
        emitted = 0
        for item in tqdm(ds, desc="ultrachat_code", unit="rows"):
            if emitted >= limit:
                break
            msgs = item.get("messages") or item.get("conversation") or item.get("conversations")
            if not isinstance(msgs, list) or len(msgs) < 2:
                continue
            user = ""
            assistant = ""
            for msg in msgs:
                if not isinstance(msg, dict):
                    continue
                role = str(msg.get("role", "")).lower()
                content = str(msg.get("content", "")).strip()
                if role in {"user", "human"} and not user:
                    user = content
                if role in {"assistant", "gpt"} and user and not assistant:
                    assistant = content
                    break
            if not user or not assistant:
                continue
            low = (user + " " + assistant).lower()
            if not code_re.search(low):
                continue
            emitted += 1
            yield build_instruction_sample(
                instruction=user,
                response=assistant,
                source="ultrachat_code",
                category="instruction",
            )

    return _write_jsonl(raw_path, rows())
|
|
|
|
def fetch_instruction_openhermes_code(raw_path: Path, limit: int, logger) -> int:
    """Fetch code-related OpenHermes / OpenOrca pairs into ``raw_path`` as JSONL.

    The flat fields ``instruction`` / ``question`` / ``prompt`` and ``output`` /
    ``response`` / ``answer`` are used when present. Any part still missing is filled
    from ``conversations``: the first human/user message and the first gpt/assistant
    reply after it. A pair is kept only if it mentions a code-related term at the start
    of a word. Word-start matching avoids hits such as "api" inside "capital" or "code"
    inside "decode". At most ``limit`` samples are produced.

    Returns the number of rows written.
    """
    ds = _try_load_dataset(
        [
            {"path": "teknium/OpenHermes-2.5", "split": "train"},
            {"path": "Open-Orca/OpenOrca", "split": "train"},
        ],
        logger,
    )
    code_terms = ("python", "javascript", "typescript", "java", "code", "function", "api", "fastapi")
    code_re = re.compile(r"\b(?:" + "|".join(map(re.escape, code_terms)) + r")")

    def rows():
        emitted = 0
        for item in tqdm(ds, desc="openhermes_code", unit="rows"):
            if emitted >= limit:
                break
            instruction = _safe_get(item, ["instruction", "question", "prompt"])
            response = _safe_get(item, ["output", "response", "answer"])
            conversations = item.get("conversations")
            if (not instruction or not response) and isinstance(conversations, list):
                user = ""
                assistant = ""
                for msg in conversations:
                    if not isinstance(msg, dict):
                        continue
                    from_role = str(msg.get("from", "")).lower()
                    value = str(msg.get("value", "")).strip()
                    if from_role in {"human", "user"} and not user:
                        user = value
                    if from_role in {"gpt", "assistant"} and user and not assistant:
                        assistant = value
                        break
                instruction = instruction or user
                response = response or assistant
            if not instruction or not response:
                continue
            low = (instruction + " " + response).lower()
            if not code_re.search(low):
                continue
            emitted += 1
            yield build_instruction_sample(
                instruction=instruction,
                response=response,
                source="openhermes_code",
                category="instruction",
            )

    return _write_jsonl(raw_path, rows())
|
|
|
|
def fetch_structured_codesearchnet(raw_path: Path, limit: int, logger) -> int:
    """Fetch CodeSearchNet functions (python, javascript, java) into ``raw_path`` as JSONL.

    Each language is loaded from the ``load_from_disk`` cache at
    ``./data/cache/raw/code_search_net_<lang>`` when present. Otherwise it is streamed
    from the Hub, and a language that cannot be loaded is skipped with a warning.

    ``limit`` is shared across languages. Each language's quota is the remaining budget
    divided, rounding up, by the number of languages not yet visited. Neither the
    integer-division remainder nor the share of a skipped or exhausted language is lost.
    Functions without code are skipped.

    Returns the number of rows written.
    """
    languages = ["python", "javascript", "java"]

    def load_language(lang: str):
        """Return the CodeSearchNet train split for ``lang`` (cache first, then streaming), or None."""
        cache_by_lang = Path(f"./data/cache/raw/code_search_net_{lang}")
        if cache_by_lang.exists():
            try:
                ds = load_from_disk(str(cache_by_lang))["train"]
                logger.info("Loaded cached CodeSearchNet language=%s from %s", lang, cache_by_lang)
                return ds
            except Exception as exc:
                logger.warning("Failed cached CodeSearchNet for %s: %s", lang, exc)
        try:
            ds = load_dataset("code_search_net", lang, split="train", streaming=True)
            logger.info("Loaded streamed CodeSearchNet language=%s", lang)
            return ds
        except Exception as exc:
            logger.warning("Skipping CodeSearchNet language=%s: %s", lang, exc)
            return None

    def rows():
        emitted = 0
        for position, lang in enumerate(languages):
            if emitted >= limit:
                break
            ds = load_language(lang)
            if ds is None:
                continue
            langs_left = len(languages) - position
            quota = -(-(limit - emitted) // langs_left)  # ceiling division
            lang_count = 0
            for item in tqdm(ds, desc=f"codesearchnet_{lang}", unit="rows"):
                if emitted >= limit or lang_count >= quota:
                    break
                code = _safe_get(item, ["whole_func_string", "code"])
                path = _safe_get(item, ["path", "func_name"])
                doc = _safe_get(item, ["docstring", "func_documentation_string"])
                if not code:
                    continue
                emitted += 1
                lang_count += 1
                yield build_instruction_sample(
                    code=code,
                    instruction=doc,
                    language=lang,
                    path=path,
                    source=f"codesearchnet_{lang}",
                    category="structured",
                )

    return _write_jsonl(raw_path, rows())
|
|
|
|
def fetch_structured_github_functions(raw_path: Path, limit: int, logger) -> int:
    """Write up to ``limit`` Python functions (CodeSearchNet python) to ``raw_path`` as JSONL.

    The ``load_from_disk`` cache at ``./data/cache/raw/code_search_net_python`` is used
    when it exists and loads. If it is missing or fails to load, the dataset is
    streamed from the Hub instead, matching the other CodeSearchNet loaders in this
    module; a streaming failure still propagates to the caller. Each sample carries the
    function code, its docstring as the instruction, the path, and a
    ``repo/path`` title when both are known.

    Returns the number of rows written.
    """
    ds = None
    cache_path = Path("./data/cache/raw/code_search_net_python")
    if cache_path.exists():
        try:
            ds = load_from_disk(str(cache_path))["train"]
            logger.info("Using cached GitHub function corpus from %s", cache_path.resolve())
        except Exception as exc:
            logger.warning("Failed cached GitHub function corpus at %s: %s", cache_path, exc)
            ds = None
    if ds is None:
        ds = load_dataset("code_search_net", "python", split="train", streaming=True)
        logger.info("Using streamed CodeSearchNet python as GitHub-curated function source.")

    def rows():
        emitted = 0
        for item in tqdm(ds, desc="github_curated_functions", unit="rows"):
            if emitted >= limit:
                break
            code = _safe_get(item, ["whole_func_string", "code", "content"])
            path = _safe_get(item, ["path", "func_name"])
            repo = _safe_get(item, ["repo", "repository_name"])
            doc = _safe_get(item, ["docstring", "func_documentation_string"])
            if not code:
                continue
            title = f"{repo}/{path}" if repo and path else path
            emitted += 1
            yield build_instruction_sample(
                code=code,
                instruction=doc,
                language="python",
                path=path,
                title=title,
                source="github_curated_functions",
                category="structured",
            )

    return _write_jsonl(raw_path, rows())
|
|
|
|
def fetch_problem_leetcode(raw_path: Path, limit: int, logger) -> int:
    """Write up to ``limit`` problem/solution samples to ``raw_path`` as JSONL.

    Sources are consumed in this order until ``limit`` samples have been emitted:

    1. Local JSONL files in ``RAW_DIR`` matching ``codeforces*.jsonl`` or
       ``problem_solution*.jsonl`` (one sample per record).
    2. The Hugging Face datasets in ``candidates``. Each problem may yield several
       samples, one per extracted solution. If ``codeparrot/apps`` fails to load,
       local ``apps*.jsonl`` / ``apps*.json`` files in ``RAW_DIR`` are read instead.
    3. If the budget is still not met, a synthetic fallback built from CodeSearchNet
       Python docstring/code pairs, capped at 30% of ``limit`` and tagged
       ``codesearchnet_problem_fallback``.

    Returns the number of rows written by ``_write_jsonl``.
    """
    def rows():
        emitted = 0  # samples yielded so far, across all phases
        synth_emitted = 0  # samples yielded by the synthetic fallback (phase 3)
        # (display name, load_dataset kwargs) pairs, tried in this order.
        candidates = [
            ("greengerong/leetcode", {"path": "greengerong/leetcode", "split": "train"}),
            ("deepmind/code_contests", {"path": "deepmind/code_contests", "split": "train"}),
            ("codeparrot/apps", {"path": "codeparrot/apps", "split": "train"}),
            ("google-research-datasets/mbpp", {"path": "google-research-datasets/mbpp", "split": "train"}),
            ("openai_humaneval", {"path": "openai_humaneval", "split": "test"}),
            ("open-r1/codeforces", {"path": "open-r1/codeforces", "split": "train", "streaming": True}),
        ]

        # Phase 1: local problem/solution JSONL files.
        # NOTE(review): build_dataset writes fetch_problem_codeforces' output to
        # RAW_DIR / "codeforces_problem.jsonl" before this fetcher runs, and that name
        # matches the glob below. Its instructions already start with
        # "Solve the following problem:", so those rows may be re-ingested here with the
        # prefix applied twice. Confirm this is intended.
        local_problem_files = sorted(RAW_DIR.glob("codeforces*.jsonl")) + sorted(
            RAW_DIR.glob("problem_solution*.jsonl")
        )
        if not local_problem_files:
            logger.warning(
                "Codeforces dataset missing – recommended for production quality."
            )
        for local_file in local_problem_files:
            if emitted >= limit:
                break
            for item in tqdm(_iter_jsonl(local_file), desc=f"problem_local:{local_file.name}", unit="rows"):
                if emitted >= limit:
                    break
                problem = _safe_get(item, ["problem", "instruction", "statement", "question"])
                solution = _safe_get(item, ["solution", "response", "answer", "code"])
                if not problem or not solution:
                    continue
                emitted += 1
                yield build_instruction_sample(
                    instruction=f"Solve the following problem:\n\n{problem}",
                    response=solution,
                    source="codeforces_local",
                    category="problem",
                )

        # Phase 2: Hugging Face problem datasets.
        for source_name, cand in candidates:
            if emitted >= limit:
                break
            try:
                ds = load_dataset(**cand)
                logger.info("Loaded problem dataset: %s", cand)
            except Exception as exc:
                logger.warning("Problem dataset load failed for %s: %s", cand, exc)
                if source_name == "codeparrot/apps":
                    # Local APPS fallback, one sample per record.
                    # NOTE(review): _iter_jsonl parses one JSON value per line. An apps*.json
                    # file holding a single multi-line JSON document is therefore not read
                    # as records. Confirm local APPS dumps are JSON Lines.
                    apps_local = sorted(RAW_DIR.glob("apps*.jsonl")) + sorted(RAW_DIR.glob("apps*.json"))
                    if not apps_local:
                        logger.warning(
                            "APPS dataset unavailable via HF and local APPS JSON missing in ./data/raw."
                        )
                    for local_file in apps_local:
                        if emitted >= limit:
                            break
                        for item in tqdm(
                            _iter_jsonl(local_file),
                            desc=f"problem_apps_local:{local_file.name}",
                            unit="rows",
                        ):
                            if emitted >= limit:
                                break
                            problem = _safe_get(item, ["question", "prompt", "problem", "statement"])
                            solution = _safe_get(item, ["solution", "answer", "code"])
                            if not problem or not solution:
                                continue
                            emitted += 1
                            yield build_instruction_sample(
                                instruction=f"Solve the following problem:\n\n{problem}",
                                response=solution,
                                source="problem_apps_local",
                                category="problem",
                            )
                # Move on to the next candidate. After a failed load, `ds` is unset or
                # still holds a previous candidate's dataset, so it must not be iterated.
                continue
            for item in tqdm(ds, desc=f"problem_{source_name}", unit="rows"):
                if emitted >= limit:
                    break
                title = _safe_get(item, ["title", "name", "problem_id", "task_id"])
                base_instruction = ""
                solutions: List[str] = []
                # Dispatch on the candidate's display name to choose which fields hold
                # the problem text and its solution(s).
                if source_name.endswith("mbpp"):
                    problem = _safe_get(item, ["text"])
                    tests = item.get("test_list") or []
                    test_blob = "\n".join(tests) if isinstance(tests, list) else _decode_text(tests)
                    if test_blob:
                        problem = f"{problem}\n\nTests:\n{test_blob}"
                    sol = _safe_get(item, ["code"])
                    solutions = [sol] if sol else []
                    base_instruction = f"Solve this coding problem: {title}\n\n{problem}"
                elif source_name.endswith("humaneval"):
                    problem = _safe_get(item, ["prompt"])
                    tests = _safe_get(item, ["test"])
                    if tests:
                        problem = f"{problem}\n\nTests:\n{tests}"
                    sol = _safe_get(item, ["canonical_solution"])
                    solutions = [sol] if sol else []
                    base_instruction = f"Solve this coding problem: {title}\n\n{problem}"
                elif source_name.endswith("code_contests"):
                    problem = _safe_get(item, ["description", "problem", "question", "prompt"])
                    solutions = _extract_many_code_contests_solutions(item, max_per_problem=6)
                    base_instruction = f"Solve this coding problem: {title}\n\n{problem}"
                elif source_name.endswith("apps"):
                    problem = _safe_get(item, ["question", "problem", "prompt", "statement"])
                    solutions = _extract_many_apps_solutions(item, max_per_problem=5)
                    base_instruction = f"Solve this coding problem: {title}\n\n{problem}"
                elif source_name.endswith("open-r1/codeforces"):
                    problem = _safe_get(
                        item,
                        ["problem", "statement", "question", "prompt", "description", "content"],
                    )
                    solutions = _extract_many_generic_solutions(item, max_per_problem=6)
                    base_instruction = f"Solve this coding problem: {title}\n\n{problem}"
                else:
                    # Remaining candidate (greengerong/leetcode): one solution per language
                    # column, plus any entries of a `solutions` list.
                    problem = _safe_get(item, ["content", "description", "question", "prompt", "statement"])
                    langs = [
                        _safe_get(item, ["python"]),
                        _safe_get(item, ["javascript"]),
                        _safe_get(item, ["java"]),
                        _safe_get(item, ["c++"]),
                        _safe_get(item, ["answer"]),
                        _safe_get(item, ["code"]),
                    ]
                    solutions = [s for s in langs if s]
                    if isinstance(item.get("solutions"), list):
                        for extra in item["solutions"]:
                            t = _decode_text(extra).strip()
                            if t and t not in solutions:
                                solutions.append(t)
                    base_instruction = f"Solve this coding problem: {title}\n\n{problem}"
                if not problem or not solutions:
                    continue
                # One sample per solution; every sample shares the problem's instruction.
                for sol in solutions:
                    if emitted >= limit:
                        break
                    # Drop empty or very short (< 20 chars after stripping) solutions.
                    if not sol or len(sol.strip()) < 20:
                        continue
                    emitted += 1
                    yield build_instruction_sample(
                        instruction=base_instruction,
                        response=sol,
                        source=f"problem_{source_name.replace('/', '_')}",
                        category="problem",
                    )

        # Phase 3: synthetic fallback, used only if real sources left the budget unmet.
        if emitted < limit:
            synth_cap = int(limit * 0.30)
            cache_path = Path("./data/cache/raw/code_search_net_python")
            ds = None
            if cache_path.exists():
                try:
                    ds = load_from_disk(str(cache_path))["train"]
                    logger.info("Using cached CodeSearchNet Python for problem fallback.")
                except Exception:
                    ds = None
            if ds is None:
                try:
                    ds = load_dataset("code_search_net", "python", split="train", streaming=True)
                    logger.info("Using streamed CodeSearchNet Python for problem fallback.")
                except Exception as exc:
                    logger.warning("Problem fallback CodeSearchNet failed: %s", exc)
                    ds = None
            if ds is not None:
                for item in tqdm(ds, desc="problem_codesearchnet_fallback", unit="rows"):
                    if emitted >= limit or synth_emitted >= synth_cap:
                        break
                    doc = _safe_get(item, ["docstring", "func_documentation_string"])
                    code = _safe_get(item, ["whole_func_string", "code"])
                    # Require a docstring of at least 30 chars to serve as the task text.
                    if len(doc.strip()) < 30 or not code:
                        continue
                    emitted += 1
                    synth_emitted += 1
                    yield build_instruction_sample(
                        instruction=f"Solve the following programming task:\n\n{doc}",
                        response=code,
                        source="codesearchnet_problem_fallback",
                        category="problem",
                    )

    return _write_jsonl(raw_path, rows())
|
|
|
|
def fetch_problem_codeforces(raw_path: Path, limit: int, logger) -> int:
    """Convert the pre-ingested ``RAW_DIR/codeforces.jsonl`` into problem samples.

    Returns 0 without writing anything when the source file is missing. Otherwise up to
    ``limit`` records that have both a problem text and a solution are written to
    ``raw_path``. Each problem text is prefixed with "Solve the following problem:"
    unless it already starts with that phrase (case-insensitive).

    Returns the number of rows written.
    """
    source_file = RAW_DIR / "codeforces.jsonl"
    if not source_file.exists():
        logger.warning("Codeforces dataset file not found: %s", source_file.resolve())
        return 0

    already_prefixed = "solve the following problem"

    def samples():
        produced = 0
        for record in tqdm(_iter_jsonl(source_file), desc="problem_codeforces", unit="rows"):
            if produced >= limit:
                break
            prompt = _safe_get(record, ["instruction", "problem", "statement", "question"])
            answer = _safe_get(record, ["response", "solution", "answer", "code"])
            if not (prompt and answer):
                continue
            if not prompt.lower().startswith(already_prefixed):
                prompt = f"Solve the following problem:\n{prompt}"
            produced += 1
            yield build_instruction_sample(
                instruction=prompt,
                response=answer,
                source="codeforces_dataset",
                category="problem",
            )

    written = _write_jsonl(raw_path, samples())
    logger.info("Loaded Codeforces pre-ingested samples: %d", written)
    return written
|
|
|
|
def _print_build_summary(
    rebalance_stats: Dict[str, object],
    real_problem: int,
    synthetic_problem: int,
    synthetic_ratio: float,
) -> None:
    """Print the human-readable report for a finished ``build_dataset`` run."""
    print(f"Final dataset: {FINAL_TRAIN.resolve()}")
    print(f"Total samples: {rebalance_stats['total_samples']}")
    print(f"Avg length (tokens est.): {rebalance_stats['avg_length_tokens']}")

    print("Per-source breakdown:")
    by_count = sorted(rebalance_stats["source_breakdown"].items(), key=lambda kv: kv[1], reverse=True)
    for src, count in by_count:
        print(f" - {src}: {count}")

    print("Category breakdown:")
    targets = rebalance_stats["targets"]
    by_name = sorted(rebalance_stats["category_breakdown"].items(), key=lambda kv: kv[0])
    for cat, count in by_name:
        print(f" - {cat}: {count} (target: {targets.get(cat, 0)})")

    ratio = rebalance_stats["instruction_vs_raw_ratio"]
    print(
        f"Instruction vs raw-converted ratio: {ratio['instruction_pct']}% / {ratio['raw_converted_pct']}%"
    )

    denominator = max(1, rebalance_stats["total_samples"])
    print("Category percentages:")
    for cat in ("instruction", "structured", "problem"):
        share = 100.0 * rebalance_stats["category_breakdown"].get(cat, 0) / denominator
        print(f" - {cat}: {share:.2f}%")

    print(f"Real problem count: {real_problem}")
    print(f"Synthetic problem count: {synthetic_problem}")
    print(f"Synthetic problem %: {synthetic_ratio * 100:.2f}%")


def build_dataset(args) -> Path:
    """Run the full pipeline: fetch raw sources, build, rebalance, validate, report.

    Unless ``args.skip_fetch`` is set, each source in the fetch plan is written to
    ``RAW_DIR/<name>.jsonl``. A fetch error is logged and that source is skipped. With
    ``skip_fetch``, every existing ``*.jsonl`` in ``RAW_DIR`` is used instead.
    ``build_balanced_dataset`` then writes ``FINAL_TRAIN``, and
    ``rebalance_final_dataset`` rewrites the same file from the raw inputs.

    Returns:
        Path to the final training file.

    Raises:
        RuntimeError: if the rebalanced output has too few problem samples, too few
            real (non-synthetic) problem samples, or too large a synthetic share.
    """
    ensure_dirs([RAW_DIR, FINAL_DIR, LOG_DIR])
    logger = setup_logger("data_fetch_build", LOG_DIR / "data_fetch.log")

    logger.info("Starting production dataset build. target_size=%d", args.target_size)
    logger.info("Raw dir: %s", RAW_DIR.resolve())
    logger.info("Final dir: %s", FINAL_DIR.resolve())

    # (raw file stem, fetcher, row limit), fetched in this order.
    fetch_plan = [
        ("codealpaca", fetch_instruction_codealpaca, args.codealpaca_limit),
        ("evol_instruct_code", fetch_instruction_evol, args.evol_limit),
        ("ultrachat_code", fetch_instruction_ultrachat_code, args.ultrachat_limit),
        ("openhermes_code", fetch_instruction_openhermes_code, min(args.openhermes_limit, 120_000)),
        ("codesearchnet_multilang", fetch_structured_codesearchnet, args.codesearchnet_limit),
        ("github_curated_functions", fetch_structured_github_functions, args.github_limit),
        ("codeforces_problem", fetch_problem_codeforces, args.codeforces_limit),
        ("leetcode_competitive", fetch_problem_leetcode, args.leetcode_limit),
    ]

    raw_paths: List[Path]
    if args.skip_fetch:
        raw_paths = sorted(RAW_DIR.glob("*.jsonl"))
        logger.info("Skip fetch enabled. Using existing raw files: %d", len(raw_paths))
    else:
        raw_paths = []
        for name, fetcher, limit in fetch_plan:
            raw_path = RAW_DIR / f"{name}.jsonl"
            # Registered even if the fetch below fails.
            raw_paths.append(raw_path)
            try:
                count = fetcher(raw_path, limit, logger)
                logger.info("Fetched %d rows for source=%s", count, name)
            except Exception as exc:
                logger.warning("Skipping source=%s due to fetch error: %s", name, exc)

    stats = build_balanced_dataset(
        input_paths=raw_paths,
        output_path=FINAL_TRAIN,
        target_size=args.target_size,
        min_tokens=args.min_tokens,
        max_tokens=args.max_tokens,
        num_workers=args.workers,
        category_weights={"instruction": 0.60, "structured": 0.30, "problem": 0.10},
        sqlite_path=FINAL_DIR / "dedupe_hashes.sqlite",
    )

    # Rewrites FINAL_TRAIN with fixed per-category quotas from the raw inputs.
    rebalance_stats = rebalance_final_dataset(
        raw_paths=raw_paths,
        output_path=FINAL_TRAIN,
        target_size=args.target_size,
        min_tokens=args.min_tokens,
        max_tokens=args.max_tokens,
        min_problem_samples=args.min_problem_samples,
        logger=logger,
    )

    actual_problem = int(rebalance_stats["category_breakdown"].get("problem", 0))
    required_problem = int(args.min_problem_samples)
    real_problem = int(rebalance_stats.get("problem_real_count", 0))
    synthetic_problem = int(rebalance_stats.get("problem_synthetic_count", 0))
    synthetic_ratio = synthetic_problem / max(actual_problem, 1)

    minimum_total = max(required_problem, args.min_total_problem_samples)
    if actual_problem < minimum_total:
        raise RuntimeError(
            "Build aborted: insufficient problem-solving data after rebalance. "
            f"Required >= {minimum_total}, actual = {actual_problem}. "
            "Increase problem dataset sources (e.g., leetcode/code contests/problem-solution datasets) "
            "or raise problem fetch limits, then rebuild."
        )
    if real_problem < args.min_real_problem_samples:
        raise RuntimeError(
            "Build aborted: insufficient REAL problem-solving data after rebalance. "
            f"Required real >= {args.min_real_problem_samples}, actual real = {real_problem}. "
            "Add more high-quality real problem datasets (APPS/CodeContests/Codeforces/LeetCode)."
        )
    if synthetic_ratio > args.max_synthetic_problem_ratio:
        raise RuntimeError(
            "Build aborted: synthetic problem share too high. "
            f"Allowed <= {args.max_synthetic_problem_ratio:.0%}, actual = {synthetic_ratio:.2%}. "
            "Increase real problem sources and reduce synthetic fallback usage."
        )

    logger.info("Build complete. Final dataset: %s", FINAL_TRAIN.resolve())
    logger.info("Base stats: %s", stats)
    logger.info("Rebalanced stats: %s", rebalance_stats)

    _print_build_summary(rebalance_stats, real_problem, synthetic_problem, synthetic_ratio)
    return FINAL_TRAIN
|
|
|
|
| def _build_parser() -> argparse.ArgumentParser: |
| parser = argparse.ArgumentParser(description="Production-grade coding dataset build pipeline.") |
| parser.add_argument("--build", action="store_true", help="Run the full build pipeline.") |
| parser.add_argument("--target-size", type=int, default=1_000_000) |
| parser.add_argument("--min-tokens", type=int, default=10) |
| parser.add_argument("--max-tokens", type=int, default=2048) |
| parser.add_argument("--skip-fetch", action="store_true", help="Use existing ./data/raw/*.jsonl only.") |
| parser.add_argument( |
| "--workers", |
| type=int, |
| default=max(1, (os.cpu_count() or 4) // 2), |
| help="Parallel worker processes for cleaning stage.", |
| ) |
|
|
| parser.add_argument("--codealpaca-limit", type=int, default=20000) |
| parser.add_argument("--evol-limit", type=int, default=300000) |
| parser.add_argument("--ultrachat-limit", type=int, default=250000) |
| parser.add_argument("--openhermes-limit", type=int, default=250000) |
| parser.add_argument("--codesearchnet-limit", type=int, default=300000) |
| parser.add_argument("--github-limit", type=int, default=200000) |
| parser.add_argument("--codeforces-limit", type=int, default=200000) |
| parser.add_argument("--leetcode-limit", type=int, default=300000) |
| parser.add_argument( |
| "--stackoverflow-limit", |
| type=int, |
| default=0, |
| help="Deprecated. StackOverflow sources were removed due unreliability.", |
| ) |
| parser.add_argument( |
| "--min-problem-samples", |
| type=int, |
| default=50_000, |
| help="Ensure at least this many samples in problem category during post-rebalance.", |
| ) |
| parser.add_argument( |
| "--min-real-problem-samples", |
| type=int, |
| default=50_000, |
| help="Minimum REAL problem samples required after rebalance.", |
| ) |
| parser.add_argument( |
| "--min-total-problem-samples", |
| type=int, |
| default=80_000, |
| help="Minimum total problem samples required after rebalance.", |
| ) |
| parser.add_argument( |
| "--max-synthetic-problem-ratio", |
| type=float, |
| default=0.30, |
| help="Maximum allowed synthetic (docstring fallback) share in problem category.", |
| ) |
| return parser |
|
|
|
|
if __name__ == "__main__":
    # CLI entry point. The full build only runs when --build is passed
    # explicitly; any other invocation just prints usage.
    parser = _build_parser()
    args = parser.parse_args()
    if not args.build:
        parser.print_help()
    else:
        build_dataset(args)
|
|