# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
JuliaEnv
--------
Client-side wrapper for the Julia environment server.

This client maintains a persistent WebSocket connection to the environment
server, enabling efficient multi-step interactions with lower latency.

- Users instantiate JuliaEnv with a base_url provided by the higher-level
  vector/orchestration layer.
- Environment authors ship the Docker image that serves the API.
"""

from __future__ import annotations

from openenv.core.client_types import StepResult
from openenv.core.env_client import EnvClient

from .models import JuliaAction, JuliaObservation, JuliaState


class JuliaEnv(EnvClient[JuliaAction, JuliaObservation, JuliaState]):
    """
    WebSocket client for the Julia Environment.

    This client connects to a JuliaEnvironment server and provides methods to
    interact with it: reset(), step(), and state access.

    The default message timeout is set to 180 seconds to accommodate:
    - Server execution timeout: 120s
    - Process pool worker wait: 30s
    - Network overhead: 30s buffer

    Example:
        >>> # Connect to a running server
        >>> client = JuliaEnv(base_url="http://localhost:8000")
        >>> result = client.reset()
        >>> print(result.observation.stdout)
        >>>
        >>> # Execute Julia code
        >>> action = JuliaAction(
        ...     core_code='''
        ...     function multiply(a, b)
        ...         return a * b
        ...     end
        ...     ''',
        ...     test_code='''
        ...     using Test
        ...     @test multiply(3, 4) == 12
        ...     '''
        ... )
        >>> result = client.step(action)
        >>> print(result.observation.tests_passed)  # 1
        >>> print(result.reward)

    Example with Docker:
        >>> # Automatically start container and connect
        >>> client = JuliaEnv.from_docker_image("julia-env:latest")
        >>> result = client.reset()
        >>> result = client.step(JuliaAction(core_code="println(2 + 2)", test_code=""))
        >>> print(result.observation.stdout)  # "4\\n"
        >>> client.close()
    """

    # Override default timeout to accommodate Julia execution + worker wait
    DEFAULT_MESSAGE_TIMEOUT = 180.0  # 120s execution + 30s worker wait + 30s buffer

    def __init__(
        self,
        base_url: str,
        connect_timeout_s: float = 10.0,
        message_timeout_s: float | None = None,
        **kwargs,
    ):
        """
        Initialize JuliaEnv client with appropriate timeout.

        Args:
            base_url: Base URL of the Julia environment server
            connect_timeout_s: Timeout for establishing WebSocket connection
            message_timeout_s: Timeout for receiving responses. When None, the
                class-level DEFAULT_MESSAGE_TIMEOUT (180.0) is used.
            **kwargs: Additional arguments passed to EnvClient
        """
        # Resolve the default here (rather than in the signature) so that
        # subclasses overriding DEFAULT_MESSAGE_TIMEOUT are honoured.
        if message_timeout_s is None:
            message_timeout_s = self.DEFAULT_MESSAGE_TIMEOUT

        super().__init__(
            base_url,
            connect_timeout_s=connect_timeout_s,
            message_timeout_s=message_timeout_s,
            **kwargs,
        )

    # --- EnvClient abstract hooks ---

    def _step_payload(self, action: JuliaAction) -> dict:
        """
        Convert JuliaAction to JSON payload for step request.

        Args:
            action: JuliaAction instance

        Returns:
            Dictionary with the action's ``core_code`` and ``test_code``,
            suitable for JSON encoding
        """
        return {
            "core_code": action.core_code,
            "test_code": action.test_code,
        }

    def _parse_result(self, payload: dict) -> StepResult[JuliaObservation]:
        """
        Parse server response into StepResult[JuliaObservation].

        Fields absent from the payload fall back to defaults: empty strings
        for stdout/stderr, 0 for exit_code and the test counters, True for
        code_compiles, False for done and None for reward.

        Args:
            payload: JSON response from server

        Returns:
            StepResult with JuliaObservation
        """
        # ``dict.get(key, {})`` only applies the default when the key is
        # missing; an explicit JSON ``null`` would come back as None and break
        # the lookups below, so treat null and missing alike.
        obs_data = payload.get("observation") or {}

        # reward/done live at the top level of the payload and are mirrored
        # onto both the observation and the StepResult.
        reward = payload.get("reward")
        done = payload.get("done", False)

        observation = JuliaObservation(
            stdout=obs_data.get("stdout", ""),
            stderr=obs_data.get("stderr", ""),
            exit_code=obs_data.get("exit_code", 0),
            tests_passed=obs_data.get("tests_passed", 0),
            tests_failed=obs_data.get("tests_failed", 0),
            code_compiles=obs_data.get("code_compiles", True),
            done=done,
            reward=reward,
        )

        return StepResult[JuliaObservation](
            observation=observation,
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: dict) -> JuliaState:
        """
        Parse server response into JuliaState object.

        Fields absent from the payload fall back to defaults: None for
        episode_id, 0 for the counters and last_exit_code, and True for
        last_code_compiles.

        Args:
            payload: JSON state response from the server

        Returns:
            JuliaState object with episode metadata
        """
        return JuliaState(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
            last_exit_code=payload.get("last_exit_code", 0),
            last_code_compiles=payload.get("last_code_compiles", True),
            total_tests_passed=payload.get("total_tests_passed", 0),
            total_tests_failed=payload.get("total_tests_failed", 0),
        )