Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| from ortools.sat.python import cp_model | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| from itertools import product | |
| import io | |
| import base64 | |
| import tempfile | |
| import os | |
| from datetime import datetime | |
def am_pm(hour):
    """Convert an hour on the 24-hour clock to a zero-padded ``HH:00 AM/PM`` label.

    Args:
        hour: Hour of day. Any fractional part is dropped, and values outside
            0-23 wrap around the clock (24 is midnight, -1 is 11 PM).

    Returns:
        A string such as ``"12:00 AM"`` (midnight), ``"01:00 PM"`` or
        ``"12:00 PM"`` (noon). Minutes are always rendered as ``00``.
    """
    # Wrap first so an end-of-day value such as 24 is treated as midnight
    # instead of being labelled "12:00 PM".
    hour = int(hour) % 24
    period = "PM" if hour >= 12 else "AM"
    # 0 and 12 both display as 12 on a 12-hour clock.
    display_hour = hour % 12 or 12
    return f"{display_hour:02d}:00 {period}"
def show_dataframe(csv_path):
    """Load a CSV into a pandas DataFrame.

    Args:
        csv_path: Anything ``pandas.read_csv`` accepts (path or file-like).

    Returns:
        The parsed DataFrame on success. If reading fails for any reason the
        function does not raise; it returns the string
        ``"Error loading CSV: <reason>"`` instead.
    """
    try:
        return pd.read_csv(csv_path)
    except Exception as exc:
        # Best-effort by design: the failure is reported as text to the caller.
        return f"Error loading CSV: {exc}"
def optimize_staffing(
    csv_file,
    beds_per_staff,
    max_hours_per_staff,
    hours_per_cycle,
    rest_days_per_week,
    clinic_start,
    clinic_end,
    overlap_time,
    max_start_time_change,
    exact_staff_count=None,
    overtime_percent=100
):
    """Build a staff schedule that covers the per-cycle demand listed in a CSV.

    The CSV has one row per day. Every column whose name starts with ``cycle``
    (and does not end with ``_staff``) is divided by ``beds_per_staff`` and
    rounded up to obtain the staff required for that cycle on that day
    (presumably the column holds a bed/patient count -- confirm with the data
    owner). Candidate 8- and 12-hour shifts are generated from the clinic
    hours and a CP-SAT model assigns at most one shift per staff member per
    day subject to coverage, rest-day and maximum-hours constraints.

    Args:
        csv_file: Path to the CSV (``str``) or its raw UTF-8 ``bytes``.
        beds_per_staff: Demand units one staff member can handle per cycle.
        max_hours_per_staff: Hour cap per staff member for a 30-day period;
            scaled linearly to the number of rows in the CSV.
        hours_per_cycle: Length of each cycle in hours.
        rest_days_per_week: Rest days used for the staffing estimate and the
            weekly rest constraint; must be in ``[0, 7)``.
        clinic_start: Opening hour (0-23). The first cycle starts here.
        clinic_end: Closing hour (0-23). A value below ``clinic_start`` means
            the clinic runs overnight.
        overlap_time: Accepted for interface compatibility; not used.
        max_start_time_change: Accepted for interface compatibility; not used.
        exact_staff_count: When positive, solve for exactly this many staff
            (raising the hour cap if it is below the estimate). Otherwise the
            smallest workable head count is searched for.
        overtime_percent: Minimum overtime allowance (percent) applied when
            the head-count search fails and the fallback solve is attempted.

    Returns:
        A 7-tuple ``(results, staff_assignment_df, gantt_path, schedule_df,
        plot_path, schedule_csv_path, staff_assignment_csv_path)``. On any
        failure (unreadable CSV, invalid parameters, no feasible schedule)
        ``results`` carries the explanation and the other six items are
        ``None``.
    """
    STANDARD_PERIOD_DAYS = 30  # Standard month length
    SHIFT_DURATIONS = (8, 12)  # Only 8 and 12 hour shifts - most common in healthcare
    SCHEDULE_COLUMNS = ['staff_id', 'day', 'shift_id', 'start', 'end', 'duration', 'cycles_covered']

    def failure(message):
        """Return the 7-item shape callers unpack, carrying only a message."""
        return message, None, None, None, None, None, None

    # ------------------------------------------------------------------
    # Load and validate inputs
    # ------------------------------------------------------------------
    try:
        if isinstance(csv_file, str):
            data = pd.read_csv(csv_file)
        else:
            data = pd.read_csv(io.StringIO(csv_file.decode('utf-8')))
    except Exception as e:
        print(f"Error loading CSV file: {e}")
        return failure(f"Error loading CSV file: {e}")

    try:
        BEDS_PER_STAFF = float(beds_per_staff)
        BASE_MAX_HOURS = float(max_hours_per_staff)
        cycle_duration = float(hours_per_cycle)
        REST_DAYS_PER_WEEK = float(rest_days_per_week)
        clinic_start = int(clinic_start)
        clinic_end = int(clinic_end)
        requested_staff = int(exact_staff_count) if exact_staff_count is not None else 0
    except (TypeError, ValueError) as e:
        return failure(f"Invalid parameter: {e}")

    num_days = len(data)
    cycle_cols = [col for col in data.columns if col.startswith('cycle') and not col.endswith('_staff')]

    if num_days == 0:
        return failure("The CSV file contains no rows.")
    if not cycle_cols:
        return failure("The CSV file contains no 'cycle*' demand columns.")
    if BEDS_PER_STAFF <= 0 or BASE_MAX_HOURS <= 0 or cycle_duration <= 0:
        return failure("Beds per staff, max hours per staff and hours per cycle must all be positive.")
    if not 0 <= REST_DAYS_PER_WEEK < 7:
        return failure("Rest days per week must be at least 0 and less than 7.")

    # Scale the 30-day cap to the actual period length.
    max_hours = BASE_MAX_HOURS * (num_days / STANDARD_PERIOD_DAYS)
    original_max_hours = max_hours  # Baseline for overtime calculations

    # ------------------------------------------------------------------
    # Demand: staff needed per cycle and total staff-hours
    # ------------------------------------------------------------------
    staff_cols = {cycle: f'{cycle}_staff' for cycle in cycle_cols}
    for cycle, staff_col in staff_cols.items():
        data[staff_col] = np.ceil(data[cycle] / BEDS_PER_STAFF)
    total_staff_hours = float(data[list(staff_cols.values())].to_numpy().sum()) * cycle_duration

    # ------------------------------------------------------------------
    # Cycle windows: back to back, starting at clinic opening
    # ------------------------------------------------------------------
    cycle_times = {}
    previous_start = None
    for cycle in cycle_cols:
        if previous_start is None:
            cycle_start = clinic_start
        else:
            cycle_start = (previous_start + cycle_duration) % 24
        cycle_times[cycle] = (cycle_start, (cycle_start + cycle_duration) % 24)
        previous_start = cycle_start

    # ------------------------------------------------------------------
    # Candidate shifts
    # ------------------------------------------------------------------
    if clinic_end < clinic_start:
        # Overnight clinic: starts run to midnight, then from midnight to closing.
        shift_start_times = list(range(clinic_start, 24)) + list(range(0, clinic_end + 1))
    else:
        # The latest start is four hours before closing.
        shift_start_times = list(range(clinic_start, clinic_end - 4 + 1))

    possible_shifts = []
    for start_time in shift_start_times:
        for duration in SHIFT_DURATIONS:
            end_time = (start_time + duration) % 24
            covered = set()
            # NOTE(review): a shift counts as covering a cycle when the two
            # windows merely overlap, and the overnight-cycle test is very
            # permissive. Kept as-is; confirm this is the intended rule.
            for cycle, (cycle_start, cycle_end) in cycle_times.items():
                if cycle_end < cycle_start:  # overnight cycle
                    if (start_time >= cycle_start or end_time <= cycle_end or
                            (end_time < start_time and (start_time < cycle_end or end_time > cycle_start))):
                        covered.add(cycle)
                else:  # normal cycle
                    shift_end = end_time if end_time > start_time else end_time + 24
                    cycle_end_adj = cycle_end if cycle_end > cycle_start else cycle_end + 24
                    if not (shift_end <= cycle_start or start_time >= cycle_end_adj):
                        covered.add(cycle)
            if covered:
                possible_shifts.append({
                    'id': f"{duration}hr_{start_time:02d}",
                    'start': start_time,
                    'end': end_time,
                    'duration': duration,
                    'cycles_covered': covered,
                })

    if not possible_shifts:
        return failure("No shift fits the given clinic hours; widen the clinic opening window.")

    # Shifts able to cover each cycle (the same for every day).
    covering_shifts = {
        cycle: [shift for shift in possible_shifts if cycle in shift['cycles_covered']]
        for cycle in cycle_cols
    }

    # ------------------------------------------------------------------
    # Staffing estimates
    # ------------------------------------------------------------------
    theoretical_min_staff = np.ceil(total_staff_hours / max_hours)
    min_staff_with_rest = np.ceil(theoretical_min_staff * (7 / (7 - REST_DAYS_PER_WEEK)))

    # Defined inside optimize_staffing so it can read the prepared inputs above.
    def optimize_schedule(num_staff, hour_cap, time_limit=600):
        """Solve the CP-SAT model for ``num_staff`` people.

        Returns ``(schedule, objective_value)`` where ``schedule`` is a list of
        shift dicts, or ``(None, None)`` when no solution is found or the
        solver raises.
        """
        try:
            model = cp_model.CpModel()
            solver = cp_model.CpSolver()
            solver.parameters.max_time_in_seconds = time_limit
            solver.parameters.num_search_workers = 8  # Use multiple threads

            # Hours are scaled to integers (x100 for 2 decimal precision).
            SCALE = 100
            max_scaled_hours = int(hour_cap * SCALE)
            staff_ids = range(1, num_staff + 1)
            days = range(1, num_days + 1)

            # x[s, d, shift] is 1 if staff s works that shift on day d.
            x = {}
            for s in staff_ids:
                for d in days:
                    for shift in possible_shifts:
                        x[s, d, shift['id']] = model.NewBoolVar(f'shift_{s}_{d}_{shift["id"]}')

            staff_hours = {
                s: model.NewIntVar(0, max_scaled_hours, f'hours_{s}') for s in staff_ids
            }
            max_staff_hours = model.NewIntVar(0, max_scaled_hours, 'max_hours')

            # Hours worked per staff member, bounded by the cap.
            for s in staff_ids:
                model.Add(staff_hours[s] == sum(
                    x[s, d, shift['id']] * int(shift['duration'] * SCALE)
                    for d in days
                    for shift in possible_shifts
                ))
                model.Add(staff_hours[s] <= max_staff_hours)
                model.Add(staff_hours[s] <= max_scaled_hours)

            # Coverage: enough staff on shifts touching each cycle, each day.
            for d in days:
                for cycle in cycle_cols:
                    staff_needed = int(data[staff_cols[cycle]].iloc[d - 1])
                    if staff_needed > 0:
                        model.Add(sum(
                            x[s, d, shift['id']]
                            for s in staff_ids
                            for shift in covering_shifts[cycle]
                        ) >= staff_needed)

            # One shift per day per staff.
            for s in staff_ids:
                for d in days:
                    model.Add(sum(x[s, d, shift['id']] for shift in possible_shifts) <= 1)

            # Rest days: cap the working days inside each (possibly partial) week.
            days_per_week = min(7, num_days)
            min_rest_days = max(1, min(REST_DAYS_PER_WEEK, days_per_week // 3))
            for s in staff_ids:
                for w in range((num_days + days_per_week - 1) // days_per_week):
                    week_start = w * days_per_week + 1
                    week_end = min(week_start + days_per_week - 1, num_days)
                    days_in_week = week_end - week_start + 1
                    adjusted_rest_days = max(1, int(min_rest_days * days_in_week / 7))
                    model.Add(sum(
                        x[s, d, shift['id']]
                        for d in range(week_start, week_end + 1)
                        for shift in possible_shifts
                    ) <= days_in_week - adjusted_rest_days)

            # Objective: primarily minimise the longest individual workload,
            # then the total hours.
            model.Minimize(max_staff_hours * 1000 + sum(staff_hours[s] for s in staff_ids))

            status = solver.Solve(model)
            if status != cp_model.OPTIMAL and status != cp_model.FEASIBLE:
                print(f"No solution found. Status: {status}")
                return None, None

            schedule = []
            for s in staff_ids:
                for d in days:
                    for shift in possible_shifts:
                        if solver.Value(x[s, d, shift['id']]) == 1:
                            schedule.append({
                                'staff_id': s,
                                'day': d,
                                'shift_id': shift['id'],
                                'start': shift['start'],
                                'end': shift['end'],
                                'duration': shift['duration'],
                                'cycles_covered': list(shift['cycles_covered'])
                            })
            return schedule, solver.ObjectiveValue() / SCALE
        except Exception as e:
            print(f"Error in optimization: {e}")
            return None, None

    def hours_by_staff(schedule, head_count):
        """Total scheduled hours for staff 1..head_count (0 when unscheduled)."""
        totals = {s: 0 for s in range(1, head_count + 1)}
        for entry in schedule:
            totals[entry['staff_id']] += entry['duration']
        return totals

    # ------------------------------------------------------------------
    # Solve
    # ------------------------------------------------------------------
    use_exact_count = requested_staff > 0
    report = [
        f"Input max hours per staff (30-day period): {BASE_MAX_HOURS}\n",
        f"Adjusted max hours for {num_days}-day period: {max_hours:.1f}\n",
    ]

    if use_exact_count:
        staff_count = requested_staff
        report.append(f"(Adjustment ratio: {num_days}/{30} = {(num_days/30):.2f})\n\n")
        report.append(f"Requested staff count: {staff_count}\n")
        report.append(f"Theoretical minimum staff needed: {theoretical_min_staff:.1f}\n")
        report.append(f"Minimum staff with rest days: {min_staff_with_rest:.1f}\n")

        if staff_count < min_staff_with_rest:
            # Fewer people than the estimate: raise the hour cap proportionally.
            hours_per_staff_no_overtime = total_staff_hours / min_staff_with_rest
            hours_per_staff_with_fewer = total_staff_hours / staff_count
            overtime_needed_percent = ((hours_per_staff_with_fewer / hours_per_staff_no_overtime) - 1) * 100
            report.append(f"\nWARNING: Requested staff count ({staff_count}) is below minimum ({min_staff_with_rest:.1f})")
            report.append(f"\nEach staff member will need approximately {overtime_needed_percent:.1f}% overtime")
            report.append("\nAttempting solution with overtime...\n")

            max_hours *= (1 + overtime_needed_percent / 100)
            schedule, _ = optimize_schedule(staff_count, max_hours)
            if schedule is None:
                report.append("\nFailed to find solution even with overtime. Try increasing staff count.\n")
                return failure("".join(report))

            report.append(f"\nSolution found with {overtime_needed_percent:.1f}% overtime allowance\n")
            report.append(f"Original max hours per staff: {original_max_hours:.1f}\n")
            report.append(f"Adjusted max hours with overtime: {max_hours:.1f}\n")
            report.append("\nDetailed overtime breakdown:\n")
            for staff_id, total in hours_by_staff(schedule, staff_count).items():
                if total > original_max_hours:
                    ot = total - original_max_hours
                    pct = (ot / original_max_hours) * 100
                    report.append(f"Staff {staff_id}: {total:.1f} total hours, {ot:.1f} overtime hours ({pct:.1f}% overtime)\n")
        else:
            schedule, _ = optimize_schedule(staff_count, max_hours, time_limit=120)
            if schedule is None:
                report.append(f"\nFailed to find a feasible solution with {staff_count} staff. Try increasing staff count.\n")
                return failure("".join(report))
    else:
        # Start from the theoretical minimum and work up.
        min_staff = max(1, int(theoretical_min_staff))
        max_staff = int(min_staff_with_rest) + 2
        report.append(f"Theoretical minimum staff needed: {theoretical_min_staff:.1f}\n")
        report.append(f"Searching for minimum staff count starting from {min_staff}...\n")

        schedule = None
        for staff_count in range(min_staff, max_staff + 1):
            report.append(f"Trying with {staff_count} staff...\n")
            schedule, _ = optimize_schedule(staff_count, max_hours, time_limit=90)
            if schedule is not None:
                report.append(f"Found feasible solution with {staff_count} staff.\n")
                break

        if schedule is None:
            report.append("Failed to find a feasible solution with standard staff counts.\n")
            report.append("Attempting solution with minimum staff and overtime...\n")
            staff_count = min_staff
            required_overtime = ((min_staff_with_rest / staff_count) - 1) * 100
            applied_overtime = max(overtime_percent, required_overtime)
            max_hours *= (1 + applied_overtime / 100)
            schedule, _ = optimize_schedule(staff_count, max_hours)
            if schedule is None:
                report.append("\nFailed to find solution even with overtime. Try increasing staff count.\n")
                return failure("".join(report))
            report.append(f"\nSolution found with {staff_count} staff and {applied_overtime:.1f}% overtime\n")
            report.append(f"Original max hours: {original_max_hours:.1f}\n")
            report.append(f"Adjusted max hours with overtime: {max_hours:.1f}\n")

    # ------------------------------------------------------------------
    # Workload report
    # ------------------------------------------------------------------
    # Explicit columns keep the frame usable even when no shift was needed.
    schedule_df = pd.DataFrame(schedule, columns=SCHEDULE_COLUMNS)
    staff_hours = hours_by_staff(schedule, staff_count)
    overtime_hours = {
        s: (hours - original_max_hours, (hours - original_max_hours) / original_max_hours * 100)
        for s, hours in staff_hours.items()
        if hours > original_max_hours
    }

    report.append(f"Optimal solution found with {staff_count} staff\n")
    report.append(f"Total staff hours: {sum(staff_hours.values()):.1f}\n")

    report.append("\nStaff Hours and Overtime:\n")
    for staff_id, hours in staff_hours.items():
        line = f"Staff {staff_id}: {hours:.1f} hours"
        if staff_id in overtime_hours:
            ot_hours, ot_percent = overtime_hours[staff_id]
            line += f" (includes {ot_hours:.1f} overtime hours, {ot_percent:.1f}% overtime)"
        report.append(line + "\n")

    if use_exact_count:
        # With an exact count, list everyone including staff with 0 hours.
        active_staff_hours = staff_hours
    else:
        active_staff_hours = {s: hours for s, hours in staff_hours.items() if hours > 0}

    report.append("\nStaff Hours:\n")
    total_active_hours = sum(active_staff_hours.values())
    avg_hours = total_active_hours / len(active_staff_hours) if active_staff_hours else 0
    for staff_id, hours in active_staff_hours.items():
        utilization = (hours / max_hours) * 100
        deviation_from_avg = ((hours - avg_hours) / avg_hours * 100) if avg_hours > 0 else 0
        line = f"Staff {staff_id}: {hours:.1f} hours ({utilization:.1f}% utilization)"
        if use_exact_count:
            line += f" [Deviation from avg: {deviation_from_avg:+.1f}%]"
        report.append(line + "\n")
        if staff_id in overtime_hours:
            ot_hours, ot_percent = overtime_hours[staff_id]
            report.append(f"  Overtime: {ot_hours:.1f} hours ({ot_percent:.1f}%)\n")

    if use_exact_count:
        report.append("\nWorkload Distribution Stats:\n")
        report.append(f"Average hours per staff: {avg_hours:.1f}\n")
        if active_staff_hours:
            max_deviation = (
                max(abs((hours - avg_hours) / avg_hours * 100) for hours in active_staff_hours.values())
                if avg_hours > 0 else 0
            )
            report.append(f"Maximum deviation from average: {max_deviation:.1f}%\n")

    active_staff_count = len(active_staff_hours)
    avg_utilization = (
        total_active_hours / (active_staff_count * max_hours) * 100 if active_staff_count else 0
    )
    report.append(f"\nAverage staff utilization: {avg_utilization:.1f}%\n")

    # ------------------------------------------------------------------
    # Coverage check (staff on a covering shift per day and cycle)
    # ------------------------------------------------------------------
    assigned_counts = {}
    for entry in schedule:
        for cycle in entry['cycles_covered']:
            key = (entry['day'], cycle)
            assigned_counts[key] = assigned_counts.get(key, 0) + 1

    coverage_check = []
    for d in range(1, num_days + 1):
        for cycle in cycle_cols:
            required = data[staff_cols[cycle]].iloc[d - 1]
            assigned = assigned_counts.get((d, cycle), 0)
            coverage_check.append({
                'day': d,
                'cycle': cycle,
                'required': required,
                'assigned': assigned,
                'satisfied': bool(assigned >= required)
            })
    coverage_df = pd.DataFrame(coverage_check)
    satisfaction = coverage_df['satisfied'].mean() * 100
    report.append(f"Coverage satisfaction: {satisfaction:.1f}%\n")
    if satisfaction < 100:
        report.append("Warning: Not all staffing requirements are met!\n")
        unsatisfied = coverage_df[~coverage_df['satisfied']]
        report.append(unsatisfied.to_string() + "\n")

    results = "".join(report)

    # ------------------------------------------------------------------
    # Bar chart of shift hours per staff member and day
    # ------------------------------------------------------------------
    staff_days = {s: [0] * num_days for s in range(1, staff_count + 1)}  # 0 means off duty
    for entry in schedule:
        staff_days[entry['staff_id']][entry['day'] - 1] = entry['duration']

    fig, ax = plt.subplots(figsize=(15, 8))
    try:
        for s, hours in staff_days.items():
            ax.bar(range(1, num_days + 1), hours, label=f'Staff {s}')
        ax.set_xlabel('Day')
        ax.set_ylabel('Shift Hours')
        ax.set_title('Staff Schedule')
        ax.set_xticks(range(1, num_days + 1))
        ax.legend()
        # Reserve the file name, close the handle, then let Matplotlib write.
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as image_file:
            plot_path = image_file.name
        fig.savefig(plot_path)
    finally:
        plt.close(fig)

    # Gantt chart - only showing active staff
    gantt_path = create_gantt_chart(schedule_df, num_days, staff_count)

    # ------------------------------------------------------------------
    # CSV exports
    # ------------------------------------------------------------------
    schedule_df['start_ampm'] = schedule_df['start'].apply(am_pm)
    schedule_df['end_ampm'] = schedule_df['end'].apply(am_pm)
    schedule_csv = schedule_df[
        ['staff_id', 'day', 'start_ampm', 'end_ampm', 'duration', 'cycles_covered']
    ].to_csv(index=False)
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file:
        temp_file.write(schedule_csv)
        schedule_csv_path = temp_file.name

    # Staff assignment table: head count per cycle for every day.
    staff_assignment_data = [
        [d] + [assigned_counts.get((d, cycle), 0) for cycle in cycle_cols]
        for d in range(1, num_days + 1)
    ]
    staff_assignment_df = pd.DataFrame(staff_assignment_data, columns=['Day'] + cycle_cols)
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file:
        staff_assignment_csv_path = temp_file.name
    staff_assignment_df.to_csv(staff_assignment_csv_path, index=False)

    # Return all required values in the order callers unpack them.
    return results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path
def convert_to_24h(time_str):
    """Convert an ``HH:00 AM/PM`` label to its hour on the 24-hour clock.

    Args:
        time_str: Text such as ``"08:00 AM"`` or ``"12:00 PM"``. Surrounding
            whitespace is ignored. Minutes must be ``00``.

    Returns:
        The hour as an ``int`` in 0-23, or ``None`` when the input is not a
        string or does not match the expected format.
    """
    if not isinstance(time_str, str):
        # strptime raises TypeError for None/numbers; report them like any
        # other unparseable value instead of crashing the caller.
        return None
    try:
        return datetime.strptime(time_str.strip(), "%I:00 %p").hour
    except ValueError:
        return None
def gradio_wrapper(
    csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
    rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change,
    exact_staff_count=None, overtime_percent=100
):
    """Adapt UI inputs for ``optimize_staffing`` and reorder its outputs.

    The clinic opening/closing labels (``"HH:00 AM/PM"``) are converted to
    24-hour integers with ``convert_to_24h``; every other argument is passed
    through unchanged.

    Returns:
        A 6-tuple ``(staff_assignment_df, gantt_path, schedule_df, plot_path,
        staff_assignment_csv_path, schedule_csv_path)``. When anything goes
        wrong the reason is printed and the tuple is an empty DataFrame with
        a single ``Day`` column followed by five ``None`` values.
    """
    def failed(reason):
        """Report the problem on stdout and return the empty 6-tuple."""
        error_message = f"Error during optimization: {reason}\n\nPlease try with different parameters or a simpler dataset."
        print(error_message)
        return pd.DataFrame(columns=["Day"]), None, None, None, None, None

    try:
        # Convert AM/PM times to 24-hour format
        clinic_start = convert_to_24h(clinic_start_ampm)
        clinic_end = convert_to_24h(clinic_end_ampm)
        if clinic_start is None or clinic_end is None:
            return failed(
                f"could not parse clinic hours {clinic_start_ampm!r} - {clinic_end_ampm!r}"
            )

        outcome = optimize_staffing(
            csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
            rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change,
            exact_staff_count, overtime_percent
        )
        # A failed run carries its explanation in the first item and has no
        # assignment table; surface that text rather than returning blanks.
        if len(outcome) != 7 or outcome[1] is None:
            return failed(outcome[0])

        (_results, staff_assignment_df, gantt_path, schedule_df,
         plot_path, schedule_csv_path, staff_assignment_csv_path) = outcome
        # Note the output order: the assignment CSV comes before the schedule CSV.
        return staff_assignment_df, gantt_path, schedule_df, plot_path, staff_assignment_csv_path, schedule_csv_path
    except Exception as e:
        return failed(str(e))
# Create a Gantt chart with advanced visuals and alternating labels - only showing active staff
def create_gantt_chart(schedule_df, num_days, staff_count):
    """Render the schedule as a Gantt chart PNG and return the file path.

    One row is drawn per staff member that has at least one shift. Each shift
    is a horizontal bar positioned at ``day - 1 + start / 24``; shifts whose
    end hour is below their start hour are drawn in two pieces split at
    midnight. Days 6 and 7 of every 7-day block are shaded as weekend
    (day 1 is assumed to be a Monday).

    Args:
        schedule_df: DataFrame with ``staff_id``, ``day``, ``start``, ``end``
            and ``duration`` columns.
        num_days: Number of days on the x-axis.
        staff_count: Accepted for interface compatibility; rows are derived
            from the staff ids present in ``schedule_df``.

    Returns:
        Path of a temporary ``.png`` file (not deleted automatically).
    """
    # Use the style only while drawing so global rcParams are left untouched;
    # fall back when the installed Matplotlib lacks the preferred name.
    preferred_styles = ('seaborn-v0_8-whitegrid', 'seaborn-whitegrid')
    style_name = next((name for name in preferred_styles if name in plt.style.available), 'default')

    # Staff who have at least one shift, in ascending id order.
    active_staff_ids = sorted(schedule_df['staff_id'].unique())
    active_staff_count = len(active_staff_ids)
    colors = plt.cm.viridis(np.linspace(0.1, 0.9, active_staff_count))
    schedule_df = schedule_df.sort_values(['staff_id', 'day'])

    bar_style = dict(height=0.6, alpha=0.9, edgecolor='black', linewidth=1, zorder=10)
    label_box = dict(facecolor='white', alpha=0.9, pad=3,
                     boxstyle='round,pad=0.3', edgecolor='gray')

    # Reserve the output name and close the handle before Matplotlib writes.
    with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as image_file:
        output_path = image_file.name

    with plt.style.context(style_name):
        fig = plt.figure(
            figsize=(max(15, num_days * 1.2), max(8, active_staff_count * 0.8)), dpi=200
        )
        try:
            ax = fig.gca()
            ax.set_facecolor('#f8f9fa')

            for i, staff_id in enumerate(active_staff_ids):
                staff_shifts = schedule_df[schedule_df['staff_id'] == staff_id]
                y_pos = active_staff_count - i  # First staff id at the top

                # Staff label with a background box, left of the plot area
                ax.text(-0.7, y_pos, f"Staff {staff_id}", fontsize=12, fontweight='bold',
                        ha='right', va='center',
                        bbox=dict(facecolor='white', edgecolor='gray',
                                  boxstyle='round,pad=0.5', alpha=0.9))
                # Subtle background for each staff row
                ax.axhspan(y_pos - 0.4, y_pos + 0.4, color='white', alpha=0.4, zorder=-5)

                for idx, (_, shift) in enumerate(staff_shifts.iterrows()):
                    day = shift['day']
                    start_hour = shift['start']
                    end_hour = shift['end']
                    shift_start_pos = day - 1 + start_hour / 24

                    if end_hour < start_hour:
                        # Overnight shift: one bar until midnight, one after it.
                        label_width = (24 - start_hour) / 24
                        ax.barh(y_pos, label_width, left=shift_start_pos,
                                color=colors[i], **bar_style)
                        ax.barh(y_pos, end_hour / 24, left=day,
                                color=colors[i], **bar_style)
                    else:
                        label_width = shift['duration'] / 24
                        ax.barh(y_pos, label_width, left=shift_start_pos,
                                color=colors[i], **bar_style)

                    # Label only when the (first) bar is wide enough; alternate
                    # above/below the bar to reduce overlap between days.
                    if label_width >= 0.1:
                        y_offset = 0.35 if idx % 2 == 0 else -0.35
                        ax.text(shift_start_pos + label_width / 2, y_pos + y_offset,
                                f"{am_pm(start_hour)}-{am_pm(end_hour)}",
                                ha='center', va='center', fontsize=9, fontweight='bold',
                                color='black', bbox=label_box, zorder=20)

            # Weekend highlighting (assuming day 1 is Monday)
            for day in range(1, num_days + 1):
                if day % 7 in (0, 6):
                    ax.axvspan(day - 1, day, alpha=0.15, color='#ff9999', zorder=-10)
                    day_label = "Saturday" if day % 7 == 6 else "Sunday"
                    ax.text(day - 0.5, 0.2, day_label, ha='center', fontsize=10,
                            color='#cc0000', fontweight='bold',
                            bbox=dict(facecolor='white', alpha=0.7, pad=2, boxstyle='round'))

            # One tick per day, centred in the day's column
            ax.set_xticks(np.arange(0.5, num_days, 1))
            ax.set_xticklabels([f"Day {d}" for d in range(1, num_days + 1)],
                               rotation=45 if num_days > 20 else 0, ha='center', fontsize=10)

            # Vertical separators between days
            for day in range(1, num_days):
                ax.axvline(x=day, color='#aaaaaa', linestyle='-', alpha=0.5, zorder=-5)

            # Rows are labelled by the custom text boxes, so hide tick labels
            ax.set_yticks(np.arange(1, active_staff_count + 1))
            ax.set_yticklabels([])

            ax.set_xlim(-0.8, num_days)
            ax.set_ylim(0.5, active_staff_count + 0.5)

            # Dotted guides and small markers every 6 hours
            hour_names = {6: "6AM", 12: "Noon", 18: "6PM"}
            for day in range(num_days):
                for hour, hour_label in hour_names.items():
                    ax.axvline(x=day + hour / 24, color='#cccccc', linestyle=':',
                               alpha=0.5, zorder=-5)
                    ax.text(day + hour / 24, 0, hour_label, ha='center', va='bottom',
                            fontsize=7, color='#666666', rotation=90, alpha=0.7)

            ax.set_title(f'Staff Schedule ({active_staff_count} Active Staff)',
                         fontsize=24, fontweight='bold', pad=20, color='#333333')
            ax.set_xlabel('Day', fontsize=16, labelpad=10, color='#333333')

            # Time reference legend along the bottom edge of the figure
            fig.text(0.01, 0.01, "Time Reference:", ha='left', fontsize=10,
                     fontweight='bold', color='#333333')
            for i, time in enumerate(['6 AM', 'Noon', '6 PM', 'Midnight']):
                fig.text(0.08 + i * 0.06, 0.01, time, ha='left', fontsize=9, color='#555555')

            for spine in ['top', 'right', 'left']:
                ax.spines[spine].set_visible(False)

            fig.text(0.01, 0.97, "Red areas = Weekends", fontsize=12,
                     color='#cc0000', fontweight='bold',
                     bbox=dict(facecolor='white', alpha=0.7, pad=5, boxstyle='round'))

            # Drop the axes frame entirely
            ax.set_frame_on(False)

            fig.tight_layout()
            fig.savefig(output_path, dpi=200, bbox_inches='tight', facecolor='white')
        finally:
            # Always release the figure, even if drawing or saving fails.
            plt.close(fig)

    return output_path
def optimize_schedule(num_staff, time_limit=600, *, data=None, cycle_cols=None,
                      possible_shifts=None, num_days=None,
                      max_hours_per_staff=None, rest_days_per_week=None):
    """Assign shifts to ``num_staff`` people with a CP-SAT model.

    Each staff member works at most one shift per day, total hours are capped,
    working days per week are limited to leave rest days, and every
    day/cycle with a positive requirement must have enough staff on shifts
    that cover the cycle. The objective minimises the longest individual
    workload first and total hours second.

    Args:
        num_staff: Number of staff to schedule (ids ``1..num_staff``).
        time_limit: Solver time limit in seconds.
        data: DataFrame with one row per day and a ``<cycle>_staff`` column
            for every name in ``cycle_cols``.
        cycle_cols: Cycle names.
        possible_shifts: List of dicts with ``id``, ``start``, ``end``,
            ``duration`` and ``cycles_covered`` keys.
        num_days: Number of days to schedule.
        max_hours_per_staff: Hour cap per staff member for the whole period.
        rest_days_per_week: Requested rest days per week.

        Any keyword left as ``None`` is read from the module-level name the
        function historically relied on (``data``, ``cycle_cols``,
        ``possible_shifts``, ``num_days``, ``MAX_HOURS_PER_STAFF``,
        ``REST_DAYS_PER_WEEK``).

    Returns:
        ``(schedule, objective_value)`` where ``schedule`` is a list of shift
        dicts and ``objective_value`` is the solver objective divided by the
        hour scale, or ``(None, None)`` when no solution is found or any
        error occurs (the error is printed).
    """
    def resolve(value, global_name):
        """Prefer the explicit argument; otherwise fall back to the module global."""
        if value is not None:
            return value
        module_scope = globals()
        if global_name not in module_scope:
            raise NameError(f"name '{global_name}' is not defined")
        return module_scope[global_name]

    try:
        # Create the model
        model = cp_model.CpModel()
        solver = cp_model.CpSolver()
        solver.parameters.max_time_in_seconds = time_limit
        solver.parameters.num_search_workers = 8  # Use multiple threads

        data = resolve(data, 'data')
        cycle_cols = resolve(cycle_cols, 'cycle_cols')
        possible_shifts = resolve(possible_shifts, 'possible_shifts')
        num_days = resolve(num_days, 'num_days')
        hour_cap = resolve(max_hours_per_staff, 'MAX_HOURS_PER_STAFF')
        rest_days = resolve(rest_days_per_week, 'REST_DAYS_PER_WEEK')

        # Scale factor for converting hours to integers (x100 for 2 decimal precision)
        SCALE = 100
        max_scaled_hours = int(hour_cap * SCALE)
        staff_ids = range(1, num_staff + 1)
        days = range(1, num_days + 1)

        # x[s, d, shift] is 1 if staff s works that shift on day d
        x = {}
        for s in staff_ids:
            for d in days:
                for shift in possible_shifts:
                    x[s, d, shift['id']] = model.NewBoolVar(f'shift_{s}_{d}_{shift["id"]}')

        # Staff hours variables (scaled to integers)
        staff_hours = {
            s: model.NewIntVar(0, max_scaled_hours, f'hours_{s}') for s in staff_ids
        }
        max_staff_hours = model.NewIntVar(0, max_scaled_hours, 'max_hours')

        # Hours worked per staff member, bounded by the cap
        for s in staff_ids:
            model.Add(staff_hours[s] == sum(
                x[s, d, shift['id']] * int(shift['duration'] * SCALE)
                for d in days
                for shift in possible_shifts
            ))
            model.Add(staff_hours[s] <= max_staff_hours)
            model.Add(staff_hours[s] <= max_scaled_hours)

        # Coverage constraints; the covering shifts are the same every day
        covering_shifts = {
            cycle: [shift for shift in possible_shifts if cycle in shift['cycles_covered']]
            for cycle in cycle_cols
        }
        for d in days:
            for cycle in cycle_cols:
                staff_needed = int(data.iloc[d - 1][f'{cycle}_staff'])
                if staff_needed > 0:
                    model.Add(sum(
                        x[s, d, shift['id']]
                        for s in staff_ids
                        for shift in covering_shifts[cycle]
                    ) >= staff_needed)

        # One shift per day per staff
        for s in staff_ids:
            for d in days:
                model.Add(sum(x[s, d, shift['id']] for shift in possible_shifts) <= 1)

        # Rest days: cap the working days inside each (possibly partial) week
        days_per_week = min(7, num_days)
        min_rest_days = max(1, min(rest_days, days_per_week // 3))
        for s in staff_ids:
            for w in range((num_days + days_per_week - 1) // days_per_week):
                week_start = w * days_per_week + 1
                week_end = min(week_start + days_per_week - 1, num_days)
                days_in_week = week_end - week_start + 1
                adjusted_rest_days = max(1, int(min_rest_days * days_in_week / 7))
                model.Add(sum(
                    x[s, d, shift['id']]
                    for d in range(week_start, week_end + 1)
                    for shift in possible_shifts
                ) <= days_in_week - adjusted_rest_days)

        # Objective: minimise the maximum hours first, total hours second (scaled)
        model.Minimize(max_staff_hours * 1000 + sum(staff_hours[s] for s in staff_ids))

        status = solver.Solve(model)
        if status != cp_model.OPTIMAL and status != cp_model.FEASIBLE:
            print(f"No solution found. Status: {status}")
            return None, None

        # Extract solution
        schedule = []
        for s in staff_ids:
            for d in days:
                for shift in possible_shifts:
                    if solver.Value(x[s, d, shift['id']]) == 1:
                        schedule.append({
                            'staff_id': s,
                            'day': d,
                            'shift_id': shift['id'],
                            'start': shift['start'],
                            'end': shift['end'],
                            'duration': shift['duration'],
                            'cycles_covered': list(shift['cycles_covered'])
                        })
        objective_value = solver.ObjectiveValue() / SCALE  # Undo the hour scaling
        return schedule, objective_value
    except Exception as e:
        print(f"Error in optimization: {e}")
        return None, None
# Define Gradio UI
# Dropdown choices in chronological order: 12 AM (midnight) comes first and
# 12 PM (noon) follows 11 AM. The set of strings is unchanged.
am_pm_times = [
    f"{hour:02d}:00 {period}"
    for period in ("AM", "PM")
    for hour in (12, *range(1, 12))
]

with gr.Blocks(title="Staff Scheduling Optimizer", css="""
#staff_assignment_table {
    width: 100% !important;
}
#csv_schedule {
    width: 100% !important;
}
.container {
    max-width: 100% !important;
    padding: 0 !important;
}
.download-btn {
    margin-top: 10px !important;
}
""") as iface:
    gr.Markdown("# Staff Scheduling Optimizer")
    gr.Markdown("Upload a CSV file with cycle data and configure parameters to generate an optimal staff schedule.")

    with gr.Row():
        # LEFT PANEL - Inputs
        with gr.Column(scale=1):
            gr.Markdown("### Input Parameters")

            # Input parameters
            csv_input = gr.File(label="Upload CSV")
            beds_per_staff = gr.Number(label="Beds per Staff", value=3)
            max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160)
            hours_per_cycle = gr.Number(label="Hours per Cycle", value=4)
            rest_days_per_week = gr.Number(label="Rest Days per Week", value=2)
            clinic_start_ampm = gr.Dropdown(label="Clinic Start Hour (AM/PM)", choices=am_pm_times, value="08:00 AM")
            clinic_end_ampm = gr.Dropdown(label="Clinic End Hour (AM/PM)", choices=am_pm_times, value="08:00 PM")
            overlap_time = gr.Number(label="Overlap Time", value=0)
            max_start_time_change = gr.Number(label="Max Start Time Change", value=2)
            exact_staff_count = gr.Number(label="Exact Staff Count (optional)", value=None)
            overtime_percent = gr.Slider(label="Overtime Allowed (%)", minimum=0, maximum=100, value=100, step=10)

            optimize_btn = gr.Button("Optimize Schedule", variant="primary", size="lg")

        # RIGHT PANEL - Outputs
        with gr.Column(scale=2):
            gr.Markdown("### Results")

            # Tabs for different outputs
            with gr.Tabs():
                with gr.TabItem("Detailed Schedule"):
                    with gr.Row():
                        csv_schedule = gr.Dataframe(label="Detailed Schedule", elem_id="csv_schedule")
                    with gr.Row():
                        schedule_download_file = gr.File(label="Download Detailed Schedule", visible=True)

                with gr.TabItem("Gantt Chart"):
                    gantt_chart = gr.Image(label="Staff Schedule Visualization", elem_id="gantt_chart")

                with gr.TabItem("Staff Coverage by Cycle"):
                    with gr.Row():
                        staff_assignment_table = gr.Dataframe(label="Staff Count in Each Cycle (Staff May Overlap)", elem_id="staff_assignment_table")
                    with gr.Row():
                        staff_download_file = gr.File(label="Download Coverage Table", visible=True)

                with gr.TabItem("Hours Visualization"):
                    schedule_visualization = gr.Image(label="Hours by Day Visualization", elem_id="schedule_visualization")

    # Define download functions
    def create_download_link(df, filename="data.csv"):
        """Write a dataframe to a temporary CSV file and return the file path.

        Returns None when df is None or empty. The filename argument is
        accepted for compatibility but is not used for the temp file name.
        The temp file is created with delete=False, so it is left on disk.
        """
        if df is None or df.empty:
            return None
        # newline="" lets pandas control line endings (no doubled "\r" on Windows).
        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, suffix='.csv', newline='', encoding='utf-8'
        ) as f:
            df.to_csv(f, index=False)
            return f.name

    def optimize_and_display(csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
                             rest_days_per_week, clinic_start_ampm, clinic_end_ampm,
                             overlap_time, max_start_time_change, exact_staff_count, overtime_percent):
        """Run the optimizer for the button click and map its results to the outputs.

        Returns a 6-tuple in the order of the click handler's outputs list:
        coverage table, Gantt image path, detailed schedule table, hours plot
        path, coverage CSV path, schedule CSV path. On any failure the two
        table outputs carry a one-cell dataframe with the error message and
        the remaining outputs are None.
        """
        def _error_outputs(message):
            # Surface the problem in both tables so it is visible whichever tab is open.
            print(message)
            error_df = pd.DataFrame({"Error": [message]})
            return error_df, None, error_df, None, None, None

        if csv_file is None:
            return _error_outputs("Error during optimization: no CSV file was uploaded.")

        # NOTE(review): depending on the Gradio version, gr.File may hand over a
        # path string or a temp-file wrapper exposing .name; optimize_staffing
        # only accepts a path string or bytes, so pass the path when available.
        csv_source = getattr(csv_file, "name", csv_file)

        try:
            # Convert AM/PM times to 24-hour format
            clinic_start = convert_to_24h(clinic_start_ampm)
            clinic_end = convert_to_24h(clinic_end_ampm)

            # Call the optimization function
            outcome = optimize_staffing(
                csv_source, beds_per_staff, max_hours_per_staff, hours_per_cycle,
                rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change,
                exact_staff_count, overtime_percent
            )
            # optimize_staffing's CSV-error path returns one more element than the
            # seven consumed here; slice so that path does not break unpacking.
            (results, staff_assignment_df, gantt_path, schedule_df, plot_path,
             schedule_csv_path, staff_assignment_csv_path) = outcome[:7]
        except Exception as e:
            return _error_outputs(
                f"Error during optimization: {str(e)}\n\nPlease try with different parameters or a simpler dataset."
            )

        # No tables plus a text result means the optimizer reported a failure.
        if staff_assignment_df is None and schedule_df is None and isinstance(results, str):
            return _error_outputs(results)

        return staff_assignment_df, gantt_path, schedule_df, plot_path, staff_assignment_csv_path, schedule_csv_path

    # Connect the button to the optimization function
    optimize_btn.click(
        fn=optimize_and_display,
        inputs=[
            csv_input, beds_per_staff, max_hours_per_staff, hours_per_cycle,
            rest_days_per_week, clinic_start_ampm, clinic_end_ampm,
            overlap_time, max_start_time_change, exact_staff_count, overtime_percent
        ],
        outputs=[
            staff_assignment_table, gantt_chart, csv_schedule, schedule_visualization,
            staff_download_file, schedule_download_file
        ]
    )

# Launch the Gradio app
iface.launch(share=True)