# Hosting-page residue captured with the file (not part of the module):
# uploader: bbkdevops · download · raw · 10.1 kB
"""High-purity knowledge essence distillation for TinyMind.
The distiller converts mixed training records into compact, generalizable
knowledge cards. It avoids raw memorization by removing instance-specific
numbers from principles, rejecting fixed templates/secrets, and preserving
provenance for audit.
"""
from __future__ import annotations
from collections import Counter
import json
from hashlib import sha256
from pathlib import Path
import re
from typing import Any, Iterable
# Credential-shaped tokens: a known vendor prefix, a "_" or "-" separator, then
# at least 20 token characters. Both separators are accepted because several
# vendors use a hyphen ("sk-...", "sk-or-v1-...", "glpat-...") while others use
# an underscore ("hf_...", "ghp_...", "github_pat_..."). Requiring "_" only
# would let the hyphenated forms slip through the secret scan.
SECRET_RE = re.compile(
    r"\b(?:hf|sk|gh[pousr]|github_pat|glpat)[_-][A-Za-z0-9_\-]{20,}\b"
)

# Boilerplate phrases that mark a record as a fixed template. They are compared
# against lower-cased, whitespace-collapsed text, so every entry must itself be
# lower-case with single spaces.
FIXED_TEMPLATE_MARKERS = (
    "use the expected structured tool call exactly as specified by the schema",
    "start with read-only powershell diagnostics",
    "tinymind model is not loaded yet",
)
def _text_from_record(record: dict[str, Any]) -> tuple[str, str, str]:
    """Pull the raw text, domain label and source label out of one record.

    Args:
        record: A decoded JSON object. Chat-style records carry a ``messages``
            list; flat records carry ``text`` or a prompt/completion pair
            under one of several accepted key names.

    Returns:
        ``(text, domain, source)``. ``domain`` falls back to ``"general"`` and
        ``source`` to ``"local"`` when no matching key holds a truthy value.
    """
    domain = str(record.get("domain") or record.get("category") or "general")
    source = str(
        record.get("source")
        or record.get("source_dataset")
        or record.get("bucket_or_repo")
        or "local"
    )

    messages = record.get("messages")
    if isinstance(messages, list):
        parts = []
        for message in messages:
            if not isinstance(message, dict):
                continue
            content = message.get("content", "")
            # A JSON null content must contribute an empty part, not the
            # literal word "None" that str(None) would inject into the text.
            parts.append("" if content is None else str(content))
        return "\n".join(parts), domain, source

    prompt = str(
        record.get("prompt") or record.get("instruction") or record.get("question") or ""
    )
    completion = str(
        record.get("completion")
        or record.get("answer")
        or record.get("response")
        or record.get("output")
        or ""
    )
    # An explicit "text" field wins; otherwise join prompt and completion.
    text = str(record.get("text") or f"{prompt}\n{completion}".strip())
    return text, domain, source
class KnowledgeEssenceDistiller:
    """Distill JSONL training records into de-duplicated knowledge-essence cards.

    Every accepted record becomes one output row holding a generalized
    "essence" (principle, application pattern, procedure, pitfall, check), a
    two-turn chat rendering of that essence, and provenance pointing back at
    the source file and line.

    Args:
        max_records: Upper bound on the number of rows kept across all sources.
        min_chars: Minimum stripped text length a record needs to be kept.
    """

    def __init__(self, max_records: int = 50_000, min_chars: int = 48):
        self.max_records = int(max_records)
        self.min_chars = int(min_chars)

    def distill(self, sources: Iterable[str | Path], out_dir: str | Path) -> dict[str, Any]:
        """Read JSONL sources, write the distilled JSONL plus a manifest.

        Args:
            sources: Paths of JSONL files to scan. A single ``str``/``Path`` is
                accepted and treated as a one-element list. Any iterable
                (including a generator) is consumed exactly once.
            out_dir: Directory for ``knowledge_essence_sft.jsonl`` and
                ``knowledge_essence_manifest.json``; created if missing.

        Returns:
            The manifest dictionary that was also written to disk.
        """
        # A lone path would otherwise be iterated character by character (str)
        # or raise TypeError (Path).
        if isinstance(sources, (str, Path)):
            sources = [sources]
        # Materialize once: the paths are needed for scanning and again for
        # the manifest, and a generator would be empty the second time.
        source_paths = [Path(path) for path in sources]

        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        output_path = out / "knowledge_essence_sft.jsonl"
        manifest_path = out / "knowledge_essence_manifest.json"

        reject_counts: Counter[str] = Counter()
        domain_counts: Counter[str] = Counter()
        seen: set[str] = set()
        rows: list[dict[str, Any]] = []
        scanned = 0

        for source_path in source_paths:
            if not source_path.exists():
                reject_counts["missing_source"] += 1
                continue
            if len(rows) >= self.max_records:
                # Cap already reached; do not read the remaining files.
                continue
            for line_no, line in self._iter_lines(source_path):
                if len(rows) >= self.max_records:
                    break
                if not line.strip():
                    continue
                scanned += 1
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    reject_counts["invalid_json"] += 1
                    continue
                # Non-object JSON values (strings, lists, numbers) are treated
                # as plain text records.
                text, domain, source = _text_from_record(
                    record if isinstance(record, dict) else {"text": str(record)}
                )
                reason = self._reject_reason(text)
                if reason:
                    reject_counts[reason] += 1
                    continue
                essence = self._extract_essence(text, domain)
                # Hash the canonical JSON form so identical essences collide
                # regardless of key order.
                digest = sha256(
                    json.dumps(essence, ensure_ascii=False, sort_keys=True).encode("utf-8")
                ).hexdigest()
                if digest in seen:
                    reject_counts["duplicate_essence"] += 1
                    continue
                seen.add(digest)
                domain_counts[domain] += 1
                rows.append(
                    {
                        "kind": "knowledge_essence",
                        "domain": domain,
                        "essence": essence,
                        "messages": self._to_messages(essence),
                        "provenance": {
                            "source_file": str(source_path),
                            "source": source,
                            "line": line_no,
                            "sha256": digest,
                        },
                    }
                )

        output_path.write_text(
            "\n".join(json.dumps(row, ensure_ascii=False, sort_keys=True) for row in rows)
            + ("\n" if rows else ""),
            encoding="utf-8",
        )
        report = {
            "schema_version": "tinymind-knowledge-essence-distiller-v1",
            "sources": [str(path) for path in source_paths],
            "scanned_records": scanned,
            "kept_records": len(rows),
            "rejected_records": sum(reject_counts.values()),
            "reject_counts": dict(sorted(reject_counts.items())),
            "domain_counts": dict(sorted(domain_counts.items())),
            "output_jsonl": str(output_path),
            "manifest_path": str(manifest_path),
            "claim_gate": {
                "trainable_sft_ready": bool(rows),
                "raw_memory_replay_allowed": False,
                "secret_scan_applied": True,
                "fixed_template_rejected": True,
                "world_best_claim_allowed": False,
            },
        }
        manifest_path.write_text(
            json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        return report

    @staticmethod
    def _iter_lines(source_path: Path) -> Iterable[tuple[int, str]]:
        """Yield ``(line_number, line)`` pairs from a JSONL file, 1-based.

        The file is streamed and split on real newlines only. ``str.splitlines``
        also breaks on U+2028, U+2029, form feed and similar characters, which
        may appear unescaped inside JSON strings and would cut a valid record
        in two. ``utf-8-sig`` drops an optional leading BOM so the first record
        still parses; undecodable bytes are ignored.
        """
        with source_path.open(encoding="utf-8-sig", errors="ignore") as handle:
            for line_no, line in enumerate(handle, start=1):
                yield line_no, line.rstrip("\n")

    def _reject_reason(self, text: str) -> str | None:
        """Return the rejection label for ``text``, or ``None`` to keep it.

        Checks run in a fixed order: secret-like token, fixed template marker,
        minimum length, then low information.
        """
        if SECRET_RE.search(text):
            return "secret_like"
        # Markers are matched against lower-cased, whitespace-collapsed text.
        normalized = " ".join(text.lower().split())
        if any(marker in normalized for marker in FIXED_TEMPLATE_MARKERS):
            return "fixed_template"
        if len(text.strip()) < self.min_chars:
            return "too_short"
        if self._low_information(text):
            return "low_information"
        return None

    @staticmethod
    def _low_information(text: str) -> bool:
        """Return True for texts with fewer than 8 tokens or under 25% unique tokens."""
        # The explicit Thai range keeps combining vowel/tone marks, which \w
        # alone does not match, inside the token.
        words = re.findall(r"[\wก-๙]+", text.lower())
        if len(words) < 8:
            return True
        return len(set(words)) / len(words) < 0.25

    def _extract_essence(self, text: str, domain: str) -> dict[str, str]:
        """Build the five-field essence card for one record.

        Whitespace is collapsed and numeric literals are replaced by
        ``<value>`` before any field is derived, so instance-specific numbers
        do not reach the card.
        """
        cleaned = re.sub(r"\s+", " ", text).strip()
        generic = re.sub(r"\b\d[\d,]*(?:\.\d+)?%?\b", "<value>", cleaned)
        sentences = re.split(r"(?<=[.!?。!?])\s+|\n+", generic)
        # The longest sentence drives the principle's keyword checks.
        candidates = (sentence.strip() for sentence in sentences if sentence.strip())
        strongest = max(candidates, key=len, default=generic)
        return {
            "principle": self._principle_from_text(strongest, domain),
            "application_pattern": self._application_pattern(generic),
            "procedure": self._procedure_from_text(generic, domain),
            "pitfall": self._pitfall_from_text(generic, domain),
            "check": self._check_from_text(domain),
        }

    @staticmethod
    def _principle_from_text(text: str, domain: str) -> str:
        """Pick the principle sentence from the domain and keywords in ``text``."""
        lowered = text.lower()
        if domain in {"math", "thai_math_grounding"} or any(
            term in lowered for term in ["calculate", "คำนวณ", "คูณ", "หาร"]
        ):
            return "Separate variables, operation, and units before calculating; explain the reusable method rather than memorizing values."
        if "tool" in lowered or "json" in lowered:
            return "Validate tool-call shape before execution: command name must be explicit and arguments must match the schema."
        if "code" in domain or "coding" in domain:
            return "Explain code by invariants, inputs, outputs, failure modes, and tests instead of copying implementation text."
        return f"Extract the stable concept in {domain}: define the rule, when it applies, and how to verify it."

    @staticmethod
    def _procedure_from_text(text: str, domain: str) -> str:
        """Pick the four-step procedure; tool/JSON keywords take priority over math."""
        lowered = text.lower()
        if "tool" in lowered or "json" in lowered:
            return "1. Parse intent. 2. Validate schema. 3. Execute only after validation. 4. Report observation separately from conclusion."
        if domain in {"math", "thai_math_grounding"} or "คำนวณ" in text:
            return "1. Identify given values. 2. Choose the operation. 3. Compute step by step. 4. State unit and sanity-check magnitude."
        return "1. Identify the invariant. 2. Apply it to the new context. 3. Check edge cases. 4. State uncertainty and evidence."

    @staticmethod
    def _pitfall_from_text(text: str, domain: str) -> str:
        """Pick the pitfall warning; the tool keyword takes priority over math."""
        if "tool" in text.lower():
            return "Do not claim a tool result before a valid tool call and observation exist."
        if domain in {"math", "thai_math_grounding"} or "คำนวณ" in text:
            return "Do not memorize the numeric answer; recompute when numbers, units, or rates change."
        return "Do not replay source wording; preserve meaning while adapting to the user's actual question."

    @staticmethod
    def _check_from_text(domain: str) -> str:
        """Return the holdout-check sentence for ``domain``."""
        return f"Holdout check for {domain}: paraphrase the task with changed surface details and verify the same principle still works."

    @staticmethod
    def _application_pattern(text: str) -> str:
        """Return the first 28 tokens of ``text``, capped at 220 characters.

        ``<`` and ``>`` are token characters so ``<value>`` placeholders
        survive. A fixed fallback sentence is returned when no token is found.
        """
        words = re.findall(r"[\wก-๙<>]+", text)
        compact = " ".join(words[:28])
        if not compact:
            return "Apply the principle to a paraphrased task with changed surface details."
        return compact[:220]

    @staticmethod
    def _to_messages(essence: dict[str, str]) -> list[dict[str, str]]:
        """Render an essence card as a fixed Thai user prompt plus assistant answer."""
        user = "สกัดแก่นความรู้จากกรณีนี้ให้ใช้กับโจทย์ใหม่ได้ โดยห้ามท่องจำข้อความเดิม"
        assistant = (
            f"หลักการ: {essence['principle']}\n"
            f"รูปแบบการใช้: {essence['application_pattern']}\n"
            f"ขั้นตอน: {essence['procedure']}\n"
            f"จุดพลาดที่ต้องกัน: {essence['pitfall']}\n"
            f"วิธีตรวจ: {essence['check']}"
        )
        return [
            {"role": "user", "content": user},
            {"role": "assistant", "content": assistant},
        ]

# Xet Storage Details (hosting-page residue, not part of the module):
#   Size: 10.1 kB
#   Xet hash: 6af3210070a80cfe32644c96e8555f98ff0b2f6e5071ccb04fcd3097b100c4a3
#   Xet efficiently stores files, intelligently splitting them into unique
#   chunks and accelerating uploads and downloads. More info.