|
|
|
|
|
|
|
|
""" |
|
|
Enterprise Roster Generator ‚Final Adaptive Version |
|
|
All constraints adapt dynamically based on team size to ensure feasibility. |
|
|
""" |
|
|
|
|
|
import itertools |
|
|
import json |
|
|
import os |
|
|
import pickle |
|
|
import tempfile |
|
|
import threading |
|
|
import time as pytime |
|
|
from datetime import datetime, timedelta |
|
|
from datetime import time as dt_time |
|
|
from pathlib import Path |
|
|
|
|
|
import pandas as pd |
|
|
import schedule |
|
|
import streamlit as st |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from ortools.sat.python import cp_model |
|
|
|
|
|
|
|
|
def get_weekly_requirements(n_staff: int) -> tuple: |
|
|
""" |
|
|
Returns coverage requirements based on available staff count. |
|
|
For n_staff == 5: reduces ONE weekday day shift to 2. |
|
|
""" |
|
|
if n_staff >= 6: |
|
|
return (3, 1, 1, 1, []) |
|
|
elif n_staff == 5: |
|
|
|
|
|
return (3, 1, 1, 1, [2]) |
|
|
else: |
|
|
raise ValueError("At least 5 staff required.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
N_AVAIL = len(available_staff) |
|
|
if N_AVAIL < 5: |
|
|
raise ValueError("Minimum 5 staff per week.") |
|
|
|
|
|
|
|
|
wd_day, wd_night, we_day, we_night, reduced_days = get_weekly_requirements(N_AVAIL) |
|
|
|
|
|
|
|
|
FULL_NO_CONSECUTIVE = N_AVAIL >= 8 |
|
|
FULL_48H_REST = N_AVAIL >= 8 |
|
|
|
|
|
full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N_AVAIL)] |
|
|
SHIFT = {"day": 0, "night": 1} |
|
|
DAYS = 7 |
|
|
WEEKDAY_REL = {0, 1, 2, 3, 4} |
|
|
|
|
|
model = cp_model.CpModel() |
|
|
x = {} |
|
|
for p, d, s in itertools.product(range(9), range(DAYS), range(2)): |
|
|
x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}") |
|
|
|
|
|
|
|
|
for d in range(DAYS): |
|
|
if d in WEEKDAY_REL: |
|
|
day_req = wd_day |
|
|
if d in reduced_days: |
|
|
day_req = 2 |
|
|
night_req = wd_night |
|
|
else: |
|
|
day_req = we_day |
|
|
night_req = we_night |
|
|
model.Add(sum(x[p, d, SHIFT["day"]] for p in range(9)) == day_req) |
|
|
model.Add(sum(x[p, d, SHIFT["night"]] for p in range(9)) == night_req) |
|
|
|
|
|
|
|
|
for p, d in itertools.product(range(9), range(DAYS)): |
|
|
model.Add(x[p, d, SHIFT["day"]] + x[p, d, SHIFT["night"]] <= 1) |
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
for d in range(DAYS - 1): |
|
|
if FULL_NO_CONSECUTIVE: |
|
|
|
|
|
model.Add( |
|
|
x[p, d, SHIFT["day"]] |
|
|
+ x[p, d, SHIFT["night"]] |
|
|
+ x[p, d + 1, SHIFT["day"]] |
|
|
+ x[p, d + 1, SHIFT["night"]] |
|
|
<= 1 |
|
|
) |
|
|
else: |
|
|
|
|
|
model.Add(x[p, d, SHIFT["night"]] + x[p, d + 1, SHIFT["night"]] <= 1) |
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1) |
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
for d in range(DAYS): |
|
|
night_d = x[p, d, SHIFT["night"]] |
|
|
|
|
|
if d + 1 < DAYS: |
|
|
any_d1 = x[p, d + 1, SHIFT["day"]] + x[p, d + 1, SHIFT["night"]] |
|
|
model.Add(any_d1 <= 1 - night_d) |
|
|
|
|
|
|
|
|
if FULL_48H_REST and d + 2 < DAYS: |
|
|
any_d2 = x[p, d + 2, SHIFT["day"]] + x[p, d + 2, SHIFT["night"]] |
|
|
model.Add(any_d2 <= 1 - night_d) |
|
|
|
|
|
|
|
|
for p in range(N_AVAIL, 9): |
|
|
for d, s in itertools.product(range(DAYS), range(2)): |
|
|
model.Add(x[p, d, s] == 0) |
|
|
|
|
|
|
|
|
if N_AVAIL <= 6: |
|
|
MIN_WEEKLY, MAX_WEEKLY = 3, 4 |
|
|
elif N_AVAIL == 7: |
|
|
MIN_WEEKLY, MAX_WEEKLY = 2, 4 |
|
|
else: |
|
|
MIN_WEEKLY, MAX_WEEKLY = 2, 3 |
|
|
|
|
|
week_shifts = {} |
|
|
for i, name in enumerate(available_staff): |
|
|
var = model.NewIntVar(MIN_WEEKLY, MAX_WEEKLY, f"wshift_{i}") |
|
|
model.Add(var == sum(x[i, d, s] for d in range(DAYS) for s in range(2))) |
|
|
week_shifts[name] = var |
|
|
|
|
|
|
|
|
objective_terms = [] |
|
|
for i, name in enumerate(available_staff): |
|
|
cum = cumulative_shifts.get(name, 0) |
|
|
|
|
|
objective_terms.append(cum * week_shifts[name]) |
|
|
model.Minimize(sum(objective_terms)) |
|
|
|
|
|
solver = cp_model.CpSolver() |
|
|
|
|
|
solver.parameters.max_time_in_seconds = 45.0 if N_AVAIL <= 7 else 30.0 |
|
|
solver.parameters.num_search_workers = 6 |
|
|
|
|
|
if solver.Solve(model) not in (cp_model.OPTIMAL, cp_model.FEASIBLE): |
|
|
|
|
|
error_msg = f"Week {week_idx + 1} infeasible with {N_AVAIL} staff.\n" |
|
|
error_msg += "Active constraints:\n" |
|
|
error_msg += f"- No consecutive days: {'FULL' if FULL_NO_CONSECUTIVE else 'NIGHT ONLY'}\n" |
|
|
error_msg += f"- Night rest: {'48h' if FULL_48H_REST else '24h'}\n" |
|
|
error_msg += f"- Weekly bounds: {MIN_WEEKLY}-{MAX_WEEKLY} shifts\n" |
|
|
error_msg += ( |
|
|
f"- Coverage: {wd_day}/{wd_night} weekdays, {we_day}/{we_night} weekends" |
|
|
) |
|
|
if reduced_days: |
|
|
error_msg += ( |
|
|
f"\n- Reduced day(s): {', '.join(str(d) for d in reduced_days)}" |
|
|
) |
|
|
raise RuntimeError(error_msg) |
|
|
|
|
|
schedule_week = {} |
|
|
weekly_counts = {name: 0 for name in available_staff} |
|
|
for d in range(7): |
|
|
day_staff = [ |
|
|
full_names[p] |
|
|
for p in range(9) |
|
|
if solver.Value(x[p, d, SHIFT["day"]]) |
|
|
and not full_names[p].startswith("Vacant_") |
|
|
] |
|
|
night_staff = [ |
|
|
full_names[p] |
|
|
for p in range(9) |
|
|
if solver.Value(x[p, d, SHIFT["night"]]) |
|
|
and not full_names[p].startswith("Vacant_") |
|
|
] |
|
|
schedule_week[d] = {"day": day_staff, "night": night_staff} |
|
|
for name in day_staff + night_staff: |
|
|
weekly_counts[name] += 1 |
|
|
|
|
|
return schedule_week, weekly_counts |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
wd_day, wd_night, we_day, we_night, reduced_days = get_weekly_requirements(N_AVAIL) |
|
|
|
|
|
full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N_AVAIL)] |
|
|
SHIFT = {"day": 0, "night": 1} |
|
|
DAYS = 7 |
|
|
WEEKDAY_REL = {0, 1, 2, 3, 4} |
|
|
|
|
|
model = cp_model.CpModel() |
|
|
x = {} |
|
|
for p, d, s in itertools.product(range(9), range(DAYS), range(2)): |
|
|
x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}") |
|
|
|
|
|
|
|
|
for d in range(DAYS): |
|
|
if d in WEEKDAY_REL: |
|
|
day_req = wd_day |
|
|
if d in reduced_days: |
|
|
day_req = 2 |
|
|
night_req = wd_night |
|
|
else: |
|
|
day_req = we_day |
|
|
night_req = we_night |
|
|
model.Add(sum(x[p, d, SHIFT["day"]] for p in range(9)) == day_req) |
|
|
model.Add(sum(x[p, d, SHIFT["night"]] for p in range(9)) == night_req) |
|
|
|
|
|
|
|
|
for p, d in itertools.product(range(9), range(DAYS)): |
|
|
model.Add(x[p, d, SHIFT["day"]] + x[p, d, SHIFT["night"]] <= 1) |
|
|
|
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
for d in range(DAYS - 1): |
|
|
model.Add(x[p, d, SHIFT["night"]] + x[p, d + 1, SHIFT["night"]] <= 1) |
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1) |
|
|
|
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
for d in range(DAYS): |
|
|
night_d = x[p, d, SHIFT["night"]] |
|
|
if d + 1 < DAYS: |
|
|
any_d1 = x[p, d + 1, SHIFT["day"]] + x[p, d + 1, SHIFT["night"]] |
|
|
model.Add(any_d1 <= 1 - night_d) |
|
|
|
|
|
|
|
|
for p in range(N_AVAIL, 9): |
|
|
for d, s in itertools.product(range(DAYS), range(2)): |
|
|
model.Add(x[p, d, s] == 0) |
|
|
|
|
|
|
|
|
MIN_WEEKLY, MAX_WEEKLY = 2, 3 |
|
|
|
|
|
week_shifts = {} |
|
|
for i, name in enumerate(available_staff): |
|
|
var = model.NewIntVar(MIN_WEEKLY, MAX_WEEKLY, f"wshift_{i}") |
|
|
model.Add(var == sum(x[i, d, s] for d in range(DAYS) for s in range(2))) |
|
|
week_shifts[name] = var |
|
|
|
|
|
|
|
|
objective_terms = [] |
|
|
for i, name in enumerate(available_staff): |
|
|
cum = cumulative_shifts.get(name, 0) |
|
|
objective_terms.append(cum * week_shifts[name]) |
|
|
model.Minimize(sum(objective_terms)) |
|
|
|
|
|
solver = cp_model.CpSolver() |
|
|
solver.parameters.max_time_in_seconds = 45.0 |
|
|
solver.parameters.num_search_workers = 6 |
|
|
|
|
|
status = solver.Solve(model) |
|
|
if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE): |
|
|
status_map = { |
|
|
cp_model.UNKNOWN: "UNKNOWN", |
|
|
cp_model.MODEL_INVALID: "MODEL_INVALID", |
|
|
cp_model.FEASIBLE: "FEASIBLE", |
|
|
cp_model.INFEASIBLE: "INFEASIBLE", |
|
|
cp_model.OPTIMAL: "OPTIMAL", |
|
|
} |
|
|
status_name = status_map.get(status, f"Status {status}") |
|
|
|
|
|
error_msg = ( |
|
|
f"Week {week_idx + 1} solver returned: {status_name} with {N_AVAIL} staff\n" |
|
|
) |
|
|
error_msg += "To fix infeasibility:\n" |
|
|
error_msg += "1. Check if constraints are too strict\n" |
|
|
error_msg += "2. Verify staff count (9 should work with relaxed constraints)\n" |
|
|
error_msg += "3. Ensure coverage requirements match team capacity" |
|
|
raise RuntimeError(error_msg) |
|
|
|
|
|
|
|
|
schedule_week = {} |
|
|
weekly_counts = {name: 0 for name in available_staff} |
|
|
for d in range(7): |
|
|
day_staff = [ |
|
|
full_names[p] |
|
|
for p in range(9) |
|
|
if solver.Value(x[p, d, SHIFT["day"]]) |
|
|
and not full_names[p].startswith("Vacant_") |
|
|
] |
|
|
night_staff = [ |
|
|
full_names[p] |
|
|
for p in range(9) |
|
|
if solver.Value(x[p, d, SHIFT["night"]]) |
|
|
and not full_names[p].startswith("Vacant_") |
|
|
] |
|
|
schedule_week[d] = {"day": day_staff, "night": night_staff} |
|
|
for name in day_staff + night_staff: |
|
|
weekly_counts[name] += 1 |
|
|
|
|
|
return schedule_week, weekly_counts |
|
|
|
|
|
|
|
|
|
|
|
def solve_week( |
|
|
week_idx: int, |
|
|
start_date: datetime.date, |
|
|
available_staff: list[str], |
|
|
cumulative_shifts: dict[str, int], |
|
|
all_staff_global: list[str], |
|
|
) -> tuple[dict, dict]: |
|
|
"""Solve one week with constraints that adapt to team size.""" |
|
|
N_AVAIL = len(available_staff) |
|
|
if N_AVAIL < 5: |
|
|
raise ValueError("Minimum 5 staff per week required.") |
|
|
|
|
|
|
|
|
IS_LARGE_TEAM = N_AVAIL >= 9 |
|
|
IS_MEDIUM_LARGE_TEAM = N_AVAIL == 8 |
|
|
|
|
|
|
|
|
wd_day, wd_night, we_day, we_night, reduced_days = get_weekly_requirements(N_AVAIL) |
|
|
|
|
|
full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N_AVAIL)] |
|
|
SHIFT = {"day": 0, "night": 1} |
|
|
DAYS = 7 |
|
|
WEEKDAY_REL = {0, 1, 2, 3, 4} |
|
|
|
|
|
model = cp_model.CpModel() |
|
|
x = {} |
|
|
for p, d, s in itertools.product(range(9), range(DAYS), range(2)): |
|
|
x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}") |
|
|
|
|
|
|
|
|
for d in range(DAYS): |
|
|
if d in WEEKDAY_REL: |
|
|
day_req = wd_day |
|
|
if d in reduced_days: |
|
|
day_req = 2 |
|
|
night_req = wd_night |
|
|
else: |
|
|
day_req = we_day |
|
|
night_req = we_night |
|
|
model.Add(sum(x[p, d, SHIFT["day"]] for p in range(9)) == day_req) |
|
|
model.Add(sum(x[p, d, SHIFT["night"]] for p in range(9)) == night_req) |
|
|
|
|
|
|
|
|
for p, d in itertools.product(range(9), range(DAYS)): |
|
|
model.Add(x[p, d, SHIFT["day"]] + x[p, d, SHIFT["night"]] <= 1) |
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
for d in range(DAYS - 1): |
|
|
if IS_LARGE_TEAM: |
|
|
|
|
|
model.Add( |
|
|
x[p, d, SHIFT["day"]] |
|
|
+ x[p, d, SHIFT["night"]] |
|
|
+ x[p, d + 1, SHIFT["day"]] |
|
|
+ x[p, d + 1, SHIFT["night"]] |
|
|
<= 1 |
|
|
) |
|
|
elif IS_MEDIUM_LARGE_TEAM: |
|
|
|
|
|
model.Add(x[p, d, SHIFT["night"]] + x[p, d + 1, SHIFT["night"]] <= 1) |
|
|
|
|
|
model.Add(x[p, d, SHIFT["night"]] + x[p, d + 1, SHIFT["day"]] <= 1) |
|
|
model.Add(x[p, d, SHIFT["day"]] + x[p, d + 1, SHIFT["night"]] <= 1) |
|
|
else: |
|
|
|
|
|
model.Add(x[p, d, SHIFT["night"]] + x[p, d + 1, SHIFT["night"]] <= 1) |
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1) |
|
|
|
|
|
|
|
|
for p in range(9): |
|
|
for d in range(DAYS): |
|
|
night_d = x[p, d, SHIFT["night"]] |
|
|
|
|
|
if d + 1 < DAYS: |
|
|
any_d1 = x[p, d + 1, SHIFT["day"]] + x[p, d + 1, SHIFT["night"]] |
|
|
model.Add(any_d1 <= 1 - night_d) |
|
|
|
|
|
|
|
|
if IS_LARGE_TEAM and d + 2 < DAYS: |
|
|
any_d2 = x[p, d + 2, SHIFT["day"]] + x[p, d + 2, SHIFT["night"]] |
|
|
model.Add(any_d2 <= 1 - night_d) |
|
|
|
|
|
|
|
|
for p in range(N_AVAIL, 9): |
|
|
for d, s in itertools.product(range(DAYS), range(2)): |
|
|
model.Add(x[p, d, s] == 0) |
|
|
|
|
|
|
|
|
if N_AVAIL <= 6: |
|
|
MIN_WEEKLY, MAX_WEEKLY = 3, 4 |
|
|
elif N_AVAIL == 7: |
|
|
MIN_WEEKLY, MAX_WEEKLY = 2, 4 |
|
|
elif N_AVAIL == 8: |
|
|
MIN_WEEKLY, MAX_WEEKLY = 2, 3 |
|
|
else: |
|
|
MIN_WEEKLY, MAX_WEEKLY = 2, 3 |
|
|
|
|
|
week_shifts = {} |
|
|
for i, name in enumerate(available_staff): |
|
|
var = model.NewIntVar(MIN_WEEKLY, MAX_WEEKLY, f"wshift_{i}") |
|
|
model.Add(var == sum(x[i, d, s] for d in range(DAYS) for s in range(2))) |
|
|
week_shifts[name] = var |
|
|
|
|
|
|
|
|
objective_terms = [] |
|
|
for i, name in enumerate(available_staff): |
|
|
cum = cumulative_shifts.get(name, 0) |
|
|
objective_terms.append(cum * week_shifts[name]) |
|
|
model.Minimize(sum(objective_terms)) |
|
|
|
|
|
solver = cp_model.CpSolver() |
|
|
|
|
|
solver.parameters.max_time_in_seconds = 45.0 if N_AVAIL <= 8 else 30.0 |
|
|
solver.parameters.num_search_workers = 6 |
|
|
|
|
|
status = solver.Solve(model) |
|
|
if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE): |
|
|
status_map = { |
|
|
cp_model.UNKNOWN: "UNKNOWN", |
|
|
cp_model.MODEL_INVALID: "MODEL_INVALID", |
|
|
cp_model.FEASIBLE: "FEASIBLE", |
|
|
cp_model.INFEASIBLE: "INFEASIBLE", |
|
|
cp_model.OPTIMAL: "OPTIMAL", |
|
|
} |
|
|
status_name = status_map.get(status, f"Status {status}") |
|
|
|
|
|
error_msg = ( |
|
|
f"Week {week_idx + 1} solver returned: {status_name} with {N_AVAIL} staff\n" |
|
|
) |
|
|
error_msg += "Active constraints:\n" |
|
|
error_msg += f"- No consecutive days: {'FULL' if IS_LARGE_TEAM else ('MODERATE' if IS_MEDIUM_LARGE_TEAM else 'MINIMAL')}\n" |
|
|
error_msg += f"- Night rest: {'48h' if IS_LARGE_TEAM else '24h'}\n" |
|
|
error_msg += f"- Weekly bounds: {MIN_WEEKLY}-{MAX_WEEKLY} shifts\n" |
|
|
error_msg += ( |
|
|
f"- Coverage: {wd_day}/{wd_night} weekdays, {we_day}/{we_night} weekends" |
|
|
) |
|
|
if reduced_days: |
|
|
error_msg += ( |
|
|
f"\n- Reduced day(s): {', '.join(str(d) for d in reduced_days)}" |
|
|
) |
|
|
|
|
|
|
|
|
if N_AVAIL == 8 and status == cp_model.INFEASIBLE: |
|
|
error_msg += "\n\nüí° Troubleshooting for 8 staff:" |
|
|
error_msg += "\n- Try increasing max weekly shifts to 4 for some staff" |
|
|
error_msg += "\n- Consider relaxing weekend cap for one staff member" |
|
|
error_msg += "\n- Check if night shift distribution is balanced" |
|
|
|
|
|
raise RuntimeError(error_msg) |
|
|
|
|
|
|
|
|
schedule_week = {} |
|
|
weekly_counts = {name: 0 for name in available_staff} |
|
|
for d in range(7): |
|
|
day_staff = [ |
|
|
full_names[p] |
|
|
for p in range(9) |
|
|
if solver.Value(x[p, d, SHIFT["day"]]) |
|
|
and not full_names[p].startswith("Vacant_") |
|
|
] |
|
|
night_staff = [ |
|
|
full_names[p] |
|
|
for p in range(9) |
|
|
if solver.Value(x[p, d, SHIFT["night"]]) |
|
|
and not full_names[p].startswith("Vacant_") |
|
|
] |
|
|
schedule_week[d] = {"day": day_staff, "night": night_staff} |
|
|
for name in day_staff + night_staff: |
|
|
weekly_counts[name] += 1 |
|
|
|
|
|
return schedule_week, weekly_counts |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import io |
|
|
|
|
|
from google.oauth2.service_account import Credentials |
|
|
from googleapiclient.discovery import build |
|
|
from pydrive2.auth import GoogleAuth |
|
|
from pydrive2.drive import GoogleDrive |
|
|
|
|
|
|
|
|
def get_drive(): |
|
|
scopes = [ |
|
|
"https://www.googleapis.com/auth/drive.file", |
|
|
"https://www.googleapis.com/auth/gmail.send", |
|
|
] |
|
|
cred_path = "credentials.json" |
|
|
if Path(cred_path).exists(): |
|
|
creds = Credentials.from_service_account_file(cred_path, scopes=scopes) |
|
|
else: |
|
|
cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON") |
|
|
if cred_json: |
|
|
creds = Credentials.from_service_account_info( |
|
|
json.loads(cred_json), scopes=scopes |
|
|
) |
|
|
else: |
|
|
raise FileNotFoundError("No Google credentials found") |
|
|
gauth = GoogleAuth() |
|
|
gauth.credentials = creds |
|
|
return GoogleDrive(gauth) |
|
|
|
|
|
|
|
|
DRIVE_FOLDER_ID = os.getenv("DRIVE_FOLDER_ID", "root") |
|
|
|
|
|
|
|
|
def save_to_drive(filename: str, content: bytes): |
|
|
try: |
|
|
drive = get_drive() |
|
|
file = drive.CreateFile( |
|
|
{"title": filename, "parents": [{"id": DRIVE_FOLDER_ID}]} |
|
|
) |
|
|
file.content = io.BytesIO(content) |
|
|
file.Upload() |
|
|
return file["id"] |
|
|
except Exception as e: |
|
|
st.warning(f"Google Drive save failed: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def list_drive_files(prefix="roster_"): |
|
|
try: |
|
|
drive = get_drive() |
|
|
files = drive.ListFile( |
|
|
{ |
|
|
"q": f"'{DRIVE_FOLDER_ID}' in parents and title contains '{prefix}' and trashed=false" |
|
|
} |
|
|
).GetList() |
|
|
return sorted(files, key=lambda f: f["createdDate"], reverse=True) |
|
|
except Exception: |
|
|
return [] |
|
|
|
|
|
|
|
|
def load_from_drive(file_id: str) -> bytes: |
|
|
try: |
|
|
drive = get_drive() |
|
|
file = drive.CreateFile({"id": file_id}) |
|
|
buffer = io.BytesIO() |
|
|
file.GetContentFile(buffer) |
|
|
return buffer.getvalue() |
|
|
except Exception as e: |
|
|
st.error(f"Drive load failed: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def send_email(to: str, subject: str, body: str): |
|
|
try: |
|
|
scopes = ["https://www.googleapis.com/auth/gmail.send"] |
|
|
cred_path = "credentials.json" |
|
|
if Path(cred_path).exists(): |
|
|
creds = Credentials.from_service_account_file(cred_path, scopes=scopes) |
|
|
else: |
|
|
cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON") |
|
|
if not cred_json: |
|
|
st.warning("Email skipped: No Google credentials found") |
|
|
return |
|
|
creds = Credentials.from_service_account_info( |
|
|
json.loads(cred_json), scopes=scopes |
|
|
) |
|
|
service = build("gmail", "v1", credentials=creds) |
|
|
from_email = os.getenv("GMAIL_FROM", "no-reply@yourdomain.com") |
|
|
message = f"""From: {from_email} |
|
|
To: {to} |
|
|
Subject: {subject} |
|
|
MIME-Version: 1.0 |
|
|
Content-Type: text/html; charset=utf-8 |
|
|
|
|
|
{body}""" |
|
|
import base64 |
|
|
|
|
|
raw = base64.urlsafe_b64encode(message.encode()).decode() |
|
|
body_req = {"raw": raw} |
|
|
service.users().messages().send(userId="me", body=body_req).execute() |
|
|
except Exception as e: |
|
|
st.warning(f"Email sending failed: {e}") |
|
|
|
|
|
|
|
|
def schedule_weekly_emails( |
|
|
schedule_full: dict, staff_emails: dict, start_date: datetime.date |
|
|
): |
|
|
def job(): |
|
|
today = datetime.now().date() |
|
|
days_since = (today - start_date).days |
|
|
if not (0 <= days_since < 42): |
|
|
return |
|
|
week_idx = days_since // 7 |
|
|
for name, email in staff_emails.items(): |
|
|
shifts = [] |
|
|
for d in range(week_idx * 7, min((week_idx + 1) * 7, 42)): |
|
|
dt = start_date + timedelta(days=d) |
|
|
if name in schedule_full.get(d, {}).get("day", []): |
|
|
shifts.append(f"{dt:%a %d %b} Day") |
|
|
if name in schedule_full.get(d, {}).get("night", []): |
|
|
shifts.append(f"{dt:%a %d %b} Night") |
|
|
if shifts: |
|
|
body = ( |
|
|
f"<p>Hi {name},</p><p>Your shifts for Week {week_idx + 1}:</p><ul>" |
|
|
+ "".join(f"<li>{s}</li>" for s in shifts) |
|
|
+ "</ul><p>Rest well!</p>" |
|
|
) |
|
|
send_email(email, f"Roster: Week {week_idx + 1}", body) |
|
|
|
|
|
schedule.every().monday.at("08:00").do(job) |
|
|
|
|
|
def run_sched(): |
|
|
while True: |
|
|
schedule.run_pending() |
|
|
pytime.sleep(60) |
|
|
|
|
|
threading.Thread(target=run_sched, daemon=True).start() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_pdf(schedule, weekly_counts, start_date, output_path): |
|
|
try: |
|
|
from reportlab.lib import colors |
|
|
from reportlab.lib.pagesizes import A4, landscape |
|
|
from reportlab.lib.styles import ParagraphStyle |
|
|
from reportlab.lib.units import inch |
|
|
from reportlab.platypus import ( |
|
|
PageBreak, |
|
|
Paragraph, |
|
|
SimpleDocTemplate, |
|
|
Table, |
|
|
TableStyle, |
|
|
) |
|
|
|
|
|
doc = SimpleDocTemplate( |
|
|
str(output_path), |
|
|
pagesize=landscape(A4), |
|
|
leftMargin=0.4 * inch, |
|
|
rightMargin=0.4 * inch, |
|
|
topMargin=0.4 * inch, |
|
|
bottomMargin=0.4 * inch, |
|
|
) |
|
|
elements = [] |
|
|
title_style = ParagraphStyle("Title", fontSize=16, spaceAfter=12, alignment=1) |
|
|
week_style = ParagraphStyle("Week", fontSize=14, spaceAfter=6, spaceBefore=12) |
|
|
elements.append(Paragraph("6‚ÄëWeek Fair Roster", title_style)) |
|
|
|
|
|
def day_label(d): |
|
|
return f"W{(d // 7) + 1:02d}-{'Mon Tue Wed Thu Fri Sat Sun'.split()[d % 7]}" |
|
|
|
|
|
for w in range(6): |
|
|
elements.append(Paragraph(f"Week {w + 1}", week_style)) |
|
|
data = [["Date", "Day", "Type", "Day Shift", "Night Shift"]] |
|
|
for d in range(w * 7, (w + 1) * 7): |
|
|
dt = start_date + timedelta(days=d) |
|
|
dl = day_label(d) |
|
|
typ = "WD" if (d % 7) < 5 else "WE" |
|
|
ds = ", ".join(schedule.get(d, {}).get("day", [])) |
|
|
ns = ", ".join(schedule.get(d, {}).get("night", [])) |
|
|
data.append([dt.strftime("%a %d %b"), dl, typ, ds, ns]) |
|
|
table = Table( |
|
|
data, |
|
|
colWidths=[0.8 * inch, 0.9 * inch, 0.5 * inch, 2.2 * inch, 2.2 * inch], |
|
|
) |
|
|
table.setStyle( |
|
|
TableStyle( |
|
|
[ |
|
|
("BACKGROUND", (0, 0), (-1, 0), colors.darkblue), |
|
|
("TEXTCOLOR", (0, 0), (-1, 0), colors.whitesmoke), |
|
|
("GRID", (0, 0), (-1, -1), 0.5, colors.black), |
|
|
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), |
|
|
("FONTSIZE", (0, 0), (-1, 0), 10), |
|
|
] |
|
|
) |
|
|
) |
|
|
elements.append(table) |
|
|
if w < 5: |
|
|
elements.append(PageBreak()) |
|
|
doc.build(elements) |
|
|
return True |
|
|
except Exception as e: |
|
|
st.error(f"PDF failed: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
def export_ics(schedule, start_date, output_path): |
|
|
try: |
|
|
from icalendar import Alarm, Calendar, Event |
|
|
|
|
|
cal = Calendar() |
|
|
cal.add("prodid", "-//Roster//streamlit//") |
|
|
cal.add("version", "2.0") |
|
|
for d in range(42): |
|
|
shift_date = start_date + timedelta(days=d) |
|
|
for shift_type, key in [("Day", "day"), ("Night", "night")]: |
|
|
for name in schedule.get(d, {}).get(key, []): |
|
|
ev = Event() |
|
|
st_t = dt_time(8, 0) if shift_type == "Day" else dt_time(20, 0) |
|
|
en_t = dt_time(16, 0) if shift_type == "Day" else dt_time(8, 0) |
|
|
ev.add("summary", f"{name} — {shift_type} Shift") |
|
|
ev.add("dtstart", datetime.combine(shift_date, st_t)) |
|
|
ev.add( |
|
|
"dtend", |
|
|
datetime.combine( |
|
|
shift_date |
|
|
+ ( |
|
|
timedelta(days=1) |
|
|
if shift_type == "Night" |
|
|
else timedelta() |
|
|
), |
|
|
en_t, |
|
|
), |
|
|
) |
|
|
ev.add("categories", [shift_type]) |
|
|
alarm = Alarm() |
|
|
alarm.add("action", "DISPLAY") |
|
|
alarm.add("trigger", timedelta(minutes=-15)) |
|
|
ev.add_component(alarm) |
|
|
cal.add_component(ev) |
|
|
with open(output_path, "wb") as f: |
|
|
f.write(cal.to_ical()) |
|
|
return True |
|
|
except Exception as e: |
|
|
st.error(f"ICS failed: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.set_page_config(page_title="Enterprise Roster (Final)", layout="wide") |
|
|
st.title("Enterprise Roster Generator ‚Final") |
|
|
|
|
|
|
|
|
if "initialized" not in st.session_state: |
|
|
st.session_state.initialized = True |
|
|
st.session_state.names = [""] * 9 |
|
|
st.session_state.emails = [""] * 9 |
|
|
|
|
|
st.session_state.start_date = ( |
|
|
datetime.today() + timedelta(days=(7 - datetime.today().weekday()) % 7) |
|
|
).date() |
|
|
st.session_state.user_role = "manager" |
|
|
st.session_state.staff_email = "" |
|
|
st.session_state.cumulative_shifts = {} |
|
|
st.session_state.roster_weekly = {} |
|
|
st.session_state.roster_ready = False |
|
|
|
|
|
|
|
|
st.sidebar.header("Access") |
|
|
role = st.sidebar.radio( |
|
|
"Role", |
|
|
["Manager", "Staff"], |
|
|
index=0 if st.session_state.user_role == "manager" else 1, |
|
|
key="role_radio", |
|
|
) |
|
|
st.session_state.user_role = "manager" if role == "Manager" else "staff" |
|
|
|
|
|
if st.session_state.user_role == "staff": |
|
|
staff_email_input = st.sidebar.text_input( |
|
|
"Your Email", value=st.session_state.staff_email, key="staff_email_input" |
|
|
) |
|
|
st.session_state.staff_email = staff_email_input.strip().lower() |
|
|
|
|
|
|
|
|
if st.session_state.user_role == "manager": |
|
|
st.header("1. Staff") |
|
|
cols = st.columns(3) |
|
|
for i in range(9): |
|
|
with cols[i % 3]: |
|
|
name_val = st.text_input( |
|
|
f"Staff {i + 1} Name", |
|
|
value=st.session_state.names[i], |
|
|
key=f"name_input_{i}", |
|
|
) |
|
|
email_val = st.text_input( |
|
|
f"Email {i + 1}", |
|
|
value=st.session_state.emails[i], |
|
|
key=f"email_input_{i}", |
|
|
) |
|
|
st.session_state.names[i] = name_val.strip() |
|
|
st.session_state.emails[i] = email_val.strip().lower() |
|
|
|
|
|
st.header("2. Start Monday") |
|
|
|
|
|
sd = st.date_input( |
|
|
"First Monday", |
|
|
value=st.session_state.start_date, |
|
|
key="start_date_input", |
|
|
) |
|
|
|
|
|
if sd != st.session_state.start_date: |
|
|
st.session_state.start_date = sd |
|
|
|
|
|
st.header("3. Weekly Availability (Holiday/Mission)") |
|
|
st.markdown( |
|
|
"Uncheck staff who are unavailable (max 4 absent/week ‚min 5 available)." |
|
|
) |
|
|
avail_matrix = {} |
|
|
cols_w = st.columns(6) |
|
|
for w in range(6): |
|
|
with cols_w[w]: |
|
|
st.subheader(f"Week {w + 1}") |
|
|
available = [] |
|
|
for i, name in enumerate([n for n in st.session_state.names if n]): |
|
|
|
|
|
is_avail = st.checkbox(f"{name}", value=True, key=f"avail_w{w}_p{i}") |
|
|
if is_avail: |
|
|
available.append(name) |
|
|
avail_matrix[w] = available |
|
|
if len(available) < 5: |
|
|
st.error("⚠️ ≥5 must be available") |
|
|
|
|
|
if st.button("Generate Rolling Roster", type="primary", key="generate_btn"): |
|
|
try: |
|
|
names_all = [n for n in st.session_state.names if n] |
|
|
if not names_all: |
|
|
st.error("Please enter at least one staff name.") |
|
|
st.stop() |
|
|
|
|
|
emails_all = { |
|
|
n: e |
|
|
for n, e in zip( |
|
|
[n for n in st.session_state.names if n], |
|
|
[ |
|
|
e |
|
|
for i, e in enumerate(st.session_state.emails) |
|
|
if st.session_state.names[i] |
|
|
], |
|
|
) |
|
|
} |
|
|
|
|
|
cum_shifts = {name: 0 for name in names_all} |
|
|
weekly_sched = {} |
|
|
|
|
|
|
|
|
for w in range(6): |
|
|
avail = avail_matrix[w] |
|
|
if len(avail) < 5: |
|
|
st.error( |
|
|
f"Week {w + 1} has only {len(avail)} available staff. Minimum is 5." |
|
|
) |
|
|
st.stop() |
|
|
|
|
|
if len(avail) == 5: |
|
|
st.warning( |
|
|
f"Week {w + 1}: Reduced coverage mode (one weekday day shift = 2 staff)" |
|
|
) |
|
|
|
|
|
for w in range(6): |
|
|
week_start = st.session_state.start_date + timedelta(weeks=w) |
|
|
avail = avail_matrix[w] |
|
|
try: |
|
|
sched_w, counts_w = solve_week( |
|
|
w, week_start, avail, cum_shifts, names_all |
|
|
) |
|
|
except Exception as e: |
|
|
st.error(f"Failed to generate Week {w + 1}: {str(e)}") |
|
|
st.stop() |
|
|
|
|
|
for name, cnt in counts_w.items(): |
|
|
cum_shifts[name] = cum_shifts.get(name, 0) + cnt |
|
|
|
|
|
abs_sched = {} |
|
|
for d_rel, shifts in sched_w.items(): |
|
|
d_abs = w * 7 + d_rel |
|
|
abs_sched[d_abs] = shifts |
|
|
weekly_sched[w] = abs_sched |
|
|
|
|
|
st.session_state.cumulative_shifts = cum_shifts |
|
|
st.session_state.roster_weekly = weekly_sched |
|
|
st.session_state.roster_ready = True |
|
|
|
|
|
|
|
|
try: |
|
|
data = pickle.dumps( |
|
|
{ |
|
|
"weekly": weekly_sched, |
|
|
"cumulative": cum_shifts, |
|
|
"start": st.session_state.start_date, |
|
|
} |
|
|
) |
|
|
fid = save_to_drive( |
|
|
f"roster_{st.session_state.start_date:%Y%m%d}.pkl", data |
|
|
) |
|
|
if fid: |
|
|
st.info(f"Saved to Drive (ID: {fid[:8]}…)") |
|
|
except Exception as e: |
|
|
st.warning(f"Drive save failed: {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
full_sched = {} |
|
|
for w_sched in weekly_sched.values(): |
|
|
full_sched.update(w_sched) |
|
|
schedule_weekly_emails( |
|
|
full_sched, emails_all, st.session_state.start_date |
|
|
) |
|
|
st.info("Weekly email reminders scheduled.") |
|
|
except Exception as e: |
|
|
st.warning(f"Email setup failed: {e}") |
|
|
|
|
|
st.success("Rolling roster generated!") |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Generation failed: {e}") |
|
|
|
|
|
|
|
|
if st.session_state.roster_ready: |
|
|
full_sched = {} |
|
|
for w_sched in st.session_state.roster_weekly.values(): |
|
|
full_sched.update(w_sched) |
|
|
|
|
|
if st.session_state.user_role == "manager": |
|
|
|
|
|
reduced_weeks = [] |
|
|
for w in range(6): |
|
|
if len(avail_matrix[w]) == 5: |
|
|
reduced_weeks.append(w + 1) |
|
|
|
|
|
if reduced_weeks: |
|
|
st.warning( |
|
|
f"Reduced coverage in Week(s): {', '.join(map(str, reduced_weeks))} " |
|
|
"(one weekday day shift = 2 staff instead of 3)." |
|
|
) |
|
|
|
|
|
st.header("Full 6 Week Roster") |
|
|
rows = [] |
|
|
for d in range(42): |
|
|
dt = st.session_state.start_date + timedelta(days=d) |
|
|
wd = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][d % 7] |
|
|
week = d // 7 + 1 |
|
|
typ = "WD" if (d % 7) < 5 else "WE" |
|
|
rows.append( |
|
|
{ |
|
|
"Week": f"W{week}", |
|
|
"Date": dt.strftime("%Y-%m-%d"), |
|
|
"Day": wd, |
|
|
"Type": typ, |
|
|
"Day Shift": ", ".join(full_sched.get(d, {}).get("day", [])), |
|
|
"Night Shift": ", ".join(full_sched.get(d, {}).get("night", [])), |
|
|
} |
|
|
) |
|
|
df = pd.DataFrame(rows) |
|
|
st.dataframe(df, use_container_width=True, hide_index=True) |
|
|
|
|
|
st.subheader("Cumulative Shifts") |
|
|
summ = [] |
|
|
for name in [n for n in st.session_state.names if n]: |
|
|
summ.append( |
|
|
{ |
|
|
"Staff": name, |
|
|
"Total": st.session_state.cumulative_shifts.get(name, 0), |
|
|
} |
|
|
) |
|
|
st.dataframe(pd.DataFrame(summ), use_container_width=True, hide_index=True) |
|
|
|
|
|
|
|
|
c1, c2, c3, c4 = st.columns(4) |
|
|
with c1: |
|
|
st.download_button( |
|
|
"CSV", |
|
|
df.to_csv(index=False).encode(), |
|
|
"roster.csv", |
|
|
"text/csv", |
|
|
key="dl_csv", |
|
|
) |
|
|
with c2: |
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f: |
|
|
if export_pdf( |
|
|
full_sched, {}, st.session_state.start_date, Path(f.name) |
|
|
): |
|
|
with open(f.name, "rb") as pf: |
|
|
st.download_button( |
|
|
"PDF", |
|
|
pf.read(), |
|
|
"roster.pdf", |
|
|
"application/pdf", |
|
|
key="dl_pdf", |
|
|
) |
|
|
os.unlink(f.name) |
|
|
with c3: |
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".ics") as f: |
|
|
if export_ics(full_sched, st.session_state.start_date, Path(f.name)): |
|
|
with open(f.name, "rb") as pf: |
|
|
st.download_button( |
|
|
"ICS", |
|
|
pf.read(), |
|
|
"roster.ics", |
|
|
"text/calendar", |
|
|
key="dl_ics", |
|
|
) |
|
|
os.unlink(f.name) |
|
|
with c4: |
|
|
if st.button("Clear", key="clear_btn"): |
|
|
st.session_state.roster_ready = False |
|
|
st.session_state.roster_weekly = {} |
|
|
st.session_state.cumulative_shifts = {} |
|
|
st.rerun() |
|
|
|
|
|
|
|
|
st.subheader("Load from Drive") |
|
|
files = list_drive_files() |
|
|
if files: |
|
|
opts = {f["title"]: f["id"] for f in files} |
|
|
sel = st.selectbox("Select roster", list(opts.keys()), key="drive_select") |
|
|
if st.button("Load Selected", key="load_btn"): |
|
|
try: |
|
|
data = pickle.loads(load_from_drive(opts[sel])) |
|
|
st.session_state.roster_weekly = data["weekly"] |
|
|
st.session_state.cumulative_shifts = data["cumulative"] |
|
|
st.session_state.start_date = data["start"] |
|
|
st.session_state.roster_ready = True |
|
|
st.success("Loaded!") |
|
|
st.rerun() |
|
|
except Exception as e: |
|
|
st.error(f"Load failed: {e}") |
|
|
else: |
|
|
st.info("No saved rosters.") |
|
|
|
|
|
else: |
|
|
known_names = [n for n in st.session_state.names if n] |
|
|
known_emails = [ |
|
|
e |
|
|
for i, e in enumerate(st.session_state.emails) |
|
|
if st.session_state.names[i] |
|
|
] |
|
|
staff_name = None |
|
|
if st.session_state.staff_email in known_emails: |
|
|
staff_name = known_names[known_emails.index(st.session_state.staff_email)] |
|
|
|
|
|
if not staff_name: |
|
|
st.warning("Enter an email matching a staff member.") |
|
|
else: |
|
|
st.header(f"Your Shifts, {staff_name}") |
|
|
my_shifts = [] |
|
|
for d in range(42): |
|
|
dt = st.session_state.start_date + timedelta(days=d) |
|
|
if staff_name in full_sched.get(d, {}).get("day", []): |
|
|
my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Day"}) |
|
|
if staff_name in full_sched.get(d, {}).get("night", []): |
|
|
my_shifts.append( |
|
|
{"Date": dt.strftime("%Y-%m-%d"), "Shift": "Night"} |
|
|
) |
|
|
if my_shifts: |
|
|
st.dataframe( |
|
|
pd.DataFrame(my_shifts), use_container_width=True, hide_index=True |
|
|
) |
|
|
else: |
|
|
st.info("No shifts assigned to you in this roster period.") |
|
|
|
|
|
st.caption( |
|
|
"Final adaptive version — constraints adjust based on team size for 5-9 staff teams." |
|
|
) |
|
|
|