# dataset / v13 / my_solution / enigma_challenges / solution_output.py
# david22guy's picture
# Upload folder using huggingface_hub
# 8d1e50f verified
# Raw
# History Blame Contribute Delete
# 2.87 kB
from __future__ import annotations
import base64
import binascii
import io
import os
import sys
import zipfile
# Marker written to stdout between the solver's log output and the
# base64-encoded zip payload; the reader searches for this exact byte
# sequence (including the surrounding newlines) to split the stream.
# NOTE(review): the UUID-like suffix is presumably there to make an accidental
# match inside ordinary log text unlikely -- confirm with the protocol owner.
SOLUTION_OUTPUT_SEPARATOR: bytes = b'\n----- ENIGMA-SOLUTION-OUTPUT-BEGIN-a8c7f3e2-9d4b-4c5a-8f1e-2b6d3a4e5f7c -----\n'
def build_solution_zip(files: dict[str, str | bytes]) -> bytes:
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
for name, content in files.items():
zf.writestr(name, content)
return buffer.getvalue()
def write_solution_output(zip_bytes: bytes) -> None:
    """Emit the separator and the base64 form of *zip_bytes* on stdout.

    The output is written to the binary layer of ``sys.stdout`` as: the
    separator, the base64-encoded archive on a single line, and a trailing
    newline.

    Args:
        zip_bytes: Raw bytes of the zip archive to publish.
    """
    # Drain the text-layer buffers first so anything already printed reaches
    # the stream before the bytes written directly to the binary layer below.
    for stream in (sys.stdout, sys.stderr):
        stream.flush()

    payload = base64.b64encode(zip_bytes)
    raw_out = sys.stdout.buffer
    for chunk in (SOLUTION_OUTPUT_SEPARATOR, payload, b'\n'):
        raw_out.write(chunk)
    raw_out.flush()
def split_on_separator(raw_stdout: bytes) -> tuple[bytes, bytes, bool]:
    """Split captured stdout at the first occurrence of the separator.

    Args:
        raw_stdout: Complete stdout bytes to inspect.

    Returns:
        A ``(before, after, found)`` triple.  When the separator is present,
        *before* and *after* are the bytes on either side of its first
        occurrence and *found* is ``True``.  Otherwise *before* is the whole
        input, *after* is empty and *found* is ``False``.
    """
    head, marker, tail = raw_stdout.partition(SOLUTION_OUTPUT_SEPARATOR)
    # ``partition`` yields an empty middle element exactly when the (non-empty)
    # separator was not found, in which case ``head`` is the full input and
    # ``tail`` is empty.
    return (head, tail, bool(marker))
def extract_artifacts(raw_stdout: bytes, output_dir: str) -> tuple[bool, str | None]:
    """Split captured stdout into logs and artifacts under *output_dir*.

    The bytes before the separator are written to ``stdout.log``.  The bytes
    after it are decoded as base64, saved as ``solution_artifacts.zip`` and
    then unpacked into *output_dir*.

    Args:
        raw_stdout: Complete stdout bytes captured from the solver.
        output_dir: Directory to populate; created if it does not exist.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)`` where
        *message* describes the first failure encountered.  ``stdout.log`` is
        written before any validation, so it exists on every failure path.

    Raises:
        OSError: Creating the directory or writing the log / zip file is not
            guarded here, so filesystem errors from those steps propagate.
    """
    os.makedirs(output_dir, exist_ok=True)
    logs_bytes, payload_b64, found = split_on_separator(raw_stdout)

    log_path = os.path.join(output_dir, 'stdout.log')
    with open(log_path, 'wb') as f:
        f.write(logs_bytes)

    if not found:
        return (
            False,
            'No solution output separator found in container stdout. '
            'The solver must print logs, then the separator line, then a '
            'base64-encoded zip of result.json and other artifacts.',
        )

    # Remove every ASCII whitespace byte, not only leading/trailing ones.
    # Line-wrapped base64 (``base64.encodebytes``, the ``base64`` CLI, CRLF
    # line endings) is a valid encoding of the archive, but the strict
    # ``validate=True`` decode below rejects any embedded newline.
    payload_b64 = b''.join(payload_b64.split())
    if not payload_b64:
        return (False, 'Separator found but base64 payload is empty.')

    try:
        payload_bytes = base64.b64decode(payload_b64, validate=True)
    except (binascii.Error, ValueError) as e:
        return (False, f'Base64 decode of solution payload failed: {e}')
    if not payload_bytes:
        return (False, 'Decoded payload is empty.')

    zip_path = os.path.join(output_dir, 'solution_artifacts.zip')
    with open(zip_path, 'wb') as f:
        f.write(payload_bytes)
    if not zipfile.is_zipfile(zip_path):
        return (False, 'Decoded payload is not a valid zip file.')

    try:
        with zipfile.ZipFile(zip_path, 'r') as zf:
            dest = os.path.abspath(output_dir)
            # abspath() keeps a trailing separator only for a filesystem
            # root; appending another one there would yield a prefix such as
            # '//' that no legitimate member path starts with.
            dest_prefix = dest if dest.endswith(os.sep) else dest + os.sep
            # Check every member before extracting anything, so an archive
            # with a path-traversal entry is rejected without partial output.
            for member in zf.infolist():
                member_path = os.path.normpath(os.path.join(dest, member.filename))
                if member_path != dest and not member_path.startswith(dest_prefix):
                    return (False, f"Zip member '{member.filename}' escapes output directory.")
            zf.extractall(dest)
    except zipfile.BadZipFile as e:
        return (False, f'Bad zip file: {e}')
    except Exception as e:
        # Boundary handler: any other extraction failure is reported to the
        # caller through the error tuple instead of propagating.
        return (False, f'Zip extraction failed: {e}')
    return (True, None)