Spaces:
Sleeping
Sleeping
File size: 19,333 Bytes
from typing import Dict, Any, List, Optional, Generator, Literal
import time
import requests
import os
import re
import hashlib
import json
from tqdm import tqdm
from .llm_clients import LLMClientManager, LLMResponse
from .prompts import get_code_analysis_prompt, get_comparison_prompt, get_github_analysis_prompt
from .utils import detect_language, parse_analysis_result
# Closed set of identifiers for the local/remote Hugging Face model back ends.
# CodeAnalyzer.model_type is annotated with this alias.
ModelType = Literal["codet5", "deepseek-finetuned", "deepseek-finetuned-remote"]
class CodeAnalyzer:
    """Main code analysis engine with support for APIs, local models, and GitHub integration.

    The analyzer can:

    * send a code snippet to one or all configured LLM providers
      (``analyze_code`` / ``analyze_with_all_models``) and compare the results
      (``compare_analyses``);
    * send a snippet to a remote HTTP analysis service (``analyze_code_remote``),
      caching results in memory and, when ``cache_dir`` is set, on disk;
    * fetch a public GitHub repository through the GitHub REST API and ask an
      LLM for a repository-level review (``analyze_github_repo``).

    All public analysis methods report failures by returning a dict that
    contains an ``'error'`` key instead of raising.
    """

    # File name of the on-disk cache inside ``cache_dir``.
    _CACHE_FILENAME = "analysis_cache.json"
    # Only URLs with this prefix are accepted by ``analyze_github_repo``.
    _GITHUB_URL_PREFIX = "https://github.com/"
    # Seconds before a GitHub API request is abandoned (prevents indefinite hangs).
    _HTTP_TIMEOUT = 30
    # Maximum number of key files whose content is embedded in the prompt.
    _MAX_KEY_FILES = 5
    # Upper bound on download attempts, so an outage or rate limit does not
    # trigger one request per matching file in a large repository.
    _MAX_KEY_FILE_ATTEMPTS = 20

    def __init__(self, cache_dir: Optional[str] = None, precision: str = "fp16"):
        """Create an analyzer.

        Args:
            cache_dir: Directory for the persistent analysis cache. It is
                created if missing. ``None`` keeps the cache in memory only.
            precision: Precision label; stored lower-cased and stripped.
        """
        # API-based models
        self.llm_manager = LLMClientManager()
        self.available_models = self.llm_manager.get_available_models()

        # Local/Remote Hugging Face models
        self.model_type: Optional[ModelType] = None
        self.model_id: Optional[str] = None
        self.adapter_path: Optional[str] = None
        self.remote_api_url: Optional[str] = None

        self.cache_dir = cache_dir
        self.precision = precision.lower().strip()
        self.cache: Dict[str, Any] = {}

        if cache_dir is not None:
            os.makedirs(cache_dir, exist_ok=True)
            self._load_cache()

    # ------------------------------------------------------------------
    # Cache helpers
    # ------------------------------------------------------------------

    def _get_cache_key(self, code: str) -> str:
        """Return a cache key derived from the model type, model id and code.

        MD5 is used purely as a fingerprint (not for security); keeping it
        leaves previously written cache files valid.
        """
        combined = f"{self.model_type}:{self.model_id}:{code}"
        return hashlib.md5(combined.encode()).hexdigest()

    def _load_cache(self):
        """Load the analysis cache from disk if available.

        An unreadable, malformed or non-dict cache file resets the in-memory
        cache to empty instead of raising.
        """
        if self.cache_dir is None:
            self.cache = {}
            return

        cache_file = os.path.join(self.cache_dir, self._CACHE_FILENAME)
        if not os.path.exists(cache_file):
            return

        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (ValueError, OSError):
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            self.cache = {}
            return

        if not isinstance(loaded, dict):
            # A valid JSON document that is not an object cannot serve as cache.
            self.cache = {}
            return

        self.cache = loaded
        print(f"📁 Loaded {len(self.cache)} cached analyses")

    def _save_cache(self):
        """Save the analysis cache to disk (no-op without a cache directory).

        The data is written to a temporary file and then moved into place so
        an interrupted write cannot leave a truncated cache file behind.
        """
        if self.cache_dir is None:
            return

        cache_file = os.path.join(self.cache_dir, self._CACHE_FILENAME)
        tmp_file = f"{cache_file}.tmp"
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(self.cache, f)
            os.replace(tmp_file, cache_file)
        finally:
            # Only present if the dump or the replace failed.
            if os.path.exists(tmp_file):
                os.remove(tmp_file)

    def _check_cache(self, code: str) -> Optional[Dict[str, Any]]:
        """Return the cached analysis for ``code``, or ``None`` if absent."""
        return self.cache.get(self._get_cache_key(code))

    def _save_to_cache(self, code: str, result: Dict[str, Any]):
        """Store ``result`` for ``code`` and persist the cache."""
        self.cache[self._get_cache_key(code)] = result
        self._save_cache()

    # ------------------------------------------------------------------
    # Snippet analysis
    # ------------------------------------------------------------------

    def analyze_code(
        self,
        code: str,
        model: str,
        language: Optional[str] = None,
        max_tokens: int = 1024,
    ) -> Dict[str, Any]:
        """Analyze code using a specified LLM provider (non-streaming).

        Args:
            code: Source code to analyze.
            model: Key of the model to query through the LLM manager.
            language: Language hint for the prompt; ``None`` asks the model
                to detect the language itself.
            max_tokens: Accepted for interface compatibility; it is currently
                not forwarded to the LLM manager.

        Returns:
            On success, a dict with the raw response, the parsed analysis
            fields, ``language``, ``line_count``, ``model``, ``execution_time``
            and ``cached``. On failure, a dict with ``error`` and
            ``execution_time``.
        """
        if language is None:
            language = "auto-detect"  # Let the model detect it

        prompt = get_code_analysis_prompt(code, language, model)

        start_time = time.time()
        response = self.llm_manager.query(model, prompt)
        total_time = time.time() - start_time

        if not response.success:
            # execution_time is included so aggregations over mixed
            # success/error results do not need a special case.
            return {'error': response.error, 'execution_time': total_time}

        structured_data = parse_analysis_result(response.content, model)

        # Prefer the language reported in the model's answer; otherwise fall
        # back to the local detect_language helper.
        detected_lang = structured_data.get('detected_language')
        if detected_lang:
            language = str(detected_lang).upper()
        else:
            language = detect_language(code).upper()

        return {
            "raw_response": response.content,
            "quality_score": structured_data.get('quality_score', 0),
            "execution_time": total_time,
            "model": response.model,
            "cached": False,
            **structured_data,
            "language": language,
            "line_count": len(code.splitlines()),
        }

    def analyze_code_remote(self, code: str, max_tokens: int = 300) -> Dict[str, Any]:
        """Analyze code using a remote analysis service.

        Posts ``{"code", "max_tokens"}`` as JSON to
        ``<remote_api_url>/analyze``. Successful results are cached; a cache
        hit is returned as a copy flagged with ``"cached": True`` so the
        stored entry itself is never modified.

        Args:
            code: Source code to analyze.
            max_tokens: Token limit forwarded to the remote service.

        Returns:
            The analysis dict, or ``{'error': ...}`` when the URL is not
            configured or the request/response handling fails.
        """
        if not self.remote_api_url:
            return {'error': 'Remote API URL is not configured.'}

        cached_result = self._check_cache(code)
        if cached_result:
            # Copy: mutating the stored dict would persist "cached": True.
            return {**cached_result, "cached": True}

        start_time = time.time()
        try:
            response = requests.post(
                f"{self.remote_api_url.rstrip('/')}/analyze",
                json={"code": code, "max_tokens": max_tokens},
                timeout=60
            )
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON error is not a RequestException.
            return {'error': f"Remote analysis failed: {e}"}

        if not isinstance(data, dict):
            # Unstructured payload: keep it as the raw analysis text.
            data = {"analysis": str(data)}

        total_time = time.time() - start_time
        result = {
            "raw_response": data.get("analysis", str(data)),
            "quality_score": data.get("quality_score", 0),
            "execution_time": total_time,
            "model": data.get("model", "remote-deepseek"),
            "cached": False,
            "bugs": data.get("bugs", []),
            "security_vulnerabilities": data.get("security_vulnerabilities", []),
            "quality_issues": data.get("quality_issues", []),
            "quick_fixes": data.get("quick_fixes", []),
            "language": data["language"] if "language" in data else detect_language(code),
            "line_count": data.get("line_count", len(code.splitlines())),
        }

        self._save_to_cache(code, result)
        return result

    # ------------------------------------------------------------------
    # GitHub repository analysis
    # ------------------------------------------------------------------

    @staticmethod
    def _split_repo_path(repo_url: str):
        """Extract ``(owner, repo)`` from a ``https://github.com/...`` URL.

        Ignores query strings, fragments, empty path segments (trailing or
        doubled slashes), extra segments such as ``/tree/main`` and a
        trailing ``.git``. Returns ``None`` when owner or repo is missing.
        The caller is expected to have verified the URL prefix.
        """
        path = repo_url.strip()[len(CodeAnalyzer._GITHUB_URL_PREFIX):]
        path = path.split('#', 1)[0].split('?', 1)[0]
        parts = [segment for segment in path.split('/') if segment]
        if len(parts) < 2:
            return None

        owner, repo = parts[0], parts[1]
        if repo.endswith('.git'):
            repo = repo[:-len('.git')]
        if not repo:
            return None
        return owner, repo

    def analyze_github_repo(self, repo_url: str, model: str = None) -> Dict[str, Any]:
        """Analyze a GitHub repository.

        Args:
            repo_url: Repository URL; must start with ``https://github.com/``.
            model: Model key to use. When missing or unknown, the first
                available model is used.

        Returns:
            The parsed analysis plus ``model``, ``execution_time`` and
            ``repo_url``; or a dict containing ``error`` on failure.
        """
        start_time = time.time()

        # Use first available model if none specified
        if not model or model not in self.available_models:
            if not self.available_models:
                return {
                    'error': 'GitHub analysis failed: no analysis models are available',
                    'execution_time': round(time.time() - start_time, 2)
                }
            model = next(iter(self.available_models))

        try:
            url = repo_url.strip()
            if not url.startswith(self._GITHUB_URL_PREFIX):
                return {'error': 'Please provide a valid GitHub repository URL'}

            owner_and_repo = self._split_repo_path(url)
            if owner_and_repo is None:
                return {'error': 'Invalid GitHub repository URL format'}
            owner, repo = owner_and_repo

            # Get repository structure and key files
            repo_data = self._fetch_github_repo_data(owner, repo)
            if 'error' in repo_data:
                return repo_data

            prompt = get_github_analysis_prompt(
                repo_data['structure'],
                repo_data['main_files']
            )

            response = self.llm_manager.query(model, prompt)

            if response.success:
                analysis = self._parse_github_analysis(response.content)
                analysis['raw_response'] = response.content
                analysis['repository_info'] = repo_data['info']
            else:
                analysis = {
                    'error': response.error,
                    'project_overview': f"Analysis failed: {response.error}",
                    'architecture_quality': [],
                    'critical_issues': [],
                    'improvement_priorities': []
                }

            # Add metadata
            analysis['model'] = response.model
            analysis['execution_time'] = round(time.time() - start_time, 2)
            analysis['repo_url'] = repo_url
            return analysis

        except Exception as e:
            # Boundary of the public API: report instead of raising.
            return {
                'error': f"GitHub analysis failed: {str(e)}",
                'execution_time': round(time.time() - start_time, 2)
            }

    def _fetch_github_repo_data(self, owner: str, repo: str) -> Dict[str, Any]:
        """Fetch repository metadata, file tree and key file contents.

        Uses the ``GITHUB_TOKEN`` environment variable for authentication
        when it is set. The file tree is requested for the repository's
        default branch first, then for ``main`` and ``master``.

        Returns:
            ``{'info', 'structure', 'main_files'}`` or ``{'error': ...}``.
        """
        try:
            api_base = f"https://api.github.com/repos/{owner}/{repo}"

            headers = {}
            token = os.getenv('GITHUB_TOKEN')
            if token:
                headers['Authorization'] = f"token {token}"

            repo_response = requests.get(api_base, headers=headers, timeout=self._HTTP_TIMEOUT)
            if repo_response.status_code != 200:
                return {'error': f'Repository not found or private: {owner}/{repo}'}
            repo_info = repo_response.json()

            # Candidate branches, de-duplicated, default branch first.
            branches = []
            for branch in (repo_info.get('default_branch'), 'main', 'master'):
                if branch and branch not in branches:
                    branches.append(branch)

            tree_data = None
            for branch in branches:
                tree_response = requests.get(
                    f"{api_base}/git/trees/{branch}?recursive=1",
                    headers=headers,
                    timeout=self._HTTP_TIMEOUT
                )
                if tree_response.status_code == 200:
                    tree_data = tree_response.json()
                    break
            if tree_data is None:
                return {'error': 'Could not fetch repository structure'}

            structure = self._build_repo_structure(tree_data['tree'])
            main_files = self._get_key_files(owner, repo, tree_data['tree'], headers)

            return {
                'info': {
                    'name': repo_info['name'],
                    # GitHub sends explicit nulls for unset fields, which
                    # dict.get defaults do not replace.
                    'description': repo_info.get('description') or 'No description',
                    'language': repo_info.get('language') or 'Unknown',
                    'stars': repo_info.get('stargazers_count', 0),
                    'forks': repo_info.get('forks_count', 0),
                    'size': repo_info.get('size', 0)
                },
                'structure': structure,
                'main_files': main_files
            }

        except Exception as e:
            return {'error': f'Failed to fetch repository data: {str(e)}'}

    def _build_repo_structure(self, tree: List[Dict]) -> str:
        """Build a readable repository structure.

        Looks at the first 50 tree entries, lists files (in tree order)
        followed by directories (sorted), and returns at most 30 lines.
        """
        file_lines = []
        dirs = set()

        for item in tree[:50]:  # Limit to first 50 items
            if item['type'] == 'tree':
                dirs.add(item['path'])
            else:
                file_lines.append(f"📄 {item['path']}")

        dir_lines = [f"📁 {dir_path}/" for dir_path in sorted(dirs)]
        return '\n'.join((file_lines + dir_lines)[:30])  # Limit output

    @staticmethod
    def _is_key_file(filename: str) -> bool:
        """Return True if ``filename`` should be sampled for the prompt.

        A file qualifies when its name is one of the well-known project files
        or contains ``main``, ``index`` or ``app`` (case-insensitive).
        """
        priority_names = (
            'README.md', 'readme.md', 'README.txt',
            'package.json', 'requirements.txt', 'Cargo.toml', 'go.mod',
            'main.py', 'index.js', 'main.js', 'app.py', 'server.js'
        )
        if filename in priority_names:
            return True
        lowered = filename.lower()
        return any(marker in lowered for marker in ('main', 'index', 'app'))

    def _get_key_files(self, owner: str, repo: str, tree: List[Dict], headers: Dict) -> str:
        """Get content of key files like README and main source files.

        Downloads up to ``_MAX_KEY_FILES`` matching blobs through the GitHub
        contents API and returns the first 1000 characters of each, joined
        into one string. Individual download/decoding failures are skipped.
        """
        import base64

        key_files = []
        attempts = 0

        for item in tree:
            if len(key_files) >= self._MAX_KEY_FILES:
                break
            if attempts >= self._MAX_KEY_FILE_ATTEMPTS:
                break
            if item['type'] != 'blob':
                continue
            if not self._is_key_file(item['path'].split('/')[-1]):
                continue

            attempts += 1
            try:
                file_response = requests.get(
                    f"https://api.github.com/repos/{owner}/{repo}/contents/{item['path']}",
                    headers=headers,
                    timeout=self._HTTP_TIMEOUT
                )
                if file_response.status_code != 200:
                    continue
                file_data = file_response.json()
                if file_data.get('encoding') == 'base64':
                    content = base64.b64decode(file_data['content']).decode('utf-8', errors='ignore')
                    key_files.append(f"\n--- {item['path']} ---\n{content[:1000]}")  # First 1000 chars
            except Exception:
                # Best effort: one unreadable file must not abort the sampling.
                continue

        return '\n'.join(key_files)

    @staticmethod
    def _strip_markdown(text: str) -> str:
        """Remove heading markers anywhere and leading asterisks from ``text``."""
        text = re.sub(r'#+\s*', '', text)
        return re.sub(r'^\*+\s*', '', text)

    def _parse_github_analysis(self, text: str) -> Dict[str, Any]:
        """Parse GitHub repository analysis results.

        Each section is located with a case-insensitive regular expression
        that captures the text after the section label up to the next line
        starting with a numbered item or a ``LABEL:`` style header.

        Returns:
            A dict with ``project_overview`` (first line of the section, as a
            string) and, for every other section, a list of at most four
            cleaned entries. Sections that are not found stay empty.
        """
        result = {
            'project_overview': '',
            'architecture_quality': [],
            'critical_issues': [],
            'improvement_priorities': [],
            'onboarding_guide': [],
            'tech_stack_rationale': [],
            'api_endpoint_summary': [],
        }

        sections = {
            'project_overview': r'(?:PROJECT_OVERVIEW|project\s+overview)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'architecture_quality': r'(?:ARCHITECTURE_QUALITY|architecture|structure)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'critical_issues': r'(?:CRITICAL_ISSUES|critical|major\s+issue)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'improvement_priorities': r'(?:IMPROVEMENT_PRIORITIES|improvement|priorit)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'onboarding_guide': r'(?:ONBOARDING_GUIDE|onboarding|setup)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'tech_stack_rationale': r'(?:TECH_STACK_RATIONALE|tech\s+stack|stack\s+rationale)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'api_endpoint_summary': r'(?:API_ENDPOINT_SUMMARY|api\s+endpoint|endpoints)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
        }

        for key, pattern in sections.items():
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if not match:
                continue
            content = match.group(1).strip()

            if key == 'project_overview':
                # The overview is a single line of plain text.
                result[key] = self._strip_markdown(content.split('\n')[0].strip())
                continue

            # Extract and clean bullet points
            items = []
            for line in content.split('\n'):
                line = line.strip()
                if not line or line.lower() in ('none', 'none found'):
                    continue
                line = self._strip_markdown(line)
                line = re.sub(r'^[-•*]\s*', '', line)  # Remove bullet markers
                line = re.sub(r'^[:\-\s]*', '', line)  # Remove colons and dashes
                if len(line) > 10:  # Only include substantial content
                    items.append(line)

            # If no structured items found, try to extract sentences
            if not items and content.strip():
                for sentence in re.split(r'[.!?]+', content):
                    clean_sentence = self._strip_markdown(sentence.strip())
                    if clean_sentence and len(clean_sentence) > 15:
                        items.append(clean_sentence)

            result[key] = items[:4]  # Limit to 4 items per section

        return result

    # ------------------------------------------------------------------
    # Multi-model analysis
    # ------------------------------------------------------------------

    def analyze_with_all_models(self, code: str, language: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
        """Analyze code using all available models, keyed by model."""
        return {
            model_key: self.analyze_code(code, model_key, language)
            for model_key in self.available_models
        }

    @staticmethod
    def _find_consensus(reports: List[Any], limit: int = 3) -> List[Any]:
        """Return issues that at least two different models reported.

        Args:
            reports: ``(model, issue)`` pairs in reporting order.
            limit: Maximum number of consensus issues to return.

        Two issues are considered the same when one's text contains the
        other's, ignoring case; identical texts therefore match. Each issue
        is listed once, in first-seen order.
        """
        lowered = [(model, issue, str(issue).lower()) for model, issue in reports]
        consensus = []
        for model, issue, issue_text in lowered:
            if issue in consensus:
                continue
            for other_model, _, other_text in lowered:
                if other_model == model:
                    continue
                if issue_text in other_text or other_text in issue_text:
                    consensus.append(issue)
                    break
        return consensus[:limit]

    def compare_analyses(self, results: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Compare results from different models with focus on critical issues.

        Args:
            results: Mapping of model key to the dict returned by
                ``analyze_code``. Entries containing ``'error'`` are excluded
                from scoring and consensus but still contribute their
                ``execution_time`` (when present) to ``analysis_time``.

        Returns:
            A dict with ``average_score``, ``model_scores``, ``best_model``,
            ``consensus_bugs``, ``consensus_security`` (top three each) and
            ``analysis_time``.
        """
        comparison = {
            'average_score': 0,
            'consensus_bugs': [],
            'consensus_security': [],
            'model_scores': {},
            'best_model': None,
            # Error results may not carry a timing; count them as zero.
            'analysis_time': sum(r.get('execution_time', 0) for r in results.values())
        }

        successful = {
            model: result for model, result in results.items()
            if 'error' not in result
        }

        # Calculate average score and find best model
        model_scores = comparison['model_scores']
        for model, result in successful.items():
            model_scores[model] = result.get('quality_score', 0)

        if model_scores:
            scores = list(model_scores.values())
            comparison['average_score'] = round(sum(scores) / len(scores), 1)
            comparison['best_model'] = max(model_scores, key=model_scores.get)

        # Collect findings together with the model that reported them so
        # consensus really means "more than one model".
        bug_reports = []
        security_reports = []
        for model, result in successful.items():
            for issue in result.get('bugs', []) or []:
                if str(issue).strip():  # blank text would match everything
                    bug_reports.append((model, issue))
            for issue in result.get('security_vulnerabilities', []) or []:
                if str(issue).strip():
                    security_reports.append((model, issue))

        comparison['consensus_bugs'] = self._find_consensus(bug_reports)
        comparison['consensus_security'] = self._find_consensus(security_reports)

        return comparison