File size: 18,834 Bytes
e8f491e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
011591f
 
e8f491e
 
 
011591f
 
 
 
 
 
 
 
e8f491e
011591f
 
 
 
 
 
 
 
 
 
 
 
 
 
e8f491e
 
011591f
 
 
 
 
 
 
 
 
 
e8f491e
011591f
 
e8f491e
 
011591f
 
 
 
 
e8f491e
 
011591f
 
e8f491e
011591f
e8f491e
011591f
 
e8f491e
011591f
 
e8f491e
011591f
e8f491e
011591f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
import numpy as np
import pandas as pd
import plotly.figure_factory as ff
import gradio as gr
from datetime import datetime, timedelta
import math
import random
from copy import deepcopy

# Load and process data
def load_data(file_path):
    df = pd.read_csv(file_path)
    return df

# Convert demand data to required staff
def calculate_staff_needed(demand_data, beds_per_staff=3):
    staff_needed = {}
    for column in demand_data.columns:
        if column.startswith('cycle'):
            staff_needed[column] = []
            for value in demand_data[column]:
                if pd.isna(value):
                    staff_needed[column].append(0)
                else:
                    staff_needed[column].append(math.ceil(value / beds_per_staff))
    
    return pd.DataFrame({'Date': demand_data['Date'], **staff_needed})

# Create time slots based on clinic hours
def create_time_slots(start_time, end_time, cycle_duration=5):
    start_hour = int(start_time.split(':')[0])
    start_minute = int(start_time.split(':')[1])
    end_hour = int(end_time.split(':')[0])
    end_minute = int(end_time.split(':')[1])
    
    start_datetime = datetime(2025, 1, 1, start_hour, start_minute)
    end_datetime = datetime(2025, 1, 1, end_hour, end_minute)
    
    if end_datetime <= start_datetime:
        end_datetime += timedelta(days=1)
    
    current_time = start_datetime
    time_slots = []
    
    while current_time < end_datetime:
        next_time = current_time + timedelta(hours=cycle_duration)
        if next_time > end_datetime:
            next_time = end_datetime
        
        time_slots.append({
            'start': current_time.strftime('%H:%M'),
            'end': next_time.strftime('%H:%M')
        })
        
        current_time = next_time
    
    return time_slots

# Check if a shift respects duration constraints
def is_valid_shift_duration(shift, max_duration=12):
    start_hour = int(shift['start_time'].split(':')[0])
    start_minute = int(shift['start_time'].split(':')[1])
    end_hour = int(shift['end_time'].split(':')[0])
    end_minute = int(shift['end_time'].split(':')[1])
    
    start = start_hour * 60 + start_minute
    end = end_hour * 60 + end_minute
    
    if end < start:  # Overnight shift
        end += 24 * 60
    
    duration = (end - start) / 60  # in hours
    return duration <= max_duration

# Check if shift change follows constraints
def is_valid_shift_change(prev_shift, curr_shift, allowed_diff=1):
    if not prev_shift:
        return True
    
    if prev_shift['is_rest_day']:
        return True
    
    prev_start = int(prev_shift['start_time'].split(':')[0]) * 60 + int(prev_shift['start_time'].split(':')[1])
    curr_start = int(curr_shift['start_time'].split(':')[0]) * 60 + int(curr_shift['start_time'].split(':')[1])
    
    diff_hours = abs(curr_start - prev_start) / 60
    return diff_hours <= allowed_diff

# Check weekly rest day requirement
def has_weekly_rest_day(schedule, current_day_idx, days_in_week=7):
    if current_day_idx < days_in_week:
        # Count rest days in the partial week
        rest_days = sum(1 for i in range(current_day_idx) if schedule[i]['is_rest_day'])
        return rest_days > 0
    else:
        # Check the previous 6 days plus current day
        rest_days = sum(1 for i in range(current_day_idx - 6, current_day_idx + 1) if schedule[i]['is_rest_day'])
        return rest_days > 0

# Check total monthly hours
def calculate_monthly_hours(schedule):
    total_hours = 0
    for day in schedule:
        if not day['is_rest_day']:
            start_hour = int(day['start_time'].split(':')[0])
            start_minute = int(day['start_time'].split(':')[1])
            end_hour = int(day['end_time'].split(':')[0])
            end_minute = int(day['end_time'].split(':')[1])
            
            start = start_hour * 60 + start_minute
            end = end_hour * 60 + end_minute
            
            if end < start:  # Overnight shift
                end += 24 * 60
            
            shift_hours = (end - start) / 60
            total_hours += shift_hours
    
    return total_hours

# Generate all possible shifts for a day
def generate_possible_shifts(time_slots, max_duration=12, handover_overlap=30):
    possible_shifts = []
    
    for i in range(len(time_slots)):
        for j in range(i, len(time_slots)):
            start_time = time_slots[i]['start']
            end_time = time_slots[j]['end']
            
            # Add handover overlap
            end_hour, end_minute = map(int, end_time.split(':'))
            end_minute += handover_overlap
            if end_minute >= 60:
                end_hour += 1
                end_minute -= 60
            end_time = f"{end_hour:02d}:{end_minute:02d}"
            
            shift = {
                'start_time': start_time,
                'end_time': end_time,
                'cycles': list(range(i, j+1))
            }
            
            if is_valid_shift_duration(shift, max_duration):
                possible_shifts.append(shift)
    
    return possible_shifts

# Primary scheduling algorithm
def schedule_staff(staff_needed, time_slots, start_date, clinic_start, clinic_end, 
                  max_duration=12, beds_per_staff=3, max_monthly_hours=234, handover_overlap=30):
    
    dates = pd.to_datetime(staff_needed['Date'])
    date_range = [d.strftime('%Y-%m-%d') for d in dates]
    
    # Create demand array
    demand_array = np.zeros((len(date_range), len(time_slots)))
    for i, date in enumerate(date_range):
        for j, cycle in enumerate(staff_needed.columns[1:]):  # Skip 'Date' column
            if j < len(time_slots):
                demand_array[i, j] = staff_needed[cycle].iloc[i]
    
    # Generate all possible shifts
    possible_shifts = generate_possible_shifts(time_slots, max_duration, handover_overlap)
    
    # Initialize staff schedules
    staff_schedules = []
    
    # Helper function to create a rest day entry
    def create_rest_day(date):
        return {
            'date': date,
            'is_rest_day': True,
            'start_time': '',
            'end_time': '',
            'cycles': []
        }
    
    # Initialize coverage array
    coverage = np.zeros_like(demand_array)
    
    # Function to calculate fitness of current solution
    def calculate_fitness(coverage, demand):
        # Calculate how well coverage meets demand
        shortfall = np.maximum(0, demand - coverage)
        return -np.sum(shortfall)  # Negative because we want to minimize shortfall
    
    # Add staff members until demand is met
    iteration_count = 0
    max_iterations = 1000
    
    while np.any(coverage < demand_array) and iteration_count < max_iterations:
        # Create a new staff schedule
        new_staff = []
        for i, date in enumerate(date_range):
            # Decide whether this is a rest day
            if i > 0 and not has_weekly_rest_day(new_staff, i-1):
                new_staff.append(create_rest_day(date))
                continue
            
            # Find the best shift for this day
            best_shift = None
            best_improvement = -1
            
            # Try rest day
            temp_schedule = deepcopy(new_staff)
            temp_schedule.append(create_rest_day(date))
            if has_weekly_rest_day(temp_schedule, len(temp_schedule)-1) and calculate_monthly_hours(temp_schedule) <= max_monthly_hours:
                # Rest day is valid
                best_shift = create_rest_day(date)
            
            # Try each possible shift
            for shift in possible_shifts:
                # Check if this shift would help with uncovered demand
                will_help = False
                for cycle_idx in shift['cycles']:
                    if cycle_idx < coverage.shape[1] and coverage[i, cycle_idx] < demand_array[i, cycle_idx]:
                        will_help = True
                        break
                
                if not will_help:
                    continue
                
                # Create temporary shift for validation
                temp_shift = {
                    'date': date,
                    'is_rest_day': False,
                    'start_time': shift['start_time'],
                    'end_time': shift['end_time'],
                    'cycles': shift['cycles']
                }
                
                # Check if shift change is valid
                if i > 0 and not is_valid_shift_change(new_staff[-1], temp_shift):
                    continue
                
                # Check if adding this shift would exceed monthly hours
                temp_schedule = deepcopy(new_staff)
                temp_schedule.append(temp_shift)
                
                if calculate_monthly_hours(temp_schedule) > max_monthly_hours:
                    continue
                
                # Check weekly rest day requirement
                if not has_weekly_rest_day(temp_schedule, len(temp_schedule)-1) and i < len(date_range) - 1:
                    continue
                
                # Calculate improvement in coverage
                temp_coverage = coverage.copy()
                for cycle_idx in shift['cycles']:
                    if cycle_idx < temp_coverage.shape[1]:
                        temp_coverage[i, cycle_idx] += 1
                
                improvement = calculate_fitness(temp_coverage, demand_array) - calculate_fitness(coverage, demand_array)
                
                if improvement > best_improvement:
                    best_improvement = improvement
                    best_shift = temp_shift
            
            # If no valid shift is found, use a rest day
            if best_shift is None:
                best_shift = create_rest_day(date)
            
            new_staff.append(best_shift)
            
            # Update coverage if not a rest day
            if not best_shift['is_rest_day']:
                for cycle_idx in best_shift['cycles']:
                    if cycle_idx < coverage.shape[1]:
                        coverage[i, cycle_idx] += 1
        
        # Add the new staff schedule
        staff_schedules.append(new_staff)
        
        # Check if we've met all demands
        if not np.any(coverage < demand_array):
            break
        
        iteration_count += 1
    
    return staff_schedules, coverage, demand_array

# Convert schedules to CSV format
def schedules_to_csv(staff_schedules):
    rows = []
    for staff_idx, schedule in enumerate(staff_schedules):
        for day in schedule:
            if not day['is_rest_day']:
                rows.append({
                    'Staff ID': f'Staff {staff_idx+1}',
                    'Date': day['date'],
                    'Start Time': day['start_time'],
                    'End Time': day['end_time'],
                    'Is Rest Day': 'No'
                })
            else:
                rows.append({
                    'Staff ID': f'Staff {staff_idx+1}',
                    'Date': day['date'],
                    'Start Time': '',
                    'End Time': '',
                    'Is Rest Day': 'Yes'
                })
    
    return pd.DataFrame(rows)

# Generate Gantt chart
def create_gantt_chart(staff_schedules, clinic_start, clinic_end):
    tasks = []
    colors = {}
    
    for staff_idx, schedule in enumerate(staff_schedules):
        staff_id = f'Staff {staff_idx+1}'
        colors[staff_id] = f'rgb({random.randint(50, 200)}, {random.randint(50, 200)}, {random.randint(50, 200)})'
        
        for day in schedule:
            if not day['is_rest_day']:
                date = day['date']
                start_time = day['start_time']
                end_time = day['end_time']
                
                start_dt = f"{date} {start_time}"
                end_dt = f"{date} {end_time}"
                
                # Handle overnight shifts
                start_hour = int(start_time.split(':')[0])
                end_hour = int(end_time.split(':')[0])
                if end_hour < start_hour:
                    end_date = (pd.to_datetime(date) + pd.Timedelta(days=1)).strftime('%Y-%m-%d')
                    end_dt = f"{end_date} {end_time}"
                
                tasks.append({
                    'Task': staff_id,
                    'Start': start_dt,
                    'Finish': end_dt,
                    'Resource': 'Shift'
                })
            else:
                date = day['date']
                start_dt = f"{date} {clinic_start}"
                
                # Calculate end of day
                end_date = date
                end_dt = f"{end_date} {clinic_end}"
                
                tasks.append({
                    'Task': staff_id,
                    'Start': start_dt,
                    'Finish': end_dt,
                    'Resource': 'Rest Day'
                })
    
    df_tasks = pd.DataFrame(tasks)
    
    # Convert string dates to datetime
    df_tasks['Start'] = pd.to_datetime(df_tasks['Start'])
    df_tasks['Finish'] = pd.to_datetime(df_tasks['Finish'])
    
    # Create the Gantt chart
    fig = ff.create_gantt(df_tasks, colors=colors, index_col='Resource',
                         show_colorbar=True, group_tasks=True, showgrid_x=True,
                         title='Staff Schedule Gantt Chart')
    
    return fig

# Validate coverage
def validate_coverage(coverage, demand):
    is_valid = np.all(coverage >= demand)
    coverage_stats = {
        'Total Demand': np.sum(demand),
        'Total Coverage': np.sum(coverage),
        'Coverage Percentage': f"{100 * np.sum(coverage) / np.sum(demand) if np.sum(demand) > 0 else 100:.2f}%",
        'Uncovered Slots': np.sum(coverage < demand)
    }
    return is_valid, coverage_stats

# Gradio interface
def nurse_scheduling_app(
    csv_file,
    beds_per_staff=3,
    max_shift_duration=12,
    handover_overlap=30,
    max_monthly_hours=234,
    clinic_start_time="08:00",
    clinic_end_time="20:00",
    cycle_duration=5
):
    try:
        # Load data
        demand_data = load_data(csv_file.name)
        
        # Calculate staff needed
        staff_needed = calculate_staff_needed(demand_data, beds_per_staff)
        
        # Create time slots
        time_slots = create_time_slots(clinic_start_time, clinic_end_time, cycle_duration)
        
        # Run scheduling algorithm
        start_date = pd.to_datetime(demand_data['Date'].iloc[0]).strftime('%Y-%m-%d')
        staff_schedules, coverage, demand = schedule_staff(
            staff_needed, 
            time_slots, 
            start_date,
            clinic_start_time,
            clinic_end_time,
            max_shift_duration,
            beds_per_staff,
            max_monthly_hours,
            handover_overlap
        )
        
        # Convert to CSV
        schedule_df = schedules_to_csv(staff_schedules)
        
        # Create Gantt chart
        gantt_chart = create_gantt_chart(staff_schedules, clinic_start_time, clinic_end_time)
        
        # Validate coverage
        is_valid, coverage_stats = validate_coverage(coverage, demand)
        
        # Summary
        summary = f"""
        ## Scheduling Summary
        
        - Total Staff Required: {len(staff_schedules)}
        - 100% Coverage Achieved: {"Yes" if is_valid else "No"}
        - Total Demand (staff-shifts): {coverage_stats['Total Demand']}
        - Total Coverage Provided: {coverage_stats['Total Coverage']}
        - Coverage Percentage: {coverage_stats['Coverage Percentage']}
        - Uncovered Slots: {coverage_stats['Uncovered Slots']}
        """
        
        # Save to CSV
        csv_path = "schedule_output.csv"
        schedule_df.to_csv(csv_path, index=False)
        
        return summary, gantt_chart, schedule_df, csv_path
    
    except Exception as e:
        return f"Error: {str(e)}", None, None, None

# Create Gradio interface
def create_interface():
    with gr.Blocks() as interface:
        gr.Markdown("# Staff Scheduling Optimizer")
        
        with gr.Row():
            with gr.Column():
                # Left panel for inputs
                gr.Markdown("### Clinic Parameters")  # Use Markdown instead of Group label
                with gr.Group():  # Group without label
                    csv_input = gr.File(label="Upload CSV")
                    beds_per_staff = gr.Number(label="Beds per Staff", value=3)
                    max_hours_per_staff = gr.Number(label="Maximum monthly hours", value=160)
                    hours_per_cycle = gr.Number(label="Hours per Cycle", value=4)
                    rest_days_per_week = gr.Number(label="Rest Days per Week", value=2)
                
                gr.Markdown("### Time Parameters")  # Use Markdown for section headers
                with gr.Group():
                    clinic_start = gr.Dropdown(
                        label="Clinic Start Hour",
                        choices=am_pm_times,
                        value="08:00 AM"
                    )
                    clinic_end = gr.Dropdown(
                        label="Clinic End Hour",
                        choices=am_pm_times,
                        value="08:00 PM"
                    )
                    overlap_time = gr.Number(label="Overlap Time", value=0)
                    max_start_time_change = gr.Number(label="Max Start Time Change", value=2)
            
            with gr.Column():
                # Right panel for outputs
                with gr.Tabs():
                    with gr.TabItem("Schedule"):
                        schedule_output = gr.Dataframe()
                    with gr.TabItem("Visualization"):
                        gantt_chart = gr.Plot()
                    with gr.TabItem("Statistics"):
                        stats_output = gr.Markdown()
        
        optimize_btn = gr.Button("Optimize Schedule", variant="primary")
        
        # Connect the button to your optimization function
        optimize_btn.click(
            fn=nurse_scheduling_app,
            inputs=[
                csv_input,
                beds_per_staff,
                max_hours_per_staff,
                hours_per_cycle,
                rest_days_per_week,
                clinic_start,
                clinic_end,
                overlap_time,
                max_start_time_change
            ],
            outputs=[schedule_output, gantt_chart, stats_output]
        )
    
    return interface

# Define your time options
am_pm_times = [f"{i:02d}:00 AM" for i in range(1, 13)] + [f"{i:02d}:00 PM" for i in range(1, 13)]

# Launch the interface
if __name__ == "__main__":
    interface = create_interface()
    interface.launch(share=True)