| """ |
| Tests for Stage 80 — security headers + CORS (production |
| BLOCKER 3/5). |
| |
| Coverage plan: |
| 1. SecurityConfig parsing — env reading + fail-fast on garbage |
| 2. always_on_headers — content + HSTS opt-in |
| 3. Integration: headers present on EVERY response shape |
| (200, 401, 403, 404, 422, 429, /metrics text response) |
| 4. HSTS: opt-in semantics, configurable max-age |
| 5. CORS: allowed origin gets preflight + actual response headers; |
| denied origin gets neither |
| 6. CORS unset → no middleware mounted (denies cross-origin by |
| default — browser-side same-origin enforcement) |
| """ |
| import pytest |
|
|
| pytest.importorskip("fastapi") |
| pytest.importorskip("httpx") |
| from fastapi.testclient import TestClient |
|
|
| from infra import OrgStateService |
| from infra.api import create_app |
| from infra.api.ratelimit import RateLimiter |
| from infra.api.security import ( |
| SecurityConfig, |
| always_on_headers, |
| cors_origins_from_env, |
| ) |
|
|
|
|
| def _bootstrap(tmp_path): |
| dbfile = str(tmp_path / "sec.sqlite3") |
| svc = OrgStateService(dbfile) |
| try: |
| svc.register_tenant("acme", "ACME") |
| keys = { |
| "acme_op": svc.create_api_key("acme", role="operator").raw, |
| } |
| finally: |
| svc.close() |
| return dbfile, keys |
|
|
|
|
| def _auth(k): |
| return {"Authorization": f"Bearer {k}"} |
|
|
|
|
| |
| |
| |
|
|
| def test_default_config_hsts_off_no_cors(monkeypatch): |
| """Bare environment → safe defaults: HSTS off (bricks |
| deploys without TLS), no CORS allowlist.""" |
| monkeypatch.delenv("ORGSTATE_HSTS_ENABLED", raising=False) |
| monkeypatch.delenv("ORGSTATE_HSTS_MAX_AGE", raising=False) |
| monkeypatch.delenv("ORGSTATE_CORS_ORIGINS", raising=False) |
| cfg = SecurityConfig.from_env() |
| assert cfg.hsts_enabled is False |
| assert cfg.hsts_max_age == 31_536_000 |
| assert cfg.cors_origins == () |
|
|
|
|
| def test_hsts_enable_truthy_spellings(monkeypatch): |
| """1 / true / yes / on all enable; anything else disables.""" |
| for truthy in ("1", "true", "yes", "on", "TRUE", "Yes"): |
| monkeypatch.setenv("ORGSTATE_HSTS_ENABLED", truthy) |
| assert SecurityConfig.from_env().hsts_enabled is True |
| for falsy in ("0", "false", "no", "off", "", "maybe"): |
| monkeypatch.setenv("ORGSTATE_HSTS_ENABLED", falsy) |
| assert SecurityConfig.from_env().hsts_enabled is False |
|
|
|
|
| def test_hsts_max_age_garbage_raises(monkeypatch): |
| monkeypatch.setenv("ORGSTATE_HSTS_MAX_AGE", "forever") |
| with pytest.raises(ValueError, match="must be an integer"): |
| SecurityConfig.from_env() |
|
|
|
|
| def test_hsts_max_age_negative_raises(monkeypatch): |
| monkeypatch.setenv("ORGSTATE_HSTS_MAX_AGE", "-1") |
| with pytest.raises(ValueError, match="must be >= 0"): |
| SecurityConfig.from_env() |
|
|
|
|
| def test_cors_origins_parsed_and_deduped(monkeypatch): |
| monkeypatch.setenv("ORGSTATE_CORS_ORIGINS", |
| "https://a.com, https://b.com,https://a.com") |
| cfg = SecurityConfig.from_env() |
| assert cfg.cors_origins == ("https://a.com", "https://b.com") |
|
|
|
|
| def test_cors_origins_empty_means_no_allowlist(monkeypatch): |
| monkeypatch.delenv("ORGSTATE_CORS_ORIGINS", raising=False) |
| assert cors_origins_from_env() == () |
| monkeypatch.setenv("ORGSTATE_CORS_ORIGINS", " , ") |
| assert cors_origins_from_env() == () |
|
|
|
|
| |
| |
| |
|
|
| def test_always_on_includes_baseline_set(): |
| h = always_on_headers(SecurityConfig()) |
| assert h["X-Content-Type-Options"] == "nosniff" |
| assert h["X-Frame-Options"] == "DENY" |
| assert "strict-origin" in h["Referrer-Policy"] |
| assert "Permissions-Policy" in h |
|
|
|
|
| def test_always_on_no_hsts_when_disabled(): |
| h = always_on_headers(SecurityConfig(hsts_enabled=False)) |
| assert "Strict-Transport-Security" not in h |
|
|
|
|
| def test_always_on_includes_hsts_when_enabled(): |
| h = always_on_headers(SecurityConfig(hsts_enabled=True, hsts_max_age=600)) |
| assert h["Strict-Transport-Security"] == \ |
| "max-age=600; includeSubDomains" |
|
|
|
|
| |
| |
| |
|
|
| def test_headers_present_on_200(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = TestClient(create_app(dbfile)) |
| r = client.get("/tenants/acme", headers=_auth(keys["acme_op"])) |
| assert r.status_code == 200 |
| assert r.headers["X-Content-Type-Options"] == "nosniff" |
| assert r.headers["X-Frame-Options"] == "DENY" |
| assert "strict-origin" in r.headers["Referrer-Policy"] |
|
|
|
|
| def test_headers_present_on_health(tmp_path): |
| """Open routes get hardened too — uptime monitors don't care |
| about referrer policy, but a hostile cross-origin embed would.""" |
| dbfile, _ = _bootstrap(tmp_path) |
| client = TestClient(create_app(dbfile)) |
| r = client.get("/health") |
| assert r.status_code == 200 |
| assert r.headers["X-Content-Type-Options"] == "nosniff" |
|
|
|
|
| def test_headers_present_on_metrics(tmp_path): |
| """text/plain Prometheus response also gets hardened.""" |
| dbfile, _ = _bootstrap(tmp_path) |
| client = TestClient(create_app(dbfile)) |
| r = client.get("/metrics") |
| assert r.status_code == 200 |
| assert r.headers["X-Content-Type-Options"] == "nosniff" |
|
|
|
|
| def test_headers_present_on_401(tmp_path): |
| """Auth failure response still gets hardened — defense in depth |
| against responses that leak data via missing headers.""" |
| dbfile, _ = _bootstrap(tmp_path) |
| client = TestClient(create_app(dbfile)) |
| r = client.get("/tenants/acme") |
| assert r.status_code == 401 |
| assert r.headers["X-Content-Type-Options"] == "nosniff" |
|
|
|
|
| def test_headers_present_on_422_pydantic(tmp_path): |
| """Stage 78 422 responses also get the security headers.""" |
| dbfile, keys = _bootstrap(tmp_path) |
| client = TestClient(create_app(dbfile)) |
| r = client.post("/tenants/acme/webhooks", |
| headers=_auth(keys["acme_op"]), json={}) |
| assert r.status_code == 422 |
| assert r.headers["X-Content-Type-Options"] == "nosniff" |
|
|
|
|
| def test_headers_present_on_429_rate_limited(tmp_path): |
| """Rate-limited responses ALSO get security headers — the |
| middleware order in app.py ensures security_headers wraps |
| rate_limit (security is outermost).""" |
| dbfile, keys = _bootstrap(tmp_path) |
| limiter = RateLimiter(per_minute_keyed=6, per_minute_anon=0) |
| app = create_app(dbfile, rate_limiter=limiter) |
| client = TestClient(app) |
| |
| for _ in range(12): |
| client.get("/tenants/acme", headers=_auth(keys["acme_op"])) |
| r = client.get("/tenants/acme", headers=_auth(keys["acme_op"])) |
| assert r.status_code == 429 |
| assert r.headers["X-Content-Type-Options"] == "nosniff" |
| assert r.headers["X-Frame-Options"] == "DENY" |
|
|
|
|
| def test_no_hsts_header_by_default(tmp_path): |
| """The most important behavior of HSTS: it's OFF unless |
| explicitly enabled. A misconfigured TLS terminator + HSTS |
| on bricks the deployment for max-age (default 1 year).""" |
| dbfile, _ = _bootstrap(tmp_path) |
| client = TestClient(create_app(dbfile)) |
| r = client.get("/health") |
| assert "Strict-Transport-Security" not in r.headers |
|
|
|
|
| def test_hsts_header_when_enabled(tmp_path): |
| dbfile, _ = _bootstrap(tmp_path) |
| sec = SecurityConfig(hsts_enabled=True, hsts_max_age=86400) |
| client = TestClient(create_app(dbfile, security_config=sec)) |
| r = client.get("/health") |
| assert "max-age=86400" in r.headers["Strict-Transport-Security"] |
| assert "includeSubDomains" in r.headers["Strict-Transport-Security"] |
|
|
|
|
| |
| |
| |
|
|
| def test_cors_no_origins_no_middleware(tmp_path): |
| """Empty allowlist → CORSMiddleware NOT mounted → no |
| Access-Control-* headers in response. Browser will block |
| cross-origin requests (its same-origin policy).""" |
| dbfile, _ = _bootstrap(tmp_path) |
| client = TestClient(create_app(dbfile)) |
| r = client.options( |
| "/health", |
| headers={ |
| "Origin": "https://customer.example", |
| "Access-Control-Request-Method": "GET", |
| }, |
| ) |
| |
| assert "access-control-allow-origin" not in { |
| k.lower() for k in r.headers |
| } |
|
|
|
|
| def test_cors_preflight_allowed_origin(tmp_path): |
| dbfile, _ = _bootstrap(tmp_path) |
| sec = SecurityConfig(cors_origins=("https://customer.example",)) |
| client = TestClient(create_app(dbfile, security_config=sec)) |
| r = client.options( |
| "/health", |
| headers={ |
| "Origin": "https://customer.example", |
| "Access-Control-Request-Method": "GET", |
| "Access-Control-Request-Headers": "Authorization", |
| }, |
| ) |
| |
| assert r.headers["access-control-allow-origin"] == \ |
| "https://customer.example" |
| methods = r.headers.get("access-control-allow-methods", "") |
| assert "GET" in methods |
|
|
|
|
| def test_cors_blocks_disallowed_origin(tmp_path): |
| """Browser-side enforcement: server must NOT echo the |
| Origin back when it isn't in the allowlist. Without that |
| echo, the browser blocks the response from JavaScript.""" |
| dbfile, _ = _bootstrap(tmp_path) |
| sec = SecurityConfig(cors_origins=("https://customer.example",)) |
| client = TestClient(create_app(dbfile, security_config=sec)) |
| r = client.options( |
| "/health", |
| headers={ |
| "Origin": "https://evil.example", |
| "Access-Control-Request-Method": "GET", |
| }, |
| ) |
| |
| aco = r.headers.get("access-control-allow-origin", "") |
| assert aco != "https://evil.example" |
|
|
|
|
| def test_cors_actual_request_includes_acao(tmp_path): |
| """Allowed origin's normal GET (not preflight) gets the |
| Access-Control-Allow-Origin header on the response.""" |
| dbfile, _ = _bootstrap(tmp_path) |
| sec = SecurityConfig(cors_origins=("https://customer.example",)) |
| client = TestClient(create_app(dbfile, security_config=sec)) |
| r = client.get( |
| "/health", |
| headers={"Origin": "https://customer.example"}, |
| ) |
| assert r.status_code == 200 |
| assert r.headers["access-control-allow-origin"] == \ |
| "https://customer.example" |
|
|
|
|
| def test_security_headers_present_even_with_cors(tmp_path): |
| """CORS middleware shouldn't strip our security headers — |
| both must coexist.""" |
| dbfile, _ = _bootstrap(tmp_path) |
| sec = SecurityConfig( |
| cors_origins=("https://customer.example",), |
| hsts_enabled=True, |
| ) |
| client = TestClient(create_app(dbfile, security_config=sec)) |
| r = client.get( |
| "/health", |
| headers={"Origin": "https://customer.example"}, |
| ) |
| assert r.headers["X-Content-Type-Options"] == "nosniff" |
| assert "Strict-Transport-Security" in r.headers |
| assert r.headers["access-control-allow-origin"] == \ |
| "https://customer.example" |
|
|
|
|
| def test_cors_multiple_origins_allowlist(tmp_path): |
| """Two origins in the allowlist; each gets its own ACAO echo.""" |
| dbfile, _ = _bootstrap(tmp_path) |
| sec = SecurityConfig(cors_origins=("https://a.com", "https://b.com")) |
| client = TestClient(create_app(dbfile, security_config=sec)) |
| for origin in ("https://a.com", "https://b.com"): |
| r = client.get("/health", headers={"Origin": origin}) |
| assert r.headers["access-control-allow-origin"] == origin |
| |
| r = client.get("/health", headers={"Origin": "https://c.com"}) |
| assert r.headers.get("access-control-allow-origin", "") != "https://c.com" |
|
|