| from __future__ import annotations |
| import base64 |
| import binascii |
| import io |
| import os |
| import sys |
| import zipfile |
# Marker written to stdout by write_solution_output between the solver's log
# output and the base64-encoded solution zip; split_on_separator searches for
# its first occurrence. It starts and ends with a newline, so it occupies a
# line of its own. The UUID-like token presumably exists to make an accidental
# match inside ordinary log output unlikely.
SOLUTION_OUTPUT_SEPARATOR: bytes = b'\n----- ENIGMA-SOLUTION-OUTPUT-BEGIN-a8c7f3e2-9d4b-4c5a-8f1e-2b6d3a4e5f7c -----\n'
|
|
def build_solution_zip(files: dict[str, str | bytes]) -> bytes:
    """Pack *files* into an in-memory, DEFLATE-compressed zip archive.

    Args:
        files: Mapping of archive member name to member content. Each pair is
            passed to ``ZipFile.writestr`` in the mapping's iteration order.

    Returns:
        The raw bytes of the finished archive.
    """
    archive_stream = io.BytesIO()
    archive = zipfile.ZipFile(
        archive_stream,
        mode='w',
        compression=zipfile.ZIP_DEFLATED,
    )
    with archive:
        for member_name, member_content in files.items():
            archive.writestr(member_name, member_content)
    # The bytes are collected only after the with-block has closed the archive.
    return archive_stream.getvalue()
|
|
def write_solution_output(zip_bytes: bytes) -> None:
    """Emit the solution archive on stdout, preceded by the separator.

    The text-level stdout and stderr streams are flushed first. Then the
    separator, the base64 encoding of *zip_bytes* and a trailing newline are
    written to the binary stdout buffer, which is flushed afterwards.

    Args:
        zip_bytes: Raw bytes of the zip archive to publish.
    """
    for text_stream in (sys.stdout, sys.stderr):
        text_stream.flush()

    chunks = (
        SOLUTION_OUTPUT_SEPARATOR,
        base64.b64encode(zip_bytes),
        b'\n',
    )
    # Write through the underlying binary buffer rather than the text wrapper.
    binary_out = sys.stdout.buffer
    for chunk in chunks:
        binary_out.write(chunk)
    binary_out.flush()
|
|
def split_on_separator(raw_stdout: bytes) -> tuple[bytes, bytes, bool]:
    """Split captured stdout at the first solution-output separator.

    Args:
        raw_stdout: Raw bytes captured from stdout.

    Returns:
        ``(before, after, found)``. When the separator is present, *before*
        and *after* are the bytes on either side of its first occurrence and
        *found* is ``True``. Otherwise the result is
        ``(raw_stdout, b'', False)``.
    """
    # partition() yields an empty middle element exactly when the separator
    # does not occur, in which case the head is the whole input.
    head, marker, tail = raw_stdout.partition(SOLUTION_OUTPUT_SEPARATOR)
    return (head, tail, bool(marker))
|
|
def _find_escaping_member(member_names: list[str], dest: str) -> str | None:
    """Return the first member name that would resolve outside *dest*, or None."""
    # A filesystem root already ends with a separator; appending another one
    # would build a prefix ("//") that valid member paths never start with.
    prefix = dest if dest.endswith(os.sep) else dest + os.sep
    for name in member_names:
        target = os.path.normpath(os.path.join(dest, name))
        if target != dest and not target.startswith(prefix):
            return name
    return None


def extract_artifacts(raw_stdout: bytes, output_dir: str) -> tuple[bool, str | None]:
    """Split captured container stdout into a log file and solution artifacts.

    Everything before the solution separator (the whole stream when there is
    no separator) is written to ``stdout.log`` in *output_dir*. The bytes
    after the separator are treated as a base64-encoded zip archive: they are
    decoded, saved as ``solution_artifacts.zip`` and unpacked into
    *output_dir*. No member is extracted if any member would resolve outside
    *output_dir*.

    Args:
        raw_stdout: Raw bytes captured from the container's stdout.
        output_dir: Directory that receives the log, the archive and the
            extracted members. It is created if it does not exist.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)`` where
        *message* describes what was wrong with the payload or extraction.

    Raises:
        OSError: If the output directory, the log file or the archive file
            cannot be created or written. Those calls are not guarded.
    """
    os.makedirs(output_dir, exist_ok=True)
    logs_bytes, payload_b64, found = split_on_separator(raw_stdout)

    # The log is persisted before any validation so it is kept even when the
    # payload turns out to be missing or malformed.
    log_path = os.path.join(output_dir, 'stdout.log')
    with open(log_path, 'wb') as log_file:
        log_file.write(logs_bytes)

    if not found:
        return (False, 'No solution output separator found in container stdout. The solver must print logs, then the separator line, then a base64-encoded zip of result.json and other artifacts.')

    # Encoders such as the `base64` command-line tool or base64.encodebytes
    # wrap their output into lines. Strict validation rejects those embedded
    # newlines, so all ASCII whitespace is removed first; any other stray
    # character is still refused by validate=True.
    payload_b64 = b''.join(payload_b64.split())
    if not payload_b64:
        return (False, 'Separator found but base64 payload is empty.')
    try:
        payload_bytes = base64.b64decode(payload_b64, validate=True)
    except (binascii.Error, ValueError) as e:
        return (False, f'Base64 decode of solution payload failed: {e}')
    if not payload_bytes:
        return (False, 'Decoded payload is empty.')

    zip_path = os.path.join(output_dir, 'solution_artifacts.zip')
    with open(zip_path, 'wb') as zip_file:
        zip_file.write(payload_bytes)

    # Validate and unpack from the in-memory payload instead of re-opening
    # zip_path: extraction writes into the same directory, so a member named
    # 'solution_artifacts.zip' would truncate the very file being read.
    # NOTE(review): members named 'stdout.log' or 'solution_artifacts.zip'
    # still overwrite the files written above; decide whether such names
    # should be rejected.
    if not zipfile.is_zipfile(io.BytesIO(payload_bytes)):
        return (False, 'Decoded payload is not a valid zip file.')

    dest = os.path.abspath(output_dir)
    try:
        with zipfile.ZipFile(io.BytesIO(payload_bytes), 'r') as zf:
            # Check every member before extracting anything, so a malicious
            # archive leaves no partially extracted tree behind.
            escaping = _find_escaping_member(zf.namelist(), dest)
            if escaping is not None:
                return (False, f"Zip member '{escaping}' escapes output directory.")
            zf.extractall(dest)
    except zipfile.BadZipFile as e:
        return (False, f'Bad zip file: {e}')
    except Exception as e:
        # Boundary handler: any other extraction problem is reported to the
        # caller as a failure message instead of being raised.
        return (False, f'Zip extraction failed: {e}')
    return (True, None)
|
|