#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Enterprise Roster Generator — Final Corrected Version All Streamlit widget/session_state conflicts resolved. """ import streamlit as st import itertools import pandas as pd from datetime import datetime, timedelta, time as dt_time from pathlib import Path import tempfile import os import pickle import json import threading import schedule import time as pytime # ------------------------------ # OR-Tools Weekly Optimiser # ------------------------------ from ortools.sat.python import cp_model def solve_week( week_idx: int, start_date: datetime.date, available_staff: list[str], cumulative_shifts: dict[str, int], all_staff: list[str] ) -> tuple[dict, dict]: if len(available_staff) < 5: raise ValueError("At least 5 staff must be available.") idx_map = {name: i for i, name in enumerate(available_staff)} N = len(available_staff) full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N)] SHIFT = {"day": 0, "night": 1} DAYS = 7 WEEKDAY_REL = {0, 1, 2, 3, 4} model = cp_model.CpModel() x = {} for p, d, s in itertools.product(range(9), range(DAYS), range(2)): x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}") # Coverage for d in range(DAYS): req = (3, 1) if d in WEEKDAY_REL else (1, 1) model.Add(sum(x[p, d, SHIFT["day"]] for p in range(9)) == req[0]) model.Add(sum(x[p, d, SHIFT["night"]] for p in range(9)) == req[1]) # No same-day double shift for p, d in itertools.product(range(9), range(DAYS)): model.Add(x[p, d, SHIFT["day"]] + x[p, d, SHIFT["night"]] <= 1) # No consecutive days for p in range(9): for d in range(DAYS - 1): model.Add( x[p, d, SHIFT["day"]] + x[p, d, SHIFT["night"]] + x[p, d+1, SHIFT["day"]] + x[p, d+1, SHIFT["night"]] <= 1 ) # Weekend cap (Sat=5, Sun=6) for p in range(9): model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1) # ★★ 48h rest after night shift ★★ for p in range(9): for d in range(DAYS): night_d = x[p, d, SHIFT["night"]] if d + 2 < DAYS: any_d1 = x[p, d+1, SHIFT["day"]] + x[p, d+1, SHIFT["night"]] any_d2 = x[p, d+2, SHIFT["day"]] + x[p, d+2, SHIFT["night"]] model.Add(any_d1 + any_d2 <= 2 * (1 - night_d)) elif d + 1 < DAYS: any_d1 = x[p, d+1, SHIFT["day"]] + x[p, d+1, SHIFT["night"]] model.Add(any_d1 <= 1 - night_d) # Vacants forced to 0 for p in range(N, 9): for d, s in itertools.product(range(DAYS), range(2)): model.Add(x[p, d, s] == 0) # Weekly shift bounds (2–3) week_shifts = {} for i, name in enumerate(available_staff): var = model.NewIntVar(2, 3, f"wshift_{i}") model.Add(var == sum(x[i, d, s] for d in range(DAYS) for s in range(2))) week_shifts[name] = var solver = cp_model.CpSolver() solver.parameters.max_time_in_seconds = 30.0 solver.parameters.num_search_workers = 6 if solver.Solve(model) not in (cp_model.OPTIMAL, cp_model.FEASIBLE): raise RuntimeError(f"Week {week_idx+1} infeasible.") schedule_week = {} weekly_counts = {name: 0 for name in available_staff} for d in range(7): day_staff = [full_names[p] for p in range(9) if solver.Value(x[p, d, SHIFT["day"]]) and not full_names[p].startswith("Vacant_")] night_staff = [full_names[p] for p in range(9) if solver.Value(x[p, d, SHIFT["night"]]) and not full_names[p].startswith("Vacant_")] schedule_week[d] = {"day": day_staff, "night": night_staff} for name in day_staff + night_staff: weekly_counts[name] += 1 return schedule_week, weekly_counts # ------------------------------ # Google Drive & Gmail Helpers (graceful fallback) # ------------------------------ from pydrive2.auth import GoogleAuth from pydrive2.drive import GoogleDrive from google.oauth2.service_account import Credentials from googleapiclient.discovery import build import io def get_drive(): scopes = ["https://www.googleapis.com/auth/drive.file", "https://www.googleapis.com/auth/gmail.send"] cred_path = "credentials.json" if Path(cred_path).exists(): creds = Credentials.from_service_account_file(cred_path, scopes=scopes) else: cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON") creds = Credentials.from_service_account_info(json.loads(cred_json), scopes=scopes) gauth = GoogleAuth() gauth.credentials = creds return GoogleDrive(gauth) DRIVE_FOLDER_ID = os.getenv("DRIVE_FOLDER_ID", "root") def save_to_drive(filename: str, content: bytes): drive = get_drive() file = drive.CreateFile({ "title": filename, "parents": [{"id": DRIVE_FOLDER_ID}] }) file.content = io.BytesIO(content) file.Upload() return file["id"] def list_drive_files(prefix="roster_"): try: drive = get_drive() files = drive.ListFile({ "q": f"'{DRIVE_FOLDER_ID}' in parents and title contains '{prefix}' and trashed=false" }).GetList() return sorted(files, key=lambda f: f["createdDate"], reverse=True) except Exception: return [] def load_from_drive(file_id: str) -> bytes: drive = get_drive() file = drive.CreateFile({"id": file_id}) buffer = io.BytesIO() file.GetContentFile(buffer) return buffer.getvalue() def send_email(to: str, subject: str, body: str): scopes = ["https://www.googleapis.com/auth/gmail.send"] cred_path = "credentials.json" if Path(cred_path).exists(): creds = Credentials.from_service_account_file(cred_path, scopes=scopes) else: cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON") creds = Credentials.from_service_account_info(json.loads(cred_json), scopes=scopes) service = build("gmail", "v1", credentials=creds) from_email = os.getenv("GMAIL_FROM", "no-reply@yourdomain.com") message = f"""From: {from_email} To: {to} Subject: {subject} MIME-Version: 1.0 Content-Type: text/html; charset=utf-8 {body}""" import base64 raw = base64.urlsafe_b64encode(message.encode()).decode() body_req = {"raw": raw} service.users().messages().send(userId="me", body=body_req).execute() def schedule_weekly_emails(schedule_full: dict, staff_emails: dict, start_date: datetime.date): def job(): today = datetime.now().date() days_since = (today - start_date).days if not (0 <= days_since < 42): return week_idx = days_since // 7 for name, email in staff_emails.items(): shifts = [] for d in range(week_idx * 7, min((week_idx + 1) * 7, 42)): dt = start_date + timedelta(days=d) if name in schedule_full.get(d, {}).get("day", []): shifts.append(f"{dt:%a %d %b} Day") if name in schedule_full.get(d, {}).get("night", []): shifts.append(f"{dt:%a %d %b} Night") if shifts: body = f"
Hi {name},
Your shifts for Week {week_idx+1}:
Rest well!
" send_email(email, f"Roster: Week {week_idx+1}", body) schedule.every().monday.at("08:00").do(job) def run_sched(): while True: schedule.run_pending() pytime.sleep(60) threading.Thread(target=run_sched, daemon=True).start() # ------------------------------ # PDF & ICS Exports # ------------------------------ def export_pdf(schedule, weekly_counts, start_date, output_path): try: from reportlab.lib import colors from reportlab.lib.pagesizes import A4, landscape from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, PageBreak from reportlab.lib.styles import ParagraphStyle from reportlab.lib.units import inch doc = SimpleDocTemplate(str(output_path), pagesize=landscape(A4), leftMargin=0.4*inch, rightMargin=0.4*inch, topMargin=0.4*inch, bottomMargin=0.4*inch) elements = [] title_style = ParagraphStyle("Title", fontSize=16, spaceAfter=12, alignment=1) week_style = ParagraphStyle("Week", fontSize=14, spaceAfter=6, spaceBefore=12) elements.append(Paragraph("6‑Week Fair Roster (48h Night Rest)", title_style)) def day_label(d): return f"W{(d//7)+1:02d}-{'Mon Tue Wed Thu Fri Sat Sun'.split()[d%7]}" for w in range(6): elements.append(Paragraph(f"Week {w+1}", week_style)) data = [["Date", "Day", "Type", "Day Shift", "Night Shift"]] for d in range(w*7, (w+1)*7): dt = start_date + timedelta(days=d) dl = day_label(d) typ = "WD" if (d%7)<5 else "WE" ds = ", ".join(schedule.get(d, {}).get("day", [])) ns = ", ".join(schedule.get(d, {}).get("night", [])) data.append([dt.strftime("%a %d %b"), dl, typ, ds, ns]) table = Table(data, colWidths=[0.8*inch, 0.9*inch, 0.5*inch, 2.2*inch, 2.2*inch]) table.setStyle(TableStyle([ ('BACKGROUND', (0,0), (-1,0), colors.darkblue), ('TEXTCOLOR', (0,0), (-1,0), colors.whitesmoke), ('GRID', (0,0), (-1,-1), 0.5, colors.black), ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'), ('FONTSIZE', (0,0), (-1,0), 10), ])) elements.append(table) if w < 5: elements.append(PageBreak()) doc.build(elements) return True except Exception as e: st.error(f"PDF failed: {e}") return False def export_ics(schedule, start_date, output_path): try: from icalendar import Calendar, Event, Alarm cal = Calendar() cal.add('prodid', '-//Roster//streamlit//') cal.add('version', '2.0') for d in range(42): shift_date = start_date + timedelta(days=d) for shift_type, key in [("Day","day"),("Night","night")]: for name in schedule.get(d, {}).get(key, []): ev = Event() st_t = dt_time(8,0) if shift_type=="Day" else dt_time(20,0) en_t = dt_time(16,0) if shift_type=="Day" else dt_time(8,0) ev.add('summary', f"{name} — {shift_type} Shift") ev.add('dtstart', datetime.combine(shift_date, st_t)) ev.add('dtend', datetime.combine(shift_date + (timedelta(days=1) if shift_type=="Night" else timedelta()), en_t)) ev.add('categories', [shift_type]) alarm = Alarm() alarm.add('action', 'DISPLAY') alarm.add('trigger', timedelta(minutes=-15)) ev.add_component(alarm) cal.add_component(ev) with open(output_path, 'wb') as f: f.write(cal.to_ical()) return True except Exception as e: st.error(f"ICS failed: {e}") return False # ------------------------------ # Streamlit App — CORRECTED SESSION STATE HANDLING # ------------------------------ st.set_page_config(page_title="Enterprise Roster (Final)", layout="wide") st.title("🪖 Enterprise Roster Generator — Final") # === Initialize session state (FIRST RUN ONLY) === if "initialized" not in st.session_state: st.session_state.initialized = True st.session_state.names = [""] * 9 st.session_state.emails = [""] * 9 # Use a separate key for default start date st.session_state.start_date = (datetime.today() + timedelta(days=(7 - datetime.today().weekday()) % 7)).date() st.session_state.user_role = "manager" st.session_state.staff_email = "" st.session_state.cumulative_shifts = {} st.session_state.roster_weekly = {} st.session_state.roster_ready = False # === Auth UI === st.sidebar.header("🔐 Access") role = st.sidebar.radio( "Role", ["Manager", "Staff"], index=0 if st.session_state.user_role == "manager" else 1, key="role_radio" ) st.session_state.user_role = "manager" if role == "Manager" else "staff" if st.session_state.user_role == "staff": staff_email_input = st.sidebar.text_input( "Your Email", value=st.session_state.staff_email, key="staff_email_input" ) st.session_state.staff_email = staff_email_input.strip().lower() # === Manager Input === if st.session_state.user_role == "manager": st.header("1. Staff") cols = st.columns(3) for i in range(9): with cols[i % 3]: name_val = st.text_input( f"Staff {i+1} Name", value=st.session_state.names[i], key=f"name_input_{i}" ) email_val = st.text_input( f"Email {i+1}", value=st.session_state.emails[i], key=f"email_input_{i}" ) st.session_state.names[i] = name_val.strip() st.session_state.emails[i] = email_val.strip().lower() st.header("2. Start Monday") # ✅ CORRECT: widget key ≠ session state key sd = st.date_input( "First Monday", value=st.session_state.start_date, key="start_date_input" # ← distinct from st.session_state.start_date ) # Update session state only if changed if sd != st.session_state.start_date: st.session_state.start_date = sd st.header("3. Weekly Availability (Holiday/Mission)") st.markdown("Uncheck staff who are unavailable (max 4 absent/week → min 5 available).") avail_matrix = {} cols_w = st.columns(6) for w in range(6): with cols_w[w]: st.subheader(f"Week {w+1}") available = [] for i, name in enumerate([n for n in st.session_state.names if n]): # ✅ Use unique key per checkbox is_avail = st.checkbox( f"{name}", value=True, key=f"avail_w{w}_p{i}" ) if is_avail: available.append(name) avail_matrix[w] = available if len(available) < 5: st.error("⚠️ ≥5 must be available") if st.button("🚀 Generate Rolling Roster", type="primary", key="generate_btn"): try: names_all = [n for n in st.session_state.names if n] emails_all = { n: e for n, e in zip( [n for n in st.session_state.names if n], [e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]] ) } cum_shifts = st.session_state.cumulative_shifts.copy() weekly_sched = {} for w in range(6): week_start = st.session_state.start_date + timedelta(weeks=w) avail = avail_matrix[w] sched_w, counts_w = solve_week(w, week_start, avail, cum_shifts, names_all) for name, cnt in counts_w.items(): cum_shifts[name] = cum_shifts.get(name, 0) + cnt abs_sched = {} for d_rel, shifts in sched_w.items(): d_abs = w * 7 + d_rel abs_sched[d_abs] = shifts weekly_sched[w] = abs_sched st.session_state.cumulative_shifts = cum_shifts st.session_state.roster_weekly = weekly_sched st.session_state.roster_ready = True # Auto-save to Drive (graceful) try: data = pickle.dumps({ "weekly": weekly_sched, "cumulative": cum_shifts, "start": st.session_state.start_date }) fid = save_to_drive(f"roster_{st.session_state.start_date:%Y%m%d}.pkl", data) st.info(f"💾 Saved to Drive (ID: {fid[:8]}…)") except Exception as e: st.warning(f"Drive save failed: {e}") # Email scheduler try: full_sched = {} for w_sched in weekly_sched.values(): full_sched.update(w_sched) schedule_weekly_emails(full_sched, emails_all, st.session_state.start_date) st.info("📧 Weekly email reminders scheduled.") except Exception as e: st.warning(f"Email setup failed: {e}") st.success("✅ Rolling roster generated!") except Exception as e: st.error(f"Generation failed: {e}") # === Display === if st.session_state.roster_ready: full_sched = {} for w_sched in st.session_state.roster_weekly.values(): full_sched.update(w_sched) if st.session_state.user_role == "manager": st.header("📋 Full 6‑Week Roster") rows = [] for d in range(42): dt = st.session_state.start_date + timedelta(days=d) wd = ["Mon","Tue","Wed","Thu","Fri","Sat","Sun"][d%7] week = d//7 + 1 typ = "WD" if (d%7)<5 else "WE" rows.append({ "Week": f"W{week}", "Date": dt.strftime("%Y-%m-%d"), "Day": wd, "Type": typ, "Day Shift": ", ".join(full_sched.get(d, {}).get("day", [])), "Night Shift": ", ".join(full_sched.get(d, {}).get("night", [])), }) df = pd.DataFrame(rows) st.dataframe(df, use_container_width=True, hide_index=True) st.subheader("📊 Cumulative Shifts") summ = [] for name in [n for n in st.session_state.names if n]: summ.append({"Staff": name, "Total": st.session_state.cumulative_shifts.get(name, 0)}) st.dataframe(pd.DataFrame(summ), use_container_width=True, hide_index=True) # Exports c1, c2, c3, c4 = st.columns(4) with c1: st.download_button("📥 CSV", df.to_csv(index=False).encode(), "roster.csv", "text/csv", key="dl_csv") with c2: with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f: if export_pdf(full_sched, {}, st.session_state.start_date, Path(f.name)): with open(f.name, "rb") as pf: st.download_button("📄 PDF", pf.read(), "roster.pdf", "application/pdf", key="dl_pdf") os.unlink(f.name) with c3: with tempfile.NamedTemporaryFile(delete=False, suffix=".ics") as f: if export_ics(full_sched, st.session_state.start_date, Path(f.name)): with open(f.name, "rb") as pf: st.download_button("📅 ICS", pf.read(), "roster.ics", "text/calendar", key="dl_ics") os.unlink(f.name) with c4: if st.button("🗑️ Clear", key="clear_btn"): st.session_state.roster_ready = False st.session_state.roster_weekly = {} st.session_state.cumulative_shifts = {} st.rerun() # Drive Load st.subheader("☁️ Load from Drive") files = list_drive_files() if files: opts = {f["title"]: f["id"] for f in files} sel = st.selectbox("Select roster", list(opts.keys()), key="drive_select") if st.button("📥 Load Selected", key="load_btn"): try: data = pickle.loads(load_from_drive(opts[sel])) st.session_state.roster_weekly = data["weekly"] st.session_state.cumulative_shifts = data["cumulative"] st.session_state.start_date = data["start"] st.session_state.roster_ready = True st.success("Loaded!") st.rerun() except Exception as e: st.error(f"Load failed: {e}") else: st.info("No saved rosters.") else: # staff view known_names = [n for n in st.session_state.names if n] known_emails = [e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]] staff_name = None if st.session_state.staff_email in known_emails: staff_name = known_names[known_emails.index(st.session_state.staff_email)] if not staff_name: st.warning("Enter an email matching a staff member.") else: st.header(f"👋 Your Shifts, {staff_name}") my_shifts = [] for d in range(42): dt = st.session_state.start_date + timedelta(days=d) if staff_name in full_sched.get(d, {}).get("day", []): my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Day"}) if staff_name in full_sched.get(d, {}).get("night", []): my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Night"}) st.dataframe(pd.DataFrame(my_shifts), use_container_width=True, hide_index=True) st.caption("Final corrected version — no Streamlit session state errors.")