Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /data /lineage_weaver.py
| """HyperPure lineage weaver. | |
| Builds a deterministic provenance graph from tiny answer fragments back to | |
| record, skill, domain, and source hashes. This lets TinyMind trace a partial | |
| piece of knowledge to its origin instead of treating dataset rows as anonymous | |
| text. | |
| """ | |
| from __future__ import annotations | |
| from collections import defaultdict | |
| from datetime import datetime, timezone | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| import re | |
# Version tag stamped into every lineage graph this module emits.
SCHEMA_VERSION = "tinymind-hyper-pure-lineage-v1"
# Token pattern: runs of word characters or characters from the Thai block
# (U+0E00-U+0E7F). The explicit Thai range presumably exists so Thai combining
# vowel/tone marks stay inside a token rather than splitting it.
TOKEN_RE = re.compile(r"[\w\u0E00-\u0E7F]+", flags=re.UNICODE)
def _sha256(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def _tokens(text: str) -> set[str]:
    """Return the set of lower-cased TOKEN_RE matches in *text* that are at least 3 characters long."""
    found: set[str] = set()
    for match in TOKEN_RE.finditer(text):
        word = match.group(0)
        # Length is checked on the original match, before lower-casing.
        if len(word) < 3:
            continue
        found.add(word.lower())
    return found
def _fragments(text: str, max_fragments: int = 8) -> list[str]:
    """Split *text* into at most *max_fragments* sentence/line fragments.

    Segments are cut after ``.``, ``!``, ``?`` or ``。`` when followed by
    whitespace, and at runs of newlines. Only stripped segments of at least
    48 characters are kept, each truncated to 280 characters.

    If no segment is long enough but *text* is not blank, the whole stripped
    text (truncated to 280 characters) is returned as the single fragment.

    Args:
        text: Source text, e.g. a dataset answer.
        max_fragments: Upper bound on the number of fragments returned.
            A non-positive value yields an empty list.

    Returns:
        The fragments in their order of appearance.
    """
    # Guard before the loop: the cap below is only checked after an append,
    # and the fallback would otherwise still add one fragment.
    if max_fragments <= 0:
        return []
    pieces: list[str] = []
    for raw in re.split(r"(?<=[.!?。])\s+|\n+", text):
        item = raw.strip()
        if len(item) >= 48:
            pieces.append(item[:280])
            if len(pieces) >= max_fragments:
                break
    if not pieces:
        stripped = text.strip()
        if stripped:
            pieces.append(stripped[:280])
    return pieces
class HyperPureLineageWeaver:
    """Weave a JSONL dataset into a fragment-level provenance graph.

    Every non-blank dataset line is parsed as one JSON record. A record must
    carry a ``domain`` key; ``id``, ``skill``, ``question``, ``answer``,
    ``source``, ``source_sha256`` and ``license`` are read when present.
    The graph links each answer fragment to its record, the record to its
    skill and its source, and the skill to its domain.
    """

    def __init__(self, dataset_path: str | Path):
        # Only the path is stored; the dataset is read lazily by build/trace.
        self.dataset_path = Path(dataset_path)

    def build(self, out_dir: str | Path) -> dict:
        """Build the lineage graph and persist it under *out_dir*.

        Creates *out_dir* if needed and writes:
        ``hyper_pure_lineage_graph.json`` (the full graph),
        ``hyper_pure_fragment_index.jsonl`` (one fragment-index row per line)
        and ``hyper_pure_lineage_report.md`` (a short Markdown summary).

        Returns:
            The graph dict, including ``graph_path``, ``fragment_index_path``
            and ``report_path``.

        Raises:
            FileNotFoundError: if the dataset file does not exist.
            json.JSONDecodeError: if a dataset line is not valid JSON.
            KeyError: if a record has no ``domain`` key.
        """
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        graph = self._weave()

        graph_path = out / "hyper_pure_lineage_graph.json"
        index_path = out / "hyper_pure_fragment_index.jsonl"
        report_path = out / "hyper_pure_lineage_report.md"
        graph["graph_path"] = str(graph_path)
        graph["fragment_index_path"] = str(index_path)
        graph["report_path"] = str(report_path)

        with index_path.open("w", encoding="utf-8", newline="\n") as f:
            for row in graph["fragment_index"]:
                f.write(json.dumps(row, ensure_ascii=False, sort_keys=True) + "\n")
        report_path.write_text(self._markdown(graph), encoding="utf-8")
        # The graph is written exactly once, last, so the persisted JSON
        # already contains every artifact path assigned above.
        graph_path.write_text(
            json.dumps(graph, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        return graph

    def trace(self, query: str, graph_path: str | Path | None = None, top_k: int = 5) -> list[dict]:
        """Rank fragment-index rows by how many query tokens they share.

        Args:
            query: Free text, tokenised the same way fragments are.
            graph_path: A JSON file holding either a graph object with a
                ``fragment_index`` key (as written by :meth:`build`) or a bare
                JSON list of index rows. When omitted, the graph is assembled
                in memory from the dataset; nothing is written to disk.
            top_k: Maximum number of hits returned (at least one slot is
                always allowed).

        Returns:
            Copies of the matching index rows with added ``score`` (fraction
            of query tokens matched) and ``matched_terms`` keys, best first.
            Rows sharing no token with the query are omitted.
        """
        if graph_path:
            loaded = json.loads(Path(graph_path).read_text(encoding="utf-8"))
            fragment_index = loaded["fragment_index"] if isinstance(loaded, dict) else loaded
        else:
            fragment_index = self._weave()["fragment_index"]

        q = _tokens(query)
        hits = []
        for row in fragment_index:
            overlap = q & set(row.get("terms", []))
            if not overlap:
                continue
            score = len(overlap) / max(len(q), 1)
            hits.append({**row, "score": score, "matched_terms": sorted(overlap)})
        # Python's sort stays stable with reverse=True, so ties keep index order.
        hits.sort(key=lambda hit: (hit["score"], len(hit["matched_terms"])), reverse=True)
        return hits[: max(1, int(top_k))]

    def _weave(self) -> dict:
        """Assemble the lineage graph in memory from the dataset (no file output)."""
        nodes: dict[str, dict] = {}
        edges: list[dict] = []
        fragment_index: list[dict] = []

        def add_node(node_id: str, kind: str, payload: dict) -> None:
            # First occurrence wins: a node shared by several records keeps
            # the payload of the first record that introduced it.
            nodes.setdefault(node_id, {"id": node_id, "kind": kind, **payload})

        for row in self._load_rows():
            # Records without a truthy id get an id derived from their content hash.
            record_id = row.get("id") or _sha256(json.dumps(row, ensure_ascii=False, sort_keys=True))[:24]
            domain = row["domain"]
            skill = row.get("skill", "unknown")
            record_node = f"record:{record_id}"
            domain_node = f"domain:{domain}"
            skill_node = f"skill:{domain}:{skill}"
            # Prefer the dataset-supplied source hash; otherwise hash the source string.
            source_node = f"source:{row.get('source_sha256') or _sha256(row.get('source', 'unknown'))}"

            add_node(record_node, "record", {"record_id": record_id, "question": row.get("question", "")})
            add_node(domain_node, "domain", {"domain": domain})
            add_node(skill_node, "skill", {"domain": domain, "skill": skill})
            add_node(
                source_node,
                "source",
                {
                    "source": row.get("source", ""),
                    "source_sha256": row.get("source_sha256", ""),
                    "license": row.get("license", ""),
                },
            )
            edges.extend(
                [
                    {"from": record_node, "to": skill_node, "type": "belongs_to_skill"},
                    {"from": skill_node, "to": domain_node, "type": "belongs_to_domain"},
                    {"from": record_node, "to": source_node, "type": "derived_from_source"},
                ]
            )

            # A missing or JSON-null answer contributes no fragments.
            answer = row.get("answer") or ""
            for frag in _fragments(answer):
                frag_hash = _sha256(frag)
                fragment_node = f"fragment:{frag_hash[:24]}"
                terms = sorted(_tokens(frag))[:64]
                add_node(
                    fragment_node,
                    "fragment",
                    {
                        "fragment_sha256": frag_hash,
                        "preview": frag[:160],
                        "terms": terms,
                    },
                )
                edges.append({"from": fragment_node, "to": record_node, "type": "fragment_of_record"})
                fragment_index.append(
                    {
                        "fragment_id": fragment_node,
                        "fragment_sha256": frag_hash,
                        "record_id": record_id,
                        "domain": domain,
                        "skill": skill,
                        "source_sha256": row.get("source_sha256", ""),
                        # Separate list so the node payload and index row never alias.
                        "terms": list(terms),
                        "lineage_path": [fragment_node, record_node, skill_node, domain_node, source_node],
                    }
                )

        return {
            "schema_version": SCHEMA_VERSION,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "dataset_path": str(self.dataset_path),
            "node_count": len(nodes),
            "edge_count": len(edges),
            "fragment_count": len(fragment_index),
            "nodes": list(nodes.values()),
            "edges": edges,
            "fragment_index": fragment_index,
            "gate": self._gate(nodes, edges, fragment_index),
            "world_best_claim_allowed": False,
        }

    def _load_rows(self) -> list[dict]:
        """Parse every non-blank dataset line as JSON and return the records in file order."""
        rows = []
        with self.dataset_path.open("r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    rows.append(json.loads(line))
        return rows

    def _gate(self, nodes: dict[str, dict], edges: list[dict], fragment_index: list[dict]) -> dict:
        """Check the structural integrity of an assembled graph.

        The gate passes only when there is at least one fragment, every edge
        endpoint is a known node, every fragment's ``lineage_path`` has exactly
        five known nodes, and every fragment row carries a non-empty
        dataset-supplied ``source_sha256``.
        """
        node_ids = set(nodes)
        edge_ok = all(edge["from"] in node_ids and edge["to"] in node_ids for edge in edges)
        lineage_ok = all(
            len(row.get("lineage_path", [])) == 5 and all(node in node_ids for node in row["lineage_path"])
            for row in fragment_index
        )
        source_ok = all(row.get("source_sha256") for row in fragment_index)
        return {
            "passed": bool(fragment_index) and edge_ok and lineage_ok and source_ok,
            "edge_integrity": edge_ok,
            "lineage_path_integrity": lineage_ok,
            "source_hash_integrity": source_ok,
            "reason": "every fragment must trace to record->skill->domain->source hash",
        }

    def _markdown(self, graph: dict) -> str:
        """Render a short Markdown summary: counts, gate status and per-domain fragment-row counts."""
        domains = defaultdict(int)
        for row in graph["fragment_index"]:
            domains[row["domain"]] += 1
        lines = [
            "# TinyMind HyperPure Lineage Graph",
            "",
            f"- Fragments: {graph['fragment_count']}",
            f"- Nodes: {graph['node_count']}",
            f"- Edges: {graph['edge_count']}",
            f"- Gate passed: {graph['gate']['passed']}",
            "- World-best claim: false",
            "",
            "## Fragment Coverage",
            "",
        ]
        for domain, count in sorted(domains.items()):
            lines.append(f"- {domain}: {count}")
        return "\n".join(lines) + "\n"
def build_hyper_pure_lineage(dataset_path: str | Path, out_dir: str | Path) -> dict:
    """Weave *dataset_path* and write the lineage artifacts into *out_dir*.

    Thin convenience wrapper that returns whatever
    ``HyperPureLineageWeaver.build`` returns.
    """
    weaver = HyperPureLineageWeaver(dataset_path)
    return weaver.build(out_dir)
Xet Storage Details
- Size:
- 8.38 kB
- Xet hash:
- 1278d79de5f72f66b1d96597dd873971505789e9160e15a1fcf5eceb8242f7a7
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.