| | import pandas as pd |
| | import numpy as np |
| | import pulp as pl |
| | import matplotlib.pyplot as plt |
| | import gradio as gr |
| | from itertools import product |
| | import io |
| | import base64 |
| | import tempfile |
| | import os |
| | from datetime import datetime |
| | import plotly.express as px |
| | from plotly.subplots import make_subplots |
| | import plotly.graph_objects as go |
| | import seaborn as sns |
| | from ortools.sat.python import cp_model |
| | import random |
| | from deap import base, creator, tools, algorithms |
| | import time |
| |
|
def am_pm(hour):
    """Convert a 24-hour clock value to a zero-padded 12-hour string.

    Args:
        hour: Hour of the day as an int or float (e.g. ``13`` or ``8.5``).
            Values outside ``0 <= hour < 24`` wrap around the clock, so
            ``24`` is midnight.

    Returns:
        A string such as ``"08:00 AM"`` or ``"01:30 PM"``. Whole hours keep
        the ``:00`` minute field; fractional hours are rounded to the
        nearest minute.
    """
    # Work in whole minutes so fractional hours keep their minutes and the
    # AM/PM decision is made after rounding (11.9999 -> 12:00 PM, not AM).
    total_minutes = int(round(float(hour) * 60)) % (24 * 60)
    hour24, minute = divmod(total_minutes, 60)
    period = "PM" if hour24 >= 12 else "AM"
    hour12 = hour24 % 12 or 12  # 0 and 12 both display as 12
    return f"{hour12:02d}:{minute:02d} {period}"
| |
|
def show_dataframe(csv_path):
    """Load a CSV with pandas.

    Returns the parsed DataFrame on success. On any failure the function
    does not raise; it returns a string of the form
    ``"Error loading CSV: <reason>"`` instead.
    """
    try:
        return pd.read_csv(csv_path)
    except Exception as err:
        return f"Error loading CSV: {err}"
| |
|
def _default_demand_frame(num_days=20, num_cycles=4, beds=3):
    """Build the fallback demand table: ``beds`` beds in every cycle of every day."""
    frame = pd.DataFrame({'day': range(1, num_days + 1)})
    for cycle in range(1, num_cycles + 1):
        frame[f'cycle{cycle}'] = beds
    return frame


def optimize_staffing(
    csv_file,
    beds_per_staff,
    max_hours_per_staff,
    hours_per_cycle,
    rest_days_per_week,
    clinic_start,
    clinic_end,
    overlap_time,
    max_start_time_change,
    exact_staff_count=None,
    overtime_percent=100
):
    """Build a staff schedule that covers the per-cycle bed demand.

    The demand table has one row per day and one ``cycle*`` column per
    treatment cycle holding the number of beds in use. Each staff member
    works at most one 5- or 10-hour shift per day, shifts start at cycle
    start times, and every cycle must be covered by at least
    ``ceil(beds / beds_per_staff)`` staff. A solution in which a staff
    member changes shift start time between consecutive days is rejected.

    Args:
        csv_file: Path string, object with a ``.name`` path attribute, raw
            CSV bytes, or ``None``. ``None`` or an unreadable file falls
            back to a default table of 20 days x 4 cycles with 3 beds each.
        beds_per_staff: Beds one staff member can look after (> 0).
        max_hours_per_staff: Hour budget per staff for a 30-day period; it
            is pro-rated to the number of days in the table and used for
            the staff-count estimate and the utilisation report.
        hours_per_cycle: Length of one cycle in hours.
        rest_days_per_week: Rest days per week (0-6); only used to inflate
            the staff-count estimate.
        clinic_start: Clinic opening hour (0-23); first cycle starts here.
        clinic_end: Accepted for interface compatibility; not used by the
            current model.
        overlap_time: Accepted for interface compatibility; not used.
        max_start_time_change: Accepted for interface compatibility; not used.
        exact_staff_count: If given and > 0, solve with exactly this many
            staff instead of searching for the smallest feasible count.
        overtime_percent: Accepted for interface compatibility; not used.

    Returns:
        A 7-tuple ``(results, staff_assignment_df, gantt_path, schedule_df,
        plot_path, schedule_csv_path, staff_assignment_csv_path)``. When no
        feasible schedule is found, ``results`` explains why and the other
        six entries are ``None``.

    Raises:
        ValueError: If the table has no ``cycle*`` columns or a numeric
            parameter is outside its valid range.
    """
    # ---- Load the demand table ------------------------------------------
    try:
        if isinstance(csv_file, str):
            data = pd.read_csv(csv_file)
        elif hasattr(csv_file, 'name'):
            data = pd.read_csv(csv_file.name)
        elif csv_file is None:
            data = _default_demand_frame()
        else:
            data = pd.read_csv(io.StringIO(csv_file.decode('utf-8')))
    except Exception as e:
        # Best effort: an unreadable upload falls back to the default table.
        print(f"Error loading CSV file: {e}")
        data = _default_demand_frame()
        print("Created default schedule with 20 days and 4 cycles per day")

    if data.columns[0] not in ['day', 'Day', 'DAY']:
        data = data.rename(columns={data.columns[0]: 'day'})

    cycle_cols = [col for col in data.columns
                  if col.startswith('cycle') and not col.endswith('_staff')]
    if not cycle_cols:
        raise ValueError("Demand table has no 'cycle*' columns")

    # ---- Normalise parameters -------------------------------------------
    num_days = len(data)
    BEDS_PER_STAFF = float(beds_per_staff)
    STANDARD_PERIOD_DAYS = 30
    BASE_MAX_HOURS = float(max_hours_per_staff)
    HOURS_PER_CYCLE = float(hours_per_cycle)
    REST_DAYS_PER_WEEK = int(rest_days_per_week)
    SHIFT_TYPES = [5, 10]
    CLINIC_START = int(clinic_start)

    if BEDS_PER_STAFF <= 0:
        raise ValueError("beds_per_staff must be greater than 0")
    if BASE_MAX_HOURS <= 0:
        raise ValueError("max_hours_per_staff must be greater than 0")
    if not 0 <= REST_DAYS_PER_WEEK < 7:
        raise ValueError("rest_days_per_week must be between 0 and 6")

    # The hour budget is quoted for a standard period; scale it to the table.
    MAX_HOURS_PER_STAFF = BASE_MAX_HOURS * (num_days / STANDARD_PERIOD_DAYS)
    header = (
        f"Input max hours per staff ({STANDARD_PERIOD_DAYS}-day period): {BASE_MAX_HOURS}\n"
        f"Adjusted max hours for {num_days}-day period: {MAX_HOURS_PER_STAFF:.1f}\n\n"
    )

    # ---- Demand in staff per cycle --------------------------------------
    for col in cycle_cols:
        data[col] = data[col].fillna(0)
        data[f'{col}_staff'] = np.ceil(data[col] / BEDS_PER_STAFF)

    # Cycle i runs [start + i*len, start + (i+1)*len) on a 24-hour clock.
    cycle_times = {
        cycle: ((CLINIC_START + i * HOURS_PER_CYCLE) % 24,
                (CLINIC_START + (i + 1) * HOURS_PER_CYCLE) % 24)
        for i, cycle in enumerate(cycle_cols)
    }
    shift_start_times = [start for start, _ in cycle_times.values()]

    # ---- Candidate shifts: every duration at every cycle start ----------
    possible_shifts = []
    for duration in SHIFT_TYPES:
        for start_time in shift_start_times:
            end_time = (start_time + duration) % 24
            shift = {
                'id': f"{int(duration)}hr_{int(start_time):02d}",
                'start': start_time,
                'end': end_time,
                'duration': duration,
                'cycles_covered': set()
            }

            for cycle, (cycle_start, cycle_end) in cycle_times.items():
                if cycle_end < cycle_start:
                    # Cycle wraps past midnight.
                    if (start_time >= cycle_start or end_time <= cycle_end
                            or (start_time < end_time and end_time > cycle_start)):
                        shift['cycles_covered'].add(cycle)
                else:
                    shift_end = end_time if end_time > start_time else end_time + 24
                    cycle_end_adj = cycle_end if cycle_end > cycle_start else cycle_end + 24
                    # Interval overlap test.
                    if not (shift_end <= cycle_start or start_time >= cycle_end_adj):
                        shift['cycles_covered'].add(cycle)

            if shift['cycles_covered']:
                possible_shifts.append(shift)

    # ---- Staff-count bounds ---------------------------------------------
    staff_cols = [f'{cycle}_staff' for cycle in cycle_cols]
    total_staff_hours = float(data[staff_cols].to_numpy().sum()) * HOURS_PER_CYCLE
    theoretical_min_staff = np.ceil(total_staff_hours / MAX_HOURS_PER_STAFF)
    min_staff_estimate = np.ceil(theoretical_min_staff * (7 / (7 - REST_DAYS_PER_WEEK)))

    def optimize_schedule(num_staff, time_limit=600):
        """Solve the model for ``num_staff`` staff.

        Returns ``(schedule, objective)`` where ``schedule`` is a list of
        shift dicts, or ``(None, None)`` when the solve fails, errors, or
        needs any start-time change between consecutive days.
        """
        try:
            staff_range = range(1, num_staff + 1)
            day_range = range(1, num_days + 1)
            later_days = range(2, num_days + 1)

            model = pl.LpProblem("Staff_Scheduling", pl.LpMinimize)

            x = pl.LpVariable.dicts(
                "shift",
                [(s, d, shift['id'])
                 for s in staff_range for d in day_range for shift in possible_shifts],
                cat='Binary')

            violation = pl.LpVariable.dicts(
                "timing_violation",
                [(s, d) for s in staff_range for d in later_days],
                lowBound=0)

            # The objective only penalises start-time changes.
            model += pl.lpSum(1000000 * violation[s, d]
                              for s in staff_range for d in later_days)

            for s in staff_range:
                for d in later_days:
                    prev_day_shifts = pl.lpSum(x[(s, d - 1, shift['id'])] for shift in possible_shifts)
                    current_day_shifts = pl.lpSum(x[(s, d, shift['id'])] for shift in possible_shifts)
                    model += prev_day_shifts + current_day_shifts <= 2 + violation[s, d]

                    # Different start times on consecutive days cost a violation.
                    for shift1 in possible_shifts:
                        for shift2 in possible_shifts:
                            if shift1['start'] != shift2['start']:
                                model += (x[(s, d - 1, shift1['id'])] + x[(s, d, shift2['id'])]
                                          <= 1 + violation[s, d])

            # Coverage: enough staff on shifts that overlap each cycle.
            for d in day_range:
                for cycle in cycle_cols:
                    staff_needed = float(data.iloc[d - 1][f'{cycle}_staff'])
                    covering_shifts = [shift for shift in possible_shifts
                                       if cycle in shift['cycles_covered']]
                    model += pl.lpSum(x[(s, d, shift['id'])]
                                      for s in staff_range
                                      for shift in covering_shifts) >= staff_needed

            # At most one shift per staff member per day.
            for s in staff_range:
                for d in day_range:
                    model += pl.lpSum(x[(s, d, shift['id'])] for shift in possible_shifts) <= 1

            solver = pl.PULP_CBC_CMD(timeLimit=time_limit, msg=1, gapRel=0.01)
            model.solve(solver)

            # NOTE(review): LpStatusNotSolved is accepted as in the original
            # code; confirm that a time-limited run should count as usable.
            if model.status not in (pl.LpStatusOptimal, pl.LpStatusNotSolved):
                return None, None

            schedule = []
            for s in staff_range:
                for d in day_range:
                    for shift in possible_shifts:
                        chosen = pl.value(x[(s, d, shift['id'])])
                        # Solvers may report 0.999999 for a binary; don't test == 1.
                        if chosen is not None and chosen > 0.5:
                            schedule.append({
                                'staff_id': s,
                                'day': d,
                                'shift_id': shift['id'],
                                'start': shift['start'],
                                'end': shift['end'],
                                'duration': shift['duration'],
                                'cycles_covered': list(shift['cycles_covered'])
                            })

            total_violation = sum(pl.value(violation[s, d])
                                  for s in staff_range for d in later_days)
            if total_violation > 0:
                print(f"Warning: Found {total_violation} timing violations")
                return None, None

            return schedule, model.objective.value()
        except Exception as e:
            print(f"Error in optimization: {e}")
            return None, None

    # Same arity as the success return so callers can always unpack 7 values.
    def failure(message):
        return (message,) + (None,) * 6

    # ---- Solve ------------------------------------------------------------
    if exact_staff_count is not None and exact_staff_count > 0:
        staff_count = int(exact_staff_count)
        results = header + f"Using exactly {staff_count} staff as specified...\n"

        schedule, _ = optimize_schedule(staff_count)
        if schedule is None:
            results += f"Failed to find a feasible solution with exactly {staff_count} staff.\n"
            results += "Try increasing the staff count.\n"
            return failure(results)
    else:
        min_staff = max(1, int(theoretical_min_staff))
        max_staff = int(min_staff_estimate) + 5

        results = header + f"Theoretical minimum staff needed: {theoretical_min_staff:.1f}\n"
        results += f"Searching for minimum staff count starting from {min_staff}...\n"

        schedule = None
        for staff_count in range(min_staff, max_staff + 1):
            results += f"Trying with {staff_count} staff...\n"
            # Larger models get proportionally more solver time.
            time_limit = 300 + (staff_count - min_staff) * 100
            schedule, _ = optimize_schedule(staff_count, time_limit)
            if schedule is not None:
                results += f"Found feasible solution with {staff_count} staff.\n"
                break

        if schedule is None:
            results += "Failed to find a feasible solution with the attempted staff counts.\n"
            results += "Try increasing the staff count manually or relaxing constraints.\n"
            return failure(results)

    # Explicit columns keep the frame usable when no shift is needed at all.
    schedule_df = pd.DataFrame(
        schedule,
        columns=['staff_id', 'day', 'shift_id', 'start', 'end', 'duration', 'cycles_covered'])

    results += f"Optimal solution found with {staff_count} staff\n"
    results += f"Total staff hours: {schedule_df['duration'].sum()}\n"

    # ---- Per-staff hours and utilisation --------------------------------
    active_staff_hours = schedule_df.groupby('staff_id')['duration'].sum().to_dict()

    results += "\nStaff Hours:\n"
    for staff_id, hours in active_staff_hours.items():
        utilization = (hours / MAX_HOURS_PER_STAFF) * 100
        results += f"Staff {staff_id}: {hours} hours ({utilization:.1f}% utilization)\n"

        if hours > MAX_HOURS_PER_STAFF:
            overtime = hours - MAX_HOURS_PER_STAFF
            overtime_share = (overtime / MAX_HOURS_PER_STAFF) * 100
            results += f" Overtime: {overtime:.1f} hours ({overtime_share:.1f}%)\n"

    active_staff_count = len(active_staff_hours)
    if active_staff_count:
        avg_utilization = (sum(active_staff_hours.values())
                           / (active_staff_count * MAX_HOURS_PER_STAFF) * 100)
        results += f"\nAverage staff utilization: {avg_utilization:.1f}%\n"

    # ---- Coverage check ---------------------------------------------------
    assigned_counts = {}
    coverage_check = []
    for d in range(1, num_days + 1):
        covered_today = schedule_df.loc[schedule_df['day'] == d, 'cycles_covered']
        for cycle in cycle_cols:
            required = data.iloc[d - 1][f'{cycle}_staff']
            assigned = sum(1 for covered in covered_today if cycle in covered)
            assigned_counts[(d, cycle)] = assigned
            coverage_check.append({
                'day': d,
                'cycle': cycle,
                'required': required,
                'assigned': assigned,
                'satisfied': bool(assigned >= required)
            })

    coverage_df = pd.DataFrame(coverage_check)
    satisfaction = coverage_df['satisfied'].mean() * 100
    results += f"Coverage satisfaction: {satisfaction:.1f}%\n"

    if satisfaction < 100:
        results += "Warning: Not all staffing requirements are met!\n"
        unsatisfied = coverage_df[~coverage_df['satisfied']]
        results += unsatisfied.to_string() + "\n"

    # ---- Bar chart of shift hours per day -------------------------------
    staff_days = {s: [0] * num_days for s in range(1, staff_count + 1)}
    for _, shift in schedule_df.iterrows():
        staff_days[shift['staff_id']][shift['day'] - 1] = shift['duration']

    fig, ax = plt.subplots(figsize=(15, 8))
    try:
        # NOTE(review): bars for different staff are drawn on top of each
        # other at the same x position; confirm whether stacking was intended.
        for s, hours in staff_days.items():
            ax.bar(range(1, num_days + 1), hours, label=f'Staff {s}')

        ax.set_xlabel('Day')
        ax.set_ylabel('Shift Hours')
        ax.set_title('Staff Schedule')
        ax.set_xticks(range(1, num_days + 1))
        ax.legend()

        # Reserve the path, close the handle, then let matplotlib write to it.
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
            plot_path = f.name
        fig.savefig(plot_path)
    finally:
        plt.close(fig)

    gantt_path = create_gantt_chart(schedule_df, num_days, staff_count)

    # ---- Detailed schedule CSV ------------------------------------------
    schedule_df['start_ampm'] = schedule_df['start'].apply(am_pm)
    schedule_df['end_ampm'] = schedule_df['end'].apply(am_pm)
    schedule_csv = schedule_df[
        ['staff_id', 'day', 'start_ampm', 'end_ampm', 'duration', 'cycles_covered']
    ].to_csv(index=False)

    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file:
        temp_file.write(schedule_csv)
        schedule_csv_path = temp_file.name

    # ---- Staff count per day and cycle ----------------------------------
    staff_assignment_df = pd.DataFrame(
        [[d] + [assigned_counts[(d, cycle)] for cycle in cycle_cols]
         for d in range(1, num_days + 1)],
        columns=['Day'] + cycle_cols)

    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file:
        staff_assignment_csv_path = temp_file.name
    staff_assignment_df.to_csv(staff_assignment_csv_path, index=False)

    return (results, staff_assignment_df, gantt_path, schedule_df, plot_path,
            schedule_csv_path, staff_assignment_csv_path)
| |
|
def convert_to_24h(time_str):
    """Convert an ``"HH:00 AM/PM"`` string to its 24-hour hour number.

    Args:
        time_str: A string such as ``"08:00 PM"``. Only whole hours are
            accepted because the pattern requires a literal ``:00``.

    Returns:
        The hour as an int in ``0..23``, or ``None`` when ``time_str`` is
        not a string in the expected format (including ``None``).
    """
    try:
        return datetime.strptime(time_str, "%I:00 %p").hour
    except (ValueError, TypeError):
        # TypeError covers None / non-string input, e.g. an unset dropdown.
        return None
| |
|
def gradio_wrapper(
    csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
    rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change,
    exact_staff_count=None, overtime_percent=100
):
    """Run ``optimize_staffing`` from AM/PM clinic hours and shape its output.

    Clinic hours arrive as ``"HH:00 AM/PM"`` strings and are converted with
    ``convert_to_24h`` before the optimizer is called.

    Returns:
        ``(staff_assignment_df, gantt_path, schedule_df, plot_path,
        staff_assignment_csv_path, schedule_csv_path)``. This function never
        raises: on any error, or when no schedule was produced, the error
        is printed and the tuple is an empty ``Day`` DataFrame followed by
        five ``None`` values.
    """
    try:
        clinic_start = convert_to_24h(clinic_start_ampm)
        clinic_end = convert_to_24h(clinic_end_ampm)
        if clinic_start is None or clinic_end is None:
            raise ValueError("Clinic start and end must look like '08:00 AM'")

        outcome = optimize_staffing(
            csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
            rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change,
            exact_staff_count, overtime_percent
        )

        # A failed optimisation carries no schedule (and may be a shorter
        # tuple); surface its summary text through the error path below.
        if len(outcome) < 7 or outcome[3] is None:
            raise RuntimeError(str(outcome[0]))

        (_results, staff_assignment_df, gantt_path, schedule_df, plot_path,
         schedule_csv_path, staff_assignment_csv_path) = outcome

        return (staff_assignment_df, gantt_path, schedule_df, plot_path,
                staff_assignment_csv_path, schedule_csv_path)
    except Exception as e:
        empty_staff_df = pd.DataFrame(columns=["Day"])
        error_message = f"Error during optimization: {str(e)}\n\nPlease try with different parameters or a simpler dataset."
        # The return tuple has no text slot, so log the reason rather than drop it.
        print(error_message)

        return empty_staff_df, None, None, None, None, None
| |
|
| | |
def _draw_shift_label(ax, center_x, y_pos, idx, label):
    """Draw a shift's time-range label, alternating above/below the bar by ``idx``."""
    y_offset = 0.35 if idx % 2 == 0 else -0.35
    ax.text(center_x, y_pos + y_offset, label,
            ha='center', va='center', fontsize=9, fontweight='bold',
            color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3,
                                     boxstyle='round,pad=0.3', edgecolor='gray'),
            zorder=20)


def create_gantt_chart(schedule_df, num_days, staff_count):
    """Render the schedule as a Gantt-style PNG and return its file path.

    One row is drawn per staff member that has at least one shift; the
    x-axis is in days, with each shift drawn from ``day - 1 + start / 24``.
    Shifts whose end hour is smaller than their start hour are drawn as two
    bars, the second one starting at the next day boundary.

    Args:
        schedule_df: DataFrame with ``staff_id``, ``day`` (1-based),
            ``start``, ``end`` (hours) and ``duration`` columns.
        num_days: Number of days on the x-axis.
        staff_count: Unused; rows are derived from ``schedule_df``. Kept so
            existing callers keep working.

    Returns:
        Path of a temporary ``.png`` file (not deleted automatically).
    """
    active_staff_ids = sorted(schedule_df['staff_id'].unique())
    active_staff_count = len(active_staff_ids)
    schedule_df = schedule_df.sort_values(['staff_id', 'day'])

    bar_style = dict(height=0.6, alpha=0.9, edgecolor='black', linewidth=1, zorder=10)

    # Scope the style to this figure so it applies from figure creation on
    # and does not change the look of charts drawn elsewhere.
    with plt.style.context('seaborn-v0_8-whitegrid'):
        fig = plt.figure(figsize=(max(30, num_days * 1.5), max(12, active_staff_count * 0.8)),
                         dpi=200)
        try:
            ax = fig.add_subplot(111)
            ax.set_facecolor('#f8f9fa')
            colors = plt.cm.viridis(np.linspace(0.1, 0.9, active_staff_count))

            for i, staff_id in enumerate(active_staff_ids):
                staff_shifts = schedule_df[schedule_df['staff_id'] == staff_id]
                y_pos = active_staff_count - i  # first staff id on the top row

                ax.text(-0.7, y_pos, f"Staff {staff_id}", fontsize=12, fontweight='bold',
                        ha='right', va='center',
                        bbox=dict(facecolor='white', edgecolor='gray',
                                  boxstyle='round,pad=0.5', alpha=0.9))
                ax.axhspan(y_pos - 0.4, y_pos + 0.4, color='white', alpha=0.4, zorder=-5)

                for idx, (_, shift) in enumerate(staff_shifts.iterrows()):
                    day = shift['day']
                    start_hour = shift['start']
                    end_hour = shift['end']
                    label = f"{am_pm(start_hour)}-{am_pm(end_hour)}"
                    shift_start_pos = day - 1 + start_hour / 24

                    if end_hour < start_hour:
                        # Overnight shift: up to midnight, then into the next day.
                        shift_width = (24 - start_hour) / 24
                        ax.barh(y_pos, shift_width, left=shift_start_pos,
                                color=colors[i], **bar_style)
                        ax.barh(y_pos, end_hour / 24, left=day,
                                color=colors[i], **bar_style)
                    else:
                        shift_width = shift['duration'] / 24
                        ax.barh(y_pos, shift_width, left=shift_start_pos,
                                color=colors[i], **bar_style)

                    # Only label bars wide enough to carry the text.
                    if shift_width >= 0.1:
                        _draw_shift_label(ax, shift_start_pos + shift_width / 2,
                                          y_pos, idx, label)

            # Weekend shading; this treats day 1 as a Monday.
            for day in range(1, num_days + 1):
                if day % 7 in (0, 6):
                    ax.axvspan(day - 1, day, alpha=0.15, color='#ff9999', zorder=-10)
                    day_label = "Saturday" if day % 7 == 6 else "Sunday"
                    ax.text(day - 0.5, 0.2, day_label, ha='center', fontsize=10,
                            color='#cc0000', fontweight='bold',
                            bbox=dict(facecolor='white', alpha=0.7, pad=2, boxstyle='round'))

            ax.set_xticks(np.arange(0.5, num_days, 1))
            ax.set_xticklabels([f"Day {d}" for d in range(1, num_days + 1)],
                               rotation=0, ha='center', fontsize=10)

            for day in range(1, num_days):
                ax.axvline(x=day, color='#aaaaaa', linestyle='-', alpha=0.5, zorder=-5)

            ax.set_yticks(np.arange(1, active_staff_count + 1))
            ax.set_yticklabels([])

            ax.set_xlim(-0.8, num_days)
            ax.set_ylim(0.5, active_staff_count + 0.5)

            hour_labels = {6: "6AM", 12: "Noon", 18: "6PM"}
            for day in range(num_days):
                for hour, hour_label in hour_labels.items():
                    ax.axvline(x=day + hour / 24, color='#cccccc', linestyle=':',
                               alpha=0.5, zorder=-5)
                    ax.text(day + hour / 24, 0, hour_label, ha='center', va='bottom',
                            fontsize=7, color='#666666', rotation=90, alpha=0.7)

            ax.set_title(f'Staff Schedule ({active_staff_count} Active Staff)',
                         fontsize=24, fontweight='bold', pad=20, color='#333333')
            ax.set_xlabel('Day', fontsize=16, labelpad=10, color='#333333')

            fig.text(0.01, 0.01, "Time Reference:", ha='left', fontsize=10,
                     fontweight='bold', color='#333333')
            for i, marker in enumerate(['6 AM', 'Noon', '6 PM', 'Midnight']):
                fig.text(0.08 + i * 0.06, 0.01, marker, ha='left', fontsize=9, color='#555555')

            for spine in ['top', 'right', 'left']:
                ax.spines[spine].set_visible(False)

            fig.text(0.01, 0.97, "Red areas = Weekends", fontsize=12,
                     color='#cc0000', fontweight='bold',
                     bbox=dict(facecolor='white', alpha=0.7, pad=5, boxstyle='round'))

            ax.set_frame_on(False)

            # Reserve the path, close the handle, then let matplotlib write to it.
            with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
                gantt_path = f.name
            fig.tight_layout()
            fig.savefig(gantt_path, dpi=200, bbox_inches='tight', facecolor='white')
        finally:
            # Always release the (large) figure, even if drawing failed.
            plt.close(fig)

    return gantt_path
| |
|
| | |
# Dropdown choices for the clinic hours: "01:00 AM" .. "12:00 AM" followed by
# "01:00 PM" .. "12:00 PM", in the "HH:00 AM/PM" form convert_to_24h parses.
am_pm_times = [
    f"{hour:02d}:00 {period}"
    for period in ("AM", "PM")
    for hour in range(1, 13)
]
| |
|
| | |
# Custom stylesheet passed to gr.Blocks(css=css). It sizes the two chart
# containers used by the analytics tab and forces embedded Plotly charts
# and their titles to fill and stand out in those containers.
css = """
.chart-container {
height: 800px !important;
width: 100% !important;
margin: 20px 0;
padding: 20px;
border: 1px solid #ddd;
border-radius: 8px;
background: white;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}

.weekly-chart-container {
height: 1000px !important;
width: 100% !important;
margin: 20px 0;
padding: 20px;
border: 1px solid #ddd;
border-radius: 8px;
background: white;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}

/* Ensure plotly charts are visible */
.js-plotly-plot {
width: 100% !important;
height: 100% !important;
}

/* Improve visibility of chart titles */
.gtitle {
font-weight: bold !important;
font-size: 20px !important;
}
"""
| |
|
| | with gr.Blocks(title="Staff Scheduling Optimizer", css=css) as iface: |
| | |
| | gr.Markdown("# Staff Scheduling Optimizer") |
| | gr.Markdown("Upload a CSV file with cycle data and configure parameters to generate an optimal staff schedule.") |
| | |
| | with gr.Row(): |
| | |
| | with gr.Column(scale=1): |
| | gr.Markdown("### Input Parameters") |
| | |
| | |
| | csv_input = gr.File(label="Upload CSV") |
| | beds_per_staff = gr.Number(label="Beds per Staff", value=3) |
| | max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160) |
| | hours_per_cycle = gr.Number(label="Hours per Cycle", value=4) |
| | rest_days_per_week = gr.Number(label="Rest Days per Week", value=2) |
| | clinic_start_ampm = gr.Dropdown(label="Clinic Start Hour (AM/PM)", choices=am_pm_times, value="08:00 AM") |
| | clinic_end_ampm = gr.Dropdown(label="Clinic End Hour (AM/PM)", choices=am_pm_times, value="08:00 PM") |
| | overlap_time = gr.Number(label="Overlap Time", value=0) |
| | max_start_time_change = gr.Number(label="Max Start Time Change", value=2) |
| | exact_staff_count = gr.Number(label="Exact Staff Count (optional)", value=None) |
| | overtime_percent = gr.Slider(label="Overtime Allowed (%)", minimum=0, maximum=100, value=100, step=10) |
| | |
| | optimize_btn = gr.Button("Optimize Schedule", variant="primary", size="lg") |
| | |
| | |
| | with gr.Column(scale=2): |
| | gr.Markdown("### Results") |
| | |
| | |
| | with gr.Tabs(): |
| | with gr.TabItem("Detailed Schedule"): |
| | with gr.Row(): |
| | csv_schedule = gr.Dataframe(label="Detailed Schedule", elem_id="csv_schedule") |
| | |
| | with gr.Row(): |
| | schedule_download_file = gr.File(label="Download Detailed Schedule", visible=True) |
| | |
| | with gr.TabItem("Gantt Chart"): |
| | gantt_chart = gr.Image(label="Staff Schedule Visualization", elem_id="gantt_chart") |
| | |
| | with gr.TabItem("Staff Coverage by Cycle"): |
| | with gr.Row(): |
| | staff_assignment_table = gr.Dataframe(label="Staff Count in Each Cycle (Staff May Overlap)", elem_id="staff_assignment_table") |
| | |
| | with gr.Row(): |
| | staff_download_file = gr.File(label="Download Coverage Table", visible=True) |
| | |
| | with gr.TabItem("Constraints and Analytics"): |
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | gr.Markdown("### Applied Constraints") |
| | constraints_text = gr.TextArea( |
| | label="", |
| | interactive=False, |
| | show_label=False |
| | ) |
| | |
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | gr.Markdown("### Monthly Distribution") |
| | monthly_chart = gr.HTML( |
| | label="Monthly Hours Distribution", |
| | show_label=False, |
| | elem_classes="chart-container" |
| | ) |
| | |
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | gr.Markdown("### Weekly Distribution") |
| | weekly_charts = gr.HTML( |
| | label="Weekly Hours Distribution", |
| | show_label=False, |
| | elem_classes="weekly-chart-container" |
| | ) |
| | |
| | with gr.TabItem("Staff Overlap"): |
| | with gr.Row(): |
| | overlap_chart = gr.HTML( |
| | label="Staff Overlap Visualization", |
| | show_label=False |
| | ) |
| | with gr.Row(): |
| | gr.Markdown(""" |
| | This heatmap shows the number of staff members working simultaneously throughout each day. |
| | - Darker colors indicate more staff overlap |
| | - The x-axis shows time of day in 30-minute intervals |
| | - The y-axis shows each day of the schedule |
| | """) |
| | |
| | with gr.TabItem("Staff Absence Handler"): |
| | with gr.Row(): |
| | with gr.Column(): |
| | gr.Markdown("### Handle Staff Absence") |
| | absent_staff = gr.Number(label="Staff ID to be absent", precision=0) |
| | absence_start = gr.Number(label="Start Day", precision=0) |
| | absence_end = gr.Number(label="End Day", precision=0) |
| | handle_absence_btn = gr.Button("Redistribute Shifts", variant="primary") |
| | |
| | with gr.Column(): |
| | absence_result = gr.TextArea(label="Redistribution Results", interactive=False) |
| | updated_schedule = gr.DataFrame(label="Updated Schedule") |
| | absence_gantt_chart = gr.Image(label="Absence Schedule Visualization", elem_id="absence_gantt_chart") |
| | |
| | |
def create_download_link(df, filename="data.csv"):
    """Write ``df`` to a temporary CSV file and return the file's path.

    Returns ``None`` when ``df`` is ``None`` or has no data. ``filename``
    is accepted but not used; the file gets a generated temporary name
    with a ``.csv`` suffix and is not deleted automatically.
    """
    has_data = df is not None and not df.empty
    if not has_data:
        return None

    payload = df.to_csv(index=False)
    handle = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv')
    with handle:
        handle.write(payload)
    return handle.name
| |
|
| | |
def optimize_and_display(csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
                         rest_days_per_week, clinic_start_ampm, clinic_end_ampm,
                         overlap_time, max_start_time_change, exact_staff_count, overtime_percent):
    """Run the optimizer and build every output shown in the results tabs.

    Returns:
        An 8-tuple ``(staff_assignment_df, gantt_path, schedule_df,
        schedule_csv_path, constraints_info, monthly_html, weekly_html,
        overlap_html)``. If the optimisation fails or raises, all eight
        entries are ``None``. If only one of the summary/chart builders
        fails, just that entry is replaced by a fallback message and the
        others are kept. This function never raises.
    """
    def _build(builder, fallback, *args):
        """Call one summary/chart builder, returning ``fallback`` on failure."""
        try:
            return builder(*args)
        except Exception as e:
            print(f"Error in visualization: {str(e)}")
            return fallback

    try:
        clinic_start = convert_to_24h(clinic_start_ampm)
        clinic_end = convert_to_24h(clinic_end_ampm)

        outcome = optimize_staffing(
            csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
            rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change,
            exact_staff_count, overtime_percent
        )

        # A failed run carries no schedule (and may be a shorter tuple), so
        # inspect it before unpacking.
        if len(outcome) < 7 or outcome[3] is None:
            print(f"Error in optimization: {outcome[0]}")
            return (None,) * 8

        (_results, staff_assignment_df, gantt_path, schedule_df, _plot_path,
         schedule_csv_path, _staff_assignment_csv_path) = outcome

        constraints_info = _build(
            get_constraints_summary, "Error in constraints",
            max_hours_per_staff, rest_days_per_week, overtime_percent)

        # Chart builders get their own copy: the weekly chart adds a 'week'
        # column to its input, which must not leak into the displayed table.
        monthly_html = _build(
            create_monthly_distribution_chart,
            "<div>Error creating monthly chart</div>", schedule_df.copy())
        weekly_html = _build(
            create_weekly_distribution_charts,
            "<div>Error creating weekly charts</div>", schedule_df.copy())
        overlap_html = _build(
            create_overlap_visualization,
            "<div>Error creating overlap visualization</div>", schedule_df.copy())

        return (
            staff_assignment_df,
            gantt_path,
            schedule_df,
            schedule_csv_path,
            constraints_info,
            monthly_html,
            weekly_html,
            overlap_html
        )
    except Exception as e:
        print(f"Error in optimization: {str(e)}")
        return (None,) * 8
| | |
def get_constraints_summary(max_hours, rest_days, overtime_percent):
    """Return the multi-line "Applied Scheduling Constraints" text.

    Only the monthly hour limit, the rest days per week and the overtime
    allowance are taken from the arguments; every other line is fixed text.
    """
    numbered = [
        (f"Maximum Hours per Month: {max_hours} hours", ()),
        (f"Required Rest Days per Week: {rest_days} days", ()),
        ("Maximum Weekly Hours: 60 hours per staff member", ()),
        ("Minimum Rest Period: 11 hours between shifts", ()),
        ("Maximum Consecutive Days: 6 working days", ()),
        (f"Overtime Allowance: {overtime_percent}% of standard hours", ()),
        ("Coverage Requirements:", (
            "All cycles must be fully staffed",
            "No understaffing allowed",
            "Staff assigned based on required beds/staff ratio",
        )),
        ("Shift Constraints:", (
            "Available shift durations: 5, 10 hours",
            "Shifts must align with cycle times",
        )),
        ("Staff Scheduling Rules:", (
            "Equal distribution of workload when possible",
            "Consistent shift patterns preferred",
            "Weekend rotations distributed fairly",
        )),
    ]

    lines = ["Applied Scheduling Constraints:", "----------------------------"]
    for number, (heading, bullets) in enumerate(numbered, start=1):
        lines.append(f"{number}. {heading}")
        lines.extend(" - " + bullet for bullet in bullets)
    return "\n".join(lines)
| | |
def create_monthly_distribution_chart(schedule_df):
    """Render each staff member's share of total scheduled hours as a pie chart.

    Args:
        schedule_df: Schedule DataFrame with ``staff_id`` and ``duration``
            columns, or ``None``.

    Returns:
        An HTML ``<img>`` tag embedding the chart as a base64 PNG. For a
        ``None`` or empty frame, or when rendering fails, a ``<div>`` with
        an explanatory message is returned instead; the function does not
        raise.
    """
    if schedule_df is None or schedule_df.empty:
        return "<div>No data available for visualization</div>"

    fig = None
    try:
        staff_hours = schedule_df.groupby('staff_id')['duration'].sum()

        fig, ax = plt.subplots(figsize=(8, 8))
        # Pass the palette explicitly: a global sns.set_palette() after the
        # axes exist would not colour this chart and would restyle others.
        ax.pie(staff_hours, labels=staff_hours.index, autopct='%1.1f%%', startangle=90,
               colors=sns.color_palette("pastel", len(staff_hours)))
        ax.axis('equal')
        ax.set_title("Monthly Hours Distribution")

        img = io.BytesIO()
        fig.savefig(img, format='png', bbox_inches='tight')

        img_base64 = base64.b64encode(img.getvalue()).decode('utf-8')
        return f'<img src="data:image/png;base64,{img_base64}" style="max-width:100%; max-height:600px;">'
    except Exception as e:
        from html import escape  # local import: only needed on the error path

        print(f"Error in monthly chart: {e}")
        return f"<div>Error creating monthly chart: {escape(str(e))}</div>"
    finally:
        # Release the figure on both the success and the error path.
        if fig is not None:
            plt.close(fig)
| | |
def create_weekly_distribution_charts(schedule_df):
    """Render one Plotly pie chart per week showing hours per staff member.

    Rows are bucketed into weeks with ``day // 7`` and ``duration`` is summed
    per ``(week, staff_id)``. The pies are stacked vertically, one subplot
    per week.

    The input frame is left untouched; the week bucket is computed on a copy.

    Args:
        schedule_df: Schedule table with ``day``, ``staff_id`` and
            ``duration`` columns, or ``None``.

    Returns:
        str: An HTML fragment for the figure (Plotly JS loaded from the CDN),
        a placeholder ``<div>`` when there is no data, or an error ``<div>``
        if rendering fails.
    """
    if schedule_df is None or schedule_df.empty:
        return "<div>No data available for visualization</div>"

    try:
        # assign() returns a new frame, so the caller's DataFrame does not
        # gain a 'week' column as a side effect.
        # NOTE(review): with 1-based day numbers, `day // 7` puts days 1-6 in
        # week 0 and day 7 in week 1 — confirm whether (day - 1) // 7 is
        # what is wanted.
        with_week = schedule_df.assign(week=schedule_df['day'] // 7)
        weekly_hours = (
            with_week.groupby(['week', 'staff_id'])['duration'].sum().reset_index()
        )

        # Format column-wise: a row-wise apply coerces mixed int/float rows
        # to float and would print ids as "Staff 1.0".
        weekly_hours['staff_label'] = [
            f"Staff {staff_id} ({hours:.1f}hrs)"
            for staff_id, hours in zip(weekly_hours['staff_id'], weekly_hours['duration'])
        ]

        weeks = sorted(weekly_hours['week'].unique())
        colors = px.colors.qualitative.Set3

        # Pie traces need 'domain'-type subplot cells.
        fig = make_subplots(
            rows=len(weeks),
            cols=1,
            subplot_titles=[f'Week {week}' for week in weeks],
            specs=[[{'type': 'domain'}] for _ in weeks]
        )

        for row, week in enumerate(weeks, start=1):
            week_data = weekly_hours[weekly_hours['week'] == week]

            fig.add_trace(
                go.Pie(
                    values=week_data['duration'],
                    labels=week_data['staff_label'],
                    name=f'Week {week}',
                    showlegend=(row == 1),
                    marker_colors=colors,
                    textposition='inside',
                    textinfo='percent+label',
                    # `percent` is a fraction, so it needs the d3 percent
                    # format; ".1f" plus a literal "%" would show "0.3%".
                    hovertemplate=(
                        "Staff: %{label}<br>"
                        "Hours: %{value:.1f}<br>"
                        "Percentage: %{percent:.1%}"
                        "<extra></extra>"
                    )
                ),
                row=row,
                col=1
            )

        fig.update_layout(
            height=300 * len(weeks),
            width=800,
            title_text="Weekly Hours Distribution",
            title_x=0.5,
            title_font_size=20,
            margin=dict(t=50, l=50, r=50, b=50),
            showlegend=True
        )

        return fig.to_html(include_plotlyjs='cdn', full_html=False)
    except Exception as e:
        print(f"Error in weekly charts: {e}")
        return f"<div>Error creating weekly charts: {str(e)}</div>"
| |
|
| | |
def _staff_on_duty_per_slot(day_shifts, slot_times):
    """Count, for each slot start time, how many of the given shifts cover it."""
    # Column vectors so each shift broadcasts against every slot time.
    starts = day_shifts['start'].to_numpy(dtype=float)[:, np.newaxis]
    ends = day_shifts['end'].to_numpy(dtype=float)[:, np.newaxis]

    after_start = slot_times >= starts
    before_end = slot_times < ends
    # A shift whose end is numerically before its start is treated as
    # wrapping past midnight: it covers slots from `start` onwards and slots
    # before `end`, all counted on the same day row.
    covered = np.where(ends < starts, after_start | before_end, after_start & before_end)
    return covered.sum(axis=0)


def create_overlap_visualization(schedule_df):
    """Render a day-by-time heatmap of how many staff are on shift.

    Each day is split into 48 half-hour slots. A slot counts a shift when the
    slot's start time lies in ``[start, end)``, or, for shifts with
    ``end < start``, when it is at or after ``start`` or before ``end``.

    Args:
        schedule_df: Schedule table with ``day``, ``start`` and ``end``
            columns (hours as numbers), or ``None``.

    Returns:
        str: An HTML ``<img>`` snippet with the PNG embedded as base64, a
        placeholder ``<div>`` when there is no data, or an error ``<div>``
        if rendering fails.
    """
    if schedule_df is None or schedule_df.empty:
        return "<div>No data available for visualization</div>"

    fig = None
    try:
        intervals = 48   # half-hour slots per day
        label_step = 4   # label every 4th slot, i.e. every two hours
        slot_times = np.arange(intervals) * 0.5
        days = sorted(schedule_df['day'].unique())

        overlap_data = np.zeros((len(days), intervals))
        for day_idx, day in enumerate(days):
            day_shifts = schedule_df[schedule_df['day'] == day]
            overlap_data[day_idx] = _staff_on_duty_per_slot(day_shifts, slot_times)

        time_labels = [f"{slot // 2:02d}:{(slot % 2) * 30:02d}" for slot in range(intervals)]

        fig, ax = plt.subplots(figsize=(12, 8))
        sns.heatmap(overlap_data, cmap="viridis", ax=ax, cbar_kws={'label': 'Staff Count'})

        # Heatmap cell j spans [j, j + 1], so ticks go at the cell centres.
        # The x ticks must sit on the slots they label (0, 4, 8, ...), not on
        # 0..11, otherwise all labels crowd into the first quarter of the axis.
        ax.set_xticks(np.arange(0, intervals, label_step) + 0.5)
        ax.set_xticklabels(time_labels[::label_step], rotation=45, ha="right")
        ax.set_yticks(np.arange(len(days)) + 0.5)
        ax.set_yticklabels(days)

        ax.set_title("Staff Overlap Throughout the Day")
        fig.tight_layout()

        img = io.BytesIO()
        fig.savefig(img, format='png', bbox_inches='tight')

        img_base64 = base64.b64encode(img.getvalue()).decode('utf-8')
        return f'<img src="data:image/png;base64,{img_base64}" style="max-width:100%; max-height:800px;">'
    except Exception as e:
        print(f"Error in overlap visualization: {e}")
        return f"<div>Error creating overlap visualization: {str(e)}</div>"
    finally:
        # Close on every path so a failed render does not leave the figure
        # registered with pyplot.
        if fig is not None:
            plt.close(fig)
| |
|
| | |
# Wire the optimize button: on click, call optimize_and_display with the
# eleven input components below (in this order) and send its return values,
# positionally, to the eight output components.
# NOTE(review): optimize_and_display and all components referenced here are
# defined outside this section — confirm the callback returns exactly eight
# values in the order of `outputs`.
optimize_btn.click(
    fn=optimize_and_display,
    inputs=[
        csv_input, beds_per_staff, max_hours_per_staff, hours_per_cycle,
        rest_days_per_week, clinic_start_ampm, clinic_end_ampm,
        overlap_time, max_start_time_change, exact_staff_count, overtime_percent
    ],
    outputs=[
        staff_assignment_table,
        gantt_chart,
        csv_schedule,
        schedule_download_file,
        constraints_text,
        monthly_chart,
        weekly_charts,
        overlap_chart
    ]
)
| |
|
| | |
def handle_absence_click(staff_id, start_day, end_day, current_schedule, max_hours_per_staff, overtime_percent):
    """Validate the absence form and delegate to ``handle_staff_absence``.

    Args:
        staff_id: Id of the absent staff member (converted with ``int``).
        start_day: First absent day, inclusive (converted with ``int``).
        end_day: Last absent day, inclusive (converted with ``int``).
        current_schedule: Current schedule DataFrame, or ``None``.
        max_hours_per_staff: Passed through to ``handle_staff_absence``.
        overtime_percent: Passed through to ``handle_staff_absence``.

    Returns:
        tuple: ``(summary_text, updated_schedule, gantt_chart_path)``. The
        last two are ``None`` whenever the request is rejected here.
    """
    if current_schedule is None or current_schedule.empty:
        return "No current schedule loaded.", None, None

    # NOTE(review): blank numeric form fields are assumed to arrive as None;
    # int(None) would otherwise raise inside the UI callback.
    if staff_id is None or start_day is None or end_day is None:
        return "Please provide the staff ID, start day and end day.", None, None

    first_day, last_day = int(start_day), int(end_day)
    if last_day < first_day:
        return "End day must not be earlier than start day.", None, None

    # Inclusive range of absent days.
    absence_dates = list(range(first_day, last_day + 1))
    return handle_staff_absence(
        current_schedule,
        int(staff_id),
        absence_dates,
        max_hours_per_staff,
        overtime_percent
    )
| |
|
| | |
# Wire the "redistribute shifts" button: on click, call handle_absence_click
# with the absence form values, the current schedule table and the two hour
# limits; its three return values go to the result text, the updated schedule
# table and the absence Gantt image, in that order.
handle_absence_btn.click(
    fn=handle_absence_click,
    inputs=[
        absent_staff,
        absence_start,
        absence_end,
        csv_schedule,
        max_hours_per_staff,
        overtime_percent
    ],
    outputs=[
        absence_result,
        updated_schedule,
        absence_gantt_chart
    ]
)

# Start the app.
# NOTE(review): share=True presumably exposes the app through a public Gradio
# link — confirm that is intended.
# NOTE(review): this call appears before the definitions of create_interface,
# handle_staff_absence and FastScheduler in this file; if launch() blocks at
# module level, those names would not exist yet when callbacks run — confirm
# the intended placement.
iface.launch(share=True)
| |
|
def create_interface():
    """Build the Gradio Blocks layout for the scheduling system and return it.

    The layout has four tabs: schedule input, schedule output, constraints
    and analytics, and the staff absence handler. No event handlers are
    attached inside this function, and only the Blocks container is
    returned, so the individual components created here are not reachable
    through the return value.

    Returns:
        The ``gr.Blocks`` container holding the layout.
    """
    # NOTE(review): the row/column nesting below follows the most natural
    # reading of the layout — confirm against the intended design.
    with gr.Blocks() as demo:
        gr.Markdown("# NEF Scheduling System")

        with gr.Tabs() as tabs:
            # Tab 1: CSV upload next to a preview table.
            with gr.Tab("Schedule Input"):

                with gr.Row():
                    csv_input = gr.File(label="Upload Schedule Data (CSV)")
                    schedule_preview = gr.DataFrame(label="Schedule Preview")

            # Tab 2: generated schedule table and a download button.
            with gr.Tab("Schedule Output"):

                with gr.Row():
                    schedule_output = gr.DataFrame(label="Generated Schedule")
                    download_btn = gr.Button("Download Schedule")

            # Tab 3: read-only constraints text plus two HTML chart slots.
            with gr.Tab("Constraints and Analytics"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Applied Constraints")
                        constraints_text = gr.TextArea(label="", interactive=False)

                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Monthly Distribution")
                        monthly_chart = gr.HTML(label="Monthly Hours Distribution")

                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Weekly Distribution")
                        weekly_charts = gr.HTML(label="Weekly Hours Distribution")

            # Tab 4: absence form on the left, redistribution results on the right.
            with gr.TabItem("Staff Absence Handler"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Handle Staff Absence")
                        absent_staff = gr.Number(label="Staff ID to be absent", precision=0)
                        absence_start = gr.Number(label="Start Day", precision=0)
                        absence_end = gr.Number(label="End Day", precision=0)
                        handle_absence_btn = gr.Button("Redistribute Shifts", variant="primary")

                    with gr.Column():
                        absence_result = gr.TextArea(label="Redistribution Results", interactive=False)
                        updated_schedule = gr.DataFrame(label="Updated Schedule")
                        absence_gantt_chart = gr.Image(label="Absence Schedule Visualization", elem_id="absence_gantt_chart")

    return demo
| |
|
def _respects_rest_period(schedule, staff_id, shift, min_rest_hours):
    """Check the rest gap between `shift` and the staff member's shifts on adjacent days."""
    own_shifts = schedule[schedule['staff_id'] == staff_id]

    day_before = own_shifts[own_shifts['day'] == shift['day'] - 1]
    if not day_before.empty:
        # Hours from the end of the previous day's first listed shift to the
        # start of this one; the +24 accounts for crossing midnight.
        end_time_before = day_before.iloc[0]['end']
        if (shift['start'] + 24 - end_time_before) < min_rest_hours:
            return False

    day_after = own_shifts[own_shifts['day'] == shift['day'] + 1]
    if not day_after.empty:
        # Hours from the end of this shift to the start of the next day's
        # first listed shift.
        start_time_after = day_after.iloc[0]['start']
        if (start_time_after + 24 - shift['end']) < min_rest_hours:
            return False

    return True


def handle_staff_absence(schedule_df, absent_staff_id, absence_dates, max_hours_per_staff,
                         overtime_percent, min_rest_hours=11):
    """Reassign an absent staff member's shifts to the least-loaded eligible colleagues.

    A colleague is eligible for a shift when they have enough remaining hours
    under ``max_hours_per_staff * (1 + overtime_percent / 100)``, have no
    shift of their own on that day, and keep at least ``min_rest_hours``
    between the shift and their shifts on the adjacent days. Among eligible
    colleagues the one with the fewest accumulated hours is chosen, and the
    hour totals are updated after each reassignment.

    The input frame is not modified; all changes are made on a copy.

    Args:
        schedule_df: Schedule table with ``staff_id``, ``day``, ``start``,
            ``end`` and ``duration`` columns.
        absent_staff_id: Id of the absent staff member.
        absence_dates: Collection of day numbers covered by the absence.
        max_hours_per_staff: Base hour limit per staff member.
        overtime_percent: Extra allowance on top of the base limit, in percent.
        min_rest_hours: Minimum hours between shifts on adjacent days.

    Returns:
        tuple: ``(summary, absence_schedule, gantt_path)``. The schedule
        (restricted to the absence days) and chart are returned only when
        every shift was reassigned; otherwise both are ``None`` and the
        summary lists the failures. Any exception is reported as an error
        message in the first element.
    """
    try:
        new_schedule = schedule_df.copy()

        absent_shifts = new_schedule[
            (new_schedule['staff_id'] == absent_staff_id)
            & (new_schedule['day'].isin(absence_dates))
        ]
        if absent_shifts.empty:
            return "No shifts found for the specified staff member on given dates.", None, None

        # Float totals so fractional durations can be added without dtype issues.
        current_hours = new_schedule.groupby('staff_id')['duration'].sum().astype(float)

        # Everyone except the absentee, ordered by current load (lightest first).
        colleagues = sorted(set(new_schedule['staff_id']) - {absent_staff_id})
        available_staff = current_hours.reindex(colleagues).sort_values().index.tolist()

        max_allowed_hours = max_hours_per_staff * (1 + overtime_percent / 100)
        available_hours = {
            staff_id: max_allowed_hours - current_hours.get(staff_id, 0)
            for staff_id in available_staff
        }

        results = []
        unassigned_shifts = []

        for row_label, shift in absent_shifts.iterrows():
            # Checks run against new_schedule so earlier reassignments in this
            # loop are taken into account.
            eligible_staff = [
                staff_id
                for staff_id in available_staff
                if available_hours[staff_id] >= shift['duration']
                and new_schedule[
                    (new_schedule['staff_id'] == staff_id)
                    & (new_schedule['day'] == shift['day'])
                ].empty
                and _respects_rest_period(new_schedule, staff_id, shift, min_rest_hours)
            ]

            if not eligible_staff:
                unassigned_shifts.append(
                    f"Could not reassign shift on Day {shift['day']} ({shift['duration']} hours)"
                )
                continue

            # min() keeps the first of equally loaded candidates.
            best_staff = min(eligible_staff, key=lambda staff_id: current_hours.get(staff_id, 0))

            new_schedule.loc[row_label, 'staff_id'] = best_staff
            available_hours[best_staff] -= shift['duration']
            current_hours[best_staff] = current_hours.get(best_staff, 0) + shift['duration']

            results.append(
                f"Shift on Day {shift['day']} ({shift['duration']} hours) "
                f"reassigned to Staff {best_staff} (current hours: {current_hours[best_staff]:.1f})"
            )

        summary = "\n".join([
            "Shift Redistribution Summary:",
            "----------------------------",
            f"Staff {absent_staff_id} absent for {len(absence_dates)} days",
            f"Successfully reassigned: {len(results)} shifts",
            f"Failed to reassign: {len(unassigned_shifts)} shifts",
            "\nCurrent Hours Distribution:",
            "-------------------------"
        ] + [
            f"Staff {s}: {current_hours.get(s, 0):.1f} hours (of max {max_allowed_hours:.1f})"
            for s in sorted(available_staff)
        ] + [
            "\nReassignment Details:",
            *results,
            "\nUnassigned Shifts:",
            *unassigned_shifts
        ])

        # A partially covered absence is reported but not offered as a
        # schedule, so skip rendering a chart that would be discarded.
        if unassigned_shifts:
            return summary, None, None

        absence_schedule = new_schedule[new_schedule['day'].isin(absence_dates)].copy()
        absence_gantt_path = create_gantt_chart(
            absence_schedule,
            len(absence_dates),
            len(set(absence_schedule['staff_id']))
        )
        return summary, absence_schedule, absence_gantt_path

    except Exception as e:
        return f"Error redistributing shifts: {str(e)}", None, None
| |
|
class FastScheduler:
    """Scheduler state plus helpers that keep start times equal on consecutive days.

    NOTE(review): ``_get_valid_shifts``, which ``_find_optimal_shift`` calls,
    is not among the methods in this section and is expected to be provided
    elsewhere on this class.
    """

    def __init__(self, num_staff, num_days, possible_shifts, staff_requirements, constraints):
        """Store the problem definition and reset the bookkeeping state."""
        # Problem definition, kept exactly as passed in.
        self.num_staff, self.num_days = num_staff, num_days
        self.possible_shifts = possible_shifts
        self.staff_requirements = staff_requirements
        self.constraints = constraints

        # Best result seen so far; infinity marks "nothing found yet".
        self.best_schedule = None
        self.best_score = float('inf')

        # staff_id -> {'last_day': ..., 'last_time': ...} for the most recent
        # assignment made through _find_optimal_shift.
        self.staff_sequences = {}

    def _find_optimal_shift(self, staff_id, day, cycle, staff_hours):
        """Pick the first valid shift for `cycle`, honouring start-time consistency.

        If the staff member's last recorded assignment was on the previous
        day, only shifts with the same start time are considered. On success
        the staff member's record in ``staff_sequences`` is updated.
        ``staff_hours`` is accepted but not used.

        Returns:
            dict describing the assigned shift, or ``None`` if none qualifies.
        """
        candidates = self._get_valid_shifts(cycle)
        if not candidates:
            return None

        previous = self.staff_sequences.get(staff_id)
        if previous and day - previous['last_day'] == 1:
            # Consecutive working day: the start time is locked to yesterday's.
            locked_start = previous['last_time']
            candidates = [option for option in candidates if option['start'] == locked_start]
            if not candidates:
                return None

        chosen = candidates[0]
        assignment = {
            'staff_id': staff_id,
            'day': day,
            'shift_id': chosen['id'],
            'start': chosen['start'],
            'end': chosen['end'],
            'duration': chosen['duration'],
            'cycles_covered': list(chosen['cycles_covered']),
        }

        # Record the assignment only after the result was built successfully.
        self.staff_sequences[staff_id] = {'last_day': day, 'last_time': chosen['start']}
        return assignment

    def _evaluate_schedule(self, schedule):
        """Score a schedule purely on start-time consistency.

        Returns:
            ``0`` when every staff member starts at the same time on any two
            shifts exactly one day apart; ``float('inf')`` when that rule is
            broken or the schedule is empty.
        """
        if not schedule:
            return float('inf')

        # Group the shifts per staff member.
        by_staff = {}
        for entry in schedule:
            by_staff.setdefault(entry['staff_id'], []).append(entry)

        # Walk each person's shifts in day order and compare neighbours.
        for entries in by_staff.values():
            ordered = sorted(entries, key=lambda entry: entry['day'])
            for earlier, later in zip(ordered, ordered[1:]):
                consecutive = later['day'] - earlier['day'] == 1
                if consecutive and later['start'] != earlier['start']:
                    return float('inf')

        return 0

    def _get_reference_time(self, staff_id, day):
        """Return the start time `day` must reuse, or ``None`` if it is unconstrained.

        A start time is returned only when the staff member's last recorded
        assignment was on the day immediately before `day`.
        """
        previous = self.staff_sequences.get(staff_id)
        if not previous:
            return None
        if day - previous['last_day'] != 1:
            return None
        return previous['last_time']

    def _is_new_sequence(self, staff_id, day):
        """Tell whether `day` starts a new run of shifts for the staff member.

        True when nothing is recorded for the staff member, or when at least
        two days separate `day` from the last recorded assignment.
        """
        previous = self.staff_sequences.get(staff_id)
        return True if not previous else (day - previous['last_day']) >= 2