# ai-code-analyzer/analyzer/code_analyzer.py
# Author: arun3676
# Last commit: b7db63d — "Configure for HuggingFace Spaces Docker deployment -
#   Add Dockerfile, .dockerignore, update README with HF metadata, optimize
#   requirements.txt"
from typing import Dict, Any, List, Optional, Generator, Literal
import time
import requests
import os
import re
import hashlib
import json
from tqdm import tqdm
from .llm_clients import LLMClientManager, LLMResponse
from .prompts import get_code_analysis_prompt, get_comparison_prompt, get_github_analysis_prompt
from .utils import detect_language, parse_analysis_result
# Allowed backend identifiers for locally/remotely hosted Hugging Face models.
ModelType = Literal["codet5", "deepseek-finetuned", "deepseek-finetuned-remote"]
class CodeAnalyzer:
    """Main code analysis engine with support for APIs, local models, and GitHub integration."""

    def __init__(self, cache_dir: str = None, precision: str = "fp16"):
        """Initialize API clients, model-selection state, and the optional disk cache.

        Args:
            cache_dir: Directory for the persisted analysis cache;
                ``None`` disables persistence entirely.
            precision: Requested precision label; normalized to a
                lowercase, trimmed string (e.g. " FP16 " -> "fp16").
        """
        # All API-backed providers are reached through one client manager.
        self.llm_manager = LLMClientManager()
        self.available_models = self.llm_manager.get_available_models()

        # Local/remote Hugging Face model state — unset until configured.
        self.model_type: Optional[ModelType] = None
        self.model_id: Optional[str] = None
        self.adapter_path: Optional[str] = None
        self.remote_api_url: Optional[str] = None

        self.cache_dir = cache_dir
        self.precision = precision.lower().strip()
        self.cache = {}
        if cache_dir is not None:
            os.makedirs(cache_dir, exist_ok=True)
            self._load_cache()
def _get_cache_key(self, code: str) -> str:
"""Generate a unique cache key for a piece of code and model type."""
combined = f"{self.model_type}:{self.model_id}:{code}"
return hashlib.md5(combined.encode()).hexdigest()
def _load_cache(self):
"""Load analysis cache from disk if available."""
if self.cache_dir is None:
self.cache = {}
return
cache_file = os.path.join(self.cache_dir, "analysis_cache.json")
if os.path.exists(cache_file):
try:
with open(cache_file, 'r') as f:
self.cache = json.load(f)
print(f"📁 Loaded {len(self.cache)} cached analyses")
except (json.JSONDecodeError, IOError):
self.cache = {}
def _save_cache(self):
"""Save the analysis cache to disk."""
if self.cache_dir is None:
return
cache_file = os.path.join(self.cache_dir, "analysis_cache.json")
with open(cache_file, 'w') as f:
json.dump(self.cache, f)
def _check_cache(self, code: str) -> Optional[Dict[str, Any]]:
"""Check if an analysis for the given code is in the cache."""
cache_key = self._get_cache_key(code)
return self.cache.get(cache_key)
def _save_to_cache(self, code: str, result: Dict[str, Any]):
"""Save an analysis result to the cache."""
cache_key = self._get_cache_key(code)
self.cache[cache_key] = result
self._save_cache()
    def analyze_code(
        self,
        code: str,
        model: str,
        language: Optional[str] = None,
        max_tokens: int = 1024,
    ) -> Dict[str, Any]:
        """
        Analyze code using a specified LLM provider.
        This is a non-streaming, direct analysis method.
        Language detection is now handled by AI for accuracy.

        Args:
            code: Source text to analyze.
            model: Key of a provider/model registered with the LLM manager.
            language: Optional language hint; when None, detection is
                delegated to the model.
            max_tokens: NOTE(review): currently unused — never forwarded to
                `llm_manager.query`; confirm whether it should be.

        Returns:
            A dict of analysis fields on success, or {'error': ...} on failure.
        """
        # Skip local language detection - let AI handle it for accuracy
        if language is None:
            language = "auto-detect"  # Let AI detect it
        prompt = get_code_analysis_prompt(code, language, model)
        start_time = time.time()
        response = self.llm_manager.query(model, prompt)
        total_time = time.time() - start_time
        if response.success:
            structured_data = parse_analysis_result(response.content, model)
            # Use AI-detected language if available, otherwise fallback to auto
            detected_lang = structured_data.get('detected_language')
            if detected_lang:
                language = detected_lang.upper()
            else:
                # Fallback to LLM-based detection if not in response
                language = detect_language(code).upper()
            result = {
                "raw_response": response.content,
                "quality_score": structured_data.get('quality_score', 0),
                "execution_time": total_time,
                "model": response.model,
                "cached": False,
                # Keys in structured_data deliberately override the defaults
                # above (e.g. quality_score), while language/line_count below
                # override anything structured_data carries for those keys.
                **structured_data,
                "language": language,
                "line_count": len(code.splitlines()),
            }
        else:
            result = {'error': response.error}
        return result
def analyze_code_remote(self, code: str, max_tokens: int = 300) -> Dict[str, Any]: # Increased token limit
"""Analyze code using a remote Hugging Face Space API."""
if not self.remote_api_url:
return {'error': 'Remote API URL is not configured.'}
cached_result = self._check_cache(code)
if cached_result:
cached_result["cached"] = True
return cached_result
start_time = time.time()
try:
# First, try FastAPI endpoint /analyze
response = requests.post(
f"{self.remote_api_url}/analyze",
json={"code": code, "max_tokens": max_tokens},
timeout=60
)
response.raise_for_status()
data = response.json()
# Assuming the remote API returns a structured response
total_time = time.time() - start_time
result = {
"raw_response": data.get("analysis", str(data)),
"quality_score": data.get("quality_score", 0),
"execution_time": total_time,
"model": data.get("model", "remote-deepseek"),
"cached": False,
"bugs": data.get("bugs", []),
"security_vulnerabilities": data.get("security_vulnerabilities", []),
"quality_issues": data.get("quality_issues", []),
"quick_fixes": data.get("quick_fixes", []),
"language": data.get("language", detect_language(code)),
"line_count": data.get("line_count", len(code.splitlines())),
}
self._save_to_cache(code, result)
return result
except requests.exceptions.RequestException as e:
# Fallback for Gradio or other errors
return {'error': f"Remote analysis failed: {e}"}
def analyze_github_repo(self, repo_url: str, model: str = None) -> Dict[str, Any]:
"""Analyze a GitHub repository."""
start_time = time.time()
# Use first available model if none specified
if not model or model not in self.available_models:
model = list(self.available_models.keys())[0]
try:
# Parse GitHub URL
if not repo_url.startswith('https://github.com/'):
return {'error': 'Please provide a valid GitHub repository URL'}
# Extract owner and repo
parts = repo_url.replace('https://github.com/', '').split('/')
if len(parts) < 2:
return {'error': 'Invalid GitHub repository URL format'}
owner, repo = parts[0], parts[1]
# Get repository structure and key files
repo_data = self._fetch_github_repo_data(owner, repo)
if 'error' in repo_data:
return repo_data
# Generate analysis prompt
prompt = get_github_analysis_prompt(
repo_data['structure'],
repo_data['main_files']
)
# Query LLM
response = self.llm_manager.query(model, prompt)
if response.success:
analysis = self._parse_github_analysis(response.content)
analysis['raw_response'] = response.content
analysis['repository_info'] = repo_data['info']
else:
analysis = {
'error': response.error,
'project_overview': f"Analysis failed: {response.error}",
'architecture_quality': [],
'critical_issues': [],
'improvement_priorities': []
}
# Add metadata
analysis['model'] = response.model
analysis['execution_time'] = round(time.time() - start_time, 2)
analysis['repo_url'] = repo_url
return analysis
except Exception as e:
return {
'error': f"GitHub analysis failed: {str(e)}",
'execution_time': round(time.time() - start_time, 2)
}
def _fetch_github_repo_data(self, owner: str, repo: str) -> Dict[str, Any]:
"""Fetch repository data from GitHub API."""
try:
# GitHub API endpoints
api_base = f"https://api.github.com/repos/{owner}/{repo}"
# Get repository info
headers = {}
if os.getenv('GITHUB_TOKEN'):
headers['Authorization'] = f"token {os.getenv('GITHUB_TOKEN')}"
repo_response = requests.get(api_base, headers=headers)
if repo_response.status_code != 200:
return {'error': f'Repository not found or private: {owner}/{repo}'}
repo_info = repo_response.json()
# Get file tree
tree_response = requests.get(f"{api_base}/git/trees/main?recursive=1", headers=headers)
if tree_response.status_code != 200:
# Try master branch
tree_response = requests.get(f"{api_base}/git/trees/master?recursive=1", headers=headers)
if tree_response.status_code != 200:
return {'error': 'Could not fetch repository structure'}
tree_data = tree_response.json()
# Build structure and get key files
structure = self._build_repo_structure(tree_data['tree'])
main_files = self._get_key_files(owner, repo, tree_data['tree'], headers)
return {
'info': {
'name': repo_info['name'],
'description': repo_info.get('description', 'No description'),
'language': repo_info.get('language', 'Unknown'),
'stars': repo_info.get('stargazers_count', 0),
'forks': repo_info.get('forks_count', 0),
'size': repo_info.get('size', 0)
},
'structure': structure,
'main_files': main_files
}
except Exception as e:
return {'error': f'Failed to fetch repository data: {str(e)}'}
def _build_repo_structure(self, tree: List[Dict]) -> str:
"""Build a readable repository structure."""
structure_lines = []
dirs = set()
for item in tree[:50]: # Limit to first 50 items
if item['type'] == 'tree':
dirs.add(item['path'])
else:
structure_lines.append(f"📄 {item['path']}")
for dir_path in sorted(dirs):
structure_lines.append(f"📁 {dir_path}/")
return '\n'.join(structure_lines[:30]) # Limit output
def _get_key_files(self, owner: str, repo: str, tree: List[Dict], headers: Dict) -> str:
"""Get content of key files like README, main source files."""
key_files = []
# Priority files to analyze
priority_patterns = [
'README.md', 'readme.md', 'README.txt',
'package.json', 'requirements.txt', 'Cargo.toml', 'go.mod',
'main.py', 'index.js', 'main.js', 'app.py', 'server.js'
]
for item in tree:
if item['type'] == 'blob':
filename = item['path'].split('/')[-1]
# Check if it's a priority file
if filename in priority_patterns or any(
pattern in filename.lower() for pattern in ['main', 'index', 'app']
):
try:
file_response = requests.get(
f"https://api.github.com/repos/{owner}/{repo}/contents/{item['path']}",
headers=headers
)
if file_response.status_code == 200:
file_data = file_response.json()
if file_data.get('encoding') == 'base64':
import base64
content = base64.b64decode(file_data['content']).decode('utf-8', errors='ignore')
key_files.append(f"\n--- {item['path']} ---\n{content[:1000]}") # First 1000 chars
except:
continue
if len(key_files) >= 5: # Limit to 5 key files
break
return '\n'.join(key_files)
    def _parse_github_analysis(self, text: str) -> Dict[str, Any]:
        """Parse GitHub repository analysis results.

        Extracts named sections from the LLM's free-form response using
        regexes, then strips markdown noise from each section. The overview
        becomes a single cleaned line; every other section becomes a list of
        up to 4 cleaned items. Unmatched sections keep their defaults.
        """
        # Defaults returned for any section the regexes fail to find.
        result = {
            'project_overview': '',
            'architecture_quality': [],
            'critical_issues': [],
            'improvement_priorities': [],
            'onboarding_guide': [],
            'tech_stack_rationale': [],
            'api_endpoint_summary': [],
        }
        # Each pattern lazily captures up to the next numbered heading or
        # ALL_CAPS label (or end of text); DOTALL lets sections span lines.
        sections = {
            'project_overview': r'(?:PROJECT_OVERVIEW|project\s+overview)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'architecture_quality': r'(?:ARCHITECTURE_QUALITY|architecture|structure)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'critical_issues': r'(?:CRITICAL_ISSUES|critical|major\s+issue)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'improvement_priorities': r'(?:IMPROVEMENT_PRIORITIES|improvement|priorit)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'onboarding_guide': r'(?:ONBOARDING_GUIDE|onboarding|setup)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'tech_stack_rationale': r'(?:TECH_STACK_RATIONALE|tech\s+stack|stack\s+rationale)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
            'api_endpoint_summary': r'(?:API_ENDPOINT_SUMMARY|api\s+endpoint|endpoints)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)',
        }
        for key, pattern in sections.items():
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if match:
                content = match.group(1).strip()
                if key == 'project_overview':
                    # Clean project overview: first line only, markdown stripped.
                    clean_overview = content.split('\n')[0].strip()
                    clean_overview = re.sub(r'#+\s*', '', clean_overview)  # Remove ### symbols
                    clean_overview = re.sub(r'^\*+\s*', '', clean_overview)  # Remove ** symbols
                    result[key] = clean_overview
                else:
                    # Extract and clean bullet points. NB: the sub order
                    # matters — bullet markers are removed only after the
                    # leading '*'s, so '* item' and '- item' both clean up.
                    items = []
                    lines = content.split('\n')
                    for line in lines:
                        line = line.strip()
                        if line and not line.lower() in ['none', 'none found']:
                            # Clean up markdown symbols and extra characters
                            line = re.sub(r'#+\s*', '', line)  # Remove ### symbols
                            line = re.sub(r'^\*+\s*', '', line)  # Remove ** symbols
                            line = re.sub(r'^[-•*]\s*', '', line)  # Remove bullet markers
                            line = re.sub(r'^[:\-\s]*', '', line)  # Remove colons and dashes
                            if len(line) > 10:  # Only include substantial content
                                items.append(line)
                    # If no structured items found, try to extract sentences
                    if not items and content.strip():
                        sentences = re.split(r'[.!?]+', content)
                        for sentence in sentences:
                            clean_sentence = sentence.strip()
                            clean_sentence = re.sub(r'#+\s*', '', clean_sentence)  # Remove ### symbols
                            clean_sentence = re.sub(r'^\*+\s*', '', clean_sentence)  # Remove ** symbols
                            if clean_sentence and len(clean_sentence) > 15:
                                items.append(clean_sentence)
                    result[key] = items[:4]  # Limit to 4 items per section
        return result
def analyze_with_all_models(self, code: str, language: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
"""Analyze code using all available models."""
results = {}
for model_key in self.available_models:
results[model_key] = self.analyze_code(code, model_key, language)
return results
def compare_analyses(self, results: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
"""Compare results from different models with focus on critical issues."""
comparison = {
'average_score': 0,
'consensus_bugs': [],
'consensus_security': [],
'model_scores': {},
'best_model': None,
'analysis_time': sum(r['execution_time'] for r in results.values())
}
# Calculate average score and find best model
scores = []
for model, result in results.items():
if 'error' not in result:
score = result['quality_score']
scores.append(score)
comparison['model_scores'][model] = score
if scores:
comparison['average_score'] = round(sum(scores) / len(scores), 1)
best_model = max(comparison['model_scores'].items(), key=lambda x: x[1])
comparison['best_model'] = best_model[0]
# Find consensus on critical issues
all_bugs = []
all_security = []
for result in results.values():
if 'error' not in result:
all_bugs.extend(result.get('bugs', []))
all_security.extend(result.get('security_vulnerabilities', []))
# Simple consensus: issues mentioned by multiple models
def find_consensus(items):
consensus = []
for item in items:
if any(item.lower() in other.lower() or other.lower() in item.lower()
for other in items if other != item):
if item not in consensus:
consensus.append(item)
return consensus[:3] # Top 3 consensus items
comparison['consensus_bugs'] = find_consensus(all_bugs)
comparison['consensus_security'] = find_consensus(all_security)
return comparison