hackathon_code4change / src /data /param_loader.py
RoyAalekh's picture
refactored project structure. renamed scheduler dir to src
6a28f91
"""Load parameters extracted from exploratory data analysis.
This module reads all parameter files generated by the EDA pipeline and makes
them available to the scheduler.
"""
import json
from pathlib import Path
from typing import Dict, List, Optional
import pandas as pd
from src.data.config import get_latest_params_dir
class ParameterLoader:
"""Loads and manages parameters from EDA outputs.
Performance notes:
- Builds in-memory lookup caches to avoid repeated DataFrame filtering.
"""
def __init__(self, params_dir: Optional[Path] = None):
"""Initialize parameter loader.
Args:
params_dir: Directory containing parameter files. If None, uses latest.
"""
self.params_dir = params_dir or get_latest_params_dir()
# Cached parameters
self._transition_probs: Optional[pd.DataFrame] = None
self._stage_duration: Optional[pd.DataFrame] = None
self._court_capacity: Optional[Dict] = None
self._adjournment_proxies: Optional[pd.DataFrame] = None
self._case_type_summary: Optional[pd.DataFrame] = None
self._transition_entropy: Optional[pd.DataFrame] = None
# caches
self._duration_map: Optional[Dict[str, Dict[str, float]]] = (
None # stage -> {"median": x, "p90": y}
)
self._transitions_map: Optional[Dict[str, List[tuple]]] = (
None # stage_from -> [(stage_to, cum_p), ...]
)
self._adj_map: Optional[Dict[str, Dict[str, float]]] = (
None # stage -> {case_type: p_adj}
)
@property
def transition_probs(self) -> pd.DataFrame:
"""Stage transition probabilities.
Returns:
DataFrame with columns: STAGE_FROM, STAGE_TO, N, row_n, p
"""
if self._transition_probs is None:
file_path = self.params_dir / "stage_transition_probs.csv"
self._transition_probs = pd.read_csv(file_path)
return self._transition_probs
def get_transition_prob(self, stage_from: str, stage_to: str) -> float:
"""Get probability of transitioning from one stage to another.
Args:
stage_from: Current stage
stage_to: Next stage
Returns:
Transition probability (0-1)
"""
df = self.transition_probs
match = df[(df["STAGE_FROM"] == stage_from) & (df["STAGE_TO"] == stage_to)]
if len(match) == 0:
return 0.0
return float(match.iloc[0]["p"])
def _build_transitions_map(self) -> None:
if self._transitions_map is not None:
return
df = self.transition_probs
self._transitions_map = {}
# group by STAGE_FROM, build cumulative probs for fast sampling
for st_from, group in df.groupby("STAGE_FROM"):
cum = 0.0
lst = []
for _, row in group.sort_values("p").iterrows():
cum += float(row["p"])
lst.append((str(row["STAGE_TO"]), cum))
# ensure last cum is 1.0 to guard against rounding
if lst:
to_last, _ = lst[-1]
lst[-1] = (to_last, 1.0)
self._transitions_map[str(st_from)] = lst
def get_stage_transitions(self, stage_from: str) -> pd.DataFrame:
"""Get all possible transitions from a given stage.
Args:
stage_from: Current stage
Returns:
DataFrame with STAGE_TO and p columns
"""
df = self.transition_probs
return df[df["STAGE_FROM"] == stage_from][["STAGE_TO", "p"]].reset_index(
drop=True
)
def get_stage_transitions_fast(self, stage_from: str) -> List[tuple]:
"""Fast lookup: returns list of (stage_to, cum_p)."""
self._build_transitions_map()
if not self._transitions_map:
return []
return self._transitions_map.get(stage_from, [])
@property
def stage_duration(self) -> pd.DataFrame:
"""Stage duration statistics.
Returns:
DataFrame with columns: STAGE, RUN_MEDIAN_DAYS, RUN_P90_DAYS,
HEARINGS_PER_RUN_MED, N_RUNS
"""
if self._stage_duration is None:
file_path = self.params_dir / "stage_duration.csv"
self._stage_duration = pd.read_csv(file_path)
return self._stage_duration
def _build_duration_map(self) -> None:
if self._duration_map is not None:
return
df = self.stage_duration
self._duration_map = {}
for _, row in df.iterrows():
st = str(row["STAGE"])
self._duration_map.setdefault(st, {})
self._duration_map[st]["median"] = float(row["RUN_MEDIAN_DAYS"])
self._duration_map[st]["p90"] = float(row["RUN_P90_DAYS"])
def get_stage_duration(self, stage: str, percentile: str = "median") -> float:
"""Get typical duration for a stage.
Args:
stage: Stage name
percentile: 'median' or 'p90'
Returns:
Duration in days
"""
self._build_duration_map()
if not self._duration_map or stage not in self._duration_map:
return 30.0
p = "median" if percentile == "median" else "p90"
return float(self._duration_map[stage].get(p, 30.0))
@property
def court_capacity(self) -> Dict:
"""Court capacity metrics.
Returns:
Dict with keys: slots_median_global, slots_p90_global
"""
if self._court_capacity is None:
file_path = self.params_dir / "court_capacity_global.json"
with open(file_path, "r") as f:
self._court_capacity = json.load(f)
return self._court_capacity
@property
def daily_capacity_median(self) -> int:
"""Median daily capacity per courtroom."""
return int(self.court_capacity["slots_median_global"])
@property
def daily_capacity_p90(self) -> int:
"""90th percentile daily capacity per courtroom."""
return int(self.court_capacity["slots_p90_global"])
@property
def adjournment_proxies(self) -> pd.DataFrame:
"""Adjournment probabilities by stage and case type.
Returns:
DataFrame with columns: Remappedstages, casetype,
p_adjourn_proxy, p_not_reached_proxy, n
"""
if self._adjournment_proxies is None:
file_path = self.params_dir / "adjournment_proxies.csv"
self._adjournment_proxies = pd.read_csv(file_path)
return self._adjournment_proxies
def _build_adj_map(self) -> None:
if self._adj_map is not None:
return
df = self.adjournment_proxies
self._adj_map = {}
for _, row in df.iterrows():
st = str(row["Remappedstages"])
ct = str(row["casetype"])
p = float(row["p_adjourn_proxy"])
self._adj_map.setdefault(st, {})[ct] = p
def get_adjournment_prob(self, stage: str, case_type: str) -> float:
"""Get probability of adjournment for given stage and case type.
Args:
stage: Stage name
case_type: Case type (e.g., 'RSA', 'CRP')
Returns:
Adjournment probability (0-1)
"""
self._build_adj_map()
if not self._adj_map:
return 0.4
if stage in self._adj_map and case_type in self._adj_map[stage]:
return float(self._adj_map[stage][case_type])
# fallback: average across types for this stage
if stage in self._adj_map and self._adj_map[stage]:
vals = list(self._adj_map[stage].values())
return float(sum(vals) / len(vals))
return 0.4
@property
def case_type_summary(self) -> pd.DataFrame:
"""Summary statistics by case type.
Returns:
DataFrame with columns: CASE_TYPE, n_cases, disp_median,
disp_p90, hear_median, gap_median
"""
if self._case_type_summary is None:
file_path = self.params_dir / "case_type_summary.csv"
self._case_type_summary = pd.read_csv(file_path)
return self._case_type_summary
def get_case_type_stats(self, case_type: str) -> Dict:
"""Get statistics for a specific case type.
Args:
case_type: Case type (e.g., 'RSA', 'CRP')
Returns:
Dict with disp_median, disp_p90, hear_median, gap_median
"""
df = self.case_type_summary
match = df[df["CASE_TYPE"] == case_type]
if len(match) == 0:
raise ValueError(f"Unknown case type: {case_type}")
return match.iloc[0].to_dict()
@property
def transition_entropy(self) -> pd.DataFrame:
"""Stage transition entropy (predictability metric).
Returns:
DataFrame with columns: STAGE_FROM, entropy
"""
if self._transition_entropy is None:
file_path = self.params_dir / "stage_transition_entropy.csv"
self._transition_entropy = pd.read_csv(file_path)
return self._transition_entropy
def get_stage_predictability(self, stage: str) -> float:
"""Get predictability of transitions from a stage (inverse of entropy).
Args:
stage: Stage name
Returns:
Predictability score (0-1, higher = more predictable)
"""
df = self.transition_entropy
match = df[df["STAGE_FROM"] == stage]
if len(match) == 0:
return 0.5 # Default: medium predictability
entropy = float(match.iloc[0]["entropy"])
# Convert entropy to predictability (lower entropy = higher predictability)
# Max entropy ~1.4, so normalize
predictability = max(0.0, 1.0 - (entropy / 1.5))
return predictability
def get_stage_stationary_distribution(self) -> Dict[str, float]:
"""Approximate stationary distribution over stages from transition matrix.
Returns stage -> probability summing to 1.0.
"""
df = self.transition_probs.copy()
# drop nulls and ensure strings
df = df[df["STAGE_FROM"].notna() & df["STAGE_TO"].notna()]
df["STAGE_FROM"] = df["STAGE_FROM"].astype(str)
df["STAGE_TO"] = df["STAGE_TO"].astype(str)
stages = sorted(set(df["STAGE_FROM"]).union(set(df["STAGE_TO"])))
idx = {s: i for i, s in enumerate(stages)}
n = len(stages)
# build dense row-stochastic matrix
P = [[0.0] * n for _ in range(n)]
for _, row in df.iterrows():
i = idx[str(row["STAGE_FROM"])]
j = idx[str(row["STAGE_TO"])]
P[i][j] += float(row["p"])
# ensure rows sum to 1 by topping up self-loop
for i in range(n):
s = sum(P[i])
if s < 0.999:
P[i][i] += 1.0 - s
elif s > 1.001:
# normalize if slightly over
P[i] = [v / s for v in P[i]]
# power iteration
pi = [1.0 / n] * n
for _ in range(200):
new = [0.0] * n
for j in range(n):
acc = 0.0
for i in range(n):
acc += pi[i] * P[i][j]
new[j] = acc
# normalize
z = sum(new)
if z == 0:
break
new = [v / z for v in new]
# check convergence
if sum(abs(new[k] - pi[k]) for k in range(n)) < 1e-9:
pi = new
break
pi = new
return {stages[i]: pi[i] for i in range(n)}
def __repr__(self) -> str:
return f"ParameterLoader(params_dir={self.params_dir})"
# Convenience function for quick access
def load_parameters(params_dir: Optional[Path] = None) -> ParameterLoader:
"""Load parameters from EDA outputs.
Args:
params_dir: Directory containing parameter files. If None, uses latest.
Returns:
ParameterLoader instance
"""
return ParameterLoader(params_dir)