| | """Test the memory leak fixes""" |
| |
|
import os
import shutil
import tempfile
import time
import unittest
from unittest.mock import patch, MagicMock

from utils.session_manager import SessionManager
from utils.file_manager import FileManager
| |
|
| |
|
class TestSessionManager(unittest.TestCase):
    """Test SessionManager cleanup functionality.

    Each test runs against a fresh ``SessionManager`` built with
    ``ttl_seconds=5``; a ``MagicMock`` stands in for the chain object
    stored in a session.
    """

    def setUp(self):
        """Create a session manager for testing."""
        self.session_manager = SessionManager(ttl_seconds=5)

    def test_create_session(self):
        """Creating a session returns its id and bumps the session count."""
        chain = MagicMock()
        session_id = self.session_manager.create_session("test-1", chain)

        self.assertEqual(session_id, "test-1")
        self.assertEqual(self.session_manager.get_session_count(), 1)

    def test_get_session(self):
        """A stored chain is handed back by ``get_session``."""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)

        retrieved = self.session_manager.get_session("test-1")
        self.assertEqual(retrieved, chain)

    def test_session_access_count(self):
        """Each ``get_session`` call increments the session's access count."""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)

        for _ in range(3):
            self.session_manager.get_session("test-1")

        info = self.session_manager.get_session_info("test-1")
        self.assertEqual(info['access_count'], 3)

    def test_delete_session(self):
        """Manual deletion reports success and removes the session."""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)
        self.assertEqual(self.session_manager.get_session_count(), 1)

        deleted = self.session_manager.delete_session("test-1")
        self.assertTrue(deleted)
        self.assertEqual(self.session_manager.get_session_count(), 0)

    def test_session_expiration(self):
        """Sessions older than the TTL are dropped by the cleanup pass."""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)
        self.assertEqual(self.session_manager.get_session_count(), 1)

        # Read the real clock BEFORE patching. Inside the ``with`` block
        # ``time.time`` is the mock itself, so calling it there would yield
        # a MagicMock rather than a float and the "future" timestamp would
        # be meaningless.
        current = time.time()

        # Pretend 10 seconds have passed, which is beyond the 5-second TTL.
        # NOTE(review): assumes SessionManager reads the clock through
        # ``time.time`` - confirm against its implementation.
        with patch('time.time', return_value=current + 10):
            self.session_manager._cleanup_expired_sessions()

        self.assertEqual(self.session_manager.get_session_count(), 0)

    def test_get_session_not_found(self):
        """Looking up an unknown session id yields ``None``."""
        result = self.session_manager.get_session("nonexistent")
        self.assertIsNone(result)

    def test_memory_stats(self):
        """Memory statistics reflect session count, TTL and accesses."""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)
        self.session_manager.create_session("test-2", chain)

        # Perform one lookup so there is at least one access to report.
        # test_session_access_count shows that creating a session does not
        # count as an access, so without this the ``> 0`` check below would
        # have nothing to observe.
        # NOTE(review): presumably 'total_accesses' aggregates the
        # per-session access counts - confirm.
        self.session_manager.get_session("test-1")

        stats = self.session_manager.get_memory_stats()

        self.assertEqual(stats['total_sessions'], 2)
        self.assertEqual(stats['ttl_seconds'], 5)
        self.assertGreater(stats['total_accesses'], 0)
| |
|
| |
|
class TestFileManager(unittest.TestCase):
    """Test FileManager cleanup functionality.

    Each test works inside its own temporary directory, managed by a
    ``FileManager`` built with ``max_age_seconds=5``.
    """

    def setUp(self):
        """Create a temporary directory and a file manager rooted in it."""
        self.temp_dir = tempfile.mkdtemp()
        # Register removal immediately: cleanups run even when a later
        # setUp step raises, whereas tearDown would be skipped in that case
        # and the directory would leak.
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)
        self.file_manager = FileManager(self.temp_dir, max_age_seconds=5)

    def _make_file(self, name, content):
        """Write ``content`` to ``name`` inside the temp dir; return its path."""
        path = os.path.join(self.temp_dir, name)
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)
        return path

    def test_register_file(self):
        """A registered file is listed under its session."""
        test_file = self._make_file("test.txt", "test content")

        self.file_manager.register_file("session-1", test_file)

        files = self.file_manager.get_session_files("session-1")
        self.assertEqual(len(files), 1)
        self.assertEqual(files[0]['path'], test_file)

    def test_cleanup_session_files(self):
        """Cleaning up a session deletes its registered files from disk."""
        test_file = self._make_file("test.txt", "test content")
        self.file_manager.register_file("session-1", test_file)

        # Sanity check: the file is really there before cleanup.
        self.assertTrue(os.path.exists(test_file))

        count = self.file_manager.cleanup_session_files("session-1")

        self.assertEqual(count, 1)
        self.assertFalse(os.path.exists(test_file))

    def test_disk_usage(self):
        """Disk usage reports the file count and a non-zero size."""
        self._make_file("test.txt", "test content" * 1000)

        usage = self.file_manager.get_disk_usage()

        self.assertEqual(usage['file_count'], 1)
        self.assertGreater(usage['total_bytes'], 0)
        self.assertGreater(usage['total_mb'], 0)

    def test_cleanup_old_files(self):
        """Files older than the maximum age are removed."""
        test_file = self._make_file("test.txt", "old content")

        usage = self.file_manager.get_disk_usage()
        self.assertEqual(usage['file_count'], 1)

        # Read the real clock BEFORE patching; inside the ``with`` block
        # ``time.time`` is a mock and would not return a float.
        current = time.time()

        # Backdate the file so it is older than the 5-second maximum age.
        # Stamping it with the mocked "now" would give it an age of zero
        # and it would never qualify for deletion.
        os.utime(test_file, (current - 10, current - 10))

        # Also advance the mocked clock.
        # NOTE(review): assumes FileManager derives file age from the
        # modification time and/or ``time.time`` - confirm.
        with patch('time.time', return_value=current + 10):
            result = self.file_manager.cleanup_old_files()

        self.assertEqual(result['deleted'], 1)
        self.assertFalse(os.path.exists(test_file))
| |
|
| |
|
class TestIntegration(unittest.TestCase):
    """Integration tests for session and file managers.

    Uses a fresh temporary directory, a ``SessionManager`` with
    ``ttl_seconds=5`` and a ``FileManager`` with ``max_age_seconds=5``
    for every test.
    """

    def setUp(self):
        """Create the temp directory and both managers."""
        self.temp_dir = tempfile.mkdtemp()
        # Register removal immediately: cleanups run even when a later
        # setUp step raises, whereas tearDown would be skipped in that case
        # and the directory would leak.
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)
        self.session_manager = SessionManager(ttl_seconds=5)
        self.file_manager = FileManager(self.temp_dir, max_age_seconds=5)

    def test_session_with_files(self):
        """A session and its registered file are created and removed together."""
        chain = MagicMock()

        # Create a file on disk to associate with the session.
        test_file = os.path.join(self.temp_dir, "doc.pdf")
        with open(test_file, 'w', encoding='utf-8') as f:
            f.write("pdf content")

        # Create the session and attach the file to it.
        session_id = self.session_manager.create_session("session-1", chain)
        self.file_manager.register_file(session_id, test_file)

        # Both managers should now know about the session.
        self.assertEqual(self.session_manager.get_session_count(), 1)
        files = self.file_manager.get_session_files(session_id)
        self.assertEqual(len(files), 1)

        # Tear the session down in both managers.
        self.session_manager.delete_session(session_id)
        self.file_manager.cleanup_session_files(session_id)

        # Nothing should be left behind, in memory or on disk.
        self.assertEqual(self.session_manager.get_session_count(), 0)
        self.assertFalse(os.path.exists(test_file))

    def test_multiple_sessions(self):
        """Several sessions coexist and can be deleted individually."""
        chains = [MagicMock() for _ in range(3)]

        for i, chain in enumerate(chains):
            self.session_manager.create_session(f"session-{i}", chain)

        self.assertEqual(self.session_manager.get_session_count(), 3)

        all_info = self.session_manager.get_all_sessions_info()
        self.assertEqual(len(all_info), 3)

        # Deleting one session leaves the other two in place.
        self.session_manager.delete_session("session-1")
        self.assertEqual(self.session_manager.get_session_count(), 2)
| |
|
| |
|
# Run the whole suite with per-test output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)
| |
|