""" Integration Tests for Token Expiry End-to-end tests for token expiry behavior including: - Token expiry timing - Automatic token refresh flow - Environment-based configuration - Cookie vs JSON token handling """ import pytest import time from datetime import datetime, timedelta from unittest.mock import patch, MagicMock, AsyncMock from fastapi.testclient import TestClient # ============================================================================ # Token Expiry Integration Tests # ============================================================================ class TestTokenExpiryIntegration: """Test end-to-end token expiry behavior.""" def test_token_expires_after_configured_time(self, monkeypatch): """Token becomes invalid after expiry time.""" from services.auth_service.jwt_provider import JWTService # Set very short expiry for testing service = JWTService( secret_key="test-secret", access_expiry_minutes=0.01 # ~0.6 seconds ) # Create token token = service.create_access_token("usr_123", "test@example.com") # Token should be valid immediately payload = service.verify_token(token) assert payload.user_id == "usr_123" # Token should be expired from services.auth_service.jwt_provider import TokenExpiredError with pytest.raises(TokenExpiredError): service.verify_token(token) def test_env_variable_controls_expiry(self, monkeypatch): """JWT_ACCESS_EXPIRY_MINUTES env var controls token lifetime.""" monkeypatch.setenv("JWT_SECRET", "test-secret") monkeypatch.setenv("JWT_ACCESS_EXPIRY_MINUTES", "30") # Reset singleton import services.auth_service.jwt_provider as jwt_module jwt_module._default_service = None from services.auth_service.jwt_provider import create_access_token, verify_access_token before = datetime.utcnow() token = create_access_token("usr_123", "test@example.com") payload = verify_access_token(token) # Expiry should be ~30 minutes from now expected_expiry = before + timedelta(minutes=30) time_diff = abs((payload.expires_at - expected_expiry).total_seconds()) assert time_diff < 5 # Within 5 seconds tolerance def test_refresh_token_longer_expiry(self, monkeypatch): """Refresh tokens have longer expiry than access tokens.""" from services.auth_service.jwt_provider import JWTService service = JWTService( secret_key="test-secret", access_expiry_minutes=15, refresh_expiry_days=7 ) access_token = service.create_access_token("usr_123", "test@example.com") refresh_token = service.create_refresh_token("usr_123", "test@example.com") access_payload = service.verify_token(access_token) refresh_payload = service.verify_token(refresh_token) access_lifetime = (access_payload.expires_at - access_payload.issued_at).total_seconds() refresh_lifetime = (refresh_payload.expires_at - refresh_payload.issued_at).total_seconds() # Refresh token should have significantly longer lifetime assert refresh_lifetime > access_lifetime * 10 class TestTokenRefreshFlow: """Test automatic token refresh flow.""" def test_refresh_before_expiry(self): """Refreshing before expiry issues new valid token.""" from routers.auth import router from fastapi import FastAPI from core.database import get_db from core.models import User from services.auth_service.jwt_provider import create_refresh_token app = FastAPI() # Create refresh token refresh_token = create_refresh_token("usr_123", "test@example.com", token_version=1) mock_user = MagicMock(spec=User) mock_user.user_id = "usr_123" mock_user.email = "test@example.com" mock_user.token_version = 1 async def mock_get_db(): mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_user mock_db.execute.return_value = mock_result yield mock_db app.dependency_overrides[get_db] = mock_get_db app.include_router(router) client = TestClient(app) with patch('routers.auth.check_rate_limit', return_value=True): response = client.post( "/auth/refresh", json={"token": refresh_token} ) assert response.status_code == 200 data = response.json() assert "access_token" in data assert "refresh_token" in data # New access token should be different (different iat time) # Note: Refresh tokens might be identical if created in same second, # so we just verify both tokens exist def test_refresh_with_expired_access_token(self): """Can refresh even if access token expired (using refresh token).""" from routers.auth import router from fastapi import FastAPI from core.database import get_db from core.models import User from services.auth_service.jwt_provider import JWTService app = FastAPI() # Create access token that expires immediately service = JWTService( secret_key="test-secret", access_expiry_minutes=0.01 # ~0.6 seconds ) access_token = service.create_access_token("usr_123", "test@example.com") refresh_token = service.create_refresh_token("usr_123", "test@example.com", token_version=1) # Wait for access token to expire time.sleep(1) # Access token should be expired from services.auth_service.jwt_provider import TokenExpiredError with pytest.raises(TokenExpiredError): service.verify_token(access_token) # But refresh token should still work mock_user = MagicMock(spec=User) mock_user.user_id = "usr_123" mock_user.email = "test@example.com" mock_user.token_version = 1 async def mock_get_db(): mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_user mock_db.execute.return_value = mock_result yield mock_db app.dependency_overrides[get_db] = mock_get_db app.include_router(router) client = TestClient(app) with patch('routers.auth.check_rate_limit', return_value=True): response = client.post( "/auth/refresh", json={"token": refresh_token} ) assert response.status_code == 200 # Should get new access token assert "access_token" in response.json() class TestTokenVersioning: """Test token versioning for logout/invalidation.""" def test_logout_invalidates_all_tokens(self): """Logout increments version, invalidating all existing tokens.""" from routers.auth import router from fastapi import FastAPI from core.dependencies import get_current_user from core.database import get_db from core.models import User from services.auth_service.jwt_provider import create_access_token, create_refresh_token app = FastAPI() # Create user with version 1 mock_user = MagicMock(spec=User) mock_user.id = 1 mock_user.user_id = "usr_123" mock_user.email = "test@example.com" mock_user.token_version = 1 # Create tokens with version 1 access_token = create_access_token("usr_123", "test@example.com", token_version=1) refresh_token = create_refresh_token("usr_123", "test@example.com", token_version=1) async def mock_get_db(): mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_user mock_db.execute.return_value = mock_result yield mock_db app.dependency_overrides[get_current_user] = lambda: mock_user app.dependency_overrides[get_db] = mock_get_db app.include_router(router) client = TestClient(app) # Logout with patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \ patch('services.backup_service.get_backup_service'): response = client.post("/auth/logout") assert response.status_code == 200 # Version should be incremented assert mock_user.token_version == 2 # Now try to refresh with old token (version 1) with patch('routers.auth.check_rate_limit', return_value=True): response = client.post( "/auth/refresh", json={"token": refresh_token} ) # Should fail because token version is old assert response.status_code == 401 assert "invalidated" in response.json()["detail"].lower() class TestCookieVsJsonTokens: """Test cookie vs JSON token delivery.""" def test_web_client_uses_cookies(self): """Web clients receive refresh token in cookies.""" from routers.auth import router from fastapi import FastAPI from core.database import get_db from core.models import User app = FastAPI() mock_user = MagicMock(spec=User) mock_user.id = 1 mock_user.user_id = "usr_web" mock_user.email = "web@example.com" mock_user.name = "Web User" mock_user.credits = 50 mock_user.token_version = 1 mock_google_user = MagicMock() mock_google_user.google_id = "web123" mock_google_user.email = "web@example.com" mock_google_user.name = "Web User" async def mock_get_db(): mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_user mock_db.execute.return_value = mock_result yield mock_db app.dependency_overrides[get_db] = mock_get_db app.include_router(router) client = TestClient(app) with patch('routers.auth.get_google_auth_service') as mock_service, \ patch('routers.auth.check_rate_limit', return_value=True), \ patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \ patch('services.backup_service.get_backup_service'), \ patch('routers.auth.detect_client_type', return_value="web"): mock_service.return_value.verify_token.return_value = mock_google_user response = client.post( "/auth/google", json={"id_token": "fake-token"}, headers={"User-Agent": "Mozilla/5.0"} ) # Cookie should be set assert "refresh_token" in response.cookies cookie_value = response.cookies.get("refresh_token") assert cookie_value is not None assert len(cookie_value) > 0 # Body should NOT contain refresh_token data = response.json() assert "refresh_token" not in data def test_mobile_client_uses_json(self): """Mobile clients receive refresh token in JSON body.""" from routers.auth import router from fastapi import FastAPI from core.database import get_db from core.models import User app = FastAPI() mock_user = MagicMock(spec=User) mock_user.id = 1 mock_user.user_id = "usr_mobile" mock_user.email = "mobile@example.com" mock_user.name = "Mobile User" mock_user.credits = 50 mock_user.token_version = 1 mock_google_user = MagicMock() mock_google_user.google_id = "mobile123" mock_google_user.email = "mobile@example.com" mock_google_user.name = "Mobile User" async def mock_get_db(): mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_user mock_db.execute.return_value = mock_result yield mock_db app.dependency_overrides[get_db] = mock_get_db app.include_router(router) client = TestClient(app) with patch('routers.auth.get_google_auth_service') as mock_service, \ patch('routers.auth.check_rate_limit', return_value=True), \ patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \ patch('services.backup_service.get_backup_service'), \ patch('routers.auth.detect_client_type', return_value="mobile"): mock_service.return_value.verify_token.return_value = mock_google_user response = client.post( "/auth/google", json={"id_token": "fake-token"}, headers={"User-Agent": "MyApp/1.0"} ) # Body SHOULD contain refresh_token data = response.json() assert "refresh_token" in data assert len(data["refresh_token"]) > 0 class TestProductionVsLocalSettings: """Test environment-based cookie settings.""" def test_production_cookies_secure(self, monkeypatch): """Production cookies have secure=True, samesite=none.""" from routers.auth import router from fastapi import FastAPI from core.database import get_db from core.models import User # Set production environment monkeypatch.setenv("ENVIRONMENT", "production") app = FastAPI() mock_user = MagicMock(spec=User) mock_user.id = 1 mock_user.user_id = "usr_prod" mock_user.email = "prod@example.com" mock_user.name = "Prod User" mock_user.credits = 50 mock_user.token_version = 1 mock_google_user = MagicMock() mock_google_user.google_id = "prod123" mock_google_user.email = "prod@example.com" mock_google_user.name = "Prod User" async def mock_get_db(): mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_user mock_db.execute.return_value = mock_result yield mock_db app.dependency_overrides[get_db] = mock_get_db app.include_router(router) client = TestClient(app) with patch('routers.auth.get_google_auth_service') as mock_service, \ patch('routers.auth.check_rate_limit', return_value=True), \ patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \ patch('services.backup_service.get_backup_service'), \ patch('routers.auth.detect_client_type', return_value="web"): mock_service.return_value.verify_token.return_value = mock_google_user response = client.post( "/auth/google", json={"id_token": "fake-token"} ) # Check that cookie was set (TestClient doesn't fully expose cookie attributes) assert "refresh_token" in response.cookies def test_local_cookies_not_secure(self, monkeypatch): """Local/dev cookies have secure=False, samesite=lax.""" from routers.auth import router from fastapi import FastAPI from core.database import get_db from core.models import User # Set local environment monkeypatch.setenv("ENVIRONMENT", "development") app = FastAPI() mock_user = MagicMock(spec=User) mock_user.id = 1 mock_user.user_id = "usr_local" mock_user.email = "local@example.com" mock_user.name = "Local User" mock_user.credits = 50 mock_user.token_version = 1 mock_google_user = MagicMock() mock_google_user.google_id = "local123" mock_google_user.email = "local@example.com" mock_google_user.name = "Local User" async def mock_get_db(): mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_user mock_db.execute.return_value = mock_result yield mock_db app.dependency_overrides[get_db] = mock_get_db app.include_router(router) client = TestClient(app) with patch('routers.auth.get_google_auth_service') as mock_service, \ patch('routers.auth.check_rate_limit', return_value=True), \ patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \ patch('services.backup_service.get_backup_service'), \ patch('routers.auth.detect_client_type', return_value="web"): mock_service.return_value.verify_token.return_value = mock_google_user response = client.post( "/auth/google", json={"id_token": "fake-token"} ) # Check that cookie was set assert "refresh_token" in response.cookies if __name__ == "__main__": pytest.main([__file__, "-v"])