compute_market_env / client.py
kyars's picture
Upload folder using huggingface_hub
81321bc verified
"""Compute Market environment client."""
from __future__ import annotations
from typing import Any
from openenv.core.client_types import StepResult
from openenv.core.env_client import EnvClient
from .models import (
ActorProfile,
ActorSignal,
ComputeMarketAction,
ComputeMarketObservation,
ComputeMarketState,
JobRecord,
MarketEvent,
MarketOffer,
ReservationRecord,
)
def _build_records(model: type[Any], items: Any) -> list[Any]:
    """Instantiate ``model`` once for every mapping in ``items``.

    ``dict.get(key, [])`` only falls back to its default when the key is
    absent, so a payload carrying an explicit JSON ``null`` hands ``None`` to
    the caller.  Any falsy ``items`` value is treated as "no records" so that
    parsing does not fail with ``TypeError`` in that case.

    Args:
        model: Class called as ``model(**item)`` for each entry.
        items: Iterable of keyword mappings, or ``None``.

    Returns:
        A list with one ``model`` instance per entry, in input order.
    """
    return [model(**item) for item in (items or ())]


class ComputeMarketEnv(
    EnvClient[ComputeMarketAction, ComputeMarketObservation, ComputeMarketState]
):
    """Persistent client for the compute market environment."""

    def _step_payload(self, action: ComputeMarketAction) -> dict[str, Any]:
        """Convert ``action`` to the dict sent with a step request.

        Uses ``model_dump(exclude_none=True)``, so fields whose value is
        ``None`` are left out of the payload.
        """
        return action.model_dump(exclude_none=True)

    def _parse_result(self, payload: dict[str, Any]) -> StepResult[ComputeMarketObservation]:
        """Build a ``StepResult`` from a step/reset response payload.

        Observation fields are read from ``payload["observation"]``; ``reward``
        and ``done`` are read from the top level of ``payload`` and copied onto
        both the observation and the returned ``StepResult``.  Missing scalar
        keys fall back to the defaults spelled out below.

        Args:
            payload: Decoded response mapping.

        Returns:
            The parsed observation wrapped in a ``StepResult``.
        """
        # ``or {}`` also covers an explicit ``"observation": null``.
        obs_data = payload.get("observation") or {}
        # Read once so the observation and the StepResult cannot disagree.
        reward = payload.get("reward", 0.0)
        done = payload.get("done", False)

        observation = ComputeMarketObservation(
            scenario_variant=obs_data.get("scenario_variant", "baseline"),
            current_tick=obs_data.get("current_tick", 0),
            max_ticks=obs_data.get("max_ticks", 0),
            total_gpus=obs_data.get("total_gpus", 0),
            free_gpus=obs_data.get("free_gpus", 0),
            owned_gpus=obs_data.get("owned_gpus", 0),
            idle_owned_gpus=obs_data.get("idle_owned_gpus", 0),
            budget_remaining=obs_data.get("budget_remaining", 0.0),
            market_price=obs_data.get("market_price", 0.0),
            jobs=_build_records(JobRecord, obs_data.get("jobs")),
            visible_offers=_build_records(MarketOffer, obs_data.get("visible_offers")),
            recent_events=_build_records(MarketEvent, obs_data.get("recent_events")),
            actor_signals=_build_records(ActorSignal, obs_data.get("actor_signals")),
            done=done,
            reward=reward,
            metadata=obs_data.get("metadata") or {},
        )
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: dict[str, Any]) -> ComputeMarketState:
        """Build a ``ComputeMarketState`` from a state response payload.

        All fields are read from the top level of ``payload``.  Missing scalar
        keys fall back to the defaults spelled out below; missing or ``null``
        record lists become empty lists.

        Args:
            payload: Decoded response mapping.

        Returns:
            The parsed environment state.
        """
        return ComputeMarketState(
            episode_id=payload.get("episode_id", ""),
            step_count=payload.get("step_count", 0),
            scenario_seed=payload.get("scenario_seed", 0),
            scenario_variant=payload.get("scenario_variant", "baseline"),
            current_tick=payload.get("current_tick", 0),
            max_ticks=payload.get("max_ticks", 0),
            total_gpus=payload.get("total_gpus", 0),
            free_gpus=payload.get("free_gpus", 0),
            owned_gpus=payload.get("owned_gpus", 0),
            idle_owned_gpus=payload.get("idle_owned_gpus", 0),
            budget_remaining=payload.get("budget_remaining", 0.0),
            market_price=payload.get("market_price", 0.0),
            cumulative_reward=payload.get("cumulative_reward", 0.0),
            external_allocated_gpus=payload.get("external_allocated_gpus", 0),
            done=payload.get("done", False),
            jobs=_build_records(JobRecord, payload.get("jobs")),
            visible_offers=_build_records(MarketOffer, payload.get("visible_offers")),
            reservations=_build_records(ReservationRecord, payload.get("reservations")),
            actor_signals=_build_records(ActorSignal, payload.get("actor_signals")),
            hidden_actors=_build_records(ActorProfile, payload.get("hidden_actors")),
            recent_events=_build_records(MarketEvent, payload.get("recent_events")),
        )