Multimodel_Rag / tests /test_api.py
Dhrumil Parikh
deploy GeminiRAG
cdc55f4
Raw
History Blame Contribute Delete
5.01 kB
import io
from fastapi.testclient import TestClient
def test_health(client: TestClient):
resp = client.get("/health")
# ChromaDB and DB may be unavailable in test environment; accept both
assert resp.status_code in (200, 503)
body = resp.json()
assert "status" in body
def test_register_and_login(client: TestClient):
resp = client.post("/auth/register", json={
"email": "testuser@example.com",
"password": "TestPass123",
"role": "user",
})
assert resp.status_code == 201
data = resp.json()
assert data["email"] == "testuser@example.com"
assert data["role"] == "user"
resp2 = client.post("/auth/login", json={
"email": "testuser@example.com",
"password": "TestPass123",
})
assert resp2.status_code == 200
token_data = resp2.json()
assert "access_token" in token_data
assert token_data["token_type"] == "bearer"
def test_register_duplicate_email(client: TestClient):
client.post("/auth/register", json={
"email": "dup@example.com",
"password": "pass",
"role": "user",
})
resp = client.post("/auth/register", json={
"email": "dup@example.com",
"password": "pass",
"role": "user",
})
assert resp.status_code == 409
def test_upload_requires_auth(client: TestClient):
resp = client.post("/v1/files/upload", files={"file": ("test.pdf", io.BytesIO(b"data"), "application/pdf")})
assert resp.status_code == 401
def test_upload_unsupported_type(client: TestClient):
client.post("/auth/register", json={"email": "uploader@example.com", "password": "pass123", "role": "user"})
login = client.post("/auth/login", json={"email": "uploader@example.com", "password": "pass123"})
token = login.json()["access_token"]
resp = client.post(
"/v1/files/upload",
files={"file": ("test.exe", io.BytesIO(b"data"), "application/octet-stream")},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 400
assert "Unsupported file type" in resp.json()["detail"]
def test_upload_file_too_large(client: TestClient, monkeypatch):
import app.api.files as files_mod
monkeypatch.setattr(files_mod, "MAX_FILE_SIZE_BYTES", 10)
client.post("/auth/register", json={"email": "bigfile@example.com", "password": "pass123", "role": "user"})
login = client.post("/auth/login", json={"email": "bigfile@example.com", "password": "pass123"})
token = login.json()["access_token"]
resp = client.post(
"/v1/files/upload",
files={"file": ("big.pdf", io.BytesIO(b"x" * 20), "application/pdf")},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 413
def test_get_job_wrong_user_403(client: TestClient):
# Create two users; user A creates a job, user B tries to read it
import uuid
client.post("/auth/register", json={"email": "owner@example.com", "password": "pass123", "role": "user"})
client.post("/auth/register", json={"email": "other@example.com", "password": "pass123", "role": "user"})
# owner logs in — we just confirm 403 on a fake UUID, since upload requires Celery
login_b = client.post("/auth/login", json={"email": "other@example.com", "password": "pass123"})
token_b = login_b.json()["access_token"]
fake_id = str(uuid.uuid4())
resp = client.get(f"/v1/jobs/{fake_id}", headers={"Authorization": f"Bearer {token_b}"})
assert resp.status_code in (403, 404)
def test_admin_usage_requires_admin(client: TestClient):
client.post("/auth/register", json={"email": "regular@example.com", "password": "pass", "role": "user"})
login = client.post("/auth/login", json={"email": "regular@example.com", "password": "pass"})
token = login.json()["access_token"]
resp = client.get("/v1/admin/usage", headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 403
def test_login_inactive_user(client: TestClient):
# Register, deactivate via admin, then try login
client.post("/auth/register", json={"email": "inactive@example.com", "password": "pass123", "role": "user"})
# Register admin
client.post("/auth/register", json={"email": "admn@example.com", "password": "pass123", "role": "admin"})
admin_login = client.post("/auth/login", json={"email": "admn@example.com", "password": "pass123"})
admin_token = admin_login.json()["access_token"]
# Get user id
users_resp = client.get("/v1/admin/users", headers={"Authorization": f"Bearer {admin_token}"})
user = next(u for u in users_resp.json() if u["email"] == "inactive@example.com")
# Deactivate
client.patch(
f"/v1/admin/users/{user['id']}",
json={"is_active": False},
headers={"Authorization": f"Bearer {admin_token}"},
)
# Try login as deactivated user
resp = client.post("/auth/login", json={"email": "inactive@example.com", "password": "pass123"})
assert resp.status_code == 401