Spaces:
Sleeping
Sleeping
"""
Integration Tests for Google OAuth Authentication

Tests the new Google Sign-In flow, JWT token handling, and API access.
"""
| import pytest | |
| from unittest.mock import patch, MagicMock | |
| import os | |
| from sqlalchemy import text | |
| from services.google_auth_service import GoogleUserInfo | |
| from services.jwt_service import JWTService | |
# Cleanup fixture
def cleanup_db():
    """Yield once around a test without touching the test database file.

    Looks for ``./test_blink_data.db`` but takes no action when it is found,
    then yields with no teardown work afterwards.

    NOTE(review): the comment above and the bare ``yield`` suggest this is
    meant to carry a ``@pytest.fixture`` decorator that is not visible in
    this view -- confirm against the real file.
    """
    db_file = "./test_blink_data.db"
    db_present = os.path.exists(db_file)
    if db_present:
        # No-op: the file is detected but not removed.
        pass
    yield
async def clear_tables(db_session):
    """Delete all rows from the tables the tests write to.

    Issues one ``DELETE`` per table (``users``, ``client_users``,
    ``rate_limits``, ``audit_logs``, in that order) inside a single
    ``db_session.begin()`` block.

    Args:
        db_session: Session object exposing an async ``begin()`` context
            manager and an awaitable ``execute()``; presumably a SQLAlchemy
            ``AsyncSession`` -- confirm against the fixture that provides it.

    NOTE(review): this looks like it should be a pytest fixture, but no
    decorator is visible in this view -- confirm against the real file.
    """
    tables = ("users", "client_users", "rate_limits", "audit_logs")
    # The begin() context manager commits when the block exits without an
    # error, so a separate commit() call is unnecessary.
    async with db_session.begin():
        for table in tables:
            await db_session.execute(text(f"DELETE FROM {table}"))
def jwt_service():
    """Return a ``JWTService`` constructed with a fixed, test-only secret key.

    NOTE(review): presumably decorated with ``@pytest.fixture`` in the real
    file; no decorator is visible in this view -- confirm.
    """
    test_secret = "test-secret-key-for-testing-only"
    return JWTService(secret_key=test_secret)
def mock_google_user():
    """Return a ``GoogleUserInfo`` filled with fixed sample profile values.

    NOTE(review): tests in this file request ``mock_google_user`` as an
    argument, so this is presumably a ``@pytest.fixture`` whose decorator is
    not visible in this view -- confirm.
    """
    profile = {
        "google_id": "google_123456789",
        "email": "test@example.com",
        "email_verified": True,
        "name": "Test User",
        "picture": "https://example.com/photo.jpg",
    }
    return GoogleUserInfo(**profile)
class TestGoogleAuth:
    """Tests for signing in through ``POST /auth/google``.

    NOTE(review): every test accepts ``mock_get_service`` but no ``@patch``
    decorator is visible in this view; presumably the decorators were lost
    upstream -- confirm they are present in the real file.
    """

    def test_google_auth_new_user(self, mock_get_service, client, mock_google_user):
        """First sign-in returns a new-user payload with a non-empty token."""
        # Have the injected mock hand back a service whose verify_token()
        # resolves any token to the sample Google profile.
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        response = client.post(
            "/auth/google",
            json={
                "id_token": "fake-google-token-12345",
                "temp_user_id": "temp-user-abc",
            },
        )

        assert response.status_code == 200
        data = response.json()
        # Identity checks: the flags must be real JSON booleans, not merely
        # values that compare equal to True.
        assert data["success"] is True
        assert data["is_new_user"] is True
        assert data["email"] == "test@example.com"
        assert data["name"] == "Test User"
        assert data["credits"] == 100
        assert "access_token" in data
        assert data["access_token"] != ""

    def test_google_auth_existing_user(self, mock_get_service, client, mock_google_user):
        """Second sign-in with the same profile is not reported as a new user."""
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # First login - creates user
        response1 = client.post("/auth/google", json={"id_token": "token1"})
        assert response1.status_code == 200
        assert response1.json()["is_new_user"] is True

        # Second login - same user
        response2 = client.post("/auth/google", json={"id_token": "token2"})
        assert response2.status_code == 200
        data = response2.json()
        assert data["is_new_user"] is False
        assert data["email"] == "test@example.com"
        assert data["credits"] == 100  # Credits preserved

    def test_google_auth_invalid_token(self, mock_get_service, client):
        """A token rejected by the verifier yields a 401 with an explanatory detail."""
        from services.google_auth_service import InvalidTokenError

        # Make verification raise instead of returning a profile.
        mock_service = MagicMock()
        mock_service.verify_token.side_effect = InvalidTokenError("Invalid token")
        mock_get_service.return_value = mock_service

        response = client.post("/auth/google", json={"id_token": "invalid-token"})

        assert response.status_code == 401
        assert "Invalid Google token" in response.json()["detail"]
class TestJWTAuth:
    """Tests for bearer-token access to ``GET /auth/me``.

    NOTE(review): ``test_get_current_user`` accepts ``mock_get_service`` but no
    ``@patch`` decorator is visible in this view; presumably it was lost
    upstream -- confirm it is present in the real file.
    """

    def test_get_current_user(self, mock_get_service, client, mock_google_user):
        """A token obtained from sign-in grants access to the user's info."""
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # Login to get token
        login_response = client.post("/auth/google", json={"id_token": "token"})
        # Fail here with a clear message rather than with a KeyError on
        # "access_token" if the login itself did not succeed.
        assert login_response.status_code == 200
        token = login_response.json()["access_token"]

        # Get user info
        response = client.get("/auth/me", headers={"Authorization": f"Bearer {token}"})

        assert response.status_code == 200
        data = response.json()
        assert data["email"] == "test@example.com"
        assert data["credits"] == 100

    def test_missing_auth_header(self, client):
        """A request without an Authorization header is rejected with 401."""
        response = client.get("/auth/me")

        assert response.status_code == 401
        assert "Missing Authorization header" in response.json()["detail"]

    def test_invalid_token_format(self, client):
        """An Authorization header without the Bearer scheme is rejected with 401."""
        response = client.get("/auth/me", headers={"Authorization": "InvalidFormat"})

        assert response.status_code == 401
        assert "Invalid Authorization header format" in response.json()["detail"]

    def test_invalid_token(self, client):
        """A Bearer header carrying a value that is not a valid JWT is rejected with 401."""
        response = client.get("/auth/me", headers={"Authorization": "Bearer invalid.jwt.token"})

        assert response.status_code == 401
class TestCreditSystem:
    """Tests around the user's credit balance.

    NOTE(review): the test accepts ``mock_get_service`` but no ``@patch``
    decorator is visible in this view; presumably it was lost upstream --
    confirm it is present in the real file.
    """

    def test_credit_deduction(self, mock_get_service, client, mock_google_user):
        """Reading ``/auth/me`` leaves the credit balance unchanged.

        Only the no-deduction case is covered here; exercising an endpoint
        that actually deducts credits would need a gemini endpoint mock.
        """
        mock_service = MagicMock()
        mock_service.verify_token.return_value = mock_google_user
        mock_get_service.return_value = mock_service

        # Login
        login_response = client.post("/auth/google", json={"id_token": "token"})
        # Fail here with a clear message rather than with a KeyError below if
        # the login itself did not succeed.
        assert login_response.status_code == 200
        login_data = login_response.json()
        token = login_data["access_token"]
        initial_credits = login_data["credits"]

        response = client.get("/auth/me", headers={"Authorization": f"Bearer {token}"})

        assert response.json()["credits"] == initial_credits  # No deduction for info endpoint
class TestBlinkFlow:
    """Tests for the ``/blink`` data-collection endpoint."""

    def test_blink_flow(self, client):
        """A blink request succeeds and its record appears in ``/api/data``."""
        user_id = "12345678901234567890"
        encrypted_data = "some_encrypted_data_base64"
        # The query value is the user id immediately followed by the payload.
        userid_param = f"{user_id}{encrypted_data}"

        blink_response = client.get(f"/blink?userid={userid_param}")
        assert blink_response.status_code == 200
        blink_body = blink_response.json()
        assert blink_body["status"] == "success"
        # The response reports the id under "client_user_id".
        assert blink_body["client_user_id"] == user_id

        # The stored record must be retrievable through the data listing.
        listing_response = client.get("/api/data")
        assert listing_response.status_code == 200
        items = listing_response.json()["items"]
        assert len(items) > 0
        first_item = items[0]
        assert first_item["client_user_id"] == user_id
        assert first_item["log_type"] == "client"
class TestRateLimiting:
    """Tests for request throttling on the auth endpoints."""

    def test_rate_limiting(self, client):
        """Ten registration checks succeed; the eleventh is answered with 429."""
        endpoint = "/auth/check-registration"
        allowed_requests = 10

        # Every request up to the allowance must be accepted.
        sent = 0
        while sent < allowed_requests:
            accepted = client.post(endpoint, json={"user_id": "rate-limit-test"})
            assert accepted.status_code == 200
            sent += 1

        # One more request for the same user must be rejected.
        rejected = client.post(endpoint, json={"user_id": "rate-limit-test"})
        assert rejected.status_code == 429