import numpy as np import pandas as pd import plotly.figure_factory as ff import gradio as gr from datetime import datetime, timedelta import math import random from copy import deepcopy # Load and process data def load_data(file_path): df = pd.read_csv(file_path) return df # Convert demand data to required staff def calculate_staff_needed(demand_data, beds_per_staff=3): staff_needed = {} for column in demand_data.columns: if column.startswith('cycle'): staff_needed[column] = [] for value in demand_data[column]: if pd.isna(value): staff_needed[column].append(0) else: staff_needed[column].append(math.ceil(value / beds_per_staff)) return pd.DataFrame({'Date': demand_data['Date'], **staff_needed}) # Create time slots based on clinic hours def create_time_slots(start_time, end_time, cycle_duration=5): start_hour = int(start_time.split(':')[0]) start_minute = int(start_time.split(':')[1]) end_hour = int(end_time.split(':')[0]) end_minute = int(end_time.split(':')[1]) start_datetime = datetime(2025, 1, 1, start_hour, start_minute) end_datetime = datetime(2025, 1, 1, end_hour, end_minute) if end_datetime <= start_datetime: end_datetime += timedelta(days=1) current_time = start_datetime time_slots = [] while current_time < end_datetime: next_time = current_time + timedelta(hours=cycle_duration) if next_time > end_datetime: next_time = end_datetime time_slots.append({ 'start': current_time.strftime('%H:%M'), 'end': next_time.strftime('%H:%M') }) current_time = next_time return time_slots # Check if a shift respects duration constraints def is_valid_shift_duration(shift, max_duration=12): start_hour = int(shift['start_time'].split(':')[0]) start_minute = int(shift['start_time'].split(':')[1]) end_hour = int(shift['end_time'].split(':')[0]) end_minute = int(shift['end_time'].split(':')[1]) start = start_hour * 60 + start_minute end = end_hour * 60 + end_minute if end < start: # Overnight shift end += 24 * 60 duration = (end - start) / 60 # in hours return duration <= max_duration # Check if shift change follows constraints def is_valid_shift_change(prev_shift, curr_shift, allowed_diff=1): if not prev_shift: return True if prev_shift['is_rest_day']: return True prev_start = int(prev_shift['start_time'].split(':')[0]) * 60 + int(prev_shift['start_time'].split(':')[1]) curr_start = int(curr_shift['start_time'].split(':')[0]) * 60 + int(curr_shift['start_time'].split(':')[1]) diff_hours = abs(curr_start - prev_start) / 60 return diff_hours <= allowed_diff # Check weekly rest day requirement def has_weekly_rest_day(schedule, current_day_idx, days_in_week=7): if current_day_idx < days_in_week: # Count rest days in the partial week rest_days = sum(1 for i in range(current_day_idx) if schedule[i]['is_rest_day']) return rest_days > 0 else: # Check the previous 6 days plus current day rest_days = sum(1 for i in range(current_day_idx - 6, current_day_idx + 1) if schedule[i]['is_rest_day']) return rest_days > 0 # Check total monthly hours def calculate_monthly_hours(schedule): total_hours = 0 for day in schedule: if not day['is_rest_day']: start_hour = int(day['start_time'].split(':')[0]) start_minute = int(day['start_time'].split(':')[1]) end_hour = int(day['end_time'].split(':')[0]) end_minute = int(day['end_time'].split(':')[1]) start = start_hour * 60 + start_minute end = end_hour * 60 + end_minute if end < start: # Overnight shift end += 24 * 60 shift_hours = (end - start) / 60 total_hours += shift_hours return total_hours # Generate all possible shifts for a day def generate_possible_shifts(time_slots, max_duration=12, handover_overlap=30): possible_shifts = [] for i in range(len(time_slots)): for j in range(i, len(time_slots)): start_time = time_slots[i]['start'] end_time = time_slots[j]['end'] # Add handover overlap end_hour, end_minute = map(int, end_time.split(':')) end_minute += handover_overlap if end_minute >= 60: end_hour += 1 end_minute -= 60 end_time = f"{end_hour:02d}:{end_minute:02d}" shift = { 'start_time': start_time, 'end_time': end_time, 'cycles': list(range(i, j+1)) } if is_valid_shift_duration(shift, max_duration): possible_shifts.append(shift) return possible_shifts # Primary scheduling algorithm def schedule_staff(staff_needed, time_slots, start_date, clinic_start, clinic_end, max_duration=12, beds_per_staff=3, max_monthly_hours=234, handover_overlap=30): dates = pd.to_datetime(staff_needed['Date']) date_range = [d.strftime('%Y-%m-%d') for d in dates] # Create demand array demand_array = np.zeros((len(date_range), len(time_slots))) for i, date in enumerate(date_range): for j, cycle in enumerate(staff_needed.columns[1:]): # Skip 'Date' column if j < len(time_slots): demand_array[i, j] = staff_needed[cycle].iloc[i] # Generate all possible shifts possible_shifts = generate_possible_shifts(time_slots, max_duration, handover_overlap) # Initialize staff schedules staff_schedules = [] # Helper function to create a rest day entry def create_rest_day(date): return { 'date': date, 'is_rest_day': True, 'start_time': '', 'end_time': '', 'cycles': [] } # Initialize coverage array coverage = np.zeros_like(demand_array) # Function to calculate fitness of current solution def calculate_fitness(coverage, demand): # Calculate how well coverage meets demand shortfall = np.maximum(0, demand - coverage) return -np.sum(shortfall) # Negative because we want to minimize shortfall # Add staff members until demand is met iteration_count = 0 max_iterations = 1000 while np.any(coverage < demand_array) and iteration_count < max_iterations: # Create a new staff schedule new_staff = [] for i, date in enumerate(date_range): # Decide whether this is a rest day if i > 0 and not has_weekly_rest_day(new_staff, i-1): new_staff.append(create_rest_day(date)) continue # Find the best shift for this day best_shift = None best_improvement = -1 # Try rest day temp_schedule = deepcopy(new_staff) temp_schedule.append(create_rest_day(date)) if has_weekly_rest_day(temp_schedule, len(temp_schedule)-1) and calculate_monthly_hours(temp_schedule) <= max_monthly_hours: # Rest day is valid best_shift = create_rest_day(date) # Try each possible shift for shift in possible_shifts: # Check if this shift would help with uncovered demand will_help = False for cycle_idx in shift['cycles']: if cycle_idx < coverage.shape[1] and coverage[i, cycle_idx] < demand_array[i, cycle_idx]: will_help = True break if not will_help: continue # Create temporary shift for validation temp_shift = { 'date': date, 'is_rest_day': False, 'start_time': shift['start_time'], 'end_time': shift['end_time'], 'cycles': shift['cycles'] } # Check if shift change is valid if i > 0 and not is_valid_shift_change(new_staff[-1], temp_shift): continue # Check if adding this shift would exceed monthly hours temp_schedule = deepcopy(new_staff) temp_schedule.append(temp_shift) if calculate_monthly_hours(temp_schedule) > max_monthly_hours: continue # Check weekly rest day requirement if not has_weekly_rest_day(temp_schedule, len(temp_schedule)-1) and i < len(date_range) - 1: continue # Calculate improvement in coverage temp_coverage = coverage.copy() for cycle_idx in shift['cycles']: if cycle_idx < temp_coverage.shape[1]: temp_coverage[i, cycle_idx] += 1 improvement = calculate_fitness(temp_coverage, demand_array) - calculate_fitness(coverage, demand_array) if improvement > best_improvement: best_improvement = improvement best_shift = temp_shift # If no valid shift is found, use a rest day if best_shift is None: best_shift = create_rest_day(date) new_staff.append(best_shift) # Update coverage if not a rest day if not best_shift['is_rest_day']: for cycle_idx in best_shift['cycles']: if cycle_idx < coverage.shape[1]: coverage[i, cycle_idx] += 1 # Add the new staff schedule staff_schedules.append(new_staff) # Check if we've met all demands if not np.any(coverage < demand_array): break iteration_count += 1 return staff_schedules, coverage, demand_array # Convert schedules to CSV format def schedules_to_csv(staff_schedules): rows = [] for staff_idx, schedule in enumerate(staff_schedules): for day in schedule: if not day['is_rest_day']: rows.append({ 'Staff ID': f'Staff {staff_idx+1}', 'Date': day['date'], 'Start Time': day['start_time'], 'End Time': day['end_time'], 'Is Rest Day': 'No' }) else: rows.append({ 'Staff ID': f'Staff {staff_idx+1}', 'Date': day['date'], 'Start Time': '', 'End Time': '', 'Is Rest Day': 'Yes' }) return pd.DataFrame(rows) # Generate Gantt chart def create_gantt_chart(staff_schedules, clinic_start, clinic_end): tasks = [] colors = {} for staff_idx, schedule in enumerate(staff_schedules): staff_id = f'Staff {staff_idx+1}' colors[staff_id] = f'rgb({random.randint(50, 200)}, {random.randint(50, 200)}, {random.randint(50, 200)})' for day in schedule: if not day['is_rest_day']: date = day['date'] start_time = day['start_time'] end_time = day['end_time'] start_dt = f"{date} {start_time}" end_dt = f"{date} {end_time}" # Handle overnight shifts start_hour = int(start_time.split(':')[0]) end_hour = int(end_time.split(':')[0]) if end_hour < start_hour: end_date = (pd.to_datetime(date) + pd.Timedelta(days=1)).strftime('%Y-%m-%d') end_dt = f"{end_date} {end_time}" tasks.append({ 'Task': staff_id, 'Start': start_dt, 'Finish': end_dt, 'Resource': 'Shift' }) else: date = day['date'] start_dt = f"{date} {clinic_start}" # Calculate end of day end_date = date end_dt = f"{end_date} {clinic_end}" tasks.append({ 'Task': staff_id, 'Start': start_dt, 'Finish': end_dt, 'Resource': 'Rest Day' }) df_tasks = pd.DataFrame(tasks) # Convert string dates to datetime df_tasks['Start'] = pd.to_datetime(df_tasks['Start']) df_tasks['Finish'] = pd.to_datetime(df_tasks['Finish']) # Create the Gantt chart fig = ff.create_gantt(df_tasks, colors=colors, index_col='Resource', show_colorbar=True, group_tasks=True, showgrid_x=True, title='Staff Schedule Gantt Chart') return fig # Validate coverage def validate_coverage(coverage, demand): is_valid = np.all(coverage >= demand) coverage_stats = { 'Total Demand': np.sum(demand), 'Total Coverage': np.sum(coverage), 'Coverage Percentage': f"{100 * np.sum(coverage) / np.sum(demand) if np.sum(demand) > 0 else 100:.2f}%", 'Uncovered Slots': np.sum(coverage < demand) } return is_valid, coverage_stats # Gradio interface def nurse_scheduling_app( csv_file, beds_per_staff=3, max_shift_duration=12, handover_overlap=30, max_monthly_hours=234, clinic_start_time="08:00", clinic_end_time="20:00", cycle_duration=5 ): try: # Load data demand_data = load_data(csv_file.name) # Calculate staff needed staff_needed = calculate_staff_needed(demand_data, beds_per_staff) # Create time slots time_slots = create_time_slots(clinic_start_time, clinic_end_time, cycle_duration) # Run scheduling algorithm start_date = pd.to_datetime(demand_data['Date'].iloc[0]).strftime('%Y-%m-%d') staff_schedules, coverage, demand = schedule_staff( staff_needed, time_slots, start_date, clinic_start_time, clinic_end_time, max_shift_duration, beds_per_staff, max_monthly_hours, handover_overlap ) # Convert to CSV schedule_df = schedules_to_csv(staff_schedules) # Create Gantt chart gantt_chart = create_gantt_chart(staff_schedules, clinic_start_time, clinic_end_time) # Validate coverage is_valid, coverage_stats = validate_coverage(coverage, demand) # Summary summary = f""" ## Scheduling Summary - Total Staff Required: {len(staff_schedules)} - 100% Coverage Achieved: {"Yes" if is_valid else "No"} - Total Demand (staff-shifts): {coverage_stats['Total Demand']} - Total Coverage Provided: {coverage_stats['Total Coverage']} - Coverage Percentage: {coverage_stats['Coverage Percentage']} - Uncovered Slots: {coverage_stats['Uncovered Slots']} """ # Save to CSV csv_path = "schedule_output.csv" schedule_df.to_csv(csv_path, index=False) return summary, gantt_chart, schedule_df, csv_path except Exception as e: return f"Error: {str(e)}", None, None, None # Create Gradio interface def create_interface(): with gr.Blocks() as interface: gr.Markdown("# Staff Scheduling Optimizer") with gr.Row(): with gr.Column(): # Left panel for inputs gr.Markdown("### Clinic Parameters") # Use Markdown instead of Group label with gr.Group(): # Group without label csv_input = gr.File(label="Upload CSV") beds_per_staff = gr.Number(label="Beds per Staff", value=3) max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160) hours_per_cycle = gr.Number(label="Hours per Cycle", value=4) rest_days_per_week = gr.Number(label="Rest Days per Week", value=2) gr.Markdown("### Time Parameters") # Use Markdown for section headers with gr.Group(): clinic_start = gr.Dropdown( label="Clinic Start Hour", choices=am_pm_times, value="08:00 AM" ) clinic_end = gr.Dropdown( label="Clinic End Hour", choices=am_pm_times, value="08:00 PM" ) overlap_time = gr.Number(label="Overlap Time", value=0) max_start_time_change = gr.Number(label="Max Start Time Change", value=2) with gr.Column(): # Right panel for outputs with gr.Tabs(): with gr.TabItem("Schedule"): schedule_output = gr.Dataframe() with gr.TabItem("Visualization"): gantt_chart = gr.Plot() with gr.TabItem("Statistics"): stats_output = gr.Markdown() optimize_btn = gr.Button("Optimize Schedule", variant="primary") # Connect the button to your optimization function optimize_btn.click( fn=nurse_scheduling_app, inputs=[ csv_input, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change ], outputs=[schedule_output, gantt_chart, stats_output] ) return interface # Define your time options am_pm_times = [f"{i:02d}:00 AM" for i in range(1, 13)] + [f"{i:02d}:00 PM" for i in range(1, 13)] # Launch the interface if __name__ == "__main__": interface = create_interface() interface.launch(share=True)