HaLim
migrate core code name
4003592
raw
history blame
20 kB
# ============================================================
# SD_roster_real - Fixed Team Production Planning (Option A)
# - Uses config-style variable names from src/config/optimization_config.py
# - Team per product (simultaneous): UNICEF Fixed term / Humanizer
# - Line types via numeric ids: 6=long, 7=short
# - One product per (line, shift, day)
# - Weekly demand (across DATE_SPAN)
# ============================================================
from ortools.linear_solver import pywraplp
from math import ceil
# ---- config import (adjust to match your project path) ----
from src.config.optimization_config import (
DATE_SPAN, # [1..N]
PRODUCT_LIST, # list of products (e.g., ['A','B',...])
EMPLOYEE_TYPE_LIST, # e.g., ['UNICEF Fixed term','Humanizer']
SHIFT_LIST, # e.g., [1,2,3]
LINE_LIST, # e.g., [6,7] (line type ids)
LINE_CNT_PER_TYPE, # {6: count_of_long_lines, 7: count_of_short_lines}
DEMAND_DICTIONARY, # {product: total_units_over_period}
COST_LIST_PER_EMP_SHIFT, # {emp_type: {shift: cost_per_hour}}
MAX_EMPLOYEE_PER_TYPE_ON_DAY, # {emp_type: {t: headcount}}
MAX_HOUR_PER_PERSON_PER_DAY, # e.g., 14
MAX_HOUR_PER_SHIFT_PER_PERSON, # {1: hours, 2: hours, 3: hours}
PER_PRODUCT_SPEED, # {6: cap_units_per_hour, 7: cap_units_per_hour}
MAX_PARALLEL_WORKERS, # {6: max_workers, 7: max_workers}
DAILY_WEEKLY_SCHEDULE, # 'daily' or 'weekly' (여기선 weekly로 모델링)
FIXED_STAFF_CONSTRAINT_MODE, # not used in fixed-team model (동시 투입이라 무의미)
TEAM_REQ_PER_PRODUCT, # {emp_type: {product: team_size}} from Kits_Calculation.csv
PAYMENT_MODE_CONFIG, # {shift: 'bulk'/'partial'} payment mode configuration
KIT_LINE_MATCH_DICT,
EVENING_SHIFT_MODE,
EVENING_SHIFT_DEMAND_THRESHOLD,
# Hierarchy variables for production ordering
KIT_LEVELS, # {kit_id: level} where 0=prepack, 1=subkit, 2=master
KIT_DEPENDENCIES, # {kit_id: [dependency_list]}
PRODUCTION_PRIORITY_ORDER, # [kit_ids] sorted by production priority
)
# -----------------------------------------
# Additional parameters (things the config module did not provide)
# TODO: fill in real values where needed
# -----------------------------------------
# 2) kit_line_match: KIT_LINE_MATCH_DICT comes from the config import above;
#    it is dumped once at import time as a debugging aid.
print("KIT_LINE_MATCH_DICT",KIT_LINE_MATCH_DICT)
# 3) If a specific product is not produced on a specific date, set its flag to 0.
#    Every (day, product) pair starts out active (1).
ACTIVE = {t: {p: 1 for p in PRODUCT_LIST} for t in DATE_SPAN}
# Example: ACTIVE[2]['C'] = 0
def build_lines():
    """Enumerate the individual production lines.

    Every line type in LINE_LIST is expanded into LINE_CNT_PER_TYPE[type]
    instances (0 when the type has no entry). Each instance is a
    (line_type_id, idx) tuple with idx counting from 1,
    e.g. (6, 1), (6, 2), (7, 1), ...
    """
    return [
        (line_type, idx)
        for line_type in LINE_LIST  # line type ids, e.g. 6 or 7
        for idx in range(1, int(LINE_CNT_PER_TYPE.get(line_type, 0)) + 1)
    ]
# Module-level list of line instances, built at import time and dumped
# together with the per-product speeds as a debugging aid.
L = build_lines()
print("L", L)
print("PER_PRODUCT_SPEED", PER_PRODUCT_SPEED)
def sort_products_by_hierarchy(product_list, kit_levels=None, kit_dependencies=None):
    """Order products by hierarchy level: prepacks, then subkits, then masters.

    Products that have a level are sorted by (level, product id). Products
    without a level are appended afterwards, sorted by product id. The
    dependency mapping is only used for the printed preview of the first ten
    entries; it does not influence the ordering.

    Args:
        product_list: iterable of product ids to order.
        kit_levels: optional {product: level} mapping (0=prepack, 1=subkit,
            2=master). Defaults to the module-level KIT_LEVELS.
        kit_dependencies: optional {product: [dependencies]} mapping.
            Defaults to the module-level KIT_DEPENDENCIES.

    Returns:
        A new list with the products in production order.
    """
    # `is None` (not truthiness) so that an explicitly passed empty mapping is honoured.
    levels = KIT_LEVELS if kit_levels is None else kit_levels
    dependencies = KIT_DEPENDENCIES if kit_dependencies is None else kit_dependencies

    # Split products into those with and without hierarchy data
    products_with_hierarchy = [p for p in product_list if p in levels]
    products_without_hierarchy = [p for p in product_list if p not in levels]
    if products_without_hierarchy:
        print(f"[HIERARCHY] Products without hierarchy data: {products_without_hierarchy}")

    # Sort by hierarchy level, breaking ties by product id
    sorted_products = sorted(products_with_hierarchy, key=lambda p: (levels[p], p))
    # Products without hierarchy data go last
    sorted_products.extend(sorted(products_without_hierarchy))

    print(f"[HIERARCHY] Production order: {len(sorted_products)} products")
    level_names = {0: "prepack", 1: "subkit", 2: "master"}
    for i, p in enumerate(sorted_products[:10]):  # Show first 10
        level = levels.get(p, "unknown")
        level_name = level_names.get(level, "unknown")
        deps = dependencies.get(p, [])
        print(f" {i+1}. {p} (level {level}={level_name}, deps: {len(deps)})")
    if len(sorted_products) > 10:
        print(f" ... and {len(sorted_products) - 10} more products")
    return sorted_products
def get_dependency_timing_weight(product, kit_levels=None):
    """Return the timing weight of a product based on its hierarchy level.

    Lower levels get smaller weights (prepack=0.1, subkit=0.5, master=1.0).
    A product with no level entry is treated as a master (level 2), and an
    unrecognised level also maps to 1.0.

    Args:
        product: product id to look up.
        kit_levels: optional {product: level} mapping. Defaults to the
            module-level KIT_LEVELS.

    Returns:
        The weight as a float.
    """
    # `is None` (not truthiness) so that an explicitly passed empty mapping is honoured.
    levels = KIT_LEVELS if kit_levels is None else kit_levels
    level = levels.get(product, 2)  # Default to master level
    weights = {0: 0.1, 1: 0.5, 2: 1.0}
    return weights.get(level, 1.0)
def solve_fixed_team_weekly():
    """Build and solve the fixed-team production-planning MILP over DATE_SPAN.

    Decision variables, per (product p, line ell, shift s, day t):
      * Z: binary, 1 if p runs on that line/shift/day
      * T: run hours, bounded by the shift length
      * U: units produced

    The objective minimises labour cost (per-hour "partial" or full-shift
    "bulk" payment, chosen per shift via PAYMENT_MODE_CONFIG) plus a small
    penalty that favours running lower hierarchy levels on earlier days.

    Constraints: total demand per product over the period, one product per
    (line, shift, day), product/line-type compatibility and ACTIVE days,
    throughput U <= speed * T, per-shift and per-day person-hour capacity per
    employee type, shift-2 hours not exceeding shift-1 hours (only when both
    shifts exist), and a day-weighted ordering between a product and its
    dependencies.

    Returns:
        dict with keys 'objective', 'weekly_production', 'run_schedule',
        'headcount_per_shift' and 'person_hours_by_day', or None when the
        solver does not report an OPTIMAL status.

    Raises:
        RuntimeError: if the CBC backend cannot be created.
    """
    # --- Sets ---
    D = list(DATE_SPAN)
    S = sorted(SHIFT_LIST)
    E = list(EMPLOYEE_TYPE_LIST)  # e.g., ['UNICEF Fixed term','Humanizer']

    # *** HIERARCHY SORTING: Sort products by production priority ***
    print("\n" + "="*60)
    print("🔗 APPLYING HIERARCHY-BASED PRODUCTION ORDERING")
    print("="*60)
    P = sort_products_by_hierarchy(list(PRODUCT_LIST))
    P_set = set(P)  # O(1) membership tests for the dependency constraints
    L = build_lines()

    # --- Short aliases for parameters ---
    Hmax_s = dict(MAX_HOUR_PER_SHIFT_PER_PERSON)   # {shift: hours}
    Hmax_daily = MAX_HOUR_PER_PERSON_PER_DAY       # single hours-per-person-per-day cap
    max_workers_line = dict(MAX_PARALLEL_WORKERS)  # {line_type: max workers}
    N_day = MAX_EMPLOYEE_PER_TYPE_ON_DAY           # {emp_type: {t: headcount}}
    cost = COST_LIST_PER_EMP_SHIFT                 # {emp_type: {shift: cost}}
    d_week = DEMAND_DICTIONARY                     # {product: demand over period}

    # --- Feasibility quick checks ---
    # 1) Warn when a team is larger than the line type allows. The actual
    #    blocking happens in constraint 3 below.
    team_size = {}
    for p in P:
        req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in E)
        team_size[p] = req_total
        lt = KIT_LINE_MATCH_DICT.get(p, 6)  # Default to long line (6) if not found
        if p not in KIT_LINE_MATCH_DICT:
            print(f"[WARN] Product {p}: No line type mapping found, defaulting to long line (6)")
        if req_total > max_workers_line.get(lt, 1e9):
            print(f"[WARN] Product {p}: team size {req_total} > MAX_PARALLEL_WORKERS[{lt}] "
                  f"= {max_workers_line.get(lt)}. Blocked.")

    # 2) Check if demand can be met without evening shift (only if in normal mode)
    if EVENING_SHIFT_MODE == "normal":
        total_demand = sum(DEMAND_DICTIONARY.get(p, 0) for p in P)
        # Maximum capacity with regular + overtime shifts only (shifts 1 and 2)
        regular_overtime_shifts = [s for s in S if s in (1, 2)]
        # Hours available across all lines, shifts 1/2 and days. This does not
        # depend on the product, so it is computed once. It is an optimistic
        # bound: line-type compatibility and line sharing are ignored.
        hours_per_product = sum(
            Hmax_s[s] for _ell in L for s in regular_overtime_shifts for _t in D
        )
        max_capacity = 0
        for p in P:
            if p in PER_PRODUCT_SPEED:
                max_capacity += PER_PRODUCT_SPEED[p] * hours_per_product
        capacity_ratio = max_capacity / total_demand if total_demand > 0 else float('inf')
        print(f"[CAPACITY CHECK] Total demand: {total_demand}")
        print(f"[CAPACITY CHECK] Max capacity (Regular + Overtime): {max_capacity:.1f}")
        print(f"[CAPACITY CHECK] Capacity ratio: {capacity_ratio:.2f}")
        if capacity_ratio < EVENING_SHIFT_DEMAND_THRESHOLD:
            print(f"\n🚨 [ALERT] DEMAND TOO HIGH!")
            print(f" Current capacity can only meet {capacity_ratio*100:.1f}% of demand")
            print(f" Threshold: {EVENING_SHIFT_DEMAND_THRESHOLD*100:.1f}%")
            print(f" RECOMMENDATION: Change EVENING_SHIFT_MODE to 'activate_evening' to enable evening shift")
            print(f" This will add shift 3 to increase capacity\n")

    # 3) DAILY_WEEKLY_SCHEDULE warning (current model only enforces weekly demand)
    if DAILY_WEEKLY_SCHEDULE.lower() == "daily":
        print("[INFO] DAILY_WEEKLY_SCHEDULE='daily' but this model only enforces weekly demand. "
              "If daily demand decomposition is needed, please let us know.")

    # --- Solver ---
    solver = pywraplp.Solver.CreateSolver('CBC')
    if not solver:
        raise RuntimeError("CBC solver not found.")
    INF = solver.infinity()

    # --- Variables ---
    # Z[p,ell,s,t] ∈ {0,1}: 1 if product p runs on (line,shift,day)
    Z, T, U = {}, {}, {}  # T: run hours, U: production units
    for p in P:
        for ell in L:  # ell = (line_type_id, idx)
            for s in S:
                for t in D:
                    Z[p, ell, s, t] = solver.BoolVar(f"Z_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")
                    T[p, ell, s, t] = solver.NumVar(0, Hmax_s[s], f"T_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")
                    U[p, ell, s, t] = solver.NumVar(0, INF, f"U_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")

    # --- Objective: total labor cost with payment modes + hierarchy timing penalty ---
    print(f"Payment mode configuration: {PAYMENT_MODE_CONFIG}")
    cost_terms = []
    for e in E:
        for s in S:
            payment_mode = PAYMENT_MODE_CONFIG.get(s, "partial")  # Default to partial if not specified
            if payment_mode == "partial":
                # Partial payment: pay for actual hours worked
                for p in P:
                    for ell in L:
                        for t in D:
                            cost_terms.append(cost[e][s] * TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t])
            elif payment_mode == "bulk":
                # Bulk payment: a team that works ANY hours in the shift is paid
                # for the FULL shift, but only the team members actually assigned.
                # NOTE(review): the binary below depends only on (p, ell, s, t);
                # creating one per employee type is redundant but harmless.
                for p in P:
                    for ell in L:
                        for t in D:
                            work_binary = solver.BoolVar(f"work_{e}_s{s}_{p}_{ell[0]}{ell[1]}_d{t}")
                            # Link work_binary to T: T > 0 forces it to 1 ...
                            solver.Add(T[p, ell, s, t] <= Hmax_s[s] * work_binary)
                            # ... and work_binary = 1 requires at least 0.001 run hours.
                            solver.Add(work_binary * 0.001 <= T[p, ell, s, t])
                            # Cost: pay the working team for full shift hours
                            cost_terms.append(cost[e][s] * Hmax_s[s] * TEAM_REQ_PER_PRODUCT[e][p] * work_binary)
    total_cost = solver.Sum(cost_terms)

    # Secondary objective: encourage earlier production of lower hierarchy levels.
    # Small weight (0.01) so it does not overwhelm the cost term.
    hierarchy_penalty = solver.Sum(
        0.01 * get_dependency_timing_weight(p) * t * T[p, ell, s, t]
        for p in P for ell in L for s in S for t in D
    )
    total_objective = total_cost + hierarchy_penalty
    solver.Minimize(total_objective)

    # --- Constraints ---
    # 1) Weekly demand
    for p in P:
        solver.Add(
            solver.Sum(U[p, ell, s, t] for ell in L for s in S for t in D) >= d_week.get(p, 0)
        )

    # 2) One product per (line,shift,day) + time gating
    for ell in L:
        for s in S:
            for t in D:
                solver.Add(solver.Sum(Z[p, ell, s, t] for p in P) <= 1)
                for p in P:
                    solver.Add(T[p, ell, s, t] <= Hmax_s[s] * Z[p, ell, s, t])

    # 3) Product-line type compatibility + (optional) activity by day
    for p in P:
        req_lt = KIT_LINE_MATCH_DICT.get(p, 6)  # Default to long line (6) if not found
        req_total = team_size[p]
        for ell in L:
            allowed = (ell[0] == req_lt) and (req_total <= max_workers_line.get(ell[0], 1e9))
            for s in S:
                for t in D:
                    if ACTIVE[t][p] == 0 or not allowed:
                        solver.Add(Z[p, ell, s, t] == 0)
                        solver.Add(T[p, ell, s, t] == 0)
                        solver.Add(U[p, ell, s, t] == 0)

    # 4) Line throughput: U ≤ product_speed * T
    #    PER_PRODUCT_SPEED[p] is used as-is as the per-hour rate (same speed
    #    regardless of line type). The speed lookup and the missing-data
    #    warning are done once per product rather than once per variable.
    default_speed = 800 / 7.5  # fallback units per hour when a product has no speed entry
    for p in P:
        if p in PER_PRODUCT_SPEED:
            speed = PER_PRODUCT_SPEED[p]
        else:
            speed = default_speed
            print(f"Warning: No speed data for product {p}, using default {default_speed:.1f} per hour")
        for ell in L:
            for s in S:
                for t in D:
                    solver.Add(U[p, ell, s, t] <= speed * T[p, ell, s, t])

    # 5) Per-shift staffing capacity by type: sum(req*hours) ≤ shift_hours * headcount
    for e in E:
        for s in S:
            for t in D:
                solver.Add(
                    solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t] for p in P for ell in L)
                    <= Hmax_s[s] * N_day[e][t]
                )

    # 6) Per-day staffing capacity by type: sum(req*hours across shifts) ≤ daily cap * headcount
    for e in E:
        for t in D:
            solver.Add(
                solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t] for s in S for p in P for ell in L)
                <= Hmax_daily * N_day[e][t]
            )

    # 7) OT after usual (optional but kept): shift-2 person-hours ≤ shift-1 person-hours.
    #    Only meaningful (and only indexable) when both shifts are in SHIFT_LIST;
    #    without this guard a missing shift raised KeyError on T[...].
    if 1 in S and 2 in S:
        for e in E:
            for t in D:
                solver.Add(
                    solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, 2, t] for p in P for ell in L)
                    <=
                    solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, 1, t] for p in P for ell in L)
                )
    # (if needed, evening(3) after usual(1): sum(...)_s=3 ≤ sum(...)_s=1)

    # 8) *** HIERARCHY DEPENDENCY CONSTRAINTS ***
    # For each product, every dependency that is also being produced must have a
    # day-weighted run-hour total no larger than the product's own.
    # NOTE(review): sum(t * T) is a proxy, not an actual completion day; a
    # dependency needing far more run hours than its parent can make this
    # tight or infeasible. Confirm this is the intended formulation.
    print("\n[HIERARCHY] Adding dependency constraints...")
    dependency_constraints_added = 0
    for p in P:
        dependencies = KIT_DEPENDENCIES.get(p, [])
        if dependencies:
            p_level = KIT_LEVELS.get(p, 2)
            for dep in dependencies:
                if dep in P_set:  # Only if dependency is also in production list
                    p_completion = solver.Sum(
                        t * T[p, ell, s, t] for ell in L for s in S for t in D
                    )
                    dep_completion = solver.Sum(
                        t * T[dep, ell, s, t] for ell in L for s in S for t in D
                    )
                    solver.Add(dep_completion <= p_completion)
                    dependency_constraints_added += 1
                    print(f" Added constraint: {dep} (dependency) <= {p} (level {p_level})")
    print(f"[HIERARCHY] Added {dependency_constraints_added} dependency constraints")

    # --- Solve ---
    status = solver.Solve()
    if status != pywraplp.Solver.OPTIMAL:
        print(f"No optimal solution. Status: {status} (2=INFEASIBLE)")
        # Debug hint:
        # solver.EnableOutput()
        # solver.ExportModelAsLpFile("model.lp")
        return None

    # --- Report ---
    result = {}
    result['objective'] = solver.Objective().Value()

    # Weekly production
    prod_week = {p: sum(U[p, ell, s, t].solution_value() for ell in L for s in S for t in D) for p in P}
    result['weekly_production'] = prod_week

    # Which product ran on which line/shift/day
    schedule = []
    for t in D:
        for ell in L:
            for s in S:
                chosen = [p for p in P if Z[p, ell, s, t].solution_value() > 0.5]
                if chosen:
                    p = chosen[0]  # constraint 2 allows at most one product per slot
                    schedule.append({
                        'day': t,
                        'line_type_id': ell[0],
                        'line_idx': ell[1],
                        'shift': s,
                        'product': p,
                        'run_hours': T[p, ell, s, t].solution_value(),
                        'units': U[p, ell, s, t].solution_value(),
                    })
    result['run_schedule'] = schedule

    # Implied headcount by type/shift/day (ceil)
    headcount = []
    for e in E:
        for s in S:
            for t in D:
                used_ph = sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t].solution_value() for p in P for ell in L)
                # The epsilon guards against a zero-length shift in the division.
                need = ceil(used_ph / (Hmax_s[s] + 1e-9))
                headcount.append({'emp_type': e, 'shift': s, 'day': t,
                                  'needed': need, 'available': N_day[e][t]})
    result['headcount_per_shift'] = headcount

    # Total person-hours by type/day (≤ daily cap * headcount)
    ph_by_day = []
    for e in E:
        for t in D:
            used = sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t].solution_value() for s in S for p in P for ell in L)
            ph_by_day.append({'emp_type': e, 'day': t,
                              'used_person_hours': used,
                              'cap_person_hours': Hmax_daily * N_day[e][t]})
    result['person_hours_by_day'] = ph_by_day

    # Pretty print
    print("Objective (min cost):", result['objective'])
    print("\n--- Weekly production by product ---")
    for p, u in prod_week.items():
        print(f"{p}: {u:.1f} / demand {d_week.get(p,0)}")
    print("\n--- Schedule (line, shift, day) ---")
    for row in schedule:
        print(f"D{row['day']} L{row['line_type_id']}-{row['line_idx']} S{row['shift']}: "
              f"{row['product']} T={row['run_hours']:.2f}h U={row['units']:.1f}")
    print("\n--- Implied headcount need (per type/shift/day) ---")
    for row in headcount:
        print(f"{row['emp_type']}, S{row['shift']}, D{row['day']}: "
              f"need={row['needed']} (avail {row['available']})")
    print("\n--- Total person-hours by type/day ---")
    for row in ph_by_day:
        print(f"{row['emp_type']}, D{row['day']}: used={row['used_person_hours']:.1f} "
              f"(cap {row['cap_person_hours']})")
    return result
# Script entry point: run the optimisation when this file is executed directly.
if __name__ == "__main__":
    solve_fixed_team_weekly()