"""ODSE Sandbox Environment - the main entry-point for RL agents. Instead of a fixed DSL, agents write and execute Python code in a persistent, sandboxed namespace. The environment provides: * Pre-loaded data (``train_df``, ``val_features``, ``test_features``) * A sandboxed executor with whitelisted imports and time limits * An ``evaluate(predictions)`` function for validation feedback * Dense heuristic rewards per step + a final test-set score on submit API --- ``reset() -> Observation`` Initialize the episode and return the first observation. ``step(action) -> StepResult`` Execute a ``RunCodeAction`` or ``SubmitAction``. ``state() -> Observation`` Read current observation without advancing the episode. """ from __future__ import annotations from typing import Any, Dict, Optional import numpy as np import pandas as pd from pathlib import Path from .data.data_manager import DataSplit, create_data_split from .data.datasets import DatasetConfig, load_dataset from .evaluator import compute_full_report, compute_metric from .executor import SandboxExecutor from .docker_executor import DockerSandboxExecutor from .models import ( Action, ColumnSchema, DatasetInfo, Difficulty, ExecutionStatus, Observation, ProblemType, RunCodeAction, StepResult, SubmitAction, VariableInfo, ) from .reward import compute_step_reward, compute_submit_reward def _inside_docker() -> bool: """Detect if we're running inside a Docker container.""" import os inside_docker = os.environ.get("AM_I_IN_A_DOCKER_CONTAINER", "false").lower() == "true" return inside_docker def _ensure_sandbox_image(image: str = "odse-sandbox:latest") -> None: """Builds the sandbox image if it doesn't exist""" import logging import subprocess logger = logging.getLogger(__name__) result = subprocess.run( ["docker", "images", "-q", image], capture_output=True, text=True, timeout=20 ) if result.stdout.strip(): logger.info(f"Docker image '{image}' already exists.") return project_root = Path(__file__).resolve().parent.parent dockerfile = project_root / "core" / "Dockerfile.sandbox" if not dockerfile.exists(): raise FileNotFoundError( f"Cannot auto-build sandbox image: {dockerfile} not found" ) logger.info("Building sandbox image '%s' (first run only)...", image) subprocess.run( ["docker", "build", "-f", str(dockerfile), "-t", image, str(project_root)], check=True, timeout=300 ) logger.info("Sandbox image '%s' built successfully", image) class EvaluateFunctionWrapper: """Pickleable wrapper for the evaluate function. This allows the evaluate function to be serialized and sent to the Docker container where it can be reconstructed. """ def __init__(self, val_labels: pd.Series, problem_type: ProblemType, metric: str): self.val_labels = val_labels.values self.problem_type = problem_type self.metric = metric self.best_score = None def __call__(self, predictions) -> Dict[str, Any]: """Score predictions against validation labels.""" preds = np.asarray(predictions) if len(preds) != len(self.val_labels): return { "error": ( f"Expected {len(self.val_labels)} predictions " f"(val_features length), got {len(preds)}" ) } try: primary = compute_metric( self.val_labels, preds, self.problem_type, self.metric, ) report = compute_full_report( self.val_labels, preds, self.problem_type, ) report["primary_metric"] = self.metric report["primary_score"] = round(primary, 4) if self.best_score is None or primary > self.best_score: self.best_score = primary return report except Exception as exc: return {"error": str(exc)} class ODSEnvironment: """Open Data Science Sandbox Environment. Agents interact by writing and executing Python code. The environment provides train/validation data, a sandboxed executor, and evaluates predictions on hidden holdout data. Parameters ---------- dataset : str Dataset name (e.g. ``"titanic"``, ``"iris"``, ``"mpg"``). difficulty : str | Difficulty ``"easy"``, ``"medium"``, or ``"hard"``. problem_type : str | ProblemType | None ``"classification"`` or ``"regression"``. Auto-detected if *None*. metric : str | None Primary metric. Defaults to ``"accuracy"`` (classification) or ``"r2"`` (regression). max_steps : int | None Maximum ``RunCode`` executions per episode. timeout_seconds : float Per-execution wall-clock time limit. seed : int RNG seed for reproducibility. Example ------- >>> env = ODSEnvironment(dataset="titanic", difficulty="easy") >>> obs = env.reset() >>> result = env.step(RunCodeAction(code="print(train_df.shape)")) >>> result = env.step(RunCodeAction(code=\"\"\" ... from sklearn.linear_model import LogisticRegression ... from sklearn.preprocessing import LabelEncoder ... X = train_df.drop('survived', axis=1).select_dtypes('number').fillna(0) ... y = train_df['survived'] ... model = LogisticRegression(max_iter=1000).fit(X, y) ... predictions = model.predict(test_features.select_dtypes('number').fillna(0)) ... \"\"\")) >>> result = env.step(SubmitAction()) >>> print(result.info["test_report"]) """ MAX_STEPS: Dict[Difficulty, int] = { Difficulty.EASY: 20, Difficulty.MEDIUM: 40, Difficulty.HARD: 60, } def __init__( self, dataset: str = "titanic", difficulty: Difficulty | str = Difficulty.EASY, problem_type: ProblemType | str | None = None, metric: str | None = None, max_steps: int | None = None, timeout_seconds: float = 30.0, seed: int = 42, ) -> None: if isinstance(difficulty, str): difficulty = Difficulty(difficulty) self.dataset_name = dataset self.difficulty = difficulty self.seed = seed self.timeout_seconds = timeout_seconds # Load dataset config self.dataset_config: DatasetConfig = load_dataset(dataset, difficulty) # Determine problem type if problem_type is not None: self.problem_type = ( ProblemType(problem_type) if isinstance(problem_type, str) else problem_type ) else: self.problem_type = self.dataset_config.problem_type # Determine metric if metric is not None: self.metric = metric else: self.metric = ( "accuracy" if self.problem_type == ProblemType.CLASSIFICATION else "r2" ) # Max steps self.max_steps = max_steps or self.MAX_STEPS.get(difficulty, 30) # Internal state (populated on reset) self._data_split: Optional[DataSplit] = None self._executor: Optional[DockerSandboxExecutor | SandboxExecutor] = None self._step_count: int = 0 self._done: bool = False self._best_val_score: Optional[float] = None self._prev_val_score: Optional[float] = None self._had_predictions: bool = False # ======================================================================== # Public API # ======================================================================== def reset(self) -> Observation: """Reset the environment and return the initial observation.""" np.random.seed(self.seed) # Create train / val / test split self._data_split = create_data_split( self.dataset_config, seed=self.seed, ) # Set up sandbox executor #If we're already running inside Docker, #use the local sandbox executor. Otherwise, use the Docker-based one for isolation. if _inside_docker(): print('Using a sandbox executor') self._executor = SandboxExecutor() else: print('Using a docker based sandbox executor') _ensure_sandbox_image() self._executor = DockerSandboxExecutor( timeout_seconds=self.timeout_seconds, ) self._executor.setup_namespace( train_df=self._data_split.train_df, val_features=self._data_split.val_features, test_features=self._data_split.test_features, target_column=self.dataset_config.target_column, evaluate_fn=self._make_evaluate_fn(), ) # Reset episode counters self._step_count = 0 self._done = False self._best_val_score = None self._prev_val_score = None self._had_predictions = False return self._build_observation() def step(self, action: Action) -> StepResult: """Execute *action* and return a :class:`StepResult`.""" self._ensure_ready() if self._done: raise RuntimeError( "Episode is done. Call reset() to start a new one." ) self._step_count += 1 if isinstance(action, SubmitAction): return self._handle_submit() if isinstance(action, RunCodeAction): return self._handle_run_code(action) raise ValueError(f"Unknown action type: {type(action)}") def state(self) -> Observation: """Return current observation without advancing the episode.""" self._ensure_ready() return self._build_observation() # ======================================================================== # Submit handler # ======================================================================== def _handle_submit(self) -> StepResult: """Score predictions on hidden test set and terminate.""" self._done = True predictions = self._executor.get_predictions() test_score: Optional[float] = None test_report: Dict[str, Any] = {} if ( predictions is not None and len(predictions) == len(self._data_split.test_labels) ): try: test_score = compute_metric( self._data_split.test_labels.values, predictions, self.problem_type, self.metric, ) test_report = compute_full_report( self._data_split.test_labels.values, predictions, self.problem_type, ) except Exception as exc: test_report = {"error": str(exc)} reward = compute_submit_reward( test_score=test_score, best_validation_score=self._best_val_score, ) obs = self._build_observation( done=True, test_score=test_score, test_report=test_report, ) return StepResult( observation=obs, reward=reward, done=True, info={ "reason": "submit", "test_score": test_score, "test_report": test_report, "best_validation_score": self._best_val_score, } ) # ======================================================================== # RunCode handler # ======================================================================== def _handle_run_code(self, action: RunCodeAction) -> StepResult: """Execute code in the sandbox and compute the dense reward.""" had_preds = self._had_predictions prev_val = self._prev_val_score # Execute the agent's code result = self._executor.execute(action.code) code_ok = result.status == ExecutionStatus.SUCCESS # Check for predictions variable predictions = self._executor.get_predictions() has_preds = predictions is not None if has_preds: self._had_predictions = True # Auto-score against validation if predictions match val size curr_val_score: Optional[float] = None if ( has_preds and len(predictions) == len(self._data_split.val_labels) ): try: curr_val_score = compute_metric( self._data_split.val_labels.values, predictions, self.problem_type, self.metric, ) if ( self._best_val_score is None or curr_val_score > self._best_val_score ): self._best_val_score = curr_val_score except Exception: curr_val_score = None self._prev_val_score = curr_val_score # Dense reward reward = compute_step_reward( code_succeeded=code_ok, had_predictions_before=had_preds, has_predictions_now=has_preds, prev_validation_score=prev_val, curr_validation_score=curr_val_score, ) # Check step-budget termination done = self._step_count >= self.max_steps if done: self._done = True obs = self._build_observation( stdout=result.stdout, stderr=result.stderr, execution_status=result.status, execution_time_ms=result.execution_time_ms, validation_score=curr_val_score, done=done, test_score=None, test_report=None, ) return StepResult( observation=obs, reward=reward, done=done, info={ "execution_status": result.status.value, "validation_score": curr_val_score, "best_validation_score": self._best_val_score, }, ) # ======================================================================== # Observation builder # ======================================================================== def _build_observation( self, *, stdout: str = "", stderr: str = "", execution_status: Optional[ExecutionStatus] = None, execution_time_ms: float = 0.0, validation_score: Optional[float] = None, done: bool = False, test_score: Optional[float] = None, test_report: Optional[Dict[str, Any]] = None, ) -> Observation: return Observation( stdout=stdout, stderr=stderr, execution_status=execution_status, execution_time_ms=execution_time_ms, namespace_summary=self._executor.get_namespace_summary(), validation_score=validation_score, best_validation_score=self._best_val_score, step_count=self._step_count, max_steps=self.max_steps, dataset_info=self._build_dataset_info(), task_description=self._build_task_description(), done=done, test_score=test_score, test_report=test_report ) def _build_dataset_info(self) -> DatasetInfo: split = self._data_split train = split.train_df cfg = self.dataset_config columns = [] for col in train.columns: columns.append( ColumnSchema( name=col, dtype=str(train[col].dtype), null_count=int(train[col].isnull().sum()), is_numeric=pd.api.types.is_numeric_dtype(train[col]), unique_count=int(train[col].nunique()), sample_values=train[col].dropna().head(3).tolist(), ) ) info = DatasetInfo( train_shape=tuple(train.shape), val_shape=tuple(split.val_features.shape), test_shape=tuple(split.test_features.shape), target_column=cfg.target_column, problem_description=cfg.problem_description, problem_type=self.problem_type.value, metric=self.metric, columns=columns, ) # Target-specific metadata target_col = train[cfg.target_column] if self.problem_type == ProblemType.CLASSIFICATION: info.target_classes = sorted( target_col.dropna().unique().tolist(), key=str, ) else: info.target_stats = { "mean": round(float(target_col.mean()), 4), "std": round(float(target_col.std()), 4), "min": round(float(target_col.min()), 4), "max": round(float(target_col.max()), 4), } return info def _build_task_description(self) -> str: pt = self.problem_type.value.upper() tc = self.dataset_config.target_column return ( f"{pt} TASK: Build a model to predict '{tc}' using the " f"provided training data.\n\n" f"Problem definition: {self.dataset_config.problem_description}\n\n" f"Your code runs in a persistent sandbox - variables survive " f"across RunCode steps.\n" f"Pre-loaded variables: train_df, val_features, test_features, " f"target_column.\n" f"Use evaluate(predictions) to check your score on hidden " f"validation labels (pass val-sized predictions).\n" f"When ready, set `predictions` to your test-set predictions " f"(matching test_features length) and call Submit.\n\n" f"Primary metric: {self.metric} | " f"Max steps: {self.max_steps} | " f"Dataset: {self.dataset_name} ({self.difficulty.value})" ) # ======================================================================== # evaluate() factory # ======================================================================== def _make_evaluate_fn(self): """Create the ``evaluate()`` wrapper injected into the namespace. Returns a pickleable EvaluateFunctionWrapper instance that scores predictions against the hidden validation labels. """ return EvaluateFunctionWrapper( val_labels=self._data_split.val_labels, problem_type=self.problem_type, metric=self.metric, ) # ======================================================================== # Internals # ======================================================================== def _ensure_ready(self) -> None: if self._executor is None or self._data_split is None: raise RuntimeError( "Environment not initialised - call reset() first." )