# OIC / app.py — Hugging Face Space "nitus-ac/OIC" (revision 028ce66: "Update app.py")
# -*- coding: utf-8 -*-
import gradio as gr
import requests
import json
import os
import datetime
import textwrap
import random
import threading
import webbrowser
import shutil
import time
# =========================================================
# --- VERSIONING ---
# =========================================================
VERSION = "1.0h" # v1.0h: CRITICAL SPACING FIX: Replaced restrictive .isalnum() space re-insertion logic to correctly add spaces after punctuation marks when followed by a word token.
# =========================================================
# --- CONFIGURATION & DATA PERSISTENCE ---
# =========================================================
SETTINGS_FILE = "openrouter_settings.json"  # preset store, written by save_settings_to_disk
HISTORY_FILE = "openrouter_history.json"    # chat session log (JSON list of sessions)
DEFAULT_MODEL = "deepseek/deepseek-chat-v3.1:free"
MAX_BACKUPS = 5  # rolling .b1 (newest) .. .b5 (oldest) backups of the settings file
# Baseline values for every preset; presets saved by older versions are
# back-filled with any missing keys from this dict when loaded (see load_data).
DEFAULT_SETTINGS = {
    "api_key": "",
    "system_prompt": "You are a helpful, creative, and friendly AI assistant running on an Android device.",
    "model": DEFAULT_MODEL,
    "temperature": 0.7,
    "max_tokens": 1024,
    "response_tokens": 0,
    "top_p": 0.9,
    "frequency_penalty": 0.0,
    "presence_penalty": 0.0,
    "stream": True,
    "top_k": 0,
    "min_p": 0.0,
    "top_a": 0.0,
    "repetition_penalty": 1.0,
    "seed": -1
}
# Template applied by the "New Preset" button.
# NOTE(review): api_key is a single space; call_openrouter_api strips it, so a
# fresh preset yields "API Key not set" until a real key is entered — presumably
# intentional, confirm.
NEW_PRESET_SETTINGS = {
    "api_key": " ",
    "system_prompt": "You are a helpful assistant.",
    "model": "cognitivecomputations/dolphin-mistral-24b-venice-edition:free",
    "temperature": 1.05,
    "max_tokens": 0,
    "response_tokens": 2272,
    "top_p": 0.65,
    "frequency_penalty": 0.0,
    "presence_penalty": 0.0,
    "stream": True,
    "top_k": 0,
    "min_p": 0.0,
    "top_a": 0.0,
    "repetition_penalty": 1.39,
    "seed": -1
}
# =========================================================
# --- Globals for incremental logging (in-memory only) ---
# =========================================================
last_logged_index = 0       # turns already persisted for the active session
active_session_name = None  # name of the history session currently being appended to
_last_save_time = 0.0       # wall-clock time of the most recent history save
_save_interval = 0.5        # NOTE(review): not read anywhere in this file — confirm before removing
def load_data(file_path, default_data=None):
    """Load JSON data from *file_path*, falling back to *default_data*.

    If *default_data* is a list (history files), accepts either a bare JSON
    list or a legacy dict wrapping one under a "history" key.  If it is a
    dict (settings files), back-fills any missing per-preset keys from
    DEFAULT_SETTINGS so presets saved by older versions keep working.

    Returns *default_data* (``{}`` when not given) on any read/parse failure.
    """
    if default_data is None:
        default_data = {}
    if os.path.exists(file_path):
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            # FIX: removed a stray `it` token here that was a syntax error.
            if isinstance(default_data, list):
                if isinstance(data, list):
                    return data
                # Legacy history format: {"history": [...]}
                if isinstance(data, dict) and "history" in data and isinstance(data["history"], list):
                    return data["history"]
                return default_data
            if isinstance(data, dict):
                # Migrate old presets: fill in any newly-added setting keys.
                for key, default_val in DEFAULT_SETTINGS.items():
                    for preset_name in data.get('presets', {}):
                        if key not in data['presets'][preset_name]:
                            data['presets'][preset_name][key] = default_val
                return data
            return default_data
        except (json.JSONDecodeError, IOError):
            print(f"Warning: Could not parse {file_path}, using default.")
            return default_data
    return default_data
def save_data(file_path, data):
    """Serialize *data* (dict or list) to *file_path* as pretty-printed JSON.

    Failures are reported to stdout rather than raised, so a broken disk
    never interrupts a chat turn.
    """
    try:
        with open(file_path, "w", encoding="utf-8") as out_file:
            json.dump(data, out_file, ensure_ascii=False, indent=4)
    except IOError as e:
        print(f"Error saving {file_path}: {e}")
def create_rolling_backup(file_path, max_backups=MAX_BACKUPS):
    """Rotate numbered backups of *file_path* (.b1 newest ... .bN oldest).

    No-op when the file does not exist yet; rename/copy failures are
    reported to stdout but never raised.
    """
    if not os.path.exists(file_path):
        return
    # Drop the oldest slot so the shift below has room.
    oldest = f"{file_path}.b{max_backups}"
    if os.path.exists(oldest):
        os.remove(oldest)
    # Shift every surviving backup one slot towards "older".
    for slot in range(max_backups - 1, 0, -1):
        src = f"{file_path}.b{slot}"
        dst = f"{file_path}.b{slot + 1}"
        if not os.path.exists(src):
            continue
        try:
            os.rename(src, dst)
        except OSError as e:
            print(f"Warning: Could not rename {src} to {dst}: {e}")
    # Finally snapshot the live file into the newest slot.
    newest_backup = f"{file_path}.b1"
    try:
        shutil.copy2(file_path, newest_backup)
        print(f"Settings backed up to {newest_backup}")
    except IOError as e:
        print(f"Error creating backup {newest_backup}: {e}")
def save_settings_to_disk(file_path, data):
    """Rotate backups for *file_path*, then overwrite it with *data* as JSON."""
    create_rolling_backup(file_path)
    try:
        with open(file_path, "w", encoding="utf-8") as out:
            json.dump(data, out, indent=4)
    except IOError as e:
        print(f"Error saving {file_path}: {e}")
# Initialize settings container (creates a fresh one on first run or parse failure).
all_settings = load_data(SETTINGS_FILE, {
    "presets": {"Default": DEFAULT_SETTINGS},
    "last_preset_name": "Default"
})
# Ensure structure integrity: a corrupt file must not leave us without a "presets" dict.
if "presets" not in all_settings or not isinstance(all_settings["presets"], dict):
    all_settings = {"presets": {"Default": DEFAULT_SETTINGS}, "last_preset_name": "Default"}
chat_history = load_data(HISTORY_FILE, [])
if not isinstance(chat_history, list):
    chat_history = []
# Load last preset used in the previous run; fall back to the defaults.
last_preset_name = all_settings.get("last_preset_name", "Default")
current_settings = all_settings["presets"].get(last_preset_name, DEFAULT_SETTINGS).copy()
# =========================================================
# --- Stop control (thread-safe) ---
# =========================================================
stop_event = threading.Event()
def quit_app():
    """Immediately quits the application via os._exit.

    Returns UI updates for the shutdown banner and the quit button.
    """
    print("Quitting application...")
    # Use a timer to exit after a slight delay to allow the server to send the response.
    threading.Timer(0.1, os._exit, args=[0]).start()
    # FIX: gr.Markdown.update() was removed in Gradio 4.x; gr.update() is the
    # component-agnostic equivalent (and was already used for the button below).
    return gr.update(value="👋 Shutting down...", visible=True), gr.update(interactive=False)
# =========================================================
# --- OPENROUTER API INTERACTION (STREAMING & FALLBACK) ---
# =========================================================
def call_openrouter_api(messages, settings):
    """Makes a streaming API call to OpenRouter, yields content chunks.
    Filters <|begin of sentence|> token.

    Yields str chunks; error conditions are yielded as strings prefixed
    with "ERROR:", "❌", "STREAM ERROR:" or "💥" so the caller can detect
    them.  If the stream produced no content, silently retries once in
    non-streaming mode.
    """
    # FIX: default to "" so a settings dict without an "api_key" key cannot
    # raise AttributeError on .strip().
    api_key = settings.get("api_key", "").strip()
    if not api_key:
        yield "ERROR: API Key not set in Settings."
        return
    url = "https://openrouter.ai/api/v1/chat/completions"
    try:
        payload = {
            "model": settings["model"],
            "messages": messages,
            "temperature": float(settings.get("temperature", 0.7)),
            "top_p": float(settings.get("top_p", 0.9)),
            "frequency_penalty": float(settings.get("frequency_penalty", 0.0)),
            "presence_penalty": float(settings.get("presence_penalty", 0.0)),
            "stream": True,
        }
        response_tokens = int(settings.get("response_tokens", 0))
        if response_tokens > 0:
            payload["max_tokens"] = response_tokens
        # Only forward advanced sampling params when they differ from their
        # "disabled" sentinel values.
        for k in ["top_k", "min_p", "top_a", "repetition_penalty", "seed"]:
            v = settings.get(k)
            if v is not None:
                if k == "top_k" and int(v) == 0: continue
                if k in ("min_p", "top_a") and float(v) == 0.0: continue
                if k == "repetition_penalty" and float(v) == 1.0: continue
                if k == "seed" and int(v) == -1: continue
                payload[k] = v
    except ValueError as e:
        yield f"ERROR: Invalid setting type. {e}"
        return
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    # Filter tokens (v1.0a: half-width, v1.0g: full-width)
    # NOTE(review): both constants are byte-identical here — the full-width
    # variant appears to have been normalized by a copy/paste; confirm the
    # intended full-width token.
    TOKEN_TO_FILTER_HALF = "<|begin of sentence|>"
    TOKEN_TO_FILTER_FULL = "<|begin of sentence|>"
    try:
        response = requests.post(url, headers=headers, json=payload, stream=True, timeout=60)
        if response.status_code != 200:
            try:
                msg = response.json().get('error', {}).get('message', response.text)
            except Exception:
                msg = response.text
            yield f"❌ API request failed ({response.status_code}): {msg}"
            return
        got_content = False
        for line_bytes in response.iter_lines():
            if stop_event.is_set():
                # NOTE: this string is intentionally kept byte-identical to the
                # prefix chat_interface() matches against.
                yield "â›” Inference stopped by user."
                return
            if not line_bytes:
                continue
            try:
                line = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                print("Warning: Failed to decode a line from API stream.")
                continue
            if line.startswith("data:"):
                data_str = line[len("data:"):].strip()
                if data_str == "[DONE]":
                    break
                try:
                    chunk = json.loads(data_str)
                except Exception:
                    continue
                choices = chunk.get("choices", [])
                if not choices:
                    continue
                choice0 = choices[0]
                delta = choice0.get("delta", {})
                content = delta.get("content")
                if content:
                    # CRITICAL FIX v1.0g: Apply filters for both half-width and full-width tokens
                    if TOKEN_TO_FILTER_FULL in content:
                        content = content.replace(TOKEN_TO_FILTER_FULL, "")
                    if TOKEN_TO_FILTER_HALF in content:
                        content = content.replace(TOKEN_TO_FILTER_HALF, "")
                    # CRITICAL FIX v1.0f: Strip leading whitespace potentially left by token removal.
                    content = content.lstrip()
                    if content:  # Check if anything is left
                        got_content = True
                        yield content
                if choice0.get("finish_reason") is not None:
                    break
            elif line.startswith("event: error"):
                try:
                    errdata = json.loads(line[len("event: error"):].strip())
                    yield f"STREAM ERROR: {errdata.get('message', 'Unknown Stream Error')}"
                except Exception:
                    yield "STREAM ERROR: unknown"
                return
        if not got_content:
            # Silent fallback to non-streaming
            payload["stream"] = False
            try:
                r2 = requests.post(url, headers=headers, json=payload, timeout=60)
                if r2.status_code == 200:
                    j = r2.json()
                    content = None
                    choices = j.get("choices", [])
                    if choices:
                        first = choices[0]
                        if isinstance(first.get("message"), dict):
                            content = first["message"].get("content")
                        elif "text" in first:
                            content = first.get("text")
                        elif isinstance(first.get("delta"), dict):
                            content = first["delta"].get("content")
                    if content:
                        # Apply filters
                        if TOKEN_TO_FILTER_FULL in content:
                            content = content.replace(TOKEN_TO_FILTER_FULL, "")
                        if TOKEN_TO_FILTER_HALF in content:
                            content = content.replace(TOKEN_TO_FILTER_HALF, "")
                        # CRITICAL FIX v1.0f: Strip leading/trailing whitespace
                        content = content.strip()
                        if content:  # Check if anything is left after stripping
                            yield content
                    else:
                        yield "⚠️ No content received from fallback response."
                else:
                    yield f"❌ Fallback failed ({r2.status_code}): {r2.text}"
            except requests.exceptions.RequestException as e:
                yield f"❌ Fallback request error: {e}"
    except requests.exceptions.RequestException as e:
        yield f"❌ Request error: {e}"
    except Exception as e:
        yield f"💥 Unexpected error: {e}"
# =========================================================
# --- CHAT FUNCTIONALITY & STATE MANAGEMENT ---
# =========================================================
# Helper to convert Gradio's old list-of-tuples history to the new messages format.
def convert_to_messages_format(history):
    """Flatten [(user, bot), ...] pairs into role/content message dicts.

    Falsy entries (None / "") on either side of a pair are skipped.
    """
    return [
        {"role": role, "content": text}
        for user_msg, bot_msg in history
        for role, text in (("user", user_msg), ("assistant", bot_msg))
        if text
    ]
def chat_interface(user_input, history_state):
    """Handles message submission and response generation with streaming.

    Generator: yields (chatbot messages, tuple-state, input-box update)
    triples so Gradio can re-render after every streamed chunk.  Saves the
    session before the request and again after the reply completes.
    """
    global current_settings, _last_save_time
    if stop_event.is_set():
        stop_event.clear()
    if not user_input.strip():
        # Yields the converted history back to the chatbot and updates the state
        yield convert_to_messages_format(history_state), history_state, gr.update(interactive=True)
        return
    # Add user message to the internal list-of-tuples state
    history_state.append([user_input, None])
    # 1. IMMEDIATE SAVE OF USER MESSAGE 💾
    save_current_session(history_state)
    _last_save_time = time.time()
    # Show the pending turn, clear the input box and disable it while streaming.
    yield convert_to_messages_format(history_state), history_state, gr.update(value="", interactive=False)
    # Prepare API messages (system prompt + every completed turn + the new prompt).
    messages = [{"role": "system", "content": current_settings["system_prompt"]}]
    for user_msg, bot_msg in history_state[:-1]:
        if user_msg: messages.append({"role": "user", "content": user_msg})
        if bot_msg: messages.append({"role": "assistant", "content": bot_msg})
    messages.append({"role": "user", "content": user_input})
    full_reply = ""
    for chunk in call_openrouter_api(messages, current_settings):
        is_error_or_stop = isinstance(chunk, str) and (chunk.startswith("ERROR:") or chunk.startswith("❌") or chunk.startswith("💥"))
        if is_error_or_stop:
            full_reply += ("\n\n" + chunk) if full_reply else chunk
            history_state[-1][1] = full_reply
            save_current_session(history_state)
            # Send the updated state (converted for display)
            yield convert_to_messages_format(history_state), history_state, gr.update(interactive=True)
            return
        # FIX: the original contained two byte-identical copies of this
        # stop-handling block; the second was unreachable and has been removed.
        if isinstance(chunk, str) and chunk.startswith("â›” Inference stopped"):
            full_reply += "\n\n*(Inference stopped)*"
            history_state[-1][1] = full_reply
            save_current_session(history_state)
            # Send the updated state (converted for display)
            yield convert_to_messages_format(history_state), history_state, gr.update(interactive=True)
            return
        # v1.0h CRITICAL SPACING FIX: add a space after punctuation when the next
        # chunk starts with a word character.
        # NOTE(review): this also inserts a space when a single word is split
        # across two stream chunks — confirm that is acceptable for the models used.
        if full_reply and chunk:
            if full_reply[-1] != ' ' and chunk[0].isalnum():
                full_reply += " " + chunk
            else:
                full_reply += chunk
        else:
            full_reply += chunk  # Handles the very first chunk
        history_state[-1][1] = full_reply
        # Send the updated state (converted for display)
        yield convert_to_messages_format(history_state), history_state, gr.update(interactive=False)
    # 2. FINAL SAVE OF ASSISTANT MESSAGE 💾
    history_state[-1][1] = full_reply
    save_current_session(history_state)
    _last_save_time = time.time()
    # Send the final state (converted for display) and re-enable the input box.
    yield convert_to_messages_format(history_state), history_state, gr.update(interactive=True)
def save_current_session(history):
    """Saves session to history file incrementally (only new turns).

    *history* is the live list of [user, bot] pairs.  Detects whether the
    live chat continues the last persisted session (by name, or by the
    persisted turns being a prefix of the live ones) and either updates
    that entry in place or appends a brand-new named session.
    """
    global chat_history, last_logged_index, active_session_name
    # Nothing to persist until at least one user message exists.
    if not history or not any(user_msg for user_msg, _ in history):
        return
    # Drop turns with no user message; placeholder for still-streaming replies.
    sanitized = [
        [user_msg, bot_msg if bot_msg else "*(Incomplete Response)*"]
        for user_msg, bot_msg in history if user_msg
    ]
    is_continuation = False
    last_entry = chat_history[-1] if chat_history else None
    if last_entry:
        if active_session_name and active_session_name == last_entry.get("name"):
            is_continuation = True
        else:
            # Recover after a restart: if the persisted turns are a prefix of
            # the live ones, re-adopt the last session.
            existing_hist = last_entry.get("history", [])
            if len(existing_hist) <= len(sanitized) and existing_hist == sanitized[:len(existing_hist)]:
                active_session_name = last_entry.get("name")
                last_logged_index = len(existing_hist)
                is_continuation = True
    if is_continuation and last_entry:
        updated_history = last_entry.get("history", []).copy()
        # Case 1: Updating the last turn (Assistant complete save, where length is equal)
        if len(sanitized) == len(updated_history) and len(sanitized) > 0:
            updated_history[-1] = sanitized[-1]
        # Case 2: Appending new turns (Next User send save, where live history is longer)
        elif len(sanitized) > len(updated_history):
            start_idx = len(updated_history)
            novel_portion = sanitized[start_idx:]
            updated_history.extend(novel_portion)
        chat_history[-1] = {"name": last_entry.get("name"), "history": updated_history}
        save_data(HISTORY_FILE, chat_history)
        last_logged_index = len(sanitized)
    else:
        # New session: name it from the timestamp plus the first user message.
        first_message = history[0][0]
        name_base = textwrap.shorten(first_message, width=30, placeholder="...")
        proposed_name = f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {name_base}"
        chat_history.append({"name": proposed_name, "history": sanitized})
        save_data(HISTORY_FILE, chat_history)
        active_session_name = proposed_name
        last_logged_index = len(sanitized)
def new_chat(history):
    """Persist the current session (if any), then reset to a fresh chat.

    Returns two empty lists: one clears the chatbot display (messages
    format), the other clears the tuple-format state.
    """
    global last_logged_index, active_session_name
    if history:
        save_current_session(history)
    last_logged_index = 0
    active_session_name = None
    return [], []
def get_history_names():
    """Names of every stored chat session, in saved order."""
    return [session["name"] for session in chat_history]
def load_history_session(name):
    """Loads session content for the history display (History tab only)."""
    if name is None:
        return []
    # Locate the first session with a matching name (or None if absent).
    session = next((s for s in chat_history if s["name"] == name), None)
    if session is None:
        return []
    # Stored as list-of-tuples; the display expects messages format.
    return convert_to_messages_format(session["history"])
def refresh_history_dropdown(history):
    """Return updated choices for the history dropdown (value cleared)."""
    names = get_history_names()
    return gr.update(choices=names, value=None)
def set_stop_event():
    """Signal the streaming loop to halt at its next chunk boundary."""
    stop_event.set()
def clear_history_log():
    """Backs up history file and clears chat_history in memory and on disk. (v1.0a)

    Returns updates for: history display, both history dropdowns, the status
    message, and the preset dropdown (refreshed only to complete the fixed
    output list of the click handler).
    """
    global chat_history, last_logged_index, active_session_name
    message = "History log already clear."
    if os.path.exists(HISTORY_FILE):
        # Archive the log under a timestamped .ahv name before deleting it.
        stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"{HISTORY_FILE}.{stamp}.ahv"
        try:
            shutil.copy2(HISTORY_FILE, backup_file)
            os.remove(HISTORY_FILE)
            message = f"History log backed up to `{backup_file}` and cleared."
        except Exception as e:
            message = f"ERROR backing up/clearing log: {e}"
    # Reset all in-memory session tracking regardless of backup outcome.
    chat_history = []
    last_logged_index = 0
    active_session_name = None
    return (
        [],                                    # clear history_display (messages format)
        gr.update(value=None, choices=[]),     # clear visible_history_dropdown
        gr.update(value=None, choices=[]),     # clear history_dropdown_ref
        message,                               # update history_message
        gr.update(choices=get_preset_names())  # refresh preset_dropdown
    )
def copy_all_session_text(name):
    """Formats and returns the entire history session content as text for copying. (v1.0a)"""
    if name is None:
        return "No session selected."
    for session in chat_history:
        if session["name"] != name:
            continue
        # Build the transcript in parts and join once.
        parts = [f"--- Session: {name} ---\n\n"]
        for user_msg, bot_msg in session["history"]:
            parts.append(f"USER: {user_msg}\n")
            parts.append(f"ASSISTANT: {bot_msg}\n\n")
        return "".join(parts)
    return "Session not found."
# =========================================================
# --- SETTINGS FUNCTIONALITY ---
# =========================================================
def get_preset_names():
    """Names of all saved settings presets."""
    return list(all_settings["presets"])
def save_settings(
    preset_name, api_key, sys_prompt, model, temp, tokens, response_tokens, top_p, freq_p, pres_p,
    top_k, min_p, top_a, rep_p, seed
):
    """Save new or updated preset, with rolling backup.

    Falls back to the last-used preset name when *preset_name* is blank.
    Returns a status string and a dropdown update listing all presets.
    """
    global all_settings, current_settings
    name_to_save = preset_name if preset_name.strip() else all_settings.get("last_preset_name", "Default")
    new_settings = {
        "api_key": api_key, "system_prompt": sys_prompt, "model": model, "temperature": temp,
        "max_tokens": tokens, "response_tokens": response_tokens, "top_p": top_p,
        "frequency_penalty": freq_p, "presence_penalty": pres_p,
        "stream": current_settings.get("stream", True), "top_k": top_k, "min_p": min_p,
        "top_a": top_a, "repetition_penalty": rep_p, "seed": seed,
    }
    all_settings["presets"][name_to_save] = new_settings
    all_settings["last_preset_name"] = name_to_save
    save_settings_to_disk(SETTINGS_FILE, all_settings)
    # FIX: the original guarded this with
    #   if name_to_save == all_settings.get("last_preset_name", "Default"):
    # which was always true (the key was assigned two lines above), so the
    # dead condition is removed — the saved preset is always the active one.
    current_settings = new_settings.copy()
    return f"Settings saved as '{name_to_save}'. Backups created (max {MAX_BACKUPS}).", gr.update(choices=get_preset_names(), value=name_to_save)
def load_settings(preset_name):
    """Load settings preset and update chat tab's model display.

    Returns a 17-tuple: 14 widget values (api_key ... seed), the status
    message, the chat tab's model banner markdown, and an update for the
    hidden preset-name textbox.  Callers slice this tuple positionally
    (see create_new_preset), so the order must not change.
    """
    global all_settings, current_settings
    # Unknown/None preset: keep whatever is currently active.
    if preset_name is None or preset_name not in all_settings["presets"]:
        settings_to_load = current_settings.copy()
    else:
        settings_to_load = all_settings["presets"].get(preset_name, DEFAULT_SETTINGS).copy()
    current_settings = settings_to_load.copy()
    # Persist the selection so it is restored on the next launch.
    if preset_name and preset_name in all_settings["presets"]:
        all_settings["last_preset_name"] = preset_name
        save_settings_to_disk(SETTINGS_FILE, all_settings)
    # Back-fill keys added in newer versions so the widget list never KeyErrors.
    for key, default_val in DEFAULT_SETTINGS.items():
        if key not in settings_to_load:
            settings_to_load[key] = default_val
    return (
        settings_to_load["api_key"], settings_to_load["system_prompt"], settings_to_load["model"],
        settings_to_load["temperature"], settings_to_load["max_tokens"],
        settings_to_load.get("response_tokens", DEFAULT_SETTINGS.get("response_tokens", 0)),
        settings_to_load["top_p"], settings_to_load["frequency_penalty"], settings_to_load["presence_penalty"],
        settings_to_load.get("top_k", DEFAULT_SETTINGS["top_k"]), settings_to_load.get("min_p", DEFAULT_SETTINGS["min_p"]),
        settings_to_load.get("top_a", DEFAULT_SETTINGS["top_a"]),
        settings_to_load.get("repetition_penalty", DEFAULT_SETTINGS["repetition_penalty"]),
        settings_to_load.get("seed", DEFAULT_SETTINGS["seed"]),
        f"Settings '{preset_name}' loaded.", f"### Current Model: `{settings_to_load['model']}`",
        gr.update(value=preset_name)
    )
def create_new_preset(new_preset_name):
    """Creates a new preset with specific defaults, loads it, and updates the list.

    Returns load_settings()'s 17-tuple plus one extra preset-dropdown
    update (18 outputs total, matching the change handler's output list).
    The tuple-slicing below replaces the last three elements of the
    load_settings() result: (message, model banner, name-box update).
    """
    global all_settings
    # Blank name (e.g. the JS prompt was cancelled): just reload the current preset.
    if not new_preset_name or not new_preset_name.strip():
        load_results = load_settings(all_settings.get("last_preset_name", "Default"))
        return load_results + (gr.update(choices=get_preset_names()),)
    name = new_preset_name.strip()
    if name in all_settings["presets"]:
        # Duplicate name: reload current preset but swap in an error message.
        load_results = load_settings(all_settings.get("last_preset_name", "Default"))
        load_results = load_results[:-3] + (f"Error: Preset '{name}' already exists.",) + load_results[-2:]
        return load_results + (gr.update(choices=get_preset_names()),)
    all_settings["presets"][name] = NEW_PRESET_SETTINGS.copy()
    all_settings["last_preset_name"] = name
    save_settings_to_disk(SETTINGS_FILE, all_settings)
    load_results = load_settings(name)
    return load_results[:-3] + (
        f"Preset '{name}' created and loaded.",
        f"### Current Model: `{NEW_PRESET_SETTINGS['model']}`",
        gr.update(value=name),
        gr.update(choices=get_preset_names(), value=name),
    )
# =========================================================
# --- GRADIO UI (MAIN) ---
# =========================================================
# Chatbot height as a viewport fraction so it scales on phone screens.
CHATBOT_HEIGHT = "50vh"
# CSS injected into gr.Blocks; doubled braces escape the f-string.
custom_css = f"""
/* Target the user_input Textbox. */
.gradio-container #user_input_wrapper textarea {{
min-height: 40px !important;
transition: min-height 0.2s ease-in-out;
}}
/* Hide Gradio footer (v1.0a) */
footer {{ visibility: hidden !important; height: 0 !important; }}
/* Reduce space between chatbot and input (v1.0a) */
.gradio-container #user_input_wrapper {{
margin-top: 5px !important;
padding-top: 0 !important;
}}
/* Ensure the main title is gone (v1.0a) */
.gradio-container h2:first-child {{
display: none !important;
}}
/* Hide Gradio settings button in the top right (v1.0a) */
.gradio-container button[aria-label="Settings"] {{
display: none !important;
}}
/* Fix for tab label text padding after removing text (v1.0e) */
.gradio-container .tabs > div > button.selected {{
padding: 0 10px !important;
}}
/* --- NEW FULL-WIDTH FIXES --- */
/* 1. Target the main content area inside the container and remove default padding */
.gradio-container .main {{
padding-left: 0 !important;
padding-right: 0 !important;
}}
/* 2. Target the main page wrapper/block and remove default padding */
.gradio-container .block {{
padding-left: 0 !important;
padding-right: 0 !important;
}}
/* 3. Ensure the Tabs content container also takes full width and remove its internal padding */
.gradio-container .tabitem {{
padding: 0 !important;
}}
/* 4. Ensure the Chatbot itself takes the full width and apply a minimal buffer (5px) on the sides */
.gradio-container .gradio-chatbot {{
width: 100% !important;
margin: 0 !important;
padding: 0 5px !important;
}}
/* 5. Apply the minimal buffer (5px) to the User Input/Controls Row to align with the Chatbot */
.gradio-container #user_input_wrapper {{
padding: 0 5px !important;
}}
/* Apply 5px padding to the button row (requires the new elem_classes=["send_btn_row"] on the gr.Row) */
.gradio-container .send_btn_row {{
padding: 0 5px !important;
}}
"""
with gr.Blocks(title=f"Termux LLM Client (OpenRouter) v{VERSION}", css=custom_css) as app:
    # Store history as a list of tuples (user_msg, bot_msg) to simplify API and saving logic
    session_history = gr.State(value=[])
    current_model_display = gr.Markdown(f"### Current Model: `{current_settings['model']}`", visible=False)
    # Hide the main title (v1.0a)
    gr.Markdown(f"## 🤖 Termux LLM Client (OpenRouter) v{VERSION}", visible=False)
    # CRITICAL FIX v1.0d: Define the shutdown component explicitly for use in outputs list
    shutdown_message = gr.Markdown("", visible=False)
    with gr.Row(visible=False):
        pass
    # --- Tab Bar with Quit Button (v1.0e: Isolated Quit Button and Tabs) ---
    with gr.Row(variant="compact", equal_height=False):
        # QUIT BUTTON: scale=0 ensures minimum width, positioned on the far left (Fix 2)
        quit_btn = gr.Button("❌", variant="stop", scale=0, min_width=40)
        # COLUMN WRAPPER: Takes all remaining horizontal space for the tabs, preventing UI squeeze (Fix 2)
        with gr.Column(scale=100):
            # TABS (Fix 3: Icons only)
            with gr.Tabs() as tabs:
                # ---------------------- CHAT TAB (💬) ----------------------
                with gr.TabItem("💬"):  # Fix 3: Icon only
                    # Use type='messages' (v1.0c)
                    chatbot = gr.Chatbot(
                        label="Chat History",
                        height=CHATBOT_HEIGHT,
                        show_copy_button=True,
                        type='messages'
                    )
                    with gr.Row(elem_id="user_input_wrapper"):
                        user_input = gr.Textbox(
                            placeholder="Type your message...",
                            label="Your Prompt",
                            show_label=False,
                            lines=3,
                        )
                    # MODIFIED: Added elem_classes to apply the CSS padding fix
                    with gr.Row(variant="compact", elem_classes=["send_btn_row"]):
                        send_btn = gr.Button("➤", variant="primary", scale=1, min_width=0)
                        stop_btn = gr.Button("â›”", variant="stop", scale=1, min_width=0)
                        new_chat_btn = gr.Button("🗑️", variant="secondary", scale=1, min_width=0)
                    with gr.Row(visible=False):
                        history_dropdown_ref = gr.Dropdown(
                            label="Hidden History Dropdown Ref",
                            choices=get_history_names(),
                            value=None,
                            interactive=True
                        )
                    # Thin wrapper so .then() chains have a named callable.
                    def _refresh_history_dropdown_output(history):
                        return refresh_history_dropdown(history)
                    # Handler for chat, inputs are user_input and session_history (tuples)
                    # Outputs are chatbot (messages), session_history (tuples), and user_input (update)
                    send_btn.click(
                        chat_interface,
                        inputs=[user_input, session_history],
                        outputs=[chatbot, session_history, user_input],
                        show_progress=True,
                    ).then(
                        _refresh_history_dropdown_output, inputs=[session_history], outputs=[history_dropdown_ref]
                    )
                    user_input.submit(
                        chat_interface,
                        inputs=[user_input, session_history],
                        outputs=[chatbot, session_history, user_input],
                        show_progress=True,
                    ).then(
                        _refresh_history_dropdown_output, inputs=[session_history], outputs=[history_dropdown_ref]
                    )
                    stop_btn.click(set_stop_event, inputs=[], outputs=[])
                    # new_chat returns empty list for both chatbot display and state
                    new_chat_btn.click(
                        new_chat, inputs=[session_history], outputs=[chatbot, session_history], show_progress=False
                    ).then(
                        _refresh_history_dropdown_output, inputs=[session_history], outputs=[history_dropdown_ref]
                    )
                    # Resend/Retry logic (adapted for new 'messages' type)
                    # NOTE(review): gr.Index / Chatbot.message_event do not appear in the
                    # public Gradio API — these try blocks presumably no-op; confirm.
                    try:
                        def get_user_msg_for_resend(idx, h):
                            # Find the Nth user message in the list of tuples
                            user_index_counter = -1
                            for u_msg, _ in h:
                                if u_msg:
                                    user_index_counter += 1
                                    if user_index_counter == idx:
                                        return u_msg
                            return ""
                        chatbot.message_event(
                            get_user_msg_for_resend,
                            inputs=[gr.Index(), session_history],
                            outputs=[user_input],
                            label="Resend User"
                        )
                    except Exception: pass
                    try:
                        def _retry(idx, h):
                            if idx is None or not h: return convert_to_messages_format(h), h, gr.update(interactive=True)
                            # Gradio Index is message number (0-indexed).
                            # We need to find the user message that started the turn being clicked on.
                            # 1. Determine the number of complete turns (pairs) + incomplete turn
                            num_turns = len(h)
                            # 2. Map the message index (idx) to the turn index (turn_idx)
                            # In the new format, index 0 is user 1, index 1 is bot 1, index 2 is user 2, etc.
                            turn_idx = idx // 2
                            if turn_idx >= num_turns:
                                # Clicked a message outside the range (e.g., if history was cleared)
                                return convert_to_messages_format(h), h, gr.update(interactive=True)
                            # Get the user message from the target turn
                            user_msg = h[turn_idx][0]
                            if not user_msg:
                                # This should not happen if the click was on a valid turn
                                return convert_to_messages_format(h), h, gr.update(interactive=True)
                            # New history state is everything *before* the turn we are retrying
                            new_hist = h[:turn_idx]
                            # Call the chat interface with the user message and the shortened history
                            return chat_interface(user_msg, new_hist)
                        chatbot.message_event(
                            _retry,
                            inputs=[gr.Index(), session_history],
                            outputs=[chatbot, session_history, user_input],
                            label="Retry Assistant"
                        )
                    except Exception: pass
                # ---------------------- SETTINGS TAB (⚙️) ----------------------
                with gr.TabItem("⚙️"):  # Fix 3: Icon only
                    settings_message = gr.Markdown(f"Loaded preset: **{all_settings.get('last_preset_name', 'Default')}**")
                    with gr.Row():
                        preset_dropdown = gr.Dropdown(
                            label="Select Preset (Loads on selection)",
                            choices=get_preset_names(),  # Fix 1: Correctly uses get_preset_names()
                            value=all_settings.get("last_preset_name", "Default"),
                            interactive=True, scale=2
                        )
                        save_name_input = gr.Textbox(
                            label="Current Preset Name", value=all_settings.get("last_preset_name", "Default"), visible=False
                        )
                        new_preset_btn = gr.Button("✨ New Preset", variant="secondary", scale=1)
                        save_btn = gr.Button("💾 Save Current", variant="primary", scale=1)
                    # Hidden textbox written by the JS prompt() below; its change
                    # event drives create_new_preset.
                    new_preset_name_box = gr.Textbox(visible=False)
                    gr.Markdown("---")
                    gr.Markdown("### API and Model Parameters")
                    api_key_input = gr.Textbox(label="OpenRouter API Key (Plaintext)", value=current_settings["api_key"], type="text", info="Saved in openrouter_settings.json")
                    system_prompt_input = gr.Textbox(label="System Prompt", value=current_settings["system_prompt"], lines=3)
                    with gr.Row():
                        model_input = gr.Textbox(label="Model Name", value=current_settings["model"], info="deepseek/deepseek-chat-v3.1:free\nopenai/gpt-oss-20b:free\ncognitivecomputations/dolphin-mistral-24b-venice-edition:free", scale=2)
                        temperature_input = gr.Slider(label="Temperature", minimum=0.0, maximum=2.0, step=0.05, value=current_settings["temperature"], scale=1)
                        max_tokens_input = gr.Slider(label="Max Tokens (Model Limit Dependent)", minimum=0, maximum=130000, step=1, value=current_settings["max_tokens"], scale=1)
                    with gr.Row():
                        response_tokens_input = gr.Slider(label="Response Tokens (0 = model decides)", minimum=0, maximum=32768, step=1, value=current_settings.get("response_tokens", DEFAULT_SETTINGS.get("response_tokens", 0)), scale=1)
                        top_p_input = gr.Slider(label="Top P", minimum=0.0, maximum=1.0, step=0.05, value=current_settings["top_p"], scale=1)
                        freq_p_input = gr.Slider(label="Frequency Penalty", minimum=0, maximum=2.0, step=0.05, value=current_settings["frequency_penalty"], scale=1)
                        pres_p_input = gr.Slider(label="Presence Penalty", minimum=0, maximum=2.0, step=0.05, value=current_settings["presence_penalty"], scale=1)
                    gr.Markdown("---")
                    gr.Markdown("### Advanced Sampling Controls")
                    with gr.Row():
                        top_k_input = gr.Slider(label="Top K (0 to disable)", minimum=0, maximum=100, step=1, value=current_settings.get("top_k", DEFAULT_SETTINGS["top_k"]), scale=1)
                        min_p_input = gr.Slider(label="Min P (0.0 to disable)", minimum=0.0, maximum=1.0, step=0.01, value=current_settings.get("min_p", DEFAULT_SETTINGS["min_p"]), scale=1)
                        top_a_input = gr.Slider(label="Top A (0.0 to disable)", minimum=0.0, maximum=1.0, step=0.01, value=current_settings.get("top_a", DEFAULT_SETTINGS["top_a"]), scale=1)
                    with gr.Row():
                        rep_p_input = gr.Slider(label="Repetition Penalty (1.0 is neutral)", minimum=0.0, maximum=2.0, step=0.01, value=current_settings.get("repetition_penalty", DEFAULT_SETTINGS["repetition_penalty"]), scale=1)
                        seed_input = gr.Number(label="Seed (-1 for random)", minimum=-1, value=current_settings.get("seed", DEFAULT_SETTINGS["seed"]), scale=1)
                    save_btn.click(
                        save_settings,
                        inputs=[save_name_input, api_key_input, system_prompt_input, model_input, temperature_input, max_tokens_input, response_tokens_input, top_p_input, freq_p_input, pres_p_input, top_k_input, min_p_input, top_a_input, rep_p_input, seed_input],
                        outputs=[settings_message, preset_dropdown]
                    )
                    preset_dropdown.change(
                        load_settings,
                        inputs=[preset_dropdown],
                        outputs=[api_key_input, system_prompt_input, model_input, temperature_input, max_tokens_input, response_tokens_input, top_p_input, freq_p_input, pres_p_input, top_k_input, min_p_input, top_a_input, rep_p_input, seed_input, settings_message, current_model_display, save_name_input]
                    )
                    new_preset_btn.click(
                        None, inputs=[], outputs=[new_preset_name_box],
                        js="""() => prompt('Enter a name for the new preset:') || ''"""
                    )
                    new_preset_name_box.change(
                        create_new_preset, inputs=[new_preset_name_box],
                        outputs=[api_key_input, system_prompt_input, model_input, temperature_input, max_tokens_input, response_tokens_input, top_p_input, freq_p_input, pres_p_input, top_k_input, min_p_input, top_a_input, rep_p_input, seed_input, settings_message, current_model_display, save_name_input, preset_dropdown]
                    )
                # ---------------------- HISTORY TAB (📜) ----------------------
                with gr.TabItem("📜"):  # Fix 3: Icon only
                    history_message = gr.Markdown("### Historical Chat Sessions")
                    with gr.Row():
                        visible_history_dropdown = gr.Dropdown(
                            label="Load Session",
                            choices=get_history_names(),
                            value=None,
                            interactive=True, scale=2
                        )
                        load_history_btn = gr.Button("View Session", variant="secondary", scale=1)
                        # COPY ALL BUTTON (v1.0a)
                        copy_all_btn = gr.Button("📋 Copy All", variant="secondary", scale=1)
                        # CLEAR LOG BUTTON (v1.0a)
                        clear_log_btn = gr.Button("🗑️ Clear Log", variant="secondary", scale=1)
                    # Use type='messages' (v1.0c)
                    history_display = gr.Chatbot(
                        label="Historical Session Content",
                        height=CHATBOT_HEIGHT,
                        type='messages',
                        show_copy_button=True  # Per-message copy (v1.0a)
                    )
                    # Hidden element to hold the text for the Copy All functionality
                    copy_text_output = gr.Textbox(visible=False, label="Copy Text")
                    # Event Handlers: keep the hidden ref dropdown (updated after each
                    # chat turn) mirrored into the visible one.
                    history_dropdown_ref.change(
                        lambda value: gr.update(choices=get_history_names(), value=value),
                        inputs=[history_dropdown_ref], outputs=[visible_history_dropdown], show_progress=False
                    )
                    visible_history_dropdown.change(
                        load_history_session, inputs=[visible_history_dropdown], outputs=[history_display]
                    )
                    load_history_btn.click(
                        load_history_session, inputs=[visible_history_dropdown], outputs=[history_display]
                    )
                    # Copy All handler (v1.0a)
                    # NOTE(review): gr.Markdown.update was removed in Gradio 4.x —
                    # this lambda presumably needs gr.update(); confirm target version.
                    copy_all_btn.click(
                        copy_all_session_text,
                        inputs=[visible_history_dropdown],
                        outputs=[copy_text_output]
                    ).then(
                        lambda x: (gr.Markdown.update(value=f"Copied {len(x)} characters from session."), gr.update(value="")),
                        inputs=[copy_text_output],
                        outputs=[history_message, copy_text_output]
                    )
                    # Clear Log handler (v1.0a)
                    clear_log_btn.click(
                        clear_history_log,
                        inputs=[],
                        outputs=[history_display, visible_history_dropdown, history_dropdown_ref, history_message, preset_dropdown]
                    )
    # --- Quit Button Handler (placed outside tabs for global visibility) ---
    quit_btn.click(
        quit_app,
        # CRITICAL FIX v1.0d: Outputs must be component objects, matching the return values (Markdown Update, Generic Update for Button)
        outputs=[shutdown_message, quit_btn]
    )
# =========================================================
# --- LAUNCH (HUGGING FACE SPACES FIX) ---
# =========================================================
if __name__ == "__main__":
    # CRITICAL FIX 1: Hardcode the server port to 7860, which is the standard
    # expected by the Hugging Face Spaces router.
    PORT = 7860
    # CRITICAL FIX 2: Remove all unnecessary local print statements and
    # the thread that tries to open a browser (webbrowser.open), as this
    # is impossible in the cloud environment and causes issues.
    print(f"Launching Termux LLM Client v{VERSION} on 0.0.0.0:{PORT} (Hugging Face Mode)")
    app.launch(
        server_name="0.0.0.0",
        server_port=PORT,
        # Set inbrowser=False to prevent the app from trying to launch a browser
        inbrowser=False,
        debug=False,  # Debug output is usually too verbose for HFS logs
        quiet=False  # Set to False or remove, so you can see the startup logs
    )