# odse/core/tasks/base_task.py
# (Hugging Face file-viewer residue: "simeetnayan's picture",
#  "Upload folder using huggingface_hub", commit fede53c verified)
"""Abstract base class for all ODSE tasks.
Every concrete task (cleaning, feature engineering, ...) inherits from
``BaseTask`` and implements the abstract hooks. The base class provides:
* ``setup()`` - initialises / resets the state.
* ``execute()`` - validates + dispatches an action, computes reward.
* ``build_observation()`` - constructs the ``Observation`` pydantic model.
* ``calculate_accuracy()`` - sklearn proxy model accuracy (5-fold CV).
"""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Set

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder

from ..data.data_manager import DataState
from ..data.datasets import DatasetConfig
from ..models import (
    Action,
    ColumnInfo,
    Difficulty,
    Observation,
    StepResult,
    SubmitAction,
    TaskType,
)
class BaseTask(ABC):
    """Base class that every ODSE task must inherit from.

    Subclass contract
    -----------------
    * ``TASK_TYPE``: the ``TaskType`` enum member.
    * ``SUPPORTED_ACTIONS``: set of ``action_type`` literal strings.
    * ``apply_action()``: mutate ``self.data_state`` for a given action.
    * ``calculate_reward()``: scalar reward for one step.
    * ``is_done()``: whether the episode should terminate.
    * ``grade()``: score the final state (dict with ``"score"``).
    * ``get_goal_description()``: human-readable goal string.
    """

    # -- subclasses must set these class-level attributes -------------------
    TASK_TYPE: TaskType
    SUPPORTED_ACTIONS: Set[str]

    # Default per-difficulty step limits (overridable per task).
    MAX_STEPS: Dict[Difficulty, int] = {
        Difficulty.EASY: 15,
        Difficulty.MEDIUM: 30,
        Difficulty.HARD: 45,
    }

    def __init__(
        self,
        dataset_config: DatasetConfig,
        difficulty: Difficulty,
        seed: int = 42,
    ) -> None:
        """Store the task configuration; no data is loaded until ``setup()``.

        Args:
            dataset_config: Dataset description; this class reads its ``df``,
                ``target_column`` and ``feature_columns`` attributes.
            difficulty: Difficulty level, used to look up the step limit.
            seed: Random seed forwarded to the proxy model.
        """
        self.dataset_config = dataset_config
        self.difficulty = difficulty
        self.seed = seed
        # Falls back to 25 steps for a difficulty missing from MAX_STEPS.
        self.max_steps: int = self.MAX_STEPS.get(difficulty, 25)

        # State - populated on setup()
        self.data_state: DataState | None = None
        self.step_count: int = 0
        self._initial_accuracy: float = 0.0
        self._previous_accuracy: float = 0.0

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def setup(self) -> Observation:
        """Initialize / reset the task. Called by ``ODSEnvironment.reset()``.

        Rebuilds the ``DataState`` from ``dataset_config.df``, zeroes the
        step counter and records the baseline proxy-model accuracy.

        Returns:
            The observation of the freshly reset state.
        """
        self.data_state = DataState(self.dataset_config.df, name='initial')
        self.step_count = 0
        self._initial_accuracy = self.calculate_accuracy()
        self._previous_accuracy = self._initial_accuracy
        return self.build_observation()

    def _require_data_state(self) -> DataState:
        """Return the active ``DataState`` or raise if ``setup()`` was not called."""
        # An explicit raise (rather than ``assert``) keeps the guard active
        # when Python runs with ``-O``.
        if self.data_state is None:
            raise RuntimeError("Call setup() before executing actions")
        return self.data_state

    # ------------------------------------------------------------------
    # Action dispatch
    # ------------------------------------------------------------------
    def execute(self, action: Action) -> StepResult:
        """Validate and apply *action*, compute reward, return ``StepResult``.

        Every call counts as one step, including submits and unsupported
        actions.

        Args:
            action: The action chosen by the agent.

        Returns:
            A ``StepResult``. For a regular action ``info`` carries
            ``old_accuracy``, ``new_accuracy`` and ``accuracy_delta``; for a
            submit it carries ``reason``; for an unsupported action ``error``.

        Raises:
            RuntimeError: If ``setup()`` has not been called yet.
        """
        self._require_data_state()
        self.step_count += 1

        # ----- Submit terminates immediately ------
        if isinstance(action, SubmitAction):
            return StepResult(
                observation=self.build_observation(),
                reward=0.0,
                done=True,
                info={"reason": "Submit"},
            )

        # ----- Validate action type for this task -----
        if action.action_type not in self.SUPPORTED_ACTIONS:
            obs = self.build_observation()
            return StepResult(
                observation=obs,
                reward=-0.1,  # Penalty for invalid action
                # The step was already counted, so the task's own termination
                # rule must still apply here; with a hard-coded ``False`` an
                # agent that only emits invalid actions would never finish.
                done=self.is_done(),
                info={
                    "error": (
                        f"Action `{action.action_type}` not supported "
                        f"by {self.TASK_TYPE.value}"
                    )
                },
            )

        # NOTE: recomputed instead of reusing ``_previous_accuracy`` so the
        # baseline stays correct even if a previous ``apply_action`` raised
        # after partially mutating the data.
        old_accuracy = self.calculate_accuracy()

        # ---- Delegate to the concrete task ----------
        self.apply_action(action)

        new_accuracy = self.calculate_accuracy()
        reward = self.calculate_reward(old_accuracy, new_accuracy, action)
        self._previous_accuracy = new_accuracy

        obs = self.build_observation()
        done = self.is_done()
        return StepResult(
            observation=obs,
            reward=reward,
            done=done,
            info={
                "old_accuracy": old_accuracy,
                "new_accuracy": new_accuracy,
                "accuracy_delta": new_accuracy - old_accuracy,
            },
        )

    # ------------------------------------------------------------------
    # Abstract interface - subclasses must implement these methods
    # ------------------------------------------------------------------
    @abstractmethod
    def apply_action(self, action: Action) -> None:
        """Mutate ``self.data_state`` according to *action*."""

    @abstractmethod
    def calculate_reward(
        self, old_accuracy: float, new_accuracy: float, action: Action
    ) -> float:
        """Return the scalar reward for a single step."""

    @abstractmethod
    def is_done(self) -> bool:
        """Return ``True`` when the episode should end (besides submit)."""

    @abstractmethod
    def grade(self) -> Dict[str, Any]:
        """Score the final state. Must return a dict containing ``"score"``."""

    @abstractmethod
    def get_goal_description(self) -> str:
        """Return a human-readable string describing the agent's goal."""

    # ------------------------------------------------------------------
    # Observation builder
    # ------------------------------------------------------------------
    def build_observation(self) -> Observation:
        """Construct an ``Observation`` from the current ``DataState``.

        Collects per-column statistics (dtype, null count / percentage,
        unique count, numeric flag), the first rows of the frame, and the
        current proxy-model accuracy.

        Raises:
            RuntimeError: If ``setup()`` has not been called yet.
        """
        df = self._require_data_state().df
        n_rows = len(df)

        columns: List[ColumnInfo] = []
        total_nulls = 0
        for col in df.columns:
            series = df[col]
            null_count = int(series.isnull().sum())
            total_nulls += null_count
            columns.append(
                ColumnInfo(
                    name=col,
                    dtype=str(series.dtype),
                    null_count=null_count,
                    # Guard against division by zero on an empty frame.
                    null_percentage=(
                        round(null_count / n_rows * 100, 2)
                        if n_rows > 0
                        else 0.0
                    ),
                    unique_count=int(series.nunique()),
                    is_numeric=pd.api.types.is_numeric_dtype(series),
                )
            )

        return Observation(
            columns=columns,
            sample_head=df.head().to_dict(orient="list"),
            shape=tuple(df.shape),
            current_accuracy=self.calculate_accuracy(),
            step_count=self.step_count,
            nulls_remaining=total_nulls,
            task_type=self.TASK_TYPE.value,
            difficulty=self.difficulty.value,
            goal_description=self.get_goal_description(),
            # "submit" is always available, whatever the task supports.
            available_actions=sorted(self.SUPPORTED_ACTIONS | {"submit"}),
        )

    # =====================================================================
    # Proxy model accuracy (shared by all tasks)
    # =====================================================================
    def calculate_accuracy(self) -> float:
        """5-fold CV accuracy using a LogisticRegression proxy model.

        Only complete (non-null) rows are used. Returns 0.0 when fewer than
        10 complete rows remain, when the target column or every feature
        column is missing, or when fitting the model fails (the failure is
        logged with its traceback).

        Raises:
            RuntimeError: If ``setup()`` has not been called yet.
        """
        df = self._require_data_state().df.dropna()
        target = self.dataset_config.target_column
        features = [
            c for c in self.dataset_config.feature_columns if c in df.columns
        ]
        if len(df) < 10 or target not in df.columns or not features:
            return 0.0

        try:
            X = df[features].copy()
            y = df[target]

            # Encode categorical / string columns
            for col in X.columns:
                if not pd.api.types.is_numeric_dtype(X[col]):
                    le = LabelEncoder()
                    X[col] = le.fit_transform(X[col].astype(str))
            X_arr = X.values.astype(float)

            n_folds = min(5, len(df))
            model = LogisticRegression(
                max_iter=1000,
                random_state=self.seed,
                solver="lbfgs",
            )
            scores = cross_val_score(
                model, X_arr, y, cv=n_folds, scoring="accuracy",
            )
            return float(np.mean(scores))
        except Exception:
            # Deliberate best-effort: a failing proxy model must not crash
            # the episode, but the failure should be visible in the logs.
            logging.getLogger(__name__).warning(
                "Proxy-model accuracy computation failed; returning 0.0",
                exc_info=True,
            )
            return 0.0