# === Imports ===
import gradio as gr
import pandas as pd
from datetime import datetime, time as dttime
import uuid
from zoneinfo import ZoneInfo
import tempfile
import os
import shutil
import json
from weasyprint import HTML, CSS
import threading
import time
import requests
import numpy as np
from collections import defaultdict

# ==================================================== Configuration ====================================================

# Directory where uploaded Excel files are stored
UPLOAD_DIR = "Uploads"
# File that stores upload timestamps for each uploaded file
UPLOAD_TIMES_FILE = os.path.join(UPLOAD_DIR, "upload_times.json")
# Timezone object for Chicago (used for all timestamp handling)
CHICAGO_TZ = ZoneInfo("America/Chicago")
# List of age groups used throughout the application for coverage checks
AGE_LST = ["Newborn-5mo", "6mo-9yo", "10-17yo", "18-20yo", "21-24yo", "25+yo"]
# List of recognized "off-duty" notes that indicate a provider is not working
OFF_LST = ['OFF', 'VACATION', 'FMLA', 'ADMIN', 'PAID_LEAVE', 'CME', 'TEACHING',
           'SICK', 'HOLIDAY', "CLINIC_CLOSED", "CLINIC_CLOSE"]
# Age groups considered "under 18"
UNDER_18G = ["Newborn-5mo", "6mo-9yo", "10-17yo"]
# Age groups considered "18 and over"
OVER_18G = ["18-20yo", "21-24yo", "25+yo"]
# List of all possible clinic locations that can be selected or displayed
AVAILABLE_LOCATIONS = ['Berwyn', 'Morgan', 'Western', "Urgent Care", 'Juarez',
                       'LVHS', 'Orozco', 'All Locations']
# Locations where age coverage checks are NOT performed (e.g., specialty or school-based sites)
NO_AGE_CHECK_LOCATIONS = ["Urgent Care", 'Juarez', 'LVHS', 'Orozco', 'Psych', "OB/Gynecology"]
# Locations where operational hour coverage checks are NOT performed
NO_OPERATION_CHECK_LOCATIONS = ["Urgent Care", 'Juarez', 'LVHS', 'Orozco', 'Psych', "OB/Gynecology"]
# Mapping from short/abbreviated location codes (as they appear in Excel) to full display names
LOCATION_MAP = {
    'B': 'Berwyn',
    'M': 'Morgan',
    'W': 'Western',
    "UC": "Urgent Care",
    'J': 'Juarez',
    'L': 'LVHS',
    'O': 'Orozco',
    'PSY/M': 'Psych',
    'PSY/B': 'Psych',
    "OB/B": "OB/Gynecology",
    "OB/M": "OB/Gynecology",
    "OB/W": "OB/Gynecology",
    "OB": "OB/Gynecology",
}

# =============================================== Page-height constants (A4) ===============================================

# Standard A4 page height in millimeters
PAGE_HEIGHT_MM = 297
# Top margin used in PDF generation (in mm)
TOP_MARGIN_MM = 1.5
# Bottom margin used in PDF generation (in mm)
BOTTOM_MARGIN_MM = 5
# Usable printable height after subtracting margins
PRINTABLE_HEIGHT_MM = PAGE_HEIGHT_MM - TOP_MARGIN_MM - BOTTOM_MARGIN_MM  # ~290 mm
# Approximate height of a single text line in the calendar layout (in mm)
LINE_HEIGHT_MM = 0.90  # 6.8 pt ~ 0.9 mm
# Estimated height of the day headers block (in mm) - measured empirically
DAY_HEADERS_HEIGHT_MM = 5.0

# ========================================== Shared CSS (Used in both HTML preview and PDF generation) ==========================================
# CSS styles shared between the HTML preview and the PDF output.
# Uses placeholders {{locations}}, {{start}}, {{end}}, {{time}} that are replaced at generation time.
# NOTE: the footer uses the CSS escape "\a" (newline) inside content; it must be
# written as "\\a" in this Python literal so the backslash survives into the CSS
# (a bare "\a" in Python is the BEL control character, which would not produce a
# line break even with white-space: pre-line).
SHARED_CSS = """
@page {
    size: A4;
    margin: 1.5mm 1.5mm 5mm 1.5mm;
    @top-center {
        font-size: 8pt;
        font-family: Arial, Helvetica, sans-serif;
        margin-top: 1.5mm;
    }
    @bottom-center {
        content: "Alivio Schedule for {{locations}} - {{start}} to {{end}}\\aLast Edited on {{time}}";
        font-size: 8pt;
        font-family: Arial, Helvetica, sans-serif;
        margin-bottom: 5mm;
        white-space: pre-line;
        line-height: 1.3;
    }
}
* { box-sizing: border-box; }
body { font-family: Arial, Helvetica, sans-serif; margin: 1mm; width: 210mm; background-color: #ffffff; font-size: 10pt; orphans: 4; widows: 4; }
.calendar { width: 210mm; padding: 0.5mm; background-color: #ffffff; margin-top: 5mm; margin-bottom: 15mm; }
.page-group { break-before: page; break-after: page; page-break-inside: avoid; }
.week-group { display: flex; flex-direction: column; gap: 1mm; margin-bottom: 2mm; }
.week, .day, .event, .warning, .location-section, .hours-table { break-inside: avoid !important; page-break-inside: avoid !important; }
.week { display: grid; grid-template-columns: 5mm repeat(6, 30mm); gap: 0.2mm; margin-bottom: 0.5mm; }
.day-headers { display: grid; grid-template-columns: 5mm repeat(6, 30mm); gap: 0.2mm; margin-bottom: 0.3mm; }
.week-number { font-weight: bold; text-align: center; background-color: #e0e0e0; padding: 0.5mm; border: 0.1mm solid #A6A09B; font-size: 8pt; line-height: 1.1; display: flex; align-items: center; justify-content: center; }
.day { border: 0.1mm solid #A6A09B; padding: 0.5mm 0.5mm 0.8mm 0.5mm; background-color: #f9f9f9; border-radius: 0.5mm; display: flex; flex-direction: column; align-items: flex-start; overflow-wrap: break-word; line-height: 1.1; font-size: 7pt; }
.day-header { border: 0.1mm solid #A6A09B; font-weight: bold; text-align: center; background-color: #e0e0e0; padding: 0.5mm; font-size: 8pt; width: 100%; line-height: 1.1; }
.event, .event-info { margin: 0.2mm 0; font-size: 6.8pt; line-height: 1.05; overflow-wrap: break-word; }
.warning, .overall-warning, .conflict-warning, .wen-conflict-warning, .operation-warning, .clinic-closed-warning { color: #d32f2f; font-weight: bold; margin: 0.2mm 0; padding: 0.4mm; border-radius: 0.5mm; font-size: 6.5pt; line-height: 1.05; border: 1px solid; }
.warning { background-color: #fff3cd; }
.overall-warning { background-color: #BBE3FC; }
.conflict-warning { background-color: #F6CFFF; }
.wen-conflict-warning { background-color: #fcd968; }
.operation-warning { background-color: #ffcccb; }
.clinic-closed-warning { background-color: #ff9999; font-size: 8pt; text-align: center; }
.holiday-message { color: #000; font-weight: bold; font-size: 7.5pt; text-align: center; background-color: #fa91de; padding: 0.4mm; border-radius: 0.5mm; line-height: 1.1; }
.location-section { margin: 0.2mm 0; padding: 0.4mm; border-left: 0.3mm solid #4682b4; font-size: 7.2pt; line-height: 1.05; }
.hours-table { width: 200mm; border-collapse: collapse; margin: 4mm 0 8mm; font-size: 6.8pt; }
.hours-table th, .hours-table td { border: 0.1mm solid #A6A09B; padding: 0.4mm; text-align: center; line-height: 1.1; }
.hours-table th { background-color: #e0e0e0; font-weight: bold; }
"""


# === Helper Functions ===

def get_time_string(row):
    """
    Convert start/end times (or special notes) into a human-readable string for display.

    Handles special cases like OB/Gynecology notes without times and recognized
    OFF states. Returns "OFF" when no usable information is present.
    """
    start_t = safe_time(row['Start_Time'])
    end_t = safe_time(row['End_Time'])
    # Special handling for OB/Gynecology rows that only have a note (no times)
    if (row['Location'] == "OB/Gynecology" and pd.isna(row['Start_Time'])
            and pd.isna(row['End_Time']) and pd.notna(row.get('Note'))):
        return row['Note'].strip()
    # If no times but a note that indicates off-duty, display the note (or standardized term)
    if start_t is None and end_t is None and pd.notna(row.get('Note')):
        note = str(row['Note']).strip().upper()
        if note in OFF_LST or note == 'SCHOOL CLOSED':
            # Recognized OFF notes are shown normalized; "SCHOOL CLOSED" keeps its original casing
            return row['Note'] if note not in OFF_LST else note
    # Normal case: both start and end times exist
    if start_t and end_t:
        time_str = f"{start_t.strftime('%H:%M')} - {end_t.strftime('%H:%M')}"
        if pd.notna(row.get('Note')) and row['Note'].strip():
            return f"{time_str} ({row['Note']})"
        return time_str
    # Fallback for any other situation
    return "OFF"


def determine_display_location(row):
    """
    Determine the location where a provider row should be displayed.

    If the note contains location-related keywords (e.g., UC, JUAREZ), override
    the original Location. Priority: Note keywords > original Location.
    """
    if pd.isna(row.get('Note')):
        return row['Location']
    # Normalize the note so keyword matching ignores spaces, dashes and slashes
    note_upper = str(row['Note']).upper().replace(' ', '').replace('-', '').replace('/', '')
    # Urgent Care related keywords -> force display at 'Urgent Care'
    if any(k in note_upper for k in ['UC', 'UC+1HT']):
        return 'Urgent Care'
    # Juarez related keywords -> force display at 'Juarez'
    if any(k in note_upper for k in ['JUAREZ', 'JUAREZ+1HT']):
        return 'Juarez'
    # Berwyn related keywords -> force display at 'Berwyn'
    if any(k in note_upper for k in ['BERWYN']):
        return 'Berwyn'
    # No overriding keyword found -> use the original (already mapped) location
    return row['Location']


def parse_date(val):
    """
    Robustly parse various date representations from Excel into a pandas Timestamp.

    Handles integer serial dates, strings in multiple formats, and already-parsed
    objects. Returns None when the value cannot be interpreted as a date.
    """
    if pd.isna(val) or val is None:
        return None
    try:
        if isinstance(val, (int, float)):
            # Excel serial date: days since 1899-12-30
            return pd.Timestamp('1899-12-30') + pd.Timedelta(days=val)
        elif isinstance(val, str):
            for fmt in ['%m/%d/%y', '%m/%d/%Y', '%Y-%m-%d']:
                try:
                    return pd.to_datetime(val, format=fmt)
                except ValueError:
                    continue
            return pd.to_datetime(val)
        elif isinstance(val, (pd.Timestamp, datetime)):
            return pd.to_datetime(val)
        return None
    except (ValueError, TypeError):
        return None


def parse_time(value):
    """
    Safely convert various time representations from Excel into a datetime.time.

    Handles strings, datetime objects, and Excel float fractions of a day.
    Includes special rounding fixes for common Excel floating-point inaccuracies.
    Returns None for empty/OFF/unparseable input.
    """
    if pd.isna(value) or value in ["", "OFF", "nan", "NaT"]:
        return None
    try:
        if isinstance(value, (datetime, dttime)):
            return value.time() if isinstance(value, datetime) else value
        # Try common string formats first
        for fmt in ['%H:%M:%S', '%H:%M']:
            try:
                return pd.to_datetime(value, format=fmt).time()
            except ValueError:
                continue
        # Handle Excel float (fraction of a day) -> convert to hours/minutes with rounding
        hours = float(value) * 24
        total_minutes = round(hours * 60)
        hour = total_minutes // 60
        minute = total_minutes % 60
        # Special corrections for known Excel floating-point quirks
        if 0.6666666666666666 <= value <= 0.6666666666666670:  # 16:00
            return dttime(16, 0)
        if 0.75 <= value <= 0.7500000000001:  # 18:00
            return dttime(18, 0)
        if 0.5208333333333333 <= value <= 0.5208333333333335:  # 12:30
            return dttime(12, 30)
        if 0.7291666666666666 <= value <= 0.7291666666666668:  # 17:30
            return dttime(17, 30)
        if 0.8333333333333333 <= value <= 0.8333333333333335:  # 20:00
            return dttime(20, 0)
        if 0.625 <= value <= 0.6250000000001:  # 15:00
            return dttime(15, 0)
        if 0 <= hour < 24 and 0 <= minute < 60:
            return dttime(int(hour), int(minute))
        return None
    except (ValueError, TypeError):
        return None


def get_clinic_hours(location, weekday):
    """
    Return standard clinic operating hours and lunch breaks for a location/weekday.

    Returns (open_time, close_time, list_of_break_intervals).
    Weekday is 0=Monday ... 6=Sunday. Returns (None, None, []) when the clinic
    has no standard hours for that day.
    """
    if location in ['Berwyn', 'Morgan', 'Western']:
        if weekday in [0, 1, 3, 4]:  # Mon, Tue, Thu, Fri
            return dttime(8, 30), dttime(17, 30), [(dttime(12, 30), dttime(13, 30))]
        elif weekday == 2:  # Wednesday
            return dttime(13, 0), dttime(20, 0), [(dttime(16, 0), dttime(17, 0))]
        elif weekday == 5:  # Saturday
            return dttime(8, 30), dttime(15, 0), [(dttime(11, 30), dttime(12, 0))]
    elif location == 'Urgent Care':
        if weekday in [0, 1, 3, 4, 2]:
            return dttime(9, 0), dttime(18, 0), [(dttime(13, 0), dttime(14, 0))]
        elif weekday == 5:
            return dttime(9, 0), dttime(13, 30), []
    elif location == 'Juarez':
        if weekday in [0, 1, 2, 3, 4]:
            return dttime(8, 30), dttime(16, 0), [(dttime(13, 0), dttime(14, 0))]
    elif location == 'Orozco':
        if weekday in [0, 1, 2, 3, 4]:
            return dttime(8, 0), dttime(16, 30), []
    elif location == 'LVHS':
        if weekday in [0, 1, 2, 3]:
            return dttime(8, 30), dttime(16, 0), [(dttime(12, 0), dttime(13, 0))]
        elif weekday == 4:
            return dttime(12, 0), dttime(13, 0), []
    # Default/fallback
    return None, None, []


def is_clinic_closed(providers_df, date, location):
    """
    Return True if the clinic is closed on the given date/location.

    Closed when:
    - Any provider has a note containing "CLINIC_CLOSE" (case-insensitive)
    - OR all providers at this location have no working hours (OFF note or missing times)
    - OR no providers are scheduled at all.
    """
    df = providers_df[
        (providers_df['Date'] == date) &
        (providers_df['Display_Location'] == location)
    ].copy()
    if df.empty:
        return True  # no providers scheduled -> closed
    # Check for explicit CLINIC_CLOSE note
    notes = df['Note'].dropna().astype(str).str.upper()
    if notes.str.contains('CLINIC_CLOSE').any():
        return True
    # Check if any provider is actually working (has start/end times and not OFF)
    df['start_t'] = df['Start_Time'].apply(safe_time)
    df['end_t'] = df['End_Time'].apply(safe_time)
    working = df[
        df['start_t'].notna() & df['end_t'].notna() &
        (~df['Note'].str.upper().fillna('').isin(OFF_LST))
    ]
    return working.empty  # closed if no one is working


# === Height estimator (content-aware) ===

def estimate_week_height(week_content, display_locs):
    """
    Estimate the vertical height (in mm) needed for a week block.

    Used for intelligent page breaking to avoid splitting weeks across pages.
    NOTE(review): the HTML tag literals counted below were stripped from the
    original source by whitespace/markup mangling and have been reconstructed —
    confirm '<br' / '<div' are the markers the generated markup actually uses.
    """
    max_lines = 0
    for day_html in week_content:
        # Drop the opening-tag portion; keep only the day's inner content
        content = day_html.split(' ', 1)[1] if ' ' in day_html else day_html
        # Better estimate using tags: each tag opens roughly one rendered text line
        lines = content.count('<br') + content.count('<div') + 1
        if lines > max_lines:
            max_lines = lines
    base_height = 4.0  # per-week fixed overhead (borders/padding), in mm
    return base_height + max_lines * LINE_HEIGHT_MM


# === File Handling ===

# Ensure the upload directory exists and is actually a directory
if os.path.exists(UPLOAD_DIR) and not os.path.isdir(UPLOAD_DIR):
    raise FileExistsError(f"'{UPLOAD_DIR}' exists as a file. Please remove it.")
os.makedirs(UPLOAD_DIR, exist_ok=True)


def save_files(file_list):
    """
    Save uploaded Excel files to the upload directory and record their upload
    timestamps. Accepts a single file or a list of files.
    """
    if not file_list:
        return update_file_display()
    if not isinstance(file_list, list):
        file_list = [file_list]
    upload_times = {}
    if os.path.exists(UPLOAD_TIMES_FILE):
        with open(UPLOAD_TIMES_FILE, 'r') as f:
            upload_times = json.load(f)
    for file in file_list:
        if file and file.name.endswith(('.xlsx', '.xls')):
            filename = os.path.basename(file.name)
            dest_path = os.path.join(UPLOAD_DIR, filename)
            shutil.copy(file.name, dest_path)
            upload_times[filename] = datetime.now(CHICAGO_TZ).isoformat()
    with open(UPLOAD_TIMES_FILE, 'w') as f:
        json.dump(upload_times, f, indent=2)
    return update_file_display()


def update_file_display():
    """
    Refresh the list of uploaded files, their paths, and display upload timestamps.
    Used to update the Gradio file gallery and dropdown after upload/delete.
    """
    files = sorted([f for f in os.listdir(UPLOAD_DIR) if f.endswith(('.xlsx', '.xls'))])
    file_paths = [os.path.join(UPLOAD_DIR, f) for f in files]
    upload_times = {}
    if os.path.exists(UPLOAD_TIMES_FILE):
        with open(UPLOAD_TIMES_FILE, 'r') as f:
            upload_times = json.load(f)
    file_times = []
    for f in files:
        if f in upload_times:
            t = datetime.fromisoformat(upload_times[f])
            file_times.append(f"{f}: Uploaded on {t.strftime('%Y-%m-%d %I:%M %p CDT')}")
        else:
            file_times.append(f"{f}: Upload time unknown")
    return file_paths, gr.update(choices=files or [], value=None), "\n".join(file_times) if file_times else "No files uploaded."
def delete_file(filename): """ Delete a selected uploaded file and remove its timestamp record. """ if filename: path = os.path.join(UPLOAD_DIR, filename) if os.path.exists(path): os.remove(path) if os.path.exists(UPLOAD_TIMES_FILE): with open(UPLOAD_TIMES_FILE, 'r') as f: times = json.load(f) times.pop(filename, None) with open(UPLOAD_TIMES_FILE, 'w') as f: json.dump(times, f, indent=2) return update_file_display() # === Validation === def validate_excel_file(file_path, expected_columns): """ Parse and validate a schedule Excel file (provider or MA). Extracts weekly data blocks, normalizes dates/times/locations, and adds Display_Location column. Returns cleaned DataFrame or (None, error_message). """ try: all_sheets = pd.read_excel(file_path, engine='openpyxl', sheet_name=None) if not all_sheets: return None, "No sheets found!" combined_dfs = [] for sheet_name, df in all_sheets.items(): if not all(col in df.columns for col in ['Name', 'Location']): continue if not any(col.startswith('Start_Time') for col in df.columns): continue num_days = sum(1 for col in df.columns if col.startswith('Start_Time')) week_rows = df[df['Name'].str.startswith('Week', na=False)].index.tolist() if not week_rows: continue for week_idx in week_rows: dates = [] for day in range(1, num_days + 1): col = f'Start_Time{day}' if col in df.columns: dates.append(parse_date(df.at[week_idx, col])) else: dates.append(None) next_week = next((idx for idx in week_rows if idx > week_idx), len(df)) provider_df = df.loc[week_idx + 1: next_week - 1] provider_df = provider_df[~provider_df['Name'].eq('Name')] if provider_df.empty: continue temp_dfs = [] for day in range(1, num_days + 1): if day > len(dates) or dates[day - 1] is None: continue start_col = f'Start_Time{day}' end_col = f'End_Time{day}' note_col = f'Note{day}' if start_col not in df.columns or end_col not in df.columns: continue temp = provider_df[['Name', 'Location', start_col, end_col, note_col]].copy() temp['Date'] = dates[day - 1] temp = 
temp.rename(columns={start_col: 'Start_Time', end_col: 'End_Time', note_col: 'Note'}) temp_dfs.append(temp) if temp_dfs: sheet_df = pd.concat(temp_dfs, ignore_index=True) sheet_df = sheet_df.dropna(subset=['Name']) sheet_df['Location'] = sheet_df['Location'].map(lambda x: LOCATION_MAP.get(x, x) if pd.notna(x) else x) # Add column for display location (may be overridden by note keywords) sheet_df['Display_Location'] = sheet_df.apply(determine_display_location, axis=1) combined_dfs.append(sheet_df) if not combined_dfs: return None, "No valid data found!" final_df = pd.concat(combined_dfs, ignore_index=True) final_df = final_df.drop_duplicates().dropna(subset=["Date"]) final_df = final_df[final_df["Location"] != "Location"] final_df = final_df[final_df["Name"] != "Name"] final_df["Date"] = pd.to_datetime(final_df["Date"]) return final_df, None except Exception as e: return None, f"Error: {str(e)}!" def validate_provider_info(file_path): """ Validate the Provider Information Excel file. Checks required columns and ensures age coverage flags are 0/1. Also maps short location codes to full names. """ try: df = pd.read_excel(file_path, engine='openpyxl') expected = ["Provider", "Last_Name", "Location"] + AGE_LST if not all(col in df.columns for col in expected): return None, f"Missing columns: {expected}" df['Location'] = df['Location'].map(lambda x: LOCATION_MAP.get(x, x) if pd.notna(x) else x) for col in AGE_LST: if not df[col].isin([0, 1]).all(): return None, f"Column {col} must contain only 0 or 1 values!" return df, None except Exception as e: return None, f"Error: {str(e)}!" def safe_time(val): """ Safely convert a value to datetime.time for display and checks. Falls back to parse_time() which includes extensive error handling. 
""" if pd.isna(val): return None if isinstance(val, dttime): return val return parse_time(val) # === Core Logic === def check_age_coverage(providers_df, provider_info_df, location, date): """ Check which age groups are missing coverage at a specific location on a specific date. Also identifies providers with full, under-18, over-18, or 25+-only coverage. Returns (missing_age_groups, full_coverage_providers, under18_providers, over18_providers, only25_providers) """ df = providers_df[ (providers_df['Date'] == date) & (providers_df['Display_Location'] == location) ].copy() df['start_t'] = df['Start_Time'].apply(safe_time) df['end_t'] = df['End_Time'].apply(safe_time) providers_on_date = df[ df['start_t'].notna() & df['end_t'].notna() & (~df['Note'].str.upper().fillna('').isin(OFF_LST)) ] if providers_on_date.empty or provider_info_df.empty: return AGE_LST, [], [], [], [] working = providers_on_date['Name'].unique() info = provider_info_df[ (provider_info_df['Location'] == location) & (provider_info_df['Provider'].isin(working)) ] missing = [age for age in AGE_LST if not any(info[age] == 1)] full, under, over, only25 = [], [], [], [] for p in working: row = info[info['Provider'] == p] if row.empty: continue r = row.iloc[0] if all(r[age] == 1 for age in AGE_LST): full.append(p) elif all(r[age] == 1 for age in UNDER_18G): under.append(p) elif all(r[age] == 1 for age in OVER_18G): over.append(p) elif r["25+yo"] == 1 and all(r[age] == 0 for age in AGE_LST[:5]): only25.append(p) return missing, full, under, over, only25 def check_overall_age_coverage(providers_df, provider_info_df, date, locations): """ Check age coverage across multiple main locations (Berwyn/Morgan/Western) on a single date. Returns missing age groups and a flag (here always False as it's only used for missing list). 
""" check_locs = [loc for loc in locations if loc not in NO_AGE_CHECK_LOCATIONS] if not check_locs: return [], False df = providers_df[ (providers_df['Date'] == date) & (providers_df['Display_Location'].isin(check_locs)) ].copy() df['start_t'] = df['Start_Time'].apply(safe_time) df['end_t'] = df['End_Time'].apply(safe_time) df = df[ df['start_t'].notna() & df['end_t'].notna() & (~df['Note'].str.upper().fillna('').isin(OFF_LST)) ] if df.empty or provider_info_df.empty: return AGE_LST, False working = df['Name'].unique() info = provider_info_df[provider_info_df['Provider'].isin(working)] missing = [age for age in AGE_LST if not any(info[age] == 1)] return missing, False def check_provider_location_conflicts(providers_df, date, locations): """ Detect providers scheduled at multiple locations on the same date. Special handling for provider 'DFW' who is allowed at Morgan + Urgent Care. Returns list of conflict tuples. """ df = providers_df[ (providers_df['Date'] == date) & (providers_df['Display_Location'].isin(locations)) ].copy() df['start_t'] = df['Start_Time'].apply(safe_time) df['end_t'] = df['End_Time'].apply(safe_time) df = df[ df['start_t'].notna() & df['end_t'].notna() & (~df['Note'].str.upper().fillna('').isin(OFF_LST)) ] if df.empty: return [] conflicts = [] for provider, loc_count in df.groupby('Name')['Display_Location'].nunique().items(): if loc_count > 1: loc_list = df[df['Name'] == provider]['Display_Location'].unique().tolist() if provider == 'DFW' and set(loc_list) >= {'Morgan', 'Urgent Care'}: conflicts.append((provider, loc_list, 'wen-conflict-warning', 'Provider Wen at both Morgan and Urgent Care!')) else: conflicts.append((provider, loc_list, 'conflict-warning', f'Provider {provider} at: {", ".join(loc_list)}')) return conflicts def check_operation_time_coverage(providers_df, date, location): """ Check if clinic operating hours are fully covered by working providers at a location on a date. 
Identifies uncovered time gaps (excluding scheduled breaks). Returns list of gap strings (e.g., "08:30 - 09:15"). """ weekday = date.weekday() clinic_start, clinic_end, break_times = get_clinic_hours(location, weekday) if clinic_start is None: return [] df = providers_df[ (providers_df['Date'] == date) & (providers_df['Display_Location'] == location) ].copy() df['start_t'] = df['Start_Time'].apply(safe_time) df['end_t'] = df['End_Time'].apply(safe_time) df = df[ df['start_t'].notna() & df['end_t'].notna() & (~df['Note'].str.upper().fillna('').isin(OFF_LST)) ] if df.empty: # No providers → all time after handling breaks is uncovered gaps = [] current = clinic_start for bs, be in break_times: if current < bs: gaps.append(f"{current.strftime('%H:%M')} - {bs.strftime('%H:%M')}") current = max(current, be) if current < clinic_end: gaps.append(f"{current.strftime('%H:%M')} - {clinic_end.strftime('%H:%M')}") return gaps # Merge overlapping provider intervals intervals = [(r['start_t'], r['end_t']) for _, r in df.iterrows()] intervals.sort() merged = [] cs, ce = intervals[0] for s, e in intervals[1:]: if s <= ce: ce = max(ce, e) else: merged.append((cs, ce)) cs, ce = s, e merged.append((cs, ce)) # Remove scheduled break periods from coverage operational = [] for s, e in merged: curr = s for bs, be in break_times: if curr < be and e > bs: if curr < bs: operational.append((curr, min(bs, e))) curr = max(curr, be) if curr < e: operational.append((curr, e)) # Find remaining gaps gaps = [] current = clinic_start for bs, be in break_times: for s, e in sorted(operational): if s > current and current < bs: gaps.append(f"{current.strftime('%H:%M')} - {min(s, bs).strftime('%H:%M')}") current = max(current, e) current = max(current, be) if current < clinic_end: for s, e in sorted(operational): if s > current and current < clinic_end: gaps.append(f"{current.strftime('%H:%M')} - {min(s, clinic_end).strftime('%H:%M')}") current = max(current, e) if current < clinic_end: 
gaps.append(f"{current.strftime('%H:%M')} - {clinic_end.strftime('%H:%M')}") return gaps def calculate_weekly_hours(providers_df, provider_info_df, start_date, end_date, locations): """ Calculate clinical hours per provider per location per week within the date range. Handles lunch break deductions based on location and special note keywords. Returns two dicts: weekly hours by location and weekly total hours. """ df = providers_df[ (providers_df['Date'] >= start_date) & (providers_df['Date'] <= end_date) & (providers_df['Display_Location'].isin(locations)) ].copy() if df.empty: return {}, {} # Precompute time floats and note flags vectorized def to_float(val): if pd.isna(val): return np.nan if isinstance(val, dttime): return (val.hour + val.minute / 60.0) / 24.0 try: return float(val) except: return np.nan start_f = df['Start_Time'].apply(to_float) end_f = df['End_Time'].apply(to_float) raw_hours = (end_f - start_f) * 24.0 valid = (raw_hours > 0) & start_f.notna() & end_f.notna() df = df[valid].reset_index(drop=True) if df.empty: return {}, {} raw_hours = raw_hours[valid].values start_hour = start_f[valid].values * 24 end_hour = end_f[valid].values * 24 note_upper = df['Note'].fillna('').astype(str).str.upper().str.replace(' ', '').str.replace('-', '').str.replace('/', '') off_mask = note_upper.isin(OFF_LST) | note_upper.str.contains('|'.join(['VACATION', 'FMLA', 'ADMIN', 'PAID_LEAVE', 'CME', 'TEACHING', 'SICK', 'HOLIDAY'])) df = df[~off_mask].reset_index(drop=True) if df.empty: return {}, {} raw_hours = raw_hours[~off_mask.values] start_hour = start_hour[~off_mask.values] end_hour = end_hour[~off_mask.values] has_uc = note_upper[~off_mask].str.contains('UC|UC\+1HT|UC\+1H|UC1HT') has_juarez = note_upper[~off_mask].str.contains('JUAREZ|JUREZ|JUAREZ\+1HT|JUAREZ\+1H|JUREZ\+1HT|JUAREZ1HT') has_no_lun = note_upper[~off_mask].str.contains('NOLUN|NO_LUN') has_30m_lun = note_upper[~off_mask].str.contains('30M_LUN|30MLUN|30MLUNCH|30M_LUNCH') df['week_start'] = df['Date'] - 
pd.to_timedelta(df['Date'].dt.weekday, unit='D') df['week_end'] = df['week_start'] + pd.Timedelta(days=5) df['week_key'] = 'Week of ' + df['week_start'].dt.strftime('%m/%d/%Y') + ' - ' + df['week_end'].dt.strftime('%m/%d/%Y') df['weekday'] = df['Date'].dt.weekday # Define break arrays (start/end in decimal hours) bmw_breaks = { 0: [(12.5, 13.5)], 1: [(12.5, 13.5)], 2: [(16.0, 17.0)], 3: [(12.5, 13.5)], 4: [(12.5, 13.5)], 5: [(11.5, 12.0)], } uc_breaks = { 0: [(13.0, 14.0)], 1: [(13.0, 14.0)], 2: [(13.0, 14.0)], 3: [(13.0, 14.0)], 4: [(13.0, 14.0)], 5: [], } juarez_base = { 0: [(13.0, 14.0)], 1: [(13.0, 14.0)], 2: [(13.0, 14.0)], 3: [(13.0, 14.0)], 4: [(13.0, 14.0)], } clinical_hours = raw_hours.copy() # Apply deductions vectorized for i in range(len(df)): weekday = df.at[i, 'weekday'] if has_uc.iloc[i]: breaks = uc_breaks.get(weekday, []) elif has_juarez.iloc[i]: breaks = juarez_base.get(weekday, []).copy() if end_hour[i] > 17.5: breaks.append((17.5, 18.0)) elif has_30m_lun.iloc[i]: clinical_hours[i] -= 0.5 continue elif has_no_lun.iloc[i]: continue else: breaks = bmw_breaks.get(weekday, []) if raw_hours[i] < 5.0: continue for b_start, b_end in breaks: overlap = min(end_hour[i], b_end) - max(start_hour[i], b_start) if overlap > 0: clinical_hours[i] -= overlap clinical_hours = np.round(clinical_hours, 2) df['clinical_hours'] = clinical_hours # Aggregate loc_agg = df.groupby(['week_key', 'Display_Location', 'Name'])['clinical_hours'].sum().round(2) total_agg = df.groupby(['week_key', 'Name'])['clinical_hours'].sum().round(2) weekly_loc_hours = defaultdict(lambda: defaultdict(lambda: defaultdict(float))) for (week, loc, prov), hrs in loc_agg.items(): weekly_loc_hours[week][loc][prov] = hrs weekly_totals = defaultdict(lambda: defaultdict(float)) for (week, prov), hrs in total_agg.items(): weekly_totals[week][prov] = hrs return dict(weekly_loc_hours), dict(weekly_totals) # === Main Schedule Generator === def combine_schedules(provider_info_file, provider_files, 
ma_files, start_date, end_date, check_age_coverage_flag, check_location_conflicts_flag, check_operation_coverage_flag, check_ma_mismatch_flag, show_weekly_hours, selected_locations): """ Core function that processes uploaded files, performs all checks, and generates HTML/PDF schedule. Returns (html_string, html_file_path, pdf_file_path) or error message. """ # Save any newly uploaded files save_files([provider_info_file] if provider_info_file else []) save_files(provider_files or []) save_files(ma_files or []) # Detect uploaded file paths by filename patterns provider_info_path = ma_paths = provider_paths = None for f in os.listdir(UPLOAD_DIR): path = os.path.join(UPLOAD_DIR, f) if not f.endswith(('.xlsx', '.xls')): continue if "provider_info" in f.lower(): provider_info_path = path elif "ma" in f.lower(): ma_paths = ma_paths or [] ma_paths.append(path) else: provider_paths = provider_paths or [] provider_paths.append(path) # Basic validation of required files if not provider_paths: return "

No Provider Schedule files!

", None, None if not provider_info_path: return "

Provider Info required!

", None, None if check_ma_mismatch_flag and not ma_paths: return "

MA files required for mismatch check!

", None, None # Validate and load provider info info_df, err = validate_provider_info(provider_info_path) if err: return f"

{err}

", None, None # Validate and load all provider schedule files prov_dfs = [] for p in provider_paths: df, err = validate_excel_file(p, ['Name', 'Location']) if err: return f"

{err}

", None, None if df is not None: prov_dfs.append(df) if not prov_dfs: return "

No valid provider data!

", None, None providers_df = pd.concat(prov_dfs, ignore_index=True).drop_duplicates() # Load MA schedules if mismatch check is enabled ma_df = pd.DataFrame() if check_ma_mismatch_flag: ma_dfs = [] for p in ma_paths: df, err = validate_excel_file(p, ['Name', 'Location']) if err: return f"

{err}

", None, None if df is not None: ma_dfs.append(df) if ma_dfs: ma_df = pd.concat(ma_dfs, ignore_index=True).drop_duplicates() ma_df['Display_Location'] = ma_df.apply(determine_display_location, axis=1) # Determine which locations to display all_locs = set(providers_df['Display_Location'].unique()) specific_locs = [loc for loc in selected_locations if loc != 'All Locations'] display_locs = {loc for loc in (specific_locs or all_locs) if loc in all_locs} if not display_locs: return "

No valid locations selected!

", None, None # Parse and validate date range try: start_obj = pd.to_datetime(start_date.strip(), format='%m/%d/%y') end_obj = pd.to_datetime(end_date.strip(), format='%m/%d/%y') if start_obj > end_obj: return "

Start date is after end date!

", None, None dates = providers_df['Date'].unique() if dates.size: dates = pd.to_datetime(dates) start_obj = max(start_obj, dates.min()) end_obj = min(end_obj, dates.max()) except: return "

Invalid date format! Use MM/DD/YY

", None, None # Determine whether multi-file checks should be performed bmw_locs = [loc for loc in display_locs if loc in ['Berwyn', 'Morgan', 'Western']] perform_overall = check_age_coverage_flag and len(prov_dfs) > 1 and len(bmw_locs) > 1 perform_conflict = check_location_conflicts_flag and len(prov_dfs) > 1 and len(display_locs) > 1 # Prepare header/footer values loc_str = ", ".join(sorted(display_locs)) gen_time = datetime.now(CHICAGO_TZ).strftime('%I:%M %p CDT, %B %d, %Y') # Begin HTML construction html = f"""Alivio Schedule
""" current = start_obj week_num = 1 page_content = [] week_content = [] current_page_height = 0 # Static day headers (Mon-Sat) — now included on EVERY page day_headers = """
Monday
Tuesday
Wednesday
Thursday
Friday
Saturday
""" # Add placeholder empty days before the first Monday first_monday = start_obj - pd.Timedelta(days=start_obj.weekday()) placeholder_days = (start_obj - first_monday).days for _ in range(placeholder_days): week_content.append('
') # Main loop: iterate through each day in the range while current <= end_obj: if current.weekday() == 6: # Skip Sundays current += pd.Timedelta(days=1) continue # When reaching a new Monday and a week is complete, finalize the week block if current.weekday() == 0 and len(week_content) > 0: finished_week = f'
{week_num}
{"".join(week_content)}
' week_height = estimate_week_height(week_content, display_locs) # Add day headers height only if this is the first week on the current page extra_header = DAY_HEADERS_HEIGHT_MM if len(page_content) == 0 else 0 needed = week_height + extra_header + 1 # +1 mm buffer between weeks # Page break logic if current_page_height + needed > PRINTABLE_HEIGHT_MM: # Close current page (always include headers) html += f'
{day_headers}{"".join(page_content)}
' page_content = [] current_page_height = 0 page_content.append(finished_week) current_page_height += needed week_content = [] week_num += 1 # Build the HTML for the current day day_html = f'
{current.strftime("%m/%d")}' # Global conflict warnings if perform_conflict: for p, locs, cls, msg in check_provider_location_conflicts(providers_df, current, display_locs): day_html += f'
{msg}
' # Check for CLINIC_CLOSE note anywhere has_clinic_close_anywhere = False if 'Note' in providers_df.columns: day_notes = providers_df[providers_df['Date'] == current]['Note'].dropna().astype(str).str.upper() if day_notes.str.contains('CLINIC_CLOSE').any(): has_clinic_close_anywhere = True # Overall age coverage if perform_overall and not has_clinic_close_anywhere: missing, _ = check_overall_age_coverage(providers_df, info_df, current, display_locs) if missing: day_html += f'
Missing: {", ".join(missing)}
' # Per-location details for loc in sorted(display_locs): loc_df = providers_df[(providers_df['Date'] == current) & (providers_df['Display_Location'] == loc)] loc_info = info_df[info_df['Location'] == loc] # Detect holiday or school-closed is_holiday = is_school = False if not loc_df.empty: notes = loc_df['Note'].dropna().str.strip().str.upper().tolist() if notes and all(n == 'HOLIDAY' for n in notes): is_holiday = True elif notes and all(n == 'SCHOOL CLOSED' for n in notes) and loc in NO_AGE_CHECK_LOCATIONS: is_school = True loc_df = loc_df[~((loc_df['Start_Time'].isna()) & (loc_df['End_Time'].isna()) & (loc_df['Note'].isna() | (loc_df['Note'] == '')))] if not loc_df.empty or is_holiday or is_school: day_html += f'
{loc} ' if is_holiday: day_html += '
Holiday! Clinic Closed!
' elif is_school: day_html += '
School Closed!
' else: if is_clinic_closed(providers_df, current, loc): day_html += '
Clinic Closed!
' else: day_html += '
Providers:
' missing, full, under, over, only25 = check_age_coverage(providers_df, info_df, loc, current) for _, r in loc_df.iterrows(): info_row = loc_info[loc_info['Provider'] == r['Name']] name = info_row['Last_Name'].iloc[0] if not info_row.empty else r['Name'] tstr = get_time_string(r) if r['Name'] in full: color = "#ff6347" elif r['Name'] in under: color = "#008000" elif r['Name'] in over: color = "#0000ff" elif r['Name'] in only25: color = "#8E44AD" else: color = "#000000" style = f"font-size:6.8pt;margin:0.2mm;line-height:1.05;color:{color};" if tstr in OFF_LST: style += "text-decoration:line-through;" day_html += f'{name}: {tstr}
' day_html += '
' if check_operation_coverage_flag and loc not in NO_OPERATION_CHECK_LOCATIONS: gaps = check_operation_time_coverage(providers_df, current, loc) if gaps: day_html += f'
Missing: {", ".join(gaps)}
' if check_age_coverage_flag and missing and loc not in NO_AGE_CHECK_LOCATIONS: day_html += f'
Missing: {", ".join(missing)}
' if check_ma_mismatch_flag: ma_loc_df = ma_df[(ma_df['Date'] == current) & (ma_df['Display_Location'] == loc)] ma_loc_df = ma_loc_df[~((ma_loc_df['Start_Time'].isna()) & (ma_loc_df['End_Time'].isna()) & (ma_loc_df['Note'].isna() | (ma_loc_df['Note'] == '')))] if not ma_loc_df.empty: day_html += '
MAs: ' for _, r in ma_loc_df.iterrows(): tstr = get_time_string(r) style = "font-size:6.8pt;margin:0.2mm;line-height:1.05;color:#000;" if tstr in OFF_LST: style += "text-decoration:line-through;" day_html += f'{r["Name"]}: {tstr} ' day_html += '
' prov_count = len(loc_df[loc_df['Start_Time'].notna() & ~loc_df['Note'].str.upper().fillna('').isin(OFF_LST)]) ma_count = len(ma_loc_df[ma_loc_df['Start_Time'].notna() & ~ma_loc_df['Note'].str.upper().fillna('').isin(OFF_LST)]) if not (ma_count == prov_count or ma_count == prov_count + 1): day_html += f'
MA Mismatch: {ma_count} MAs for {prov_count} Providers
' day_html += '
' day_html += '
' week_content.append(day_html) current += pd.Timedelta(days=1) # Finalize remaining week if week_content: finished_week = f'
{week_num}
{"".join(week_content)}
' week_height = estimate_week_height(week_content, display_locs) extra_header = DAY_HEADERS_HEIGHT_MM if len(page_content) == 0 else 0 needed = week_height + extra_header + 1 if current_page_height + needed > PRINTABLE_HEIGHT_MM: html += f'
{day_headers}{"".join(page_content)}
' page_content = [] current_page_height = 0 page_content.append(finished_week) # Close final page if page_content: html += f'
{day_headers}{"".join(page_content)}
' # Add weekly hours table if show_weekly_hours: wh, wt = calculate_weekly_hours(providers_df, info_df, start_obj, end_obj, display_locs) html += '
' for week in wh.keys(): html += f'' html += '' + ''.join(f'' for loc in sorted(display_locs)) + '' providers_in_week = {p for loc_dict in wh[week].values() for p in loc_dict} for prov in sorted(providers_in_week): html += f'' + ''.join(f'' for loc in sorted(display_locs)) + f'' html += '
{week} Clinical Hours
Provider{loc}Total
{prov}{wh[week].get(loc, {}).get(prov, 0.0):.1f}{wt[week].get(prov, 0.0):.1f}
' html += '
' html += "
" # Write HTML and generate PDF EXPORT_DIR = "exports" os.makedirs(EXPORT_DIR, exist_ok=True) today_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d") html_filename = f"schedule*{today_str}.html" html_path = os.path.join(EXPORT_DIR, html_filename) with open(html_path, 'w', encoding='utf-8') as f: f.write(html) pdf_filename = f"schedule*{today_str}.pdf" pdf_path = os.path.join(EXPORT_DIR, pdf_filename) try: css = CSS(string=SHARED_CSS.replace('{{locations}}', loc_str) .replace('{{start}}', start_date) .replace('{{end}}', end_date) .replace('{{time}}', gen_time)) HTML(string=html).write_pdf(pdf_path, stylesheets=[css]) except Exception as e: return f"

PDF generation error: {e}

", html_path, None return html, html_path, pdf_path # === Password Check === def check_password(pwd): """ Simple password check to unlock the admin upload/delete panel. """ if pwd == "alivio0000": return gr.update(visible=False), gr.update(visible=True), "" return gr.update(visible=True), gr.update(visible=False), "Incorrect password." # === Gradio Interface === def create_interface(): """ Build the complete Gradio interface with public view tab and password-protected admin tab. """ with gr.Blocks(title="Alivio Schedule Display") as demo: gr.Markdown("# Alivio Schedule Display") gr.Markdown("""Upload the Provider Information Excel and at least one Provider Schedule Excel file.""") gr.Markdown("""Schedules will be generated for the selected locations found in the uploaded provider schedule files, displayed on a single calendar.""") gr.Markdown("""Providers are always displayed in different colors based on age coverage:""") gr.HTML(""" """) with gr.Tabs(): # Public tab – view only, no upload/delete with gr.Tab("View Schedule (Public Access)"): gr.Markdown("## View Generated Schedule") gr.Markdown("**No upload or delete allowed. 
Uses existing uploaded files.**") dummy_pinfo = gr.File(label="Provider Info", visible=False) dummy_pfiles = gr.File(label="Provider Schedules", file_count="multiple", visible=False) dummy_mafiles = gr.File(label="MA Schedules", file_count="multiple", visible=False) with gr.Row(): sdate_pub = gr.Textbox(label="Start Date", placeholder="06/02/25") edate_pub = gr.Textbox(label="End Date", placeholder="07/05/25") with gr.Row(): c_age_pub = gr.Checkbox(label="Age Coverage Check", value=False) c_op_pub = gr.Checkbox(label="Hours Coverage Check", value=False) c_conf_pub = gr.Checkbox(label="Location Conflict Check", value=True) c_ma_pub = gr.Checkbox(label="Staff Ratio Check", value=False) c_hours_pub = gr.Checkbox(label="Weekly Hours Summary", value=False) locs_pub = gr.CheckboxGroup(label="Locations", choices=AVAILABLE_LOCATIONS, value=['All Locations']) submit_pub = gr.Button("Generate Schedule") output_pub = gr.HTML() with gr.Row(): dl_pdf_pub = gr.File(label="Download PDF") dl_html_pub = gr.File(label="Download HTML") submit_pub.click( combine_schedules, [dummy_pinfo, dummy_pfiles, dummy_mafiles, sdate_pub, edate_pub, c_age_pub, c_conf_pub, c_op_pub, c_ma_pub, c_hours_pub, locs_pub], [output_pub, dl_html_pub, dl_pdf_pub] ) # Admin tab – password protected upload and management with gr.Tab("Admin Panel (Password Required)"): with gr.Column(visible=True) as pwd_section: gr.Markdown("## Enter Password to Access Admin Functions") pwd_in = gr.Textbox(label="Password", type="password") pwd_fb = gr.Textbox(label="Status", interactive=False) pwd_btn = gr.Button("Submit") with gr.Column(visible=False) as admin_app: gr.Markdown("## Upload & Manage Files") with gr.Row(): pinfo = gr.File(label="Provider Info (Required)", file_types=[".xlsx"]) pfiles = gr.File(label="Provider Schedules", file_count="multiple", file_types=[".xlsx"]) mafiles = gr.File(label="MA Schedules", file_count="multiple", file_types=[".xlsx"]) with gr.Row(): gallery = gr.Files(label="Uploaded Files", 
interactive=False) file_times = gr.Textbox(label="Upload Times", lines=8, interactive=False) dropdown = gr.Dropdown(label="Delete File", choices=[]) del_btn = gr.Button("Delete") with gr.Row(): sdate = gr.Textbox(label="Start Date", placeholder="06/02/25") edate = gr.Textbox(label="End Date", placeholder="07/05/25") with gr.Row(): c_age = gr.Checkbox(label="Age Coverage Check", value=False) c_op = gr.Checkbox(label="Hours Coverage Check", value=False) c_conf = gr.Checkbox(label="Location Conflict Check", value=True) c_ma = gr.Checkbox(label="Staff Ratio Check", value=False) c_hours = gr.Checkbox(label="Weekly Hours Summary", value=False) locs = gr.CheckboxGroup(label="Locations", choices=AVAILABLE_LOCATIONS, value=['All Locations']) submit = gr.Button("Generate") output = gr.HTML() with gr.Row(): dl_pdf = gr.File(label="Download PDF") dl_html = gr.File(label="Download HTML") # Load current file list on startup demo.load(update_file_display, None, [gallery, dropdown, file_times]) # Save files when uploaded pinfo.change(save_files, [pinfo], [gallery, dropdown, file_times]) pfiles.change(save_files, [pfiles], [gallery, dropdown, file_times]) mafiles.change(save_files, [mafiles], [gallery, dropdown, file_times]) # Delete selected file del_btn.click(delete_file, dropdown, [gallery, dropdown, file_times]) # Generate schedule submit.click( combine_schedules, [pinfo, pfiles, mafiles, sdate, edate, c_age, c_conf, c_op, c_ma, c_hours, locs], [output, dl_html, dl_pdf] ) # Password submission handler pwd_btn.click(check_password, pwd_in, [pwd_section, admin_app, pwd_fb]) return demo # === Keep-alive for Hugging Face Space === SPACE_URL = "https://wanwanlin0521-alivio-scheduling-web-app.hf.space" PING_URL = f"{SPACE_URL}/" def keep_alive(): """Background thread that pings the Space every 30 minutes to prevent sleeping.""" while True: try: r = requests.get(PING_URL, timeout=20) print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Keep-alive ping success → {r.status_code}") except 
Exception as e: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Keep-alive ping failed: {e}") time.sleep(1800) # 30 minutes def start_background_ping(): """Start the keep-alive thread.""" thread = threading.Thread(target=keep_alive, daemon=True) thread.start() print("Keep-alive background thread started (ping every 30 min)") if __name__ == "__main__": demo = create_interface() start_background_ping() demo.launch()