Scodey / utils /utils.py
DeepActionPotential's picture
Upload folder using huggingface_hub
5249394 verified
raw
history blame
6.78 kB
from autogen import ConversableAgent
import os
from typing import List, Dict, Optional, Callable
import time
import streamlit as st
from config import DefaultSettings, GeminiLLMConfig, OpenAILLMConfig, extension_to_language
def load_css(file_path):
"""Load and inject CSS styles from file."""
try:
with open(file_path, "r") as f:
css = f.read()
st.markdown(f"<style>{css}</style>", unsafe_allow_html=True)
except Exception as e:
st.error(f"Failed to load CSS: {str(e)}")
def load_config(
model_type: str) -> dict:
"""
Load the configuration for a given model type.
Args:
- model_type (str): The type of model to load the config for. Supported options are 'google' and 'openai'.
Returns:
- dict: A dictionary containing the configuration for the specified model type.
"""
if model_type == 'google':
return GeminiLLMConfig().get_llm_config()
elif model_type == 'openai':
return OpenAILLMConfig().get_llm_config()
else:
raise ValueError(f"Unsupported model type: {model_type}")
def get_directory_tree(
path: str,
indent: str = "",
exclude_folder: str = "__pycache__"
) -> str:
"""
Returns a tree-like string representation of the directory at 'path', skipping excluded folders.
"""
result = ""
for item in os.listdir(path):
if item == exclude_folder:
continue
full_path = os.path.join(path, item)
if os.path.isdir(full_path):
result += f"{indent}📁 {item}/\n"
result += get_directory_tree(full_path, indent + " ", exclude_folder)
else:
result += f"{indent}📄 {item}\n"
return result
def detect_language_by_extension(file_path: str) -> str:
"""
Detect the language of a file by its extension.
Args:
file_path (str): The path to the file to detect.
Returns:
str: The detected language.
"""
_, ext = os.path.splitext(file_path.lower())
return extension_to_language.get(ext, 'Unknown')
def get_supported_code_files(directory_path: str) -> List[str]:
"""
Scans a directory for files with supported programming language extensions.
Args:
directory_path (str): Path to the directory to scan
Returns:
List[str]: List of absolute paths to supported code files
"""
code_files = []
for root, _, files in os.walk(directory_path):
for file in files:
_, ext = os.path.splitext(file)
if ext in extension_to_language:
full_path = os.path.join(root, file)
code_files.append(full_path)
return code_files
def analyze_code_with_agents(
source_code: str,
analysis_agents: List[ConversableAgent]
) -> Dict[str, str]:
"""
Analyzes source code using multiple code analysis agents.
Args:
source_code (str): The code to analyze
analysis_agents (List[ConversableAgent]): List of agents to perform analysis
Returns:
Dict[str, str]: A dictionary mapping each agent's name to its analysis result
"""
results = {}
for agent in analysis_agents:
agent_response = agent.generate_reply(messages=[{'role': 'user', 'content': source_code}])
results[agent.name] = agent_response['content']
return results
def analyze_code_file(
file_path: str,
analysis_agents: List[ConversableAgent]
) -> Optional[Dict[str, str]]:
"""
Analyzes a single code file using multiple analysis agents.
Args:
file_path (str): Path to the code file to analyze
analysis_agents (List[ConversableAgent]): List of agents to perform analysis
Returns:
Optional[Dict[str, str]]: Analysis results if successful, None if an error occurred
"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
code = file.read()
analysis_results = analyze_code_with_agents(code, analysis_agents)
return analysis_results
except UnicodeDecodeError:
print(f"Skipping binary file {file_path}")
return None
except PermissionError:
print(f"Permission denied for file {file_path}")
return None
except Exception as e:
print(f"Error processing file {file_path}: {str(e)}")
return None
def analyze_directory_with_agents(
directory_path: str,
analysis_agents: List[ConversableAgent],
time_between_analysis: int = 10,
verbose: bool = False,
callback_func: Callable[[int], None] = None
) -> Dict[str, Dict[str, str]]:
"""
Analyzes all code files in a directory using multiple analysis agents.
Args:
directory_path (str): Path to the directory to analyze
analysis_agents (List[ConversableAgent]): List of agents to perform analysis
verbose (bool): Whether to print verbose output (default: False)
Returns:
Dict[str, Dict[str, str]]: A nested dictionary where:
- Outer keys are file paths
- Inner keys are agent names
- Values are agent analysis results
"""
if verbose:
print(f"Started Analyzing using these agents {[agent.name for agent in analysis_agents]} [+]")
analysis_results = {}
# Get all supported code files in the directory
code_files = get_supported_code_files(directory_path)
# Progress Value
progress_value = int( 100 / (len(code_files) + 1))
# Process each code file
for idx, file_path in enumerate(code_files):
if callback_func:
callback_func(idx, file_path, progress_value)
try:
agents_names = [agent.name for agent in analysis_agents]
file_language = detect_language_by_extension(file_path)
file_analysis = analyze_code_file(file_path, analysis_agents)
if file_analysis is not None:
analysis_results[file_path] = {
"language": file_language,
"analysis_results": file_analysis
}
if verbose:
print(f"Analyzed {file_path} Successfully using agents {agents_names} [+]")
except Exception as e:
print(f"Error processing file {file_path}: {str(e)}")
continue
time.sleep(time_between_analysis)
if verbose:
print(f"Analyzed {len(code_files)} files in {directory_path}")
return analysis_results