Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /data /universal_context.py
"""Universal context ingestion for files, folders, code, and media manifests.

This is the exact-memory layer: it does not promise infinite hidden-state
context. It stores every ingested artifact with hashes and searchable chunks so
the model can retrieve source-grounded evidence on demand.
"""
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| import base64 | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| import re | |
| from typing import Iterable | |
| import zlib | |
| TEXT_EXTENSIONS = { | |
| ".txt", ".md", ".json", ".jsonl", ".yaml", ".yml", ".toml", ".csv", ".tsv", | |
| ".py", ".js", ".jsx", ".ts", ".tsx", ".rs", ".go", ".java", ".kt", ".swift", | |
| ".c", ".h", ".cpp", ".hpp", ".cs", ".php", ".rb", ".lua", ".sh", ".ps1", | |
| ".bat", ".cmd", ".html", ".css", ".sql", ".xml", ".ini", ".cfg", | |
| } | |
| IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".tiff"} | |
| AUDIO_EXTENSIONS = {".wav", ".mp3", ".m4a", ".flac", ".ogg"} | |
| VIDEO_EXTENSIONS = {".mp4", ".mov", ".mkv", ".webm", ".avi"} | |
| DOCUMENT_EXTENSIONS = {".pdf", ".docx", ".pptx", ".xlsx"} | |
| TOKEN_RE = re.compile(r"[\w\u0E00-\u0E7F]+", re.UNICODE) | |
| JUNK_MARKERS = ("lorem ipsum", "todo", "fixme", "???", "as an ai language model") | |
| EXCLUDED_DIR_NAMES = {".git", "__pycache__", ".pytest_cache", ".mypy_cache", "node_modules", ".venv", "venv"} | |
| EXCLUDED_FILE_PREFIXES = ("universal_context_chunks", "universal_context_compressed", "universal_context_manifest") | |
| STOPWORDS = { | |
| "the", "and", "or", "is", "are", "what", "how", "why", "this", "that", "repo", | |
| "ข้อมูล", "อะไร", "อย่างไร", "คือ", "คืออะไร", "ใน", "ที่", "ไม่มี", "ลับ", | |
| } | |
def _sha256_bytes(payload: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``payload``."""
    return hashlib.sha256(payload).hexdigest()
def _tokens(text: str) -> set[str]:
    """Return the set of lower-cased search terms found in ``text``.

    Tokens are the matches of ``TOKEN_RE``. A token is dropped when it is
    shorter than two characters or when its lower-cased form is in
    ``STOPWORDS``.
    """
    terms: set[str] = set()
    for raw in TOKEN_RE.findall(text):
        # The length test applies to the token as matched (before lowering).
        if len(raw) < 2:
            continue
        lowered = raw.lower()
        if lowered not in STOPWORDS:
            terms.add(lowered)
    return terms
def _kind(path: Path) -> str:
    """Classify ``path`` by its lower-cased suffix.

    Returns one of ``"text_or_code"``, ``"image"``, ``"audio"``, ``"video"``,
    ``"document_binary"``, or ``"binary"`` when the suffix is in none of the
    known extension sets.
    """
    suffix = path.suffix.lower()
    # Checked in this order; the first matching extension set wins.
    categories = (
        (TEXT_EXTENSIONS, "text_or_code"),
        (IMAGE_EXTENSIONS, "image"),
        (AUDIO_EXTENSIONS, "audio"),
        (VIDEO_EXTENSIONS, "video"),
        (DOCUMENT_EXTENSIONS, "document_binary"),
    )
    for extensions, label in categories:
        if suffix in extensions:
            return label
    return "binary"
@dataclass
class ContextChunk:
    """One contiguous slice of a text/code file stored in the ledger.

    The ``@dataclass`` decorator is required: the ledger constructs chunks
    with keyword arguments and serialises them through ``chunk.__dict__``.
    """

    chunk_id: int  # position of the chunk in the ledger's chunk list
    file_id: str  # identifier of the file the chunk was cut from
    rel_path: str  # path string of the source file as recorded at ingestion
    start_char: int  # inclusive start offset into the decoded file text
    end_char: int  # exclusive end offset into the decoded file text
    text: str  # the chunk's text
    sha256: str  # hex SHA-256 of the UTF-8 encoded ``text``
class UniversalContextLedger:
    """Hash files, chunk text/code, and answer token-overlap queries.

    Three artifacts are written directly under ``root``:

    * ``universal_context_manifest.json`` - per-file rows plus a run summary.
    * ``universal_context_chunks.jsonl`` - one plain-text chunk per line.
    * ``universal_context_compressed.jsonl`` - the same chunks, zlib
      compressed and base64 encoded.
    """

    def __init__(self, root: str | Path, chunk_chars: int = 4096):
        """Bind the ledger to ``root``.

        Args:
            root: Directory that holds the ledger artifacts.
            chunk_chars: Chunk size in characters; values below 256 are
                raised to 256.
        """
        self.root = Path(root)
        self.chunk_chars = max(256, int(chunk_chars))
        self.manifest_path = self.root / "universal_context_manifest.json"
        self.chunks_path = self.root / "universal_context_chunks.jsonl"
        self.compressed_path = self.root / "universal_context_compressed.jsonl"

    def ingest_paths(self, paths: Iterable[str | Path]) -> dict:
        """Ingest files and directories, then rewrite all ledger artifacts.

        Directories are walked recursively. Every non-excluded file is hashed
        and recorded; files classified as ``"text_or_code"`` are additionally
        decoded as UTF-8 (undecodable bytes replaced) and cut into
        ``chunk_chars``-sized chunks. Chunks rejected by ``_pure_chunk`` are
        counted as blocked, and chunks whose hash was already seen in this
        run are skipped.

        Args:
            paths: Files and/or directories to ingest.

        Returns:
            The manifest that was written, plus a ``"manifest_path"`` key.
        """
        self.root.mkdir(parents=True, exist_ok=True)
        files: list[dict] = []
        chunks: list[ContextChunk] = []
        seen_chunk_hashes: set[str] = set()
        blocked_chunks = 0

        for base in paths:
            p = Path(base)
            if p.is_dir():
                candidates = [item for item in p.rglob("*") if item.is_file()]
            else:
                candidates = [p]
            for file_path in sorted(candidates):
                # ``is_file`` is False for missing paths as well.
                if not file_path.is_file():
                    continue
                if self._excluded(file_path):
                    continue
                payload = file_path.read_bytes()
                digest = _sha256_bytes(payload)
                kind = _kind(file_path)
                rel = str(file_path)
                file_id = digest[:24]
                row = {
                    "file_id": file_id,
                    "path": rel,
                    "name": file_path.name,
                    "extension": file_path.suffix.lower(),
                    "kind": kind,
                    "bytes": len(payload),
                    "sha256": digest,
                    "exact_content_stored": kind == "text_or_code",
                }
                if kind == "text_or_code":
                    text = payload.decode("utf-8", errors="replace")
                    row["chars"] = len(text)
                    for start in range(0, len(text), self.chunk_chars):
                        piece = text[start : start + self.chunk_chars]
                        if not self._pure_chunk(piece):
                            blocked_chunks += 1
                            continue
                        piece_sha = _sha256_bytes(piece.encode("utf-8"))
                        if piece_sha in seen_chunk_hashes:
                            continue
                        seen_chunk_hashes.add(piece_sha)
                        chunks.append(
                            ContextChunk(
                                chunk_id=len(chunks),
                                file_id=file_id,
                                rel_path=rel,
                                start_char=start,
                                end_char=start + len(piece),
                                text=piece,
                                sha256=piece_sha,
                            )
                        )
                files.append(row)

        self._write_chunks(chunks)
        compression = self._write_compressed_chunks(chunks)
        manifest = {
            "schema_version": "tinymind-universal-context-ledger-v1",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "chunk_chars": self.chunk_chars,
            "file_count": len(files),
            "chunk_count": len(chunks),
            "blocked_junk_chunks": blocked_chunks,
            "total_bytes": sum(int(row["bytes"]) for row in files),
            "text_or_code_files": sum(1 for row in files if row["kind"] == "text_or_code"),
            "media_files": sum(1 for row in files if row["kind"] in {"image", "audio", "video"}),
            "document_binary_files": sum(1 for row in files if row["kind"] == "document_binary"),
            "files": files,
            "chunks_path": str(self.chunks_path),
            "compressed_chunks_path": str(self.compressed_path),
            "compression": compression,
            "hidden_state_tokens_stored": 0,
            "purity_policy": "text/code chunks with junk markers are blocked; duplicate exact chunks are removed; media is hashed as source manifest",
            "guarantee": "Exact recall is provided by hashed compressed external archive chunks, not by unbounded model hidden state.",
        }
        # The digest covers the manifest as it stands before this key is added.
        manifest["manifest_sha256"] = _sha256_bytes(
            json.dumps(manifest, ensure_ascii=False, sort_keys=True).encode("utf-8")
        )
        self.manifest_path.write_text(
            json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        return {**manifest, "manifest_path": str(self.manifest_path)}

    def _excluded(self, file_path: Path) -> bool:
        """Return True when ``file_path`` must not be ingested.

        A file is excluded when any of its path components is in
        ``EXCLUDED_DIR_NAMES``, when its name starts with one of
        ``EXCLUDED_FILE_PREFIXES``, or when it resolves to a location inside
        the ledger root.
        """
        if set(file_path.parts) & EXCLUDED_DIR_NAMES:
            return True
        if any(file_path.name.startswith(prefix) for prefix in EXCLUDED_FILE_PREFIXES):
            return True
        try:
            file_path.resolve().relative_to(self.root.resolve())
        except ValueError:
            # Not under the ledger root, so it is eligible.
            return False
        return True

    def _write_chunks(self, chunks: list[ContextChunk]) -> None:
        """Overwrite the plain chunk file with one JSON object per chunk."""
        with self.chunks_path.open("w", encoding="utf-8", newline="\n") as f:
            for chunk in chunks:
                f.write(json.dumps(vars(chunk), ensure_ascii=False, sort_keys=True) + "\n")

    def _pure_chunk(self, text: str) -> bool:
        """Return True when ``text`` has non-whitespace content and no junk marker."""
        lowered = text.lower()
        if any(marker in lowered for marker in JUNK_MARKERS):
            return False
        return bool(text.strip())

    def _write_compressed_chunks(self, chunks: list[ContextChunk]) -> dict:
        """Overwrite the compressed chunk file and return size statistics.

        Each chunk's UTF-8 text is compressed with zlib level 9 and stored
        base64-encoded next to the chunk's metadata.

        Returns:
            A dict with the raw and compressed byte totals, their ratio, the
            percentage saved, and the number of chunks written (reported as
            ``"deduplicated_chunks"``).
        """
        raw_bytes = 0
        packed_bytes = 0
        with self.compressed_path.open("w", encoding="utf-8", newline="\n") as f:
            for chunk in chunks:
                payload = chunk.text.encode("utf-8")
                packed = zlib.compress(payload, level=9)
                raw_bytes += len(payload)
                packed_bytes += len(packed)
                record = {
                    "chunk_id": chunk.chunk_id,
                    "file_id": chunk.file_id,
                    "rel_path": chunk.rel_path,
                    "start_char": chunk.start_char,
                    "end_char": chunk.end_char,
                    "sha256": chunk.sha256,
                    "codec": "zlib9+base64",
                    "payload_b64": base64.b64encode(packed).decode("ascii"),
                }
                f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")
        # ``max(..., 1)`` avoids division by zero when nothing was written.
        ratio = packed_bytes / max(raw_bytes, 1)
        return {
            "raw_text_bytes": raw_bytes,
            "compressed_bytes": packed_bytes,
            "ratio": ratio,
            "space_saved_percent": 100.0 * (1.0 - ratio),
            "deduplicated_chunks": len(chunks),
        }

    def recall_compressed_chunk(self, chunk_id: int) -> dict:
        """Decompress and verify a single chunk from the compressed archive.

        The archive is streamed line by line rather than split with
        ``str.splitlines()``: the records are written with
        ``ensure_ascii=False``, so a path containing U+2028, U+2029 or U+0085
        would otherwise be cut in the middle of a record. Blank lines are
        skipped.

        Args:
            chunk_id: Identifier of the chunk to restore.

        Returns:
            The stored record without ``"payload_b64"``, plus the restored
            ``"text"``.

        Raises:
            KeyError: No record with ``chunk_id`` exists.
            ValueError: The restored text does not match the stored hash.
        """
        with self.compressed_path.open("r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                row = json.loads(line)
                if int(row["chunk_id"]) != int(chunk_id):
                    continue
                packed = base64.b64decode(row["payload_b64"])
                text = zlib.decompress(packed).decode("utf-8")
                digest = _sha256_bytes(text.encode("utf-8"))
                if digest != row["sha256"]:
                    raise ValueError(f"compressed chunk {chunk_id} hash mismatch")
                restored = {k: v for k, v in row.items() if k != "payload_b64"}
                restored["text"] = text
                return restored
        raise KeyError(chunk_id)

    def _chunks(self) -> list[dict]:
        """Load every row of the plain chunk file; empty list if it is absent."""
        if not self.chunks_path.exists():
            return []
        with self.chunks_path.open("r", encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    def query(self, text: str, top_k: int = 5, min_score: float = 0.2) -> list[dict]:
        """Rank stored chunks by the fraction of query terms they contain.

        Args:
            text: Free-text query; reduced to terms with ``_tokens``.
            top_k: Maximum number of hits; values below 1 are treated as 1.
            min_score: Hits scoring below this fraction are dropped.

        Returns:
            Hit dicts sorted by descending score, each with the matched terms,
            the chunk's location and hash, and a 300-character preview.
        """
        q = _tokens(text)
        hits = []
        for chunk in self._chunks():
            overlap = q & _tokens(chunk["text"])
            # An empty query yields score 0 rather than a division error.
            score = len(overlap) / max(len(q), 1)
            if score < min_score:
                continue
            hits.append(
                {
                    "score": score,
                    "matched_terms": sorted(overlap),
                    "file_id": chunk["file_id"],
                    "path": chunk["rel_path"],
                    "start_char": chunk["start_char"],
                    "end_char": chunk["end_char"],
                    "chunk_sha256": chunk["sha256"],
                    "preview": chunk["text"][:300],
                }
            )
        hits.sort(key=lambda row: row["score"], reverse=True)
        return hits[: max(1, int(top_k))]
Xet Storage Details
- Size:
- 11.4 kB
- Xet hash:
- 971db159b532ac049491ac41e9922a79b107a3e24685ebbccd3037b351021fce
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.