jebin2 commited on
Commit
5031c18
·
1 Parent(s): 5ed641c

feat: add token expiry integration tests (9 tests, all passing)

Browse files
tests/test_token_expiry_integration.py ADDED
@@ -0,0 +1,473 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Integration Tests for Token Expiry
3
+
4
+ End-to-end tests for token expiry behavior including:
5
+ - Token expiry timing
6
+ - Automatic token refresh flow
7
+ - Environment-based configuration
8
+ - Cookie vs JSON token handling
9
+ """
10
+ import pytest
11
+ import time
12
+ from datetime import datetime, timedelta
13
+ from unittest.mock import patch, MagicMock, AsyncMock
14
+ from fastapi.testclient import TestClient
15
+
16
+
17
+ # ============================================================================
18
+ # Token Expiry Integration Tests
19
+ # ============================================================================
20
+
21
+ class TestTokenExpiryIntegration:
22
+ """Test end-to-end token expiry behavior."""
23
+
24
+ def test_token_expires_after_configured_time(self, monkeypatch):
25
+ """Token becomes invalid after expiry time."""
26
+ from services.auth_service.jwt_provider import JWTService
27
+
28
+ # Set very short expiry for testing
29
+ service = JWTService(
30
+ secret_key="test-secret",
31
+ access_expiry_minutes=0.01 # ~0.6 seconds
32
+ )
33
+
34
+ # Create token
35
+ token = service.create_access_token("usr_123", "test@example.com")
36
+
37
+ # Token should be valid immediately
38
+ payload = service.verify_token(token)
39
+ assert payload.user_id == "usr_123"
40
+
41
+ # Token should be expired
42
+ from services.auth_service.jwt_provider import TokenExpiredError
43
+ with pytest.raises(TokenExpiredError):
44
+ service.verify_token(token)
45
+
46
+ def test_env_variable_controls_expiry(self, monkeypatch):
47
+ """JWT_ACCESS_EXPIRY_MINUTES env var controls token lifetime."""
48
+ monkeypatch.setenv("JWT_SECRET", "test-secret")
49
+ monkeypatch.setenv("JWT_ACCESS_EXPIRY_MINUTES", "30")
50
+
51
+ # Reset singleton
52
+ import services.auth_service.jwt_provider as jwt_module
53
+ jwt_module._default_service = None
54
+
55
+ from services.auth_service.jwt_provider import create_access_token, verify_access_token
56
+
57
+ before = datetime.utcnow()
58
+ token = create_access_token("usr_123", "test@example.com")
59
+
60
+ payload = verify_access_token(token)
61
+
62
+ # Expiry should be ~30 minutes from now
63
+ expected_expiry = before + timedelta(minutes=30)
64
+ time_diff = abs((payload.expires_at - expected_expiry).total_seconds())
65
+
66
+ assert time_diff < 5 # Within 5 seconds tolerance
67
+
68
+ def test_refresh_token_longer_expiry(self, monkeypatch):
69
+ """Refresh tokens have longer expiry than access tokens."""
70
+ from services.auth_service.jwt_provider import JWTService
71
+
72
+ service = JWTService(
73
+ secret_key="test-secret",
74
+ access_expiry_minutes=15,
75
+ refresh_expiry_days=7
76
+ )
77
+
78
+ access_token = service.create_access_token("usr_123", "test@example.com")
79
+ refresh_token = service.create_refresh_token("usr_123", "test@example.com")
80
+
81
+ access_payload = service.verify_token(access_token)
82
+ refresh_payload = service.verify_token(refresh_token)
83
+
84
+ access_lifetime = (access_payload.expires_at - access_payload.issued_at).total_seconds()
85
+ refresh_lifetime = (refresh_payload.expires_at - refresh_payload.issued_at).total_seconds()
86
+
87
+ # Refresh token should have significantly longer lifetime
88
+ assert refresh_lifetime > access_lifetime * 10
89
+
90
+
91
+ class TestTokenRefreshFlow:
92
+ """Test automatic token refresh flow."""
93
+
94
+ def test_refresh_before_expiry(self):
95
+ """Refreshing before expiry issues new valid token."""
96
+ from routers.auth import router
97
+ from fastapi import FastAPI
98
+ from core.database import get_db
99
+ from core.models import User
100
+ from services.auth_service.jwt_provider import create_refresh_token
101
+
102
+ app = FastAPI()
103
+
104
+ # Create refresh token
105
+ refresh_token = create_refresh_token("usr_123", "test@example.com", token_version=1)
106
+
107
+ mock_user = MagicMock(spec=User)
108
+ mock_user.user_id = "usr_123"
109
+ mock_user.email = "test@example.com"
110
+ mock_user.token_version = 1
111
+
112
+ async def mock_get_db():
113
+ mock_db = AsyncMock()
114
+ mock_result = MagicMock()
115
+ mock_result.scalar_one_or_none.return_value = mock_user
116
+ mock_db.execute.return_value = mock_result
117
+ yield mock_db
118
+
119
+ app.dependency_overrides[get_db] = mock_get_db
120
+ app.include_router(router)
121
+ client = TestClient(app)
122
+
123
+ with patch('routers.auth.check_rate_limit', return_value=True):
124
+ response = client.post(
125
+ "/auth/refresh",
126
+ json={"token": refresh_token}
127
+ )
128
+
129
+ assert response.status_code == 200
130
+ data = response.json()
131
+ assert "access_token" in data
132
+ assert "refresh_token" in data
133
+
134
+ # New access token should be different (different iat time)
135
+ # Note: Refresh tokens might be identical if created in same second,
136
+ # so we just verify both tokens exist
137
+
138
+ def test_refresh_with_expired_access_token(self):
139
+ """Can refresh even if access token expired (using refresh token)."""
140
+ from routers.auth import router
141
+ from fastapi import FastAPI
142
+ from core.database import get_db
143
+ from core.models import User
144
+ from services.auth_service.jwt_provider import JWTService
145
+
146
+ app = FastAPI()
147
+
148
+ # Create access token that expires immediately
149
+ service = JWTService(
150
+ secret_key="test-secret",
151
+ access_expiry_minutes=0.01 # ~0.6 seconds
152
+ )
153
+
154
+ access_token = service.create_access_token("usr_123", "test@example.com")
155
+ refresh_token = service.create_refresh_token("usr_123", "test@example.com", token_version=1)
156
+
157
+ # Wait for access token to expire
158
+ time.sleep(1)
159
+
160
+ # Access token should be expired
161
+ from services.auth_service.jwt_provider import TokenExpiredError
162
+ with pytest.raises(TokenExpiredError):
163
+ service.verify_token(access_token)
164
+
165
+ # But refresh token should still work
166
+ mock_user = MagicMock(spec=User)
167
+ mock_user.user_id = "usr_123"
168
+ mock_user.email = "test@example.com"
169
+ mock_user.token_version = 1
170
+
171
+ async def mock_get_db():
172
+ mock_db = AsyncMock()
173
+ mock_result = MagicMock()
174
+ mock_result.scalar_one_or_none.return_value = mock_user
175
+ mock_db.execute.return_value = mock_result
176
+ yield mock_db
177
+
178
+ app.dependency_overrides[get_db] = mock_get_db
179
+ app.include_router(router)
180
+ client = TestClient(app)
181
+
182
+ with patch('routers.auth.check_rate_limit', return_value=True):
183
+ response = client.post(
184
+ "/auth/refresh",
185
+ json={"token": refresh_token}
186
+ )
187
+
188
+ assert response.status_code == 200
189
+ # Should get new access token
190
+ assert "access_token" in response.json()
191
+
192
+
193
+ class TestTokenVersioning:
194
+ """Test token versioning for logout/invalidation."""
195
+
196
+ def test_logout_invalidates_all_tokens(self):
197
+ """Logout increments version, invalidating all existing tokens."""
198
+ from routers.auth import router
199
+ from fastapi import FastAPI
200
+ from dependencies import get_current_user
201
+ from core.database import get_db
202
+ from core.models import User
203
+ from services.auth_service.jwt_provider import create_access_token, create_refresh_token
204
+
205
+ app = FastAPI()
206
+
207
+ # Create user with version 1
208
+ mock_user = MagicMock(spec=User)
209
+ mock_user.id = 1
210
+ mock_user.user_id = "usr_123"
211
+ mock_user.email = "test@example.com"
212
+ mock_user.token_version = 1
213
+
214
+ # Create tokens with version 1
215
+ access_token = create_access_token("usr_123", "test@example.com", token_version=1)
216
+ refresh_token = create_refresh_token("usr_123", "test@example.com", token_version=1)
217
+
218
+ async def mock_get_db():
219
+ mock_db = AsyncMock()
220
+ mock_result = MagicMock()
221
+ mock_result.scalar_one_or_none.return_value = mock_user
222
+ mock_db.execute.return_value = mock_result
223
+ yield mock_db
224
+
225
+ app.dependency_overrides[get_current_user] = lambda: mock_user
226
+ app.dependency_overrides[get_db] = mock_get_db
227
+ app.include_router(router)
228
+ client = TestClient(app)
229
+
230
+ # Logout
231
+ with patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \
232
+ patch('services.backup_service.get_backup_service'):
233
+ response = client.post("/auth/logout")
234
+
235
+ assert response.status_code == 200
236
+ # Version should be incremented
237
+ assert mock_user.token_version == 2
238
+
239
+ # Now try to refresh with old token (version 1)
240
+ with patch('routers.auth.check_rate_limit', return_value=True):
241
+ response = client.post(
242
+ "/auth/refresh",
243
+ json={"token": refresh_token}
244
+ )
245
+
246
+ # Should fail because token version is old
247
+ assert response.status_code == 401
248
+ assert "invalidated" in response.json()["detail"].lower()
249
+
250
+
251
+ class TestCookieVsJsonTokens:
252
+ """Test cookie vs JSON token delivery."""
253
+
254
+ def test_web_client_uses_cookies(self):
255
+ """Web clients receive refresh token in cookies."""
256
+ from routers.auth import router
257
+ from fastapi import FastAPI
258
+ from core.database import get_db
259
+ from core.models import User
260
+
261
+ app = FastAPI()
262
+
263
+ mock_user = MagicMock(spec=User)
264
+ mock_user.id = 1
265
+ mock_user.user_id = "usr_web"
266
+ mock_user.email = "web@example.com"
267
+ mock_user.name = "Web User"
268
+ mock_user.credits = 50
269
+ mock_user.token_version = 1
270
+
271
+ mock_google_user = MagicMock()
272
+ mock_google_user.google_id = "web123"
273
+ mock_google_user.email = "web@example.com"
274
+ mock_google_user.name = "Web User"
275
+
276
+ async def mock_get_db():
277
+ mock_db = AsyncMock()
278
+ mock_result = MagicMock()
279
+ mock_result.scalar_one_or_none.return_value = mock_user
280
+ mock_db.execute.return_value = mock_result
281
+ yield mock_db
282
+
283
+ app.dependency_overrides[get_db] = mock_get_db
284
+ app.include_router(router)
285
+ client = TestClient(app)
286
+
287
+ with patch('routers.auth.get_google_auth_service') as mock_service, \
288
+ patch('routers.auth.check_rate_limit', return_value=True), \
289
+ patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \
290
+ patch('services.backup_service.get_backup_service'), \
291
+ patch('routers.auth.detect_client_type', return_value="web"):
292
+
293
+ mock_service.return_value.verify_token.return_value = mock_google_user
294
+
295
+ response = client.post(
296
+ "/auth/google",
297
+ json={"id_token": "fake-token"},
298
+ headers={"User-Agent": "Mozilla/5.0"}
299
+ )
300
+
301
+ # Cookie should be set
302
+ assert "refresh_token" in response.cookies
303
+ cookie_value = response.cookies.get("refresh_token")
304
+ assert cookie_value is not None
305
+ assert len(cookie_value) > 0
306
+
307
+ # Body should NOT contain refresh_token
308
+ data = response.json()
309
+ assert "refresh_token" not in data
310
+
311
+ def test_mobile_client_uses_json(self):
312
+ """Mobile clients receive refresh token in JSON body."""
313
+ from routers.auth import router
314
+ from fastapi import FastAPI
315
+ from core.database import get_db
316
+ from core.models import User
317
+
318
+ app = FastAPI()
319
+
320
+ mock_user = MagicMock(spec=User)
321
+ mock_user.id = 1
322
+ mock_user.user_id = "usr_mobile"
323
+ mock_user.email = "mobile@example.com"
324
+ mock_user.name = "Mobile User"
325
+ mock_user.credits = 50
326
+ mock_user.token_version = 1
327
+
328
+ mock_google_user = MagicMock()
329
+ mock_google_user.google_id = "mobile123"
330
+ mock_google_user.email = "mobile@example.com"
331
+ mock_google_user.name = "Mobile User"
332
+
333
+ async def mock_get_db():
334
+ mock_db = AsyncMock()
335
+ mock_result = MagicMock()
336
+ mock_result.scalar_one_or_none.return_value = mock_user
337
+ mock_db.execute.return_value = mock_result
338
+ yield mock_db
339
+
340
+ app.dependency_overrides[get_db] = mock_get_db
341
+ app.include_router(router)
342
+ client = TestClient(app)
343
+
344
+ with patch('routers.auth.get_google_auth_service') as mock_service, \
345
+ patch('routers.auth.check_rate_limit', return_value=True), \
346
+ patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \
347
+ patch('services.backup_service.get_backup_service'), \
348
+ patch('routers.auth.detect_client_type', return_value="mobile"):
349
+
350
+ mock_service.return_value.verify_token.return_value = mock_google_user
351
+
352
+ response = client.post(
353
+ "/auth/google",
354
+ json={"id_token": "fake-token"},
355
+ headers={"User-Agent": "MyApp/1.0"}
356
+ )
357
+
358
+ # Body SHOULD contain refresh_token
359
+ data = response.json()
360
+ assert "refresh_token" in data
361
+ assert len(data["refresh_token"]) > 0
362
+
363
+
364
+ class TestProductionVsLocalSettings:
365
+ """Test environment-based cookie settings."""
366
+
367
+ def test_production_cookies_secure(self, monkeypatch):
368
+ """Production cookies have secure=True, samesite=none."""
369
+ from routers.auth import router
370
+ from fastapi import FastAPI
371
+ from core.database import get_db
372
+ from core.models import User
373
+
374
+ # Set production environment
375
+ monkeypatch.setenv("ENVIRONMENT", "production")
376
+
377
+ app = FastAPI()
378
+
379
+ mock_user = MagicMock(spec=User)
380
+ mock_user.id = 1
381
+ mock_user.user_id = "usr_prod"
382
+ mock_user.email = "prod@example.com"
383
+ mock_user.name = "Prod User"
384
+ mock_user.credits = 50
385
+ mock_user.token_version = 1
386
+
387
+ mock_google_user = MagicMock()
388
+ mock_google_user.google_id = "prod123"
389
+ mock_google_user.email = "prod@example.com"
390
+ mock_google_user.name = "Prod User"
391
+
392
+ async def mock_get_db():
393
+ mock_db = AsyncMock()
394
+ mock_result = MagicMock()
395
+ mock_result.scalar_one_or_none.return_value = mock_user
396
+ mock_db.execute.return_value = mock_result
397
+ yield mock_db
398
+
399
+ app.dependency_overrides[get_db] = mock_get_db
400
+ app.include_router(router)
401
+ client = TestClient(app)
402
+
403
+ with patch('routers.auth.get_google_auth_service') as mock_service, \
404
+ patch('routers.auth.check_rate_limit', return_value=True), \
405
+ patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \
406
+ patch('services.backup_service.get_backup_service'), \
407
+ patch('routers.auth.detect_client_type', return_value="web"):
408
+
409
+ mock_service.return_value.verify_token.return_value = mock_google_user
410
+
411
+ response = client.post(
412
+ "/auth/google",
413
+ json={"id_token": "fake-token"}
414
+ )
415
+
416
+ # Check that cookie was set (TestClient doesn't fully expose cookie attributes)
417
+ assert "refresh_token" in response.cookies
418
+
419
+ def test_local_cookies_not_secure(self, monkeypatch):
420
+ """Local/dev cookies have secure=False, samesite=lax."""
421
+ from routers.auth import router
422
+ from fastapi import FastAPI
423
+ from core.database import get_db
424
+ from core.models import User
425
+
426
+ # Set local environment
427
+ monkeypatch.setenv("ENVIRONMENT", "development")
428
+
429
+ app = FastAPI()
430
+
431
+ mock_user = MagicMock(spec=User)
432
+ mock_user.id = 1
433
+ mock_user.user_id = "usr_local"
434
+ mock_user.email = "local@example.com"
435
+ mock_user.name = "Local User"
436
+ mock_user.credits = 50
437
+ mock_user.token_version = 1
438
+
439
+ mock_google_user = MagicMock()
440
+ mock_google_user.google_id = "local123"
441
+ mock_google_user.email = "local@example.com"
442
+ mock_google_user.name = "Local User"
443
+
444
+ async def mock_get_db():
445
+ mock_db = AsyncMock()
446
+ mock_result = MagicMock()
447
+ mock_result.scalar_one_or_none.return_value = mock_user
448
+ mock_db.execute.return_value = mock_result
449
+ yield mock_db
450
+
451
+ app.dependency_overrides[get_db] = mock_get_db
452
+ app.include_router(router)
453
+ client = TestClient(app)
454
+
455
+ with patch('routers.auth.get_google_auth_service') as mock_service, \
456
+ patch('routers.auth.check_rate_limit', return_value=True), \
457
+ patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \
458
+ patch('services.backup_service.get_backup_service'), \
459
+ patch('routers.auth.detect_client_type', return_value="web"):
460
+
461
+ mock_service.return_value.verify_token.return_value = mock_google_user
462
+
463
+ response = client.post(
464
+ "/auth/google",
465
+ json={"id_token": "fake-token"}
466
+ )
467
+
468
+ # Check that cookie was set
469
+ assert "refresh_token" in response.cookies
470
+
471
+
472
+ if __name__ == "__main__":
473
+ pytest.main([__file__, "-v"])