| import pandas as pd |
| import src.etl.transform as transformed_data |
| import datetime |
| from datetime import timedelta |
| import src.etl.extract as extract |
|
|
| |
| import importlib |
|
|
| |
# Hot-reload the ETL modules so repeated runs (e.g. from a notebook or a
# Streamlit session) pick up source edits without restarting the process.
importlib.reload(extract)
importlib.reload(transformed_data)
|
|
|
|
def get_date_span(start=None, end=None):
    """Return the planning horizon as (day_indices, start_date, end_date).

    Parameters
    ----------
    start, end : datetime, optional
        Inclusive bounds of the horizon. Default to the hard-coded test
        window 2025-07-07 .. 2025-07-11, preserving previous behavior.

    Returns
    -------
    tuple[list[int], datetime, datetime]
        1-based day indices covering the range, plus the range bounds.
    """
    from datetime import datetime

    print("FORCING NEW DATE RANGE FOR TESTING")

    if start is None:
        start = datetime(2025, 7, 7)
    if end is None:
        end = datetime(2025, 7, 11)

    # +2 because the range is inclusive of both endpoints and the day
    # indices are 1-based (5-day window -> [1, 2, 3, 4, 5]).
    day_indices = list(range(1, (end - start).days + 2))
    return day_indices, start, end
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
# Resolve the planning horizon once at import time; downstream config
# (demand, staffing caps) is keyed on this range.
DATE_SPAN, start_date, end_date = get_date_span()

# Bug fix: the first print's string literal was split across two lines by a
# garbled (mojibake) character, producing a syntax error; the mojibake
# prefixes on the other prints are dropped as well.
print(f"\nDATE RANGE: {start_date} to {end_date}")
print("PRODUCT SOURCE: COOIS_Released_Prod_Orders.csv")
PRODUCT_LIST = transformed_data.get_released_product_list(start_date, end_date)
print(f"PRODUCTS FOUND: {len(PRODUCT_LIST)} products -> {PRODUCT_LIST}")
|
|
|
|
def get_employee_type_list():
    """Get employee type list - try from streamlit session state first, then from data files"""
    try:
        # Prefer the user's selection from the Dataset Metadata page when
        # running under Streamlit; outside Streamlit this import raises and
        # we fall through to the data files.
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_employee_types' in st.session_state:
            print(f"Using employee types from Dataset Metadata page: {st.session_state.selected_employee_types}")
            return st.session_state.selected_employee_types
    except Exception as e:
        print(f"Could not get employee types from streamlit session: {e}")

    # Fallback: distinct employment types from the employee master data.
    print(f"Loading employee type list from data files")
    employee_df = extract.read_employee_data()
    # .tolist() for consistency with get_line_list(): return a plain list
    # rather than a numpy array (the session-state path returns a list too).
    return employee_df["employment_type"].unique().tolist()


EMPLOYEE_TYPE_LIST = get_employee_type_list()
| |
|
|
def get_shift_list():
    """Get shift list - try from streamlit session state first, then from data files"""
    try:
        # Prefer the user's selection when running under Streamlit; the
        # import fails outside Streamlit and we fall back to data files.
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_shifts' in st.session_state:
            print(f"Using shifts from Dataset Metadata page: {st.session_state.selected_shifts}")
            return st.session_state.selected_shifts
    except Exception as e:
        print(f"Could not get shifts from streamlit session: {e}")

    # Fallback: distinct shift ids from the shift master data.
    print(f"Loading shift list from data files")
    shift_df = extract.get_shift_info()
    # .tolist() for consistency with get_line_list(): plain list, not ndarray.
    return shift_df["id"].unique().tolist()
|
|
| |
| |
| |
| |
| |
# How the evening shift is made available to the scheduler:
#   "normal"           -> only shifts 1 and 2 are schedulable
#   "activate_evening" -> all shifts are schedulable
#   "always_available" -> all shifts are schedulable
# Any other value falls back to "normal" behavior.
EVENING_SHIFT_MODE = "normal"


# NOTE(review): not referenced anywhere visible in this file — presumably a
# demand ratio above which the evening shift should activate; confirm usage.
EVENING_SHIFT_DEMAND_THRESHOLD = 0.9
|
|
def get_active_shift_list():
    """
    Get the list of active shifts based on EVENING_SHIFT_MODE setting.
    """
    all_shifts = get_shift_list()
    # Shifts 1 and 2 are always schedulable; whether the rest are included
    # depends on the configured mode.
    regular_shifts = [s for s in all_shifts if s in [1, 2]]

    if EVENING_SHIFT_MODE == "normal":
        active_shifts = regular_shifts
        print(f"[SHIFT MODE] Normal mode: Using shifts {active_shifts} (Regular + Overtime only)")
    elif EVENING_SHIFT_MODE == "activate_evening":
        active_shifts = list(all_shifts)
        print(f"[SHIFT MODE] Evening activated: Using all shifts {active_shifts}")
    elif EVENING_SHIFT_MODE == "always_available":
        active_shifts = list(all_shifts)
        print(f"[SHIFT MODE] Always available: Using all shifts {active_shifts}")
    else:
        # Unrecognized mode: behave exactly like "normal".
        active_shifts = regular_shifts
        print(f"[SHIFT MODE] Unknown mode '{EVENING_SHIFT_MODE}', defaulting to normal: {active_shifts}")

    return active_shifts


SHIFT_LIST = get_active_shift_list()
| |
|
|
|
|
def get_line_list():
    """Get line list - try from streamlit session state first, then from data files"""
    try:
        # Session-state selection wins when running under Streamlit.
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_lines' in st.session_state:
            print(f"Using lines from Dataset Metadata page: {st.session_state.selected_lines}")
            return st.session_state.selected_lines
    except Exception as e:
        print(f"Could not get lines from streamlit session: {e}")

    # Fallback: distinct packaging-line ids from the line master data.
    print(f"Loading line list from data files")
    packaging_lines = extract.read_packaging_line_data()
    return packaging_lines["id"].unique().tolist()


LINE_LIST = get_line_list()
| |
|
|
|
|
def get_kit_line_match():
    """Map each kit name to the numeric packaging-line id it must run on.

    The kit/line match file stores the line either as a human-readable
    label (e.g. "long line", "Mini_load") or directly as a numeric id.
    Labels are normalized (case- and underscore-insensitive) before lookup,
    which generalizes the previous four hard-coded spellings while keeping
    their results identical. Unknown or missing values fall back to line 6.

    Returns
    -------
    dict[str, int]
        kit name -> packaging line id.
    """
    kit_line_match = extract.read_kit_line_match_data()
    kit_line_match_dict = kit_line_match.set_index("kit_name")["line_type"].to_dict()

    # Canonical label -> line id; lookups go through a normalized key so
    # variants like "Long_line", "long line" or "MINI LOAD" all resolve.
    line_name_to_id = {
        "long line": 6,
        "mini load": 7,
    }

    DEFAULT_LINE_ID = 6  # fallback when the line type is unknown or missing

    converted_dict = {}
    for kit, line_name in kit_line_match_dict.items():
        if isinstance(line_name, str) and line_name.strip():
            normalized = line_name.strip().replace("_", " ").lower()
            line_id = line_name_to_id.get(normalized)
            if line_id is not None:
                converted_dict[kit] = line_id
            else:
                print(f"Warning: Unknown line type '{line_name}' for kit {kit}")
                converted_dict[kit] = DEFAULT_LINE_ID
        elif isinstance(line_name, (int, float)) and not pd.isna(line_name):
            # Already stored as a numeric id in the source file.
            converted_dict[kit] = int(line_name)
        else:
            # NaN / empty value: assume the default line.
            converted_dict[kit] = DEFAULT_LINE_ID

    return converted_dict


KIT_LINE_MATCH_DICT = get_kit_line_match()
|
|
|
|
def get_line_cnt_per_type():
    """Return {line-type id: number of physical lines} for the scheduler."""
    # NOTE(review): `dashboard` is never imported in this module, so this
    # attribute access raises NameError and the fallback below always runs —
    # confirm whether a dashboard module was meant to be imported.
    try:
        streamlit_line_cnt_per_type = dashboard.line_cnt_per_type
        return streamlit_line_cnt_per_type
    except Exception as e:
        print(f"using default value for line cnt per type")
        # Fallback: line counts per line-type id from the packaging-line data.
        line_df = extract.read_packaging_line_data()
        line_cnt_per_type = line_df.set_index("id")["line_count"].to_dict()
        print("line cnt per type")
        print(line_cnt_per_type)
        return line_cnt_per_type


LINE_CNT_PER_TYPE = get_line_cnt_per_type()
print("line cnt per type",LINE_CNT_PER_TYPE)
|
|
def get_demand_dictionary():
    """Return total released-order quantity per material number for the
    module-level [start_date, end_date] window."""
    # NOTE(review): `dashboard` is undefined in this module, so the except
    # branch (CSV-derived demand) always runs — confirm intent.
    try:
        streamlit_demand_dictionary = dashboard.demand_dictionary
        return streamlit_demand_dictionary
    except Exception as e:
        # Aggregate released production orders into {material: total qty}.
        demand_df = extract.read_released_orders_data(start_date=start_date, end_date=end_date)
        demand_dictionary = demand_df.groupby('Material Number')["Order quantity (GMEIN)"].sum().to_dict()
        print(f"π DEMAND DATA: {len(demand_dictionary)} products with total demand {sum(demand_dictionary.values())}")
        return demand_dictionary


DEMAND_DICTIONARY = get_demand_dictionary()
print(f"π― FINAL DEMAND: {DEMAND_DICTIONARY}")
|
|
def get_cost_list_per_emp_shift():
    """Return hourly cost per employee type per shift id (1/2/3)."""
    # NOTE(review): `dashboard` is undefined here, so the fallback always runs.
    try:
        streamlit_cost_list_per_emp_shift = dashboard.cost_list_per_emp_shift
        return streamlit_cost_list_per_emp_shift
    except Exception as e:
        print(f"using default value for cost list per emp shift")
        # NOTE(review): the shift-cost file is read but its result is
        # discarded — the hard-coded rates below are returned instead.
        # Confirm whether the CSV values were meant to be used.
        shift_cost_df = extract.read_shift_cost_data()
        # Hourly rates: shift 3 carries the premium rate for both types.
        return {"UNICEF Fixed term":{1:43.27,2:43.27,3:64.91},"Humanizer":{1:27.94,2:27.94,3:41.91}}
|
|
def shift_code_to_name():
    """Return the mapping from numeric shift code to its display name."""
    return {
        1: "normal",
        2: "evening",
        3: "overtime",
    }
|
|
# Hourly cost per employee type and shift, resolved once at import time.
COST_LIST_PER_EMP_SHIFT = get_cost_list_per_emp_shift()
| |
| |
|
|
|
|
| |
| |
| |
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_team_requirements(PRODUCT_LIST):
    """
    Extract team requirements from Kits Calculation CSV.

    Parameters
    ----------
    PRODUCT_LIST : iterable of str
        Kit names to look up in the Kits Calculation file.

    Returns
    -------
    dict[str, dict[str, int]]
        Employee type -> {product: required headcount}. Every product in
        PRODUCT_LIST is present; unknown products get 0.
    """
    # Note: the original `try: pass / except` wrapper was removed — its
    # handler could never run because a bare `pass` cannot raise, so the
    # CSV extraction below always executed anyway.
    kits_path = "data/real_data_excel/converted_csv/Kits__Calculation.csv"
    kits_df = pd.read_csv(kits_path)

    team_req_dict = {
        "UNICEF Fixed term": {},
        "Humanizer": {},
    }

    for product in PRODUCT_LIST:
        print(f"Processing team requirements for product: {product}")
        product_data = kits_df[kits_df['Kit'] == product]

        if not product_data.empty:
            humanizer_req = product_data["Humanizer"].iloc[0]
            unicef_req = product_data["UNICEF staff"].iloc[0]
            # NaN cells count as "no staff of this type required".
            team_req_dict["Humanizer"][product] = int(humanizer_req) if pd.notna(humanizer_req) else 0
            team_req_dict["UNICEF Fixed term"][product] = int(unicef_req) if pd.notna(unicef_req) else 0
        else:
            print(f"Warning: Product {product} not found in Kits Calculation data, setting requirements to 0")
            # Bug fix: the warning promised zeros but the original never
            # stored them, leaving unknown products missing from the dicts.
            team_req_dict["Humanizer"][product] = 0
            team_req_dict["UNICEF Fixed term"][product] = 0

    return team_req_dict
|
|
# Resolve staffing requirements for the released products once at import time.
TEAM_REQ_PER_PRODUCT = get_team_requirements(PRODUCT_LIST)
print("team requirements per product:", TEAM_REQ_PER_PRODUCT)
|
|
|
|
def get_max_employee_per_type_on_day():
    """Return {employee type: {day index: max headcount}} for the horizon."""
    try:
        return dashboard.max_employee_per_type_on_day
    except Exception as e:
        print(f"using default value for max employee per type on day")
        # Default daily caps: 8 UNICEF fixed-term staff and 10 Humanizers
        # on every day of the planning horizon.
        default_caps = {"UNICEF Fixed term": 8, "Humanizer": 10}
        return {
            emp_type: {t: cap for t in DATE_SPAN}
            for emp_type, cap in default_caps.items()
        }


MAX_EMPLOYEE_PER_TYPE_ON_DAY = get_max_employee_per_type_on_day()
print("max employee per type on day",MAX_EMPLOYEE_PER_TYPE_ON_DAY)
|
|
| |
|
|
# Hard daily cap on total working hours for any single employee.
MAX_HOUR_PER_PERSON_PER_DAY = 14
# Maximum hours one person may work within each shift id
# (shifts 1 and 2: 7.5 h; shift 3: 5 h).
MAX_HOUR_PER_SHIFT_PER_PERSON = {1: 7.5, 2: 7.5, 3: 5}
def get_per_product_speed():
    """Return packaging speed data per product."""
    # NOTE(review): `dashboard` is undefined in this module, so the except
    # fallback (speeds from the data files) always runs — confirm intent.
    try:
        streamlit_per_product_speed = dashboard.per_product_speed
        return streamlit_per_product_speed
    except Exception as e:
        print(f"using default value for per product speed")
        # Fallback: speeds from the package-speed data file.
        per_product_speed = extract.read_package_speed_data()
        return per_product_speed


PER_PRODUCT_SPEED = get_per_product_speed()
| |
| |
|
|
| |
def get_kit_hierarchy_data():
    """Load the kit hierarchy from the production-order data.

    Returns
    -------
    tuple
        (kit_levels, dependencies, priority_order) exactly as produced by
        extract.get_production_order_data().
    """
    # Note: the original `try: pass / except` wrapper was removed — its
    # handler (and the print inside it) could never execute because a bare
    # `pass` cannot raise, so extraction always ran. Behavior is unchanged.
    kit_levels, dependencies, priority_order = extract.get_production_order_data()
    return kit_levels, dependencies, priority_order


KIT_LEVELS, KIT_DEPENDENCIES, PRODUCTION_PRIORITY_ORDER = get_kit_hierarchy_data()
print(f"Kit Hierarchy loaded: {len(KIT_LEVELS)} kits, Priority order: {len(PRODUCTION_PRIORITY_ORDER)} items")
|
|
# Maximum number of workers that can operate in parallel on each
# packaging line type (keys are line ids as used in KIT_LINE_MATCH_DICT).
MAX_PARALLEL_WORKERS = {
    6: 15,
    7: 15,
}


# Scheduling granularity switch; "daily" is the only value set here —
# other accepted values (if any) are defined by the consuming scheduler.
DAILY_WEEKLY_SCHEDULE = "daily"


# How fixed-staff constraints are applied by the scheduler.
# NOTE(review): "priority" is the only value visible in this file; the full
# set of accepted modes lives in the consuming code — confirm there.
FIXED_STAFF_CONSTRAINT_MODE = "priority"
|
|
|
|
def get_payment_mode_config():
    """
    Get payment mode configuration - try from streamlit session state first, then default values
    Payment modes:
    - "bulk": If employee works any hours in shift, pay for full shift hours
    - "partial": Pay only for actual hours worked
    """
    try:
        # A session-state override wins when running under Streamlit.
        import streamlit as st
        session = getattr(st, 'session_state', None)
        if session is not None and 'payment_mode_config' in session:
            print(f"Using payment mode config from streamlit session: {st.session_state.payment_mode_config}")
            return st.session_state.payment_mode_config
    except Exception as e:
        print(f"Could not get payment mode config from streamlit session: {e}")

    # Fallback: bulk payment for shifts 1 and 2, partial for shift 3.
    print(f"Loading default payment mode configuration")
    return {1: "bulk", 2: "bulk", 3: "partial"}


PAYMENT_MODE_CONFIG = get_payment_mode_config()
print("Payment mode configuration:", PAYMENT_MODE_CONFIG)
|
|
|
|