File size: 5,335 Bytes
dbe78dd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
"""Simplified unit tests for JWT authentication components."""
import pytest
import base64
import json
from unittest.mock import Mock
from fastapi import HTTPException
# Direct import of JWT validation service
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from infrastructure.services.jwt_validation_service import JWTValidationService
class TestJWTAuthenticationIntegration:
    """Integration tests for JWT authentication components.

    Every test works against a hand-built, three-segment token whose
    signature segment is the literal placeholder ``test_signature``. Only
    ``JWTValidationService.validate_structure`` and
    ``JWTValidationService.extract_claims`` are exercised.
    """

    # Scheme prefix expected at the start of an Authorization header value.
    # Kept in one place so the slice offset is never a bare magic number.
    BEARER_PREFIX = "Bearer "

    @staticmethod
    def _b64url_json(data):
        """Return *data* serialised as JSON and base64url-encoded, padding stripped."""
        encoded = base64.urlsafe_b64encode(json.dumps(data).encode())
        return encoded.decode().rstrip("=")

    def setup_method(self):
        """Set up test fixtures.

        Creates ``self.jwt_service`` and builds ``self.valid_token`` plus the
        matching ``self.valid_auth_header`` used by the tests below.
        """
        self.jwt_service = JWTValidationService()

        # Create a valid JWT token for testing:
        # <header>.<payload>.<placeholder signature>
        header = {"alg": "HS256", "typ": "JWT"}
        payload = {"sub": "1234567890", "name": "John Doe", "iat": 1516239022}
        self.valid_token = ".".join(
            (
                self._b64url_json(header),
                self._b64url_json(payload),
                "test_signature",
            )
        )
        self.valid_auth_header = f"{self.BEARER_PREFIX}{self.valid_token}"

    def test_jwt_validation_service_integration(self):
        """Test the JWT validation service works correctly."""
        # Test valid token
        assert self.jwt_service.validate_structure(self.valid_token) is True

        # Test invalid tokens
        assert self.jwt_service.validate_structure("invalid.token") is False
        assert self.jwt_service.validate_structure("") is False
        assert self.jwt_service.validate_structure(None) is False

        # Test claims extraction
        claims = self.jwt_service.extract_claims(self.valid_token)
        assert claims is not None
        assert claims["sub"] == "1234567890"
        assert claims["name"] == "John Doe"

    def test_bearer_token_validation_logic(self):
        """Test the bearer token validation logic manually."""
        # Local import: only the nested helper's annotation needs it.
        from typing import Optional

        prefix = self.BEARER_PREFIX

        def validate_bearer_token_logic(authorization: Optional[str]) -> str:
            """Simulate the bearer token validation logic.

            Args:
                authorization: Raw Authorization header value, or ``None``
                    when the header is absent.

            Returns:
                The token that follows the ``"Bearer "`` prefix.

            Raises:
                ValueError: If the header is missing/empty, does not start
                    with the bearer prefix, carries no token, or the token
                    fails ``validate_structure``.
            """
            if not authorization:
                raise ValueError("Missing Authorization header")
            if not authorization.startswith(prefix):
                raise ValueError("Invalid Authorization header format")

            token = authorization[len(prefix):]  # Remove "Bearer " prefix
            if not token:
                raise ValueError("Empty bearer token")

            # Validate JWT structure
            jwt_service = JWTValidationService()
            if not jwt_service.validate_structure(token):
                raise ValueError("Invalid JWT token structure")
            return token

        # Test successful validation
        result = validate_bearer_token_logic(self.valid_auth_header)
        assert result == self.valid_token

        # Test various failure cases: (header value, expected error message).
        # The empty string is included because the helper's first guard
        # rejects any falsy header, not just ``None``.
        failure_cases = [
            (None, "Missing Authorization header"),
            ("", "Missing Authorization header"),
            ("Basic dXNlcjpwYXNz", "Invalid Authorization header format"),
            ("Bearer ", "Empty bearer token"),
            ("Bearer invalid.token", "Invalid JWT token structure"),
        ]
        for header_value, expected_message in failure_cases:
            with pytest.raises(ValueError, match=expected_message):
                validate_bearer_token_logic(header_value)

    def test_end_to_end_authentication_flow(self):
        """Test the complete authentication flow."""
        # 1. Client sends request with valid token
        auth_header = self.valid_auth_header

        # 2. Extract token from header
        assert auth_header.startswith(self.BEARER_PREFIX)
        token = auth_header[len(self.BEARER_PREFIX):]

        # 3. Validate token structure
        assert self.jwt_service.validate_structure(token) is True

        # 4. Extract claims (optional)
        claims = self.jwt_service.extract_claims(token)
        assert claims is not None
        assert "sub" in claims

        # This represents a successful authentication flow
        print(f"✅ Authentication successful for user: {claims['sub']}")

    def test_configuration_flags(self):
        """Test that authentication can be enabled/disabled via configuration.

        The flags below are plain locals that simulate configuration; nothing
        is read from the application's real settings in this test.
        """
        # Simulate configuration flags
        enforce_authentication = True
        enable_external_job_ids = True
        jwt_validation_strict = False

        # When authentication is enforced
        if enforce_authentication:
            # Token validation should be required
            assert self.jwt_service.validate_structure(self.valid_token) is True

        # When external job IDs are enabled
        if enable_external_job_ids:
            # Should accept external job ID parameter
            external_job_id = "test-job-123"
            assert external_job_id

        # JWT validation strictness
        if not jwt_validation_strict:
            # Only structure validation, no signature verification
            assert self.jwt_service.validate_structure(self.valid_token) is True

        print("✅ Configuration flags working correctly")