import pandas as pd
import src.etl.transform as transformed_data
import datetime
from datetime import timedelta
import src.etl.extract as extract
from src.config.constants import ShiftType, LineType, KitLevel, DefaultConfig
import importlib

# Reload the ETL modules so that edits made to them are picked up when this
# module is (re-)imported.
importlib.reload(extract)
importlib.reload(transformed_data)


def _planning_window_from_orders(start_date):
    """Derive the planning window from the orders read for *start_date*.

    Args:
        start_date: ``datetime.datetime`` marking the first planning day.

    Returns:
        Tuple ``(end_date, planning_days)``. The window ends at the latest
        "Basic finish date" found in the orders data. It falls back to a
        5-day window when the orders data is empty or cannot be read, and to
        a single day when no usable finish date lies on or after
        *start_date*.
    """
    default_window = (start_date + timedelta(days=4), 5)  # Default 5 days
    try:
        demand_data = extract.read_orders_data(start_date=start_date)
        if demand_data.empty:
            return default_window

        # Unique finish dates of the orders returned for this start date
        finish_dates = sorted(
            pd.to_datetime(demand_data["Basic finish date"]).dt.date.unique()
        )
        if not finish_dates:
            return start_date, 1

        end_date = datetime.datetime.combine(max(finish_dates), datetime.time.min)
        planning_days = (end_date - start_date).days + 1
        if planning_days < 1:
            # Latest finish date precedes the start date: keep a one-day
            # window rather than handing callers an empty date span.
            return start_date, 1
        return end_date, planning_days
    except Exception as e:
        print(f"Could not determine date range from data: {e}")
        return default_window


def get_date_span():
    """Return the planning horizon as ``(date_span, start_date, end_date)``.

    The start date is taken from ``st.session_state.start_date`` when
    Streamlit session state provides it. The number of planning days then
    comes from ``st.session_state.planning_days`` if set (truthy), otherwise
    it is derived from the orders data. If session state has no start date,
    or anything in that path raises, a fixed default week is returned.

    Returns:
        Tuple of
        * ``date_span`` - list of 1-based day indices ``[1, ..., planning_days]``
        * ``start_date`` - ``datetime.datetime`` at midnight
        * ``end_date`` - ``datetime.datetime`` at midnight
    """
    try:
        # Try to get from streamlit session state (from config page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'start_date' in st.session_state:
            start_date = datetime.datetime.combine(
                st.session_state.start_date, datetime.time.min
            )

            if 'planning_days' in st.session_state and st.session_state.planning_days:
                # int() keeps range() below working when the stored value is
                # a float or numeric string.
                planning_days = int(st.session_state.planning_days)
                end_date = start_date + timedelta(days=planning_days - 1)
            else:
                end_date, planning_days = _planning_window_from_orders(start_date)

            date_span = list(range(1, planning_days + 1))
            print(f"Using dates from config page: {start_date} to {end_date} ({planning_days} days)")
            print("date span", date_span)
            return date_span, start_date, end_date
    except Exception as e:
        print(f"Could not get dates from streamlit session: {e}")

    print("Loading default date values")
    # Default to match the user's data in COOIS_Released_Prod_Orders.csv
    return (
        list(range(1, 6)),  # Default 5 days
        datetime.datetime(2025, 7, 7),
        datetime.datetime(2025, 7, 11),
    )


# Fetch the dates from Streamlit or fall back to the default values. Both the
# Streamlit and the default values refer to the demand data
# (COOIS_Planned_and_Released.csv).
DATE_SPAN, start_date, end_date = get_date_span()

# Update global dates in extract module BEFORE any data loading
extract.set_global_dates(start_date, end_date)
print(f"\nšŸ“… DATE RANGE: {start_date} to {end_date}")
print(f"šŸ“ PRODUCT SOURCE: COOIS_Released_Prod_Orders.csv")


def get_product_list():
    """Return the product list to optimise, loaded fresh on every call.

    The shared demand filter is force-reloaded so that the result reflects
    the current Streamlit configs/dates. If that path raises for any reason,
    the unfiltered released-product list for the module-level ``start_date``
    is returned instead.

    Returns:
        The filtered product list, or the unfiltered fallback list.
    """
    try:
        # Imported lazily, inside the try, so an import failure also takes
        # the fallback path below.
        from src.demand_filtering import get_shared_filter_instance

        filter_instance = get_shared_filter_instance()
        # Force reload data to pick up new dates/configs
        filter_instance.load_data(force_reload=True)

        product_list = filter_instance.get_filtered_product_list()
        print(f"šŸ“¦ FRESH FILTERED PRODUCTS: {len(product_list)} products ready for optimization")
        print(f"šŸŽÆ Products: {product_list}")
        return product_list
    except Exception as e:
        print(f"Error loading dynamic product list: {e}")
        # Fallback to unfiltered list
        product_list = transformed_data.get_released_product_list(start_date)
        print(f"šŸ“¦ FALLBACK UNFILTERED PRODUCTS: {len(product_list)} products -> {product_list}")
        return product_list


# DO NOT load at import time - always call get_product_list() dynamically
# PRODUCT_LIST = get_product_list()  # REMOVED - was causing stale data!
def get_employee_type_list():
    """Get employee type list - try from streamlit session state first, then from data files.

    Returns:
        ``st.session_state.selected_employee_types`` when present; otherwise
        a list of the unique ``employment_type`` values in the employee data.
    """
    try:
        # Try to get from streamlit session state (from Dataset Metadata page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'selected_employee_types' in st.session_state:
            print(f"Using employee types from Dataset Metadata page: {st.session_state.selected_employee_types}")
            return st.session_state.selected_employee_types
    except Exception as e:
        print(f"Could not get employee types from streamlit session: {e}")

    # Default: load from data files
    print(f"Loading employee type list from data files")
    employee_df = extract.read_employee_data()
    # .tolist() so the fallback yields a plain list, as get_line_list() does
    return employee_df["employment_type"].unique().tolist()


EMPLOYEE_TYPE_LIST = get_employee_type_list()
# print("employee type list",EMPLOYEE_TYPE_LIST)


def get_shift_list():
    """Get shift list - try from streamlit session state first, then from data files.

    Returns:
        ``st.session_state.selected_shifts`` when present; otherwise a list
        of the unique ``id`` values in the shift info data.
    """
    try:
        # Try to get from streamlit session state (from Dataset Metadata page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'selected_shifts' in st.session_state:
            print(f"Using shifts from Dataset Metadata page: {st.session_state.selected_shifts}")
            return st.session_state.selected_shifts
    except Exception as e:
        print(f"Could not get shifts from streamlit session: {e}")

    # Default: load from data files
    print(f"Loading shift list from data files")
    shift_df = extract.get_shift_info()
    # .tolist() so the fallback yields a plain list, as get_line_list() does
    return shift_df["id"].unique().tolist()


# Evening shift activation mode - define early to avoid circular dependency
# Options:
#   "normal"           - Only use regular shift (1) and overtime shift (3) - NO evening shift
#   "activate_evening" - Allow evening shift (2) when demand is too high or cost-effective
#   "always_available" - Evening shift always available as option
EVENING_SHIFT_MODE = "normal"  # Default: only regular + overtime

# Evening shift activation threshold
# If demand cannot be met with regular + overtime, suggest evening shift activation
EVENING_SHIFT_DEMAND_THRESHOLD = 0.9  # Activate if regular+overtime capacity < 90% of demand


def get_active_shift_list():
    """Get the list of active shifts based on the EVENING_SHIFT_MODE setting.

    Returns:
        Every shift from ``get_shift_list()`` for the modes
        ``"activate_evening"`` and ``"always_available"``. For ``"normal"``
        and for any unrecognised mode, only the shifts contained in
        ``ShiftType.REGULAR_AND_OVERTIME``.
    """
    all_shifts = get_shift_list()

    if EVENING_SHIFT_MODE in ("activate_evening", "always_available"):
        # All shifts including evening (2)
        active_shifts = list(all_shifts)
        if EVENING_SHIFT_MODE == "activate_evening":
            print(f"[SHIFT MODE] Evening activated: Using all shifts {active_shifts}")
        else:
            print(f"[SHIFT MODE] Always available: Using all shifts {active_shifts}")
        return active_shifts

    # "normal" and unknown modes: regular and overtime shifts only - NO evening shift
    active_shifts = [s for s in all_shifts if s in ShiftType.REGULAR_AND_OVERTIME]
    if EVENING_SHIFT_MODE == "normal":
        print(f"[SHIFT MODE] Normal mode: Using shifts {active_shifts} (Regular + Overtime only, NO evening)")
    else:
        print(f"[SHIFT MODE] Unknown mode '{EVENING_SHIFT_MODE}', defaulting to normal: {active_shifts}")
    return active_shifts


SHIFT_LIST = get_active_shift_list()
# print("shift list",SHIFT_LIST)


def get_line_list():
    """Get line list - try from streamlit session state first, then from data files.

    Returns:
        ``st.session_state.selected_lines`` when present; otherwise a list of
        the unique ``id`` values in the packaging line data.
    """
    try:
        # Try to get from streamlit session state (from Dataset Metadata page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'selected_lines' in st.session_state:
            print(f"Using lines from Dataset Metadata page: {st.session_state.selected_lines}")
            return st.session_state.selected_lines
    except Exception as e:
        print(f"Could not get lines from streamlit session: {e}")

    # Default: load from data files
    print(f"Loading line list from data files")
    line_df = extract.read_packaging_line_data()
    return line_df["id"].unique().tolist()


LINE_LIST = get_line_list()
# print("line list",LINE_LIST)


def get_kit_line_match():
    """Map each kit name to the numeric ID of its line type.

    Reads the kit/line match data and converts the ``line_type`` column to
    ``LineType`` IDs:

    * non-blank strings are matched ignoring case, surrounding whitespace and
      ``_``/``-`` separators (so "Long_line", "long line" and "Long Line" all
      resolve to ``LineType.LONG_LINE``); unknown names print a warning and
      default to ``LineType.LONG_LINE``;
    * numeric, non-NaN values are converted with ``int()``;
    * missing or empty values are skipped, so those kits have no entry.

    Returns:
        Dict of kit name -> line type ID.
    """
    kit_line_match = extract.read_kit_line_match_data()
    raw_line_by_kit = kit_line_match.set_index("kit_name")["line_type"].to_dict()

    # Keys are in normalised form: lower case, words separated by one space.
    line_name_to_id = {
        "long line": LineType.LONG_LINE,
        "mini load": LineType.MINI_LOAD,
        "miniload": LineType.MINI_LOAD,  # Alternative naming (no space)
    }

    converted_dict = {}
    for kit, line_name in raw_line_by_kit.items():
        if isinstance(line_name, str) and line_name.strip():
            # Normalise so spelling variants do not silently fall back to
            # the default line.
            normalised = " ".join(
                line_name.replace("_", " ").replace("-", " ").lower().split()
            )
            line_id = line_name_to_id.get(normalised)
            if line_id is None:
                print(f"Warning: Unknown line type '{line_name}' for kit {kit}")
                # Default to long line if unknown
                line_id = LineType.LONG_LINE
            converted_dict[kit] = line_id
        elif isinstance(line_name, (int, float)) and not pd.isna(line_name):
            # Already numeric
            converted_dict[kit] = int(line_name)
        # Otherwise: missing or empty line type - skip (no production needed
        # for non-standalone masters); these kits won't have line assignments.

    return converted_dict


KIT_LINE_MATCH_DICT = get_kit_line_match()


def get_line_cnt_per_type():
    """Get the line count per line type - session state first, then data files.

    Returns:
        ``st.session_state.line_counts`` when present; otherwise a dict of
        packaging line ``id`` -> ``line_count`` built from the packaging line
        data.
    """
    try:
        # Try to get from streamlit session state (from config page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'line_counts' in st.session_state:
            print(f"Using line counts from config page: {st.session_state.line_counts}")
            return st.session_state.line_counts
    except Exception as e:
        print(f"Could not get line counts from streamlit session: {e}")

    print(f"Loading default line count values from data files")
    line_df = extract.read_packaging_line_data()
    line_cnt_per_type = line_df.set_index("id")["line_count"].to_dict()
    print("line cnt per type", line_cnt_per_type)
    return line_cnt_per_type


LINE_CNT_PER_TYPE = get_line_cnt_per_type()
print("line cnt per type",LINE_CNT_PER_TYPE)


def get_demand_dictionary(force_reload=False):
    """Get the filtered demand dictionary, loaded fresh on every call.

    IMPORTANT: This dynamically loads data to reflect current Streamlit
    configs/dates.

    Args:
        force_reload: Accepted for backward compatibility. It is not
            consulted: the shared filter is always force-reloaded.

    Returns:
        The filtered demand dictionary from the shared filter instance.

    Raises:
        Exception: If the demand dictionary cannot be loaded; the original
            error is attached as ``__cause__``.
    """
    try:
        # Always get fresh filtered demand to reflect current configs
        from src.demand_filtering import get_shared_filter_instance

        filter_instance = get_shared_filter_instance()
        # Force reload data to pick up new dates/configs
        filter_instance.load_data(force_reload=True)

        demand_dictionary = filter_instance.get_filtered_demand_dictionary()
        print(f"šŸ“ˆ FRESH FILTERED DEMAND: {len(demand_dictionary)} products with total demand {sum(demand_dictionary.values())}")
        print(f"šŸ”„ LOADED DYNAMICALLY: Reflects current Streamlit configs")
        return demand_dictionary
    except Exception as e:
        print(f"Error loading dynamic demand dictionary: {e}")
        # Chain the cause so the original traceback is not lost
        raise Exception("Demand dictionary not found with error:"+str(e)) from e


# DO NOT load at import time - always call get_demand_dictionary() dynamically
# DEMAND_DICTIONARY = get_demand_dictionary()  # REMOVED - was causing stale data!


def get_cost_list_per_emp_shift():
    """Get the cost table per employee type and shift.

    Returns:
        ``st.session_state.cost_list_per_emp_shift`` when present; otherwise
        ``DefaultConfig.DEFAULT_COST_RATES``.
    """
    try:
        # Try to get from streamlit session state (from config page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'cost_list_per_emp_shift' in st.session_state:
            print(f"Using cost list from config page: {st.session_state.cost_list_per_emp_shift}")
            return st.session_state.cost_list_per_emp_shift
    except Exception as e:
        print(f"Could not get cost list from streamlit session: {e}")

    print(f"Loading default cost values")
    # Default hourly rates - Important: multiple employment types with different costs
    return DefaultConfig.DEFAULT_COST_RATES


def shift_code_to_name():
    """Return ``ShiftType.get_all_names()``."""
    return ShiftType.get_all_names()


def line_code_to_name():
    """Convert line type IDs to readable names (``LineType.get_all_names()``)."""
    return LineType.get_all_names()


COST_LIST_PER_EMP_SHIFT = get_cost_list_per_emp_shift()
# print("cost list per emp shift",COST_LIST_PER_EMP_SHIFT)

# COST_LIST_PER_EMP_SHIFT = {  # WH_Workforce_Hourly_Pay_Scale
#     "Fixed": {1: 0, 2: 22, 3: 18},
#     "Humanizer": {1: 10, 2: 10, 3: 10},
# }


def get_team_requirements(product_list=None):
    """Extract team requirements from the personnel requirement (Kits Calculation) data.

    Args:
        product_list: Products to look up. When ``None``, a fresh list is
            obtained from ``get_product_list()``.

    Returns:
        Dict with the employee types ``"UNICEF Fixed term"`` and
        ``"Humanizer"`` as keys, each mapping product -> required head count
        (int). Products missing from the data get a requirement of 0 for
        both employee types.
    """
    if product_list is None:
        product_list = get_product_list()  # Get fresh product list

    # Read the kits calculation data directly
    kits_df = extract.read_personnel_requirement_data()
    print("kits_df columns:", kits_df.columns.tolist())
    print("kits_df head:", kits_df.head())

    # Initialize the team requirements dictionary
    team_req_dict = {
        "UNICEF Fixed term": {},
        "Humanizer": {},
    }

    # Process each product in the product list
    for product in product_list:
        print("product",product)
        print(f"Processing team requirements for product: {product}")
        product_data = kits_df[kits_df['Kit'] == product]
        print("product_data",product_data)

        if not product_data.empty:
            # Extract Humanizer and UNICEF staff requirements from the first matching row
            humanizer_req = product_data["Humanizer"].iloc[0]
            unicef_req = product_data["UNICEF staff"].iloc[0]

            # Convert to int (data is already cleaned in extract function)
            team_req_dict["Humanizer"][product] = int(humanizer_req)
            team_req_dict["UNICEF Fixed term"][product] = int(unicef_req)
        else:
            print(f"Warning: Product {product} not found in Kits Calculation data, setting requirements to 0")
            # Actually record the 0 announced above, so later lookups by
            # product do not raise KeyError.
            team_req_dict["Humanizer"][product] = 0
            team_req_dict["UNICEF Fixed term"][product] = 0

    return team_req_dict


# DO NOT load at import time - always call get_team_requirements() dynamically
# TEAM_REQ_PER_PRODUCT = get_team_requirements(PRODUCT_LIST)  # REMOVED - was causing stale data!
def get_max_employee_per_type_on_day():
    """Get the maximum number of employees per type for each planning day.

    Returns:
        ``st.session_state.max_employee_per_type_on_day`` when present;
        otherwise a dict keyed by employee type whose values map every day in
        ``DATE_SPAN`` to a fixed cap (8 for "UNICEF Fixed term", 10 for
        "Humanizer").
    """
    try:
        # Try to get from streamlit session state (from config page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'max_employee_per_type_on_day' in st.session_state:
            print(f"Using max employee counts from config page: {st.session_state.max_employee_per_type_on_day}")
            return st.session_state.max_employee_per_type_on_day
    except Exception as e:
        print(f"Could not get max employee counts from streamlit session: {e}")

    print(f"Loading default max employee values")
    max_employee_per_type_on_day = {
        "UNICEF Fixed term": {t: 8 for t in DATE_SPAN},
        "Humanizer": {t: 10 for t in DATE_SPAN},
    }
    return max_employee_per_type_on_day


MAX_EMPLOYEE_PER_TYPE_ON_DAY = get_max_employee_per_type_on_day()
print("max employee per type on day",MAX_EMPLOYEE_PER_TYPE_ON_DAY)

# available employee but for fixed in shift 1, it is mandatory employment
MAX_HOUR_PER_PERSON_PER_DAY = 14  # legal standard


def get_max_hour_per_shift_per_person():
    """Get max hours per shift per person - checks Streamlit session state first.

    Returns:
        ``st.session_state.max_hour_per_shift_per_person`` when present;
        otherwise ``DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON``.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state') and 'max_hour_per_shift_per_person' in st.session_state:
            return st.session_state.max_hour_per_shift_per_person
    except Exception as e:
        print(f"Could not get max hours per shift from session: {e}")

    # Fallback to default only if not configured by user
    return DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON


MAX_HOUR_PER_SHIFT_PER_PERSON = get_max_hour_per_shift_per_person()

# Removed unnecessary getter functions - use direct imports instead:
# - MAX_HOUR_PER_PERSON_PER_DAY
# - MAX_HOUR_PER_SHIFT_PER_PERSON
# - KIT_LINE_MATCH_DICT
# - MAX_PARALLEL_WORKERS
# - DAILY_WEEKLY_SCHEDULE
# - EVENING_SHIFT_MODE

# Keep these complex getters that access DefaultConfig or have complex logic:


def get_evening_shift_demand_threshold():
    """Get evening shift demand threshold - checks Streamlit session state first.

    Returns:
        ``st.session_state.evening_shift_demand_threshold`` when present;
        otherwise ``DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD`` if that
        attribute exists, else 10000.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state') and 'evening_shift_demand_threshold' in st.session_state:
            return st.session_state.evening_shift_demand_threshold
    except Exception as e:
        print(f"Could not get evening shift threshold from session: {e}")

    # Fallback to default only if not configured by user.
    # NOTE(review): this module also defines EVENING_SHIFT_DEMAND_THRESHOLD = 0.9
    # (a capacity/demand ratio); the 10000 used here looks like a different
    # unit - confirm which one callers expect.
    return getattr(DefaultConfig, 'EVENING_SHIFT_DEMAND_THRESHOLD', 10000)


# get_fixed_min_unicef_per_day() is defined once, further below, next to
# FIXED_MIN_UNICEF_PER_DAY.


def get_per_product_speed():
    """Get the per-product speed - session state first, then data files.

    Returns:
        ``st.session_state.per_product_speed`` when present; otherwise the
        result of ``extract.read_package_speed_data()``.
    """
    try:
        # Try to get from streamlit session state (from config page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'per_product_speed' in st.session_state:
            print(f"Using per product speed from config page: {st.session_state.per_product_speed}")
            return st.session_state.per_product_speed
    except Exception as e:
        print(f"Could not get per product speed from streamlit session: {e}")

    print(f"Loading default per product speed from data files")
    per_product_speed = extract.read_package_speed_data()
    return per_product_speed


# Get per product speed - will use actual product names from PRODUCT_LIST
PER_PRODUCT_SPEED = get_per_product_speed()
# number of products that can be produced per hour per line
# This information is critical and it should not rely on the productivity information


# ---- Kit Hierarchy for Production Ordering ----
def get_kit_hierarchy_data():
    """Return ``(kit_levels, dependencies, priority_order)`` from the extract module.

    The three values are passed through unchanged from
    ``extract.get_production_order_data()``.
    """
    # Future extension: allow Streamlit to supply the hierarchy data first.
    kit_levels, dependencies, priority_order = extract.get_production_order_data()
    return kit_levels, dependencies, priority_order


KIT_LEVELS, KIT_DEPENDENCIES, PRODUCTION_PRIORITY_ORDER = get_kit_hierarchy_data()
print(f"Kit Hierarchy loaded: {len(KIT_LEVELS)} kits, Priority order: {len(PRODUCTION_PRIORITY_ORDER)} items")


def get_max_parallel_workers():
    """Get max parallel workers - checks Streamlit session state first.

    Returns:
        ``st.session_state.max_parallel_workers`` when present; otherwise
        ``DefaultConfig.MAX_PARALLEL_WORKERS``.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state') and 'max_parallel_workers' in st.session_state:
            return st.session_state.max_parallel_workers
    except Exception as e:
        print(f"Could not get max parallel workers from session: {e}")

    # Fallback to default only if not configured by user
    return DefaultConfig.MAX_PARALLEL_WORKERS


MAX_PARALLEL_WORKERS = get_max_parallel_workers()
# maximum number of workers that can work on a line at the same time

# "daily" or "weekly". TODO: this still needs to be implemented in the
# "if F_x1_day is not None... F_x1_week is not None..." logic; x1 also needs
# to be renamed to Fixedstaff_first_shift.
DAILY_WEEKLY_SCHEDULE = "daily"

# Fixed staff constraint mode
# Options:
#   "mandatory" - Forces all fixed staff to work full hours every day (expensive, 99.7% waste)
#   "available" - Staff available up to limits but not forced (balanced approach)
#   "priority"  - Fixed staff used first, then temporary staff (realistic business model)
#   "none"      - Purely demand-driven scheduling (cost-efficient)
FIXED_STAFF_CONSTRAINT_MODE = "priority"  # Recommended: "priority" for realistic business model


def get_fixed_min_unicef_per_day():
    """Get fixed minimum UNICEF employees per day - session state first, then default.

    This ensures a minimum number of UNICEF fixed-term staff are present
    every working day.

    Returns:
        ``st.session_state.fixed_min_unicef_per_day`` when present;
        otherwise 2.
    """
    # NOTE(review): this module used to define this function twice; the
    # earlier copy was shadowed by this one before any call and fell back to
    # getattr(DefaultConfig, 'FIXED_MIN_UNICEF_PER_DAY', {1: 1, 2: 1, 3: 1, 4: 1, 5: 1}).
    # The default that was actually in effect (2) is kept - confirm which
    # default is intended.
    try:
        import streamlit as st

        if hasattr(st, 'session_state') and 'fixed_min_unicef_per_day' in st.session_state:
            print(f"Using fixed minimum UNICEF per day from config page: {st.session_state.fixed_min_unicef_per_day}")
            return st.session_state.fixed_min_unicef_per_day
    except ImportError:
        pass  # Streamlit not available in CLI mode
    except Exception as e:
        # Same best-effort handling as the other session-state getters
        print(f"Could not get fixed min UNICEF from session: {e}")

    # Default value - minimum UNICEF Fixed term employees required per day
    return 2


# Set the constant for backward compatibility
FIXED_MIN_UNICEF_PER_DAY = get_fixed_min_unicef_per_day()


def get_payment_mode_config():
    """Get payment mode configuration - session state first, then default values.

    Payment modes:
    - "bulk": If employee works any hours in shift, pay for full shift hours
    - "partial": Pay only for actual hours worked

    Returns:
        ``st.session_state.payment_mode_config`` when present; otherwise
        ``DefaultConfig.PAYMENT_MODE_CONFIG``.
    """
    try:
        # Try to get from streamlit session state (from Dataset Metadata page)
        import streamlit as st

        if hasattr(st, 'session_state') and 'payment_mode_config' in st.session_state:
            print(f"Using payment mode config from streamlit session: {st.session_state.payment_mode_config}")
            return st.session_state.payment_mode_config
    except Exception as e:
        print(f"Could not get payment mode config from streamlit session: {e}")

    # Default payment mode configuration
    print(f"Loading default payment mode configuration")
    payment_mode_config = DefaultConfig.PAYMENT_MODE_CONFIG
    return payment_mode_config


PAYMENT_MODE_CONFIG = get_payment_mode_config()
print("Payment mode configuration:", PAYMENT_MODE_CONFIG)