File size: 11,883 Bytes
d2d1903
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
"""
Tests for Stage 80 — security headers + CORS (production
BLOCKER 3/5).

Coverage plan:
1. SecurityConfig parsing — env reading + fail-fast on garbage
2. always_on_headers — content + HSTS opt-in
3. Integration: headers present on EVERY response shape
   (200, 401, 403, 404, 422, 429, /metrics text response)
4. HSTS: opt-in semantics, configurable max-age
5. CORS: allowed origin gets preflight + actual response headers;
   denied origin gets neither
6. CORS unset → no middleware mounted (denies cross-origin by
   default — browser-side same-origin enforcement)
"""
import pytest

pytest.importorskip("fastapi")
pytest.importorskip("httpx")
from fastapi.testclient import TestClient

from infra import OrgStateService
from infra.api import create_app
from infra.api.ratelimit import RateLimiter
from infra.api.security import (
    SecurityConfig,
    always_on_headers,
    cors_origins_from_env,
)


def _bootstrap(tmp_path):
    """Seed a fresh database file with one tenant and one operator API key.

    Registers tenant ``acme`` and creates an operator-role key for it, then
    closes the service before returning.

    Returns:
        A ``(dbfile, keys)`` pair: the database path as a string, and a dict
        mapping ``"acme_op"`` to the raw operator key.
    """
    db_path = str(tmp_path / "sec.sqlite3")
    service = OrgStateService(db_path)
    try:
        service.register_tenant("acme", "ACME")
        operator_key = service.create_api_key("acme", role="operator")
        issued = {"acme_op": operator_key.raw}
    finally:
        # Release the service handle even when seeding fails.
        service.close()
    return db_path, issued


def _auth(k):
    return {"Authorization": f"Bearer {k}"}


# =========================================================
# SecurityConfig — env parsing
# =========================================================

def test_default_config_hsts_off_no_cors(monkeypatch):
    """With none of the ORGSTATE_* security variables set, ``from_env``
    yields the safe defaults: HSTS disabled, a one-year max-age, and an
    empty CORS allowlist."""
    security_vars = (
        "ORGSTATE_HSTS_ENABLED",
        "ORGSTATE_HSTS_MAX_AGE",
        "ORGSTATE_CORS_ORIGINS",
    )
    for var_name in security_vars:
        monkeypatch.delenv(var_name, raising=False)

    defaults = SecurityConfig.from_env()

    assert defaults.hsts_enabled is False
    assert defaults.hsts_max_age == 31_536_000
    assert defaults.cors_origins == ()


def test_hsts_enable_truthy_spellings(monkeypatch):
    """1 / true / yes / on (any case) enable HSTS; anything else disables.

    The other security variables are cleared first so that an ambient
    ``ORGSTATE_HSTS_MAX_AGE`` (which ``from_env`` rejects when malformed)
    cannot make this test fail for an unrelated reason.
    """
    monkeypatch.delenv("ORGSTATE_HSTS_MAX_AGE", raising=False)
    monkeypatch.delenv("ORGSTATE_CORS_ORIGINS", raising=False)

    truthy = ("1", "true", "yes", "on", "TRUE", "Yes")
    falsy = ("0", "false", "no", "off", "", "maybe")
    cases = [(raw, True) for raw in truthy] + [(raw, False) for raw in falsy]

    for raw, expected in cases:
        monkeypatch.setenv("ORGSTATE_HSTS_ENABLED", raw)
        # The message identifies which spelling broke when the loop fails.
        assert SecurityConfig.from_env().hsts_enabled is expected, (
            f"ORGSTATE_HSTS_ENABLED={raw!r} should parse as {expected}"
        )


def test_hsts_max_age_garbage_raises(monkeypatch):
    """A non-numeric max-age makes ``from_env`` raise ValueError."""
    monkeypatch.setenv("ORGSTATE_HSTS_MAX_AGE", "forever")
    expect_failure = pytest.raises(ValueError, match="must be an integer")
    with expect_failure:
        SecurityConfig.from_env()


def test_hsts_max_age_negative_raises(monkeypatch):
    """A negative max-age makes ``from_env`` raise ValueError."""
    var_name, bad_value = "ORGSTATE_HSTS_MAX_AGE", "-1"
    monkeypatch.setenv(var_name, bad_value)
    expect_failure = pytest.raises(ValueError, match="must be >= 0")
    with expect_failure:
        SecurityConfig.from_env()


def test_cors_origins_parsed_and_deduped(monkeypatch):
    """A comma-separated origin list is whitespace-trimmed and de-duplicated,
    keeping first-seen order."""
    raw_origins = "https://a.com, https://b.com,https://a.com"
    monkeypatch.setenv("ORGSTATE_CORS_ORIGINS", raw_origins)
    parsed = SecurityConfig.from_env().cors_origins
    assert parsed == ("https://a.com", "https://b.com")


def test_cors_origins_empty_means_no_allowlist(monkeypatch):
    """Both an unset variable and a value holding only separators and
    whitespace produce an empty allowlist."""
    var_name = "ORGSTATE_CORS_ORIGINS"

    monkeypatch.delenv(var_name, raising=False)
    when_unset = cors_origins_from_env()
    assert when_unset == ()

    monkeypatch.setenv(var_name, "  ,  ")
    when_blank = cors_origins_from_env()
    assert when_blank == ()


# =========================================================
# always_on_headers — content
# =========================================================

def test_always_on_includes_baseline_set():
    """A default SecurityConfig yields the baseline hardening headers."""
    baseline = always_on_headers(SecurityConfig())
    exact_values = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
    )
    for header_name, expected in exact_values:
        assert baseline[header_name] == expected
    assert "strict-origin" in baseline["Referrer-Policy"]
    assert "Permissions-Policy" in baseline


def test_always_on_no_hsts_when_disabled():
    """HSTS is omitted from the header set when explicitly disabled."""
    config = SecurityConfig(hsts_enabled=False)
    emitted = always_on_headers(config)
    assert "Strict-Transport-Security" not in emitted


def test_always_on_includes_hsts_when_enabled():
    """Enabling HSTS emits the header with the configured max-age and
    the includeSubDomains directive."""
    config = SecurityConfig(hsts_enabled=True, hsts_max_age=600)
    hsts_value = always_on_headers(config)["Strict-Transport-Security"]
    assert hsts_value == "max-age=600; includeSubDomains"


# =========================================================
# Integration — headers on every response
# =========================================================

def test_headers_present_on_200(tmp_path):
    """An authenticated, successful tenant read carries the security
    headers."""
    dbfile, keys = _bootstrap(tmp_path)
    app = create_app(dbfile)
    operator_headers = _auth(keys["acme_op"])
    response = TestClient(app).get("/tenants/acme", headers=operator_headers)
    assert response.status_code == 200
    received = response.headers
    assert received["X-Content-Type-Options"] == "nosniff"
    assert received["X-Frame-Options"] == "DENY"
    assert "strict-origin" in received["Referrer-Policy"]


def test_headers_present_on_health(tmp_path):
    """The unauthenticated /health route is hardened as well: uptime
    monitors do not care about these headers, but a hostile cross-origin
    embed would."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    app = create_app(dbfile)
    response = TestClient(app).get("/health")
    assert response.status_code == 200
    assert response.headers["X-Content-Type-Options"] == "nosniff"


def test_headers_present_on_metrics(tmp_path):
    """The /metrics response (text/plain Prometheus output, per the
    original author's note) is hardened too."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    app = create_app(dbfile)
    response = TestClient(app).get("/metrics")
    assert response.status_code == 200
    assert response.headers["X-Content-Type-Options"] == "nosniff"


def test_headers_present_on_401(tmp_path):
    """An authentication failure still carries the security headers;
    error responses must not be a gap in the hardening."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    app = create_app(dbfile)
    # Deliberately sent without an Authorization header.
    response = TestClient(app).get("/tenants/acme")
    assert response.status_code == 401
    assert response.headers["X-Content-Type-Options"] == "nosniff"


def test_headers_present_on_422_pydantic(tmp_path):
    """A request-validation failure (422, from posting an empty JSON body
    to the webhooks route) also carries the security headers."""
    dbfile, keys = _bootstrap(tmp_path)
    app = create_app(dbfile)
    response = TestClient(app).post(
        "/tenants/acme/webhooks",
        headers=_auth(keys["acme_op"]),
        json={},
    )
    assert response.status_code == 422
    assert response.headers["X-Content-Type-Options"] == "nosniff"


def test_headers_present_on_429_rate_limited(tmp_path):
    """Rate-limited (429) responses carry the security headers too.

    Per the original author's note, this depends on the security-headers
    middleware sitting outside the rate limiter in the app's middleware
    order.
    """
    dbfile, keys = _bootstrap(tmp_path)
    tight_limiter = RateLimiter(per_minute_keyed=6, per_minute_anon=0)
    client = TestClient(create_app(dbfile, rate_limiter=tight_limiter))

    # Exhaust the keyed budget with more requests than the limit allows.
    attempts = 0
    while attempts < 12:
        client.get("/tenants/acme", headers=_auth(keys["acme_op"]))
        attempts += 1

    throttled = client.get("/tenants/acme", headers=_auth(keys["acme_op"]))
    assert throttled.status_code == 429
    assert throttled.headers["X-Content-Type-Options"] == "nosniff"
    assert throttled.headers["X-Frame-Options"] == "DENY"


def test_no_hsts_header_by_default(tmp_path):
    """HSTS must be absent unless explicitly enabled.

    This is the key safety property: HSTS combined with a misconfigured
    TLS terminator locks clients out for the whole max-age window
    (one year by default).
    """
    dbfile, _unused_keys = _bootstrap(tmp_path)
    app = create_app(dbfile)
    response = TestClient(app).get("/health")
    assert "Strict-Transport-Security" not in response.headers


def test_hsts_header_when_enabled(tmp_path):
    """An app built with HSTS enabled emits the header with the configured
    max-age and includeSubDomains."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    hsts_config = SecurityConfig(hsts_enabled=True, hsts_max_age=86400)
    app = create_app(dbfile, security_config=hsts_config)
    response = TestClient(app).get("/health")
    for directive in ("max-age=86400", "includeSubDomains"):
        assert directive in response.headers["Strict-Transport-Security"]


# =========================================================
# CORS — preflight + actual request
# =========================================================

def test_cors_no_origins_no_middleware(tmp_path):
    """With no allowlist configured, a preflight gets no
    Access-Control-Allow-Origin header, so the browser's same-origin
    policy blocks cross-origin callers."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    client = TestClient(create_app(dbfile))
    preflight_headers = {
        "Origin": "https://customer.example",
        "Access-Control-Request-Method": "GET",
    }
    response = client.options("/health", headers=preflight_headers)
    # The status is intentionally not checked: without CORS handling the
    # OPTIONS request may 405 or be answered generically.
    header_names = {name.lower() for name in response.headers}
    assert "access-control-allow-origin" not in header_names


def test_cors_preflight_allowed_origin(tmp_path):
    """A preflight from an allowlisted origin is answered with that origin
    echoed back and GET among the allowed methods."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    cors_config = SecurityConfig(cors_origins=("https://customer.example",))
    client = TestClient(create_app(dbfile, security_config=cors_config))
    preflight_headers = {
        "Origin": "https://customer.example",
        "Access-Control-Request-Method": "GET",
        "Access-Control-Request-Headers": "Authorization",
    }
    response = client.options("/health", headers=preflight_headers)

    echoed_origin = response.headers["access-control-allow-origin"]
    assert echoed_origin == "https://customer.example"
    allowed_methods = response.headers.get("access-control-allow-methods", "")
    assert "GET" in allowed_methods


def test_cors_blocks_disallowed_origin(tmp_path):
    """A preflight from an origin outside the allowlist must not be
    endorsed by the server.

    Enforcement happens in the browser: unless the response's
    Access-Control-Allow-Origin matches the requesting origin (or is the
    wildcard), JavaScript cannot read the response. The server must
    therefore return neither an echo of the origin nor ``*``.
    """
    dbfile, _ = _bootstrap(tmp_path)
    sec = SecurityConfig(cors_origins=("https://customer.example",))
    client = TestClient(create_app(dbfile, security_config=sec))
    r = client.options(
        "/health",
        headers={
            "Origin": "https://evil.example",
            "Access-Control-Request-Method": "GET",
        },
    )
    aco = r.headers.get("access-control-allow-origin")
    # A wildcard would endorse the evil origin as well as an exact echo
    # would, so reject both. An absent header (None) is the expected case.
    assert aco not in ("https://evil.example", "*"), (
        f"disallowed origin was endorsed via Access-Control-Allow-Origin={aco!r}"
    )


def test_cors_actual_request_includes_acao(tmp_path):
    """A plain GET (not a preflight) from an allowlisted origin gets
    Access-Control-Allow-Origin on its response."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    cors_config = SecurityConfig(cors_origins=("https://customer.example",))
    app = create_app(dbfile, security_config=cors_config)
    origin_header = {"Origin": "https://customer.example"}
    response = TestClient(app).get("/health", headers=origin_header)
    assert response.status_code == 200
    echoed_origin = response.headers["access-control-allow-origin"]
    assert echoed_origin == "https://customer.example"


def test_security_headers_present_even_with_cors(tmp_path):
    """Enabling CORS must not strip the security headers; the same
    response carries both sets."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    combined_config = SecurityConfig(
        cors_origins=("https://customer.example",),
        hsts_enabled=True,
    )
    app = create_app(dbfile, security_config=combined_config)
    origin_header = {"Origin": "https://customer.example"}
    response = TestClient(app).get("/health", headers=origin_header)

    received = response.headers
    assert received["X-Content-Type-Options"] == "nosniff"
    assert "Strict-Transport-Security" in received
    assert received["access-control-allow-origin"] == "https://customer.example"


def test_cors_multiple_origins_allowlist(tmp_path):
    """With two origins allowlisted, each gets its own
    Access-Control-Allow-Origin echo, and a third origin is not endorsed
    either by exact echo or by wildcard."""
    dbfile, _ = _bootstrap(tmp_path)
    allowlist = ("https://a.com", "https://b.com")
    sec = SecurityConfig(cors_origins=allowlist)
    client = TestClient(create_app(dbfile, security_config=sec))
    for origin in allowlist:
        r = client.get("/health", headers={"Origin": origin})
        assert r.headers["access-control-allow-origin"] == origin
    # An origin outside the allowlist: "*" would grant it access just like
    # an exact echo would, so both are rejected. Absent (None) is expected.
    r = client.get("/health", headers={"Origin": "https://c.com"})
    aco = r.headers.get("access-control-allow-origin")
    assert aco not in ("https://c.com", "*"), (
        f"non-allowlisted origin was endorsed via Access-Control-Allow-Origin={aco!r}"
    )