# Helpers for passing solver artifacts through a container's stdout stream.
#
# Protocol: the solver prints its logs, then SOLUTION_OUTPUT_SEPARATOR, then a
# base64-encoded zip archive. The reading side splits the captured stdout on
# the separator, stores the logs, and unpacks the archive.
from __future__ import annotations

import base64
import binascii
import io
import os
import sys
import zipfile

# Marker written between the log output and the base64 payload. The embedded
# UUID makes an accidental match inside ordinary log text very unlikely.
SOLUTION_OUTPUT_SEPARATOR: bytes = b'\n----- ENIGMA-SOLUTION-OUTPUT-BEGIN-a8c7f3e2-9d4b-4c5a-8f1e-2b6d3a4e5f7c -----\n'


def build_solution_zip(files: dict[str, str | bytes]) -> bytes:
    """Pack *files* into an in-memory, deflate-compressed zip archive.

    Args:
        files: Mapping of archive member name to its content (text or bytes).

    Returns:
        The raw bytes of the finished zip archive.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        for name, content in files.items():
            zf.writestr(name, content)
    # Read the buffer only after the ZipFile is closed, so the central
    # directory has been written.
    return buffer.getvalue()


def write_solution_output(zip_bytes: bytes) -> None:
    """Write the separator and the base64-encoded archive to stdout.

    Args:
        zip_bytes: Raw zip archive bytes, e.g. from ``build_solution_zip``.
    """
    # Flush the text-layer streams first so previously printed output cannot
    # end up after the separator in the byte stream.
    sys.stdout.flush()
    sys.stderr.flush()

    encoded = base64.b64encode(zip_bytes)
    out = sys.stdout.buffer
    out.write(SOLUTION_OUTPUT_SEPARATOR)
    out.write(encoded)
    out.write(b'\n')
    out.flush()


def split_on_separator(raw_stdout: bytes) -> tuple[bytes, bytes, bool]:
    """Split captured stdout at the first solution-output separator.

    Args:
        raw_stdout: The complete captured stdout bytes.

    Returns:
        ``(logs, payload, found)``. When the separator is absent, *logs* is
        the whole input, *payload* is empty and *found* is ``False``.
    """
    logs, separator, payload = raw_stdout.partition(SOLUTION_OUTPUT_SEPARATOR)
    # partition() yields an empty separator element when there is no match.
    return (logs, payload, bool(separator))


def _is_within_directory(root: str, candidate: str) -> bool:
    """Return True when *candidate* is *root* itself or lies beneath it.

    Both arguments must be absolute, normalized paths.
    """
    try:
        # commonpath compares whole path components, so it has no
        # prefix-matching pitfalls ('/out' vs '/output') and also works when
        # root is the filesystem root.
        return os.path.commonpath([root, candidate]) == root
    except ValueError:
        # Raised e.g. for paths on different Windows drives: not inside root.
        return False


def extract_artifacts(raw_stdout: bytes, output_dir: str) -> tuple[bool, str | None]:
    """Store the logs and unpack the solution archive found in *raw_stdout*.

    Everything before the separator is written to ``stdout.log`` inside
    *output_dir*. The base64 payload after it is decoded, saved as
    ``solution_artifacts.zip`` and extracted into *output_dir*.

    Args:
        raw_stdout: The complete captured stdout bytes.
        output_dir: Directory receiving the log, the archive and its members;
            created if missing.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)`` describing
        why the payload was missing, malformed or rejected.
    """
    os.makedirs(output_dir, exist_ok=True)
    logs_bytes, payload_b64, found = split_on_separator(raw_stdout)

    # The logs are persisted even when no payload follows, to aid debugging.
    log_path = os.path.join(output_dir, 'stdout.log')
    with open(log_path, 'wb') as f:
        f.write(logs_bytes)

    if not found:
        return (
            False,
            'No solution output separator found in container stdout. '
            'The solver must print logs, then the separator line, then a '
            'base64-encoded zip of result.json and other artifacts.',
        )

    payload_b64 = payload_b64.strip()
    if not payload_b64:
        return (False, 'Separator found but base64 payload is empty.')

    try:
        payload_bytes = base64.b64decode(payload_b64, validate=True)
    except (binascii.Error, ValueError) as e:
        return (False, f'Base64 decode of solution payload failed: {e}')
    if not payload_bytes:
        return (False, 'Decoded payload is empty.')

    # Keep a copy of the archive on disk, even if it later proves invalid.
    zip_path = os.path.join(output_dir, 'solution_artifacts.zip')
    with open(zip_path, 'wb') as f:
        f.write(payload_bytes)

    # Validate and extract from memory rather than from zip_path: the archive
    # is unpacked into the directory that holds zip_path, so a member named
    # 'solution_artifacts.zip' would otherwise truncate the very file being
    # read and break extraction of the remaining members.
    archive = io.BytesIO(payload_bytes)
    if not zipfile.is_zipfile(archive):
        return (False, 'Decoded payload is not a valid zip file.')

    try:
        with zipfile.ZipFile(archive, 'r') as zf:
            dest = os.path.abspath(output_dir)
            # Reject the whole archive before writing anything if any member
            # would resolve outside the output directory.
            for member in zf.infolist():
                member_path = os.path.normpath(os.path.join(dest, member.filename))
                if not _is_within_directory(dest, member_path):
                    return (False, f"Zip member '{member.filename}' escapes output directory.")
            zf.extractall(dest)
    except zipfile.BadZipFile as e:
        return (False, f'Bad zip file: {e}')
    except Exception as e:
        # Boundary handler: this function reports failures through its return
        # value, so any other extraction error is converted to a message.
        return (False, f'Zip extraction failed: {e}')

    return (True, None)