Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-colab-handoff /bundle /evaluation /knowledge_full_cycle.py
| """Full-cycle pure knowledge development pipeline for TinyMind.""" | |
| from __future__ import annotations | |
| from collections import Counter | |
| from datetime import datetime, timezone | |
| import hashlib | |
| import json | |
| import re | |
| from pathlib import Path | |
| from typing import Iterable | |
| from data.expert_curriculum_forge import COVERAGE_TAGS, ExpertCurriculumForge, JUNK_MARKERS | |
| from evaluation.knowledge_dashboard import run_knowledge_dashboard | |
| from evaluation.local_evidence import run_local_train_eval_bundle | |
# Keys that audit_pure_records requires to be present and non-blank on every record.
REQUIRED_FIELDS = (
    "schema_version",
    "domain", "lang",
    "question", "answer",
    "claim", "evidence", "verification",
    "source", "license",
    "quality_score", "rarity_score",
)

# Tokenizer pattern: runs of word characters plus the Thai Unicode block
# (U+0E00-U+0E7F). NOTE(review): the explicit range presumably keeps Thai
# combining marks inside a token -- confirm against the tokenizer's callers.
TOKEN_RE = re.compile(r"[\w\u0E00-\u0E7F]+", re.UNICODE)
def _norm(text: str) -> str:
    """Return *text* trimmed, lower-cased, with each whitespace run collapsed to one space."""
    lowered = text.strip().lower()
    collapsed = re.sub(r"\s+", " ", lowered)
    return collapsed
| def _read_jsonl(path: str | Path) -> list[dict]: | |
| p = Path(path) | |
| if not p.exists(): | |
| return [] | |
| return [json.loads(line) for line in p.read_text(encoding="utf-8").splitlines() if line.strip()] | |
def _tokens(text: str) -> set[str]:
    """Return the distinct lower-cased TOKEN_RE matches in *text* that are at least two characters long."""
    found: set[str] = set()
    for candidate in TOKEN_RE.findall(text):
        # Single-character matches are dropped.
        if len(candidate.strip()) < 2:
            continue
        found.add(candidate.lower())
    return found
def _record_sha(row: dict) -> str:
    """Return the SHA-256 hex digest of *row* serialized as key-sorted, non-ASCII-preserving JSON."""
    # Sorting keys makes the digest independent of dict insertion order.
    canonical = json.dumps(row, ensure_ascii=False, sort_keys=True)
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return digest.hexdigest()
| class SourceTraceIndex: | |
| """Small-data source linker with exact provenance pointers.""" | |
| def __init__(self, entries: list[dict]): | |
| self.entries = entries | |
| def from_records(cls, records: Iterable[dict]) -> "SourceTraceIndex": | |
| entries: list[dict] = [] | |
| for row in records: | |
| text = "\n".join( | |
| str(row.get(field, "")) | |
| for field in ("domain", "question", "answer", "claim", "evidence", "verification", "source") | |
| ) | |
| primary_text = "\n".join(str(row.get(field, "")) for field in ("domain", "question", "answer", "evidence")) | |
| entries.append( | |
| { | |
| "id": row.get("id"), | |
| "domain": row.get("domain"), | |
| "lang": row.get("lang"), | |
| "question": row.get("question"), | |
| "answer": row.get("answer"), | |
| "claim": row.get("claim"), | |
| "evidence": row.get("evidence"), | |
| "verification": row.get("verification"), | |
| "source": row.get("source"), | |
| "license": row.get("license"), | |
| "record_sha256": _record_sha(row), | |
| "search_text": text.lower(), | |
| "primary_terms": sorted(_tokens(primary_text)), | |
| "terms": sorted(_tokens(text)), | |
| } | |
| ) | |
| return cls(entries) | |
| def write(self, path: str | Path) -> dict: | |
| p = Path(path) | |
| p.parent.mkdir(parents=True, exist_ok=True) | |
| payload = { | |
| "schema_version": "tinymind-source-trace-index-v1", | |
| "entry_count": len(self.entries), | |
| "entries": self.entries, | |
| } | |
| p.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8") | |
| return {"index_path": str(p), "entry_count": len(self.entries)} | |
| def read(cls, path: str | Path) -> "SourceTraceIndex": | |
| payload = json.loads(Path(path).read_text(encoding="utf-8")) | |
| return cls(list(payload.get("entries", []))) | |
| def query(self, query: str, top_k: int = 3) -> list[dict]: | |
| q_terms = _tokens(query) | |
| scored: list[tuple[float, dict]] = [] | |
| for entry in self.entries: | |
| terms = set(entry.get("terms", [])) | |
| primary_terms = set(entry.get("primary_terms", [])) | |
| overlap = q_terms & terms | |
| if not overlap: | |
| score = 0.0 | |
| else: | |
| score = len(overlap) / max(len(q_terms), 1) | |
| score += 0.4 * len(q_terms & primary_terms) / max(len(q_terms), 1) | |
| if str(entry.get("domain", "")).lower() in q_terms: | |
| score += 0.25 | |
| search_text = str(entry.get("search_text", "")) | |
| for term in q_terms: | |
| if term and term in search_text: | |
| score += 0.05 | |
| scored.append((score, entry)) | |
| scored.sort(key=lambda item: (item[0], str(item[1].get("id", ""))), reverse=True) | |
| hits = [] | |
| for score, entry in scored[: max(1, int(top_k))]: | |
| hit = {key: value for key, value in entry.items() if key not in {"terms", "primary_terms", "search_text"}} | |
| hit["score"] = score | |
| hit["matched"] = score > 0 | |
| hits.append(hit) | |
| return hits | |
def _score_meets(value: object, threshold: float) -> bool:
    """Return True only when *value* parses as a number that is >= *threshold*."""
    try:
        score = float(value or 0.0)
    except (TypeError, ValueError):
        # A non-numeric score (e.g. "high", a list) cannot satisfy the gate.
        return False
    # NaN compares False against everything, so ">=" rejects it, whereas the
    # inverse "< threshold" test would let a NaN score slip through.
    return score >= threshold


def audit_pure_records(records: Iterable[dict]) -> dict:
    """Run the strict purity audit over *records*.

    A record is blocked when any of these holds: a REQUIRED_FIELDS entry is
    missing or blank; its question/answer/claim/evidence/verification text
    contains a JUNK_MARKERS entry (case-insensitive on the record side);
    ``quality_score`` is not a number >= 0.95; ``rarity_score`` is not a
    number >= 0.7; the stripped answer is shorter than 60 characters; or its
    (domain, lang, normalized question) key was already seen earlier in the
    input, including on a record that was itself blocked.

    Args:
        records: Record dicts to audit.

    Returns:
        A report dict with total/passed/blocked counts, ``purity_score``
        (passed / total, 0 for empty input), per-domain and per-language
        counts of passed records, the ``blocked`` list (index, id, domain,
        reasons), and ``passed_sha256`` over the passed records' canonical
        JSON joined by newlines.
    """
    rows = list(records)
    seen_questions: set[str] = set()
    passed: list[dict] = []
    blocked: list[dict] = []
    domain_counts: Counter[str] = Counter()
    lang_counts: Counter[str] = Counter()
    text_fields = ("question", "answer", "claim", "evidence", "verification")
    for index, row in enumerate(rows):
        reasons: list[str] = []
        for field in REQUIRED_FIELDS:
            value = row.get(field)
            if value is None or (isinstance(value, str) and not value.strip()):
                reasons.append(f"missing_{field}")
        lowered = "\n".join(str(row.get(field, "")) for field in text_fields).lower()
        if any(marker in lowered for marker in JUNK_MARKERS):
            reasons.append("junk_marker")
        if not _score_meets(row.get("quality_score", 0.0), 0.95):
            reasons.append("quality_below_0.95")
        if not _score_meets(row.get("rarity_score", 0.0), 0.7):
            reasons.append("rarity_below_0.70")
        if len(str(row.get("answer", "")).strip()) < 60:
            reasons.append("answer_too_short")
        key = f"{row.get('domain', '')}:{row.get('lang', '')}:{_norm(str(row.get('question', '')))}"
        if key in seen_questions:
            reasons.append("duplicate_normalized_question")
        # Registered before the pass/block decision, so blocked records also
        # reserve their question key.
        seen_questions.add(key)
        if reasons:
            blocked.append({"index": index, "id": row.get("id"), "domain": row.get("domain"), "reasons": reasons})
            continue
        passed.append(row)
        domain_counts[str(row.get("domain", "unknown"))] += 1
        lang_counts[str(row.get("lang", "unknown"))] += 1
    total = len(rows)
    return {
        "schema_version": "tinymind-pure-knowledge-audit-v1",
        "total_records": total,
        "passed_records": len(passed),
        "blocked_records": len(blocked),
        "purity_score": len(passed) / max(total, 1),
        "domain_counts": dict(domain_counts),
        "lang_counts": dict(lang_counts),
        "blocked": blocked,
        "passed_sha256": hashlib.sha256(
            "\n".join(json.dumps(row, ensure_ascii=False, sort_keys=True) for row in passed).encode("utf-8")
        ).hexdigest(),
    }
def evaluate_natural_answer_style(records: Iterable[dict]) -> dict:
    """Score how many answers read as natural, ordered explanations.

    An answer passes when it is detailed enough (at least 18 distinct tokens
    or at least 90 non-whitespace characters), contains one of the ordering
    markers below (case-sensitive), has no ``<`` or ``>`` character, and
    mentions "cev" (case-insensitive) at most twice.

    Returns:
        A report dict with the sample count, pass count, pass ratio
        (0 for empty input), the per-record check rows, and the rule text.
    """
    order_markers = ("จากนั้น", "อย่างไร", "ตัวอย่าง", "ข้อจำกัด", "then", "example", "uncertainty", "evidence")
    samples = list(records)
    details: list[dict] = []
    for record in samples:
        answer = str(record.get("answer", ""))
        term_count = len(_tokens(answer))
        dense_length = len(re.sub(r"\s+", "", answer))

        detailed = term_count >= 18 or dense_length >= 90
        ordered = any(marker in answer for marker in order_markers)
        markup_free = not ("<" in answer or ">" in answer)
        cev_ok = answer.lower().count("cev") <= 2

        details.append(
            {
                "id": record.get("id"),
                "domain": record.get("domain"),
                "passed": detailed and ordered and markup_free and cev_ok,
                "word_like_terms": term_count,
                "compact_chars": dense_length,
                "detailed_enough": detailed,
                "has_explanation_order": ordered,
                "no_markup": markup_free,
                "not_overfocused_on_cev": cev_ok,
            }
        )
    pass_count = sum(1 for item in details if item["passed"])
    sample_count = len(samples)
    return {
        "schema_version": "tinymind-natural-answer-style-v1",
        "samples": sample_count,
        "passed": pass_count,
        "score": pass_count / max(sample_count, 1),
        "rows": details,
        "rule": "Natural answers must be sufficiently detailed, ordered, markup-free, and not dominated by provenance jargon.",
    }
def _coverage_gate(audit: dict) -> dict:
    """Check that every expected domain appears in ``audit["domain_counts"]``.

    Returns:
        A dict with ``passed`` (no expected domain missing), the percentage of
        expected domains present, the sorted expected / present / missing
        domain lists, and the COVERAGE_TAGS as a list.
    """
    required = {
        "thai_advanced_language",
        "english_advanced_language",
        "polyglot_code_projects",
        "safe_cross_platform_commands",
        "cev_claim_evidence_verification",
        "sandbox_rl_lua_os_tools",
        "natural_explanation_mastery",
        "deep_learning_to_learn",
        "strict_instruction_following",
        "deep_logic_reasoning",
    }
    seen = set(audit["domain_counts"])
    absent = sorted(required - seen)
    covered = seen & required
    percent = 100.0 * len(covered) / len(required)

    gate = {"passed": not absent, "coverage_percent": percent}
    gate["expected_domains"] = sorted(required)
    gate["present_domains"] = sorted(seen)
    gate["missing_domains"] = absent
    gate["coverage_tags"] = list(COVERAGE_TAGS)
    return gate
def run_knowledge_full_cycle(
    out_dir: str | Path,
    records_per_domain: int = 4,
    train_steps: int = 12,
    mmlu_limit: int = 20,
    seed: int = 20260523,
    skip_dashboard: bool = False,
) -> dict:
    """Run the whole pipeline and write its JSON and Markdown reports.

    Steps, in order: forge the dataset under ``<out_dir>/dataset``, audit and
    style-check the combined train+eval rows, build and write the source
    trace index, evaluate the coverage / pure / natural gates, run the local
    train+eval bundle under ``<out_dir>/train_eval``, optionally run the
    knowledge dashboard under ``<out_dir>/dashboard``, then assemble and
    persist the report.

    Args:
        out_dir: Root output directory; created if missing.
        records_per_domain: Forwarded to ExpertCurriculumForge.
        train_steps: Forwarded to run_local_train_eval_bundle.
        mmlu_limit: Forwarded to run_knowledge_dashboard.
        seed: Forwarded to run_local_train_eval_bundle.
        skip_dashboard: When true the dashboard step is skipped and the
            report's ``dashboard`` entry is ``None``.

    Returns:
        The report dict, including ``json_path`` and ``markdown_path``.
    """
    root = Path(out_dir)
    root.mkdir(parents=True, exist_ok=True)
    dataset_dir = root / "dataset"
    train_dir = root / "train_eval"
    dash_dir = root / "dashboard"

    # Dataset generation, then audit over train and eval rows combined.
    forge = ExpertCurriculumForge(records_per_domain=records_per_domain, eval_ratio=0.2)
    manifest = forge.write_jsonl(dataset_dir)
    rows = _read_jsonl(manifest["train_path"]) + _read_jsonl(manifest["eval_path"])
    audit = audit_pure_records(rows)
    natural_style = evaluate_natural_answer_style(rows)

    # Source trace index plus two sample lookups recorded in the report.
    source_index = SourceTraceIndex.from_records(rows)
    source_meta = source_index.write(root / "source_trace_index.json")
    sample_queries = ("CEV หลักฐานตรวจซ้ำ", "PowerShell dry run safe command")
    source_trace = {
        **source_meta,
        "method": "lexical-overlap over CEV/source/provenance fields with per-record sha256",
        "example_queries": [
            {"query": text, "hits": source_index.query(text, top_k=2)} for text in sample_queries
        ],
    }

    coverage = _coverage_gate(audit)
    purity = audit["purity_score"]
    blocked_count = audit["blocked_records"]
    pure_gate = {
        "passed": blocked_count == 0 and purity == 1.0,
        "purity_score": purity,
        "blocked_records": blocked_count,
        "policy": "strict CEV + provenance + dedupe + junk-marker + quality/rarity threshold",
    }
    style_score = natural_style["score"]
    natural_gate = {
        "passed": style_score >= 0.75,
        "score": style_score,
        "policy": "answers should be natural, ordered, detailed, markup-free, and not dominated by provenance jargon",
    }

    train = run_local_train_eval_bundle(
        train_dir,
        train_steps=train_steps,
        context_lengths=(32, 128, 1024),
        seed=seed,
        records=rows,
    )

    if skip_dashboard:
        dashboard = None
    else:
        dashboard = run_knowledge_dashboard(
            checkpoint_path=train["artifacts"]["checkpoint"],
            out_dir=dash_dir,
            mmlu_limit=mmlu_limit,
            int4_artifact_path=train["artifacts"].get("int4_artifact"),
        )

    # The overall gate needs all three data gates plus a non-empty artifacts entry.
    all_gates_ok = bool(
        pure_gate["passed"] and natural_gate["passed"] and coverage["passed"] and train.get("artifacts")
    )
    report = {
        "schema_version": "tinymind-knowledge-full-cycle-v1",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "goal": "full-cycle pure knowledge development with measurable gates",
        "dataset_manifest": manifest,
        "audit": audit,
        "natural_answer_style": natural_style,
        "source_trace": source_trace,
        "pure_gate": pure_gate,
        "natural_gate": natural_gate,
        "coverage_gate": coverage,
        "train_eval": train.get("train_eval", {}),
        "artifacts": train.get("artifacts", {}),
        "dashboard": dashboard,
        "full_cycle_100_percent_definition": (
            "100% means all declared domains are covered, all records pass strict purity audit, "
            "each record links back to source/evidence/verification hashes, training/evaluation artifacts exist, "
            "and unsupported world-best claims remain blocked."
        ),
        "full_cycle_gate": {
            "passed": all_gates_ok,
            "world_best_claim_allowed": False,
            "notes": "This gate measures pipeline completeness and data purity, not perfect universal knowledge.",
        },
    }

    json_path = root / "knowledge_full_cycle_report.json"
    md_path = root / "knowledge_full_cycle_report.md"
    # Both paths go into the report before serialization so the JSON file records them.
    report["json_path"] = str(json_path)
    report["markdown_path"] = str(md_path)
    json_path.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
    md_path.write_text(_markdown(report), encoding="utf-8")
    return report
def _markdown(report: dict) -> str:
    """Render the headline gate results of a full-cycle report as Markdown.

    Missing training metrics print as ``missing``; dashboard scores print as
    ``skipped`` when the report has no dashboard or no summary scores. The
    returned text ends with a newline.
    """
    train_metrics = report.get("train_eval", {})
    dashboard_scores = (report.get("dashboard") or {}).get("summary_scores", {})
    pure = report["pure_gate"]
    natural = report["natural_gate"]
    coverage = report["coverage_gate"]

    bullets = [
        f"- Pure gate: {pure['passed']} ({pure['purity_score']:.2%})",
        f"- Natural answer gate: {natural['passed']} ({natural['score']:.2%})",
        f"- Coverage gate: {coverage['passed']} ({coverage['coverage_percent']:.1f}%)",
        f"- Full-cycle gate: {report['full_cycle_gate']['passed']}",
        f"- Source trace entries: {report['source_trace']['entry_count']}",
        f"- Natural answer style: {report['natural_answer_style']['score']:.2%}",
        f"- Eval loss: {train_metrics.get('eval_loss', 'missing')}",
        f"- Perplexity: {train_metrics.get('perplexity', 'missing')}",
        f"- Dashboard knowledge: {dashboard_scores.get('knowledge', 'skipped')}",
        f"- Dashboard instruction: {dashboard_scores.get('instruction', 'skipped')}",
        f"- Dashboard translation: {dashboard_scores.get('translation', 'skipped')}",
        "- World-best claim allowed: false",
    ]
    document = ["# TinyMind Knowledge Full Cycle", ""]
    document.extend(bullets)
    # The trailing empty item makes the joined text end with a newline.
    document.extend(["", "## Definition", "", report["full_cycle_100_percent_definition"], ""])
    return "\n".join(document)
Xet Storage Details
- Size:
- 14.8 kB
- Xet hash:
- 904d823d857810fd3d1cd9066956b776064de4e853a6ff7fb150382e45e69f41
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.