Spaces:
Sleeping
Court Scheduling System - Complete Workflow & Logic Flow
Step-by-Step Guide: How the System Actually Works
Table of Contents
- System Workflow Overview
- Phase 1: Data Preparation
- Phase 2: Simulation Initialization
- Phase 3: Daily Scheduling Loop
- Phase 4: Output Generation
- Phase 5: Analysis & Reporting
- Complete Example Walkthrough
- Data Flow Pipeline
System Workflow Overview
The Court Scheduling System operates in 5 sequential phases that transform historical court data into optimized daily cause lists:
Historical Data β Data Preparation β Simulation Setup β Daily Scheduling β Output Generation β Analysis
β β β β β β
739K hearings Parameters & Initialized Daily cause CSV files & Performance
134K cases Generated cases simulation lists for 384 Reports metrics
Key Outputs:
- Daily Cause Lists: CSV files for each courtroom/day
- Simulation Report: Overall performance summary
- Metrics File: Daily performance tracking
- Individual Case Audit: Complete hearing history
Phase 1: Data Preparation
Step 1.1: Historical Data Analysis (EDA Pipeline)
Input:
ISDMHack_Case.csv(134,699 cases)ISDMHack_Hear.csv(739,670 hearings)
Process:
# Load and merge historical data
cases_df = pd.read_csv("ISDMHack_Case.csv")
hearings_df = pd.read_csv("ISDMHack_Hear.csv")
merged_data = cases_df.merge(hearings_df, on="Case_ID")
# Extract key parameters
case_type_distribution = cases_df["Type"].value_counts(normalize=True)
stage_transitions = calculate_stage_progression_probabilities(merged_data)
adjournment_rates = calculate_adjournment_rates_by_stage(hearings_df)
daily_capacity = hearings_df.groupby("Hearing_Date").size().mean()
Output:
# Extracted parameters stored in config.py
CASE_TYPE_DISTRIBUTION = {"CRP": 0.201, "CA": 0.200, ...}
STAGE_TRANSITIONS = {"ADMISSION->ARGUMENTS": 0.72, ...}
ADJOURNMENT_RATES = {"ADMISSION": 0.38, "ARGUMENTS": 0.31, ...}
DEFAULT_DAILY_CAPACITY = 151 # cases per courtroom per day
Step 1.2: Synthetic Case Generation
Input:
- Configuration:
configs/generate.sample.toml - Extracted parameters from Step 1.1
Process:
# Generate 10,000 synthetic cases
for i in range(10000):
case = Case(
case_id=f"C{i:06d}",
case_type=random_choice_weighted(CASE_TYPE_DISTRIBUTION),
filed_date=random_date_in_range("2022-01-01", "2023-12-31"),
current_stage=random_choice_weighted(STAGE_DISTRIBUTION),
is_urgent=random_boolean(0.05), # 5% urgent cases
)
# Add realistic hearing history
generate_hearing_history(case, historical_patterns)
cases.append(case)
Output:
data/generated/cases.csvwith 10,000 synthetic cases- Each case has realistic attributes based on historical patterns
Phase 2: Simulation Initialization
Step 2.1: Load Configuration
Input: configs/simulate.sample.toml
cases = "data/generated/cases.csv"
days = 384 # 2-year simulation
policy = "readiness" # Scheduling policy
courtrooms = 5
daily_capacity = 151
Step 2.2: Initialize System State
Process:
# Load generated cases
cases = load_cases_from_csv("data/generated/cases.csv")
# Initialize courtrooms
courtrooms = [
Courtroom(id=1, daily_capacity=151),
Courtroom(id=2, daily_capacity=151),
# ... 5 courtrooms total
]
# Initialize scheduling policy
policy = ReadinessPolicy(
fairness_weight=0.4,
efficiency_weight=0.3,
urgency_weight=0.3
)
# Initialize simulation clock
current_date = datetime(2023, 12, 29) # Start date
end_date = current_date + timedelta(days=384)
Output:
- Simulation environment ready with 10,000 cases and 5 courtrooms
- Policy configured with optimization weights
Phase 3: Daily Scheduling Loop
This is the core algorithm that runs 384 times (once per working day)
Daily Loop Structure
for day in range(384): # Each working day for 2 years
current_date += timedelta(days=1)
# Skip weekends and holidays
if not is_working_day(current_date):
continue
# Execute daily scheduling algorithm
daily_result = schedule_daily_hearings(cases, current_date)
# Update system state for next day
update_case_states(cases, daily_result)
# Generate daily outputs
generate_cause_lists(daily_result, current_date)
Step 3.1: Daily Scheduling Algorithm (Core Logic)
INPUT:
- All active cases (initially 10,000)
- Current date
- Courtroom capacities
CHECKPOINT 1: Case Status Filtering
# Filter out disposed cases
active_cases = [case for case in all_cases
if case.status in [PENDING, SCHEDULED]]
print(f"Day {day}: {len(active_cases)} active cases")
# Example: Day 1: 10,000 active cases β Day 200: 6,500 active cases
CHECKPOINT 2: Case Attribute Updates
for case in active_cases:
# Update age (days since filing)
case.age_days = (current_date - case.filed_date).days
# Update readiness score based on stage and hearing history
case.readiness_score = calculate_readiness(case)
# Update days since last scheduled
if case.last_scheduled_date:
case.days_since_last_scheduled = (current_date - case.last_scheduled_date).days
CHECKPOINT 3: Ripeness Classification (Critical Filter)
ripe_cases = []
ripeness_stats = {"RIPE": 0, "UNRIPE_SUMMONS": 0, "UNRIPE_DEPENDENT": 0, "UNRIPE_PARTY": 0}
for case in active_cases:
ripeness = RipenessClassifier.classify(case, current_date)
ripeness_stats[ripeness.status] += 1
if ripeness.is_ripe():
ripe_cases.append(case)
else:
case.bottleneck_reason = ripeness.reason
print(f"Ripeness Filter: {len(active_cases)} β {len(ripe_cases)} cases")
# Example: 6,500 active β 3,850 ripe cases (40.8% filtered out)
Ripeness Classification Logic:
def classify(case, current_date):
# Step 1: Check explicit bottlenecks in last hearing purpose
if "SUMMONS" in case.last_hearing_purpose:
return RipenessStatus.UNRIPE_SUMMONS
if "STAY" in case.last_hearing_purpose:
return RipenessStatus.UNRIPE_DEPENDENT
# Step 2: Early admission cases likely waiting for service
if case.current_stage == "ADMISSION" and case.hearing_count < 3:
return RipenessStatus.UNRIPE_SUMMONS
# Step 3: Detect stuck cases (many hearings, no progress)
if case.hearing_count > 10 and case.avg_gap_days > 60:
return RipenessStatus.UNRIPE_PARTY
# Step 4: Advanced stages are usually ready
if case.current_stage in ["ARGUMENTS", "EVIDENCE", "ORDERS / JUDGMENT"]:
return RipenessStatus.RIPE
# Step 5: Conservative default
return RipenessStatus.RIPE
CHECKPOINT 4: Eligibility Check (Timing Constraints)
eligible_cases = []
for case in ripe_cases:
# Check minimum 14-day gap between hearings
if case.last_hearing_date:
days_since_last = (current_date - case.last_hearing_date).days
if days_since_last < MIN_GAP_BETWEEN_HEARINGS:
continue
eligible_cases.append(case)
print(f"Eligibility Filter: {len(ripe_cases)} β {len(eligible_cases)} cases")
# Example: 3,850 ripe β 3,200 eligible cases
CHECKPOINT 5: Priority Scoring (Policy Application)
for case in eligible_cases:
# Multi-factor priority calculation
age_component = min(case.age_days / 365, 1.0) * 0.35
readiness_component = case.readiness_score * 0.25
urgency_component = (1.0 if case.is_urgent else 0.5) * 0.25
boost_component = calculate_adjournment_boost(case) * 0.15
case.priority_score = age_component + readiness_component + urgency_component + boost_component
# Sort by priority (highest first)
prioritized_cases = sorted(eligible_cases, key=lambda c: c.priority_score, reverse=True)
CHECKPOINT 6: Judge Overrides (Optional)
if daily_overrides:
# Apply ADD_CASE overrides (highest priority)
for override in add_case_overrides:
case_to_add = find_case_by_id(override.case_id)
prioritized_cases.insert(override.new_position, case_to_add)
# Apply REMOVE_CASE overrides
for override in remove_case_overrides:
prioritized_cases = [c for c in prioritized_cases if c.case_id != override.case_id]
# Apply PRIORITY overrides
for override in priority_overrides:
case = find_case_in_list(prioritized_cases, override.case_id)
case.priority_score = override.new_priority
# Re-sort after priority changes
prioritized_cases.sort(key=lambda c: c.priority_score, reverse=True)
CHECKPOINT 7: Multi-Courtroom Allocation
# Load balancing algorithm
courtroom_loads = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
daily_schedule = {1: [], 2: [], 3: [], 4: [], 5: []}
for case in prioritized_cases:
# Find least loaded courtroom
target_courtroom = min(courtroom_loads.items(), key=lambda x: x[1])[0]
# Check capacity constraint
if courtroom_loads[target_courtroom] >= DEFAULT_DAILY_CAPACITY:
# All courtrooms at capacity, remaining cases unscheduled
break
# Assign case to courtroom
daily_schedule[target_courtroom].append(case)
courtroom_loads[target_courtroom] += 1
case.last_scheduled_date = current_date
total_scheduled = sum(len(cases) for cases in daily_schedule.values())
print(f"Allocation: {total_scheduled} cases scheduled across 5 courtrooms")
# Example: 703 cases scheduled (5 Γ 140-141 per courtroom)
CHECKPOINT 8: Generate Explanations
explanations = {}
for courtroom_id, cases in daily_schedule.items():
for i, case in enumerate(cases):
urgency_text = "HIGH URGENCY" if case.is_urgent else "standard urgency"
stage_text = f"{case.current_stage.lower()} stage"
assignment_text = f"assigned to Courtroom {courtroom_id}"
explanations[case.case_id] = f"{urgency_text} | {stage_text} | {assignment_text}"
Step 3.2: Case State Updates (After Each Day)
def update_case_states(cases, daily_result):
for case in cases:
if case.case_id in daily_result.scheduled_cases:
# Case was scheduled today
case.status = CaseStatus.SCHEDULED
case.hearing_count += 1
case.last_hearing_date = current_date
# Simulate hearing outcome
if random.random() < get_adjournment_rate(case.current_stage):
# Case adjourned - stays in same stage
case.history.append({
"date": current_date,
"outcome": "ADJOURNED",
"next_hearing": current_date + timedelta(days=21)
})
else:
# Case heard - may progress to next stage or dispose
if should_progress_stage(case):
case.current_stage = get_next_stage(case.current_stage)
if should_dispose(case):
case.status = CaseStatus.DISPOSED
case.disposal_date = current_date
else:
# Case not scheduled today
case.days_since_last_scheduled += 1
Phase 4: Output Generation
Step 4.1: Daily Cause List Generation
For each courtroom and each day:
# Generate cause_list_courtroom_1_2024-01-15.csv
def generate_daily_cause_list(courtroom_id, date, scheduled_cases):
cause_list = []
for i, case in enumerate(scheduled_cases):
cause_list.append({
"Date": date.strftime("%Y-%m-%d"),
"Courtroom_ID": courtroom_id,
"Case_ID": case.case_id,
"Case_Type": case.case_type,
"Stage": case.current_stage,
"Purpose": "HEARING",
"Sequence_Number": i + 1,
"Explanation": explanations[case.case_id]
})
# Save to CSV
df = pd.DataFrame(cause_list)
df.to_csv(f"cause_list_courtroom_{courtroom_id}_{date.strftime('%Y-%m-%d')}.csv")
Example Output:
Date,Courtroom_ID,Case_ID,Case_Type,Stage,Purpose,Sequence_Number,Explanation
2024-01-15,1,C002847,CRP,ARGUMENTS,HEARING,1,"HIGH URGENCY | arguments stage | assigned to Courtroom 1"
2024-01-15,1,C005123,CA,ADMISSION,HEARING,2,"standard urgency | admission stage | assigned to Courtroom 1"
2024-01-15,1,C001456,RSA,EVIDENCE,HEARING,3,"standard urgency | evidence stage | assigned to Courtroom 1"
Step 4.2: Daily Metrics Tracking
def record_daily_metrics(date, daily_result):
metrics = {
"date": date,
"scheduled": daily_result.total_scheduled,
"heard": calculate_heard_cases(daily_result),
"adjourned": calculate_adjourned_cases(daily_result),
"disposed": count_disposed_today(daily_result),
"utilization": daily_result.total_scheduled / (COURTROOMS * DEFAULT_DAILY_CAPACITY),
"gini_coefficient": calculate_gini_coefficient(courtroom_loads),
"ripeness_filtered": daily_result.ripeness_filtered_count
}
# Append to metrics.csv
append_to_csv("metrics.csv", metrics)
Example metrics.csv:
date,scheduled,heard,adjourned,disposed,utilization,gini_coefficient,ripeness_filtered
2024-01-15,703,430,273,12,0.931,0.245,287
2024-01-16,698,445,253,15,0.924,0.248,301
2024-01-17,701,421,280,18,0.928,0.251,294
Phase 5: Analysis & Reporting
Step 5.1: Simulation Summary Report
After all 384 days complete:
def generate_simulation_report():
total_hearings = sum(daily_metrics["scheduled"])
total_heard = sum(daily_metrics["heard"])
total_adjourned = sum(daily_metrics["adjourned"])
total_disposed = count_disposed_cases()
report = f"""
SIMULATION SUMMARY
Horizon: {start_date} β {end_date} ({simulation_days} days)
Case Metrics:
Initial cases: {initial_case_count:,}
Cases disposed: {total_disposed:,} ({total_disposed/initial_case_count:.1%})
Cases remaining: {initial_case_count - total_disposed:,}
Hearing Metrics:
Total hearings: {total_hearings:,}
Heard: {total_heard:,} ({total_heard/total_hearings:.1%})
Adjourned: {total_adjourned:,} ({total_adjourned/total_hearings:.1%})
Efficiency Metrics:
Disposal rate: {total_disposed/initial_case_count:.1%}
Utilization: {avg_utilization:.1%}
Gini coefficient: {avg_gini:.3f}
Ripeness filtering: {avg_ripeness_filtered/avg_eligible:.1%}
"""
with open("simulation_report.txt", "w") as f:
f.write(report)
Step 5.2: Performance Analysis
# Calculate key performance indicators
disposal_rate = total_disposed / initial_cases # Target: >70%
load_balance = calculate_gini_coefficient(courtroom_loads) # Target: <0.4
case_coverage = scheduled_cases / eligible_cases # Target: >95%
bottleneck_efficiency = ripeness_filtered / total_cases # Higher = better filtering
print(f"PERFORMANCE RESULTS:")
print(f"Disposal Rate: {disposal_rate:.1%} ({'β' if disposal_rate > 0.70 else 'β'})")
print(f"Load Balance: {load_balance:.3f} ({'β' if load_balance < 0.40 else 'β'})")
print(f"Case Coverage: {case_coverage:.1%} ({'β' if case_coverage > 0.95 else 'β'})")
Complete Example Walkthrough
Let's trace a single case through the entire system:
Case: C002847 (Civil Revision Petition)
Day 0: Case Generation
case = Case(
case_id="C002847",
case_type="CRP",
filed_date=date(2022, 03, 15),
current_stage="ADMISSION",
is_urgent=True, # Medical emergency
hearing_count=0,
last_hearing_date=None
)
Day 1: First Scheduling Attempt (2023-12-29)
# Checkpoint 1: Active? YES (status = PENDING)
# Checkpoint 2: Updates
case.age_days = 654 # Almost 2 years old
case.readiness_score = 0.3 # Low (admission stage)
# Checkpoint 3: Ripeness
ripeness = classify(case, current_date) # UNRIPE_SUMMONS (admission stage, 0 hearings)
# Result: FILTERED OUT (not scheduled)
Day 45: Second Attempt (2024-02-26)
# Case now has 3 hearings, still in admission but making progress
case.hearing_count = 3
case.current_stage = "ADMISSION"
# Checkpoint 3: Ripeness
ripeness = classify(case, current_date) # RIPE (>3 hearings in admission)
# Checkpoint 5: Priority Scoring
age_component = min(689 / 365, 1.0) * 0.35 = 0.35
readiness_component = 0.4 * 0.25 = 0.10
urgency_component = 1.0 * 0.25 = 0.25 # HIGH URGENCY
boost_component = 0.0 * 0.15 = 0.0
case.priority_score = 0.70 # High priority
# Checkpoint 7: Allocation
# Assigned to Courtroom 1 (least loaded), Position 3
# Result: SCHEDULED
Daily Cause List Entry:
2024-02-26,1,C002847,CRP,ADMISSION,HEARING,3,"HIGH URGENCY | admission stage | assigned to Courtroom 1"
Hearing Outcome:
# Simulated outcome: Case heard successfully, progresses to ARGUMENTS
case.current_stage = "ARGUMENTS"
case.hearing_count = 4
case.last_hearing_date = date(2024, 2, 26)
case.history.append({
"date": date(2024, 2, 26),
"outcome": "HEARD",
"stage_progression": "ADMISSION β ARGUMENTS"
})
Day 125: Arguments Stage (2024-06-15)
# Case now in arguments, higher readiness
case.current_stage = "ARGUMENTS"
case.readiness_score = 0.8 # High (arguments stage)
# Priority calculation
age_component = 0.35 # Still max age
readiness_component = 0.8 * 0.25 = 0.20 # Higher
urgency_component = 0.25 # Still urgent
boost_component = 0.0
case.priority_score = 0.80 # Very high priority
# Result: Scheduled in Position 1 (highest priority)
Final Disposal (Day 200: 2024-09-15)
# After multiple hearings in arguments stage
case.current_stage = "ORDERS / JUDGMENT"
case.hearing_count = 12
# Hearing outcome: Case disposed
case.status = CaseStatus.DISPOSED
case.disposal_date = date(2024, 9, 15)
case.total_lifecycle_days = (disposal_date - filed_date).days # 549 days
Data Flow Pipeline
Complete Data Transformation Chain
1. Historical CSV Files (Raw Data)
βββ ISDMHack_Case.csv (134,699 rows Γ 24 columns)
βββ ISDMHack_Hear.csv (739,670 rows Γ 31 columns)
2. Parameter Extraction (EDA Analysis)
βββ case_type_distribution.json
βββ stage_transition_probabilities.json
βββ adjournment_rates_by_stage.json
βββ daily_capacity_statistics.json
3. Synthetic Case Generation
βββ cases.csv (10,000 rows Γ 15 columns)
βββ Case_ID, Case_Type, Filed_Date
βββ Current_Stage, Is_Urgent, Hearing_Count
βββ Last_Hearing_Date, Last_Purpose
4. Daily Scheduling Loop (384 iterations)
βββ Day 1: cases.csv β ripeness_filter β 6,850 β eligible_filter β 5,200 β priority_sort β allocate β 703 scheduled
βββ Day 2: updated_cases β ripeness_filter β 6,820 β eligible_filter β 5,180 β priority_sort β allocate β 698 scheduled
βββ Day 384: updated_cases β ripeness_filter β 2,100 β eligible_filter β 1,950 β priority_sort β allocate β 421 scheduled
5. Daily Output Generation (per day Γ 5 courtrooms)
βββ cause_list_courtroom_1_2024-01-15.csv (140 rows)
βββ cause_list_courtroom_2_2024-01-15.csv (141 rows)
βββ cause_list_courtroom_3_2024-01-15.csv (140 rows)
βββ cause_list_courtroom_4_2024-01-15.csv (141 rows)
βββ cause_list_courtroom_5_2024-01-15.csv (141 rows)
6. Aggregated Metrics
βββ metrics.csv (384 rows Γ 8 columns)
βββ simulation_report.txt (summary statistics)
βββ case_audit_trail.csv (complete hearing history)
Data Volume at Each Stage
- Input: 874K+ historical records
- Generated: 10K synthetic cases
- Daily Processing: ~6K cases evaluated daily
- Daily Output: ~700 scheduled cases/day
- Total Output: ~42K total cause list entries
- Final Reports: 384 daily metrics + summary reports
Key Takeaways:
- Ripeness filtering removes 40.8% of cases daily (most critical efficiency gain)
- Priority scoring ensures fairness while handling urgent cases
- Load balancing achieves near-perfect distribution (Gini 0.002)
- Daily loop processes 6,000+ cases in seconds with multi-objective optimization
- Complete audit trail tracks every case decision for transparency
Last Updated: 2025-11-25
Version: 1.0
Status: Production Ready