# ppt-web/tests/test_api_key_auth.py
# 26fwyzpz6f-max
# Clean deploy without binary files
# 6aecb2e
# Raw
# History Blame Contribute Delete
# 7.87 kB
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from types import SimpleNamespace
import pytest
class _HeaderMap(dict):
def __init__(self, values: dict | None = None):
super().__init__()
for key, value in (values or {}).items():
self[str(key).lower()] = str(value)
def get(self, key, default=None):
return super().get(str(key).lower(), default)
class _FakeRequest:
    """Stand-in request object exposing ``headers``, ``cookies`` and ``state``."""

    def __init__(self, headers: dict | None = None, cookies: dict | None = None):
        """Build the fake request.

        Args:
            headers: Optional header mapping; wrapped in ``_HeaderMap``.
            cookies: Optional cookie mapping; copied into a plain ``dict``.
        """
        self.headers = _HeaderMap(headers)
        # Copy so later changes to the fake never touch the caller's dict.
        self.cookies = dict(cookies or {})
        # Starts empty; attributes can be attached to it freely.
        self.state = SimpleNamespace()
def _build_request(headers: dict | None = None, cookies: dict | None = None):
    """Return a ``_FakeRequest`` built from the given headers and cookies."""
    return _FakeRequest(headers=headers, cookies=cookies)
def _create_db():
    """Create an in-memory SQLite database and return an open session on it.

    Only the ``User``, ``UserSession`` and ``UserAPIKey`` tables are created.
    The caller is responsible for closing the returned session.
    """
    from landppt.database.models import Base, User, UserSession, UserAPIKey

    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(
        engine,
        tables=[User.__table__, UserSession.__table__, UserAPIKey.__table__],
    )
    SessionLocal = sessionmaker(
        bind=engine,
        autocommit=False,
        autoflush=False,
        expire_on_commit=False,
    )
    return SessionLocal()
def _create_user(db, username: str, email: str):
    """Insert an active, non-admin user with password ``"pw"`` and return it.

    Args:
        db: Open session the user is added to and committed on.
        username: Value for the user's ``username`` field.
        email: Value for the user's ``email`` field.

    Returns:
        The committed ``User`` instance, refreshed from the database.
    """
    from landppt.database.models import User

    user = User(
        username=username,
        email=email,
        is_admin=False,
        is_active=True,
        credits_balance=0,
    )
    user.set_password("pw")
    db.add(user)
    db.commit()
    db.refresh(user)
    return user
def test_auth_service_supports_single_api_key(monkeypatch):
    """A single configured ``api_key`` resolves to the ``api_key_user`` account.

    With ``api_keys`` unset, the matching key must return the ``admin`` user
    and any other key must return ``None``.
    """
    from landppt.auth.auth_service import AuthService
    from landppt.core.config import app_config

    db = _create_db()
    try:
        admin = _create_user(db, "admin", "admin@example.com")
        auth = AuthService()

        monkeypatch.setattr(app_config, "api_key", "n8n-single-key")
        monkeypatch.setattr(app_config, "api_key_user", "admin")
        monkeypatch.setattr(app_config, "api_keys", None)

        resolved = auth.get_user_by_api_key(db, "n8n-single-key")
        assert resolved is not None
        assert resolved.id == admin.id
        assert auth.get_user_by_api_key(db, "wrong-key") is None
    finally:
        db.close()
def test_auth_service_supports_multiple_api_key_bindings(monkeypatch):
    """Each ``user:key`` entry in ``api_keys`` resolves to its bound user.

    The configured list also contains a bare key with no user prefix, which
    is expected to resolve to the ``api_key_user`` account.
    """
    from landppt.auth.auth_service import AuthService
    from landppt.core.config import app_config

    db = _create_db()
    try:
        admin = _create_user(db, "admin", "admin@example.com")
        alice = _create_user(db, "alice", "alice@example.com")
        auth = AuthService()

        monkeypatch.setattr(app_config, "api_key", None)
        monkeypatch.setattr(app_config, "api_key_user", "admin")
        monkeypatch.setattr(app_config, "api_keys", "alice:key-a,admin:key-admin,key-default")

        resolved_alice = auth.get_user_by_api_key(db, "key-a")
        assert resolved_alice is not None
        assert resolved_alice.id == alice.id

        resolved_admin = auth.get_user_by_api_key(db, "key-admin")
        assert resolved_admin is not None
        assert resolved_admin.id == admin.id

        # Key without user binding falls back to LANDPPT_API_KEY_USER
        resolved_default = auth.get_user_by_api_key(db, "key-default")
        assert resolved_default is not None
        assert resolved_default.id == admin.id
    finally:
        db.close()
def test_get_current_user_optional_reads_api_key_header(monkeypatch):
    """An ``x-api-key`` header carrying the configured key authenticates the user.

    Also checks that a ``user`` attribute has been set on ``request.state``
    after the call. Skipped when ``fastapi`` is not installed.
    """
    pytest.importorskip("fastapi")
    from landppt.auth.middleware import get_current_user_optional
    from landppt.core.config import app_config

    db = _create_db()
    try:
        admin = _create_user(db, "admin", "admin@example.com")

        monkeypatch.setattr(app_config, "api_key", "n8n-header-key")
        monkeypatch.setattr(app_config, "api_key_user", "admin")
        monkeypatch.setattr(app_config, "api_keys", None)

        request = _build_request(headers={"x-api-key": "n8n-header-key"})
        resolved = get_current_user_optional(request, db)

        assert resolved is not None
        assert resolved.id == admin.id
        assert getattr(request.state, "user", None) is not None
    finally:
        db.close()
def test_get_current_user_optional_ignores_x_session_id_when_disabled(monkeypatch):
    """A valid ``x-session-id`` header is ignored when header session auth is off.

    With ``allow_header_session_auth`` set to ``False`` the call must return
    ``None`` even though the session id belongs to a real session.
    Skipped when ``fastapi`` is not installed.
    """
    pytest.importorskip("fastapi")
    from landppt.auth.auth_service import AuthService
    from landppt.auth.middleware import get_current_user_optional
    from landppt.core.config import app_config

    db = _create_db()
    try:
        admin = _create_user(db, "admin", "admin@example.com")
        auth = AuthService()
        session_id = auth.create_session(db, admin)

        monkeypatch.setattr(app_config, "allow_header_session_auth", False)

        request = _build_request(headers={"x-session-id": session_id})
        resolved = get_current_user_optional(request, db)

        assert resolved is None
    finally:
        db.close()
def test_get_current_user_optional_reads_x_session_id(monkeypatch):
    """A valid ``x-session-id`` header authenticates when header session auth is on.

    With ``allow_header_session_auth`` set to ``True`` the call must return
    the user that owns the session. Skipped when ``fastapi`` is not installed.
    """
    pytest.importorskip("fastapi")
    from landppt.auth.auth_service import AuthService
    from landppt.auth.middleware import get_current_user_optional
    from landppt.core.config import app_config

    db = _create_db()
    try:
        admin = _create_user(db, "admin", "admin@example.com")
        auth = AuthService()
        session_id = auth.create_session(db, admin)

        monkeypatch.setattr(app_config, "allow_header_session_auth", True)

        request = _build_request(headers={"x-session-id": session_id})
        resolved = get_current_user_optional(request, db)

        assert resolved is not None
        assert resolved.id == admin.id
    finally:
        db.close()
def test_auth_service_supports_user_managed_api_key(monkeypatch):
    """A key created through ``create_or_update_user_api_key`` resolves to its owner.

    The config-level ``api_key`` and ``api_keys`` are both unset, so the
    lookup cannot be satisfied by those settings.
    """
    from landppt.auth.auth_service import AuthService
    from landppt.core.config import app_config

    db = _create_db()
    try:
        user = _create_user(db, "bob", "bob@example.com")
        auth = AuthService()

        monkeypatch.setattr(app_config, "api_key", None)
        monkeypatch.setattr(app_config, "api_key_user", "admin")
        monkeypatch.setattr(app_config, "api_keys", None)

        _, plaintext = auth.create_or_update_user_api_key(
            db=db,
            user=user,
            key_name="n8n",
            raw_api_key="bob-n8n-api-key-0001",
        )

        resolved = auth.get_user_by_api_key(db, plaintext)
        assert resolved is not None
        assert resolved.id == user.id
    finally:
        db.close()
def test_user_managed_api_key_rotation_and_revoke(monkeypatch):
    """Rotating a named key replaces it in place; revoking it disables it.

    Creating a key twice under the same ``key_name`` must reuse the same
    record id, stop the first key from resolving and make the second key
    resolve. After ``revoke_user_api_key`` returns ``True`` the second key
    must no longer resolve either.
    """
    from landppt.auth.auth_service import AuthService
    from landppt.core.config import app_config

    db = _create_db()
    try:
        user = _create_user(db, "carol", "carol@example.com")
        auth = AuthService()

        monkeypatch.setattr(app_config, "api_key", None)
        monkeypatch.setattr(app_config, "api_key_user", "admin")
        monkeypatch.setattr(app_config, "api_keys", None)

        first_record, first_key = auth.create_or_update_user_api_key(
            db=db,
            user=user,
            key_name="default",
            raw_api_key="carol-initial-api-key-0001",
        )
        assert auth.get_user_by_api_key(db, first_key) is not None

        second_record, second_key = auth.create_or_update_user_api_key(
            db=db,
            user=user,
            key_name="default",
            raw_api_key="carol-rotated-api-key-0002",
        )
        assert first_record.id == second_record.id
        assert auth.get_user_by_api_key(db, first_key) is None
        assert auth.get_user_by_api_key(db, second_key) is not None

        revoked = auth.revoke_user_api_key(db=db, user_id=user.id, key_id=second_record.id)
        assert revoked is True
        assert auth.get_user_by_api_key(db, second_key) is None
    finally:
        db.close()