# Configured for HuggingFace Spaces Docker deployment
# (Dockerfile, .dockerignore, README HF metadata, optimized requirements.txt).
import base64
import hashlib
import json
import os
import re
import time
from typing import Dict, Any, List, Optional, Generator, Literal

import requests
from tqdm import tqdm

from .llm_clients import LLMClientManager, LLMResponse
from .prompts import get_code_analysis_prompt, get_comparison_prompt, get_github_analysis_prompt
from .utils import detect_language, parse_analysis_result
| ModelType = Literal["codet5", "deepseek-finetuned", "deepseek-finetuned-remote"] | |
| class CodeAnalyzer: | |
| """Main code analysis engine with support for APIs, local models, and GitHub integration.""" | |
| def __init__(self, cache_dir: str = None, precision: str = "fp16"): | |
| # API-based models | |
| self.llm_manager = LLMClientManager() | |
| self.available_models = self.llm_manager.get_available_models() | |
| # Local/Remote Hugging Face models | |
| self.model_type: Optional[ModelType] = None | |
| self.model_id: Optional[str] = None | |
| self.adapter_path: Optional[str] = None | |
| self.remote_api_url: Optional[str] = None | |
| self.cache_dir = cache_dir | |
| self.precision = precision.lower().strip() | |
| self.cache = {} | |
| if cache_dir is not None: | |
| os.makedirs(cache_dir, exist_ok=True) | |
| self._load_cache() | |
| def _get_cache_key(self, code: str) -> str: | |
| """Generate a unique cache key for a piece of code and model type.""" | |
| combined = f"{self.model_type}:{self.model_id}:{code}" | |
| return hashlib.md5(combined.encode()).hexdigest() | |
| def _load_cache(self): | |
| """Load analysis cache from disk if available.""" | |
| if self.cache_dir is None: | |
| self.cache = {} | |
| return | |
| cache_file = os.path.join(self.cache_dir, "analysis_cache.json") | |
| if os.path.exists(cache_file): | |
| try: | |
| with open(cache_file, 'r') as f: | |
| self.cache = json.load(f) | |
| print(f"📁 Loaded {len(self.cache)} cached analyses") | |
| except (json.JSONDecodeError, IOError): | |
| self.cache = {} | |
| def _save_cache(self): | |
| """Save the analysis cache to disk.""" | |
| if self.cache_dir is None: | |
| return | |
| cache_file = os.path.join(self.cache_dir, "analysis_cache.json") | |
| with open(cache_file, 'w') as f: | |
| json.dump(self.cache, f) | |
| def _check_cache(self, code: str) -> Optional[Dict[str, Any]]: | |
| """Check if an analysis for the given code is in the cache.""" | |
| cache_key = self._get_cache_key(code) | |
| return self.cache.get(cache_key) | |
| def _save_to_cache(self, code: str, result: Dict[str, Any]): | |
| """Save an analysis result to the cache.""" | |
| cache_key = self._get_cache_key(code) | |
| self.cache[cache_key] = result | |
| self._save_cache() | |
| def analyze_code( | |
| self, | |
| code: str, | |
| model: str, | |
| language: Optional[str] = None, | |
| max_tokens: int = 1024, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Analyze code using a specified LLM provider. | |
| This is a non-streaming, direct analysis method. | |
| Language detection is now handled by AI for accuracy. | |
| """ | |
| # Skip local language detection - let AI handle it for accuracy | |
| if language is None: | |
| language = "auto-detect" # Let AI detect it | |
| prompt = get_code_analysis_prompt(code, language, model) | |
| start_time = time.time() | |
| response = self.llm_manager.query(model, prompt) | |
| total_time = time.time() - start_time | |
| if response.success: | |
| structured_data = parse_analysis_result(response.content, model) | |
| # Use AI-detected language if available, otherwise fallback to auto | |
| detected_lang = structured_data.get('detected_language') | |
| if detected_lang: | |
| language = detected_lang.upper() | |
| else: | |
| # Fallback to LLM-based detection if not in response | |
| language = detect_language(code).upper() | |
| result = { | |
| "raw_response": response.content, | |
| "quality_score": structured_data.get('quality_score', 0), | |
| "execution_time": total_time, | |
| "model": response.model, | |
| "cached": False, | |
| **structured_data, | |
| "language": language, | |
| "line_count": len(code.splitlines()), | |
| } | |
| else: | |
| result = {'error': response.error} | |
| return result | |
| def analyze_code_remote(self, code: str, max_tokens: int = 300) -> Dict[str, Any]: # Increased token limit | |
| """Analyze code using a remote Hugging Face Space API.""" | |
| if not self.remote_api_url: | |
| return {'error': 'Remote API URL is not configured.'} | |
| cached_result = self._check_cache(code) | |
| if cached_result: | |
| cached_result["cached"] = True | |
| return cached_result | |
| start_time = time.time() | |
| try: | |
| # First, try FastAPI endpoint /analyze | |
| response = requests.post( | |
| f"{self.remote_api_url}/analyze", | |
| json={"code": code, "max_tokens": max_tokens}, | |
| timeout=60 | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| # Assuming the remote API returns a structured response | |
| total_time = time.time() - start_time | |
| result = { | |
| "raw_response": data.get("analysis", str(data)), | |
| "quality_score": data.get("quality_score", 0), | |
| "execution_time": total_time, | |
| "model": data.get("model", "remote-deepseek"), | |
| "cached": False, | |
| "bugs": data.get("bugs", []), | |
| "security_vulnerabilities": data.get("security_vulnerabilities", []), | |
| "quality_issues": data.get("quality_issues", []), | |
| "quick_fixes": data.get("quick_fixes", []), | |
| "language": data.get("language", detect_language(code)), | |
| "line_count": data.get("line_count", len(code.splitlines())), | |
| } | |
| self._save_to_cache(code, result) | |
| return result | |
| except requests.exceptions.RequestException as e: | |
| # Fallback for Gradio or other errors | |
| return {'error': f"Remote analysis failed: {e}"} | |
| def analyze_github_repo(self, repo_url: str, model: str = None) -> Dict[str, Any]: | |
| """Analyze a GitHub repository.""" | |
| start_time = time.time() | |
| # Use first available model if none specified | |
| if not model or model not in self.available_models: | |
| model = list(self.available_models.keys())[0] | |
| try: | |
| # Parse GitHub URL | |
| if not repo_url.startswith('https://github.com/'): | |
| return {'error': 'Please provide a valid GitHub repository URL'} | |
| # Extract owner and repo | |
| parts = repo_url.replace('https://github.com/', '').split('/') | |
| if len(parts) < 2: | |
| return {'error': 'Invalid GitHub repository URL format'} | |
| owner, repo = parts[0], parts[1] | |
| # Get repository structure and key files | |
| repo_data = self._fetch_github_repo_data(owner, repo) | |
| if 'error' in repo_data: | |
| return repo_data | |
| # Generate analysis prompt | |
| prompt = get_github_analysis_prompt( | |
| repo_data['structure'], | |
| repo_data['main_files'] | |
| ) | |
| # Query LLM | |
| response = self.llm_manager.query(model, prompt) | |
| if response.success: | |
| analysis = self._parse_github_analysis(response.content) | |
| analysis['raw_response'] = response.content | |
| analysis['repository_info'] = repo_data['info'] | |
| else: | |
| analysis = { | |
| 'error': response.error, | |
| 'project_overview': f"Analysis failed: {response.error}", | |
| 'architecture_quality': [], | |
| 'critical_issues': [], | |
| 'improvement_priorities': [] | |
| } | |
| # Add metadata | |
| analysis['model'] = response.model | |
| analysis['execution_time'] = round(time.time() - start_time, 2) | |
| analysis['repo_url'] = repo_url | |
| return analysis | |
| except Exception as e: | |
| return { | |
| 'error': f"GitHub analysis failed: {str(e)}", | |
| 'execution_time': round(time.time() - start_time, 2) | |
| } | |
| def _fetch_github_repo_data(self, owner: str, repo: str) -> Dict[str, Any]: | |
| """Fetch repository data from GitHub API.""" | |
| try: | |
| # GitHub API endpoints | |
| api_base = f"https://api.github.com/repos/{owner}/{repo}" | |
| # Get repository info | |
| headers = {} | |
| if os.getenv('GITHUB_TOKEN'): | |
| headers['Authorization'] = f"token {os.getenv('GITHUB_TOKEN')}" | |
| repo_response = requests.get(api_base, headers=headers) | |
| if repo_response.status_code != 200: | |
| return {'error': f'Repository not found or private: {owner}/{repo}'} | |
| repo_info = repo_response.json() | |
| # Get file tree | |
| tree_response = requests.get(f"{api_base}/git/trees/main?recursive=1", headers=headers) | |
| if tree_response.status_code != 200: | |
| # Try master branch | |
| tree_response = requests.get(f"{api_base}/git/trees/master?recursive=1", headers=headers) | |
| if tree_response.status_code != 200: | |
| return {'error': 'Could not fetch repository structure'} | |
| tree_data = tree_response.json() | |
| # Build structure and get key files | |
| structure = self._build_repo_structure(tree_data['tree']) | |
| main_files = self._get_key_files(owner, repo, tree_data['tree'], headers) | |
| return { | |
| 'info': { | |
| 'name': repo_info['name'], | |
| 'description': repo_info.get('description', 'No description'), | |
| 'language': repo_info.get('language', 'Unknown'), | |
| 'stars': repo_info.get('stargazers_count', 0), | |
| 'forks': repo_info.get('forks_count', 0), | |
| 'size': repo_info.get('size', 0) | |
| }, | |
| 'structure': structure, | |
| 'main_files': main_files | |
| } | |
| except Exception as e: | |
| return {'error': f'Failed to fetch repository data: {str(e)}'} | |
| def _build_repo_structure(self, tree: List[Dict]) -> str: | |
| """Build a readable repository structure.""" | |
| structure_lines = [] | |
| dirs = set() | |
| for item in tree[:50]: # Limit to first 50 items | |
| if item['type'] == 'tree': | |
| dirs.add(item['path']) | |
| else: | |
| structure_lines.append(f"📄 {item['path']}") | |
| for dir_path in sorted(dirs): | |
| structure_lines.append(f"📁 {dir_path}/") | |
| return '\n'.join(structure_lines[:30]) # Limit output | |
| def _get_key_files(self, owner: str, repo: str, tree: List[Dict], headers: Dict) -> str: | |
| """Get content of key files like README, main source files.""" | |
| key_files = [] | |
| # Priority files to analyze | |
| priority_patterns = [ | |
| 'README.md', 'readme.md', 'README.txt', | |
| 'package.json', 'requirements.txt', 'Cargo.toml', 'go.mod', | |
| 'main.py', 'index.js', 'main.js', 'app.py', 'server.js' | |
| ] | |
| for item in tree: | |
| if item['type'] == 'blob': | |
| filename = item['path'].split('/')[-1] | |
| # Check if it's a priority file | |
| if filename in priority_patterns or any( | |
| pattern in filename.lower() for pattern in ['main', 'index', 'app'] | |
| ): | |
| try: | |
| file_response = requests.get( | |
| f"https://api.github.com/repos/{owner}/{repo}/contents/{item['path']}", | |
| headers=headers | |
| ) | |
| if file_response.status_code == 200: | |
| file_data = file_response.json() | |
| if file_data.get('encoding') == 'base64': | |
| import base64 | |
| content = base64.b64decode(file_data['content']).decode('utf-8', errors='ignore') | |
| key_files.append(f"\n--- {item['path']} ---\n{content[:1000]}") # First 1000 chars | |
| except: | |
| continue | |
| if len(key_files) >= 5: # Limit to 5 key files | |
| break | |
| return '\n'.join(key_files) | |
| def _parse_github_analysis(self, text: str) -> Dict[str, Any]: | |
| """Parse GitHub repository analysis results.""" | |
| result = { | |
| 'project_overview': '', | |
| 'architecture_quality': [], | |
| 'critical_issues': [], | |
| 'improvement_priorities': [], | |
| 'onboarding_guide': [], | |
| 'tech_stack_rationale': [], | |
| 'api_endpoint_summary': [], | |
| } | |
| sections = { | |
| 'project_overview': r'(?:PROJECT_OVERVIEW|project\s+overview)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)', | |
| 'architecture_quality': r'(?:ARCHITECTURE_QUALITY|architecture|structure)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)', | |
| 'critical_issues': r'(?:CRITICAL_ISSUES|critical|major\s+issue)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)', | |
| 'improvement_priorities': r'(?:IMPROVEMENT_PRIORITIES|improvement|priorit)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)', | |
| 'onboarding_guide': r'(?:ONBOARDING_GUIDE|onboarding|setup)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)', | |
| 'tech_stack_rationale': r'(?:TECH_STACK_RATIONALE|tech\s+stack|stack\s+rationale)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)', | |
| 'api_endpoint_summary': r'(?:API_ENDPOINT_SUMMARY|api\s+endpoint|endpoints)[:\s]*(.+?)(?=\n\s*(?:\d+\.|[A-Z_]+:)|$)', | |
| } | |
| for key, pattern in sections.items(): | |
| match = re.search(pattern, text, re.IGNORECASE | re.DOTALL) | |
| if match: | |
| content = match.group(1).strip() | |
| if key == 'project_overview': | |
| # Clean project overview | |
| clean_overview = content.split('\n')[0].strip() | |
| clean_overview = re.sub(r'#+\s*', '', clean_overview) # Remove ### symbols | |
| clean_overview = re.sub(r'^\*+\s*', '', clean_overview) # Remove ** symbols | |
| result[key] = clean_overview | |
| else: | |
| # Extract and clean bullet points | |
| items = [] | |
| lines = content.split('\n') | |
| for line in lines: | |
| line = line.strip() | |
| if line and not line.lower() in ['none', 'none found']: | |
| # Clean up markdown symbols and extra characters | |
| line = re.sub(r'#+\s*', '', line) # Remove ### symbols | |
| line = re.sub(r'^\*+\s*', '', line) # Remove ** symbols | |
| line = re.sub(r'^[-•*]\s*', '', line) # Remove bullet markers | |
| line = re.sub(r'^[:\-\s]*', '', line) # Remove colons and dashes | |
| if len(line) > 10: # Only include substantial content | |
| items.append(line) | |
| # If no structured items found, try to extract sentences | |
| if not items and content.strip(): | |
| sentences = re.split(r'[.!?]+', content) | |
| for sentence in sentences: | |
| clean_sentence = sentence.strip() | |
| clean_sentence = re.sub(r'#+\s*', '', clean_sentence) # Remove ### symbols | |
| clean_sentence = re.sub(r'^\*+\s*', '', clean_sentence) # Remove ** symbols | |
| if clean_sentence and len(clean_sentence) > 15: | |
| items.append(clean_sentence) | |
| result[key] = items[:4] # Limit to 4 items per section | |
| return result | |
| def analyze_with_all_models(self, code: str, language: Optional[str] = None) -> Dict[str, Dict[str, Any]]: | |
| """Analyze code using all available models.""" | |
| results = {} | |
| for model_key in self.available_models: | |
| results[model_key] = self.analyze_code(code, model_key, language) | |
| return results | |
| def compare_analyses(self, results: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: | |
| """Compare results from different models with focus on critical issues.""" | |
| comparison = { | |
| 'average_score': 0, | |
| 'consensus_bugs': [], | |
| 'consensus_security': [], | |
| 'model_scores': {}, | |
| 'best_model': None, | |
| 'analysis_time': sum(r['execution_time'] for r in results.values()) | |
| } | |
| # Calculate average score and find best model | |
| scores = [] | |
| for model, result in results.items(): | |
| if 'error' not in result: | |
| score = result['quality_score'] | |
| scores.append(score) | |
| comparison['model_scores'][model] = score | |
| if scores: | |
| comparison['average_score'] = round(sum(scores) / len(scores), 1) | |
| best_model = max(comparison['model_scores'].items(), key=lambda x: x[1]) | |
| comparison['best_model'] = best_model[0] | |
| # Find consensus on critical issues | |
| all_bugs = [] | |
| all_security = [] | |
| for result in results.values(): | |
| if 'error' not in result: | |
| all_bugs.extend(result.get('bugs', [])) | |
| all_security.extend(result.get('security_vulnerabilities', [])) | |
| # Simple consensus: issues mentioned by multiple models | |
| def find_consensus(items): | |
| consensus = [] | |
| for item in items: | |
| if any(item.lower() in other.lower() or other.lower() in item.lower() | |
| for other in items if other != item): | |
| if item not in consensus: | |
| consensus.append(item) | |
| return consensus[:3] # Top 3 consensus items | |
| comparison['consensus_bugs'] = find_consensus(all_bugs) | |
| comparison['consensus_security'] = find_consensus(all_security) | |
| return comparison |