Spaces:
Sleeping
Sleeping
| """ | |
| Integration Tests for Token Expiry | |
| End-to-end tests for token expiry behavior including: | |
| - Token expiry timing | |
| - Automatic token refresh flow | |
| - Environment-based configuration | |
| - Cookie vs JSON token handling | |
| """ | |
| import pytest | |
| import time | |
| from datetime import datetime, timedelta | |
| from unittest.mock import patch, MagicMock, AsyncMock | |
| from fastapi.testclient import TestClient | |
| # ============================================================================ | |
| # Token Expiry Integration Tests | |
| # ============================================================================ | |
class TestTokenExpiryIntegration:
    """End-to-end checks of token lifetime handling in the JWT provider."""

    def test_token_expires_after_configured_time(self, monkeypatch):
        """A token verifies right after creation and is rejected once expired.

        The token is created with a ~0.6 second lifetime, verified once,
        and then verified again after waiting past that lifetime; the second
        verification is expected to raise ``TokenExpiredError``.
        """
        from services.auth_service.jwt_provider import JWTService
        from services.auth_service.jwt_provider import TokenExpiredError

        # Very short lifetime so the test can outlive the token.
        service = JWTService(
            secret_key="test-secret",
            access_expiry_minutes=0.01,  # ~0.6 seconds
        )
        token = service.create_access_token("usr_123", "test@example.com")

        # Token should be valid immediately after creation.
        payload = service.verify_token(token)
        assert payload.user_id == "usr_123"

        # Wait past the configured lifetime. Without this pause the expiry
        # assertion below runs while the token is still valid.
        time.sleep(1)

        with pytest.raises(TokenExpiredError):
            service.verify_token(token)

    def test_env_variable_controls_expiry(self, monkeypatch):
        """JWT_ACCESS_EXPIRY_MINUTES env var controls token lifetime.

        Sets the env vars, clears the cached module-level service, and checks
        that a freshly created token expires about 30 minutes from now.
        """
        monkeypatch.setenv("JWT_SECRET", "test-secret")
        monkeypatch.setenv("JWT_ACCESS_EXPIRY_MINUTES", "30")

        import services.auth_service.jwt_provider as jwt_module

        # Reset the singleton through monkeypatch so the previous value is
        # restored on teardown; a plain assignment would leave the service
        # built during this test in place for every later test.
        # raising=False keeps the original "assign even if absent" behaviour.
        monkeypatch.setattr(jwt_module, "_default_service", None, raising=False)

        from services.auth_service.jwt_provider import (
            create_access_token,
            verify_access_token,
        )

        # NOTE(review): naive UTC timestamp; presumably payload.expires_at is
        # naive UTC as well - confirm before switching to aware datetimes.
        before = datetime.utcnow()
        token = create_access_token("usr_123", "test@example.com")
        payload = verify_access_token(token)

        # Expiry should be ~30 minutes from now.
        expected_expiry = before + timedelta(minutes=30)
        time_diff = abs((payload.expires_at - expected_expiry).total_seconds())
        assert time_diff < 5  # Within 5 seconds tolerance

    def test_refresh_token_longer_expiry(self, monkeypatch):
        """Refresh tokens have a much longer lifetime than access tokens.

        Lifetime is measured as ``expires_at - issued_at`` on the verified
        payload of each token.
        """
        from services.auth_service.jwt_provider import JWTService

        service = JWTService(
            secret_key="test-secret",
            access_expiry_minutes=15,
            refresh_expiry_days=7,
        )
        access_token = service.create_access_token("usr_123", "test@example.com")
        refresh_token = service.create_refresh_token("usr_123", "test@example.com")

        access_payload = service.verify_token(access_token)
        refresh_payload = service.verify_token(refresh_token)

        access_lifetime = (
            access_payload.expires_at - access_payload.issued_at
        ).total_seconds()
        refresh_lifetime = (
            refresh_payload.expires_at - refresh_payload.issued_at
        ).total_seconds()

        # Refresh token should have significantly longer lifetime.
        assert refresh_lifetime > access_lifetime * 10
class TestTokenRefreshFlow:
    """Exercise ``POST /auth/refresh`` against a mocked database session."""

    @staticmethod
    def _wire_client(app, router, get_db, user):
        """Return a TestClient for *app* whose DB lookups always yield *user*.

        The ``get_db`` dependency is overridden with an async generator that
        yields an ``AsyncMock`` session; its ``execute`` result answers
        ``scalar_one_or_none()`` with *user*.
        """

        async def fake_get_db():
            session = AsyncMock()
            query_result = MagicMock()
            query_result.scalar_one_or_none.return_value = user
            session.execute.return_value = query_result
            yield session

        app.dependency_overrides[get_db] = fake_get_db
        app.include_router(router)
        return TestClient(app)

    def test_refresh_before_expiry(self):
        """A still-valid refresh token is exchanged for a new token pair."""
        from routers.auth import router
        from fastapi import FastAPI
        from core.database import get_db
        from core.models import User
        from services.auth_service.jwt_provider import create_refresh_token

        valid_refresh = create_refresh_token(
            "usr_123", "test@example.com", token_version=1
        )

        account = MagicMock(spec=User)
        account.user_id = "usr_123"
        account.email = "test@example.com"
        account.token_version = 1

        client = self._wire_client(FastAPI(), router, get_db, account)

        with patch("routers.auth.check_rate_limit", return_value=True):
            response = client.post("/auth/refresh", json={"token": valid_refresh})

        assert response.status_code == 200
        body = response.json()
        # Only presence is checked: tokens minted within the same second may
        # be identical, so comparing old and new values is not reliable.
        assert "access_token" in body
        assert "refresh_token" in body

    def test_refresh_with_expired_access_token(self):
        """The refresh token still works after the access token has expired."""
        from routers.auth import router
        from fastapi import FastAPI
        from core.database import get_db
        from core.models import User
        from services.auth_service.jwt_provider import JWTService

        # Access tokens from this service live for roughly 0.6 seconds.
        short_lived = JWTService(
            secret_key="test-secret",
            access_expiry_minutes=0.01,
        )
        stale_access = short_lived.create_access_token("usr_123", "test@example.com")
        live_refresh = short_lived.create_refresh_token(
            "usr_123", "test@example.com", token_version=1
        )

        # Let the access token run out.
        time.sleep(1)

        from services.auth_service.jwt_provider import TokenExpiredError

        with pytest.raises(TokenExpiredError):
            short_lived.verify_token(stale_access)

        # The refresh token, by contrast, must still be accepted.
        account = MagicMock(spec=User)
        account.user_id = "usr_123"
        account.email = "test@example.com"
        account.token_version = 1

        client = self._wire_client(FastAPI(), router, get_db, account)

        with patch("routers.auth.check_rate_limit", return_value=True):
            response = client.post("/auth/refresh", json={"token": live_refresh})

        assert response.status_code == 200
        # A new access token must be issued.
        assert "access_token" in response.json()
class TestTokenVersioning:
    """Test token versioning for logout/invalidation."""

    def test_logout_invalidates_all_tokens(self):
        """Logout bumps the user's token version so older tokens are refused.

        Flow: issue a refresh token at version 1, call ``/auth/logout`` and
        check the mocked user's ``token_version`` became 2, then confirm
        ``/auth/refresh`` rejects the version-1 token with a 401 whose
        detail mentions "invalidated".
        """
        from routers.auth import router
        from fastapi import FastAPI
        from core.dependencies import get_current_user
        from core.database import get_db
        from core.models import User
        from services.auth_service.jwt_provider import create_refresh_token

        app = FastAPI()

        # User starts at token version 1.
        mock_user = MagicMock(spec=User)
        mock_user.id = 1
        mock_user.user_id = "usr_123"
        mock_user.email = "test@example.com"
        mock_user.token_version = 1

        # Only the refresh token is exercised below, so no access token is
        # created here.
        refresh_token = create_refresh_token(
            "usr_123", "test@example.com", token_version=1
        )

        async def mock_get_db():
            mock_db = AsyncMock()
            mock_result = MagicMock()
            mock_result.scalar_one_or_none.return_value = mock_user
            mock_db.execute.return_value = mock_result
            yield mock_db

        app.dependency_overrides[get_current_user] = lambda: mock_user
        app.dependency_overrides[get_db] = mock_get_db
        app.include_router(router)
        client = TestClient(app)

        # Logout, with audit logging and the backup service patched out.
        with patch(
            "routers.auth.AuditService.log_event", return_value=AsyncMock()
        ), patch("services.backup_service.get_backup_service"):
            response = client.post("/auth/logout")

        assert response.status_code == 200
        # Version should be incremented.
        assert mock_user.token_version == 2

        # Now try to refresh with the old (version 1) token.
        with patch("routers.auth.check_rate_limit", return_value=True):
            response = client.post("/auth/refresh", json={"token": refresh_token})

        # Should fail because the token version is stale.
        assert response.status_code == 401
        assert "invalidated" in response.json()["detail"].lower()
class TestCookieVsJsonTokens:
    """Test cookie vs JSON token delivery."""

    @staticmethod
    def _google_login(label, user_agent):
        """POST ``/auth/google`` as a *label* client and return the response.

        Args:
            label: ``"web"`` or ``"mobile"``. Used both as the patched return
                value of ``detect_client_type`` and to derive the mock user's
                identity (``usr_<label>``, ``<label>@example.com``, ...).
            user_agent: Value sent in the ``User-Agent`` request header.

        Returns:
            The TestClient response, after checking the request succeeded.
        """
        from routers.auth import router
        from fastapi import FastAPI
        from core.database import get_db
        from core.models import User

        email = f"{label}@example.com"
        display_name = f"{label.title()} User"

        mock_user = MagicMock(spec=User)
        mock_user.id = 1
        mock_user.user_id = f"usr_{label}"
        mock_user.email = email
        mock_user.name = display_name
        mock_user.credits = 50
        mock_user.token_version = 1

        mock_google_user = MagicMock()
        mock_google_user.google_id = f"{label}123"
        mock_google_user.email = email
        mock_google_user.name = display_name

        async def mock_get_db():
            mock_db = AsyncMock()
            mock_result = MagicMock()
            mock_result.scalar_one_or_none.return_value = mock_user
            mock_db.execute.return_value = mock_result
            yield mock_db

        app = FastAPI()
        app.dependency_overrides[get_db] = mock_get_db
        app.include_router(router)
        client = TestClient(app)

        with patch("routers.auth.get_google_auth_service") as mock_service, patch(
            "routers.auth.check_rate_limit", return_value=True
        ), patch(
            "routers.auth.AuditService.log_event", return_value=AsyncMock()
        ), patch("services.backup_service.get_backup_service"), patch(
            "routers.auth.detect_client_type", return_value=label
        ):
            mock_service.return_value.verify_token.return_value = mock_google_user
            response = client.post(
                "/auth/google",
                json={"id_token": "fake-token"},
                headers={"User-Agent": user_agent},
            )

        # Fail here, with the response body, if the login itself was rejected;
        # otherwise the cookie/body assertions in the callers fail obscurely.
        assert response.status_code < 400, response.text
        return response

    def test_web_client_uses_cookies(self):
        """Web clients receive the refresh token as a cookie, not in the body."""
        response = self._google_login("web", "Mozilla/5.0")

        # Cookie should be set and non-empty.
        assert "refresh_token" in response.cookies
        cookie_value = response.cookies.get("refresh_token")
        assert cookie_value is not None
        assert len(cookie_value) > 0

        # Body should NOT contain refresh_token.
        data = response.json()
        assert "refresh_token" not in data

    def test_mobile_client_uses_json(self):
        """Mobile clients receive the refresh token in the JSON body."""
        response = self._google_login("mobile", "MyApp/1.0")

        # Body SHOULD contain a non-empty refresh_token.
        data = response.json()
        assert "refresh_token" in data
        assert len(data["refresh_token"]) > 0
class TestProductionVsLocalSettings:
    """Test environment-based cookie settings."""

    @staticmethod
    def _web_login(monkeypatch, environment, label):
        """Log in via ``/auth/google`` as a web client under *environment*.

        Args:
            monkeypatch: pytest's ``monkeypatch`` fixture, used to set the
                ``ENVIRONMENT`` variable for the duration of the test.
            environment: Value assigned to ``ENVIRONMENT``.
            label: Short tag used to derive the mock user's identity
                (``usr_<label>``, ``<label>@example.com``, ...).

        Returns:
            The TestClient response of the login request.
        """
        from routers.auth import router
        from fastapi import FastAPI
        from core.database import get_db
        from core.models import User

        # NOTE(review): set after the router import, as in the original test;
        # assumes the router reads ENVIRONMENT per request - confirm.
        monkeypatch.setenv("ENVIRONMENT", environment)

        email = f"{label}@example.com"
        display_name = f"{label.title()} User"

        mock_user = MagicMock(spec=User)
        mock_user.id = 1
        mock_user.user_id = f"usr_{label}"
        mock_user.email = email
        mock_user.name = display_name
        mock_user.credits = 50
        mock_user.token_version = 1

        mock_google_user = MagicMock()
        mock_google_user.google_id = f"{label}123"
        mock_google_user.email = email
        mock_google_user.name = display_name

        async def mock_get_db():
            mock_db = AsyncMock()
            mock_result = MagicMock()
            mock_result.scalar_one_or_none.return_value = mock_user
            mock_db.execute.return_value = mock_result
            yield mock_db

        app = FastAPI()
        app.dependency_overrides[get_db] = mock_get_db
        app.include_router(router)
        client = TestClient(app)

        with patch("routers.auth.get_google_auth_service") as mock_service, patch(
            "routers.auth.check_rate_limit", return_value=True
        ), patch(
            "routers.auth.AuditService.log_event", return_value=AsyncMock()
        ), patch("services.backup_service.get_backup_service"), patch(
            "routers.auth.detect_client_type", return_value="web"
        ):
            mock_service.return_value.verify_token.return_value = mock_google_user
            return client.post("/auth/google", json={"id_token": "fake-token"})

    @staticmethod
    def _refresh_cookie_attributes(response):
        """Return the lower-cased attributes of the ``refresh_token`` cookie.

        The attributes are read from the raw ``Set-Cookie`` response header
        (for example ``{"httponly", "path=/", "samesite=lax"}``). An empty
        set is returned when no ``refresh_token`` cookie was sent.
        """
        headers = response.headers
        if hasattr(headers, "get_list"):
            # httpx-style headers expose each Set-Cookie line separately.
            raw_cookies = headers.get_list("set-cookie")
        else:
            # Fallback for header mappings that only offer the joined value.
            raw_cookies = [headers.get("set-cookie", "")]

        for raw_cookie in raw_cookies:
            name_value, _, attribute_text = raw_cookie.partition(";")
            if name_value.strip().startswith("refresh_token="):
                return {
                    attribute.strip().lower()
                    for attribute in attribute_text.split(";")
                    if attribute.strip()
                }
        return set()

    def test_production_cookies_secure(self, monkeypatch):
        """Production cookies have secure=True, samesite=none."""
        response = self._web_login(monkeypatch, "production", "prod")

        assert "refresh_token" in response.cookies

        # The cookie jar hides the flags, so verify them on the raw header.
        attributes = self._refresh_cookie_attributes(response)
        assert "secure" in attributes
        assert "samesite=none" in attributes

    def test_local_cookies_not_secure(self, monkeypatch):
        """Local/dev cookies have secure=False, samesite=lax."""
        response = self._web_login(monkeypatch, "development", "local")

        assert "refresh_token" in response.cookies

        # The cookie jar hides the flags, so verify them on the raw header.
        attributes = self._refresh_cookie_attributes(response)
        assert "secure" not in attributes
        assert "samesite=lax" in attributes
if __name__ == "__main__":
    # Running this file directly hands it to pytest in verbose mode.
    pytest.main(
        [
            __file__,
            "-v",
        ]
    )