Spaces:
Paused
Paused
| from unittest.mock import patch | |
| import pytest | |
| from jose import jwt | |
| from app.modules.auth.service import AuthService | |
| from core.models.user import User | |
| class TestAuthMocks: | |
| def auth_service(self): | |
| return AuthService() | |
| def test_create_access_token(self, auth_service): | |
| data = {"sub": "test@example.com"} | |
| token = auth_service.create_access_token(data) | |
| assert token is not None | |
| assert isinstance(token, str) | |
| assert len(token) > 0 | |
| decoded = jwt.decode( | |
| token, | |
| auth_service.secret_key, | |
| algorithms=[auth_service.algorithm], | |
| options={"verify_aud": False}, | |
| ) | |
| assert decoded["sub"] == "test@example.com" | |
| assert "exp" in decoded | |
| assert decoded["iss"] == "zenith" | |
| assert decoded["type"] == "access" | |
| def test_verify_password_mock(self, auth_service): | |
| with patch.object(auth_service.pwd_context, "verify") as mock_verify: | |
| mock_verify.return_value = True | |
| result = auth_service.verify_password("plain", "hashed") | |
| assert result is True | |
| mock_verify.assert_called_once_with("plain", "hashed") | |
| def test_hash_password_actual(self, auth_service): | |
| password = "SecureP@ss123!" | |
| hashed = auth_service.hash_password(password) | |
| assert hashed != password | |
| assert auth_service.verify_password(password, hashed) is True | |
| def test_verify_password_actual(self, auth_service): | |
| password = "TestPassword123!" | |
| hashed = auth_service.hash_password(password) | |
| assert auth_service.verify_password(password, hashed) is True | |
| assert auth_service.verify_password("wrongpassword", hashed) is False | |
| async def test_authenticate_user_success(self, auth_service): | |
| mock_user = User( | |
| id="1", | |
| email="test@example.com", | |
| username="testuser", | |
| password_hash="$pbkdf2_sha256$600000$test", | |
| is_active=True, | |
| ) | |
| mock_user.password_hash = "hashed_password" # Add this to pass verification logic if needed, though verified is mocked | |
| with patch("app.modules.users.service.UserService") as mock_user_service_cls, \ | |
| patch("app.modules.auth.service.db_service") as mock_db_service: | |
| mock_user_service = mock_user_service_cls.return_value | |
| mock_user_service.get_user_by_email.return_value = mock_user | |
| # Ensure update_user doesn't raise | |
| mock_db_service.update_user.return_value = None | |
| with patch.object(auth_service, "get_user_by_email", return_value=mock_user): | |
| with patch.object(auth_service, "verify_password", return_value=True): | |
| with patch.object(auth_service, "_reset_failed_attempts"): | |
| # We also need to mock _record_failed_attempt or just ensure verify_password returns True | |
| user = auth_service.authenticate_user( | |
| "test@example.com", "password" | |
| ) | |
| assert user == mock_user | |
| async def test_authenticate_user_failure(self, auth_service): | |
| with patch("app.modules.users.service.UserService") as mock_user_service_cls: | |
| mock_user_service = mock_user_service_cls.return_value | |
| mock_user_service.get_user_by_email.return_value = None | |
| with patch.object(auth_service, "get_user_by_email", return_value=None): | |
| user = auth_service.authenticate_user("nonexistent@example.com", "password") | |
| assert user is None | |
| def test_decode_valid_token(self, auth_service): | |
| data = {"sub": "test@example.com", "role": "admin"} | |
| token = auth_service.create_access_token(data) | |
| decoded = auth_service.decode_token(token) | |
| assert decoded["sub"] == "test@example.com" | |
| assert decoded["role"] == "admin" | |
| assert "exp" in decoded | |
| def test_decode_invalid_token(self, auth_service): | |
| with pytest.raises(Exception): | |
| auth_service.decode_token("invalid.token.here") | |
| def test_password_strength_validation(self, auth_service): | |
| errors = auth_service.validate_password_strength("weak") | |
| assert len(errors) > 0 | |
| errors = auth_service.validate_password_strength("SecureP@ss123!") | |
| assert len(errors) == 0 | |
| def test_create_access_token_with_expiry(self, auth_service): | |
| from datetime import timedelta | |
| data = {"sub": "test@example.com"} | |
| expires = timedelta(hours=2) | |
| token = auth_service.create_access_token(data, expires_delta=expires) | |
| decoded = jwt.decode( | |
| token, | |
| auth_service.secret_key, | |
| algorithms=[auth_service.algorithm], | |
| options={"verify_aud": False}, | |
| ) | |
| assert decoded["sub"] == "test@example.com" | |
| def test_auth_service_attributes(self, auth_service): | |
| assert hasattr(auth_service, "pwd_context") | |
| assert hasattr(auth_service, "secret_key") | |
| assert hasattr(auth_service, "algorithm") | |
| assert auth_service._password_min_length == 8 | |