| | import pandas as pd |
| | import numpy as np |
| | import pulp as pl |
| | import matplotlib.pyplot as plt |
| | import gradio as gr |
| | from itertools import product |
| | import io |
| | import base64 |
| | import tempfile |
| | import os |
| | from datetime import datetime |
| | import plotly.express as px |
| | from plotly.subplots import make_subplots |
| | import plotly.graph_objects as go |
| | import seaborn as sns |
| | from ortools.sat.python import cp_model |
| | import random |
| | from deap import base, creator, tools, algorithms |
| | import time |
| |
|
| | def am_pm(hour): |
| | """Converts 24-hour time to AM/PM format.""" |
| | period = "AM" |
| | if hour >= 12: |
| | period = "PM" |
| | if hour > 12: |
| | hour -= 12 |
| | elif hour == 0: |
| | hour = 12 |
| | return f"{int(hour):02d}:00 {period}" |
| |
|
| | def show_dataframe(csv_path): |
| | """Reads a CSV file and returns a Pandas DataFrame.""" |
| | try: |
| | df = pd.read_csv(csv_path) |
| | return df |
| | except Exception as e: |
| | return f"Error loading CSV: {e}" |
| |
|
| | def optimize_staffing(csv_file, beds_per_staff, max_hours_per_staff, overlap_time): |
| | try: |
| | |
| | data = pd.read_csv(csv_file) |
| | num_days = len(data) |
| | |
| | |
| | cycles = { |
| | 'cycle_1': range(7, 12), |
| | 'cycle_2': range(12, 17), |
| | 'cycle_3': range(17, 22), |
| | 'cycle_4': range(22, 24) + range(0, 3), |
| | } |
| | |
| | |
| | hourly_demand = {} |
| | for d in range(num_days): |
| | cycle_demands = {} |
| | for col in data.columns: |
| | if col.startswith('cycle'): |
| | |
| | beds = float(data.iloc[d][col]) if not pd.isna(data.iloc[d][col]) else 0 |
| | cycle_demands[col] = int(np.ceil(beds / beds_per_staff)) |
| | |
| | for h in range(24): |
| | for cycle, hours in cycles.items(): |
| | if h in hours: |
| | hourly_demand[(d,h)] = cycle_demands.get(cycle, 0) |
| | break |
| | else: |
| | hourly_demand[(d,h)] = 0 |
| |
|
| | |
| | if any(pd.isna(value) for value in hourly_demand.values()): |
| | raise ValueError("Hourly demand calculation resulted in NaN values.") |
| |
|
| | min_staff = max(hourly_demand.values()) + 1 |
| | |
| | |
| | model = cp_model.CpModel() |
| | |
| | |
| | x = {} |
| | working = {} |
| | start = {} |
| | |
| | for s in range(min_staff): |
| | for d in range(num_days): |
| | working[s,d] = model.NewBoolVar(f'working_{s}_{d}') |
| | start[s,d] = model.NewIntVar(0, 23, f'start_{s}_{d}') |
| | for h in range(24): |
| | x[s,d,h] = model.NewBoolVar(f'x_{s}_{d}_{h}') |
| |
|
| | |
| | for (d,h), demand in hourly_demand.items(): |
| | if demand > 0: |
| | model.Add(sum(x[s,d,h] for s in range(min_staff)) >= demand) |
| |
|
| | |
| | for s in range(min_staff): |
| | for d in range(num_days): |
| | |
| | model.Add(sum(x[s,d,h] for h in range(24)) >= 6).OnlyEnforceIf(working[s,d]) |
| | model.Add(sum(x[s,d,h] for h in range(24)) <= 12).OnlyEnforceIf(working[s,d]) |
| | model.Add(sum(x[s,d,h] for h in range(24)) == 0).OnlyEnforceIf(working[s,d].Not()) |
| |
|
| | |
| | for h in range(24): |
| | model.Add(start[s,d] == h).OnlyEnforceIf([x[s,d,h], working[s,d]]) |
| | if h > 0: |
| | model.Add(x[s,d,h-1] == 0).OnlyEnforceIf([x[s,d,h], working[s,d]]) |
| |
|
| | |
| | for s in range(min_staff): |
| | for d in range(num_days-1): |
| | model.Add(start[s,d+1] - start[s,d] >= -1).OnlyEnforceIf([working[s,d], working[s,d+1]]) |
| | model.Add(start[s,d+1] - start[s,d] <= 1).OnlyEnforceIf([working[s,d], working[s,d+1]]) |
| |
|
| | |
| | for s in range(min_staff): |
| | for w in range((num_days + 6) // 7): |
| | week_start = w * 7 |
| | week_end = min((w + 1) * 7, num_days) |
| | model.Add(sum(working[s,d] for d in range(week_start, week_end)) <= 6) |
| |
|
| | |
| | for s in range(min_staff): |
| | model.Add(sum(x[s,d,h] for d in range(num_days) for h in range(24)) <= max_hours_per_staff) |
| |
|
| | |
| | solver = cp_model.CpSolver() |
| | solver.parameters.max_time_in_seconds = 300 |
| | solver.parameters.num_search_workers = 8 |
| | |
| | status = solver.Solve(model) |
| | |
| | if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE: |
| | solution = np.zeros((min_staff, num_days, 24)) |
| | for s in range(min_staff): |
| | for d in range(num_days): |
| | for h in range(24): |
| | if solver.Value(x[s,d,h]) == 1: |
| | solution[s,d,h] = 1 |
| | return process_solution(solution, min_staff, num_days) |
| | |
| | return None |
| |
|
| | except Exception as e: |
| | print(f"Optimization error: {str(e)}") |
| | return None |
| |
|
| | def convert_to_24h(time_str): |
| | """Converts AM/PM time string to 24-hour format.""" |
| | try: |
| | time_obj = datetime.strptime(time_str, "%I:00 %p") |
| | return time_obj.hour |
| | except ValueError: |
| | return None |
| |
|
| | def gradio_wrapper( |
| | csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, |
| | rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change, |
| | exact_staff_count=None, overtime_percent=100 |
| | ): |
| | try: |
| | |
| | clinic_start = convert_to_24h(clinic_start_ampm) |
| | clinic_end = convert_to_24h(clinic_end_ampm) |
| | |
| | |
| | results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path = optimize_staffing( |
| | csv_file, beds_per_staff, max_hours_per_staff, overlap_time |
| | ) |
| | |
| | |
| | return staff_assignment_df, gantt_path, schedule_df, plot_path, staff_assignment_csv_path, schedule_csv_path |
| | except Exception as e: |
| | |
| | empty_staff_df = pd.DataFrame(columns=["Day"]) |
| | error_message = f"Error during optimization: {str(e)}\n\nPlease try with different parameters or a simpler dataset." |
| | |
| | return empty_staff_df, None, None, None, None, None |
| |
|
| | |
| | def create_gantt_chart(schedule_df, num_days, staff_count): |
| | |
| | active_staff_ids = sorted(schedule_df['staff_id'].unique()) |
| | active_staff_count = len(active_staff_ids) |
| | |
| | |
| | staff_position = {staff_id: i+1 for i, staff_id in enumerate(active_staff_ids)} |
| | |
| | |
| | plt.figure(figsize=(max(30, num_days * 1.5), max(12, active_staff_count * 0.8)), dpi=200) |
| | |
| | |
| | colors = plt.cm.viridis(np.linspace(0.1, 0.9, active_staff_count)) |
| | |
| | |
| | plt.style.use('seaborn-v0_8-whitegrid') |
| | |
| | |
| | |
| | ax = plt.gca() |
| | ax.set_facecolor('#f8f9fa') |
| | |
| | |
| | schedule_df = schedule_df.sort_values(['staff_id', 'day']) |
| | |
| | |
| | for i, staff_id in enumerate(active_staff_ids): |
| | staff_shifts = schedule_df[schedule_df['staff_id'] == staff_id] |
| | |
| | y_pos = active_staff_count - i |
| | |
| | |
| | ax.text(-0.7, y_pos, f"Staff {staff_id}", fontsize=12, fontweight='bold', |
| | ha='right', va='center', bbox=dict(facecolor='white', edgecolor='gray', |
| | boxstyle='round,pad=0.5', alpha=0.9)) |
| | |
| | |
| | ax.axhspan(y_pos-0.4, y_pos+0.4, color='white', alpha=0.4, zorder=-5) |
| | |
| | |
| | shift_positions = [] |
| | |
| | for idx, shift in enumerate(staff_shifts.iterrows()): |
| | _, shift = shift |
| | day = shift['day'] |
| | start_hour = shift['start'] |
| | end_hour = shift['end'] |
| | duration = shift['duration'] |
| | |
| | |
| | start_ampm = am_pm(start_hour) |
| | end_ampm = am_pm(end_hour) |
| | |
| | |
| | shift_start_pos = day-1+start_hour/24 |
| | |
| | |
| | if end_hour < start_hour: |
| | |
| | rect1 = ax.barh(y_pos, (24-start_hour)/24, left=shift_start_pos, |
| | height=0.6, color=colors[i], alpha=0.9, |
| | edgecolor='black', linewidth=1, zorder=10) |
| | |
| | |
| | for r in rect1: |
| | r.set_edgecolor('black') |
| | r.set_linewidth(1) |
| | |
| | |
| | rect2 = ax.barh(y_pos, end_hour/24, left=day, |
| | height=0.6, color=colors[i], alpha=0.9, |
| | edgecolor='black', linewidth=1, zorder=10) |
| | |
| | |
| | for r in rect2: |
| | r.set_edgecolor('black') |
| | r.set_linewidth(1) |
| | |
| | |
| | shift_width = (24-start_hour)/24 |
| | if shift_width >= 0.1: |
| | label_pos = shift_start_pos + shift_width/2 |
| | |
| | |
| | y_offset = 0.35 if idx % 2 == 0 else -0.35 |
| | |
| | |
| | label = f"{start_ampm}-{end_ampm}" |
| | text = ax.text(label_pos, y_pos + y_offset, label, |
| | ha='center', va='center', fontsize=9, fontweight='bold', |
| | color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3, |
| | boxstyle='round,pad=0.3', edgecolor='gray'), |
| | zorder=20) |
| | |
| | shift_positions.append(label_pos) |
| | else: |
| | |
| | shift_width = duration/24 |
| | rect = ax.barh(y_pos, shift_width, left=shift_start_pos, |
| | height=0.6, color=colors[i], alpha=0.9, |
| | edgecolor='black', linewidth=1, zorder=10) |
| | |
| | |
| | for r in rect: |
| | r.set_edgecolor('black') |
| | r.set_linewidth(1) |
| | |
| | |
| | if shift_width >= 0.1: |
| | label_pos = shift_start_pos + shift_width/2 |
| | |
| | |
| | y_offset = 0.35 if idx % 2 == 0 else -0.35 |
| | |
| | |
| | label = f"{start_ampm}-{end_ampm}" |
| | text = ax.text(label_pos, y_pos + y_offset, label, |
| | ha='center', va='center', fontsize=9, fontweight='bold', |
| | color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3, |
| | boxstyle='round,pad=0.3', edgecolor='gray'), |
| | zorder=20) |
| | |
| | shift_positions.append(label_pos) |
| | |
| | |
| | for day in range(1, num_days + 1): |
| | |
| | is_weekend = (day % 7 == 0) or (day % 7 == 6) |
| | |
| | if is_weekend: |
| | ax.axvspan(day-1, day, alpha=0.15, color='#ff9999', zorder=-10) |
| | day_label = "Saturday" if day % 7 == 6 else "Sunday" |
| | ax.text(day-0.5, 0.2, day_label, ha='center', fontsize=10, color='#cc0000', |
| | fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, pad=2, boxstyle='round')) |
| | |
| | |
| | ax.set_xticks(np.arange(0.5, num_days, 1)) |
| | day_labels = [f"Day {d}" for d in range(1, num_days+1)] |
| | ax.set_xticklabels(day_labels, rotation=0, ha='center', fontsize=10) |
| | |
| | |
| | for day in range(1, num_days): |
| | ax.axvline(x=day, color='#aaaaaa', linestyle='-', alpha=0.5, zorder=-5) |
| | |
| | |
| | ax.set_yticks(np.arange(1, active_staff_count+1)) |
| | ax.set_yticklabels([]) |
| | |
| | |
| | ax.set_xlim(-0.8, num_days) |
| | ax.set_ylim(0.5, active_staff_count + 0.5) |
| | |
| | |
| | for day in range(num_days): |
| | for hour in [6, 12, 18]: |
| | ax.axvline(x=day + hour/24, color='#cccccc', linestyle=':', alpha=0.5, zorder=-5) |
| | |
| | hour_label = "6AM" if hour == 6 else "Noon" if hour == 12 else "6PM" |
| | ax.text(day + hour/24, 0, hour_label, ha='center', va='bottom', fontsize=7, |
| | color='#666666', rotation=90, alpha=0.7) |
| | |
| | |
| | plt.title(f'Staff Schedule ({active_staff_count} Active Staff)', fontsize=24, fontweight='bold', pad=20, color='#333333') |
| | plt.xlabel('Day', fontsize=16, labelpad=10, color='#333333') |
| | |
| | |
| | time_box = plt.figtext(0.01, 0.01, "Time Reference:", ha='left', fontsize=10, |
| | fontweight='bold', color='#333333') |
| | time_markers = ['6 AM', 'Noon', '6 PM', 'Midnight'] |
| | for i, time in enumerate(time_markers): |
| | plt.figtext(0.08 + i*0.06, 0.01, time, ha='left', fontsize=9, color='#555555') |
| | |
| | |
| | for spine in ['top', 'right', 'left']: |
| | ax.spines[spine].set_visible(False) |
| | |
| | |
| | weekend_note = plt.figtext(0.01, 0.97, "Red areas = Weekends", fontsize=12, |
| | color='#cc0000', fontweight='bold', |
| | bbox=dict(facecolor='white', alpha=0.7, pad=5, boxstyle='round')) |
| | |
| | |
| | plt.box(False) |
| | |
| | |
| | with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f: |
| | plt.tight_layout() |
| | plt.savefig(f.name, dpi=200, bbox_inches='tight', facecolor='white') |
| | plt.close() |
| | return f.name |
| |
|
| | |
| | am_pm_times = [f"{i:02d}:00 AM" for i in range(1, 13)] + [f"{i:02d}:00 PM" for i in range(1, 13)] |
| |
|
| | |
| | css = """ |
| | .chart-container { |
| | height: 800px !important; |
| | width: 100% !important; |
| | margin: 20px 0; |
| | padding: 20px; |
| | border: 1px solid #ddd; |
| | border-radius: 8px; |
| | background: white; |
| | box-shadow: 0 2px 4px rgba(0,0,0,0.1); |
| | } |
| | |
| | .weekly-chart-container { |
| | height: 1000px !important; |
| | width: 100% !important; |
| | margin: 20px 0; |
| | padding: 20px; |
| | border: 1px solid #ddd; |
| | border-radius: 8px; |
| | background: white; |
| | box-shadow: 0 2px 4px rgba(0,0,0,0.1); |
| | } |
| | |
| | /* Ensure plotly charts are visible */ |
| | .js-plotly-plot { |
| | width: 100% !important; |
| | height: 100% !important; |
| | } |
| | |
| | /* Improve visibility of chart titles */ |
| | .gtitle { |
| | font-weight: bold !important; |
| | font-size: 20px !important; |
| | } |
| | """ |
| |
|
| | with gr.Blocks(title="Staff Scheduling Optimizer", css=css) as iface: |
| | |
| | gr.Markdown("# Staff Scheduling Optimizer") |
| | gr.Markdown("Upload a CSV file with cycle data and configure parameters to generate an optimal staff schedule.") |
| | |
| | with gr.Row(): |
| | |
| | with gr.Column(scale=1): |
| | gr.Markdown("### Input Parameters") |
| | |
| | |
| | csv_input = gr.File(label="Upload CSV") |
| | beds_per_staff = gr.Number(label="Beds per Staff", value=3) |
| | max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160) |
| | hours_per_cycle = gr.Number(label="Hours per Cycle", value=4) |
| | rest_days_per_week = gr.Number(label="Rest Days per Week", value=2) |
| | clinic_start_ampm = gr.Dropdown(label="Clinic Start Hour (AM/PM)", choices=am_pm_times, value="08:00 AM") |
| | clinic_end_ampm = gr.Dropdown(label="Clinic End Hour (AM/PM)", choices=am_pm_times, value="08:00 PM") |
| | overlap_time = gr.Number(label="Overlap Time", value=0) |
| | max_start_time_change = gr.Number(label="Max Start Time Change", value=2) |
| | exact_staff_count = gr.Number(label="Exact Staff Count (optional)", value=None) |
| | overtime_percent = gr.Slider(label="Overtime Allowed (%)", minimum=0, maximum=100, value=100, step=10) |
| | |
| | optimize_btn = gr.Button("Optimize Schedule", variant="primary", size="lg") |
| | |
| | |
| | with gr.Column(scale=2): |
| | gr.Markdown("### Results") |
| | |
| | |
| | with gr.Tabs(): |
| | with gr.TabItem("Detailed Schedule"): |
| | with gr.Row(): |
| | csv_schedule = gr.Dataframe(label="Detailed Schedule", elem_id="csv_schedule") |
| | |
| | with gr.Row(): |
| | schedule_download_file = gr.File(label="Download Detailed Schedule", visible=True) |
| | |
| | with gr.TabItem("Gantt Chart"): |
| | gantt_chart = gr.Image(label="Staff Schedule Visualization", elem_id="gantt_chart") |
| | |
| | with gr.TabItem("Staff Coverage by Cycle"): |
| | with gr.Row(): |
| | staff_assignment_table = gr.Dataframe(label="Staff Count in Each Cycle (Staff May Overlap)", elem_id="staff_assignment_table") |
| | |
| | with gr.Row(): |
| | staff_download_file = gr.File(label="Download Coverage Table", visible=True) |
| | |
| | with gr.TabItem("Constraints and Analytics"): |
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | gr.Markdown("### Applied Constraints") |
| | constraints_text = gr.TextArea( |
| | label="", |
| | interactive=False, |
| | show_label=False |
| | ) |
| | |
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | gr.Markdown("### Monthly Distribution") |
| | monthly_chart = gr.HTML( |
| | label="Monthly Hours Distribution", |
| | show_label=False, |
| | elem_classes="chart-container" |
| | ) |
| | |
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | gr.Markdown("### Weekly Distribution") |
| | weekly_charts = gr.HTML( |
| | label="Weekly Hours Distribution", |
| | show_label=False, |
| | elem_classes="weekly-chart-container" |
| | ) |
| | |
| | with gr.TabItem("Staff Overlap"): |
| | with gr.Row(): |
| | overlap_chart = gr.HTML( |
| | label="Staff Overlap Visualization", |
| | show_label=False |
| | ) |
| | with gr.Row(): |
| | gr.Markdown(""" |
| | This heatmap shows the number of staff members working simultaneously throughout each day. |
| | - Darker colors indicate more staff overlap |
| | - The x-axis shows time of day in 30-minute intervals |
| | - The y-axis shows each day of the schedule |
| | """) |
| | |
| | with gr.TabItem("Staff Absence Handler"): |
| | with gr.Row(): |
| | with gr.Column(): |
| | gr.Markdown("### Handle Staff Absence") |
| | absent_staff = gr.Number(label="Staff ID to be absent", precision=0) |
| | absence_start = gr.Number(label="Start Day", precision=0) |
| | absence_end = gr.Number(label="End Day", precision=0) |
| | handle_absence_btn = gr.Button("Redistribute Shifts", variant="primary") |
| | |
| | with gr.Column(): |
| | absence_result = gr.TextArea(label="Redistribution Results", interactive=False) |
| | updated_schedule = gr.DataFrame(label="Updated Schedule") |
| | absence_gantt_chart = gr.Image(label="Absence Schedule Visualization", elem_id="absence_gantt_chart") |
| | |
| | |
| | def create_download_link(df, filename="data.csv"): |
| | """Create a CSV download link for a dataframe""" |
| | if df is None or df.empty: |
| | return None |
| | |
| | csv_data = df.to_csv(index=False) |
| | with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as f: |
| | f.write(csv_data) |
| | return f.name |
| |
|
| | |
| | def optimize_and_display(csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, |
| | rest_days_per_week, clinic_start_ampm, clinic_end_ampm, |
| | overlap_time, max_start_time_change, exact_staff_count, overtime_percent): |
| | try: |
| | print(f"Starting optimization with parameters: beds={beds_per_staff}, hours={max_hours_per_staff}") |
| | |
| | |
| | clinic_start = convert_to_24h(clinic_start_ampm) |
| | clinic_end = convert_to_24h(clinic_end_ampm) |
| | |
| | |
| | result = optimize_staffing(csv_file, beds_per_staff, max_hours_per_staff, overlap_time) |
| | |
| | if result is None: |
| | return ( |
| | pd.DataFrame({"Error": ["No feasible solution found"]}), |
| | None, |
| | None, |
| | None, |
| | "Optimization failed to find a valid schedule", |
| | "<div>No solution found</div>", |
| | "<div>No solution found</div>", |
| | "<div>No solution found</div>" |
| | ) |
| | |
| | staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv, staff_csv = result |
| | |
| | |
| | constraints_info = get_constraints_summary(max_hours_per_staff, rest_days_per_week, overtime_percent) |
| | monthly_html = create_monthly_distribution_chart(schedule_df) |
| | weekly_html = create_weekly_distribution_charts(schedule_df) |
| | overlap_html = create_overlap_visualization(schedule_df) |
| | |
| | return ( |
| | staff_assignment_df, |
| | gantt_path, |
| | schedule_df, |
| | schedule_csv, |
| | constraints_info, |
| | monthly_html, |
| | weekly_html, |
| | overlap_html |
| | ) |
| | |
| | except Exception as e: |
| | print(f"Error: {str(e)}") |
| | return ( |
| | pd.DataFrame({"Error": [str(e)]}), |
| | None, |
| | None, |
| | None, |
| | "Error occurred during optimization", |
| | "<div>Error</div>", |
| | "<div>Error</div>", |
| | "<div>Error</div>" |
| | ) |
| | |
| | def get_constraints_summary(max_hours, rest_days, overtime_percent): |
| | """Generate a summary of all applied constraints from actual parameters""" |
| | constraints = [ |
| | "Applied Scheduling Constraints:", |
| | "----------------------------", |
| | f"1. Maximum Hours per Month: {max_hours} hours", |
| | f"2. Required Rest Days per Week: {rest_days} days", |
| | f"3. Maximum Weekly Hours: 60 hours per staff member", |
| | "4. Minimum Rest Period: 11 hours between shifts", |
| | "5. Maximum Consecutive Days: 6 working days", |
| | f"6. Overtime Allowance: {overtime_percent}% of standard hours", |
| | "7. Coverage Requirements:", |
| | " - All cycles must be fully staffed", |
| | " - No understaffing allowed", |
| | " - Staff assigned based on required beds/staff ratio", |
| | "8. Shift Constraints:", |
| | " - Available shift durations: 5, 10 hours", |
| | " - Shifts must align with cycle times", |
| | "9. Staff Scheduling Rules:", |
| | " - Equal distribution of workload when possible", |
| | " - Consistent shift patterns preferred", |
| | " - Weekend rotations distributed fairly" |
| | ] |
| | return "\n".join(constraints) |
| | |
| | def create_monthly_distribution_chart(schedule_df): |
| | """Create Seaborn pie chart for monthly hours distribution""" |
| | if schedule_df is None or schedule_df.empty: |
| | return "<div>No data available for visualization</div>" |
| | |
| | try: |
| | |
| | staff_hours = schedule_df.groupby('staff_id')['duration'].sum() |
| | |
| | |
| | fig, ax = plt.subplots(figsize=(8, 8)) |
| | sns.set_palette("pastel") |
| | ax.pie(staff_hours, labels=staff_hours.index, autopct='%1.1f%%', startangle=90) |
| | ax.axis('equal') |
| | plt.title("Monthly Hours Distribution") |
| | |
| | |
| | img = io.BytesIO() |
| | plt.savefig(img, format='png', bbox_inches='tight') |
| | plt.close(fig) |
| | img.seek(0) |
| | |
| | |
| | img_base64 = base64.b64encode(img.read()).decode('utf-8') |
| | img_html = f'<img src="data:image/png;base64,{img_base64}" style="max-width:100%; max-height:600px;">' |
| | |
| | return img_html |
| | except Exception as e: |
| | print(f"Error in monthly chart: {e}") |
| | return f"<div>Error creating monthly chart: {str(e)}</div>" |
| | |
| | def create_weekly_distribution_charts(schedule_df): |
| | """Create Plotly pie charts for weekly hours distribution""" |
| | if schedule_df is None or schedule_df.empty: |
| | return "<div>No data available for visualization</div>" |
| | |
| | try: |
| | |
| | schedule_df['week'] = schedule_df['day'] // 7 |
| | weekly_hours = schedule_df.groupby(['week', 'staff_id'])['duration'].sum().reset_index() |
| | |
| | |
| | weekly_hours['staff_label'] = weekly_hours.apply( |
| | lambda x: f"Staff {x['staff_id']} ({x['duration']:.1f}hrs)", |
| | axis=1 |
| | ) |
| | |
| | |
| | weeks = sorted(weekly_hours['week'].unique()) |
| | |
| | |
| | colors = px.colors.qualitative.Set3 |
| | |
| | |
| | fig = make_subplots( |
| | rows=len(weeks), |
| | cols=1, |
| | subplot_titles=[f'Week {week}' for week in weeks], |
| | specs=[[{'type': 'domain'}] for week in weeks] |
| | ) |
| | |
| | |
| | for i, week in enumerate(weeks, start=1): |
| | week_data = weekly_hours[weekly_hours['week'] == week] |
| | |
| | fig.add_trace( |
| | go.Pie( |
| | values=week_data['duration'], |
| | labels=week_data['staff_label'], |
| | name=f'Week {week}', |
| | showlegend=(i == 1), |
| | marker_colors=colors, |
| | textposition='inside', |
| | textinfo='percent+label', |
| | hovertemplate=( |
| | "Staff: %{label}<br>" |
| | "Hours: %{value:.1f}<br>" |
| | "Percentage: %{percent:.1f}%" |
| | "<extra></extra>" |
| | ) |
| | ), |
| | row=i, |
| | col=1 |
| | ) |
| | |
| | fig.update_layout( |
| | height=300 * len(weeks), |
| | width=800, |
| | title_text="Weekly Hours Distribution", |
| | title_x=0.5, |
| | title_font_size=20, |
| | margin=dict(t=50, l=50, r=50, b=50), |
| | showlegend=True |
| | ) |
| | |
| | return fig.to_html(include_plotlyjs='cdn', full_html=False) |
| | except Exception as e: |
| | print(f"Error in weekly charts: {e}") |
| | return f"<div>Error creating weekly charts: {str(e)}</div>" |
| |
|
| | |
| | def create_overlap_visualization(schedule_df): |
| | """Create Seaborn heatmap for staff overlap""" |
| | if schedule_df is None or schedule_df.empty: |
| | return "<div>No data available for visualization</div>" |
| | |
| | try: |
| | |
| | intervals = 48 |
| | days = sorted(schedule_df['day'].unique()) |
| | |
| | |
| | overlap_data = np.zeros((len(days), intervals)) |
| | |
| | |
| | for day_idx, day in enumerate(days): |
| | day_shifts = schedule_df[schedule_df['day'] == day] |
| | |
| | for i in range(intervals): |
| | time = i * 0.5 |
| | staff_working = 0 |
| | |
| | for _, shift in day_shifts.iterrows(): |
| | start = shift['start'] |
| | end = shift['end'] |
| | |
| | if end < start: |
| | if time >= start or time < end: |
| | staff_working += 1 |
| | else: |
| | if start <= time < end: |
| | staff_working += 1 |
| | |
| | overlap_data[day_idx, i] = staff_working |
| | |
| | |
| | time_labels = [f"{int(i//2):02d}:{int((i%2)*30):02d}" for i in range(intervals)] |
| | |
| | |
| | fig, ax = plt.subplots(figsize=(12, 8)) |
| | sns.heatmap(overlap_data, cmap="viridis", ax=ax, cbar_kws={'label': 'Staff Count'}) |
| | |
| | |
| | ax.set_xticks(np.arange(len(time_labels[::4]))) |
| | ax.set_xticklabels(time_labels[::4], rotation=45, ha="right") |
| | ax.set_yticks(np.arange(len(days))) |
| | ax.set_yticklabels(days) |
| | |
| | |
| | ax.set_title("Staff Overlap Throughout the Day") |
| | |
| | |
| | plt.tight_layout() |
| | |
| | |
| | img = io.BytesIO() |
| | plt.savefig(img, format='png', bbox_inches='tight') |
| | plt.close(fig) |
| | img.seek(0) |
| | |
| | |
| | img_base64 = base64.b64encode(img.read()).decode('utf-8') |
| | img_html = f'<img src="data:image/png;base64,{img_base64}" style="max-width:100%; max-height:800px;">' |
| | |
| | return img_html |
| | except Exception as e: |
| | print(f"Error in overlap visualization: {e}") |
| | return f"<div>Error creating overlap visualization: {str(e)}</div>" |
| |
|
| | |
| | optimize_btn.click( |
| | fn=optimize_and_display, |
| | inputs=[ |
| | csv_input, beds_per_staff, max_hours_per_staff, hours_per_cycle, |
| | rest_days_per_week, clinic_start_ampm, clinic_end_ampm, |
| | overlap_time, max_start_time_change, exact_staff_count, overtime_percent |
| | ], |
| | outputs=[ |
| | staff_assignment_table, |
| | gantt_chart, |
| | csv_schedule, |
| | schedule_download_file, |
| | constraints_text, |
| | monthly_chart, |
| | weekly_charts, |
| | overlap_chart |
| | ] |
| | ) |
| |
|
| | |
| | def handle_absence_click(staff_id, start_day, end_day, current_schedule, max_hours_per_staff, overtime_percent): |
| | if current_schedule is None or current_schedule.empty: |
| | return "No current schedule loaded.", None, None |
| | |
| | absence_dates = list(range(int(start_day), int(end_day) + 1)) |
| | summary, absence_schedule, absence_gantt_path = handle_staff_absence( |
| | current_schedule, |
| | int(staff_id), |
| | absence_dates, |
| | max_hours_per_staff, |
| | overtime_percent |
| | ) |
| | |
| | return summary, absence_schedule, absence_gantt_path |
| |
|
| | |
| | handle_absence_btn.click( |
| | fn=handle_absence_click, |
| | inputs=[ |
| | absent_staff, |
| | absence_start, |
| | absence_end, |
| | csv_schedule, |
| | max_hours_per_staff, |
| | overtime_percent |
| | ], |
| | outputs=[ |
| | absence_result, |
| | updated_schedule, |
| | absence_gantt_chart |
| | ] |
| | ) |
| |
|
| | |
| | iface.launch(share=True) |
| |
|
| | def create_interface(): |
| | with gr.Blocks() as demo: |
| | gr.Markdown("# NEF Scheduling System") |
| | |
| | with gr.Tabs() as tabs: |
| | with gr.Tab("Schedule Input"): |
| | |
| | with gr.Row(): |
| | csv_input = gr.File(label="Upload Schedule Data (CSV)") |
| | schedule_preview = gr.DataFrame(label="Schedule Preview") |
| | |
| | with gr.Tab("Schedule Output"): |
| | |
| | with gr.Row(): |
| | schedule_output = gr.DataFrame(label="Generated Schedule") |
| | download_btn = gr.Button("Download Schedule") |
| | |
| | with gr.Tab("Constraints and Analytics"): |
| | with gr.Row(): |
| | with gr.Column(): |
| | gr.Markdown("### Applied Constraints") |
| | constraints_text = gr.TextArea(label="", interactive=False) |
| | |
| | with gr.Row(): |
| | with gr.Column(): |
| | gr.Markdown("### Monthly Distribution") |
| | monthly_chart = gr.HTML(label="Monthly Hours Distribution") |
| | |
| | with gr.Row(): |
| | with gr.Column(): |
| | gr.Markdown("### Weekly Distribution") |
| | weekly_charts = gr.HTML(label="Weekly Hours Distribution") |
| |
|
| | with gr.TabItem("Staff Absence Handler"): |
| | with gr.Row(): |
| | with gr.Column(): |
| | gr.Markdown("### Handle Staff Absence") |
| | absent_staff = gr.Number(label="Staff ID to be absent", precision=0) |
| | absence_start = gr.Number(label="Start Day", precision=0) |
| | absence_end = gr.Number(label="End Day", precision=0) |
| | handle_absence_btn = gr.Button("Redistribute Shifts", variant="primary") |
| | |
| | with gr.Column(): |
| | absence_result = gr.TextArea(label="Redistribution Results", interactive=False) |
| | updated_schedule = gr.DataFrame(label="Updated Schedule") |
| | absence_gantt_chart = gr.Image(label="Absence Schedule Visualization", elem_id="absence_gantt_chart") |
| |
|
| | return demo |
| |
|
| | def handle_staff_absence(schedule_df, absent_staff_id, absence_dates, max_hours_per_staff, overtime_percent): |
| | """ |
| | Redistribute shifts of absent staff member to others, prioritizing staff with lowest monthly hours |
| | """ |
| | try: |
| | |
| | new_schedule = schedule_df.copy() |
| | |
| | |
| | absent_shifts = new_schedule[ |
| | (new_schedule['staff_id'] == absent_staff_id) & |
| | (new_schedule['day'].isin(absence_dates)) |
| | ] |
| | |
| | if absent_shifts.empty: |
| | return "No shifts found for the specified staff member on given dates.", None, None |
| | |
| | |
| | available_staff = sorted(list(set(new_schedule['staff_id']) - {absent_staff_id})) |
| | |
| | |
| | current_hours = new_schedule.groupby('staff_id')['duration'].sum() |
| | |
| | |
| | staff_hours_sorted = current_hours.reindex(available_staff).sort_values() |
| | available_staff = staff_hours_sorted.index.tolist() |
| | |
| | |
| | max_allowed_hours = max_hours_per_staff * (1 + overtime_percent/100) |
| | available_hours = { |
| | staff_id: max_allowed_hours - current_hours.get(staff_id, 0) |
| | for staff_id in available_staff |
| | } |
| | |
| | results = [] |
| | unassigned_shifts = [] |
| | |
| | |
| | for _, shift in absent_shifts.iterrows(): |
| | |
| | eligible_staff = [] |
| | eligible_staff_hours = {} |
| | |
| | for staff_id in available_staff: |
| | |
| | if available_hours[staff_id] >= shift['duration']: |
| | |
| | staff_shifts_that_day = new_schedule[ |
| | (new_schedule['staff_id'] == staff_id) & |
| | (new_schedule['day'] == shift['day']) |
| | ] |
| | |
| | if staff_shifts_that_day.empty: |
| | |
| | day_before = new_schedule[ |
| | (new_schedule['staff_id'] == staff_id) & |
| | (new_schedule['day'] == shift['day'] - 1) |
| | ] |
| | |
| | day_after = new_schedule[ |
| | (new_schedule['staff_id'] == staff_id) & |
| | (new_schedule['day'] == shift['day'] + 1) |
| | ] |
| | |
| | can_work = True |
| | if not day_before.empty: |
| | end_time_before = day_before.iloc[0]['end'] |
| | if (shift['start'] + 24 - end_time_before) < 11: |
| | can_work = False |
| | |
| | if not day_after.empty and can_work: |
| | start_time_after = day_after.iloc[0]['start'] |
| | if (starttime_after + 24 - shift['end']) < 11: |
| | can_work = False |
| | |
| | if can_work: |
| | eligible_staff.append(staff_id) |
| | eligible_staff_hours[staff_id] = current_hours.get(staff_id, 0) |
| | |
| | if eligible_staff: |
| | |
| | sorted_eligible = sorted(eligible_staff, key=lambda x: eligible_staff_hours[x]) |
| | best_staff = sorted_eligible[0] |
| | |
| | |
| | new_schedule.loc[shift.name, 'staff_id'] = best_staff |
| | |
| | |
| | available_hours[best_staff] -= shift['duration'] |
| | current_hours[best_staff] = current_hours.get(best_staff, 0) + shift['duration'] |
| | |
| | results.append( |
| | f"Shift on Day {shift['day']} ({shift['duration']} hours) " |
| | f"reassigned to Staff {best_staff} (current hours: {current_hours[best_staff]:.1f})" |
| | ) |
| | else: |
| | unassigned_shifts.append( |
| | f"Could not reassign shift on Day {shift['day']} ({shift['duration']} hours)" |
| | ) |
| | |
| | |
| | summary = "\n".join([ |
| | "Shift Redistribution Summary:", |
| | "----------------------------", |
| | f"Staff {absent_staff_id} absent for {len(absence_dates)} days", |
| | f"Successfully reassigned: {len(results)} shifts", |
| | f"Failed to reassign: {len(unassigned_shifts)} shifts", |
| | "\nCurrent Hours Distribution:", |
| | "-------------------------" |
| | ] + [ |
| | f"Staff {s}: {current_hours.get(s, 0):.1f} hours (of max {max_allowed_hours:.1f})" |
| | for s in sorted(available_staff) |
| | ] + [ |
| | "\nReassignment Details:", |
| | *results, |
| | "\nUnassigned Shifts:", |
| | *unassigned_shifts |
| | ]) |
| | |
| | |
| | absence_schedule = new_schedule[new_schedule['day'].isin(absence_dates)].copy() |
| | |
| | |
| | absence_gantt_path = create_gantt_chart(absence_schedule, len(absence_dates), len(set(absence_schedule['staff_id']))) |
| | |
| | if unassigned_shifts: |
| | return summary, None, None |
| | else: |
| | return summary, absence_schedule, absence_gantt_path |
| | |
| | except Exception as e: |
| | return f"Error redistributing shifts: {str(e)}", None, None |
| |
|
| | class FastScheduler: |
| | def __init__(self, num_staff, num_days, possible_shifts, staff_requirements, constraints): |
| | self.num_staff = num_staff |
| | self.num_days = num_days |
| | self.possible_shifts = possible_shifts |
| | self.staff_requirements = staff_requirements |
| | self.constraints = constraints |
| | self.best_schedule = None |
| | self.best_score = float('inf') |
| | |
| | self.shift_lookup = {shift['id']: shift for shift in possible_shifts} |
| | self.cycle_shifts = self._precompute_cycle_shifts() |
| | |
| | self.staff_sequences = {} |
| | self.staff_hours = {} |
| | self.max_monthly_hours = constraints['max_hours_per_staff'] |
| |
|
| | def _precompute_cycle_shifts(self): |
| | """Pre-compute which shifts can cover each cycle""" |
| | cycle_shifts = {} |
| | for cycle in self.staff_requirements[0].keys(): |
| | cycle_shifts[cycle] = [shift for shift in self.possible_shifts if cycle in shift['cycles_covered']] |
| | return cycle_shifts |
| |
|
| | def optimize(self, time_limit=300): |
| | """Main optimization method""" |
| | start_time = time.time() |
| | schedule = [] |
| | |
| | |
| | for day in range(1, self.num_days + 1): |
| | |
| | day_requirements = self.staff_requirements[day-1] |
| | |
| | |
| | for cycle, staff_needed in day_requirements.items(): |
| | staff_assigned = 0 |
| | |
| | |
| | for staff_id in range(1, self.num_staff + 1): |
| | |
| | if staff_assigned >= staff_needed: |
| | break |
| | |
| | |
| | if time.time() - start_time > time_limit: |
| | return None |
| | |
| | |
| | shift = self._find_optimal_shift(staff_id, day, cycle, self.staff_hours) |
| | if shift: |
| | schedule.append(shift) |
| | staff_assigned += 1 |
| | |
| | |
| | score = self._evaluate_schedule(schedule) |
| | if score == float('inf'): |
| | return None |
| | |
| | |
| | final_score = self._evaluate_schedule(schedule) |
| | if final_score == float('inf'): |
| | return None |
| | |
| | return schedule |
| |
|
| | def _find_optimal_shift(self, staff_id, day, cycle, staff_hours): |
| | """Optimized shift finding with early exits and pre-computed lookups""" |
| | |
| | staff_info = self.staff_sequences.get(staff_id) |
| | current_hours = self.staff_hours.get(staff_id, 0) |
| |
|
| | |
| | if current_hours >= self.max_monthly_hours: |
| | return None |
| |
|
| | |
| | valid_shifts = self.cycle_shifts.get(cycle, []) |
| | if not valid_shifts: |
| | return None |
| |
|
| | |
| | if staff_info and staff_info.get('consecutive_days', 0) >= 6 and day - staff_info['last_day'] == 1: |
| | return None |
| |
|
| | |
| | if staff_info and day - staff_info['last_day'] == 1: |
| | required_start = staff_info['last_time'] |
| | valid_shifts = [s for s in valid_shifts if s['start'] == required_start] |
| | if not valid_shifts: |
| | return None |
| |
|
| | |
| | valid_shifts = [s for s in valid_shifts if current_hours + s['duration'] <= self.max_monthly_hours] |
| | if not valid_shifts: |
| | return None |
| |
|
| | |
| | if any(s['staff_id'] == staff_id and s['day'] == day for s in self.best_schedule or []): |
| | return None |
| |
|
| | |
| | shift = valid_shifts[0] |
| | assigned_shift = { |
| | 'staff_id': staff_id, |
| | 'day': day, |
| | 'shift_id': shift['id'], |
| | 'start': shift['start'], |
| | 'end': shift['end'], |
| | 'duration': shift['duration'], |
| | 'cycles_covered': list(shift['cycles_covered']) |
| | } |
| |
|
| | |
| | consecutive_days = 1 if not staff_info else ( |
| | staff_info['consecutive_days'] + 1 if day - staff_info['last_day'] == 1 else 1 |
| | ) |
| | |
| | self.staff_sequences[staff_id] = { |
| | 'last_day': day, |
| | 'last_time': shift['start'], |
| | 'consecutive_days': consecutive_days |
| | } |
| | self.staff_hours[staff_id] = current_hours + shift['duration'] |
| |
|
| | return assigned_shift |
| |
|
| | def _evaluate_schedule(self, schedule): |
| | """Optimized schedule evaluation with early exits""" |
| | if not schedule: |
| | return float('inf') |
| |
|
| | |
| | staff_shifts = {} |
| | for shift in schedule: |
| | staff_id = shift['staff_id'] |
| | if staff_id not in staff_shifts: |
| | staff_shifts[staff_id] = [] |
| | staff_shifts[staff_id].append(shift) |
| | |
| | |
| | if self.staff_hours.get(staff_id, 0) > self.max_monthly_hours: |
| | return float('inf') |
| |
|
| | |
| | for shifts in staff_shifts.values(): |
| | shifts.sort(key=lambda x: x['day']) |
| | for i in range(1, len(shifts)): |
| | if (shifts[i]['day'] - shifts[i-1]['day'] == 1 and |
| | shifts[i]['start'] != shifts[i-1]['start']): |
| | return float('inf') |
| |
|
| | |
| | coverage = self._check_coverage_requirements(schedule) |
| | if coverage > 0: |
| | return float('inf') |
| |
|
| | return 0 |
| |
|
| | def _check_coverage_requirements(self, schedule): |
| | """Optimized coverage check using pre-computed data""" |
| | day_cycle_coverage = {} |
| | |
| | |
| | for shift in schedule: |
| | day = shift['day'] |
| | if day not in day_cycle_coverage: |
| | day_cycle_coverage[day] = {cycle: 0 for cycle in self.staff_requirements[0].keys()} |
| | |
| | for cycle in shift['cycles_covered']: |
| | day_cycle_coverage[day][cycle] += 1 |
| |
|
| | |
| | violations = 0 |
| | for day in range(1, self.num_days + 1): |
| | if day not in day_cycle_coverage: |
| | return float('inf') |
| | |
| | day_coverage = day_cycle_coverage[day] |
| | required = self.staff_requirements[day-1] |
| | |
| | for cycle, needed in required.items(): |
| | if day_coverage[cycle] < needed: |
| | violations += needed - day_coverage[cycle] |
| | if violations > 0: |
| | return violations |
| |
|
| | return violations |
| |
|
| | def reset(self): |
| | """Reset the scheduler state""" |
| | self.best_schedule = None |
| | self.best_score = float('inf') |
| | self.staff_sequences = {} |
| | self.staff_hours = {} |
| |
|
| | def process_solution(solution, min_staff, num_days): |
| | try: |
| | |
| | schedule = [] |
| | |
| | for s in range(min_staff): |
| | for d in range(num_days): |
| | |
| | working_hours = [] |
| | for h in range(24): |
| | if solution[s,d,h] == 1: |
| | working_hours.append(h) |
| | |
| | if working_hours: |
| | |
| | shift_start = working_hours[0] |
| | shift_end = working_hours[-1] + 1 |
| | |
| | schedule.append({ |
| | 'staff_id': s + 1, |
| | 'day': d + 1, |
| | 'start': shift_start, |
| | 'end': shift_end, |
| | 'duration': shift_end - shift_start |
| | }) |
| | |
| | if schedule: |
| | schedule_df = pd.DataFrame(schedule) |
| | |
| | |
| | staff_assignment = {} |
| | for s in range(min_staff): |
| | staff_assignment[f'Staff {s+1}'] = [] |
| | for d in range(num_days): |
| | hours = sum(solution[s,d,h] for h in range(24)) |
| | staff_assignment_df = pd.DataFrame(staff_assignment) |
| | staff_assignment_df.index = [f'Day {d+1}' for d in range(num_days)] |
| | |
| | |
| | gantt_path = create_gantt_chart(schedule_df, num_days, min_staff) |
| | |
| | |
| | schedule_csv = schedule_df.to_csv(index=False) |
| | staff_csv = staff_assignment_df.to_csv() |
| | |
| | return ( |
| | staff_assignment_df, |
| | gantt_path, |
| | schedule_df, |
| | None, |
| | schedule_csv, |
| | staff_csv |
| | ) |
| | else: |
| | raise ValueError("No valid schedule found in solution") |
| | |
| | except Exception as e: |
| | print(f"Error processing solution: {str(e)}") |
| | return None |