bbkdevops's picture
download
raw
11.4 kB
"""Universal context ingestion for files, folders, code, and media manifests.
This is the exact-memory layer: it does not promise infinite hidden-state
context. It stores every ingested artifact with hashes and searchable chunks so
the model can retrieve source-grounded evidence on demand.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import base64
import hashlib
import json
from pathlib import Path
import re
from typing import Iterable
import zlib
# File suffixes (lower-case, with leading dot) that _kind() classifies as
# "text_or_code"; only these files are decoded and split into chunks.
TEXT_EXTENSIONS = {
".txt", ".md", ".json", ".jsonl", ".yaml", ".yml", ".toml", ".csv", ".tsv",
".py", ".js", ".jsx", ".ts", ".tsx", ".rs", ".go", ".java", ".kt", ".swift",
".c", ".h", ".cpp", ".hpp", ".cs", ".php", ".rb", ".lua", ".sh", ".ps1",
".bat", ".cmd", ".html", ".css", ".sql", ".xml", ".ini", ".cfg",
}
# Suffixes classified as "image", "audio", "video" and "document_binary"
# respectively. Such files are hashed and listed in the manifest but their
# content is not chunked.
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".tiff"}
AUDIO_EXTENSIONS = {".wav", ".mp3", ".m4a", ".flac", ".ogg"}
VIDEO_EXTENSIONS = {".mp4", ".mov", ".mkv", ".webm", ".avi"}
DOCUMENT_EXTENSIONS = {".pdf", ".docx", ".pptx", ".xlsx"}
# Token pattern used by _tokens(): runs of word characters or code points in
# U+0E00-U+0E7F (the Unicode Thai block).
# NOTE(review): the explicit range presumably keeps Thai combining vowel/tone
# marks inside a token, since \w alone does not match them - confirm.
TOKEN_RE = re.compile(r"[\w\u0E00-\u0E7F]+", re.UNICODE)
# Lower-case substrings; _pure_chunk() rejects any chunk whose lower-cased text
# contains one of them. Matching is by substring, not by whole word.
JUNK_MARKERS = ("lorem ipsum", "todo", "fixme", "???", "as an ai language model")
# A file is skipped when any component of its path equals one of these names.
EXCLUDED_DIR_NAMES = {".git", "__pycache__", ".pytest_cache", ".mypy_cache", "node_modules", ".venv", "venv"}
# File-name prefixes of the ledger's own output files, so they are never
# re-ingested.
EXCLUDED_FILE_PREFIXES = ("universal_context_chunks", "universal_context_compressed", "universal_context_manifest")
# Tokens dropped by _tokens() (compared after lower-casing); English and Thai.
STOPWORDS = {
"the", "and", "or", "is", "are", "what", "how", "why", "this", "that", "repo",
"ข้อมูล", "อะไร", "อย่างไร", "คือ", "คืออะไร", "ใน", "ที่", "ไม่มี", "ลับ",
}
def _sha256_bytes(payload: bytes) -> str:
    """Return the SHA-256 digest of *payload* as a lower-case hex string."""
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()
def _tokens(text: str) -> set[str]:
    """Return the set of lower-cased search tokens found in *text*.

    A token is a TOKEN_RE match at least two characters long (measured before
    lower-casing) whose lower-cased form is not listed in STOPWORDS.
    """
    found: set[str] = set()
    for raw in TOKEN_RE.findall(text):
        if len(raw) < 2:
            continue
        lowered = raw.lower()
        if lowered not in STOPWORDS:
            found.add(lowered)
    return found
def _kind(path: Path) -> str:
    """Classify *path* by its lower-cased suffix.

    Returns one of "text_or_code", "image", "audio", "video",
    "document_binary", or "binary" when the suffix is in none of the known
    extension sets.
    """
    extension = path.suffix.lower()
    # Checked in this order; the first set containing the suffix wins.
    categories = (
        (TEXT_EXTENSIONS, "text_or_code"),
        (IMAGE_EXTENSIONS, "image"),
        (AUDIO_EXTENSIONS, "audio"),
        (VIDEO_EXTENSIONS, "video"),
        (DOCUMENT_EXTENSIONS, "document_binary"),
    )
    for known_extensions, label in categories:
        if extension in known_extensions:
            return label
    return "binary"
@dataclass(frozen=True)
class ContextChunk:
    """Immutable record for one stored slice of a text/code file."""

    # Position of the chunk in the ledger's chunk list.
    chunk_id: int
    # Identifier of the source file (a prefix of the file's SHA-256 digest).
    file_id: str
    # Path of the source file as recorded at ingestion time.
    rel_path: str
    # Character offsets of the slice within the decoded file text;
    # end_char is exclusive.
    start_char: int
    end_char: int
    # The slice itself and the SHA-256 hex digest of its UTF-8 encoding.
    text: str
    sha256: str
class UniversalContextLedger:
    """On-disk ledger of ingested files plus searchable, hash-verified text chunks.

    Three artifacts live under ``root``: a JSON manifest describing every
    ingested file, a JSONL file of plain-text chunks read by :meth:`query`,
    and a JSONL file of zlib+base64 chunks read by
    :meth:`recall_compressed_chunk`.
    """

    def __init__(self, root: str | Path, chunk_chars: int = 4096):
        """Configure the ledger; nothing is written until :meth:`ingest_paths`.

        Args:
            root: Directory that holds the ledger's output files.
            chunk_chars: Maximum characters per chunk; values below 256 are
                raised to 256.
        """
        self.root = Path(root)
        self.chunk_chars = max(256, int(chunk_chars))
        self.manifest_path = self.root / "universal_context_manifest.json"
        self.chunks_path = self.root / "universal_context_chunks.jsonl"
        self.compressed_path = self.root / "universal_context_compressed.jsonl"

    def ingest_paths(self, paths: Iterable[str | Path]) -> dict:
        """Hash every file under *paths*, chunk the text files, write the ledger.

        Each call rewrites the manifest and both chunk files from scratch.
        Every accepted file gets a manifest row; only "text_or_code" files are
        decoded (UTF-8, undecodable bytes replaced) and chunked. Chunks that
        fail :meth:`_pure_chunk` are counted and dropped, and a chunk whose
        text was already stored (same SHA-256) is dropped silently. A file
        reachable through several input paths is ingested once.

        Args:
            paths: Files and/or directories; directories are walked
                recursively.

        Returns:
            The manifest dict plus a ``"manifest_path"`` key.
        """
        self.root.mkdir(parents=True, exist_ok=True)
        files: list[dict] = []
        chunks: list[ContextChunk] = []
        seen_chunk_hashes: set[str] = set()
        seen_files: set[Path] = set()
        blocked_chunks = 0
        for file_path in self._candidate_files(paths):
            # Overlapping or repeated inputs (e.g. "repo" and "repo/src") must
            # not yield duplicate manifest rows or inflate the byte totals.
            resolved = file_path.resolve()
            if resolved in seen_files:
                continue
            seen_files.add(resolved)

            payload = file_path.read_bytes()
            digest = _sha256_bytes(payload)
            kind = _kind(file_path)
            rel = str(file_path)
            file_id = digest[:24]
            row = {
                "file_id": file_id,
                "path": rel,
                "name": file_path.name,
                "extension": file_path.suffix.lower(),
                "kind": kind,
                "bytes": len(payload),
                "sha256": digest,
                "exact_content_stored": kind == "text_or_code",
            }
            if kind == "text_or_code":
                text = payload.decode("utf-8", errors="replace")
                row["chars"] = len(text)
                for start in range(0, len(text), self.chunk_chars):
                    piece = text[start : start + self.chunk_chars]
                    if not self._pure_chunk(piece):
                        blocked_chunks += 1
                        continue
                    piece_sha = _sha256_bytes(piece.encode("utf-8"))
                    if piece_sha in seen_chunk_hashes:
                        continue
                    seen_chunk_hashes.add(piece_sha)
                    chunks.append(
                        ContextChunk(
                            chunk_id=len(chunks),
                            file_id=file_id,
                            rel_path=rel,
                            start_char=start,
                            end_char=start + len(piece),
                            text=piece,
                            sha256=piece_sha,
                        )
                    )
            files.append(row)

        self._write_chunks(chunks)
        compression = self._write_compressed_chunks(chunks)
        manifest = {
            "schema_version": "tinymind-universal-context-ledger-v1",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "chunk_chars": self.chunk_chars,
            "file_count": len(files),
            "chunk_count": len(chunks),
            "blocked_junk_chunks": blocked_chunks,
            "total_bytes": sum(int(row["bytes"]) for row in files),
            "text_or_code_files": sum(1 for row in files if row["kind"] == "text_or_code"),
            "media_files": sum(1 for row in files if row["kind"] in {"image", "audio", "video"}),
            "document_binary_files": sum(1 for row in files if row["kind"] == "document_binary"),
            "files": files,
            "chunks_path": str(self.chunks_path),
            "compressed_chunks_path": str(self.compressed_path),
            "compression": compression,
            "hidden_state_tokens_stored": 0,
            "purity_policy": "text/code chunks with junk markers are blocked; duplicate exact chunks are removed; media is hashed as source manifest",
            "guarantee": "Exact recall is provided by hashed compressed external archive chunks, not by unbounded model hidden state.",
        }
        # The digest covers the manifest as it is *before* this key is added,
        # so a verifier has to remove "manifest_sha256" before re-hashing.
        manifest["manifest_sha256"] = _sha256_bytes(
            json.dumps(manifest, ensure_ascii=False, sort_keys=True).encode("utf-8")
        )
        self.manifest_path.write_text(
            json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        return {**manifest, "manifest_path": str(self.manifest_path)}

    def _candidate_files(self, paths: Iterable[str | Path]) -> Iterable[Path]:
        """Yield the regular, non-excluded files named by *paths*, sorted per input."""
        for base in paths:
            base_path = Path(base)
            if base_path.is_dir():
                candidates = [item for item in base_path.rglob("*") if item.is_file()]
            else:
                candidates = [base_path]
            for file_path in sorted(candidates):
                # is_file() is already False for a path that does not exist.
                if file_path.is_file() and not self._excluded(file_path):
                    yield file_path

    def _excluded(self, file_path: Path) -> bool:
        """Return True when *file_path* must not be ingested.

        A file is excluded when any path component is in EXCLUDED_DIR_NAMES,
        when its name starts with one of EXCLUDED_FILE_PREFIXES, or when it
        lives inside the ledger's own root directory.
        """
        if set(file_path.parts) & EXCLUDED_DIR_NAMES:
            return True
        if any(file_path.name.startswith(prefix) for prefix in EXCLUDED_FILE_PREFIXES):
            return True
        try:
            file_path.resolve().relative_to(self.root.resolve())
        except ValueError:
            # Not under the ledger root, so the file is eligible.
            return False
        return True

    def _write_chunks(self, chunks: list[ContextChunk]) -> None:
        """Write *chunks* to ``chunks_path`` as one JSON object per line."""
        with self.chunks_path.open("w", encoding="utf-8", newline="\n") as f:
            for chunk in chunks:
                # The instance __dict__ holds exactly the dataclass fields.
                f.write(json.dumps(chunk.__dict__, ensure_ascii=False, sort_keys=True) + "\n")

    def _pure_chunk(self, text: str) -> bool:
        """Return True when *text* is non-blank and contains no junk marker.

        Markers are matched as substrings of the lower-cased text.
        """
        lowered = text.lower()
        if any(marker in lowered for marker in JUNK_MARKERS):
            return False
        return bool(text.strip())

    def _write_compressed_chunks(self, chunks: list[ContextChunk]) -> dict:
        """Write zlib(level 9)+base64 chunk records and return size statistics.

        Returns:
            A dict with ``raw_text_bytes``, ``compressed_bytes``, ``ratio``
            (compressed / raw), ``space_saved_percent`` and
            ``deduplicated_chunks`` (the number of records written). When
            there is nothing to compress the ratio is 1.0 and the saving 0.0,
            rather than a misleading "100% saved".
        """
        raw_bytes = 0
        packed_bytes = 0
        with self.compressed_path.open("w", encoding="utf-8", newline="\n") as f:
            for chunk in chunks:
                payload = chunk.text.encode("utf-8")
                packed = zlib.compress(payload, level=9)
                raw_bytes += len(payload)
                packed_bytes += len(packed)
                record = {
                    "chunk_id": chunk.chunk_id,
                    "file_id": chunk.file_id,
                    "rel_path": chunk.rel_path,
                    "start_char": chunk.start_char,
                    "end_char": chunk.end_char,
                    "sha256": chunk.sha256,
                    "codec": "zlib9+base64",
                    "payload_b64": base64.b64encode(packed).decode("ascii"),
                }
                f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")
        ratio = packed_bytes / raw_bytes if raw_bytes else 1.0
        return {
            "raw_text_bytes": raw_bytes,
            "compressed_bytes": packed_bytes,
            "ratio": ratio,
            "space_saved_percent": 100.0 * (1.0 - ratio),
            "deduplicated_chunks": len(chunks),
        }

    def recall_compressed_chunk(self, chunk_id: int) -> dict:
        """Decompress chunk *chunk_id* and verify it against its stored hash.

        Returns:
            The stored record without ``payload_b64`` plus a ``"text"`` key.

        Raises:
            KeyError: No record has the requested ``chunk_id``.
            ValueError: The decompressed text does not match the stored hash.
        """
        wanted = int(chunk_id)
        # Iterate the file instead of str.splitlines(): splitlines() also
        # breaks on U+2028/U+2029/U+0085, which json.dumps(ensure_ascii=False)
        # writes unescaped and which would cut a record in half.
        with self.compressed_path.open("r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                row = json.loads(line)
                if int(row["chunk_id"]) != wanted:
                    continue
                packed = base64.b64decode(row["payload_b64"])
                text = zlib.decompress(packed).decode("utf-8")
                digest = _sha256_bytes(text.encode("utf-8"))
                if digest != row["sha256"]:
                    raise ValueError(f"compressed chunk {chunk_id} hash mismatch")
                return {**{k: v for k, v in row.items() if k != "payload_b64"}, "text": text}
        raise KeyError(chunk_id)

    def _chunks(self) -> list[dict]:
        """Load all plain-text chunk records; empty list when none were written."""
        if not self.chunks_path.exists():
            return []
        with self.chunks_path.open("r", encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    def query(self, text: str, top_k: int = 5, min_score: float = 0.2) -> list[dict]:
        """Rank stored chunks by the fraction of query tokens they contain.

        Args:
            text: Free-text query; tokenised with ``_tokens``.
            top_k: Maximum number of hits to return (at least one slot).
            min_score: Chunks scoring below this fraction are discarded.

        Returns:
            Hit dicts sorted by descending score, each with the score, the
            matched terms, the chunk's location and hash, and a preview of
            its first 300 characters.
        """
        q = _tokens(text)
        hits = []
        for chunk in self._chunks():
            overlap = q & _tokens(chunk["text"])
            score = len(overlap) / max(len(q), 1)
            if score < min_score:
                continue
            hits.append(
                {
                    "score": score,
                    "matched_terms": sorted(overlap),
                    "file_id": chunk["file_id"],
                    "path": chunk["rel_path"],
                    "start_char": chunk["start_char"],
                    "end_char": chunk["end_char"],
                    "chunk_sha256": chunk["sha256"],
                    "preview": chunk["text"][:300],
                }
            )
        hits.sort(key=lambda row: row["score"], reverse=True)
        return hits[: max(1, int(top_k))]

Xet Storage Details

Size:
11.4 kB
·
Xet hash:
971db159b532ac049491ac41e9922a79b107a3e24685ebbccd3037b351021fce

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.