from datetime import datetime, timedelta, timezone import jwt from app.config import get_settings VALID_TEST_PASSWORD = "Password1!" def test_register_success(client): response = client.post( "/api/v1/auth/register", json={ "username": "newuser", "email": "newuser@example.com", "password": VALID_TEST_PASSWORD, }, ) assert response.status_code == 201 payload = response.json() assert payload["message"] == "Registration successful. Please check your email to verify your account before logging in." assert payload["email"] == "newuser@example.com" assert payload["verification_url"].startswith("/verify-email?token=") def test_register_duplicate_email_or_username_conflict(client): payload = { "username": "dupuser", "email": "dup@example.com", "password": VALID_TEST_PASSWORD, } first = client.post("/api/v1/auth/register", json=payload) assert first.status_code == 201 duplicate_email = client.post( "/api/v1/auth/register", json={**payload, "username": "anotheruser"}, ) assert duplicate_email.status_code == 409 assert duplicate_email.json()["error"]["message"] == "Email already registered" duplicate_username = client.post( "/api/v1/auth/register", json={**payload, "email": "another@example.com"}, ) assert duplicate_username.status_code == 409 assert duplicate_username.json()["error"]["message"] == "Username already taken" def test_register_rejects_weak_password(client): response = client.post( "/api/v1/auth/register", json={ "username": "weakpassuser", "email": "weakpass@example.com", "password": "123456", }, ) assert response.status_code == 422 errors = response.json()["error"]["details"]["errors"] messages = " ".join(item["message"] for item in errors) assert "uppercase" in messages.lower() or "8 characters" in messages.lower() def test_register_rejects_password_missing_special_character(client): response = client.post( "/api/v1/auth/register", json={ "username": "specialcharuser", "email": "specialchar@example.com", "password": "Password1", }, ) assert response.status_code == 422 errors = response.json()["error"]["details"]["errors"] messages = " ".join(item["message"] for item in errors).lower() assert "special character" in messages def test_login_success(client, user): response = client.post( "/api/v1/auth/login", json={"email": user.email, "password": "password123"}, ) assert response.status_code == 200 payload = response.json() assert payload["access_token"] assert payload["refresh_token"] assert payload["user"]["username"] == user.username def test_login_invalid_password(client, user): response = client.post( "/api/v1/auth/login", json={"email": user.email, "password": "wrong-password"}, ) assert response.status_code == 401 assert response.json()["error"]["message"] == "Invalid email or password" def test_login_invalid_email(client): response = client.post( "/api/v1/auth/login", json={"email": "missing@example.com", "password": "password123"}, ) assert response.status_code == 401 assert response.json()["error"]["message"] == "Invalid email or password" def test_auth_me_success(client, auth_headers, user): response = client.get("/api/v1/auth/me", headers=auth_headers) assert response.status_code == 200 payload = response.json() assert payload["id"] == str(user.id) assert payload["username"] == user.username assert payload["email"] == user.email def test_auth_me_requires_auth(client): response = client.get("/api/v1/auth/me") assert response.status_code in (401, 403) def test_auth_me_rejects_expired_token(client, user): settings = get_settings() now = datetime.now(timezone.utc) expired_token = jwt.encode( { "sub": str(user.id), "type": "access", "exp": now - timedelta(minutes=1), "iat": now - timedelta(minutes=2), }, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM, ) response = client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {expired_token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "Invalid or expired token" def test_refresh_token_success(client, refresh_token): response = client.post( "/api/v1/auth/refresh", json={"refresh_token": refresh_token}, ) assert response.status_code == 200 payload = response.json() assert payload["access_token"] assert payload["refresh_token"] assert payload["token_type"] == "bearer" def test_update_hf_token_success(client, auth_headers): response = client.put( "/api/v1/auth/hf-token", json={"hf_token": "hf_new_token_value"}, headers=auth_headers, ) assert response.status_code == 200 payload = response.json() assert payload["hf_token"] == "hf_new_token_value" def test_update_hf_token_requires_auth(client): response = client.put( "/api/v1/auth/hf-token", json={"hf_token": "hf_unauth"}, ) assert response.status_code in (401, 403) def test_hf_token_appears_in_user_response(client, auth_headers, user, db_session): # First update the token put_resp = client.put( "/api/v1/auth/hf-token", json={"hf_token": "hf_persist_token"}, headers=auth_headers, ) assert put_resp.status_code == 200 # Then verify it shows up in GET /me me_resp = client.get("/api/v1/auth/me", headers=auth_headers) assert me_resp.status_code == 200 assert me_resp.json()["hf_token"] == "hf_persist_token" # Verify encryption at rest in the database directly from sqlalchemy import text row = db_session.execute(text("SELECT hf_token FROM users WHERE id = :id"), {"id": user.id}).fetchone() stored_token = row[0] assert stored_token is not None assert stored_token != "hf_persist_token" def test_update_user_info_rejects_duplicate_email(client, auth_headers, other_user): response = client.put( "/api/v1/auth/update", json={"email": other_user.email}, headers=auth_headers, ) assert response.status_code == 400 assert response.json()["error"]["message"] == "Email already exists" from unittest.mock import patch, AsyncMock, MagicMock import urllib.parse def test_huggingface_login(client): from app.config import get_settings settings = get_settings() settings.HF_CLIENT_ID = "test-client-id" settings.HF_REDIRECT_URI = "http://localhost:8000/api/v1/auth/callback/huggingface" response = client.get("/api/v1/auth/login/huggingface") assert response.status_code == 200 data = response.json() assert "url" in data assert "test-client-id" in data["url"] assert "oauth_state" in response.cookies @patch("httpx.AsyncClient.post") @patch("httpx.AsyncClient.get") def test_huggingface_callback_success(mock_get, mock_post, client): from app.config import get_settings settings = get_settings() settings.HF_CLIENT_ID = "test-client-id" settings.HF_CLIENT_SECRET = "test-client-secret" settings.HF_REDIRECT_URI = "http://localhost:8000/api/v1/auth/callback/huggingface" mock_post_resp = MagicMock() mock_post_resp.status_code = 200 mock_post_resp.json.return_value = {"access_token": "hf-access-token"} mock_post.return_value = mock_post_resp mock_get_resp = MagicMock() mock_get_resp.status_code = 200 mock_get_resp.json.return_value = { "email": "hfuser@example.com", "preferred_username": "hfuser" } mock_get.return_value = mock_get_resp login_response = client.get("/api/v1/auth/login/huggingface") state_cookie = login_response.cookies["oauth_state"] url = login_response.json()["url"] parsed = urllib.parse.urlparse(url) queries = urllib.parse.parse_qs(parsed.query) state_param = queries["state"][0] client.cookies.set("oauth_state", state_cookie) callback_response = client.get( f"/api/v1/auth/callback/huggingface?code=hf-code&state={state_param}", follow_redirects=False ) assert callback_response.status_code == 307 assert "/dashboard" in callback_response.headers["location"] assert "access_token" in callback_response.cookies assert "refresh_token" in callback_response.cookies def test_huggingface_callback_invalid_state(client): response = client.get( "/api/v1/auth/callback/huggingface?code=hf-code&state=invalid-state", cookies={"oauth_state": "actual-state"} ) assert response.status_code == 400 assert "State verification failed" in response.json()["error"]["message"] def test_huggingface_logout(client): response = client.post( "/api/v1/auth/logout", cookies={"access_token": "token-value", "refresh_token": "refresh-value"} ) assert response.status_code == 200 assert response.cookies.get("access_token") in (None, "") assert response.cookies.get("refresh_token") in (None, "") def test_google_drive_connect_returns_auth_url(client, auth_headers, monkeypatch): from app.routes import auth as auth_routes class FakeFlow: def authorization_url(self, **kwargs): assert kwargs["access_type"] == "offline" assert kwargs["prompt"] == "consent" return "https://accounts.google.com/o/oauth2/auth?state=signed-state", "signed-state" monkeypatch.setattr(auth_routes.settings, "GOOGLE_CLIENT_ID", "google-client-id") monkeypatch.setattr(auth_routes.settings, "GOOGLE_CLIENT_SECRET", "google-client-secret") monkeypatch.setattr(auth_routes, "_google_drive_flow", lambda state: FakeFlow()) response = client.get("/api/v1/auth/google-drive/connect", headers=auth_headers) assert response.status_code == 200 assert response.json()["auth_url"].startswith("https://accounts.google.com") def test_google_drive_status_reflects_stored_token(client, auth_headers, user, db_session): initial = client.get("/api/v1/auth/google-drive/status", headers=auth_headers) assert initial.status_code == 200 assert initial.json() == {"connected": False} user.google_refresh_token = "google-refresh-token" db_session.commit() connected = client.get("/api/v1/auth/google-drive/status", headers=auth_headers) assert connected.status_code == 200 assert connected.json() == {"connected": True} def test_google_drive_callback_stores_encrypted_refresh_token(client, user, db_session, monkeypatch): from app.routes import auth as auth_routes from sqlalchemy import text class FakeCredentials: refresh_token = "google-refresh-token" class FakeFlow: credentials = FakeCredentials() def fetch_token(self, code): assert code == "oauth-code" monkeypatch.setattr(auth_routes.settings, "GOOGLE_CLIENT_ID", "google-client-id") monkeypatch.setattr(auth_routes.settings, "GOOGLE_CLIENT_SECRET", "google-client-secret") monkeypatch.setattr(auth_routes, "_google_drive_flow", lambda state: FakeFlow()) state = auth_routes._create_google_drive_state(user.id) response = client.get( "/api/v1/auth/google-drive/callback", params={"code": "oauth-code", "state": state}, follow_redirects=False, ) assert response.status_code == 200 assert "Google Drive connected" in response.text db_session.refresh(user) assert user.google_refresh_token == "google-refresh-token" row = db_session.execute(text("SELECT google_refresh_token FROM users WHERE id = :id"), {"id": user.id}).fetchone() stored_token = row[0] assert stored_token is not None assert stored_token != "google-refresh-token" def test_google_drive_disconnect_removes_token(client, auth_headers, user, db_session): user.google_refresh_token = "google-refresh-token" db_session.commit() response = client.delete("/api/v1/auth/google-drive/disconnect", headers=auth_headers) assert response.status_code == 200 assert response.json() == {"connected": False} db_session.refresh(user) assert user.google_refresh_token is None