File size: 7,988 Bytes
1bd7131
 
 
 
 
050d8f8
1bd7131
050d8f8
 
 
1bd7131
 
 
 
050d8f8
 
 
 
 
 
 
 
 
 
1bd7131
050d8f8
 
34f76dc
050d8f8
 
 
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
050d8f8
1bd7131
 
 
 
 
 
 
 
 
 
050d8f8
 
 
1bd7131
 
 
 
 
 
 
 
050d8f8
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
050d8f8
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34f76dc
1bd7131
34f76dc
1bd7131
050d8f8
1bd7131
 
34f76dc
 
1bd7131
 
 
 
 
 
 
 
 
 
 
050d8f8
1bd7131
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""
Integration Tests for Google OAuth Authentication

Covers the Google Sign-In flow, JWT-protected endpoints, credit balances,
the Blink data-collection endpoint, and rate limiting on auth endpoints.
"""
import pytest
from unittest.mock import patch, MagicMock
import os
from sqlalchemy import text

from services.google_auth_service import GoogleUserInfo
from services.jwt_service import JWTService


@pytest.fixture(autouse=True)
def cleanup_db():
    """Placeholder autouse fixture that wraps every test and does no work.

    The earlier body checked whether ``./test_blink_data.db`` existed and then
    executed ``pass``, so the check had no effect; that dead branch has been
    removed. The fixture is kept (rather than deleted) so the hook point and
    its name remain available.

    NOTE(review): if the intent was to delete the database file between
    tests, that must be coordinated with whatever creates ``db_session`` --
    confirm before adding real cleanup here.
    """
    yield


@pytest.fixture(autouse=True)
async def clear_tables(db_session):
    """Delete all rows from the tables used by these tests.

    The fixture is autouse and has no ``yield``, so its body runs as setup
    for each test that requests it implicitly.

    NOTE(review): this is an ``async def`` fixture under plain
    ``@pytest.fixture``; it presumably relies on an async-aware pytest plugin
    being configured -- confirm.
    """
    purge_statements = (
        "DELETE FROM users",
        "DELETE FROM client_users",
        "DELETE FROM rate_limits",
        "DELETE FROM audit_logs",
    )

    # Run every delete inside a single explicit transaction block.
    async with db_session.begin():
        for statement in purge_statements:
            await db_session.execute(text(statement))

    # Kept as in the original; the transaction block above presumably commits
    # on exit already -- TODO confirm whether this extra commit is needed.
    await db_session.commit()


@pytest.fixture
def jwt_service():
    """Provide a ``JWTService`` built with a fixed, test-only secret key."""
    test_secret = "test-secret-key-for-testing-only"
    return JWTService(secret_key=test_secret)


@pytest.fixture
def mock_google_user():
    """Provide a ``GoogleUserInfo`` describing a verified fake account.

    Tests hand this object to a mocked Google auth service as the result of
    ``verify_token``.
    """
    profile = {
        "google_id": "google_123456789",
        "email": "test@example.com",
        "email_verified": True,
        "name": "Test User",
        "picture": "https://example.com/photo.jpg",
    }
    return GoogleUserInfo(**profile)


class TestGoogleAuth:
    """Exercise the ``POST /auth/google`` sign-in endpoint.

    Each test patches ``routers.auth.get_google_auth_service`` so that it
    returns a ``MagicMock``; no real Google token verification takes place.
    """

    @patch("routers.auth.get_google_auth_service")
    def test_google_auth_new_user(self, mock_get_service, client, mock_google_user):
        """A first sign-in is reported as a new user and yields a token."""
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        response = client.post("/auth/google", json={
            "id_token": "fake-google-token-12345",
            "temp_user_id": "temp-user-abc"
        })

        assert response.status_code == 200
        data = response.json()
        # Identity checks: the flags must be real booleans, not merely truthy.
        assert data["success"] is True
        assert data["is_new_user"] is True
        assert data["email"] == "test@example.com"
        assert data["name"] == "Test User"
        # 100 is presumably the starting balance for a new account -- confirm.
        assert data["credits"] == 100
        assert "access_token" in data
        assert data["access_token"] != ""

    @patch("routers.auth.get_google_auth_service")
    def test_google_auth_existing_user(self, mock_get_service, client, mock_google_user):
        """A second sign-in with the same Google identity is not a new user."""
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # First login - creates user
        response1 = client.post("/auth/google", json={"id_token": "token1"})
        assert response1.status_code == 200
        assert response1.json()["is_new_user"] is True

        # Second login - same user (the mock returns the same profile)
        response2 = client.post("/auth/google", json={"id_token": "token2"})
        assert response2.status_code == 200
        data = response2.json()
        assert data["is_new_user"] is False
        assert data["email"] == "test@example.com"
        assert data["credits"] == 100  # Credits preserved

    @patch("routers.auth.get_google_auth_service")
    def test_google_auth_invalid_token(self, mock_get_service, client):
        """A token rejected by the Google service produces a 401 response."""
        from services.google_auth_service import InvalidTokenError

        mock_service = MagicMock()
        # Make verification fail the way the service signals a bad token.
        mock_service.verify_token.side_effect = InvalidTokenError("Invalid token")
        mock_get_service.return_value = mock_service

        response = client.post("/auth/google", json={"id_token": "invalid-token"})

        assert response.status_code == 401
        assert "Invalid Google token" in response.json()["detail"]


class TestJWTAuth:
    """Exercise JWT bearer authentication on ``GET /auth/me``."""

    @patch("routers.auth.get_google_auth_service")
    def test_get_current_user(self, mock_get_service, client, mock_google_user):
        """A token issued at login grants access to the user's own info."""
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # Login to get token
        login_response = client.post("/auth/google", json={"id_token": "token"})
        # Fail here with a clear status mismatch rather than a KeyError below.
        assert login_response.status_code == 200
        token = login_response.json()["access_token"]

        # Get user info
        response = client.get("/auth/me", headers={"Authorization": f"Bearer {token}"})

        assert response.status_code == 200
        data = response.json()
        assert data["email"] == "test@example.com"
        assert data["credits"] == 100

    def test_missing_auth_header(self, client):
        """A request with no Authorization header is rejected with 401."""
        response = client.get("/auth/me")
        assert response.status_code == 401
        assert "Missing Authorization header" in response.json()["detail"]

    def test_invalid_token_format(self, client):
        """An Authorization header without the ``Bearer`` scheme gets 401."""
        response = client.get("/auth/me", headers={"Authorization": "InvalidFormat"})
        assert response.status_code == 401
        assert "Invalid Authorization header format" in response.json()["detail"]

    def test_invalid_token(self, client):
        """A well-formed header carrying a bogus JWT is rejected with 401."""
        response = client.get("/auth/me", headers={"Authorization": "Bearer invalid.jwt.token"})
        assert response.status_code == 401


class TestCreditSystem:
    """Check the credit balance reported by the auth endpoints."""

    @patch("routers.auth.get_google_auth_service")
    def test_credit_deduction(self, mock_get_service, client, mock_google_user):
        """Reading ``/auth/me`` leaves the credit balance unchanged.

        Despite its name, this test does not yet exercise a credit-consuming
        endpoint; doing so would need the gemini endpoint to be mocked.
        """
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # Login
        login_response = client.post("/auth/google", json={"id_token": "token"})
        # Fail here with a clear status mismatch rather than a KeyError below.
        assert login_response.status_code == 200
        login_data = login_response.json()
        token = login_data["access_token"]
        initial_credits = login_data["credits"]

        # Make an API call that deducts credits (would need gemini endpoint mock)
        # For now, just verify user info doesn't deduct credits
        response = client.get("/auth/me", headers={"Authorization": f"Bearer {token}"})
        assert response.status_code == 200
        assert response.json()["credits"] == initial_credits  # No deduction for info endpoint


class TestBlinkFlow:
    """Exercise the Blink data-collection endpoint and its stored record."""

    def test_blink_flow(self, client):
        """``GET /blink`` succeeds and the record appears in ``/api/data``.

        The ``userid`` query value is a 20-character id followed directly by
        an opaque payload; the endpoint is expected to report the id part back
        as ``client_user_id``.
        """
        expected_id = "12345678901234567890"
        payload_suffix = "some_encrypted_data_base64"
        blink_url = f"/blink?userid={expected_id}{payload_suffix}"

        blink_response = client.get(blink_url)
        assert blink_response.status_code == 200
        blink_body = blink_response.json()
        assert blink_body["status"] == "success"
        assert blink_body["client_user_id"] == expected_id

        # The listing endpoint should now return at least one record
        # (presumably backed by the audit_logs table -- confirm).
        listing = client.get("/api/data")
        assert listing.status_code == 200
        items = listing.json()["items"]
        assert len(items) > 0
        first_item = items[0]
        assert first_item["client_user_id"] == expected_id
        assert first_item["log_type"] == "client"


class TestRateLimiting:
    """Exercise request throttling on the auth endpoints."""

    def test_rate_limiting(self, client):
        """Ten identical requests succeed; the eleventh is answered with 429."""
        endpoint = "/auth/check-registration"
        payload = {"user_id": "rate-limit-test"}
        allowed_requests = 10

        # Every request within the allowance must be accepted.
        for _attempt in range(allowed_requests):
            accepted = client.post(endpoint, json=payload)
            assert accepted.status_code == 200

        # One request past the allowance must be throttled.
        throttled = client.post(endpoint, json=payload)
        assert throttled.status_code == 429