""" Tests for Stage 80 — security headers + CORS (production BLOCKER 3/5). Coverage plan: 1. SecurityConfig parsing — env reading + fail-fast on garbage 2. always_on_headers — content + HSTS opt-in 3. Integration: headers present on EVERY response shape (200, 401, 403, 404, 422, 429, /metrics text response) 4. HSTS: opt-in semantics, configurable max-age 5. CORS: allowed origin gets preflight + actual response headers; denied origin gets neither 6. CORS unset → no middleware mounted (denies cross-origin by default — browser-side same-origin enforcement) """ import pytest pytest.importorskip("fastapi") pytest.importorskip("httpx") from fastapi.testclient import TestClient from infra import OrgStateService from infra.api import create_app from infra.api.ratelimit import RateLimiter from infra.api.security import ( SecurityConfig, always_on_headers, cors_origins_from_env, ) def _bootstrap(tmp_path): dbfile = str(tmp_path / "sec.sqlite3") svc = OrgStateService(dbfile) try: svc.register_tenant("acme", "ACME") keys = { "acme_op": svc.create_api_key("acme", role="operator").raw, } finally: svc.close() return dbfile, keys def _auth(k): return {"Authorization": f"Bearer {k}"} # ========================================================= # SecurityConfig — env parsing # ========================================================= def test_default_config_hsts_off_no_cors(monkeypatch): """Bare environment → safe defaults: HSTS off (bricks deploys without TLS), no CORS allowlist.""" monkeypatch.delenv("ORGSTATE_HSTS_ENABLED", raising=False) monkeypatch.delenv("ORGSTATE_HSTS_MAX_AGE", raising=False) monkeypatch.delenv("ORGSTATE_CORS_ORIGINS", raising=False) cfg = SecurityConfig.from_env() assert cfg.hsts_enabled is False assert cfg.hsts_max_age == 31_536_000 assert cfg.cors_origins == () def test_hsts_enable_truthy_spellings(monkeypatch): """1 / true / yes / on all enable; anything else disables.""" for truthy in ("1", "true", "yes", "on", "TRUE", "Yes"): monkeypatch.setenv("ORGSTATE_HSTS_ENABLED", truthy) assert SecurityConfig.from_env().hsts_enabled is True for falsy in ("0", "false", "no", "off", "", "maybe"): monkeypatch.setenv("ORGSTATE_HSTS_ENABLED", falsy) assert SecurityConfig.from_env().hsts_enabled is False def test_hsts_max_age_garbage_raises(monkeypatch): monkeypatch.setenv("ORGSTATE_HSTS_MAX_AGE", "forever") with pytest.raises(ValueError, match="must be an integer"): SecurityConfig.from_env() def test_hsts_max_age_negative_raises(monkeypatch): monkeypatch.setenv("ORGSTATE_HSTS_MAX_AGE", "-1") with pytest.raises(ValueError, match="must be >= 0"): SecurityConfig.from_env() def test_cors_origins_parsed_and_deduped(monkeypatch): monkeypatch.setenv("ORGSTATE_CORS_ORIGINS", "https://a.com, https://b.com,https://a.com") cfg = SecurityConfig.from_env() assert cfg.cors_origins == ("https://a.com", "https://b.com") def test_cors_origins_empty_means_no_allowlist(monkeypatch): monkeypatch.delenv("ORGSTATE_CORS_ORIGINS", raising=False) assert cors_origins_from_env() == () monkeypatch.setenv("ORGSTATE_CORS_ORIGINS", " , ") assert cors_origins_from_env() == () # ========================================================= # always_on_headers — content # ========================================================= def test_always_on_includes_baseline_set(): h = always_on_headers(SecurityConfig()) assert h["X-Content-Type-Options"] == "nosniff" assert h["X-Frame-Options"] == "DENY" assert "strict-origin" in h["Referrer-Policy"] assert "Permissions-Policy" in h def test_always_on_no_hsts_when_disabled(): h = always_on_headers(SecurityConfig(hsts_enabled=False)) assert "Strict-Transport-Security" not in h def test_always_on_includes_hsts_when_enabled(): h = always_on_headers(SecurityConfig(hsts_enabled=True, hsts_max_age=600)) assert h["Strict-Transport-Security"] == \ "max-age=600; includeSubDomains" # ========================================================= # Integration — headers on every response # ========================================================= def test_headers_present_on_200(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = TestClient(create_app(dbfile)) r = client.get("/tenants/acme", headers=_auth(keys["acme_op"])) assert r.status_code == 200 assert r.headers["X-Content-Type-Options"] == "nosniff" assert r.headers["X-Frame-Options"] == "DENY" assert "strict-origin" in r.headers["Referrer-Policy"] def test_headers_present_on_health(tmp_path): """Open routes get hardened too — uptime monitors don't care about referrer policy, but a hostile cross-origin embed would.""" dbfile, _ = _bootstrap(tmp_path) client = TestClient(create_app(dbfile)) r = client.get("/health") assert r.status_code == 200 assert r.headers["X-Content-Type-Options"] == "nosniff" def test_headers_present_on_metrics(tmp_path): """text/plain Prometheus response also gets hardened.""" dbfile, _ = _bootstrap(tmp_path) client = TestClient(create_app(dbfile)) r = client.get("/metrics") assert r.status_code == 200 assert r.headers["X-Content-Type-Options"] == "nosniff" def test_headers_present_on_401(tmp_path): """Auth failure response still gets hardened — defense in depth against responses that leak data via missing headers.""" dbfile, _ = _bootstrap(tmp_path) client = TestClient(create_app(dbfile)) r = client.get("/tenants/acme") # no Authorization assert r.status_code == 401 assert r.headers["X-Content-Type-Options"] == "nosniff" def test_headers_present_on_422_pydantic(tmp_path): """Stage 78 422 responses also get the security headers.""" dbfile, keys = _bootstrap(tmp_path) client = TestClient(create_app(dbfile)) r = client.post("/tenants/acme/webhooks", headers=_auth(keys["acme_op"]), json={}) assert r.status_code == 422 assert r.headers["X-Content-Type-Options"] == "nosniff" def test_headers_present_on_429_rate_limited(tmp_path): """Rate-limited responses ALSO get security headers — the middleware order in app.py ensures security_headers wraps rate_limit (security is outermost).""" dbfile, keys = _bootstrap(tmp_path) limiter = RateLimiter(per_minute_keyed=6, per_minute_anon=0) app = create_app(dbfile, rate_limiter=limiter) client = TestClient(app) # drain for _ in range(12): client.get("/tenants/acme", headers=_auth(keys["acme_op"])) r = client.get("/tenants/acme", headers=_auth(keys["acme_op"])) assert r.status_code == 429 assert r.headers["X-Content-Type-Options"] == "nosniff" assert r.headers["X-Frame-Options"] == "DENY" def test_no_hsts_header_by_default(tmp_path): """The most important behavior of HSTS: it's OFF unless explicitly enabled. A misconfigured TLS terminator + HSTS on bricks the deployment for max-age (default 1 year).""" dbfile, _ = _bootstrap(tmp_path) client = TestClient(create_app(dbfile)) r = client.get("/health") assert "Strict-Transport-Security" not in r.headers def test_hsts_header_when_enabled(tmp_path): dbfile, _ = _bootstrap(tmp_path) sec = SecurityConfig(hsts_enabled=True, hsts_max_age=86400) client = TestClient(create_app(dbfile, security_config=sec)) r = client.get("/health") assert "max-age=86400" in r.headers["Strict-Transport-Security"] assert "includeSubDomains" in r.headers["Strict-Transport-Security"] # ========================================================= # CORS — preflight + actual request # ========================================================= def test_cors_no_origins_no_middleware(tmp_path): """Empty allowlist → CORSMiddleware NOT mounted → no Access-Control-* headers in response. Browser will block cross-origin requests (its same-origin policy).""" dbfile, _ = _bootstrap(tmp_path) client = TestClient(create_app(dbfile)) r = client.options( "/health", headers={ "Origin": "https://customer.example", "Access-Control-Request-Method": "GET", }, ) # without CORS, OPTIONS may 405 or behave generically assert "access-control-allow-origin" not in { k.lower() for k in r.headers } def test_cors_preflight_allowed_origin(tmp_path): dbfile, _ = _bootstrap(tmp_path) sec = SecurityConfig(cors_origins=("https://customer.example",)) client = TestClient(create_app(dbfile, security_config=sec)) r = client.options( "/health", headers={ "Origin": "https://customer.example", "Access-Control-Request-Method": "GET", "Access-Control-Request-Headers": "Authorization", }, ) # CORS preflight responds with the matching origin assert r.headers["access-control-allow-origin"] == \ "https://customer.example" methods = r.headers.get("access-control-allow-methods", "") assert "GET" in methods def test_cors_blocks_disallowed_origin(tmp_path): """Browser-side enforcement: server must NOT echo the Origin back when it isn't in the allowlist. Without that echo, the browser blocks the response from JavaScript.""" dbfile, _ = _bootstrap(tmp_path) sec = SecurityConfig(cors_origins=("https://customer.example",)) client = TestClient(create_app(dbfile, security_config=sec)) r = client.options( "/health", headers={ "Origin": "https://evil.example", "Access-Control-Request-Method": "GET", }, ) # the server must not endorse this origin aco = r.headers.get("access-control-allow-origin", "") assert aco != "https://evil.example" def test_cors_actual_request_includes_acao(tmp_path): """Allowed origin's normal GET (not preflight) gets the Access-Control-Allow-Origin header on the response.""" dbfile, _ = _bootstrap(tmp_path) sec = SecurityConfig(cors_origins=("https://customer.example",)) client = TestClient(create_app(dbfile, security_config=sec)) r = client.get( "/health", headers={"Origin": "https://customer.example"}, ) assert r.status_code == 200 assert r.headers["access-control-allow-origin"] == \ "https://customer.example" def test_security_headers_present_even_with_cors(tmp_path): """CORS middleware shouldn't strip our security headers — both must coexist.""" dbfile, _ = _bootstrap(tmp_path) sec = SecurityConfig( cors_origins=("https://customer.example",), hsts_enabled=True, ) client = TestClient(create_app(dbfile, security_config=sec)) r = client.get( "/health", headers={"Origin": "https://customer.example"}, ) assert r.headers["X-Content-Type-Options"] == "nosniff" assert "Strict-Transport-Security" in r.headers assert r.headers["access-control-allow-origin"] == \ "https://customer.example" def test_cors_multiple_origins_allowlist(tmp_path): """Two origins in the allowlist; each gets its own ACAO echo.""" dbfile, _ = _bootstrap(tmp_path) sec = SecurityConfig(cors_origins=("https://a.com", "https://b.com")) client = TestClient(create_app(dbfile, security_config=sec)) for origin in ("https://a.com", "https://b.com"): r = client.get("/health", headers={"Origin": origin}) assert r.headers["access-control-allow-origin"] == origin # but not c r = client.get("/health", headers={"Origin": "https://c.com"}) assert r.headers.get("access-control-allow-origin", "") != "https://c.com"