# [Web-page header captured along with this file; not part of the program]
# kirikir13's picture | download | raw | 24.7 kB
# === PATCHED for Qwen 40B Hybrid Model ===
# RAW_PATTERNS: added linear_attn weights (A_log, conv1d, dt_bias, in_proj, out_proj)
# Defaults: target=10, anchor=12, spectral=0.95, snr_floor=5.0
# ==========================================
#!/usr/bin/env python3
"""
DR Studios — Therium Quantizer v4.3 "SNR-visible / spectrally-damped"
Design: Dave | Build: Brother Sonnet | GWP
Buckets are MOUNTED into the Space as local folders (e.g. /data, /dataOUT).
So we read/write plain local files — NO bucket API, NO token, NO path-prefix wars.
• SNR VISIBLE: every compressed tensor prints SNR (dB) and cosine live. Never blind.
• SNR FLOOR: tensors that drop below TH_SNR_FLOOR (default 5 dB) are stored raw.
• SPECTRAL DAMPING: residual weights are scaled by TH_SPECTRAL (<1.0) to tighten
the contractive bound on high-anchor / high-target runs.
• SIBLING-AWARE: if source is /data/data, it looks for output in /data/model.index.
• STUB-PROOF: ignores Git-LFS pointer stubs (<1 MB).
• AUTO-START on boot resumes into sibling output folders if they exist.
• AUTO-RESTART on crash (resume skips finished shards). Built to grind ~48h unattended.
• AUTO-LOG to <dest>/<prefix>/therium_run.log — a local write that syncs to the bucket.
• Live status panel refreshes itself (gr.Timer) — closing the tab can't kill the run.
"""
import os
import json
import time
import gc
import glob
import threading
import traceback
import datetime
import numpy as np
import gradio as gr
from therium_engine import TheriumAnchorQuantizer, pkg_to_bytes
# Tensor-name substrings checked by _is_raw; matching tensors are stored as raw
# fp16 instead of being Therium-compressed. The last group covers the
# linear-attention weights of the hybrid Qwen model.
RAW_PATTERNS = ("norm", "embed", "lm_head", "rotary", "bias",
                "visual", "vision", "patch_embed", "patch_emb", "merger",
                "A_log", "conv1d", "dt_bias", "in_proj", "out_proj", "linear_attn")
RAW_MIN_ELEMS = 4096  # tensors with fewer elements than this are always stored raw
MAX_RESTARTS = 500  # attempt budget for quant_worker's auto-restart loop
FLUSH_EVERY = 8  # _one_pass does periodic housekeeping when ki % FLUSH_EVERY == 0
try:
    # dB — a compressed tensor whose SNR is below this is stored raw instead
    SNR_FLOOR = float(os.environ.get("TH_SNR_FLOOR", "5.0"))
except ValueError:
    # A typo in the Space variable must not stop the whole app from importing;
    # fall back to the documented default.
    SNR_FLOOR = 5.0
# Ignore Git-LFS pointer stubs and ghost directory entries
MIN_FILE_SIZE = 1024 * 1024  # 1 MB — real shards are GBs, stubs are ~130 bytes
# In-memory run log; log_line appends to it and flush_log mirrors its tail to disk.
LOG = []
# Live progress snapshot rendered by get_status and dumped by flush_log.
STATE = {"status": "idle", "shards_total": 0, "shards_done": 0,
         "tensors_therium": 0, "tensors_raw": 0, "tensors_skip": 0,
         "failures": 0, "overall_ratio": 0.0, "current_shard": "",
         "src": "", "dest": "", "started": None, "last_update": None,
         "last_snr": 0.0, "last_cos": 0.0, "snr_fails": 0}
# Guards LOG, STATE and _RUNNING. It is a plain (non-reentrant) Lock, so
# log_line (which acquires it) must never be called while holding it.
_LK = threading.Lock()
_RUNNING = False  # True while a quant_worker is active
def _ts():
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``.

    Uses the timezone-aware ``datetime.now(timezone.utc)`` because
    ``datetime.utcnow()`` is deprecated since Python 3.12. The produced text is
    the same as before.
    """
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
# ----------------------------------------------------- mount auto-detection
def _parse_mount_lines(lines):
    """Return the data-like mount points found in /proc/mounts-style *lines*.

    A line qualifies when its fstype contains "fuse" or "nfs" (excluding the
    kernel's own ``fusectl`` control filesystem) or its mount point starts with
    "/data". The kernel writes special characters in mount points as 3-digit
    octal escapes (e.g. ``\\040`` for a space); these are decoded so the
    returned paths can be used directly.
    """
    import re
    points = set()
    for line in lines:
        parts = line.split()
        if len(parts) < 3:
            continue
        mp = re.sub(r"\\([0-7]{3})", lambda mo: chr(int(mo.group(1), 8)), parts[1])
        fstype = parts[2]
        fs_match = fstype != "fusectl" and any(k in fstype for k in ("fuse", "nfs"))
        if fs_match or mp.startswith("/data"):
            points.add(mp)
    return points


def list_mounts():
    """Return the sorted candidate data-mount directories on this machine.

    Combines the data-like entries of /proc/mounts (see _parse_mount_lines)
    with a fixed list of conventional mount directories that exist here.
    """
    pts = set()
    try:
        with open("/proc/mounts") as fh:
            pts |= _parse_mount_lines(fh)
    except (OSError, ValueError):
        # /proc/mounts is Linux-only (or unreadable/undecodable here); the
        # fixed candidates below still apply.
        pass
    for c in ("/data", "/dataOUT", "/dataout", "/data1", "/data_out",
              "/inputs", "/outputs", "/in", "/out", "/mnt/data"):
        if os.path.isdir(c):
            pts.add(c)
    return sorted(pts)
def _is_real_safetensors(path):
    """Tell whether *path* is a regular file of at least MIN_FILE_SIZE bytes.

    Smaller files (such as Git-LFS pointer stubs), non-files, and any error
    while inspecting the path all count as "not a real shard".
    """
    try:
        if not os.path.isfile(path):
            return False
        return os.path.getsize(path) >= MIN_FILE_SIZE
    except Exception:
        return False
def scan_source_dirs():
    """List folders inside the detected mounts that hold at least one real shard.

    A folder qualifies when one of its own ``*.safetensors`` files passes
    _is_real_safetensors (so LFS stubs are ignored). Folders more than 4 levels
    below a mount are not descended into further; errors while walking a mount
    simply end that mount's scan.
    """
    hits = set()
    for mount in list_mounts():
        base_depth = mount.count(os.sep)
        try:
            for folder, subdirs, names in os.walk(mount):
                candidates = (os.path.join(folder, n) for n in names if n.endswith(".safetensors"))
                if any(_is_real_safetensors(c) for c in candidates):
                    hits.add(folder)
                too_deep = folder.count(os.sep) - base_depth > 4
                if too_deep:
                    subdirs[:] = []
        except Exception:
            pass
    return sorted(hits)
def list_writable_dirs():
    """Return the subset of list_mounts() that this process has write access to."""
    writable = []
    for mount in list_mounts():
        if os.access(mount, os.W_OK):
            writable.append(mount)
    return writable
def find_existing_runs(mounts):
    """Return sorted folders under *mounts* that already hold Therium output.

    A folder counts as a run when it directly contains a ``.tza`` file larger
    than 10 bytes, a ``manifest.json``, or a ``therium_run.log``. The walk does
    not descend into a run folder, nor more than 3 levels below a mount.
    Missing mounts are skipped; an error while walking ends that mount's scan.
    """
    markers = ("manifest.json", "therium_run.log")
    found = []
    for mount in mounts:
        if not os.path.isdir(mount):
            continue
        try:
            for folder, subdirs, names in os.walk(mount):
                paths = [os.path.join(folder, n) for n in names]
                tza_hit = any(
                    p.endswith(".tza") and os.path.isfile(p) and os.path.getsize(p) > 10
                    for p in paths
                )
                meta_hit = any(
                    os.path.basename(p) in markers and os.path.isfile(p)
                    for p in paths
                )
                if tza_hit or meta_hit:
                    if folder not in found:
                        found.append(folder)
                    subdirs[:] = []
                    continue
                if folder.count(os.sep) - mount.count(os.sep) > 3:
                    subdirs[:] = []
        except Exception:
            pass
    return sorted(found)
def find_sibling_outputs(source_dir):
    """Return ``(path, name)`` for candidate output folders next to *source_dir*.

    Siblings are the other sub-directories of ``os.path.dirname(source_dir)``.
    *name* is relative to that parent, so ``os.path.join(parent, name)`` (which
    is how _auto_pick uses it) lands on the sibling. This catches same-bucket
    layouts like:
        /data/data <- raw model
        /data/model.index <- previous output
    A sibling qualifies when it has no real ``*.safetensors`` of its own (so it
    is not another model) and directly holds at least one file larger than
    10 bytes. Only sources strictly inside a detected mount are considered;
    otherwise ``[]`` is returned. Results are sorted by name.
    """
    source_dir = os.path.normpath(source_dir)
    if not any(source_dir.startswith(m + os.sep) for m in list_mounts()):
        return []
    parent = os.path.dirname(source_dir)
    try:
        entries = sorted(os.listdir(parent))
    except OSError:
        return []
    outputs = []
    for entry in entries:
        sibling = os.path.join(parent, entry)
        if sibling == source_dir or not os.path.isdir(sibling):
            continue
        try:
            paths = [os.path.join(sibling, n) for n in os.listdir(sibling)]
            # Skip if it's another source dir (has real safetensors)
            if any(p.endswith(".safetensors") and _is_real_safetensors(p) for p in paths):
                continue
            has_files = any(os.path.isfile(p) and os.path.getsize(p) > 10 for p in paths)
        except OSError:
            # One unreadable sibling must not hide the others.
            continue
        if has_files:
            outputs.append((sibling, entry))
    return outputs
def _shards_in(d):
    """Return the sorted paths of real ``*.safetensors`` files anywhere below *d*.

    *d* is passed through ``glob.escape`` first, so folder names containing
    glob metacharacters (``[``, ``]``, ``*``, ``?``, e.g. ``Qwen[40B]``) are
    matched literally rather than as patterns. LFS stubs are filtered out by
    _is_real_safetensors.
    """
    pattern = os.path.join(glob.escape(d), "**", "*.safetensors")
    return sorted(p for p in glob.glob(pattern, recursive=True) if _is_real_safetensors(p))
# ----------------------------------------------------- logging
def log_line(msg):
    """Timestamp *msg*, append it to LOG, and return the stamped line.

    Under _LK, LOG drops its oldest 2000 lines whenever it grows past 6000
    entries, and STATE["last_update"] is refreshed.
    """
    stamped = "[{}] {}".format(_ts(), msg)
    with _LK:
        LOG.append(stamped)
        overflow = len(LOG) > 6000
        if overflow:
            del LOG[:2000]
        STATE["last_update"] = _ts()
    return stamped
def flush_log(log_dir):
    """Best-effort dump of the recent log and a STATE snapshot into *log_dir*.

    Writes ``therium_run.log`` (the last 4000 LOG lines) and
    ``therium_state.json``. Both are snapshotted under _LK and written after
    the lock is released. Any error is swallowed so that logging can never
    interrupt a run.
    """
    try:
        os.makedirs(log_dir, exist_ok=True)
        with _LK:
            snapshot = dict(STATE)
            text = "\n".join(LOG[-4000:])
        log_path = os.path.join(log_dir, "therium_run.log")
        state_path = os.path.join(log_dir, "therium_state.json")
        with open(log_path, "w") as fh:
            fh.write(text)
        with open(state_path, "w") as fh:
            json.dump(snapshot, fh, indent=2)
    except Exception:
        pass
# ----------------------------------------------------- per-tensor
def _is_raw(name, n_elems):
    """Return True when tensor *name* (with *n_elems* elements) should be stored raw.

    Tensors with fewer than RAW_MIN_ELEMS elements are always raw. Otherwise
    the name is matched case-insensitively against RAW_PATTERNS. Both sides are
    lower-cased, so mixed-case patterns such as "A_log" can actually match.
    Before this, only the name was lowered and "A_log" was unreachable.
    """
    if n_elems < RAW_MIN_ELEMS:
        return True
    low = name.lower()
    return any(p.lower() in low for p in RAW_PATTERNS)
def _write_local(path, data):
    """Write *data* (bytes) to *path*, creating parent directories as needed.

    The bytes first go to ``<path>.part`` and are then moved into place with
    ``os.replace``. An interrupted write therefore never leaves a truncated file
    at *path*. This matters because _handle_tensor's resume check skips any
    tensor whose output file already exists. If the filesystem refuses the
    rename (some FUSE mounts may), the data is written to *path* directly instead.
    """
    parent = os.path.dirname(path)
    if parent:
        # dirname is "" for a bare filename, and os.makedirs("") would raise.
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".part"
    with open(tmp, "wb") as f:
        f.write(data)
    try:
        os.replace(tmp, path)
    except OSError:
        with open(path, "wb") as f:
            f.write(data)
        try:
            os.remove(tmp)
        except OSError:
            pass
def _handle_tensor(key, t, torch, q, base, resume):
    """Compress (or store raw) one tensor and write its output file under *base*.

    Args:
        key: tensor name; output is ``<base>/<key>.tza`` (Therium) or ``<base>/<key>.raw`` (fp16 bytes).
        t: the tensor read from the shard.
        torch: the torch module (imported lazily by the caller).
        q: the TheriumAnchorQuantizer used for compress/decompress.
        base: output directory.
        resume: if true, a tensor whose .tza or .raw already exists is skipped.

    Returns:
        ``(status, orig_bytes, comp_bytes, snr, cos, shape, fmt, fname)`` where
        status is "skip", "raw", "snr_raw" or "therium". Skips report zero bytes,
        0.0 quality numbers, and None shape/fmt.
    """
    out_path = os.path.join(base, key + ".tza")
    raw_path = os.path.join(base, key + ".raw")
    if resume and (os.path.exists(out_path) or os.path.exists(raw_path)):
        return ("skip", 0, 0, 0.0, 0.0, None, None, "")
    n_elems = int(t.numel())
    shape = list(t.shape)
    # element_size() is already 2 for float16/bfloat16, so no special case is needed.
    orig_itemsize = t.element_size()
    orig_bytes = n_elems * orig_itemsize
    if _is_raw(key, n_elems):
        # NOTE(review): .raw is always fp16; bf16/fp32 values outside fp16 range
        # (|x| > 65504) become inf here. The on-disk format is kept for the decoder.
        raw = t.to(torch.float16).cpu().numpy().tobytes()
        _write_local(raw_path, raw)
        return ("raw", orig_bytes, len(raw), 0.0, 0.0, shape, "raw", key + ".raw")
    arr = t.float().cpu().numpy()
    pkg = q.compress(arr, orig_itemsize=orig_itemsize)
    recon = q.decompress(pkg)
    snr, cos = TheriumAnchorQuantizer.analyze(arr, recon)
    with _LK:
        STATE["last_snr"] = snr
        STATE["last_cos"] = cos
    # Written as "not >=" so a NaN SNR also counts as failing the floor.
    # "snr < SNR_FLOOR" is False for NaN and would keep an unverifiable compression.
    # analyze's behavior on degenerate input is not visible here.
    if not snr >= SNR_FLOOR:
        # quality would be garbage — fall back to raw fp16
        raw = t.to(torch.float16).cpu().numpy().tobytes()
        _write_local(raw_path, raw)
        del arr, pkg, recon
        with _LK:
            STATE["snr_fails"] += 1
        return ("snr_raw", orig_bytes, len(raw), snr, cos, shape, "raw", key + ".raw")
    blob = pkg_to_bytes(pkg)
    _write_local(out_path, blob)
    del arr, pkg, recon
    return ("therium", orig_bytes, len(blob), snr, cos, shape, "therium", key + ".tza")
# ----------------------------------------------------- one pass
def _load_prior_entries(base):
    """Return {tensor key: manifest entry} from an existing ``<base>/manifest.json``.

    On resume this keeps SNR/cosine figures measured by an earlier completed
    pass. Returns {} when the manifest is missing, unreadable or malformed.
    """
    try:
        with open(os.path.join(base, "manifest.json")) as fh:
            entries = json.load(fh).get("entries", [])
    except (OSError, ValueError, AttributeError):
        return {}
    if not isinstance(entries, list):
        return {}
    return {e["key"]: e for e in entries if isinstance(e, dict) and "key" in e}


def _resumed_entry(key, t, base, prior):
    """Describe a tensor that was skipped because its output file already exists.

    Looks for ``<base>/<key>.tza`` and then ``<base>/<key>.raw``, the two names
    _handle_tensor writes. Returns ``(entry, orig_bytes, comp_bytes)`` built
    from that file and *t*, or None when neither exists. SNR/cosine are copied
    from *prior* when it describes the same file; otherwise they are None
    (not measured in this pass).
    """
    for fmt, ext in (("therium", ".tza"), ("raw", ".raw")):
        fname = key + ext
        try:
            comp_bytes = os.path.getsize(os.path.join(base, fname))
        except OSError:
            continue
        orig_bytes = int(t.numel()) * t.element_size()
        old = prior.get(key)
        same_file = old is not None and old.get("file") == fname
        entry = {"key": key, "file": fname, "format": fmt,
                 "shape": list(t.shape), "orig_bytes": orig_bytes,
                 "comp_bytes": comp_bytes,
                 "snr": old.get("snr") if same_file else None,
                 "cos": old.get("cos") if same_file else None}
        return entry, orig_bytes, comp_bytes
    return None


def _one_pass(cfg):
    """Run one full quantization pass over every real shard under ``cfg["src"]``.

    Output goes to ``<cfg["dest"]>/<cfg["prefix"]>``: one .tza/.raw file per
    tensor, plus manifest.json, therium_run.log and therium_state.json. With
    ``cfg["resume"]`` set, tensors whose output already exists are not redone.
    They are still recorded in the manifest and counted in the size totals,
    using the files on disk. A run finished across several restarts therefore
    ends with a complete manifest and a whole-model ratio.

    Raises:
        RuntimeError: no real shards were found, or none of them could be opened.
        Exceptions from loading or quantizing a tensor propagate, so quant_worker can retry.
    """
    import torch
    from safetensors import safe_open
    q = TheriumAnchorQuantizer(n_clusters=256, chunk_size=max(2, round(float(cfg["target"])) + 1),
                               anchor_rank=int(cfg["anchor"]), max_iter=5,
                               spectral_damping=float(cfg.get("spectral_damping", 1.0)))
    src = cfg["src"]
    base = os.path.join(cfg["dest"], cfg["prefix"].strip("/"))
    resume = bool(cfg["resume"])
    prior = _load_prior_entries(base) if resume else {}
    with _LK:
        STATE["src"] = src
        STATE["dest"] = base
    log_line(f"=== Therium v4.3 pass | src={src} -> {base} | target~{cfg['target']} anchor={cfg['anchor']} spectral={cfg.get('spectral_damping', 1.0)} ===")
    flush_log(base)
    shards = _shards_in(src)
    if not shards:
        raise RuntimeError(f"no REAL .safetensors under {src} (LFS pointer stubs ignored)")
    with _LK:
        STATE["shards_total"] = len(shards)
    log_line(f"{len(shards)} real shard(s) found")
    flush_log(base)
    tot_orig = tot_comp = 0
    nd = nr = nsk = nsnr = 0
    manifest_entries = []
    opened_any = False
    for si, shard in enumerate(shards):
        with _LK:
            STATE["current_shard"] = os.path.basename(shard)
            STATE["shards_done"] = si
        log_line(f"-- shard {si+1}/{len(shards)}: {os.path.basename(shard)}")
        try:
            if not os.path.exists(shard):
                raise FileNotFoundError(shard)
            handle = safe_open(shard, framework="pt")
        except Exception as e:
            log_line(f" !! cannot open this shard, skipping it: {e}")
            log_line(f" (if EVERY shard skips: the Source dropdown is pointing at the wrong "
                     f"bucket, or that bucket is mounted read-only — pick the model bucket instead)")
            flush_log(base)
            continue
        opened_any = True
        for ki, key in enumerate(handle.keys()):
            t = handle.get_tensor(key)
            status, ob, cb, snr, cos, shape, fmt, fname = _handle_tensor(key, t, torch, q, base, resume)
            if status == "skip":
                nsk += 1
                # _handle_tensor reports no file and 0 bytes for skips; recover
                # both from disk so the manifest and ratio cover the whole model.
                resumed = _resumed_entry(key, t, base, prior)
                if resumed is not None:
                    entry, ob, cb = resumed
                    manifest_entries.append(entry)
            else:
                if status in ("raw", "snr_raw"):
                    nr += 1
                    if status == "snr_raw":
                        nsnr += 1
                else:
                    nd += 1
                manifest_entries.append({"key": key, "file": fname, "format": fmt,
                                         "shape": shape, "orig_bytes": ob, "comp_bytes": cb,
                                         "snr": round(snr, 2), "cos": round(cos, 4)})
            tot_orig += ob
            tot_comp += cb
            del t
            with _LK:
                STATE.update(tensors_therium=nd, tensors_raw=nr, tensors_skip=nsk,
                             snr_fails=nsnr, overall_ratio=tot_orig / max(tot_comp, 1))
                if status != "skip":
                    # Skips carry placeholder 0.0 quality numbers; keep the last real ones visible.
                    STATE.update(last_snr=snr, last_cos=cos)
            qual = f"SNR={snr:5.1f}dB cos={cos:.4f}"
            log_line(f" {key[:46]:<46} run={tot_orig/max(tot_comp,1):5.2f}x (T={nd} R={nr} S={nsk} F={nsnr}) {qual}")
            if (ki % FLUSH_EVERY) == 0:
                gc.collect()
                flush_log(base)
        del handle
        gc.collect()
        with _LK:
            STATE["shards_done"] = si + 1
        log_line(f" shard {si+1} done . overall={tot_orig/max(tot_comp,1):.2f}x")
        flush_log(base)
    if not opened_any:
        raise RuntimeError("found shard names but NONE could be opened — the Source is pointing "
                           "at the wrong bucket (or it's mounted read-only). Pick the model "
                           "bucket in the Source dropdown and Start again.")
    overall = tot_orig / max(tot_comp, 1)
    _write_local(os.path.join(base, "manifest.json"),
                 json.dumps({"src": src, "overall_ratio": round(overall, 3),
                             "tensors_therium": nd, "tensors_raw": nr, "tensors_skipped": nsk,
                             "snr_fails": nsnr, "snr_floor": SNR_FLOOR,
                             "spectral_damping": float(cfg.get("spectral_damping", 1.0)),
                             "entries": manifest_entries},
                            indent=2).encode())
    with _LK:
        STATE["status"] = "done"
        STATE["overall_ratio"] = overall
    log_line(f"=== DONE overall={overall:.2f}x T={nd} R={nr} S={nsk} SNR-fails={nsnr} === GWP")
    flush_log(base)
def quant_worker(cfg):
    """Run _one_pass(cfg) with automatic retries; intended for a background thread.

    Only one worker runs at a time: a call made while another is active returns
    immediately. Each failure is logged and retried after min(60, 5*n) seconds
    (n = failures so far), for up to MAX_RESTARTS attempts. The final status is
    "done" when a pass completed and "stopped" otherwise. _RUNNING is always
    cleared on exit, including when *cfg* is malformed.
    """
    global _RUNNING
    with _LK:
        if _RUNNING:
            return
        _RUNNING = True
        STATE.update(status="running", started=_ts(), failures=0)
    try:
        # Inside the try: a malformed cfg must not leave _RUNNING stuck at True.
        try:
            base = os.path.join(cfg["dest"], cfg["prefix"].strip("/"))
        except (KeyError, AttributeError, TypeError) as e:
            log_line(f"!! bad run config, not starting: {e!r}")
            return
        for attempt in range(1, MAX_RESTARTS + 1):
            with _LK:
                # A retry is an active pass again; don't leave the panel on "retrying".
                STATE["status"] = "running"
            try:
                _one_pass(cfg)
                break
            except Exception as e:
                with _LK:
                    STATE["failures"] += 1
                    STATE["status"] = "retrying"
                    failures = STATE["failures"]
                tb = traceback.format_exc().strip().splitlines()[-1]
                log_line(f"!! FAILURE #{failures} (attempt {attempt}): {e}")
                log_line(f" reason: {tb}")
                flush_log(base)
                if attempt >= MAX_RESTARTS:
                    # No attempts left: don't announce or wait for a restart that won't happen.
                    break
                wait = min(60, 5 * attempt)
                log_line(f" auto-restart in {wait}s (resume skips finished shards)")
                time.sleep(wait)
    finally:
        with _LK:
            if STATE["status"] != "done":
                STATE["status"] = "stopped"
            _RUNNING = False
def _env_number(name, cast, default):
    """Parse env var *name* with *cast*; return *default* if it is unset, blank or malformed.

    A malformed value is reported with log_line instead of raising, because
    _auto_pick runs at module import and an exception there would stop the
    app from starting.
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        return cast(raw)
    except ValueError:
        log_line(f"WARN: ignoring malformed {name}={raw!r}; using default {default}")
        return default


def _boot_cfg(src, dest, prefix):
    """Build an auto-start run config (resume on), tuned by TH_TARGET / TH_ANCHOR / TH_SPECTRAL."""
    return {"src": src, "dest": dest, "prefix": prefix,
            "target": _env_number("TH_TARGET", float, 10.0),
            "anchor": _env_number("TH_ANCHOR", int, 12),
            "spectral_damping": _env_number("TH_SPECTRAL", float, 0.95),
            "resume": True}


def _auto_pick():
    """Smart auto-pick with sibling-folder support for same-bucket source/output layouts.

    Returns a run config for quant_worker, or None when no source or no
    writable destination is found. The source is the detected folder with the
    most real shards. Destinations are tried in this order:
      1. TH_SRC + TH_DEST env vars (both existing dirs), prefix TH_PREFIX.
      2. A sibling of the source that is already a detected run.
      3. Any other non-source sibling with files.
      4. An existing run elsewhere, resumed in place.
      5. A new "therium_15x" run on the first writable mount, preferring one that is not the source.
    """
    # Hard override via environment variables (Space Settings -> Variables)
    env_src = os.environ.get("TH_SRC", "").strip()
    env_dest = os.environ.get("TH_DEST", "").strip()
    env_prefix = os.environ.get("TH_PREFIX", "therium_15x").strip()
    if env_src and env_dest and os.path.isdir(env_src) and os.path.isdir(env_dest):
        log_line(f"AUTO-START via env vars: src={env_src} dest={env_dest} prefix={env_prefix}")
        return _boot_cfg(env_src, env_dest, env_prefix)
    mounts = list_mounts()
    existing_runs = find_existing_runs(mounts)
    counted = [(s, len(_shards_in(s))) for s in scan_source_dirs()]
    counted = [(s, c) for s, c in counted if c > 0]
    if not counted:
        log_line("AUTO-START abort: no source dirs with readable .safetensors found.")
        return None
    counted.sort(key=lambda x: x[1], reverse=True)
    src = counted[0][0]
    # --- SIBLING RESUME: catch layouts like /data/data (source) + /data/model.index (output) ---
    sibling_outputs = find_sibling_outputs(src)
    parent = os.path.dirname(src)
    # Prefer siblings that are also marked by existing run artifacts
    for sib_path, sib_rel in sibling_outputs:
        if sib_path in existing_runs:
            log_line(f"AUTO-START sibling-resume (run detected): src={src} dest={parent} prefix={sib_rel}")
            return _boot_cfg(src, parent, sib_rel)
    # Fallback: any non-source sibling with files
    if sibling_outputs:
        sib_path, sib_rel = sibling_outputs[0]
        log_line(f"AUTO-START sibling-output: src={src} dest={parent} prefix={sib_rel}")
        return _boot_cfg(src, parent, sib_rel)
    # --- STANDARD RESUME: look for existing runs elsewhere ---
    for run_dir in existing_runs:
        if run_dir == src or run_dir.startswith(src + os.sep):
            continue
        for m in sorted(mounts, key=len, reverse=True):
            # Require a path-component boundary: "/data" must not claim "/dataOUT/run".
            if run_dir == m or run_dir.startswith(m.rstrip(os.sep) + os.sep):
                rel = os.path.relpath(run_dir, m)
                prefix = rel if rel != "." else "therium_15x"
                log_line(f"AUTO-START resume detected: src={src} dest={m} prefix={prefix}")
                return _boot_cfg(src, m, prefix)
        log_line(f"AUTO-START resume detected: src={src} dest={run_dir} prefix=''")
        return _boot_cfg(src, run_dir, "")
    # --- NEW RUN: pick any writable mount, same mount allowed ---
    writable = list_writable_dirs()
    preferred = [w for w in writable if os.path.normpath(w) != os.path.normpath(src)] or writable
    if not preferred:
        log_line("AUTO-START abort: no writable destination found.")
        return None
    dest = preferred[0]
    log_line(f"AUTO-START new run: src={src} dest={dest}")
    return _boot_cfg(src, dest, "therium_15x")
def start_manual(src, dest, prefix, target, anchor, resume):
    """Start a background quantization run from the UI and return a status message.

    If *dest* already looks like a Therium run (it directly contains
    manifest.json, therium_run.log or a .tza file), the run resumes in place:
    the prefix is ignored so the output is not nested one level deeper.
    Otherwise an empty prefix falls back to "therium_15x".
    """
    if not src or not dest:
        return "pick a source model folder and an output bucket from the dropdowns first."
    with _LK:
        running = _RUNNING
    if running:
        return "already running — watch the status panel (auto-refreshes)."
    try:
        names = os.listdir(dest) if os.path.isdir(dest) else []
    except OSError:
        names = []
    is_existing_run = any(n in ("manifest.json", "therium_run.log") or n.endswith(".tza")
                          for n in names)
    # The empty prefix must survive: falling back to "therium_15x" here would
    # nest the resumed run inside the existing one.
    run_prefix = "" if is_existing_run else (prefix or "therium_15x")
    try:
        spectral = float(os.environ.get("TH_SPECTRAL", "0.95"))
    except ValueError:
        spectral = 0.95
    cfg = {"src": src, "dest": dest, "prefix": run_prefix,
           "target": target, "anchor": anchor,
           "spectral_damping": spectral,
           "resume": resume}
    threading.Thread(target=quant_worker, args=(cfg,), daemon=True).start()
    return "STARTED in background. Keeps running even if you close this tab. Status below + saved to the bucket."
def get_status():
    """Render STATE plus the last 80 log lines as the live status panel text."""
    with _LK:
        snap = dict(STATE)
        recent = "\n".join(LOG[-80:])
    headline = (
        f"status={snap['status']} shards {snap['shards_done']}/{snap['shards_total']} "
        f"tensors[T={snap['tensors_therium']} R={snap['tensors_raw']} S={snap['tensors_skip']}] "
        f"overall={snap['overall_ratio']:.2f}x failures={snap['failures']} "
        f"last: SNR={snap['last_snr']:.1f}dB cos={snap['last_cos']:.4f} SNR-fails={snap['snr_fails']}\n"
    )
    where = (
        f"src: {snap['src']}\nout: {snap['dest']}\n"
        f"current: {snap['current_shard']} updated: {snap['last_update']}\n"
    )
    divider = "-" * 60 + "\n"
    return headline + where + divider + recent
def rescan():
    """Re-detect sources, writable mounts and existing runs for the UI.

    Returns ``(source dropdown update, destination dropdown update, info text)``.
    The destination defaults to the first existing run, else to the first
    writable mount. Sibling outputs are reported for up to three sources.
    """
    sources = scan_source_dirs()
    writable = list_writable_dirs()
    runs = find_existing_runs(list_mounts())
    # Also show sibling outputs for each source
    sibling_notes = []
    for s in sources[:3]:
        found = find_sibling_outputs(s)
        if not found:
            continue
        names = ", ".join(rel for _path, rel in found)
        sibling_notes.append(f" siblings of {s}: {names}")
    info = "\n".join([
        f"mounts: {list_mounts()}",
        f"sources: {sources or 'NONE'}",
        f"writable mounts: {writable or 'NONE'}",
        f"existing runs: {runs or 'NONE'}",
        " | ".join(sibling_notes),
    ])
    dest_choices = sorted(set(writable + runs))
    if runs:
        default_dest = runs[0]
    elif writable:
        default_dest = writable[0]
    else:
        default_dest = None
    src_update = gr.update(choices=sources, value=(sources[0] if sources else None))
    dest_update = gr.update(choices=dest_choices, value=default_dest)
    return src_update, dest_update, info
# Initial dropdown contents, computed once at import time; the "Rescan mounts"
# button recomputes them through rescan().
_init_srcs = scan_source_dirs()
_init_dests = list_writable_dirs()
_init_runs = find_existing_runs(list_mounts())
# Destination choices merge writable mounts with folders that already hold a
# run; the first existing run (if any) is preselected, else the first writable mount.
_init_dest_choices = sorted(set(_init_dests + _init_runs))
_init_dest_val = _init_runs[0] if _init_runs else (_init_dests[0] if _init_dests else None)
with gr.Blocks(title="TITAN PROTOCOL (Qwen-40B) - Pythagorean v4.3") as demo:
    gr.Markdown(
        "# TITAN PROTOCOL (Qwen-40B) - Pythagorean · v4.3\n"
        "Reads your **mounted buckets as folders** — pick from the dropdowns, no typing. "
        "Auto-start, auto-restart, auto-log to the output bucket. _GWP_ \n"
        "**v4.3:** SNR-visible + spectrally-damped. Every tensor prints SNR/cosine. "
        "SNR floor auto-fallback. Spectral damping for high-anchor stability. \n"
        "*Override: set `TH_SRC`, `TH_DEST`, `TH_PREFIX`, `TH_TARGET`, `TH_ANCHOR`, `TH_SPECTRAL`, `TH_SNR_FLOOR` in Space Settings → Variables.*"
    )
    with gr.Row():
        # Left column: run configuration and controls.
        with gr.Column():
            src = gr.Dropdown(choices=_init_srcs, value=(_init_srcs[0] if _init_srcs else None),
                              label="1. Source model (auto-found folders with real safetensors)")
            dest = gr.Dropdown(choices=_init_dest_choices, value=_init_dest_val,
                               label="2. Output folder (writable mount, or pick an existing run to resume)")
            prefix = gr.Textbox(label="Output subfolder / prefix", value="therium_15x")
            with gr.Row():
                target = gr.Slider(4, 24, value=10, step=1, label="Target ratio (~x)")
                anchor = gr.Slider(0, 16, value=12, step=1, label="SVD anchor rank")
            # NOTE(review): resume skipping happens per tensor (_handle_tensor checks
            # each output file), not per shard as the label suggests.
            resume = gr.Checkbox(value=True, label="Resume (skip finished shards)")
            with gr.Row():
                rescan_btn = gr.Button("Rescan mounts")
                go = gr.Button("Start quant", variant="primary")
            mounts_info = gr.Textbox(label="Detected mounts, sources & siblings", lines=6)
        # Right column: live status text.
        with gr.Column():
            status = gr.Textbox(label="Live status (auto-refreshes; also saved to output bucket)",
                                lines=28, max_lines=28, autoscroll=True)
    rescan_btn.click(rescan, outputs=[src, dest, mounts_info])
    # start_manual only launches a background thread and returns a message at once.
    go.click(start_manual, inputs=[src, dest, prefix, target, anchor, resume], outputs=status)
    # Poll get_status every 4 s (and once on page load) so the panel tracks the
    # background run without any user interaction.
    timer = gr.Timer(4.0)
    timer.tick(get_status, outputs=status)
    demo.load(get_status, outputs=status)
# Auto-start on boot. This runs at module import, before the UI is launched.
try:
    _BOOT = _auto_pick()
except Exception:
    # A detection or config error must not take the whole UI down with it:
    # record it in the live log and leave manual start available.
    log_line("AUTO-START failed: " + traceback.format_exc().strip().splitlines()[-1])
    _BOOT = None
if _BOOT:
    log_line("AUTO-START triggered on boot.")
    threading.Thread(target=quant_worker, args=(_BOOT,), daemon=True).start()
if __name__ == "__main__":
    demo.queue().launch()

# [Web-page footer captured along with this file; not part of the program]
# Xet Storage Details. Size: 24.7 kB · Xet hash:
# 2f486303a96828dadee7e70ae8b29d22e66bd1cad8e71d117bff4afd863ab882
# Xet efficiently stores files, intelligently splitting them into unique chunks
# and accelerating uploads and downloads. More info.