diff --git "a/src/proxy_app/settings_tool.py" "b/src/proxy_app/settings_tool.py" new file mode 100644--- /dev/null +++ "b/src/proxy_app/settings_tool.py" @@ -0,0 +1,2474 @@ +# SPDX-License-Identifier: MIT +# Copyright (c) 2026 Mirrowel + +""" +Advanced settings configuration tool for the LLM API Key Proxy. +Provides interactive configuration for custom providers, model definitions, and concurrency limits. +""" + +import json +import os +from pathlib import Path +from typing import Dict, Any, Optional, List +from rich.console import Console +from rich.prompt import Prompt, IntPrompt, Confirm +from rich.panel import Panel +from dotenv import set_key, unset_key + +from rotator_library.utils.paths import get_data_file + +console = Console() + +# Sentinel value for distinguishing "no pending change" from "pending change to None" +_NOT_FOUND = object() + +# Import default OAuth port values from provider modules +# These serve as the source of truth for default port values +try: + from rotator_library.providers.gemini_auth_base import GeminiAuthBase + + GEMINI_CLI_DEFAULT_OAUTH_PORT = GeminiAuthBase.CALLBACK_PORT +except ImportError: + GEMINI_CLI_DEFAULT_OAUTH_PORT = 8085 + +try: + from rotator_library.providers.antigravity_auth_base import AntigravityAuthBase + + ANTIGRAVITY_DEFAULT_OAUTH_PORT = AntigravityAuthBase.CALLBACK_PORT +except ImportError: + ANTIGRAVITY_DEFAULT_OAUTH_PORT = 51121 + +try: + from rotator_library.providers.iflow_auth_base import ( + CALLBACK_PORT as IFLOW_DEFAULT_OAUTH_PORT, + ) +except ImportError: + IFLOW_DEFAULT_OAUTH_PORT = 11451 + + +def clear_screen(subtitle: str = ""): + """ + Cross-platform terminal clear with optional header. + + Uses native OS commands instead of ANSI escape sequences: + - Windows (conhost & Windows Terminal): cls + - Unix-like systems (Linux, Mac): clear + + Args: + subtitle: If provided, displays a header panel with this subtitle. + If empty/None, just clears the screen. + """ + os.system("cls" if os.name == "nt" else "clear") + if subtitle: + console.print( + Panel( + f"[bold cyan]{subtitle}[/bold cyan]", + title="--- API Key Proxy ---", + ) + ) + + +class AdvancedSettings: + """Manages pending changes to .env""" + + def __init__(self): + self.env_file = get_data_file(".env") + self.pending_changes = {} # key -> value (None means delete) + self.load_current_settings() + + def load_current_settings(self): + """Load current .env values into env vars""" + from dotenv import load_dotenv + + load_dotenv(self.env_file, override=True) + + def set(self, key: str, value: str): + """Stage a change""" + self.pending_changes[key] = value + + def remove(self, key: str): + """Stage a removal""" + self.pending_changes[key] = None + + def save(self): + """Write pending changes to .env""" + for key, value in self.pending_changes.items(): + if value is None: + # Remove key + unset_key(str(self.env_file), key) + else: + # Set key + set_key(str(self.env_file), key, value) + + self.pending_changes.clear() + self.load_current_settings() + + def discard(self): + """Discard pending changes""" + self.pending_changes.clear() + + def has_pending(self) -> bool: + """Check if there are pending changes""" + return bool(self.pending_changes) + + def get_pending_value(self, key: str): + """Get pending value for a key. Returns sentinel _NOT_FOUND if no pending change.""" + return self.pending_changes.get(key, _NOT_FOUND) + + def get_original_value(self, key: str) -> Optional[str]: + """Get the current .env value (before pending changes)""" + return os.getenv(key) + + def get_change_type(self, key: str) -> Optional[str]: + """Returns 'add', 'edit', 'remove', or None if no pending change""" + if key not in self.pending_changes: + return None + if self.pending_changes[key] is None: + return "remove" + elif os.getenv(key) is not None: + return "edit" + else: + return "add" + + def get_pending_keys_by_pattern( + self, prefix: str = "", suffix: str = "" + ) -> List[str]: + """Get all pending change keys that match prefix and/or suffix""" + return [ + k + for k in self.pending_changes.keys() + if k.startswith(prefix) and k.endswith(suffix) + ] + + def get_changes_summary(self) -> Dict[str, List[tuple]]: + """Get categorized summary of all pending changes. + Returns dict with 'add', 'edit', 'remove' keys, + each containing list of (key, old_val, new_val) tuples. + """ + summary: Dict[str, List[tuple]] = {"add": [], "edit": [], "remove": []} + for key, new_val in self.pending_changes.items(): + old_val = os.getenv(key) + change_type = self.get_change_type(key) + if change_type: + summary[change_type].append((key, old_val, new_val)) + # Sort each list alphabetically by key + for change_type in summary: + summary[change_type].sort(key=lambda x: x[0]) + return summary + + def get_pending_counts(self) -> Dict[str, int]: + """Get counts of pending changes by type""" + adds = len( + [ + k + for k, v in self.pending_changes.items() + if v is not None and os.getenv(k) is None + ] + ) + edits = len( + [ + k + for k, v in self.pending_changes.items() + if v is not None and os.getenv(k) is not None + ] + ) + removes = len([k for k, v in self.pending_changes.items() if v is None]) + return {"add": adds, "edit": edits, "remove": removes} + + +class CustomProviderManager: + """Manages custom provider API bases""" + + def __init__(self, settings: AdvancedSettings): + self.settings = settings + + def get_current_providers(self) -> Dict[str, str]: + """Get currently configured custom providers""" + from proxy_app.provider_urls import PROVIDER_URL_MAP + + providers = {} + for key, value in os.environ.items(): + if key.endswith("_API_BASE"): + provider = key.replace("_API_BASE", "").lower() + # Only include if NOT in hardcoded map + if provider not in PROVIDER_URL_MAP: + providers[provider] = value + return providers + + def add_provider(self, name: str, api_base: str): + """Add PROVIDER_API_BASE""" + key = f"{name.upper()}_API_BASE" + self.settings.set(key, api_base) + + def edit_provider(self, name: str, api_base: str): + """Edit PROVIDER_API_BASE""" + self.add_provider(name, api_base) + + def remove_provider(self, name: str): + """Remove PROVIDER_API_BASE""" + key = f"{name.upper()}_API_BASE" + self.settings.remove(key) + + +class ModelDefinitionManager: + """Manages PROVIDER_MODELS""" + + def __init__(self, settings: AdvancedSettings): + self.settings = settings + + def get_current_provider_models(self, provider: str) -> Optional[Dict]: + """Get currently configured models for a provider""" + key = f"{provider.upper()}_MODELS" + value = os.getenv(key) + if value: + try: + return json.loads(value) + except (json.JSONDecodeError, ValueError): + return None + return None + + def get_all_providers_with_models(self) -> Dict[str, int]: + """Get all providers with model definitions""" + providers = {} + for key, value in os.environ.items(): + if key.endswith("_MODELS"): + provider = key.replace("_MODELS", "").lower() + try: + parsed = json.loads(value) + if isinstance(parsed, dict): + providers[provider] = len(parsed) + elif isinstance(parsed, list): + providers[provider] = len(parsed) + except (json.JSONDecodeError, ValueError): + pass + return providers + + def set_models(self, provider: str, models: Dict[str, Dict[str, Any]]): + """Set PROVIDER_MODELS""" + key = f"{provider.upper()}_MODELS" + value = json.dumps(models) + self.settings.set(key, value) + + def remove_models(self, provider: str): + """Remove PROVIDER_MODELS""" + key = f"{provider.upper()}_MODELS" + self.settings.remove(key) + + +class ConcurrencyManager: + """Manages MAX_CONCURRENT_REQUESTS_PER_KEY_PROVIDER""" + + def __init__(self, settings: AdvancedSettings): + self.settings = settings + + def get_current_limits(self) -> Dict[str, int]: + """Get currently configured concurrency limits""" + limits = {} + for key, value in os.environ.items(): + if key.startswith("MAX_CONCURRENT_REQUESTS_PER_KEY_"): + provider = key.replace("MAX_CONCURRENT_REQUESTS_PER_KEY_", "").lower() + try: + limits[provider] = int(value) + except (json.JSONDecodeError, ValueError): + pass + return limits + + def set_limit(self, provider: str, limit: int): + """Set concurrency limit""" + key = f"MAX_CONCURRENT_REQUESTS_PER_KEY_{provider.upper()}" + self.settings.set(key, str(limit)) + + def remove_limit(self, provider: str): + """Remove concurrency limit (reset to default)""" + key = f"MAX_CONCURRENT_REQUESTS_PER_KEY_{provider.upper()}" + self.settings.remove(key) + + +class RotationModeManager: + """Manages ROTATION_MODE_PROVIDER settings for sequential/balanced credential rotation""" + + VALID_MODES = ["balanced", "sequential"] + + def __init__(self, settings: AdvancedSettings): + self.settings = settings + + def get_current_modes(self) -> Dict[str, str]: + """Get currently configured rotation modes""" + modes = {} + for key, value in os.environ.items(): + if key.startswith("ROTATION_MODE_"): + provider = key.replace("ROTATION_MODE_", "").lower() + if value.lower() in self.VALID_MODES: + modes[provider] = value.lower() + return modes + + def get_default_mode(self, provider: str) -> str: + """Get the default rotation mode for a provider""" + try: + from rotator_library.providers import PROVIDER_PLUGINS + + provider_class = PROVIDER_PLUGINS.get(provider.lower()) + if provider_class and hasattr(provider_class, "default_rotation_mode"): + return provider_class.default_rotation_mode + return "balanced" + except ImportError: + # Fallback defaults if import fails + if provider.lower() == "antigravity": + return "sequential" + return "balanced" + + def get_effective_mode(self, provider: str) -> str: + """Get the effective rotation mode (configured or default)""" + configured = self.get_current_modes().get(provider.lower()) + if configured: + return configured + return self.get_default_mode(provider) + + def set_mode(self, provider: str, mode: str): + """Set rotation mode for a provider""" + if mode.lower() not in self.VALID_MODES: + raise ValueError( + f"Invalid rotation mode: {mode}. Must be one of {self.VALID_MODES}" + ) + key = f"ROTATION_MODE_{provider.upper()}" + self.settings.set(key, mode.lower()) + + def remove_mode(self, provider: str): + """Remove rotation mode (reset to provider default)""" + key = f"ROTATION_MODE_{provider.upper()}" + self.settings.remove(key) + + +class PriorityMultiplierManager: + """Manages CONCURRENCY_MULTIPLIER__PRIORITY_ settings""" + + def __init__(self, settings: AdvancedSettings): + self.settings = settings + + def get_provider_defaults(self, provider: str) -> Dict[int, int]: + """Get default priority multipliers from provider class""" + try: + from rotator_library.providers import PROVIDER_PLUGINS + + provider_class = PROVIDER_PLUGINS.get(provider.lower()) + if provider_class and hasattr( + provider_class, "default_priority_multipliers" + ): + return dict(provider_class.default_priority_multipliers) + except ImportError: + pass + return {} + + def get_sequential_fallback(self, provider: str) -> int: + """Get sequential fallback multiplier from provider class""" + try: + from rotator_library.providers import PROVIDER_PLUGINS + + provider_class = PROVIDER_PLUGINS.get(provider.lower()) + if provider_class and hasattr( + provider_class, "default_sequential_fallback_multiplier" + ): + return provider_class.default_sequential_fallback_multiplier + except ImportError: + pass + return 1 + + def get_current_multipliers(self) -> Dict[str, Dict[int, int]]: + """Get currently configured priority multipliers from env vars""" + multipliers: Dict[str, Dict[int, int]] = {} + for key, value in os.environ.items(): + if key.startswith("CONCURRENCY_MULTIPLIER_") and "_PRIORITY_" in key: + try: + # Parse: CONCURRENCY_MULTIPLIER__PRIORITY_ + parts = key.split("_PRIORITY_") + provider = parts[0].replace("CONCURRENCY_MULTIPLIER_", "").lower() + remainder = parts[1] + + # Check if mode-specific (has _SEQUENTIAL or _BALANCED suffix) + if "_" in remainder: + continue # Skip mode-specific for now (show in separate view) + + priority = int(remainder) + multiplier = int(value) + + if provider not in multipliers: + multipliers[provider] = {} + multipliers[provider][priority] = multiplier + except (ValueError, IndexError): + pass + return multipliers + + def get_effective_multiplier(self, provider: str, priority: int) -> int: + """Get effective multiplier (configured, provider default, or 1)""" + # Check env var override + current = self.get_current_multipliers() + if provider.lower() in current: + if priority in current[provider.lower()]: + return current[provider.lower()][priority] + + # Check provider defaults + defaults = self.get_provider_defaults(provider) + if priority in defaults: + return defaults[priority] + + # Return 1 (no multiplier) + return 1 + + def set_multiplier(self, provider: str, priority: int, multiplier: int): + """Set priority multiplier for a provider""" + if multiplier < 1: + raise ValueError("Multiplier must be >= 1") + key = f"CONCURRENCY_MULTIPLIER_{provider.upper()}_PRIORITY_{priority}" + self.settings.set(key, str(multiplier)) + + def remove_multiplier(self, provider: str, priority: int): + """Remove multiplier (reset to provider default)""" + key = f"CONCURRENCY_MULTIPLIER_{provider.upper()}_PRIORITY_{priority}" + self.settings.remove(key) + + +# ============================================================================= +# PROVIDER-SPECIFIC SETTINGS DEFINITIONS +# ============================================================================= + +# Antigravity provider environment variables +ANTIGRAVITY_SETTINGS = { + "ANTIGRAVITY_SIGNATURE_CACHE_TTL": { + "type": "int", + "default": 3600, + "description": "Memory cache TTL for Gemini 3 thought signatures (seconds)", + }, + "ANTIGRAVITY_SIGNATURE_DISK_TTL": { + "type": "int", + "default": 86400, + "description": "Disk cache TTL for Gemini 3 thought signatures (seconds)", + }, + "ANTIGRAVITY_PRESERVE_THOUGHT_SIGNATURES": { + "type": "bool", + "default": True, + "description": "Preserve thought signatures in client responses", + }, + "ANTIGRAVITY_ENABLE_SIGNATURE_CACHE": { + "type": "bool", + "default": True, + "description": "Enable signature caching for multi-turn conversations", + }, + "ANTIGRAVITY_ENABLE_DYNAMIC_MODELS": { + "type": "bool", + "default": False, + "description": "Enable dynamic model discovery from API", + }, + "ANTIGRAVITY_GEMINI3_TOOL_FIX": { + "type": "bool", + "default": True, + "description": "Enable Gemini 3 tool hallucination prevention", + }, + "ANTIGRAVITY_CLAUDE_TOOL_FIX": { + "type": "bool", + "default": True, + "description": "Enable Claude tool hallucination prevention", + }, + "ANTIGRAVITY_CLAUDE_THINKING_SANITIZATION": { + "type": "bool", + "default": True, + "description": "Sanitize thinking blocks for Claude multi-turn conversations", + }, + "ANTIGRAVITY_GEMINI3_TOOL_PREFIX": { + "type": "str", + "default": "gemini3_", + "description": "Prefix added to tool names for Gemini 3 disambiguation", + }, + "ANTIGRAVITY_GEMINI3_DESCRIPTION_PROMPT": { + "type": "str", + "default": "\n\nSTRICT PARAMETERS: {params}.", + "description": "Template for strict parameter hints in tool descriptions", + }, + "ANTIGRAVITY_CLAUDE_DESCRIPTION_PROMPT": { + "type": "str", + "default": "\n\nSTRICT PARAMETERS: {params}.", + "description": "Template for Claude strict parameter hints in tool descriptions", + }, + "ANTIGRAVITY_OAUTH_PORT": { + "type": "int", + "default": ANTIGRAVITY_DEFAULT_OAUTH_PORT, + "description": "Local port for OAuth callback server during authentication", + }, +} + +# Gemini CLI provider environment variables +GEMINI_CLI_SETTINGS = { + "GEMINI_CLI_SIGNATURE_CACHE_TTL": { + "type": "int", + "default": 3600, + "description": "Memory cache TTL for thought signatures (seconds)", + }, + "GEMINI_CLI_SIGNATURE_DISK_TTL": { + "type": "int", + "default": 86400, + "description": "Disk cache TTL for thought signatures (seconds)", + }, + "GEMINI_CLI_PRESERVE_THOUGHT_SIGNATURES": { + "type": "bool", + "default": True, + "description": "Preserve thought signatures in client responses", + }, + "GEMINI_CLI_ENABLE_SIGNATURE_CACHE": { + "type": "bool", + "default": True, + "description": "Enable signature caching for multi-turn conversations", + }, + "GEMINI_CLI_GEMINI3_TOOL_FIX": { + "type": "bool", + "default": True, + "description": "Enable Gemini 3 tool hallucination prevention", + }, + "GEMINI_CLI_GEMINI3_TOOL_PREFIX": { + "type": "str", + "default": "gemini3_", + "description": "Prefix added to tool names for Gemini 3 disambiguation", + }, + "GEMINI_CLI_GEMINI3_DESCRIPTION_PROMPT": { + "type": "str", + "default": "\n\nSTRICT PARAMETERS: {params}.", + "description": "Template for strict parameter hints in tool descriptions", + }, + "GEMINI_CLI_PROJECT_ID": { + "type": "str", + "default": "", + "description": "GCP Project ID for paid tier users (required for paid tiers)", + }, + "GEMINI_CLI_OAUTH_PORT": { + "type": "int", + "default": GEMINI_CLI_DEFAULT_OAUTH_PORT, + "description": "Local port for OAuth callback server during authentication", + }, +} + +# iFlow provider environment variables +IFLOW_SETTINGS = { + "IFLOW_OAUTH_PORT": { + "type": "int", + "default": IFLOW_DEFAULT_OAUTH_PORT, + "description": "Local port for OAuth callback server during authentication", + }, +} + +# Map provider names to their settings definitions +PROVIDER_SETTINGS_MAP = { + "antigravity": ANTIGRAVITY_SETTINGS, + "gemini_cli": GEMINI_CLI_SETTINGS, + "iflow": IFLOW_SETTINGS, +} + + +class ProviderSettingsManager: + """Manages provider-specific configuration settings""" + + def __init__(self, settings: AdvancedSettings): + self.settings = settings + + def get_available_providers(self) -> List[str]: + """Get list of providers with specific settings available""" + return list(PROVIDER_SETTINGS_MAP.keys()) + + def get_provider_settings_definitions( + self, provider: str + ) -> Dict[str, Dict[str, Any]]: + """Get settings definitions for a provider""" + return PROVIDER_SETTINGS_MAP.get(provider, {}) + + def get_current_value(self, key: str, definition: Dict[str, Any]) -> Any: + """Get current value of a setting from environment""" + env_value = os.getenv(key) + if env_value is None: + return definition.get("default") + + setting_type = definition.get("type", "str") + try: + if setting_type == "bool": + return env_value.lower() in ("true", "1", "yes") + elif setting_type == "int": + return int(env_value) + else: + return env_value + except (ValueError, AttributeError): + return definition.get("default") + + def get_all_current_values(self, provider: str) -> Dict[str, Any]: + """Get all current values for a provider""" + definitions = self.get_provider_settings_definitions(provider) + values = {} + for key, definition in definitions.items(): + values[key] = self.get_current_value(key, definition) + return values + + def set_value(self, key: str, value: Any, definition: Dict[str, Any]): + """Set a setting value, converting to string for .env storage""" + setting_type = definition.get("type", "str") + if setting_type == "bool": + str_value = "true" if value else "false" + else: + str_value = str(value) + self.settings.set(key, str_value) + + def reset_to_default(self, key: str): + """Remove a setting to reset it to default""" + self.settings.remove(key) + + def get_modified_settings(self, provider: str) -> Dict[str, Any]: + """Get settings that differ from defaults""" + definitions = self.get_provider_settings_definitions(provider) + modified = {} + for key, definition in definitions.items(): + current = self.get_current_value(key, definition) + default = definition.get("default") + if current != default: + modified[key] = current + return modified + + +class SettingsTool: + """Main settings tool TUI""" + + def __init__(self): + self.console = Console() + self.settings = AdvancedSettings() + self.provider_mgr = CustomProviderManager(self.settings) + self.model_mgr = ModelDefinitionManager(self.settings) + self.concurrency_mgr = ConcurrencyManager(self.settings) + self.rotation_mgr = RotationModeManager(self.settings) + self.priority_multiplier_mgr = PriorityMultiplierManager(self.settings) + self.provider_settings_mgr = ProviderSettingsManager(self.settings) + self.running = True + + def _format_item( + self, + name: str, + value: str, + change_type: Optional[str], + old_value: Optional[str] = None, + width: int = 15, + ) -> str: + """Format a list item with change indicator. + + change_type: None, 'add', 'edit', 'remove' + Returns formatted string like: + " + myapi https://api.example.com" (green) + " ~ openai 1 → 5 requests/key" (yellow) + " - oldapi https://old.api.com" (red) + " • groq 3 requests/key" (normal) + """ + if change_type == "add": + return f" [green]+ {name:{width}} {value}[/green]" + elif change_type == "edit": + if old_value is not None: + return f" [yellow]~ {name:{width}} {old_value} → {value}[/yellow]" + else: + return f" [yellow]~ {name:{width}} {value}[/yellow]" + elif change_type == "remove": + return f" [red]- {name:{width}} {value}[/red]" + else: + return f" • {name:{width}} {value}" + + def _get_pending_status_text(self) -> str: + """Get formatted pending changes status text for main menu.""" + if not self.settings.has_pending(): + return "[dim]ℹ️ No pending changes[/dim]" + + counts = self.settings.get_pending_counts() + parts = [] + if counts["add"]: + parts.append( + f"[green]{counts['add']} addition{'s' if counts['add'] > 1 else ''}[/green]" + ) + if counts["edit"]: + parts.append( + f"[yellow]{counts['edit']} modification{'s' if counts['edit'] > 1 else ''}[/yellow]" + ) + if counts["remove"]: + parts.append( + f"[red]{counts['remove']} removal{'s' if counts['remove'] > 1 else ''}[/red]" + ) + + return f"[bold]ℹ️ Pending changes: {', '.join(parts)}[/bold]" + self.running = True + + def get_available_providers(self) -> List[str]: + """Get list of providers that have credentials configured""" + env_file = get_data_file(".env") + providers = set() + + # Scan for providers with API keys from local .env + if env_file.exists(): + try: + with open(env_file, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + # Skip comments and empty lines + if not line or line.startswith("#"): + continue + if ( + "_API_KEY" in line + and "PROXY_API_KEY" not in line + and "=" in line + ): + provider = line.split("_API_KEY")[0].strip().lower() + providers.add(provider) + except (IOError, OSError): + pass + + # Also check for OAuth providers from files + from rotator_library.utils.paths import get_oauth_dir + + oauth_dir = get_oauth_dir() + if oauth_dir.exists(): + for file in oauth_dir.glob("*_oauth_*.json"): + provider = file.name.split("_oauth_")[0] + providers.add(provider) + + return sorted(list(providers)) + + def run(self): + """Main loop""" + while self.running: + self.show_main_menu() + + def show_main_menu(self): + """Display settings categories""" + clear_screen() + + self.console.print( + Panel.fit( + "[bold cyan]🔧 Advanced Settings Configuration[/bold cyan]", + border_style="cyan", + ) + ) + + self.console.print() + self.console.print("[bold]⚙️ Configuration Categories[/bold]") + self.console.print() + self.console.print(" 1. 🌐 Custom Provider API Bases") + self.console.print(" 2. 📦 Provider Model Definitions") + self.console.print(" 3. ⚡ Concurrency Limits") + self.console.print(" 4. 🔄 Rotation Modes") + self.console.print(" 5. 🔬 Provider-Specific Settings") + self.console.print(" 6. 🎯 Model Filters (Ignore/Whitelist)") + self.console.print(" 7. 💾 Save & Exit") + self.console.print(" 8. 🚫 Exit Without Saving") + + self.console.print() + self.console.print("━" * 70) + + self.console.print(self._get_pending_status_text()) + + self.console.print() + + choice = Prompt.ask( + "Select option", + choices=["1", "2", "3", "4", "5", "6", "7", "8"], + show_choices=False, + ) + + if choice == "1": + self.manage_custom_providers() + elif choice == "2": + self.manage_model_definitions() + elif choice == "3": + self.manage_concurrency_limits() + elif choice == "4": + self.manage_rotation_modes() + elif choice == "5": + self.manage_provider_settings() + elif choice == "6": + self.launch_model_filter_gui() + elif choice == "7": + self.save_and_exit() + elif choice == "8": + self.exit_without_saving() + + def manage_custom_providers(self): + """Manage custom provider API bases""" + while True: + clear_screen() + + # Get current providers from env + providers = self.provider_mgr.get_current_providers() + + self.console.print( + Panel.fit( + "[bold cyan]🌐 Custom Provider API Bases[/bold cyan]", + border_style="cyan", + ) + ) + + self.console.print() + self.console.print("[bold]📋 Configured Custom Providers[/bold]") + self.console.print("━" * 70) + + # Build combined view with pending changes + all_providers: Dict[str, Dict[str, Any]] = {} + + # Add current providers (from env) + for name, base in providers.items(): + key = f"{name.upper()}_API_BASE" + change_type = self.settings.get_change_type(key) + if change_type == "remove": + all_providers[name] = {"value": base, "type": "remove", "old": None} + elif change_type == "edit": + new_val = self.settings.pending_changes[key] + all_providers[name] = { + "value": new_val, + "type": "edit", + "old": base, + } + else: + all_providers[name] = {"value": base, "type": None, "old": None} + + # Add pending new providers (additions) + for key in self.settings.get_pending_keys_by_pattern(suffix="_API_BASE"): + if self.settings.get_change_type(key) == "add": + name = key.replace("_API_BASE", "").lower() + if name not in all_providers: + all_providers[name] = { + "value": self.settings.pending_changes[key], + "type": "add", + "old": None, + } + + if all_providers: + # Sort alphabetically + for name in sorted(all_providers.keys()): + info = all_providers[name] + self.console.print( + self._format_item( + name, + info["value"], + info["type"], + info["old"], + ) + ) + else: + self.console.print(" [dim]No custom providers configured[/dim]") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + self.console.print("[bold]⚙️ Actions[/bold]") + self.console.print() + self.console.print(" 1. ➕ Add New Custom Provider") + self.console.print(" 2. ✏️ Edit Existing Provider") + self.console.print(" 3. 🗑️ Remove Provider") + self.console.print(" 4. ↩️ Back to Settings Menu") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + + choice = Prompt.ask( + "Select option", choices=["1", "2", "3", "4"], show_choices=False + ) + + if choice == "1": + name = Prompt.ask("Provider name (e.g., 'opencode')").strip().lower() + if name: + api_base = Prompt.ask("API Base URL").strip() + if api_base: + self.provider_mgr.add_provider(name, api_base) + self.console.print( + f"\n[green]✅ Custom provider '{name}' staged![/green]" + ) + self.console.print( + f" To use: set {name.upper()}_API_KEY in credentials" + ) + input("\nPress Enter to continue...") + + elif choice == "2": + # Get editable providers (existing + pending additions, excluding pending removals) + editable = { + k: v for k, v in all_providers.items() if v["type"] != "remove" + } + if not editable: + self.console.print("\n[yellow]No providers to edit[/yellow]") + input("\nPress Enter to continue...") + continue + + # Show numbered list + self.console.print("\n[bold]Select provider to edit:[/bold]") + providers_list = sorted(editable.keys()) + for idx, prov in enumerate(providers_list, 1): + self.console.print(f" {idx}. {prov}") + + choice_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(providers_list) + 1)], + ) + name = providers_list[choice_idx - 1] + info = editable[name] + # Get effective current value (could be pending or from env) + current_base = info["value"] + + self.console.print(f"\nCurrent API Base: {current_base}") + new_base = Prompt.ask( + "New API Base [press Enter to keep current]", default=current_base + ).strip() + + if new_base and new_base != current_base: + self.provider_mgr.edit_provider(name, new_base) + self.console.print( + f"\n[green]✅ Custom provider '{name}' updated![/green]" + ) + else: + self.console.print("\n[yellow]No changes made[/yellow]") + input("\nPress Enter to continue...") + + elif choice == "3": + # Get removable providers (existing ones not already pending removal) + removable = { + k: v + for k, v in all_providers.items() + if v["type"] != "remove" and v["type"] != "add" + } + # For pending additions, we can "undo" by removing from pending + pending_adds = { + k: v for k, v in all_providers.items() if v["type"] == "add" + } + + if not removable and not pending_adds: + self.console.print("\n[yellow]No providers to remove[/yellow]") + input("\nPress Enter to continue...") + continue + + # Show numbered list + self.console.print("\n[bold]Select provider to remove:[/bold]") + # Show existing providers first, then pending additions + providers_list = sorted(removable.keys()) + sorted(pending_adds.keys()) + for idx, prov in enumerate(providers_list, 1): + if prov in pending_adds: + self.console.print( + f" {idx}. {prov} [green](pending add)[/green]" + ) + else: + self.console.print(f" {idx}. {prov}") + + choice_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(providers_list) + 1)], + ) + name = providers_list[choice_idx - 1] + + if Confirm.ask(f"Remove '{name}'?"): + if name in pending_adds: + # Undo pending addition - remove from pending_changes + key = f"{name.upper()}_API_BASE" + del self.settings.pending_changes[key] + self.console.print( + f"\n[green]✅ Pending addition of '{name}' cancelled![/green]" + ) + else: + self.provider_mgr.remove_provider(name) + self.console.print( + f"\n[green]✅ Provider '{name}' marked for removal![/green]" + ) + input("\nPress Enter to continue...") + + elif choice == "4": + break + + def manage_model_definitions(self): + """Manage provider model definitions""" + while True: + clear_screen() + + # Get current providers with models from env + all_providers_env = self.model_mgr.get_all_providers_with_models() + + self.console.print( + Panel.fit( + "[bold cyan]📦 Provider Model Definitions[/bold cyan]", + border_style="cyan", + ) + ) + + self.console.print() + self.console.print("[bold]📋 Configured Provider Models[/bold]") + self.console.print("━" * 70) + + # Build combined view with pending changes + all_models: Dict[str, Dict[str, Any]] = {} + suffix = "_MODELS" + + # Add current providers (from env) + for provider, count in all_providers_env.items(): + key = f"{provider.upper()}{suffix}" + change_type = self.settings.get_change_type(key) + if change_type == "remove": + all_models[provider] = { + "value": f"{count} model{'s' if count > 1 else ''}", + "type": "remove", + "old": None, + } + elif change_type == "edit": + # Get new model count from pending + new_val = self.settings.pending_changes[key] + try: + parsed = json.loads(new_val) + new_count = ( + len(parsed) if isinstance(parsed, (dict, list)) else 0 + ) + except (json.JSONDecodeError, ValueError): + new_count = 0 + all_models[provider] = { + "value": f"{new_count} model{'s' if new_count > 1 else ''}", + "type": "edit", + "old": f"{count} model{'s' if count > 1 else ''}", + } + else: + all_models[provider] = { + "value": f"{count} model{'s' if count > 1 else ''}", + "type": None, + "old": None, + } + + # Add pending new model definitions (additions) + for key in self.settings.get_pending_keys_by_pattern(suffix=suffix): + if self.settings.get_change_type(key) == "add": + provider = key.replace(suffix, "").lower() + if provider not in all_models: + new_val = self.settings.pending_changes[key] + try: + parsed = json.loads(new_val) + new_count = ( + len(parsed) if isinstance(parsed, (dict, list)) else 0 + ) + except (json.JSONDecodeError, ValueError): + new_count = 0 + all_models[provider] = { + "value": f"{new_count} model{'s' if new_count > 1 else ''}", + "type": "add", + "old": None, + } + + if all_models: + # Sort alphabetically + for provider in sorted(all_models.keys()): + info = all_models[provider] + self.console.print( + self._format_item( + provider, info["value"], info["type"], info["old"] + ) + ) + else: + self.console.print(" [dim]No model definitions configured[/dim]") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + self.console.print("[bold]⚙️ Actions[/bold]") + self.console.print() + self.console.print(" 1. ➕ Add Models for Provider") + self.console.print(" 2. ✏️ Edit Provider Models") + self.console.print(" 3. 👁️ View Provider Models") + self.console.print(" 4. 🗑️ Remove Provider Models") + self.console.print(" 5. ↩️ Back to Settings Menu") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + + choice = Prompt.ask( + "Select option", choices=["1", "2", "3", "4", "5"], show_choices=False + ) + + if choice == "1": + self.add_model_definitions() + elif choice == "2": + # Get editable models (existing + pending additions, excluding pending removals) + editable = { + k: v for k, v in all_models.items() if v["type"] != "remove" + } + if not editable: + self.console.print("\n[yellow]No providers to edit[/yellow]") + input("\nPress Enter to continue...") + continue + self.edit_model_definitions(sorted(editable.keys())) + elif choice == "3": + viewable = { + k: v for k, v in all_models.items() if v["type"] != "remove" + } + if not viewable: + self.console.print("\n[yellow]No providers to view[/yellow]") + input("\nPress Enter to continue...") + continue + self.view_model_definitions(sorted(viewable.keys())) + elif choice == "4": + # Get removable models (existing ones not already pending removal) + removable = { + k: v + for k, v in all_models.items() + if v["type"] != "remove" and v["type"] != "add" + } + pending_adds = { + k: v for k, v in all_models.items() if v["type"] == "add" + } + + if not removable and not pending_adds: + self.console.print("\n[yellow]No providers to remove[/yellow]") + input("\nPress Enter to continue...") + continue + + # Show numbered list + self.console.print( + "\n[bold]Select provider to remove models from:[/bold]" + ) + providers_list = sorted(removable.keys()) + sorted(pending_adds.keys()) + for idx, prov in enumerate(providers_list, 1): + if prov in pending_adds: + self.console.print( + f" {idx}. {prov} [green](pending add)[/green]" + ) + else: + self.console.print(f" {idx}. {prov}") + + choice_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(providers_list) + 1)], + ) + provider = providers_list[choice_idx - 1] + + if Confirm.ask(f"Remove all model definitions for '{provider}'?"): + if provider in pending_adds: + # Undo pending addition + key = f"{provider.upper()}{suffix}" + del self.settings.pending_changes[key] + self.console.print( + f"\n[green]✅ Pending models for '{provider}' cancelled![/green]" + ) + else: + self.model_mgr.remove_models(provider) + self.console.print( + f"\n[green]✅ Model definitions marked for removal for '{provider}'![/green]" + ) + input("\nPress Enter to continue...") + elif choice == "5": + break + + def add_model_definitions(self): + """Add model definitions for a provider""" + # Get available providers from credentials + available_providers = self.get_available_providers() + + if not available_providers: + self.console.print( + "\n[yellow]No providers with credentials found. Please add credentials first.[/yellow]" + ) + input("\nPress Enter to continue...") + return + + # Show provider selection menu + self.console.print("\n[bold]Select provider:[/bold]") + for idx, prov in enumerate(available_providers, 1): + self.console.print(f" {idx}. {prov}") + self.console.print( + f" {len(available_providers) + 1}. Enter custom provider name" + ) + + choice = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(available_providers) + 2)], + ) + + if choice == len(available_providers) + 1: + provider = Prompt.ask("Provider name").strip().lower() + else: + provider = available_providers[choice - 1] + + if not provider: + return + + self.console.print("\nHow would you like to define models?") + self.console.print(" 1. Simple list (names only)") + self.console.print(" 2. Advanced (names with IDs and options)") + + mode = Prompt.ask("Select mode", choices=["1", "2"], show_choices=False) + + models = {} + + if mode == "1": + # Simple mode + while True: + name = Prompt.ask("\nModel name (or 'done' to finish)").strip() + if name.lower() == "done": + break + if name: + models[name] = {} + else: + # Advanced mode + while True: + name = Prompt.ask("\nModel name (or 'done' to finish)").strip() + if name.lower() == "done": + break + if name: + model_def = {} + model_id = Prompt.ask( + f"Model ID [press Enter to use '{name}']", default=name + ).strip() + if model_id and model_id != name: + model_def["id"] = model_id + + # Optional: model options + if Confirm.ask( + "Add model options (e.g., temperature limits)?", default=False + ): + self.console.print( + "\nEnter options as key=value pairs (one per line, 'done' to finish):" + ) + options = {} + while True: + opt = Prompt.ask("Option").strip() + if opt.lower() == "done": + break + if "=" in opt: + key, value = opt.split("=", 1) + value = value.strip() + # Try to convert to number if possible + try: + value = float(value) if "." in value else int(value) + except (ValueError, TypeError): + pass + options[key.strip()] = value + if options: + model_def["options"] = options + + models[name] = model_def + + if models: + self.model_mgr.set_models(provider, models) + self.console.print( + f"\n[green]✅ Model definitions saved for '{provider}'![/green]" + ) + else: + self.console.print("\n[yellow]No models added[/yellow]") + + input("\nPress Enter to continue...") + + def edit_model_definitions(self, providers: List[str]): + """Edit existing model definitions""" + # Show numbered list + self.console.print("\n[bold]Select provider to edit:[/bold]") + for idx, prov in enumerate(providers, 1): + self.console.print(f" {idx}. {prov}") + + choice_idx = IntPrompt.ask( + "Select option", choices=[str(i) for i in range(1, len(providers) + 1)] + ) + provider = providers[choice_idx - 1] + + current_models = self.model_mgr.get_current_provider_models(provider) + if not current_models: + self.console.print(f"\n[yellow]No models found for '{provider}'[/yellow]") + input("\nPress Enter to continue...") + return + + # Convert to dict if list + if isinstance(current_models, list): + current_models = {m: {} for m in current_models} + + while True: + clear_screen() + self.console.print(f"[bold]Editing models for: {provider}[/bold]\n") + self.console.print("Current models:") + for i, (name, definition) in enumerate(current_models.items(), 1): + model_id = ( + definition.get("id", name) if isinstance(definition, dict) else name + ) + self.console.print(f" {i}. {name} (ID: {model_id})") + + self.console.print("\nOptions:") + self.console.print(" 1. Add new model") + self.console.print(" 2. Edit existing model") + self.console.print(" 3. Remove model") + self.console.print(" 4. Done") + + choice = Prompt.ask( + "\nSelect option", choices=["1", "2", "3", "4"], show_choices=False + ) + + if choice == "1": + name = Prompt.ask("New model name").strip() + if name and name not in current_models: + model_id = Prompt.ask("Model ID", default=name).strip() + current_models[name] = {"id": model_id} if model_id != name else {} + + elif choice == "2": + # Show numbered list + models_list = list(current_models.keys()) + self.console.print("\n[bold]Select model to edit:[/bold]") + for idx, model_name in enumerate(models_list, 1): + self.console.print(f" {idx}. {model_name}") + + model_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(models_list) + 1)], + ) + name = models_list[model_idx - 1] + + current_def = current_models[name] + current_id = ( + current_def.get("id", name) + if isinstance(current_def, dict) + else name + ) + + new_id = Prompt.ask("Model ID", default=current_id).strip() + current_models[name] = {"id": new_id} if new_id != name else {} + + elif choice == "3": + # Show numbered list + models_list = list(current_models.keys()) + self.console.print("\n[bold]Select model to remove:[/bold]") + for idx, model_name in enumerate(models_list, 1): + self.console.print(f" {idx}. {model_name}") + + model_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(models_list) + 1)], + ) + name = models_list[model_idx - 1] + + if Confirm.ask(f"Remove '{name}'?"): + del current_models[name] + + elif choice == "4": + break + + if current_models: + self.model_mgr.set_models(provider, current_models) + self.console.print(f"\n[green]✅ Models updated for '{provider}'![/green]") + else: + self.console.print( + "\n[yellow]No models left - removing definition[/yellow]" + ) + self.model_mgr.remove_models(provider) + + input("\nPress Enter to continue...") + + def view_model_definitions(self, providers: List[str]): + """View model definitions for a provider""" + # Show numbered list + self.console.print("\n[bold]Select provider to view:[/bold]") + for idx, prov in enumerate(providers, 1): + self.console.print(f" {idx}. {prov}") + + choice_idx = IntPrompt.ask( + "Select option", choices=[str(i) for i in range(1, len(providers) + 1)] + ) + provider = providers[choice_idx - 1] + + models = self.model_mgr.get_current_provider_models(provider) + if not models: + self.console.print(f"\n[yellow]No models found for '{provider}'[/yellow]") + input("\nPress Enter to continue...") + return + + clear_screen() + self.console.print(f"[bold]Provider: {provider}[/bold]\n") + self.console.print("[bold]📦 Configured Models:[/bold]") + self.console.print("━" * 50) + + # Handle both dict and list formats + if isinstance(models, dict): + for name, definition in models.items(): + if isinstance(definition, dict): + model_id = definition.get("id", name) + self.console.print(f" Name: {name}") + self.console.print(f" ID: {model_id}") + if "options" in definition: + self.console.print(f" Options: {definition['options']}") + self.console.print() + else: + self.console.print(f" Name: {name}") + self.console.print() + elif isinstance(models, list): + for name in models: + self.console.print(f" Name: {name}") + self.console.print() + + input("Press Enter to return...") + + def launch_model_filter_gui(self): + """Launch the Model Filter GUI for managing ignore/whitelist rules""" + clear_screen() + self.console.print("\n[cyan]Launching Model Filter GUI...[/cyan]\n") + self.console.print( + "[dim]The GUI will open in a separate window. Close it to return here.[/dim]\n" + ) + + try: + from proxy_app.model_filter_gui import run_model_filter_gui + + run_model_filter_gui() # Blocks until GUI closes + except ImportError as e: + self.console.print(f"\n[red]Failed to launch Model Filter GUI: {e}[/red]") + self.console.print() + self.console.print( + "[yellow]Make sure 'customtkinter' is installed:[/yellow]" + ) + self.console.print(" [cyan]pip install customtkinter[/cyan]") + self.console.print() + input("Press Enter to continue...") + + def manage_provider_settings(self): + """Manage provider-specific settings (Antigravity, Gemini CLI)""" + while True: + clear_screen() + + available_providers = self.provider_settings_mgr.get_available_providers() + + self.console.print( + Panel.fit( + "[bold cyan]🔬 Provider-Specific Settings[/bold cyan]", + border_style="cyan", + ) + ) + + self.console.print() + self.console.print( + "[bold]📋 Available Providers with Custom Settings[/bold]" + ) + self.console.print("━" * 70) + + for provider in available_providers: + modified = self.provider_settings_mgr.get_modified_settings(provider) + status = ( + f"[yellow]{len(modified)} modified[/yellow]" + if modified + else "[dim]defaults[/dim]" + ) + display_name = provider.replace("_", " ").title() + self.console.print(f" • {display_name:20} {status}") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + self.console.print("[bold]⚙️ Select Provider to Configure[/bold]") + self.console.print() + + for idx, provider in enumerate(available_providers, 1): + display_name = provider.replace("_", " ").title() + self.console.print(f" {idx}. {display_name}") + self.console.print( + f" {len(available_providers) + 1}. ↩️ Back to Settings Menu" + ) + + self.console.print() + self.console.print("━" * 70) + self.console.print() + + choices = [str(i) for i in range(1, len(available_providers) + 2)] + choice = Prompt.ask("Select option", choices=choices, show_choices=False) + choice_idx = int(choice) + + if choice_idx == len(available_providers) + 1: + break + + provider = available_providers[choice_idx - 1] + self._manage_single_provider_settings(provider) + + def _manage_single_provider_settings(self, provider: str): + """Manage settings for a single provider""" + while True: + display_name = provider.replace("_", " ").title() + clear_screen() + definitions = self.provider_settings_mgr.get_provider_settings_definitions( + provider + ) + current_values = self.provider_settings_mgr.get_all_current_values(provider) + + self.console.print( + Panel.fit( + f"[bold cyan]🔬 {display_name} Settings[/bold cyan]", + border_style="cyan", + ) + ) + + self.console.print() + self.console.print("[bold]📋 Current Settings[/bold]") + self.console.print("━" * 70) + + # Display all settings with current values and pending changes + settings_list = list(definitions.keys()) + for idx, key in enumerate(settings_list, 1): + definition = definitions[key] + current = current_values.get(key) + default = definition.get("default") + setting_type = definition.get("type", "str") + description = definition.get("description", "") + + # Check for pending changes + change_type = self.settings.get_change_type(key) + pending_val = self.settings.get_pending_value(key) + + # Determine effective value to display + if pending_val is not _NOT_FOUND and pending_val is not None: + # Has pending change - convert to proper type for display + if setting_type == "bool": + effective = pending_val.lower() in ("true", "1", "yes") + elif setting_type == "int": + try: + effective = int(pending_val) + except (ValueError, TypeError): + effective = pending_val + else: + effective = pending_val + elif pending_val is None and change_type == "remove": + # Pending removal - will revert to default + effective = default + else: + effective = current + + # Format value display + if setting_type == "bool": + value_display = ( + "[green]✓ Enabled[/green]" + if effective + else "[red]✗ Disabled[/red]" + ) + old_display = ( + ( + "[green]✓ Enabled[/green]" + if current + else "[red]✗ Disabled[/red]" + ) + if change_type + else None + ) + elif setting_type == "int": + value_display = f"[cyan]{effective}[/cyan]" + old_display = f"[cyan]{current}[/cyan]" if change_type else None + else: + value_display = ( + f"[cyan]{effective or '(not set)'}[/cyan]" + if effective + else "[dim](not set)[/dim]" + ) + old_display = ( + f"[cyan]{current}[/cyan]" if change_type and current else None + ) + + # Short key name for display (strip provider prefix) + short_key = key.replace(f"{provider.upper()}_", "") + + # Determine display marker based on pending change type + if change_type == "add": + self.console.print( + f" [green]+{idx:2}. {short_key:35} {value_display}[/green]" + ) + elif change_type == "edit": + self.console.print( + f" [yellow]~{idx:2}. {short_key:35} {old_display} → {value_display}[/yellow]" + ) + elif change_type == "remove": + self.console.print( + f" [red]-{idx:2}. {short_key:35} {old_display} → [dim](default: {default})[/dim][/red]" + ) + else: + # Check if modified from default (in env, not pending) + modified = current != default + mod_marker = "[yellow]*[/yellow]" if modified else " " + self.console.print( + f" {mod_marker}{idx:2}. {short_key:35} {value_display}" + ) + + self.console.print(f" [dim]{description}[/dim]") + + self.console.print() + self.console.print("━" * 70) + self.console.print( + "[dim]* = modified from default, + = pending add, ~ = pending edit, - = pending reset[/dim]" + ) + self.console.print() + self.console.print("[bold]⚙️ Actions[/bold]") + self.console.print() + self.console.print(" E. ✏️ Edit a Setting") + self.console.print(" R. 🔄 Reset Setting to Default") + self.console.print(" A. 🔄 Reset All to Defaults") + self.console.print(" B. ↩️ Back to Provider Selection") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + + choice = Prompt.ask( + "Select action", + choices=["e", "r", "a", "b", "E", "R", "A", "B"], + show_choices=False, + ).lower() + + if choice == "b": + break + elif choice == "e": + self._edit_provider_setting(provider, settings_list, definitions) + elif choice == "r": + self._reset_provider_setting(provider, settings_list, definitions) + elif choice == "a": + self._reset_all_provider_settings(provider, settings_list) + + def _edit_provider_setting( + self, + provider: str, + settings_list: List[str], + definitions: Dict[str, Dict[str, Any]], + ): + """Edit a single provider setting""" + self.console.print("\n[bold]Select setting number to edit:[/bold]") + + choices = [str(i) for i in range(1, len(settings_list) + 1)] + choice = IntPrompt.ask("Setting number", choices=choices) + key = settings_list[choice - 1] + definition = definitions[key] + + current = self.provider_settings_mgr.get_current_value(key, definition) + default = definition.get("default") + setting_type = definition.get("type", "str") + short_key = key.replace(f"{provider.upper()}_", "") + + self.console.print(f"\n[bold]Editing: {short_key}[/bold]") + self.console.print(f"Current value: [cyan]{current}[/cyan]") + self.console.print(f"Default value: [dim]{default}[/dim]") + self.console.print(f"Type: {setting_type}") + + if setting_type == "bool": + new_value = Confirm.ask("\nEnable this setting?", default=current) + self.provider_settings_mgr.set_value(key, new_value, definition) + status = "enabled" if new_value else "disabled" + self.console.print(f"\n[green]✅ {short_key} {status}![/green]") + elif setting_type == "int": + new_value = IntPrompt.ask("\nNew value", default=current) + self.provider_settings_mgr.set_value(key, new_value, definition) + self.console.print(f"\n[green]✅ {short_key} set to {new_value}![/green]") + else: + new_value = Prompt.ask( + "\nNew value", default=str(current) if current else "" + ).strip() + if new_value: + self.provider_settings_mgr.set_value(key, new_value, definition) + self.console.print(f"\n[green]✅ {short_key} updated![/green]") + else: + self.console.print("\n[yellow]No changes made[/yellow]") + + input("\nPress Enter to continue...") + + def _reset_provider_setting( + self, + provider: str, + settings_list: List[str], + definitions: Dict[str, Dict[str, Any]], + ): + """Reset a single provider setting to default""" + self.console.print("\n[bold]Select setting number to reset:[/bold]") + + choices = [str(i) for i in range(1, len(settings_list) + 1)] + choice = IntPrompt.ask("Setting number", choices=choices) + key = settings_list[choice - 1] + definition = definitions[key] + + default = definition.get("default") + short_key = key.replace(f"{provider.upper()}_", "") + + if Confirm.ask(f"\nReset {short_key} to default ({default})?"): + self.provider_settings_mgr.reset_to_default(key) + self.console.print(f"\n[green]✅ {short_key} reset to default![/green]") + else: + self.console.print("\n[yellow]No changes made[/yellow]") + + input("\nPress Enter to continue...") + + def _reset_all_provider_settings(self, provider: str, settings_list: List[str]): + """Reset all provider settings to defaults""" + display_name = provider.replace("_", " ").title() + + if Confirm.ask( + f"\n[bold red]Reset ALL {display_name} settings to defaults?[/bold red]" + ): + for key in settings_list: + self.provider_settings_mgr.reset_to_default(key) + self.console.print( + f"\n[green]✅ All {display_name} settings reset to defaults![/green]" + ) + else: + self.console.print("\n[yellow]No changes made[/yellow]") + + input("\nPress Enter to continue...") + + def manage_rotation_modes(self): + """Manage credential rotation modes (sequential vs balanced)""" + while True: + clear_screen() + + # Get current modes from env + modes = self.rotation_mgr.get_current_modes() + available_providers = self.get_available_providers() + + self.console.print( + Panel.fit( + "[bold cyan]🔄 Credential Rotation Mode Configuration[/bold cyan]", + border_style="cyan", + ) + ) + + self.console.print() + self.console.print("[bold]📋 Rotation Modes Explained[/bold]") + self.console.print("━" * 70) + self.console.print( + " [cyan]balanced[/cyan] - Rotate credentials evenly across requests (default)" + ) + self.console.print( + " [cyan]sequential[/cyan] - Use one credential until exhausted (429), then switch" + ) + self.console.print() + self.console.print("[bold]📋 Current Rotation Mode Settings[/bold]") + self.console.print("━" * 70) + + # Build combined view with pending changes + all_modes: Dict[str, Dict[str, Any]] = {} + prefix = "ROTATION_MODE_" + + # Add current modes (from env) + for provider, mode in modes.items(): + key = f"{prefix}{provider.upper()}" + change_type = self.settings.get_change_type(key) + default_mode = self.rotation_mgr.get_default_mode(provider) + if change_type == "remove": + all_modes[provider] = {"value": mode, "type": "remove", "old": None} + elif change_type == "edit": + new_val = self.settings.pending_changes[key] + all_modes[provider] = { + "value": new_val, + "type": "edit", + "old": mode, + } + else: + all_modes[provider] = {"value": mode, "type": None, "old": None} + + # Add pending new modes (additions) + for key in self.settings.get_pending_keys_by_pattern(prefix=prefix): + if self.settings.get_change_type(key) == "add": + provider = key.replace(prefix, "").lower() + if provider not in all_modes: + all_modes[provider] = { + "value": self.settings.pending_changes[key], + "type": "add", + "old": None, + } + + if all_modes: + # Sort alphabetically + for provider in sorted(all_modes.keys()): + info = all_modes[provider] + mode = info["value"] + mode_display = ( + f"[green]{mode}[/green]" + if mode == "sequential" + else f"[blue]{mode}[/blue]" + ) + old_display = None + if info["old"]: + old_display = ( + f"[green]{info['old']}[/green]" + if info["old"] == "sequential" + else f"[blue]{info['old']}[/blue]" + ) + + if info["type"] == "add": + self.console.print( + f" [green]+ {provider:20} {mode_display}[/green]" + ) + elif info["type"] == "edit": + self.console.print( + f" [yellow]~ {provider:20} {old_display} → {mode_display}[/yellow]" + ) + elif info["type"] == "remove": + self.console.print( + f" [red]- {provider:20} {mode_display}[/red]" + ) + else: + default_mode = self.rotation_mgr.get_default_mode(provider) + is_custom = mode != default_mode + marker = "[yellow]*[/yellow]" if is_custom else " " + self.console.print(f" {marker}• {provider:20} {mode_display}") + + # Show providers with default modes + providers_with_defaults = [ + p for p in available_providers if p not in modes and p not in all_modes + ] + if providers_with_defaults: + self.console.print() + self.console.print("[dim]Providers using default modes:[/dim]") + for provider in providers_with_defaults: + default_mode = self.rotation_mgr.get_default_mode(provider) + mode_display = ( + f"[green]{default_mode}[/green]" + if default_mode == "sequential" + else f"[blue]{default_mode}[/blue]" + ) + self.console.print( + f" • {provider:20} {mode_display} [dim](default)[/dim]" + ) + + self.console.print() + self.console.print("━" * 70) + self.console.print( + "[dim]* = custom setting (differs from provider default)[/dim]" + ) + self.console.print() + self.console.print("[bold]⚙️ Actions[/bold]") + self.console.print() + self.console.print(" 1. ➕ Set Rotation Mode for Provider") + self.console.print(" 2. 🗑️ Reset to Provider Default") + self.console.print(" 3. ⚡ Configure Priority Concurrency Multipliers") + self.console.print(" 4. ↩️ Back to Settings Menu") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + + choice = Prompt.ask( + "Select option", choices=["1", "2", "3", "4"], show_choices=False + ) + + if choice == "1": + if not available_providers: + self.console.print( + "\n[yellow]No providers with credentials found. Please add credentials first.[/yellow]" + ) + input("\nPress Enter to continue...") + continue + + # Show provider selection menu + self.console.print("\n[bold]Select provider:[/bold]") + for idx, prov in enumerate(available_providers, 1): + current_mode = self.rotation_mgr.get_effective_mode(prov) + mode_display = ( + f"[green]{current_mode}[/green]" + if current_mode == "sequential" + else f"[blue]{current_mode}[/blue]" + ) + self.console.print(f" {idx}. {prov} ({mode_display})") + self.console.print( + f" {len(available_providers) + 1}. Enter custom provider name" + ) + + choice_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(available_providers) + 2)], + ) + + if choice_idx == len(available_providers) + 1: + provider = Prompt.ask("Provider name").strip().lower() + else: + provider = available_providers[choice_idx - 1] + + if provider: + current_mode = self.rotation_mgr.get_effective_mode(provider) + self.console.print( + f"\nCurrent mode for {provider}: [cyan]{current_mode}[/cyan]" + ) + self.console.print("\nSelect new rotation mode:") + self.console.print( + " 1. [blue]balanced[/blue] - Rotate credentials evenly" + ) + self.console.print( + " 2. [green]sequential[/green] - Use until exhausted" + ) + + mode_choice = Prompt.ask( + "Select mode", choices=["1", "2"], show_choices=False + ) + new_mode = "balanced" if mode_choice == "1" else "sequential" + + self.rotation_mgr.set_mode(provider, new_mode) + self.console.print( + f"\n[green]✅ Rotation mode for '{provider}' staged as {new_mode}![/green]" + ) + input("\nPress Enter to continue...") + + elif choice == "2": + # Get resettable modes (existing + pending adds, excluding pending removes) + resettable = { + k: v for k, v in all_modes.items() if v["type"] != "remove" + } + if not resettable: + self.console.print( + "\n[yellow]No custom rotation modes to reset[/yellow]" + ) + input("\nPress Enter to continue...") + continue + + # Show numbered list + self.console.print( + "\n[bold]Select provider to reset to default:[/bold]" + ) + modes_list = sorted(resettable.keys()) + for idx, prov in enumerate(modes_list, 1): + default_mode = self.rotation_mgr.get_default_mode(prov) + info = resettable[prov] + if info["type"] == "add": + self.console.print( + f" {idx}. {prov} [green](pending add)[/green] - will cancel" + ) + else: + self.console.print( + f" {idx}. {prov} (will reset to: {default_mode})" + ) + + choice_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(modes_list) + 1)], + ) + provider = modes_list[choice_idx - 1] + default_mode = self.rotation_mgr.get_default_mode(provider) + info = resettable[provider] + + if Confirm.ask(f"Reset '{provider}' to default mode ({default_mode})?"): + if info["type"] == "add": + # Undo pending addition + key = f"{prefix}{provider.upper()}" + del self.settings.pending_changes[key] + self.console.print( + f"\n[green]✅ Pending mode for '{provider}' cancelled![/green]" + ) + else: + self.rotation_mgr.remove_mode(provider) + self.console.print( + f"\n[green]✅ Rotation mode for '{provider}' marked for reset to default ({default_mode})![/green]" + ) + input("\nPress Enter to continue...") + + elif choice == "3": + self.manage_priority_multipliers() + + elif choice == "4": + break + + def manage_priority_multipliers(self): + """Manage priority-based concurrency multipliers per provider""" + clear_screen() + + current_multipliers = self.priority_multiplier_mgr.get_current_multipliers() + available_providers = self.get_available_providers() + + self.console.print( + Panel.fit( + "[bold cyan]⚡ Priority Concurrency Multipliers[/bold cyan]", + border_style="cyan", + ) + ) + + self.console.print() + self.console.print("[bold]📋 Current Priority Multiplier Settings[/bold]") + self.console.print("━" * 70) + + # Show all providers with their priority multipliers + has_settings = False + for provider in available_providers: + defaults = self.priority_multiplier_mgr.get_provider_defaults(provider) + overrides = current_multipliers.get(provider, {}) + seq_fallback = self.priority_multiplier_mgr.get_sequential_fallback( + provider + ) + rotation_mode = self.rotation_mgr.get_effective_mode(provider) + + if defaults or overrides or seq_fallback != 1: + has_settings = True + self.console.print( + f"\n [bold]{provider}[/bold] ({rotation_mode} mode)" + ) + + # Combine and display priorities + all_priorities = set(defaults.keys()) | set(overrides.keys()) + for priority in sorted(all_priorities): + default_val = defaults.get(priority, 1) + override_val = overrides.get(priority) + + if override_val is not None: + self.console.print( + f" Priority {priority}: [cyan]{override_val}x[/cyan] (override, default: {default_val}x)" + ) + else: + self.console.print( + f" Priority {priority}: {default_val}x [dim](default)[/dim]" + ) + + # Show sequential fallback if applicable + if rotation_mode == "sequential" and seq_fallback != 1: + self.console.print( + f" Others (seq): {seq_fallback}x [dim](fallback)[/dim]" + ) + + if not has_settings: + self.console.print(" [dim]No priority multipliers configured[/dim]") + + self.console.print() + self.console.print("[bold]ℹ️ About Priority Multipliers:[/bold]") + self.console.print( + " Higher priority tiers (lower numbers) can have higher multipliers." + ) + self.console.print(" Example: Priority 1 = 5x, Priority 2 = 3x, Others = 1x") + self.console.print() + self.console.print("━" * 70) + self.console.print() + self.console.print(" 1. ✏️ Set Priority Multiplier") + self.console.print(" 2. 🔄 Reset to Provider Default") + self.console.print(" 3. ↩️ Back") + + choice = Prompt.ask( + "Select option", choices=["1", "2", "3"], show_choices=False + ) + + if choice == "1": + if not available_providers: + self.console.print("\n[yellow]No providers available[/yellow]") + input("\nPress Enter to continue...") + return + + # Select provider + self.console.print("\n[bold]Select provider:[/bold]") + for idx, prov in enumerate(available_providers, 1): + self.console.print(f" {idx}. {prov}") + + prov_idx = IntPrompt.ask( + "Provider", + choices=[str(i) for i in range(1, len(available_providers) + 1)], + ) + provider = available_providers[prov_idx - 1] + + # Get priority level + priority = IntPrompt.ask("Priority level (e.g., 1, 2, 3)") + + # Get current value + current = self.priority_multiplier_mgr.get_effective_multiplier( + provider, priority + ) + self.console.print( + f"\nCurrent multiplier for priority {priority}: {current}x" + ) + + multiplier = IntPrompt.ask("New multiplier (1-10)", default=current) + if 1 <= multiplier <= 10: + self.priority_multiplier_mgr.set_multiplier( + provider, priority, multiplier + ) + self.console.print( + f"\n[green]✅ Priority {priority} multiplier for '{provider}' set to {multiplier}x[/green]" + ) + else: + self.console.print( + "\n[yellow]Multiplier must be between 1 and 10[/yellow]" + ) + input("\nPress Enter to continue...") + + elif choice == "2": + # Find providers with overrides + providers_with_overrides = [ + p for p in available_providers if p in current_multipliers + ] + if not providers_with_overrides: + self.console.print("\n[yellow]No custom multipliers to reset[/yellow]") + input("\nPress Enter to continue...") + return + + self.console.print("\n[bold]Select provider to reset:[/bold]") + for idx, prov in enumerate(providers_with_overrides, 1): + self.console.print(f" {idx}. {prov}") + + prov_idx = IntPrompt.ask( + "Provider", + choices=[str(i) for i in range(1, len(providers_with_overrides) + 1)], + ) + provider = providers_with_overrides[prov_idx - 1] + + # Get priority to reset + overrides = current_multipliers.get(provider, {}) + if len(overrides) == 1: + priority = list(overrides.keys())[0] + else: + self.console.print(f"\nOverrides for {provider}: {overrides}") + priority = IntPrompt.ask("Priority level to reset") + + if priority in overrides: + self.priority_multiplier_mgr.remove_multiplier(provider, priority) + default = self.priority_multiplier_mgr.get_effective_multiplier( + provider, priority + ) + self.console.print( + f"\n[green]✅ Reset priority {priority} for '{provider}' to default ({default}x)[/green]" + ) + else: + self.console.print( + f"\n[yellow]No override for priority {priority}[/yellow]" + ) + input("\nPress Enter to continue...") + + def manage_concurrency_limits(self): + """Manage concurrency limits""" + while True: + clear_screen() + + # Get current limits from env + limits = self.concurrency_mgr.get_current_limits() + + self.console.print( + Panel.fit( + "[bold cyan]⚡ Concurrency Limits Configuration[/bold cyan]", + border_style="cyan", + ) + ) + + self.console.print() + self.console.print("[bold]📋 Current Concurrency Settings[/bold]") + self.console.print("━" * 70) + + # Build combined view with pending changes + all_limits: Dict[str, Dict[str, Any]] = {} + prefix = "MAX_CONCURRENT_REQUESTS_PER_KEY_" + + # Add current limits (from env) + for provider, limit in limits.items(): + key = f"{prefix}{provider.upper()}" + change_type = self.settings.get_change_type(key) + if change_type == "remove": + all_limits[provider] = { + "value": str(limit), + "type": "remove", + "old": None, + } + elif change_type == "edit": + new_val = self.settings.pending_changes[key] + all_limits[provider] = { + "value": new_val, + "type": "edit", + "old": str(limit), + } + else: + all_limits[provider] = { + "value": str(limit), + "type": None, + "old": None, + } + + # Add pending new limits (additions) + for key in self.settings.get_pending_keys_by_pattern(prefix=prefix): + if self.settings.get_change_type(key) == "add": + provider = key.replace(prefix, "").lower() + if provider not in all_limits: + all_limits[provider] = { + "value": self.settings.pending_changes[key], + "type": "add", + "old": None, + } + + if all_limits: + # Sort alphabetically + for provider in sorted(all_limits.keys()): + info = all_limits[provider] + value_display = f"{info['value']} requests/key" + old_display = f"{info['old']} requests/key" if info["old"] else None + self.console.print( + self._format_item( + provider, value_display, info["type"], old_display + ) + ) + self.console.print(" • Default: 1 request/key (all others)") + else: + self.console.print(" • Default: 1 request/key (all providers)") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + self.console.print("[bold]⚙️ Actions[/bold]") + self.console.print() + self.console.print(" 1. ➕ Add Concurrency Limit for Provider") + self.console.print(" 2. ✏️ Edit Existing Limit") + self.console.print(" 3. 🗑️ Remove Limit (reset to default)") + self.console.print(" 4. ↩️ Back to Settings Menu") + + self.console.print() + self.console.print("━" * 70) + self.console.print() + + choice = Prompt.ask( + "Select option", choices=["1", "2", "3", "4"], show_choices=False + ) + + if choice == "1": + # Get available providers + available_providers = self.get_available_providers() + + if not available_providers: + self.console.print( + "\n[yellow]No providers with credentials found. Please add credentials first.[/yellow]" + ) + input("\nPress Enter to continue...") + continue + + # Show provider selection menu + self.console.print("\n[bold]Select provider:[/bold]") + for idx, prov in enumerate(available_providers, 1): + self.console.print(f" {idx}. {prov}") + self.console.print( + f" {len(available_providers) + 1}. Enter custom provider name" + ) + + choice_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(available_providers) + 2)], + ) + + if choice_idx == len(available_providers) + 1: + provider = Prompt.ask("Provider name").strip().lower() + else: + provider = available_providers[choice_idx - 1] + + if provider: + limit = IntPrompt.ask( + "Max concurrent requests per key (1-100)", default=1 + ) + if 1 <= limit <= 100: + self.concurrency_mgr.set_limit(provider, limit) + self.console.print( + f"\n[green]✅ Concurrency limit staged for '{provider}': {limit} requests/key[/green]" + ) + else: + self.console.print( + "\n[red]❌ Limit must be between 1-100[/red]" + ) + input("\nPress Enter to continue...") + + elif choice == "2": + # Get editable limits (existing + pending additions, excluding pending removals) + editable = { + k: v for k, v in all_limits.items() if v["type"] != "remove" + } + if not editable: + self.console.print("\n[yellow]No limits to edit[/yellow]") + input("\nPress Enter to continue...") + continue + + # Show numbered list + self.console.print("\n[bold]Select provider to edit:[/bold]") + limits_list = sorted(editable.keys()) + for idx, prov in enumerate(limits_list, 1): + self.console.print(f" {idx}. {prov}") + + choice_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(limits_list) + 1)], + ) + provider = limits_list[choice_idx - 1] + info = editable[provider] + current_limit = int(info["value"]) + + self.console.print(f"\nCurrent limit: {current_limit} requests/key") + new_limit = IntPrompt.ask( + "New limit (1-100) [press Enter to keep current]", + default=current_limit, + ) + + if 1 <= new_limit <= 100: + if new_limit != current_limit: + self.concurrency_mgr.set_limit(provider, new_limit) + self.console.print( + f"\n[green]✅ Concurrency limit updated for '{provider}': {new_limit} requests/key[/green]" + ) + else: + self.console.print("\n[yellow]No changes made[/yellow]") + else: + self.console.print("\n[red]Limit must be between 1-100[/red]") + input("\nPress Enter to continue...") + + elif choice == "3": + # Get removable limits (existing ones not already pending removal) + removable = { + k: v + for k, v in all_limits.items() + if v["type"] != "remove" and v["type"] != "add" + } + # For pending additions, we can "undo" by removing from pending + pending_adds = { + k: v for k, v in all_limits.items() if v["type"] == "add" + } + + if not removable and not pending_adds: + self.console.print("\n[yellow]No limits to remove[/yellow]") + input("\nPress Enter to continue...") + continue + + # Show numbered list + self.console.print( + "\n[bold]Select provider to remove limit from:[/bold]" + ) + limits_list = sorted(removable.keys()) + sorted(pending_adds.keys()) + for idx, prov in enumerate(limits_list, 1): + if prov in pending_adds: + self.console.print( + f" {idx}. {prov} [green](pending add)[/green]" + ) + else: + self.console.print(f" {idx}. {prov}") + + choice_idx = IntPrompt.ask( + "Select option", + choices=[str(i) for i in range(1, len(limits_list) + 1)], + ) + provider = limits_list[choice_idx - 1] + + if Confirm.ask( + f"Remove concurrency limit for '{provider}' (reset to default 1)?" + ): + if provider in pending_adds: + # Undo pending addition + key = f"{prefix}{provider.upper()}" + del self.settings.pending_changes[key] + self.console.print( + f"\n[green]✅ Pending limit for '{provider}' cancelled![/green]" + ) + else: + self.concurrency_mgr.remove_limit(provider) + self.console.print( + f"\n[green]✅ Limit marked for removal for '{provider}'[/green]" + ) + input("\nPress Enter to continue...") + + elif choice == "4": + break + + def _show_changes_summary(self): + """Display categorized summary of all pending changes.""" + self.console.print( + Panel.fit( + "[bold cyan]📋 Pending Changes Summary[/bold cyan]", + border_style="cyan", + ) + ) + self.console.print() + + # Define categories with their key patterns + categories = [ + ("Custom Provider API Bases", "_API_BASE", "suffix"), + ("Model Definitions", "_MODELS", "suffix"), + ("Concurrency Limits", "MAX_CONCURRENT_REQUESTS_PER_KEY_", "prefix"), + ("Rotation Modes", "ROTATION_MODE_", "prefix"), + ("Priority Multipliers", "CONCURRENCY_MULTIPLIER_", "prefix"), + ] + + # Get provider-specific settings keys + provider_settings_keys = set() + for provider_settings in PROVIDER_SETTINGS_MAP.values(): + provider_settings_keys.update(provider_settings.keys()) + + changes = self.settings.get_changes_summary() + displayed_keys = set() + + for category_name, pattern, pattern_type in categories: + category_changes = {"add": [], "edit": [], "remove": []} + + for change_type in ["add", "edit", "remove"]: + for key, old_val, new_val in changes[change_type]: + matches = False + if pattern_type == "suffix" and key.endswith(pattern): + matches = True + elif pattern_type == "prefix" and key.startswith(pattern): + matches = True + + if matches: + category_changes[change_type].append((key, old_val, new_val)) + displayed_keys.add(key) + + # Check if this category has any changes + has_changes = any(category_changes[t] for t in ["add", "edit", "remove"]) + if has_changes: + self.console.print(f"[bold]{category_name}:[/bold]") + # Sort: additions, modifications, removals (alphabetically within each) + for change_type in ["add", "edit", "remove"]: + for key, old_val, new_val in sorted( + category_changes[change_type], key=lambda x: x[0] + ): + if change_type == "add": + self.console.print(f" [green]+ {key} = {new_val}[/green]") + elif change_type == "edit": + self.console.print( + f" [yellow]~ {key}: {old_val} → {new_val}[/yellow]" + ) + else: + self.console.print(f" [red]- {key}[/red]") + self.console.print() + + # Handle provider-specific settings that don't match the patterns above + provider_changes = {"add": [], "edit": [], "remove": []} + for change_type in ["add", "edit", "remove"]: + for key, old_val, new_val in changes[change_type]: + if key not in displayed_keys and key in provider_settings_keys: + provider_changes[change_type].append((key, old_val, new_val)) + + has_provider_changes = any( + provider_changes[t] for t in ["add", "edit", "remove"] + ) + if has_provider_changes: + self.console.print("[bold]Provider-Specific Settings:[/bold]") + for change_type in ["add", "edit", "remove"]: + for key, old_val, new_val in sorted( + provider_changes[change_type], key=lambda x: x[0] + ): + if change_type == "add": + self.console.print(f" [green]+ {key} = {new_val}[/green]") + elif change_type == "edit": + self.console.print( + f" [yellow]~ {key}: {old_val} → {new_val}[/yellow]" + ) + else: + self.console.print(f" [red]- {key}[/red]") + self.console.print() + + self.console.print("━" * 70) + + def save_and_exit(self): + """Save pending changes and exit""" + if self.settings.has_pending(): + clear_screen("Save Changes") + self._show_changes_summary() + + if Confirm.ask("\n[bold yellow]Save all pending changes?[/bold yellow]"): + self.settings.save() + self.console.print("\n[green]✅ All changes saved to .env![/green]") + input("\nPress Enter to return to launcher...") + else: + self.console.print("\n[yellow]Changes not saved[/yellow]") + input("\nPress Enter to continue...") + return + else: + self.console.print("\n[dim]No changes to save[/dim]") + input("\nPress Enter to return to launcher...") + + self.running = False + + def exit_without_saving(self): + """Exit without saving""" + if self.settings.has_pending(): + clear_screen("Exit Without Saving") + self._show_changes_summary() + + if Confirm.ask("\n[bold red]Discard all pending changes?[/bold red]"): + self.settings.discard() + self.console.print("\n[yellow]Changes discarded[/yellow]") + input("\nPress Enter to return to launcher...") + self.running = False + else: + return + else: + self.running = False + + +def run_settings_tool(): + """Entry point for settings tool""" + tool = SettingsTool() + tool.run()