# app.py — Linny AI Gradio Space (Hugging Face revision 6d66684)
import os
import re
import json
import time
import torch
import torch.nn as nn
import gradio as gr
from pathlib import Path
from collections import deque
# ─────────────────────────────────────────
# 🔐 Admin password
# ─────────────────────────────────────────
# Password for the Admin tab; override via the ADMIN_PASSWORD env var in the Space settings.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "linny-admin")
# ─────────────────────────────────────────
# 📁 Paths
# ─────────────────────────────────────────
SPACE_ROOT = Path(__file__).parent
CONFIG_PATH = SPACE_ROOT / "config.json"
# The first *.pt (sorted by name) in the Space root is auto-loaded at startup.
_pt_files = sorted(SPACE_ROOT.glob("*.pt"))
MODEL_PATH = _pt_files[0] if _pt_files else SPACE_ROOT / "default_model.pt"
# Any *.json other than config.json is assumed to be a tokenizer file (token models).
_tok_files = [f for f in SPACE_ROOT.glob("*.json") if f.name != "config.json"]
TOKENIZER_PATH = _tok_files[0] if _tok_files else None
# ─────────────────────────────────────────
# 🗃️ Config
# ─────────────────────────────────────────
# Baseline configuration. config.json overrides these values, and load_config()
# back-fills any keys missing from an older config.json with these defaults.
DEFAULT_CONFIG = {
    "model_type": "char",          # "char" (built-in vocab) or "token" (HF tokenizers file)
    "hidden_layers": 5,            # LSTM stack depth (fallback when checkpoint lacks it)
    "neurons": 768,                # LSTM hidden size
    "embed_size": 384,             # embedding dimension
    "dropout": 0.2,                # inter-layer LSTM dropout
    "vocab_size": 20000,           # nominal vocab size (token mode)
    "working_memory": 2048,        # context cap in tokens/chars; 0 disables the cap
    "user_tag": "### Instruction:",
    "bot_tag": "### Response:",
    "eos_token": "<|end|>",
    "system_prompt": "You are a helpful and intelligent AI assistant named Linny.",
    "default_temp": 0.8,           # sampling temperature default for the UI sliders
    "default_penalty": 1.05,       # repetition penalty default
    "default_penalty_window": 110, # how many recent tokens/chars the penalty covers
    "default_top_p": 0.4,
    "default_top_k": 65,
    "default_max_len": 2600,       # max generated tokens/chars per response
    "reasoning_mode": "response_prefix",  # how "force thinking" is applied: prompt suffix vs pre-filled <think>
    "reasoning_start": False,      # optionally prime reasoning with a canned opening sentence
    "min_response_tokens": 3,      # minimum tokens after </think> before EOS is honored
    "max_reasoning_tokens": 2500,  # hard cap on reasoning length before </think> is forced
}
def load_config() -> dict:
    """Read config.json if present, back-filling missing keys from DEFAULT_CONFIG.

    Returns a fresh dict; DEFAULT_CONFIG itself is never handed out mutably.
    """
    if not CONFIG_PATH.exists():
        return DEFAULT_CONFIG.copy()
    with open(CONFIG_PATH) as fh:
        cfg = json.load(fh)
    # Older config files may predate newly added keys — fill the gaps.
    for key, default in DEFAULT_CONFIG.items():
        if key not in cfg:
            cfg[key] = default
    return cfg
def save_config(cfg: dict):
    """Persist *cfg* to config.json, pretty-printed with 2-space indentation."""
    with open(CONFIG_PATH, "w") as fh:
        fh.write(json.dumps(cfg, indent=2))
# ─────────────────────────────────────────
# 🧠 Model Architectures (unchanged)
# ─────────────────────────────────────────
class LSTMCharLM(nn.Module):
    """Character-level language model: embedding → stacked LSTM → per-step vocab logits."""

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        # PyTorch only applies LSTM dropout between layers, so a single-layer
        # stack must use 0 to avoid a warning.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Return (logits, hidden) for input ids x of shape (batch, seq)."""
        embedded = self.embed(x)
        features, hidden = self.lstm(embedded, hidden)
        return self.fc(features), hidden
class LSTMTokenLM(nn.Module):
    """Subword-token language model; architecturally identical to LSTMCharLM
    but kept as a separate class so checkpoints remain distinguishable."""

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        # Dropout between stacked layers only; zero when there is one layer.
        self.lstm = nn.LSTM(embed_size, hidden_size,
                            num_layers=num_layers, batch_first=True,
                            dropout=(dropout if num_layers > 1 else 0))
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Map token ids (batch, seq) to (logits, hidden)."""
        out, hidden = self.lstm(self.embed(x), hidden)
        logits = self.fc(out)
        return logits, hidden
# ─────────────────────────────────────────
# 🔤 GPT-2 byte decoder
# ─────────────────────────────────────────
def _build_byte_decoder():
bs = (list(range(ord('!'), ord('~')+1)) +
list(range(ord('¡'), ord('¬')+1)) +
list(range(ord('®'), ord('ÿ')+1)))
cs = bs[:]
n = 0
for b in range(256):
if b not in bs:
bs.append(b)
cs.append(256+n)
n += 1
return {chr(c): b for b, c in zip(bs, cs)}
_BYTE_DECODER = _build_byte_decoder()
def _tok_to_bytes(tok_str):
try:
return bytes([_BYTE_DECODER[c] for c in tok_str])
except KeyError:
return tok_str.encode('utf-8', errors='replace')
# ─────────────────────────────────────────
# ⚙️ Model Loader (identical generation to local)
# ─────────────────────────────────────────
class LinnyModel:
    """Wraps a checkpointed LSTM language model (char- or token-level) and
    streams generated text.

    The generation routine is described in the original comments as an exact
    copy of the local ``linny_server.py`` ``generate_stream`` logic, so it is
    deliberately left byte-identical here — only documentation is added.
    """

    def __init__(self, pt_path, config: dict, tokenizer_path=None):
        """Load the checkpoint at *pt_path* and build the matching architecture.

        Args:
            pt_path: path to a ``.pt`` checkpoint containing at least
                ``model_state`` and, in char mode, ``chars``.
            config: merged app config (see DEFAULT_CONFIG); used as a fallback
                when the checkpoint's own ``config`` dict lacks a field.
            tokenizer_path: path to a HF ``tokenizers`` JSON file (token mode).

        Raises:
            FileNotFoundError: token mode with no usable tokenizer file.
        """
        self.config = config
        self.model_type = config.get("model_type", "char")
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # weights_only=False: the checkpoint carries plain Python objects
        # (char list, config dict) — only load trusted files.
        ckpt = torch.load(pt_path, map_location=self.device, weights_only=False)
        if self.model_type == "token":
            # Imported lazily so char-only deployments don't need `tokenizers`.
            from tokenizers import Tokenizer as HFTokenizer
            tok_path = tokenizer_path or config.get("tokenizer_path")
            if not tok_path or not Path(str(tok_path)).exists():
                raise FileNotFoundError(f"Tokenizer not found: {tok_path}")
            self.tokenizer = HFTokenizer.from_file(str(tok_path))
            vocab_size = self.tokenizer.get_vocab_size()
            self.chars = None; self.stoi = None; self.itos = None
            # Architecture recorded inside the checkpoint wins over app config.
            arch = ckpt.get('config', {})
            layers = arch.get('hidden_layers', config['hidden_layers'])
            neurons = arch.get('neurons', config['neurons'])
            embed = arch.get('embed_size', config['embed_size'])
            dropout = arch.get('dropout', config.get('dropout', 0.2))
            self.model = LSTMTokenLM(vocab_size, embed, neurons, layers, dropout).to(self.device)
            self.model.load_state_dict(ckpt['model_state'])
        else:
            # Char mode: vocabulary is the checkpoint's character list.
            self.tokenizer = None
            self.chars = ckpt["chars"]
            self.stoi = {ch: i for i, ch in enumerate(self.chars)}
            self.itos = {i: ch for i, ch in enumerate(self.chars)}
            dropout = ckpt.get("config", {}).get("dropout", config.get("dropout", 0.2))
            self.model = LSTMCharLM(len(self.chars), config["embed_size"],
                                    config["neurons"], config["hidden_layers"], dropout).to(self.device)
            self.model.load_state_dict(ckpt["model_state"])
        self.model.eval()
        # Epoch is display-only; '?' when the checkpoint doesn't record one.
        self.epoch = ckpt.get('epoch', '?')

    # ------------------------------------------------------------------
    # Exact copy of local server's generate_stream logic
    # ------------------------------------------------------------------
    def stream_generate(self, prompt, temperature=0.8, max_len=2600,
                        penalty=1.05, penalty_window=110,
                        top_p=0.4, top_k=65, force_thinking=False,
                        prefix_text="", penalize_prefix=False,
                        min_response_tokens=3, max_reasoning_tokens=2500):
        """
        Matches local linny_server.py generate_stream exactly.

        Generator: yields text chunks as they are produced.

        Args:
            prompt: raw user message (wrapped in user/bot tags internally).
            temperature: softmax temperature (clamped to >= 1e-8).
            max_len: maximum generated tokens/chars (further capped by
                ``working_memory`` minus the formatted-prompt length).
            penalty / penalty_window: repetition penalty applied to the last
                *penalty_window* sampled tokens (chars in char mode).
            top_p / top_k: nucleus / top-k sampling filters.
            force_thinking: apply the configured reasoning mode
                ("prompt_suffix" appends " /think"; "response_prefix"
                pre-feeds a "<think>" token/characters).
            prefix_text: existing assistant text to continue from.
            penalize_prefix: include prefix tokens in the repetition window.
            min_response_tokens: EOS is ignored until this many tokens have
                been produced after </think>.
            max_reasoning_tokens: force-close <think> after this many tokens.
        """
        cfg = self.config
        user_tag = cfg.get("user_tag", "### Instruction:")
        bot_tag = cfg.get("bot_tag", "### Response:")
        r_mode = cfg.get("reasoning_mode", "response_prefix")
        eos_token_str = cfg.get("eos_token", "<|end|>")
        # Apply prompt_suffix mode
        actual_prompt = prompt
        if force_thinking and r_mode == "prompt_suffix":
            if not prompt.strip().endswith("/think"):
                actual_prompt = prompt.strip() + " /think"
        formatted = f"{user_tag}\n{actual_prompt}\n\n{bot_tag}\n"
        # Working memory cap: leave room for the prompt, but never go below 50.
        working_memory = cfg.get("working_memory", 0)
        if working_memory > 0:
            max_len = max(50, min(max_len, working_memory - len(formatted)))
        hidden = None
        generated = ""
        recent_tokens = deque(maxlen=penalty_window)
        # State tracking (exactly as local)
        in_reasoning = False        # currently inside <think>…</think>
        think_closed = False        # a </think> has been emitted
        awaiting_response = False   # reasoning done, counting response tokens
        response_token_count = 0
        reasoning_toks = 0
        with torch.no_grad():
            # Encode the conversation prefix and prime the LSTM hidden state.
            if self.model_type == "token":
                ids = self.tokenizer.encode(formatted).ids
            else:
                ids = [self.stoi.get(c, 0) for c in formatted]
            t = torch.tensor([ids], dtype=torch.long, device=self.device)
            _, hidden = self.model(t, hidden)
            input_token = torch.tensor([[ids[-1]]], dtype=torch.long, device=self.device)
            # If we have existing assistant response (continue mode), feed it
            # through the model so generation resumes from its end.
            if prefix_text:
                if self.model_type == "token":
                    prefix_ids = self.tokenizer.encode(prefix_text).ids
                else:
                    prefix_ids = [self.stoi.get(c, 0) for c in prefix_text]
                if prefix_ids:
                    pt = torch.tensor([prefix_ids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(pt, hidden)
                    generated = prefix_text
                    input_token = torch.tensor([[prefix_ids[-1]]], dtype=torch.long, device=self.device)
                    if penalize_prefix:
                        recent_tokens.extend(prefix_ids)
                # Update state based on prefix
                if "<think>" in prefix_text and "</think>" not in prefix_text:
                    in_reasoning = True
                elif "</think>" in prefix_text:
                    think_closed = True
                    awaiting_response = True
            # Prefill <think> if in response_prefix mode and no prefix
            if not prefix_text and force_thinking and r_mode == "response_prefix":
                if self.model_type == "token":
                    think_id = self.tokenizer.token_to_id("<think>")
                    if think_id is not None:
                        tt = torch.tensor([[think_id]], dtype=torch.long, device=self.device)
                        _, hidden = self.model(tt, hidden)
                        input_token = tt
                        generated = "<think>"
                        yield "<think>"
                        in_reasoning = True
                else:
                    # Char mode: prime hidden with "<think>"
                    think_ids = [self.stoi.get(ch, 0) for ch in "<think>"]
                    tt = torch.tensor([think_ids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(tt, hidden)
                    input_token = torch.tensor([[think_ids[-1]]], dtype=torch.long, device=self.device)
                    generated = "<think>"
                    for ch in "<think>":
                        yield ch
                    in_reasoning = True
            # Optional canned reasoning opener, fed through the model so the
            # hidden state reflects it.
            if cfg.get("reasoning_start", False) and not prefix_text:
                prefix = f"I need to think about this. The user said '{prompt}'"
                if self.model_type == "token":
                    pids = self.tokenizer.encode(prefix).ids
                    pt = torch.tensor([pids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(pt, hidden)
                    input_token = torch.tensor([[pids[-1]]], dtype=torch.long, device=self.device)
                    generated += prefix
                    yield prefix
                else:
                    for ch in prefix:
                        idx = self.stoi.get(ch, 0)
                        it = torch.tensor([[idx]], dtype=torch.long, device=self.device)
                        _, hidden = self.model(it, hidden)
                        input_token = it
                        generated += ch
                        yield ch
            # Get special token IDs (token mode only; char mode keeps None).
            eos_id = None
            think_open_id = None
            think_close_id = None
            if self.model_type == "token":
                eos_id = self.tokenizer.token_to_id(eos_token_str)
                think_open_id = self.tokenizer.token_to_id("<think>")
                think_close_id = self.tokenizer.token_to_id("</think>")
            # ------------------------------------------------------------------
            # Token generation (exactly as local)
            # ------------------------------------------------------------------
            if self.model_type == "token":
                # byte_buf accumulates raw bytes until they form valid UTF-8,
                # so multi-byte characters split across tokens decode cleanly.
                byte_buf = b""
                for step in range(max_len):
                    logits, hidden = self.model(input_token, hidden)
                    lf = logits[0, -1].float() / max(temperature, 1e-8)
                    # Repetition penalty
                    if penalty != 1.0 and len(recent_tokens) > 0:
                        penalized_ids = set(recent_tokens)
                        for token_id in penalized_ids:
                            if token_id < lf.size(0):
                                lf[token_id] /= penalty
                    # Top-K
                    if top_k > 0:
                        tv, _ = torch.topk(lf, min(top_k, lf.size(-1)))
                        lf[lf < tv[-1]] = float("-inf")
                    # Top-P (nucleus): drop the tail once cumulative prob > top_p.
                    if top_p < 1.0:
                        sl, si = torch.sort(lf, descending=True)
                        cp = torch.cumsum(torch.softmax(sl, dim=-1), dim=-1)
                        rm = cp > top_p
                        rm[..., 1:] = rm[..., :-1].clone()
                        rm[..., 0] = False
                        lf[si[rm]] = float("-inf")
                    nxt = torch.multinomial(torch.softmax(lf, dim=-1), 1).item()
                    # EOS handling with forced minimum response
                    if nxt == eos_id:
                        if awaiting_response and response_token_count < min_response_tokens:
                            continue
                        else:
                            break
                    recent_tokens.append(nxt)
                    # Update reasoning state
                    if think_open_id is not None and nxt == think_open_id:
                        in_reasoning = True
                        reasoning_toks = 0
                    if think_close_id is not None and nxt == think_close_id:
                        in_reasoning = False
                        think_closed = True
                        awaiting_response = True
                        response_token_count = 0
                    if in_reasoning and not think_closed:
                        reasoning_toks += 1
                        if max_reasoning_tokens and reasoning_toks >= max_reasoning_tokens:
                            # Force close think: flush pending bytes, emit the
                            # closing tag, and feed </think> through the model.
                            if byte_buf:
                                decoded = byte_buf.decode('utf-8', errors='replace')
                                generated += decoded
                                yield decoded
                                byte_buf = b""
                            # NOTE(review): "</think>" is yielded but not
                            # appended to `generated` here — confirm intended.
                            yield "</think>"
                            in_reasoning = False
                            think_closed = True
                            awaiting_response = True
                            response_token_count = 0
                            ct = torch.tensor([[think_close_id]], dtype=torch.long, device=self.device)
                            _, hidden = self.model(ct, hidden)
                            input_token = ct
                            recent_tokens.append(think_close_id)
                            continue
                    if awaiting_response and nxt != think_close_id:
                        response_token_count += 1
                    elif not in_reasoning and not think_closed:
                        response_token_count += 1
                    # Output token: decode only when byte_buf is valid UTF-8.
                    tok_str = self.tokenizer.id_to_token(nxt) or ""
                    byte_buf += _tok_to_bytes(tok_str)
                    try:
                        decoded = byte_buf.decode('utf-8')
                        generated += decoded
                        yield decoded
                        byte_buf = b""
                    except UnicodeDecodeError:
                        pass
                    input_token = torch.tensor([[nxt]], dtype=torch.long, device=self.device)
                # Flush any incomplete trailing bytes with replacement chars.
                if byte_buf:
                    leftover = byte_buf.decode('utf-8', errors='replace')
                    generated += leftover
                    yield leftover
            # ------------------------------------------------------------------
            # Character generation (exactly as local)
            # ------------------------------------------------------------------
            else:
                for step in range(max_len):
                    logits, hidden = self.model(input_token, hidden)
                    lf = logits[0, -1].float() / max(temperature, 1e-8)
                    # Repetition penalty on characters
                    if penalty != 1.0 and len(recent_tokens) > 0:
                        # recent_tokens stores characters (strings)
                        penalized_chars = set(recent_tokens)
                        for ch in penalized_chars:
                            idx = self.stoi.get(ch, 0)
                            if idx < lf.size(0):
                                lf[idx] /= penalty
                    if top_k > 0:
                        tv, _ = torch.topk(lf, min(top_k, lf.size(-1)))
                        lf[lf < tv[-1]] = float("-inf")
                    if top_p < 1.0:
                        sl, si = torch.sort(lf, descending=True)
                        cp = torch.cumsum(torch.softmax(sl, dim=-1), dim=-1)
                        rm = cp > top_p
                        rm[..., 1:] = rm[..., :-1].clone()
                        rm[..., 0] = False
                        lf[si[rm]] = float("-inf")
                    idx = torch.multinomial(torch.softmax(lf, dim=-1), 1).item()
                    char = self.itos[idx]
                    recent_tokens.append(char)
                    input_token = torch.tensor([[idx]], dtype=torch.long, device=self.device)
                    # Stop heuristic: a third consecutive '#' means the model is
                    # starting the next "### ..." tag — end the response there.
                    if char == "#" and generated.endswith("##"):
                        break
                    generated += char
                    yield char
# ─────────────────────────────────────────
# 🧰 Helpers (unchanged from original HF)
# ─────────────────────────────────────────
def parse_think_tags(text: str):
    """Split *text* on <think>…</think> markers.

    Returns (thinking, visible, complete):
      - no <think> at all      → (None, text, False)
      - open but unclosed tag  → (reasoning_so_far, text_before_tag, False)
      - closed tag             → (reasoning, before+after joined, True)
    """
    if "<think>" not in text:
        return None, text, False
    before, _, rest = text.partition("<think>")
    if "</think>" not in rest:
        return rest.strip(), before.strip(), False
    inner, _, after = rest.partition("</think>")
    return inner.strip(), (before + after).strip(), True
def extract_current_topic(thinking_text: str) -> str:
    """Derive a short live-status line from the last **bold** heading in the
    reasoning text; fall back to a generic label."""
    if thinking_text:
        headings = re.findall(r'\*\*([^*]+)\*\*', thinking_text)
        if headings:
            return f"Reasoning: {headings[-1].strip()}"
    return "Reasoning..."
def format_message(visible: str, thinking: str | None,
think_complete: bool, think_elapsed: float | None,
current_topic: str = "Reasoning...") -> str:
if not thinking:
return visible
if think_complete and think_elapsed is not None:
summary = f"💭 Thought for {think_elapsed:.1f}s"
open_attr = ""
else:
summary = current_topic
open_attr = ""
think_block = (f"<details class='think-details'{open_attr}>"
f"<summary class='think-summary'>{summary}</summary>"
f"<div class='think-content'>{thinking}</div>"
f"</details>")
if visible.strip():
return think_block + "\n\n" + visible
return think_block
def extract_html_canvas(text: str):
    """Pull the first ```html fenced block out of *text*.

    Returns (html_code, remaining_text); (None, text) when no block exists.
    """
    m = re.search(r"```html\s*\n([\s\S]*?)```", text, re.IGNORECASE)
    if m is None:
        return None, text
    remainder = text[:m.start()] + text[m.end():]
    return m.group(1).strip(), remainder.strip()
def make_canvas_html(code: str) -> str:
    """Wrap *code* in a sandboxed iframe rendered via the ``srcdoc`` attribute.

    Bug fix: the original escaped only ``"``. In an HTML attribute value,
    ``&`` must be escaped (and escaped FIRST) — otherwise any ampersand or
    pre-existing entity in the user's HTML is mangled when the browser
    decodes the srcdoc value.
    """
    escaped = code.replace("&", "&amp;").replace('"', "&quot;")
    return (f"<div class='canvas-wrapper'>"
            f"<div class='canvas-label'>🖼️ HTML Canvas</div>"
            f'<iframe class="canvas-frame" srcdoc="{escaped}" '
            f'sandbox="allow-scripts" scrolling="auto"></iframe>'
            f"</div>")
# ─────────────────────────────────────────
# 🚀 Auto-load
# ─────────────────────────────────────────
# Auto-load the bundled model at import time so the first visitor doesn't wait.
# On any failure the app still starts; the status message explains what happened.
_startup_model = None
_startup_cfg = None
_startup_msg = "⚠️ No model found. Place your `.pt` in the Space root."
if MODEL_PATH.exists():
    try:
        _startup_cfg = load_config()
        _startup_model = LinnyModel(MODEL_PATH, _startup_cfg, tokenizer_path=TOKENIZER_PATH)
        wm = _startup_cfg.get("working_memory", 0)
        mtype = _startup_cfg.get("model_type", "char")
        epoch = _startup_model.epoch
        _startup_msg = (f"✅ Model auto-loaded! ({_startup_cfg['hidden_layers']}L × {_startup_cfg['neurons']}N, "
                        f"epoch {epoch}, {mtype}" + (f", {wm} ctx)" if wm else ")"))
        print(_startup_msg)
    except Exception as e:
        # Broad catch is deliberate: a bad checkpoint must not crash the Space.
        _startup_msg = f"❌ Auto-load failed: {e}"
        print(_startup_msg)
# ─────────────────────────────────────────
# 🎨 CSS (unchanged)
# ─────────────────────────────────────────
# Custom stylesheet injected into gr.Blocks: dark theme, collapsible
# reasoning <details> blocks, and the sandboxed HTML-canvas iframe chrome.
CSS = """
html, body { height: 100%; margin: 0; }
.gradio-container { max-width: 100% !important; padding: 0 !important; height: 100vh; display: flex; flex-direction: column; }
footer { display: none !important; }
.app-header { text-align: center; padding: 18px 0 10px; background: linear-gradient(135deg, #1a1a2e 0%, #16213e 100%); border-bottom: 1px solid #2d2d4e; }
.app-header h1 { font-size: 2em; margin: 0; background: linear-gradient(90deg, #a78bfa, #818cf8); -webkit-background-clip: text; -webkit-text-fill-color: transparent; }
.app-header p { color: #666; margin: 4px 0 0; font-size: 0.85em; }
#chatbox { flex: 1; min-height: 0; }
.gradio-chatbot { height: calc(100vh - 260px) !important; border-radius: 12px !important; background: #0f0f1a !important; border: 1px solid #2d2d4e !important; }
.input-row { padding: 10px 16px; background: #0f0f1a; border-top: 1px solid #2d2d4e; }
.think-details { margin: 0 0 10px 0; border-left: 3px solid #7c3aed; border-radius: 0 8px 8px 0; background: #13111c; overflow: hidden; }
.think-summary { cursor: pointer; padding: 8px 12px; color: #a78bfa; font-size: 0.82em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.8px; list-style: none; user-select: none; }
.think-summary::-webkit-details-marker { display: none; }
.think-summary::before { content: "▶ "; font-size: 0.7em; }
details[open] .think-summary::before { content: "▼ "; }
.think-content { padding: 10px 14px; color: #c4b5fd; font-style: italic; font-size: 0.87em; line-height: 1.6; border-top: 1px solid #2d1f5e; white-space: pre-wrap; max-height: 320px; overflow-y: auto; }
.canvas-wrapper { margin: 12px 0; border-radius: 10px; overflow: hidden; border: 1px solid #3d2d6e; background: #0d0d1a; }
.canvas-label { padding: 7px 14px; background: #1e1040; color: #a78bfa; font-size: 0.78em; font-weight: 600; letter-spacing: 0.5px; }
.canvas-frame { width: 100%; min-height: 300px; border: none; display: block; background: white; }
.status-bar { font-size: 0.8em; padding: 4px 0; color: #666; }
.reasoning-row { padding: 4px 0 0 2px; }
.tab-nav { background: #0f0f1a !important; border-bottom: 1px solid #2d2d4e !important; }
"""
# ─────────────────────────────────────────
# 🖥️ UI (identical to original HF but with penalty window)
# ─────────────────────────────────────────
def build_ui():
    """Construct the Gradio Blocks app.

    Layout: a Chat tab (chatbot, settings accordion, user-model upload) and a
    password-gated Admin tab that edits config.json. Uploaded models live in
    per-session gr.State so they never affect other visitors.
    """
    cfg = load_config()
    with gr.Blocks(title="Linny AI", css=CSS) as demo:
        # Per-session model/config state, seeded with the auto-loaded model.
        session_model = gr.State(_startup_model)
        session_cfg = gr.State(_startup_cfg)
        gr.HTML("""
        <div class='app-header'>
        <h1>🤖 Linny AI</h1>
        <p>LSTM Language Model · Runs locally on the Space</p>
        </div>
        """)
        with gr.Tabs():
            with gr.TabItem("💬 Chat"):
                model_status = gr.Markdown(value=_startup_msg, elem_classes=["status-bar"])
                chatbot = gr.Chatbot(elem_id="chatbox", label="", render_markdown=True)
                # Hidden until a response contains a ```html fenced block.
                canvas_display = gr.HTML(visible=False)
                with gr.Row(elem_classes=["input-row"]):
                    msg_box = gr.Textbox(placeholder="Message Linny…", show_label=False, scale=8, lines=1, autofocus=True)
                    send_btn = gr.Button("Send ↩", variant="primary", scale=1)
                with gr.Row(elem_classes=["reasoning-row"]):
                    reasoning_toggle = gr.Checkbox(value=False, label="🧠 Force Reasoning (pre-fills <think> token)", scale=1)
                # Sampling controls, defaulting to the values in config.json.
                with gr.Accordion("⚙️ Generation Settings", open=False):
                    with gr.Row():
                        temp_sl = gr.Slider(0.1, 1.5, value=cfg.get("default_temp", 0.8), step=0.05, label="Temperature")
                        penalty_sl = gr.Slider(1.0, 2.0, value=cfg.get("default_penalty", 1.05), step=0.05, label="Repetition Penalty")
                    with gr.Row():
                        penalty_window_sl = gr.Slider(10, 500, value=cfg.get("default_penalty_window", 110), step=10, label="Penalty Window")
                    with gr.Row():
                        topp_sl = gr.Slider(0.0, 1.0, value=cfg.get("default_top_p", 0.4), step=0.05, label="Top-P")
                        topk_sl = gr.Slider(0, 100, value=cfg.get("default_top_k", 65), step=1, label="Top-K (0 = off)")
                        max_len_sl = gr.Slider(100, 8000, value=cfg.get("default_max_len", 2600), step=50, label="Max Response Length")
                # Let visitors try their own checkpoint without admin access.
                with gr.Accordion("📤 Upload Your Own Model (optional)", open=False):
                    gr.Markdown("Upload a `.pt` file and optionally a tokenizer `.json` for token-based models.")
                    with gr.Row():
                        user_pt = gr.File(label="Upload .pt file", file_types=[".pt"])
                        user_tok = gr.File(label="Upload tokenizer .json (token model only)", file_types=[".json"])
                    model_type_radio = gr.Radio(choices=["char", "token"], value="char", label="Model Type")
                    with gr.Row():
                        u_layers = gr.Number(value=5, label="Hidden Layers", precision=0)
                        u_neurons = gr.Number(value=768, label="Neurons", precision=0)
                        u_embed = gr.Number(value=384, label="Embed Size", precision=0)
                        u_dropout = gr.Number(value=0.2, label="Dropout", precision=2)
                        u_working_mem = gr.Number(value=2048, label="Working Memory (0=off)", precision=0)
                    with gr.Row():
                        u_user_tag = gr.Textbox(value="### Instruction:", label="User Tag")
                        u_bot_tag = gr.Textbox(value="### Response:", label="Bot Tag")
                        u_eos = gr.Textbox(value="<|end|>", label="EOS Token")
                    load_user_btn = gr.Button("🚀 Load My Model", variant="secondary")
                    user_load_status = gr.Markdown("")
            with gr.TabItem("🔒 Admin"):
                gr.Markdown("## Admin Panel\nEdit `config.json` settings.")
                with gr.Row():
                    admin_pw = gr.Textbox(label="Admin Password", type="password", placeholder="Enter password", scale=3)
                    admin_login_btn = gr.Button("Login", scale=1)
                admin_status = gr.Markdown("")
                # The whole panel stays hidden until do_admin_login succeeds.
                with gr.Group(visible=False) as admin_panel:
                    gr.Markdown("### Model Architecture")
                    model_type_admin = gr.Radio(choices=["char", "token"], value=cfg.get("model_type", "char"), label="Model Type")
                    with gr.Row():
                        a_layers = gr.Number(value=cfg.get("hidden_layers", 5), label="Hidden Layers", precision=0)
                        a_neurons = gr.Number(value=cfg.get("neurons", 768), label="Neurons", precision=0)
                        a_embed = gr.Number(value=cfg.get("embed_size", 384), label="Embed Size", precision=0)
                        a_dropout = gr.Number(value=cfg.get("dropout", 0.2), label="Dropout", precision=2)
                        a_working_mem = gr.Number(value=cfg.get("working_memory", 2048), label="Working Memory (0=off)", precision=0)
                    gr.Markdown("### Tags & Tokens")
                    with gr.Row():
                        a_user_tag = gr.Textbox(value=cfg.get("user_tag", "### Instruction:"), label="User Tag")
                        a_bot_tag = gr.Textbox(value=cfg.get("bot_tag", "### Response:"), label="Bot Tag")
                        a_eos = gr.Textbox(value=cfg.get("eos_token", "<|end|>"), label="EOS Token")
                    a_sys = gr.Textbox(value=cfg.get("system_prompt", "You are a helpful AI named Linny."), label="System Prompt", lines=2)
                    gr.Markdown("### Reasoning")
                    a_reasoning_mode = gr.Radio(choices=["prompt_suffix", "response_prefix"], value=cfg.get("reasoning_mode", "response_prefix"), label="Force Reasoning Mode")
                    a_reasoning_start = gr.Checkbox(value=cfg.get("reasoning_start", False), label="Enable Reasoning Start Prefix")
                    a_min_response_tokens = gr.Number(value=cfg.get("min_response_tokens", 3), label="Minimum Response Tokens After </think>", precision=0)
                    a_max_reasoning_tokens = gr.Number(value=cfg.get("max_reasoning_tokens", 2500), label="Max Reasoning Tokens", precision=0)
                    gr.Markdown("### Default Generation Settings")
                    with gr.Row():
                        a_temp = gr.Slider(0.1, 1.5, value=cfg.get("default_temp", 0.8), step=0.05, label="Default Temperature")
                        a_penalty = gr.Slider(1.0, 2.0, value=cfg.get("default_penalty", 1.05), step=0.05, label="Default Penalty")
                    with gr.Row():
                        a_penalty_window = gr.Slider(10, 500, value=cfg.get("default_penalty_window", 110), step=10, label="Default Penalty Window")
                    with gr.Row():
                        a_topp = gr.Slider(0.0, 1.0, value=cfg.get("default_top_p", 0.4), step=0.05, label="Default Top-P")
                        a_topk = gr.Slider(0, 100, value=cfg.get("default_top_k", 65), step=1, label="Default Top-K")
                        a_maxlen = gr.Slider(100, 8000, value=cfg.get("default_max_len", 2600), step=50, label="Default Max Length")
                    save_cfg_btn = gr.Button("💾 Save config.json", variant="primary")
                    save_status = gr.Markdown("")

        # Callbacks
        def do_admin_login(pw):
            """Reveal the admin panel iff the password matches ADMIN_PASSWORD."""
            if pw == ADMIN_PASSWORD:
                return gr.update(visible=True), "✅ Logged in."
            return gr.update(visible=False), "❌ Incorrect password."
        admin_login_btn.click(do_admin_login, inputs=[admin_pw], outputs=[admin_panel, admin_status])

        def do_save_config(mtype, layers, neurons, embed, dropout, working_mem,
                           user_tag, bot_tag, eos, sys_prompt,
                           reasoning_mode, reasoning_start,
                           min_response_tokens, max_reasoning_tokens,
                           temp, penalty, penalty_window, top_p, top_k, max_len):
            """Write the admin form back to config.json (takes effect on restart)."""
            try:
                new_cfg = {
                    "model_type": mtype,
                    "hidden_layers": int(layers),
                    "neurons": int(neurons),
                    "embed_size": int(embed),
                    "dropout": float(dropout),
                    "working_memory": int(working_mem),
                    "user_tag": user_tag,
                    "bot_tag": bot_tag,
                    "eos_token": eos,
                    "system_prompt": sys_prompt,
                    "reasoning_mode": reasoning_mode,
                    "reasoning_start": bool(reasoning_start),
                    "min_response_tokens": int(min_response_tokens),
                    "max_reasoning_tokens": int(max_reasoning_tokens),
                    "default_temp": float(temp),
                    "default_penalty": float(penalty),
                    "default_penalty_window": int(penalty_window),
                    "default_top_p": float(top_p),
                    "default_top_k": int(top_k),
                    "default_max_len": int(max_len),
                }
                save_config(new_cfg)
                return "✅ config.json saved! Restart the Space to apply changes."
            except Exception as e:
                return f"❌ Error: {e}"
        save_cfg_btn.click(do_save_config,
                           inputs=[model_type_admin, a_layers, a_neurons, a_embed, a_dropout, a_working_mem,
                                   a_user_tag, a_bot_tag, a_eos, a_sys,
                                   a_reasoning_mode, a_reasoning_start,
                                   a_min_response_tokens, a_max_reasoning_tokens,
                                   a_temp, a_penalty, a_penalty_window, a_topp, a_topk, a_maxlen],
                           outputs=[save_status])

        def load_user_model(pt_file, tok_file, mtype, layers, neurons, embed, dropout,
                            working_mem, user_tag, bot_tag, eos):
            """Build a LinnyModel from an uploaded checkpoint; stored in session state only."""
            if pt_file is None:
                return None, None, "❌ Please upload a .pt file first."
            try:
                user_cfg = {
                    "model_type": mtype,
                    "hidden_layers": int(layers),
                    "neurons": int(neurons),
                    "embed_size": int(embed),
                    "dropout": float(dropout),
                    "working_memory": int(working_mem),
                    "user_tag": user_tag,
                    "bot_tag": bot_tag,
                    "eos_token": eos,
                    # Reasoning settings are inherited from the Space's config.
                    "reasoning_mode": cfg.get("reasoning_mode", "response_prefix"),
                    "reasoning_start": cfg.get("reasoning_start", False),
                    "min_response_tokens": cfg.get("min_response_tokens", 3),
                    "max_reasoning_tokens": cfg.get("max_reasoning_tokens", 2500),
                }
                tok_path = tok_file.name if tok_file else None
                m = LinnyModel(pt_file.name, user_cfg, tokenizer_path=tok_path)
                return m, user_cfg, f"✅ Model loaded! ({user_cfg['hidden_layers']}L × {user_cfg['neurons']}N, {mtype}, epoch {m.epoch})"
            except Exception as e:
                return None, None, f"❌ Error: {e}"
        load_user_btn.click(load_user_model,
                            inputs=[user_pt, user_tok, model_type_radio,
                                    u_layers, u_neurons, u_embed, u_dropout, u_working_mem,
                                    u_user_tag, u_bot_tag, u_eos],
                            outputs=[session_model, session_cfg, user_load_status])

        def respond(message, history, model, temp, penalty, penalty_window, top_p, top_k, max_len, force_thinking):
            """Streaming chat handler: yields (history, cleared_textbox, canvas_update) per chunk."""
            if not message.strip():
                yield history, "", gr.update(visible=False)
                return
            if model is None:
                yield history + [{"role":"user","content":message},{"role":"assistant","content":"⚠️ No model loaded."}], "", gr.update(visible=False)
                return
            history = history + [{"role":"user","content":message},{"role":"assistant","content":""}]
            full_response = ""
            stop = False
            think_start = None   # wall-clock start of the <think> block
            think_end = None
            try:
                for chunk in model.stream_generate(
                        message,
                        temperature=float(temp),
                        max_len=int(max_len),
                        penalty=float(penalty),
                        penalty_window=int(penalty_window),
                        top_p=float(top_p),
                        top_k=int(top_k),
                        force_thinking=bool(force_thinking),
                        min_response_tokens=model.config.get("min_response_tokens", 3),
                        max_reasoning_tokens=model.config.get("max_reasoning_tokens", 2500),
                ):
                    full_response += chunk
                    # Stop at EOS if present
                    eos_str = model.config.get("eos_token", "<|end|>")
                    if eos_str in full_response:
                        full_response = full_response[:full_response.find(eos_str)]
                        # If EOS arrived mid-reasoning, close the tag instead of stopping.
                        if full_response.count("<think>") > full_response.count("</think>"):
                            full_response += "</think>"
                        else:
                            stop = True
                    thinking, visible, think_complete = parse_think_tags(full_response)
                    if thinking and think_start is None:
                        think_start = time.time()
                    if think_complete and think_end is None and think_start is not None:
                        think_end = time.time()
                    # Elapsed time: final duration once closed, running total while open.
                    think_elapsed = (think_end - think_start) if (think_end and think_start) else (time.time() - think_start if think_start else None)
                    current_topic = extract_current_topic(thinking) if thinking else "Reasoning..."
                    html_code, visible_clean = extract_html_canvas(visible)
                    canvas_update = gr.update(visible=bool(html_code), value=make_canvas_html(html_code) if html_code else "")
                    history[-1] = {"role":"assistant","content":format_message(visible_clean, thinking, think_complete, think_elapsed, current_topic)}
                    yield history, "", canvas_update
                    if stop:
                        break
            except Exception as e:
                history[-1] = {"role":"assistant","content":f"⚠️ Generation error: {e}"}
                yield history, "", gr.update(visible=False)
                return
            # Final pass: re-render with the think block marked complete.
            thinking, visible, think_complete = parse_think_tags(full_response)
            if think_end is None and think_start is not None:
                think_end = time.time()
            think_elapsed = (think_end - think_start) if (think_end and think_start) else None
            if think_complete and not visible.strip():
                visible = "*(no response generated)*"
            current_topic = extract_current_topic(thinking) if thinking else "Reasoning..."
            html_code, visible_clean = extract_html_canvas(visible)
            history[-1] = {"role":"assistant","content":format_message(visible_clean, thinking, True, think_elapsed, current_topic)}
            canvas_update = gr.update(visible=bool(html_code), value=make_canvas_html(html_code) if html_code else "")
            yield history, "", canvas_update

        # Button click and Enter-key submit share the same handler/signature.
        shared_inputs = [msg_box, chatbot, session_model,
                         temp_sl, penalty_sl, penalty_window_sl, topp_sl, topk_sl, max_len_sl, reasoning_toggle]
        shared_outputs = [chatbot, msg_box, canvas_display]
        send_btn.click(respond, inputs=shared_inputs, outputs=shared_outputs)
        msg_box.submit(respond, inputs=shared_inputs, outputs=shared_outputs)
    return demo
# Entrypoint: build and serve the Gradio app when run as a script.
if __name__ == "__main__":
    build_ui().launch(ssr_mode=False)