# TuringsSolutions's picture
# Update app.py
# 888aff8 verified
# app.py — Trading Card Trainer (CHR) + Carrier Upload + Robust Stego Embed
# + Full-Bleed Trading Card UI (replaces add_card_text)
# + Rarity + Type
# + Subtle Foil Outer Frame + Rounded Corners
#
# Goal: dataset -> compressed payload -> visually “wow” card (carrier + holo overlay + full-bleed stats)
# -> embed payload into PNG pixels (self-describing LSB stego)
# Training reads ONLY the final card PNG pixels (extracts embedded payload)
import os
# --- HF Spaces hardening: make OpenMP thread env vars valid integers ---
def _set_int_env(name: str, value: int):
    """Force env var *name* to a plain integer string unless it already is one."""
    current = os.environ.get(name, "")
    if not str(current).isdigit():
        os.environ[name] = str(value)
# Pin all OpenMP/BLAS thread counts to 1 unless a valid integer was already set;
# malformed values here crash native libs at import time on HF Spaces.
_set_int_env("OMP_NUM_THREADS", 1)
_set_int_env("OPENBLAS_NUM_THREADS", 1)
_set_int_env("MKL_NUM_THREADS", 1)
_set_int_env("NUMEXPR_NUM_THREADS", 1)
import io, re, json, math, struct, tempfile, traceback, hashlib, zlib
from pathlib import Path
from typing import List, Tuple, Dict, Optional
import numpy as np
import gradio as gr
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import imageio.v2 as imageio
from PIL import Image
# -----------------------------
# Optional DOCX support
# -----------------------------
_DOCX_OK = False  # True when python-docx is importable; gates read_docx_bytes()
try:
    from docx import Document
    _DOCX_OK = True
except Exception:
    # python-docx is optional; .docx uploads raise a clear error instead.
    _DOCX_OK = False
# -----------------------------
# Embeddings: sentence-transformers (preferred), fallback to hashing
# -----------------------------
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.decomposition import PCA
_ST_MODEL = None  # lazily-loaded SentenceTransformer singleton (None until first successful load)
def _load_st_model():
    """Load and cache the MiniLM sentence-transformer; return None if unavailable."""
    global _ST_MODEL
    if _ST_MODEL is not None:
        return _ST_MODEL
    try:
        from sentence_transformers import SentenceTransformer
        _ST_MODEL = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        return _ST_MODEL
    except Exception:
        # Import/download failure -> embed_texts() falls back to HashingVectorizer.
        return None
def embed_texts(texts: List[str], prefer_sentence_transformer: bool = True) -> Tuple[np.ndarray, str]:
    """Embed texts with MiniLM when available, else a hashing-vectorizer fallback.

    Returns (vectors, backend_name); vectors are float32 and L2-normalized
    in both code paths.
    """
    cleaned = [t if isinstance(t, str) else str(t) for t in texts]
    if prefer_sentence_transformer:
        st = _load_st_model()
        if st is not None:
            try:
                emb = st.encode(
                    cleaned, batch_size=32, show_progress_bar=False,
                    convert_to_numpy=True, normalize_embeddings=True
                )
                return emb.astype(np.float32), "sentence-transformers/all-MiniLM-L6-v2"
            except Exception:
                pass  # fall through to the hashing fallback
    vectorizer = HashingVectorizer(n_features=768, alternate_sign=False, norm=None)
    dense = vectorizer.transform(cleaned).toarray().astype(np.float32)
    dense = dense / (np.linalg.norm(dense, axis=1, keepdims=True) + 1e-9)
    return dense, "HashingVectorizer(768d) fallback"
# -----------------------------
# Text ingestion / splitting
# -----------------------------
def _basic_sentence_split(text: str) -> List[str]:
    """Split text on newlines and on whitespace following ./!/? terminators."""
    pieces = re.split(r'[\n\r]+|(?<=[\.\!\?])\s+', text.strip())
    return [piece.strip() for piece in pieces if piece.strip()]
def read_txt_bytes(b: bytes) -> str:
    """Decode uploaded bytes: strict UTF-8 first, lossy Latin-1 as a fallback."""
    try:
        text = b.decode("utf-8")
    except Exception:
        text = b.decode("latin-1", errors="ignore")
    return text
def read_docx_bytes(b: bytes) -> List[str]:
    """Extract non-empty paragraph texts from a .docx file given as bytes.

    Raises:
        RuntimeError: when python-docx is not installed (_DOCX_OK is False).
    """
    if not _DOCX_OK:
        raise RuntimeError("python-docx not installed in this Space.")
    bio = io.BytesIO(b)
    doc = Document(bio)
    paras = [p.text.strip() for p in doc.paragraphs]
    # Drop empty/whitespace-only paragraphs.
    return [p for p in paras if p and not p.isspace()]
def to_units(raw_text: str, mode: str) -> List[str]:
    """Split raw text into training units: sentences, or blank-line-separated paragraphs."""
    stripped = raw_text.strip()
    if not stripped:
        return []
    if mode == "sentences":
        return _basic_sentence_split(stripped)
    # Any other mode: paragraphs separated by one or more blank lines.
    return [chunk.strip() for chunk in re.split(r"\n\s*\n+", stripped) if chunk.strip()]
# -----------------------------
# Demo corpus (big enough to always train)
# -----------------------------
# ~20 short lines of demo copy; load_demo() repeats this 80x so the pipeline
# always has enough units to compress and train on.
DEMO_CORPUS = """
In the beginning, people stored knowledge in libraries, then in databases, and now in neural networks.
Compression isn’t just saving space — it’s choosing what matters.
A constellation is a pattern you can navigate.
Entropy is a measure of surprise, and learning is surprise turning into structure.
A system that learns from compressed data never needs the original.
It doesn’t memorize pixels; it memorizes geometry.
It doesn’t hoard text; it extracts signals.
The question isn’t “Can it compress?” but “Can it learn after compressing?”
Investors love seeing systems move.
They love curves that fall.
They love maps that cluster.
They love a demo that feels alive.
This demo builds a codec from your dataset,
then trains a model exclusively on the encoded artifact inside a single trading card image.
No raw text is used during training.
Only the trading card exists.
We call the clusters constellations.
We call the structure harvestable.
We call the drop in entropy visible proof.
"""
# -----------------------------
# CHR core
# -----------------------------
def softmax(x, axis=-1):
    """Numerically stable softmax along *axis* (epsilon guards the denominator)."""
    shifted = x - np.max(x, axis=axis, keepdims=True)
    exp = np.exp(shifted)
    total = np.sum(exp, axis=axis, keepdims=True) + 1e-9
    return exp / total
def global_range_entropy(p: np.ndarray) -> float:
    """Shannon entropy (nats) of the column-mean of the soft assignment matrix p."""
    mean_p = np.clip(p.mean(axis=0), 1e-12, None)
    return float(-np.sum(mean_p * np.log(mean_p)))
def soft_slab_entropy(z: np.ndarray, U: np.ndarray, bins: int = 8, tau: float = 5.0) -> float:
    """Mean soft-histogram entropy of projections of z onto each row of U.

    For each direction U[j], scalar projections are softly assigned to `bins`
    evenly spaced centers (softmax of -tau * squared distance); the entropy of
    the mean assignment is averaged over directions. Directions with no usable
    spread (constant or non-finite projections) contribute 0.
    """
    t = z @ U.T                      # (N, K) projections
    K = U.shape[0]
    Hs = []
    for j in range(K):
        tj = t[:, j]
        tmin, tmax = float(tj.min()), float(tj.max())
        # Degenerate direction: skip rather than divide by ~zero range.
        if not np.isfinite(tmin) or not np.isfinite(tmax) or tmax - tmin < 1e-6:
            Hs.append(0.0)
            continue
        centers = np.linspace(tmin, tmax, bins)
        dist2 = (tj[:, None] - centers[None, :]) ** 2
        weights = softmax(-tau * dist2, axis=1)   # soft one-hot over bins per point
        hist = weights.mean(axis=0)
        hist = np.clip(hist, 1e-12, None)         # avoid log(0)
        H = float(-(hist * np.log(hist)).sum())
        Hs.append(H)
    return float(np.mean(Hs)) if Hs else 0.0
def kmeans_plus_plus_init(z: np.ndarray, K: int, rng: np.random.RandomState) -> np.ndarray:
    """k-means++-style seeding using cosine distance (1 - dot product).

    Assumes rows of z are approximately unit-normalized (so the dot product is
    a cosine) — TODO confirm against callers. Returns K row-normalized centers.
    NOTE: the rng draw order (randint, then one choice per extra center) is
    part of the reproducibility contract for a given seed.
    """
    N, d = z.shape
    inds = [rng.randint(0, N)]               # first center: uniform random row
    centers = [z[inds[0]]]
    cos0 = np.clip(z @ centers[0], -1.0, 1.0)
    d2 = np.clip(1.0 - cos0, 1e-12, None)    # distance-to-nearest-center so far
    for _ in range(1, K):
        s = d2.sum()
        if not np.isfinite(s) or s <= 0:
            # Degenerate distances: fall back to uniform sampling.
            probs = np.full(N, 1.0 / N)
        else:
            probs = np.clip(d2 / s, 0.0, None)
            probs = probs / (probs.sum() + 1e-12)
        next_idx = rng.choice(N, p=probs)    # D^2-weighted pick (k-means++)
        inds.append(next_idx)
        centers.append(z[next_idx])
        cos_new = np.clip(z @ z[next_idx], -1.0, 1.0)
        d2 = np.minimum(d2, np.clip(1.0 - cos_new, 1e-12, None))
    U = np.stack(centers, axis=0)
    U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9)
    return U
def chr_optimize(z: np.ndarray, K: int = 8, iters: int = 30, beta: float = 12.0,
                 bins: int = 8, tau: float = 5.0, seed: int = 42):
    """Soft spherical k-means ("CHR") with per-iteration entropy tracking.

    Alternates soft assignment (softmax of beta-scaled dot-product logits)
    with assignment-weighted, re-normalized mean updates of the K direction
    vectors U. Returns (U, p, Hg_traj, Hs_traj): final directions, final soft
    assignments, and entropy trajectories of length iters + 1.
    """
    rng = np.random.RandomState(seed)
    N, d = z.shape
    # Fewer points than centers: wrap-pad the data rows instead of k-means++.
    U = kmeans_plus_plus_init(z, K, rng) if N >= K else np.pad(z, ((0, max(0, K - N)), (0, 0)), mode="wrap")[:K]
    U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9)
    logits0 = beta * (z @ U.T)
    p0 = softmax(logits0, axis=1)
    Hg_traj = [global_range_entropy(p0)]
    Hs_traj = [soft_slab_entropy(z, U, bins=bins, tau=tau)]
    for _ in range(iters):
        logits = beta * (z @ U.T)
        p = softmax(logits, axis=1)
        # Weighted mean of members per center, re-projected onto the unit sphere.
        numer = p.T @ z
        denom = p.sum(axis=0)[:, None] + 1e-9
        U = numer / denom
        U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9)
        Hg_traj.append(global_range_entropy(p))
        Hs_traj.append(soft_slab_entropy(z, U, bins=bins, tau=tau))
    # Recompute assignments against the final U so the returned p matches it.
    logits = beta * (z @ U.T)
    p = softmax(logits, axis=1)
    return U, p, np.array(Hg_traj), np.array(Hs_traj)
def compute_mhep(Hg_traj: np.ndarray, Hs_traj: np.ndarray, K: int, bins: int, w_g: float = 0.7, w_s: float = 0.3) -> float:
    """Score 0..100: weighted entropy drop, normalized by the max possible entropies."""
    if len(Hg_traj) < 2 or len(Hs_traj) < 2:
        return 0.0
    norm_g = math.log(max(K, 2)) + 1e-9
    norm_s = math.log(max(bins, 2)) + 1e-9
    rel_g = max(0.0, float(Hg_traj[0] - Hg_traj[-1])) / norm_g
    rel_s = max(0.0, float(Hs_traj[0] - Hs_traj[-1])) / norm_s
    score = 100.0 * (w_g * rel_g + w_s * rel_s)
    return float(np.clip(score, 0.0, 100.0))
# -----------------------------
# CHR → discrete "compressed" byte stream (codes.bin payload)
# -----------------------------
def make_radial_bins(radials: np.ndarray, B: int = 64) -> np.ndarray:
    """Quantile bin edges (B + 1 values) over radials, nudged strictly increasing."""
    edges = np.quantile(radials, np.linspace(0, 1, B + 1))
    for idx in range(1, edges.shape[0]):
        # Duplicate quantiles (constant data) would make empty bins; separate them.
        if edges[idx] <= edges[idx - 1]:
            edges[idx] = edges[idx - 1] + 1e-6
    return edges.astype(np.float32)
def quantize_radial(r: float, edges: np.ndarray) -> int:
    """Map r to the index of its quantile bin, clamped to [0, len(edges) - 2]."""
    bin_idx = int(np.searchsorted(edges, r, side="right")) - 1
    return int(np.clip(bin_idx, 0, len(edges) - 2))
def pack_codes_to_bytes(labels: np.ndarray, bins: np.ndarray) -> bytes:
    """Interleave (label, bin) pairs into a byte stream: c0 b0 c1 b1 ...

    Each value is reduced to its low 8 bits (mod 256), matching the original
    per-element `& 0xFF` masking, but packed in a single vectorized NumPy pass
    instead of a Python-level loop (O(n) at C speed).
    """
    pairs = np.stack(
        [np.asarray(labels, dtype=np.int64), np.asarray(bins, dtype=np.int64)],
        axis=1,
    )
    return (pairs & 0xFF).astype(np.uint8).tobytes()
def save_codes_and_codec(code_bytes: bytes, codec: Dict, out_dir: str) -> Tuple[str, str]:
    """Write codes.bin (magic 'CHRC' + u32 version 1 + codes) and codec.json.

    Returns (bin_path, meta_path). These files are audit artifacts only; the
    training path reads the payload back out of the card PNG instead.
    """
    os.makedirs(out_dir, exist_ok=True)
    bin_path = os.path.join(out_dir, "codes.bin")
    meta_path = os.path.join(out_dir, "codec.json")
    with open(bin_path, "wb") as bin_file:
        bin_file.write(b"CHRC" + struct.pack("<I", 1) + code_bytes)
    with open(meta_path, "w", encoding="utf-8") as meta_file:
        json.dump(codec, meta_file, indent=2)
    return bin_path, meta_path
# -----------------------------
# Trading Card payload (self-contained)
# -----------------------------
CARD_MAGIC = b"TCAR"  # magic prefix of the trading-card payload container
CARD_VER = 1          # bump when the payload layout changes
def _sha256_hex(b: bytes) -> str:
    """Hex SHA-256 digest of b (payload integrity fingerprint in the header)."""
    return hashlib.sha256(b).hexdigest()
def _crc32_u32(b: bytes) -> int:
    """CRC-32 of b, masked to an unsigned 32-bit value for struct packing."""
    return zlib.crc32(b) & 0xFFFFFFFF
def pack_trading_card_payload(code_bytes: bytes, codec: Dict, title: str = "CHR Trading Card") -> bytes:
    """
    Build the self-describing TCAR payload embedded into the card image.

    Payload layout:
    CARD_MAGIC(4) | ver(u32) | header_len(u32) | header_json | code_len(u32) | code_bytes

    The JSON header carries a subset of the codec settings plus length,
    CRC-32 and SHA-256 of code_bytes so the extractor can verify integrity.
    """
    header = {
        "title": title,
        "codec": {
            "backend": codec.get("backend"),
            "K": codec.get("K"),
            "radial_bins": codec.get("radial_bins"),
            "iters": codec.get("iters"),
            "beta": codec.get("beta"),
            "slab_bins": codec.get("slab_bins"),
            "tau": codec.get("tau"),
            "seed": codec.get("seed"),
        },
        "units_count": codec.get("units_count"),
        "bytes_per_unit": codec.get("bytes_per_unit"),
        "code_len": int(len(code_bytes)),
        "crc32": int(_crc32_u32(code_bytes)),
        "sha256": _sha256_hex(code_bytes),
    }
    header_json = json.dumps(header, ensure_ascii=False).encode("utf-8")
    blob = bytearray()
    blob += CARD_MAGIC
    blob += struct.pack("<I", CARD_VER)
    blob += struct.pack("<I", len(header_json))
    blob += header_json
    blob += struct.pack("<I", len(code_bytes))
    blob += code_bytes
    return bytes(blob)
def unpack_trading_card_payload(payload: bytes) -> Tuple[Dict, bytes]:
    """Parse a TCAR payload; return (header_dict, code_bytes).

    Layout: CARD_MAGIC(4) | ver(u32) | header_len(u32) | header_json | code_len(u32) | code_bytes

    Raises:
        ValueError: on short/truncated input, bad magic, or unsupported version.
    """
    if len(payload) < 16:
        raise ValueError("Payload too small.")
    if payload[:4] != CARD_MAGIC:
        raise ValueError("Payload magic not found.")
    ver = struct.unpack("<I", payload[4:8])[0]
    if ver != CARD_VER:
        raise ValueError(f"Unsupported payload version: {ver}")
    hlen = struct.unpack("<I", payload[8:12])[0]
    off = 12
    header_json = payload[off:off + hlen]
    # Bounds-check the header region: the original skipped this, so a buffer
    # truncated inside the header surfaced as a confusing JSON/struct error.
    if len(header_json) != hlen or len(payload) < off + hlen + 4:
        raise ValueError("Payload truncated.")
    off += hlen
    header = json.loads(header_json.decode("utf-8"))
    clen = struct.unpack("<I", payload[off:off + 4])[0]
    off += 4
    code_bytes = payload[off:off + clen]
    if len(code_bytes) != clen:
        raise ValueError("Payload truncated.")
    return header, code_bytes
# -----------------------------
# Robust LSB Stego (self-describing + auto-detect)
# -----------------------------
STEGO_MAGIC = b"STEG"  # magic prefix of the 14-byte LSB stego header
STEGO_VER = 1          # stego container version (see _pack_stego_header)
def _bytes_to_bits_le(data: bytes) -> np.ndarray:
    """Flatten bytes into a uint8 bit array, least-significant bit first per byte."""
    as_u8 = np.frombuffer(data, dtype=np.uint8)
    expanded = (as_u8[:, None] >> np.arange(8)) & 1
    return expanded.astype(np.uint8).reshape(-1)
def _bits_to_bytes_le(bits: np.ndarray) -> bytes:
    """Inverse of _bytes_to_bits_le: pack LSB-first bits (len % 8 == 0) into bytes."""
    grouped = bits.astype(np.uint8).reshape(-1, 8)
    weights = (1 << np.arange(8, dtype=np.uint8))[None, :]
    vals = np.sum(grouped * weights, axis=1).astype(np.uint8)
    # .tobytes() already returns bytes; the old bytes(...) wrapper was redundant.
    return vals.tobytes()
def load_image_rgb(path: str) -> np.ndarray:
    """Load an image file as an (H, W, 3) uint8 RGB array (alpha/palette dropped)."""
    img = Image.open(path).convert("RGB")
    return np.array(img, dtype=np.uint8)
def save_png(rgb: np.ndarray, path: str):
    """Save an (H, W, 3) array as PNG — lossless, so embedded LSB payloads survive."""
    Image.fromarray(rgb.astype(np.uint8), mode="RGB").save(path, format="PNG", compress_level=6)
def _capacity_bytes(rgb: np.ndarray, bits_per_channel: int) -> int:
    """Number of whole bytes embeddable in rgb at the given bits-per-channel rate."""
    height, width, channels = rgb.shape
    total_bits = height * width * channels * int(bits_per_channel)
    return total_bits // 8
def _pack_stego_header(payload: bytes, bits_per_channel: int) -> bytes:
    """
    Build the fixed-size stego header written ahead of the payload.

    Header format:
    magic(4) | ver(u8) | bpc(u8) | payload_len(u32) | crc32(u32)
    Total = 14 bytes

    Raises:
        ValueError: unless 1 <= bits_per_channel <= 4.
    """
    bpc = int(bits_per_channel)
    if not (1 <= bpc <= 4):
        raise ValueError("bits_per_channel must be 1..4")
    plen = int(len(payload))
    crc = int(_crc32_u32(payload))  # lets the extractor verify payload integrity
    return STEGO_MAGIC + struct.pack("<BBII", STEGO_VER, bpc, plen, crc)
def _unpack_stego_header(hdr: bytes):
    """Parse a 14-byte stego header; return (version, bpc, payload_len, crc32).

    Raises:
        ValueError: when hdr is shorter than 14 bytes or the magic is missing.
    """
    if len(hdr) < 14:
        raise ValueError("Stego header too small.")
    if hdr[:4] != STEGO_MAGIC:
        raise ValueError("Stego magic not found.")
    ver, bpc, plen, crc = struct.unpack("<BBII", hdr[4:14])
    return int(ver), int(bpc), int(plen), int(crc)
def embed_payload_lsb_rgb(carrier_rgb: np.ndarray, payload: bytes, bits_per_channel: int) -> np.ndarray:
    """Embed header+payload into the low bits of a copy of carrier_rgb.

    Bits are written LSB-first, `bits_per_channel` bits per color channel, in
    flat row-major channel order (the layout _extract_lsb_stream reads back).

    Raises:
        ValueError: when the carrier is too small for header + payload.
    """
    rgb = carrier_rgb.copy().astype(np.uint8)
    bpc = int(bits_per_channel)
    header = _pack_stego_header(payload, bpc)
    blob = header + payload
    cap = _capacity_bytes(rgb, bpc)
    if len(blob) > cap:
        raise ValueError(f"Carrier too small: need {len(blob)} bytes, capacity {cap} bytes at {bpc} bits/channel.")
    bits = _bytes_to_bits_le(blob)
    # Zero-pad so the bit stream divides evenly into bpc-sized groups.
    pad = (-len(bits)) % bpc
    if pad:
        bits = np.concatenate([bits, np.zeros(pad, dtype=np.uint8)], axis=0)
    bits_g = bits.reshape(-1, bpc)
    # Collapse each group of bpc bits into one small integer (LSB-first weights).
    vals = np.sum(bits_g * (1 << np.arange(bpc, dtype=np.uint8))[None, :], axis=1).astype(np.uint8)
    flat = rgb.reshape(-1)                     # view into rgb: writes mutate it
    mask = np.uint8(0xFF ^ ((1 << bpc) - 1))   # clears the low bpc bits
    flat[:len(vals)] = (flat[:len(vals)] & mask) | vals
    return rgb
def _extract_lsb_stream(rgb: np.ndarray, bits_per_channel: int, n_bytes: int) -> bytes:
    """Read the first n_bytes embedded at bits_per_channel from rgb's low bits.

    Mirrors embed_payload_lsb_rgb's layout: bpc bits per channel value,
    LSB-first, flat row-major order. Surplus bits beyond n_bytes*8 are trimmed
    so the bit count is always a multiple of 8 for _bits_to_bytes_le.
    """
    bpc = int(bits_per_channel)
    flat = rgb.reshape(-1).astype(np.uint8)
    n_bits = n_bytes * 8
    n_vals = int(np.ceil(n_bits / bpc))        # channel values needed to cover n_bits
    vals = flat[:n_vals] & np.uint8((1 << bpc) - 1)
    bits = ((vals[:, None] >> np.arange(bpc)) & 1).astype(np.uint8).reshape(-1)
    bits = bits[:n_bits]
    return _bits_to_bytes_le(bits)
def extract_payload_auto(rgb: np.ndarray, max_bits: int = 4) -> Tuple[bytes, Dict]:
    """Auto-detect the embedding rate (1..max_bits bpc) and extract the payload.

    For each candidate rate: read the 14-byte header, require a matching
    version and self-declared rate, then re-read header+payload and verify
    both length and CRC-32. Returns (payload, info_dict) on the first rate
    that fully verifies.

    Raises:
        ValueError: when no rate yields a verified payload.
    """
    for bpc in range(1, int(max_bits) + 1):
        try:
            hdr = _extract_lsb_stream(rgb, bpc, n_bytes=14)
            ver, hdr_bpc, plen, crc = _unpack_stego_header(hdr)
            # The header records the rate it was written at; a mismatch means
            # we decoded noise at the wrong rate.
            if ver != STEGO_VER or hdr_bpc != bpc:
                continue
            blob = _extract_lsb_stream(rgb, bpc, n_bytes=14 + plen)
            payload = blob[14:]
            if len(payload) != plen:
                continue
            if _crc32_u32(payload) != crc:
                continue
            return payload, {
                "stego_version": ver,
                "bits_per_channel": bpc,
                "payload_len": plen,
                "payload_crc32": f"{crc:08x}",
                "verified": True
            }
        except Exception:
            # Bad magic / short read at this rate: try the next rate.
            continue
    raise ValueError("Stego magic not found (wrong image or bits_per_channel, or image was altered).")
def choose_embed_params(carrier_rgb: np.ndarray, payload_len: int) -> Tuple[int, np.ndarray]:
    """Pick the smallest workable bits-per-channel, upscaling the carrier if needed.

    Tries bpc=1..4 against the current image; when the payload (plus the
    14-byte stego header) does not fit, nearest-neighbor upscales by ~1.5x
    (capped at 1024px per side) and retries, up to 3 upscales.
    Returns (bpc, possibly_resized_rgb).

    Fixes an off-by-one in the original loop, which performed the third
    upscale but raised without ever capacity-checking the upscaled image.

    Raises:
        ValueError: when the payload still does not fit after 3 upscales.
    """
    rgb = carrier_rgb
    needed = payload_len + 14  # stego header is 14 bytes
    for upscales_done in range(4):  # original size + up to 3 upscales
        for bpc in (1, 2, 3, 4):
            if needed <= _capacity_bytes(rgb, bpc):
                return bpc, rgb
        if upscales_done == 3:
            break
        H, W, _ = rgb.shape
        newW = min(1024, int(W * 1.5) + 1)
        newH = min(1024, int(H * 1.5) + 1)
        # NEAREST keeps hard pixel edges; any resample works since we re-embed after.
        pil = Image.fromarray(rgb, mode="RGB").resize((newW, newH), resample=Image.NEAREST)
        rgb = np.array(pil, dtype=np.uint8)
    raise ValueError("Carrier image too small even after upscaling. Use a larger carrier image (>=1024px).")
# -----------------------------
# “Holo overlay” visuals (sizzle)
# -----------------------------
def make_holo_texture(seed: int, W: int, H: int) -> np.ndarray:
    """Generate a deterministic (H, W, 3) uint8 "holo" texture for the card.

    Sums 12 random plane-wave cosines per channel (frequencies/phases/weights
    drawn once, directions re-drawn per term), min-max normalizes each channel,
    then applies a radial vignette. NOTE: the rng draw order inside the loops
    is part of the deterministic output for a given seed.
    """
    rng = np.random.RandomState(int(seed) + 123)
    yy, xx = np.mgrid[0:H, 0:W].astype(np.float32)
    # Normalize coordinates to [-1, 1] in both axes.
    xx = (xx / max(1, W-1) - 0.5) * 2.0
    yy = (yy / max(1, H-1) - 0.5) * 2.0
    out = np.zeros((H, W, 3), dtype=np.float32)
    freqs = rng.uniform(2.0, 10.0, size=12).astype(np.float32)
    phases = rng.uniform(0, 2*np.pi, size=12).astype(np.float32)
    weights = rng.uniform(0.6, 1.2, size=12).astype(np.float32)
    for c in range(3):
        acc = np.zeros((H, W), dtype=np.float32)
        for i in range(12):
            # Fresh random direction (a, b) per term and per channel.
            a = rng.uniform(-1.0, 1.0)
            b = rng.uniform(-1.0, 1.0)
            acc += weights[i] * np.cos(freqs[i] * (a*xx + b*yy) + phases[i] + c*0.7)
        # Min-max normalize this channel to [0, 1].
        acc = (acc - acc.min()) / (acc.max() - acc.min() + 1e-9)
        out[..., c] = acc
    rr = np.sqrt(xx*xx + yy*yy)
    vignette = np.clip(1.1 - 0.35*rr, 0.6, 1.1)  # gentle darkening toward corners
    out *= vignette[..., None]
    out = np.clip(out, 0.0, 1.0)
    return (out * 255.0).astype(np.uint8)
def blend_holo(carrier_rgb: np.ndarray, holo_rgb: np.ndarray, alpha: float = 0.35) -> np.ndarray:
    """Alpha-blend the holo texture over the carrier; alpha is clamped to [0, 1]."""
    a = float(np.clip(alpha, 0.0, 1.0))
    mixed = (1.0 - a) * carrier_rgb.astype(np.float32) + a * holo_rgb.astype(np.float32)
    return np.clip(mixed, 0, 255).astype(np.uint8)
def lsb_preview(rgb: np.ndarray, bits: int = 2) -> np.ndarray:
    """Visualize the low `bits` bitplanes of rgb, stretched to the full 0..255 range."""
    nbits = int(np.clip(bits, 1, 4))
    keep_mask = (1 << nbits) - 1
    low = (rgb & keep_mask).astype(np.uint8)
    stretched = (low.astype(np.float32) / float(keep_mask)) * 255.0
    return np.clip(stretched, 0, 255).astype(np.uint8)
# -----------------------------
# Trading card stats + rarity/type + full-bleed renderer
# -----------------------------
def compute_rarity(mhep: float) -> str:
    """Bucket the 0..100 MHEP score into a rarity tier name."""
    score = float(mhep)
    for threshold, tier in ((40, "COMMON"), (65, "RARE"), (85, "EPIC")):
        if score < threshold:
            return tier
    return "LEGENDARY"
def short_backend(backend: str) -> str:
    """Abbreviate an embedding-backend identifier for display on the card."""
    lowered = (backend or "").lower()
    if "minilm" in lowered:
        return "MiniLM"
    if any(tag in lowered for tag in ("hashingvectorizer", "hashing")):
        return "HASH"
    return (backend or "EMB")[:8]
def _clamp01(x: float) -> float:
    """Clamp x into the [0, 1] interval."""
    clipped = np.clip(x, 0.0, 1.0)
    return float(clipped)
def _score_0_99(x01: float) -> int:
    """Scale a 0..1 value to an integer card stat in 0..99."""
    scaled = round(99.0 * _clamp01(x01))
    return int(np.clip(scaled, 0, 99))
def compute_card_stats(header_tc: Dict,
                       mhep: float,
                       Hg_traj: Optional[np.ndarray] = None,
                       Hs_traj: Optional[np.ndarray] = None,
                       payload_len: int = 0,
                       code_bytes_len: int = 0,
                       bits_per_channel: Optional[int] = None,
                       img_shape: Optional[Tuple[int,int,int]] = None) -> Dict[str, int]:
    """Derive the seven 0..99 display stats from compression/embedding metadata.

    All sub-scores are heuristic saturating curves chosen for visual spread on
    the card; none feed back into compression or training.
    """
    units = int(header_tc.get("units_count", 0) or 0)
    K = int(header_tc.get("codec", {}).get("K", 0) or 0)
    radial_bins = int(header_tc.get("codec", {}).get("radial_bins", 0) or 0)
    # Relative entropy drops: fraction of the starting entropy removed.
    ent_drop = 0.0
    slab_drop = 0.0
    if Hg_traj is not None and len(Hg_traj) >= 2:
        ent_drop = max(0.0, float(Hg_traj[0] - Hg_traj[-1])) / (abs(float(Hg_traj[0])) + 1e-9)
    if Hs_traj is not None and len(Hs_traj) >= 2:
        slab_drop = max(0.0, float(Hs_traj[0] - Hs_traj[-1])) / (abs(float(Hs_traj[0])) + 1e-9)
    # DENSITY: saturating in code bytes per unit (2 bytes/unit -> ~0.63).
    bpu = (float(code_bytes_len) / max(1, units)) if units > 0 else 0.0
    density01 = 1.0 - math.exp(-bpu / 2.0)
    # CONST: codec richness from number of constellations x radial resolution.
    cc01 = 1.0 - math.exp(- (K / 12.0) * (radial_bins / 64.0))
    if bits_per_channel is None:
        embed_ease01 = 0.5            # unknown rate -> neutral score
    else:
        embed_ease01 = _clamp01((bits_per_channel - 1) / 3.0)
    stability01 = 0.5
    if img_shape is not None:
        H, W, _ = img_shape
        px = float(H * W)
        # Larger carriers score higher; saturates around 512x512 pixels.
        stability01 = _clamp01(1.0 - math.exp(-px / (512.0 * 512.0)))
    harvest = int(np.clip(round(float(mhep)), 0, 99))
    signal = _score_0_99(0.65 * ent_drop + 0.35 * slab_drop)
    density = _score_0_99(density01)
    constellation = _score_0_99(cc01)
    # INTEG: larger payloads score slightly lower (linear penalty per 2MB).
    if payload_len <= 0:
        integrity01 = 0.85
    else:
        integrity01 = _clamp01(0.95 - (payload_len / 2_000_000.0))
    integrity = _score_0_99(integrity01)
    embed = _score_0_99(0.55 * embed_ease01 + 0.45 * stability01)
    # SCOPE: log10 of unit count, normalized so 10^4 units -> max score.
    scope01 = _clamp01(math.log10(max(10.0, float(units))) / 4.0)
    scope = _score_0_99(scope01)
    return {
        "HARVEST": harvest,
        "SIGNAL": signal,
        "DENSITY": density,
        "CONST": constellation,
        "INTEG": integrity,
        "EMBED": embed,
        "SCOPE": scope,
    }
def _rounded_mask(W: int, H: int, radius: int) -> np.ndarray:
    """Boolean (H, W) mask: True inside a rounded rectangle covering the image.

    Each corner's r x r square is tested against a circle centered one radius
    in from that corner; everything else is inside. radius is floored at 2.
    """
    # boolean mask of pixels inside rounded rect
    r = int(max(2, radius))
    yy, xx = np.mgrid[0:H, 0:W]
    inside = np.ones((H, W), dtype=np.uint8)
    # corners: (x,y) distances to corner centers
    # top-left
    cx, cy = r, r
    tl = (xx < r) & (yy < r)
    inside[tl] = (((xx[tl]-cx)**2 + (yy[tl]-cy)**2) <= (r*r)).astype(np.uint8)
    # top-right
    cx, cy = W - r - 1, r
    tr = (xx > W - r - 1) & (yy < r)
    inside[tr] = (((xx[tr]-cx)**2 + (yy[tr]-cy)**2) <= (r*r)).astype(np.uint8)
    # bottom-left
    cx, cy = r, H - r - 1
    bl = (xx < r) & (yy > H - r - 1)
    inside[bl] = (((xx[bl]-cx)**2 + (yy[bl]-cy)**2) <= (r*r)).astype(np.uint8)
    # bottom-right
    cx, cy = W - r - 1, H - r - 1
    br = (xx > W - r - 1) & (yy > H - r - 1)
    inside[br] = (((xx[br]-cx)**2 + (yy[br]-cy)**2) <= (r*r)).astype(np.uint8)
    return inside.astype(bool)
def apply_rounded_corners(rgb: np.ndarray, radius: int = 28, bg=(8, 8, 12)) -> np.ndarray:
    """
    Blend the four corners into a dark background to fake rounded corners.

    The final card is saved as RGB PNG (no alpha channel), so transparency is
    simulated by compositing against a solid background color.
    """
    H, W, _ = rgb.shape
    inside = _rounded_mask(W, H, radius).astype(np.float32)[:, :, None]
    background = np.array(bg, dtype=np.float32)[None, None, :]
    composite = inside * rgb.astype(np.float32) + (1.0 - inside) * background
    return np.clip(composite, 0, 255).astype(np.uint8)
def add_subtle_foil_frame(rgb: np.ndarray, radius: int = 28, thickness: int = 10, seed: int = 0) -> np.ndarray:
    """
    Adds a subtle "foil" frame by brightening a rounded-rect ring with a shimmering pattern.

    The ring is the area between two rounded-rect masks (outer radius r, inner
    r - thickness); brightness inside it is boosted by a seed-deterministic
    two-sine interference field. A thinner, constant highlight line is added
    just inside the ring.
    """
    H, W, _ = rgb.shape
    r = int(max(6, radius))
    t = int(max(4, thickness))
    rng = np.random.RandomState(int(seed) + 999)
    outer = _rounded_mask(W, H, r)
    inner = _rounded_mask(W, H, max(2, r - t))
    ring = outer & (~inner)                 # the frame band to be brightened
    yy, xx = np.mgrid[0:H, 0:W].astype(np.float32)
    xxn = xx / max(1.0, W-1.0)
    yyn = yy / max(1.0, H-1.0)
    # shimmering scalar field (no explicit colors; just modulate brightness)
    phase = rng.uniform(0.0, 2*np.pi)       # only rng draw; makes the shimmer seed-dependent
    shimmer = (0.5 + 0.5*np.sin(2*np.pi*(2.2*xxn + 1.6*yyn) + phase)).astype(np.float32)
    shimmer2 = (0.5 + 0.5*np.sin(2*np.pi*(3.3*xxn - 2.1*yyn) + phase*0.7)).astype(np.float32)
    s = 0.6*shimmer + 0.4*shimmer2 # 0..1
    out = rgb.astype(np.float32)
    # Brightness boost map inside ring
    boost = (22.0 + 28.0 * s) # ~22..50
    boost = boost[:, :, None]
    ring_f = ring[:, :, None].astype(np.float32)
    out = out + ring_f * boost
    # Add a very thin inner highlight line
    inner2 = _rounded_mask(W, H, max(2, r - t - 2))
    ring2 = inner & (~inner2)
    out = out + ring2[:, :, None].astype(np.float32) * 18.0
    return np.clip(out, 0, 255).astype(np.uint8)
def draw_full_bleed_stats(card_rgb: np.ndarray,
                          title: str,
                          rarity: str,
                          card_type: str,
                          stats: Dict[str, int],
                          subtitle_left: str,
                          subtitle_right: str,
                          holo_alpha: float = 0.35,
                          radius: int = 28,
                          seed: int = 0) -> np.ndarray:
    """
    Full-bleed trading-card overlay:
    - Rounded corners + subtle foil frame
    - Top title band with rarity badge + type
    - Bottom stat slab with bars

    Returns a new (H, W, 3) uint8 array. On any drawing error the input image
    is returned unchanged (deliberate best-effort: the card must still render).
    NOTE: holo_alpha is accepted for API symmetry but not used here — blending
    happens in blend_holo upstream.
    """
    try:
        # Base: apply rounded corners first (so UI feels like it's inside a card)
        base = apply_rounded_corners(card_rgb, radius=radius, bg=(8, 8, 12))
        # Subtle foil frame
        base = add_subtle_foil_frame(base, radius=radius, thickness=10, seed=seed)
        pil = Image.fromarray(base, mode="RGB")
        from PIL import ImageDraw, ImageFont
        draw = ImageDraw.Draw(pil)
        font = ImageFont.load_default()
        W, H = pil.size
        # ---- Top band ----
        top_h = int(max(44, H * 0.12))
        draw.rounded_rectangle([10, 10, W-10, 10 + top_h],
                               radius=18,
                               fill=(0, 0, 0),
                               outline=(220, 220, 255),
                               width=2)
        # Title
        draw.text((22, 18), (title or "DATA PLAYING CARD")[:60], fill=(255, 255, 255), font=font)
        # Subtitles (left/right)
        draw.text((22, 18 + 16), (subtitle_left or "")[:60], fill=(230, 230, 230), font=font)
        # Right aligned subtitle (approximate: 6px/char for the default bitmap font)
        sr = (subtitle_right or "")[:60]
        tw = len(sr) * 6 # default font approx
        draw.text((max(22, W - 22 - tw), 18 + 16), sr, fill=(230, 230, 230), font=font)
        # Rarity badge
        badge_w = 92
        badge_h = 18
        bx1 = W - 20
        by0 = 14
        bx0 = bx1 - badge_w
        by1 = by0 + badge_h
        draw.rounded_rectangle([bx0, by0, bx1, by1],
                               radius=8,
                               fill=(0, 0, 0),
                               outline=(255, 255, 255),
                               width=1)
        draw.text((bx0 + 8, by0 + 4), rarity[:12], fill=(255, 255, 255), font=font)
        # Type tag (under badge)
        ty = by1 + 4
        tag = (card_type or "CHR")[:14]
        tw2 = len(tag) * 6
        draw.text((max(22, W - 22 - tw2), ty), tag, fill=(255, 255, 255), font=font)
        # ---- Bottom stat slab ----
        slab_h = int(max(130, H * 0.26))
        y0 = H - slab_h - 12
        draw.rounded_rectangle([10, y0, W-10, H-10],
                               radius=18,
                               fill=(0, 0, 0),
                               outline=(220, 220, 255),
                               width=2)
        # Stat rows
        items = list(stats.items())
        # Keep consistent ordering
        preferred = ["HARVEST", "SIGNAL", "DENSITY", "CONST", "INTEG", "EMBED", "SCOPE"]
        items_sorted = []
        d = dict(items)
        for k in preferred:
            if k in d:
                items_sorted.append((k, d[k]))
        # Any non-standard stat keys render after the preferred set.
        for k, v in items:
            if k not in preferred:
                items_sorted.append((k, v))
        pad = 16
        cx = 22
        cy = y0 + 14
        # Bar geometry
        name_w = 66
        bar_w = max(120, W - 10 - cx - name_w - 58)
        bar_h = 10
        row_gap = 18
        for name, val in items_sorted:
            val = int(np.clip(val, 0, 99))
            draw.text((cx, cy), f"{name:>7}", fill=(255, 255, 255), font=font)
            bx0 = cx + name_w
            by0 = cy + 3
            bx1 = bx0 + bar_w
            by1 = by0 + bar_h
            draw.rectangle([bx0, by0, bx1, by1], outline=(255, 255, 255), width=1)
            # Fill width proportional to val/99 within the bar's inner area.
            fill_w = int(round((val / 99.0) * (bar_w - 2)))
            if fill_w > 0:
                draw.rectangle([bx0 + 1, by0 + 1, bx0 + 1 + fill_w, by1 - 1], fill=(255, 255, 255))
            draw.text((bx1 + 10, cy), f"{val:02d}", fill=(255, 255, 255), font=font)
            cy += row_gap
            # Stop before rows spill past the bottom edge.
            if cy > H - 18:
                break
        # Convert back
        out = np.array(pil, dtype=np.uint8)
        return out
    except Exception:
        # Best-effort: never let overlay drawing break the pipeline.
        return card_rgb
# -----------------------------
# Visual plots
# -----------------------------
def plot_entropy(Hg, Hs, out_path):
    """Save a line plot of the global/slab entropy trajectories to out_path (PNG)."""
    plt.figure(figsize=(6,4))
    plt.plot(Hg, label="Global range entropy")
    plt.plot(Hs, label="Slab entropy")
    plt.xlabel("Iteration"); plt.ylabel("Entropy")
    plt.title("Entropy drops during CHR compression")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()  # free the figure: matplotlib keeps figures alive otherwise
def plot_constellation_map(z, U, labels, out_path):
    """Scatter-plot embeddings (PCA-reduced to 2D if needed) colored by cluster.

    Cluster centers U are projected with the same PCA fit and drawn as stars.
    """
    if z.shape[1] > 2:
        pca = PCA(n_components=2, random_state=0)
        Z2 = pca.fit_transform(z)
        U2 = pca.transform(U)  # same projection so centers align with points
    else:
        Z2, U2 = z, U
    plt.figure(figsize=(6,5))
    plt.scatter(Z2[:,0], Z2[:,1], s=14, alpha=0.8, c=labels)
    plt.scatter(U2[:,0], U2[:,1], marker="*", s=200)
    plt.title("Constellation map (compressed geometry)")
    plt.xlabel("PC1"); plt.ylabel("PC2")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
def plot_training_curves(losses, ppls, out_path):
    """Save loss and perplexity checkpoint curves to out_path (PNG)."""
    plt.figure(figsize=(6,4))
    plt.plot(losses, label="Loss")
    plt.plot(ppls, label="Perplexity")
    plt.xlabel("Checkpoint")
    plt.title("Learning on trading card pixels")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
def plot_rollout_tracks(seq_bytes: List[int], out_path, title="Rollout (byte tokens)"):
    """Save a line plot of a sampled byte sequence (values 0..255) to out_path."""
    plt.figure(figsize=(8,3.6))
    plt.plot(seq_bytes, label="Byte value")
    plt.ylim(-2, 260)  # small margin around the full byte range
    plt.xlabel("Step"); plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
def plot_before_after_tracks(before_bytes: List[int], after_bytes: List[int], out_path):
    """Save a side-by-side comparison of untrained vs trained rollouts to out_path."""
    plt.figure(figsize=(10,4))
    plt.subplot(1,2,1)
    plt.plot(before_bytes, label="Byte value")
    plt.title("BEFORE (untrained)")
    plt.ylim(-2, 260)
    plt.legend()
    plt.subplot(1,2,2)
    plt.plot(after_bytes, label="Byte value")
    plt.title("AFTER (trained)")
    plt.ylim(-2, 260)
    plt.legend()
    plt.suptitle("Rollout comparison (trained on card pixels)")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
# -----------------------------
# Training: tiny byte model reads ONLY final PNG pixels (extracts stego payload)
# -----------------------------
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
# Keep torch single-threaded on CPU Spaces; set_num_interop_threads raises if
# called after parallel work has already started, hence the broad guard.
try:
    torch.set_num_threads(1)
    torch.set_num_interop_threads(1)
except Exception:
    pass
class StegoPayloadDataset(Dataset):
    """
    Builds next-byte prediction windows from the embedded payload extracted from PNG pixels.

    The card PNG is the ONLY data source: the payload is recovered from the
    image's low bits once at construction time and tokenized as raw bytes.
    """
    def __init__(self, card_png_path: str, block_size: int = 96):
        self.card_png_path = card_png_path
        self.block_size = int(block_size)
        rgb = load_image_rgb(card_png_path)
        payload, info = extract_payload_auto(rgb, max_bits=4)
        self.info = info        # stego metadata dict (rate, CRC, verified flag)
        self.payload = payload  # raw extracted bytes
        # Byte tokens (0..255) as a long tensor for window slicing.
        self.bytes = torch.tensor(list(np.frombuffer(payload, dtype=np.uint8)), dtype=torch.long)
    def __len__(self):
        # Number of (input, target) windows; 0 when the payload is too short.
        return max(0, len(self.bytes) - self.block_size - 1)
    def __getitem__(self, idx):
        # Next-byte prediction: target is the input window shifted by one.
        x = self.bytes[idx:idx+self.block_size]
        y = self.bytes[idx+1:idx+self.block_size+1]
        return x, y
class TinyByteTransformer(nn.Module):
    """
    Investor demo model: fast on CPU, faster on GPU.

    Byte-level language model: token + learned positional embeddings feed a
    small TransformerEncoder used causally via an upper-triangular attention
    mask, followed by a linear head over the 256-byte vocabulary.
    """
    def __init__(self, vocab_size=256, d_model=96, n_layers=1, n_heads=4, block_size=96):
        super().__init__()
        self.tok = nn.Embedding(vocab_size, d_model)
        self.pos = nn.Embedding(block_size, d_model)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads, dim_feedforward=4*d_model,
            dropout=0.1, batch_first=True
        )
        self.tr = nn.TransformerEncoder(enc_layer, num_layers=n_layers)
        self.lm = nn.Linear(d_model, vocab_size)
        self.block_size = int(block_size)
    def forward(self, x):
        """x: (B, T) int64 byte ids with T <= block_size. Returns (B, T, vocab) logits."""
        B, T = x.shape
        pos = torch.arange(T, device=x.device).unsqueeze(0).expand(B, T)
        h = self.tok(x) + self.pos(pos)
        # True above the diagonal -> each position attends only to itself/past.
        mask = torch.triu(torch.ones(T, T, device=x.device), diagonal=1).bool()
        h = self.tr(h, mask=mask)
        return self.lm(h)
@torch.no_grad()
def sample_bytes(model, start: List[int], steps: int, device: str = "cpu", temperature: float = 1.0) -> List[int]:
    """Autoregressively sample `steps` byte tokens from the model, seeded with `start`.

    Conditions on at most the last model.block_size tokens per step, sampling
    from the temperature-scaled softmax. Returns a new list (start + samples);
    the caller's list is not mutated.
    """
    model.eval()
    seq = start[:]  # copy so the caller's list stays untouched
    steps = int(steps)
    for _ in range(steps):
        x = torch.tensor(seq[-model.block_size:], dtype=torch.long, device=device).unsqueeze(0)
        # Guard against temperature == 0 dividing by zero.
        logits = model(x)[0, -1] / max(1e-6, float(temperature))
        probs = torch.softmax(logits, dim=-1)
        nxt = int(torch.multinomial(probs, num_samples=1).item())
        seq.append(nxt)
    return seq
def train_on_stego_png(card_png_path: str,
                       steps: int = 200,
                       batch_size: int = 32,
                       block_size: int = 96,
                       lr: float = 7e-4,
                       device: str = "cpu",
                       log_every: int = 25):
    """Train TinyByteTransformer on the payload extracted from the card PNG.

    Returns (model, stego_info, losses, ppls) where losses/ppls hold one entry
    per `log_every` steps.

    Raises:
        RuntimeError: when the embedded payload yields zero training windows.
    """
    ds = StegoPayloadDataset(card_png_path, block_size=block_size)
    n_windows = len(ds)
    if n_windows <= 0:
        raise RuntimeError(f"Embedded payload too small for block_size={block_size}. Reduce block_size or embed more data.")
    # drop_last only when a full batch exists; otherwise keep the partial batch.
    drop_last = n_windows >= batch_size
    dl = DataLoader(ds, batch_size=batch_size, shuffle=True, drop_last=drop_last)
    model = TinyByteTransformer(block_size=block_size).to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()
    losses, ppls = [], []
    steps = int(steps)
    log_every = max(1, int(log_every))
    it = iter(dl)
    model.train()
    for step in range(1, steps+1):
        # Cycle the dataloader so `steps` can exceed one epoch.
        try:
            x, y = next(it)
        except StopIteration:
            it = iter(dl)
            x, y = next(it)
        x, y = x.to(device), y.to(device)
        logits = model(x)
        # Flatten (B, T, 256) logits and (B, T) targets for cross-entropy.
        loss = loss_fn(logits.view(-1, 256), y.view(-1))
        opt.zero_grad(set_to_none=True)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step()
        if step % log_every == 0:
            l = float(loss.detach().cpu().item())
            ppl = float(torch.exp(loss.detach()).cpu().item())
            losses.append(l)
            ppls.append(ppl)
    return model, ds.info, losses, ppls
# -----------------------------
# App state
# -----------------------------
# Module-level mutable app state shared across Gradio callbacks (single-user demo).
STATE = {
    "units": None,           # list[str] of text units loaded via upload/demo
    # compression artifacts
    "Z": None,               # embedding matrix
    "U": None,               # CHR direction vectors
    "labels": None,          # per-unit cluster assignments
    "bins": None,            # per-unit quantized radial bins
    "codec": None,           # codec metadata dict
    "code_bytes": None,      # packed (label, bin) byte stream
    "payload": None,         # TCAR payload bytes
    "payload_header": None,  # parsed TCAR header dict
    # trajectories (for trading card stats)
    "Hg": None,
    "Hs": None,
    "mhep": None,
    "out_dir": None,
    "bin_path": None,
    "codec_path": None,
    "entropy_plot": None,
    "map_plot": None,
    # card build
    "carrier_path": None,
    "preview_path": None,
    "final_card_path": None,
    "tilt_gif": None,
    "lsb_preview_path": None,
    "stego_info": None,
    # model
    "model": None,
}
def _bytes_from_upload(file_obj) -> Tuple[bytes, str]:
    """Return (content, basename) for a Gradio upload.

    Accepts a path string or an object with a .name path attribute; yields
    (b"", "") when nothing was uploaded and (b"", "upload") when unreadable.
    """
    if file_obj is None:
        return b"", ""
    if isinstance(file_obj, str) and os.path.exists(file_obj):
        return Path(file_obj).read_bytes(), os.path.basename(file_obj)
    name_attr = getattr(file_obj, "name", None)
    if isinstance(name_attr, str) and os.path.exists(name_attr):
        return Path(name_attr).read_bytes(), os.path.basename(name_attr)
    return b"", "upload"
def _path_from_upload(file_obj) -> Optional[str]:
    """Resolve a Gradio upload to an existing filesystem path, or None."""
    if file_obj is None:
        return None
    if isinstance(file_obj, str) and os.path.exists(file_obj):
        return file_obj
    name_attr = getattr(file_obj, "name", None)
    if isinstance(name_attr, str) and os.path.exists(name_attr):
        return name_attr
    return None
# -----------------------------
# Callbacks
# -----------------------------
def load_demo(units_mode: str):
    """Populate STATE['units'] from the built-in corpus (repeated 80x).

    Returns a markdown status string for the UI.
    """
    raw = (DEMO_CORPUS.strip() + "\n\n") * 80
    units = to_units(raw, units_mode)
    units = [u.strip() for u in units if u.strip()]
    STATE["units"] = units
    return f"Loaded **{len(units)}** demo units (built-in corpus)."
def ingest_file(file_obj, units_mode: str):
    """Load an uploaded .txt/.docx into STATE['units'] (capped at 5000 units).

    Returns a markdown status string; errors are reported as text rather than
    raised so the Gradio UI stays responsive.
    """
    try:
        b, name = _bytes_from_upload(file_obj)
        if not b:
            return "Upload a .txt or .docx file to begin."
        if name.lower().endswith(".docx"):
            paras = read_docx_bytes(b)
            raw = "\n\n".join(paras)
        else:
            # Anything else is treated as plain text.
            raw = read_txt_bytes(b)
        units = to_units(raw, units_mode)
        units = [u.strip() for u in units if u.strip()]
        # Cap to keep embedding/compression time bounded.
        if len(units) > 5000:
            units = units[:5000]
        STATE["units"] = units
        return f"Loaded **{len(units)}** units from **{name}**."
    except Exception as e:
        return f"Error ingesting file: {e}"
def compress_build_payload(K, iters, beta, slab_bins, tau, seed, radial_bins, title_text):
    """
    1) CHR compress
    2) create codes.bin + codec.json (audit only)
    3) pack TCAR payload bytes (to embed into carrier image)
    4) write visuals (entropy plot + constellation plot)

    Returns a 6-tuple matching the Gradio outputs:
    (markdown report, entropy plot path, constellation map path,
     codes.bin path, codec.json path, TCAR header JSON string).
    On failure the report carries the traceback and the rest are None.
    """
    try:
        units = STATE.get("units")
        if not units:
            return "No units loaded. Upload or load demo corpus.", None, None, None, None, None
        # Embed text units -> matrix Z; `backend` names the embedder used
        # (sentence-transformers when available, hashing fallback otherwise).
        Z, backend = embed_texts(units, prefer_sentence_transformer=True)
        # CHR optimization: U = constellation directions, p = soft assignments,
        # Hg/Hs = entropy trajectories (reused later for the card stats).
        U, p, Hg, Hs = chr_optimize(
            Z, K=int(K), iters=int(iters), beta=float(beta),
            bins=int(slab_bins), tau=float(tau), seed=int(seed)
        )
        # Hard assignment per unit = argmax over constellations.
        labels = p.argmax(axis=1).astype(np.int32)
        # Radial coordinate of each unit along its assigned constellation axis.
        proj = Z @ U.T
        radials = proj[np.arange(len(units)), labels].astype(np.float32)
        edges = make_radial_bins(radials, B=int(radial_bins))
        bins_q = np.array([quantize_radial(float(radials[i]), edges) for i in range(len(units))], dtype=np.int32)
        # Pack (constellation label, radial bin) pairs into the raw code bytes.
        code_bytes = pack_codes_to_bytes(labels, bins_q)
        out_dir = tempfile.mkdtemp()
        # Codec: everything needed to interpret codes.bin later (audit trail).
        codec = {
            "backend": backend,
            "K": int(K),
            "radial_bins": int(radial_bins),
            "iters": int(iters),
            "beta": float(beta),
            "slab_bins": int(slab_bins),
            "tau": float(tau),
            "seed": int(seed),
            "U": U.tolist(),
            "radial_edges": edges.tolist(),
            "units_count": int(len(units)),
            "bytes_per_unit": 2.0,
            # +8 presumably accounts for a fixed codes.bin header — TODO confirm
            "total_bytes": int(len(code_bytes) + 8),
        }
        bin_path, codec_path = save_codes_and_codec(code_bytes, codec, out_dir)
        # Empty/whitespace titles fall back to a default; length capped at 120.
        title = str(title_text).strip()[:120] or "CHR Trading Card"
        payload = pack_trading_card_payload(code_bytes=code_bytes, codec=codec, title=title)
        payload_len = len(payload)
        # Round-trip the header immediately so the UI shows exactly what was packed.
        header_tc, _ = unpack_trading_card_payload(payload)
        mhep = compute_mhep(Hg, Hs, K=int(K), bins=int(slab_bins))
        ent_plot = os.path.join(out_dir, "entropy.png")
        map_plot = os.path.join(out_dir, "map.png")
        plot_entropy(Hg, Hs, ent_plot)
        plot_constellation_map(Z, U, labels, map_plot)
        report = (
            f"## Compressed Payload Ready\n"
            f"- **Embedding backend:** `{backend}`\n"
            f"- **Units:** **{len(units)}**\n"
            f"- **Constellations (K):** **{int(K)}**\n"
            f"- **Radial bins:** **{int(radial_bins)}**\n"
            f"- **Code bytes (constellation+radial):** **{len(code_bytes)}**\n"
            f"- **TCAR payload bytes to embed:** **{payload_len}**\n"
            f"- **MHEP score:** **{mhep:.1f}%**\n"
            f"\nNext: upload a **carrier card image** and click **Embed into Carrier**."
        )
        # Persist everything downstream stages need; clear stale card artifacts.
        STATE.update({
            "Z": Z, "U": U, "labels": labels, "bins": bins_q,
            "codec": codec, "code_bytes": code_bytes,
            "payload": payload, "payload_header": header_tc,
            "Hg": Hg, "Hs": Hs, "mhep": float(mhep),
            "out_dir": out_dir,
            "bin_path": bin_path, "codec_path": codec_path,
            "entropy_plot": ent_plot, "map_plot": map_plot,
            "final_card_path": None, "tilt_gif": None, "lsb_preview_path": None, "stego_info": None,
            "model": None
        })
        header_json = json.dumps(header_tc, indent=2)
        return report, ent_plot, map_plot, bin_path, codec_path, header_json
    except Exception as e:
        return f"Error: {e}\n\n{traceback.format_exc()}", None, None, None, None, None
def make_card_tilt_gif(card_rgb: np.ndarray, out_path: str, frames: int = 20, fps: int = 12):
    """Render a looping "holo tilt" GIF from a card image.

    Each frame rolls the R/G channels by a few pixels and applies a
    horizontal brightness sweep, simulating a foil card tilting under light.

    Args:
        card_rgb: HxWx3 uint8 image array.
        out_path: destination GIF file path.
        frames: frame count, clamped to [8, 48].
        fps: playback rate, clamped to [6, 24].
    """
    H, W, _ = card_rgb.shape
    frames = int(max(8, min(frames, 48)))
    fps = int(max(6, min(fps, 24)))
    # Loop-invariant column phase for the brightness sweep.
    # (Previously np.mgrid rebuilt both row and column grids per frame and
    # the row grid was never used.) One row suffices: the sweep depends only
    # on the column index, and (1, W) broadcasts over all rows.
    col = np.arange(W, dtype=np.float64)[None, :]
    phase_x = (col / max(1, W - 1)) * 2 * np.pi
    imgs = []
    for t in range(frames):
        a = (t / frames) * 2 * np.pi
        dx = int(2 + 3 * np.sin(a))
        dy = int(2 + 3 * np.cos(a))
        # astype() already returns a copy; no separate .copy() needed.
        img = card_rgb.astype(np.int16)
        grad = (0.90 + 0.10 * np.sin(a + phase_x)).astype(np.float32)
        r = np.roll(img[:, :, 0], shift=dx, axis=1)
        g = np.roll(img[:, :, 1], shift=dy, axis=0)
        b = img[:, :, 2]
        img[:, :, 0] = (r * grad).astype(np.int16)
        img[:, :, 1] = (g * grad).astype(np.int16)
        img[:, :, 2] = (b * grad).astype(np.int16)
        imgs.append(np.clip(img, 0, 255).astype(np.uint8))
    imageio.mimsave(out_path, imgs, fps=fps)
def embed_into_carrier(carrier_img, holo_alpha, requested_bits, title_text):
    """
    Upload carrier image -> apply holo overlay (visible) -> FULL-BLEED STATS overlay (visible)
    -> embed TCAR payload into pixels (invisible) -> save final PNG
    Also creates: tilt GIF + LSB preview image.

    Returns a 6-tuple matching the Gradio outputs:
    (markdown report, preview path, final card path, tilt GIF path,
     LSB preview path, JSON string with stego/verification info).
    On failure the report carries the traceback and the rest are None.
    """
    try:
        payload = STATE.get("payload")
        out_dir = STATE.get("out_dir")
        header_tc = STATE.get("payload_header")
        if payload is None or out_dir is None or header_tc is None:
            return "No payload found. Run compression first.", None, None, None, None, None
        cpath = _path_from_upload(carrier_img)
        if not cpath:
            return "Please upload a carrier image (PNG/JPG).", None, None, None, None, None
        carrier_rgb = load_image_rgb(cpath)
        # Visible holo shimmer, seeded from the codec so it is reproducible.
        holo = make_holo_texture(seed=int(header_tc["codec"]["seed"]), W=carrier_rgb.shape[1], H=carrier_rgb.shape[0])
        preview_rgb = blend_holo(carrier_rgb, holo, alpha=float(holo_alpha))
        # ----- FULL-BLEED UI replaces add_card_text entirely -----
        title = str(title_text).strip()[:80] or str(header_tc.get("title", "CHR Trading Card"))[:80]
        Hg = STATE.get("Hg")
        Hs = STATE.get("Hs")
        mhep = float(STATE.get("mhep") or 0.0)
        code_len = len(STATE.get("code_bytes") or b"")
        backend = str(header_tc.get("codec", {}).get("backend", "") or "")
        bshort = short_backend(backend)
        rarity = compute_rarity(mhep)
        card_type = f"CHR/{bshort}"
        # 0 means AUTO bits/channel; any other request is clamped to [1, 4].
        # Parsed once here and reused for both the stats overlay and embedding.
        req_bits = int(requested_bits)
        stats = compute_card_stats(
            header_tc=header_tc,
            mhep=mhep,
            Hg_traj=Hg,
            Hs_traj=Hs,
            payload_len=len(payload),
            code_bytes_len=code_len,
            bits_per_channel=(None if req_bits == 0 else int(np.clip(req_bits, 1, 4))),
            img_shape=preview_rgb.shape
        )
        subtitle_left = f"K={header_tc['codec']['K']} RAD={header_tc['codec']['radial_bins']} SEED={header_tc['codec']['seed']}"
        subtitle_right = f"Units={header_tc.get('units_count')} Bytes={len(payload)} CRC32={int(header_tc['crc32']):08x}"
        preview_rgb = draw_full_bleed_stats(
            preview_rgb,
            title=title,
            rarity=rarity,
            card_type=card_type,
            stats=stats,
            subtitle_left=subtitle_left,
            subtitle_right=subtitle_right,
            holo_alpha=float(holo_alpha),
            radius=28,
            seed=int(header_tc["codec"]["seed"]),
        )
        preview_path = os.path.join(out_dir, "carrier_preview.png")
        save_png(preview_rgb, preview_path)
        # Decide bits per channel + possible resize to fit capacity.
        # AUTO (req_bits == 0) delegates entirely to the chooser; a manual
        # request that cannot fit the payload also falls back to AUTO.
        if req_bits == 0:
            bpc, embed_rgb = choose_embed_params(preview_rgb, payload_len=len(payload))
        else:
            bpc = int(np.clip(req_bits, 1, 4))
            cap = _capacity_bytes(preview_rgb, bpc)
            # +14: presumably the stego header overhead — TODO confirm against
            # the embed format used by embed_payload_lsb_rgb.
            if (len(payload) + 14) > cap:
                bpc, embed_rgb = choose_embed_params(preview_rgb, payload_len=len(payload))
            else:
                embed_rgb = preview_rgb
        final_rgb = embed_payload_lsb_rgb(embed_rgb, payload=payload, bits_per_channel=bpc)
        final_path = os.path.join(out_dir, "final_trading_card.png")
        save_png(final_rgb, final_path)
        # Verify by extracting from saved PNG pixels only (end-to-end check).
        re_rgb = load_image_rgb(final_path)
        extracted, stego_info = extract_payload_auto(re_rgb, max_bits=4)
        header2, code2 = unpack_trading_card_payload(extracted)
        ok_crc = (_crc32_u32(code2) == int(header2["crc32"]))
        ok_sha = (_sha256_hex(code2) == str(header2["sha256"]))
        verified = bool(stego_info.get("verified")) and ok_crc and ok_sha
        lsb_img = lsb_preview(re_rgb, bits=int(stego_info["bits_per_channel"]))
        lsb_path = os.path.join(out_dir, "lsb_preview.png")
        save_png(lsb_img, lsb_path)
        tilt_path = os.path.join(out_dir, "card_tilt.gif")
        make_card_tilt_gif(re_rgb, tilt_path, frames=20, fps=12)
        STATE.update({
            "carrier_path": cpath,
            "preview_path": preview_path,
            "final_card_path": final_path,
            "tilt_gif": tilt_path,
            "lsb_preview_path": lsb_path,
            "stego_info": stego_info,
            "model": None
        })
        report = (
            f"## Carrier Embedded ✅\n"
            f"- **Final card:** `final_trading_card.png`\n"
            f"- **Stego bits/channel (auto-detected):** **{int(stego_info['bits_per_channel'])}**\n"
            f"- **Embedded payload bytes:** **{int(stego_info['payload_len'])}**\n"
            f"- **Stego CRC32:** `{stego_info['payload_crc32']}`\n"
            f"- **TCAR integrity:** CRC32={str(ok_crc)} SHA256={str(ok_sha)}\n"
            f"- **Verified:** {'✅ YES' if verified else '❌ NO'}\n"
            f"\nNext: go to **Train** and train from the **final PNG pixels only**."
        )
        return report, preview_path, final_path, tilt_path, lsb_path, json.dumps({"stego": stego_info, "tcar_verified": verified}, indent=2)
    except Exception as e:
        return f"Embed error: {e}\n\n{traceback.format_exc()}", None, None, None, None, None
def train_from_final_card(train_steps, batch_size, block_size, lr, log_every,
                          temperature, rollout_steps, make_gif, gif_stride, gif_fps, gif_max_frames):
    """
    Train on bytes extracted from final PNG pixels (stego payload).

    Returns a 5-tuple matching the Gradio outputs:
    (markdown report, training plot path, before/after rollout plot path,
     optional rollout GIF path, metrics JSON string).
    On failure the report carries the traceback and the rest are None.
    """
    try:
        final_path = STATE.get("final_card_path")
        out_dir = STATE.get("out_dir")
        if not final_path or not os.path.exists(final_path):
            return "No final trading card found. Embed into a carrier first.", None, None, None, None
        device = "cuda" if torch.cuda.is_available() else "cpu"
        # The ONLY data source: payload bytes hidden in the final card's pixels.
        rgb = load_image_rgb(final_path)
        payload, stego_info = extract_payload_auto(rgb, max_bits=4)
        L = len(payload)
        # Auto-tune block/batch sizes downward so small payloads still train
        # (user values act as upper bounds, never exceeded).
        user_block = int(block_size)
        user_bs = int(batch_size)
        tuned_block = min(user_block, max(48, L // 10))
        tuned_block = min(tuned_block, max(48, L - 2))
        block_size = int(tuned_block)
        n_windows = max(0, L - block_size - 1)
        tuned_bs = min(user_bs, max(8, n_windows // 4)) if n_windows > 0 else 1
        batch_size = int(max(1, tuned_bs))
        # Seed both rollouts with the first block of real payload bytes.
        start = list(np.frombuffer(payload[:block_size], dtype=np.uint8).tolist())
        # BEFORE: sample from an untrained model for the comparison plot.
        untrained = TinyByteTransformer(block_size=block_size).to(device)
        before_seq = sample_bytes(untrained, start=start, steps=int(rollout_steps), device=device, temperature=float(temperature))
        before_plot = os.path.join(out_dir, "rollout_before.png")
        plot_rollout_tracks(before_seq[-int(rollout_steps):], before_plot, title="BEFORE training (random)")
        model, ds_info, losses, ppls = train_on_stego_png(
            card_png_path=final_path,
            steps=int(train_steps),
            batch_size=batch_size,
            block_size=block_size,
            lr=float(lr),
            device=device,
            log_every=int(log_every),
        )
        STATE["model"] = model
        train_plot = os.path.join(out_dir, "training.png")
        plot_training_curves(losses, ppls, train_plot)
        # AFTER: sample from the trained model using the same seed bytes.
        after_seq = sample_bytes(model, start=start, steps=int(rollout_steps), device=device, temperature=float(temperature))
        after_plot = os.path.join(out_dir, "rollout_after.png")
        plot_rollout_tracks(after_seq[-int(rollout_steps):], after_plot, title="AFTER training (trained)")
        compare_plot = os.path.join(out_dir, "rollout_compare.png")
        plot_before_after_tracks(before_seq[-int(rollout_steps):], after_seq[-int(rollout_steps):], compare_plot)
        # Optional animated rollout; stride and frame cap keep it quick.
        gif_path = None
        if bool(make_gif):
            gif_path = os.path.join(out_dir, "rollout.gif")
            seq = after_seq[-int(rollout_steps):]
            stride = max(1, int(gif_stride))
            fps = max(6, int(gif_fps))
            max_frames = max(12, int(gif_max_frames))
            frames = []
            count = 0
            # Each frame is a matplotlib figure rendered to an in-memory PNG.
            for t in range(10, len(seq), stride):
                fig = plt.figure(figsize=(7,3.6))
                plt.plot(seq[:t], linewidth=2)
                plt.ylim(-2, 260)
                plt.title("AFTER training — rollout from stego pixels")
                plt.xlabel("Step"); plt.ylabel("Byte value")
                plt.tight_layout()
                buf = io.BytesIO()
                plt.savefig(buf, format="png", dpi=140)
                plt.close(fig)
                buf.seek(0)
                frames.append(imageio.imread(buf))
                count += 1
                if count >= max_frames:
                    break
            imageio.mimsave(gif_path, frames, fps=fps)
        report = (
            f"## Training Complete (PNG-only)\n"
            f"- **Device:** `{device}`\n"
            f"- **Stego extracted:** ✅ (bits/channel={int(ds_info['bits_per_channel'])}, bytes={int(ds_info['payload_len'])})\n"
            f"- **Auto block_size:** **{block_size}** (requested {user_block})\n"
            f"- **Auto batch_size:** **{batch_size}** (requested {user_bs})\n"
            f"- **Steps:** **{int(train_steps)}** (logged every {int(log_every)})\n"
            f"- **Final logged loss:** **{losses[-1]:.4f}**\n"
            f"- **Final logged perplexity:** **{ppls[-1]:.2f}**\n"
            f"\n### What investors should notice\n"
            f"Perplexity falls while training from **a single image** containing hidden dataset bytes."
        )
        metrics = {"loss": losses, "ppl": ppls, "stego": ds_info}
        return report, train_plot, compare_plot, gif_path, json.dumps(metrics, indent=2)
    except Exception as e:
        return f"Training error: {e}\n\n{traceback.format_exc()}", None, None, None, None
# -----------------------------
# Gradio UI
# -----------------------------
# Markdown banner shown at the top of the app; summarizes the 5-step pipeline.
INTRO = """
# Data Playing Card Trainer (Investor Demo)
**Pipeline:**
1) Compress dataset → **constellation/radial codes**
2) Pack into **TCAR payload**
3) Upload a **real “playing card” carrier image**
4) Apply **holo shimmer** + **full-bleed trading card stats** (visible) + embed payload into **PNG pixels** (invisible)
5) Train a tiny model using **only the final PNG pixels**
"""
# Build the 4-tab Gradio app: Ingest -> Compress -> Embed -> Train.
with gr.Blocks(title="Data Playing Card Trainer (CHR + Stego)") as demo:
    gr.Markdown(INTRO)
    # --- Tab 1: load a dataset (file upload or built-in demo corpus) ---
    with gr.Tab("1) Ingest"):
        with gr.Row():
            file_in = gr.File(label="Upload .txt or .docx", file_types=[".txt", ".docx"])
            units_mode = gr.Radio(["paragraphs", "sentences"], value="sentences", label="Unit granularity")
        with gr.Row():
            ingest_btn = gr.Button("Load file", variant="primary")
            demo_btn = gr.Button("Load built-in demo corpus", variant="secondary")
        ingest_status = gr.Markdown("")
        ingest_btn.click(ingest_file, inputs=[file_in, units_mode], outputs=[ingest_status])
        demo_btn.click(load_demo, inputs=[units_mode], outputs=[ingest_status])
    # --- Tab 2: CHR compression parameters -> TCAR payload + audit files ---
    with gr.Tab("2) Compress → Payload"):
        with gr.Row():
            K = gr.Slider(2, 48, value=16, step=1, label="K (constellations)")
            iters = gr.Slider(5, 120, value=35, step=1, label="CHR iterations")
            beta = gr.Slider(2, 30, value=16, step=1, label="beta (assignment sharpness)")
        with gr.Row():
            slab_bins = gr.Slider(3, 16, value=8, step=1, label="slab bins (entropy measure)")
            tau = gr.Slider(1, 20, value=5, step=1, label="tau (slab softness)")
            radial_bins = gr.Slider(8, 256, value=64, step=8, label="radial bins (compression alphabet)")
        seed = gr.Slider(0, 9999, value=42, step=1, label="seed")
        title_text = gr.Textbox(value="DATA PLAYING CARD", label="Card title")
        compress_btn = gr.Button("Build Payload + Visuals", variant="primary")
        compress_report = gr.Markdown("")
        with gr.Row():
            ent_img = gr.Image(label="Entropy during compression", type="filepath")
            map_img = gr.Image(label="Constellation map (PCA)", type="filepath")
        with gr.Row():
            codes_bin = gr.File(label="codes.bin (audit only)")
            codec_json = gr.File(label="codec.json (audit only)")
        payload_header = gr.Code(label="TCAR header (inside payload)", language="json")
        compress_btn.click(
            compress_build_payload,
            inputs=[K, iters, beta, slab_bins, tau, seed, radial_bins, title_text],
            outputs=[compress_report, ent_img, map_img, codes_bin, codec_json, payload_header]
        )
    # --- Tab 3: carrier upload, visible overlays, invisible LSB embed ---
    with gr.Tab("3) Upload Carrier → Embed"):
        gr.Markdown(
            "Upload a **carrier image** (your sci-fi playing card art). "
            "We’ll apply a visible holo shimmer + **full-bleed card stats UI**, then **embed the dataset payload invisibly into PNG pixels**."
        )
        with gr.Row():
            carrier_img = gr.File(label="Carrier image (PNG/JPG)", file_types=[".png", ".jpg", ".jpeg"])
            holo_alpha = gr.Slider(0.0, 0.8, value=0.35, step=0.01, label="Holo overlay strength")
        with gr.Row():
            requested_bits = gr.Slider(0, 4, value=0, step=1, label="bits/channel (0 = AUTO)")
            title_text2 = gr.Textbox(value="DATA PLAYING CARD", label="Card title (full-bleed)")
        embed_btn = gr.Button("Embed Payload into Carrier (creates final PNG)", variant="primary")
        embed_report = gr.Markdown("")
        with gr.Row():
            preview_img = gr.Image(label="Preview (visible holo + full-bleed stats)", type="filepath")
            final_img = gr.Image(label="Final Trading Card PNG (contains hidden data)", type="filepath")
        with gr.Row():
            tilt_gif = gr.Image(label="Holo tilt (GIF)", type="filepath")
            lsb_img = gr.Image(label="LSB preview (shows hidden structure)", type="filepath")
        stego_info = gr.Code(label="Embed/Verify info", language="json")
        embed_btn.click(
            embed_into_carrier,
            inputs=[carrier_img, holo_alpha, requested_bits, title_text2],
            outputs=[embed_report, preview_img, final_img, tilt_gif, lsb_img, stego_info]
        )
    # --- Tab 4: extract payload from the final PNG and train the byte model ---
    with gr.Tab("4) Train from Final Card"):
        gr.Markdown(
            "Training extracts the embedded payload **from pixels only** (auto-detects bits/channel), "
            "then trains a tiny byte model. Defaults are tuned to feel fast to an investor."
        )
        with gr.Row():
            train_steps = gr.Slider(50, 1200, value=200, step=50, label="training steps (fast demo default)")
            batch_size = gr.Slider(4, 128, value=32, step=4, label="batch size")
            block_size = gr.Slider(48, 256, value=96, step=16, label="sequence length (bytes)")
        with gr.Row():
            lr = gr.Number(value=7e-4, label="learning rate")
            log_every = gr.Slider(10, 200, value=25, step=5, label="log every (steps)")
            temperature = gr.Slider(0.5, 2.0, value=1.0, step=0.05, label="rollout temperature")
            rollout_steps = gr.Slider(40, 400, value=120, step=20, label="rollout steps (bytes)")
        with gr.Row():
            make_gif = gr.Checkbox(value=False, label="Generate rollout GIF (adds time)")
            gif_stride = gr.Slider(1, 12, value=5, step=1, label="GIF stride (higher = faster)")
            gif_fps = gr.Slider(6, 24, value=12, step=1, label="GIF FPS")
            gif_max_frames = gr.Slider(12, 120, value=40, step=4, label="GIF max frames (cap)")
        train_btn = gr.Button("Train from PNG pixels + generate visuals", variant="primary")
        train_report = gr.Markdown("")
        with gr.Row():
            train_img = gr.Image(label="Loss + perplexity", type="filepath")
            compare_img = gr.Image(label="BEFORE vs AFTER rollout", type="filepath")
        gif_out = gr.Image(label="Rollout GIF (optional)", type="filepath")
        metrics_json = gr.Code(label="Metrics (JSON)", language="json")
        train_btn.click(
            train_from_final_card,
            inputs=[train_steps, batch_size, block_size, lr, log_every, temperature, rollout_steps,
                    make_gif, gif_stride, gif_fps, gif_max_frames],
            outputs=[train_report, train_img, compare_img, gif_out, metrics_json]
        )
if __name__ == "__main__":
    # ssr_mode=False disables Gradio server-side rendering (presumably a
    # workaround for HF Spaces — confirm against the deployment environment).
    demo.launch(ssr_mode=False)