nephro3solved7am / progress.py
anujkum0x's picture
Update progress.py
4b311c5 verified
import pandas as pd
import numpy as np
from ortools.sat.python import cp_model
import matplotlib.pyplot as plt
import gradio as gr
from itertools import product
import io
import base64
import tempfile
import os
from datetime import datetime
def am_pm(hour):
"""Converts 24-hour time to AM/PM format."""
period = "AM"
if hour >= 12:
period = "PM"
if hour > 12:
hour -= 12
elif hour == 0:
hour = 12 # Midnight
return f"{int(hour):02d}:00 {period}"
def show_dataframe(csv_path):
"""Reads a CSV file and returns a Pandas DataFrame."""
try:
df = pd.read_csv(csv_path)
return df
except Exception as e:
return f"Error loading CSV: {e}"
def optimize_staffing(
csv_file,
beds_per_staff,
max_hours_per_staff,
hours_per_cycle,
rest_days_per_week,
clinic_start,
clinic_end,
overlap_time,
max_start_time_change,
exact_staff_count=None,
overtime_percent=100
):
# Constants
STANDARD_PERIOD_DAYS = 30 # Standard month length
REST_DAYS_PER_WEEK = rest_days_per_week # Store as constant
try:
if isinstance(csv_file, str):
data = pd.read_csv(csv_file)
else:
data = pd.read_csv(io.StringIO(csv_file.decode('utf-8')))
except Exception as e:
print(f"Error loading CSV file: {e}")
return f"Error loading CSV file: {e}", None, None, None, None, None, None, None
# Get number of days and setup parameters
num_days = len(data)
BEDS_PER_STAFF = float(beds_per_staff)
BASE_MAX_HOURS = float(max_hours_per_staff)
MAX_HOURS_PER_STAFF = BASE_MAX_HOURS * (num_days / 30) # Scale for actual period length
original_max_hours = MAX_HOURS_PER_STAFF # Store original max hours for overtime calculations
# Calculate total work hours needed
total_staff_hours = 0
cycle_cols = [col for col in data.columns if col.startswith('cycle') and not col.endswith('_staff')]
# Calculate staff needed per cycle
for col in cycle_cols:
data[f'{col}_staff'] = np.ceil(data[col] / BEDS_PER_STAFF)
for _, row in data.iterrows():
for cycle in cycle_cols:
total_staff_hours += row[f'{cycle}_staff'] * float(hours_per_cycle)
# Define cycle times
cycle_times = {}
cycle_duration = float(hours_per_cycle)
for i, cycle in enumerate(cycle_cols):
if i == 0:
cycle_start = clinic_start
else:
prev_cycle_start = cycle_times[cycle_cols[i-1]][0]
cycle_start = (prev_cycle_start + cycle_duration) % 24
cycle_end = (cycle_start + cycle_duration) % 24
cycle_times[cycle] = (cycle_start, cycle_end)
# Generate all possible shifts
possible_shifts = []
shift_start_times = []
# Handle overnight clinic shifts
if clinic_end < clinic_start:
shift_start_times.extend(range(clinic_start, 24))
shift_start_times.extend(range(0, clinic_end + 1))
else:
shift_start_times.extend(range(clinic_start, clinic_end - 4 + 1))
# Generate shifts
for start_time in shift_start_times:
for duration in [8, 12]: # Only 8 and 12 hour shifts - most common in healthcare
end_time = (start_time + duration) % 24
shift = {
'id': f"{duration}hr_{start_time:02d}",
'start': start_time,
'end': end_time,
'duration': duration,
'cycles_covered': set()
}
# Determine cycles covered
for cycle, (cycle_start, cycle_end) in cycle_times.items():
if cycle_end < cycle_start: # overnight cycle
if (start_time >= cycle_start or end_time <= cycle_end or
(end_time < start_time and (start_time < cycle_end or end_time > cycle_start))):
shift['cycles_covered'].add(cycle)
else: # normal cycle
shift_end = end_time if end_time > start_time else end_time + 24
cycle_end_adj = cycle_end if cycle_end > cycle_start else cycle_end + 24
if not (shift_end <= cycle_start or start_time >= cycle_end_adj):
shift['cycles_covered'].add(cycle)
if shift['cycles_covered']:
possible_shifts.append(shift)
# Calculate minimum staff needed
theoretical_min_staff = np.ceil(total_staff_hours / MAX_HOURS_PER_STAFF)
min_staff_with_rest = np.ceil(theoretical_min_staff * (7 / (7 - REST_DAYS_PER_WEEK)))
min_staff_estimate = min_staff_with_rest
# Define optimize_schedule function inside optimize_staffing to access all variables
def optimize_schedule(num_staff, time_limit=600):
try:
# Create the model
model = cp_model.CpModel()
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = time_limit
solver.parameters.num_search_workers = 8 # Use multiple threads
# Scale factor for converting hours to integers (x100 for 2 decimal precision)
SCALE = 100
MAX_SCALED_HOURS = int(MAX_HOURS_PER_STAFF * SCALE)
# Create shift variables - 1 if staff s works shift on day d
x = {}
for s in range(1, num_staff+1):
for d in range(1, num_days+1):
for shift in possible_shifts:
x[s, d, shift['id']] = model.NewBoolVar(f'shift_{s}_{d}_{shift["id"]}')
# Staff hours variables (scaled to integers)
staff_hours = {}
for s in range(1, num_staff+1):
staff_hours[s] = model.NewIntVar(0, MAX_SCALED_HOURS, f'hours_{s}')
max_staff_hours = model.NewIntVar(0, MAX_SCALED_HOURS, 'max_hours')
# Calculate staff hours (with scaling)
for s in range(1, num_staff+1):
model.Add(staff_hours[s] == sum(
x[s, d, shift['id']] * int(shift['duration'] * SCALE)
for d in range(1, num_days+1)
for shift in possible_shifts
))
model.Add(staff_hours[s] <= max_staff_hours)
model.Add(staff_hours[s] <= MAX_SCALED_HOURS)
# Coverage constraints
for d in range(1, num_days+1):
day_index = d - 1
for cycle in cycle_cols:
staff_needed = int(data.iloc[day_index][f'{cycle}_staff'])
if staff_needed > 0:
covering_shifts = [shift for shift in possible_shifts if cycle in shift['cycles_covered']]
model.Add(sum(
x[s, d, shift['id']]
for s in range(1, num_staff+1)
for shift in covering_shifts
) >= staff_needed)
# One shift per day per staff
for s in range(1, num_staff+1):
for d in range(1, num_days+1):
model.Add(sum(x[s, d, shift['id']] for shift in possible_shifts) <= 1)
# Rest days constraint
days_per_week = min(7, num_days)
min_rest_days = max(1, min(REST_DAYS_PER_WEEK, days_per_week // 3))
for s in range(1, num_staff+1):
for w in range((num_days + days_per_week - 1) // days_per_week):
week_start = w * days_per_week + 1
week_end = min(week_start + days_per_week - 1, num_days)
days_in_week = week_end - week_start + 1
adjusted_rest_days = max(1, int(min_rest_days * days_in_week / 7))
model.Add(sum(
x[s, d, shift['id']]
for d in range(week_start, week_end+1)
for shift in possible_shifts
) <= days_in_week - adjusted_rest_days)
# Objective: Minimize maximum hours (scaled)
model.Minimize(max_staff_hours * 1000 + sum(staff_hours[s] for s in range(1, num_staff+1)))
# Solve
status = solver.Solve(model)
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
# Extract solution
schedule = []
for s in range(1, num_staff+1):
for d in range(1, num_days+1):
for shift in possible_shifts:
if solver.Value(x[s, d, shift['id']]) == 1:
schedule.append({
'staff_id': s,
'day': d,
'shift_id': shift['id'],
'start': shift['start'],
'end': shift['end'],
'duration': shift['duration'],
'cycles_covered': list(shift['cycles_covered'])
})
objective_value = solver.ObjectiveValue() / SCALE # Convert back to hours
return schedule, objective_value
else:
print(f"No solution found. Status: {status}")
return None, None
except Exception as e:
print(f"Error in optimization: {e}")
return None, None
# Modify the optimization attempt logic for exact staff count
if exact_staff_count is not None and exact_staff_count > 0:
staff_count = int(exact_staff_count)
time_limit = 120 # Reduced time limit
schedule, objective = optimize_schedule(staff_count, time_limit)
results = f"Input max hours per staff (30-day period): {BASE_MAX_HOURS}\n"
results += f"Adjusted max hours for {num_days}-day period: {MAX_HOURS_PER_STAFF:.1f}\n"
results += f"(Adjustment ratio: {num_days}/{30} = {(num_days/30):.2f})\n\n"
results += f"Requested staff count: {staff_count}\n"
# Calculate theoretical minimum staff needed
theoretical_min_staff = np.ceil(total_staff_hours / MAX_HOURS_PER_STAFF)
min_staff_with_rest = np.ceil(theoretical_min_staff * (7 / (7 - REST_DAYS_PER_WEEK)))
results += f"Theoretical minimum staff needed: {theoretical_min_staff:.1f}\n"
results += f"Minimum staff with rest days: {min_staff_with_rest:.1f}\n"
if staff_count < min_staff_with_rest:
# Calculate required overtime per staff
total_work_needed = total_staff_hours
hours_per_staff_no_overtime = total_work_needed / min_staff_with_rest
hours_per_staff_with_fewer = total_work_needed / staff_count
overtime_needed_percent = ((hours_per_staff_with_fewer / hours_per_staff_no_overtime) - 1) * 100
results += f"\nWARNING: Requested staff count ({staff_count}) is below minimum ({min_staff_with_rest:.1f})"
results += f"\nEach staff member will need approximately {overtime_needed_percent:.1f}% overtime"
results += f"\nAttempting solution with overtime...\n"
# Update MAX_HOURS_PER_STAFF with required overtime
original_max_hours = MAX_HOURS_PER_STAFF
MAX_HOURS_PER_STAFF *= (1 + overtime_needed_percent/100)
# Try to find solution with overtime
schedule, objective = optimize_schedule(staff_count)
if schedule is not None:
results += f"\nSolution found with {overtime_needed_percent:.1f}% overtime allowance\n"
results += f"Original max hours per staff: {original_max_hours:.1f}\n"
results += f"Adjusted max hours with overtime: {MAX_HOURS_PER_STAFF:.1f}\n"
# Calculate actual overtime for each staff
schedule_df = pd.DataFrame(schedule)
staff_overtime = {}
for s in range(1, staff_count + 1):
staff_shifts = schedule_df[schedule_df['staff_id'] == s]
total_hours = staff_shifts['duration'].sum()
if total_hours > original_max_hours:
overtime = total_hours - original_max_hours
overtime_percent = (overtime / original_max_hours) * 100
staff_overtime[s] = (total_hours, overtime, overtime_percent)
results += "\nDetailed overtime breakdown:\n"
for staff_id, (total, ot, pct) in staff_overtime.items():
results += f"Staff {staff_id}: {total:.1f} total hours, {ot:.1f} overtime hours ({pct:.1f}% overtime)\n"
else:
results += "\nFailed to find solution even with overtime. Try increasing staff count.\n"
return results, None, None, None, None, None, None, None
else:
# Start from theoretical minimum and work up
min_staff = max(1, int(theoretical_min_staff))
max_staff = int(min_staff_estimate) + 2 # Reduced range further
results = f"Input max hours per staff (30-day period): {BASE_MAX_HOURS}\n"
results += f"Adjusted max hours for {num_days}-day period: {MAX_HOURS_PER_STAFF:.1f}\n"
results += f"Theoretical minimum staff needed: {theoretical_min_staff:.1f}\n"
results += f"Searching for minimum staff count starting from {min_staff}...\n"
schedule = None
for staff_count in range(min_staff, max_staff + 1):
results += f"Trying with {staff_count} staff...\n"
time_limit = 90 # Very aggressive time limit
schedule, objective = optimize_schedule(staff_count, time_limit)
if schedule is not None:
results += f"Found feasible solution with {staff_count} staff.\n"
break
if schedule is None:
results += "Failed to find a feasible solution with standard staff counts.\n"
results += "Attempting solution with minimum staff and overtime...\n"
staff_count = min_staff
required_overtime = ((min_staff_estimate / staff_count) - 1) * 100
overtime_percent = max(overtime_percent, required_overtime)
# Update MAX_HOURS_PER_STAFF with overtime
original_max_hours = MAX_HOURS_PER_STAFF
MAX_HOURS_PER_STAFF *= (1 + overtime_percent/100)
schedule, objective = optimize_schedule(staff_count)
if schedule is not None:
results += f"\nSolution found with {staff_count} staff and {overtime_percent:.1f}% overtime\n"
results += f"Original max hours: {original_max_hours:.1f}\n"
results += f"Adjusted max hours with overtime: {MAX_HOURS_PER_STAFF:.1f}\n"
results += f"Optimal solution found with {staff_count} staff\n"
results += f"Total staff hours: {objective}\n"
# Convert to DataFrame for analysis
schedule_df = pd.DataFrame(schedule)
# Analyze staff workload with overtime calculations
staff_hours = {}
overtime_hours = {}
for s in range(1, staff_count+1):
staff_shifts = schedule_df[schedule_df['staff_id'] == s]
total_hours = staff_shifts['duration'].sum()
staff_hours[s] = total_hours
# Calculate overtime
if total_hours > original_max_hours:
overtime = total_hours - original_max_hours
overtime_percent = (overtime / original_max_hours) * 100
overtime_hours[s] = (overtime, overtime_percent)
# Display staff hours and overtime
results += "\nStaff Hours and Overtime:\n"
for staff_id, hours in staff_hours.items():
results += f"Staff {staff_id}: {hours:.1f} hours"
if staff_id in overtime_hours:
ot_hours, ot_percent = overtime_hours[staff_id]
results += f" (includes {ot_hours:.1f} overtime hours, {ot_percent:.1f}% overtime)"
results += "\n"
# Convert to DataFrame for analysis
schedule_df = pd.DataFrame(schedule)
# Analyze staff workload
staff_hours = {}
for s in range(1, staff_count+1):
staff_shifts = schedule_df[schedule_df['staff_id'] == s]
total_hours = staff_shifts['duration'].sum()
staff_hours[s] = total_hours
# Handle staff hours display based on whether exact count was specified
if exact_staff_count is not None:
# When exact count is specified, show all staff including those with 0 hours
active_staff_hours = staff_hours
else:
# Otherwise, only show active staff
active_staff_hours = {s: hours for s, hours in staff_hours.items() if hours > 0}
results += "\nStaff Hours:\n"
total_active_hours = sum(active_staff_hours.values())
avg_hours = total_active_hours / len(active_staff_hours) if active_staff_hours else 0
for staff_id, hours in active_staff_hours.items():
utilization = (hours / MAX_HOURS_PER_STAFF) * 100
deviation_from_avg = ((hours - avg_hours) / avg_hours * 100) if avg_hours > 0 else 0
results += f"Staff {staff_id}: {hours:.1f} hours ({utilization:.1f}% utilization)"
if exact_staff_count is not None:
results += f" [Deviation from avg: {deviation_from_avg:+.1f}%]"
results += "\n"
# Add overtime information
if staff_id in overtime_hours:
ot_hours, ot_percent = overtime_hours[staff_id]
results += f" Overtime: {ot_hours:.1f} hours ({ot_percent:.1f}%)\n"
if exact_staff_count is not None:
results += f"\nWorkload Distribution Stats:\n"
results += f"Average hours per staff: {avg_hours:.1f}\n"
if active_staff_hours:
max_deviation = max(abs((hours - avg_hours) / avg_hours * 100) for hours in active_staff_hours.values()) if avg_hours > 0 else 0
results += f"Maximum deviation from average: {max_deviation:.1f}%\n"
# Use active_staff_hours for average utilization calculation
active_staff_count = len(active_staff_hours)
avg_utilization = sum(active_staff_hours.values()) / (active_staff_count * MAX_HOURS_PER_STAFF) * 100
results += f"\nAverage staff utilization: {avg_utilization:.1f}%\n"
# Check coverage for each day and cycle
coverage_check = []
for d in range(1, num_days+1):
day_index = d - 1 # 0-indexed for DataFrame
day_schedule = schedule_df[schedule_df['day'] == d]
for cycle in cycle_cols:
required = data.iloc[day_index][f'{cycle}_staff']
# Count staff covering this cycle
assigned = sum(1 for _, shift in day_schedule.iterrows()
if cycle in shift['cycles_covered'])
coverage_check.append({
'day': d,
'cycle': cycle,
'required': required,
'assigned': assigned,
'satisfied': assigned >= required
})
coverage_df = pd.DataFrame(coverage_check)
satisfaction = coverage_df['satisfied'].mean() * 100
results += f"Coverage satisfaction: {satisfaction:.1f}%\n"
if satisfaction < 100:
results += "Warning: Not all staffing requirements are met!\n"
unsatisfied = coverage_df[~coverage_df['satisfied']]
results += unsatisfied.to_string() + "\n"
# Generate detailed schedule report
detailed_schedule = "Detailed Schedule:\n"
for d in range(1, num_days+1):
day_schedule = schedule_df[schedule_df['day'] == d]
day_schedule = day_schedule.sort_values(['start'])
detailed_schedule += f"\nDay {d}:\n"
for _, shift in day_schedule.iterrows():
start_hour = shift['start']
end_hour = shift['end']
start_str = am_pm(start_hour)
end_str = am_pm(end_hour)
cycles = ", ".join(shift['cycles_covered'])
detailed_schedule += f" Staff {shift['staff_id']}: {start_str}-{end_str} ({shift['duration']} hrs), Cycles: {cycles}\n"
# Generate schedule visualization
fig, ax = plt.subplots(figsize=(15, 8))
# Prepare schedule for plotting
staff_days = {}
for s in range(1, staff_count+1):
staff_days[s] = [0] * num_days # 0 means off duty
for _, shift in schedule_df.iterrows():
staff_id = shift['staff_id']
day = shift['day'] - 1 # 0-indexed
staff_days[staff_id][day] = shift['duration']
# Plot the schedule
for s, hours in staff_days.items():
ax.bar(range(1, num_days+1), hours, label=f'Staff {s}')
ax.set_xlabel('Day')
ax.set_ylabel('Shift Hours')
ax.set_title('Staff Schedule')
ax.set_xticks(range(1, num_days+1))
ax.legend()
# Save the figure to a temporary file
plot_path = None
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
plt.savefig(f.name)
plt.close(fig)
plot_path = f.name
# Create a Gantt chart with advanced visuals and alternating labels - only showing active staff
gantt_path = create_gantt_chart(schedule_df, num_days, staff_count)
# Convert schedule to CSV data
schedule_df['start_ampm'] = schedule_df['start'].apply(am_pm)
schedule_df['end_ampm'] = schedule_df['end'].apply(am_pm)
schedule_csv = schedule_df[['staff_id', 'day', 'start_ampm', 'end_ampm', 'duration', 'cycles_covered']].to_csv(index=False)
# Create a temporary file and write the CSV data into it
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file:
temp_file.write(schedule_csv)
schedule_csv_path = temp_file.name
# Create staff assignment table
staff_assignment_data = []
for d in range(1, num_days + 1):
cycle_staff = {}
for cycle in cycle_cols:
# Get staff IDs assigned to this cycle on this day
staff_ids = schedule_df[(schedule_df['day'] == d) & (schedule_df['cycles_covered'].apply(lambda x: cycle in x))]['staff_id'].tolist()
cycle_staff[cycle] = len(staff_ids)
staff_assignment_data.append([d] + [cycle_staff[cycle] for cycle in cycle_cols])
staff_assignment_df = pd.DataFrame(staff_assignment_data, columns=['Day'] + cycle_cols)
# Create CSV files for download
staff_assignment_csv_path = None
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as temp_file:
staff_assignment_df.to_csv(temp_file.name, index=False)
staff_assignment_csv_path = temp_file.name
# Return all required values in the correct order
return results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path
def convert_to_24h(time_str):
"""Converts AM/PM time string to 24-hour format."""
try:
time_obj = datetime.strptime(time_str, "%I:00 %p")
return time_obj.hour
except ValueError:
return None
def gradio_wrapper(
csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
rest_days_per_week, clinic_start_ampm, clinic_end_ampm, overlap_time, max_start_time_change,
exact_staff_count=None, overtime_percent=100
):
try:
# Convert AM/PM times to 24-hour format
clinic_start = convert_to_24h(clinic_start_ampm)
clinic_end = convert_to_24h(clinic_end_ampm)
# Call the optimization function
results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path = optimize_staffing(
csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change,
exact_staff_count, overtime_percent
)
# Return the results
return staff_assignment_df, gantt_path, schedule_df, plot_path, staff_assignment_csv_path, schedule_csv_path
except Exception as e:
# If there's an error in the optimization process, return a meaningful error message
empty_staff_df = pd.DataFrame(columns=["Day"])
error_message = f"Error during optimization: {str(e)}\n\nPlease try with different parameters or a simpler dataset."
# Return error in the first output
return empty_staff_df, None, None, None, None, None
# Create a Gantt chart with advanced visuals and alternating labels - only showing active staff
def create_gantt_chart(schedule_df, num_days, staff_count):
# Get the list of active staff IDs (staff who have at least one shift)
active_staff_ids = sorted(schedule_df['staff_id'].unique())
active_staff_count = len(active_staff_ids)
# Create a mapping from original staff ID to position in the chart
staff_position = {staff_id: i+1 for i, staff_id in enumerate(active_staff_ids)}
# Create a figure size based on the actual number of days
plt.figure(figsize=(max(15, num_days * 1.2), max(8, active_staff_count * 0.8)), dpi=200)
# Use a more sophisticated color palette - only for active staff
colors = plt.cm.viridis(np.linspace(0.1, 0.9, active_staff_count))
# Set a modern style
plt.style.use('seaborn-v0_8-whitegrid')
# Create a new axis with a slight background color
ax = plt.gca()
ax.set_facecolor('#f8f9fa')
# Sort by staff then day
schedule_df = schedule_df.sort_values(['staff_id', 'day'])
# Plot Gantt chart - only for active staff
for i, staff_id in enumerate(active_staff_ids):
staff_shifts = schedule_df[schedule_df['staff_id'] == staff_id]
y_pos = active_staff_count - i # Position based on index in active staff list
# Add staff label with a background box
ax.text(-0.7, y_pos, f"Staff {staff_id}", fontsize=12, fontweight='bold',
ha='right', va='center', bbox=dict(facecolor='white', edgecolor='gray',
boxstyle='round,pad=0.5', alpha=0.9))
# Add a subtle background for each staff row
ax.axhspan(y_pos-0.4, y_pos+0.4, color='white', alpha=0.4, zorder=-5)
# Track shift positions to avoid label overlap
shift_positions = []
for idx, shift in enumerate(staff_shifts.iterrows()):
_, shift = shift
day = shift['day']
start_hour = shift['start']
end_hour = shift['end']
duration = shift['duration']
# Format times for display
start_ampm = am_pm(start_hour)
end_ampm = am_pm(end_hour)
# Calculate shift position
shift_start_pos = day-1+start_hour/24
# Handle overnight shifts
if end_hour < start_hour: # Overnight shift
# First part of shift (until midnight)
rect1 = ax.barh(y_pos, (24-start_hour)/24, left=shift_start_pos,
height=0.6, color=colors[i], alpha=0.9,
edgecolor='black', linewidth=1, zorder=10)
# Add gradient effect
for r in rect1:
r.set_edgecolor('black')
r.set_linewidth(1)
# Second part of shift (after midnight)
rect2 = ax.barh(y_pos, end_hour/24, left=day,
height=0.6, color=colors[i], alpha=0.9,
edgecolor='black', linewidth=1, zorder=10)
# Add gradient effect
for r in rect2:
r.set_edgecolor('black')
r.set_linewidth(1)
# For overnight shifts, we'll place the label in the first part if it's long enough
shift_width = (24-start_hour)/24
if shift_width >= 0.1: # Only add label if there's enough space
label_pos = shift_start_pos + shift_width/2
# Alternate labels above and below
y_offset = 0.35 if idx % 2 == 0 else -0.35
# Add label with background for better readability
label = f"{start_ampm}-{end_ampm}"
text = ax.text(label_pos, y_pos + y_offset, label,
ha='center', va='center', fontsize=9, fontweight='bold',
color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3,
boxstyle='round,pad=0.3', edgecolor='gray'),
zorder=20)
shift_positions.append(label_pos)
else:
# Regular shift
shift_width = duration/24
rect = ax.barh(y_pos, shift_width, left=shift_start_pos,
height=0.6, color=colors[i], alpha=0.9,
edgecolor='black', linewidth=1, zorder=10)
# Add gradient effect
for r in rect:
r.set_edgecolor('black')
r.set_linewidth(1)
# Only add label if there's enough space
if shift_width >= 0.1:
label_pos = shift_start_pos + shift_width/2
# Alternate labels above and below
y_offset = 0.35 if idx % 2 == 0 else -0.35
# Add label with background for better readability
label = f"{start_ampm}-{end_ampm}"
text = ax.text(label_pos, y_pos + y_offset, label,
ha='center', va='center', fontsize=9, fontweight='bold',
color='black', bbox=dict(facecolor='white', alpha=0.9, pad=3,
boxstyle='round,pad=0.3', edgecolor='gray'),
zorder=20)
shift_positions.append(label_pos)
# Add weekend highlighting with a more sophisticated look
for day in range(1, num_days + 1):
# Determine if this is a weekend (assuming day 1 is Monday)
is_weekend = (day % 7 == 0) or (day % 7 == 6) # Saturday or Sunday
if is_weekend:
ax.axvspan(day-1, day, alpha=0.15, color='#ff9999', zorder=-10)
day_label = "Saturday" if day % 7 == 6 else "Sunday"
ax.text(day-0.5, 0.2, day_label, ha='center', fontsize=10, color='#cc0000',
fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, pad=2, boxstyle='round'))
# Set x-axis ticks for each day with better formatting
ax.set_xticks(np.arange(0.5, num_days, 1))
day_labels = [f"Day {d}" for d in range(1, num_days+1)]
ax.set_xticklabels(day_labels, rotation=45 if num_days > 20 else 0, ha='center', fontsize=10)
# Add vertical lines between days with better styling
for day in range(1, num_days):
ax.axvline(x=day, color='#aaaaaa', linestyle='-', alpha=0.5, zorder=-5)
# Set y-axis ticks for each staff
ax.set_yticks(np.arange(1, active_staff_count+1))
ax.set_yticklabels([]) # Remove default labels as we've added custom ones
# Set axis limits with some padding
ax.set_xlim(-0.8, num_days)
ax.set_ylim(0.5, active_staff_count + 0.5)
# Add grid for hours (every 6 hours) with better styling
for day in range(num_days):
for hour in [6, 12, 18]:
ax.axvline(x=day + hour/24, color='#cccccc', linestyle=':', alpha=0.5, zorder=-5)
# Add small hour markers at the bottom
hour_label = "6AM" if hour == 6 else "Noon" if hour == 12 else "6PM"
ax.text(day + hour/24, 0, hour_label, ha='center', va='bottom', fontsize=7,
color='#666666', rotation=90, alpha=0.7)
# Add title and labels with more sophisticated styling
plt.title(f'Staff Schedule ({active_staff_count} Active Staff)', fontsize=24, fontweight='bold', pad=20, color='#333333')
plt.xlabel('Day', fontsize=16, labelpad=10, color='#333333')
# Add a legend for time reference with better styling
time_box = plt.figtext(0.01, 0.01, "Time Reference:", ha='left', fontsize=10,
fontweight='bold', color='#333333')
time_markers = ['6 AM', 'Noon', '6 PM', 'Midnight']
for i, time in enumerate(time_markers):
plt.figtext(0.08 + i*0.06, 0.01, time, ha='left', fontsize=9, color='#555555')
# Remove spines
for spine in ['top', 'right', 'left']:
ax.spines[spine].set_visible(False)
# Add a note about weekends with better styling
weekend_note = plt.figtext(0.01, 0.97, "Red areas = Weekends", fontsize=12,
color='#cc0000', fontweight='bold',
bbox=dict(facecolor='white', alpha=0.7, pad=5, boxstyle='round'))
# Add a subtle border around the entire chart
plt.box(False)
# Save the Gantt chart with high quality
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
plt.tight_layout()
plt.savefig(f.name, dpi=200, bbox_inches='tight', facecolor='white')
plt.close()
return f.name
def optimize_schedule(num_staff, time_limit=600):
try:
# Create the model
model = cp_model.CpModel()
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = time_limit
solver.parameters.num_search_workers = 8 # Use multiple threads
# Scale factor for converting hours to integers (x100 for 2 decimal precision)
SCALE = 100
MAX_SCALED_HOURS = int(MAX_HOURS_PER_STAFF * SCALE)
# Create shift variables - 1 if staff s works shift on day d
x = {}
for s in range(1, num_staff+1):
for d in range(1, num_days+1):
for shift in possible_shifts:
x[s, d, shift['id']] = model.NewBoolVar(f'shift_{s}_{d}_{shift["id"]}')
# Staff hours variables (scaled to integers)
staff_hours = {}
for s in range(1, num_staff+1):
staff_hours[s] = model.NewIntVar(0, MAX_SCALED_HOURS, f'hours_{s}')
max_staff_hours = model.NewIntVar(0, MAX_SCALED_HOURS, 'max_hours')
# Calculate staff hours (with scaling)
for s in range(1, num_staff+1):
model.Add(staff_hours[s] == sum(
x[s, d, shift['id']] * int(shift['duration'] * SCALE)
for d in range(1, num_days+1)
for shift in possible_shifts
))
model.Add(staff_hours[s] <= max_staff_hours)
model.Add(staff_hours[s] <= MAX_SCALED_HOURS)
# Coverage constraints
for d in range(1, num_days+1):
day_index = d - 1
for cycle in cycle_cols:
staff_needed = int(data.iloc[day_index][f'{cycle}_staff'])
if staff_needed > 0:
covering_shifts = [shift for shift in possible_shifts if cycle in shift['cycles_covered']]
model.Add(sum(
x[s, d, shift['id']]
for s in range(1, num_staff+1)
for shift in covering_shifts
) >= staff_needed)
# One shift per day per staff
for s in range(1, num_staff+1):
for d in range(1, num_days+1):
model.Add(sum(x[s, d, shift['id']] for shift in possible_shifts) <= 1)
# Rest days constraint
days_per_week = min(7, num_days)
min_rest_days = max(1, min(REST_DAYS_PER_WEEK, days_per_week // 3))
for s in range(1, num_staff+1):
for w in range((num_days + days_per_week - 1) // days_per_week):
week_start = w * days_per_week + 1
week_end = min(week_start + days_per_week - 1, num_days)
days_in_week = week_end - week_start + 1
adjusted_rest_days = max(1, int(min_rest_days * days_in_week / 7))
model.Add(sum(
x[s, d, shift['id']]
for d in range(week_start, week_end+1)
for shift in possible_shifts
) <= days_in_week - adjusted_rest_days)
# Objective: Minimize maximum hours (scaled)
model.Minimize(max_staff_hours * 1000 + sum(staff_hours[s] for s in range(1, num_staff+1)))
# Solve
status = solver.Solve(model)
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
# Extract solution
schedule = []
for s in range(1, num_staff+1):
for d in range(1, num_days+1):
for shift in possible_shifts:
if solver.Value(x[s, d, shift['id']]) == 1:
schedule.append({
'staff_id': s,
'day': d,
'shift_id': shift['id'],
'start': shift['start'],
'end': shift['end'],
'duration': shift['duration'],
'cycles_covered': list(shift['cycles_covered'])
})
objective_value = solver.ObjectiveValue() / SCALE # Convert back to hours
return schedule, objective_value
else:
print(f"No solution found. Status: {status}")
return None, None
except Exception as e:
print(f"Error in optimization: {e}")
return None, None
# Define Gradio UI
am_pm_times = [f"{i:02d}:00 AM" for i in range(1, 13)] + [f"{i:02d}:00 PM" for i in range(1, 13)]
with gr.Blocks(title="Staff Scheduling Optimizer", css="""
#staff_assignment_table {
width: 100% !important;
}
#csv_schedule {
width: 100% !important;
}
.container {
max-width: 100% !important;
padding: 0 !important;
}
.download-btn {
margin-top: 10px !important;
}
""") as iface:
gr.Markdown("# Staff Scheduling Optimizer")
gr.Markdown("Upload a CSV file with cycle data and configure parameters to generate an optimal staff schedule.")
with gr.Row():
# LEFT PANEL - Inputs
with gr.Column(scale=1):
gr.Markdown("### Input Parameters")
# Input parameters
csv_input = gr.File(label="Upload CSV")
beds_per_staff = gr.Number(label="Beds per Staff", value=3)
max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160)
hours_per_cycle = gr.Number(label="Hours per Cycle", value=4)
rest_days_per_week = gr.Number(label="Rest Days per Week", value=2)
clinic_start_ampm = gr.Dropdown(label="Clinic Start Hour (AM/PM)", choices=am_pm_times, value="08:00 AM")
clinic_end_ampm = gr.Dropdown(label="Clinic End Hour (AM/PM)", choices=am_pm_times, value="08:00 PM")
overlap_time = gr.Number(label="Overlap Time", value=0)
max_start_time_change = gr.Number(label="Max Start Time Change", value=2)
exact_staff_count = gr.Number(label="Exact Staff Count (optional)", value=None)
overtime_percent = gr.Slider(label="Overtime Allowed (%)", minimum=0, maximum=100, value=100, step=10)
optimize_btn = gr.Button("Optimize Schedule", variant="primary", size="lg")
# RIGHT PANEL - Outputs
with gr.Column(scale=2):
gr.Markdown("### Results")
# Tabs for different outputs - reordered
with gr.Tabs():
with gr.TabItem("Detailed Schedule"):
with gr.Row():
csv_schedule = gr.Dataframe(label="Detailed Schedule", elem_id="csv_schedule")
with gr.Row():
schedule_download_file = gr.File(label="Download Detailed Schedule", visible=True)
with gr.TabItem("Gantt Chart"):
gantt_chart = gr.Image(label="Staff Schedule Visualization", elem_id="gantt_chart")
with gr.TabItem("Staff Coverage by Cycle"):
with gr.Row():
staff_assignment_table = gr.Dataframe(label="Staff Count in Each Cycle (Staff May Overlap)", elem_id="staff_assignment_table")
with gr.Row():
staff_download_file = gr.File(label="Download Coverage Table", visible=True)
with gr.TabItem("Hours Visualization"):
schedule_visualization = gr.Image(label="Hours by Day Visualization", elem_id="schedule_visualization")
# Define download functions
def create_download_link(df, filename="data.csv"):
"""Create a CSV download link for a dataframe"""
if df is None or df.empty:
return None
csv_data = df.to_csv(index=False)
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as f:
f.write(csv_data)
return f.name
# Update the optimize_and_display function
def optimize_and_display(csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
rest_days_per_week, clinic_start_ampm, clinic_end_ampm,
overlap_time, max_start_time_change, exact_staff_count, overtime_percent):
try:
# Convert AM/PM times to 24-hour format
clinic_start = convert_to_24h(clinic_start_ampm)
clinic_end = convert_to_24h(clinic_end_ampm)
# Call the optimization function
results, staff_assignment_df, gantt_path, schedule_df, plot_path, schedule_csv_path, staff_assignment_csv_path = optimize_staffing(
csv_file, beds_per_staff, max_hours_per_staff, hours_per_cycle,
rest_days_per_week, clinic_start, clinic_end, overlap_time, max_start_time_change,
exact_staff_count, overtime_percent
)
# Return the results
return staff_assignment_df, gantt_path, schedule_df, plot_path, staff_assignment_csv_path, schedule_csv_path
except Exception as e:
# If there's an error in the optimization process, return a meaningful error message
empty_staff_df = pd.DataFrame(columns=["Day"])
error_message = f"Error during optimization: {str(e)}\n\nPlease try with different parameters or a simpler dataset."
# Return error in the first output
return empty_staff_df, None, None, None, None, None
# Connect the button to the optimization function
optimize_btn.click(
fn=optimize_and_display,
inputs=[
csv_input, beds_per_staff, max_hours_per_staff, hours_per_cycle,
rest_days_per_week, clinic_start_ampm, clinic_end_ampm,
overlap_time, max_start_time_change, exact_staff_count, overtime_percent
],
outputs=[
staff_assignment_table, gantt_chart, csv_schedule, schedule_visualization,
staff_download_file, schedule_download_file
]
)
# Launch the Gradio app
iface.launch(share=True)