Spaces:
Running
Running
| """Load parameters extracted from exploratory data analysis. | |
| This module reads all parameter files generated by the EDA pipeline and makes | |
| them available to the scheduler. | |
| """ | |
| import json | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| import pandas as pd | |
| from src.data.config import get_latest_params_dir | |
class ParameterLoader:
    """Loads and manages parameters from EDA outputs.

    Each raw artifact (a CSV or JSON file produced by the EDA pipeline) is
    read lazily on first access and memoized on the instance.

    Performance notes:
        - Builds in-memory lookup caches to avoid repeated DataFrame filtering.
    """

    def __init__(self, params_dir: Optional[Path] = None):
        """Initialize parameter loader.

        Args:
            params_dir: Directory containing parameter files. If None, uses
                the latest directory reported by ``get_latest_params_dir()``.
        """
        self.params_dir = params_dir or get_latest_params_dir()

        # Lazily loaded raw artifacts (None until first accessed).
        self._transition_probs: Optional[pd.DataFrame] = None
        self._stage_duration: Optional[pd.DataFrame] = None
        self._court_capacity: Optional[Dict] = None
        self._adjournment_proxies: Optional[pd.DataFrame] = None
        self._case_type_summary: Optional[pd.DataFrame] = None
        self._transition_entropy: Optional[pd.DataFrame] = None

        # Derived lookup caches, built on demand from the DataFrames above.
        # stage -> {"median": x, "p90": y}
        self._duration_map: Optional[Dict[str, Dict[str, float]]] = None
        # stage_from -> [(stage_to, cum_p), ...] with cum_p non-decreasing
        self._transitions_map: Optional[Dict[str, List[tuple]]] = None
        # stage -> {case_type: p_adj}
        self._adj_map: Optional[Dict[str, Dict[str, float]]] = None

    # NOTE: the six accessors below are @property because every call site in
    # this class uses attribute access (e.g. ``self.transition_probs``,
    # ``self.court_capacity[...]``). Without the decorator those sites would
    # operate on the bound-method object and raise TypeError.

    @property
    def transition_probs(self) -> pd.DataFrame:
        """Stage transition probabilities.

        Returns:
            DataFrame with columns: STAGE_FROM, STAGE_TO, N, row_n, p
        """
        if self._transition_probs is None:
            file_path = self.params_dir / "stage_transition_probs.csv"
            self._transition_probs = pd.read_csv(file_path)
        return self._transition_probs

    def get_transition_prob(self, stage_from: str, stage_to: str) -> float:
        """Get probability of transitioning from one stage to another.

        Args:
            stage_from: Current stage
            stage_to: Next stage

        Returns:
            Transition probability (0-1); 0.0 if the pair is not recorded.
        """
        df = self.transition_probs
        match = df[(df["STAGE_FROM"] == stage_from) & (df["STAGE_TO"] == stage_to)]
        if len(match) == 0:
            return 0.0
        return float(match.iloc[0]["p"])

    def _build_transitions_map(self) -> None:
        """Build the stage_from -> [(stage_to, cum_p), ...] cache once."""
        if self._transitions_map is not None:
            return
        df = self.transition_probs
        self._transitions_map = {}
        # Group by STAGE_FROM and build cumulative probabilities so callers
        # can sample a next stage with a single uniform draw.
        for st_from, group in df.groupby("STAGE_FROM"):
            cum = 0.0
            lst = []
            for _, row in group.sort_values("p").iterrows():
                cum += float(row["p"])
                lst.append((str(row["STAGE_TO"]), cum))
            # Force the final cumulative value to exactly 1.0 to guard
            # against floating-point rounding in the summed probabilities.
            if lst:
                to_last, _ = lst[-1]
                lst[-1] = (to_last, 1.0)
            self._transitions_map[str(st_from)] = lst

    def get_stage_transitions(self, stage_from: str) -> pd.DataFrame:
        """Get all possible transitions from a given stage.

        Args:
            stage_from: Current stage

        Returns:
            DataFrame with STAGE_TO and p columns
        """
        df = self.transition_probs
        return df[df["STAGE_FROM"] == stage_from][["STAGE_TO", "p"]].reset_index(
            drop=True
        )

    def get_stage_transitions_fast(self, stage_from: str) -> List[tuple]:
        """Fast lookup: returns list of (stage_to, cum_p)."""
        self._build_transitions_map()
        if not self._transitions_map:
            return []
        return self._transitions_map.get(stage_from, [])

    @property
    def stage_duration(self) -> pd.DataFrame:
        """Stage duration statistics.

        Returns:
            DataFrame with columns: STAGE, RUN_MEDIAN_DAYS, RUN_P90_DAYS,
            HEARINGS_PER_RUN_MED, N_RUNS
        """
        if self._stage_duration is None:
            file_path = self.params_dir / "stage_duration.csv"
            self._stage_duration = pd.read_csv(file_path)
        return self._stage_duration

    def _build_duration_map(self) -> None:
        """Build the stage -> {"median": x, "p90": y} cache once."""
        if self._duration_map is not None:
            return
        df = self.stage_duration
        self._duration_map = {}
        for _, row in df.iterrows():
            st = str(row["STAGE"])
            self._duration_map.setdefault(st, {})
            self._duration_map[st]["median"] = float(row["RUN_MEDIAN_DAYS"])
            self._duration_map[st]["p90"] = float(row["RUN_P90_DAYS"])

    def get_stage_duration(self, stage: str, percentile: str = "median") -> float:
        """Get typical duration for a stage.

        Args:
            stage: Stage name
            percentile: 'median' or 'p90' (anything other than 'median'
                falls back to 'p90')

        Returns:
            Duration in days; 30.0 when the stage is unknown.
        """
        self._build_duration_map()
        if not self._duration_map or stage not in self._duration_map:
            return 30.0
        p = "median" if percentile == "median" else "p90"
        return float(self._duration_map[stage].get(p, 30.0))

    @property
    def court_capacity(self) -> Dict:
        """Court capacity metrics.

        Returns:
            Dict with keys: slots_median_global, slots_p90_global
        """
        if self._court_capacity is None:
            file_path = self.params_dir / "court_capacity_global.json"
            with open(file_path, "r") as f:
                self._court_capacity = json.load(f)
        return self._court_capacity

    def daily_capacity_median(self) -> int:
        """Median daily capacity per courtroom."""
        return int(self.court_capacity["slots_median_global"])

    def daily_capacity_p90(self) -> int:
        """90th percentile daily capacity per courtroom."""
        return int(self.court_capacity["slots_p90_global"])

    @property
    def adjournment_proxies(self) -> pd.DataFrame:
        """Adjournment probabilities by stage and case type.

        Returns:
            DataFrame with columns: Remappedstages, casetype,
            p_adjourn_proxy, p_not_reached_proxy, n
        """
        if self._adjournment_proxies is None:
            file_path = self.params_dir / "adjournment_proxies.csv"
            self._adjournment_proxies = pd.read_csv(file_path)
        return self._adjournment_proxies

    def _build_adj_map(self) -> None:
        """Build the stage -> {case_type: p_adj} cache once."""
        if self._adj_map is not None:
            return
        df = self.adjournment_proxies
        self._adj_map = {}
        for _, row in df.iterrows():
            st = str(row["Remappedstages"])
            ct = str(row["casetype"])
            p = float(row["p_adjourn_proxy"])
            self._adj_map.setdefault(st, {})[ct] = p

    def get_adjournment_prob(self, stage: str, case_type: str) -> float:
        """Get probability of adjournment for given stage and case type.

        Args:
            stage: Stage name
            case_type: Case type (e.g., 'RSA', 'CRP')

        Returns:
            Adjournment probability (0-1). Falls back to the stage average
            when the case type is unknown, and to 0.4 when the stage is too.
        """
        self._build_adj_map()
        if not self._adj_map:
            return 0.4
        if stage in self._adj_map and case_type in self._adj_map[stage]:
            return float(self._adj_map[stage][case_type])
        # fallback: average across types for this stage
        if stage in self._adj_map and self._adj_map[stage]:
            vals = list(self._adj_map[stage].values())
            return float(sum(vals) / len(vals))
        return 0.4

    @property
    def case_type_summary(self) -> pd.DataFrame:
        """Summary statistics by case type.

        Returns:
            DataFrame with columns: CASE_TYPE, n_cases, disp_median,
            disp_p90, hear_median, gap_median
        """
        if self._case_type_summary is None:
            file_path = self.params_dir / "case_type_summary.csv"
            self._case_type_summary = pd.read_csv(file_path)
        return self._case_type_summary

    def get_case_type_stats(self, case_type: str) -> Dict:
        """Get statistics for a specific case type.

        Args:
            case_type: Case type (e.g., 'RSA', 'CRP')

        Returns:
            Dict with disp_median, disp_p90, hear_median, gap_median

        Raises:
            ValueError: If the case type is not present in the summary.
        """
        df = self.case_type_summary
        match = df[df["CASE_TYPE"] == case_type]
        if len(match) == 0:
            raise ValueError(f"Unknown case type: {case_type}")
        return match.iloc[0].to_dict()

    @property
    def transition_entropy(self) -> pd.DataFrame:
        """Stage transition entropy (predictability metric).

        Returns:
            DataFrame with columns: STAGE_FROM, entropy
        """
        if self._transition_entropy is None:
            file_path = self.params_dir / "stage_transition_entropy.csv"
            self._transition_entropy = pd.read_csv(file_path)
        return self._transition_entropy

    def get_stage_predictability(self, stage: str) -> float:
        """Get predictability of transitions from a stage (inverse of entropy).

        Args:
            stage: Stage name

        Returns:
            Predictability score (0-1, higher = more predictable)
        """
        df = self.transition_entropy
        match = df[df["STAGE_FROM"] == stage]
        if len(match) == 0:
            return 0.5  # Default: medium predictability
        entropy = float(match.iloc[0]["entropy"])
        # Convert entropy to predictability (lower entropy = higher predictability)
        # Max entropy ~1.4, so normalize
        predictability = max(0.0, 1.0 - (entropy / 1.5))
        return predictability

    def get_stage_stationary_distribution(self) -> Dict[str, float]:
        """Approximate stationary distribution over stages from transition matrix.

        Returns stage -> probability summing to 1.0.
        """
        df = self.transition_probs.copy()
        # drop nulls and ensure strings
        df = df[df["STAGE_FROM"].notna() & df["STAGE_TO"].notna()]
        df["STAGE_FROM"] = df["STAGE_FROM"].astype(str)
        df["STAGE_TO"] = df["STAGE_TO"].astype(str)
        stages = sorted(set(df["STAGE_FROM"]).union(set(df["STAGE_TO"])))
        idx = {s: i for i, s in enumerate(stages)}
        n = len(stages)
        # build dense row-stochastic matrix
        P = [[0.0] * n for _ in range(n)]
        for _, row in df.iterrows():
            i = idx[str(row["STAGE_FROM"])]
            j = idx[str(row["STAGE_TO"])]
            P[i][j] += float(row["p"])
        # ensure rows sum to 1 by topping up self-loop; stages that only
        # appear as STAGE_TO become absorbing states (self-loop of 1.0)
        for i in range(n):
            s = sum(P[i])
            if s < 0.999:
                P[i][i] += 1.0 - s
            elif s > 1.001:
                # normalize if slightly over
                P[i] = [v / s for v in P[i]]
        # power iteration from the uniform distribution
        pi = [1.0 / n] * n
        for _ in range(200):
            new = [0.0] * n
            for j in range(n):
                acc = 0.0
                for i in range(n):
                    acc += pi[i] * P[i][j]
                new[j] = acc
            # normalize
            z = sum(new)
            if z == 0:
                break
            new = [v / z for v in new]
            # check convergence (L1 distance between successive iterates)
            if sum(abs(new[k] - pi[k]) for k in range(n)) < 1e-9:
                pi = new
                break
            pi = new
        return {stages[i]: pi[i] for i in range(n)}

    def __repr__(self) -> str:
        return f"ParameterLoader(params_dir={self.params_dir})"
| # Convenience function for quick access | |
def load_parameters(params_dir: Optional[Path] = None) -> ParameterLoader:
    """Construct a :class:`ParameterLoader` over the EDA parameter files.

    Args:
        params_dir: Directory containing parameter files; when omitted the
            latest available parameter directory is used.

    Returns:
        A freshly built ParameterLoader instance.
    """
    return ParameterLoader(params_dir=params_dir)