bbkdevops's picture
download
raw
27.5 kB
from __future__ import annotations
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import json
import os
import re
from pathlib import Path
from typing import Any
# Version tag copied into every record's metadata and into the manifest report.
SCHEMA_VERSION = "tinymind-coverage-100k-v1"
# Number of axes _axes() is required to produce; it raises RuntimeError otherwise.
COVERAGE_AXIS_COUNT = 100
# System prompt used as the first message of every generated record.
SYSTEM = (
"You are TinyMind Pure Coverage Tutor. Answer with evidence discipline, exact constraints, "
"and useful natural language. Prefer verified reasoning over memorized slogans. "
"For cyber, reverse engineering, OS, or tool topics, stay defensive, authorized, and audit-friendly."
)
@dataclass(frozen=True)
class Axis:
    """Immutable description of one coverage axis (a topic inside a macro domain).

    The class deliberately keeps a per-instance ``__dict__`` because the
    manifest writer serializes axes through ``axis.__dict__``.
    """

    # Identifier such as "axis_001"; callers parse the last three characters as an int.
    axis_id: str
    # Key of the macro domain this topic belongs to.
    macro_domain: str
    # Human-readable topic name interpolated into prompts and answers.
    name: str
    # Language codes; records index into this tuple with ``variant % len(languages)``.
    languages: tuple[str, ...]
    # Skill labels attached to the axis.
    skills: tuple[str, ...]
    # Selects which safety note (if any) the answer templates append.
    safety_boundary: str = "standard"
# Ordered macro-domain keys. _axes() walks them in this order, so the order
# here determines which axis number each topic receives.
MACRO_DOMAINS = (
"thai_english_language",
"instruction_following",
"coding_project_agent",
"tool_grounding_sandbox",
"mathematics_logic",
"science_engineering",
"medicine_law_finance_safe",
"cyber_reverse_defensive",
"data_ml_systems",
"multimodal_research_grounding",
)
# Topic names per macro domain. Each of the ten domains lists ten topics, which
# together give the COVERAGE_AXIS_COUNT axes that _axes() verifies. Topic order
# inside a tuple also feeds axis numbering.
DOMAIN_TOPICS = {
"thai_english_language": (
"Thai technical explanation",
"English precision writing",
"Thai-English translation invariance",
"idiom and register control",
"summarization with nuance",
"argument repair",
"teaching complex ideas simply",
"terminology alignment",
"long-form dialogue",
"cross-cultural communication",
),
"instruction_following": (
"system instruction hierarchy",
"format constraints",
"negative instruction contrast",
"multi-step task planning",
"ambiguity handling",
"refusal boundary",
"schema completion",
"self-check loops",
"concise versus detailed answer control",
"stateful conversation repair",
),
"coding_project_agent": (
"Python project architecture",
"TypeScript service design",
"Rust native helper",
"Go concurrent service",
"C++ performance module",
"SQL and data migrations",
"test-driven repair",
"CI release pipeline",
"debugging from logs",
"cross-platform command ergonomics",
),
"tool_grounding_sandbox": (
"Lua sandbox planning",
"local proxy policy",
"detached command streaming",
"snapshot and fork workflow",
"file transfer audit",
"resource budget choice",
"workspace containment",
"tool result summarization",
"failure recovery",
"manifest writing",
),
"mathematics_logic": (
"proof invariants",
"probability calibration",
"optimization objective design",
"linear algebra intuition",
"discrete algorithms",
"numerical stability",
"information compression",
"causal reasoning",
"formal counterexamples",
"bit-level exactness",
),
"science_engineering": (
"physics model limits",
"chemistry mechanism reasoning",
"biology systems reasoning",
"electrical engineering tradeoffs",
"mechanical design verification",
"materials constraints",
"robotics control",
"cloud distributed systems",
"embedded IoT",
"energy and thermal analysis",
),
# Axes in this domain get the "not_professional_advice" safety boundary.
"medicine_law_finance_safe": (
"medical triage explanation",
"public health evidence",
"legal issue spotting",
"financial risk framing",
"accounting reconciliation",
"business strategy critique",
"policy analysis",
"ethics governance",
"education assessment",
"safety limitation disclosure",
),
# Axes in this domain get the "defensive_only" safety boundary.
"cyber_reverse_defensive": (
"CVE defensive analysis",
"malware report reading",
"APK static analysis",
"binary format explanation",
"Ghidra workflow overview",
"Il2Cpp metadata understanding",
"threat model documentation",
"secure patch planning",
"incident timeline reconstruction",
"authorized reverse engineering boundary",
),
"data_ml_systems": (
"dataset quality filtering",
"deduplication and lineage",
"retrieval index design",
"LoRA training diagnosis",
"quantization drift",
"GPU benchmark honesty",
"evaluation harness design",
"leaderboard claim gates",
"model card evidence",
"active learning loop",
),
"multimodal_research_grounding": (
"image reasoning from description",
"audio transcript analysis",
"video event timeline",
"document extraction",
"folder-scale code review",
"evidence ledger recall",
"long context chunking",
"deep research synthesis",
"source conflict resolution",
"answer uncertainty calibration",
),
}
def _axes() -> list[Axis]:
    """Materialize every coverage axis from MACRO_DOMAINS and DOMAIN_TOPICS.

    Axes are numbered from 1 in domain order, then topic order. Odd-numbered
    axes list Thai first, even-numbered axes list English first.

    Raises:
        RuntimeError: if the tables do not yield exactly COVERAGE_AXIS_COUNT axes.
    """
    # Domains not listed here fall back to the "standard" boundary.
    boundary_by_domain = {
        "cyber_reverse_defensive": "defensive_only",
        "medicine_law_finance_safe": "not_professional_advice",
    }
    skill_set = ("decompose", "ground", "reason", "verify", "compress")
    pairs = [
        (domain_key, topic_name)
        for domain_key in MACRO_DOMAINS
        for topic_name in DOMAIN_TOPICS[domain_key]
    ]
    built = [
        Axis(
            axis_id=f"axis_{number:03d}",
            macro_domain=domain_key,
            name=topic_name,
            languages=("th", "en") if number % 2 else ("en", "th"),
            skills=skill_set,
            safety_boundary=boundary_by_domain.get(domain_key, "standard"),
        )
        for number, (domain_key, topic_name) in enumerate(pairs, start=1)
    ]
    if len(built) != COVERAGE_AXIS_COUNT:
        raise RuntimeError(f"expected {COVERAGE_AXIS_COUNT} axes, got {len(built)}")
    return built
# Task verbs interpolated into prompts; the builder picks one with
# (variant + axis number) modulo the tuple length.
TASK_FRAMES = (
"diagnose",
"teach",
"design",
"compare",
"verify",
"repair",
"compress",
"expand",
"translate",
"evaluate",
"plan",
"audit",
"simulate",
"summarize",
"ground",
"critique",
)
# Difficulty labels, selected per record with (variant + axis number) modulo the length.
DIFFICULTIES = ("foundation", "intermediate", "advanced", "expert", "frontier_probe")
# Roots scanned when the caller passes no source_roots; relative roots are
# resolved against the parent of this file's directory.
DEFAULT_SOURCE_ROOTS = ("data/jsonl", "third_party")
# File suffixes (compared lower-cased) that qualify a file as evidence source.
SOURCE_SUFFIXES = {
".jsonl",
".json",
".md",
".txt",
".py",
".js",
".ts",
".rs",
".go",
".c",
".cpp",
".h",
".cs",
".java",
".kt",
".lua",
".yml",
".yaml",
}
# Directory names pruned from the os.walk traversal of source roots.
EXCLUDED_DIR_NAMES = {
".git",
"__pycache__",
".pytest_cache",
"node_modules",
"coverage_100k",
}
def _sha(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def _thai_task(axis: Axis, frame: str, variant: int, difficulty: str) -> str:
    """Compose the single-line Thai user prompt for one axis/frame/variant.

    The prompt embeds the axis id, the task verb, the axis name, the
    difficulty label and the variant number.
    """
    opening = f"โจทย์ {axis.axis_id}: ช่วย {frame} หัวข้อ `{axis.name}` ระดับ {difficulty}. "
    requirements = "ให้ตอบแบบมนุษย์ธรรมชาติ อธิบายลึกแต่ตรวจสอบได้ แยกข้อเท็จจริง สมมติฐาน ข้อจำกัด "
    closing = f"และขั้นตอนยืนยันผล ตัวอย่างรอบที่ {variant} ต้องไม่ตอบลอย ๆ"
    return "".join((opening, requirements, closing))
def _english_task(axis: Axis, frame: str, variant: int, difficulty: str) -> str:
    """Compose the single-line English user prompt for one axis/frame/variant.

    The prompt embeds the axis id, the task verb, the axis name, the
    difficulty label and the variant number.
    """
    sentences = (
        f"Task {axis.axis_id}: {frame} the topic `{axis.name}` at {difficulty} level. ",
        "Give a natural but rigorous answer. Separate facts, assumptions, constraints, ",
        f"verification steps, and failure modes. Variant {variant} must be useful without unsupported claims.",
    )
    return "".join(sentences)
def _assistant(axis: Axis, frame: str, variant: int, difficulty: str, lang: str) -> str:
    """Render the templated assistant answer for a synthetic (non-source) record.

    Args:
        axis: supplies ``name`` and ``safety_boundary``.
        frame: task verb woven into the opening sentence.
        variant: accepted for signature parity with the prompt builders; unused here.
        difficulty: label inserted into step 2.
        lang: ``"th"`` selects the Thai template; anything else selects English.

    Returns:
        The answer text. A boundary note (written in English in both
        templates) is appended to step 5 when the axis is not "standard".
    """
    boundary_notes = {
        "defensive_only": (
            "\nSafety boundary: keep this defensive and authorized. Do not provide exploit payloads, "
            "stealth steps, credential theft, or malware improvement."
        ),
        "not_professional_advice": (
            "\nBoundary: this is educational decision support, not medical, legal, or financial advice. "
            "Escalate to qualified professionals for real-world decisions."
        ),
    }
    note = boundary_notes.get(axis.safety_boundary, "")

    if lang != "th":
        return (
            f"Core answer: `{axis.name}` should be handled by defining the {frame} target first, then binding each claim to evidence.\n\n"
            f"1. Reasoning frame: split the task into context, inputs, constraints, desired output, and reproducible checks.\n"
            f"2. Method at {difficulty} level: form a small hypothesis, test it with examples, discard unsupported parts, and keep the reusable rule.\n"
            f"3. Failure mode: fluent text is not proof. Do not claim certainty without an artifact, hash, test result, or cited source.\n"
            f"4. Verification: record the input, pass/fail criteria, command or inspection method, and final manifest.\n"
            f"5. Compression note: store the decision procedure rather than memorized wording so the model can transfer it to new tasks.{note}\n\n"
            f"Bottom line: the answer is high quality only when it is useful, bounded, and auditable."
        )

    return (
        f"แก่นคำตอบ: `{axis.name}` ต้องเริ่มจากการนิยามเป้าหมายของงาน {frame} ให้ชัด แล้วค่อยเชื่อมเหตุผลกับหลักฐาน.\n\n"
        f"1. โครงคิด: แยกปัญหาเป็นบริบท อินพุต เงื่อนไขที่ห้ามละเมิด ผลลัพธ์ที่ต้องการ และวิธีตรวจซ้ำ.\n"
        f"2. วิธีทำระดับ {difficulty}: สร้างสมมติฐานขนาดเล็ก ตรวจด้วยตัวอย่าง คัดทิ้งสิ่งที่ไม่มีหลักฐาน แล้วสรุปเป็นกฎที่ใช้ซ้ำได้.\n"
        f"3. จุดพลาดที่ต้องกัน: อย่าแทนที่การตรวจสอบด้วยความมั่นใจทางภาษา อย่าเครมว่าแน่ถ้าไม่มี artifact, hash, test, หรือแหล่งอ้างอิง.\n"
        f"4. Verification: ระบุข้อมูลนำเข้า เกณฑ์ผ่าน/ไม่ผ่าน คำสั่งหรือขั้นตอนทดสอบ และบันทึกผลเป็น manifest.\n"
        f"5. Compression note: เก็บสูตรตัดสินใจ ไม่เก็บประโยคจำสำเร็จรูป เพื่อให้โมเดลถ่ายโอนความรู้ไปงานใหม่ได้.{note}\n\n"
        f"สรุปสั้น: คำตอบที่ดีต้องแม่น ตรวจซ้ำได้ และยอมรับขอบเขตของหลักฐานเสมอ."
    )
def _source_grounded_assistant(
    axis: Axis,
    frame: str,
    variant: int,
    difficulty: str,
    lang: str,
    evidence: dict[str, str],
) -> str:
    """Render the assistant answer for a record anchored to a real source chunk.

    Args:
        axis: supplies ``name`` and ``safety_boundary``.
        frame: task verb woven into the answer sentence.
        variant: accepted for signature parity with the other builders; unused here.
        difficulty: label inserted into step 2 (title-cased in the English template).
        lang: ``"th"`` selects the Thai template; anything else selects English.
        evidence: must contain ``"excerpt"``, ``"source_path"`` and
            ``"source_sha256"``; only the first 16 hex characters of the hash
            are quoted.

    Returns:
        The answer text: an evidence anchor line, the fenced excerpt, four
        numbered steps and a summary. Axes with a non-standard safety
        boundary get a boundary note appended to step 4.

    Raises:
        KeyError: if *evidence* lacks one of the required keys.
    """
    excerpt = evidence["excerpt"]
    src = evidence["source_path"]
    h = evidence["source_sha256"][:16]
    thai = lang == "th"

    # The boundary note follows the answer language. Previously the Thai note
    # was appended to English answers as well, producing mixed-language output.
    safety = ""
    if axis.safety_boundary == "defensive_only":
        safety = (
            "\nขอบเขตความปลอดภัย: ใช้เพื่อป้องกัน วิเคราะห์ที่ได้รับอนุญาต และทำเอกสารเท่านั้น ไม่ให้ payload หรือขั้นตอนโจมตี."
            if thai
            else "\nSafety boundary: use this only for defense, authorized analysis, and documentation; "
            "do not provide payloads or attack steps."
        )
    elif axis.safety_boundary == "not_professional_advice":
        safety = (
            "\nขอบเขต: เป็นการศึกษาและช่วยจัดกรอบ ไม่ใช่คำแนะนำแพทย์ กฎหมาย หรือการเงินสำหรับการตัดสินใจจริง."
            if thai
            else "\nBoundary: this is educational framing support, not medical, legal, or financial advice "
            "for real-world decisions."
        )

    if thai:
        return (
            f"Evidence anchor: `{src}` sha256-prefix `{h}`\n\n"
            f"Excerpt:\n```text\n{excerpt}\n```\n\n"
            f"คำตอบ: สำหรับ `{axis.name}` ให้ทำงานแบบ {frame} โดยยึด excerpt นี้เป็นหลักฐานตั้งต้น ไม่ขยายข้อเท็จจริงเกินสิ่งที่ตรวจได้.\n\n"
            f"1. แก่นที่ดึงได้: ระบุคำสำคัญ โครงสร้าง หรือพฤติกรรมที่ปรากฏในหลักฐานจริงก่อน.\n"
            f"2. เหตุผลระดับ {difficulty}: เชื่อมข้อมูลใน excerpt กับเป้าหมายงานทีละขั้น และแยกสิ่งที่รู้จริงออกจากข้อสมมติ.\n"
            f"3. การตรวจซ้ำ: เปิดไฟล์ต้นทาง ตรวจ hash/ตำแหน่ง chunk แล้วเทียบข้อความก่อนใช้เป็นคำตอบหรือข้อมูลฝึก.\n"
            f"4. การบีบอัดความรู้: เก็บกฎการอ่านหลักฐานและข้อจำกัด ไม่จำประโยคตอบแบบตายตัว.{safety}\n\n"
            f"สรุป: คำตอบนี้ source-grounded เพราะชี้กลับไปยัง artifact จริงได้."
        )
    return (
        f"Evidence anchor: `{src}` sha256-prefix `{h}`\n\n"
        f"Excerpt:\n```text\n{excerpt}\n```\n\n"
        f"Answer: for `{axis.name}`, perform the {frame} task by treating this excerpt as the starting evidence, not as permission to invent extra facts.\n\n"
        f"1. Extracted signal: name the terms, structure, or behavior visible in the evidence first.\n"
        f"2. {difficulty.title()} reasoning: connect the excerpt to the task step by step and separate verified facts from assumptions.\n"
        f"3. Reproducibility: reopen the source file, verify hash/chunk position, and compare the text before using it for training or answers.\n"
        f"4. Compression: store the evidence-reading procedure and limits, not a memorized canned response.{safety}\n\n"
        f"Summary: this response is source-grounded because it can be traced back to a real artifact."
    )
def _record(axis: Axis, frame: str, variant: int) -> dict[str, Any]:
    """Build one synthetic chat record (system/user/assistant plus metadata).

    Difficulty is chosen by ``(variant + axis number) % len(DIFFICULTIES)``
    and language by ``variant % len(axis.languages)``. The fingerprint is the
    SHA-256 of the pipe-joined axis id, frame, variant, difficulty and language.
    """
    # The numeric part of the id is its last three characters ("axis_NNN").
    axis_number = int(axis.axis_id[-3:])
    difficulty = DIFFICULTIES[(variant + axis_number) % len(DIFFICULTIES)]
    lang = axis.languages[variant % len(axis.languages)]

    prompt_builder = _thai_task if lang == "th" else _english_task
    user = prompt_builder(axis, frame, variant, difficulty)
    assistant = _assistant(axis, frame, variant, difficulty, lang)
    fingerprint = _sha("|".join((axis.axis_id, frame, str(variant), difficulty, lang)))

    messages = [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": user},
        {"role": "assistant", "content": assistant},
    ]
    metadata = {
        "schema_version": SCHEMA_VERSION,
        "axis_id": axis.axis_id,
        "axis_name": axis.name,
        "macro_domain": axis.macro_domain,
        "difficulty": difficulty,
        "language": lang,
        "task_frame": frame,
        "variant": variant,
        "fingerprint_sha256": fingerprint,
        "safety_boundary": axis.safety_boundary,
        "quality_tags": [
            "instruction_following",
            "reasoning",
            "grounding",
            "verification",
            "natural_language",
            "compression_ready",
        ],
    }
    return {
        "messages": messages,
        "source": "coverage_100k_forge",
        "metadata": metadata,
    }
def _source_record(axis: Axis, frame: str, variant: int, evidence: dict[str, str]) -> dict[str, Any]:
    """Build one chat record grounded in a real source chunk.

    Difficulty and language are selected exactly as in the synthetic record
    builder. *evidence* must provide ``chunk_id``, ``chunk_offset``,
    ``source_path``, ``source_sha256`` and ``excerpt``; the chunk id is part of
    the fingerprint and ``chunk_offset`` is stored as an int.
    """
    # The numeric part of the id is its last three characters ("axis_NNN").
    axis_number = int(axis.axis_id[-3:])
    difficulty = DIFFICULTIES[(variant + axis_number) % len(DIFFICULTIES)]
    lang = axis.languages[variant % len(axis.languages)]
    chunk_id = evidence["chunk_id"]

    if lang == "th":
        user = (
            f"จากหลักฐานจริง chunk `{chunk_id}` ช่วย {frame} หัวข้อ `{axis.name}` "
            f"และตอบให้ตรวจย้อนกลับแหล่งข้อมูลได้"
        )
    else:
        user = f"Using real evidence chunk `{chunk_id}`, {frame} the topic `{axis.name}` and make the answer traceable."

    assistant = _source_grounded_assistant(axis, frame, variant, difficulty, lang, evidence)
    fingerprint = _sha("|".join((axis.axis_id, frame, str(variant), difficulty, lang, chunk_id)))

    messages = [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": user},
        {"role": "assistant", "content": assistant},
    ]
    metadata = {
        "schema_version": SCHEMA_VERSION,
        "axis_id": axis.axis_id,
        "axis_name": axis.name,
        "macro_domain": axis.macro_domain,
        "difficulty": difficulty,
        "language": lang,
        "task_frame": frame,
        "variant": variant,
        "fingerprint_sha256": fingerprint,
        "safety_boundary": axis.safety_boundary,
        "source_path": evidence["source_path"],
        "source_sha256": evidence["source_sha256"],
        "chunk_id": chunk_id,
        "chunk_offset": int(evidence["chunk_offset"]),
        "quality_tags": [
            "real_source",
            "instruction_following",
            "reasoning",
            "grounding",
            "verification",
            "natural_language",
            "compression_ready",
        ],
    }
    return {
        "messages": messages,
        "source": "coverage_100k_real_source_forge",
        "metadata": metadata,
    }
def _clean_text(text: str) -> str:
    """Normalize *text*: NUL bytes become spaces, whitespace runs collapse to one space, ends are trimmed."""
    without_nul = text.replace("\x00", " ")
    collapsed = re.sub(r"\s+", " ", without_nul)
    return collapsed.strip()
def _collect_texts(obj: Any, limit: int = 120_000) -> list[str]:
    """Harvest cleaned text snippets from a nested JSON-like value.

    Strings are normalized with ``_clean_text`` and kept only when at least 80
    characters long. Lists contribute their first 200 items. Dicts are
    searched only under a fixed set of text-bearing keys, in a fixed order.
    Traversal stops descending once the collected characters reach *limit*
    (the check happens on entry to each node, so the total may overshoot).
    """
    harvested: list[str] = []
    chars_collected = 0
    text_keys = ("content", "text", "body", "description", "summary", "title", "messages", "metadata")

    def visit(node: Any) -> None:
        nonlocal chars_collected
        if chars_collected >= limit:
            return
        if isinstance(node, str):
            normalized = _clean_text(node)
            if len(normalized) < 80:
                return
            harvested.append(normalized)
            chars_collected += len(normalized)
            return
        if isinstance(node, list):
            for child in node[:200]:
                visit(child)
            return
        if isinstance(node, dict):
            for field in text_keys:
                if field in node:
                    visit(node[field])

    visit(obj)
    return harvested
def _candidate_files(source_roots: list[str | Path]):
    """Yield each eligible source file under *source_roots* exactly once.

    Relative roots are resolved against the parent of this file's directory.
    A root may be a single file or a directory. Directories are walked with
    names in EXCLUDED_DIR_NAMES pruned, visiting sub-directories and files in
    sorted order. Only suffixes in SOURCE_SUFFIXES qualify, and duplicates are
    detected by resolved path.
    """
    visited: set[Path] = set()
    for root in source_roots:
        base = Path(root)
        if not base.is_absolute():
            base = Path(__file__).resolve().parents[1] / base

        if base.is_file() and base.suffix.lower() in SOURCE_SUFFIXES:
            key = base.resolve()
            if key not in visited:
                visited.add(key)
                yield base
            continue

        if not base.is_dir():
            continue

        for folder, subfolders, names in os.walk(base):
            # In-place assignment prunes excluded directories and fixes walk order.
            subfolders[:] = sorted(sub for sub in subfolders if sub not in EXCLUDED_DIR_NAMES)
            for filename in sorted(names):
                candidate = Path(folder) / filename
                if candidate.suffix.lower() not in SOURCE_SUFFIXES:
                    continue
                key = candidate.resolve()
                if key in visited:
                    continue
                visited.add(key)
                yield candidate
def _iter_source_chunks(
    source_roots: list[str | Path],
    *,
    chunk_chars: int = 900,
    stride_chars: int = 650,
    max_file_chars: int = 8_000_000,
    max_chunks_per_file: int = 5_000,
):
    """Yield overlapping text chunks from every candidate source file.

    Each file is decoded as UTF-8 (invalid bytes replaced), limited to its
    first *max_file_chars* characters, whitespace-normalized with
    ``_clean_text`` and then cut into windows of *chunk_chars* characters
    advancing by *stride_chars*. Files that cannot be read, are blank, or
    normalize to fewer than 80 characters are skipped.

    Args:
        source_roots: files or directories handed to ``_candidate_files``.
        chunk_chars: window length in characters of normalized text.
        stride_chars: distance between consecutive window starts.
        max_file_chars: at most this many decoded characters are read per file.
        max_chunks_per_file: stop after this many chunks from one file.

    Yields:
        Dicts with string values under ``source_path``, ``source_sha256``,
        ``chunk_id``, ``chunk_offset`` and ``excerpt``. ``source_sha256``
        hashes the decoded, possibly truncated text (not the raw file bytes),
        and ``chunk_offset`` indexes the normalized text, not the file.

    Raises:
        ValueError: on the first iteration, if *chunk_chars* or *stride_chars*
            is not positive or *max_file_chars* is negative.
    """
    if chunk_chars <= 0 or stride_chars <= 0:
        raise ValueError("chunk_chars and stride_chars must be positive")
    if max_file_chars < 0:
        raise ValueError("max_file_chars must be non-negative")

    for path in _candidate_files(source_roots):
        try:
            # Read only the prefix we keep, so a huge file is never fully loaded.
            with path.open("r", encoding="utf-8", errors="replace") as handle:
                raw = handle.read(max_file_chars)
        except OSError:
            continue
        if not raw.strip():
            continue
        file_hash = _sha(raw)
        text = _clean_text(raw)
        if len(text) < 80:
            continue
        rel = str(path)
        id_prefix = file_hash[:12]
        text_len = len(text)
        chunk_no = 0
        for offset in range(0, text_len, stride_chars):
            excerpt = text[offset : offset + chunk_chars].strip()
            # A short trailing window is dropped rather than emitted as a weak excerpt.
            if len(excerpt) >= 80:
                yield {
                    "source_path": rel,
                    "source_sha256": file_hash,
                    "chunk_id": f"{id_prefix}:{chunk_no:06d}",
                    "chunk_offset": str(offset),
                    "excerpt": excerpt,
                }
                chunk_no += 1
                if chunk_no >= max_chunks_per_file:
                    break
            # This window reached the end of the text, so later windows would
            # only repeat suffixes of it. Stopping here also ensures the tail of
            # the file is covered, which a full-window-only range would miss.
            if offset + chunk_chars >= text_len:
                break
def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write *rows* to *path* as UTF-8 JSON Lines, creating parent directories.

    Each row is serialized with sorted keys and non-ASCII characters kept
    verbatim, followed by a single ``\\n`` regardless of platform.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = (json.dumps(row, ensure_ascii=False, sort_keys=True) + "\n" for row in rows)
    with path.open("w", encoding="utf-8", newline="\n") as sink:
        sink.writelines(serialized)
def _write_row(handle, row: dict[str, Any]) -> None:
    """Append *row* to the open text *handle* as one JSON line (sorted keys, raw Unicode)."""
    line = json.dumps(row, sort_keys=True, ensure_ascii=False)
    handle.write(f"{line}\n")
def _file_sha(path: Path) -> str:
    """Return the hexadecimal SHA-256 of the file at *path*, read in 1 MiB pieces."""
    digest = hashlib.sha256()
    piece_size = 1024 * 1024
    with path.open("rb") as stream:
        while True:
            piece = stream.read(piece_size)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
def build_coverage_100k_dataset(
    out_dir: str | Path,
    *,
    target_records: int = 100_000,
    variants_per_axis: int | None = None,
    eval_fraction: float = 0.01,
    source_roots: list[str | Path] | None = None,
    source_grounded: bool = True,
) -> dict[str, Any]:
    """Generate the coverage train/eval JSONL files and a manifest in *out_dir*.

    Args:
        out_dir: output directory (created if missing).
        target_records: number of records to produce; must be at least
            COVERAGE_AXIS_COUNT.
        variants_per_axis: cap on synthetic variants per axis. Defaults to
            ``ceil(target_records / COVERAGE_AXIS_COUNT)``. Only the synthetic
            branch consumes it; it is reported in the manifest either way.
        eval_fraction: approximate share of records routed to the eval file.
            It is clamped to [0.001, 0.5] and turned into "every k-th record",
            starting with the very first record.
        source_roots: roots scanned for evidence; falls back to
            DEFAULT_SOURCE_ROOTS when empty or None.
        source_grounded: when true, every record is built from a real source
            chunk, assigned to axes round-robin. Otherwise records come from
            the synthetic templates.

    Returns:
        The manifest dict that is also written to ``coverage_100k_manifest.json``.

    Raises:
        ValueError: if *target_records* is below COVERAGE_AXIS_COUNT.
        RuntimeError: if source-grounded generation runs out of chunks before
            reaching *target_records* (partial output files remain on disk).
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    axes = _axes()
    if target_records < COVERAGE_AXIS_COUNT:
        raise ValueError(f"target_records must be >= {COVERAGE_AXIS_COUNT}")
    if variants_per_axis is None:
        variants_per_axis = (target_records + COVERAGE_AXIS_COUNT - 1) // COVERAGE_AXIS_COUNT

    train_path = out / "coverage_100_axis_train.jsonl"
    eval_path = out / "coverage_100_axis_eval.jsonl"
    manifest_path = out / "coverage_100k_manifest.json"
    # One record in every eval_mod goes to the eval split.
    eval_mod = max(1, round(1 / max(0.001, min(eval_fraction, 0.5))))

    records_written = 0
    train_records = 0
    eval_records = 0
    domain_counts: Counter[str] = Counter()
    lang_counts: Counter[str] = Counter()
    source_chunks_used = 0
    source_files_seen: set[str] = set()

    def emit(row: dict[str, Any], train_f, eval_f) -> None:
        """Route one record to the eval or train file and update the tallies."""
        nonlocal records_written, train_records, eval_records
        metadata = row["metadata"]
        domain_counts[metadata["macro_domain"]] += 1
        lang_counts[metadata["language"]] += 1
        if records_written % eval_mod == 0:
            _write_row(eval_f, row)
            eval_records += 1
        else:
            _write_row(train_f, row)
            train_records += 1
        records_written += 1

    with train_path.open("w", encoding="utf-8", newline="\n") as train_f, eval_path.open(
        "w", encoding="utf-8", newline="\n"
    ) as eval_f:
        if source_grounded:
            roots = list(source_roots or DEFAULT_SOURCE_ROOTS)
            chunk_iter = _iter_source_chunks(roots)
            for global_variant, evidence in enumerate(chunk_iter):
                # Chunks are dealt to axes round-robin; the variant is the lap number.
                axis = axes[global_variant % len(axes)]
                axis_variant = global_variant // len(axes)
                frame = TASK_FRAMES[(axis_variant + int(axis.axis_id[-3:])) % len(TASK_FRAMES)]
                emit(_source_record(axis, frame, axis_variant, evidence), train_f, eval_f)
                source_chunks_used += 1
                source_files_seen.add(evidence["source_path"])
                if records_written >= target_records:
                    break
        else:
            # Give every axis its fair share of the target instead of filling
            # axes in order until the target is hit. The old cut-off left the
            # trailing axes with no records whenever
            # variants_per_axis * len(axes) exceeded target_records. When the
            # original loop covered all axes evenly, the output is unchanged.
            base_quota, remainder = divmod(target_records, len(axes))
            for position, axis in enumerate(axes):
                quota = base_quota + (1 if position < remainder else 0)
                axis_number = int(axis.axis_id[-3:])
                for variant in range(min(variants_per_axis, quota)):
                    frame = TASK_FRAMES[(variant + axis_number) % len(TASK_FRAMES)]
                    emit(_record(axis, frame, variant), train_f, eval_f)

    if source_grounded and records_written < target_records:
        raise RuntimeError(
            f"real source chunks produced {records_written} records, below target {target_records}. "
            "Add more source roots or lower --target-records."
        )

    report: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "summary": {
            "axis_count": len(axes),
            "records_written": records_written,
            "train_records": train_records,
            "eval_records": eval_records,
            "target_records": target_records,
            "variants_per_axis": variants_per_axis,
            "macro_domain_counts": dict(sorted(domain_counts.items())),
            "language_counts": dict(sorted(lang_counts.items())),
            "source_grounded": source_grounded,
            "source_chunks_used": source_chunks_used,
            "source_files_used": len(source_files_seen),
        },
        "outputs": {
            "train_jsonl": str(train_path),
            "eval_jsonl": str(eval_path),
            "train_sha256": _file_sha(train_path),
            "eval_sha256": _file_sha(eval_path),
        },
        # Axis is a plain (non-slots) dataclass, so __dict__ holds all its fields.
        "axes": [axis.__dict__ for axis in axes],
        "claim_gate": {
            "coverage_axes_ready": len(axes) == COVERAGE_AXIS_COUNT,
            # This gate is tied to the fixed 100k figure, not to target_records.
            "coverage_100k_ready": records_written >= 100_000,
            "real_source_grounding_ready": (not source_grounded) or (source_chunks_used == records_written and len(source_files_seen) > 0),
            "quality_100_percent_claim_allowed": False,
            "world_best_claim_allowed": False,
            "reason": "This forge expands deterministic supervised coverage. It does not prove 100% capability until external and local eval gates pass.",
        },
    }
    report["manifest_path"] = str(manifest_path)
    manifest_path.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
    return report

Xet Storage Details

Size:
27.5 kB
·
Xet hash:
1e4a985cd2d6d798495215d673e1ee10b0133691db08683f9da07fe0630c99b9

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.