# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """ Synchronous wrapper for async EnvClient. This module provides a SyncEnvClient that wraps an async EnvClient, allowing synchronous usage while the underlying client uses async I/O. Example: >>> from openenv.core import GenericEnvClient >>> >>> # Create async client and get sync wrapper >>> async_client = GenericEnvClient(base_url="http://localhost:8000") >>> sync_client = async_client.sync() >>> >>> # Use synchronous API >>> with sync_client: ... result = sync_client.reset() ... result = sync_client.step({"code": "print('hello')"}) """ from __future__ import annotations import asyncio import concurrent.futures import inspect import threading from typing import Any, Dict, Generic, TYPE_CHECKING, TypeVar from .client_types import StateT, StepResult if TYPE_CHECKING: from .env_client import EnvClient ActT = TypeVar("ActT") ObsT = TypeVar("ObsT") class SyncEnvClient(Generic[ActT, ObsT, StateT]): """ Synchronous wrapper around an async EnvClient. This class provides a synchronous interface to an async EnvClient, making it easier to use in synchronous code or to stop async from "infecting" the entire call stack. The wrapper executes async operations on a dedicated background event loop so connection state remains bound to a single loop. Cleanup note: For guaranteed resource cleanup, use `with SyncEnvClient(...)` or call `close()` explicitly. `__del__` is best-effort only and may not run reliably (for example, during interpreter shutdown). Example: >>> # From an async client >>> async_client = GenericEnvClient(base_url="http://localhost:8000") >>> sync_client = async_client.sync() >>> >>> # Use synchronous context manager >>> with sync_client: ... result = sync_client.reset() ... result = sync_client.step({"action": "test"}) Attributes: _async: The wrapped async EnvClient instance """ def __init__(self, async_client: "EnvClient[ActT, ObsT, StateT]"): """ Initialize sync wrapper around an async client. Args: async_client: The async EnvClient to wrap """ self._async = async_client self._loop: asyncio.AbstractEventLoop | None = None self._loop_thread: threading.Thread | None = None self._loop_ready = threading.Event() self._loop_init_lock = threading.Lock() self._async_wrapper_cache: Dict[str, Any] = {} def _run_loop_forever(self) -> None: """Run a dedicated event loop for this sync client.""" loop = asyncio.new_event_loop() self._loop = loop asyncio.set_event_loop(loop) self._loop_ready.set() loop.run_forever() loop.close() def _ensure_loop(self) -> asyncio.AbstractEventLoop: """Start background loop thread on first use.""" if ( self._loop is not None and self._loop_thread and self._loop_thread.is_alive() ): return self._loop # Protect loop initialization when multiple threads race on first use. with self._loop_init_lock: if ( self._loop is not None and self._loop_thread and self._loop_thread.is_alive() ): return self._loop self._loop_ready.clear() self._loop_thread = threading.Thread( target=self._run_loop_forever, name="openenv-sync-client-loop", daemon=True, ) self._loop_thread.start() if not self._loop_ready.wait(timeout=5): raise RuntimeError("Timed out starting sync client event loop") assert self._loop is not None return self._loop def _run(self, coro: Any) -> Any: """Run coroutine on dedicated loop and block for result.""" loop = self._ensure_loop() future: concurrent.futures.Future[Any] = asyncio.run_coroutine_threadsafe( coro, loop ) return future.result() def _stop_loop(self) -> None: """Stop and join background loop thread.""" loop = self._loop thread = self._loop_thread if loop is None: return if loop.is_running(): loop.call_soon_threadsafe(loop.stop) if thread is not None: thread.join(timeout=5) self._loop = None self._loop_thread = None @property def async_client(self) -> "EnvClient[ActT, ObsT, StateT]": """Access the underlying async client.""" return self._async def connect(self) -> "SyncEnvClient[ActT, ObsT, StateT]": """ Establish connection to the server. Returns: self for method chaining """ self._run(self._async.connect()) return self def disconnect(self) -> None: """Close the connection.""" self._run(self._async.disconnect()) def reset(self, **kwargs: Any) -> StepResult[ObsT]: """ Reset the environment. Args: **kwargs: Optional parameters passed to the environment's reset method Returns: StepResult containing initial observation """ return self._run(self._async.reset(**kwargs)) def step(self, action: ActT, **kwargs: Any) -> StepResult[ObsT]: """ Execute an action in the environment. Args: action: The action to execute **kwargs: Optional parameters Returns: StepResult containing observation, reward, and done status """ return self._run(self._async.step(action, **kwargs)) def state(self) -> StateT: """ Get the current environment state. Returns: State object with environment state information """ return self._run(self._async.state()) def close(self) -> None: """Close the connection and clean up resources.""" try: self._run(self._async.close()) finally: self._stop_loop() def __enter__(self) -> "SyncEnvClient[ActT, ObsT, StateT]": """Enter context manager, establishing connection.""" self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Exit context manager, closing connection.""" self.close() def __del__(self) -> None: """ Best-effort cleanup for background loop thread. Do not rely on this for deterministic cleanup; prefer context-manager usage or an explicit `close()` call. """ try: self._stop_loop() except Exception: pass def __getattr__(self, name: str) -> Any: """ Delegate unknown attributes to the async client. Async methods are wrapped to run on the sync client's dedicated loop. """ attr = getattr(self._async, name) if inspect.iscoroutinefunction(attr): cached = self._async_wrapper_cache.get(name) if cached is not None: return cached def sync_wrapper(*args: Any, **kwargs: Any) -> Any: method = getattr(self._async, name) return self._run(method(*args, **kwargs)) self._async_wrapper_cache[name] = sync_wrapper return sync_wrapper return attr # Delegate abstract method implementations to the wrapped client def _step_payload(self, action: ActT) -> Dict[str, Any]: """Delegate to async client's _step_payload.""" return self._async._step_payload(action) def _parse_result(self, payload: Dict[str, Any]) -> StepResult[ObsT]: """Delegate to async client's _parse_result.""" return self._async._parse_result(payload) def _parse_state(self, payload: Dict[str, Any]) -> StateT: """Delegate to async client's _parse_state.""" return self._async._parse_state(payload)