| import os |
| import re |
| import json |
| import time |
| import torch |
| import torch.nn as nn |
| import gradio as gr |
| from pathlib import Path |
| from collections import deque |
|
|
| |
| |
| |
# Password for the Admin tab; override via the ADMIN_PASSWORD env var.
# NOTE(review): the fallback "linny-admin" is a hard-coded default — set the
# env var in any deployment that should actually be protected.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "linny-admin")
|
|
| |
| |
| |
# All assets (checkpoint, tokenizer, config) live next to this file.
SPACE_ROOT = Path(__file__).parent
CONFIG_PATH = SPACE_ROOT / "config.json"

# First *.pt file in sorted (name) order is used as the model checkpoint.
_pt_files = sorted(SPACE_ROOT.glob("*.pt"))
MODEL_PATH = _pt_files[0] if _pt_files else SPACE_ROOT / "default_model.pt"

# Any *.json other than config.json is assumed to be a tokenizer file.
_tok_files = [f for f in SPACE_ROOT.glob("*.json") if f.name != "config.json"]
TOKENIZER_PATH = _tok_files[0] if _tok_files else None
|
|
| |
| |
| |
# Fallback configuration; load_config() overlays config.json on top of this,
# so every key used elsewhere in the app must be present here.
DEFAULT_CONFIG = {
    # --- model architecture ---
    "model_type": "char",            # "char" or "token"
    "hidden_layers": 5,
    "neurons": 768,                  # LSTM hidden size
    "embed_size": 384,
    "dropout": 0.2,
    "vocab_size": 20000,
    "working_memory": 2048,          # context budget in chars/tokens; 0 = unlimited
    # --- prompt formatting ---
    "user_tag": "### Instruction:",
    "bot_tag": "### Response:",
    "eos_token": "<|end|>",
    "system_prompt": "You are a helpful and intelligent AI assistant named Linny.",
    # --- default sampling settings (UI slider defaults) ---
    "default_temp": 0.8,
    "default_penalty": 1.05,
    "default_penalty_window": 110,
    "default_top_p": 0.4,
    "default_top_k": 65,
    "default_max_len": 2600,
    # --- reasoning (<think>...</think>) behavior ---
    "reasoning_mode": "response_prefix",   # or "prompt_suffix" (appends "/think")
    "reasoning_start": False,              # prepend a canned reasoning opener
    "min_response_tokens": 3,              # don't accept EOS before this many answer tokens
    "max_reasoning_tokens": 2500,          # force-close <think> after this many tokens
}
|
|
def load_config() -> dict:
    """Return config.json merged over DEFAULT_CONFIG.

    Missing keys are filled in from DEFAULT_CONFIG.  If the file is absent,
    unreadable, corrupt JSON, or not a JSON object, fall back to a copy of
    the defaults instead of crashing at startup.
    """
    if CONFIG_PATH.exists():
        try:
            with open(CONFIG_PATH) as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            # A broken config file should not take the whole Space down.
            print(f"⚠️ Could not read {CONFIG_PATH} ({e}); using defaults.")
            return DEFAULT_CONFIG.copy()
        if not isinstance(data, dict):
            # e.g. a bare list/string in config.json — unusable as a config.
            return DEFAULT_CONFIG.copy()
        for k, v in DEFAULT_CONFIG.items():
            data.setdefault(k, v)
        return data
    return DEFAULT_CONFIG.copy()
|
|
def save_config(cfg: dict):
    """Persist *cfg* to config.json, pretty-printed with 2-space indent."""
    CONFIG_PATH.write_text(json.dumps(cfg, indent=2))
|
| |
| |
| |
class LSTMCharLM(nn.Module):
    """Character-level language model: embedding -> stacked LSTM -> vocab logits.

    Attribute names (embed / lstm / fc) are part of the checkpoint contract:
    load_state_dict relies on them, so they must not be renamed.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        # Inter-layer dropout only applies between stacked LSTM layers.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Return (logits over vocab per position, new hidden state)."""
        embedded = self.embed(x)
        out, hidden = self.lstm(embedded, hidden)
        logits = self.fc(out)
        return logits, hidden
|
|
class LSTMTokenLM(nn.Module):
    """Subword-token language model: embedding -> stacked LSTM -> vocab logits.

    Structurally identical to LSTMCharLM but kept as a separate class so
    checkpoints of either kind load against their expected class name.
    Attribute names (embed / lstm / fc) match the saved state_dict keys.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        # PyTorch warns if dropout > 0 with a single layer; zero it out then.
        effective_dropout = dropout if num_layers > 1 else 0
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=effective_dropout,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Return (per-position vocab logits, updated LSTM hidden state)."""
        out, hidden = self.lstm(self.embed(x), hidden)
        return self.fc(out), hidden
|
|
| |
| |
| |
| def _build_byte_decoder(): |
| bs = (list(range(ord('!'), ord('~')+1)) + |
| list(range(ord('¡'), ord('¬')+1)) + |
| list(range(ord('®'), ord('ÿ')+1))) |
| cs = bs[:] |
| n = 0 |
| for b in range(256): |
| if b not in bs: |
| bs.append(b) |
| cs.append(256+n) |
| n += 1 |
| return {chr(c): b for b, c in zip(bs, cs)} |
|
|
| _BYTE_DECODER = _build_byte_decoder() |
|
|
def _tok_to_bytes(tok_str):
    """Map a BPE token string back to its raw bytes via _BYTE_DECODER.

    If the token contains any character outside the byte-decoder table,
    fall back to plain UTF-8 encoding (with replacement for unencodables).
    """
    raw = bytearray()
    try:
        for ch in tok_str:
            raw.append(_BYTE_DECODER[ch])
    except KeyError:
        return tok_str.encode('utf-8', errors='replace')
    return bytes(raw)
|
|
| |
| |
| |
class LinnyModel:
    """Loaded LSTM checkpoint (char- or token-level) plus its streaming sampler.

    Attributes set by __init__:
      config     -- configuration dict passed in by the caller
      model_type -- "char" or "token" (from config)
      device     -- CUDA if available, else CPU
      tokenizer  -- HF ``tokenizers.Tokenizer`` for token models, else None
      chars/stoi/itos -- char vocabulary tables for char models, else None
      model      -- LSTMCharLM / LSTMTokenLM in eval mode
      epoch      -- epoch recorded in the checkpoint ('?' if absent)
    """

    def __init__(self, pt_path, config: dict, tokenizer_path=None):
        """Load the checkpoint at *pt_path* and build the matching model.

        Raises FileNotFoundError when a token model has no usable tokenizer.
        """
        self.config = config
        self.model_type = config.get("model_type", "char")
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # weights_only=False because the checkpoint carries plain Python
        # objects (char list, arch config).  NOTE(review): this runs pickle —
        # only load trusted checkpoints.
        ckpt = torch.load(pt_path, map_location=self.device, weights_only=False)

        if self.model_type == "token":
            # Imported lazily so char-only deployments don't need `tokenizers`.
            from tokenizers import Tokenizer as HFTokenizer
            tok_path = tokenizer_path or config.get("tokenizer_path")
            if not tok_path or not Path(str(tok_path)).exists():
                raise FileNotFoundError(f"Tokenizer not found: {tok_path}")
            self.tokenizer = HFTokenizer.from_file(str(tok_path))
            vocab_size = self.tokenizer.get_vocab_size()
            self.chars = None; self.stoi = None; self.itos = None
            # Architecture hyper-params saved inside the checkpoint take
            # precedence over the UI/config values.
            arch = ckpt.get('config', {})
            layers = arch.get('hidden_layers', config['hidden_layers'])
            neurons = arch.get('neurons', config['neurons'])
            embed = arch.get('embed_size', config['embed_size'])
            dropout = arch.get('dropout', config.get('dropout', 0.2))
            self.model = LSTMTokenLM(vocab_size, embed, neurons, layers, dropout).to(self.device)
            self.model.load_state_dict(ckpt['model_state'])
        else:
            # Char model: vocabulary is the character list stored in the checkpoint.
            self.tokenizer = None
            self.chars = ckpt["chars"]
            self.stoi = {ch: i for i, ch in enumerate(self.chars)}
            self.itos = {i: ch for i, ch in enumerate(self.chars)}
            dropout = ckpt.get("config", {}).get("dropout", config.get("dropout", 0.2))
            self.model = LSTMCharLM(len(self.chars), config["embed_size"],
                                    config["neurons"], config["hidden_layers"], dropout).to(self.device)
            self.model.load_state_dict(ckpt["model_state"])

        self.model.eval()
        self.epoch = ckpt.get('epoch', '?')

    def stream_generate(self, prompt, temperature=0.8, max_len=2600,
                        penalty=1.05, penalty_window=110,
                        top_p=0.4, top_k=65, force_thinking=False,
                        prefix_text="", penalize_prefix=False,
                        min_response_tokens=3, max_reasoning_tokens=2500):
        """
        Generator yielding decoded text chunks for *prompt*.

        Matches local linny_server.py generate_stream exactly.

        Sampling: temperature scaling, sliding-window repetition penalty,
        top-k then top-p (nucleus) filtering, multinomial draw.
        Reasoning: tracks <think>/</think> state so EOS can be deferred until
        at least *min_response_tokens* answer tokens exist, and reasoning is
        force-closed after *max_reasoning_tokens*.
        *prefix_text* pre-seeds the generation (e.g. to resume a stream);
        *penalize_prefix* also feeds it into the repetition-penalty window.
        """
        cfg = self.config
        user_tag = cfg.get("user_tag", "### Instruction:")
        bot_tag = cfg.get("bot_tag", "### Response:")
        r_mode = cfg.get("reasoning_mode", "response_prefix")
        eos_token_str = cfg.get("eos_token", "<|end|>")

        # In prompt_suffix mode, forced reasoning is requested by appending "/think".
        actual_prompt = prompt
        if force_thinking and r_mode == "prompt_suffix":
            if not prompt.strip().endswith("/think"):
                actual_prompt = prompt.strip() + " /think"

        formatted = f"{user_tag}\n{actual_prompt}\n\n{bot_tag}\n"

        # Cap the response so prompt + response fit the working-memory budget
        # (budget measured in chars here — approximate for token models).
        working_memory = cfg.get("working_memory", 0)
        if working_memory > 0:
            max_len = max(50, min(max_len, working_memory - len(formatted)))

        hidden = None
        generated = ""
        recent_tokens = deque(maxlen=penalty_window)  # repetition-penalty window

        # Reasoning state machine.
        in_reasoning = False        # currently inside an open <think>
        think_closed = False        # a </think> has been produced
        awaiting_response = False   # reasoning finished; counting answer tokens
        response_token_count = 0
        reasoning_toks = 0

        with torch.no_grad():
            # Encode the formatted prompt and prime the LSTM hidden state.
            if self.model_type == "token":
                ids = self.tokenizer.encode(formatted).ids
            else:
                ids = [self.stoi.get(c, 0) for c in formatted]

            t = torch.tensor([ids], dtype=torch.long, device=self.device)
            _, hidden = self.model(t, hidden)
            input_token = torch.tensor([[ids[-1]]], dtype=torch.long, device=self.device)

            # Optional caller-supplied continuation prefix: feed it through the
            # model so the hidden state reflects it, and treat it as already
            # generated text (not yielded — the caller already has it).
            if prefix_text:
                if self.model_type == "token":
                    prefix_ids = self.tokenizer.encode(prefix_text).ids
                else:
                    prefix_ids = [self.stoi.get(c, 0) for c in prefix_text]
                if prefix_ids:
                    pt = torch.tensor([prefix_ids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(pt, hidden)
                    generated = prefix_text
                    input_token = torch.tensor([[prefix_ids[-1]]], dtype=torch.long, device=self.device)
                    if penalize_prefix:
                        recent_tokens.extend(prefix_ids)
                # Resume the reasoning state machine from the prefix content.
                if "<think>" in prefix_text and "</think>" not in prefix_text:
                    in_reasoning = True
                elif "</think>" in prefix_text:
                    think_closed = True
                    awaiting_response = True

            # response_prefix mode: force reasoning by injecting "<think>"
            # directly into the response stream (and yielding it to the UI).
            if not prefix_text and force_thinking and r_mode == "response_prefix":
                if self.model_type == "token":
                    think_id = self.tokenizer.token_to_id("<think>")
                    if think_id is not None:
                        tt = torch.tensor([[think_id]], dtype=torch.long, device=self.device)
                        _, hidden = self.model(tt, hidden)
                        input_token = tt
                        generated = "<think>"
                        yield "<think>"
                        in_reasoning = True
                else:
                    # Char model: feed "<think>" character by character.
                    think_ids = [self.stoi.get(ch, 0) for ch in "<think>"]
                    tt = torch.tensor([think_ids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(tt, hidden)
                    input_token = torch.tensor([[think_ids[-1]]], dtype=torch.long, device=self.device)
                    generated = "<think>"
                    for ch in "<think>":
                        yield ch
                    in_reasoning = True

            # Optional canned reasoning opener (config "reasoning_start").
            if cfg.get("reasoning_start", False) and not prefix_text:
                prefix = f"I need to think about this. The user said '{prompt}'"
                if self.model_type == "token":
                    pids = self.tokenizer.encode(prefix).ids
                    pt = torch.tensor([pids], dtype=torch.long, device=self.device)
                    _, hidden = self.model(pt, hidden)
                    input_token = torch.tensor([[pids[-1]]], dtype=torch.long, device=self.device)
                    generated += prefix
                    yield prefix
                else:
                    for ch in prefix:
                        idx = self.stoi.get(ch, 0)
                        it = torch.tensor([[idx]], dtype=torch.long, device=self.device)
                        _, hidden = self.model(it, hidden)
                        input_token = it
                        generated += ch
                        yield ch

            # Resolve special token ids once (token models only).
            eos_id = None
            think_open_id = None
            think_close_id = None
            if self.model_type == "token":
                eos_id = self.tokenizer.token_to_id(eos_token_str)
                think_open_id = self.tokenizer.token_to_id("<think>")
                think_close_id = self.tokenizer.token_to_id("</think>")

            # ---------------- token-level sampling loop ----------------
            if self.model_type == "token":
                # Tokens decode to bytes; buffer until we have valid UTF-8.
                byte_buf = b""
                for step in range(max_len):
                    logits, hidden = self.model(input_token, hidden)
                    lf = logits[0, -1].float() / max(temperature, 1e-8)

                    # Repetition penalty over the recent-token window.
                    if penalty != 1.0 and len(recent_tokens) > 0:
                        penalized_ids = set(recent_tokens)
                        for token_id in penalized_ids:
                            if token_id < lf.size(0):
                                lf[token_id] /= penalty

                    # Top-k filter.
                    if top_k > 0:
                        tv, _ = torch.topk(lf, min(top_k, lf.size(-1)))
                        lf[lf < tv[-1]] = float("-inf")
                    # Top-p (nucleus) filter; always keep the best token.
                    if top_p < 1.0:
                        sl, si = torch.sort(lf, descending=True)
                        cp = torch.cumsum(torch.softmax(sl, dim=-1), dim=-1)
                        rm = cp > top_p
                        rm[..., 1:] = rm[..., :-1].clone()
                        rm[..., 0] = False
                        lf[si[rm]] = float("-inf")

                    nxt = torch.multinomial(torch.softmax(lf, dim=-1), 1).item()

                    # EOS: reject it while the answer is still too short.
                    if nxt == eos_id:
                        if awaiting_response and response_token_count < min_response_tokens:
                            continue
                        else:
                            break

                    recent_tokens.append(nxt)

                    # Track <think>/</think> transitions.
                    if think_open_id is not None and nxt == think_open_id:
                        in_reasoning = True
                        reasoning_toks = 0
                    if think_close_id is not None and nxt == think_close_id:
                        in_reasoning = False
                        think_closed = True
                        awaiting_response = True
                        response_token_count = 0

                    # Force-close run-away reasoning after the token budget.
                    if in_reasoning and not think_closed:
                        reasoning_toks += 1
                        if max_reasoning_tokens and reasoning_toks >= max_reasoning_tokens:
                            # Flush pending bytes before injecting "</think>".
                            if byte_buf:
                                decoded = byte_buf.decode('utf-8', errors='replace')
                                generated += decoded
                                yield decoded
                                byte_buf = b""
                            yield "</think>"
                            in_reasoning = False
                            think_closed = True
                            awaiting_response = True
                            response_token_count = 0
                            # Feed the injected </think> into the model state too.
                            ct = torch.tensor([[think_close_id]], dtype=torch.long, device=self.device)
                            _, hidden = self.model(ct, hidden)
                            input_token = ct
                            recent_tokens.append(think_close_id)
                            continue

                    # Count answer tokens (after </think>, or with no reasoning at all).
                    if awaiting_response and nxt != think_close_id:
                        response_token_count += 1
                    elif not in_reasoning and not think_closed:
                        response_token_count += 1

                    # Decode: accumulate token bytes, emit once valid UTF-8.
                    tok_str = self.tokenizer.id_to_token(nxt) or ""
                    byte_buf += _tok_to_bytes(tok_str)
                    try:
                        decoded = byte_buf.decode('utf-8')
                        generated += decoded
                        yield decoded
                        byte_buf = b""
                    except UnicodeDecodeError:
                        pass  # incomplete multi-byte sequence; wait for more tokens

                    input_token = torch.tensor([[nxt]], dtype=torch.long, device=self.device)

                # Flush any trailing partial bytes with replacement chars.
                if byte_buf:
                    leftover = byte_buf.decode('utf-8', errors='replace')
                    generated += leftover
                    yield leftover

            # ---------------- char-level sampling loop ----------------
            else:
                for step in range(max_len):
                    logits, hidden = self.model(input_token, hidden)
                    lf = logits[0, -1].float() / max(temperature, 1e-8)

                    # Repetition penalty — recent_tokens holds characters here.
                    if penalty != 1.0 and len(recent_tokens) > 0:
                        penalized_chars = set(recent_tokens)
                        for ch in penalized_chars:
                            idx = self.stoi.get(ch, 0)
                            if idx < lf.size(0):
                                lf[idx] /= penalty

                    if top_k > 0:
                        tv, _ = torch.topk(lf, min(top_k, lf.size(-1)))
                        lf[lf < tv[-1]] = float("-inf")
                    if top_p < 1.0:
                        sl, si = torch.sort(lf, descending=True)
                        cp = torch.cumsum(torch.softmax(sl, dim=-1), dim=-1)
                        rm = cp > top_p
                        rm[..., 1:] = rm[..., :-1].clone()
                        rm[..., 0] = False
                        lf[si[rm]] = float("-inf")

                    idx = torch.multinomial(torch.softmax(lf, dim=-1), 1).item()
                    char = self.itos[idx]
                    recent_tokens.append(char)
                    input_token = torch.tensor([[idx]], dtype=torch.long, device=self.device)

                    # "###" begins the next instruction header -> stop.
                    # (The trailing "##" already yielded remains in the stream.)
                    if char == "#" and generated.endswith("##"):
                        break
                    generated += char
                    yield char
|
|
| |
| |
| |
def parse_think_tags(text: str):
    """Split *text* around a <think>...</think> span.

    Returns (thinking, visible, closed):
      - no <think> tag       -> (None, text, False)
      - open, unclosed tag   -> (text after <think>, text before it, False)
      - properly closed tag  -> (inner text, before + after joined, True)
    """
    marker = "<think>"
    if marker not in text:
        return None, text, False
    before, _, rest = text.partition(marker)
    inner, closer, after = rest.partition("</think>")
    if closer:
        return inner.strip(), (before + after).strip(), True
    return inner.strip(), before.strip(), False
|
|
def extract_current_topic(thinking_text: str) -> str:
    """Summarize live reasoning as "Reasoning: <last **bold** phrase>".

    Falls back to "Reasoning..." when the text is empty or has no bold span.
    """
    if thinking_text:
        bold_spans = re.findall(r'\*\*([^*]+)\*\*', thinking_text)
        if bold_spans:
            return f"Reasoning: {bold_spans[-1].strip()}"
    return "Reasoning..."
|
|
| def format_message(visible: str, thinking: str | None, |
| think_complete: bool, think_elapsed: float | None, |
| current_topic: str = "Reasoning...") -> str: |
| if not thinking: |
| return visible |
| if think_complete and think_elapsed is not None: |
| summary = f"💭 Thought for {think_elapsed:.1f}s" |
| open_attr = "" |
| else: |
| summary = current_topic |
| open_attr = "" |
| think_block = (f"<details class='think-details'{open_attr}>" |
| f"<summary class='think-summary'>{summary}</summary>" |
| f"<div class='think-content'>{thinking}</div>" |
| f"</details>") |
| if visible.strip(): |
| return think_block + "\n\n" + visible |
| return think_block |
|
|
def extract_html_canvas(text: str):
    """Pull the first ```html fenced block out of *text*.

    Returns (code, cleaned_text); (None, text) when no fenced block exists.
    """
    fence = re.search(r"```html\s*\n([\s\S]*?)```", text, re.IGNORECASE)
    if fence is None:
        return None, text
    remaining = text[:fence.start()] + text[fence.end():]
    return fence.group(1).strip(), remaining.strip()
|
|
def make_canvas_html(code: str) -> str:
    """Wrap *code* in a sandboxed <iframe srcdoc="..."> preview card.

    The code is embedded inside a double-quoted HTML attribute, so it must be
    attribute-escaped: '&' first (otherwise literal entities such as '&amp;'
    in the code would be re-decoded by the browser), then '"' (which would
    otherwise terminate the attribute and break the markup).
    """
    escaped = code.replace("&", "&amp;").replace('"', "&quot;")
    return (f"<div class='canvas-wrapper'>"
            f"<div class='canvas-label'>🖼️ HTML Canvas</div>"
            f'<iframe class="canvas-frame" srcdoc="{escaped}" '
            f'sandbox="allow-scripts" scrolling="auto"></iframe>'
            f"</div>")
|
|
| |
| |
| |
# --- Auto-load a model at import time so the UI starts ready-to-chat. ---
_startup_model = None
_startup_cfg = None
_startup_msg = "⚠️ No model found. Place your `.pt` in the Space root."

if MODEL_PATH.exists():
    try:
        _startup_cfg = load_config()
        _startup_model = LinnyModel(MODEL_PATH, _startup_cfg, tokenizer_path=TOKENIZER_PATH)
        wm = _startup_cfg.get("working_memory", 0)
        mtype = _startup_cfg.get("model_type", "char")
        epoch = _startup_model.epoch
        _startup_msg = (f"✅ Model auto-loaded! ({_startup_cfg['hidden_layers']}L × {_startup_cfg['neurons']}N, "
                        f"epoch {epoch}, {mtype}" + (f", {wm} ctx)" if wm else ")"))
        print(_startup_msg)
    except Exception as e:
        # Broad catch is deliberate: a bad checkpoint or config must not
        # prevent the UI from starting — the failure is surfaced in the UI.
        _startup_msg = f"❌ Auto-load failed: {e}"
        print(_startup_msg)
|
|
| |
| |
| |
# Stylesheet injected into gr.Blocks(css=...): full-height dark chat layout,
# collapsible "think" blocks, and the HTML-canvas iframe card.
CSS = """
html, body { height: 100%; margin: 0; }
.gradio-container { max-width: 100% !important; padding: 0 !important; height: 100vh; display: flex; flex-direction: column; }
footer { display: none !important; }
.app-header { text-align: center; padding: 18px 0 10px; background: linear-gradient(135deg, #1a1a2e 0%, #16213e 100%); border-bottom: 1px solid #2d2d4e; }
.app-header h1 { font-size: 2em; margin: 0; background: linear-gradient(90deg, #a78bfa, #818cf8); -webkit-background-clip: text; -webkit-text-fill-color: transparent; }
.app-header p { color: #666; margin: 4px 0 0; font-size: 0.85em; }
#chatbox { flex: 1; min-height: 0; }
.gradio-chatbot { height: calc(100vh - 260px) !important; border-radius: 12px !important; background: #0f0f1a !important; border: 1px solid #2d2d4e !important; }
.input-row { padding: 10px 16px; background: #0f0f1a; border-top: 1px solid #2d2d4e; }
.think-details { margin: 0 0 10px 0; border-left: 3px solid #7c3aed; border-radius: 0 8px 8px 0; background: #13111c; overflow: hidden; }
.think-summary { cursor: pointer; padding: 8px 12px; color: #a78bfa; font-size: 0.82em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.8px; list-style: none; user-select: none; }
.think-summary::-webkit-details-marker { display: none; }
.think-summary::before { content: "▶ "; font-size: 0.7em; }
details[open] .think-summary::before { content: "▼ "; }
.think-content { padding: 10px 14px; color: #c4b5fd; font-style: italic; font-size: 0.87em; line-height: 1.6; border-top: 1px solid #2d1f5e; white-space: pre-wrap; max-height: 320px; overflow-y: auto; }
.canvas-wrapper { margin: 12px 0; border-radius: 10px; overflow: hidden; border: 1px solid #3d2d6e; background: #0d0d1a; }
.canvas-label { padding: 7px 14px; background: #1e1040; color: #a78bfa; font-size: 0.78em; font-weight: 600; letter-spacing: 0.5px; }
.canvas-frame { width: 100%; min-height: 300px; border: none; display: block; background: white; }
.status-bar { font-size: 0.8em; padding: 4px 0; color: #666; }
.reasoning-row { padding: 4px 0 0 2px; }
.tab-nav { background: #0f0f1a !important; border-bottom: 1px solid #2d2d4e !important; }
"""
|
|
| |
| |
| |
def build_ui():
    """Build and return the Gradio Blocks app (Chat tab + password-gated Admin tab).

    Widgets, event handlers, and their wiring all live in this one function so
    every closure can see the components it needs.  Per-session model state is
    held in gr.State so an uploaded model does not affect other visitors.
    """
    cfg = load_config()
    with gr.Blocks(title="Linny AI", css=CSS) as demo:
        # Session-scoped model/config; seeded from the auto-loaded startup model.
        session_model = gr.State(_startup_model)
        session_cfg = gr.State(_startup_cfg)

        gr.HTML("""
        <div class='app-header'>
            <h1>🤖 Linny AI</h1>
            <p>LSTM Language Model · Runs locally on the Space</p>
        </div>
        """)

        with gr.Tabs():
            # ---------------- Chat tab ----------------
            with gr.TabItem("💬 Chat"):
                model_status = gr.Markdown(value=_startup_msg, elem_classes=["status-bar"])
                chatbot = gr.Chatbot(elem_id="chatbox", label="", render_markdown=True)
                # Hidden until a response contains a ```html block.
                canvas_display = gr.HTML(visible=False)

                with gr.Row(elem_classes=["input-row"]):
                    msg_box = gr.Textbox(placeholder="Message Linny…", show_label=False, scale=8, lines=1, autofocus=True)
                    send_btn = gr.Button("Send ↩", variant="primary", scale=1)

                with gr.Row(elem_classes=["reasoning-row"]):
                    reasoning_toggle = gr.Checkbox(value=False, label="🧠 Force Reasoning (pre-fills <think> token)", scale=1)

                # Sampling controls, seeded from config defaults.
                with gr.Accordion("⚙️ Generation Settings", open=False):
                    with gr.Row():
                        temp_sl = gr.Slider(0.1, 1.5, value=cfg.get("default_temp", 0.8), step=0.05, label="Temperature")
                        penalty_sl = gr.Slider(1.0, 2.0, value=cfg.get("default_penalty", 1.05), step=0.05, label="Repetition Penalty")
                    with gr.Row():
                        penalty_window_sl = gr.Slider(10, 500, value=cfg.get("default_penalty_window", 110), step=10, label="Penalty Window")
                    with gr.Row():
                        topp_sl = gr.Slider(0.0, 1.0, value=cfg.get("default_top_p", 0.4), step=0.05, label="Top-P")
                        topk_sl = gr.Slider(0, 100, value=cfg.get("default_top_k", 65), step=1, label="Top-K (0 = off)")
                        max_len_sl = gr.Slider(100, 8000, value=cfg.get("default_max_len", 2600), step=50, label="Max Response Length")

                # Visitor-supplied checkpoint; loads only into this session's State.
                with gr.Accordion("📤 Upload Your Own Model (optional)", open=False):
                    gr.Markdown("Upload a `.pt` file and optionally a tokenizer `.json` for token-based models.")
                    with gr.Row():
                        user_pt = gr.File(label="Upload .pt file", file_types=[".pt"])
                        user_tok = gr.File(label="Upload tokenizer .json (token model only)", file_types=[".json"])
                    model_type_radio = gr.Radio(choices=["char", "token"], value="char", label="Model Type")
                    with gr.Row():
                        u_layers = gr.Number(value=5, label="Hidden Layers", precision=0)
                        u_neurons = gr.Number(value=768, label="Neurons", precision=0)
                        u_embed = gr.Number(value=384, label="Embed Size", precision=0)
                        u_dropout = gr.Number(value=0.2, label="Dropout", precision=2)
                        u_working_mem = gr.Number(value=2048, label="Working Memory (0=off)", precision=0)
                    with gr.Row():
                        u_user_tag = gr.Textbox(value="### Instruction:", label="User Tag")
                        u_bot_tag = gr.Textbox(value="### Response:", label="Bot Tag")
                        u_eos = gr.Textbox(value="<|end|>", label="EOS Token")
                    load_user_btn = gr.Button("🚀 Load My Model", variant="secondary")
                    user_load_status = gr.Markdown("")

            # ---------------- Admin tab ----------------
            with gr.TabItem("🔒 Admin"):
                gr.Markdown("## Admin Panel\nEdit `config.json` settings.")
                with gr.Row():
                    admin_pw = gr.Textbox(label="Admin Password", type="password", placeholder="Enter password", scale=3)
                    admin_login_btn = gr.Button("Login", scale=1)
                admin_status = gr.Markdown("")
                # The whole editor is hidden until a successful login.
                with gr.Group(visible=False) as admin_panel:
                    gr.Markdown("### Model Architecture")
                    model_type_admin = gr.Radio(choices=["char", "token"], value=cfg.get("model_type", "char"), label="Model Type")
                    with gr.Row():
                        a_layers = gr.Number(value=cfg.get("hidden_layers", 5), label="Hidden Layers", precision=0)
                        a_neurons = gr.Number(value=cfg.get("neurons", 768), label="Neurons", precision=0)
                        a_embed = gr.Number(value=cfg.get("embed_size", 384), label="Embed Size", precision=0)
                        a_dropout = gr.Number(value=cfg.get("dropout", 0.2), label="Dropout", precision=2)
                        a_working_mem = gr.Number(value=cfg.get("working_memory", 2048), label="Working Memory (0=off)", precision=0)

                    gr.Markdown("### Tags & Tokens")
                    with gr.Row():
                        a_user_tag = gr.Textbox(value=cfg.get("user_tag", "### Instruction:"), label="User Tag")
                        a_bot_tag = gr.Textbox(value=cfg.get("bot_tag", "### Response:"), label="Bot Tag")
                        a_eos = gr.Textbox(value=cfg.get("eos_token", "<|end|>"), label="EOS Token")
                    a_sys = gr.Textbox(value=cfg.get("system_prompt", "You are a helpful AI named Linny."), label="System Prompt", lines=2)

                    gr.Markdown("### Reasoning")
                    a_reasoning_mode = gr.Radio(choices=["prompt_suffix", "response_prefix"], value=cfg.get("reasoning_mode", "response_prefix"), label="Force Reasoning Mode")
                    a_reasoning_start = gr.Checkbox(value=cfg.get("reasoning_start", False), label="Enable Reasoning Start Prefix")
                    a_min_response_tokens = gr.Number(value=cfg.get("min_response_tokens", 3), label="Minimum Response Tokens After </think>", precision=0)
                    a_max_reasoning_tokens = gr.Number(value=cfg.get("max_reasoning_tokens", 2500), label="Max Reasoning Tokens", precision=0)

                    gr.Markdown("### Default Generation Settings")
                    with gr.Row():
                        a_temp = gr.Slider(0.1, 1.5, value=cfg.get("default_temp", 0.8), step=0.05, label="Default Temperature")
                        a_penalty = gr.Slider(1.0, 2.0, value=cfg.get("default_penalty", 1.05), step=0.05, label="Default Penalty")
                    with gr.Row():
                        a_penalty_window = gr.Slider(10, 500, value=cfg.get("default_penalty_window", 110), step=10, label="Default Penalty Window")
                    with gr.Row():
                        a_topp = gr.Slider(0.0, 1.0, value=cfg.get("default_top_p", 0.4), step=0.05, label="Default Top-P")
                        a_topk = gr.Slider(0, 100, value=cfg.get("default_top_k", 65), step=1, label="Default Top-K")
                        a_maxlen = gr.Slider(100, 8000, value=cfg.get("default_max_len", 2600), step=50, label="Default Max Length")

                    save_cfg_btn = gr.Button("💾 Save config.json", variant="primary")
                    save_status = gr.Markdown("")

        # ---------------- event handlers ----------------
        def do_admin_login(pw):
            # Reveals the admin panel only on an exact password match.
            if pw == ADMIN_PASSWORD:
                return gr.update(visible=True), "✅ Logged in."
            return gr.update(visible=False), "❌ Incorrect password."
        admin_login_btn.click(do_admin_login, inputs=[admin_pw], outputs=[admin_panel, admin_status])

        def do_save_config(mtype, layers, neurons, embed, dropout, working_mem,
                           user_tag, bot_tag, eos, sys_prompt,
                           reasoning_mode, reasoning_start,
                           min_response_tokens, max_reasoning_tokens,
                           temp, penalty, penalty_window, top_p, top_k, max_len):
            # Rebuilds config.json from the admin widgets (takes effect on restart).
            try:
                new_cfg = {
                    "model_type": mtype,
                    "hidden_layers": int(layers),
                    "neurons": int(neurons),
                    "embed_size": int(embed),
                    "dropout": float(dropout),
                    "working_memory": int(working_mem),
                    "user_tag": user_tag,
                    "bot_tag": bot_tag,
                    "eos_token": eos,
                    "system_prompt": sys_prompt,
                    "reasoning_mode": reasoning_mode,
                    "reasoning_start": bool(reasoning_start),
                    "min_response_tokens": int(min_response_tokens),
                    "max_reasoning_tokens": int(max_reasoning_tokens),
                    "default_temp": float(temp),
                    "default_penalty": float(penalty),
                    "default_penalty_window": int(penalty_window),
                    "default_top_p": float(top_p),
                    "default_top_k": int(top_k),
                    "default_max_len": int(max_len),
                }
                save_config(new_cfg)
                return "✅ config.json saved! Restart the Space to apply changes."
            except Exception as e:
                return f"❌ Error: {e}"
        save_cfg_btn.click(do_save_config,
                           inputs=[model_type_admin, a_layers, a_neurons, a_embed, a_dropout, a_working_mem,
                                   a_user_tag, a_bot_tag, a_eos, a_sys,
                                   a_reasoning_mode, a_reasoning_start,
                                   a_min_response_tokens, a_max_reasoning_tokens,
                                   a_temp, a_penalty, a_penalty_window, a_topp, a_topk, a_maxlen],
                           outputs=[save_status])

        def load_user_model(pt_file, tok_file, mtype, layers, neurons, embed, dropout,
                            working_mem, user_tag, bot_tag, eos):
            # Builds a per-session LinnyModel from an uploaded checkpoint.
            if pt_file is None:
                return None, None, "❌ Please upload a .pt file first."
            try:
                user_cfg = {
                    "model_type": mtype,
                    "hidden_layers": int(layers),
                    "neurons": int(neurons),
                    "embed_size": int(embed),
                    "dropout": float(dropout),
                    "working_memory": int(working_mem),
                    "user_tag": user_tag,
                    "bot_tag": bot_tag,
                    "eos_token": eos,
                    # Reasoning settings are inherited from the server config.
                    "reasoning_mode": cfg.get("reasoning_mode", "response_prefix"),
                    "reasoning_start": cfg.get("reasoning_start", False),
                    "min_response_tokens": cfg.get("min_response_tokens", 3),
                    "max_reasoning_tokens": cfg.get("max_reasoning_tokens", 2500),
                }
                tok_path = tok_file.name if tok_file else None
                m = LinnyModel(pt_file.name, user_cfg, tokenizer_path=tok_path)
                return m, user_cfg, f"✅ Model loaded! ({user_cfg['hidden_layers']}L × {user_cfg['neurons']}N, {mtype}, epoch {m.epoch})"
            except Exception as e:
                return None, None, f"❌ Error: {e}"
        load_user_btn.click(load_user_model,
                            inputs=[user_pt, user_tok, model_type_radio,
                                    u_layers, u_neurons, u_embed, u_dropout, u_working_mem,
                                    u_user_tag, u_bot_tag, u_eos],
                            outputs=[session_model, session_cfg, user_load_status])

        def respond(message, history, model, temp, penalty, penalty_window, top_p, top_k, max_len, force_thinking):
            """Streaming chat handler: yields (history, msg_box, canvas) updates."""
            if not message.strip():
                yield history, "", gr.update(visible=False)
                return
            if model is None:
                yield history + [{"role":"user","content":message},{"role":"assistant","content":"⚠️ No model loaded."}], "", gr.update(visible=False)
                return

            history = history + [{"role":"user","content":message},{"role":"assistant","content":""}]
            full_response = ""
            stop = False
            think_start = None   # wall-clock timing of the <think> span
            think_end = None

            try:
                for chunk in model.stream_generate(
                    message,
                    temperature=float(temp),
                    max_len=int(max_len),
                    penalty=float(penalty),
                    penalty_window=int(penalty_window),
                    top_p=float(top_p),
                    top_k=int(top_k),
                    force_thinking=bool(force_thinking),
                    min_response_tokens=model.config.get("min_response_tokens", 3),
                    max_reasoning_tokens=model.config.get("max_reasoning_tokens", 2500),
                ):
                    full_response += chunk
                    # Truncate at EOS; close a dangling <think> so it still renders.
                    eos_str = model.config.get("eos_token", "<|end|>")
                    if eos_str in full_response:
                        full_response = full_response[:full_response.find(eos_str)]
                        if full_response.count("<think>") > full_response.count("</think>"):
                            full_response += "</think>"
                        else:
                            stop = True

                    thinking, visible, think_complete = parse_think_tags(full_response)
                    if thinking and think_start is None:
                        think_start = time.time()
                    if think_complete and think_end is None and think_start is not None:
                        think_end = time.time()
                    think_elapsed = (think_end - think_start) if (think_end and think_start) else (time.time() - think_start if think_start else None)
                    current_topic = extract_current_topic(thinking) if thinking else "Reasoning..."
                    html_code, visible_clean = extract_html_canvas(visible)
                    canvas_update = gr.update(visible=bool(html_code), value=make_canvas_html(html_code) if html_code else "")
                    history[-1] = {"role":"assistant","content":format_message(visible_clean, thinking, think_complete, think_elapsed, current_topic)}
                    yield history, "", canvas_update
                    if stop:
                        break
            except Exception as e:
                history[-1] = {"role":"assistant","content":f"⚠️ Generation error: {e}"}
                yield history, "", gr.update(visible=False)
                return

            # Final render after the stream ends (marks reasoning complete).
            thinking, visible, think_complete = parse_think_tags(full_response)
            if think_end is None and think_start is not None:
                think_end = time.time()
            think_elapsed = (think_end - think_start) if (think_end and think_start) else None
            if think_complete and not visible.strip():
                visible = "*(no response generated)*"
            current_topic = extract_current_topic(thinking) if thinking else "Reasoning..."
            html_code, visible_clean = extract_html_canvas(visible)
            history[-1] = {"role":"assistant","content":format_message(visible_clean, thinking, True, think_elapsed, current_topic)}
            canvas_update = gr.update(visible=bool(html_code), value=make_canvas_html(html_code) if html_code else "")
            yield history, "", canvas_update

        # Button click and Enter-in-textbox share the same handler/wiring.
        shared_inputs = [msg_box, chatbot, session_model,
                         temp_sl, penalty_sl, penalty_window_sl, topp_sl, topk_sl, max_len_sl, reasoning_toggle]
        shared_outputs = [chatbot, msg_box, canvas_display]
        send_btn.click(respond, inputs=shared_inputs, outputs=shared_outputs)
        msg_box.submit(respond, inputs=shared_inputs, outputs=shared_outputs)

    return demo
|
|
if __name__ == "__main__":
    # NOTE(review): ssr_mode=False presumably works around Gradio SSR issues
    # on Spaces — confirm against the deployed Gradio version.
    build_ui().launch(ssr_mode=False)