Spaces:
Running
Running
"""Judge override and intervention control system.

Allows judges to review, modify, and approve algorithmic scheduling suggestions.
The system is suggestive, not prescriptive: judges retain final control.
"""
import json
from dataclasses import dataclass, field
from datetime import date, datetime
from enum import Enum
from typing import Optional
class OverrideType(Enum):
    """Types of overrides judges can make.

    Each member's value is the lowercase string used when an override is
    serialized (see the ``"type"`` key produced by ``Override.to_dict``).
    """

    RIPENESS = "ripeness"  # Override ripeness classification
    PRIORITY = "priority"  # Adjust priority score or urgency
    ADD_CASE = "add_case"  # Manually add case to cause list
    REMOVE_CASE = "remove_case"  # Remove case from cause list
    REORDER = "reorder"  # Change sequence within day
    CAPACITY = "capacity"  # Adjust daily capacity
    MIN_GAP = "min_gap"  # Override minimum gap between hearings
    COURTROOM = "courtroom"  # Change courtroom assignment
@dataclass
class Override:
    """Single override action by a judge.

    The first five fields are required; everything else is optional and
    defaults to ``None`` (or ``""`` for ``reason``).  ``old_value`` and
    ``new_value`` are free-form strings used for logging and display, while
    the "algorithm-specific" fields carry the typed value for particular
    override types.
    """

    override_id: str
    override_type: OverrideType
    case_id: str
    judge_id: str
    timestamp: datetime
    old_value: Optional[str] = None
    new_value: Optional[str] = None
    reason: str = ""
    date_affected: Optional[date] = None
    courtroom_id: Optional[int] = None
    # Algorithm-specific attributes
    make_ripe: Optional[bool] = None  # For RIPENESS overrides
    new_position: Optional[int] = None  # For REORDER/ADD_CASE overrides
    new_priority: Optional[float] = None  # For PRIORITY overrides
    new_capacity: Optional[int] = None  # For CAPACITY overrides

    def to_dict(self) -> dict:
        """Convert to dictionary for logging.

        Returns:
            A JSON-serializable dict: the enum is stored by value under
            ``"type"`` and date/datetime fields as ISO-8601 strings
            (``date_affected`` is ``None`` when unset).
        """
        affected = self.date_affected.isoformat() if self.date_affected else None
        return {
            "override_id": self.override_id,
            "type": self.override_type.value,
            "case_id": self.case_id,
            "judge_id": self.judge_id,
            "timestamp": self.timestamp.isoformat(),
            "old_value": self.old_value,
            "new_value": self.new_value,
            "reason": self.reason,
            "date_affected": affected,
            "courtroom_id": self.courtroom_id,
            "make_ripe": self.make_ripe,
            "new_position": self.new_position,
            "new_priority": self.new_priority,
            "new_capacity": self.new_capacity,
        }

    def to_readable_text(self) -> str:
        """Human-readable description of override.

        Returns:
            A single line of the form
            ``[YYYY-MM-DD HH:MM] Judge <id>: <action> (Case <id>)``, followed
            by ``Reason: <reason>`` when a reason was given.
        """
        action_desc = {
            OverrideType.RIPENESS: f"Changed ripeness from {self.old_value} to {self.new_value}",
            OverrideType.PRIORITY: f"Adjusted priority from {self.old_value} to {self.new_value}",
            OverrideType.ADD_CASE: "Manually added case to cause list",
            OverrideType.REMOVE_CASE: "Removed case from cause list",
            OverrideType.REORDER: f"Reordered from position {self.old_value} to {self.new_value}",
            OverrideType.CAPACITY: f"Changed capacity from {self.old_value} to {self.new_value}",
            OverrideType.MIN_GAP: f"Overrode min gap from {self.old_value} to {self.new_value} days",
            OverrideType.COURTROOM: f"Changed courtroom from {self.old_value} to {self.new_value}",
        }
        # Fall back to a generic label for any type without a description.
        action = action_desc.get(
            self.override_type, f"Override: {self.override_type.value}"
        )
        parts = [
            f"[{self.timestamp.strftime('%Y-%m-%d %H:%M')}]",
            f"Judge {self.judge_id}:",
            action,
            f"(Case {self.case_id})",
        ]
        if self.reason:
            parts.append(f"Reason: {self.reason}")
        return " ".join(parts)
@dataclass
class JudgePreferences:
    """Judge-specific scheduling preferences.

    Only ``judge_id`` is required.  Container fields use
    ``field(default_factory=...)`` so every instance gets its own empty
    list/dict rather than sharing one mutable default.
    """

    judge_id: str
    daily_capacity_override: Optional[int] = None  # Override default capacity
    blocked_dates: list[date] = field(default_factory=list)  # Vacation, illness
    min_gap_overrides: dict[str, int] = field(default_factory=dict)  # Per-case gap overrides
    case_type_preferences: dict[str, list[str]] = field(default_factory=dict)  # Day-of-week preferences
    capacity_overrides: dict[int, int] = field(default_factory=dict)  # Per-courtroom capacity overrides

    def to_dict(self) -> dict:
        """Convert to dictionary.

        Returns:
            A dict snapshot of the preferences with blocked dates rendered as
            ISO-8601 strings.  The nested containers are copies, so mutating
            the result does not alter this object's preferences.
        """
        return {
            "judge_id": self.judge_id,
            "daily_capacity_override": self.daily_capacity_override,
            "blocked_dates": [d.isoformat() for d in self.blocked_dates],
            "min_gap_overrides": dict(self.min_gap_overrides),
            "case_type_preferences": {
                case_type: list(days)
                for case_type, days in self.case_type_preferences.items()
            },
            "capacity_overrides": dict(self.capacity_overrides),
        }
@dataclass
class CauseListDraft:
    """Draft cause list before judge approval.

    Holds the algorithm's suggested case IDs next to the judge-approved list
    and the overrides applied while reviewing.
    """

    date: date
    courtroom_id: int
    judge_id: str
    algorithm_suggested: list[str]  # Case IDs suggested by algorithm
    judge_approved: list[str]  # Case IDs after judge review
    overrides: list[Override]
    created_at: datetime
    finalized_at: Optional[datetime] = None
    status: str = "DRAFT"  # DRAFT, APPROVED, REJECTED

    def get_acceptance_rate(self) -> float:
        """Calculate what % of suggestions were accepted.

        Returns:
            Percentage (0-100) of distinct suggested case IDs that also appear
            in ``judge_approved``; ``0.0`` when nothing was suggested.
        """
        # Compare distinct IDs on both sides so a case ID repeated in the
        # suggestion list cannot deflate the rate.
        suggested = set(self.algorithm_suggested)
        if not suggested:
            return 0.0
        kept = suggested & set(self.judge_approved)
        return len(kept) / len(suggested) * 100

    def get_modifications_summary(self) -> dict:
        """Summarize modifications made.

        Returns:
            Dict with counts of cases added, removed and kept relative to the
            algorithm's suggestion, a per-type count of recorded overrides,
            and the acceptance rate.
        """
        suggested = set(self.algorithm_suggested)
        approved = set(self.judge_approved)

        override_counts: dict[str, int] = {}
        for override in self.overrides:
            type_name = override.override_type.value
            override_counts[type_name] = override_counts.get(type_name, 0) + 1

        return {
            "cases_added": len(approved - suggested),
            "cases_removed": len(suggested - approved),
            "cases_kept": len(suggested & approved),
            "override_types": override_counts,
            "acceptance_rate": self.get_acceptance_rate(),
        }
class OverrideValidator:
    """Validates override requests against constraints.

    ``validate`` is the instance-level entry point and records its failure
    message in ``errors``.  The ``validate_*`` helpers are static so they can
    be called on either the class or an instance; each returns a
    ``(valid, error_message)`` tuple.
    """

    # Ripeness statuses a judge may set through an override.
    VALID_RIPENESS_STATUSES = (
        "RIPE",
        "UNRIPE_SUMMONS",
        "UNRIPE_DEPENDENT",
        "UNRIPE_PARTY",
        "UNRIPE_DOCUMENT",
    )
    # Minimum length of the justification text for a ripeness override.
    MIN_REASON_LENGTH = 10

    def __init__(self):
        self.errors: list[str] = []

    def validate(self, override: Override) -> bool:
        """Validate an override against all applicable constraints.

        Type-specific checks run first (ripeness, capacity, priority), then
        the generic checks that a case ID and judge ID are present.
        Validation stops at the first failure.

        Args:
            override: Override to validate

        Returns:
            True if valid, False otherwise (the message is then available
            from ``get_errors``)
        """
        self.errors.clear()

        kind = override.override_type
        if kind == OverrideType.RIPENESS:
            valid, error = self.validate_ripeness_override(
                override.case_id,
                override.new_value or "",
                override.reason,
            )
            if not valid:
                self.errors.append(error)
                return False
        elif kind == OverrideType.CAPACITY:
            if override.new_capacity is not None:
                valid, error = self.validate_capacity_override(
                    self._parse_capacity(override.old_value),
                    override.new_capacity,
                )
                if not valid:
                    self.errors.append(error)
                    return False
        elif kind == OverrideType.PRIORITY:
            if override.new_priority is not None:
                if not (0 <= override.new_priority <= 1.0):
                    self.errors.append("Priority must be between 0 and 1.0")
                    return False

        # Basic validation
        if not override.case_id:
            self.errors.append("Case ID is required")
            return False
        if not override.judge_id:
            self.errors.append("Judge ID is required")
            return False
        return True

    def get_errors(self) -> list[str]:
        """Get validation errors from last validation.

        Returns:
            A copy of the error list, so callers cannot mutate internal state.
        """
        return self.errors.copy()

    @staticmethod
    def _parse_capacity(raw: Optional[str]) -> int:
        """Best-effort conversion of a logged capacity string to int (0 on failure)."""
        # old_value is free-form text; a non-numeric entry must not crash
        # validation of the *new* capacity.
        if not raw:
            return 0
        try:
            return int(raw)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def validate_ripeness_override(
        case_id: str,
        new_status: str,
        reason: str
    ) -> tuple[bool, str]:
        """Validate ripeness override.

        Args:
            case_id: Case ID (not inspected by this check)
            new_status: Requested new status
            reason: Reason for override

        Returns:
            (valid, error_message)
        """
        if new_status not in OverrideValidator.VALID_RIPENESS_STATUSES:
            return False, f"Invalid ripeness status: {new_status}"
        # Surrounding whitespace does not count as a justification.
        justification = (reason or "").strip()
        if not justification:
            return False, "Reason required for ripeness override"
        if len(justification) < OverrideValidator.MIN_REASON_LENGTH:
            return False, "Reason must be at least 10 characters"
        return True, ""

    @staticmethod
    def validate_capacity_override(
        current_capacity: int,
        new_capacity: int,
        max_capacity: int = 200
    ) -> tuple[bool, str]:
        """Validate capacity override.

        Args:
            current_capacity: Current daily capacity (not inspected by this check)
            new_capacity: Requested new capacity
            max_capacity: Maximum allowed capacity

        Returns:
            (valid, error_message)
        """
        if new_capacity < 0:
            return False, "Capacity cannot be negative"
        if new_capacity > max_capacity:
            return False, f"Capacity cannot exceed maximum ({max_capacity})"
        if new_capacity == 0:
            return False, "Capacity cannot be zero (use blocked dates for full closures)"
        return True, ""

    @staticmethod
    def validate_add_case(
        case_id: str,
        current_schedule: list[str],
        current_capacity: int,
        max_capacity: int
    ) -> tuple[bool, str]:
        """Validate adding a case to cause list.

        Args:
            case_id: Case to add
            current_schedule: Currently scheduled case IDs
            current_capacity: Current number of scheduled cases
            max_capacity: Maximum capacity

        Returns:
            (valid, error_message)
        """
        if case_id in current_schedule:
            return False, f"Case {case_id} already in schedule"
        if current_capacity >= max_capacity:
            return False, f"Schedule at capacity ({current_capacity}/{max_capacity})"
        return True, ""

    @staticmethod
    def validate_remove_case(
        case_id: str,
        current_schedule: list[str]
    ) -> tuple[bool, str]:
        """Validate removing a case from cause list.

        Args:
            case_id: Case to remove
            current_schedule: Currently scheduled case IDs

        Returns:
            (valid, error_message)
        """
        if case_id not in current_schedule:
            return False, f"Case {case_id} not in schedule"
        return True, ""
class OverrideManager:
    """Manages judge overrides and interventions.

    Keeps an in-memory record of every applied override, every draft cause
    list created, and per-judge preferences, and can export the whole history
    as a JSON audit trail.
    """

    # Upper bound on cases per cause list used when validating ADD_CASE.
    MAX_DAILY_CAPACITY = 200

    def __init__(self):
        self.overrides: list[Override] = []
        self.drafts: list[CauseListDraft] = []
        self.preferences: dict[str, JudgePreferences] = {}

    def create_draft(
        self,
        date: date,
        courtroom_id: int,
        judge_id: str,
        algorithm_suggested: list[str]
    ) -> CauseListDraft:
        """Create a draft cause list for judge review.

        The suggestion list is copied, and the approved list starts empty.

        Args:
            date: Date of cause list
            courtroom_id: Courtroom ID
            judge_id: Judge ID
            algorithm_suggested: Case IDs suggested by algorithm

        Returns:
            Draft cause list (also appended to ``self.drafts``)
        """
        draft = CauseListDraft(
            date=date,
            courtroom_id=courtroom_id,
            judge_id=judge_id,
            algorithm_suggested=algorithm_suggested.copy(),
            judge_approved=[],
            overrides=[],
            created_at=datetime.now(),
            status="DRAFT",
        )
        self.drafts.append(draft)
        return draft

    def apply_override(
        self,
        draft: CauseListDraft,
        override: Override
    ) -> tuple[bool, str]:
        """Apply an override to a draft cause list.

        RIPENESS, ADD_CASE and REMOVE_CASE overrides are validated first;
        ADD_CASE/REMOVE_CASE also update ``draft.judge_approved``.  All other
        override types are recorded without validation.  A rejected override
        is not recorded.

        Args:
            draft: Draft to modify
            override: Override to apply

        Returns:
            (success, error_message)
        """
        # NOTE(review): draft.status is not checked here, so overrides can be
        # applied to an already-finalized draft - confirm that is intended.
        kind = override.override_type
        if kind == OverrideType.RIPENESS:
            # The validator takes (case_id, new_status, reason); old_value is
            # not part of its signature.
            valid, error = OverrideValidator.validate_ripeness_override(
                override.case_id,
                override.new_value or "",
                override.reason,
            )
            if not valid:
                return False, error
        elif kind == OverrideType.ADD_CASE:
            valid, error = OverrideValidator.validate_add_case(
                override.case_id,
                draft.judge_approved,
                len(draft.judge_approved),
                self.MAX_DAILY_CAPACITY,
            )
            if not valid:
                return False, error
            draft.judge_approved.append(override.case_id)
        elif kind == OverrideType.REMOVE_CASE:
            valid, error = OverrideValidator.validate_remove_case(
                override.case_id,
                draft.judge_approved,
            )
            if not valid:
                return False, error
            draft.judge_approved.remove(override.case_id)

        # Record override on both the draft and the global audit list.
        draft.overrides.append(override)
        self.overrides.append(override)
        return True, ""

    def finalize_draft(self, draft: CauseListDraft) -> bool:
        """Finalize draft cause list (judge approval).

        Args:
            draft: Draft to finalize

        Returns:
            True if the draft was in ``"DRAFT"`` status and is now
            ``"APPROVED"``; False (draft left unchanged) otherwise
        """
        if draft.status != "DRAFT":
            return False
        draft.status = "APPROVED"
        draft.finalized_at = datetime.now()
        return True

    def get_judge_preferences(self, judge_id: str) -> JudgePreferences:
        """Get or create judge preferences.

        Args:
            judge_id: Judge ID

        Returns:
            The stored preferences for the judge; a default
            ``JudgePreferences`` is created and stored on first access
        """
        if judge_id not in self.preferences:
            self.preferences[judge_id] = JudgePreferences(judge_id=judge_id)
        return self.preferences[judge_id]

    def get_override_statistics(self, judge_id: Optional[str] = None) -> dict:
        """Get override statistics.

        Args:
            judge_id: Optional filter by judge (a falsy value means all judges)

        Returns:
            Statistics dictionary with ``total_overrides``, ``by_type``,
            ``total_drafts``, ``approved_drafts``, ``avg_acceptance_rate`` and
            ``modification_rate``.  When there are no matching overrides the
            legacy ``avg_per_day`` key (always 0) is included as well.
        """
        if judge_id:
            relevant_overrides = [o for o in self.overrides if o.judge_id == judge_id]
            relevant_drafts = [d for d in self.drafts if d.judge_id == judge_id]
        else:
            relevant_overrides = self.overrides
            relevant_drafts = self.drafts

        override_counts: dict[str, int] = {}
        for override in relevant_overrides:
            type_name = override.override_type.value
            override_counts[type_name] = override_counts.get(type_name, 0) + 1

        # Acceptance is only meaningful for drafts the judge has approved.
        approved = [d for d in relevant_drafts if d.status == "APPROVED"]
        acceptance_rates = [d.get_acceptance_rate() for d in approved]
        avg_acceptance = (
            sum(acceptance_rates) / len(acceptance_rates) if acceptance_rates else 0
        )

        stats = {
            "total_overrides": len(relevant_overrides),
            "by_type": override_counts,
            "total_drafts": len(relevant_drafts),
            "approved_drafts": len(approved),
            "avg_acceptance_rate": avg_acceptance,
            # Decide on "is there data", not on the average being truthy: an
            # average acceptance of exactly 0 means everything was modified.
            "modification_rate": 100 - avg_acceptance if acceptance_rates else 0,
        }
        if not relevant_overrides:
            # Kept for callers of the original empty-result shape.
            stats["avg_per_day"] = 0
        return stats

    def export_audit_trail(self, output_file: str):
        """Export complete audit trail to file.

        Writes a JSON document containing every recorded override, a summary
        of every draft, and the overall statistics.

        Args:
            output_file: Path to output file (overwritten if it exists)
        """
        audit_data = {
            "overrides": [o.to_dict() for o in self.overrides],
            "drafts": [
                {
                    "date": d.date.isoformat(),
                    "courtroom_id": d.courtroom_id,
                    "judge_id": d.judge_id,
                    "status": d.status,
                    "acceptance_rate": d.get_acceptance_rate(),
                    "modifications": d.get_modifications_summary(),
                }
                for d in self.drafts
            ],
            "statistics": self.get_override_statistics(),
        }
        # Explicit encoding keeps the output independent of the platform default.
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(audit_data, f, indent=2)