# Add to top of roster_app_enterprise.py (after imports) # ------------------------------ # Weekly Optimiser (7-day CP-SAT) # ------------------------------ def solve_week( week_idx: int, start_date: datetime.date, available_staff: list[str], cumulative_shifts: dict[str, int], # current totals names_all: list[str] ) -> tuple[dict, dict]: """ Returns (schedule_week, weekly_counts) for 7 days. schedule_week: {d: {"day":[], "night":[]}} for d in 0..6 (relative) """ # Map name → index for optimisation (only available staff + placeholders) idx_map = {name: i for i, name in enumerate(available_staff)} N = len(available_staff) if N < 5: raise ValueError("At least 5 staff must be available.") # Pad to 9 with vacants (to reuse model structure) full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N)] SHIFT_IDX = {"day": 0, "night": 1} DAYS = 7 WEEKDAY_REL = {0,1,2,3,4} # Mon=0 relative to week start model = cp_model.CpModel() x = {} for p, d, s in itertools.product(range(9), range(DAYS), range(2)): x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}") # Coverage (7 days) for d in range(DAYS): req = (3, 1) if d in WEEKDAY_REL else (1, 1) model.Add(sum(x[p, d, SHIFT_IDX["day"]] for p in range(9)) == req[0]) model.Add(sum(x[p, d, SHIFT_IDX["night"]] for p in range(9)) == req[1]) # Temporal constraints (relative days) for p, d in itertools.product(range(9), range(DAYS)): model.Add(x[p, d, SHIFT_IDX["day"]] + x[p, d, SHIFT_IDX["night"]] <= 1) for p in range(9): for d in range(DAYS - 1): model.Add( x[p, d, SHIFT_IDX["day"]] + x[p, d, SHIFT_IDX["night"]] + x[p, d+1, SHIFT_IDX["day"]] + x[p, d+1, SHIFT_IDX["night"]] <= 1 ) # Weekend cap (d=5,6) for p in range(9): model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1) # Availability: vacants must be 0 for p in range(N, 9): # vacants for d, s in itertools.product(range(DAYS), range(2)): model.Add(x[p, d, s] == 0) # Fairness: minimize max deviation from target # Target = avg of cumulative + weekly fair share total_needed = sum(3+1 for _ in range(5)) + sum(1+1 for _ in range(2)) # 24 shifts/week target_per_person = total_needed / len(names_all) # ≈2.67 → use soft objective week_shifts = {} for p, name in enumerate(available_staff): var = model.NewIntVar(0, 3, f"weekshift_{p}") model.Add(var == sum(x[p, d, s] for d in range(DAYS) for s in range(2))) week_shifts[name] = var # Soft fairness: minimize max deviation max_dev = model.NewIntVar(0, 3, "max_dev") for name in available_staff: total_after = cumulative_shifts.get(name, 0) + week_shifts[name] # Deviation from ideal rolling mean (e.g., 16/6 ≈ 2.67 per week) ideal = 16 / 6 # 2.666... # Linearize |total_after - ideal| via auxiliary vars pos = model.NewIntVar(0, 10, f"pos_{name}") neg = model.NewIntVar(0, 10, f"neg_{name}") model.Add(total_after - round(ideal * 10) == pos - neg).OnlyEnforceIf(model.NewBoolVar("")) # skip exact # Simpler: bound each person to 2 or 3 shifts/week model.Add(week_shifts[name] >= 2) model.Add(week_shifts[name] <= 3) model.Minimize(max_dev) # Solve solver = cp_model.CpSolver() solver.parameters.max_time_in_seconds = 20.0 solver.parameters.num_search_workers = 4 if solver.Solve(model) not in (cp_model.OPTIMAL, cp_model.FEASIBLE): raise RuntimeError(f"Week {week_idx+1} infeasible with given availability.") # Extract schedule_week = {} weekly_counts = {name: 0 for name in available_staff} for d in range(7): day_staff = [full_names[p] for p in range(9) if solver.Value(x[p, d, SHIFT_IDX["day"]]) and not full_names[p].startswith("Vacant_")] night_staff = [full_names[p] for p in range(9) if solver.Value(x[p, d, SHIFT_IDX["night"]]) and not full_names[p].startswith("Vacant_")] schedule_week[d] = {"day": day_staff, "night": night_staff} for name in day_staff + night_staff: weekly_counts[name] += 1 return schedule_week, weekly_counts # ------------------------------ # Streamlit UI Additions # ------------------------------ # Add to session state init (if not present) if "cumulative_shifts" not in st.session_state: st.session_state.cumulative_shifts = {} if "roster_weekly" not in st.session_state: st.session_state.roster_weekly = {} # week_idx → schedule # In Manager section, after staff input: st.header("3. Weekly Availability (Holiday/Mission)") st.markdown("Mark unavailable staff for each week (max 4 per week).") # Weekly toggles avail_matrix = {} cols = st.columns(6) for w in range(6): with cols[w]: st.subheader(f"Week {w+1}") available = [] for i, name in enumerate([n for n in st.session_state.names if n]): if st.checkbox(f"{name}", value=True, key=f"avail_w{w}_p{i}"): available.append(name) if len(available) < 5: st.error("⚠️ At least 5 must be available.") avail_matrix[w] = available if st.button("🚀 Generate Rolling Roster", type="primary"): try: names_all = [n for n in st.session_state.names if n] start = st.session_state.start_date cum_shifts = st.session_state.cumulative_shifts.copy() weekly_schedules = {} for w in range(6): week_start = start + timedelta(weeks=w) avail = avail_matrix[w] schedule_w, counts_w = solve_week(w, week_start, avail, cum_shifts, names_all) # Update cumulative for name, cnt in counts_w.items(): cum_shifts[name] = cum_shifts.get(name, 0) + cnt # Store absolute-date schedule abs_schedule = {} for d_rel, shifts in schedule_w.items(): d_abs = w * 7 + d_rel abs_schedule[d_abs] = shifts weekly_schedules[w] = abs_schedule st.session_state.cumulative_shifts = cum_shifts st.session_state.roster_weekly = weekly_schedules st.session_state.roster_ready = True st.success("✅ Rolling roster generated!") except Exception as e: st.error(f"Generation failed: {e}") # Display logic (updated for weekly) if st.session_state.roster_ready: # Merge weekly schedules into full 42-day dict full_sched = {} for w_sched in st.session_state.roster_weekly.values(): full_sched.update(w_sched) if st.session_state.user_role == "manager": # Full roster table (same as before, using full_sched) rows = [] for d in range(42): dt = st.session_state.start_date + timedelta(days=d) wd = ["Mon","Tue","Wed","Thu","Fri","Sat","Sun"][d%7] week = d//7 + 1 typ = "WD" if (d%7)<5 else "WE" rows.append({ "Week": f"W{week}", "Date": dt.strftime("%Y-%m-%d"), "Day": wd, "Type": typ, "Day Shift": ", ".join(full_sched.get(d, {}).get("day", [])), "Night Shift": ", ".join(full_sched.get(d, {}).get("night", [])), }) df = pd.DataFrame(rows) st.dataframe(df, use_container_width=True, hide_index=True) # Cumulative summary st.subheader("📊 Cumulative Shifts (So Far)") summ = [] for name in [n for n in st.session_state.names if n]: summ.append({"Staff": name, "Total": st.session_state.cumulative_shifts.get(name, 0)}) st.dataframe(pd.DataFrame(summ), use_container_width=True, hide_index=True) else: # Staff view: filter full_sched email = st.session_state.staff_email names = [n for n in st.session_state.names if n] emails = [e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]] staff_name = None if email in emails: staff_name = names[emails.index(email)] if staff_name: my_shifts = [] for d in range(42): dt = st.session_state.start_date + timedelta(days=d) if staff_name in full_sched.get(d, {}).get("day", []): my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Day"}) if staff_name in full_sched.get(d, {}).get("night", []): my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Night"}) st.dataframe(pd.DataFrame(my_shifts), use_container_width=True, hide_index=True) else: st.warning("Enter an email matching a staff member.")