PDF-Assit_RAG / backend /tests /test_auth.py
Param20h's picture
deploy: pure backend API with keywords fix
7c46845 unverified
Raw
History Blame Contribute Delete
12.5 kB
from datetime import datetime, timedelta, timezone
import jwt
from app.config import get_settings
VALID_TEST_PASSWORD = "Password1!"
def test_register_success(client):
response = client.post(
"/api/v1/auth/register",
json={
"username": "newuser",
"email": "newuser@example.com",
"password": VALID_TEST_PASSWORD,
},
)
assert response.status_code == 201
payload = response.json()
assert payload["message"] == "Registration successful. Please check your email to verify your account before logging in."
assert payload["email"] == "newuser@example.com"
assert payload["verification_url"].startswith("/verify-email?token=")
def test_register_duplicate_email_or_username_conflict(client):
payload = {
"username": "dupuser",
"email": "dup@example.com",
"password": VALID_TEST_PASSWORD,
}
first = client.post("/api/v1/auth/register", json=payload)
assert first.status_code == 201
duplicate_email = client.post(
"/api/v1/auth/register",
json={**payload, "username": "anotheruser"},
)
assert duplicate_email.status_code == 409
assert duplicate_email.json()["error"]["message"] == "Email already registered"
duplicate_username = client.post(
"/api/v1/auth/register",
json={**payload, "email": "another@example.com"},
)
assert duplicate_username.status_code == 409
assert duplicate_username.json()["error"]["message"] == "Username already taken"
def test_register_rejects_weak_password(client):
response = client.post(
"/api/v1/auth/register",
json={
"username": "weakpassuser",
"email": "weakpass@example.com",
"password": "123456",
},
)
assert response.status_code == 422
errors = response.json()["error"]["details"]["errors"]
messages = " ".join(item["message"] for item in errors)
assert "uppercase" in messages.lower() or "8 characters" in messages.lower()
def test_register_rejects_password_missing_special_character(client):
response = client.post(
"/api/v1/auth/register",
json={
"username": "specialcharuser",
"email": "specialchar@example.com",
"password": "Password1",
},
)
assert response.status_code == 422
errors = response.json()["error"]["details"]["errors"]
messages = " ".join(item["message"] for item in errors).lower()
assert "special character" in messages
def test_login_success(client, user):
response = client.post(
"/api/v1/auth/login",
json={"email": user.email, "password": "password123"},
)
assert response.status_code == 200
payload = response.json()
assert payload["access_token"]
assert payload["refresh_token"]
assert payload["user"]["username"] == user.username
def test_login_invalid_password(client, user):
response = client.post(
"/api/v1/auth/login",
json={"email": user.email, "password": "wrong-password"},
)
assert response.status_code == 401
assert response.json()["error"]["message"] == "Invalid email or password"
def test_login_invalid_email(client):
response = client.post(
"/api/v1/auth/login",
json={"email": "missing@example.com", "password": "password123"},
)
assert response.status_code == 401
assert response.json()["error"]["message"] == "Invalid email or password"
def test_auth_me_success(client, auth_headers, user):
response = client.get("/api/v1/auth/me", headers=auth_headers)
assert response.status_code == 200
payload = response.json()
assert payload["id"] == str(user.id)
assert payload["username"] == user.username
assert payload["email"] == user.email
def test_auth_me_requires_auth(client):
response = client.get("/api/v1/auth/me")
assert response.status_code in (401, 403)
def test_auth_me_rejects_expired_token(client, user):
settings = get_settings()
now = datetime.now(timezone.utc)
expired_token = jwt.encode(
{
"sub": str(user.id),
"type": "access",
"exp": now - timedelta(minutes=1),
"iat": now - timedelta(minutes=2),
},
settings.SECRET_KEY,
algorithm=settings.JWT_ALGORITHM,
)
response = client.get(
"/api/v1/auth/me",
headers={"Authorization": f"Bearer {expired_token}"},
)
assert response.status_code == 401
assert response.json()["detail"] == "Invalid or expired token"
def test_refresh_token_success(client, refresh_token):
response = client.post(
"/api/v1/auth/refresh",
json={"refresh_token": refresh_token},
)
assert response.status_code == 200
payload = response.json()
assert payload["access_token"]
assert payload["refresh_token"]
assert payload["token_type"] == "bearer"
def test_update_hf_token_success(client, auth_headers):
response = client.put(
"/api/v1/auth/hf-token",
json={"hf_token": "hf_new_token_value"},
headers=auth_headers,
)
assert response.status_code == 200
payload = response.json()
assert payload["hf_token"] == "hf_new_token_value"
def test_update_hf_token_requires_auth(client):
response = client.put(
"/api/v1/auth/hf-token",
json={"hf_token": "hf_unauth"},
)
assert response.status_code in (401, 403)
def test_hf_token_appears_in_user_response(client, auth_headers, user, db_session):
# First update the token
put_resp = client.put(
"/api/v1/auth/hf-token",
json={"hf_token": "hf_persist_token"},
headers=auth_headers,
)
assert put_resp.status_code == 200
# Then verify it shows up in GET /me
me_resp = client.get("/api/v1/auth/me", headers=auth_headers)
assert me_resp.status_code == 200
assert me_resp.json()["hf_token"] == "hf_persist_token"
# Verify encryption at rest in the database directly
from sqlalchemy import text
row = db_session.execute(text("SELECT hf_token FROM users WHERE id = :id"), {"id": user.id}).fetchone()
stored_token = row[0]
assert stored_token is not None
assert stored_token != "hf_persist_token"
def test_update_user_info_rejects_duplicate_email(client, auth_headers, other_user):
response = client.put(
"/api/v1/auth/update",
json={"email": other_user.email},
headers=auth_headers,
)
assert response.status_code == 400
assert response.json()["error"]["message"] == "Email already exists"
from unittest.mock import patch, AsyncMock, MagicMock
import urllib.parse
def test_huggingface_login(client):
from app.config import get_settings
settings = get_settings()
settings.HF_CLIENT_ID = "test-client-id"
settings.HF_REDIRECT_URI = "http://localhost:8000/api/v1/auth/callback/huggingface"
response = client.get("/api/v1/auth/login/huggingface")
assert response.status_code == 200
data = response.json()
assert "url" in data
assert "test-client-id" in data["url"]
assert "oauth_state" in response.cookies
@patch("httpx.AsyncClient.post")
@patch("httpx.AsyncClient.get")
def test_huggingface_callback_success(mock_get, mock_post, client):
from app.config import get_settings
settings = get_settings()
settings.HF_CLIENT_ID = "test-client-id"
settings.HF_CLIENT_SECRET = "test-client-secret"
settings.HF_REDIRECT_URI = "http://localhost:8000/api/v1/auth/callback/huggingface"
mock_post_resp = MagicMock()
mock_post_resp.status_code = 200
mock_post_resp.json.return_value = {"access_token": "hf-access-token"}
mock_post.return_value = mock_post_resp
mock_get_resp = MagicMock()
mock_get_resp.status_code = 200
mock_get_resp.json.return_value = {
"email": "hfuser@example.com",
"preferred_username": "hfuser"
}
mock_get.return_value = mock_get_resp
login_response = client.get("/api/v1/auth/login/huggingface")
state_cookie = login_response.cookies["oauth_state"]
url = login_response.json()["url"]
parsed = urllib.parse.urlparse(url)
queries = urllib.parse.parse_qs(parsed.query)
state_param = queries["state"][0]
client.cookies.set("oauth_state", state_cookie)
callback_response = client.get(
f"/api/v1/auth/callback/huggingface?code=hf-code&state={state_param}",
follow_redirects=False
)
assert callback_response.status_code == 307
assert "/dashboard" in callback_response.headers["location"]
assert "access_token" in callback_response.cookies
assert "refresh_token" in callback_response.cookies
def test_huggingface_callback_invalid_state(client):
response = client.get(
"/api/v1/auth/callback/huggingface?code=hf-code&state=invalid-state",
cookies={"oauth_state": "actual-state"}
)
assert response.status_code == 400
assert "State verification failed" in response.json()["error"]["message"]
def test_huggingface_logout(client):
response = client.post(
"/api/v1/auth/logout",
cookies={"access_token": "token-value", "refresh_token": "refresh-value"}
)
assert response.status_code == 200
assert response.cookies.get("access_token") in (None, "")
assert response.cookies.get("refresh_token") in (None, "")
def test_google_drive_connect_returns_auth_url(client, auth_headers, monkeypatch):
from app.routes import auth as auth_routes
class FakeFlow:
def authorization_url(self, **kwargs):
assert kwargs["access_type"] == "offline"
assert kwargs["prompt"] == "consent"
return "https://accounts.google.com/o/oauth2/auth?state=signed-state", "signed-state"
monkeypatch.setattr(auth_routes.settings, "GOOGLE_CLIENT_ID", "google-client-id")
monkeypatch.setattr(auth_routes.settings, "GOOGLE_CLIENT_SECRET", "google-client-secret")
monkeypatch.setattr(auth_routes, "_google_drive_flow", lambda state: FakeFlow())
response = client.get("/api/v1/auth/google-drive/connect", headers=auth_headers)
assert response.status_code == 200
assert response.json()["auth_url"].startswith("https://accounts.google.com")
def test_google_drive_status_reflects_stored_token(client, auth_headers, user, db_session):
initial = client.get("/api/v1/auth/google-drive/status", headers=auth_headers)
assert initial.status_code == 200
assert initial.json() == {"connected": False}
user.google_refresh_token = "google-refresh-token"
db_session.commit()
connected = client.get("/api/v1/auth/google-drive/status", headers=auth_headers)
assert connected.status_code == 200
assert connected.json() == {"connected": True}
def test_google_drive_callback_stores_encrypted_refresh_token(client, user, db_session, monkeypatch):
from app.routes import auth as auth_routes
from sqlalchemy import text
class FakeCredentials:
refresh_token = "google-refresh-token"
class FakeFlow:
credentials = FakeCredentials()
def fetch_token(self, code):
assert code == "oauth-code"
monkeypatch.setattr(auth_routes.settings, "GOOGLE_CLIENT_ID", "google-client-id")
monkeypatch.setattr(auth_routes.settings, "GOOGLE_CLIENT_SECRET", "google-client-secret")
monkeypatch.setattr(auth_routes, "_google_drive_flow", lambda state: FakeFlow())
state = auth_routes._create_google_drive_state(user.id)
response = client.get(
"/api/v1/auth/google-drive/callback",
params={"code": "oauth-code", "state": state},
follow_redirects=False,
)
assert response.status_code == 200
assert "Google Drive connected" in response.text
db_session.refresh(user)
assert user.google_refresh_token == "google-refresh-token"
row = db_session.execute(text("SELECT google_refresh_token FROM users WHERE id = :id"), {"id": user.id}).fetchone()
stored_token = row[0]
assert stored_token is not None
assert stored_token != "google-refresh-token"
def test_google_drive_disconnect_removes_token(client, auth_headers, user, db_session):
user.google_refresh_token = "google-refresh-token"
db_session.commit()
response = client.delete("/api/v1/auth/google-drive/disconnect", headers=auth_headers)
assert response.status_code == 200
assert response.json() == {"connected": False}
db_session.refresh(user)
assert user.google_refresh_token is None