|
|
""" |
|
|
Chainlit UI for CodePilot Multi-Agent System |
|
|
|
|
|
This provides a chat interface showing detailed agent workflow: |
|
|
- Explorer searches the codebase |
|
|
- Planner creates implementation plans |
|
|
- Coder writes code |
|
|
- Reviewer checks and approves code |
|
|
|
|
|
User sees clean progress updates and copyable code output. |
|
|
""" |
|
|
|
|
|
import chainlit as cl |
|
|
import os |
|
|
import sys |
|
|
import io |
|
|
import re |
|
|
from contextlib import redirect_stdout, redirect_stderr |
|
|
import asyncio |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
APP_VERSION = "3.8.0-test-results-ui" |
|
|
BUILD_ID = "2026-01-15-v1" |
|
|
print("=" * 60) |
|
|
print(f"[STARTUP] CodePilot Chainlit App") |
|
|
print(f"[STARTUP] APP_VERSION: {APP_VERSION}") |
|
|
print(f"[STARTUP] BUILD_ID: {BUILD_ID}") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
|
|
|
from codepilot.tools.context_tools import index_codebase |
|
|
|
|
|
|
|
|
from codepilot.agents.orchestrator import Orchestrator, ORCHESTRATOR_VERSION |
|
|
|
|
|
|
|
|
print(f"[STARTUP] ORCHESTRATOR_VERSION: {ORCHESTRATOR_VERSION}") |
|
|
|
|
|
|
|
|
from codepilot.tools.github_tools import ( |
|
|
extract_github_url, |
|
|
clone_repository, |
|
|
get_repo_info, |
|
|
cleanup_repository |
|
|
) |
|
|
|
|
|
|
|
|
def get_file_extension(file_path: str) -> str: |
|
|
"""Get language identifier for syntax highlighting.""" |
|
|
ext_map = { |
|
|
'.py': 'python', |
|
|
'.js': 'javascript', |
|
|
'.ts': 'typescript', |
|
|
'.jsx': 'jsx', |
|
|
'.tsx': 'tsx', |
|
|
'.html': 'html', |
|
|
'.css': 'css', |
|
|
'.json': 'json', |
|
|
'.md': 'markdown', |
|
|
'.yml': 'yaml', |
|
|
'.yaml': 'yaml', |
|
|
'.sql': 'sql', |
|
|
'.sh': 'bash', |
|
|
'.rs': 'rust', |
|
|
'.go': 'go', |
|
|
'.java': 'java', |
|
|
'.rb': 'ruby', |
|
|
'.php': 'php', |
|
|
} |
|
|
ext = os.path.splitext(file_path)[1].lower() |
|
|
return ext_map.get(ext, '') |
|
|
|
|
|
|
|
|
def format_code_output(code_changes: dict) -> str: |
|
|
"""Format code changes as collapsible markdown code blocks.""" |
|
|
if not code_changes: |
|
|
return "No code changes." |
|
|
|
|
|
output = ["## 💻 Generated Code\n"] |
|
|
|
|
|
|
|
|
file_count = len(code_changes) |
|
|
total_lines = sum(len(content.split('\n')) for content in code_changes.values()) |
|
|
output.append(f"**{file_count} file{'s' if file_count != 1 else ''} • {total_lines} lines**\n") |
|
|
|
|
|
for file_path, content in code_changes.items(): |
|
|
|
|
|
filename = os.path.basename(file_path) |
|
|
lang = get_file_extension(file_path) |
|
|
line_count = len(content.split('\n')) |
|
|
|
|
|
|
|
|
output.append(f"<details>") |
|
|
output.append(f"<summary>📄 <strong>{filename}</strong> ({line_count} lines)</summary>") |
|
|
output.append(f"") |
|
|
output.append(f"**Path:** `{file_path}`\n") |
|
|
output.append(f"```{lang}") |
|
|
output.append(content) |
|
|
output.append("```") |
|
|
output.append(f"</details>") |
|
|
output.append("") |
|
|
|
|
|
return "\n".join(output) |
|
|
|
|
|
|
|
|
def parse_agent_status(logs: str) -> dict: |
|
|
"""Parse logs to extract agent status with specific activities.""" |
|
|
status = { |
|
|
'current_agent': None, |
|
|
'explorer_done': False, |
|
|
'planner_done': False, |
|
|
'coder_done': False, |
|
|
'reviewer_done': False, |
|
|
'approved': None, |
|
|
|
|
|
'explorer_activity': None, |
|
|
'planner_activity': None, |
|
|
'coder_activity': None, |
|
|
'reviewer_activity': None, |
|
|
|
|
|
'files_indexed': 0, |
|
|
'files_found': 0, |
|
|
'files_written': 0, |
|
|
'plan_steps': 0, |
|
|
} |
|
|
|
|
|
for line in logs.split('\n'): |
|
|
|
|
|
if '[EXPLORER]' in line: |
|
|
status['current_agent'] = 'Explorer' |
|
|
if 'Calling tool:' in line: |
|
|
tool = line.split('Calling tool:')[1].strip() |
|
|
if 'index' in tool.lower(): |
|
|
status['explorer_activity'] = 'Indexing codebase...' |
|
|
elif 'search' in tool.lower(): |
|
|
status['explorer_activity'] = 'Searching for relevant files...' |
|
|
elif 'read' in tool.lower(): |
|
|
status['explorer_activity'] = 'Reading source files...' |
|
|
else: |
|
|
status['explorer_activity'] = f'Running {tool}...' |
|
|
|
|
|
|
|
|
if 'indexed' in line.lower() or 'indexing' in line.lower(): |
|
|
import re |
|
|
match = re.search(r'(\d+)\s*files?', line.lower()) |
|
|
if match: |
|
|
status['files_indexed'] = int(match.group(1)) |
|
|
|
|
|
|
|
|
if 'found' in line.lower() and 'file' in line.lower(): |
|
|
import re |
|
|
match = re.search(r'found\s*(\d+)', line.lower()) |
|
|
if match: |
|
|
status['files_found'] = int(match.group(1)) |
|
|
|
|
|
|
|
|
if '[PLANNER]' in line: |
|
|
status['current_agent'] = 'Planner' |
|
|
status['planner_activity'] = 'Creating implementation plan...' |
|
|
if 'Plan created' in line: |
|
|
status['planner_done'] = True |
|
|
status['planner_activity'] = 'Plan created' |
|
|
|
|
|
|
|
|
if '[CODER]' in line: |
|
|
status['current_agent'] = 'Coder' |
|
|
if 'Calling tool:' in line: |
|
|
tool = line.split('Calling tool:')[1].strip() |
|
|
if 'write' in tool.lower(): |
|
|
status['files_written'] += 1 |
|
|
status['coder_activity'] = f'Writing file #{status["files_written"]}...' |
|
|
elif 'run' in tool.lower() or 'command' in tool.lower(): |
|
|
status['coder_activity'] = 'Running tests...' |
|
|
else: |
|
|
status['coder_activity'] = f'Running {tool}...' |
|
|
if 'Finished implementation' in line: |
|
|
status['coder_done'] = True |
|
|
status['coder_activity'] = f'Wrote {status["files_written"]} files' |
|
|
|
|
|
|
|
|
if '[REVIEWER]' in line: |
|
|
status['current_agent'] = 'Reviewer' |
|
|
status['reviewer_activity'] = 'Reviewing code...' |
|
|
if 'Calling tool:' in line: |
|
|
tool = line.split('Calling tool:')[1].strip() |
|
|
if 'read' in tool.lower(): |
|
|
status['reviewer_activity'] = 'Reading generated code...' |
|
|
else: |
|
|
status['reviewer_activity'] = 'Checking code quality...' |
|
|
|
|
|
|
|
|
if 'APPROVED' in line: |
|
|
status['approved'] = True |
|
|
status['reviewer_done'] = True |
|
|
status['reviewer_activity'] = 'Approved' |
|
|
elif 'REJECTED' in line: |
|
|
status['approved'] = False |
|
|
status['reviewer_done'] = True |
|
|
status['reviewer_activity'] = 'Rejected' |
|
|
|
|
|
|
|
|
if 'Transitioning to CLARIFYING' in line: |
|
|
status['explorer_done'] = True |
|
|
elif 'Transitioning to PLANNING' in line: |
|
|
status['explorer_done'] = True |
|
|
if status['files_indexed'] > 0 or status['files_found'] > 0: |
|
|
status['explorer_activity'] = f'Indexed {status["files_indexed"]} files' |
|
|
if status['files_found'] > 0: |
|
|
status['explorer_activity'] += f', found {status["files_found"]} relevant' |
|
|
else: |
|
|
status['explorer_activity'] = 'Analyzed codebase' |
|
|
elif 'Transitioning to CODING' in line: |
|
|
status['planner_done'] = True |
|
|
elif 'Transitioning to REVIEWING' in line: |
|
|
status['coder_done'] = True |
|
|
elif 'Transitioning to COMPLETE' in line: |
|
|
status['reviewer_done'] = True |
|
|
|
|
|
return status |
|
|
|
|
|
|
|
|
def format_progress_display(status: dict, total_cost: float) -> str: |
|
|
"""Format progress display with specific agent activities.""" |
|
|
|
|
|
def icon(done: bool, active: bool = False) -> str: |
|
|
if done: |
|
|
return "✅" |
|
|
elif active: |
|
|
return "🔄" |
|
|
else: |
|
|
return "⏸️" |
|
|
|
|
|
def get_activity(agent: str) -> str: |
|
|
"""Get activity text for an agent.""" |
|
|
current = status['current_agent'] |
|
|
|
|
|
if agent == 'Explorer': |
|
|
if status['explorer_done']: |
|
|
return status.get('explorer_activity') or 'Complete' |
|
|
elif current == 'Explorer': |
|
|
return status.get('explorer_activity') or 'Analyzing codebase...' |
|
|
return '' |
|
|
|
|
|
elif agent == 'Planner': |
|
|
if status['planner_done']: |
|
|
return 'Complete' |
|
|
elif current == 'Planner': |
|
|
return status.get('planner_activity') or 'Creating plan...' |
|
|
return '' |
|
|
|
|
|
elif agent == 'Coder': |
|
|
if status['coder_done']: |
|
|
activity = status.get('coder_activity') |
|
|
if activity: |
|
|
return activity |
|
|
files = status.get('files_written', 0) |
|
|
return f'Complete ({files} files)' if files else 'Complete' |
|
|
elif current == 'Coder': |
|
|
return status.get('coder_activity') or 'Writing code...' |
|
|
return '' |
|
|
|
|
|
elif agent == 'Reviewer': |
|
|
if status['reviewer_done']: |
|
|
if status['approved']: |
|
|
return '**Approved ✓**' |
|
|
else: |
|
|
return '**Needs revision**' |
|
|
elif current == 'Reviewer': |
|
|
return status.get('reviewer_activity') or 'Reviewing...' |
|
|
return '' |
|
|
|
|
|
return '' |
|
|
|
|
|
current = status['current_agent'] |
|
|
|
|
|
lines = [] |
|
|
|
|
|
|
|
|
done_count = sum([status['explorer_done'], status['planner_done'], |
|
|
status['coder_done'], status['reviewer_done']]) |
|
|
progress_bar = "█" * done_count + "░" * (4 - done_count) |
|
|
lines.append(f"**Progress:** {progress_bar} {done_count}/4 agents") |
|
|
lines.append("") |
|
|
|
|
|
|
|
|
lines.append(f"{icon(status['explorer_done'], current == 'Explorer')} **Explorer** {get_activity('Explorer')}") |
|
|
lines.append(f"{icon(status['planner_done'], current == 'Planner')} **Planner** {get_activity('Planner')}") |
|
|
lines.append(f"{icon(status['coder_done'], current == 'Coder')} **Coder** {get_activity('Coder')}") |
|
|
lines.append(f"{icon(status['reviewer_done'], current == 'Reviewer')} **Reviewer** {get_activity('Reviewer')}") |
|
|
|
|
|
lines.append(f"\n💰 **Cost:** ${total_cost:.4f}") |
|
|
|
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
def analyze_code_quality(code_changes: dict, review_feedback: str) -> dict: |
|
|
"""Analyze code and return test results.""" |
|
|
tests = { |
|
|
'syntax_valid': {'passed': True, 'details': 'No syntax errors detected'}, |
|
|
'imports_valid': {'passed': True, 'details': 'All imports are valid'}, |
|
|
'has_error_handling': {'passed': False, 'details': 'Checking for try/except blocks'}, |
|
|
'security_check': {'passed': True, 'details': 'No obvious security issues'}, |
|
|
'follows_plan': {'passed': True, 'details': 'Implementation matches plan'}, |
|
|
'code_quality': {'passed': True, 'details': 'Clean and readable code'} |
|
|
} |
|
|
|
|
|
|
|
|
for file_path, content in code_changes.items(): |
|
|
|
|
|
if 'try:' in content or 'except' in content or 'raise' in content: |
|
|
tests['has_error_handling']['passed'] = True |
|
|
tests['has_error_handling']['details'] = 'Error handling implemented' |
|
|
|
|
|
|
|
|
dangerous_patterns = ['eval(', 'exec(', 'pickle.loads', '__import__'] |
|
|
found_issues = [p for p in dangerous_patterns if p in content] |
|
|
if found_issues: |
|
|
tests['security_check']['passed'] = False |
|
|
tests['security_check']['details'] = f'Found: {", ".join(found_issues)}' |
|
|
|
|
|
|
|
|
import_lines = [line for line in content.split('\n') if line.strip().startswith(('import ', 'from '))] |
|
|
if import_lines: |
|
|
tests['imports_valid']['details'] = f'{len(import_lines)} imports found' |
|
|
|
|
|
|
|
|
if review_feedback: |
|
|
if 'REJECT' in review_feedback.upper() or 'bug' in review_feedback.lower(): |
|
|
tests['code_quality']['passed'] = False |
|
|
tests['code_quality']['details'] = 'Reviewer found issues' |
|
|
if 'plan' in review_feedback.lower() and 'not' in review_feedback.lower(): |
|
|
tests['follows_plan']['passed'] = False |
|
|
tests['follows_plan']['details'] = 'Does not match plan' |
|
|
|
|
|
return tests |
|
|
|
|
|
|
|
|
def format_test_results_table(tests: dict) -> str: |
|
|
"""Format test results as a nice markdown table.""" |
|
|
lines = [ |
|
|
"## 🧪 Quality Checks\n", |
|
|
"| Test | Status | Details |", |
|
|
"|------|--------|---------|" |
|
|
] |
|
|
|
|
|
test_names = { |
|
|
'syntax_valid': 'Syntax Validation', |
|
|
'imports_valid': 'Import Checks', |
|
|
'has_error_handling': 'Error Handling', |
|
|
'security_check': 'Security Scan', |
|
|
'follows_plan': 'Plan Compliance', |
|
|
'code_quality': 'Code Quality' |
|
|
} |
|
|
|
|
|
for test_key, test_data in tests.items(): |
|
|
test_name = test_names.get(test_key, test_key.replace('_', ' ').title()) |
|
|
status = "✅ Pass" if test_data['passed'] else "❌ Fail" |
|
|
details = test_data['details'] |
|
|
lines.append(f"| {test_name} | {status} | {details} |") |
|
|
|
|
|
|
|
|
passed_count = sum(1 for t in tests.values() if t['passed']) |
|
|
total_count = len(tests) |
|
|
lines.append("") |
|
|
lines.append(f"**Summary:** {passed_count}/{total_count} checks passed") |
|
|
lines.append("") |
|
|
|
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
def format_final_result(result: dict, total_cost: float) -> str: |
|
|
"""Format final result with detailed test checks.""" |
|
|
success = result.get('success', False) |
|
|
code_changes = result.get('code_changes', {}) |
|
|
file_count = len(code_changes) if code_changes else 0 |
|
|
review_feedback = result.get('review_feedback', '') |
|
|
|
|
|
lines = [] |
|
|
|
|
|
|
|
|
if success: |
|
|
lines.append("## ✅ Task Complete!\n") |
|
|
lines.append(f"**Files changed:** {file_count}") |
|
|
lines.append(f"**Review:** Approved ✓") |
|
|
elif code_changes: |
|
|
lines.append("## ⚠️ Code Written (Needs Revision)\n") |
|
|
lines.append(f"**Files changed:** {file_count}") |
|
|
lines.append(f"**Review:** Needs changes") |
|
|
if review_feedback: |
|
|
lines.append(f"\n**Feedback:**\n{review_feedback}") |
|
|
else: |
|
|
lines.append("## ❌ Task Failed\n") |
|
|
error = result.get('error', 'Unknown error') |
|
|
lines.append(f"**Error:** {error}") |
|
|
|
|
|
|
|
|
if code_changes: |
|
|
lines.append("\n") |
|
|
tests = analyze_code_quality(code_changes, review_feedback) |
|
|
lines.append(format_test_results_table(tests)) |
|
|
|
|
|
lines.append(f"💰 **Cost:** ${total_cost:.4f}") |
|
|
|
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
def format_plan_display(plan: str) -> str: |
|
|
"""Format plan cleanly with a simple summary.""" |
|
|
if not plan: |
|
|
return "" |
|
|
|
|
|
lines = ["## 📋 Implementation Plan\n"] |
|
|
|
|
|
|
|
|
|
|
|
plan_lines = plan.split('\n') |
|
|
steps = [] |
|
|
|
|
|
import re |
|
|
for line in plan_lines: |
|
|
stripped = line.strip() |
|
|
|
|
|
if stripped: |
|
|
match = re.match(r'^(\d+)[.)\]:]\s*(.+)', stripped) |
|
|
if match: |
|
|
step_num = match.group(1) |
|
|
step_text = match.group(2).strip() |
|
|
if len(step_text) > 10 and not step_text.startswith('/'): |
|
|
|
|
|
if len(step_text) > 100: |
|
|
step_text = step_text[:97] + '...' |
|
|
steps.append(f"{step_num}. {step_text}") |
|
|
|
|
|
if steps and len(steps) <= 10: |
|
|
|
|
|
lines.extend(steps) |
|
|
else: |
|
|
|
|
|
preview_lines = [l.strip() for l in plan_lines[:8] if l.strip() and not l.strip().startswith('#')] |
|
|
if preview_lines: |
|
|
lines.append('\n'.join(preview_lines[:5])) |
|
|
if len(preview_lines) > 5: |
|
|
lines.append("\n*...plan continues...*") |
|
|
else: |
|
|
lines.append("Plan created successfully") |
|
|
|
|
|
lines.append("") |
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
@cl.on_chat_start |
|
|
async def start(): |
|
|
"""Initialize the agent system when chat starts.""" |
|
|
|
|
|
print("[CHAINLIT] on_chat_start triggered") |
|
|
|
|
|
await cl.Message( |
|
|
content="👋 **CodePilot ready!**\n\n" |
|
|
"Paste a GitHub URL + your task to get started.\n\n" |
|
|
"*The welcome screen above explains everything you need to know.*" |
|
|
).send() |
|
|
|
|
|
print("[CHAINLIT] Welcome message sent") |
|
|
|
|
|
|
|
|
cl.user_session.set("repo_path", None) |
|
|
cl.user_session.set("repo_info", None) |
|
|
cl.user_session.set("orchestrator", Orchestrator(max_iterations=10)) |
|
|
cl.user_session.set("ready", True) |
|
|
print("[CHAINLIT] Orchestrator created, ready for GitHub repos") |
|
|
|
|
|
|
|
|
@cl.on_chat_end |
|
|
async def end(): |
|
|
"""Cleanup when chat ends.""" |
|
|
repo_path = cl.user_session.get("repo_path") |
|
|
if repo_path: |
|
|
print(f"[CHAINLIT] Cleaning up repo: {repo_path}") |
|
|
cleanup_repository(repo_path) |
|
|
|
|
|
|
|
|
async def run_workflow(orchestrator, task_or_answers, is_resume=False): |
|
|
"""Run the orchestrator workflow and display clean progress.""" |
|
|
|
|
|
|
|
|
progress_msg = cl.Message(content="## Agent Progress\n\nStarting...") |
|
|
await progress_msg.send() |
|
|
|
|
|
try: |
|
|
captured_output = io.StringIO() |
|
|
|
|
|
def run_task(): |
|
|
with redirect_stdout(captured_output), redirect_stderr(captured_output): |
|
|
if is_resume: |
|
|
return orchestrator.resume_after_clarification(task_or_answers) |
|
|
else: |
|
|
return orchestrator.run(task_or_answers) |
|
|
|
|
|
loop = asyncio.get_event_loop() |
|
|
executor = ThreadPoolExecutor(max_workers=1) |
|
|
future = loop.run_in_executor(executor, run_task) |
|
|
|
|
|
|
|
|
total_prompt_tokens = 0 |
|
|
total_completion_tokens = 0 |
|
|
seen_token_lines = set() |
|
|
|
|
|
|
|
|
accumulated_logs = "" |
|
|
while not future.done(): |
|
|
await asyncio.sleep(0.3) |
|
|
|
|
|
current_output = captured_output.getvalue() |
|
|
if current_output != accumulated_logs: |
|
|
accumulated_logs = current_output |
|
|
|
|
|
|
|
|
for line in accumulated_logs.split('\n'): |
|
|
if 'Tokens:' in line and line not in seen_token_lines: |
|
|
seen_token_lines.add(line) |
|
|
try: |
|
|
parts = line.split('Tokens:')[1].strip() |
|
|
prompt = int(parts.split('prompt')[0].strip()) |
|
|
completion = int(parts.split('+')[1].split('completion')[0].strip()) |
|
|
total_prompt_tokens += prompt |
|
|
total_completion_tokens += completion |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
total_cost = (total_prompt_tokens / 1000000) * 3.0 + (total_completion_tokens / 1000000) * 15.0 |
|
|
|
|
|
|
|
|
status = parse_agent_status(accumulated_logs) |
|
|
progress_msg.content = format_progress_display(status, total_cost) |
|
|
await progress_msg.update() |
|
|
|
|
|
|
|
|
result = await future |
|
|
|
|
|
|
|
|
final_logs = captured_output.getvalue() |
|
|
for line in final_logs.split('\n'): |
|
|
if 'Tokens:' in line and line not in seen_token_lines: |
|
|
seen_token_lines.add(line) |
|
|
try: |
|
|
parts = line.split('Tokens:')[1].strip() |
|
|
prompt = int(parts.split('prompt')[0].strip()) |
|
|
completion = int(parts.split('+')[1].split('completion')[0].strip()) |
|
|
total_prompt_tokens += prompt |
|
|
total_completion_tokens += completion |
|
|
except: |
|
|
pass |
|
|
|
|
|
total_cost = (total_prompt_tokens / 1000000) * 3.0 + (total_completion_tokens / 1000000) * 15.0 |
|
|
|
|
|
|
|
|
status = parse_agent_status(final_logs) |
|
|
progress_msg.content = format_progress_display(status, total_cost) |
|
|
await progress_msg.update() |
|
|
|
|
|
return result, total_cost |
|
|
|
|
|
except Exception as e: |
|
|
await cl.Message(content=f"## Error\n\n```\n{str(e)}\n```").send() |
|
|
return None, 0 |
|
|
|
|
|
|
|
|
@cl.on_message |
|
|
async def main(message: cl.Message): |
|
|
"""Handle user messages and run the agent workflow.""" |
|
|
|
|
|
if not cl.user_session.get("ready"): |
|
|
await cl.Message(content="System is still initializing, please wait...").send() |
|
|
return |
|
|
|
|
|
orchestrator: Orchestrator = cl.user_session.get("orchestrator") |
|
|
|
|
|
|
|
|
if cl.user_session.get("waiting_for_clarification"): |
|
|
cl.user_session.set("waiting_for_clarification", False) |
|
|
user_answers = message.content |
|
|
|
|
|
await cl.Message(content="Thanks! Creating plan with your input...").send() |
|
|
|
|
|
result, total_cost = await run_workflow(orchestrator, user_answers, is_resume=True) |
|
|
|
|
|
if result: |
|
|
|
|
|
if result.get('plan'): |
|
|
await cl.Message(content=format_plan_display(result['plan'])).send() |
|
|
|
|
|
|
|
|
if result.get('code_changes'): |
|
|
await cl.Message(content=format_code_output(result['code_changes'])).send() |
|
|
|
|
|
|
|
|
await cl.Message(content=format_final_result(result, total_cost)).send() |
|
|
return |
|
|
|
|
|
|
|
|
orchestrator = Orchestrator(max_iterations=10) |
|
|
cl.user_session.set("orchestrator", orchestrator) |
|
|
print("[CHAINLIT] Created fresh orchestrator for new task") |
|
|
|
|
|
|
|
|
github_url = extract_github_url(message.content) |
|
|
task_context = "" |
|
|
repo_path = None |
|
|
|
|
|
if github_url: |
|
|
clone_msg = await cl.Message(content=f"📦 Cloning `{github_url}`...").send() |
|
|
|
|
|
success, result, repo_name = clone_repository(github_url) |
|
|
|
|
|
if success: |
|
|
repo_path = result |
|
|
repo_info = get_repo_info(repo_path) |
|
|
|
|
|
cl.user_session.set("repo_path", repo_path) |
|
|
cl.user_session.set("repo_info", repo_info) |
|
|
|
|
|
|
|
|
try: |
|
|
index_codebase(repo_path) |
|
|
except Exception as e: |
|
|
print(f"[CHAINLIT] Indexing failed: {e}") |
|
|
|
|
|
languages = ", ".join(repo_info["languages"][:5]) if repo_info["languages"] else "Unknown" |
|
|
|
|
|
task_context = f""" |
|
|
[REPOSITORY CONTEXT] |
|
|
Repository: {repo_name} |
|
|
Path: {repo_path} |
|
|
Total Files: {repo_info['total_files']} |
|
|
Languages: {languages} |
|
|
|
|
|
AVAILABLE TOOLS: |
|
|
- search_repository: Search using BM25 |
|
|
- read_file: Read file (use full path: {repo_path}/...) |
|
|
- search_code: Grep for patterns |
|
|
""" |
|
|
clone_msg.content = f"✅ **Cloned:** {repo_name} ({repo_info['total_files']} files, {languages})" |
|
|
await clone_msg.update() |
|
|
else: |
|
|
clone_msg.content = f"❌ **Failed to clone:** {result}" |
|
|
await clone_msg.update() |
|
|
return |
|
|
|
|
|
elif cl.user_session.get("repo_path"): |
|
|
repo_path = cl.user_session.get("repo_path") |
|
|
repo_info = cl.user_session.get("repo_info") |
|
|
if repo_info: |
|
|
languages = ", ".join(repo_info["languages"][:5]) if repo_info["languages"] else "Unknown" |
|
|
task_context = f""" |
|
|
[REPOSITORY CONTEXT] |
|
|
Repository: {repo_info['name']} |
|
|
Path: {repo_path} |
|
|
Total Files: {repo_info['total_files']} |
|
|
Languages: {languages} |
|
|
|
|
|
AVAILABLE TOOLS: |
|
|
- search_repository: Search using BM25 |
|
|
- read_file: Read file (use full path: {repo_path}/...) |
|
|
- search_code: Grep for patterns |
|
|
""" |
|
|
|
|
|
|
|
|
user_query = message.content |
|
|
if github_url: |
|
|
user_query = re.sub(r'https?://github\.com/[^\s]+', '', user_query).strip() |
|
|
|
|
|
full_task = task_context + "\n\n" + user_query if task_context else user_query |
|
|
|
|
|
|
|
|
if repo_path: |
|
|
os.environ['CODEPILOT_REPO_PATH'] = repo_path |
|
|
print(f"[CHAINLIT] Set CODEPILOT_REPO_PATH={repo_path}") |
|
|
|
|
|
|
|
|
result, total_cost = await run_workflow(orchestrator, full_task, is_resume=False) |
|
|
|
|
|
if not result: |
|
|
return |
|
|
|
|
|
|
|
|
if result.get('status') == 'clarifying' and result.get('clarifying_questions'): |
|
|
cl.user_session.set("waiting_for_clarification", True) |
|
|
await cl.Message( |
|
|
content=f"## Before I proceed, I have some questions:\n\n{result['clarifying_questions']}\n\n**Please answer above to continue.**" |
|
|
).send() |
|
|
return |
|
|
|
|
|
|
|
|
if result.get('plan'): |
|
|
await cl.Message(content=format_plan_display(result['plan'])).send() |
|
|
|
|
|
|
|
|
if result.get('code_changes'): |
|
|
await cl.Message(content=format_code_output(result['code_changes'])).send() |
|
|
|
|
|
|
|
|
await cl.Message(content=format_final_result(result, total_cost)).send() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
sys.exit("Run with: chainlit run chainlit_app.py") |
|
|
|