Buckets:
Core API
The openenv.core package provides the core abstractions for building and running environments. For an end-to-end tutorial on building environments with OpenEnv, see the building an environment guide.
If you are trying to understand when OpenEnv exposes the training loop versus direct MCP access, see the simulation vs production mode guide.
For a high-level explanation of how MCP-backed environments move through step(), step_async(), and convenience tool helpers, see the MCP environment lifecycle guide.
Server
Environment server primitives[[openenv.core.Message]]
openenv.core.Message[[openenv.core.Message]]
A message in a conversation.
Compatible with Huggingface chat template format.
openenv.core.ModelTokenizer[[openenv.core.ModelTokenizer]]
Protocol for tokenizers that support chat templates.
This protocol defines the interface that tokenizers must implement to work with chat-based environments. It's compatible with Huggingface transformers tokenizers.
apply_chat_templateopenenv.core.ModelTokenizer.apply_chat_templatehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_server/interfaces.py#L41[{"name": "conversation", "val": ": list"}, {"name": "tokenize", "val": ": bool = True"}, {"name": "return_tensors", "val": ": str | None = None"}, {"name": "**kwargs", "val": ": typing.Any"}]- conversation (list[Message]) --
List of message dictionaries with 'role' and 'content'.
- tokenize (
bool, optional, defaults toTrue) -- Whether to tokenize the output. - return_tensors (
str, optional) -- Format for returned tensors ('pt' for PyTorch). - **kwargs -- Additional arguments.0Formatted and optionally tokenized conversation. Apply a chat template to format and optionally tokenize a conversation.
Parameters:
conversation (list[Message]) : List of message dictionaries with 'role' and 'content'.
tokenize (bool, optional, defaults to True) : Whether to tokenize the output.
return_tensors (str, optional) : Format for returned tensors ('pt' for PyTorch).
- **kwargs : Additional arguments.
Returns:
Formatted and optionally tokenized conversation.
decode[[openenv.core.ModelTokenizer.decode]]
Decode token IDs back to text.
Parameters:
token_ids (Any) : Token IDs to decode.
skip_special_tokens (bool, optional, defaults to False) : Whether to skip special tokens in output.
- **kwargs : Additional arguments.
Returns:
str
Decoded text string.
openenv.core.Transform[[openenv.core.Transform]]
Transform observations to add rewards, metrics, or other modifications.
Transforms follow the TorchRL pattern where they take an observation and return a (potentially modified) observation. This allows for flexible reward computation and observation augmentation.
openenv.core.Environment[[openenv.core.Environment]]
Base class for all environment servers following Gym/Gymnasium API.
See rfcs/004-rubrics.md for rubric design details.
closeopenenv.core.Environment.closehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_server/interfaces.py#L317[] Clean up resources used by the environment.
Override this method to implement custom cleanup logic. Called when the environment is being destroyed or reset.
Parameters:
transform (Transform, optional) : Optional transform to apply to observations.
rubric (Rubric, optional) : Optional rubric for reward computation. When provided, the rubric's output can be used to set the observation's reward in step().
SUPPORTS_CONCURRENT_SESSIONS (bool) : Whether this environment supports concurrent sessions. When True, multiple WebSocket connections can each have their own environment instance (up to max_concurrent_envs). When False (default), the environment should only be used with a single session at a time. Set this to True in your subclass if the environment uses proper session isolation (unique working dirs, no shared mutable state, and external resources that can handle concurrent access).
rubric (Rubric, optional) : Optional rubric for computing rewards. Set in __init__ and use in step() to compute observation rewards. Training infrastructure can access it for introspection: python for name, r in env.rubric.named_rubrics(): print(f"{name}: {r.last_score}")
get_metadata[[openenv.core.Environment.get_metadata]]
Get metadata about this environment.
Override this method to provide custom metadata for the environment. Default implementation returns basic metadata derived from class name.
Returns:
EnvironmentMetadata with environment information.
reset[[openenv.core.Environment.reset]]
Reset the environment and return initial observation.
reset_async[[openenv.core.Environment.reset_async]]
Async version of reset. Default implementation calls sync reset.
Override to provide true async implementation.
step[[openenv.core.Environment.step]]
Take a step in the environment.
step_async[[openenv.core.Environment.step_async]]
Async version of step. Default implementation calls sync step.
Override to provide true async implementation.
Types[[openenv.core.ServerMode]]
openenv.core.ServerMode[[openenv.core.ServerMode]]
Server operation mode.
openenv.core.HealthStatus[[openenv.core.HealthStatus]]
Server health status values.
openenv.core.WSErrorCode[[openenv.core.WSErrorCode]]
WebSocket error codes for structured error handling.
openenv.core.Action[[openenv.core.Action]]
Base class for all environment actions.
All action subclasses should inherit from this base class. Uses Pydantic for automatic validation and serialization.
openenv.core.Observation[[openenv.core.Observation]]
Base class for all environment observations.
All observation subclasses should inherit from this base class. Uses Pydantic for automatic validation and serialization.
openenv.core.env_server.types.ResetRequest[[openenv.core.env_server.types.ResetRequest]]
Request model for environment reset.
openenv.core.env_server.types.ResetResponse[[openenv.core.env_server.types.ResetResponse]]
Response model for environment reset.
openenv.core.env_server.types.StepRequest[[openenv.core.env_server.types.StepRequest]]
Request model for environment step.
openenv.core.env_server.types.StepResponse[[openenv.core.env_server.types.StepResponse]]
Response model for environment step.
openenv.core.BaseMessage[[openenv.core.BaseMessage]]
Base class for WebSocket messages with shared configuration.
openenv.core.State[[openenv.core.State]]
Base class for environment state.
Represents internal environment state, separate from observations.
openenv.core.env_server.types.CodeExecResult[[openenv.core.env_server.types.CodeExecResult]]
Result of code execution containing stdout, stderr, and exit code.
openenv.core.env_server.types.EnvironmentMetadata[[openenv.core.env_server.types.EnvironmentMetadata]]
Metadata about an environment for documentation and UI purposes.
openenv.core.SchemaResponse[[openenv.core.SchemaResponse]]
Response model for the combined schema endpoint.
openenv.core.HealthResponse[[openenv.core.HealthResponse]]
Response model for health check endpoint.
openenv.core.WSResetMessage[[openenv.core.WSResetMessage]]
WebSocket message to reset the environment.
openenv.core.WSStepMessage[[openenv.core.WSStepMessage]]
WebSocket message to execute a step.
openenv.core.WSStateMessage[[openenv.core.WSStateMessage]]
WebSocket message to request current state.
openenv.core.WSCloseMessage[[openenv.core.WSCloseMessage]]
WebSocket message to close the session.
openenv.core.WSObservationResponse[[openenv.core.WSObservationResponse]]
WebSocket response containing an observation.
openenv.core.WSStateResponse[[openenv.core.WSStateResponse]]
WebSocket response containing environment state.
openenv.core.WSErrorResponse[[openenv.core.WSErrorResponse]]
WebSocket response for errors.
openenv.core.ConcurrencyConfig[[openenv.core.ConcurrencyConfig]]
Configuration for concurrent environment sessions.
openenv.core.ServerCapacityStatus[[openenv.core.ServerCapacityStatus]]
Status of server capacity for concurrent sessions.
from_countsopenenv.core.ServerCapacityStatus.from_countshttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_server/types.py#L369[{"name": "active", "val": ": int"}, {"name": "max_sessions", "val": ": int"}] Create status from active and max session counts.
openenv.core.SessionInfo[[openenv.core.SessionInfo]]
Information about an active session.
Exceptions[[openenv.core.OpenEnvError]]
openenv.core.OpenEnvError[[openenv.core.OpenEnvError]]
Base exception for all OpenEnv errors.
openenv.core.ConcurrencyConfigurationError[[openenv.core.ConcurrencyConfigurationError]]
Raised when an environment is misconfigured for concurrent sessions.
This error is raised during server startup when max_concurrent_envs > 1 is specified for an environment that is not marked as SUPPORTS_CONCURRENT_SESSIONS.
openenv.core.SessionCapacityError[[openenv.core.SessionCapacityError]]
Raised when the server cannot accept new sessions due to capacity limits.
This error is raised when a new WebSocket connection is attempted but the server has already reached max_concurrent_envs active sessions.
openenv.core.SessionNotFoundError[[openenv.core.SessionNotFoundError]]
Raised when attempting to access a session that does not exist.
openenv.core.SessionCreationError[[openenv.core.SessionCreationError]]
Raised when a session cannot be created.
openenv.core.EnvironmentFactoryError[[openenv.core.EnvironmentFactoryError]]
Raised when the environment factory fails to create an instance.
HTTP server utilities[[openenv.core.HTTPEnvServer]]
openenv.core.HTTPEnvServer[[openenv.core.HTTPEnvServer]]
HTTP server wrapper for Environment instances.
This class wraps an Environment and exposes its reset(), step(), and state methods as HTTP and WebSocket endpoints compatible with EnvClient.
The server expects:
- Action deserialization: Converts JSON dict to Action subclass
- Observation serialization: Converts Observation subclass to JSON dict
Examples:
from core.env_server import HTTPEnvServer
from envs.coding_env.server import CodeExecutionEnvironment
from envs.coding_env.models import CodeAction, CodeObservation
# Pass environment class (factory pattern)
server = HTTPEnvServer(
env=CodeExecutionEnvironment,
action_cls=CodeAction,
observation_cls=CodeObservation,
max_concurrent_envs=4,
)
# Register routes with FastAPI
from fastapi import FastAPI
app = FastAPI()
server.register_routes(app)
get_capacity_statusopenenv.core.HTTPEnvServer.get_capacity_statushttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_server/http_server.py#L284[]`ServerCapacityStatus` with current session counts and availability.
Get the current capacity status of the server.
Returns:
ServerCapacityStatus with current session counts and availability.
get_session_info[[openenv.core.HTTPEnvServer.get_session_info]]
Get information about a specific session.
Parameters:
session_id (str) : The session ID to query.
Returns:
SessionInfo if the session exists, None otherwise.
register_routes[[openenv.core.HTTPEnvServer.register_routes]]
Register HTTP routes on a FastAPI application.
Parameters:
app (FastAPI) : FastAPI application instance.
mode (ServerMode or str, optional, defaults to ServerMode.SIMULATION) : Server mode. In production mode, simulation control endpoints (/reset, /step, /state) are NOT registered. Only safe endpoints (/health, /schema, /metadata, /ws) are available.
openenv.core.create_app[[openenv.core.create_app]]
Create a FastAPI application with or without web interface.
This function creates a FastAPI app with the web interface enabled by default, including README integration for better user experience.
Parameters:
env (Callable[[], Environment]) : Environment factory (callable) that creates new instances.
action_cls (Type[Action]) : The Action subclass this environment expects.
observation_cls (Type[Observation]) : The Observation subclass this environment returns.
env_name (str, optional) : Environment name for README loading.
max_concurrent_envs (int, optional) : Maximum concurrent WebSocket sessions. Mutually exclusive with concurrency_config.
concurrency_config (ConcurrencyConfig, optional) : Advanced concurrency settings. Mutually exclusive with max_concurrent_envs.
gradio_builder (Callable, optional) : Callable to build a custom Gradio UI at /web. Signature: (web_manager, action_fields, metadata, is_chat_env, title, quick_start_md) -> gr.Blocks. When None, the default Gradio app is used.
custom_tab_name (str, optional, defaults to "Custom") : Label for the env-specific tab when gradio_builder is provided.
custom_tab_primary (bool, optional, defaults to False) : When True, the env-specific tab is active first; the auto-generated Playground becomes secondary.
show_default_tab (bool, optional, defaults to True) : When False, mount the env's gradio_builder output alone (no auto-generated Playground, no tab chrome). Only meaningful when gradio_builder is provided.
title_override (str, optional) : If set, used as the Gradio app title instead of the default "OpenEnv Agentic Environment: {name}".
Returns:
FastAPI application instance with or without web interface and README integration.
openenv.core.create_fastapi_app[[openenv.core.create_fastapi_app]]
Create a FastAPI application with comprehensive documentation.
Parameters:
env (Callable[[], Environment]) : Environment factory (callable) that creates new instances.
action_cls (Type[Action]) : The Action subclass this environment expects.
observation_cls (Type[Observation]) : The Observation subclass this environment returns.
max_concurrent_envs (int, optional) : Maximum concurrent WebSocket sessions. Mutually exclusive with concurrency_config.
concurrency_config (ConcurrencyConfig, optional) : Advanced concurrency settings. Mutually exclusive with max_concurrent_envs.
Returns:
FastAPI application instance.
Web interface helpers[[openenv.core.env_server.web_interface.ActionLog]]
openenv.core.env_server.web_interface.ActionLog[[openenv.core.env_server.web_interface.ActionLog]]
Log entry for an action taken.
openenv.core.env_server.web_interface.EpisodeState[[openenv.core.env_server.web_interface.EpisodeState]]
Current episode state for the web interface.
openenv.core.WebInterfaceManager[[openenv.core.WebInterfaceManager]]
Manages the web interface for an environment.
connect_websocketopenenv.core.WebInterfaceManager.connect_websockethttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_server/web_interface.py#L308[{"name": "websocket", "val": ": WebSocket"}] Connect a new WebSocket client.
disconnect_websocket[[openenv.core.WebInterfaceManager.disconnect_websocket]]
Disconnect a WebSocket client.
get_state[[openenv.core.WebInterfaceManager.get_state]]
Get current environment state.
reset_environment[[openenv.core.WebInterfaceManager.reset_environment]]
Reset the environment and update state.
step_environment[[openenv.core.WebInterfaceManager.step_environment]]
Execute a step in the environment and update state.
openenv.core.create_web_interface_app[[openenv.core.create_web_interface_app]]
Create a FastAPI application with web interface for the given environment.
Parameters:
env : The Environment instance to serve
action_cls : The Action subclass this environment expects
observation_cls : The Observation subclass this environment returns
env_name : Optional environment name for README loading
max_concurrent_envs : Maximum concurrent WebSocket sessions
concurrency_config : Optional ConcurrencyConfig for advanced concurrency settings
gradio_builder : Optional callable (web_manager, action_fields, metadata, is_chat_env, title, quick_start_md) -> gr.Blocks to use instead of the default Gradio UI. Lets envs replace or customize the /web interface.
custom_tab_name : Label shown on the env-specific tab when gradio_builder is provided. Defaults to "Custom" for backwards compatibility; envs that ship a rich custom UI should pass a descriptive name (e.g. "REPL"). Ignored when show_default_tab=False (no tab chrome is rendered).
custom_tab_primary : When True, the env-specific tab is rendered first and selected by default; the auto-generated Playground becomes secondary. Use this for envs whose custom tab is the real interaction surface (so visitors don't land on a less informative schema form). Ignored when show_default_tab=False.
show_default_tab : When False, the auto-generated Playground tab is not rendered and the env's gradio_builder output is mounted directly (single-view UI, no tab chrome). Only meaningful when gradio_builder is provided.
title_override : If set, used verbatim as the Gradio app/browser-tab title instead of the default "OpenEnv Agentic Environment: {name}".
Returns:
FastAPI application instance with web interface
Serialization[[openenv.core.deserialize_action]]
openenv.core.deserialize_action[[openenv.core.deserialize_action]]
Convert JSON dict to Action instance using Pydantic validation.
MCP action types (list_tools, call_tool) are recognised
automatically via the "type" discriminator field, regardless of
the environment's configured action_cls. All other payloads
fall through to action_cls.model_validate().
For special cases (e.g., tensor fields, custom type conversions), use deserialize_action_with_preprocessing().
Parameters:
action_data (dict) : Dictionary containing action data.
action_cls (type) : The Action subclass to instantiate.
Returns:
Action instance.
openenv.core.deserialize_action_with_preprocessing[[openenv.core.deserialize_action_with_preprocessing]]
Convert JSON dict to Action instance with preprocessing for special types.
This version handles common type conversions needed for web interfaces:
- Converting lists/strings to tensors for 'tokens' field
- Converting string action_id to int
- Other custom preprocessing as needed
Parameters:
action_data (dict) : Dictionary containing action data.
action_cls (type) : The Action subclass to instantiate.
Returns:
Action instance.
openenv.core.serialize_observation[[openenv.core.serialize_observation]]
Convert Observation instance to JSON-compatible dict using Pydantic.
Parameters:
observation (Observation) : Observation instance to serialize.
Returns:
``dictcompatible withEnvClient._parse_result(), with keys
observation(dict): Observation fields.reward(floatorNone): Reward value.done(bool): Whether the episode is done.metadata(dict, optional): Additional observation metadata.
Transforms[[openenv.core.CompositeTransform]]
openenv.core.CompositeTransform[[openenv.core.CompositeTransform]]
Combines multiple transforms into a single transform.
openenv.core.NullTransform[[openenv.core.NullTransform]]
Default transform that passes through unchanged.
Route configuration[[openenv.core.GetEndpointConfig]]
openenv.core.GetEndpointConfig[[openenv.core.GetEndpointConfig]]
Configuration for a simple GET endpoint.
openenv.core.env_server.route_config.register_get_endpoints[[openenv.core.env_server.route_config.register_get_endpoints]]
Register multiple GET endpoints from configuration.
Parameters:
app (~fastapi.FastAPI) : FastAPI application instance.
configs (List[GetEndpointConfig]) : List of GET endpoint configurations.
Clients
Base client[[openenv.core.EnvClient]]
openenv.core.EnvClient[[openenv.core.EnvClient]]
Async environment client for persistent sessions.
This client maintains a persistent WebSocket connection to an environment server, enabling efficient multi-step interactions. Each client instance corresponds to a dedicated environment session on the server.
The client is async by default. For synchronous usage, use the .sync()
method to get a SyncEnvClient wrapper.
Features:
- Lower latency for sequential interactions
- Session state is maintained server-side
- Better suited for long-running episodes
- Async by default for modern Python async/await patterns
Examples:
Async usage:
from envs.coding_env.client import CodingEnv
async with CodingEnv(base_url="ws://localhost:8000") as env:
result = await env.reset(seed=42)
while not result.done:
action = agent.predict(result.observation)
result = await env.step(action)
Sync usage via .sync() wrapper:
env = CodingEnv(base_url="ws://localhost:8000").sync()
with env:
result = env.reset(seed=42)
result = env.step(action)
closeopenenv.core.EnvClient.closehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_client.py#L447[]
Close the WebSocket connection and clean up resources.
If this client was created via from_docker_image() or from_env(), this will also stop and remove the associated container/process.
connect[[openenv.core.EnvClient.connect]]
Establish WebSocket connection to the server.
Returns:
self for method chaining
disconnect[[openenv.core.EnvClient.disconnect]]
Close the WebSocket connection.
from_docker_image[[openenv.core.EnvClient.from_docker_image]]
Create an environment client by spinning up a Docker container.
Parameters:
image (str) : Docker image name to run (e.g., "coding-env:latest").
provider (ContainerProvider, optional) : Container provider to use. Defaults to LocalDockerProvider.
- **kwargs : Additional arguments to pass to
provider.start_container().
Returns:
Connected client instance
from_env[[openenv.core.EnvClient.from_env]]
Create a client from a Hugging Face Space.
Examples:
# Pull and run from HF Docker registry
env = await MyEnv.from_env("openenv/echo-env")
# Run locally with UV (clones the space)
env = await MyEnv.from_env("openenv/echo-env", use_docker=False)
# Run from a local checkout
env = await MyEnv.from_env(
"openenv/echo-env",
use_docker=False,
project_path="/path/to/local/checkout"
)
Parameters:
repo_id (str) : Hugging Face space identifier {org}/{space}.
use_docker (bool, optional, defaults to True) : When True, pull from the HF registry and launch via LocalDockerProvider. When False, run the space locally with UVProvider.
provider (ContainerProvider or RuntimeProvider, optional) : Provider instance to reuse. Must be a ContainerProvider when use_docker=True and a RuntimeProvider otherwise.
- **provider_kwargs : Additional keyword arguments forwarded to either the container provider's
start_container(docker) or to theUVProviderconstructor/start (uv). Whenuse_docker=False, theproject_pathargument can be used to override the default git URL (git+https://huggingface.co/spaces/{repo_id}).
Returns:
Connected client instance
reset[[openenv.core.EnvClient.reset]]
Reset the environment with optional parameters.
Parameters:
- **kwargs : Optional parameters passed to the environment's reset method.
Returns:
StepResult containing initial observation
state[[openenv.core.EnvClient.state]]
Get the current environment state from the server.
Returns:
State object with environment state information
step[[openenv.core.EnvClient.step]]
Execute an action in the environment.
Parameters:
action : The action to execute.
- **kwargs : Optional parameters (currently ignored).
Returns:
StepResult containing observation, reward, and done status
sync[[openenv.core.EnvClient.sync]]
Return a synchronous wrapper around this async client.
Use this method when you need synchronous access to the environment without async/await syntax. This is useful for:
- Integration with synchronous codebases
- Interactive/REPL usage
- Stopping async from "infecting" the call stack
Examples:
async_client = GenericEnvClient(base_url="http://localhost:8000")
sync_client = async_client.sync()
with sync_client:
result = sync_client.reset()
result = sync_client.step({"code": "print('hello')"})
Returns:
SyncEnvClient wrapper that provides synchronous methods
Synchronous client[[openenv.SyncEnvClient]]
openenv.SyncEnvClient[[openenv.SyncEnvClient]]
Synchronous wrapper around an async EnvClient.
This class provides a synchronous interface to an async EnvClient, making it easier to use in synchronous code or to stop async from "infecting" the entire call stack.
The wrapper executes async operations on a dedicated background event loop so connection state remains bound to a single loop.
For guaranteed resource cleanup, use with SyncEnvClient(...) or call
close() explicitly. __del__ is best-effort only and may not run
reliably (for example, during interpreter shutdown).
Examples:
# From an async client
async_client = GenericEnvClient(base_url="http://localhost:8000")
sync_client = async_client.sync()
# Use synchronous context manager
with sync_client:
result = sync_client.reset()
result = sync_client.step({"action": "test"})
closeopenenv.SyncEnvClient.closehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/sync_client.py#L211[] Close the connection and clean up resources.
Parameters:
_async : The wrapped async EnvClient instance
connect[[openenv.SyncEnvClient.connect]]
Establish connection to the server.
Returns:
self for method chaining
disconnect[[openenv.SyncEnvClient.disconnect]]
Close the connection.
reset[[openenv.SyncEnvClient.reset]]
Reset the environment.
Parameters:
- **kwargs : Optional parameters passed to the environment's reset method.
Returns:
StepResult containing initial observation
state[[openenv.SyncEnvClient.state]]
Get the current environment state.
Returns:
State object with environment state information
step[[openenv.SyncEnvClient.step]]
Execute an action in the environment.
Parameters:
action : The action to execute.
- **kwargs : Optional parameters.
Returns:
StepResult containing observation, reward, and done status
Generic client[[openenv.GenericEnvClient]]
openenv.GenericEnvClient[[openenv.GenericEnvClient]]
Environment client that works with raw dictionaries instead of typed classes.
This client doesn't require installing environment-specific packages, making it ideal for:
- Connecting to remote servers without installing their packages
- Quick prototyping and testing
- Environments where type safety isn't needed
- Security-conscious scenarios where you don't want to run remote code
The trade-off is that you lose type safety and IDE autocomplete for actions and observations. Instead of typed objects, you work with plain dictionaries.
Examples:
# Direct connection to a running server (no installation needed)
with GenericEnvClient(base_url="http://localhost:8000") as env:
result = env.reset()
result = env.step({"code": "print('hello')"})
print(result.observation) # Dict[str, Any]
print(result.observation.get("output"))
# From local Docker image
env = GenericEnvClient.from_docker_image("coding-env:latest")
result = env.reset()
result = env.step({"code": "x = 1 + 2"})
env.close()
# From HuggingFace Hub (pulls Docker image, no pip install)
env = GenericEnvClient.from_env("user/my-env", use_docker=True)
result = env.reset()
env.close()
GenericEnvClient inherits from_docker_image() and from_env() from
EnvClient, so you can use it with Docker containers and HuggingFace
Spaces without any package installation.
openenv.GenericAction[[openenv.GenericAction]]
A dictionary subclass for creating actions when using GenericEnvClient.
This provides a semantic wrapper around dictionaries to make code more readable when working with GenericEnvClient. It behaves exactly like a dict but signals intent that this is an action for an environment.
Examples:
# Without GenericAction (works fine)
env.step({"code": "print('hello')"})
# With GenericAction (more explicit)
action = GenericAction(code="print('hello')")
env.step(action)
# With multiple fields
action = GenericAction(code="x = 1", timeout=30, metadata={"tag": "test"})
env.step(action)
GenericAction is just a dict with a constructor that accepts keyword
arguments. It's provided for symmetry with typed Action classes and
to make code more readable.
LLM client[[openenv.core.ToolCall]]
openenv.core.ToolCall[[openenv.core.ToolCall]]
A single tool/function call returned by the model.
openenv.core.LLMResponse[[openenv.core.LLMResponse]]
Normalized response from an LLM, with optional tool calls.
to_message_dictopenenv.core.LLMResponse.to_message_dicthttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/llm_client.py#L52[] Convert to an OpenAI-format assistant message dict.
openenv.core.LLMClient[[openenv.core.LLMClient]]
Abstract base for LLM endpoint clients.
Subclass and implement complete() for your protocol.
completeopenenv.core.LLMClient.completehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/llm_client.py#L86[{"name": "prompt", "val": ": str"}, {"name": "**kwargs", "val": ""}]- prompt (str) --
The user prompt to send.
- **kwargs -- Override default parameters (temperature, max_tokens, etc.).0The model's text response. Send a prompt, return the text response.
Parameters:
endpoint (str) : The base URL of the LLM service (e.g. "http://localhost").
port (int) : The port the service listens on.
Returns:
The model's text response.
complete_with_tools[[openenv.core.LLMClient.complete_with_tools]]
Send messages with tool definitions, return a normalized response.
Messages use OpenAI-format dicts ({"role": "...", "content": "..."}).
Tools use MCP tool definitions; they are converted internally.
Parameters:
messages (list[dict[str, Any]]) : Conversation history as OpenAI-format message dicts.
tools (list[dict[str, Any]]) : MCP tool definitions.
- **kwargs : Override default parameters (temperature, max_tokens, etc.).
Returns:
An LLMResponse with the model's text and any tool calls.
openenv.core.OpenAIClient[[openenv.core.OpenAIClient]]
Client for OpenAI-compatible APIs.
Works with: OpenAI, vLLM, TGI, Ollama, HuggingFace Inference API, or any endpoint that speaks the OpenAI chat completions format.
completeopenenv.core.OpenAIClient.completehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/llm_client.py#L185[{"name": "prompt", "val": ": str"}, {"name": "**kwargs", "val": ""}]- prompt (str) --
The user message.
- **kwargs -- Overrides for temperature, max_tokens.0The assistant's response text. Send a chat completion request.
Parameters:
endpoint (str) : The base URL (e.g. "http://localhost").
port (int) : The port number.
model (str) : Model name to pass to the API.
api_key (str, optional) : API key. Defaults to "not-needed" for local endpoints.
system_prompt (str, optional) : System message prepended to every request.
temperature (float, optional, defaults to 0.0) : Default sampling temperature.
max_tokens (int, optional, defaults to 256) : Default max tokens in the response.
use_max_completion_tokens (bool, optional, defaults to False) : Use max_completion_tokens instead of max_tokens. Required for newer OpenAI models (gpt-5-mini, o1, o3). Not supported by self-hosted OpenAI-compatible endpoints.
Returns:
The assistant's response text.
openenv.core.AnthropicClient[[openenv.core.AnthropicClient]]
Client for Anthropic's Messages API.
Requires the anthropic package (lazy-imported at construction time).
Parameters:
endpoint (str) : The base URL (e.g. https://api.anthropic.com).
port (int) : The port number.
model (str) : Model name (e.g. "claude-sonnet-4-20250514").
api_key (str, optional) : Anthropic API key.
system_prompt (str, optional) : System message prepended to every request.
temperature (float, optional, defaults to 0.0) : Default sampling temperature.
max_tokens (int, optional, defaults to 256) : Default max tokens in the response.
openenv.core.create_llm_client[[openenv.core.create_llm_client]]
Create an LLM client for a hosted provider.
Parameters:
provider (str) : Provider name ("openai" or "anthropic").
model (str) : Model identifier.
api_key (str) : API key for the provider.
system_prompt (str, optional) : System message prepended to every request.
temperature (float, optional, defaults to 0.0) : Sampling temperature.
max_tokens (int, optional, defaults to 4096) : Maximum tokens in the response.
Returns:
A configured LLMClient instance.
Shared dataclasses[[openenv.core.client_types.StepResult]]
openenv.core.client_types.StepResult[[openenv.core.client_types.StepResult]]
Represents the result of one environment step.
Parameters:
observation : The environment's observation after the action.
reward (float, optional) : Scalar reward for this step.
done (bool, optional, defaults to False) : Whether the episode is finished.
metadata (dict, optional) : Additional metadata returned alongside the observation.
MCP (Model Context Protocol)
MCP environment[[openenv.core.MCPEnvironment]]
openenv.core.MCPEnvironment[[openenv.core.MCPEnvironment]]
Base class for environments that expose tools via MCP (Model Context Protocol).
MCPEnvironment bridges FastMCP servers with OpenEnv's Gym-style API, allowing agents to discover and invoke MCP tools through the standard step() interface.
The class automatically handles:
- ListToolsAction: Returns available tools from the MCP server
- CallToolAction: Invokes a specific tool with arguments
All other actions are delegated to the abstract _step_impl() method, which subclasses must implement.
Examples:
from fastmcp import FastMCP
mcp = FastMCP("calculator")
@mcp.tool()
def add(a: int, b: int) -> int:
return a + b
env = MyMCPEnvironment(mcp)
obs = env.step(ListToolsAction())
obs.tools[0].name # 'add'
closeopenenv.core.MCPEnvironment.closehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_server/mcp_environment.py#L644[]
Clean up resources used by the environment.
This method cleans up the MCP client and any other resources. Subclasses should call super().close() if they override this method.
Parameters:
mcp_server : A FastMCP server instance containing tool definitions. The server's tools will be validated against reserved names.
transform : Optional transform to apply to observations (inherited from Environment).
execute_code[[openenv.core.MCPEnvironment.execute_code]]
Execute Python code with tools available as callables.
This enables the CodeAct pattern where agents write Python code that calls tools directly as functions, avoiding JSON-RPC overhead.
Parameters:
code : Python code to execute. Tools are available as functions in the execution namespace. Set a variable named 'result' to capture the return value.
Returns:
Observation with result in metadata["result"] or error in metadata["error"].
get_callables[[openenv.core.MCPEnvironment.get_callables]]
Get callable functions for code mode.
Returns tool functions as direct Python callables, enabling code mode where agents write Python code that calls tools directly (no JSON-RPC overhead). Mode-specific tools are filtered by the current mode.
Returns:
Dictionary mapping tool names to callables.
mcp_session[[openenv.core.MCPEnvironment.mcp_session]]
Context manager for MCP client sessions.
This wrapper serves two purposes:
Null guard — raises a clear error if
close()has already been called (mcp_clientisNone).AsyncExitStack adapter — FastMCP's
Client.__aenter__creates a backgroundasyncio.Taskfor session management. When entered directly viaAsyncExitStackin the HTTP session path (_create_session), this task can be cancelled by ASGI harnesses (e.g. StarletteTestClient) between requests, corrupting session state. Wrapping in anasynccontextmanagergenerator isolates the task lifecycle: the generator frame keepsasync with client:suspended atyield, so cleanup only runs when the stack explicitly closes the generator — not when the event loop cancels orphaned tasks.
Delegates to FastMCP's Client context manager which is
reentrant: the first entry opens the transport and subsequent
(nested) entries simply increment an internal reference counter.
The transport is closed only when the outermost context exits.
No external lock is needed because Client._connect /
Client._disconnect already serialise connection state changes
through their own anyio.Lock.
step[[openenv.core.MCPEnvironment.step]]
Execute an action in the environment.
This method routes MCP-specific actions (ListToolsAction, CallToolAction) to the appropriate handlers, while delegating all other actions to the subclass's _step_impl() method.
Parameters:
action (Action) : The action to execute. ListToolsAction returns available MCP tools, CallToolAction invokes a specific MCP tool, and any other action is delegated to _step_impl().
timeout_s (float, optional) : Timeout in seconds for the action. Defaults to MCP_TOOL_CALL_TIMEOUT (30s) for MCP actions.
- **kwargs (
Any) : Additional arguments passed to handlers.
Returns:
Observation
ListToolsObservation for ListToolsAction,
CallToolObservation for CallToolAction, or a subclass-defined
Observation for other actions.
step_async[[openenv.core.MCPEnvironment.step_async]]
Async step that routes MCP actions without going through run_async_safely.
The WebSocket handler calls this directly on the outer event loop, where the MCP session is already open, avoiding the thread/event-loop deadlock that occurs when the sync step() path is used via run_in_executor.
tool[[openenv.core.MCPEnvironment.tool]]
Decorator for registering mode-aware tools.
Parameters:
mode : Optional mode for the tool ("production" or "simulation"). If None, tool is available in all modes.
Returns:
A decorator function for registering tools.
MCP types[[openenv.core.JsonRpcErrorCode]]
openenv.core.JsonRpcErrorCode[[openenv.core.JsonRpcErrorCode]]
Standard JSON-RPC 2.0 error codes.
See: https://www.jsonrpc.org/specification#error_object
openenv.core.McpMethod[[openenv.core.McpMethod]]
Supported MCP method names.
openenv.core.JsonRpcError[[openenv.core.JsonRpcError]]
JSON-RPC 2.0 error object.
See: https://www.jsonrpc.org/specification#error_object
from_codeopenenv.core.JsonRpcError.from_codehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_server/mcp_types.py#L73[{"name": "code", "val": ": JsonRpcErrorCode"}, {"name": "message", "val": ": typing.Optional[str] = None"}, {"name": "data", "val": ": typing.Any = None"}] Create an error from a standard error code.
openenv.core.JsonRpcRequest[[openenv.core.JsonRpcRequest]]
JSON-RPC 2.0 request object.
See: https://www.jsonrpc.org/specification#request_object
openenv.core.JsonRpcResponse[[openenv.core.JsonRpcResponse]]
JSON-RPC 2.0 response object.
Per JSON-RPC 2.0 spec, a response has either 'result' or 'error', not both. This model excludes None values during serialization to comply with the spec.
See: https://www.jsonrpc.org/specification#response_object
error_responseopenenv.core.JsonRpcResponse.error_responsehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/env_server/mcp_types.py#L163[{"name": "code", "val": ": JsonRpcErrorCode"}, {"name": "message", "val": ": typing.Optional[str] = None"}, {"name": "data", "val": ": typing.Any = None"}, {"name": "request_id", "val": ": typing.Union[str, int, NoneType] = None"}] Create an error response from a standard error code.
model_dump[[openenv.core.JsonRpcResponse.model_dump]]
Serialize to dict, excluding result or error when None (JSON-RPC compliance).
model_dump_json[[openenv.core.JsonRpcResponse.model_dump_json]]
Serialize to JSON string, excluding result or error when None (JSON-RPC compliance).
success[[openenv.core.JsonRpcResponse.success]]
Create a success response.
openenv.core.Tool[[openenv.core.Tool]]
Strongly typed MCP tool specification.
Follows the MCP ToolSpec format for tool discovery. See: https://modelcontextprotocol.io/specification/2025-06-18/server/tools
openenv.core.ToolErrorType[[openenv.core.ToolErrorType]]
Types of errors that can occur during tool execution.
openenv.core.ToolError[[openenv.core.ToolError]]
Structured error for tool execution failures.
This is used for transport/framework errors, NOT for errors returned by the tool itself (those go in the result field).
openenv.core.ListToolsAction[[openenv.core.ListToolsAction]]
Request list of available tools from the environment.
This action triggers MCP's tools/list operation and returns all available tools with their schemas. Does NOT require reset() to be called first.
openenv.core.CallToolAction[[openenv.core.CallToolAction]]
Call a specific tool via MCP.
This action triggers MCP's tools/call operation with the specified tool name and arguments.
openenv.core.ListToolsObservation[[openenv.core.ListToolsObservation]]
Response containing available tools.
Returned when processing a ListToolsAction.
openenv.core.CallToolObservation[[openenv.core.CallToolObservation]]
Response from tool execution.
Contains the tool's result or an error if the call failed. Tool-specific errors (from the tool itself) are included in the result. Transport/framework errors use the error field.
openenv.core.WSMCPMessage[[openenv.core.WSMCPMessage]]
WebSocket message for MCP JSON-RPC requests.
Allows direct MCP access via WebSocket for production inference, bypassing the step() API.
openenv.core.WSMCPResponse[[openenv.core.WSMCPResponse]]
WebSocket response for MCP JSON-RPC.
Contains the JSON-RPC response from the MCP server.
MCP client[[openenv.core.MCPClientBase]]
openenv.core.MCPClientBase[[openenv.core.MCPClientBase]]
Base class for MCP clients with tool discovery.
This class provides the common list_tools() method for discovering
available tools from an MCP-enabled environment. Subclasses implement
specific interaction patterns (tool-calling or CodeAct).
closeopenenv.core.MCPClientBase.closehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/mcp_client.py#L327[]
Close client resources.
In production MCP mode, this also closes the server-side persistent MCP session (best effort) before closing websocket/provider resources.
Parameters:
_tools_cache : Cached list of tools (populated on first list_tools() call)
list_tools[[openenv.core.MCPClientBase.list_tools]]
Discover available tools from the environment.
Examples:
tools = await env.list_tools()
for tool in tools:
print(f"{tool.name}: {tool.description}")
Parameters:
use_cache (bool, optional, defaults to True) : If True, return cached tools if available. Set to False to force a fresh request.
Returns:
List of Tool objects with name, description, and input_schema.
openenv.core.MCPToolClient[[openenv.core.MCPToolClient]]
Async client for tool-calling style MCP interactions.
Each step invokes a single tool. Use this for traditional function-calling agent patterns where the agent decides which tool to call next.
This client provides convenience methods for tool discovery and invocation:
list_tools(): Get all available tools with their schemascall_tool(name, **kwargs): Invoke a tool by name with arguments
Examples:
async with MCPToolClient(base_url="http://localhost:8000") as env:
# Reset the environment
await env.reset()
# Discover available tools
tools = await env.list_tools()
print([t.name for t in tools]) # ['echo_message', 'echo_with_length']
# Call a tool directly
result = await env.call_tool("echo_message", message="Hello!")
print(result) # "Hello!"
# Or use the full action interface
from openenv.core.env_server.mcp_types import CallToolAction
step_result = await env.step(CallToolAction(
tool_name="echo_with_length",
arguments={"message": "Test"}
))
print(step_result.observation.result)
Sync wrapper:
env = MCPToolClient(base_url="http://localhost:8000").sync()
with env:
tools = env.list_tools()
result = env.call_tool("echo_message", message="Hello!")
call_toolopenenv.core.MCPToolClient.call_toolhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/mcp_client.py#L402[{"name": "name", "val": ": str"}, {"name": "**kwargs", "val": ": typing.Any"}]- name (str) --
Name of the tool to invoke (must match a tool from list_tools()).
- **kwargs --
Arguments to pass to the tool. Must match the tool's input_schema.0The tool's result. The type depends on the tool being called.-
RuntimeError-- If the server returns an error response.RuntimeError
Call a tool by name.
This is a convenience method that creates a CallToolAction, executes it,
and returns the result directly. For more control, use step() with
a CallToolAction directly.
Examples:
result = await env.call_tool("add", a=5, b=3)
print(result) # 8
result = await env.call_tool("greet", name="Claude")
print(result) # "Hello, Claude!"
Parameters:
name (str) : Name of the tool to invoke (must match a tool from list_tools()).
- **kwargs : Arguments to pass to the tool. Must match the tool's input_schema.
Returns:
The tool's result. The type depends on the tool being called.
get_tool[[openenv.core.MCPToolClient.get_tool]]
Get a specific tool by name.
Examples:
tool = await env.get_tool("echo_message")
if tool:
print(tool.description)
print(tool.input_schema)
Parameters:
name (str) : Name of the tool to find.
Returns:
The Tool object if found, None otherwise.
has_tool[[openenv.core.MCPToolClient.has_tool]]
Check if a tool exists.
Parameters:
name (str) : Name of the tool to check.
Returns:
True if the tool exists, False otherwise.
Rubrics[[openenv.core.rubrics.Rubric]]
openenv.core.rubrics.Rubric[[openenv.core.rubrics.Rubric]]
Abstract base class for reward computation.
A Rubric computes a reward signal from an action and observation. Subclasses implement forward() to define the reward logic.
Examples:
class MyRubric(Rubric):
def forward(self, action, observation) -> float:
return 1.0 if action.valid else 0.0
rubric = MyRubric()
reward = rubric(action, observation)
Child rubrics are auto-registered when assigned as attributes, enabling hierarchical composition and introspection.
childrenopenenv.core.rubrics.Rubric.childrenhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/base.py#L149[] Iterate over immediate child rubrics.
forward[[openenv.core.rubrics.Rubric.forward]]
Compute the reward. Implement this in subclasses.
Parameters:
action : The action taken by the agent.
observation : The resulting observation.
Returns:
float
Reward value (typically 0.0 to 1.0).
get_rubric[[openenv.core.rubrics.Rubric.get_rubric]]
Access a nested rubric by dot-separated path.
Parameters:
path (str) : Dot-separated path (e.g., "code.syntax").
Returns:
Rubric
The rubric at the specified path.
load_state_dict[[openenv.core.rubrics.Rubric.load_state_dict]]
Load rubric configuration from checkpoint.
named_children[[openenv.core.rubrics.Rubric.named_children]]
Iterate over immediate child rubrics with names.
named_rubrics[[openenv.core.rubrics.Rubric.named_rubrics]]
Iterate over all descendant rubrics with dot-separated names.
register_forward_hook[[openenv.core.rubrics.Rubric.register_forward_hook]]
Register a hook called after forward().
Parameters:
hook (Callable) : Callable with signature (rubric, action, observation, result).
register_forward_pre_hook[[openenv.core.rubrics.Rubric.register_forward_pre_hook]]
Register a hook called before forward().
Parameters:
hook (Callable) : Callable with signature (rubric, action, observation).
reset[[openenv.core.rubrics.Rubric.reset]]
Reset any internal state. Override in subclasses if needed.
rubrics[[openenv.core.rubrics.Rubric.rubrics]]
Iterate over all descendant rubrics (depth-first).
state_dict[[openenv.core.rubrics.Rubric.state_dict]]
Serialize rubric configuration for checkpointing.
openenv.core.rubrics.Sequential[[openenv.core.rubrics.Sequential]]
Run rubrics in order, fail-fast on zero.
Runs child rubrics in order. If any returns 0, stops immediately and returns 0. This implements hierarchical gating patterns where syntax checks run before execution checks.
Examples:
rubric = Sequential(
Gate(Compiles()),
Gate(PassesTests(), threshold=0.5),
WeightedSum([PassesTests(), StyleRubric()], weights=[0.7, 0.3])
)
forwardopenenv.core.rubrics.Sequential.forwardhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/containers.py#L61[{"name": "action", "val": ": typing.Any"}, {"name": "observation", "val": ": typing.Any"}] Run rubrics in order, return 0 if any returns 0. Sync version.
openenv.core.rubrics.Gate[[openenv.core.rubrics.Gate]]
Threshold wrapper - returns 0 if child score is below threshold.
Useful for hard constraints like "must pass 50% of tests".
Examples:
rubric = Gate(PassesTests(), threshold=0.5)
# Returns PassesTests() score if >= 0.5, else 0.0
forwardopenenv.core.rubrics.Gate.forwardhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/containers.py#L290[{"name": "action", "val": ": typing.Any"}, {"name": "observation", "val": ": typing.Any"}] Return child score if >= threshold, else 0. Sync version.
openenv.core.rubrics.WeightedSum[[openenv.core.rubrics.WeightedSum]]
Weighted combination of child rubrics.
Standard aggregation pattern for multi-criteria evaluation.
Examples:
rubric = WeightedSum(
[PassesTests(), StyleRubric()],
weights=[0.7, 0.3]
)
forwardopenenv.core.rubrics.WeightedSum.forwardhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/containers.py#L377[{"name": "action", "val": ": typing.Any"}, {"name": "observation", "val": ": typing.Any"}] Return weighted sum of child scores. Sync version.
openenv.core.rubrics.RubricList[[openenv.core.rubrics.RubricList]]
Container for dynamic lists of rubrics.
Analogous to nn.ModuleList. Does not define aggregation - use within a parent rubric that implements custom logic.
Examples:
class MultiGameRubric(Rubric):
def __init__(self, games: List[str]):
super().__init__()
self.games = RubricList([GameRubric(g) for g in games])
def forward(self, action, obs) -> float:
return self.games[obs.game_index](action, obs)
appendopenenv.core.rubrics.RubricList.appendhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/containers.py#L494[{"name": "rubric", "val": ": Rubric"}] Add a rubric to the list.
extend[[openenv.core.rubrics.RubricList.extend]]
Add multiple rubrics to the list.
forward[[openenv.core.rubrics.RubricList.forward]]
RubricList does not define aggregation - override in parent.
openenv.core.rubrics.RubricDict[[openenv.core.rubrics.RubricDict]]
Container for named rubrics with keyed access.
Analogous to nn.ModuleDict. Enables keyed access for multi-task environments where different tasks require different rubrics.
Examples:
class AtariRubric(Rubric):
def __init__(self):
super().__init__()
self.games = RubricDict({
"pong": PongRubric(),
"breakout": BreakoutRubric(),
"space_invaders": SpaceInvadersRubric(),
})
def forward(self, action, obs) -> float:
return self.games[obs.game_id](action, obs)
# Access: env.rubric.games["pong"]
forwardopenenv.core.rubrics.RubricDict.forwardhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/containers.py#L553[{"name": "action", "val": ": typing.Any"}, {"name": "observation", "val": ": typing.Any"}] RubricDict does not define aggregation - override in parent.
items[[openenv.core.rubrics.RubricDict.items]]
Iterate over (key, rubric) pairs.
keys[[openenv.core.rubrics.RubricDict.keys]]
Iterate over keys.
update[[openenv.core.rubrics.RubricDict.update]]
Update with rubrics from a dictionary.
values[[openenv.core.rubrics.RubricDict.values]]
Iterate over rubrics.
openenv.core.rubrics.TrajectoryRubric[[openenv.core.rubrics.TrajectoryRubric]]
Abstract base for rubrics that score based on full trajectories.
Subclasses implement:
- score_trajectory(): Compute final score from trajectory
- compute_step_rewards(): Define credit assignment strategy
The call method accumulates steps and returns rewards according to the subclass's implementation.
IMPORTANT: Trajectories are stored in CPU memory to avoid GPU pressure. Environments with GPU tensors in observations must move them to CPU before returning from step().
Known limitation: Very long episodes (thousands of steps) may consume significant CPU memory. For such cases, consider streaming rubrics.
Examples:
class WinLossRubric(TrajectoryRubric):
def score_trajectory(self, trajectory):
_, final_obs = trajectory[-1]
return 1.0 if final_obs.metadata.get('won') else 0.0
def compute_step_rewards(self):
# Equal credit to all steps
score = self.score_trajectory(self._trajectory)
return [score] * len(self._trajectory)
rubric = WinLossRubric()
for action, obs in episode:
reward = rubric(action, obs) # 0.0 until done
step_rewards = rubric.compute_step_rewards() # Credit assignment
compute_step_rewardsopenenv.core.rubrics.TrajectoryRubric.compute_step_rewardshttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/trajectory.py#L111[]`list[float]`Rewards, one per step. Length matches len(trajectory). Compute per-step rewards from the accumulated trajectory.
Define your credit assignment strategy here (e.g., discounting, assigning all credit to specific steps, etc.).
Returns:
list[float]
Rewards, one per step. Length matches len(trajectory).
forward[[openenv.core.rubrics.TrajectoryRubric.forward]]
Accumulate step and return reward.
Returns intermediate_reward until done, then computes trajectory score.
Parameters:
action : The action taken.
observation : The resulting observation. Must have a 'done' attribute.
Returns:
float
intermediate_reward if not done, else score_trajectory() result.
load_state_dict[[openenv.core.rubrics.TrajectoryRubric.load_state_dict]]
Load configuration from checkpoint.
reset[[openenv.core.rubrics.TrajectoryRubric.reset]]
Clear accumulated trajectory. Call on env.reset().
score_trajectory[[openenv.core.rubrics.TrajectoryRubric.score_trajectory]]
Score the complete trajectory. Return 0.0-1.0.
Called when observation.done=True.
Parameters:
trajectory (list) : List of (action, observation) tuples.
Returns:
float
Final trajectory score (typically 0.0 to 1.0).
state_dict[[openenv.core.rubrics.TrajectoryRubric.state_dict]]
Serialize configuration (not trajectory data).
openenv.core.rubrics.ExponentialDiscountingTrajectoryRubric[[openenv.core.rubrics.ExponentialDiscountingTrajectoryRubric]]
TrajectoryRubric with exponential discounting for credit assignment.
Per-step reward:
r_t = gamma^(T-1-t) * R_final
With gamma=0.99, later steps get higher reward (they're "closer" to the outcome). With gamma=1.0, all steps get equal reward. With gamma=0.0, only the final step gets reward.
This is the standard temporal discounting used in reinforcement learning, applied retroactively once the episode outcome is known.
Examples:
class ChessRubric(ExponentialDiscountingTrajectoryRubric):
def score_trajectory(self, trajectory):
_, final_obs = trajectory[-1]
outcome = final_obs.metadata.get('winner')
if outcome == 'agent': return 1.0
elif outcome == 'opponent': return 0.0
else: return 0.5 # Draw
rubric = ChessRubric(gamma=0.99)
reward = rubric(action, obs) # 0.0 until done, then final score
step_rewards = rubric.compute_step_rewards() # Discounted per-step rewards
compute_step_rewardsopenenv.core.rubrics.ExponentialDiscountingTrajectoryRubric.compute_step_rewardshttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/trajectory.py#L191[]`list[float]`Discounted rewards where step_rewards[t] = gamma^(T-1-t) * R_final,
T is the trajectory length and R_final is score_trajectory().
Apply exponential discounting from final reward.
Returns:
list[float]
Discounted rewards where step_rewards[t] = gamma^(T-1-t) * R_final,
T is the trajectory length and R_final is score_trajectory().
state_dict[[openenv.core.rubrics.ExponentialDiscountingTrajectoryRubric.state_dict]]
Serialize configuration.
openenv.core.rubrics.LLMJudge[[openenv.core.rubrics.LLMJudge]]
Rubric that uses an LLM to evaluate agent actions/observations.
The prompt template is formatted with {action} and {observation}
placeholders. The LLM response is parsed for a numeric score.
forwardopenenv.core.rubrics.LLMJudge.forwardhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/rubrics/llm_judge.py#L68[{"name": "action", "val": ": typing.Any"}, {"name": "observation", "val": ": typing.Any"}]- action -- The action taken by the agent.
- observation -- The resulting observation.0
floatParsed score from the LLM response. Evaluate by sending a prompt to the LLM and parsing the score.
Parameters:
prompt_template (str) : Template string with {action} and {observation} placeholders.
client (LLMClient) : An LLMClient instance for making LLM calls.
score_pattern (str, optional) : Regex to extract the score from the LLM response. Defaults to matching the first decimal number.
default_score (float, optional, defaults to 0.0) : Score returned when parsing fails.
normalize (bool, optional, defaults to True) : If True, clamp extracted score to [0, 1].
Returns:
float
Parsed score from the LLM response.
state_dict[[openenv.core.rubrics.LLMJudge.state_dict]]
Serialize rubric configuration.
Tools[[openenv.core.tools.RepoInfo]]
openenv.core.tools.RepoInfo[[openenv.core.tools.RepoInfo]]
Information about a repository.
openenv.core.tools.GitServerClient[[openenv.core.tools.GitServerClient]]
Client for connecting to an external Gitea server.
This client is optimized for task-based isolation where:
- Multiple tasks share the same Gitea instance
- Each task has its own isolated workspace
- Fast reset() via git operations (no server restart)
- Repos are pre-migrated to Gitea once
Examples:
import os
# Connect to shared Gitea (credentials from environment)
client = GitServerClient(
gitea_url=os.getenv("GITEA_URL"),
username=os.getenv("GITEA_USERNAME"),
password=os.getenv("GITEA_PASSWORD")
)
client.wait_for_ready()
# Clone repo to workspace
path = client.clone_to_workspace("my-repo", commit="abc123")
# Fast reset to base state
client.reset_workspace("my-repo", commit="abc123")
clone_to_workspaceopenenv.core.tools.GitServerClient.clone_to_workspacehttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/tools/git_server_client.py#L184[{"name": "repo_name", "val": ": str"}, {"name": "target_dir", "val": ": str | None = None"}, {"name": "commit", "val": ": str = 'main'"}]- repo_name (str) --
Name of repository to clone.
- target_dir (
str, optional) -- Target directory name. Defaults torepo_name. - commit (
str, optional, defaults to"main") -- Commit hash or branch to check out.0strPath to cloned repository.-RuntimeError-- If clone fails.RuntimeError
Clone a repository to the workspace at a specific commit.
This creates a fresh clone optimized for task isolation.
Parameters:
gitea_url : URL of the Gitea server (e.g., "http://gitea:3000")
username : Gitea username for authentication
password : Gitea password for authentication
workspace_dir : Local workspace directory for cloning repos
Returns:
str
Path to cloned repository.
execute_git_command[[openenv.core.tools.GitServerClient.execute_git_command]]
Execute a git command in the workspace.
Parameters:
command (str) : Git command to execute (without git prefix).
working_dir (str, optional, defaults to "") : Working directory relative to workspace.
Returns:
tuple of (exit_code, stdout, stderr).
get_current_commit[[openenv.core.tools.GitServerClient.get_current_commit]]
Get current commit hash of a workspace repository.
Parameters:
repo_name (str) : Name of repository in workspace.
Returns:
str
Commit hash.
list_repositories[[openenv.core.tools.GitServerClient.list_repositories]]
List all repositories in Gitea.
Returns:
list of repository information dictionaries.
reset_workspace[[openenv.core.tools.GitServerClient.reset_workspace]]
Fast reset of workspace to base state (optimized for task resets).
This is much faster than re-cloning. It:
- Checks out the target commit
- Resets to that commit (hard)
- Cleans untracked files
Parameters:
repo_name (str) : Name of repository (directory in workspace).
commit (str, optional, defaults to "main") : Commit hash or branch to reset to.
Returns:
bool
True if reset successful.
wait_for_ready[[openenv.core.tools.GitServerClient.wait_for_ready]]
Wait for Gitea server to be ready.
Parameters:
timeout (int, optional, defaults to 30) : Maximum seconds to wait.
Returns:
bool
True if server is ready, False otherwise.
workspace_exists[[openenv.core.tools.GitServerClient.workspace_exists]]
Check if a repository exists in workspace.
Container providers[[openenv.core.containers.runtime.ContainerProvider]]
openenv.core.containers.runtime.ContainerProvider[[openenv.core.containers.runtime.ContainerProvider]]
Abstract base class for container providers.
Providers implement this interface to support different container platforms:
- LocalDockerProvider: Runs containers on local Docker daemon
- KubernetesProvider: Runs containers in Kubernetes cluster
- FargateProvider: Runs containers on AWS Fargate
- CloudRunProvider: Runs containers on Google Cloud Run
The provider manages a single container lifecycle and provides the base URL for connecting to it.
Examples:
provider = LocalDockerProvider()
base_url = provider.start_container("echo-env:latest")
print(base_url) # http://localhost:8000
# Use the environment via base_url
provider.stop_container()
start_containeropenenv.core.containers.runtime.ContainerProvider.start_containerhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/containers/runtime/providers.py#L44[{"name": "image", "val": ": str"}, {"name": "port", "val": ": Optional[int] = None"}, {"name": "env_vars", "val": ": Optional[Dict[str, str]] = None"}, {"name": "**kwargs", "val": ": Any"}]- image (str) --
Container image name (e.g., "echo-env:latest").
- port (
int, optional) -- Port to expose. IfNone, the provider chooses. - env_vars (
dict, optional) -- Environment variables to pass to container. - **kwargs --
Provider-specific options.0
strBase URL to connect to the container (e.g.,"http://localhost:8000").-RuntimeError-- If container fails to start.RuntimeError
Start a container from the specified image.
Parameters:
image (str) : Container image name (e.g., "echo-env:latest").
port (int, optional) : Port to expose. If None, the provider chooses.
env_vars (dict, optional) : Environment variables to pass to container.
- **kwargs : Provider-specific options.
Returns:
str
Base URL to connect to the container (e.g., "http://localhost:8000").
stop_container[[openenv.core.containers.runtime.ContainerProvider.stop_container]]
Stop and remove the running container.
This cleans up the container that was started by start_container().
wait_for_ready[[openenv.core.containers.runtime.ContainerProvider.wait_for_ready]]
Wait for the container to be ready to accept requests.
This typically polls the /health endpoint until it returns 200.
Parameters:
base_url (str) : Base URL of the container.
timeout_s (float, optional, defaults to 30.0) : Maximum time to wait in seconds.
openenv.core.containers.runtime.LocalDockerProvider[[openenv.core.containers.runtime.LocalDockerProvider]]
Container provider for local Docker daemon.
This provider runs containers on the local machine using Docker. Useful for development and testing.
Examples:
provider = LocalDockerProvider()
base_url = provider.start_container("echo-env:latest")
# Container running on http://localhost:
provider.stop_container()
start_containeropenenv.core.containers.runtime.LocalDockerProvider.start_containerhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/containers/runtime/providers.py#L142[{"name": "image", "val": ": str"}, {"name": "port", "val": ": Optional[int] = None"}, {"name": "env_vars", "val": ": Optional[Dict[str, str]] = None"}, {"name": "**kwargs", "val": ": Any"}]- image (str) --
Docker image name.
- port (
int, optional) -- Port to expose. IfNone, finds an available port. - env_vars (
dict, optional) -- Environment variables for the container. - **kwargs --
Additional Docker run options.0
strBase URL to connect to the container.
Start a Docker container locally.
Parameters:
image (str) : Docker image name.
port (int, optional) : Port to expose. If None, finds an available port.
env_vars (dict, optional) : Environment variables for the container.
- **kwargs : Additional Docker run options.
Returns:
str
Base URL to connect to the container.
stop_container[[openenv.core.containers.runtime.LocalDockerProvider.stop_container]]
Stop and remove the Docker container.
wait_for_ready[[openenv.core.containers.runtime.LocalDockerProvider.wait_for_ready]]
Wait for container to be ready by polling /health endpoint.
Parameters:
base_url (str) : Base URL of the container.
timeout_s (float, optional, defaults to 30.0) : Maximum time to wait in seconds.
openenv.core.containers.runtime.DockerSwarmProvider[[openenv.core.containers.runtime.DockerSwarmProvider]]
Container provider that uses Docker Swarm services for local concurrency.
This provider creates a replicated Swarm service backed by the local Docker engine. The built-in load-balancer fans requests across the replicas, allowing multiple container instances to run concurrently on the developer workstation (mirroring the workflow described in the Docker stack docs).
start_containeropenenv.core.containers.runtime.DockerSwarmProvider.start_containerhttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/containers/runtime/providers.py#L347[{"name": "image", "val": ": str"}, {"name": "port", "val": ": Optional[int] = None"}, {"name": "env_vars", "val": ": Optional[Dict[str, str]] = None"}, {"name": "**kwargs", "val": ": Any"}]- image (str) --
Docker image name.
- port (
int, optional) -- Port to expose. IfNone, finds an available port. - env_vars (
dict, optional) -- Environment variables for the container. - replicas (
int, optional, defaults to2) -- Number of container replicas. - cpu_limit (
floatorstr, optional) -- CPU limit passed to--limit-cpu. - memory_limit (
str, optional) -- Memory limit passed to--limit-memory. - constraints (
Sequence[str], optional) -- Placement constraints. - labels (
dict, optional) -- Service labels. - command (
Sequence[str]orstr, optional) -- Override container command.0strBase URL to connect to the service.
Start (or scale) a Swarm service for the given image.
Parameters:
image (str) : Docker image name.
port (int, optional) : Port to expose. If None, finds an available port.
env_vars (dict, optional) : Environment variables for the container.
replicas (int, optional, defaults to 2) : Number of container replicas.
cpu_limit (float or str, optional) : CPU limit passed to --limit-cpu.
memory_limit (str, optional) : Memory limit passed to --limit-memory.
constraints (Sequence[str], optional) : Placement constraints.
labels (dict, optional) : Service labels.
command (Sequence[str] or str, optional) : Override container command.
Returns:
str
Base URL to connect to the service.
stop_container[[openenv.core.containers.runtime.DockerSwarmProvider.stop_container]]
Remove the Swarm service (and keep the Swarm manager running).
wait_for_ready[[openenv.core.containers.runtime.DockerSwarmProvider.wait_for_ready]]
Wait for at least one replica to become healthy by polling /health.
With Swarm's load balancer, requests round-robin across replicas, so this only verifies that at least one replica is responding. Some replicas may still be starting when this returns.
openenv.core.containers.runtime.KubernetesProvider[[openenv.core.containers.runtime.KubernetesProvider]]
Container provider for Kubernetes clusters.
This provider creates pods in a Kubernetes cluster and exposes them via services or port-forwarding.
Examples:
provider = KubernetesProvider(namespace="envtorch-dev")
base_url = provider.start_container("echo-env:latest")
# Pod running in k8s, accessible via service or port-forward
provider.stop_container()
openenv.core.containers.runtime.RuntimeProvider[[openenv.core.containers.runtime.RuntimeProvider]]
Abstract base class for runtime providers that are not container providers. Providers implement this interface to support different runtime platforms:
- UVProvider: Runs environments via
uv run
The provider manages a single runtime lifecycle and provides the base URL for connecting to it.
Examples:
provider = UVProvider(project_path="/path/to/env")
base_url = provider.start()
print(base_url) # http://localhost:8000
provider.stop()
startopenenv.core.containers.runtime.RuntimeProvider.starthttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/containers/runtime/providers.py#L667[{"name": "port", "val": ": Optional[int] = None"}, {"name": "env_vars", "val": ": Optional[Dict[str, str]] = None"}, {"name": "**kwargs", "val": ": Any"}]- port (int, optional) --
Port to expose. If None, the provider chooses.
- env_vars (
dict, optional) -- Environment variables for the runtime. - **kwargs --
Additional runtime options.0
strBase URL to connect to the runtime.
Start the runtime.
Parameters:
port (int, optional) : Port to expose. If None, the provider chooses.
env_vars (dict, optional) : Environment variables for the runtime.
- **kwargs : Additional runtime options.
Returns:
str
Base URL to connect to the runtime.
stop[[openenv.core.containers.runtime.RuntimeProvider.stop]]
Stop the runtime.
wait_for_ready[[openenv.core.containers.runtime.RuntimeProvider.wait_for_ready]]
Wait for the runtime to be ready to accept requests.
openenv.core.containers.runtime.UVProvider[[openenv.core.containers.runtime.UVProvider]]
RuntimeProvider implementation backed by uv run.
Examples:
provider = UVProvider(project_path="/path/to/env")
base_url = provider.start()
print(base_url) # http://localhost:8000
# Use the environment via base_url
provider.stop()
startopenenv.core.containers.runtime.UVProvider.starthttps://github.com/huggingface/openenv/blob/vr_813/openenv/core/containers/runtime/uv_provider.py#L131[{"name": "port", "val": ": Optional[int] = None"}, {"name": "env_vars", "val": ": Optional[Dict[str, str]] = None"}, {"name": "workers", "val": ": int = 1"}, {"name": "**_", "val": ": Dict[str, str]"}]- port (int, optional) --
The port to bind the environment to.
- env_vars (
dict, optional) -- Environment variables to pass to the environment. - workers (
int, optional, defaults to1) -- The number of workers to use.0strBase URL of the environment.-RuntimeError-- If the environment is already running.RuntimeError
Start the environment via uv run.
Parameters:
project_path (str) : Local path to a uv project (passed to uv run --project).
app (str, optional, defaults to "server.app --app"): ASGI application path for uvicorn.
host (str, optional, defaults to "0.0.0.0") : Host interface to bind to.
reload (bool, optional, defaults to False) : Whether to enable uvicorn's reload mode.
env_vars (dict, optional) : Environment variables to pass through to the spawned process.
context_timeout_s (float, optional, defaults to 60.0) : How long to wait for the environment to become ready.
Returns:
str
Base URL of the environment.
stop[[openenv.core.containers.runtime.UVProvider.stop]]
Stop the environment.
wait_for_ready[[openenv.core.containers.runtime.UVProvider.wait_for_ready]]
Wait for the environment to become ready.
Parameters:
timeout_s (float, optional, defaults to 60.0) : Maximum time in seconds to wait for the environment to become ready.
Xet Storage Details
- Size:
- 92.4 kB
- Xet hash:
- 52e5993fa58dd1a157a6cd55e895435a5fd49f0390976f4bea0daf5f665d7dc3
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.