| |
| |
| |
| |
| |
|
|
| """ |
| FastAPI application for the SafeSpace Content Moderation Environment. |
| |
| This module creates an HTTP server that exposes the SafeSpaceEnvironment |
| over HTTP and WebSocket endpoints, compatible with EnvClient. |
| |
| Endpoints: |
| - POST /reset: Reset the environment |
| - POST /step: Execute an action |
| - GET /state: Get current environment state |
| - GET /schema: Get action/observation schemas |
| - WS /ws: WebSocket endpoint for persistent sessions |
| |
| Usage: |
| # Development (with auto-reload): |
| uvicorn content_moderation_env.server.app:app --reload --host 0.0.0.0 --port 8000 |
| |
| # Production: |
| uvicorn content_moderation_env.server.app:app --host 0.0.0.0 --port 8000 --workers 4 |
| |
| # Or run directly: |
| python -m content_moderation_env.server.app |
| """ |
|
|
| from contextlib import asynccontextmanager |
|
|
| try: |
| from openenv.core.env_server.http_server import create_app |
| from openenv.core.env_server.types import SchemaResponse |
| except Exception as e: |
| raise ImportError( |
| "openenv is required for the web interface. Install with: pip install openenv-core" |
| ) from e |
|
|
| try: |
| from fastapi import Request |
| from fastapi.responses import JSONResponse |
| from ..models import ModerationAction, ModerationObservation, ModerationState |
| from .errors import SafeSpaceError |
| from .environment import SafeSpaceEnvironment |
| from .scenarios import validate_benchmark_manifest, validate_scenario_corpus |
| except (ModuleNotFoundError, ImportError): |
| from fastapi import Request |
| from fastapi.responses import JSONResponse |
| from models import ModerationAction, ModerationObservation, ModerationState |
| from server.errors import SafeSpaceError |
| from server.environment import SafeSpaceEnvironment |
| from server.scenarios import validate_benchmark_manifest, validate_scenario_corpus |
|
|
|
|
| |
| app = create_app( |
| SafeSpaceEnvironment, |
| ModerationAction, |
| ModerationObservation, |
| env_name="safespace", |
| max_concurrent_envs=10, |
| ) |
|
|
|
|
| def _replace_get_route(path: str) -> None: |
| """Remove OpenEnv's default GET route so SafeSpace can publish typed variants.""" |
| app.router.routes = [ |
| route |
| for route in app.router.routes |
| if not ( |
| getattr(route, "path", None) == path |
| and "GET" in (getattr(route, "methods", set()) or set()) |
| ) |
| ] |
| app.openapi_schema = None |
|
|
|
|
| def _get_state_snapshot() -> ModerationState: |
| """Return a typed environment-state snapshot for the stateless HTTP route.""" |
| env = SafeSpaceEnvironment() |
| try: |
| return env.state |
| finally: |
| env.close() |
|
|
|
|
| def _get_schema_payload() -> SchemaResponse: |
| """Return the action, observation, and typed state schemas for public docs.""" |
| return SchemaResponse( |
| action=ModerationAction.model_json_schema(), |
| observation=ModerationObservation.model_json_schema(), |
| state=ModerationState.model_json_schema(), |
| ) |
|
|
|
|
| _replace_get_route("/state") |
| _replace_get_route("/schema") |
|
|
|
|
| @app.get( |
| "/state", |
| response_model=ModerationState, |
| tags=["State Management"], |
| summary="Get current environment state", |
| description=""" |
| Retrieve the current environment state using SafeSpace's typed ModerationState model. |
| |
| The shared HTTP route remains stateless and returns a fresh environment snapshot. |
| Live benchmark state should still be read through a session-aware SafeSpaceEnv client. |
| """, |
| ) |
| async def get_state() -> ModerationState: |
| """Serve the typed public state schema for validators and human reviewers.""" |
| return _get_state_snapshot() |
|
|
|
|
| @app.get( |
| "/schema", |
| response_model=SchemaResponse, |
| tags=["Environment Info"], |
| summary="Get action, observation, and state schemas", |
| description=""" |
| Return the JSON schemas for the SafeSpace action, observation, and typed state models. |
| """, |
| ) |
| async def get_schemas() -> SchemaResponse: |
| """Serve typed schemas for public API consumers and documentation.""" |
| return _get_schema_payload() |
|
|
|
|
| @asynccontextmanager |
| async def _lifespan(_: object): |
| """Fail startup early if the benchmark corpus is missing or invalid.""" |
| validate_scenario_corpus() |
| validate_benchmark_manifest() |
| yield |
|
|
|
|
| app.router.lifespan_context = _lifespan |
|
|
|
|
| @app.exception_handler(SafeSpaceError) |
| async def _handle_safespace_error( |
| request: Request, |
| exc: SafeSpaceError, |
| ) -> JSONResponse: |
| """Return structured validation errors for reset-time failures.""" |
| del request |
| return JSONResponse( |
| status_code=exc.status_code, |
| content={"detail": exc.to_payload()}, |
| ) |
|
|
|
|
| def main(): |
| """ |
| Entry point for direct execution. |
| |
| This function enables running the server without Docker: |
| python -m content_moderation_env.server.app |
| python server/app.py |
| |
| Reads --host and --port from command line arguments if provided. |
| """ |
| import argparse |
| import uvicorn |
|
|
| parser = argparse.ArgumentParser() |
| parser.add_argument("--host", type=str, default="0.0.0.0") |
| parser.add_argument("--port", type=int, default=8000) |
| args, _ = parser.parse_known_args() |
|
|
| uvicorn.run(app, host=args.host, port=args.port) |
|
|
|
|
| |
| if __name__ == "__main__": |
| main() |
|
|