"""Fetch, clean, balance and validate the coding instruction-tuning dataset.

Pipeline (run with ``--build``):

1. Fetch raw sources into ``./data/raw/<source>.jsonl`` (skipped with ``--skip-fetch``).
2. Build a first balanced dataset via ``dataset_cleaner.build_balanced_dataset``.
3. Re-scan the raw files and rewrite ``./data/final/train.jsonl`` with exact
   per-category targets (instruction / structured / problem).
4. Abort if the problem category is too small or too synthetic.
"""

import argparse
import hashlib
import json
import os
import re
import shutil
import sqlite3
from collections import Counter
from contextlib import ExitStack
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

from datasets import load_dataset, load_from_disk
from tqdm import tqdm

from dataset_cleaner import build_balanced_dataset, clean_record
from dataset_formatter import build_instruction_sample
from utils import ensure_dirs, setup_logger

RAW_DIR = Path("./data/raw")
FINAL_DIR = Path("./data/final")
FINAL_TRAIN = FINAL_DIR / "train.jsonl"
LOG_DIR = Path("./logs")

# Pre-ingested Codeforces problem/solution pairs, read by fetch_problem_codeforces.
CODEFORCES_SOURCE = RAW_DIR / "codeforces.jsonl"

# Stems of the raw files written by this pipeline's own fetchers (the keys of
# ``fetch_plan`` in build_dataset). Local-file fallbacks that glob RAW_DIR must
# skip them; otherwise e.g. ``codeforces_problem.jsonl`` is ingested a second time.
_PIPELINE_RAW_STEMS = frozenset(
    {
        "codealpaca",
        "evol_instruct_code",
        "ultrachat_code",
        "openhermes_code",
        "codesearchnet_multilang",
        "github_curated_functions",
        "codeforces_problem",
        "leetcode_competitive",
    }
)

# Output categories, in the order the final dataset is written.
_CATEGORIES = ("instruction", "structured", "problem")

# Source name used for synthetic docstring->code "problems"; the rebalance step
# caps how many of these may enter the problem category.
_SYNTHETIC_PROBLEM_MARKER = "codesearchnet_problem_fallback"

# Heuristic for "looks like Python" when choosing among CodeContests solutions.
_PY_SNIPPET_RE = re.compile(r"\bdef\b|\bimport\b|\bprint\(")

_INSTRUCTION_SOURCE_HINTS = ("codealpaca", "evol", "ultrachat", "openhermes", "orca")
_PROBLEM_SOURCE_HINTS = (
    "leetcode",
    "contest",
    "problem",
    "mbpp",
    "humaneval",
    "apps",
    "codeforces",
    "codesearchnet_problem",
)

_USER_ROLES = {"user", "human"}
_ASSISTANT_ROLES = {"assistant", "gpt"}


def _safe_get(item: Dict[str, object], keys: Iterable[str]) -> str:
    """Return ``str()`` of the first truthy value among *keys* in *item*, else ``""``."""
    for key in keys:
        value = item.get(key)
        if value:
            return str(value)
    return ""


def _write_jsonl(path: Path, rows: Iterable[Dict[str, str]]) -> int:
    """Write *rows* to *path* as JSON Lines, overwriting any existing file.

    Rows with an empty or missing ``instruction`` or ``response`` are dropped.

    Returns:
        The number of rows actually written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with path.open("w", encoding="utf-8") as f:
        for row in rows:
            if not row.get("instruction") or not row.get("response"):
                continue
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
            count += 1
    return count


def _iter_jsonl(path: Path) -> Iterable[Dict[str, object]]:
    """Yield each JSON object in *path*; blank, malformed or non-object lines are skipped."""
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Callers use ``.get`` / item assignment; a top-level list or scalar would crash them.
            if isinstance(obj, dict):
                yield obj


def _source_to_category(source_name: str) -> str:
    """Map a source/file name to ``instruction``, ``problem`` or (default) ``structured``."""
    lowered = source_name.lower()
    if any(hint in lowered for hint in _INSTRUCTION_SOURCE_HINTS):
        return "instruction"
    if any(hint in lowered for hint in _PROBLEM_SOURCE_HINTS):
        return "problem"
    return "structured"


def _decode_text(value) -> str:
    """Coerce *value* to text: ``None`` -> ``""``, bytes decoded as UTF-8 (errors ignored)."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="ignore")
    return str(value)


def _expand_json_list(value):
    """Return the list encoded by *value* when it is a JSON-array string, else *value* unchanged.

    Some datasets (e.g. APPS) store a list of solutions as a JSON-encoded string;
    without this the whole JSON blob would be treated as a single solution.
    """
    if isinstance(value, str):
        stripped = value.lstrip()
        if stripped.startswith("["):
            try:
                parsed = json.loads(stripped)
            except json.JSONDecodeError:
                return value
            if isinstance(parsed, list):
                return parsed
    return value


def _extract_solution_from_code_contests(item: Dict[str, object]) -> str:
    """Pick one solution from a CodeContests-style record, preferring Python-looking code."""
    sols = item.get("solutions")
    if isinstance(sols, dict):
        # Typical schema: {"language": [...], "solution": [bytes...]}
        cand = sols.get("solution")
        if isinstance(cand, list):
            # Prefer Python-looking snippets when possible.
            for s in cand:
                t = _decode_text(s)
                if _PY_SNIPPET_RE.search(t):
                    return t
            if cand:
                return _decode_text(cand[0])
    if isinstance(sols, list) and sols:
        return _decode_text(sols[0])
    return _safe_get(item, ["solution", "answer", "code"])


def _extract_many_code_contests_solutions(item: Dict[str, object], max_per_problem: int = 6) -> List[str]:
    """Return up to *max_per_problem* distinct, non-empty CodeContests solutions.

    Falls back to the single-solution extractor when the dict schema yields nothing.
    """
    out: List[str] = []
    sols = item.get("solutions")
    if isinstance(sols, dict):
        cand = sols.get("solution")
        if isinstance(cand, list):
            for s in cand:
                t = _decode_text(s).strip()
                if not t:
                    continue
                if t not in out:
                    out.append(t)
                if len(out) >= max_per_problem:
                    break
    if not out:
        one = _extract_solution_from_code_contests(item).strip()
        if one:
            out.append(one)
    return out


def _extract_many_apps_solutions(item: Dict[str, object], max_per_problem: int = 5) -> List[str]:
    """Return up to *max_per_problem* distinct APPS-style solutions.

    Looks at ``solutions``, ``solution``, ``answer`` and ``code`` in that order.
    JSON-array strings are expanded into their elements; for dict values only
    list-valued entries are used.
    """
    out: List[str] = []

    def add(value) -> bool:
        """Append *value* as text if new and non-empty; return True once the cap is reached."""
        text = _decode_text(value).strip()
        if text and text not in out:
            out.append(text)
        return len(out) >= max_per_problem

    for key in ("solutions", "solution", "answer", "code"):
        val = _expand_json_list(item.get(key))
        if isinstance(val, list):
            for x in val:
                if add(x):
                    return out
        elif isinstance(val, dict):
            for x in val.values():
                if isinstance(x, list):
                    for y in x:
                        if add(y):
                            return out
        elif add(val):
            return out
    return out


def _collect_code_candidates(value, out: List[str], max_per_problem: int) -> None:
    """Recursively append distinct code strings found in *value* to *out* (capped).

    Dicts are searched in priority key order first, then across all values.
    """
    if len(out) >= max_per_problem:
        return
    if value is None:
        return
    if isinstance(value, str):
        v = value.strip()
        if v and v not in out:
            out.append(v)
        return
    if isinstance(value, bytes):
        v = _decode_text(value).strip()
        if v and v not in out:
            out.append(v)
        return
    if isinstance(value, list):
        for x in value:
            _collect_code_candidates(x, out, max_per_problem)
            if len(out) >= max_per_problem:
                return
        return
    if isinstance(value, dict):
        for k in ("solution", "solutions", "code", "answer", "python", "cpp", "java", "javascript"):
            if k in value:
                _collect_code_candidates(value.get(k), out, max_per_problem)
                if len(out) >= max_per_problem:
                    return
        for v in value.values():
            _collect_code_candidates(v, out, max_per_problem)
            if len(out) >= max_per_problem:
                return


def _extract_many_generic_solutions(
    item: Dict[str, object],
    *,
    max_per_problem: int = 6,
) -> List[str]:
    """Collect up to *max_per_problem* code candidates from common solution-like keys."""
    out: List[str] = []
    for key in ("solutions", "solution", "code", "answer", "python", "cpp", "java", "javascript"):
        _collect_code_candidates(item.get(key), out, max_per_problem)
        if len(out) >= max_per_problem:
            break
    return out


def _compute_targets(target_size: int, min_problem_samples: int) -> Dict[str, int]:
    """Split *target_size* into per-category targets.

    Starts from a 60/30/10 instruction/structured/problem split, raises the
    problem share to at least *min_problem_samples*, then divides the remainder
    60:30 between instruction and structured.

    Raises:
        RuntimeError: if *min_problem_samples* exceeds *target_size*.
    """
    instruction_target = int(target_size * 0.60)
    structured_target = int(target_size * 0.30)
    problem_target = target_size - instruction_target - structured_target
    problem_target = max(problem_target, min_problem_samples)

    remainder = target_size - problem_target
    if remainder < 0:
        raise RuntimeError(
            f"Invalid target sizing: min_problem_samples={min_problem_samples} exceeds "
            f"target_size={target_size}."
        )
    instruction_target = int(remainder * (60.0 / 90.0))
    structured_target = remainder - instruction_target
    return {
        "instruction": instruction_target,
        "structured": structured_target,
        "problem": problem_target,
    }


def _scan_raw_into_shards(
    raw_paths: List[Path],
    shard_paths: Dict[str, Path],
    conn: sqlite3.Connection,
    *,
    min_tokens: int,
    max_tokens: int,
) -> Counter:
    """Clean, de-duplicate and route every raw record into its category shard.

    Exact duplicates are detected on ``instruction||response`` via a sha256 hash
    stored in *conn*'s ``seen_hashes`` table. Missing raw files are skipped.

    Returns:
        Per-category counts of rows written to the shards.
    """

    def is_dup(instruction: str, response: str) -> bool:
        digest = hashlib.sha256(f"{instruction}||{response}".encode("utf-8")).hexdigest()
        try:
            conn.execute("INSERT INTO seen_hashes(h) VALUES (?)", (digest,))
            return False
        except sqlite3.IntegrityError:
            return True

    shard_counts: Counter = Counter()
    with ExitStack() as stack:
        writers = {
            cat: stack.enter_context(path.open("w", encoding="utf-8"))
            for cat, path in shard_paths.items()
        }
        for raw_path in raw_paths:
            if not raw_path.exists():
                continue
            src_default = raw_path.stem
            for rec in tqdm(_iter_jsonl(raw_path), desc=f"rebalance_scan:{raw_path.name}", unit="rows"):
                rec.setdefault("_source", src_default)
                rec.setdefault("_category", _source_to_category(src_default))
                cleaned = clean_record(rec, min_tokens=min_tokens, max_tokens=max_tokens)
                if cleaned is None:
                    continue
                if is_dup(cleaned["instruction"], cleaned["response"]):
                    continue
                cat = cleaned.get("_category")
                if cat not in writers:
                    cat = _source_to_category(cleaned.get("_source", ""))
                line_obj = {
                    "instruction": cleaned["instruction"],
                    "response": cleaned["response"],
                    "_source": cleaned.get("_source", src_default),
                    "_category": cat,
                }
                writers[cat].write(json.dumps(line_obj, ensure_ascii=False) + "\n")
                shard_counts[cat] += 1
    return shard_counts


def _write_rebalanced_output(
    *,
    shard_paths: Dict[str, Path],
    targets: Dict[str, int],
    output_path: Path,
    max_synthetic_problem_ratio: float,
    logger,
) -> Dict[str, object]:
    """Write exactly ``targets[cat]`` rows per category from the shards to *output_path*.

    Shards are read head-first (downsampling keeps the earliest rows). Instruction
    and structured deficits are filled by re-reading their shard; the problem
    category is never upsampled. Synthetic problem rows are capped at
    ``int(targets["problem"] * max_synthetic_problem_ratio)``. The output is
    written to a temporary file and atomically moved into place.

    NOTE(review): head-first reading biases the downsample toward whichever raw
    files were scanned first; consider shuffling shards if that matters.
    """
    source_breakdown: Counter = Counter()
    category_breakdown: Counter = Counter()
    total_tokens = 0
    total_samples = 0
    problem_real_count = 0
    problem_synthetic_count = 0
    max_synth_problem = int(targets["problem"] * max_synthetic_problem_ratio)

    def try_emit(cat: str, obj: Dict[str, object], out_f) -> bool:
        """Write one shard row and update stats; return False if the synthetic cap skips it."""
        nonlocal total_samples, total_tokens, problem_real_count, problem_synthetic_count
        src = obj.get("_source", "unknown")
        is_problem_synth = cat == "problem" and _SYNTHETIC_PROBLEM_MARKER in src
        if is_problem_synth and problem_synthetic_count >= max_synth_problem:
            return False
        out_f.write(
            json.dumps(
                {"instruction": obj["instruction"], "response": obj["response"]},
                ensure_ascii=False,
            )
            + "\n"
        )
        total_samples += 1
        category_breakdown[cat] += 1
        source_breakdown[src] += 1
        if cat == "problem":
            if is_problem_synth:
                problem_synthetic_count += 1
            else:
                problem_real_count += 1
        total_tokens += len((obj["instruction"] + " " + obj["response"]).split())
        return True

    def write_from_shard(cat: str, needed: int, out_f) -> int:
        """Single pass over the shard, writing up to *needed* rows."""
        shard = shard_paths[cat]
        if not shard.exists():
            return 0
        written = 0
        with shard.open("r", encoding="utf-8") as f:
            for line in f:
                if written >= needed:
                    break
                if try_emit(cat, json.loads(line), out_f):
                    written += 1
        return written

    def upsample_shard(cat: str, needed: int, out_f) -> int:
        """Repeat passes over the shard until *needed* rows are written or a pass adds nothing."""
        shard = shard_paths[cat]
        if not shard.exists() or needed <= 0:
            return 0
        written = 0
        while written < needed:
            made_progress = 0
            with shard.open("r", encoding="utf-8") as f:
                for line in f:
                    if written >= needed:
                        break
                    if try_emit(cat, json.loads(line), out_f):
                        written += 1
                        made_progress += 1
            if made_progress == 0:
                break
        return written

    partial_path = output_path.with_name(output_path.name + ".tmp")
    try:
        with partial_path.open("w", encoding="utf-8") as out_f:
            for cat in _CATEGORIES:
                want = targets[cat]
                got = write_from_shard(cat, want, out_f)
                if got >= want:
                    continue
                if cat == "problem":
                    logger.warning(
                        "Category %s shortfall: need=%d got=%d (no upsampling allowed for problem).",
                        cat,
                        want,
                        got,
                    )
                else:
                    upsampled = upsample_shard(cat, want - got, out_f)
                    logger.warning(
                        "Category %s shortfall: need=%d got=%d upsampled=%d",
                        cat,
                        want,
                        got,
                        upsampled,
                    )
        os.replace(partial_path, output_path)
    finally:
        # No-op after a successful replace; removes the partial file on failure.
        partial_path.unlink(missing_ok=True)

    denom = max(total_samples, 1)
    inst = category_breakdown["instruction"]
    struct = category_breakdown["structured"]
    problem = category_breakdown["problem"]
    instruction_vs_raw = {
        "instruction_pct": round(100.0 * inst / denom, 2),
        "raw_converted_pct": round(100.0 * (struct + problem) / denom, 2),
    }
    return {
        "total_samples": total_samples,
        "avg_length_tokens": round(total_tokens / denom, 2),
        "source_breakdown": dict(source_breakdown),
        "category_breakdown": dict(category_breakdown),
        "instruction_vs_raw_ratio": instruction_vs_raw,
        "targets": targets,
        "problem_real_count": problem_real_count,
        "problem_synthetic_count": problem_synthetic_count,
        "problem_synthetic_pct": round(
            100.0 * problem_synthetic_count / max(problem_real_count + problem_synthetic_count, 1), 2
        ),
    }


def rebalance_final_dataset(
    *,
    raw_paths: List[Path],
    output_path: Path,
    target_size: int,
    min_tokens: int,
    max_tokens: int,
    min_problem_samples: int,
    logger,
    max_synthetic_problem_ratio: float = 0.30,
) -> Dict[str, object]:
    """Rebuild *output_path* from *raw_paths* with exact per-category targets.

    Streams every raw file through ``clean_record`` and an on-disk SQLite dedupe
    into per-category temp shards, then down/upsamples them to the targets from
    ``_compute_targets``. Temporary shards and the dedupe database are removed
    afterwards.

    Args:
        max_synthetic_problem_ratio: cap on synthetic (docstring fallback) rows,
            as a fraction of the problem target.

    Returns:
        Summary statistics (totals, per-source/category breakdowns, targets,
        real vs synthetic problem counts).
    """
    # Post-build rebalance using streaming + temp shards, then exact down/upsample.
    tmp_dir = output_path.parent / "_rebalance_tmp"
    ensure_dirs([tmp_dir])
    shard_paths = {cat: tmp_dir / f"{cat}.jsonl" for cat in _CATEGORIES}
    for shard in shard_paths.values():
        shard.unlink(missing_ok=True)

    dedupe_db = tmp_dir / "rebalance_seen.sqlite"
    for stale in (
        dedupe_db,
        dedupe_db.with_name(dedupe_db.name + "-wal"),
        dedupe_db.with_name(dedupe_db.name + "-shm"),
    ):
        stale.unlink(missing_ok=True)

    try:
        conn = sqlite3.connect(str(dedupe_db))
        try:
            conn.execute("PRAGMA journal_mode=WAL;")
            conn.execute("CREATE TABLE IF NOT EXISTS seen_hashes (h TEXT PRIMARY KEY)")
            shard_counts = _scan_raw_into_shards(
                raw_paths,
                shard_paths,
                conn,
                min_tokens=min_tokens,
                max_tokens=max_tokens,
            )
            conn.commit()
        finally:
            # Close even when the scan fails so the DB file handle is not leaked.
            conn.close()

        targets = _compute_targets(target_size=target_size, min_problem_samples=min_problem_samples)
        logger.info("Rebalance targets: %s (available=%s)", targets, dict(shard_counts))

        return _write_rebalanced_output(
            shard_paths=shard_paths,
            targets=targets,
            output_path=output_path,
            max_synthetic_problem_ratio=max_synthetic_problem_ratio,
            logger=logger,
        )
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


def _try_load_dataset(candidates: List[Dict[str, object]], logger):
    """Return the first dataset that ``load_dataset(**cand)`` loads successfully.

    Raises:
        The last load exception if every candidate fails, or RuntimeError if
        *candidates* is empty.
    """
    last_exc: Optional[Exception] = None
    for cand in candidates:
        try:
            ds = load_dataset(**cand)
            logger.info("Loaded dataset: %s", cand)
            return ds
        except Exception as exc:
            logger.warning("Dataset load failed for %s: %s", cand, exc)
            last_exc = exc
    if last_exc:
        raise last_exc
    raise RuntimeError("No dataset candidates provided.")


def _iter_alpaca_style(
    ds,
    limit: int,
    *,
    desc: str,
    instruction_keys: List[str],
    source: str,
) -> Iterable[Dict[str, str]]:
    """Yield up to *limit* instruction samples from an Alpaca-style (instruction/input/output) dataset."""
    emitted = 0
    for item in tqdm(ds, desc=desc, unit="rows"):
        if emitted >= limit:
            break
        instruction = _safe_get(item, instruction_keys)
        inp = _safe_get(item, ["input"])
        output = _safe_get(item, ["output", "response", "answer"])
        if inp:
            instruction = f"{instruction}\n\nInput:\n{inp}".strip()
        emitted += 1
        yield build_instruction_sample(
            instruction=instruction,
            response=output,
            source=source,
            category="instruction",
        )


def _first_exchange(messages: Iterable[object], role_key: str, content_key: str) -> Tuple[str, str]:
    """Return (first user turn, first assistant reply after it); either may be ``""``."""
    user = ""
    assistant = ""
    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = str(msg.get(role_key, "")).lower()
        content = str(msg.get(content_key, "")).strip()
        if role in _USER_ROLES and not user:
            user = content
        if role in _ASSISTANT_ROLES and user and not assistant:
            assistant = content
            break
    return user, assistant


def fetch_instruction_codealpaca(raw_path: Path, limit: int, logger) -> int:
    """Fetch up to *limit* CodeAlpaca samples into *raw_path*; return rows written."""
    ds = _try_load_dataset(
        [
            {"path": "sahil2801/CodeAlpaca-20k", "split": "train"},
            {"path": "HuggingFaceH4/CodeAlpaca_20K", "split": "train"},
        ],
        logger,
    )
    rows = _iter_alpaca_style(
        ds,
        limit,
        desc="codealpaca",
        instruction_keys=["instruction"],
        source="codealpaca",
    )
    return _write_jsonl(raw_path, rows)


def fetch_instruction_evol(raw_path: Path, limit: int, logger) -> int:
    """Fetch up to *limit* Evol-Instruct-style code samples into *raw_path*; return rows written."""
    ds = _try_load_dataset(
        [
            {"path": "nickrosh/Evol-Instruct-Code-80k-v1", "split": "train"},
            {"path": "WizardLMTeam/WizardCoder-Evol-Instruct-V2-196k", "split": "train"},
            {"path": "ise-uiuc/Magicoder-OSS-Instruct-75K", "split": "train"},
        ],
        logger,
    )
    rows = _iter_alpaca_style(
        ds,
        limit,
        desc="evol_instruct_code",
        instruction_keys=["instruction", "prompt", "question"],
        source="evol_instruct_code",
    )
    return _write_jsonl(raw_path, rows)


def fetch_instruction_ultrachat_code(raw_path: Path, limit: int, logger) -> int:
    """Fetch up to *limit* code-related first exchanges from UltraChat into *raw_path*.

    Only exchanges whose user+assistant text contains one of ``code_terms`` are kept.
    """
    ds = _try_load_dataset(
        [
            {"path": "HuggingFaceH4/ultrachat_200k", "split": "train_sft"},
            {"path": "stingning/ultrachat", "split": "train"},
        ],
        logger,
    )
    code_terms = ("python", "javascript", "typescript", "java", "code", "api", "backend", "frontend")

    def rows():
        emitted = 0
        for item in tqdm(ds, desc="ultrachat_code", unit="rows"):
            if emitted >= limit:
                break
            msgs = item.get("messages") or item.get("conversation") or item.get("conversations")
            if not isinstance(msgs, list) or len(msgs) < 2:
                continue
            user, assistant = _first_exchange(msgs, "role", "content")
            if not user or not assistant:
                continue
            low = (user + " " + assistant).lower()
            if not any(term in low for term in code_terms):
                continue
            emitted += 1
            yield build_instruction_sample(
                instruction=user,
                response=assistant,
                source="ultrachat_code",
                category="instruction",
            )

    return _write_jsonl(raw_path, rows())


def fetch_instruction_openhermes_code(raw_path: Path, limit: int, logger) -> int:
    """Fetch up to *limit* code-related OpenHermes/OpenOrca samples into *raw_path*.

    Uses flat instruction/response fields, falling back to the first exchange in
    ``conversations``; only rows containing one of ``code_terms`` are kept.
    """
    ds = _try_load_dataset(
        [
            {"path": "teknium/OpenHermes-2.5", "split": "train"},
            {"path": "Open-Orca/OpenOrca", "split": "train"},
        ],
        logger,
    )
    code_terms = ("python", "javascript", "typescript", "java", "code", "function", "api", "fastapi")

    def rows():
        emitted = 0
        for item in tqdm(ds, desc="openhermes_code", unit="rows"):
            if emitted >= limit:
                break
            instruction = _safe_get(item, ["instruction", "question", "prompt"])
            response = _safe_get(item, ["output", "response", "answer"])
            conversations = item.get("conversations")
            if (not instruction or not response) and isinstance(conversations, list):
                user, assistant = _first_exchange(conversations, "from", "value")
                instruction = instruction or user
                response = response or assistant
            if not instruction or not response:
                continue
            low = (instruction + " " + response).lower()
            if not any(term in low for term in code_terms):
                continue
            emitted += 1
            yield build_instruction_sample(
                instruction=instruction,
                response=response,
                source="openhermes_code",
                category="instruction",
            )

    return _write_jsonl(raw_path, rows())


def fetch_structured_codesearchnet(raw_path: Path, limit: int, logger) -> int:
    """Fetch CodeSearchNet functions (python/javascript/java) into *raw_path*.

    Each language contributes at most ``limit // 3`` rows (min 1). A local
    ``load_from_disk`` cache is tried first, then HF streaming; languages that
    fail to load are skipped.
    """
    languages = ["python", "javascript", "java"]
    per_lang = max(1, limit // max(1, len(languages)))

    def rows():
        emitted = 0
        for lang in languages:
            if emitted >= limit:
                break
            ds = None
            cache_by_lang = Path(f"./data/cache/raw/code_search_net_{lang}")
            if cache_by_lang.exists():
                try:
                    ds = load_from_disk(str(cache_by_lang))["train"]
                    logger.info("Loaded cached CodeSearchNet language=%s from %s", lang, cache_by_lang)
                except Exception as exc:
                    logger.warning("Failed cached CodeSearchNet for %s: %s", lang, exc)
            if ds is None:
                try:
                    ds = load_dataset("code_search_net", lang, split="train", streaming=True)
                    logger.info("Loaded streamed CodeSearchNet language=%s", lang)
                except Exception as exc:
                    logger.warning("Skipping CodeSearchNet language=%s: %s", lang, exc)
                    continue

            lang_count = 0
            for item in tqdm(ds, desc=f"codesearchnet_{lang}", unit="rows"):
                if emitted >= limit or lang_count >= per_lang:
                    break
                code = _safe_get(item, ["whole_func_string", "code"])
                path = _safe_get(item, ["path", "func_name"])
                doc = _safe_get(item, ["docstring", "func_documentation_string"])
                if not code:
                    continue
                emitted += 1
                lang_count += 1
                yield build_instruction_sample(
                    code=code,
                    instruction=doc,
                    language=lang,
                    path=path,
                    source=f"codesearchnet_{lang}",
                    category="structured",
                )

    return _write_jsonl(raw_path, rows())


def fetch_structured_github_functions(raw_path: Path, limit: int, logger) -> int:
    """Fetch up to *limit* Python functions (CodeSearchNet python) as structured samples.

    Uses the local ``load_from_disk`` cache if present, otherwise HF streaming.
    Load errors propagate to the caller.
    """
    cache_path = Path("./data/cache/raw/code_search_net_python")
    if cache_path.exists():
        ds = load_from_disk(str(cache_path))["train"]
        logger.info("Using cached GitHub function corpus from %s", cache_path.resolve())
    else:
        ds = load_dataset("code_search_net", "python", split="train", streaming=True)
        logger.info("Using streamed CodeSearchNet python as GitHub-curated function source.")

    def rows():
        emitted = 0
        for item in tqdm(ds, desc="github_curated_functions", unit="rows"):
            if emitted >= limit:
                break
            code = _safe_get(item, ["whole_func_string", "code", "content"])
            path = _safe_get(item, ["path", "func_name"])
            repo = _safe_get(item, ["repo", "repository_name"])
            doc = _safe_get(item, ["docstring", "func_documentation_string"])
            if not code:
                continue
            title = f"{repo}/{path}" if repo and path else path
            emitted += 1
            yield build_instruction_sample(
                code=code,
                instruction=doc,
                language="python",
                path=path,
                title=title,
                source="github_curated_functions",
                category="structured",
            )

    return _write_jsonl(raw_path, rows())


def _local_problem_files() -> List[Path]:
    """Return user-provided Codeforces/problem-solution JSONL files in RAW_DIR.

    Excludes this pipeline's own raw outputs and CODEFORCES_SOURCE (already
    ingested by fetch_problem_codeforces) so no pair is ingested twice.
    """
    found = sorted(RAW_DIR.glob("codeforces*.jsonl")) + sorted(RAW_DIR.glob("problem_solution*.jsonl"))
    return [
        p for p in found if p.stem not in _PIPELINE_RAW_STEMS and p.name != CODEFORCES_SOURCE.name
    ]


def _iter_local_problem_pairs(
    files: List[Path],
    *,
    desc_prefix: str,
    problem_keys: List[str],
    solution_keys: List[str],
) -> Iterable[Tuple[str, str]]:
    """Yield (problem, solution) pairs with both fields non-empty from local JSONL *files*."""
    for local_file in files:
        for item in tqdm(_iter_jsonl(local_file), desc=f"{desc_prefix}:{local_file.name}", unit="rows"):
            problem = _safe_get(item, problem_keys)
            solution = _safe_get(item, solution_keys)
            if problem and solution:
                yield problem, solution


def _parse_problem_item(source_name: str, item: Dict[str, object]) -> Optional[Tuple[str, List[str]]]:
    """Return (instruction prompt, candidate solutions) for one HF problem record.

    The schema is chosen by *source_name*. Returns None when the record has no
    problem text or no solutions.
    """
    title = _safe_get(item, ["title", "name", "problem_id", "task_id"])
    solutions: List[str] = []
    if source_name.endswith("mbpp"):
        problem = _safe_get(item, ["text"])
        tests = item.get("test_list") or []
        test_blob = "\n".join(tests) if isinstance(tests, list) else _decode_text(tests)
        if test_blob:
            problem = f"{problem}\n\nTests:\n{test_blob}"
        sol = _safe_get(item, ["code"])
        solutions = [sol] if sol else []
    elif source_name.endswith("humaneval"):
        problem = _safe_get(item, ["prompt"])
        tests = _safe_get(item, ["test"])
        if tests:
            problem = f"{problem}\n\nTests:\n{tests}"
        sol = _safe_get(item, ["canonical_solution"])
        solutions = [sol] if sol else []
    elif source_name.endswith("code_contests"):
        problem = _safe_get(item, ["description", "problem", "question", "prompt"])
        solutions = _extract_many_code_contests_solutions(item, max_per_problem=6)
    elif source_name.endswith("apps"):
        problem = _safe_get(item, ["question", "problem", "prompt", "statement"])
        solutions = _extract_many_apps_solutions(item, max_per_problem=5)
    elif source_name.endswith("open-r1/codeforces"):
        problem = _safe_get(
            item,
            ["problem", "statement", "question", "prompt", "description", "content"],
        )
        solutions = _extract_many_generic_solutions(item, max_per_problem=6)
    else:
        # LeetCode-style schema: one column per language.
        problem = _safe_get(item, ["content", "description", "question", "prompt", "statement"])
        langs = [_safe_get(item, [key]) for key in ("python", "javascript", "java", "c++", "answer", "code")]
        solutions = [s for s in langs if s]
        if isinstance(item.get("solutions"), list):
            for extra in item["solutions"]:
                t = _decode_text(extra).strip()
                if t and t not in solutions:
                    solutions.append(t)

    if not problem or not solutions:
        return None
    return f"Solve this coding problem: {title}\n\n{problem}", solutions


def _load_codesearchnet_python_fallback(logger):
    """Load CodeSearchNet python (local cache first, then streaming); None if both fail."""
    cache_path = Path("./data/cache/raw/code_search_net_python")
    if cache_path.exists():
        try:
            ds = load_from_disk(str(cache_path))["train"]
            logger.info("Using cached CodeSearchNet Python for problem fallback.")
            return ds
        except Exception as exc:
            logger.warning("Cached CodeSearchNet Python unusable for problem fallback: %s", exc)
    try:
        ds = load_dataset("code_search_net", "python", split="train", streaming=True)
        logger.info("Using streamed CodeSearchNet Python for problem fallback.")
        return ds
    except Exception as exc:
        logger.warning("Problem fallback CodeSearchNet failed: %s", exc)
        return None


def fetch_problem_leetcode(raw_path: Path, limit: int, logger) -> int:
    """Fetch up to *limit* problem/solution samples from local files and HF datasets.

    Sources are used in order:
    1. User-provided local Codeforces/problem-solution JSONL files.
    2. HF problem datasets, with local APPS files used if APPS fails to load.
    3. A synthetic CodeSearchNet docstring->code fallback, capped at 30% of *limit*.

    Each solution of a multi-solution problem is emitted as its own sample;
    solutions shorter than 20 characters are skipped.
    """

    def rows():
        emitted = 0
        candidates = [
            ("greengerong/leetcode", {"path": "greengerong/leetcode", "split": "train"}),
            ("deepmind/code_contests", {"path": "deepmind/code_contests", "split": "train"}),
            ("codeparrot/apps", {"path": "codeparrot/apps", "split": "train"}),
            ("google-research-datasets/mbpp", {"path": "google-research-datasets/mbpp", "split": "train"}),
            ("openai_humaneval", {"path": "openai_humaneval", "split": "test"}),
            # Streamed high-volume real problem source; avoid full git clone.
            ("open-r1/codeforces", {"path": "open-r1/codeforces", "split": "train", "streaming": True}),
        ]

        # Optional local codeforces/problem-solution JSONL fallback.
        local_problem_files = _local_problem_files()
        if not local_problem_files and not CODEFORCES_SOURCE.exists():
            logger.warning(
                "Codeforces dataset missing – recommended for production quality."
            )
        for problem, solution in _iter_local_problem_pairs(
            local_problem_files,
            desc_prefix="problem_local",
            problem_keys=["problem", "instruction", "statement", "question"],
            solution_keys=["solution", "response", "answer", "code"],
        ):
            if emitted >= limit:
                break
            emitted += 1
            yield build_instruction_sample(
                instruction=f"Solve the following problem:\n\n{problem}",
                response=solution,
                source="codeforces_local",
                category="problem",
            )

        for source_name, cand in candidates:
            if emitted >= limit:
                break
            try:
                ds = load_dataset(**cand)
                logger.info("Loaded problem dataset: %s", cand)
            except Exception as exc:
                logger.warning("Problem dataset load failed for %s: %s", cand, exc)
                if source_name == "codeparrot/apps":
                    apps_local = sorted(RAW_DIR.glob("apps*.jsonl")) + sorted(RAW_DIR.glob("apps*.json"))
                    if not apps_local:
                        logger.warning(
                            "APPS dataset unavailable via HF and local APPS JSON missing in ./data/raw."
                        )
                    for problem, solution in _iter_local_problem_pairs(
                        apps_local,
                        desc_prefix="problem_apps_local",
                        problem_keys=["question", "prompt", "problem", "statement"],
                        solution_keys=["solution", "answer", "code"],
                    ):
                        if emitted >= limit:
                            break
                        emitted += 1
                        yield build_instruction_sample(
                            instruction=f"Solve the following problem:\n\n{problem}",
                            response=solution,
                            source="problem_apps_local",
                            category="problem",
                        )
                # Never fall through to the loop below: ``ds`` is unbound or stale here.
                continue

            source_tag = f"problem_{source_name.replace('/', '_')}"
            for item in tqdm(ds, desc=f"problem_{source_name}", unit="rows"):
                if emitted >= limit:
                    break
                parsed = _parse_problem_item(source_name, item)
                if parsed is None:
                    continue
                base_instruction, solutions = parsed
                for sol in solutions:
                    if emitted >= limit:
                        break
                    if not sol or len(sol.strip()) < 20:
                        continue
                    emitted += 1
                    yield build_instruction_sample(
                        instruction=base_instruction,
                        response=sol,
                        source=source_tag,
                        category="problem",
                    )

        # Final problem fallback from CodeSearchNet docstrings to boost high-quality problem pairs.
        if emitted < limit:
            synth_cap = int(limit * 0.30)
            synth_emitted = 0
            ds = _load_codesearchnet_python_fallback(logger)
            if ds is not None:
                for item in tqdm(ds, desc="problem_codesearchnet_fallback", unit="rows"):
                    if emitted >= limit or synth_emitted >= synth_cap:
                        break
                    doc = _safe_get(item, ["docstring", "func_documentation_string"])
                    code = _safe_get(item, ["whole_func_string", "code"])
                    if len(doc.strip()) < 30 or not code:
                        continue
                    emitted += 1
                    synth_emitted += 1
                    yield build_instruction_sample(
                        instruction=f"Solve the following programming task:\n\n{doc}",
                        response=code,
                        source=_SYNTHETIC_PROBLEM_MARKER,
                        category="problem",
                    )

    return _write_jsonl(raw_path, rows())


def fetch_problem_codeforces(raw_path: Path, limit: int, logger) -> int:
    """Copy up to *limit* pairs from CODEFORCES_SOURCE into *raw_path* as problem samples.

    Instructions are prefixed with "Solve the following problem:" unless they
    already start with it (case-insensitive). Returns 0 if the source file is missing.
    """
    source_file = CODEFORCES_SOURCE
    if not source_file.exists():
        logger.warning("Codeforces dataset file not found: %s", source_file.resolve())
        return 0

    def rows():
        emitted = 0
        for item in tqdm(_iter_jsonl(source_file), desc="problem_codeforces", unit="rows"):
            if emitted >= limit:
                break
            instruction = _safe_get(item, ["instruction", "problem", "statement", "question"])
            response = _safe_get(item, ["response", "solution", "answer", "code"])
            if not instruction or not response:
                continue
            if not instruction.lower().startswith("solve the following problem"):
                instruction = f"Solve the following problem:\n{instruction}"
            emitted += 1
            yield build_instruction_sample(
                instruction=instruction,
                response=response,
                source="codeforces_dataset",
                category="problem",
            )

    count = _write_jsonl(raw_path, rows())
    logger.info("Loaded Codeforces pre-ingested samples: %d", count)
    return count


def build_dataset(args) -> Path:
    """Run the full fetch -> balance -> rebalance -> validate pipeline.

    Returns:
        Path to the final training JSONL.

    Raises:
        RuntimeError: if the rebalanced problem category is too small, has too
            few real samples, or exceeds the allowed synthetic share.
    """
    ensure_dirs([RAW_DIR, FINAL_DIR, LOG_DIR])
    logger = setup_logger("data_fetch_build", LOG_DIR / "data_fetch.log")
    logger.info("Starting production dataset build. target_size=%d", args.target_size)
    logger.info("Raw dir: %s", RAW_DIR.resolve())
    logger.info("Final dir: %s", FINAL_DIR.resolve())

    # Keys double as raw file stems; keep them in sync with _PIPELINE_RAW_STEMS.
    # Order matters: codeforces_problem runs before leetcode_competitive.
    fetch_plan = {
        "codealpaca": (fetch_instruction_codealpaca, args.codealpaca_limit),
        "evol_instruct_code": (fetch_instruction_evol, args.evol_limit),
        "ultrachat_code": (fetch_instruction_ultrachat_code, args.ultrachat_limit),
        # OpenHermes is hard-capped at 120k rows regardless of --openhermes-limit.
        "openhermes_code": (fetch_instruction_openhermes_code, min(args.openhermes_limit, 120_000)),
        "codesearchnet_multilang": (fetch_structured_codesearchnet, args.codesearchnet_limit),
        "github_curated_functions": (fetch_structured_github_functions, args.github_limit),
        "codeforces_problem": (fetch_problem_codeforces, args.codeforces_limit),
        "leetcode_competitive": (fetch_problem_leetcode, args.leetcode_limit),
    }

    raw_paths: List[Path] = []
    if not args.skip_fetch:
        for name, (fn, limit) in fetch_plan.items():
            raw_path = RAW_DIR / f"{name}.jsonl"
            raw_paths.append(raw_path)
            try:
                count = fn(raw_path, limit, logger)
                logger.info("Fetched %d rows for source=%s", count, name)
            except Exception as exc:
                logger.warning("Skipping source=%s due to fetch error: %s", name, exc)
                if raw_path.exists():
                    # The path stays in raw_paths, so later phases still read this file.
                    logger.warning(
                        "Raw file %s still exists (stale or partial) and will be used by later phases.",
                        raw_path,
                    )
    else:
        # NOTE(review): this also picks up user-provided source files in RAW_DIR
        # (e.g. codeforces.jsonl, apps*.jsonl), not only fetcher outputs.
        raw_paths = sorted(RAW_DIR.glob("*.jsonl"))
        logger.info("Skip fetch enabled. Using existing raw files: %d", len(raw_paths))

    # Phase 1: base balanced build (streaming + dedupe).
    stats = build_balanced_dataset(
        input_paths=raw_paths,
        output_path=FINAL_TRAIN,
        target_size=args.target_size,
        min_tokens=args.min_tokens,
        max_tokens=args.max_tokens,
        num_workers=args.workers,
        category_weights={"instruction": 0.60, "structured": 0.30, "problem": 0.10},
        sqlite_path=FINAL_DIR / "dedupe_hashes.sqlite",
    )

    # Phase 2: post-build strict rebalance (downsample excess + upsample deficits).
    # This rewrites FINAL_TRAIN from the raw files; phase 1's stats are only logged.
    rebalance_stats = rebalance_final_dataset(
        raw_paths=raw_paths,
        output_path=FINAL_TRAIN,
        target_size=args.target_size,
        min_tokens=args.min_tokens,
        max_tokens=args.max_tokens,
        min_problem_samples=args.min_problem_samples,
        logger=logger,
        max_synthetic_problem_ratio=args.max_synthetic_problem_ratio,
    )

    actual_problem = int(rebalance_stats["category_breakdown"].get("problem", 0))
    required_problem = int(args.min_problem_samples)
    real_problem = int(rebalance_stats.get("problem_real_count", 0))
    synthetic_problem = int(rebalance_stats.get("problem_synthetic_count", 0))
    synthetic_ratio = synthetic_problem / max(actual_problem, 1)

    min_total_problem = max(required_problem, args.min_total_problem_samples)
    if actual_problem < min_total_problem:
        raise RuntimeError(
            "Build aborted: insufficient problem-solving data after rebalance. "
            f"Required >= {min_total_problem}, actual = {actual_problem}. "
            "Increase problem dataset sources (e.g., leetcode/code contests/problem-solution datasets) "
            "or raise problem fetch limits, then rebuild."
        )
    if real_problem < args.min_real_problem_samples:
        raise RuntimeError(
            "Build aborted: insufficient REAL problem-solving data after rebalance. "
            f"Required real >= {args.min_real_problem_samples}, actual real = {real_problem}. "
            "Add more high-quality real problem datasets (APPS/CodeContests/Codeforces/LeetCode)."
        )
    if synthetic_ratio > args.max_synthetic_problem_ratio:
        raise RuntimeError(
            "Build aborted: synthetic problem share too high. "
            f"Allowed <= {args.max_synthetic_problem_ratio:.0%}, actual = {synthetic_ratio:.2%}. "
            "Increase real problem sources and reduce synthetic fallback usage."
        )

    logger.info("Build complete. Final dataset: %s", FINAL_TRAIN.resolve())
    logger.info("Base stats: %s", stats)
    logger.info("Rebalanced stats: %s", rebalance_stats)

    print(f"Final dataset: {FINAL_TRAIN.resolve()}")
    print(f"Total samples: {rebalance_stats['total_samples']}")
    print(f"Avg length (tokens est.): {rebalance_stats['avg_length_tokens']}")
    print("Per-source breakdown:")
    for src, count in sorted(
        rebalance_stats["source_breakdown"].items(), key=lambda x: x[1], reverse=True
    ):
        print(f" - {src}: {count}")
    print("Category breakdown:")
    for cat, count in sorted(rebalance_stats["category_breakdown"].items(), key=lambda x: x[0]):
        print(f" - {cat}: {count} (target: {rebalance_stats['targets'].get(cat, 0)})")
    ratio = rebalance_stats["instruction_vs_raw_ratio"]
    print(
        f"Instruction vs raw-converted ratio: {ratio['instruction_pct']}% / {ratio['raw_converted_pct']}%"
    )
    total = max(1, rebalance_stats["total_samples"])
    print("Category percentages:")
    for cat in _CATEGORIES:
        pct = 100.0 * rebalance_stats["category_breakdown"].get(cat, 0) / total
        print(f" - {cat}: {pct:.2f}%")
    print(f"Real problem count: {real_problem}")
    print(f"Synthetic problem count: {synthetic_problem}")
    print(f"Synthetic problem %: {synthetic_ratio * 100:.2f}%")
    return FINAL_TRAIN


def _build_parser() -> argparse.ArgumentParser:
    """Create the CLI argument parser for the build pipeline."""
    parser = argparse.ArgumentParser(description="Production-grade coding dataset build pipeline.")
    parser.add_argument("--build", action="store_true", help="Run the full build pipeline.")
    parser.add_argument("--target-size", type=int, default=1_000_000)
    parser.add_argument("--min-tokens", type=int, default=10)
    parser.add_argument("--max-tokens", type=int, default=2048)
    parser.add_argument("--skip-fetch", action="store_true", help="Use existing ./data/raw/*.jsonl only.")
    parser.add_argument(
        "--workers",
        type=int,
        default=max(1, (os.cpu_count() or 4) // 2),
        help="Parallel worker processes for cleaning stage.",
    )
    parser.add_argument("--codealpaca-limit", type=int, default=20000)
    parser.add_argument("--evol-limit", type=int, default=300000)
    parser.add_argument("--ultrachat-limit", type=int, default=250000)
    parser.add_argument("--openhermes-limit", type=int, default=250000)
    parser.add_argument("--codesearchnet-limit", type=int, default=300000)
    parser.add_argument("--github-limit", type=int, default=200000)
    parser.add_argument("--codeforces-limit", type=int, default=200000)
    parser.add_argument("--leetcode-limit", type=int, default=300000)
    parser.add_argument(
        "--stackoverflow-limit",
        type=int,
        default=0,
        help="Deprecated. StackOverflow sources were removed due to unreliability.",
    )
    parser.add_argument(
        "--min-problem-samples",
        type=int,
        default=50_000,
        help="Ensure at least this many samples in problem category during post-rebalance.",
    )
    parser.add_argument(
        "--min-real-problem-samples",
        type=int,
        default=50_000,
        help="Minimum REAL problem samples required after rebalance.",
    )
    parser.add_argument(
        "--min-total-problem-samples",
        type=int,
        default=80_000,
        help="Minimum total problem samples required after rebalance.",
    )
    parser.add_argument(
        "--max-synthetic-problem-ratio",
        type=float,
        default=0.30,
        help="Maximum allowed synthetic (docstring fallback) share in problem category.",
    )
    return parser


if __name__ == "__main__":
    parser = _build_parser()
    args = parser.parse_args()
    if args.build:
        build_dataset(args)
    else:
        parser.print_help()