# app.py — Trading Card Trainer (CHR) + Carrier Upload + Robust Stego Embed # + Full-Bleed Trading Card UI (replaces add_card_text) # + Rarity + Type # + Subtle Foil Outer Frame + Rounded Corners # # Goal: dataset -> compressed payload -> visually “wow” card (carrier + holo overlay + full-bleed stats) # -> embed payload into PNG pixels (self-describing LSB stego) # Training reads ONLY the final card PNG pixels (extracts embedded payload) import os # --- HF Spaces hardening: make OpenMP thread env vars valid integers --- def _set_int_env(name: str, value: int): v = os.environ.get(name, "") if not str(v).isdigit(): os.environ[name] = str(value) _set_int_env("OMP_NUM_THREADS", 1) _set_int_env("OPENBLAS_NUM_THREADS", 1) _set_int_env("MKL_NUM_THREADS", 1) _set_int_env("NUMEXPR_NUM_THREADS", 1) import io, re, json, math, struct, tempfile, traceback, hashlib, zlib from pathlib import Path from typing import List, Tuple, Dict, Optional import numpy as np import gradio as gr import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import imageio.v2 as imageio from PIL import Image # ----------------------------- # Optional DOCX support # ----------------------------- _DOCX_OK = False try: from docx import Document _DOCX_OK = True except Exception: _DOCX_OK = False # ----------------------------- # Embeddings: sentence-transformers (preferred), fallback to hashing # ----------------------------- from sklearn.feature_extraction.text import HashingVectorizer from sklearn.decomposition import PCA _ST_MODEL = None def _load_st_model(): global _ST_MODEL if _ST_MODEL is not None: return _ST_MODEL try: from sentence_transformers import SentenceTransformer _ST_MODEL = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") return _ST_MODEL except Exception: return None def embed_texts(texts: List[str], prefer_sentence_transformer: bool = True) -> Tuple[np.ndarray, str]: texts = [t if isinstance(t, str) else str(t) for t in texts] if prefer_sentence_transformer: model = _load_st_model() if model is not None: try: vecs = model.encode( texts, batch_size=32, show_progress_bar=False, convert_to_numpy=True, normalize_embeddings=True ) return vecs.astype(np.float32), "sentence-transformers/all-MiniLM-L6-v2" except Exception: pass hv = HashingVectorizer(n_features=768, alternate_sign=False, norm=None) X = hv.transform(texts) vecs = X.toarray().astype(np.float32) norms = np.linalg.norm(vecs, axis=1, keepdims=True) + 1e-9 vecs = vecs / norms return vecs, "HashingVectorizer(768d) fallback" # ----------------------------- # Text ingestion / splitting # ----------------------------- def _basic_sentence_split(text: str) -> List[str]: rough = re.split(r'[\n\r]+|(?<=[\.\!\?])\s+', text.strip()) out = [] for s in rough: s = s.strip() if s: out.append(s) return out def read_txt_bytes(b: bytes) -> str: try: return b.decode("utf-8") except Exception: return b.decode("latin-1", errors="ignore") def read_docx_bytes(b: bytes) -> List[str]: if not _DOCX_OK: raise RuntimeError("python-docx not installed in this Space.") bio = io.BytesIO(b) doc = Document(bio) paras = [p.text.strip() for p in doc.paragraphs] return [p for p in paras if p and not p.isspace()] def to_units(raw_text: str, mode: str) -> List[str]: raw_text = raw_text.strip() if not raw_text: return [] if mode == "sentences": return _basic_sentence_split(raw_text) paras = [p.strip() for p in re.split(r"\n\s*\n+", raw_text) if p.strip()] return paras # ----------------------------- # Demo corpus (big enough to always train) # ----------------------------- DEMO_CORPUS = """ In the beginning, people stored knowledge in libraries, then in databases, and now in neural networks. Compression isn’t just saving space — it’s choosing what matters. A constellation is a pattern you can navigate. Entropy is a measure of surprise, and learning is surprise turning into structure. A system that learns from compressed data never needs the original. It doesn’t memorize pixels; it memorizes geometry. It doesn’t hoard text; it extracts signals. The question isn’t “Can it compress?” but “Can it learn after compressing?” Investors love seeing systems move. They love curves that fall. They love maps that cluster. They love a demo that feels alive. This demo builds a codec from your dataset, then trains a model exclusively on the encoded artifact inside a single trading card image. No raw text is used during training. Only the trading card exists. We call the clusters constellations. We call the structure harvestable. We call the drop in entropy visible proof. """ # ----------------------------- # CHR core # ----------------------------- def softmax(x, axis=-1): x = x - np.max(x, axis=axis, keepdims=True) ex = np.exp(x) return ex / (np.sum(ex, axis=axis, keepdims=True) + 1e-9) def global_range_entropy(p: np.ndarray) -> float: m = p.mean(axis=0) m_safe = np.clip(m, 1e-12, None) return float(-(m_safe * np.log(m_safe)).sum()) def soft_slab_entropy(z: np.ndarray, U: np.ndarray, bins: int = 8, tau: float = 5.0) -> float: t = z @ U.T K = U.shape[0] Hs = [] for j in range(K): tj = t[:, j] tmin, tmax = float(tj.min()), float(tj.max()) if not np.isfinite(tmin) or not np.isfinite(tmax) or tmax - tmin < 1e-6: Hs.append(0.0) continue centers = np.linspace(tmin, tmax, bins) dist2 = (tj[:, None] - centers[None, :]) ** 2 weights = softmax(-tau * dist2, axis=1) hist = weights.mean(axis=0) hist = np.clip(hist, 1e-12, None) H = float(-(hist * np.log(hist)).sum()) Hs.append(H) return float(np.mean(Hs)) if Hs else 0.0 def kmeans_plus_plus_init(z: np.ndarray, K: int, rng: np.random.RandomState) -> np.ndarray: N, d = z.shape inds = [rng.randint(0, N)] centers = [z[inds[0]]] cos0 = np.clip(z @ centers[0], -1.0, 1.0) d2 = np.clip(1.0 - cos0, 1e-12, None) for _ in range(1, K): s = d2.sum() if not np.isfinite(s) or s <= 0: probs = np.full(N, 1.0 / N) else: probs = np.clip(d2 / s, 0.0, None) probs = probs / (probs.sum() + 1e-12) next_idx = rng.choice(N, p=probs) inds.append(next_idx) centers.append(z[next_idx]) cos_new = np.clip(z @ z[next_idx], -1.0, 1.0) d2 = np.minimum(d2, np.clip(1.0 - cos_new, 1e-12, None)) U = np.stack(centers, axis=0) U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9) return U def chr_optimize(z: np.ndarray, K: int = 8, iters: int = 30, beta: float = 12.0, bins: int = 8, tau: float = 5.0, seed: int = 42): rng = np.random.RandomState(seed) N, d = z.shape U = kmeans_plus_plus_init(z, K, rng) if N >= K else np.pad(z, ((0, max(0, K - N)), (0, 0)), mode="wrap")[:K] U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9) logits0 = beta * (z @ U.T) p0 = softmax(logits0, axis=1) Hg_traj = [global_range_entropy(p0)] Hs_traj = [soft_slab_entropy(z, U, bins=bins, tau=tau)] for _ in range(iters): logits = beta * (z @ U.T) p = softmax(logits, axis=1) numer = p.T @ z denom = p.sum(axis=0)[:, None] + 1e-9 U = numer / denom U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9) Hg_traj.append(global_range_entropy(p)) Hs_traj.append(soft_slab_entropy(z, U, bins=bins, tau=tau)) logits = beta * (z @ U.T) p = softmax(logits, axis=1) return U, p, np.array(Hg_traj), np.array(Hs_traj) def compute_mhep(Hg_traj: np.ndarray, Hs_traj: np.ndarray, K: int, bins: int, w_g: float = 0.7, w_s: float = 0.3) -> float: if len(Hg_traj) < 2 or len(Hs_traj) < 2: return 0.0 maxHg = math.log(max(K, 2)) maxHs = math.log(max(bins, 2)) drop_g = max(0.0, float(Hg_traj[0] - Hg_traj[-1])) / (maxHg + 1e-9) drop_s = max(0.0, float(Hs_traj[0] - Hs_traj[-1])) / (maxHs + 1e-9) return float(np.clip(100.0 * (w_g * drop_g + w_s * drop_s), 0.0, 100.0)) # ----------------------------- # CHR → discrete "compressed" byte stream (codes.bin payload) # ----------------------------- def make_radial_bins(radials: np.ndarray, B: int = 64) -> np.ndarray: edges = np.quantile(radials, np.linspace(0, 1, B + 1)) for i in range(1, len(edges)): if edges[i] <= edges[i - 1]: edges[i] = edges[i - 1] + 1e-6 return edges.astype(np.float32) def quantize_radial(r: float, edges: np.ndarray) -> int: b = np.searchsorted(edges, r, side="right") - 1 return int(np.clip(b, 0, len(edges) - 2)) def pack_codes_to_bytes(labels: np.ndarray, bins: np.ndarray) -> bytes: out = bytearray() for c, b in zip(labels.tolist(), bins.tolist()): out.append(int(c) & 0xFF) out.append(int(b) & 0xFF) return bytes(out) def save_codes_and_codec(code_bytes: bytes, codec: Dict, out_dir: str) -> Tuple[str, str]: os.makedirs(out_dir, exist_ok=True) bin_path = os.path.join(out_dir, "codes.bin") meta_path = os.path.join(out_dir, "codec.json") with open(bin_path, "wb") as f: f.write(b"CHRC") f.write(struct.pack(" str: return hashlib.sha256(b).hexdigest() def _crc32_u32(b: bytes) -> int: return zlib.crc32(b) & 0xFFFFFFFF def pack_trading_card_payload(code_bytes: bytes, codec: Dict, title: str = "CHR Trading Card") -> bytes: """ Payload layout: CARD_MAGIC(4) | ver(u32) | header_len(u32) | header_json | code_len(u32) | code_bytes """ header = { "title": title, "codec": { "backend": codec.get("backend"), "K": codec.get("K"), "radial_bins": codec.get("radial_bins"), "iters": codec.get("iters"), "beta": codec.get("beta"), "slab_bins": codec.get("slab_bins"), "tau": codec.get("tau"), "seed": codec.get("seed"), }, "units_count": codec.get("units_count"), "bytes_per_unit": codec.get("bytes_per_unit"), "code_len": int(len(code_bytes)), "crc32": int(_crc32_u32(code_bytes)), "sha256": _sha256_hex(code_bytes), } header_json = json.dumps(header, ensure_ascii=False).encode("utf-8") blob = bytearray() blob += CARD_MAGIC blob += struct.pack(" Tuple[Dict, bytes]: if len(payload) < 16: raise ValueError("Payload too small.") if payload[:4] != CARD_MAGIC: raise ValueError("Payload magic not found.") ver = struct.unpack(" np.ndarray: arr = np.frombuffer(data, dtype=np.uint8) bits = ((arr[:, None] >> np.arange(8)) & 1).astype(np.uint8) return bits.reshape(-1) def _bits_to_bytes_le(bits: np.ndarray) -> bytes: bits = bits.astype(np.uint8).reshape(-1, 8) vals = np.sum(bits * (1 << np.arange(8, dtype=np.uint8))[None, :], axis=1).astype(np.uint8) return bytes(vals.tobytes()) def load_image_rgb(path: str) -> np.ndarray: img = Image.open(path).convert("RGB") return np.array(img, dtype=np.uint8) def save_png(rgb: np.ndarray, path: str): Image.fromarray(rgb.astype(np.uint8), mode="RGB").save(path, format="PNG", compress_level=6) def _capacity_bytes(rgb: np.ndarray, bits_per_channel: int) -> int: H, W, C = rgb.shape return (H * W * C * int(bits_per_channel)) // 8 def _pack_stego_header(payload: bytes, bits_per_channel: int) -> bytes: """ Header format: magic(4) | ver(u8) | bpc(u8) | payload_len(u32) | crc32(u32) Total = 14 bytes """ bpc = int(bits_per_channel) if not (1 <= bpc <= 4): raise ValueError("bits_per_channel must be 1..4") plen = int(len(payload)) crc = int(_crc32_u32(payload)) return STEGO_MAGIC + struct.pack(" np.ndarray: rgb = carrier_rgb.copy().astype(np.uint8) bpc = int(bits_per_channel) header = _pack_stego_header(payload, bpc) blob = header + payload cap = _capacity_bytes(rgb, bpc) if len(blob) > cap: raise ValueError(f"Carrier too small: need {len(blob)} bytes, capacity {cap} bytes at {bpc} bits/channel.") bits = _bytes_to_bits_le(blob) pad = (-len(bits)) % bpc if pad: bits = np.concatenate([bits, np.zeros(pad, dtype=np.uint8)], axis=0) bits_g = bits.reshape(-1, bpc) vals = np.sum(bits_g * (1 << np.arange(bpc, dtype=np.uint8))[None, :], axis=1).astype(np.uint8) flat = rgb.reshape(-1) mask = np.uint8(0xFF ^ ((1 << bpc) - 1)) flat[:len(vals)] = (flat[:len(vals)] & mask) | vals return rgb def _extract_lsb_stream(rgb: np.ndarray, bits_per_channel: int, n_bytes: int) -> bytes: bpc = int(bits_per_channel) flat = rgb.reshape(-1).astype(np.uint8) n_bits = n_bytes * 8 n_vals = int(np.ceil(n_bits / bpc)) vals = flat[:n_vals] & np.uint8((1 << bpc) - 1) bits = ((vals[:, None] >> np.arange(bpc)) & 1).astype(np.uint8).reshape(-1) bits = bits[:n_bits] return _bits_to_bytes_le(bits) def extract_payload_auto(rgb: np.ndarray, max_bits: int = 4) -> Tuple[bytes, Dict]: for bpc in range(1, int(max_bits) + 1): try: hdr = _extract_lsb_stream(rgb, bpc, n_bytes=14) ver, hdr_bpc, plen, crc = _unpack_stego_header(hdr) if ver != STEGO_VER or hdr_bpc != bpc: continue blob = _extract_lsb_stream(rgb, bpc, n_bytes=14 + plen) payload = blob[14:] if len(payload) != plen: continue if _crc32_u32(payload) != crc: continue return payload, { "stego_version": ver, "bits_per_channel": bpc, "payload_len": plen, "payload_crc32": f"{crc:08x}", "verified": True } except Exception: continue raise ValueError("Stego magic not found (wrong image or bits_per_channel, or image was altered).") def choose_embed_params(carrier_rgb: np.ndarray, payload_len: int) -> Tuple[int, np.ndarray]: """ Investor-proof: try bpc=1..4. If still too small, upscale the carrier (nearest) until it fits. Returns (bpc, possibly_resized_rgb) """ rgb = carrier_rgb for _ in range(3): # at most 3 upscales for bpc in (1, 2, 3, 4): cap = _capacity_bytes(rgb, bpc) if payload_len + 14 <= cap: return bpc, rgb H, W, _ = rgb.shape newW = min(1024, int(W * 1.5) + 1) newH = min(1024, int(H * 1.5) + 1) pil = Image.fromarray(rgb, mode="RGB").resize((newW, newH), resample=Image.NEAREST) rgb = np.array(pil, dtype=np.uint8) raise ValueError("Carrier image too small even after upscaling. Use a larger carrier image (>=1024px).") # ----------------------------- # “Holo overlay” visuals (sizzle) # ----------------------------- def make_holo_texture(seed: int, W: int, H: int) -> np.ndarray: rng = np.random.RandomState(int(seed) + 123) yy, xx = np.mgrid[0:H, 0:W].astype(np.float32) xx = (xx / max(1, W-1) - 0.5) * 2.0 yy = (yy / max(1, H-1) - 0.5) * 2.0 out = np.zeros((H, W, 3), dtype=np.float32) freqs = rng.uniform(2.0, 10.0, size=12).astype(np.float32) phases = rng.uniform(0, 2*np.pi, size=12).astype(np.float32) weights = rng.uniform(0.6, 1.2, size=12).astype(np.float32) for c in range(3): acc = np.zeros((H, W), dtype=np.float32) for i in range(12): a = rng.uniform(-1.0, 1.0) b = rng.uniform(-1.0, 1.0) acc += weights[i] * np.cos(freqs[i] * (a*xx + b*yy) + phases[i] + c*0.7) acc = (acc - acc.min()) / (acc.max() - acc.min() + 1e-9) out[..., c] = acc rr = np.sqrt(xx*xx + yy*yy) vignette = np.clip(1.1 - 0.35*rr, 0.6, 1.1) out *= vignette[..., None] out = np.clip(out, 0.0, 1.0) return (out * 255.0).astype(np.uint8) def blend_holo(carrier_rgb: np.ndarray, holo_rgb: np.ndarray, alpha: float = 0.35) -> np.ndarray: alpha = float(np.clip(alpha, 0.0, 1.0)) c = carrier_rgb.astype(np.float32) h = holo_rgb.astype(np.float32) out = (1.0 - alpha) * c + alpha * h return np.clip(out, 0, 255).astype(np.uint8) def lsb_preview(rgb: np.ndarray, bits: int = 2) -> np.ndarray: b = int(np.clip(bits, 1, 4)) mask = (1 << b) - 1 v = (rgb & mask).astype(np.uint8) v = (v.astype(np.float32) / float(mask)) * 255.0 return np.clip(v, 0, 255).astype(np.uint8) # ----------------------------- # Trading card stats + rarity/type + full-bleed renderer # ----------------------------- def compute_rarity(mhep: float) -> str: m = float(mhep) if m < 40: return "COMMON" if m < 65: return "RARE" if m < 85: return "EPIC" return "LEGENDARY" def short_backend(backend: str) -> str: b = (backend or "").lower() if "minilm" in b: return "MiniLM" if "hashingvectorizer" in b or "hashing" in b: return "HASH" return (backend or "EMB")[:8] def _clamp01(x: float) -> float: return float(np.clip(x, 0.0, 1.0)) def _score_0_99(x01: float) -> int: return int(np.clip(round(99.0 * _clamp01(x01)), 0, 99)) def compute_card_stats(header_tc: Dict, mhep: float, Hg_traj: Optional[np.ndarray] = None, Hs_traj: Optional[np.ndarray] = None, payload_len: int = 0, code_bytes_len: int = 0, bits_per_channel: Optional[int] = None, img_shape: Optional[Tuple[int,int,int]] = None) -> Dict[str, int]: units = int(header_tc.get("units_count", 0) or 0) K = int(header_tc.get("codec", {}).get("K", 0) or 0) radial_bins = int(header_tc.get("codec", {}).get("radial_bins", 0) or 0) ent_drop = 0.0 slab_drop = 0.0 if Hg_traj is not None and len(Hg_traj) >= 2: ent_drop = max(0.0, float(Hg_traj[0] - Hg_traj[-1])) / (abs(float(Hg_traj[0])) + 1e-9) if Hs_traj is not None and len(Hs_traj) >= 2: slab_drop = max(0.0, float(Hs_traj[0] - Hs_traj[-1])) / (abs(float(Hs_traj[0])) + 1e-9) bpu = (float(code_bytes_len) / max(1, units)) if units > 0 else 0.0 density01 = 1.0 - math.exp(-bpu / 2.0) cc01 = 1.0 - math.exp(- (K / 12.0) * (radial_bins / 64.0)) if bits_per_channel is None: embed_ease01 = 0.5 else: embed_ease01 = _clamp01((bits_per_channel - 1) / 3.0) stability01 = 0.5 if img_shape is not None: H, W, _ = img_shape px = float(H * W) stability01 = _clamp01(1.0 - math.exp(-px / (512.0 * 512.0))) harvest = int(np.clip(round(float(mhep)), 0, 99)) signal = _score_0_99(0.65 * ent_drop + 0.35 * slab_drop) density = _score_0_99(density01) constellation = _score_0_99(cc01) if payload_len <= 0: integrity01 = 0.85 else: integrity01 = _clamp01(0.95 - (payload_len / 2_000_000.0)) integrity = _score_0_99(integrity01) embed = _score_0_99(0.55 * embed_ease01 + 0.45 * stability01) scope01 = _clamp01(math.log10(max(10.0, float(units))) / 4.0) scope = _score_0_99(scope01) return { "HARVEST": harvest, "SIGNAL": signal, "DENSITY": density, "CONST": constellation, "INTEG": integrity, "EMBED": embed, "SCOPE": scope, } def _rounded_mask(W: int, H: int, radius: int) -> np.ndarray: # boolean mask of pixels inside rounded rect r = int(max(2, radius)) yy, xx = np.mgrid[0:H, 0:W] inside = np.ones((H, W), dtype=np.uint8) # corners: (x,y) distances to corner centers # top-left cx, cy = r, r tl = (xx < r) & (yy < r) inside[tl] = (((xx[tl]-cx)**2 + (yy[tl]-cy)**2) <= (r*r)).astype(np.uint8) # top-right cx, cy = W - r - 1, r tr = (xx > W - r - 1) & (yy < r) inside[tr] = (((xx[tr]-cx)**2 + (yy[tr]-cy)**2) <= (r*r)).astype(np.uint8) # bottom-left cx, cy = r, H - r - 1 bl = (xx < r) & (yy > H - r - 1) inside[bl] = (((xx[bl]-cx)**2 + (yy[bl]-cy)**2) <= (r*r)).astype(np.uint8) # bottom-right cx, cy = W - r - 1, H - r - 1 br = (xx > W - r - 1) & (yy > H - r - 1) inside[br] = (((xx[br]-cx)**2 + (yy[br]-cy)**2) <= (r*r)).astype(np.uint8) return inside.astype(bool) def apply_rounded_corners(rgb: np.ndarray, radius: int = 28, bg=(8, 8, 12)) -> np.ndarray: """ Since we save RGB PNG (no alpha), we simulate rounded corners by blending corners into a dark bg. """ H, W, _ = rgb.shape mask = _rounded_mask(W, H, radius) out = rgb.copy().astype(np.float32) bgv = np.array(bg, dtype=np.float32)[None, None, :] m = mask[:, :, None].astype(np.float32) out = m * out + (1.0 - m) * bgv return np.clip(out, 0, 255).astype(np.uint8) def add_subtle_foil_frame(rgb: np.ndarray, radius: int = 28, thickness: int = 10, seed: int = 0) -> np.ndarray: """ Adds a subtle "foil" frame by brightening a rounded-rect ring with a shimmering pattern. """ H, W, _ = rgb.shape r = int(max(6, radius)) t = int(max(4, thickness)) rng = np.random.RandomState(int(seed) + 999) outer = _rounded_mask(W, H, r) inner = _rounded_mask(W, H, max(2, r - t)) ring = outer & (~inner) yy, xx = np.mgrid[0:H, 0:W].astype(np.float32) xxn = xx / max(1.0, W-1.0) yyn = yy / max(1.0, H-1.0) # shimmering scalar field (no explicit colors; just modulate brightness) phase = rng.uniform(0.0, 2*np.pi) shimmer = (0.5 + 0.5*np.sin(2*np.pi*(2.2*xxn + 1.6*yyn) + phase)).astype(np.float32) shimmer2 = (0.5 + 0.5*np.sin(2*np.pi*(3.3*xxn - 2.1*yyn) + phase*0.7)).astype(np.float32) s = 0.6*shimmer + 0.4*shimmer2 # 0..1 out = rgb.astype(np.float32) # Brightness boost map inside ring boost = (22.0 + 28.0 * s) # ~22..50 boost = boost[:, :, None] ring_f = ring[:, :, None].astype(np.float32) out = out + ring_f * boost # Add a very thin inner highlight line inner2 = _rounded_mask(W, H, max(2, r - t - 2)) ring2 = inner & (~inner2) out = out + ring2[:, :, None].astype(np.float32) * 18.0 return np.clip(out, 0, 255).astype(np.uint8) def draw_full_bleed_stats(card_rgb: np.ndarray, title: str, rarity: str, card_type: str, stats: Dict[str, int], subtitle_left: str, subtitle_right: str, holo_alpha: float = 0.35, radius: int = 28, seed: int = 0) -> np.ndarray: """ Full-bleed trading-card overlay: - Rounded corners + subtle foil frame - Top title band with rarity badge + type - Bottom stat slab with bars """ try: # Base: apply rounded corners first (so UI feels like it's inside a card) base = apply_rounded_corners(card_rgb, radius=radius, bg=(8, 8, 12)) # Subtle foil frame base = add_subtle_foil_frame(base, radius=radius, thickness=10, seed=seed) pil = Image.fromarray(base, mode="RGB") from PIL import ImageDraw, ImageFont draw = ImageDraw.Draw(pil) font = ImageFont.load_default() W, H = pil.size # ---- Top band ---- top_h = int(max(44, H * 0.12)) draw.rounded_rectangle([10, 10, W-10, 10 + top_h], radius=18, fill=(0, 0, 0), outline=(220, 220, 255), width=2) # Title draw.text((22, 18), (title or "DATA PLAYING CARD")[:60], fill=(255, 255, 255), font=font) # Subtitles (left/right) draw.text((22, 18 + 16), (subtitle_left or "")[:60], fill=(230, 230, 230), font=font) # Right aligned subtitle (approximate) sr = (subtitle_right or "")[:60] tw = len(sr) * 6 # default font approx draw.text((max(22, W - 22 - tw), 18 + 16), sr, fill=(230, 230, 230), font=font) # Rarity badge badge_w = 92 badge_h = 18 bx1 = W - 20 by0 = 14 bx0 = bx1 - badge_w by1 = by0 + badge_h draw.rounded_rectangle([bx0, by0, bx1, by1], radius=8, fill=(0, 0, 0), outline=(255, 255, 255), width=1) draw.text((bx0 + 8, by0 + 4), rarity[:12], fill=(255, 255, 255), font=font) # Type tag (under badge) ty = by1 + 4 tag = (card_type or "CHR")[:14] tw2 = len(tag) * 6 draw.text((max(22, W - 22 - tw2), ty), tag, fill=(255, 255, 255), font=font) # ---- Bottom stat slab ---- slab_h = int(max(130, H * 0.26)) y0 = H - slab_h - 12 draw.rounded_rectangle([10, y0, W-10, H-10], radius=18, fill=(0, 0, 0), outline=(220, 220, 255), width=2) # Stat rows items = list(stats.items()) # Keep consistent ordering preferred = ["HARVEST", "SIGNAL", "DENSITY", "CONST", "INTEG", "EMBED", "SCOPE"] items_sorted = [] d = dict(items) for k in preferred: if k in d: items_sorted.append((k, d[k])) for k, v in items: if k not in preferred: items_sorted.append((k, v)) pad = 16 cx = 22 cy = y0 + 14 # Bar geometry name_w = 66 bar_w = max(120, W - 10 - cx - name_w - 58) bar_h = 10 row_gap = 18 for name, val in items_sorted: val = int(np.clip(val, 0, 99)) draw.text((cx, cy), f"{name:>7}", fill=(255, 255, 255), font=font) bx0 = cx + name_w by0 = cy + 3 bx1 = bx0 + bar_w by1 = by0 + bar_h draw.rectangle([bx0, by0, bx1, by1], outline=(255, 255, 255), width=1) fill_w = int(round((val / 99.0) * (bar_w - 2))) if fill_w > 0: draw.rectangle([bx0 + 1, by0 + 1, bx0 + 1 + fill_w, by1 - 1], fill=(255, 255, 255)) draw.text((bx1 + 10, cy), f"{val:02d}", fill=(255, 255, 255), font=font) cy += row_gap if cy > H - 18: break # Convert back out = np.array(pil, dtype=np.uint8) return out except Exception: return card_rgb # ----------------------------- # Visual plots # ----------------------------- def plot_entropy(Hg, Hs, out_path): plt.figure(figsize=(6,4)) plt.plot(Hg, label="Global range entropy") plt.plot(Hs, label="Slab entropy") plt.xlabel("Iteration"); plt.ylabel("Entropy") plt.title("Entropy drops during CHR compression") plt.legend() plt.tight_layout() plt.savefig(out_path, dpi=150) plt.close() def plot_constellation_map(z, U, labels, out_path): if z.shape[1] > 2: pca = PCA(n_components=2, random_state=0) Z2 = pca.fit_transform(z) U2 = pca.transform(U) else: Z2, U2 = z, U plt.figure(figsize=(6,5)) plt.scatter(Z2[:,0], Z2[:,1], s=14, alpha=0.8, c=labels) plt.scatter(U2[:,0], U2[:,1], marker="*", s=200) plt.title("Constellation map (compressed geometry)") plt.xlabel("PC1"); plt.ylabel("PC2") plt.tight_layout() plt.savefig(out_path, dpi=150) plt.close() def plot_training_curves(losses, ppls, out_path): plt.figure(figsize=(6,4)) plt.plot(losses, label="Loss") plt.plot(ppls, label="Perplexity") plt.xlabel("Checkpoint") plt.title("Learning on trading card pixels") plt.legend() plt.tight_layout() plt.savefig(out_path, dpi=150) plt.close() def plot_rollout_tracks(seq_bytes: List[int], out_path, title="Rollout (byte tokens)"): plt.figure(figsize=(8,3.6)) plt.plot(seq_bytes, label="Byte value") plt.ylim(-2, 260) plt.xlabel("Step"); plt.title(title) plt.legend() plt.tight_layout() plt.savefig(out_path, dpi=150) plt.close() def plot_before_after_tracks(before_bytes: List[int], after_bytes: List[int], out_path): plt.figure(figsize=(10,4)) plt.subplot(1,2,1) plt.plot(before_bytes, label="Byte value") plt.title("BEFORE (untrained)") plt.ylim(-2, 260) plt.legend() plt.subplot(1,2,2) plt.plot(after_bytes, label="Byte value") plt.title("AFTER (trained)") plt.ylim(-2, 260) plt.legend() plt.suptitle("Rollout comparison (trained on card pixels)") plt.tight_layout() plt.savefig(out_path, dpi=150) plt.close() # ----------------------------- # Training: tiny byte model reads ONLY final PNG pixels (extracts stego payload) # ----------------------------- import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader try: torch.set_num_threads(1) torch.set_num_interop_threads(1) except Exception: pass class StegoPayloadDataset(Dataset): """ Builds next-byte prediction windows from the embedded payload extracted from PNG pixels. """ def __init__(self, card_png_path: str, block_size: int = 96): self.card_png_path = card_png_path self.block_size = int(block_size) rgb = load_image_rgb(card_png_path) payload, info = extract_payload_auto(rgb, max_bits=4) self.info = info self.payload = payload self.bytes = torch.tensor(list(np.frombuffer(payload, dtype=np.uint8)), dtype=torch.long) def __len__(self): return max(0, len(self.bytes) - self.block_size - 1) def __getitem__(self, idx): x = self.bytes[idx:idx+self.block_size] y = self.bytes[idx+1:idx+self.block_size+1] return x, y class TinyByteTransformer(nn.Module): """ Investor demo model: fast on CPU, faster on GPU. """ def __init__(self, vocab_size=256, d_model=96, n_layers=1, n_heads=4, block_size=96): super().__init__() self.tok = nn.Embedding(vocab_size, d_model) self.pos = nn.Embedding(block_size, d_model) enc_layer = nn.TransformerEncoderLayer( d_model=d_model, nhead=n_heads, dim_feedforward=4*d_model, dropout=0.1, batch_first=True ) self.tr = nn.TransformerEncoder(enc_layer, num_layers=n_layers) self.lm = nn.Linear(d_model, vocab_size) self.block_size = int(block_size) def forward(self, x): B, T = x.shape pos = torch.arange(T, device=x.device).unsqueeze(0).expand(B, T) h = self.tok(x) + self.pos(pos) mask = torch.triu(torch.ones(T, T, device=x.device), diagonal=1).bool() h = self.tr(h, mask=mask) return self.lm(h) @torch.no_grad() def sample_bytes(model, start: List[int], steps: int, device: str = "cpu", temperature: float = 1.0) -> List[int]: model.eval() seq = start[:] steps = int(steps) for _ in range(steps): x = torch.tensor(seq[-model.block_size:], dtype=torch.long, device=device).unsqueeze(0) logits = model(x)[0, -1] / max(1e-6, float(temperature)) probs = torch.softmax(logits, dim=-1) nxt = int(torch.multinomial(probs, num_samples=1).item()) seq.append(nxt) return seq def train_on_stego_png(card_png_path: str, steps: int = 200, batch_size: int = 32, block_size: int = 96, lr: float = 7e-4, device: str = "cpu", log_every: int = 25): ds = StegoPayloadDataset(card_png_path, block_size=block_size) n_windows = len(ds) if n_windows <= 0: raise RuntimeError(f"Embedded payload too small for block_size={block_size}. Reduce block_size or embed more data.") drop_last = n_windows >= batch_size dl = DataLoader(ds, batch_size=batch_size, shuffle=True, drop_last=drop_last) model = TinyByteTransformer(block_size=block_size).to(device) opt = torch.optim.AdamW(model.parameters(), lr=lr) loss_fn = nn.CrossEntropyLoss() losses, ppls = [], [] steps = int(steps) log_every = max(1, int(log_every)) it = iter(dl) model.train() for step in range(1, steps+1): try: x, y = next(it) except StopIteration: it = iter(dl) x, y = next(it) x, y = x.to(device), y.to(device) logits = model(x) loss = loss_fn(logits.view(-1, 256), y.view(-1)) opt.zero_grad(set_to_none=True) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) opt.step() if step % log_every == 0: l = float(loss.detach().cpu().item()) ppl = float(torch.exp(loss.detach()).cpu().item()) losses.append(l) ppls.append(ppl) return model, ds.info, losses, ppls # ----------------------------- # App state # ----------------------------- STATE = { "units": None, # compression artifacts "Z": None, "U": None, "labels": None, "bins": None, "codec": None, "code_bytes": None, "payload": None, "payload_header": None, # trajectories (for trading card stats) "Hg": None, "Hs": None, "mhep": None, "out_dir": None, "bin_path": None, "codec_path": None, "entropy_plot": None, "map_plot": None, # card build "carrier_path": None, "preview_path": None, "final_card_path": None, "tilt_gif": None, "lsb_preview_path": None, "stego_info": None, # model "model": None, } def _bytes_from_upload(file_obj) -> Tuple[bytes, str]: if file_obj is None: return b"", "" if isinstance(file_obj, str) and os.path.exists(file_obj): return Path(file_obj).read_bytes(), os.path.basename(file_obj) if hasattr(file_obj, "name") and isinstance(file_obj.name, str) and os.path.exists(file_obj.name): return Path(file_obj.name).read_bytes(), os.path.basename(file_obj.name) return b"", "upload" def _path_from_upload(file_obj) -> Optional[str]: if file_obj is None: return None if isinstance(file_obj, str) and os.path.exists(file_obj): return file_obj if hasattr(file_obj, "name") and isinstance(file_obj.name, str) and os.path.exists(file_obj.name): return file_obj.name return None # ----------------------------- # Callbacks # ----------------------------- def load_demo(units_mode: str): raw = (DEMO_CORPUS.strip() + "\n\n") * 80 units = to_units(raw, units_mode) units = [u.strip() for u in units if u.strip()] STATE["units"] = units return f"Loaded **{len(units)}** demo units (built-in corpus)." def ingest_file(file_obj, units_mode: str): try: b, name = _bytes_from_upload(file_obj) if not b: return "Upload a .txt or .docx file to begin." if name.lower().endswith(".docx"): paras = read_docx_bytes(b) raw = "\n\n".join(paras) else: raw = read_txt_bytes(b) units = to_units(raw, units_mode) units = [u.strip() for u in units if u.strip()] if len(units) > 5000: units = units[:5000] STATE["units"] = units return f"Loaded **{len(units)}** units from **{name}**." except Exception as e: return f"Error ingesting file: {e}" def compress_build_payload(K, iters, beta, slab_bins, tau, seed, radial_bins, title_text): """ 1) CHR compress 2) create codes.bin + codec.json (audit only) 3) pack TCAR payload bytes (to embed into carrier image) 4) write visuals (entropy plot + constellation plot) """ try: units = STATE.get("units") if not units: return "No units loaded. Upload or load demo corpus.", None, None, None, None, None Z, backend = embed_texts(units, prefer_sentence_transformer=True) U, p, Hg, Hs = chr_optimize( Z, K=int(K), iters=int(iters), beta=float(beta), bins=int(slab_bins), tau=float(tau), seed=int(seed) ) labels = p.argmax(axis=1).astype(np.int32) proj = Z @ U.T radials = proj[np.arange(len(units)), labels].astype(np.float32) edges = make_radial_bins(radials, B=int(radial_bins)) bins_q = np.array([quantize_radial(float(radials[i]), edges) for i in range(len(units))], dtype=np.int32) code_bytes = pack_codes_to_bytes(labels, bins_q) out_dir = tempfile.mkdtemp() codec = { "backend": backend, "K": int(K), "radial_bins": int(radial_bins), "iters": int(iters), "beta": float(beta), "slab_bins": int(slab_bins), "tau": float(tau), "seed": int(seed), "U": U.tolist(), "radial_edges": edges.tolist(), "units_count": int(len(units)), "bytes_per_unit": 2.0, "total_bytes": int(len(code_bytes) + 8), } bin_path, codec_path = save_codes_and_codec(code_bytes, codec, out_dir) title = str(title_text).strip()[:120] or "CHR Trading Card" payload = pack_trading_card_payload(code_bytes=code_bytes, codec=codec, title=title) payload_len = len(payload) header_tc, _ = unpack_trading_card_payload(payload) mhep = compute_mhep(Hg, Hs, K=int(K), bins=int(slab_bins)) ent_plot = os.path.join(out_dir, "entropy.png") map_plot = os.path.join(out_dir, "map.png") plot_entropy(Hg, Hs, ent_plot) plot_constellation_map(Z, U, labels, map_plot) report = ( f"## Compressed Payload Ready\n" f"- **Embedding backend:** `{backend}`\n" f"- **Units:** **{len(units)}**\n" f"- **Constellations (K):** **{int(K)}**\n" f"- **Radial bins:** **{int(radial_bins)}**\n" f"- **Code bytes (constellation+radial):** **{len(code_bytes)}**\n" f"- **TCAR payload bytes to embed:** **{payload_len}**\n" f"- **MHEP score:** **{mhep:.1f}%**\n" f"\nNext: upload a **carrier card image** and click **Embed into Carrier**." ) STATE.update({ "Z": Z, "U": U, "labels": labels, "bins": bins_q, "codec": codec, "code_bytes": code_bytes, "payload": payload, "payload_header": header_tc, "Hg": Hg, "Hs": Hs, "mhep": float(mhep), "out_dir": out_dir, "bin_path": bin_path, "codec_path": codec_path, "entropy_plot": ent_plot, "map_plot": map_plot, "final_card_path": None, "tilt_gif": None, "lsb_preview_path": None, "stego_info": None, "model": None }) header_json = json.dumps(header_tc, indent=2) return report, ent_plot, map_plot, bin_path, codec_path, header_json except Exception as e: return f"Error: {e}\n\n{traceback.format_exc()}", None, None, None, None, None def make_card_tilt_gif(card_rgb: np.ndarray, out_path: str, frames: int = 20, fps: int = 12): H, W, _ = card_rgb.shape frames = int(max(8, min(frames, 48))) fps = int(max(6, min(fps, 24))) imgs = [] for t in range(frames): a = (t / frames) * 2*np.pi dx = int(2 + 3*np.sin(a)) dy = int(2 + 3*np.cos(a)) img = card_rgb.copy().astype(np.int16) yy, xx = np.mgrid[0:H, 0:W] grad = (0.90 + 0.10*np.sin(a + (xx / max(1, W-1))*2*np.pi)).astype(np.float32) r = np.roll(img[:, :, 0], shift=dx, axis=1) g = np.roll(img[:, :, 1], shift=dy, axis=0) b = img[:, :, 2] img[:, :, 0] = (r * grad).astype(np.int16) img[:, :, 1] = (g * grad).astype(np.int16) img[:, :, 2] = (b * grad).astype(np.int16) img = np.clip(img, 0, 255).astype(np.uint8) imgs.append(img) imageio.mimsave(out_path, imgs, fps=fps) def embed_into_carrier(carrier_img, holo_alpha, requested_bits, title_text): """ Upload carrier image -> apply holo overlay (visible) -> FULL-BLEED STATS overlay (visible) -> embed TCAR payload into pixels (invisible) -> save final PNG Also creates: tilt GIF + LSB preview image. """ try: payload = STATE.get("payload") out_dir = STATE.get("out_dir") header_tc = STATE.get("payload_header") if payload is None or out_dir is None or header_tc is None: return "No payload found. Run compression first.", None, None, None, None, None cpath = _path_from_upload(carrier_img) if not cpath: return "Please upload a carrier image (PNG/JPG).", None, None, None, None, None carrier_rgb = load_image_rgb(cpath) # visible holo holo = make_holo_texture(seed=int(header_tc["codec"]["seed"]), W=carrier_rgb.shape[1], H=carrier_rgb.shape[0]) preview_rgb = blend_holo(carrier_rgb, holo, alpha=float(holo_alpha)) # ----- FULL-BLEED UI replaces add_card_text entirely ----- title = str(title_text).strip()[:80] or str(header_tc.get("title", "CHR Trading Card"))[:80] Hg = STATE.get("Hg") Hs = STATE.get("Hs") mhep = float(STATE.get("mhep") or 0.0) code_len = len(STATE.get("code_bytes") or b"") backend = str(header_tc.get("codec", {}).get("backend", "") or "") bshort = short_backend(backend) rarity = compute_rarity(mhep) card_type = f"CHR/{bshort}" req_bits = int(requested_bits) stats = compute_card_stats( header_tc=header_tc, mhep=mhep, Hg_traj=Hg, Hs_traj=Hs, payload_len=len(payload), code_bytes_len=code_len, bits_per_channel=(None if req_bits == 0 else int(np.clip(req_bits, 1, 4))), img_shape=preview_rgb.shape ) subtitle_left = f"K={header_tc['codec']['K']} RAD={header_tc['codec']['radial_bins']} SEED={header_tc['codec']['seed']}" subtitle_right = f"Units={header_tc.get('units_count')} Bytes={len(payload)} CRC32={int(header_tc['crc32']):08x}" preview_rgb = draw_full_bleed_stats( preview_rgb, title=title, rarity=rarity, card_type=card_type, stats=stats, subtitle_left=subtitle_left, subtitle_right=subtitle_right, holo_alpha=float(holo_alpha), radius=28, seed=int(header_tc["codec"]["seed"]), ) preview_path = os.path.join(out_dir, "carrier_preview.png") save_png(preview_rgb, preview_path) # Decide bits per channel + possible resize to fit capacity req_bits = int(requested_bits) if req_bits == 0: bpc, embed_rgb = choose_embed_params(preview_rgb, payload_len=len(payload)) else: bpc = int(np.clip(req_bits, 1, 4)) cap = _capacity_bytes(preview_rgb, bpc) if (len(payload) + 14) > cap: bpc, embed_rgb = choose_embed_params(preview_rgb, payload_len=len(payload)) else: embed_rgb = preview_rgb final_rgb = embed_payload_lsb_rgb(embed_rgb, payload=payload, bits_per_channel=bpc) final_path = os.path.join(out_dir, "final_trading_card.png") save_png(final_rgb, final_path) # Verify by extracting from saved PNG pixels only re_rgb = load_image_rgb(final_path) extracted, stego_info = extract_payload_auto(re_rgb, max_bits=4) header2, code2 = unpack_trading_card_payload(extracted) ok_crc = (_crc32_u32(code2) == int(header2["crc32"])) ok_sha = (_sha256_hex(code2) == str(header2["sha256"])) verified = bool(stego_info.get("verified")) and ok_crc and ok_sha lsb_img = lsb_preview(re_rgb, bits=int(stego_info["bits_per_channel"])) lsb_path = os.path.join(out_dir, "lsb_preview.png") save_png(lsb_img, lsb_path) tilt_path = os.path.join(out_dir, "card_tilt.gif") make_card_tilt_gif(re_rgb, tilt_path, frames=20, fps=12) STATE.update({ "carrier_path": cpath, "preview_path": preview_path, "final_card_path": final_path, "tilt_gif": tilt_path, "lsb_preview_path": lsb_path, "stego_info": stego_info, "model": None }) report = ( f"## Carrier Embedded ✅\n" f"- **Final card:** `final_trading_card.png`\n" f"- **Stego bits/channel (auto-detected):** **{int(stego_info['bits_per_channel'])}**\n" f"- **Embedded payload bytes:** **{int(stego_info['payload_len'])}**\n" f"- **Stego CRC32:** `{stego_info['payload_crc32']}`\n" f"- **TCAR integrity:** CRC32={str(ok_crc)} SHA256={str(ok_sha)}\n" f"- **Verified:** {'✅ YES' if verified else '❌ NO'}\n" f"\nNext: go to **Train** and train from the **final PNG pixels only**." ) return report, preview_path, final_path, tilt_path, lsb_path, json.dumps({"stego": stego_info, "tcar_verified": verified}, indent=2) except Exception as e: return f"Embed error: {e}\n\n{traceback.format_exc()}", None, None, None, None, None def train_from_final_card(train_steps, batch_size, block_size, lr, log_every, temperature, rollout_steps, make_gif, gif_stride, gif_fps, gif_max_frames): """ Train on bytes extracted from final PNG pixels (stego payload). """ try: final_path = STATE.get("final_card_path") out_dir = STATE.get("out_dir") if not final_path or not os.path.exists(final_path): return "No final trading card found. Embed into a carrier first.", None, None, None, None device = "cuda" if torch.cuda.is_available() else "cpu" rgb = load_image_rgb(final_path) payload, stego_info = extract_payload_auto(rgb, max_bits=4) L = len(payload) user_block = int(block_size) user_bs = int(batch_size) tuned_block = min(user_block, max(48, L // 10)) tuned_block = min(tuned_block, max(48, L - 2)) block_size = int(tuned_block) n_windows = max(0, L - block_size - 1) tuned_bs = min(user_bs, max(8, n_windows // 4)) if n_windows > 0 else 1 batch_size = int(max(1, tuned_bs)) start = list(np.frombuffer(payload[:block_size], dtype=np.uint8).tolist()) untrained = TinyByteTransformer(block_size=block_size).to(device) before_seq = sample_bytes(untrained, start=start, steps=int(rollout_steps), device=device, temperature=float(temperature)) before_plot = os.path.join(out_dir, "rollout_before.png") plot_rollout_tracks(before_seq[-int(rollout_steps):], before_plot, title="BEFORE training (random)") model, ds_info, losses, ppls = train_on_stego_png( card_png_path=final_path, steps=int(train_steps), batch_size=batch_size, block_size=block_size, lr=float(lr), device=device, log_every=int(log_every), ) STATE["model"] = model train_plot = os.path.join(out_dir, "training.png") plot_training_curves(losses, ppls, train_plot) after_seq = sample_bytes(model, start=start, steps=int(rollout_steps), device=device, temperature=float(temperature)) after_plot = os.path.join(out_dir, "rollout_after.png") plot_rollout_tracks(after_seq[-int(rollout_steps):], after_plot, title="AFTER training (trained)") compare_plot = os.path.join(out_dir, "rollout_compare.png") plot_before_after_tracks(before_seq[-int(rollout_steps):], after_seq[-int(rollout_steps):], compare_plot) gif_path = None if bool(make_gif): gif_path = os.path.join(out_dir, "rollout.gif") seq = after_seq[-int(rollout_steps):] stride = max(1, int(gif_stride)) fps = max(6, int(gif_fps)) max_frames = max(12, int(gif_max_frames)) frames = [] count = 0 for t in range(10, len(seq), stride): fig = plt.figure(figsize=(7,3.6)) plt.plot(seq[:t], linewidth=2) plt.ylim(-2, 260) plt.title("AFTER training — rollout from stego pixels") plt.xlabel("Step"); plt.ylabel("Byte value") plt.tight_layout() buf = io.BytesIO() plt.savefig(buf, format="png", dpi=140) plt.close(fig) buf.seek(0) frames.append(imageio.imread(buf)) count += 1 if count >= max_frames: break imageio.mimsave(gif_path, frames, fps=fps) report = ( f"## Training Complete (PNG-only)\n" f"- **Device:** `{device}`\n" f"- **Stego extracted:** ✅ (bits/channel={int(ds_info['bits_per_channel'])}, bytes={int(ds_info['payload_len'])})\n" f"- **Auto block_size:** **{block_size}** (requested {user_block})\n" f"- **Auto batch_size:** **{batch_size}** (requested {user_bs})\n" f"- **Steps:** **{int(train_steps)}** (logged every {int(log_every)})\n" f"- **Final logged loss:** **{losses[-1]:.4f}**\n" f"- **Final logged perplexity:** **{ppls[-1]:.2f}**\n" f"\n### What investors should notice\n" f"Perplexity falls while training from **a single image** containing hidden dataset bytes." ) metrics = {"loss": losses, "ppl": ppls, "stego": ds_info} return report, train_plot, compare_plot, gif_path, json.dumps(metrics, indent=2) except Exception as e: return f"Training error: {e}\n\n{traceback.format_exc()}", None, None, None, None # ----------------------------- # Gradio UI # ----------------------------- INTRO = """ # Data Playing Card Trainer (Investor Demo) **Pipeline:** 1) Compress dataset → **constellation/radial codes** 2) Pack into **TCAR payload** 3) Upload a **real “playing card” carrier image** 4) Apply **holo shimmer** + **full-bleed trading card stats** (visible) + embed payload into **PNG pixels** (invisible) 5) Train a tiny model using **only the final PNG pixels** """ with gr.Blocks(title="Data Playing Card Trainer (CHR + Stego)") as demo: gr.Markdown(INTRO) with gr.Tab("1) Ingest"): with gr.Row(): file_in = gr.File(label="Upload .txt or .docx", file_types=[".txt", ".docx"]) units_mode = gr.Radio(["paragraphs", "sentences"], value="sentences", label="Unit granularity") with gr.Row(): ingest_btn = gr.Button("Load file", variant="primary") demo_btn = gr.Button("Load built-in demo corpus", variant="secondary") ingest_status = gr.Markdown("") ingest_btn.click(ingest_file, inputs=[file_in, units_mode], outputs=[ingest_status]) demo_btn.click(load_demo, inputs=[units_mode], outputs=[ingest_status]) with gr.Tab("2) Compress → Payload"): with gr.Row(): K = gr.Slider(2, 48, value=16, step=1, label="K (constellations)") iters = gr.Slider(5, 120, value=35, step=1, label="CHR iterations") beta = gr.Slider(2, 30, value=16, step=1, label="beta (assignment sharpness)") with gr.Row(): slab_bins = gr.Slider(3, 16, value=8, step=1, label="slab bins (entropy measure)") tau = gr.Slider(1, 20, value=5, step=1, label="tau (slab softness)") radial_bins = gr.Slider(8, 256, value=64, step=8, label="radial bins (compression alphabet)") seed = gr.Slider(0, 9999, value=42, step=1, label="seed") title_text = gr.Textbox(value="DATA PLAYING CARD", label="Card title") compress_btn = gr.Button("Build Payload + Visuals", variant="primary") compress_report = gr.Markdown("") with gr.Row(): ent_img = gr.Image(label="Entropy during compression", type="filepath") map_img = gr.Image(label="Constellation map (PCA)", type="filepath") with gr.Row(): codes_bin = gr.File(label="codes.bin (audit only)") codec_json = gr.File(label="codec.json (audit only)") payload_header = gr.Code(label="TCAR header (inside payload)", language="json") compress_btn.click( compress_build_payload, inputs=[K, iters, beta, slab_bins, tau, seed, radial_bins, title_text], outputs=[compress_report, ent_img, map_img, codes_bin, codec_json, payload_header] ) with gr.Tab("3) Upload Carrier → Embed"): gr.Markdown( "Upload a **carrier image** (your sci-fi playing card art). " "We’ll apply a visible holo shimmer + **full-bleed card stats UI**, then **embed the dataset payload invisibly into PNG pixels**." ) with gr.Row(): carrier_img = gr.File(label="Carrier image (PNG/JPG)", file_types=[".png", ".jpg", ".jpeg"]) holo_alpha = gr.Slider(0.0, 0.8, value=0.35, step=0.01, label="Holo overlay strength") with gr.Row(): requested_bits = gr.Slider(0, 4, value=0, step=1, label="bits/channel (0 = AUTO)") title_text2 = gr.Textbox(value="DATA PLAYING CARD", label="Card title (full-bleed)") embed_btn = gr.Button("Embed Payload into Carrier (creates final PNG)", variant="primary") embed_report = gr.Markdown("") with gr.Row(): preview_img = gr.Image(label="Preview (visible holo + full-bleed stats)", type="filepath") final_img = gr.Image(label="Final Trading Card PNG (contains hidden data)", type="filepath") with gr.Row(): tilt_gif = gr.Image(label="Holo tilt (GIF)", type="filepath") lsb_img = gr.Image(label="LSB preview (shows hidden structure)", type="filepath") stego_info = gr.Code(label="Embed/Verify info", language="json") embed_btn.click( embed_into_carrier, inputs=[carrier_img, holo_alpha, requested_bits, title_text2], outputs=[embed_report, preview_img, final_img, tilt_gif, lsb_img, stego_info] ) with gr.Tab("4) Train from Final Card"): gr.Markdown( "Training extracts the embedded payload **from pixels only** (auto-detects bits/channel), " "then trains a tiny byte model. Defaults are tuned to feel fast to an investor." ) with gr.Row(): train_steps = gr.Slider(50, 1200, value=200, step=50, label="training steps (fast demo default)") batch_size = gr.Slider(4, 128, value=32, step=4, label="batch size") block_size = gr.Slider(48, 256, value=96, step=16, label="sequence length (bytes)") with gr.Row(): lr = gr.Number(value=7e-4, label="learning rate") log_every = gr.Slider(10, 200, value=25, step=5, label="log every (steps)") temperature = gr.Slider(0.5, 2.0, value=1.0, step=0.05, label="rollout temperature") rollout_steps = gr.Slider(40, 400, value=120, step=20, label="rollout steps (bytes)") with gr.Row(): make_gif = gr.Checkbox(value=False, label="Generate rollout GIF (adds time)") gif_stride = gr.Slider(1, 12, value=5, step=1, label="GIF stride (higher = faster)") gif_fps = gr.Slider(6, 24, value=12, step=1, label="GIF FPS") gif_max_frames = gr.Slider(12, 120, value=40, step=4, label="GIF max frames (cap)") train_btn = gr.Button("Train from PNG pixels + generate visuals", variant="primary") train_report = gr.Markdown("") with gr.Row(): train_img = gr.Image(label="Loss + perplexity", type="filepath") compare_img = gr.Image(label="BEFORE vs AFTER rollout", type="filepath") gif_out = gr.Image(label="Rollout GIF (optional)", type="filepath") metrics_json = gr.Code(label="Metrics (JSON)", language="json") train_btn.click( train_from_final_card, inputs=[train_steps, batch_size, block_size, lr, log_every, temperature, rollout_steps, make_gif, gif_stride, gif_fps, gif_max_frames], outputs=[train_report, train_img, compare_img, gif_out, metrics_json] ) if __name__ == "__main__": demo.launch(ssr_mode=False)