apigateway / tests /test_token_expiry_integration.py
jebin2's picture
ref
a42ab7e
"""
Integration Tests for Token Expiry
End-to-end tests for token expiry behavior including:
- Token expiry timing
- Automatic token refresh flow
- Environment-based configuration
- Cookie vs JSON token handling
"""
import pytest
import time
from datetime import datetime, timedelta
from unittest.mock import patch, MagicMock, AsyncMock
from fastapi.testclient import TestClient
# ============================================================================
# Token Expiry Integration Tests
# ============================================================================
class TestTokenExpiryIntegration:
    """Test end-to-end token expiry behavior."""

    def test_token_expires_after_configured_time(self, monkeypatch):
        """Token verifies right after creation and is rejected once its lifetime has elapsed."""
        from services.auth_service.jwt_provider import JWTService, TokenExpiredError

        # Set very short expiry for testing
        service = JWTService(
            secret_key="test-secret",
            access_expiry_minutes=0.01,  # ~0.6 seconds
        )

        # Create token
        token = service.create_access_token("usr_123", "test@example.com")

        # Token should be valid immediately
        payload = service.verify_token(token)
        assert payload.user_id == "usr_123"

        # Wait past the ~0.6 s lifetime; without this pause the token is
        # still valid and the expiry assertion below cannot succeed.
        time.sleep(1)

        # Token should be expired
        with pytest.raises(TokenExpiredError):
            service.verify_token(token)

    def test_env_variable_controls_expiry(self, monkeypatch):
        """JWT_ACCESS_EXPIRY_MINUTES env var controls token lifetime."""
        monkeypatch.setenv("JWT_SECRET", "test-secret")
        monkeypatch.setenv("JWT_ACCESS_EXPIRY_MINUTES", "30")

        # Reset the module-level singleton through monkeypatch so that the
        # previous value is restored at teardown instead of leaking a service
        # built from this test's environment into later tests.
        import services.auth_service.jwt_provider as jwt_module
        monkeypatch.setattr(jwt_module, "_default_service", None, raising=False)

        from services.auth_service.jwt_provider import create_access_token, verify_access_token

        before = datetime.utcnow()
        token = create_access_token("usr_123", "test@example.com")
        payload = verify_access_token(token)

        # Expiry should be ~30 minutes from now
        expected_expiry = before + timedelta(minutes=30)
        time_diff = abs((payload.expires_at - expected_expiry).total_seconds())
        assert time_diff < 5  # Within 5 seconds tolerance

    def test_refresh_token_longer_expiry(self, monkeypatch):
        """Refresh tokens have longer expiry than access tokens."""
        from services.auth_service.jwt_provider import JWTService

        service = JWTService(
            secret_key="test-secret",
            access_expiry_minutes=15,
            refresh_expiry_days=7,
        )
        access_token = service.create_access_token("usr_123", "test@example.com")
        refresh_token = service.create_refresh_token("usr_123", "test@example.com")

        access_payload = service.verify_token(access_token)
        refresh_payload = service.verify_token(refresh_token)

        # Lifetime = expiry minus issue time, in seconds.
        access_lifetime = (access_payload.expires_at - access_payload.issued_at).total_seconds()
        refresh_lifetime = (refresh_payload.expires_at - refresh_payload.issued_at).total_seconds()

        # Refresh token should have significantly longer lifetime
        assert refresh_lifetime > access_lifetime * 10
class TestTokenRefreshFlow:
    """Exercise ``POST /auth/refresh`` against a mocked database session."""

    @staticmethod
    def _known_user(user_model):
        """Return a mock spec'd on *user_model* for ``usr_123`` at token version 1."""
        account = MagicMock(spec=user_model)
        account.user_id = "usr_123"
        account.email = "test@example.com"
        account.token_version = 1
        return account

    @staticmethod
    def _client_returning(account, router, get_db):
        """Build a TestClient whose ``get_db`` override always resolves to *account*."""
        from fastapi import FastAPI

        async def fake_get_db():
            # Each request gets a fresh session whose execute() result
            # hands back the supplied account.
            lookup = MagicMock()
            lookup.scalar_one_or_none.return_value = account
            session = AsyncMock()
            session.execute.return_value = lookup
            yield session

        application = FastAPI()
        application.dependency_overrides[get_db] = fake_get_db
        application.include_router(router)
        return TestClient(application)

    def test_refresh_before_expiry(self):
        """A still-valid refresh token is exchanged for a new token pair."""
        from routers.auth import router
        from core.database import get_db
        from core.models import User
        from services.auth_service.jwt_provider import create_refresh_token

        submitted = create_refresh_token("usr_123", "test@example.com", token_version=1)
        client = self._client_returning(self._known_user(User), router, get_db)

        with patch('routers.auth.check_rate_limit', return_value=True):
            response = client.post("/auth/refresh", json={"token": submitted})

        assert response.status_code == 200
        body = response.json()
        # Only the presence of both tokens is checked; the returned values
        # are not compared with the submitted token.
        assert "access_token" in body
        assert "refresh_token" in body

    def test_refresh_with_expired_access_token(self):
        """The refresh endpoint still answers 200 after the access token has lapsed."""
        from routers.auth import router
        from core.database import get_db
        from core.models import User
        from services.auth_service.jwt_provider import JWTService, TokenExpiredError

        # Access tokens from this service live for roughly 0.6 seconds.
        short_lived = JWTService(
            secret_key="test-secret",
            access_expiry_minutes=0.01,
        )
        lapsed_access = short_lived.create_access_token("usr_123", "test@example.com")
        submitted = short_lived.create_refresh_token(
            "usr_123", "test@example.com", token_version=1
        )

        # Let the access token run out, then confirm it is rejected.
        time.sleep(1)
        with pytest.raises(TokenExpiredError):
            short_lived.verify_token(lapsed_access)

        # The refresh token is expected to be accepted regardless.
        client = self._client_returning(self._known_user(User), router, get_db)
        with patch('routers.auth.check_rate_limit', return_value=True):
            response = client.post("/auth/refresh", json={"token": submitted})

        assert response.status_code == 200
        assert "access_token" in response.json()
class TestTokenVersioning:
    """Check that logout bumps ``token_version`` and older refresh tokens are refused."""

    def test_logout_invalidates_all_tokens(self):
        """After ``/auth/logout`` a version-1 refresh token is rejected with 401."""
        from routers.auth import router
        from fastapi import FastAPI
        from core.dependencies import get_current_user
        from core.database import get_db
        from core.models import User
        from services.auth_service.jwt_provider import create_access_token, create_refresh_token

        # Mocked account that starts at token version 1.
        account = MagicMock(spec=User)
        account.id = 1
        account.user_id = "usr_123"
        account.email = "test@example.com"
        account.token_version = 1

        # Both tokens are minted at version 1; only the refresh token is
        # replayed further down.
        stale_access = create_access_token("usr_123", "test@example.com", token_version=1)
        stale_refresh = create_refresh_token("usr_123", "test@example.com", token_version=1)

        async def fake_get_db():
            lookup = MagicMock()
            lookup.scalar_one_or_none.return_value = account
            session = AsyncMock()
            session.execute.return_value = lookup
            yield session

        application = FastAPI()
        application.dependency_overrides.update(
            {
                get_current_user: lambda: account,
                get_db: fake_get_db,
            }
        )
        application.include_router(router)
        client = TestClient(application)

        # Step 1: log out with audit logging and the backup service patched.
        with patch('routers.auth.AuditService.log_event', return_value=AsyncMock()):
            with patch('services.backup_service.get_backup_service'):
                logout_response = client.post("/auth/logout")

        assert logout_response.status_code == 200
        # The endpoint is expected to have incremented the version on the mock.
        assert account.token_version == 2

        # Step 2: replay the refresh token issued at version 1.
        with patch('routers.auth.check_rate_limit', return_value=True):
            refresh_response = client.post("/auth/refresh", json={"token": stale_refresh})

        # The outdated version must be refused.
        assert refresh_response.status_code == 401
        assert "invalidated" in refresh_response.json()["detail"].lower()
class TestCookieVsJsonTokens:
    """Check where ``/auth/google`` puts the refresh token for each client type."""

    @staticmethod
    def _google_login(router, get_db, account, identity, client_type, user_agent):
        """POST a fake Google ID token and return the response.

        The database dependency resolves to *account*, the Google auth service
        yields *identity*, and ``detect_client_type`` is forced to *client_type*.
        """
        from fastapi import FastAPI

        async def fake_get_db():
            lookup = MagicMock()
            lookup.scalar_one_or_none.return_value = account
            session = AsyncMock()
            session.execute.return_value = lookup
            yield session

        application = FastAPI()
        application.dependency_overrides[get_db] = fake_get_db
        application.include_router(router)
        client = TestClient(application)

        with patch('routers.auth.get_google_auth_service') as auth_service_factory, \
                patch('routers.auth.check_rate_limit', return_value=True), \
                patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \
                patch('services.backup_service.get_backup_service'), \
                patch('routers.auth.detect_client_type', return_value=client_type):
            auth_service_factory.return_value.verify_token.return_value = identity
            return client.post(
                "/auth/google",
                json={"id_token": "fake-token"},
                headers={"User-Agent": user_agent},
            )

    def test_web_client_uses_cookies(self):
        """A web client gets the refresh token as a cookie and not in the JSON body."""
        from routers.auth import router
        from core.database import get_db
        from core.models import User

        account = MagicMock(
            spec=User,
            id=1,
            user_id="usr_web",
            email="web@example.com",
            credits=50,
            token_version=1,
        )
        # ``name`` is a reserved Mock constructor argument, so assign it afterwards.
        account.name = "Web User"

        identity = MagicMock(google_id="web123", email="web@example.com")
        identity.name = "Web User"

        response = self._google_login(
            router, get_db, account, identity, "web", "Mozilla/5.0"
        )

        # The cookie must exist and carry a non-empty value.
        assert "refresh_token" in response.cookies
        stored_cookie = response.cookies.get("refresh_token")
        assert stored_cookie is not None
        assert len(stored_cookie) > 0

        # The JSON body must not repeat the refresh token.
        body = response.json()
        assert "refresh_token" not in body

    def test_mobile_client_uses_json(self):
        """A mobile client gets a non-empty refresh token in the JSON body."""
        from routers.auth import router
        from core.database import get_db
        from core.models import User

        account = MagicMock(
            spec=User,
            id=1,
            user_id="usr_mobile",
            email="mobile@example.com",
            credits=50,
            token_version=1,
        )
        # ``name`` is a reserved Mock constructor argument, so assign it afterwards.
        account.name = "Mobile User"

        identity = MagicMock(google_id="mobile123", email="mobile@example.com")
        identity.name = "Mobile User"

        response = self._google_login(
            router, get_db, account, identity, "mobile", "MyApp/1.0"
        )

        body = response.json()
        assert "refresh_token" in body
        assert len(body["refresh_token"]) > 0
class TestProductionVsLocalSettings:
    """Run the web Google login under different ``ENVIRONMENT`` values.

    NOTE(review): both tests only assert that the ``refresh_token`` cookie is
    present; the Secure / SameSite attributes are not inspected here.
    """

    @staticmethod
    def _web_google_login(router, get_db, account, identity):
        """POST a fake Google ID token as a ``web`` client and return the response."""
        from fastapi import FastAPI

        async def fake_get_db():
            lookup = MagicMock()
            lookup.scalar_one_or_none.return_value = account
            session = AsyncMock()
            session.execute.return_value = lookup
            yield session

        application = FastAPI()
        application.dependency_overrides[get_db] = fake_get_db
        application.include_router(router)
        client = TestClient(application)

        with patch('routers.auth.get_google_auth_service') as auth_service_factory, \
                patch('routers.auth.check_rate_limit', return_value=True), \
                patch('routers.auth.AuditService.log_event', return_value=AsyncMock()), \
                patch('services.backup_service.get_backup_service'), \
                patch('routers.auth.detect_client_type', return_value="web"):
            auth_service_factory.return_value.verify_token.return_value = identity
            return client.post("/auth/google", json={"id_token": "fake-token"})

    def test_production_cookies_secure(self, monkeypatch):
        """With ENVIRONMENT=production the web login sets the refresh_token cookie."""
        from routers.auth import router
        from core.database import get_db
        from core.models import User

        # Switch to production only after the project modules are imported.
        monkeypatch.setenv("ENVIRONMENT", "production")

        account = MagicMock(
            spec=User,
            id=1,
            user_id="usr_prod",
            email="prod@example.com",
            credits=50,
            token_version=1,
        )
        # ``name`` is a reserved Mock constructor argument, so assign it afterwards.
        account.name = "Prod User"

        identity = MagicMock(google_id="prod123", email="prod@example.com")
        identity.name = "Prod User"

        response = self._web_google_login(router, get_db, account, identity)

        # Presence check only (cookie attributes are not examined).
        assert "refresh_token" in response.cookies

    def test_local_cookies_not_secure(self, monkeypatch):
        """With ENVIRONMENT=development the web login sets the refresh_token cookie."""
        from routers.auth import router
        from core.database import get_db
        from core.models import User

        # Switch to development only after the project modules are imported.
        monkeypatch.setenv("ENVIRONMENT", "development")

        account = MagicMock(
            spec=User,
            id=1,
            user_id="usr_local",
            email="local@example.com",
            credits=50,
            token_version=1,
        )
        # ``name`` is a reserved Mock constructor argument, so assign it afterwards.
        account.name = "Local User"

        identity = MagicMock(google_id="local123", email="local@example.com")
        identity.name = "Local User"

        response = self._web_google_login(router, get_db, account, identity)

        # Presence check only (cookie attributes are not examined).
        assert "refresh_token" in response.cookies
if __name__ == "__main__":
    # Run this file's tests verbosely and propagate pytest's exit status, so
    # invoking the file directly exits non-zero when any test fails.
    raise SystemExit(pytest.main([__file__, "-v"]))