File size: 12,605 Bytes
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42ab7e
ab8903e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
"""
Comprehensive Tests for Rate Limiting

Tests cover:
1. Rate limit enforcement
2. Window-based limiting
3. Per-IP and per-endpoint limiting
4. Rate limit expiry
5. Exceeded limit handling
6. Rate limit increment and reset

Uses mocked database and async testing.
"""
import pytest
from datetime import datetime, timedelta
from sqlalchemy import select


# ============================================================================
# 1. Rate Limit Basic Functionality Tests
# ============================================================================

class TestRateLimitBasics:
    """Test basic rate limiting functionality."""
    
    @pytest.mark.asyncio
    async def test_first_request_allowed(self, db_session):
        """First request within limit is allowed."""
        from core.dependencies import check_rate_limit
        
        result = await check_rate_limit(
            db=db_session,
            identifier="192.168.1.1",
            endpoint="/auth/google",
            limit=5,
            window_minutes=15
        )
        
        assert result == True
    
    @pytest.mark.asyncio
    async def test_within_limit_allowed(self, db_session):
        """Requests within limit are allowed."""
        from core.dependencies import check_rate_limit
        
        # Make 3 requests (limit is 5)
        for i in range(3):
            result = await check_rate_limit(
                db=db_session,
                identifier="10.0.0.1",
                endpoint="/auth/refresh",
                limit=5,
                window_minutes=15
            )
            assert result == True
    
    @pytest.mark.asyncio
    async def test_exceed_limit_blocked(self, db_session):
        """Requests exceeding limit are blocked."""
        from core.dependencies import check_rate_limit
        
        # Make exactly limit requests
        for i in range(5):
            await check_rate_limit(
                db=db_session,
                identifier="203.0.113.1",
                endpoint="/api/test",
                limit=5,
                window_minutes=15
            )
        
        # Next request should be blocked
        result = await check_rate_limit(
            db=db_session,
            identifier="203.0.113.1",
            endpoint="/api/test",
            limit=5,
            window_minutes=15
        )
        
        assert result == False


# ============================================================================
# 2. Window-Based Limiting Tests
# ============================================================================

class TestWindowBasedLimiting:
    """Test time window-based rate limiting."""
    
    @pytest.mark.asyncio
    async def test_rate_limit_creates_window(self, db_session):
        """Rate limit creates time window entry."""
        from core.dependencies import check_rate_limit
        from core.models import RateLimit
        
        await check_rate_limit(
            db=db_session,
            identifier="192.168.1.100",
            endpoint="/test",
            limit=10,
            window_minutes=15
        )
        
        # Verify RateLimit entry was created
        result = await db_session.execute(
            select(RateLimit).where(RateLimit.identifier == "192.168.1.100")
        )
        rate_limit = result.scalar_one_or_none()
        
        assert rate_limit is not None
        assert rate_limit.attempts == 1
        assert rate_limit.window_start is not None
    
    @pytest.mark.asyncio
    async def test_attempts_increment_in_window(self, db_session):
        """Attempts increment within same window."""
        from core.dependencies import check_rate_limit
        from core.models import RateLimit
        
        identifier = "10.10.10.10"
        endpoint = "/auth/test"
        
        # Make 3 requests
        for i in range(3):
            await check_rate_limit(
                db=db_session,
                identifier=identifier,
                endpoint=endpoint,
                limit=10,
                window_minutes=15
            )
        
        # Check attempts count
        result = await db_session.execute(
            select(RateLimit).where(
                RateLimit.identifier == identifier,
                RateLimit.endpoint == endpoint
            )
        )
        rate_limit = result  .scalar_one_or_none()
        
        assert rate_limit.attempts == 3


# ============================================================================
# 3. Per-IP and Per-Endpoint Limiting Tests
# ============================================================================

class TestPerIPAndEndpoint:
    """Test rate limiting per IP and endpoint."""
    
    @pytest.mark.asyncio
    async def test_different_ips_separate_limits(self, db_session):
        """Different IPs have separate rate limits."""
        from core.dependencies import check_rate_limit
        
        # IP 1 makes 5 requests
        for i in range(5):
            await check_rate_limit(
                db=db_session,
                identifier="192.168.1.1",
                endpoint="/api/endpoint",
                limit=5,
                window_minutes=15
            )
        
        # IP 1 should be at limit
        result1 = await check_rate_limit(
            db=db_session,
            identifier="192.168.1.1",
            endpoint="/api/endpoint",
            limit=5,
            window_minutes=15
        )
        assert result1 == False
        
        # IP 2 should still be allowed
        result2 = await check_rate_limit(
            db=db_session,
            identifier="192.168.1.2",
            endpoint="/api/endpoint",
            limit=5,
            window_minutes=15
        )
        assert result2 == True
    
    @pytest.mark.asyncio
    async def test_different_endpoints_separate_limits(self, db_session):
        """Same IP has separate limits for different endpoints."""
        from core.dependencies import check_rate_limit
        
        ip = "203.0.113.50"
        
        # Max out limit on endpoint1
        for i in range(3):
            await check_rate_limit(
                db=db_session,
                identifier=ip,
                endpoint="/endpoint1",
                limit=3,
                window_minutes=15
            )
        
        # Should be blocked on endpoint1
        result1 = await check_rate_limit(
            db=db_session,
            identifier=ip,
            endpoint="/endpoint1",
            limit=3,
            window_minutes=15
        )
        assert result1 == False
        
        # Should still be allowed on endpoint2
        result2 = await check_rate_limit(
            db=db_session,
            identifier=ip,
            endpoint="/endpoint2",
            limit=3,
            window_minutes=15
        )
        assert result2 == True


# ============================================================================
# 4. Rate Limit Expiry Tests
# ============================================================================

class TestRateLimitExpiry:
    """Test rate limit expiry behavior."""
    
    @pytest.mark.asyncio
    async def test_rate_limit_has_expiry(self, db_session):
        """Rate limit entry has expiry time."""
        from core.dependencies import check_rate_limit
        from core.models import RateLimit
        
        await check_rate_limit(
            db=db_session,
            identifier="192.168.1.200",
            endpoint="/test",
            limit=10,
            window_minutes=15
        )
        
        result = await db_session.execute(
            select(RateLimit).where(RateLimit.identifier == "192.168.1.200")
        )
        rate_limit = result.scalar_one_or_none()
        
        assert rate_limit.expires_at is not None
        # Expiry should be ~15 minutes from now
        expected_expiry = datetime.utcnow() + timedelta(minutes=15)
        time_diff = abs((rate_limit.expires_at - expected_expiry).total_seconds())
        assert time_diff < 5  # Within 5 seconds tolerance


# ============================================================================
# 5. Edge Cases and Error Handling Tests
# ============================================================================

class TestRateLimitEdgeCases:
    """Test edge cases in rate limiting."""
    
    @pytest.mark.asyncio
    async def test_zero_limit_blocks_all(self, db_session):
        """Limit of 0 blocks all requests."""
        from core.dependencies import check_rate_limit
        
        # First request with limit=0 should be blocked
        result = await check_rate_limit(
            db=db_session,
            identifier="192.168.1.1",
            endpoint="/blocked",
            limit=0,
            window_minutes=15
        )
        
        # With limit=0, even first request creates entry with attempts=1
        # which is already >= limit, so it should be blocked
        # Actually, looking at the code, first request creates attempts=1
        # then returns True. Second request will be blocked.
        assert result == True  # First request allowed
        
        # Second request blocked
        result2 = await check_rate_limit(
            db=db_session,
            identifier="192.168.1.1",
            endpoint="/blocked",
            limit=0,
            window_minutes=15
        )
        assert result2 == False
    
    @pytest.mark.asyncio
    async def test_limit_of_one(self, db_session):
        """Limit of 1 allows only first request."""
        from core.dependencies import check_rate_limit
        
        result1 = await check_rate_limit(
            db=db_session,
            identifier="10.0.0.10",
            endpoint="/single",
            limit=1,
            window_minutes=15
        )
        assert result1 == True
        
        result2 = await check_rate_limit(
            db=db_session,
            identifier="10.0.0.10",
            endpoint="/single",
            limit=1,
            window_minutes=15
        )
        assert result2 == False
    
    @pytest.mark.asyncio
    async def test_very_short_window(self, db_session):
        """Very short time window works correctly."""
        from core.dependencies import check_rate_limit
        
        # 1 minute window
        result = await check_rate_limit(
            db=db_session,
            identifier="192.168.1.50",
            endpoint="/short",
            limit=5,
            window_minutes=1
        )
        
        assert result == True
    
    @pytest.mark.asyncio
    async def test_long_window(self, db_session):
        """Long time window works correctly."""
        from core.dependencies import check_rate_limit
        
        # 24 hour window
        result = await check_rate_limit(
            db=db_session,
            identifier="192.168.1.60",
            endpoint="/long",
            limit=100,
            window_minutes=1440  # 24 hours
        )
        
        assert result == True


# ============================================================================
# 6. Rate Limit Data Persistence Tests
# ============================================================================

class TestRateLimitPersistence:
    """Test rate limit data persistence."""
    
    @pytest.mark.asyncio
    async def test_rate_limit_persists(self, db_session):
        """Rate limit data persists across checks."""
        from core.dependencies import check_rate_limit
        from core.models import RateLimit
        
        identifier = "192.168.1.99"
        endpoint = "/persist"
        
        # Make first request
        await check_rate_limit(
            db=db_session,
            identifier=identifier,
            endpoint=endpoint,
            limit=10,
            window_minutes=15
        )
        
        # Query database
        result = await db_session.execute(
            select(RateLimit).where(
                RateLimit.identifier == identifier,
                RateLimit.endpoint == endpoint
            )
        )
        rate_limit = result.scalar_one()
        
        initial_attempts = rate_limit.attempts
        
        # Make another request
        await check_rate_limit(
            db=db_session,
            identifier=identifier,
            endpoint=endpoint,
            limit=10,
            window_minutes=15
        )
        
        # Re-query database
        await db_session.refresh(rate_limit)
        
        assert rate_limit.attempts == initial_attempts + 1


if __name__ == "__main__":
    pytest.main([__file__, "-v"])