import gradio as gr import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib.dates as mdates from matplotlib.colors import LinearSegmentedColormap import plotly.figure_factory as ff import plotly.express as px from io import StringIO, BytesIO from datetime import datetime, timedelta import random import math import tempfile import os # Function to parse CSV and schedule staff def schedule_staff( demand_csv, clinic_start_time="07:00", cycle_lengths="4,7,6,7", beds_per_staff=3, max_hours=240, rest_days=1, overlap=30, shift_min=6, shift_max=12, staff_count=0 ): # Convert None to default value staff_count = 0 if staff_count is None else staff_count beds_per_staff = 3 if beds_per_staff is None else beds_per_staff # Calculate staff ratio (inverse of beds per staff) staff_ratio = 1.0 / float(beds_per_staff) # Parse the CSV data try: demand_data = pd.read_csv(StringIO(demand_csv), header=0) except: # If no header, try to parse without header demand_data = pd.read_csv(StringIO(demand_csv), header=None) # Rename columns cols = [f"day"] cols.extend([f"cycle{i+1}" for i in range(len(demand_data.columns)-1)]) demand_data.columns = cols # Parse cycle lengths cycle_lengths = [int(x.strip()) for x in cycle_lengths.split(",")] # Convert clinic start time to datetime start_time = datetime.strptime(clinic_start_time, "%H:%M") # Calculate cycle times cycle_times = [] current_time = start_time for length in cycle_lengths: end_time = current_time + timedelta(hours=length) cycle_times.append((current_time, end_time)) current_time = end_time # Calculate total days total_days = len(demand_data) # Calculate minimum staff needed for each cycle on each day min_staff_per_cycle = {} for day in range(total_days): for cycle in range(len(cycle_lengths)): cycle_col = f"cycle{cycle+1}" if f"cycle{cycle+1}" in demand_data.columns else cycle+1 demand = demand_data.iloc[day][cycle_col] if pd.isna(demand): demand = 0 min_staff_per_cycle[(day, cycle)] = math.ceil(demand * staff_ratio) # Initialize optimization if staff_count <= 0: # Start with a reasonable number of staff max_demand_day = max(sum(min_staff_per_cycle.get((day, cycle), 0) for cycle in range(len(cycle_lengths))) for day in range(total_days)) initial_staff = max(max_demand_day, math.ceil(sum(min_staff_per_cycle.values()) / (shift_max * total_days / 30 * max_hours))) else: initial_staff = staff_count # Create schedule using a greedy algorithm with constraints best_schedule = None best_staff_count = float('inf') # Try multiple iterations with different random seeds for attempt in range(5): schedule = create_schedule( total_days, cycle_times, min_staff_per_cycle, initial_staff, max_hours, rest_days, overlap / 60.0, # Convert to hours shift_min, shift_max, attempt ) # Count actual staff used staff_used = len(set(staff for day_schedule in schedule.values() for staff, _, _ in day_schedule)) if staff_used < best_staff_count: best_schedule = schedule best_staff_count = staff_used # If user specified staff count, ensure we use exactly that many if staff_count > 0 and staff_count > best_staff_count: best_schedule = create_schedule( total_days, cycle_times, min_staff_per_cycle, staff_count, max_hours, rest_days, overlap / 60.0, shift_min, shift_max, fixed_staff=staff_count ) best_staff_count = staff_count # Generate CSV output csv_output = generate_csv_output(best_schedule, total_days, cycle_times) # Create a temporary file for the CSV with tempfile.NamedTemporaryFile(delete=False, suffix='.csv', mode='w+') as temp_file: csv_output.to_csv(temp_file.name, index=False) temp_file_path = temp_file.name # Create Gantt chart fig = create_gantt_chart(best_schedule, total_days, cycle_times, best_staff_count) return temp_file_path, fig def create_schedule(total_days, cycle_times, min_staff_per_cycle, initial_staff, max_hours, rest_days, overlap, shift_min, shift_max, seed=0, fixed_staff=None): random.seed(seed) # Initialize empty schedule schedule = {day: [] for day in range(total_days)} # Track staff hours and consecutive days staff_hours = {staff: 0 for staff in range(initial_staff)} staff_last_day = {staff: -2 for staff in range(initial_staff)} # -2 means never worked staff_last_shift = {staff: None for staff in range(initial_staff)} # Last shift start time # For each day for day in range(total_days): # For each cycle for cycle in range(len(cycle_times)): cycle_start, cycle_end = cycle_times[cycle] cycle_duration = (cycle_end - cycle_start).total_seconds() / 3600 # in hours # Calculate how many staff needed for this cycle staff_needed = min_staff_per_cycle.get((day, cycle), 0) # Skip if no staff needed if staff_needed == 0: continue # Find available staff for this cycle available_staff = [] for staff in range(initial_staff): # Check if staff has a rest day if staff_last_day[staff] != -2 and (day - staff_last_day[staff]) % 7 < rest_days: continue # Check if staff has reached max hours if staff_hours[staff] + cycle_duration > max_hours: continue # Staff is available available_staff.append(staff) # Sort available staff by priority # Priority: 1) Staff who worked the previous day (for consecutive shift constraint) # 2) Staff with fewer hours def staff_priority(staff): consecutive_priority = 1 if staff_last_day[staff] == day - 1 else 0 hours_priority = -staff_hours[staff] # Negative so fewer hours = higher priority return (consecutive_priority, hours_priority) available_staff.sort(key=staff_priority, reverse=True) # Assign staff to this cycle assigned_staff = [] for i in range(min(staff_needed, len(available_staff))): staff = available_staff[i] # Determine shift start and end times shift_start = cycle_start shift_end = cycle_end # Apply consecutive day constraint (±1 hour flexibility) if staff_last_day[staff] == day - 1 and staff_last_shift[staff] is not None: # Try to keep the same start time as previous day, with ±1 hour flexibility prev_start = staff_last_shift[staff] time_diff = (shift_start.hour - prev_start.hour) + (shift_start.minute - prev_start.minute) / 60 if abs(time_diff) > 1: # Adjust shift start time to be within 1 hour of previous day if time_diff > 1: shift_start = datetime(shift_start.year, shift_start.month, shift_start.day, prev_start.hour + 1, prev_start.minute) else: shift_start = datetime(shift_start.year, shift_start.month, shift_start.day, prev_start.hour - 1, prev_start.minute) # Ensure shift duration is between min and max shift_duration = (shift_end - shift_start).total_seconds() / 3600 if shift_duration < shift_min: # Extend shift to meet minimum duration shift_end = shift_start + timedelta(hours=shift_min) elif shift_duration > shift_max: # Shorten shift to meet maximum duration shift_end = shift_start + timedelta(hours=shift_max) # Add overlap for handover if i > 0: # Overlap with previous staff shift_start = shift_start - timedelta(hours=overlap) # Update staff information staff_hours[staff] += (shift_end - shift_start).total_seconds() / 3600 staff_last_day[staff] = day staff_last_shift[staff] = shift_start # Add to schedule schedule[day].append((staff, shift_start, shift_end)) assigned_staff.append(staff) # If we couldn't assign enough staff, try to extend shifts of already assigned staff if len(assigned_staff) < staff_needed and not fixed_staff: # This is a simplified approach - in a real system, you'd need more sophisticated logic pass return schedule def generate_csv_output(schedule, total_days, cycle_times): # Create a DataFrame for the schedule rows = [] for day in range(total_days): for staff, start, end in schedule.get(day, []): rows.append({ 'Day': day + 1, 'Staff': f'Staff {staff + 1}', 'Start Time': start.strftime('%H:%M'), 'End Time': end.strftime('%H:%M'), 'Duration (hours)': round((end - start).total_seconds() / 3600, 2) }) return pd.DataFrame(rows) def create_gantt_chart(schedule, total_days, cycle_times, staff_count): # Prepare data for Gantt chart df_list = [] for day in range(total_days): for staff, start, end in schedule.get(day, []): # Convert to datetime for plotting plot_start = datetime(2023, 1, 1) + timedelta(days=day, hours=start.hour, minutes=start.minute) plot_end = datetime(2023, 1, 1) + timedelta(days=day, hours=end.hour, minutes=end.minute) df_list.append({ 'Task': f'Staff {staff + 1}', 'Start': plot_start, 'Finish': plot_end, 'Day': day + 1 }) if not df_list: # Return empty plot if no schedule fig = plt.figure(figsize=(12, 8)) plt.text(0.5, 0.5, "No valid schedule found", ha='center', va='center') return fig df = pd.DataFrame(df_list) # Create color map colors = px.colors.qualitative.Plotly[:staff_count] if staff_count > len(colors): # Repeat colors if needed colors = colors * (staff_count // len(colors) + 1) # Create Gantt chart using plotly fig = ff.create_gantt(df, colors=colors, index_col='Task', show_colorbar=True, group_tasks=True) # Update layout fig.update_layout( title='Staff Schedule Gantt Chart', xaxis_title='Day and Time', yaxis_title='Staff', height=600, margin=dict(l=50, r=50, t=100, b=100) ) return fig # Gradio interface setup demo = gr.Interface( fn=schedule_staff, inputs=[ gr.Textbox(label="Demand CSV Data", lines=10), gr.Textbox(label="Clinic Start Time (HH:MM)", value="07:00"), gr.Textbox(label="Cycle Lengths (comma-separated hours)", value="4,7,6,7"), gr.Number(label="Beds per Staff", value=3), gr.Number(label="Max Monthly Hours", value=240), gr.Number(label="Rest Days per Week", value=1), gr.Number(label="Shift Overlap (minutes)", value=30), gr.Number(label="Min Shift Duration (hours)", value=6), gr.Number(label="Max Shift Duration (hours)", value=12), gr.Number(label="Fixed Staff Count (0 for auto-optimize)", value=0) ], outputs=[ gr.File(label="Optimized Schedule CSV"), gr.Plot(label="Schedule Gantt Chart") ], title="Advanced Staff Scheduling App", description="Upload demand data and set constraints to generate an optimized staff schedule." ) demo.launch()