RoyAalekh's picture
enhancements, added view for scheduled cases as tickets
9eaac57
"""Phase 3: Minimal SimPy simulation engine.
This engine simulates daily operations over working days:
- Each day, schedule ready cases up to courtroom capacities using a simple policy (readiness priority)
- For each scheduled case, sample hearing outcome (adjourned vs heard) using EDA adjournment rates
- If heard, sample stage transition using EDA transition probabilities (may dispose the case)
- Track basic KPIs, utilization, and outcomes
This is intentionally lightweight; OR-Tools optimization and richer policies will integrate later.
"""
from __future__ import annotations
import csv
import random
import time
from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path
from typing import Dict, List
from src.core.algorithm import SchedulingAlgorithm, SchedulingResult
from src.core.case import Case, CaseStatus
from src.core.courtroom import Courtroom
from src.core.ripeness import RipenessClassifier
from src.data.config import (
ANNUAL_FILING_RATE,
COURTROOMS,
DEFAULT_DAILY_CAPACITY,
MIN_GAP_BETWEEN_HEARINGS,
MONTHLY_SEASONALITY,
TERMINAL_STAGES,
)
from src.data.param_loader import load_parameters
from src.simulation.allocator import AllocationStrategy, CourtroomAllocator
from src.simulation.events import EventWriter
from src.simulation.policies import get_policy
from src.utils.calendar import CourtCalendar
from src.config.paths import make_new_run_dir
@dataclass
class CourtSimConfig:
    """Settings for a single simulation run.

    ``log_dir`` may be given as any value accepted by ``Path``; it is
    normalised to a ``Path`` instance right after construction.
    """

    start: date  # first calendar date considered by the run
    days: int  # number of court working days to simulate
    seed: int = 42  # seed handed to the global ``random`` module
    courtrooms: int = COURTROOMS
    daily_capacity: int = DEFAULT_DAILY_CAPACITY  # hearings per courtroom per day
    policy: str = "readiness"  # fifo|age|readiness
    duration_percentile: str = "median"  # median|p90
    log_dir: Path | None = None  # if set, write metrics and suggestions
    write_suggestions: bool = False  # if True, write daily suggestion CSVs (slow)

    def __post_init__(self):
        """Coerce ``log_dir`` to ``Path`` when a non-Path value was supplied."""
        already_ok = self.log_dir is None or isinstance(self.log_dir, Path)
        if not already_ok:
            self.log_dir = Path(self.log_dir)
@dataclass
class CourtSimResult:
    """Aggregate figures returned by ``CourtSim.run``."""

    # Hearing counters accumulated over the whole run.
    hearings_total: int
    hearings_heard: int
    hearings_adjourned: int
    disposals: int

    # Scheduled hearings divided by offered capacity (0.0 when no capacity).
    utilization: float
    # Last working day that was simulated.
    end_date: date

    # Ripeness bookkeeping.
    ripeness_transitions: int = 0  # Number of ripeness status changes
    unripe_filtered: int = 0  # Cases filtered out due to unripeness

    # Human-readable end-of-run report.
    insights_text: str = ""  # Collected insights as plain text
class CourtSim:
def __init__(self, config: CourtSimConfig, cases: List[Case]):
    """Set up calendar, parameters, policy, logging, courtrooms and counters.

    Side effects: seeds the global ``random`` module with ``config.seed``,
    creates the configured log directory when one is given (otherwise asks
    ``make_new_run_dir`` for a timestamped one), and (re)writes
    ``metrics.csv`` with its header row.
    """
    self.cfg = config
    self.cases = cases
    self.calendar = CourtCalendar()
    self.params = load_parameters()

    # Prioritisation policy selected by name, then the RNG seed.
    self.policy = get_policy(self.cfg.policy)
    random.seed(self.cfg.seed)

    # (year, month) -> number of working days, filled on demand.
    self._month_working_cache: Dict[tuple, int] = {}

    # Output folder: the explicit one from the config, else a fresh run folder.
    self._log_dir: Path | None = None
    if not self.cfg.log_dir:
        stamp = time.strftime("%Y%m%d_%H%M%S")
        self._log_dir = make_new_run_dir(stamp)
    else:
        self._log_dir = Path(self.cfg.log_dir)
        self._log_dir.mkdir(parents=True, exist_ok=True)

    # Daily metrics file starts with just the header.
    metrics_header = [
        "date",
        "total_cases",
        "scheduled",
        "heard",
        "adjourned",
        "disposals",
        "utilization",
    ]
    self._metrics_path = self._log_dir / "metrics.csv"
    with self._metrics_path.open("w", newline="", encoding="utf-8") as fh:
        csv.writer(fh).writerow(metrics_header)

    # Buffered event log.
    self._events_path = self._log_dir / "events.csv"
    self._events = EventWriter(self._events_path)

    # One courtroom (and judge id J001, J002, ...) per configured room.
    self.rooms = []
    for number in range(1, self.cfg.courtrooms + 1):
        self.rooms.append(
            Courtroom(
                courtroom_id=number,
                judge_id=f"J{number:03d}",
                daily_capacity=self.cfg.daily_capacity,
            )
        )

    # Run-wide counters.
    self._hearings_total = 0
    self._hearings_heard = 0
    self._hearings_adjourned = 0
    self._disposals = 0
    self._capacity_offered = 0

    # Earliest date on which each case may leave its current stage.
    self._stage_ready: Dict[str, date] = {}
    self._init_stage_ready()

    # Ripeness bookkeeping; the first re-evaluation is due 7 days after start.
    self._ripeness_transitions = 0
    self._unripe_filtered = 0
    self._last_ripeness_eval = self.cfg.start

    # Courtroom allocation and the scheduling algorithm built on top of it.
    self.allocator = CourtroomAllocator(
        num_courtrooms=self.cfg.courtrooms,
        per_courtroom_capacity=self.cfg.daily_capacity,
        strategy=AllocationStrategy.LOAD_BALANCED,
    )
    self.algorithm = SchedulingAlgorithm(
        policy=self.policy,
        allocator=self.allocator,
        min_gap_days=MIN_GAP_BETWEEN_HEARINGS,
    )
# --- helpers -------------------------------------------------------------
def _init_stage_ready(self) -> None:
# Cases with last_hearing_date have been in current stage for some time
# Set stage_ready relative to last hearing + typical stage duration
# This allows cases to progress naturally from simulation start
for c in self.cases:
dur = int(
round(
self.params.get_stage_duration(
c.current_stage, self.cfg.duration_percentile
)
)
)
dur = max(1, dur)
# If case has hearing history, use last hearing date as reference
if c.last_hearing_date:
# Case has been in stage since last hearing, allow transition after typical duration
self._stage_ready[c.case_id] = c.last_hearing_date + timedelta(days=dur)
else:
# New case - use filed date
self._stage_ready[c.case_id] = c.filed_date + timedelta(days=dur)
# --- stochastic helpers -------------------------------------------------
def _sample_adjournment(self, stage: str, case_type: str) -> bool:
p_adj = self.params.get_adjournment_prob(stage, case_type)
return random.random() < p_adj
def _sample_next_stage(self, stage_from: str) -> str:
lst = self.params.get_stage_transitions_fast(stage_from)
if not lst:
return stage_from
r = random.random()
for to, cum in lst:
if r <= cum:
return to
return lst[-1][0]
def _check_disposal_at_hearing(self, case: Case, current: date) -> bool:
"""Check if case disposes at this hearing based on type-specific maturity.
Logic:
- Each case type has a median disposal duration (e.g., RSA=695d, CCC=93d).
- Disposal probability increases as case approaches/exceeds this median.
- Only occurs in terminal-capable stages (ORDERS, ARGUMENTS).
"""
# 1. Must be in a stage where disposal is possible
# Historical data shows 90% disposals happen in ADMISSION or ORDERS
disposal_capable_stages = [
"ORDERS / JUDGMENT",
"ARGUMENTS",
"ADMISSION",
"FINAL DISPOSAL",
]
if case.current_stage not in disposal_capable_stages:
return False
# 2. Get case type statistics
try:
stats = self.params.get_case_type_stats(case.case_type)
expected_days = stats["disp_median"]
expected_hearings = stats["hear_median"]
except (ValueError, KeyError):
# Fallback for unknown types
expected_days = 365.0
expected_hearings = 5.0
# 3. Calculate maturity factors
# Age factor: non-linear increase as we approach median duration
maturity = case.age_days / max(1.0, expected_days)
if maturity < 0.2:
age_prob = 0.01 # Very unlikely to dispose early
elif maturity < 0.8:
age_prob = 0.05 * maturity # Linear ramp up
elif maturity < 1.5:
age_prob = 0.10 + 0.10 * (maturity - 0.8) # Higher prob around median
else:
age_prob = 0.25 # Cap at 25% for overdue cases
# Hearing factor: need sufficient hearings
hearing_factor = min(case.hearing_count / max(1.0, expected_hearings), 1.5)
# Stage factor
stage_prob = 1.0
if case.current_stage == "ADMISSION":
stage_prob = 0.5 # Less likely to dispose in admission than orders
elif case.current_stage == "FINAL DISPOSAL":
stage_prob = 2.0 # Very likely
# 4. Final probability check
final_prob = age_prob * hearing_factor * stage_prob
# Cap at reasonable max per hearing to avoid sudden mass disposals
final_prob = min(final_prob, 0.30)
return random.random() < final_prob
# --- ripeness evaluation (periodic) -------------------------------------
def _evaluate_ripeness(self, current: date) -> None:
    """Re-classify ripeness for every non-disposed case and record changes.

    For each active case the classifier result is compared with the status
    currently stored on the case.  When they differ, the transition counter
    is bumped, the case is marked ripe/unripe accordingly and a
    ``ripeness_change`` event is written.

    Args:
        current: Simulation date on which the re-evaluation takes place.
    """
    for c in self.cases:
        if c.status == CaseStatus.DISPOSED:
            continue

        prev_status = c.ripeness_status
        # The stored status is compared against ``new_status.value`` and is
        # used as a plain dict key / CSV value elsewhere, so it appears to be
        # a string; the original nevertheless called ``prev_status.value``
        # when building the event text, which fails for strings.  Normalise
        # once so both plain strings and enum members work.
        prev_label = getattr(prev_status, "value", prev_status)

        new_status = RipenessClassifier.classify(c, current)
        if new_status.value == prev_label:
            continue  # no change, nothing to record

        self._ripeness_transitions += 1

        if new_status.is_ripe():
            c.mark_ripe(current)
            detail = f"UNRIPE->RIPE (was {prev_label})"
        else:
            reason = RipenessClassifier.get_ripeness_reason(new_status)
            c.mark_unripe(new_status, reason, current)
            detail = f"RIPE->UNRIPE ({new_status.value}: {reason})"

        self._events.write(
            current,
            "ripeness_change",
            c.case_id,
            case_type=c.case_type,
            stage=c.current_stage,
            detail=detail,
        )
# --- daily scheduling policy --------------------------------------------
def _choose_cases_for_day(self, current: date) -> SchedulingResult:
"""Use SchedulingAlgorithm to schedule cases for the day.
This replaces the previous inline scheduling logic with a call to the
standalone algorithm module. The algorithm handles:
- Ripeness filtering
- Eligibility checks
- Policy prioritization
- Courtroom allocation
- Explanation generation
"""
# Periodic ripeness re-evaluation (every 7 days)
days_since_eval = (current - self._last_ripeness_eval).days
if days_since_eval >= 7:
self._evaluate_ripeness(current)
self._last_ripeness_eval = current
# Call algorithm to schedule day
# Note: No overrides in baseline simulation - that's for override demonstration runs
result = self.algorithm.schedule_day(
cases=self.cases,
courtrooms=self.rooms,
current_date=current,
overrides=None, # No overrides in baseline simulation
preferences=None, # No judge preferences in baseline simulation
)
# Update stats from algorithm result
self._unripe_filtered += result.ripeness_filtered
return result
# --- main loop -----------------------------------------------------------
def _expected_daily_filings(self, current: date) -> int:
    """Estimate how many new cases are filed on ``current``.

    The annual filing rate is split evenly across months, scaled by the
    month's seasonality factor (1.0 when none is configured) and divided by
    the number of working days in that month.  The working-day count is
    cached per (year, month).  Returns 0 for a month without working days.
    """
    seasonal_monthly = (ANNUAL_FILING_RATE / 12.0) * MONTHLY_SEASONALITY.get(
        current.month, 1.0
    )

    month_key = (current.year, current.month)
    working = self._month_working_cache.get(month_key)
    if working is None:
        working = len(
            self.calendar.get_working_days_in_month(current.year, current.month)
        )
        self._month_working_cache[month_key] = working

    if working == 0:
        return 0
    per_day = seasonal_monthly / working
    return max(0, int(round(per_day)))
def _file_new_cases(self, current: date, n: int) -> None:
    """Append ``n`` freshly filed cases, all starting in ADMISSION.

    Each case gets an id of the form ``NEW/<year>/<sequence>`` (sequence
    continues from the current number of cases), a stage-ready date based on
    the typical stage duration, and a ``filing`` event.
    """
    first_seq = len(self.cases) + 1
    for seq in range(first_seq, first_seq + n):
        new_case = Case(
            case_id=f"NEW/{current.year}/{seq:05d}",
            # lightweight: one plausible type; could sample from a distribution
            case_type="RSA",
            filed_date=current,
            current_stage="ADMISSION",
            is_urgent=False,
        )
        self.cases.append(new_case)

        # Duration gate for the new case: at least one day in its first stage.
        typical = self.params.get_stage_duration(
            new_case.current_stage, self.cfg.duration_percentile
        )
        wait_days = max(1, int(round(typical)))
        self._stage_ready[new_case.case_id] = current + timedelta(days=wait_days)

        self._events.write(
            current,
            "filing",
            new_case.case_id,
            case_type=new_case.case_type,
            stage=new_case.current_stage,
            detail="new_filing",
        )
def _day_process(self, current: date):
    """Simulate one court working day.

    Steps: obtain the day's schedule, run every scheduled hearing room by
    room, record per-room utilisation, append one row to ``metrics.csv`` and
    flush the buffered event log.  When ``cfg.write_suggestions`` is set, a
    ``suggestions_<date>.csv`` file with one row per scheduled case is
    written as well.

    Args:
        current: The working day being simulated.
    """
    # NOTE: dynamic case filing (_expected_daily_filings / _file_new_cases)
    # is intentionally not invoked here so runs use a fixed case set.
    result = self._choose_cases_for_day(current)

    capacity_today = self.cfg.daily_capacity * len(self.rooms)
    self._capacity_offered += capacity_today
    day_heard = 0
    day_total = 0

    # Optional (and expensive) per-day suggestions file for transparency.
    sf = None
    sw = None
    if self.cfg.write_suggestions:
        sugg_path = self._log_dir / f"suggestions_{current.isoformat()}.csv"
        # Explicit encoding keeps the file consistent with metrics.csv.
        sf = sugg_path.open("w", newline="", encoding="utf-8")

    try:
        if sf is not None:
            sw = csv.writer(sf)
            sw.writerow(
                [
                    "case_id",
                    "courtroom_id",
                    "policy",
                    "age_days",
                    "readiness_score",
                    "urgent",
                    "stage",
                    "days_since_last_hearing",
                    "stage_ready_date",
                ]
            )

        for room in self.rooms:
            room_heard = 0
            for case in result.scheduled_cases.get(room.courtroom_id, []):
                # Safety check: never hear a case that is already disposed.
                if case.status == CaseStatus.DISPOSED:
                    continue
                if not room.schedule_case(current, case.case_id):
                    continue
                day_total += 1
                if self._hold_hearing(current, room, case, sw):
                    room_heard += 1
            day_heard += room_heard
            # NOTE(review): the previous code passed the running day-wide
            # heard total here, which over-counts for every room after the
            # first.  This assumes record_daily_utilization expects the
            # room's own count - confirm against Courtroom.
            room.record_daily_utilization(current, room_heard)
    finally:
        # Close the suggestions file even if a hearing raised.
        if sf is not None:
            sf.close()

    # Daily metrics row; "disposals" is the cumulative run total.
    total_cases = sum(1 for c in self.cases if c.status != CaseStatus.DISPOSED)
    util = (day_total / capacity_today) if capacity_today else 0.0
    with self._metrics_path.open("a", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(
            [
                current.isoformat(),
                total_cases,
                day_total,
                day_heard,
                day_total - day_heard,
                self._disposals,
                f"{util:.4f}",
            ]
        )

    # Flush buffered events once per day to minimise I/O.
    self._events.flush()


def _hold_hearing(self, current: date, room: Courtroom, case: Case, sw) -> bool:
    """Log one scheduled hearing, sample its outcome and return True if heard.

    ``sw`` is the suggestions CSV writer, or None when suggestions are off.
    """
    import math

    # Mark case as scheduled (for no-case-left-behind tracking).
    case.mark_scheduled(current)

    # Adjournment boost is computed here only so it can be logged.
    adj_boost = 0.0
    if case.status == CaseStatus.ADJOURNED and case.hearing_count > 0:
        adj_boost = math.exp(-case.days_since_last_hearing / 21)

    self._events.write(
        current,
        "scheduled",
        case.case_id,
        case_type=case.case_type,
        stage=case.current_stage,
        courtroom_id=room.courtroom_id,
        priority_score=case.get_priority_score(),
        age_days=case.age_days,
        readiness_score=case.readiness_score,
        is_urgent=case.is_urgent,
        adj_boost=adj_boost,
        ripeness_status=case.ripeness_status,
        days_since_hearing=case.days_since_last_hearing,
    )
    self._hearings_total += 1

    if sw is not None:
        sw.writerow(
            [
                case.case_id,
                room.courtroom_id,
                self.cfg.policy,
                case.age_days,
                f"{case.readiness_score:.3f}",
                int(case.is_urgent),
                case.current_stage,
                case.days_since_last_hearing,
                self._stage_ready.get(case.case_id, current).isoformat(),
            ]
        )

    if self._sample_adjournment(case.current_stage, case.case_type):
        case.record_hearing(current, was_heard=False, outcome="adjourned")
        self._events.write(
            current,
            "outcome",
            case.case_id,
            case_type=case.case_type,
            stage=case.current_stage,
            courtroom_id=room.courtroom_id,
            detail="adjourned",
        )
        self._hearings_adjourned += 1
        return False

    case.record_hearing(current, was_heard=True, outcome="heard")
    self._events.write(
        current,
        "outcome",
        case.case_id,
        case_type=case.case_type,
        stage=case.current_stage,
        courtroom_id=room.courtroom_id,
        detail="heard",
    )
    self._hearings_heard += 1
    self._advance_heard_case(case, current)
    return True


def _advance_heard_case(self, case: Case, current: date) -> None:
    """Apply disposal or a duration-gated stage transition after a heard hearing."""
    # Disposal is checked first, before any stage transition.
    if self._check_disposal_at_hearing(case, current):
        case.status = CaseStatus.DISPOSED
        case.disposal_date = current
        self._disposals += 1
        self._events.write(
            current,
            "disposed",
            case.case_id,
            case_type=case.case_type,
            stage=case.current_stage,
            detail="natural_disposal",
        )
        return

    # Duration gate: the case may not leave its stage before its ready date.
    # The ready date is left untouched in that case.
    if current < self._stage_ready.get(case.case_id, current):
        return

    prev_stage = case.current_stage
    next_stage = self._sample_next_stage(prev_stage)
    case.progress_to_stage(next_stage, current)
    self._events.write(
        current,
        "stage_change",
        case.case_id,
        case_type=case.case_type,
        stage=next_stage,
        detail=f"from:{prev_stage}",
    )

    # Explicit stage-based disposal (rare but possible).
    if case.status == CaseStatus.DISPOSED or next_stage in TERMINAL_STAGES:
        self._disposals += 1
        self._events.write(
            current,
            "disposed",
            case.case_id,
            case_type=case.case_type,
            stage=next_stage,
            detail="case_disposed",
        )
        return

    # Start the duration gate for the stage the case has just entered.
    typical = self.params.get_stage_duration(
        case.current_stage, self.cfg.duration_percentile
    )
    wait_days = max(1, int(round(typical)))
    self._stage_ready[case.case_id] = current + timedelta(days=wait_days)
def run(self) -> CourtSimResult:
    """Run the simulation for ``cfg.days`` working days and summarise it.

    Each working day is processed by ``_day_process``.  Afterwards the event
    buffer is flushed, an insights report is assembled, echoed to stdout and
    returned together with the run-wide counters.

    Returns:
        CourtSimResult with hearing/disposal counters, overall utilisation
        (scheduled hearings / offered capacity), the last simulated day
        (``cfg.start`` if no day was simulated) and the insights text.
    """
    working_days = self._working_days_for_run()
    for d in working_days:
        self._day_process(d)

    # Final flush (a no-op if every day flushed) so no event stays buffered.
    self._events.flush()

    util = (
        (self._hearings_total / self._capacity_offered)
        if self._capacity_offered
        else 0.0
    )

    insights_text = "\n".join(self._collect_insights())
    # Still echo to console for CLI users.
    print("\n" + insights_text)

    return CourtSimResult(
        hearings_total=self._hearings_total,
        hearings_heard=self._hearings_heard,
        hearings_adjourned=self._hearings_adjourned,
        disposals=self._disposals,
        utilization=util,
        end_date=working_days[-1] if working_days else self.cfg.start,
        ripeness_transitions=self._ripeness_transitions,
        unripe_filtered=self._unripe_filtered,
        insights_text=insights_text,
    )


def _working_days_for_run(self) -> List[date]:
    """Return the first ``cfg.days`` court working days from ``cfg.start``.

    The first attempt uses the original ``days + 60`` calendar-day horizon.
    That padding only covers weekends/holidays for short runs, so the
    horizon is widened until enough working days are available.  A hard
    limit of ``3 * days + 366`` calendar days bounds the search.
    Assumes ``generate_court_calendar`` returns an ordered sequence starting
    at ``cfg.start`` - TODO confirm against CourtCalendar.
    """
    wanted = self.cfg.days
    start = self.cfg.start
    span = wanted + 60
    days = self.calendar.generate_court_calendar(start, start + timedelta(days=span))

    max_span = 3 * wanted + 366
    while len(days) < wanted and span < max_span:
        span = min(2 * span, max_span)
        days = self.calendar.generate_court_calendar(
            start, start + timedelta(days=span)
        )
    return days[:wanted]


def _collect_insights(self) -> List[str]:
    """Build the end-of-run report lines (ripeness, allocation, case status)."""
    lines: List[str] = []

    # Ripeness summary over cases that are still active.
    active_cases = [c for c in self.cases if c.status != CaseStatus.DISPOSED]
    ripeness_dist: Dict[str, int] = {}
    for c in active_cases:
        ripeness_dist[c.ripeness_status] = ripeness_dist.get(c.ripeness_status, 0) + 1

    lines.append("=== Ripeness Summary ===")
    lines.append(f"Total ripeness transitions: {self._ripeness_transitions}")
    lines.append(f"Cases filtered (unripe): {self._unripe_filtered}")
    lines.append("\nFinal ripeness distribution:")
    for status, count in sorted(ripeness_dist.items()):
        pct = (count / len(active_cases) * 100) if active_cases else 0
        lines.append(f" {status}: {count} ({pct:.1f}%)")

    # Courtroom allocation summary.
    lines.append("")
    lines.append(self.allocator.get_courtroom_summary())

    # Case status breakdown.
    total_cases = len(self.cases)
    disposed_cases = [c for c in self.cases if c.status == CaseStatus.DISPOSED]
    scheduled_at_least_once = [
        c for c in self.cases if c.last_scheduled_date is not None
    ]
    never_scheduled = [c for c in self.cases if c.last_scheduled_date is None]
    scheduled_but_not_disposed = [
        c for c in scheduled_at_least_once if c.status != CaseStatus.DISPOSED
    ]

    def share(group) -> str:
        # "<count> (<percentage of all cases>%)", safe for an empty system.
        return f"{len(group):,} ({len(group) / max(1, total_cases) * 100:.1f}%)"

    lines.append("\n=== Case Status Breakdown ===")
    lines.append(f"Total cases in system: {total_cases:,}")
    lines.append("\nScheduling outcomes:")
    lines.append(f" Scheduled at least once: {share(scheduled_at_least_once)}")
    lines.append(f" - Disposed: {share(disposed_cases)}")
    lines.append(f" - Active (not disposed): {share(scheduled_but_not_disposed)}")
    lines.append(f" Never scheduled: {share(never_scheduled)}")

    if scheduled_at_least_once:
        avg_hearings = sum(c.hearing_count for c in scheduled_at_least_once) / len(
            scheduled_at_least_once
        )
        lines.append(f"\nAverage hearings per scheduled case: {avg_hearings:.1f}")

    if disposed_cases:
        avg_hearings_to_disposal = sum(
            c.hearing_count for c in disposed_cases
        ) / len(disposed_cases)
        lines.append("\nDisposal metrics:")
        lines.append(
            f" Average hearings to disposal: {avg_hearings_to_disposal:.1f}"
        )
        # A disposed case may lack a disposal date (the stage-based disposal
        # path does not set one itself); skip those instead of crashing.
        dated = [c for c in disposed_cases if c.disposal_date is not None]
        if dated:
            avg_days_to_disposal = sum(
                (c.disposal_date - c.filed_date).days for c in dated
            ) / len(dated)
            lines.append(f" Average days to disposal: {avg_days_to_disposal:.0f}")
        else:
            lines.append(" Average days to disposal: n/a")

    return lines