# 0815 Version – Refactored & Optimized
import gradio as gr
import pandas as pd
from datetime import datetime, time
from zoneinfo import ZoneInfo
import os
import shutil
import json
from weasyprint import HTML, CSS
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed

warnings.filterwarnings("ignore")

# ----------------------------------------------------------------------
# Constants & Config
# ----------------------------------------------------------------------
UPLOAD_DIR = "Uploads"
UPLOAD_TIMES_FILE = os.path.join(UPLOAD_DIR, "upload_times.json")
CHICAGO_TZ = ZoneInfo("America/Chicago")

# The is-a-file check must run BEFORE makedirs(): if "Uploads" exists as a
# plain file, makedirs() would raise its own, less helpful FileExistsError
# first and the friendly message below would be unreachable.
if os.path.exists(UPLOAD_DIR) and not os.path.isdir(UPLOAD_DIR):
    raise FileExistsError(
        f"Error: '{UPLOAD_DIR}' exists as a file, not a directory. "
        "Please rename or remove the 'Uploads' file and try again."
    )
os.makedirs(UPLOAD_DIR, exist_ok=True)

AGE_LST = ["Newborn-5mo", "6mo-9yo", "10-17yo", "18-20yo", "21-24yo", "25+yo"]
OFF_LST = ['OFF', 'VACATION', 'FMLA', 'ADMIN', 'PAID_LEAVE', 'CME', 'TEACHING', 'SICK', 'HOLIDAY']
UNDER_18G = ["Newborn-5mo", "6mo-9yo", "10-17yo"]
OVER_18G = ["18-20yo", "21-24yo", "25+yo"]

AVAILABLE_LOCATIONS = [
    'Berwyn', 'Juarez', 'LVHS', 'Morgan', 'Orozco', 'Western',
    'Urgent Care', 'Psych', "OB/Gynecology", 'All Locations'
]
NO_AGE_CHECK_LOCATIONS = ['Juarez', 'LVHS', 'Orozco', 'Urgent Care', 'Psych', "OB/Gynecology"]
NO_OPERATION_CHECK_LOCATIONS = NO_AGE_CHECK_LOCATIONS.copy()

# Schedule-grid abbreviation -> canonical location name.
LOCATION_MAP = {
    'B': 'Berwyn', 'J': 'Juarez', 'L': 'LVHS', 'M': 'Morgan', 'O': 'Orozco', 'W': 'Western',
    'UC': 'Urgent Care', 'PSY/M': 'Psych', 'PSY/B': 'Psych',
    "OB/B": "OB/Gynecology", "OB/M": "OB/Gynecology", "OB/W": "OB/Gynecology", "OB": "OB/Gynecology"
}

# Clinic hours per location & weekday.
# Each entry maps a weekday key (single int, or tuple of ints) to
# (open, close, [(break_start, break_end), ...]) — all datetime.time values.
# Berwyn / Morgan / Western share one standard schedule, so it is defined
# once here instead of self-referencing CLINIC_HOURS inside its own literal
# (which would be a NameError at definition time).
_STANDARD_HOURS = {
    (0, 1, 3, 4): (time(8, 30), time(17, 30), [(time(12, 30), time(13, 30))]),
    2: (time(13, 0), time(20, 0), [(time(16, 0), time(17, 0))]),
    5: (time(8, 30), time(15, 0), [(time(11, 30), time(12, 0))]),
}
CLINIC_HOURS = {
    'Berwyn': dict(_STANDARD_HOURS),
    'Morgan': dict(_STANDARD_HOURS),
    'Western': dict(_STANDARD_HOURS),
    'Urgent Care': {
        (0, 1, 2, 3, 4): (time(9, 0), time(18, 0), [(time(13, 0), time(14, 0))]),
        5: (time(9, 0), time(13, 30), []),
    },
    'Juarez': {(0, 1, 2, 3, 4): (time(8, 30), time(16, 0), [(time(13, 0), time(14, 0))])},
    'Orozco': {(0, 1, 2, 3, 4): (time(8, 0), time(16, 30), [])},
    'LVHS': {
        (0, 1, 2, 3): (time(8, 30), time(16, 0), [(time(12, 0), time(13, 0))]),
        4: (time(12, 0), time(13, 0), []),
    },
    # Psych has no fixed open/close; they are derived from provider shifts.
    # Only the break windows are configured here.
    'Psych': {
        (0, 1, 3, 4): (None, None, [(time(12, 30), time(13, 30))]),
        2: (None, None, [(time(16, 0), time(17, 0))]),
        5: (None, None, []),
    },
}


# ----------------------------------------------------------------------
# Helper Functions
# ----------------------------------------------------------------------
def get_time_string(row):
    """Render one provider/day row as a display string.

    Returns the raw note for OB/Gynecology note-only rows, an OFF-list note
    (or "OFF") for note-only rows, "HH:MM - HH:MM" (optionally with the note
    appended) when both times are present, and "OFF" otherwise.
    """
    note_raw = row.get('Note')
    has_note = pd.notna(note_raw)
    no_times = pd.isna(row['Start_Time']) and pd.isna(row['End_Time'])

    if row['Location'] == "OB/Gynecology" and no_times and has_note:
        return note_raw.strip()
    if no_times and has_note:
        note = note_raw.strip().upper()
        # "SCHOOL CLOSED" renders as OFF; recognised OFF-list notes render
        # verbatim; anything else falls back to OFF.
        return "OFF" if note == 'SCHOOL CLOSED' else (note_raw if note in OFF_LST else "OFF")
    if pd.notna(row['Start_Time']) and pd.notna(row['End_Time']):
        time_str = f"{row['Start_Time'].strftime('%H:%M')} - {row['End_Time'].strftime('%H:%M')}"
        # NOTE: note_raw may be NaN (a float) here, so guard before .strip() —
        # calling .strip() unconditionally would raise AttributeError.
        if has_note and note_raw.strip() != '':
            if note_raw.strip().upper() != 'SCHOOL CLOSED':
                return f"{time_str} ({note_raw})"
        return time_str
    return "OFF"


def parse_date(val):
    """Parse a schedule cell into a Timestamp.

    Accepts Excel serial numbers, several string formats, and values that are
    already datetimes (the refactor had dropped the datetime passthrough and
    silently returned None for pre-parsed dates). Returns None/NaT when the
    value cannot be interpreted.
    """
    if pd.isna(val) or val is None:
        return None
    if isinstance(val, (int, float)):
        # Excel serial date: day 0 is 1899-12-30.
        return pd.Timestamp('1899-12-30') + pd.Timedelta(days=val)
    if isinstance(val, (pd.Timestamp, datetime)):
        return val
    if isinstance(val, str):
        for fmt in ('%m/%d/%y', '%m/%d/%Y', '%Y-%m-%d'):
            try:
                return pd.to_datetime(val, format=fmt)
            except ValueError:
                continue
        # Last resort: general parsing; NaT on failure.
        return pd.to_datetime(val, errors='coerce')
    return None


def parse_time(val):
    """Parse a schedule cell into a datetime.time.

    Handles time/datetime objects, "HH:MM[:SS]" strings, and Excel
    fraction-of-day floats. Returns None for blanks, OFF markers, and
    unparseable values.
    """
    if pd.isna(val) or val in ("", "OFF", "nan", "NaT"):
        return None
    if isinstance(val, time):
        return val
    if isinstance(val, datetime):
        return val.time()
    for fmt in ('%H:%M:%S', '%H:%M'):
        try:
            parsed = pd.to_datetime(val, format=fmt)
        except (ValueError, TypeError):
            continue
        if pd.notna(parsed):
            return parsed.time()
    try:
        # Excel stores times as a fraction of a 24-hour day.
        hours = float(val) * 24
        h, m = int(hours), int((hours - int(hours)) * 60)
        return time(h, m) if 0 <= h < 24 and 0 <= m < 60 else None
    except (ValueError, TypeError):
        return None


# ----------------------------------------------------------------------
# Excel Parsing (Parallel)
# ----------------------------------------------------------------------
def _parse_excel_file(path):
    """Parse one schedule workbook into a long-format DataFrame.

    Each sheet holds "Week N" header rows whose Start_TimeX cells carry the
    dates for that week, followed by provider rows. Returns (df, None) on
    success or (None, error_message) on failure.
    """
    try:
        all_sheets = pd.read_excel(path, engine='openpyxl', sheet_name=None)
        if not all_sheets:
            return None, "No sheets found."
        combined = []
        for sheet_name, df in all_sheets.items():
            if not {'Name', 'Location'}.issubset(df.columns):
                return None, f"Missing Name/Location in '{sheet_name}'."
            if not any(col.startswith('Start_Time') for col in df.columns):
                return None, f"No Start_Time columns in '{sheet_name}'."
            num_days = sum(1 for c in df.columns if c.startswith('Start_Time'))
            week_rows = df[df['Name'].str.startswith('Week', na=False)].index.tolist()
            if not week_rows:
                continue
            for week_idx in week_rows:
                # The week header row carries this week's dates.
                dates = []
                for d in range(1, num_days + 1):
                    col = f'Start_Time{d}'
                    dates.append(parse_date(df.at[week_idx, col]) if col in df.columns else None)
                next_week = next((i for i in week_rows if i > week_idx), len(df))
                prov = df.loc[week_idx + 1: next_week - 1].copy()
                prov = prov[~prov['Name'].eq('Name')]  # drop repeated header rows
                if prov.empty:
                    continue
                day_dfs = []
                for d in range(1, num_days + 1):
                    if d > len(dates) or dates[d - 1] is None:
                        continue
                    s, e, n = f'Start_Time{d}', f'End_Time{d}', f'Note{d}'
                    if not {s, e}.issubset(df.columns):
                        continue
                    # Note column may be absent for a day; select only what exists.
                    cols = ['Name', 'Location', s, e] + ([n] if n in df.columns else [])
                    temp = prov[cols].copy()
                    if n not in df.columns:
                        temp[n] = pd.NA
                    temp['Date'] = dates[d - 1]
                    temp = temp.rename(columns={s: 'Start_Time', e: 'End_Time', n: 'Note'})
                    day_dfs.append(temp)
                if day_dfs:
                    sheet_df = pd.concat(day_dfs, ignore_index=True).dropna(subset=['Name'])
                    sheet_df['Location'] = sheet_df['Location'].map(
                        lambda x: LOCATION_MAP.get(x, x) if pd.notna(x) else x
                    )
                    for col in ['Start_Time', 'End_Time']:
                        sheet_df[col] = sheet_df[col].apply(parse_time)
                    combined.append(sheet_df)
        if not combined:
            return None, "No valid data."
        out = pd.concat(combined, ignore_index=True)
        out = out.drop_duplicates().dropna(subset=['Date'])
        out = out[(out['Location'] != 'Location') & (out['Name'] != 'Name')]
        out['Date'] = pd.to_datetime(out['Date'])
        return out, None
    except Exception as e:
        # Broad by design: any workbook problem is reported to the UI as text.
        return None, f"Error: {str(e)}"


def _parse_excel_files(paths):
    """Parse several workbooks concurrently; first error aborts the batch."""
    results = []
    # Don't shadow the builtin `exec` with the executor name.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(_parse_excel_file, p) for p in paths]
        for fut in as_completed(futures):
            df, err = fut.result()
            if err:
                return None, err
            if df is not None:
                results.append(df)
    merged = pd.concat(results, ignore_index=True).drop_duplicates() if results else None
    return merged, None


# ----------------------------------------------------------------------
# Provider Info Validation
# ----------------------------------------------------------------------
def validate_provider_info(path):
    """Load and validate the Provider_Info workbook.

    Requires Provider/Last_Name/Location plus one 0/1 column per age group.
    Returns (df, None) or (None, error_message).
    """
    try:
        df = pd.read_excel(path, engine='openpyxl')
        expected = ["Provider", "Last_Name", "Location"] + AGE_LST
        if not all(c in df.columns for c in expected):
            return None, f"Missing columns. Need: {expected}"
        df['Location'] = df['Location'].map(lambda x: LOCATION_MAP.get(x, x) if pd.notna(x) else x)
        for col in AGE_LST:
            if not df[col].isin([0, 1]).all():
                return None, f"Column {col} must be 0/1 only."
        return df, None
    except Exception as e:
        return None, f"Error: {str(e)}"


# ----------------------------------------------------------------------
# File Management
# ----------------------------------------------------------------------
def _load_upload_times():
    """Read the upload-time registry, or an empty dict if it doesn't exist."""
    if os.path.exists(UPLOAD_TIMES_FILE):
        with open(UPLOAD_TIMES_FILE, 'r') as fh:
            return json.load(fh)
    return {}


def _save_upload_times(times):
    """Persist the upload-time registry."""
    with open(UPLOAD_TIMES_FILE, 'w') as fh:
        json.dump(times, fh, indent=2)


def save_files(file_list):
    """Copy uploaded .xlsx/.xls files into UPLOAD_DIR and record upload times."""
    if not file_list:
        return update_file_display()
    if not isinstance(file_list, list):
        file_list = [file_list]
    times = _load_upload_times()
    for f in file_list:
        # f.name may be None for some upload widgets; guard before endswith.
        if f and hasattr(f, 'name') and f.name and f.name.endswith(('.xlsx', '.xls')):
            filename = os.path.basename(f.name)
            dest = os.path.join(UPLOAD_DIR, filename)
            if os.path.abspath(f.name) != os.path.abspath(dest):
                shutil.copy(f.name, dest)
            times[filename] = datetime.now(CHICAGO_TZ).isoformat()
    _save_upload_times(times)
    return update_file_display()


def update_file_display():
    """Return (file paths, dropdown update, upload-time text) for the UI."""
    files = sorted(f for f in os.listdir(UPLOAD_DIR) if f.endswith(('.xlsx', '.xls')))
    paths = [os.path.join(UPLOAD_DIR, f) for f in files]
    times = _load_upload_times()
    display = []
    for f in files:
        if f in times:
            t = datetime.fromisoformat(times[f]).astimezone(CHICAGO_TZ)
            display.append(f"{f}: Uploaded on {t.strftime('%Y-%m-%d %I:%M %p CDT')}")
        else:
            display.append(f"{f}: Upload time unknown")
    return paths, gr.update(choices=files, value=None), "\n".join(display) if display else "No files uploaded."
def delete_file(name):
    """Remove an uploaded workbook and drop its recorded upload time."""
    if name:
        path = os.path.join(UPLOAD_DIR, name)
        if os.path.exists(path):
            os.remove(path)
        if os.path.exists(UPLOAD_TIMES_FILE):
            with open(UPLOAD_TIMES_FILE, 'r') as fh:
                times = json.load(fh)
            times.pop(name, None)
            with open(UPLOAD_TIMES_FILE, 'w') as fh:
                json.dump(times, fh, indent=2)
    return update_file_display()


# ----------------------------------------------------------------------
# Check Functions
# ----------------------------------------------------------------------
def _providers_on(providers_df, date_key, loc):
    """Rows for providers actually working `loc` on `date_key`.

    A provider counts as working when both times are set and the note is not
    one of the OFF-list markers.
    """
    return providers_df[
        (providers_df['Date'] == date_key) &
        (providers_df['Location'] == loc) &
        providers_df['Start_Time'].notna() &
        providers_df['End_Time'].notna() &
        ~providers_df['Note'].str.upper().fillna('').isin(OFF_LST)
    ]


def check_age_coverage(providers_df, info_df, loc, date):
    """Classify working providers at `loc` on `date` by age-group coverage.

    Returns (missing_age_groups, full_age, under_18_only, over_18_only,
    only_25_plus) — the last four are provider-name lists.
    """
    date_key = pd.to_datetime(date).strftime('%m/%d/%y')
    on_date = _providers_on(providers_df, date_key, loc)
    if on_date.empty or info_df.empty:
        # No one working (or no info): every age group is uncovered.
        return AGE_LST, [], [], [], []
    working = on_date['Name'].unique()
    info = info_df[(info_df['Location'] == loc) & info_df['Provider'].isin(working)]
    missing = [a for a in AGE_LST if not (info[a] == 1).any()]
    full, under, over, only25 = [], [], [], []
    for p in working:
        row = info[info['Provider'] == p]
        if row.empty:
            continue
        r = row.iloc[0]
        if all(r[a] == 1 for a in AGE_LST):
            full.append(p)
        elif all(r[a] == 1 for a in UNDER_18G):
            under.append(p)
        elif all(r[a] == 1 for a in OVER_18G):
            over.append(p)
        elif r["25+yo"] == 1 and all(r[a] == 0 for a in AGE_LST[:-1]):
            only25.append(p)
    return missing, full, under, over, only25


def check_overall_age_coverage(providers_df, info_df, date, locs):
    """Cross-location age coverage for `date`.

    Returns (missing_age_groups, is_holiday). A date is a holiday only when
    some location has at least one note and ALL of its notes say HOLIDAY —
    all() over an empty set is vacuously True, so the non-empty requirement
    is essential to avoid flagging empty locations as holidays.
    """
    date_key = pd.to_datetime(date).strftime('%m/%d/%y')
    for loc in locs:
        notes = providers_df[
            (providers_df['Date'] == date_key) & (providers_df['Location'] == loc)
        ]['Note'].dropna().str.strip().str.upper()
        if len(notes) and (notes == 'HOLIDAY').all():
            return [], True
    check_locs = [l for l in locs if l not in NO_AGE_CHECK_LOCATIONS]
    if not check_locs:
        return [], False
    on_date = providers_df[
        (providers_df['Date'] == date_key) &
        providers_df['Location'].isin(check_locs) &
        providers_df['Start_Time'].notna() &
        providers_df['End_Time'].notna() &
        ~providers_df['Note'].str.upper().fillna('').isin(OFF_LST)
    ]
    if on_date.empty or info_df.empty:
        return AGE_LST, False
    info = info_df[info_df['Provider'].isin(on_date['Name'].unique())]
    missing = [a for a in AGE_LST if not (info[a] == 1).any()]
    return missing, False


def check_provider_location_conflicts(providers_df, date, locs):
    """Find providers scheduled at more than one location on `date`.

    Returns a list of (provider, locations, css_class, message) tuples.
    Provider 'DFW' at Morgan + Urgent Care gets a dedicated warning style.
    """
    date_key = pd.to_datetime(date).strftime('%m/%d/%y')
    on_date = providers_df[
        (providers_df['Date'] == date_key) &
        providers_df['Location'].isin(locs) &
        providers_df['Start_Time'].notna() &
        providers_df['End_Time'].notna() &
        ~providers_df['Note'].str.upper().fillna('').isin(OFF_LST)
    ]
    if on_date.empty:
        return []
    conflicts = []
    for prov, locs_set in on_date.groupby('Name')['Location'].apply(set).items():
        if len(locs_set) <= 1:
            continue
        if prov == 'DFW' and {'Morgan', 'Urgent Care'} <= locs_set:
            conflicts.append((prov, list(locs_set), 'wen-conflict-warning',
                              'Provider Wen at both Morgan and Urgent Care!'))
        else:
            conflicts.append((prov, list(locs_set), 'conflict-warning',
                              f'Provider {prov} at: {", ".join(locs_set)}'))
    return conflicts


def get_clinic_hours(loc, weekday):
    """Look up (open, close, breaks) for `loc` on `weekday` from CLINIC_HOURS.

    Returns (None, None, []) when the location/weekday has no fixed schedule.
    """
    for days, (start, end, breaks) in CLINIC_HOURS.get(loc, {}).items():
        if weekday == days or (isinstance(days, tuple) and weekday in days):
            return start, end, breaks
    return None, None, []


def check_operation_time_coverage(providers_df, date, loc):
    """Return uncovered operating-time gaps ("HH:MM - HH:MM") for a location.

    Psych has no fixed hours, so its open/close are derived from the earliest
    start and latest end of that day's shifts (the refactor had dropped this
    branch, making Psych always report no gaps).
    """
    date_key = pd.to_datetime(date).strftime('%m/%d/%y')
    weekday = pd.to_datetime(date).weekday()
    on_date = _providers_on(providers_df, date_key, loc)

    if loc == 'Psych':
        if on_date.empty:
            return []
        # Look the weekday up directly so a configured day with no breaks
        # (Saturday) is distinguishable from an unconfigured day (Sunday).
        break_times = None
        for days, (_, _, brks) in CLINIC_HOURS['Psych'].items():
            if weekday == days or (isinstance(days, tuple) and weekday in days):
                break_times = brks
                break
        if break_times is None:
            return []
        clinic_start = min(on_date['Start_Time'])
        clinic_end = max(on_date['End_Time'])
    else:
        clinic_start, clinic_end, break_times = get_clinic_hours(loc, weekday)
        if clinic_start is None:
            return []

    if on_date.empty:
        # Nobody scheduled: everything outside the breaks is a gap.
        gaps = []
        cur = clinic_start
        for bs, be in break_times:
            if cur < bs:
                gaps.append(f"{cur.strftime('%H:%M')} - {bs.strftime('%H:%M')}")
            cur = max(cur, be)
        if cur < clinic_end:
            gaps.append(f"{cur.strftime('%H:%M')} - {clinic_end.strftime('%H:%M')}")
        return gaps

    # Merge overlapping shifts into disjoint coverage intervals.
    intervals = sorted((r['Start_Time'], r['End_Time']) for _, r in on_date.iterrows())
    merged = []
    for s, e in intervals:
        if merged and s <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], e))
        else:
            merged.append((s, e))

    # Walk the day in segments delimited by breaks, recording holes.
    gaps = []
    cur = clinic_start
    for bs, be in break_times:
        for s, e in merged:
            if s > cur and cur < bs:
                gaps.append(f"{cur.strftime('%H:%M')} - {min(s, bs).strftime('%H:%M')}")
            cur = max(cur, e)
        cur = max(cur, be)
    if cur < clinic_end:
        for s, e in merged:
            if s > cur and cur < clinic_end:
                gaps.append(f"{cur.strftime('%H:%M')} - {min(s, clinic_end).strftime('%H:%M')}")
            cur = max(cur, e)
        if cur < clinic_end:
            gaps.append(f"{cur.strftime('%H:%M')} - {clinic_end.strftime('%H:%M')}")
    return gaps
clinic_start = time(9, 0) - clinic_end = time(18, 0) - break_times = [(time(13, 0), time(14, 0))] - elif weekday == 2: - clinic_start = time(9, 0) - clinic_end = time(18, 0) - break_times = [(time(13, 0), time(14, 0))] - elif weekday == 5: - clinic_start = time(9, 0) - clinic_end = time(13, 30) - break_times = [] - else: - break_times = [] - continue - elif location == 'Juarez': - if weekday in [0, 1, 2, 3, 4]: - clinic_start = time(8, 30) - clinic_end = time(16, 0) - break_times = [(time(13, 0), time(14, 0))] - else: - break_times = [] - continue - elif location == 'Orozco': - if weekday in [0, 1, 2, 3, 4]: - clinic_start = time(8, 0) - clinic_end = time(16, 30) - break_times = [] - else: - break_times = [] - continue - elif location == 'LVHS': - if weekday in [0, 1, 2, 3]: - clinic_start = time(8, 30) - clinic_end = time(16, 0) - break_times = [(time(12, 0), time(13, 0))] - elif weekday == 4: - clinic_start = time(12, 0) - clinic_end = time(13, 0) - break_times = [] - else: - break_times = [] - continue - elif location == 'Psych': - loc_providers_df = providers_df[ - (providers_df['Date'] == date_key) & - (providers_df['Location'] == location) & - (providers_df['Start_Time'].notna()) & - (providers_df['End_Time'].notna()) & - (~providers_df['Note'].str.upper().fillna('').isin(OFF_LST)) - ] - if loc_providers_df.empty: - break_times = [] - continue - if weekday in [0, 1, 3, 4]: - break_times = [(time(12, 30), time(13, 30))] - elif weekday == 2: - break_times = [(time(16, 0), time(17, 0))] - elif weekday == 5: - break_times = [] - else: - break_times = [] - continue - start_times = [row['Start_Time'] for _, row in loc_providers_df.iterrows() if row['Start_Time']] - end_times = [row['End_Time'] for _, row in loc_providers_df.iterrows() if row['End_Time']] - if not start_times or not end_times: - break_times = [] - continue - clinic_start = min(start_times) - clinic_end = max(end_times) - else: - break_times = [] + if cur.weekday() == 0 and cur != start: + week_num 
+= 1 + date_key = cur.strftime('%m/%d/%y') + weekday = cur.weekday() + for loc in locs: + start_h, end_h, breaks = get_clinic_hours(loc, weekday) + if start_h is None: continue - - loc_providers_df = providers_df[ - (providers_df['Date'] == date_key) & - (providers_df['Location'] == location) & - (providers_df['Start_Time'].notna()) & - (providers_df['End_Time'].notna()) & - (~providers_df['Note'].str.upper().fillna('').isin(OFF_LST)) + on_date = providers_df[ + (providers_df['Date'] == date_key) & (providers_df['Location'] == loc) & + providers_df['Start_Time'].notna() & providers_df['End_Time'].notna() & + ~providers_df['Note'].str.upper().fillna('').isin(OFF_LST) ] - for _, row in loc_providers_df.iterrows(): - provider = row['Name'] - start_time = row['Start_Time'] - end_time = row['End_Time'] - note = row['Note'].strip().upper() if pd.notna(row['Note']) and row['Note'].strip() != '' else '' - provider_info_row = provider_info_df[provider_info_df['Provider'] == provider] - display_name = provider_info_row['Last_Name'].iloc[0] if not provider_info_row.empty else provider - week_key = f"Week {week_number}" - if week_key not in weekly_hours: - weekly_hours[week_key] = {} - weekly_totals[week_key] = {} - if location not in weekly_hours[week_key]: - weekly_hours[week_key][location] = {} - if display_name not in weekly_hours[week_key][location]: - weekly_hours[week_key][location][display_name] = 0.0 - if display_name not in weekly_totals[week_key]: - weekly_totals[week_key][display_name] = 0.0 - - start_dt = datetime.combine(current_date, start_time) - end_dt = datetime.combine(current_date, end_time) - if end_dt < start_dt: - end_dt += pd.Timedelta(days=1) - total_hours = (end_dt - start_dt).total_seconds() / 3600 - - # Add 1 hour if the note contains "6-7/TELE" + for _, row in on_date.iterrows(): + prov = row['Name'] + s, e = row['Start_Time'], row['End_Time'] + note = row.get('Note', '').strip().upper() + last_name = info_df[info_df['Provider'] == 
prov]['Last_Name'].iloc[0] if not info_df[info_df['Provider'] == prov].empty else prov + key = f"Week {week_num}" + weekly.setdefault(key, {}).setdefault(loc, {}) + totals.setdefault(key, {}) + weekly[key][loc][last_name] = weekly[key][loc].get(last_name, 0.0) + totals[key][last_name] = totals[key].get(last_name, 0.0) + dt_s = datetime.combine(cur, s) + dt_e = datetime.combine(cur, e) + if dt_e < dt_s: + dt_e += pd.Timedelta(days=1) + hours = (dt_e - dt_s).total_seconds() / 3600 if note == '6-7/TELE': - total_hours += 1.0 - - # Newly added. - # Check if clinical hours are less than 5 hours to decide on break time deduction - clinical_hours = total_hours - apply_break = clinical_hours >= 5.0 - - if apply_break: - for break_start, break_end in break_times: - break_start_dt = datetime.combine(current_date, break_start) - break_end_dt = datetime.combine(current_date, break_end) - if break_end_dt < break_start_dt: - break_end_dt += pd.Timedelta(days=1) - overlap_start = max(start_dt, break_start_dt) - overlap_end = min(end_dt, break_end_dt) - if overlap_start < overlap_end: - overlap_hours = (overlap_end - overlap_start).total_seconds() / 3600 - total_hours -= overlap_hours - - clinic_start_dt = datetime.combine(current_date, clinic_start) - clinic_end_dt = datetime.combine(current_date, clinic_end) - if clinic_end_dt < clinic_start_dt: - clinic_end_dt += pd.Timedelta(days=1) - overlap_start = max(start_dt, clinic_start_dt) - overlap_end = min(end_dt, clinic_end_dt) - if overlap_start < overlap_end: - total_hours = min(total_hours, (overlap_end - overlap_start).total_seconds() / 3600) + hours += 1.0 + if hours >= 5.0: + for bs, be in breaks: + bs_dt = datetime.combine(cur, time(*bs)) + be_dt = datetime.combine(cur, time(*be)) + if be_dt < bs_dt: + be_dt += pd.Timedelta(days=1) + os, oe = max(dt_s, bs_dt), min(dt_e, be_dt) + if os < oe: + hours -= (oe - os).total_seconds() / 3600 + cs_dt = datetime.combine(cur, start_h) + ce_dt = datetime.combine(cur, end_h) + if ce_dt < 
cs_dt: + ce_dt += pd.Timedelta(days=1) + os, oe = max(dt_s, cs_dt), min(dt_e, ce_dt) + if os < oe: + hours = min(hours, (oe - os).total_seconds() / 3600) else: - total_hours = 0.0 - - weekly_hours[week_key][location][display_name] += max(total_hours, 0.0) - weekly_totals[week_key][display_name] += max(total_hours, 0.0) - - current_date += pd.Timedelta(days=1) - return weekly_hours, weekly_totals + hours = 0.0 + weekly[key][loc][last_name] += max(hours, 0.0) + totals[key][last_name] += max(hours, 0.0) + cur += pd.Timedelta(days=1) + return weekly, totals -def calculate_max_entries_per_day(providers_df, ma_df, start_obj, end_obj, all_locations, check_age_coverage_flag, check_location_conflicts_flag, check_operation_coverage_flag, check_ma_mismatch_flag, num_provider_files): - week_entries = [] - current_date = start_obj - current_week = [] - current_week_days = 0 - while current_date <= end_obj: - if current_date.weekday() == 6: - current_date += pd.Timedelta(days=1) +# ---------------------------------------------------------------------- +# Entry Count for Pagination +# ---------------------------------------------------------------------- +def calculate_max_entries_per_day(providers_df, ma_df, start, end, locs, checks, num_prov_files): + entries = [] + cur = start + week, day_count = [], 0 + while cur <= end: + if cur.weekday() == 6: + cur += pd.Timedelta(days=1) continue - if current_week_days >= 6: - week_entries.append(sum(current_week)) - current_week = [] - current_week_days = 0 - total_entries_day = 0 - has_providers = False - check_locations = [loc for loc in all_locations if loc not in NO_AGE_CHECK_LOCATIONS] - perform_overall_check = check_age_coverage_flag and num_provider_files > 1 and len(check_locations) > 1 - perform_conflict_check = check_location_conflicts_flag and num_provider_files > 1 and len(all_locations) > 1 - operation_check_locations = [loc for loc in all_locations if loc not in NO_OPERATION_CHECK_LOCATIONS] - for location in all_locations: 
- providers_on_day = providers_df[ - (providers_df['Date'] == current_date) & - (providers_df['Location'] == location) - ] if not providers_df.empty else pd.DataFrame() - providers_count = len(providers_on_day[~( - (providers_on_day['Start_Time'].isna()) & - (providers_on_day['End_Time'].isna()) & - (providers_on_day['Note'].isna() | (providers_on_day['Note'] == ''))) - ]) if not providers_on_day.empty else 0 - is_holiday = False - is_school_closed = False - if not providers_on_day.empty: - all_provider_notes = providers_on_day['Note'].dropna().str.strip().str.upper().tolist() - if all_provider_notes and all(note == 'HOLIDAY' for note in all_provider_notes): - is_holiday = True - elif all_provider_notes and all(note == 'SCHOOL CLOSED' for note in all_provider_notes) and location in NO_AGE_CHECK_LOCATIONS: - is_school_closed = True - if providers_count > 0 or is_holiday or is_school_closed: - total_entries_day += 1 - if is_holiday or is_school_closed: - total_entries_day += 1 + if day_count >= 6: + entries.append(sum(week)) + week, day_count = [], 0 + day_total = 0 + has = False + for loc in locs: + prov_day = providers_df[providers_df['Date'] == cur] if not providers_df.empty else pd.DataFrame() + prov_day = prov_day[prov_day['Location'] == loc] + prov_count = len(prov_day) - len(prov_day[ + prov_day['Start_Time'].isna() & prov_day['End_Time'].isna() & + (prov_day['Note'].isna() | (prov_day['Note'] == '')) + ]) + holiday = school_closed = False + if not prov_day.empty: + notes = prov_day['Note'].dropna().str.strip().str.upper().tolist() + holiday = all(n == 'HOLIDAY' for n in notes) if notes else False + school_closed = all(n == 'SCHOOL CLOSED' for n in notes) and loc in NO_AGE_CHECK_LOCATIONS if notes else False + if prov_count or holiday or school_closed: + has = True + day_total += 1 + if holiday or school_closed: + day_total += 1 else: - total_entries_day += providers_count - if check_age_coverage_flag and providers_count > 0 and location not in 
NO_AGE_CHECK_LOCATIONS: - total_entries_day += 1 - if check_operation_coverage_flag and providers_count > 0 and location in operation_check_locations: - gaps = check_operation_time_coverage(providers_df, current_date, location) - if gaps: - total_entries_day += 1 - if check_ma_mismatch_flag: - working_providers = len(providers_on_day[ - providers_on_day['Start_Time'].notna() & - providers_on_day['End_Time'].notna() & - ~providers_on_day['Note'].str.upper().fillna('').isin(OFF_LST) - ]) - ma_on_day = ma_df[ - (ma_df['Date'] == current_date) & - (ma_df['Location'] == location) - ] if not ma_df.empty else pd.DataFrame() - ma_count = len(ma_on_day[ - ma_on_day['Start_Time'].notna() & - ma_on_day['End_Time'].notna() & - ~ma_on_day['Note'].str.upper().fillna('').isin(OFF_LST) - ]) if not ma_on_day.empty else 0 - total_entries_day += 1 # MAs header - total_entries_day += ma_count # Each MA - if not (ma_count == working_providers or ma_count == working_providers + 1): - total_entries_day += 1 # Warning - has_providers = True - if perform_overall_check and has_providers: - total_entries_day += 1 - if perform_conflict_check and has_providers: - conflicts = check_provider_location_conflicts(providers_df, current_date, all_locations) - if conflicts: - total_entries_day += len(conflicts) - current_week.append(total_entries_day) - current_week_days += 1 - current_date += pd.Timedelta(days=1) - if current_week: - week_entries.append(sum(current_week)) - return week_entries + day_total += prov_count + if checks[0] and prov_count and loc not in NO_AGE_CHECK_LOCATIONS: + day_total += 1 + if checks[2] and prov_count and loc not in NO_OPERATION_CHECK_LOCATIONS: + if check_operation_time_coverage(providers_df, cur, loc): + day_total += 1 + if checks[3]: + wp = len(prov_day[prov_day['Start_Time'].notna() & prov_day['End_Time'].notna() & + ~prov_day['Note'].str.upper().fillna('').isin(OFF_LST)]) + ma_day = ma_df[ma_df['Date'] == cur] if not ma_df.empty else pd.DataFrame() + ma_day = 
ma_day[ma_day['Location'] == loc] + ma_count = len(ma_day[ma_day['Start_Time'].notna() & ma_day['End_Time'].notna() & + ~ma_day['Note'].str.upper().fillna('').isin(OFF_LST)]) + day_total += 1 + ma_count + if not (ma_count == wp or ma_count == wp + 1): + day_total += 1 + if checks[0] and num_prov_files > 1 and len([l for l in locs if l not in NO_AGE_CHECK_LOCATIONS]) > 1 and has: + day_total += 1 + if checks[1] and num_prov_files > 1 and len(locs) > 1 and has: + conf = check_provider_location_conflicts(providers_df, cur, locs) + day_total += len(conf) + week.append(day_total) + day_count += 1 + cur += pd.Timedelta(days=1) + if week: + entries.append(sum(week)) + return entries + +# ---------------------------------------------------------------------- +# HTML Day Renderer +# ---------------------------------------------------------------------- +def _render_day(providers_df, info_df, ma_df, date, locs, checks, bmw_locs): + date_key = pd.to_datetime(date).strftime('%m/%d/%y') + html = f'
{date.day}
' + has_content = False + + # Conflicts + if checks[1]: + for prov, locs_list, cls, msg in check_provider_location_conflicts(providers_df, date, locs): + html += f'
{msg}
' + has_content = True + + # Overall age + if checks[0]: + miss, holiday = check_overall_age_coverage(providers_df, info_df, date, locs) + if not holiday and miss: + html += f'
Missing age all locations: {", ".join(miss)}
' + has_content = True -def combine_schedules(provider_info_file, provider_files, ma_files, start_date, end_date, check_age_coverage_flag, check_location_conflicts_flag, check_operation_coverage_flag, check_ma_mismatch_flag, show_weekly_hours, selected_locations): + for loc in sorted(locs): + loc_df = providers_df[(providers_df['Date'] == pd.to_datetime(date_key)) & (providers_df['Location'] == loc)] + loc_info = info_df[info_df['Location'] == loc] if info_df is not None else pd.DataFrame() + holiday = school_closed = False + if not loc_df.empty: + notes = loc_df['Note'].dropna().str.strip().str.upper().tolist() + holiday = all(n == 'HOLIDAY' for n in notes) if notes else False + school_closed = all(n == 'SCHOOL CLOSED' for n in notes) and loc in NO_AGE_CHECK_LOCATIONS if notes else False + loc_df = loc_df[~((loc_df['Start_Time'].isna()) & (loc_df['End_Time'].isna()) & + (loc_df['Note'].isna() | (loc_df['Note'] == '')))] + if not loc_df.empty or holiday or school_closed: + has_content = True + html += f'
{loc}
' + if holiday: + html += '
Holiday!
Clinic Closed!
' + if school_closed: + html += '
School Closed!
' + if not holiday and not school_closed: + html += '
Providers:
' + miss, full, under, over, only25 = check_age_coverage(providers_df, loc_info, loc, date) + for _, row in loc_df.iterrows(): + info_row = loc_info[loc_info['Provider'] == row['Name']] + name = info_row['Last_Name'].iloc[0] if not info_row.empty else row['Name'] + tstr = get_time_string(row) + color = "#ff6347" if row['Name'] in full else \ + "#008000" if row['Name'] in under else \ + "#0000ff" if row['Name'] in over else \ + "#8E44AD" if row['Name'] in only25 else "#000000" + style = f"font-size:7pt;margin:1mm;line-height:1.1;color:{color}" + if tstr in OFF_LST: + style += ";text-decoration:line-through" + html += f'{name}: {tstr}
' + html += '
' + if checks[2] and loc not in NO_OPERATION_CHECK_LOCATIONS: + gaps = check_operation_time_coverage(providers_df, date, loc) + if gaps: + html += f'
Missing: {", ".join(gaps)}
' + if checks[0] and miss and loc not in NO_AGE_CHECK_LOCATIONS: + html += f'
Missing: {", ".join(miss)}
' + if checks[3]: + ma_loc = ma_df[(ma_df['Date'] == pd.to_datetime(date_key)) & (ma_df['Location'] == loc)] + ma_loc = ma_loc[~((ma_loc['Start_Time'].isna()) & (ma_loc['End_Time'].isna()) & + (ma_loc['Note'].isna() | (ma_loc['Note'] == '')))] + if not ma_loc.empty: + html += '
MAs:
' + for _, r in ma_loc.iterrows(): + tstr = get_time_string(r) + style = "font-size:7pt;margin:1mm;line-height:1.1;color:#000" + if tstr in OFF_LST: + style += ";text-decoration:line-through" + html += f'{r["Name"]}: {tstr}
' + html += '
' + wp = len(loc_df[loc_df['Start_Time'].notna() & loc_df['End_Time'].notna() & + ~loc_df['Note'].str.upper().fillna('').isin(OFF_LST)]) + wm = len(ma_loc[ma_loc['Start_Time'].notna() & ma_loc['End_Time'].notna() & + ~ma_loc['Note'].str.upper().fillna('').isin(OFF_LST)]) + if not (wm == wp or wm == wp + 1): + html += f'
MA Mismatch: {wm} MAs for {wp} Providers
' + html += '
' + html += '
' + return html, has_content + +# ---------------------------------------------------------------------- +# Main Schedule Generator +# ---------------------------------------------------------------------- +def combine_schedules(provider_info_file, provider_files, ma_files, start_date, end_date, + check_age, check_conflict, check_op, check_ma, show_hours, selected_locs): save_files([provider_info_file] if provider_info_file else []) - save_files(provider_files if provider_files else []) - save_files(ma_files if ma_files else []) - provider_info_path = None - provider_schedule_paths = [] - ma_schedule_paths = [] - for file in os.listdir(UPLOAD_DIR): - file_path = os.path.join(UPLOAD_DIR, file) - if file.lower().endswith(('.xlsx', '.xls')): - if "provider_info" in file.lower(): - provider_info_path = file_path - elif "ma" in file.lower(): - ma_schedule_paths.append(file_path) - else: - provider_schedule_paths.append(file_path) - if not provider_schedule_paths: - return "

At least one Provider Schedule file must be present in the uploads directory

", None, None - if not provider_info_path: - return "

Provider Info file is required when Provider Schedule is present

", None, None - if check_ma_mismatch_flag and not ma_schedule_paths: - return "

MA mismatch check selected but no MA schedule files found. Please upload files with 'MA' in the filename.

", None, None - provider_info_df, provider_info_error = validate_provider_info(provider_info_path) - if provider_info_error: - return f"

{provider_info_error}

", None, None - providers_dfs = [] - for provider_file_path in provider_schedule_paths: - providers_df, provider_error = validate_excel_file(provider_file_path, ['Name', 'Location', 'Date1', 'Start_Time1', 'End_Time1']) - if provider_error: - return f"

{provider_error}

", None, None - if providers_df is not None: - providers_dfs.append(providers_df) - if not providers_dfs: - return "

No valid provider schedules found!

", None, None - providers_df = pd.concat(providers_dfs, ignore_index=True) - providers_df = providers_df.drop_duplicates() - ma_dfs = [] - if check_ma_mismatch_flag: - for ma_file_path in ma_schedule_paths: - ma_df_temp, ma_error = validate_excel_file(ma_file_path, ['Name', 'Location', 'Date1', 'Start_Time1', 'End_Time1']) - if ma_error: - return f"

{ma_error}

", None, None - if ma_df_temp is not None: - ma_dfs.append(ma_df_temp) - if not ma_dfs: - return "

No valid MA schedules found!

", None, None - ma_df = pd.concat(ma_dfs, ignore_index=True) - ma_df = ma_df.drop_duplicates() - else: - ma_df = pd.DataFrame() - all_locations = set(providers_df['Location'].unique()) if not providers_df.empty else set() - if not selected_locations: - return "

At least one location must be selected!

", None, None - specific_locations = [loc for loc in selected_locations if loc != 'All Locations'] - if specific_locations: - display_locations = {loc for loc in specific_locations if loc in all_locations} - else: - display_locations = all_locations - if not display_locations: - return "

Selected locations not found in uploaded provider schedules!

", None, None + save_files(provider_files or []) + save_files(ma_files or []) + + # Locate files + prov_info_path = next((os.path.join(UPLOAD_DIR, f) for f in os.listdir(UPLOAD_DIR) + if "provider_info" in f.lower() and f.endswith(('.xlsx', '.xls'))), None) + prov_paths = [os.path.join(UPLOAD_DIR, f) for f in os.listdir(UPLOAD_DIR) + if f not in (os.path.basename(prov_info_path) if prov_info_path else "") and + not "ma" in f.lower() and f.endswith(('.xlsx', '.xls'))] + ma_paths = [os.path.join(UPLOAD_DIR, f) for f in os.listdir(UPLOAD_DIR) + if "ma" in f.lower() and f.endswith(('.xlsx', '.xls'))] + + if not prov_paths: + return "

No provider schedule files.

", None, None + if not prov_info_path: + return "

Provider Info required.

", None, None + if check_ma and not ma_paths: + return "

MA files required for mismatch check.

", None, None + + # Validate + info_df, err = validate_provider_info(prov_info_path) + if err: + return f"

{err}

", None, None + + prov_df, err = _parse_excel_files(prov_paths) + if err: + return f"

Provider file error: {err}

", None, None + if prov_df is None: + return "

No valid provider data.

", None, None + + ma_df = pd.DataFrame() + if check_ma: + ma_df, err = _parse_excel_files(ma_paths) + if err or ma_df is None: + return "

MA file error or no data.

", None, None + + # Locations + all_locs = set(prov_df['Location'].unique()) + sel = [l for l in selected_locs if l != 'All Locations'] + display_locs = {l for l in (sel or all_locs) if l in all_locs} + if not display_locs: + return "

No valid locations selected.

", None, None + + # Dates try: - if not start_date or not end_date: - return "

Start date and end date must be provided!

", None, None - start_obj = pd.to_datetime(start_date.strip(), format='%m/%d/%y') - end_obj = pd.to_datetime(end_date.strip(), format='%m/%d/%y') - if start_obj > end_obj: - return "

Start date must be before or equal to end date!

", None, None - available_dates = providers_df['Date'].unique() if not providers_df.empty else [] - if available_dates: - available_dates = pd.to_datetime(available_dates) - start_obj = max(start_obj, min(available_dates)) - end_obj = min(end_obj, max(available_dates)) - else: - return "

No valid dates found in the provider schedules!

", None, None - except ValueError: - return "

Dates must be in MM/DD/YY format!

", None, None - num_days = (end_obj - start_obj).days + 1 - weeks = (num_days + start_obj.weekday()) // 7 + (1 if (num_days + start_obj.weekday()) % 7 > 0 else 0) - header_height = 50 - week_header_height = 10 - day_header_height = 10 - entry_height = 10 - num_locations = len(display_locations) - num_provider_files = len(providers_dfs) - buffer_height = 200 + (num_locations * 20) + (num_provider_files * 25) - week_entries = calculate_max_entries_per_day(providers_df, ma_df, start_obj, end_obj, display_locations, check_age_coverage_flag, check_location_conflicts_flag, check_operation_coverage_flag, check_ma_mismatch_flag, num_provider_files) - if show_weekly_hours: - weekly_hours, weekly_totals = calculate_weekly_hours(providers_df, provider_info_df, start_obj, end_obj, display_locations) - providers_per_location = {} - for location in display_locations: - loc_providers = providers_df[providers_df['Location'] == location]['Name'].unique() - loc_provider_info = provider_info_df[provider_info_df['Provider'].isin(loc_providers)] - providers_per_location[location] = sorted( - [provider_info_df[provider_info_df['Provider'] == p]['Last_Name'].iloc[0] if not provider_info_df[provider_info_df['Provider'] == p].empty else p for p in loc_providers] - ) - max_providers = max([len(providers) for providers in providers_per_location.values()], default=0) - hours_table_height = (len(weekly_hours) * (max_providers + 2) * entry_height) + 20 - buffer_height += hours_table_height - else: - weekly_hours = {} - weekly_totals = {} - hours_table_height = 0 - a4_height = 842 - bmw_locations = [loc for loc in display_locations if loc in ['Berwyn', 'Morgan', 'Western']] - perform_overall_check = check_age_coverage_flag and len(providers_dfs) > 1 and len(bmw_locations) > 1 - perform_conflict_check = check_location_conflicts_flag and len(providers_dfs) > 1 and len(display_locations) > 1 - locations_str = ", ".join(sorted(display_locations)) if display_locations else "No Locations" - 
generation_time = datetime.now(CHICAGO_TZ).strftime('%I:%M %p CDT, %B %d, %Y') - html_content = f""" - - - - - - Alivio Schedule for {locations_str} - {start_date} to {end_date} - - - -
- """ - current_date = start_obj - start_weekday = current_date.weekday() - current_week_days = 0 - week_content = [] - page_content = [] - week_counter = 1 - is_first_page = True - current_page_height = header_height - week_index = 0 - for i in range(start_weekday % 6): + s_obj = pd.to_datetime(start_date.strip(), format='%m/%d/%y') + e_obj = pd.to_datetime(end_date.strip(), format='%m/%d/%y') + if s_obj > e_obj: + return "

Start > End.

", None, None + avail = pd.to_datetime(prov_df['Date'].unique()) + s_obj, e_obj = max(s_obj, avail.min()), min(e_obj, avail.max()) + except: + return "

Invalid date format.

", None, None + + # Pagination + week_entries = calculate_max_entries_per_day( + prov_df, ma_df, s_obj, e_obj, display_locs, + [check_age, check_conflict, check_op, check_ma], + len(prov_paths) + ) + + # Weekly hours + weekly, totals = {}, {} + if show_hours: + weekly, totals = calculate_weekly_hours(prov_df, info_df, s_obj, e_obj, display_locs) + + # HTML + gen_time = datetime.now(CHICAGO_TZ).strftime('%I:%M %p CDT, %B %d, %Y') + loc_str = ", ".join(sorted(display_locs)) + a4_h = 842 + header_h = 50 + buffer = 200 + len(display_locs)*20 + len(prov_paths)*25 + if show_hours: + buffer += (len(weekly) * (max(len(v) for v in weekly.values()) + 2) * 10) + 20 + + html = f"""Alivio Schedule
""" + day_headers = '
' + \ + ''.join(f'
{d}
' for d in ["Monday","Tuesday","Wednesday","Thursday","Friday","Saturday"]) + '
' + + cur_date = s_obj + week_content, page_content = [], [] + cur_page_h = header_h + week_idx = 0 + week_num = 1 + start_wd = s_obj.weekday() + for _ in range(start_wd % 6): week_content.append('
') - current_week_days += 1 - day_headers = """ -
-
-
Monday
-
Tuesday
-
Wednesday
-
Thursday
-
Friday
-
Saturday
-
- """ - while current_date <= end_obj: - if current_date.weekday() == 6: - current_date += pd.Timedelta(days=1) + + while cur_date <= e_obj: + if cur_date.weekday() == 6: + cur_date += pd.Timedelta(days=1) continue - if current_week_days >= 6: - week_height = (week_header_height + day_header_height if week_counter == 1 else week_header_height) + (week_entries[week_index] * entry_height) + 10 - if current_page_height + week_height > a4_height - buffer_height and not is_first_page: - html_content += f'
{"".join(page_content)}
' - page_content = [] - current_page_height = header_height - is_first_page = False - week_class = "first-week" if week_counter == 1 else "week" - week_html = f'
{week_counter}
' + ''.join(week_content) + '
' - if week_counter == 1: - week_html = f'
{day_headers}{week_html}
' - else: - week_html = f'
{week_html}
' + if len(week_content) >= 7: + week_h = (10 + (10 if week_num == 1 else 0)) + (week_entries[week_idx] * 10) + 10 + if cur_page_h + week_h > a4_h - buffer and page_content: + html += f'
{"".join(page_content)}
' + page_content, cur_page_h = [], header_h + cls = "first-week" if week_num == 1 else "week" + week_html = f'
{week_num}
{"".join(week_content)}
' + week_html = f'
{day_headers}{week_html}
' if week_num == 1 else f'
{week_html}
' page_content.append(week_html) - current_page_height += week_height - week_content = [] - current_week_days = 0 - week_index += 1 - week_counter += 1 - day_num = current_date.day - date_key = current_date.strftime('%m/%d/%y') - is_current_day = current_date.strftime('%Y-%m-%d') == datetime.now().strftime('%Y-%m-%d') - day_html = f'
{day_num}
' - has_content = False - if perform_conflict_check: - conflicts = check_provider_location_conflicts(providers_df, pd.to_datetime(date_key, format='%m/%d/%y'), display_locations) - for provider, locations, warning_class, message in conflicts: - day_html += f'
{message}
' - has_content = True - if perform_overall_check: - overall_missing_ages, is_holiday = check_overall_age_coverage(providers_df, provider_info_df, pd.to_datetime(date_key, format='%m/%d/%y'), display_locations) - if not is_holiday and overall_missing_ages: - day_html += f'
Missing age all locations: {", ".join(overall_missing_ages)}
' - has_content = True - for location in sorted(display_locations): - loc_providers_df = providers_df[(providers_df['Date'] == pd.to_datetime(date_key, format='%m/%d/%y')) & - (providers_df['Location'] == location)] if not providers_df.empty else pd.DataFrame() - loc_provider_info_df = provider_info_df[provider_info_df['Location'] == location] if provider_info_df is not None else pd.DataFrame() - is_holiday = False - is_school_closed = False - if not loc_providers_df.empty: - all_provider_notes = loc_providers_df['Note'].dropna().str.strip().str.upper().tolist() - if all_provider_notes and all(note == 'HOLIDAY' for note in all_provider_notes): - is_holiday = True - elif all_provider_notes and all(note == 'SCHOOL CLOSED' for note in all_provider_notes) and location in NO_AGE_CHECK_LOCATIONS: - is_school_closed = True - if not loc_providers_df.empty: - loc_providers_df = loc_providers_df[ - ~((loc_providers_df['Start_Time'].isna()) & - (loc_providers_df['End_Time'].isna()) & - (loc_providers_df['Note'].isna() | (loc_providers_df['Note'] == ''))) - ] - if not loc_providers_df.empty or is_holiday or is_school_closed: - has_content = True - day_html += f'
{location}
' - if is_holiday: - day_html += '
Holiday!
Clinic Closed!
' - if is_school_closed: - day_html += '
School Closed!
' - if not is_holiday and not is_school_closed: - if not loc_providers_df.empty: - day_html += '
Providers:
' - missing_ages, full_age_providers, under_18_providers, over_18_providers, only_25_plus_providers = check_age_coverage(providers_df, loc_provider_info_df, location, pd.to_datetime(date_key, format='%m/%d/%y')) - for _, row in loc_providers_df.iterrows(): - provider_info_row = loc_provider_info_df[loc_provider_info_df['Provider'] == row['Name']] - display_name = provider_info_row['Last_Name'].iloc[0] if not provider_info_row.empty else row['Name'] - time_str = get_time_string(row) - style = "font-size: 7pt; margin: 1mm; line-height: 1.1;" - if row['Name'] in full_age_providers: - style += "color: #ff6347;" - elif row['Name'] in under_18_providers: - style += "color: #008000;" - elif row['Name'] in over_18_providers: - style += "color: #0000ff;" - elif row['Name'] in only_25_plus_providers: - style += "color: #8E44AD;" - else: - style += "color: #000000;" - if time_str in OFF_LST: - style += " text-decoration: line-through;" - day_html += f'{display_name}: {time_str}
' - day_html += '
' - if check_operation_coverage_flag and location not in NO_OPERATION_CHECK_LOCATIONS: - gaps = check_operation_time_coverage(providers_df, pd.to_datetime(date_key, format='%m/%d/%y'), location) - if gaps: - gap_text = ", ".join(gaps) - day_html += f'
Missing: {gap_text}
' - if check_age_coverage_flag and missing_ages and location not in NO_AGE_CHECK_LOCATIONS: - warning_text = f"Missing: {', '.join(missing_ages)}" - if len(bmw_locations) == 1: - warning_text = f"Missing: {', '.join(missing_ages)}" - day_html += f'
{warning_text}
' - if check_ma_mismatch_flag: - loc_ma_df = ma_df[(ma_df['Date'] == pd.to_datetime(date_key, format='%m/%d/%y')) & - (ma_df['Location'] == location)] if not ma_df.empty else pd.DataFrame() - if not loc_ma_df.empty: - loc_ma_df = loc_ma_df[ - ~((loc_ma_df['Start_Time'].isna()) & - (loc_ma_df['End_Time'].isna()) & - (loc_ma_df['Note'].isna() | (loc_ma_df['Note'] == ''))) - ] - day_html += '
MAs:
' - for _, row in loc_ma_df.iterrows(): - display_name = row['Name'] - time_str = get_time_string(row) - style = "font-size: 7pt; margin: 1mm; line-height: 1.1; color: #000000;" - if time_str in OFF_LST: - style += " text-decoration: line-through;" - day_html += f'{display_name}: {time_str}
' - day_html += '
' - working_providers = len(loc_providers_df[ - loc_providers_df['Start_Time'].notna() & - loc_providers_df['End_Time'].notna() & - ~loc_providers_df['Note'].str.upper().fillna('').isin(OFF_LST) - ]) - working_mas = len(loc_ma_df[ - loc_ma_df['Start_Time'].notna() & - loc_ma_df['End_Time'].notna() & - ~loc_ma_df['Note'].str.upper().fillna('').isin(OFF_LST) - ]) - if not (working_mas == working_providers or working_mas == working_providers + 1): - day_html += f'
MA Mismatch: {working_mas} MAs for {working_providers} Providers
' - day_html += '
' - day_html += '
' + cur_page_h += week_h + week_content, week_idx, week_num = [], week_idx + 1, week_num + 1 + day_html, _ = _render_day(prov_df, info_df, ma_df, cur_date, display_locs, + [check_age, check_conflict, check_op, check_ma], + [l for l in display_locs if l in ['Berwyn','Morgan','Western']]) week_content.append(day_html) - current_week_days += 1 - current_date += pd.Timedelta(days=1) + cur_date += pd.Timedelta(days=1) + if week_content: - week_height = (week_header_height + day_header_height if week_counter == 1 else week_header_height) + (week_entries[week_index] * entry_height) + 10 - if current_page_height + week_height > a4_height - buffer_height and not is_first_page: - html_content += f'
{"".join(page_content)}
' - page_content = [] - current_page_height = header_height - week_class = "week" - week_html = f'
{week_counter}
' + ''.join(week_content) + '
' - if week_counter == 1: - week_html = f'
{day_headers}{week_html}
' - else: - week_html = f'
{week_html}
' + week_h = (10 + (10 if week_num == 1 else 0)) + (week_entries[week_idx] * 10) + 10 + if cur_page_h + week_h > a4_h - buffer and page_content: + html += f'
{"".join(page_content)}
' + page_content, cur_page_h = [], header_h + cls = "week" + week_html = f'
{week_num}
{"".join(week_content)}
' + week_html = f'
{day_headers}{week_html}
' if week_num == 1 else f'
{week_html}
' page_content.append(week_html) - current_page_height += week_height - week_index += 1 - week_counter += 1 + if page_content: - html_content += f'
{"".join(page_content)}
' - if show_weekly_hours: - html_content += '
' - for week in weekly_hours.keys(): - html_content += f'' - html_content += '' + ''.join(f'' for loc in sorted(display_locations)) + '' - for provider in sorted(set(p for loc in weekly_hours[week].values() for p in loc.keys())): - html_content += '' - for loc in sorted(display_locations): - hours = weekly_hours[week].get(loc, {}).get(provider, 0.0) - html_content += f'' - total_hours = weekly_totals[week].get(provider, 0.0) - html_content += f'' - html_content += '' - html_content += '
{week} Clinical Hours
Provider{loc}Total
' + provider + '{hours:.1f}{total_hours:.1f}
' - html_content += '
' - html_content += "
" - with tempfile.TemporaryDirectory() as temp_dir: - output_file = os.path.join(temp_dir, f"schedule_combined_{uuid.uuid4()}.html") - with open(output_file, 'w') as f: - f.write(html_content) - output_pdf_file = None + html += f'
{"".join(page_content)}
' + + if show_hours: + html += '
' + for w in weekly: + html += f'' + html += '' + ''.join(f'' for l in sorted(display_locs)) + '' + for p in sorted(set(p for loc in weekly[w].values() for p in loc)): + html += f'' + for l in sorted(display_locs): + html += f'' + html += f'' + html += '
{w} Clinical Hours
Provider{l}Total
{p}{weekly[w].get(l, {}).get(p, 0.0):.1f}{totals[w].get(p, 0.0):.1f}
' + html += '
' + + html += '
' + + # PDF + with tempfile.TemporaryDirectory() as td: + html_path = os.path.join(td, f"sched_{uuid.uuid4()}.html") + with open(html_path, 'w') as f: + f.write(html) + pdf_path = os.path.join(td, f"sched_{uuid.uuid4()}.pdf") try: - output_pdf_file = os.path.join(temp_dir, f"schedule_combined_{uuid.uuid4()}.pdf") - css = CSS(string=''' - @page { - size: A4; - margin: 1.5mm 1.5mm 5mm 1.5mm; - @top-center { - font-size: 8pt; - font-family: Arial, Helvetica, sans-serif; - margin-top: 1.5mm; - } - @bottom-center { - content: "Alivio Schedule for ''' + locations_str + ''' - ''' + start_date + ''' to ''' + end_date + '''\\aLast Edited on ''' + generation_time + '''"; - font-size: 8pt; - font-family: Arial, Helvetica, sans-serif; - margin-bottom: 5mm; - white-space: pre-line; - line-height: 1.3; - } - } - * { - box-sizing: border-box; - } - body { - font-family: Arial, Helvetica, sans-serif; - margin: 1mm; - width: 210mm; - background-color: #ffffff; - font-size: 10pt; - orphans: 4; - widows: 4; - } - .calendar { - width: 210mm; - padding: 0.5mm; - background-color: #ffffff; - break-before: auto; - break-after: always; - page-break-inside: avoid; - margin-top: 5mm; - margin-bottom: 15mm; - } - .page-group { - break-before: page; - break-after: page; - break-inside: avoid; - page-break-inside: avoid; - } - .week-group { - break-inside: avoid; - page-break-inside: avoid; - break-before: auto; - break-after: auto; - margin-bottom: 1mm; - } - .week { - display: grid; - grid-template-columns: 5mm repeat(6, 33.1mm); - gap: 0.2mm; - margin-bottom: 1mm; - break-inside: avoid; - page-break-inside: avoid; - break-before: auto; - break-after: auto; - } - .first-week { - display: grid; - grid-template-columns: 5mm repeat(6, 33.1mm); - gap: 0.2mm; - margin-bottom: 1mm; - break-inside: avoid; - page-break-inside: avoid; - break-before: auto; - break-after: auto; - } - .day-headers { - display: grid; - grid-template-columns: 5mm repeat(6, 33.1mm); - gap: 0.2mm; - break-inside: avoid; - 
page-break-inside: avoid; - break-before: auto; - break-after: auto; - margin-bottom: 0; - } - .week-number { - font-weight: bold; - text-align: center; - background-color: #e0e0e0; - padding: 0.5mm; - border: 0.1mm solid #A6A09B; - font-size: 8pt; - line-height: 1.1; - display: flex; - align-items: center; - justify-content: center; - } - .day { - border: 0.1mm solid #A6A09B; - padding: 0.5mm; - min-height: 40mm; - background-color: #f9f9f9; - border-radius: 0.5mm; - display: flex; - flex-direction: column; - align-items: flex-start; - break-inside: avoid; - page-break-inside: avoid; - break-after: avoid; - break-before: auto; - overflow-wrap: break-word; - line-height: 1.1; - } - .day-header { - border: 0.1mm solid #A6A09B; - font-weight: bold; - text-align: center; - background-color: #e0e0e0; - padding: 0.5mm; - font-size: 8pt; - width: 100%; - line-height: 1.1; - break-inside: avoid; - page-break-inside: avoid; - break-before: auto; - break-after: auto; - } - .event { - margin: 0.3mm 0; - font-size: 7pt; - line-height: 1.1; - overflow-wrap: break-word; - } - .event-info { - font-size: 7pt; - line-height: 1.1; - margin-left: 1mm; - overflow-wrap: break-word; - } - .warning { - color: #d32f2f; - font-weight: bold; - margin: 0.3mm 0; - background-color: #fff3cd; - padding: 0.5mm; - border: 1px solid #d32f2f; - border-radius: 0.5mm; - font-size: 7pt; - overflow-wrap: break-word; - line-height: 1.1; - break-inside: avoid; - page-break-inside: avoid; - } - .warning-details { - color: #d32f2f; - margin: 0.3mm 0; - font-size: 7pt; - display: block; - overflow-wrap: break-word; - line-height: 1.1; - } - .overall-warning { - color: #d32f2f; - font-weight: bold; - margin: 0.3mm 0; - background-color: #BBE3FC; - padding: 0.5mm; - border-radius: 0.5mm; - font-size: 7pt; - overflow-wrap: break-word; - line-height: 1.1; - break-inside: avoid; - page-break-inside: avoid; - border: 1px solid; - } - .conflict-warning { - color: #d32f2f; - font-weight: bold; - margin: 0.3mm 0; - 
background-color: #F6CFFF; - padding: 0.5mm; - border-radius: 0.5mm; - font-size: 7pt; - overflow-wrap: break-word; - line-height: 1.1; - break-inside: avoid; - page-break-inside: avoid; - border: 1px solid; - } - .wen-conflict-warning { - color: #d32f2f; - font-weight: bold; - margin: 0.3mm 0; - background-color: #F4A8FF; - padding: 0.5mm; - border: 1px solid; - border-radius: 0.5mm; - font-size: 7pt; - overflow-wrap: break-word; - line-height: 1.1; - break-inside: avoid; - page-break-inside: avoid; - } - .operation-warning { - color: #d32f2f; - transformed: 0.3mm 0; - background-color: #ffcccb; - padding: 0.5mm; - border-radius: 0.5mm; - font-size: 7pt; - line-height: 1.1; - break-inside: avoid; - page-break-inside: avoid; - border: 1px solid; - } - .holiday-message { - color: #000000; - font-weight: bold; - margin: 0.3mm 0; - font-size: 8pt; - text-align: center; - background Kayla: #fa91de; - padding: 0.5mm; - overflow-wrap: break-word; - line-height: 1.1; - border-radius: 0.5mm; - break-inside: avoid; - page-break-inside: avoid; - } - .location-section { - margin: 0.3mm 0; - padding: 0.5mm; - border-left: 0.3mm solid #4682b4; - font-size: 8pt; - overflow-wrap: break-word; - line-height: 1.1; - break-inside: avoid; - page-break-inside: avoid; - } - .hours-table { - width: 200mm; - border-collapse: collapse; - margin-top: 5mm; - margin-bottom: 10mm; - font-size: 7pt; - break-inside: avoid; - page-break-inside: avoid; - } - .hours-table th, .hours-table td { - border: 0.1mm solid #A6A09B; - padding: 0.5mm; - text-align: center; - line-height: 1.1; - } - .hours-table th { - background-color: #e0e0e0; - font-weight: bold; - } - ''') - HTML(string=html_content).write_pdf(output_pdf_file, stylesheets=[css]) + HTML(string=html).write_pdf(pdf_path, stylesheets=[CSS(string='@page {size:A4; margin:1.5mm 1.5mm 5mm 1.5mm;}')]) except Exception as e: - return f"

Error generating PDF: {str(e)}

", None, None - final_output_file = f"schedule_combined_{uuid.uuid4()}.html" - final_output_pdf = f"schedule_combined_{uuid.uuid4()}.pdf" if output_pdf_file else None - if os.path.exists(output_file): - shutil.copy(output_file, final_output_file) - if output_pdf_file and os.path.exists(output_pdf_file): - shutil.copy(output_pdf_file, final_output_pdf) - return html_content, final_output_file, final_output_pdf + return f"

PDF error: {e}

", None, None + out_html = f"schedule_{uuid.uuid4()}.html" + out_pdf = f"schedule_{uuid.uuid4()}.pdf" + shutil.copy(html_path, out_html) + shutil.copy(pdf_path, out_pdf) + return html, out_html, out_pdf +# ---------------------------------------------------------------------- +# UI +# ---------------------------------------------------------------------- def check_password(pwd): - if pwd == "alivio0000": - return gr.update(visible=False), gr.update(visible=True), "" - else: - return gr.update(visible=True), gr.update(visible=False), "Incorrect password." + return (gr.update(visible=False), gr.update(visible=True), "") if pwd == "alivio0000" else \ + (gr.update(visible=True), gr.update(visible=False), "Incorrect password.") def create_interface(): with gr.Blocks(title="Alivio Schedule Display") as demo: @@ -1624,11 +788,11 @@ def create_interface(): with gr.Row(): start_date = gr.Textbox(label="Start Date (e.g., 06/02/25 for June 2, 2025)", placeholder="e.g., 06/02/25") end_date = gr.Textbox(label="End Date (e.g., 07/05/25 for July 5, 2025)", placeholder="e.g., 07/05/25") - check_age_coverage = gr.Checkbox(label="Check Age Coverage per Location (Berwyn, Morgan, Western only)", value=False) - check_operation_coverage = gr.Checkbox(label="Check Operation Time Coverage (Berwyn, Morgan, Western only)", value=False) - check_location_conflicts = gr.Checkbox(label="Check Provider Location Conflicts", value=True) - check_ma_mismatch = gr.Checkbox(label="Check Provider-MA Mismatch Rate", value=False) - show_weekly_hours = gr.Checkbox(label="Show Weekly Provider Hours per Location", value=False) + check_age_coverage = gr.Checkbox(label="Age Coverage Check", value=False) + check_operation_coverage = gr.Checkbox(label="Operation Hours Check", value=False) + check_location_conflicts = gr.Checkbox(label="Provider Location Conflict Check", value=True) + check_ma_mismatch = gr.Checkbox(label="Staffing Ratio Check", value=False) + show_weekly_hours = gr.Checkbox(label="Provider 
Hours Summary", value=False) location_selector = gr.CheckboxGroup(label="Select Locations to Display", choices=AVAILABLE_LOCATIONS, value=['All Locations']) submit_btn = gr.Button("Submit") output = gr.HTML(label="Hospital Schedule") @@ -1648,7 +812,7 @@ def create_interface(): password_button.click(fn=check_password, inputs=password_input, outputs=[password_section, app_section, password_feedback]) return demo + if __name__ == "__main__": demo = create_interface() - demo.launch() - \ No newline at end of file + demo.launch() \ No newline at end of file