"""Judge override and intervention control system. Allows judges to review, modify, and approve algorithmic scheduling suggestions. System is suggestive, not prescriptive - judges retain final control. """ import json from dataclasses import dataclass, field from datetime import date, datetime from enum import Enum from typing import Optional class OverrideType(Enum): """Types of overrides judges can make.""" RIPENESS = "ripeness" # Override ripeness classification PRIORITY = "priority" # Adjust priority score or urgency ADD_CASE = "add_case" # Manually add case to cause list REMOVE_CASE = "remove_case" # Remove case from cause list REORDER = "reorder" # Change sequence within day CAPACITY = "capacity" # Adjust daily capacity MIN_GAP = "min_gap" # Override minimum gap between hearings COURTROOM = "courtroom" # Change courtroom assignment @dataclass class Override: """Single override action by a judge.""" override_id: str override_type: OverrideType case_id: str judge_id: str timestamp: datetime old_value: Optional[str] = None new_value: Optional[str] = None reason: str = "" date_affected: Optional[date] = None courtroom_id: Optional[int] = None # Algorithm-specific attributes make_ripe: Optional[bool] = None # For RIPENESS overrides new_position: Optional[int] = None # For REORDER/ADD_CASE overrides new_priority: Optional[float] = None # For PRIORITY overrides new_capacity: Optional[int] = None # For CAPACITY overrides def to_dict(self) -> dict: """Convert to dictionary for logging.""" return { "override_id": self.override_id, "type": self.override_type.value, "case_id": self.case_id, "judge_id": self.judge_id, "timestamp": self.timestamp.isoformat(), "old_value": self.old_value, "new_value": self.new_value, "reason": self.reason, "date_affected": self.date_affected.isoformat() if self.date_affected else None, "courtroom_id": self.courtroom_id, "make_ripe": self.make_ripe, "new_position": self.new_position, "new_priority": self.new_priority, "new_capacity": self.new_capacity } def to_readable_text(self) -> str: """Human-readable description of override.""" action_desc = { OverrideType.RIPENESS: f"Changed ripeness from {self.old_value} to {self.new_value}", OverrideType.PRIORITY: f"Adjusted priority from {self.old_value} to {self.new_value}", OverrideType.ADD_CASE: "Manually added case to cause list", OverrideType.REMOVE_CASE: "Removed case from cause list", OverrideType.REORDER: f"Reordered from position {self.old_value} to {self.new_value}", OverrideType.CAPACITY: f"Changed capacity from {self.old_value} to {self.new_value}", OverrideType.MIN_GAP: f"Overrode min gap from {self.old_value} to {self.new_value} days", OverrideType.COURTROOM: f"Changed courtroom from {self.old_value} to {self.new_value}" } action = action_desc.get(self.override_type, f"Override: {self.override_type.value}") parts = [ f"[{self.timestamp.strftime('%Y-%m-%d %H:%M')}]", f"Judge {self.judge_id}:", action, f"(Case {self.case_id})" ] if self.reason: parts.append(f"Reason: {self.reason}") return " ".join(parts) @dataclass class JudgePreferences: """Judge-specific scheduling preferences.""" judge_id: str daily_capacity_override: Optional[int] = None # Override default capacity blocked_dates: list[date] = field(default_factory=list) # Vacation, illness min_gap_overrides: dict[str, int] = field(default_factory=dict) # Per-case gap overrides case_type_preferences: dict[str, list[str]] = field(default_factory=dict) # Day-of-week preferences capacity_overrides: dict[int, int] = field(default_factory=dict) # Per-courtroom capacity overrides def to_dict(self) -> dict: """Convert to dictionary.""" return { "judge_id": self.judge_id, "daily_capacity_override": self.daily_capacity_override, "blocked_dates": [d.isoformat() for d in self.blocked_dates], "min_gap_overrides": self.min_gap_overrides, "case_type_preferences": self.case_type_preferences, "capacity_overrides": self.capacity_overrides } @dataclass class CauseListDraft: """Draft cause list before judge approval.""" date: date courtroom_id: int judge_id: str algorithm_suggested: list[str] # Case IDs suggested by algorithm judge_approved: list[str] # Case IDs after judge review overrides: list[Override] created_at: datetime finalized_at: Optional[datetime] = None status: str = "DRAFT" # DRAFT, APPROVED, REJECTED def get_acceptance_rate(self) -> float: """Calculate what % of suggestions were accepted.""" if not self.algorithm_suggested: return 0.0 accepted = len(set(self.algorithm_suggested) & set(self.judge_approved)) return accepted / len(self.algorithm_suggested) * 100 def get_modifications_summary(self) -> dict: """Summarize modifications made.""" added = set(self.judge_approved) - set(self.algorithm_suggested) removed = set(self.algorithm_suggested) - set(self.judge_approved) override_counts = {} for override in self.overrides: override_type = override.override_type.value override_counts[override_type] = override_counts.get(override_type, 0) + 1 return { "cases_added": len(added), "cases_removed": len(removed), "cases_kept": len(set(self.algorithm_suggested) & set(self.judge_approved)), "override_types": override_counts, "acceptance_rate": self.get_acceptance_rate() } class OverrideValidator: """Validates override requests against constraints.""" def __init__(self): self.errors: list[str] = [] def validate(self, override: Override) -> bool: """Validate an override against all applicable constraints. Args: override: Override to validate Returns: True if valid, False otherwise """ self.errors.clear() if override.override_type == OverrideType.RIPENESS: valid, error = self.validate_ripeness_override( override.case_id, override.new_value or "", override.reason ) if not valid: self.errors.append(error) return False elif override.override_type == OverrideType.CAPACITY: if override.new_capacity is not None: valid, error = self.validate_capacity_override( int(override.old_value) if override.old_value else 0, override.new_capacity ) if not valid: self.errors.append(error) return False elif override.override_type == OverrideType.PRIORITY: if override.new_priority is not None: if not (0 <= override.new_priority <= 1.0): self.errors.append("Priority must be between 0 and 1.0") return False # Basic validation if not override.case_id: self.errors.append("Case ID is required") return False if not override.judge_id: self.errors.append("Judge ID is required") return False return True def get_errors(self) -> list[str]: """Get validation errors from last validation.""" return self.errors.copy() @staticmethod def validate_ripeness_override( case_id: str, new_status: str, reason: str ) -> tuple[bool, str]: """Validate ripeness override. Args: case_id: Case ID new_status: Requested new status reason: Reason for override Returns: (valid, error_message) """ valid_statuses = ["RIPE", "UNRIPE_SUMMONS", "UNRIPE_DEPENDENT", "UNRIPE_PARTY", "UNRIPE_DOCUMENT"] if new_status not in valid_statuses: return False, f"Invalid ripeness status: {new_status}" if not reason: return False, "Reason required for ripeness override" if len(reason) < 10: return False, "Reason must be at least 10 characters" return True, "" @staticmethod def validate_capacity_override( current_capacity: int, new_capacity: int, max_capacity: int = 200 ) -> tuple[bool, str]: """Validate capacity override. Args: current_capacity: Current daily capacity new_capacity: Requested new capacity max_capacity: Maximum allowed capacity Returns: (valid, error_message) """ if new_capacity < 0: return False, "Capacity cannot be negative" if new_capacity > max_capacity: return False, f"Capacity cannot exceed maximum ({max_capacity})" if new_capacity == 0: return False, "Capacity cannot be zero (use blocked dates for full closures)" return True, "" @staticmethod def validate_add_case( case_id: str, current_schedule: list[str], current_capacity: int, max_capacity: int ) -> tuple[bool, str]: """Validate adding a case to cause list. Args: case_id: Case to add current_schedule: Currently scheduled case IDs current_capacity: Current number of scheduled cases max_capacity: Maximum capacity Returns: (valid, error_message) """ if case_id in current_schedule: return False, f"Case {case_id} already in schedule" if current_capacity >= max_capacity: return False, f"Schedule at capacity ({current_capacity}/{max_capacity})" return True, "" @staticmethod def validate_remove_case( case_id: str, current_schedule: list[str] ) -> tuple[bool, str]: """Validate removing a case from cause list. Args: case_id: Case to remove current_schedule: Currently scheduled case IDs Returns: (valid, error_message) """ if case_id not in current_schedule: return False, f"Case {case_id} not in schedule" return True, "" class OverrideManager: """Manages judge overrides and interventions.""" def __init__(self): self.overrides: list[Override] = [] self.drafts: list[CauseListDraft] = [] self.preferences: dict[str, JudgePreferences] = {} def create_draft( self, date: date, courtroom_id: int, judge_id: str, algorithm_suggested: list[str] ) -> CauseListDraft: """Create a draft cause list for judge review. Args: date: Date of cause list courtroom_id: Courtroom ID judge_id: Judge ID algorithm_suggested: Case IDs suggested by algorithm Returns: Draft cause list """ draft = CauseListDraft( date=date, courtroom_id=courtroom_id, judge_id=judge_id, algorithm_suggested=algorithm_suggested.copy(), judge_approved=[], overrides=[], created_at=datetime.now(), status="DRAFT" ) self.drafts.append(draft) return draft def apply_override( self, draft: CauseListDraft, override: Override ) -> tuple[bool, str]: """Apply an override to a draft cause list. Args: draft: Draft to modify override: Override to apply Returns: (success, error_message) """ # Validate based on type if override.override_type == OverrideType.RIPENESS: valid, error = OverrideValidator.validate_ripeness_override( override.case_id, override.old_value or "", override.new_value or "", override.reason ) if not valid: return False, error elif override.override_type == OverrideType.ADD_CASE: valid, error = OverrideValidator.validate_add_case( override.case_id, draft.judge_approved, len(draft.judge_approved), 200 # Max capacity ) if not valid: return False, error draft.judge_approved.append(override.case_id) elif override.override_type == OverrideType.REMOVE_CASE: valid, error = OverrideValidator.validate_remove_case( override.case_id, draft.judge_approved ) if not valid: return False, error draft.judge_approved.remove(override.case_id) # Record override draft.overrides.append(override) self.overrides.append(override) return True, "" def finalize_draft(self, draft: CauseListDraft) -> bool: """Finalize draft cause list (judge approval). Args: draft: Draft to finalize Returns: Success status """ if draft.status != "DRAFT": return False draft.status = "APPROVED" draft.finalized_at = datetime.now() return True def get_judge_preferences(self, judge_id: str) -> JudgePreferences: """Get or create judge preferences. Args: judge_id: Judge ID Returns: Judge preferences """ if judge_id not in self.preferences: self.preferences[judge_id] = JudgePreferences(judge_id=judge_id) return self.preferences[judge_id] def get_override_statistics(self, judge_id: Optional[str] = None) -> dict: """Get override statistics. Args: judge_id: Optional filter by judge Returns: Statistics dictionary """ relevant_overrides = self.overrides if judge_id: relevant_overrides = [o for o in self.overrides if o.judge_id == judge_id] if not relevant_overrides: return { "total_overrides": 0, "by_type": {}, "avg_per_day": 0 } override_counts = {} for override in relevant_overrides: override_type = override.override_type.value override_counts[override_type] = override_counts.get(override_type, 0) + 1 # Calculate acceptance rate from drafts relevant_drafts = self.drafts if judge_id: relevant_drafts = [d for d in self.drafts if d.judge_id == judge_id] acceptance_rates = [d.get_acceptance_rate() for d in relevant_drafts if d.status == "APPROVED"] avg_acceptance = sum(acceptance_rates) / len(acceptance_rates) if acceptance_rates else 0 return { "total_overrides": len(relevant_overrides), "by_type": override_counts, "total_drafts": len(relevant_drafts), "approved_drafts": len([d for d in relevant_drafts if d.status == "APPROVED"]), "avg_acceptance_rate": avg_acceptance, "modification_rate": 100 - avg_acceptance if avg_acceptance else 0 } def export_audit_trail(self, output_file: str): """Export complete audit trail to file. Args: output_file: Path to output file """ audit_data = { "overrides": [o.to_dict() for o in self.overrides], "drafts": [ { "date": d.date.isoformat(), "courtroom_id": d.courtroom_id, "judge_id": d.judge_id, "status": d.status, "acceptance_rate": d.get_acceptance_rate(), "modifications": d.get_modifications_summary() } for d in self.drafts ], "statistics": self.get_override_statistics() } with open(output_file, 'w') as f: json.dump(audit_data, f, indent=2)