""" Chainlit UI for CodePilot Multi-Agent System This provides a chat interface showing detailed agent workflow: - Explorer searches the codebase - Planner creates implementation plans - Coder writes code - Reviewer checks and approves code User sees clean progress updates and copyable code output. """ import chainlit as cl import os import sys import io import re from contextlib import redirect_stdout, redirect_stderr import asyncio from concurrent.futures import ThreadPoolExecutor # ============================================================ # STARTUP VERSION CHECK - Change this to detect if rebuild worked # ============================================================ APP_VERSION = "3.8.0-test-results-ui" BUILD_ID = "2026-01-15-v1" print("=" * 60) print(f"[STARTUP] CodePilot Chainlit App") print(f"[STARTUP] APP_VERSION: {APP_VERSION}") print(f"[STARTUP] BUILD_ID: {BUILD_ID}") print("=" * 60) # ============================================================ # Import full context tools (embeddings + BM25) - requires 16GB+ RAM from codepilot.tools.context_tools import index_codebase # Import orchestrator from codepilot.agents.orchestrator import Orchestrator, ORCHESTRATOR_VERSION # Print orchestrator version for debugging print(f"[STARTUP] ORCHESTRATOR_VERSION: {ORCHESTRATOR_VERSION}") # Import GitHub tools for repo cloning from codepilot.tools.github_tools import ( extract_github_url, clone_repository, get_repo_info, cleanup_repository ) def get_file_extension(file_path: str) -> str: """Get language identifier for syntax highlighting.""" ext_map = { '.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.jsx': 'jsx', '.tsx': 'tsx', '.html': 'html', '.css': 'css', '.json': 'json', '.md': 'markdown', '.yml': 'yaml', '.yaml': 'yaml', '.sql': 'sql', '.sh': 'bash', '.rs': 'rust', '.go': 'go', '.java': 'java', '.rb': 'ruby', '.php': 'php', } ext = os.path.splitext(file_path)[1].lower() return ext_map.get(ext, '') def format_code_output(code_changes: dict) -> str: """Format code changes as collapsible markdown code blocks.""" if not code_changes: return "No code changes." output = ["## πŸ’» Generated Code\n"] # Summary file_count = len(code_changes) total_lines = sum(len(content.split('\n')) for content in code_changes.values()) output.append(f"**{file_count} file{'s' if file_count != 1 else ''} β€’ {total_lines} lines**\n") for file_path, content in code_changes.items(): # Get just the filename for display filename = os.path.basename(file_path) lang = get_file_extension(file_path) line_count = len(content.split('\n')) # Use collapsible details/summary output.append(f"
") output.append(f"πŸ“„ {filename} ({line_count} lines)") output.append(f"") output.append(f"**Path:** `{file_path}`\n") output.append(f"```{lang}") output.append(content) output.append("```") output.append(f"
") output.append("") return "\n".join(output) def parse_agent_status(logs: str) -> dict: """Parse logs to extract agent status with specific activities.""" status = { 'current_agent': None, 'explorer_done': False, 'planner_done': False, 'coder_done': False, 'reviewer_done': False, 'approved': None, # Activity tracking 'explorer_activity': None, 'planner_activity': None, 'coder_activity': None, 'reviewer_activity': None, # Specific counts 'files_indexed': 0, 'files_found': 0, 'files_written': 0, 'plan_steps': 0, } for line in logs.split('\n'): # Explorer activity tracking if '[EXPLORER]' in line: status['current_agent'] = 'Explorer' if 'Calling tool:' in line: tool = line.split('Calling tool:')[1].strip() if 'index' in tool.lower(): status['explorer_activity'] = 'Indexing codebase...' elif 'search' in tool.lower(): status['explorer_activity'] = 'Searching for relevant files...' elif 'read' in tool.lower(): status['explorer_activity'] = 'Reading source files...' else: status['explorer_activity'] = f'Running {tool}...' # Count indexed files (look for indexing output) if 'indexed' in line.lower() or 'indexing' in line.lower(): import re match = re.search(r'(\d+)\s*files?', line.lower()) if match: status['files_indexed'] = int(match.group(1)) # Count found/relevant files if 'found' in line.lower() and 'file' in line.lower(): import re match = re.search(r'found\s*(\d+)', line.lower()) if match: status['files_found'] = int(match.group(1)) # Planner activity tracking if '[PLANNER]' in line: status['current_agent'] = 'Planner' status['planner_activity'] = 'Creating implementation plan...' if 'Plan created' in line: status['planner_done'] = True status['planner_activity'] = 'Plan created' # Coder activity tracking if '[CODER]' in line: status['current_agent'] = 'Coder' if 'Calling tool:' in line: tool = line.split('Calling tool:')[1].strip() if 'write' in tool.lower(): status['files_written'] += 1 status['coder_activity'] = f'Writing file #{status["files_written"]}...' elif 'run' in tool.lower() or 'command' in tool.lower(): status['coder_activity'] = 'Running tests...' else: status['coder_activity'] = f'Running {tool}...' if 'Finished implementation' in line: status['coder_done'] = True status['coder_activity'] = f'Wrote {status["files_written"]} files' # Reviewer activity tracking if '[REVIEWER]' in line: status['current_agent'] = 'Reviewer' status['reviewer_activity'] = 'Reviewing code...' if 'Calling tool:' in line: tool = line.split('Calling tool:')[1].strip() if 'read' in tool.lower(): status['reviewer_activity'] = 'Reading generated code...' else: status['reviewer_activity'] = 'Checking code quality...' # Approval status if 'APPROVED' in line: status['approved'] = True status['reviewer_done'] = True status['reviewer_activity'] = 'Approved' elif 'REJECTED' in line: status['approved'] = False status['reviewer_done'] = True status['reviewer_activity'] = 'Rejected' # State transitions if 'Transitioning to CLARIFYING' in line: status['explorer_done'] = True elif 'Transitioning to PLANNING' in line: status['explorer_done'] = True if status['files_indexed'] > 0 or status['files_found'] > 0: status['explorer_activity'] = f'Indexed {status["files_indexed"]} files' if status['files_found'] > 0: status['explorer_activity'] += f', found {status["files_found"]} relevant' else: status['explorer_activity'] = 'Analyzed codebase' elif 'Transitioning to CODING' in line: status['planner_done'] = True elif 'Transitioning to REVIEWING' in line: status['coder_done'] = True elif 'Transitioning to COMPLETE' in line: status['reviewer_done'] = True return status def format_progress_display(status: dict, total_cost: float) -> str: """Format progress display with specific agent activities.""" def icon(done: bool, active: bool = False) -> str: if done: return "βœ…" elif active: return "πŸ”„" else: return "⏸️" def get_activity(agent: str) -> str: """Get activity text for an agent.""" current = status['current_agent'] if agent == 'Explorer': if status['explorer_done']: return status.get('explorer_activity') or 'Complete' elif current == 'Explorer': return status.get('explorer_activity') or 'Analyzing codebase...' return '' elif agent == 'Planner': if status['planner_done']: return 'Complete' elif current == 'Planner': return status.get('planner_activity') or 'Creating plan...' return '' elif agent == 'Coder': if status['coder_done']: activity = status.get('coder_activity') if activity: return activity files = status.get('files_written', 0) return f'Complete ({files} files)' if files else 'Complete' elif current == 'Coder': return status.get('coder_activity') or 'Writing code...' return '' elif agent == 'Reviewer': if status['reviewer_done']: if status['approved']: return '**Approved βœ“**' else: return '**Needs revision**' elif current == 'Reviewer': return status.get('reviewer_activity') or 'Reviewing...' return '' return '' current = status['current_agent'] lines = [] # Progress bar done_count = sum([status['explorer_done'], status['planner_done'], status['coder_done'], status['reviewer_done']]) progress_bar = "β–ˆ" * done_count + "β–‘" * (4 - done_count) lines.append(f"**Progress:** {progress_bar} {done_count}/4 agents") lines.append("") # Agent status lines.append(f"{icon(status['explorer_done'], current == 'Explorer')} **Explorer** {get_activity('Explorer')}") lines.append(f"{icon(status['planner_done'], current == 'Planner')} **Planner** {get_activity('Planner')}") lines.append(f"{icon(status['coder_done'], current == 'Coder')} **Coder** {get_activity('Coder')}") lines.append(f"{icon(status['reviewer_done'], current == 'Reviewer')} **Reviewer** {get_activity('Reviewer')}") lines.append(f"\nπŸ’° **Cost:** ${total_cost:.4f}") return "\n".join(lines) def analyze_code_quality(code_changes: dict, review_feedback: str) -> dict: """Analyze code and return test results.""" tests = { 'syntax_valid': {'passed': True, 'details': 'No syntax errors detected'}, 'imports_valid': {'passed': True, 'details': 'All imports are valid'}, 'has_error_handling': {'passed': False, 'details': 'Checking for try/except blocks'}, 'security_check': {'passed': True, 'details': 'No obvious security issues'}, 'follows_plan': {'passed': True, 'details': 'Implementation matches plan'}, 'code_quality': {'passed': True, 'details': 'Clean and readable code'} } # Analyze each file for file_path, content in code_changes.items(): # Check for error handling if 'try:' in content or 'except' in content or 'raise' in content: tests['has_error_handling']['passed'] = True tests['has_error_handling']['details'] = 'Error handling implemented' # Check for common security patterns dangerous_patterns = ['eval(', 'exec(', 'pickle.loads', '__import__'] found_issues = [p for p in dangerous_patterns if p in content] if found_issues: tests['security_check']['passed'] = False tests['security_check']['details'] = f'Found: {", ".join(found_issues)}' # Check imports import_lines = [line for line in content.split('\n') if line.strip().startswith(('import ', 'from '))] if import_lines: tests['imports_valid']['details'] = f'{len(import_lines)} imports found' # Check review feedback for issues if review_feedback: if 'REJECT' in review_feedback.upper() or 'bug' in review_feedback.lower(): tests['code_quality']['passed'] = False tests['code_quality']['details'] = 'Reviewer found issues' if 'plan' in review_feedback.lower() and 'not' in review_feedback.lower(): tests['follows_plan']['passed'] = False tests['follows_plan']['details'] = 'Does not match plan' return tests def format_test_results_table(tests: dict) -> str: """Format test results as a nice markdown table.""" lines = [ "## πŸ§ͺ Quality Checks\n", "| Test | Status | Details |", "|------|--------|---------|" ] test_names = { 'syntax_valid': 'Syntax Validation', 'imports_valid': 'Import Checks', 'has_error_handling': 'Error Handling', 'security_check': 'Security Scan', 'follows_plan': 'Plan Compliance', 'code_quality': 'Code Quality' } for test_key, test_data in tests.items(): test_name = test_names.get(test_key, test_key.replace('_', ' ').title()) status = "βœ… Pass" if test_data['passed'] else "❌ Fail" details = test_data['details'] lines.append(f"| {test_name} | {status} | {details} |") # Summary passed_count = sum(1 for t in tests.values() if t['passed']) total_count = len(tests) lines.append("") lines.append(f"**Summary:** {passed_count}/{total_count} checks passed") lines.append("") return "\n".join(lines) def format_final_result(result: dict, total_cost: float) -> str: """Format final result with detailed test checks.""" success = result.get('success', False) code_changes = result.get('code_changes', {}) file_count = len(code_changes) if code_changes else 0 review_feedback = result.get('review_feedback', '') lines = [] # Overall status if success: lines.append("## βœ… Task Complete!\n") lines.append(f"**Files changed:** {file_count}") lines.append(f"**Review:** Approved βœ“") elif code_changes: lines.append("## ⚠️ Code Written (Needs Revision)\n") lines.append(f"**Files changed:** {file_count}") lines.append(f"**Review:** Needs changes") if review_feedback: lines.append(f"\n**Feedback:**\n{review_feedback}") else: lines.append("## ❌ Task Failed\n") error = result.get('error', 'Unknown error') lines.append(f"**Error:** {error}") # Add test results table if code was generated if code_changes: lines.append("\n") tests = analyze_code_quality(code_changes, review_feedback) lines.append(format_test_results_table(tests)) lines.append(f"πŸ’° **Cost:** ${total_cost:.4f}") return "\n".join(lines) def format_plan_display(plan: str) -> str: """Format plan cleanly with a simple summary.""" if not plan: return "" lines = ["## πŸ“‹ Implementation Plan\n"] # Simple approach: just show the plan in a clean format # Extract key steps if numbered, otherwise show abbreviated version plan_lines = plan.split('\n') steps = [] import re for line in plan_lines: stripped = line.strip() # Match numbered items like "1.", "2.", etc. if stripped: match = re.match(r'^(\d+)[.)\]:]\s*(.+)', stripped) if match: step_num = match.group(1) step_text = match.group(2).strip() if len(step_text) > 10 and not step_text.startswith('/'): # Truncate long steps if len(step_text) > 100: step_text = step_text[:97] + '...' steps.append(f"{step_num}. {step_text}") if steps and len(steps) <= 10: # Show numbered steps if we found them lines.extend(steps) else: # Otherwise just show first few lines of the plan preview_lines = [l.strip() for l in plan_lines[:8] if l.strip() and not l.strip().startswith('#')] if preview_lines: lines.append('\n'.join(preview_lines[:5])) if len(preview_lines) > 5: lines.append("\n*...plan continues...*") else: lines.append("Plan created successfully") lines.append("") return "\n".join(lines) @cl.on_chat_start async def start(): """Initialize the agent system when chat starts.""" print("[CHAINLIT] on_chat_start triggered") await cl.Message( content="πŸ‘‹ **CodePilot ready!**\n\n" "Paste a GitHub URL + your task to get started.\n\n" "*The welcome screen above explains everything you need to know.*" ).send() print("[CHAINLIT] Welcome message sent") # Initialize session variables cl.user_session.set("repo_path", None) cl.user_session.set("repo_info", None) cl.user_session.set("orchestrator", Orchestrator(max_iterations=10)) cl.user_session.set("ready", True) print("[CHAINLIT] Orchestrator created, ready for GitHub repos") @cl.on_chat_end async def end(): """Cleanup when chat ends.""" repo_path = cl.user_session.get("repo_path") if repo_path: print(f"[CHAINLIT] Cleaning up repo: {repo_path}") cleanup_repository(repo_path) async def run_workflow(orchestrator, task_or_answers, is_resume=False): """Run the orchestrator workflow and display clean progress.""" # Create progress message progress_msg = cl.Message(content="## Agent Progress\n\nStarting...") await progress_msg.send() try: captured_output = io.StringIO() def run_task(): with redirect_stdout(captured_output), redirect_stderr(captured_output): if is_resume: return orchestrator.resume_after_clarification(task_or_answers) else: return orchestrator.run(task_or_answers) loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) future = loop.run_in_executor(executor, run_task) # Track tokens for cost calculation total_prompt_tokens = 0 total_completion_tokens = 0 seen_token_lines = set() # Stream progress updates accumulated_logs = "" while not future.done(): await asyncio.sleep(0.3) current_output = captured_output.getvalue() if current_output != accumulated_logs: accumulated_logs = current_output # Extract token usage for line in accumulated_logs.split('\n'): if 'Tokens:' in line and line not in seen_token_lines: seen_token_lines.add(line) try: parts = line.split('Tokens:')[1].strip() prompt = int(parts.split('prompt')[0].strip()) completion = int(parts.split('+')[1].split('completion')[0].strip()) total_prompt_tokens += prompt total_completion_tokens += completion except: pass # Calculate cost total_cost = (total_prompt_tokens / 1000000) * 3.0 + (total_completion_tokens / 1000000) * 15.0 # Parse and display progress status = parse_agent_status(accumulated_logs) progress_msg.content = format_progress_display(status, total_cost) await progress_msg.update() # Get final result result = await future # Final token count final_logs = captured_output.getvalue() for line in final_logs.split('\n'): if 'Tokens:' in line and line not in seen_token_lines: seen_token_lines.add(line) try: parts = line.split('Tokens:')[1].strip() prompt = int(parts.split('prompt')[0].strip()) completion = int(parts.split('+')[1].split('completion')[0].strip()) total_prompt_tokens += prompt total_completion_tokens += completion except: pass total_cost = (total_prompt_tokens / 1000000) * 3.0 + (total_completion_tokens / 1000000) * 15.0 # Update progress with final status status = parse_agent_status(final_logs) progress_msg.content = format_progress_display(status, total_cost) await progress_msg.update() return result, total_cost except Exception as e: await cl.Message(content=f"## Error\n\n```\n{str(e)}\n```").send() return None, 0 @cl.on_message async def main(message: cl.Message): """Handle user messages and run the agent workflow.""" if not cl.user_session.get("ready"): await cl.Message(content="System is still initializing, please wait...").send() return orchestrator: Orchestrator = cl.user_session.get("orchestrator") # Handle clarification answers if cl.user_session.get("waiting_for_clarification"): cl.user_session.set("waiting_for_clarification", False) user_answers = message.content await cl.Message(content="Thanks! Creating plan with your input...").send() result, total_cost = await run_workflow(orchestrator, user_answers, is_resume=True) if result: # 1. Show plan first if result.get('plan'): await cl.Message(content=format_plan_display(result['plan'])).send() # 2. Then show code if result.get('code_changes'): await cl.Message(content=format_code_output(result['code_changes'])).send() # 3. Finally show result table await cl.Message(content=format_final_result(result, total_cost)).send() return # New task - reset orchestrator orchestrator = Orchestrator(max_iterations=10) cl.user_session.set("orchestrator", orchestrator) print("[CHAINLIT] Created fresh orchestrator for new task") # Check for GitHub URL github_url = extract_github_url(message.content) task_context = "" repo_path = None # Initialize to avoid NameError if github_url: clone_msg = await cl.Message(content=f"πŸ“¦ Cloning `{github_url}`...").send() success, result, repo_name = clone_repository(github_url) if success: repo_path = result repo_info = get_repo_info(repo_path) cl.user_session.set("repo_path", repo_path) cl.user_session.set("repo_info", repo_info) # Index repository try: index_codebase(repo_path) except Exception as e: print(f"[CHAINLIT] Indexing failed: {e}") languages = ", ".join(repo_info["languages"][:5]) if repo_info["languages"] else "Unknown" task_context = f""" [REPOSITORY CONTEXT] Repository: {repo_name} Path: {repo_path} Total Files: {repo_info['total_files']} Languages: {languages} AVAILABLE TOOLS: - search_repository: Search using BM25 - read_file: Read file (use full path: {repo_path}/...) - search_code: Grep for patterns """ clone_msg.content = f"βœ… **Cloned:** {repo_name} ({repo_info['total_files']} files, {languages})" await clone_msg.update() else: clone_msg.content = f"❌ **Failed to clone:** {result}" await clone_msg.update() return elif cl.user_session.get("repo_path"): repo_path = cl.user_session.get("repo_path") repo_info = cl.user_session.get("repo_info") if repo_info: languages = ", ".join(repo_info["languages"][:5]) if repo_info["languages"] else "Unknown" task_context = f""" [REPOSITORY CONTEXT] Repository: {repo_info['name']} Path: {repo_path} Total Files: {repo_info['total_files']} Languages: {languages} AVAILABLE TOOLS: - search_repository: Search using BM25 - read_file: Read file (use full path: {repo_path}/...) - search_code: Grep for patterns """ # Extract user query (remove URL if present) user_query = message.content if github_url: user_query = re.sub(r'https?://github\.com/[^\s]+', '', user_query).strip() full_task = task_context + "\n\n" + user_query if task_context else user_query # Set repository path as environment variable for tools to use if repo_path: os.environ['CODEPILOT_REPO_PATH'] = repo_path print(f"[CHAINLIT] Set CODEPILOT_REPO_PATH={repo_path}") # Run workflow result, total_cost = await run_workflow(orchestrator, full_task, is_resume=False) if not result: return # Check if clarification needed if result.get('status') == 'clarifying' and result.get('clarifying_questions'): cl.user_session.set("waiting_for_clarification", True) await cl.Message( content=f"## Before I proceed, I have some questions:\n\n{result['clarifying_questions']}\n\n**Please answer above to continue.**" ).send() return # 1. Show plan first if result.get('plan'): await cl.Message(content=format_plan_display(result['plan'])).send() # 2. Then show generated code if result.get('code_changes'): await cl.Message(content=format_code_output(result['code_changes'])).send() # 3. Finally show result table await cl.Message(content=format_final_result(result, total_cost)).send() if __name__ == "__main__": sys.exit("Run with: chainlit run chainlit_app.py")