# Scraped file-viewer header — Spaces status: Sleeping; file size: 14,445 bytes; revision 646ba30
from typing import Dict, Any, List, Optional
import time
import requests
import os
import re
from .llm_clients import LLMClientManager, LLMResponse
from .prompts import get_code_analysis_prompt, get_comparison_prompt, get_github_analysis_prompt
from .utils import detect_language, parse_analysis_result
class CodeAnalyzer:
    """Main code analysis engine with GitHub integration.

    Wraps an ``LLMClientManager`` to analyze code snippets and GitHub
    repositories, and to compare the results produced by several models.
    Failures are reported to the caller as dictionaries carrying an
    ``'error'`` key rather than as raised exceptions wherever the original
    API did so.
    """

    # Only URLs starting with this prefix are accepted as repositories.
    GITHUB_URL_PREFIX = 'https://github.com/'
    # Seconds to wait for any single GitHub API call before giving up.
    REQUEST_TIMEOUT = 10
    # Caps used when summarizing a repository for the prompt.
    MAX_TREE_ITEMS = 50
    MAX_STRUCTURE_LINES = 30
    MAX_KEY_FILES = 5
    KEY_FILE_PREVIEW_CHARS = 1000
    # Upper bound on file downloads so that persistent failures (for example
    # rate limiting) cannot trigger one request per file in a large tree.
    MAX_KEY_FILE_REQUESTS = 15
    # Caps applied to parsed / compared output.
    MAX_SECTION_ITEMS = 4
    MAX_CONSENSUS_ITEMS = 3

    def __init__(self):
        """Create the LLM client manager and cache its available models."""
        self.llm_manager = LLMClientManager()
        self.available_models = self.llm_manager.get_available_models()

    def analyze_code(self, code: str, model: str, language: Optional[str] = None) -> Dict[str, Any]:
        """Analyze code using a specific model with focused output.

        Args:
            code: Source text to analyze.
            model: Key of the model to query through the LLM manager.
            language: Language of ``code``; detected with ``detect_language``
                when omitted or empty.

        Returns:
            The parsed analysis (or a placeholder analysis carrying an
            ``'error'`` key when the query failed), extended with the
            metadata keys ``model``, ``language``, ``execution_time``,
            ``code_length`` and ``line_count``.
        """
        start_time = time.time()

        if not language:
            language = detect_language(code)

        prompt = get_code_analysis_prompt(code, language)
        response = self.llm_manager.query(model, prompt)

        if response.success:
            analysis = parse_analysis_result(response.content)
            analysis['raw_response'] = response.content
        else:
            # Keep the same shape as a successful analysis so consumers can
            # render it without special-casing missing keys.
            analysis = {
                'error': response.error,
                'quality_score': 0,
                'summary': f"Analysis failed: {response.error}",
                'bugs': [],
                'quality_issues': [],
                'security_vulnerabilities': [],
                'quick_fixes': [],
                # Legacy fields
                'strengths': [],
                'issues': [],
                'suggestions': [],
                'security_concerns': [],
                'performance_notes': []
            }

        analysis['model'] = response.model
        analysis['language'] = language
        analysis['execution_time'] = round(time.time() - start_time, 2)
        analysis['code_length'] = len(code)
        analysis['line_count'] = len(code.splitlines())
        return analysis

    def analyze_github_repo(self, repo_url: str, model: Optional[str] = None) -> Dict[str, Any]:
        """Analyze a GitHub repository.

        Args:
            repo_url: URL of the form ``https://github.com/<owner>/<repo>``.
                Surrounding whitespace, extra path segments, a ``.git``
                suffix, query strings and fragments are tolerated.
            model: Key of the model to use. When missing or not among the
                available models, the first available model is used.

        Returns:
            The parsed repository analysis with ``model``,
            ``execution_time`` and ``repo_url`` metadata, or a dictionary
            with an ``'error'`` key when the URL is invalid, no model is
            available, the repository cannot be fetched, or the query fails.
        """
        start_time = time.time()

        if not model or model not in self.available_models:
            if not self.available_models:
                # Without this guard, picking "the first model" would raise
                # IndexError outside the error-reporting try block below.
                return {
                    'error': 'GitHub analysis failed: no models available',
                    'execution_time': round(time.time() - start_time, 2)
                }
            model = next(iter(self.available_models))

        try:
            url = repo_url.strip()
            if not url.startswith(self.GITHUB_URL_PREFIX):
                return {'error': 'Please provide a valid GitHub repository URL'}

            parsed = self._parse_repo_url(url)
            if parsed is None:
                return {'error': 'Invalid GitHub repository URL format'}
            owner, repo = parsed

            repo_data = self._fetch_github_repo_data(owner, repo)
            if 'error' in repo_data:
                return repo_data

            prompt = get_github_analysis_prompt(
                repo_data['structure'],
                repo_data['main_files']
            )
            response = self.llm_manager.query(model, prompt)

            if response.success:
                analysis = self._parse_github_analysis(response.content)
                analysis['raw_response'] = response.content
                analysis['repository_info'] = repo_data['info']
            else:
                analysis = {
                    'error': response.error,
                    'project_overview': f"Analysis failed: {response.error}",
                    'architecture_quality': [],
                    'critical_issues': [],
                    'improvement_priorities': []
                }

            analysis['model'] = response.model
            analysis['execution_time'] = round(time.time() - start_time, 2)
            analysis['repo_url'] = repo_url
            return analysis
        except Exception as e:
            # Top-level boundary: callers expect an error dictionary, not an
            # exception, whatever went wrong while analyzing.
            return {
                'error': f"GitHub analysis failed: {str(e)}",
                'execution_time': round(time.time() - start_time, 2)
            }

    @staticmethod
    def _parse_repo_url(repo_url: str):
        """Extract ``(owner, repo)`` from a GitHub repository URL.

        Returns ``None`` when the URL does not name both an owner and a
        repository. Empty path segments, anything after ``?`` or ``#``, and
        a trailing ``.git`` on the repository name are ignored.
        """
        prefix = CodeAnalyzer.GITHUB_URL_PREFIX
        path = repo_url[len(prefix):] if repo_url.startswith(prefix) else repo_url
        path = re.split(r'[?#]', path, maxsplit=1)[0]
        parts = [segment for segment in path.split('/') if segment]
        if len(parts) < 2:
            return None

        owner, repo = parts[0], parts[1]
        if repo.endswith('.git'):
            repo = repo[:-len('.git')]
        if not repo:
            return None
        return owner, repo

    def _fetch_github_repo_data(self, owner: str, repo: str) -> Dict[str, Any]:
        """Fetch repository data from GitHub API.

        Reads the repository metadata and its recursive file tree, trying
        the repository's reported default branch first and then ``main`` and
        ``master``. The ``GITHUB_TOKEN`` environment variable, when set, is
        sent as an ``Authorization`` header.

        Returns:
            A dictionary with ``info``, ``structure`` and ``main_files``
            keys, or a dictionary with an ``'error'`` key on failure.
        """
        try:
            api_base = f"https://api.github.com/repos/{owner}/{repo}"

            headers = {}
            token = os.getenv('GITHUB_TOKEN')
            if token:
                headers['Authorization'] = f"token {token}"

            repo_response = requests.get(api_base, headers=headers, timeout=self.REQUEST_TIMEOUT)
            status = repo_response.status_code
            if status in (403, 429):
                # Rate limiting / forbidden access is not "not found";
                # report it as such so the user can act on it.
                return {
                    'error': (
                        f'GitHub API access denied or rate limit exceeded '
                        f'(HTTP {status}) for {owner}/{repo}'
                    )
                }
            if status != 200:
                return {'error': f'Repository not found or private: {owner}/{repo}'}
            repo_info = repo_response.json()

            # Candidate branches, in order, without duplicates.
            branches = []
            for candidate in (repo_info.get('default_branch'), 'main', 'master'):
                if candidate and candidate not in branches:
                    branches.append(candidate)

            tree_data = None
            for branch in branches:
                tree_response = requests.get(
                    f"{api_base}/git/trees/{branch}?recursive=1",
                    headers=headers,
                    timeout=self.REQUEST_TIMEOUT
                )
                if tree_response.status_code == 200:
                    tree_data = tree_response.json()
                    break
            if tree_data is None:
                return {'error': 'Could not fetch repository structure'}

            tree = tree_data['tree']
            structure = self._build_repo_structure(tree)
            main_files = self._get_key_files(owner, repo, tree, headers)

            return {
                'info': {
                    'name': repo_info['name'],
                    # ``or`` rather than a .get() default: the key can be
                    # present with a null value, which a default would keep.
                    'description': repo_info.get('description') or 'No description',
                    'language': repo_info.get('language') or 'Unknown',
                    'stars': repo_info.get('stargazers_count', 0),
                    'forks': repo_info.get('forks_count', 0),
                    'size': repo_info.get('size', 0)
                },
                'structure': structure,
                'main_files': main_files
            }
        except Exception as e:
            return {'error': f'Failed to fetch repository data: {str(e)}'}

    def _build_repo_structure(self, tree: List[Dict]) -> str:
        """Build a readable repository structure.

        Only the first ``MAX_TREE_ITEMS`` tree entries are considered. Files
        are listed first in tree order, followed by directories in sorted
        order, and the result is cut to ``MAX_STRUCTURE_LINES`` lines.
        """
        file_lines = []
        directories = set()
        for item in tree[:self.MAX_TREE_ITEMS]:
            if item['type'] == 'tree':
                directories.add(item['path'])
            else:
                file_lines.append(f"📄 {item['path']}")

        directory_lines = [f"📁 {path}/" for path in sorted(directories)]
        return '\n'.join((file_lines + directory_lines)[:self.MAX_STRUCTURE_LINES])

    def _get_key_files(self, owner: str, repo: str, tree: List[Dict], headers: Dict) -> str:
        """Get content of key files like README, main source files.

        A file qualifies when its name is one of the priority names or
        contains ``main``, ``index`` or ``app`` (case-insensitive). Up to
        ``MAX_KEY_FILES`` files are collected, each reduced to its first
        ``KEY_FILE_PREVIEW_CHARS`` characters; files that cannot be fetched
        or decoded are skipped.

        Returns:
            The collected previews joined by newlines, each introduced by a
            ``--- <path> ---`` header; an empty string when none was found.
        """
        import base64
        from urllib.parse import quote

        priority_names = {
            'README.md', 'readme.md', 'README.txt',
            'package.json', 'requirements.txt', 'Cargo.toml', 'go.mod',
            'main.py', 'index.js', 'main.js', 'app.py', 'server.js'
        }
        name_fragments = ('main', 'index', 'app')

        key_files = []
        requests_made = 0
        for item in tree:
            if len(key_files) >= self.MAX_KEY_FILES:
                break
            if requests_made >= self.MAX_KEY_FILE_REQUESTS:
                break
            if item.get('type') != 'blob':
                continue

            path = item.get('path', '')
            filename = path.split('/')[-1]
            lowered = filename.lower()
            if filename not in priority_names and not any(
                fragment in lowered for fragment in name_fragments
            ):
                continue

            requests_made += 1
            try:
                file_response = requests.get(
                    # Quote the path so spaces, '#' or '?' in file names do
                    # not corrupt the request URL ('/' is kept as-is).
                    f"https://api.github.com/repos/{owner}/{repo}/contents/{quote(path)}",
                    headers=headers,
                    timeout=self.REQUEST_TIMEOUT
                )
                if file_response.status_code != 200:
                    continue
                file_data = file_response.json()
                if file_data.get('encoding') != 'base64':
                    continue
                content = base64.b64decode(file_data['content']).decode('utf-8', errors='ignore')
            except (requests.RequestException, ValueError, KeyError):
                # Best effort: an unreadable file must not abort the scan.
                continue

            key_files.append(f"\n--- {path} ---\n{content[:self.KEY_FILE_PREVIEW_CHARS]}")

        return '\n'.join(key_files)

    @staticmethod
    def _strip_markdown(text: str) -> str:
        """Remove ``#`` heading markers and leading ``*`` emphasis markers."""
        text = re.sub(r'#+\s*', '', text)  # Remove ### symbols
        return re.sub(r'^\*+\s*', '', text)  # Remove ** symbols

    def _parse_github_analysis(self, text: str) -> Dict[str, Any]:
        """Parse GitHub repository analysis results.

        Looks for four sections in the model output. ``project_overview``
        becomes the cleaned first line of its section; every other section
        becomes a list of at most ``MAX_SECTION_ITEMS`` cleaned lines (or,
        when no line qualifies, cleaned sentences). Sections that are not
        found keep their empty default.
        """
        result = {
            'project_overview': '',
            'architecture_quality': [],
            'critical_issues': [],
            'improvement_priorities': []
        }
        sections = {
            'project_overview': r'(?:PROJECT_OVERVIEW|project\s+overview)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'architecture_quality': r'(?:ARCHITECTURE_QUALITY|architecture|structure)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'critical_issues': r'(?:CRITICAL_ISSUES|critical|major\s+issue)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'improvement_priorities': r'(?:IMPROVEMENT_PRIORITIES|improvement|priorit)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)'
        }

        for key, pattern in sections.items():
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if not match:
                continue
            content = match.group(1).strip()

            if key == 'project_overview':
                first_line = content.split('\n')[0].strip()
                result[key] = self._strip_markdown(first_line)
                continue

            # Extract and clean bullet points
            items = []
            for line in content.split('\n'):
                line = line.strip()
                if not line or line.lower() in ('none', 'none found'):
                    continue
                line = self._strip_markdown(line)
                line = re.sub(r'^[-•*]\s*', '', line)  # Remove bullet markers
                line = re.sub(r'^[:\-\s]*', '', line)  # Remove colons and dashes
                if len(line) > 10:  # Only include substantial content
                    items.append(line)

            # If no structured items found, try to extract sentences
            if not items and content.strip():
                for sentence in re.split(r'[.!?]+', content):
                    clean_sentence = self._strip_markdown(sentence.strip())
                    if clean_sentence and len(clean_sentence) > 15:
                        items.append(clean_sentence)

            result[key] = items[:self.MAX_SECTION_ITEMS]

        return result

    def analyze_with_all_models(self, code: str, language: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
        """Analyze code using all available models.

        Returns:
            A mapping from each available model key to the result of
            ``analyze_code`` for that model.
        """
        return {
            model_key: self.analyze_code(code, model_key, language)
            for model_key in self.available_models
        }

    def compare_analyses(self, results: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Compare results from different models with focus on critical issues.

        Args:
            results: Mapping from model key to that model's analysis.
                Entries containing an ``'error'`` key are excluded from
                scoring and consensus but still count towards the total
                analysis time.

        Returns:
            A dictionary with ``average_score``, ``consensus_bugs``,
            ``consensus_security``, ``model_scores``, ``best_model`` and
            ``analysis_time``.
        """
        comparison = {
            'average_score': 0,
            'consensus_bugs': [],
            'consensus_security': [],
            'model_scores': {},
            'best_model': None,
            # .get(): error results are not guaranteed to carry a timing.
            'analysis_time': round(
                sum(r.get('execution_time', 0) for r in results.values()), 2
            )
        }

        successful = [
            (model, result) for model, result in results.items()
            if 'error' not in result
        ]

        model_scores = comparison['model_scores']
        for model, result in successful:
            model_scores[model] = result.get('quality_score', 0)

        if model_scores:
            scores = list(model_scores.values())
            comparison['average_score'] = round(sum(scores) / len(scores), 1)
            # On ties the first model in iteration order wins.
            comparison['best_model'] = max(model_scores, key=model_scores.get)

        # Keep findings grouped per model so that consensus means agreement
        # between *different* models.
        bugs_by_model = [result.get('bugs') or [] for _, result in successful]
        security_by_model = [
            result.get('security_vulnerabilities') or [] for _, result in successful
        ]
        comparison['consensus_bugs'] = self._find_consensus(bugs_by_model)
        comparison['consensus_security'] = self._find_consensus(security_by_model)
        return comparison

    def _find_consensus(self, findings_by_model: List[List[str]]) -> List[str]:
        """Return findings that at least two different models agree on.

        Two findings agree when, ignoring case and surrounding whitespace,
        one contains the other (which includes being identical). Blank or
        non-string findings are ignored, duplicates are reported once, and
        at most ``MAX_CONSENSUS_ITEMS`` findings are returned.
        """
        normalized = [
            [
                (finding, finding.strip().lower())
                for finding in findings
                if isinstance(finding, str) and finding.strip()
            ]
            for findings in findings_by_model
        ]

        consensus = []
        seen = set()
        for index, findings in enumerate(normalized):
            for finding, key in findings:
                if key in seen:
                    continue
                corroborated = any(
                    key in other_key or other_key in key
                    for other_index, others in enumerate(normalized)
                    if other_index != index
                    for _, other_key in others
                )
                if not corroborated:
                    continue
                seen.add(key)
                consensus.append(finding)
                if len(consensus) >= self.MAX_CONSENSUS_ITEMS:
                    return consensus
        return consensus