# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """ FastAPI application for the Data Cleaning Env Environment. This module creates an HTTP server that exposes the DataCleaningEnvironment over HTTP and WebSocket endpoints, compatible with EnvClient. Endpoints: - POST /reset: Reset the environment - POST /step: Execute an action - GET /state: Get current environment state - GET /schema: Get action/observation schemas - WS /ws: WebSocket endpoint for persistent sessions Usage: # Development (with auto-reload): uvicorn server.app:app --reload --host 0.0.0.0 --port 8000 # Production: uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4 # Or run directly: python -m server.app """ try: from openenv.core.env_server.http_server import create_app except Exception as e: # pragma: no cover raise ImportError( "openenv is required for the web interface. Install dependencies with '\n uv sync\n'" ) from e try: from ..models import DataCleaningAction, DataCleaningObservation from .data_cleaning_env_environment import DataCleaningEnvironment except: from models import DataCleaningAction, DataCleaningObservation from server.data_cleaning_env_environment import DataCleaningEnvironment from fastapi import Request import subprocess import json import sys import os # Create the app with web interface and README integration app = create_app( DataCleaningEnvironment, DataCleaningAction, DataCleaningObservation, env_name="data_cleaning_env", max_concurrent_envs=1, # increase this number to allow more concurrent WebSocket sessions ) @app.get("/tasks") def get_tasks(): from models import DataCleaningAction try: from server.tasks import TASKS except ImportError: TASKS = [] return { "tasks": [{"id": i, "name": t.name, "description": t.description} for i, t in enumerate(TASKS)], "action_schema": DataCleaningAction.model_json_schema() } @app.get("/grader") def get_grader(): # Helper endpoint returning status return {"message": "Grader called. For real execution, score is tracked in StepResult.observation.current_score"} @app.post("/baseline") def run_baseline_endpoint(): try: pwd = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Run inference.py asynchronously using current python interpreter result = subprocess.run([sys.executable, os.path.join(pwd, "inference.py")], capture_output=True, text=True) out = result.stdout scores = [] for line in out.split('\\n'): if "Final Scores:" in line: scores_str = line.split("Final Scores:")[1].strip() scores = json.loads(scores_str) return {"output": out, "scores": scores, "error": result.stderr} except Exception as e: return {"error": str(e)} def main(host: str = "0.0.0.0", port: int = 8000): """ Entry point for direct execution via uv run or python -m. This function enables running the server without Docker: uv run --project . server uv run --project . server --port 8001 python -m data_cleaning_env.server.app Args: host: Host address to bind to (default: "0.0.0.0") port: Port number to listen on (default: 8000) For production deployments, consider using uvicorn directly with multiple workers: uvicorn data_cleaning_env.server.app:app --workers 4 """ import uvicorn uvicorn.run(app, host=host, port=port) if __name__ == "__main__": main()