Spaces:
Sleeping
Sleeping
File size: 6,780 Bytes
5249394 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
from autogen import ConversableAgent
import os
from typing import List, Dict, Optional, Callable
import time
import streamlit as st
from config import DefaultSettings, GeminiLLMConfig, OpenAILLMConfig, extension_to_language
def load_css(file_path):
    """Load a CSS file and inject it into the Streamlit page.

    Args:
        file_path (str): Path to the CSS file.

    Side effects:
        Renders a <style> tag via st.markdown, or shows a Streamlit error
        message if the file cannot be read.
    """
    try:
        # Explicit encoding: the platform default may not be UTF-8
        # (e.g. cp1252 on Windows), which would garble non-ASCII CSS.
        with open(file_path, "r", encoding="utf-8") as f:
            css = f.read()
        st.markdown(f"<style>{css}</style>", unsafe_allow_html=True)
    except Exception as e:
        # Broad catch is deliberate: a missing/unreadable stylesheet should
        # degrade gracefully in the UI rather than crash the app.
        st.error(f"Failed to load CSS: {str(e)}")
def load_config(
        model_type: str) -> dict:
    """
    Build the LLM configuration for the requested provider.

    Args:
    - model_type (str): Provider to configure. Supported options are 'google' and 'openai'.

    Returns:
    - dict: The LLM configuration for that provider.

    Raises:
    - ValueError: If model_type is not a supported provider.
    """
    # Guard-clause dispatch: each supported provider returns immediately.
    if model_type == 'google':
        return GeminiLLMConfig().get_llm_config()
    if model_type == 'openai':
        return OpenAILLMConfig().get_llm_config()
    raise ValueError(f"Unsupported model type: {model_type}")
def get_directory_tree(
    path: str,
    indent: str = "",
    exclude_folder: str = "__pycache__"
) -> str:
    """
    Build a tree-like string representation of the directory at `path`.

    Args:
        path (str): Root directory to render.
        indent (str): Leading indentation for the current depth (used by
            the recursive calls; leave at default for the root).
        exclude_folder (str): Folder name to skip entirely (default: '__pycache__').

    Returns:
        str: One line per entry — directories marked 📁 and suffixed with '/',
        files marked 📄; subdirectory contents follow their directory with one
        extra space of indentation.
    """
    lines = []
    # sorted(): os.listdir order is arbitrary and filesystem-dependent,
    # which made the rendered tree nondeterministic.
    for entry in sorted(os.listdir(path)):
        if entry == exclude_folder:
            continue
        entry_path = os.path.join(path, entry)
        if os.path.isdir(entry_path):
            lines.append(f"{indent}📁 {entry}/\n")
            lines.append(get_directory_tree(entry_path, indent + " ", exclude_folder))
        else:
            lines.append(f"{indent}📄 {entry}\n")
    # join instead of += in a loop: avoids quadratic string rebuilding.
    return "".join(lines)
def detect_language_by_extension(file_path: str) -> str:
    """
    Map a file's extension to a programming language name.

    Args:
        file_path (str): Path of the file to classify.

    Returns:
        str: The language for the (lower-cased) extension, or 'Unknown'
        when the extension is not in the supported mapping.
    """
    extension = os.path.splitext(file_path.lower())[1]
    return extension_to_language.get(extension, 'Unknown')
def get_supported_code_files(directory_path: str) -> List[str]:
    """
    Recursively collect files whose extension maps to a supported language.

    Args:
        directory_path (str): Directory tree to walk.

    Returns:
        List[str]: Paths (rooted at directory_path) of every file whose
        extension appears in extension_to_language.
    """
    return [
        os.path.join(root, name)
        for root, _, names in os.walk(directory_path)
        for name in names
        if os.path.splitext(name)[1] in extension_to_language
    ]
def analyze_code_with_agents(
    source_code: str,
    analysis_agents: List[ConversableAgent]
) -> Dict[str, str]:
    """
    Run every analysis agent over the given source code.

    Args:
        source_code (str): The code to analyze.
        analysis_agents (List[ConversableAgent]): Agents that each produce a review.

    Returns:
        Dict[str, str]: Maps each agent's name to the 'content' field of the
        reply it generated for the code.
    """
    # A fresh messages list is built per agent so replies cannot interact.
    return {
        agent.name: agent.generate_reply(
            messages=[{'role': 'user', 'content': source_code}]
        )['content']
        for agent in analysis_agents
    }
def analyze_code_file(
    file_path: str,
    analysis_agents: List[ConversableAgent]
) -> Optional[Dict[str, str]]:
    """
    Read one source file and run all analysis agents over its contents.

    Args:
        file_path (str): Path of the code file to read (decoded as UTF-8).
        analysis_agents (List[ConversableAgent]): Agents that perform the analysis.

    Returns:
        Optional[Dict[str, str]]: Per-agent analysis results, or None when
        the file could not be read or analyzed — failures are reported on
        stdout instead of being raised to the caller.
    """
    try:
        # The agent call stays inside the try: any failure during analysis
        # is also reported and turned into a None result.
        with open(file_path, 'r', encoding='utf-8') as source:
            contents = source.read()
        return analyze_code_with_agents(contents, analysis_agents)
    except UnicodeDecodeError:
        # Undecodable bytes — almost certainly a binary file.
        print(f"Skipping binary file {file_path}")
        return None
    except PermissionError:
        print(f"Permission denied for file {file_path}")
        return None
    except Exception as e:
        print(f"Error processing file {file_path}: {str(e)}")
        return None
def analyze_directory_with_agents(
    directory_path: str,
    analysis_agents: List[ConversableAgent],
    time_between_analysis: int = 10,
    verbose: bool = False,
    callback_func: Optional[Callable[[int, str, int], None]] = None
) -> Dict[str, Dict[str, str]]:
    """
    Analyzes all supported code files in a directory using multiple analysis agents.

    Args:
        directory_path (str): Path to the directory to analyze
        analysis_agents (List[ConversableAgent]): List of agents to perform analysis
        time_between_analysis (int): Seconds to sleep between files, to
            rate-limit successive agent/LLM calls (default: 10)
        verbose (bool): Whether to print verbose output (default: False)
        callback_func (Optional[Callable[[int, str, int], None]]): Optional
            progress hook invoked before each file with
            (file index, file path, per-file progress increment in percent)

    Returns:
        Dict[str, Dict[str, str]]: Maps each successfully analyzed file path to
            {"language": <detected language>, "analysis_results": <per-agent results>}.
            Files that fail to read or analyze are skipped.
    """
    # Hoisted: the agent list is loop-invariant.
    agent_names = [agent.name for agent in analysis_agents]
    if verbose:
        print(f"Started Analyzing using these agents {agent_names} [+]")
    analysis_results = {}
    # Get all supported code files in the directory
    code_files = get_supported_code_files(directory_path)
    # Per-file progress increment; the +1 keeps the total just under 100%
    # until the run completes.
    progress_value = int(100 / (len(code_files) + 1))
    # Process each code file
    for idx, file_path in enumerate(code_files):
        if callback_func:
            callback_func(idx, file_path, progress_value)
        try:
            file_language = detect_language_by_extension(file_path)
            file_analysis = analyze_code_file(file_path, analysis_agents)
            if file_analysis is not None:
                analysis_results[file_path] = {
                    "language": file_language,
                    "analysis_results": file_analysis
                }
                if verbose:
                    print(f"Analyzed {file_path} Successfully using agents {agent_names} [+]")
        except Exception as e:
            print(f"Error processing file {file_path}: {str(e)}")
        # Sleep even when a file failed (the attempt may still have consumed
        # API quota) — previously `continue` skipped the rate limit on errors.
        # No sleep after the final file.
        if idx < len(code_files) - 1:
            time.sleep(time_between_analysis)
    if verbose:
        print(f"Analyzed {len(code_files)} files in {directory_path}")
    return analysis_results