# codepilot / chainlit_app.py
# Last commit (454104f): v3.2.1 - Reduce Coder max iterations from 15 to 8
"""
Chainlit UI for CodePilot Multi-Agent System
This provides a chat interface showing detailed agent workflow:
- Planner creates implementation plans
- Coder writes code, uploads to sandbox, runs tests
- Reviewer checks and approves code
User can see every step in real-time.
"""
import chainlit as cl
import os
import sys
import io
from contextlib import redirect_stdout, redirect_stderr
import asyncio
from concurrent.futures import ThreadPoolExecutor
# ============================================================
# STARTUP VERSION CHECK - Change this to detect if rebuild worked
# ============================================================
# Bump BUILD_ID on each deploy; if the banner below still shows the old
# value, the container was not rebuilt with the new code.
APP_VERSION: str = "3.2.1-coder-max8"
BUILD_ID: str = "2024-12-19-v7"
# Printed at import time so deployment logs show exactly which build is live.
print("=" * 60)
print(f"[STARTUP] CodePilot Chainlit App")
print(f"[STARTUP] APP_VERSION: {APP_VERSION}")
print(f"[STARTUP] BUILD_ID: {BUILD_ID}")
print("=" * 60)
# ============================================================
# Import full context tools (embeddings + BM25) - requires 16GB+ RAM
from codepilot.tools.context_tools import index_codebase
# Import orchestrator (Planner -> Coder -> Reviewer loop)
from codepilot.agents.orchestrator import Orchestrator, ORCHESTRATOR_VERSION
# Print orchestrator version for debugging (confirms which agent code shipped)
print(f"[STARTUP] ORCHESTRATOR_VERSION: {ORCHESTRATOR_VERSION}")
# Import GitHub tools for repo cloning, URL extraction, metadata, and cleanup
from codepilot.tools.github_tools import (
    extract_github_url,
    clone_repository,
    get_repo_info,
    cleanup_repository
)
# Authentication disabled for now - uncomment to enable password protection
# @cl.password_auth_callback
# def auth_callback(username: str, password: str):
# """
# Simple password authentication for CodePilot.
#
# For production, use environment variables and proper password hashing.
# """
# # Get password from environment variable (more secure)
# required_password = os.getenv('CHAINLIT_PASSWORD', 'codepilot2024')
#
# # In production, you should hash passwords and use a proper auth system
# if password == required_password:
# return cl.User(
# identifier=username,
# metadata={"role": "user", "provider": "credentials"}
# )
# return None
@cl.on_chat_start
async def start():
    """Set up a fresh chat session: greet the user, reset repository state,
    and create the orchestrator that will drive the agent workflow."""
    print("[CHAINLIT] on_chat_start triggered")  # Debug log

    # Build the welcome banner once, then send it.
    welcome = (
        f"# CodePilot - Autonomous AI Coding Agent\n\n"
        f"**Version:** `{APP_VERSION}` | **Build:** `{BUILD_ID}`\n\n"
        "I can help you write code, fix bugs, and implement features!\n\n"
        "**How to use:**\n"
        "1. Paste a **public GitHub URL** and I'll clone and analyze it\n"
        "2. Tell me what you want to build or fix\n"
        "3. Watch my agents (Planner > Coder > Reviewer) work!\n\n"
        "**Example:**\n"
        "```\nAnalyze https://github.com/user/repo and add error handling to the API endpoints\n```\n\n"
        "**Ready!** Paste a GitHub URL or describe your task."
    )
    await cl.Message(content=welcome).send()
    print("[CHAINLIT] Welcome message sent")  # Debug log

    # No repository yet; these are populated once the user pastes a GitHub URL.
    for key in ("repo_path", "repo_info"):
        cl.user_session.set(key, None)

    # Skip self-indexing - agents will only work with cloned GitHub repos.
    # Create the per-session orchestrator and flag the session as ready.
    cl.user_session.set("orchestrator", Orchestrator(max_iterations=3))
    cl.user_session.set("ready", True)
    print("[CHAINLIT] Orchestrator created, ready for GitHub repos")
@cl.on_chat_end
async def end():
    """Release per-session resources when the chat closes."""
    # Guard clause: nothing to do unless a repository was cloned this session.
    cloned_path = cl.user_session.get("repo_path")
    if not cloned_path:
        return
    print(f"[CHAINLIT] Cleaning up repo: {cloned_path}")
    cleanup_repository(cloned_path)
def _error_guidance(error: Exception, max_iterations: int) -> str:
    """Translate an exception from the agent workflow into user-facing markdown.

    Args:
        error: The exception caught around the orchestrator run.
        max_iterations: The orchestrator's configured iteration cap (shown in
            the rate-limit guidance).

    Returns:
        A markdown message with a diagnosis and concrete next steps.
    """
    error_message = str(error)
    error_type = type(error).__name__
    lowered = error_message.lower()
    if "rate_limit" in lowered or "429" in error_message:
        return f"""## Rate Limit Reached
Claude API rate limit exceeded. This happens when too many requests are made in a short time.
**What to do:**
- Wait a few minutes and try again
- Reduce max_iterations (currently: {max_iterations})
- Your request will work once the rate limit resets
**Error details:**
```
{error_message}
```
"""
    if "insufficient_quota" in lowered or "credit" in lowered:
        return f"""## API Credits Exhausted
Your Anthropic API credits have been exhausted.
**What to do:**
- Add credits to your Anthropic account at https://console.anthropic.com/settings/billing
- Check your usage at https://console.anthropic.com/settings/usage
- Current model: Claude Sonnet 4.5 (~$0.20 per task)
**Error details:**
```
{error_message}
```
"""
    if "api_key" in lowered or "authentication" in lowered:
        return f"""## API Key Error
There's an issue with your Anthropic API key.
**What to do:**
- Verify your ANTHROPIC_API_KEY in .env file
- Check that the key is valid at https://console.anthropic.com/settings/keys
- Restart the application after updating .env
**Error details:**
```
{error_message}
```
"""
    if "timeout" in lowered:
        return f"""## Request Timeout
The operation took too long and timed out.
**What to do:**
- Try again with a simpler task
- The task may be too complex for one iteration
- Consider breaking it into smaller steps
**Error details:**
```
{error_message}
```
"""
    # Generic error with helpful context
    return f"""## Error Occurred
An unexpected error occurred during execution.
**Error type:** {error_type}
**What to do:**
- Try rephrasing your request
- Check if all required files/dependencies exist
- Verify your .env file has all required API keys
**Error details:**
```
{error_message}
```
If this persists, please report the issue with the error details above.
"""


@cl.on_message
async def main(message: cl.Message):
    """Handle a user message end-to-end.

    If the message contains a GitHub URL, clone and index that repository
    and inject a [REPOSITORY CONTEXT] block into the task. Then run the
    Planner -> Coder -> Reviewer orchestrator in a worker thread, streaming
    filtered logs (with token/cost accounting) into a single updating UI
    message, and finally post a result summary or actionable error guidance.
    """
    # Refuse work until on_chat_start has finished setting up the session.
    if not cl.user_session.get("ready"):
        await cl.Message(content="System is still initializing, please wait...").send()
        return
    orchestrator: Orchestrator = cl.user_session.get("orchestrator")

    # --- Repository handling -------------------------------------------------
    github_url = extract_github_url(message.content)
    task_context = ""
    if github_url:
        clone_msg = await cl.Message(content=f"Cloning repository: `{github_url}`...").send()
        success, result, repo_name = clone_repository(github_url)
        if not success:
            # Clone failed: explain and stop; `result` carries the error text.
            clone_msg.content = f"**Failed to clone repository**\n\n{result}\n\n" \
                                f"Make sure the repository is public and the URL is correct."
            await clone_msg.update()
            return
        repo_path = result
        repo_info = get_repo_info(repo_path)
        # Remember the repo for follow-up messages in this session.
        cl.user_session.set("repo_path", repo_path)
        cl.user_session.set("repo_info", repo_info)
        # Index for BM25 + embedding search; the agents can still read/grep
        # files directly, so an indexing failure is deliberately non-fatal.
        try:
            index_result = index_codebase(repo_path)
            print(f"[CHAINLIT] Repository indexed: {index_result}")
        except Exception as e:
            print(f"[CHAINLIT] Indexing failed (non-critical): {e}")
        languages = ", ".join(repo_info["languages"][:5]) if repo_info["languages"] else "Unknown"
        # Only include the first 20 files to keep the prompt small.
        sample_files = repo_info["files"][:20] if repo_info["files"] else []
        files_preview = "\n".join(f" - {f}" for f in sample_files)
        if len(repo_info["files"]) > 20:
            files_preview += f"\n ... and {len(repo_info['files']) - 20} more files"
        # FIX: the tool description used to say "the Flask repo" no matter
        # what was cloned; describe the cloned repository generically.
        task_context = f"""
[REPOSITORY CONTEXT]
Repository: {repo_name}
Path: {repo_path}
Total Files: {repo_info['total_files']}
Languages: {languages}
Sample Files:
{files_preview}
AVAILABLE TOOLS:
- search_repository: Search this cloned repository using BM25 keyword matching (use this to find functions, classes, or code patterns in the cloned repo)
- read_file: Read a specific file (use full path: {repo_path}/filename.py)
- search_code: Grep for exact pattern matches in the repository
"""
        clone_msg.content = f"**Repository cloned successfully!**\n\n" \
                            f"- **Name:** {repo_name}\n" \
                            f"- **Files:** {repo_info['total_files']}\n" \
                            f"- **Languages:** {languages}\n" \
                            f"- **Path:** `{repo_path}`"
        await clone_msg.update()
    elif cl.user_session.get("repo_path"):
        # Reuse the repository cloned by a previous message in this session.
        repo_path = cl.user_session.get("repo_path")
        repo_info = cl.user_session.get("repo_info")
        if repo_info:
            languages = ", ".join(repo_info["languages"][:5]) if repo_info["languages"] else "Unknown"
            task_context = f"""
[REPOSITORY CONTEXT]
Repository: {repo_info['name']}
Path: {repo_path}
Total Files: {repo_info['total_files']}
Languages: {languages}
AVAILABLE TOOLS:
- search_repository: Search this cloned repository using BM25 keyword matching (use this to find functions, classes, or code patterns in the cloned repo)
- read_file: Read a specific file (use full path: {repo_path}/filename.py)
- search_code: Grep for exact pattern matches in the repository
"""

    # --- Task assembly -------------------------------------------------------
    user_query = message.content
    print(f"[DEBUG] Original message.content: '{message.content}'")
    print(f"[DEBUG] GitHub URL found: '{github_url}'")
    if github_url:
        # Strip the URL so the agents receive only the actual instruction.
        import re
        user_query = re.sub(r'https?://github\.com/[^\s]+', '', user_query).strip()
        print(f"[DEBUG] After URL removal: '{user_query}'")
    full_task = (task_context + "\n\n" + user_query) if task_context else user_query
    print(f"[DEBUG] task_context exists: {bool(task_context)}")
    print(f"[DEBUG] task_context length: {len(task_context) if task_context else 0}")
    print(f"[DEBUG] Final user_query: '{user_query}'")
    print(f"[DEBUG] Full task (first 500 chars): '{full_task[:500]}...'")

    # Placeholder message updated in place with streamed logs.
    log_msg = cl.Message(content="")
    await log_msg.send()
    try:
        captured_output = io.StringIO()

        def run_orchestrator():
            """Run the orchestrator in a worker thread, capturing its output."""
            try:
                with redirect_stdout(captured_output), redirect_stderr(captured_output):
                    return orchestrator.run(full_task)
            except Exception as e:
                # Surface the traceback in the captured log, then re-raise so
                # the awaiting coroutine sees the failure.
                print(f"Error in orchestrator: {str(e)}")
                import traceback
                traceback.print_exc()
                raise

        # Token accounting parsed from lines shaped like
        # "Tokens: 505 prompt + 20 completion = 525 total".
        total_prompt_tokens = 0
        total_completion_tokens = 0
        total_tokens = 0
        seen_token_lines = set()  # each distinct token line is counted once

        def ingest_token_lines(text):
            """Accumulate usage from any not-yet-counted 'Tokens:' lines."""
            nonlocal total_prompt_tokens, total_completion_tokens, total_tokens
            for line in text.split('\n'):
                if 'Tokens:' not in line or line in seen_token_lines:
                    continue
                seen_token_lines.add(line)  # mark as counted
                try:
                    parts = line.split('Tokens:')[1].strip()
                    prompt = int(parts.split('prompt')[0].strip())
                    completion = int(parts.split('+')[1].split('completion')[0].strip())
                except (ValueError, IndexError):
                    # FIX: was a bare `except:`; a malformed line is skipped
                    # instead of silently swallowing unrelated errors.
                    continue
                total_prompt_tokens += prompt
                total_completion_tokens += completion
                total_tokens += prompt + completion

        def filter_log_lines(text):
            """Keep only agent/tool progress lines; drop token counts and progress bars."""
            kept = []
            for line in text.split('\n'):
                if any(skip in line for skip in ['Tokens:', 'Batches:', '|##', 'it/s]']):
                    continue
                if any(keep in line for keep in [
                    '[CLASSIFIER]', '[ORCHESTRATOR]', '[PLANNER]', '[CODER]', '[REVIEWER]',
                    '[EXPLORER]', 'Calling tool:', 'Tool', 'Transitioning', 'APPROVED', 'REJECTED',
                    '[GITHUB]', 'Cloning', 'Repository'
                ]):
                    kept.append(line)
            return '\n'.join(kept)

        def cost_breakdown():
            """Return (input_cost, output_cost, total_cost) in USD.

            Claude Sonnet 4.5 pricing: $3/1M input tokens, $15/1M output tokens.
            """
            input_cost = (total_prompt_tokens / 1000000) * 3.0
            output_cost = (total_completion_tokens / 1000000) * 15.0
            return input_cost, output_cost, input_cost + output_cost

        # Run the blocking orchestrator off the event loop. FIX: the executor
        # is now shut down deterministically (it used to leak one thread per
        # message), and get_running_loop() replaces deprecated get_event_loop().
        loop = asyncio.get_running_loop()
        accumulated_logs = ""
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = loop.run_in_executor(executor, run_orchestrator)
            # Stream filtered logs + running cost while the orchestrator works.
            while not future.done():
                await asyncio.sleep(0.5)  # poll every 500ms
                current_output = captured_output.getvalue()
                if current_output == accumulated_logs:
                    continue
                accumulated_logs = current_output
                ingest_token_lines(current_output)
                input_cost, output_cost, total_cost = cost_breakdown()
                usage_summary = f"\n\nCREDITS USED:\n"
                usage_summary += f" Input: {total_prompt_tokens:,} tokens (${input_cost:.4f})\n"
                usage_summary += f" Output: {total_completion_tokens:,} tokens (${output_cost:.4f})\n"
                usage_summary += f" Total: {total_tokens:,} tokens (${total_cost:.4f})"
                log_msg.content = f"```\n{filter_log_lines(current_output)}\n{usage_summary}\n```"
                await log_msg.update()
            result = await future

        # FIX: count token lines emitted after the last poll (previously lost),
        # and always recompute costs here — the old code referenced total_cost
        # from inside the loop and crashed (NameError) when the orchestrator
        # finished before the first poll.
        final_logs = captured_output.getvalue()
        ingest_token_lines(final_logs)
        input_cost, output_cost, total_cost = cost_breakdown()
        log_msg.content = f"## Execution Log\n```\n{final_logs}\n```"
        await log_msg.update()

        # --- Result summary --------------------------------------------------
        summary_lines = []
        if result.get('plan'):
            summary_lines.append("## Planner")
            summary_lines.append(f"Plan created ({len(result['plan'])} chars)\n")
        if result.get('code_changes'):
            summary_lines.append("## Coder")
            summary_lines.append(f"Created {len(result['code_changes'])} file(s):")
            for file_path in result['code_changes'].keys():
                summary_lines.append(f" - {file_path}")
            summary_lines.append("")
        if result.get('review_feedback'):
            summary_lines.append("## Reviewer")
            summary_lines.append("Code approved" if result.get('success') else "Needs revision")
            summary_lines.append("")
        summary_lines.append("## Result")
        if result.get('success'):
            summary_lines.append(f"**Success** (Iterations: {result.get('iterations', 'N/A')})")
        else:
            summary_lines.append(f"**Incomplete** (Iterations: {result.get('iterations', 'N/A')})")
        # Final cost summary (Claude Sonnet 4.5 pricing: $3/1M input, $15/1M output).
        summary_lines.append("\n## API Credits Used (Claude Sonnet 4.5)")
        summary_lines.append(f"**Total Tokens:** {total_tokens:,}")
        summary_lines.append(f"- Input: {total_prompt_tokens:,} tokens (${input_cost:.4f})")
        summary_lines.append(f"- Output: {total_completion_tokens:,} tokens (${output_cost:.4f})")
        summary_lines.append(f"\n**Estimated Cost:** ${total_cost:.4f}")
        await cl.Message(content="\n".join(summary_lines)).send()
    except Exception as e:
        # Map the failure to actionable guidance rather than a bare traceback.
        await cl.Message(content=_error_guidance(e, orchestrator.max_iterations)).send()
if __name__ == "__main__":
    # This app is started through the Chainlit CLI, never executed directly.
    raise SystemExit("Run with: chainlit run chainlit_app.py")