| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | from ortools.linear_solver import pywraplp |
| | from math import ceil |
| |
|
| | |
| | from src.config.optimization_config import ( |
| | DATE_SPAN, |
| | PRODUCT_LIST, |
| | EMPLOYEE_TYPE_LIST, |
| | SHIFT_LIST, |
| | LINE_LIST, |
| | LINE_CNT_PER_TYPE, |
| | DEMAND_DICTIONARY, |
| | COST_LIST_PER_EMP_SHIFT, |
| | MAX_EMPLOYEE_PER_TYPE_ON_DAY, |
| | MAX_HOUR_PER_PERSON_PER_DAY, |
| | MAX_HOUR_PER_SHIFT_PER_PERSON, |
| | PER_PRODUCT_SPEED, |
| | MAX_PARALLEL_WORKERS, |
| | DAILY_WEEKLY_SCHEDULE, |
| | FIXED_STAFF_CONSTRAINT_MODE, |
| | TEAM_REQ_PER_PRODUCT, |
| | PAYMENT_MODE_CONFIG, |
| | KIT_LINE_MATCH_DICT, |
| | EVENING_SHIFT_MODE, |
| | EVENING_SHIFT_DEMAND_THRESHOLD, |
| | |
| | KIT_LEVELS, |
| | KIT_DEPENDENCIES, |
| | PRODUCTION_PRIORITY_ORDER, |
| | ) |
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | |
# Echo the product -> line-type mapping once, when the module is loaded.
# (A bare ``KIT_LINE_MATCH_DICT`` expression statement used to sit here; it
# evaluated the name and discarded it, so it has been dropped.)
print("KIT_LINE_MATCH_DICT", KIT_LINE_MATCH_DICT)

# ACTIVE[t][p] is the per-day, per-product activation flag. The solver forces
# Z/T/U to zero for any (day, product) whose flag is 0. Every product is
# switched on for every day here; each day gets its own independent dict.
ACTIVE = {t: dict.fromkeys(PRODUCT_LIST, 1) for t in DATE_SPAN}
| | |
| |
|
| |
|
def build_lines(line_list=None, line_cnt_per_type=None):
    """Enumerate every line instance as a ``(line_type_id, idx)`` tuple.

    Args:
        line_list: Iterable of line type ids, in the order they should be
            emitted. Defaults to the module-level ``LINE_LIST``.
        line_cnt_per_type: Mapping ``line_type_id -> number of lines`` of that
            type. Values are passed through ``int()``; a type missing from the
            mapping contributes no lines. Defaults to the module-level
            ``LINE_CNT_PER_TYPE``.

    Returns:
        A list such as ``[(6, 1), (6, 2), (7, 1)]``: for each line type, one
        tuple per instance with ``idx`` counting from 1.
    """
    # Fall back to the configured values only when the caller passes nothing,
    # so existing zero-argument calls behave exactly as before.
    if line_list is None:
        line_list = LINE_LIST
    if line_cnt_per_type is None:
        line_cnt_per_type = LINE_CNT_PER_TYPE

    return [
        (line_type, idx)
        for line_type in line_list
        for idx in range(1, int(line_cnt_per_type.get(line_type, 0)) + 1)
    ]
| |
|
# Module-level roster of line instances, built once at import time.
# solve_fixed_team_weekly() builds its own local copy and does not read this.
L = build_lines()

# Import-time echo of the line roster and the per-product speed table.
print("L", L)
print("PER_PRODUCT_SPEED", PER_PRODUCT_SPEED)
| |
|
def sort_products_by_hierarchy(product_list):
    """Order products by their ``KIT_LEVELS`` value, lowest level first.

    Products present in ``KIT_LEVELS`` come first, sorted by
    ``(level, product)``. Products with no level entry are appended
    afterwards in plain sorted order. A short summary (the first ten
    products with level and dependency count) is printed.

    Args:
        product_list: Iterable of product identifiers.

    Returns:
        A new list containing every product of ``product_list`` in
        production order.
    """
    # Split the input into products with / without a known hierarchy level.
    levelled, unlevelled = [], []
    for product in product_list:
        bucket = levelled if product in KIT_LEVELS else unlevelled
        bucket.append(product)

    if unlevelled:
        print(f"[HIERARCHY] Products without hierarchy data: {unlevelled}")

    # Level ascending, ties broken by the product identifier itself.
    levelled.sort(key=lambda product: (KIT_LEVELS.get(product, 999), product))
    unlevelled.sort()
    ordered = levelled + unlevelled

    # Labels used only for the log preview below.
    level_labels = {0: "prepack", 1: "subkit", 2: "master"}
    preview_size = 10

    print(f"[HIERARCHY] Production order: {len(ordered)} products")
    for rank, product in enumerate(ordered[:preview_size], start=1):
        level = KIT_LEVELS.get(product, "unknown")
        level_name = level_labels.get(level, "unknown")
        dep_count = len(KIT_DEPENDENCIES.get(product, []))
        print(f" {rank}. {product} (level {level}={level_name}, deps: {dep_count})")

    hidden = len(ordered) - preview_size
    if hidden > 0:
        print(f" ... and {hidden} more products")

    return ordered
| |
|
def get_dependency_timing_weight(product):
    """Return the timing weight for ``product`` based on its kit level.

    The level is looked up in ``KIT_LEVELS`` (a product with no entry is
    treated as level 2). Level 0 maps to 0.1, level 1 to 0.5 and level 2 to
    1.0; any other level falls back to 1.0. The solver multiplies this weight
    by the day index, so lower-weighted levels are penalised less for running
    on later days.
    """
    weight_by_level = {0: 0.1, 1: 0.5, 2: 1.0}
    product_level = KIT_LEVELS.get(product, 2)
    if product_level in weight_by_level:
        return weight_by_level[product_level]
    return 1.0
| |
|
def solve_fixed_team_weekly():
    """Build and solve the weekly fixed-team production model with CBC.

    Decision variables, per (product p, line instance ell, shift s, day t):
        Z -- binary, 1 when p is assigned to that line/shift/day
        T -- run hours, bounded by the per-shift hour limit
        U -- units produced, bounded by ``speed * T``

    The objective is labour cost (per-hour in "partial" payment mode, a full
    shift per active run in "bulk" mode) plus a small day-weighted penalty
    that nudges lower hierarchy levels towards earlier days.

    Progress, warnings and the final schedule are printed to stdout.

    Returns:
        ``None`` when CBC does not report an OPTIMAL status. Otherwise a dict
        with the keys ``'objective'``, ``'weekly_production'``,
        ``'run_schedule'``, ``'headcount_per_shift'`` and
        ``'person_hours_by_day'``.

    Raises:
        RuntimeError: if the CBC backend cannot be created.
    """
    # ------------------------------------------------------------------ sets
    D = list(DATE_SPAN)
    S = sorted(SHIFT_LIST)
    E = list(EMPLOYEE_TYPE_LIST)

    print("\n" + "=" * 60)
    print("🔗 APPLYING HIERARCHY-BASED PRODUCTION ORDERING")
    print("=" * 60)
    P = sort_products_by_hierarchy(list(PRODUCT_LIST))
    P_set = set(P)  # O(1) membership for the dependency pass

    L = build_lines()

    # ------------------------------------------------------------ parameters
    Hmax_s = dict(MAX_HOUR_PER_SHIFT_PER_PERSON)
    Hmax_daily = MAX_HOUR_PER_PERSON_PER_DAY
    max_workers_line = dict(MAX_PARALLEL_WORKERS)
    N_day = MAX_EMPLOYEE_PER_TYPE_ON_DAY
    cost = COST_LIST_PER_EMP_SHIFT
    d_week = DEMAND_DICTIONARY

    # Per-product data that is reused by several constraint families.
    default_line_type = 6  # fallback line type for products with no mapping
    line_type_of = {p: KIT_LINE_MATCH_DICT.get(p, default_line_type) for p in P}
    team_size = {p: sum(TEAM_REQ_PER_PRODUCT[e][p] for e in E) for p in P}
    timing_weight = {p: get_dependency_timing_weight(p) for p in P}

    # ------------------------------------------------- configuration checks
    for p in P:
        req_total = team_size[p]
        lt = line_type_of[p]
        if p not in KIT_LINE_MATCH_DICT:
            print(f"[WARN] Product {p}: No line type mapping found, defaulting to long line (6)")
        if req_total > max_workers_line.get(lt, 1e9):
            print(f"[WARN] Product {p}: team size {req_total} > MAX_PARALLEL_WORKERS[{lt}] "
                  f"= {max_workers_line.get(lt)}. Blocked.")

    # ------------------------------------------- evening-shift capacity hint
    if EVENING_SHIFT_MODE == "normal":
        total_demand = sum(d_week.get(p, 0) for p in P)

        # Only shifts 1 and 2 count towards the "Regular + Overtime" capacity.
        regular_overtime_shifts = [s for s in S if s in (1, 2)]

        # Hours offered by all line instances over all days in those shifts.
        hours_available = len(L) * len(D) * sum(Hmax_s[s] for s in regular_overtime_shifts)

        # NOTE: this is a loose upper bound. Each product with speed data is
        # credited with every line-hour, regardless of line eligibility or of
        # other products sharing the same lines.
        max_capacity = sum(
            PER_PRODUCT_SPEED[p] * hours_available for p in P if p in PER_PRODUCT_SPEED
        )

        capacity_ratio = max_capacity / total_demand if total_demand > 0 else float('inf')

        print(f"[CAPACITY CHECK] Total demand: {total_demand}")
        print(f"[CAPACITY CHECK] Max capacity (Regular + Overtime): {max_capacity:.1f}")
        print(f"[CAPACITY CHECK] Capacity ratio: {capacity_ratio:.2f}")

        if capacity_ratio < EVENING_SHIFT_DEMAND_THRESHOLD:
            print(f"\n🚨 [ALERT] DEMAND TOO HIGH!")
            print(f" Current capacity can only meet {capacity_ratio*100:.1f}% of demand")
            print(f" Threshold: {EVENING_SHIFT_DEMAND_THRESHOLD*100:.1f}%")
            print(f" RECOMMENDATION: Change EVENING_SHIFT_MODE to 'activate_evening' to enable evening shift")
            print(f" This will add shift 3 to increase capacity\n")

    if DAILY_WEEKLY_SCHEDULE.lower() == "daily":
        print("[INFO] DAILY_WEEKLY_SCHEDULE='daily' but this model only enforces weekly demand. "
              "If daily demand decomposition is needed, please let us know.")

    # ---------------------------------------------------------------- solver
    solver = pywraplp.Solver.CreateSolver('CBC')
    if not solver:
        raise RuntimeError("CBC solver not found.")
    INF = solver.infinity()

    # ------------------------------------------------------------- variables
    Z, T, U = {}, {}, {}
    for p in P:
        for ell in L:
            for s in S:
                for t in D:
                    Z[p, ell, s, t] = solver.BoolVar(f"Z_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")
                    T[p, ell, s, t] = solver.NumVar(0, Hmax_s[s], f"T_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")
                    U[p, ell, s, t] = solver.NumVar(0, INF, f"U_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")

    # ------------------------------------------------------------- objective
    print(f"Payment mode configuration: {PAYMENT_MODE_CONFIG}")

    # A mode other than "partial"/"bulk" adds no cost terms for that shift;
    # say so instead of silently pricing the shift at zero.
    for s in S:
        configured_mode = PAYMENT_MODE_CONFIG.get(s, "partial")
        if configured_mode not in ("partial", "bulk"):
            print(f"[WARN] Shift {s}: unknown payment mode {configured_mode!r}; "
                  f"no labour cost is charged for this shift.")

    cost_terms = []
    for e in E:
        for s in S:
            payment_mode = PAYMENT_MODE_CONFIG.get(s, "partial")

            if payment_mode == "partial":
                # Pay for the hours actually worked.
                for p in P:
                    for ell in L:
                        for t in D:
                            cost_terms.append(cost[e][s] * TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t])

            elif payment_mode == "bulk":
                # Pay the full shift length whenever the run has any hours.
                for p in P:
                    for ell in L:
                        for t in D:
                            work_binary = solver.BoolVar(f"work_{e}_s{s}_{p}_{ell[0]}{ell[1]}_d{t}")

                            # work_binary = 1 is required for T > 0 ...
                            solver.Add(T[p, ell, s, t] <= Hmax_s[s] * work_binary)
                            # ... and work_binary = 1 requires at least 0.001 h.
                            solver.Add(work_binary * 0.001 <= T[p, ell, s, t])

                            cost_terms.append(cost[e][s] * Hmax_s[s] * TEAM_REQ_PER_PRODUCT[e][p] * work_binary)

    total_cost = solver.Sum(cost_terms)

    # Small day-weighted penalty on run hours. Assumes day indices in
    # DATE_SPAN are numeric, since they are used as coefficients.
    hierarchy_penalty = solver.Sum(
        0.01 * timing_weight[p] * t * T[p, ell, s, t]
        for p in P for ell in L for s in S for t in D
    )

    total_objective = total_cost + hierarchy_penalty
    solver.Minimize(total_objective)

    # ----------------------------------------------------------- constraints
    # Weekly demand must be covered by total production.
    for p in P:
        solver.Add(
            solver.Sum(U[p, ell, s, t] for ell in L for s in S for t in D) >= d_week.get(p, 0)
        )

    # At most one product per line/shift/day; hours only on an assigned run.
    for ell in L:
        for s in S:
            for t in D:
                solver.Add(solver.Sum(Z[p, ell, s, t] for p in P) <= 1)
                for p in P:
                    solver.Add(T[p, ell, s, t] <= Hmax_s[s] * Z[p, ell, s, t])

    # Line eligibility: the line type must match, the team must fit on the
    # line, and the product must be active on that day.
    for p in P:
        req_lt = line_type_of[p]
        req_total = team_size[p]
        for ell in L:
            allowed = (ell[0] == req_lt) and (req_total <= max_workers_line.get(ell[0], 1e9))
            for s in S:
                for t in D:
                    if ACTIVE[t][p] == 0 or not allowed:
                        solver.Add(Z[p, ell, s, t] == 0)
                        solver.Add(T[p, ell, s, t] == 0)
                        solver.Add(U[p, ell, s, t] == 0)

    # Production rate. Resolve the speed once per product so that a missing
    # entry is reported a single time rather than per line/shift/day.
    default_speed = 800 / 7.5
    speed_of = {}
    for p in P:
        if p in PER_PRODUCT_SPEED:
            speed_of[p] = PER_PRODUCT_SPEED[p]
        else:
            print(f"Warning: No speed data for product {p}, using default {default_speed:.1f} per hour")
            speed_of[p] = default_speed

    for p in P:
        for ell in L:
            for s in S:
                for t in D:
                    solver.Add(U[p, ell, s, t] <= speed_of[p] * T[p, ell, s, t])

    # Person-hours per employee type, per shift and day.
    for e in E:
        for s in S:
            for t in D:
                solver.Add(
                    solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t] for p in P for ell in L)
                    <= Hmax_s[s] * N_day[e][t]
                )

    # Person-hours per employee type, per day across all shifts.
    for e in E:
        for t in D:
            solver.Add(
                solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t] for s in S for p in P for ell in L)
                <= Hmax_daily * N_day[e][t]
            )

    # Shift-2 person-hours may not exceed shift-1 person-hours on the same
    # day. The T variables only exist for shifts in S, so the constraint is
    # only built when both shifts are configured (indexing a missing shift
    # would raise KeyError).
    if 1 in S and 2 in S:
        for e in E:
            for t in D:
                solver.Add(
                    solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, 2, t] for p in P for ell in L)
                    <=
                    solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, 1, t] for p in P for ell in L)
                )
    elif 2 in S:
        print("[WARN] Shift 2 is configured without shift 1; "
              "skipping the shift-2 <= shift-1 hours constraint.")

    # Hierarchy: a dependency's day-weighted run hours may not exceed those
    # of the product that depends on it.
    # NOTE(review): this compares sum(t * hours), which is a proxy and not a
    # strict "finish before start" precedence -- confirm it matches intent.
    print("\n[HIERARCHY] Adding dependency constraints...")
    dependency_constraints_added = 0

    for p in P:
        dependencies = KIT_DEPENDENCIES.get(p, [])
        if dependencies:
            p_level = KIT_LEVELS.get(p, 2)
            # Same expression for every dependency of p, so build it once.
            p_completion = solver.Sum(
                t * T[p, ell, s, t] for ell in L for s in S for t in D
            )

            for dep in dependencies:
                if dep in P_set:
                    dep_completion = solver.Sum(
                        t * T[dep, ell, s, t] for ell in L for s in S for t in D
                    )

                    solver.Add(dep_completion <= p_completion)
                    dependency_constraints_added += 1

                    print(f" Added constraint: {dep} (dependency) <= {p} (level {p_level})")

    print(f"[HIERARCHY] Added {dependency_constraints_added} dependency constraints")

    # ----------------------------------------------------------------- solve
    status = solver.Solve()
    if status != pywraplp.Solver.OPTIMAL:
        print(f"No optimal solution. Status: {status} (2=INFEASIBLE)")
        return None

    # ---------------------------------------------------------------- report
    result = {}
    result['objective'] = solver.Objective().Value()

    # Units produced per product over the whole horizon.
    prod_week = {p: sum(U[p, ell, s, t].solution_value() for ell in L for s in S for t in D) for p in P}
    result['weekly_production'] = prod_week

    # One row per line/shift/day that has a product assigned.
    schedule = []
    for t in D:
        for ell in L:
            for s in S:
                chosen = [p for p in P if Z[p, ell, s, t].solution_value() > 0.5]
                if chosen:
                    p = chosen[0]
                    schedule.append({
                        'day': t,
                        'line_type_id': ell[0],
                        'line_idx': ell[1],
                        'shift': s,
                        'product': p,
                        'run_hours': T[p, ell, s, t].solution_value(),
                        'units': U[p, ell, s, t].solution_value(),
                    })
    result['run_schedule'] = schedule

    # Implied headcount = person-hours / hours per person in that shift,
    # rounded up. A small tolerance keeps solver float noise (for example
    # 7.50000001 h against a 7.5 h shift) from adding a phantom person.
    headcount_tol = 1e-6
    headcount = []
    for e in E:
        for s in S:
            for t in D:
                used_ph = sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t].solution_value() for p in P for ell in L)
                shift_hours = Hmax_s[s]
                if shift_hours > 0:
                    need = max(0, ceil(used_ph / shift_hours - headcount_tol))
                else:
                    # T is bounded above by the shift length, so nothing can
                    # run in a zero-length shift.
                    need = 0
                headcount.append({'emp_type': e, 'shift': s, 'day': t,
                                  'needed': need, 'available': N_day[e][t]})
    result['headcount_per_shift'] = headcount

    # Person-hours used against the daily cap, per employee type and day.
    ph_by_day = []
    for e in E:
        for t in D:
            used = sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t].solution_value() for s in S for p in P for ell in L)
            ph_by_day.append({'emp_type': e, 'day': t,
                              'used_person_hours': used,
                              'cap_person_hours': Hmax_daily * N_day[e][t]})
    result['person_hours_by_day'] = ph_by_day

    # --------------------------------------------------------------- console
    print("Objective (min cost):", result['objective'])
    print("\n--- Weekly production by product ---")
    for p, u in prod_week.items():
        print(f"{p}: {u:.1f} / demand {d_week.get(p,0)}")

    print("\n--- Schedule (line, shift, day) ---")
    for row in schedule:
        print(f"D{row['day']} L{row['line_type_id']}-{row['line_idx']} S{row['shift']}: "
              f"{row['product']} T={row['run_hours']:.2f}h U={row['units']:.1f}")

    print("\n--- Implied headcount need (per type/shift/day) ---")
    for row in headcount:
        print(f"{row['emp_type']}, S{row['shift']}, D{row['day']}: "
              f"need={row['needed']} (avail {row['available']})")

    print("\n--- Total person-hours by type/day ---")
    for row in ph_by_day:
        print(f"{row['emp_type']}, D{row['day']}: used={row['used_person_hours']:.1f} "
              f"(cap {row['cap_person_hours']})")

    return result
| |
|
| |
|
# Script entry point: running this file directly solves the weekly model once.
# The returned result dict is not used here; the solver prints its own report.
if __name__ == "__main__":
    solve_fixed_team_weekly()