Spaces:
Sleeping
Sleeping
| # File: enhanced_gradio_interface.py | |
| import asyncio | |
| from collections import defaultdict | |
| import json | |
| import os | |
| import re | |
| import time | |
| import uuid | |
| from typing import List, Dict, Any, Optional | |
| from dataclasses import dataclass | |
| from threading import Lock | |
| import threading | |
| import json | |
| import os | |
| import queue | |
| import traceback | |
| import uuid | |
| from typing import Coroutine, Dict, List, Any, Optional, Callable | |
| from dataclasses import dataclass | |
| from queue import Queue, Empty | |
| from threading import Lock, Event, Thread | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor | |
| import time | |
| import gradio as gr | |
| from openai import AsyncOpenAI, OpenAI | |
| import pyttsx3 | |
| from rich.console import Console | |
| BASE_URL="http://localhost:1234/v1" | |
| BASE_API_KEY="not-needed" | |
| BASE_CLIENT = AsyncOpenAI( | |
| base_url=BASE_URL, | |
| api_key=BASE_API_KEY | |
| ) # Global state for client | |
| BASEMODEL_ID = "leroydyer/qwen/qwen3-0.6b-q4_k_m.gguf" # Global state for selected model ID | |
| CLIENT =OpenAI( | |
| base_url=BASE_URL, | |
| api_key=BASE_API_KEY | |
| ) # Global state for client | |
| # --- Global Variables (if needed) --- | |
| console = Console() | |
| # Example global client if needed elsewhere, adjust based on your setup | |
| # BASE_CLIENT = AsyncOpenAI(base_url=DEFAULT_BASE_URL, api_key=DEFAULT_API_KEY) | |
| # CLIENT = OpenAI(base_url=DEFAULT_BASE_URL, api_key=DEFAULT_API_KEY) | |
| # --- Dataclasses (copied from your original code or imported) --- | |
| class LLMMessage: | |
| role: str | |
| content: str | |
| message_id: str = None | |
| conversation_id: str = None | |
| timestamp: float = None | |
| metadata: Dict[str, Any] = None | |
| def __post_init__(self): | |
| if self.message_id is None: | |
| self.message_id = str(uuid.uuid4()) | |
| if self.timestamp is None: | |
| self.timestamp = time.time() | |
| if self.metadata is None: | |
| self.metadata = {} | |
| class LLMRequest: | |
| message: LLMMessage | |
| response_event: str = None | |
| callback: Callable = None | |
| def __post_init__(self): | |
| if self.response_event is None: | |
| self.response_event = f"llm_response_{self.message.message_id}" | |
| class LLMResponse: | |
| message: LLMMessage | |
| request_id: str | |
| success: bool = True | |
| error: str = None | |
| # --- Event Manager (copied from your original code or imported) --- | |
| class EventManager: | |
| def __init__(self): | |
| self._handlers = defaultdict(list) | |
| self._lock = threading.Lock() | |
| def register(self, event: str, handler: Callable): | |
| with self._lock: | |
| self._handlers[event].append(handler) | |
| def unregister(self, event: str, handler: Callable): | |
| with self._lock: | |
| if event in self._handlers and handler in self._handlers[event]: | |
| self._handlers[event].remove(handler) | |
| def raise_event(self, event: str, data: Any): | |
| with self._lock: | |
| handlers = self._handlers[event][:] | |
| for handler in handlers: | |
| try: | |
| handler(data) | |
| except Exception as e: | |
| console.log(f"Error in event handler for {event}: {e}", style="bold red") | |
| EVENT_MANAGER = EventManager() | |
| def RegisterEvent(event: str, handler: Callable): | |
| EVENT_MANAGER.register(event, handler) | |
| def RaiseEvent(event: str, data: Any): | |
| EVENT_MANAGER.raise_event(event, data) | |
| def UnregisterEvent(event: str, handler: Callable): | |
| EVENT_MANAGER.unregister(event, handler) | |
| class LLMAgent: | |
| """Main Agent Driver ! | |
| Agent For Multiple messages at once , | |
| has a message queing service as well as agenerator method for easy intergration with console | |
| applications as well as ui !""" | |
| def __init__( | |
| self, | |
| model_id: str = BASEMODEL_ID, | |
| system_prompt: str = None, | |
| max_queue_size: int = 1000, | |
| max_retries: int = 3, | |
| timeout: int = 30000, | |
| max_tokens: int = 5000, | |
| temperature: float = 0.3, | |
| base_url: str = "http://localhost:1234/v1", | |
| api_key: str = "not-needed", | |
| generate_fn: Callable[[List[Dict[str, str]]], Coroutine[Any, Any, str]] = None | |
| ): | |
| self.model_id = model_id | |
| self.system_prompt = system_prompt or "You are a helpful AI assistant." | |
| self.request_queue = Queue(maxsize=max_queue_size) | |
| self.max_retries = max_retries | |
| self.timeout = timeout | |
| self.is_running = False | |
| self._stop_event = Event() | |
| self.processing_thread = None | |
| # Conversation tracking | |
| self.conversations: Dict[str, List[LLMMessage]] = {} | |
| self.max_history_length = 20 | |
| self._generate = generate_fn or self._default_generate | |
| self.api_key = api_key | |
| self.base_url = base_url | |
| self.max_tokens = max_tokens | |
| self.temperature = temperature | |
| self.async_client = self.CreateClient(base_url, api_key) | |
| # Active requests waiting for responses | |
| self.pending_requests: Dict[str, LLMRequest] = {} | |
| self.pending_requests_lock = Lock() | |
| # Register internal event handlers | |
| self._register_event_handlers() | |
| # Speech synthesis | |
| try: | |
| self.tts_engine = pyttsx3.init() | |
| self.setup_tts() | |
| self.speech_enabled = True | |
| except Exception as e: | |
| console.log(f"[yellow]TTS not available: {e}[/yellow]") | |
| self.speech_enabled = False | |
| console.log("[bold green]🚀 Enhanced LLM Agent Initialized[/bold green]") | |
| # Start the processing thread immediately | |
| self.start() | |
| def setup_tts(self): | |
| """Configure text-to-speech engine""" | |
| if hasattr(self, 'tts_engine'): | |
| voices = self.tts_engine.getProperty('voices') | |
| if voices: | |
| self.tts_engine.setProperty('voice', voices[0].id) | |
| self.tts_engine.setProperty('rate', 150) | |
| self.tts_engine.setProperty('volume', 0.8) | |
| def speak(self, text: str): | |
| """Convert text to speech in a non-blocking way""" | |
| if not hasattr(self, 'speech_enabled') or not self.speech_enabled: | |
| return | |
| def _speak(): | |
| try: | |
| # Clean text for speech (remove markdown, code blocks) | |
| clean_text = re.sub(r'```.*?```', '', text, flags=re.DOTALL) | |
| clean_text = re.sub(r'`.*?`', '', clean_text) | |
| clean_text = clean_text.strip() | |
| if clean_text: | |
| self.tts_engine.say(clean_text) | |
| self.tts_engine.runAndWait() | |
| else: | |
| self.tts_engine.say(text) | |
| self.tts_engine.runAndWait() | |
| except Exception as e: | |
| console.log(f"[red]TTS Error: {e}[/red]") | |
| thread = threading.Thread(target=_speak, daemon=True) | |
| thread.start() | |
| async def _default_generate(self, messages: List[Dict[str, str]]) -> str: | |
| """Default generate function if none provided""" | |
| return await self.openai_generate(messages) | |
| def _register_event_handlers(self): | |
| """Register internal event handlers for response routing""" | |
| RegisterEvent("llm_internal_response", self._handle_internal_response) | |
| def _handle_internal_response(self, response: LLMResponse): | |
| """Route responses to the appropriate request handlers""" | |
| console.log(f"[bold cyan]Handling internal response for: {response.request_id}[/bold cyan]") | |
| request = None | |
| with self.pending_requests_lock: | |
| if response.request_id in self.pending_requests: | |
| request = self.pending_requests[response.request_id] | |
| del self.pending_requests[response.request_id] | |
| console.log(f"Found pending request for: {response.request_id}") | |
| else: | |
| console.log(f"No pending request found for: {response.request_id}", style="yellow") | |
| return | |
| # Raise the specific response event | |
| if request.response_event: | |
| console.log(f"[bold green]Raising event: {request.response_event}[/bold green]") | |
| RaiseEvent(request.response_event, response) | |
| # Call callback if provided | |
| if request.callback: | |
| try: | |
| console.log(f"[bold yellow]Calling callback for: {response.request_id}[/bold yellow]") | |
| request.callback(response) | |
| except Exception as e: | |
| console.log(f"Error in callback: {e}", style="bold red") | |
| def _add_to_conversation_history(self, conversation_id: str, message: LLMMessage): | |
| """Add message to conversation history""" | |
| if conversation_id not in self.conversations: | |
| self.conversations[conversation_id] = [] | |
| self.conversations[conversation_id].append(message) | |
| # Trim history if too long | |
| if len(self.conversations[conversation_id]) > self.max_history_length * 2: | |
| self.conversations[conversation_id] = self.conversations[conversation_id][-(self.max_history_length * 2):] | |
| def _build_messages_from_conversation(self, conversation_id: str, new_message: LLMMessage) -> List[Dict[str, str]]: | |
| """Build message list from conversation history""" | |
| messages = [] | |
| # Add system prompt | |
| if self.system_prompt: | |
| messages.append({"role": "system", "content": self.system_prompt}) | |
| # Add conversation history | |
| if conversation_id in self.conversations: | |
| for msg in self.conversations[conversation_id][-self.max_history_length:]: | |
| messages.append({"role": msg.role, "content": msg.content}) | |
| # Add the new message | |
| messages.append({"role": new_message.role, "content": new_message.content}) | |
| return messages | |
| def _process_llm_request(self, request: LLMRequest): | |
| """Process a single LLM request""" | |
| console.log(f"[bold green]Processing LLM request: {request.message.message_id}[/bold green]") | |
| try: | |
| # Build messages for LLM | |
| messages = self._build_messages_from_conversation( | |
| request.message.conversation_id or "default", | |
| request.message | |
| ) | |
| console.log(f"Calling LLM with {len(messages)} messages") | |
| # Call LLM - Use sync call for thread compatibility | |
| response_content = self._call_llm_sync(messages) | |
| console.log(f"[bold green]LLM response received: {response_content}...[/bold green]") | |
| # Create response message | |
| response_message = LLMMessage( | |
| role="assistant", | |
| content=response_content, | |
| conversation_id=request.message.conversation_id, | |
| metadata={"request_id": request.message.message_id} | |
| ) | |
| # Update conversation history | |
| self._add_to_conversation_history( | |
| request.message.conversation_id or "default", | |
| request.message | |
| ) | |
| self._add_to_conversation_history( | |
| request.message.conversation_id or "default", | |
| response_message | |
| ) | |
| # Create and send response | |
| response = LLMResponse( | |
| message=response_message, | |
| request_id=request.message.message_id, | |
| success=True | |
| ) | |
| console.log(f"[bold blue]Sending internal response for: {request.message.message_id}[/bold blue]") | |
| RaiseEvent("llm_internal_response", response) | |
| except Exception as e: | |
| console.log(f"[bold red]Error processing LLM request: {e}[/bold red]") | |
| traceback.print_exc() | |
| # Create error response | |
| error_response = LLMResponse( | |
| message=LLMMessage( | |
| role="system", | |
| content=f"Error: {str(e)}", | |
| conversation_id=request.message.conversation_id | |
| ), | |
| request_id=request.message.message_id, | |
| success=False, | |
| error=str(e) | |
| ) | |
| RaiseEvent("llm_internal_response", error_response) | |
| def _call_llm_sync(self, messages: List[Dict[str, str]]) -> str: | |
| """Sync call to the LLM with retry logic""" | |
| console.log(f"Making LLM call to {self.model_id}") | |
| for attempt in range(self.max_retries): | |
| try: | |
| response = CLIENT.chat.completions.create( | |
| model=self.model_id, | |
| messages=messages, | |
| temperature=self.temperature, | |
| max_tokens=self.max_tokens | |
| ) | |
| content = response.choices[0].message.content | |
| console.log(f"LLM call successful, response length: {len(content)}") | |
| return content | |
| except Exception as e: | |
| console.log(f"LLM call attempt {attempt + 1} failed: {e}") | |
| if attempt == self.max_retries - 1: | |
| raise e | |
| # Wait before retry | |
| def _process_queue(self): | |
| """Main queue processing loop""" | |
| console.log("[bold cyan]LLM Agent queue processor started[/bold cyan]") | |
| while not self._stop_event.is_set(): | |
| try: | |
| request = self.request_queue.get(timeout=1.0) | |
| if request: | |
| console.log(f"Got request from queue: {request.message.message_id}") | |
| self._process_llm_request(request) | |
| self.request_queue.task_done() | |
| except Empty: | |
| continue | |
| except Exception as e: | |
| console.log(f"Error in queue processing: {e}", style="bold red") | |
| traceback.print_exc() | |
| console.log("[bold cyan]LLM Agent queue processor stopped[/bold cyan]") | |
| def send_message( | |
| self, | |
| content: str, | |
| role: str = "user", | |
| conversation_id: str = None, | |
| response_event: str = None, | |
| callback: Callable = None, | |
| metadata: Dict = None | |
| ) -> str: | |
| """Send a message to the LLM and get response via events""" | |
| if not self.is_running: | |
| raise RuntimeError("LLM Agent is not running. Call start() first.") | |
| # Create message | |
| message = LLMMessage( | |
| role=role, | |
| content=content, | |
| conversation_id=conversation_id, | |
| metadata=metadata or {} | |
| ) | |
| # Create request | |
| request = LLMRequest( | |
| message=message, | |
| response_event=response_event, | |
| callback=callback | |
| ) | |
| # Store in pending requests BEFORE adding to queue | |
| with self.pending_requests_lock: | |
| self.pending_requests[message.message_id] = request | |
| console.log(f"Added to pending requests: {message.message_id}") | |
| # Add to queue | |
| try: | |
| self.request_queue.put(request, timeout=5.0) | |
| console.log(f"[bold magenta]Message queued: {message.message_id}, Content: {content[:50]}...[/bold magenta]") | |
| return message.message_id | |
| except queue.Full: | |
| console.log(f"[bold red]Queue full, cannot send message[/bold red]") | |
| with self.pending_requests_lock: | |
| if message.message_id in self.pending_requests: | |
| del self.pending_requests[message.message_id] | |
| raise RuntimeError("LLM Agent queue is full") | |
| async def chat(self, messages: List[Dict[str, str]]) -> str: | |
| """ | |
| Async chat method that sends message via queue and returns response string. | |
| This is the main method you should use. | |
| """ | |
| # Create future for the response | |
| loop = asyncio.get_event_loop() | |
| response_future = loop.create_future() | |
| def chat_callback(response: LLMResponse): | |
| """Callback when LLM responds - thread-safe""" | |
| console.log(f"[bold yellow]✓ CHAT CALLBACK TRIGGERED![/bold yellow]") | |
| if not response_future.done(): | |
| if response.success: | |
| content = response.message.content | |
| console.log(f"Callback received content: {content}...") | |
| # Schedule setting the future result on the main event loop | |
| loop.call_soon_threadsafe(response_future.set_result, content) | |
| else: | |
| console.log(f"Error in response: {response.error}") | |
| error_msg = f"❌ Error: {response.error}" | |
| loop.call_soon_threadsafe(response_future.set_result, error_msg) | |
| else: | |
| console.log(f"[bold red]Future already done, ignoring callback[/bold red]") | |
| console.log(f"Sending message to LLM agent...") | |
| # Extract the actual message content from the messages list | |
| user_message = "" | |
| for msg in messages: | |
| if msg.get("role") == "user": | |
| user_message = msg.get("content", "") | |
| break | |
| if not user_message.strip(): | |
| return "" | |
| # Send message with callback using the queue system | |
| try: | |
| message_id = self.send_message( | |
| content=user_message, | |
| conversation_id="default", | |
| callback=chat_callback | |
| ) | |
| console.log(f"Message sent with ID: {message_id}, waiting for response...") | |
| # Wait for the response and return it | |
| try: | |
| response = await asyncio.wait_for(response_future, timeout=self.timeout) | |
| console.log(f"[bold green]✓ Chat complete! Response length: {len(response)}[/bold green]") | |
| return response | |
| except asyncio.TimeoutError: | |
| console.log("[bold red]Response timeout[/bold red]") | |
| # Clean up the pending request | |
| with self.pending_requests_lock: | |
| if message_id in self.pending_requests: | |
| del self.pending_requests[message_id] | |
| return "❌ Response timeout - check if LLM server is running" | |
| except Exception as e: | |
| console.log(f"[bold red]Error sending message: {e}[/bold red]") | |
| traceback.print_exc() | |
| return f"❌ Error sending message: {e}" | |
| def start(self): | |
| """Start the LLM agent""" | |
| if not self.is_running: | |
| self.is_running = True | |
| self._stop_event.clear() | |
| self.processing_thread = Thread(target=self._process_queue, daemon=True) | |
| self.processing_thread.start() | |
| console.log("[bold green]LLM Agent started[/bold green]") | |
| def stop(self): | |
| """Stop the LLM agent""" | |
| console.log("Stopping LLM Agent...") | |
| self._stop_event.set() | |
| if self.processing_thread and self.processing_thread.is_alive(): | |
| self.processing_thread.join(timeout=10) | |
| self.is_running = False | |
| console.log("LLM Agent stopped") | |
| def get_conversation_history(self, conversation_id: str = "default") -> List[LLMMessage]: | |
| """Get conversation history""" | |
| return self.conversations.get(conversation_id, [])[:] | |
| def clear_conversation(self, conversation_id: str = "default"): | |
| """Clear conversation history""" | |
| if conversation_id in self.conversations: | |
| del self.conversations[conversation_id] | |
| async def _chat(self, messages: List[Dict[str, str]]) -> str: | |
| return await self._generate(messages) | |
| async def openai_generate(messages: List[Dict[str, str]], max_tokens: int = 8096, temperature: float = 0.4, model: str = BASEMODEL_ID,tools=None) -> str: | |
| """Static method for generating responses using OpenAI API""" | |
| try: | |
| resp = await BASE_CLIENT.chat.completions.create( | |
| model=model, | |
| messages=messages, | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| tools=tools | |
| ) | |
| response_text = resp.choices[0].message.content or "" | |
| return response_text | |
| except Exception as e: | |
| console.log(f"[bold red]Error in openai_generate: {e}[/bold red]") | |
| return f"[LLM_Agent Error - openai_generate: {str(e)}]" | |
| async def _call_(self, messages: List[Dict[str, str]]) -> str: | |
| """Internal call method using instance client""" | |
| try: | |
| resp = await self.async_client.chat.completions.create( | |
| model=self.model_id, | |
| messages=messages, | |
| temperature=self.temperature, | |
| max_tokens=self.max_tokens | |
| ) | |
| response_text = resp.choices[0].message.content or "" | |
| return response_text | |
| except Exception as e: | |
| console.log(f"[bold red]Error in _call_: {e}[/bold red]") | |
| return f"[LLM_Agent Error - _call_: {str(e)}]" | |
| def CreateClient(base_url: str, api_key: str) -> AsyncOpenAI: | |
| '''Create async OpenAI Client required for multi tasking''' | |
| return AsyncOpenAI( | |
| base_url=base_url, | |
| api_key=api_key | |
| ) | |
| async def fetch_available_models(base_url: str, api_key: str) -> List[str]: | |
| """Fetches available models from the OpenAI API.""" | |
| try: | |
| async_client = AsyncOpenAI(base_url=base_url, api_key=api_key) | |
| models = await async_client.models.list() | |
| model_choices = [model.id for model in models.data] | |
| return model_choices | |
| except Exception as e: | |
| console.log(f"[bold red]LLM_Agent Error fetching models: {e}[/bold red]") | |
| return ["LLM_Agent Error fetching models"] | |
| def get_models(self) -> List[str]: | |
| """Get available models using instance credentials""" | |
| return asyncio.run(self.fetch_available_models(self.base_url, self.api_key)) | |
| def get_queue_size(self) -> int: | |
| """Get current queue size""" | |
| return self.request_queue.qsize() | |
| def get_pending_requests_count(self) -> int: | |
| """Get number of pending requests""" | |
| with self.pending_requests_lock: | |
| return len(self.pending_requests) | |
| def get_status(self) -> Dict[str, Any]: | |
| """Get agent status information""" | |
| return { | |
| "is_running": self.is_running, | |
| "queue_size": self.get_queue_size(), | |
| "pending_requests": self.get_pending_requests_count(), | |
| "conversations_count": len(self.conversations), | |
| "model": self.model_id | |
| } | |
| class AI_Agent: | |
| def __init__(self, model_id: str, system_prompt: str = "You are a helpful assistant. Respond concisely in 1-2 sentences.", history: List[Dict] = None): | |
| self.model_id = model_id | |
| self.system_prompt = system_prompt | |
| self.history = history or [] | |
| self.conversation_id = f"conv_{uuid.uuid4().hex[:8]}" | |
| # Create agent instance | |
| self.client = LLMAgent( | |
| model_id=model_id, | |
| system_prompt=self.system_prompt, | |
| generate_fn=LLMAgent.openai_generate | |
| ) | |
| console.log(f"[bold green]✓ MyAgent initialized with model: {model_id}[/bold green]") | |
| async def call_llm(self, messages: List[Dict], use_history: bool = True) -> str: | |
| """ | |
| Send messages to LLM and get response | |
| Args: | |
| messages: List of message dicts with 'role' and 'content' | |
| use_history: Whether to include conversation history | |
| Returns: | |
| str: LLM response | |
| """ | |
| try: | |
| console.log(f"[bold yellow]Sending {len(messages)} messages to LLM (use_history: {use_history})...[/bold yellow]") | |
| # Enhance messages based on history setting | |
| enhanced_messages = await self._enhance_messages(messages, use_history) | |
| response = await self.client.chat(enhanced_messages) | |
| console.log(f"[bold green]✓ Response received ({len(response)} chars)[/bold green]") | |
| # Update conversation history ONLY if we're using history | |
| if use_history: | |
| self._update_history(messages, response) | |
| return response | |
| except Exception as e: | |
| console.log(f"[bold red]✗ ERROR: {e}[/bold red]") | |
| traceback.print_exc() | |
| return f"Error: {str(e)}" | |
| async def _enhance_messages(self, messages: List[Dict], use_history: bool) -> List[Dict]: | |
| """Enhance messages with system prompt and optional history""" | |
| enhanced = [] | |
| # Add system prompt if not already in messages | |
| has_system = any(msg.get('role') == 'system' for msg in messages) | |
| if not has_system and self.system_prompt: | |
| enhanced.append({"role": "system", "content": self.system_prompt}) | |
| # Add conversation history only if requested | |
| if use_history and self.history: | |
| enhanced.extend(self.history[-10:]) # Last 10 messages for context | |
| # Add current messages | |
| enhanced.extend(messages) | |
| return enhanced | |
| def _update_history(self, messages: List[Dict], response: str): | |
| """Update conversation history with new exchange""" | |
| # Add user messages to history | |
| for msg in messages: | |
| if msg.get('role') in ['user', 'assistant']: | |
| self.history.append(msg) | |
| # Add assistant response to history | |
| self.history.append({"role": "assistant", "content": response}) | |
| # Keep history manageable (last 20 exchanges) | |
| if len(self.history) > 40: # 20 user + 20 assistant messages | |
| self.history = self.history[-40:] | |
| async def simple_query(self, query: str) -> str: | |
| """Simple one-shot query method - NO history/context""" | |
| messages = [{"role": "user", "content": query}] | |
| return await self.call_llm(messages, use_history=False) | |
| async def multi_turn_chat(self, user_input: str) -> str: | |
| """Multi-turn chat that maintains context across calls""" | |
| messages = [{"role": "user", "content": user_input}] | |
| response = await self.call_llm(messages, use_history=True) | |
| return response | |
| def get_conversation_summary(self) -> Dict: | |
| """Get conversation summary""" | |
| return { | |
| "conversation_id": self.conversation_id, | |
| "total_messages": len(self.history), | |
| "user_messages": len([msg for msg in self.history if msg.get('role') == 'user']), | |
| "assistant_messages": len([msg for msg in self.history if msg.get('role') == 'assistant']), | |
| "recent_exchanges": self.history[-4:] if self.history else [] | |
| } | |
| def clear_history(self): | |
| """Clear conversation history""" | |
| self.history.clear() | |
| console.log("[bold yellow]Conversation history cleared[/bold yellow]") | |
| def update_system_prompt(self, new_prompt: str): | |
| """Update the system prompt""" | |
| self.system_prompt = new_prompt | |
| console.log(f"[bold blue]System prompt updated[/bold blue]") | |
| def stop(self): | |
| """Stop the client gracefully""" | |
| if hasattr(self, 'client') and self.client: | |
| self.client.stop() | |
| console.log("[bold yellow]MyAgent client stopped[/bold yellow]") | |
| async def contextual_query(self, query: str, context_messages: List[Dict] = None, | |
| context_text: str = None, context_files: List[str] = None) -> str: | |
| """ | |
| Query with specific context but doesn't update main history | |
| Args: | |
| query: The user question | |
| context_messages: List of message dicts for context | |
| context_text: Plain text context (will be converted to system message) | |
| context_files: List of file paths to read and include as context | |
| """ | |
| messages = [] | |
| # Add system prompt | |
| if self.system_prompt: | |
| messages.append({"role": "system", "content": self.system_prompt}) | |
| # Handle different context types | |
| if context_messages: | |
| messages.extend(context_messages) | |
| if context_text: | |
| messages.append({"role": "system", "content": f"Additional context: {context_text}"}) | |
| if context_files: | |
| file_context = await self._read_files_context(context_files) | |
| if file_context: | |
| messages.append({"role": "system", "content": f"File contents:\n{file_context}"}) | |
| # Add the actual query | |
| messages.append({"role": "user", "content": query}) | |
| return await self.call_llm(messages, use_history=False) | |
| async def _read_files_context(self, file_paths: List[str]) -> str: | |
| """Read multiple files and return as context string""" | |
| contexts = [] | |
| for file_path in file_paths: | |
| try: | |
| if os.path.exists(file_path): | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| contexts.append(f"--- {os.path.basename(file_path)} ---\n{content}") | |
| else: | |
| console.log(f"[bold yellow]File not found: {file_path}[/bold yellow]") | |
| except Exception as e: | |
| console.log(f"[bold red]Error reading file {file_path}: {e}[/bold red]") | |
| return "\n\n".join(contexts) if contexts else "" | |
| async def query_with_code_context(self, query: str, code_snippets: List[str] = None, | |
| code_files: List[str] = None) -> str: | |
| """ | |
| Specialized contextual query for code-related questions | |
| """ | |
| code_context = "CODE CONTEXT:\n" | |
| if code_snippets: | |
| for i, snippet in enumerate(code_snippets, 1): | |
| code_context += f"\nSnippet {i}:\n```\n{snippet}\n```\n" | |
| if code_files: | |
| # Read code files and include them | |
| for file_path in code_files: | |
| if file_path.endswith(('.py', '.js', '.java', '.cpp', '.c', '.html', '.css')): | |
| code_context += f"\nFile: {file_path}\n```\n" | |
| try: | |
| with open(file_path, 'r') as f: | |
| code_context += f.read() | |
| except Exception as e: | |
| code_context += f"Error reading file: {e}" | |
| code_context += "\n```\n" | |
| return await self.contextual_query(query, context_text=code_context) | |
| async def multi_context_query(self, query: str, contexts: Dict[str, Any]) -> str: | |
| """ | |
| Advanced contextual query with multiple context types | |
| Args: | |
| query: The user question | |
| contexts: Dict with various context types | |
| - 'messages': List of message dicts | |
| - 'text': Plain text context | |
| - 'files': List of file paths | |
| - 'urls': List of URLs | |
| - 'code': List of code snippets or files | |
| - 'metadata': Any additional metadata | |
| """ | |
| all_context_messages = [] | |
| # Build context from different sources | |
| if contexts.get('text'): | |
| all_context_messages.append({"role": "system", "content": f"Context: {contexts['text']}"}) | |
| if contexts.get('messages'): | |
| all_context_messages.extend(contexts['messages']) | |
| if contexts.get('files'): | |
| file_context = await self._read_files_context(contexts['files']) | |
| if file_context: | |
| all_context_messages.append({"role": "system", "content": f"File Contents:\n{file_context}"}) | |
| if contexts.get('code'): | |
| code_context = "\n".join([f"Code snippet {i}:\n```\n{code}\n```" | |
| for i, code in enumerate(contexts['code'], 1)]) | |
| all_context_messages.append({"role": "system", "content": f"Code Context:\n{code_context}"}) | |
| if contexts.get('metadata'): | |
| all_context_messages.append({"role": "system", "content": f"Metadata: {contexts['metadata']}"}) | |
| return await self.contextual_query(query, context_messages=all_context_messages) | |
| console = Console() | |
| # --- Canvas Artifact Support --- | |
| class CanvasArtifact: | |
| id: str | |
| type: str # 'code', 'diagram', 'text', 'image' | |
| content: str | |
| title: str | |
| timestamp: float | |
| metadata: Dict[str, Any] | |
| class EnhancedAIAgent: | |
| """ | |
| Wrapper around your AI_Agent that adds canvas/artifact management | |
| without modifying the original agent. | |
| """ | |
| def __init__(self, ai_agent): | |
| self.agent = ai_agent | |
| self.canvas_artifacts: Dict[str, List[CanvasArtifact]] = {} | |
| self.max_canvas_artifacts = 50 | |
| console.log("[bold green]✓ Enhanced AI Agent wrapper initialized[/bold green]") | |
| def add_artifact_to_canvas(self, conversation_id: str, content: str, | |
| artifact_type: str = "code", title: str = None): | |
| """Add artifacts to the collaborative canvas""" | |
| if conversation_id not in self.canvas_artifacts: | |
| self.canvas_artifacts[conversation_id] = [] | |
| artifact = CanvasArtifact( | |
| id=str(uuid.uuid4())[:8], | |
| type=artifact_type, | |
| content=content, | |
| title=title or f"{artifact_type}_{len(self.canvas_artifacts[conversation_id]) + 1}", | |
| timestamp=time.time(), | |
| metadata={"conversation_id": conversation_id} | |
| ) | |
| self.canvas_artifacts[conversation_id].append(artifact) | |
| # Keep only recent artifacts | |
| if len(self.canvas_artifacts[conversation_id]) > self.max_canvas_artifacts: | |
| self.canvas_artifacts[conversation_id] = self.canvas_artifacts[conversation_id][-self.max_canvas_artifacts:] | |
| console.log(f"[green]Added artifact to canvas: {artifact.title}[/green]") | |
| return artifact | |
| def get_canvas_context(self, conversation_id: str) -> str: | |
| """Get formatted canvas context for LLM prompts""" | |
| if conversation_id not in self.canvas_artifacts or not self.canvas_artifacts[conversation_id]: | |
| return "" | |
| context_lines = ["\n=== COLLABORATIVE CANVAS ARTIFACTS ==="] | |
| for artifact in self.canvas_artifacts[conversation_id][-10:]: # Last 10 artifacts | |
| context_lines.append(f"\n--- {artifact.title} [{artifact.type.upper()}] ---") | |
| preview = artifact.content[:500] + "..." if len(artifact.content) > 500 else artifact.content | |
| context_lines.append(preview) | |
| return "\n".join(context_lines) + "\n=================================\n" | |
| async def chat_with_canvas(self, message: str, conversation_id: str = "default", | |
| include_canvas: bool = True) -> str: | |
| """Enhanced chat that includes canvas context""" | |
| # Build context with canvas artifacts if requested | |
| full_message = message | |
| if include_canvas: | |
| canvas_context = self.get_canvas_context(conversation_id) | |
| if canvas_context: | |
| full_message = f"{canvas_context}\n\nUser Query: {message}" | |
| try: | |
| # Use your original agent's multi_turn_chat method | |
| response = await self.agent.multi_turn_chat(full_message) | |
| # Auto-extract and add code artifacts to canvas | |
| self._extract_artifacts_to_canvas(response, conversation_id) | |
| return response | |
| except Exception as e: | |
| error_msg = f"Error in chat_with_canvas: {str(e)}" | |
| console.log(f"[red]{error_msg}[/red]") | |
| return error_msg | |
| def _extract_artifacts_to_canvas(self, response: str, conversation_id: str): | |
| """Automatically extract code blocks and add to canvas""" | |
| # Find all code blocks with optional language specification | |
| code_blocks = re.findall(r'```(?:(\w+)\n)?(.*?)```', response, re.DOTALL) | |
| for i, (lang, code_block) in enumerate(code_blocks): | |
| if len(code_block.strip()) > 10: # Only add substantial code blocks | |
| self.add_artifact_to_canvas( | |
| conversation_id, | |
| code_block.strip(), | |
| "code", | |
| f"code_snippet_{lang or 'unknown'}_{len(self.canvas_artifacts.get(conversation_id, [])) + 1}" | |
| ) | |
| def get_canvas_summary(self, conversation_id: str) -> List[Dict]: | |
| """Get summary of canvas artifacts for display""" | |
| if conversation_id not in self.canvas_artifacts: | |
| return [] | |
| artifacts = [] | |
| for artifact in reversed(self.canvas_artifacts[conversation_id]): # Newest first | |
| artifacts.append({ | |
| "id": artifact.id, | |
| "type": artifact.type.upper(), | |
| "title": artifact.title, | |
| "preview": artifact.content[:100] + "..." if len(artifact.content) > 100 else artifact.content, | |
| "timestamp": time.strftime("%H:%M:%S", time.localtime(artifact.timestamp)) | |
| }) | |
| return artifacts | |
| def get_artifact_by_id(self, conversation_id: str, artifact_id: str) -> Optional[CanvasArtifact]: | |
| """Get specific artifact by ID""" | |
| if conversation_id not in self.canvas_artifacts: | |
| return None | |
| for artifact in self.canvas_artifacts[conversation_id]: | |
| if artifact.id == artifact_id: | |
| return artifact | |
| return None | |
| def clear_canvas(self, conversation_id: str = "default"): | |
| """Clear canvas artifacts""" | |
| if conversation_id in self.canvas_artifacts: | |
| self.canvas_artifacts[conversation_id] = [] | |
| console.log(f"[yellow]Cleared canvas: {conversation_id}[/yellow]") | |
| def get_latest_code_artifact(self, conversation_id: str) -> Optional[str]: | |
| """Get the most recent code artifact content""" | |
| if conversation_id not in self.canvas_artifacts: | |
| return None | |
| for artifact in reversed(self.canvas_artifacts[conversation_id]): | |
| if artifact.type == "code": | |
| return artifact.content | |
| return None | |
| class LcarsInterface: | |
| """LCARS-styled Gradio interface for your AI_Agent""" | |
| def __init__(self, ai_agent): | |
| """ | |
| Initialize interface with your AI_Agent instance | |
| Args: | |
| ai_agent: Instance of your AI_Agent class | |
| """ | |
| self.enhanced_agent = EnhancedAIAgent(ai_agent) | |
| self.current_conversation = "default" | |
| self.processing_lock = Lock() | |
| console.log("[bold cyan]✓ LCARS Interface initialized[/bold cyan]") | |
| def create_interface(self): | |
| """Create the full LCARS-styled interface""" | |
| # Enhanced LCARS CSS | |
| lcars_css = """ | |
| :root { | |
| --lcars-orange: #FF9900; | |
| --lcars-red: #FF0033; | |
| --lcars-blue: #6699FF; | |
| --lcars-purple: #CC99FF; | |
| --lcars-pale-blue: #99CCFF; | |
| --lcars-black: #000000; | |
| --lcars-dark-blue: #3366CC; | |
| --lcars-gray: #424242; | |
| --lcars-yellow: #FFFF66; | |
| } | |
| body { | |
| background: var(--lcars-black); | |
| color: var(--lcars-orange); | |
| font-family: 'Antonio', 'LCD', 'Courier New', monospace; | |
| } | |
| .gradio-container { | |
| background: var(--lcars-black) !important; | |
| min-height: 100vh; | |
| } | |
| .lcars-container { | |
| background: var(--lcars-black); | |
| border: 4px solid var(--lcars-orange); | |
| border-radius: 0 30px 0 0; | |
| min-height: 100vh; | |
| padding: 20px; | |
| } | |
| .lcars-header { | |
| background: linear-gradient(90deg, var(--lcars-red), var(--lcars-orange)); | |
| padding: 20px 40px; | |
| border-radius: 0 60px 0 0; | |
| margin: -20px -20px 20px -20px; | |
| border-bottom: 6px solid var(--lcars-blue); | |
| box-shadow: 0 4px 20px rgba(255, 153, 0, 0.3); | |
| } | |
| .lcars-title { | |
| font-size: 3em; | |
| font-weight: bold; | |
| color: var(--lcars-black); | |
| text-shadow: 3px 3px 6px rgba(255, 255, 255, 0.4); | |
| margin: 0; | |
| letter-spacing: 2px; | |
| } | |
| .lcars-subtitle { | |
| font-size: 1.4em; | |
| color: var(--lcars-black); | |
| margin: 10px 0 0 0; | |
| font-weight: bold; | |
| } | |
| .lcars-panel { | |
| background: linear-gradient(135deg, rgba(66, 66, 66, 0.9), rgba(40, 40, 40, 0.9)); | |
| border: 3px solid var(--lcars-orange); | |
| border-radius: 0 25px 0 25px; | |
| padding: 20px; | |
| margin-bottom: 20px; | |
| box-shadow: 0 4px 15px rgba(255, 153, 0, 0.2); | |
| } | |
| .lcars-button { | |
| background: linear-gradient(135deg, var(--lcars-orange), var(--lcars-red)) !important; | |
| color: var(--lcars-black) !important; | |
| border: none !important; | |
| border-radius: 0 20px 0 20px !important; | |
| padding: 12px 24px !important; | |
| font-family: inherit !important; | |
| font-weight: bold !important; | |
| font-size: 1.1em !important; | |
| cursor: pointer !important; | |
| transition: all 0.3s ease !important; | |
| margin: 8px !important; | |
| box-shadow: 0 4px 8px rgba(255, 153, 0, 0.3) !important; | |
| } | |
| .lcars-button:hover { | |
| background: linear-gradient(135deg, var(--lcars-red), var(--lcars-orange)) !important; | |
| transform: translateY(-2px) !important; | |
| box-shadow: 0 6px 12px rgba(255, 153, 0, 0.4) !important; | |
| } | |
| .lcars-input { | |
| background: var(--lcars-black) !important; | |
| color: var(--lcars-orange) !important; | |
| border: 2px solid var(--lcars-blue) !important; | |
| border-radius: 0 15px 0 15px !important; | |
| padding: 12px !important; | |
| font-family: inherit !important; | |
| font-size: 1.1em !important; | |
| } | |
| .lcars-chatbot { | |
| background: var(--lcars-black) !important; | |
| border: 3px solid var(--lcars-purple) !important; | |
| border-radius: 0 20px 0 20px !important; | |
| min-height: 400px; | |
| max-height: 500px; | |
| } | |
| .lcars-code-editor { | |
| background: var(--lcars-black) !important; | |
| color: var(--lcars-pale-blue) !important; | |
| border: 3px solid var(--lcars-blue) !important; | |
| border-radius: 0 20px 0 20px !important; | |
| font-family: 'Fira Code', 'Courier New', monospace !important; | |
| font-size: 1em !important; | |
| } | |
| .status-indicator { | |
| display: inline-block; | |
| width: 16px; | |
| height: 16px; | |
| border-radius: 50%; | |
| background: var(--lcars-red); | |
| margin-right: 12px; | |
| box-shadow: 0 0 10px currentColor; | |
| } | |
| .status-online { | |
| background: var(--lcars-blue); | |
| animation: pulse 1.5s infinite; | |
| } | |
| @keyframes pulse { | |
| 0% { transform: scale(1); opacity: 1; } | |
| 50% { transform: scale(1.1); opacity: 0.7; } | |
| 100% { transform: scale(1); opacity: 1; } | |
| } | |
| .panel-title { | |
| color: var(--lcars-yellow) !important; | |
| font-size: 1.4em !important; | |
| font-weight: bold !important; | |
| margin-bottom: 15px !important; | |
| border-bottom: 2px solid var(--lcars-orange); | |
| padding-bottom: 8px; | |
| } | |
| """ | |
| with gr.Blocks(css=lcars_css, theme=gr.themes.Default(), title="LCARS Terminal") as interface: | |
| with gr.Column(elem_classes="lcars-container"): | |
| # Header Section | |
| with gr.Row(elem_classes="lcars-header"): | |
| gr.Markdown(""" | |
| <div style="text-align: center; width: 100%;"> | |
| <div class="lcars-title">🚀 LCARS AI TERMINAL</div> | |
| <div class="lcars-subtitle">ADVANCED AI DEVELOPMENT CONSOLE</div> | |
| <div style="margin-top: 10px;"> | |
| <span class="status-indicator status-online"></span> | |
| <span style="color: var(--lcars-black); font-weight: bold;">SYSTEM ONLINE</span> | |
| </div> | |
| </div> | |
| """) | |
| # Main Content Area | |
| with gr.Row(): | |
| # Left Sidebar - Canvas Artifacts | |
| with gr.Column(scale=1, min_width=400): | |
| with gr.Column(elem_classes="lcars-panel"): | |
| gr.Markdown("### 🎨 CANVAS ARTIFACTS", elem_classes="panel-title") | |
| artifact_display = gr.JSON( | |
| label="", | |
| elem_id="artifact-display" | |
| ) | |
| with gr.Row(): | |
| refresh_artifacts_btn = gr.Button("🔄 Refresh", elem_classes="lcars-button") | |
| clear_canvas_btn = gr.Button("🗑️ Clear Canvas", elem_classes="lcars-button") | |
| load_latest_btn = gr.Button("📥 Load Latest", elem_classes="lcars-button") | |
| # Main Content - Chat and Code Canvas | |
| with gr.Column(scale=2): | |
| # Collaborative Code Canvas | |
| with gr.Accordion("💻 COLLABORATIVE CODE CANVAS", open=True): | |
| code_editor = gr.Code( | |
| value="# Welcome to LCARS Collaborative Canvas\n# Your code artifacts will appear here\n\nprint('Hello, Starfleet!')", | |
| language="python", | |
| lines=20, | |
| label="", | |
| elem_classes="lcars-code-editor" | |
| ) | |
| with gr.Row(): | |
| discuss_code_btn = gr.Button("💬 Discuss This Code", elem_classes="lcars-button") | |
| analyze_code_btn = gr.Button("🔍 Analyze", elem_classes="lcars-button") | |
| optimize_code_btn = gr.Button("⚡ Optimize", elem_classes="lcars-button") | |
| document_code_btn = gr.Button("📚 Document", elem_classes="lcars-button") | |
| # Chat Interface | |
| with gr.Column(elem_classes="lcars-panel"): | |
| gr.Markdown("### 💬 MISSION LOG", elem_classes="panel-title") | |
| chatbot = gr.Chatbot( | |
| label="", | |
| elem_classes="lcars-chatbot", | |
| show_label=False, | |
| height=400 | |
| ) | |
| with gr.Row(): | |
| message_input = gr.Textbox( | |
| placeholder="Enter your command or query...", | |
| show_label=False, | |
| lines=2, | |
| elem_classes="lcars-input", | |
| scale=4 | |
| ) | |
| send_btn = gr.Button("🚀 TRANSMIT", elem_classes="lcars-button", scale=1) | |
| # Status and Controls | |
| with gr.Row(): | |
| status_display = gr.Textbox( | |
| value=f"LCARS terminal operational. Model: {self.enhanced_agent.agent.model_id}", | |
| label="Status", | |
| max_lines=2, | |
| elem_classes="lcars-input" | |
| ) | |
| with gr.Column(scale=0): | |
| clear_chat_btn = gr.Button("🗑️ Clear Chat", elem_classes="lcars-button") | |
| new_session_btn = gr.Button("🆕 New Session", elem_classes="lcars-button") | |
| # === EVENT HANDLERS === | |
| def get_artifacts(): | |
| """Get current canvas artifacts""" | |
| return self.enhanced_agent.get_canvas_summary(self.current_conversation) | |
| def clear_canvas(): | |
| """Clear the canvas""" | |
| self.enhanced_agent.clear_canvas(self.current_conversation) | |
| return [], "✅ Canvas cleared" | |
| def load_latest_artifact_to_canvas(): | |
| """Load the most recent code artifact to the canvas""" | |
| latest_code = self.enhanced_agent.get_latest_code_artifact(self.current_conversation) | |
| if latest_code: | |
| return latest_code, "✅ Latest artifact loaded" | |
| return "# No code artifacts available", "⚠️ No artifacts found" | |
| async def process_message(message, history): | |
| """Process a chat message""" | |
| if not message.strip(): | |
| return "", history, "Please enter a message" | |
| # Add user message to history | |
| history = history + [[message, None]] | |
| try: | |
| # Get AI response using the enhanced agent | |
| response = await self.enhanced_agent.chat_with_canvas( | |
| message, | |
| self.current_conversation, | |
| include_canvas=True | |
| ) | |
| # Update history with response | |
| history[-1][1] = response | |
| # Get updated artifacts | |
| artifacts = get_artifacts() | |
| status = f"✅ Response received. Canvas artifacts: {len(artifacts)}" | |
| return "", history, status, artifacts | |
| except Exception as e: | |
| error_msg = f"❌ Error: {str(e)}" | |
| history[-1][1] = error_msg | |
| return "", history, error_msg, get_artifacts() | |
| def create_code_query(code, query_template): | |
| """Create a query about code""" | |
| if not code.strip(): | |
| return "Please provide some code first" | |
| return query_template.format(code=code) | |
| def discuss_code(code): | |
| return create_code_query(code, "Please analyze this code:\n```python\n{code}\n```") | |
| def analyze_code(code): | |
| return create_code_query(code, "Perform a comprehensive analysis of this code:\n```python\n{code}\n```") | |
| def optimize_code(code): | |
| return create_code_query(code, "Optimize this code for performance and best practices:\n```python\n{code}\n```") | |
| def document_code(code): | |
| return create_code_query(code, "Generate comprehensive documentation for this code:\n```python\n{code}\n```") | |
| def clear_chat(): | |
| """Clear chat history""" | |
| self.enhanced_agent.agent.clear_history() | |
| return [], "✅ Chat cleared" | |
| def new_session(): | |
| """Start new session""" | |
| self.enhanced_agent.agent.clear_history() | |
| self.enhanced_agent.clear_canvas(self.current_conversation) | |
| return [], "# New collaborative session started\n\nprint('Ready for development!')", "🆕 New session started", [] | |
| # Connect event handlers | |
| send_btn.click( | |
| process_message, | |
| inputs=[message_input, chatbot], | |
| outputs=[message_input, chatbot, status_display, artifact_display] | |
| ) | |
| message_input.submit( | |
| process_message, | |
| inputs=[message_input, chatbot], | |
| outputs=[message_input, chatbot, status_display, artifact_display] | |
| ) | |
| discuss_code_btn.click( | |
| discuss_code, | |
| inputs=code_editor, | |
| outputs=message_input | |
| ) | |
| analyze_code_btn.click( | |
| analyze_code, | |
| inputs=code_editor, | |
| outputs=message_input | |
| ) | |
| optimize_code_btn.click( | |
| optimize_code, | |
| inputs=code_editor, | |
| outputs=message_input | |
| ) | |
| document_code_btn.click( | |
| document_code, | |
| inputs=code_editor, | |
| outputs=message_input | |
| ) | |
| refresh_artifacts_btn.click( | |
| get_artifacts, | |
| outputs=artifact_display | |
| ) | |
| clear_canvas_btn.click( | |
| clear_canvas, | |
| outputs=[artifact_display, status_display] | |
| ) | |
| load_latest_btn.click( | |
| load_latest_artifact_to_canvas, | |
| outputs=[code_editor, status_display] | |
| ) | |
| clear_chat_btn.click( | |
| clear_chat, | |
| outputs=[chatbot, status_display] | |
| ) | |
| new_session_btn.click( | |
| new_session, | |
| outputs=[chatbot, code_editor, status_display, artifact_display] | |
| ) | |
| # Initialize artifacts on load | |
| interface.load(get_artifacts, outputs=artifact_display) | |
| return interface | |
| # --- Example Usage --- | |
| if __name__ == "__main__": | |
| """ | |
| Example of how to use this interface with your AI_Agent | |
| Uncomment and modify based on your actual import paths: | |
| """ | |
| # Create your agent instance | |
| my_agent = AI_Agent( | |
| model_id="leroydyer/qwen/qwen3-0.6b-q4_k_m.gguf", | |
| system_prompt="You are a helpful AI development assistant." | |
| ) | |
| # Create and launch the interface | |
| interface = LcarsInterface(my_agent) | |
| demo = interface.create_interface() | |
| demo.launch(share=False, show_error=True) | |
| console.log("[bold yellow]⚠️ Please uncomment and configure the main block with your AI_Agent[/bold yellow]") | |
| console.log("[bold cyan]Example:[/bold cyan]") | |
| console.log(" from your_module import AI_Agent") | |
| console.log(" my_agent = AI_Agent(model_id='your-model', system_prompt='...')") | |
| console.log(" interface = LcarsInterface(my_agent)") | |
| console.log(" demo = interface.create_interface()") | |
| console.log(" demo.launch()") | |