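# Hosting-page metadata captured with this file (not part of the program):
#   uploader: kousiksasmal
#   commit message: Upload folder using huggingface_hub
#   commit: c7ccf4c (verified)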
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
FastAPI application for the Data Cleaning Env Environment.

This module creates an HTTP server that exposes the DataCleaningEnvironment
over HTTP and WebSocket endpoints, compatible with EnvClient.

Endpoints:
    - POST /reset: Reset the environment
    - POST /step: Execute an action
    - GET /state: Get current environment state
    - GET /schema: Get action/observation schemas
    - WS /ws: WebSocket endpoint for persistent sessions
    - GET /tasks: List the available tasks and the action schema
    - GET /grader: Informational message about where the score is reported
    - POST /baseline: Run the baseline inference script and return its scores

Usage:
    # Development (with auto-reload):
    uvicorn server.app:app --reload --host 0.0.0.0 --port 8000

    # Production:
    uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4

    # Or run directly:
    python -m server.app
"""
try:
from openenv.core.env_server.http_server import create_app
except Exception as e: # pragma: no cover
raise ImportError(
"openenv is required for the web interface. Install dependencies with '\n uv sync\n'"
) from e
try:
    # Package layout: this module is imported as part of a parent package,
    # so the models live one level up and the environment is a sibling.
    from ..models import DataCleaningAction, DataCleaningObservation
    from .data_cleaning_env_environment import DataCleaningEnvironment
except (ImportError, ValueError):
    # Flat layout (e.g. `uvicorn server.app:app` from the project root):
    # the relative imports cannot be resolved, so fall back to absolute ones.
    # ValueError is included because interpreters older than Python 3.9 raise
    # it, rather than ImportError, for a relative import that goes beyond the
    # top-level package. Anything else (KeyboardInterrupt, SystemExit, ...)
    # is deliberately allowed to propagate instead of being swallowed.
    from models import DataCleaningAction, DataCleaningObservation
    from server.data_cleaning_env_environment import DataCleaningEnvironment
from fastapi import Request
import subprocess
import json
import sys
import os
# Build the application object by passing the environment class and its
# action/observation models to openenv's create_app factory.
# NOTE(review): the original comment said this also enables the web interface
# and README integration; that is decided inside create_app and cannot be
# confirmed from this file.
app = create_app(
    DataCleaningEnvironment,
    DataCleaningAction,
    DataCleaningObservation,
    env_name="data_cleaning_env",
    max_concurrent_envs=1,  # increase this number to allow more concurrent WebSocket sessions
)
@app.get("/tasks")
def get_tasks():
    """List the available tasks together with the action JSON schema.

    Returns:
        A dict with two keys:

        - ``"tasks"``: one ``{"id", "name", "description"}`` entry per item
          of ``TASKS``, where ``id`` is the item's position in iteration
          order. The list is empty when no tasks module can be imported.
        - ``"action_schema"``: the result of
          ``DataCleaningAction.model_json_schema()``.
    """
    # Resolve TASKS for both layouts supported by the module-level imports:
    # first as a sibling module of this file, then via the flat `server`
    # package name.
    try:
        from .tasks import TASKS
    except ImportError:
        try:
            from server.tasks import TASKS
        except ImportError:
            # Best effort: a missing tasks module yields an empty task list
            # rather than a failing request.
            TASKS = []

    task_entries = [
        {"id": index, "name": task.name, "description": task.description}
        for index, task in enumerate(TASKS)
    ]

    # DataCleaningAction is the class already imported at module level.
    # Re-importing it from the top-level `models` module here would fail at
    # request time whenever only the relative import path is available.
    return {
        "tasks": task_entries,
        "action_schema": DataCleaningAction.model_json_schema(),
    }
@app.get("/grader")
def get_grader():
    """Return a fixed informational payload about grading.

    Nothing is graded here: the response is a constant message telling the
    caller which field of the step result carries the score.

    Returns:
        A dict with the single key ``"message"``.
    """
    notice = (
        "Grader called. For real execution, score is tracked in "
        "StepResult.observation.current_score"
    )
    return dict(message=notice)
@app.post("/baseline")
def run_baseline_endpoint():
    """Run the baseline ``inference.py`` script and report its scores.

    The script is looked up one directory above the directory that contains
    this file and is executed with the interpreter running this server.

    Returns:
        When the script ran: ``{"output": <stdout>, "scores": <parsed>,
        "error": <stderr>}``. ``scores`` is the JSON value that follows the
        ``"Final Scores:"`` marker on the last stdout line containing it, or
        ``[]`` when no line contains the marker.

        When anything raises (script cannot be launched, the text after the
        marker is not valid JSON, ...): ``{"error": <exception text>}``.
    """
    marker = "Final Scores:"
    try:
        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        script_path = os.path.join(project_root, "inference.py")

        # subprocess.run is synchronous: this call blocks until the script
        # exits, with stdout/stderr captured as text.
        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
        )
        out = result.stdout

        scores = []
        # Split on real line breaks. The previous code split on the
        # two-character sequence backslash + "n", which never occurs in normal
        # output, so the entire stdout was treated as a single line and any
        # text printed after the scores corrupted the JSON payload.
        for line in out.splitlines():
            if marker in line:
                scores_str = line.split(marker)[1].strip()
                scores = json.loads(scores_str)

        return {"output": out, "scores": scores, "error": result.stderr}
    except Exception as e:
        # Endpoint boundary: report the failure in the response body instead
        # of letting it surface as an unhandled server error.
        return {"error": str(e)}
def main(host: str = "0.0.0.0", port: int = 8000):
    """
    Serve ``app`` with uvicorn; entry point for running the server directly.

    Allows starting the server without Docker, for example:
        uv run --project . server
        python -m data_cleaning_env.server.app

    Args:
        host: Host address to bind to (default: "0.0.0.0")
        port: Port number to listen on (default: 8000)

    Note that this function does not parse command-line flags itself; the
    bind address and port come only from the two arguments above.

    For production deployments, consider launching uvicorn directly with
    multiple workers:
        uvicorn data_cleaning_env.server.app:app --workers 4
    """
    from uvicorn import run as serve

    serve(app, port=port, host=host)


if __name__ == "__main__":
    # Direct execution starts the server with the default host and port.
    main()