from autogen import ConversableAgent import os from typing import List, Dict, Optional, Callable import time import streamlit as st from config import DefaultSettings, GeminiLLMConfig, OpenAILLMConfig, extension_to_language def load_css(file_path): """Load and inject CSS styles from file.""" try: with open(file_path, "r") as f: css = f.read() st.markdown(f"", unsafe_allow_html=True) except Exception as e: st.error(f"Failed to load CSS: {str(e)}") def load_config( model_type: str) -> dict: """ Load the configuration for a given model type. Args: - model_type (str): The type of model to load the config for. Supported options are 'google' and 'openai'. Returns: - dict: A dictionary containing the configuration for the specified model type. """ if model_type == 'google': return GeminiLLMConfig().get_llm_config() elif model_type == 'openai': return OpenAILLMConfig().get_llm_config() else: raise ValueError(f"Unsupported model type: {model_type}") def get_directory_tree( path: str, indent: str = "", exclude_folder: str = "__pycache__" ) -> str: """ Returns a tree-like string representation of the directory at 'path', skipping excluded folders. """ result = "" for item in os.listdir(path): if item == exclude_folder: continue full_path = os.path.join(path, item) if os.path.isdir(full_path): result += f"{indent}📁 {item}/\n" result += get_directory_tree(full_path, indent + " ", exclude_folder) else: result += f"{indent}📄 {item}\n" return result def detect_language_by_extension(file_path: str) -> str: """ Detect the language of a file by its extension. Args: file_path (str): The path to the file to detect. Returns: str: The detected language. """ _, ext = os.path.splitext(file_path.lower()) return extension_to_language.get(ext, 'Unknown') def get_supported_code_files(directory_path: str) -> List[str]: """ Scans a directory for files with supported programming language extensions. Args: directory_path (str): Path to the directory to scan Returns: List[str]: List of absolute paths to supported code files """ code_files = [] for root, _, files in os.walk(directory_path): for file in files: _, ext = os.path.splitext(file) if ext in extension_to_language: full_path = os.path.join(root, file) code_files.append(full_path) return code_files def analyze_code_with_agents( source_code: str, analysis_agents: List[ConversableAgent] ) -> Dict[str, str]: """ Analyzes source code using multiple code analysis agents. Args: source_code (str): The code to analyze analysis_agents (List[ConversableAgent]): List of agents to perform analysis Returns: Dict[str, str]: A dictionary mapping each agent's name to its analysis result """ results = {} for agent in analysis_agents: agent_response = agent.generate_reply(messages=[{'role': 'user', 'content': source_code}]) results[agent.name] = agent_response['content'] return results def analyze_code_file( file_path: str, analysis_agents: List[ConversableAgent] ) -> Optional[Dict[str, str]]: """ Analyzes a single code file using multiple analysis agents. Args: file_path (str): Path to the code file to analyze analysis_agents (List[ConversableAgent]): List of agents to perform analysis Returns: Optional[Dict[str, str]]: Analysis results if successful, None if an error occurred """ try: with open(file_path, 'r', encoding='utf-8') as file: code = file.read() analysis_results = analyze_code_with_agents(code, analysis_agents) return analysis_results except UnicodeDecodeError: print(f"Skipping binary file {file_path}") return None except PermissionError: print(f"Permission denied for file {file_path}") return None except Exception as e: print(f"Error processing file {file_path}: {str(e)}") return None def analyze_directory_with_agents( directory_path: str, analysis_agents: List[ConversableAgent], time_between_analysis: int = 10, verbose: bool = False, callback_func: Callable[[int], None] = None ) -> Dict[str, Dict[str, str]]: """ Analyzes all code files in a directory using multiple analysis agents. Args: directory_path (str): Path to the directory to analyze analysis_agents (List[ConversableAgent]): List of agents to perform analysis verbose (bool): Whether to print verbose output (default: False) Returns: Dict[str, Dict[str, str]]: A nested dictionary where: - Outer keys are file paths - Inner keys are agent names - Values are agent analysis results """ if verbose: print(f"Started Analyzing using these agents {[agent.name for agent in analysis_agents]} [+]") analysis_results = {} # Get all supported code files in the directory code_files = get_supported_code_files(directory_path) # Progress Value progress_value = int( 100 / (len(code_files) + 1)) # Process each code file for idx, file_path in enumerate(code_files): if callback_func: callback_func(idx, file_path, progress_value) try: agents_names = [agent.name for agent in analysis_agents] file_language = detect_language_by_extension(file_path) file_analysis = analyze_code_file(file_path, analysis_agents) if file_analysis is not None: analysis_results[file_path] = { "language": file_language, "analysis_results": file_analysis } if verbose: print(f"Analyzed {file_path} Successfully using agents {agents_names} [+]") except Exception as e: print(f"Error processing file {file_path}: {str(e)}") continue time.sleep(time_between_analysis) if verbose: print(f"Analyzed {len(code_files)} files in {directory_path}") return analysis_results