| | import gradio as gr |
| | import pandas as pd |
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| | import matplotlib.dates as mdates |
| | from matplotlib.colors import LinearSegmentedColormap |
| | import plotly.figure_factory as ff |
| | import plotly.express as px |
| | from io import StringIO, BytesIO |
| | from datetime import datetime, timedelta |
| | import random |
| | import math |
| | import tempfile |
| | import os |
| |
|
| | |
| |
|
def schedule_staff(
    demand_csv,
    clinic_start_time="07:00",
    cycle_lengths="4,7,6,7",
    beds_per_staff=3,
    max_hours=240,
    rest_days=1,
    overlap=30,
    shift_min=6,
    shift_max=12,
    staff_count=0
):
    """Turn bed-demand CSV text into a staff schedule, a CSV export and a chart.

    Args:
        demand_csv: CSV text with one row per day. The first column is the
            day; each following column holds the demand for one cycle.
            Columns named ``cycle1``, ``cycle2``, ... are matched by name,
            anything else by position.
        clinic_start_time: ``HH:MM`` start of the first cycle.
        cycle_lengths: Comma-separated cycle durations in whole hours.
        beds_per_staff: Demand units one staff member covers; must be > 0.
        max_hours: Hour cap per staff member, passed to ``create_schedule``.
        rest_days: Rest days per week, passed to ``create_schedule``.
        overlap: Shift overlap in minutes (converted to hours for the
            scheduler).
        shift_min: Minimum shift length in hours.
        shift_max: Maximum shift length in hours.
        staff_count: Fixed pool size; ``0`` or ``None`` sizes the pool from
            the demand.

    Returns:
        ``(csv_path, figure)``: path of a temporary CSV file holding the
        schedule, and the figure produced by ``create_gantt_chart``.

    Raises:
        ValueError: If the CSV text is empty, ``beds_per_staff`` is not
            positive, or the data has no column for one of the cycles.
    """
    # Number inputs can arrive empty (None) or as floats; the pool size is
    # used as a range() bound by the scheduler, so it must be a whole number.
    staff_count = 0 if staff_count is None else int(staff_count)
    beds_per_staff = 3 if beds_per_staff is None else beds_per_staff
    if beds_per_staff <= 0:
        raise ValueError("beds_per_staff must be greater than zero")
    staff_ratio = 1.0 / float(beds_per_staff)

    if not demand_csv or not demand_csv.strip():
        raise ValueError("Demand CSV data is empty")

    try:
        demand_data = pd.read_csv(StringIO(demand_csv), header=0)
    except pd.errors.ParserError:
        # Retry treating the first line as data and synthesise column names.
        demand_data = pd.read_csv(StringIO(demand_csv), header=None)
        demand_data.columns = ["day"] + [
            f"cycle{i + 1}" for i in range(len(demand_data.columns) - 1)
        ]

    # Blank tokens (e.g. a trailing comma) are ignored.
    cycle_hours = [int(token) for token in cycle_lengths.split(",") if token.strip()]

    # Cycles run back to back starting at the clinic start time.
    cycle_times = []
    current_time = datetime.strptime(clinic_start_time, "%H:%M")
    for length in cycle_hours:
        end_time = current_time + timedelta(hours=length)
        cycle_times.append((current_time, end_time))
        current_time = end_time

    total_days = len(demand_data)

    # Staff required per (day, cycle): demand scaled by the staffing ratio,
    # rounded up. Missing values count as zero demand.
    min_staff_per_cycle = {}
    for cycle in range(len(cycle_hours)):
        column_name = f"cycle{cycle + 1}"
        if column_name in demand_data.columns:
            column = demand_data[column_name]
        elif cycle + 1 < demand_data.shape[1]:
            # Unnamed/other headers: the cycle columns follow the day column.
            column = demand_data.iloc[:, cycle + 1]
        else:
            raise ValueError(f"Demand data has no column for cycle {cycle + 1}")
        for day, demand in enumerate(column):
            if pd.isna(demand):
                demand = 0
            min_staff_per_cycle[(day, cycle)] = math.ceil(demand * staff_ratio)

    if staff_count <= 0:
        # Auto-size: at least the busiest day's total requirement.
        max_demand_day = max(
            (
                sum(min_staff_per_cycle.get((day, cycle), 0)
                    for cycle in range(len(cycle_hours)))
                for day in range(total_days)
            ),
            default=0,
        )
        # NOTE(review): this term divides a count of required shifts by
        # shift_max * (total_days / 30) * max_hours, so it is almost always
        # smaller than the daily peak above. Kept as-is (only guarded against
        # a zero divisor) until the intended capacity estimate is confirmed.
        hours_term = shift_max * total_days / 30 * max_hours
        if hours_term > 0:
            hours_based = math.ceil(sum(min_staff_per_cycle.values()) / hours_term)
        else:
            hours_based = 0
        initial_staff = max(max_demand_day, hours_based)
    else:
        initial_staff = staff_count

    best_schedule = None
    best_staff_count = float('inf')

    # Try several seeds and keep the schedule that uses the fewest people.
    for attempt in range(5):
        schedule = create_schedule(
            total_days,
            cycle_times,
            min_staff_per_cycle,
            initial_staff,
            max_hours,
            rest_days,
            overlap / 60.0,
            shift_min,
            shift_max,
            attempt
        )

        staff_used = len({
            staff
            for day_schedule in schedule.values()
            for staff, _, _ in day_schedule
        })

        if staff_used < best_staff_count:
            best_schedule = schedule
            best_staff_count = staff_used

    # A fixed head count larger than what the search used: rebuild with the
    # fixed pool and report the fixed count.
    if staff_count > 0 and staff_count > best_staff_count:
        best_schedule = create_schedule(
            total_days,
            cycle_times,
            min_staff_per_cycle,
            staff_count,
            max_hours,
            rest_days,
            overlap / 60.0,
            shift_min,
            shift_max,
            fixed_staff=staff_count
        )
        best_staff_count = staff_count

    csv_output = generate_csv_output(best_schedule, total_days, cycle_times)

    # Write through the open handle: re-opening a NamedTemporaryFile by name
    # while it is still open is not possible on Windows. newline='' lets
    # pandas control line endings.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix='.csv', mode='w', newline='', encoding='utf-8'
    ) as temp_file:
        csv_output.to_csv(temp_file, index=False)
        temp_file_path = temp_file.name

    fig = create_gantt_chart(best_schedule, total_days, cycle_times, best_staff_count)

    return temp_file_path, fig
| |
|
def _must_rest(days_worked, day, rest_days):
    """Return True when a staff member may not be booked on ``day``.

    ``days_worked`` is the ascending list of days the member has already
    been booked on.
    """
    if rest_days <= 0:
        return False
    # With rest days enabled a member is booked at most once per day.
    if days_worked and days_worked[-1] == day:
        return True
    # Rolling week: the six previous days plus today may contain at most
    # 7 - rest_days working days.
    recent = sum(1 for worked in days_worked if worked >= day - 6)
    return recent + 1 > 7 - rest_days


def _plan_shift(cycle_start, cycle_end, prev_start, shift_min, shift_max):
    """Return (start, end) for a shift covering one cycle.

    ``prev_start`` is the member's shift start on the previous day, or None.
    """
    shift_start = cycle_start
    if prev_start is not None:
        # Keep the start within one hour of yesterday's start time.
        drift = (shift_start.hour - prev_start.hour) \
            + (shift_start.minute - prev_start.minute) / 60
        if abs(drift) > 1:
            step = 1 if drift > 1 else -1
            shift_start = datetime(shift_start.year, shift_start.month, shift_start.day,
                                   prev_start.hour + step, prev_start.minute)

    shift_end = cycle_end
    duration = (shift_end - shift_start).total_seconds() / 3600
    if duration < shift_min:
        shift_end = shift_start + timedelta(hours=shift_min)
    elif duration > shift_max:
        shift_end = shift_start + timedelta(hours=shift_max)
    return shift_start, shift_end


def create_schedule(total_days, cycle_times, min_staff_per_cycle, initial_staff,
                    max_hours, rest_days, overlap, shift_min, shift_max, seed=0, fixed_staff=None):
    """Greedily assign staff to every (day, cycle) that needs cover.

    Args:
        total_days: Number of days to schedule (days are 0-based).
        cycle_times: List of ``(start, end)`` datetimes, one per cycle.
        min_staff_per_cycle: Mapping ``(day, cycle_index) -> staff required``;
            missing keys mean no staff is required.
        initial_staff: Size of the staff pool; truncated to a whole number.
        max_hours: Cap on the total hours booked per staff member.
        rest_days: Days off required in any rolling 7-day window. When it is
            greater than zero a member is also booked at most once per day.
        overlap: Hours by which every shift after the first one booked for a
            cycle starts early.
        shift_min: Minimum shift length in hours.
        shift_max: Maximum shift length in hours.
        seed: Seeds the ``random`` module; no random draws are made here.
        fixed_staff: Accepted for interface compatibility; not used.

    Returns:
        ``{day: [(staff_index, shift_start, shift_end), ...]}`` with an entry
        for every day. Demand that the pool cannot cover under the
        constraints is left unfilled; the pool is never grown here.
    """
    random.seed(seed)

    # The pool size may arrive as a float (e.g. from a numeric UI field).
    pool_size = max(int(initial_staff), 0)
    overlap_delta = timedelta(hours=overlap)

    schedule = {day: [] for day in range(total_days)}
    staff_hours = [0.0] * pool_size
    staff_last_day = [None] * pool_size
    staff_last_shift = [None] * pool_size
    days_worked = [[] for _ in range(pool_size)]

    for day in range(total_days):
        for cycle, (cycle_start, cycle_end) in enumerate(cycle_times):
            staff_needed = min_staff_per_cycle.get((day, cycle), 0)
            if staff_needed <= 0:
                continue

            candidates = [
                staff for staff in range(pool_size)
                if not _must_rest(days_worked[staff], day, rest_days)
            ]
            # Prefer people who worked yesterday, then those with the fewest
            # hours so far; ties keep ascending staff order (stable sort).
            candidates.sort(
                key=lambda staff: (staff_last_day[staff] == day - 1, -staff_hours[staff]),
                reverse=True,
            )

            booked = 0
            for staff in candidates:
                if booked >= staff_needed:
                    break

                worked_yesterday = staff_last_day[staff] == day - 1
                prev_start = staff_last_shift[staff] if worked_yesterday else None
                shift_start, shift_end = _plan_shift(
                    cycle_start, cycle_end, prev_start, shift_min, shift_max
                )

                # Everyone after the first person on this cycle starts early
                # so that hand-overs overlap.
                if booked:
                    shift_start = shift_start - overlap_delta

                # Check the cap against the hours actually booked: padding to
                # shift_min and the overlap can make the shift longer than
                # the cycle itself.
                shift_hours = (shift_end - shift_start).total_seconds() / 3600
                if staff_hours[staff] + shift_hours > max_hours:
                    continue

                staff_hours[staff] += shift_hours
                staff_last_day[staff] = day
                staff_last_shift[staff] = shift_start
                if not days_worked[staff] or days_worked[staff][-1] != day:
                    days_worked[staff].append(day)

                schedule[day].append((staff, shift_start, shift_end))
                booked += 1

    return schedule
| |
|
def generate_csv_output(schedule, total_days, cycle_times):
    """Flatten a schedule into a table with one row per booked shift.

    Args:
        schedule: Mapping ``day -> [(staff_index, start, end), ...]`` with
            0-based days and staff indices and datetime start/end values.
        total_days: Number of days to export (days ``0 .. total_days - 1``).
        cycle_times: Unused; kept for interface compatibility.

    Returns:
        A DataFrame with the columns ``Day``, ``Staff``, ``Start Time``,
        ``End Time`` and ``Duration (hours)``. Days and staff are reported
        1-based. The columns are present even when there are no shifts, so
        an exported CSV always has a header row.
    """
    columns = ['Day', 'Staff', 'Start Time', 'End Time', 'Duration (hours)']

    rows = [
        {
            'Day': day + 1,
            'Staff': f'Staff {staff + 1}',
            'Start Time': start.strftime('%H:%M'),
            'End Time': end.strftime('%H:%M'),
            'Duration (hours)': round((end - start).total_seconds() / 3600, 2),
        }
        for day in range(total_days)
        for staff, start, end in schedule.get(day, [])
    ]

    return pd.DataFrame(rows, columns=columns)
| |
|
def create_gantt_chart(schedule, total_days, cycle_times, staff_count):
    """Draw the schedule as a Gantt chart with one row per staff member.

    Args:
        schedule: Mapping ``day -> [(staff_index, start, end), ...]`` with
            0-based days and datetime start/end values.
        total_days: Number of days to plot (days ``0 .. total_days - 1``).
        cycle_times: List of ``(start, end)`` datetimes per cycle. The date
            of the first cycle's start is taken as "day 0" when placing
            shifts on the time axis.
        staff_count: Number of staff to provide colours for; truncated to a
            whole number.

    Returns:
        A plotly Gantt figure, or a matplotlib figure carrying the text
        "No valid schedule found" when the schedule contains no shifts.
    """
    plot_origin = datetime(2023, 1, 1)  # arbitrary calendar anchor for the axis

    # Midnight of the date on which the first cycle starts. Shift times are
    # measured from here, so a cycle that starts on the following calendar
    # date (after midnight) is drawn on the following day.
    if cycle_times:
        first_start = cycle_times[0][0]
        reference = datetime(first_start.year, first_start.month, first_start.day)
    else:
        reference = None

    bars = []
    for day in range(total_days):
        for staff, start, end in schedule.get(day, []):
            midnight = reference
            if midnight is None:
                midnight = datetime(start.year, start.month, start.day)

            plot_start = plot_origin + timedelta(days=day) + (start - midnight)
            # Keep the true duration: rebuilding the end from its hour and
            # minute alone would put shifts that cross midnight before
            # their own start.
            plot_end = plot_start + (end - start)

            bars.append({
                'Task': f'Staff {staff + 1}',
                'Start': plot_start,
                'Finish': plot_end,
                'Day': day + 1
            })

    if not bars:
        fig = plt.figure(figsize=(12, 8))
        plt.text(0.5, 0.5, "No valid schedule found", ha='center', va='center')
        return fig

    df = pd.DataFrame(bars)

    # One colour per staff member, cycling through the palette when there
    # are more staff than colours. Never fewer colours than distinct rows.
    palette = px.colors.qualitative.Plotly
    distinct_tasks = len({bar['Task'] for bar in bars})
    colours_needed = max(int(staff_count), distinct_tasks)
    repeats = -(-colours_needed // len(palette))  # ceiling division
    colors = (palette * repeats)[:colours_needed]

    fig = ff.create_gantt(df, colors=colors, index_col='Task',
                          show_colorbar=True, group_tasks=True)

    fig.update_layout(
        title='Staff Schedule Gantt Chart',
        xaxis_title='Day and Time',
        yaxis_title='Staff',
        height=600,
        margin=dict(l=50, r=50, t=100, b=100)
    )

    return fig
| |
|
| | |
# Gradio front end: one input per schedule_staff parameter, in the same
# order as the function signature; outputs are the CSV file and the chart.
demo = gr.Interface(
    fn=schedule_staff,
    inputs=[
        gr.Textbox(label="Demand CSV Data", lines=10),
        gr.Textbox(label="Clinic Start Time (HH:MM)", value="07:00"),
        gr.Textbox(label="Cycle Lengths (comma-separated hours)", value="4,7,6,7"),
        gr.Number(label="Beds per Staff", value=3),
        gr.Number(label="Max Monthly Hours", value=240),
        gr.Number(label="Rest Days per Week", value=1),
        gr.Number(label="Shift Overlap (minutes)", value=30),
        gr.Number(label="Min Shift Duration (hours)", value=6),
        gr.Number(label="Max Shift Duration (hours)", value=12),
        # precision=0 rounds to a whole number and delivers an int: the
        # staff count is used as a pool size, so fractions make no sense.
        gr.Number(label="Fixed Staff Count (0 for auto-optimize)", value=0, precision=0)
    ],
    outputs=[
        gr.File(label="Optimized Schedule CSV"),
        gr.Plot(label="Schedule Gantt Chart")
    ],
    title="Advanced Staff Scheduling App",
    description="Upload demand data and set constraints to generate an optimized staff schedule."
)

# Start the web server as soon as the script runs.
demo.launch()
| |
|