qulab-infinite / agent_lab /backend /api /environment_routes.py
workofarttattoo's picture
🚀 QuLab MCP Server: Complete Experiment Taxonomy Deployment
91994bf
from fastapi import APIRouter
from pydantic import BaseModel
from typing import List, Dict
from environmental_sim.environmental_sim import EnvironmentalSimulator
# Router holding the environmental-simulator endpoints defined in this module.
# Every route registered on it is served under the "/environment" prefix and
# carries the "Environmental Simulator" tag; a 404 response is declared for all
# of them.
router = APIRouter(
prefix="/environment",
tags=["Environmental Simulator"],
responses={404: {"description": "Not found"}},
)
# Global instance: a single simulator is created when this module is imported
# and is shared by every request handler below, which both mutate and read it.
env_sim = EnvironmentalSimulator()
# --- Data Models ---
class EnvConfigRequest(BaseModel):
    # Request body for POST /environment/configure. Every field has a default,
    # so an empty JSON object is a valid request.

    # Ambient temperature; the configure handler forwards it with unit="C".
    temperature_c: float = 25.0

    # Ambient pressure; the configure handler forwards it with unit="bar".
    pressure_bar: float = 1.0

    # Wind speed; the configure handler uses it as the first component of a
    # three-component wind vector (the other two are 0.0) with unit="m/s".
    wind_m_s: float = 0.0

    # Gravity vector, presumably (x, y, z) in m/s^2 - TODO confirm axis order.
    # NOTE(review): the configure handler in this file never reads this field.
    gravity_m_s2: List[float] = [0.0, 0.0, -9.81]
class EnvSnapshot(BaseModel):
    # Response body describing the environmental conditions at one position.
    # The handlers in this file fill each field from the same-named key of the
    # mapping returned by the controller's get_conditions_at_position().
    # Units are presumably the ones spelled out in the field names (degrees
    # Celsius, kelvin, bar, pascal, m/s, m/s^2) - confirm against the simulator.

    # The same temperature reading expressed on two scales.
    temperature_C: float
    temperature_K: float

    # The same pressure reading expressed in two units.
    pressure_bar: float
    pressure_Pa: float

    # Vector quantities, returned as lists of floats.
    wind_velocity_m_s: List[float]
    gravity_m_s2: List[float]
# --- Endpoints ---
# Position at which both endpoints sample the environment.
_ORIGIN = (0.0, 0.0, 0.0)


def _snapshot_at_origin() -> EnvSnapshot:
    """Read the shared simulator's conditions at the origin as an EnvSnapshot.

    Both endpoints return exactly this payload, so the conversion lives in one
    place instead of being repeated in each handler.

    Returns:
        An EnvSnapshot built from the mapping returned by
        ``env_sim.controller.get_conditions_at_position``.

    Raises:
        KeyError: If the controller's mapping lacks one of the expected keys.
    """
    conditions = env_sim.controller.get_conditions_at_position(_ORIGIN)
    # Pass every value through float() so the response contains plain Python
    # floats whatever numeric type the controller hands back.
    return EnvSnapshot(
        temperature_C=float(conditions["temperature_C"]),
        temperature_K=float(conditions["temperature_K"]),
        pressure_bar=float(conditions["pressure_bar"]),
        pressure_Pa=float(conditions["pressure_Pa"]),
        wind_velocity_m_s=[float(v) for v in conditions["wind_velocity_m_s"]],
        gravity_m_s2=[float(v) for v in conditions["gravity_m_s2"]],
    )


@router.post("/configure", response_model=EnvSnapshot)
async def configure_environment(config: EnvConfigRequest):
    """
    Update the global environmental state.
    """
    # Applies temperature, pressure and wind from the request to the shared
    # simulator, then reports the resulting conditions at the origin.
    # NOTE(review): config.gravity_m_s2 is accepted by the request model but is
    # not applied here - confirm whether the controller exposes a gravity
    # setter that this handler should call.
    controller = env_sim.controller
    controller.temperature.set_temperature(config.temperature_c, unit="C")
    controller.pressure.set_pressure(config.pressure_bar, unit="bar")
    # The scalar wind speed becomes the first component of the wind vector.
    controller.fluid.set_wind((config.wind_m_s, 0.0, 0.0), unit="m/s")
    return _snapshot_at_origin()


@router.get("/snapshot", response_model=EnvSnapshot)
async def get_snapshot():
    """
    Get the current environmental state at the origin.
    """
    return _snapshot_at_origin()