File size: 4,389 Bytes
4d13031
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""FastAPI server for serving RecallTrace in Docker or Hugging Face Spaces."""

from __future__ import annotations

from pathlib import Path
from typing import Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from baseline.policy import choose_heuristic_action
from env.env import RecallTraceEnv
from env.models import RecallAction


# Directory that contains this module; static assets are located relative to it.
BASE_DIR = Path(__file__).resolve().parent
# Folder mounted under /static below; root() also serves index.html from here.
STATIC_DIR = BASE_DIR / "static"

# Application object on which every route in this module is registered.
app = FastAPI(title="RecallTrace OpenEnv", version="1.0.0")
# Expose the files in STATIC_DIR under the /static URL prefix.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# Single module-level environment instance used by the /reset, /step and
# /state handlers, so those endpoints all operate on the same object.
ACTIVE_ENV = RecallTraceEnv()


class ResetRequest(BaseModel):
    """JSON body accepted by ``POST /reset``."""

    # Forwarded as ``task_id`` to ``ACTIVE_ENV.reset``; None when omitted.
    task_id: Optional[str] = None
    # Forwarded as ``phase`` to ``ACTIVE_ENV.reset``; None when omitted.
    phase: Optional[int] = None


class RunEpisodeRequest(BaseModel):
    """JSON body accepted by ``POST /api/run_episode``."""

    # Forwarded as ``task_id`` to ``_run_episode``; None when omitted.
    task_id: Optional[str] = None
    # Forwarded as ``phase`` to ``_run_episode``; None when omitted.
    phase: Optional[int] = None


@app.get("/")
def root() -> FileResponse:
    """Serve ``index.html`` from the static directory as the landing page."""
    index_page = STATIC_DIR / "index.html"
    return FileResponse(index_page)


@app.get("/health")
def health() -> dict:
    """Return a fixed payload that signals the server is up."""
    payload = {"status": "healthy"}
    return payload


@app.get("/tasks")
def tasks() -> dict:
    """Return every task reported by ``RecallTraceEnv.available_tasks()``.

    Each task is serialised with ``model_dump()`` and the resulting list is
    wrapped under the ``"tasks"`` key.
    """
    serialised = [entry.model_dump() for entry in RecallTraceEnv.available_tasks()]
    return {"tasks": serialised}


@app.get("/api/tasks")
def api_tasks() -> dict:
    """Alias of ``/tasks`` under the ``/api`` prefix; delegates to ``tasks()``."""
    listing = tasks()
    return listing


@app.get("/reset")
def reset_get(task_id: Optional[str] = None, phase: Optional[int] = None) -> dict:
    """Reset the shared environment using query parameters.

    Args:
        task_id: Passed through to ``ACTIVE_ENV.reset``; None when omitted.
        phase: Passed through to ``ACTIVE_ENV.reset``; None when omitted.

    Returns:
        The ``model_dump()`` of the object returned by ``ACTIVE_ENV.reset``.

    Raises:
        HTTPException: Status 400 carrying the error text if anything raises
            during the reset or serialisation.
    """
    try:
        fresh_observation = ACTIVE_ENV.reset(task_id=task_id, phase=phase)
        payload = fresh_observation.model_dump()
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    return payload


@app.post("/reset")
def reset_post(request: ResetRequest) -> dict:
    """Reset the shared environment using a JSON body.

    Args:
        request: Body whose ``task_id`` and ``phase`` are passed through to
            ``ACTIVE_ENV.reset``.

    Returns:
        The ``model_dump()`` of the object returned by ``ACTIVE_ENV.reset``.

    Raises:
        HTTPException: Status 400 carrying the error text if anything raises
            during the reset or serialisation.
    """
    try:
        fresh_observation = ACTIVE_ENV.reset(
            task_id=request.task_id,
            phase=request.phase,
        )
        payload = fresh_observation.model_dump()
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    return payload


@app.post("/step")
def step(action: RecallAction) -> dict:
    """Apply one action to the shared environment and report the outcome.

    Args:
        action: The action handed to ``ACTIVE_ENV.step``.

    Returns:
        A dict with the serialised ``observation`` plus the ``reward``,
        ``done`` and ``info`` values returned by the environment.

    Raises:
        HTTPException: Status 400 carrying the error text if the step or the
            serialisation of its result raises.
    """
    try:
        outcome = ACTIVE_ENV.step(action)
        # The environment returns a 4-tuple; unpack it inside the try block so
        # a malformed result is also reported as a 400.
        observation, reward, done, info = outcome
        body = {
            "observation": observation.model_dump(),
            "reward": reward,
            "done": done,
            "info": info,
        }
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    return body


@app.get("/state")
def state() -> dict:
    """Return the shared environment's current state, serialised to a dict."""
    snapshot = ACTIVE_ENV.state()
    return snapshot.model_dump()


def _run_episode(task_id: str | None = None, phase: int | None = None) -> dict:
    """Play one episode on a private environment using the heuristic policy.

    A new ``RecallTraceEnv`` is created for the call, so the module-level
    ``ACTIVE_ENV`` is not touched. Actions come from
    ``choose_heuristic_action`` and the loop stops when the environment
    reports ``done`` or after ``env.task.max_steps`` steps.

    Args:
        task_id: Passed to both the environment constructor and ``reset``.
        phase: Passed to both the environment constructor and ``reset``.

    Returns:
        A dict with the serialised task, the final ``score`` (taken from the
        last step's info, 0.0 if absent or if no step ran), a ``success`` flag
        (score >= 0.9), ``steps_taken``, the last info dict, the last
        observation, and a per-step log.
    """
    env = RecallTraceEnv(task_id=task_id, phase=phase)
    observation = env.reset(task_id=task_id, phase=phase)
    # Fallback used when the loop below never executes a step.
    final_info = {"score": 0.0}
    logs = []

    step_limit = env.task.max_steps
    for step_number in range(1, step_limit + 1):
        action = choose_heuristic_action(observation)
        observation, reward, done, final_info = env.step(action)
        entry = {
            "step": step_number,
            "action": action.model_dump(exclude_none=True),
            "reward": reward,
            "done": done,
            "message": final_info.get("message"),
        }
        logs.append(entry)
        if done:
            break

    # Build the summary key by key so evaluation order and key order stay the
    # same as a single dict literal would give.
    summary = {"task": env.task.model_dump()}
    score = float(final_info.get("score", 0.0))
    summary["score"] = score
    summary["success"] = score >= 0.9
    summary["steps_taken"] = env.state().steps_taken
    summary["final_info"] = final_info
    summary["final_observation"] = observation.model_dump()
    summary["logs"] = logs
    return summary


@app.post("/api/run_episode")
def run_episode(request: RunEpisodeRequest) -> dict:
    """Run a single heuristic episode and return its summary.

    Args:
        request: Body whose ``task_id`` and ``phase`` are forwarded to
            ``_run_episode``.

    Returns:
        The summary dict produced by ``_run_episode``.

    Raises:
        HTTPException: Status 400 carrying the error text if the episode
            raises.
    """
    try:
        summary = _run_episode(task_id=request.task_id, phase=request.phase)
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    return summary


@app.get("/api/run_all")
def run_all() -> dict:
    """Run one heuristic episode per available task and average the scores.

    Returns:
        A dict with ``average_score`` (mean of the episode scores rounded to
        four decimals, or 0.0 when there are no tasks) and ``episodes`` (the
        list of per-task summaries from ``_run_episode``).

    Raises:
        HTTPException: Status 400 carrying the error text if listing the
            tasks or running any episode raises.
    """
    try:
        episodes = [_run_episode(task_id=task.task_id) for task in RecallTraceEnv.available_tasks()]
        if episodes:
            average_score = round(sum(item["score"] for item in episodes) / len(episodes), 4)
        else:
            # With no tasks the mean is undefined; report 0.0 instead of
            # letting a ZeroDivisionError surface as a 400 "division by zero".
            average_score = 0.0
        return {
            "average_score": average_score,
            "episodes": episodes,
        }
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc


def main() -> None:
    """Serve ``app`` with uvicorn on all interfaces, port 7860."""
    bind_host, bind_port = "0.0.0.0", 7860
    uvicorn.run(app, host=bind_host, port=bind_port)


# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()