Buckets:
bbkdevops/unicosys-hypergraph-bucket/tinymind-native-8b-remote-handoff/bundle/data/distill/scripts/dimensionforge_10000.py
| #!/usr/bin/env python | |
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| from itertools import product | |
| from pathlib import Path | |
| from typing import Any | |
# Base directory of the distillation data tree.
# NOTE: this is a machine-specific absolute Windows path.
ROOT = Path(r"D:\ad\tinymind\data\distill")

# Input records (one JSON object per line) and the files derived from them.
SOURCE = ROOT.joinpath("jsonl", "apexdistill_gold.jsonl")
REGISTRY = ROOT.joinpath("manifests", "dimension_registry_10000.json")
VECTORS = ROOT.joinpath("jsonl", "dimension_vectors.jsonl")
ENRICHED = ROOT.joinpath("jsonl", "apexdistill_gold_10000d.jsonl")
AUDIT = ROOT.joinpath("manifests", "dimension_audit.json")

# Value vocabularies. Their order matters: the registry enumerates Cartesian
# products of these lists, so reordering an entry renumbers dimensions.
DOMAINS = [
    "tool_use",
    "data_engineering",
    "reverse_engineering",
    "tool_calling",
    "thai_data",
    "systems_performance",
    "windows_reliability",
    "agent_architecture",
    "software_engineering",
    "evaluation",
]
LENSES = [
    "architect",
    "operator",
    "critic",
    "security",
    "teacher",
    "evaluator",
    "base",
]
PRESSURES = [
    "clean",
    "ambiguous",
    "overreach",
    "unsafe_pressure",
    "resource_limited",
    "base",
]
AXES = [
    "precision",
    "adversarial",
    "thai",
    "tool_schema",
    "verification",
    "compression",
    "uncertainty",
    "rollback",
    "edge_cases",
    "curriculum",
    "base",
]
TOOL_CLASSES = [
    "filesystem",
    "shell",
    "git",
    "dataset",
    "web",
    "confirm",
    "none",
]
RISK_CLASSES = [
    "low",
    "medium",
    "high",
    "license",
    "privacy",
    "admin",
    "destructive",
    "unknown",
]
FAILURE_MODES = [
    "hallucination",
    "bad_schema",
    "unsafe_action",
    "missing_tool",
    "bad_path",
    "permission_denied",
    "license_violation",
    "overclaim",
    "underverify",
    "none",
]
EVIDENCE_TYPES = [
    "command_output",
    "official_doc",
    "manifest",
    "hash",
    "schema",
    "event_log",
    "source_registry",
    "static_analysis",
    "none",
]
VERIFICATION_TYPES = [
    "schema_check",
    "hash_check",
    "read_only_command",
    "unit_test",
    "audit_manifest",
    "official_source",
    "manual_review",
    "none",
]
OUTPUT_FORMS = [
    "jsonl",
    "markdown",
    "powershell",
    "tool_call",
    "report",
    "rubric",
    "plan",
    "summary",
]
# Depth labels "d1" through "d20".
DEPTH_LEVELS = ["d" + str(level) for level in range(1, 21)]
def stable_id(text: str) -> str:
    """Return a short, deterministic identifier for *text*.

    The identifier is the first 16 hexadecimal characters of the SHA-256
    digest of the UTF-8 encoding of *text*.
    """
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    full_hex = hasher.hexdigest()
    return full_hex[:16]
def make_dimension(index: int, category: str, values: dict[str, str]) -> dict[str, Any]:
    """Build one registry entry for a dimension.

    Args:
        index: Position of the dimension in the registry.
        category: Name of the category the dimension belongs to.
        values: Field-name to field-value mapping that defines the dimension.

    Returns:
        A dict with the keys ``index``, ``id``, ``category``, ``values`` and
        ``description``. The description is the category followed by the
        ``name=value`` pairs in sorted order, joined with ``|``; the id embeds
        the zero-padded index and a short hash of that description.
    """
    parts = [category]
    for name, value in sorted(values.items()):
        parts.append(f"{name}={value}")
    descriptor = "|".join(parts)

    entry: dict[str, Any] = {"index": index}
    entry["id"] = f"dim_{index:05d}_{stable_id(descriptor)}"
    entry["category"] = category
    entry["values"] = values
    entry["description"] = descriptor
    return entry
def build_registry(limit: int = 10_000) -> list[dict[str, Any]]:
    """Build the ordered dimension registry, capped at *limit* entries.

    Candidate dimensions are generated category by category, in this order:
    ``core_context``, ``tool_risk_failure``, ``evidence_verification_output``
    and ``domain_evidence_verification``. Each category enumerates the
    Cartesian product of the relevant module-level value lists. The first
    *limit* candidates are kept and numbered consecutively from 0.

    NOTE(review): with the value lists currently defined in this module the
    first two categories alone yield 4620 + 5600 = 10220 candidates. At the
    default limit the ``tool_risk_failure`` category is therefore cut short
    and the last two categories contribute no dimensions at all. Confirm that
    this truncation is intended before relying on evidence, verification,
    output or depth dimensions.

    Args:
        limit: Exact number of dimensions the registry must contain.

    Returns:
        A list of *limit* dimension records as produced by ``make_dimension``.

    Raises:
        RuntimeError: If fewer than *limit* candidates are available.
    """

    def candidates():
        # Lazily yield (category, values) pairs so enumeration can stop as
        # soon as the cap is reached instead of walking every product.
        for domain, axis, lens, pressure in product(DOMAINS, AXES, LENSES, PRESSURES):
            yield "core_context", {"domain": domain, "axis": axis, "lens": lens, "pressure": pressure}
        for domain, tool, risk, failure in product(DOMAINS, TOOL_CLASSES, RISK_CLASSES, FAILURE_MODES):
            yield "tool_risk_failure", {"domain": domain, "tool": tool, "risk": risk, "failure": failure}
        for evidence, verify, output, depth in product(EVIDENCE_TYPES, VERIFICATION_TYPES, OUTPUT_FORMS, DEPTH_LEVELS):
            yield "evidence_verification_output", {"evidence": evidence, "verification": verify, "output": output, "depth": depth}
        for domain, evidence, verify in product(DOMAINS, EVIDENCE_TYPES, VERIFICATION_TYPES):
            yield "domain_evidence_verification", {"domain": domain, "evidence": evidence, "verification": verify}

    dims: list[dict[str, Any]] = []
    for category, values in candidates():
        if len(dims) >= limit:
            break
        dims.append(make_dimension(len(dims), category, values))

    if len(dims) != limit:
        raise RuntimeError(f"Expected {limit} dimensions, got {len(dims)}")
    return dims
def infer_tool_classes(item: dict[str, Any]) -> set[str]:
    """Guess which tool classes a record refers to.

    The whole record is serialised to JSON and lower-cased, so both keys and
    values are searched. A tool class is reported when any of its keyword
    substrings occurs in that text.

    Args:
        item: A JSON-serialisable record.

    Returns:
        The set of matching tool class names, or ``{"none"}`` when nothing
        matches.
    """
    haystack = json.dumps(item, ensure_ascii=False).lower()
    keywords_by_class = (
        ("filesystem", ("filesystem.", "file", "path")),
        ("shell", ("powershell", "shell.", "powercfg", "get-ciminstance")),
        ("git", ("git.",)),
        ("dataset", ("jsonl", "dataset", "audit_jsonl")),
        ("web", ("web.", "official_doc", "url")),
        ("confirm", ("user.confirm", "confirmation", "approval")),
    )
    hits = set()
    for tool_class, keywords in keywords_by_class:
        for keyword in keywords:
            if keyword in haystack:
                hits.add(tool_class)
                break
    if not hits:
        return {"none"}
    return hits
def _as_mapping(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _spaced_matches(labels: list[str], text: str) -> set[str]:
    """Return the labels (except "none") whose space-separated form is in *text*."""
    return {
        label
        for label in labels
        if label != "none" and label.replace("_", " ") in text
    }


def infer_values(item: dict[str, Any]) -> dict[str, set[str]]:
    """Infer, per dimension field, the set of values that apply to a record.

    Most fields are derived heuristically from the lower-cased JSON
    serialisation of the whole record, so keyword checks match key names as
    well as values.

    Sub-objects (``inputs``, ``quality``, ``synthesis``, ``verification``)
    that are missing, ``null`` or not dicts are treated as empty, and an
    unusable ``quality.score`` is treated like a missing one, so malformed
    records degrade to defaults instead of raising.

    Args:
        item: One source record.

    Returns:
        A mapping with the keys ``domain``, ``axis``, ``lens``, ``pressure``,
        ``tool``, ``risk``, ``failure``, ``evidence``, ``verification``,
        ``output`` and ``depth``; every value is a non-empty set.
    """
    inputs = _as_mapping(item.get("inputs"))
    quality = _as_mapping(item.get("quality"))
    synthesis = _as_mapping(item.get("synthesis"))
    verification_info = _as_mapping(item.get("verification"))
    text = json.dumps(item, ensure_ascii=False).lower()

    # Keyword hits anywhere in the record take precedence over the declared
    # risk; an undeclared risk defaults to "low" and an unrecognised one
    # becomes "unknown".
    risk = str(inputs.get("risk", item.get("risk", "low"))).lower()
    if "privacy" in text:
        risk = "privacy"
    elif "license" in text:
        risk = "license"
    elif "admin" in text:
        risk = "admin"
    elif risk not in RISK_CLASSES:
        risk = "unknown"

    # NOTE(review): these labels are matched only in their space-separated
    # form (e.g. "bad schema", not "bad_schema"); labels without an
    # underscore, such as "manifest" or "schema", match verbatim. Confirm
    # that the underscore spelling is meant to be ignored.
    failures = _spaced_matches(FAILURE_MODES, text) or {"none"}
    evidence = _spaced_matches(EVIDENCE_TYPES, text) or {"none"}

    verification = _spaced_matches(VERIFICATION_TYPES, text)
    # Any recorded pass/fail result counts as a schema check.
    if verification_info.get("passes") is not None:
        verification.add("schema_check")
    if not verification:
        verification.add("none")

    # Output forms are matched verbatim, underscores included.
    output = {form for form in OUTPUT_FORMS if form in text}
    if synthesis.get("recommended_tool_calls"):
        output.add("tool_call")
    if not output:
        output.add("summary")

    # Scale the score by 20 and clamp to the 1..20 depth range. A score that
    # is missing, None, non-numeric or NaN falls back to the shallowest depth.
    try:
        scaled = float(quality.get("score", 0.0)) * 20
        depth_idx = int(min(max(scaled, 1.0), 20.0))
    except (TypeError, ValueError):
        depth_idx = 1

    return {
        "domain": {item.get("domain", "evaluation")},
        "axis": {inputs.get("distill_axis", "base")},
        "lens": {inputs.get("lens", "base")},
        "pressure": {inputs.get("pressure", "base")},
        "tool": infer_tool_classes(item),
        "risk": {risk},
        "failure": failures,
        "evidence": evidence,
        "verification": verification,
        "output": output,
        "depth": {f"d{depth_idx}"},
    }
def encode(item: dict[str, Any], dims: list[dict[str, Any]]) -> dict[str, float]:
    """Encode a record as a sparse vector over the dimension registry.

    A dimension is active when every field in its ``values`` mapping is among
    the values inferred for the record.

    If nothing matches, the first ``core_context`` dimension with
    ``axis == "base"`` for the record's inferred domain is activated as an
    anchor. The inferred domain is used rather than the raw ``domain`` key, so
    a record without that key is anchored to the same default domain used for
    matching.

    NOTE(review): a record whose domain does not occur in the registry still
    produces an empty vector; decide whether that case needs its own anchor.

    Args:
        item: One source record.
        dims: Registry entries, each with ``index``, ``category`` and
            ``values`` keys.

    Returns:
        A mapping from the dimension index (as a string) to ``1.0`` for every
        active dimension.
    """
    values = infer_values(item)
    sparse: dict[str, float] = {
        str(dim["index"]): 1.0
        for dim in dims
        if all(val in values.get(key, ()) for key, val in dim["values"].items())
    }

    # Anchor an otherwise empty vector on the domain's base dimension.
    if not sparse:
        domains = values.get("domain", set())
        for dim in dims:
            dim_values = dim["values"]
            if (
                dim["category"] == "core_context"
                and dim_values.get("domain") in domains
                and dim_values.get("axis") == "base"
            ):
                sparse[str(dim["index"])] = 1.0
                break
    return sparse
def main() -> int:
    """Build the registry, encode every source record and write all outputs.

    Writes the registry JSON, one vector record and one enriched record per
    source record (JSON Lines), and an audit manifest holding counts, output
    paths and SHA-256 digests of the three data files. The audit is also
    printed to stdout.

    Returns:
        ``0`` on success.

    Raises:
        KeyError: If a source record has no ``"id"`` field.
        json.JSONDecodeError: If a non-blank source line is not valid JSON.
        OSError: If the source cannot be read or an output cannot be written.
    """
    dims = build_registry()
    dimension_count = len(dims)

    # Make sure every output directory exists before the first write.
    for target in (REGISTRY, VECTORS, ENRICHED, AUDIT):
        target.parent.mkdir(parents=True, exist_ok=True)

    REGISTRY.write_text(json.dumps({"dimensions": dims}, indent=2, ensure_ascii=False), encoding="utf-8")

    # Iterate the file instead of calling str.splitlines(): splitlines() also
    # breaks on characters such as U+2028 that may appear unescaped inside
    # JSON strings, which would cut a record in two.
    with SOURCE.open("r", encoding="utf-8") as source_file:
        records = [json.loads(line) for line in source_file if line.strip()]

    active_counts = []
    used_dims = set()
    with VECTORS.open("w", encoding="utf-8") as vf, ENRICHED.open("w", encoding="utf-8") as ef:
        for item in records:
            vector = encode(item, dims)
            active_counts.append(len(vector))
            used_dims.update(vector)
            vector_record = {
                "id": item["id"],
                "dimension_count": dimension_count,
                "active_dimensions": vector,
            }
            enriched = {
                **item,
                "dimension_vector": {
                    "dimension_count": dimension_count,
                    "active_dimensions": vector,
                },
            }
            vf.write(json.dumps(vector_record, ensure_ascii=False) + "\n")
            ef.write(json.dumps(enriched, ensure_ascii=False) + "\n")

    # Digests are computed after the output files are closed, so they cover
    # the complete contents.
    audit = {
        "dimension_count": dimension_count,
        "records": len(records),
        "used_dimension_count": len(used_dims),
        "avg_active_dimensions": round(sum(active_counts) / max(len(active_counts), 1), 3),
        "min_active_dimensions": min(active_counts) if active_counts else 0,
        "max_active_dimensions": max(active_counts) if active_counts else 0,
        "registry": str(REGISTRY),
        "vectors": str(VECTORS),
        "enriched": str(ENRICHED),
        "registry_sha256": hashlib.sha256(REGISTRY.read_bytes()).hexdigest(),
        "vectors_sha256": hashlib.sha256(VECTORS.read_bytes()).hexdigest(),
        "enriched_sha256": hashlib.sha256(ENRICHED.read_bytes()).hexdigest(),
    }
    AUDIT.write_text(json.dumps(audit, indent=2, ensure_ascii=False), encoding="utf-8")
    print(json.dumps(audit, indent=2, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Xet Storage Details
- Size: 9.09 kB
- Xet hash: 6a2d46440c52502d3353b94034c7c72f06450bde057bd6fa76ae6dadf84d6a8c
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.