# ai-code-analyzer / analyzer/code_analyzer.py
# Author: arun3676 — "Initial commit: LLM Code Analyzer with multi-model support" (646ba30)
from typing import Dict, Any, List, Optional
import time
import requests
import os
import re
from .llm_clients import LLMClientManager, LLMResponse
from .prompts import get_code_analysis_prompt, get_comparison_prompt, get_github_analysis_prompt
from .utils import detect_language, parse_analysis_result
class CodeAnalyzer:
    """Main code analysis engine with GitHub integration.

    Wraps an ``LLMClientManager`` to run single-model, multi-model, and
    GitHub-repository analyses. All public methods return plain dicts so
    results are directly JSON-serializable.
    """

    # Timeout (seconds) for every GitHub API call; without it a stalled
    # connection would block the analysis forever.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.llm_manager = LLMClientManager()
        self.available_models = self.llm_manager.get_available_models()

    def analyze_code(self, code: str, model: str, language: Optional[str] = None) -> Dict[str, Any]:
        """Analyze code using a specific model with focused output.

        Args:
            code: Source code to analyze.
            model: Key of the model to query (one of ``available_models``).
            language: Language name; auto-detected from the code when omitted.

        Returns:
            Parsed analysis dict plus metadata keys ``model``, ``language``,
            ``execution_time``, ``code_length`` and ``line_count``. On LLM
            failure the dict carries an ``error`` key and empty result fields.
        """
        start_time = time.time()

        # Detect language if not provided
        if not language:
            language = detect_language(code)

        # Generate focused prompt and query the LLM
        prompt = get_code_analysis_prompt(code, language)
        response = self.llm_manager.query(model, prompt)

        if response.success:
            analysis = parse_analysis_result(response.content)
            analysis['raw_response'] = response.content
        else:
            # Shape the failure payload exactly like a successful analysis so
            # downstream consumers never need to special-case errors.
            analysis = {
                'error': response.error,
                'quality_score': 0,
                'summary': f"Analysis failed: {response.error}",
                'bugs': [],
                'quality_issues': [],
                'security_vulnerabilities': [],
                'quick_fixes': [],
                # Legacy fields
                'strengths': [],
                'issues': [],
                'suggestions': [],
                'security_concerns': [],
                'performance_notes': []
            }

        # Add metadata
        analysis['model'] = response.model
        analysis['language'] = language
        analysis['execution_time'] = round(time.time() - start_time, 2)
        analysis['code_length'] = len(code)
        analysis['line_count'] = len(code.splitlines())
        return analysis

    def analyze_github_repo(self, repo_url: str, model: str = None) -> Dict[str, Any]:
        """Analyze a GitHub repository.

        Args:
            repo_url: Full ``https://github.com/<owner>/<repo>`` URL.
            model: Model key; falls back to the first available model when
                missing or unknown.

        Returns:
            Analysis dict (overview, architecture, issues, priorities) plus
            metadata, or a dict with an ``error`` key on any failure.
        """
        start_time = time.time()

        # Use first available model if none specified or unknown.
        if not model or model not in self.available_models:
            if not self.available_models:
                # Previously this raised IndexError on an empty model list.
                return {'error': 'No LLM models are available'}
            model = next(iter(self.available_models))

        try:
            # Parse GitHub URL
            if not repo_url.startswith('https://github.com/'):
                return {'error': 'Please provide a valid GitHub repository URL'}

            # Extract owner and repo
            parts = repo_url.replace('https://github.com/', '').split('/')
            if len(parts) < 2:
                return {'error': 'Invalid GitHub repository URL format'}
            owner, repo = parts[0], parts[1]

            # Get repository structure and key files
            repo_data = self._fetch_github_repo_data(owner, repo)
            if 'error' in repo_data:
                return repo_data

            # Generate analysis prompt and query the LLM
            prompt = get_github_analysis_prompt(
                repo_data['structure'],
                repo_data['main_files']
            )
            response = self.llm_manager.query(model, prompt)

            if response.success:
                analysis = self._parse_github_analysis(response.content)
                analysis['raw_response'] = response.content
                analysis['repository_info'] = repo_data['info']
            else:
                analysis = {
                    'error': response.error,
                    'project_overview': f"Analysis failed: {response.error}",
                    'architecture_quality': [],
                    'critical_issues': [],
                    'improvement_priorities': []
                }

            # Add metadata
            analysis['model'] = response.model
            analysis['execution_time'] = round(time.time() - start_time, 2)
            analysis['repo_url'] = repo_url
            return analysis

        except Exception as e:
            return {
                'error': f"GitHub analysis failed: {str(e)}",
                'execution_time': round(time.time() - start_time, 2)
            }

    def _fetch_github_repo_data(self, owner: str, repo: str) -> Dict[str, Any]:
        """Fetch repository metadata, file tree, and key-file contents.

        Returns a dict with ``info``/``structure``/``main_files`` keys, or a
        dict with an ``error`` key on any failure.
        """
        try:
            # GitHub API endpoints
            api_base = f"https://api.github.com/repos/{owner}/{repo}"

            # Optional auth raises the unauthenticated rate limit.
            headers = {}
            if os.getenv('GITHUB_TOKEN'):
                headers['Authorization'] = f"token {os.getenv('GITHUB_TOKEN')}"

            repo_response = requests.get(api_base, headers=headers,
                                         timeout=self.REQUEST_TIMEOUT)
            if repo_response.status_code != 200:
                return {'error': f'Repository not found or private: {owner}/{repo}'}
            repo_info = repo_response.json()

            # Get file tree; try "main", then fall back to legacy "master".
            tree_response = requests.get(f"{api_base}/git/trees/main?recursive=1",
                                         headers=headers, timeout=self.REQUEST_TIMEOUT)
            if tree_response.status_code != 200:
                tree_response = requests.get(f"{api_base}/git/trees/master?recursive=1",
                                             headers=headers, timeout=self.REQUEST_TIMEOUT)
                if tree_response.status_code != 200:
                    return {'error': 'Could not fetch repository structure'}
            tree_data = tree_response.json()

            # Build structure and get key files
            structure = self._build_repo_structure(tree_data['tree'])
            main_files = self._get_key_files(owner, repo, tree_data['tree'], headers)

            return {
                'info': {
                    'name': repo_info['name'],
                    'description': repo_info.get('description', 'No description'),
                    'language': repo_info.get('language', 'Unknown'),
                    'stars': repo_info.get('stargazers_count', 0),
                    'forks': repo_info.get('forks_count', 0),
                    'size': repo_info.get('size', 0)
                },
                'structure': structure,
                'main_files': main_files
            }

        except Exception as e:
            return {'error': f'Failed to fetch repository data: {str(e)}'}

    def _build_repo_structure(self, tree: List[Dict]) -> str:
        """Build a readable repository structure (files first, then dirs)."""
        structure_lines = []
        dirs = set()
        for item in tree[:50]:  # Limit to first 50 items
            if item['type'] == 'tree':
                dirs.add(item['path'])
            else:
                structure_lines.append(f"📄 {item['path']}")
        for dir_path in sorted(dirs):
            structure_lines.append(f"📁 {dir_path}/")
        return '\n'.join(structure_lines[:30])  # Limit output

    def _get_key_files(self, owner: str, repo: str, tree: List[Dict], headers: Dict) -> str:
        """Get content of key files like README, manifests, and entry points.

        Best-effort: unreadable/unfetchable files are skipped silently.
        """
        import base64  # hoisted out of the loop (was re-imported per file)

        key_files = []
        # Priority files to analyze
        priority_patterns = [
            'README.md', 'readme.md', 'README.txt',
            'package.json', 'requirements.txt', 'Cargo.toml', 'go.mod',
            'main.py', 'index.js', 'main.js', 'app.py', 'server.js'
        ]

        for item in tree:
            if item['type'] == 'blob':
                filename = item['path'].split('/')[-1]
                # Check if it's a priority file
                if filename in priority_patterns or any(
                    pattern in filename.lower() for pattern in ['main', 'index', 'app']
                ):
                    try:
                        file_response = requests.get(
                            f"https://api.github.com/repos/{owner}/{repo}/contents/{item['path']}",
                            headers=headers,
                            timeout=self.REQUEST_TIMEOUT
                        )
                        if file_response.status_code == 200:
                            file_data = file_response.json()
                            if file_data.get('encoding') == 'base64':
                                content = base64.b64decode(file_data['content']).decode('utf-8', errors='ignore')
                                key_files.append(f"\n--- {item['path']} ---\n{content[:1000]}")  # First 1000 chars
                    except Exception:
                        # Best-effort fetch: skip this file and move on.
                        # (Was a bare ``except:`` that also ate KeyboardInterrupt.)
                        continue
            if len(key_files) >= 5:  # Limit to 5 key files
                break

        return '\n'.join(key_files)

    def _parse_github_analysis(self, text: str) -> Dict[str, Any]:
        """Parse GitHub repository analysis results from free-form LLM text."""
        result = {
            'project_overview': '',
            'architecture_quality': [],
            'critical_issues': [],
            'improvement_priorities': []
        }

        # Each pattern captures the section body up to the next numbered or
        # ALL_CAPS heading (or end of text).
        sections = {
            'project_overview': r'(?:PROJECT_OVERVIEW|project\s+overview)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'architecture_quality': r'(?:ARCHITECTURE_QUALITY|architecture|structure)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'critical_issues': r'(?:CRITICAL_ISSUES|critical|major\s+issue)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'improvement_priorities': r'(?:IMPROVEMENT_PRIORITIES|improvement|priorit)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)'
        }

        for key, pattern in sections.items():
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if match:
                content = match.group(1).strip()
                if key == 'project_overview':
                    # Clean project overview: keep first line, strip markdown.
                    clean_overview = content.split('\n')[0].strip()
                    clean_overview = re.sub(r'#+\s*', '', clean_overview)  # Remove ### symbols
                    clean_overview = re.sub(r'^\*+\s*', '', clean_overview)  # Remove ** symbols
                    result[key] = clean_overview
                else:
                    # Extract and clean bullet points
                    items = []
                    lines = content.split('\n')
                    for line in lines:
                        line = line.strip()
                        if line and not line.lower() in ['none', 'none found']:
                            # Clean up markdown symbols and extra characters
                            line = re.sub(r'#+\s*', '', line)  # Remove ### symbols
                            line = re.sub(r'^\*+\s*', '', line)  # Remove ** symbols
                            line = re.sub(r'^[-•*]\s*', '', line)  # Remove bullet markers
                            line = re.sub(r'^[:\-\s]*', '', line)  # Remove colons and dashes
                            if len(line) > 10:  # Only include substantial content
                                items.append(line)

                    # If no structured items found, try to extract sentences
                    if not items and content.strip():
                        sentences = re.split(r'[.!?]+', content)
                        for sentence in sentences:
                            clean_sentence = sentence.strip()
                            clean_sentence = re.sub(r'#+\s*', '', clean_sentence)  # Remove ### symbols
                            clean_sentence = re.sub(r'^\*+\s*', '', clean_sentence)  # Remove ** symbols
                            if clean_sentence and len(clean_sentence) > 15:
                                items.append(clean_sentence)

                    result[key] = items[:4]  # Limit to 4 items per section

        return result

    def analyze_with_all_models(self, code: str, language: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
        """Analyze code using all available models; returns model-key -> result."""
        results = {}
        for model_key in self.available_models:
            results[model_key] = self.analyze_code(code, model_key, language)
        return results

    def compare_analyses(self, results: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Compare results from different models with focus on critical issues.

        Args:
            results: Mapping of model key -> analysis dict as produced by
                ``analyze_code`` / ``analyze_with_all_models``.

        Returns:
            Dict with average/per-model scores, the best-scoring model, total
            analysis time, and bug/security issues multiple models agree on.
        """
        comparison = {
            'average_score': 0,
            'consensus_bugs': [],
            'consensus_security': [],
            'model_scores': {},
            'best_model': None,
            # .get() guards against partially-formed result dicts.
            'analysis_time': sum(r.get('execution_time', 0) for r in results.values())
        }

        # Calculate average score and find best model
        scores = []
        for model, result in results.items():
            if 'error' not in result:
                score = result.get('quality_score', 0)
                scores.append(score)
                comparison['model_scores'][model] = score

        if scores:
            comparison['average_score'] = round(sum(scores) / len(scores), 1)
            best_model = max(comparison['model_scores'].items(), key=lambda x: x[1])
            comparison['best_model'] = best_model[0]

        # Find consensus on critical issues
        all_bugs = []
        all_security = []
        for result in results.values():
            if 'error' not in result:
                all_bugs.extend(result.get('bugs', []))
                all_security.extend(result.get('security_vulnerabilities', []))

        # Simple consensus: issues mentioned by multiple models (substring
        # containment in either direction, case-insensitive).
        def find_consensus(items):
            consensus = []
            for item in items:
                if any(item.lower() in other.lower() or other.lower() in item.lower()
                       for other in items if other != item):
                    if item not in consensus:
                        consensus.append(item)
            return consensus[:3]  # Top 3 consensus items

        comparison['consensus_bugs'] = find_consensus(all_bugs)
        comparison['consensus_security'] = find_consensus(all_security)
        return comparison