File size: 10,922 Bytes
54c8522
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""Validation harness for scheduler policies (minimal, Phase 1 compatible).

Runs a lightweight scheduling loop over a short horizon to compute:
- Utilization
- Urgency SLA (7 working days)
- Constraint violations: capacity overflow, weekend/holiday scheduling

Policies supported: fifo, age, readiness

Run:
  uv run --no-project python scripts/validate_policy.py --policy readiness --replications 10 --days 20
"""
from __future__ import annotations

import argparse
import random
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List, Tuple
import sys, os

# Ensure the project root is on sys.path when running as a script: the parent
# of the directory containing this file is appended so that the `scheduler`
# package imports below can be resolved.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from scheduler.core.case import Case
from scheduler.core.courtroom import Courtroom
from scheduler.core.judge import Judge
from scheduler.utils.calendar import CourtCalendar
from scheduler.data.config import (
    CASE_TYPE_DISTRIBUTION,
    URGENT_CASE_PERCENTAGE,
    DEFAULT_DAILY_CAPACITY,
    COURTROOMS,
)
from scheduler.metrics.basic import utilization, urgency_sla


@dataclass
class KPIResult:
    """KPIs and constraint-violation counts produced by a single replication."""
    # Value returned by ``utilization(scheduled, total_capacity)`` for the run.
    utilization: float
    # Value returned by ``urgency_sla(urgent_records, days=7)`` for the run.
    urgent_sla: float
    # Number of (courtroom, day) pairs whose daily schedule was longer than
    # the courtroom's ``daily_capacity``.
    capacity_overflows: int
    # Number of horizon days whose ``weekday()`` was Saturday or Sunday.
    weekend_violations: int


def sample_case_type() -> str:
    """Draw one case type at random, weighted by ``CASE_TYPE_DISTRIBUTION``.

    A single uniform draw is compared against the running sum of the weights,
    in the mapping's iteration order; the first type whose cumulative weight
    reaches the draw is returned.

    Returns:
        The selected case type key. If the weights sum to less than the draw
        (e.g. through rounding), the last type in the mapping is returned.
    """
    weighted_types = list(CASE_TYPE_DISTRIBUTION.items())
    draw = random.random()

    cumulative = 0.0
    for case_type, weight in weighted_types:
        cumulative += weight
        if cumulative >= draw:
            return case_type

    # The cumulative weight never reached the draw: fall back to the last entry.
    _last_type, _ = weighted_types[-1]
    return _last_type


def working_days_diff(cal: CourtCalendar, start: date, end: date) -> int:
    """Return the working-day count from ``start`` to ``end``, clamped at zero.

    Args:
        cal: Calendar whose ``working_days_between`` does the counting.
        start: First date of the interval.
        end: Last date of the interval.

    Returns:
        ``0`` when ``end`` is earlier than ``start``; otherwise whatever
        ``cal.working_days_between(start, end)`` returns.
    """
    # An inverted interval is never forwarded to the calendar.
    return 0 if end < start else cal.working_days_between(start, end)


def build_cases(n: int, start_date: date, cal: CourtCalendar) -> List[Case]:
    """Generate ``n`` synthetic cases with filing dates cycled over a short window.

    Filing dates are taken from (at most) the first 10 entries that
    ``cal.generate_court_calendar`` returns for the 30 calendar days starting
    at ``start_date``; case ``i`` is filed on entry ``i % len(entries)``.

    Args:
        n: Number of cases to create.
        start_date: First date of the calendar window used for filing dates.
        cal: Calendar that supplies the candidate filing dates.

    Returns:
        A list of ``n`` cases with ids ``C00000``, ``C00001``, ..., all in the
        ``"ADMISSION"`` stage. Each case's type comes from ``sample_case_type``
        and it is flagged urgent when a uniform draw falls below
        ``URGENT_CASE_PERCENTAGE``.

    Raises:
        ZeroDivisionError: If ``n > 0`` and the calendar yields no dates.
    """
    window_end = start_date + timedelta(days=30)
    filing_days = cal.generate_court_calendar(start_date, window_end)[:10]

    generated: List[Case] = []
    for index in range(n):
        filing_day = filing_days[index % len(filing_days)]
        # Draw order (type first, then urgency) fixes the random sequence
        # consumed per case for a given seed.
        case_type = sample_case_type()
        is_urgent = random.random() < URGENT_CASE_PERCENTAGE
        new_case = Case(
            case_id=f"C{index:05d}",
            case_type=case_type,
            filed_date=filing_day,
            current_stage="ADMISSION",
            is_urgent=is_urgent,
        )
        generated.append(new_case)
    return generated


def choose_order(policy: str, cases: List[Case]) -> List[Case]:
    """Return ``cases`` ordered according to the named scheduling policy.

    Supported policies:
        * ``"fifo"``: ascending ``filed_date``.
        * ``"age"``: descending ``age_days`` (the caller must have refreshed it).
        * ``"readiness"``: descending ``get_priority_score()``.

    Args:
        policy: Name of the ordering policy.
        cases: Cases to order; the input list is not modified.

    Returns:
        A new sorted list for a supported policy. For any other policy name
        the input list object itself is returned, unsorted.
    """
    # policy name -> (sort key, sort descending?)
    sort_specs = {
        "fifo": (lambda case: case.filed_date, False),
        "age": (lambda case: case.age_days, True),
        "readiness": (lambda case: case.get_priority_score(), True),
    }

    spec = sort_specs.get(policy)
    if spec is None:
        return cases

    key_fn, descending = spec
    return sorted(cases, key=key_fn, reverse=descending)


def run_replication(
    policy: str,
    seed: int,
    days: int,
    cases: List[Case] | None = None,
) -> KPIResult:
    """Run one scheduling replication and return its KPIs.

    A fresh calendar and a fresh set of courtrooms are built on every call.
    For each day of the horizon, every case has ``update_age`` and
    ``compute_readiness_score`` called on it, the still-unscheduled cases are
    ordered with ``choose_order(policy, ...)`` and then handed out to the
    courtrooms round-robin until the day's capacity is used up or the ordered
    list is exhausted. The loop stops early once every case is scheduled.

    Args:
        policy: Ordering policy name forwarded to ``choose_order``.
        seed: Seed applied to the module-level ``random`` generator.
        days: Maximum number of calendar entries kept for the horizon.
        cases: Optional pre-built cases to schedule instead of synthetic ones.
            When ``None`` (the default), ``COURTROOMS * DEFAULT_DAILY_CAPACITY``
            cases are generated with ``build_cases``. The list is copied, but
            the case objects are shared with the caller and have
            ``update_age`` / ``compute_readiness_score`` called on them.

    Returns:
        A ``KPIResult`` holding the utilization, the 7-day urgency SLA, the
        number of (courtroom, day) capacity overflows and the number of
        horizon days that fell on a weekend. Utilization is computed against
        the capacity of the whole horizon, even if the loop stopped early.
    """
    random.seed(seed)
    cal = CourtCalendar()
    cal.add_standard_holidays(date.today().year)

    # One courtroom per configured slot, each with the default daily capacity.
    rooms = [
        Courtroom(
            courtroom_id=i + 1,
            judge_id=f"J{i+1:03d}",
            daily_capacity=DEFAULT_DAILY_CAPACITY,
        )
        for i in range(COURTROOMS)
    ]

    start = date.today().replace(day=1)  # arbitrary start of month
    if cases is None:
        cases = build_cases(n=COURTROOMS * DEFAULT_DAILY_CAPACITY, start_date=start, cal=cal)
    else:
        # Shallow copy: never reorder or resize the caller's list.
        cases = list(cases)

    # Request a window 30 calendar days longer than needed, then keep at most
    # ``days`` entries.
    working_days = cal.generate_court_calendar(start, start + timedelta(days=days + 30))[:days]

    scheduled = 0
    urgent_records: List[Tuple[bool, int]] = []
    capacity_overflows = 0
    weekend_violations = 0

    unscheduled = {c.case_id for c in cases}

    for d in working_days:
        # Sanity check: the calendar should already exclude weekends.
        if d.weekday() >= 5:
            weekend_violations += 1

        # Refresh ages and readiness before ordering.
        for c in cases:
            c.update_age(d)
            c.compute_readiness_score()

        # NOTE(review): eligibility ignores ``filed_date``, so a case can be
        # scheduled on a day before it was filed; ``working_days_diff`` then
        # records a wait of 0 for it. Confirm this is intended.
        ordered = [c for c in choose_order(policy, cases) if c.case_id in unscheduled]

        # Per-room capacity left today; rooms without a date-specific capacity
        # hook fall back to their static daily capacity.
        remaining_capacity = {
            r.courtroom_id: r.get_capacity_for_date(d) if hasattr(r, "get_capacity_for_date") else r.daily_capacity
            for r in rooms
        }
        total_capacity_today = sum(remaining_capacity.values())
        filled_today = 0

        ridx = 0  # round-robin cursor into ``rooms``
        for c in ordered:
            if filled_today >= total_capacity_today:
                break
            # Advance the cursor to the next room that still has capacity,
            # giving up after one full lap.
            attempts = 0
            while attempts < len(rooms) and remaining_capacity[rooms[ridx].courtroom_id] == 0:
                ridx = (ridx + 1) % len(rooms)
                attempts += 1
            if attempts >= len(rooms):
                break
            room = rooms[ridx]
            # A case refused by this room is not retried elsewhere today; it
            # stays unscheduled and is reconsidered on the next day.
            if room.can_schedule(d, c.case_id):
                room.schedule_case(d, c.case_id)
                remaining_capacity[room.courtroom_id] -= 1
                filled_today += 1
                unscheduled.remove(c.case_id)
                # Record (urgent?, working days waited) for the SLA metric.
                urgent_records.append((c.is_urgent, working_days_diff(cal, c.filed_date, d)))
            ridx = (ridx + 1) % len(rooms)

        # Constraint check: no room may hold more than its daily capacity.
        for room in rooms:
            day_sched = room.get_daily_schedule(d)
            if len(day_sched) > room.daily_capacity:
                capacity_overflows += 1

        scheduled += filled_today

        if not unscheduled:
            break

    # KPIs are computed against the capacity of the full horizon.
    total_capacity = sum(r.daily_capacity for r in rooms) * len(working_days)
    util = utilization(scheduled, total_capacity)
    urgent = urgency_sla(urgent_records, days=7)

    return KPIResult(
        utilization=util,
        urgent_sla=urgent,
        capacity_overflows=capacity_overflows,
        weekend_violations=weekend_violations,
    )


def main():
    """Parse CLI arguments, run the replications and print aggregate KPIs.

    With ``--cases-csv`` the cases are loaded once through
    ``CaseGenerator.from_csv`` and reused by every replication; otherwise each
    replication generates its own synthetic cases. Replication ``i`` is seeded
    with ``--seed + i``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--policy", choices=["fifo", "age", "readiness"], default="readiness")
    ap.add_argument("--replications", type=int, default=5)
    ap.add_argument("--days", type=int, default=20, help="working days horizon")
    ap.add_argument("--seed", type=int, default=42)
    ap.add_argument("--cases-csv", type=str, default=None, help="Path to pre-generated cases CSV")
    args = ap.parse_args()

    print("== Validation Run ==")
    print(f"Policy: {args.policy}")
    print(f"Replications: {args.replications}, Horizon (working days): {args.days}")

    preloaded = None
    if args.cases_csv:
        print(f"Cases source: {args.cases_csv}")

        # Imported lazily: only the CSV path needs them.
        from pathlib import Path
        from scheduler.data.case_generator import CaseGenerator

        # Load once; every replication works on its own shallow copy.
        preloaded = list(CaseGenerator.from_csv(Path(args.cases_csv)))

    results: List[KPIResult] = [
        run_replication(args.policy, args.seed + i, args.days, cases=preloaded)
        for i in range(args.replications)
    ]

    # aggregate
    util_vals = [r.utilization for r in results]
    urgent_vals = [r.urgent_sla for r in results]
    cap_viol = sum(r.capacity_overflows for r in results)
    wknd_viol = sum(r.weekend_violations for r in results)

    def mean(xs: List[float]) -> float:
        # Guard against zero replications.
        return sum(xs) / len(xs) if xs else 0.0

    print("\n-- KPIs --")
    print(f"Utilization (mean): {mean(util_vals):.2%}")
    print(f"Urgent SLA<=7d (mean): {mean(urgent_vals):.2%}")

    print("\n-- Constraint Violations (should be 0) --")
    print(f"Capacity overflows: {cap_viol}")
    print(f"Weekend/holiday scheduling: {wknd_viol}")

    print("\nNote: This is a lightweight harness for Phase 1; fairness metrics (e.g., Gini of disposal times) will be computed after Phase 3 when full simulation is available.")


# Run the harness only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()