import os
import re
import json
import time
import torch
import torch.nn as nn
import gradio as gr
from pathlib import Path
from collections import deque

# ─────────────────────────────────────────
# 🔐 Admin password
# ─────────────────────────────────────────
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "linny-admin")

# ─────────────────────────────────────────
# 📁 Paths
# ─────────────────────────────────────────
SPACE_ROOT = Path(__file__).parent
CONFIG_PATH = SPACE_ROOT / "config.json"

# First *.pt in the Space root is the checkpoint; any *.json other than
# config.json is assumed to be a tokenizer file (token-level models only).
_pt_files = sorted(SPACE_ROOT.glob("*.pt"))
MODEL_PATH = _pt_files[0] if _pt_files else SPACE_ROOT / "default_model.pt"
_tok_files = [f for f in SPACE_ROOT.glob("*.json") if f.name != "config.json"]
TOKENIZER_PATH = _tok_files[0] if _tok_files else None

# ─────────────────────────────────────────
# 🗃️ Config
# ─────────────────────────────────────────
DEFAULT_CONFIG = {
    "model_type": "char",
    "hidden_layers": 5,
    "neurons": 768,
    "embed_size": 384,
    "dropout": 0.2,
    "vocab_size": 20000,
    "working_memory": 2048,
    "user_tag": "### Instruction:",
    "bot_tag": "### Response:",
    "eos_token": "<|end|>",
    "system_prompt": "You are a helpful and intelligent AI assistant named Linny.",
    "default_temp": 0.8,
    "default_penalty": 1.05,
    "default_penalty_window": 110,
    "default_top_p": 0.4,
    "default_top_k": 65,
    "default_max_len": 2600,
    "reasoning_mode": "response_prefix",
    "reasoning_start": False,
    "min_response_tokens": 3,
    "max_reasoning_tokens": 2500,
}


def load_config() -> dict:
    """Load config.json, back-filling any missing keys from DEFAULT_CONFIG.

    Returns a fresh dict; DEFAULT_CONFIG itself is never mutated.
    """
    if CONFIG_PATH.exists():
        with open(CONFIG_PATH) as f:
            data = json.load(f)
        for k, v in DEFAULT_CONFIG.items():
            data.setdefault(k, v)
        return data
    return DEFAULT_CONFIG.copy()


def save_config(cfg: dict):
    """Write *cfg* to config.json, pretty-printed."""
    with open(CONFIG_PATH, "w") as f:
        json.dump(cfg, f, indent=2)


# ─────────────────────────────────────────
# 🧠 Model Architectures (unchanged)
# ─────────────────────────────────────────
class LSTMCharLM(nn.Module):
    """Character-level LSTM LM: embedding → (multi-layer) LSTM → vocab logits."""

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        # nn.LSTM only applies inter-layer dropout when num_layers > 1;
        # passing a nonzero value with a single layer raises a warning.
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers=num_layers,
                            batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Return (logits over vocab, new LSTM hidden state)."""
        out, hidden = self.lstm(self.embed(x), hidden)
        return self.fc(out), hidden


class LSTMTokenLM(nn.Module):
    """Token-level LSTM LM. Structurally identical to LSTMCharLM; kept as a
    separate class so checkpoints remain tied to an explicit model type."""

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers=num_layers,
                            batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Return (logits over vocab, new LSTM hidden state)."""
        out, hidden = self.lstm(self.embed(x), hidden)
        return self.fc(out), hidden


# ─────────────────────────────────────────
# 🔤 GPT-2 byte decoder
# ─────────────────────────────────────────
def _build_byte_decoder():
    """Invert the GPT-2 bytes→unicode mapping: printable char → raw byte.

    GPT-2 style BPE tokenizers remap all 256 byte values onto printable
    characters; this rebuilds that table and returns the reverse mapping so
    token strings can be turned back into raw bytes.
    """
    bs = (list(range(ord('!'), ord('~') + 1)) +
          list(range(ord('¡'), ord('¬') + 1)) +
          list(range(ord('®'), ord('ÿ') + 1)))
    cs = bs[:]
    n = 0
    for b in range(256):
        if b not in bs:
            bs.append(b)
            cs.append(256 + n)
            n += 1
    return {chr(c): b for b, c in zip(bs, cs)}


_BYTE_DECODER = _build_byte_decoder()


def _tok_to_bytes(tok_str):
    """Convert one BPE token string to raw bytes.

    Falls back to a plain UTF-8 encode when the token contains characters
    outside the GPT-2 byte alphabet (e.g. special tokens).
    """
    try:
        return bytes([_BYTE_DECODER[c] for c in tok_str])
    except KeyError:
        return tok_str.encode('utf-8', errors='replace')


# ─────────────────────────────────────────
# ⚙️ Model Loader (identical generation to local)
# ─────────────────────────────────────────
class LinnyModel:
    """Loads a char- or token-level LSTM checkpoint and streams generations."""

    def __init__(self, pt_path, config: dict, tokenizer_path=None):
        self.config = config
        self.model_type = config.get("model_type", "char")
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # weights_only=False: checkpoint stores plain Python objects (chars list,
        # config dict) alongside the state dict. Only load trusted checkpoints.
        ckpt = torch.load(pt_path, map_location=self.device, weights_only=False)

        if self.model_type == "token":
            from tokenizers import Tokenizer as HFTokenizer
            tok_path = tokenizer_path or config.get("tokenizer_path")
            if not tok_path or not Path(str(tok_path)).exists():
                raise FileNotFoundError(f"Tokenizer not found: {tok_path}")
            self.tokenizer = HFTokenizer.from_file(str(tok_path))
            vocab_size = self.tokenizer.get_vocab_size()
            self.chars = None; self.stoi = None; self.itos = None
            # Prefer architecture hyper-params stored inside the checkpoint;
            # fall back to the UI/config values.
            arch = ckpt.get('config', {})
            layers = arch.get('hidden_layers', config['hidden_layers'])
            neurons = arch.get('neurons', config['neurons'])
            embed = arch.get('embed_size', config['embed_size'])
            dropout = arch.get('dropout', config.get('dropout', 0.2))
            self.model = LSTMTokenLM(vocab_size, embed, neurons, layers, dropout).to(self.device)
            self.model.load_state_dict(ckpt['model_state'])
        else:
            # Char mode: vocabulary is the checkpoint's character list.
            self.tokenizer = None
            self.chars = ckpt["chars"]
            self.stoi = {ch: i for i, ch in enumerate(self.chars)}
            self.itos = {i: ch for i, ch in enumerate(self.chars)}
            dropout = ckpt.get("config", {}).get("dropout", config.get("dropout", 0.2))
            self.model = LSTMCharLM(len(self.chars), config["embed_size"],
                                    config["neurons"], config["hidden_layers"],
                                    dropout).to(self.device)
            self.model.load_state_dict(ckpt["model_state"])

        self.model.eval()
        self.epoch = ckpt.get('epoch', '?')

    # ------------------------------------------------------------------
    # Exact copy of local server's generate_stream logic
    # ------------------------------------------------------------------
    def stream_generate(self, prompt, temperature=0.8, max_len=2600,
                        penalty=1.05, penalty_window=110, top_p=0.4, top_k=65,
                        force_thinking=False, prefix_text="",
                        penalize_prefix=False, min_response_tokens=3,
                        max_reasoning_tokens=2500):
        """Yield generated text chunks for *prompt*.

        Matches local linny_server.py generate_stream exactly.

        Args:
            prompt: raw user message (wrapped in user/bot tags internally).
            temperature / top_p / top_k: sampling controls.
            penalty: repetition penalty applied to the last *penalty_window*
                sampled tokens (or characters in char mode).
            force_thinking: pre-fill a ``<think>`` token (response_prefix mode)
                or append " /think" to the prompt (prompt_suffix mode).
            prefix_text: existing assistant text to continue from.
            penalize_prefix: also count *prefix_text* tokens for the penalty.
            min_response_tokens: minimum tokens after ``</think>`` before EOS
                is honoured (token mode only).
            max_reasoning_tokens: hard cap on reasoning length; a ``</think>``
                is force-emitted when reached (token mode only).
        """
        cfg = self.config
        user_tag = cfg.get("user_tag", "### Instruction:")
        bot_tag = cfg.get("bot_tag", "### Response:")
        r_mode = cfg.get("reasoning_mode", "response_prefix")
        eos_token_str = cfg.get("eos_token", "<|end|>")

        # Apply prompt_suffix mode
        actual_prompt = prompt
        if force_thinking and r_mode == "prompt_suffix":
            if not prompt.strip().endswith("/think"):
                actual_prompt = prompt.strip() + " /think"

        formatted = f"{user_tag}\n{actual_prompt}\n\n{bot_tag}\n"

        # Working memory cap: leave room for the formatted prompt itself.
        working_memory = cfg.get("working_memory", 0)
        if working_memory > 0:
            max_len = max(50, min(max_len, working_memory - len(formatted)))

        hidden = None
        generated = ""
        recent_tokens = deque(maxlen=penalty_window)

        # State tracking (exactly as local)
        in_reasoning = False
        think_closed = False
        awaiting_response = False
        response_token_count = 0
        reasoning_toks = 0

        with torch.no_grad():
            # Encode the conversation prefix
            if self.model_type == "token":
                ids = self.tokenizer.encode(formatted).ids
            else:
                ids = [self.stoi.get(c, 0) for c in formatted]
            t = torch.tensor([ids], dtype=torch.long, device=self.device)
            _, hidden = self.model(t, hidden)
            input_token = torch.tensor([[ids[-1]]], dtype=torch.long, device=self.device)

            # If we have existing assistant response (continue mode)
            if prefix_text:
                if self.model_type == "token":
                    prefix_ids = self.tokenizer.encode(prefix_text).ids
                else:
                    prefix_ids = [self.stoi.get(c, 0) for c in prefix_text]
                if prefix_ids:
                    pt = torch.tensor([prefix_ids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(pt, hidden)
                    generated = prefix_text
                    input_token = torch.tensor([[prefix_ids[-1]]], dtype=torch.long, device=self.device)
                    if penalize_prefix:
                        recent_tokens.extend(prefix_ids)
                # Update state based on prefix
                if "<think>" in prefix_text and "</think>" not in prefix_text:
                    in_reasoning = True
                elif "</think>" in prefix_text:
                    think_closed = True
                    awaiting_response = True

            # Prefill <think> if in response_prefix mode and no prefix
            if not prefix_text and force_thinking and r_mode == "response_prefix":
                if self.model_type == "token":
                    think_id = self.tokenizer.token_to_id("<think>")
                    if think_id is not None:
                        tt = torch.tensor([[think_id]], dtype=torch.long, device=self.device)
                        _, hidden = self.model(tt, hidden)
                        input_token = tt
                        generated = "<think>"
                        yield "<think>"
                        in_reasoning = True
                else:
                    # Char mode: prime hidden with "<think>"
                    think_ids = [self.stoi.get(ch, 0) for ch in "<think>"]
                    tt = torch.tensor([think_ids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(tt, hidden)
                    input_token = torch.tensor([[think_ids[-1]]], dtype=torch.long, device=self.device)
                    generated = "<think>"
                    for ch in "<think>":
                        yield ch
                    in_reasoning = True

            # Optional fixed reasoning opener fed through the model.
            if cfg.get("reasoning_start", False) and not prefix_text:
                prefix = f"I need to think about this. The user said '{prompt}'"
                if self.model_type == "token":
                    pids = self.tokenizer.encode(prefix).ids
                    pt = torch.tensor([pids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(pt, hidden)
                    input_token = torch.tensor([[pids[-1]]], dtype=torch.long, device=self.device)
                    generated += prefix
                    yield prefix
                else:
                    for ch in prefix:
                        idx = self.stoi.get(ch, 0)
                        it = torch.tensor([[idx]], dtype=torch.long, device=self.device)
                        _, hidden = self.model(it, hidden)
                        input_token = it
                        generated += ch
                        yield ch

            # Get special token IDs
            eos_id = None
            think_open_id = None
            think_close_id = None
            if self.model_type == "token":
                eos_id = self.tokenizer.token_to_id(eos_token_str)
                think_open_id = self.tokenizer.token_to_id("<think>")
                think_close_id = self.tokenizer.token_to_id("</think>")

            # ------------------------------------------------------------------
            # Token generation (exactly as local)
            # ------------------------------------------------------------------
            if self.model_type == "token":
                # Accumulates partial UTF-8 sequences until they decode cleanly.
                byte_buf = b""
                for step in range(max_len):
                    logits, hidden = self.model(input_token, hidden)
                    lf = logits[0, -1].float() / max(temperature, 1e-8)

                    # Repetition penalty
                    if penalty != 1.0 and len(recent_tokens) > 0:
                        penalized_ids = set(recent_tokens)
                        for token_id in penalized_ids:
                            if token_id < lf.size(0):
                                lf[token_id] /= penalty

                    # Top-K
                    if top_k > 0:
                        tv, _ = torch.topk(lf, min(top_k, lf.size(-1)))
                        lf[lf < tv[-1]] = float("-inf")

                    # Top-P (nucleus): drop the tail past cumulative prob top_p,
                    # always keeping the single most likely token.
                    if top_p < 1.0:
                        sl, si = torch.sort(lf, descending=True)
                        cp = torch.cumsum(torch.softmax(sl, dim=-1), dim=-1)
                        rm = cp > top_p
                        rm[..., 1:] = rm[..., :-1].clone()
                        rm[..., 0] = False
                        lf[si[rm]] = float("-inf")

                    nxt = torch.multinomial(torch.softmax(lf, dim=-1), 1).item()

                    # EOS handling with forced minimum response: ignore EOS until
                    # the post-</think> answer has at least min_response_tokens.
                    if nxt == eos_id:
                        if awaiting_response and response_token_count < min_response_tokens:
                            continue
                        else:
                            break

                    recent_tokens.append(nxt)

                    # Update reasoning state
                    if think_open_id is not None and nxt == think_open_id:
                        in_reasoning = True
                        reasoning_toks = 0
                    if think_close_id is not None and nxt == think_close_id:
                        in_reasoning = False
                        think_closed = True
                        awaiting_response = True
                        response_token_count = 0

                    if in_reasoning and not think_closed:
                        reasoning_toks += 1
                        if max_reasoning_tokens and reasoning_toks >= max_reasoning_tokens:
                            # Force close think: flush pending bytes, emit the
                            # closing tag, and feed </think> back into the model.
                            if byte_buf:
                                decoded = byte_buf.decode('utf-8', errors='replace')
                                generated += decoded
                                yield decoded
                                byte_buf = b""
                            yield "</think>"
                            in_reasoning = False
                            think_closed = True
                            awaiting_response = True
                            response_token_count = 0
                            ct = torch.tensor([[think_close_id]], dtype=torch.long, device=self.device)
                            _, hidden = self.model(ct, hidden)
                            input_token = ct
                            recent_tokens.append(think_close_id)
                            continue

                    if awaiting_response and nxt != think_close_id:
                        response_token_count += 1
                    elif not in_reasoning and not think_closed:
                        response_token_count += 1

                    # Output token: buffer bytes until they form valid UTF-8.
                    tok_str = self.tokenizer.id_to_token(nxt) or ""
                    byte_buf += _tok_to_bytes(tok_str)
                    try:
                        decoded = byte_buf.decode('utf-8')
                        generated += decoded
                        yield decoded
                        byte_buf = b""
                    except UnicodeDecodeError:
                        pass  # incomplete multi-byte sequence; wait for more

                    input_token = torch.tensor([[nxt]], dtype=torch.long, device=self.device)

                if byte_buf:
                    leftover = byte_buf.decode('utf-8', errors='replace')
                    generated += leftover
                    yield leftover

            # ------------------------------------------------------------------
            # Character generation (exactly as local)
            # ------------------------------------------------------------------
            else:
                for step in range(max_len):
                    logits, hidden = self.model(input_token, hidden)
                    lf = logits[0, -1].float() / max(temperature, 1e-8)

                    # Repetition penalty on characters
                    if penalty != 1.0 and len(recent_tokens) > 0:
                        # recent_tokens stores characters (strings)
                        penalized_chars = set(recent_tokens)
                        for ch in penalized_chars:
                            idx = self.stoi.get(ch, 0)
                            if idx < lf.size(0):
                                lf[idx] /= penalty

                    if top_k > 0:
                        tv, _ = torch.topk(lf, min(top_k, lf.size(-1)))
                        lf[lf < tv[-1]] = float("-inf")

                    if top_p < 1.0:
                        sl, si = torch.sort(lf, descending=True)
                        cp = torch.cumsum(torch.softmax(sl, dim=-1), dim=-1)
                        rm = cp > top_p
                        rm[..., 1:] = rm[..., :-1].clone()
                        rm[..., 0] = False
                        lf[si[rm]] = float("-inf")

                    idx = torch.multinomial(torch.softmax(lf, dim=-1), 1).item()
                    char = self.itos[idx]
                    recent_tokens.append(char)
                    input_token = torch.tensor([[idx]], dtype=torch.long, device=self.device)

                    # Stop when "###" appears — the start of the next chat tag.
                    if char == "#" and generated.endswith("##"):
                        break

                    generated += char
                    yield char


# ─────────────────────────────────────────
# 🧰 Helpers (unchanged from original HF)
# ─────────────────────────────────────────
def parse_think_tags(text: str):
    """Split *text* into (thinking, visible, think_complete).

    Returns (None, text, False) when there is no <think> tag; otherwise the
    reasoning inside the tags, the text outside them, and whether the
    </think> close tag has been seen yet.
    """
    if "<think>" not in text:
        return None, text, False
    before, rest = text.split("<think>", 1)
    if "</think>" in rest:
        inner, after = rest.split("</think>", 1)
        return inner.strip(), (before + after).strip(), True
    # Still reasoning: everything after <think> is thinking so far.
    return rest.strip(), before.strip(), False


def extract_current_topic(thinking_text: str) -> str:
    """Return a short status line from the latest **bold** heading in the
    reasoning text, or a generic placeholder."""
    if not thinking_text:
        return "Reasoning..."
    matches = re.findall(r'\*\*([^*]+)\*\*', thinking_text)
    if matches:
        return f"Reasoning: {matches[-1].strip()}"
    return "Reasoning..."
def format_message(visible: str, thinking: str | None, think_complete: bool,
                   think_elapsed: float | None,
                   current_topic: str = "Reasoning...") -> str:
    """Render an assistant message, wrapping any reasoning in a collapsible
    <details> block styled by the .think-* CSS rules.

    While reasoning is still streaming the block is kept open and shows the
    current topic; once complete it collapses and shows the elapsed time.
    """
    if not thinking:
        return visible
    if think_complete and think_elapsed is not None:
        summary = f"💭 Thought for {think_elapsed:.1f}s"
        open_attr = ""
    else:
        summary = current_topic
        open_attr = "open"  # keep the panel expanded while still thinking
    think_block = (f"<details class='think-details' {open_attr}>"
                   f"<summary class='think-summary'>{summary}</summary>"
                   f"<div class='think-content'>{thinking}</div>"
                   f"</details>")
    if visible.strip():
        return think_block + "\n\n" + visible
    return think_block


def extract_html_canvas(text: str):
    """Pull the first ```html fenced block out of *text*.

    Returns (html_code, text_without_block) or (None, text) when absent.
    """
    pattern = r"```html\s*\n([\s\S]*?)```"
    match = re.search(pattern, text, re.IGNORECASE)
    if match:
        html_code = match.group(1)
        cleaned_text = text[:match.start()] + text[match.end():]
        return html_code.strip(), cleaned_text.strip()
    return None, text


def make_canvas_html(code: str) -> str:
    """Wrap *code* in a sandboxed iframe (scripts allowed, no same-origin)."""
    # Escape double quotes so the markup survives inside srcdoc="...".
    escaped = code.replace('"', "&quot;")
    return (f"<div class='canvas-wrapper'>"
            f"<div class='canvas-label'>🖼️ HTML Canvas</div>"
            f'<iframe class="canvas-frame" srcdoc="{escaped}" sandbox="allow-scripts"></iframe>'
            f"</div>")


# ─────────────────────────────────────────
# 🚀 Auto-load
# ─────────────────────────────────────────
_startup_model = None
_startup_cfg = None
_startup_msg = "⚠️ No model found. Place your `.pt` in the Space root."

if MODEL_PATH.exists():
    try:
        _startup_cfg = load_config()
        _startup_model = LinnyModel(MODEL_PATH, _startup_cfg, tokenizer_path=TOKENIZER_PATH)
        wm = _startup_cfg.get("working_memory", 0)
        mtype = _startup_cfg.get("model_type", "char")
        epoch = _startup_model.epoch
        _startup_msg = (f"✅ Model auto-loaded! ({_startup_cfg['hidden_layers']}L × {_startup_cfg['neurons']}N, "
                        f"epoch {epoch}, {mtype}" + (f", {wm} ctx)" if wm else ")"))
        print(_startup_msg)
    except Exception as e:
        # Best-effort startup: surface the failure in the UI instead of crashing.
        _startup_msg = f"❌ Auto-load failed: {e}"
        print(_startup_msg)

# ─────────────────────────────────────────
# 🎨 CSS (unchanged)
# ─────────────────────────────────────────
CSS = """
html, body { height: 100%; margin: 0; }
.gradio-container { max-width: 100% !important; padding: 0 !important; height: 100vh; display: flex; flex-direction: column; }
footer { display: none !important; }
.app-header { text-align: center; padding: 18px 0 10px; background: linear-gradient(135deg, #1a1a2e 0%, #16213e 100%); border-bottom: 1px solid #2d2d4e; }
.app-header h1 { font-size: 2em; margin: 0; background: linear-gradient(90deg, #a78bfa, #818cf8); -webkit-background-clip: text; -webkit-text-fill-color: transparent; }
.app-header p { color: #666; margin: 4px 0 0; font-size: 0.85em; }
#chatbox { flex: 1; min-height: 0; }
.gradio-chatbot { height: calc(100vh - 260px) !important; border-radius: 12px !important; background: #0f0f1a !important; border: 1px solid #2d2d4e !important; }
.input-row { padding: 10px 16px; background: #0f0f1a; border-top: 1px solid #2d2d4e; }
.think-details { margin: 0 0 10px 0; border-left: 3px solid #7c3aed; border-radius: 0 8px 8px 0; background: #13111c; overflow: hidden; }
.think-summary { cursor: pointer; padding: 8px 12px; color: #a78bfa; font-size: 0.82em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.8px; list-style: none; user-select: none; }
.think-summary::-webkit-details-marker { display: none; }
.think-summary::before { content: "▶ "; font-size: 0.7em; }
details[open] .think-summary::before { content: "▼ "; }
.think-content { padding: 10px 14px; color: #c4b5fd; font-style: italic; font-size: 0.87em; line-height: 1.6; border-top: 1px solid #2d1f5e; white-space: pre-wrap; max-height: 320px; overflow-y: auto; }
.canvas-wrapper { margin: 12px 0; border-radius: 10px; overflow: hidden; border: 1px solid #3d2d6e; background: #0d0d1a; }
.canvas-label { padding: 7px 14px; background: #1e1040; color: #a78bfa; font-size: 0.78em; font-weight: 600; letter-spacing: 0.5px; }
.canvas-frame { width: 100%; min-height: 300px; border: none; display: block; background: white; }
.status-bar { font-size: 0.8em; padding: 4px 0; color: #666; }
.reasoning-row { padding: 4px 0 0 2px; }
.tab-nav { background: #0f0f1a !important; border-bottom: 1px solid #2d2d4e !important; }
"""


# ─────────────────────────────────────────
# 🖥️ UI (identical to original HF but with penalty window)
# ─────────────────────────────────────────
def build_ui():
    """Build and return the Gradio Blocks app (chat tab + admin tab)."""
    cfg = load_config()

    with gr.Blocks(title="Linny AI", css=CSS) as demo:
        # Per-session model/config so user-uploaded models don't leak between users.
        session_model = gr.State(_startup_model)
        session_cfg = gr.State(_startup_cfg)

        gr.HTML("""
        <div class="app-header">
            <h1>🤖 Linny AI</h1>
            <p>LSTM Language Model · Runs locally on the Space</p>
        </div>
        """)

        with gr.Tabs():
            with gr.TabItem("💬 Chat"):
                model_status = gr.Markdown(value=_startup_msg, elem_classes=["status-bar"])
                chatbot = gr.Chatbot(elem_id="chatbox", label="", render_markdown=True)
                canvas_display = gr.HTML(visible=False)

                with gr.Row(elem_classes=["input-row"]):
                    msg_box = gr.Textbox(placeholder="Message Linny…", show_label=False,
                                         scale=8, lines=1, autofocus=True)
                    send_btn = gr.Button("Send ↩", variant="primary", scale=1)

                with gr.Row(elem_classes=["reasoning-row"]):
                    reasoning_toggle = gr.Checkbox(value=False,
                                                   label="🧠 Force Reasoning (pre-fills <think> token)",
                                                   scale=1)

                with gr.Accordion("⚙️ Generation Settings", open=False):
                    with gr.Row():
                        temp_sl = gr.Slider(0.1, 1.5, value=cfg.get("default_temp", 0.8), step=0.05, label="Temperature")
                        penalty_sl = gr.Slider(1.0, 2.0, value=cfg.get("default_penalty", 1.05), step=0.05, label="Repetition Penalty")
                    with gr.Row():
                        penalty_window_sl = gr.Slider(10, 500, value=cfg.get("default_penalty_window", 110), step=10, label="Penalty Window")
                    with gr.Row():
                        topp_sl = gr.Slider(0.0, 1.0, value=cfg.get("default_top_p", 0.4), step=0.05, label="Top-P")
                        topk_sl = gr.Slider(0, 100, value=cfg.get("default_top_k", 65), step=1, label="Top-K (0 = off)")
                        max_len_sl = gr.Slider(100, 8000, value=cfg.get("default_max_len", 2600), step=50, label="Max Response Length")

                with gr.Accordion("📤 Upload Your Own Model (optional)", open=False):
                    gr.Markdown("Upload a `.pt` file and optionally a tokenizer `.json` for token-based models.")
                    with gr.Row():
                        user_pt = gr.File(label="Upload .pt file", file_types=[".pt"])
                        user_tok = gr.File(label="Upload tokenizer .json (token model only)", file_types=[".json"])
                    model_type_radio = gr.Radio(choices=["char", "token"], value="char", label="Model Type")
                    with gr.Row():
                        u_layers = gr.Number(value=5, label="Hidden Layers", precision=0)
                        u_neurons = gr.Number(value=768, label="Neurons", precision=0)
                        u_embed = gr.Number(value=384, label="Embed Size", precision=0)
                        u_dropout = gr.Number(value=0.2, label="Dropout", precision=2)
                        u_working_mem = gr.Number(value=2048, label="Working Memory (0=off)", precision=0)
                    with gr.Row():
                        u_user_tag = gr.Textbox(value="### Instruction:", label="User Tag")
                        u_bot_tag = gr.Textbox(value="### Response:", label="Bot Tag")
                        u_eos = gr.Textbox(value="<|end|>", label="EOS Token")
                    load_user_btn = gr.Button("🚀 Load My Model", variant="secondary")
                    user_load_status = gr.Markdown("")

            with gr.TabItem("🔒 Admin"):
                gr.Markdown("## Admin Panel\nEdit `config.json` settings.")
                with gr.Row():
                    admin_pw = gr.Textbox(label="Admin Password", type="password",
                                          placeholder="Enter password", scale=3)
                    admin_login_btn = gr.Button("Login", scale=1)
                admin_status = gr.Markdown("")

                with gr.Group(visible=False) as admin_panel:
                    gr.Markdown("### Model Architecture")
                    model_type_admin = gr.Radio(choices=["char", "token"], value=cfg.get("model_type", "char"), label="Model Type")
                    with gr.Row():
                        a_layers = gr.Number(value=cfg.get("hidden_layers", 5), label="Hidden Layers", precision=0)
                        a_neurons = gr.Number(value=cfg.get("neurons", 768), label="Neurons", precision=0)
                        a_embed = gr.Number(value=cfg.get("embed_size", 384), label="Embed Size", precision=0)
                        a_dropout = gr.Number(value=cfg.get("dropout", 0.2), label="Dropout", precision=2)
                        a_working_mem = gr.Number(value=cfg.get("working_memory", 2048), label="Working Memory (0=off)", precision=0)
                    gr.Markdown("### Tags & Tokens")
                    with gr.Row():
                        a_user_tag = gr.Textbox(value=cfg.get("user_tag", "### Instruction:"), label="User Tag")
                        a_bot_tag = gr.Textbox(value=cfg.get("bot_tag", "### Response:"), label="Bot Tag")
                        a_eos = gr.Textbox(value=cfg.get("eos_token", "<|end|>"), label="EOS Token")
                    a_sys = gr.Textbox(value=cfg.get("system_prompt", "You are a helpful AI named Linny."),
                                       label="System Prompt", lines=2)
                    gr.Markdown("### Reasoning")
                    a_reasoning_mode = gr.Radio(choices=["prompt_suffix", "response_prefix"],
                                                value=cfg.get("reasoning_mode", "response_prefix"),
                                                label="Force Reasoning Mode")
                    a_reasoning_start = gr.Checkbox(value=cfg.get("reasoning_start", False),
                                                    label="Enable Reasoning Start Prefix")
                    a_min_response_tokens = gr.Number(value=cfg.get("min_response_tokens", 3),
                                                      label="Minimum Response Tokens After </think>", precision=0)
                    a_max_reasoning_tokens = gr.Number(value=cfg.get("max_reasoning_tokens", 2500),
                                                       label="Max Reasoning Tokens", precision=0)
                    gr.Markdown("### Default Generation Settings")
                    with gr.Row():
                        a_temp = gr.Slider(0.1, 1.5, value=cfg.get("default_temp", 0.8), step=0.05, label="Default Temperature")
                        a_penalty = gr.Slider(1.0, 2.0, value=cfg.get("default_penalty", 1.05), step=0.05, label="Default Penalty")
                    with gr.Row():
                        a_penalty_window = gr.Slider(10, 500, value=cfg.get("default_penalty_window", 110), step=10, label="Default Penalty Window")
                    with gr.Row():
                        a_topp = gr.Slider(0.0, 1.0, value=cfg.get("default_top_p", 0.4), step=0.05, label="Default Top-P")
                        a_topk = gr.Slider(0, 100, value=cfg.get("default_top_k", 65), step=1, label="Default Top-K")
                        a_maxlen = gr.Slider(100, 8000, value=cfg.get("default_max_len", 2600), step=50, label="Default Max Length")
                    save_cfg_btn = gr.Button("💾 Save config.json", variant="primary")
                    save_status = gr.Markdown("")

        # Callbacks
        def do_admin_login(pw):
            """Reveal the admin panel on a correct password."""
            if pw == ADMIN_PASSWORD:
                return gr.update(visible=True), "✅ Logged in."
            return gr.update(visible=False), "❌ Incorrect password."

        admin_login_btn.click(do_admin_login, inputs=[admin_pw],
                              outputs=[admin_panel, admin_status])

        def do_save_config(mtype, layers, neurons, embed, dropout, working_mem,
                           user_tag, bot_tag, eos, sys_prompt, reasoning_mode,
                           reasoning_start, min_response_tokens, max_reasoning_tokens,
                           temp, penalty, penalty_window, top_p, top_k, max_len):
            """Persist the admin panel's values to config.json."""
            try:
                new_cfg = {
                    "model_type": mtype,
                    "hidden_layers": int(layers),
                    "neurons": int(neurons),
                    "embed_size": int(embed),
                    "dropout": float(dropout),
                    "working_memory": int(working_mem),
                    "user_tag": user_tag,
                    "bot_tag": bot_tag,
                    "eos_token": eos,
                    "system_prompt": sys_prompt,
                    "reasoning_mode": reasoning_mode,
                    "reasoning_start": bool(reasoning_start),
                    "min_response_tokens": int(min_response_tokens),
                    "max_reasoning_tokens": int(max_reasoning_tokens),
                    "default_temp": float(temp),
                    "default_penalty": float(penalty),
                    "default_penalty_window": int(penalty_window),
                    "default_top_p": float(top_p),
                    "default_top_k": int(top_k),
                    "default_max_len": int(max_len),
                }
                save_config(new_cfg)
                return "✅ config.json saved! Restart the Space to apply changes."
            except Exception as e:
                return f"❌ Error: {e}"

        save_cfg_btn.click(do_save_config,
                           inputs=[model_type_admin, a_layers, a_neurons, a_embed,
                                   a_dropout, a_working_mem, a_user_tag, a_bot_tag,
                                   a_eos, a_sys, a_reasoning_mode, a_reasoning_start,
                                   a_min_response_tokens, a_max_reasoning_tokens,
                                   a_temp, a_penalty, a_penalty_window, a_topp,
                                   a_topk, a_maxlen],
                           outputs=[save_status])

        def load_user_model(pt_file, tok_file, mtype, layers, neurons, embed,
                            dropout, working_mem, user_tag, bot_tag, eos):
            """Load a user-uploaded checkpoint into this session's state."""
            if pt_file is None:
                return None, None, "❌ Please upload a .pt file first."
            try:
                user_cfg = {
                    "model_type": mtype,
                    "hidden_layers": int(layers),
                    "neurons": int(neurons),
                    "embed_size": int(embed),
                    "dropout": float(dropout),
                    "working_memory": int(working_mem),
                    "user_tag": user_tag,
                    "bot_tag": bot_tag,
                    "eos_token": eos,
                    # Reasoning behaviour is inherited from the Space config.
                    "reasoning_mode": cfg.get("reasoning_mode", "response_prefix"),
                    "reasoning_start": cfg.get("reasoning_start", False),
                    "min_response_tokens": cfg.get("min_response_tokens", 3),
                    "max_reasoning_tokens": cfg.get("max_reasoning_tokens", 2500),
                }
                tok_path = tok_file.name if tok_file else None
                m = LinnyModel(pt_file.name, user_cfg, tokenizer_path=tok_path)
                return m, user_cfg, (f"✅ Model loaded! ({user_cfg['hidden_layers']}L × "
                                     f"{user_cfg['neurons']}N, {mtype}, epoch {m.epoch})")
            except Exception as e:
                return None, None, f"❌ Error: {e}"

        load_user_btn.click(load_user_model,
                            inputs=[user_pt, user_tok, model_type_radio, u_layers,
                                    u_neurons, u_embed, u_dropout, u_working_mem,
                                    u_user_tag, u_bot_tag, u_eos],
                            outputs=[session_model, session_cfg, user_load_status])

        def respond(message, history, model, temp, penalty, penalty_window,
                    top_p, top_k, max_len, force_thinking):
            """Stream a reply: update the chat, the input box, and the canvas."""
            if not message.strip():
                yield history, "", gr.update(visible=False)
                return
            if model is None:
                yield (history + [{"role": "user", "content": message},
                                  {"role": "assistant", "content": "⚠️ No model loaded."}],
                       "", gr.update(visible=False))
                return

            history = history + [{"role": "user", "content": message},
                                 {"role": "assistant", "content": ""}]
            full_response = ""
            stop = False
            think_start = None
            think_end = None
            try:
                for chunk in model.stream_generate(
                        message,
                        temperature=float(temp),
                        max_len=int(max_len),
                        penalty=float(penalty),
                        penalty_window=int(penalty_window),
                        top_p=float(top_p),
                        top_k=int(top_k),
                        force_thinking=bool(force_thinking),
                        min_response_tokens=model.config.get("min_response_tokens", 3),
                        max_reasoning_tokens=model.config.get("max_reasoning_tokens", 2500),
                ):
                    full_response += chunk
                    # Stop at EOS if present (char models emit it as plain text).
                    eos_str = model.config.get("eos_token", "<|end|>")
                    if eos_str in full_response:
                        full_response = full_response[:full_response.find(eos_str)]
                        if full_response.count("<think>") > full_response.count("</think>"):
                            # EOS landed inside an unclosed think block: repair it.
                            full_response += "</think>"
                        else:
                            stop = True

                    thinking, visible, think_complete = parse_think_tags(full_response)
                    if thinking and think_start is None:
                        think_start = time.time()
                    if think_complete and think_end is None and think_start is not None:
                        think_end = time.time()
                    think_elapsed = ((think_end - think_start) if (think_end and think_start)
                                     else (time.time() - think_start if think_start else None))
                    current_topic = extract_current_topic(thinking) if thinking else "Reasoning..."

                    html_code, visible_clean = extract_html_canvas(visible)
                    canvas_update = gr.update(visible=bool(html_code),
                                              value=make_canvas_html(html_code) if html_code else "")
                    history[-1] = {"role": "assistant",
                                   "content": format_message(visible_clean, thinking,
                                                             think_complete, think_elapsed,
                                                             current_topic)}
                    yield history, "", canvas_update
                    if stop:
                        break
            except Exception as e:
                history[-1] = {"role": "assistant", "content": f"⚠️ Generation error: {e}"}
                yield history, "", gr.update(visible=False)
                return

            # Final pass: render the finished message with the think block closed.
            thinking, visible, think_complete = parse_think_tags(full_response)
            if think_end is None and think_start is not None:
                think_end = time.time()
            think_elapsed = (think_end - think_start) if (think_end and think_start) else None
            if think_complete and not visible.strip():
                visible = "*(no response generated)*"
            current_topic = extract_current_topic(thinking) if thinking else "Reasoning..."
            html_code, visible_clean = extract_html_canvas(visible)
            history[-1] = {"role": "assistant",
                           "content": format_message(visible_clean, thinking, True,
                                                     think_elapsed, current_topic)}
            canvas_update = gr.update(visible=bool(html_code),
                                      value=make_canvas_html(html_code) if html_code else "")
            yield history, "", canvas_update

        shared_inputs = [msg_box, chatbot, session_model, temp_sl, penalty_sl,
                         penalty_window_sl, topp_sl, topk_sl, max_len_sl,
                         reasoning_toggle]
        shared_outputs = [chatbot, msg_box, canvas_display]
        send_btn.click(respond, inputs=shared_inputs, outputs=shared_outputs)
        msg_box.submit(respond, inputs=shared_inputs, outputs=shared_outputs)

    return demo


if __name__ == "__main__":
    build_ui().launch(ssr_mode=False)