# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """ Daytona container provider for running OpenEnv environments in Daytona cloud sandboxes. Requires the ``daytona`` SDK: ``pip install daytona>=0.10`` """ from __future__ import annotations import json import os import shlex import time from typing import Any, Callable, Dict, Optional import yaml from .providers import ContainerProvider class DaytonaProvider(ContainerProvider): """ Container provider that runs environments in Daytona cloud sandboxes. Example: >>> provider = DaytonaProvider(api_key="your-key") >>> image = DaytonaProvider.image_from_dockerfile("envs/echo_env/server/Dockerfile") >>> base_url = provider.start_container(image) >>> provider.wait_for_ready(base_url) >>> provider.stop_container() """ _dockerfile_registry: Dict[str, Dict[str, Any]] = {} def __init__( self, *, api_key: Optional[str] = None, public: bool = False, resources: Optional[Any] = None, auto_stop_interval: int = 15, target: Optional[str] = None, on_snapshot_create_logs: Optional[Callable[[str], None]] = None, cmd: Optional[str] = None, create_timeout: float = 300, ): """ Args: api_key: Daytona API key. Falls back to ``DAYTONA_API_KEY`` env var. public: If True, the sandbox preview is publicly accessible. resources: Optional ``daytona.Resources`` instance for CPU/memory. auto_stop_interval: Minutes of inactivity before auto-stop (0 disables). target: Daytona target region (e.g. "us"). on_snapshot_create_logs: Callback for snapshot build log lines. cmd: Shell command to start the server inside the sandbox. create_timeout: Seconds to wait for sandbox creation (default 300). Heavy images (e.g. with Playwright/Chromium) may need more. """ from daytona import Daytona, DaytonaConfig config_kwargs: Dict[str, Any] = {} resolved_key = api_key or os.environ.get("DAYTONA_API_KEY") if resolved_key: config_kwargs["api_key"] = resolved_key if target: config_kwargs["target"] = target self._daytona = Daytona(DaytonaConfig(**config_kwargs)) self._public = public self._resources = resources self._auto_stop_interval = auto_stop_interval self._on_snapshot_create_logs = on_snapshot_create_logs self._cmd = cmd self._create_timeout = create_timeout self._sandbox: Any = None self._preview_url: Optional[str] = None def _discover_server_cmd(self, sandbox: Any, port: int = 8000) -> str: """Discover the server command from ``openenv.yaml`` inside *sandbox*. Finds the file, reads the ``app`` field, and constructs a command of the form ``cd && python -m uvicorn --host 0.0.0.0 --port ``. Raises: ValueError: If ``openenv.yaml`` is not found or lacks an ``app`` field. """ yaml_path = self._find_openenv_yaml(sandbox) if yaml_path is None: raise ValueError( "Could not find openenv.yaml inside the sandbox. " "Pass an explicit cmd= to DaytonaProvider or start_container()." ) cat_resp = sandbox.process.exec(f"cat {shlex.quote(yaml_path)}", timeout=10) content = cat_resp.result if hasattr(cat_resp, "result") else str(cat_resp) app = self._parse_app_field(content) if app is None: raise ValueError( f"openenv.yaml at {yaml_path} does not contain an 'app' field. " "Pass an explicit cmd= to DaytonaProvider or start_container()." ) # The directory containing openenv.yaml is the env root env_root = yaml_path.rsplit("/", 1)[0] return ( f"cd {shlex.quote(env_root)} && " f"python -m uvicorn {shlex.quote(app)} --host 0.0.0.0 --port {port}" ) def _find_openenv_yaml(self, sandbox: Any) -> Optional[str]: """Locate ``openenv.yaml`` inside the sandbox. Tries the modern layout path ``/app/env/openenv.yaml`` first, then falls back to a ``find`` command for the old layout. """ # Fast path: modern Dockerfile layout resp = sandbox.process.exec( "test -f /app/env/openenv.yaml && echo found", timeout=10 ) out = resp.result if hasattr(resp, "result") else str(resp) if "found" in (out or ""): return "/app/env/openenv.yaml" # Fallback: search for it (redirect stderr so error messages # like "No such file or directory" don't get mistaken for paths). resp = sandbox.process.exec( "find /app -maxdepth 4 -name openenv.yaml -print -quit 2>/dev/null", timeout=10, ) path = (resp.result if hasattr(resp, "result") else str(resp) or "").strip() if path and path.startswith("/"): return path return None @staticmethod def _parse_app_field(yaml_content: str) -> Optional[str]: """Extract the ``app`` value from raw openenv.yaml content. Uses PyYAML to handle comments, quotes, and nested keys correctly. """ try: data = yaml.safe_load(yaml_content) or {} except Exception: return None if not isinstance(data, dict): return None value = data.get("app") if isinstance(value, str): value = value.strip() return value if value else None return None @staticmethod def _parse_dockerfile_cmd(dockerfile_content: str) -> Optional[str]: """Extract the server command from the last ``CMD`` in a Dockerfile. Handles exec form (``CMD ["prog", "arg"]``) and shell form (``CMD prog arg``). When a Dockerfile has multiple ``CMD`` instructions (e.g. multi-stage builds), the last one wins - same semantics as Docker itself. Lines where ``CMD`` appears inside a comment are ignored. Returns: The command as a single string, or ``None`` if no ``CMD`` found. """ import re last_cmd: Optional[str] = None for line in dockerfile_content.splitlines(): stripped = line.strip() # Skip comments if stripped.startswith("#"): continue match = re.match(r"CMD\s+(.+)", stripped, flags=re.IGNORECASE) if match: last_cmd = match.group(1).strip() if last_cmd is None: return None # Exec form: CMD ["executable", "param1", ...] if last_cmd.startswith("["): try: parts = json.loads(last_cmd) if isinstance(parts, list) and all(isinstance(p, str) for p in parts): return " ".join(parts) except (json.JSONDecodeError, TypeError): pass # Shell form: CMD executable param1 ... return last_cmd if last_cmd else None @staticmethod def strip_buildkit_syntax(dockerfile_content: str) -> str: """Remove BuildKit ``--mount=...`` flags from ``RUN`` instructions. Handles single-line flags, multi-line continuations, and multiple ``--mount`` flags spread across continuation lines. Only leading ``--mount`` flags are removed (before the actual command starts). Daytona's ``Image.from_dockerfile`` does not support BuildKit ``--mount`` syntax. This helper strips the flags so that standard Dockerfiles (like the ones generated by ``openenv build``) can be used directly. """ import re def strip_leading_mounts(text: str) -> str: remaining = text while True: match = re.match(r"\s*--mount=\S+\s*", remaining) if not match: return remaining remaining = remaining[match.end() :] lines = dockerfile_content.split("\n") result: list[str] = [] in_run = False in_mount_prefix = False for line in lines: line_out = line run_start = False if re.match(r"\s*RUN(\s+|$)", line, flags=re.IGNORECASE): in_run = True in_mount_prefix = True run_start = True if in_run and in_mount_prefix: original_ends_with_slash = line_out.rstrip().endswith("\\") if run_start: match = re.match(r"(\s*RUN\s+)(.*)$", line_out, flags=re.IGNORECASE) if match: run_prefix, remainder = match.group(1), match.group(2) else: run_prefix, remainder = line_out, "" new_remainder = strip_leading_mounts(remainder) line_out = run_prefix + new_remainder content_for_check = new_remainder else: new_remainder = strip_leading_mounts(line_out) line_out = new_remainder content_for_check = new_remainder if original_ends_with_slash and not line_out.rstrip().endswith("\\"): line_out = line_out.rstrip() + " \\" if content_for_check.strip() not in ("", "\\"): in_mount_prefix = False if in_run and not line_out.rstrip().endswith("\\"): in_run = False in_mount_prefix = False result.append(line_out) return "\n".join(result) @classmethod def image_from_dockerfile( cls, dockerfile_path: str, context_dir: str | None = None, ) -> str: """Validate a Dockerfile and return a ``dockerfile:`` URI for :meth:`start_container`. Eagerly validates the Dockerfile (existence, COPY sources, BuildKit stripping) and stores the processed content in an internal registry. The actual ``daytona.Image`` is created later inside ``start_container``. Args: dockerfile_path: Path to the Dockerfile on disk. context_dir: Build context directory. Defaults to the Dockerfile's grandparent directory, matching the ``openenv init`` convention where Dockerfiles live in ``/server/Dockerfile`` and the build context is ``/``. Pass explicitly for non-standard layouts (e.g. ``context_dir="."`` for repo-root contexts). Returns: A ``"dockerfile:"`` string to pass to ``start_container``. Raises: FileNotFoundError: If *dockerfile_path* does not exist. ValueError: If *context_dir* is given but does not exist, or if COPY sources in the Dockerfile cannot be found under the resolved context directory. """ import pathlib import re src = pathlib.Path(dockerfile_path).resolve() if not src.is_file(): raise FileNotFoundError(f"Dockerfile not found: {dockerfile_path}") if context_dir is not None: ctx = pathlib.Path(context_dir) if not ctx.is_dir(): raise ValueError(f"context_dir does not exist: {context_dir}") else: # Default: grandparent of the Dockerfile, matching the # openenv init layout (/server/Dockerfile -> /). ctx = src.parent.parent content = src.read_text() stripped = cls.strip_buildkit_syntax(content) # Validate that COPY sources exist under the context directory. # This catches mismatches early (e.g. a Dockerfile expecting repo # root as context when we defaulted to the env directory). for line in stripped.splitlines(): m = re.match(r"^\s*COPY\s+(?!--from=)(\S+)\s+", line, re.IGNORECASE) if not m: continue copy_src = m.group(1) if copy_src.startswith("/"): continue resolved = ctx / copy_src if not resolved.exists() and not any(ctx.glob(copy_src)): raise ValueError( f"Dockerfile COPY source '{copy_src}' not found " f"under context_dir '{ctx}'. This Dockerfile may " f"expect a different build context (e.g. the repo " f"root). Pass context_dir explicitly." ) # Parse CMD from the original Dockerfile so start_container can # use it as a fallback when openenv.yaml is unavailable. parsed_cmd = cls._parse_dockerfile_cmd(content) cls._dockerfile_registry[str(src)] = { "stripped_content": stripped, "context_dir": str(ctx), "server_cmd": parsed_cmd, } return f"dockerfile:{src}" def start_container( self, image: str, port: Optional[int] = None, env_vars: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> str: """ Create a Daytona sandbox from a Docker image or snapshot. Daytona does not execute the image's CMD (known bug — ENTRYPOINT runs, CMD does not). The server command is resolved in order: 1. Explicit ``cmd`` passed to the constructor. 2. ``cmd`` key in ``**kwargs`` (popped before forwarding). 3. Auto-discovered from ``openenv.yaml`` inside the sandbox. 4. ``CMD`` parsed from the Dockerfile (when *image* came from ``image_from_dockerfile``). Args: image: Docker image name (e.g. ``"echo-env:latest"``), ``"snapshot:"`` to create from a pre-built snapshot, or ``"dockerfile:"`` returned by :meth:`image_from_dockerfile`. port: Must be ``None`` or ``8000``. Daytona exposes port 8000 via its preview proxy; other ports raise ``ValueError``. env_vars: Environment variables forwarded to the sandbox. **kwargs: ``cmd`` (str) to override the server command; remaining kwargs passed through to ``Daytona.create()``. Returns: HTTPS preview URL for the sandbox (base_url). """ if port is not None and port != 8000: raise ValueError( f"DaytonaProvider only supports port 8000 (got {port}). " "The Daytona preview proxy routes to port 8000 inside the sandbox." ) # Resolve the server command (may be None; discovery happens after # sandbox creation when we can inspect the filesystem). cmd = kwargs.pop("cmd", None) or self._cmd # CMD parsed from Dockerfile (populated for "dockerfile:" images). parsed_cmd: Optional[str] = None # Build creation params create_kwargs: Dict[str, Any] = {} if env_vars: create_kwargs["env_vars"] = env_vars if self._public: create_kwargs["public"] = True if self._auto_stop_interval != 15: create_kwargs["auto_stop_interval"] = self._auto_stop_interval if image.startswith("snapshot:"): from daytona import CreateSandboxFromSnapshotParams snapshot_name = image[len("snapshot:") :] params = CreateSandboxFromSnapshotParams( snapshot=snapshot_name, **create_kwargs ) elif image.startswith("dockerfile:"): from daytona import CreateSandboxFromImageParams, Image dockerfile_path = image[len("dockerfile:") :] meta = self._dockerfile_registry.get(dockerfile_path) if meta is None: raise ValueError( f"No registered Dockerfile metadata for {dockerfile_path}. " "Call DaytonaProvider.image_from_dockerfile() first." ) parsed_cmd = meta.get("server_cmd") # Build the daytona Image from the pre-stripped content. import pathlib import uuid ctx = pathlib.Path(meta["context_dir"]) tmp_name = f".daytona-{uuid.uuid4().hex[:8]}.dockerfile" tmp_path = ctx / tmp_name try: tmp_path.write_text(meta["stripped_content"]) daytona_image = Image.from_dockerfile(str(tmp_path)) finally: tmp_path.unlink(missing_ok=True) img_kwargs: Dict[str, Any] = { "image": daytona_image, **create_kwargs, } if self._resources is not None: img_kwargs["resources"] = self._resources params = CreateSandboxFromImageParams(**img_kwargs) else: from daytona import CreateSandboxFromImageParams img_kwargs = {"image": image, **create_kwargs} if self._resources is not None: img_kwargs["resources"] = self._resources params = CreateSandboxFromImageParams(**img_kwargs) # Create sandbox extra: Dict[str, Any] = dict(kwargs) if self._on_snapshot_create_logs is not None: extra["on_snapshot_create_logs"] = self._on_snapshot_create_logs self._sandbox = self._daytona.create( params, timeout=self._create_timeout, **extra ) try: # Discover server command from openenv.yaml if not explicitly set. if cmd is None: try: cmd = self._discover_server_cmd(self._sandbox) except ValueError: # Fall back to CMD parsed from Dockerfile (if available). if parsed_cmd: cmd = parsed_cmd else: raise # Wrap in bash -c so compound commands (cd ... && uvicorn ...) # are handled correctly by nohup. Write PID so we can check # if the process crashed later in wait_for_ready(). escaped_cmd = shlex.quote(cmd) self._sandbox.process.exec( f"nohup bash -c {escaped_cmd} > /tmp/openenv-server.log 2>&1 &" " echo $! > /tmp/openenv-server.pid", timeout=10, ) # Get a signed preview URL for port 8000. The token is # embedded in the URL itself so no extra headers are needed. signed = self._sandbox.create_signed_preview_url( 8000, expires_in_seconds=86400 ) self._preview_url = signed.url except Exception: self.stop_container() raise return self._preview_url def refresh_preview_url(self) -> str: """Get a fresh signed preview URL (valid for 24h). Daytona signed URLs expire after at most 24 hours. Call this to get a new one for long-running sessions. The returned URL points to the same sandbox — clients will need to reconnect using it. """ if self._sandbox is None: raise RuntimeError("No active sandbox to refresh URL for.") signed = self._sandbox.create_signed_preview_url(8000, expires_in_seconds=86400) self._preview_url = signed.url return self._preview_url def stop_container(self) -> None: """Delete the Daytona sandbox.""" if self._sandbox is None: return try: self._daytona.delete(self._sandbox) finally: self._sandbox = None self._preview_url = None def wait_for_ready(self, base_url: str, timeout_s: float = 120.0) -> None: """ Poll the /health endpoint until the sandbox is ready. Uses a longer default timeout (120s) than Docker providers because Daytona sandboxes may have cold-start latency. Args: base_url: Preview URL returned by ``start_container()``. timeout_s: Maximum seconds to wait. Raises: TimeoutError: If the sandbox doesn't become ready in time. RuntimeError: If the server process died (detected via PID check). """ import requests health_url = f"{base_url}/health" deadline = time.time() + timeout_s while time.time() < deadline: try: response = requests.get(health_url, timeout=5.0) if response.status_code == 200: return except requests.RequestException: pass # Early exit: if the server process died, raise immediately # instead of waiting for the full health-check timeout. if self._sandbox is not None: resp = self._sandbox.process.exec( "kill -0 $(cat /tmp/openenv-server.pid) 2>/dev/null" " && echo RUNNING || echo DEAD", timeout=10, ) out = resp.result if hasattr(resp, "result") else str(resp) if "DEAD" in (out or ""): log_resp = self._sandbox.process.exec( "cat /tmp/openenv-server.log 2>/dev/null", timeout=10 ) log = ( log_resp.result if hasattr(log_resp, "result") else str(log_resp) ) raise RuntimeError(f"Server process died.\nLog:\n{log}") time.sleep(1.0) raise TimeoutError( f"Daytona sandbox at {base_url} did not become ready within {timeout_s}s" )