Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Code Analyzer Service | |
| This module provides functionality for analyzing code quality across different languages. | |
| """ | |
| import os | |
| import subprocess | |
| import logging | |
| import json | |
| import tempfile | |
| import concurrent.futures | |
| from collections import defaultdict | |
| logger = logging.getLogger(__name__) | |
class CodeAnalyzer:
    """
    Service for analyzing code quality across different languages.
    """

    def __init__(self):
        """Build the per-language dispatch table of analyzer callables."""
        logger.info("Initialized CodeAnalyzer")
        # Each supported language maps to the bound method that analyzes it.
        self.analyzers = {
            name: handler
            for name, handler in (
                ('Python', self._analyze_python),
                ('JavaScript', self._analyze_javascript),
                ('TypeScript', self._analyze_typescript),
                ('Java', self._analyze_java),
                ('Go', self._analyze_go),
                ('Rust', self._analyze_rust),
            )
        }
| def analyze_repository(self, repo_path, languages): | |
| """ | |
| Analyze code quality in a repository for the specified languages using parallel processing. | |
| Args: | |
| repo_path (str): The path to the repository. | |
| languages (list): A list of programming languages to analyze. | |
| Returns: | |
| dict: A dictionary containing analysis results for each language. | |
| """ | |
| logger.info(f"Analyzing repository at {repo_path} for languages: {languages}") | |
| results = {} | |
| # Define a function to analyze a single language | |
| def analyze_language(language): | |
| if language in self.analyzers: | |
| try: | |
| logger.info(f"Analyzing {language} code in {repo_path}") | |
| return language, self.analyzers[language](repo_path) | |
| except Exception as e: | |
| logger.error(f"Error analyzing {language} code: {e}") | |
| return language, { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'issues': [], | |
| } | |
| else: | |
| logger.warning(f"No analyzer available for {language}") | |
| return language, { | |
| 'status': 'not_supported', | |
| 'message': f"Analysis for {language} is not supported yet.", | |
| 'issues': [], | |
| } | |
| # Use ThreadPoolExecutor to analyze languages in parallel | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(languages), 5)) as executor: | |
| # Submit all language analysis tasks | |
| future_to_language = {executor.submit(analyze_language, language): language for language in languages} | |
| # Process results as they complete | |
| for future in concurrent.futures.as_completed(future_to_language): | |
| language = future_to_language[future] | |
| try: | |
| lang, result = future.result() | |
| results[lang] = result | |
| logger.info(f"Completed analysis for {lang}") | |
| except Exception as e: | |
| logger.error(f"Exception occurred during analysis of {language}: {e}") | |
| results[language] = { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'issues': [], | |
| } | |
| return results | |
| def _analyze_python(self, repo_path): | |
| """ | |
| Analyze Python code using pylint. | |
| Args: | |
| repo_path (str): The path to the repository. | |
| Returns: | |
| dict: Analysis results. | |
| """ | |
| logger.info(f"Analyzing Python code in {repo_path}") | |
| # Find Python files | |
| python_files = [] | |
| for root, _, files in os.walk(repo_path): | |
| for file in files: | |
| if file.endswith('.py'): | |
| python_files.append(os.path.join(root, file)) | |
| if not python_files: | |
| return { | |
| 'status': 'no_files', | |
| 'message': 'No Python files found in the repository.', | |
| 'issues': [], | |
| } | |
| # Create a temporary file to store pylint output | |
| with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_file: | |
| temp_path = temp_file.name | |
| try: | |
| # Run pylint with JSON reporter | |
| cmd = [ | |
| 'python', | |
| '-m', | |
| 'pylint', | |
| '--output-format=json', | |
| '--reports=n', | |
| ] + python_files | |
| process = subprocess.run( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| check=False, | |
| ) | |
| # Parse pylint output | |
| if process.stdout.strip(): | |
| try: | |
| issues = json.loads(process.stdout) | |
| except json.JSONDecodeError: | |
| logger.error(f"Error parsing pylint output: {process.stdout}") | |
| issues = [] | |
| else: | |
| issues = [] | |
| # Group issues by type | |
| issues_by_type = defaultdict(list) | |
| for issue in issues: | |
| issue_type = issue.get('type', 'unknown') | |
| issues_by_type[issue_type].append(issue) | |
| return { | |
| 'status': 'success', | |
| 'issues': issues, | |
| 'issues_by_type': dict(issues_by_type), | |
| 'issue_count': len(issues), | |
| 'files_analyzed': len(python_files), | |
| } | |
| except Exception as e: | |
| logger.error(f"Error running pylint: {e}") | |
| return { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'issues': [], | |
| } | |
| finally: | |
| # Clean up the temporary file | |
| if os.path.exists(temp_path): | |
| os.unlink(temp_path) | |
| def _analyze_javascript(self, repo_path): | |
| """ | |
| Analyze JavaScript code using ESLint. | |
| Args: | |
| repo_path (str): The path to the repository. | |
| Returns: | |
| dict: Analysis results. | |
| """ | |
| logger.info(f"Analyzing JavaScript code in {repo_path}") | |
| # Find JavaScript files | |
| js_files = [] | |
| for root, _, files in os.walk(repo_path): | |
| for file in files: | |
| if file.endswith(('.js', '.jsx')) and not 'node_modules' in root: | |
| js_files.append(os.path.join(root, file)) | |
| if not js_files: | |
| return { | |
| 'status': 'no_files', | |
| 'message': 'No JavaScript files found in the repository.', | |
| 'issues': [], | |
| } | |
| # Create a temporary ESLint configuration file | |
| eslint_config = { | |
| "env": { | |
| "browser": True, | |
| "es2021": True, | |
| "node": True | |
| }, | |
| "extends": "eslint:recommended", | |
| "parserOptions": { | |
| "ecmaVersion": 12, | |
| "sourceType": "module", | |
| "ecmaFeatures": { | |
| "jsx": True | |
| } | |
| }, | |
| "rules": {} | |
| } | |
| with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_config: | |
| json.dump(eslint_config, temp_config) | |
| temp_config_path = temp_config.name | |
| try: | |
| # Run ESLint with JSON formatter | |
| cmd = [ | |
| 'npx', | |
| 'eslint', | |
| '--config', temp_config_path, | |
| '--format', 'json', | |
| ] + js_files | |
| process = subprocess.run( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| check=False, | |
| ) | |
| # Parse ESLint output | |
| if process.stdout.strip(): | |
| try: | |
| eslint_results = json.loads(process.stdout) | |
| # Extract issues from ESLint results | |
| issues = [] | |
| for result in eslint_results: | |
| file_path = result.get('filePath', '') | |
| for message in result.get('messages', []): | |
| issues.append({ | |
| 'path': file_path, | |
| 'line': message.get('line', 0), | |
| 'column': message.get('column', 0), | |
| 'message': message.get('message', ''), | |
| 'severity': message.get('severity', 0), | |
| 'ruleId': message.get('ruleId', ''), | |
| }) | |
| except json.JSONDecodeError: | |
| logger.error(f"Error parsing ESLint output: {process.stdout}") | |
| issues = [] | |
| else: | |
| issues = [] | |
| # Group issues by severity | |
| issues_by_severity = defaultdict(list) | |
| for issue in issues: | |
| severity = issue.get('severity', 0) | |
| severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown') | |
| issues_by_severity[severity_name].append(issue) | |
| return { | |
| 'status': 'success', | |
| 'issues': issues, | |
| 'issues_by_severity': dict(issues_by_severity), | |
| 'issue_count': len(issues), | |
| 'files_analyzed': len(js_files), | |
| } | |
| except Exception as e: | |
| logger.error(f"Error running ESLint: {e}") | |
| return { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'issues': [], | |
| } | |
| finally: | |
| # Clean up the temporary configuration file | |
| if os.path.exists(temp_config_path): | |
| os.unlink(temp_config_path) | |
| def _analyze_typescript(self, repo_path): | |
| """ | |
| Analyze TypeScript code using ESLint and TSC. | |
| Args: | |
| repo_path (str): The path to the repository. | |
| Returns: | |
| dict: Analysis results. | |
| """ | |
| logger.info(f"Analyzing TypeScript code in {repo_path}") | |
| # Find TypeScript files | |
| ts_files = [] | |
| for root, _, files in os.walk(repo_path): | |
| for file in files: | |
| if file.endswith(('.ts', '.tsx')) and not 'node_modules' in root: | |
| ts_files.append(os.path.join(root, file)) | |
| if not ts_files: | |
| return { | |
| 'status': 'no_files', | |
| 'message': 'No TypeScript files found in the repository.', | |
| 'issues': [], | |
| } | |
| # Create a temporary ESLint configuration file for TypeScript | |
| eslint_config = { | |
| "env": { | |
| "browser": True, | |
| "es2021": True, | |
| "node": True | |
| }, | |
| "extends": [ | |
| "eslint:recommended", | |
| "plugin:@typescript-eslint/recommended" | |
| ], | |
| "parser": "@typescript-eslint/parser", | |
| "parserOptions": { | |
| "ecmaVersion": 12, | |
| "sourceType": "module", | |
| "ecmaFeatures": { | |
| "jsx": True | |
| } | |
| }, | |
| "plugins": [ | |
| "@typescript-eslint" | |
| ], | |
| "rules": {} | |
| } | |
| with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_config: | |
| json.dump(eslint_config, temp_config) | |
| temp_config_path = temp_config.name | |
| # Create a temporary tsconfig.json file | |
| tsconfig = { | |
| "compilerOptions": { | |
| "target": "es2020", | |
| "module": "commonjs", | |
| "strict": True, | |
| "esModuleInterop": True, | |
| "skipLibCheck": True, | |
| "forceConsistentCasingInFileNames": True, | |
| "noEmit": True | |
| }, | |
| "include": ts_files | |
| } | |
| with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_tsconfig: | |
| json.dump(tsconfig, temp_tsconfig) | |
| temp_tsconfig_path = temp_tsconfig.name | |
| try: | |
| # Run ESLint with TypeScript support | |
| eslint_cmd = [ | |
| 'npx', | |
| 'eslint', | |
| '--config', temp_config_path, | |
| '--format', 'json', | |
| '--ext', '.ts,.tsx', | |
| ] + ts_files | |
| eslint_process = subprocess.run( | |
| eslint_cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| check=False, | |
| ) | |
| # Parse ESLint output | |
| eslint_issues = [] | |
| if eslint_process.stdout.strip(): | |
| try: | |
| eslint_results = json.loads(eslint_process.stdout) | |
| # Extract issues from ESLint results | |
| for result in eslint_results: | |
| file_path = result.get('filePath', '') | |
| for message in result.get('messages', []): | |
| eslint_issues.append({ | |
| 'path': file_path, | |
| 'line': message.get('line', 0), | |
| 'column': message.get('column', 0), | |
| 'message': message.get('message', ''), | |
| 'severity': message.get('severity', 0), | |
| 'ruleId': message.get('ruleId', ''), | |
| 'source': 'eslint', | |
| }) | |
| except json.JSONDecodeError: | |
| logger.error(f"Error parsing ESLint output: {eslint_process.stdout}") | |
| # Run TypeScript compiler for type checking | |
| tsc_cmd = [ | |
| 'npx', | |
| 'tsc', | |
| '--project', temp_tsconfig_path, | |
| '--noEmit', | |
| ] | |
| tsc_process = subprocess.run( | |
| tsc_cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| check=False, | |
| ) | |
| # Parse TSC output | |
| tsc_issues = [] | |
| if tsc_process.stderr.strip(): | |
| # TSC error format: file.ts(line,col): error TS2551: message | |
| for line in tsc_process.stderr.splitlines(): | |
| if ': error ' in line or ': warning ' in line: | |
| try: | |
| file_info, error_info = line.split(':', 1) | |
| file_path, line_col = file_info.rsplit('(', 1) | |
| line_num, col_num = line_col.rstrip(')').split(',') | |
| error_type, error_message = error_info.split(':', 1) | |
| error_type = error_type.strip() | |
| error_message = error_message.strip() | |
| tsc_issues.append({ | |
| 'path': file_path, | |
| 'line': int(line_num), | |
| 'column': int(col_num), | |
| 'message': error_message, | |
| 'severity': 2 if 'error' in error_type else 1, | |
| 'ruleId': error_type, | |
| 'source': 'tsc', | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Error parsing TSC output line: {line}, error: {e}") | |
| # Combine issues from both tools | |
| all_issues = eslint_issues + tsc_issues | |
| # Group issues by source and severity | |
| issues_by_source = defaultdict(list) | |
| issues_by_severity = defaultdict(list) | |
| for issue in all_issues: | |
| source = issue.get('source', 'unknown') | |
| issues_by_source[source].append(issue) | |
| severity = issue.get('severity', 0) | |
| severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown') | |
| issues_by_severity[severity_name].append(issue) | |
| return { | |
| 'status': 'success', | |
| 'issues': all_issues, | |
| 'issues_by_source': dict(issues_by_source), | |
| 'issues_by_severity': dict(issues_by_severity), | |
| 'issue_count': len(all_issues), | |
| 'files_analyzed': len(ts_files), | |
| } | |
| except Exception as e: | |
| logger.error(f"Error analyzing TypeScript code: {e}") | |
| return { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'issues': [], | |
| } | |
| finally: | |
| # Clean up temporary files | |
| for temp_file in [temp_config_path, temp_tsconfig_path]: | |
| if os.path.exists(temp_file): | |
| os.unlink(temp_file) | |
| def _analyze_java(self, repo_path): | |
| """ | |
| Analyze Java code using PMD. | |
| Args: | |
| repo_path (str): The path to the repository. | |
| Returns: | |
| dict: Analysis results. | |
| """ | |
| logger.info(f"Analyzing Java code in {repo_path}") | |
| # Find Java files | |
| java_files = [] | |
| for root, _, files in os.walk(repo_path): | |
| for file in files: | |
| if file.endswith('.java'): | |
| java_files.append(os.path.join(root, file)) | |
| if not java_files: | |
| return { | |
| 'status': 'no_files', | |
| 'message': 'No Java files found in the repository.', | |
| 'issues': [], | |
| } | |
| # Create a temporary file to store PMD output | |
| with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_file: | |
| temp_path = temp_file.name | |
| try: | |
| # Run PMD with JSON reporter | |
| cmd = [ | |
| 'pmd', | |
| 'check', | |
| '--dir', repo_path, | |
| '--format', 'json', | |
| '--rulesets', 'category/java/bestpractices.xml,category/java/codestyle.xml,category/java/design.xml,category/java/errorprone.xml,category/java/multithreading.xml,category/java/performance.xml,category/java/security.xml', | |
| ] | |
| process = subprocess.run( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| check=False, | |
| ) | |
| # Parse PMD output | |
| if process.stdout.strip(): | |
| try: | |
| pmd_results = json.loads(process.stdout) | |
| # Extract issues from PMD results | |
| issues = [] | |
| for file_result in pmd_results.get('files', []): | |
| file_path = file_result.get('filename', '') | |
| for violation in file_result.get('violations', []): | |
| issues.append({ | |
| 'path': file_path, | |
| 'line': violation.get('beginline', 0), | |
| 'endLine': violation.get('endline', 0), | |
| 'column': violation.get('begincolumn', 0), | |
| 'endColumn': violation.get('endcolumn', 0), | |
| 'message': violation.get('description', ''), | |
| 'rule': violation.get('rule', ''), | |
| 'ruleset': violation.get('ruleset', ''), | |
| 'priority': violation.get('priority', 0), | |
| }) | |
| except json.JSONDecodeError: | |
| logger.error(f"Error parsing PMD output: {process.stdout}") | |
| issues = [] | |
| else: | |
| issues = [] | |
| # Group issues by ruleset | |
| issues_by_ruleset = defaultdict(list) | |
| for issue in issues: | |
| ruleset = issue.get('ruleset', 'unknown') | |
| issues_by_ruleset[ruleset].append(issue) | |
| return { | |
| 'status': 'success', | |
| 'issues': issues, | |
| 'issues_by_ruleset': dict(issues_by_ruleset), | |
| 'issue_count': len(issues), | |
| 'files_analyzed': len(java_files), | |
| } | |
| except Exception as e: | |
| logger.error(f"Error running PMD: {e}") | |
| return { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'issues': [], | |
| } | |
| finally: | |
| # Clean up the temporary file | |
| if os.path.exists(temp_path): | |
| os.unlink(temp_path) | |
| def _analyze_go(self, repo_path): | |
| """ | |
| Analyze Go code using golangci-lint. | |
| Args: | |
| repo_path (str): The path to the repository. | |
| Returns: | |
| dict: Analysis results. | |
| """ | |
| logger.info(f"Analyzing Go code in {repo_path}") | |
| # Find Go files | |
| go_files = [] | |
| for root, _, files in os.walk(repo_path): | |
| for file in files: | |
| if file.endswith('.go'): | |
| go_files.append(os.path.join(root, file)) | |
| if not go_files: | |
| return { | |
| 'status': 'no_files', | |
| 'message': 'No Go files found in the repository.', | |
| 'issues': [], | |
| } | |
| try: | |
| # Run golangci-lint with JSON output | |
| cmd = [ | |
| 'golangci-lint', | |
| 'run', | |
| '--out-format=json', | |
| repo_path, | |
| ] | |
| process = subprocess.run( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| check=False, | |
| cwd=repo_path, # Run in the repository directory | |
| ) | |
| # Parse golangci-lint output | |
| if process.stdout.strip(): | |
| try: | |
| lint_results = json.loads(process.stdout) | |
| # Extract issues from golangci-lint results | |
| issues = [] | |
| for issue in lint_results.get('Issues', []): | |
| issues.append({ | |
| 'path': issue.get('Pos', {}).get('Filename', ''), | |
| 'line': issue.get('Pos', {}).get('Line', 0), | |
| 'column': issue.get('Pos', {}).get('Column', 0), | |
| 'message': issue.get('Text', ''), | |
| 'linter': issue.get('FromLinter', ''), | |
| 'severity': 'error' if issue.get('Severity', '') == 'error' else 'warning', | |
| }) | |
| except json.JSONDecodeError: | |
| logger.error(f"Error parsing golangci-lint output: {process.stdout}") | |
| issues = [] | |
| else: | |
| issues = [] | |
| # Group issues by linter | |
| issues_by_linter = defaultdict(list) | |
| for issue in issues: | |
| linter = issue.get('linter', 'unknown') | |
| issues_by_linter[linter].append(issue) | |
| return { | |
| 'status': 'success', | |
| 'issues': issues, | |
| 'issues_by_linter': dict(issues_by_linter), | |
| 'issue_count': len(issues), | |
| 'files_analyzed': len(go_files), | |
| } | |
| except Exception as e: | |
| logger.error(f"Error running golangci-lint: {e}") | |
| return { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'issues': [], | |
| } | |
| def _analyze_rust(self, repo_path): | |
| """ | |
| Analyze Rust code using clippy. | |
| Args: | |
| repo_path (str): The path to the repository. | |
| Returns: | |
| dict: Analysis results. | |
| """ | |
| logger.info(f"Analyzing Rust code in {repo_path}") | |
| # Find Rust files | |
| rust_files = [] | |
| for root, _, files in os.walk(repo_path): | |
| for file in files: | |
| if file.endswith('.rs'): | |
| rust_files.append(os.path.join(root, file)) | |
| if not rust_files: | |
| return { | |
| 'status': 'no_files', | |
| 'message': 'No Rust files found in the repository.', | |
| 'issues': [], | |
| } | |
| try: | |
| # Run clippy with JSON output | |
| cmd = [ | |
| 'cargo', | |
| 'clippy', | |
| '--message-format=json', | |
| ] | |
| process = subprocess.run( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| check=False, | |
| cwd=repo_path, # Run in the repository directory | |
| ) | |
| # Parse clippy output | |
| issues = [] | |
| if process.stdout.strip(): | |
| for line in process.stdout.splitlines(): | |
| try: | |
| message = json.loads(line) | |
| if message.get('reason') == 'compiler-message': | |
| msg = message.get('message', {}) | |
| spans = msg.get('spans', []) | |
| if spans: | |
| primary_span = next((s for s in spans if s.get('is_primary')), spans[0]) | |
| file_path = primary_span.get('file_name', '') | |
| line_num = primary_span.get('line_start', 0) | |
| column = primary_span.get('column_start', 0) | |
| issues.append({ | |
| 'path': file_path, | |
| 'line': line_num, | |
| 'column': column, | |
| 'message': msg.get('message', ''), | |
| 'level': msg.get('level', ''), | |
| 'code': msg.get('code', {}).get('code', ''), | |
| }) | |
| except json.JSONDecodeError: | |
| continue | |
| # Group issues by level | |
| issues_by_level = defaultdict(list) | |
| for issue in issues: | |
| level = issue.get('level', 'unknown') | |
| issues_by_level[level].append(issue) | |
| return { | |
| 'status': 'success', | |
| 'issues': issues, | |
| 'issues_by_level': dict(issues_by_level), | |
| 'issue_count': len(issues), | |
| 'files_analyzed': len(rust_files), | |
| } | |
| except Exception as e: | |
| logger.error(f"Error running clippy: {e}") | |
| return { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'issues': [], | |
| } |