""" Chainlit UI for CodePilot Multi-Agent System This provides a chat interface showing detailed agent workflow: - Planner creates implementation plans - Coder writes code, uploads to sandbox, runs tests - Reviewer checks and approves code User can see every step in real-time. """ import chainlit as cl import os import sys import io from contextlib import redirect_stdout, redirect_stderr import asyncio from concurrent.futures import ThreadPoolExecutor # ============================================================ # STARTUP VERSION CHECK - Change this to detect if rebuild worked # ============================================================ APP_VERSION = "3.2.1-coder-max8" BUILD_ID = "2024-12-19-v7" print("=" * 60) print(f"[STARTUP] CodePilot Chainlit App") print(f"[STARTUP] APP_VERSION: {APP_VERSION}") print(f"[STARTUP] BUILD_ID: {BUILD_ID}") print("=" * 60) # ============================================================ # Import full context tools (embeddings + BM25) - requires 16GB+ RAM from codepilot.tools.context_tools import index_codebase # Import orchestrator from codepilot.agents.orchestrator import Orchestrator, ORCHESTRATOR_VERSION # Print orchestrator version for debugging print(f"[STARTUP] ORCHESTRATOR_VERSION: {ORCHESTRATOR_VERSION}") # Import GitHub tools for repo cloning from codepilot.tools.github_tools import ( extract_github_url, clone_repository, get_repo_info, cleanup_repository ) # Authentication disabled for now - uncomment to enable password protection # @cl.password_auth_callback # def auth_callback(username: str, password: str): # """ # Simple password authentication for CodePilot. # # For production, use environment variables and proper password hashing. # """ # # Get password from environment variable (more secure) # required_password = os.getenv('CHAINLIT_PASSWORD', 'codepilot2024') # # # In production, you should hash passwords and use a proper auth system # if password == required_password: # return cl.User( # identifier=username, # metadata={"role": "user", "provider": "credentials"} # ) # return None @cl.on_chat_start async def start(): """Initialize the agent system when chat starts.""" print("[CHAINLIT] on_chat_start triggered") # Debug log await cl.Message( content=f"# CodePilot - Autonomous AI Coding Agent\n\n" f"**Version:** `{APP_VERSION}` | **Build:** `{BUILD_ID}`\n\n" "I can help you write code, fix bugs, and implement features!\n\n" "**How to use:**\n" "1. Paste a **public GitHub URL** and I'll clone and analyze it\n" "2. Tell me what you want to build or fix\n" "3. Watch my agents (Planner > Coder > Reviewer) work!\n\n" "**Example:**\n" "```\nAnalyze https://github.com/user/repo and add error handling to the API endpoints\n```\n\n" "**Ready!** Paste a GitHub URL or describe your task." ).send() print("[CHAINLIT] Welcome message sent") # Debug log # Initialize session variables cl.user_session.set("repo_path", None) cl.user_session.set("repo_info", None) # Skip self-indexing - agents will only work with cloned GitHub repos # Create orchestrator and mark as ready cl.user_session.set("orchestrator", Orchestrator(max_iterations=3)) cl.user_session.set("ready", True) print("[CHAINLIT] Orchestrator created, ready for GitHub repos") @cl.on_chat_end async def end(): """Cleanup when chat ends.""" # Clean up any cloned repositories repo_path = cl.user_session.get("repo_path") if repo_path: print(f"[CHAINLIT] Cleaning up repo: {repo_path}") cleanup_repository(repo_path) @cl.on_message async def main(message: cl.Message): """Handle user messages and run the agent workflow.""" # Check if ready if not cl.user_session.get("ready"): await cl.Message(content="System is still initializing, please wait...").send() return # Get orchestrator orchestrator: Orchestrator = cl.user_session.get("orchestrator") # Check for GitHub URL in message github_url = extract_github_url(message.content) task_context = "" if github_url: # Clone the repository clone_msg = await cl.Message(content=f"Cloning repository: `{github_url}`...").send() success, result, repo_name = clone_repository(github_url) if success: repo_path = result repo_info = get_repo_info(repo_path) # Store in session cl.user_session.set("repo_path", repo_path) cl.user_session.set("repo_info", repo_info) # Index the repository for search (full BM25 + embeddings) try: index_result = index_codebase(repo_path) print(f"[CHAINLIT] Repository indexed: {index_result}") except Exception as e: print(f"[CHAINLIT] Indexing failed (non-critical): {e}") # Create context for the task (limited to avoid token overflow) languages = ", ".join(repo_info["languages"][:5]) if repo_info["languages"] else "Unknown" # Only include first 20 files to keep context small sample_files = repo_info["files"][:20] if repo_info["files"] else [] files_preview = "\n".join(f" - {f}" for f in sample_files) if len(repo_info["files"]) > 20: files_preview += f"\n ... and {len(repo_info['files']) - 20} more files" task_context = f""" [REPOSITORY CONTEXT] Repository: {repo_name} Path: {repo_path} Total Files: {repo_info['total_files']} Languages: {languages} Sample Files: {files_preview} AVAILABLE TOOLS: - search_repository: Search this cloned repository using BM25 keyword matching (use this to find functions, classes, or code patterns in the Flask repo) - read_file: Read a specific file (use full path: {repo_path}/filename.py) - search_code: Grep for exact pattern matches in the repository """ # Update clone message clone_msg.content = f"**Repository cloned successfully!**\n\n" \ f"- **Name:** {repo_name}\n" \ f"- **Files:** {repo_info['total_files']}\n" \ f"- **Languages:** {languages}\n" \ f"- **Path:** `{repo_path}`" await clone_msg.update() else: # Clone failed clone_msg.content = f"**Failed to clone repository**\n\n{result}\n\n" \ f"Make sure the repository is public and the URL is correct." await clone_msg.update() return # Check if we have a repo from previous message elif cl.user_session.get("repo_path"): repo_path = cl.user_session.get("repo_path") repo_info = cl.user_session.get("repo_info") if repo_info: languages = ", ".join(repo_info["languages"][:5]) if repo_info["languages"] else "Unknown" task_context = f""" [REPOSITORY CONTEXT] Repository: {repo_info['name']} Path: {repo_path} Total Files: {repo_info['total_files']} Languages: {languages} AVAILABLE TOOLS: - search_repository: Search this cloned repository using BM25 keyword matching (use this to find functions, classes, or code patterns in the Flask repo) - read_file: Read a specific file (use full path: {repo_path}/filename.py) - search_code: Grep for exact pattern matches in the repository """ # Prepare the full task with context # Remove the GitHub URL from the message to get just the user's query user_query = message.content print(f"[DEBUG] Original message.content: '{message.content}'") print(f"[DEBUG] GitHub URL found: '{github_url}'") if github_url: # Remove the URL from the message to get the actual task import re user_query = re.sub(r'https?://github\.com/[^\s]+', '', user_query).strip() print(f"[DEBUG] After URL removal: '{user_query}'") full_task = task_context + "\n\n" + user_query if task_context else user_query print(f"[DEBUG] task_context exists: {bool(task_context)}") print(f"[DEBUG] task_context length: {len(task_context) if task_context else 0}") print(f"[DEBUG] Final user_query: '{user_query}'") print(f"[DEBUG] Full task (first 500 chars): '{full_task[:500]}...'") # Create a message for streaming logs log_msg = cl.Message(content="") await log_msg.send() try: # Capture stdout/stderr to stream logs captured_output = io.StringIO() def run_orchestrator(): """Run orchestrator in thread and capture output.""" try: with redirect_stdout(captured_output), redirect_stderr(captured_output): return orchestrator.run(full_task) except Exception as e: # Capture any exceptions from orchestrator print(f"Error in orchestrator: {str(e)}") import traceback traceback.print_exc() raise # Run in thread pool to avoid blocking loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) # Start the orchestrator in background future = loop.run_in_executor(executor, run_orchestrator) # Track API usage total_prompt_tokens = 0 total_completion_tokens = 0 total_tokens = 0 seen_token_lines = set() # Track which token lines we've already counted # Stream logs while orchestrator is running - FILTERED accumulated_logs = "" while not future.done(): await asyncio.sleep(0.5) # Check every 500ms # Get new output current_output = captured_output.getvalue() if current_output != accumulated_logs: accumulated_logs = current_output # Filter logs to show only important lines filtered_lines = [] for line in accumulated_logs.split('\n'): # Extract token usage before filtering (only count each line once!) if 'Tokens:' in line and line not in seen_token_lines: seen_token_lines.add(line) # Mark as counted try: # Parse: "Tokens: 505 prompt + 20 completion = 525 total" parts = line.split('Tokens:')[1].strip() prompt = int(parts.split('prompt')[0].strip()) completion = int(parts.split('+')[1].split('completion')[0].strip()) total_prompt_tokens += prompt total_completion_tokens += completion total_tokens += (prompt + completion) except: pass # Skip token counts, progress bars, and verbose details if any(skip in line for skip in ['Tokens:', 'Batches:', '|##', 'it/s]']): continue # Keep important lines if any(keep in line for keep in [ '[CLASSIFIER]', '[ORCHESTRATOR]', '[PLANNER]', '[CODER]', '[REVIEWER]', '[EXPLORER]', 'Calling tool:', 'Tool', 'Transitioning', 'APPROVED', 'REJECTED', '[GITHUB]', 'Cloning', 'Repository' ]): filtered_lines.append(line) filtered_output = '\n'.join(filtered_lines) # Calculate cost (Claude Sonnet 4.5 pricing: $3/1M input, $15/1M output) input_cost = (total_prompt_tokens / 1000000) * 3.0 output_cost = (total_completion_tokens / 1000000) * 15.0 total_cost = input_cost + output_cost # Add usage summary to logs usage_summary = f"\n\nCREDITS USED:\n" usage_summary += f" Input: {total_prompt_tokens:,} tokens (${input_cost:.4f})\n" usage_summary += f" Output: {total_completion_tokens:,} tokens (${output_cost:.4f})\n" usage_summary += f" Total: {total_tokens:,} tokens (${total_cost:.4f})" # Update message with filtered logs + usage log_msg.content = f"```\n{filtered_output}\n{usage_summary}\n```" await log_msg.update() # Get final result result = await future # Get final logs final_logs = captured_output.getvalue() # Update with final logs log_msg.content = f"## Execution Log\n```\n{final_logs}\n```" await log_msg.update() # Send results summary summary_lines = [] if result.get('plan'): summary_lines.append("## Planner") summary_lines.append(f"Plan created ({len(result['plan'])} chars)\n") if result.get('code_changes'): summary_lines.append("## Coder") summary_lines.append(f"Created {len(result['code_changes'])} file(s):") for file_path in result['code_changes'].keys(): summary_lines.append(f" - {file_path}") summary_lines.append("") if result.get('review_feedback'): summary_lines.append("## Reviewer") if result.get('success'): summary_lines.append("Code approved") else: summary_lines.append("Needs revision") summary_lines.append("") summary_lines.append("## Result") if result.get('success'): summary_lines.append(f"**Success** (Iterations: {result.get('iterations', 'N/A')})") else: summary_lines.append(f"**Incomplete** (Iterations: {result.get('iterations', 'N/A')})") # Add final cost summary (Claude Sonnet 4.5 pricing: $3/1M input, $15/1M output) summary_lines.append("\n## API Credits Used (Claude Sonnet 4.5)") summary_lines.append(f"**Total Tokens:** {total_tokens:,}") summary_lines.append(f"- Input: {total_prompt_tokens:,} tokens (${(total_prompt_tokens/1000000)*3.0:.4f})") summary_lines.append(f"- Output: {total_completion_tokens:,} tokens (${(total_completion_tokens/1000000)*15.0:.4f})") summary_lines.append(f"\n**Estimated Cost:** ${total_cost:.4f}") await cl.Message(content="\n".join(summary_lines)).send() except Exception as e: # Determine error type and provide specific guidance error_message = str(e) error_type = type(e).__name__ if "rate_limit" in error_message.lower() or "429" in error_message: user_message = f"""## Rate Limit Reached Claude API rate limit exceeded. This happens when too many requests are made in a short time. **What to do:** - Wait a few minutes and try again - Reduce max_iterations (currently: {orchestrator.max_iterations}) - Your request will work once the rate limit resets **Error details:** ``` {error_message} ``` """ elif "insufficient_quota" in error_message.lower() or "credit" in error_message.lower(): user_message = f"""## API Credits Exhausted Your Anthropic API credits have been exhausted. **What to do:** - Add credits to your Anthropic account at https://console.anthropic.com/settings/billing - Check your usage at https://console.anthropic.com/settings/usage - Current model: Claude Sonnet 4.5 (~$0.20 per task) **Error details:** ``` {error_message} ``` """ elif "api_key" in error_message.lower() or "authentication" in error_message.lower(): user_message = f"""## API Key Error There's an issue with your Anthropic API key. **What to do:** - Verify your ANTHROPIC_API_KEY in .env file - Check that the key is valid at https://console.anthropic.com/settings/keys - Restart the application after updating .env **Error details:** ``` {error_message} ``` """ elif "timeout" in error_message.lower(): user_message = f"""## Request Timeout The operation took too long and timed out. **What to do:** - Try again with a simpler task - The task may be too complex for one iteration - Consider breaking it into smaller steps **Error details:** ``` {error_message} ``` """ else: # Generic error with helpful context user_message = f"""## Error Occurred An unexpected error occurred during execution. **Error type:** {error_type} **What to do:** - Try rephrasing your request - Check if all required files/dependencies exist - Verify your .env file has all required API keys **Error details:** ``` {error_message} ``` If this persists, please report the issue with the error details above. """ await cl.Message(content=user_message).send() if __name__ == "__main__": import sys sys.exit("Run with: chainlit run chainlit_app.py")