Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import streamlit as st | |
| from datetime import datetime | |
| import pandas as pd | |
| from io import StringIO | |
| from twilio.rest import Client | |
| from transformers import pipeline | |
| # ---- ENVIRONMENT CONFIGURATION ---- | |
| os.environ["HF_HOME"] = "/tmp" | |
| os.environ["TRANSFORMERS_CACHE"] = "/tmp" | |
| # ---- TWILIO CONFIGURATION ---- | |
| TWILIO_ACCOUNT_SID = os.environ.get('TWILIO_ACCOUNT_SID') | |
| TWILIO_AUTH_TOKEN = os.environ.get('TWILIO_AUTH_TOKEN') | |
| TWILIO_PHONE_NUMBER = os.environ.get('TWILIO_PHONE_NUMBER') | |
| employee_phone_numbers = { | |
| "E001": "+18777804236", | |
| "E002": "+18777804236", | |
| } | |
| client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN) | |
| # ---- GPT-2 PIPELINE ---- | |
| def load_gpt2(): | |
| return pipeline("text-generation", model="gpt2", model_kwargs={"cache_dir": "/tmp"}, max_length=512) | |
| gpt2_pipeline = load_gpt2() | |
| # ---- STREAMLIT UI SETUP ---- | |
| st.set_page_config(page_title="AI Shift Filler for Workforce Management") | |
| st.title("π₯ AI Shift Shortage Detection and Auto-Fulfillment") | |
| st.markdown(""" | |
| ### π· Workforce Management Use Case | |
| This AI Agent automatically: | |
| - π Detects staff shortages | |
| - π§ Identifies qualified, available employees | |
| - π© Sends SMS/email shift offers | |
| - β Fills the open shift upon acceptance | |
| """) | |
| # ---- WFM API CONFIG ---- | |
| WFM_AUTH_URL = "https://partnerdemo-019.cfn.mykronos.com/api/authentication/access_token" | |
| WFM_PROFILE_URL = "https://partnerdemo-019.cfn.mykronos.com/api/v1/commons/profiles/people_profiles" | |
| APP_KEY = "SARH2VSSdpOEbcryyGTxXCk23Wm7YVOv" | |
| CLIENT_ID = "KZJbuClBmM3bPk07s7cCfoNKZ6jJ3jLd" | |
| CLIENT_SECRET = "ODq3D9lIYzSg0HFo" | |
| USERNAME = "SeanIvan" | |
| PASSWORD = "P@$$w)rdHm2025" | |
| def get_access_token(): | |
| headers = { | |
| "appkey": APP_KEY, | |
| "Content-Type": "application/x-www-form-urlencoded" | |
| } | |
| data = { | |
| "grant_type": "password", | |
| "auth_chain": "OAuthLdapService", | |
| "client_id": CLIENT_ID, | |
| "client_secret": CLIENT_SECRET, | |
| "username": USERNAME, | |
| "password": PASSWORD | |
| } | |
| response = requests.post(WFM_AUTH_URL, headers=headers, data=data) | |
| response.raise_for_status() | |
| return response.json()["access_token"] | |
| def fetch_employee_profiles(token): | |
| headers = { | |
| "appkey": APP_KEY, | |
| "Authorization": token, | |
| "Content-Type": "application/json" | |
| } | |
| response = requests.get(WFM_PROFILE_URL, headers=headers) | |
| response.raise_for_status() | |
| return response.json() | |
| # ---- FETCH EMPLOYEE DATA FROM WFM ---- | |
| try: | |
| token = get_access_token() | |
| profiles = fetch_employee_profiles(token) | |
| records = [] | |
| for p in profiles.get("peopleProfiles", []): | |
| records.append({ | |
| "ID": p.get("id", "N/A"), | |
| "Name": f"{p.get('firstName', '')} {p.get('lastName', '')}".strip(), | |
| "Skills": p.get("customFields", {}).get("skills", "ICU").split(","), | |
| "Certifications": p.get("customFields", {}).get("certifications", "ACLS").split(","), | |
| "Available": p.get("employmentStatus", {}).get("status", "") == "Active", | |
| "OvertimeHours": float(p.get("customFields", {}).get("overtimeHours", 0)) | |
| }) | |
| df_employees = pd.DataFrame(records) | |
| except Exception as e: | |
| st.error(f"β Failed to fetch employee data: {str(e)}") | |
| st.stop() | |
| # ---- SHIFT DATA (HARDCODED FOR NOW) ---- | |
| shift_data = """ | |
| ShiftID,Department,RequiredSkill,RequiredCert,ShiftTime | |
| S101,ICU,ICU,ACLS,2025-06-04 07:00 | |
| """ | |
| df_shifts = pd.read_csv(StringIO(shift_data)) | |
| # ---- MATCHING LOGIC ---- | |
| def find_eligible_employees(shift, employees): | |
| return employees[ | |
| (employees['Skills'].apply(lambda s: shift['RequiredSkill'] in s)) & | |
| (employees['Certifications'].apply(lambda c: shift['RequiredCert'] in c)) & | |
| (employees['Available']) & | |
| (employees['OvertimeHours'] < 10) | |
| ] | |
| # ---- PROCESS EACH SHIFT ---- | |
| results = [] | |
| for _, shift in df_shifts.iterrows(): | |
| eligible = find_eligible_employees(shift, df_employees) | |
| if not eligible.empty: | |
| for _, emp in eligible.iterrows(): | |
| phone_number = employee_phone_numbers.get(emp['ID'], None) | |
| if phone_number: | |
| sms_body = ( | |
| f"Shift Alert! Dear {emp['Name']}, " | |
| f"a shift in {shift['Department']} at {shift['ShiftTime']} is available. " | |
| f"Please reply to accept or decline." | |
| ) | |
| try: | |
| client.messages.create( | |
| body=sms_body, | |
| from_=TWILIO_PHONE_NUMBER, | |
| to=phone_number | |
| ) | |
| results.append((emp['Name'], shift['ShiftID'], "SMS Sent")) | |
| except Exception as e: | |
| results.append((emp['Name'], shift['ShiftID'], f"Failed: {str(e)}")) | |
| else: | |
| results.append((emp['Name'], shift['ShiftID'], "No phone number")) | |
| else: | |
| results.append(("No eligible staff", shift['ShiftID'], "Shift Unfilled")) | |
| # ---- DISPLAY RESULTS ---- | |
| st.subheader("π¬ Shift Assignment Summary") | |
| result_df = pd.DataFrame(results, columns=["Employee", "ShiftID", "Status"]) | |
| st.dataframe(result_df) | |
| # ---- GPT-2 INSIGHT ---- | |
| st.subheader("π§ GPT-2 Insights on Staffing") | |
| if st.button("Generate Shift Fulfillment Recommendations"): | |
| shift_info = "" | |
| for _, row in df_shifts.iterrows(): | |
| shift_info += ( | |
| f"Shift {row['ShiftID']} in {row['Department']} at {row['ShiftTime']} requires {row['RequiredSkill']} with {row['RequiredCert']}.\n" | |
| ) | |
| gpt_prompt = f""" | |
| As an AI assistant, suggest strategies to handle shift shortages based on the following: | |
| {shift_info} | |
| Recommendations: | |
| """ | |
| with st.spinner("AI Agent (GPT-2) is analyzing..."): | |
| try: | |
| result = gpt2_pipeline(gpt_prompt)[0]['generated_text'] | |
| st.success("β GPT-2 Suggestion Ready") | |
| st.text_area("π GPT-2 Output", result, height=300) | |
| except Exception as e: | |
| st.error(f"Error using GPT-2: {str(e)}") | |