import pandas as pd import numpy as np import pulp as pl import matplotlib.pyplot as plt import gradio as gr from itertools import product import io import base64 import tempfile import os from datetime import datetime def am_pm(hour): """Converts 24-hour time to AM/PM format.""" period = "AM" if hour >= 12: period = "PM" if hour > 12: hour -= 12 elif hour == 0: hour = 12 # Midnight return f"{int(hour):02d}:00 {period}" def optimize_staffing( csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change ): # Load data try: if isinstance(csv_file, str): # Handle the case when a filepath is passed directly data = pd.read_csv(csv_file) else: # Handle the case when file object is uploaded through Gradio data = pd.read_csv(io.StringIO(csv_file.decode('utf-8'))) except Exception as e: print(f"Error loading CSV file: {e}") return f"Error loading CSV file: {e}", None, None, None # Rename the index column if necessary if data.columns[0] not in ['day', 'Day', 'DAY']: data = data.rename(columns={data.columns[0]: 'day'}) # Fill missing values for col in data.columns: if col.startswith('cycle'): data[col].fillna(0, inplace=True) # Calculate clinic hours if clinic_end < clinic_start: clinic_hours = 24 - clinic_start + clinic_end else: clinic_hours = clinic_end - clinic_start # Parameters BEDS_PER_STAFF = float(beds_per_staff) MAX_HOURS_PER_STAFF = float(max_hours_per_staff) HOURS_PER_CYCLE = float(hours_per_cycle) REST_DAYS_PER_WEEK = int(rest_days_per_week) SHIFT_TYPES = [6, 8, 10, 12] # Standard shift types OVERLAP_TIME = float(overlap_time) CLINIC_START = int(clinic_start) CLINIC_END = int(clinic_end) CLINIC_HOURS = clinic_hours MAX_START_TIME_CHANGE = int(max_start_time_change) # Calculate staff needed per cycle (beds/BEDS_PER_STAFF, rounded up) for col in data.columns: if col.startswith('cycle') and not col.endswith('_staff'): data[f'{col}_staff'] = np.ceil(data[col] / BEDS_PER_STAFF) # Get cycle names and number of cycles cycle_cols = [col for col in data.columns if col.startswith('cycle') and not col.endswith('_staff')] num_cycles = len(cycle_cols) # Define cycle times cycle_times = {} for i, cycle in enumerate(cycle_cols): cycle_start = (CLINIC_START + i * HOURS_PER_CYCLE) % 24 cycle_end = (CLINIC_START + (i + 1) * HOURS_PER_CYCLE) % 24 cycle_times[cycle] = (cycle_start, cycle_end) # Get staff requirements max_staff_needed = max([data[f'{cycle}_staff'].max() for cycle in cycle_cols]) # Define possible shift start times shift_start_times = list(range(CLINIC_START, CLINIC_START + int(CLINIC_HOURS) - min(SHIFT_TYPES) + 1)) # Generate all possible shifts possible_shifts = [] for duration in SHIFT_TYPES: for start_time in shift_start_times: end_time = (start_time + duration) % 24 # Create a shift with its coverage of cycles shift = { 'id': f"{duration}hr_{start_time:02d}", 'start': start_time, 'end': end_time, 'duration': duration, 'cycles_covered': set() } # Determine which cycles this shift covers for cycle, (cycle_start, cycle_end) in cycle_times.items(): # Handle overnight cycles if cycle_end < cycle_start: # overnight cycle if start_time >= cycle_start or end_time <= cycle_end or (start_time < end_time and end_time > cycle_start): shift['cycles_covered'].add(cycle) else: # normal cycle shift_end = end_time if end_time > start_time else end_time + 24 cycle_end_adj = cycle_end if cycle_end > cycle_start else cycle_end + 24 # Check for overlap if not (shift_end <= cycle_start or start_time >= cycle_end_adj): shift['cycles_covered'].add(cycle) if shift['cycles_covered']: # Only add shifts that cover at least one cycle possible_shifts.append(shift) # Estimate minimum number of staff needed total_staff_hours = 0 for _, row in data.iterrows(): for cycle in cycle_cols: total_staff_hours += row[f'{cycle}_staff'] * HOURS_PER_CYCLE min_staff_estimate = np.ceil(total_staff_hours / MAX_HOURS_PER_STAFF) # Get number of days in the dataset num_days = len(data) # Add some buffer for constraints like rest days and shift changes estimated_staff = max(min_staff_estimate, max_staff_needed + 1) def optimize_schedule(num_staff): # Create a binary linear programming model model = pl.LpProblem("Staff_Scheduling", pl.LpMinimize) # Decision variables # x[s,d,shift] = 1 if staff s works shift on day d x = pl.LpVariable.dicts("shift", [(s, d, shift['id']) for s in range(1, num_staff+1) for d in range(1, num_days+1) for shift in possible_shifts], cat='Binary') # Objective: Minimize total staff hours while ensuring coverage model += pl.lpSum(x[(s, d, shift['id'])] * shift['duration'] for s in range(1, num_staff+1) for d in range(1, num_days+1) for shift in possible_shifts) # Constraint: Each staff works at most one shift per day for s in range(1, num_staff+1): for d in range(1, num_days+1): model += pl.lpSum(x[(s, d, shift['id'])] for shift in possible_shifts) <= 1 # Constraint: Each staff has at least one rest day per week for s in range(1, num_staff+1): for w in range((num_days + 6) // 7): # Number of weeks week_start = w*7 + 1 week_end = min(week_start + 6, num_days) model += pl.lpSum(x[(s, d, shift['id'])] for d in range(week_start, week_end+1) for shift in possible_shifts) <= (week_end - week_start + 1) - REST_DAYS_PER_WEEK # Constraint: Each staff works at most MAX_HOURS_PER_STAFF in the period for s in range(1, num_staff+1): model += pl.lpSum(x[(s, d, shift['id'])] * shift['duration'] for d in range(1, num_days+1) for shift in possible_shifts) <= MAX_HOURS_PER_STAFF # Constraint: Each cycle has enough staff each day for d in range(1, num_days+1): day_index = d - 1 # 0-indexed for DataFrame for cycle in cycle_cols: staff_needed = data.iloc[day_index][f'{cycle}_staff'] # Get all shifts that cover this cycle covering_shifts = [shift for shift in possible_shifts if cycle in shift['cycles_covered']] model += pl.lpSum(x[(s, d, shift['id'])] for s in range(1, num_staff+1) for shift in covering_shifts) >= staff_needed # Solve model with a time limit model.solve(pl.PULP_CBC_CMD(timeLimit=300, msg=0)) # Check if a feasible solution was found if model.status == pl.LpStatusOptimal or model.status == pl.LpStatusNotSolved: # Extract the solution schedule = [] for s in range(1, num_staff+1): for d in range(1, num_days+1): for shift in possible_shifts: if pl.value(x[(s, d, shift['id'])]) == 1: # Find the shift details shift_details = next((sh for sh in possible_shifts if sh['id'] == shift['id']), None) schedule.append({ 'staff_id': s, 'day': d, 'shift_id': shift['id'], 'start': shift_details['start'], 'end': shift_details['end'], 'duration': shift_details['duration'], 'cycles_covered': list(shift_details['cycles_covered']) }) return schedule, model.objective.value() else: return None, None # Try to solve with estimated number of staff staff_count = int(estimated_staff) results = f"Trying with {staff_count} staff...\n" schedule, objective = optimize_schedule(staff_count) # If no solution found, increment staff count until a solution is found while schedule is None and staff_count < 15: # Cap at 15 to avoid infinite loop staff_count += 1 results += f"Trying with {staff_count} staff...\n" schedule, objective = optimize_schedule(staff_count) if schedule is None: results += "Failed to find a feasible solution. Try relaxing some constraints." return results, None, None, None results += f"Optimal solution found with {staff_count} staff\n" results += f"Total staff hours: {objective}\n" # Convert to DataFrame for analysis schedule_df = pd.DataFrame(schedule) # Analyze staff workload staff_hours = {} for s in range(1, staff_count+1): staff_shifts = schedule_df[schedule_df['staff_id'] == s] total_hours = staff_shifts['duration'].sum() staff_hours[s] = total_hours results += "\nStaff Hours:\n" for staff_id, hours in staff_hours.items(): utilization = (hours / MAX_HOURS_PER_STAFF) * 100 results += f"Staff {staff_id}: {hours} hours ({utilization:.1f}% utilization)\n" avg_utilization = sum(staff_hours.values()) / (staff_count * MAX_HOURS_PER_STAFF) * 100 results += f"\nAverage staff utilization: {avg_utilization:.1f}%\n" # Check coverage for each day and cycle coverage_check = [] for d in range(1, num_days+1): day_index = d - 1 # 0-indexed for DataFrame day_schedule = schedule_df[schedule_df['day'] == d] for cycle in cycle_cols: required = data.iloc[day_index][f'{cycle}_staff'] # Count staff covering this cycle assigned = sum(1 for _, shift in day_schedule.iterrows() if cycle in shift['cycles_covered']) coverage_check.append({ 'day': d, 'cycle': cycle, 'required': required, 'assigned': assigned, 'satisfied': assigned >= required }) coverage_df = pd.DataFrame(coverage_check) satisfaction = coverage_df['satisfied'].mean() * 100 results += f"Coverage satisfaction: {satisfaction:.1f}%\n" if satisfaction < 100: results += "Warning: Not all staffing requirements are met!\n" unsatisfied = coverage_df[~coverage_df['satisfied']] results += unsatisfied.to_string() + "\n" # Generate detailed schedule report detailed_schedule = "Detailed Schedule:\n" for d in range(1, num_days+1): day_schedule = schedule_df[schedule_df['day'] == d] day_schedule = day_schedule.sort_values(['start']) detailed_schedule += f"\nDay {d}:\n" for _, shift in day_schedule.iterrows(): start_hour = shift['start'] end_hour = shift['end'] start_str = am_pm(start_hour) end_str = am_pm(end_hour) cycles = ", ".join(shift['cycles_covered']) detailed_schedule += f" Staff {shift['staff_id']}: {start_str}-{end_str} ({shift['duration']} hrs), Cycles: {cycles}\n" # Generate schedule visualization fig, ax = plt.subplots(figsize=(15, 8)) # Prepare schedule for plotting staff_days = {} for s in range(1, staff_count+1): staff_days[s] = [0] * num_days # 0 means off duty for _, shift in schedule_df.iterrows(): staff_id = shift['staff_id'] day = shift['day'] - 1 # 0-indexed staff_days[staff_id][day] = shift['duration'] # Plot the schedule for s, hours in staff_days.items(): ax.bar(range(1, num_days+1), hours, label=f'Staff {s}') ax.set_xlabel('Day') ax.set_ylabel('Shift Hours') ax.set_title('Staff Schedule') ax.set_xticks(range(1, num_days+1)) ax.legend() # Save the figure to a temporary file plot_path = None with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f: plt.savefig(f.name) plt.close(fig) plot_path = f.name # Create a Gantt chart gantt_fig, gantt_ax = plt.subplots(figsize=(30, 12)) # Increased figure width # Set up colors for each staff colors = plt.cm.tab20.colors # Use a visually distinct color palette # Sort by staff then day schedule_df['start_ampm'] = schedule_df['start'].apply(am_pm) schedule_df['end_ampm'] = schedule_df['end'].apply(am_pm) schedule_df = schedule_df.sort_values(['staff_id', 'day']) # Plot Gantt chart for staff_id in range(1, staff_count+1): staff_shifts = schedule_df[schedule_df['staff_id'] == staff_id] y_pos = staff_id for i, shift in staff_shifts.iterrows(): day = shift['day'] start_hour = shift['start'] end_hour = shift['end'] duration = shift['duration'] start_ampm = shift['start_ampm'] end_ampm = shift['end_ampm'] # Handle overnight shifts if end_hour < start_hour: # Overnight shift gantt_ax.broken_barh([(day-1 + start_hour/24, (24-start_hour)/24), (day, end_hour/24)], (y_pos-0.3, 0.6), # Increased bar height facecolors=colors[staff_id % len(colors)]) else: gantt_ax.broken_barh([(day-1 + start_hour/24, duration/24)], (y_pos-0.3, 0.6), # Increased bar height facecolors=colors[staff_id % len(colors)]) # Staggered text labels text_y_offset = 0.1 if (i % 2) == 0 else -0.1 # Alternate label position # Add text label - prioritize staff ID, add time range if space allows text_label = f"Staff {staff_id}" if duration > 6: # Adjust this threshold as needed text_label += f"\n{start_ampm}-{end_ampm}" gantt_ax.text(day-1 + start_hour/24 + duration/48, y_pos + text_y_offset, text_label, horizontalalignment='center', verticalalignment='center', fontsize=7) # Slightly smaller font gantt_ax.set_xlabel('Day') gantt_ax.set_yticks(range(1, staff_count+1)) gantt_ax.set_yticklabels([f'Staff {s}' for s in range(1, staff_count+1)]) gantt_ax.set_xlim(0, num_days) gantt_ax.set_title('Staff Schedule (Full Period)') gantt_ax.grid(False) # Remove grid lines # Save the Gantt chart gantt_path = None with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f: gantt_fig.savefig(f.name) plt.close(gantt_fig) gantt_path = f.name # Convert schedule to CSV data schedule_df['start_ampm'] = schedule_df['start'].apply(am_pm) schedule_df['end_ampm'] = schedule_df['end'].apply(am_pm) schedule_csv = schedule_df[['staff_id', 'day', 'start_ampm', 'end_ampm', 'duration', 'cycles_covered']].to_csv(index=False) # Create a temporary file and write the CSV data into it with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file: temp_file.write(schedule_csv) schedule_csv_path = temp_file.name return results, plot_path, schedule_csv_path, gantt_path def convert_to_24h(time_str): """Converts AM/PM time string to 24-hour format.""" try: time_obj = datetime.strptime(time_str, "%I:00 %p") return time_obj.hour except ValueError: return None def gradio_wrapper( csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change ): clinic_start = convert_to_24h(clinic_start_ampm) clinic_end = convert_to_24h(clinic_end_ampm) results, plot_img, schedule_csv_path, gantt_path = optimize_staffing( csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change ) # Load plot images if they exist plot_img = plot_img if plot_img and os.path.exists(plot_img) else None gantt_img = gantt_path if gantt_path and os.path.exists(gantt_path) else None return results, plot_img, schedule_csv_path, gantt_img # Define Gradio UI am_pm_times = [f"{i:02d}:00 AM" for i in range(1, 13)] + [f"{i:02d}:00 PM" for i in range(1, 13)] iface = gr.Interface( fn=gradio_wrapper, inputs=[ gr.File(label="Upload CSV"), gr.Number(label="Beds per Staff", value=3), gr.Number(label="Max Hours per Staff", value=40), gr.Number(label="Hours per Cycle", value=4), gr.Number(label="Rest Days per Week", value=2), gr.Dropdown(label="Clinic Start Hour (AM/PM)", choices=am_pm_times, value="08:00 AM"), gr.Dropdown(label="Clinic End Hour (AM/PM)", choices=am_pm_times, value="08:00 PM"), gr.Number(label="Overlap Time", value=0), gr.Number(label="Max Start Time Change", value=2) ], outputs=[ gr.Textbox(label="Optimization Results"), gr.Image(label="Schedule Visualization"), gr.File(label="Schedule CSV"), gr.Image(label="Gantt Chart"), ], title="Staff Scheduling Optimizer", description="Upload a CSV file with cycle data and configure parameters to generate an optimal staff schedule." ) # Launch the Gradio app iface.launch(share=True)