# Hugging Face file-viewer residue (not part of the module):
#   bbkdevops's picture | download | raw | 13.2 kB
from __future__ import annotations
from datetime import datetime, timezone
import json
import os
from pathlib import Path
import subprocess
import time
from typing import Any, Callable
import torch
from model.axiom_kv import build_axiomkv_report
from model.config import purefield_config
from model.regen_kv import ReGenesisKVBlock
from runtime.evidence_regenesis_dll import DEFAULT_DLL, EvidenceReGenesisDll
# Signature shared by provider callables: invoked as fn(model_id, prompt) and
# expected to return the model's response text.
ProviderFn = Callable[[str, str], str]

# Fixed probe set sent to every external model. Each entry carries:
#   "axis"   - label copied into the per-probe result row,
#   "prompt" - text sent to the provider,
#   "check"  - name of the _score_response rule used to grade the reply.
PROBES = [
    {
        "axis": "tool_json",
        "prompt": 'Return only JSON for a tool named "search_web" with arguments query="thai benchmark" and k=3.',
        "check": "json_tool",
    },
    {
        "axis": "math",
        "prompt": "What is 7 * 8? Return only the number.",
        "check": "contains_56",
    },
    {
        "axis": "thai",
        "prompt": "ตอบเป็นภาษาไทย: อธิบาย Data Governance แบบสั้นและชัดเจน",
        "check": "thai_chars",
    },
    {
        "axis": "code",
        "prompt": "Write a minimal Python function add(a, b) that returns their sum.",
        "check": "python_def",
    },
]
def _score_response(check: str, text: str) -> float:
    """Score one provider response against a named probe check.

    Args:
        check: Check identifier taken from a probe's ``"check"`` field.
            Recognised values are ``"json_tool"``, ``"contains_56"``,
            ``"thai_chars"`` and ``"python_def"``.
        text: Raw response; it is coerced with ``str()`` before scoring.

    Returns:
        ``1.0`` when the response satisfies the check, otherwise ``0.0``.
        Unrecognised check names always score ``0.0``.
    """
    body = str(text)
    if check == "json_tool":
        try:
            payload = json.loads(body)
        except json.JSONDecodeError:
            return 0.0
        # Valid JSON is not necessarily an object: a bare number, string, list
        # or null parses fine but has no ``.get``. Such replies are a failed
        # tool call, not a crash.
        if not isinstance(payload, dict):
            return 0.0
        is_tool_call = payload.get("tool") == "search_web" and isinstance(payload.get("arguments"), dict)
        return 1.0 if is_tool_call else 0.0
    if check == "contains_56":
        return 1.0 if "56" in body else 0.0
    if check == "thai_chars":
        # U+0E00..U+0E7F is the Unicode Thai block; one character is enough.
        return 1.0 if any("\u0e00" <= char <= "\u0e7f" for char in body) else 0.0
    if check == "python_def":
        return 1.0 if "def " in body and "return" in body else 0.0
    return 0.0
def _hf_provider_fn(provider: str | None, timeout: float) -> ProviderFn:
    """Build a provider callable backed by ``huggingface_hub.InferenceClient``.

    ``huggingface_hub`` is imported lazily here, so an ImportError surfaces
    when this factory is called rather than when the module is imported.

    Args:
        provider: Provider name handed to the client; ``"auto"`` is used when
            it is ``None`` or empty.
        timeout: Timeout value handed to the client constructor.

    Returns:
        A callable ``(model_id, prompt) -> str`` that sends the prompt as a
        single user message (``max_tokens=128``, ``temperature=0.0``) and
        returns the first choice's message content, or ``""`` if it is empty.
    """
    from huggingface_hub import InferenceClient

    provider_name = provider or "auto"

    def ask(model_id: str, prompt: str) -> str:
        # A new client is created per call because the model is bound at
        # construction time.
        hf_client = InferenceClient(model=model_id, provider=provider_name, timeout=timeout)
        user_turn = {"role": "user", "content": prompt}
        completion = hf_client.chat_completion(
            messages=[user_turn],
            max_tokens=128,
            temperature=0.0,
        )
        content = completion.choices[0].message.content
        return str(content or "")

    return ask
def _llm_stats_provider_fn(timeout: float) -> ProviderFn:
    """Build a provider callable backed by the project's LLM-Stats gateway client.

    Args:
        timeout: Passed to the gateway client as ``timeout_s``.

    Returns:
        A callable ``(model_id, prompt) -> str`` that sends the prompt as a
        single user message (``temperature=0.0``, ``max_tokens=128``) through
        one shared client and extracts the reply with ``_choice_text``.
    """
    from evaluation.llm_stats_gateway import LLMStatsGatewayClient, _choice_text

    gateway = LLMStatsGatewayClient(timeout_s=timeout)
    # The return value is discarded. NOTE(review): presumably this is an eager
    # credential/config check so a misconfigured gateway fails at build time
    # instead of on the first probe - confirm against LLMStatsGatewayClient.
    gateway.headers()

    def ask(model_id: str, prompt: str) -> str:
        user_turn = {"role": "user", "content": prompt}
        completion = gateway.chat_completion(
            model=model_id,
            messages=[user_turn],
            temperature=0.0,
            max_tokens=128,
        )
        return _choice_text(completion)

    return ask
def _run_external_eval(
    provider_models: list[str],
    provider_fn: ProviderFn | None,
    *,
    run_external: bool,
    provider: str | None,
    provider_kind: str,
    timeout: float,
) -> dict[str, Any]:
    """Run every probe in ``PROBES`` against each external model and score it.

    Args:
        provider_models: Model identifiers to probe.
        provider_fn: Callable ``(model_id, prompt) -> str``. When ``None`` a
            client is built from ``provider_kind`` (``"hf"`` or
            ``"llm-stats"``).
        run_external: When false nothing is called and a ``"skipped"`` result
            is returned.
        provider: Provider name forwarded to the Hugging Face client factory
            and echoed in the result (``"auto"`` when falsy).
        provider_kind: Which built-in client to build if ``provider_fn`` is
            ``None``.
        timeout: Timeout forwarded to the client factory.

    Returns:
        A dict whose ``"status"`` is ``"skipped"``, ``"blocked"`` (no
        ``HF_TOKEN`` for the ``"hf"`` kind, or the client could not be built)
        or ``"completed"``. Completed results hold one entry per model with
        its per-probe rows and mean score. ``"external_official_result"`` is
        always ``False``.
    """

    def halted(status: str, reason: str) -> dict[str, Any]:
        # Common shape of every early-exit result.
        return {
            "status": status,
            "reason": reason,
            "models": [],
            "external_official_result": False,
        }

    if not run_external:
        return halted("skipped", "run_external=false; local stress still ran.")

    if provider_fn is None:
        if provider_kind == "hf" and not os.environ.get("HF_TOKEN"):
            return halted(
                "blocked",
                "HF_TOKEN is not available in this process; no token was printed or embedded.",
            )
        # Factories are wrapped in lambdas so nothing is imported or built
        # until the matching kind is actually selected.
        factories = {
            "hf": lambda: _hf_provider_fn(provider, timeout),
            "llm-stats": lambda: _llm_stats_provider_fn(timeout),
        }
        try:
            factory = factories.get(provider_kind)
            if factory is None:
                raise ValueError(f"unsupported provider_kind={provider_kind!r}")
            provider_fn = factory()
        except Exception as exc:
            # Any failure to build the client (missing package, bad config,
            # unknown kind) is reported rather than raised.
            return halted("blocked", f"provider client unavailable: {type(exc).__name__}: {exc}")

    model_reports = []
    for model_id in provider_models:
        probe_rows = []
        for probe in PROBES:
            try:
                reply = provider_fn(model_id, probe["prompt"])
                probe_score = _score_response(probe["check"], reply)
                row = {
                    "axis": probe["axis"],
                    "score": probe_score,
                    "response_excerpt": reply[:300],
                }
            except Exception as exc:
                # A failing probe counts as zero but does not abort the model.
                row = {
                    "axis": probe["axis"],
                    "score": 0.0,
                    "error": f"{type(exc).__name__}: {exc}",
                }
            probe_rows.append(row)
        total = sum(float(row["score"]) for row in probe_rows)
        mean_score = total / max(len(probe_rows), 1)
        model_reports.append({"model_id": model_id, "score": mean_score, "rows": probe_rows})

    return {
        "status": "completed",
        "provider_kind": provider_kind,
        "provider": provider or "auto",
        "models": model_reports,
        "external_official_result": False,
        "note": "Provider probes are live external calls when enabled, but they are not official leaderboard submissions.",
    }
def _run_regenesis_dll_stress(out_dir: Path, loops: int = 16) -> dict[str, Any]:
    """Repeatedly push ledger-retrieved tokens through a tiny ``ReGenesisKVBlock``.

    Each iteration opens a fresh ledger under ``out_dir``, ingests 32
    consecutive integers, verifies it, retrieves the chunk for one token, and
    runs a seeded forward/backward pass of the block with that chunk.

    Args:
        out_dir: Directory under which ``regenesis_ledger_<idx>`` paths are
            opened.
        loops: Requested iteration count; at least one iteration always runs.

    Returns:
        A dict with the iteration count, the summed length of the retrieved
        chunks, and ``"passed"``. ``"passed"`` is true only if every iteration
        produced finite outputs, a finite non-``None`` input gradient, and a
        cache whose ``"kv_tokens_stored"`` equals 0. The returned
        ``"kv_tokens_stored"`` is the constant 0.
    """
    dll = EvidenceReGenesisDll(DEFAULT_DLL)
    cfg = purefield_config("tiny")
    # Shrink the preset so the stress loop stays cheap; order matches the
    # original attribute assignments.
    overrides = {
        "vocab_size": 512,
        "dim": 32,
        "n_layers": 1,
        "memory_ranks": 4,
        "timescale_count": 2,
        "local_window": 4,
        "regen_kv_enabled": True,
        "regen_kv_rank": 4,
        "retrieval_top_k": 2,
    }
    for attr_name, attr_value in overrides.items():
        setattr(cfg, attr_name, attr_value)
    kv_block = ReGenesisKVBlock(cfg)

    loop_count = max(1, loops)
    healthy = True
    retrieved_total = 0
    for idx in range(loop_count):
        # NOTE(review): indentation was lost in the copy this was restored
        # from; the model pass is kept inside the `with` so `chunk` is only
        # used while the ledger is open - confirm against the original layout.
        with dll.open(out_dir / f"regenesis_ledger_{idx}", chunk_tokens=8) as ledger:
            first_token = 1000 + idx * 64
            ledger.ingest_i64(range(first_token, first_token + 32))
            ledger.verify()
            chunk = ledger.retrieve_chunk_tokens_for_token(first_token + 17)

            # Per-iteration seed keeps the random input reproducible.
            torch.manual_seed(20260527 + idx)
            hidden = torch.randn(1, 4, cfg.dim, requires_grad=True)
            out, cache, _aux = kv_block(hidden, retrieved_chunks=[chunk], return_aux=True)

            loss = out.pow(2).mean()
            loss = loss + cache["regen_k"].pow(2).mean()
            loss = loss + cache["regen_v"].pow(2).mean()
            loss.backward()

            # Short-circuit order matters: the gradient is only inspected once
            # it is known not to be None.
            healthy = healthy and (
                bool(torch.isfinite(out).all().item())
                and hidden.grad is not None
                and bool(torch.isfinite(hidden.grad).all().item())
                and int(cache["kv_tokens_stored"]) == 0
            )
            retrieved_total += len(chunk)

    return {
        "loops": loop_count,
        "retrieved_tokens_seen": retrieved_total,
        "passed": bool(healthy),
        "kv_tokens_stored": 0,
    }
def _rss_mb() -> float | None:
    """Return this process's resident set size in MiB, or ``None`` if unavailable.

    ``psutil`` is imported lazily; if it is missing, or the query fails for
    any reason, ``None`` is returned instead of raising.
    """
    try:
        import psutil  # type: ignore

        current = psutil.Process(os.getpid())
        rss_bytes = current.memory_info().rss
        return float(rss_bytes / (1024 * 1024))
    except Exception:
        # Best-effort metric: never let monitoring break the caller.
        return None
def _gpu_snapshot() -> dict[str, Any]:
    """Take a one-shot GPU reading by shelling out to ``nvidia-smi``.

    Only the first line of output is parsed, so on a multi-GPU host the
    reading reflects the first GPU listed.

    Returns:
        ``{"available": False, "error": ...}`` when ``nvidia-smi`` cannot be
        run, exits non-zero, or prints nothing. Otherwise a dict with
        ``"available": True`` and ``"memory_used_mb"``, ``"gpu_util_pct"``,
        ``"temperature_c"`` and ``"power_w"``, each a float or ``None`` when
        the field is missing, empty, or not numeric.
    """
    try:
        completed = subprocess.run(
            [
                "nvidia-smi",
                "--query-gpu=memory.used,utilization.gpu,temperature.gpu,power.draw",
                "--format=csv,noheader,nounits",
            ],
            check=False,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except Exception as exc:
        # Covers a missing binary, a timeout, and any other launch failure.
        return {"available": False, "error": f"{type(exc).__name__}: {exc}"}
    if completed.returncode != 0 or not completed.stdout.strip():
        return {"available": False, "error": completed.stderr.strip()[:300]}

    row = completed.stdout.strip().splitlines()[0]
    parts = [part.strip() for part in row.split(",")]

    def field(index: int) -> float | None:
        # nvidia-smi prints placeholders such as "[N/A]" or "[Not Supported]"
        # for metrics a device does not expose; treat those as missing rather
        # than letting float() raise and abort the caller's sampling loop.
        if index >= len(parts) or not parts[index]:
            return None
        try:
            return float(parts[index])
        except ValueError:
            return None

    return {
        "available": True,
        "memory_used_mb": field(0),
        "gpu_util_pct": field(1),
        "temperature_c": field(2),
        "power_w": field(3),
    }
def _summary(values: list[float]) -> dict[str, float | None]:
    """Summarise a series as min, max, mean and drift.

    Args:
        values: Samples in collection order.

    Returns:
        A dict with keys ``"min"``, ``"max"``, ``"mean"`` and ``"drift"``
        (last value minus first). All four are ``None`` for an empty series.
    """
    if not values:
        return dict.fromkeys(("min", "max", "mean", "drift"))

    lowest = min(values)
    highest = max(values)
    average = sum(values) / len(values)
    first, last = values[0], values[-1]
    return {"min": lowest, "max": highest, "mean": average, "drift": last - first}
def _run_soak(out_dir: Path, seconds: float, sample_interval: float) -> dict[str, Any]:
    """Run single-loop stress iterations until a time budget is used up.

    Each iteration runs ``_run_regenesis_dll_stress`` with ``loops=1`` under
    ``out_dir / "soak_<n>"`` and records latency, process RSS and a GPU
    snapshot. At least one iteration runs whenever ``seconds`` is positive.
    All samples are written to ``out_dir / "soak_samples.jsonl"``.

    Args:
        out_dir: Directory for per-iteration ledgers and the samples file.
        seconds: Soak budget; a value ``<= 0`` disables the soak.
        sample_interval: Maximum pause between iterations, clipped so the
            sleep never runs past the deadline.

    Returns:
        ``{"enabled": False, "iterations": 0}`` when disabled; otherwise a
        dict with requested/actual duration, iteration count, samples path,
        an all-passed flag, and min/max/mean/drift summaries for latency, RSS
        and GPU memory.
    """
    if seconds <= 0:
        return {"enabled": False, "iterations": 0}

    t0 = time.perf_counter()
    stop_at = t0 + seconds
    records: list[dict[str, Any]] = []
    iteration = 0
    while True:
        tick = time.perf_counter()
        outcome = _run_regenesis_dll_stress(out_dir / f"soak_{iteration}", loops=1)
        elapsed_ms = (time.perf_counter() - tick) * 1000.0
        records.append(
            {
                "iteration": iteration,
                "t_s": time.perf_counter() - t0,
                "latency_ms": elapsed_ms,
                "rss_mb": _rss_mb(),
                "gpu": _gpu_snapshot(),
                "passed": outcome["passed"],
            }
        )
        iteration += 1
        if time.perf_counter() >= stop_at:
            break
        # Never sleep beyond the deadline, and never pass a negative duration.
        pause = min(sample_interval, stop_at - time.perf_counter())
        time.sleep(max(0.0, pause))

    latency_series: list[float] = []
    rss_series: list[float] = []
    gpu_mem_series: list[float] = []
    for rec in records:
        latency_series.append(float(rec["latency_ms"]))
        if rec["rss_mb"] is not None:
            rss_series.append(float(rec["rss_mb"]))
        gpu = rec.get("gpu")
        if isinstance(gpu, dict) and gpu.get("memory_used_mb") is not None:
            gpu_mem_series.append(float(gpu["memory_used_mb"]))

    samples_file = out_dir / "soak_samples.jsonl"
    # One JSON object per line, each line newline-terminated.
    jsonl = "".join(json.dumps(rec, ensure_ascii=False, sort_keys=True) + "\n" for rec in records)
    samples_file.write_text(jsonl, encoding="utf-8")

    return {
        "enabled": True,
        "duration_requested_s": seconds,
        "duration_actual_s": time.perf_counter() - t0,
        "iterations": len(records),
        "samples_path": str(samples_file),
        "all_iterations_passed": all(bool(rec["passed"]) for rec in records),
        "latency_ms": _summary(latency_series),
        "rss_mb": _summary(rss_series),
        "gpu_memory_used_mb": _summary(gpu_mem_series),
    }
def build_external_stress_suite(
    out_dir: str | Path,
    *,
    provider_models: list[str] | None = None,
    provider_fn: ProviderFn | None = None,
    run_external: bool = False,
    provider: str | None = None,
    provider_kind: str = "hf",
    timeout: float = 60.0,
    stress_seq_lengths: list[int] | None = None,
    regenesis_loops: int = 16,
    soak_seconds: float = 0.0,
    soak_sample_interval: float = 5.0,
) -> dict[str, Any]:
    """Run the external probes plus local stress checks and write one report.

    The steps run in order: external provider probes, the AxiomKV report, the
    ReGenesis DLL stress loop, and the optional soak. The combined report is
    written to ``<out_dir>/external_stress_suite_report.json``.

    Args:
        out_dir: Output directory; created (with parents) if missing.
        provider_models: Models to probe; defaults to
            ``["Qwen/Qwen2.5-72B-Instruct"]`` when ``None`` or empty.
        provider_fn: Optional ready-made provider callable.
        run_external: Whether to make external provider calls at all.
        provider: Provider name forwarded to the external evaluation.
        provider_kind: Built-in client kind used when ``provider_fn`` is
            ``None``.
        timeout: Timeout forwarded to the provider client.
        stress_seq_lengths: Sequence lengths for the AxiomKV report; defaults
            to ``[128, 1024, 8192, 65536]`` when ``None`` or empty.
        regenesis_loops: Loop count for the ReGenesis DLL stress.
        soak_seconds: Soak duration; the soak is disabled when ``<= 0``.
        soak_sample_interval: Maximum pause between soak iterations.

    Returns:
        The report dict, including ``"json_path"``. Every
        ``*_claim_allowed`` flag in its ``"claim_gate"`` section is ``False``.
    """
    out_root = Path(out_dir)
    out_root.mkdir(parents=True, exist_ok=True)

    models_to_probe = provider_models or ["Qwen/Qwen2.5-72B-Instruct"]
    seq_lengths = stress_seq_lengths or [128, 1024, 8192, 65536]
    # Local window is the shortest sequence length, clamped to [1, 64].
    shortest = min(seq_lengths)
    stress_local_window = min(64, max(1, shortest))

    external = _run_external_eval(
        models_to_probe,
        provider_fn,
        run_external=run_external,
        provider=provider,
        provider_kind=provider_kind,
        timeout=timeout,
    )
    axiomkv = build_axiomkv_report(
        out_root / "axiomkv",
        effective_dim=20_480,
        physical_dim=512,
        seq_lengths=seq_lengths,
        local_window=stress_local_window,
        anchor_slots=64,
        anchor_rank=64,
    )
    regenesis = _run_regenesis_dll_stress(out_root, loops=regenesis_loops)
    soak = _run_soak(out_root, soak_seconds, soak_sample_interval)

    # A disabled soak cannot fail the suite.
    if soak.get("enabled"):
        soak_passed = bool(soak.get("all_iterations_passed"))
    else:
        soak_passed = True
    kv_gate = axiomkv["bounded_kv_gate"]
    stress_passed = bool(kv_gate["passed"] and regenesis["passed"] and soak_passed)
    external_completed = external["status"] == "completed"

    report: dict[str, Any] = {
        "schema": "tinymind.external_stress_suite.v1",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "external_eval": external,
    }
    report["stress"] = {
        "passed": stress_passed,
        "axiomkv_report": axiomkv["json_path"],
        "axiomkv_cached_token_capacity": kv_gate["cached_token_capacity"],
        "axiomkv_seq_lengths": seq_lengths,
        "regenesis_dll_to_kv": regenesis,
        "soak": soak,
    }
    report["claim_gate"] = {
        "external_probe_completed": external_completed,
        "local_stress_passed": stress_passed,
        "official_external_claim_allowed": False,
        "production_complete_claim_allowed": False,
        "world_best_claim_allowed": False,
        "reason": "External provider probes and local stress tests are evidence, not official leaderboard ranks or production soak certification.",
    }

    report_file = out_root / "external_stress_suite_report.json"
    report["json_path"] = str(report_file)
    serialized = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
    report_file.write_text(serialized + "\n", encoding="utf-8")
    return report

# Hugging Face file-viewer residue (not part of the module):
#   Xet Storage Details
#   Size: 13.2 kB
#   Xet hash: e9e87c8baa200f907e635dd9209cc3cfc9d7acbde5abef74df8feb9574882052
#   Xet efficiently stores files, intelligently splitting them into unique
#   chunks and accelerating uploads and downloads.