diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,14 +1,969 @@
+import pickle
+import subprocess
+import sys
 import gradio as gr
 import os
 from openai import AsyncOpenAI
 from openai import OpenAI
 from huggingface_hub import InferenceClient
+# File: enhanced_gradio_interface.py
+import asyncio
+import json
+import queue
+import re
+import threading
+import time
+import traceback
+import uuid
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+from queue import Queue, Empty
+from threading import Lock, Event, Thread
+from typing import Coroutine, Dict, List, Any, Optional, Callable
+import pyttsx3
+from rich.console import Console
+
 api_key = ""
 client = OpenAI(
     base_url="https://Localhost/v1",
     api_key=api_key
 )
+
+BASE_URL = "http://localhost:1234/v1"
+BASE_API_KEY = "not-needed"
+BASE_CLIENT = AsyncOpenAI(
+    base_url=BASE_URL,
+    api_key=BASE_API_KEY
+)  # Global async client
+BASEMODEL_ID = "leroydyer/qwen/qwen3-0.6b-q4_k_m.gguf"  # Global state for selected model ID
+CLIENT = OpenAI(
+    base_url=BASE_URL,
+    api_key=BASE_API_KEY
+)  # Global sync client
+# --- Global Variables (if needed) ---
+console = Console()
+# --- Configuration ---
+LOCAL_BASE_URL = "http://localhost:1234/v1"
+LOCAL_API_KEY = "not-needed"
+# HuggingFace Spaces configuration
+HF_INFERENCE_URL = "https://api-inference.huggingface.co/models/"
+HF_API_KEY = os.getenv("HF_API_KEY", "")
+
+DEFAULT_TEMPERATURE = 0.7
+DEFAULT_MAX_TOKENS = 5000
+
+#############################################################
+@dataclass
+class LLMMessage:
+    role: str
+    content: str
+    message_id: str = None
+    conversation_id: str = None
+    timestamp: float = None
+    metadata: Dict[str, Any] = None
+
+    def __post_init__(self):
+        if self.message_id is None:
+            self.message_id = str(uuid.uuid4())
+        if self.timestamp is None:
+            self.timestamp = time.time()
+        if self.metadata is None:
+            self.metadata = {}
+
+@dataclass
+class LLMRequest:
+    message: LLMMessage
+    response_event: str = None
+    callback: Callable = None
+
+    def __post_init__(self):
+        if self.response_event is None:
+            self.response_event = f"llm_response_{self.message.message_id}"
+
+@dataclass
+class LLMResponse:
+    message: LLMMessage
+    request_id: str
+    success: bool = True
+    error: str = None
+
+#############################################################
+class EventManager:
+    def __init__(self):
+        self._handlers = defaultdict(list)
+        self._lock = threading.Lock()
+
+    def register(self, event: str, handler: Callable):
+        with self._lock:
+            self._handlers[event].append(handler)
+
+    def unregister(self, event: str, handler: Callable):
+        with self._lock:
+            if event in self._handlers and handler in self._handlers[event]:
+                self._handlers[event].remove(handler)
+
+    def raise_event(self, event: str, data: Any):
+        with self._lock:
+            handlers = self._handlers[event][:]
+
+        for handler in handlers:
+            try:
+                handler(data)
+            except Exception as e:
+                console.log(f"Error in event handler for {event}: {e}", style="bold red")
+
+
+EVENT_MANAGER = EventManager()
+def RegisterEvent(event: str, handler: Callable):
+    EVENT_MANAGER.register(event, handler)
+
+def RaiseEvent(event: str, data: Any):
+    EVENT_MANAGER.raise_event(event, data)
+
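+# Usage sketch for the event bus above. Illustrative only: the "ping" event
+# name and handler below are hypothetical, not part of the application.
+def _example_event_bus_usage():
+    def on_ping(data):
+        console.log(f"ping received: {data}")
+    RegisterEvent("ping", on_ping)
+    RaiseEvent("ping", {"ok": True})  # handlers are invoked synchronously
+    UnregisterEvent("ping", on_ping)
+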
+def UnregisterEvent(event: str, handler: Callable):
+    EVENT_MANAGER.unregister(event, handler)
+
+
+#############################################################
+@dataclass
+class CanvasArtifact:
+    id: str
+    type: str  # 'code', 'diagram', 'text', 'image'
+    content: str
+    title: str
+    timestamp: float
+    metadata: Dict[str, Any] = None
+
+    def __post_init__(self):
+        if self.metadata is None:
+            self.metadata = {}
+
+class LLMAgent:
+    """Main agent driver.
+    Handles multiple messages at once, with a message-queuing service and a
+    generator method for easy integration with console applications as well as UIs."""
+    def __init__(
+        self,
+        model_id: str = BASEMODEL_ID,
+        system_prompt: str = None,
+        max_queue_size: int = 1000,
+        max_retries: int = 3,
+        timeout: int = 30000,
+        max_tokens: int = 5000,
+        temperature: float = 0.3,
+        base_url: str = "http://localhost:1234/v1",
+        api_key: str = "not-needed",
+        generate_fn: Callable[[List[Dict[str, str]]], Coroutine[Any, Any, str]] = None,
+    ):
+        self.model_id = model_id
+        self.system_prompt = system_prompt or "You are a helpful AI assistant."
+        self.request_queue = Queue(maxsize=max_queue_size)
+        self.max_retries = max_retries
+        self.timeout = timeout
+        self.is_running = False
+        self._stop_event = Event()
+        self.processing_thread = None
+        # Canvas artifacts
+        self.canvas_artifacts: Dict[str, List[CanvasArtifact]] = defaultdict(list)
+        self.max_canvas_artifacts = 1000
+        # Conversation tracking
+        self.conversations: Dict[str, List[LLMMessage]] = {}
+        self.max_history_length = 100
+        self._generate = generate_fn or self._default_generate
+        self.api_key = api_key
+        self.base_url = base_url
+        self.max_tokens = max_tokens
+        self.temperature = temperature
+        self.async_client = self.CreateClient(base_url, api_key)
+        self.current_conversation = "default"
+
+        # Active requests waiting for responses
+        self.pending_requests: Dict[str, LLMRequest] = {}
+        self.pending_requests_lock = Lock()
+
+        # Register internal event handlers
+        self._register_event_handlers()
+        # Speech synthesis
+        try:
+            self.tts_engine = pyttsx3.init()
+            self.setup_tts()
+            self.speech_enabled = True
+        except Exception as e:
+            console.log(f"[yellow]TTS not available: {e}[/yellow]")
+            self.speech_enabled = False
+
+        console.log("[bold green]πŸš€ Enhanced LLM Agent Initialized[/bold green]")
+
+        # Start the processing thread immediately
+        self.start()
+
+    def setup_tts(self):
+        """Configure text-to-speech engine"""
+        if hasattr(self, 'tts_engine'):
+            voices = self.tts_engine.getProperty('voices')
+            if voices:
+                self.tts_engine.setProperty('voice', voices[0].id)
+            self.tts_engine.setProperty('rate', 150)
+            self.tts_engine.setProperty('volume', 0.8)
+
+    def speak(self, text: str):
+        """Convert text to speech in a non-blocking way"""
+        if not hasattr(self, 'speech_enabled') or not self.speech_enabled:
+            return
+
+        def _speak():
+            try:
+                # Clean text for speech (remove markdown, code blocks)
+                clean_text = re.sub(r'```.*?```', '', text, flags=re.DOTALL)
+                clean_text = re.sub(r'`.*?`', '', clean_text)
+                clean_text = clean_text.strip()
+                if clean_text:
+                    self.tts_engine.say(clean_text)
+                    self.tts_engine.runAndWait()
+                else:
+                    self.tts_engine.say(text)
+                    self.tts_engine.runAndWait()
+            except Exception as e:
+                console.log(f"[red]TTS Error: {e}[/red]")
+
+        thread = threading.Thread(target=_speak, daemon=True)
+        thread.start()
+
+    async def _default_generate(self, messages: List[Dict[str, str]]) -> str:
+        """Default generate 
function if none provided""" + return await self.openai_generate(messages) + def create_interface(self): + """Create the full LCARS-styled interface without HuggingFace options""" + lcars_css = """ + :root { + --lcars-orange: #FF9900; + --lcars-red: #FF0033; + --lcars-blue: #6699FF; + --lcars-purple: #CC99FF; + --lcars-pale-blue: #99CCFF; + --lcars-black: #000000; + --lcars-dark-blue: #3366CC; + --lcars-gray: #424242; + --lcars-yellow: #FFFF66; + } + body { + background: var(--lcars-black); + color: var(--lcars-orange); + font-family: 'Antonio', 'LCD', 'Courier New', monospace; + margin: 0; + padding: 0; + } + .gradio-container { + background: var(--lcars-black) !important; + min-height: 100vh; + } + .lcars-container { + background: var(--lcars-black); + border: 4px solid var(--lcars-orange); + border-radius: 0 30px 0 0; + min-height: 100vh; + padding: 20px; + } + .lcars-header { + background: linear-gradient(90deg, var(--lcars-red), var(--lcars-orange)); + padding: 20px 40px; + border-radius: 0 60px 0 0; + margin: -20px -20px 20px -20px; + border-bottom: 6px solid var(--lcars-blue); + } + .lcars-title { + font-size: 2.5em; + font-weight: bold; + color: var(--lcars-black); + margin: 0; + } + .lcars-subtitle { + font-size: 1.2em; + color: var(--lcars-black); + margin: 10px 0 0 0; + } + .lcars-panel { + background: rgba(66, 66, 66, 0.9); + border: 2px solid var(--lcars-orange); + border-radius: 0 20px 0 20px; + padding: 15px; + margin-bottom: 15px; + } + .lcars-button { + background: var(--lcars-orange); + color: var(--lcars-black) !important; + border: none !important; + border-radius: 0 15px 0 15px !important; + padding: 10px 20px !important; + font-family: inherit !important; + font-weight: bold !important; + margin: 5px !important; + } + .lcars-button:hover { + background: var(--lcars-red) !important; + } + .lcars-input { + background: var(--lcars-black) !important; + color: var(--lcars-orange) !important; + border: 2px solid var(--lcars-blue) !important; + border-radius: 0 10px 0 10px !important; + padding: 10px !important; + } + .lcars-chatbot { + background: var(--lcars-black) !important; + border: 2px solid var(--lcars-purple) !important; + border-radius: 0 15px 0 15px !important; + } + .status-indicator { + display: inline-block; + width: 12px; + height: 12px; + border-radius: 50%; + background: var(--lcars-red); + margin-right: 8px; + } + .status-online { + background: var(--lcars-blue); + animation: pulse 2s infinite; + } + @keyframes pulse { + 0% { opacity: 1; } + 50% { opacity: 0.5; } + 100% { opacity: 1; } + } + """ + with gr.Blocks(css=lcars_css, theme=gr.themes.Default(), title="LCARS Terminal") as interface: + with gr.Column(elem_classes="lcars-container"): + # Header + with gr.Row(elem_classes="lcars-header"): + gr.Markdown(""" +
<div>
+                        <h1 class="lcars-title">πŸš€ LCARS TERMINAL</h1>
+                        <div class="lcars-subtitle">STARFLEET AI DEVELOPMENT CONSOLE</div>
+                        <div>
+                            <span class="status-indicator status-online"></span> SYSTEM ONLINE
+                        </div>
+                    </div>
+ """) + # Main Content + with gr.Row(): + # Left Sidebar + with gr.Column(scale=1): + # Configuration Panel + with gr.Column(elem_classes="lcars-panel"): + + pass + # Canvas Artifacts + with gr.Column(elem_classes="lcars-panel"): + gr.Markdown("""### 🎨 CANVAS ARTIFACTS""") + artifact_display = gr.JSON(label="") + with gr.Row(): + refresh_artifacts_btn = gr.Button("πŸ”„ Refresh", elem_classes="lcars-button") + clear_canvas_btn = gr.Button("πŸ—‘οΈ Clear Canvas", elem_classes="lcars-button") + # Main Content Area + with gr.Column(scale=2): + # Code Canvas + with gr.Accordion("πŸ’» COLLABORATIVE CODE CANVAS", open=False): + code_editor = gr.Code(interactive=True, + value="# Welcome to LCARS Collaborative Canvas\nprint('Hello, Starfleet!')", + language="python", + lines=15, + label="" + ) + with gr.Row(): + load_to_chat_btn = gr.Button("πŸ’¬ Discuss Code", elem_classes="lcars-button") + analyze_btn = gr.Button("πŸ” Analyze", elem_classes="lcars-button") + optimize_btn = gr.Button("⚑ Optimize", elem_classes="lcars-button") + # Chat Interface + with gr.Column(elem_classes="lcars-panel"): + gr.Markdown("""### πŸ’¬ MISSION LOG""") + chatbot = gr.Chatbot(label="", height=300) + with gr.Row(): + message_input = gr.Textbox( + placeholder="Enter your command or query...", + show_label=False, + lines=2, + scale=4 + ) + send_btn = gr.Button("πŸš€ SEND", elem_classes="lcars-button", scale=1) + # Status + with gr.Row(): + status_display = gr.Textbox( + value="LCARS terminal operational. Awaiting commands.", + label="Status", + max_lines=2 + ) + with gr.Column(scale=0): + clear_chat_btn = gr.Button("πŸ—‘οΈ Clear Chat", elem_classes="lcars-button") + new_session_btn = gr.Button("πŸ†• New Session", elem_classes="lcars-button") + + # Event handlers are connected here, no change needed + async def process_message(message, history, speech_enabled=True): + if not message.strip(): + return "", history, "Please enter a message" + history = history + [[message, None]] + try: + # Fixed: Uses the new chat_with_canvas method which includes canvas context + response = await self.chat_with_canvas( + message, self.current_conversation, include_canvas=True + ) + history[-1][1] = response + if speech_enabled and self.speech_enabled: + self.speak(response) + artifacts = self.get_canvas_summary(self.current_conversation) + status = f"βœ… Response received. 
Canvas artifacts: {len(artifacts)}" + return "", history, status, artifacts + except Exception as e: + error_msg = f"❌ Error: {str(e)}" + history[-1][1] = error_msg + return "", history, error_msg, self.get_canvas_summary(self.current_conversation) + + def get_artifacts(): + return self.get_canvas_summary(self.current_conversation) + + def clear_canvas(): + self.clear_canvas(self.current_conversation) + return [], "βœ… Canvas cleared" + + def clear_chat(): + self.clear_conversation(self.current_conversation) + return [], "βœ… Chat cleared" + + def new_session(): + self.clear_conversation(self.current_conversation) + self.clear_canvas(self.current_conversation) + return [], "# New session started\nprint('Ready!')", "πŸ†• New session started", [] + + # Connect events + send_btn.click(process_message, + inputs=[message_input, chatbot], + outputs=[message_input, chatbot, status_display, artifact_display]) + message_input.submit(process_message, + inputs=[message_input, chatbot], + outputs=[message_input, chatbot, status_display, artifact_display]) + refresh_artifacts_btn.click(get_artifacts, outputs=artifact_display) + clear_canvas_btn.click(clear_canvas, outputs=[artifact_display, status_display]) + clear_chat_btn.click(clear_chat, outputs=[chatbot, status_display]) + new_session_btn.click(new_session, outputs=[chatbot, code_editor, status_display, artifact_display]) + return interface + + def _register_event_handlers(self): + """Register internal event handlers for response routing""" + RegisterEvent("llm_internal_response", self._handle_internal_response) + + def _handle_internal_response(self, response: LLMResponse): + """Route responses to the appropriate request handlers""" + console.log(f"[bold cyan]Handling internal response for: {response.request_id}[/bold cyan]") + + request = None + with self.pending_requests_lock: + if response.request_id in self.pending_requests: + request = self.pending_requests[response.request_id] + del self.pending_requests[response.request_id] + console.log(f"Found pending request for: {response.request_id}") + else: + console.log(f"No pending request found for: {response.request_id}", style="yellow") + return + + # Raise the specific response event + if request.response_event: + console.log(f"[bold green]Raising event: {request.response_event}[/bold green]") + RaiseEvent(request.response_event, response) + + # Call callback if provided + if request.callback: + try: + console.log(f"[bold yellow]Calling callback for: {response.request_id}[/bold yellow]") + request.callback(response) + except Exception as e: + console.log(f"Error in callback: {e}", style="bold red") + + def _add_to_conversation_history(self, conversation_id: str, message: LLMMessage): + """Add message to conversation history""" + if conversation_id not in self.conversations: + self.conversations[conversation_id] = [] + + self.conversations[conversation_id].append(message) + + # Trim history if too long + if len(self.conversations[conversation_id]) > self.max_history_length * 2: + self.conversations[conversation_id] = self.conversations[conversation_id][-(self.max_history_length * 2):] + + def _build_messages_from_conversation(self, conversation_id: str, new_message: LLMMessage) -> List[Dict[str, str]]: + """Build message list from conversation history""" + messages = [] + + # Add system prompt + if self.system_prompt: + messages.append({"role": "system", "content": self.system_prompt}) + + # Add conversation history + if conversation_id in self.conversations: + for msg in 
self.conversations[conversation_id][-self.max_history_length:]: + messages.append({"role": msg.role, "content": msg.content}) + + # Add the new message + messages.append({"role": new_message.role, "content": new_message.content}) + + return messages + + def _process_llm_request(self, request: LLMRequest): + """Process a single LLM request""" + console.log(f"[bold green]Processing LLM request: {request.message.message_id}[/bold green]") + try: + # Build messages for LLM + messages = self._build_messages_from_conversation( + request.message.conversation_id or "default", + request.message + ) + + console.log(f"Calling LLM with {len(messages)} messages") + + # Call LLM - Use sync call for thread compatibility + response_content = self._call_llm_sync(messages) + + console.log(f"[bold green]LLM response received: {response_content}...[/bold green]") + + # Create response message + response_message = LLMMessage( + role="assistant", + content=response_content, + conversation_id=request.message.conversation_id, + metadata={"request_id": request.message.message_id} + ) + + # Update conversation history + self._add_to_conversation_history( + request.message.conversation_id or "default", + request.message + ) + self._add_to_conversation_history( + request.message.conversation_id or "default", + response_message + ) + + # Create and send response + response = LLMResponse( + message=response_message, + request_id=request.message.message_id, + success=True + ) + + console.log(f"[bold blue]Sending internal response for: {request.message.message_id}[/bold blue]") + RaiseEvent("llm_internal_response", response) + + except Exception as e: + console.log(f"[bold red]Error processing LLM request: {e}[/bold red]") + traceback.print_exc() + # Create error response + error_response = LLMResponse( + message=LLMMessage( + role="system", + content=f"Error: {str(e)}", + conversation_id=request.message.conversation_id + ), + request_id=request.message.message_id, + success=False, + error=str(e) + ) + + RaiseEvent("llm_internal_response", error_response) + + def _call_llm_sync(self, messages: List[Dict[str, str]]) -> str: + """Sync call to the LLM with retry logic""" + console.log(f"Making LLM call to {self.model_id}") + for attempt in range(self.max_retries): + try: + response = CLIENT.chat.completions.create( + model=self.model_id, + messages=messages, + temperature=self.temperature, + max_tokens=self.max_tokens + ) + content = response.choices[0].message.content + console.log(f"LLM call successful, response length: {len(content)}") + return content + except Exception as e: + console.log(f"LLM call attempt {attempt + 1} failed: {e}") + if attempt == self.max_retries - 1: + raise e + # Wait before retry + + def _process_queue(self): + """Main queue processing loop""" + console.log("[bold cyan]LLM Agent queue processor started[/bold cyan]") + while not self._stop_event.is_set(): + try: + request = self.request_queue.get(timeout=1.0) + if request: + console.log(f"Got request from queue: {request.message.message_id}") + self._process_llm_request(request) + self.request_queue.task_done() + except Empty: + continue + except Exception as e: + console.log(f"Error in queue processing: {e}", style="bold red") + traceback.print_exc() + console.log("[bold cyan]LLM Agent queue processor stopped[/bold cyan]") + + def send_message( + self, + content: str, + role: str = "user", + conversation_id: str = None, + response_event: str = None, + callback: Callable = None, + metadata: Dict = None + ) -> str: + """Send a message to the LLM 
and get response via events""" + if not self.is_running: + raise RuntimeError("LLM Agent is not running. Call start() first.") + + # Create message + message = LLMMessage( + role=role, + content=content, + conversation_id=conversation_id, + metadata=metadata or {} + ) + + # Create request + request = LLMRequest( + message=message, + response_event=response_event, + callback=callback + ) + + # Store in pending requests BEFORE adding to queue + with self.pending_requests_lock: + self.pending_requests[message.message_id] = request + console.log(f"Added to pending requests: {message.message_id}") + + # Add to queue + try: + self.request_queue.put(request, timeout=5.0) + console.log(f"[bold magenta]Message queued: {message.message_id}, Content: {content[:50]}...[/bold magenta]") + return message.message_id + except queue.Full: + console.log(f"[bold red]Queue full, cannot send message[/bold red]") + with self.pending_requests_lock: + if message.message_id in self.pending_requests: + del self.pending_requests[message.message_id] + raise RuntimeError("LLM Agent queue is full") + + async def chat(self, messages: List[Dict[str, str]]) -> str: + """ + Async chat method that sends message via queue and returns response string. + This is the main method you should use. + """ + # Create future for the response + loop = asyncio.get_event_loop() + response_future = loop.create_future() + + def chat_callback(response: LLMResponse): + """Callback when LLM responds - thread-safe""" + console.log(f"[bold yellow]βœ“ CHAT CALLBACK TRIGGERED![/bold yellow]") + + if not response_future.done(): + if response.success: + content = response.message.content + console.log(f"Callback received content: {content}...") + # Schedule setting the future result on the main event loop + loop.call_soon_threadsafe(response_future.set_result, content) + else: + console.log(f"Error in response: {response.error}") + error_msg = f"❌ Error: {response.error}" + loop.call_soon_threadsafe(response_future.set_result, error_msg) + else: + console.log(f"[bold red]Future already done, ignoring callback[/bold red]") + + console.log(f"Sending message to LLM agent...") + + # Extract the actual message content from the messages list + user_message = "" + for msg in messages: + if msg.get("role") == "user": + user_message = msg.get("content", "") + break + + if not user_message.strip(): + return "" + + # Send message with callback using the queue system + try: + message_id = self.send_message( + content=user_message, + conversation_id="default", + callback=chat_callback + ) + + console.log(f"Message sent with ID: {message_id}, waiting for response...") + + # Wait for the response and return it + try: + response = await asyncio.wait_for(response_future, timeout=self.timeout) + console.log(f"[bold green]βœ“ Chat complete! 
Response length: {len(response)}[/bold green]") + return response + + except asyncio.TimeoutError: + console.log("[bold red]Response timeout[/bold red]") + # Clean up the pending request + with self.pending_requests_lock: + if message_id in self.pending_requests: + del self.pending_requests[message_id] + return "❌ Response timeout - check if LLM server is running" + + except Exception as e: + console.log(f"[bold red]Error sending message: {e}[/bold red]") + traceback.print_exc() + return f"❌ Error sending message: {e}" + + def start(self): + """Start the LLM agent""" + if not self.is_running: + self.is_running = True + self._stop_event.clear() + self.processing_thread = Thread(target=self._process_queue, daemon=True) + self.processing_thread.start() + console.log("[bold green]LLM Agent started[/bold green]") + + def stop(self): + """Stop the LLM agent""" + console.log("Stopping LLM Agent...") + self._stop_event.set() + if self.processing_thread and self.processing_thread.is_alive(): + self.processing_thread.join(timeout=10) + self.is_running = False + console.log("LLM Agent stopped") + + def get_conversation_history(self, conversation_id: str = "default") -> List[LLMMessage]: + """Get conversation history""" + return self.conversations.get(conversation_id, [])[:] + + def clear_conversation(self, conversation_id: str = "default"): + """Clear conversation history""" + if conversation_id in self.conversations: + del self.conversations[conversation_id] + + + async def _chat(self, messages: List[Dict[str, str]]) -> str: + return await self._generate(messages) + + @staticmethod + async def openai_generate(messages: List[Dict[str, str]], max_tokens: int = 8096, temperature: float = 0.4, model: str = BASEMODEL_ID,tools=None) -> str: + """Static method for generating responses using OpenAI API""" + try: + resp = await BASE_CLIENT.chat.completions.create( + model=model, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + tools=tools + ) + response_text = resp.choices[0].message.content or "" + return response_text + except Exception as e: + console.log(f"[bold red]Error in openai_generate: {e}[/bold red]") + return f"[LLM_Agent Error - openai_generate: {str(e)}]" + + async def _call_(self, messages: List[Dict[str, str]]) -> str: + """Internal call method using instance client""" + try: + resp = await self.async_client.chat.completions.create( + model=self.model_id, + messages=messages, + temperature=self.temperature, + max_tokens=self.max_tokens + ) + response_text = resp.choices[0].message.content or "" + return response_text + except Exception as e: + console.log(f"[bold red]Error in _call_: {e}[/bold red]") + return f"[LLM_Agent Error - _call_: {str(e)}]" + + @staticmethod + def CreateClient(base_url: str, api_key: str) -> AsyncOpenAI: + '''Create async OpenAI Client required for multi tasking''' + return AsyncOpenAI( + base_url=base_url, + api_key=api_key + ) + + @staticmethod + async def fetch_available_models(base_url: str, api_key: str) -> List[str]: + """Fetches available models from the OpenAI API.""" + try: + async_client = AsyncOpenAI(base_url=base_url, api_key=api_key) + models = await async_client.models.list() + model_choices = [model.id for model in models.data] + return model_choices + except Exception as e: + console.log(f"[bold red]LLM_Agent Error fetching models: {e}[/bold red]") + return ["LLM_Agent Error fetching models"] + + def get_models(self) -> List[str]: + """Get available models using instance credentials""" + return 
asyncio.run(self.fetch_available_models(self.base_url, self.api_key))
+
+    def get_queue_size(self) -> int:
+        """Get current queue size"""
+        return self.request_queue.qsize()
+
+    def get_pending_requests_count(self) -> int:
+        """Get number of pending requests"""
+        with self.pending_requests_lock:
+            return len(self.pending_requests)
+
+    def get_status(self):
+        """Get agent status information"""
+        return str({
+            "is_running": self.is_running,
+            "queue_size": self.get_queue_size(),
+            "pending_requests": self.get_pending_requests_count(),
+            "conversations_count": len(self.conversations),
+            "model": self.model_id, "BaseURL": self.base_url
+        })
+
+    def direct_chat(self, user_message: str, conversation_id: str = "default") -> str:
+        """
+        Send a message and get a response using a direct API call.
+        """
+        try:
+            # Create message object
+            message = LLMMessage(role="user", content=user_message, conversation_id=conversation_id)
+
+            # Build messages for LLM
+            messages = self._build_messages_from_conversation(conversation_id, message)
+            console.log(f"Calling LLM at {self.base_url} with {len(messages)} messages")
+
+            # Make the direct API call
+            response = CLIENT.chat.completions.create(
+                model=self.model_id,
+                messages=messages,
+                temperature=self.temperature,
+                max_tokens=self.max_tokens
+            )
+            response_content = response.choices[0].message.content
+            console.log(f"[bold green]LLM response received: {response_content[:50]}...[/bold green]")
+
+            # Update conversation history
+            self._add_to_conversation_history(conversation_id, message)
+            response_message = LLMMessage(role="assistant", content=response_content, conversation_id=conversation_id)
+            self._add_to_conversation_history(conversation_id, response_message)
+
+            return response_content
+
+        except Exception as e:
+            console.log(f"[bold red]Error in chat: {e}[/bold red]")
+            traceback.print_exc()
+            return f"❌ Error communicating with LLM: {str(e)}"
+
+    # --- TEST Canvas Methods ---
+    def add_artifact(self, conversation_id: str, artifact_type: str, content: str, title: str = "", metadata: Dict = None):
+        artifact = CanvasArtifact(
+            id=str(uuid.uuid4()),
+            type=artifact_type,
+            content=content,
+            title=title,
+            timestamp=time.time(),
+            metadata=metadata or {}
+        )
+        self.canvas_artifacts[conversation_id].append(artifact)
+
+    def get_canvas_artifacts(self, conversation_id: str = "default") -> List[CanvasArtifact]:
+        return self.canvas_artifacts.get(conversation_id, [])
+
+    def get_canvas_summary(self, conversation_id: str = "default") -> List[Dict[str, Any]]:
+        artifacts = self.get_canvas_artifacts(conversation_id)
+        # Include a short content preview so prompt builders such as
+        # chat_with_canvas can reference it without loading full artifacts.
+        return [{"id": a.id, "type": a.type, "title": a.title, "timestamp": a.timestamp,
+                 "content_preview": (a.content[:100] + "...") if len(a.content) > 100 else a.content}
+                for a in artifacts]
+
+    def clear_canvas(self, conversation_id: str = "default"):
+        if conversation_id in self.canvas_artifacts:
+            self.canvas_artifacts[conversation_id] = []
+
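+    # Usage sketch for the canvas helpers (illustrative only; the conversation
+    # id and artifact content below are hypothetical):
+    #
+    #     agent.add_artifact("default", "code", "print('hi')", title="hello")
+    #     agent.get_canvas_summary("default")  # -> [{"id": ..., "type": "code", ...}]
+    #     agent.clear_canvas("default")
+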
+    def get_latest_code_artifact(self, conversation_id: str) -> Optional[str]:
+        """Get the most recent code artifact content"""
+        if conversation_id not in self.canvas_artifacts:
+            return None
+
+        for artifact in reversed(self.canvas_artifacts[conversation_id]):
+            if artifact.type == "code":
+                return artifact.content
+        return None
+
+    def get_canvas_context(self, conversation_id: str) -> str:
+        """Get formatted canvas context for LLM prompts"""
+        if conversation_id not in self.canvas_artifacts or not self.canvas_artifacts[conversation_id]:
+            return ""
+
+        context_lines = ["\n=== COLLABORATIVE CANVAS ARTIFACTS ==="]
+        for artifact in self.canvas_artifacts[conversation_id][-10:]:  # Last 10 artifacts
+            context_lines.append(f"\n--- {artifact.title} [{artifact.type.upper()}] ---")
+            preview = artifact.content[:500] + "..." if len(artifact.content) > 500 else artifact.content
+            context_lines.append(preview)
+
+        return "\n".join(context_lines) + "\n=================================\n"
+
+    def get_artifact_by_id(self, conversation_id: str, artifact_id: str) -> Optional[CanvasArtifact]:
+        """Get specific artifact by ID"""
+        if conversation_id not in self.canvas_artifacts:
+            return None
+
+        for artifact in self.canvas_artifacts[conversation_id]:
+            if artifact.id == artifact_id:
+                return artifact
+        return None
+
+    def _extract_artifacts_to_canvas(self, response: str, conversation_id: str):
+        """Automatically extract code blocks and add to canvas"""
+        # Find all code blocks with optional language specification
+        code_blocks = re.findall(r'```(?:(\w+)\n)?(.*?)```', response, re.DOTALL)
+        for i, (lang, code_block) in enumerate(code_blocks):
+            if len(code_block.strip()) > 10:  # Only add substantial code blocks
+                self.add_artifact(
+                    conversation_id,
+                    "code",
+                    code_block.strip(),
+                    f"code_snippet_{lang or 'unknown'}_{len(self.canvas_artifacts.get(conversation_id, [])) + 1}"
+                )
+
+    async def chat_with_canvas(self, message: str, conversation_id: str, include_canvas: bool = False):
+        """Chat method that can optionally include canvas context."""
+        messages = [{"role": "user", "content": message}]
+
+        if include_canvas:
+            artifacts = self.get_canvas_summary(conversation_id)
+            if artifacts:
+                canvas_context = "Current Canvas Context:\n" + "\n".join([
+                    f"- [{art['type'].upper()}] {art['title'] or 'Untitled'}: {art['content_preview']}"
+                    for art in artifacts
+                ])
+                messages.insert(0, {"role": "system", "content": canvas_context})
+
+        return await self.chat(messages)
+    def respond(
 message,
@@ -1127,651 +2082,775 @@ button[variant="primary"] {
 """
-with gr.Blocks(
-    title="πŸš€ L.C.A.R.S - Local Computer Advanced Reasoning System",
-    theme='Yntec/HaleyCH_Theme_Orange_Green',css=custom_css
-    ) as demo:
-    # Header
-    # State management
-    history_state = gr.State([])
+# Session management
+SESSION_FILE = "lcars_session.pkl"
+ARTIFACTS_FILE = "lcars_artifacts.json"
+
+# Initialize the agent
+agent = LLMAgent(
+    model_id=BASEMODEL_ID,
+    system_prompt="You are L.C.A.R.S - Local Computer Advanced Reasoning System, an advanced AI assistant with capabilities for code generation, analysis, and collaborative problem solving.",
+    temperature=0.7,
+    max_tokens=5000
+)
+
+class ParsedResponse:
+    """Parsed LLM response data model"""
+    def __init__(self, thinking="", main_content="", code_snippets=None, raw_reasoning="", raw_content=""):
+        self.thinking = thinking
+        self.main_content = main_content
+        self.code_snippets = code_snippets or []
+        self.raw_reasoning = raw_reasoning
+        self.raw_content = raw_content
+
+def execute_python_code(code):
+    """Execute Python code safely and return output"""
+    try:
+        # Create a temporary file
+        temp_file = "temp_execution.py"
+        with open(temp_file, 'w', encoding='utf-8') as f:
+            f.write(code)
-    # ============================================
-    # HEADER SECTION
-    # ============================================
-    with gr.Row():
-        with gr.Column(scale=1):
-            gr.Image(
-
value="https://cdn-avatars.huggingface.co/v1/production/uploads/65d883893a52cd9bcd8ab7cf/tRsCJlHNZo1D02kBTmfy9.jpeg", - elem_id="lcars_logo", - height=200, show_download_button=False,container=False, - width=200 - ) - with gr.Column(scale=3): - gr.HTML(f""" -
- πŸ–₯️ L.C.A.R.S - Local Computer Advanced Reasoning System -
USS Enterprise β€’ NCC-1701-D β€’ Starfleet Command -
- """) + # Execute the code + result = subprocess.run( + [sys.executable, temp_file], + capture_output=True, + text=True, + timeout=30 # 30 second timeout + ) - - # ============================================ - # MAIN INTERFACE TABS - # ============================================ - with gr.Tabs(): + # Clean up + if os.path.exists(temp_file): + os.remove(temp_file) + + output = "" + if result.stdout: + output += f"**Output:**\n{result.stdout}\n" + if result.stderr: + output += f"**Errors:**\n{result.stderr}\n" + if result.returncode != 0: + output += f"**Return code:** {result.returncode}\n" + + return output.strip() if output else "Code executed (no output)" + + except subprocess.TimeoutExpired: + return "❌ Execution timed out (30 seconds)" + except Exception as e: + return f"❌ Execution error: {str(e)}" + + +def execute_code_artifact(artifact_id, current_code): + """Execute a specific code artifact""" + try: + artifacts = agent.get_canvas_artifacts(agent.current_conversation) + if not artifacts: + return "No artifacts available", current_code + + try: + artifact_idx = int(artifact_id) + if 0 <= artifact_idx < len(artifacts): + artifact = artifacts[artifact_idx] + if artifact.type == "code": + # Execute the code + execution_result = execute_python_code(artifact.content) + display_text = f"## πŸš€ Executing Artifact #{artifact_idx}\n\n**Title:** {artifact.title}\n\n**Execution Result:**\n{execution_result}" + return display_text, artifact.content + else: + return f"❌ Artifact {artifact_idx} is not code (type: {artifact.type})", current_code + else: + return f"❌ Invalid artifact ID. Available: 0-{len(artifacts)-1}", current_code + except ValueError: + return "❌ Please enter a valid numeric artifact ID", current_code + + except Exception as e: + return f"❌ Error: {str(e)}", current_code + +def execute_current_code(code): + """Execute the code currently in the editor""" + try: + if not code.strip(): + return "❌ No code to execute", code + + execution_result = execute_python_code(code) + display_text = f"## πŸš€ Code Execution Result\n\n{execution_result}" + + return display_text, code + except Exception as e: + return f"❌ Execution error: {str(e)}", code +def save_session(): + """Save current session to disk""" + try: + session_data = { + 'conversations': agent.conversations, + 'current_conversation': agent.current_conversation, + 'canvas_artifacts': dict(agent.canvas_artifacts), + 'history': getattr(agent, 'display_history', []) + } + with open(SESSION_FILE, 'wb') as f: + pickle.dump(session_data, f) + print(f"πŸ’Ύ Session saved to {SESSION_FILE}") + return True + except Exception as e: + print(f"❌ Error saving session: {e}") + return False + +def load_session(): + """Load session from disk""" + try: + if os.path.exists(SESSION_FILE): + with open(SESSION_FILE, 'rb') as f: + session_data = pickle.load(f) - # ============================================ - # L.C.A.R.S MAIN CHAT TAB (Enhanced) - # ============================================ - with gr.TabItem(label="πŸ€– L.C.A.R.S Chat Intelligence", elem_id="lcars_main_tab"): + agent.conversations = session_data.get('conversations', {}) + agent.current_conversation = session_data.get('current_conversation', 'default') + agent.canvas_artifacts = defaultdict(list, session_data.get('canvas_artifacts', {})) + agent.display_history = session_data.get('history', []) + + print(f"πŸ“‚ Session loaded from {SESSION_FILE}") + return True + else: + print("πŸ“‚ No existing session found, starting fresh") + return False + except Exception as e: + print(f"❌ 
Error loading session: {e}") + return False + +def save_artifacts(): + """Save artifacts to JSON file""" + try: + artifacts_data = [] + for conv_id, artifacts in agent.canvas_artifacts.items(): + for artifact in artifacts: + artifacts_data.append({ + 'conversation_id': conv_id, + 'id': artifact.id, + 'type': artifact.type, + 'content': artifact.content, + 'title': artifact.title, + 'timestamp': artifact.timestamp, + 'metadata': artifact.metadata + }) + + with open(ARTIFACTS_FILE, 'w', encoding='utf-8') as f: + json.dump(artifacts_data, f, indent=2, ensure_ascii=False) + + print(f"πŸ’Ύ Artifacts saved to {ARTIFACTS_FILE}") + return True + except Exception as e: + print(f"❌ Error saving artifacts: {e}") + return False + +def load_artifacts(): + """Load artifacts from JSON file""" + try: + if os.path.exists(ARTIFACTS_FILE): + with open(ARTIFACTS_FILE, 'r', encoding='utf-8') as f: + artifacts_data = json.load(f) + + agent.canvas_artifacts.clear() + for artifact_data in artifacts_data: + conv_id = artifact_data['conversation_id'] + artifact = CanvasArtifact( + id=artifact_data['id'], + type=artifact_data['type'], + content=artifact_data['content'], + title=artifact_data['title'], + timestamp=artifact_data['timestamp'], + metadata=artifact_data.get('metadata', {}) + ) + agent.canvas_artifacts[conv_id].append(artifact) + + print(f"πŸ“‚ Artifacts loaded from {ARTIFACTS_FILE}") + return True + else: + print("πŸ“‚ No existing artifacts found") + return False + except Exception as e: + print(f"❌ Error loading artifacts: {e}") + return False + +def parse_llm_response(response_text): + """Parse LLM response to extract thinking, content, and code snippets""" + parsed = ParsedResponse() + parsed.raw_content = response_text + + # Patterns for different response components + thinking_patterns = [ + r'🧠[^\n]*?(.*?)(?=πŸ€–|πŸ’»|πŸš€|$)', # 🧠 thinking section + r'Thinking:[^\n]*?(.*?)(?=Response:|Answer:|$)', # Thinking: section + r'Reasoning:[^\n]*?(.*?)(?=Response:|Answer:|$)', # Reasoning: section + ] + + # Try to extract thinking/reasoning + thinking_content = "" + for pattern in thinking_patterns: + thinking_match = re.search(pattern, response_text, re.IGNORECASE | re.DOTALL) + if thinking_match: + thinking_content = thinking_match.group(1).strip() + break + + if thinking_content: + parsed.thinking = thinking_content + parsed.raw_reasoning = thinking_content + # Remove thinking from main content + main_content = re.sub(pattern, '', response_text, flags=re.IGNORECASE | re.DOTALL).strip() + else: + main_content = response_text + + # Extract code snippets + code_blocks = re.findall(r'```(?:(\w+)\n)?(.*?)```', main_content, re.DOTALL) + parsed.code_snippets = [] + + for lang, code in code_blocks: + if code.strip(): + parsed.code_snippets.append({ + 'language': lang or 'text', + 'code': code.strip(), + 'description': f"Code snippet ({lang or 'unknown'})" + }) + + # Remove code blocks from main content for cleaner display + clean_content = re.sub(r'```.*?```', '', main_content, flags=re.DOTALL) + clean_content = re.sub(r'`.*?`', '', clean_content) + parsed.main_content = clean_content.strip() + + return parsed - with gr.Row(): - # LEFT COLUMN - INPUT & CONTROLS - with gr.Column(scale=2): - gr.HTML(f"
🧠 REASONING PROCESS
") - with gr.Accordion(label="🧠 Current AI Reasoning", open=False): - thinking_html = gr.HTML(label="AI Thought Process", show_label=True) - - # Main chat input - message = gr.Textbox( - show_copy_button=True, - lines=6, - label="πŸ’¬ Ask L.C.A.R.S", - placeholder="Enter your message to the Local Computer Advanced Reasoning System..." - ) - - # Control buttons - with gr.Row(): - submit_btn = gr.Button("πŸš€ Ask L.C.A.R.S", variant="huggingface", size="lg") - clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="huggingface") - - # Audio controls - with gr.Row(): - speak_response = gr.Checkbox(label="πŸ”Š Speak Response", value=False) - speak_thoughts = gr.Checkbox(label="🧠 Speak Reasoning", value=False) - - # Quick Actions - with gr.Accordion(label="⚑ Utilitys Quick Actions", open=False): - with gr.Row(): - artifact_id_input = gr.Textbox( - label="Artifact ID", - placeholder="Artifact ID (0, 1, 2)", - scale=2 - ) - execute_artifact_btn = gr.Button("▢️ Execute Artifact", variant="huggingface") - - with gr.Row(): - batch_artifact_ids = gr.Textbox( - label="Batch Execute IDs", - placeholder="e.g., 0,1 or 0-5", - scale=2 - ) - batch_execute_btn = gr.Button("⚑Batch Execute", variant="huggingface") - - # MIDDLE COLUMN - RESPONSES - with gr.Column(scale=2): - gr.HTML(f"
SYSTEM RESPONSE
") - - with gr.Accordion(label="πŸ€– Library Computer Advanced Reasoning System", open=True): - plain_text_output = gr.Markdown( - container=True, - show_copy_button=True, - label="AI Response", - height=400 - ) - - execution_output = gr.HTML(f"
🧠 Execution Results
") - gr.HTML(f"
Current Session
") - - # Enhanced Chat History Display - with gr.Accordion(label="πŸ“œ Session Chat History", open=False): - chat_history_display = gr.HTML(label="Full Session History", show_label=True) - # Artifacts Display - with gr.Accordion(label="πŸ“Š Current Session Artifacts", open=False): - artifacts_display = gr.HTML(label="Generated Artifacts Timeline", show_label=True) - - # RIGHT COLUMN - ENHANCED CODE ARTIFACTS - with gr.Column(scale=2): - gr.HTML(f"
🧱 ENHANCED CODE ARTIFACTS WORKSHOP
") - - with gr.Accordion(label="🧱 Code Artifacts Workshop", open=True): - # Enhanced Code Editor with save functionality - code_artifacts = gr.Code( - language="python", - label="Generated Code & Artifacts", - lines=10, - interactive=True, - autocomplete=True, - show_line_numbers=True, - elem_id="code_editor" - ) - - # Enhanced Artifact Controls - with gr.Row(): - artifact_description = gr.Textbox( - label="Artifact Description", - placeholder="Brief description of the code...", - scale=2 - ) - artifact_language = gr.Dropdown( - choices=["python", "javascript", "html", "css", "bash", "sql", "json"], - value="python", - label="Language", - scale=1 - ) - - with gr.Row(): - ExecuteCodePad_btn = gr.Button("▢️ Execute Code", variant="huggingface") - CreateArtifact_btn = gr.Button("🧱 Save Artifact", variant="huggingface") - - with gr.Row(): - LoadArtifact_btn = gr.Button("πŸ“‚ Load Artifact", variant="huggingface") - Load_artifact_id_input = gr.Textbox( - label="Artifact ID", - placeholder="ID to Load", - scale=1 - ) - - - with gr.TabItem("πŸ› οΈ Task Enabled AI"): - gr.HTML(f"
πŸ› οΈ Function Tools
") - with gr.Row(): - with gr.Column(): - tool_model_name = gr.Textbox( - label="Tool-Capable Model", - value="leroydyer/lcars/qwen3-0.6b-q4_k_m.gguf" - ) - tool_prompt = gr.Textbox( - label="Tool Request", - value="What is the result of 123 + 456? Use the add tool.", - lines=4 - ) - tool_call_btn = gr.Button("πŸ› οΈ Execute Tool Call", variant="huggingface") - - with gr.Column(): - tool_output = gr.Textbox(label="Tool Execution Results", lines=15) - - with gr.TabItem("πŸ’» System Tools", elem_id="system_tab"): - gr.HTML(f"
πŸ’» System Tools
") - with gr.Tabs(): - - # Tab 1: Code Fragment Analysis - with gr.TabItem("πŸ“ Code Fragment"): - gr.Markdown("### Analyze a Python code snippet") - with gr.Row(): - with gr.Column(scale=2): - code_input = gr.Code( - language="python", - label="Python Code", - lines=15 - ) - fragment_name = gr.Textbox( - label="Fragment Name (optional)", - placeholder="my_code_snippet", - value="code_fragment" - ) - analyze_fragment_btn = gr.Button("πŸ” Analyze Code Fragment", variant="huggingface") - with gr.Column(scale=3): - fragment_output = gr.Markdown(label="### πŸ” Analysis Results") - code_output = gr.Markdown("Results will appear here") - fragment_download = gr.File(label="πŸ“₯ Download Report") - - # Tab 2: Single File Analysis - with gr.TabItem("πŸ“„ Single File"): - gr.Markdown("### Analyze a single Python file") - - with gr.Row(): - with gr.Column(): - file_input = gr.File( - label="Upload Python File", - file_types=[".py"], - type="filepath" - ) - analyze_file_btn = gr.Button("πŸ” Analyze File", variant="huggingface") - - with gr.Column(scale=2): - gr.Markdown("### πŸ” Analysis Results") - single_file_output = gr.Markdown("Results will appear here") +def extract_artifacts_from_response(parsed_response, conversation_id): + """Extract and save artifacts from parsed response""" + artifacts_created = [] - file_output = gr.Markdown(label="Analysis Results") - file_download = gr.File(label="πŸ“₯ Download Report") - - # Tab 3: Directory Analysis - with gr.TabItem("πŸ“ Directory"): - with gr.Row(): - gr.HTML(f""" -
- πŸ–₯️ Analyze an entire directory/project + # Save code snippets as artifacts + for i, snippet in enumerate(parsed_response.code_snippets): + agent.add_artifact( + conversation_id=conversation_id, + artifact_type="code", + content=snippet['code'], + title=f"code_snippet_{snippet['language']}_{i}", + metadata={ + "language": snippet['language'], + "description": snippet.get('description', ''), + "source": "llm_response" + } + ) + artifacts_created.append(f"code_snippet_{i}") + + # Save thinking as a text artifact if substantial + if len(parsed_response.thinking) > 50: + agent.add_artifact( + conversation_id=conversation_id, + artifact_type="text", + content=parsed_response.thinking, + title="reasoning_process", + metadata={"type": "reasoning", "source": "llm_response"} + ) + artifacts_created.append("reasoning") + + return artifacts_created +def process_lcars_message(message, history, speak_response=False): + """Process messages using the LLMAgent and parse responses""" + if not message.strip(): + return "", history, "Please enter a message", [] + + try: + # Add user message to displayed history + new_history = history + [[message, ""]] + + # Use the agent's direct_chat method + raw_response = agent.direct_chat(message, agent.current_conversation) + + # Parse the response + parsed_response = parse_llm_response(raw_response) + + # Extract and save artifacts from the response + artifacts_created = extract_artifacts_from_response(parsed_response, agent.current_conversation) + + # Update the history with the main content + display_content = parsed_response.main_content + if parsed_response.code_snippets: + display_content += "\n\n**Code Snippets Generated:**" + for i, snippet in enumerate(parsed_response.code_snippets): + display_content += f"\n```{snippet['language']}\n{snippet['code']}\n```" + + new_history[-1][1] = display_content + + # Speak response if enabled + if speak_response and agent.speech_enabled: + agent.speak(parsed_response.main_content) + + # Get artifacts for display + artifacts = agent.get_canvas_summary(agent.current_conversation) + status = f"βœ… Response parsed. Artifacts created: {len(artifacts_created)} | Total: {len(artifacts)}" + + return "", new_history, status, artifacts, parsed_response.thinking + + except Exception as e: + error_msg = f"❌ Error: {str(e)}" + new_history = history + [[message, error_msg]] + return "", new_history, error_msg, agent.get_canvas_summary(agent.current_conversation), "" + +def update_chat_display(history): + """Convert history to formatted HTML for display""" + if not history: + return "
<div>No messages yet</div>"
+
+    html = "<div>"
+    for i, (user_msg, bot_msg) in enumerate(history):
+        html += f"""
+        <div>
+            <div><b>πŸ‘€ You:</b> {user_msg}</div>
- """) - - with gr.Column(): - gr.HTML(f""" -
-
ℹ️ Directory Scanning Tips - - - Default excluded dirs: `__pycache__`, `.git`, `.venv` -
+            <div><b>πŸ€– L.C.A.R.S:</b> {bot_msg}</div>
+        </div>
- """) - dir_input = gr.Textbox( - label="Directory Path", - placeholder="/path/to/your/project", - lines=1 - ) - with gr.Row(): - recurse_check = gr.Checkbox(label="Include subdirectories", value=True) - analyze_dir_btn = gr.Button("πŸ” Analyze Directory", variant="huggingface") - exclude_dirs_input = gr.Textbox( - label="Directories to Exclude (comma separated)", - placeholder="tests, docs, examples", - lines=1 - ) - with gr.Column(): - gr.Markdown("### πŸ“Š Scan Summary") - summary_output = gr.Markdown("Scan results will appear here") - with gr.Column(): - gr.Markdown("### πŸ” Detailed Results") - detailed_output = gr.Markdown("Detailed errors and fixes will appear here") - dir_output = gr.Markdown(label="Analysis Results") - dir_download = gr.File(label="πŸ“₯ Download Report") - +
+ """ + html += "
" + return html + +def update_artifacts_display(): + """Get formatted artifacts display""" + artifacts = agent.get_canvas_artifacts(agent.current_conversation) + if not artifacts: + return "
No artifacts generated yet
" + + html = "
" + for i, artifact in enumerate(artifacts[-10:]): # Last 10 artifacts + type_icon = { + "code": "πŸ’»", + "text": "πŸ“", + "diagram": "πŸ“Š", + "image": "πŸ–ΌοΈ" + }.get(artifact.type, "πŸ“„") + + html += f""" +
+def update_artifacts_display():
+    """Get formatted artifacts display"""
+    artifacts = agent.get_canvas_artifacts(agent.current_conversation)
+    if not artifacts:
+        return "<div>No artifacts generated yet</div>"
+
+    html = "<div>"
+    for i, artifact in enumerate(artifacts[-10:]):  # Last 10 artifacts
+        type_icon = {
+            "code": "πŸ’»",
+            "text": "πŸ“",
+            "diagram": "πŸ“Š",
+            "image": "πŸ–ΌοΈ"
+        }.get(artifact.type, "πŸ“„")
+
+        html += f"""
+        <div>
+            <b>{type_icon} {artifact.title}</b> (#{i})
+            <div>Type: {artifact.type} | Time: {time.ctime(artifact.timestamp)}</div>
+            <div>{artifact.content[:150]}{'...' if len(artifact.content) > 150 else ''}</div>
+        </div>
+        """
+    html += "</div>
" + return html + +def get_plain_text_response(history): + """Extract the latest bot response for plain text display""" + if not history: + return "## πŸ€– L.C.A.R.S Response\n\n*Awaiting your query...*" + + last_exchange = history[-1] + if len(last_exchange) >= 2 and last_exchange[1]: + return f"## πŸ€– L.C.A.R.S Response\n\n{last_exchange[1]}" + else: + return "## πŸ€– L.C.A.R.S Response\n\n*Processing...*" + +def execute_code_artifact(artifact_id, current_code): + """Execute a specific code artifact""" + try: + artifacts = agent.get_canvas_artifacts(agent.current_conversation) + if not artifacts: + return "No artifacts available", current_code + + try: + artifact_idx = int(artifact_id) + if 0 <= artifact_idx < len(artifacts): + artifact = artifacts[artifact_idx] + if artifact.type == "code": + # Return the code to display in the editor + display_text = f"## πŸ“‹ Loaded Artifact #{artifact_idx}\n\n**Title:** {artifact.title}\n\n**Code:**\n```python\n{artifact.content}\n```" + return display_text, artifact.content + else: + return f"❌ Artifact {artifact_idx} is not code (type: {artifact.type})", current_code + else: + return f"❌ Invalid artifact ID. Available: 0-{len(artifacts)-1}", current_code + except ValueError: + return "❌ Please enter a valid numeric artifact ID", current_code - with gr.TabItem(label="πŸ“Š Enhanced Session & Artifact Management"): - with gr.Row(): - # Session Management Column - with gr.Column(): - gr.HTML(f"
πŸ“œ SESSION MANAGEMENT
") - - session_status = gr.Textbox(label="Session Status", value="Ready", interactive=False) - - with gr.Row(): - session_name_input = gr.Textbox( - label="Session Name", - placeholder="Leave empty for auto-naming (NewSession1, NewSession2...)", - scale=2 - ) - merge_session_checkbox = gr.Checkbox( - label="Merge Mode", - value=False, - info="Merge with current session instead of replacing" - ) - - with gr.Row(): - save_session_btn = gr.Button("πŸ’Ύ Save Session", variant="huggingface") - load_session_btn = gr.Button("πŸ“‚ Load Session", variant="huggingface") - - session_dropdown = gr.Dropdown( - label="Available Sessions", - choices=["none"], - interactive=True, - info="Select session to load" - ) - - with gr.Row(): - load_all_sessions_btn = gr.Button("πŸ“š Load All Sessions", variant="huggingface") - refresh_sessions_btn = gr.Button("πŸ”„ Refresh Sessions", variant="huggingface") + except Exception as e: + return f"❌ Error: {str(e)}", current_code + +def create_code_artifact(code, description, language): + """Create a new code artifact""" + try: + if not code.strip(): + return "❌ No code provided", code + + agent.add_artifact( + conversation_id=agent.current_conversation, + artifact_type="code", + content=code, + title=description or f"Code_{len(agent.get_canvas_artifacts(agent.current_conversation))}", + metadata={"language": language, "description": description} + ) + + artifacts_count = len(agent.get_canvas_artifacts(agent.current_conversation)) + return f"βœ… Code artifact saved! Total artifacts: {artifacts_count}", code + + except Exception as e: + return f"❌ Error saving artifact: {str(e)}", code + +def clear_current_chat(): + """Clear the current conversation""" + agent.clear_conversation(agent.current_conversation) + empty_history = [] + status_msg = "βœ… Chat cleared" + plain_text = "## πŸ€– L.C.A.R.S Response\n\n*Chat cleared*" + chat_display = update_chat_display(empty_history) + artifacts_display = update_artifacts_display() + + return empty_history, plain_text, status_msg, chat_display, artifacts_display, "" + +def new_session(): + """Start a new session""" + agent.clear_conversation(agent.current_conversation) + agent.clear_canvas(agent.current_conversation) + + new_code = "# New L.C.A.R.S Session Started\nprint('πŸš€ Local Computer Advanced Reasoning System Online')\nprint('πŸ€– All systems nominal - Ready for collaboration')" + empty_history = [] + status_msg = "πŸ†• New session started" + plain_text = "## πŸ€– L.C.A.R.S Response\n\n*New session started*" + chat_display = update_chat_display(empty_history) + artifacts_display = update_artifacts_display() + + return empty_history, new_code, plain_text, status_msg, chat_display, artifacts_display, "" +def update_model_settings(base_url, api_key, model_id, temperature, max_tokens): + """Update agent model settings""" + try: + agent.base_url = base_url + agent.api_key = api_key + agent.model_id = model_id + agent.temperature = float(temperature) + agent.max_tokens = int(max_tokens) + + # Recreate client with new settings + agent.async_client = agent.CreateClient(base_url, api_key) + + return f"βœ… Model settings updated: {model_id} | Temp: {temperature} | Max tokens: {max_tokens}" + except Exception as e: + return f"❌ Error updating settings: {str(e)}" + +async def fetch_models(base_url, api_key): + """Fetch available models from the API""" + try: + models = await agent.fetch_available_models(base_url, api_key) + return gr.Dropdown(choices=models, value=models[0] if models else "") + except Exception as e: + print(f"Error 
fetching models: {e}") + return gr.Dropdown(choices=[], value="") + +# Create the Gradio interface +with gr.Blocks( + title="πŸš€ L.C.A.R.S - Local Computer Advanced Reasoning System", + theme='Yntec/HaleyCH_Theme_Orange_Green', + css=custom_css + ) as demo: + + # State management + history_state = gr.State([]) + with gr.Sidebar(label = "Settings"): + gr.HTML("
βš™οΈ MODEL SETTINGS
") - # Artifact Management Column - with gr.Column(): - gr.HTML(f"
🧱 ARTIFACT MANAGEMENT
") - - artifact_status = gr.Textbox(label="Artifact Status", value="Ready", interactive=False) - - with gr.Row(): - artifact_session_input = gr.Textbox( - label="Artifact Session Name", - placeholder="Leave empty to load all artifacts", - scale=2 - ) - merge_artifacts_checkbox = gr.Checkbox( - label="Merge Artifacts", - value=True, - info="Add to current artifacts instead of replacing" - ) - - with gr.Row(): - load_artifacts_btn = gr.Button("πŸ“‚ Load Artifacts", variant="huggingface") - #save_artifacts_btn = gr.Button("πŸ’Ύ Save Artifacts", variant="huggingface") - - artifact_dropdown = gr.Dropdown( - label="Available Artifact Files", - choices=["none"], - interactive=True, - info="Select artifact file to load" + with gr.Accordion("πŸ”§ Configuration", open=True): + base_url = gr.Textbox( + value=agent.base_url, + label="Base URL", + placeholder="http://localhost:1234/v1" ) - - with gr.Row(): - load_all_artifacts_btn = gr.Button("πŸ“š Load All Artifacts", variant="huggingface") - refresh_artifacts_btn = gr.Button("πŸ”„ Refresh Artifacts", variant="huggingface") - - - with gr.TabItem(label="πŸ“Š Session & Artifact Browser"): - with gr.Row(): - session_info = gr.JSON(label="Session Details", value=[], elem_classes=["metadata-display"]) - artifact_info = gr.JSON(label="Artifact Details", value=[], elem_classes=["metadata-display"]) - - - - - - - - - - with gr.Tab(label="πŸ“‚ Directory to JSON Extractor", elem_id="directory_extractor_tab"): - - def ExtractDirectoryToJson(directory_path="dump", extension='.txt', json_file_path="_Data.json"): - def extract_data_from_files(directory_path, Extension='.md', max_seq_length=2048): - import os - import json - Json_list = [] - - # Check if directory exists - if not os.path.exists(directory_path): - return f"Error: Directory '{directory_path}' does not exist." - - # Check if directory is empty - if not os.listdir(directory_path): - return f"Error: Directory '{directory_path}' is empty." - - # Iterate over each file in the directory - for file_name in os.listdir(directory_path): - if file_name.endswith(Extension): - file_path = os.path.join(directory_path, file_name) - try: - with open(file_path, 'r', encoding='utf-8') as file: - chunk = file.read() - chunks = [chunk[i:i+max_seq_length] for i in range(0, len(chunk), max_seq_length)] - for text_seg in chunks: - Json_list.append({'DocumentTitle': file_name, 'Text': text_seg}) - except Exception as e: - return f"Error reading file {file_name}: {str(e)}" - return Json_list - - def save_to_json(data, json_file): - import os - import json - try: - with open(json_file, 'w') as f: - json.dump(data, f, indent=4) - return True, None - except Exception as e: - return False, str(e) - - # Extract file contents - _data = extract_data_from_files(directory_path, Extension=extension) - - # Check if we got an error message instead of data - if isinstance(_data, str): - return _data - - # Save data to JSON file - success, error = save_to_json(_data, json_file_path) - if success: - return f"File extraction completed. JSON file saved to: {json_file_path}" - else: - return f"Error saving JSON file: {error}" - - # Gradio Interface - def process_directory(directory_path, extension, json_filename): - # Validate inputs - if not directory_path: - return "Please select a directory" - if not extension: - return "Please specify a file extension" - if not json_filename: - json_filename = "_Data.json" - - # Ensure extension starts with a dot - if not extension.startswith('.'): - extension = '.' 
-
-            # Gradio Interface
-            def process_directory(directory_path, extension, json_filename):
-                # Validate inputs
-                if not directory_path:
-                    return "Please select a directory"
-                if not extension:
-                    return "Please specify a file extension"
-                if not json_filename:
-                    json_filename = "_Data.json"
-
-                # Ensure extension starts with a dot
-                if not extension.startswith('.'):
-                    extension = '.' + extension
-
-                # Call the main function
-                result = ExtractDirectoryToJson(
-                    directory_path=directory_path,
-                    extension=extension,
-                    json_file_path=json_filename
-                )
-                return result
-
-            FileTypes = ['.txt', '.md', '.json', '.py']
-
-            gr.Markdown("## Directory to JSON Extractor")
-            gr.Markdown("Extract text from files in a directory and save as JSON")
-
-            with gr.Row():
-                directory_input = gr.Textbox(label="Directory Path", placeholder="Path to directory containing files")
-                directory_btn = gr.Button("Browse")
-
-            with gr.Row():
-                extension_input = gr.Dropdown(label="File Extension", choices=FileTypes, interactive=True)
-                json_output = gr.Textbox(label="Output JSON Filename", value="_Data.json")
-
-            submitProcessDirectory_btn = gr.Button("Process Directory")
-            output_text = gr.JSON(label="Output")
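+        # NOTE (editor): the removed tab above defines submitProcessDirectory_btn,
+        # but its .click() wiring never appears in this diff. For reference only, a
+        # hypothetical wiring consistent with the components defined above:
+        #
+        #   submitProcessDirectory_btn.click(
+        #       fn=process_directory,
+        #       inputs=[directory_input, extension_input, json_output],
+        #       outputs=[output_text],
+        #   )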
- ) - - gr.Markdown("## βš™οΈ Crawling Options") - - max_file_size = gr.Number( - label="Max File Size (MB)", - value=1, + model_id = gr.Dropdown( + value=agent.model_id, + label="Model", + choices=[agent.model_id], + allow_custom_value=True + ) + temperature = gr.Slider( + value=agent.temperature, minimum=0.1, - maximum=100 + maximum=2.0, + step=0.1, + label="Temperature" ) - - include_patterns = gr.Textbox( - label="Include Patterns (comma-separated)", - placeholder="*.py, *.js, *.md", - value="" + max_tokens = gr.Slider( + value=agent.max_tokens, + minimum=100, + maximum=10000, + step=100, + label="Max Tokens" ) - exclude_patterns = gr.Textbox( - label="Exclude Patterns (comma-separated)", - placeholder="*.pyc, __pycache__/*, .git/*", - value="" - ) + with gr.Row(): + update_settings_btn = gr.Button("πŸ”„ Update Settings", variant="primary") + fetch_models_btn = gr.Button("πŸ“‹ Fetch Models", variant="secondary") - use_relative_paths = gr.Checkbox( - label="Use Relative Paths", - value=True + # ============================================ + # HEADER SECTION + # ============================================ + with gr.Row(): + with gr.Column(scale=1): + gr.Image( + value="https://cdn-avatars.huggingface.co/v1/production/uploads/65d883893a52cd9bcd8ab7cf/tRsCJlHNZo1D02kBTmfy9.jpeg", + elem_id="lcars_logo", + height=200, + show_download_button=False, + container=False, + width=200 + ) + with gr.Column(scale=3): + gr.HTML(f""" +
+ πŸ–₯️ L.C.A.R.S - Local Computer Advanced Reasoning System +
USS Enterprise β€’ NCC-1701-D β€’ Starfleet Command +
+ """) + + # ============================================ + # MAIN INTERFACE TABS + # ============================================ + with gr.Tabs(): + + # ============================================ + # L.C.A.R.S MAIN CHAT TAB (Enhanced) + # ============================================ + with gr.TabItem(label="πŸ€– L.C.A.R.S Chat Intelligence", elem_id="lcars_main_tab"): + with gr.Row(): + # LEFT COLUMN - INPUT & CONTROLS + with gr.Column(scale=2): + gr.HTML("
+                        with gr.Accordion(label="🧠 AI Reasoning & Thinking", open=True):
+                            thinking_display = gr.Markdown(
+                                value="*AI reasoning will appear here during processing...*",
+                                label="Thought Process",
+                                show_label=True,
+                                height=200
                             )
-
-                    crawl_btn = gr.Button("πŸš€ Start Crawling", variant="primary", size="lg")

-                with gr.Column(scale=2):
-                    gr.Markdown("## πŸ“Š Results")
-
-                    results_summary = gr.Textbox(
-                        label="Crawling Summary",
-                        lines=8,
-                        interactive=False
-                    )
-
-                    file_list = gr.Dataframe(
-                        label="Files Found",
-                        headers=["File Path", "Size (chars)", "Type", "Lines"],
-                        interactive=False,
-                        wrap=True
-                    )
-
-        # File Browser Tab
-        with gr.Tab("πŸ“ File Browser"):
-            with gr.Row():
-                with gr.Column(scale=1):
-                    gr.Markdown("## πŸ“‚ File Selection")
-
-                    selected_file = gr.Dropdown(
-                        label="Select File",
-                        choices=[],
-                        interactive=True,
-                        allow_custom_value=True
+                        # Main chat input
+                        message = gr.Textbox(
+                            show_copy_button=True,
+                            lines=3,
+                            label="πŸ’¬ Ask L.C.A.R.S",
+                            placeholder="Enter your message to the Local Computer Advanced Reasoning System..."
+                        )
+
+                        # Control buttons
+                        with gr.Row():
+                            submit_btn = gr.Button("πŸš€ Ask L.C.A.R.S", variant="primary", size="lg")
+                            clear_btn = gr.Button("πŸ—‘οΈ Clear Chat", variant="secondary")
+                            new_session_btn = gr.Button("πŸ†• New Session", variant="secondary")
+
+                        # Audio controls
+                        with gr.Row():
+                            speak_response = gr.Checkbox(label="πŸ”Š Speak Response", value=False)
+
+                        # Quick Actions
+                        with gr.Accordion(label="⚑ Utility Quick Actions", open=False):
+                            with gr.Row():
+                                artifact_id_input = gr.Textbox(
+                                    label="Artifact ID",
+                                    placeholder="Artifact ID (0, 1, 2...)",
+                                    scale=2
+                                )
+                                execute_artifact_btn = gr.Button("πŸ“‚ Load Artifact", variant="primary")
+
+                    # MIDDLE COLUMN - RESPONSES
+                    with gr.Column(scale=2):
+                        gr.HTML("<div>SYSTEM RESPONSE</div>")
+
+                        with gr.Accordion(label="πŸ€– L.C.A.R.S Response", open=True):
+                            plain_text_output = gr.Markdown(
+                                value="## πŸ€– L.C.A.R.S Response\n\n*Awaiting your query...*",
+                                container=True,
+                                show_copy_button=True,
+                                label="AI Response",
+                                height=300
                             )
-
-                    load_btn = gr.Button("πŸ“– Load File", variant="secondary")
-                    save_btn = gr.Button("πŸ’Ύ Save Changes", variant="primary")
-
-                    save_status = gr.Textbox(
-                        label="Status",
-                        lines=2,
-                        interactive=False
+
+                            execution_output = gr.Markdown(
+                                value="*Execution results will appear here*",
+                                label="Execution Results",
+                                height=150
+                            )
+
+                            status_display = gr.Textbox(
+                                value="System ready",
+                                label="Status",
+                                interactive=False
+                            )
+
+                        gr.HTML("<div>Current Session</div>")
+
+                        # Enhanced Chat History Display
+                        with gr.Accordion(label="πŸ“œ Session Chat History", open=True):
+                            chat_history_display = gr.HTML(
+                                value="<div>No messages yet</div>",
+                                label="Full Session History",
+                                show_label=True
                             )
+
+                    # RIGHT COLUMN - ENHANCED CODE ARTIFACTS
+                    with gr.Column(scale=2):
+                        gr.HTML("<div>🧱 ENHANCED CODE ARTIFACTS WORKSHOP</div>")

-                with gr.Column(scale=3):
-                    gr.Markdown("## ✏️ File Editor")
-
-                    file_content = gr.Textbox(
-                        label="File Content",
-                        lines=25,
+                        with gr.Accordion(label="🧱 Code Artifacts Workshop", open=True):
+                            # Enhanced Code Editor with save functionality
+                            code_artifacts = gr.Code(
+                                language="python",
+                                label="Generated Code & Artifacts",
+                                lines=15,
                                 interactive=True,
-                        show_copy_button=True,
-                        placeholder="Select a file to view its content..."
+                                show_line_numbers=True,
+                                elem_id="code_editor",
+                                value="# Welcome to L.C.A.R.S Code Workshop\n# Write or generate code here\n\nprint('πŸš€ L.C.A.R.S Code Workshop Active')"
                             )
-
-    # Export Tab
-    with gr.Tab("πŸ“€ Export"):
-        with gr.Row():
-            with gr.Column():
-                gr.Markdown("## πŸ’Ύ Export Options")
-                export_format = gr.Dropdown(
-                    choices=["JSON", "File List", "Summary Report"],
-                    label="Export Format",
-                    value="JSON"
-                )
+                            # Enhanced Artifact Controls
+                            with gr.Row():
+                                artifact_description = gr.Textbox(
+                                    label="Artifact Description",
+                                    placeholder="Brief description of the code...",
+                                    scale=2
+                                )
+                                artifact_language = gr.Dropdown(
+                                    choices=["python", "javascript", "html", "css", "bash", "sql", "json"],
+                                    value="python",
+                                    label="Language",
+                                    scale=1
+                                )

-                export_btn = gr.Button("πŸ“‹ Generate Export", variant="primary")
+                            with gr.Row():
+                                execute_code_btn = gr.Button("▢️ Execute Code", variant="primary")
+                                create_artifact_btn = gr.Button("πŸ’Ύ Save Artifact", variant="primary")

-                export_output = gr.Textbox(
-                    label="Export Output",
-                    lines=20,
-                    show_copy_button=True,
-                    interactive=False
+                        # Artifacts Display
+                        with gr.Accordion(label="πŸ“Š Current Session Artifacts", open=True):
+                            artifacts_display = gr.HTML(
+                                value="<div>No artifacts generated yet</div>",
+                                label="Generated Artifacts Timeline",
+                                show_label=True
                             )
-    with gr.Tab("🧠 Task Planning"):
-
-        with gr.Column(scale=1):
-
-            with gr.Column():
-                with gr.Row():
-                    sub_task_output = gr.Textbox(lines=4, show_label=True, container=False, label="Sub Tasks")
-                gr.HTML(f"<div>Task Strategy and Planning</div>")
-                with gr.Accordion("Task Graph", open=False):
-                    task_graph_img = gr.Image(label="Task Reasoning Graph")
-                with gr.Row():
-                    graph_btn = gr.Button("Visualize Task Graph", variant="huggingface")
-            with gr.Row():
-                with gr.Column():
-                    task_input = gr.Textbox(lines=10, label="Enter Task Description", placeholder="Write a BPE Tokenizer in VB.NET")
-
-                with gr.Column():
-                    gr.HTML(f"<div>Generated Code</div>")
-
-                    with gr.Accordion("Generated Code", open=False):
-
-                        task_code_output = gr.Code(show_label=True, container=True, label="Task Code Generated", language='python')
-
-            with gr.Row():
-                complexity_btn = gr.Button("Analyze Complexity", variant="huggingface")
-                decompose_btn = gr.Button("Decompose Task", variant="huggingface")
-                workflow_btn = gr.Button("Generate Workflow", variant="huggingface")
-            with gr.Row():
-                GeneratePlan_btn = gr.Button("Generate plan", variant="huggingface")
-                GenerateTaskCode_btn = gr.Button("Generate code", variant="huggingface")
-
-            with gr.Row():
-
-                with gr.Tabs():
-
-                    with gr.Tab("Complexity"):
-                        gr.HTML(f"<div>Task Complexity</div>")
-
-                        complexity_output = gr.Markdown(show_label=True, max_height=600, container=True, show_copy_button=True, label="Task Complexity")
-
-                    with gr.Tab("Planning"):
-                        gr.HTML(f"<div>Sub Task Planning</div>")
-
-                        decompose_output = gr.Markdown(show_label=True, container=True, show_copy_button=True, label="Task Analysis")
-
-                    with gr.Tab("WorkFlow"):
-                        gr.HTML(f"<div>Task Work-Flow</div>")
-
-                        workflow_output = gr.Markdown(show_label=True, container=True, label="Task WorkFlow")
-
-    # Footer
-    gr.HTML(f"""
-    <div>
-        πŸš€ L.C.A.R.S - Enhanced Local Computer Advanced Reasoning System v3.0 β€’ Starfleet Command β€’ Computer Core Online
-    </div>
-    """)
-
-
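+    # NOTE (editor): fetch_models and update_model_settings are wired below but not
+    # defined in this diff. Minimal sketches, assuming the OpenAI-compatible
+    # /v1/models endpoint exposed by the local server; error handling and return
+    # shapes are assumptions, not the author's implementation:
+    def fetch_models_sketch(base_url: str, api_key: str):
+        """Query the server for available model IDs and refresh the dropdown."""
+        try:
+            models = OpenAI(base_url=base_url, api_key=api_key).models.list()
+            ids = [m.id for m in models.data]
+            return gr.update(choices=ids, value=ids[0] if ids else None)
+        except Exception as e:
+            console.log(f"[red]Could not fetch models: {e}[/red]")
+            return gr.update()
+
+    def update_model_settings_sketch(base_url, api_key, model_id, temperature, max_tokens):
+        """Push the UI settings onto the agent and rebuild its client."""
+        agent.base_url, agent.api_key = base_url, api_key
+        agent.model_id = model_id
+        agent.temperature, agent.max_tokens = temperature, max_tokens
+        agent.async_client = agent.CreateClient(base_url, api_key)
+        return f"βœ… Settings updated: {model_id} @ {base_url}"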
- """) - - + # ============================================ + # EVENT HANDLERS - WITH PARSED RESPONSE SUPPORT + # ============================================ + + # Main chat functionality + def handle_message(message, history, speak_response): + # Process the message + cleaned_message, new_history, status_msg, artifacts, thinking = process_lcars_message(message, history, speak_response) + + # Update all displays + plain_text = get_plain_text_response(new_history) + chat_display = update_chat_display(new_history) + artifacts_html = update_artifacts_display() + + # Format thinking for display + thinking_display_content = f"## 🧠 AI Reasoning\n\n{thinking}" if thinking else "*No reasoning content extracted*" + + # Return in correct order for outputs + return cleaned_message, new_history, plain_text, status_msg, chat_display, artifacts_html, thinking_display_content + + submit_btn.click( + fn=handle_message, + inputs=[message, history_state, speak_response], + outputs=[ + message, # 0 - cleaned message input + history_state, # 1 - updated history state + plain_text_output, # 2 - markdown response (string) + status_display, # 3 - status message (string) + chat_history_display, # 4 - HTML display + artifacts_display, # 5 - HTML display + thinking_display # 6 - thinking markdown + ] + ) + + message.submit( + fn=handle_message, + inputs=[message, history_state, speak_response], + outputs=[ + message, + history_state, + plain_text_output, + status_display, + chat_history_display, + artifacts_display, + thinking_display + ] + ) + + # Clear chat + clear_btn.click( + fn=clear_current_chat, + outputs=[ + history_state, # 0 - empty history list + plain_text_output, # 1 - markdown string + status_display, # 2 - status string + chat_history_display, # 3 - HTML string + artifacts_display, # 4 - HTML string + thinking_display # 5 - thinking markdown + ] + ) + + # New session + new_session_btn.click( + fn=new_session, + outputs=[ + history_state, # 0 - empty history list + code_artifacts, # 1 - code string + plain_text_output, # 2 - markdown string + status_display, # 3 - status string + chat_history_display, # 4 - HTML string + artifacts_display, # 5 - HTML string + thinking_display # 6 - thinking markdown + ] + ) + + # Artifact operations + create_artifact_btn.click( + fn=create_code_artifact, + inputs=[code_artifacts, artifact_description, artifact_language], + outputs=[execution_output, code_artifacts] + ) + + execute_artifact_btn.click( + fn=execute_code_artifact, + inputs=[artifact_id_input, code_artifacts], + outputs=[execution_output, code_artifacts] + ) + execute_code_btn.click( + fn=execute_current_code, + inputs=[code_artifacts], + outputs=[execution_output, code_artifacts] + ) + + # Model settings + update_settings_btn.click( + fn=update_model_settings, + inputs=[base_url, api_key, model_id, temperature, max_tokens], + outputs=[status_display] + ) + + fetch_models_btn.click( + fn=fetch_models, + inputs=[base_url, api_key], + outputs=[model_id] + ) if __name__ == "__main__": - demo.launch() + # Start the agent + agent.start() + print("πŸš€ L.C.A.R.S Agent Started!") + print(f"πŸ€– Model: {agent.model_id}") + print(f"πŸ”— Base URL: {agent.base_url}") + print(f"πŸ’¬ Default Conversation: {agent.current_conversation}") + + # Launch the interface + demo.launch(share=True, server_name="0.0.0.0", server_port=7860) \ No newline at end of file