Spaces:
Sleeping
Sleeping
| # File: enhanced_gradio_interface.py | |
| import asyncio | |
| from collections import defaultdict | |
| import json | |
| import os | |
| import re | |
| import time | |
| import uuid | |
| from typing import List, Dict, Any, Optional, Callable | |
| from dataclasses import dataclass | |
| from threading import Lock, Event, Thread | |
| import queue | |
| import traceback | |
| from queue import Queue, Empty | |
| from concurrent.futures import ThreadPoolExecutor | |
| import gradio as gr | |
| from openai import AsyncOpenAI, OpenAI | |
| import pyttsx3 | |
| from rich.console import Console | |
# --- Configuration ---
# Endpoint and placeholder key shared by both OpenAI-compatible clients.
BASE_URL = "http://localhost:1234/v1"
BASE_API_KEY = "not-needed"

# Synchronous client: used by the agent's blocking LLM calls.
CLIENT = OpenAI(base_url=BASE_URL, api_key=BASE_API_KEY)
# Asynchronous client: used by the async generation helper.
BASE_CLIENT = AsyncOpenAI(base_url=BASE_URL, api_key=BASE_API_KEY)

BASEMODEL_ID = "leroydyer/qwen/qwen3-0.6b-q4_k_m.gguf"
console = Console()

# HuggingFace Spaces configuration (if needed)
HF_INFERENCE_URL = "https://api-inference.huggingface.co/models/"
HF_API_KEY = os.environ.get("HF_API_KEY", "")

# Model choices offered for UI reference; the clients above are configured
# separately. Note the first entry maps to a URL rather than a model ID.
MODEL_OPTIONS = {
    "Local LM Studio": BASE_URL,
    "Codellama 7B": "codellama/CodeLlama-7b-hf",
    "Mistral 7B": "mistralai/Mistral-7B-v0.1",
    "Llama 2 7B": "meta-llama/Llama-2-7b-chat-hf",
    "Falcon 7B": "tiiuae/falcon-7b-instruct",
}

DEFAULT_TEMPERATURE = 0.7
DEFAULT_MAX_TOKENS = 5000
| # --- Canvas Artifact Support --- | |
@dataclass
class CanvasArtifact:
    """A single item placed on a conversation's canvas.

    Declared as a dataclass so that it can be built with keyword arguments
    and so that ``__post_init__`` runs after construction.
    """
    id: str
    type: str  # 'code', 'diagram', 'text', 'image'
    content: str
    title: str
    timestamp: float
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Give every instance its own dict instead of a shared default.
        if self.metadata is None:
            self.metadata = {}
@dataclass
class LLMMessage:
    """One chat message plus bookkeeping fields.

    Declared as a dataclass so keyword construction works and
    ``__post_init__`` can fill in the id, timestamp and metadata.
    """
    role: str
    content: str
    message_id: Optional[str] = None
    conversation_id: Optional[str] = None
    timestamp: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Fill in whatever the caller left unset.
        if self.message_id is None:
            self.message_id = str(uuid.uuid4())
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.metadata is None:
            self.metadata = {}
@dataclass
class LLMRequest:
    """A queued message together with how its response should be delivered.

    Declared as a dataclass so keyword construction works and
    ``__post_init__`` can derive the default response event name.
    """
    message: "LLMMessage"
    response_event: Optional[str] = None
    callback: Optional[Callable] = None

    def __post_init__(self):
        # Default event name is derived from the message id.
        if self.response_event is None:
            self.response_event = f"llm_response_{self.message.message_id}"
@dataclass
class LLMResponse:
    """Result of processing one request.

    Declared as a dataclass so it can be built with keyword arguments.
    """
    message: "LLMMessage"
    request_id: str
    success: bool = True
    error: Optional[str] = None
| # --- Event Manager --- | |
class EventManager:
    """Registry mapping event names to handler callables."""

    def __init__(self):
        self._handlers = defaultdict(list)
        self._lock = Lock()

    def register(self, event: str, handler: Callable):
        """Add ``handler`` to the list of callables run for ``event``."""
        with self._lock:
            self._handlers[event].append(handler)

    def unregister(self, event: str, handler: Callable):
        """Remove ``handler`` from ``event``; unknown pairs are ignored."""
        with self._lock:
            registered = self._handlers.get(event)
            if registered and handler in registered:
                registered.remove(handler)
                # Drop the key once its last handler is gone.
                if not registered:
                    del self._handlers[event]

    def raise_event(self, event: str, data: Any):
        """Call every handler registered for ``event`` with ``data``.

        A handler that raises is logged and does not stop the others.
        """
        with self._lock:
            # .get() avoids the defaultdict creating an empty entry for every
            # event name that is raised but has no handlers.
            handlers = list(self._handlers.get(event, ()))
        # Handlers run outside the lock, on a snapshot of the list.
        for handler in handlers:
            try:
                handler(data)
            except Exception as e:
                console.log(f"Error in event handler for {event}: {e}", style="bold red")
# Module-wide event manager shared by the helper functions below.
EVENT_MANAGER = EventManager()


def RegisterEvent(event: str, handler: Callable):
    """Subscribe ``handler`` to ``event`` on the shared manager."""
    EVENT_MANAGER.register(event, handler)


def RaiseEvent(event: str, data: Any):
    """Dispatch ``data`` to the handlers of ``event`` on the shared manager."""
    EVENT_MANAGER.raise_event(event, data)


def UnregisterEvent(event: str, handler: Callable):
    """Remove ``handler`` from ``event`` on the shared manager."""
    EVENT_MANAGER.unregister(event, handler)
| class LLMAgent: | |
| """Main Agent Driver ! | |
| Agent For Multiple messages at once , | |
| has a message queing service as well as a generator method for easy integration with console | |
| applications as well as ui !""" | |
def __init__(
    self,
    model_id: str = BASEMODEL_ID,
    system_prompt: str = None,
    max_queue_size: int = 1000,
    max_retries: int = 3,
    timeout: int = 30000,
    max_tokens: int = 5000,
    temperature: float = 0.3,
    base_url: str = BASE_URL,
    api_key: str = BASE_API_KEY,
    generate_fn: Callable[[List[Dict[str, str]]], str] = None
):
    """Set up agent state, register the response handler, try to enable
    text-to-speech, and start the queue-processing thread.

    ``generate_fn`` is called synchronously with the message list; when it
    is not given, the agent's own sync generator is used.
    """
    # Model and sampling settings
    self.model_id = model_id
    self.system_prompt = system_prompt if system_prompt else "You are a helpful AI assistant."
    self.max_tokens = max_tokens
    self.temperature = temperature
    self.base_url = base_url
    self.api_key = api_key
    self.async_client = BASE_CLIENT
    self._generate = generate_fn if generate_fn else self._default_generate_sync

    # Queue / worker-thread state
    self.request_queue = Queue(maxsize=max_queue_size)
    self.max_retries = max_retries
    self.timeout = timeout
    self.is_running = False
    self._stop_event = Event()
    self.processing_thread = None

    # Per-conversation message history
    self.conversations: Dict[str, List[LLMMessage]] = {}
    self.max_history_length = 20

    # Requests that have been queued but not yet answered
    self.pending_requests: Dict[str, LLMRequest] = {}
    self.pending_requests_lock = Lock()

    # Canvas artifacts, keyed by conversation id
    self.canvas_artifacts: Dict[str, List[CanvasArtifact]] = defaultdict(list)

    self._register_event_handlers()

    # Text-to-speech is optional: any failure just disables it.
    try:
        self.tts_engine = pyttsx3.init()
        self.setup_tts()
    except Exception as e:
        console.log(f"[yellow]TTS not available: {e}[/yellow]")
        self.speech_enabled = False
    else:
        self.speech_enabled = True

    console.log("[bold green]π Enhanced LLM Agent Initialized[/bold green]")
    # The worker thread is started right away.
    self.start()
def setup_tts(self):
    """Apply voice, rate and volume settings to the TTS engine, if present."""
    if not hasattr(self, 'tts_engine'):
        return
    engine = self.tts_engine
    available_voices = engine.getProperty('voices')
    if available_voices:
        # Use the first voice the engine reports.
        engine.setProperty('voice', available_voices[0].id)
    engine.setProperty('rate', 150)
    engine.setProperty('volume', 0.8)
def speak(self, text: str):
    """Speak ``text`` on a background daemon thread; no-op when speech is off."""
    if not getattr(self, 'speech_enabled', False):
        return

    def _speak():
        try:
            # Strip fenced code blocks, then inline code spans.
            without_blocks = re.sub(r'```.*?```', '', text, flags=re.DOTALL)
            spoken = re.sub(r'`.*?`', '', without_blocks).strip()
            # Fall back to the raw text when nothing is left after cleaning.
            self.tts_engine.say(spoken if spoken else text)
            self.tts_engine.runAndWait()
        except Exception as e:
            console.log(f"[red]TTS Error: {e}[/red]")

    Thread(target=_speak, daemon=True).start()
| def _default_generate_sync(self, messages: List[Dict[str, str]]) -> str: | |
| """Default sync generate function if none provided""" | |
| return self._call_llm_sync(messages) | |
def _register_event_handlers(self):
    """Subscribe this agent's router to the internal response event."""
    RegisterEvent(
        "llm_internal_response",
        self._handle_internal_response,
    )
def _handle_internal_response(self, response: LLMResponse):
    """Match a response to its pending request, then notify the requester.

    The pending entry is removed, the request's response event is raised,
    and its callback (if any) is invoked. Responses with no pending entry
    are logged and dropped.
    """
    request_id = response.request_id
    console.log(f"[bold cyan]Handling internal response for: {request_id}[/bold cyan]")

    with self.pending_requests_lock:
        if request_id not in self.pending_requests:
            console.log(f"No pending request found for: {request_id}", style="yellow")
            return
        request = self.pending_requests.pop(request_id)
        console.log(f"Found pending request for: {request_id}")

    # Event subscribers are notified first.
    event_name = request.response_event
    if event_name:
        console.log(f"[bold green]Raising event: {event_name}[/bold green]")
        RaiseEvent(event_name, response)

    # Then the direct callback; its errors are logged, not propagated.
    on_response = request.callback
    if on_response:
        try:
            console.log(f"[bold yellow]Calling callback for: {request_id}[/bold yellow]")
            on_response(response)
        except Exception as e:
            console.log(f"Error in callback: {e}", style="bold red")
| def _add_to_conversation_history(self, conversation_id: str, message: LLMMessage): | |
| """Add message to conversation history""" | |
| if conversation_id not in self.conversations: | |
| self.conversations[conversation_id] = [] | |
| self.conversations[conversation_id].append(message) | |
| # Trim history if too long | |
| if len(self.conversations[conversation_id]) > self.max_history_length * 2: | |
| self.conversations[conversation_id] = self.conversations[conversation_id][-(self.max_history_length * 2):] | |
| def _build_messages_from_conversation(self, conversation_id: str, new_message: LLMMessage) -> List[Dict[str, str]]: | |
| """Build message list from conversation history""" | |
| messages = [] | |
| # Add system prompt | |
| if self.system_prompt: | |
| messages.append({"role": "system", "content": self.system_prompt}) | |
| # Add conversation history | |
| if conversation_id in self.conversations: | |
| for msg in self.conversations[conversation_id][-self.max_history_length:]: | |
| messages.append({"role": msg.role, "content": msg.content}) | |
| # Add the new message | |
| messages.append({"role": new_message.role, "content": new_message.content}) | |
| return messages | |
def _process_llm_request(self, request: LLMRequest):
    """Run one request through the generator and publish the outcome.

    On success the user message and the assistant reply are added to the
    conversation history and a successful response is raised on
    "llm_internal_response". On any exception an error response is raised
    on the same event instead.
    """
    incoming = request.message
    console.log(f"[bold green]Processing LLM request: {incoming.message_id}[/bold green]")
    try:
        # Requests without a conversation id share the "default" history.
        history_key = incoming.conversation_id or "default"
        messages = self._build_messages_from_conversation(history_key, incoming)
        console.log(f"Calling LLM with {len(messages)} messages")

        response_content = self._generate(messages)
        console.log(f"[bold green]LLM response received: {response_content[:50]}...[/bold green]")

        reply = LLMMessage(
            role="assistant",
            content=response_content,
            conversation_id=incoming.conversation_id,
            metadata={"request_id": incoming.message_id},
        )

        # Record the exchange: user message first, then the reply.
        for entry in (incoming, reply):
            self._add_to_conversation_history(history_key, entry)

        outcome = LLMResponse(
            message=reply,
            request_id=incoming.message_id,
            success=True,
        )
        console.log(f"[bold blue]Sending internal response for: {incoming.message_id}[/bold blue]")
        RaiseEvent("llm_internal_response", outcome)
    except Exception as e:
        console.log(f"[bold red]Error processing LLM request: {e}[/bold red]")
        traceback.print_exc()
        failure_message = LLMMessage(
            role="system",
            content=f"Error: {str(e)}",
            conversation_id=request.message.conversation_id,
        )
        failure = LLMResponse(
            message=failure_message,
            request_id=request.message.message_id,
            success=False,
            error=str(e),
        )
        RaiseEvent("llm_internal_response", failure)
def _call_llm_sync(self, messages: List[Dict[str, str]]) -> str:
    """Call the chat-completions endpoint synchronously, retrying on failure.

    Args:
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.

    Returns:
        The text of the first choice; an empty string when the message
        content is ``None``.

    Raises:
        Exception: whatever the last attempt raised, once all attempts fail.
    """
    console.log(f"Making LLM call to {self.model_id}")
    # Always try at least once, so a non-positive max_retries cannot make
    # this method fall through and return None.
    attempts = max(1, self.max_retries)
    for attempt in range(attempts):
        try:
            response = CLIENT.chat.completions.create(
                model=self.model_id,
                messages=messages,
                temperature=self.temperature,
                max_tokens=self.max_tokens
            )
            # Content can be None; normalise it so len() below does not
            # raise and trigger a pointless retry.
            content = response.choices[0].message.content or ""
            console.log(f"LLM call successful, response length: {len(content)}")
            return content
        except Exception as e:
            console.log(f"LLM call attempt {attempt + 1} failed: {e}")
            if attempt == attempts - 1:
                raise  # bare raise keeps the original traceback
            time.sleep(1)  # Wait before retry
def _process_queue(self):
    """Worker loop: take requests off the queue until the stop event is set.

    Each dequeued item is marked done exactly once, even when processing
    raises, so ``Queue.join()`` cannot hang on a failed request.
    """
    console.log("[bold cyan]LLM Agent queue processor started[/bold cyan]")
    while not self._stop_event.is_set():
        try:
            # Short timeout so the stop event is re-checked about once a second.
            request = self.request_queue.get(timeout=1.0)
        except Empty:
            continue
        try:
            if request:
                console.log(f"Got request from queue: {request.message.message_id}")
                self._process_llm_request(request)
        except Exception as e:
            console.log(f"Error in queue processing: {e}", style="bold red")
            traceback.print_exc()
        finally:
            # Pair every successful get() with a task_done().
            self.request_queue.task_done()
    console.log("[bold cyan]LLM Agent queue processor stopped[/bold cyan]")
def send_message(
    self,
    content: str,
    role: str = "user",
    conversation_id: str = None,
    response_event: str = None,
    callback: Callable = None,
    metadata: Dict = None
) -> str:
    """Queue a message for the LLM and return its message id.

    The response is delivered later through ``response_event`` and/or
    ``callback``.

    Raises:
        RuntimeError: if the agent is not running, or the queue stays full
            for 5 seconds.
    """
    if not self.is_running:
        raise RuntimeError("LLM Agent is not running. Call start() first.")

    request = LLMRequest(
        message=LLMMessage(
            role=role,
            content=content,
            conversation_id=conversation_id,
            metadata=metadata or {},
        ),
        response_event=response_event,
        callback=callback,
    )
    message_id = request.message.message_id

    # Register as pending before queueing, so the response can be routed.
    with self.pending_requests_lock:
        self.pending_requests[message_id] = request
    console.log(f"Added to pending requests: {message_id}")

    try:
        self.request_queue.put(request, timeout=5.0)
    except queue.Full:
        console.log("[bold red]Queue full, cannot send message[/bold red]")
        # Undo the pending registration for the message that was not queued.
        with self.pending_requests_lock:
            self.pending_requests.pop(message_id, None)
        raise RuntimeError("LLM Agent queue is full")

    console.log(f"[bold magenta]Message queued: {message_id}, Content: {content[:50]}...[/bold magenta]")
    return message_id
async def chat(self, messages: List[Dict[str, str]]) -> str:
    """Send the latest user message through the queue and await the reply.

    Only the content of the most recent ``"user"`` entry in ``messages`` is
    forwarded; it is queued on the "default" conversation.

    Args:
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.

    Returns:
        The response text, an empty string when there is no non-blank user
        message, or an error/timeout description string.
    """
    # get_running_loop() is the supported way to get the loop in a coroutine.
    loop = asyncio.get_running_loop()
    response_future = loop.create_future()

    def _resolve(result: str):
        """Complete the future; always runs on the event-loop thread."""
        # Checking done() here rather than in the callback's thread means a
        # future already cancelled by the timeout is never set.
        if response_future.done():
            console.log(f"[bold red]Future already done, ignoring callback[/bold red]")
        else:
            response_future.set_result(result)

    def chat_callback(response: LLMResponse):
        """Invoked by the agent when the LLM responds; hands the result to the loop."""
        console.log(f"[bold yellow]β CHAT CALLBACK TRIGGERED![/bold yellow]")
        if response.success:
            content = response.message.content
            console.log(f"Callback received content: {content[:50]}...")
            result = content
        else:
            console.log(f"Error in response: {response.error}")
            result = f"β Error: {response.error}"
        loop.call_soon_threadsafe(_resolve, result)

    console.log(f"Sending message to LLM agent...")

    # Take the LAST user message: callers may put earlier history in front
    # of the current turn, and the first user entry would be a stale one.
    user_message = next(
        (msg.get("content") or "" for msg in reversed(messages) if msg.get("role") == "user"),
        "",
    )
    if not user_message.strip():
        return ""

    try:
        message_id = self.send_message(
            content=user_message,
            conversation_id="default",
            callback=chat_callback
        )
        console.log(f"Message sent with ID: {message_id}, waiting for response...")
        try:
            # NOTE(review): self.timeout is passed as seconds here while the
            # constructor default is 30000 — confirm the intended unit.
            response = await asyncio.wait_for(response_future, timeout=self.timeout)
            console.log(f"[bold green]β Chat complete! Response length: {len(response)}[/bold green]")
            return response
        except asyncio.TimeoutError:
            console.log("[bold red]Response timeout[/bold red]")
            # Drop the pending entry so a late response is ignored.
            with self.pending_requests_lock:
                self.pending_requests.pop(message_id, None)
            return "β Response timeout - check if LLM server is running"
    except Exception as e:
        console.log(f"[bold red]Error sending message: {e}[/bold red]")
        traceback.print_exc()
        return f"β Error sending message: {e}"
def start(self):
    """Start the queue-processing thread unless the agent is already running."""
    if self.is_running:
        return
    self.is_running = True
    self._stop_event.clear()
    worker = Thread(target=self._process_queue, daemon=True)
    self.processing_thread = worker
    worker.start()
    console.log("[bold green]LLM Agent started[/bold green]")
def stop(self):
    """Signal the worker to stop, wait up to 10 seconds for it, and mark
    the agent as not running."""
    console.log("Stopping LLM Agent...")
    self._stop_event.set()
    worker = self.processing_thread
    if worker and worker.is_alive():
        worker.join(timeout=10)
    self.is_running = False
    console.log("LLM Agent stopped")
def get_conversation_history(self, conversation_id: str = "default") -> List[LLMMessage]:
    """Return a shallow copy of the stored messages for a conversation."""
    stored = self.conversations.get(conversation_id, [])
    return list(stored)
def clear_conversation(self, conversation_id: str = "default"):
    """Forget the stored history of a conversation, if there is any."""
    self.conversations.pop(conversation_id, None)
| # --- Canvas Methods --- | |
def add_artifact(self, conversation_id: str, artifact_type: str, content: str, title: str = "", metadata: Dict = None):
    """Create a new artifact with a fresh id and the current time, and
    append it to the conversation's canvas."""
    new_artifact = CanvasArtifact(
        id=str(uuid.uuid4()),
        type=artifact_type,
        content=content,
        title=title,
        timestamp=time.time(),
        metadata=metadata if metadata else {},
    )
    canvas = self.canvas_artifacts[conversation_id]
    canvas.append(new_artifact)
def get_canvas_artifacts(self, conversation_id: str = "default") -> List[CanvasArtifact]:
    """Return the stored artifact list for a conversation, or an empty list."""
    store = self.canvas_artifacts
    if conversation_id in store:
        return store[conversation_id]
    # Unknown ids get a throwaway list; nothing is added to the store.
    return []
def get_canvas_summary(self, conversation_id: str = "default") -> List[Dict[str, Any]]:
    """List id, type, title and timestamp for each artifact of a conversation."""
    summary = []
    for artifact in self.get_canvas_artifacts(conversation_id):
        summary.append({
            "id": artifact.id,
            "type": artifact.type,
            "title": artifact.title,
            "timestamp": artifact.timestamp,
        })
    return summary
def clear_canvas(self, conversation_id: str = "default"):
    """Replace a conversation's artifact list with an empty one, if it exists."""
    store = self.canvas_artifacts
    if conversation_id not in store:
        return
    store[conversation_id] = []
async def chat_with_canvas(self, user_message: str, conversation_id: str = "default", include_canvas: bool = False) -> str:
    """Chat, optionally putting the conversation's canvas content in the prompt.

    Args:
        user_message: The user's text.
        conversation_id: Conversation whose canvas artifacts are read.
        include_canvas: When true, and the canvas is not empty, the canvas
            content is placed in front of the user text.

    Returns:
        Whatever ``chat`` returns for the assembled message.
    """
    # NOTE(review): conversation_id only selects the canvas; it is not passed
    # on to chat(), whose signature has no such parameter.
    content = user_message
    # A blank message is left alone so chat() still treats it as empty.
    if include_canvas and user_message.strip():
        canvas_artifacts = self.get_canvas_artifacts(conversation_id)
        if canvas_artifacts:
            parts = ["\n\n--- CANVAS CONTENT ---\n"]
            parts.extend(
                f"\n[{artifact.type}] {artifact.title or 'Untitled'}:\n{artifact.content}\n"
                for artifact in canvas_artifacts
            )
            parts.append("\n--- END CANVAS CONTENT ---\n")
            # chat() forwards only the user message content, so a separate
            # system message would be dropped; embed the canvas here instead.
            content = "".join(parts) + "\n" + user_message
    return await self.chat([{"role": "user", "content": content}])
@staticmethod
async def openai_generate(messages: List[Dict[str, str]], max_tokens: int = 8096, temperature: float = 0.4, model: str = BASEMODEL_ID, tools=None) -> str:
    """Generate a completion with the shared async client.

    Declared as a static method because it takes no ``self``; without the
    decorator, a call on an instance would bind the instance to ``messages``.

    Args:
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
        max_tokens: Completion token limit.
        temperature: Sampling temperature.
        model: Model identifier sent to the server.
        tools: Optional tool definitions; left out of the request when empty.

    Returns:
        The response text ("" when content is None), or a bracketed error
        string if the request fails. Exceptions are logged, not raised.
    """
    request_kwargs = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    # Only send `tools` when the caller supplied some.
    # NOTE(review): an explicit tools=None is presumably sent as a null
    # field, which some servers may reject — verify against the SDK.
    if tools:
        request_kwargs["tools"] = tools
    try:
        resp = await BASE_CLIENT.chat.completions.create(**request_kwargs)
        return resp.choices[0].message.content or ""
    except Exception as e:
        console.log(f"[bold red]Error in openai_generate: {e}[/bold red]")
        return f"[LLM_Agent Error - openai_generate: {str(e)}]"
def get_queue_size(self) -> int:
    """Report the request queue's current ``qsize()``."""
    waiting = self.request_queue.qsize()
    return waiting
def get_pending_requests_count(self) -> int:
    """Count the pending requests while holding the pending-requests lock."""
    with self.pending_requests_lock:
        pending_total = len(self.pending_requests)
    return pending_total
def get_status(self) -> Dict[str, Any]:
    """Snapshot of running flag, queue size, pending count, conversation
    count and model id."""
    status: Dict[str, Any] = {}
    status["is_running"] = self.is_running
    status["queue_size"] = self.get_queue_size()
    status["pending_requests"] = self.get_pending_requests_count()
    status["conversations_count"] = len(self.conversations)
    status["model"] = self.model_id
    return status
| class AI_Agent: | |
def __init__(self, model_id: str, system_prompt: str = "You are a helpful assistant. Respond concisely in 1-2 sentences.", history: List[Dict] = None):
    """Store the settings and build the underlying ``LLMAgent`` client."""
    self.model_id = model_id
    self.system_prompt = system_prompt
    self.history = history if history else []
    self.conversation_id = f"conv_{uuid.uuid4().hex[:8]}"

    def _generate_blocking(msgs):
        # Run the async generator to completion so the client gets a
        # plain synchronous callable.
        return asyncio.run(LLMAgent.openai_generate(msgs, model=model_id))

    self.client = LLMAgent(
        model_id=model_id,
        system_prompt=self.system_prompt,
        generate_fn=_generate_blocking,
    )
    console.log(f"[bold green]β MyAgent initialized with model: {model_id}[/bold green]")
async def call_llm(self, messages: List[Dict], use_history: bool = True) -> str:
    """Send messages to the LLM client and return its reply.

    Args:
        messages: List of message dicts with 'role' and 'content'.
        use_history: Whether stored history is included in the prompt and
            updated with this exchange.

    Returns:
        The reply text, or ``"Error: ..."`` if anything raises.
    """
    try:
        console.log(f"[bold yellow]Sending {len(messages)} messages to LLM (use_history: {use_history})...[/bold yellow]")
        prepared = await self._enhance_messages(messages, use_history)
        response = await self.client.chat(prepared)
        console.log(f"[bold green]β Response received ({len(response)} chars)[/bold green]")
        # History is only recorded for history-aware calls.
        if use_history:
            self._update_history(messages, response)
    except Exception as e:
        console.log(f"[bold red]β ERROR: {e}[/bold red]")
        traceback.print_exc()
        return f"Error: {str(e)}"
    return response
| async def _enhance_messages(self, messages: List[Dict], use_history: bool) -> List[Dict]: | |
| """Enhance messages with system prompt and optional history""" | |
| enhanced = [] | |
| # Add system prompt if not already in messages | |
| has_system = any(msg.get('role') == 'system' for msg in messages) | |
| if not has_system and self.system_prompt: | |
| enhanced.append({"role": "system", "content": self.system_prompt}) | |
| # Add conversation history only if requested | |
| if use_history and self.history: | |
| enhanced.extend(self.history[-10:]) # Last 10 messages for context | |
| # Add current messages | |
| enhanced.extend(messages) | |
| return enhanced | |
| def _update_history(self, messages: List[Dict], response: str): | |
| """Update conversation history with new exchange""" | |
| # Add user messages to history | |
| for msg in messages: | |
| if msg.get('role') in ['user', 'assistant']: | |
| self.history.append(msg) | |
| # Add assistant response to history | |
| self.history.append({"role": "assistant", "content": response}) | |
| # Keep history manageable (last 20 exchanges) | |
| if len(self.history) > 40: # 20 user + 20 assistant messages | |
| self.history = self.history[-40:] | |
async def simple_query(self, query: str) -> str:
    """One-shot query: a single user message sent with history disabled."""
    one_shot = [{"role": "user", "content": query}]
    answer = await self.call_llm(one_shot, use_history=False)
    return answer
async def multi_turn_chat(self, user_input: str) -> str:
    """Send one user turn with history enabled, so context carries across calls."""
    turn = [{"role": "user", "content": user_input}]
    return await self.call_llm(turn, use_history=True)
def get_conversation_summary(self) -> Dict:
    """Summarise the history: id, message counts per role, and the last 4 entries."""
    history = self.history
    user_total = sum(1 for entry in history if entry.get('role') == 'user')
    assistant_total = sum(1 for entry in history if entry.get('role') == 'assistant')
    tail = history[-4:] if history else []
    return {
        "conversation_id": self.conversation_id,
        "total_messages": len(history),
        "user_messages": user_total,
        "assistant_messages": assistant_total,
        "recent_exchanges": tail,
    }
def clear_history(self):
    """Empty the history list in place and log that it was cleared."""
    history = self.history
    history.clear()
    console.log("[bold yellow]Conversation history cleared[/bold yellow]")
def update_system_prompt(self, new_prompt: str):
    """Replace this wrapper's stored system prompt and log the change."""
    # Only this object's attribute is assigned here.
    self.system_prompt = new_prompt
    console.log("[bold blue]System prompt updated[/bold blue]")
def stop(self):
    """Stop the underlying client, if one exists."""
    client = getattr(self, 'client', None)
    if not client:
        return
    client.stop()
    console.log("[bold yellow]MyAgent client stopped[/bold yellow]")
async def contextual_query(self, query: str, context_messages: List[Dict] = None,
                           context_text: str = None, context_files: List[str] = None) -> str:
    """Ask ``query`` with extra context, without updating the stored history.

    Args:
        query: The user question.
        context_messages: Message dicts inserted after the system prompt.
        context_text: Plain text, added as a system message.
        context_files: File paths whose contents are added as a system message.
    """
    prompt = []
    if self.system_prompt:
        prompt.append({"role": "system", "content": self.system_prompt})

    if context_messages:
        prompt += context_messages

    if context_text:
        prompt.append({"role": "system", "content": f"Additional context: {context_text}"})

    if context_files:
        file_context = await self._read_files_context(context_files)
        if file_context:
            prompt.append({"role": "system", "content": f"File contents:\n{file_context}"})

    # The question always goes last.
    prompt.append({"role": "user", "content": query})
    return await self.call_llm(prompt, use_history=False)
| async def _read_files_context(self, file_paths: List[str]) -> str: | |
| """Read multiple files and return as context string""" | |
| contexts = [] | |
| for file_path in file_paths: | |
| try: | |
| if os.path.exists(file_path): | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| contexts.append(f"--- {os.path.basename(file_path)} ---\n{content}") | |
| else: | |
| console.log(f"[bold yellow]File not found: {file_path}[/bold yellow]") | |
| except Exception as e: | |
| console.log(f"[bold red]Error reading file {file_path}: {e}[/bold red]") | |
| return "\n".join(contexts) if contexts else "" | |
async def query_with_code_context(self, query: str, code_snippets: List[str] = None,
                                  code_files: List[str] = None) -> str:
    """Ask ``query`` with code snippets and/or code files as context.

    Args:
        query: The user question.
        code_snippets: Code strings, each placed in its own fenced block.
        code_files: File paths; only files with a recognised source
            extension are read. A read error is written into the context
            in place of the file's contents.

    Returns:
        The result of ``contextual_query`` with the assembled text passed
        as ``context_text``.
    """
    supported_suffixes = ('.py', '.js', '.java', '.cpp', '.c', '.html', '.css')
    parts = ["CODE CONTEXT:\n"]

    for i, snippet in enumerate(code_snippets or [], 1):
        parts.append(f"\nSnippet {i}:\n```\n{snippet}\n```\n")

    for file_path in code_files or []:
        if not file_path.endswith(supported_suffixes):
            continue
        parts.append(f"\nFile: {file_path}\n```\n")
        try:
            # Read as UTF-8 explicitly, like _read_files_context does,
            # instead of depending on the platform default encoding.
            with open(file_path, 'r', encoding='utf-8') as f:
                parts.append(f.read())
        except Exception as e:
            parts.append(f"Error reading file: {e}")
        parts.append("\n```\n")

    return await self.contextual_query(query, context_text="".join(parts))
async def multi_context_query(self, query: str, contexts: Dict[str, Any]) -> str:
    """Ask ``query`` with context assembled from several sources.

    Keys of ``contexts`` that are read here, in this order:
        - 'text': plain text, added as a system message
        - 'messages': message dicts, added as-is
        - 'files': file paths, read via ``_read_files_context``
        - 'code': code snippets, each put in a numbered fenced block
        - 'metadata': any value, formatted into a system message
    """
    def system(content: str) -> Dict[str, str]:
        return {"role": "system", "content": content}

    gathered = []

    text = contexts.get('text')
    if text:
        gathered.append(system(f"Context: {text}"))

    extra_messages = contexts.get('messages')
    if extra_messages:
        gathered += extra_messages

    files = contexts.get('files')
    if files:
        file_context = await self._read_files_context(files)
        if file_context:
            gathered.append(system(f"File Contents:\n{file_context}"))

    snippets = contexts.get('code')
    if snippets:
        blocks = []
        for i, code in enumerate(snippets, 1):
            blocks.append(f"Code snippet {i}:\n```\n{code}\n```")
        code_context = "\n".join(blocks)
        gathered.append(system(f"Code Context:\n{code_context}"))

    metadata = contexts.get('metadata')
    if metadata:
        gathered.append(system(f"Metadata: {metadata}"))

    return await self.contextual_query(query, context_messages=gathered)
| # --- LCARS Styled Gradio Interface --- | |
| class LcarsInterface: | |
def __init__(self):
    """Create the agent that backs the interface."""
    # The agent calls its generate function synchronously and slices the
    # returned value as a string. LLMAgent.openai_generate is a coroutine
    # function, so passing it as generate_fn would hand the worker an
    # un-awaited coroutine. Use the agent's default synchronous generator,
    # which calls the configured sync client.
    self.agent = LLMAgent()
| def create_interface(self): | |
| """Create the full LCARS-styled interface""" | |
| lcars_css = """ | |
| :root { | |
| --lcars-orange: #FF9900; | |
| --lcars-red: #FF0033; | |
| --lcars-blue: #6699FF; | |
| --lcars-purple: #CC99FF; | |
| --lcars-pale-blue: #99CCFF; | |
| --lcars-black: #000000; | |
| --lcars-dark-blue: #3366CC; | |
| --lcars-gray: #424242; | |
| --lcars-yellow: #FFFF66; | |
| } | |
| body { | |
| background: var(--lcars-black); | |
| color: var(--lcars-orange); | |
| font-family: 'Antonio', 'LCD', 'Courier New', monospace; | |
| margin: 0; | |
| padding: 0; | |
| } | |
| .gradio-container { | |
| background: var(--lcars-black) !important; | |
| min-height: 100vh; | |
| } | |
| .lcars-container { | |
| background: var(--lcars-black); | |
| border: 4px solid var(--lcars-orange); | |
| border-radius: 0 30px 0 0; | |
| min-height: 100vh; | |
| padding: 20px; | |
| } | |
| .lcars-header { | |
| background: linear-gradient(90deg, var(--lcars-red), var(--lcars-orange)); | |
| padding: 20px 40px; | |
| border-radius: 0 60px 0 0; | |
| margin: -20px -20px 20px -20px; | |
| border-bottom: 6px solid var(--lcars-blue); | |
| } | |
| .lcars-title { | |
| font-size: 2.5em; | |
| font-weight: bold; | |
| color: var(--lcars-black); | |
| margin: 0; | |
| } | |
| .lcars-subtitle { | |
| font-size: 1.2em; | |
| color: var(--lcars-black); | |
| margin: 10px 0 0 0; | |
| } | |
| .lcars-panel { | |
| background: rgba(66, 66, 66, 0.9); | |
| border: 2px solid var(--lcars-orange); | |
| border-radius: 0 20px 0 20px; | |
| padding: 15px; | |
| margin-bottom: 15px; | |
| } | |
| .lcars-button { | |
| background: var(--lcars-orange); | |
| color: var(--lcars-black) !important; | |
| border: none !important; | |
| border-radius: 0 15px 0 15px !important; | |
| padding: 10px 20px !important; | |
| font-family: inherit !important; | |
| font-weight: bold !important; | |
| margin: 5px !important; | |
| } | |
| .lcars-button:hover { | |
| background: var(--lcars-red) !important; | |
| } | |
| .lcars-input { | |
| background: var(--lcars-black) !important; | |
| color: var(--lcars-orange) !important; | |
| border: 2px solid var(--lcars-blue) !important; | |
| border-radius: 0 10px 0 10px !important; | |
| padding: 10px !important; | |
| } | |
| .lcars-chatbot { | |
| background: var(--lcars-black) !important; | |
| border: 2px solid var(--lcars-purple) !important; | |
| border-radius: 0 15px 0 15px !important; | |
| } | |
| .status-indicator { | |
| display: inline-block; | |
| width: 12px; | |
| height: 12px; | |
| border-radius: 50%; | |
| background: var(--lcars-red); | |
| margin-right: 8px; | |
| } | |
| .status-online { | |
| background: var(--lcars-blue); | |
| animation: pulse 2s infinite; | |
| } | |
| @keyframes pulse { | |
| 0% { opacity: 1; } | |
| 50% { opacity: 0.5; } | |
| 100% { opacity: 1; } | |
| } | |
| """ | |
| with gr.Blocks(css=lcars_css, theme=gr.themes.Default(), title="LCARS Terminal") as interface: | |
| with gr.Column(elem_classes="lcars-container"): | |
| # Header | |
| with gr.Row(elem_classes="lcars-header"): | |
| gr.Markdown(""" | |
| <div style="text-align: center; width: 100%;"> | |
| <div class="lcars-title">π LCARS TERMINAL</div> | |
| <div class="lcars-subtitle">STARFLEET AI DEVELOPMENT CONSOLE</div> | |
| <div style="margin-top: 10px;"> | |
| <span class="status-indicator status-online"></span> | |
| <span style="color: var(--lcars-black); font-weight: bold;">SYSTEM ONLINE</span> | |
| </div> | |
| </div> | |
| """) | |
| # Main Content | |
| with gr.Row(): | |
| # Left Sidebar | |
| with gr.Column(scale=1): | |
| # Configuration Panel | |
| with gr.Column(elem_classes="lcars-panel"): | |
| gr.Markdown("### π§ CONFIGURATION") | |
| with gr.Row(): | |
| model_dropdown = gr.Dropdown( | |
| choices=list(MODEL_OPTIONS.keys())[1:], # Exclude the 'Local LM Studio' URL entry | |
| value=list(MODEL_OPTIONS.keys())[1], # Default to Codellama 7B | |
| label="AI Model", | |
| elem_classes="lcars-input" | |
| ) | |
| fetch_models_btn = gr.Button("π‘ Fetch Models", elem_classes="lcars-button") | |
| with gr.Row(): | |
| temperature = gr.Slider(0.0, 2.0, value=0.7, label="Temperature") | |
| max_tokens = gr.Slider(128, 8192, value=2000, step=128, label="Max Tokens") | |
| with gr.Row(): | |
| update_config_btn = gr.Button("πΎ Apply Config", elem_classes="lcars-button") | |
| speech_toggle = gr.Checkbox(value=True, label="π Speech Output") | |
| # Canvas Artifacts | |
| with gr.Column(elem_classes="lcars-panel"): | |
| gr.Markdown("### π¨ CANVAS ARTIFACTS") | |
| artifact_display = gr.JSON(label="Canvas Summary") | |
| with gr.Row(): | |
| refresh_artifacts_btn = gr.Button("π Refresh", elem_classes="lcars-button") | |
| clear_canvas_btn = gr.Button("ποΈ Clear Canvas", elem_classes="lcars-button") | |
| # Main Content Area | |
| with gr.Column(scale=2): | |
| # Code Canvas | |
| with gr.Accordion("π» COLLABORATIVE CODE CANVAS", open=True): | |
| code_editor = gr.Code( | |
| value="# Welcome to LCARS Collaborative Canvas\nprint('Hello, Starfleet!')", | |
| language="python", | |
| lines=15, | |
| label="" | |
| ) | |
| with gr.Row(): | |
| load_to_chat_btn = gr.Button("π¬ Discuss Code", elem_classes="lcars-button") | |
| analyze_btn = gr.Button("π Analyze", elem_classes="lcars-button") | |
| optimize_btn = gr.Button("β‘ Optimize", elem_classes="lcars-button") | |
| # Chat Interface | |
| with gr.Column(elem_classes="lcars-panel"): | |
| gr.Markdown("### π¬ MISSION LOG") | |
| chatbot = gr.Chatbot(label="", height=300, elem_classes="lcars-chatbot") | |
| with gr.Row(): | |
| message_input = gr.Textbox( | |
| placeholder="Enter your command or query...", | |
| show_label=False, | |
| lines=2, | |
| elem_classes="lcars-input" | |
| ) | |
| send_btn = gr.Button("π SEND", elem_classes="lcars-button") | |
| # Status | |
| with gr.Row(): | |
| status_display = gr.Textbox( | |
| value="LCARS terminal operational. Awaiting commands.", | |
| label="Status", | |
| max_lines=2, | |
| elem_classes="lcars-input" | |
| ) | |
| with gr.Column(scale=0): | |
| clear_chat_btn = gr.Button("ποΈ Clear Chat", elem_classes="lcars-button") | |
| new_session_btn = gr.Button("π New Session", elem_classes="lcars-button") | |
| # === EVENT HANDLERS === | |
| def update_agent_config(model_key, temp_val, max_tok_val, speech_enabled): | |
| # Map UI model key to actual model ID | |
| model_id = MODEL_OPTIONS.get(model_key, BASEMODEL_ID) | |
| # Update agent attributes | |
| self.agent.model_id = model_id | |
| self.agent.temperature = temp_val | |
| self.agent.max_tokens = max_tok_val | |
| self.agent.speech_enabled = speech_enabled | |
| # Update TTS if enabled/disabled | |
| if speech_enabled and not self.agent.speech_enabled: | |
| try: | |
| self.agent.tts_engine = pyttsx3.init() | |
| self.agent.setup_tts() | |
| self.agent.speech_enabled = True | |
| except Exception as e: | |
| console.log(f"[yellow]TTS re-enable failed: {e}[/yellow]") | |
| elif not speech_enabled and self.agent.speech_enabled: | |
| self.agent.speech_enabled = False | |
| return f"β Config updated: {model_key}, T={temp_val}, MaxTok={max_tok_val}, Speech={speech_enabled}" | |
| def get_artifacts(): | |
| return self.agent.get_canvas_summary("default") # Assuming single conversation for UI | |
| def clear_canvas(): | |
| self.agent.clear_canvas("default") | |
| return [], "β Canvas cleared" | |
| def clear_chat(): | |
| self.agent.clear_conversation("default") | |
| return [], "β Chat cleared" | |
| def new_session(): | |
| self.agent.clear_conversation("default") | |
| self.agent.clear_canvas("default") | |
| return [], "# New session started\nprint('Ready!')", "π New session started", [] | |
| async def process_message(message, history, speech_enabled): | |
| if not message.strip(): | |
| return "", history, "Please enter a message", self.agent.get_canvas_summary("default") | |
| history = history + [[message, None]] | |
| try: | |
| # For simplicity, use the basic chat method here. Canvas integration can be added if needed. | |
| response = await self.agent.chat([{"role": "user", "content": message}]) | |
| history[-1][1] = response | |
| if speech_enabled and self.agent.speech_enabled: | |
| self.agent.speak(response) | |
| artifacts = self.agent.get_canvas_summary("default") | |
| status = f"β Response received. Canvas artifacts: {len(artifacts)}" | |
| return "", history, status, artifacts | |
| except Exception as e: | |
| error_msg = f"β Error: {str(e)}" | |
| history[-1][1] = error_msg | |
| return "", history, error_msg, self.agent.get_canvas_summary("default") | |
| # Connect events | |
| update_config_btn.click( | |
| update_agent_config, | |
| inputs=[model_dropdown, temperature, max_tokens, speech_toggle], | |
| outputs=status_display | |
| ) | |
| refresh_artifacts_btn.click(get_artifacts, outputs=artifact_display) | |
| clear_canvas_btn.click(clear_canvas, outputs=[artifact_display, status_display]) | |
| clear_chat_btn.click(clear_chat, outputs=[chatbot, status_display]) | |
| new_session_btn.click(new_session, outputs=[chatbot, code_editor, status_display, artifact_display]) | |
| send_btn.click( | |
| process_message, | |
| inputs=[message_input, chatbot, speech_toggle], | |
| outputs=[message_input, chatbot, status_display, artifact_display] | |
| ) | |
| message_input.submit( | |
| process_message, | |
| inputs=[message_input, chatbot, speech_toggle], | |
| outputs=[message_input, chatbot, status_display, artifact_display] | |
| ) | |
| interface.load(get_artifacts, outputs=artifact_display) | |
| return interface | |
| # --- Main Application --- | |
def main():
    """Build the LCARS interface and serve it.

    Logs whether the process appears to run inside a HuggingFace Space
    (detected via the ``SPACE_ID`` environment variable) or locally, then
    constructs ``LcarsInterface`` and launches the resulting Gradio app.
    """
    console.log("[bold blue]π Starting LCARS Terminal...[/bold blue]")
    is_space = os.getenv('SPACE_ID') is not None
    if is_space:
        console.log("[green]π Detected HuggingFace Space[/green]")
    else:
        console.log("[blue]π» Running locally[/blue]")
    interface = LcarsInterface()
    demo = interface.create_interface()
    # No share tunnel is requested: on Spaces Gradio does not support
    # share=True (it warns and disables it), and local runs already passed
    # share=False, so the effective behavior is unchanged minus the warning.
    demo.launch()


if __name__ == "__main__":
    main()