Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Unit tests for the Performance Analyzer service | |
| """ | |
| import unittest | |
| from unittest.mock import patch, MagicMock, mock_open | |
| import os | |
| import sys | |
| import re | |
| from pathlib import Path | |
# Put the directory two levels above this file at the front of the import
# search path, so project packages can be imported no matter which working
# directory the tests are launched from.
_this_file = Path(__file__).resolve()
project_root = _this_file.parent.parent
sys.path.insert(0, os.fspath(project_root))
| from src.services.performance_analyzer import PerformanceAnalyzer | |
class TestPerformanceAnalyzer(unittest.TestCase):
    """Test cases for the PerformanceAnalyzer class.

    The language-specific tests hand a small in-memory source sample to the
    analyzer: ``_find_files`` and the built-in ``open`` are mocked, so nothing
    is read from disk. Each sample plants three anti-patterns and the test
    checks that an issue mentioning each of them is reported.
    """

    def setUp(self):
        """Create a fresh analyzer and a dummy repository path for each test."""
        self.analyzer = PerformanceAnalyzer()
        self.test_repo_path = "/test/repo"

    # ------------------------------------------------------------------
    # Helpers (leading underscore, so unittest does not collect them)
    # ------------------------------------------------------------------

    def _analyze_source(self, analyze, file_path, source):
        """Run an analyzer method against a single mocked source file.

        Args:
            analyze: Bound analyzer method that takes the repository path.
            file_path: The only path the mocked ``_find_files`` returns.
            source: Text served by the mocked built-in ``open``.

        Returns:
            Whatever ``analyze`` returns for ``self.test_repo_path``.
        """
        with patch.object(self.analyzer, '_find_files', return_value=[file_path]), \
                patch('builtins.open', mock_open(read_data=source)):
            return analyze(self.test_repo_path)

    @staticmethod
    def _first_issue(issues, predicate):
        """Return the first issue whose lower-cased message satisfies predicate.

        Args:
            issues: Iterable of issue dicts, each with a ``'message'`` key.
            predicate: Callable taking the lower-cased message, returning bool.

        Returns:
            The first matching issue dict, or ``None`` when nothing matches.
        """
        return next(
            (issue for issue in issues if predicate(issue['message'].lower())),
            None,
        )

    def _assert_reports_issues(self, result):
        """Assert that an analysis result contains at least one issue."""
        self.assertGreater(len(result['issues']), 0)
        self.assertGreater(result['issue_count'], 0)

    # ------------------------------------------------------------------
    # Language-specific analyzers
    # ------------------------------------------------------------------

    def test_analyze_python_performance(self):
        """Test analyze_python_performance method"""
        # Sample Python source with three planted performance issues
        python_code = """
        def slow_function():
            # This is a slow list comprehension with nested loops
            result = [x * y for x in range(1000) for y in range(1000)]
            # Inefficient string concatenation in a loop
            s = ""
            for i in range(1000):
                s += str(i)
            # Using a list where a set would be more efficient
            items = [1, 2, 3, 4, 5]
            if 3 in items: # O(n) operation
                print("Found")
        """
        result = self._analyze_source(
            self.analyzer.analyze_python_performance, '/test/repo/test.py', python_code)

        self._assert_reports_issues(result)

        # Check for specific issues
        issues = result['issues']
        nested_loop_issue = self._first_issue(
            issues, lambda msg: 'nested loop' in msg)
        string_concat_issue = self._first_issue(
            issues, lambda msg: 'string concatenation' in msg)
        list_vs_set_issue = self._first_issue(
            issues, lambda msg: 'list' in msg and 'set' in msg)

        self.assertIsNotNone(nested_loop_issue, "no nested-loop issue reported")
        self.assertIsNotNone(string_concat_issue, "no string-concatenation issue reported")
        self.assertIsNotNone(list_vs_set_issue, "no list-vs-set issue reported")

    def test_analyze_javascript_performance(self):
        """Test analyze_javascript_performance method"""
        # Sample JavaScript source with three planted performance issues
        js_code = """
        function slowFunction() {
            // Inefficient DOM manipulation in a loop
            for (let i = 0; i < 1000; i++) {
                document.getElementById('myElement').innerHTML += 'item ' + i;
            }
            // Memory leak with event listeners
            document.getElementById('button').addEventListener('click', function() {
                console.log('clicked');
            });
            // Blocking the main thread
            let start = Date.now();
            while (Date.now() - start < 1000) {
                // Busy wait for 1 second
            }
        }
        """
        result = self._analyze_source(
            self.analyzer.analyze_javascript_performance, '/test/repo/test.js', js_code)

        self._assert_reports_issues(result)

        # Check for specific issues
        issues = result['issues']
        dom_issue = self._first_issue(
            issues, lambda msg: 'dom' in msg)
        memory_leak_issue = self._first_issue(
            issues, lambda msg: 'memory leak' in msg or 'event listener' in msg)
        blocking_issue = self._first_issue(
            issues, lambda msg: 'blocking' in msg or 'main thread' in msg)

        self.assertIsNotNone(dom_issue, "no DOM issue reported")
        self.assertIsNotNone(memory_leak_issue, "no memory-leak/event-listener issue reported")
        self.assertIsNotNone(blocking_issue, "no main-thread blocking issue reported")

    def test_analyze_typescript_performance(self):
        """Test analyze_typescript_performance method"""
        # Sample TypeScript source with three planted performance issues
        ts_code = """
        function slowFunction(): void {
            // Inefficient array operations
            const array: number[] = [];
            for (let i = 0; i < 1000; i++) {
                array.unshift(i); // O(n) operation
            }
            // Excessive type casting
            let value: any = "123";
            let num: number = <number><any>value;
            // Inefficient async/await usage
            async function fetchData(): Promise<void> {
                const promises = [];
                for (let i = 0; i < 10; i++) {
                    const result = await fetch(`https://api.example.com/data/${i}`); // Sequential fetches
                    promises.push(result);
                }
            }
        }
        """
        result = self._analyze_source(
            self.analyzer.analyze_typescript_performance, '/test/repo/test.ts', ts_code)

        self._assert_reports_issues(result)

        # Check for specific issues
        issues = result['issues']
        array_issue = self._first_issue(
            issues, lambda msg: 'array' in msg and 'unshift' in msg)
        type_casting_issue = self._first_issue(
            issues, lambda msg: 'type casting' in msg)
        async_issue = self._first_issue(
            issues, lambda msg: 'async' in msg or 'await' in msg)

        self.assertIsNotNone(array_issue, "no array/unshift issue reported")
        self.assertIsNotNone(type_casting_issue, "no type-casting issue reported")
        self.assertIsNotNone(async_issue, "no async/await issue reported")

    def test_analyze_java_performance(self):
        """Test analyze_java_performance method"""
        # Sample Java source with three planted performance issues
        java_code = """
        public class SlowClass {
            public void slowMethod() {
                // Inefficient string concatenation
                String result = "";
                for (int i = 0; i < 1000; i++) {
                    result += i; // Creates a new string each time
                }
                // Using ArrayList where HashSet would be more efficient for lookups
                ArrayList<Integer> list = new ArrayList<>();
                for (int i = 0; i < 1000; i++) {
                    list.add(i);
                }
                boolean contains = list.contains(500); // O(n) operation
                // Excessive object creation
                for (int i = 0; i < 1000; i++) {
                    Integer obj = new Integer(i); // Creates 1000 objects
                }
            }
        }
        """
        result = self._analyze_source(
            self.analyzer.analyze_java_performance, '/test/repo/SlowClass.java', java_code)

        self._assert_reports_issues(result)

        # Check for specific issues
        issues = result['issues']
        string_concat_issue = self._first_issue(
            issues, lambda msg: 'string concatenation' in msg)
        collection_issue = self._first_issue(
            issues, lambda msg: 'arraylist' in msg and 'hashset' in msg)
        object_creation_issue = self._first_issue(
            issues, lambda msg: 'object creation' in msg)

        self.assertIsNotNone(string_concat_issue, "no string-concatenation issue reported")
        self.assertIsNotNone(collection_issue, "no ArrayList-vs-HashSet issue reported")
        self.assertIsNotNone(object_creation_issue, "no object-creation issue reported")

    def test_analyze_go_performance(self):
        """Test analyze_go_performance method"""
        # Sample Go source with three planted performance issues
        go_code = """
        package main
        import (
            "fmt"
            "sync"
        )
        func slowFunction() {
            // Inefficient slice operations
            slice := []int{}
            for i := 0; i < 1000; i++ {
                slice = append(slice, i) // May cause reallocation
            }
            // Mutex instead of atomic operations
            var mu sync.Mutex
            counter := 0
            for i := 0; i < 1000; i++ {
                mu.Lock()
                counter++
                mu.Unlock()
            }
            // Inefficient string concatenation
            result := ""
            for i := 0; i < 1000; i++ {
                result += fmt.Sprintf("%d", i) // Creates a new string each time
            }
        }
        """
        result = self._analyze_source(
            self.analyzer.analyze_go_performance, '/test/repo/main.go', go_code)

        self._assert_reports_issues(result)

        # Check for specific issues
        issues = result['issues']
        slice_issue = self._first_issue(
            issues, lambda msg: 'slice' in msg and 'append' in msg)
        mutex_issue = self._first_issue(
            issues, lambda msg: 'mutex' in msg or 'atomic' in msg)
        string_concat_issue = self._first_issue(
            issues, lambda msg: 'string concatenation' in msg)

        self.assertIsNotNone(slice_issue, "no slice/append issue reported")
        self.assertIsNotNone(mutex_issue, "no mutex/atomic issue reported")
        self.assertIsNotNone(string_concat_issue, "no string-concatenation issue reported")

    def test_analyze_rust_performance(self):
        """Test analyze_rust_performance method"""
        # Sample Rust source with three planted performance issues
        rust_code = """
        fn slow_function() {
            // Inefficient string operations
            let mut result = String::new();
            for i in 0..1000 {
                result.push_str(&i.to_string()); // Allocates a new string each time
            }
            // Excessive cloning
            let data = vec![1, 2, 3, 4, 5];
            let copied = data.clone(); // Clones the entire vector
            // Inefficient iteration
            let mut sum = 0;
            for i in 0..data.len() {
                sum += data[i]; // Bounds checking on each access
            }
        }
        """
        result = self._analyze_source(
            self.analyzer.analyze_rust_performance, '/test/repo/main.rs', rust_code)

        self._assert_reports_issues(result)

        # Check for specific issues
        issues = result['issues']
        string_issue = self._first_issue(
            issues, lambda msg: 'string' in msg)
        clone_issue = self._first_issue(
            issues, lambda msg: 'clone' in msg)
        iteration_issue = self._first_issue(
            issues, lambda msg: 'iteration' in msg or 'bounds checking' in msg)

        self.assertIsNotNone(string_issue, "no string issue reported")
        self.assertIsNotNone(clone_issue, "no clone issue reported")
        self.assertIsNotNone(iteration_issue, "no iteration/bounds-checking issue reported")

    # ------------------------------------------------------------------
    # Repository-level aggregation
    # ------------------------------------------------------------------

    def test_analyze_repository(self):
        """Test analyze_repository method"""
        # Replace the language-specific analyzers with canned results. The
        # mocks are set on the instance, which setUp recreates for every test,
        # so they cannot leak into other tests.
        self.analyzer.analyze_python_performance = MagicMock(return_value={
            'issues': [
                {'file': 'file1.py', 'line': 10, 'message': 'Inefficient list comprehension'},
                {'file': 'file1.py', 'line': 20, 'message': 'Inefficient string concatenation'},
            ],
            'issue_count': 2,
        })
        self.analyzer.analyze_javascript_performance = MagicMock(return_value={
            'issues': [
                {'file': 'file1.js', 'line': 15, 'message': 'DOM manipulation in loop'},
            ],
            'issue_count': 1,
        })

        # Call the method
        result = self.analyzer.analyze_repository(self.test_repo_path, ['Python', 'JavaScript'])

        # Verify the per-language results
        language_results = result['language_results']
        self.assertEqual(len(language_results), 2)  # Two languages
        self.assertIn('Python', language_results)
        self.assertIn('JavaScript', language_results)
        self.assertEqual(language_results['Python']['issue_count'], 2)
        self.assertEqual(language_results['JavaScript']['issue_count'], 1)

        # Check hotspots
        hotspots = result['hotspots']
        self.assertEqual(len(hotspots), 1)  # One file with multiple issues
        self.assertEqual(hotspots[0]['file'], 'file1.py')
        self.assertEqual(hotspots[0]['issue_count'], 2)

        # Verify the method calls
        self.analyzer.analyze_python_performance.assert_called_once_with(self.test_repo_path)
        self.analyzer.analyze_javascript_performance.assert_called_once_with(self.test_repo_path)

    def test_identify_hotspots(self):
        """Test _identify_hotspots method"""
        # Two files (file1.py, file3.js) carry two issues each; the other two
        # files carry a single issue.
        language_results = {
            'Python': {
                'issues': [
                    {'file': 'file1.py', 'line': 10, 'message': 'Issue 1'},
                    {'file': 'file1.py', 'line': 20, 'message': 'Issue 2'},
                    {'file': 'file2.py', 'line': 5, 'message': 'Issue 3'},
                ],
                'issue_count': 3,
            },
            'JavaScript': {
                'issues': [
                    {'file': 'file1.js', 'line': 15, 'message': 'Issue 4'},
                    {'file': 'file3.js', 'line': 25, 'message': 'Issue 5'},
                    {'file': 'file3.js', 'line': 30, 'message': 'Issue 6'},
                ],
                'issue_count': 3,
            },
        }

        # Call the method
        hotspots = self.analyzer._identify_hotspots(language_results)

        # Verify the result
        self.assertEqual(len(hotspots), 2)  # Two files with multiple issues

        # Find the hotspots by file
        file1_py_hotspot = next((h for h in hotspots if h['file'] == 'file1.py'), None)
        file3_js_hotspot = next((h for h in hotspots if h['file'] == 'file3.js'), None)
        self.assertIsNotNone(file1_py_hotspot)
        self.assertIsNotNone(file3_js_hotspot)
        self.assertEqual(file1_py_hotspot['issue_count'], 2)
        self.assertEqual(file3_js_hotspot['issue_count'], 2)

    # ------------------------------------------------------------------
    # Low-level helpers of the analyzer
    # ------------------------------------------------------------------

    # Without this decorator the runner calls the test with ``self`` only and
    # it fails with a TypeError for the missing ``mock_walk`` argument.
    # NOTE(review): assumes ``_find_files`` calls ``os.walk`` through the
    # ``os`` module; if it uses ``from os import walk``, patch that name in
    # the analyzer module instead.
    @patch('os.walk')
    def test_find_files(self, mock_walk):
        """Test _find_files method"""
        # Fake directory tree as (dirpath, dirnames, filenames) tuples
        mock_walk.return_value = [
            ('/test/repo', ['dir1'], ['file1.py', 'file2.js']),
            ('/test/repo/dir1', [], ['file3.py']),
        ]

        # Call the method
        python_files = self.analyzer._find_files(self.test_repo_path, '.py')

        # Verify the result: only the two .py files, at any depth
        self.assertEqual(len(python_files), 2)
        self.assertIn('/test/repo/file1.py', python_files)
        self.assertIn('/test/repo/dir1/file3.py', python_files)

    def test_analyze_file_with_patterns(self):
        """Test _analyze_file_with_patterns method"""
        # Sample file content: one line matches each pattern below
        file_content = """
        def slow_function():
            # This is a slow list comprehension
            result = [x * y for x in range(1000) for y in range(1000)]
            # Inefficient string concatenation
            s = ""
            for i in range(1000):
                s += str(i)
        """
        patterns = [
            (re.compile(r'\[.*for.*for.*\]', re.MULTILINE),
             "Nested list comprehension can be inefficient"),
            (re.compile(r'\s+s\s\+=\s', re.MULTILINE),
             "String concatenation in a loop is inefficient"),
        ]

        # Call the method
        issues = self.analyzer._analyze_file_with_patterns(
            '/test/repo/test.py', file_content, patterns)

        # Verify the result
        self.assertEqual(len(issues), 2)  # Two patterns matched
        self.assertEqual(issues[0]['file'], 'test.py')  # Should be relative path
        self.assertEqual(issues[1]['file'], 'test.py')
        self.assertIn('Nested list comprehension', issues[0]['message'])
        self.assertIn('String concatenation', issues[1]['message'])
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()