|
|
""" |
|
|
TextAI UI Module |
|
|
Clean ChatGPT/Grok style interface |
|
|
""" |
|
|
import json |
|
|
from typing import List, Tuple, Optional |
|
|
from datetime import datetime |
|
|
|
|
|
from .text_ai import ( |
|
|
model_manager, session_manager, |
|
|
DEFAULT_CHAT_PROMPT, DEFAULT_ROLEPLAY_PROMPT, ROLEPLAY_PRESETS |
|
|
) |
|
|
from .config import MODELS_DIR |
|
|
|
|
|
|
|
|
# JSON file storing per-model settings (e.g. custom system prompts), keyed by model id.
MODEL_CONFIGS_FILE = MODELS_DIR / "model_configs.json"
|
|
|
|
|
|
|
|
def _load_model_configs() -> dict:
    """Load per-model configurations from MODEL_CONFIGS_FILE.

    Returns:
        Mapping of model_id -> config dict; empty dict when the file is
        missing or unreadable (best-effort, never raises).
    """
    if MODEL_CONFIGS_FILE.exists():
        try:
            return json.loads(MODEL_CONFIGS_FILE.read_text())
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and bad encodings;
            # a corrupt config file degrades to empty defaults instead of
            # silently swallowing unrelated errors (e.g. KeyboardInterrupt).
            pass
    return {}
|
|
|
|
|
|
|
|
def _save_model_configs(configs: dict):
    """Persist the per-model configuration mapping to MODEL_CONFIGS_FILE as JSON."""
    MODEL_CONFIGS_FILE.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(configs, indent=2)
    MODEL_CONFIGS_FILE.write_text(payload)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_chat_list() -> List[List[str]]:
    """Return up to 30 recent sessions as [session_id, labelled title] rows."""
    rows = []
    for entry in session_manager.list_sessions()[:30]:
        title = entry["title"]
        if len(title) > 35:
            title = title[:35] + "..."
        icon = "π" if entry.get("session_type") == "roleplay" else "π¬"
        rows.append([entry["session_id"], f"{icon} {title}"])
    return rows
|
|
|
|
|
|
|
|
def get_model_choices() -> List[str]:
    """Return dropdown labels for every available model (loaded ones flagged)."""
    labels = [
        f"{'β ' if m['loaded'] else ''}{m['name']} ({m['type']})"
        for m in model_manager.get_available_models()
    ]
    return labels or ["No models available"]
|
|
|
|
|
|
|
|
def get_current_model_display() -> str:
    """Return the loaded model id, or a placeholder when nothing is loaded."""
    status = model_manager.get_status()
    if not status["loaded"]:
        return "No model loaded"
    return f"{status['model_id']}"
|
|
|
|
|
|
|
|
def format_chat_history(session_id: str) -> List[dict]:
    """Format session messages for Gradio Chatbot (new dict format)."""
    session = session_manager.load_session(session_id)
    if not session:
        return []
    # Keep only user/assistant turns; any other roles are dropped.
    return [
        {"role": msg["role"], "content": msg["content"]}
        for msg in session.messages
        if msg["role"] in ("user", "assistant")
    ]
|
|
|
|
|
|
|
|
def format_chat_history_tuples(session_id: str) -> List[Tuple[str, str]]:
    """Format session messages for Gradio Chatbot (tuple format for compatibility).

    Pairs each user message with the assistant reply that follows it; a
    trailing user message without a reply yet becomes (msg, None).
    """
    session = session_manager.load_session(session_id)
    if not session:
        return []

    history = []
    user_msg = None

    for msg in session.messages:
        if msg["role"] == "user":
            user_msg = msg["content"]
        # Compare against None (not truthiness) so an empty-string user
        # message still pairs with its assistant reply instead of being
        # silently dropped.
        elif msg["role"] == "assistant" and user_msg is not None:
            history.append((user_msg, msg["content"]))
            user_msg = None

    if user_msg is not None:
        history.append((user_msg, None))

    return history
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ui_new_chat(mode: str = "chat"):
    """Create a new session; returns (session_id, empty history, chat list, title)."""
    if mode == "roleplay":
        session_type = "roleplay"
        system_prompt = DEFAULT_ROLEPLAY_PROMPT
    else:
        session_type = "chat"
        # Prefer a per-model prompt saved for the active model, if any.
        per_model = _load_model_configs().get(model_manager.current_model_id, {})
        system_prompt = per_model.get("system_prompt", DEFAULT_CHAT_PROMPT)

    session = session_manager.create_session("", session_type, system_prompt)
    return session.session_id, [], get_chat_list(), session.title
|
|
|
|
|
|
|
|
def ui_load_session(evt, sessions_data):
    """Load a session from a sidebar click event.

    Returns (session_id, history, title, is_roleplay); blank defaults when
    the click cannot be resolved to a stored session.
    """
    try:
        if hasattr(evt, 'index') and evt.index[0] < len(sessions_data):
            session_id = sessions_data[evt.index[0]][0]
            session = session_manager.load_session(session_id)
            if session:
                history = format_chat_history_tuples(session_id)
                return session_id, history, session.title, session.session_type == "roleplay"
    except Exception:
        # Narrowed from a bare except: malformed event payloads fall through
        # to the empty defaults, but SystemExit/KeyboardInterrupt propagate.
        pass
    return "", [], "", False
|
|
|
|
|
|
|
|
def ui_send_message(
    session_id: str,
    message: str,
    history: List,
    max_tokens: int,
    temperature: float,
    is_roleplay: bool = False
):
    """Send a user message and stream UI updates.

    Generator yielding (history, cleared textbox, session_id, chat list)
    tuples: once after echoing the user's message, and once more with the
    model's reply (or an error row) filled in.
    """
    # Lazily create a session when the user typed before starting one.
    if not session_id:
        mode = "roleplay" if is_roleplay else "chat"
        session_id, _, _, _ = ui_new_chat(mode)

    # Ignore empty / whitespace-only submissions.
    if not message.strip():
        yield history, "", session_id, get_chat_list()
        return

    # Cannot generate without a loaded model; surface that as a chat row.
    if model_manager.current_model is None:
        history = history + [(message, "Please load a model first from the menu.")]
        yield history, "", session_id, get_chat_list()
        return

    # Echo the pending user message immediately; reply slot stays None.
    history = history + [(message, None)]
    yield history, "", session_id, get_chat_list()

    # Imported here rather than at module top — presumably to avoid a
    # circular import with .text_ai (TODO confirm). api_chat returns JSON.
    from .text_ai import api_chat
    result = json.loads(api_chat(session_id, message, max_tokens, temperature))

    if result["success"]:
        history[-1] = (message, result["response"])
        # Reload the session to pick up any auto-generated title.
        session = session_manager.load_session(session_id)
        title = session.title if session else ""
    else:
        history[-1] = (message, f"Error: {result.get('error', 'Unknown error')}")
        title = ""

    # NOTE(review): `title` is computed but never yielded — confirm whether
    # the final yield was meant to include it.
    yield history, "", session_id, get_chat_list()
|
|
|
|
|
|
|
|
def ui_rename_session(session_id: str, new_title: str):
    """Rename the current session; returns (chat list, applied title or "")."""
    title = new_title.strip()
    if not session_id or not title:
        return get_chat_list(), ""
    session_manager.rename_session(session_id, title)
    return get_chat_list(), title
|
|
|
|
|
|
|
|
def ui_delete_session(session_id: str):
    """Delete the given session and reset the chat view.

    Returns (cleared id, empty history, refreshed sidebar, cleared title).
    """
    if session_id:
        session_manager.delete_session(session_id)
    return "", [], get_chat_list(), ""
|
|
|
|
|
|
|
|
def ui_clear_chat(session_id: str):
    """Wipe all messages from the current session; always returns an empty history."""
    if not session_id:
        return []
    session_manager.clear_session(session_id)
    return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_models_table() -> List[List[str]]:
    """Get models for table display.

    Columns: loaded flag, name, type, size, has-custom-prompt flag.
    """
    # Load the config file once, not once per row (the original re-read
    # and re-parsed the JSON file inside the loop).
    configs = _load_model_configs()
    rows = []
    for m in model_manager.get_available_models():
        rows.append([
            "β" if m["loaded"] else "",
            m["name"],
            m["type"],
            m["size"],
            "β" if m["id"] in configs else "",
        ])
    return rows
|
|
|
|
|
|
|
|
def ui_load_model_by_index(evt, models_data):
    """Load the model whose table row was clicked.

    Returns (models table, current-model label, status message).
    """
    try:
        row = evt.index[0] if hasattr(evt, 'index') else None
        if row is not None and row < len(models_data):
            models = model_manager.get_available_models()
            if row < len(models):
                model_id = models[row]["id"]
                result = model_manager.load_model(model_id)
                if result.get("success"):
                    status = f"Loaded: {model_id}"
                else:
                    status = f"Error: {result.get('error')}"
                return get_models_table(), get_current_model_display(), status
    except Exception as e:
        return get_models_table(), get_current_model_display(), f"Error: {str(e)}"
    return get_models_table(), get_current_model_display(), ""
|
|
|
|
|
|
|
|
def ui_unload_model():
    """Unload the active model and refresh the model views."""
    model_manager.unload_model()
    table = get_models_table()
    current = get_current_model_display()
    return table, current, "Model unloaded"
|
|
|
|
|
|
|
|
def ui_save_model_prompt(model_name: str, system_prompt: str):
    """Save a system prompt for the given model; returns a status string."""
    if not model_name:
        return "Select a model first"

    # Resolve a display name (or raw id) to the canonical model id.
    model_id = next(
        (m["id"] for m in model_manager.get_available_models()
         if model_name in (m["name"], m["id"])),
        None,
    )
    if model_id is None:
        return "Model not found"

    configs = _load_model_configs()
    configs.setdefault(model_id, {})["system_prompt"] = system_prompt
    _save_model_configs(configs)
    return f"Saved prompt for {model_name}"
|
|
|
|
|
|
|
|
def ui_get_model_prompt(model_name: str) -> str:
    """Return the stored system prompt for a model, or the default chat prompt."""
    model_id = None
    for entry in model_manager.get_available_models():
        # Accept either the display name or the raw id.
        if model_name in (entry["name"], entry["id"]):
            model_id = entry["id"]
            break

    if not model_id:
        return DEFAULT_CHAT_PROMPT
    return _load_model_configs().get(model_id, {}).get("system_prompt", DEFAULT_CHAT_PROMPT)
|
|
|
|
|
|
|
|
def ui_delete_model_config(model_name: str):
    """Delete a model's saved config entry (not the model file itself).

    Returns (refreshed models table, status message).
    """
    models = model_manager.get_available_models()
    model_id = None
    for m in models:
        # Accept either display name or raw id, consistent with
        # ui_save_model_prompt / ui_get_model_prompt (the original only
        # matched the display name).
        if m["name"] == model_name or m["id"] == model_name:
            model_id = m["id"]
            break

    if model_id:
        configs = _load_model_configs()
        if model_id in configs:
            del configs[model_id]
            _save_model_configs(configs)
        return get_models_table(), "Config removed"
    return get_models_table(), "Model not found"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# On-disk store for user-defined personas (key -> {"name": ..., "prompt": ...}).
PERSONAS_FILE = MODELS_DIR.parent / "storage" / "personas.json"
|
|
|
|
|
|
|
|
def _load_personas() -> dict:
    """Load saved personas from PERSONAS_FILE.

    Returns:
        Mapping of persona key -> {"name": ..., "prompt": ...}; falls back
        to a single default persona when the file is missing or unreadable.
    """
    if PERSONAS_FILE.exists():
        try:
            return json.loads(PERSONAS_FILE.read_text())
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError; a corrupt personas
            # file degrades to the built-in default instead of a bare
            # except that would also swallow KeyboardInterrupt.
            pass
    return {"default": {"name": "Default", "prompt": DEFAULT_CHAT_PROMPT}}
|
|
|
|
|
|
|
|
def _save_personas(personas: dict):
    """Write the persona mapping to PERSONAS_FILE, creating parent dirs as needed."""
    PERSONAS_FILE.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(personas, indent=2)
    PERSONAS_FILE.write_text(serialized)
|
|
|
|
|
|
|
|
def get_persona_choices() -> List[str]:
    """Return persona names for the dropdown."""
    return [key for key in _load_personas()]
|
|
|
|
|
|
|
|
def ui_save_persona(name: str, prompt: str):
    """Save a persona under its whitespace-stripped name.

    Returns (updated persona choices, status message).
    """
    clean_name = name.strip()
    if not clean_name:
        return get_persona_choices(), "Enter persona name"

    personas = _load_personas()
    personas[clean_name] = {"name": clean_name, "prompt": prompt}
    _save_personas(personas)
    # Report the stripped name — that is the key actually stored (the
    # original echoed the raw, unstripped input).
    return get_persona_choices(), f"Saved: {clean_name}"
|
|
|
|
|
|
|
|
def ui_load_persona(name: str) -> str:
    """Return the prompt stored for a persona, or the default chat prompt."""
    entry = _load_personas().get(name, {})
    return entry.get("prompt", DEFAULT_CHAT_PROMPT)
|
|
|
|
|
|
|
|
def ui_delete_persona(name: str):
    """Delete a persona (the default persona is protected).

    Returns (updated persona choices, status message).
    """
    if name == "default":
        return get_persona_choices(), "Cannot delete default persona"

    personas = _load_personas()
    if name not in personas:
        return get_persona_choices(), "Persona not found"

    del personas[name]
    _save_personas(personas)
    return get_persona_choices(), f"Deleted: {name}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Fallback model fetched by download_default_model() when no local model exists.
DEFAULT_MODEL = {
    "id": "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF",
    "file": "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf",
    "name": "TinyLlama 1.1B",
    "size": "0.7GB"
}

# Curated GGUF models offered for download; "recommended" marks the default pick.
SUGGESTED_MODELS = [
    {"id": "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF", "file": "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf", "name": "TinyLlama 1.1B (Fast)", "size": "0.7GB", "recommended": True},
    {"id": "TheBloke/phi-2-GGUF", "file": "phi-2.Q4_K_M.gguf", "name": "Phi-2 (Small & Fast)", "size": "1.6GB"},
    {"id": "Qwen/Qwen2-0.5B-Instruct-GGUF", "file": "qwen2-0_5b-instruct-q4_k_m.gguf", "name": "Qwen2 0.5B (Tiny)", "size": "0.4GB"},
    {"id": "TheBloke/Mistral-7B-Instruct-v0.2-GGUF", "file": "mistral-7b-instruct-v0.2.Q4_K_M.gguf", "name": "Mistral 7B Instruct", "size": "4.4GB"},
    {"id": "TheBloke/Llama-2-7B-Chat-GGUF", "file": "llama-2-7b-chat.Q4_K_M.gguf", "name": "Llama 2 7B Chat", "size": "4.1GB"},
    {"id": "TheBloke/OpenHermes-2.5-Mistral-7B-GGUF", "file": "openhermes-2.5-mistral-7b.Q4_K_M.gguf", "name": "OpenHermes 2.5", "size": "4.4GB"},
]
|
|
|
|
|
|
|
|
def download_default_model() -> str:
    """Download the default model if no GGUF model is already present."""
    from .hf_hub import download_model_file
    from .config import MODELS_DIR

    txt_dir = MODELS_DIR / "txt"
    txt_dir.mkdir(parents=True, exist_ok=True)

    # Any existing GGUF anywhere under the text-model dir skips the download.
    found = next(txt_dir.rglob("*.gguf"), None)
    if found is not None:
        return f"Model already exists: {found.name}"

    return download_model_file(DEFAULT_MODEL["id"], DEFAULT_MODEL["file"])
|
|
|
|
|
|
|
|
def ensure_model_available() -> bool:
    """Return True if at least one model is available locally.

    NOTE(review): despite the legacy wording, this only *checks* for
    models — it does not trigger a download; see download_default_model().
    """
    models = model_manager.get_available_models()
    return len(models) > 0
|
|
|
|
|
|
|
|
def get_suggested_models_table() -> List[List[str]]:
    """Return [name, size, id] rows for the suggested-models display."""
    rows = []
    for entry in SUGGESTED_MODELS:
        rows.append([entry["name"], entry["size"], entry["id"]])
    return rows
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_sessions_list(session_type: str = None) -> List[List[str]]:
    """Legacy: Get sessions list.

    The session_type parameter is accepted for backward compatibility but
    ignored; this simply delegates to get_chat_list().
    """
    return get_chat_list()
|
|
|
|
|
|
|
|
def get_model_status_display() -> str:
    """Legacy: status line naming the loaded model, if any."""
    status = model_manager.get_status()
    if not status["loaded"]:
        return "No model loaded"
    return f"Model: {status['model_id']}"
|
|
|
|
|
|
|
|
def ui_refresh_models():
    """Legacy: Refresh models.

    Kept for older UI wiring; returns (models table, current-model label).
    """
    return get_models_table(), get_current_model_display()
|
|
|
|
|
|
|
|
def ui_load_model(evt, models_data):
    """Legacy: Load model.

    Kept for older UI wiring; delegates to ui_load_model_by_index().
    """
    return ui_load_model_by_index(evt, models_data)
|
|
|