import atexit import os import signal import socket import subprocess import sys import time from pathlib import Path import gradio as gr from smolagents.agents import MultiStepAgent from .ui_helpers import stream_to_gradio from .utils import load_file preview_process = None PREVIEW_PORT = 7861 # Internal port for preview apps last_restart_time = 0 # Track when we last restarted the preview app RESTART_COOLDOWN = 10 # Minimum seconds between restarts def cleanup_preview_on_exit(): """Cleanup function called on program exit.""" print("🧹 Cleaning up preview app on exit...") stop_preview_app() def signal_handler(signum, frame): """Handle shutdown signals gracefully.""" print(f"πŸ”” Received signal {signum}, shutting down gracefully...") cleanup_preview_on_exit() sys.exit(0) # Register signal handlers and exit handler signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) atexit.register(cleanup_preview_on_exit) def find_free_port(start_port=7860, max_ports=100): """Find an available TCP port, starting from a given port.""" for port in range(start_port, start_port + max_ports): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("127.0.0.1", port)) return port except OSError: print(f" Port {port} is in use, trying next...") return None def get_preview_url(): """Get the appropriate preview URL with a cache-busting timestamp.""" # Append a timestamp as a query parameter to break browser cache return f"/preview/?_t={int(time.time() * 1000)}" def is_port_available(port, host="0.0.0.0"): """Check if a port is available for binding.""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) return True except OSError: return False def find_app_py_in_sandbox(): """Find app.py file in sandbox folder and its subfolders.""" sandbox_path = Path("sandbox") if not sandbox_path.exists(): return None # Search for app.py files recursively app_files = list(sandbox_path.rglob("app.py")) if not app_files: return None # If multiple app.py files exist, throw an error if len(app_files) > 1: raise ValueError("Multiple app.py files found in sandbox directory") return str(app_files[0]) def save_file(path, new_text): if path is None: gr.Warning("⚠️ No file selected.") try: with open(path, "w", encoding="utf-8") as f: f.write(new_text) gr.Info(f"βœ… Saved to: {path.split('sandbox/')[-1]}") except Exception as e: gr.Error(f"❌ Error saving: {e}") def stop_preview_app(): """Stop the preview app subprocess if it's running.""" global preview_process if preview_process and preview_process.poll() is None: print(f"πŸ›‘ Stopping preview app process (PID: {preview_process.pid})...") try: preview_process.terminate() preview_process.wait(timeout=5) print("βœ… Preview app stopped gracefully.") except subprocess.TimeoutExpired: preview_process.kill() # Wait a bit longer for the kill to take effect try: preview_process.wait(timeout=2) print("⚠️ Preview app force-killed after timeout.") except subprocess.TimeoutExpired: print("⚠️ Preview app may still be running after force-kill attempt.") except Exception as e: print(f"❌ Error stopping preview app: {e}") finally: preview_process = None def start_preview_app(): """Start the preview app in a subprocess if it's not already running.""" global preview_process, last_restart_time # Check if preview app is already running and healthy if preview_process and preview_process.poll() is None: # Verify it's actually responsive on the port try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(1) result = sock.connect_ex(("127.0.0.1", PREVIEW_PORT)) if result == 0: print( f"βœ… Preview app already running and healthy " f"(PID: {preview_process.pid})" ) # We don't need to return the URL here anymore return True, "Preview is running" except Exception: pass # Check cooldown period to avoid too frequent restarts current_time = time.time() if current_time - last_restart_time < RESTART_COOLDOWN: remaining_cooldown = RESTART_COOLDOWN - (current_time - last_restart_time) print( f"⏳ Preview app restart on cooldown, {remaining_cooldown:.1f}s remaining" ) if preview_process and preview_process.poll() is None: # If there's still a process running, return success return True, "Preview is running" else: return ( False, f"Preview app on cooldown for {remaining_cooldown:.1f} more seconds", ) # Stop any existing process before starting a new one stop_preview_app() # Update restart time last_restart_time = current_time # Wait for the port to become available (up to 5 seconds) for i in range(10): # 10 attempts * 0.5 seconds = 5 seconds max if is_port_available(PREVIEW_PORT): print(f"βœ… Port {PREVIEW_PORT} is available") break print(f"⏳ Port {PREVIEW_PORT} still busy, waiting... (attempt {i + 1}/10)") time.sleep(0.5) else: print(f"❌ Port {PREVIEW_PORT} is still not available after 5 seconds") return False, f"Port {PREVIEW_PORT} is not available" app_file = find_app_py_in_sandbox() if not app_file: return False, "No `app.py` found in the `sandbox` directory." print(f"πŸš€ Starting preview app from `{app_file}` on port {PREVIEW_PORT}...") try: # Change to the directory containing the app file app_dir = str(Path(app_file).parent) preview_process = subprocess.Popen( [ "python", "app.py", "--server-port", str(PREVIEW_PORT), "--server-name", "0.0.0.0", "--root-path", "/preview", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, cwd=app_dir, # Set working directory to the app directory ) # Give it a moment to start up time.sleep(3) # Check if process is still running if preview_process.poll() is None: print(f"βœ… Preview app started successfully (PID: {preview_process.pid}).") # Additional check: verify the process is actually listening on the port time.sleep(2) # Give it a bit more time to fully initialize if preview_process.poll() is None: # Check if port is actually being used (reverse of availability check) try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(1) result = sock.connect_ex(("127.0.0.1", PREVIEW_PORT)) if result == 0: print( f"βœ… Preview app is accepting connections on port " f"{PREVIEW_PORT}" ) return True, "Preview is running" else: print( f"❌ Preview app started but not accepting connections " f"on port {PREVIEW_PORT}" ) # Get error output try: stdout, stderr = preview_process.communicate(timeout=1) error_msg = f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}" print(f"Process output: {error_msg}") except subprocess.TimeoutExpired: print("Process still running but not responsive") return False, "Preview app not accepting connections" except Exception as e: print(f"❌ Error checking port connection: {e}") return False, f"Error verifying connection: {e}" else: stdout, stderr = preview_process.communicate() error_msg = ( f"Process exited during initialization. " f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}" ) print(f"❌ {error_msg}") return False, f"Preview app crashed during startup:\n{error_msg}" else: stdout, stderr = preview_process.communicate() error_msg = f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}" print(f"❌ Failed to start preview app. Error:\n{error_msg}") return False, f"Failed to start preview app:\n{error_msg}" except Exception as e: print(f"❌ Exception while starting preview app: {e}") return False, f"Error starting preview app: {e}" def create_iframe_preview(): """Create an iframe that loads the sandbox app.""" print("πŸ” create_iframe_preview() called") # First, check if existing process is healthy if preview_process is not None: healthy, status = check_preview_health() print(f"πŸ” Health check: {status}") if healthy: print("βœ… Preview app is healthy, using existing process") iframe_html = ( f'' ) return iframe_html else: print(f"⚠️ Preview app unhealthy: {status}, attempting restart...") else: print("πŸ” No preview process exists, starting new one") # Try to start the preview app and show an iframe success, message = start_preview_app() print(f"πŸ” start_preview_app() result: success={success}, message={message}") if success: iframe_html = ( f'' ) return iframe_html else: # Show a more user-friendly error message with retry option error_html = f"""

🚧 Preview App Temporarily Unavailable

Status: {message}

The preview app is starting up. Please wait a few seconds and try refreshing.

""" print(f"πŸ” Error in preview: {message}") return error_html def is_preview_running(): """Check if the preview app is running and accessible.""" global preview_process # First check if process exists if preview_process is None or preview_process.poll() is not None: return False # Then check if it's actually responsive on the port try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(1) result = sock.connect_ex(("127.0.0.1", PREVIEW_PORT)) return result == 0 except Exception: return False def check_preview_health(): """Check if the preview app is healthy and restart if needed.""" global preview_process if preview_process is None: return False, "No preview process" if preview_process.poll() is not None: # Process has exited try: stdout, stderr = preview_process.communicate() error_msg = f"Process exited. STDOUT:\n{stdout}\nSTDERR:\n{stderr}" print(f"🚨 Preview process died: {error_msg}") except Exception as e: print(f"🚨 Preview process died: {e}") preview_process = None return False, "Process died" # Check if responsive with multiple attempts and longer timeout max_attempts = 3 for attempt in range(max_attempts): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(3) # Increased timeout from 1 to 3 seconds result = sock.connect_ex(("127.0.0.1", PREVIEW_PORT)) if result == 0: return True, "Healthy" else: if attempt < max_attempts - 1: print( f"πŸ” Health check attempt {attempt + 1}/" f"{max_attempts} failed, retrying..." ) time.sleep(1) # Wait before retrying else: return False, "Not responsive on port after multiple attempts" except Exception as e: if attempt < max_attempts - 1: print( f"πŸ” Health check attempt {attempt + 1}/" f"{max_attempts} failed with error: {e}, retrying..." ) time.sleep(1) else: return False, f"Connection check failed: {e}" return False, "Health check failed" def ensure_preview_running(): """Ensure the preview app is running, start it if needed.""" if not is_preview_running(): start_preview_app() def get_default_model_for_provider(provider: str) -> str: """Get the default model ID for a given provider.""" provider_model_map = { "Anthropic": "anthropic/claude-sonnet-4-20250514", "OpenAI": "openai/gpt-4.1", "Mistral": "mistral/devstral-small-latest", # "SambaNova": "sambanova/Qwen3-32B", "Hugging Face": "huggingface/together/Qwen/Qwen2.5-Coder-32B-Instruct", "OpenRouter": "openrouter/deepseek/deepseek-chat-v3-0324:free", } return provider_model_map.get( provider, "huggingface/together/Qwen/Qwen2.5-Coder-32B-Instruct" ) def get_available_providers(): """Detect which API keys are available in environment variables.""" env_vars = { "Anthropic": "ANTHROPIC_API_KEY", "OpenAI": "OPENAI_API_KEY", "Hugging Face": "HUGGINGFACE_API_KEY", # "SambaNova": "SAMBANOVA_API_KEY", "Mistral": "MISTRAL_API_KEY", "OpenRouter": "OPENROUTER_API_KEY", } available_providers = [] for provider, env_var in env_vars.items(): if os.getenv(env_var): available_providers.append(provider) return available_providers def get_default_provider(): """Get the default provider based on available API keys.""" available = get_available_providers() # Priority order for default selection priority_order = ["Anthropic", "OpenAI", "Mistral", "OpenRouter", "Hugging Face"] for provider in priority_order: if provider in available: return provider # If no keys are available, default to Anthropic return "Anthropic" def initialize_model_from_environment(): """Initialize the model configuration based on available API keys in environment.""" default_provider = get_default_provider() available_providers = get_available_providers() current_model_id = os.getenv("MODEL_ID", "") print(f"πŸ” Available providers: {available_providers}") print(f"πŸ” Default provider: {default_provider}") print(f"πŸ” Current MODEL_ID: {current_model_id if current_model_id else 'NOT_SET'}") # If we have available providers, check if current model matches the # default provider if available_providers: expected_model = get_default_model_for_provider(default_provider) # If MODEL_ID doesn't match the expected model for available provider, # override it if current_model_id != expected_model: print( f"πŸ”§ MODEL_ID mismatch: expected {expected_model} for " f"{default_provider}, got {current_model_id}" ) print("πŸ”§ Configuring based on available providers...") # Set the model for the default provider if default_provider != "Hugging Face": env_var_map = { "Anthropic": "ANTHROPIC_API_KEY", "OpenAI": "OPENAI_API_KEY", "SambaNova": "SAMBANOVA_API_KEY", "Mistral": "MISTRAL_API_KEY", "OpenRouter": "OPENROUTER_API_KEY", } env_var_name = env_var_map.get(default_provider) api_key = os.getenv(env_var_name) print( f"πŸ”§ Checking {default_provider} with key: " f"{'SET' if api_key else 'NOT_SET'}" ) if api_key: os.environ["API_KEY"] = api_key os.environ["MODEL_ID"] = expected_model print( f"πŸ”§ Auto-configured model: {default_provider}" f"-> {expected_model}" ) print("πŸ”§ Set API_KEY and MODEL_ID environment variables") return True else: print(f"❌ No API key found for {default_provider}") else: print("πŸ”§ Default provider is Hugging Face, keeping current model") else: print(f"βœ… MODEL_ID matches expected model for {default_provider}") else: print("ℹ️ No available providers detected, keeping current model") return False def save_api_key(provider, api_key): """Save API key to environment variable and update model accordingly.""" if not api_key.strip(): return f"⚠️ Please enter a valid API key for {provider}" # Map provider names to environment variable names env_var_map = { "Anthropic": "ANTHROPIC_API_KEY", "OpenAI": "OPENAI_API_KEY", "Hugging Face": "HUGGINGFACE_API_KEY", "SambaNova": "SAMBANOVA_API_KEY", "Mistral": "MISTRAL_API_KEY", "OpenRouter": "OPENROUTER_API_KEY", } env_var_name = env_var_map.get(provider) if env_var_name: # Always set the provider-specific API key os.environ[env_var_name] = api_key.strip() # For non-Hugging Face providers, also set the generic API_KEY and MODEL_ID # This ensures the main agent uses the correct model and API key if provider != "Hugging Face": os.environ["API_KEY"] = api_key.strip() os.environ["MODEL_ID"] = get_default_model_for_provider(provider) return ( f"βœ… {provider} API key saved successfully \n" f"Model: {get_default_model_for_provider(provider)}" ) else: return f"βœ… {provider} API key saved successfully" else: return f"❌ Unknown provider: {provider}" def get_api_key_status(selected_llm_provider=None): """Get the status of all API keys, highlighting the selected provider.""" if selected_llm_provider is None: selected_llm_provider = get_default_provider() env_vars = { "Hugging Face": "HUGGINGFACE_API_KEY", "Anthropic": "ANTHROPIC_API_KEY", "OpenAI": "OPENAI_API_KEY", "SambaNova": "SAMBANOVA_API_KEY", "Mistral": "MISTRAL_API_KEY", "OpenRouter": "OPENROUTER_API_KEY", } status = [] available_providers = get_available_providers() # Show status for all providers for provider, env_var in env_vars.items(): if os.getenv(env_var): key = os.getenv(env_var) masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***" model = get_default_model_for_provider(provider) # Highlight the selected provider if provider == selected_llm_provider: status.append(f"🎯 {provider}: {masked_key} (Model: {model}) [ACTIVE]") else: status.append(f"βœ… {provider}: {masked_key} (Model: {model})") else: model = get_default_model_for_provider(provider) if provider == selected_llm_provider: status.append(f"❌ {provider}: Not set (Would use: {model}) [SELECTED]") else: status.append(f"❌ {provider}: Not set (Would use: {model})") # Show current active model current_model = os.getenv("MODEL_ID", "Qwen/Qwen2.5-Coder-32B-Instruct") status.append(f"πŸ€– Current Active Model: {current_model}") # Show summary if available_providers: status.append(f"πŸ“Š Available providers: {', '.join(available_providers)}") else: status.append("⚠️ No API keys detected in environment") return "\n".join(status) class GradioUI: """A one-line interface to launch your agent in Gradio""" def __init__(self, agent: MultiStepAgent): self.agent = agent self.parent_id = None def interact_with_agent(self, prompt, messages, session_state): import gradio as gr self.parent_id = int(time.time() * 1000) # Get the agent type from the template agent if "agent" not in session_state: session_state["agent"] = self.agent try: messages.append( gr.ChatMessage(role="user", content=prompt, metadata={"status": "done"}) ) messages.append( gr.ChatMessage( role="assistant", content="", metadata={ "id": self.parent_id, "title": "🧠 Thinking...", "status": "pending", }, ) ) start_time = time.time() yield messages for msg in stream_to_gradio( session_state["agent"], task=prompt, reset_agent_memory=False, parent_id=self.parent_id, ): if isinstance(msg, gr.ChatMessage): messages.append(msg) messages[-1].metadata["status"] = "done" if msg.content.startswith("**Final answer:**"): # Remove "**Final answer:**" prefix from the message content if msg.content.startswith("**Final answer:**"): msg.content = msg.content.replace("**Final answer:**\n", "") # Set the parent message status to done when final # answer is reached for message in messages: if ( isinstance(message, gr.ChatMessage) and message.metadata.get("id") == self.parent_id ): message.metadata["status"] = "done" message.metadata["title"] = ( f"🧠 Thought for {time.time() - start_time:.0f} " "sec." ) break elif isinstance(msg, str): # Then it's only a completion delta msg = msg.replace("<", r"\<").replace( ">", r"\>" ) # HTML tags seem to break Gradio Chatbot if messages[-1].metadata["status"] == "pending": messages[-1].content = msg else: messages.append( gr.ChatMessage( role="assistant", content=msg, metadata={"status": "pending"}, ) ) yield messages yield messages except Exception as e: yield messages raise gr.Error(f"Error in interaction: {str(e)}") from e def log_user_message(self, text_input, file_uploads_log): import gradio as gr return ( text_input + ( f"\nYou have been provided with these files, which might be " f"helpful or not: {file_uploads_log}" if len(file_uploads_log) > 0 else "" ), "", gr.Button(interactive=False), ) def launch(self, share: bool = True, **kwargs): self.create_app().launch( debug=True, share=share, **kwargs, ) def create_app(self): import gradio as gr with gr.Blocks( title="πŸ’—Likable", theme=gr.themes.Soft(), fill_height=True, fill_width=True, analytics_enabled=False, ) as demo: gr.Markdown("""# πŸ’—Likable ## ⚠️ IMPORTANT **Please note:** This public demo space is shared among all users - everyone sees the same app and shares the same API key. For this reason, we've disabled the settings tab and are using a free version of DeepSeek V3 from OpenRouter. **For private use or if you encounter rate limits**, we recommend duplicating this space to your own account and configuring it with your personal API keys, by either using Space sectres or uncommenting line 742-784 and 806-817 in `src/app.py` to enable the settings tab. """) with gr.Row(elem_classes="main-container"): # Left side - Chat Interface with gr.Column(scale=1, elem_classes="chat-container"): avatar_url = ( "http://em-content.zobj.net/source/apple/419/" "growing-heart_1f497.png" ) chatbot = gr.Chatbot( avatar_images=(None, avatar_url), type="messages", resizable=True, height=720, ) with gr.Column(): text_input = gr.Textbox( placeholder="Ask Likable...", scale=4, container=False, ) submit_btn = gr.Button("↑", size="sm", variant="primary") # Right side - Preview/Code/Settings Toggle with gr.Column(scale=4, elem_classes="preview-container"): with gr.Tab("Preview"): preview_html = gr.HTML( value=( f'' ), elem_id="preview-container", ) with gr.Tab("Code"): with gr.Row(): save_btn = gr.Button("Save", size="sm") with gr.Row(equal_height=True): file_explorer = gr.FileExplorer( scale=1, file_count="single", value="app.py", root_dir="sandbox", ) code_editor = gr.Code( scale=3, value=load_file("sandbox/app.py"), language="python", visible=True, interactive=True, autocomplete=True, max_lines=39, ) # with gr.Tab("Settings"): # gr.Markdown("## πŸ”‘ API Keys") # gr.Markdown( # "Configure your API keys for different AI providers:" # ) # # API Key Status Display # api_status = gr.Textbox( # label="Current API Key Status", # value=get_api_key_status(), # interactive=False, # lines=6, # max_lines=8, # ) # gr.Markdown("---") # # LLM Token with Provider Selection (now includes Hugging Face) # with gr.Row(): # llm_provider = gr.Dropdown( # label="LLM Provider", # choices=[ # "Anthropic", # "OpenAI", # "Mistral", # "SambaNova", # "Hugging Face", # ], # value=get_default_provider(), # scale=1, # ) # llm_token = gr.Textbox( # label="API Key", # placeholder="Enter your API key...", # type="password", # scale=3, # ) # llm_save_btn = gr.Button("Save", size="sm", scale=1) # # Status message for API key operations # api_message = gr.Textbox( # label="Status", interactive=False, visible=False # ) # Add session state to store session-specific data session_state = gr.State({}) stored_messages = gr.State([]) file_uploads_log = gr.State([]) # Set up event handlers for API key saving def save_and_update_status(provider, api_key, session_state=None): message = save_api_key(provider, api_key) status = get_api_key_status(provider) # For non-Hugging Face providers, recreate the agent if provider != "Hugging Face" and session_state is not None: agent_message = self.recreate_agent_with_new_model( session_state, provider ) if agent_message: message += f"\n{agent_message}" return message, status, "" # Clear the input field # llm_save_btn.click( # lambda provider, key, sess_state: save_and_update_status( # provider, key, sess_state # ), # inputs=[llm_provider, llm_token, session_state], # outputs=[api_message, api_status, llm_token], # ).then(lambda: gr.Textbox(visible=True), outputs=[api_message]) # # Update status when LLM provider dropdown changes # llm_provider.change( # fn=get_api_key_status, inputs=[llm_provider], outputs=[api_status] # ) # Set up event handlers file_explorer.change( fn=load_file, inputs=file_explorer, outputs=code_editor ) def refresh_all_with_preview_restart(): """Refresh everything including forcing a preview app restart to pick up code changes.""" print("πŸ”„ Forcing preview app restart to pick up code changes...") # Force stop the current preview app to pick up code changes stop_preview_app() # Start fresh with new code current_preview = create_iframe_preview() # Update the file explorer and code editor file_explorer_val = gr.FileExplorer( scale=1, file_count="single", value="app.py", root_dir="sandbox", ) code_editor_val = gr.Code( scale=3, value=load_file("sandbox/app.py"), language="python", visible=True, interactive=True, autocomplete=True, ) return file_explorer_val, code_editor_val, current_preview def refresh_all(): # Only refresh preview if it's not currently healthy current_preview = None if preview_process is not None: healthy, status = check_preview_health() if healthy: # Preview is healthy, just return existing iframe current_preview = ( f'' ) else: # Preview needs refresh current_preview = create_iframe_preview() else: # No preview process, create one current_preview = create_iframe_preview() # Then, update the file explorer and code editor file_explorer_val = gr.FileExplorer( scale=1, file_count="single", value="app.py", root_dir="sandbox", ) code_editor_val = gr.Code( scale=3, value=load_file("sandbox/app.py"), language="python", visible=True, interactive=True, autocomplete=True, ) return file_explorer_val, code_editor_val, current_preview save_btn.click( fn=save_file, inputs=[file_explorer, code_editor], ).then( fn=refresh_all_with_preview_restart, outputs=[file_explorer, code_editor, preview_html], ) text_input.submit( self.log_user_message, [text_input, file_uploads_log], [stored_messages, text_input, submit_btn], ).then( self.interact_with_agent, [stored_messages, chatbot, session_state], [chatbot], ).then( fn=refresh_all_with_preview_restart, outputs=[file_explorer, code_editor, preview_html], ).then( lambda: ( gr.Textbox( interactive=True, placeholder="Ask Likable...", ), gr.Button(interactive=True), ), None, [text_input, submit_btn], ) submit_btn.click( self.log_user_message, [text_input, file_uploads_log], [stored_messages, text_input, submit_btn], ).then( self.interact_with_agent, [stored_messages, chatbot, session_state], [chatbot], ).then( fn=refresh_all_with_preview_restart, outputs=[file_explorer, code_editor, preview_html], ).then( lambda: ( gr.Textbox( interactive=True, placeholder="Ask Likable....", ), gr.Button(interactive=True), ), None, [text_input, submit_btn], ) # Load the preview iframe when the app starts demo.load(fn=create_iframe_preview, outputs=[preview_html]) # Note: Removed demo.unload(stop_preview_app) as it was causing # preview app restarts on every page reload, leading to 502 errors. # We have proper cleanup via signal handlers and atexit handlers. return demo def recreate_agent_with_new_model(self, session_state, provider=None): """Recreate the agent with updated model configuration.""" from .kiss_agent import KISSAgent # Get the new model ID if provider is specified if provider and provider != "Hugging Face": model_id = get_default_model_for_provider(provider) # Get API key from provider-specific environment variable env_var_map = { "Anthropic": "ANTHROPIC_API_KEY", "OpenAI": "OPENAI_API_KEY", "SambaNova": "SAMBANOVA_API_KEY", "Mistral": "MISTRAL_API_KEY", "OpenRouter": "OPENROUTER_API_KEY", } env_var_name = env_var_map.get(provider) api_key = os.getenv(env_var_name) if env_var_name else None if not api_key: return f"❌ No API key found for {provider}" # Create new agent with updated model new_agent = KISSAgent(model_id=model_id, api_key=api_key) session_state["agent"] = new_agent return f"πŸ”„ Agent updated to use {provider} model: {model_id}" return "" if __name__ == "__main__": import sys from .kiss_agent import KISSAgent # Initialize model configuration based on available API keys print("πŸ” Checking for available API keys...") initialize_model_from_environment() # Create agent with explicit parameters from the updated environment model_id = os.getenv("MODEL_ID", "Qwen/Qwen2.5-Coder-32B-Instruct") api_key = os.getenv("API_KEY") print(f"πŸ€– Creating agent with model: {model_id}") print(f"πŸ”‘ Using API key: {'SET' if api_key else 'NOT_SET'}") agent = KISSAgent(model_id=model_id, api_key=api_key) # Start the preview app automatically when the main app starts print("πŸš€ Starting preview app automatically...") success, message = start_preview_app() if success: print(f"βœ… Preview app started: {message}") else: print(f"❌ Failed to start preview app: {message}") # Parse command line arguments for server configuration server_port_arg = 7860 # default server_name = "127.0.0.1" # default if "--server-port" in sys.argv: port_idx = sys.argv.index("--server-port") if port_idx + 1 < len(sys.argv): server_port_arg = int(sys.argv[port_idx + 1]) if "--server-name" in sys.argv: name_idx = sys.argv.index("--server-name") if name_idx + 1 < len(sys.argv): server_name = sys.argv[name_idx + 1] # Find an available port for the main app, starting with the desired one server_port = find_free_port(server_port_arg) if server_port is None: print(f"❌ Could not find any available ports starting from {server_port_arg}") sys.exit(1) if server_port != server_port_arg: print(f"⚠️ Port {server_port_arg} was busy. Running on free port: {server_port}") GradioUI(agent).launch( share=False, server_port=server_port, server_name=server_name )