| | |
| | |
| |
|
| | """ |
| | Enterprise Roster Generator β Final Corrected Version |
| | All Streamlit widget/session_state conflicts resolved. |
| | """ |
| |
|
| | import streamlit as st |
| | import itertools |
| | import pandas as pd |
| | from datetime import datetime, timedelta, time as dt_time |
| | from pathlib import Path |
| | import tempfile |
| | import os |
| | import pickle |
| | import json |
| | import threading |
| | import schedule |
| | import time as pytime |
| |
|
| | |
| | |
| | |
| | from ortools.sat.python import cp_model |
| |
|
| | def solve_week( |
| | week_idx: int, |
| | start_date: datetime.date, |
| | available_staff: list[str], |
| | cumulative_shifts: dict[str, int], |
| | all_staff: list[str] |
| | ) -> tuple[dict, dict]: |
| | if len(available_staff) < 5: |
| | raise ValueError("At least 5 staff must be available.") |
| |
|
| | idx_map = {name: i for i, name in enumerate(available_staff)} |
| | N = len(available_staff) |
| | full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N)] |
| | SHIFT = {"day": 0, "night": 1} |
| | DAYS = 7 |
| | WEEKDAY_REL = {0, 1, 2, 3, 4} |
| |
|
| | model = cp_model.CpModel() |
| | x = {} |
| | for p, d, s in itertools.product(range(9), range(DAYS), range(2)): |
| | x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}") |
| |
|
| | |
| | for d in range(DAYS): |
| | req = (3, 1) if d in WEEKDAY_REL else (1, 1) |
| | model.Add(sum(x[p, d, SHIFT["day"]] for p in range(9)) == req[0]) |
| | model.Add(sum(x[p, d, SHIFT["night"]] for p in range(9)) == req[1]) |
| |
|
| | |
| | for p, d in itertools.product(range(9), range(DAYS)): |
| | model.Add(x[p, d, SHIFT["day"]] + x[p, d, SHIFT["night"]] <= 1) |
| |
|
| | |
| | for p in range(9): |
| | for d in range(DAYS - 1): |
| | model.Add( |
| | x[p, d, SHIFT["day"]] + |
| | x[p, d, SHIFT["night"]] + |
| | x[p, d+1, SHIFT["day"]] + |
| | x[p, d+1, SHIFT["night"]] <= 1 |
| | ) |
| |
|
| | |
| | for p in range(9): |
| | model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1) |
| |
|
| | |
| | for p in range(9): |
| | for d in range(DAYS): |
| | night_d = x[p, d, SHIFT["night"]] |
| | if d + 2 < DAYS: |
| | any_d1 = x[p, d+1, SHIFT["day"]] + x[p, d+1, SHIFT["night"]] |
| | any_d2 = x[p, d+2, SHIFT["day"]] + x[p, d+2, SHIFT["night"]] |
| | model.Add(any_d1 + any_d2 <= 2 * (1 - night_d)) |
| | elif d + 1 < DAYS: |
| | any_d1 = x[p, d+1, SHIFT["day"]] + x[p, d+1, SHIFT["night"]] |
| | model.Add(any_d1 <= 1 - night_d) |
| |
|
| | |
| | for p in range(N, 9): |
| | for d, s in itertools.product(range(DAYS), range(2)): |
| | model.Add(x[p, d, s] == 0) |
| |
|
| | |
| | week_shifts = {} |
| | for i, name in enumerate(available_staff): |
| | var = model.NewIntVar(2, 3, f"wshift_{i}") |
| | model.Add(var == sum(x[i, d, s] for d in range(DAYS) for s in range(2))) |
| | week_shifts[name] = var |
| |
|
| | solver = cp_model.CpSolver() |
| | solver.parameters.max_time_in_seconds = 30.0 |
| | solver.parameters.num_search_workers = 6 |
| |
|
| | if solver.Solve(model) not in (cp_model.OPTIMAL, cp_model.FEASIBLE): |
| | raise RuntimeError(f"Week {week_idx+1} infeasible.") |
| |
|
| | schedule_week = {} |
| | weekly_counts = {name: 0 for name in available_staff} |
| | for d in range(7): |
| | day_staff = [full_names[p] for p in range(9) |
| | if solver.Value(x[p, d, SHIFT["day"]]) and not full_names[p].startswith("Vacant_")] |
| | night_staff = [full_names[p] for p in range(9) |
| | if solver.Value(x[p, d, SHIFT["night"]]) and not full_names[p].startswith("Vacant_")] |
| | schedule_week[d] = {"day": day_staff, "night": night_staff} |
| | for name in day_staff + night_staff: |
| | weekly_counts[name] += 1 |
| |
|
| | return schedule_week, weekly_counts |
| |
|
| | |
| | |
| | |
| | from pydrive2.auth import GoogleAuth |
| | from pydrive2.drive import GoogleDrive |
| | from google.oauth2.service_account import Credentials |
| | from googleapiclient.discovery import build |
| | import io |
| |
|
| | def get_drive(): |
| | scopes = ["https://www.googleapis.com/auth/drive.file", "https://www.googleapis.com/auth/gmail.send"] |
| | cred_path = "credentials.json" |
| | if Path(cred_path).exists(): |
| | creds = Credentials.from_service_account_file(cred_path, scopes=scopes) |
| | else: |
| | cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON") |
| | creds = Credentials.from_service_account_info(json.loads(cred_json), scopes=scopes) |
| | gauth = GoogleAuth() |
| | gauth.credentials = creds |
| | return GoogleDrive(gauth) |
| |
|
| | DRIVE_FOLDER_ID = os.getenv("DRIVE_FOLDER_ID", "root") |
| |
|
| | def save_to_drive(filename: str, content: bytes): |
| | drive = get_drive() |
| | file = drive.CreateFile({ |
| | "title": filename, |
| | "parents": [{"id": DRIVE_FOLDER_ID}] |
| | }) |
| | file.content = io.BytesIO(content) |
| | file.Upload() |
| | return file["id"] |
| |
|
| | def list_drive_files(prefix="roster_"): |
| | try: |
| | drive = get_drive() |
| | files = drive.ListFile({ |
| | "q": f"'{DRIVE_FOLDER_ID}' in parents and title contains '{prefix}' and trashed=false" |
| | }).GetList() |
| | return sorted(files, key=lambda f: f["createdDate"], reverse=True) |
| | except Exception: |
| | return [] |
| |
|
| | def load_from_drive(file_id: str) -> bytes: |
| | drive = get_drive() |
| | file = drive.CreateFile({"id": file_id}) |
| | buffer = io.BytesIO() |
| | file.GetContentFile(buffer) |
| | return buffer.getvalue() |
| |
|
| | def send_email(to: str, subject: str, body: str): |
| | scopes = ["https://www.googleapis.com/auth/gmail.send"] |
| | cred_path = "credentials.json" |
| | if Path(cred_path).exists(): |
| | creds = Credentials.from_service_account_file(cred_path, scopes=scopes) |
| | else: |
| | cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON") |
| | creds = Credentials.from_service_account_info(json.loads(cred_json), scopes=scopes) |
| | service = build("gmail", "v1", credentials=creds) |
| | from_email = os.getenv("GMAIL_FROM", "no-reply@yourdomain.com") |
| | message = f"""From: {from_email} |
| | To: {to} |
| | Subject: {subject} |
| | MIME-Version: 1.0 |
| | Content-Type: text/html; charset=utf-8 |
| | |
| | {body}""" |
| | import base64 |
| | raw = base64.urlsafe_b64encode(message.encode()).decode() |
| | body_req = {"raw": raw} |
| | service.users().messages().send(userId="me", body=body_req).execute() |
| |
|
| | def schedule_weekly_emails(schedule_full: dict, staff_emails: dict, start_date: datetime.date): |
| | def job(): |
| | today = datetime.now().date() |
| | days_since = (today - start_date).days |
| | if not (0 <= days_since < 42): |
| | return |
| | week_idx = days_since // 7 |
| | for name, email in staff_emails.items(): |
| | shifts = [] |
| | for d in range(week_idx * 7, min((week_idx + 1) * 7, 42)): |
| | dt = start_date + timedelta(days=d) |
| | if name in schedule_full.get(d, {}).get("day", []): |
| | shifts.append(f"{dt:%a %d %b} Day") |
| | if name in schedule_full.get(d, {}).get("night", []): |
| | shifts.append(f"{dt:%a %d %b} Night") |
| | if shifts: |
| | body = f"<p>Hi {name},</p><p>Your shifts for Week {week_idx+1}:</p><ul>" + \ |
| | "".join(f"<li>{s}</li>" for s in shifts) + "</ul><p>Rest well!</p>" |
| | send_email(email, f"Roster: Week {week_idx+1}", body) |
| |
|
| | schedule.every().monday.at("08:00").do(job) |
| | def run_sched(): |
| | while True: |
| | schedule.run_pending() |
| | pytime.sleep(60) |
| | threading.Thread(target=run_sched, daemon=True).start() |
| |
|
| | |
| | |
| | |
| | def export_pdf(schedule, weekly_counts, start_date, output_path): |
| | try: |
| | from reportlab.lib import colors |
| | from reportlab.lib.pagesizes import A4, landscape |
| | from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, PageBreak |
| | from reportlab.lib.styles import ParagraphStyle |
| | from reportlab.lib.units import inch |
| |
|
| | doc = SimpleDocTemplate(str(output_path), pagesize=landscape(A4), |
| | leftMargin=0.4*inch, rightMargin=0.4*inch, |
| | topMargin=0.4*inch, bottomMargin=0.4*inch) |
| | elements = [] |
| | title_style = ParagraphStyle("Title", fontSize=16, spaceAfter=12, alignment=1) |
| | week_style = ParagraphStyle("Week", fontSize=14, spaceAfter=6, spaceBefore=12) |
| | elements.append(Paragraph("6βWeek Fair Roster (48h Night Rest)", title_style)) |
| |
|
| | def day_label(d): |
| | return f"W{(d//7)+1:02d}-{'Mon Tue Wed Thu Fri Sat Sun'.split()[d%7]}" |
| |
|
| | for w in range(6): |
| | elements.append(Paragraph(f"Week {w+1}", week_style)) |
| | data = [["Date", "Day", "Type", "Day Shift", "Night Shift"]] |
| | for d in range(w*7, (w+1)*7): |
| | dt = start_date + timedelta(days=d) |
| | dl = day_label(d) |
| | typ = "WD" if (d%7)<5 else "WE" |
| | ds = ", ".join(schedule.get(d, {}).get("day", [])) |
| | ns = ", ".join(schedule.get(d, {}).get("night", [])) |
| | data.append([dt.strftime("%a %d %b"), dl, typ, ds, ns]) |
| | table = Table(data, colWidths=[0.8*inch, 0.9*inch, 0.5*inch, 2.2*inch, 2.2*inch]) |
| | table.setStyle(TableStyle([ |
| | ('BACKGROUND', (0,0), (-1,0), colors.darkblue), |
| | ('TEXTCOLOR', (0,0), (-1,0), colors.whitesmoke), |
| | ('GRID', (0,0), (-1,-1), 0.5, colors.black), |
| | ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'), |
| | ('FONTSIZE', (0,0), (-1,0), 10), |
| | ])) |
| | elements.append(table) |
| | if w < 5: elements.append(PageBreak()) |
| | doc.build(elements) |
| | return True |
| | except Exception as e: |
| | st.error(f"PDF failed: {e}") |
| | return False |
| |
|
| | def export_ics(schedule, start_date, output_path): |
| | try: |
| | from icalendar import Calendar, Event, Alarm |
| | cal = Calendar() |
| | cal.add('prodid', '-//Roster//streamlit//') |
| | cal.add('version', '2.0') |
| | for d in range(42): |
| | shift_date = start_date + timedelta(days=d) |
| | for shift_type, key in [("Day","day"),("Night","night")]: |
| | for name in schedule.get(d, {}).get(key, []): |
| | ev = Event() |
| | st_t = dt_time(8,0) if shift_type=="Day" else dt_time(20,0) |
| | en_t = dt_time(16,0) if shift_type=="Day" else dt_time(8,0) |
| | ev.add('summary', f"{name} β {shift_type} Shift") |
| | ev.add('dtstart', datetime.combine(shift_date, st_t)) |
| | ev.add('dtend', datetime.combine(shift_date + (timedelta(days=1) if shift_type=="Night" else timedelta()), en_t)) |
| | ev.add('categories', [shift_type]) |
| | alarm = Alarm() |
| | alarm.add('action', 'DISPLAY') |
| | alarm.add('trigger', timedelta(minutes=-15)) |
| | ev.add_component(alarm) |
| | cal.add_component(ev) |
| | with open(output_path, 'wb') as f: |
| | f.write(cal.to_ical()) |
| | return True |
| | except Exception as e: |
| | st.error(f"ICS failed: {e}") |
| | return False |
| |
|
| | |
| | |
| | |
| | st.set_page_config(page_title="Enterprise Roster (Final)", layout="wide") |
| | st.title("πͺ Enterprise Roster Generator β Final") |
| |
|
| | |
| | if "initialized" not in st.session_state: |
| | st.session_state.initialized = True |
| | st.session_state.names = [""] * 9 |
| | st.session_state.emails = [""] * 9 |
| | |
| | st.session_state.start_date = (datetime.today() + timedelta(days=(7 - datetime.today().weekday()) % 7)).date() |
| | st.session_state.user_role = "manager" |
| | st.session_state.staff_email = "" |
| | st.session_state.cumulative_shifts = {} |
| | st.session_state.roster_weekly = {} |
| | st.session_state.roster_ready = False |
| |
|
| | |
| | st.sidebar.header("π Access") |
| | role = st.sidebar.radio( |
| | "Role", |
| | ["Manager", "Staff"], |
| | index=0 if st.session_state.user_role == "manager" else 1, |
| | key="role_radio" |
| | ) |
| | st.session_state.user_role = "manager" if role == "Manager" else "staff" |
| |
|
| | if st.session_state.user_role == "staff": |
| | staff_email_input = st.sidebar.text_input( |
| | "Your Email", |
| | value=st.session_state.staff_email, |
| | key="staff_email_input" |
| | ) |
| | st.session_state.staff_email = staff_email_input.strip().lower() |
| |
|
| | |
| | if st.session_state.user_role == "manager": |
| | st.header("1. Staff") |
| | cols = st.columns(3) |
| | for i in range(9): |
| | with cols[i % 3]: |
| | name_val = st.text_input( |
| | f"Staff {i+1} Name", |
| | value=st.session_state.names[i], |
| | key=f"name_input_{i}" |
| | ) |
| | email_val = st.text_input( |
| | f"Email {i+1}", |
| | value=st.session_state.emails[i], |
| | key=f"email_input_{i}" |
| | ) |
| | st.session_state.names[i] = name_val.strip() |
| | st.session_state.emails[i] = email_val.strip().lower() |
| |
|
| | st.header("2. Start Monday") |
| | |
| | sd = st.date_input( |
| | "First Monday", |
| | value=st.session_state.start_date, |
| | key="start_date_input" |
| | ) |
| | |
| | if sd != st.session_state.start_date: |
| | st.session_state.start_date = sd |
| |
|
| | st.header("3. Weekly Availability (Holiday/Mission)") |
| | st.markdown("Uncheck staff who are unavailable (max 4 absent/week β min 5 available).") |
| | avail_matrix = {} |
| | cols_w = st.columns(6) |
| | for w in range(6): |
| | with cols_w[w]: |
| | st.subheader(f"Week {w+1}") |
| | available = [] |
| | for i, name in enumerate([n for n in st.session_state.names if n]): |
| | |
| | is_avail = st.checkbox( |
| | f"{name}", |
| | value=True, |
| | key=f"avail_w{w}_p{i}" |
| | ) |
| | if is_avail: |
| | available.append(name) |
| | avail_matrix[w] = available |
| | if len(available) < 5: |
| | st.error("β οΈ β₯5 must be available") |
| |
|
| | if st.button("π Generate Rolling Roster", type="primary", key="generate_btn"): |
| | try: |
| | names_all = [n for n in st.session_state.names if n] |
| | emails_all = { |
| | n: e for n, e in zip( |
| | [n for n in st.session_state.names if n], |
| | [e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]] |
| | ) |
| | } |
| | cum_shifts = st.session_state.cumulative_shifts.copy() |
| | weekly_sched = {} |
| |
|
| | for w in range(6): |
| | week_start = st.session_state.start_date + timedelta(weeks=w) |
| | avail = avail_matrix[w] |
| | sched_w, counts_w = solve_week(w, week_start, avail, cum_shifts, names_all) |
| | for name, cnt in counts_w.items(): |
| | cum_shifts[name] = cum_shifts.get(name, 0) + cnt |
| | abs_sched = {} |
| | for d_rel, shifts in sched_w.items(): |
| | d_abs = w * 7 + d_rel |
| | abs_sched[d_abs] = shifts |
| | weekly_sched[w] = abs_sched |
| |
|
| | st.session_state.cumulative_shifts = cum_shifts |
| | st.session_state.roster_weekly = weekly_sched |
| | st.session_state.roster_ready = True |
| |
|
| | |
| | try: |
| | data = pickle.dumps({ |
| | "weekly": weekly_sched, |
| | "cumulative": cum_shifts, |
| | "start": st.session_state.start_date |
| | }) |
| | fid = save_to_drive(f"roster_{st.session_state.start_date:%Y%m%d}.pkl", data) |
| | st.info(f"πΎ Saved to Drive (ID: {fid[:8]}β¦)") |
| | except Exception as e: |
| | st.warning(f"Drive save failed: {e}") |
| |
|
| | |
| | try: |
| | full_sched = {} |
| | for w_sched in weekly_sched.values(): |
| | full_sched.update(w_sched) |
| | schedule_weekly_emails(full_sched, emails_all, st.session_state.start_date) |
| | st.info("π§ Weekly email reminders scheduled.") |
| | except Exception as e: |
| | st.warning(f"Email setup failed: {e}") |
| |
|
| | st.success("β
Rolling roster generated!") |
| |
|
| | except Exception as e: |
| | st.error(f"Generation failed: {e}") |
| |
|
| | |
| | if st.session_state.roster_ready: |
| | full_sched = {} |
| | for w_sched in st.session_state.roster_weekly.values(): |
| | full_sched.update(w_sched) |
| |
|
| | if st.session_state.user_role == "manager": |
| | st.header("π Full 6βWeek Roster") |
| | rows = [] |
| | for d in range(42): |
| | dt = st.session_state.start_date + timedelta(days=d) |
| | wd = ["Mon","Tue","Wed","Thu","Fri","Sat","Sun"][d%7] |
| | week = d//7 + 1 |
| | typ = "WD" if (d%7)<5 else "WE" |
| | rows.append({ |
| | "Week": f"W{week}", |
| | "Date": dt.strftime("%Y-%m-%d"), |
| | "Day": wd, |
| | "Type": typ, |
| | "Day Shift": ", ".join(full_sched.get(d, {}).get("day", [])), |
| | "Night Shift": ", ".join(full_sched.get(d, {}).get("night", [])), |
| | }) |
| | df = pd.DataFrame(rows) |
| | st.dataframe(df, use_container_width=True, hide_index=True) |
| |
|
| | st.subheader("π Cumulative Shifts") |
| | summ = [] |
| | for name in [n for n in st.session_state.names if n]: |
| | summ.append({"Staff": name, "Total": st.session_state.cumulative_shifts.get(name, 0)}) |
| | st.dataframe(pd.DataFrame(summ), use_container_width=True, hide_index=True) |
| |
|
| | |
| | c1, c2, c3, c4 = st.columns(4) |
| | with c1: |
| | st.download_button("π₯ CSV", df.to_csv(index=False).encode(), "roster.csv", "text/csv", key="dl_csv") |
| | with c2: |
| | with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f: |
| | if export_pdf(full_sched, {}, st.session_state.start_date, Path(f.name)): |
| | with open(f.name, "rb") as pf: |
| | st.download_button("π PDF", pf.read(), "roster.pdf", "application/pdf", key="dl_pdf") |
| | os.unlink(f.name) |
| | with c3: |
| | with tempfile.NamedTemporaryFile(delete=False, suffix=".ics") as f: |
| | if export_ics(full_sched, st.session_state.start_date, Path(f.name)): |
| | with open(f.name, "rb") as pf: |
| | st.download_button("π
ICS", pf.read(), "roster.ics", "text/calendar", key="dl_ics") |
| | os.unlink(f.name) |
| | with c4: |
| | if st.button("ποΈ Clear", key="clear_btn"): |
| | st.session_state.roster_ready = False |
| | st.session_state.roster_weekly = {} |
| | st.session_state.cumulative_shifts = {} |
| | st.rerun() |
| |
|
| | |
| | st.subheader("βοΈ Load from Drive") |
| | files = list_drive_files() |
| | if files: |
| | opts = {f["title"]: f["id"] for f in files} |
| | sel = st.selectbox("Select roster", list(opts.keys()), key="drive_select") |
| | if st.button("π₯ Load Selected", key="load_btn"): |
| | try: |
| | data = pickle.loads(load_from_drive(opts[sel])) |
| | st.session_state.roster_weekly = data["weekly"] |
| | st.session_state.cumulative_shifts = data["cumulative"] |
| | st.session_state.start_date = data["start"] |
| | st.session_state.roster_ready = True |
| | st.success("Loaded!") |
| | st.rerun() |
| | except Exception as e: |
| | st.error(f"Load failed: {e}") |
| | else: |
| | st.info("No saved rosters.") |
| |
|
| | else: |
| | known_names = [n for n in st.session_state.names if n] |
| | known_emails = [e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]] |
| | staff_name = None |
| | if st.session_state.staff_email in known_emails: |
| | staff_name = known_names[known_emails.index(st.session_state.staff_email)] |
| |
|
| | if not staff_name: |
| | st.warning("Enter an email matching a staff member.") |
| | else: |
| | st.header(f"π Your Shifts, {staff_name}") |
| | my_shifts = [] |
| | for d in range(42): |
| | dt = st.session_state.start_date + timedelta(days=d) |
| | if staff_name in full_sched.get(d, {}).get("day", []): |
| | my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Day"}) |
| | if staff_name in full_sched.get(d, {}).get("night", []): |
| | my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Night"}) |
| | st.dataframe(pd.DataFrame(my_shifts), use_container_width=True, hide_index=True) |
| |
|
| | st.caption("Final corrected version β no Streamlit session state errors.") |