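#!/usr/bin/env python3
"""Phase 9 audit — TLS pinning, TPM quote, live synthesis.

Runs a fixed set of self-check "vectors", writes the report to
``tests/phase9_audit_last_run.json`` and exits non-zero if any vector fails.

Vectors:

* ``P9-01`` / ``P9-02`` — self-signed cert+key generation and mutual pin
  exchange between two ``TLSCertificateManager`` instances.
* ``P9-03`` — a real loopback TLS handshake whose peer identity is decided by
  the pin store (``verify_peer_cert_bytes`` on the presented leaf), not by CA
  chain validation.
* ``P9-03b`` — negative control: a third, never-pinned peer must not be
  identified as any trusted node.
* ``P9-04`` — TPM quote via the Keylime bridge.  ``mode`` is recorded in the
  report because the pass criterion alone does not say whether the quote was
  hardware-backed.
* ``P9-05`` — audio synthesis from a fixed seed yields a non-empty WAV.
* ``P9-CLI-*`` — each tool's ``--help`` exits 0.

Every failure carries a ``detail`` string so a red run is diagnosable from the
report alone instead of being silently ``False``.
"""
from __future__ import annotations

import json
import shutil
import socket
import ssl
import subprocess
import sys
import tempfile
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
# Project modules live at the repo root and under tools/.
sys.path.insert(0, str(ROOT))
sys.path.insert(0, str(ROOT / "tools"))

P9_VERSION = "Δ9Φ963-PHASE9-v1.0"

# Byte length of a canonical RIFF/fmt/data WAV header with zero samples; a
# file no larger than this carries no audio.
_WAV_HEADER_BYTES = 44
# Loopback handshake bound — the server thread must never hang the audit.
_TLS_PROBE_TIMEOUT_S = 3.0
_CLI_HELP_TIMEOUT_S = 30


class _OkHandler(BaseHTTPRequestHandler):
    """Minimal handler for the loopback probe: ``200 ok`` to any GET, no logging."""

    def do_GET(self) -> None:
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")

    def log_message(self, *args) -> None:
        # Keep the audit's stdout/stderr limited to the JSON report.
        return


def _tls_pin_probe(client_mgr, server_mgr, expected_peer: str) -> tuple[bool, str]:
    """Perform one loopback TLS handshake and pin-check the presented certificate.

    ``server_mgr`` supplies the server-side SSL context; ``client_mgr`` must
    already hold ``expected_peer``'s pin.  Returns ``(ok, detail)`` where ``ok``
    is True only when the DER certificate the server actually presented during
    the handshake is identified by ``client_mgr.verify_peer_cert_bytes`` as
    ``expected_peer``.  Any exception is reported as a failure, never raised.
    """
    # Port 0 lets the kernel choose a free port; a fixed port makes the audit
    # fail spuriously whenever something else happens to be bound there.
    httpd = HTTPServer(("127.0.0.1", 0), _OkHandler)
    port = httpd.server_address[1]
    # Without a timeout handle_request() blocks the thread forever if the
    # client side fails before connecting.
    httpd.timeout = _TLS_PROBE_TIMEOUT_S
    httpd.socket = server_mgr.ssl_server_context().wrap_socket(httpd.socket, server_side=True)
    server_thread = threading.Thread(target=httpd.handle_request, daemon=True)
    server_thread.start()

    # Chain validation and hostname checking are deliberately disabled: this
    # is a pinning audit, so the trust decision is the pin match below on the
    # exact leaf the server presented, not a CA signature.  ``server_hostname``
    # is sent as SNI only.
    client_ctx = ssl.create_default_context()
    client_ctx.check_hostname = False
    client_ctx.verify_mode = ssl.CERT_NONE
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=_TLS_PROBE_TIMEOUT_S) as raw:
            with client_ctx.wrap_socket(raw, server_hostname=expected_peer) as tls:
                cert_der = tls.getpeercert(binary_form=True)
        if not cert_der:
            return False, "server presented no certificate"
        identified = client_mgr.verify_peer_cert_bytes(cert_der)
        return identified == expected_peer, f"pin check identified peer as {identified!r}"
    except Exception as exc:  # audit must report, not crash
        return False, f"tls probe failed: {exc!r}"
    finally:
        try:
            httpd.server_close()
        except Exception:
            pass
        server_thread.join(timeout=1.0)


def _unpinned_peer_rejected(client_mgr, stranger_mgr, trusted_ids) -> tuple[bool, str]:
    """Negative control: a peer whose pin was never ingested must not be vouched for.

    Rejection by exception counts as a pass, as does any return value that is
    not one of ``trusted_ids``.  Returning a trusted identity for a stranger's
    certificate is exactly the failure a pin store exists to prevent.
    """
    try:
        identified = client_mgr.verify_peer(stranger_mgr.cert_file)
    except Exception as exc:
        return True, f"rejected with {type(exc).__name__}"
    return identified not in trusted_ids, f"verify_peer returned {identified!r} for unpinned cert"


def _cli_help_ok(script: str) -> tuple[bool, str]:
    """Run ``tools/<script> --help``; a timeout or spawn error is a failure, not a crash."""
    try:
        cp = subprocess.run(
            [sys.executable, str(ROOT / "tools" / script), "--help"],
            cwd=ROOT,
            capture_output=True,
            timeout=_CLI_HELP_TIMEOUT_S,
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        return False, repr(exc)
    return cp.returncode == 0, f"exit {cp.returncode}"


def main() -> int:
    """Run every Phase 9 vector, write the JSON report, return 0 only if all pass."""
    from protocol6_quantum_attest.keylime_bridge import KeylimeAttestation
    from tls_manager import TLSCertificateManager
    from live_synthesis import generate_audio_from_seed

    results: list[dict] = []
    t0 = time.perf_counter()
    # Private keys are generated under this directory; it is removed in the
    # ``finally`` so key material does not outlive the audit run.
    td = Path(tempfile.mkdtemp())
    try:
        # --- P9-01 / P9-02: cert generation and mutual pin exchange -------
        mgr_a = TLSCertificateManager("node_a", td / "certs_a")
        pin_a = mgr_a.generate_self_signed()
        results.append(
            {
                "id": "P9-01",
                "pass": mgr_a.cert_file.is_file() and mgr_a.key_file.is_file(),
                "detail": "self-signed cert+key",
            }
        )

        mgr_b = TLSCertificateManager("node_b", td / "certs_b")
        pin_b = mgr_b.generate_self_signed()
        mgr_a.ingest_peer_pin("node_b", pin_b)
        mgr_b.ingest_peer_pin("node_a", pin_a)
        results.append(
            {
                "id": "P9-02",
                "pass": mgr_a.get_pin("node_b") == pin_b and mgr_b.get_pin("node_a") == pin_a,
                "detail": "mutual pin exchange",
            }
        )

        # --- P9-03: static pin check, then a live handshake ---------------
        peer_id = mgr_a.verify_peer(mgr_b.cert_file)
        if peer_id == "node_b":
            handshake_ok, detail = _tls_pin_probe(mgr_a, mgr_b, "node_b")
        else:
            handshake_ok, detail = False, f"verify_peer returned {peer_id!r}, expected 'node_b'"
        results.append({"id": "P9-03", "pass": handshake_ok, "detail": detail})

        # --- P9-03b: negative control for the pin store --------------------
        mgr_c = TLSCertificateManager("node_c", td / "certs_c")
        mgr_c.generate_self_signed()  # pin deliberately NOT ingested by node_a
        rejected, detail = _unpinned_peer_rejected(mgr_a, mgr_c, ("node_a", "node_b", "node_c"))
        results.append({"id": "P9-03b", "pass": rejected, "detail": detail})

        # --- P9-04: TPM quote ---------------------------------------------
        kl = KeylimeAttestation("audit_node")
        quote = kl.get_quote()
        results.append(
            {
                "id": "P9-04",
                "pass": quote is not None and KeylimeAttestation.verify_quote(quote),
                # NOTE(review): the pass criterion does not distinguish a
                # hardware-backed quote from a software fallback; readers must
                # check ``mode`` — confirm which values the bridge reports.
                "mode": (quote or {}).get("mode"),
            }
        )

        # --- P9-05: live synthesis ----------------------------------------
        wav = td / "p9_test.wav"
        synth = generate_audio_from_seed("7e8d18fda979cbef", wav, duration_sec=1.0)
        wav_bytes = wav.stat().st_size if wav.is_file() else 0
        wav_ok = wav_bytes > _WAV_HEADER_BYTES and synth.get("samples", 0) > 0
        results.append({"id": "P9-05", "pass": wav_ok, "bytes": wav_bytes})

        # --- CLI smoke: every tool must at least answer --help ------------
        for script in ("tls_manager.py", "tpm_attestation.py", "live_synthesis.py"):
            ok, detail = _cli_help_ok(script)
            results.append({"id": f"P9-CLI-{script}", "pass": ok, "detail": detail})
    finally:
        shutil.rmtree(td, ignore_errors=True)

    elapsed_ms = int((time.perf_counter() - t0) * 1000)
    all_pass = all(r["pass"] for r in results)
    report = {
        "signature": P9_VERSION,
        "vectors": results,
        "all_pass": all_pass,
        "duration_ms": elapsed_ms,
    }
    out = ROOT / "tests" / "phase9_audit_last_run.json"
    out.write_text(json.dumps(report, indent=2), encoding="utf-8")
    print(json.dumps(report, indent=2))
    return 0 if all_pass else 1


if __name__ == "__main__":
    raise SystemExit(main())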