Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Agent Manager | |
| This module provides the main orchestrator for the Code Review Agent. | |
| It coordinates the review process and manages the state of the application. | |
| """ | |
| import os | |
| import time | |
| import logging | |
| import tempfile | |
| import json | |
| import threading | |
| import concurrent.futures | |
| from datetime import datetime | |
| import gradio as gr | |
| from src.core.language_detector import LanguageDetector | |
| from src.services.code_analyzer import CodeAnalyzer | |
| from src.services.report_generator import ReportGenerator | |
| from src.services.repository_service import RepositoryService | |
| from src.services.security_scanner import SecurityScanner | |
| from src.services.performance_analyzer import PerformanceAnalyzer | |
| logger = logging.getLogger(__name__) | |
| class AgentManager: | |
| """ | |
| Main orchestrator for the Code Review Agent. | |
| This class coordinates the review process, manages the application state, | |
| and provides the interface between the UI and the business logic. | |
| """ | |
| def __init__(self): | |
| """ | |
| Initialize the AgentManager. | |
| """ | |
| # Initialize state management | |
| self.state = { | |
| 'repo_url': None, | |
| 'progress': {}, | |
| 'results': {}, | |
| 'current_step': None | |
| } | |
| # Initialize services | |
| self.language_detector = LanguageDetector() | |
| self.code_analyzer = CodeAnalyzer() | |
| self.report_generator = ReportGenerator() | |
| self.repository_service = RepositoryService() | |
| self.security_scanner = SecurityScanner() | |
| self.performance_analyzer = PerformanceAnalyzer() | |
| self.temp_dir = tempfile.mkdtemp(prefix="code_review_agent_") | |
| logger.info(f"Initialized AgentManager with temp directory: {self.temp_dir}") | |
| def start_review(self, repo_url, github_token=None, selected_languages=None, progress_components=None): | |
| """ | |
| Start the code review process for a GitHub repository. | |
| Args: | |
| repo_url (str): The URL of the GitHub repository to review. | |
| github_token (str, optional): GitHub authentication token for private repositories. | |
| selected_languages (list, optional): List of languages to analyze. If None, | |
| languages will be auto-detected. | |
| progress_components (tuple, optional): Tuple containing (progress_group, overall_progress, status_message, step_progress) | |
| from create_progress_tracker(). | |
| Returns: | |
| tuple: (progress_group, overall_progress, status_message, results_dashboard) - Updated UI components. | |
| """ | |
| # Initialize or use provided progress components | |
| if progress_components: | |
| progress_group, overall_progress, status_message, step_progress = progress_components | |
| else: | |
| progress_group = gr.Group(visible=True) | |
| overall_progress = gr.Slider(value=0) | |
| status_message = gr.Markdown("*Starting review...*") | |
| step_progress = {} | |
| try: | |
| # Initialize state | |
| self.state = { | |
| 'repo_url': repo_url, | |
| 'progress': {}, | |
| 'results': {}, | |
| 'current_step': None | |
| } | |
| # Store step progress components | |
| self.step_progress = step_progress | |
| # Clone repository | |
| self._update_progress("Repository Cloning", 0, overall_progress, status_message) | |
| repo_path = self._clone_repository(repo_url, github_token) | |
| self._update_progress("Repository Cloning", 100, overall_progress, status_message) | |
| # Detect languages | |
| self._update_progress("Language Detection", 0, overall_progress, status_message) | |
| if selected_languages and len(selected_languages) > 0: | |
| languages = selected_languages | |
| logger.info(f"Using selected languages: {languages}") | |
| else: | |
| languages = self.language_detector.detect_languages(repo_path) | |
| logger.info(f"Auto-detected languages: {languages}") | |
| self.state['languages'] = languages | |
| self._update_progress("Language Detection", 100, overall_progress, status_message) | |
| # Initialize progress for all steps | |
| self._update_progress("Code Analysis", 0, overall_progress, status_message) | |
| self._update_progress("Security Scanning", 0, overall_progress, status_message) | |
| self._update_progress("Performance Analysis", 0, overall_progress, status_message) | |
| self._update_progress("AI Review", 0, overall_progress, status_message) | |
| # Create a thread lock for updating shared state | |
| lock = threading.Lock() | |
| results = {} | |
| # Define worker functions for each analysis type | |
| def run_code_analysis(): | |
| try: | |
| code_results = self.code_analyzer.analyze_repository(repo_path, languages) | |
| with lock: | |
| results['code_analysis'] = code_results | |
| self._update_progress("Code Analysis", 100, overall_progress, status_message) | |
| except Exception as e: | |
| logger.error(f"Error in code analysis thread: {e}") | |
| with lock: | |
| results['code_analysis'] = {'status': 'error', 'error': str(e)} | |
| self._update_progress("Code Analysis", 100, overall_progress, status_message) | |
| def run_security_scan(): | |
| try: | |
| security_results = self.security_scanner.scan_repository(repo_path, languages) | |
| with lock: | |
| results['security'] = security_results | |
| self._update_progress("Security Scanning", 100, overall_progress, status_message) | |
| except Exception as e: | |
| logger.error(f"Error in security scanning thread: {e}") | |
| with lock: | |
| results['security'] = {'status': 'error', 'error': str(e)} | |
| self._update_progress("Security Scanning", 100, overall_progress, status_message) | |
| def run_performance_analysis(): | |
| try: | |
| perf_results = self.performance_analyzer.analyze_repository(repo_path, languages) | |
| with lock: | |
| results['performance'] = perf_results | |
| self._update_progress("Performance Analysis", 100, overall_progress, status_message) | |
| except Exception as e: | |
| logger.error(f"Error in performance analysis thread: {e}") | |
| with lock: | |
| results['performance'] = {'status': 'error', 'error': str(e)} | |
| self._update_progress("Performance Analysis", 100, overall_progress, status_message) | |
| def run_ai_review(): | |
| try: | |
| ai_results = self._perform_ai_review(repo_path, languages) | |
| with lock: | |
| results['ai_review'] = ai_results | |
| self._update_progress("AI Review", 100, overall_progress, status_message) | |
| except Exception as e: | |
| logger.error(f"Error in AI review thread: {e}") | |
| with lock: | |
| results['ai_review'] = {'status': 'error', 'error': str(e)} | |
| self._update_progress("AI Review", 100, overall_progress, status_message) | |
| # Run all analysis tasks in parallel using ThreadPoolExecutor | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: | |
| executor.submit(run_code_analysis) | |
| executor.submit(run_security_scan) | |
| executor.submit(run_performance_analysis) | |
| executor.submit(run_ai_review) | |
| # Wait for all tasks to complete | |
| executor.shutdown(wait=True) | |
| # Update the state with all results | |
| with lock: | |
| self.state['results'].update(results) | |
| # Get repository info | |
| repo_info = self.repository_service.get_repository_info(repo_path) | |
| self.state['results']['repository_info'] = repo_info | |
| # Generate report | |
| self._update_progress("Report Generation", 0, overall_progress, status_message) | |
| repo_name = repo_url.split('/')[-1].replace('.git', '') | |
| report_paths = self.report_generator.generate_report( | |
| repo_name, self.state['results'] | |
| ) | |
| self.state['report_paths'] = report_paths | |
| self._update_progress("Report Generation", 100, overall_progress, status_message) | |
| # Update results dashboard | |
| results_dashboard = self._create_results_dashboard(self.state['results']) | |
| results_dashboard.visible = True | |
| return progress_group, overall_progress, status_message, results_dashboard | |
| except Exception as e: | |
| logger.exception(f"Error during code review: {e}") | |
| # Update progress components with error | |
| status_message.value = f"*Error: {str(e)}*" | |
| return progress_group, overall_progress, status_message, None | |
| def export_report(self, results_dashboard, export_format): | |
| """ | |
| Export the code review report in the specified format. | |
| Args: | |
| results_dashboard: The results dashboard component. | |
| export_format (str): The format to export the report in ('pdf', 'json', 'html', 'csv'). | |
| Returns: | |
| str: The path to the exported file. | |
| """ | |
| try: | |
| if not self.state.get('results'): | |
| logger.warning("No results available to export") | |
| return None | |
| # Get the actual format value from the textbox component | |
| format_value = export_format.value if hasattr(export_format, 'value') else export_format | |
| # Create exports directory if it doesn't exist | |
| exports_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'exports') | |
| os.makedirs(exports_dir, exist_ok=True) | |
| # Generate filename with timestamp | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| repo_name = self.state['repo_url'].split('/')[-1].replace('.git', '') | |
| filename = f"{repo_name}_review_{timestamp}.{format_value}" | |
| filepath = os.path.join(exports_dir, filename) | |
| # Export report in the specified format using report_generator | |
| report_paths = self.report_generator.generate_report( | |
| repo_name, self.state['results'], format_value | |
| ) | |
| if format_value in report_paths: | |
| return report_paths[format_value] | |
| else: | |
| logger.warning(f"Unsupported export format: {format_value}") | |
| return None | |
| logger.info(f"Exported report to {filepath}") | |
| return filepath | |
| except Exception as e: | |
| logger.exception(f"Error exporting report: {e}") | |
| return None | |
| def _clone_repository(self, repo_url, github_token=None): | |
| """ | |
| Clone the GitHub repository to a temporary directory. | |
| Args: | |
| repo_url (str): The URL of the GitHub repository to clone. | |
| github_token (str, optional): GitHub authentication token for private repositories. | |
| Returns: | |
| str: The path to the cloned repository. | |
| """ | |
| # Import the repository service here to avoid circular imports | |
| from src.services.repository_service import RepositoryService | |
| # Create a repository service instance | |
| repo_service = RepositoryService(base_temp_dir=self.temp_dir) | |
| # Clone the repository using the service | |
| try: | |
| # If a GitHub token is provided, use it for authentication | |
| if github_token and github_token.strip(): | |
| # Modify the URL to include the token for authentication | |
| auth_url = repo_url.replace('https://', f'https://{github_token}@') | |
| repo_path = repo_service.clone_repository(auth_url) | |
| logger.info(f"Cloned repository using GitHub token authentication") | |
| else: | |
| # Clone without authentication (for public repositories) | |
| repo_path = repo_service.clone_repository(repo_url) | |
| logger.info(f"Cloned repository without authentication") | |
| return repo_path | |
| except Exception as e: | |
| logger.error(f"Error cloning repository: {e}") | |
| raise | |
| def _perform_ai_review(self, repo_path, languages): | |
| """ | |
| Perform AI-powered code review with parallel processing. | |
| Args: | |
| repo_path (str): The path to the repository. | |
| languages (list): List of programming languages to analyze. | |
| Returns: | |
| dict: AI review results. | |
| """ | |
| try: | |
| # Import and use the AI review service | |
| from src.mcp.ai_review import AIReviewService | |
| import os | |
| ai_reviewer = AIReviewService() | |
| # Check if AI review is available | |
| if not ai_reviewer.is_available(): | |
| logger.warning("AI review service is not available. Please set NEBIUS_API_KEY in environment variables.") | |
| return { | |
| 'error': 'AI review service is not available. Please set NEBIUS_API_KEY in environment variables.', | |
| 'suggestions': [], | |
| 'issues': [] | |
| } | |
| # Get all files in the repository | |
| all_files = [] | |
| language_extensions = { | |
| 'Python': ['.py'], | |
| 'JavaScript': ['.js'], | |
| 'TypeScript': ['.ts', '.tsx'], | |
| 'Java': ['.java'], | |
| 'Go': ['.go'], | |
| 'Rust': ['.rs'] | |
| } | |
| # Create a list of extensions to look for based on selected languages | |
| extensions_to_check = [] | |
| for lang in languages: | |
| if lang in language_extensions: | |
| extensions_to_check.extend(language_extensions[lang]) | |
| # Find all files with the specified extensions | |
| for root, _, files in os.walk(repo_path): | |
| for file in files: | |
| file_path = os.path.join(root, file) | |
| _, ext = os.path.splitext(file_path) | |
| if ext in extensions_to_check: | |
| all_files.append(file_path) | |
| # Limit the number of files to review to avoid excessive processing | |
| max_files = 20 | |
| if len(all_files) > max_files: | |
| logger.warning(f"Too many files to review ({len(all_files)}). Limiting to {max_files} files.") | |
| all_files = all_files[:max_files] | |
| # Process files in parallel | |
| # Pass None for the optional analysis_results parameter | |
| results = ai_reviewer.review_repository(repo_path, all_files, languages, None) | |
| logger.info(f"AI review completed for {len(all_files)} files across {len(languages)} languages") | |
| return results | |
| except Exception as e: | |
| logger.error(f"Error during AI review: {e}") | |
| return { | |
| 'error': str(e), | |
| 'suggestions': [], | |
| 'issues': [] | |
| } | |
| def _update_progress(self, step, value, overall_progress=None, status_message=None): | |
| """Update progress for a specific step and overall progress.""" | |
| # Update state | |
| self.state['current_step'] = step | |
| self.state['progress'][step] = value | |
| # Calculate overall progress | |
| total_steps = len(self.state['progress']) | |
| if total_steps > 0: | |
| overall = sum(self.state['progress'].values()) / total_steps | |
| else: | |
| overall = 0 | |
| # Update UI components if provided | |
| if overall_progress is not None: | |
| overall_progress.value = overall | |
| if status_message is not None: | |
| status_message.value = f"*Progress update: {step} - {value}% (Overall: {overall:.1f}%)*" | |
| # Update step progress if available | |
| if hasattr(self, 'step_progress') and step in self.step_progress: | |
| self.step_progress[step].value = value | |
| # Log progress | |
| logger.info(f"Progress update: {step} - {value}% (Overall: {overall:.1f}%)") | |
| def _create_results_dashboard(self, report): | |
| """ | |
| Create a results dashboard component for the UI. | |
| Args: | |
| report (dict): The code review report. | |
| Returns: | |
| gr.Tabs: A Gradio results dashboard component. | |
| """ | |
| # Import the create_results_dashboard function from the UI components | |
| from src.ui.components.results_dashboard import create_results_dashboard | |
| # Create a new results dashboard component | |
| results_dashboard = create_results_dashboard() | |
| # Set the visibility to True | |
| results_dashboard.visible = True | |
| # In a full implementation, we would populate the dashboard with data from the report | |
| # For now, we're just returning the empty dashboard component | |
| return results_dashboard |