import os
import re
import json
import time
import torch
import torch.nn as nn
import gradio as gr
from pathlib import Path
from collections import deque
# ─────────────────────────────────────────
# 🔐 Admin password
# ─────────────────────────────────────────
# NOTE(review): hard-coded fallback password — set the ADMIN_PASSWORD secret in
# the Space settings for any non-throwaway deployment.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "linny-admin")
# ─────────────────────────────────────────
# 📁 Paths
# ─────────────────────────────────────────
SPACE_ROOT = Path(__file__).parent
CONFIG_PATH = SPACE_ROOT / "config.json"
# First *.pt in the Space root is the model checkpoint (sorted → deterministic pick).
_pt_files = sorted(SPACE_ROOT.glob("*.pt"))
MODEL_PATH = _pt_files[0] if _pt_files else SPACE_ROOT / "default_model.pt"
# Any *.json other than config.json is assumed to be a tokenizer file
# (used only by token-type models).
_tok_files = [f for f in SPACE_ROOT.glob("*.json") if f.name != "config.json"]
TOKENIZER_PATH = _tok_files[0] if _tok_files else None
# ─────────────────────────────────────────
# 🗃️ Config
# ─────────────────────────────────────────
# Fallback values merged into config.json at load time (see load_config).
DEFAULT_CONFIG = {
    # --- architecture ---
    "model_type": "char",      # "char" (checkpoint char list) or "token" (HF tokenizers file)
    "hidden_layers": 5,
    "neurons": 768,            # LSTM hidden size
    "embed_size": 384,
    "dropout": 0.2,
    "vocab_size": 20000,       # NOTE(review): not read anywhere in this file — possibly used by training code
    "working_memory": 2048,    # context cap for generation length budgeting; 0 disables
    # --- prompt template ---
    "user_tag": "### Instruction:",
    "bot_tag": "### Response:",
    "eos_token": "<|end|>",
    "system_prompt": "You are a helpful and intelligent AI assistant named Linny.",  # NOTE(review): unused in visible generation path
    # --- sampling defaults (seed the UI sliders) ---
    "default_temp": 0.8,
    "default_penalty": 1.05,
    "default_penalty_window": 110,
    "default_top_p": 0.4,
    "default_top_k": 65,
    "default_max_len": 2600,
    # --- reasoning ("think") behaviour ---
    "reasoning_mode": "response_prefix",  # or "prompt_suffix" (appends " /think")
    "reasoning_start": False,             # prime the response with a canned reasoning opener
    "min_response_tokens": 3,
    "max_reasoning_tokens": 2500,
}
def load_config() -> dict:
    """Load config.json, filling any missing keys from DEFAULT_CONFIG.

    Returns the parsed dict (with defaults merged in), or a copy of
    DEFAULT_CONFIG when no config.json exists. DEFAULT_CONFIG itself is
    never mutated.
    """
    if CONFIG_PATH.exists():
        # Explicit encoding: JSON is UTF-8 by spec; the platform default
        # (e.g. cp1252 on Windows) may not be.
        with open(CONFIG_PATH, encoding="utf-8") as f:
            data = json.load(f)
        for k, v in DEFAULT_CONFIG.items():
            data.setdefault(k, v)
        return data
    return DEFAULT_CONFIG.copy()
def save_config(cfg: dict):
    """Write *cfg* to config.json, pretty-printed, explicitly as UTF-8."""
    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        json.dump(cfg, f, indent=2)
# ─────────────────────────────────────────
# 🧠 Model Architectures (unchanged)
# ─────────────────────────────────────────
class LSTMCharLM(nn.Module):
    """Character-level LSTM language model: embedding → stacked LSTM → vocab logits."""

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        # PyTorch only applies inter-layer dropout on stacked LSTMs.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Return (per-position vocab logits, final LSTM state)."""
        embedded = self.embed(x)
        features, hidden = self.lstm(embedded, hidden)
        return self.fc(features), hidden
class LSTMTokenLM(nn.Module):
    """Subword/token-level LSTM language model.

    Structurally identical to LSTMCharLM; kept as a separate class so
    checkpoints and call sites can distinguish the two model types.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            # Dropout between layers only makes sense for a stacked LSTM.
            dropout=dropout if num_layers > 1 else 0,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Embed *x*, run the LSTM, and project to vocabulary logits."""
        out, hidden = self.lstm(self.embed(x), hidden)
        logits = self.fc(out)
        return logits, hidden
# ─────────────────────────────────────────
# 🔤 GPT-2 byte decoder
# ─────────────────────────────────────────
def _build_byte_decoder():
bs = (list(range(ord('!'), ord('~')+1)) +
list(range(ord('¡'), ord('¬')+1)) +
list(range(ord('®'), ord('ÿ')+1)))
cs = bs[:]
n = 0
for b in range(256):
if b not in bs:
bs.append(b)
cs.append(256+n)
n += 1
return {chr(c): b for b, c in zip(bs, cs)}
_BYTE_DECODER = _build_byte_decoder()
def _tok_to_bytes(tok_str):
try:
return bytes([_BYTE_DECODER[c] for c in tok_str])
except KeyError:
return tok_str.encode('utf-8', errors='replace')
# ─────────────────────────────────────────
# ⚙️ Model Loader (identical generation to local)
# ─────────────────────────────────────────
class LinnyModel:
    """Loads an LSTM checkpoint (char- or token-level) and streams generations.

    NOTE(review): several string literals below are empty (""). They are almost
    certainly "<think>"/"</think>" tags that were stripped from the source by an
    HTML sanitizer — as written, the reasoning-tag branches cannot work
    ("" in s is always True; iterating "" yields nothing). Restore the literals
    from version control before trusting the reasoning logic.
    """
    def __init__(self, pt_path, config: dict, tokenizer_path=None):
        """Build the network from config/checkpoint metadata and load weights.

        pt_path        : path to a torch checkpoint (.pt)
        config         : config.json-style dict (see DEFAULT_CONFIG)
        tokenizer_path : optional HF-tokenizers json path (token models only)
        Raises FileNotFoundError when a token model has no tokenizer file.
        """
        self.config = config
        self.model_type = config.get("model_type", "char")
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # NOTE(review): weights_only=False unpickles arbitrary objects — only
        # load checkpoints from trusted sources.
        ckpt = torch.load(pt_path, map_location=self.device, weights_only=False)
        if self.model_type == "token":
            # Imported lazily so char-only deployments don't need `tokenizers`.
            from tokenizers import Tokenizer as HFTokenizer
            tok_path = tokenizer_path or config.get("tokenizer_path")
            if not tok_path or not Path(str(tok_path)).exists():
                raise FileNotFoundError(f"Tokenizer not found: {tok_path}")
            self.tokenizer = HFTokenizer.from_file(str(tok_path))
            vocab_size = self.tokenizer.get_vocab_size()
            self.chars = None; self.stoi = None; self.itos = None
            # Architecture hyper-params stored in the checkpoint win over config.json.
            arch = ckpt.get('config', {})
            layers = arch.get('hidden_layers', config['hidden_layers'])
            neurons = arch.get('neurons', config['neurons'])
            embed = arch.get('embed_size', config['embed_size'])
            dropout = arch.get('dropout', config.get('dropout', 0.2))
            self.model = LSTMTokenLM(vocab_size, embed, neurons, layers, dropout).to(self.device)
            self.model.load_state_dict(ckpt['model_state'])
        else:
            # Char model: vocabulary is the character list stored in the checkpoint.
            self.tokenizer = None
            self.chars = ckpt["chars"]
            self.stoi = {ch: i for i, ch in enumerate(self.chars)}
            self.itos = {i: ch for i, ch in enumerate(self.chars)}
            dropout = ckpt.get("config", {}).get("dropout", config.get("dropout", 0.2))
            # NOTE(review): unlike the token branch, architecture comes from
            # config.json here, not the checkpoint — a mismatch will fail in
            # load_state_dict.
            self.model = LSTMCharLM(len(self.chars), config["embed_size"],
                                    config["neurons"], config["hidden_layers"], dropout).to(self.device)
            self.model.load_state_dict(ckpt["model_state"])
        self.model.eval()
        self.epoch = ckpt.get('epoch', '?')
    # ------------------------------------------------------------------
    # Exact copy of local server's generate_stream logic
    # ------------------------------------------------------------------
    def stream_generate(self, prompt, temperature=0.8, max_len=2600,
                        penalty=1.05, penalty_window=110,
                        top_p=0.4, top_k=65, force_thinking=False,
                        prefix_text="", penalize_prefix=False,
                        min_response_tokens=3, max_reasoning_tokens=2500):
        """
        Matches local linny_server.py generate_stream exactly.

        Generator: yields decoded text chunks as they are sampled.
        prompt          : raw user message (wrapped with user/bot tags below)
        prefix_text     : existing assistant text for "continue" mode
        penalize_prefix : seed the repetition-penalty window with the prefix
        force_thinking  : pre-fill/append reasoning markers per reasoning_mode
        """
        cfg = self.config
        user_tag = cfg.get("user_tag", "### Instruction:")
        bot_tag = cfg.get("bot_tag", "### Response:")
        r_mode = cfg.get("reasoning_mode", "response_prefix")
        eos_token_str = cfg.get("eos_token", "<|end|>")
        # Apply prompt_suffix mode
        actual_prompt = prompt
        if force_thinking and r_mode == "prompt_suffix":
            if not prompt.strip().endswith("/think"):
                actual_prompt = prompt.strip() + " /think"
        formatted = f"{user_tag}\n{actual_prompt}\n\n{bot_tag}\n"
        # Working memory cap: shrink the generation budget by the prompt length
        # (floor of 50 so generation never degenerates to nothing).
        working_memory = cfg.get("working_memory", 0)
        if working_memory > 0:
            max_len = max(50, min(max_len, working_memory - len(formatted)))
        hidden = None
        generated = ""
        # Sliding window of recent token ids (token mode) / chars (char mode)
        # used for repetition penalty.
        recent_tokens = deque(maxlen=penalty_window)
        # State tracking (exactly as local)
        in_reasoning = False
        think_closed = False
        awaiting_response = False
        response_token_count = 0
        reasoning_toks = 0
        with torch.no_grad():
            # Encode the conversation prefix
            if self.model_type == "token":
                ids = self.tokenizer.encode(formatted).ids
            else:
                # Unknown chars map to id 0 — assumes id 0 is a benign filler.
                ids = [self.stoi.get(c, 0) for c in formatted]
            t = torch.tensor([ids], dtype=torch.long, device=self.device)
            _, hidden = self.model(t, hidden)
            input_token = torch.tensor([[ids[-1]]], dtype=torch.long, device=self.device)
            # If we have existing assistant response (continue mode)
            if prefix_text:
                if self.model_type == "token":
                    prefix_ids = self.tokenizer.encode(prefix_text).ids
                else:
                    prefix_ids = [self.stoi.get(c, 0) for c in prefix_text]
                if prefix_ids:
                    pt = torch.tensor([prefix_ids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(pt, hidden)
                    generated = prefix_text
                    input_token = torch.tensor([[prefix_ids[-1]]], dtype=torch.long, device=self.device)
                    if penalize_prefix:
                        recent_tokens.extend(prefix_ids)
                # Update state based on prefix
                # NOTE(review): "" literals below look like stripped
                # "<think>"/"</think>" — branch logic is broken as written.
                if "" in prefix_text and "" not in prefix_text:
                    in_reasoning = True
                elif "" in prefix_text:
                    think_closed = True
                    awaiting_response = True
            # Prefill if in response_prefix mode and no prefix
            if not prefix_text and force_thinking and r_mode == "response_prefix":
                if self.model_type == "token":
                    # NOTE(review): token_to_id("") — stripped tag literal.
                    think_id = self.tokenizer.token_to_id("")
                    if think_id is not None:
                        tt = torch.tensor([[think_id]], dtype=torch.long, device=self.device)
                        _, hidden = self.model(tt, hidden)
                        input_token = tt
                        generated = ""
                        yield ""
                        in_reasoning = True
                else:
                    # Char mode: prime hidden with ""
                    # NOTE(review): with the literal empty, think_ids is [] and
                    # think_ids[-1] raises IndexError — confirm original tag.
                    think_ids = [self.stoi.get(ch, 0) for ch in ""]
                    tt = torch.tensor([think_ids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(tt, hidden)
                    input_token = torch.tensor([[think_ids[-1]]], dtype=torch.long, device=self.device)
                    generated = ""
                    for ch in "":
                        yield ch
                    in_reasoning = True
            # Optional canned reasoning opener, fed through the model so the
            # hidden state conditions on it.
            if cfg.get("reasoning_start", False) and not prefix_text:
                prefix = f"I need to think about this. The user said '{prompt}'"
                if self.model_type == "token":
                    pids = self.tokenizer.encode(prefix).ids
                    pt = torch.tensor([pids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(pt, hidden)
                    input_token = torch.tensor([[pids[-1]]], dtype=torch.long, device=self.device)
                    generated += prefix
                    yield prefix
                else:
                    for ch in prefix:
                        idx = self.stoi.get(ch, 0)
                        it = torch.tensor([[idx]], dtype=torch.long, device=self.device)
                        _, hidden = self.model(it, hidden)
                        input_token = it
                        generated += ch
                        yield ch
            # Get special token IDs
            eos_id = None
            think_open_id = None
            think_close_id = None
            if self.model_type == "token":
                eos_id = self.tokenizer.token_to_id(eos_token_str)
                # NOTE(review): both lookups use "" — stripped tag literals.
                think_open_id = self.tokenizer.token_to_id("")
                think_close_id = self.tokenizer.token_to_id("")
            # ------------------------------------------------------------------
            # Token generation (exactly as local)
            # ------------------------------------------------------------------
            if self.model_type == "token":
                # Bytes accumulated until they form valid UTF-8 (multi-byte
                # characters can span token boundaries).
                byte_buf = b""
                for step in range(max_len):
                    logits, hidden = self.model(input_token, hidden)
                    lf = logits[0, -1].float() / max(temperature, 1e-8)
                    # Repetition penalty
                    if penalty != 1.0 and len(recent_tokens) > 0:
                        penalized_ids = set(recent_tokens)
                        for token_id in penalized_ids:
                            if token_id < lf.size(0):
                                lf[token_id] /= penalty
                    # Top-K
                    if top_k > 0:
                        tv, _ = torch.topk(lf, min(top_k, lf.size(-1)))
                        lf[lf < tv[-1]] = float("-inf")
                    # Top-P (nucleus): mask the tail beyond cumulative prob top_p
                    if top_p < 1.0:
                        sl, si = torch.sort(lf, descending=True)
                        cp = torch.cumsum(torch.softmax(sl, dim=-1), dim=-1)
                        rm = cp > top_p
                        # Shift right so the first token above the threshold is kept.
                        rm[..., 1:] = rm[..., :-1].clone()
                        rm[..., 0] = False
                        lf[si[rm]] = float("-inf")
                    nxt = torch.multinomial(torch.softmax(lf, dim=-1), 1).item()
                    # EOS handling with forced minimum response
                    # (resamples the same distribution when the response is too short)
                    if nxt == eos_id:
                        if awaiting_response and response_token_count < min_response_tokens:
                            continue
                        else:
                            break
                    recent_tokens.append(nxt)
                    # Update reasoning state
                    if think_open_id is not None and nxt == think_open_id:
                        in_reasoning = True
                        reasoning_toks = 0
                    if think_close_id is not None and nxt == think_close_id:
                        in_reasoning = False
                        think_closed = True
                        awaiting_response = True
                        response_token_count = 0
                    if in_reasoning and not think_closed:
                        reasoning_toks += 1
                        if max_reasoning_tokens and reasoning_toks >= max_reasoning_tokens:
                            # Force close think
                            if byte_buf:
                                decoded = byte_buf.decode('utf-8', errors='replace')
                                generated += decoded
                                yield decoded
                                byte_buf = b""
                            # NOTE(review): yields "" — stripped "</think>" literal?
                            yield ""
                            in_reasoning = False
                            think_closed = True
                            awaiting_response = True
                            response_token_count = 0
                            ct = torch.tensor([[think_close_id]], dtype=torch.long, device=self.device)
                            _, hidden = self.model(ct, hidden)
                            input_token = ct
                            recent_tokens.append(think_close_id)
                            continue
                    if awaiting_response and nxt != think_close_id:
                        response_token_count += 1
                    elif not in_reasoning and not think_closed:
                        response_token_count += 1
                    # Output token: buffer bytes and flush on each valid UTF-8 boundary
                    tok_str = self.tokenizer.id_to_token(nxt) or ""
                    byte_buf += _tok_to_bytes(tok_str)
                    try:
                        decoded = byte_buf.decode('utf-8')
                        generated += decoded
                        yield decoded
                        byte_buf = b""
                    except UnicodeDecodeError:
                        pass
                    input_token = torch.tensor([[nxt]], dtype=torch.long, device=self.device)
                # Flush any trailing partial bytes (replacement chars if invalid).
                if byte_buf:
                    leftover = byte_buf.decode('utf-8', errors='replace')
                    generated += leftover
                    yield leftover
            # ------------------------------------------------------------------
            # Character generation (exactly as local)
            # ------------------------------------------------------------------
            else:
                for step in range(max_len):
                    logits, hidden = self.model(input_token, hidden)
                    lf = logits[0, -1].float() / max(temperature, 1e-8)
                    # Repetition penalty on characters
                    if penalty != 1.0 and len(recent_tokens) > 0:
                        # recent_tokens stores characters (strings)
                        penalized_chars = set(recent_tokens)
                        for ch in penalized_chars:
                            idx = self.stoi.get(ch, 0)
                            if idx < lf.size(0):
                                lf[idx] /= penalty
                    if top_k > 0:
                        tv, _ = torch.topk(lf, min(top_k, lf.size(-1)))
                        lf[lf < tv[-1]] = float("-inf")
                    if top_p < 1.0:
                        sl, si = torch.sort(lf, descending=True)
                        cp = torch.cumsum(torch.softmax(sl, dim=-1), dim=-1)
                        rm = cp > top_p
                        rm[..., 1:] = rm[..., :-1].clone()
                        rm[..., 0] = False
                        lf[si[rm]] = float("-inf")
                    idx = torch.multinomial(torch.softmax(lf, dim=-1), 1).item()
                    char = self.itos[idx]
                    recent_tokens.append(char)
                    input_token = torch.tensor([[idx]], dtype=torch.long, device=self.device)
                    # Char models stop on a "###" run (next instruction header).
                    if char == "#" and generated.endswith("##"):
                        break
                    generated += char
                    yield char
# ─────────────────────────────────────────
# 🧰 Helpers (unchanged from original HF)
# ─────────────────────────────────────────
def parse_think_tags(text: str):
    """Split *text* into (thinking, visible, think_complete).

    NOTE(review): the tag literals were blank ("") in the retrieved source —
    splitting on "" raises ValueError, so they were almost certainly
    "<think>"/"</think>" stripped by an HTML sanitizer. Reconstructed here;
    confirm against the original file.

    Returns:
        thinking       : reasoning text (None if no think tag present)
        visible        : user-facing text outside the think block
        think_complete : True only when the closing tag was seen
    """
    if "<think>" not in text:
        return None, text, False
    before, rest = text.split("<think>", 1)
    if "</think>" in rest:
        inner, after = rest.split("</think>", 1)
        return inner.strip(), (before + after).strip(), True
    # Reasoning still open: everything after <think> is in-progress thinking.
    return rest.strip(), before.strip(), False
def extract_current_topic(thinking_text: str) -> str:
    """Summarize in-progress reasoning by its most recent **bold** heading.

    Falls back to the generic "Reasoning..." label when there is no text or
    no bold section.
    """
    if not thinking_text:
        return "Reasoning..."
    headings = re.findall(r'\*\*([^*]+)\*\*', thinking_text)
    if not headings:
        return "Reasoning..."
    return f"Reasoning: {headings[-1].strip()}"
def format_message(visible: str, thinking: str | None,
think_complete: bool, think_elapsed: float | None,
current_topic: str = "Reasoning...") -> str:
if not thinking:
return visible
if think_complete and think_elapsed is not None:
summary = f"💭 Thought for {think_elapsed:.1f}s"
open_attr = ""
else:
summary = current_topic
open_attr = ""
think_block = (f""
f"{summary}
"
f"{thinking}
"
f" ")
if visible.strip():
return think_block + "\n\n" + visible
return think_block
def extract_html_canvas(text: str):
    """Pull the first ```html fenced code block out of *text*.

    Returns (html_code, remaining_text); html_code is None when no fenced
    block is present. Matching is case-insensitive.
    """
    fence = re.search(r"```html\s*\n([\s\S]*?)```", text, re.IGNORECASE)
    if fence is None:
        return None, text
    code = fence.group(1).strip()
    remainder = (text[:fence.start()] + text[fence.end():]).strip()
    return code, remainder
def make_canvas_html(code: str) -> str:
    """Wrap generated HTML in a labeled sandbox iframe via srcdoc.

    NOTE(review): markup reconstructed — the original literals were stripped
    by an HTML sanitizer (f-strings were left broken). Class names follow the
    .canvas-* rules in CSS; confirm against the original file.
    """
    # Escape double quotes so the document can sit inside srcdoc="...".
    escaped = code.replace('"', "&quot;")
    return (f"<div class='canvas-wrapper'>"
            f"<div class='canvas-label'>🖼️ HTML Canvas</div>"
            f'<iframe class="canvas-frame" srcdoc="{escaped}"></iframe>'
            f"</div>")
# ─────────────────────────────────────────
# 🚀 Auto-load
# ─────────────────────────────────────────
# Module-level auto-load: try to build the model once at import time so the UI
# starts with a ready model; any failure is captured into the status string
# rather than crashing the Space.
_startup_model = None
_startup_cfg = None
_startup_msg = "⚠️ No model found. Place your `.pt` in the Space root."
if MODEL_PATH.exists():
    try:
        _startup_cfg = load_config()
        _startup_model = LinnyModel(MODEL_PATH, _startup_cfg, tokenizer_path=TOKENIZER_PATH)
        wm = _startup_cfg.get("working_memory", 0)
        mtype = _startup_cfg.get("model_type", "char")
        epoch = _startup_model.epoch
        _startup_msg = (f"✅ Model auto-loaded! ({_startup_cfg['hidden_layers']}L × {_startup_cfg['neurons']}N, "
                        f"epoch {epoch}, {mtype}" + (f", {wm} ctx)" if wm else ")"))
        print(_startup_msg)
    except Exception as e:
        # Broad catch is deliberate: the app must still come up without a model.
        _startup_msg = f"❌ Auto-load failed: {e}"
        print(_startup_msg)
# ─────────────────────────────────────────
# 🎨 CSS (unchanged)
# ─────────────────────────────────────────
# Inline stylesheet injected into gr.Blocks. The .think-* and .canvas-* class
# names are referenced by the markup built in format_message()/make_canvas_html().
CSS = """
html, body { height: 100%; margin: 0; }
.gradio-container { max-width: 100% !important; padding: 0 !important; height: 100vh; display: flex; flex-direction: column; }
footer { display: none !important; }
.app-header { text-align: center; padding: 18px 0 10px; background: linear-gradient(135deg, #1a1a2e 0%, #16213e 100%); border-bottom: 1px solid #2d2d4e; }
.app-header h1 { font-size: 2em; margin: 0; background: linear-gradient(90deg, #a78bfa, #818cf8); -webkit-background-clip: text; -webkit-text-fill-color: transparent; }
.app-header p { color: #666; margin: 4px 0 0; font-size: 0.85em; }
#chatbox { flex: 1; min-height: 0; }
.gradio-chatbot { height: calc(100vh - 260px) !important; border-radius: 12px !important; background: #0f0f1a !important; border: 1px solid #2d2d4e !important; }
.input-row { padding: 10px 16px; background: #0f0f1a; border-top: 1px solid #2d2d4e; }
.think-details { margin: 0 0 10px 0; border-left: 3px solid #7c3aed; border-radius: 0 8px 8px 0; background: #13111c; overflow: hidden; }
.think-summary { cursor: pointer; padding: 8px 12px; color: #a78bfa; font-size: 0.82em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.8px; list-style: none; user-select: none; }
.think-summary::-webkit-details-marker { display: none; }
.think-summary::before { content: "▶ "; font-size: 0.7em; }
details[open] .think-summary::before { content: "▼ "; }
.think-content { padding: 10px 14px; color: #c4b5fd; font-style: italic; font-size: 0.87em; line-height: 1.6; border-top: 1px solid #2d1f5e; white-space: pre-wrap; max-height: 320px; overflow-y: auto; }
.canvas-wrapper { margin: 12px 0; border-radius: 10px; overflow: hidden; border: 1px solid #3d2d6e; background: #0d0d1a; }
.canvas-label { padding: 7px 14px; background: #1e1040; color: #a78bfa; font-size: 0.78em; font-weight: 600; letter-spacing: 0.5px; }
.canvas-frame { width: 100%; min-height: 300px; border: none; display: block; background: white; }
.status-bar { font-size: 0.8em; padding: 4px 0; color: #666; }
.reasoning-row { padding: 4px 0 0 2px; }
.tab-nav { background: #0f0f1a !important; border-bottom: 1px solid #2d2d4e !important; }
"""
# ─────────────────────────────────────────
# 🖥️ UI (identical to original HF but with penalty window)
# ─────────────────────────────────────────
def build_ui():
    """Construct the Gradio Blocks app (Chat + Admin tabs) and return it.

    NOTE(review): several runtime strings in this function appear to have had
    HTML tags stripped by a sanitizer (the gr.HTML header literal, the
    "pre-fills token" label, and the count("") checks in respond) — restore
    from version control.
    """
    cfg = load_config()
    with gr.Blocks(title="Linny AI", css=CSS) as demo:
        # Per-session model/config state so user-uploaded models don't leak
        # across visitors.
        session_model = gr.State(_startup_model)
        session_cfg = gr.State(_startup_cfg)
        # NOTE(review): header markup inside this literal was stripped
        # (see .app-header CSS rules).
        gr.HTML("""
        """)
        with gr.Tabs():
            with gr.TabItem("💬 Chat"):
                model_status = gr.Markdown(value=_startup_msg, elem_classes=["status-bar"])
                chatbot = gr.Chatbot(elem_id="chatbox", label="", render_markdown=True)
                canvas_display = gr.HTML(visible=False)
                with gr.Row(elem_classes=["input-row"]):
                    msg_box = gr.Textbox(placeholder="Message Linny…", show_label=False, scale=8, lines=1, autofocus=True)
                    send_btn = gr.Button("Send ↩", variant="primary", scale=1)
                with gr.Row(elem_classes=["reasoning-row"]):
                    reasoning_toggle = gr.Checkbox(value=False, label="🧠 Force Reasoning (pre-fills token)", scale=1)
                # Sliders seeded from config defaults.
                with gr.Accordion("⚙️ Generation Settings", open=False):
                    with gr.Row():
                        temp_sl = gr.Slider(0.1, 1.5, value=cfg.get("default_temp", 0.8), step=0.05, label="Temperature")
                        penalty_sl = gr.Slider(1.0, 2.0, value=cfg.get("default_penalty", 1.05), step=0.05, label="Repetition Penalty")
                    with gr.Row():
                        penalty_window_sl = gr.Slider(10, 500, value=cfg.get("default_penalty_window", 110), step=10, label="Penalty Window")
                    with gr.Row():
                        topp_sl = gr.Slider(0.0, 1.0, value=cfg.get("default_top_p", 0.4), step=0.05, label="Top-P")
                        topk_sl = gr.Slider(0, 100, value=cfg.get("default_top_k", 65), step=1, label="Top-K (0 = off)")
                        max_len_sl = gr.Slider(100, 8000, value=cfg.get("default_max_len", 2600), step=50, label="Max Response Length")
                # Bring-your-own-model upload; loads into session state only.
                with gr.Accordion("📤 Upload Your Own Model (optional)", open=False):
                    gr.Markdown("Upload a `.pt` file and optionally a tokenizer `.json` for token-based models.")
                    with gr.Row():
                        user_pt = gr.File(label="Upload .pt file", file_types=[".pt"])
                        user_tok = gr.File(label="Upload tokenizer .json (token model only)", file_types=[".json"])
                    model_type_radio = gr.Radio(choices=["char", "token"], value="char", label="Model Type")
                    with gr.Row():
                        u_layers = gr.Number(value=5, label="Hidden Layers", precision=0)
                        u_neurons = gr.Number(value=768, label="Neurons", precision=0)
                        u_embed = gr.Number(value=384, label="Embed Size", precision=0)
                        u_dropout = gr.Number(value=0.2, label="Dropout", precision=2)
                        u_working_mem = gr.Number(value=2048, label="Working Memory (0=off)", precision=0)
                    with gr.Row():
                        u_user_tag = gr.Textbox(value="### Instruction:", label="User Tag")
                        u_bot_tag = gr.Textbox(value="### Response:", label="Bot Tag")
                        u_eos = gr.Textbox(value="<|end|>", label="EOS Token")
                    load_user_btn = gr.Button("🚀 Load My Model", variant="secondary")
                    user_load_status = gr.Markdown("")
            with gr.TabItem("🔒 Admin"):
                gr.Markdown("## Admin Panel\nEdit `config.json` settings.")
                with gr.Row():
                    admin_pw = gr.Textbox(label="Admin Password", type="password", placeholder="Enter password", scale=3)
                    admin_login_btn = gr.Button("Login", scale=1)
                admin_status = gr.Markdown("")
                # Hidden until do_admin_login succeeds.
                with gr.Group(visible=False) as admin_panel:
                    gr.Markdown("### Model Architecture")
                    model_type_admin = gr.Radio(choices=["char", "token"], value=cfg.get("model_type", "char"), label="Model Type")
                    with gr.Row():
                        a_layers = gr.Number(value=cfg.get("hidden_layers", 5), label="Hidden Layers", precision=0)
                        a_neurons = gr.Number(value=cfg.get("neurons", 768), label="Neurons", precision=0)
                        a_embed = gr.Number(value=cfg.get("embed_size", 384), label="Embed Size", precision=0)
                        a_dropout = gr.Number(value=cfg.get("dropout", 0.2), label="Dropout", precision=2)
                        a_working_mem = gr.Number(value=cfg.get("working_memory", 2048), label="Working Memory (0=off)", precision=0)
                    gr.Markdown("### Tags & Tokens")
                    with gr.Row():
                        a_user_tag = gr.Textbox(value=cfg.get("user_tag", "### Instruction:"), label="User Tag")
                        a_bot_tag = gr.Textbox(value=cfg.get("bot_tag", "### Response:"), label="Bot Tag")
                        a_eos = gr.Textbox(value=cfg.get("eos_token", "<|end|>"), label="EOS Token")
                    a_sys = gr.Textbox(value=cfg.get("system_prompt", "You are a helpful AI named Linny."), label="System Prompt", lines=2)
                    gr.Markdown("### Reasoning")
                    a_reasoning_mode = gr.Radio(choices=["prompt_suffix", "response_prefix"], value=cfg.get("reasoning_mode", "response_prefix"), label="Force Reasoning Mode")
                    a_reasoning_start = gr.Checkbox(value=cfg.get("reasoning_start", False), label="Enable Reasoning Start Prefix")
                    a_min_response_tokens = gr.Number(value=cfg.get("min_response_tokens", 3), label="Minimum Response Tokens After ", precision=0)
                    a_max_reasoning_tokens = gr.Number(value=cfg.get("max_reasoning_tokens", 2500), label="Max Reasoning Tokens", precision=0)
                    gr.Markdown("### Default Generation Settings")
                    with gr.Row():
                        a_temp = gr.Slider(0.1, 1.5, value=cfg.get("default_temp", 0.8), step=0.05, label="Default Temperature")
                        a_penalty = gr.Slider(1.0, 2.0, value=cfg.get("default_penalty", 1.05), step=0.05, label="Default Penalty")
                    with gr.Row():
                        a_penalty_window = gr.Slider(10, 500, value=cfg.get("default_penalty_window", 110), step=10, label="Default Penalty Window")
                    with gr.Row():
                        a_topp = gr.Slider(0.0, 1.0, value=cfg.get("default_top_p", 0.4), step=0.05, label="Default Top-P")
                        a_topk = gr.Slider(0, 100, value=cfg.get("default_top_k", 65), step=1, label="Default Top-K")
                        a_maxlen = gr.Slider(100, 8000, value=cfg.get("default_max_len", 2600), step=50, label="Default Max Length")
                    save_cfg_btn = gr.Button("💾 Save config.json", variant="primary")
                    save_status = gr.Markdown("")
        # Callbacks
        def do_admin_login(pw):
            # Simple password gate; the panel is merely hidden, not access-controlled server-side.
            if pw == ADMIN_PASSWORD:
                return gr.update(visible=True), "✅ Logged in."
            return gr.update(visible=False), "❌ Incorrect password."
        admin_login_btn.click(do_admin_login, inputs=[admin_pw], outputs=[admin_panel, admin_status])
        def do_save_config(mtype, layers, neurons, embed, dropout, working_mem,
                           user_tag, bot_tag, eos, sys_prompt,
                           reasoning_mode, reasoning_start,
                           min_response_tokens, max_reasoning_tokens,
                           temp, penalty, penalty_window, top_p, top_k, max_len):
            # Persist admin-panel values to config.json; takes effect on restart.
            try:
                new_cfg = {
                    "model_type": mtype,
                    "hidden_layers": int(layers),
                    "neurons": int(neurons),
                    "embed_size": int(embed),
                    "dropout": float(dropout),
                    "working_memory": int(working_mem),
                    "user_tag": user_tag,
                    "bot_tag": bot_tag,
                    "eos_token": eos,
                    "system_prompt": sys_prompt,
                    "reasoning_mode": reasoning_mode,
                    "reasoning_start": bool(reasoning_start),
                    "min_response_tokens": int(min_response_tokens),
                    "max_reasoning_tokens": int(max_reasoning_tokens),
                    "default_temp": float(temp),
                    "default_penalty": float(penalty),
                    "default_penalty_window": int(penalty_window),
                    "default_top_p": float(top_p),
                    "default_top_k": int(top_k),
                    "default_max_len": int(max_len),
                }
                save_config(new_cfg)
                return "✅ config.json saved! Restart the Space to apply changes."
            except Exception as e:
                return f"❌ Error: {e}"
        save_cfg_btn.click(do_save_config,
                           inputs=[model_type_admin, a_layers, a_neurons, a_embed, a_dropout, a_working_mem,
                                   a_user_tag, a_bot_tag, a_eos, a_sys,
                                   a_reasoning_mode, a_reasoning_start,
                                   a_min_response_tokens, a_max_reasoning_tokens,
                                   a_temp, a_penalty, a_penalty_window, a_topp, a_topk, a_maxlen],
                           outputs=[save_status])
        def load_user_model(pt_file, tok_file, mtype, layers, neurons, embed, dropout,
                            working_mem, user_tag, bot_tag, eos):
            # Build a LinnyModel from the uploaded files, stored in session state only.
            if pt_file is None:
                return None, None, "❌ Please upload a .pt file first."
            try:
                user_cfg = {
                    "model_type": mtype,
                    "hidden_layers": int(layers),
                    "neurons": int(neurons),
                    "embed_size": int(embed),
                    "dropout": float(dropout),
                    "working_memory": int(working_mem),
                    "user_tag": user_tag,
                    "bot_tag": bot_tag,
                    "eos_token": eos,
                    # Reasoning settings inherited from the Space-level config.
                    "reasoning_mode": cfg.get("reasoning_mode", "response_prefix"),
                    "reasoning_start": cfg.get("reasoning_start", False),
                    "min_response_tokens": cfg.get("min_response_tokens", 3),
                    "max_reasoning_tokens": cfg.get("max_reasoning_tokens", 2500),
                }
                tok_path = tok_file.name if tok_file else None
                m = LinnyModel(pt_file.name, user_cfg, tokenizer_path=tok_path)
                return m, user_cfg, f"✅ Model loaded! ({user_cfg['hidden_layers']}L × {user_cfg['neurons']}N, {mtype}, epoch {m.epoch})"
            except Exception as e:
                return None, None, f"❌ Error: {e}"
        load_user_btn.click(load_user_model,
                            inputs=[user_pt, user_tok, model_type_radio,
                                    u_layers, u_neurons, u_embed, u_dropout, u_working_mem,
                                    u_user_tag, u_bot_tag, u_eos],
                            outputs=[session_model, session_cfg, user_load_status])
        def respond(message, history, model, temp, penalty, penalty_window, top_p, top_k, max_len, force_thinking):
            # Streaming chat handler (generator): yields (history, textbox, canvas)
            # on every chunk so the UI updates live.
            if not message.strip():
                yield history, "", gr.update(visible=False)
                return
            if model is None:
                yield history + [{"role":"user","content":message},{"role":"assistant","content":"⚠️ No model loaded."}], "", gr.update(visible=False)
                return
            history = history + [{"role":"user","content":message},{"role":"assistant","content":""}]
            full_response = ""
            stop = False
            think_start = None
            think_end = None
            try:
                for chunk in model.stream_generate(
                        message,
                        temperature=float(temp),
                        max_len=int(max_len),
                        penalty=float(penalty),
                        penalty_window=int(penalty_window),
                        top_p=float(top_p),
                        top_k=int(top_k),
                        force_thinking=bool(force_thinking),
                        min_response_tokens=model.config.get("min_response_tokens", 3),
                        max_reasoning_tokens=model.config.get("max_reasoning_tokens", 2500),
                ):
                    full_response += chunk
                    # Stop at EOS if present
                    eos_str = model.config.get("eos_token", "<|end|>")
                    if eos_str in full_response:
                        full_response = full_response[:full_response.find(eos_str)]
                        # NOTE(review): count("") literals look like stripped
                        # "<think>"/"</think>" — as written the condition is
                        # always False, so EOS always stops generation.
                        if full_response.count("") > full_response.count(""):
                            full_response += ""
                        else:
                            stop = True
                    thinking, visible, think_complete = parse_think_tags(full_response)
                    # Time the reasoning phase for the "Thought for Xs" summary.
                    if thinking and think_start is None:
                        think_start = time.time()
                    if think_complete and think_end is None and think_start is not None:
                        think_end = time.time()
                    think_elapsed = (think_end - think_start) if (think_end and think_start) else (time.time() - think_start if think_start else None)
                    current_topic = extract_current_topic(thinking) if thinking else "Reasoning..."
                    html_code, visible_clean = extract_html_canvas(visible)
                    canvas_update = gr.update(visible=bool(html_code), value=make_canvas_html(html_code) if html_code else "")
                    history[-1] = {"role":"assistant","content":format_message(visible_clean, thinking, think_complete, think_elapsed, current_topic)}
                    yield history, "", canvas_update
                    if stop:
                        break
            except Exception as e:
                history[-1] = {"role":"assistant","content":f"⚠️ Generation error: {e}"}
                yield history, "", gr.update(visible=False)
                return
            # Final pass
            thinking, visible, think_complete = parse_think_tags(full_response)
            if think_end is None and think_start is not None:
                think_end = time.time()
            think_elapsed = (think_end - think_start) if (think_end and think_start) else None
            if think_complete and not visible.strip():
                visible = "*(no response generated)*"
            current_topic = extract_current_topic(thinking) if thinking else "Reasoning..."
            html_code, visible_clean = extract_html_canvas(visible)
            history[-1] = {"role":"assistant","content":format_message(visible_clean, thinking, True, think_elapsed, current_topic)}
            canvas_update = gr.update(visible=bool(html_code), value=make_canvas_html(html_code) if html_code else "")
            yield history, "", canvas_update
        shared_inputs = [msg_box, chatbot, session_model,
                         temp_sl, penalty_sl, penalty_window_sl, topp_sl, topk_sl, max_len_sl, reasoning_toggle]
        shared_outputs = [chatbot, msg_box, canvas_display]
        send_btn.click(respond, inputs=shared_inputs, outputs=shared_outputs)
        msg_box.submit(respond, inputs=shared_inputs, outputs=shared_outputs)
    return demo
# Script entry point: build and serve the app. ssr_mode=False keeps classic
# client-side rendering (avoids Gradio SSR issues on some hosts).
if __name__ == "__main__":
    build_ui().launch(ssr_mode=False)