"""DeerFlow Sandbox Provisioner Service.

Dynamically creates and manages per-sandbox Pods in Kubernetes.
Each ``sandbox_id`` gets its own Pod + NodePort Service. The backend
accesses sandboxes directly via ``{NODE_HOST}:{NodePort}``.

The provisioner connects to the host machine's Kubernetes cluster via a
mounted kubeconfig (``~/.kube/config``). Sandbox Pods run on the host
K8s and are accessed by the backend via ``{NODE_HOST}:{NodePort}``.

Endpoints:
    POST   /api/sandboxes              — Create a sandbox Pod + Service
    DELETE /api/sandboxes/{sandbox_id} — Destroy a sandbox Pod + Service
    GET    /api/sandboxes/{sandbox_id} — Get sandbox status & URL
    GET    /api/sandboxes              — List all sandboxes
    GET    /health                     — Provisioner health check

Architecture (docker-compose-dev):
    ┌────────────┐  HTTP  ┌─────────────┐  K8s API  ┌──────────────┐
    │  remote    │ ─────▸ │ provisioner │ ────────▸ │  host K8s    │
    │  _backend  │        │   :8002     │           │  API server  │
    └────────────┘        └─────────────┘           └──────┬───────┘
                                                           │ creates
                          ┌─────────────┐           ┌──────▼───────┐
                          │   backend   │ ────────▸ │   sandbox    │
                          │             │  direct   │   Pod(s)     │
                          └─────────────┘  NodePort └──────────────┘
"""
|
|
from __future__ import annotations

import asyncio
import logging
import os
import time
from contextlib import asynccontextmanager

import urllib3
from fastapi import FastAPI, HTTPException
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubernetes.client.rest import ApiException
from pydantic import BaseModel
|
|
| |
# Silence urllib3's InsecureRequestWarning process-wide.
# NOTE(review): presumably needed because _init_k8s_client disables TLS
# verification when K8S_API_SERVER is set -- confirm before removing.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Root logging configuration: INFO level, timestamped records.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
| |
|
|
# -- Configuration (all values overridable through environment variables) --

# Namespace in which sandbox Pods and Services are created.
K8S_NAMESPACE = os.getenv("K8S_NAMESPACE", "deer-flow")

# Container image run by every sandbox Pod.
SANDBOX_IMAGE = os.getenv(
    "SANDBOX_IMAGE",
    "enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest",
)

# Host directories exposed to sandbox Pods through hostPath volumes.
SKILLS_HOST_PATH = os.getenv("SKILLS_HOST_PATH", "/skills")
THREADS_HOST_PATH = os.getenv("THREADS_HOST_PATH", "/.deer-flow/threads")

# Location of the kubeconfig file used to reach the cluster.
KUBECONFIG_PATH = os.getenv("KUBECONFIG_PATH", "/root/.kube/config")

# Host name placed in sandbox URLs, i.e. where NodePorts are reachable.
NODE_HOST = os.getenv("NODE_HOST", "host.docker.internal")

# Shared CoreV1Api client; stays None until the lifespan hook assigns it.
core_v1: k8s_client.CoreV1Api | None = None
|
|
|
|
def _init_k8s_client() -> k8s_client.CoreV1Api:
    """Configure the Kubernetes client library and build a ``CoreV1Api``.

    The kubeconfig at ``KUBECONFIG_PATH`` is used when that path exists;
    otherwise in-cluster configuration is attempted.  When the
    ``K8S_API_SERVER`` environment variable is non-empty, the returned
    client targets that host and has TLS verification disabled.

    Returns:
        A ``CoreV1Api`` bound to the loaded configuration.

    Raises:
        RuntimeError: ``KUBECONFIG_PATH`` is a directory, the kubeconfig
            cannot be loaded, or in-cluster configuration is unavailable.
    """
    if not os.path.exists(KUBECONFIG_PATH):
        # Nothing mounted: fall back to in-cluster configuration.
        logger.warning(
            f"Kubeconfig not found at {KUBECONFIG_PATH}; trying in-cluster config"
        )
        try:
            k8s_config.load_incluster_config()
        except Exception as exc:
            raise RuntimeError(
                "Failed to initialize Kubernetes client. "
                f"No kubeconfig at {KUBECONFIG_PATH}, and in-cluster config is unavailable: {exc}"
            ) from exc
    elif os.path.isdir(KUBECONFIG_PATH):
        raise RuntimeError(
            f"KUBECONFIG_PATH points to a directory, expected a file: {KUBECONFIG_PATH}"
        )
    else:
        try:
            k8s_config.load_kube_config(config_file=KUBECONFIG_PATH)
            logger.info(f"Loaded kubeconfig from {KUBECONFIG_PATH}")
        except Exception as exc:
            raise RuntimeError(
                f"Failed to load kubeconfig from {KUBECONFIG_PATH}: {exc}"
            ) from exc

    override_host = os.environ.get("K8S_API_SERVER")
    if not override_host:
        return k8s_client.CoreV1Api()

    # Point a copy of the default configuration at the overridden host.
    cfg = k8s_client.Configuration.get_default_copy()
    cfg.host = override_host
    # TLS verification is turned off for the overridden endpoint.
    cfg.verify_ssl = False
    return k8s_client.CoreV1Api(k8s_client.ApiClient(cfg))
|
|
|
|
def _wait_for_kubeconfig(timeout: int = 30) -> None:
    """Block until a kubeconfig file shows up at ``KUBECONFIG_PATH``.

    Polls the path every two seconds.  Returns as soon as a regular file
    is found.  If nothing appears before the timeout elapses, a warning
    is logged and the function returns normally so the caller can fall
    back to in-cluster configuration.

    Args:
        timeout: Maximum number of seconds to wait.

    Raises:
        RuntimeError: The path exists but is a directory or some other
            non-regular file.
    """
    poll_interval = 2.0
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while (remaining := deadline - time.monotonic()) > 0:
        if os.path.exists(KUBECONFIG_PATH):
            if os.path.isfile(KUBECONFIG_PATH):
                logger.info(f"Found kubeconfig file at {KUBECONFIG_PATH}")
                return
            if os.path.isdir(KUBECONFIG_PATH):
                raise RuntimeError(
                    "Kubeconfig path is a directory. "
                    f"Please mount a kubeconfig file at {KUBECONFIG_PATH}."
                )
            raise RuntimeError(
                f"Kubeconfig path exists but is not a regular file: {KUBECONFIG_PATH}"
            )
        logger.info(f"Waiting for kubeconfig at {KUBECONFIG_PATH} …")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
    logger.warning(
        f"Kubeconfig not found at {KUBECONFIG_PATH} after {timeout}s; "
        "will attempt in-cluster Kubernetes config"
    )
|
|
|
|
def _ensure_namespace() -> None:
    """Create the ``K8S_NAMESPACE`` namespace if it does not yet exist.

    A 404 from the read triggers creation.  A 409 from the create call
    means something else created the namespace between the read and the
    create, and is treated as success.

    Raises:
        ApiException: Any other Kubernetes API failure.
    """
    try:
        core_v1.read_namespace(K8S_NAMESPACE)
    except ApiException as exc:
        if exc.status != 404:
            raise
    else:
        logger.info(f"Namespace '{K8S_NAMESPACE}' already exists")
        return

    namespace = k8s_client.V1Namespace(
        metadata=k8s_client.V1ObjectMeta(
            name=K8S_NAMESPACE,
            labels={
                "app.kubernetes.io/name": "deer-flow",
                "app.kubernetes.io/component": "sandbox",
            },
        )
    )
    try:
        core_v1.create_namespace(namespace)
    except ApiException as exc:
        # 409 Conflict: the namespace appeared after the read above.
        if exc.status != 409:
            raise
        logger.info(f"Namespace '{K8S_NAMESPACE}' already exists")
        return
    logger.info(f"Created namespace '{K8S_NAMESPACE}'")
|
|
|
|
| |
|
|
|
|
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """FastAPI lifespan hook: prepare Kubernetes access before serving.

    Waits for the kubeconfig, stores the API client in the module-level
    ``core_v1`` and makes sure the sandbox namespace exists.  Nothing is
    done after the ``yield``.
    """
    global core_v1

    _wait_for_kubeconfig()
    api = _init_k8s_client()
    core_v1 = api  # published before _ensure_namespace(), which reads it
    _ensure_namespace()
    logger.info("Provisioner is ready (using host Kubernetes)")
    yield
|
|
|
|
# ASGI application; startup work is delegated to the lifespan hook.
app = FastAPI(
    lifespan=lifespan,
    title="DeerFlow Sandbox Provisioner",
)
|
|
|
|
| |
|
|
|
|
class CreateSandboxRequest(BaseModel):
    """Request body accepted by ``POST /api/sandboxes``."""

    # Identifier embedded in the Pod/Service names and the "sandbox-id" label.
    sandbox_id: str
    # Thread whose user-data host directory is mounted into the sandbox.
    thread_id: str
|
|
|
|
class SandboxResponse(BaseModel):
    """Description of one sandbox as returned by the API."""

    # Identifier the sandbox was created with.
    sandbox_id: str
    # "http://<NODE_HOST>:<NodePort>" address of the sandbox.
    sandbox_url: str
    # Pod phase string as reported by _get_pod_phase().
    status: str
|
|
|
|
| |
|
|
|
|
| def _pod_name(sandbox_id: str) -> str: |
| return f"sandbox-{sandbox_id}" |
|
|
|
|
| def _svc_name(sandbox_id: str) -> str: |
| return f"sandbox-{sandbox_id}-svc" |
|
|
|
|
def _sandbox_url(node_port: int) -> str:
    """Return the HTTP URL of a sandbox exposed on *node_port* of ``NODE_HOST``."""
    return "http://{}:{}".format(NODE_HOST, node_port)
|
|
|
|
def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
    """Assemble the Pod manifest for one sandbox.

    The Pod runs a single ``sandbox`` container from ``SANDBOX_IMAGE``
    listening on port 8080, with the skills directory mounted read-only
    at ``/mnt/skills`` and the thread's user-data directory mounted
    read-write at ``/mnt/user-data``.

    Args:
        sandbox_id: Used for the Pod name and the ``sandbox-id`` label.
        thread_id: Selects the per-thread host directory to mount.

    Returns:
        An unsubmitted ``V1Pod`` object.
    """

    def _http_probe(initial_delay: int, period: int) -> k8s_client.V1Probe:
        # Both probes hit the same endpoint and differ only in timing.
        return k8s_client.V1Probe(
            http_get=k8s_client.V1HTTPGetAction(
                path="/v1/sandbox",
                port=8080,
            ),
            initial_delay_seconds=initial_delay,
            period_seconds=period,
            timeout_seconds=3,
            failure_threshold=3,
        )

    metadata = k8s_client.V1ObjectMeta(
        name=_pod_name(sandbox_id),
        namespace=K8S_NAMESPACE,
        labels={
            "app": "deer-flow-sandbox",
            "sandbox-id": sandbox_id,
            "app.kubernetes.io/name": "deer-flow",
            "app.kubernetes.io/component": "sandbox",
        },
    )

    resources = k8s_client.V1ResourceRequirements(
        requests={
            "cpu": "100m",
            "memory": "256Mi",
            "ephemeral-storage": "500Mi",
        },
        limits={
            "cpu": "1000m",
            "memory": "1Gi",
            "ephemeral-storage": "500Mi",
        },
    )

    mounts = [
        k8s_client.V1VolumeMount(
            name="skills",
            mount_path="/mnt/skills",
            read_only=True,
        ),
        k8s_client.V1VolumeMount(
            name="user-data",
            mount_path="/mnt/user-data",
            read_only=False,
        ),
    ]

    container = k8s_client.V1Container(
        name="sandbox",
        image=SANDBOX_IMAGE,
        image_pull_policy="IfNotPresent",
        ports=[
            k8s_client.V1ContainerPort(
                name="http",
                container_port=8080,
                protocol="TCP",
            )
        ],
        readiness_probe=_http_probe(5, 5),
        liveness_probe=_http_probe(10, 10),
        resources=resources,
        volume_mounts=mounts,
        security_context=k8s_client.V1SecurityContext(
            privileged=False,
            allow_privilege_escalation=True,
        ),
    )

    # NOTE(review): thread_id is interpolated into a host path as-is;
    # confirm callers only pass trusted identifiers.
    volumes = [
        k8s_client.V1Volume(
            name="skills",
            host_path=k8s_client.V1HostPathVolumeSource(
                path=SKILLS_HOST_PATH,
                type="Directory",
            ),
        ),
        k8s_client.V1Volume(
            name="user-data",
            host_path=k8s_client.V1HostPathVolumeSource(
                path=f"{THREADS_HOST_PATH}/{thread_id}/user-data",
                type="DirectoryOrCreate",
            ),
        ),
    ]

    return k8s_client.V1Pod(
        metadata=metadata,
        spec=k8s_client.V1PodSpec(
            containers=[container],
            volumes=volumes,
            restart_policy="Always",
        ),
    )
|
|
|
|
def _build_service(sandbox_id: str) -> k8s_client.V1Service:
    """Assemble the NodePort Service manifest for one sandbox.

    No ``node_port`` is set on the port entry.  The Service selects Pods
    by their ``sandbox-id`` label and forwards port 8080 to port 8080.
    """
    metadata = k8s_client.V1ObjectMeta(
        name=_svc_name(sandbox_id),
        namespace=K8S_NAMESPACE,
        labels={
            "app": "deer-flow-sandbox",
            "sandbox-id": sandbox_id,
            "app.kubernetes.io/name": "deer-flow",
            "app.kubernetes.io/component": "sandbox",
        },
    )
    http_port = k8s_client.V1ServicePort(
        name="http",
        port=8080,
        target_port=8080,
        protocol="TCP",
    )
    spec = k8s_client.V1ServiceSpec(
        type="NodePort",
        ports=[http_port],
        selector={
            "sandbox-id": sandbox_id,
        },
    )
    return k8s_client.V1Service(metadata=metadata, spec=spec)
|
|
|
|
def _get_node_port(sandbox_id: str) -> int | None:
    """Return the NodePort of the sandbox's ``http`` Service port.

    Args:
        sandbox_id: Sandbox whose Service is looked up.

    Returns:
        The ``node_port`` of the first port named ``http``, or ``None``
        when the Service cannot be read or has no such port.
    """
    try:
        svc = core_v1.read_namespaced_service(_svc_name(sandbox_id), K8S_NAMESPACE)
    except ApiException as exc:
        # A 404 simply means "no such sandbox"; anything else is a real
        # API problem that should at least be visible in the logs.
        if exc.status != 404:
            logger.warning(
                f"Failed to read Service {_svc_name(sandbox_id)}: "
                f"{exc.status} {exc.reason}"
            )
        return None
    return next(
        (port.node_port for port in svc.spec.ports or [] if port.name == "http"),
        None,
    )
|
|
|
|
def _get_pod_phase(sandbox_id: str) -> str:
    """Return the phase of the sandbox's Pod.

    Args:
        sandbox_id: Sandbox whose Pod is looked up.

    Returns:
        The Pod's reported phase, ``"Unknown"`` when no phase is reported
        or the API call fails for a reason other than 404, and
        ``"NotFound"`` when the Pod does not exist.
    """
    try:
        pod = core_v1.read_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE)
    except ApiException as exc:
        if exc.status == 404:
            return "NotFound"
        # Do not claim the Pod is gone when the lookup itself failed.
        logger.warning(
            f"Failed to read Pod {_pod_name(sandbox_id)}: {exc.status} {exc.reason}"
        )
        return "Unknown"
    return pod.status.phase or "Unknown"
|
|
|
|
| |
|
|
|
|
@app.get("/health")
async def health():
    """Liveness endpoint; always answers with a static OK payload."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
@app.post("/api/sandboxes", response_model=SandboxResponse)
async def create_sandbox(req: CreateSandboxRequest):
    """Create a sandbox Pod + NodePort Service for *sandbox_id*.

    If the sandbox's Service already has a NodePort, the existing
    information is returned (idempotent).  Otherwise the Pod and Service
    are created -- an "already exists" (409) reply for either is
    tolerated -- and the Service is polled until a NodePort is assigned.

    Raises:
        HTTPException: 500 when Pod or Service creation fails, or when no
            NodePort is allocated within the polling window.
    """
    sandbox_id = req.sandbox_id
    thread_id = req.thread_id

    logger.info(
        f"Received request to create sandbox '{sandbox_id}' for thread '{thread_id}'"
    )

    # Idempotency: reuse the sandbox when its Service already has a port.
    existing_port = _get_node_port(sandbox_id)
    if existing_port:
        return SandboxResponse(
            sandbox_id=sandbox_id,
            sandbox_url=_sandbox_url(existing_port),
            status=_get_pod_phase(sandbox_id),
        )

    # Pod
    try:
        core_v1.create_namespaced_pod(K8S_NAMESPACE, _build_pod(sandbox_id, thread_id))
        logger.info(f"Created Pod {_pod_name(sandbox_id)}")
    except ApiException as exc:
        if exc.status != 409:
            raise HTTPException(
                status_code=500, detail=f"Pod creation failed: {exc.reason}"
            ) from exc

    # Service
    try:
        core_v1.create_namespaced_service(K8S_NAMESPACE, _build_service(sandbox_id))
        logger.info(f"Created Service {_svc_name(sandbox_id)}")
    except ApiException as exc:
        if exc.status != 409:
            # Best-effort rollback so no Pod is left without a Service.
            try:
                core_v1.delete_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE)
            except ApiException:
                pass
            raise HTTPException(
                status_code=500, detail=f"Service creation failed: {exc.reason}"
            ) from exc

    # Poll for the allocated NodePort: 20 attempts, 0.5 s apart.
    node_port: int | None = None
    for _ in range(20):
        node_port = _get_node_port(sandbox_id)
        if node_port:
            break
        # Yield to the event loop while waiting instead of blocking it.
        await asyncio.sleep(0.5)

    if not node_port:
        raise HTTPException(
            status_code=500, detail="NodePort was not allocated in time"
        )

    return SandboxResponse(
        sandbox_id=sandbox_id,
        sandbox_url=_sandbox_url(node_port),
        status=_get_pod_phase(sandbox_id),
    )
|
|
|
|
@app.delete("/api/sandboxes/{sandbox_id}")
async def destroy_sandbox(sandbox_id: str):
    """Delete the Service and then the Pod belonging to *sandbox_id*.

    Resources that are already gone (404) count as deleted.  Any other
    API failure is collected and reported as a single 500 response after
    both deletions have been attempted.
    """
    failures: list[str] = []

    # Service first ...
    try:
        core_v1.delete_namespaced_service(_svc_name(sandbox_id), K8S_NAMESPACE)
        logger.info(f"Deleted Service {_svc_name(sandbox_id)}")
    except ApiException as exc:
        if exc.status != 404:
            failures.append(f"service: {exc.reason}")

    # ... then the Pod.
    try:
        core_v1.delete_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE)
        logger.info(f"Deleted Pod {_pod_name(sandbox_id)}")
    except ApiException as exc:
        if exc.status != 404:
            failures.append(f"pod: {exc.reason}")

    if not failures:
        return {"ok": True, "sandbox_id": sandbox_id}

    raise HTTPException(
        status_code=500, detail=f"Partial cleanup: {', '.join(failures)}"
    )
|
|
|
|
@app.get("/api/sandboxes/{sandbox_id}", response_model=SandboxResponse)
async def get_sandbox(sandbox_id: str):
    """Look up one sandbox and report its URL and Pod phase.

    Raises:
        HTTPException: 404 when no NodePort can be found for the sandbox.
    """
    node_port = _get_node_port(sandbox_id)
    if node_port:
        return SandboxResponse(
            sandbox_id=sandbox_id,
            sandbox_url=_sandbox_url(node_port),
            status=_get_pod_phase(sandbox_id),
        )

    raise HTTPException(status_code=404, detail=f"Sandbox '{sandbox_id}' not found")
|
|
|
|
@app.get("/api/sandboxes")
async def list_sandboxes():
    """Enumerate the sandboxes found in the namespace.

    Services labelled ``app=deer-flow-sandbox`` are listed; each one that
    carries a ``sandbox-id`` label and an ``http`` port with a NodePort
    becomes one entry.

    Returns:
        ``{"sandboxes": [...], "count": <number of entries>}``.

    Raises:
        HTTPException: 500 when the Service listing fails.
    """
    try:
        services = core_v1.list_namespaced_service(
            K8S_NAMESPACE,
            label_selector="app=deer-flow-sandbox",
        )
    except ApiException as exc:
        raise HTTPException(
            status_code=500, detail=f"Failed to list services: {exc.reason}"
        )

    found: list[SandboxResponse] = []
    for svc in services.items:
        labels = svc.metadata.labels or {}
        sid = labels.get("sandbox-id")
        if not sid:
            continue

        # NodePort of the first port named "http", if there is one.
        http_node_port = next(
            (entry.node_port for entry in svc.spec.ports or [] if entry.name == "http"),
            None,
        )
        if not http_node_port:
            continue

        found.append(
            SandboxResponse(
                sandbox_id=sid,
                sandbox_url=_sandbox_url(http_node_port),
                status=_get_pod_phase(sid),
            )
        )

    return {"sandboxes": found, "count": len(found)}
|
|