# NetZero-Nav / netzero_nav/server.py
# Aryanshh
# feat: Implement Suez Jam crisis scenario and HUD integration
# 2ddf5f5
# raw
# history blame
# 1.49 kB
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional
import os
from netzero_nav.env import AtlasEcoEnv
from netzero_nav.models import Action, Observation, StepResponse
# ASGI application object; every route below is registered on it.
app = FastAPI(
    title="NetZero Nav: Eco-Resilient Logistics",
)

# One environment instance shared by all request handlers in this module.
_env = AtlasEcoEnv()

# Mount Static Dashboard
# The directory is resolved against the process's current working directory,
# so the "dashboard" folder is looked up relative to wherever the server
# process was launched from.
static_path = os.path.join(os.getcwd(), "dashboard")
app.mount(
    "/dashboard",
    StaticFiles(directory=static_path),
    name="dashboard",
)
@app.get("/")
def root():
    """Redirect requests for the site root to the dashboard's index page."""
    dashboard_index = "/dashboard/index.html"
    return RedirectResponse(url=dashboard_index)
class ResetRequest(BaseModel):
    """Request body accepted by ``POST /reset``.

    Both fields carry defaults, so an empty JSON object is a valid body.
    """

    # Task label supplied by the client; defaults to "easy".
    task: Optional[str] = "easy"

    # Seed value for the reset; defaults to 42.
    seed: Optional[int] = 42
@app.get("/health")
def health():
    """Return a constant payload indicating the server is responding."""
    return dict(status="ok")
@app.post("/reset", response_model=Observation)
def reset(req: ResetRequest = ResetRequest()):
    """Reset the shared environment and return what ``_env.reset`` produces.

    Args:
        req: Reset options. When the request carries no body, the default
            ``ResetRequest()`` (task "easy", seed 42) is used.

    Returns:
        The value returned by ``_env.reset(seed=...)``, serialized through
        the ``Observation`` response model.
    """
    # NOTE(review): only ``req.seed`` is forwarded; ``req.task`` is accepted
    # but not used here -- confirm whether the environment should receive it.
    initial_observation = _env.reset(seed=req.seed)
    return initial_observation
@app.post("/step", response_model=StepResponse)
def step(action: Action):
    """Apply one action to the shared environment and report the outcome.

    Args:
        action: The action parsed from the request body.

    Returns:
        A ``StepResponse`` built from the four values that ``_env.step``
        returns: observation, reward, done flag and info.
    """
    transition = _env.step(action)
    # ``_env.step`` is expected to yield exactly four items.
    observation, reward_value, finished, details = transition
    return StepResponse(
        observation=observation,
        reward=reward_value,
        done=finished,
        info=details,
    )
@app.post("/trigger")
def trigger():
    """Trigger the "Suez Jam" scenario on the shared environment.

    Sets ``_env.sea_blocked_until`` to seven steps past the environment's
    current ``step_count``. How the environment acts on that attribute is
    defined in ``AtlasEcoEnv``, outside this module.

    Returns:
        A dict with a status message and the duration (in steps) applied.
    """
    jam_duration = 7
    _env.sea_blocked_until = _env.step_count + jam_duration
    return {"status": "Suez Jam Triggered", "duration": jam_duration}
@app.get("/state")
def state():
    """Return a snapshot of selected attributes of the shared environment.

    Returns:
        A dict with the keys ``step``, ``inventory``, ``carbon``, ``cash``
        and ``orders``, each read directly from the corresponding ``_env``
        attribute.
    """
    snapshot = {}
    snapshot["step"] = _env.step_count
    snapshot["inventory"] = _env.inventory
    snapshot["carbon"] = _env.carbon_total
    snapshot["cash"] = _env.cash_balance
    snapshot["orders"] = _env.pending_orders
    return snapshot