"""Authentication contract tests."""

import os
import sys
from unittest.mock import MagicMock

# Environment defaults are set before `main` is imported below. `setdefault`
# leaves any value already present in the caller's environment untouched.
os.environ.setdefault("CEPHEUS_CLOUD", "1")
os.environ.setdefault("CEPHEUS_API_KEY", "test-key")
os.environ.setdefault("CEPHEUS_AUTH_DEV_MODE", "1")

# Register MagicMock stand-ins for the google.adk modules (unless something is
# already registered) so a later import of these names resolves to a mock.
# NOTE(review): presumably `main` imports google.adk -- confirm.
for mod in (
    "google.adk",
    "google.adk.agents",
    "google.adk.runners",
    "google.adk.sessions",
):
    sys.modules.setdefault(mod, MagicMock())

# Make the directory one level above this file's directory importable so that
# `import main` below can be resolved from there.
BACKEND_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if BACKEND_DIR not in sys.path:
    sys.path.insert(0, BACKEND_DIR)

from fastapi.testclient import TestClient  # noqa: E402

import main  # noqa: E402

client = TestClient(main.app)
API_HEADERS = {"X-API-Key": "test-key"}


def _login(username: str, password: str) -> dict:
    """Log in through POST /auth/login and return the decoded JSON body.

    Fails the calling test with the response status and body when the login
    does not return HTTP 200, instead of surfacing later as a ``KeyError``.
    """
    response = client.post(
        "/auth/login",
        json={"username": username, "password": password},
    )
    assert response.status_code == 200, (
        f"login as {username!r} failed: {response.status_code} {response.text}"
    )
    return response.json()


def _bearer_headers(username: str, password: str) -> dict:
    """Return an ``Authorization: Bearer ...`` header dict for the given user."""
    token = _login(username, password)["access_token"]
    return {"Authorization": f"Bearer {token}"}


def test_auth_status_dev_mode():
    """GET /auth/status reports auth enabled and mode "dev"."""
    r = client.get("/auth/status")
    assert r.status_code == 200
    body = r.json()
    assert body["auth_enabled"] is True
    assert body["mode"] == "dev"


def test_login_and_me_jwt():
    """A token from /auth/login is accepted by /auth/me, which reports the role."""
    r = client.post("/auth/login", json={"username": "admin", "password": "admin"})
    assert r.status_code == 200
    tokens = r.json()
    assert tokens.get("access_token")

    bearer = {"Authorization": f"Bearer {tokens['access_token']}"}
    me = client.get("/auth/me", headers=bearer)
    assert me.status_code == 200
    assert me.json()["role"] == "admin"


def test_login_invalid_credentials():
    """A wrong password is rejected with HTTP 401."""
    r = client.post("/auth/login", json={"username": "admin", "password": "wrong"})
    assert r.status_code == 401


def test_refresh_token_rotation():
    """POST /auth/refresh with a login's refresh token yields an access token."""
    login = _login("staff", "staff")
    refreshed = client.post(
        "/auth/refresh",
        json={"refresh_token": login["refresh_token"]},
    )
    assert refreshed.status_code == 200
    assert refreshed.json().get("access_token")


def test_staff_accept_and_complete_request():
    """Walk a staff request from creation through accept to completion.

    An admin creates an issue and raises a staff_request alert for it; a staff
    user then accepts and completes the matching request. The test checks the
    responses at each step, the incident log entry, and the final issue state.
    """
    # Same login order as before: staff first, then admin.
    staff_headers = _bearer_headers("staff", "staff")
    admin_headers = _bearer_headers("admin", "admin")

    issue_response = client.post(
        "/issues",
        headers=admin_headers,
        json={
            "title": "Test corridor smoke",
            "desc": "Smoke test",
            "status": "ONGOING",
            "priority": "medium",
            "staff": 1,
        },
    )
    # Any 2xx is accepted here; the exact success code is not part of this test.
    assert 200 <= issue_response.status_code < 300, (
        f"issue creation failed: {issue_response.status_code} {issue_response.text}"
    )
    issue = issue_response.json()
    issue_id = issue["id"]

    alert = client.post(
        "/alert",
        headers=admin_headers,
        json={
            "type": "staff_request",
            "location": "Corridor B",
            "message": f"Staff needed for {issue['title']}",
            "issue_id": issue_id,
        },
    )
    assert alert.status_code == 200

    pending = client.get("/staff/requests", headers=staff_headers).json()
    # Default of None turns a missing request into a readable assertion failure
    # rather than a bare StopIteration.
    req = next(
        (item for item in pending if item.get("issue_id") == issue_id),
        None,
    )
    assert req is not None, f"no staff request found for issue {issue_id!r}"
    assert req["status"] in ("pending", "sent")

    accepted = client.post(
        f"/staff/requests/{req['id']}/accept",
        headers=staff_headers,
        json={"staff_name": "staff"},
    )
    assert accepted.status_code == 200
    assert accepted.json()["progress"] == 25

    completed = client.post(
        f"/staff/requests/{req['id']}/complete",
        headers=staff_headers,
        json={"notes": "Area cleared"},
    )
    assert completed.status_code == 200
    body = completed.json()
    assert body["status"] == "resolved"
    assert body["progress"] == 100
    assert body["assignments"][0]["stage"] == "resolved"

    # The completion must be recorded in the incident log with the request id.
    logs = client.get("/incident/logs", headers=admin_headers).json()
    assert any(
        entry.get("event_type") == "staff_request_completed"
        and req["id"] in entry.get("msg", "")
        for entry in logs
    )

    issues = client.get("/issues", headers=admin_headers).json()
    updated_issue = next(
        (candidate for candidate in issues if candidate["id"] == issue_id),
        None,
    )
    assert updated_issue is not None, f"issue {issue_id!r} missing from /issues"
    assert updated_issue["status"] == "RESOLVED"
    assert updated_issue["progress"] == 100


def test_public_vision_staff_login_returns_jwt(monkeypatch):
    """HF public vision must still issue JWT for staff portal mutations."""
    monkeypatch.setenv("ALLOW_PUBLIC_VISION", "1")
    monkeypatch.setenv("CEPHEUS_JWT_SECRET", "test-jwt-secret-for-public-vision")
    monkeypatch.setenv("CEPHEUS_AUTH_DEV_MODE", "0")

    r = client.post("/auth/login", json={"username": "staff", "password": "staff"})
    assert r.status_code == 200
    body = r.json()
    assert body.get("access_token")
    assert body.get("token_type") == "bearer"
    assert body.get("user", {}).get("role") == "staff"