haileyhalimj@gmail.com
π Replace get_per_product_speed() with direct extract.read_package_speed_data() calls
60e706b | # ============================================================ | |
| # SD_roster_real - Fixed Team Production Planning (Option A) | |
| # - Uses config-style variable names from src/config/optimization_config.py | |
| # - Team per product (simultaneous): UNICEF Fixed term / Humanizer | |
| # - Line types via numeric ids: 6=long, 7=short | |
| # - One product per (line, shift, day) | |
| # - Weekly demand (across DATE_SPAN) | |
| # ============================================================ | |
| from ortools.linear_solver import pywraplp | |
| from math import ceil | |
| from src.config.constants import ShiftType, LineType, KitLevel | |
| # ---- config import (νλ‘μ νΈ κ²½λ‘μ λ§μΆ° μ‘°μ ) ---- | |
| from src.config.optimization_config import ( | |
| DATE_SPAN, # [1..N] | |
| get_product_list, # DYNAMIC: list of products (e.g., ['A','B',...]) | |
| get_employee_type_list, # DYNAMIC: e.g., ['UNICEF Fixed term','Humanizer'] | |
| get_active_shift_list, # DYNAMIC: e.g., [1,2,3] | |
| get_line_list, # DYNAMIC: e.g., [6,7] (line type ids) | |
| get_line_cnt_per_type, # DYNAMIC: {6: count_of_long_lines, 7: count_of_short_lines} | |
| get_demand_dictionary, # DYNAMIC: {product: total_units_over_period} | |
| get_cost_list_per_emp_shift, # DYNAMIC: {emp_type: {shift: cost_per_hour}} | |
| get_max_employee_per_type_on_day, # DYNAMIC: {emp_type: {t: headcount}} | |
| MAX_HOUR_PER_PERSON_PER_DAY, # e.g., 14 | |
| get_max_hour_per_shift_per_person, # DYNAMIC: {1: hours, 2: hours, 3: hours} | |
| get_max_parallel_workers, # DYNAMIC: {6: max_workers, 7: max_workers} | |
| FIXED_STAFF_CONSTRAINT_MODE, # not used in fixed-team model (λμ ν¬μ μ΄λΌ 무μλ―Έ) | |
| get_team_requirements, # DYNAMIC: {emp_type: {product: team_size}} from Kits_Calculation.csv | |
| get_payment_mode_config, # DYNAMIC: {shift: 'bulk'/'partial'} payment mode configuration | |
| KIT_LINE_MATCH_DICT, | |
| EVENING_SHIFT_MODE, | |
| EVENING_SHIFT_DEMAND_THRESHOLD, | |
| # Hierarchy variables for production ordering | |
| KIT_LEVELS, # {kit_id: level} where 0=prepack, 1=subkit, 2=master | |
| KIT_DEPENDENCIES, # {kit_id: [dependency_list]} | |
| PRODUCTION_PRIORITY_ORDER, # [kit_ids] sorted by production priority | |
| # Fixed staffing requirements | |
| get_fixed_min_unicef_per_day, # DYNAMIC: Minimum UNICEF employees required per day | |
| ) | |
| # 2) kit_line_match | |
| KIT_LINE_MATCH_DICT | |
| print("KIT_LINE_MATCH_DICT",KIT_LINE_MATCH_DICT) | |
| # 3) If specific product is not produced on specific date, set it to 0 | |
| # ACTIVE will be built dynamically in solve function based on fresh PRODUCT_LIST | |
| # Example: ACTIVE[2]['C'] = 0 # Disable product C on day 2 | |
| def build_lines(): | |
| """List of line instances. | |
| line_tuples elements are (line_type_id, idx) tuples. e.g., (6,1), (6,2), (7,1), ... | |
| """ | |
| line_tuples = [] | |
| LINE_LIST = get_line_list() # Dynamic call | |
| LINE_CNT_PER_TYPE = get_line_cnt_per_type() # Dynamic call | |
| for lt in LINE_LIST: # lt: 6 or 7 | |
| cnt = int(LINE_CNT_PER_TYPE.get(lt, 0)) | |
| for i in range(1, cnt + 1): | |
| line_tuples.append((lt, i)) | |
| return line_tuples | |
| # DISABLED: Module-level initialization was causing infinite loops | |
| # These variables are now created dynamically when needed | |
| # line_tuples=build_lines() | |
| # print("line_tuples",line_tuples) | |
| # PER_PRODUCT_SPEED = extract.read_package_speed_data() # Dynamic call | |
| # print("PER_PRODUCT_SPEED",PER_PRODUCT_SPEED) | |
| def sort_products_by_hierarchy(product_list): | |
| """ | |
| Sort products by hierarchy levels and dependencies using topological sorting. | |
| Returns products in optimal production order: prepacks β subkits β masters | |
| Dependencies within the same level are properly ordered. | |
| """ | |
| from collections import defaultdict, deque | |
| # Filter products that are in our production list and have hierarchy data | |
| products_with_hierarchy = [p for p in product_list if p in KIT_LEVELS] | |
| products_without_hierarchy = [p for p in product_list if p not in KIT_LEVELS] | |
| if products_without_hierarchy: | |
| print(f"[HIERARCHY] Products without hierarchy data: {products_without_hierarchy}") | |
| # Build dependency graph for products in our list | |
| graph = defaultdict(list) # product -> [dependents] | |
| in_degree = defaultdict(int) # product -> number of dependencies | |
| # Initialize all products | |
| for product in products_with_hierarchy: | |
| in_degree[product] = 0 | |
| # Build edges based on actual dependencies | |
| # KIT_DEPENDENCIES = {product: [dependencies]} - "What does THIS product need?" | |
| # graph = {dependency: [products]} - "What depends on THIS dependency?" | |
| # | |
| # Example transformation: | |
| # KIT_DEPENDENCIES = {'subkit_A': ['prepack_1'], 'master_B': ['subkit_A']} | |
| # After building: graph = {'prepack_1': ['subkit_A'], 'subkit_A': ['master_B']} | |
| # This means: prepack_1 is needed by subkit_A, subkit_A is needed by master_B | |
| # | |
| # Example: | |
| # 1. product='subkit_A', deps=['prepack_1'] | |
| # β graph['prepack_1'].append('subkit_A') | |
| # β graph = {'prepack_1': ['subkit_A']} | |
| # 2. product='master_B', deps=['subkit_A'] | |
| # β graph['subkit_A'].append('master_B') | |
| # β graph = {'prepack_1': ['subkit_A'], 'subkit_A': ['master_B']} | |
| for product in products_with_hierarchy: | |
| deps = KIT_DEPENDENCIES.get(product, []) #dependencies = products that has to be packed first | |
| for dep in deps: | |
| if dep in products_with_hierarchy: # Only if dependency is in our production list | |
| # REVERSE THE RELATIONSHIP: | |
| # KIT_DEPENDENCIES says: "product needs dep" | |
| # graph says: "dep is needed by product" | |
| graph[dep].append(product) # dep -> product (reverse the relationship!) | |
| in_degree[product] += 1 | |
| # Topological sort with hierarchy level priority | |
| sorted_products = [] | |
| #queue = able to remove from both sides | |
| queue = deque() | |
| # Start with products that have no dependencies | |
| for product in products_with_hierarchy: | |
| if in_degree[product] == 0: | |
| queue.append(product) | |
| while queue: | |
| current = queue.popleft() | |
| sorted_products.append(current) | |
| # Process dependents - sort by hierarchy level first | |
| for dependent in sorted(graph[current], key=lambda p: (KIT_LEVELS.get(p, 999), p)): | |
| in_degree[dependent] -= 1 #decrement the in_degree of the dependent | |
| if in_degree[dependent] == 0: #if the in_degree of the dependent is 0, add it to the queue so that it can be processed | |
| queue.append(dependent) | |
| # Check for cycles (shouldn't happen with proper hierarchy) | |
| if len(sorted_products) != len(products_with_hierarchy): | |
| remaining = [p for p in products_with_hierarchy if p not in sorted_products] | |
| print(f"[HIERARCHY] WARNING: Potential circular dependencies detected in: {remaining}") | |
| # Add remaining products sorted by level as fallback | |
| remaining_sorted = sorted(remaining, key=lambda p: (KIT_LEVELS.get(p, 999), p)) | |
| sorted_products.extend(remaining_sorted) | |
| # Add products without hierarchy information at the end | |
| sorted_products.extend(sorted(products_without_hierarchy)) | |
| print(f"[HIERARCHY] Dependency-aware production order: {len(sorted_products)} products") | |
| for i, p in enumerate(sorted_products[:10]): # Show first 10 | |
| level = KIT_LEVELS.get(p, "unknown") | |
| level_name = KitLevel.get_name(level) | |
| deps = KIT_DEPENDENCIES.get(p, []) | |
| deps_in_list = [d for d in deps if d in products_with_hierarchy] | |
| print(f" {i+1}. {p} (level {level}={level_name}, deps: {len(deps_in_list)})") | |
| if deps_in_list: | |
| print(f" Dependencies: {deps_in_list}") | |
| if len(sorted_products) > 10: | |
| print(f" ... and {len(sorted_products) - 10} more products") | |
| return sorted_products | |
| # Removed get_dependency_timing_weight function - no longer needed | |
| # Dependency ordering is now handled by topological sorting in sort_products_by_hierarchy() | |
| def run_optimization_for_week(): | |
| # *** CRITICAL: Load fresh data to reflect current Streamlit configs *** | |
| print("\n" + "="*60) | |
| print("π LOADING FRESH DATA FOR OPTIMIZATION") | |
| print("="*60) | |
| # Get fresh product list and demand data | |
| PRODUCT_LIST = get_product_list() | |
| DEMAND_DICTIONARY = get_demand_dictionary() | |
| TEAM_REQ_PER_PRODUCT = get_team_requirements(PRODUCT_LIST) | |
| print(f"π¦ LOADED PRODUCTS: {len(PRODUCT_LIST)} products") | |
| print(f"π LOADED DEMAND: {sum(DEMAND_DICTIONARY.values())} total units") | |
| print(f"π₯ LOADED TEAM REQUIREMENTS: {len(TEAM_REQ_PER_PRODUCT)} employee types") | |
| # Build ACTIVE schedule for fresh product list | |
| ACTIVE = {t: {p: 1 for p in PRODUCT_LIST} for t in DATE_SPAN} | |
| # --- Sets --- | |
| date_span_list = list(DATE_SPAN) | |
| # print("date_span_list",date_span_list) | |
| active_shift_list = sorted(list(get_active_shift_list())) # Dynamic call | |
| employee_type_list = list(get_employee_type_list()) # Dynamic call - e.g., ['UNICEF Fixed term','Humanizer'] | |
| print("employee_type_list",employee_type_list) | |
| # *** HIERARCHY SORTING: Sort products by production priority *** | |
| print("\n" + "="*60) | |
| print("π APPLYING HIERARCHY-BASED PRODUCTION ORDERING") | |
| print("="*60) | |
| sorted_product_list = sort_products_by_hierarchy(list(PRODUCT_LIST)) | |
| line_tuples = build_lines() | |
| print("Lines",line_tuples) | |
| # Load product speed data dynamically | |
| # Import extract module for direct access to data functions | |
| from src.preprocess import extract | |
| PER_PRODUCT_SPEED = extract.read_package_speed_data() | |
| print("PER_PRODUCT_SPEED",PER_PRODUCT_SPEED) | |
| # --- Short aliases for parameters --- | |
| Hmax_s = dict(get_max_hour_per_shift_per_person()) # Dynamic call - per-shift hours | |
| Hmax_daily = MAX_HOUR_PER_PERSON_PER_DAY # {6:cap, 7:cap} | |
| max_workers_line = dict(get_max_parallel_workers()) # Dynamic call - per line type | |
| max_employee_type_day = get_max_employee_per_type_on_day() # Dynamic call - {emp_type:{t:headcount}} | |
| cost = get_cost_list_per_emp_shift() # Dynamic call - {emp_type:{shift:cost}} | |
| # --- Feasibility quick checks --- | |
| # 1) If team size is greater than max_workers_line, block the product-line type combination | |
| for p in sorted_product_list: | |
| req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in employee_type_list) | |
| lt = KIT_LINE_MATCH_DICT.get(p, 6) # Default to long line (6) if not found | |
| if p not in KIT_LINE_MATCH_DICT: | |
| print(f"[WARN] Product {p}: No line type mapping found, defaulting to long line (6)") | |
| if req_total > max_workers_line.get(lt, 1e9): | |
| print(f"[WARN] Product {p}: team size {req_total} > MAX_PARALLEL_WORKERS[{lt}] " | |
| f"= {max_workers_line.get(lt)}. Blocked.") | |
| # 2) Check if demand can be met without evening shift (only if in normal mode) | |
| if EVENING_SHIFT_MODE == "normal": | |
| total_demand = sum(DEMAND_DICTIONARY.get(p, 0) for p in sorted_product_list) | |
| # Calculate maximum capacity with regular + overtime shifts only | |
| regular_overtime_shifts = [s for s in active_shift_list if s in ShiftType.REGULAR_AND_OVERTIME] | |
| max_capacity = 0 | |
| for p in sorted_product_list: | |
| if p in PER_PRODUCT_SPEED: | |
| product_speed = PER_PRODUCT_SPEED[p] # units per hour | |
| # Calculate max hours available for this product across all lines and shifts | |
| max_hours_per_product = 0 | |
| for ell in line_tuples: | |
| for s in regular_overtime_shifts: | |
| for t in date_span_list: | |
| max_hours_per_product += Hmax_s[s] | |
| max_capacity += product_speed * max_hours_per_product | |
| capacity_ratio = max_capacity / total_demand if total_demand > 0 else float('inf') | |
| print(f"[CAPACITY CHECK] Total demand: {total_demand}") | |
| print(f"[CAPACITY CHECK] Max capacity (Regular + Overtime): {max_capacity:.1f}") | |
| print(f"[CAPACITY CHECK] Capacity ratio: {capacity_ratio:.2f}") | |
| if capacity_ratio < EVENING_SHIFT_DEMAND_THRESHOLD: | |
| print(f"\nπ¨ [ALERT] DEMAND TOO HIGH!") | |
| print(f" Current capacity can only meet {capacity_ratio*100:.1f}% of demand") | |
| print(f" Threshold: {EVENING_SHIFT_DEMAND_THRESHOLD*100:.1f}%") | |
| print(f" RECOMMENDATION: Change EVENING_SHIFT_MODE to 'activate_evening' to enable evening shift") | |
| print(f" This will add shift 3 to increase capacity\n") | |
| # --- Solver --- | |
| solver = pywraplp.Solver.CreateSolver('CBC') | |
| if not solver: | |
| raise RuntimeError("CBC solver not found.") | |
| INF = solver.infinity() | |
| # --- Variables --- | |
| # Assignment[p,ell,s,t] β {0,1}: 1 if product p runs on (line,shift,day) | |
| Assignment, Hours, Units = {}, {}, {} # Hours: run hours, Units: production units | |
| for p in sorted_product_list: | |
| for ell in line_tuples: # ell = (line_type_id, idx) | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| #Is product p assigned to run on line ell, during shift s, on day t? | |
| Assignment[p, ell, s, t] = solver.BoolVar(f"Z_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") | |
| #How many hours does product p run on line ell, during shift s, on day t? | |
| Hours[p, ell, s, t] = solver.NumVar(0, Hmax_s[s], f"T_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") | |
| #How many units does product p run on line ell, during shift s, on day t? | |
| Units[p, ell, s, t] = solver.NumVar(0, INF, f"U_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") | |
| # Note: IDLE variables removed - we only track employees actually working on production | |
| # Load fixed minimum UNICEF requirement (needed for EMPLOYEE_COUNT variable creation) | |
| FIXED_MIN_UNICEF_PER_DAY = get_fixed_min_unicef_per_day() # Dynamic call | |
| # Variable to track actual number of employees of each type working each shift each day | |
| # This represents how many distinct employees of type e are working in shift s on day t | |
| EMPLOYEE_COUNT = {} | |
| for e in employee_type_list: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| # Note: Minimum staffing is per day, not per shift | |
| # We'll handle the daily minimum constraint separately | |
| max_count = max_employee_type_day.get(e, {}).get(t, 100) | |
| EMPLOYEE_COUNT[e, s, t] = solver.IntVar( | |
| 0, # No minimum per shift (daily minimum handled separately) | |
| max_count, | |
| f"EmpCount_{e}_s{s}_day{t}" | |
| ) | |
| # Track total person-hours worked by each employee type per shift per day | |
| # This is needed for employee-centric wage calculation | |
| EMPLOYEE_HOURS = {} | |
| for e in employee_type_list: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| # Sum of all work hours for employee type e in shift s on day t | |
| # This represents total person-hours (e.g., 5 employees Γ 8 hours = 40 person-hours) | |
| EMPLOYEE_HOURS[e, s, t] = solver.Sum( | |
| TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t] | |
| for p in sorted_product_list | |
| for ell in line_tuples | |
| ) | |
| # Note: Binary variables for bulk payment are now created inline in the cost calculation | |
| # --- Objective: Minimize total labor cost (wages) --- | |
| # Employee-centric approach: calculate wages based on actual employees and their hours | |
| PAYMENT_MODE_CONFIG = get_payment_mode_config() # Dynamic call | |
| print(f"Payment mode configuration: {PAYMENT_MODE_CONFIG}") | |
| # Build cost terms based on payment mode | |
| cost_terms = [] | |
| for e in employee_type_list: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| payment_mode = PAYMENT_MODE_CONFIG.get(s, "partial") # Default to partial if not specified | |
| if payment_mode == "partial": | |
| # Partial payment: pay for actual person-hours worked | |
| # Cost = hourly_rate Γ total_person_hours | |
| # Example: $20/hr Γ 40 person-hours = $800 | |
| cost_terms.append(cost[e][s] * EMPLOYEE_HOURS[e, s, t]) | |
| elif payment_mode == "bulk": | |
| # Bulk payment: if ANY work happens in shift, pay ALL working employees for FULL shift | |
| # We need to know: did employee type e work at all in shift s on day t? | |
| # Create binary: 1 if employee type e worked in this shift | |
| work_in_shift = solver.BoolVar(f"work_{e}_s{s}_d{t}") | |
| # Link binary to work hours | |
| # If EMPLOYEE_HOURS > 0, then work_in_shift = 1 | |
| # If EMPLOYEE_HOURS = 0, then work_in_shift = 0 | |
| max_possible_hours = Hmax_s[s] * max_employee_type_day[e][t] | |
| solver.Add(EMPLOYEE_HOURS[e, s, t] <= max_possible_hours * work_in_shift) | |
| solver.Add(work_in_shift * 0.001 <= EMPLOYEE_HOURS[e, s, t]) | |
| # Calculate number of employees working in this shift | |
| # This is approximately: ceil(EMPLOYEE_HOURS / Hmax_s[s]) | |
| # But we can use: employees_working_in_shift | |
| # For simplicity, use EMPLOYEE_HOURS / Hmax_s[s] as continuous approximation | |
| # Or better: create a variable for employees per shift | |
| # Simpler approach: For bulk payment, assume if work happens, | |
| # we need approximately EMPLOYEE_HOURS/Hmax_s[s] employees, | |
| # and each gets paid for full shift | |
| # Cost β (EMPLOYEE_HOURS / Hmax_s[s]) Γ Hmax_s[s] Γ hourly_rate = EMPLOYEE_HOURS Γ hourly_rate | |
| # But that's the same as partial! The difference is we round up employees. | |
| # Better approach: Create variable for employees working in this specific shift | |
| employees_in_shift = solver.IntVar(0, max_employee_type_day[e][t], f"emp_{e}_s{s}_d{t}") | |
| # Link employees_in_shift to work requirements | |
| # If EMPLOYEE_HOURS requires N employees, then employees_in_shift >= ceil(N) | |
| solver.Add(employees_in_shift * Hmax_s[s] >= EMPLOYEE_HOURS[e, s, t]) | |
| # Cost: pay each employee for full shift | |
| cost_terms.append(cost[e][s] * Hmax_s[s] * employees_in_shift) | |
| # Note: No idle employee costs - only pay for employees actually working | |
| total_cost = solver.Sum(cost_terms) | |
| # Objective: minimize total labor cost (wages) | |
| # This finds the optimal production schedule (product order, line assignment, timing) | |
| # that minimizes total wages while meeting all demand and capacity constraints | |
| solver.Minimize(total_cost) | |
| # --- Constraints --- | |
| # 1) Weekly demand - must meet exactly (no over/under production) | |
| for p in sorted_product_list: | |
| total_production = solver.Sum(Units[p, ell, s, t] for ell in line_tuples for s in active_shift_list for t in date_span_list) | |
| demand = DEMAND_DICTIONARY.get(p, 0) | |
| # Must produce at least the demand | |
| solver.Add(total_production >= demand) | |
| # Must not produce more than the demand (prevent overproduction) | |
| solver.Add(total_production <= demand) | |
| # 2) One product per (line,shift,day) + time gating | |
| for ell in line_tuples: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| solver.Add(solver.Sum(Assignment[p, ell, s, t] for p in sorted_product_list) <= 1) | |
| for p in sorted_product_list: | |
| solver.Add(Hours[p, ell, s, t] <= Hmax_s[s] * Assignment[p, ell, s, t]) | |
| # 3) Product-line type compatibility + (optional) activity by day | |
| for p in sorted_product_list: | |
| req_lt = KIT_LINE_MATCH_DICT.get(p, LineType.LONG_LINE) # Default to long line if not found | |
| req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in employee_type_list) | |
| for ell in line_tuples: | |
| allowed = (ell[0] == req_lt) and (req_total <= max_workers_line.get(ell[0], 1e9)) | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| if ACTIVE[t][p] == 0 or not allowed: | |
| solver.Add(Assignment[p, ell, s, t] == 0) | |
| solver.Add(Hours[p, ell, s, t] == 0) | |
| solver.Add(Units[p, ell, s, t] == 0) | |
| # 4) Line throughput: Units β€ product_speed * Hours | |
| for p in sorted_product_list: | |
| for ell in line_tuples: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| # Get product speed (same speed regardless of line type) | |
| if p in PER_PRODUCT_SPEED: | |
| # Convert kit per day to kit per hour (assuming 7.5 hour workday) | |
| speed = PER_PRODUCT_SPEED[p] | |
| # Upper bound: units cannot exceed capacity | |
| solver.Add( | |
| Units[p, ell, s, t] <= speed * Hours[p, ell, s, t] | |
| ) | |
| # Lower bound: if working, must produce (prevent phantom work) | |
| solver.Add( | |
| Units[p, ell, s, t] >= speed * Hours[p, ell, s, t] | |
| ) | |
| else: | |
| # Default speed if not found | |
| default_speed = 800 / 7.5 # units per hour | |
| print(f"Warning: No speed data for product {p}, using default {default_speed:.1f} per hour") | |
| # Upper bound: units cannot exceed capacity | |
| solver.Add( | |
| Units[p, ell, s, t] <= default_speed * Hours[p, ell, s, t] | |
| ) | |
| # Lower bound: if working, must produce (prevent phantom work) | |
| solver.Add( | |
| Units[p, ell, s, t] >= default_speed * Hours[p, ell, s, t] | |
| ) | |
| # Working hours constraint: active employees cannot exceed shift hour capacity | |
| for e in employee_type_list: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| # No idle employee constraints - employees are only counted when working | |
| solver.Add( | |
| solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t] for p in sorted_product_list for ell in line_tuples) | |
| <= Hmax_s[s] * max_employee_type_day[e][t] | |
| ) | |
| # 6) Per-shift staffing capacity by type: link employee count to actual work hours | |
| # This constraint ensures EMPLOYEE_COUNT[e,s,t] represents the actual number of employees needed in each shift | |
| for e in employee_type_list: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| # Total person-hours worked by employee type e in shift s on day t | |
| total_person_hours_in_shift = solver.Sum( | |
| TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t] | |
| for p in sorted_product_list | |
| for ell in line_tuples | |
| ) | |
| # Employee count must be sufficient to cover the work in this shift | |
| # If employees work H person-hours total and each can work max M hours/shift, | |
| # then we need at least ceil(H/M) employees | |
| # Constraint: employee_count Γ max_hours_per_shift >= total_person_hours_in_shift | |
| solver.Add(EMPLOYEE_COUNT[e, s, t] * Hmax_s[s] >= total_person_hours_in_shift) | |
| # 7) Shift ordering constraints (only apply if shifts are available) | |
| # Evening shift after regular shift | |
| if ShiftType.EVENING in active_shift_list and ShiftType.REGULAR in active_shift_list: # Only if both shifts are available | |
| for e in employee_type_list: | |
| for t in date_span_list: | |
| solver.Add( | |
| solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, ShiftType.EVENING, t] for p in sorted_product_list for ell in line_tuples) | |
| <= | |
| solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, ShiftType.REGULAR, t] for p in sorted_product_list for ell in line_tuples) | |
| ) | |
| # Overtime should only be used when regular shift is at capacity | |
| if ShiftType.OVERTIME in active_shift_list and ShiftType.REGULAR in active_shift_list: # Only if both shifts are available | |
| print("\n[OVERTIME] Adding constraints to ensure overtime only when regular shift is insufficient...") | |
| for e in employee_type_list: | |
| for t in date_span_list: | |
| # Get available regular capacity for this employee type and day | |
| regular_capacity = max_employee_type_day[e][t] | |
| # Total regular shift usage for this employee type and day | |
| regular_usage = solver.Sum( | |
| TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, ShiftType.REGULAR, t] | |
| for p in sorted_product_list for ell in line_tuples | |
| ) | |
| # Total overtime usage for this employee type and day | |
| overtime_usage = solver.Sum( | |
| TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, ShiftType.OVERTIME, t] | |
| for p in sorted_product_list for ell in line_tuples | |
| ) | |
| # Create binary variable: 1 if using overtime, 0 otherwise | |
| using_overtime = solver.IntVar(0, 1, f'using_overtime_{e}_{t}') | |
| # If using overtime, regular capacity must be utilized significantly | |
| # Regular usage must be at least 90% of capacity to allow overtime | |
| min_regular_for_overtime = int(0.9 * regular_capacity) | |
| # Constraint 1: Can only use overtime if regular usage is high | |
| solver.Add(regular_usage >= min_regular_for_overtime * using_overtime) | |
| # Constraint 2: If any overtime is used, set the binary variable | |
| solver.Add(overtime_usage <= regular_capacity * using_overtime) | |
| overtime_constraints_added = len(employee_type_list) * len(date_span_list) * 2 # 2 constraints per employee type per day | |
| print(f"[OVERTIME] Added {overtime_constraints_added} constraints ensuring overtime only when regular shifts are at 90%+ capacity") | |
| # 7.5) Bulk payment linking constraints are now handled inline in the cost calculation | |
| # 7.6) *** FIXED MINIMUM UNICEF EMPLOYEES CONSTRAINT *** | |
| # Ensure minimum UNICEF fixed-term staff work in the REGULAR shift every day | |
| # The minimum applies to the regular shift specifically (not overtime or evening) | |
| if 'UNICEF Fixed term' in employee_type_list and FIXED_MIN_UNICEF_PER_DAY > 0: | |
| if ShiftType.REGULAR in active_shift_list: | |
| print(f"\n[FIXED STAFFING] Adding constraint for minimum {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees in REGULAR shift per day...") | |
| for t in date_span_list: | |
| # At least FIXED_MIN_UNICEF_PER_DAY employees must work in the regular shift each day | |
| solver.Add( | |
| EMPLOYEE_COUNT['UNICEF Fixed term', ShiftType.REGULAR, t] >= FIXED_MIN_UNICEF_PER_DAY | |
| ) | |
| print(f"[FIXED STAFFING] Added {len(date_span_list)} constraints ensuring >= {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees in regular shift per day") | |
| else: | |
| print(f"\n[FIXED STAFFING] Warning: Regular shift not available, cannot enforce minimum UNICEF staffing") | |
| # 8) *** HIERARCHY DEPENDENCY CONSTRAINTS *** | |
| # For subkits with prepack dependencies: dependencies should be produced before or same time | |
| print("\n[HIERARCHY] Adding dependency constraints...") | |
| dependency_constraints_added = 0 | |
| for p in sorted_product_list: | |
| dependencies = KIT_DEPENDENCIES.get(p, []) | |
| if dependencies: | |
| # Get the level of the current product | |
| p_level = KIT_LEVELS.get(p, 2) | |
| for dep in dependencies: | |
| if dep in sorted_product_list: # Only if dependency is also in production list | |
| # Calculate "completion time" for each product (sum of all production times) | |
| p_completion = solver.Sum( | |
| t * Hours[p, ell, s, t] for ell in line_tuples for s in active_shift_list for t in date_span_list | |
| ) | |
| dep_completion = solver.Sum( | |
| t * Hours[dep, ell, s, t] for ell in line_tuples for s in active_shift_list for t in date_span_list | |
| ) | |
| # Dependency should complete before or at the same time | |
| solver.Add(dep_completion <= p_completion) | |
| dependency_constraints_added += 1 | |
| print(f" Added constraint: {dep} (dependency) <= {p} (level {p_level})") | |
| print(f"[HIERARCHY] Added {dependency_constraints_added} dependency constraints") | |
| # --- Solve --- | |
| status = solver.Solve() | |
| if status != pywraplp.Solver.OPTIMAL: | |
| status_names = {pywraplp.Solver.INFEASIBLE: "INFEASIBLE", pywraplp.Solver.UNBOUNDED: "UNBOUNDED"} | |
| print(f"No optimal solution. Status: {status} ({status_names.get(status, 'UNKNOWN')})") | |
| # Debug hint: | |
| # solver.EnableOutput() | |
| # solver.ExportModelAsLpFile("model.lp") | |
| return None | |
| # --- Report --- | |
| result = {} | |
| result['objective'] = solver.Objective().Value() | |
| # Weekly production | |
| prod_week = {p: sum(Units[p, ell, s, t].solution_value() for ell in line_tuples for s in active_shift_list for t in date_span_list) for p in sorted_product_list} | |
| result['weekly_production'] = prod_week | |
| # Which product ran on which line/shift/day | |
| schedule = [] | |
| for t in date_span_list: | |
| for ell in line_tuples: | |
| for s in active_shift_list: | |
| chosen = [p for p in sorted_product_list if Assignment[p, ell, s, t].solution_value() > 0.5] | |
| if chosen: | |
| p = chosen[0] | |
| schedule.append({ | |
| 'day': t, | |
| 'line_type_id': ell[0], | |
| 'line_idx': ell[1], | |
| 'shift': s, | |
| 'product': p, | |
| 'run_hours': Hours[p, ell, s, t].solution_value(), | |
| 'units': Units[p, ell, s, t].solution_value(), | |
| }) | |
| result['run_schedule'] = schedule | |
| # Implied headcount by type/shift/day (ceil) | |
| headcount = [] | |
| for e in employee_type_list: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| used_ph = sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value() for p in sorted_product_list for ell in line_tuples) | |
| need = ceil(used_ph / (Hmax_s[s] + 1e-9)) | |
| headcount.append({'emp_type': e, 'shift': s, 'day': t, | |
| 'needed': need, 'available': max_employee_type_day[e][t]}) | |
| result['headcount_per_shift'] = headcount | |
| # Total person-hours by type/day (β€ 14h * headcount) | |
| ph_by_day = [] | |
| for e in employee_type_list: | |
| for t in date_span_list: | |
| used = sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value() for s in active_shift_list for p in sorted_product_list for ell in line_tuples) | |
| ph_by_day.append({'emp_type': e, 'day': t, | |
| 'used_person_hours': used, | |
| 'cap_person_hours': Hmax_daily * max_employee_type_day[e][t]}) | |
| result['person_hours_by_day'] = ph_by_day | |
| # Actual employee count per type/shift/day (from EMPLOYEE_COUNT variable) | |
| employee_count_by_shift = [] | |
| for e in employee_type_list: | |
| for s in active_shift_list: | |
| for t in date_span_list: | |
| count = int(EMPLOYEE_COUNT[e, s, t].solution_value()) | |
| used_hours = sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value() | |
| for p in sorted_product_list for ell in line_tuples) | |
| avg_hours_per_employee = used_hours / count if count > 0 else 0 | |
| if count > 0: # Only add entries where employees are working | |
| employee_count_by_shift.append({ | |
| 'emp_type': e, | |
| 'shift': s, | |
| 'day': t, | |
| 'employee_count': count, | |
| 'total_person_hours': used_hours, | |
| 'avg_hours_per_employee': avg_hours_per_employee, | |
| 'available': max_employee_type_day[e][t] | |
| }) | |
| result['employee_count_by_shift'] = employee_count_by_shift | |
| # Also calculate daily totals (summing across shifts) | |
| employee_count_by_day = [] | |
| for e in employee_type_list: | |
| for t in date_span_list: | |
| # Sum employees across all shifts for this day | |
| total_count = sum(int(EMPLOYEE_COUNT[e, s, t].solution_value()) for s in active_shift_list) | |
| used_hours = sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value() | |
| for s in active_shift_list for p in sorted_product_list for ell in line_tuples) | |
| avg_hours_per_employee = used_hours / total_count if total_count > 0 else 0 | |
| if total_count > 0: # Only add days where employees are working | |
| employee_count_by_day.append({ | |
| 'emp_type': e, | |
| 'day': t, | |
| 'employee_count': total_count, | |
| 'total_person_hours': used_hours, | |
| 'avg_hours_per_employee': avg_hours_per_employee, | |
| 'available': max_employee_type_day[e][t] | |
| }) | |
| result['employee_count_by_day'] = employee_count_by_day | |
| # Note: Idle employee tracking removed - only counting employees actually working | |
| # Pretty print | |
| print("Objective (min cost):", result['objective']) | |
| print("\n--- Weekly production by product ---") | |
| for p, u in prod_week.items(): | |
| print(f"{p}: {u:.1f} / demand {DEMAND_DICTIONARY.get(p,0)}") | |
| print("\n--- Schedule (line, shift, day) ---") | |
| for row in schedule: | |
| shift_name = ShiftType.get_name(row['shift']) | |
| line_name = LineType.get_name(row['line_type_id']) | |
| print(f"date_span_list{row['day']} {line_name}-{row['line_idx']} {shift_name}: " | |
| f"{row['product']} Hours={row['run_hours']:.2f}h Units={row['units']:.1f}") | |
| print("\n--- Implied headcount need (per type/shift/day) ---") | |
| for row in headcount: | |
| shift_name = ShiftType.get_name(row['shift']) | |
| print(f"{row['emp_type']}, {shift_name}, date_span_list{row['day']}: " | |
| f"need={row['needed']} (avail {row['available']})") | |
| print("\n--- Total person-hours by type/day ---") | |
| for row in ph_by_day: | |
| print(f"{row['emp_type']}, date_span_list{row['day']}: used={row['used_person_hours']:.1f} " | |
| f"(cap {row['cap_person_hours']})") | |
| print("\n--- Actual employee count by type/shift/day ---") | |
| for row in employee_count_by_shift: | |
| shift_name = ShiftType.get_name(row['shift']) | |
| print(f"{row['emp_type']}, {shift_name}, date_span_list{row['day']}: " | |
| f"count={row['employee_count']} employees, " | |
| f"total_hours={row['total_person_hours']:.1f}h, " | |
| f"avg={row['avg_hours_per_employee']:.1f}h/employee") | |
| print("\n--- Daily employee totals by type/day (sum across shifts) ---") | |
| for row in employee_count_by_day: | |
| print(f"{row['emp_type']}, date_span_list{row['day']}: " | |
| f"count={row['employee_count']} employees total, " | |
| f"total_hours={row['total_person_hours']:.1f}h, " | |
| f"avg={row['avg_hours_per_employee']:.1f}h/employee " | |
| f"(available: {row['available']})") | |
| # Note: Idle employee reporting removed - only tracking employees actually working | |
| return result | |
| if __name__ == "__main__": | |
| run_optimization_for_week() |