Spaces:
Sleeping
Sleeping
| """ | |
| Health Matrix AI Command Center - Streamlit Application | |
| Modern AI-inspired healthcare workforce management platform | |
| """ | |
| import streamlit as st | |
| import pandas as pd | |
| import time | |
| from datetime import datetime | |
| import os | |
| from typing import Dict, List, Any, Optional, Iterable | |
| import requests | |
| from io import StringIO | |
| import streamlit.components.v1 as components | |
| # Optional: OpenAI for GPT decisions (fails gracefully if missing) | |
| try: | |
| import openai | |
| HAS_OPENAI = True | |
| except Exception: | |
| openai = None | |
| HAS_OPENAI = False | |
| # ============================================================================= | |
| # Minimal embedded CSS (fallback if style.css not found) | |
| # ============================================================================= | |
| _EMBEDDED_CSS = """ | |
| /* ---- Layout helpers ---- */ | |
| .hero-section{padding:40px 0 12px} | |
| .hero-content{text-align:center} | |
| .hero-logo img{height:60px} | |
| .hero-title{font-weight:900;margin:10px 0 8px} | |
| .hero-title .gradient-text{background:linear-gradient(90deg,#0ea5e9,#22c55e); | |
| -webkit-background-clip:text;background-clip:text;color:transparent} | |
| .hero-title .subtitle{font-size:18px;color:#475569} | |
| .hero-description{max-width:900px;margin:8px auto 0;color:#334155} | |
| .hero-stats{display:grid;grid-template-columns:repeat(3,minmax(0,1fr));gap:12px;max-width:750px;margin:18px auto 0} | |
| .stat-item{border:1px solid #e5e7eb;border-radius:14px;padding:14px;background:#fff} | |
| .stat-value{font-size:22px;font-weight:800} | |
| .stat-label{font-size:12px;color:#64748b} | |
| /* KPI cards */ | |
| .kpi-grid{display:grid;grid-template-columns:repeat(3,minmax(0,1fr));gap:14px;margin:10px 0 6px} | |
| .kpi-card{border-radius:16px;border:1px solid #e5e7eb;background:#fff;padding:14px;box-shadow:0 1px 2px rgba(0,0,0,.04)} | |
| .kpi-icon{font-size:22px} | |
| .kpi-value{font-size:26px;font-weight:900;margin-top:6px} | |
| .kpi-label{font-size:12px;color:#64748b} | |
| .kpi-change{font-size:12px;margin-top:6px;color:#0f172a} | |
| /* Status indicator */ | |
| .status-indicator{display:flex;align-items:center;gap:10px;padding:10px 12px;border-radius:12px;border:1px solid #e5e7eb;background:#fff} | |
| .status-dot{width:10px;height:10px;border-radius:50%} | |
| .status-indicator.ready .status-dot{background:#10b981} | |
| .status-indicator.completed .status-dot{background:#3b82f6} | |
| .status-indicator.processing .status-dot{background:#f59e0b} | |
| .pulsing{animation:pulse 1.6s infinite} | |
| @keyframes pulse{0%{transform:scale(1);opacity:1}70%{transform:scale(1.35);opacity:.35}100%{transform:scale(1);opacity:1}} | |
| /* Timeline (for components.html) */ | |
| .timeline-wrap{margin-top:6px} | |
| .timeline{position:relative;margin:8px 0 0 22px;padding-left:18px} | |
| .timeline:before{content:"";position:absolute;left:0;top:0;bottom:0;width:2px;background:#e5e7eb} | |
| .timeline-item{position:relative;display:flex;gap:12px;margin:14px 0;padding:12px 14px;background:#fff;border:1px solid #e5e7eb;border-radius:12px;box-shadow:0 1px 2px rgba(0,0,0,.04)} | |
| .timeline-item.done{border-color:#d1fae5;background:#f0fdf4} | |
| .timeline-item.current{border-color:#93c5fd;background:#eff6ff} | |
| .timeline-marker{position:absolute;left:-23px;top:18px;width:10px;height:10px;border-radius:50%;background:#10b981;border:2px solid #fff;box-shadow:0 0 0 2px #e5e7eb} | |
| .timeline-item.current .timeline-marker{background:#3b82f6} | |
| .timeline-title{font-weight:800;color:#0f172a} | |
| .timeline-meta{font-size:12px;color:#64748b;margin-top:2px} | |
| .timeline-desc{font-size:13px;color:#1f2937;margin-top:6px} | |
| /* Kanban */ | |
| .kanban-board{display:grid;grid-template-columns:repeat(3,minmax(0,1fr));gap:16px} | |
| .kanban-lane{background:#fff;border:1px solid #e5e7eb;border-radius:16px;overflow:hidden} | |
| .lane-header{display:flex;align-items:center;gap:8px;padding:10px 12px;border-bottom:1px solid #e5e7eb;font-weight:800} | |
| .assigned-header{background:#f0fdf4} | |
| .notify-header{background:#eff6ff} | |
| .skipped-header{background:#fff7ed} | |
| .lane-count{margin-left:auto;background:#0f172a;color:#fff;font-size:12px;padding:2px 8px;border-radius:999px} | |
| .lane-content{padding:10px} | |
| .employee-card{border:1px solid #e5e7eb;border-radius:14px;background:#fff;padding:12px;margin-bottom:12px} | |
| .emp-header{display:flex;align-items:center;gap:10px} | |
| .emp-avatar{width:34px;height:34px;border-radius:50%;background:#0ea5e9;color:#fff;display:flex;align-items:center;justify-content:center;font-weight:800} | |
| .emp-info{flex:1} | |
| .emp-name{font-weight:800} | |
| .emp-shift{font-size:12px;color:#64748b} | |
| .emp-chip{font-size:12px;padding:4px 8px;border-radius:999px;border:1px solid #e5e7eb} | |
| .chip-ok{background:#f0fdf4} | |
| .chip-warn{background:#eff6ff} | |
| .chip-fail{background:#fff7ed} | |
| .emp-divider{height:1px;background:#e5e7eb;margin:10px 0} | |
| .emp-badges{display:flex;flex-wrap:wrap;gap:8px} | |
| .emp-badge{display:inline-flex;align-items:center;gap:6px;font-size:12px;border:1px solid #e5e7eb;border-radius:999px;padding:2px 8px} | |
| .badge-ok{background:#f0fdf4} | |
| .badge-fail{background:#fff7ed} | |
| .badge-info{background:#f1f5f9} | |
| .badge-dot{width:6px;height:6px;border-radius:50%;background:#0f172a} | |
| .emp-footer{display:flex;justify-content:space-between;margin-top:8px;font-size:12px;color:#64748b} | |
| /* Overview */ | |
| .overview-metrics{display:flex;flex-wrap:wrap;gap:10px;margin:12px 0} | |
| .metric-chip{border:1px solid #e5e7eb;border-radius:999px;background:#fff;padding:6px 10px;font-size:13px} | |
| .business-case{border:1px solid #e5e7eb;border-radius:12px;background:#fff;padding:12px;margin:10px 0} | |
| .footer{margin-top:24px} | |
| .footer-content{font-size:12px;color:#64748b} | |
| @media (max-width:900px){ | |
| .kpi-grid,.hero-stats,.kanban-board{grid-template-columns:1fr} | |
| } | |
| """ | |
| # ============================================================================= | |
| # Configuration & Setup | |
| # ============================================================================= | |
| def setup_page(): | |
| """Configure Streamlit page settings and load custom CSS""" | |
| st.set_page_config( | |
| page_title="Health Matrix AI Command Center", | |
| page_icon="π₯", | |
| layout="wide", | |
| initial_sidebar_state="collapsed" | |
| ) | |
| # Try external CSS, otherwise fallback | |
| css_loaded = False | |
| try: | |
| if os.path.exists("src/style.css"): | |
| with open("src/style.css", "r", encoding="utf-8") as f: | |
| st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True) | |
| css_loaded = True | |
| except Exception: | |
| css_loaded = False | |
| if not css_loaded: | |
| st.markdown(f"<style>{_EMBEDDED_CSS}</style>", unsafe_allow_html=True) | |
| # ============================================================================= | |
| # UKG API helpers (unchanged logic) | |
| # ============================================================================= | |
| def _get_auth_header() -> Dict[str, str]: | |
| """Get UKG API authentication headers""" | |
| app_key = os.environ.get("UKG_APP_KEY") | |
| token = os.environ.get("UKG_AUTH_TOKEN") | |
| if not app_key or not token: | |
| st.warning("UKG authentication variables (UKG_APP_KEY / UKG_AUTH_TOKEN) are not set. API calls may fail.") | |
| return { | |
| "Content-Type": "application/json", | |
| "appkey": app_key or "", | |
| "Authorization": f"Bearer {token}" if token else "", | |
| } | |
| def fetch_open_shifts( | |
| start_date: str = "2000-01-01", | |
| end_date: str = "3000-01-01", | |
| location_ids: Optional[Iterable[str]] = None, | |
| ) -> pd.DataFrame: | |
| """Fetch open shifts from UKG API""" | |
| if location_ids is None: | |
| location_ids = ["2401", "2402", "2953", "2955", "2927", "2928", "2401", "2955"] | |
| url = "https://partnerdemo-019.cfn.mykronos.com/api/v1/scheduling/schedule/multi_read" | |
| headers = _get_auth_header() | |
| payload = { | |
| "select": ["OPENSHIFTS"], | |
| "where": { | |
| "locations": { | |
| "dateRange": {"startDate": start_date, "endDate": end_date}, | |
| "includeEmployeeTransfer": False, | |
| "locations": {"ids": list(location_ids)}, | |
| } | |
| }, | |
| } | |
| try: | |
| r = requests.post(url, headers=headers, json=payload, timeout=30) | |
| r.raise_for_status() | |
| data = r.json() | |
| rows: List[Dict[str, Any]] = [] | |
| for shift in data.get("openShifts", []): | |
| rows.append({ | |
| "ID": shift.get("id"), | |
| "Start": shift.get("startDateTime"), | |
| "End": shift.get("endDateTime"), | |
| "Label": shift.get("label"), | |
| "Org Job": (shift.get("segments", [{}])[0].get("orgJobRef", {}).get("qualifier", "") | |
| if shift.get("segments") else ""), | |
| "Posted": shift.get("posted"), | |
| "Self Serviced": shift.get("selfServiced"), | |
| "Locked": shift.get("locked"), | |
| }) | |
| return pd.DataFrame(rows) | |
| except Exception as e: | |
| st.error(f"β UKG Open Shifts API call failed: {e}") | |
| return pd.DataFrame() | |
| def fetch_location_data(date: str = "2025-07-13", query: str = "Medsurg") -> pd.DataFrame: | |
| """Fetch location data from UKG API""" | |
| url = "https://partnerdemo-019.cfn.mykronos.com/api/v1/commons/locations/multi_read" | |
| headers = _get_auth_header() | |
| payload = { | |
| "multiReadOptions": {"includeOrgPathDetails": True}, | |
| "where": {"query": {"context": "ORG", "date": date, "q": query}} | |
| } | |
| try: | |
| r = requests.post(url, headers=headers, json=payload, timeout=30) | |
| r.raise_for_status() | |
| data = r.json() | |
| rows = [] | |
| for item in data if isinstance(data, list) else data.get("locations", []): | |
| rows.append({ | |
| "Node ID": item.get("nodeId", ""), | |
| "Name": item.get("name", ""), | |
| "Full Name": item.get("fullName", ""), | |
| "Description": item.get("description", ""), | |
| "Org Path": item.get("orgPath", ""), | |
| "Persistent ID": item.get("persistentId", ""), | |
| }) | |
| return pd.DataFrame(rows) | |
| except Exception as e: | |
| st.error(f"β UKG Location API call failed: {e}") | |
| return pd.DataFrame() | |
| def fetch_employees(employee_ids: Iterable[int]) -> pd.DataFrame: | |
| """Fetch employee data from UKG API""" | |
| base_url = "https://partnerdemo-019.cfn.mykronos.com/api/v1/commons/persons/" | |
| headers = _get_auth_header() | |
| def fetch_employee_data(emp_id: int) -> Optional[Dict[str, Any]]: | |
| try: | |
| resp = requests.get(f"{base_url}{emp_id}", headers=headers, timeout=30) | |
| if resp.status_code != 200: | |
| st.warning(f"β οΈ Could not fetch employee {emp_id}: {resp.status_code}") | |
| return None | |
| data = resp.json() | |
| person_info = data.get("personInformation", {}).get("person", {}) | |
| person_number = person_info.get("personNumber") | |
| full_name = person_info.get("fullName", "") | |
| org_path = "" | |
| primary_accounts = data.get("jobAssignment", {}).get("primaryLaborAccounts", []) | |
| if primary_accounts: | |
| org_path = primary_accounts[0].get("organizationPath", "") | |
| phone = "" | |
| phones = data.get("personInformation", {}).get("telephoneNumbers", []) | |
| if phones: | |
| phone = phones[0].get("phoneNumber", "") | |
| # Fetch certifications | |
| cert_url = f"{base_url}certifications/{emp_id}" | |
| cert_resp = requests.get(cert_url, headers=headers, timeout=30) | |
| certs: List[str] = [] | |
| if cert_resp.status_code == 200: | |
| cert_data = cert_resp.json() | |
| if isinstance(cert_data, list): | |
| for c in cert_data: | |
| qual = c.get("qualifier") | |
| if qual: certs.append(qual) | |
| elif isinstance(cert_data, dict) and "assignments" in cert_data: | |
| for a in cert_data.get("assignments", []): | |
| qual = a.get("certification", {}).get("qualifier") | |
| if qual: certs.append(qual) | |
| certs = [str(x).strip() for x in certs if x] | |
| return { | |
| "personNumber": person_number, | |
| "organizationPath": org_path, | |
| "phoneNumber": phone, | |
| "fullName": full_name, | |
| "Certifications": certs | |
| } | |
| except Exception as e: | |
| st.error(f"β Error fetching employee {emp_id}: {e}") | |
| return None | |
| records = [] | |
| for emp_id in employee_ids: | |
| row = fetch_employee_data(emp_id) | |
| if row: | |
| records.append(row) | |
| df = pd.DataFrame(records) | |
| # Ensure mandatory columns exist with proper lengths | |
| n = len(df) | |
| for col, default in [ | |
| ("personNumber", ""), | |
| ("organizationPath", ""), | |
| ("phoneNumber", ""), | |
| ("fullName", ""), | |
| ("Certifications", []), | |
| ]: | |
| if col not in df.columns: | |
| df[col] = [default] * n | |
| if "Languages" not in df.columns: | |
| df["Languages"] = [[] for _ in range(n)] | |
| df["JobRole"] = df["organizationPath"].apply( | |
| lambda p: p.split("/")[-1] if isinstance(p, str) and p else "" | |
| ) | |
| return df | |
| # ============================================================================= | |
| # Decision Logic (unchanged) | |
| # ============================================================================= | |
| def _has_any(tokens: List[str], haystack: str) -> bool: | |
| hay = (haystack or "").lower() | |
| return any((t or "").strip().lower() in hay for t in tokens if t and t.strip()) | |
| def _has_all_in_list(need: List[str], have: List[str]) -> bool: | |
| have_l = [h.lower() for h in (have or [])] | |
| return all(n.strip().lower() in have_l for n in need if n and n.strip()) | |
| def is_eligible_strict(row: pd.Series, shift: pd.Series) -> bool: | |
| role_req = str(shift.get("RoleRequired","")).strip().lower() | |
| specialty = str(shift.get("Specialty","")).replace("-", " ").strip().lower() | |
| mandatory = [t.strip() for t in str(shift.get("MandatoryTraining","")).replace(",", "/").split("/") if t.strip()] | |
| ward = str(shift.get("WardDepartment","")).strip().lower() | |
| lang_req = str(shift.get("LanguageRequirement","")).strip().lower() | |
| role_ok = (row.get("JobRole","").strip().lower() == role_req) or _has_any([role_req], row.get("organizationPath","")) | |
| spec_ok = (not specialty) or _has_any([specialty], row.get("JobRole","")) \ | |
| or _has_any([specialty], row.get("organizationPath","")) \ | |
| or _has_any([specialty], " ".join(row.get("Certifications", []))) | |
| training_ok = (not mandatory) or _has_all_in_list(mandatory, row.get("Certifications", [])) | |
| ward_ok = (not ward) or _has_any([ward], row.get("organizationPath","")) | |
| lang_ok = (not lang_req) or (lang_req in [l.lower() for l in (row.get("Languages") or [])]) | |
| return role_ok and spec_ok and training_ok and ward_ok and lang_ok | |
| def is_eligible_lenient(row: pd.Series, shift: pd.Series) -> bool: | |
| role_req = str(shift.get("RoleRequired","")).strip().lower() | |
| specialty = str(shift.get("Specialty","")).replace("-", " ").strip().lower() | |
| mandatory_raw = str(shift.get("MandatoryTraining","")) | |
| mandatory = [t.strip() for t in mandatory_raw.replace(",", "/").split("/") if t.strip()] | |
| ward = str(shift.get("WardDepartment","")).strip().lower() | |
| lang_req = str(shift.get("LanguageRequirement","")).strip().lower() | |
| text_all = " ".join([ | |
| row.get("JobRole",""), row.get("organizationPath",""), | |
| " ".join((row.get("Certifications") or [])) | |
| ]).lower() | |
| role_ok = (row.get("JobRole","").strip().lower() == role_req) or (role_req and role_req in text_all) | |
| spec_ok = (not specialty) or any(tok in text_all for tok in [specialty, specialty.replace(" ", ""), "icu"]) | |
| training_ok = (not mandatory) or all(any(mtok in text_all for mtok in [m.lower(), m.lower().replace(" certified","")]) for m in mandatory) | |
| ward_ok = (not ward) or (ward in text_all) | |
| lang_ok = (not lang_req) or (lang_req in [l.lower() for l in (row.get("Languages") or [])]) | |
| core_ok = role_ok or spec_ok or training_ok | |
| return core_ok and (not ward or ward_ok) and (not lang_req or lang_ok) | |
| def gpt_decide(shift: pd.Series, eligible_df: pd.DataFrame) -> Dict[str, str]: | |
| if not HAS_OPENAI or openai is None or not os.getenv("OPENAI_API_KEY"): | |
| return {"action": "skip"} | |
| emp_list = [] | |
| for _, r in eligible_df.iterrows(): | |
| role_match = str(shift.get('RoleRequired','')) in (r.get("organizationPath") or "") or \ | |
| (r.get("JobRole","") == str(shift.get('RoleRequired',''))) | |
| mand_tokens = [t.strip() for t in str(shift.get("MandatoryTraining","")).replace(",", "/").split("/") if t.strip()] | |
| cert_ok = _has_all_in_list(mand_tokens, r.get("Certifications") or []) | |
| score = int(bool(role_match)) + int(bool(cert_ok)) | |
| emp_list.append({ | |
| "fullName": r.get("fullName",""), | |
| "phoneNumber": r.get("phoneNumber",""), | |
| "organizationPath": r.get("organizationPath",""), | |
| "Certifications": r.get("Certifications",[]), | |
| "matchScore": score | |
| }) | |
| prompt = f""" | |
| You are an intelligent shift assignment assistant: | |
| Shift Details: | |
| - Department: {shift.get('Department','')} | |
| - Required Role: {shift.get('RoleRequired','')} | |
| - Specialty/Experience: {shift.get('Specialty','')} | |
| - Mandatory Training: {shift.get('MandatoryTraining','')} | |
| - Shift Type/Duration: {shift.get('ShiftType','')} / {shift.get('ShiftDuration','')} | |
| - Ward/Department: {shift.get('WardDepartment','')} | |
| - Language Required: {shift.get('LanguageRequirement','')} | |
| - Time: {shift.get('ShiftTime','')} | |
| Eligible Employees (matchScore=0..2): {emp_list if emp_list else 'None'} | |
| Choose JSON only: | |
| {{"action":"assign","employee":"Name"}} or {{"action":"notify","employee":"Name"}} or {{"action":"skip"}} | |
| """.strip() | |
| try: | |
| resp = openai.ChatCompletion.create( | |
| model="gpt-4", | |
| messages=[ | |
| {"role":"system","content":"You are an intelligent healthcare shift management assistant"}, | |
| {"role":"user","content":prompt} | |
| ], | |
| temperature=0.4, | |
| max_tokens=200, | |
| ) | |
| import ast | |
| return ast.literal_eval(resp["choices"][0]["message"]["content"]) | |
| except Exception as e: | |
| st.error(f"β GPT Error: {e}") | |
| return {"action": "skip"} | |
| # ============================================================================= | |
| # Showcase helpers (unchanged) | |
| # ============================================================================= | |
| def _append_demo_employee(df_employees: pd.DataFrame) -> pd.DataFrame: | |
| demo = { | |
| "personNumber": "D-1001", | |
| "organizationPath": "Hospital/A&E/ICU/Registered Nurse", | |
| "phoneNumber": "966500001234", | |
| "fullName": "Ali Mansour", | |
| "Certifications": ["BLS certified", "ALS certified", "ICU-trained"], | |
| "Languages": ["Arabic"], | |
| "JobRole": "Registered Nurse", | |
| } | |
| extra = pd.DataFrame([demo]) | |
| cols = list(set(df_employees.columns) | set(extra.columns)) | |
| df_employees = df_employees.reindex(columns=cols) | |
| extra = extra.reindex(columns=cols) | |
| return pd.concat([df_employees, extra], ignore_index=True) | |
| def _score_match(row: pd.Series, shift: pd.Series) -> int: | |
| text_all = " ".join([ | |
| row.get("JobRole",""), row.get("organizationPath",""), | |
| " ".join((row.get("Certifications") or [])), " ".join((row.get("Languages") or [])) | |
| ]).lower() | |
| role_req = str(shift.get("RoleRequired","")).strip().lower() | |
| specialty = str(shift.get("Specialty","")).replace("-", " ").strip().lower() | |
| ward = str(shift.get("WardDepartment","")).strip().lower() | |
| lang = str(shift.get("LanguageRequirement","")).strip().lower() | |
| pts = 0 | |
| if role_req and role_req in text_all: pts += 3 | |
| if specialty and ("icu" in text_all or specialty.replace(" ", "") in text_all): pts += 2 | |
| if ward and ward in text_all: pts += 1 | |
| if lang and lang in text_all: pts += 1 | |
| if any(tok in text_all for tok in ["bls", "als"]): pts += 2 | |
| return pts | |
| # ============================================================================= | |
| # Agent runner (same logic, safer ops) | |
| # ============================================================================= | |
| def run_agent( | |
| employee_ids: Iterable[int], | |
| df_shifts: pd.DataFrame, | |
| showcase: bool = False, | |
| lenient: bool = False, | |
| ): | |
| events: List[Dict[str, Any]] = [] | |
| shift_assignment_results: List[tuple] = [] | |
| reasoning_rows: List[Dict[str, Any]] = [] | |
| # 1) Fetch employees | |
| t0 = time.perf_counter() | |
| df_employees = fetch_employees(employee_ids) | |
| if showcase and len(df_employees) < 3: | |
| df_employees = _append_demo_employee(df_employees) | |
| dur = (time.perf_counter() - t0) * 1000 | |
| loaded_txt = f"{len(df_employees)} employee(s) loaded successfully." if not df_employees.empty else "No employees returned or credentials invalid." | |
| events.append({ | |
| "title": "Fetch employees", | |
| "desc": loaded_txt, | |
| "status": "done", | |
| "dur_ms": max(int(dur), 1) | |
| }) | |
| # 2) Evaluate shifts | |
| t_eval = time.perf_counter() | |
| events.append({ | |
| "title": "Evaluate open shifts", | |
| "desc": "Matching employees vs. role/specialty/training/wardβ¦", | |
| "status": "done" | |
| }) | |
| elig_fn = is_eligible_lenient if lenient else is_eligible_strict | |
| for _, shift in df_shifts.iterrows(): | |
| eligible = df_employees[df_employees.apply(lambda r: elig_fn(r, shift), axis=1)] if not df_employees.empty else pd.DataFrame() | |
| # 3) AI decision (or showcase distribution) | |
| t_ai = time.perf_counter() | |
| if showcase: | |
| ranked = df_employees.copy() | |
| ranked["__score"] = ranked.apply(lambda r: _score_match(r, shift), axis=1) | |
| ranked = ranked.sort_values("__score", ascending=False).reset_index(drop=True) | |
| cand_assign = ranked.iloc[0] if len(ranked) > 0 else None | |
| cand_notify = ranked.iloc[1] if len(ranked) > 1 else None | |
| cand_skip = ranked.iloc[-1] if len(ranked) > 2 else None | |
| # add explicit "AI decision" event first | |
| ai_dur = (time.perf_counter() - t_ai) * 1000 | |
| events.append({ | |
| "title": "AI decision", | |
| "desc": "Showcase distribution Assign/Notify/Skip", | |
| "status": "done", | |
| "dur_ms": max(int(ai_dur), 1) | |
| }) | |
| if cand_assign is not None: | |
| name = cand_assign.get("fullName", "Candidate A") | |
| events.append({ | |
| "title": f"Assigned {name}", | |
| "desc": f"{name} β {shift.get('Department','')} ({shift.get('ShiftType','')}, {shift.get('ShiftDuration','')})", | |
| "status": "done" | |
| }) | |
| shift_assignment_results.append((name, shift["ShiftID"], "β Auto-Filled")) | |
| if cand_notify is not None: | |
| name = cand_notify.get("fullName", "Candidate B") | |
| events.append({ | |
| "title": f"Notify {name}", | |
| "desc": f"Send notification for {shift['ShiftID']}", | |
| "status": "done" | |
| }) | |
| shift_assignment_results.append((name, shift["ShiftID"], "π¨ Notify")) | |
| if cand_skip is not None: | |
| name = cand_skip.get("fullName", "Candidate C") | |
| events.append({ | |
| "title": "Skipped", | |
| "desc": f"No eligible employees or decision skipped (e.g., low match for {name})", | |
| "status": "done" | |
| }) | |
| shift_assignment_results.append((name, shift["ShiftID"], "β οΈ Skipped")) | |
| else: | |
| events.append({ | |
| "title": "AI decision", | |
| "desc": "Select assign / notify / skip", | |
| "status": "done" | |
| }) | |
| decision = gpt_decide(shift, eligible) | |
| ai_dur = (time.perf_counter() - t_ai) * 1000 | |
| events[-1]["dur_ms"] = max(int(ai_dur), 1) | |
| if decision.get("action") == "assign": | |
| emp = decision.get("employee") | |
| events.append({ | |
| "title": f"Assigned {emp}", | |
| "desc": f"{emp} β {shift.get('Department','')} ({shift.get('ShiftType','')}, {shift.get('ShiftDuration','')})", | |
| "status": "done" | |
| }) | |
| shift_assignment_results.append((emp, shift["ShiftID"], "β Auto-Filled")) | |
| elif decision.get("action") == "notify": | |
| emp = decision.get("employee") | |
| events.append({ | |
| "title": f"Notify {emp}", | |
| "desc": f"Send notification for {shift['ShiftID']}", | |
| "status": "done" | |
| }) | |
| shift_assignment_results.append((emp, shift["ShiftID"], "π¨ Notify")) | |
| else: | |
| events.append({ | |
| "title": "Skipped", | |
| "desc": "No eligible employees or decision skipped", | |
| "status": "done" | |
| }) | |
| shift_assignment_results.append(("β No eligible", shift["ShiftID"], "β οΈ Skipped")) | |
| # Generate reasoning rows | |
| for _, emp_row in df_employees.iterrows(): | |
| role_match = str(shift.get("RoleRequired","")).strip().lower() == emp_row.get("JobRole","").strip().lower() | |
| spec = str(shift.get("Specialty","")).replace("-", " ").strip().lower() | |
| text_all = (emp_row.get("JobRole","")+" "+emp_row.get("organizationPath","")+" "+" ".join(emp_row.get("Certifications",[]))).lower() | |
| spec_ok = (spec and ("icu" in text_all or spec.replace(" ", "") in text_all)) | |
| training_need = [t.strip().lower() for t in str(shift.get("MandatoryTraining","")).replace(",", "/").split("/") if t.strip()] | |
| training_ok_strict = all(t in [c.lower() for c in (emp_row.get("Certifications",[]) or [])] for t in training_need) | |
| training_ok_len = all(any(mtok in text_all for mtok in [t, t.replace(" certified","")]) for t in training_need) if training_need else True | |
| ward_ok = not str(shift.get("WardDepartment","")).strip() or str(shift.get("WardDepartment","")).lower() in (emp_row.get("organizationPath","").lower()) | |
| lang_req = str(shift.get("LanguageRequirement","")).strip().lower() | |
| lang_ok = (not lang_req) or (lang_req in [l.lower() for l in (emp_row.get("Languages") or [])]) | |
| if lenient: | |
| status = "β Eligible" if (role_match or spec_ok or training_ok_len) and ward_ok and lang_ok else "β Not Eligible" | |
| else: | |
| status = "β Eligible" if ((role_match or spec_ok) and training_ok_strict and ward_ok and lang_ok) else "β Not Eligible" | |
| reasoning_rows.append({ | |
| "Employee": emp_row.get("fullName",""), | |
| "ShiftID": shift["ShiftID"], | |
| "Eligible": status, | |
| "Reasoning": " | ".join([ | |
| "β Role" if role_match else "β Role", | |
| "β Specialty" if spec_ok else "β Specialty", | |
| "β Training" if (training_ok_len if lenient else training_ok_strict) else "β Training", | |
| "β Ward" if ward_ok else "β Ward", | |
| f"Lang: {shift.get('LanguageRequirement','') or 'β'}" + (" β " if lang_ok else " β") | |
| ]), | |
| "Certifications": ", ".join([str(x).strip() for x in (emp_row.get("Certifications", []) or []) if x]) | |
| }) | |
| eval_dur = (time.perf_counter() - t_eval) * 1000 | |
| for e in events: | |
| if e["title"] == "Evaluate open shifts": | |
| e["dur_ms"] = max(int(eval_dur), 1) | |
| break | |
| events.append({ | |
| "title": "Summary ready", | |
| "desc": "AI finished processing shifts", | |
| "status": "current" | |
| }) | |
| return events, shift_assignment_results, reasoning_rows | |
| # ============================================================================= | |
| # UI Components | |
| # ============================================================================= | |
| def render_hero(): | |
| st.markdown(""" | |
| <div class="hero-section"> | |
| <div class="hero-content"> | |
| <div class="hero-logo"> | |
| <img src="https://www.healthmatrixcorp.com/web/image/website/1/logo/Health%20Matrix?unique=956ad7b" alt="Health Matrix" /> | |
| </div> | |
| <h1 class="hero-title"> | |
| <span class="gradient-text">AI Command Center</span><br> | |
| <span class="subtitle">Smart Staffing & Actions</span> | |
| </h1> | |
| <p class="hero-description"> | |
| Transform healthcare delivery with AI-driven workforce management, predictive analytics, | |
| and intelligent automation that ensures compliance while optimizing patient care. | |
| </p> | |
| <div class="hero-stats"> | |
| <div class="stat-item"><div class="stat-value">99.9%</div><div class="stat-label">Compliance Rate</div></div> | |
| <div class="stat-item"><div class="stat-value">45%</div><div class="stat-label">Efficiency Gain</div></div> | |
| <div class="stat-item"><div class="stat-value"><2min</div><div class="stat-label">Response Time</div></div> | |
| </div> | |
| </div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| def render_kpi_cards(assigned: int, notified: int, skipped: int): | |
| st.markdown(f""" | |
| <div class="kpi-grid"> | |
| <div class="kpi-card kpi-assigned"> | |
| <div class="kpi-icon">β </div> | |
| <div class="kpi-value">{assigned}</div> | |
| <div class="kpi-label">Auto-Assigned</div> | |
| <div class="kpi-change">+15%</div> | |
| </div> | |
| <div class="kpi-card kpi-notify"> | |
| <div class="kpi-icon">π¨</div> | |
| <div class="kpi-value">{notified}</div> | |
| <div class="kpi-label">Notifications Sent</div> | |
| <div class="kpi-change">-8%</div> | |
| </div> | |
| <div class="kpi-card kpi-skipped"> | |
| <div class="kpi-icon">β οΈ</div> | |
| <div class="kpi-value">{skipped}</div> | |
| <div class="kpi-label">Skipped</div> | |
| <div class="kpi-change">-23%</div> | |
| </div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| def _timeline_html(events: List[Dict[str, Any]]) -> str: | |
| items = [] | |
| for event in events: | |
| status_class = event.get("status", "done") | |
| duration = event.get("dur_ms", 0) | |
| duration_text = f"{int(duration)} ms" if duration else "β" | |
| items.append(f""" | |
| <div class="timeline-item {status_class}"> | |
| <div class="timeline-marker"></div> | |
| <div class="timeline-content"> | |
| <div class="timeline-title">{event.get('title', '')}</div> | |
| <div class="timeline-meta">Duration: {duration_text}</div> | |
| <div class="timeline-desc">{event.get('desc', '')}</div> | |
| </div> | |
| </div>""") | |
| return f"""<div class="timeline-wrap"> | |
| <h3>π Agent Timeline</h3> | |
| <div class="timeline">{''.join(items)}</div> | |
| </div>""" | |
| def render_timeline(events: List[Dict[str, Any]]): | |
| html = f"""<!doctype html> | |
| <html><head><meta charset="utf-8"><style>{_EMBEDDED_CSS}</style></head> | |
| <body>{_timeline_html(events)}</body></html>""" | |
| # Use components.html to avoid any Markdown escaping | |
| components.html(html, height=520, scrolling=True) | |
| def render_employee_board(shift_results: List[tuple], reasoning_rows: List[Dict[str, Any]]): | |
| action_by_emp, shift_by_emp = {}, {} | |
| for emp, sid, status in shift_results: | |
| if emp and not str(emp).startswith("β"): | |
| action_by_emp[str(emp)] = status | |
| shift_by_emp[str(emp)] = str(sid) | |
| seen = set() | |
| employees = [] | |
| for r in reasoning_rows: | |
| emp = (r.get("Employee") or "").strip() | |
| if not emp or emp in seen: | |
| continue | |
| seen.add(emp) | |
| employees.append({ | |
| "name": emp, | |
| "shift": r.get("ShiftID", ""), | |
| "eligible": r.get("Eligible", ""), | |
| "reasoning": r.get("Reasoning", ""), | |
| "certs": r.get("Certifications", ""), | |
| "action": action_by_emp.get(emp, "") | |
| }) | |
| if not employees: | |
| st.info("No employee details available.") | |
| return | |
| lanes = {"assigned": [], "notify": [], "skipped": []} | |
| for emp in employees: | |
| action = (emp["action"] or "").lower() | |
| if "auto-filled" in action or "assign" in action: | |
| lane = "assigned" | |
| elif "notify" in action: | |
| lane = "notify" | |
| else: | |
| lane = "notify" if emp["eligible"].startswith("β ") else "skipped" | |
| lanes[lane].append(emp) | |
| st.markdown(f""" | |
| <div class="kanban-board"> | |
| <div class="kanban-lane"> | |
| <div class="lane-header assigned-header"><span class="lane-icon">β </span> | |
| <span class="lane-title">Assigned</span><span class="lane-count">{len(lanes["assigned"])}</span></div> | |
| <div class="lane-content"> | |
| """, unsafe_allow_html=True) | |
| for emp in lanes["assigned"]: | |
| render_employee_card(emp, "assigned") | |
| st.markdown(f""" | |
| </div> | |
| </div> | |
| <div class="kanban-lane"> | |
| <div class="lane-header notify-header"><span class="lane-icon">π¨</span> | |
| <span class="lane-title">Notify</span><span class="lane-count">{len(lanes["notify"])}</span></div> | |
| <div class="lane-content"> | |
| """, unsafe_allow_html=True) | |
| for emp in lanes["notify"]: | |
| render_employee_card(emp, "notify") | |
| st.markdown(f""" | |
| </div> | |
| </div> | |
| <div class="kanban-lane"> | |
| <div class="lane-header skipped-header"><span class="lane-icon">β οΈ</span> | |
| <span class="lane-title">Skipped</span><span class="lane-count">{len(lanes["skipped"])}</span></div> | |
| <div class="lane-content"> | |
| """, unsafe_allow_html=True) | |
| for emp in lanes["skipped"]: | |
| render_employee_card(emp, "skipped") | |
| st.markdown("</div></div></div>", unsafe_allow_html=True) | |
| def render_employee_card(emp: Dict[str, Any], lane: str): | |
| name = emp["name"] | |
| shift = emp["shift"] | |
| reasoning = emp["reasoning"] | |
| certs = emp["certs"] | |
| action = emp["action"] | |
| initials = "".join([w[0] for w in name.split()[:2]]).upper() if name else "?" | |
| checks = [x.strip() for x in str(reasoning or "").split("|") if x and x.strip()] | |
| badges_html = "" | |
| for check in checks: | |
| is_ok = "β " in check | |
| label = check.replace("β ", "").replace("β", "").strip() | |
| badge_class = "badge-ok" if is_ok else "badge-fail" | |
| badges_html += f'<div class="emp-badge {badge_class}"><span class="badge-dot"></span><span>{label}</span></div>' | |
| chip_text = action or ("β Eligible" if emp["eligible"].startswith("β ") else "β Not Eligible") | |
| chip_class = "chip-ok" if ("β " in chip_text or "Auto-Filled" in chip_text) else ("chip-warn" if "Notify" in chip_text else "chip-fail") | |
| st.markdown(f""" | |
| <div class="employee-card {lane}-card"> | |
| <div class="emp-header"> | |
| <div class="emp-avatar">{initials}</div> | |
| <div class="emp-info"> | |
| <div class="emp-name">{name}</div> | |
| <div class="emp-shift">Shift: {shift}</div> | |
| </div> | |
| <div class="emp-chip {chip_class}">{chip_text}</div> | |
| </div> | |
| <div class="emp-divider"></div> | |
| <div class="emp-badges"> | |
| {badges_html if badges_html else '<div class="emp-badge badge-info"><span class="badge-dot"></span><span>No reasoning available</span></div>'} | |
| </div> | |
| <div class="emp-footer"> | |
| <div class="emp-certs">πͺͺ Certifications: {certs or "β"}</div> | |
| <div class="emp-meta">Updated just now</div> | |
| </div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| def render_status_indicator(status: str, message: str): | |
| if status == "processing": | |
| st.markdown(f""" | |
| <div class="status-indicator processing"> | |
| <div class="status-dot pulsing"></div><span>{message}</span> | |
| </div>""", unsafe_allow_html=True) | |
| elif status == "completed": | |
| st.markdown(f""" | |
| <div class="status-indicator completed"> | |
| <div class="status-dot completed"></div><span>{message}</span> | |
| </div>""", unsafe_allow_html=True) | |
| else: | |
| st.markdown(f""" | |
| <div class="status-indicator ready"> | |
| <div class="status-dot ready"></div><span>{message}</span> | |
| </div>""", unsafe_allow_html=True) | |
| # ============================================================================= | |
| # Main Application | |
| # ============================================================================= | |
| def main(): | |
| setup_page() | |
| render_hero() | |
| st.markdown(""" | |
| <div class="business-case"> | |
| <h4>Business Case β Open Shifts Auto-Fulfillment</h4> | |
| <ul> | |
| <li>β Reduce time to fill critical shifts</li> | |
| <li>π Enforce mandatory certifications & policies</li> | |
| <li>π Transparent Agent Timeline (steps + durations)</li> | |
| </ul> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| with st.expander("π§ Demo / Showcase Controls", expanded=False): | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| show_showcase = st.checkbox("Ensure Assign + Notify + Skip (adds a demo nurse if needed)", value=True) | |
| lenient_mode = st.checkbox("Lenient eligibility (OR) for demo", value=True) | |
| with col2: | |
| st.caption("When these options are disabled, behavior returns to original strict logic (AND).") | |
| status_placeholder = st.empty() | |
| with status_placeholder.container(): | |
| render_status_indicator("ready", "Ready β Click 'Start AI Agent' to evaluate shifts") | |
| # Demo data | |
| employee_ids_default = [850, 825, 4503] | |
| shift_data = """ShiftID,Department,RoleRequired,Specialty,MandatoryTraining,ShiftType,ShiftDuration,WardDepartment,LanguageRequirement,ShiftTime | |
| S101,ICU,Registered Nurse,ICU-trained,BLS/ALS certified,Night,12h,A&E,Arabic,2025-06-04 07:00""" | |
| df_shifts_default = pd.read_csv(StringIO(shift_data)) | |
| st.markdown('<div class="cta-container">', unsafe_allow_html=True) | |
| if st.button("βΆοΈ Start AI Agent", key="start_agent", help="Let the AI handle the work"): | |
| with status_placeholder.container(): | |
| render_status_indicator("processing", "Processing β contacting UKG APIs and evaluating shifts...") | |
| try: | |
| events, shift_assignment_results, reasoning_rows = run_agent( | |
| employee_ids_default, | |
| df_shifts_default, | |
| showcase=show_showcase, | |
| lenient=lenient_mode | |
| ) | |
| with status_placeholder.container(): | |
| render_status_indicator("completed", "Completed β results below") | |
| assigned = sum(1 for e in shift_assignment_results if "Auto-Filled" in e[2]) | |
| notified = sum(1 for e in shift_assignment_results if "Notify" in e[2]) | |
| skipped = sum(1 for e in shift_assignment_results if "Skip" in e[2] or "Skipped" in e[2]) | |
| col1, col2 = st.columns([1, 2], gap="large") | |
| with col1: | |
| render_timeline(events) | |
| with col2: | |
| total_steps = len(events) | |
| total_ms = sum(int(e.get("dur_ms", 0) or 0) for e in events) | |
| st.markdown(f""" | |
| <div class="overview-metrics"> | |
| <div class="metric-chip">π§ <strong>Agent Steps:</strong> {total_steps}</div> | |
| <div class="metric-chip">β±οΈ <strong>Total Duration:</strong> {total_ms} ms</div> | |
| <div class="metric-chip">β <strong>Assigned:</strong> {assigned}</div> | |
| <div class="metric-chip">π¬ <strong>Notify:</strong> {notified}</div> | |
| <div class="metric-chip">β οΈ <strong>Skipped:</strong> {skipped}</div> | |
| </div>""", unsafe_allow_html=True) | |
| render_kpi_cards(assigned, notified, skipped) | |
| st.markdown("### π₯ Employee Results") | |
| render_employee_board(shift_assignment_results, reasoning_rows) | |
| with st.expander("Raw Summary (optional)", expanded=False): | |
| if shift_assignment_results: | |
| st.markdown("**Assignment Summary:**") | |
| st.dataframe(pd.DataFrame(shift_assignment_results, columns=["Employee", "ShiftID", "Status"]), use_container_width=True) | |
| if reasoning_rows: | |
| st.markdown("**Detailed Reasoning:**") | |
| st.dataframe(pd.DataFrame(reasoning_rows), use_container_width=True) | |
| except Exception as e: | |
| st.error(f"Unexpected error: {e}") | |
| with status_placeholder.container(): | |
| render_status_indicator("ready", "Error occurred β please try again") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| st.markdown(""" | |
| <div class="footer"> | |
| <hr/> | |
| <div class="footer-content"> | |
| Β© 2025 Health Matrix Corp β Empowering Digital Health Transformation Β· | |
| <a href="mailto:info@healthmatrixcorp.com">info@healthmatrixcorp.com</a> | |
| </div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| if __name__ == "__main__": | |
| main() | |