"""Load parameters extracted from exploratory data analysis. This module reads all parameter files generated by the EDA pipeline and makes them available to the scheduler. """ import json from pathlib import Path from typing import Dict, List, Optional import pandas as pd from src.data.config import get_latest_params_dir class ParameterLoader: """Loads and manages parameters from EDA outputs. Performance notes: - Builds in-memory lookup caches to avoid repeated DataFrame filtering. """ def __init__(self, params_dir: Optional[Path] = None): """Initialize parameter loader. Args: params_dir: Directory containing parameter files. If None, uses latest. """ self.params_dir = params_dir or get_latest_params_dir() # Cached parameters self._transition_probs: Optional[pd.DataFrame] = None self._stage_duration: Optional[pd.DataFrame] = None self._court_capacity: Optional[Dict] = None self._adjournment_proxies: Optional[pd.DataFrame] = None self._case_type_summary: Optional[pd.DataFrame] = None self._transition_entropy: Optional[pd.DataFrame] = None # caches self._duration_map: Optional[Dict[str, Dict[str, float]]] = ( None # stage -> {"median": x, "p90": y} ) self._transitions_map: Optional[Dict[str, List[tuple]]] = ( None # stage_from -> [(stage_to, cum_p), ...] ) self._adj_map: Optional[Dict[str, Dict[str, float]]] = ( None # stage -> {case_type: p_adj} ) @property def transition_probs(self) -> pd.DataFrame: """Stage transition probabilities. Returns: DataFrame with columns: STAGE_FROM, STAGE_TO, N, row_n, p """ if self._transition_probs is None: file_path = self.params_dir / "stage_transition_probs.csv" self._transition_probs = pd.read_csv(file_path) return self._transition_probs def get_transition_prob(self, stage_from: str, stage_to: str) -> float: """Get probability of transitioning from one stage to another. Args: stage_from: Current stage stage_to: Next stage Returns: Transition probability (0-1) """ df = self.transition_probs match = df[(df["STAGE_FROM"] == stage_from) & (df["STAGE_TO"] == stage_to)] if len(match) == 0: return 0.0 return float(match.iloc[0]["p"]) def _build_transitions_map(self) -> None: if self._transitions_map is not None: return df = self.transition_probs self._transitions_map = {} # group by STAGE_FROM, build cumulative probs for fast sampling for st_from, group in df.groupby("STAGE_FROM"): cum = 0.0 lst = [] for _, row in group.sort_values("p").iterrows(): cum += float(row["p"]) lst.append((str(row["STAGE_TO"]), cum)) # ensure last cum is 1.0 to guard against rounding if lst: to_last, _ = lst[-1] lst[-1] = (to_last, 1.0) self._transitions_map[str(st_from)] = lst def get_stage_transitions(self, stage_from: str) -> pd.DataFrame: """Get all possible transitions from a given stage. Args: stage_from: Current stage Returns: DataFrame with STAGE_TO and p columns """ df = self.transition_probs return df[df["STAGE_FROM"] == stage_from][["STAGE_TO", "p"]].reset_index( drop=True ) def get_stage_transitions_fast(self, stage_from: str) -> List[tuple]: """Fast lookup: returns list of (stage_to, cum_p).""" self._build_transitions_map() if not self._transitions_map: return [] return self._transitions_map.get(stage_from, []) @property def stage_duration(self) -> pd.DataFrame: """Stage duration statistics. Returns: DataFrame with columns: STAGE, RUN_MEDIAN_DAYS, RUN_P90_DAYS, HEARINGS_PER_RUN_MED, N_RUNS """ if self._stage_duration is None: file_path = self.params_dir / "stage_duration.csv" self._stage_duration = pd.read_csv(file_path) return self._stage_duration def _build_duration_map(self) -> None: if self._duration_map is not None: return df = self.stage_duration self._duration_map = {} for _, row in df.iterrows(): st = str(row["STAGE"]) self._duration_map.setdefault(st, {}) self._duration_map[st]["median"] = float(row["RUN_MEDIAN_DAYS"]) self._duration_map[st]["p90"] = float(row["RUN_P90_DAYS"]) def get_stage_duration(self, stage: str, percentile: str = "median") -> float: """Get typical duration for a stage. Args: stage: Stage name percentile: 'median' or 'p90' Returns: Duration in days """ self._build_duration_map() if not self._duration_map or stage not in self._duration_map: return 30.0 p = "median" if percentile == "median" else "p90" return float(self._duration_map[stage].get(p, 30.0)) @property def court_capacity(self) -> Dict: """Court capacity metrics. Returns: Dict with keys: slots_median_global, slots_p90_global """ if self._court_capacity is None: file_path = self.params_dir / "court_capacity_global.json" with open(file_path, "r") as f: self._court_capacity = json.load(f) return self._court_capacity @property def daily_capacity_median(self) -> int: """Median daily capacity per courtroom.""" return int(self.court_capacity["slots_median_global"]) @property def daily_capacity_p90(self) -> int: """90th percentile daily capacity per courtroom.""" return int(self.court_capacity["slots_p90_global"]) @property def adjournment_proxies(self) -> pd.DataFrame: """Adjournment probabilities by stage and case type. Returns: DataFrame with columns: Remappedstages, casetype, p_adjourn_proxy, p_not_reached_proxy, n """ if self._adjournment_proxies is None: file_path = self.params_dir / "adjournment_proxies.csv" self._adjournment_proxies = pd.read_csv(file_path) return self._adjournment_proxies def _build_adj_map(self) -> None: if self._adj_map is not None: return df = self.adjournment_proxies self._adj_map = {} for _, row in df.iterrows(): st = str(row["Remappedstages"]) ct = str(row["casetype"]) p = float(row["p_adjourn_proxy"]) self._adj_map.setdefault(st, {})[ct] = p def get_adjournment_prob(self, stage: str, case_type: str) -> float: """Get probability of adjournment for given stage and case type. Args: stage: Stage name case_type: Case type (e.g., 'RSA', 'CRP') Returns: Adjournment probability (0-1) """ self._build_adj_map() if not self._adj_map: return 0.4 if stage in self._adj_map and case_type in self._adj_map[stage]: return float(self._adj_map[stage][case_type]) # fallback: average across types for this stage if stage in self._adj_map and self._adj_map[stage]: vals = list(self._adj_map[stage].values()) return float(sum(vals) / len(vals)) return 0.4 @property def case_type_summary(self) -> pd.DataFrame: """Summary statistics by case type. Returns: DataFrame with columns: CASE_TYPE, n_cases, disp_median, disp_p90, hear_median, gap_median """ if self._case_type_summary is None: file_path = self.params_dir / "case_type_summary.csv" self._case_type_summary = pd.read_csv(file_path) return self._case_type_summary def get_case_type_stats(self, case_type: str) -> Dict: """Get statistics for a specific case type. Args: case_type: Case type (e.g., 'RSA', 'CRP') Returns: Dict with disp_median, disp_p90, hear_median, gap_median """ df = self.case_type_summary match = df[df["CASE_TYPE"] == case_type] if len(match) == 0: raise ValueError(f"Unknown case type: {case_type}") return match.iloc[0].to_dict() @property def transition_entropy(self) -> pd.DataFrame: """Stage transition entropy (predictability metric). Returns: DataFrame with columns: STAGE_FROM, entropy """ if self._transition_entropy is None: file_path = self.params_dir / "stage_transition_entropy.csv" self._transition_entropy = pd.read_csv(file_path) return self._transition_entropy def get_stage_predictability(self, stage: str) -> float: """Get predictability of transitions from a stage (inverse of entropy). Args: stage: Stage name Returns: Predictability score (0-1, higher = more predictable) """ df = self.transition_entropy match = df[df["STAGE_FROM"] == stage] if len(match) == 0: return 0.5 # Default: medium predictability entropy = float(match.iloc[0]["entropy"]) # Convert entropy to predictability (lower entropy = higher predictability) # Max entropy ~1.4, so normalize predictability = max(0.0, 1.0 - (entropy / 1.5)) return predictability def get_stage_stationary_distribution(self) -> Dict[str, float]: """Approximate stationary distribution over stages from transition matrix. Returns stage -> probability summing to 1.0. """ df = self.transition_probs.copy() # drop nulls and ensure strings df = df[df["STAGE_FROM"].notna() & df["STAGE_TO"].notna()] df["STAGE_FROM"] = df["STAGE_FROM"].astype(str) df["STAGE_TO"] = df["STAGE_TO"].astype(str) stages = sorted(set(df["STAGE_FROM"]).union(set(df["STAGE_TO"]))) idx = {s: i for i, s in enumerate(stages)} n = len(stages) # build dense row-stochastic matrix P = [[0.0] * n for _ in range(n)] for _, row in df.iterrows(): i = idx[str(row["STAGE_FROM"])] j = idx[str(row["STAGE_TO"])] P[i][j] += float(row["p"]) # ensure rows sum to 1 by topping up self-loop for i in range(n): s = sum(P[i]) if s < 0.999: P[i][i] += 1.0 - s elif s > 1.001: # normalize if slightly over P[i] = [v / s for v in P[i]] # power iteration pi = [1.0 / n] * n for _ in range(200): new = [0.0] * n for j in range(n): acc = 0.0 for i in range(n): acc += pi[i] * P[i][j] new[j] = acc # normalize z = sum(new) if z == 0: break new = [v / z for v in new] # check convergence if sum(abs(new[k] - pi[k]) for k in range(n)) < 1e-9: pi = new break pi = new return {stages[i]: pi[i] for i in range(n)} def __repr__(self) -> str: return f"ParameterLoader(params_dir={self.params_dir})" # Convenience function for quick access def load_parameters(params_dir: Optional[Path] = None) -> ParameterLoader: """Load parameters from EDA outputs. Args: params_dir: Directory containing parameter files. If None, uses latest. Returns: ParameterLoader instance """ return ParameterLoader(params_dir)