# File: personadesk-api/tests/test_invitations.py
# Commit 45fcd6e ("Initial PersonaDesk API push to Space"); page avatar: Legal-i
"""Multi-member orgs via invitations — full lifecycle.
Covers:
* happy path: owner creates invitation → second user signs up → accepts
* token storage: raw token returned ONLY at creation (listing redacts)
* single-use: same token can't be accepted twice
* expiry: backdated invitation returns 410
* revoke: owner revokes → accept attempt 410
* role gate: creator-role caller cannot create / list / revoke
* cross-org: owner of org A cannot manage org B's invitations
* duplicate membership: existing member trying to accept → 400
"""
from datetime import UTC, datetime, timedelta
from app.db import SessionLocal
from app.models import Invitation, Membership
from tests._helpers import auth_headers, signup
def _create_invitation(client, token: str, org_id: str, *, email="invited@x.io", role="creator"):
    """POST a new invitation for ``org_id`` and return the decoded JSON body.

    The request is authenticated with ``token``. The helper asserts a 201
    response (showing the response text on failure) so tests that use it
    stop at the setup step when creation does not succeed.
    """
    url = f"/organizations/{org_id}/invitations"
    payload = {"email": email, "role": role}
    response = client.post(url, json=payload, headers=auth_headers(token))
    assert response.status_code == 201, response.text
    return response.json()
def _me_org_id(client, token: str) -> str:
    """Return the organization id that ``/auth/me`` reports for ``token``."""
    me_response = client.get("/auth/me", headers=auth_headers(token))
    profile = me_response.json()
    return profile["organization"]["id"]
# ─── happy path ─────────────────────────────────────────────────────────────
def test_create_returns_token_once(client, fake_storage, no_redis):
    """Creation returns the raw token; the listed row carries no token."""
    owner = signup(client, "owner1@example.com", org="InvOrg1")
    org = _me_org_id(client, owner)
    created = _create_invitation(client, owner, org)

    assert created["token"], "raw token should be returned at creation"
    assert len(created["token"]) >= 16

    # The listing must contain this invitation with its token redacted.
    listing = client.get(
        f"/organizations/{org}/invitations", headers=auth_headers(owner)
    ).json()
    found_redacted = False
    for entry in listing:
        if entry["id"] == created["id"] and entry.get("token") is None:
            found_redacted = True
            break
    assert found_redacted
def test_accept_creates_membership(client, fake_storage, no_redis):
    """Accepting a valid invitation adds the invitee to the inviting org.

    Checks the HTTP response (200, body names the inviting org) and the
    persisted ``Membership`` row, which must carry the invited role.
    """
    owner_token = signup(client, "owner2@example.com", org="InvOrg2")
    org_id = _me_org_id(client, owner_token)
    inv = _create_invitation(client, owner_token, org_id, email="invitee2@x.io")
    # Invitee signs up separately (different org)
    invitee_token = signup(client, "invitee2@x.io", org="InviteeOwnOrg")
    r = client.post(
        "/invitations/accept",
        json={"token": inv["token"]},
        headers=auth_headers(invitee_token),
    )
    # Include the body so a failure explains itself, like _create_invitation.
    assert r.status_code == 200, r.text
    assert r.json()["name"] == "InvOrg2"
    # Membership now exists with the requested role
    db = SessionLocal()
    try:
        from app.models import User

        invitee = db.query(User).filter(User.email == "invitee2@x.io").first()
        # Guard the lookup: without it a missing user surfaces as an
        # AttributeError on `invitee.id` instead of a readable failure.
        assert invitee is not None, "invitee user row not found"
        m = (
            db.query(Membership)
            .filter(
                Membership.user_id == invitee.id,
                Membership.organization_id == org_id,
            )
            .first()
        )
        assert m is not None, "no membership row for invitee in inviting org"
        # "creator" is the default role sent by _create_invitation.
        assert m.role == "creator"
    finally:
        db.close()
def test_owner_membership_list_shows_both_members(client, fake_storage, no_redis):
    """After an accepted invitation the org's member list holds both users."""
    owner_token = signup(client, "owner3@example.com", org="InvOrg3")
    org_id = _me_org_id(client, owner_token)
    inv = _create_invitation(client, owner_token, org_id, email="invitee3@x.io")
    invitee_token = signup(client, "invitee3@x.io", org="UnusedOrg3")
    accepted = client.post(
        "/invitations/accept",
        json={"token": inv["token"]},
        headers=auth_headers(invitee_token),
    )
    # Verify the setup step: previously a failed accept only showed up later
    # as a confusing mismatch in the email-set comparison below.
    assert accepted.status_code == 200, accepted.text
    rows = client.get(
        f"/organizations/{org_id}/memberships", headers=auth_headers(owner_token)
    ).json()
    emails = {row["user_email"] for row in rows}
    assert emails == {"owner3@example.com", "invitee3@x.io"}
# ─── failure paths ──────────────────────────────────────────────────────────
def test_accept_single_use(client, fake_storage, no_redis):
    """A token that was already accepted is answered with 410 on reuse."""
    owner = signup(client, "owner4@example.com", org="InvOrg4")
    org = _me_org_id(client, owner)
    invitation = _create_invitation(client, owner, org, email="single@x.io")

    # First use, by the invited user, goes through.
    invitee = signup(client, "single@x.io", org="SingleOwn")
    first_attempt = client.post(
        "/invitations/accept",
        json={"token": invitation["token"]},
        headers=auth_headers(invitee),
    )
    assert first_attempt.status_code == 200

    # A different user replaying the same token is turned away.
    other = signup(client, "other@x.io", org="OtherOwn")
    second_attempt = client.post(
        "/invitations/accept",
        json={"token": invitation["token"]},
        headers=auth_headers(other),
    )
    assert second_attempt.status_code == 410
def test_accept_expired(client, fake_storage, no_redis):
    """An invitation whose expiry lies in the past is answered with 410."""
    owner = signup(client, "owner5@example.com", org="InvOrg5")
    org = _me_org_id(client, owner)
    invitation = _create_invitation(client, owner, org, email="expired@x.io")

    # Move the stored expiry one hour into the past, directly in the database.
    session = SessionLocal()
    try:
        stored = session.get(Invitation, invitation["id"])
        an_hour = timedelta(hours=1)
        stored.expires_at = datetime.now(UTC) - an_hour
        session.commit()
    finally:
        session.close()

    invitee = signup(client, "expired@x.io", org="ExpiredOwn")
    response = client.post(
        "/invitations/accept",
        json={"token": invitation["token"]},
        headers=auth_headers(invitee),
    )
    assert response.status_code == 410
def test_revoke_then_accept(client, fake_storage, no_redis):
    """Once the owner revokes an invitation, accepting it yields 410."""
    owner = signup(client, "owner6@example.com", org="InvOrg6")
    org = _me_org_id(client, owner)
    invitation = _create_invitation(client, owner, org, email="revoked@x.io")

    # Owner revokes the pending invitation.
    invitation_id = invitation["id"]
    revoke_response = client.delete(
        f"/organizations/{org}/invitations/{invitation_id}",
        headers=auth_headers(owner),
    )
    assert revoke_response.status_code == 204

    # The invitee signs up afterwards and tries the revoked token.
    invitee = signup(client, "revoked@x.io", org="RevokedOwn")
    accept_response = client.post(
        "/invitations/accept",
        json={"token": invitation["token"]},
        headers=auth_headers(invitee),
    )
    assert accept_response.status_code == 410
def test_invalid_token_400(client, fake_storage, no_redis):
    """Accepting a token that was never issued is answered with 400."""
    caller = signup(client, "probe@example.com")
    bogus_payload = {"token": "no-such-token-anywhere-1234567890"}
    response = client.post(
        "/invitations/accept",
        json=bogus_payload,
        headers=auth_headers(caller),
    )
    assert response.status_code == 400
def test_duplicate_membership_400(client, fake_storage, no_redis):
    """Accepting an invitation to an org you already belong to yields 400."""
    owner = signup(client, "owner7@example.com", org="InvOrg7")
    org = _me_org_id(client, owner)

    # The owner invites their own address: an odd but possible edge case.
    self_invite = _create_invitation(client, owner, org, email="owner7@example.com")
    accept_payload = {"token": self_invite["token"]}
    response = client.post(
        "/invitations/accept",
        json=accept_payload,
        headers=auth_headers(owner),
    )
    assert response.status_code == 400
# ─── role + cross-org gates ─────────────────────────────────────────────────
def test_creator_role_cannot_invite(client, fake_storage, no_redis):
    """A user whose membership.role == 'creator' is 403'd from invite endpoints."""
    owner_token = signup(client, "owner8@example.com", org="InvOrg8")
    org_id = _me_org_id(client, owner_token)
    inv = _create_invitation(client, owner_token, org_id, email="creator8@x.io")
    creator_token = signup(client, "creator8@x.io", org="CreatorOwn8")
    accepted = client.post(
        "/invitations/accept",
        json={"token": inv["token"]},
        headers=auth_headers(creator_token),
    )
    # Verify the precondition: without this check the 403 below would also
    # pass when the user never became a creator-role member of the org.
    assert accepted.status_code == 200, accepted.text
    # When creator_token resolves /auth/me, their *default* org is the one
    # they signed up with ("CreatorOwn8"): `get_tenant_context` returns the
    # FIRST membership (created_at asc), which is CreatorOwn8. They are the
    # owner of THAT, so we test against the invited org explicitly.
    r = client.post(
        f"/organizations/{org_id}/invitations",
        json={"email": "x@x.io"},
        headers=auth_headers(creator_token),
    )
    # _require_org_owner: ctx.organization_id != organization_id → 403
    # NOTE(review): per the note above, the 403 appears to come from the org
    # mismatch rather than from the creator role itself. Confirm whether the
    # role gate needs a dedicated test.
    assert r.status_code == 403
def test_owner_of_org_a_cannot_invite_to_org_b(client, fake_storage, no_redis):
    """The owner of one org gets 403 when inviting into another owner's org."""
    alice = signup(client, "alice-inv@example.com", org="InvA")
    bob = signup(client, "bob-inv@example.com", org="InvB")
    bobs_org = _me_org_id(client, bob)

    # Alice authenticates as herself but targets Bob's organization.
    target_url = f"/organizations/{bobs_org}/invitations"
    response = client.post(
        target_url,
        json={"email": "x@x.io"},
        headers=auth_headers(alice),
    )
    assert response.status_code == 403