Spaces:
Sleeping
Sleeping
File size: 10,922 Bytes
54c8522 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 | """Validation harness for scheduler policies (minimal, Phase 1 compatible).
Runs a lightweight scheduling loop over a short horizon to compute:
- Utilization
- Urgency SLA (7 working days)
- Constraint violations: capacity overflow, weekend/holiday scheduling
Policies supported: fifo, age, readiness
Run:
uv run --no-project python scripts/validate_policy.py --policy readiness --replications 10 --days 20
"""
from __future__ import annotations
import argparse
import random
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List, Tuple
import sys, os
# Ensure project root is on sys.path when running as a script
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from scheduler.core.case import Case
from scheduler.core.courtroom import Courtroom
from scheduler.core.judge import Judge
from scheduler.utils.calendar import CourtCalendar
from scheduler.data.config import (
CASE_TYPE_DISTRIBUTION,
URGENT_CASE_PERCENTAGE,
DEFAULT_DAILY_CAPACITY,
COURTROOMS,
)
from scheduler.metrics.basic import utilization, urgency_sla
@dataclass
class KPIResult:
utilization: float
urgent_sla: float
capacity_overflows: int
weekend_violations: int
def sample_case_type() -> str:
items = list(CASE_TYPE_DISTRIBUTION.items())
r = random.random()
acc = 0.0
for ct, p in items:
acc += p
if r <= acc:
return ct
return items[-1][0]
def working_days_diff(cal: CourtCalendar, start: date, end: date) -> int:
if end < start:
return 0
return cal.working_days_between(start, end)
def build_cases(n: int, start_date: date, cal: CourtCalendar) -> List[Case]:
cases: List[Case] = []
# spread filings across the first 10 working days
wd = cal.generate_court_calendar(start_date, start_date + timedelta(days=30))[:10]
for i in range(n):
filed = wd[i % len(wd)]
ct = sample_case_type()
urgent = random.random() < URGENT_CASE_PERCENTAGE
cases.append(
Case(case_id=f"C{i:05d}", case_type=ct, filed_date=filed, current_stage="ADMISSION", is_urgent=urgent)
)
return cases
def choose_order(policy: str, cases: List[Case]) -> List[Case]:
if policy == "fifo":
return sorted(cases, key=lambda c: c.filed_date)
if policy == "age":
# older first: we use age_days which caller must update
return sorted(cases, key=lambda c: c.age_days, reverse=True)
if policy == "readiness":
# use priority which includes urgency and readiness
return sorted(cases, key=lambda c: c.get_priority_score(), reverse=True)
return cases
def run_replication(policy: str, seed: int, days: int) -> KPIResult:
random.seed(seed)
cal = CourtCalendar()
cal.add_standard_holidays(date.today().year)
# build courtrooms and judges
rooms = [Courtroom(courtroom_id=i + 1, judge_id=f"J{i+1:03d}", daily_capacity=DEFAULT_DAILY_CAPACITY) for i in range(COURTROOMS)]
judges = [Judge(judge_id=f"J{i+1:03d}", name=f"Justice {i+1}", courtroom_id=i + 1) for i in range(COURTROOMS)]
# build cases
start = date.today().replace(day=1) # arbitrary start of month
cases = build_cases(n=COURTROOMS * DEFAULT_DAILY_CAPACITY, start_date=start, cal=cal)
# horizon
working_days = cal.generate_court_calendar(start, start + timedelta(days=days + 30))[:days]
scheduled = 0
urgent_records: List[Tuple[bool, int]] = []
capacity_overflows = 0
weekend_violations = 0
unscheduled = set(c.case_id for c in cases)
for d in working_days:
# sanity: weekend should be excluded by calendar, but check
if d.weekday() >= 5:
weekend_violations += 1
# update ages and readiness before scheduling
for c in cases:
c.update_age(d)
c.compute_readiness_score()
# order cases by policy
ordered = [c for c in choose_order(policy, cases) if c.case_id in unscheduled]
# fill capacity across rooms round-robin
remaining_capacity = {r.courtroom_id: r.get_capacity_for_date(d) if hasattr(r, "get_capacity_for_date") else r.daily_capacity for r in rooms}
total_capacity_today = sum(remaining_capacity.values())
filled_today = 0
ridx = 0
for c in ordered:
if filled_today >= total_capacity_today:
break
# find next room with capacity
attempts = 0
while attempts < len(rooms) and remaining_capacity[rooms[ridx].courtroom_id] == 0:
ridx = (ridx + 1) % len(rooms)
attempts += 1
if attempts >= len(rooms):
break
room = rooms[ridx]
if room.can_schedule(d, c.case_id):
room.schedule_case(d, c.case_id)
remaining_capacity[room.courtroom_id] -= 1
filled_today += 1
unscheduled.remove(c.case_id)
# urgency record
urgent_records.append((c.is_urgent, working_days_diff(cal, c.filed_date, d)))
ridx = (ridx + 1) % len(rooms)
# capacity check
for room in rooms:
day_sched = room.get_daily_schedule(d)
if len(day_sched) > room.daily_capacity:
capacity_overflows += 1
scheduled += filled_today
if not unscheduled:
break
# compute KPIs
total_capacity = sum(r.daily_capacity for r in rooms) * len(working_days)
util = utilization(scheduled, total_capacity)
urgent = urgency_sla(urgent_records, days=7)
return KPIResult(utilization=util, urgent_sla=urgent, capacity_overflows=capacity_overflows, weekend_violations=weekend_violations)
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--policy", choices=["fifo", "age", "readiness"], default="readiness")
ap.add_argument("--replications", type=int, default=5)
ap.add_argument("--days", type=int, default=20, help="working days horizon")
ap.add_argument("--seed", type=int, default=42)
ap.add_argument("--cases-csv", type=str, default=None, help="Path to pre-generated cases CSV")
args = ap.parse_args()
print("== Validation Run ==")
print(f"Policy: {args.policy}")
print(f"Replications: {args.replications}, Horizon (working days): {args.days}")
if args.cases_csv:
print(f"Cases source: {args.cases_csv}")
results: List[KPIResult] = []
# If cases CSV is provided, load once and close over a custom replication that reuses them
if args.cases_csv:
from pathlib import Path
from scheduler.data.case_generator import CaseGenerator
preload = CaseGenerator.from_csv(Path(args.cases_csv))
def run_with_preloaded(policy: str, seed: int, days: int) -> KPIResult:
# Same as run_replication, but replace built cases with preloaded
import random
random.seed(seed)
cal = CourtCalendar()
cal.add_standard_holidays(date.today().year)
rooms = [Courtroom(courtroom_id=i + 1, judge_id=f"J{i+1:03d}", daily_capacity=DEFAULT_DAILY_CAPACITY) for i in range(COURTROOMS)]
start = date.today().replace(day=1)
cases = list(preload) # shallow copy
working_days = cal.generate_court_calendar(start, start + timedelta(days=days + 30))[:days]
scheduled = 0
urgent_records: List[Tuple[bool, int]] = []
capacity_overflows = 0
weekend_violations = 0
unscheduled = set(c.case_id for c in cases)
for d in working_days:
if d.weekday() >= 5:
weekend_violations += 1
for c in cases:
c.update_age(d)
c.compute_readiness_score()
ordered = [c for c in choose_order(policy, cases) if c.case_id in unscheduled]
remaining_capacity = {r.courtroom_id: r.get_capacity_for_date(d) if hasattr(r, "get_capacity_for_date") else r.daily_capacity for r in rooms}
total_capacity_today = sum(remaining_capacity.values())
filled_today = 0
ridx = 0
for c in ordered:
if filled_today >= total_capacity_today:
break
attempts = 0
while attempts < len(rooms) and remaining_capacity[rooms[ridx].courtroom_id] == 0:
ridx = (ridx + 1) % len(rooms)
attempts += 1
if attempts >= len(rooms):
break
room = rooms[ridx]
if room.can_schedule(d, c.case_id):
room.schedule_case(d, c.case_id)
remaining_capacity[room.courtroom_id] -= 1
filled_today += 1
unscheduled.remove(c.case_id)
urgent_records.append((c.is_urgent, working_days_diff(cal, c.filed_date, d)))
ridx = (ridx + 1) % len(rooms)
for room in rooms:
day_sched = room.get_daily_schedule(d)
if len(day_sched) > room.daily_capacity:
capacity_overflows += 1
scheduled += filled_today
if not unscheduled:
break
total_capacity = sum(r.daily_capacity for r in rooms) * len(working_days)
util = utilization(scheduled, total_capacity)
urgent = urgency_sla(urgent_records, days=7)
return KPIResult(utilization=util, urgent_sla=urgent, capacity_overflows=capacity_overflows, weekend_violations=weekend_violations)
for i in range(args.replications):
results.append(run_with_preloaded(args.policy, args.seed + i, args.days))
else:
for i in range(args.replications):
res = run_replication(args.policy, args.seed + i, args.days)
results.append(res)
# aggregate
util_vals = [r.utilization for r in results]
urgent_vals = [r.urgent_sla for r in results]
cap_viol = sum(r.capacity_overflows for r in results)
wknd_viol = sum(r.weekend_violations for r in results)
def mean(xs: List[float]) -> float:
return sum(xs) / len(xs) if xs else 0.0
print("\n-- KPIs --")
print(f"Utilization (mean): {mean(util_vals):.2%}")
print(f"Urgent SLA<=7d (mean): {mean(urgent_vals):.2%}")
print("\n-- Constraint Violations (should be 0) --")
print(f"Capacity overflows: {cap_viol}")
print(f"Weekend/holiday scheduling: {wknd_viol}")
print("\nNote: This is a lightweight harness for Phase 1; fairness metrics (e.g., Gini of disposal times) will be computed after Phase 3 when full simulation is available.")
if __name__ == "__main__":
main()
|