Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import pulp as pl # Changed from PuLP to pulp | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| from itertools import product | |
| import io | |
| import base64 | |
| import tempfile | |
| import os | |
| from datetime import datetime | |
def am_pm(hour):
    """Format an hour of the day as a 12-hour clock label, e.g. ``"07:00 PM"``.

    Args:
        hour: Hour as a number. Fractions are truncated and the value is
            wrapped into 0-23, so 24 is treated as midnight and 25 as 1 AM
            (shift start times generated for overnight clinics can exceed 23).

    Returns:
        A string of the form ``"HH:00 AM"`` or ``"HH:00 PM"``.
    """
    # Normalise first so out-of-range hours never produce labels like "13:00 PM".
    hour = int(hour) % 24
    period = "PM" if hour >= 12 else "AM"
    # 0 and 12 are both shown as 12 on a 12-hour clock.
    hour_12 = hour % 12 or 12
    return f"{hour_12:02d}:00 {period}"
def show_dataframe(csv_path):
    """Load a CSV for display.

    Returns the parsed pandas DataFrame on success. If reading fails for any
    reason, returns a human-readable error string instead of raising.
    """
    try:
        return pd.read_csv(csv_path)
    except Exception as exc:
        return f"Error loading CSV: {exc}"
def optimize_staffing(
    csv_file,
    beds_per_staff,
    max_hours_per_staff,  # Interpreted as hours per 28-day period
    hours_per_cycle,
    rest_days_per_week,
    clinic_start,
    clinic_end,
    overlap_time,
    max_start_time_change,
    exact_staff_count=None,
    overtime_percent=100
):
    """Build a staff schedule that covers the bed demand described in a CSV.

    The CSV is read in "wide" form: the first column is treated as the day
    and every column whose name starts with ``cycle`` holds the bed count for
    that cycle. Staff needed per cycle is ``ceil(beds / beds_per_staff)``.
    A binary program (PuLP/CBC) is solved for increasing head-counts until a
    feasible schedule is found, then ``assign_uncovered_hours`` is asked to
    fill partial-coverage gaps and the result is exported as charts and CSVs.

    Args:
        csv_file: Path string, object with a ``name`` attribute, bytes, or
            anything else ``pandas.read_csv`` accepts.
        beds_per_staff: Beds one staff member can look after (must be > 0).
        max_hours_per_staff: Hour cap per staff member for a 28-day period;
            scaled by ``num_days / 28`` for the actual dataset length.
        hours_per_cycle: Cycle length in hours; used for the cycle windows and
            for the workload estimate.
        rest_days_per_week: The model enforces
            ``max(1, rest_days_per_week - 1)`` rest days per 7-day block.
        clinic_start: Clinic opening hour (24-hour clock).
        clinic_end: Clinic closing hour (24-hour clock).
        overlap_time: Parsed for validation only; not used by the model here.
        max_start_time_change: Parsed for validation only; not used here.
        exact_staff_count: When positive, solve only for this head-count
            instead of searching for the minimum.
        overtime_percent: Parsed for validation only; the monthly hour cap is
            a hard constraint in the model.

    Returns:
        A 7-tuple ``(results, staff_assignment_df, gantt_path, schedule_df,
        plot_path, schedule_csv_path, staff_assignment_csv_path)``. When no
        feasible schedule is found, ``results`` explains why and the other six
        entries are ``None``.

    Raises:
        ValueError: If the data has no rows, no ``cycle*`` columns, or a
            parameter makes the model meaningless (non-positive
            ``beds_per_staff`` / hour cap, seven or more rest days).
    """
    # ------------------------------------------------------------------
    # Load data
    # ------------------------------------------------------------------
    try:
        if csv_file is None:
            raise ValueError("No CSV file provided")
        if isinstance(csv_file, str):
            # It's a file path
            df = pd.read_csv(csv_file)
        elif hasattr(csv_file, 'name'):
            # It's an uploaded file object
            df = pd.read_csv(csv_file.name)
        elif hasattr(csv_file, 'decode'):
            # It's a bytes-like object
            df = pd.read_csv(io.StringIO(csv_file.decode('utf-8')))
        else:
            # Try direct read
            df = pd.read_csv(csv_file)
    except Exception as e:
        print(f"Error loading CSV: {e}")
        # Best-effort fallback. It must use the same wide layout (day column
        # plus cycle* columns) that the rest of this function consumes.
        df = pd.DataFrame({
            'day': list(range(1, 29)),
            'cycle1': [10] * 28,
        })

    # Print the loaded data for debugging
    print("Loaded CSV data:")
    print(df.head())
    print(f"CSV shape: {df.shape}")

    if df.empty:
        raise ValueError("CSV contains no demand rows")

    BEDS_PER_STAFF = float(beds_per_staff)
    if BEDS_PER_STAFF <= 0:
        raise ValueError("beds_per_staff must be greater than zero")

    # Cycle windows as (start_hour, end_hour); four consecutive cycles
    # starting at clinic opening, the last one ending at clinic close.
    cycle_times = {
        'cycle1': (clinic_start, (clinic_start + hours_per_cycle) % 24),
        'cycle2': ((clinic_start + hours_per_cycle) % 24, (clinic_start + 2 * hours_per_cycle) % 24),
        'cycle3': ((clinic_start + 2 * hours_per_cycle) % 24, (clinic_start + 3 * hours_per_cycle) % 24),
        'cycle4': ((clinic_start + 3 * hours_per_cycle) % 24, clinic_end)
    }
    print(f"Cycle times: {cycle_times}")

    # Rename the index column if necessary
    if df.columns[0] not in ['day', 'Day', 'DAY']:
        df = df.rename(columns={df.columns[0]: 'day'})

    cycle_cols = [col for col in df.columns
                  if col.startswith('cycle') and not col.endswith('_staff')]
    if not cycle_cols:
        raise ValueError(
            "CSV must contain at least one bed-count column whose name starts with 'cycle'"
        )

    # Missing bed counts mean "no demand"; staff needed is beds/BEDS_PER_STAFF rounded up.
    for col in cycle_cols:
        df[col] = df[col].fillna(0)
        df[f'{col}_staff'] = np.ceil(df[col] / BEDS_PER_STAFF)

    # Calculate clinic hours (clinic may close after midnight)
    if clinic_end < clinic_start:
        clinic_hours = 24 - clinic_start + clinic_end
    else:
        clinic_hours = clinic_end - clinic_start

    num_days = len(df)

    # ------------------------------------------------------------------
    # Parameters
    # ------------------------------------------------------------------
    STANDARD_PERIOD_DAYS = 28  # Standard 4-week period the hour cap refers to
    BASE_MAX_HOURS = float(max_hours_per_staff)
    MAX_HOURS_PER_STAFF = BASE_MAX_HOURS * (num_days / STANDARD_PERIOD_DAYS)
    if MAX_HOURS_PER_STAFF <= 0:
        raise ValueError("max_hours_per_staff must be greater than zero")

    # Log the adjustment for transparency (prepended to the results report)
    results_header = f"Input max hours per staff (28-day period): {BASE_MAX_HOURS}\n"
    results_header += f"Adjusted max hours for {num_days}-day period: {MAX_HOURS_PER_STAFF:.1f}\n\n"

    HOURS_PER_CYCLE = float(hours_per_cycle)
    REST_DAYS_PER_WEEK = int(rest_days_per_week)
    if REST_DAYS_PER_WEEK >= 7:
        raise ValueError("rest_days_per_week must be less than 7")
    SHIFT_TYPES = [6, 8, 10, 12]  # Standard shift lengths in hours
    MAX_WEEKLY_HOURS = 60
    CLINIC_START = int(clinic_start)
    CLINIC_HOURS = clinic_hours
    # The following are converted so bad input fails early; the model in this
    # function does not use them.
    OVERLAP_TIME = float(overlap_time)
    CLINIC_END = int(clinic_end)
    MAX_START_TIME_CHANGE = int(max_start_time_change)
    OVERTIME_ALLOWED = 1 + (overtime_percent / 100)

    # Peak per-cycle requirement; a hard lower bound on head-count because
    # each staff member works at most one shift per day.
    max_staff_needed = max(df[f'{cycle}_staff'].max() for cycle in cycle_cols)

    # ------------------------------------------------------------------
    # Candidate shifts
    # ------------------------------------------------------------------
    shift_start_times = list(range(CLINIC_START, CLINIC_START + int(CLINIC_HOURS) - min(SHIFT_TYPES) + 1))

    possible_shifts = []
    for duration in SHIFT_TYPES:
        for start_time in shift_start_times:
            end_time = (start_time + duration) % 24
            shift = {
                'id': f"{duration}hr_{start_time:02d}",
                'start': start_time,
                'end': end_time,
                'duration': duration,
                'cycles_covered': set()
            }
            # Determine which cycles this shift overlaps
            for cycle, (cycle_start, cycle_end) in cycle_times.items():
                if cycle_end < cycle_start:  # overnight cycle
                    if start_time >= cycle_start or end_time <= cycle_end or (start_time < end_time and end_time > cycle_start):
                        shift['cycles_covered'].add(cycle)
                else:  # normal cycle
                    shift_end = end_time if end_time > start_time else end_time + 24
                    cycle_end_adj = cycle_end if cycle_end > cycle_start else cycle_end + 24
                    if not (shift_end <= cycle_start or start_time >= cycle_end_adj):
                        shift['cycles_covered'].add(cycle)
            if shift['cycles_covered']:  # Only keep shifts that cover at least one cycle
                possible_shifts.append(shift)

    # ------------------------------------------------------------------
    # Head-count estimates
    # ------------------------------------------------------------------
    total_staff_hours = float(sum(df[f'{cycle}_staff'].sum() for cycle in cycle_cols)) * HOURS_PER_CYCLE
    # Theoretical minimum with perfect utilisation
    theoretical_min_staff = np.ceil(total_staff_hours / MAX_HOURS_PER_STAFF)
    # Buffer for rest-day constraints
    min_staff_estimate = np.ceil(theoretical_min_staff * (7 / (7 - REST_DAYS_PER_WEEK)))

    def optimize_schedule(num_staff, time_limit=600):
        """Solve the model for ``num_staff`` people.

        Returns ``(schedule, total_hours)`` where ``schedule`` is a list of
        shift dicts, or ``(None, None)`` when no usable solution was produced.
        """
        try:
            staff_ids = range(1, num_staff + 1)
            days = range(1, num_days + 1)

            model = pl.LpProblem("Staff_Scheduling", pl.LpMinimize)

            # x[s, d, shift] == 1 when staff s works that shift on day d
            x = pl.LpVariable.dicts("shift",
                                    [(s, d, shift['id']) for s in staff_ids
                                     for d in days
                                     for shift in possible_shifts],
                                    cat='Binary')
            # 1 if staff s is used at all, 0 otherwise
            staff_used = pl.LpVariable.dicts("staff_used", staff_ids, cat='Binary')
            total_hours = pl.LpVariable("total_hours", lowBound=0)

            # Objective: minimise head-count first (large weight), then hours
            model += (
                10**10 * pl.lpSum(staff_used[s] for s in staff_ids) +
                1 * total_hours
            )

            # Link total_hours to the sum of all hours worked
            model += total_hours == pl.lpSum(x[(s, d, shift['id'])] * shift['duration']
                                             for s in staff_ids
                                             for d in days
                                             for shift in possible_shifts)

            # Link staff_used with shift assignments
            for s in staff_ids:
                model += pl.lpSum(x[(s, d, shift['id'])]
                                  for d in days
                                  for shift in possible_shifts) <= num_days * staff_used[s]
                # If staff is used, they must work at least one shift
                model += pl.lpSum(x[(s, d, shift['id'])]
                                  for d in days
                                  for shift in possible_shifts) >= staff_used[s]

            # Staff ordering (breaks symmetry between interchangeable staff)
            for s in range(1, num_staff):
                model += staff_used[s] >= staff_used[s + 1]

            # Each staff works at most one shift per day
            for s in staff_ids:
                for d in days:
                    model += pl.lpSum(x[(s, d, shift['id'])] for shift in possible_shifts) <= 1

            # Rest day constraints (with some flexibility)
            min_rest_days = max(1, REST_DAYS_PER_WEEK - 1)
            for s in staff_ids:
                for w in range((num_days + 6) // 7):
                    week_start = w * 7 + 1
                    week_end = min(week_start + 6, num_days)
                    days_in_this_week = week_end - week_start + 1
                    if days_in_this_week < 7:
                        # Pro-rate rest days for a trailing partial week
                        adjusted_rest_days = max(1, int(min_rest_days * days_in_this_week / 7))
                    else:
                        adjusted_rest_days = min_rest_days
                    model += pl.lpSum(x[(s, d, shift['id'])]
                                      for d in range(week_start, week_end + 1)
                                      for shift in possible_shifts) <= days_in_this_week - adjusted_rest_days

            # HARD CONSTRAINT: no overtime beyond MAX_HOURS_PER_STAFF
            for s in staff_ids:
                staff_hours = pl.lpSum(x[(s, d, shift['id'])] * shift['duration']
                                       for d in days
                                       for shift in possible_shifts)
                model += staff_hours <= MAX_HOURS_PER_STAFF

            # HARD CONSTRAINT: full coverage of every day/cycle
            covering_by_cycle = {
                cycle: [shift for shift in possible_shifts if cycle in shift['cycles_covered']]
                for cycle in cycle_cols
            }
            for d in days:
                day_index = d - 1  # 0-indexed for DataFrame
                for cycle in cycle_cols:
                    staff_needed = float(df[f'{cycle}_staff'].iloc[day_index])
                    model += (pl.lpSum(x[(s, d, shift['id'])]
                                       for s in staff_ids
                                       for shift in covering_by_cycle[cycle]) >= staff_needed)

            # HARD CONSTRAINT: weekly hour cap for each staff
            for s in staff_ids:
                for w in range((num_days + 6) // 7):
                    week_start = w * 7 + 1
                    week_end = min(week_start + 6, num_days)
                    weekly_hours = pl.lpSum(x[(s, d, shift['id'])] * shift['duration']
                                            for d in range(week_start, week_end + 1)
                                            for shift in possible_shifts)
                    model += weekly_hours <= MAX_WEEKLY_HOURS

            solver = pl.PULP_CBC_CMD(timeLimit=time_limit, msg=1, gapRel=0.01)
            model.solve(solver)

            if model.status not in (pl.LpStatusOptimal, pl.LpStatusNotSolved):
                return None, None

            schedule = []
            for s in staff_ids:
                for d in days:
                    for shift in possible_shifts:
                        chosen = pl.value(x[(s, d, shift['id'])])
                        # Values can be None (no solution loaded) or slightly
                        # off 1.0, so never compare with == 1.
                        if chosen is not None and chosen > 0.5:
                            schedule.append({
                                'staff_id': s,
                                'day': d,
                                'shift_id': shift['id'],
                                'start': shift['start'],
                                'end': shift['end'],
                                'duration': shift['duration'],
                                'cycles_covered': list(shift['cycles_covered'])
                            })

            if not schedule:
                # Nothing was assigned: treat as "no usable solution" instead
                # of handing an empty schedule to the reporting code below.
                return None, None

            # Report real hours, not the objective (which includes the
            # 10**10 head-count weight).
            return schedule, sum(entry['duration'] for entry in schedule)
        except Exception as e:
            print(f"Error in optimization: {e}")
            return None, None

    no_solution = (None, None, None, None, None, None)

    # ------------------------------------------------------------------
    # Solve
    # ------------------------------------------------------------------
    if exact_staff_count is not None and exact_staff_count > 0:
        staff_count = int(exact_staff_count)
        results = results_header + f"Using exactly {staff_count} staff as specified...\n"
        schedule, total_assigned_hours = optimize_schedule(staff_count)
        if schedule is None:
            results += f"Failed to find a feasible solution with exactly {staff_count} staff.\n"
            results += "Try increasing the staff count.\n"
            return (results,) + no_solution
    else:
        baseline = max(1, int(theoretical_min_staff))
        # Counts below the peak per-cycle requirement are provably infeasible,
        # so skip them and make sure the search range reaches past the peak.
        min_staff = max(baseline, int(max_staff_needed))
        max_staff = max(int(min_staff_estimate), int(max_staff_needed)) + 5
        results = results_header + f"Theoretical minimum staff needed: {theoretical_min_staff:.1f}\n"
        results += f"Searching for minimum staff count starting from {min_staff}...\n"

        schedule, total_assigned_hours = None, None
        for staff_count in range(min_staff, max_staff + 1):
            results += f"Trying with {staff_count} staff...\n"
            # More time for larger staff counts
            time_limit = 300 + (staff_count - baseline) * 100
            schedule, total_assigned_hours = optimize_schedule(staff_count, time_limit)
            if schedule is not None:
                results += f"Found feasible solution with {staff_count} staff.\n"
                break

        if schedule is None:
            results += "Failed to find a feasible solution with the attempted staff counts.\n"
            results += "Try increasing the staff count manually or relaxing constraints.\n"
            return (results,) + no_solution

    results += f"Optimal solution found with {staff_count} staff\n"
    results += f"Total staff hours: {total_assigned_hours}\n"

    schedule_df = pd.DataFrame(schedule)

    # ------------------------------------------------------------------
    # Workload report
    # ------------------------------------------------------------------
    staff_hours = {
        s: schedule_df.loc[schedule_df['staff_id'] == s, 'duration'].sum()
        for s in range(1, staff_count + 1)
    }
    # Staff with 0 hours are not shown
    active_staff_hours = {s: hours for s, hours in staff_hours.items() if hours > 0}

    results += "\nStaff Hours:\n"
    for staff_id, hours in active_staff_hours.items():
        utilization = (hours / MAX_HOURS_PER_STAFF) * 100
        results += f"Staff {staff_id}: {hours} hours ({utilization:.1f}% utilization)\n"
        if hours > MAX_HOURS_PER_STAFF:
            overtime = hours - MAX_HOURS_PER_STAFF
            overtime_pct = (overtime / MAX_HOURS_PER_STAFF) * 100
            results += f" Overtime: {overtime:.1f} hours ({overtime_pct:.1f}%)\n"

    active_staff_count = len(active_staff_hours)
    avg_utilization = sum(active_staff_hours.values()) / (active_staff_count * MAX_HOURS_PER_STAFF) * 100
    results += f"\nAverage staff utilization: {avg_utilization:.1f}%\n"

    # ------------------------------------------------------------------
    # Coverage check (head-count per day/cycle)
    # ------------------------------------------------------------------
    coverage_check = []
    for d in range(1, num_days + 1):
        day_index = d - 1  # 0-indexed for DataFrame
        day_schedule = schedule_df[schedule_df['day'] == d]
        for cycle in cycle_cols:
            required = df[f'{cycle}_staff'].iloc[day_index]
            assigned = sum(cycle in covered for covered in day_schedule['cycles_covered'])
            coverage_check.append({
                'day': d,
                'cycle': cycle,
                'required': required,
                'assigned': assigned,
                'satisfied': bool(assigned >= required)
            })
    coverage_df = pd.DataFrame(coverage_check)
    satisfaction = coverage_df['satisfied'].mean() * 100
    results += f"Coverage satisfaction: {satisfaction:.1f}%\n"

    # ------------------------------------------------------------------
    # Gap filling (always attempted: head-count can be met while some hours
    # inside a cycle are still uncovered)
    # ------------------------------------------------------------------
    results += "Checking for partial coverage and filling gaps...\n"
    print("\n\n==== STARTING GAP FILLING PROCESS ====")
    try:
        # dict_schedule[day][cycle][staff_id] -> list of (start, end)
        dict_schedule = {
            d: {cycle: {} for cycle in cycle_cols}
            for d in range(1, num_days + 1)
        }
        for _, shift in schedule_df.iterrows():
            staff_id = shift['staff_id']
            day = shift['day']
            start = int(shift['start'])
            end = int(shift['end'])
            for cycle in shift['cycles_covered']:
                # A shift may overlap a cycle window that has no demand column
                # in the CSV; there is nothing to fill for it.
                if cycle not in dict_schedule[day]:
                    continue
                dict_schedule[day][cycle].setdefault(staff_id, []).append((start, end))

        class StaffMember:
            """Minimal staff record handed to the gap-filling routine."""

            def __init__(self, staff_id):
                self.id = staff_id
                self.name = str(staff_id)

        # Only use staff that are already in the schedule
        active_staff_ids = sorted(schedule_df['staff_id'].unique())
        staff_list = [StaffMember(s) for s in active_staff_ids]
        print(f"Created {len(staff_list)} staff members for gap filling (using only existing staff)")

        # Demand for each day and cycle
        demand_dict = {}
        for d in range(1, num_days + 1):
            day_index = d - 1  # 0-indexed for DataFrame
            demand_dict[d] = {
                cycle: {
                    'beds': df[cycle].iloc[day_index],
                    'required_staff': df[f'{cycle}_staff'].iloc[day_index],
                    'beds_per_staff': BEDS_PER_STAFF
                }
                for cycle in cycle_cols
            }

        print("Calling assign_uncovered_hours function with demand data...")
        updated_schedule = assign_uncovered_hours(staff_list, dict_schedule, cycle_times, demand_dict, BEDS_PER_STAFF)
        print("Returned from assign_uncovered_hours function")

        # Index the solver's assignments once so new shifts can be detected
        # with a set lookup instead of rescanning the DataFrame per shift.
        existing_assignments = set()
        for sid, day, start, end, covered in zip(schedule_df['staff_id'],
                                                 schedule_df['day'],
                                                 schedule_df['start'],
                                                 schedule_df['end'],
                                                 schedule_df['cycles_covered']):
            for cycle in covered:
                existing_assignments.add((int(sid), int(day), cycle, int(start), int(end)))

        new_schedule = []
        for day, day_entries in updated_schedule.items():
            for cycle, staff_shifts in day_entries.items():
                for staff_id, shifts in staff_shifts.items():
                    for start, end in shifts:
                        if (int(staff_id), day, cycle, start, end) in existing_assignments:
                            continue
                        # This is a new shift added to fill a gap
                        duration = end - start if end > start else end + 24 - start
                        new_schedule.append({
                            'staff_id': int(staff_id),
                            'day': day,
                            'shift_id': f"gap_{start:02d}_{end:02d}",
                            'start': start,
                            'end': end,
                            'duration': duration,
                            'cycles_covered': [cycle]
                        })
                        print(f"Added new shift: Staff {staff_id}, Day {day}, {start}:00-{end}:00, Cycle {cycle}")

        if new_schedule:
            print(f"Adding {len(new_schedule)} new shifts to the schedule")
            results += f"Added {len(new_schedule)} new shifts to fill coverage gaps\n"
            new_shifts_df = pd.DataFrame(new_schedule)
            schedule_df = pd.concat([schedule_df, new_shifts_df], ignore_index=True)
            print("Regenerating CSV and Gantt chart with updated schedule")

            # Recheck coverage hour by hour after adding new shifts
            coverage_check = []
            for d in range(1, num_days + 1):
                day_index = d - 1  # 0-indexed for DataFrame
                day_schedule = schedule_df[schedule_df['day'] == d]
                for cycle in cycle_cols:
                    required = df[f'{cycle}_staff'].iloc[day_index]
                    assigned = sum(cycle in covered for covered in day_schedule['cycles_covered'])

                    # Whole hours are needed to index the hourly timeline
                    cycle_start, cycle_end = (int(bound) for bound in cycle_times[cycle])
                    if cycle_end > cycle_start:
                        cycle_duration = cycle_end - cycle_start
                    else:
                        cycle_duration = cycle_end + 24 - cycle_start

                    # timeline[h] = number of staff present in hour h of the cycle
                    timeline = [0] * cycle_duration
                    for _, shift in day_schedule.iterrows():
                        if cycle not in shift['cycles_covered']:
                            continue
                        start = int(shift['start'])
                        end = int(shift['end'])
                        # Handle overnight shifts
                        if end < start:
                            end += 24
                        # Positions relative to the start of the cycle
                        if cycle_end < cycle_start:  # overnight cycle
                            if start >= cycle_start:
                                rel_start = start - cycle_start
                            else:
                                rel_start = start + 24 - cycle_start
                            if end >= cycle_start:
                                rel_end = end - cycle_start
                            else:
                                rel_end = end + 24 - cycle_start
                        else:
                            rel_start = max(0, start - cycle_start)
                            rel_end = min(cycle_duration, end - cycle_start)
                        # Ensure bounds
                        rel_start = max(0, min(rel_start, cycle_duration))
                        rel_end = max(0, min(rel_end, cycle_duration))
                        for hour in range(rel_start, rel_end):
                            if 0 <= hour < len(timeline):
                                timeline[hour] += 1

                    fully_covered = all(count >= required for count in timeline)
                    coverage_check.append({
                        'day': d,
                        'cycle': cycle,
                        'required': required,
                        'assigned': assigned,
                        'satisfied': bool(assigned >= required and fully_covered)
                    })

            new_coverage_df = pd.DataFrame(coverage_check)
            new_satisfaction = new_coverage_df['satisfied'].mean() * 100
            results += f"Coverage satisfaction after gap filling: {new_satisfaction:.1f}%\n"
            if new_satisfaction < 100:
                results += "Warning: Some coverage gaps still remain after filling!\n"
                still_unsatisfied = new_coverage_df[~new_coverage_df['satisfied']]
                results += still_unsatisfied.to_string() + "\n"
            else:
                results += "All coverage gaps successfully filled!\n"
        else:
            print("No new shifts were added")
            results += "No coverage gaps were found or all gaps could not be filled\n"
        print("==== FINISHED GAP FILLING PROCESS ====\n\n")
    except Exception as e:
        # Gap filling is best-effort: keep the solver's schedule and report.
        print(f"ERROR in gap filling process: {str(e)}")
        results += f"Error during gap filling: {str(e)}\n"
        import traceback
        traceback.print_exc()

    if satisfaction < 100:
        results += "Warning: Not all staffing requirements are met!\n"
        unsatisfied = coverage_df[~coverage_df['satisfied']]
        results += unsatisfied.to_string() + "\n"

    # ------------------------------------------------------------------
    # Bar chart of hours per staff per day
    # ------------------------------------------------------------------
    fig, ax = plt.subplots(figsize=(15, 8))
    staff_days = {s: [0] * num_days for s in range(1, staff_count + 1)}  # 0 means off duty
    for _, shift in schedule_df.iterrows():
        # Accumulate: gap filling can give one person several shifts in a day
        staff_days[shift['staff_id']][shift['day'] - 1] += shift['duration']
    for s, hours in staff_days.items():
        ax.bar(range(1, num_days + 1), hours, label=f'Staff {s}')
    ax.set_xlabel('Day')
    ax.set_ylabel('Shift Hours')
    ax.set_title('Staff Schedule')
    ax.set_xticks(range(1, num_days + 1))
    ax.legend()

    # Reserve the temp name, close the handle, then let matplotlib write it
    with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
        plot_path = f.name
    fig.savefig(plot_path)
    plt.close(fig)

    # Gantt chart (only showing active staff)
    gantt_path = create_gantt_chart(schedule_df, num_days, staff_count)

    # ------------------------------------------------------------------
    # CSV exports
    # ------------------------------------------------------------------
    schedule_df['start_ampm'] = schedule_df['start'].apply(am_pm)
    schedule_df['end_ampm'] = schedule_df['end'].apply(am_pm)
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file:
        schedule_csv_path = temp_file.name
    schedule_df[['staff_id', 'day', 'start_ampm', 'end_ampm', 'duration', 'cycles_covered']].to_csv(
        schedule_csv_path, index=False
    )

    # Staff assignment table: number of staff covering each cycle on each day
    staff_assignment_data = []
    for d in range(1, num_days + 1):
        day_schedule = schedule_df[schedule_df['day'] == d]
        counts = [sum(cycle in covered for covered in day_schedule['cycles_covered'])
                  for cycle in cycle_cols]
        staff_assignment_data.append([d] + counts)
    staff_assignment_df = pd.DataFrame(staff_assignment_data, columns=['Day'] + cycle_cols)

    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file:
        staff_assignment_csv_path = temp_file.name
    staff_assignment_df.to_csv(staff_assignment_csv_path, index=False)

    # Return all required values in the correct order
    return results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path
def convert_to_24h(time_str):
    """Convert an on-the-hour AM/PM string such as ``"07:00 PM"`` to an hour 0-23.

    Args:
        time_str: Text in ``HH:00 AM`` / ``HH:00 PM`` form. Surrounding
            whitespace is ignored.

    Returns:
        The hour as an int, or ``None`` when ``time_str`` is not a string or
        does not match the expected format.
    """
    # An unset UI field arrives as None; report "unparseable" instead of
    # letting strptime raise TypeError.
    if not isinstance(time_str, str):
        return None
    try:
        return datetime.strptime(time_str.strip(), "%I:00 %p").hour
    except ValueError:
        return None
def gradio_wrapper(
    csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
    rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change,
    exact_staff_count=None, overtime_percent=100
):
    """UI entry point: convert AM/PM clinic hours and run ``optimize_staffing``.

    Args:
        csv_file: Uploaded demand CSV (or ``None`` when nothing was uploaded).
        clinic_start_ampm: Opening time as text, converted with
            ``convert_to_24h``.
        clinic_end_ampm: Closing time as text, converted with
            ``convert_to_24h``.
        All other arguments are forwarded to ``optimize_staffing`` unchanged.

    Returns:
        A 6-tuple ``(staff_assignment_df, gantt_path, schedule_csv_path,
        plot_path, staff_download, schedule_download_file)``. Every entry is
        ``None`` when the CSV is missing, the clinic hours cannot be parsed,
        or any step raises.
    """
    failure = (None, None, None, None, None, None)

    print("Starting optimization with gap filling enabled...")

    # Nothing can be done without demand data; bail out before any other work.
    if csv_file is None:
        print("Error: No CSV file provided")
        return failure

    try:
        # Convert AM/PM times to 24-hour format. Done inside the try so a bad
        # value cannot escape this wrapper as an exception.
        clinic_start = convert_to_24h(clinic_start_ampm)
        clinic_end = convert_to_24h(clinic_end_ampm)
        if clinic_start is None or clinic_end is None:
            print(f"Error: could not parse clinic hours {clinic_start_ampm!r} / {clinic_end_ampm!r}")
            return failure

        # Print file info for debugging
        if hasattr(csv_file, 'name'):
            print(f"CSV file name: {csv_file.name}")

        (results, staff_assignment_df, gantt_path, schedule_df, plot_path,
         schedule_csv_path, staff_assignment_csv_path) = optimize_staffing(
            csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
            rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change,
            exact_staff_count, overtime_percent
        )
        print("Optimization completed successfully")
        # The text report is not one of the UI outputs; keep it in the log.
        print(results)

        # Create downloadable CSV files
        staff_download = None
        if staff_assignment_df is not None:
            staff_download = create_download_link(staff_assignment_df, "staff_assignment.csv")
        schedule_download_file = None
        if schedule_df is not None:
            schedule_download_file = create_download_link(schedule_df, "schedule.csv")

        # Return all outputs
        return staff_assignment_df, gantt_path, schedule_csv_path, plot_path, staff_download, schedule_download_file
    except Exception as e:
        print(f"Error in gradio_wrapper: {str(e)}")
        import traceback
        traceback.print_exc()
        return failure
| # Create a Gantt chart with advanced visuals and alternating labels - only showing active staff | |
| def create_gantt_chart(schedule_df, num_days, staff_count): | |
| # Get the list of active staff IDs (staff who have at least one shift) | |
| active_staff_ids = sorted(schedule_df['staff_id'].unique()) | |
| active_staff_count = len(active_staff_ids) | |
| # Create a mapping from original staff ID to position in the chart | |
| staff_position = {staff_id: i+1 for i, staff_id in enumerate(active_staff_ids)} | |
| # Create a larger figure with higher DPI | |
| plt.figure(figsize=(max(30, num_days * 1.5), max(12, active_staff_count * 0.8)), dpi=200) | |
| # Use a more sophisticated color palette - only for active staff | |
| colors = plt.cm.viridis(np.linspace(0.1, 0.9, active_staff_count)) | |
| # Set a modern style | |
| plt.style.use('seaborn-v0_8-whitegrid') | |
| # Create a new axis with a slight background color | |
| ax = plt.gca() | |
| ax.set_facecolor('#f8f9fa') | |
| # Sort by staff then day | |
| schedule_df = schedule_df.sort_values(['staff_id', 'day']) | |
| # Plot Gantt chart - only for active staff | |
| for i, staff_id in enumerate(active_staff_ids): | |
| staff_shifts = schedule_df[schedule_df['staff_id'] == staff_id] | |
| y_pos = active_staff_count - i # Position based on index in active staff list | |
| # Add staff label with a background box | |
| ax.text(-0.7, y_pos, f"Staff {staff_id}", fontsize=12, fontweight='bold', | |
| ha='right', va='center', bbox=dict(facecolor='white', edgecolor='gray', | |
| boxstyle='round,pad=0.5', alpha=0.9)) | |
| # Add a subtle background for each staff row | |
| ax.axhspan(y_pos-0.4, y_pos+0.4, color='white', alpha=0.4, zorder=-5) | |
| # Track shift positions to avoid label overlap | |
| shift_positions = [] | |
| for idx, shift in enumerate(staff_shifts.iterrows()): | |
| _, shift = shift | |
| day = shift['day'] | |
| start_hour = shift['start'] | |
| end_hour = shift['end'] | |
| duration = shift['duration'] | |
| # Format times for display | |
| start_ampm = am_pm(start_hour) | |
| end_ampm = am_pm(end_hour) | |
| # Calculate shift position | |
| shift_start_pos = day-1+start_hour/24 | |
| # Handle overnight shifts | |
| if end_hour < start_hour: # Overnight shift | |
| # First part of shift (until midnight) | |
| rect1 = ax.barh(y_pos, (24-start_hour)/24, left=shift_start_pos, | |
| height=0.6, color=colors[i], alpha=0.9, | |
| edgecolor='black', linewidth=1, zorder=10) | |
| # Add gradient effect | |
| for r in rect1: | |
| r.set_edgecolor('black') | |
| r.set_linewidth(1) | |
| # Second part of shift (after midnight) | |
| rect2 = ax.barh(y_pos, end_hour/24, left=day, | |
| height=0.6, color=colors[i], alpha=0.9, | |
| edgecolor='black', linewidth=1, zorder=10) | |
| # Add gradient effect | |
| for r in rect2: | |
| r.set_edgecolor('black') | |
| r.set_linewidth(1) | |
| # For overnight shifts, we'll place the label in the first part if it's long enough | |
| shift_width = (24-start_hour)/24 | |
| if shift_width >= 0.1: # Only add label if there's enough space | |
| label_pos = shift_start_pos + shift_width/2 | |
| # Alternate labels above and below | |
| y_offset = 0.35 if idx % 2 == 0 else -0.35 | |
| # Add label with background for better readability | |
| label = f"{start_ampm}-{end_ampm}" | |
| text = ax.text(label_pos, y_pos + y_offset, label, | |
| ha='center', va='center', fontsize=9, fontweight='bold', | |
| color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3, | |
| boxstyle='round,pad=0.3', edgecolor='gray'), | |
| zorder=20) | |
| shift_positions.append(label_pos) | |
| else: | |
| # Regular shift | |
| shift_width = duration/24 | |
| rect = ax.barh(y_pos, shift_width, left=shift_start_pos, | |
| height=0.6, color=colors[i], alpha=0.9, | |
| edgecolor='black', linewidth=1, zorder=10) | |
| # Add gradient effect | |
| for r in rect: | |
| r.set_edgecolor('black') | |
| r.set_linewidth(1) | |
| # Only add label if there's enough space | |
| if shift_width >= 0.1: | |
| label_pos = shift_start_pos + shift_width/2 | |
| # Alternate labels above and below | |
| y_offset = 0.35 if idx % 2 == 0 else -0.35 | |
| # Add label with background for better readability | |
| label = f"{start_ampm}-{end_ampm}" | |
| text = ax.text(label_pos, y_pos + y_offset, label, | |
| ha='center', va='center', fontsize=9, fontweight='bold', | |
| color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3, | |
| boxstyle='round,pad=0.3', edgecolor='gray'), | |
| zorder=20) | |
| shift_positions.append(label_pos) | |
| # Add weekend highlighting with a more sophisticated look | |
| for day in range(1, num_days + 1): | |
| # Determine if this is a weekend (assuming day 1 is Monday) | |
| is_weekend = (day % 7 == 0) or (day % 7 == 6) # Saturday or Sunday | |
| if is_weekend: | |
| ax.axvspan(day-1, day, alpha=0.15, color='#ff9999', zorder=-10) | |
| day_label = "Saturday" if day % 7 == 6 else "Sunday" | |
| ax.text(day-0.5, 0.2, day_label, ha='center', fontsize=10, color='#cc0000', | |
| fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, pad=2, boxstyle='round')) | |
| # Set x-axis ticks for each day with better formatting | |
| ax.set_xticks(np.arange(0.5, num_days, 1)) | |
| day_labels = [f"Day {d}" for d in range(1, num_days+1)] | |
| ax.set_xticklabels(day_labels, rotation=0, ha='center', fontsize=10) | |
| # Add vertical lines between days with better styling | |
| for day in range(1, num_days): | |
| ax.axvline(x=day, color='#aaaaaa', linestyle='-', alpha=0.5, zorder=-5) | |
| # Set y-axis ticks for each staff | |
| ax.set_yticks(np.arange(1, active_staff_count+1)) | |
| ax.set_yticklabels([]) # Remove default labels as we've added custom ones | |
| # Set axis limits with some padding | |
| ax.set_xlim(-0.8, num_days) | |
| ax.set_ylim(0.5, active_staff_count + 0.5) | |
| # Add grid for hours (every 6 hours) with better styling | |
| for day in range(num_days): | |
| for hour in [6, 12, 18]: | |
| ax.axvline(x=day + hour/24, color='#cccccc', linestyle=':', alpha=0.5, zorder=-5) | |
| # Add small hour markers at the bottom | |
| hour_label = "6AM" if hour == 6 else "Noon" if hour == 12 else "6PM" | |
| ax.text(day + hour/24, 0, hour_label, ha='center', va='bottom', fontsize=7, | |
| color='#666666', rotation=90, alpha=0.7) | |
| # Add title and labels with more sophisticated styling | |
| plt.title(f'Staff Schedule ({active_staff_count} Active Staff)', fontsize=24, fontweight='bold', pad=20, color='#333333') | |
| plt.xlabel('Day', fontsize=16, labelpad=10, color='#333333') | |
| # Add a legend for time reference with better styling | |
| time_box = plt.figtext(0.01, 0.01, "Time Reference:", ha='left', fontsize=10, | |
| fontweight='bold', color='#333333') | |
| time_markers = ['6 AM', 'Noon', '6 PM', 'Midnight'] | |
| for i, time in enumerate(time_markers): | |
| plt.figtext(0.08 + i*0.06, 0.01, time, ha='left', fontsize=9, color='#555555') | |
| # Remove spines | |
| for spine in ['top', 'right', 'left']: | |
| ax.spines[spine].set_visible(False) | |
| # Add a note about weekends with better styling | |
| weekend_note = plt.figtext(0.01, 0.97, "Red areas = Weekends", fontsize=12, | |
| color='#cc0000', fontweight='bold', | |
| bbox=dict(facecolor='white', alpha=0.7, pad=5, boxstyle='round')) | |
| # Add a subtle border around the entire chart | |
| plt.box(False) | |
| # Save the Gantt chart with high quality | |
| with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f: | |
| plt.tight_layout() | |
| plt.savefig(f.name, dpi=200, bbox_inches='tight', facecolor='white') | |
| plt.close() | |
| return f.name | |
def is_hour_within_clinic_hours(hour, clinic_start, clinic_end):
    """
    Tell whether a given hour falls inside the clinic's opening window.

    The window is half-open: the start hour is included, the end hour is not.
    When the end hour is smaller than the start hour the window is treated as
    running past midnight.

    Args:
        hour: Hour to check (0-23)
        clinic_start: Clinic start hour (0-23)
        clinic_end: Clinic end hour (0-23)
    Returns:
        bool: True if hour is within clinic hours, False otherwise.
        Also True (with a printed warning) when an input cannot be
        converted to an integer.
    """
    try:
        # Normalise all three inputs to plain integers in one pass
        hr, opens, closes = (int(value) for value in (hour, clinic_start, clinic_end))
    except Exception as e:
        print(f"WARNING: Error in is_hour_within_clinic_hours: {e}")
        # Fall back to "open" when the inputs are unusable
        return True

    if closes < opens:
        # Overnight clinic: open from `opens` to midnight and from midnight to `closes`
        return hr >= opens or hr < closes
    return opens <= hr < closes
def is_cycle_within_clinic_hours(cycle_start, cycle_end, clinic_start, clinic_end):
    """
    Check if a cycle overlaps with clinic operating hours.

    Both the cycle and the clinic window are half-open hour ranges
    (start included, end excluded). A range whose end is smaller than its
    start is treated as running past midnight.

    Args:
        cycle_start: Cycle start hour (0-23)
        cycle_end: Cycle end hour (0-23)
        clinic_start: Clinic start hour (0-23)
        clinic_end: Clinic end hour (0-23)
    Returns:
        bool: True if cycle overlaps with clinic hours, False otherwise.
        Also True (with a printed warning) when an input cannot be
        converted to an integer.
    """
    try:
        # Convert inputs to integers if they're not already
        cycle_start = int(cycle_start)
        cycle_end = int(cycle_end)
        clinic_start = int(clinic_start)
        clinic_end = int(clinic_end)

        cycle_overnight = cycle_end < cycle_start
        clinic_overnight = clinic_end < clinic_start

        if cycle_overnight:
            if clinic_overnight:
                # Both ranges include the hour before midnight, so they always overlap
                return True
            # The cycle covers [cycle_start, 24) and [0, cycle_end). It touches the
            # same-day clinic window if EITHER half does: the evening half overlaps
            # when the clinic closes after the cycle starts, the morning half when
            # the clinic opens before the cycle ends. (The previous condition
            # required both, so e.g. a 22-6 cycle was reported as outside a 7-23
            # clinic.)
            return cycle_start < clinic_end or cycle_end > clinic_start

        if clinic_overnight:
            # Mirror case: the cycle misses the clinic only when it sits entirely
            # in the closed gap [clinic_end, clinic_start]
            return not (cycle_start >= clinic_end and cycle_end <= clinic_start)

        # Neither range wraps: plain interval overlap
        return not (cycle_end <= clinic_start or cycle_start >= clinic_end)
    except Exception as e:
        print(f"WARNING: Error in is_cycle_within_clinic_hours: {e}")
        # Default to True if there's an error
        return True
def _gap_shift_hours(start_hour, end_hour):
    """Return a shift's length in hours; end < start means it wraps past midnight."""
    if end_hour < start_hour:
        return (24 - start_hour) + end_hour
    return end_hour - start_hour


def _gap_mark_timeline(timeline, start_hour, end_hour):
    """Add one staff member to every hourly slot of `timeline` covered by the shift."""
    if end_hour < start_hour:
        # Wrapping shift: slots up to midnight, then slots after midnight
        covered = list(range(start_hour, 24)) + list(range(0, end_hour))
    else:
        covered = range(start_hour, end_hour)
    for hour in covered:
        timeline[hour] += 1


def _gap_iter_day_shifts(day_data, only_staff=None):
    """Yield (staff_id, start, end) for each well-formed shift in one day's entry.

    Supports both schedule layouts handled by fill_coverage_gaps:
    {cycle: {staff_id: [(start, end), ...]}} and [{'staff_id', 'start', 'end'}, ...].
    When `only_staff` is given, shifts of other staff are skipped.
    """
    if isinstance(day_data, dict):
        for cycle_data in day_data.values():
            if not isinstance(cycle_data, dict):
                continue
            for staff_id, shifts in cycle_data.items():
                if only_staff is not None and staff_id not in only_staff:
                    continue
                for shift in shifts:
                    if isinstance(shift, tuple) and len(shift) == 2:
                        yield staff_id, shift[0], shift[1]
    elif isinstance(day_data, list):
        for shift in day_data:
            if not isinstance(shift, dict):
                continue
            staff_id = shift.get('staff_id')
            if only_staff is not None and staff_id not in only_staff:
                continue
            start_hour = shift.get('start')
            end_hour = shift.get('end')
            if start_hour is not None and end_hour is not None:
                yield staff_id, start_hour, end_hour


def _gap_extract_cycles(cycle_hours, demand_dict):
    """Work out the list of (start, end) cycles, sorted by start hour.

    Tries, in order: the 'cycleN' entries of `cycle_hours`, the (day, start)
    tuple keys of `demand_dict`, and finally four equal cycles over default
    clinic hours.
    """
    cycles = []
    if isinstance(cycle_hours, dict):
        print(f"Cycle hours dictionary: {cycle_hours}")
        # Expected format: {'cycle1': (start, end), ...}
        for key, value in cycle_hours.items():
            if (isinstance(key, str) and key.startswith('cycle')
                    and isinstance(value, tuple) and len(value) == 2):
                cycles.append(value)

    # If we couldn't extract cycles, derive them from (day, cycle_start) demand keys
    if not cycles:
        tuple_keys = [key for key in demand_dict
                      if isinstance(key, tuple) and len(key) == 2]
        for day, cycle_start in tuple_keys:
            if any(known_start == cycle_start for known_start, _ in cycles):
                continue
            # A cycle ends where the next cycle of the same day starts
            later_starts = [other_start for other_day, other_start in tuple_keys
                            if other_day == day and other_start > cycle_start]
            if later_starts:
                cycle_end = min(later_starts)
            else:
                # No later cycle that day: assume it's 5 hours long
                cycle_end = (cycle_start + 5) % 24
            cycles.append((cycle_start, cycle_end))

    # If we still don't have cycles, split the clinic hours into 4 equal cycles
    if not cycles:
        clinic_start = 7  # Default
        clinic_end = 3  # Default
        # Only the first dict-valued demand entry is consulted for overrides
        for value in demand_dict.values():
            if isinstance(value, dict):
                if 'clinic_start' in value:
                    clinic_start = value['clinic_start']
                if 'clinic_end' in value:
                    clinic_end = value['clinic_end']
                break
        if clinic_end < clinic_start:  # Overnight clinic
            total_hours = (24 - clinic_start) + clinic_end
        else:
            total_hours = clinic_end - clinic_start
        cycle_length = max(1, total_hours // 4)
        for i in range(4):
            cycle_start = (clinic_start + (i * cycle_length)) % 24
            cycle_end = (cycle_start + cycle_length) % 24
            cycles.append((cycle_start, cycle_end))

    cycles.sort(key=lambda cycle: cycle[0])
    return cycles


def _gap_required_staff(demand_dict, day, cycle_start, cycle_number, beds_per_staff):
    """Look up how many staff a cycle needs on a day (0 when no demand is found)."""
    required_staff = 0
    bed_count = 0

    # Format: {day: {'cycle1': {'beds': ..., 'required_staff': ...}, ...}, ...}
    if day in demand_dict:
        day_demand = demand_dict[day]
        if isinstance(day_demand, dict):
            cycle_key = f'cycle{cycle_number}'
            if cycle_key in day_demand:
                cycle_demand = day_demand[cycle_key]
                if isinstance(cycle_demand, dict):
                    bed_count = cycle_demand.get('beds', 0)
                    required_staff = cycle_demand.get('required_staff', 0)
                    print(f"Found demand data for day {day}, cycle {cycle_key}: beds={bed_count}, required_staff={required_staff}")

    # Tuple format: (day, cycle_start): {'bed_count': ..., 'required_staff': ...}
    demand_key = (day, cycle_start)
    if demand_key in demand_dict:
        demand_data = demand_dict[demand_key]
        if isinstance(demand_data, dict):
            bed_count = demand_data.get('bed_count', 0)
            required_staff = demand_data.get('required_staff', 0)
            print(f"Found demand data for day {day}, cycle {cycle_start}: bed_count={bed_count}, required_staff={required_staff}")

    # If we still don't have required_staff, calculate it from bed_count
    # NOTE(review): this rounds down (10 beds / 3 per staff -> 3); confirm that
    # matches how the main optimizer derives staffing from beds.
    if required_staff == 0 and bed_count > 0:
        required_staff = max(1, int(bed_count / beds_per_staff))
        print(f"Calculated required_staff from bed_count: {required_staff}")
    return required_staff


def _gap_group_consecutive(hours):
    """Split a non-empty list of hours into runs of consecutive hours (23 -> 0 counts)."""
    groups = [[hours[0]]]
    for previous, current in zip(hours, hours[1:]):
        if current == (previous + 1) % 24:
            groups[-1].append(current)
        else:
            groups.append([current])
    return groups


def _gap_find_conflict(existing, day, start_hour, end_hour):
    """Describe why `existing` blocks a new shift on `day`, or return None if it doesn't."""
    new_wraps = end_hour < start_hour
    shift_start = existing['start']
    shift_end = existing['end']
    existing_wraps = shift_end < shift_start

    if existing['day'] == day:
        if existing_wraps:
            if start_hour < shift_end:
                return f"Overlaps with first part of overnight shift {shift_start}-{shift_end}"
            if end_hour > shift_start:
                return f"Overlaps with second part of overnight shift {shift_start}-{shift_end}"
            if new_wraps:
                # Both shifts run up to midnight on the same day, so they share at
                # least the last hour before midnight.
                return f"Both shifts run through midnight (existing overnight shift {shift_start}-{shift_end})"
        else:
            if start_hour < shift_end and end_hour > shift_start:
                return f"Overlaps with regular shift {shift_start}-{shift_end}"
            if new_wraps and shift_end > start_hour:
                # The new shift occupies [start_hour, 24); the plain comparison
                # above misses this because its end hour has wrapped to a small
                # number (e.g. new 20-0 against existing 21-23).
                return f"Overlaps with regular shift {shift_start}-{shift_end}"
    elif existing['day'] == (day - 1) % 28:
        # Only the previous day's overnight shifts can reach into this day
        if existing_wraps and start_hour < shift_end:
            return f"Overlaps with previous day's overnight shift {shift_start}-{shift_end}"
    elif existing['day'] == (day + 1) % 28 and new_wraps:
        # A new overnight shift must end before the next day's shift starts
        if end_hour > shift_start:
            return f"Overnight shift would extend into next day's shift at {shift_start}"
    return None


def _gap_record_shift(updated_schedule, day_key, cycles, cycle_start, staff_id, start_hour, end_hour):
    """Store a gap-fill shift in `updated_schedule`, matching the schedule's layout."""
    cycle_idx = next((i for i, (cs, _) in enumerate(cycles) if cs == cycle_start), 0)
    cycle_key = f"cycle{cycle_idx + 1}"
    list_entry = {
        'staff_id': staff_id,
        'start': start_hour,
        'end': end_hour,
        'type': 'Gap Fill'
    }

    if day_key not in updated_schedule:
        # New day: use nested-dict layout if any existing day uses it, else list layout
        if any(isinstance(value, dict) for value in updated_schedule.values()):
            updated_schedule[day_key] = {cycle_key: {staff_id: [(start_hour, end_hour)]}}
        else:
            updated_schedule[day_key] = [list_entry]
        return

    day_data = updated_schedule[day_key]
    if isinstance(day_data, dict):
        day_data.setdefault(cycle_key, {}).setdefault(staff_id, []).append((start_hour, end_hour))
    elif isinstance(day_data, list):
        day_data.append(list_entry)


def fill_coverage_gaps(staff_list, schedule, cycle_hours, demand_dict, beds_per_staff):
    """
    Fill coverage gaps by assigning additional shifts to staff.

    For each of 28 days and each cycle, the hourly head-count from the current
    schedule is compared with the required staff; every run of consecutive
    understaffed hours is offered to staff in order of fewest accumulated
    hours, skipping anyone with a conflicting shift.

    The input schedule is never modified: the function works on a deep copy
    and returns that copy.

    Args:
        staff_list: List of staff members (objects with an `id` attribute, or plain IDs)
        schedule: Current schedule. Either {day: [{'staff_id', 'start', 'end', ...}, ...]},
            {day: {cycle: {staff_id: [(start, end), ...]}}}, or an object with
            `iterrows()` yielding rows with 'Day', 'Staff', 'Start', 'End'
            (and optionally 'Type').
        cycle_hours: Hours per cycle; a dict {'cycle1': (start, end), ...} is used
            to derive the cycles, anything else is ignored
        demand_dict: Dictionary with demand data, keyed by day or by (day, cycle_start)
        beds_per_staff: Number of beds per staff
    Returns:
        Updated schedule (dictionary format) with gaps filled
    """
    import copy

    print("\n=== FILLING COVERAGE GAPS (DIRECT APPROACH) ===")

    # Check if schedule is a dictionary or DataFrame
    is_dict_schedule = isinstance(schedule, dict)
    print(f"Schedule format: {'Dictionary' if is_dict_schedule else 'DataFrame'}")
    if not is_dict_schedule:
        print("WARNING: Expected dictionary schedule but got DataFrame. Converting to dictionary.")
        dict_schedule = {}
        for _, row in schedule.iterrows():
            dict_schedule.setdefault(row['Day'], []).append({
                'staff_id': row['Staff'],
                'start': row['Start'],
                'end': row['End'],
                'type': row.get('Type', 'Regular')
            })
        schedule = dict_schedule

    # Deep-copy each day so that appending gap-fill shifts to nested
    # cycle/staff containers cannot leak into the caller's schedule.
    updated_schedule = {day_key: copy.deepcopy(day_data) for day_key, day_data in schedule.items()}

    # Print some debug info about the schedule structure
    print(f"Schedule has {len(updated_schedule)} days")
    print(f"Schedule keys: {list(updated_schedule.keys())[:5]} (showing first 5)")

    # Check if day keys are strings or integers, and show a sample shift
    day_keys_are_strings = False
    if updated_schedule:
        sample_day = next(iter(updated_schedule))
        if isinstance(sample_day, str):
            day_keys_are_strings = True
            print("Day keys are strings")
        else:
            print("Day keys are integers")
        print(f"Sample day {sample_day} has {len(updated_schedule[sample_day])} shifts")
        if updated_schedule[sample_day]:
            try:
                print(f"First shift on day {sample_day}: {updated_schedule[sample_day][0]}")
            except (IndexError, KeyError) as e:
                # Nested-dict days are not indexable by position
                print(f"Error accessing first shift: {e}")
                print(f"Shifts for day {sample_day}: {updated_schedule[sample_day]}")

    # Debug the demand_dict structure
    print("\nDemand Dictionary Structure:")
    print(f"Demand dict has {len(demand_dict)} entries")
    if demand_dict:
        print(f"Demand dict keys (first 5): {list(demand_dict.keys())[:5]}")
        sample_key = next(iter(demand_dict))
        print(f"Sample demand data for key {sample_key}: {demand_dict[sample_key]}")
    else:
        print("WARNING: Demand dictionary is empty!")

    cycles = _gap_extract_cycles(cycle_hours, demand_dict)
    print(f"Using cycles: {cycles}")

    # Extract staff IDs from staff_list
    staff_ids = [staff.id if hasattr(staff, 'id') else staff for staff in staff_list]
    print(f"Staff IDs: {staff_ids}")

    # Flatten the schedule into a list of shifts for conflict checking
    all_shifts = []
    for day_key, day_data in updated_schedule.items():
        day = int(day_key) if isinstance(day_key, str) and day_key.isdigit() else day_key
        for staff_id, start_hour, end_hour in _gap_iter_day_shifts(day_data, only_staff=staff_ids):
            all_shifts.append({
                'day': day,
                'staff_id': staff_id,
                'start': start_hour,
                'end': end_hour
            })

    # Calculate current monthly hours for each staff directly from the schedule
    monthly_hours = {staff_id: 0 for staff_id in staff_ids}
    for shift in all_shifts:
        monthly_hours[shift['staff_id']] += _gap_shift_hours(shift['start'], shift['end'])
    for staff_id, hours in monthly_hours.items():
        print(f"Staff {staff_id} current monthly hours: {hours}")

    # Sort staff by monthly hours (lowest first)
    sorted_staff_ids = sorted(staff_ids, key=lambda x: monthly_hours.get(x, 0))
    print(f"Staff sorted by monthly hours: {sorted_staff_ids}")

    # Track if any new shifts were added
    new_shifts_added = False

    for day in range(28):
        # Convert day to the format used in the schedule
        day_key = str(day) if day_keys_are_strings else day

        # Hourly head-count for this day from the current schedule
        timeline = [0] * 24
        if day_key in updated_schedule:
            for _, start_hour, end_hour in _gap_iter_day_shifts(updated_schedule[day_key]):
                _gap_mark_timeline(timeline, start_hour, end_hour)
        print(f"\nDay {day} timeline: {timeline}")

        # Check each cycle for understaffing
        for cycle_start, cycle_end in cycles:
            cycle_number = cycles.index((cycle_start, cycle_end)) + 1
            required_staff = _gap_required_staff(
                demand_dict, day, cycle_start, cycle_number, beds_per_staff)

            # Skip if no staff required
            if required_staff == 0:
                print(f"No staff required for day {day}, cycle {cycle_start}-{cycle_end}")
                continue

            print(f"\nChecking day {day}, cycle {cycle_start}-{cycle_end}")
            print(f"Required staff: {required_staff}")

            # Hours of the cycle, in order (overnight cycles wrap past midnight)
            if cycle_end < cycle_start:
                hour_range = list(range(cycle_start, 24)) + list(range(0, cycle_end))
            else:
                hour_range = range(cycle_start, cycle_end)

            understaffed_hours = []
            for hour in hour_range:
                current_staff = timeline[hour]
                print(f"Hour {hour}: {current_staff} staff (need {required_staff})")
                if current_staff < required_staff:
                    understaffed_hours.append(hour)

            if not understaffed_hours:
                print("No understaffing in this cycle")
                continue
            print(f"Understaffed hours: {understaffed_hours}")

            hour_groups = _gap_group_consecutive(understaffed_hours)
            print(f"Grouped into: {hour_groups}")

            # Try to assign each group to available staff
            for group in hour_groups:
                start_hour = group[0]
                end_hour = (group[-1] + 1) % 24  # End hour is exclusive
                display_end = end_hour if end_hour > 0 else 24
                print(f"Trying to assign period on day {day}: {start_hour}:00 to {display_end}:00")

                # Staff needed is the largest shortfall within the group.
                # Convert required_staff to int to avoid numpy.float64 issues in range()
                current_min_staff = min(timeline[h] for h in group)
                required_staff_int = int(required_staff)
                staff_needed = max(0, required_staff_int - current_min_staff)
                print(f"Need {staff_needed} additional staff (required={required_staff_int}, current={current_min_staff})")

                for _ in range(staff_needed):
                    assigned = False
                    # Try staff with the lowest monthly hours first
                    for staff_id in sorted_staff_ids:
                        print(f"Checking if staff {staff_id} is available")

                        conflict = None
                        for existing in all_shifts:
                            if existing['staff_id'] != staff_id:
                                continue
                            conflict = _gap_find_conflict(existing, day, start_hour, end_hour)
                            if conflict:
                                break
                        if conflict:
                            print(f" Conflict: {conflict}")
                            continue

                        # Remember the new shift for future conflict checking
                        all_shifts.append({
                            'day': day,
                            'staff_id': staff_id,
                            'start': start_hour,
                            'end': end_hour
                        })
                        _gap_record_shift(updated_schedule, day_key, cycles, cycle_start,
                                          staff_id, start_hour, end_hour)

                        # Update monthly hours and the day's head-count
                        monthly_hours[staff_id] = (
                            monthly_hours.get(staff_id, 0) + _gap_shift_hours(start_hour, end_hour))
                        _gap_mark_timeline(timeline, start_hour, end_hour)
                        print(f"Assigned staff {staff_id} to cover hours {start_hour}:00 to {display_end}:00 on day {day}")
                        print(f"Updated timeline: {timeline}")

                        # Re-sort staff by updated monthly hours
                        sorted_staff_ids = sorted(staff_ids, key=lambda x: monthly_hours.get(x, 0))
                        assigned = True
                        new_shifts_added = True
                        break

                    if not assigned:
                        print(f"Could not find available staff to cover hours {start_hour}:00 to {display_end}:00 on day {day}")

    if new_shifts_added:
        print("Successfully added new shifts to fill gaps")
    else:
        print("No new shifts were added")
    return updated_schedule
def assign_uncovered_hours(staff_list, schedule, cycle_hours, demand_dict, beds_per_staff):
    """
    Backward-compatible entry point that delegates to fill_coverage_gaps.

    Prints a short summary of the demand dictionary, then forwards all
    arguments unchanged. If the delegate raises, the error and traceback are
    printed and the schedule passed in is returned instead.

    Args:
        staff_list: List of staff members
        schedule: Current schedule
        cycle_hours: Hours per cycle
        demand_dict: Dictionary with demand data
        beds_per_staff: Number of beds per staff
    Returns:
        Updated schedule with gaps filled, or the input schedule on error
    """
    import traceback

    print("\n=== ASSIGNING UNCOVERED HOURS ===")

    # Summarise the demand data for debugging
    print("\nDemand Dictionary in assign_uncovered_hours:")
    print(f"Demand dict has {len(demand_dict)} entries")
    leading_keys = list(demand_dict.keys())[:5]
    print(f"Demand dict keys (first 5): {leading_keys}")
    if demand_dict:
        first_key = next(iter(demand_dict))
        print(f"Sample demand data for key {first_key}: {demand_dict[first_key]}")

    try:
        filled_schedule = fill_coverage_gaps(
            staff_list, schedule, cycle_hours, demand_dict, beds_per_staff
        )
    except Exception as e:
        print(f"ERROR in assign_uncovered_hours: {e}")
        print(traceback.format_exc())
        # Fall back to the schedule we were given
        return schedule

    print("Returned from fill_coverage_gaps function")
    return filled_schedule
| def is_staff_available_dict(staff_id, day, start_hour, end_hour, schedule): | |
| """ | |
| Check if a staff member is available for a shift on a given day and time range. | |
| For dictionary-based schedules. | |
| Args: | |
| staff_id: Staff member ID | |
| day: Day to check | |
| start_hour: Start hour of the shift | |
| end_hour: End hour of the shift | |
| schedule: Current schedule (dictionary format) | |
| Returns: | |
| bool: True if staff is available, False otherwise | |
| """ | |
| # Debug information | |
| print(f"Checking availability for staff {staff_id} on day {day} from {start_hour} to {end_hour}") | |
| # Check if staff has a shift on this day | |
| if day in schedule: | |
| day_shifts = [] | |
| for shift in schedule[day]: | |
| # Check if shift is a dictionary | |
| if isinstance(shift, dict) and shift.get('staff_id') == staff_id: | |
| day_shifts.append(shift) | |
| # Handle string representation or other formats | |
| elif hasattr(shift, '__str__'): | |
| shift_str = str(shift) | |
| if str(staff_id) in shift_str: | |
| # Try to extract start and end times | |
| try: | |
| if '-' in shift_str: | |
| parts = shift_str.split('-') | |
| shift_start = int(parts[0].strip()) | |
| shift_end = int(parts[1].strip()) | |
| day_shifts.append({ | |
| 'start': shift_start, | |
| 'end': shift_end | |
| }) | |
| except (ValueError, IndexError): | |
| print(f"WARNING: Could not parse shift: {shift}") | |
| # If end_hour is less than start_hour, it means the shift goes into the next day | |
| overnight_shift = end_hour < start_hour | |
| for shift in day_shifts: | |
| shift_start = shift.get('start') | |
| shift_end = shift.get('end') | |
| if shift_start is None or shift_end is None: | |
| continue | |
| # Check for overnight shifts in the existing schedule | |
| shift_overnight = shift_end < shift_start | |
| # Case 1: Both shifts are within the same day | |
| if not overnight_shift and not shift_overnight: | |
| # Check if there's any overlap | |
| if not (end_hour <= shift_start or start_hour >= shift_end): | |
| print(f" Conflict found: Existing shift from {shift_start} to {shift_end}") | |
| return False | |
| # Case 2: New shift is overnight, existing shift is not | |
| elif overnight_shift and not shift_overnight: | |
| # Check if existing shift overlaps with either part of the overnight shift | |
| if not (shift_end <= start_hour): # Existing shift ends before overnight shift starts | |
| print(f" Conflict found: Existing shift from {shift_start} to {shift_end} overlaps with overnight shift") | |
| return False | |
| # Case 3: Existing shift is overnight, new shift is not | |
| elif not overnight_shift and shift_overnight: | |
| # Check if new shift overlaps with either part of the existing overnight shift | |
| if not (end_hour <= shift_start): # New shift ends before existing overnight shift starts | |
| print(f" Conflict found: New shift overlaps with existing overnight shift from {shift_start} to {shift_end}") | |
| return False | |
| # Case 4: Both shifts are overnight | |
| else: # both are overnight shifts | |
| # For overnight shifts, they will always overlap in some way | |
| print(f" Conflict found: Both are overnight shifts") | |
| return False | |
| # Check if staff has a shift on the previous day that extends into this day | |
| if not overnight_shift: # Only need to check this for regular shifts | |
| prev_day = (day - 1) % 28 # Assuming 28-day cycle | |
| if prev_day in schedule: | |
| prev_day_shifts = [] | |
| for shift in schedule[prev_day]: | |
| # Check if shift is a dictionary | |
| if isinstance(shift, dict) and shift.get('staff_id') == staff_id: | |
| prev_day_shifts.append(shift) | |
| # Handle string representation or other formats | |
| elif hasattr(shift, '__str__'): | |
| shift_str = str(shift) | |
| if str(staff_id) in shift_str: | |
| # Try to extract start and end times | |
| try: | |
| if '-' in shift_str: | |
| parts = shift_str.split('-') | |
| shift_start = int(parts[0].strip()) | |
| shift_end = int(parts[1].strip()) | |
| prev_day_shifts.append({ | |
| 'start': shift_start, | |
| 'end': shift_end | |
| }) | |
| except (ValueError, IndexError): | |
| print(f"WARNING: Could not parse shift: {shift}") | |
| for shift in prev_day_shifts: | |
| shift_start = shift.get('start') | |
| shift_end = shift.get('end') | |
| if shift_start is None or shift_end is None: | |
| continue | |
| # If the previous day's shift extends to the next day (overnight shift) | |
| if shift_end < shift_start: | |
| # Check if there's overlap with the beginning of the new shift | |
| if start_hour < shift_end: | |
| print(f" Conflict found: Previous day's overnight shift extends to {shift_end}") | |
| return False | |
| # Check if staff has a shift on the next day that would be affected by an overnight shift | |
| if overnight_shift: | |
| next_day = (day + 1) % 28 # Assuming 28-day cycle | |
| if next_day in schedule: | |
| next_day_shifts = [] | |
| for shift in schedule[next_day]: | |
| # Check if shift is a dictionary | |
| if isinstance(shift, dict) and shift.get('staff_id') == staff_id: | |
| next_day_shifts.append(shift) | |
| # Handle string representation or other formats | |
| elif hasattr(shift, '__str__'): | |
| shift_str = str(shift) | |
| if str(staff_id) in shift_str: | |
| # Try to extract start and end times | |
| try: | |
| if '-' in shift_str: | |
| parts = shift_str.split('-') | |
| shift_start = int(parts[0].strip()) | |
| shift_end = int(parts[1].strip()) | |
| next_day_shifts.append({ | |
| 'start': shift_start, | |
| 'end': shift_end | |
| }) | |
| except (ValueError, IndexError): | |
| print(f"WARNING: Could not parse shift: {shift}") | |
| for shift in next_day_shifts: | |
| shift_start = shift.get('start') | |
| shift_end = shift.get('end') | |
| if shift_start is None or shift_end is None: | |
| continue | |
| # Check if there's overlap with the end of the overnight shift | |
| if end_hour > shift_start: | |
| print(f" Conflict found: Next day's shift starts at {shift_start} before overnight shift ends") | |
| return False | |
| print(f" Staff {staff_id} is available for this shift") | |
| return True | |
def is_staff_available(staff, day, start_hour, end_hour, schedule, cycle_length=28):
    """
    Check if a staff member is available for a shift on a given day and time range.
    For DataFrame-based schedules.

    A shift whose end hour is smaller than its start hour is treated as an
    overnight shift: it occupies [start, midnight) on its own day and
    [midnight, end) on the following day.

    Args:
        staff: Staff member ID (matched against the 'Staff' column)
        day: Day to check (matched against the 'Day' column)
        start_hour: Start hour of the shift
        end_hour: End hour of the shift
        schedule: Current schedule (DataFrame with 'Staff', 'Day', 'Start'
            and 'End' columns)
        cycle_length: Number of days after which day numbers wrap around when
            looking at the previous/next day (defaults to the 28-day cycle)
    Returns:
        bool: True if staff is available, False otherwise
    """
    # Debug information
    print(f"Checking availability for staff {staff} on day {day} from {start_hour} to {end_hour}")

    # Get all shifts for this staff member
    staff_shifts = schedule[schedule['Staff'] == staff]

    # If end_hour is less than start_hour, it means the shift goes into the next day
    overnight_shift = end_hour < start_hour

    def shifts_on(target_day):
        """Yield (start, end) pairs of this staff member's shifts on target_day."""
        rows = staff_shifts[staff_shifts['Day'] == target_day]
        return zip(rows['Start'], rows['End'])

    # --- Shifts that start on the same day ---
    for shift_start, shift_end in shifts_on(day):
        shift_overnight = shift_end < shift_start

        if not overnight_shift and not shift_overnight:
            # Case 1: both shifts stay within the day -> plain interval overlap
            if end_hour > shift_start and start_hour < shift_end:
                print(f" Conflict found: Existing shift from {shift_start} to {shift_end}")
                return False
        elif overnight_shift and not shift_overnight:
            # Case 2: the new shift covers [start_hour, midnight) today, so the
            # existing shift must be over by the time it starts
            if shift_end > start_hour:
                print(f" Conflict found: Existing shift from {shift_start} to {shift_end} overlaps with overnight shift")
                return False
        elif not overnight_shift and shift_overnight:
            # Case 3: the existing shift covers [shift_start, midnight) today,
            # so the new shift must end before it starts
            if end_hour > shift_start:
                print(f" Conflict found: New shift overlaps with existing overnight shift from {shift_start} to {shift_end}")
                return False
        else:
            # Case 4: both shifts run through midnight, so they always overlap
            print(f" Conflict found: Both are overnight shifts")
            return False

    # --- Overnight shifts from the previous day that spill into this day ---
    # This applies to regular AND overnight new shifts: either kind occupies
    # the hours from start_hour onwards on `day`, which collides with a
    # spill-over that lasts until shift_end.
    prev_day = (day - 1) % cycle_length
    for shift_start, shift_end in shifts_on(prev_day):
        if shift_end < shift_start and start_hour < shift_end:
            print(f" Conflict found: Previous day's overnight shift extends to {shift_end}")
            return False

    # --- Next-day shifts that start before an overnight new shift has ended ---
    if overnight_shift:
        next_day = (day + 1) % cycle_length
        for shift_start, shift_end in shifts_on(next_day):
            if end_hour > shift_start:
                print(f" Conflict found: Next day's shift starts at {shift_start} before overnight shift ends")
                return False

    print(f" Staff {staff} is available for this shift")
    return True
# Define Gradio UI
# NOTE(review): the nesting below was reconstructed from the panel/tab comments
# and component labels; confirm it against the deployed layout.

# Dropdown labels "01:00 AM" .. "12:00 AM" followed by "01:00 PM" .. "12:00 PM"
# (in that order). The selected label is converted to a 24-hour value by
# convert_to_24h() in the click callback.
am_pm_times = [f"{i:02d}:00 AM" for i in range(1, 13)] + [f"{i:02d}:00 PM" for i in range(1, 13)]
# The css argument stretches the two result tables to full width, removes the
# container padding/width cap and adds spacing above download buttons.
with gr.Blocks(title="Staff Scheduling Optimizer", css="""
#staff_assignment_table {
width: 100% !important;
}
#csv_schedule {
width: 100% !important;
}
.container {
max-width: 100% !important;
padding: 0 !important;
}
.download-btn {
margin-top: 10px !important;
}
""") as iface:
    gr.Markdown("# Staff Scheduling Optimizer")
    gr.Markdown("Upload a CSV file with cycle data and configure parameters to generate an optimal staff schedule.")
    with gr.Row():
        # LEFT PANEL - Inputs
        with gr.Column(scale=1):
            gr.Markdown("### Input Parameters")
            # Input parameters
            # These components are passed, in this order, to optimize_and_display().
            csv_input = gr.File(label="Upload CSV File", file_types=[".csv"])
            beds_per_staff = gr.Number(label="Beds per Staff", value=3, precision=1)
            # Labelled "monthly", but optimize_staffing() documents this value as
            # hours per 28-day period.
            max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160, precision=0)
            hours_per_cycle = gr.Number(label="Hours per Cycle", value=5, precision=1)
            rest_days_per_week = gr.Number(label="Rest Days per Week", value=2, precision=0)
            clinic_start_ampm = gr.Dropdown(label="Clinic Start Hour (AM/PM)", choices=am_pm_times, value="07:00 AM")
            clinic_end_ampm = gr.Dropdown(label="Clinic End Hour (AM/PM)", choices=am_pm_times, value="10:00 PM")
            overlap_time = gr.Number(label="Overlap Time", value=0.5, precision=1)
            max_start_time_change = gr.Number(label="Max Start Time Change", value=1, precision=0)
            # No default value: the field starts blank.
            # NOTE(review): presumably a blank field reaches the callback as None,
            # matching optimize_staffing's exact_staff_count=None default — confirm.
            exact_staff_count = gr.Number(label="Exact Staff Count (optional) (leave blank)", precision=0)
            # The UI default is 0, whereas optimize_staffing() itself defaults
            # overtime_percent to 100 when the argument is omitted.
            overtime_percent = gr.Number(label="Overtime Allowed (%)", value=0, precision=0)
            optimize_btn = gr.Button("Start Scheduling", variant="primary")
        # RIGHT PANEL - Outputs
        with gr.Column(scale=2):
            gr.Markdown("### Results")
            # Tabs for different outputs - reordered
            with gr.Tabs():
                with gr.TabItem("Detailed Schedule"):
                    with gr.Row():
                        csv_schedule = gr.Dataframe(label="Detailed Schedule", elem_id="csv_schedule")
                    with gr.Row():
                        schedule_download_file = gr.File(label="Download Detailed Schedule", visible=True)
                with gr.TabItem("Gantt Chart"):
                    gantt_chart = gr.Image(label="Staff Schedule Visualization", elem_id="gantt_chart")
                with gr.TabItem("Staff Coverage by Cycle"):
                    with gr.Row():
                        staff_assignment_table = gr.Dataframe(label="Staff Count in Each Cycle (Staff May Overlap)", elem_id="staff_assignment_table")
                    with gr.Row():
                        staff_download_file = gr.File(label="Download Coverage Table", visible=True)
                with gr.TabItem("Hours Visualization"):
                    schedule_visualization = gr.Image(label="Hours by Day Visualization", elem_id="schedule_visualization")
| # Define download functions | |
def create_download_link(df, filename="data.csv"):
    """Write a dataframe to a temporary CSV file and return the file's path.

    Args:
        df: DataFrame to export. ``None`` or an empty frame produces no file.
        filename: Unused; kept so existing callers that pass it keep working.

    Returns:
        Path of the newly created ``.csv`` temp file, or ``None`` when there
        is nothing to export. The file is created with ``delete=False``, so it
        stays on disk after this function returns.
    """
    if df is None or df.empty:
        return None
    # Explicit UTF-8 keeps non-ASCII cell values portable across platforms;
    # newline='' lets pandas control line endings, avoiding doubled carriage
    # returns on Windows.
    with tempfile.NamedTemporaryFile(
        mode='w', delete=False, suffix='.csv', encoding='utf-8', newline=''
    ) as f:
        df.to_csv(f, index=False)
    return f.name
| # Update the optimize_and_display function | |
def optimize_and_display(csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
                         rest_days_per_week, clinic_start_ampm, clinic_end_ampm,
                         overlap_time, max_start_time_change, exact_staff_count, overtime_percent):
    """Button callback: run the optimizer and map its results onto the UI outputs.

    The AM/PM clinic hours are converted with convert_to_24h() and everything
    else is forwarded unchanged to optimize_staffing().

    Returns:
        A 6-tuple in the order expected by the click handler's outputs:
        (staff assignment table, gantt chart path, detailed schedule table,
        hours plot path, staff assignment CSV path, schedule CSV path).
        If anything fails, the first element is a one-row DataFrame with an
        "Error" column holding the message and the remaining five are None.
    """
    try:
        # Convert AM/PM times to 24-hour format
        clinic_start = convert_to_24h(clinic_start_ampm)
        clinic_end = convert_to_24h(clinic_end_ampm)
        # Call the optimization function; its textual results are not displayed
        (_results, staff_assignment_df, gantt_path, schedule_df, plot_path,
         schedule_csv_path, staff_assignment_csv_path) = optimize_staffing(
            csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
            rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change,
            exact_staff_count, overtime_percent
        )
    except Exception as e:
        # UI boundary: report the failure instead of discarding it.
        error_message = f"Error during optimization: {str(e)}\n\nPlease try with different parameters or a simpler dataset."
        print(error_message)
        # Return error in the first output so the user can actually see it
        error_df = pd.DataFrame({"Error": [error_message]})
        return error_df, None, None, None, None, None
    # Note the CSV paths are swapped relative to optimize_staffing's return
    # order so they line up with staff_download_file / schedule_download_file.
    return staff_assignment_df, gantt_path, schedule_df, plot_path, staff_assignment_csv_path, schedule_csv_path
# Hook the "Start Scheduling" button up to the optimizer callback.
# NOTE(review): indentation is not recoverable from this view; Gradio expects
# event listeners to be attached inside the `with gr.Blocks()` context, so
# confirm this wiring sits within that block.

# Components whose values are passed positionally to optimize_and_display().
_click_inputs = [
    csv_input,
    beds_per_staff,
    max_hours_per_staff,
    hours_per_cycle,
    rest_days_per_week,
    clinic_start_ampm,
    clinic_end_ampm,
    overlap_time,
    max_start_time_change,
    exact_staff_count,
    overtime_percent,
]
# Components that receive the callback's six return values, in order.
_click_outputs = [
    staff_assignment_table,
    gantt_chart,
    csv_schedule,
    schedule_visualization,
    staff_download_file,
    schedule_download_file,
]
optimize_btn.click(fn=optimize_and_display, inputs=_click_inputs, outputs=_click_outputs)

# Start the Gradio server (with a public share link requested).
iface.launch(share=True)