Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /data /coverage_100k_forge.py
| from __future__ import annotations | |
| from collections import Counter | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| import hashlib | |
| import json | |
| import os | |
| import re | |
| from pathlib import Path | |
| from typing import Any | |
# Version tag copied into every record's metadata and into the manifest.
SCHEMA_VERSION = "tinymind-coverage-100k-v1"

# Number of axes `_axes()` must produce; it raises if the count differs.
COVERAGE_AXIS_COUNT = 100

# System prompt used as the first message of every generated conversation.
SYSTEM = (
    "You are TinyMind Pure Coverage Tutor. "
    "Answer with evidence discipline, exact constraints, and useful natural language. "
    "Prefer verified reasoning over memorized slogans. "
    "For cyber, reverse engineering, OS, or tool topics, "
    "stay defensive, authorized, and audit-friendly."
)
@dataclass(frozen=True)
class Axis:
    """One coverage axis: a single topic inside a macro domain.

    The class is a dataclass because the rest of the module constructs it
    with keyword arguments and serializes instances through ``__dict__``.
    It is deliberately not declared with ``slots=True`` so that ``__dict__``
    stays available.
    """

    # Identifier of the form "axis_NNN"; callers parse the last three
    # characters as an integer.
    axis_id: str
    # Key of the macro domain this topic belongs to.
    macro_domain: str
    # Human-readable topic name, interpolated into prompts and answers.
    name: str
    # Language codes; records index into this tuple to pick a language.
    languages: tuple[str, ...]
    # Skill labels attached to the axis.
    skills: tuple[str, ...]
    # Selects which safety note (if any) is appended to generated answers.
    safety_boundary: str = "standard"
# Ordered macro domains. `_axes()` walks them in this order, so the order
# determines which axis number each topic receives.
MACRO_DOMAINS = (
    "thai_english_language",
    "instruction_following",
    "coding_project_agent",
    "tool_grounding_sandbox",
    "mathematics_logic",
    "science_engineering",
    "medicine_law_finance_safe",
    "cyber_reverse_defensive",
    "data_ml_systems",
    "multimodal_research_grounding",
)
# Topic names per macro domain. Each domain lists ten topics; every
# (domain, topic) pair becomes one coverage axis.
DOMAIN_TOPICS = {
    "thai_english_language": (
        "Thai technical explanation",
        "English precision writing",
        "Thai-English translation invariance",
        "idiom and register control",
        "summarization with nuance",
        "argument repair",
        "teaching complex ideas simply",
        "terminology alignment",
        "long-form dialogue",
        "cross-cultural communication",
    ),
    "instruction_following": (
        "system instruction hierarchy",
        "format constraints",
        "negative instruction contrast",
        "multi-step task planning",
        "ambiguity handling",
        "refusal boundary",
        "schema completion",
        "self-check loops",
        "concise versus detailed answer control",
        "stateful conversation repair",
    ),
    "coding_project_agent": (
        "Python project architecture",
        "TypeScript service design",
        "Rust native helper",
        "Go concurrent service",
        "C++ performance module",
        "SQL and data migrations",
        "test-driven repair",
        "CI release pipeline",
        "debugging from logs",
        "cross-platform command ergonomics",
    ),
    "tool_grounding_sandbox": (
        "Lua sandbox planning",
        "local proxy policy",
        "detached command streaming",
        "snapshot and fork workflow",
        "file transfer audit",
        "resource budget choice",
        "workspace containment",
        "tool result summarization",
        "failure recovery",
        "manifest writing",
    ),
    "mathematics_logic": (
        "proof invariants",
        "probability calibration",
        "optimization objective design",
        "linear algebra intuition",
        "discrete algorithms",
        "numerical stability",
        "information compression",
        "causal reasoning",
        "formal counterexamples",
        "bit-level exactness",
    ),
    "science_engineering": (
        "physics model limits",
        "chemistry mechanism reasoning",
        "biology systems reasoning",
        "electrical engineering tradeoffs",
        "mechanical design verification",
        "materials constraints",
        "robotics control",
        "cloud distributed systems",
        "embedded IoT",
        "energy and thermal analysis",
    ),
    "medicine_law_finance_safe": (
        "medical triage explanation",
        "public health evidence",
        "legal issue spotting",
        "financial risk framing",
        "accounting reconciliation",
        "business strategy critique",
        "policy analysis",
        "ethics governance",
        "education assessment",
        "safety limitation disclosure",
    ),
    "cyber_reverse_defensive": (
        "CVE defensive analysis",
        "malware report reading",
        "APK static analysis",
        "binary format explanation",
        "Ghidra workflow overview",
        "Il2Cpp metadata understanding",
        "threat model documentation",
        "secure patch planning",
        "incident timeline reconstruction",
        "authorized reverse engineering boundary",
    ),
    "data_ml_systems": (
        "dataset quality filtering",
        "deduplication and lineage",
        "retrieval index design",
        "LoRA training diagnosis",
        "quantization drift",
        "GPU benchmark honesty",
        "evaluation harness design",
        "leaderboard claim gates",
        "model card evidence",
        "active learning loop",
    ),
    "multimodal_research_grounding": (
        "image reasoning from description",
        "audio transcript analysis",
        "video event timeline",
        "document extraction",
        "folder-scale code review",
        "evidence ledger recall",
        "long context chunking",
        "deep research synthesis",
        "source conflict resolution",
        "answer uncertainty calibration",
    ),
}
def _axes() -> list[Axis]:
    """Build one `Axis` per (macro domain, topic) pair, numbered from 1.

    Axes are numbered in `MACRO_DOMAINS` order, then in topic order within
    each domain. Odd-numbered axes list Thai first, even-numbered axes list
    English first.

    Raises:
        RuntimeError: if the number of axes is not `COVERAGE_AXIS_COUNT`.
    """
    # Domains not listed here fall back to the "standard" boundary.
    boundary_for = {
        "cyber_reverse_defensive": "defensive_only",
        "medicine_law_finance_safe": "not_professional_advice",
    }
    shared_skills = ("decompose", "ground", "reason", "verify", "compress")
    pairs = [(domain, topic) for domain in MACRO_DOMAINS for topic in DOMAIN_TOPICS[domain]]

    built = [
        Axis(
            axis_id=f"axis_{number:03d}",
            macro_domain=domain,
            name=topic,
            languages=("th", "en") if number % 2 else ("en", "th"),
            skills=shared_skills,
            safety_boundary=boundary_for.get(domain, "standard"),
        )
        for number, (domain, topic) in enumerate(pairs, start=1)
    ]

    if len(built) != COVERAGE_AXIS_COUNT:
        raise RuntimeError(f"expected {COVERAGE_AXIS_COUNT} axes, got {len(built)}")
    return built
# Verbs interpolated into prompts; records pick one by modular index.
TASK_FRAMES = (
    "diagnose",
    "teach",
    "design",
    "compare",
    "verify",
    "repair",
    "compress",
    "expand",
    "translate",
    "evaluate",
    "plan",
    "audit",
    "simulate",
    "summarize",
    "ground",
    "critique",
)

# Difficulty labels; records pick one by modular index.
DIFFICULTIES = (
    "foundation",
    "intermediate",
    "advanced",
    "expert",
    "frontier_probe",
)

# Source roots scanned when the caller supplies none. Relative roots are
# resolved by `_candidate_files`.
DEFAULT_SOURCE_ROOTS = ("data/jsonl", "third_party")

# File suffixes (compared lower-cased) accepted as source material.
SOURCE_SUFFIXES = {
    ".jsonl", ".json", ".md", ".txt",
    ".py", ".js", ".ts", ".rs", ".go",
    ".c", ".cpp", ".h", ".cs", ".java", ".kt", ".lua",
    ".yml", ".yaml",
}

# Directory names pruned while walking source roots.
EXCLUDED_DIR_NAMES = {
    ".git",
    "__pycache__",
    ".pytest_cache",
    "node_modules",
    "coverage_100k",
}
| def _sha(text: str) -> str: | |
| return hashlib.sha256(text.encode("utf-8")).hexdigest() | |
| def _thai_task(axis: Axis, frame: str, variant: int, difficulty: str) -> str: | |
| return ( | |
| f"โจทย์ {axis.axis_id}: ช่วย {frame} หัวข้อ `{axis.name}` ระดับ {difficulty}. " | |
| f"ให้ตอบแบบมนุษย์ธรรมชาติ อธิบายลึกแต่ตรวจสอบได้ แยกข้อเท็จจริง สมมติฐาน ข้อจำกัด " | |
| f"และขั้นตอนยืนยันผล ตัวอย่างรอบที่ {variant} ต้องไม่ตอบลอย ๆ" | |
| ) | |
| def _english_task(axis: Axis, frame: str, variant: int, difficulty: str) -> str: | |
| return ( | |
| f"Task {axis.axis_id}: {frame} the topic `{axis.name}` at {difficulty} level. " | |
| f"Give a natural but rigorous answer. Separate facts, assumptions, constraints, " | |
| f"verification steps, and failure modes. Variant {variant} must be useful without unsupported claims." | |
| ) | |
| def _assistant(axis: Axis, frame: str, variant: int, difficulty: str, lang: str) -> str: | |
| safety = "" | |
| if axis.safety_boundary == "defensive_only": | |
| safety = ( | |
| "\nSafety boundary: keep this defensive and authorized. Do not provide exploit payloads, " | |
| "stealth steps, credential theft, or malware improvement." | |
| ) | |
| elif axis.safety_boundary == "not_professional_advice": | |
| safety = ( | |
| "\nBoundary: this is educational decision support, not medical, legal, or financial advice. " | |
| "Escalate to qualified professionals for real-world decisions." | |
| ) | |
| if lang == "th": | |
| return ( | |
| f"แก่นคำตอบ: `{axis.name}` ต้องเริ่มจากการนิยามเป้าหมายของงาน {frame} ให้ชัด แล้วค่อยเชื่อมเหตุผลกับหลักฐาน.\n\n" | |
| f"1. โครงคิด: แยกปัญหาเป็นบริบท อินพุต เงื่อนไขที่ห้ามละเมิด ผลลัพธ์ที่ต้องการ และวิธีตรวจซ้ำ.\n" | |
| f"2. วิธีทำระดับ {difficulty}: สร้างสมมติฐานขนาดเล็ก ตรวจด้วยตัวอย่าง คัดทิ้งสิ่งที่ไม่มีหลักฐาน แล้วสรุปเป็นกฎที่ใช้ซ้ำได้.\n" | |
| f"3. จุดพลาดที่ต้องกัน: อย่าแทนที่การตรวจสอบด้วยความมั่นใจทางภาษา อย่าเครมว่าแน่ถ้าไม่มี artifact, hash, test, หรือแหล่งอ้างอิง.\n" | |
| f"4. Verification: ระบุข้อมูลนำเข้า เกณฑ์ผ่าน/ไม่ผ่าน คำสั่งหรือขั้นตอนทดสอบ และบันทึกผลเป็น manifest.\n" | |
| f"5. Compression note: เก็บสูตรตัดสินใจ ไม่เก็บประโยคจำสำเร็จรูป เพื่อให้โมเดลถ่ายโอนความรู้ไปงานใหม่ได้.{safety}\n\n" | |
| f"สรุปสั้น: คำตอบที่ดีต้องแม่น ตรวจซ้ำได้ และยอมรับขอบเขตของหลักฐานเสมอ." | |
| ) | |
| return ( | |
| f"Core answer: `{axis.name}` should be handled by defining the {frame} target first, then binding each claim to evidence.\n\n" | |
| f"1. Reasoning frame: split the task into context, inputs, constraints, desired output, and reproducible checks.\n" | |
| f"2. Method at {difficulty} level: form a small hypothesis, test it with examples, discard unsupported parts, and keep the reusable rule.\n" | |
| f"3. Failure mode: fluent text is not proof. Do not claim certainty without an artifact, hash, test result, or cited source.\n" | |
| f"4. Verification: record the input, pass/fail criteria, command or inspection method, and final manifest.\n" | |
| f"5. Compression note: store the decision procedure rather than memorized wording so the model can transfer it to new tasks.{safety}\n\n" | |
| f"Bottom line: the answer is high quality only when it is useful, bounded, and auditable." | |
| ) | |
| def _source_grounded_assistant( | |
| axis: Axis, | |
| frame: str, | |
| variant: int, | |
| difficulty: str, | |
| lang: str, | |
| evidence: dict[str, str], | |
| ) -> str: | |
| excerpt = evidence["excerpt"] | |
| src = evidence["source_path"] | |
| h = evidence["source_sha256"][:16] | |
| safety = "" | |
| if axis.safety_boundary == "defensive_only": | |
| safety = "\nขอบเขตความปลอดภัย: ใช้เพื่อป้องกัน วิเคราะห์ที่ได้รับอนุญาต และทำเอกสารเท่านั้น ไม่ให้ payload หรือขั้นตอนโจมตี." | |
| elif axis.safety_boundary == "not_professional_advice": | |
| safety = "\nขอบเขต: เป็นการศึกษาและช่วยจัดกรอบ ไม่ใช่คำแนะนำแพทย์ กฎหมาย หรือการเงินสำหรับการตัดสินใจจริง." | |
| if lang == "th": | |
| return ( | |
| f"Evidence anchor: `{src}` sha256-prefix `{h}`\n\n" | |
| f"Excerpt:\n```text\n{excerpt}\n```\n\n" | |
| f"คำตอบ: สำหรับ `{axis.name}` ให้ทำงานแบบ {frame} โดยยึด excerpt นี้เป็นหลักฐานตั้งต้น ไม่ขยายข้อเท็จจริงเกินสิ่งที่ตรวจได้.\n\n" | |
| f"1. แก่นที่ดึงได้: ระบุคำสำคัญ โครงสร้าง หรือพฤติกรรมที่ปรากฏในหลักฐานจริงก่อน.\n" | |
| f"2. เหตุผลระดับ {difficulty}: เชื่อมข้อมูลใน excerpt กับเป้าหมายงานทีละขั้น และแยกสิ่งที่รู้จริงออกจากข้อสมมติ.\n" | |
| f"3. การตรวจซ้ำ: เปิดไฟล์ต้นทาง ตรวจ hash/ตำแหน่ง chunk แล้วเทียบข้อความก่อนใช้เป็นคำตอบหรือข้อมูลฝึก.\n" | |
| f"4. การบีบอัดความรู้: เก็บกฎการอ่านหลักฐานและข้อจำกัด ไม่จำประโยคตอบแบบตายตัว.{safety}\n\n" | |
| f"สรุป: คำตอบนี้ source-grounded เพราะชี้กลับไปยัง artifact จริงได้." | |
| ) | |
| return ( | |
| f"Evidence anchor: `{src}` sha256-prefix `{h}`\n\n" | |
| f"Excerpt:\n```text\n{excerpt}\n```\n\n" | |
| f"Answer: for `{axis.name}`, perform the {frame} task by treating this excerpt as the starting evidence, not as permission to invent extra facts.\n\n" | |
| f"1. Extracted signal: name the terms, structure, or behavior visible in the evidence first.\n" | |
| f"2. {difficulty.title()} reasoning: connect the excerpt to the task step by step and separate verified facts from assumptions.\n" | |
| f"3. Reproducibility: reopen the source file, verify hash/chunk position, and compare the text before using it for training or answers.\n" | |
| f"4. Compression: store the evidence-reading procedure and limits, not a memorized canned response.{safety}\n\n" | |
| f"Summary: this response is source-grounded because it can be traced back to a real artifact." | |
| ) | |
def _record(axis: Axis, frame: str, variant: int) -> dict[str, Any]:
    """Assemble one synthetic chat record (system, user, assistant) plus metadata.

    Difficulty and language are chosen deterministically from the variant
    number and the numeric suffix of ``axis.axis_id``. The fingerprint is
    the SHA-256 of the axis id, frame, variant, difficulty and language
    joined with ``|``.
    """
    axis_number = int(axis.axis_id[-3:])
    difficulty = DIFFICULTIES[(variant + axis_number) % len(DIFFICULTIES)]
    lang = axis.languages[variant % len(axis.languages)]

    if lang == "th":
        user = _thai_task(axis, frame, variant, difficulty)
    else:
        user = _english_task(axis, frame, variant, difficulty)
    assistant = _assistant(axis, frame, variant, difficulty, lang)

    fingerprint_key = "|".join((axis.axis_id, frame, str(variant), difficulty, lang))
    metadata: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "axis_id": axis.axis_id,
        "axis_name": axis.name,
        "macro_domain": axis.macro_domain,
        "difficulty": difficulty,
        "language": lang,
        "task_frame": frame,
        "variant": variant,
        "fingerprint_sha256": _sha(fingerprint_key),
        "safety_boundary": axis.safety_boundary,
        "quality_tags": [
            "instruction_following",
            "reasoning",
            "grounding",
            "verification",
            "natural_language",
            "compression_ready",
        ],
    }
    messages = [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": user},
        {"role": "assistant", "content": assistant},
    ]
    return {"messages": messages, "source": "coverage_100k_forge", "metadata": metadata}
def _source_record(axis: Axis, frame: str, variant: int, evidence: dict[str, str]) -> dict[str, Any]:
    """Assemble one source-grounded chat record plus metadata.

    Difficulty and language are chosen the same way as for synthetic
    records. The user prompt names the evidence chunk, and the metadata
    also carries the source path, source hash, chunk id and chunk offset
    (converted to ``int``). The fingerprint additionally includes the
    chunk id.
    """
    axis_number = int(axis.axis_id[-3:])
    difficulty = DIFFICULTIES[(variant + axis_number) % len(DIFFICULTIES)]
    lang = axis.languages[variant % len(axis.languages)]

    chunk_id = evidence["chunk_id"]
    if lang == "th":
        user = (
            f"จากหลักฐานจริง chunk `{chunk_id}` ช่วย {frame} หัวข้อ `{axis.name}` "
            "และตอบให้ตรวจย้อนกลับแหล่งข้อมูลได้"
        )
    else:
        user = f"Using real evidence chunk `{chunk_id}`, {frame} the topic `{axis.name}` and make the answer traceable."
    assistant = _source_grounded_assistant(axis, frame, variant, difficulty, lang, evidence)

    fingerprint_key = "|".join((axis.axis_id, frame, str(variant), difficulty, lang, chunk_id))
    metadata: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "axis_id": axis.axis_id,
        "axis_name": axis.name,
        "macro_domain": axis.macro_domain,
        "difficulty": difficulty,
        "language": lang,
        "task_frame": frame,
        "variant": variant,
        "fingerprint_sha256": _sha(fingerprint_key),
        "safety_boundary": axis.safety_boundary,
        "source_path": evidence["source_path"],
        "source_sha256": evidence["source_sha256"],
        "chunk_id": evidence["chunk_id"],
        "chunk_offset": int(evidence["chunk_offset"]),
        "quality_tags": [
            "real_source",
            "instruction_following",
            "reasoning",
            "grounding",
            "verification",
            "natural_language",
            "compression_ready",
        ],
    }
    messages = [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": user},
        {"role": "assistant", "content": assistant},
    ]
    return {"messages": messages, "source": "coverage_100k_real_source_forge", "metadata": metadata}
| def _clean_text(text: str) -> str: | |
| text = text.replace("\x00", " ") | |
| text = re.sub(r"\s+", " ", text).strip() | |
| return text | |
def _collect_texts(obj: Any, limit: int = 120_000) -> list[str]:
    """Gather cleaned strings of at least 80 characters from a nested object.

    Strings are cleaned with `_clean_text`. Lists contribute their first 200
    items. Dicts are searched only under a fixed set of keys, in a fixed
    order. Collection stops visiting new values once the accumulated length
    reaches *limit*; the string that crosses the limit is still kept whole.
    """
    wanted_keys = ("content", "text", "body", "description", "summary", "title", "messages", "metadata")
    collected: list[str] = []
    budget_used = 0

    def visit(node: Any) -> None:
        nonlocal budget_used
        if budget_used >= limit:
            return
        if isinstance(node, str):
            cleaned = _clean_text(node)
            if len(cleaned) < 80:
                return
            collected.append(cleaned)
            budget_used += len(cleaned)
            return
        if isinstance(node, list):
            for child in node[:200]:
                visit(child)
            return
        if isinstance(node, dict):
            for key in wanted_keys:
                if key in node:
                    visit(node[key])

    visit(obj)
    return collected
def _candidate_files(source_roots: list[str | Path]):
    """Yield each eligible source file under *source_roots* once.

    Relative roots are resolved against the second parent of this module's
    resolved path. A root that is a file is yielded if its suffix is in
    `SOURCE_SUFFIXES`. A root that is a directory is walked with `os.walk`,
    pruning `EXCLUDED_DIR_NAMES` and visiting sub-directories and files in
    sorted order. Duplicates are detected by resolved path; the yielded path
    itself is not resolved.
    """
    visited: set[Path] = set()

    def first_visit(candidate: Path) -> bool:
        # Record the resolved path; report whether it was new.
        real = candidate.resolve()
        if real in visited:
            return False
        visited.add(real)
        return True

    for entry in source_roots:
        root = Path(entry)
        if not root.is_absolute():
            root = Path(__file__).resolve().parents[1] / root

        if root.is_file() and root.suffix.lower() in SOURCE_SUFFIXES:
            if first_visit(root):
                yield root
            continue
        if not root.is_dir():
            continue

        for folder, subdirs, names in os.walk(root):
            # In-place assignment both prunes and orders os.walk's descent.
            subdirs[:] = sorted(d for d in subdirs if d not in EXCLUDED_DIR_NAMES)
            for name in sorted(names):
                candidate = Path(folder) / name
                if candidate.suffix.lower() in SOURCE_SUFFIXES and first_visit(candidate):
                    yield candidate
def _iter_source_chunks(
    source_roots: list[str | Path],
    *,
    chunk_chars: int = 900,
    stride_chars: int = 650,
    max_file_chars: int = 8_000_000,
    max_chunks_per_file: int = 5_000,
):
    """Yield overlapping text excerpts from every candidate source file.

    Each file is read as UTF-8 (undecodable bytes replaced), truncated to
    *max_file_chars*, hashed, whitespace-normalized, and then cut into
    windows of *chunk_chars* characters taken every *stride_chars*
    characters. Windows run to the end of the text, so the final window may
    be shorter than *chunk_chars*; windows shorter than 80 characters after
    stripping are skipped.

    Args:
        source_roots: Files or directories passed on to `_candidate_files`.
        chunk_chars: Maximum excerpt length in characters.
        stride_chars: Distance between consecutive window starts.
        max_file_chars: Decoded text beyond this many characters is ignored.
        max_chunks_per_file: Chunking of a file stops once this many chunks
            have been yielded (at least one chunk is always attempted).

    Yields:
        Dicts of strings with keys ``source_path``, ``source_sha256``,
        ``chunk_id`` (first 12 hash characters plus a zero-padded chunk
        number), ``chunk_offset`` (offset into the normalized text, as a
        decimal string) and ``excerpt``.

    Raises:
        ValueError: if *chunk_chars* or *stride_chars* is not positive.
    """
    if chunk_chars <= 0 or stride_chars <= 0:
        raise ValueError("chunk_chars and stride_chars must be positive")

    for path in _candidate_files(source_roots):
        try:
            raw = path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Best effort: unreadable files are skipped, not fatal.
            continue
        if not raw.strip():
            continue
        if len(raw) > max_file_chars:
            raw = raw[:max_file_chars]
        # NOTE(review): this hashes the decoded, possibly truncated text, not
        # the file's raw bytes, so it can differ from `sha256sum` of the file
        # (invalid UTF-8, newline translation, truncation) — confirm that is
        # the intended meaning of "source_sha256".
        file_hash = _sha(raw)
        text = _clean_text(raw)
        if len(text) < 80:
            continue
        rel = str(path)
        chunk_no = 0
        # Walk stride-aligned offsets across the whole text. The previous
        # bound (len(text) - chunk_chars + 1) only admitted full-size windows
        # and silently dropped the tail of every file longer than one chunk.
        for offset in range(0, len(text), stride_chars):
            excerpt = text[offset : offset + chunk_chars].strip()
            if len(excerpt) >= 80:
                yield {
                    "source_path": rel,
                    "source_sha256": file_hash,
                    "chunk_id": f"{file_hash[:12]}:{chunk_no:06d}",
                    "chunk_offset": str(offset),
                    "excerpt": excerpt,
                }
                chunk_no += 1
                if chunk_no >= max_chunks_per_file:
                    break
            if offset + chunk_chars >= len(text):
                # This window already reached the end of the text.
                break
| def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| with path.open("w", encoding="utf-8", newline="\n") as f: | |
| for row in rows: | |
| f.write(json.dumps(row, ensure_ascii=False, sort_keys=True) + "\n") | |
| def _write_row(handle, row: dict[str, Any]) -> None: | |
| handle.write(json.dumps(row, ensure_ascii=False, sort_keys=True) + "\n") | |
| def _file_sha(path: Path) -> str: | |
| h = hashlib.sha256() | |
| with path.open("rb") as f: | |
| for chunk in iter(lambda: f.read(1024 * 1024), b""): | |
| h.update(chunk) | |
| return h.hexdigest() | |
def build_coverage_100k_dataset(
    out_dir: str | Path,
    *,
    target_records: int = 100_000,
    variants_per_axis: int | None = None,
    eval_fraction: float = 0.01,
    source_roots: list[str | Path] | None = None,
    source_grounded: bool = True,
) -> dict[str, Any]:
    """Generate the coverage dataset (train/eval JSONL) and its manifest.

    In source-grounded mode, chunks from `_iter_source_chunks` are assigned
    to axes round-robin until *target_records* records exist. In synthetic
    mode, templated records are generated per axis; *target_records* is
    spread across the axes as evenly as possible so no axis is left empty
    when the target is not a multiple of the axis count.

    Every ``eval_mod``-th record, starting with the first, goes to the eval
    file and the rest go to the train file. ``eval_mod`` is
    ``round(1 / eval_fraction)`` with the fraction clamped to [0.001, 0.5].

    Args:
        out_dir: Directory for the two JSONL files and the manifest;
            created if missing.
        target_records: Number of records to produce; at least
            `COVERAGE_AXIS_COUNT`.
        variants_per_axis: Synthetic mode only: upper bound on records per
            axis. Defaults to ``ceil(target_records / COVERAGE_AXIS_COUNT)``.
        eval_fraction: Approximate share of records routed to the eval file.
        source_roots: Source-grounded mode only: roots to scan; defaults to
            `DEFAULT_SOURCE_ROOTS`.
        source_grounded: Use real source excerpts when true, templates
            otherwise.

    Returns:
        The manifest as a dict (also written to ``coverage_100k_manifest.json``).

    Raises:
        ValueError: if *target_records* is below `COVERAGE_AXIS_COUNT`.
        RuntimeError: in source-grounded mode when the sources yield fewer
            than *target_records* chunks. The partially written JSONL files
            are left in place and no manifest is written.
    """
    axes = _axes()
    # Validate before touching the filesystem.
    if target_records < COVERAGE_AXIS_COUNT:
        raise ValueError(f"target_records must be >= {COVERAGE_AXIS_COUNT}")
    if variants_per_axis is None:
        variants_per_axis = (target_records + COVERAGE_AXIS_COUNT - 1) // COVERAGE_AXIS_COUNT

    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    train_path = out / "coverage_100_axis_train.jsonl"
    eval_path = out / "coverage_100_axis_eval.jsonl"
    manifest_path = out / "coverage_100k_manifest.json"

    eval_mod = max(1, round(1 / max(0.001, min(eval_fraction, 0.5))))
    records_written = 0
    train_records = 0
    eval_records = 0
    domain_counts: Counter[str] = Counter()
    lang_counts: Counter[str] = Counter()
    source_chunks_used = 0
    source_files_seen: set[str] = set()

    def emit(row: dict[str, Any], train_f, eval_f) -> None:
        # Count the row and route it to the eval or train file.
        nonlocal records_written, train_records, eval_records
        metadata = row["metadata"]
        domain_counts[metadata["macro_domain"]] += 1
        lang_counts[metadata["language"]] += 1
        if records_written % eval_mod == 0:
            _write_row(eval_f, row)
            eval_records += 1
        else:
            _write_row(train_f, row)
            train_records += 1
        records_written += 1

    with train_path.open("w", encoding="utf-8", newline="\n") as train_f, eval_path.open(
        "w", encoding="utf-8", newline="\n"
    ) as eval_f:
        if source_grounded:
            roots = list(source_roots or DEFAULT_SOURCE_ROOTS)
            for global_variant, evidence in enumerate(_iter_source_chunks(roots)):
                # Round-robin: consecutive chunks go to consecutive axes.
                axis = axes[global_variant % len(axes)]
                axis_variant = global_variant // len(axes)
                frame = TASK_FRAMES[(axis_variant + int(axis.axis_id[-3:])) % len(TASK_FRAMES)]
                emit(_source_record(axis, frame, axis_variant, evidence), train_f, eval_f)
                source_chunks_used += 1
                source_files_seen.add(evidence["source_path"])
                if records_written >= target_records:
                    break
        else:
            # Give each axis an explicit quota. Filling axis by axis until the
            # target was hit starved the later axes whenever the target was
            # not a multiple of the axis count. For exact multiples with the
            # default variants_per_axis, the output is unchanged.
            base_quota, remainder = divmod(target_records, len(axes))
            for position, axis in enumerate(axes):
                quota = min(variants_per_axis, base_quota + (1 if position < remainder else 0))
                axis_number = int(axis.axis_id[-3:])
                for variant in range(quota):
                    frame = TASK_FRAMES[(variant + axis_number) % len(TASK_FRAMES)]
                    emit(_record(axis, frame, variant), train_f, eval_f)

    if source_grounded and records_written < target_records:
        raise RuntimeError(
            f"real source chunks produced {records_written} records, below target {target_records}. "
            "Add more source roots or lower --target-records."
        )

    report: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "summary": {
            "axis_count": len(axes),
            "records_written": records_written,
            "train_records": train_records,
            "eval_records": eval_records,
            "target_records": target_records,
            "variants_per_axis": variants_per_axis,
            "macro_domain_counts": dict(sorted(domain_counts.items())),
            "language_counts": dict(sorted(lang_counts.items())),
            "source_grounded": source_grounded,
            "source_chunks_used": source_chunks_used,
            "source_files_used": len(source_files_seen),
        },
        "outputs": {
            "train_jsonl": str(train_path),
            "eval_jsonl": str(eval_path),
            "train_sha256": _file_sha(train_path),
            "eval_sha256": _file_sha(eval_path),
        },
        "axes": [axis.__dict__ for axis in axes],
        "claim_gate": {
            "coverage_axes_ready": len(axes) == COVERAGE_AXIS_COUNT,
            "coverage_100k_ready": records_written >= 100_000,
            "real_source_grounding_ready": (not source_grounded)
            or (source_chunks_used == records_written and len(source_files_seen) > 0),
            "quality_100_percent_claim_allowed": False,
            "world_best_claim_allowed": False,
            "reason": "This forge expands deterministic supervised coverage. It does not prove 100% capability until external and local eval gates pass.",
        },
    }
    report["manifest_path"] = str(manifest_path)
    manifest_path.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
    return report
Xet Storage Details
- Size: 27.5 kB
- Xet hash: 1e4a985cd2d6d798495215d673e1ee10b0133691db08683f9da07fe0630c99b9
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.