bbkdevops's picture
download
raw
12.3 kB
from __future__ import annotations
from datetime import datetime, timezone
import json
from pathlib import Path
import re
import time
from typing import Any
def _load(path: str | Path | None) -> dict[str, Any]:
    """Read a JSON report from *path*, returning ``{}`` when it is unavailable.

    Args:
        path: Location of a JSON file, or a falsy value to skip loading.

    Returns:
        The decoded top-level JSON object. An empty dict is returned when
        *path* is falsy, does not name a regular file, or decodes to
        something other than a JSON object.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    if not path:
        return {}
    p = Path(path)
    # is_file() rather than exists(): a directory "exists" but cannot be read as text.
    if not p.is_file():
        return {}
    # errors="replace" keeps a stray undecodable byte from aborting the load.
    payload = json.loads(p.read_text(encoding="utf-8", errors="replace"))
    # The declared return type is a dict; a top-level list or scalar is
    # treated the same as a missing report.
    return payload if isinstance(payload, dict) else {}
def _training_progress(
    log_path: str | Path | None,
    *,
    stale_after_seconds: float = 900.0,
) -> dict[str, Any]:
    """Summarise training progress from the last progress-bar line of a log.

    The log is scanned for fragments shaped like ``NN%|<bar>| step/total``
    (for example `` 42%|####      | 42/100``); the final match wins.

    Args:
        log_path: Path to the training log, or a falsy value when there is none.
        stale_after_seconds: A log whose modification time is at least this
            many seconds old is no longer considered active.

    Returns:
        A dict with ``active``, ``status``, ``step``, ``total_steps`` and
        ``percent``. ``log_age_seconds`` is included whenever the log file
        could be read. ``status`` is one of ``missing_log``,
        ``log_present_no_progress``, ``completed``, ``running_recent_log``
        or ``stale_or_stopped``.
    """
    missing = {"active": False, "status": "missing_log", "step": 0, "total_steps": 0, "percent": 0.0}
    if not log_path:
        return missing
    path = Path(log_path)
    # is_file() rather than exists(): a directory would make read_text() raise.
    if not path.is_file():
        return missing

    text = path.read_text(encoding="utf-8", errors="replace")
    # Drop NUL bytes so they cannot sit between the tokens the pattern looks for.
    text = text.replace("\x00", "")
    matches = re.findall(r"(\d+)%\|[^\r\n]*?\|\s*(\d+)/(\d+)", text)
    # Clamp at zero in case the file's mtime is slightly ahead of the clock.
    age_seconds = max(0.0, time.time() - path.stat().st_mtime)
    recently_written = age_seconds < stale_after_seconds

    if not matches:
        return {
            "active": recently_written,
            "status": "log_present_no_progress",
            "step": 0,
            "total_steps": 0,
            "percent": 0.0,
            "log_age_seconds": age_seconds,
        }

    pct, step, total = matches[-1]
    step_i = int(step)
    total_i = int(total)
    completed = total_i > 0 and step_i >= total_i
    active = (not completed) and recently_written
    if completed:
        status = "completed"
    elif active:
        status = "running_recent_log"
    else:
        status = "stale_or_stopped"
    return {
        "active": active,
        "status": status,
        "step": step_i,
        "total_steps": total_i,
        "percent": float(pct),
        "log_age_seconds": age_seconds,
    }
def _contamination_diagnostics(data: dict[str, Any]) -> dict[str, Any]:
    """Derive reject-rate, domain-concentration and risk figures from a manifest.

    Args:
        data: Manifest dict; reads ``kept_records``, ``rejected_records``,
            ``domain_counts`` and ``reject_counts`` (all optional).

    Returns:
        A dict with ``reject_rate``, ``hard_reject_rate``, ``max_domain_share``,
        ``coverage_100k_share``, ``contamination_risk`` (capped at 100.0) and
        ``dominant_domain`` (``None`` when there are no domain counts).
    """
    n_kept = float(data.get("kept_records", 0))
    n_rejected = float(data.get("rejected_records", 0))
    n_seen = n_kept + n_rejected

    per_domain = data.get("domain_counts", {}) or {}
    per_reason = data.get("reject_counts", {}) or {}

    # Rates over everything seen (kept + rejected); zero when nothing was seen.
    reject_rate = n_rejected / n_seen if n_seen else 0.0
    hard_rejects = sum(
        float(per_reason.get(reason, 0))
        for reason in ("duplicate_semantic_hash", "encoded_blob", "repetition_loop")
    )
    hard_reject_rate = hard_rejects / n_seen if n_seen else 0.0

    # Shares over kept records only; zero when nothing was kept.
    largest_domain = max(per_domain.values(), default=0)
    if n_kept:
        max_domain_share = largest_domain / n_kept
        coverage_share = float(per_domain.get("coverage_100k", 0)) / n_kept
    else:
        max_domain_share = 0.0
        coverage_share = 0.0

    # Weighted blend of the three signals, capped at 100.
    risk = min(100.0, reject_rate * 55.0 + max_domain_share * 30.0 + hard_reject_rate * 120.0)

    dominant = max(per_domain, key=per_domain.get) if per_domain else None
    return {
        "reject_rate": reject_rate,
        "hard_reject_rate": hard_reject_rate,
        "max_domain_share": max_domain_share,
        "coverage_100k_share": coverage_share,
        "contamination_risk": risk,
        "dominant_domain": dominant,
    }
def _score_axes(data: dict[str, Any], evo: dict[str, Any], llm: dict[str, Any], progress: dict[str, Any]) -> dict[str, float]:
    """Compute the capped diagnostic proxy score for every dashboard axis.

    Args:
        data: Manifest dict; reads ``kept_records`` and ``domain_counts`` and is
            also passed to ``_contamination_diagnostics``.
        evo: Report dict; reads ``scores.whole_body_evo_score``.
        llm: Report dict; reads ``claim_gate.external_stats_imported``.
        progress: Training-progress dict; reads ``percent``.

    Returns:
        Mapping of axis name to a score clamped to ``[0, cap]``. The cap is
        82.0 when external stats are imported and 76.0 otherwise, except for
        ``long_context_recall_proxy`` (74.0) and ``data_purity_score`` (99.0).
    """
    kept = float(data.get("kept_records", 0))
    per_domain = data.get("domain_counts", {}) or {}
    n_domains = len(per_domain)

    def share_of_kept(*keys: str) -> float:
        # Fraction of kept records in the named domains; 0.0 when nothing was kept.
        if not kept:
            return 0.0
        return sum(float(per_domain.get(key, 0)) for key in keys) / kept

    def clamp(raw: float, ceiling: float) -> float:
        # Bound a raw score to the range [0, ceiling].
        return max(0.0, min(ceiling, raw))

    instruction_share = share_of_kept("alignment_constraint_following", "logic_instruction_following")
    tool_share = share_of_kept("alignment_tool_calling", "logic_tool_grounding", "sandbox_tools")
    code_share = share_of_kept("logic_coding_python", "logic_coding_cpp_rust")

    evo_scores = evo.get("scores") or {}
    evo_score = float(evo_scores.get("whole_body_evo_score") or 0.0) * 100.0
    claim_gate = llm.get("claim_gate") or {}
    external = 100.0 if claim_gate.get("external_stats_imported") else 0.0
    train_pct = float(progress.get("percent", 0.0))

    diag = _contamination_diagnostics(data)
    risk_penalty = diag["contamination_risk"] * 0.20
    # The ceiling is higher once an external baseline has been imported.
    proxy_cap = 82.0 if external else 76.0

    scores: dict[str, float] = {}
    scores["translation_th_en_proxy"] = clamp(38.0 + n_domains * 3.5 - risk_penalty, proxy_cap)
    scores["instruction_following_proxy"] = clamp(
        42.0 + instruction_share * 105.0 + train_pct * 0.12 - risk_penalty, proxy_cap
    )
    scores["coding_project_agent_proxy"] = clamp(
        44.0 + n_domains * 2.2 + code_share * 70.0 - risk_penalty, proxy_cap
    )
    scores["tool_grounding_reliability_proxy"] = clamp(
        43.0 + tool_share * 110.0 + external * 0.04 - risk_penalty, proxy_cap
    )
    scores["knowledge_mmlu_pro_proxy"] = clamp(28.0 + kept / 2200.0 - risk_penalty, proxy_cap)
    # These two axes use their own fixed ceilings rather than proxy_cap.
    scores["long_context_recall_proxy"] = clamp(
        48.0 + n_domains * 2.0 - diag["hard_reject_rate"] * 100.0, 74.0
    )
    scores["data_purity_score"] = clamp(100.0 - diag["contamination_risk"], 99.0)
    scores["evo_quality_per_resource_proxy"] = clamp(evo_score - risk_penalty, proxy_cap)
    return scores
def _axis_evidence(data: dict[str, Any], llm: dict[str, Any]) -> dict[str, Any]:
    """Assemble the evidence and caveat section that accompanies the axis scores.

    Args:
        data: Manifest dict; reads ``kept_records`` and ``domain_counts`` and is
            also passed to ``_contamination_diagnostics``.
        llm: Report dict; reads ``claim_gate.external_stats_imported``.

    Returns:
        A dict holding fixed score-type flags, whether an external baseline was
        imported, the contamination diagnostics, the instruction/tool alignment
        shares, and a list of known dashboard limitations.
    """
    diag = _contamination_diagnostics(data)
    per_domain = data.get("domain_counts", {}) or {}
    kept = float(data.get("kept_records", 0))

    def share_of_kept(*keys: str) -> float:
        # Fraction of kept records in the named domains; 0.0 when nothing was kept.
        if not kept:
            return 0.0
        return sum(float(per_domain.get(key, 0)) for key in keys) / kept

    alignment = {
        "instruction_alignment_share": share_of_kept(
            "alignment_constraint_following", "logic_instruction_following"
        ),
        "tool_alignment_share": share_of_kept(
            "alignment_tool_calling", "logic_tool_grounding", "sandbox_tools"
        ),
    }

    claim_gate = llm.get("claim_gate") or {}
    limits = [
        "No submitted official leaderboard score is included.",
        "Knowledge and long-context axes are proxies, not MMLU-Pro or passkey benchmark results.",
        "High source-grounded data volume can still overfit if train/eval splits are not adversarial.",
        "A 100 score is blocked for all proxy axes.",
    ]
    return {
        "score_type": "diagnostic_proxy_not_official_eval",
        "proxy_cap_applied": True,
        "official_eval_present": False,
        "external_baseline_imported": bool(claim_gate.get("external_stats_imported")),
        "contamination_diagnostics": diag,
        "alignment_diagnostics": alignment,
        "known_dashboard_limits": limits,
    }
def build_current_model_results(
    out_dir: str | Path,
    *,
    gguf_path: str | Path = "model/astraweave-fusion/artifacts/tinymind-purebase.gguf",
    train_log: str | Path | None = "reports/qlora_runs/sandbox_12b_25690524_133816/train.log",
    data_manifest: str | Path | None = "reports/dataset_quality_governor/dataset_quality_governor_manifest.json",
    evo_report: str | Path | None = "reports/evo_whole_body/evo_whole_body_report.json",
    llm_stats_report: str | Path | None = "reports/llm_stats/llm_stats_report.json",
) -> dict[str, Any]:
    """Build the current-model diagnostic report and write it to *out_dir*.

    Writes ``current_model_results.json``, ``report.md``, ``results.png`` and a
    timestamped ``results_diagnostic_<UTC timestamp>.png`` into *out_dir*,
    creating the directory if needed.

    Args:
        out_dir: Directory that receives the generated artifacts.
        gguf_path: Model file whose size is reported (0.0 GB when absent).
        train_log: Training log handed to ``_training_progress``.
        data_manifest: JSON manifest handed to ``_load``.
        evo_report: JSON report handed to ``_load``.
        llm_stats_report: JSON report handed to ``_load``.

    Returns:
        The report dict that was serialised, including the output paths.
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    gguf = Path(gguf_path)

    data = _load(data_manifest)
    evo = _load(evo_report)
    llm = _load(llm_stats_report)
    progress = _training_progress(train_log)
    axes = _score_axes(data, evo, llm, progress)
    axis_evidence = _axis_evidence(data, llm)

    # Take the timestamp once so "created_at" and the versioned PNG file name
    # always refer to the same instant.
    now = datetime.now(timezone.utc)

    model = {
        "name": "TinyMind PureBase + 12B QLoRA Evo stack",
        "gguf_path": str(gguf),
        "gguf_size_gb": gguf.stat().st_size / (1024**3) if gguf.exists() else 0.0,
        "training_progress": progress,
        "governed_records": data.get("kept_records", 0),
        "rejected_records": data.get("rejected_records", 0),
    }

    # First entry of rankings.models, or {} when rankings/models is missing or empty.
    ranked_models = (llm.get("rankings") or {}).get("models")
    rank1 = ranked_models[0] if ranked_models else {}

    report = {
        "schema_version": "tinymind-current-model-results-v1",
        "created_at": now.isoformat(),
        "model": model,
        "axes": axes,
        "axis_evidence": axis_evidence,
        "external_baseline": {
            "source": "LLM-Stats",
            "category": (llm.get("category") or "unknown"),
            "rank1": rank1,
        },
        "claim_gate": {
            "world_best_claim_allowed": False,
            "official_rank_claim_allowed": False,
            "reason": "Current dashboard mixes local evidence/proxies and imported external baselines. TinyMind needs official submitted results before rank claims.",
            "proxy_100_score_allowed": False,
            "contamination_cleared": axis_evidence["contamination_diagnostics"]["contamination_risk"] < 20.0,
        },
    }

    json_path = out / "current_model_results.json"
    md_path = out / "report.md"
    png_path = out / "results.png"
    versioned_png_path = out / f"results_diagnostic_{now.strftime('%Y%m%dT%H%M%SZ')}.png"
    report["json_path"] = str(json_path)
    report["markdown_path"] = str(md_path)
    report["png_path"] = str(png_path)
    report["versioned_png_path"] = str(versioned_png_path)

    json_path.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
    md_path.write_text(_markdown(report), encoding="utf-8")
    _plot(report, png_path)
    _plot(report, versioned_png_path)
    return report
def _markdown(report: dict[str, Any]) -> str:
    """Render *report* as a short Markdown summary.

    Args:
        report: Report dict; reads ``model``, ``axis_evidence``, ``claim_gate``
            and ``axes``.

    Returns:
        Markdown text: a "Results" heading with model facts followed by an
        "Axes" section listing every axis score to two decimals, ending with a
        newline.
    """
    model = report["model"]
    header = [
        "# Results",
        "",
        f"- Model: {model['name']}",
        f"- GGUF size GB: {model['gguf_size_gb']:.4f}",
        f"- QLoRA progress: {model['training_progress']['step']}/{model['training_progress']['total_steps']} ({model['training_progress']['percent']:.1f}%)",
        f"- Governed records: {model['governed_records']}",
        f"- Rejected records: {model['rejected_records']}",
        f"- Training status: {model['training_progress'].get('status', 'unknown')}",
        f"- Contamination risk: {report['axis_evidence']['contamination_diagnostics']['contamination_risk']:.2f}",
        f"- World-best claim allowed: {report['claim_gate']['world_best_claim_allowed']}",
        "",
        "## Axes",
    ]
    # One bullet per axis, in the order the axes mapping yields them.
    axis_bullets = [f"- {axis}: {score:.2f}" for axis, score in report["axes"].items()]
    return "\n".join(header + axis_bullets) + "\n"
def _plot(report: dict[str, Any], path: Path) -> None:
    """Render the axis scores as a bar chart plus two radar panels and save to *path*.

    The top row is a bar chart of every axis in ``report["axes"]``. The bottom
    row holds two polar panels covering axes 0-3 and 4-7 respectively.

    Args:
        report: Report dict; only ``report["axes"]`` (name -> score) is read.
        path: Destination image file.
    """
    # Imported lazily so the plotting stack is only needed when a plot is drawn.
    import matplotlib.pyplot as plt
    import numpy as np

    axes = report["axes"]
    names = list(axes.keys())
    values = [axes[k] for k in names]

    fig = plt.figure(figsize=(10, 7), dpi=140)
    try:
        gs = fig.add_gridspec(2, 2, height_ratios=[1.15, 1.0])

        # --- Top row: one bar per axis -------------------------------------
        ax_bar = fig.add_subplot(gs[0, :])
        colors = plt.cm.Set3(np.linspace(0, 1, len(values)))
        ax_bar.bar(range(len(values)), values, color=colors, edgecolor="#334155", linewidth=0.4)
        ax_bar.set_ylim(0, 105)
        ax_bar.set_ylabel("Score (%)")
        ax_bar.set_title("TinyMind Current Diagnostic Results")
        ax_bar.text(
            0.99,
            0.94,
            "ANTI-CONTAMINATION PROXY VIEW\n100% BLOCKED",
            transform=ax_bar.transAxes,
            ha="right",
            va="top",
            fontsize=8,
            color="#991b1b",
            bbox={"boxstyle": "round,pad=0.25", "facecolor": "#fee2e2", "edgecolor": "#991b1b", "linewidth": 0.6},
        )
        ax_bar.set_xticks(range(len(values)))
        ax_bar.set_xticklabels([n.replace("_", "\n") for n in names], fontsize=7)
        for i, v in enumerate(values):
            # Keep the value label inside the 0-105 y-range.
            ax_bar.text(i, min(v + 2, 102), f"{v:.1f}", ha="center", fontsize=7)

        # --- Bottom row: two radar panels of four axes each ----------------
        for idx, title in enumerate(("Core Evaluation", "Domain/Tool Evaluation")):
            ax = fig.add_subplot(gs[1, idx], polar=True)
            subset = values[idx * 4 : idx * 4 + 4]
            labels = names[idx * 4 : idx * 4 + 4]
            if not subset:
                # Fewer axes than this panel covers: leave it empty instead of
                # failing on subset[0] below.
                ax.set_title(title, fontsize=10)
                continue
            theta = np.linspace(0, 2 * np.pi, len(subset), endpoint=False)
            # Repeat the first point so the outline closes on itself.
            data = np.r_[subset, subset[0]]
            theta_closed = np.r_[theta, theta[0]]
            ax.plot(theta_closed, data, color="#2563eb", linewidth=1.5)
            ax.fill(theta_closed, data, color="#60a5fa", alpha=0.25)
            ax.set_ylim(0, 100)
            ax.set_xticks(theta)
            ax.set_xticklabels([l.replace("_", "\n") for l in labels], fontsize=7)
            ax.set_yticklabels([])
            ax.set_title(title, fontsize=10)

        fig.text(0.05, 0.03, "Diagnostic proxy view: 100 scores are blocked until held-out/official eval evidence exists.", fontsize=10, color="#475569")
        # Reserve the bottom 5% of the canvas for the footer text.
        fig.tight_layout(rect=[0, 0.05, 1, 1])
        fig.savefig(path, bbox_inches="tight")
    finally:
        # Always release the figure, even when drawing or saving raised.
        plt.close(fig)

Xet Storage Details

Size:
12.3 kB
·
Xet hash:
a0c8bc4fb725d26c9eda383b122011abfc81bf6c40da10842fe6e520f547fa61

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.