| | import numpy as np |
| | import pandas as pd |
| | import plotly.figure_factory as ff |
| | import gradio as gr |
| | from datetime import datetime, timedelta |
| | import math |
| | import random |
| | from copy import deepcopy |
| |
|
| | |
def load_data(file_path):
    """Read the demand CSV into a DataFrame.

    Args:
        file_path: Location of the CSV; forwarded unchanged to
            ``pandas.read_csv``.

    Returns:
        The parsed ``pandas.DataFrame``.
    """
    return pd.read_csv(file_path)
| |
|
| | |
def calculate_staff_needed(demand_data, beds_per_staff=3):
    """Convert per-cycle demand values into required staff head-counts.

    Only columns whose name starts with ``'cycle'`` are converted; every
    other column except ``'Date'`` is left out of the result.

    Args:
        demand_data: DataFrame with a ``'Date'`` column and ``cycle*``
            columns holding the demand values.
        beds_per_staff: Divisor applied to each demand value.

    Returns:
        DataFrame with the ``'Date'`` column followed by one column per
        cycle holding ``ceil(value / beds_per_staff)``; missing values
        become ``0``.
    """
    def staff_for(value):
        # A missing demand value means nobody is required for that cycle.
        return 0 if pd.isna(value) else math.ceil(value / beds_per_staff)

    per_cycle = {
        name: [staff_for(value) for value in demand_data[name]]
        for name in demand_data.columns
        if name.startswith('cycle')
    }
    return pd.DataFrame({'Date': demand_data['Date'], **per_cycle})
| |
|
| | |
def create_time_slots(start_time, end_time, cycle_duration=5):
    """Split the clinic's opening window into consecutive cycle slots.

    Args:
        start_time: Opening time as an ``'HH:MM'`` string.
        end_time: Closing time as an ``'HH:MM'`` string. When it is not
            later than ``start_time`` the window is taken to end on the
            following day.
        cycle_duration: Length of one slot in hours; must be positive.

    Returns:
        List of ``{'start': 'HH:MM', 'end': 'HH:MM'}`` dicts in
        chronological order. The last slot is cut short at ``end_time``.

    Raises:
        ValueError: If ``cycle_duration`` is missing or not positive. A
            non-positive step would never advance the cursor and the loop
            below would not terminate.
    """
    if cycle_duration is None or cycle_duration <= 0:
        raise ValueError("cycle_duration must be a positive number of hours")

    def on_reference_day(clock):
        # The calendar date is arbitrary; only the time of day matters.
        parts = clock.split(':')
        return datetime(2025, 1, 1, int(parts[0]), int(parts[1]))

    window_start = on_reference_day(start_time)
    window_end = on_reference_day(end_time)
    if window_end <= window_start:
        # Overnight (or full 24h) window: the end belongs to the next day.
        window_end += timedelta(days=1)

    step = timedelta(hours=cycle_duration)
    time_slots = []
    cursor = window_start
    while cursor < window_end:
        slot_end = min(cursor + step, window_end)
        time_slots.append({
            'start': cursor.strftime('%H:%M'),
            'end': slot_end.strftime('%H:%M'),
        })
        cursor = slot_end

    return time_slots
| |
|
| | |
def is_valid_shift_duration(shift, max_duration=12):
    """Tell whether a shift is no longer than ``max_duration`` hours.

    Args:
        shift: Dict with ``'start_time'`` and ``'end_time'`` given as
            ``'HH:MM'`` strings.
        max_duration: Upper bound in hours (inclusive).

    Returns:
        ``True`` when the shift length is within the bound. An end time
        earlier than the start time is read as ending on the next day.
    """
    def minutes_since_midnight(clock):
        parts = clock.split(':')
        return int(parts[0]) * 60 + int(parts[1])

    begin = minutes_since_midnight(shift['start_time'])
    finish = minutes_since_midnight(shift['end_time'])

    # Roll the end over to the next day for shifts that cross midnight.
    elapsed = finish - begin if finish >= begin else finish + 24 * 60 - begin
    return elapsed / 60 <= max_duration
| |
|
| | |
def is_valid_shift_change(prev_shift, curr_shift, allowed_diff=1):
    """Check that consecutive shifts start at similar times of day.

    Args:
        prev_shift: The previous day's entry, or a falsy value when there
            is none. Must carry ``'is_rest_day'`` and ``'start_time'``.
        curr_shift: The candidate entry; its ``'start_time'`` is an
            ``'HH:MM'`` string.
        allowed_diff: Largest accepted difference between the two start
            times, in hours (inclusive).

    Returns:
        ``True`` when there is no previous shift, when the previous day
        was a rest day, or when the start times differ by at most
        ``allowed_diff`` hours.
    """
    # Nothing to compare against: any start time is acceptable.
    if not prev_shift or prev_shift['is_rest_day']:
        return True

    def start_minutes(entry):
        clock = entry['start_time']
        return int(clock.split(':')[0]) * 60 + int(clock.split(':')[1])

    earlier = start_minutes(prev_shift)
    later = start_minutes(curr_shift)
    return abs(later - earlier) / 60 <= allowed_diff
| |
|
| | |
def has_weekly_rest_day(schedule, current_day_idx, days_in_week=7):
    """Check the "one rest day per week" rule for the window ending today.

    The window is the ``days_in_week`` consecutive entries that end at
    ``current_day_idx`` (inclusive).

    Args:
        schedule: List of day entries, each with an ``'is_rest_day'`` key.
        current_day_idx: Index of the last day of the window.
        days_in_week: Window length in days.

    Returns:
        ``True`` when the window contains at least one rest day, or when
        fewer than ``days_in_week`` days exist so far; ``False`` otherwise.
    """
    window_start = current_day_idx - days_in_week + 1
    if window_start < 0:
        # A run shorter than a full week cannot have broken the rule yet.
        # Returning False here would force rest days at the very start of
        # every schedule and leave those days' demand impossible to cover.
        return True

    window = schedule[window_start:current_day_idx + 1]
    return any(day['is_rest_day'] for day in window)
| |
|
| | |
def calculate_monthly_hours(schedule):
    """Add up the hours worked across every non-rest day in a schedule.

    Args:
        schedule: List of day entries with ``'is_rest_day'``,
            ``'start_time'`` and ``'end_time'`` (``'HH:MM'`` strings).

    Returns:
        Total hours; ``0`` for an empty or all-rest schedule. An end time
        earlier than the start time is read as ending on the next day.
    """
    def minutes_since_midnight(clock):
        parts = clock.split(':')
        return int(parts[0]) * 60 + int(parts[1])

    total_hours = 0
    for entry in schedule:
        if entry['is_rest_day']:
            continue

        begin = minutes_since_midnight(entry['start_time'])
        finish = minutes_since_midnight(entry['end_time'])
        if finish < begin:
            # Shift crosses midnight.
            finish += 24 * 60

        total_hours += (finish - begin) / 60

    return total_hours
| |
|
| | |
def generate_possible_shifts(time_slots, max_duration=12, handover_overlap=30):
    """Enumerate every run of consecutive slots that fits in one shift.

    Args:
        time_slots: Ordered list of ``{'start': 'HH:MM', 'end': 'HH:MM'}``
            dicts. Slots are assumed to be contiguous and at most 24h
            long each, as produced by ``create_time_slots`` — confirm if
            slots come from elsewhere.
        max_duration: Longest allowed shift in hours, handover included.
        handover_overlap: Minutes added after the last covered slot. It is
            truncated to an int because it is formatted with ``:02d``.

    Returns:
        List of ``{'start_time', 'end_time', 'cycles'}`` dicts ordered by
        first slot, then by last slot. ``'cycles'`` holds the covered slot
        indices and ``'end_time'`` is always a valid 24h ``'HH:MM'`` value.
    """
    minutes_per_day = 24 * 60

    def minutes_since_midnight(clock):
        parts = clock.split(':')
        return int(parts[0]) * 60 + int(parts[1])

    def slot_length(slot):
        span = (minutes_since_midnight(slot['end'])
                - minutes_since_midnight(slot['start'])) % minutes_per_day
        # Identical start and end can only mean a full-day slot.
        return span or minutes_per_day

    overlap = int(handover_overlap)
    lengths = [slot_length(slot) for slot in time_slots]
    possible_shifts = []

    for first, first_slot in enumerate(time_slots):
        worked = 0
        for last in range(first, len(time_slots)):
            # Track the real elapsed minutes rather than deriving them from
            # the wrapped HH:MM strings, which cannot tell 24h from 0h.
            worked += lengths[last]
            if (worked + overlap) / 60 > max_duration:
                # Adding more slots only makes the shift longer.
                break

            # Wrap past midnight so the result stays a valid clock time
            # (the hour never reaches 24 and the minutes never reach 60).
            end_hour, end_minute = divmod(
                (minutes_since_midnight(time_slots[last]['end']) + overlap)
                % minutes_per_day,
                60,
            )
            possible_shifts.append({
                'start_time': first_slot['start'],
                'end_time': f"{end_hour:02d}:{end_minute:02d}",
                'cycles': list(range(first, last + 1)),
            })

    return possible_shifts
| |
|
| | |
def schedule_staff(staff_needed, time_slots, start_date, clinic_start, clinic_end,
                   max_duration=12, beds_per_staff=3, max_monthly_hours=234, handover_overlap=30):
    """Greedily add staff members until every slot's demand is covered.

    Each pass builds one staff member's schedule day by day. For every day
    it picks the candidate shift that reduces the remaining shortfall the
    most, subject to the start-time-change, monthly-hours and weekly-rest
    rules, and falls back to a rest day when no shift qualifies.

    Args:
        staff_needed: DataFrame whose first column is ``'Date'`` and whose
            following columns hold the required head-count per cycle, in
            the same order as ``time_slots``.
        time_slots: Slots as returned by ``create_time_slots``.
        start_date: Accepted for interface compatibility; not used here.
        clinic_start: Accepted for interface compatibility; not used here.
        clinic_end: Accepted for interface compatibility; not used here.
        max_duration: Longest allowed shift in hours.
        beds_per_staff: Accepted for interface compatibility; not used here.
        max_monthly_hours: Cap on the hours of one staff member's schedule.
        handover_overlap: Minutes appended to each shift for handover.

    Returns:
        Tuple ``(staff_schedules, coverage, demand_array)``: one list of
        day entries per staff member, and two ``(days, slots)`` arrays
        with the staff assigned and the staff required.
    """
    dates = pd.to_datetime(staff_needed['Date'])
    date_range = [d.strftime('%Y-%m-%d') for d in dates]

    # Demand matrix: rows are days, columns are slots. Cycle columns beyond
    # the number of slots are ignored.
    demand_array = np.zeros((len(date_range), len(time_slots)))
    for j, cycle in enumerate(staff_needed.columns[1:len(time_slots) + 1]):
        demand_array[:, j] = staff_needed[cycle].to_numpy()

    possible_shifts = generate_possible_shifts(time_slots, max_duration, handover_overlap)

    def create_rest_day(date):
        return {
            'date': date,
            'is_rest_day': True,
            'start_time': '',
            'end_time': '',
            'cycles': []
        }

    def calculate_fitness(coverage, demand):
        # Negative total shortfall: higher is better, 0 means fully covered.
        shortfall = np.maximum(0, demand - coverage)
        return -np.sum(shortfall)

    coverage = np.zeros_like(demand_array)
    slot_count = coverage.shape[1]
    last_day = len(date_range) - 1
    staff_schedules = []
    max_iterations = 1000

    for _ in range(max_iterations):
        if not np.any(coverage < demand_array):
            break

        new_staff = []
        added_coverage = False

        for i, date in enumerate(date_range):
            # A week without rest so far leaves no choice for today.
            if i > 0 and not has_weekly_rest_day(new_staff, i - 1):
                new_staff.append(create_rest_day(date))
                continue

            best_shift = None
            best_improvement = -1

            # Resting is the fallback whenever it keeps the schedule legal.
            # The schedule lists are only read by the helpers, so a shallow
            # concatenation replaces the per-candidate deep copy.
            rest_option = new_staff + [create_rest_day(date)]
            if (has_weekly_rest_day(rest_option, len(rest_option) - 1)
                    and calculate_monthly_hours(rest_option) <= max_monthly_hours):
                best_shift = create_rest_day(date)

            # Coverage does not change while candidates are compared.
            baseline = calculate_fitness(coverage, demand_array)

            for shift in possible_shifts:
                covered = [c for c in shift['cycles'] if c < slot_count]

                # Skip shifts that touch no under-covered slot today.
                if not any(coverage[i, c] < demand_array[i, c] for c in covered):
                    continue

                temp_shift = {
                    'date': date,
                    'is_rest_day': False,
                    'start_time': shift['start_time'],
                    'end_time': shift['end_time'],
                    'cycles': shift['cycles']
                }

                if i > 0 and not is_valid_shift_change(new_staff[-1], temp_shift):
                    continue

                candidate = new_staff + [temp_shift]
                if calculate_monthly_hours(candidate) > max_monthly_hours:
                    continue

                # The weekly-rest rule is not enforced on the final day.
                if not has_weekly_rest_day(candidate, len(candidate) - 1) and i < last_day:
                    continue

                temp_coverage = coverage.copy()
                for c in covered:
                    temp_coverage[i, c] += 1

                improvement = calculate_fitness(temp_coverage, demand_array) - baseline
                if improvement > best_improvement:
                    best_improvement = improvement
                    best_shift = temp_shift

            if best_shift is None:
                best_shift = create_rest_day(date)

            new_staff.append(best_shift)

            if not best_shift['is_rest_day']:
                for c in best_shift['cycles']:
                    if c < slot_count:
                        coverage[i, c] += 1
                        added_coverage = True

        if not added_coverage:
            # The pass is deterministic in `coverage`, so a member who
            # covers nothing means every later pass would be identical.
            # Stop instead of piling up idle schedules until the cap.
            break

        staff_schedules.append(new_staff)

    return staff_schedules, coverage, demand_array
| |
|
| | |
def schedules_to_csv(staff_schedules):
    """Flatten the per-staff schedules into one tabular DataFrame.

    Args:
        staff_schedules: List (one item per staff member) of lists of day
            entries carrying ``'date'``, ``'is_rest_day'``,
            ``'start_time'`` and ``'end_time'``.

    Returns:
        DataFrame with one row per staff member per day and the columns
        ``Staff ID``, ``Date``, ``Start Time``, ``End Time`` and
        ``Is Rest Day``. Rest days have empty start and end times.
    """
    def as_row(label, entry):
        resting = entry['is_rest_day']
        return {
            'Staff ID': label,
            'Date': entry['date'],
            'Start Time': '' if resting else entry['start_time'],
            'End Time': '' if resting else entry['end_time'],
            'Is Rest Day': 'Yes' if resting else 'No',
        }

    rows = [
        as_row(f'Staff {number}', entry)
        for number, schedule in enumerate(staff_schedules, start=1)
        for entry in schedule
    ]
    return pd.DataFrame(rows)
| |
|
| | |
def create_gantt_chart(staff_schedules, clinic_start, clinic_end):
    """Build a Plotly Gantt chart with one row per staff member.

    Working days are drawn from the shift's start to its end; rest days
    are drawn across the clinic's opening window.

    Args:
        staff_schedules: List (one item per staff member) of lists of day
            entries with ``'date'``, ``'is_rest_day'``, ``'start_time'``
            and ``'end_time'``.
        clinic_start: Clinic opening time, appended to the date and parsed
            by ``pandas.to_datetime``.
        clinic_end: Clinic closing time, handled the same way.

    Returns:
        The figure from ``ff.create_gantt``, or ``None`` when there is
        nothing to draw.
    """
    one_day = pd.Timedelta(days=1)

    def shift_moment(date, clock):
        # Built from an offset so an hour value of 24 or more still
        # resolves to a real timestamp on the following day.
        parts = clock.split(':')
        return pd.to_datetime(date) + pd.Timedelta(hours=int(parts[0]), minutes=int(parts[1]))

    tasks = []
    for staff_idx, schedule in enumerate(staff_schedules):
        staff_id = f'Staff {staff_idx+1}'

        for day in schedule:
            date = day['date']
            if day['is_rest_day']:
                start = pd.to_datetime(f"{date} {clinic_start}")
                finish = pd.to_datetime(f"{date} {clinic_end}")
                if finish <= start:
                    # Overnight clinic: the window closes on the next day.
                    finish += one_day
                resource = 'Rest Day'
            else:
                start = shift_moment(date, day['start_time'])
                finish = shift_moment(date, day['end_time'])
                if finish < start:
                    # Shift crosses midnight.
                    finish += one_day
                resource = 'Shift'

            tasks.append({
                'Task': staff_id,
                'Start': start,
                'Finish': finish,
                'Resource': resource
            })

    if not tasks:
        # An empty frame has no Start/Finish columns to plot.
        return None

    df_tasks = pd.DataFrame(tasks)

    # With a colour dict, create_gantt requires a key for every value of
    # the index column ('Resource'), not for every task name.
    colors = {
        'Shift': 'rgb(31, 119, 180)',
        'Rest Day': 'rgb(200, 200, 200)',
    }

    fig = ff.create_gantt(df_tasks, colors=colors, index_col='Resource',
                          show_colorbar=True, group_tasks=True, showgrid_x=True,
                          title='Staff Schedule Gantt Chart')

    return fig
| |
|
| | |
def validate_coverage(coverage, demand):
    """Compare assigned staff against required staff, slot by slot.

    Args:
        coverage: Array of staff assigned per day and slot.
        demand: Array of staff required, same shape as ``coverage``.

    Returns:
        Tuple ``(is_valid, coverage_stats)``. ``is_valid`` is true when no
        slot is below its demand. ``coverage_stats`` holds
        ``'Total Demand'``, ``'Total Coverage'``, ``'Coverage Percentage'``
        (share of demand actually met, formatted like ``'87.50%'``) and
        ``'Uncovered Slots'`` (number of slots below demand).
    """
    total_demand = np.sum(demand)

    # Cap each slot at its own demand so surplus staff in one slot cannot
    # mask a shortfall in another; the share can then never exceed 100%.
    demand_met = np.sum(np.minimum(coverage, demand))
    percentage = 100 * demand_met / total_demand if total_demand > 0 else 100

    is_valid = np.all(coverage >= demand)
    coverage_stats = {
        'Total Demand': total_demand,
        'Total Coverage': np.sum(coverage),
        'Coverage Percentage': f"{percentage:.2f}%",
        'Uncovered Slots': np.sum(coverage < demand)
    }
    return is_valid, coverage_stats
| |
|
| | |
def nurse_scheduling_app(
    csv_file,
    beds_per_staff=3,
    max_shift_duration=12,
    handover_overlap=30,
    max_monthly_hours=234,
    clinic_start_time="08:00",
    clinic_end_time="20:00",
    cycle_duration=5
):
    """Run the whole pipeline: load demand, schedule staff, report results.

    Args:
        csv_file: Uploaded file object exposing a ``name`` attribute, or a
            plain path string.
        beds_per_staff: Beds one staff member looks after.
        max_shift_duration: Longest allowed shift in hours.
        handover_overlap: Handover minutes appended to each shift; coerced
            to an int.
        max_monthly_hours: Cap on hours per staff member.
        clinic_start_time: Opening time as ``'HH:MM'``.
        clinic_end_time: Closing time as ``'HH:MM'``.
        cycle_duration: Length of one cycle in hours.

    Returns:
        Tuple ``(summary, gantt_chart, schedule_df, csv_path)``. On any
        failure the first item is an ``"Error: ..."`` message and the other
        three are ``None``.
    """
    if csv_file is None:
        # Report the actual problem instead of an attribute error.
        return "Error: No CSV file uploaded.", None, None, None

    # This is the UI boundary, so failures are reported to the user as a
    # message rather than propagated.
    try:
        # Upload widgets may hand over a file wrapper or a bare path.
        source_path = getattr(csv_file, "name", csv_file)
        demand_data = load_data(source_path)

        # Number widgets deliver floats; the shift end time is formatted
        # with an integer format code downstream.
        handover_overlap = int(handover_overlap)

        staff_needed = calculate_staff_needed(demand_data, beds_per_staff)
        time_slots = create_time_slots(clinic_start_time, clinic_end_time, cycle_duration)

        start_date = pd.to_datetime(demand_data['Date'].iloc[0]).strftime('%Y-%m-%d')
        staff_schedules, coverage, demand = schedule_staff(
            staff_needed,
            time_slots,
            start_date,
            clinic_start_time,
            clinic_end_time,
            max_shift_duration,
            beds_per_staff,
            max_monthly_hours,
            handover_overlap
        )

        schedule_df = schedules_to_csv(staff_schedules)
        gantt_chart = create_gantt_chart(staff_schedules, clinic_start_time, clinic_end_time)
        is_valid, coverage_stats = validate_coverage(coverage, demand)

        summary = (
            "\n## Scheduling Summary\n\n"
            f"- Total Staff Required: {len(staff_schedules)}\n"
            f"- 100% Coverage Achieved: {'Yes' if is_valid else 'No'}\n"
            f"- Total Demand (staff-shifts): {coverage_stats['Total Demand']}\n"
            f"- Total Coverage Provided: {coverage_stats['Total Coverage']}\n"
            f"- Coverage Percentage: {coverage_stats['Coverage Percentage']}\n"
            f"- Uncovered Slots: {coverage_stats['Uncovered Slots']}\n"
        )

        # NOTE(review): fixed file name in the working directory, so
        # concurrent requests overwrite each other's export.
        csv_path = "schedule_output.csv"
        schedule_df.to_csv(csv_path, index=False)

        return summary, gantt_chart, schedule_df, csv_path

    except Exception as e:
        return f"Error: {str(e)}", None, None, None
| |
|
| | |
def create_interface():
    """Assemble the Gradio UI and wire the button to the scheduler.

    Returns:
        The ``gr.Blocks`` interface, ready to be launched.
    """
    def to_24h(label):
        # The dropdowns offer labels such as "08:00 PM"; the scheduler
        # expects 24-hour "HH:MM" strings.
        return datetime.strptime(label, "%I:%M %p").strftime("%H:%M")

    def run_optimizer(csv_file, beds, max_hours, cycle_hours, rest_days,
                      start_label, end_label, overlap_minutes, max_start_change):
        # Adapter between the widget order and nurse_scheduling_app's
        # signature. `rest_days` and `max_start_change` are collected by
        # the form, but the scheduler exposes no parameter for them, so
        # they are not forwarded.
        try:
            start_24h = to_24h(start_label)
            end_24h = to_24h(end_label)
            overlap = int(overlap_minutes)
        except (TypeError, ValueError) as exc:
            return f"Error: {exc}", None, None, None

        return nurse_scheduling_app(
            csv_file,
            beds_per_staff=beds,
            handover_overlap=overlap,
            max_monthly_hours=max_hours,
            clinic_start_time=start_24h,
            clinic_end_time=end_24h,
            cycle_duration=cycle_hours,
        )

    with gr.Blocks() as interface:
        gr.Markdown("# Staff Scheduling Optimizer")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### Clinic Parameters")
                with gr.Group():
                    csv_input = gr.File(label="Upload CSV")
                    beds_per_staff = gr.Number(label="Beds per Staff", value=3)
                    max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160)
                    hours_per_cycle = gr.Number(label="Hours per Cycle", value=4)
                    rest_days_per_week = gr.Number(label="Rest Days per Week", value=2)

                gr.Markdown("### Time Parameters")
                with gr.Group():
                    clinic_start = gr.Dropdown(
                        label="Clinic Start Hour",
                        choices=am_pm_times,
                        value="08:00 AM"
                    )
                    clinic_end = gr.Dropdown(
                        label="Clinic End Hour",
                        choices=am_pm_times,
                        value="08:00 PM"
                    )
                    overlap_time = gr.Number(label="Overlap Time", value=0)
                    max_start_time_change = gr.Number(label="Max Start Time Change", value=2)

            with gr.Column():
                with gr.Tabs():
                    with gr.TabItem("Schedule"):
                        schedule_output = gr.Dataframe()
                    with gr.TabItem("Visualization"):
                        gantt_chart = gr.Plot()
                    with gr.TabItem("Statistics"):
                        stats_output = gr.Markdown()
                csv_download = gr.File(label="Download Schedule CSV")

        optimize_btn = gr.Button("Optimize Schedule", variant="primary")

        # The output order mirrors nurse_scheduling_app's return tuple:
        # (summary, gantt chart, schedule table, csv path).
        optimize_btn.click(
            fn=run_optimizer,
            inputs=[
                csv_input,
                beds_per_staff,
                max_hours_per_staff,
                hours_per_cycle,
                rest_days_per_week,
                clinic_start,
                clinic_end,
                overlap_time,
                max_start_time_change
            ],
            outputs=[stats_output, gantt_chart, schedule_output, csv_download]
        )

    return interface
| |
|
| | |
# Dropdown labels for the clinic hours: "01:00 AM" .. "12:00 AM" followed by
# "01:00 PM" .. "12:00 PM".
am_pm_times = [
    f"{hour:02d}:00 {period}"
    for period in ("AM", "PM")
    for hour in range(1, 13)
]
| |
|
| | |
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it. `share=True` is
    # passed to Gradio's launch to request a shareable link.
    interface = create_interface()
    interface.launch(share=True)