kyars commited on
Commit
53f7ded
·
verified ·
1 Parent(s): 4f45a7b

Upload folder using huggingface_hub

Browse files
Dockerfile ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# Build stage: resolve and install dependencies into a virtualenv with uv.
ARG BASE_IMAGE=ghcr.io/meta-pytorch/openenv-base:latest
FROM ${BASE_IMAGE} AS builder

WORKDIR /app
# git lets uv resolve VCS dependencies; clearing apt lists keeps the layer small.
RUN apt-get update && \
    apt-get install -y --no-install-recommends git && \
    rm -rf /var/lib/apt/lists/*

COPY . /app/env
WORKDIR /app/env

# Install uv only when the base image does not already ship it.
RUN if ! command -v uv >/dev/null 2>&1; then \
    curl -LsSf https://astral.sh/uv/install.sh | sh && \
    mv /root/.local/bin/uv /usr/local/bin/uv && \
    mv /root/.local/bin/uvx /usr/local/bin/uvx; \
    fi

# Two-phase sync: dependencies first (cache-friendly), then the project itself.
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --no-install-project --no-editable
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --no-editable

# Runtime stage: copy only the built venv plus the source tree.
FROM ${BASE_IMAGE}
WORKDIR /app
COPY --from=builder /app/env/.venv /app/.venv
COPY --from=builder /app/env /app/env
ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONPATH="/app/env:$PYTHONPATH"
# NOTE(review): assumes curl is present in the runtime image — confirm, otherwise
# the healthcheck will always report unhealthy.
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
ENV ENABLE_WEB_INTERFACE=true
CMD ["sh", "-c", "cd /app/env && uvicorn server.app:app --host 0.0.0.0 --port 8000"]
README.md CHANGED
@@ -1,10 +1,83 @@
1
  ---
2
- title: Compute Market Env
3
- emoji: 🚀
4
  colorFrom: blue
5
- colorTo: indigo
6
  sdk: docker
7
  pinned: false
 
 
 
 
 
 
 
8
  ---
9
 
10
- Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  ---
2
+ title: Compute Market Environment Server
3
+ emoji: 📊
4
  colorFrom: blue
5
+ colorTo: green
6
  sdk: docker
7
  pinned: false
8
+ app_port: 8000
9
+ base_path: /web
10
+ tags:
11
+ - openenv
12
+ - multi-agent
13
+ - compute-allocation
14
+ - market-simulation
15
  ---
16
 
17
+ # Compute Market Environment
18
+
19
+ An OpenEnv environment for training a single allocator/trader in a scarce-GPU market with scripted background actors, hidden incentives, delayed rewards, and partial observability.
20
+
21
+ ## What v1 implements
22
+
23
+ - One trained agent: the allocator/trader
24
+ - Scripted counterparties: urgent tenant, cost-sensitive tenant, broker
25
+ - Jobs with deadlines, value, dependencies, and delayed payoff
26
+ - Actions: `bid_for_capacity`, `accept_offer`, `propose_swap`, `schedule_job`, `delay_job`, `inspect_market`, `noop`
27
+ - Reward = completed job value minus compute spend, missed-deadline penalties, and idle-hoarding penalties
28
+ - Separate training helper for TRL/Colab in `training/minimal_grpo_rollout.py`
29
+
30
+ ## Quick Start
31
+
32
+ ```python
33
+ from compute_market_env import ComputeMarketAction, ComputeMarketEnv
34
+
35
+ with ComputeMarketEnv(base_url="http://localhost:8000") as env:
36
+ result = env.reset(seed=7)
37
+ print(result.observation.market_price)
38
+ print(result.observation.free_gpus)
39
+
40
+ result = env.step(
41
+ ComputeMarketAction(
42
+ action_type="bid_for_capacity",
43
+ gpu_count=4,
44
+ price_per_gpu=6.5,
45
+ duration=3,
46
+ )
47
+ )
48
+ print(result.reward)
49
+ print(result.observation.budget_remaining)
50
+ ```
51
+
52
+ ## Local Development
53
+
54
+ ```bash
55
+ uv sync --extra dev
56
+ uv run pytest -q
57
+ uv run uvicorn server.app:app --host 0.0.0.0 --port 8000
58
+ openenv validate --verbose
59
+ ```
60
+
61
+ ## Docker
62
+
63
+ ```bash
64
+ docker build -t compute-market-env:latest -f server/Dockerfile .
65
+ docker run -p 8000:8000 compute-market-env:latest
66
+ ```
67
+
68
+ ## Environment Loop
69
+
70
+ 1. Agent observes market price, public free GPUs, visible offers, jobs, and public actor signals.
71
+ 2. Agent takes one action.
72
+ 3. The environment advances one tick.
73
+ 4. Scripted actors update demand and offers.
74
+ 5. Jobs progress, complete, pause, or miss deadlines.
75
+ 6. The environment returns the next observation and realized reward for that tick.
76
+
77
+ ## Training
78
+
79
+ For a minimal Colab/TRL integration, use `training/minimal_grpo_rollout.py` and start from a small model notebook such as Qwen 4B or Llama 3.2 3B. The helper script is designed to be pasted into a Colab notebook and wired into GRPO rollouts.
80
+
81
+ ## Oversight
82
+
83
+ Scalable oversight is intentionally not joint-trained in v1. The recommended phase 2 is to log trajectories from this environment and train a separate auditor on replayed episodes.
Untitled ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ - **Snorkel AI — Simulated Experts-in-the-Loop:** an environment that simulates interactions with real subject-matter experts whose requirements and preferences change over time.
+ - **Fleet AI — Scalable Oversight:** environments that train oversight agents to monitor, analyze, and explain the behavior of other AI agents operating in complex, multi-agent settings.
2
+
3
+ Thoughts on compatibility with the two environments above.
__init__.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Compute Market environment exports."""
2
+
3
+ from .client import ComputeMarketEnv
4
+ from .models import (
5
+ ActorProfile,
6
+ ActorSignal,
7
+ ComputeMarketAction,
8
+ ComputeMarketObservation,
9
+ ComputeMarketState,
10
+ JobRecord,
11
+ MarketEvent,
12
+ MarketOffer,
13
+ ReservationRecord,
14
+ )
15
+
16
+ __all__ = [
17
+ "ActorProfile",
18
+ "ActorSignal",
19
+ "ComputeMarketAction",
20
+ "ComputeMarketEnv",
21
+ "ComputeMarketObservation",
22
+ "ComputeMarketState",
23
+ "JobRecord",
24
+ "MarketEvent",
25
+ "MarketOffer",
26
+ "ReservationRecord",
27
+ ]
client.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Compute Market environment client."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Any
6
+
7
+ from openenv.core.client_types import StepResult
8
+ from openenv.core.env_client import EnvClient
9
+
10
+ from .models import (
11
+ ActorProfile,
12
+ ActorSignal,
13
+ ComputeMarketAction,
14
+ ComputeMarketObservation,
15
+ ComputeMarketState,
16
+ JobRecord,
17
+ MarketEvent,
18
+ MarketOffer,
19
+ ReservationRecord,
20
+ )
21
+
22
+
23
class ComputeMarketEnv(
    EnvClient[ComputeMarketAction, ComputeMarketObservation, ComputeMarketState]
):
    """Persistent client for the compute market environment."""

    def _step_payload(self, action: ComputeMarketAction) -> dict[str, Any]:
        """Serialize an action into the /step request body."""
        # exclude_none keeps the payload limited to the fields relevant to
        # this particular action_type.
        return action.model_dump(exclude_none=True)

    def _parse_result(self, payload: dict[str, Any]) -> StepResult[ComputeMarketObservation]:
        """Build a typed StepResult from a raw server response dict."""
        obs_data = payload.get("observation", {})
        # Missing fields fall back to zero-ish defaults so partial payloads still parse.
        observation = ComputeMarketObservation(
            current_tick=obs_data.get("current_tick", 0),
            max_ticks=obs_data.get("max_ticks", 0),
            total_gpus=obs_data.get("total_gpus", 0),
            free_gpus=obs_data.get("free_gpus", 0),
            owned_gpus=obs_data.get("owned_gpus", 0),
            idle_owned_gpus=obs_data.get("idle_owned_gpus", 0),
            budget_remaining=obs_data.get("budget_remaining", 0.0),
            market_price=obs_data.get("market_price", 0.0),
            jobs=[JobRecord(**item) for item in obs_data.get("jobs", [])],
            visible_offers=[MarketOffer(**item) for item in obs_data.get("visible_offers", [])],
            recent_events=[MarketEvent(**item) for item in obs_data.get("recent_events", [])],
            actor_signals=[ActorSignal(**item) for item in obs_data.get("actor_signals", [])],
            # done/reward live at the payload top level, not inside "observation".
            done=payload.get("done", False),
            reward=payload.get("reward", 0.0),
            metadata=obs_data.get("metadata", {}),
        )
        return StepResult(
            observation=observation,
            reward=payload.get("reward", 0.0),
            done=payload.get("done", False),
        )

    def _parse_state(self, payload: dict[str, Any]) -> ComputeMarketState:
        """Build the full control-plane state (including hidden actors) from a response dict."""
        return ComputeMarketState(
            episode_id=payload.get("episode_id", ""),
            step_count=payload.get("step_count", 0),
            scenario_seed=payload.get("scenario_seed", 0),
            current_tick=payload.get("current_tick", 0),
            max_ticks=payload.get("max_ticks", 0),
            total_gpus=payload.get("total_gpus", 0),
            free_gpus=payload.get("free_gpus", 0),
            owned_gpus=payload.get("owned_gpus", 0),
            idle_owned_gpus=payload.get("idle_owned_gpus", 0),
            budget_remaining=payload.get("budget_remaining", 0.0),
            market_price=payload.get("market_price", 0.0),
            cumulative_reward=payload.get("cumulative_reward", 0.0),
            external_allocated_gpus=payload.get("external_allocated_gpus", 0),
            done=payload.get("done", False),
            jobs=[JobRecord(**item) for item in payload.get("jobs", [])],
            visible_offers=[MarketOffer(**item) for item in payload.get("visible_offers", [])],
            reservations=[ReservationRecord(**item) for item in payload.get("reservations", [])],
            actor_signals=[ActorSignal(**item) for item in payload.get("actor_signals", [])],
            hidden_actors=[ActorProfile(**item) for item in payload.get("hidden_actors", [])],
            recent_events=[MarketEvent(**item) for item in payload.get("recent_events", [])],
        )
examples/random_policy.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Tiny local smoke example for the compute market environment."""
2
+
3
+ from compute_market_env import ComputeMarketAction, ComputeMarketEnv
4
+
5
+
6
+ with ComputeMarketEnv(base_url="http://localhost:8000") as env:
7
+ result = env.reset(seed=5)
8
+ print("reset", result.observation.market_price, result.observation.free_gpus)
9
+
10
+ result = env.step(
11
+ ComputeMarketAction(
12
+ action_type="bid_for_capacity",
13
+ gpu_count=min(4, max(1, result.observation.free_gpus)),
14
+ price_per_gpu=max(6.0, result.observation.market_price + 0.5),
15
+ duration=3,
16
+ )
17
+ )
18
+ print("bid", result.reward)
19
+
20
+ result = env.step(ComputeMarketAction(action_type="schedule_job", job_id="job-a"))
21
+ print("schedule", result.reward)
22
+
23
+ while not result.done:
24
+ result = env.step(ComputeMarketAction(action_type="noop"))
25
+ print("tick", result.observation.current_tick, result.reward)
models.py ADDED
@@ -0,0 +1,136 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Data models for the Compute Market environment."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Literal
6
+
7
+ from openenv.core.env_server.types import Action, Observation, State
8
+ from pydantic import BaseModel, Field
9
+
10
+
11
class JobRecord(BaseModel):
    """Represents an agent-owned job in the simulated compute market."""

    job_id: str
    gpu_count: int = Field(..., ge=1)  # GPUs required simultaneously while running
    total_duration: int = Field(..., ge=1)  # total ticks of compute the job needs
    remaining_duration: int = Field(..., ge=0)  # ticks of compute still outstanding
    deadline: int = Field(..., ge=1)  # tick by which the job must finish
    value: float = Field(..., ge=0.0)  # reward paid when the job completes
    priority: int = Field(default=1, ge=1)  # higher values take capacity first
    depends_on: list[str] = Field(default_factory=list)  # job_ids that must complete first
    status: Literal["pending", "running", "paused", "completed", "missed"] = "pending"
    delay_count: int = Field(default=0, ge=0)  # times the agent explicitly delayed it
    started_at: int | None = None  # tick of the first start, if ever started
    completed_at: int | None = None  # tick of completion, if completed
26
+
27
+
28
class MarketOffer(BaseModel):
    """Visible capacity offer from a scripted actor."""

    offer_id: str
    actor_id: str  # scripted actor posting the offer
    gpu_count: int = Field(..., ge=1)
    price_per_gpu: float = Field(..., ge=0.0)  # price per GPU per tick
    duration: int = Field(..., ge=1)  # ticks of capacity being offered
    expires_at_tick: int = Field(..., ge=0)  # offer is withdrawn after this tick
    offer_type: Literal["broker", "swap"] = "broker"
38
+
39
+
40
class ReservationRecord(BaseModel):
    """Capacity currently owned by the agent."""

    reservation_id: str
    source: str  # "spot-market", an actor_id, or "swap:<actor_id>"
    gpu_count: int = Field(..., ge=1)
    remaining_ticks: int = Field(..., ge=0)  # ticks of ownership remaining
    price_per_gpu: float = Field(..., ge=0.0)  # price paid per GPU per tick
    acquired_at_tick: int = Field(..., ge=0)
49
+
50
+
51
class MarketEvent(BaseModel):
    """Human-readable event surfaced in observations."""

    tick: int = Field(..., ge=0)  # simulation tick at which the event occurred
    event_type: str  # short machine-readable tag, e.g. "bid_won", "job_completed"
    message: str  # human-readable description for logs/observations
57
+
58
+
59
class ActorSignal(BaseModel):
    """Public signal about a scripted actor (hidden parameters are excluded)."""

    actor_id: str
    visible_behavior: Literal["aggressive", "steady", "opportunistic"]
    pressure_hint: Literal["low", "medium", "high"]  # coarse demand-pressure indicator
    last_seen_bid: float = Field(..., ge=0.0)  # most recent publicly observed bid
66
+
67
+
68
class ActorProfile(BaseModel):
    """Hidden actor configuration kept in the control-plane state.

    Never exposed in observations; only ``visible_behavior`` leaks out via
    ``ActorSignal``.
    """

    actor_id: str
    policy_type: Literal["urgent_tenant", "cost_sensitive_tenant", "broker"]
    # Hidden bid ceiling — presumably consumed by the scripted bidding logic
    # (not visible in this chunk); confirm against the market-refresh code.
    max_bid: float = Field(..., ge=0.0)
    preferred_gpu_count: int = Field(..., ge=1)
    visible_behavior: Literal["aggressive", "steady", "opportunistic"]
    swap_floor: float = Field(..., ge=0.0)  # minimum per-GPU price the actor accepts in swaps
77
+
78
+
79
class ComputeMarketAction(Action):
    """Single-step action for the compute market.

    Only ``action_type`` is always meaningful; the remaining fields are
    interpreted per action type (``offer_id`` for ``accept_offer``,
    ``job_id`` for ``schedule_job``/``delay_job``, ``actor_id`` plus the
    numeric fields for ``propose_swap``, etc.).
    """

    action_type: Literal[
        "bid_for_capacity",
        "accept_offer",
        "propose_swap",
        "schedule_job",
        "delay_job",
        "inspect_market",
        "noop",
    ]
    job_id: str | None = Field(default=None)  # target of schedule_job / delay_job
    offer_id: str | None = Field(default=None)  # target of accept_offer
    actor_id: str | None = Field(default=None)  # counterparty for propose_swap
    gpu_count: int = Field(default=0, ge=0)  # capacity requested (bid / swap)
    price_per_gpu: float = Field(default=0.0, ge=0.0)  # offered price (bid / swap)
    duration: int = Field(default=1, ge=1)  # ticks of capacity requested
97
+
98
+
99
class ComputeMarketObservation(Observation):
    """Partial observation exposed to the trained agent.

    Deliberately omits hidden actor parameters (max bids, swap floors); the
    agent only sees public market information and its own holdings.
    """

    current_tick: int = Field(default=0, ge=0)
    max_ticks: int = Field(default=0, ge=0)
    total_gpus: int = Field(default=0, ge=0)
    free_gpus: int = Field(default=0, ge=0)  # publicly available capacity this tick
    owned_gpus: int = Field(default=0, ge=0)  # capacity the agent currently holds
    idle_owned_gpus: int = Field(default=0, ge=0)  # held capacity not running any job
    budget_remaining: float = Field(default=0.0)  # intentionally unconstrained (no ge=0)
    market_price: float = Field(default=0.0, ge=0.0)  # current spot clearing price
    jobs: list[JobRecord] = Field(default_factory=list)
    visible_offers: list[MarketOffer] = Field(default_factory=list)
    recent_events: list[MarketEvent] = Field(default_factory=list)
    actor_signals: list[ActorSignal] = Field(default_factory=list)
114
+
115
+
116
class ComputeMarketState(State):
    """Full control-plane state including hidden actor data.

    Superset of the observation: adds reservations, cumulative reward, and
    the hidden ``ActorProfile`` list that the agent never sees.
    """

    scenario_seed: int = 0  # seed used to build this episode's scenario
    current_tick: int = 0
    max_ticks: int = 0
    total_gpus: int = 0
    free_gpus: int = 0
    owned_gpus: int = 0
    idle_owned_gpus: int = 0
    budget_remaining: float = 0.0
    market_price: float = 0.0
    cumulative_reward: float = 0.0  # running total across the episode
    external_allocated_gpus: int = 0  # capacity held by scripted actors
    done: bool = False
    jobs: list[JobRecord] = Field(default_factory=list)
    visible_offers: list[MarketOffer] = Field(default_factory=list)
    reservations: list[ReservationRecord] = Field(default_factory=list)
    actor_signals: list[ActorSignal] = Field(default_factory=list)
    hidden_actors: list[ActorProfile] = Field(default_factory=list)  # control-plane only
    recent_events: list[MarketEvent] = Field(default_factory=list)
openenv.yaml ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ spec_version: 1
2
+ name: compute_market_env
3
+ type: space
4
+ runtime: fastapi
5
+ app: server.app:app
6
+ port: 8000
pyproject.toml ADDED
@@ -0,0 +1,36 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ [build-system]
2
+ requires = ["setuptools>=69", "wheel"]
3
+ build-backend = "setuptools.build_meta"
4
+
5
+ [project]
6
+ name = "openenv-compute-market-env"
7
+ version = "0.1.0"
8
+ description = "Compute allocation market environment for OpenEnv"
9
+ readme = "README.md"
10
+ requires-python = ">=3.10"
11
+ dependencies = [
12
+ "openenv-core[core]>=0.2.1",
13
+ "pydantic>=2.7.0",
14
+ ]
15
+
16
+ [project.optional-dependencies]
17
+ dev = [
18
+ "openenv-core[cli]>=0.2.1",
19
+ "pytest>=8.2.0",
20
+ "ruff>=0.7.0",
21
+ ]
22
+ train = [
23
+ "trl>=0.24.0",
24
+ "transformers>=4.56.0",
25
+ ]
26
+
27
+ [project.scripts]
28
+ server = "compute_market_env.server.app:main"
29
+
30
+ [tool.setuptools]
31
+ include-package-data = true
32
+ packages = ["compute_market_env", "compute_market_env.server"]
33
+ package-dir = {"compute_market_env" = ".", "compute_market_env.server" = "server"}
34
+
35
+ [tool.pytest.ini_options]
36
+ testpaths = ["tests"]
server/__init__.py ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ """Server exports for the Compute Market environment."""
2
+
3
+ from .compute_market_environment import ComputeMarketEnvironment
4
+
5
+ __all__ = ["ComputeMarketEnvironment"]
server/app.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """FastAPI app for the Compute Market environment."""
2
+
3
+ try:
4
+ from openenv.core.env_server import create_app
5
+ except ImportError:
6
+ from openenv.core.env_server.http_server import create_app
7
+
8
+ try:
9
+ from ..models import ComputeMarketAction, ComputeMarketObservation
10
+ from .compute_market_environment import create_environment_from_env
11
+ except ImportError:
12
+ from models import ComputeMarketAction, ComputeMarketObservation
13
+ from server.compute_market_environment import create_environment_from_env
14
+
15
+
16
+ app = create_app(
17
+ create_environment_from_env,
18
+ ComputeMarketAction,
19
+ ComputeMarketObservation,
20
+ env_name="compute_market_env",
21
+ max_concurrent_envs=8,
22
+ )
23
+
24
+
25
+ def main(host: str = "0.0.0.0", port: int = 8000) -> None:
26
+ import uvicorn
27
+
28
+ uvicorn.run(app, host=host, port=port)
29
+
30
+
31
+ if __name__ == "__main__":
32
+ main()
server/compute_market_environment.py ADDED
@@ -0,0 +1,637 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Compute Market environment implementation."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import os
6
+ import random
7
+ from uuid import uuid4
8
+
9
+ try:
10
+ from openenv.core.env_server.interfaces import Environment
11
+ except ImportError:
12
+ from openenv.core.env_server import Environment
13
+
14
+ try:
15
+ from ..models import (
16
+ ActorProfile,
17
+ ActorSignal,
18
+ ComputeMarketAction,
19
+ ComputeMarketObservation,
20
+ ComputeMarketState,
21
+ JobRecord,
22
+ MarketEvent,
23
+ MarketOffer,
24
+ ReservationRecord,
25
+ )
26
+ except ImportError:
27
+ from models import (
28
+ ActorProfile,
29
+ ActorSignal,
30
+ ComputeMarketAction,
31
+ ComputeMarketObservation,
32
+ ComputeMarketState,
33
+ JobRecord,
34
+ MarketEvent,
35
+ MarketOffer,
36
+ ReservationRecord,
37
+ )
38
+
39
+
40
class ComputeMarketEnvironment(
    Environment[ComputeMarketAction, ComputeMarketObservation, ComputeMarketState]
):
    """Single-agent compute allocation market with scripted counterparties."""

    # NOTE(review): flag consumed by the openenv server layer; presumably marks
    # this environment as safe for multiple concurrent sessions — confirm semantics.
    SUPPORTS_CONCURRENT_SESSIONS = True
46
+
47
    def __init__(
        self,
        total_gpus: int = 8,
        initial_budget: float = 150.0,
        max_ticks: int = 12,
        default_seed: int = 0,
    ) -> None:
        """Create an environment instance.

        Args:
            total_gpus: Fixed cluster size shared by the agent and scripted actors.
            initial_budget: Spendable budget the agent starts each episode with.
            max_ticks: Episode length in ticks.
            default_seed: Seed used when ``reset()`` is called without one.
        """
        self.total_gpus = total_gpus
        self.initial_budget = initial_budget
        self.max_ticks = max_ticks
        self.default_seed = default_seed
        # Control-plane snapshot exposed via the `state` property.
        self._state = ComputeMarketState(
            episode_id=str(uuid4()),
            step_count=0,
            scenario_seed=default_seed,
            max_ticks=max_ticks,
            total_gpus=total_gpus,
            budget_remaining=initial_budget,
        )
        # Mutable per-episode simulation state; reset() rebuilds all of it.
        self._rng = random.Random(default_seed)
        self._jobs: list[JobRecord] = []
        self._reservations: list[ReservationRecord] = []
        self._visible_offers: list[MarketOffer] = []
        self._hidden_actors: list[ActorProfile] = []
        self._actor_signals: list[ActorSignal] = []
        self._recent_events: list[MarketEvent] = []
        self._current_tick = 0
        self._market_price = 0.0
        self._free_gpus = total_gpus
        self._external_allocated_gpus = 0
        self._budget_remaining = initial_budget
        self._cumulative_reward = 0.0
        self._done = False
80
+
81
    def reset(
        self,
        seed: int | None = None,
        episode_id: str | None = None,
        **kwargs,
    ) -> ComputeMarketObservation:
        """Start a new episode and return the initial observation.

        Args:
            seed: Scenario seed; falls back to ``default_seed`` when None.
            episode_id: Optional externally supplied episode identifier.
        """
        scenario_seed = self.default_seed if seed is None else seed
        self._rng = random.Random(scenario_seed)
        self._current_tick = 0
        self._done = False
        self._budget_remaining = float(self.initial_budget)
        self._cumulative_reward = 0.0
        self._jobs = self._build_jobs()
        self._reservations = []
        self._hidden_actors = self._build_actors()
        self._visible_offers = []
        self._actor_signals = []
        self._recent_events = [
            MarketEvent(
                tick=0,
                event_type="reset",
                message="Scenario initialized with scripted tenants and broker.",
            )
        ]
        # Populate tick-0 offers/price/signals before taking the state snapshot.
        self._refresh_market()
        self._state = self._snapshot_state(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            scenario_seed=scenario_seed,
        )
        return self._build_observation(0.0, False, {"status": "ready"})
112
+
113
    def step(self, action: ComputeMarketAction) -> ComputeMarketObservation:  # type: ignore[override]
        """Apply one agent action, advance the market one tick, and return the result.

        The returned observation carries the realized reward for this tick
        (action reward plus completion/penalty effects from the tick advance).
        """
        if self._done:
            # No-op after termination: zero reward, done flag, explanatory error.
            return self._build_observation(
                0.0,
                True,
                {"error": "Episode already finished."},
            )

        self._state.step_count += 1
        reward = 0.0
        action_events: list[MarketEvent] = []
        error: str | None = None

        # Dispatch on action type; handlers return (reward, error, events).
        if action.action_type == "bid_for_capacity":
            reward, error, action_events = self._handle_bid(action)
        elif action.action_type == "accept_offer":
            reward, error, action_events = self._handle_accept_offer(action)
        elif action.action_type == "propose_swap":
            reward, error, action_events = self._handle_swap(action)
        elif action.action_type == "schedule_job":
            reward, error, action_events = self._handle_schedule(action)
        elif action.action_type == "delay_job":
            reward, error, action_events = self._handle_delay(action)
        elif action.action_type == "inspect_market":
            action_events = [
                self._event(
                    "inspect",
                    f"Market inspected: spot price ${self._market_price:.2f}, free GPUs {self._free_gpus}.",
                )
            ]
            # Small information cost for inspecting the market.
            reward -= 0.25
        elif action.action_type == "noop":
            action_events = [self._event("noop", "No action taken this tick.")]
        else:
            # Unreachable for validated actions (action_type is a Literal);
            # kept as a defensive fallback.
            error = f"Unsupported action type: {action.action_type}"
            reward -= 2.0

        # The world always advances one tick after the agent acts.
        advance_reward, advance_events = self._advance_tick()
        total_reward = round(reward + advance_reward, 2)
        combined_events = action_events + advance_events
        self._recent_events = combined_events[-6:]  # keep only the freshest events
        self._cumulative_reward = round(self._cumulative_reward + total_reward, 2)
        self._state = self._snapshot_state(
            episode_id=self._state.episode_id,
            step_count=self._state.step_count,
            scenario_seed=self._state.scenario_seed,
        )
        metadata = {"events": [event.model_dump() for event in combined_events]}
        if error:
            metadata["error"] = error
        return self._build_observation(total_reward, self._done, metadata)
164
+
165
    @property
    def state(self) -> ComputeMarketState:
        """Full control-plane state, including hidden actor profiles."""
        return self._state
168
+
169
    def _build_jobs(self) -> list[JobRecord]:
        """Create the fixed three-job scenario with seeded value jitter."""
        jitter = self._rng.randint(-4, 4)
        return [
            # High-value anchor job with the tightest deadline.
            JobRecord(
                job_id="job-a",
                gpu_count=4,
                total_duration=2,
                remaining_duration=2,
                deadline=4,
                value=100 + jitter,
                priority=3,
            ),
            # Mid-value job with more scheduling slack.
            JobRecord(
                job_id="job-b",
                gpu_count=2,
                total_duration=2,
                remaining_duration=2,
                deadline=7,
                value=46 + self._rng.randint(-3, 3),
                priority=2,
            ),
            # Small job gated on job-a completing first (delayed payoff chain).
            JobRecord(
                job_id="job-c",
                gpu_count=1,
                total_duration=1,
                remaining_duration=1,
                deadline=8,
                value=24 + self._rng.randint(-2, 2),
                priority=1,
                depends_on=["job-a"],
            ),
        ]
201
+
202
    def _build_actors(self) -> list[ActorProfile]:
        """Create the three scripted counterparties with seeded parameters.

        ``max_bid`` and ``swap_floor`` are hidden incentives; only
        ``visible_behavior`` is surfaced to the agent via actor signals.
        """
        return [
            # Deadline-driven tenant: highest bid ceiling, large demand.
            ActorProfile(
                actor_id="urgent-tenant",
                policy_type="urgent_tenant",
                max_bid=round(7.0 + self._rng.uniform(0.5, 1.5), 2),
                preferred_gpu_count=4 + self._rng.randint(0, 2),
                visible_behavior="aggressive",
                swap_floor=round(6.0 + self._rng.uniform(0.2, 0.8), 2),
            ),
            # Price-sensitive tenant: small demand, low ceiling and floor.
            ActorProfile(
                actor_id="budget-tenant",
                policy_type="cost_sensitive_tenant",
                max_bid=round(4.5 + self._rng.uniform(0.2, 1.0), 2),
                preferred_gpu_count=2 + self._rng.randint(0, 1),
                visible_behavior="steady",
                swap_floor=round(4.0 + self._rng.uniform(0.2, 0.8), 2),
            ),
            # Middleman sitting between the tenants' parameters.
            ActorProfile(
                actor_id="broker-1",
                policy_type="broker",
                max_bid=round(6.0 + self._rng.uniform(0.2, 1.2), 2),
                preferred_gpu_count=3 + self._rng.randint(0, 2),
                visible_behavior="opportunistic",
                swap_floor=round(5.0 + self._rng.uniform(0.2, 0.8), 2),
            ),
        ]
229
+
230
    def _handle_bid(self, action: ComputeMarketAction) -> tuple[float, str | None, list[MarketEvent]]:
        """Buy spot capacity at or above the current clearing price.

        Returns:
            ``(reward, error, events)``: reward is the negative up-front spend
            on success or a flat penalty on failure; error is None on success.
        """
        # Validation guards — each failure costs a penalty and returns a message.
        if action.gpu_count <= 0:
            return -2.0, "gpu_count must be positive.", []
        if action.price_per_gpu <= 0:
            return -2.0, "price_per_gpu must be positive.", []
        if action.gpu_count > self._free_gpus:
            return -2.0, f"Only {self._free_gpus} public GPUs are available this tick.", []
        if self._owned_gpus() + action.gpu_count > self.total_gpus:
            return -2.0, "Cluster capacity would be exceeded.", []
        # Under-priced bids lose the auction (milder penalty than invalid input).
        if action.price_per_gpu < self._market_price:
            return -1.0, f"Bid ${action.price_per_gpu:.2f} is below current clearing price ${self._market_price:.2f}.", []
        # The whole reservation window is paid for up front.
        total_cost = round(action.gpu_count * action.price_per_gpu * action.duration, 2)
        if total_cost > self._budget_remaining:
            return -2.0, "Insufficient budget for bid.", []

        self._budget_remaining = round(self._budget_remaining - total_cost, 2)
        self._reservations.append(
            ReservationRecord(
                reservation_id=f"res-{uuid4().hex[:8]}",
                source="spot-market",
                gpu_count=action.gpu_count,
                remaining_ticks=action.duration,
                price_per_gpu=action.price_per_gpu,
                acquired_at_tick=self._current_tick,
            )
        )
        # Spend is realized immediately as negative reward; job value arrives later.
        return (
            -total_cost,
            None,
            [
                self._event(
                    "bid_won",
                    f"Won {action.gpu_count} GPU(s) for {action.duration} tick(s) at ${action.price_per_gpu:.2f}/GPU.",
                )
            ],
        )
266
+
267
    def _handle_accept_offer(self, action: ComputeMarketAction) -> tuple[float, str | None, list[MarketEvent]]:
        """Take a visible broker/swap offer at its posted terms.

        Returns:
            ``(reward, error, events)`` in the same convention as ``_handle_bid``.
        """
        if not action.offer_id:
            return -2.0, "offer_id is required.", []
        offer = next((item for item in self._visible_offers if item.offer_id == action.offer_id), None)
        if offer is None:
            return -2.0, f"Offer {action.offer_id} is not available.", []
        if self._owned_gpus() + offer.gpu_count > self.total_gpus:
            return -2.0, "Cluster capacity would be exceeded.", []

        # Full reservation cost is charged up front at the posted price.
        total_cost = round(offer.gpu_count * offer.price_per_gpu * offer.duration, 2)
        if total_cost > self._budget_remaining:
            return -2.0, "Insufficient budget for offer.", []

        self._budget_remaining = round(self._budget_remaining - total_cost, 2)
        self._reservations.append(
            ReservationRecord(
                reservation_id=f"res-{uuid4().hex[:8]}",
                source=offer.actor_id,
                gpu_count=offer.gpu_count,
                remaining_ticks=offer.duration,
                price_per_gpu=offer.price_per_gpu,
                acquired_at_tick=self._current_tick,
            )
        )
        # Accepted offers are consumed: remove from the visible order book.
        self._visible_offers = [item for item in self._visible_offers if item.offer_id != offer.offer_id]
        return (
            -total_cost,
            None,
            [
                self._event(
                    "offer_accepted",
                    f"Accepted {offer.offer_type} offer from {offer.actor_id} for {offer.gpu_count} GPU(s).",
                )
            ],
        )
302
+
303
    def _handle_swap(self, action: ComputeMarketAction) -> tuple[float, str | None, list[MarketEvent]]:
        """Negotiate capacity directly with a scripted actor.

        Succeeds only when the offered price clears the actor's hidden
        ``swap_floor``; the floor itself is never revealed to the agent.
        """
        if not action.actor_id:
            return -2.0, "actor_id is required.", []
        if action.gpu_count <= 0 or action.price_per_gpu <= 0:
            return -2.0, "gpu_count and price_per_gpu must be positive.", []
        actor = next((item for item in self._hidden_actors if item.actor_id == action.actor_id), None)
        if actor is None:
            return -2.0, f"Unknown actor {action.actor_id}.", []
        if self._owned_gpus() + action.gpu_count > self.total_gpus:
            return -2.0, "Cluster capacity would be exceeded.", []
        total_cost = round(action.gpu_count * action.price_per_gpu * action.duration, 2)
        if total_cost > self._budget_remaining:
            return -2.0, "Insufficient budget for swap.", []
        # Hidden-incentive check: the price must meet the actor's private floor.
        if action.price_per_gpu < actor.swap_floor:
            return -1.0, f"{actor.actor_id} rejected the swap; offered price is below its floor.", []

        self._budget_remaining = round(self._budget_remaining - total_cost, 2)
        self._reservations.append(
            ReservationRecord(
                reservation_id=f"res-{uuid4().hex[:8]}",
                source=f"swap:{actor.actor_id}",
                gpu_count=action.gpu_count,
                remaining_ticks=action.duration,
                price_per_gpu=action.price_per_gpu,
                acquired_at_tick=self._current_tick,
            )
        )
        return (
            -total_cost,
            None,
            [
                self._event(
                    "swap_accepted",
                    f"{actor.actor_id} transferred {action.gpu_count} GPU(s) at ${action.price_per_gpu:.2f}/GPU.",
                )
            ],
        )
340
+
341
    def _handle_schedule(self, action: ComputeMarketAction) -> tuple[float, str | None, list[MarketEvent]]:
        """Start (or resume) a job on idle owned capacity."""
        if not action.job_id:
            return -2.0, "job_id is required.", []
        job = self._job(action.job_id)
        if job is None:
            return -2.0, f"Unknown job {action.job_id}.", []
        if job.status in {"completed", "missed"}:
            return -1.0, f"Job {action.job_id} is already terminal.", []
        if not self._deps_completed(job):
            return -1.0, f"Job {action.job_id} is blocked on dependencies {job.depends_on}.", []
        if self._idle_owned_gpus() < job.gpu_count:
            return -1.0, f"Need {job.gpu_count} idle owned GPU(s) to start {job.job_id}.", []

        job.status = "running"
        # Record the first start tick only; resumes keep the original value.
        if job.started_at is None:
            job.started_at = self._current_tick
        # Small positive shaping reward for getting a job running.
        return (
            1.0,
            None,
            [
                self._event(
                    "job_started",
                    f"Scheduled {job.job_id} using {job.gpu_count} GPU(s).",
                )
            ],
        )
367
+
368
+ def _handle_delay(self, action: ComputeMarketAction) -> tuple[float, str | None, list[MarketEvent]]:
369
+ if not action.job_id:
370
+ return -2.0, "job_id is required.", []
371
+ job = self._job(action.job_id)
372
+ if job is None:
373
+ return -2.0, f"Unknown job {action.job_id}.", []
374
+ if job.status in {"completed", "missed"}:
375
+ return -1.0, f"Job {job.job_id} is already terminal.", []
376
+ job.status = "paused" if job.status == "running" else "pending"
377
+ job.delay_count += 1
378
+ return (
379
+ -1.0,
380
+ None,
381
+ [
382
+ self._event(
383
+ "job_delayed",
384
+ f"Delayed {job.job_id}; slack shrinks while the deadline stays fixed.",
385
+ )
386
+ ],
387
+ )
388
+
389
    def _advance_tick(self) -> tuple[float, list[MarketEvent]]:
        """Advance the simulation by one tick and return (reward delta, events).

        Order matters: run jobs against owned capacity, charge the idle
        penalty, age reservations, apply deadline misses, then refresh the
        market for the upcoming tick.
        """
        tick_reward = 0.0
        events: list[MarketEvent] = []
        # Capacity is sampled once up front, so reservations that expire later
        # in this method still power jobs for the current tick.
        available_owned = self._owned_gpus()
        used_gpus = 0

        # Higher-priority jobs claim capacity first; job_id breaks ties so the
        # allocation order is deterministic.
        running_jobs = sorted(
            [job for job in self._jobs if job.status == "running"],
            key=lambda item: (-item.priority, item.job_id),
        )
        for job in running_jobs:
            if used_gpus + job.gpu_count <= available_owned:
                used_gpus += job.gpu_count
                job.remaining_duration -= 1
                events.append(
                    self._event(
                        "job_progress",
                        f"{job.job_id} progressed; {job.remaining_duration} tick(s) remaining.",
                    )
                )
                if job.remaining_duration == 0:
                    job.status = "completed"
                    # Completion is stamped at the *next* tick, matching when
                    # the agent observes the result.
                    job.completed_at = self._current_tick + 1
                    tick_reward += job.value
                    events.append(
                        self._event(
                            "job_completed",
                            f"{job.job_id} completed before deadline and earned ${job.value:.2f}.",
                        )
                    )
            else:
                # Not enough owned capacity left for this job this tick: pause
                # it and charge a flat disruption penalty.
                job.status = "paused"
                tick_reward -= 3.0
                events.append(
                    self._event(
                        "job_paused",
                        f"{job.job_id} paused because owned capacity dropped below demand.",
                    )
                )

        # Discourage hoarding: each owned-but-unused GPU costs 0.5 per tick.
        idle_owned = max(0, available_owned - used_gpus)
        if idle_owned > 0:
            idle_penalty = round(0.5 * idle_owned, 2)
            tick_reward -= idle_penalty
            events.append(
                self._event(
                    "idle_penalty",
                    f"Paid ${idle_penalty:.2f} idle-hoarding penalty for {idle_owned} unused owned GPU(s).",
                )
            )

        # Age reservations by one tick and drop the ones that just hit zero.
        for reservation in self._reservations:
            reservation.remaining_ticks = max(0, reservation.remaining_ticks - 1)
        expired = [item for item in self._reservations if item.remaining_ticks == 0]
        self._reservations = [item for item in self._reservations if item.remaining_ticks > 0]
        for reservation in expired:
            events.append(
                self._event(
                    "reservation_expired",
                    f"Reservation {reservation.reservation_id} from {reservation.source} expired.",
                )
            )

        # Deadlines are evaluated against the upcoming tick; a miss costs 60%
        # of the job's value.
        next_tick = self._current_tick + 1
        for job in self._jobs:
            if job.status not in {"completed", "missed"} and next_tick > job.deadline:
                job.status = "missed"
                penalty = round(job.value * 0.6, 2)
                tick_reward -= penalty
                events.append(
                    self._event(
                        "deadline_missed",
                        f"{job.job_id} missed its deadline and incurred ${penalty:.2f} penalty.",
                    )
                )

        self._current_tick = next_tick
        # Episode ends at the horizon, when every job is terminal, or when the
        # budget is exhausted.
        self._done = self._current_tick >= self.max_ticks or all(
            job.status in {"completed", "missed"} for job in self._jobs
        ) or self._budget_remaining <= 0.0

        if not self._done:
            self._refresh_market()
            events.extend(self._market_events_for_tick())
        else:
            # Terminal state: clear the market views but keep the public
            # free-GPU count consistent with owned capacity.
            self._visible_offers = []
            self._actor_signals = []
            self._free_gpus = max(0, self.total_gpus - self._owned_gpus())

        return round(tick_reward, 2), events
479
+
480
    def _refresh_market(self) -> None:
        """Recompute offers, actor signals, external demand, and the spot price."""
        owned = self._owned_gpus()
        remaining_cluster = max(0, self.total_gpus - owned)
        # Spot price drifts upward over time with a small seeded-random component.
        base_price = 4.0 + 0.3 * self._current_tick + self._rng.uniform(0.0, 1.0)
        actor_signals: list[ActorSignal] = []
        visible_offers: list[MarketOffer] = []
        external_demand = 0

        for actor in self._hidden_actors:
            # Each hidden policy shapes its demand/bid trajectory differently.
            if actor.policy_type == "urgent_tenant":
                # Demand decays slowly (floor of 2); bid decays every tick.
                gpu_demand = max(2, actor.preferred_gpu_count - (self._current_tick // 3))
                bid = round(actor.max_bid - 0.15 * self._current_tick, 2)
                pressure = "high" if gpu_demand >= 4 else "medium"
            elif actor.policy_type == "cost_sensitive_tenant":
                gpu_demand = max(1, actor.preferred_gpu_count - (self._current_tick // 4))
                bid = round(actor.max_bid - 0.1 * max(0, self._current_tick - 1), 2)
                pressure = "medium" if gpu_demand >= 2 else "low"
            else:
                # Remaining policies (brokers) escalate bids over time.
                gpu_demand = actor.preferred_gpu_count
                bid = round(actor.max_bid + 0.2 * self._current_tick, 2)
                pressure = "medium"
            # NOTE(review): when remaining_cluster == 0 the `or` falls back to
            # the actor's preferred count, so a full cluster still produces a
            # nonzero offer size — confirm this is intended.
            visible_offers.append(
                MarketOffer(
                    offer_id=f"offer-{self._current_tick}-{actor.actor_id}",
                    actor_id=actor.actor_id,
                    gpu_count=min(remaining_cluster or actor.preferred_gpu_count, actor.preferred_gpu_count),
                    price_per_gpu=round(bid + 0.6, 2),
                    duration=2,
                    expires_at_tick=self._current_tick + 1,
                    offer_type="broker",
                )
            )

            # Only tenants consume capacity; brokers resell it.
            if actor.policy_type != "broker":
                external_demand += gpu_demand
            actor_signals.append(
                ActorSignal(
                    actor_id=actor.actor_id,
                    visible_behavior=actor.visible_behavior,
                    pressure_hint=pressure,
                    last_seen_bid=max(0.0, bid),
                )
            )

        # External tenants soak up capacity before the agent sees free GPUs.
        self._external_allocated_gpus = min(remaining_cluster, external_demand)
        self._free_gpus = max(0, remaining_cluster - self._external_allocated_gpus)
        # Demand pressure nudges the spot price up by at most 0.45.
        pressure_bump = 0.45 * (external_demand / max(1, self.total_gpus))
        self._market_price = round(base_price + pressure_bump, 2)
        self._visible_offers = [
            offer for offer in visible_offers if offer.gpu_count > 0 and offer.expires_at_tick >= self._current_tick
        ]
        self._actor_signals = actor_signals
532
+
533
+ def _market_events_for_tick(self) -> list[MarketEvent]:
534
+ messages = [
535
+ self._event(
536
+ "market_tick",
537
+ f"Tick {self._current_tick}: spot price ${self._market_price:.2f}, public free GPUs {self._free_gpus}.",
538
+ )
539
+ ]
540
+ for signal in self._actor_signals:
541
+ messages.append(
542
+ self._event(
543
+ "actor_signal",
544
+ f"{signal.actor_id} looks {signal.visible_behavior} with {signal.pressure_hint} pressure.",
545
+ )
546
+ )
547
+ for offer in self._visible_offers:
548
+ messages.append(
549
+ self._event(
550
+ "offer_visible",
551
+ f"{offer.actor_id} posted {offer.gpu_count} GPU(s) at ${offer.price_per_gpu:.2f}/GPU for {offer.duration} tick(s).",
552
+ )
553
+ )
554
+ return messages
555
+
556
+ def _snapshot_state(
557
+ self,
558
+ episode_id: str,
559
+ step_count: int,
560
+ scenario_seed: int,
561
+ ) -> ComputeMarketState:
562
+ return ComputeMarketState(
563
+ episode_id=episode_id,
564
+ step_count=step_count,
565
+ scenario_seed=scenario_seed,
566
+ current_tick=self._current_tick,
567
+ max_ticks=self.max_ticks,
568
+ total_gpus=self.total_gpus,
569
+ free_gpus=self._free_gpus,
570
+ owned_gpus=self._owned_gpus(),
571
+ idle_owned_gpus=self._idle_owned_gpus(),
572
+ budget_remaining=self._budget_remaining,
573
+ market_price=self._market_price,
574
+ cumulative_reward=self._cumulative_reward,
575
+ external_allocated_gpus=self._external_allocated_gpus,
576
+ done=self._done,
577
+ jobs=[job.model_copy(deep=True) for job in self._jobs],
578
+ visible_offers=[offer.model_copy(deep=True) for offer in self._visible_offers],
579
+ reservations=[reservation.model_copy(deep=True) for reservation in self._reservations],
580
+ actor_signals=[signal.model_copy(deep=True) for signal in self._actor_signals],
581
+ hidden_actors=[actor.model_copy(deep=True) for actor in self._hidden_actors],
582
+ recent_events=[event.model_copy(deep=True) for event in self._recent_events],
583
+ )
584
+
585
+ def _build_observation(
586
+ self,
587
+ reward: float,
588
+ done: bool,
589
+ metadata: dict,
590
+ ) -> ComputeMarketObservation:
591
+ return ComputeMarketObservation(
592
+ current_tick=self._current_tick,
593
+ max_ticks=self.max_ticks,
594
+ total_gpus=self.total_gpus,
595
+ free_gpus=self._free_gpus,
596
+ owned_gpus=self._owned_gpus(),
597
+ idle_owned_gpus=self._idle_owned_gpus(),
598
+ budget_remaining=self._budget_remaining,
599
+ market_price=self._market_price,
600
+ jobs=[job.model_copy(deep=True) for job in self._jobs],
601
+ visible_offers=[offer.model_copy(deep=True) for offer in self._visible_offers],
602
+ recent_events=[event.model_copy(deep=True) for event in self._recent_events],
603
+ actor_signals=[signal.model_copy(deep=True) for signal in self._actor_signals],
604
+ done=done,
605
+ reward=reward,
606
+ metadata=metadata,
607
+ )
608
+
609
+ def _deps_completed(self, job: JobRecord) -> bool:
610
+ if not job.depends_on:
611
+ return True
612
+ completed = {item.job_id for item in self._jobs if item.status == "completed"}
613
+ return all(dep in completed for dep in job.depends_on)
614
+
615
+ def _job(self, job_id: str) -> JobRecord | None:
616
+ return next((job for job in self._jobs if job.job_id == job_id), None)
617
+
618
+ def _owned_gpus(self) -> int:
619
+ return sum(reservation.gpu_count for reservation in self._reservations)
620
+
621
+ def _idle_owned_gpus(self) -> int:
622
+ running_gpu_demand = sum(job.gpu_count for job in self._jobs if job.status == "running")
623
+ return max(0, self._owned_gpus() - running_gpu_demand)
624
+
625
    def _event(self, event_type: str, message: str) -> MarketEvent:
        """Build a MarketEvent stamped with the current tick."""
        return MarketEvent(tick=self._current_tick, event_type=event_type, message=message)
627
+
628
+
629
def create_environment_from_env() -> ComputeMarketEnvironment:
    """Factory used by the FastAPI app and tests.

    Reads COMPUTE_MARKET_* environment variables, falling back to the
    documented defaults when unset.
    """
    read = os.getenv
    return ComputeMarketEnvironment(
        total_gpus=int(read("COMPUTE_MARKET_TOTAL_GPUS", "8")),
        initial_budget=float(read("COMPUTE_MARKET_INITIAL_BUDGET", "150")),
        max_ticks=int(read("COMPUTE_MARKET_MAX_TICKS", "12")),
        default_seed=int(read("COMPUTE_MARKET_DEFAULT_SEED", "0")),
    )
server/requirements.txt ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ openenv-core[core]>=0.2.1
2
+ uvicorn>=0.30.0
tests/test_compute_market_environment.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from compute_market_env import ComputeMarketAction
2
+ from compute_market_env.server.compute_market_environment import ComputeMarketEnvironment
3
+
4
+
5
def test_reset_is_deterministic_for_fixed_seed():
    """Two resets with the same explicit seed must yield identical market state."""
    env = ComputeMarketEnvironment(default_seed=1)
    # The explicit seed passed to reset() overrides the constructor default.
    obs1 = env.reset(seed=11)
    obs2 = env.reset(seed=11)

    assert obs1.market_price == obs2.market_price
    assert obs1.free_gpus == obs2.free_gpus
    assert [offer.price_per_gpu for offer in obs1.visible_offers] == [
        offer.price_per_gpu for offer in obs2.visible_offers
    ]
15
+
16
+
17
def test_bid_schedule_and_complete_job():
    """Happy path: buy capacity, schedule job-a, and let it run to completion."""
    env = ComputeMarketEnvironment(total_gpus=12, initial_budget=200.0, max_ticks=8, default_seed=0)
    env.reset(seed=3)

    # Buying capacity spends budget, so the step reward is negative.
    result = env.step(
        ComputeMarketAction(
            action_type="bid_for_capacity",
            gpu_count=4,
            price_per_gpu=8.0,
            duration=3,
        )
    )
    assert result.reward < 0

    result = env.step(ComputeMarketAction(action_type="schedule_job", job_id="job-a"))
    assert any(job.status == "running" for job in result.jobs)

    # A noop advances the tick; job-a is short enough to finish and pay out.
    result = env.step(ComputeMarketAction(action_type="noop"))
    job_a = next(job for job in result.jobs if job.job_id == "job-a")
    assert job_a.status == "completed"
    assert result.reward > 0
38
+
39
+
40
def test_invalid_action_returns_penalty_metadata():
    """Referencing a nonexistent job yields a penalty and an error metadata entry."""
    env = ComputeMarketEnvironment(default_seed=2)
    env.reset(seed=2)

    result = env.step(ComputeMarketAction(action_type="schedule_job", job_id="missing-job"))

    assert result.reward < 0
    assert "error" in result.metadata
training/Compute_Market_Qwen3_GRPO.ipynb ADDED
@@ -0,0 +1,101 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ {
2
+ "cells": [
3
+ {
4
+ "cell_type": "markdown",
5
+ "metadata": {},
6
+ "source": [
7
+ "# Compute Market Qwen3 GRPO\n",
8
+ "Minimal Colab notebook for GRPO on the OpenEnv compute market environment."
9
+ ]
10
+ },
11
+ {
12
+ "cell_type": "code",
13
+ "execution_count": null,
14
+ "metadata": {},
15
+ "outputs": [],
16
+ "source": [
17
+ "%%capture\n",
18
+ "!pip install --upgrade uv\n",
19
+ "!uv pip install unsloth vllm --torch-backend=auto\n",
20
+ "!uv pip install --upgrade --no-cache-dir --no-deps unsloth unsloth_zoo\n",
21
+ "!uv pip install transformers==4.56.2 'trl>=0.24.0' datasets openenv-core\n",
22
+ "!git clone https://github.com/kiankyars/lambdatheta.git
23
+ !pip install git+https://huggingface.co/spaces/kyars/compute_market_env"
24
+ ]
25
+ },
26
+ {
27
+ "cell_type": "code",
28
+ "execution_count": null,
29
+ "metadata": {},
30
+ "outputs": [],
31
+ "source": [
32
+ "import os\n",
33
+ "os.environ['OPENENV_URL'] = 'https://kyars-compute-market-env.hf.space'
34
+ import sys
35
+ sys.path.append('/content/lambdatheta')\n",
36
+ "MAX_STEPS = 300"
37
+ ]
38
+ },
39
+ {
40
+ "cell_type": "code",
41
+ "execution_count": null,
42
+ "metadata": {},
43
+ "outputs": [],
44
+ "source": [
45
+ "from unsloth import FastLanguageModel\n",
46
+ "from transformers import AutoTokenizer\n",
47
+ "from training.compute_market_grpo import build_trainer, build_dataset\n",
48
+ "\n",
49
+ "model_name = 'Qwen/Qwen3-4B'\n",
50
+ "model, tokenizer = FastLanguageModel.from_pretrained(\n",
51
+ " model_name=model_name,\n",
52
+ " max_seq_length=4096,\n",
53
+ " load_in_4bit=True,\n",
54
+ " fast_inference=True,\n",
55
+ ")\n",
56
+ "tokenizer = AutoTokenizer.from_pretrained(model_name)\n",
57
+ "if tokenizer.pad_token is None:\n",
58
+ " tokenizer.pad_token = tokenizer.eos_token"
59
+ ]
60
+ },
61
+ {
62
+ "cell_type": "code",
63
+ "execution_count": null,
64
+ "metadata": {},
65
+ "outputs": [],
66
+ "source": [
67
+ "dataset = build_dataset(size=64)\n",
68
+ "trainer = build_trainer(\n",
69
+ " model=model,\n",
70
+ " tokenizer=tokenizer,\n",
71
+ " env_url=os.environ['OPENENV_URL'],\n",
72
+ " train_dataset=dataset,\n",
73
+ " output_dir='outputs/compute-market-qwen3-4b',\n",
74
+ " max_steps=MAX_STEPS,\n",
75
+ ")"
76
+ ]
77
+ },
78
+ {
79
+ "cell_type": "code",
80
+ "execution_count": null,
81
+ "metadata": {},
82
+ "outputs": [],
83
+ "source": [
84
+ "trainer.train()"
85
+ ]
86
+ }
87
+ ],
88
+ "metadata": {
89
+ "kernelspec": {
90
+ "display_name": "Python 3",
91
+ "language": "python",
92
+ "name": "python3"
93
+ },
94
+ "language_info": {
95
+ "name": "python",
96
+ "version": "3.11"
97
+ }
98
+ },
99
+ "nbformat": 4,
100
+ "nbformat_minor": 5
101
+ }
training/OpenEnv_gpt_oss_(20B)_Reinforcement_Learning_2048_Game.ipynb ADDED
The diff for this file is too large to render. See raw diff
 
training/Qwen3_(4B)_GRPO.ipynb ADDED
The diff for this file is too large to render. See raw diff
 
training/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """Training helpers for the compute market environment."""
training/compute_market_grpo.py ADDED
@@ -0,0 +1,313 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """GRPO training helpers for the Compute Market environment.
2
+
3
+ Designed to be imported from a Colab notebook after installing:
4
+ - the environment package from the Hugging Face Space or GitHub repo
5
+ - TRL / Unsloth / transformers runtime deps
6
+ """
7
+
8
+ from __future__ import annotations
9
+
10
+ import json
11
+ import os
12
+ import re
13
+ from dataclasses import dataclass
14
+ from typing import Any, Callable
15
+
16
+ from compute_market_env import ComputeMarketAction, ComputeMarketEnv
17
+
18
+ ACTION_JSON_RE = re.compile(r"\{.*\}", re.DOTALL)
19
+
20
+ SYSTEM_PROMPT = """You are a compute allocator trading for scarce GPU capacity.
21
+ Return exactly one JSON object with a valid action.
22
+ Allowed action_type values: bid_for_capacity, accept_offer, propose_swap, schedule_job, delay_job, inspect_market, noop.
23
+ Only include fields needed by the chosen action.
24
+ Be conservative with budget and prioritize completing valuable jobs before their deadlines."""
25
+
26
+ DEFAULT_TASK_PROMPT = (
27
+ "Maximize completed job value while minimizing spend, idle-hoarding penalties, "
28
+ "and missed deadlines in the compute market."
29
+ )
30
+
31
+
32
+ @dataclass
33
+ class RolloutSummary:
34
+ prompt_ids: list[int]
35
+ completion_ids: list[int]
36
+ logprobs: list[float]
37
+ episode_return: float
38
+ valid_action_reward: float
39
+ completion_bonus: float
40
+ transcripts: list[dict[str, Any]]
41
+
42
+
43
+ def observation_to_prompt(observation: Any, task_prompt: str = DEFAULT_TASK_PROMPT) -> str:
44
+ jobs = [job.model_dump() for job in observation.jobs]
45
+ offers = [offer.model_dump() for offer in observation.visible_offers]
46
+ signals = [signal.model_dump() for signal in observation.actor_signals]
47
+ events = [event.model_dump() for event in observation.recent_events]
48
+ return json.dumps(
49
+ {
50
+ "task": task_prompt,
51
+ "tick": observation.current_tick,
52
+ "max_ticks": observation.max_ticks,
53
+ "budget_remaining": observation.budget_remaining,
54
+ "market_price": observation.market_price,
55
+ "public_free_gpus": observation.free_gpus,
56
+ "owned_gpus": observation.owned_gpus,
57
+ "idle_owned_gpus": observation.idle_owned_gpus,
58
+ "jobs": jobs,
59
+ "visible_offers": offers,
60
+ "actor_signals": signals,
61
+ "recent_events": events,
62
+ },
63
+ indent=2,
64
+ )
65
+
66
+
67
+ def parse_action(text: str) -> tuple[ComputeMarketAction, bool]:
68
+ match = ACTION_JSON_RE.search(text)
69
+ if not match:
70
+ return ComputeMarketAction(action_type="inspect_market"), False
71
+ try:
72
+ payload = json.loads(match.group(0))
73
+ return ComputeMarketAction(**payload), True
74
+ except Exception:
75
+ return ComputeMarketAction(action_type="inspect_market"), False
76
+
77
+
78
+ def _count_completed_jobs(observation: Any) -> int:
79
+ return sum(1 for job in observation.jobs if job.status == "completed")
80
+
81
+
82
+ def rollout_once(
83
+ trainer: Any,
84
+ env: ComputeMarketEnv,
85
+ tokenizer: Any,
86
+ dataset_prompt: str,
87
+ system_prompt: str = SYSTEM_PROMPT,
88
+ max_turns: int = 6,
89
+ seed: int | None = None,
90
+ ) -> RolloutSummary:
91
+ from trl.experimental.openenv import generate_rollout_completions
92
+
93
+ result = env.reset(seed=seed)
94
+ prompt_ids: list[int] = []
95
+ completion_ids: list[int] = []
96
+ logprobs: list[float] = []
97
+ transcripts: list[dict[str, Any]] = []
98
+ rewards: list[float] = []
99
+ valid_action_reward = 0.0
100
+ completed_before = 0
101
+
102
+ for turn in range(max_turns):
103
+ if result.done:
104
+ break
105
+
106
+ prompt_text = observation_to_prompt(result.observation, dataset_prompt)
107
+ messages = [
108
+ {"role": "system", "content": system_prompt},
109
+ {"role": "user", "content": prompt_text},
110
+ ]
111
+ rendered_prompt = tokenizer.apply_chat_template(
112
+ messages,
113
+ add_generation_prompt=True,
114
+ tokenize=False,
115
+ enable_thinking=False,
116
+ )
117
+
118
+ rollout_outputs = generate_rollout_completions(trainer, [rendered_prompt])[0]
119
+ prompt_ids.extend(rollout_outputs["prompt_ids"])
120
+ completion_ids.extend(rollout_outputs["completion_ids"])
121
+ logprobs.extend(rollout_outputs["logprobs"])
122
+ completion_text = rollout_outputs.get("text") or tokenizer.decode(
123
+ rollout_outputs["completion_ids"],
124
+ skip_special_tokens=True,
125
+ )
126
+
127
+ action, is_valid = parse_action(completion_text)
128
+ result = env.step(action)
129
+ reward = float(result.reward or 0.0)
130
+ rewards.append(reward)
131
+ valid_action_reward += 0.25 if is_valid else -1.0
132
+
133
+ completed_after = _count_completed_jobs(result.observation)
134
+ completion_gain = max(0, completed_after - completed_before)
135
+ completed_before = completed_after
136
+
137
+ transcripts.append(
138
+ {
139
+ "turn": turn,
140
+ "prompt": prompt_text,
141
+ "completion": completion_text,
142
+ "action": action.model_dump(exclude_none=True),
143
+ "is_valid_action": is_valid,
144
+ "reward": reward,
145
+ "completed_jobs": completed_after,
146
+ "completion_gain": completion_gain,
147
+ }
148
+ )
149
+
150
+ completion_bonus = float(completed_before)
151
+ return RolloutSummary(
152
+ prompt_ids=prompt_ids,
153
+ completion_ids=completion_ids,
154
+ logprobs=logprobs,
155
+ episode_return=sum(rewards),
156
+ valid_action_reward=valid_action_reward,
157
+ completion_bonus=completion_bonus,
158
+ transcripts=transcripts,
159
+ )
160
+
161
+
162
+ def rollout_func(
163
+ prompts: list[str],
164
+ trainer: Any | None = None,
165
+ tokenizer: Any | None = None,
166
+ env_url: str | None = None,
167
+ max_turns: int = 6,
168
+ seed_offset: int = 0,
169
+ ) -> dict[str, Any]:
170
+ if trainer is None:
171
+ raise ValueError("trainer is required")
172
+ if tokenizer is None:
173
+ raise ValueError("tokenizer is required")
174
+
175
+ env_url = env_url or os.environ.get("OPENENV_URL", "http://localhost:8000")
176
+ episode_prompt_ids = []
177
+ episode_completion_ids = []
178
+ episode_logprobs = []
179
+ episode_returns = []
180
+ validity_rewards = []
181
+ completion_bonuses = []
182
+ transcripts = []
183
+
184
+ with ComputeMarketEnv(base_url=env_url) as env:
185
+ for idx, prompt_text in enumerate(prompts):
186
+ episode = rollout_once(
187
+ trainer=trainer,
188
+ env=env,
189
+ tokenizer=tokenizer,
190
+ dataset_prompt=prompt_text,
191
+ max_turns=max_turns,
192
+ seed=seed_offset + idx,
193
+ )
194
+ episode_prompt_ids.append(episode.prompt_ids)
195
+ episode_completion_ids.append(episode.completion_ids)
196
+ episode_logprobs.append(episode.logprobs)
197
+ episode_returns.append(episode.episode_return)
198
+ validity_rewards.append(episode.valid_action_reward)
199
+ completion_bonuses.append(episode.completion_bonus)
200
+ transcripts.append(episode.transcripts)
201
+
202
+ return {
203
+ "prompt_ids": episode_prompt_ids,
204
+ "completion_ids": episode_completion_ids,
205
+ "logprobs": episode_logprobs,
206
+ "env_reward": episode_returns,
207
+ "valid_action_reward": validity_rewards,
208
+ "completion_bonus": completion_bonuses,
209
+ "transcripts": transcripts,
210
+ }
211
+
212
+
213
+ def reward_env_return(completions: list[Any], **kwargs: Any) -> list[float]:
214
+ rewards = kwargs.get("env_reward") or []
215
+ return [float(rewards[i]) if i < len(rewards) else 0.0 for i in range(len(completions))]
216
+
217
+
218
+ def reward_valid_action(completions: list[Any], **kwargs: Any) -> list[float]:
219
+ rewards = kwargs.get("valid_action_reward") or []
220
+ return [float(rewards[i]) if i < len(rewards) else 0.0 for i in range(len(completions))]
221
+
222
+
223
+ def reward_job_completion(completions: list[Any], **kwargs: Any) -> list[float]:
224
+ rewards = kwargs.get("completion_bonus") or []
225
+ return [float(rewards[i]) if i < len(rewards) else 0.0 for i in range(len(completions))]
226
+
227
+
228
+ def build_dataset(size: int = 128, prompt: str = DEFAULT_TASK_PROMPT):
229
+ from datasets import Dataset
230
+
231
+ return Dataset.from_dict({"prompt": [prompt] * size})
232
+
233
+
234
+ def build_grpo_config(
235
+ output_dir: str = "outputs/compute-market-qwen3-4b",
236
+ max_steps: int = 300,
237
+ learning_rate: float = 5e-6,
238
+ num_generations: int = 2,
239
+ max_prompt_length: int = 1800,
240
+ max_completion_length: int = 192,
241
+ use_vllm: bool = True,
242
+ ):
243
+ from trl import GRPOConfig
244
+
245
+ kwargs: dict[str, Any] = dict(
246
+ learning_rate=learning_rate,
247
+ weight_decay=0.001,
248
+ warmup_ratio=0.1,
249
+ lr_scheduler_type="linear",
250
+ optim="adamw_8bit",
251
+ logging_steps=1,
252
+ per_device_train_batch_size=1,
253
+ gradient_accumulation_steps=1,
254
+ num_generations=num_generations,
255
+ max_prompt_length=max_prompt_length,
256
+ max_completion_length=max_completion_length,
257
+ max_steps=max_steps,
258
+ save_steps=max_steps,
259
+ report_to="none",
260
+ output_dir=output_dir,
261
+ )
262
+ if use_vllm:
263
+ kwargs.update(
264
+ use_vllm=True,
265
+ vllm_mode="colocate",
266
+ vllm_gpu_memory_utilization=0.15,
267
+ )
268
+ return GRPOConfig(**kwargs)
269
+
270
+
271
+ def build_trainer(
272
+ model: Any,
273
+ tokenizer: Any,
274
+ env_url: str,
275
+ train_dataset: Any | None = None,
276
+ output_dir: str = "outputs/compute-market-qwen3-4b",
277
+ max_steps: int = 300,
278
+ max_turns: int = 6,
279
+ ):
280
+ from trl import GRPOTrainer
281
+
282
+ train_dataset = train_dataset or build_dataset()
283
+ args = build_grpo_config(output_dir=output_dir, max_steps=max_steps)
284
+
285
+ def bound_rollout_func(prompts: list[str], trainer: Any | None = None, **_: Any) -> dict[str, Any]:
286
+ return rollout_func(
287
+ prompts,
288
+ trainer=trainer,
289
+ tokenizer=tokenizer,
290
+ env_url=env_url,
291
+ max_turns=max_turns,
292
+ )
293
+
294
+ return GRPOTrainer(
295
+ model=model,
296
+ processing_class=tokenizer,
297
+ reward_funcs=[
298
+ reward_env_return,
299
+ reward_valid_action,
300
+ reward_job_completion,
301
+ ],
302
+ train_dataset=train_dataset,
303
+ args=args,
304
+ rollout_func=bound_rollout_func,
305
+ )
306
+
307
+
308
+ def build_colab_setup_snippet(space_repo_id: str = "kyars/compute_market_env") -> str:
309
+ return f"""# Colab install\n!pip install --upgrade uv\n!uv pip install unsloth vllm --torch-backend=auto\n!uv pip install --upgrade --no-cache-dir --no-deps unsloth unsloth_zoo\n!uv pip install transformers==4.56.2 'trl>=0.24.0' datasets openenv-core\n!pip install git+https://huggingface.co/spaces/{space_repo_id}\n\nimport os\nos.environ['OPENENV_URL'] = 'https://{space_repo_id.replace('/', '-').replace('_', '-')}.hf.space'\n"""
310
+
311
+
312
+ if __name__ == "__main__":
313
+ print(build_colab_setup_snippet())
training/minimal_grpo_rollout.py ADDED
@@ -0,0 +1,81 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Minimal TRL/OpenEnv rollout helpers for Colab."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import json
6
+ import os
7
+ import re
8
+ from typing import Any
9
+
10
+ from compute_market_env import ComputeMarketAction, ComputeMarketEnv
11
+
12
+ ACTION_RE = re.compile(r"\{.*\}", re.DOTALL)
13
+ SYSTEM_PROMPT = """You trade for scarce GPU capacity.
14
+ Choose exactly one JSON action per turn.
15
+ Valid action_type values are: bid_for_capacity, accept_offer, propose_swap, schedule_job, delay_job, inspect_market, noop.
16
+ Return strict JSON only."""
17
+
18
+
19
+ def observation_to_prompt(observation) -> str:
20
+ jobs = [job.model_dump() for job in observation.jobs]
21
+ offers = [offer.model_dump() for offer in observation.visible_offers]
22
+ signals = [signal.model_dump() for signal in observation.actor_signals]
23
+ return json.dumps(
24
+ {
25
+ "tick": observation.current_tick,
26
+ "budget_remaining": observation.budget_remaining,
27
+ "market_price": observation.market_price,
28
+ "free_gpus": observation.free_gpus,
29
+ "owned_gpus": observation.owned_gpus,
30
+ "idle_owned_gpus": observation.idle_owned_gpus,
31
+ "jobs": jobs,
32
+ "visible_offers": offers,
33
+ "actor_signals": signals,
34
+ },
35
+ indent=2,
36
+ )
37
+
38
+
39
+ def parse_action(text: str) -> ComputeMarketAction:
40
+ match = ACTION_RE.search(text)
41
+ if not match:
42
+ return ComputeMarketAction(action_type="inspect_market")
43
+ try:
44
+ payload = json.loads(match.group(0))
45
+ return ComputeMarketAction(**payload)
46
+ except Exception:
47
+ return ComputeMarketAction(action_type="inspect_market")
48
+
49
+
50
+ def rollout_once(generate_completion, seed: int = 0, max_turns: int = 6) -> dict[str, Any]:
51
+ env_url = os.environ.get("OPENENV_URL", "http://localhost:8000")
52
+ rewards = []
53
+ actions = []
54
+ transcripts = []
55
+
56
+ with ComputeMarketEnv(base_url=env_url) as env:
57
+ result = env.reset(seed=seed)
58
+ for _ in range(max_turns):
59
+ if result.done:
60
+ break
61
+ prompt = observation_to_prompt(result.observation)
62
+ completion = generate_completion(SYSTEM_PROMPT, prompt)
63
+ action = parse_action(completion)
64
+ result = env.step(action)
65
+ actions.append(action.model_dump(exclude_none=True))
66
+ rewards.append(float(result.reward or 0.0))
67
+ transcripts.append(
68
+ {
69
+ "prompt": prompt,
70
+ "completion": completion,
71
+ "action": action.model_dump(exclude_none=True),
72
+ "reward": result.reward,
73
+ }
74
+ )
75
+
76
+ return {
77
+ "actions": actions,
78
+ "rewards": rewards,
79
+ "return": sum(rewards),
80
+ "transcripts": transcripts,
81
+ }
uv.lock ADDED
The diff for this file is too large to render. See raw diff