roster / src /old_streamlit_app.py
nyimbi's picture
Rename src/streamlit_app.py to src/old_streamlit_app.py
7287975 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enterprise Roster Generator β€” Final Corrected Version
All Streamlit widget/session_state conflicts resolved.
"""
import streamlit as st
import itertools
import pandas as pd
from datetime import datetime, timedelta, time as dt_time
from pathlib import Path
import tempfile
import os
import pickle
import json
import threading
import schedule
import time as pytime
# ------------------------------
# OR-Tools Weekly Optimiser
# ------------------------------
from ortools.sat.python import cp_model
def solve_week(
week_idx: int,
start_date: datetime.date,
available_staff: list[str],
cumulative_shifts: dict[str, int],
all_staff: list[str]
) -> tuple[dict, dict]:
if len(available_staff) < 5:
raise ValueError("At least 5 staff must be available.")
idx_map = {name: i for i, name in enumerate(available_staff)}
N = len(available_staff)
full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N)]
SHIFT = {"day": 0, "night": 1}
DAYS = 7
WEEKDAY_REL = {0, 1, 2, 3, 4}
model = cp_model.CpModel()
x = {}
for p, d, s in itertools.product(range(9), range(DAYS), range(2)):
x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}")
# Coverage
for d in range(DAYS):
req = (3, 1) if d in WEEKDAY_REL else (1, 1)
model.Add(sum(x[p, d, SHIFT["day"]] for p in range(9)) == req[0])
model.Add(sum(x[p, d, SHIFT["night"]] for p in range(9)) == req[1])
# No same-day double shift
for p, d in itertools.product(range(9), range(DAYS)):
model.Add(x[p, d, SHIFT["day"]] + x[p, d, SHIFT["night"]] <= 1)
# No consecutive days
for p in range(9):
for d in range(DAYS - 1):
model.Add(
x[p, d, SHIFT["day"]] +
x[p, d, SHIFT["night"]] +
x[p, d+1, SHIFT["day"]] +
x[p, d+1, SHIFT["night"]] <= 1
)
# Weekend cap (Sat=5, Sun=6)
for p in range(9):
model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1)
# β˜…β˜… 48h rest after night shift β˜…β˜…
for p in range(9):
for d in range(DAYS):
night_d = x[p, d, SHIFT["night"]]
if d + 2 < DAYS:
any_d1 = x[p, d+1, SHIFT["day"]] + x[p, d+1, SHIFT["night"]]
any_d2 = x[p, d+2, SHIFT["day"]] + x[p, d+2, SHIFT["night"]]
model.Add(any_d1 + any_d2 <= 2 * (1 - night_d))
elif d + 1 < DAYS:
any_d1 = x[p, d+1, SHIFT["day"]] + x[p, d+1, SHIFT["night"]]
model.Add(any_d1 <= 1 - night_d)
# Vacants forced to 0
for p in range(N, 9):
for d, s in itertools.product(range(DAYS), range(2)):
model.Add(x[p, d, s] == 0)
# Weekly shift bounds (2–3)
week_shifts = {}
for i, name in enumerate(available_staff):
var = model.NewIntVar(2, 3, f"wshift_{i}")
model.Add(var == sum(x[i, d, s] for d in range(DAYS) for s in range(2)))
week_shifts[name] = var
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = 30.0
solver.parameters.num_search_workers = 6
if solver.Solve(model) not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
raise RuntimeError(f"Week {week_idx+1} infeasible.")
schedule_week = {}
weekly_counts = {name: 0 for name in available_staff}
for d in range(7):
day_staff = [full_names[p] for p in range(9)
if solver.Value(x[p, d, SHIFT["day"]]) and not full_names[p].startswith("Vacant_")]
night_staff = [full_names[p] for p in range(9)
if solver.Value(x[p, d, SHIFT["night"]]) and not full_names[p].startswith("Vacant_")]
schedule_week[d] = {"day": day_staff, "night": night_staff}
for name in day_staff + night_staff:
weekly_counts[name] += 1
return schedule_week, weekly_counts
# ------------------------------
# Google Drive & Gmail Helpers (graceful fallback)
# ------------------------------
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
import io
def get_drive():
scopes = ["https://www.googleapis.com/auth/drive.file", "https://www.googleapis.com/auth/gmail.send"]
cred_path = "credentials.json"
if Path(cred_path).exists():
creds = Credentials.from_service_account_file(cred_path, scopes=scopes)
else:
cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON")
creds = Credentials.from_service_account_info(json.loads(cred_json), scopes=scopes)
gauth = GoogleAuth()
gauth.credentials = creds
return GoogleDrive(gauth)
DRIVE_FOLDER_ID = os.getenv("DRIVE_FOLDER_ID", "root")
def save_to_drive(filename: str, content: bytes):
drive = get_drive()
file = drive.CreateFile({
"title": filename,
"parents": [{"id": DRIVE_FOLDER_ID}]
})
file.content = io.BytesIO(content)
file.Upload()
return file["id"]
def list_drive_files(prefix="roster_"):
try:
drive = get_drive()
files = drive.ListFile({
"q": f"'{DRIVE_FOLDER_ID}' in parents and title contains '{prefix}' and trashed=false"
}).GetList()
return sorted(files, key=lambda f: f["createdDate"], reverse=True)
except Exception:
return []
def load_from_drive(file_id: str) -> bytes:
drive = get_drive()
file = drive.CreateFile({"id": file_id})
buffer = io.BytesIO()
file.GetContentFile(buffer)
return buffer.getvalue()
def send_email(to: str, subject: str, body: str):
scopes = ["https://www.googleapis.com/auth/gmail.send"]
cred_path = "credentials.json"
if Path(cred_path).exists():
creds = Credentials.from_service_account_file(cred_path, scopes=scopes)
else:
cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON")
creds = Credentials.from_service_account_info(json.loads(cred_json), scopes=scopes)
service = build("gmail", "v1", credentials=creds)
from_email = os.getenv("GMAIL_FROM", "no-reply@yourdomain.com")
message = f"""From: {from_email}
To: {to}
Subject: {subject}
MIME-Version: 1.0
Content-Type: text/html; charset=utf-8
{body}"""
import base64
raw = base64.urlsafe_b64encode(message.encode()).decode()
body_req = {"raw": raw}
service.users().messages().send(userId="me", body=body_req).execute()
def schedule_weekly_emails(schedule_full: dict, staff_emails: dict, start_date: datetime.date):
def job():
today = datetime.now().date()
days_since = (today - start_date).days
if not (0 <= days_since < 42):
return
week_idx = days_since // 7
for name, email in staff_emails.items():
shifts = []
for d in range(week_idx * 7, min((week_idx + 1) * 7, 42)):
dt = start_date + timedelta(days=d)
if name in schedule_full.get(d, {}).get("day", []):
shifts.append(f"{dt:%a %d %b} Day")
if name in schedule_full.get(d, {}).get("night", []):
shifts.append(f"{dt:%a %d %b} Night")
if shifts:
body = f"<p>Hi {name},</p><p>Your shifts for Week {week_idx+1}:</p><ul>" + \
"".join(f"<li>{s}</li>" for s in shifts) + "</ul><p>Rest well!</p>"
send_email(email, f"Roster: Week {week_idx+1}", body)
schedule.every().monday.at("08:00").do(job)
def run_sched():
while True:
schedule.run_pending()
pytime.sleep(60)
threading.Thread(target=run_sched, daemon=True).start()
# ------------------------------
# PDF & ICS Exports
# ------------------------------
def export_pdf(schedule, weekly_counts, start_date, output_path):
try:
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4, landscape
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, PageBreak
from reportlab.lib.styles import ParagraphStyle
from reportlab.lib.units import inch
doc = SimpleDocTemplate(str(output_path), pagesize=landscape(A4),
leftMargin=0.4*inch, rightMargin=0.4*inch,
topMargin=0.4*inch, bottomMargin=0.4*inch)
elements = []
title_style = ParagraphStyle("Title", fontSize=16, spaceAfter=12, alignment=1)
week_style = ParagraphStyle("Week", fontSize=14, spaceAfter=6, spaceBefore=12)
elements.append(Paragraph("6‑Week Fair Roster (48h Night Rest)", title_style))
def day_label(d):
return f"W{(d//7)+1:02d}-{'Mon Tue Wed Thu Fri Sat Sun'.split()[d%7]}"
for w in range(6):
elements.append(Paragraph(f"Week {w+1}", week_style))
data = [["Date", "Day", "Type", "Day Shift", "Night Shift"]]
for d in range(w*7, (w+1)*7):
dt = start_date + timedelta(days=d)
dl = day_label(d)
typ = "WD" if (d%7)<5 else "WE"
ds = ", ".join(schedule.get(d, {}).get("day", []))
ns = ", ".join(schedule.get(d, {}).get("night", []))
data.append([dt.strftime("%a %d %b"), dl, typ, ds, ns])
table = Table(data, colWidths=[0.8*inch, 0.9*inch, 0.5*inch, 2.2*inch, 2.2*inch])
table.setStyle(TableStyle([
('BACKGROUND', (0,0), (-1,0), colors.darkblue),
('TEXTCOLOR', (0,0), (-1,0), colors.whitesmoke),
('GRID', (0,0), (-1,-1), 0.5, colors.black),
('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'),
('FONTSIZE', (0,0), (-1,0), 10),
]))
elements.append(table)
if w < 5: elements.append(PageBreak())
doc.build(elements)
return True
except Exception as e:
st.error(f"PDF failed: {e}")
return False
def export_ics(schedule, start_date, output_path):
try:
from icalendar import Calendar, Event, Alarm
cal = Calendar()
cal.add('prodid', '-//Roster//streamlit//')
cal.add('version', '2.0')
for d in range(42):
shift_date = start_date + timedelta(days=d)
for shift_type, key in [("Day","day"),("Night","night")]:
for name in schedule.get(d, {}).get(key, []):
ev = Event()
st_t = dt_time(8,0) if shift_type=="Day" else dt_time(20,0)
en_t = dt_time(16,0) if shift_type=="Day" else dt_time(8,0)
ev.add('summary', f"{name} β€” {shift_type} Shift")
ev.add('dtstart', datetime.combine(shift_date, st_t))
ev.add('dtend', datetime.combine(shift_date + (timedelta(days=1) if shift_type=="Night" else timedelta()), en_t))
ev.add('categories', [shift_type])
alarm = Alarm()
alarm.add('action', 'DISPLAY')
alarm.add('trigger', timedelta(minutes=-15))
ev.add_component(alarm)
cal.add_component(ev)
with open(output_path, 'wb') as f:
f.write(cal.to_ical())
return True
except Exception as e:
st.error(f"ICS failed: {e}")
return False
# ------------------------------
# Streamlit App β€” CORRECTED SESSION STATE HANDLING
# ------------------------------
st.set_page_config(page_title="Enterprise Roster (Final)", layout="wide")
st.title("πŸͺ– Enterprise Roster Generator β€” Final")
# === Initialize session state (FIRST RUN ONLY) ===
if "initialized" not in st.session_state:
st.session_state.initialized = True
st.session_state.names = [""] * 9
st.session_state.emails = [""] * 9
# Use a separate key for default start date
st.session_state.start_date = (datetime.today() + timedelta(days=(7 - datetime.today().weekday()) % 7)).date()
st.session_state.user_role = "manager"
st.session_state.staff_email = ""
st.session_state.cumulative_shifts = {}
st.session_state.roster_weekly = {}
st.session_state.roster_ready = False
# === Auth UI ===
st.sidebar.header("πŸ” Access")
role = st.sidebar.radio(
"Role",
["Manager", "Staff"],
index=0 if st.session_state.user_role == "manager" else 1,
key="role_radio"
)
st.session_state.user_role = "manager" if role == "Manager" else "staff"
if st.session_state.user_role == "staff":
staff_email_input = st.sidebar.text_input(
"Your Email",
value=st.session_state.staff_email,
key="staff_email_input"
)
st.session_state.staff_email = staff_email_input.strip().lower()
# === Manager Input ===
if st.session_state.user_role == "manager":
st.header("1. Staff")
cols = st.columns(3)
for i in range(9):
with cols[i % 3]:
name_val = st.text_input(
f"Staff {i+1} Name",
value=st.session_state.names[i],
key=f"name_input_{i}"
)
email_val = st.text_input(
f"Email {i+1}",
value=st.session_state.emails[i],
key=f"email_input_{i}"
)
st.session_state.names[i] = name_val.strip()
st.session_state.emails[i] = email_val.strip().lower()
st.header("2. Start Monday")
# βœ… CORRECT: widget key β‰  session state key
sd = st.date_input(
"First Monday",
value=st.session_state.start_date,
key="start_date_input" # ← distinct from st.session_state.start_date
)
# Update session state only if changed
if sd != st.session_state.start_date:
st.session_state.start_date = sd
st.header("3. Weekly Availability (Holiday/Mission)")
st.markdown("Uncheck staff who are unavailable (max 4 absent/week β†’ min 5 available).")
avail_matrix = {}
cols_w = st.columns(6)
for w in range(6):
with cols_w[w]:
st.subheader(f"Week {w+1}")
available = []
for i, name in enumerate([n for n in st.session_state.names if n]):
# βœ… Use unique key per checkbox
is_avail = st.checkbox(
f"{name}",
value=True,
key=f"avail_w{w}_p{i}"
)
if is_avail:
available.append(name)
avail_matrix[w] = available
if len(available) < 5:
st.error("⚠️ β‰₯5 must be available")
if st.button("πŸš€ Generate Rolling Roster", type="primary", key="generate_btn"):
try:
names_all = [n for n in st.session_state.names if n]
emails_all = {
n: e for n, e in zip(
[n for n in st.session_state.names if n],
[e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]]
)
}
cum_shifts = st.session_state.cumulative_shifts.copy()
weekly_sched = {}
for w in range(6):
week_start = st.session_state.start_date + timedelta(weeks=w)
avail = avail_matrix[w]
sched_w, counts_w = solve_week(w, week_start, avail, cum_shifts, names_all)
for name, cnt in counts_w.items():
cum_shifts[name] = cum_shifts.get(name, 0) + cnt
abs_sched = {}
for d_rel, shifts in sched_w.items():
d_abs = w * 7 + d_rel
abs_sched[d_abs] = shifts
weekly_sched[w] = abs_sched
st.session_state.cumulative_shifts = cum_shifts
st.session_state.roster_weekly = weekly_sched
st.session_state.roster_ready = True
# Auto-save to Drive (graceful)
try:
data = pickle.dumps({
"weekly": weekly_sched,
"cumulative": cum_shifts,
"start": st.session_state.start_date
})
fid = save_to_drive(f"roster_{st.session_state.start_date:%Y%m%d}.pkl", data)
st.info(f"πŸ’Ύ Saved to Drive (ID: {fid[:8]}…)")
except Exception as e:
st.warning(f"Drive save failed: {e}")
# Email scheduler
try:
full_sched = {}
for w_sched in weekly_sched.values():
full_sched.update(w_sched)
schedule_weekly_emails(full_sched, emails_all, st.session_state.start_date)
st.info("πŸ“§ Weekly email reminders scheduled.")
except Exception as e:
st.warning(f"Email setup failed: {e}")
st.success("βœ… Rolling roster generated!")
except Exception as e:
st.error(f"Generation failed: {e}")
# === Display ===
if st.session_state.roster_ready:
full_sched = {}
for w_sched in st.session_state.roster_weekly.values():
full_sched.update(w_sched)
if st.session_state.user_role == "manager":
st.header("πŸ“‹ Full 6‑Week Roster")
rows = []
for d in range(42):
dt = st.session_state.start_date + timedelta(days=d)
wd = ["Mon","Tue","Wed","Thu","Fri","Sat","Sun"][d%7]
week = d//7 + 1
typ = "WD" if (d%7)<5 else "WE"
rows.append({
"Week": f"W{week}",
"Date": dt.strftime("%Y-%m-%d"),
"Day": wd,
"Type": typ,
"Day Shift": ", ".join(full_sched.get(d, {}).get("day", [])),
"Night Shift": ", ".join(full_sched.get(d, {}).get("night", [])),
})
df = pd.DataFrame(rows)
st.dataframe(df, use_container_width=True, hide_index=True)
st.subheader("πŸ“Š Cumulative Shifts")
summ = []
for name in [n for n in st.session_state.names if n]:
summ.append({"Staff": name, "Total": st.session_state.cumulative_shifts.get(name, 0)})
st.dataframe(pd.DataFrame(summ), use_container_width=True, hide_index=True)
# Exports
c1, c2, c3, c4 = st.columns(4)
with c1:
st.download_button("πŸ“₯ CSV", df.to_csv(index=False).encode(), "roster.csv", "text/csv", key="dl_csv")
with c2:
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f:
if export_pdf(full_sched, {}, st.session_state.start_date, Path(f.name)):
with open(f.name, "rb") as pf:
st.download_button("πŸ“„ PDF", pf.read(), "roster.pdf", "application/pdf", key="dl_pdf")
os.unlink(f.name)
with c3:
with tempfile.NamedTemporaryFile(delete=False, suffix=".ics") as f:
if export_ics(full_sched, st.session_state.start_date, Path(f.name)):
with open(f.name, "rb") as pf:
st.download_button("πŸ“… ICS", pf.read(), "roster.ics", "text/calendar", key="dl_ics")
os.unlink(f.name)
with c4:
if st.button("πŸ—‘οΈ Clear", key="clear_btn"):
st.session_state.roster_ready = False
st.session_state.roster_weekly = {}
st.session_state.cumulative_shifts = {}
st.rerun()
# Drive Load
st.subheader("☁️ Load from Drive")
files = list_drive_files()
if files:
opts = {f["title"]: f["id"] for f in files}
sel = st.selectbox("Select roster", list(opts.keys()), key="drive_select")
if st.button("πŸ“₯ Load Selected", key="load_btn"):
try:
data = pickle.loads(load_from_drive(opts[sel]))
st.session_state.roster_weekly = data["weekly"]
st.session_state.cumulative_shifts = data["cumulative"]
st.session_state.start_date = data["start"]
st.session_state.roster_ready = True
st.success("Loaded!")
st.rerun()
except Exception as e:
st.error(f"Load failed: {e}")
else:
st.info("No saved rosters.")
else: # staff view
known_names = [n for n in st.session_state.names if n]
known_emails = [e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]]
staff_name = None
if st.session_state.staff_email in known_emails:
staff_name = known_names[known_emails.index(st.session_state.staff_email)]
if not staff_name:
st.warning("Enter an email matching a staff member.")
else:
st.header(f"πŸ‘‹ Your Shifts, {staff_name}")
my_shifts = []
for d in range(42):
dt = st.session_state.start_date + timedelta(days=d)
if staff_name in full_sched.get(d, {}).get("day", []):
my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Day"})
if staff_name in full_sched.get(d, {}).get("night", []):
my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Night"})
st.dataframe(pd.DataFrame(my_shifts), use_container_width=True, hide_index=True)
st.caption("Final corrected version β€” no Streamlit session state errors.")