File size: 9,066 Bytes
7b4f5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""
Tests for ZeroDataRetentionGuard β€” no GPU required.
"""
from __future__ import annotations

import json
import socket
import time
import pytest

from privacy.privacy_guard import ZeroDataRetentionGuard, zdr_session, _sign_certificate


# ──────────────────────────────────────────
# Certificate generation
# ──────────────────────────────────────────

class TestCertificateGeneration:
    def test_certificate_generated(self):
        """Guard must generate a certificate on exit."""
        with ZeroDataRetentionGuard("test-cert-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()

        assert cert is not None
        assert isinstance(cert, dict)

    def test_certificate_has_required_fields(self):
        with ZeroDataRetentionGuard("test-cert-002", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()

        required_fields = [
            "session_id", "timestamp", "guarantee",
            "model_endpoint", "data_wiped", "signature",
        ]
        for field in required_fields:
            assert field in cert, f"Certificate missing field: {field}"

    def test_certificate_session_id_matches(self):
        session_id = "my-unique-session-xyz"
        with ZeroDataRetentionGuard(session_id, enforce_network_block=False) as guard:
            cert = guard.generate_certificate()

        assert cert["session_id"] == session_id

    def test_certificate_data_wiped_true(self):
        with ZeroDataRetentionGuard("test-wipe-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()

        assert cert["data_wiped"] is True

    def test_certificate_model_endpoint_is_localhost(self):
        with ZeroDataRetentionGuard("test-local-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()

        assert "localhost" in cert["model_endpoint"]

    def test_certificate_guarantee_mentions_local(self):
        with ZeroDataRetentionGuard("test-guarantee-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()

        guarantee = cert["guarantee"].lower()
        assert "localhost" in guarantee or "local" in guarantee

    def test_certificate_signature_is_hex_string(self):
        with ZeroDataRetentionGuard("test-sig-001", enforce_network_block=False) as guard:
            cert = guard.generate_certificate()

        signature = cert["signature"]
        assert isinstance(signature, str)
        assert len(signature) == 64  # SHA-256 hex = 64 chars

    def test_certificate_signature_is_deterministic_for_same_session(self):
        """Same payload should produce same signature."""
        payload = json.dumps(
            {"test": "data", "session_id": "sig-test"}, sort_keys=True
        )
        sig1 = _sign_certificate(payload)
        sig2 = _sign_certificate(payload)
        assert sig1 == sig2

    def test_different_sessions_have_different_signatures(self):
        with ZeroDataRetentionGuard("session-A", enforce_network_block=False) as gA:
            cert_a = gA.generate_certificate()
        with ZeroDataRetentionGuard("session-B", enforce_network_block=False) as gB:
            cert_b = gB.generate_certificate()

        assert cert_a["signature"] != cert_b["signature"]


# ──────────────────────────────────────────
# Session data wiping
# ──────────────────────────────────────────

class TestSessionDataWiping:
    def test_session_data_wiped_after_scan(self):
        """Data stored in the guard must be cleared after context exit."""
        guard = ZeroDataRetentionGuard("test-wipe-data", enforce_network_block=False)
        with guard:
            guard.store_session_data("sensitive_code", "import os; os.system('rm -rf /')")
            guard.store_session_data("api_key", "sk-secret-key")

        # After exit, internal store should be cleared
        assert len(guard._session_data) == 0, (
            "Session data was not wiped after context exit"
        )

    def test_session_data_accessible_during_context(self):
        guard = ZeroDataRetentionGuard("test-access-data", enforce_network_block=False)
        with guard:
            guard.store_session_data("key", "value")
            assert guard._session_data.get("key") == "value"


# ──────────────────────────────────────────
# Audit log
# ──────────────────────────────────────────

class TestAuditLog:
    def test_audit_log_contains_start_event(self):
        with ZeroDataRetentionGuard("test-audit-001", enforce_network_block=False) as guard:
            pass

        assert any("started" in entry.lower() for entry in guard.audit_log), (
            "Audit log should contain a session start entry"
        )

    def test_custom_events_logged(self):
        with ZeroDataRetentionGuard("test-audit-002", enforce_network_block=False) as guard:
            guard.log_event("Analysis phase 1 complete")
            guard.log_event("Analysis phase 2 complete")

        logged = " ".join(guard.audit_log)
        assert "Analysis phase 1 complete" in logged
        assert "Analysis phase 2 complete" in logged

    def test_blocked_calls_appear_in_certificate(self):
        """Any blocked external connection attempts should appear in certificate."""
        with ZeroDataRetentionGuard("test-blocked", enforce_network_block=False) as guard:
            # Manually add a fake blocked call entry
            guard.audit_log.append("BLOCKED outbound connection to example.com at 2024-01-01T00:00:00Z")
            cert = guard.generate_certificate()

        blocked = cert.get("external_calls_blocked", [])
        assert any("BLOCKED" in entry for entry in blocked)


# ──────────────────────────────────────────
# Network blocking
# ──────────────────────────────────────────

class TestNetworkBlocking:
    def test_no_external_calls_during_analysis(self):
        """
        With network enforcement ON, connecting to an external host must raise.
        """
        blocked_attempts = []

        with ZeroDataRetentionGuard("test-network-block", enforce_network_block=True) as guard:
            try:
                # Attempt to connect to an external host
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect(("8.8.8.8", 80))
                sock.close()
            except (ConnectionRefusedError, OSError) as e:
                blocked_attempts.append(str(e))

        # Should have been blocked
        assert len(blocked_attempts) > 0 or any("BLOCKED" in e for e in guard.audit_log), (
            "External connection was not blocked by ZDR guard"
        )

    def test_localhost_connections_allowed(self):
        """
        Connections to localhost must NOT be blocked (needed for vLLM).
        """
        with ZeroDataRetentionGuard("test-localhost-allow", enforce_network_block=True):
            # This should NOT raise β€” just fail to connect if no server is running
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(0.1)
                sock.connect(("127.0.0.1", 8080))
                sock.close()
            except (ConnectionRefusedError, TimeoutError, OSError):
                pass  # Expected β€” no server listening, but NOT blocked by ZDR
            except Exception as e:
                # Only ZDR-specific block errors should fail the test
                if "ZDR Guard" in str(e):
                    pytest.fail(f"Localhost connection was incorrectly blocked: {e}")


# ──────────────────────────────────────────
# Context manager (functional style)
# ──────────────────────────────────────────

class TestZDRSessionContextManager:
    def test_zdr_session_context_manager(self):
        with zdr_session("func-cm-test", enforce=False) as guard:
            assert guard.session_id == "func-cm-test"
            cert = guard.generate_certificate()
            assert cert["session_id"] == "func-cm-test"

    def test_zdr_session_data_wiped_on_exit(self):
        with zdr_session("func-cm-wipe", enforce=False) as guard:
            guard.store_session_data("secret", "classified")
        assert len(guard._session_data) == 0