"""Test the memory leak fixes""" import unittest import time import tempfile import os from unittest.mock import patch, MagicMock from utils.session_manager import SessionManager from utils.file_manager import FileManager class TestSessionManager(unittest.TestCase): """Test SessionManager cleanup functionality""" def setUp(self): """Create a session manager for testing""" self.session_manager = SessionManager(ttl_seconds=5) # 5 second TTL for testing def test_create_session(self): """Test session creation""" chain = MagicMock() session_id = self.session_manager.create_session("test-1", chain) self.assertEqual(session_id, "test-1") self.assertEqual(self.session_manager.get_session_count(), 1) def test_get_session(self): """Test session retrieval""" chain = MagicMock() self.session_manager.create_session("test-1", chain) retrieved = self.session_manager.get_session("test-1") self.assertEqual(retrieved, chain) def test_session_access_count(self): """Test access count increments""" chain = MagicMock() self.session_manager.create_session("test-1", chain) for _ in range(3): self.session_manager.get_session("test-1") info = self.session_manager.get_session_info("test-1") self.assertEqual(info['access_count'], 3) def test_delete_session(self): """Test manual session deletion""" chain = MagicMock() self.session_manager.create_session("test-1", chain) self.assertEqual(self.session_manager.get_session_count(), 1) deleted = self.session_manager.delete_session("test-1") self.assertTrue(deleted) self.assertEqual(self.session_manager.get_session_count(), 0) def test_session_expiration(self): """Test automatic session expiration""" chain = MagicMock() self.session_manager.create_session("test-1", chain) # Verify session exists self.assertEqual(self.session_manager.get_session_count(), 1) # Simulate time passing beyond TTL with patch('time.time') as mock_time: # Set time to now + 10 seconds (beyond 5 second TTL) current = time.time() mock_time.return_value = current + 10 # Manually run cleanup self.session_manager._cleanup_expired_sessions() # Session should be gone self.assertEqual(self.session_manager.get_session_count(), 0) def test_get_session_not_found(self): """Test getting non-existent session""" result = self.session_manager.get_session("nonexistent") self.assertIsNone(result) def test_memory_stats(self): """Test memory statistics""" chain = MagicMock() self.session_manager.create_session("test-1", chain) self.session_manager.create_session("test-2", chain) stats = self.session_manager.get_memory_stats() self.assertEqual(stats['total_sessions'], 2) self.assertEqual(stats['ttl_seconds'], 5) self.assertGreater(stats['total_accesses'], 0) class TestFileManager(unittest.TestCase): """Test FileManager cleanup functionality""" def setUp(self): """Create a temporary directory for testing""" self.temp_dir = tempfile.mkdtemp() self.file_manager = FileManager(self.temp_dir, max_age_seconds=5) def tearDown(self): """Clean up temporary directory""" import shutil if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) def test_register_file(self): """Test file registration""" # Create a test file test_file = os.path.join(self.temp_dir, "test.txt") with open(test_file, 'w') as f: f.write("test content") # Register it self.file_manager.register_file("session-1", test_file) files = self.file_manager.get_session_files("session-1") self.assertEqual(len(files), 1) self.assertEqual(files[0]['path'], test_file) def test_cleanup_session_files(self): """Test session file cleanup""" # Create test files test_file = os.path.join(self.temp_dir, "test.txt") with open(test_file, 'w') as f: f.write("test content") self.file_manager.register_file("session-1", test_file) # Verify file exists self.assertTrue(os.path.exists(test_file)) # Clean up count = self.file_manager.cleanup_session_files("session-1") self.assertEqual(count, 1) self.assertFalse(os.path.exists(test_file)) def test_disk_usage(self): """Test disk usage calculation""" # Create test file test_file = os.path.join(self.temp_dir, "test.txt") with open(test_file, 'w') as f: f.write("test content" * 1000) usage = self.file_manager.get_disk_usage() self.assertEqual(usage['file_count'], 1) self.assertGreater(usage['total_bytes'], 0) self.assertGreater(usage['total_mb'], 0) def test_cleanup_old_files(self): """Test cleanup of old files""" # Create a test file test_file = os.path.join(self.temp_dir, "test.txt") with open(test_file, 'w') as f: f.write("old content") # Verify initial state usage = self.file_manager.get_disk_usage() self.assertEqual(usage['file_count'], 1) # Simulate file aging with patch('time.time') as mock_time: current = time.time() # Set time to now + 10 seconds (beyond 5 second max_age) mock_time.return_value = current + 10 # Update file mtime os.utime(test_file, (current + 10, current + 10)) # Manually run cleanup result = self.file_manager.cleanup_old_files() self.assertEqual(result['deleted'], 1) self.assertFalse(os.path.exists(test_file)) class TestIntegration(unittest.TestCase): """Integration tests for session and file managers""" def setUp(self): """Setup for integration tests""" self.temp_dir = tempfile.mkdtemp() self.session_manager = SessionManager(ttl_seconds=5) self.file_manager = FileManager(self.temp_dir, max_age_seconds=5) def tearDown(self): """Cleanup""" import shutil if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) def test_session_with_files(self): """Test session creation with associated files""" chain = MagicMock() # Create test file test_file = os.path.join(self.temp_dir, "doc.pdf") with open(test_file, 'w') as f: f.write("pdf content") # Create session session_id = self.session_manager.create_session("session-1", chain) self.file_manager.register_file(session_id, test_file) # Verify both exist self.assertEqual(self.session_manager.get_session_count(), 1) files = self.file_manager.get_session_files(session_id) self.assertEqual(len(files), 1) # Clean up session self.session_manager.delete_session(session_id) self.file_manager.cleanup_session_files(session_id) # Verify cleanup self.assertEqual(self.session_manager.get_session_count(), 0) self.assertFalse(os.path.exists(test_file)) def test_multiple_sessions(self): """Test managing multiple sessions""" chains = [MagicMock() for _ in range(3)] # Create multiple sessions for i, chain in enumerate(chains): self.session_manager.create_session(f"session-{i}", chain) self.assertEqual(self.session_manager.get_session_count(), 3) # Get all session info all_info = self.session_manager.get_all_sessions_info() self.assertEqual(len(all_info), 3) # Delete one session self.session_manager.delete_session("session-1") self.assertEqual(self.session_manager.get_session_count(), 2) if __name__ == '__main__': unittest.main(verbosity=2)