try / app.py
anujkum0x's picture
Update app.py
011591f verified
import numpy as np
import pandas as pd
import plotly.figure_factory as ff
import gradio as gr
from datetime import datetime, timedelta
import math
import random
from copy import deepcopy
# Load and process data
def load_data(file_path):
df = pd.read_csv(file_path)
return df
# Convert demand data to required staff
def calculate_staff_needed(demand_data, beds_per_staff=3):
staff_needed = {}
for column in demand_data.columns:
if column.startswith('cycle'):
staff_needed[column] = []
for value in demand_data[column]:
if pd.isna(value):
staff_needed[column].append(0)
else:
staff_needed[column].append(math.ceil(value / beds_per_staff))
return pd.DataFrame({'Date': demand_data['Date'], **staff_needed})
# Create time slots based on clinic hours
def create_time_slots(start_time, end_time, cycle_duration=5):
start_hour = int(start_time.split(':')[0])
start_minute = int(start_time.split(':')[1])
end_hour = int(end_time.split(':')[0])
end_minute = int(end_time.split(':')[1])
start_datetime = datetime(2025, 1, 1, start_hour, start_minute)
end_datetime = datetime(2025, 1, 1, end_hour, end_minute)
if end_datetime <= start_datetime:
end_datetime += timedelta(days=1)
current_time = start_datetime
time_slots = []
while current_time < end_datetime:
next_time = current_time + timedelta(hours=cycle_duration)
if next_time > end_datetime:
next_time = end_datetime
time_slots.append({
'start': current_time.strftime('%H:%M'),
'end': next_time.strftime('%H:%M')
})
current_time = next_time
return time_slots
# Check if a shift respects duration constraints
def is_valid_shift_duration(shift, max_duration=12):
start_hour = int(shift['start_time'].split(':')[0])
start_minute = int(shift['start_time'].split(':')[1])
end_hour = int(shift['end_time'].split(':')[0])
end_minute = int(shift['end_time'].split(':')[1])
start = start_hour * 60 + start_minute
end = end_hour * 60 + end_minute
if end < start: # Overnight shift
end += 24 * 60
duration = (end - start) / 60 # in hours
return duration <= max_duration
# Check if shift change follows constraints
def is_valid_shift_change(prev_shift, curr_shift, allowed_diff=1):
if not prev_shift:
return True
if prev_shift['is_rest_day']:
return True
prev_start = int(prev_shift['start_time'].split(':')[0]) * 60 + int(prev_shift['start_time'].split(':')[1])
curr_start = int(curr_shift['start_time'].split(':')[0]) * 60 + int(curr_shift['start_time'].split(':')[1])
diff_hours = abs(curr_start - prev_start) / 60
return diff_hours <= allowed_diff
# Check weekly rest day requirement
def has_weekly_rest_day(schedule, current_day_idx, days_in_week=7):
if current_day_idx < days_in_week:
# Count rest days in the partial week
rest_days = sum(1 for i in range(current_day_idx) if schedule[i]['is_rest_day'])
return rest_days > 0
else:
# Check the previous 6 days plus current day
rest_days = sum(1 for i in range(current_day_idx - 6, current_day_idx + 1) if schedule[i]['is_rest_day'])
return rest_days > 0
# Check total monthly hours
def calculate_monthly_hours(schedule):
total_hours = 0
for day in schedule:
if not day['is_rest_day']:
start_hour = int(day['start_time'].split(':')[0])
start_minute = int(day['start_time'].split(':')[1])
end_hour = int(day['end_time'].split(':')[0])
end_minute = int(day['end_time'].split(':')[1])
start = start_hour * 60 + start_minute
end = end_hour * 60 + end_minute
if end < start: # Overnight shift
end += 24 * 60
shift_hours = (end - start) / 60
total_hours += shift_hours
return total_hours
# Generate all possible shifts for a day
def generate_possible_shifts(time_slots, max_duration=12, handover_overlap=30):
possible_shifts = []
for i in range(len(time_slots)):
for j in range(i, len(time_slots)):
start_time = time_slots[i]['start']
end_time = time_slots[j]['end']
# Add handover overlap
end_hour, end_minute = map(int, end_time.split(':'))
end_minute += handover_overlap
if end_minute >= 60:
end_hour += 1
end_minute -= 60
end_time = f"{end_hour:02d}:{end_minute:02d}"
shift = {
'start_time': start_time,
'end_time': end_time,
'cycles': list(range(i, j+1))
}
if is_valid_shift_duration(shift, max_duration):
possible_shifts.append(shift)
return possible_shifts
# Primary scheduling algorithm
def schedule_staff(staff_needed, time_slots, start_date, clinic_start, clinic_end,
max_duration=12, beds_per_staff=3, max_monthly_hours=234, handover_overlap=30):
dates = pd.to_datetime(staff_needed['Date'])
date_range = [d.strftime('%Y-%m-%d') for d in dates]
# Create demand array
demand_array = np.zeros((len(date_range), len(time_slots)))
for i, date in enumerate(date_range):
for j, cycle in enumerate(staff_needed.columns[1:]): # Skip 'Date' column
if j < len(time_slots):
demand_array[i, j] = staff_needed[cycle].iloc[i]
# Generate all possible shifts
possible_shifts = generate_possible_shifts(time_slots, max_duration, handover_overlap)
# Initialize staff schedules
staff_schedules = []
# Helper function to create a rest day entry
def create_rest_day(date):
return {
'date': date,
'is_rest_day': True,
'start_time': '',
'end_time': '',
'cycles': []
}
# Initialize coverage array
coverage = np.zeros_like(demand_array)
# Function to calculate fitness of current solution
def calculate_fitness(coverage, demand):
# Calculate how well coverage meets demand
shortfall = np.maximum(0, demand - coverage)
return -np.sum(shortfall) # Negative because we want to minimize shortfall
# Add staff members until demand is met
iteration_count = 0
max_iterations = 1000
while np.any(coverage < demand_array) and iteration_count < max_iterations:
# Create a new staff schedule
new_staff = []
for i, date in enumerate(date_range):
# Decide whether this is a rest day
if i > 0 and not has_weekly_rest_day(new_staff, i-1):
new_staff.append(create_rest_day(date))
continue
# Find the best shift for this day
best_shift = None
best_improvement = -1
# Try rest day
temp_schedule = deepcopy(new_staff)
temp_schedule.append(create_rest_day(date))
if has_weekly_rest_day(temp_schedule, len(temp_schedule)-1) and calculate_monthly_hours(temp_schedule) <= max_monthly_hours:
# Rest day is valid
best_shift = create_rest_day(date)
# Try each possible shift
for shift in possible_shifts:
# Check if this shift would help with uncovered demand
will_help = False
for cycle_idx in shift['cycles']:
if cycle_idx < coverage.shape[1] and coverage[i, cycle_idx] < demand_array[i, cycle_idx]:
will_help = True
break
if not will_help:
continue
# Create temporary shift for validation
temp_shift = {
'date': date,
'is_rest_day': False,
'start_time': shift['start_time'],
'end_time': shift['end_time'],
'cycles': shift['cycles']
}
# Check if shift change is valid
if i > 0 and not is_valid_shift_change(new_staff[-1], temp_shift):
continue
# Check if adding this shift would exceed monthly hours
temp_schedule = deepcopy(new_staff)
temp_schedule.append(temp_shift)
if calculate_monthly_hours(temp_schedule) > max_monthly_hours:
continue
# Check weekly rest day requirement
if not has_weekly_rest_day(temp_schedule, len(temp_schedule)-1) and i < len(date_range) - 1:
continue
# Calculate improvement in coverage
temp_coverage = coverage.copy()
for cycle_idx in shift['cycles']:
if cycle_idx < temp_coverage.shape[1]:
temp_coverage[i, cycle_idx] += 1
improvement = calculate_fitness(temp_coverage, demand_array) - calculate_fitness(coverage, demand_array)
if improvement > best_improvement:
best_improvement = improvement
best_shift = temp_shift
# If no valid shift is found, use a rest day
if best_shift is None:
best_shift = create_rest_day(date)
new_staff.append(best_shift)
# Update coverage if not a rest day
if not best_shift['is_rest_day']:
for cycle_idx in best_shift['cycles']:
if cycle_idx < coverage.shape[1]:
coverage[i, cycle_idx] += 1
# Add the new staff schedule
staff_schedules.append(new_staff)
# Check if we've met all demands
if not np.any(coverage < demand_array):
break
iteration_count += 1
return staff_schedules, coverage, demand_array
# Convert schedules to CSV format
def schedules_to_csv(staff_schedules):
rows = []
for staff_idx, schedule in enumerate(staff_schedules):
for day in schedule:
if not day['is_rest_day']:
rows.append({
'Staff ID': f'Staff {staff_idx+1}',
'Date': day['date'],
'Start Time': day['start_time'],
'End Time': day['end_time'],
'Is Rest Day': 'No'
})
else:
rows.append({
'Staff ID': f'Staff {staff_idx+1}',
'Date': day['date'],
'Start Time': '',
'End Time': '',
'Is Rest Day': 'Yes'
})
return pd.DataFrame(rows)
# Generate Gantt chart
def create_gantt_chart(staff_schedules, clinic_start, clinic_end):
tasks = []
colors = {}
for staff_idx, schedule in enumerate(staff_schedules):
staff_id = f'Staff {staff_idx+1}'
colors[staff_id] = f'rgb({random.randint(50, 200)}, {random.randint(50, 200)}, {random.randint(50, 200)})'
for day in schedule:
if not day['is_rest_day']:
date = day['date']
start_time = day['start_time']
end_time = day['end_time']
start_dt = f"{date} {start_time}"
end_dt = f"{date} {end_time}"
# Handle overnight shifts
start_hour = int(start_time.split(':')[0])
end_hour = int(end_time.split(':')[0])
if end_hour < start_hour:
end_date = (pd.to_datetime(date) + pd.Timedelta(days=1)).strftime('%Y-%m-%d')
end_dt = f"{end_date} {end_time}"
tasks.append({
'Task': staff_id,
'Start': start_dt,
'Finish': end_dt,
'Resource': 'Shift'
})
else:
date = day['date']
start_dt = f"{date} {clinic_start}"
# Calculate end of day
end_date = date
end_dt = f"{end_date} {clinic_end}"
tasks.append({
'Task': staff_id,
'Start': start_dt,
'Finish': end_dt,
'Resource': 'Rest Day'
})
df_tasks = pd.DataFrame(tasks)
# Convert string dates to datetime
df_tasks['Start'] = pd.to_datetime(df_tasks['Start'])
df_tasks['Finish'] = pd.to_datetime(df_tasks['Finish'])
# Create the Gantt chart
fig = ff.create_gantt(df_tasks, colors=colors, index_col='Resource',
show_colorbar=True, group_tasks=True, showgrid_x=True,
title='Staff Schedule Gantt Chart')
return fig
# Validate coverage
def validate_coverage(coverage, demand):
is_valid = np.all(coverage >= demand)
coverage_stats = {
'Total Demand': np.sum(demand),
'Total Coverage': np.sum(coverage),
'Coverage Percentage': f"{100 * np.sum(coverage) / np.sum(demand) if np.sum(demand) > 0 else 100:.2f}%",
'Uncovered Slots': np.sum(coverage < demand)
}
return is_valid, coverage_stats
# Gradio interface
def nurse_scheduling_app(
csv_file,
beds_per_staff=3,
max_shift_duration=12,
handover_overlap=30,
max_monthly_hours=234,
clinic_start_time="08:00",
clinic_end_time="20:00",
cycle_duration=5
):
try:
# Load data
demand_data = load_data(csv_file.name)
# Calculate staff needed
staff_needed = calculate_staff_needed(demand_data, beds_per_staff)
# Create time slots
time_slots = create_time_slots(clinic_start_time, clinic_end_time, cycle_duration)
# Run scheduling algorithm
start_date = pd.to_datetime(demand_data['Date'].iloc[0]).strftime('%Y-%m-%d')
staff_schedules, coverage, demand = schedule_staff(
staff_needed,
time_slots,
start_date,
clinic_start_time,
clinic_end_time,
max_shift_duration,
beds_per_staff,
max_monthly_hours,
handover_overlap
)
# Convert to CSV
schedule_df = schedules_to_csv(staff_schedules)
# Create Gantt chart
gantt_chart = create_gantt_chart(staff_schedules, clinic_start_time, clinic_end_time)
# Validate coverage
is_valid, coverage_stats = validate_coverage(coverage, demand)
# Summary
summary = f"""
## Scheduling Summary
- Total Staff Required: {len(staff_schedules)}
- 100% Coverage Achieved: {"Yes" if is_valid else "No"}
- Total Demand (staff-shifts): {coverage_stats['Total Demand']}
- Total Coverage Provided: {coverage_stats['Total Coverage']}
- Coverage Percentage: {coverage_stats['Coverage Percentage']}
- Uncovered Slots: {coverage_stats['Uncovered Slots']}
"""
# Save to CSV
csv_path = "schedule_output.csv"
schedule_df.to_csv(csv_path, index=False)
return summary, gantt_chart, schedule_df, csv_path
except Exception as e:
return f"Error: {str(e)}", None, None, None
# Create Gradio interface
def create_interface():
with gr.Blocks() as interface:
gr.Markdown("# Staff Scheduling Optimizer")
with gr.Row():
with gr.Column():
# Left panel for inputs
gr.Markdown("### Clinic Parameters") # Use Markdown instead of Group label
with gr.Group(): # Group without label
csv_input = gr.File(label="Upload CSV")
beds_per_staff = gr.Number(label="Beds per Staff", value=3)
max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160)
hours_per_cycle = gr.Number(label="Hours per Cycle", value=4)
rest_days_per_week = gr.Number(label="Rest Days per Week", value=2)
gr.Markdown("### Time Parameters") # Use Markdown for section headers
with gr.Group():
clinic_start = gr.Dropdown(
label="Clinic Start Hour",
choices=am_pm_times,
value="08:00 AM"
)
clinic_end = gr.Dropdown(
label="Clinic End Hour",
choices=am_pm_times,
value="08:00 PM"
)
overlap_time = gr.Number(label="Overlap Time", value=0)
max_start_time_change = gr.Number(label="Max Start Time Change", value=2)
with gr.Column():
# Right panel for outputs
with gr.Tabs():
with gr.TabItem("Schedule"):
schedule_output = gr.Dataframe()
with gr.TabItem("Visualization"):
gantt_chart = gr.Plot()
with gr.TabItem("Statistics"):
stats_output = gr.Markdown()
optimize_btn = gr.Button("Optimize Schedule", variant="primary")
# Connect the button to your optimization function
optimize_btn.click(
fn=nurse_scheduling_app,
inputs=[
csv_input,
beds_per_staff,
max_hours_per_staff,
hours_per_cycle,
rest_days_per_week,
clinic_start,
clinic_end,
overlap_time,
max_start_time_change
],
outputs=[schedule_output, gantt_chart, stats_output]
)
return interface
# Define your time options
am_pm_times = [f"{i:02d}:00 AM" for i in range(1, 13)] + [f"{i:02d}:00 PM" for i in range(1, 13)]
# Launch the interface
if __name__ == "__main__":
interface = create_interface()
interface.launch(share=True)