haileyhalimj@gmail.com
π Replace get_per_product_speed() with direct extract.read_package_speed_data() calls
60e706b | import pandas as pd | |
| import src.preprocess.transform as transformed_data | |
| import datetime | |
| from datetime import timedelta | |
| import src.preprocess.extract as extract | |
| from src.config.constants import ShiftType, LineType, KitLevel, DefaultConfig | |
| # Re-import all the packages | |
| import importlib | |
| # Reload modules to get latest changes - REMOVED to prevent infinite loops | |
| # importlib.reload(extract) | |
| # importlib.reload(transformed_data) # Uncomment if needed | |
def get_date_span():
    """Determine the planning horizon (day indices plus start/end datetimes).

    Resolution order:
      1. Streamlit session state: ``start_date`` and, when present, a
         precomputed ``planning_days``.
      2. The demand data's finish dates for that exact start date.
      3. A hard-coded 5-day default matching COOIS_Released_Prod_Orders.csv.

    Returns:
        tuple: (date_span, start_date, end_date) where date_span is the list
        of 1-based day indices covering the horizon.
    """
    # Consolidated local import: the module level binds `datetime` to the
    # *module*, while this function needs the datetime *class*.
    from datetime import datetime, timedelta
    try:
        # Try to get from streamlit session state (from config page)
        import streamlit as st
        if hasattr(st, 'session_state') and 'start_date' in st.session_state:
            start_date = datetime.combine(st.session_state.start_date, datetime.min.time())
            # Check if we have calculated planning_days, otherwise determine from data
            if 'planning_days' in st.session_state and st.session_state.planning_days:
                planning_days = st.session_state.planning_days
                end_date = start_date + timedelta(days=planning_days - 1)
            else:
                # Determine date range from actual demand data for the exact start date
                try:
                    demand_data = extract.read_orders_data(start_date=start_date)
                    if not demand_data.empty:
                        # Get unique finish dates for this exact start date
                        # (uses the module-level pandas import; the redundant
                        # inner `import pandas as pd` was removed)
                        finish_dates = sorted(pd.to_datetime(demand_data["Basic finish date"]).dt.date.unique())
                        if finish_dates:
                            end_date = datetime.combine(max(finish_dates), datetime.min.time())
                            planning_days = (end_date - start_date).days + 1
                        else:
                            end_date = start_date
                            planning_days = 1
                    else:
                        end_date = start_date + timedelta(days=4)  # Default 5 days
                        planning_days = 5
                except Exception as e:
                    # Best-effort: fall back to a 5-day window on any data error
                    print(f"Could not determine date range from data: {e}")
                    end_date = start_date + timedelta(days=4)  # Default 5 days
                    planning_days = 5
            date_span = list(range(1, planning_days + 1))
            print(f"Using dates from config page: {start_date} to {end_date} ({planning_days} days)")
            print("date span", date_span)
            return date_span, start_date, end_date
    except Exception as e:
        # Streamlit unavailable (CLI mode) or session access failed
        print(f"Could not get dates from streamlit session: {e}")
    print(f"Loading default date values")
    # Default to match the user's data in COOIS_Released_Prod_Orders.csv
    return list(range(1, 6)), datetime(2025, 7, 7), datetime(2025, 7, 11)  # Default 5 days
# Fetch the planning dates from Streamlit session state, or fall back to
# defaults; both sources reference the demand data (COOIS_Planned_and_Released.csv).
# NOTE: these three module-level values are captured once at import time.
DATE_SPAN, start_date, end_date = get_date_span()
# Note: No need to set global dates - extract functions take start_date as parameter directly
print(f"\nπ DATE RANGE: {start_date} to {end_date}")
print(f"π PRODUCT SOURCE: COOIS_Released_Prod_Orders.csv")
def get_product_list():
    """
    Return the filtered product list for the optimizer.

    Loaded dynamically on every call so the result always reflects the
    current Streamlit configuration and dates; falls back to the raw,
    unfiltered released-order list if filtering fails.
    """
    try:
        # Always get fresh filtered products to reflect current configs
        from src.demand_filtering import DemandFilter
        demand_filter = DemandFilter()
        # Force reload data to pick up new dates/configs
        demand_filter.load_data(force_reload=True)
        products = demand_filter.get_filtered_product_list()
        print(f"π¦ FRESH FILTERED PRODUCTS: {len(products)} products ready for optimization")
        print(f"π― Products: {products}")
    except Exception as e:
        print(f"Error loading dynamic product list: {e}")
        # Fallback to unfiltered list
        products = transformed_data.get_released_product_list(start_date)
        print(f"π¦ FALLBACK UNFILTERED PRODUCTS: {len(products)} products -> {products}")
    return products
# DO NOT load at import time - always call get_product_list() dynamically
# PRODUCT_LIST = get_product_list() # REMOVED - was causing stale data!
def get_employee_type_list():
    """Return employee types, preferring the Streamlit selection over the data files."""
    try:
        # Prefer the selection made on the Dataset Metadata page, if any
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_employee_types' in st.session_state:
            print(f"Using employee types from Dataset Metadata page: {st.session_state.selected_employee_types}")
            return st.session_state.selected_employee_types
    except Exception as e:
        print(f"Could not get employee types from streamlit session: {e}")
    # Default: derive the unique employment types from the employee data file
    print(f"Loading employee type list from data files")
    employees = extract.read_employee_data()
    return employees["employment_type"].unique()
# DO NOT load at import time - always call get_employee_type_list() dynamically
# EMPLOYEE_TYPE_LIST = get_employee_type_list() # REMOVED - was causing stale data!
def get_shift_list():
    """Return shift ids, preferring the Streamlit selection over the data files."""
    try:
        # Prefer the selection made on the Dataset Metadata page, if any
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_shifts' in st.session_state:
            print(f"Using shifts from Dataset Metadata page: {st.session_state.selected_shifts}")
            return st.session_state.selected_shifts
    except Exception as e:
        print(f"Could not get shifts from streamlit session: {e}")
    # Default: derive the unique shift ids from the shift data file
    print(f"Loading shift list from data files")
    shifts = extract.get_shift_info()
    return shifts["id"].unique()
# Evening shift activation mode - define early to avoid circular dependency
# Options:
#   "normal"           - Only use regular shift (1) and overtime shift (3) - NO evening shift
#   "activate_evening" - Allow evening shift (2) when demand is too high or cost-effective
#   "always_available" - Evening shift always available as option
EVENING_SHIFT_MODE = "normal"  # Default: only regular + overtime
# Evening shift activation threshold
# If demand cannot be met with regular + overtime, suggest evening shift activation
EVENING_SHIFT_DEMAND_THRESHOLD = 0.9  # Activate if regular+overtime capacity < 90% of demand
def get_active_shift_list():
    """
    Return the shifts the optimizer may use, based on EVENING_SHIFT_MODE.

    Unknown modes fall back to "normal" behavior (regular + overtime only).
    """
    all_shifts = get_shift_list()
    mode = EVENING_SHIFT_MODE
    if mode == "activate_evening":
        # All shifts including evening (2)
        active_shifts = list(all_shifts)
        print(f"[SHIFT MODE] Evening activated: Using all shifts {active_shifts}")
    elif mode == "always_available":
        # All shifts always available
        active_shifts = list(all_shifts)
        print(f"[SHIFT MODE] Always available: Using all shifts {active_shifts}")
    else:
        # "normal" and any unrecognized mode: regular + overtime only, NO evening
        active_shifts = [s for s in all_shifts if s in ShiftType.REGULAR_AND_OVERTIME]
        if mode == "normal":
            print(f"[SHIFT MODE] Normal mode: Using shifts {active_shifts} (Regular + Overtime only, NO evening)")
        else:
            print(f"[SHIFT MODE] Unknown mode '{EVENING_SHIFT_MODE}', defaulting to normal: {active_shifts}")
    return active_shifts
# DO NOT load at import time - always call get_active_shift_list() dynamically
# SHIFT_LIST = get_active_shift_list() # REMOVED - was causing stale data!
def get_line_list():
    """Return line ids, preferring the Streamlit selection over the data files."""
    try:
        # Prefer the selection made on the Dataset Metadata page, if any
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_lines' in st.session_state:
            print(f"Using lines from Dataset Metadata page: {st.session_state.selected_lines}")
            return st.session_state.selected_lines
    except Exception as e:
        print(f"Could not get lines from streamlit session: {e}")
    # Default: derive the line ids from the packaging line data file
    print(f"Loading line list from data files")
    lines = extract.read_packaging_line_data()
    return lines["id"].unique().tolist()
# DO NOT load at import time - always call get_line_list() dynamically
# LINE_LIST = get_line_list() # REMOVED - was causing stale data!
def get_kit_line_match():
    """
    Map each kit name to a numeric line-type id.

    The kit/line match table's line-type column may contain string names or
    numeric ids; kits with a missing/empty line type are omitted entirely
    (no production needed for non-standalone masters).
    """
    raw = extract.read_kit_line_match_data().set_index("kit_name")["line_type"].to_dict()
    # Accepted string spellings for each line type
    name_to_id = {
        "long line": LineType.LONG_LINE,
        "mini load": LineType.MINI_LOAD,
        "miniload": LineType.MINI_LOAD,  # Alternative naming (no space)
        "Long_line": LineType.LONG_LINE,  # Alternative naming
        "Mini_load": LineType.MINI_LOAD,  # Alternative naming
    }
    result = {}
    for kit, line in raw.items():
        if isinstance(line, str) and line.strip():
            # Convert string names to numeric IDs
            mapped = name_to_id.get(line.strip())
            if mapped is None:
                print(f"Warning: Unknown line type '{line}' for kit {kit}")
                # Default to long line if unknown
                mapped = LineType.LONG_LINE
            result[kit] = mapped
        elif isinstance(line, (int, float)) and not pd.isna(line):
            # Already numeric
            result[kit] = int(line)
        # else: missing or empty line type - intentionally skipped, so these
        # kits receive no line assignment
    return result
KIT_LINE_MATCH_DICT = get_kit_line_match()
def get_line_cnt_per_type():
    """Return {line_type_id: line_count}, preferring Streamlit config over data files."""
    try:
        # Prefer the counts configured on the config page, if any
        import streamlit as st
        if hasattr(st, 'session_state') and 'line_counts' in st.session_state:
            print(f"Using line counts from config page: {st.session_state.line_counts}")
            return st.session_state.line_counts
    except Exception as e:
        print(f"Could not get line counts from streamlit session: {e}")
    print(f"Loading default line count values from data files")
    counts = extract.read_packaging_line_data().set_index("id")["line_count"].to_dict()
    print("line cnt per type", counts)
    return counts
# DO NOT load at import time - always call get_line_cnt_per_type() dynamically
# LINE_CNT_PER_TYPE = get_line_cnt_per_type() # REMOVED - was causing stale data!
def get_demand_dictionary(force_reload=False):
    """
    Return the filtered demand dictionary {product: demand}.

    Loaded dynamically on every call so the result reflects the current
    Streamlit configs/dates.

    Args:
        force_reload: accepted for interface compatibility; the underlying
            data is currently ALWAYS force-reloaded regardless of this flag.

    Raises:
        RuntimeError: if the demand data cannot be loaded; chained to the
            original exception so the root cause stays in the traceback.
    """
    try:
        # Always get fresh filtered demand to reflect current configs
        from src.demand_filtering import DemandFilter
        filter_instance = DemandFilter()
        # Force reload data to pick up new dates/configs
        filter_instance.load_data(force_reload=True)
        demand_dictionary = filter_instance.get_filtered_demand_dictionary()
        print(f"π FRESH FILTERED DEMAND: {len(demand_dictionary)} products with total demand {sum(demand_dictionary.values())}")
        print(f"π LOADED DYNAMICALLY: Reflects current Streamlit configs")
        return demand_dictionary
    except Exception as e:
        print(f"Error loading dynamic demand dictionary: {e}")
        # RuntimeError is a subclass of Exception, so existing callers that
        # catch Exception still work; `from e` preserves the cause chain
        # (the bare `raise Exception(...)` here previously discarded it).
        raise RuntimeError("Demand dictionary not found with error:" + str(e)) from e
# DO NOT load at import time - always call get_demand_dictionary() dynamically
# DEMAND_DICTIONARY = get_demand_dictionary() # REMOVED - was causing stale data!
def get_cost_list_per_emp_shift():
    """Return hourly cost per employment type and shift, preferring Streamlit config."""
    try:
        # Prefer the cost list configured on the config page, if any
        import streamlit as st
        if hasattr(st, 'session_state') and 'cost_list_per_emp_shift' in st.session_state:
            print(f"Using cost list from config page: {st.session_state.cost_list_per_emp_shift}")
            return st.session_state.cost_list_per_emp_shift
    except Exception as e:
        print(f"Could not get cost list from streamlit session: {e}")
    print(f"Loading default cost values")
    # Default hourly rates - Important: multiple employment types with different costs
    return DefaultConfig.DEFAULT_COST_RATES
def shift_code_to_name():
    """Return the mapping of shift codes to readable shift names."""
    return ShiftType.get_all_names()
def line_code_to_name():
    """Convert line type IDs to readable names."""
    names = LineType.get_all_names()
    return names
# DO NOT load at import time - always call get_cost_list_per_emp_shift() dynamically
# COST_LIST_PER_EMP_SHIFT = get_cost_list_per_emp_shift() # REMOVED - was causing stale data!
# COST_LIST_PER_EMP_SHIFT = { # WH_Workforce_Hourly_Pay_Scale
#     "Fixed": {1: 0, 2: 22, 3: 18},
#     "Humanizer": {1: 10, 2: 10, 3: 10},
# }
def get_team_requirements(product_list=None):
    """
    Extract per-product staffing requirements from the Kits Calculation CSV.

    Args:
        product_list: products to look up; defaults to the fresh filtered
            product list from get_product_list().

    Returns:
        dict: {employee_type: {product: required_headcount}} for the
        "UNICEF Fixed term" and "Humanizer" employee types. Products missing
        from the CSV are recorded with a requirement of 0.
    """
    if product_list is None:
        product_list = get_product_list()  # Get fresh product list
    # Future extension: read team requirements from the Streamlit dashboard
    # (streamlit_team_req = dashboard.team_requirements). The previous
    # try/except here was dead code: the try body was only `pass`, so its
    # except handler could never run.
    # Read the kits calculation data directly
    kits_df = extract.read_personnel_requirement_data()
    print("kits_df columns:", kits_df.columns.tolist())
    print("kits_df head:", kits_df.head())
    # Initialize the team requirements dictionary
    team_req_dict = {
        "UNICEF Fixed term": {},
        "Humanizer": {},
    }
    # Process each product in the product list
    for product in product_list:
        print("product", product)
        print(f"Processing team requirements for product: {product}")
        product_data = kits_df[kits_df['Kit'] == product]
        print("product_data", product_data)
        if not product_data.empty:
            # Extract Humanizer and UNICEF staff requirements
            # (values are already cleaned in the extract function)
            team_req_dict["Humanizer"][product] = int(product_data["Humanizer"].iloc[0])
            team_req_dict["UNICEF Fixed term"][product] = int(product_data["UNICEF staff"].iloc[0])
        else:
            print(f"Warning: Product {product} not found in Kits Calculation data, setting requirements to 0")
            # BUG FIX: actually set the 0 defaults the warning promises, so
            # downstream lookups don't KeyError on missing products.
            team_req_dict["Humanizer"][product] = 0
            team_req_dict["UNICEF Fixed term"][product] = 0
    return team_req_dict
# DO NOT load at import time - always call get_team_requirements() dynamically
# TEAM_REQ_PER_PRODUCT = get_team_requirements(PRODUCT_LIST) # REMOVED - was causing stale data!
def get_max_employee_per_type_on_day():
    """Return {employee_type: {day: max_headcount}}, preferring Streamlit config."""
    try:
        # Prefer the limits configured on the config page, if any
        import streamlit as st
        if hasattr(st, 'session_state') and 'max_employee_per_type_on_day' in st.session_state:
            print(f"Using max employee counts from config page: {st.session_state.max_employee_per_type_on_day}")
            return st.session_state.max_employee_per_type_on_day
    except Exception as e:
        print(f"Could not get max employee counts from streamlit session: {e}")
    print(f"Loading default max employee values")
    # Defaults: 8 UNICEF fixed-term and 10 Humanizer staff on every planning day
    return {
        "UNICEF Fixed term": {t: 8 for t in DATE_SPAN},
        "Humanizer": {t: 10 for t in DATE_SPAN},
    }
# DO NOT load at import time - always call get_max_employee_per_type_on_day() dynamically
# MAX_EMPLOYEE_PER_TYPE_ON_DAY = get_max_employee_per_type_on_day() # REMOVED - was causing stale data!
# available employee but for fixed in shift 1, it is mandatory employment
# Upper bound on total hours one person may work in a single day.
MAX_HOUR_PER_PERSON_PER_DAY = 14  # legal standard
def get_max_hour_per_shift_per_person():
    """Return max hours one person may work per shift (Streamlit session first)."""
    try:
        import streamlit as st
        session = getattr(st, 'session_state', None)
        if session is not None and 'max_hour_per_shift_per_person' in session:
            return session.max_hour_per_shift_per_person
    except Exception as e:
        print(f"Could not get max hours per shift from session: {e}")
    # Fallback to default only if not configured by user
    return DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON
# DO NOT load at import time - always call get_max_hour_per_shift_per_person() dynamically
# MAX_HOUR_PER_SHIFT_PER_PERSON = get_max_hour_per_shift_per_person() # REMOVED - was causing stale data!
| # Removed unnecessary getter functions - use direct imports instead: | |
| # - MAX_HOUR_PER_PERSON_PER_DAY | |
| # - MAX_HOUR_PER_SHIFT_PER_PERSON | |
| # - KIT_LINE_MATCH_DICT | |
| # - MAX_PARALLEL_WORKERS | |
| # - EVENING_SHIFT_MODE | |
| # Keep these complex getters that access DefaultConfig or have complex logic: | |
def get_evening_shift_demand_threshold():
    """Return the evening-shift demand threshold (Streamlit session first, else default)."""
    try:
        import streamlit as st
        session = getattr(st, 'session_state', None)
        if session is not None and 'evening_shift_demand_threshold' in session:
            return session.evening_shift_demand_threshold
    except Exception as e:
        print(f"Could not get evening shift threshold from session: {e}")
    # Fallback to default only if not configured by user
    return getattr(DefaultConfig, 'EVENING_SHIFT_DEMAND_THRESHOLD', 10000)
def get_fixed_min_unicef_per_day():
    """Get fixed minimum UNICEF staff per day - checks Streamlit session state first.

    NOTE(review): this function is redefined later in this module with a
    different fallback (the later one returns the int 2, this one a per-day
    dict). At import time the later definition wins, so this one is dead
    code — consolidate the two and confirm which fallback shape (dict vs
    int) callers actually expect.
    """
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'fixed_min_unicef_per_day' in st.session_state:
            return st.session_state.fixed_min_unicef_per_day
    except Exception as e:
        print(f"Could not get fixed min UNICEF from session: {e}")
    # Fallback to default only if not configured by user
    return getattr(DefaultConfig, 'FIXED_MIN_UNICEF_PER_DAY', {1: 1, 2: 1, 3: 1, 4: 1, 5: 1})
# ============================================================================
# BETTER APPROACH: Explicit module-level variables with clear documentation
# These variables provide backward compatibility while being explicit and clear
# ============================================================================
def _ensure_fresh_config():
    """
    Refresh the cached module-level configuration variables.

    Call this after updating Streamlit session state so module-level
    consumers see the new values.

    NOTE(review): the module-level initialization of these globals is
    currently disabled (see the commented-out section at the end of this
    file), so this helper both *creates* and refreshes them; the globals do
    not exist until it is called at least once.
    """
    global PER_PRODUCT_SPEED, LINE_LIST, EMPLOYEE_TYPE_LIST, SHIFT_LIST
    global LINE_CNT_PER_TYPE, COST_LIST_PER_EMP_SHIFT, MAX_EMPLOYEE_PER_TYPE_ON_DAY
    global MAX_HOUR_PER_SHIFT_PER_PERSON, MAX_PARALLEL_WORKERS, FIXED_MIN_UNICEF_PER_DAY
    global PAYMENT_MODE_CONFIG
    # Refresh all cached values by re-running every dynamic getter
    PER_PRODUCT_SPEED = extract.read_package_speed_data()
    LINE_LIST = get_line_list()
    EMPLOYEE_TYPE_LIST = get_employee_type_list()
    SHIFT_LIST = get_active_shift_list()
    LINE_CNT_PER_TYPE = get_line_cnt_per_type()
    COST_LIST_PER_EMP_SHIFT = get_cost_list_per_emp_shift()
    MAX_EMPLOYEE_PER_TYPE_ON_DAY = get_max_employee_per_type_on_day()
    MAX_HOUR_PER_SHIFT_PER_PERSON = get_max_hour_per_shift_per_person()
    MAX_PARALLEL_WORKERS = get_max_parallel_workers()
    FIXED_MIN_UNICEF_PER_DAY = get_fixed_min_unicef_per_day()
    PAYMENT_MODE_CONFIG = get_payment_mode_config()
# Note: Module-level variables will be initialized at the end of this file
# after all functions are defined. This ensures all getter functions are available.
# ---- Kit Hierarchy for Production Ordering ----
def get_kit_hierarchy_data():
    """
    Load kit hierarchy information for production ordering.

    Returns:
        tuple: (kit_levels, dependencies, priority_order) as produced by
        extract.get_production_order_data().
    """
    # Future extension: read hierarchy data from the Streamlit dashboard
    # (streamlit_hierarchy = dashboard.kit_hierarchy_data). The previous
    # try/except here was dead code: the try body was only `pass`, so its
    # except branch (and its "Using default hierarchy" print) never ran.
    kit_levels, dependencies, priority_order = extract.get_production_order_data()
    return kit_levels, dependencies, priority_order
KIT_LEVELS, KIT_DEPENDENCIES, PRODUCTION_PRIORITY_ORDER = get_kit_hierarchy_data()
print(f"Kit Hierarchy loaded: {len(KIT_LEVELS)} kits, Priority order: {len(PRODUCTION_PRIORITY_ORDER)} items")
def get_max_parallel_workers():
    """
    Return the maximum number of workers that can work on a line at the same
    time, preferring the Streamlit session value over the default.
    """
    try:
        import streamlit as st
        session = getattr(st, 'session_state', None)
        if session is not None and 'max_parallel_workers' in session:
            return session.max_parallel_workers
    except Exception as e:
        print(f"Could not get max parallel workers from session: {e}")
    # Fallback to default only if not configured by user
    return DefaultConfig.MAX_PARALLEL_WORKERS
# DO NOT load at import time - always call get_max_parallel_workers() dynamically
# MAX_PARALLEL_WORKERS = get_max_parallel_workers() # REMOVED - was causing stale data!
# Fixed staff constraint mode
# Options:
#   "mandatory" - Forces all fixed staff to work full hours every day (expensive, 99.7% waste)
#   "available" - Staff available up to limits but not forced (balanced approach)
#   "priority"  - Fixed staff used first, then temporary staff (realistic business model)
#   "none"      - Purely demand-driven scheduling (cost-efficient)
FIXED_STAFF_CONSTRAINT_MODE = "priority"  # Recommended: "priority" for realistic business model
def get_fixed_min_unicef_per_day():
    """
    Return the minimum number of UNICEF Fixed term employees required per
    working day, preferring the value configured in the Streamlit session.

    NOTE(review): an earlier definition of this function exists above with a
    dict-valued fallback; this later definition overrides it at import time.
    """
    try:
        import streamlit as st
    except ImportError:
        st = None  # Streamlit not available in CLI mode
    if st is not None and hasattr(st, 'session_state') and 'fixed_min_unicef_per_day' in st.session_state:
        print(f"Using fixed minimum UNICEF per day from config page: {st.session_state.fixed_min_unicef_per_day}")
        return st.session_state.fixed_min_unicef_per_day
    # Default value - minimum UNICEF Fixed term employees required per day
    return 2
# Set the constant for backward compatibility
# DO NOT load at import time - always call get_fixed_min_unicef_per_day() dynamically
# FIXED_MIN_UNICEF_PER_DAY = get_fixed_min_unicef_per_day() # REMOVED - was causing stale data!
def get_payment_mode_config():
    """
    Return the payment mode configuration, preferring Streamlit session state.

    Payment modes:
    - "bulk": If employee works any hours in shift, pay for full shift hours
    - "partial": Pay only for actual hours worked
    """
    try:
        # Prefer the config set on the Dataset Metadata page, if any
        import streamlit as st
        if hasattr(st, 'session_state') and 'payment_mode_config' in st.session_state:
            print(f"Using payment mode config from streamlit session: {st.session_state.payment_mode_config}")
            return st.session_state.payment_mode_config
    except Exception as e:
        print(f"Could not get payment mode config from streamlit session: {e}")
    # Default payment mode configuration
    print(f"Loading default payment mode configuration")
    return DefaultConfig.PAYMENT_MODE_CONFIG
# DO NOT load at import time - always call get_payment_mode_config() dynamically
# PAYMENT_MODE_CONFIG = get_payment_mode_config() # REMOVED - was causing stale data!
# ============================================================================
# INITIALIZE MODULE-LEVEL VARIABLES
# This section is at the end to ensure all functions are defined first
# ============================================================================
# DISABLED: Module-level initialization was causing infinite loops in Hugging Face deployment
# The functions are called dynamically when needed instead of at import time
# This prevents the infinite loop where importing this module triggers Streamlit session access
# which causes the app to reload, which imports this module again, etc.
# Initialize with default values (will use fallback data when no Streamlit session)
# PER_PRODUCT_SPEED = extract.read_package_speed_data()
# LINE_LIST = get_line_list()
# EMPLOYEE_TYPE_LIST = get_employee_type_list()
# SHIFT_LIST = get_active_shift_list()
# LINE_CNT_PER_TYPE = get_line_cnt_per_type()
# COST_LIST_PER_EMP_SHIFT = get_cost_list_per_emp_shift()
# MAX_EMPLOYEE_PER_TYPE_ON_DAY = get_max_employee_per_type_on_day()
# MAX_HOUR_PER_SHIFT_PER_PERSON = get_max_hour_per_shift_per_person()
# MAX_PARALLEL_WORKERS = get_max_parallel_workers()
# FIXED_MIN_UNICEF_PER_DAY = get_fixed_min_unicef_per_day()
# PAYMENT_MODE_CONFIG = get_payment_mode_config()
print("β Module-level configuration functions defined (variables initialized dynamically)")
# NOTE(review): since the initialization above is disabled, the module-level
# variables listed there do NOT exist until _ensure_fresh_config() is called.
# To get fresh values after changing Streamlit configuration, either:
# 1. Call the get_*() functions directly (RECOMMENDED for dynamic use)
# 2. Call _ensure_fresh_config() to refresh all module-level variables
# 3. Use importlib.reload() to reload the entire module