Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Agent Manager | |
| This module provides the main orchestrator for the Code Review Agent. | |
| It coordinates the review process and manages the state of the application. | |
| """ | |
| import os | |
| import time | |
| import logging | |
| import tempfile | |
| import json | |
| from datetime import datetime | |
| import gradio as gr | |
| from src.core.language_detector import LanguageDetector | |
| from src.services.code_analyzer import CodeAnalyzer | |
| from src.services.report_generator import ReportGenerator | |
| from src.services.repository_service import RepositoryService | |
| from src.services.security_scanner import SecurityScanner | |
| from src.services.performance_analyzer import PerformanceAnalyzer | |
| logger = logging.getLogger(__name__) | |
class AgentManager:
    """
    Main orchestrator for the Code Review Agent.

    This class coordinates the review process, manages the application state,
    and provides the interface between the UI and the business logic.

    Attributes:
        state (dict): Mutable review state with the keys 'repo_url',
            'progress' (step name -> percent), 'results' and 'current_step'.
            'languages' and 'report_paths' are added during a review.
        temp_dir (str): Directory created with tempfile.mkdtemp and handed
            to the repository service used for cloning.
    """

    # Ordered names of the review pipeline stages. The overall progress
    # percentage is computed against the number of entries here.
    REVIEW_STEPS = (
        "Repository Cloning",
        "Language Detection",
        "Code Analysis",
        "Security Scanning",
        "Performance Analysis",
        "AI Review",
        "Report Generation",
    )

    # Pause (in seconds) applied after every progress update. The default
    # keeps the historical behaviour; set to 0 to disable the pause.
    PROGRESS_PAUSE_SECONDS = 0.5

    def __init__(self):
        """
        Initialize the AgentManager.
        """
        # Initialize state management
        self.state = self._new_state()

        # Initialize services
        self.language_detector = LanguageDetector()
        self.code_analyzer = CodeAnalyzer()
        self.report_generator = ReportGenerator()
        self.repository_service = RepositoryService()
        self.security_scanner = SecurityScanner()
        self.performance_analyzer = PerformanceAnalyzer()

        self.temp_dir = tempfile.mkdtemp(prefix="code_review_agent_")
        logger.info(f"Initialized AgentManager with temp directory: {self.temp_dir}")

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_state(repo_url=None):
        """Return a fresh state dictionary for a review of ``repo_url``."""
        return {
            'repo_url': repo_url,
            'progress': {},
            'results': {},
            'current_step': None,
        }

    @staticmethod
    def _repo_name_from_url(repo_url):
        """
        Derive the repository name from its URL.

        Only a trailing '.git' is removed, so names such as 'my.github.io'
        are kept intact, and a trailing '/' does not produce an empty name.

        Args:
            repo_url (str): The repository URL (may be None or empty).

        Returns:
            str: The last path segment without a '.git' suffix, or
                'repository' when no name can be derived.
        """
        name = (repo_url or '').strip().rstrip('/').split('/')[-1]
        if name.endswith('.git'):
            name = name[:-len('.git')]
        return name or 'repository'

    @staticmethod
    def _redact(text, secret):
        """
        Replace every occurrence of ``secret`` in ``text`` with '***'.

        Args:
            text (str): The text to sanitise.
            secret (str): The secret to hide; None or blank leaves the text unchanged.

        Returns:
            str: The sanitised text.
        """
        secret = (secret or '').strip()
        if not secret:
            return text
        return text.replace(secret, '***')

    def _overall_progress_value(self):
        """
        Compute the overall progress from the per-step progress in the state.

        Each step contributes its own percentage exactly once, so a finished
        step is never counted twice and the result never exceeds 100.

        Returns:
            float: Overall progress in the range 0-100.
        """
        step_values = self.state['progress'].values()
        earned = sum(min(max(v, 0), 100) for v in step_values)
        return min(100.0, earned / len(self.REVIEW_STEPS))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def start_review(self, repo_url, github_token=None, selected_languages=None):
        """
        Start the code review process for a GitHub repository.

        Args:
            repo_url (str): The URL of the GitHub repository to review.
            github_token (str, optional): GitHub authentication token for private repositories.
            selected_languages (list, optional): List of languages to analyze. If None
                or empty, languages will be auto-detected.

        Returns:
            tuple: (progress_group, overall_progress, status_message, results_dashboard) - Updated UI components.
                On failure the status message carries the error text and the
                dashboard entry is None.
        """
        # Initialize progress components outside the try block
        progress_group = gr.Group(visible=True)
        overall_progress = gr.Slider(value=0)
        status_message = gr.Markdown("*Starting review...*")

        try:
            # Initialize state for new review
            self.state = self._new_state(repo_url)

            if not repo_url:
                raise ValueError("Repository URL is required")

            # Clone repository
            self._update_progress("Repository Cloning", 0, overall_progress, status_message)
            repo_path = self._clone_repository(repo_url, github_token)
            self._update_progress("Repository Cloning", 100, overall_progress, status_message)

            # Detect languages
            self._update_progress("Language Detection", 0, overall_progress, status_message)
            if selected_languages:
                languages = selected_languages
                logger.info(f"Using selected languages: {languages}")
            else:
                languages = self.language_detector.detect_languages(repo_path)
                logger.info(f"Auto-detected languages: {languages}")
            self.state['languages'] = languages
            self._update_progress("Language Detection", 100, overall_progress, status_message)

            # Perform code analysis
            self._update_progress("Code Analysis", 0, overall_progress, status_message)
            self.state['results']['code_analysis'] = self.code_analyzer.analyze_repository(
                repo_path, languages
            )
            self._update_progress("Code Analysis", 100, overall_progress, status_message)

            # Perform security scanning
            self._update_progress("Security Scanning", 0, overall_progress, status_message)
            self.state['results']['security'] = self.security_scanner.scan_repository(
                repo_path, languages
            )
            self._update_progress("Security Scanning", 100, overall_progress, status_message)

            # Perform performance analysis
            self._update_progress("Performance Analysis", 0, overall_progress, status_message)
            self.state['results']['performance'] = self.performance_analyzer.analyze_repository(
                repo_path, languages
            )
            self._update_progress("Performance Analysis", 100, overall_progress, status_message)

            # Perform AI review
            self._update_progress("AI Review", 0, overall_progress, status_message)
            self.state['results']['ai_review'] = self._perform_ai_review(repo_path, languages)
            self._update_progress("AI Review", 100, overall_progress, status_message)

            # Generate report
            self._update_progress("Report Generation", 0, overall_progress, status_message)
            repo_name = self._repo_name_from_url(repo_url)
            self.state['report_paths'] = self.report_generator.generate_report(
                repo_name, self.state['results']
            )
            self._update_progress("Report Generation", 100, overall_progress, status_message)

            # Update results dashboard
            results_dashboard = self._create_results_dashboard(self.state['results'])
            results_dashboard.update(visible=True)

            # Calling .update() on a component instance does not change it, so
            # return fresh components that carry the final progress values.
            last_step = self.state['current_step']
            last_value = self.state['progress'][last_step]
            overall_progress = gr.Slider(value=self._overall_progress_value())
            status_message = gr.Markdown(f"*{last_step}: {last_value}%*")
            return progress_group, overall_progress, status_message, results_dashboard

        except Exception as e:
            # The token may be embedded in clone errors; never show or log it.
            message = self._redact(str(e), github_token)
            logger.exception(f"Error during code review: {message}")

            # Return fresh components that carry the error state.
            overall_progress = gr.Slider(value=self._overall_progress_value())
            status_message = gr.Markdown(f"*Error: {message}*")
            return progress_group, overall_progress, status_message, None

    def export_report(self, results_dashboard, export_format):
        """
        Export the code review report in the specified format.

        Args:
            results_dashboard: The results dashboard component (not used by this method).
            export_format (str): The format to export the report in ('pdf', 'json', 'html', 'csv').
                An object exposing a ``value`` attribute is also accepted.

        Returns:
            str: The path to the exported file as reported by the report
                generator, or None when there are no results, the format is
                not among the generated reports, or an error occurs.
        """
        try:
            if not self.state.get('results'):
                logger.warning("No results available to export")
                return None

            # Get the actual format value from the textbox component
            format_value = export_format.value if hasattr(export_format, 'value') else export_format

            # Create exports directory if it doesn't exist.
            # NOTE(review): the returned path comes from the report generator,
            # not from this directory; kept in case the generator writes here.
            exports_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'exports')
            os.makedirs(exports_dir, exist_ok=True)

            # Export report in the specified format using report_generator
            repo_name = self._repo_name_from_url(self.state['repo_url'])
            report_paths = self.report_generator.generate_report(
                repo_name, self.state['results'], format_value
            )

            if format_value not in report_paths:
                logger.warning(f"Unsupported export format: {format_value}")
                return None

            exported_path = report_paths[format_value]
            logger.info(f"Exported report to {exported_path}")
            return exported_path

        except Exception as e:
            logger.exception(f"Error exporting report: {e}")
            return None

    # ------------------------------------------------------------------
    # Pipeline steps
    # ------------------------------------------------------------------
    def _clone_repository(self, repo_url, github_token=None):
        """
        Clone the GitHub repository to a temporary directory.

        Args:
            repo_url (str): The URL of the GitHub repository to clone.
            github_token (str, optional): GitHub authentication token for private repositories.

        Returns:
            str: The path to the cloned repository.

        Raises:
            RuntimeError: If cloning fails while a token is in use; the
                message has the token redacted and the original exception is
                suppressed so the token cannot leak through tracebacks.
            Exception: Whatever the repository service raises when cloning
                fails without a token.
        """
        # Create a repository service instance bound to this manager's temp dir
        repo_service = RepositoryService(base_temp_dir=self.temp_dir)

        token = (github_token or '').strip()
        if token:
            # Embed the token in the first 'https://' only
            clone_url = repo_url.replace('https://', f'https://{token}@', 1)
        else:
            # Clone without authentication (for public repositories)
            clone_url = repo_url

        try:
            repo_path = repo_service.clone_repository(clone_url)
        except Exception as e:
            safe_message = self._redact(str(e), token)
            logger.error(f"Error cloning repository: {safe_message}")
            if token:
                raise RuntimeError(safe_message) from None
            raise

        if token:
            logger.info("Cloned repository using GitHub token authentication")
        else:
            logger.info("Cloned repository without authentication")
        return repo_path

    def _perform_ai_review(self, repo_path, languages):
        """
        Perform AI-powered code review.

        Failures (including a failing import of the review service) are
        logged and reported inside the returned dictionary instead of being
        raised.

        Args:
            repo_path (str): The path to the repository.
            languages (list): List of programming languages to analyze.

        Returns:
            dict: AI review results, or a dict with the keys 'error',
                'suggestions' and 'issues' when the review fails.
        """
        try:
            # Imported lazily so an import failure is handled like any other
            # review failure.
            from src.mcp.ai_review import AIReviewMCP

            ai_reviewer = AIReviewMCP()
            results = ai_reviewer.review_repository(repo_path, languages)

            logger.info(f"AI review completed for {len(languages)} languages")
            return results
        except Exception as e:
            logger.error(f"Error during AI review: {e}")
            return {
                'error': str(e),
                'suggestions': [],
                'issues': []
            }

    def _update_progress(self, step, value, overall_progress, status_message):
        """
        Record the progress of a specific step in the state and log it.

        Args:
            step (str): The step to update.
            value (int): The progress value (0-100).
            overall_progress: The overall progress slider component. Kept for
                interface compatibility; it is not modified here.
            status_message: The status message markdown component. Kept for
                interface compatibility; it is not modified here.
        """
        # Update state
        self.state['progress'][step] = value
        self.state['current_step'] = step

        overall_value = self._overall_progress_value()
        logger.info(f"Progress update: {step} - {value}% (Overall: {overall_value:.1f}%)")

        if self.PROGRESS_PAUSE_SECONDS > 0:
            time.sleep(self.PROGRESS_PAUSE_SECONDS)  # Simulate progress update time

    def _create_results_dashboard(self, report):
        """
        Create a results dashboard component for the UI.

        Args:
            report (dict): The code review report (not used by the placeholder).

        Returns:
            object: A placeholder dashboard with a ``visible`` flag and a
                chainable ``update(visible=None)`` method.
        """
        # This is a placeholder. In a real implementation, this would create a
        # results dashboard component for the UI.
        class ResultsDashboard:
            def __init__(self):
                self.visible = False

            def update(self, visible=None):
                if visible is not None:
                    self.visible = visible
                return self

        return ResultsDashboard()