File size: 10,910 Bytes
4d0ffdd
 
 
 
 
 
 
6a28f91
4d0ffdd
 
f6c65ef
4d0ffdd
 
 
 
6a28f91
4d0ffdd
 
 
 
f6c65ef
4d0ffdd
 
 
 
 
 
 
 
 
 
2d34efc
4d0ffdd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6d32faf
f6c65ef
6d32faf
 
2d34efc
4d0ffdd
6a28f91
f6c65ef
4d0ffdd
 
 
 
 
6a28f91
4d0ffdd
2d34efc
 
6d32faf
2d34efc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a28f91
 
2d34efc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f6c65ef
4d0ffdd
6a28f91
 
 
4d0ffdd
f6c65ef
4d0ffdd
 
 
f6c65ef
4d0ffdd
 
2d34efc
4d0ffdd
 
 
 
 
2d34efc
 
 
4d0ffdd
 
 
f6c65ef
4d0ffdd
 
 
2d34efc
4d0ffdd
 
 
f6c65ef
4d0ffdd
 
 
 
 
2d34efc
4d0ffdd
 
 
 
 
2d34efc
4d0ffdd
 
 
2d34efc
 
 
 
 
 
 
 
4d0ffdd
2d34efc
 
 
f6c65ef
4d0ffdd
6a28f91
 
 
4d0ffdd
f6c65ef
4d0ffdd
 
f6c65ef
4d0ffdd
 
 
 
 
f6c65ef
4d0ffdd
 
 
f6c65ef
4d0ffdd
 
 
 
f6c65ef
4d0ffdd
 
 
f6c65ef
4d0ffdd
 
 
 
 
 
f6c65ef
4d0ffdd
 
f6c65ef
4d0ffdd
 
f6c65ef
4d0ffdd
 
 
f6c65ef
4d0ffdd
f6c65ef
4d0ffdd
 
f6c65ef
4d0ffdd
 
 
 
 
 
 
 
 
2d34efc
4d0ffdd
 
f6c65ef
4d0ffdd
6a28f91
 
 
4d0ffdd
f6c65ef
4d0ffdd
f6c65ef
4d0ffdd
 
 
f6c65ef
4d0ffdd
 
 
 
f6c65ef
4d0ffdd
 
f6c65ef
4d0ffdd
 
 
 
 
 
 
f6c65ef
4d0ffdd
f6c65ef
6d32faf
 
 
f6c65ef
6d32faf
 
 
 
 
 
 
 
 
f6c65ef
6d32faf
 
 
f6c65ef
6d32faf
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
"""Case ripeness classification for intelligent scheduling.

Ripe cases are ready for substantive judicial time.
Unripe cases have bottlenecks (summons, dependencies, parties, documents).

Based on analysis of historical PurposeOfHearing patterns (see scripts/analyze_ripeness_patterns.py).
"""

from __future__ import annotations

from datetime import datetime, timedelta
from enum import Enum
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from src.core.case import Case


class RipenessStatus(Enum):
    """Readiness of a case for a hearing, including the kind of bottleneck.

    ``RIPE`` means ready, each ``UNRIPE_*`` member names one bottleneck
    category, and ``UNKNOWN`` is neither ripe nor unripe.
    """

    RIPE = "RIPE"  # Ready for hearing
    UNRIPE_SUMMONS = "UNRIPE_SUMMONS"  # Waiting for summons service
    UNRIPE_DEPENDENT = "UNRIPE_DEPENDENT"  # Waiting for dependent case/order
    UNRIPE_PARTY = "UNRIPE_PARTY"  # Party/lawyer unavailable
    UNRIPE_DOCUMENT = "UNRIPE_DOCUMENT"  # Missing documents/evidence
    UNKNOWN = "UNKNOWN"  # Cannot determine

    def is_ripe(self) -> bool:
        """Return True only for the ``RIPE`` member."""
        # Enum members are singletons, so identity is equivalent to equality.
        return self is RipenessStatus.RIPE

    def is_unripe(self) -> bool:
        """Return True for any of the ``UNRIPE_*`` bottleneck members.

        ``RIPE`` and ``UNKNOWN`` both yield False.
        """
        # Every bottleneck member (and only those) carries the UNRIPE_ prefix.
        return self.name.startswith("UNRIPE_")


# Substrings of a hearing purpose that indicate a bottleneck, mapped to the
# bottleneck type (data-driven from analyze_ripeness_patterns.py).
# Insertion order is preserved; it matters to any consumer that stops at the
# first matching keyword.
UNRIPE_KEYWORDS: dict[str, RipenessStatus] = {
    # Service-of-process related purposes
    **dict.fromkeys(
        ("SUMMONS", "NOTICE", "ISSUE", "SERVICE"),
        RipenessStatus.UNRIPE_SUMMONS,
    ),
    # Purposes that wait on another case or order
    **dict.fromkeys(
        ("STAY", "PENDING"),
        RipenessStatus.UNRIPE_DEPENDENT,
    ),
}

# Substrings of a hearing purpose that indicate readiness for a hearing.
RIPE_KEYWORDS: list[str] = [
    "ARGUMENTS",
    "HEARING",
    "FINAL",
    "JUDGMENT",
    "ORDERS",
    "DISPOSAL",
]


class RipenessClassifier:
    """Classify cases as RIPE or UNRIPE for scheduling optimization.

    Every method is a classmethod; the class itself holds the configuration.
    The ``MIN_*`` thresholds can be adjusted at runtime via
    ``set_thresholds()`` and read back via ``get_current_thresholds()``.
    """

    # Stages that indicate case is ready for substantive hearing
    RIPE_STAGES = ["ARGUMENTS", "EVIDENCE", "ORDERS / JUDGMENT", "FINAL DISPOSAL"]

    # Stages that indicate administrative/preliminary work
    UNRIPE_STAGES = [
        "PRE-ADMISSION",
        "ADMISSION",  # Most cases stuck here waiting for compliance
        "FRAMING OF CHARGES",
        "INTERLOCUTORY APPLICATION",
    ]

    # Minimum evidence thresholds before declaring a case RIPE
    # These can be adjusted via set_thresholds() for calibration
    MIN_SERVICE_HEARINGS = 1  # At least one hearing to confirm service/compliance
    MIN_STAGE_DAYS = 7  # Time spent in current stage to show compliance efforts
    MIN_CASE_AGE_DAYS = 14  # Minimum maturity before assuming readiness

    # The only attributes set_thresholds() may modify. Keeping an explicit
    # allow-list prevents callers from clobbering methods or stage lists.
    _THRESHOLD_NAMES = (
        "MIN_SERVICE_HEARINGS",
        "MIN_STAGE_DAYS",
        "MIN_CASE_AGE_DAYS",
    )

    @classmethod
    def _has_required_evidence(cls, case: Case) -> tuple[bool, dict[str, bool]]:
        """Check that minimum readiness evidence exists before declaring RIPE.

        Args:
            case: Case to inspect. Reads ``hearing_count`` and
                ``current_stage``; ``last_hearing_purpose``, ``days_in_stage``
                and ``age_days`` are optional and default to None/0/0.

        Returns:
            ``(all_ok, evidence)`` where ``evidence`` maps "service",
            "compliance" and "age" to whether each check passed.
        """
        # Evidence of service/compliance: at least one hearing or explicit purpose text
        service_confirmed = case.hearing_count >= cls.MIN_SERVICE_HEARINGS or bool(
            getattr(case, "last_hearing_purpose", None)
        )

        # Evidence the case has progressed in its current stage: only cases
        # sitting in a preliminary stage must have spent enough days there.
        days_in_stage = getattr(case, "days_in_stage", 0)
        compliance_confirmed = (
            case.current_stage not in cls.UNRIPE_STAGES
            or days_in_stage >= cls.MIN_STAGE_DAYS
        )

        # Age-based maturity requirement
        age_confirmed = getattr(case, "age_days", 0) >= cls.MIN_CASE_AGE_DAYS

        evidence = {
            "service": service_confirmed,
            "compliance": compliance_confirmed,
            "age": age_confirmed,
        }

        return all(evidence.values()), evidence

    @classmethod
    def _has_ripe_signal(cls, case: Case) -> bool:
        """Check if stage or hearing purpose indicates readiness.

        Returns True when the current stage is one of ``RIPE_STAGES`` or when
        the last hearing purpose contains any of ``RIPE_KEYWORDS``
        (case-insensitive substring match).
        """
        if case.current_stage in cls.RIPE_STAGES:
            return True

        purpose = getattr(case, "last_hearing_purpose", None)
        if purpose:
            purpose_upper = purpose.upper()
            return any(keyword in purpose_upper for keyword in RIPE_KEYWORDS)

        return False

    @classmethod
    def classify(
        cls, case: Case, current_date: datetime | None = None
    ) -> RipenessStatus:
        """Classify case ripeness status with bottleneck type.

        Args:
            case: Case to classify
            current_date: Current simulation date. Accepted for interface
                compatibility; the classification below does not read it.

        Returns:
            RipenessStatus enum indicating ripeness and bottleneck type

        Algorithm (first matching rule wins):
        1. Last hearing purpose contains a bottleneck keyword -> that bottleneck
        2. ADMISSION stage with fewer than 3 hearings -> UNRIPE_SUMMONS
        3. More than 10 hearings averaging over 60 days apart -> UNRIPE_PARTY
        4. Readiness evidence (service/compliance/age) missing -> UNKNOWN
        5. Ripe stage or ripe purpose keyword -> RIPE
        6. Otherwise -> UNKNOWN
        """
        # 1. Check last hearing purpose for explicit bottleneck keywords
        purpose = getattr(case, "last_hearing_purpose", None)
        if purpose:
            purpose_upper = purpose.upper()

            # Substring match; the first keyword (in dict order) that hits wins.
            for keyword, bottleneck_type in UNRIPE_KEYWORDS.items():
                if keyword in purpose_upper:
                    return bottleneck_type

        # 2. Check stage - new cases in ADMISSION (< 3 hearings) are often unripe
        if case.current_stage == "ADMISSION" and case.hearing_count < 3:
            return RipenessStatus.UNRIPE_SUMMONS

        # 3. Check if case is "stuck" (many hearings but no progress)
        if case.hearing_count > 10 and case.age_days > 0:
            # Average days between hearings; hearing_count > 10 so no zero division
            avg_gap = case.age_days / case.hearing_count

            # If average gap > 60 days, likely stuck due to bottleneck
            if avg_gap > 60:
                return RipenessStatus.UNRIPE_PARTY

        # 4. Require explicit readiness evidence before declaring RIPE
        evidence_ok, _ = cls._has_required_evidence(case)
        if not evidence_ok:
            return RipenessStatus.UNKNOWN

        # 5. Check stage-based ripeness (ripe stages are substantive) or explicit RIPE signal
        if cls._has_ripe_signal(case):
            return RipenessStatus.RIPE

        # 6. Default to UNKNOWN if no bottlenecks but also no clear ripe signal
        return RipenessStatus.UNKNOWN

    @classmethod
    def get_ripeness_priority(
        cls, case: Case, current_date: datetime | None = None
    ) -> float:
        """Get priority adjustment based on ripeness.

        Ripe cases should get judicial time priority over unripe cases
        when scheduling is tight.

        Args:
            case: Case to evaluate
            current_date: Forwarded to ``classify()``

        Returns:
            Priority multiplier: 1.5 for RIPE, 0.7 for every other status
            (all UNRIPE_* bottlenecks and UNKNOWN)
        """
        ripeness = cls.classify(case, current_date)
        return 1.5 if ripeness.is_ripe() else 0.7

    @classmethod
    def is_schedulable(cls, case: Case, current_date: datetime | None = None) -> bool:
        """Determine if a case can be scheduled for a hearing.

        A case is schedulable if:
        - It is not disposed
        - ``classify()`` reports it as RIPE

        No other condition (such as time elapsed since the last hearing) is
        checked here.

        Args:
            case: The case to check
            current_date: Current simulation date, forwarded to ``classify()``

        Returns:
            True if case can be scheduled, False otherwise
        """
        # Check disposal status
        if case.is_disposed:
            return False

        # Only RIPE cases can be scheduled
        return cls.classify(case, current_date).is_ripe()

    @classmethod
    def get_ripeness_reason(cls, ripeness_status: RipenessStatus) -> str:
        """Get human-readable explanation for ripeness status.

        Used in dashboard tooltips and reports.

        Args:
            ripeness_status: The status to explain

        Returns:
            Human-readable explanation string; "Unknown status" if the value
            has no entry in the table below
        """
        reasons = {
            RipenessStatus.RIPE: "Case is ready for hearing (no bottlenecks detected)",
            RipenessStatus.UNRIPE_SUMMONS: "Waiting for summons service or notice response",
            RipenessStatus.UNRIPE_DEPENDENT: "Waiting for another case or court order",
            RipenessStatus.UNRIPE_PARTY: "Party or lawyer unavailable",
            RipenessStatus.UNRIPE_DOCUMENT: "Missing documents or evidence",
            RipenessStatus.UNKNOWN: "Insufficient readiness evidence; route to manual triage",
        }
        return reasons.get(ripeness_status, "Unknown status")

    @classmethod
    def estimate_ripening_time(
        cls, case: Case, current_date: datetime
    ) -> timedelta | None:
        """Estimate time until case becomes ripe.

        This is a heuristic based on bottleneck type and historical data.

        Args:
            case: The case to evaluate
            current_date: Current simulation date, forwarded to ``classify()``

        Returns:
            ``timedelta(0)`` if the case is already ripe, a fixed per-bottleneck
            estimate for UNRIPE_* statuses, or None when the status is UNKNOWN
        """
        ripeness = cls.classify(case, current_date)

        if ripeness.is_ripe():
            return timedelta(0)

        # Heuristic estimates based on bottleneck type
        estimates = {
            RipenessStatus.UNRIPE_SUMMONS: timedelta(days=30),
            RipenessStatus.UNRIPE_DEPENDENT: timedelta(days=60),
            RipenessStatus.UNRIPE_PARTY: timedelta(days=14),
            RipenessStatus.UNRIPE_DOCUMENT: timedelta(days=21),
        }

        # UNKNOWN has no entry, so it falls through to None.
        return estimates.get(ripeness)

    @classmethod
    def set_thresholds(cls, new_thresholds: dict[str, int | float]) -> None:
        """Update classification thresholds for calibration.

        The update is all-or-nothing: every name and value is validated
        before any attribute is changed.

        Args:
            new_thresholds: Dictionary with threshold names and values
                           e.g., {"MIN_SERVICE_HEARINGS": 2, "MIN_STAGE_DAYS": 5}.
                           Values are converted with ``int()``, so floats are
                           truncated toward zero.

        Raises:
            ValueError: If a name is not one of the known thresholds, or a
                value cannot be converted by ``int()``.
        """
        # Reject anything outside the allow-list so that methods and stage
        # lists can never be overwritten through this entry point.
        unknown = [name for name in new_thresholds if name not in cls._THRESHOLD_NAMES]
        if unknown:
            raise ValueError(f"Unknown threshold: {', '.join(map(str, unknown))}")

        # Convert everything first; a bad value must not leave partial state.
        converted = {name: int(value) for name, value in new_thresholds.items()}

        for threshold_name, value in converted.items():
            setattr(cls, threshold_name, value)

    @classmethod
    def get_current_thresholds(cls) -> dict[str, int]:
        """Get current threshold values.

        Returns:
            Dictionary of threshold names and values, covering exactly the
            names accepted by ``set_thresholds()``
        """
        return {name: getattr(cls, name) for name in cls._THRESHOLD_NAMES}