# supply-roster-optimization / src / config / optimization_config.py
# Author: HaLim — "Finalize optimizer with shift type / partial & bulk work type etc" (commit 42b5ea5)
# Data wrangling, project ETL modules, and date utilities
import pandas as pd
import src.etl.transform as transformed_data
import datetime
from datetime import timedelta
import src.etl.extract as extract
# Re-import all the packages
import importlib
# Reload modules to get latest changes (hot-reload during interactive development)
importlib.reload(extract)
importlib.reload(transformed_data) # Uncomment if needed
def get_date_span():
    """
    Return the planning window as (day_indices, start_date, end_date).

    day_indices is a 1-based list with one entry per day in the window,
    so a Mon-Fri window yields [1, 2, 3, 4, 5].

    NOTE(review): the dashboard-driven date selection was disabled for
    testing; this currently always returns the fixed
    2025-07-07..2025-07-11 window that matches the demand data in
    COOIS_Released_Prod_Orders.csv. Restore user-driven dates before
    production use.
    """
    print(f"πŸ”§ FORCING NEW DATE RANGE FOR TESTING")
    # Force use new date range to match user's data
    from datetime import datetime
    return list(range(1, 6)), datetime(2025, 7, 7), datetime(2025, 7, 11)  # Force 5 days for user's data
#fetch date from streamlit or default value. The streamlit and default references the demand data (COOIS_Planned_and_Released.csv)
# Module-level planning horizon: DATE_SPAN is a list of 1-based day indices;
# start_date/end_date bound the demand window used by the getters below.
DATE_SPAN, start_date, end_date = get_date_span()
print(f"\nπŸ“… DATE RANGE: {start_date} to {end_date}")
print(f"πŸ“ PRODUCT SOURCE: COOIS_Released_Prod_Orders.csv")
# Products (kits) that have released production orders inside the date window
PRODUCT_LIST = transformed_data.get_released_product_list(start_date, end_date)
print(f"πŸ“¦ PRODUCTS FOUND: {len(PRODUCT_LIST)} products -> {PRODUCT_LIST}")
def get_employee_type_list():
    """Get employee type list - try from streamlit session state first, then from data files.

    Returns a plain list of employment-type labels. (Previously the fallback
    returned a numpy array from .unique(); converted with .tolist() for
    consistency with get_line_list.)
    """
    try:
        # Try to get from streamlit session state (from Dataset Metadata page)
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_employee_types' in st.session_state:
            print(f"Using employee types from Dataset Metadata page: {st.session_state.selected_employee_types}")
            return st.session_state.selected_employee_types
    except Exception as e:
        print(f"Could not get employee types from streamlit session: {e}")
    # Default: load from data files
    print(f"Loading employee type list from data files")
    employee_type_list = extract.read_employee_data()
    return employee_type_list["employment_type"].unique().tolist()
# Employment types the optimizer schedules (e.g. fixed-term vs temporary staff)
EMPLOYEE_TYPE_LIST = get_employee_type_list()
# print("employee type list",EMPLOYEE_TYPE_LIST)
def get_shift_list():
    """Get shift list - try from streamlit session state first, then from data files.

    Returns a plain list of shift ids. (Previously the fallback returned a
    numpy array from .unique(); converted with .tolist() for consistency
    with get_line_list.)
    """
    try:
        # Try to get from streamlit session state (from Dataset Metadata page)
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_shifts' in st.session_state:
            print(f"Using shifts from Dataset Metadata page: {st.session_state.selected_shifts}")
            return st.session_state.selected_shifts
    except Exception as e:
        print(f"Could not get shifts from streamlit session: {e}")
    # Default: load from data files
    print(f"Loading shift list from data files")
    shift_info = extract.get_shift_info()
    return shift_info["id"].unique().tolist()
# Evening shift activation mode - define early to avoid circular dependency
# Options:
# "normal" - Only use regular shift (1) and overtime shift (2)
# "activate_evening" - Allow evening shift (3) when demand is too high or cost-effective
# "always_available" - Evening shift always available as option
EVENING_SHIFT_MODE = "normal" # Default: only regular + overtime
# Evening shift activation threshold
# If demand cannot be met with regular + overtime, suggest evening shift activation
# NOTE(review): this threshold is defined but not referenced anywhere in this file - confirm it is consumed elsewhere.
EVENING_SHIFT_DEMAND_THRESHOLD = 0.9 # Activate if regular+overtime capacity < 90% of demand
def get_active_shift_list():
    """
    Return the shift ids enabled under the current EVENING_SHIFT_MODE setting.

    "normal" (and any unrecognized mode) keeps only shifts 1 and 2;
    "activate_evening" and "always_available" allow every shift.
    """
    all_shifts = get_shift_list()
    mode = EVENING_SHIFT_MODE
    if mode == "activate_evening":
        active_shifts = list(all_shifts)
        print(f"[SHIFT MODE] Evening activated: Using all shifts {active_shifts}")
    elif mode == "always_available":
        active_shifts = list(all_shifts)
        print(f"[SHIFT MODE] Always available: Using all shifts {active_shifts}")
    else:
        # Both "normal" and unknown modes restrict to regular (1) + overtime (2)
        active_shifts = [shift for shift in all_shifts if shift in (1, 2)]
        if mode == "normal":
            print(f"[SHIFT MODE] Normal mode: Using shifts {active_shifts} (Regular + Overtime only)")
        else:
            print(f"[SHIFT MODE] Unknown mode '{EVENING_SHIFT_MODE}', defaulting to normal: {active_shifts}")
    return active_shifts
# Shifts actually offered to the optimizer (filtered by EVENING_SHIFT_MODE)
SHIFT_LIST = get_active_shift_list()
# print("shift list",SHIFT_LIST)
def get_line_list():
    """Get line list - try from streamlit session state first, then from data files"""
    try:
        # Prefer the user's selection made on the Dataset Metadata page
        import streamlit as st
        session_has_lines = hasattr(st, 'session_state') and 'selected_lines' in st.session_state
        if session_has_lines:
            print(f"Using lines from Dataset Metadata page: {st.session_state.selected_lines}")
            return st.session_state.selected_lines
    except Exception as e:
        print(f"Could not get lines from streamlit session: {e}")
    # Fallback: read the packaging-line master data
    print(f"Loading line list from data files")
    packaging_lines = extract.read_packaging_line_data()
    return packaging_lines["id"].unique().tolist()
# Packaging line ids available for scheduling
LINE_LIST = get_line_list()
# print("line list",LINE_LIST)
def get_kit_line_match(kit_line_match=None):
    """
    Map each kit name to the numeric id of the line it runs on.

    Args:
        kit_line_match: optional DataFrame with 'kit_name' and 'line_type'
            columns; when None (the default, backward compatible), the data
            is loaded via extract.read_kit_line_match_data().

    Returns:
        dict of {kit_name: line_id} where 6 = long line, 7 = mini load.
        Unknown or missing line types default to the long line (6).
    """
    if kit_line_match is None:
        kit_line_match = extract.read_kit_line_match_data()
    kit_line_match_dict = kit_line_match.set_index("kit_name")["line_type"].to_dict()
    # Canonical (lower-case, space-separated) line names -> numeric ids.
    # Lookup is normalized below, so "Long_line", "MINI LOAD", etc. all match
    # without enumerating every spelling variant.
    line_name_to_id = {
        "long line": 6,
        "mini load": 7,
    }
    converted_dict = {}
    for kit, line_name in kit_line_match_dict.items():
        if isinstance(line_name, str) and line_name.strip():
            # Normalize case and underscores before the id lookup
            canonical = line_name.strip().lower().replace("_", " ")
            line_id = line_name_to_id.get(canonical)
            if line_id is not None:
                converted_dict[kit] = line_id
            else:
                print(f"Warning: Unknown line type '{line_name}' for kit {kit}")
                # Default to long line if unknown
                converted_dict[kit] = 6
        elif isinstance(line_name, (int, float)) and not pd.isna(line_name):
            # Already numeric
            converted_dict[kit] = int(line_name)
        else:
            # Missing or empty line type - default to long line
            converted_dict[kit] = 6
    return converted_dict
# {kit_name: numeric line id} constraining which line each kit can run on
KIT_LINE_MATCH_DICT = get_kit_line_match()
def get_line_cnt_per_type():
    """Return {line id: number of physical lines of that type}."""
    try:
        # NOTE(review): `dashboard` is not defined/imported in this module,
        # so this lookup raises NameError and the fallback below always runs.
        return dashboard.line_cnt_per_type
    except Exception as e:
        print(f"using default value for line cnt per type")
        packaging_lines = extract.read_packaging_line_data()
        counts_by_line = packaging_lines.set_index("id")["line_count"].to_dict()
        print("line cnt per type")
        print(counts_by_line)
        return counts_by_line
# {line id: count of physical lines of that type}
LINE_CNT_PER_TYPE = get_line_cnt_per_type()
print("line cnt per type",LINE_CNT_PER_TYPE)
def get_demand_dictionary():
    """Return {material number: total released order quantity} for the planning window."""
    try:
        # NOTE(review): `dashboard` is not defined/imported here, so this
        # normally raises NameError and the released-orders fallback runs.
        return dashboard.demand_dictionary
    except Exception as e:
        # Use released orders instead of planned orders for demand
        released = extract.read_released_orders_data(start_date=start_date, end_date=end_date)
        totals = released.groupby('Material Number')["Order quantity (GMEIN)"].sum().to_dict()
        print(f"πŸ“ˆ DEMAND DATA: {len(totals)} products with total demand {sum(totals.values())}")
        return totals
# {material number: total order quantity} over the date window
DEMAND_DICTIONARY = get_demand_dictionary()
print(f"🎯 FINAL DEMAND: {DEMAND_DICTIONARY}")
def get_cost_list_per_emp_shift():
    """
    Return hourly labor cost per employment type and shift:
    {employment_type: {shift_id: hourly_rate}}.

    Tries a dashboard override first; otherwise returns hard-coded default
    rates.
    """
    try:
        # NOTE(review): `dashboard` is not defined/imported in this module,
        # so this raises NameError and the default below is always used.
        return dashboard.cost_list_per_emp_shift
    except Exception as e:
        print(f"using default value for cost list per emp shift")
        # Removed a dead extract.read_shift_cost_data() call that read the
        # shift-cost file and then discarded the result.
        # TODO: derive these rates from the shift-cost data instead of hard-coding.
        # NOTE(review): shift numbering here (1=normal, 2=evening, 3=overtime)
        # conflicts with the EVENING_SHIFT_MODE comments above
        # (1=regular, 2=overtime, 3=evening) - confirm which convention the data uses.
        return {"UNICEF Fixed term":{1:43.27,2:43.27,3:64.91},"Humanizer":{1:27.94,2:27.94,3:41.91}}
def shift_code_to_name():
    """Return the mapping from numeric shift code to its human-readable name."""
    codes_and_names = ((1, "normal"), (2, "evening"), (3, "overtime"))
    return {code: name for code, name in codes_and_names}
# {employment type: {shift id: hourly cost}}
COST_LIST_PER_EMP_SHIFT = get_cost_list_per_emp_shift()
# print("cost list per emp shift",COST_LIST_PER_EMP_SHIFT)
# Earlier placeholder rates, kept for reference:
# COST_LIST_PER_EMP_SHIFT = { # WH_Workforce_Hourly_Pay_Scale
#     "Fixed": {1: 0, 2: 22, 3: 18},
#     "Humanizer": {1: 10, 2: 10, 3: 10},
# }
def get_team_requirements(PRODUCT_LIST, kits_df=None):
    """
    Extract per-product staffing requirements from the Kits Calculation table.

    Args:
        PRODUCT_LIST: iterable of kit/product names to look up.
        kits_df: optional pre-loaded DataFrame with 'Kit', 'Humanizer' and
            'UNICEF staff' columns; when None (the default, backward
            compatible), the Kits Calculation CSV is read from disk.

    Returns:
        {"UNICEF Fixed term": {product: n}, "Humanizer": {product: n}}.
        Products absent from the table are recorded with 0 staff.

    Note: removed a dead try/pass/except wrapper (`pass` can never raise, so
    the except branch was unreachable).
    """
    if kits_df is None:
        # Read the kits calculation data directly
        kits_path = "data/real_data_excel/converted_csv/Kits__Calculation.csv"
        kits_df = pd.read_csv(kits_path)
    # Initialize the team requirements dictionary
    team_req_dict = {
        "UNICEF Fixed term": {},
        "Humanizer": {},
    }
    # Process each product in the product list
    for product in PRODUCT_LIST:
        print(f"Processing team requirements for product: {product}")
        product_data = kits_df[kits_df['Kit'] == product]
        if not product_data.empty:
            # Extract Humanizer and UNICEF staff requirements
            humanizer_req = product_data["Humanizer"].iloc[0]
            unicef_req = product_data["UNICEF staff"].iloc[0]
            # Convert to int; NaN/empty requirement cells count as zero staff
            team_req_dict["Humanizer"][product] = int(humanizer_req) if pd.notna(humanizer_req) else 0
            team_req_dict["UNICEF Fixed term"][product] = int(unicef_req) if pd.notna(unicef_req) else 0
        else:
            print(f"Warning: Product {product} not found in Kits Calculation data, setting requirements to 0")
            # Bug fix: previously only the warning was printed and no entry
            # was recorded, so missing products were absent from the dict.
            team_req_dict["Humanizer"][product] = 0
            team_req_dict["UNICEF Fixed term"][product] = 0
    return team_req_dict
# {employment type: {product: required headcount}} per production run
TEAM_REQ_PER_PRODUCT = get_team_requirements(PRODUCT_LIST)
print("team requirements per product:", TEAM_REQ_PER_PRODUCT)
def get_max_employee_per_type_on_day():
    """Return {employment type: {day index: max available headcount}}."""
    try:
        # NOTE(review): `dashboard` is not defined/imported in this module,
        # so the default branch below always runs.
        return dashboard.max_employee_per_type_on_day
    except Exception as e:
        print(f"using default value for max employee per type on day")
        # Default caps: 8 UNICEF fixed-term and 10 Humanizer workers every day
        default_caps = {"UNICEF Fixed term": 8, "Humanizer": 10}
        return {
            emp_type: {day: cap for day in DATE_SPAN}
            for emp_type, cap in default_caps.items()
        }
# {employment type: {day index: headcount cap}}
MAX_EMPLOYEE_PER_TYPE_ON_DAY = get_max_employee_per_type_on_day()
print("max employee per type on day",MAX_EMPLOYEE_PER_TYPE_ON_DAY)
# available employee but for fixed in shift 1, it is mandatory employment
MAX_HOUR_PER_PERSON_PER_DAY = 14 # legal standard
# NOTE(review): shift numbering below (1=normal, 2=evening, 3=overtime) conflicts
# with the EVENING_SHIFT_MODE comments above (1=regular, 2=overtime, 3=evening) - confirm.
MAX_HOUR_PER_SHIFT_PER_PERSON = {1: 7.5, 2: 7.5, 3: 5} #1 = normal, 2 = evening, 3 = overtime
def get_per_product_speed():
    """Return per-product packaging speeds (dashboard override first, else package-speed file)."""
    try:
        # NOTE(review): `dashboard` is not defined/imported here, so the
        # file-based fallback always runs.
        return dashboard.per_product_speed
    except Exception as e:
        print(f"using default value for per product speed")
        return extract.read_package_speed_data()
# Get per product speed - will use actual product names from PRODUCT_LIST
PER_PRODUCT_SPEED = get_per_product_speed()
# number of products that can be produced per hour per line
# This information is critical and it should not rely on the productivity information
# ---- Kit Hierarchy for Production Ordering ----
def get_kit_hierarchy_data():
    """
    Load kit hierarchy information used to sequence production.

    Returns:
        (kit_levels, dependencies, priority_order) as produced by
        extract.get_production_order_data().
    """
    # Removed a dead try/pass/except wrapper: `pass` can never raise, so the
    # except branch (and its print) was unreachable.
    # TODO: support a streamlit/dashboard override like the other getters.
    kit_levels, dependencies, priority_order = extract.get_production_order_data()
    return kit_levels, dependencies, priority_order
# Kit hierarchy: levels, inter-kit dependencies, and the priority order used
# to sequence production of dependent kits.
KIT_LEVELS, KIT_DEPENDENCIES, PRODUCTION_PRIORITY_ORDER = get_kit_hierarchy_data()
print(f"Kit Hierarchy loaded: {len(KIT_LEVELS)} kits, Priority order: {len(PRODUCTION_PRIORITY_ORDER)} items")
MAX_PARALLEL_WORKERS = {
    6: 15, # long line can have max 15 workers simultaneously
    7: 15, # mini load can have max 15 workers simultaneously
}
# maximum number of workers that can work on a line at the same time
DAILY_WEEKLY_SCHEDULE = "daily" # daily or weekly, this needs to be implemented in if F_x1_day is not None... F_x1_week is not None... also need to change x1 to Fixedstaff_first_shift
# Fixed staff constraint mode
# Options:
# "mandatory" - Forces all fixed staff to work full hours every day (expensive, 99.7% waste)
# "available" - Staff available up to limits but not forced (balanced approach)
# "priority" - Fixed staff used first, then temporary staff (realistic business model)
# "none" - Purely demand-driven scheduling (cost-efficient)
FIXED_STAFF_CONSTRAINT_MODE = "priority" # Recommended: "priority" for realistic business model
def get_payment_mode_config():
    """
    Get payment mode configuration - try from streamlit session state first, then default values
    Payment modes:
    - "bulk": If employee works any hours in shift, pay for full shift hours
    - "partial": Pay only for actual hours worked
    """
    try:
        # Prefer the configuration saved by the Dataset Metadata page
        import streamlit as st
        has_saved_config = hasattr(st, 'session_state') and 'payment_mode_config' in st.session_state
        if has_saved_config:
            print(f"Using payment mode config from streamlit session: {st.session_state.payment_mode_config}")
            return st.session_state.payment_mode_config
    except Exception as e:
        print(f"Could not get payment mode config from streamlit session: {e}")
    # Fall back to the built-in defaults:
    # shifts 1 (regular) and 2 (evening) are paid per full shift ("bulk"),
    # shift 3 (overtime) is paid only for hours actually worked ("partial")
    print(f"Loading default payment mode configuration")
    return {1: "bulk", 2: "bulk", 3: "partial"}
# {shift id: "bulk" | "partial"} payment rule applied per shift
PAYMENT_MODE_CONFIG = get_payment_mode_config()
print("Payment mode configuration:", PAYMENT_MODE_CONFIG)