File size: 8,527 Bytes
ae279de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
"""Test the memory leak fixes"""

import unittest
import time
import tempfile
import os
from unittest.mock import patch, MagicMock

from utils.session_manager import SessionManager
from utils.file_manager import FileManager


class TestSessionManager(unittest.TestCase):
    """Test SessionManager cleanup functionality"""
    
    def setUp(self):
        """Create a session manager for testing"""
        self.session_manager = SessionManager(ttl_seconds=5)  # 5 second TTL for testing
    
    def test_create_session(self):
        """Test session creation"""
        chain = MagicMock()
        session_id = self.session_manager.create_session("test-1", chain)
        
        self.assertEqual(session_id, "test-1")
        self.assertEqual(self.session_manager.get_session_count(), 1)
    
    def test_get_session(self):
        """Test session retrieval"""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)
        
        retrieved = self.session_manager.get_session("test-1")
        self.assertEqual(retrieved, chain)
    
    def test_session_access_count(self):
        """Test access count increments"""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)
        
        for _ in range(3):
            self.session_manager.get_session("test-1")
        
        info = self.session_manager.get_session_info("test-1")
        self.assertEqual(info['access_count'], 3)
    
    def test_delete_session(self):
        """Test manual session deletion"""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)
        self.assertEqual(self.session_manager.get_session_count(), 1)
        
        deleted = self.session_manager.delete_session("test-1")
        self.assertTrue(deleted)
        self.assertEqual(self.session_manager.get_session_count(), 0)
    
    def test_session_expiration(self):
        """Test automatic session expiration"""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)
        
        # Verify session exists
        self.assertEqual(self.session_manager.get_session_count(), 1)
        
        # Simulate time passing beyond TTL
        with patch('time.time') as mock_time:
            # Set time to now + 10 seconds (beyond 5 second TTL)
            current = time.time()
            mock_time.return_value = current + 10
            
            # Manually run cleanup
            self.session_manager._cleanup_expired_sessions()
        
        # Session should be gone
        self.assertEqual(self.session_manager.get_session_count(), 0)
    
    def test_get_session_not_found(self):
        """Test getting non-existent session"""
        result = self.session_manager.get_session("nonexistent")
        self.assertIsNone(result)
    
    def test_memory_stats(self):
        """Test memory statistics"""
        chain = MagicMock()
        self.session_manager.create_session("test-1", chain)
        self.session_manager.create_session("test-2", chain)
        
        stats = self.session_manager.get_memory_stats()
        
        self.assertEqual(stats['total_sessions'], 2)
        self.assertEqual(stats['ttl_seconds'], 5)
        self.assertGreater(stats['total_accesses'], 0)


class TestFileManager(unittest.TestCase):
    """Test FileManager cleanup functionality"""
    
    def setUp(self):
        """Create a temporary directory for testing"""
        self.temp_dir = tempfile.mkdtemp()
        self.file_manager = FileManager(self.temp_dir, max_age_seconds=5)
    
    def tearDown(self):
        """Clean up temporary directory"""
        import shutil
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
    
    def test_register_file(self):
        """Test file registration"""
        # Create a test file
        test_file = os.path.join(self.temp_dir, "test.txt")
        with open(test_file, 'w') as f:
            f.write("test content")
        
        # Register it
        self.file_manager.register_file("session-1", test_file)
        
        files = self.file_manager.get_session_files("session-1")
        self.assertEqual(len(files), 1)
        self.assertEqual(files[0]['path'], test_file)
    
    def test_cleanup_session_files(self):
        """Test session file cleanup"""
        # Create test files
        test_file = os.path.join(self.temp_dir, "test.txt")
        with open(test_file, 'w') as f:
            f.write("test content")
        
        self.file_manager.register_file("session-1", test_file)
        
        # Verify file exists
        self.assertTrue(os.path.exists(test_file))
        
        # Clean up
        count = self.file_manager.cleanup_session_files("session-1")
        
        self.assertEqual(count, 1)
        self.assertFalse(os.path.exists(test_file))
    
    def test_disk_usage(self):
        """Test disk usage calculation"""
        # Create test file
        test_file = os.path.join(self.temp_dir, "test.txt")
        with open(test_file, 'w') as f:
            f.write("test content" * 1000)
        
        usage = self.file_manager.get_disk_usage()
        
        self.assertEqual(usage['file_count'], 1)
        self.assertGreater(usage['total_bytes'], 0)
        self.assertGreater(usage['total_mb'], 0)
    
    def test_cleanup_old_files(self):
        """Test cleanup of old files"""
        # Create a test file
        test_file = os.path.join(self.temp_dir, "test.txt")
        with open(test_file, 'w') as f:
            f.write("old content")
        
        # Verify initial state
        usage = self.file_manager.get_disk_usage()
        self.assertEqual(usage['file_count'], 1)
        
        # Simulate file aging
        with patch('time.time') as mock_time:
            current = time.time()
            # Set time to now + 10 seconds (beyond 5 second max_age)
            mock_time.return_value = current + 10
            
            # Update file mtime
            os.utime(test_file, (current + 10, current + 10))
            
            # Manually run cleanup
            result = self.file_manager.cleanup_old_files()
        
        self.assertEqual(result['deleted'], 1)
        self.assertFalse(os.path.exists(test_file))


class TestIntegration(unittest.TestCase):
    """Integration tests for session and file managers"""
    
    def setUp(self):
        """Setup for integration tests"""
        self.temp_dir = tempfile.mkdtemp()
        self.session_manager = SessionManager(ttl_seconds=5)
        self.file_manager = FileManager(self.temp_dir, max_age_seconds=5)
    
    def tearDown(self):
        """Cleanup"""
        import shutil
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
    
    def test_session_with_files(self):
        """Test session creation with associated files"""
        chain = MagicMock()
        
        # Create test file
        test_file = os.path.join(self.temp_dir, "doc.pdf")
        with open(test_file, 'w') as f:
            f.write("pdf content")
        
        # Create session
        session_id = self.session_manager.create_session("session-1", chain)
        self.file_manager.register_file(session_id, test_file)
        
        # Verify both exist
        self.assertEqual(self.session_manager.get_session_count(), 1)
        files = self.file_manager.get_session_files(session_id)
        self.assertEqual(len(files), 1)
        
        # Clean up session
        self.session_manager.delete_session(session_id)
        self.file_manager.cleanup_session_files(session_id)
        
        # Verify cleanup
        self.assertEqual(self.session_manager.get_session_count(), 0)
        self.assertFalse(os.path.exists(test_file))
    
    def test_multiple_sessions(self):
        """Test managing multiple sessions"""
        chains = [MagicMock() for _ in range(3)]
        
        # Create multiple sessions
        for i, chain in enumerate(chains):
            self.session_manager.create_session(f"session-{i}", chain)
        
        self.assertEqual(self.session_manager.get_session_count(), 3)
        
        # Get all session info
        all_info = self.session_manager.get_all_sessions_info()
        self.assertEqual(len(all_info), 3)
        
        # Delete one session
        self.session_manager.delete_session("session-1")
        self.assertEqual(self.session_manager.get_session_count(), 2)


if __name__ == '__main__':
    unittest.main(verbosity=2)