# -*- coding: utf-8 -*-
import gradio as gr
import requests
import json
import os
import datetime
import textwrap
import random
import threading
import webbrowser
import shutil
import time

# =========================================================
# --- VERSIONING ---
# =========================================================
VERSION = "1.0h"
# v1.0h: CRITICAL SPACING FIX: Replaced restrictive .isalnum() space re-insertion
# logic to correctly add spaces after punctuation marks when followed by a word token.

# =========================================================
# --- CONFIGURATION & DATA PERSISTENCE ---
# =========================================================
SETTINGS_FILE = "openrouter_settings.json"
HISTORY_FILE = "openrouter_history.json"
DEFAULT_MODEL = "deepseek/deepseek-chat-v3.1:free"
MAX_BACKUPS = 5

# Baseline parameter set; also used to back-fill missing keys in saved presets.
DEFAULT_SETTINGS = {
    "api_key": "",
    "system_prompt": "You are a helpful, creative, and friendly AI assistant running on an Android device.",
    "model": DEFAULT_MODEL,
    "temperature": 0.7,
    "max_tokens": 1024,
    "response_tokens": 0,
    "top_p": 0.9,
    "frequency_penalty": 0.0,
    "presence_penalty": 0.0,
    "stream": True,
    "top_k": 0,
    "min_p": 0.0,
    "top_a": 0.0,
    "repetition_penalty": 1.0,
    "seed": -1,
}

# Seed values applied when the user creates a brand-new preset.
NEW_PRESET_SETTINGS = {
    "api_key": " ",
    "system_prompt": "You are a helpful assistant.",
    "model": "cognitivecomputations/dolphin-mistral-24b-venice-edition:free",
    "temperature": 1.05,
    "max_tokens": 0,
    "response_tokens": 2272,
    "top_p": 0.65,
    "frequency_penalty": 0.0,
    "presence_penalty": 0.0,
    "stream": True,
    "top_k": 0,
    "min_p": 0.0,
    "top_a": 0.0,
    "repetition_penalty": 1.39,
    "seed": -1,
}

# =========================================================
# --- Globals for incremental logging (in-memory only) ---
# =========================================================
last_logged_index = 0       # number of turns already persisted for the active session
active_session_name = None  # name of the session currently being appended to
_last_save_time = 0.0
_save_interval = 0.5


def load_data(file_path, default_data=None):
    """Load JSON data from *file_path*, returning *default_data* on any failure.

    The shape of *default_data* selects the parsing mode:
    - list default: expects a history payload (either a bare list or a dict
      with a "history" list key).
    - dict default: expects a settings payload; missing per-preset keys are
      back-filled from DEFAULT_SETTINGS.
    """
    if default_data is None:
        default_data = {}
    if os.path.exists(file_path):
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            # FIX: removed a stray `it` token that sat between json.load(f)
            # and the isinstance check in the original source.
            if isinstance(default_data, list):
                if isinstance(data, list):
                    return data
                # Legacy format: history wrapped in a dict.
                if isinstance(data, dict) and "history" in data and isinstance(data["history"], list):
                    return data["history"]
                return default_data
            if isinstance(data, dict):
                # Back-fill any settings keys added in newer versions.
                for key, default_val in DEFAULT_SETTINGS.items():
                    for preset_name in data.get('presets', {}):
                        if key not in data['presets'][preset_name]:
                            data['presets'][preset_name][key] = default_val
                return data
            return default_data
        except (json.JSONDecodeError, IOError):
            print(f"Warning: Could not parse {file_path}, using default.")
            return default_data
    return default_data


def save_data(file_path, data):
    """Saves dictionary or list as JSON (Used for history in this version)."""
    try:
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
    except IOError as e:
        print(f"Error saving {file_path}: {e}")


def create_rolling_backup(file_path, max_backups=MAX_BACKUPS):
    """Creates a rolling backup of the settings file before overwriting it.

    Backups are named ``<file>.b1`` (newest) through ``<file>.bN`` (oldest);
    the oldest is deleted and the rest shifted up before the new copy is made.
    """
    if not os.path.exists(file_path):
        return
    oldest_backup = f"{file_path}.b{max_backups}"
    if os.path.exists(oldest_backup):
        os.remove(oldest_backup)
    # Shift .b(i) -> .b(i+1), newest last so nothing is clobbered.
    for i in range(max_backups - 1, 0, -1):
        src = f"{file_path}.b{i}"
        dst = f"{file_path}.b{i+1}"
        if os.path.exists(src):
            try:
                os.rename(src, dst)
            except OSError as e:
                print(f"Warning: Could not rename {src} to {dst}: {e}")
    newest_backup = f"{file_path}.b1"
    try:
        shutil.copy2(file_path, newest_backup)
        print(f"Settings backed up to {newest_backup}")
    except IOError as e:
        print(f"Error creating backup {newest_backup}: {e}")


def save_settings_to_disk(file_path, data):
    """Handles backup and then saves the settings data."""
    create_rolling_backup(file_path)
    try:
        with open(file_path, "w", encoding="utf-8") as f:
            # ensure_ascii=False for consistency with save_data().
            json.dump(data, f, indent=4, ensure_ascii=False)
    except IOError as e:
        print(f"Error saving {file_path}: {e}")
# Initialize settings container
all_settings = load_data(SETTINGS_FILE, {
    "presets": {"Default": DEFAULT_SETTINGS},
    "last_preset_name": "Default"
})

# Ensure structure integrity
if "presets" not in all_settings or not isinstance(all_settings["presets"], dict):
    all_settings = {"presets": {"Default": DEFAULT_SETTINGS}, "last_preset_name": "Default"}

chat_history = load_data(HISTORY_FILE, [])
if not isinstance(chat_history, list):
    chat_history = []

# Load last preset
last_preset_name = all_settings.get("last_preset_name", "Default")
current_settings = all_settings["presets"].get(last_preset_name, DEFAULT_SETTINGS).copy()

# =========================================================
# --- Stop control (thread-safe) ---
# =========================================================
stop_event = threading.Event()


def quit_app():
    """Immediately quits the application via os._exit."""
    print("Quitting application...")
    # Use a timer to exit after a slight delay to allow the server to send the response.
    threading.Timer(0.1, os._exit, args=[0]).start()
    # CRITICAL FIX v1.0c: The function returns the update dictionaries.
    # NOTE(review): gr.Markdown.update is Gradio-3-era API — confirm against the
    # installed Gradio version.
    return gr.Markdown.update(value="👋 Shutting down...", visible=True), gr.update(interactive=False)


# =========================================================
# --- OPENROUTER API INTERACTION (STREAMING & FALLBACK) ---
# =========================================================
def call_openrouter_api(messages, settings):
    """Makes a streaming API call to OpenRouter, yields content chunks.

    Yields plain-text content chunks, or an error string prefixed with
    "ERROR:", "❌", "STREAM ERROR:", "⛔" or "💥". If the stream produced no
    content, silently retries once without streaming. Filters the
    <|begin of sentence|> marker token out of model output.
    """
    # FIX: guard against a missing/None "api_key" key — the original called
    # .strip() directly on settings.get("api_key") and could raise AttributeError.
    api_key = (settings.get("api_key") or "").strip()
    if not api_key:
        yield "ERROR: API Key not set in Settings."
        return

    url = "https://openrouter.ai/api/v1/chat/completions"
    try:
        payload = {
            "model": settings["model"],
            "messages": messages,
            "temperature": float(settings.get("temperature", 0.7)),
            "top_p": float(settings.get("top_p", 0.9)),
            "frequency_penalty": float(settings.get("frequency_penalty", 0.0)),
            "presence_penalty": float(settings.get("presence_penalty", 0.0)),
            "stream": True,
        }
        response_tokens = int(settings.get("response_tokens", 0))
        if response_tokens > 0:
            payload["max_tokens"] = response_tokens
        # Optional samplers: only send when they differ from the "disabled" value.
        for k in ["top_k", "min_p", "top_a", "repetition_penalty", "seed"]:
            v = settings.get(k)
            if v is not None:
                if k == "top_k" and int(v) == 0:
                    continue
                if k in ("min_p", "top_a") and float(v) == 0.0:
                    continue
                if k == "repetition_penalty" and float(v) == 1.0:
                    continue
                if k == "seed" and int(v) == -1:
                    continue
                payload[k] = v
    except ValueError as e:
        yield f"ERROR: Invalid setting type. {e}"
        return

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    # Filter tokens (v1.0a: half-width, v1.0g: full-width).
    # NOTE(review): both constants currently hold the same string — the
    # full-width variant may have been lost in an earlier edit; confirm.
    TOKEN_TO_FILTER_HALF = "<|begin of sentence|>"
    TOKEN_TO_FILTER_FULL = "<|begin of sentence|>"

    try:
        response = requests.post(url, headers=headers, json=payload, stream=True, timeout=60)
        if response.status_code != 200:
            try:
                msg = response.json().get('error', {}).get('message', response.text)
            except Exception:
                msg = response.text
            yield f"❌ API request failed ({response.status_code}): {msg}"
            return

        got_content = False
        for line_bytes in response.iter_lines():
            if stop_event.is_set():
                yield "⛔ Inference stopped by user."
                return
            if not line_bytes:
                continue
            try:
                line = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                print("Warning: Failed to decode a line from API stream.")
                continue

            if line.startswith("data:"):
                data_str = line[len("data:"):].strip()
                if data_str == "[DONE]":
                    break
                try:
                    chunk = json.loads(data_str)
                except Exception:
                    continue
                choices = chunk.get("choices", [])
                if not choices:
                    continue
                choice0 = choices[0]
                delta = choice0.get("delta", {})
                content = delta.get("content")
                if content:
                    # CRITICAL FIX v1.0g: filter both half- and full-width tokens.
                    if TOKEN_TO_FILTER_FULL in content:
                        content = content.replace(TOKEN_TO_FILTER_FULL, "")
                    if TOKEN_TO_FILTER_HALF in content:
                        content = content.replace(TOKEN_TO_FILTER_HALF, "")
                    # CRITICAL FIX v1.0f: strip leading whitespace potentially left
                    # by token removal. NOTE(review): this also drops legitimate
                    # inter-token spaces; chat_interface compensates (v1.0h).
                    content = content.lstrip()
                    if content:  # Check if anything is left
                        got_content = True
                        yield content
                if choice0.get("finish_reason") is not None:
                    break
            elif line.startswith("event: error"):
                try:
                    errdata = json.loads(line[len("event: error"):].strip())
                    yield f"STREAM ERROR: {errdata.get('message', 'Unknown Stream Error')}"
                except Exception:
                    yield "STREAM ERROR: unknown"
                return

        if not got_content:
            # Silent fallback to non-streaming
            payload["stream"] = False
            try:
                r2 = requests.post(url, headers=headers, json=payload, timeout=60)
                if r2.status_code == 200:
                    j = r2.json()
                    content = None
                    choices = j.get("choices", [])
                    if choices:
                        first = choices[0]
                        # Accept the several response shapes OpenRouter may return.
                        if isinstance(first.get("message"), dict):
                            content = first["message"].get("content")
                        elif "text" in first:
                            content = first.get("text")
                        elif isinstance(first.get("delta"), dict):
                            content = first["delta"].get("content")
                    if content:
                        # Apply filters
                        if TOKEN_TO_FILTER_FULL in content:
                            content = content.replace(TOKEN_TO_FILTER_FULL, "")
                        if TOKEN_TO_FILTER_HALF in content:
                            content = content.replace(TOKEN_TO_FILTER_HALF, "")
                        # CRITICAL FIX v1.0f: strip leading/trailing whitespace.
                        content = content.strip()
                        if content:  # Check if anything is left after stripping
                            yield content
                    else:
                        yield "⚠️ No content received from fallback response."
                else:
                    yield f"❌ Fallback failed ({r2.status_code}): {r2.text}"
            except requests.exceptions.RequestException as e:
                yield f"❌ Fallback request error: {e}"
    except requests.exceptions.RequestException as e:
        yield f"❌ Request error: {e}"
    except Exception as e:
        yield f"💥 Unexpected error: {e}"


# =========================================================
# --- CHAT FUNCTIONALITY & STATE MANAGEMENT ---
# =========================================================
def convert_to_messages_format(history):
    """Convert list-of-[user, bot] pairs to Gradio 'messages' dicts."""
    new_history = []
    for user_msg, bot_msg in history:
        if user_msg:
            new_history.append({"role": "user", "content": user_msg})
        if bot_msg:
            new_history.append({"role": "assistant", "content": bot_msg})
    return new_history


def chat_interface(user_input, history_state):
    """Handles message submission and response generation with streaming.

    Generator yielding (chatbot messages, tuple-format state, textbox update)
    triples. The state is persisted after the user message, on every error/stop
    path, and once more after the full assistant reply.
    """
    global current_settings, _last_save_time
    if stop_event.is_set():
        stop_event.clear()

    if not user_input.strip():
        # Yields the converted history back to the chatbot and updates the state
        yield convert_to_messages_format(history_state), history_state, gr.update(interactive=True)
        return

    # Add user message to the internal list-of-tuples state
    history_state.append([user_input, None])

    # 1. IMMEDIATE SAVE OF USER MESSAGE 💾
    save_current_session(history_state)
    _last_save_time = time.time()

    # Prepare for response: send the current list-of-tuples state converted for
    # display, the state itself, and disable input.
    yield convert_to_messages_format(history_state), history_state, gr.update(value="", interactive=False)

    # Prepare API messages
    messages = [{"role": "system", "content": current_settings["system_prompt"]}]
    for user_msg, bot_msg in history_state[:-1]:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if bot_msg:
            messages.append({"role": "assistant", "content": bot_msg})
    messages.append({"role": "user", "content": user_input})

    full_reply = ""
    for chunk in call_openrouter_api(messages, current_settings):
        is_error_or_stop = isinstance(chunk, str) and (
            chunk.startswith("ERROR:") or chunk.startswith("❌") or chunk.startswith("💥")
        )
        if is_error_or_stop:
            full_reply += ("\n\n" + chunk) if full_reply else chunk
            history_state[-1][1] = full_reply
            save_current_session(history_state)
            yield convert_to_messages_format(history_state), history_state, gr.update(interactive=True)
            return

        # FIX: the original contained this stop-handling branch twice in a row;
        # the second copy was dead code and has been removed.
        if isinstance(chunk, str) and chunk.startswith("⛔ Inference stopped"):
            full_reply += "\n\n*(Inference stopped)*"
            history_state[-1][1] = full_reply
            save_current_session(history_state)
            yield convert_to_messages_format(history_state), history_state, gr.update(interactive=True)
            return

        # v1.0h CRITICAL SPACING FIX: re-insert the inter-chunk space that
        # call_openrouter_api's lstrip() removed, whenever the new chunk starts
        # with a word character.
        if full_reply and chunk:
            if full_reply[-1] != ' ' and chunk[0].isalnum():
                full_reply += " " + chunk
            else:
                full_reply += chunk
        else:
            full_reply += chunk  # Handles the very first chunk

        history_state[-1][1] = full_reply
        yield convert_to_messages_format(history_state), history_state, gr.update(interactive=False)

    # 2. FINAL SAVE OF ASSISTANT MESSAGE 💾
    history_state[-1][1] = full_reply
    save_current_session(history_state)
    _last_save_time = time.time()
    yield convert_to_messages_format(history_state), history_state, gr.update(interactive=True)


def save_current_session(history):
    """Saves session to history file incrementally (only new turns).

    Detects whether *history* continues the last persisted session (by name or
    by prefix match) and either updates/extends that entry or appends a new one.
    """
    global chat_history, last_logged_index, active_session_name
    if not history or not any(user_msg for user_msg, _ in history):
        return

    sanitized = [
        [user_msg, bot_msg if bot_msg else "*(Incomplete Response)*"]
        for user_msg, bot_msg in history if user_msg
    ]

    is_continuation = False
    last_entry = chat_history[-1] if chat_history else None
    if last_entry:
        if active_session_name and active_session_name == last_entry.get("name"):
            is_continuation = True
        else:
            # Prefix match: the live history extends the last stored session.
            existing_hist = last_entry.get("history", [])
            if len(existing_hist) <= len(sanitized) and existing_hist == sanitized[:len(existing_hist)]:
                active_session_name = last_entry.get("name")
                last_logged_index = len(existing_hist)
                is_continuation = True

    if is_continuation and last_entry:
        updated_history = last_entry.get("history", []).copy()
        # Case 1: Updating the last turn (assistant-complete save, equal length)
        if len(sanitized) == len(updated_history) and len(sanitized) > 0:
            updated_history[-1] = sanitized[-1]
        # Case 2: Appending new turns (next user-send save, live history longer)
        elif len(sanitized) > len(updated_history):
            start_idx = len(updated_history)
            novel_portion = sanitized[start_idx:]
            updated_history.extend(novel_portion)
        chat_history[-1] = {"name": last_entry.get("name"), "history": updated_history}
        save_data(HISTORY_FILE, chat_history)
        last_logged_index = len(sanitized)
    else:
        # New session: name it from the timestamp plus the first user message.
        first_message = history[0][0]
        name_base = textwrap.shorten(first_message, width=30, placeholder="...")
        proposed_name = f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {name_base}"
        chat_history.append({"name": proposed_name, "history": sanitized})
        save_data(HISTORY_FILE, chat_history)
        active_session_name = proposed_name
        last_logged_index = len(sanitized)


def new_chat(history):
    """Clears chat and saves current session. Resets incremental-log indices."""
    global last_logged_index, active_session_name
    if history:
        save_current_session(history)
    last_logged_index = 0
    active_session_name = None
    # Empty list for chatbot (messages format) and empty list for state (tuples format).
    return [], []


def get_history_names():
    """Return the display names of all stored sessions."""
    return [s["name"] for s in chat_history]


def load_history_session(name):
    """Loads session content for the history display (History tab only)."""
    if name is None:
        return []
    for session in chat_history:
        if session["name"] == name:
            # Load list-of-tuples and convert to messages format for display
            return convert_to_messages_format(session["history"])
    return []


def refresh_history_dropdown(history):
    """Return updated choices for the history dropdown (value cleared)."""
    return gr.update(choices=get_history_names(), value=None)


def set_stop_event():
    """Set the stop_event to true so the streaming loop will stop."""
    stop_event.set()
    return


def clear_history_log():
    """Backs up history file and clears chat_history in memory and on disk. (v1.0a)"""
    global chat_history, last_logged_index, active_session_name
    message = "History log already clear."
    if os.path.exists(HISTORY_FILE):
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"{HISTORY_FILE}.{timestamp}.ahv"
        try:
            shutil.copy2(HISTORY_FILE, backup_file)
            os.remove(HISTORY_FILE)
            message = f"History log backed up to `{backup_file}` and cleared."
        except Exception as e:
            message = f"ERROR backing up/clearing log: {e}"
    chat_history = []
    last_logged_index = 0
    active_session_name = None
    return (
        [],                                      # Clear history_display (messages format)
        gr.update(value=None, choices=[]),       # Clear visible_history_dropdown
        gr.update(value=None, choices=[]),       # Clear history_dropdown_ref
        message,                                 # Update history_message
        gr.update(choices=get_preset_names())    # Refresh preset_dropdown
    )


def copy_all_session_text(name):
    """Formats and returns the entire history session content as text for copying. (v1.0a)"""
    if name is None:
        return "No session selected."
    for session in chat_history:
        if session["name"] == name:
            text = f"--- Session: {name} ---\n\n"
            for user_msg, bot_msg in session["history"]:
                text += f"USER: {user_msg}\n"
                text += f"ASSISTANT: {bot_msg}\n\n"
            return text
    return "Session not found."
# =========================================================
# --- SETTINGS FUNCTIONALITY ---
# =========================================================
def get_preset_names():
    """Return the names of all saved presets."""
    return list(all_settings["presets"].keys())


def save_settings(
    preset_name, api_key, sys_prompt, model, temp, tokens, response_tokens,
    top_p, freq_p, pres_p, top_k, min_p, top_a, rep_p, seed
):
    """Save new or updated preset, with rolling backup."""
    global all_settings, current_settings
    # Empty name falls back to the last-used preset name.
    name_to_save = preset_name if preset_name.strip() else all_settings.get("last_preset_name", "Default")
    new_settings = {
        "api_key": api_key,
        "system_prompt": sys_prompt,
        "model": model,
        "temperature": temp,
        "max_tokens": tokens,
        "response_tokens": response_tokens,
        "top_p": top_p,
        "frequency_penalty": freq_p,
        "presence_penalty": pres_p,
        "stream": current_settings.get("stream", True),
        "top_k": top_k,
        "min_p": min_p,
        "top_a": top_a,
        "repetition_penalty": rep_p,
        "seed": seed,
    }
    all_settings["presets"][name_to_save] = new_settings
    all_settings["last_preset_name"] = name_to_save
    save_settings_to_disk(SETTINGS_FILE, all_settings)
    # NOTE(review): this condition is always true because last_preset_name was
    # just assigned above — confirm whether a different comparison was intended.
    if name_to_save == all_settings.get("last_preset_name", "Default"):
        current_settings = new_settings.copy()
    return f"Settings saved as '{name_to_save}'. Backups created (max {MAX_BACKUPS}).", gr.update(choices=get_preset_names(), value=name_to_save)


def load_settings(preset_name):
    """Load settings preset and update chat tab's model display.

    Returns a 17-tuple matching the outputs wired to preset_dropdown.change:
    14 parameter values, a status message, the model-display markdown, and a
    save-name textbox update.
    """
    global all_settings, current_settings
    if preset_name is None or preset_name not in all_settings["presets"]:
        settings_to_load = current_settings.copy()
    else:
        settings_to_load = all_settings["presets"].get(preset_name, DEFAULT_SETTINGS).copy()
    current_settings = settings_to_load.copy()
    if preset_name and preset_name in all_settings["presets"]:
        all_settings["last_preset_name"] = preset_name
        save_settings_to_disk(SETTINGS_FILE, all_settings)
    # Back-fill any keys missing from older presets.
    for key, default_val in DEFAULT_SETTINGS.items():
        if key not in settings_to_load:
            settings_to_load[key] = default_val
    return (
        settings_to_load["api_key"],
        settings_to_load["system_prompt"],
        settings_to_load["model"],
        settings_to_load["temperature"],
        settings_to_load["max_tokens"],
        settings_to_load.get("response_tokens", DEFAULT_SETTINGS.get("response_tokens", 0)),
        settings_to_load["top_p"],
        settings_to_load["frequency_penalty"],
        settings_to_load["presence_penalty"],
        settings_to_load.get("top_k", DEFAULT_SETTINGS["top_k"]),
        settings_to_load.get("min_p", DEFAULT_SETTINGS["min_p"]),
        settings_to_load.get("top_a", DEFAULT_SETTINGS["top_a"]),
        settings_to_load.get("repetition_penalty", DEFAULT_SETTINGS["repetition_penalty"]),
        settings_to_load.get("seed", DEFAULT_SETTINGS["seed"]),
        f"Settings '{preset_name}' loaded.",
        f"### Current Model: `{settings_to_load['model']}`",
        gr.update(value=preset_name)
    )


def create_new_preset(new_preset_name):
    """Creates a new preset with specific defaults, loads it, and updates the list.

    Returns load_settings()'s 17 values plus a preset-dropdown update (18 total).
    """
    global all_settings
    if not new_preset_name or not new_preset_name.strip():
        # No name supplied: just reload the current preset.
        load_results = load_settings(all_settings.get("last_preset_name", "Default"))
        return load_results + (gr.update(choices=get_preset_names()),)
    name = new_preset_name.strip()
    if name in all_settings["presets"]:
        # Duplicate name: reload current preset but swap in an error message
        # (the message is the 3rd-from-last element of the load_settings tuple).
        load_results = load_settings(all_settings.get("last_preset_name", "Default"))
        load_results = load_results[:-3] + (f"Error: Preset '{name}' already exists.",) + load_results[-2:]
        return load_results + (gr.update(choices=get_preset_names()),)
    all_settings["presets"][name] = NEW_PRESET_SETTINGS.copy()
    all_settings["last_preset_name"] = name
    save_settings_to_disk(SETTINGS_FILE, all_settings)
    load_results = load_settings(name)
    return load_results[:-3] + (
        f"Preset '{name}' created and loaded.",
        f"### Current Model: `{NEW_PRESET_SETTINGS['model']}`",
        gr.update(value=name),
        gr.update(choices=get_preset_names(), value=name),
    )


# =========================================================
# --- GRADIO UI (MAIN) ---
# =========================================================
CHATBOT_HEIGHT = "50vh"

# Custom CSS: note the doubled braces {{ }} because this is an f-string.
custom_css = f"""
/* Target the user_input Textbox. */
.gradio-container #user_input_wrapper textarea {{ min-height: 40px !important; transition: min-height 0.2s ease-in-out; }}
/* Hide Gradio footer (v1.0a) */
footer {{ visibility: hidden !important; height: 0 !important; }}
/* Reduce space between chatbot and input (v1.0a) */
.gradio-container #user_input_wrapper {{ margin-top: 5px !important; padding-top: 0 !important; }}
/* Ensure the main title is gone (v1.0a) */
.gradio-container h2:first-child {{ display: none !important; }}
/* Hide Gradio settings button in the top right (v1.0a) */
.gradio-container button[aria-label="Settings"] {{ display: none !important; }}
/* Fix for tab label text padding after removing text (v1.0e) */
.gradio-container .tabs > div > button.selected {{ padding: 0 10px !important; }}
/* --- NEW FULL-WIDTH FIXES --- */
/* 1. Target the main content area inside the container and remove default padding */
.gradio-container .main {{ padding-left: 0 !important; padding-right: 0 !important; }}
/* 2. Target the main page wrapper/block and remove default padding */
.gradio-container .block {{ padding-left: 0 !important; padding-right: 0 !important; }}
/* 3. Ensure the Tabs content container also takes full width and remove its internal padding */
.gradio-container .tabitem {{ padding: 0 !important; }}
/* 4. Ensure the Chatbot itself takes the full width and apply a minimal buffer (5px) on the sides */
.gradio-container .gradio-chatbot {{ width: 100% !important; margin: 0 !important; padding: 0 5px !important; }}
/* 5. Apply the minimal buffer (5px) to the User Input/Controls Row to align with the Chatbot */
.gradio-container #user_input_wrapper {{ padding: 0 5px !important; }}
/* Apply 5px padding to the button row (requires the new elem_classes=["send_btn_row"] on the gr.Row) */
.gradio-container .send_btn_row {{ padding: 0 5px !important; }}
"""

with gr.Blocks(title=f"Termux LLM Client (OpenRouter) v{VERSION}", css=custom_css) as app:
    # Store history as a list of tuples (user_msg, bot_msg) to simplify API and saving logic
    session_history = gr.State(value=[])
    current_model_display = gr.Markdown(f"### Current Model: `{current_settings['model']}`", visible=False)

    # Hide the main title (v1.0a)
    gr.Markdown(f"## 🤖 Termux LLM Client (OpenRouter) v{VERSION}", visible=False)

    # CRITICAL FIX v1.0d: Define the shutdown component explicitly for use in outputs list
    shutdown_message = gr.Markdown("", visible=False)
    with gr.Row(visible=False):
        pass

    # --- Tab Bar with Quit Button (v1.0e: Isolated Quit Button and Tabs) ---
    with gr.Row(variant="compact", equal_height=False):
        # QUIT BUTTON: scale=0 ensures minimum width, positioned on the far left (Fix 2)
        quit_btn = gr.Button("❌", variant="stop", scale=0, min_width=40)
        # COLUMN WRAPPER: Takes all remaining horizontal space for the tabs, preventing UI squeeze (Fix 2)
        with gr.Column(scale=100):
            # TABS (Fix 3: Icons only)
            with gr.Tabs() as tabs:
                # ---------------------- CHAT TAB (💬) ----------------------
                with gr.TabItem("💬"):  # Fix 3: Icon only
                    # Use type='messages' (v1.0c)
                    chatbot = gr.Chatbot(
                        label="Chat History",
                        height=CHATBOT_HEIGHT,
                        show_copy_button=True,
                        type='messages'
                    )
                    with gr.Row(elem_id="user_input_wrapper"):
                        user_input = gr.Textbox(
                            placeholder="Type your message...",
                            label="Your Prompt",
                            show_label=False,
                            lines=3,
                        )
                    # MODIFIED: Added elem_classes to apply the CSS padding fix
                    with gr.Row(variant="compact", elem_classes=["send_btn_row"]):
                        send_btn = gr.Button("➤", variant="primary", scale=1, min_width=0)
                        stop_btn = gr.Button("⛔", variant="stop", scale=1, min_width=0)
                        new_chat_btn = gr.Button("🗑️", variant="secondary", scale=1, min_width=0)
                    with gr.Row(visible=False):
                        history_dropdown_ref = gr.Dropdown(
                            label="Hidden History Dropdown Ref",
                            choices=get_history_names(),
                            value=None,
                            interactive=True
                        )

                    def _refresh_history_dropdown_output(history):
                        # Thin wrapper so the dropdown refresh can be chained with .then().
                        return refresh_history_dropdown(history)

                    # Handler for chat, inputs are user_input and session_history (tuples)
                    # Outputs are chatbot (messages), session_history (tuples), and user_input (update)
                    send_btn.click(
                        chat_interface,
                        inputs=[user_input, session_history],
                        outputs=[chatbot, session_history, user_input],
                        show_progress=True,
                    ).then(
                        _refresh_history_dropdown_output,
                        inputs=[session_history],
                        outputs=[history_dropdown_ref]
                    )
                    user_input.submit(
                        chat_interface,
                        inputs=[user_input, session_history],
                        outputs=[chatbot, session_history, user_input],
                        show_progress=True,
                    ).then(
                        _refresh_history_dropdown_output,
                        inputs=[session_history],
                        outputs=[history_dropdown_ref]
                    )
                    stop_btn.click(set_stop_event, inputs=[], outputs=[])
                    # new_chat returns empty list for both chatbot display and state
                    new_chat_btn.click(
                        new_chat,
                        inputs=[session_history],
                        outputs=[chatbot, session_history],
                        show_progress=False
                    ).then(
                        _refresh_history_dropdown_output,
                        inputs=[session_history],
                        outputs=[history_dropdown_ref]
                    )

                    # Resend/Retry logic (adapted for new 'messages' type)
                    # NOTE(review): chatbot.message_event / gr.Index do not exist in
                    # mainline Gradio releases; the try/except makes these no-ops there.
                    try:
                        def get_user_msg_for_resend(idx, h):
                            # Find the Nth user message in the list of tuples
                            user_index_counter = -1
                            for u_msg, _ in h:
                                if u_msg:
                                    user_index_counter += 1
                                    if user_index_counter == idx:
                                        return u_msg
                            return ""
                        chatbot.message_event(
                            get_user_msg_for_resend,
                            inputs=[gr.Index(), session_history],
                            outputs=[user_input],
                            label="Resend User"
                        )
                    except Exception:
                        pass
                    try:
                        def _retry(idx, h):
                            if idx is None or not h:
                                return convert_to_messages_format(h), h, gr.update(interactive=True)
                            # Gradio Index is message number (0-indexed).
                            # We need to find the user message that started the turn being clicked on.
                            # 1. Determine the number of complete turns (pairs) + incomplete turn
                            num_turns = len(h)
                            # 2. Map the message index (idx) to the turn index (turn_idx)
                            # In the new format, index 0 is user 1, index 1 is bot 1, index 2 is user 2, etc.
                            turn_idx = idx // 2
                            if turn_idx >= num_turns:
                                # Clicked a message outside the range (e.g., if history was cleared)
                                return convert_to_messages_format(h), h, gr.update(interactive=True)
                            # Get the user message from the target turn
                            user_msg = h[turn_idx][0]
                            if not user_msg:
                                # This should not happen if the click was on a valid turn
                                return convert_to_messages_format(h), h, gr.update(interactive=True)
                            # New history state is everything *before* the turn we are retrying
                            new_hist = h[:turn_idx]
                            # Call the chat interface with the user message and the shortened history
                            return chat_interface(user_msg, new_hist)
                        chatbot.message_event(
                            _retry,
                            inputs=[gr.Index(), session_history],
                            outputs=[chatbot, session_history, user_input],
                            label="Retry Assistant"
                        )
                    except Exception:
                        pass

                # ---------------------- SETTINGS TAB (⚙️) ----------------------
                with gr.TabItem("⚙️"):  # Fix 3: Icon only
                    settings_message = gr.Markdown(f"Loaded preset: **{all_settings.get('last_preset_name', 'Default')}**")
                    with gr.Row():
                        preset_dropdown = gr.Dropdown(
                            label="Select Preset (Loads on selection)",
                            choices=get_preset_names(),  # Fix 1: Correctly uses get_preset_names()
                            value=all_settings.get("last_preset_name", "Default"),
                            interactive=True,
                            scale=2
                        )
                        save_name_input = gr.Textbox(
                            label="Current Preset Name",
                            value=all_settings.get("last_preset_name", "Default"),
                            visible=False
                        )
                        new_preset_btn = gr.Button("✨ New Preset", variant="secondary", scale=1)
                        save_btn = gr.Button("💾 Save Current", variant="primary", scale=1)
                    # Hidden textbox filled by the JS prompt() below.
                    new_preset_name_box = gr.Textbox(visible=False)
                    gr.Markdown("---")
                    gr.Markdown("### API and Model Parameters")
                    api_key_input = gr.Textbox(label="OpenRouter API Key (Plaintext)", value=current_settings["api_key"], type="text", info="Saved in openrouter_settings.json")
                    system_prompt_input = gr.Textbox(label="System Prompt", value=current_settings["system_prompt"], lines=3)
                    with gr.Row():
                        model_input = gr.Textbox(label="Model Name", value=current_settings["model"], info="deepseek/deepseek-chat-v3.1:free\nopenai/gpt-oss-20b:free\ncognitivecomputations/dolphin-mistral-24b-venice-edition:free", scale=2)
                        temperature_input = gr.Slider(label="Temperature", minimum=0.0, maximum=2.0, step=0.05, value=current_settings["temperature"], scale=1)
                        max_tokens_input = gr.Slider(label="Max Tokens (Model Limit Dependent)", minimum=0, maximum=130000, step=1, value=current_settings["max_tokens"], scale=1)
                    with gr.Row():
                        response_tokens_input = gr.Slider(label="Response Tokens (0 = model decides)", minimum=0, maximum=32768, step=1, value=current_settings.get("response_tokens", DEFAULT_SETTINGS.get("response_tokens", 0)), scale=1)
                        top_p_input = gr.Slider(label="Top P", minimum=0.0, maximum=1.0, step=0.05, value=current_settings["top_p"], scale=1)
                        freq_p_input = gr.Slider(label="Frequency Penalty", minimum=0, maximum=2.0, step=0.05, value=current_settings["frequency_penalty"], scale=1)
                        pres_p_input = gr.Slider(label="Presence Penalty", minimum=0, maximum=2.0, step=0.05, value=current_settings["presence_penalty"], scale=1)
                    gr.Markdown("---")
                    gr.Markdown("### Advanced Sampling Controls")
                    with gr.Row():
                        top_k_input = gr.Slider(label="Top K (0 to disable)", minimum=0, maximum=100, step=1, value=current_settings.get("top_k", DEFAULT_SETTINGS["top_k"]), scale=1)
                        min_p_input = gr.Slider(label="Min P (0.0 to disable)", minimum=0.0, maximum=1.0, step=0.01, value=current_settings.get("min_p", DEFAULT_SETTINGS["min_p"]), scale=1)
                        top_a_input = gr.Slider(label="Top A (0.0 to disable)", minimum=0.0, maximum=1.0, step=0.01, value=current_settings.get("top_a", DEFAULT_SETTINGS["top_a"]), scale=1)
                    with gr.Row():
                        rep_p_input = gr.Slider(label="Repetition Penalty (1.0 is neutral)", minimum=0.0, maximum=2.0, step=0.01, value=current_settings.get("repetition_penalty", DEFAULT_SETTINGS["repetition_penalty"]), scale=1)
                        seed_input = gr.Number(label="Seed (-1 for random)", minimum=-1, value=current_settings.get("seed", DEFAULT_SETTINGS["seed"]), scale=1)

                    save_btn.click(
                        save_settings,
                        inputs=[save_name_input, api_key_input, system_prompt_input, model_input, temperature_input, max_tokens_input, response_tokens_input, top_p_input, freq_p_input, pres_p_input, top_k_input, min_p_input, top_a_input, rep_p_input, seed_input],
                        outputs=[settings_message, preset_dropdown]
                    )
                    preset_dropdown.change(
                        load_settings,
                        inputs=[preset_dropdown],
                        outputs=[api_key_input, system_prompt_input, model_input, temperature_input, max_tokens_input, response_tokens_input, top_p_input, freq_p_input, pres_p_input, top_k_input, min_p_input, top_a_input, rep_p_input, seed_input, settings_message, current_model_display, save_name_input]
                    )
                    # Ask for the preset name client-side; the result lands in the
                    # hidden textbox, whose .change then creates the preset.
                    new_preset_btn.click(
                        None,
                        inputs=[],
                        outputs=[new_preset_name_box],
                        js="""() => prompt('Enter a name for the new preset:') || ''"""
                    )
                    new_preset_name_box.change(
                        create_new_preset,
                        inputs=[new_preset_name_box],
                        outputs=[api_key_input, system_prompt_input, model_input, temperature_input, max_tokens_input, response_tokens_input, top_p_input, freq_p_input, pres_p_input, top_k_input, min_p_input, top_a_input, rep_p_input, seed_input, settings_message, current_model_display, save_name_input, preset_dropdown]
                    )

                # ---------------------- HISTORY TAB (📜) ----------------------
                with gr.TabItem("📜"):  # Fix 3: Icon only
                    history_message = gr.Markdown("### Historical Chat Sessions")
                    with gr.Row():
                        visible_history_dropdown = gr.Dropdown(
                            label="Load Session",
                            choices=get_history_names(),
                            value=None,
                            interactive=True,
                            scale=2
                        )
                        load_history_btn = gr.Button("View Session", variant="secondary", scale=1)
                        # COPY ALL BUTTON (v1.0a)
                        copy_all_btn = gr.Button("📋 Copy All", variant="secondary", scale=1)
                        # CLEAR LOG BUTTON (v1.0a)
                        clear_log_btn = gr.Button("🗑️ Clear Log", variant="secondary", scale=1)
                    # Use type='messages' (v1.0c)
                    history_display = gr.Chatbot(
                        label="Historical Session Content",
                        height=CHATBOT_HEIGHT,
                        type='messages',
                        show_copy_button=True  # Per-message copy (v1.0a)
                    )
                    # Hidden element to hold the text for the Copy All functionality
                    copy_text_output = gr.Textbox(visible=False, label="Copy Text")

                    # Event Handlers
                    # Mirror the hidden ref dropdown (updated after each chat turn)
                    # into the visible one.
                    history_dropdown_ref.change(
                        lambda value: gr.update(choices=get_history_names(), value=value),
                        inputs=[history_dropdown_ref],
                        outputs=[visible_history_dropdown],
                        show_progress=False
                    )
                    visible_history_dropdown.change(
                        load_history_session,
                        inputs=[visible_history_dropdown],
                        outputs=[history_display]
                    )
                    load_history_btn.click(
                        load_history_session,
                        inputs=[visible_history_dropdown],
                        outputs=[history_display]
                    )
                    # Copy All handler (v1.0a)
                    copy_all_btn.click(
                        copy_all_session_text,
                        inputs=[visible_history_dropdown],
                        outputs=[copy_text_output]
                    ).then(
                        lambda x: (gr.Markdown.update(value=f"Copied {len(x)} characters from session."), gr.update(value="")),
                        inputs=[copy_text_output],
                        outputs=[history_message, copy_text_output]
                    )
                    # Clear Log handler (v1.0a)
                    clear_log_btn.click(
                        clear_history_log,
                        inputs=[],
                        outputs=[history_display, visible_history_dropdown, history_dropdown_ref, history_message, preset_dropdown]
                    )

    # --- Quit Button Handler (placed outside tabs for global visibility) ---
    quit_btn.click(
        quit_app,
        # CRITICAL FIX v1.0d: Outputs must be component objects, matching the return
        # values (Markdown Update, Generic Update for Button)
        outputs=[shutdown_message, quit_btn]
    )

# =========================================================
# --- LAUNCH ---
# =========================================================
========================================================= # --- LAUNCH (HUGGING FACE SPACES FIX) --- # ========================================================= if __name__ == "__main__": # CRITICAL FIX 1: Hardcode the server port to 7860, which is the standard # expected by the Hugging Face Spaces router. PORT = 7860 # CRITICAL FIX 2: Remove all unnecessary local print statements and # the thread that tries to open a browser (webbrowser.open), as this # is impossible in the cloud environment and causes issues. print(f"Launching Termux LLM Client v{VERSION} on 0.0.0.0:{PORT} (Hugging Face Mode)") app.launch( server_name="0.0.0.0", server_port=PORT, # Set inbrowser=False to prevent the app from trying to launch a browser inbrowser=False, debug=False, # Debug output is usually too verbose for HFS logs quiet=False # Set to False or remove, so you can see the startup logs )