| """Compute Market environment client.""" | |
| from __future__ import annotations | |
| from typing import Any | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_client import EnvClient | |
| from .models import ( | |
| ActorProfile, | |
| ActorSignal, | |
| ComputeMarketAction, | |
| ComputeMarketObservation, | |
| ComputeMarketState, | |
| JobRecord, | |
| MarketEvent, | |
| MarketOffer, | |
| ReservationRecord, | |
| ) | |
| class ComputeMarketEnv( | |
| EnvClient[ComputeMarketAction, ComputeMarketObservation, ComputeMarketState] | |
| ): | |
| """Persistent client for the compute market environment.""" | |
| def _step_payload(self, action: ComputeMarketAction) -> dict[str, Any]: | |
| return action.model_dump(exclude_none=True) | |
| def _parse_result(self, payload: dict[str, Any]) -> StepResult[ComputeMarketObservation]: | |
| obs_data = payload.get("observation", {}) | |
| observation = ComputeMarketObservation( | |
| scenario_variant=obs_data.get("scenario_variant", "baseline"), | |
| current_tick=obs_data.get("current_tick", 0), | |
| max_ticks=obs_data.get("max_ticks", 0), | |
| total_gpus=obs_data.get("total_gpus", 0), | |
| free_gpus=obs_data.get("free_gpus", 0), | |
| owned_gpus=obs_data.get("owned_gpus", 0), | |
| idle_owned_gpus=obs_data.get("idle_owned_gpus", 0), | |
| budget_remaining=obs_data.get("budget_remaining", 0.0), | |
| market_price=obs_data.get("market_price", 0.0), | |
| jobs=[JobRecord(**item) for item in obs_data.get("jobs", [])], | |
| visible_offers=[MarketOffer(**item) for item in obs_data.get("visible_offers", [])], | |
| recent_events=[MarketEvent(**item) for item in obs_data.get("recent_events", [])], | |
| actor_signals=[ActorSignal(**item) for item in obs_data.get("actor_signals", [])], | |
| done=payload.get("done", False), | |
| reward=payload.get("reward", 0.0), | |
| metadata=obs_data.get("metadata", {}), | |
| ) | |
| return StepResult( | |
| observation=observation, | |
| reward=payload.get("reward", 0.0), | |
| done=payload.get("done", False), | |
| ) | |
| def _parse_state(self, payload: dict[str, Any]) -> ComputeMarketState: | |
| return ComputeMarketState( | |
| episode_id=payload.get("episode_id", ""), | |
| step_count=payload.get("step_count", 0), | |
| scenario_seed=payload.get("scenario_seed", 0), | |
| scenario_variant=payload.get("scenario_variant", "baseline"), | |
| current_tick=payload.get("current_tick", 0), | |
| max_ticks=payload.get("max_ticks", 0), | |
| total_gpus=payload.get("total_gpus", 0), | |
| free_gpus=payload.get("free_gpus", 0), | |
| owned_gpus=payload.get("owned_gpus", 0), | |
| idle_owned_gpus=payload.get("idle_owned_gpus", 0), | |
| budget_remaining=payload.get("budget_remaining", 0.0), | |
| market_price=payload.get("market_price", 0.0), | |
| cumulative_reward=payload.get("cumulative_reward", 0.0), | |
| external_allocated_gpus=payload.get("external_allocated_gpus", 0), | |
| done=payload.get("done", False), | |
| jobs=[JobRecord(**item) for item in payload.get("jobs", [])], | |
| visible_offers=[MarketOffer(**item) for item in payload.get("visible_offers", [])], | |
| reservations=[ReservationRecord(**item) for item in payload.get("reservations", [])], | |
| actor_signals=[ActorSignal(**item) for item in payload.get("actor_signals", [])], | |
| hidden_actors=[ActorProfile(**item) for item in payload.get("hidden_actors", [])], | |
| recent_events=[MarketEvent(**item) for item in payload.get("recent_events", [])], | |
| ) | |