bbkdevops's picture
download
raw
14.8 kB
"""Full-cycle pure knowledge development pipeline for TinyMind."""
from __future__ import annotations
from collections import Counter
from datetime import datetime, timezone
import hashlib
import json
import re
from pathlib import Path
from typing import Iterable
from data.expert_curriculum_forge import COVERAGE_TAGS, ExpertCurriculumForge, JUNK_MARKERS
from evaluation.knowledge_dashboard import run_knowledge_dashboard
from evaluation.local_evidence import run_local_train_eval_bundle
# Fields every record must carry with a non-empty value. The audit in this
# module emits a ``missing_<field>`` reason for each one that is absent.
REQUIRED_FIELDS = (
    # Identity and routing.
    "schema_version",
    "domain",
    "lang",
    # The question/answer pair itself.
    "question",
    "answer",
    # Claim / evidence / verification triple.
    "claim",
    "evidence",
    "verification",
    # Provenance.
    "source",
    "license",
    # Numeric scores that the audit compares against fixed thresholds.
    "quality_score",
    "rarity_score",
)

# A token is a maximal run of word characters. The Thai block U+0E00-U+0E7F is
# listed explicitly -- presumably so Thai combining vowel/tone marks, which
# ``\w`` alone may not match, stay inside their token.
TOKEN_RE = re.compile(r"[\w\u0E00-\u0E7F]+", re.UNICODE)
def _norm(text: str) -> str:
    """Return *text* trimmed, lower-cased, and with each whitespace run collapsed to one space."""
    trimmed = text.strip()
    lowered = trimmed.lower()
    return re.sub(r"\s+", " ", lowered)
def _read_jsonl(path: str | Path) -> list[dict]:
    """Load a JSON Lines file into a list of decoded objects.

    Args:
        path: Location of the ``.jsonl`` file.

    Returns:
        One decoded object per non-blank line, in file order. An empty list is
        returned when *path* does not exist.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    p = Path(path)
    if not p.exists():
        return []
    # Iterate the file instead of calling ``str.splitlines()``: splitlines also
    # breaks on U+2028, U+2029, U+0085, VT, FF and the FS/GS/RS controls, all of
    # which may appear unescaped inside a JSON string written with
    # ``ensure_ascii=False``. Text-mode iteration only splits on real newlines.
    with p.open(encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]
def _tokens(text: str) -> set[str]:
    """Return the distinct lower-cased ``TOKEN_RE`` matches in *text* that are at least two characters long."""
    found: set[str] = set()
    for match in TOKEN_RE.findall(text):
        # Single-character matches are dropped before lower-casing.
        if len(match.strip()) >= 2:
            found.add(match.lower())
    return found
def _record_sha(row: dict) -> str:
    """Return the hex SHA-256 of *row* serialized as key-sorted, non-ASCII-preserving JSON."""
    canonical = json.dumps(row, ensure_ascii=False, sort_keys=True)
    digest = hashlib.sha256()
    digest.update(canonical.encode("utf-8"))
    return digest.hexdigest()
class SourceTraceIndex:
    """Lexical lookup over records that keeps each record's provenance fields and content hash.

    Every entry is a plain dict holding the record's identifying fields, a
    SHA-256 of the whole record, and pre-computed token lists used by ``query``.
    """

    def __init__(self, entries: list[dict]):
        """Wrap an already-built list of entry dicts."""
        self.entries = entries

    @classmethod
    def from_records(cls, records: Iterable[dict]) -> "SourceTraceIndex":
        """Build an index with one entry per record.

        Args:
            records: Record dicts; absent fields are indexed as empty text and
                stored as ``None``.

        Returns:
            A new index whose entries follow the order of *records*.
        """
        search_fields = ("domain", "question", "answer", "claim", "evidence", "verification", "source")
        primary_fields = ("domain", "question", "answer", "evidence")
        copied_fields = (
            "id",
            "domain",
            "lang",
            "question",
            "answer",
            "claim",
            "evidence",
            "verification",
            "source",
            "license",
        )

        def joined(record: dict, names: tuple) -> str:
            # One field per line, stringified; absent fields contribute "".
            return "\n".join(str(record.get(name, "")) for name in names)

        built: list[dict] = []
        for record in records:
            full_text = joined(record, search_fields)
            entry = {name: record.get(name) for name in copied_fields}
            entry["record_sha256"] = _record_sha(record)
            entry["search_text"] = full_text.lower()
            entry["primary_terms"] = sorted(_tokens(joined(record, primary_fields)))
            entry["terms"] = sorted(_tokens(full_text))
            built.append(entry)
        return cls(built)

    def write(self, path: str | Path) -> dict:
        """Serialize the index to *path* as indented, key-sorted UTF-8 JSON.

        Parent directories are created when needed.

        Returns:
            A dict with ``index_path`` (the path as a string) and ``entry_count``.
        """
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        count = len(self.entries)
        document = {
            "schema_version": "tinymind-source-trace-index-v1",
            "entry_count": count,
            "entries": self.entries,
        }
        serialized = json.dumps(document, ensure_ascii=False, indent=2, sort_keys=True)
        target.write_text(serialized, encoding="utf-8")
        return {"index_path": str(target), "entry_count": count}

    @classmethod
    def read(cls, path: str | Path) -> "SourceTraceIndex":
        """Load an index from a JSON file; a missing ``entries`` key yields an empty index."""
        raw = Path(path).read_text(encoding="utf-8")
        document = json.loads(raw)
        return cls(list(document.get("entries", [])))

    def query(self, query: str, top_k: int = 3) -> list[dict]:
        """Rank entries against *query* by token overlap and return the best ones.

        Scoring per entry:
          * if any query token is among the entry's terms: the fraction of query
            tokens that overlap, plus 0.4 times the fraction found in the
            primary terms, plus 0.25 when the entry's domain is itself a query
            token;
          * in every case, 0.05 for each query token that occurs as a substring
            of the entry's lower-cased search text.

        Args:
            query: Free-text query.
            top_k: Number of hits to return; values below 1 are treated as 1.

        Returns:
            Hits ordered by descending ``(score, str(id))``. Each hit is the
            entry without its internal token/search fields, plus ``score`` and
            ``matched`` (``score > 0``). Zero-score entries are still returned
            when they fall within the top *top_k*.
        """
        wanted = _tokens(query)
        denominator = max(len(wanted), 1)

        ranked: list[tuple[float, dict]] = []
        for entry in self.entries:
            entry_terms = set(entry.get("terms", []))
            entry_primary = set(entry.get("primary_terms", []))
            shared = wanted & entry_terms

            score = 0.0
            if shared:
                score = len(shared) / denominator
                score += 0.4 * len(wanted & entry_primary) / denominator
                if str(entry.get("domain", "")).lower() in wanted:
                    score += 0.25

            # Substring bonus applies even when there was no whole-token overlap.
            haystack = str(entry.get("search_text", ""))
            for term in wanted:
                if term and term in haystack:
                    score += 0.05
            ranked.append((score, entry))

        ranked = sorted(ranked, key=lambda pair: (pair[0], str(pair[1].get("id", ""))), reverse=True)

        hidden = {"terms", "primary_terms", "search_text"}
        limit = max(1, int(top_k))
        results = []
        for score, entry in ranked[:limit]:
            hit = {key: value for key, value in entry.items() if key not in hidden}
            hit["score"] = score
            hit["matched"] = score > 0
            results.append(hit)
        return results
def _coerce_score(value: object) -> float | None:
    """Convert a raw score to ``float``, or return ``None`` when it is not a usable number.

    Falsy inputs (``None``, ``""``, ``0``) become ``0.0``. Values ``float()``
    rejects, and NaN, yield ``None``.
    """
    try:
        score = float(value or 0.0)
    except (TypeError, ValueError, OverflowError):
        return None
    # NaN is the only float unequal to itself; every ``<`` comparison against it
    # is False, so it would otherwise slip past the thresholds.
    return None if score != score else score


def audit_pure_records(records: Iterable[dict]) -> dict:
    """Check every record against the purity rules and summarize the outcome.

    A record is blocked when any of the following holds; all applicable reasons
    are collected:
      * a ``REQUIRED_FIELDS`` entry is ``None`` or a blank string (``missing_<field>``);
      * a ``JUNK_MARKERS`` substring occurs in its lower-cased question, answer,
        claim, evidence or verification text (``junk_marker``);
      * ``quality_score`` / ``rarity_score`` is not a usable number
        (``invalid_<field>``) or is below 0.95 / 0.7;
      * the stripped answer is shorter than 60 characters (``answer_too_short``);
      * an earlier record had the same domain, language and normalized question
        (``duplicate_normalized_question``).

    Args:
        records: Record dicts to audit.

    Returns:
        A report dict with record counts, ``purity_score`` (passed / total, 0.0
        for empty input), per-domain and per-language counts of passed records,
        the list of blocked records with their reasons, and a SHA-256 over the
        passed records serialized one per line.
    """
    rows = list(records)
    seen_questions: set[str] = set()
    passed: list[dict] = []
    blocked: list[dict] = []
    domain_counts: Counter[str] = Counter()
    lang_counts: Counter[str] = Counter()
    # (field, minimum accepted value, reason reported when below it)
    score_rules = (
        ("quality_score", 0.95, "quality_below_0.95"),
        ("rarity_score", 0.7, "rarity_below_0.70"),
    )
    for index, row in enumerate(rows):
        reasons: list[str] = []
        for field in REQUIRED_FIELDS:
            value = row.get(field)
            if value is None or (isinstance(value, str) and not value.strip()):
                reasons.append(f"missing_{field}")
        text = "\n".join(str(row.get(field, "")) for field in ("question", "answer", "claim", "evidence", "verification"))
        lowered = text.lower()
        if any(marker in lowered for marker in JUNK_MARKERS):
            reasons.append("junk_marker")
        for field, minimum, below_reason in score_rules:
            score = _coerce_score(row.get(field))
            if score is None:
                # Previously a non-numeric score raised and aborted the whole
                # audit, and NaN passed silently; both now block the record.
                reasons.append(f"invalid_{field}")
            elif score < minimum:
                reasons.append(below_reason)
        if len(str(row.get("answer", "")).strip()) < 60:
            reasons.append("answer_too_short")
        key = f"{row.get('domain', '')}:{row.get('lang', '')}:{_norm(str(row.get('question', '')))}"
        if key in seen_questions:
            reasons.append("duplicate_normalized_question")
        # The key is recorded even for blocked rows, so a later repeat is still a duplicate.
        seen_questions.add(key)
        if reasons:
            blocked.append({"index": index, "id": row.get("id"), "domain": row.get("domain"), "reasons": reasons})
            continue
        passed.append(row)
        domain_counts[str(row.get("domain", "unknown"))] += 1
        lang_counts[str(row.get("lang", "unknown"))] += 1
    total = len(rows)
    return {
        "schema_version": "tinymind-pure-knowledge-audit-v1",
        "total_records": total,
        "passed_records": len(passed),
        "blocked_records": len(blocked),
        "purity_score": len(passed) / max(total, 1),
        "domain_counts": dict(domain_counts),
        "lang_counts": dict(lang_counts),
        "blocked": blocked,
        "passed_sha256": hashlib.sha256(
            "\n".join(json.dumps(row, ensure_ascii=False, sort_keys=True) for row in passed).encode("utf-8")
        ).hexdigest(),
    }
# Substrings (Thai and English) taken as a sign that an answer walks through
# steps, examples, limitations or supporting evidence.
_ORDER_MARKERS = ("จากนั้น", "อย่างไร", "ตัวอย่าง", "ข้อจำกัด", "then", "example", "uncertainty", "evidence")


def _has_explanation_order(answer: str) -> bool:
    """Return True when *answer* contains any order marker, ignoring letter case."""
    lowered = answer.lower()
    return any(marker in lowered for marker in _ORDER_MARKERS)


def evaluate_natural_answer_style(records: Iterable[dict]) -> dict:
    """Score how many answers read as natural, ordered explanations.

    An answer passes when all of these hold:
      * it has at least 18 distinct tokens or at least 90 non-whitespace characters;
      * it contains one of the order markers (matched case-insensitively, so a
        sentence-initial "Then" or "Example" counts, consistent with the
        case-insensitive ``cev`` check below);
      * it contains neither ``<`` nor ``>``;
      * ``cev`` occurs at most twice, ignoring case.

    Args:
        records: Record dicts; a missing ``answer`` is treated as empty.

    Returns:
        A report dict with the sample count, the number passed, ``score``
        (passed / samples, 0.0 for empty input), per-record check details under
        ``rows``, and the rule text.
    """
    rows = list(records)
    checked = []
    passed = 0
    for row in rows:
        answer = str(row.get("answer", ""))
        word_like = len(_tokens(answer))
        compact_chars = len(re.sub(r"\s+", "", answer))
        has_order = _has_explanation_order(answer)
        no_markup = "<" not in answer and ">" not in answer
        not_overfocused_on_cev = answer.lower().count("cev") <= 2
        detailed_enough = word_like >= 18 or compact_chars >= 90
        ok = detailed_enough and has_order and no_markup and not_overfocused_on_cev
        passed += int(ok)
        checked.append(
            {
                "id": row.get("id"),
                "domain": row.get("domain"),
                "passed": ok,
                "word_like_terms": word_like,
                "compact_chars": compact_chars,
                "detailed_enough": detailed_enough,
                "has_explanation_order": has_order,
                "no_markup": no_markup,
                "not_overfocused_on_cev": not_overfocused_on_cev,
            }
        )
    return {
        "schema_version": "tinymind-natural-answer-style-v1",
        "samples": len(rows),
        "passed": passed,
        "score": passed / max(len(rows), 1),
        "rows": checked,
        "rule": "Natural answers must be sufficiently detailed, ordered, markup-free, and not dominated by provenance jargon.",
    }
def _coverage_gate(audit: dict) -> dict:
    """Compare the domains that passed the audit against the fixed list of expected domains.

    Args:
        audit: Audit report; only its ``domain_counts`` mapping is read.

    Returns:
        A dict with ``passed`` (True when no expected domain is missing),
        ``coverage_percent``, the sorted expected / present / missing domain
        lists, and a copy of ``COVERAGE_TAGS``.
    """
    expected = set(
        (
            "thai_advanced_language",
            "english_advanced_language",
            "polyglot_code_projects",
            "safe_cross_platform_commands",
            "cev_claim_evidence_verification",
            "sandbox_rl_lua_os_tools",
            "natural_explanation_mastery",
            "deep_learning_to_learn",
            "strict_instruction_following",
            "deep_logic_reasoning",
        )
    )
    present = set(audit["domain_counts"])
    absent = sorted(expected - present)
    covered = present & expected
    # Domains outside the expected set show up in present_domains but do not raise the percentage.
    percent = 100.0 * len(covered) / len(expected)

    gate = {}
    gate["passed"] = not absent
    gate["coverage_percent"] = percent
    gate["expected_domains"] = sorted(expected)
    gate["present_domains"] = sorted(present)
    gate["missing_domains"] = absent
    gate["coverage_tags"] = list(COVERAGE_TAGS)
    return gate
def run_knowledge_full_cycle(
    out_dir: str | Path,
    records_per_domain: int = 4,
    train_steps: int = 12,
    mmlu_limit: int = 20,
    seed: int = 20260523,
    skip_dashboard: bool = False,
) -> dict:
    """Run dataset generation, audits, source indexing, training and reporting in one pass.

    Args:
        out_dir: Root output directory; created when missing.
        records_per_domain: Forwarded to ``ExpertCurriculumForge``.
        train_steps: Forwarded to ``run_local_train_eval_bundle``.
        mmlu_limit: Forwarded to ``run_knowledge_dashboard``.
        seed: Forwarded to ``run_local_train_eval_bundle``.
        skip_dashboard: When True the dashboard step is not run and the
            report's ``dashboard`` entry is ``None``.

    Returns:
        The report dict, which is also written to
        ``knowledge_full_cycle_report.json`` and rendered to
        ``knowledge_full_cycle_report.md`` under *out_dir*.
    """
    root = Path(out_dir)
    root.mkdir(parents=True, exist_ok=True)

    # Stage 1: generate the curriculum, then load both splits back from disk.
    forge = ExpertCurriculumForge(records_per_domain=records_per_domain, eval_ratio=0.2)
    manifest = forge.write_jsonl(root / "dataset")
    rows = [*_read_jsonl(manifest["train_path"]), *_read_jsonl(manifest["eval_path"])]

    # Stage 2: purity audit and answer-style evaluation over the combined rows.
    audit = audit_pure_records(rows)
    natural_style = evaluate_natural_answer_style(rows)

    # Stage 3: build and persist the source-trace index, then record two sample lookups.
    index = SourceTraceIndex.from_records(rows)
    index_meta = index.write(root / "source_trace_index.json")
    sample_queries = ("CEV หลักฐานตรวจซ้ำ", "PowerShell dry run safe command")
    source_trace = dict(index_meta)
    source_trace["method"] = "lexical-overlap over CEV/source/provenance fields with per-record sha256"
    source_trace["example_queries"] = [
        {"query": text, "hits": index.query(text, top_k=2)} for text in sample_queries
    ]

    # Stage 4: gates derived from the audit and style reports.
    coverage = _coverage_gate(audit)
    purity = audit["purity_score"]
    blocked_total = audit["blocked_records"]
    pure_gate = {
        "passed": blocked_total == 0 and purity == 1.0,
        "purity_score": purity,
        "blocked_records": blocked_total,
        "policy": "strict CEV + provenance + dedupe + junk-marker + quality/rarity threshold",
    }
    style_score = natural_style["score"]
    natural_gate = {
        "passed": style_score >= 0.75,
        "score": style_score,
        "policy": "answers should be natural, ordered, detailed, markup-free, and not dominated by provenance jargon",
    }

    # Stage 5: local train/eval run, then the optional dashboard on its checkpoint.
    train = run_local_train_eval_bundle(
        root / "train_eval",
        train_steps=train_steps,
        context_lengths=(32, 128, 1024),
        seed=seed,
        records=rows,
    )
    if skip_dashboard:
        dashboard = None
    else:
        artifacts = train["artifacts"]
        dashboard = run_knowledge_dashboard(
            checkpoint_path=artifacts["checkpoint"],
            out_dir=root / "dashboard",
            mmlu_limit=mmlu_limit,
            int4_artifact_path=artifacts.get("int4_artifact"),
        )

    # Stage 6: assemble the report; the overall gate needs every gate green plus non-empty artifacts.
    all_green = bool(
        pure_gate["passed"] and natural_gate["passed"] and coverage["passed"] and train.get("artifacts")
    )
    report = {
        "schema_version": "tinymind-knowledge-full-cycle-v1",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "goal": "full-cycle pure knowledge development with measurable gates",
        "dataset_manifest": manifest,
        "audit": audit,
        "natural_answer_style": natural_style,
        "source_trace": source_trace,
        "pure_gate": pure_gate,
        "natural_gate": natural_gate,
        "coverage_gate": coverage,
        "train_eval": train.get("train_eval", {}),
        "artifacts": train.get("artifacts", {}),
        "dashboard": dashboard,
        "full_cycle_100_percent_definition": (
            "100% means all declared domains are covered, all records pass strict purity audit, "
            "each record links back to source/evidence/verification hashes, training/evaluation artifacts exist, "
            "and unsupported world-best claims remain blocked."
        ),
        "full_cycle_gate": {
            "passed": all_green,
            "world_best_claim_allowed": False,
            "notes": "This gate measures pipeline completeness and data purity, not perfect universal knowledge.",
        },
    }

    # Both output paths are stored in the report before it is serialized.
    json_path = root / "knowledge_full_cycle_report.json"
    md_path = root / "knowledge_full_cycle_report.md"
    report["json_path"] = str(json_path)
    report["markdown_path"] = str(md_path)
    serialized = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
    json_path.write_text(serialized, encoding="utf-8")
    md_path.write_text(_markdown(report), encoding="utf-8")
    return report
def _markdown(report: dict) -> str:
    """Render the headline numbers of a full-cycle report as a short Markdown document.

    Missing train metrics are shown as ``missing``; missing dashboard scores
    (including a ``None`` dashboard) are shown as ``skipped``.
    """
    train_metrics = report.get("train_eval", {})
    dashboard = report.get("dashboard") or {}
    summary = dashboard.get("summary_scores", {})

    doc = ["# TinyMind Knowledge Full Cycle", ""]

    pure = report["pure_gate"]
    doc.append(f"- Pure gate: {pure['passed']} ({pure['purity_score']:.2%})")
    natural = report["natural_gate"]
    doc.append(f"- Natural answer gate: {natural['passed']} ({natural['score']:.2%})")
    coverage = report["coverage_gate"]
    doc.append(f"- Coverage gate: {coverage['passed']} ({coverage['coverage_percent']:.1f}%)")
    doc.append(f"- Full-cycle gate: {report['full_cycle_gate']['passed']}")
    doc.append(f"- Source trace entries: {report['source_trace']['entry_count']}")
    doc.append(f"- Natural answer style: {report['natural_answer_style']['score']:.2%}")

    for label, key in (("Eval loss", "eval_loss"), ("Perplexity", "perplexity")):
        doc.append(f"- {label}: {train_metrics.get(key, 'missing')}")
    for topic in ("knowledge", "instruction", "translation"):
        doc.append(f"- Dashboard {topic}: {summary.get(topic, 'skipped')}")

    doc.extend(
        [
            "- World-best claim allowed: false",
            "",
            "## Definition",
            "",
            report["full_cycle_100_percent_definition"],
            "",
        ]
    )
    return "\n".join(doc)

Xet Storage Details

Size:
14.8 kB
·
Xet hash:
904d823d857810fd3d1cd9066956b776064de4e853a6ff7fb150382e45e69f41

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.