Spaces:
Sleeping
Sleeping
File size: 7,988 Bytes
1bd7131 050d8f8 1bd7131 050d8f8 1bd7131 050d8f8 1bd7131 050d8f8 34f76dc 050d8f8 1bd7131 050d8f8 1bd7131 050d8f8 1bd7131 050d8f8 1bd7131 050d8f8 1bd7131 34f76dc 1bd7131 34f76dc 1bd7131 050d8f8 1bd7131 34f76dc 1bd7131 050d8f8 1bd7131 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
"""
Integration Tests for Google OAuth Authentication
Tests the new Google Sign-In flow, JWT token handling, and API access.
"""
import pytest
from unittest.mock import patch, MagicMock
import os
from sqlalchemy import text
from services.google_auth_service import GoogleUserInfo
from services.jwt_service import JWTService
# Cleanup fixture
@pytest.fixture(autouse=True)
def cleanup_db():
    """Autouse fixture wrapped around every test; currently performs no cleanup.

    The presence of ``./test_blink_data.db`` is checked before the test runs,
    but no action is taken on the result, and nothing runs after the ``yield``.
    """
    db_file_present = os.path.exists("./test_blink_data.db")
    if db_file_present:
        # Placeholder branch: the database file is left in place.
        pass
    yield
@pytest.fixture(autouse=True)
async def clear_tables(db_session):
    """Delete all rows from the application tables before each test.

    Args:
        db_session: Database session fixture defined outside this file
            (presumably a SQLAlchemy ``AsyncSession`` -- confirm).

    The deletes run inside ``db_session.begin()``. Assuming a SQLAlchemy
    session, that context manager commits on a clean exit and rolls back on
    error, so no separate ``commit()`` call is needed afterwards.
    """
    # Order is kept exactly as before in case foreign keys depend on it.
    delete_statements = (
        "DELETE FROM users",
        "DELETE FROM client_users",
        "DELETE FROM rate_limits",
        "DELETE FROM audit_logs",
    )
    async with db_session.begin():
        for statement in delete_statements:
            await db_session.execute(text(statement))
@pytest.fixture
def jwt_service():
    """Provide a ``JWTService`` built with a fixed, test-only secret key."""
    test_secret = "test-secret-key-for-testing-only"
    service = JWTService(secret_key=test_secret)
    return service
@pytest.fixture
def mock_google_user():
    """Provide a ``GoogleUserInfo`` for a test account with a verified email."""
    profile = {
        "google_id": "google_123456789",
        "email": "test@example.com",
        "email_verified": True,
        "name": "Test User",
        "picture": "https://example.com/photo.jpg",
    }
    return GoogleUserInfo(**profile)
class TestGoogleAuth:
    """Test Google OAuth authentication flow.

    Every test patches ``routers.auth.get_google_auth_service`` so that the
    service handed to the router is a ``MagicMock`` under the test's control.
    """

    @patch("routers.auth.get_google_auth_service")
    def test_google_auth_new_user(self, mock_get_service, client, mock_google_user):
        """Test new user registration via Google."""
        # The patched factory returns a mock whose verify_token yields the
        # fixture user.
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        response = client.post("/auth/google", json={
            "id_token": "fake-google-token-12345",
            "temp_user_id": "temp-user-abc",
        })

        assert response.status_code == 200
        data = response.json()
        # Identity checks require real JSON booleans, not merely truthy values.
        assert data["success"] is True
        assert data["is_new_user"] is True
        assert data["email"] == "test@example.com"
        assert data["name"] == "Test User"
        assert data["credits"] == 100
        assert "access_token" in data
        # Truthiness rejects both an empty string and a null token.
        assert data["access_token"]

    @patch("routers.auth.get_google_auth_service")
    def test_google_auth_existing_user(self, mock_get_service, client, mock_google_user):
        """Test existing user login via Google."""
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # First login - creates user
        response1 = client.post("/auth/google", json={"id_token": "token1"})
        assert response1.status_code == 200
        assert response1.json()["is_new_user"] is True

        # Second login - same user
        response2 = client.post("/auth/google", json={"id_token": "token2"})
        assert response2.status_code == 200
        data = response2.json()
        assert data["is_new_user"] is False
        assert data["email"] == "test@example.com"
        assert data["credits"] == 100  # Credits preserved

    @patch("routers.auth.get_google_auth_service")
    def test_google_auth_invalid_token(self, mock_get_service, client):
        """Test handling of invalid Google token."""
        from services.google_auth_service import InvalidTokenError

        # verify_token raises, simulating a token that fails verification.
        mock_service = MagicMock()
        mock_service.verify_token.side_effect = InvalidTokenError("Invalid token")
        mock_get_service.return_value = mock_service

        response = client.post("/auth/google", json={"id_token": "invalid-token"})

        assert response.status_code == 401
        assert "Invalid Google token" in response.json()["detail"]
class TestJWTAuth:
    """Test JWT token authentication against the ``/auth/me`` endpoint."""

    @patch("routers.auth.get_google_auth_service")
    def test_get_current_user(self, mock_get_service, client, mock_google_user):
        """Test getting current user with JWT."""
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # Login to get token
        login_response = client.post("/auth/google", json={"id_token": "token"})
        # Fail here with a clear status mismatch rather than a KeyError on
        # "access_token" if the login itself did not succeed.
        assert login_response.status_code == 200
        token = login_response.json()["access_token"]

        # Get user info
        response = client.get("/auth/me", headers={"Authorization": f"Bearer {token}"})

        assert response.status_code == 200
        data = response.json()
        assert data["email"] == "test@example.com"
        assert data["credits"] == 100

    def test_missing_auth_header(self, client):
        """Test request without Authorization header."""
        response = client.get("/auth/me")

        assert response.status_code == 401
        assert "Missing Authorization header" in response.json()["detail"]

    def test_invalid_token_format(self, client):
        """Test request with invalid token format."""
        response = client.get("/auth/me", headers={"Authorization": "InvalidFormat"})

        assert response.status_code == 401
        assert "Invalid Authorization header format" in response.json()["detail"]

    def test_invalid_token(self, client):
        """Test request with invalid JWT token."""
        response = client.get("/auth/me", headers={"Authorization": "Bearer invalid.jwt.token"})

        assert response.status_code == 401
class TestCreditSystem:
    """Test credit deduction system."""

    @patch("routers.auth.get_google_auth_service")
    def test_credit_deduction(self, mock_get_service, client, mock_google_user):
        """Check that reading user info leaves the credit balance unchanged.

        No credit-consuming endpoint is called here yet; only the
        no-deduction path through ``/auth/me`` is verified.
        """
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # Login
        login_response = client.post("/auth/google", json={"id_token": "token"})
        # Surface a failed login as a status mismatch, not a KeyError below.
        assert login_response.status_code == 200
        login_data = login_response.json()
        token = login_data["access_token"]
        initial_credits = login_data["credits"]

        # Make an API call that deducts credits (would need gemini endpoint mock)
        # For now, just verify user info doesn't deduct credits
        response = client.get("/auth/me", headers={"Authorization": f"Bearer {token}"})
        assert response.status_code == 200
        assert response.json()["credits"] == initial_credits  # No deduction for info endpoint
class TestBlinkFlow:
    """Test blink data collection."""

    def test_blink_flow(self, client):
        """Call ``/blink`` and confirm the entry is then listed by ``/api/data``."""
        # The query value is a 20-character id immediately followed by the payload.
        expected_client_id = "12345678901234567890"
        payload = "some_encrypted_data_base64"
        userid_param = expected_client_id + payload

        blink_response = client.get(f"/blink?userid={userid_param}")
        assert blink_response.status_code == 200
        blink_body = blink_response.json()
        assert blink_body["status"] == "success"
        assert blink_body["client_user_id"] == expected_client_id

        # The submission should now be visible through the data listing.
        listing_response = client.get("/api/data")
        assert listing_response.status_code == 200
        items = listing_response.json()["items"]
        assert len(items) > 0
        newest = items[0]
        assert newest["client_user_id"] == expected_client_id
        assert newest["log_type"] == "client"
class TestRateLimiting:
    """Test rate limiting."""

    def test_rate_limiting(self, client):
        """Send eleven identical requests; the first ten pass, the last gets 429."""
        endpoint = "/auth/check-registration"
        body = {"user_id": "rate-limit-test"}

        # Requests 1 through 10 are each expected to be accepted.
        for _attempt in range(10):
            accepted = client.post(endpoint, json=body)
            assert accepted.status_code == 200

        # Request 11 is expected to be rejected.
        rejected = client.post(endpoint, json=body)
        assert rejected.status_code == 429
|