bbkdevops's picture
download
raw
8.38 kB
"""HyperPure lineage weaver.
Builds a deterministic provenance graph from tiny answer fragments back to
record, skill, domain, and source hashes. This lets TinyMind trace a partial
piece of knowledge to its origin instead of treating dataset rows as anonymous
text.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timezone
import hashlib
import json
from pathlib import Path
import re
SCHEMA_VERSION = "tinymind-hyper-pure-lineage-v1"
TOKEN_RE = re.compile(r"[\w\u0E00-\u0E7F]+", re.UNICODE)
def _sha256(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def _tokens(text: str) -> set[str]:
return {tok.lower() for tok in TOKEN_RE.findall(text) if len(tok) >= 3}
def _fragments(text: str, max_fragments: int = 8) -> list[str]:
pieces = []
for raw in re.split(r"(?<=[.!?。])\s+|\n+", text):
item = raw.strip()
if len(item) >= 48:
pieces.append(item[:280])
if len(pieces) >= max_fragments:
break
if not pieces and text.strip():
pieces.append(text.strip()[:280])
return pieces
class HyperPureLineageWeaver:
def __init__(self, dataset_path: str | Path):
self.dataset_path = Path(dataset_path)
def build(self, out_dir: str | Path) -> dict:
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
nodes: dict[str, dict] = {}
edges: list[dict] = []
fragment_index: list[dict] = []
records = self._load_rows()
def add_node(node_id: str, kind: str, payload: dict) -> None:
nodes.setdefault(node_id, {"id": node_id, "kind": kind, **payload})
for row in records:
record_id = row.get("id") or _sha256(json.dumps(row, ensure_ascii=False, sort_keys=True))[:24]
record_node = f"record:{record_id}"
domain_node = f"domain:{row['domain']}"
skill_node = f"skill:{row['domain']}:{row.get('skill', 'unknown')}"
source_node = f"source:{row.get('source_sha256') or _sha256(row.get('source', 'unknown'))}"
add_node(record_node, "record", {"record_id": record_id, "question": row.get("question", "")})
add_node(domain_node, "domain", {"domain": row["domain"]})
add_node(skill_node, "skill", {"domain": row["domain"], "skill": row.get("skill", "unknown")})
add_node(
source_node,
"source",
{
"source": row.get("source", ""),
"source_sha256": row.get("source_sha256", ""),
"license": row.get("license", ""),
},
)
edges.extend(
[
{"from": record_node, "to": skill_node, "type": "belongs_to_skill"},
{"from": skill_node, "to": domain_node, "type": "belongs_to_domain"},
{"from": record_node, "to": source_node, "type": "derived_from_source"},
]
)
for frag in _fragments(row.get("answer", "")):
frag_hash = _sha256(frag)
fragment_node = f"fragment:{frag_hash[:24]}"
add_node(
fragment_node,
"fragment",
{
"fragment_sha256": frag_hash,
"preview": frag[:160],
"terms": sorted(_tokens(frag))[:64],
},
)
edge = {"from": fragment_node, "to": record_node, "type": "fragment_of_record"}
edges.append(edge)
fragment_index.append(
{
"fragment_id": fragment_node,
"fragment_sha256": frag_hash,
"record_id": record_id,
"domain": row["domain"],
"skill": row.get("skill", "unknown"),
"source_sha256": row.get("source_sha256", ""),
"terms": sorted(_tokens(frag))[:64],
"lineage_path": [fragment_node, record_node, skill_node, domain_node, source_node],
}
)
graph = {
"schema_version": SCHEMA_VERSION,
"created_at": datetime.now(timezone.utc).isoformat(),
"dataset_path": str(self.dataset_path),
"node_count": len(nodes),
"edge_count": len(edges),
"fragment_count": len(fragment_index),
"nodes": list(nodes.values()),
"edges": edges,
"fragment_index": fragment_index,
"gate": self._gate(nodes, edges, fragment_index),
"world_best_claim_allowed": False,
}
graph_path = out / "hyper_pure_lineage_graph.json"
index_path = out / "hyper_pure_fragment_index.jsonl"
graph["graph_path"] = str(graph_path)
graph["fragment_index_path"] = str(index_path)
graph_path.write_text(json.dumps(graph, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
with index_path.open("w", encoding="utf-8", newline="\n") as f:
for row in fragment_index:
f.write(json.dumps(row, ensure_ascii=False, sort_keys=True) + "\n")
report_path = out / "hyper_pure_lineage_report.md"
graph["report_path"] = str(report_path)
report_path.write_text(self._markdown(graph), encoding="utf-8")
graph_path.write_text(json.dumps(graph, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
return graph
def trace(self, query: str, graph_path: str | Path | None = None, top_k: int = 5) -> list[dict]:
graph = json.loads(Path(graph_path).read_text(encoding="utf-8")) if graph_path else self.build(self.dataset_path.parent)["fragment_index"]
fragment_index = graph["fragment_index"] if isinstance(graph, dict) else graph
q = _tokens(query)
hits = []
for row in fragment_index:
terms = set(row.get("terms", []))
overlap = q & terms
score = len(overlap) / max(len(q), 1)
if score <= 0:
continue
hits.append({**row, "score": score, "matched_terms": sorted(overlap)})
hits.sort(key=lambda row: (row["score"], len(row["matched_terms"])), reverse=True)
return hits[: max(1, int(top_k))]
def _load_rows(self) -> list[dict]:
rows = []
with self.dataset_path.open("r", encoding="utf-8") as f:
for line in f:
if line.strip():
rows.append(json.loads(line))
return rows
def _gate(self, nodes: dict[str, dict], edges: list[dict], fragment_index: list[dict]) -> dict:
node_ids = set(nodes)
edge_ok = all(edge["from"] in node_ids and edge["to"] in node_ids for edge in edges)
lineage_ok = all(len(row.get("lineage_path", [])) == 5 and all(node in node_ids for node in row["lineage_path"]) for row in fragment_index)
source_ok = all(row.get("source_sha256") for row in fragment_index)
return {
"passed": bool(fragment_index) and edge_ok and lineage_ok and source_ok,
"edge_integrity": edge_ok,
"lineage_path_integrity": lineage_ok,
"source_hash_integrity": source_ok,
"reason": "every fragment must trace to record->skill->domain->source hash",
}
def _markdown(self, graph: dict) -> str:
domains = defaultdict(int)
for row in graph["fragment_index"]:
domains[row["domain"]] += 1
lines = [
"# TinyMind HyperPure Lineage Graph",
"",
f"- Fragments: {graph['fragment_count']}",
f"- Nodes: {graph['node_count']}",
f"- Edges: {graph['edge_count']}",
f"- Gate passed: {graph['gate']['passed']}",
"- World-best claim: false",
"",
"## Fragment Coverage",
"",
]
for domain, count in sorted(domains.items()):
lines.append(f"- {domain}: {count}")
return "\n".join(lines) + "\n"
def build_hyper_pure_lineage(dataset_path: str | Path, out_dir: str | Path) -> dict:
return HyperPureLineageWeaver(dataset_path).build(out_dir)

Xet Storage Details

Size:
8.38 kB
·
Xet hash:
1278d79de5f72f66b1d96597dd873971505789e9160e15a1fcf5eceb8242f7a7

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.