File size: 4,846 Bytes
de55468
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from __future__ import annotations

import os
import time
from pathlib import Path
from typing import Literal
from uuid import uuid4

from fastapi import Request
from fastapi.responses import PlainTextResponse, RedirectResponse
from openenv.core.env_server import create_app
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import Action, Observation, State
from pydantic import Field


# File that log() appends every message to; the /logs route serves its contents.
LOG_PATH = Path("/tmp/minimal-space.log")
# Local-time timestamp captured once, when this module is imported.
# NOTE(review): not referenced anywhere else in the visible code — confirm it is still needed.
START_TS = time.strftime("%Y-%m-%dT%H:%M:%S%z")


def log(message: str) -> None:
    """Emit a timestamped diagnostic line to stdout and to the log file.

    The line is printed (and flushed) first, then appended to ``LOG_PATH``.
    Writing to the file is best-effort: any error raised while doing so is
    ignored so that logging can never break the caller.

    Args:
        message: Text to record after the ``[timestamp]`` prefix.
    """
    stamp = time.strftime("%Y-%m-%dT%H:%M:%S%z")
    entry = f"[{stamp}] {message}"
    print(entry, flush=True)
    try:
        with LOG_PATH.open("a", encoding="utf-8") as handle:
            handle.write(f"{entry}\n")
    except Exception:
        # Deliberately swallowed: the stdout copy above is the primary record.
        return


# Action payload accepted by MinimalEnvironment.step().
class MinimalAction(Action):
    # What the step should do: "increment" adds `amount` to the counter,
    # "finish" ends the episode, "noop" only advances the step count.
    action_type: Literal["noop", "increment", "finish"] = "noop"
    # Increment size, constrained to 1..3; only read for "increment" actions.
    amount: int = Field(default=1, ge=1, le=3)


# Observation returned by MinimalEnvironment.reset() and .step().
class MinimalObservation(Observation):
    # Short state label; the environment emits "ready", "ok", "finished" or "done".
    status: str
    # Counter value at the moment the observation was built.
    counter: int
    # Human-readable text describing the counter, the step count and the valid actions.
    summary: str
    # Reward earned by the step that produced this observation.
    # NOTE(review): the Observation base may already declare reward/done — confirm
    # these redeclarations (and their defaults) are intentional.
    reward: float = 0.0
    # True once the episode has ended.
    done: bool = False


# Environment state: adds the demo counter to whatever the State base provides.
# NOTE(review): episode_id and step_count are passed at construction elsewhere in
# this file, so they are presumably fields of State — confirm.
class MinimalState(State):
    # Running total accumulated by "increment" actions.
    counter: int = 0


class MinimalEnvironment(Environment[MinimalAction, MinimalObservation, MinimalState]):
    """Counter environment used as a minimal OpenEnv demo.

    An episode starts with the counter at zero. Each step applies one
    ``MinimalAction``. The episode ends when a ``"finish"`` action arrives or
    once ``MAX_STEPS`` steps have been taken, whichever happens first.
    """

    # NOTE(review): presumably read by the OpenEnv server to decide how sessions
    # are handled — confirm against the OpenEnv documentation.
    SUPPORTS_CONCURRENT_SESSIONS = False

    # Step budget per episode; a subclass may override it to change episode length.
    MAX_STEPS = 8

    def __init__(self):
        """Create the environment with a fresh, unfinished episode."""
        super().__init__()
        log("MinimalEnvironment.__init__ begin")
        self._done = False
        self._state = self._fresh_state()
        log("MinimalEnvironment.__init__ end")

    @staticmethod
    def _fresh_state(episode_id: str | None = None) -> MinimalState:
        """Return a zeroed state, generating an episode id when none is supplied."""
        return MinimalState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            counter=0,
        )

    def reset(self, seed: int | None = None, episode_id: str | None = None, **kwargs) -> MinimalObservation:
        """Start a new episode and return its initial observation.

        Args:
            seed: Only logged; the environment has no randomness to seed.
            episode_id: Identifier for the new episode. A random UUID is
                generated when it is ``None`` or empty.
            **kwargs: Only logged.

        Returns:
            An observation with status ``"ready"``, zero reward and ``done=False``.
        """
        log(f"reset begin seed={seed} episode_id={episode_id} kwargs={kwargs}")
        self._done = False
        self._state = self._fresh_state(episode_id)
        obs = self._observation(status="ready", reward=0.0, done=False)
        log(f"reset end episode_id={self._state.episode_id}")
        return obs

    def step(self, action: MinimalAction, timeout_s: float | None = None, **kwargs) -> MinimalObservation:
        """Apply one action and return the resulting observation.

        Args:
            action: ``"increment"`` adds ``action.amount`` to the counter and
                yields that amount as reward; ``"finish"`` ends the episode;
                ``"noop"`` only advances the step count.
            timeout_s: Only logged.
            **kwargs: Only logged.

        Returns:
            The observation after the action. If the episode had already
            ended, the state is left untouched and the observation has status
            ``"done"``, zero reward and ``done=True``.
        """
        log(f"step begin action={action.model_dump()} timeout_s={timeout_s} kwargs={kwargs}")
        if self._done:
            # Stepping a finished episode is a no-op: nothing is counted or rewarded.
            obs = self._observation(status="done", reward=0.0, done=True)
            log("step end already done")
            return obs

        self._state.step_count += 1
        reward = 0.0
        status = "ok"

        if action.action_type == "increment":
            self._state.counter += action.amount
            reward = float(action.amount)
        elif action.action_type == "finish":
            self._done = True
            status = "finished"

        # The step budget ends the episode even when the agent never sends "finish".
        if self._state.step_count >= self.MAX_STEPS:
            self._done = True
            status = "finished"

        obs = self._observation(status=status, reward=reward, done=self._done)
        log(f"step end counter={self._state.counter} step_count={self._state.step_count} done={self._done}")
        return obs

    @property
    def state(self) -> MinimalState:
        """Return the current state object (every access is logged)."""
        log("state property")
        return self._state

    def close(self) -> None:
        """Log the call; this environment has nothing else to do on close."""
        log("close")

    def _observation(self, *, status: str, reward: float, done: bool) -> MinimalObservation:
        """Build an observation from the current state plus the given status, reward and done flag."""
        return MinimalObservation(
            status=status,
            counter=self._state.counter,
            summary=(
                f"Minimal OpenEnv demo. Counter={self._state.counter}. "
                f"Step={self._state.step_count}. "
                f"Choose noop, increment, or finish."
            ),
            reward=reward,
            done=done,
        )


log("minimal_openenv_app module import begin")
# Build the application object from the environment class and its action and
# observation types; the event hook, middleware and routes below are registered on it.
# NOTE(review): presumably create_app returns a FastAPI app and instantiates
# MinimalEnvironment itself — confirm against the OpenEnv documentation.
app = create_app(MinimalEnvironment, MinimalAction, MinimalObservation, env_name="minimal_openenv")
log("openenv app created")


@app.on_event("startup")
async def startup() -> None:
    """Log basic runtime diagnostics when the application starts.

    Records the Python version, the current working directory and the raw
    value of the ``ENABLE_WEB_INTERFACE`` environment variable (``None`` when
    it is unset).
    """
    # sys is imported locally (the module does not import it at top level)
    # rather than reached through the undocumented `os.sys` attribute.
    import sys

    log("startup event begin")
    log(f"python={sys.version.split()[0]}")
    log(f"cwd={os.getcwd()}")
    log(f"enable_web={os.getenv('ENABLE_WEB_INTERFACE')}")
    log("startup event end")


@app.middleware("http")
async def request_logger(request: Request, call_next):
    """Log the start and the outcome of every HTTP request.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request to the rest of the
            application and returns its response.

    Returns:
        The response produced downstream, unmodified.

    Raises:
        Exception: Whatever the downstream handler raised; it is logged and
            then re-raised unchanged.
    """
    method = request.method
    path = request.url.path
    log(f"request start method={method} path={path}")
    try:
        response = await call_next(request)
    except Exception as exc:
        # Without this entry a failing request would leave a "request start"
        # line with no matching outcome in the log.
        log(f"request error method={method} path={path} error={exc!r}")
        raise
    log(f"request end method={method} path={path} status={response.status_code}")
    return response


@app.get("/", include_in_schema=False)
def root() -> RedirectResponse:
    """Redirect requests for the bare root URL to ``/web``."""
    log("root redirect")
    target = "/web"
    return RedirectResponse(url=target)


@app.get("/logs", include_in_schema=False)
def logs() -> PlainTextResponse:
    """Return the whole log file as plain text.

    When the log file does not exist, a short placeholder message is
    returned instead.
    """
    log("logs handler")
    try:
        contents = LOG_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        return PlainTextResponse("no log file yet\n")
    return PlainTextResponse(contents)


# Marks in the log that the module body ran to completion.
log("minimal_openenv_app module import end")