Spaces:
Running
Running
| """ | |
| Tests for ZeroDataRetentionGuard β no GPU required. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import socket | |
| import time | |
| import pytest | |
| from privacy.privacy_guard import ZeroDataRetentionGuard, zdr_session, _sign_certificate | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # Certificate generation | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| class TestCertificateGeneration: | |
| def test_certificate_generated(self): | |
| """Guard must generate a certificate on exit.""" | |
| with ZeroDataRetentionGuard("test-cert-001", enforce_network_block=False) as guard: | |
| cert = guard.generate_certificate() | |
| assert cert is not None | |
| assert isinstance(cert, dict) | |
| def test_certificate_has_required_fields(self): | |
| with ZeroDataRetentionGuard("test-cert-002", enforce_network_block=False) as guard: | |
| cert = guard.generate_certificate() | |
| required_fields = [ | |
| "session_id", "timestamp", "guarantee", | |
| "model_endpoint", "data_wiped", "signature", | |
| ] | |
| for field in required_fields: | |
| assert field in cert, f"Certificate missing field: {field}" | |
| def test_certificate_session_id_matches(self): | |
| session_id = "my-unique-session-xyz" | |
| with ZeroDataRetentionGuard(session_id, enforce_network_block=False) as guard: | |
| cert = guard.generate_certificate() | |
| assert cert["session_id"] == session_id | |
| def test_certificate_data_wiped_true(self): | |
| with ZeroDataRetentionGuard("test-wipe-001", enforce_network_block=False) as guard: | |
| cert = guard.generate_certificate() | |
| assert cert["data_wiped"] is True | |
| def test_certificate_model_endpoint_is_localhost(self): | |
| with ZeroDataRetentionGuard("test-local-001", enforce_network_block=False) as guard: | |
| cert = guard.generate_certificate() | |
| assert "localhost" in cert["model_endpoint"] | |
| def test_certificate_guarantee_mentions_local(self): | |
| with ZeroDataRetentionGuard("test-guarantee-001", enforce_network_block=False) as guard: | |
| cert = guard.generate_certificate() | |
| guarantee = cert["guarantee"].lower() | |
| assert "localhost" in guarantee or "local" in guarantee | |
| def test_certificate_signature_is_hex_string(self): | |
| with ZeroDataRetentionGuard("test-sig-001", enforce_network_block=False) as guard: | |
| cert = guard.generate_certificate() | |
| signature = cert["signature"] | |
| assert isinstance(signature, str) | |
| assert len(signature) == 64 # SHA-256 hex = 64 chars | |
| def test_certificate_signature_is_deterministic_for_same_session(self): | |
| """Same payload should produce same signature.""" | |
| payload = json.dumps( | |
| {"test": "data", "session_id": "sig-test"}, sort_keys=True | |
| ) | |
| sig1 = _sign_certificate(payload) | |
| sig2 = _sign_certificate(payload) | |
| assert sig1 == sig2 | |
| def test_different_sessions_have_different_signatures(self): | |
| with ZeroDataRetentionGuard("session-A", enforce_network_block=False) as gA: | |
| cert_a = gA.generate_certificate() | |
| with ZeroDataRetentionGuard("session-B", enforce_network_block=False) as gB: | |
| cert_b = gB.generate_certificate() | |
| assert cert_a["signature"] != cert_b["signature"] | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # Session data wiping | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| class TestSessionDataWiping: | |
| def test_session_data_wiped_after_scan(self): | |
| """Data stored in the guard must be cleared after context exit.""" | |
| guard = ZeroDataRetentionGuard("test-wipe-data", enforce_network_block=False) | |
| with guard: | |
| guard.store_session_data("sensitive_code", "import os; os.system('rm -rf /')") | |
| guard.store_session_data("api_key", "sk-secret-key") | |
| # After exit, internal store should be cleared | |
| assert len(guard._session_data) == 0, ( | |
| "Session data was not wiped after context exit" | |
| ) | |
| def test_session_data_accessible_during_context(self): | |
| guard = ZeroDataRetentionGuard("test-access-data", enforce_network_block=False) | |
| with guard: | |
| guard.store_session_data("key", "value") | |
| assert guard._session_data.get("key") == "value" | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # Audit log | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| class TestAuditLog: | |
| def test_audit_log_contains_start_event(self): | |
| with ZeroDataRetentionGuard("test-audit-001", enforce_network_block=False) as guard: | |
| pass | |
| assert any("started" in entry.lower() for entry in guard.audit_log), ( | |
| "Audit log should contain a session start entry" | |
| ) | |
| def test_custom_events_logged(self): | |
| with ZeroDataRetentionGuard("test-audit-002", enforce_network_block=False) as guard: | |
| guard.log_event("Analysis phase 1 complete") | |
| guard.log_event("Analysis phase 2 complete") | |
| logged = " ".join(guard.audit_log) | |
| assert "Analysis phase 1 complete" in logged | |
| assert "Analysis phase 2 complete" in logged | |
| def test_blocked_calls_appear_in_certificate(self): | |
| """Any blocked external connection attempts should appear in certificate.""" | |
| with ZeroDataRetentionGuard("test-blocked", enforce_network_block=False) as guard: | |
| # Manually add a fake blocked call entry | |
| guard.audit_log.append("BLOCKED outbound connection to example.com at 2024-01-01T00:00:00Z") | |
| cert = guard.generate_certificate() | |
| blocked = cert.get("external_calls_blocked", []) | |
| assert any("BLOCKED" in entry for entry in blocked) | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # Network blocking | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| class TestNetworkBlocking: | |
| def test_no_external_calls_during_analysis(self): | |
| """ | |
| With network enforcement ON, connecting to an external host must raise. | |
| """ | |
| blocked_attempts = [] | |
| with ZeroDataRetentionGuard("test-network-block", enforce_network_block=True) as guard: | |
| try: | |
| # Attempt to connect to an external host | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| sock.connect(("8.8.8.8", 80)) | |
| sock.close() | |
| except (ConnectionRefusedError, OSError) as e: | |
| blocked_attempts.append(str(e)) | |
| # Should have been blocked | |
| assert len(blocked_attempts) > 0 or any("BLOCKED" in e for e in guard.audit_log), ( | |
| "External connection was not blocked by ZDR guard" | |
| ) | |
| def test_localhost_connections_allowed(self): | |
| """ | |
| Connections to localhost must NOT be blocked (needed for vLLM). | |
| """ | |
| with ZeroDataRetentionGuard("test-localhost-allow", enforce_network_block=True): | |
| # This should NOT raise β just fail to connect if no server is running | |
| try: | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| sock.settimeout(0.1) | |
| sock.connect(("127.0.0.1", 8080)) | |
| sock.close() | |
| except (ConnectionRefusedError, TimeoutError, OSError): | |
| pass # Expected β no server listening, but NOT blocked by ZDR | |
| except Exception as e: | |
| # Only ZDR-specific block errors should fail the test | |
| if "ZDR Guard" in str(e): | |
| pytest.fail(f"Localhost connection was incorrectly blocked: {e}") | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # Context manager (functional style) | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| class TestZDRSessionContextManager: | |
| def test_zdr_session_context_manager(self): | |
| with zdr_session("func-cm-test", enforce=False) as guard: | |
| assert guard.session_id == "func-cm-test" | |
| cert = guard.generate_certificate() | |
| assert cert["session_id"] == "func-cm-test" | |
| def test_zdr_session_data_wiped_on_exit(self): | |
| with zdr_session("func-cm-wipe", enforce=False) as guard: | |
| guard.store_session_data("secret", "classified") | |
| assert len(guard._session_data) == 0 | |