"""Core scheduling algorithm with override mechanism. This module provides the standalone scheduling algorithm that can be used by: - Simulation engine (repeated daily calls) - CLI interface (single-day scheduling) - Web dashboard (API backend) The algorithm accepts cases, courtrooms, date, policy, and optional overrides, then returns scheduled cause list with explanations and audit trail. """ from __future__ import annotations from dataclasses import dataclass, field from datetime import date from typing import Dict, List, Optional, Tuple from src.control.explainability import ExplainabilityEngine, SchedulingExplanation from src.control.overrides import ( JudgePreferences, Override, OverrideType, OverrideValidator, ) from src.core.case import Case, CaseStatus from src.core.courtroom import Courtroom from src.core.policy import SchedulerPolicy from src.core.ripeness import RipenessClassifier, RipenessStatus from src.data.config import MIN_GAP_BETWEEN_HEARINGS from src.simulation.allocator import CourtroomAllocator @dataclass class SchedulingResult: """Result of single-day scheduling with full transparency. Attributes: scheduled_cases: Mapping of courtroom_id to list of scheduled cases explanations: Decision explanations for each case (scheduled + sample unscheduled) applied_overrides: List of overrides that were successfully applied override_rejections: Structured records for rejected overrides unscheduled_cases: Cases not scheduled with reasons (e.g., unripe, capacity full) ripeness_filtered: Count of cases filtered due to unripe status capacity_limited: Count of cases that didn't fit due to courtroom capacity scheduling_date: Date scheduled for policy_used: Name of scheduling policy used (FIFO, Age, Readiness) total_scheduled: Total number of cases scheduled (calculated) """ # Core output scheduled_cases: Dict[int, List[Case]] # Transparency explanations: Dict[str, SchedulingExplanation] applied_overrides: List[Override] override_rejections: List[Dict[str, str]] # Diagnostics unscheduled_cases: List[Tuple[Case, str]] ripeness_filtered: int capacity_limited: int # Metadata scheduling_date: date policy_used: str total_scheduled: int = field(init=False) def __post_init__(self): """Calculate derived fields.""" self.total_scheduled = sum( len(cases) for cases in self.scheduled_cases.values() ) class SchedulingAlgorithm: """Core scheduling algorithm with override support. This is the main product - a clean, reusable scheduling algorithm that: 1. Filters cases by ripeness and eligibility 2. Applies judge preferences and manual overrides 3. Prioritizes cases using selected policy 4. Allocates cases to courtrooms with load balancing 5. Generates explanations for all decisions Usage: algorithm = SchedulingAlgorithm(policy=readiness_policy, allocator=allocator) result = algorithm.schedule_day( cases=active_cases, courtrooms=courtrooms, current_date=date(2024, 3, 15), overrides=judge_overrides, preferences=judge_prefs ) """ def __init__( self, policy: SchedulerPolicy, allocator: Optional[CourtroomAllocator] = None, min_gap_days: int = MIN_GAP_BETWEEN_HEARINGS, ): """Initialize algorithm with policy and allocator. Args: policy: Scheduling policy (FIFO, Age, Readiness) allocator: Courtroom allocator (defaults to load-balanced) min_gap_days: Minimum days between hearings for a case """ self.policy = policy self.allocator = allocator self.min_gap_days = min_gap_days self.explainer = ExplainabilityEngine() def schedule_day( self, cases: List[Case], courtrooms: List[Courtroom], current_date: date, overrides: Optional[List[Override]] = None, preferences: Optional[JudgePreferences] = None, max_explanations_unscheduled: int = 100, ) -> SchedulingResult: """Schedule cases for a single day with override support. Args: cases: All active cases (will be filtered) courtrooms: Available courtrooms current_date: Date to schedule for overrides: Optional manual overrides to apply preferences: Optional judge preferences/constraints max_explanations_unscheduled: Max unscheduled cases to generate explanations for Returns: SchedulingResult with scheduled cases, explanations, and audit trail """ # Initialize tracking unscheduled: List[Tuple[Case, str]] = [] applied_overrides: List[Override] = [] explanations: Dict[str, SchedulingExplanation] = {} override_rejections: List[Dict[str, str]] = [] validated_overrides: List[Override] = [] # Validate overrides if provided if overrides: validator = OverrideValidator() for override in overrides: if validator.validate(override): validated_overrides.append(override) else: errors = validator.get_errors() rejection_reason = ( "; ".join(errors) if errors else "Validation failed" ) override_rejections.append( { "judge": override.judge_id, "context": override.override_type.value, "reason": rejection_reason, } ) unscheduled.append( ( None, f"Invalid override rejected (judge {override.judge_id}): " f"{override.override_type.value} - {rejection_reason}", ) ) # Filter disposed cases active_cases = [c for c in cases if c.status != CaseStatus.DISPOSED] # Update age and readiness for all cases for case in active_cases: case.update_age(current_date) case.compute_readiness_score() # CHECKPOINT 1: Ripeness filtering with override support ripe_cases, ripeness_filtered = self._filter_by_ripeness( active_cases, current_date, validated_overrides, applied_overrides ) # CHECKPOINT 2: Eligibility check (min gap requirement) eligible_cases = self._filter_eligible(ripe_cases, current_date, unscheduled) # CHECKPOINT 3: Apply judge preferences (capacity overrides tracked) if preferences: applied_overrides.extend( self._get_preference_overrides(preferences, courtrooms) ) # CHECKPOINT 4: Prioritize using policy prioritized = self.policy.prioritize(eligible_cases, current_date) # CHECKPOINT 5: Apply manual overrides (add/remove/reorder/priority) if validated_overrides: prioritized = self._apply_manual_overrides( prioritized, validated_overrides, applied_overrides, unscheduled, active_cases, ) # CHECKPOINT 6: Allocate to courtrooms scheduled_allocation, capacity_limited = self._allocate_cases( prioritized, courtrooms, current_date, preferences ) # Track capacity-limited cases total_scheduled = sum(len(cases) for cases in scheduled_allocation.values()) for case in prioritized[total_scheduled:]: unscheduled.append((case, "Capacity exceeded - all courtrooms full")) # CHECKPOINT 7: Generate explanations for scheduled cases for courtroom_id, cases_in_room in scheduled_allocation.items(): for case in cases_in_room: explanation = self.explainer.explain_scheduling_decision( case=case, current_date=current_date, scheduled=True, ripeness_status=case.ripeness_status, priority_score=case.get_priority_score(), courtroom_id=courtroom_id, ) explanations[case.case_id] = explanation # Generate explanations for sample of unscheduled cases for case, reason in unscheduled[:max_explanations_unscheduled]: if case is not None: # Skip invalid override entries explanation = self.explainer.explain_scheduling_decision( case=case, current_date=current_date, scheduled=False, ripeness_status=case.ripeness_status, capacity_full=("Capacity" in reason), below_threshold=False, ) explanations[case.case_id] = explanation self._clear_temporary_case_flags(active_cases) return SchedulingResult( scheduled_cases=scheduled_allocation, explanations=explanations, applied_overrides=applied_overrides, override_rejections=override_rejections, unscheduled_cases=unscheduled, ripeness_filtered=ripeness_filtered, capacity_limited=capacity_limited, scheduling_date=current_date, policy_used=self.policy.get_name(), ) def _filter_by_ripeness( self, cases: List[Case], current_date: date, overrides: Optional[List[Override]], applied_overrides: List[Override], ) -> Tuple[List[Case], int]: """Filter cases by ripeness with override support.""" # Build override lookup ripeness_overrides = {} if overrides: for override in overrides: if override.override_type == OverrideType.RIPENESS: ripeness_overrides[override.case_id] = override.make_ripe ripe_cases = [] filtered_count = 0 for case in cases: # Check for ripeness override if case.case_id in ripeness_overrides: if ripeness_overrides[case.case_id]: case.mark_ripe(current_date) ripe_cases.append(case) # Track override application override = next( o for o in overrides if o.case_id == case.case_id and o.override_type == OverrideType.RIPENESS ) applied_overrides.append(override) else: case.mark_unripe( RipenessStatus.UNRIPE_DEPENDENT, "Judge override", current_date ) filtered_count += 1 continue # Normal ripeness classification ripeness = RipenessClassifier.classify(case, current_date) if ripeness.value != case.ripeness_status: if ripeness.is_ripe(): case.mark_ripe(current_date) else: reason = RipenessClassifier.get_ripeness_reason(ripeness) case.mark_unripe(ripeness, reason, current_date) if ripeness.is_ripe(): ripe_cases.append(case) else: filtered_count += 1 return ripe_cases, filtered_count def _filter_eligible( self, cases: List[Case], current_date: date, unscheduled: List[Tuple[Case, str]] ) -> List[Case]: """Filter cases that meet minimum gap requirement.""" eligible = [] for case in cases: if case.is_ready_for_scheduling(self.min_gap_days): eligible.append(case) else: reason = f"Min gap not met - last hearing {case.days_since_last_hearing}d ago (min {self.min_gap_days}d)" unscheduled.append((case, reason)) return eligible def _get_preference_overrides( self, preferences: JudgePreferences, courtrooms: List[Courtroom] ) -> List[Override]: """Extract overrides from judge preferences for audit trail.""" overrides = [] if preferences.capacity_overrides: from datetime import datetime for courtroom_id, new_capacity in preferences.capacity_overrides.items(): override = Override( override_id=f"pref-capacity-{courtroom_id}-{preferences.judge_id}", override_type=OverrideType.CAPACITY, case_id="", # Not case-specific judge_id=preferences.judge_id, timestamp=datetime.now(), courtroom_id=courtroom_id, new_capacity=new_capacity, reason="Judge preference", ) overrides.append(override) return overrides def _apply_manual_overrides( self, prioritized: List[Case], overrides: List[Override], applied_overrides: List[Override], unscheduled: List[Tuple[Case, str]], all_cases: List[Case], ) -> List[Case]: """Apply manual overrides (ADD_CASE, REMOVE_CASE, PRIORITY, REORDER).""" result = prioritized.copy() # Apply ADD_CASE overrides (insert at high priority) add_overrides = [ o for o in overrides if o.override_type == OverrideType.ADD_CASE ] for override in add_overrides: # Find case in full case list case_to_add = next( (c for c in all_cases if c.case_id == override.case_id), None ) if case_to_add and case_to_add not in result: # Insert at position 0 (highest priority) or specified position insert_pos = ( override.new_position if override.new_position is not None else 0 ) result.insert(min(insert_pos, len(result)), case_to_add) applied_overrides.append(override) # Apply REMOVE_CASE overrides remove_overrides = [ o for o in overrides if o.override_type == OverrideType.REMOVE_CASE ] for override in remove_overrides: removed = [c for c in result if c.case_id == override.case_id] result = [c for c in result if c.case_id != override.case_id] if removed: applied_overrides.append(override) unscheduled.append((removed[0], f"Judge override: {override.reason}")) # Apply PRIORITY overrides (adjust priority scores) priority_overrides = [ o for o in overrides if o.override_type == OverrideType.PRIORITY ] for override in priority_overrides: case_to_adjust = next( (c for c in result if c.case_id == override.case_id), None ) if case_to_adjust and override.new_priority is not None: # Store original priority for reference case_to_adjust.get_priority_score() # Temporarily adjust case to force re-sorting # Note: This is a simplification - in production might need case.set_priority_override() case_to_adjust._priority_override = override.new_priority applied_overrides.append(override) # Re-sort if priority overrides were applied if priority_overrides: result.sort( key=lambda c: getattr(c, "_priority_override", c.get_priority_score()), reverse=True, ) # Apply REORDER overrides (explicit positioning) reorder_overrides = [ o for o in overrides if o.override_type == OverrideType.REORDER ] for override in reorder_overrides: if override.case_id and override.new_position is not None: case_to_move = next( (c for c in result if c.case_id == override.case_id), None ) if case_to_move and 0 <= override.new_position < len(result): result.remove(case_to_move) result.insert(override.new_position, case_to_move) applied_overrides.append(override) return result def _allocate_cases( self, prioritized: List[Case], courtrooms: List[Courtroom], current_date: date, preferences: Optional[JudgePreferences], ) -> Tuple[Dict[int, List[Case]], int]: """Allocate prioritized cases to courtrooms.""" # Calculate total capacity (with preference overrides) total_capacity = 0 for room in courtrooms: if preferences and room.courtroom_id in preferences.capacity_overrides: total_capacity += preferences.capacity_overrides[room.courtroom_id] else: total_capacity += room.get_capacity_for_date(current_date) # Limit cases to total capacity cases_to_allocate = prioritized[:total_capacity] capacity_limited = len(prioritized) - len(cases_to_allocate) # Use allocator to distribute if self.allocator: case_to_courtroom = self.allocator.allocate(cases_to_allocate, current_date) else: # Fallback: round-robin case_to_courtroom = {} for i, case in enumerate(cases_to_allocate): room_id = courtrooms[i % len(courtrooms)].courtroom_id case_to_courtroom[case.case_id] = room_id # Build allocation dict allocation: Dict[int, List[Case]] = {r.courtroom_id: [] for r in courtrooms} for case in cases_to_allocate: if case.case_id in case_to_courtroom: courtroom_id = case_to_courtroom[case.case_id] allocation[courtroom_id].append(case) return allocation, capacity_limited @staticmethod def _clear_temporary_case_flags(cases: List[Case]) -> None: """Remove temporary scheduling flags to keep case objects clean between runs.""" for case in cases: if hasattr(case, "_priority_override"): delattr(case, "_priority_override")