bbkdevops's picture
download
raw
12.9 kB
"""DeepSharp model analyzer for TinyMind.
This module is the model-level microscope: it combines architecture topology,
purity evidence, self-evolution lessons, CNN structure, and external evidence
boundaries into one report. It is intentionally strict: local sharpness can be
ready while frontier/Mythos superiority remains blocked.
"""
from __future__ import annotations
from dataclasses import asdict
from datetime import datetime, timezone
import json
from pathlib import Path
from typing import Any
from model.config import OmegaConfig, purefield_config
DEFAULT_INPUTS = {
"mythos": "reports/mythos_purity_governor/mythos_purity_governor_report.json",
"cnn": "reports/pure_lattice_cnn/pure_lattice_cnn_report.json",
"evo": "reports/evo_learning_mythos/evo_learning_report.json",
"essence": "reports/knowledge_essence_mythos/knowledge_essence_manifest.json",
"command": "reports/command_intensity_governor/command_intensity_governor_report.json",
"ultra_deep": "reports/ultra_deep_sharp_refiner/ultra_deep_sharp_refiner_report.json",
}
def _load(path: str | Path | None) -> dict[str, Any]:
if not path:
return {}
p = Path(path)
return json.loads(p.read_text(encoding="utf-8")) if p.exists() else {}
def _score(value: Any) -> float:
try:
return max(0.0, min(100.0, float(value)))
except (TypeError, ValueError):
return 0.0
def _bool_score(value: bool) -> float:
return 100.0 if value else 0.0
def _mean(values: list[float]) -> float:
return sum(values) / max(len(values), 1)
class DeepSharpModelAnalyzer:
"""Analyze TinyMind's depth, density, sharpness, and claim readiness."""
def __init__(self, cfg: OmegaConfig):
self.cfg = cfg
def analyze(self, inputs: dict[str, str] | None = None) -> dict[str, Any]:
inputs = inputs or dict(DEFAULT_INPUTS)
reports = {name: _load(path) for name, path in inputs.items()}
mythos = reports.get("mythos", {})
cnn = reports.get("cnn", {})
evo = reports.get("evo", {})
essence = reports.get("essence", {})
command = reports.get("command", {})
ultra_deep = reports.get("ultra_deep", {})
architecture = self._architecture_summary(cnn)
structural_depth_score = self._structural_depth_score(architecture)
purity_density_score = self._purity_density_score(mythos, essence)
evidence_sharpness_score = self._evidence_sharpness_score(mythos)
self_evolution_score = self._self_evolution_score(evo)
cnn_focus_score = self._cnn_focus_score(cnn)
command_obedience_score = self._command_obedience_score(command)
ultra_deep_score = self._ultra_deep_score(ultra_deep)
external_score = _bool_score(mythos.get("claim_gate", {}).get("external_frontier_evidence_ready") is True)
local_scores = [
structural_depth_score,
purity_density_score,
evidence_sharpness_score,
self_evolution_score,
cnn_focus_score,
command_obedience_score,
ultra_deep_score,
]
local_ready = min(local_scores) >= 70.0 and purity_density_score >= 90.0
total = _mean(local_scores + [external_score])
blockers = self._blockers(mythos, local_scores, external_score)
report = {
"schema_version": "tinymind-deep-sharp-model-analyzer-v1",
"created_at": datetime.now(timezone.utc).isoformat(),
"inputs": inputs,
"architecture": architecture,
"scores": {
"structural_depth_score": structural_depth_score,
"purity_density_score": purity_density_score,
"evidence_sharpness_score": evidence_sharpness_score,
"self_evolution_score": self_evolution_score,
"cnn_focus_score": cnn_focus_score,
"command_obedience_score": command_obedience_score,
"ultra_deep_sharpness_score": ultra_deep_score,
"external_frontier_evidence_score": external_score,
"deep_sharp_local_score": _mean(local_scores),
"deep_sharp_total_score": total,
},
"blockers": blockers,
"claim_gate": {
"deep_sharp_local_ready": local_ready,
"frontier_or_mythos_superiority_claim_allowed": False,
"reason": "Local analysis can guide sharpening, but superiority requires matched external official evidence.",
},
"upgrade_plan": self._upgrade_plan(blockers),
}
return report
def _architecture_summary(self, cnn: dict[str, Any]) -> dict[str, Any]:
cfg = self.cfg
pattern = cfg.layer_pattern * (cfg.n_layers // max(len(cfg.layer_pattern), 1) + 1)
pattern = pattern[: cfg.n_layers]
return {
"mode": cfg.architecture_mode,
"dim": cfg.dim,
"layers": cfg.n_layers,
"heads": cfg.n_heads,
"head_dim": cfg.head_dim,
"layer_pattern": cfg.layer_pattern,
"ssm_layers": pattern.count("S"),
"attention_layers": pattern.count("A"),
"purefield_layers": cfg.n_layers if cfg.architecture_mode == "purefield" else pattern.count("P"),
"memory_slots": cfg.memory_slots,
"memory_ranks": cfg.memory_ranks,
"local_window": cfg.local_window,
"timescale_count": cfg.timescale_count,
"low_rank": cfg.low_rank,
"cnn_core_enabled": bool(getattr(cfg, "cnn_core_enabled", False)),
"cnn_receptive_field": int(cnn.get("config", {}).get("receptive_field", 0) or 0),
"cnn_parameter_count": int(cnn.get("parameter_count", 0) or 0),
"config_snapshot": asdict(cfg),
}
@staticmethod
def _structural_depth_score(arch: dict[str, Any]) -> float:
layer_score = min(100.0, arch["layers"] / 40.0 * 100.0)
dim_score = min(100.0, arch["dim"] / 5120.0 * 100.0)
memory_score = min(100.0, (arch["memory_slots"] * arch["timescale_count"]) / 256.0 * 100.0)
rank_score = min(100.0, arch["low_rank"] / 96.0 * 100.0)
cnn_score = 100.0 if arch["cnn_core_enabled"] else 65.0
purefield_bonus = 100.0 if arch["mode"] == "purefield" else 80.0
return _mean([layer_score, dim_score, memory_score, rank_score, cnn_score, purefield_bonus])
@staticmethod
def _purity_density_score(mythos: dict[str, Any], essence: dict[str, Any]) -> float:
mythos_score = _score(mythos.get("scores", {}).get("purity_intensity_score"))
kept = int(essence.get("kept_records", 0) or 0)
rejected = int(essence.get("rejected_records", 0) or 0)
essence_volume = min(100.0, kept / 10_000.0 * 100.0)
purification = 100.0 if rejected > 0 and essence.get("claim_gate", {}).get("raw_memory_replay_allowed") is False else 40.0
return _mean([mythos_score, essence_volume, purification])
@staticmethod
def _evidence_sharpness_score(mythos: dict[str, Any]) -> float:
dimensions = mythos.get("dimensions", [])
if not dimensions:
return 0.0
scored = [_score(row.get("score")) for row in dimensions if row.get("axis") != "external_frontier_evidence"]
passed = [1.0 if row.get("passed") is True else 0.0 for row in dimensions if row.get("axis") != "external_frontier_evidence"]
return _mean([_mean(scored), 100.0 * _mean(passed)])
@staticmethod
def _self_evolution_score(evo: dict[str, Any]) -> float:
promoted = int(evo.get("promoted_count", 0) or 0)
blocked = int(evo.get("blocked_count", 0) or 0)
return _mean(
[
min(100.0, promoted / 7.0 * 100.0),
100.0 if blocked > 0 else 50.0,
_bool_score(evo.get("claim_gate", {}).get("self_learning_real") is True),
]
)
@staticmethod
def _cnn_focus_score(cnn: dict[str, Any]) -> float:
gate = cnn.get("claim_gate", {})
receptive = float(cnn.get("config", {}).get("receptive_field", 0) or 0)
return _mean(
[
min(100.0, receptive / 43.0 * 100.0),
_bool_score(gate.get("cnn_core_ready") is True),
_bool_score(gate.get("integrated_into_omega_model") is True),
]
)
@staticmethod
def _command_obedience_score(command: dict[str, Any]) -> float:
score = _score(command.get("scores", {}).get("overall"))
gate = command.get("claim_gate", {})
return _mean(
[
score,
_bool_score(gate.get("command_intensity_ready") is True),
_bool_score(gate.get("perfect_instruction_following_claim_allowed") is False),
]
)
@staticmethod
def _ultra_deep_score(report: dict[str, Any]) -> float:
score = _score(report.get("scores", {}).get("ultra_deep_sharpness_score"))
gate = report.get("claim_gate", {})
return _mean(
[
score,
_bool_score(gate.get("ultra_deep_local_ready") is True),
_bool_score(gate.get("flawless_or_frontier_claim_allowed") is False),
]
)
@staticmethod
def _blockers(mythos: dict[str, Any], local_scores: list[float], external_score: float) -> list[str]:
blockers = []
if external_score < 100.0:
blockers.append("external_frontier_evidence")
if min(local_scores) < 70.0:
blockers.append("local_axis_below_70")
for item in mythos.get("hard_blockers", []):
if item not in blockers:
blockers.append(str(item))
return blockers
@staticmethod
def _upgrade_plan(blockers: list[str]) -> list[dict[str, str]]:
plan = []
if "external_frontier_evidence" in blockers:
plan.append(
{
"priority": "P0",
"action": "run/import official external eval evidence before any Mythos/frontier superiority claim",
"expected_effect": "turns local sharpness into comparable public evidence",
}
)
if "local_axis_below_70" in blockers:
plan.append(
{
"priority": "P0",
"action": "repair weakest local axis with targeted SFT/eval loop",
"expected_effect": "prevents one-axis collapse under broad prompts",
}
)
plan.append(
{
"priority": "P1",
"action": "train with high-density essence plus anti-template probes and completion-only loss",
"expected_effect": "raises response sharpness without memorized fixed answers",
}
)
plan.append(
{
"priority": "P1",
"action": "route extracted image/audio/document/code features through PureLatticeCNN before language reasoning",
"expected_effect": "improves local pattern sensitivity with bounded parameter cost",
}
)
return plan
def build_deep_sharp_model_analysis(
out_dir: str | Path,
*,
cfg: OmegaConfig | None = None,
inputs: dict[str, str] | None = None,
) -> dict[str, Any]:
cfg = cfg or purefield_config("12b")
cfg.cnn_core_enabled = True
report = DeepSharpModelAnalyzer(cfg).analyze(inputs=inputs)
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
json_path = out / "deep_sharp_model_analysis.json"
md_path = out / "deep_sharp_model_analysis.md"
report["json_path"] = str(json_path)
report["markdown_path"] = str(md_path)
json_path.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
md_path.write_text(_markdown(report), encoding="utf-8")
return report
def _markdown(report: dict[str, Any]) -> str:
lines = [
"# TinyMind DeepSharp Model Analysis",
"",
f"- Deep sharp local score: {report['scores']['deep_sharp_local_score']:.2f}",
f"- Deep sharp total score: {report['scores']['deep_sharp_total_score']:.2f}",
f"- Local ready: {report['claim_gate']['deep_sharp_local_ready']}",
f"- Mythos/frontier superiority claim allowed: {report['claim_gate']['frontier_or_mythos_superiority_claim_allowed']}",
"",
"## Scores",
"",
"| Axis | Score |",
"|---|---:|",
]
for key, value in report["scores"].items():
lines.append(f"| {key} | {value:.2f} |")
lines.extend(["", "## Upgrade Plan", ""])
for item in report["upgrade_plan"]:
lines.append(f"- {item['priority']}: {item['action']} ({item['expected_effect']})")
return "\n".join(lines) + "\n"

Xet Storage Details

Size:
12.9 kB
·
Xet hash:
ada3f98453728cd8233493b6f4239d11a26c7eb7a7c846825364766ebc199c6a

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.