Spaces:
Running
Running
| """Abstract base class for all ODSE tasks. | |
| Every concrete task (cleaning, feature engineering, ...) inherits from | |
| ``BaseTask`` and implements the abstract hooks. The base class provides: | |
| * ``setup()`` - initialises / resets the state. | |
| * ``execute()`` - validates + dispatches an action, computes reward. | |
| * ``build_observation()`` - constructs the ``Observation`` pydantic model. | |
| * ``calculate_accuracy()`` - sklearn proxy model accuracy (5-fold CV). | |
| """ | |
| from __future__ import annotations | |
| from abc import ABC, abstractmethod | |
| from typing import Any, Dict, List, Set | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.model_selection import cross_val_score | |
| from sklearn.preprocessing import LabelEncoder | |
| from ..data.data_manager import DataState | |
| from ..data.datasets import DatasetConfig | |
| from ..models import ( | |
| Action, | |
| ColumnInfo, | |
| Difficulty, | |
| Observation, | |
| StepResult, | |
| SubmitAction, | |
| TaskType, | |
| ) | |
class BaseTask(ABC):
    """Base class that every ODSE task must inherit from.

    Subclass contract
    -----------------
    * ``TASK_TYPE``: the ``TaskType`` enum member.
    * ``SUPPORTED_ACTIONS``: set of ``action_type`` literal strings.
    * ``apply_action()``: mutate ``self.data_state`` for a given action.
    * ``calculate_reward()``: scalar reward for one step.
    * ``is_done()``: whether the episode should terminate.
    * ``grade()``: score the final state (dict with ``"score"``).
    * ``get_goal_description()``: human-readable goal string.

    The five hook methods are declared with ``@abstractmethod``, so a
    subclass that omits any of them cannot be instantiated.
    """

    # -- subclasses must set these class-level attributes -------------------
    TASK_TYPE: TaskType
    SUPPORTED_ACTIONS: Set[str]

    # Default per-difficulty step limits (overridable per task).
    MAX_STEPS: Dict[Difficulty, int] = {
        Difficulty.EASY: 15,
        Difficulty.MEDIUM: 30,
        Difficulty.HARD: 45,
    }
    # Step limit used when ``difficulty`` has no entry in ``MAX_STEPS``.
    DEFAULT_MAX_STEPS: int = 25

    def __init__(
        self,
        dataset_config: DatasetConfig,
        difficulty: Difficulty,
        seed: int = 42,
    ) -> None:
        """Store the task configuration; state is created by ``setup()``.

        Args:
            dataset_config: Supplies the source frame (``df``) plus the
                ``target_column`` / ``feature_columns`` used by the proxy model.
            difficulty: Selects the step limit from ``MAX_STEPS``.
            seed: ``random_state`` passed to the proxy model.
        """
        self.dataset_config = dataset_config
        self.difficulty = difficulty
        self.seed = seed
        self.max_steps: int = self.MAX_STEPS.get(
            difficulty, self.DEFAULT_MAX_STEPS
        )

        # State - populated on setup()
        self.data_state: DataState | None = None
        self.step_count: int = 0
        self._initial_accuracy: float = 0.0
        self._previous_accuracy: float = 0.0

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def setup(self) -> Observation:
        """Initialize / reset the task. Called by ``ODSEnvironment.reset()``.

        Returns:
            The observation describing the freshly created state.
        """
        self.data_state = DataState(self.dataset_config.df, name='initial')
        self.step_count = 0
        self._initial_accuracy = self.calculate_accuracy()
        self._previous_accuracy = self._initial_accuracy
        return self.build_observation()

    def _require_data_state(self) -> DataState:
        """Return the active ``DataState`` or raise if ``setup()`` has not run."""
        # An explicit raise (not ``assert``) so the check survives ``python -O``.
        if self.data_state is None:
            raise RuntimeError("Call setup() before executing actions")
        return self.data_state

    # ------------------------------------------------------------------
    # Action dispatch
    # ------------------------------------------------------------------
    def execute(self, action: Action) -> StepResult:
        """Validate and apply *action*, compute reward, return ``StepResult``.

        Every call counts as a step, including submits and rejected actions.

        Args:
            action: The action requested by the agent.

        Returns:
            A ``StepResult``. A ``SubmitAction`` ends the episode with reward
            0.0; an unsupported action type yields reward -0.1 and leaves the
            data untouched; otherwise the reward comes from
            ``calculate_reward()`` and ``done`` from ``is_done()``.

        Raises:
            RuntimeError: If ``setup()`` has not been called yet.
        """
        self._require_data_state()
        self.step_count += 1

        # ----- Submit terminates immediately -----
        if isinstance(action, SubmitAction):
            return StepResult(
                observation=self.build_observation(),
                reward=0.0,
                done=True,
                info={"reason": "Submit"},
            )

        # ----- Validate action type for this task -----
        if action.action_type not in self.SUPPORTED_ACTIONS:
            return StepResult(
                observation=self.build_observation(),
                reward=-0.1,  # Penalty for invalid action
                done=False,
                info={
                    "error": (
                        f"Action `{action.action_type}` not supported "
                        f"by {self.TASK_TYPE.value}"
                    )
                },
            )

        # NOTE: recomputed rather than reusing ``_previous_accuracy``. Reusing
        # the cached value would save one CV run per step, but is only safe if
        # nothing mutates ``data_state`` between steps.
        old_accuracy = self.calculate_accuracy()

        # ----- Delegate to the concrete task -----
        self.apply_action(action)

        new_accuracy = self.calculate_accuracy()
        reward = self.calculate_reward(old_accuracy, new_accuracy, action)
        self._previous_accuracy = new_accuracy

        obs = self.build_observation()
        done = self.is_done()
        return StepResult(
            observation=obs,
            reward=reward,
            done=done,
            info={
                "old_accuracy": old_accuracy,
                "new_accuracy": new_accuracy,
                "accuracy_delta": new_accuracy - old_accuracy,
            },
        )

    # ------------------------------------------------------------------
    # Abstract interface - subclasses must implement these methods
    # ------------------------------------------------------------------
    @abstractmethod
    def apply_action(self, action: Action) -> None:
        """Mutate ``self.data_state`` according to *action*."""

    @abstractmethod
    def calculate_reward(
        self, old_accuracy: float, new_accuracy: float, action: Action
    ) -> float:
        """Return the scalar reward for a single step.

        Args:
            old_accuracy: Proxy accuracy measured before ``apply_action()``.
            new_accuracy: Proxy accuracy measured after ``apply_action()``.
            action: The action that was applied.
        """

    @abstractmethod
    def is_done(self) -> bool:
        """Return ``True`` when the episode should end (besides submit)."""

    @abstractmethod
    def grade(self) -> Dict[str, Any]:
        """Score the final state. Must return a dict containing ``"score"``."""

    @abstractmethod
    def get_goal_description(self) -> str:
        """Return a human-readable string describing the agent's goal."""

    # ------------------------------------------------------------------
    # Observation builder
    # ------------------------------------------------------------------
    def build_observation(self) -> Observation:
        """Construct an ``Observation`` from the current ``DataState``.

        Returns:
            Per-column statistics, the head of the frame, the current proxy
            accuracy and the task metadata exposed to the agent.

        Raises:
            RuntimeError: If ``setup()`` has not been called yet.
        """
        df = self._require_data_state().df
        n_rows = len(df)

        columns: List[ColumnInfo] = []
        total_nulls = 0
        for col in df.columns:
            series = df[col]
            null_count = int(series.isnull().sum())
            total_nulls += null_count
            columns.append(
                ColumnInfo(
                    name=col,
                    dtype=str(series.dtype),
                    null_count=null_count,
                    # Guard against division by zero on an empty frame.
                    null_percentage=(
                        round(null_count / n_rows * 100, 2) if n_rows > 0 else 0.0
                    ),
                    unique_count=int(series.nunique()),
                    is_numeric=pd.api.types.is_numeric_dtype(series),
                )
            )

        return Observation(
            columns=columns,
            sample_head=df.head().to_dict(orient="list"),
            shape=tuple(df.shape),
            current_accuracy=self.calculate_accuracy(),
            step_count=self.step_count,
            nulls_remaining=total_nulls,
            task_type=self.TASK_TYPE.value,
            difficulty=self.difficulty.value,
            goal_description=self.get_goal_description(),
            # "submit" is always offered on top of the task-specific actions.
            available_actions=sorted(self.SUPPORTED_ACTIONS | {"submit"}),
        )

    # =====================================================================
    # Proxy model accuracy (shared by all tasks)
    # =====================================================================
    def calculate_accuracy(self) -> float:
        """5-fold CV accuracy using a LogisticRegression proxy model.

        Only complete (non-null) rows are used. Returns 0.0 when fewer than
        10 complete rows remain, when the target or all feature columns are
        missing, or when the model fails (the failure is logged).

        Raises:
            RuntimeError: If ``setup()`` has not been called yet.
        """
        df = self._require_data_state().df.dropna()
        target = self.dataset_config.target_column
        features = [
            c for c in self.dataset_config.feature_columns if c in df.columns
        ]
        if len(df) < 10 or target not in df.columns or not features:
            return 0.0

        try:
            X = df[features].copy()
            y = df[target]

            # Encode categorical / string columns as integer labels.
            for col in X.columns:
                if not pd.api.types.is_numeric_dtype(X[col]):
                    X[col] = LabelEncoder().fit_transform(X[col].astype(str))
            X_arr = X.values.astype(float)

            n_folds = min(5, len(df))
            model = LogisticRegression(
                max_iter=1000,
                random_state=self.seed,
                solver="lbfgs",
            )
            scores = cross_val_score(
                model, X_arr, y, cv=n_folds, scoring="accuracy",
            )
            return float(np.mean(scores))
        except Exception as exc:
            # Best-effort metric: an unfittable dataset scores 0.0 instead of
            # aborting the episode, but the cause is logged so it is not lost.
            import logging

            logging.getLogger(__name__).warning(
                "Proxy accuracy could not be computed; returning 0.0: %s", exc
            )
            return 0.0