File size: 3,416 Bytes
126bdbd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""Tests for security module — rate limiting, request IDs, ground truth leakage."""

import time
import pytest
from financial_audit_env.server.security import InMemoryRateLimiter


class TestRateLimiter:
    """Tests for in-memory rate limiter."""

    def test_allows_under_limit(self):
        limiter = InMemoryRateLimiter(max_requests=5, window_seconds=60)
        for _ in range(5):
            assert not limiter.is_rate_limited("127.0.0.1")

    def test_blocks_over_limit(self):
        limiter = InMemoryRateLimiter(max_requests=3, window_seconds=60)
        for _ in range(3):
            limiter.is_rate_limited("127.0.0.1")
        assert limiter.is_rate_limited("127.0.0.1")

    def test_different_ips_independent(self):
        limiter = InMemoryRateLimiter(max_requests=2, window_seconds=60)
        limiter.is_rate_limited("1.1.1.1")
        limiter.is_rate_limited("1.1.1.1")
        assert limiter.is_rate_limited("1.1.1.1")
        assert not limiter.is_rate_limited("2.2.2.2")  # Different IP

    def test_cleanup_removes_stale_ips(self):
        limiter = InMemoryRateLimiter(max_requests=5, window_seconds=1)
        limiter.is_rate_limited("old_ip")
        # Force cleanup
        time.sleep(1.1)
        limiter._last_cleanup = 0  # Force cleanup on next call
        limiter.is_rate_limited("new_ip")
        assert "old_ip" not in limiter._requests

    def test_no_memory_leak(self):
        limiter = InMemoryRateLimiter(max_requests=100, window_seconds=1)
        # Simulate many unique IPs
        for i in range(50):
            limiter.is_rate_limited(f"ip_{i}")
        assert len(limiter._requests) == 50
        # After window expires, cleanup should remove them
        time.sleep(1.1)
        limiter._last_cleanup = 0
        limiter.is_rate_limited("trigger_cleanup")
        assert len(limiter._requests) < 50


class TestGroundTruthLeakage:
    """Verify error messages never expose ground truth."""

    def test_value_error_sanitized(self):
        """Ensure ground truth keywords are stripped from error messages."""
        from financial_audit_env.server.security import setup_security
        from fastapi import FastAPI
        from fastapi.testclient import TestClient

        app = FastAPI()
        setup_security(app)

        @app.get("/test-error")
        async def test_error():
            raise ValueError("Expected 12 ground_truth errors but got 5")

        client = TestClient(app)
        resp = client.get("/test-error")
        assert resp.status_code == 400
        # Should NOT contain ground truth info
        assert "ground_truth" not in resp.json()["detail"]
        assert "12" not in resp.json()["detail"]

    def test_generic_error_no_trace(self):
        from financial_audit_env.server.security import setup_security
        from fastapi import FastAPI
        from fastapi.testclient import TestClient

        app = FastAPI()
        setup_security(app)

        @app.get("/test-crash")
        async def test_crash():
            raise RuntimeError("Traceback with planted error details")

        client = TestClient(app, raise_server_exceptions=False)
        resp = client.get("/test-crash")
        assert resp.status_code == 500
        body = resp.json()
        # Key check: no internal details leaked to client
        assert "planted" not in body.get("detail", "")
        assert "Traceback" not in body.get("detail", "")