"""
Zero Data Retention (ZDR) Privacy Guard.

Ensures all model inference stays on localhost. Blocks outbound non-local
network connections, generates cryptographically-signed audit certificates,
and wipes session data after analysis.
"""
from __future__ import annotations

import hashlib
import hmac
import ipaddress
import json
import logging
import os
import socket
import time
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Callable, Generator, List, Optional
# Module-level logger; handlers and levels are left to the host application.
logger = logging.getLogger(__name__)

# HMAC signing key. Read from ZDR_SIGNING_KEY when that variable is set;
# otherwise a fixed, publicly visible development default is used (nothing is
# generated at startup), so production deployments must set the variable.
_SIGNING_KEY = os.environ.get(
    "ZDR_SIGNING_KEY", "codesentry-local-dev-key-change-in-prod"
).encode()

# Destinations the connection guard treats as local.
_LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1", "0.0.0.0"}

# ──────────────────────────────────────────────
# Socket patching
# ──────────────────────────────────────────────

# Holds the real ``socket.socket.connect`` while the guard's patch is installed
# and None otherwise.
_original_connect: Optional[Callable] = None
# Reserved for a getaddrinfo patch; never assigned in the code visible here.
_original_getaddrinfo: Optional[Callable] = None
def _make_blocking_connect(audit_log: List[str]) -> Callable:
    """Return a replacement for ``socket.socket.connect`` that refuses non-local peers.

    The ``connect`` implementation found on ``socket.socket`` at the time of
    this call is captured and used for destinations that pass the check.

    Args:
        audit_log: Sink that receives one message per refused connection.
            Only its ``append`` method is used.

    Returns:
        A function with the ``connect(self, address)`` signature. Local
        destinations are delegated to the captured ``connect``; anything else
        is recorded, logged and rejected with ``ConnectionRefusedError``.
    """
    _orig = socket.socket.connect

    def _is_local(host: Any) -> bool:
        """Report whether *host* is an allow-listed name or a loopback IP literal."""
        if host in _LOCAL_HOSTS:
            return True
        try:
            # Parse the text as an IP literal instead of testing a "127."
            # prefix: a prefix test also accepts hostnames such as
            # "127.evil.example", which resolve to arbitrary remote addresses.
            return ipaddress.ip_address(str(host)).is_loopback
        except ValueError:
            # Not an IP literal (a hostname, a UNIX socket path, ...): not local.
            return False

    def _patched_connect(self: socket.socket, address: Any) -> None:  # type: ignore[override]
        # (host, port[, ...]) for INET families; other address shapes are
        # stringified and therefore only pass if they match the allow-list.
        host = address[0] if isinstance(address, (tuple, list)) else str(address)
        if _is_local(host):
            return _orig(self, address)

        # Aware clock instead of the deprecated datetime.utcnow(); the offset is
        # dropped so the text stays "<naive ISO timestamp>Z" as before.
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        msg = f"BLOCKED outbound connection to {host} at {stamp}Z"
        audit_log.append(msg)
        logger.warning("[ZDR] %s", msg)
        raise ConnectionRefusedError(f"[ZDR Guard] Blocked non-local connection to {host}")

    return _patched_connect
# ──────────────────────────────────────────────
# Certificate signing
# ──────────────────────────────────────────────
def _sign_certificate(payload: str) -> str:
    """Compute the HMAC-SHA256 signature of *payload* as a lowercase hex string.

    The payload is encoded with the default (UTF-8) codec and keyed with the
    module-level ``_SIGNING_KEY``.
    """
    message = payload.encode()
    mac = hmac.new(_SIGNING_KEY, message, digestmod=hashlib.sha256)
    return mac.hexdigest()
# ──────────────────────────────────────────────
# Main ZDR Guard class
# ──────────────────────────────────────────────
class ZeroDataRetentionGuard:
    """
    Keep an analysis session on localhost and issue a signed audit certificate.

    While at least one guard created with ``enforce_network_block=True`` is
    entered, ``socket.socket.connect`` is replaced process-wide by a wrapper
    that refuses non-local destinations. Leaving a guard clears its in-memory
    session store and, once no other entered guard still needs the block,
    puts the original ``connect`` back.

    Usage (context manager)::

        with ZeroDataRetentionGuard(session_id="abc123") as guard:
            # … run analysis …
            cert = guard.generate_certificate()
    """

    # Guards that currently hold the network block. The patch is global to the
    # process, so it is installed by the first guard to enter and removed only
    # by the last one to leave; otherwise the first session to finish would
    # silently unblock every session still running.
    _active_guards: List["ZeroDataRetentionGuard"] = []

    class _BlockedCallSink:
        """Append-only sink that copies a message into every active guard's audit log."""

        def __init__(self, guards: List["ZeroDataRetentionGuard"]) -> None:
            # Shared with the class-level registry, so membership changes are
            # seen without re-installing the socket wrapper.
            self._guards = guards

        def append(self, message: str) -> None:
            for guard in list(self._guards):
                guard.audit_log.append(message)

    def __init__(self, session_id: str, enforce_network_block: bool = True) -> None:
        """
        Args:
            session_id: Identifier written into audit entries and the certificate.
            enforce_network_block: When True, entering the guard blocks
                non-local socket connections until it is exited.
        """
        self.session_id = session_id
        self.enforce_network_block = enforce_network_block
        self.audit_log: List[str] = []
        # Taken at construction time, not when the context is entered.
        self.start_time: datetime = datetime.now(timezone.utc)
        self._session_data: dict = {}

    # ── Context manager ──────────────────────────────

    def __enter__(self) -> "ZeroDataRetentionGuard":
        """Activate the network block (if enabled), record the session start, return self."""
        if self.enforce_network_block:
            self._patch_socket()
        self.audit_log.append(
            f"ZDR session started: {self.session_id} at {self.start_time.isoformat()}"
        )
        logger.info("[ZDR] Session %s started. Network block: %s", self.session_id, self.enforce_network_block)
        return self

    def __exit__(self, *args: Any) -> None:
        """Release the network block, wipe session data and record the session end.

        Returns None, so an exception raised inside the ``with`` body propagates.
        """
        try:
            if self.enforce_network_block:
                self._restore_socket()
        finally:
            # The wipe must happen even if restoring the socket failed.
            self._wipe_session_data()
        self.audit_log.append(
            f"ZDR session ended: {self.session_id} at {datetime.now(timezone.utc).isoformat()}"
        )
        logger.info("[ZDR] Session %s ended. Data wiped.", self.session_id)

    # ── Async support ────────────────────────────────

    async def __aenter__(self) -> "ZeroDataRetentionGuard":
        """Async counterpart of ``__enter__``; performs no awaiting itself."""
        return self.__enter__()

    async def __aexit__(self, *args: Any) -> None:
        """Async counterpart of ``__exit__``; performs no awaiting itself."""
        self.__exit__(*args)

    # ── Socket patching ──────────────────────────────

    def _patch_socket(self) -> None:
        """Install the blocking ``connect`` wrapper, or join the one already installed."""
        global _original_connect
        active = ZeroDataRetentionGuard._active_guards
        if self in active:
            return  # This guard already holds the block (it was re-entered).
        if not active:
            # First guard in: remember the real connect and install one wrapper
            # whose blocked-call messages fan out to all active guards.
            if _original_connect is None:
                _original_connect = socket.socket.connect
            socket.socket.connect = _make_blocking_connect(  # type: ignore[method-assign]
                self._BlockedCallSink(active)  # type: ignore[arg-type]
            )
            logger.debug("[ZDR] Socket patched — blocking non-local connections")
        active.append(self)

    def _restore_socket(self) -> None:
        """Release this guard's hold; restore the real ``connect`` when it was the last."""
        global _original_connect
        active = ZeroDataRetentionGuard._active_guards
        if self in active:
            active.remove(self)
        if active:
            return  # Another session still relies on the block.
        if _original_connect is not None:
            socket.socket.connect = _original_connect  # type: ignore[method-assign]
            _original_connect = None
        logger.debug("[ZDR] Socket restored")

    # ── Session data management ──────────────────────

    def store_session_data(self, key: str, value: Any) -> None:
        """Store data in the in-memory session store (wiped on exit)."""
        self._session_data[key] = value

    def _wipe_session_data(self) -> None:
        """Drop every entry from the session store.

        Python strings are immutable, so their memory cannot be zeroed in
        place; rebinding a key to a string of NULs would only allocate a new
        object. Clearing the store releases this guard's references, which is
        all that can be guaranteed here.
        """
        self._session_data.clear()
        logger.debug("[ZDR] Session data wiped for %s", self.session_id)

    # ── Certificate generation ───────────────────────

    def generate_certificate(self) -> dict:
        """
        Return a ZDR audit certificate dict.

        The signature is computed by ``_sign_certificate`` over the JSON
        encoding (``sort_keys=True``) of every field except ``signature`` and
        ``certificate_version``. ``external_calls_blocked`` is a snapshot of
        the complete audit log at the time of the call (session start, custom
        events and blocked connections), so entries appended afterwards, such
        as the session-end record, cannot alter an issued certificate.
        """
        end_time = datetime.now(timezone.utc)
        payload_dict = {
            "session_id": self.session_id,
            "timestamp": self.start_time.isoformat(),
            "completed_at": end_time.isoformat(),
            "guarantee": (
                "All inference ran exclusively on localhost AMD MI300X via vLLM. "
                "Zero data transmitted to external services."
            ),
            "model_endpoint": "http://localhost:8080",
            # Copy, not the live list: the signed payload must stay frozen.
            "external_calls_blocked": list(self.audit_log),
            "data_wiped": True,
            "network_enforcement": self.enforce_network_block,
        }
        payload_str = json.dumps(payload_dict, sort_keys=True)
        signature = _sign_certificate(payload_str)
        return {
            **payload_dict,
            "signature": signature,
            "certificate_version": "1.0",
        }

    def log_event(self, message: str) -> None:
        """Append a custom audit event, prefixed with the current UTC timestamp."""
        ts = datetime.now(timezone.utc).isoformat()
        self.audit_log.append(f"[{ts}] {message}")
# ──────────────────────────────────────────────
# Convenience context manager (functional style)
# ──────────────────────────────────────────────
@contextmanager
def zdr_session(session_id: str, enforce: bool = True) -> Generator[ZeroDataRetentionGuard, None, None]:
    """Functional context manager wrapper for ZeroDataRetentionGuard.

    Usage::

        with zdr_session("abc123") as guard:
            ...

    Args:
        session_id: Identifier passed to the guard.
        enforce: Forwarded as the guard's ``enforce_network_block`` flag.

    Yields:
        The entered ``ZeroDataRetentionGuard``; it is exited when the ``with``
        block ends, including when the block raises.
    """
    # Without @contextmanager this function returns a bare generator, which
    # does not implement __enter__/__exit__ and cannot be used in ``with``.
    guard = ZeroDataRetentionGuard(session_id, enforce_network_block=enforce)
    with guard:
        yield guard
# ──────────────────────────────────────────────
# FastAPI Middleware
# ──────────────────────────────────────────────
class ZDRMiddleware:
    """
    ASGI (Starlette/FastAPI) middleware that writes a ZDR log entry per HTTP request.

    It only maintains an audit trail: no sockets are blocked at this level
    (that is done per-session inside the orchestrator).
    """

    def __init__(self, app: Any) -> None:
        """Remember the wrapped ASGI application."""
        self.app = app

    async def __call__(self, scope: Any, receive: Any, send: Any) -> None:
        """Log method, path and UTC time for HTTP scopes, then call the wrapped app.

        Scopes of any other type are forwarded without logging.
        """
        is_http = scope["type"] == "http"
        if is_http:
            request_path = scope.get("path", "")
            logged_at = datetime.now(timezone.utc).isoformat()
            logger.info(
                "[ZDR Middleware] %s %s at %s",
                scope.get("method", ""),
                request_path,
                logged_at,
            )
        await self.app(scope, receive, send)