"""
Tests for ZeroDataRetentionGuard — no GPU required.
"""
from __future__ import annotations
import json
import socket
import time
import pytest
from privacy.privacy_guard import ZeroDataRetentionGuard, zdr_session, _sign_certificate
# ──────────────────────────────────────────
# Certificate generation
# ──────────────────────────────────────────
class TestCertificateGeneration:
    """Checks on the certificate dict returned by ``generate_certificate()``."""

    def test_certificate_generated(self):
        """``generate_certificate()`` must hand back a dict."""
        with ZeroDataRetentionGuard("test-cert-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()
        assert cert is not None
        assert isinstance(cert, dict)

    def test_certificate_has_required_fields(self):
        """Every field listed below must be present in the certificate."""
        with ZeroDataRetentionGuard("test-cert-002", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()
        required_fields = [
            "session_id", "timestamp", "guarantee",
            "model_endpoint", "data_wiped", "signature",
        ]
        for field in required_fields:
            assert field in cert, f"Certificate missing field: {field}"

    def test_certificate_session_id_matches(self):
        """The certificate must echo the session id the guard was created with."""
        session_id = "my-unique-session-xyz"
        with ZeroDataRetentionGuard(session_id, enforce_network_block=False) as guard:
            cert = guard.generate_certificate()
        assert cert["session_id"] == session_id

    def test_certificate_data_wiped_true(self):
        """The ``data_wiped`` field must be exactly ``True``."""
        with ZeroDataRetentionGuard("test-wipe-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()
        assert cert["data_wiped"] is True

    def test_certificate_model_endpoint_is_localhost(self):
        """The reported model endpoint must mention ``localhost``."""
        with ZeroDataRetentionGuard("test-local-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()
        assert "localhost" in cert["model_endpoint"]

    def test_certificate_guarantee_mentions_local(self):
        """The guarantee text must mention local processing (case-insensitive)."""
        with ZeroDataRetentionGuard("test-guarantee-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()
        guarantee = cert["guarantee"].lower()
        # "localhost" contains "local", so one substring test covers both.
        assert "local" in guarantee

    def test_certificate_signature_is_hex_string(self):
        """The signature must be a 64-character hexadecimal string."""
        with ZeroDataRetentionGuard("test-sig-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()
        signature = cert["signature"]
        assert isinstance(signature, str)
        assert len(signature) == 64  # SHA-256 hex = 64 chars
        # Length alone does not prove the value is hexadecimal, so check the
        # alphabet too. An explicit character set is used instead of
        # int(signature, 16), which would also accept "0x", "_" and whitespace.
        assert all(ch in "0123456789abcdefABCDEF" for ch in signature), (
            f"Signature contains non-hex characters: {signature!r}"
        )

    def test_certificate_signature_is_deterministic_for_same_session(self):
        """Same payload should produce same signature."""
        payload = json.dumps(
            {"test": "data", "session_id": "sig-test"}, sort_keys=True
        )
        sig1 = _sign_certificate(payload)
        sig2 = _sign_certificate(payload)
        assert sig1 == sig2

    def test_different_sessions_have_different_signatures(self):
        """Certificates from two different session ids must not share a signature."""
        with ZeroDataRetentionGuard("session-A", enforce_network_block=False) as gA:
            cert_a = gA.generate_certificate()
        with ZeroDataRetentionGuard("session-B", enforce_network_block=False) as gB:
            cert_b = gB.generate_certificate()
        assert cert_a["signature"] != cert_b["signature"]
# ──────────────────────────────────────────
# Session data wiping
# ──────────────────────────────────────────
class TestSessionDataWiping:
    """Lifecycle of data stored on the guard: readable inside, gone after exit."""

    def test_session_data_wiped_after_scan(self):
        """Leaving the context must empty the guard's internal store."""
        zdr_guard = ZeroDataRetentionGuard("test-wipe-data", enforce_network_block=False)
        secrets_to_store = (
            ("sensitive_code", "import os; os.system('rm -rf /')"),
            ("api_key", "sk-secret-key"),
        )
        with zdr_guard:
            for name, payload in secrets_to_store:
                zdr_guard.store_session_data(name, payload)

        # The context has exited, so nothing may remain in the store.
        leftover_count = len(zdr_guard._session_data)
        assert leftover_count == 0, (
            "Session data was not wiped after context exit"
        )

    def test_session_data_accessible_during_context(self):
        """A stored value must be readable while the context is still open."""
        zdr_guard = ZeroDataRetentionGuard("test-access-data", enforce_network_block=False)
        with zdr_guard:
            zdr_guard.store_session_data("key", "value")
            fetched = zdr_guard._session_data.get("key")
            assert fetched == "value"
# ──────────────────────────────────────────
# Audit log
# ──────────────────────────────────────────
class TestAuditLog:
    """Checks on what ends up in ``guard.audit_log`` and in the certificate."""

    def test_audit_log_contains_start_event(self):
        """Simply entering and leaving the context must log a 'started' entry."""
        with ZeroDataRetentionGuard("test-audit-001", enforce_network_block=False) as guard:
            pass

        start_seen = any("started" in line.lower() for line in guard.audit_log)
        assert start_seen, (
            "Audit log should contain a session start entry"
        )

    def test_custom_events_logged(self):
        """Messages passed to ``log_event`` must show up in the audit log."""
        messages = ("Analysis phase 1 complete", "Analysis phase 2 complete")
        with ZeroDataRetentionGuard("test-audit-002", enforce_network_block=False) as guard:
            for message in messages:
                guard.log_event(message)

        # Join once so each message can be found with a plain substring test.
        logged = " ".join(guard.audit_log)
        for message in messages:
            assert message in logged

    def test_blocked_calls_appear_in_certificate(self):
        """A BLOCKED audit entry must be surfaced in the certificate."""
        fake_entry = "BLOCKED outbound connection to example.com at 2024-01-01T00:00:00Z"
        with ZeroDataRetentionGuard("test-blocked", enforce_network_block=False) as guard:
            # Inject the entry by hand; no real connection is attempted here.
            guard.audit_log.append(fake_entry)
            cert = guard.generate_certificate()

        blocked = cert.get("external_calls_blocked", [])
        found_blocked = any("BLOCKED" in item for item in blocked)
        assert found_blocked
# ──────────────────────────────────────────
# Network blocking
# ──────────────────────────────────────────
class TestNetworkBlocking:
    """Checks on outbound-connection enforcement (``enforce_network_block=True``)."""

    def test_no_external_calls_during_analysis(self):
        """
        With network enforcement ON, connecting to an external host must raise.
        """
        blocked_attempts = []
        with ZeroDataRetentionGuard("test-network-block", enforce_network_block=True) as guard:
            try:
                # Attempt to connect to an external host
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    # Bound the wait so a guard that fails to block cannot
                    # stall the suite for the OS-level TCP connect timeout.
                    sock.settimeout(1.0)
                    sock.connect(("8.8.8.8", 80))
                finally:
                    # Always release the descriptor, including when connect raises.
                    sock.close()
            except OSError as exc:
                # ConnectionRefusedError is a subclass of OSError.
                # NOTE(review): any OSError (including a timeout or an offline
                # host) counts as "blocked" here, as in the original test.
                blocked_attempts.append(str(exc))

        # Should have been blocked
        assert blocked_attempts or any("BLOCKED" in entry for entry in guard.audit_log), (
            "External connection was not blocked by ZDR guard"
        )

    def test_localhost_connections_allowed(self):
        """
        Connections to localhost must NOT be blocked (needed for vLLM).
        """
        with ZeroDataRetentionGuard("test-localhost-allow", enforce_network_block=True):
            # This should NOT raise a ZDR block — it may just fail to connect
            # if no server is running.
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    sock.settimeout(0.1)
                    sock.connect(("127.0.0.1", 8080))
                finally:
                    # Always release the descriptor, including when connect raises.
                    sock.close()
            except Exception as exc:
                # Only ZDR-specific block errors should fail the test; a plain
                # refusal or timeout (no server listening) is expected. The
                # marker is checked for every exception type, so a block error
                # derived from OSError is no longer swallowed.
                # NOTE(review): assumes guard errors contain "ZDR Guard" — confirm.
                if "ZDR Guard" in str(exc):
                    pytest.fail(f"Localhost connection was incorrectly blocked: {exc}")
# ──────────────────────────────────────────
# Context manager (functional style)
# ──────────────────────────────────────────
class TestZDRSessionContextManager:
    """Checks on the ``zdr_session`` function-style context manager."""

    def test_zdr_session_context_manager(self):
        """The yielded guard and its certificate must carry the given session id."""
        expected_id = "func-cm-test"
        with zdr_session(expected_id, enforce=False) as guard:
            assert guard.session_id == expected_id
            cert = guard.generate_certificate()
        assert cert["session_id"] == expected_id

    def test_zdr_session_data_wiped_on_exit(self):
        """Data stored inside ``zdr_session`` must be gone once the block exits."""
        with zdr_session("func-cm-wipe", enforce=False) as guard:
            guard.store_session_data("secret", "classified")

        leftover_count = len(guard._session_data)
        assert leftover_count == 0
|