"""FastAPI app for the Legacy COBOL Migration Workbench.""" from __future__ import annotations import os from threading import RLock from fastapi import Body, HTTPException, status from openenv.core.env_server.http_server import ( ResetRequest, ResetResponse, StepRequest, StepResponse, create_app, ) from openenv.core.env_server.mcp_types import ListToolsAction from openenv.core.env_server.serialization import serialize_observation from pydantic import ValidationError try: from ..models import LegacyCobolState, ToolActionWrapper, ToolObservationWrapper from .legacy_cobol_env_environment import LegacyCobolEnvironment except ImportError: from models import LegacyCobolState, ToolActionWrapper, ToolObservationWrapper from server.legacy_cobol_env_environment import LegacyCobolEnvironment max_concurrent = int(os.getenv("MAX_CONCURRENT_ENVS", "4")) app = create_app( LegacyCobolEnvironment, ToolActionWrapper, ToolObservationWrapper, env_name="legacy_cobol_env", max_concurrent_envs=max_concurrent, ) _rest_env = LegacyCobolEnvironment() _rest_lock = RLock() def _remove_routes(paths: set[str]) -> None: app.router.routes = [ route for route in app.router.routes if getattr(route, "path", None) not in paths ] def _rest_action(action_data: dict[str, object]) -> ToolActionWrapper | ListToolsAction: try: if action_data.get("type") == "list_tools": return ListToolsAction.model_validate(action_data) return ToolActionWrapper.model_validate(action_data) except ValidationError as exc: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=exc.errors(), ) from exc def _install_persistent_rest_routes() -> None: _remove_routes({"/reset", "/step", "/state"}) @app.post("/reset", response_model=ResetResponse, tags=["Environment Control"]) async def reset( request: ResetRequest = Body(default_factory=ResetRequest), ) -> ResetResponse: kwargs = request.model_dump(exclude_unset=True) with _rest_lock: observation = _rest_env.reset(**kwargs) return ResetResponse(**serialize_observation(observation)) @app.post("/step", response_model=StepResponse, tags=["Environment Control"]) async def step(request: StepRequest) -> StepResponse: action = _rest_action(request.action) kwargs = request.model_dump(exclude_unset=True, exclude={"action"}) with _rest_lock: observation = _rest_env.step(action, **kwargs) return StepResponse(**serialize_observation(observation)) @app.get("/state", response_model=LegacyCobolState, tags=["State Management"]) async def get_state() -> LegacyCobolState: with _rest_lock: return _rest_env.state.model_copy(deep=True) def _install_project_schema_route() -> None: _remove_routes({"/schema"}) @app.get("/schema", tags=["Schema"]) async def get_project_schemas() -> dict[str, object]: return { "action": ToolActionWrapper.model_json_schema(), "observation": ToolObservationWrapper.model_json_schema(), "state": LegacyCobolState.model_json_schema(), } _install_persistent_rest_routes() _install_project_schema_route() def main() -> None: import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) if __name__ == "__main__": main()