import pandas as pd import numpy as np import pulp as pl # Changed from PuLP to pulp import matplotlib.pyplot as plt import gradio as gr from itertools import product import io import base64 import tempfile import os from datetime import datetime def am_pm(hour): """Converts 24-hour time to AM/PM format.""" period = "AM" if hour >= 12: period = "PM" if hour > 12: hour -= 12 elif hour == 0: hour = 12 # Midnight return f"{int(hour):02d}:00 {period}" def show_dataframe(csv_path): """Reads a CSV file and returns a Pandas DataFrame.""" try: df = pd.read_csv(csv_path) return df except Exception as e: return f"Error loading CSV: {e}" def optimize_staffing( csv_file, beds_per_staff, max_hours_per_staff, # This will now be interpreted as hours per 28-day period hours_per_cycle, rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change, exact_staff_count=None, overtime_percent=100 ): # Load data try: # Handle different types of csv_file input if csv_file is None: raise ValueError("No CSV file provided") if isinstance(csv_file, str): # It's a file path df = pd.read_csv(csv_file) elif hasattr(csv_file, 'name'): # It's an uploaded file object df = pd.read_csv(csv_file.name) elif hasattr(csv_file, 'decode'): # It's a bytes-like object content = csv_file.decode('utf-8') df = pd.read_csv(io.StringIO(content)) else: # Try direct read df = pd.read_csv(csv_file) except Exception as e: print(f"Error loading CSV: {e}") # Create a minimal DataFrame with default values df = pd.DataFrame({ 'Day': list(range(28)), 'Cycle': ['cycle1'] * 28, 'Beds': [10] * 28 }) # Print the loaded data for debugging print("Loaded CSV data:") print(df.head()) print(f"CSV shape: {df.shape}") # Convert beds_per_staff to float BEDS_PER_STAFF = float(beds_per_staff) # Create a dictionary to store demand data demand_dict = {} # Process the CSV data to extract demand information for _, row in df.iterrows(): day = row.get('Day', 0) cycle_name = row.get('Cycle', 'cycle1') beds = row.get('Beds', 0) # Extract cycle start time cycle_start = 0 if cycle_name == 'cycle1': cycle_start = 7 # 7 AM elif cycle_name == 'cycle2': cycle_start = 12 # 12 PM elif cycle_name == 'cycle3': cycle_start = 17 # 5 PM elif cycle_name == 'cycle4': cycle_start = 22 # 10 PM # Calculate required staff based on beds required_staff = max(1, int(beds / BEDS_PER_STAFF)) # Store in demand dictionary demand_dict[(day, cycle_start)] = { 'bed_count': beds, 'required_staff': required_staff, 'clinic_start': clinic_start, 'clinic_end': clinic_end } print(f"Created demand dictionary with {len(demand_dict)} entries") print(f"Sample demand data: {list(demand_dict.items())[:2]}") # Define cycle times cycle_times = { 'cycle1': (clinic_start, (clinic_start + hours_per_cycle) % 24), 'cycle2': ((clinic_start + hours_per_cycle) % 24, (clinic_start + 2 * hours_per_cycle) % 24), 'cycle3': ((clinic_start + 2 * hours_per_cycle) % 24, (clinic_start + 3 * hours_per_cycle) % 24), 'cycle4': ((clinic_start + 3 * hours_per_cycle) % 24, clinic_end) } print(f"Cycle times: {cycle_times}") # Rename the index column if necessary if df.columns[0] not in ['day', 'Day', 'DAY']: df = df.rename(columns={df.columns[0]: 'day'}) # Fill missing values for col in df.columns: if col.startswith('cycle'): df[col] = df[col].fillna(0) # Calculate clinic hours if clinic_end < clinic_start: clinic_hours = 24 - clinic_start + clinic_end else: clinic_hours = clinic_end - clinic_start # Get number of days in the dataset num_days = len(df) # Parameters STANDARD_PERIOD_DAYS = 30 # Standard 4-week period # Scale MAX_HOURS_PER_STAFF based on the ratio of actual days to standard period BASE_MAX_HOURS = float(max_hours_per_staff) # This is for a 28-day period MAX_HOURS_PER_STAFF = BASE_MAX_HOURS * (num_days / STANDARD_PERIOD_DAYS) # Log the adjustment for transparency original_results = f"Input max hours per staff (28-day period): {BASE_MAX_HOURS}\n" original_results += f"Adjusted max hours for {num_days}-day period: {MAX_HOURS_PER_STAFF:.1f}\n\n" HOURS_PER_CYCLE = float(hours_per_cycle) REST_DAYS_PER_WEEK = int(rest_days_per_week) SHIFT_TYPES = [6, 8, 10, 12] # Standard shift types OVERLAP_TIME = float(overlap_time) CLINIC_START = int(clinic_start) CLINIC_END = int(clinic_end) CLINIC_HOURS = clinic_hours MAX_START_TIME_CHANGE = int(max_start_time_change) OVERTIME_ALLOWED = 1 + (overtime_percent / 100) # Convert percentage to multiplier # Calculate staff needed per cycle (beds/BEDS_PER_STAFF, rounded up) for col in df.columns: if col.startswith('cycle') and not col.endswith('_staff'): df[f'{col}_staff'] = np.ceil(df[col] / BEDS_PER_STAFF) # Get cycle names and number of cycles cycle_cols = [col for col in df.columns if col.startswith('cycle') and not col.endswith('_staff')] num_cycles = len(cycle_cols) # Get staff requirements max_staff_needed = max([df[f'{cycle}_staff'].max() for cycle in cycle_cols]) # Define possible shift start times shift_start_times = list(range(CLINIC_START, CLINIC_START + int(CLINIC_HOURS) - min(SHIFT_TYPES) + 1)) # Generate all possible shifts possible_shifts = [] for duration in SHIFT_TYPES: for start_time in shift_start_times: end_time = (start_time + duration) % 24 # Create a shift with its coverage of cycles shift = { 'id': f"{duration}hr_{start_time:02d}", 'start': start_time, 'end': end_time, 'duration': duration, 'cycles_covered': set() } # Determine which cycles this shift covers for cycle, (cycle_start, cycle_end) in cycle_times.items(): # Handle overnight cycles if cycle_end < cycle_start: # overnight cycle if start_time >= cycle_start or end_time <= cycle_end or (start_time < end_time and end_time > cycle_start): shift['cycles_covered'].add(cycle) else: # normal cycle shift_end = end_time if end_time > start_time else end_time + 24 cycle_end_adj = cycle_end if cycle_end > cycle_start else cycle_end + 24 # Check for overlap if not (shift_end <= cycle_start or start_time >= cycle_end_adj): shift['cycles_covered'].add(cycle) if shift['cycles_covered']: # Only add shifts that cover at least one cycle possible_shifts.append(shift) # Estimate minimum number of staff needed - more precise calculation total_staff_hours = 0 for _, row in df.iterrows(): for cycle in cycle_cols: total_staff_hours += row[f'{cycle}_staff'] * HOURS_PER_CYCLE # Calculate theoretical minimum staff with perfect utilization theoretical_min_staff = np.ceil(total_staff_hours / MAX_HOURS_PER_STAFF) # Add a small buffer for rest day constraints min_staff_estimate = np.ceil(theoretical_min_staff * (7 / (7 - REST_DAYS_PER_WEEK))) # Use exact_staff_count if provided, otherwise estimate if exact_staff_count is not None and exact_staff_count > 0: # When exact staff count is provided, only create that many staff in the model estimated_staff = exact_staff_count num_staff_to_create = exact_staff_count # Only create exactly this many staff else: # Add some buffer for constraints like rest days and shift changes estimated_staff = max(min_staff_estimate, max_staff_needed + 1) num_staff_to_create = int(estimated_staff) # Create the estimated number of staff def optimize_schedule(num_staff, time_limit=600): try: # Create a binary linear programming model model = pl.LpProblem("Staff_Scheduling", pl.LpMinimize) # Decision variables x = pl.LpVariable.dicts("shift", [(s, d, shift['id']) for s in range(1, num_staff+1) for d in range(1, num_days+1) for shift in possible_shifts], cat='Binary') # Staff usage variable (1 if staff s is used at all, 0 otherwise) staff_used = pl.LpVariable.dicts("staff_used", range(1, num_staff+1), cat='Binary') # Total hours worked by all staff total_hours = pl.LpVariable("total_hours", lowBound=0) # CRITICAL CHANGE: Remove coverage violation variables - make coverage a hard constraint # CRITICAL CHANGE: Remove overtime variables - make overtime a hard constraint # Objective function now only focuses on minimizing staff count and total hours model += ( 10**10 * pl.lpSum(staff_used[s] for s in range(1, num_staff+1)) + 1 * total_hours ) # Link total_hours to the sum of all hours worked model += total_hours == pl.lpSum(x[(s, d, shift['id'])] * shift['duration'] for s in range(1, num_staff+1) for d in range(1, num_days+1) for shift in possible_shifts) # Link staff_used variable with shift assignments for s in range(1, num_staff+1): model += pl.lpSum(x[(s, d, shift['id'])] for d in range(1, num_days+1) for shift in possible_shifts) <= num_days * staff_used[s] # If staff is used, they must work at least one shift model += pl.lpSum(x[(s, d, shift['id'])] for d in range(1, num_days+1) for shift in possible_shifts) >= staff_used[s] # Maintain staff ordering (to avoid symmetrical solutions) for s in range(1, num_staff): model += staff_used[s] >= staff_used[s+1] # Each staff works at most one shift per day for s in range(1, num_staff+1): for d in range(1, num_days+1): model += pl.lpSum(x[(s, d, shift['id'])] for shift in possible_shifts) <= 1 # Rest day constraints (with some flexibility) min_rest_days = max(1, REST_DAYS_PER_WEEK - 1) for s in range(1, num_staff+1): for w in range((num_days + 6) // 7): week_start = w*7 + 1 week_end = min(week_start + 6, num_days) days_in_this_week = week_end - week_start + 1 if days_in_this_week < 7: adjusted_rest_days = max(1, int(min_rest_days * days_in_this_week / 7)) else: adjusted_rest_days = min_rest_days model += pl.lpSum(x[(s, d, shift['id'])] for d in range(week_start, week_end+1) for shift in possible_shifts) <= days_in_this_week - adjusted_rest_days # HARD CONSTRAINT: No overtime allowed - strict limit at MAX_HOURS_PER_STAFF for s in range(1, num_staff+1): # Calculate total hours worked by this staff staff_hours = pl.lpSum(x[(s, d, shift['id'])] * shift['duration'] for d in range(1, num_days+1) for shift in possible_shifts) # STRICT constraint: No overtime allowed model += staff_hours <= MAX_HOURS_PER_STAFF # HARD CONSTRAINT: Full coverage required for d in range(1, num_days+1): day_index = d - 1 # 0-indexed for DataFrame for cycle in cycle_cols: staff_needed = df.iloc[day_index][f'{cycle}_staff'] # Get all shifts that cover this cycle covering_shifts = [shift for shift in possible_shifts if cycle in shift['cycles_covered']] # Staff assigned must be at least staff_needed - NO VIOLATIONS ALLOWED model += (pl.lpSum(x[(s, d, shift['id'])] for s in range(1, num_staff+1) for shift in covering_shifts) >= staff_needed) # HARD CONSTRAINT: Maximum 60 hours per week for each staff for s in range(1, num_staff+1): for w in range((num_days + 6) // 7): week_start = w*7 + 1 week_end = min(week_start + 6, num_days) # Calculate total hours worked by this staff in this week weekly_hours = pl.lpSum(x[(s, d, shift['id'])] * shift['duration'] for d in range(week_start, week_end+1) for shift in possible_shifts) # STRICT constraint: No more than 60 hours per week model += weekly_hours <= 60 # Solve with extended time limit solver = pl.PULP_CBC_CMD(timeLimit=time_limit, msg=1, gapRel=0.01) # Tighter gap for better solutions model.solve(solver) # Check if a feasible solution was found if model.status == pl.LpStatusOptimal or model.status == pl.LpStatusNotSolved: # Extract the solution schedule = [] for s in range(1, num_staff+1): for d in range(1, num_days+1): for shift in possible_shifts: if pl.value(x[(s, d, shift['id'])]) == 1: # Find the shift details shift_details = next((sh for sh in possible_shifts if sh['id'] == shift['id']), None) schedule.append({ 'staff_id': s, 'day': d, 'shift_id': shift['id'], 'start': shift_details['start'], 'end': shift_details['end'], 'duration': shift_details['duration'], 'cycles_covered': list(shift_details['cycles_covered']) }) return schedule, model.objective.value() else: return None, None except Exception as e: print(f"Error in optimization: {e}") return None, None # Try to solve with estimated number of staff if exact_staff_count is not None and exact_staff_count > 0: # If exact staff count is specified, only try with that count staff_count = int(exact_staff_count) results = f"Using exactly {staff_count} staff as specified...\n" # Try to solve with exactly this many staff schedule, objective = optimize_schedule(staff_count) if schedule is None: results += f"Failed to find a feasible solution with exactly {staff_count} staff.\n" results += "Try increasing the staff count.\n" return results, None, None, None, None, None, None else: # Start from theoretical minimum and work up min_staff = max(1, int(theoretical_min_staff)) # Start from theoretical minimum max_staff = int(min_staff_estimate) + 5 # Allow some buffer results = f"Theoretical minimum staff needed: {theoretical_min_staff:.1f}\n" results += f"Searching for minimum staff count starting from {min_staff}...\n" # Try each staff count from min to max for staff_count in range(min_staff, max_staff + 1): results += f"Trying with {staff_count} staff...\n" # Increase time limit for each attempt to give the solver more time time_limit = 300 + (staff_count - min_staff) * 100 # More time for larger staff counts schedule, objective = optimize_schedule(staff_count, time_limit) if schedule is not None: results += f"Found feasible solution with {staff_count} staff.\n" break if schedule is None: results += "Failed to find a feasible solution with the attempted staff counts.\n" results += "Try increasing the staff count manually or relaxing constraints.\n" return results, None, None, None, None, None, None results += f"Optimal solution found with {staff_count} staff\n" results += f"Total staff hours: {objective}\n" # Convert to DataFrame for analysis schedule_df = pd.DataFrame(schedule) # Analyze staff workload staff_hours = {} for s in range(1, staff_count+1): staff_shifts = schedule_df[schedule_df['staff_id'] == s] total_hours = staff_shifts['duration'].sum() staff_hours[s] = total_hours # After calculating staff hours, filter out staff with 0 hours before displaying active_staff_hours = {s: hours for s, hours in staff_hours.items() if hours > 0} results += "\nStaff Hours:\n" for staff_id, hours in active_staff_hours.items(): utilization = (hours / MAX_HOURS_PER_STAFF) * 100 results += f"Staff {staff_id}: {hours} hours ({utilization:.1f}% utilization)\n" # Add overtime information if hours > MAX_HOURS_PER_STAFF: overtime = hours - MAX_HOURS_PER_STAFF overtime_percent = (overtime / MAX_HOURS_PER_STAFF) * 100 results += f" Overtime: {overtime:.1f} hours ({overtime_percent:.1f}%)\n" # Use active_staff_hours for average utilization calculation active_staff_count = len(active_staff_hours) avg_utilization = sum(active_staff_hours.values()) / (active_staff_count * MAX_HOURS_PER_STAFF) * 100 results += f"\nAverage staff utilization: {avg_utilization:.1f}%\n" # Check coverage for each day and cycle coverage_check = [] for d in range(1, num_days+1): day_index = d - 1 # 0-indexed for DataFrame day_schedule = schedule_df[schedule_df['day'] == d] for cycle in cycle_cols: required = df.iloc[day_index][f'{cycle}_staff'] # Count staff covering this cycle assigned = sum(1 for _, shift in day_schedule.iterrows() if cycle in shift['cycles_covered']) coverage_check.append({ 'day': d, 'cycle': cycle, 'required': required, 'assigned': assigned, 'satisfied': assigned >= required }) coverage_df = pd.DataFrame(coverage_check) satisfaction = coverage_df['satisfied'].mean() * 100 results += f"Coverage satisfaction: {satisfaction:.1f}%\n" # NEW: Check for partial coverage and fill gaps if satisfaction < 100 or True: # Always check for partial coverage results += "Checking for partial coverage and filling gaps...\n" print("\n\n==== STARTING GAP FILLING PROCESS ====") try: # Create a dictionary-based schedule for gap filling dict_schedule = {} for d in range(1, num_days+1): dict_schedule[d] = {} for cycle in cycle_cols: dict_schedule[d][cycle] = {} # Fill the dictionary schedule with current assignments for _, shift in schedule_df.iterrows(): staff_id = shift['staff_id'] day = shift['day'] start = int(shift['start']) # Ensure integer end = int(shift['end']) # Ensure integer for cycle in shift['cycles_covered']: if staff_id not in dict_schedule[day][cycle]: dict_schedule[day][cycle][staff_id] = [] dict_schedule[day][cycle][staff_id].append((start, end)) # Create staff objects for gap filling - use only existing staff class StaffMember: def __init__(self, staff_id): self.id = staff_id self.name = str(staff_id) # Only use staff that are already in the schedule active_staff_ids = sorted(schedule_df['staff_id'].unique()) staff_list = [StaffMember(s) for s in active_staff_ids] print(f"Created {len(staff_list)} staff members for gap filling (using only existing staff)") # Create demand dictionary for each day and cycle demand_dict = {} for d in range(1, num_days+1): day_index = d - 1 # 0-indexed for DataFrame demand_dict[d] = {} for cycle in cycle_cols: # Get the actual bed count and required staff bed_count = df.iloc[day_index][cycle] required_staff = df.iloc[day_index][f'{cycle}_staff'] demand_dict[d][cycle] = { 'beds': bed_count, 'required_staff': required_staff, 'beds_per_staff': BEDS_PER_STAFF } # Fill gaps print("Calling assign_uncovered_hours function with demand data...") updated_schedule = assign_uncovered_hours(staff_list, dict_schedule, cycle_times, demand_dict, BEDS_PER_STAFF) print("Returned from assign_uncovered_hours function") # Convert back to DataFrame format new_schedule = [] for day, day_schedule in updated_schedule.items(): for cycle, staff_shifts in day_schedule.items(): for staff_id, shifts in staff_shifts.items(): for start, end in shifts: # Find if this is a new shift or existing one existing = False for idx, row in schedule_df.iterrows(): if (row['staff_id'] == int(staff_id) and row['day'] == day and cycle in row['cycles_covered'] and row['start'] == start and row['end'] == end): existing = True break if not existing: # This is a new shift added to fill a gap duration = end - start if end > start else end + 24 - start new_schedule.append({ 'staff_id': int(staff_id), 'day': day, 'shift_id': f"gap_{start:02d}_{end:02d}", 'start': start, 'end': end, 'duration': duration, 'cycles_covered': [cycle] }) print(f"Added new shift: Staff {staff_id}, Day {day}, {start}:00-{end}:00, Cycle {cycle}") # Add new shifts to the schedule if new_schedule: print(f"Adding {len(new_schedule)} new shifts to the schedule") results += f"Added {len(new_schedule)} new shifts to fill coverage gaps\n" new_shifts_df = pd.DataFrame(new_schedule) schedule_df = pd.concat([schedule_df, new_shifts_df], ignore_index=True) # Force regeneration of CSV and Gantt chart print("Regenerating CSV and Gantt chart with updated schedule") # Recheck coverage after adding new shifts coverage_check = [] for d in range(1, num_days+1): day_index = d - 1 # 0-indexed for DataFrame day_schedule = schedule_df[schedule_df['day'] == d] for cycle in cycle_cols: required = df.iloc[day_index][f'{cycle}_staff'] # Count staff covering this cycle assigned = sum(1 for _, shift in day_schedule.iterrows() if cycle in shift['cycles_covered']) # Check for partial coverage cycle_start, cycle_end = cycle_times[cycle] cycle_duration = cycle_end - cycle_start if cycle_end > cycle_start else cycle_end + 24 - cycle_start # Create hourly timeline to check complete coverage timeline = [0] * cycle_duration # Mark covered hours for _, shift in day_schedule.iterrows(): if cycle in shift['cycles_covered']: start = int(shift['start']) end = int(shift['end']) # Handle overnight shifts if end < start: end += 24 # Calculate relative positions if cycle_end < cycle_start: # overnight cycle if start >= cycle_start: rel_start = start - cycle_start else: rel_start = start + 24 - cycle_start if end >= cycle_start: rel_end = end - cycle_start else: rel_end = end + 24 - cycle_start else: rel_start = max(0, start - cycle_start) rel_end = min(cycle_duration, end - cycle_start) # Ensure bounds rel_start = max(0, min(rel_start, cycle_duration)) rel_end = max(0, min(rel_end, cycle_duration)) # Mark hours for hour in range(rel_start, rel_end): if 0 <= hour < len(timeline): timeline[hour] += 1 # Check if all hours have enough staff fully_covered = all(count >= required for count in timeline) coverage_check.append({ 'day': d, 'cycle': cycle, 'required': required, 'assigned': assigned, 'satisfied': assigned >= required and fully_covered }) new_coverage_df = pd.DataFrame(coverage_check) new_satisfaction = new_coverage_df['satisfied'].mean() * 100 results += f"Coverage satisfaction after gap filling: {new_satisfaction:.1f}%\n" if new_satisfaction < 100: results += "Warning: Some coverage gaps still remain after filling!\n" still_unsatisfied = new_coverage_df[~new_coverage_df['satisfied']] results += still_unsatisfied.to_string() + "\n" else: results += "All coverage gaps successfully filled!\n" else: print("No new shifts were added") results += "No coverage gaps were found or all gaps could not be filled\n" print("==== FINISHED GAP FILLING PROCESS ====\n\n") except Exception as e: print(f"ERROR in gap filling process: {str(e)}") results += f"Error during gap filling: {str(e)}\n" import traceback traceback.print_exc() if satisfaction < 100: results += "Warning: Not all staffing requirements are met!\n" unsatisfied = coverage_df[~coverage_df['satisfied']] results += unsatisfied.to_string() + "\n" # Generate detailed schedule report detailed_schedule = "Detailed Schedule:\n" for d in range(1, num_days+1): day_schedule = schedule_df[schedule_df['day'] == d] day_schedule = day_schedule.sort_values(['start']) detailed_schedule += f"\nDay {d}:\n" for _, shift in day_schedule.iterrows(): start_hour = shift['start'] end_hour = shift['end'] start_str = am_pm(start_hour) end_str = am_pm(end_hour) cycles = ", ".join(shift['cycles_covered']) detailed_schedule += f" Staff {shift['staff_id']}: {start_str}-{end_str} ({shift['duration']} hrs), Cycles: {cycles}\n" # Generate schedule visualization fig, ax = plt.subplots(figsize=(15, 8)) # Prepare schedule for plotting staff_days = {} for s in range(1, staff_count+1): staff_days[s] = [0] * num_days # 0 means off duty for _, shift in schedule_df.iterrows(): staff_id = shift['staff_id'] day = shift['day'] - 1 # 0-indexed staff_days[staff_id][day] = shift['duration'] # Plot the schedule for s, hours in staff_days.items(): ax.bar(range(1, num_days+1), hours, label=f'Staff {s}') ax.set_xlabel('Day') ax.set_ylabel('Shift Hours') ax.set_title('Staff Schedule') ax.set_xticks(range(1, num_days+1)) ax.legend() # Save the figure to a temporary file plot_path = None with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f: plt.savefig(f.name) plt.close(fig) plot_path = f.name # Create a Gantt chart with advanced visuals and alternating labels - only showing active staff gantt_path = create_gantt_chart(schedule_df, num_days, staff_count) # Convert schedule to CSV data schedule_df['start_ampm'] = schedule_df['start'].apply(am_pm) schedule_df['end_ampm'] = schedule_df['end'].apply(am_pm) schedule_csv = schedule_df[['staff_id', 'day', 'start_ampm', 'end_ampm', 'duration', 'cycles_covered']].to_csv(index=False) # Create a temporary file and write the CSV data into it with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file: temp_file.write(schedule_csv) schedule_csv_path = temp_file.name # Create staff assignment table staff_assignment_data = [] for d in range(1, num_days + 1): cycle_staff = {} for cycle in cycle_cols: # Get staff IDs assigned to this cycle on this day staff_ids = schedule_df[(schedule_df['day'] == d) & (schedule_df['cycles_covered'].apply(lambda x: cycle in x))]['staff_id'].tolist() cycle_staff[cycle] = len(staff_ids) staff_assignment_data.append([d] + [cycle_staff[cycle] for cycle in cycle_cols]) staff_assignment_df = pd.DataFrame(staff_assignment_data, columns=['Day'] + cycle_cols) # Create CSV files for download staff_assignment_csv_path = None with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file: staff_assignment_df.to_csv(temp_file.name, index=False) staff_assignment_csv_path = temp_file.name # Return all required values in the correct order return results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path def convert_to_24h(time_str): """Converts AM/PM time string to 24-hour format.""" try: time_obj = datetime.strptime(time_str, "%I:00 %p") return time_obj.hour except ValueError: return None def gradio_wrapper( csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change, exact_staff_count=None, overtime_percent=100 ): # Convert AM/PM times to 24-hour format clinic_start = convert_to_24h(clinic_start_ampm) clinic_end = convert_to_24h(clinic_end_ampm) # Call the optimization function print(f"Starting optimization with gap filling enabled...") try: # Check if CSV file is provided if csv_file is None: print("Error: No CSV file provided") return None, None, None, None, None, None # Print file info for debugging if hasattr(csv_file, 'name'): print(f"CSV file name: {csv_file.name}") results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path = optimize_staffing( csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change, exact_staff_count, overtime_percent ) print("Optimization completed successfully") # Create downloadable CSV files staff_download = create_download_link(staff_assignment_df, "staff_assignment.csv") if staff_assignment_df is not None else None schedule_download_file = create_download_link(schedule_df, "schedule.csv") if schedule_df is not None else None # Return all outputs return staff_assignment_df, gantt_path, schedule_csv_path, plot_path, staff_download, schedule_download_file except Exception as e: print(f"Error in gradio_wrapper: {str(e)}") import traceback traceback.print_exc() return None, None, None, None, None, None # Create a Gantt chart with advanced visuals and alternating labels - only showing active staff def create_gantt_chart(schedule_df, num_days, staff_count): # Get the list of active staff IDs (staff who have at least one shift) active_staff_ids = sorted(schedule_df['staff_id'].unique()) active_staff_count = len(active_staff_ids) # Create a mapping from original staff ID to position in the chart staff_position = {staff_id: i+1 for i, staff_id in enumerate(active_staff_ids)} # Create a larger figure with higher DPI plt.figure(figsize=(max(30, num_days * 1.5), max(12, active_staff_count * 0.8)), dpi=200) # Use a more sophisticated color palette - only for active staff colors = plt.cm.viridis(np.linspace(0.1, 0.9, active_staff_count)) # Set a modern style plt.style.use('seaborn-v0_8-whitegrid') # Create a new axis with a slight background color ax = plt.gca() ax.set_facecolor('#f8f9fa') # Sort by staff then day schedule_df = schedule_df.sort_values(['staff_id', 'day']) # Plot Gantt chart - only for active staff for i, staff_id in enumerate(active_staff_ids): staff_shifts = schedule_df[schedule_df['staff_id'] == staff_id] y_pos = active_staff_count - i # Position based on index in active staff list # Add staff label with a background box ax.text(-0.7, y_pos, f"Staff {staff_id}", fontsize=12, fontweight='bold', ha='right', va='center', bbox=dict(facecolor='white', edgecolor='gray', boxstyle='round,pad=0.5', alpha=0.9)) # Add a subtle background for each staff row ax.axhspan(y_pos-0.4, y_pos+0.4, color='white', alpha=0.4, zorder=-5) # Track shift positions to avoid label overlap shift_positions = [] for idx, shift in enumerate(staff_shifts.iterrows()): _, shift = shift day = shift['day'] start_hour = shift['start'] end_hour = shift['end'] duration = shift['duration'] # Format times for display start_ampm = am_pm(start_hour) end_ampm = am_pm(end_hour) # Calculate shift position shift_start_pos = day-1+start_hour/24 # Handle overnight shifts if end_hour < start_hour: # Overnight shift # First part of shift (until midnight) rect1 = ax.barh(y_pos, (24-start_hour)/24, left=shift_start_pos, height=0.6, color=colors[i], alpha=0.9, edgecolor='black', linewidth=1, zorder=10) # Add gradient effect for r in rect1: r.set_edgecolor('black') r.set_linewidth(1) # Second part of shift (after midnight) rect2 = ax.barh(y_pos, end_hour/24, left=day, height=0.6, color=colors[i], alpha=0.9, edgecolor='black', linewidth=1, zorder=10) # Add gradient effect for r in rect2: r.set_edgecolor('black') r.set_linewidth(1) # For overnight shifts, we'll place the label in the first part if it's long enough shift_width = (24-start_hour)/24 if shift_width >= 0.1: # Only add label if there's enough space label_pos = shift_start_pos + shift_width/2 # Alternate labels above and below y_offset = 0.35 if idx % 2 == 0 else -0.35 # Add label with background for better readability label = f"{start_ampm}-{end_ampm}" text = ax.text(label_pos, y_pos + y_offset, label, ha='center', va='center', fontsize=9, fontweight='bold', color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3, boxstyle='round,pad=0.3', edgecolor='gray'), zorder=20) shift_positions.append(label_pos) else: # Regular shift shift_width = duration/24 rect = ax.barh(y_pos, shift_width, left=shift_start_pos, height=0.6, color=colors[i], alpha=0.9, edgecolor='black', linewidth=1, zorder=10) # Add gradient effect for r in rect: r.set_edgecolor('black') r.set_linewidth(1) # Only add label if there's enough space if shift_width >= 0.1: label_pos = shift_start_pos + shift_width/2 # Alternate labels above and below y_offset = 0.35 if idx % 2 == 0 else -0.35 # Add label with background for better readability label = f"{start_ampm}-{end_ampm}" text = ax.text(label_pos, y_pos + y_offset, label, ha='center', va='center', fontsize=9, fontweight='bold', color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3, boxstyle='round,pad=0.3', edgecolor='gray'), zorder=20) shift_positions.append(label_pos) # Add weekend highlighting with a more sophisticated look for day in range(1, num_days + 1): # Determine if this is a weekend (assuming day 1 is Monday) is_weekend = (day % 7 == 0) or (day % 7 == 6) # Saturday or Sunday if is_weekend: ax.axvspan(day-1, day, alpha=0.15, color='#ff9999', zorder=-10) day_label = "Saturday" if day % 7 == 6 else "Sunday" ax.text(day-0.5, 0.2, day_label, ha='center', fontsize=10, color='#cc0000', fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, pad=2, boxstyle='round')) # Set x-axis ticks for each day with better formatting ax.set_xticks(np.arange(0.5, num_days, 1)) day_labels = [f"Day {d}" for d in range(1, num_days+1)] ax.set_xticklabels(day_labels, rotation=0, ha='center', fontsize=10) # Add vertical lines between days with better styling for day in range(1, num_days): ax.axvline(x=day, color='#aaaaaa', linestyle='-', alpha=0.5, zorder=-5) # Set y-axis ticks for each staff ax.set_yticks(np.arange(1, active_staff_count+1)) ax.set_yticklabels([]) # Remove default labels as we've added custom ones # Set axis limits with some padding ax.set_xlim(-0.8, num_days) ax.set_ylim(0.5, active_staff_count + 0.5) # Add grid for hours (every 6 hours) with better styling for day in range(num_days): for hour in [6, 12, 18]: ax.axvline(x=day + hour/24, color='#cccccc', linestyle=':', alpha=0.5, zorder=-5) # Add small hour markers at the bottom hour_label = "6AM" if hour == 6 else "Noon" if hour == 12 else "6PM" ax.text(day + hour/24, 0, hour_label, ha='center', va='bottom', fontsize=7, color='#666666', rotation=90, alpha=0.7) # Add title and labels with more sophisticated styling plt.title(f'Staff Schedule ({active_staff_count} Active Staff)', fontsize=24, fontweight='bold', pad=20, color='#333333') plt.xlabel('Day', fontsize=16, labelpad=10, color='#333333') # Add a legend for time reference with better styling time_box = plt.figtext(0.01, 0.01, "Time Reference:", ha='left', fontsize=10, fontweight='bold', color='#333333') time_markers = ['6 AM', 'Noon', '6 PM', 'Midnight'] for i, time in enumerate(time_markers): plt.figtext(0.08 + i*0.06, 0.01, time, ha='left', fontsize=9, color='#555555') # Remove spines for spine in ['top', 'right', 'left']: ax.spines[spine].set_visible(False) # Add a note about weekends with better styling weekend_note = plt.figtext(0.01, 0.97, "Red areas = Weekends", fontsize=12, color='#cc0000', fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, pad=5, boxstyle='round')) # Add a subtle border around the entire chart plt.box(False) # Save the Gantt chart with high quality with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f: plt.tight_layout() plt.savefig(f.name, dpi=200, bbox_inches='tight', facecolor='white') plt.close() return f.name def is_hour_within_clinic_hours(hour, clinic_start, clinic_end): """ Check if an hour is within clinic operating hours. Args: hour: Hour to check (0-23) clinic_start: Clinic start hour (0-23) clinic_end: Clinic end hour (0-23) Returns: bool: True if hour is within clinic hours, False otherwise """ try: # Convert inputs to integers if they're not already hour = int(hour) clinic_start = int(clinic_start) clinic_end = int(clinic_end) # Handle overnight clinic (end time is less than start time) if clinic_end < clinic_start: return hour >= clinic_start or hour < clinic_end else: return hour >= clinic_start and hour < clinic_end except Exception as e: print(f"WARNING: Error in is_hour_within_clinic_hours: {e}") # Default to True if there's an error return True def is_cycle_within_clinic_hours(cycle_start, cycle_end, clinic_start, clinic_end): """ Check if a cycle overlaps with clinic operating hours. Args: cycle_start: Cycle start hour (0-23) cycle_end: Cycle end hour (0-23) clinic_start: Clinic start hour (0-23) clinic_end: Clinic end hour (0-23) Returns: bool: True if cycle overlaps with clinic hours, False otherwise """ try: # Convert inputs to integers if they're not already cycle_start = int(cycle_start) cycle_end = int(cycle_end) clinic_start = int(clinic_start) clinic_end = int(clinic_end) # If cycle is overnight (end time is less than start time) if cycle_end < cycle_start: # If clinic is also overnight if clinic_end < clinic_start: # There will always be some overlap return True else: # Check if any part of the cycle is within clinic hours return not (cycle_end <= clinic_start or cycle_start >= clinic_end) else: # If clinic is overnight if clinic_end < clinic_start: # Check if any part of the cycle is within clinic hours return not (cycle_start >= clinic_end and cycle_end <= clinic_start) else: # Check if any part of the cycle is within clinic hours return not (cycle_end <= clinic_start or cycle_start >= clinic_end) except Exception as e: print(f"WARNING: Error in is_cycle_within_clinic_hours: {e}") # Default to True if there's an error return True def fill_coverage_gaps(staff_list, schedule, cycle_hours, demand_dict, beds_per_staff): """ A completely rewritten function to fill coverage gaps by assigning additional shifts to staff. This function is specifically designed to work with the dictionary structure in the code. Args: staff_list: List of staff members schedule: Current schedule (dictionary format) cycle_hours: Hours per cycle demand_dict: Dictionary with demand data beds_per_staff: Number of beds per staff Returns: Updated schedule with gaps filled """ print("\n=== FILLING COVERAGE GAPS (DIRECT APPROACH) ===") # Check if schedule is a dictionary or DataFrame is_dict_schedule = isinstance(schedule, dict) print(f"Schedule format: {'Dictionary' if is_dict_schedule else 'DataFrame'}") if not is_dict_schedule: print("WARNING: Expected dictionary schedule but got DataFrame. Converting to dictionary.") # Convert DataFrame to dictionary if needed dict_schedule = {} for _, row in schedule.iterrows(): day = row['Day'] if day not in dict_schedule: dict_schedule[day] = [] dict_schedule[day].append({ 'staff_id': row['Staff'], 'start': row['Start'], 'end': row['End'], 'type': row.get('Type', 'Regular') }) schedule = dict_schedule # Create a copy of the schedule to avoid modifying the original updated_schedule = {k: v.copy() for k, v in schedule.items()} # Print some debug info about the schedule structure print(f"Schedule has {len(updated_schedule)} days") print(f"Schedule keys: {list(updated_schedule.keys())[:5]} (showing first 5)") # Check if day keys are strings or integers day_keys_are_strings = False if updated_schedule: sample_key = next(iter(updated_schedule)) if isinstance(sample_key, str): day_keys_are_strings = True print("Day keys are strings") else: print("Day keys are integers") # Try to print a sample shift if available if updated_schedule: sample_day = next(iter(updated_schedule)) print(f"Sample day {sample_day} has {len(updated_schedule[sample_day])} shifts") if updated_schedule[sample_day]: try: print(f"First shift on day {sample_day}: {updated_schedule[sample_day][0]}") except (IndexError, KeyError) as e: print(f"Error accessing first shift: {e}") print(f"Shifts for day {sample_day}: {updated_schedule[sample_day]}") # Debug the demand_dict structure print("\nDemand Dictionary Structure:") print(f"Demand dict has {len(demand_dict)} entries") if demand_dict: print(f"Demand dict keys (first 5): {list(demand_dict.keys())[:5]}") sample_key = next(iter(demand_dict)) print(f"Sample demand data for key {sample_key}: {demand_dict[sample_key]}") else: print("WARNING: Demand dictionary is empty!") # Extract the actual cycles from cycle_hours if it's a dictionary cycles = [] if isinstance(cycle_hours, dict): print(f"Cycle hours dictionary: {cycle_hours}") # Check if it has the format {'cycle1': (start, end), ...} for key, value in cycle_hours.items(): if key.startswith('cycle') and isinstance(value, tuple) and len(value) == 2: cycles.append(value) # If we couldn't extract cycles, try to get them from demand_dict if not cycles: for key in demand_dict: if isinstance(key, tuple) and len(key) == 2: day, cycle_start = key if cycle_start not in [c[0] for c in cycles]: # Find the cycle end cycle_end = None for other_key in demand_dict: if isinstance(other_key, tuple) and len(other_key) == 2: other_day, other_cycle_start = other_key if other_day == day and other_cycle_start > cycle_start: if cycle_end is None or other_cycle_start < cycle_end: cycle_end = other_cycle_start # If we couldn't find the next cycle, assume it's 5 hours later if cycle_end is None: cycle_end = (cycle_start + 5) % 24 cycles.append((cycle_start, cycle_end)) # If we still don't have cycles, use default ones if not cycles: # Use the clinic hours to determine cycles clinic_start = 7 # Default clinic_end = 3 # Default # Try to extract from demand_dict for key, value in demand_dict.items(): if isinstance(value, dict): if 'clinic_start' in value: clinic_start = value['clinic_start'] if 'clinic_end' in value: clinic_end = value['clinic_end'] break # Create 4 equal cycles covering the clinic hours if clinic_end < clinic_start: # Overnight clinic total_hours = (24 - clinic_start) + clinic_end else: total_hours = clinic_end - clinic_start cycle_length = max(1, total_hours // 4) cycles = [] for i in range(4): cycle_start = (clinic_start + (i * cycle_length)) % 24 cycle_end = (cycle_start + cycle_length) % 24 cycles.append((cycle_start, cycle_end)) # Sort cycles by start time cycles.sort(key=lambda x: x[0]) print(f"Using cycles: {cycles}") # Extract staff IDs from staff_list staff_ids = [] for staff in staff_list: if hasattr(staff, 'id'): staff_ids.append(staff.id) else: staff_ids.append(staff) print(f"Staff IDs: {staff_ids}") # Calculate current monthly hours for each staff directly from the schedule monthly_hours = {staff_id: 0 for staff_id in staff_ids} # First, convert the nested dictionary schedule to a flat list of shifts for easier processing all_shifts = [] for day_key, day_data in updated_schedule.items(): day = int(day_key) if isinstance(day_key, str) and day_key.isdigit() else day_key # Handle different schedule formats if isinstance(day_data, dict): # Format: {day: {cycle: {staff_id: [(start, end), ...], ...}, ...}, ...} for cycle_key, cycle_data in day_data.items(): if isinstance(cycle_data, dict): for staff_id, shifts in cycle_data.items(): if staff_id in staff_ids: for shift in shifts: if isinstance(shift, tuple) and len(shift) == 2: start_hour, end_hour = shift all_shifts.append({ 'day': day, 'staff_id': staff_id, 'start': start_hour, 'end': end_hour }) elif isinstance(day_data, list): # Format: {day: [{staff_id: ..., start: ..., end: ...}, ...], ...} for shift in day_data: if isinstance(shift, dict): staff_id = shift.get('staff_id') if staff_id in staff_ids: start_hour = shift.get('start') end_hour = shift.get('end') if start_hour is not None and end_hour is not None: all_shifts.append({ 'day': day, 'staff_id': staff_id, 'start': start_hour, 'end': end_hour }) # Calculate hours from the flat list of shifts for shift in all_shifts: staff_id = shift['staff_id'] start_hour = shift['start'] end_hour = shift['end'] # Calculate shift hours if end_hour < start_hour: # Overnight shift shift_hours = (24 - start_hour) + end_hour else: shift_hours = end_hour - start_hour monthly_hours[staff_id] += shift_hours # Print monthly hours for each staff for staff_id, hours in monthly_hours.items(): print(f"Staff {staff_id} current monthly hours: {hours}") # Sort staff by monthly hours (lowest first) sorted_staff_ids = sorted(staff_ids, key=lambda x: monthly_hours.get(x, 0)) print(f"Staff sorted by monthly hours: {sorted_staff_ids}") # Track if any new shifts were added new_shifts_added = False # Process each day for day in range(28): # Convert day to the format used in the schedule day_key = str(day) if day_keys_are_strings else day # For each day, create a timeline of staff coverage timeline = [0] * 24 # Fill the timeline based on current schedule if day_key in updated_schedule: day_data = updated_schedule[day_key] # Handle different schedule formats if isinstance(day_data, dict): # Format: {day: {cycle: {staff_id: [(start, end), ...], ...}, ...}, ...} for cycle_key, cycle_data in day_data.items(): if isinstance(cycle_data, dict): for staff_id, shifts in cycle_data.items(): for shift in shifts: if isinstance(shift, tuple) and len(shift) == 2: start_hour, end_hour = shift # Handle overnight shifts if end_hour < start_hour: # Add staff for hours until midnight for i in range(start_hour, 24): timeline[i] += 1 # Add staff for hours after midnight for i in range(0, end_hour): timeline[i] += 1 else: # Add staff for all hours in the shift for i in range(start_hour, end_hour): timeline[i] += 1 elif isinstance(day_data, list): # Format: {day: [{staff_id: ..., start: ..., end: ...}, ...], ...} for shift in day_data: if isinstance(shift, dict): start_hour = shift.get('start') end_hour = shift.get('end') if start_hour is not None and end_hour is not None: # Handle overnight shifts if end_hour < start_hour: # Add staff for hours until midnight for i in range(start_hour, 24): timeline[i] += 1 # Add staff for hours after midnight for i in range(0, end_hour): timeline[i] += 1 else: # Add staff for all hours in the shift for i in range(start_hour, end_hour): timeline[i] += 1 # Print the timeline for this day print(f"\nDay {day} timeline: {timeline}") # Check each cycle for understaffing for cycle_start, cycle_end in cycles: # Get the required staff for this cycle required_staff = 0 bed_count = 0 # Try different demand data formats if day in demand_dict: day_demand = demand_dict[day] # Format: {day: {'cycle1': {'beds': ..., 'required_staff': ...}, ...}, ...} if isinstance(day_demand, dict): cycle_key = None for key in day_demand.keys(): if key == f'cycle{cycles.index((cycle_start, cycle_end)) + 1}': cycle_key = key break if cycle_key and cycle_key in day_demand: cycle_demand = day_demand[cycle_key] if isinstance(cycle_demand, dict): bed_count = cycle_demand.get('beds', 0) required_staff = cycle_demand.get('required_staff', 0) print(f"Found demand data for day {day}, cycle {cycle_key}: beds={bed_count}, required_staff={required_staff}") # Try tuple format: (day, cycle_start): {'bed_count': ..., 'required_staff': ...} demand_key = (day, cycle_start) if demand_key in demand_dict: demand_data = demand_dict[demand_key] if isinstance(demand_data, dict): bed_count = demand_data.get('bed_count', 0) required_staff = demand_data.get('required_staff', 0) print(f"Found demand data for day {day}, cycle {cycle_start}: bed_count={bed_count}, required_staff={required_staff}") # If we still don't have required_staff, calculate it from bed_count if required_staff == 0 and bed_count > 0: required_staff = max(1, int(bed_count / beds_per_staff)) print(f"Calculated required_staff from bed_count: {required_staff}") # Skip if no staff required if required_staff == 0: print(f"No staff required for day {day}, cycle {cycle_start}-{cycle_end}") continue print(f"\nChecking day {day}, cycle {cycle_start}-{cycle_end}") print(f"Required staff: {required_staff}") # Check each hour in the cycle for understaffing understaffed_hours = [] # Handle overnight cycles if cycle_end < cycle_start: hour_range = list(range(cycle_start, 24)) + list(range(0, cycle_end)) else: hour_range = range(cycle_start, cycle_end) for hour in hour_range: current_staff = timeline[hour] print(f"Hour {hour}: {current_staff} staff (need {required_staff})") if current_staff < required_staff: understaffed_hours.append(hour) if not understaffed_hours: print(f"No understaffing in this cycle") continue print(f"Understaffed hours: {understaffed_hours}") # Group consecutive hours hour_groups = [] current_group = [understaffed_hours[0]] for i in range(1, len(understaffed_hours)): if understaffed_hours[i] == (understaffed_hours[i-1] + 1) % 24: current_group.append(understaffed_hours[i]) else: hour_groups.append(current_group) current_group = [understaffed_hours[i]] if current_group: hour_groups.append(current_group) print(f"Grouped into: {hour_groups}") # Try to assign each group to available staff for group in hour_groups: start_hour = group[0] end_hour = (group[-1] + 1) % 24 # End hour is exclusive print(f"Trying to assign period on day {day}: {start_hour}:00 to {end_hour if end_hour > 0 else 24}:00") # Calculate how many additional staff are needed # Ensure we're using integers for the range function current_min_staff = min([timeline[h] for h in group]) # Convert required_staff to int to avoid numpy.float64 issues required_staff_int = int(required_staff) staff_needed = max(0, required_staff_int - current_min_staff) print(f"Need {staff_needed} additional staff (required={required_staff_int}, current={current_min_staff})") # Try to assign to staff with lowest monthly hours for _ in range(staff_needed): assigned = False for staff_id in sorted_staff_ids: print(f"Checking if staff {staff_id} is available") # Check if staff is available for this period is_available = True # Check for conflicts with existing shifts for shift in all_shifts: if shift['staff_id'] == staff_id: # Check if shift is on the same day if shift['day'] == day: shift_start = shift['start'] shift_end = shift['end'] # Check for overlap if shift_end < shift_start: # Overnight shift # New shift overlaps with first part of overnight shift if start_hour < shift_end: is_available = False print(f" Conflict: Overlaps with first part of overnight shift {shift_start}-{shift_end}") break # New shift overlaps with second part of overnight shift if end_hour > shift_start: is_available = False print(f" Conflict: Overlaps with second part of overnight shift {shift_start}-{shift_end}") break else: # Regular shift # Simple overlap check if start_hour < shift_end and end_hour > shift_start: is_available = False print(f" Conflict: Overlaps with regular shift {shift_start}-{shift_end}") break # Check if shift is on the previous day and extends into this day elif shift['day'] == (day - 1) % 28: shift_start = shift['start'] shift_end = shift['end'] # Only check overnight shifts if shift_end < shift_start and start_hour < shift_end: is_available = False print(f" Conflict: Overlaps with previous day's overnight shift {shift_start}-{shift_end}") break # Check if this would be an overnight shift that conflicts with next day elif shift['day'] == (day + 1) % 28 and end_hour < start_hour: shift_start = shift['start'] # Check if overnight shift extends into next day's shift if end_hour > shift_start: is_available = False print(f" Conflict: Overnight shift would extend into next day's shift at {shift_start}") break if is_available: # Create the new shift new_shift = { 'day': day, 'staff_id': staff_id, 'start': start_hour, 'end': end_hour } # Add to all_shifts for future conflict checking all_shifts.append(new_shift) # Add the assignment to the schedule if day_key not in updated_schedule: # Create a new day entry in the format that matches the rest of the schedule if any(isinstance(updated_schedule.get(k), dict) for k in updated_schedule): # Dictionary format cycle_idx = next((i for i, (cs, ce) in enumerate(cycles) if cs == cycle_start), 0) cycle_key = f"cycle{cycle_idx + 1}" updated_schedule[day_key] = {cycle_key: {staff_id: [(start_hour, end_hour)]}} else: # List format updated_schedule[day_key] = [{ 'staff_id': staff_id, 'start': start_hour, 'end': end_hour, 'type': 'Gap Fill' }] else: # Add to existing day entry day_data = updated_schedule[day_key] if isinstance(day_data, dict): # Dictionary format cycle_idx = next((i for i, (cs, ce) in enumerate(cycles) if cs == cycle_start), 0) cycle_key = f"cycle{cycle_idx + 1}" if cycle_key not in day_data: day_data[cycle_key] = {} if staff_id not in day_data[cycle_key]: day_data[cycle_key][staff_id] = [] day_data[cycle_key][staff_id].append((start_hour, end_hour)) elif isinstance(day_data, list): # List format day_data.append({ 'staff_id': staff_id, 'start': start_hour, 'end': end_hour, 'type': 'Gap Fill' }) # Update monthly hours if end_hour < start_hour: shift_hours = (24 - start_hour) + end_hour else: shift_hours = end_hour - start_hour monthly_hours[staff_id] = monthly_hours.get(staff_id, 0) + shift_hours # Update timeline if end_hour < start_hour: for i in range(start_hour, 24): timeline[i] += 1 for i in range(0, end_hour): timeline[i] += 1 else: for i in range(start_hour, end_hour): timeline[i] += 1 print(f"Assigned staff {staff_id} to cover hours {start_hour}:00 to {end_hour if end_hour > 0 else 24}:00 on day {day}") print(f"Updated timeline: {timeline}") # Re-sort staff by updated monthly hours sorted_staff_ids = sorted(staff_ids, key=lambda x: monthly_hours.get(x, 0)) assigned = True new_shifts_added = True break if not assigned: print(f"Could not find available staff to cover hours {start_hour}:00 to {end_hour if end_hour > 0 else 24}:00 on day {day}") if new_shifts_added: print("Successfully added new shifts to fill gaps") else: print("No new shifts were added") return updated_schedule def assign_uncovered_hours(staff_list, schedule, cycle_hours, demand_dict, beds_per_staff): """ A wrapper around fill_coverage_gaps for backward compatibility. Args: staff_list: List of staff members schedule: Current schedule cycle_hours: Hours per cycle demand_dict: Dictionary with demand data beds_per_staff: Number of beds per staff Returns: Updated schedule with gaps filled """ print("\n=== ASSIGNING UNCOVERED HOURS ===") # Debug the demand_dict structure print("\nDemand Dictionary in assign_uncovered_hours:") print(f"Demand dict has {len(demand_dict)} entries") print(f"Demand dict keys (first 5): {list(demand_dict.keys())[:5]}") # Print a sample of the demand data if demand_dict: sample_key = next(iter(demand_dict)) print(f"Sample demand data for key {sample_key}: {demand_dict[sample_key]}") try: # Call the simplified fill_coverage_gaps function updated_schedule = fill_coverage_gaps(staff_list, schedule, cycle_hours, demand_dict, beds_per_staff) print("Returned from fill_coverage_gaps function") return updated_schedule except Exception as e: import traceback print(f"ERROR in assign_uncovered_hours: {e}") print(traceback.format_exc()) # Return the original schedule if there's an error return schedule def is_staff_available_dict(staff_id, day, start_hour, end_hour, schedule): """ Check if a staff member is available for a shift on a given day and time range. For dictionary-based schedules. Args: staff_id: Staff member ID day: Day to check start_hour: Start hour of the shift end_hour: End hour of the shift schedule: Current schedule (dictionary format) Returns: bool: True if staff is available, False otherwise """ # Debug information print(f"Checking availability for staff {staff_id} on day {day} from {start_hour} to {end_hour}") # Check if staff has a shift on this day if day in schedule: day_shifts = [] for shift in schedule[day]: # Check if shift is a dictionary if isinstance(shift, dict) and shift.get('staff_id') == staff_id: day_shifts.append(shift) # Handle string representation or other formats elif hasattr(shift, '__str__'): shift_str = str(shift) if str(staff_id) in shift_str: # Try to extract start and end times try: if '-' in shift_str: parts = shift_str.split('-') shift_start = int(parts[0].strip()) shift_end = int(parts[1].strip()) day_shifts.append({ 'start': shift_start, 'end': shift_end }) except (ValueError, IndexError): print(f"WARNING: Could not parse shift: {shift}") # If end_hour is less than start_hour, it means the shift goes into the next day overnight_shift = end_hour < start_hour for shift in day_shifts: shift_start = shift.get('start') shift_end = shift.get('end') if shift_start is None or shift_end is None: continue # Check for overnight shifts in the existing schedule shift_overnight = shift_end < shift_start # Case 1: Both shifts are within the same day if not overnight_shift and not shift_overnight: # Check if there's any overlap if not (end_hour <= shift_start or start_hour >= shift_end): print(f" Conflict found: Existing shift from {shift_start} to {shift_end}") return False # Case 2: New shift is overnight, existing shift is not elif overnight_shift and not shift_overnight: # Check if existing shift overlaps with either part of the overnight shift if not (shift_end <= start_hour): # Existing shift ends before overnight shift starts print(f" Conflict found: Existing shift from {shift_start} to {shift_end} overlaps with overnight shift") return False # Case 3: Existing shift is overnight, new shift is not elif not overnight_shift and shift_overnight: # Check if new shift overlaps with either part of the existing overnight shift if not (end_hour <= shift_start): # New shift ends before existing overnight shift starts print(f" Conflict found: New shift overlaps with existing overnight shift from {shift_start} to {shift_end}") return False # Case 4: Both shifts are overnight else: # both are overnight shifts # For overnight shifts, they will always overlap in some way print(f" Conflict found: Both are overnight shifts") return False # Check if staff has a shift on the previous day that extends into this day if not overnight_shift: # Only need to check this for regular shifts prev_day = (day - 1) % 28 # Assuming 28-day cycle if prev_day in schedule: prev_day_shifts = [] for shift in schedule[prev_day]: # Check if shift is a dictionary if isinstance(shift, dict) and shift.get('staff_id') == staff_id: prev_day_shifts.append(shift) # Handle string representation or other formats elif hasattr(shift, '__str__'): shift_str = str(shift) if str(staff_id) in shift_str: # Try to extract start and end times try: if '-' in shift_str: parts = shift_str.split('-') shift_start = int(parts[0].strip()) shift_end = int(parts[1].strip()) prev_day_shifts.append({ 'start': shift_start, 'end': shift_end }) except (ValueError, IndexError): print(f"WARNING: Could not parse shift: {shift}") for shift in prev_day_shifts: shift_start = shift.get('start') shift_end = shift.get('end') if shift_start is None or shift_end is None: continue # If the previous day's shift extends to the next day (overnight shift) if shift_end < shift_start: # Check if there's overlap with the beginning of the new shift if start_hour < shift_end: print(f" Conflict found: Previous day's overnight shift extends to {shift_end}") return False # Check if staff has a shift on the next day that would be affected by an overnight shift if overnight_shift: next_day = (day + 1) % 28 # Assuming 28-day cycle if next_day in schedule: next_day_shifts = [] for shift in schedule[next_day]: # Check if shift is a dictionary if isinstance(shift, dict) and shift.get('staff_id') == staff_id: next_day_shifts.append(shift) # Handle string representation or other formats elif hasattr(shift, '__str__'): shift_str = str(shift) if str(staff_id) in shift_str: # Try to extract start and end times try: if '-' in shift_str: parts = shift_str.split('-') shift_start = int(parts[0].strip()) shift_end = int(parts[1].strip()) next_day_shifts.append({ 'start': shift_start, 'end': shift_end }) except (ValueError, IndexError): print(f"WARNING: Could not parse shift: {shift}") for shift in next_day_shifts: shift_start = shift.get('start') shift_end = shift.get('end') if shift_start is None or shift_end is None: continue # Check if there's overlap with the end of the overnight shift if end_hour > shift_start: print(f" Conflict found: Next day's shift starts at {shift_start} before overnight shift ends") return False print(f" Staff {staff_id} is available for this shift") return True def is_staff_available(staff, day, start_hour, end_hour, schedule): """ Check if a staff member is available for a shift on a given day and time range. For DataFrame-based schedules. Args: staff: Staff member ID day: Day to check start_hour: Start hour of the shift end_hour: End hour of the shift schedule: Current schedule (DataFrame format) Returns: bool: True if staff is available, False otherwise """ # Debug information print(f"Checking availability for staff {staff} on day {day} from {start_hour} to {end_hour}") # Get all shifts for this staff member staff_shifts = schedule[schedule['Staff'] == staff] # Check if staff has a shift on this day day_shifts = staff_shifts[staff_shifts['Day'] == day] # If end_hour is less than start_hour, it means the shift goes into the next day overnight_shift = end_hour < start_hour for _, shift in day_shifts.iterrows(): shift_start = shift['Start'] shift_end = shift['End'] # Check for overnight shifts in the existing schedule shift_overnight = shift_end < shift_start # Case 1: Both shifts are within the same day if not overnight_shift and not shift_overnight: # Check if there's any overlap if not (end_hour <= shift_start or start_hour >= shift_end): print(f" Conflict found: Existing shift from {shift_start} to {shift_end}") return False # Case 2: New shift is overnight, existing shift is not elif overnight_shift and not shift_overnight: # Check if existing shift overlaps with either part of the overnight shift if not (shift_end <= start_hour): # Existing shift ends before overnight shift starts print(f" Conflict found: Existing shift from {shift_start} to {shift_end} overlaps with overnight shift") return False # Case 3: Existing shift is overnight, new shift is not elif not overnight_shift and shift_overnight: # Check if new shift overlaps with either part of the existing overnight shift if not (end_hour <= shift_start): # New shift ends before existing overnight shift starts print(f" Conflict found: New shift overlaps with existing overnight shift from {shift_start} to {shift_end}") return False # Case 4: Both shifts are overnight else: # both are overnight shifts # For overnight shifts, they will always overlap in some way print(f" Conflict found: Both are overnight shifts") return False # Check if staff has a shift on the previous day that extends into this day if not overnight_shift: # Only need to check this for regular shifts prev_day = (day - 1) % 28 # Assuming 28-day cycle prev_day_shifts = staff_shifts[staff_shifts['Day'] == prev_day] for _, shift in prev_day_shifts.iterrows(): shift_start = shift['Start'] shift_end = shift['End'] # If the previous day's shift extends to the next day (overnight shift) if shift_end < shift_start: # Check if there's overlap with the beginning of the new shift if start_hour < shift_end: print(f" Conflict found: Previous day's overnight shift extends to {shift_end}") return False # Check if staff has a shift on the next day that would be affected by an overnight shift if overnight_shift: next_day = (day + 1) % 28 # Assuming 28-day cycle next_day_shifts = staff_shifts[staff_shifts['Day'] == next_day] for _, shift in next_day_shifts.iterrows(): shift_start = shift['Start'] shift_end = shift['End'] # Check if there's overlap with the end of the overnight shift if end_hour > shift_start: print(f" Conflict found: Next day's shift starts at {shift_start} before overnight shift ends") return False print(f" Staff {staff} is available for this shift") return True # Define Gradio UI am_pm_times = [f"{i:02d}:00 AM" for i in range(1, 13)] + [f"{i:02d}:00 PM" for i in range(1, 13)] with gr.Blocks(title="Staff Scheduling Optimizer", css=""" #staff_assignment_table { width: 100% !important; } #csv_schedule { width: 100% !important; } .container { max-width: 100% !important; padding: 0 !important; } .download-btn { margin-top: 10px !important; } """) as iface: gr.Markdown("# Staff Scheduling Optimizer") gr.Markdown("Upload a CSV file with cycle data and configure parameters to generate an optimal staff schedule.") with gr.Row(): # LEFT PANEL - Inputs with gr.Column(scale=1): gr.Markdown("### Input Parameters") # Input parameters csv_input = gr.File(label="Upload CSV File", file_types=[".csv"]) beds_per_staff = gr.Number(label="Beds per Staff", value=3, precision=1) max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160, precision=0) hours_per_cycle = gr.Number(label="Hours per Cycle", value=5, precision=1) rest_days_per_week = gr.Number(label="Rest Days per Week", value=2, precision=0) clinic_start_ampm = gr.Dropdown(label="Clinic Start Hour (AM/PM)", choices=am_pm_times, value="07:00 AM") clinic_end_ampm = gr.Dropdown(label="Clinic End Hour (AM/PM)", choices=am_pm_times, value="10:00 PM") overlap_time = gr.Number(label="Overlap Time", value=0.5, precision=1) max_start_time_change = gr.Number(label="Max Start Time Change", value=1, precision=0) exact_staff_count = gr.Number(label="Exact Staff Count (optional) (leave blank)", precision=0) overtime_percent = gr.Number(label="Overtime Allowed (%)", value=0, precision=0) optimize_btn = gr.Button("Start Scheduling", variant="primary") # RIGHT PANEL - Outputs with gr.Column(scale=2): gr.Markdown("### Results") # Tabs for different outputs - reordered with gr.Tabs(): with gr.TabItem("Detailed Schedule"): with gr.Row(): csv_schedule = gr.Dataframe(label="Detailed Schedule", elem_id="csv_schedule") with gr.Row(): schedule_download_file = gr.File(label="Download Detailed Schedule", visible=True) with gr.TabItem("Gantt Chart"): gantt_chart = gr.Image(label="Staff Schedule Visualization", elem_id="gantt_chart") with gr.TabItem("Staff Coverage by Cycle"): with gr.Row(): staff_assignment_table = gr.Dataframe(label="Staff Count in Each Cycle (Staff May Overlap)", elem_id="staff_assignment_table") with gr.Row(): staff_download_file = gr.File(label="Download Coverage Table", visible=True) with gr.TabItem("Hours Visualization"): schedule_visualization = gr.Image(label="Hours by Day Visualization", elem_id="schedule_visualization") # Define download functions def create_download_link(df, filename="data.csv"): """Create a CSV download link for a dataframe""" if df is None or df.empty: return None csv_data = df.to_csv(index=False) with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as f: f.write(csv_data) return f.name # Update the optimize_and_display function def optimize_and_display(csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change, exact_staff_count, overtime_percent): try: # Convert AM/PM times to 24-hour format clinic_start = convert_to_24h(clinic_start_ampm) clinic_end = convert_to_24h(clinic_end_ampm) # Call the optimization function results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path = optimize_staffing( csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change, exact_staff_count, overtime_percent ) # Return the results return staff_assignment_df, gantt_path, schedule_df, plot_path, staff_assignment_csv_path, schedule_csv_path except Exception as e: # If there's an error in the optimization process, return a meaningful error message empty_staff_df = pd.DataFrame(columns=["Day"]) error_message = f"Error during optimization: {str(e)}\n\nPlease try with different parameters or a simpler dataset." # Return error in the first output return empty_staff_df, None, None, None, None, None # Connect the button to the optimization function optimize_btn.click( fn=optimize_and_display, inputs=[ csv_input, beds_per_staff, max_hours_per_staff, hours_per_cycle, rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change, exact_staff_count, overtime_percent ], outputs=[ staff_assignment_table, gantt_chart, csv_schedule, schedule_visualization, staff_download_file, schedule_download_file ] ) # Launch the Gradio app iface.launch(share=True)