lll / mith.py
anujkum0x's picture
Update mith.py
35e3340 verified
import gradio as gr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.colors import LinearSegmentedColormap
import plotly.figure_factory as ff
import plotly.express as px
from io import StringIO, BytesIO
from datetime import datetime, timedelta
import random
import math
import tempfile
import os
# Function to parse CSV and schedule staff
def schedule_staff(
demand_csv,
clinic_start_time="07:00",
cycle_lengths="4,7,6,7",
beds_per_staff=3,
max_hours=240,
rest_days=1,
overlap=30,
shift_min=6,
shift_max=12,
staff_count=0
):
# Convert None to default value
staff_count = 0 if staff_count is None else staff_count
beds_per_staff = 3 if beds_per_staff is None else beds_per_staff
# Calculate staff ratio (inverse of beds per staff)
staff_ratio = 1.0 / float(beds_per_staff)
# Parse the CSV data
try:
demand_data = pd.read_csv(StringIO(demand_csv), header=0)
except:
# If no header, try to parse without header
demand_data = pd.read_csv(StringIO(demand_csv), header=None)
# Rename columns
cols = [f"day"]
cols.extend([f"cycle{i+1}" for i in range(len(demand_data.columns)-1)])
demand_data.columns = cols
# Parse cycle lengths
cycle_lengths = [int(x.strip()) for x in cycle_lengths.split(",")]
# Convert clinic start time to datetime
start_time = datetime.strptime(clinic_start_time, "%H:%M")
# Calculate cycle times
cycle_times = []
current_time = start_time
for length in cycle_lengths:
end_time = current_time + timedelta(hours=length)
cycle_times.append((current_time, end_time))
current_time = end_time
# Calculate total days
total_days = len(demand_data)
# Calculate minimum staff needed for each cycle on each day
min_staff_per_cycle = {}
for day in range(total_days):
for cycle in range(len(cycle_lengths)):
cycle_col = f"cycle{cycle+1}" if f"cycle{cycle+1}" in demand_data.columns else cycle+1
demand = demand_data.iloc[day][cycle_col]
if pd.isna(demand):
demand = 0
min_staff_per_cycle[(day, cycle)] = math.ceil(demand * staff_ratio)
# Initialize optimization
if staff_count <= 0:
# Start with a reasonable number of staff
max_demand_day = max(sum(min_staff_per_cycle.get((day, cycle), 0) for cycle in range(len(cycle_lengths)))
for day in range(total_days))
initial_staff = max(max_demand_day,
math.ceil(sum(min_staff_per_cycle.values()) / (shift_max * total_days / 30 * max_hours)))
else:
initial_staff = staff_count
# Create schedule using a greedy algorithm with constraints
best_schedule = None
best_staff_count = float('inf')
# Try multiple iterations with different random seeds
for attempt in range(5):
schedule = create_schedule(
total_days,
cycle_times,
min_staff_per_cycle,
initial_staff,
max_hours,
rest_days,
overlap / 60.0, # Convert to hours
shift_min,
shift_max,
attempt
)
# Count actual staff used
staff_used = len(set(staff for day_schedule in schedule.values() for staff, _, _ in day_schedule))
if staff_used < best_staff_count:
best_schedule = schedule
best_staff_count = staff_used
# If user specified staff count, ensure we use exactly that many
if staff_count > 0 and staff_count > best_staff_count:
best_schedule = create_schedule(
total_days,
cycle_times,
min_staff_per_cycle,
staff_count,
max_hours,
rest_days,
overlap / 60.0,
shift_min,
shift_max,
fixed_staff=staff_count
)
best_staff_count = staff_count
# Generate CSV output
csv_output = generate_csv_output(best_schedule, total_days, cycle_times)
# Create a temporary file for the CSV
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv', mode='w+') as temp_file:
csv_output.to_csv(temp_file.name, index=False)
temp_file_path = temp_file.name
# Create Gantt chart
fig = create_gantt_chart(best_schedule, total_days, cycle_times, best_staff_count)
return temp_file_path, fig
def create_schedule(total_days, cycle_times, min_staff_per_cycle, initial_staff,
max_hours, rest_days, overlap, shift_min, shift_max, seed=0, fixed_staff=None):
random.seed(seed)
# Initialize empty schedule
schedule = {day: [] for day in range(total_days)}
# Track staff hours and consecutive days
staff_hours = {staff: 0 for staff in range(initial_staff)}
staff_last_day = {staff: -2 for staff in range(initial_staff)} # -2 means never worked
staff_last_shift = {staff: None for staff in range(initial_staff)} # Last shift start time
# For each day
for day in range(total_days):
# For each cycle
for cycle in range(len(cycle_times)):
cycle_start, cycle_end = cycle_times[cycle]
cycle_duration = (cycle_end - cycle_start).total_seconds() / 3600 # in hours
# Calculate how many staff needed for this cycle
staff_needed = min_staff_per_cycle.get((day, cycle), 0)
# Skip if no staff needed
if staff_needed == 0:
continue
# Find available staff for this cycle
available_staff = []
for staff in range(initial_staff):
# Check if staff has a rest day
if staff_last_day[staff] != -2 and (day - staff_last_day[staff]) % 7 < rest_days:
continue
# Check if staff has reached max hours
if staff_hours[staff] + cycle_duration > max_hours:
continue
# Staff is available
available_staff.append(staff)
# Sort available staff by priority
# Priority: 1) Staff who worked the previous day (for consecutive shift constraint)
# 2) Staff with fewer hours
def staff_priority(staff):
consecutive_priority = 1 if staff_last_day[staff] == day - 1 else 0
hours_priority = -staff_hours[staff] # Negative so fewer hours = higher priority
return (consecutive_priority, hours_priority)
available_staff.sort(key=staff_priority, reverse=True)
# Assign staff to this cycle
assigned_staff = []
for i in range(min(staff_needed, len(available_staff))):
staff = available_staff[i]
# Determine shift start and end times
shift_start = cycle_start
shift_end = cycle_end
# Apply consecutive day constraint (±1 hour flexibility)
if staff_last_day[staff] == day - 1 and staff_last_shift[staff] is not None:
# Try to keep the same start time as previous day, with ±1 hour flexibility
prev_start = staff_last_shift[staff]
time_diff = (shift_start.hour - prev_start.hour) + (shift_start.minute - prev_start.minute) / 60
if abs(time_diff) > 1:
# Adjust shift start time to be within 1 hour of previous day
if time_diff > 1:
shift_start = datetime(shift_start.year, shift_start.month, shift_start.day,
prev_start.hour + 1, prev_start.minute)
else:
shift_start = datetime(shift_start.year, shift_start.month, shift_start.day,
prev_start.hour - 1, prev_start.minute)
# Ensure shift duration is between min and max
shift_duration = (shift_end - shift_start).total_seconds() / 3600
if shift_duration < shift_min:
# Extend shift to meet minimum duration
shift_end = shift_start + timedelta(hours=shift_min)
elif shift_duration > shift_max:
# Shorten shift to meet maximum duration
shift_end = shift_start + timedelta(hours=shift_max)
# Add overlap for handover
if i > 0:
# Overlap with previous staff
shift_start = shift_start - timedelta(hours=overlap)
# Update staff information
staff_hours[staff] += (shift_end - shift_start).total_seconds() / 3600
staff_last_day[staff] = day
staff_last_shift[staff] = shift_start
# Add to schedule
schedule[day].append((staff, shift_start, shift_end))
assigned_staff.append(staff)
# If we couldn't assign enough staff, try to extend shifts of already assigned staff
if len(assigned_staff) < staff_needed and not fixed_staff:
# This is a simplified approach - in a real system, you'd need more sophisticated logic
pass
return schedule
def generate_csv_output(schedule, total_days, cycle_times):
# Create a DataFrame for the schedule
rows = []
for day in range(total_days):
for staff, start, end in schedule.get(day, []):
rows.append({
'Day': day + 1,
'Staff': f'Staff {staff + 1}',
'Start Time': start.strftime('%H:%M'),
'End Time': end.strftime('%H:%M'),
'Duration (hours)': round((end - start).total_seconds() / 3600, 2)
})
return pd.DataFrame(rows)
def create_gantt_chart(schedule, total_days, cycle_times, staff_count):
# Prepare data for Gantt chart
df_list = []
for day in range(total_days):
for staff, start, end in schedule.get(day, []):
# Convert to datetime for plotting
plot_start = datetime(2023, 1, 1) + timedelta(days=day, hours=start.hour, minutes=start.minute)
plot_end = datetime(2023, 1, 1) + timedelta(days=day, hours=end.hour, minutes=end.minute)
df_list.append({
'Task': f'Staff {staff + 1}',
'Start': plot_start,
'Finish': plot_end,
'Day': day + 1
})
if not df_list:
# Return empty plot if no schedule
fig = plt.figure(figsize=(12, 8))
plt.text(0.5, 0.5, "No valid schedule found", ha='center', va='center')
return fig
df = pd.DataFrame(df_list)
# Create color map
colors = px.colors.qualitative.Plotly[:staff_count]
if staff_count > len(colors):
# Repeat colors if needed
colors = colors * (staff_count // len(colors) + 1)
# Create Gantt chart using plotly
fig = ff.create_gantt(df, colors=colors, index_col='Task',
show_colorbar=True, group_tasks=True)
# Update layout
fig.update_layout(
title='Staff Schedule Gantt Chart',
xaxis_title='Day and Time',
yaxis_title='Staff',
height=600,
margin=dict(l=50, r=50, t=100, b=100)
)
return fig
# Gradio interface setup
demo = gr.Interface(
fn=schedule_staff,
inputs=[
gr.Textbox(label="Demand CSV Data", lines=10),
gr.Textbox(label="Clinic Start Time (HH:MM)", value="07:00"),
gr.Textbox(label="Cycle Lengths (comma-separated hours)", value="4,7,6,7"),
gr.Number(label="Beds per Staff", value=3),
gr.Number(label="Max Monthly Hours", value=240),
gr.Number(label="Rest Days per Week", value=1),
gr.Number(label="Shift Overlap (minutes)", value=30),
gr.Number(label="Min Shift Duration (hours)", value=6),
gr.Number(label="Max Shift Duration (hours)", value=12),
gr.Number(label="Fixed Staff Count (0 for auto-optimize)", value=0)
],
outputs=[
gr.File(label="Optimized Schedule CSV"),
gr.Plot(label="Schedule Gantt Chart")
],
title="Advanced Staff Scheduling App",
description="Upload demand data and set constraints to generate an optimized staff schedule."
)
demo.launch()