|
|
from datetime import datetime, timezone |
|
|
|
|
|
import pytest |
|
|
from httpx import AsyncClient |
|
|
from langflow.services.auth.utils import create_super_user, get_password_hash |
|
|
from langflow.services.database.models.user import UserUpdate |
|
|
from langflow.services.database.models.user.model import User |
|
|
from langflow.services.database.utils import async_session_getter |
|
|
from langflow.services.deps import get_db_service, get_settings_service |
|
|
from sqlmodel import select |
|
|
|
|
|
|
|
|
@pytest.fixture
async def super_user(client):
    """Create the superuser named in the auth settings and return the result.

    The credentials come from ``auth_settings.SUPERUSER`` and
    ``auth_settings.SUPERUSER_PASSWORD``.

    NOTE(review): ``client`` is not used in the body; presumably it is
    requested so the application is set up first -- confirm.
    """
    auth = get_settings_service().auth_settings
    async with async_session_getter(get_db_service()) as session:
        return await create_super_user(
            db=session,
            username=auth.SUPERUSER,
            password=auth.SUPERUSER_PASSWORD,
        )
|
|
|
|
|
|
|
|
@pytest.fixture
async def super_user_headers(
    client: AsyncClient,
    super_user,
):
    """Log in with the configured superuser credentials and return auth headers.

    Returns:
        A dict with a single ``Authorization`` entry carrying the bearer
        access token taken from the login response.
    """
    auth = get_settings_service().auth_settings
    credentials = {
        "username": auth.SUPERUSER,
        "password": auth.SUPERUSER_PASSWORD,
    }
    login_response = await client.post("api/v1/login", data=credentials)
    assert login_response.status_code == 200
    access_token = login_response.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
|
|
|
|
|
|
|
|
@pytest.fixture
async def deactivated_user(client):
    """Persist an inactive, non-superuser account and return the refreshed row.

    The account is stored with the hashed password ``testpassword`` and a
    ``last_login_at`` set to the current UTC time.
    """
    inactive = User(
        username="deactivateduser",
        password=get_password_hash("testpassword"),
        is_active=False,
        is_superuser=False,
        last_login_at=datetime.now(tz=timezone.utc),
    )
    async with async_session_getter(get_db_service()) as session:
        session.add(inactive)
        await session.commit()
        # Reload the instance after the commit before handing it to the test.
        await session.refresh(inactive)
    return inactive
|
|
|
|
|
|
|
|
async def test_user_waiting_for_approval(client):
    """An inactive user who has never logged in is told to wait for approval.

    The test creates a user with ``is_active=False`` and ``last_login_at=None``,
    attempts to log in, and expects a 400 response whose detail is
    ``"Waiting for approval"``. It also checks that the user is absent before
    the test and still present afterwards.
    """
    username = "waitingforapproval"
    password = "testpassword"

    # Pre-condition: the username must not already be present in the database.
    async with async_session_getter(get_db_service()) as session:
        stmt = select(User).where(User.username == username)
        if (await session.exec(stmt)).first():
            pytest.fail(
                f"User {username} already exists before the test. Database URL: {get_db_service().database_url}"
            )

    # Create the inactive user that has never logged in.
    async with async_session_getter(get_db_service()) as session:
        session.add(
            User(
                username=username,
                password=get_password_hash(password),
                is_active=False,
                last_login_at=None,
            )
        )
        await session.commit()

    # Reuse the locals so the credentials cannot drift from the created user.
    login_data = {"username": username, "password": password}
    response = await client.post("api/v1/login", data=login_data)
    assert response.status_code == 400, response.text
    assert response.json()["detail"] == "Waiting for approval"

    # Post-condition: the rejected login must not have removed the user.
    async with async_session_getter(get_db_service()) as session:
        stmt = select(User).where(User.username == username)
        if not (await session.exec(stmt)).first():
            pytest.fail(f"User {username} does not exist after the test. This is unexpected.")
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_deactivated_user_cannot_login(client: AsyncClient, deactivated_user):
    """Logging in as the deactivated user yields 401 with detail "Inactive user"."""
    credentials = {
        "username": deactivated_user.username,
        "password": "testpassword",
    }
    login_response = await client.post("api/v1/login", data=credentials)
    assert login_response.status_code == 401, login_response.json()
    assert login_response.json()["detail"] == "Inactive user", login_response.text
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("deactivated_user")
async def test_deactivated_user_cannot_access(client: AsyncClient, logged_in_headers):
    """Listing users with ``logged_in_headers`` is rejected with 403.

    NOTE(review): the request is sent with ``logged_in_headers`` rather than
    credentials of the deactivated user -- confirm this matches the intent
    suggested by the test name.
    """
    listing = await client.get("api/v1/users/", headers=logged_in_headers)
    assert listing.status_code == 403, listing.status_code
    assert listing.json()["detail"] == "The user doesn't have enough privileges", listing.text
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_data_consistency_after_update(client: AsyncClient, active_user, logged_in_headers, super_user_headers):
    """After a superuser deactivates a user, that user's ``whoami`` call gets 401."""
    target_url = f"/api/v1/users/{active_user.id}"
    payload = UserUpdate(is_active=False).model_dump()

    # Deactivate the user using the superuser's headers.
    patch_response = await client.patch(target_url, json=payload, headers=super_user_headers)
    assert patch_response.status_code == 200, patch_response.json()

    # The headers obtained for the user earlier must no longer be accepted.
    whoami_response = await client.get("api/v1/users/whoami", headers=logged_in_headers)
    assert whoami_response.status_code == 401, whoami_response.json()
    assert whoami_response.json()["detail"] == "User not found or is inactive."
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_data_consistency_after_delete(client: AsyncClient, test_user, super_user_headers):
    """A user deleted by the superuser no longer appears in the user listing."""
    # Index instead of .get(): a missing "id" should fail right here rather than
    # turn into a request against "/api/v1/users/None".
    user_id = test_user["id"]
    response = await client.delete(f"/api/v1/users/{user_id}", headers=super_user_headers)
    assert response.status_code == 200, response.json()

    # Fetch the listing again and make sure the deleted id is gone.
    response = await client.get("api/v1/users/", headers=super_user_headers)
    assert response.status_code == 200, response.json()
    remaining_ids = [user["id"] for user in response.json()["users"]]
    assert user_id not in remaining_ids
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_inactive_user(client: AsyncClient):
    """An inactive user with a recorded past login is rejected with 401."""
    # Inactive account whose last login is fixed at 2023-01-01 UTC.
    inactive = User(
        username="inactiveuser",
        password=get_password_hash("testpassword"),
        is_active=False,
        last_login_at=datetime(2023, 1, 1, tzinfo=timezone.utc),
    )
    async with async_session_getter(get_db_service()) as session:
        session.add(inactive)
        await session.commit()

    credentials = {"username": "inactiveuser", "password": "testpassword"}
    login_response = await client.post("api/v1/login", data=credentials)
    assert login_response.status_code == 401
    assert login_response.json()["detail"] == "Inactive user"
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
def test_add_user(test_user):
    """The ``test_user`` fixture exposes the username ``testuser``."""
    created_username = test_user["username"]
    assert created_username == "testuser"
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_read_all_users(client: AsyncClient, super_user_headers):
    """With superuser headers, the user listing returns 200 and a ``users`` list."""
    listing = await client.get("api/v1/users/", headers=super_user_headers)
    assert listing.status_code == 200, listing.json()
    users = listing.json()["users"]
    assert isinstance(users, list)
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_normal_user_cant_read_all_users(client: AsyncClient, logged_in_headers):
    """With ``logged_in_headers``, the user listing is refused with 403."""
    expected_body = {"detail": "The user doesn't have enough privileges"}
    listing = await client.get("api/v1/users/", headers=logged_in_headers)
    assert listing.status_code == 403, listing.json()
    assert listing.json() == expected_body
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_patch_user(client: AsyncClient, active_user, logged_in_headers):
    """A user can patch their own username and then their profile image."""
    user_id = active_user.id

    # First patch: change the username.
    rename_payload = UserUpdate(username="newname").model_dump()
    rename_response = await client.patch(
        f"/api/v1/users/{user_id}",
        json=rename_payload,
        headers=logged_in_headers,
    )
    assert rename_response.status_code == 200, rename_response.json()

    # Second patch: change the profile image.
    image_payload = UserUpdate(profile_image="new_image").model_dump()
    image_response = await client.patch(
        f"/api/v1/users/{user_id}",
        json=image_payload,
        headers=logged_in_headers,
    )
    assert image_response.status_code == 200, image_response.json()
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_patch_reset_password(client: AsyncClient, active_user, logged_in_headers):
    """After a password reset, the user can log in with the new password."""
    reset_url = f"/api/v1/users/{active_user.id}/reset-password"
    payload = UserUpdate(password="newpassword").model_dump()

    reset_response = await client.patch(reset_url, json=payload, headers=logged_in_headers)
    assert reset_response.status_code == 200, reset_response.json()

    # Logging in with the new password must succeed.
    credentials = {"username": active_user.username, "password": "newpassword"}
    login_response = await client.post("api/v1/login", data=credentials)
    assert login_response.status_code == 200
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
@pytest.mark.usefixtures("active_user")
async def test_patch_user_wrong_id(client: AsyncClient, logged_in_headers):
    """Patching with the id ``wrong_id`` yields a 422 ``uuid_parsing`` error."""
    bad_id = "wrong_id"
    payload = UserUpdate(username="newname").model_dump()

    response = await client.patch(f"/api/v1/users/{bad_id}", json=payload, headers=logged_in_headers)
    assert response.status_code == 422, response.json()

    # Only the first reported error is inspected.
    first_error = response.json()["detail"][0]
    assert first_error["loc"] == ["path", "user_id"]
    assert first_error["type"] == "uuid_parsing"
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_delete_user(client: AsyncClient, test_user, super_user_headers):
    """Deleting a user with superuser headers returns 200 and "User deleted"."""
    target_url = f"/api/v1/users/{test_user['id']}"
    deletion = await client.delete(target_url, headers=super_user_headers)
    assert deletion.status_code == 200
    assert deletion.json() == {"detail": "User deleted"}
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
@pytest.mark.usefixtures("test_user")
async def test_delete_user_wrong_id(client: AsyncClient, super_user_headers):
    """Deleting with the id ``wrong_id`` yields a 422 ``uuid_parsing`` error."""
    bad_id = "wrong_id"
    deletion = await client.delete(f"/api/v1/users/{bad_id}", headers=super_user_headers)
    assert deletion.status_code == 422

    # Only the first reported error is inspected.
    first_error = deletion.json()["detail"][0]
    assert first_error["loc"] == ["path", "user_id"]
    assert first_error["type"] == "uuid_parsing"
|
|
|
|
|
|
|
|
@pytest.mark.api_key_required
async def test_normal_user_cant_delete_user(client: AsyncClient, test_user, logged_in_headers):
    """With ``logged_in_headers``, deleting a user is refused with 403."""
    expected_body = {"detail": "The user doesn't have enough privileges"}
    target_url = f"/api/v1/users/{test_user['id']}"
    deletion = await client.delete(target_url, headers=logged_in_headers)
    assert deletion.status_code == 403
    assert deletion.json() == expected_body
|
|
|