File size: 13,562 Bytes
50abc22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8de95b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50abc22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6c2d2f2
 
50abc22
6c2d2f2
 
 
 
50abc22
 
 
 
6c2d2f2
50abc22
 
 
 
 
 
 
 
6c2d2f2
 
50abc22
 
 
 
 
 
 
 
 
6c2d2f2
50abc22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2aea52a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7585fdf
2aea52a
7585fdf
2aea52a
 
 
50abc22
 
 
 
 
 
 
 
 
ab8cdcb
50abc22
 
ab8cdcb
 
50abc22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
"""Tests for the loopback-only /debug/* introspection endpoints (Unit 5)."""

from __future__ import annotations

import asyncio

import pytest

pytest.importorskip("fastapi")
pytest.importorskip("httpx")

from fastapi import HTTPException
from fastapi.testclient import TestClient

from headroom.proxy.debug_introspection import (
    collect_tasks,
)
from headroom.proxy.loopback_guard import (
    LOOPBACK_HOSTS,
    is_loopback_host,
    require_loopback,
)
from headroom.proxy.server import ProxyConfig, create_app
from headroom.proxy.warmup import WarmupRegistry
from headroom.proxy.ws_session_registry import (
    WebSocketSessionRegistry,
    WSSessionHandle,
)

# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def client():
    """Yield a loopback-addressed TestClient for a minimally configured app.

    The app is created with optimization, caching, rate limiting and cost
    tracking all switched off.
    """
    app = create_app(
        ProxyConfig(
            optimize=False,
            cache_enabled=False,
            rate_limit_enabled=False,
            cost_tracking_enabled=False,
        )
    )
    # The /debug/* guard only admits loopback peers. FastAPI's TestClient
    # would otherwise report its host as ``testclient`` and the guard would
    # correctly answer 404, so the simulated peer address is pinned here.
    loopback_peer = ("127.0.0.1", 12345)
    with TestClient(app, client=loopback_peer) as test_client:
        yield test_client


@pytest.fixture
def app_and_client():
    """Yield ``(app, test_client)`` with the client address set to loopback.

    Same app configuration as a plain client (all optional features off),
    but the app object is exposed too so tests can reach ``app.state``.
    """
    app = create_app(
        ProxyConfig(
            optimize=False,
            cache_enabled=False,
            rate_limit_enabled=False,
            cost_tracking_enabled=False,
        )
    )
    loopback_peer = ("127.0.0.1", 12345)
    with TestClient(app, client=loopback_peer) as test_client:
        yield app, test_client


@pytest.fixture
def app_and_external_client():
    """Yield ``(app, test_client)`` where the client reports a non-loopback address.

    Used to exercise the 404 path of the /debug/* endpoints.
    """
    app = create_app(
        ProxyConfig(
            optimize=False,
            cache_enabled=False,
            rate_limit_enabled=False,
            cost_tracking_enabled=False,
        )
    )
    external_peer = ("10.0.0.1", 54321)
    with TestClient(app, client=external_peer) as test_client:
        yield app, test_client


# ---------------------------------------------------------------------------
# Loopback guard unit tests
# ---------------------------------------------------------------------------


def test_is_loopback_host_accepts_canonical_hosts():
    """Every entry of LOOPBACK_HOSTS, and a missing host, counts as loopback."""
    # None (TestClient with no client info) is treated as loopback, so it
    # is checked right after the canonical hosts.
    candidates = [*LOOPBACK_HOSTS, None]
    for candidate in candidates:
        assert is_loopback_host(candidate) is True


def test_is_loopback_host_rejects_external_hosts():
    """Private-range and public IPv4 addresses are not loopback."""
    external_hosts = ("10.0.0.1", "192.168.1.100", "8.8.8.8")
    for external_host in external_hosts:
        assert is_loopback_host(external_host) is False


def test_is_loopback_host_accepts_ipv6_mapped_ipv4_loopback():
    """The IPv4-mapped IPv6 form of 127.0.0.1 is recognised as loopback.

    On Linux dual-stack sockets with IPV6_V6ONLY=0, an IPv4 loopback
    connection arrives as ``::ffff:127.0.0.1``. If the guard did not treat
    that as loopback, /debug/* would silently 404 whenever the proxy binds
    to ``::`` / ``0.0.0.0``.
    """
    mapped_loopback = "::ffff:127.0.0.1"
    assert is_loopback_host(mapped_loopback) is True


def test_is_loopback_host_rejects_ipv6_mapped_external_ipv4():
    """An IPv4-mapped IPv6 address wrapping a non-loopback IPv4 is rejected."""
    mapped_external = "::ffff:10.0.0.1"
    assert is_loopback_host(mapped_external) is False


def test_is_loopback_host_rejects_non_loopback_ipv6():
    """A non-loopback IPv6 address is rejected."""
    non_loopback_v6 = "2001:db8::1"
    assert is_loopback_host(non_loopback_v6) is False


def test_is_loopback_host_rejects_malformed_input():
    """Strings that are not IP addresses (including the empty string) are rejected."""
    malformed_hosts = ("not-an-ip", "")
    for malformed in malformed_hosts:
        assert is_loopback_host(malformed) is False


def test_require_loopback_raises_404_for_external_client():
    """require_loopback raises HTTPException(404) when the client host is 10.0.0.1."""

    # Minimal stand-ins exposing only ``request.client.host``.
    class _ExternalPeer:
        host = "10.0.0.1"

    class _StubRequest:
        client = _ExternalPeer()

    with pytest.raises(HTTPException) as raised:
        require_loopback(_StubRequest())  # type: ignore[arg-type]

    status = raised.value.status_code
    assert status == 404
    # Privacy: 404 explicitly, not 403 — endpoints should be invisible.
    assert status != 403


def test_require_loopback_accepts_loopback_client():
    """require_loopback returns without raising when the client host is 127.0.0.1."""

    # Minimal stand-ins exposing only ``request.client.host``.
    class _LoopbackPeer:
        host = "127.0.0.1"

    class _StubRequest:
        client = _LoopbackPeer()

    stub_request = _StubRequest()
    # Passing means no exception escaped the call.
    require_loopback(stub_request)  # type: ignore[arg-type]


# ---------------------------------------------------------------------------
# Serializer unit tests
# ---------------------------------------------------------------------------


def test_warmup_registry_to_dict_returns_registry_shape():
    """Check the shape produced by ``WarmupRegistry.to_dict()``.

    This stands in for the old collect_warmup helper, which was inlined at
    the /debug/warmup route handler in server.py
    (``registry.to_dict() if registry else {}``). Testing the registry's
    own serializer keeps that contract covered.
    """
    warmup = WarmupRegistry()
    warmup.kompress.mark_loaded(handle=object(), source_status="enabled")
    warmup.memory_backend.mark_error("boom")

    serialized = warmup.to_dict()
    kompress_slot = serialized["kompress"]
    backend_slot = serialized["memory_backend"]

    assert kompress_slot["status"] == "loaded"
    assert backend_slot["status"] == "error"
    assert backend_slot["error"] == "boom"
    # Raw handle must never leak into the serialized payload.
    assert "handle" not in kompress_slot


def test_ws_session_registry_snapshot_returns_registered_entries():
    """A registered handle shows up in ``snapshot()``.

    Serializer equivalent of the old collect_ws_sessions helper.
    """
    registry = WebSocketSessionRegistry()
    registry.register(
        WSSessionHandle(
            session_id="sess-debug-1",
            request_id="req-debug-1",
            client_addr="127.0.0.1:9999",
            upstream_url="wss://upstream/test",
        )
    )

    snapshot = registry.snapshot()
    assert len(snapshot) == 1
    only_entry = snapshot[0]
    assert only_entry["session_id"] == "sess-debug-1"
    assert only_entry["request_id"] == "req-debug-1"


@pytest.mark.asyncio
async def test_collect_tasks_returns_current_tasks_with_metadata():
    """A named asyncio task appears in ``collect_tasks()`` with safe metadata only.

    The entry must carry a ``coro_qualname`` and must not expose frame
    locals, the coroutine frame, or coroutine arguments.
    """

    async def _noop_task():
        await asyncio.sleep(0.05)

    # Create (and name) the task before snapshotting so there is a known
    # entry to look for.
    task = asyncio.create_task(_noop_task(), name="debug-test-task")
    try:
        entries = collect_tasks()
        matching = [e for e in entries if e["name"] == "debug-test-task"]
        assert matching, "expected the named task to appear in collect_tasks output"
        entry = matching[0]
        assert entry["coro_qualname"] is not None
        # Privacy: no frame locals, no coroutine args.
        assert "locals" not in entry
        assert "cr_frame" not in entry
        assert "args" not in entry
        assert entry["stack_depth"] is None or isinstance(entry["stack_depth"], int)
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected result of awaiting the task we just cancelled.
            # Only this is suppressed: catching BaseException here would
            # also hide KeyboardInterrupt / SystemExit and real failures.
            pass


@pytest.mark.asyncio
async def test_collect_tasks_derives_age_from_ws_registry_for_codex_relays():
    """A ``codex-ws-c2u-<session_id>`` task gets a non-negative ``age_seconds``.

    The session is registered in a WebSocketSessionRegistry that is passed
    to ``collect_tasks(ws_registry=...)``; the task name embeds the same
    session id.
    """
    reg = WebSocketSessionRegistry()
    sid = "relay-sess-1"
    reg.register(
        WSSessionHandle(
            session_id=sid,
            request_id="req-relay-1",
            client_addr="127.0.0.1:1",
            upstream_url="wss://upstream",
        )
    )

    async def _long_relay():
        await asyncio.sleep(0.2)

    relay_name = f"codex-ws-c2u-{sid}"
    relay_task = asyncio.create_task(_long_relay(), name=relay_name)
    try:
        await asyncio.sleep(0.02)  # let some age accrue
        entries = collect_tasks(ws_registry=reg)
        named = [e for e in entries if e["name"] == relay_name]
        assert named, "expected relay task in output"
        entry = named[0]
        assert entry["age_seconds"] is not None
        assert entry["age_seconds"] >= 0.0
    finally:
        relay_task.cancel()
        try:
            await relay_task
        except asyncio.CancelledError:
            # Expected result of awaiting the task we just cancelled.
            # Only this is suppressed: catching BaseException here would
            # also hide KeyboardInterrupt / SystemExit and real failures.
            pass


# ---------------------------------------------------------------------------
# HTTP endpoint tests (loopback)
# ---------------------------------------------------------------------------


def test_debug_tasks_returns_json_array_for_loopback(client):
    """GET /debug/tasks from loopback returns 200 and a JSON list of task entries."""
    resp = client.get("/debug/tasks")
    assert resp.status_code == 200
    payload = resp.json()
    assert isinstance(payload, list)
    # Each entry at least has name + coro_qualname fields.
    required_fields = ("name", "coro_qualname")
    for task_entry in payload:
        for field in required_fields:
            assert field in task_entry


def test_debug_tasks_stack_depth_is_gated_behind_query(client):
    """``stack_depth`` is only computed when the caller passes ``?stack=true``.

    By default the endpoint skips the synchronous ``Task.get_stack`` walk
    (P3 Fix 29 perf gate), so that taking a snapshot during a reconnect
    storm stays cheap and does not stall the event loop.
    """
    plain_response = client.get("/debug/tasks")
    assert plain_response.status_code == 200
    for entry in plain_response.json():
        assert entry["stack_depth"] is None, (
            f"default /debug/tasks must not compute stack_depth; "
            f"got {entry['stack_depth']!r} for {entry.get('name')!r}"
        )

    opted_in_response = client.get("/debug/tasks?stack=true")
    assert opted_in_response.status_code == 200
    entries = opted_in_response.json()
    # Opting in only has to yield one integer depth: the TestClient itself
    # runs under a task, while other entries may legitimately stay None if
    # get_stack raised defensively.
    has_integer_depth = any(isinstance(e["stack_depth"], int) for e in entries)
    assert has_integer_depth, (
        f"expected at least one int stack_depth when ?stack=true; got entries={entries!r}"
    )


def test_debug_warmup_reports_registry_slots(client):
    """GET /debug/warmup returns 200 and exposes every canonical slot name."""
    warmup_response = client.get("/debug/warmup")
    assert warmup_response.status_code == 200
    slots = warmup_response.json()

    # Registry surfaces all canonical slot names.
    canonical_slots = (
        "kompress",
        "magika",
        "memory_backend",
        "memory_embedder",
        "runtime",
    )
    for slot_name in canonical_slots:
        assert slot_name in slots

    # Each slot has at least a status field.
    assert "status" in slots["memory_backend"]

    runtime = slots["runtime"]
    assert runtime["anthropic_pre_upstream"]["resolved_concurrency"] >= 0
    assert runtime["websocket_sessions"]["active_relay_tasks"] == 0


def test_debug_ws_sessions_reports_live_session(app_and_client):
    """A registered session is listed by /debug/ws-sessions and gone once deregistered."""
    app, http = app_and_client
    proxy = app.state.proxy
    assert proxy is not None, "create_app must wire app.state.proxy"

    sid = "sess-debug-http"
    live_handle = WSSessionHandle(
        session_id=sid,
        request_id="req-debug-http",
        client_addr="127.0.0.1:12345",
        upstream_url="wss://upstream/test",
    )
    proxy.ws_sessions.register(live_handle)
    try:
        listing = http.get("/debug/ws-sessions")
        assert listing.status_code == 200
        found = [item for item in listing.json() if item["session_id"] == sid]
        assert found, "expected live session in /debug/ws-sessions output"
        assert found[0]["request_id"] == "req-debug-http"
    finally:
        # Always deregister so a failed assertion does not leave the
        # session behind in the proxy's registry.
        proxy.ws_sessions.deregister(sid, cause="response_completed")

    # After cleanup the session is gone.
    after_cleanup = http.get("/debug/ws-sessions")
    assert after_cleanup.status_code == 200
    assert not any(item["session_id"] == sid for item in after_cleanup.json())


def test_debug_endpoints_do_not_mutate_state(client):
    """Repeated reads of the /debug/* endpoints leave their output unchanged.

    A baseline is taken from each endpoint, every endpoint is then hit 100
    more times, and a final read is compared against the baseline.
    """
    first_tasks = client.get("/debug/tasks").json()
    first_warmup = client.get("/debug/warmup").json()
    first_ws = client.get("/debug/ws-sessions").json()

    for _ in range(100):
        client.get("/debug/tasks")
        client.get("/debug/warmup")
        client.get("/debug/ws-sessions")

    # Warmup and ws-sessions are deterministic (no background work touches
    # them in this test config), so they must be identical.
    assert client.get("/debug/warmup").json() == first_warmup
    assert client.get("/debug/ws-sessions").json() == first_ws

    # Tasks may vary naturally, but the call itself never raises and the
    # shape never changes.
    new_tasks = client.get("/debug/tasks").json()
    assert isinstance(new_tasks, list)
    # The key-set comparison needs a baseline entry; with an empty first
    # read there is nothing to compare against, so the check is skipped.
    if first_tasks:
        expected_keys = set(first_tasks[0].keys())
        for entry in new_tasks:
            assert set(entry.keys()) == expected_keys


def test_debug_tasks_does_not_leak_coro_locals(client):
    """Every /debug/tasks entry uses only the allow-listed field names."""
    # Privacy check: the serializer must not leak coroutine locals,
    # frame state, or request bodies. Only name / qualname / age /
    # depth / done are allowed.
    allowed_fields = {
        "name",
        "coro_qualname",
        "age_seconds",
        "stack_depth",
        "done",
    }
    tasks_response = client.get("/debug/tasks")
    assert tasks_response.status_code == 200
    for task_entry in tasks_response.json():
        unexpected_fields = set(task_entry.keys()) - allowed_fields
        assert not unexpected_fields


# ---------------------------------------------------------------------------
# HTTP endpoint tests (non-loopback)
# ---------------------------------------------------------------------------


def test_debug_endpoints_return_404_for_non_loopback_client(app_and_external_client):
    """Each /debug/* endpoint answers 404 to a client with a non-loopback address."""
    _app, external_client = app_and_external_client
    debug_paths = ("/debug/tasks", "/debug/ws-sessions", "/debug/warmup")
    for path in debug_paths:
        status = external_client.get(path).status_code
        assert status == 404, path
        # Must be 404, not 403 — invisible to scanners.
        assert status != 403


def test_existing_health_routes_unchanged(client):
    """The health endpoints still answer 200.

    Invariant: Unit 5 must not regress the existing health endpoints.
    """
    health_paths = ("/livez", "/readyz", "/health")
    for path in health_paths:
        status = client.get(path).status_code
        assert status == 200, path