Spaces:
Running
Running
"""Multi-member orgs via invitations — full lifecycle.

Covers:
* happy path: owner creates invitation → second user signs up → accepts
* token storage: raw token returned ONLY at creation (listing redacts)
* single-use: same token can't be accepted twice
* expiry: backdated invitation returns 410
* revoke: owner revokes → accept attempt returns 410
* role gate: creator-role caller cannot create / list / revoke
* cross-org: owner of org A cannot manage org B's invitations
* duplicate membership: existing member trying to accept → 400
"""
| from datetime import UTC, datetime, timedelta | |
| from app.db import SessionLocal | |
| from app.models import Invitation, Membership | |
| from tests._helpers import auth_headers, signup | |
def _create_invitation(client, token: str, org_id: str, *, email="invited@x.io", role="creator"):
    """POST a new invitation for ``org_id`` and return the decoded JSON body.

    The request is authenticated with ``token``. The helper asserts that the
    endpoint answers 201 and puts the response text in the failure message.
    """
    payload = {"email": email, "role": role}
    response = client.post(
        f"/organizations/{org_id}/invitations",
        json=payload,
        headers=auth_headers(token),
    )
    assert response.status_code == 201, response.text
    return response.json()
def _me_org_id(client, token: str) -> str:
    """Return the organization id that ``GET /auth/me`` reports for ``token``."""
    me_response = client.get("/auth/me", headers=auth_headers(token))
    profile = me_response.json()
    return profile["organization"]["id"]
# ─── happy path ──────────────────────────────────────────────────────────────
def test_create_returns_token_once(client, fake_storage, no_redis):
    """Creation returns the raw token; the listing shows the invitation without it."""
    owner_token = signup(client, "owner1@example.com", org="InvOrg1")
    org_id = _me_org_id(client, owner_token)

    created = _create_invitation(client, owner_token, org_id)
    raw_token = created["token"]
    assert raw_token, "raw token should be returned at creation"
    assert len(raw_token) >= 16

    # Listing redacts the token
    listing = client.get(
        f"/organizations/{org_id}/invitations",
        headers=auth_headers(owner_token),
    )
    listed_rows = listing.json()
    assert any(
        row["id"] == created["id"] and row.get("token") is None
        for row in listed_rows
    )
def test_accept_creates_membership(client, fake_storage, no_redis):
    """Accepting an invitation creates a membership in the inviting org."""
    invitee_email = "invitee2@x.io"
    owner_token = signup(client, "owner2@example.com", org="InvOrg2")
    org_id = _me_org_id(client, owner_token)
    invitation = _create_invitation(client, owner_token, org_id, email=invitee_email)

    # Invitee signs up separately (different org)
    invitee_token = signup(client, invitee_email, org="InviteeOwnOrg")
    accepted = client.post(
        "/invitations/accept",
        json={"token": invitation["token"]},
        headers=auth_headers(invitee_token),
    )
    assert accepted.status_code == 200
    assert accepted.json()["name"] == "InvOrg2"

    # Membership now exists with the requested role
    session = SessionLocal()
    try:
        from app.models import User

        invitee = session.query(User).filter(User.email == invitee_email).first()
        membership_query = session.query(Membership).filter(
            Membership.user_id == invitee.id,
            Membership.organization_id == org_id,
        )
        membership = membership_query.first()
        assert membership is not None
        # "creator" is the default role sent by _create_invitation.
        assert membership.role == "creator"
    finally:
        session.close()
def test_owner_membership_list_shows_both_members(client, fake_storage, no_redis):
    """After an invitation is accepted, the owner's membership listing has both users."""
    owner_token = signup(client, "owner3@example.com", org="InvOrg3")
    org_id = _me_org_id(client, owner_token)
    inv = _create_invitation(client, owner_token, org_id, email="invitee3@x.io")
    invitee_token = signup(client, "invitee3@x.io", org="UnusedOrg3")

    accepted = client.post(
        "/invitations/accept",
        json={"token": inv["token"]},
        headers=auth_headers(invitee_token),
    )
    # Check the setup step so a broken accept is reported here, with the
    # response body, rather than as a set mismatch further down. This matches
    # test_accept_single_use, which also asserts its first accept.
    assert accepted.status_code == 200, accepted.text

    rows = client.get(
        f"/organizations/{org_id}/memberships",
        headers=auth_headers(owner_token),
    ).json()
    emails = {row["user_email"] for row in rows}
    assert emails == {"owner3@example.com", "invitee3@x.io"}
# ─── failure paths ───────────────────────────────────────────────────────────
def test_accept_single_use(client, fake_storage, no_redis):
    """A token that has been accepted once is rejected with 410 afterwards."""
    owner_token = signup(client, "owner4@example.com", org="InvOrg4")
    org_id = _me_org_id(client, owner_token)
    invitation = _create_invitation(client, owner_token, org_id, email="single@x.io")
    accept_body = {"token": invitation["token"]}

    invitee_token = signup(client, "single@x.io", org="SingleOwn")
    first_attempt = client.post(
        "/invitations/accept",
        json=accept_body,
        headers=auth_headers(invitee_token),
    )
    assert first_attempt.status_code == 200

    # Another user tries the same token
    other_token = signup(client, "other@x.io", org="OtherOwn")
    second_attempt = client.post(
        "/invitations/accept",
        json=accept_body,
        headers=auth_headers(other_token),
    )
    assert second_attempt.status_code == 410
def test_accept_expired(client, fake_storage, no_redis):
    """Accepting an invitation whose expiry lies in the past returns 410."""
    owner_token = signup(client, "owner5@example.com", org="InvOrg5")
    org_id = _me_org_id(client, owner_token)
    invitation = _create_invitation(client, owner_token, org_id, email="expired@x.io")

    # Backdate the expiry past now
    session = SessionLocal()
    try:
        stored = session.get(Invitation, invitation["id"])
        one_hour_ago = datetime.now(UTC) - timedelta(hours=1)
        stored.expires_at = one_hour_ago
        session.commit()
    finally:
        session.close()

    invitee_token = signup(client, "expired@x.io", org="ExpiredOwn")
    attempt = client.post(
        "/invitations/accept",
        json={"token": invitation["token"]},
        headers=auth_headers(invitee_token),
    )
    assert attempt.status_code == 410
def test_revoke_then_accept(client, fake_storage, no_redis):
    """An invitation revoked by the owner can no longer be accepted (410)."""
    owner_token = signup(client, "owner6@example.com", org="InvOrg6")
    org_id = _me_org_id(client, owner_token)
    invitation = _create_invitation(client, owner_token, org_id, email="revoked@x.io")

    revoke_url = f"/organizations/{org_id}/invitations/{invitation['id']}"
    revoked = client.delete(revoke_url, headers=auth_headers(owner_token))
    assert revoked.status_code == 204

    invitee_token = signup(client, "revoked@x.io", org="RevokedOwn")
    attempt = client.post(
        "/invitations/accept",
        json={"token": invitation["token"]},
        headers=auth_headers(invitee_token),
    )
    assert attempt.status_code == 410
def test_invalid_token_400(client, fake_storage, no_redis):
    """Accepting with a token that matches no invitation returns 400."""
    caller_token = signup(client, "probe@example.com")
    bogus_body = {"token": "no-such-token-anywhere-1234567890"}
    response = client.post(
        "/invitations/accept",
        json=bogus_body,
        headers=auth_headers(caller_token),
    )
    assert response.status_code == 400
def test_duplicate_membership_400(client, fake_storage, no_redis):
    """If the invitee is already a member, accept refuses with 400."""
    owner_email = "owner7@example.com"
    owner_token = signup(client, owner_email, org="InvOrg7")
    org_id = _me_org_id(client, owner_token)

    # Owner invites themselves (silly but a real edge case)
    self_invite = _create_invitation(client, owner_token, org_id, email=owner_email)
    response = client.post(
        "/invitations/accept",
        json={"token": self_invite["token"]},
        headers=auth_headers(owner_token),
    )
    assert response.status_code == 400
# ─── role + cross-org gates ──────────────────────────────────────────────────
def test_creator_role_cannot_invite(client, fake_storage, no_redis):
    """A user whose membership.role == 'creator' is 403'd from invite endpoints."""
    owner_token = signup(client, "owner8@example.com", org="InvOrg8")
    org_id = _me_org_id(client, owner_token)
    inv = _create_invitation(client, owner_token, org_id, email="creator8@x.io")
    creator_token = signup(client, "creator8@x.io", org="CreatorOwn8")

    accepted = client.post(
        "/invitations/accept",
        json={"token": inv["token"]},
        headers=auth_headers(creator_token),
    )
    # The creator membership must really exist. Without this check a failed
    # accept would leave the caller a non-member, and the 403 below would
    # still hold, so the test would pass without exercising anything.
    assert accepted.status_code == 200, accepted.text

    # When creator_token resolves /auth/me, their *default* org is the one
    # they signed up with ("CreatorOwn8") — `get_tenant_context` returns
    # the FIRST membership (created_at asc), which is CreatorOwn8. They are
    # the owner of THAT, so we test against the invited org explicitly.
    r = client.post(
        f"/organizations/{org_id}/invitations",
        json={"email": "x@x.io"},
        headers=auth_headers(creator_token),
    )
    # _require_org_owner: ctx.organization_id != organization_id → 403
    # NOTE(review): per the comment above, this 403 appears to come from the
    # org-mismatch check rather than the role check itself — confirm against
    # _require_org_owner whether the creator-role branch is covered.
    assert r.status_code == 403
def test_owner_of_org_a_cannot_invite_to_org_b(client, fake_storage, no_redis):
    """The owner of one org gets 403 when creating an invitation in another org."""
    alice_token = signup(client, "alice-inv@example.com", org="InvA")
    bob_token = signup(client, "bob-inv@example.com", org="InvB")
    bobs_org_id = _me_org_id(client, bob_token)

    # Alice authenticates as herself but targets Bob's organization.
    response = client.post(
        f"/organizations/{bobs_org_id}/invitations",
        json={"email": "x@x.io"},
        headers=auth_headers(alice_token),
    )
    assert response.status_code == 403