financial_audit_env / tests /test_security.py
balloonmann's picture
feat: v2.0 — fraud detection task, severity grading, investigation mode, security hardening, 78 tests
126bdbd
"""Tests for security module — rate limiting, request IDs, ground truth leakage."""
import time
import pytest
from financial_audit_env.server.security import InMemoryRateLimiter
class TestRateLimiter:
"""Tests for in-memory rate limiter."""
def test_allows_under_limit(self):
limiter = InMemoryRateLimiter(max_requests=5, window_seconds=60)
for _ in range(5):
assert not limiter.is_rate_limited("127.0.0.1")
def test_blocks_over_limit(self):
limiter = InMemoryRateLimiter(max_requests=3, window_seconds=60)
for _ in range(3):
limiter.is_rate_limited("127.0.0.1")
assert limiter.is_rate_limited("127.0.0.1")
def test_different_ips_independent(self):
limiter = InMemoryRateLimiter(max_requests=2, window_seconds=60)
limiter.is_rate_limited("1.1.1.1")
limiter.is_rate_limited("1.1.1.1")
assert limiter.is_rate_limited("1.1.1.1")
assert not limiter.is_rate_limited("2.2.2.2") # Different IP
def test_cleanup_removes_stale_ips(self):
limiter = InMemoryRateLimiter(max_requests=5, window_seconds=1)
limiter.is_rate_limited("old_ip")
# Force cleanup
time.sleep(1.1)
limiter._last_cleanup = 0 # Force cleanup on next call
limiter.is_rate_limited("new_ip")
assert "old_ip" not in limiter._requests
def test_no_memory_leak(self):
limiter = InMemoryRateLimiter(max_requests=100, window_seconds=1)
# Simulate many unique IPs
for i in range(50):
limiter.is_rate_limited(f"ip_{i}")
assert len(limiter._requests) == 50
# After window expires, cleanup should remove them
time.sleep(1.1)
limiter._last_cleanup = 0
limiter.is_rate_limited("trigger_cleanup")
assert len(limiter._requests) < 50
class TestGroundTruthLeakage:
"""Verify error messages never expose ground truth."""
def test_value_error_sanitized(self):
"""Ensure ground truth keywords are stripped from error messages."""
from financial_audit_env.server.security import setup_security
from fastapi import FastAPI
from fastapi.testclient import TestClient
app = FastAPI()
setup_security(app)
@app.get("/test-error")
async def test_error():
raise ValueError("Expected 12 ground_truth errors but got 5")
client = TestClient(app)
resp = client.get("/test-error")
assert resp.status_code == 400
# Should NOT contain ground truth info
assert "ground_truth" not in resp.json()["detail"]
assert "12" not in resp.json()["detail"]
def test_generic_error_no_trace(self):
from financial_audit_env.server.security import setup_security
from fastapi import FastAPI
from fastapi.testclient import TestClient
app = FastAPI()
setup_security(app)
@app.get("/test-crash")
async def test_crash():
raise RuntimeError("Traceback with planted error details")
client = TestClient(app, raise_server_exceptions=False)
resp = client.get("/test-crash")
assert resp.status_code == 500
body = resp.json()
# Key check: no internal details leaked to client
assert "planted" not in body.get("detail", "")
assert "Traceback" not in body.get("detail", "")