msrishav's picture
Add inference code, config, and technical report
e68eb1c verified
Raw
History Blame Contribute Delete
4.26 kB
"""Official-style row-by-row evaluation without sequence-boundary leakage."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import numpy as np
import pandas as pd
from competition_package.utils import DataPoint
from src.data.protocol import get_feature_columns, validate_wunder_dataframe
from src.utils.metrics import compute_r2_per_feature, compute_r2_score
@dataclass(frozen=True)
class StepwiseScoreResult:
    """Immutable summary produced by one ``StepwiseScorer.score`` run."""

    # Aggregate R^2 over all scored (target, prediction) pairs.
    mean_r2: float
    # R^2 broken down by feature; keyed by feature column name.
    r2_per_feature: dict[str, float]
    # Number of predictions that were paired with a target and scored.
    n_predictions_scored: int
    # Number of rows on which a prediction was requested from the model.
    n_predictions_requested: int
    # Number of sequences whose last row produced a prediction that has no
    # following row to be scored against.
    n_predictions_dropped_final_step: int
    # Feature column names, in the order used for the state vectors.
    feature_columns: list[str]
class StepwiseScorer:
    """Replay held-out sequences through the competition `predict` API.

    Rows are fed to the model one at a time, sequence by sequence. The
    prediction returned for one row is scored against the state of the next
    row of the *same* sequence. A prediction returned for the last row of a
    sequence has no target; it is counted as dropped instead of being
    compared with the first row of the following sequence.
    """

    def __init__(self, dataframe: pd.DataFrame):
        """Sort *dataframe* by sequence and step and record its feature columns.

        Args:
            dataframe: Frame holding ``seq_ix``, ``step_in_seq`` and
                ``need_prediction`` columns plus the feature columns.
        """
        self.dataset = dataframe.sort_values(["seq_ix", "step_in_seq"]).reset_index(drop=True)
        # The project validator's return value is used as the ordered list
        # of feature column names.
        self.feature_columns = validate_wunder_dataframe(self.dataset)
        self.dim = len(self.feature_columns)

    @classmethod
    def from_parquet(cls, path: str, seq_ids: Optional[list[int]] = None) -> "StepwiseScorer":
        """Build a scorer from a parquet file.

        Args:
            path: Location of the parquet file to read.
            seq_ids: When given, only rows whose ``seq_ix`` is in this
                collection are kept; ``None`` keeps every sequence.

        Returns:
            A scorer over the (optionally filtered) frame.
        """
        df = pd.read_parquet(path)
        if seq_ids is not None:
            df = df[df["seq_ix"].isin(set(seq_ids))].copy()
        return cls(df)

    def score(self, model) -> StepwiseScoreResult:
        """Run *model* over every sequence and compute R^2 of its predictions.

        Args:
            model: Object exposing ``predict(data_point)``. It must return a
                finite ``np.ndarray`` of shape ``(self.dim,)`` when the data
                point needs a prediction and ``None`` otherwise.

        Returns:
            A ``StepwiseScoreResult`` with the aggregate and per-feature R^2
            together with the scored / requested / dropped counters.

        Raises:
            ValueError: If the model returns an invalid prediction, or if no
                prediction could be paired with a target.
            AssertionError: If a pending prediction would be scored against
                a row of a different sequence (internal invariant).
        """
        predictions: list[np.ndarray] = []
        targets: list[np.ndarray] = []
        requested = 0
        dropped_final = 0

        for _, seq_df in self.dataset.groupby("seq_ix", sort=True):
            # Pending state is reset per sequence so that nothing carries
            # over a sequence boundary.
            pending_prediction = None
            pending_seq_ix = None
            rows = zip(
                seq_df["seq_ix"].to_numpy(),
                seq_df["step_in_seq"].to_numpy(),
                seq_df["need_prediction"].to_numpy(),
                seq_df[self.feature_columns].to_numpy(dtype=np.float32),
            )
            for raw_seq_ix, raw_step, raw_need, state in rows:
                seq_ix = int(raw_seq_ix)
                step_in_seq = int(raw_step)
                need_prediction = bool(raw_need)

                # The prediction made on the previous row is scored against
                # the state observed on this row.
                if pending_prediction is not None:
                    if pending_seq_ix != seq_ix:
                        raise AssertionError("Internal scorer leaked across sequences")
                    predictions.append(pending_prediction)
                    targets.append(state)

                # The model receives its own copy of the state: `state` is
                # also kept as a scoring target, so an in-place transform
                # inside the model must not be able to alter it.
                data_point = DataPoint(seq_ix, step_in_seq, need_prediction, state.copy())
                next_prediction = model.predict(data_point)
                self._check_prediction(data_point, next_prediction)
                if need_prediction:
                    requested += 1

                # `_check_prediction` guarantees an ndarray here whenever a
                # prediction was needed. Keep a private copy so that a model
                # reusing its output buffer cannot overwrite a prediction
                # that is still waiting to be scored.
                pending_prediction = next_prediction.copy() if need_prediction else None
                pending_seq_ix = seq_ix

            # A prediction made on the last row of a sequence has no target.
            if pending_prediction is not None:
                dropped_final += 1

        if not predictions:
            raise ValueError("No predictions were scored")

        y_pred = np.asarray(predictions, dtype=np.float64)
        y_true = np.asarray(targets, dtype=np.float64)
        return StepwiseScoreResult(
            mean_r2=compute_r2_score(y_true, y_pred),
            r2_per_feature=compute_r2_per_feature(y_true, y_pred, self.feature_columns),
            n_predictions_scored=len(predictions),
            n_predictions_requested=requested,
            n_predictions_dropped_final_step=dropped_final,
            feature_columns=self.feature_columns,
        )

    def _check_prediction(self, data_point: DataPoint, prediction: Optional[np.ndarray]) -> None:
        """Validate what the model returned for *data_point*.

        Args:
            data_point: The row handed to the model; only its
                ``need_prediction`` flag is inspected here.
            prediction: The value returned by ``model.predict``.

        Raises:
            ValueError: If a prediction is returned when none is needed, is
                missing when one is needed, is not an ``np.ndarray``, does
                not have shape ``(self.dim,)``, has a dtype that cannot be
                tested for finiteness, or contains NaN / Inf.
        """
        if not data_point.need_prediction:
            if prediction is not None:
                raise ValueError(f"Prediction is not needed for {data_point}")
            return

        if prediction is None:
            raise ValueError(f"Prediction is required for {data_point}")
        if not isinstance(prediction, np.ndarray):
            raise ValueError(f"Prediction must be np.ndarray, got {type(prediction)}")
        if prediction.shape != (self.dim,):
            raise ValueError(f"Prediction has wrong shape: {prediction.shape} != {(self.dim,)}")

        # np.isfinite raises TypeError for dtypes it does not support (e.g.
        # object or string arrays); report that as an invalid prediction
        # like every other check in this method.
        try:
            all_finite = bool(np.isfinite(prediction).all())
        except TypeError as exc:
            raise ValueError(
                f"Prediction must have a numeric dtype, got {prediction.dtype}"
            ) from exc
        if not all_finite:
            raise ValueError("Prediction contains NaN or Inf")