|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from ortools.linear_solver import pywraplp |
|
|
from math import ceil |
|
|
from src.config.constants import ShiftType, LineType, KitLevel |
|
|
|
|
|
|
|
|
from src.config.optimization_config import ( |
|
|
DATE_SPAN, |
|
|
get_product_list, |
|
|
EMPLOYEE_TYPE_LIST, |
|
|
SHIFT_LIST, |
|
|
LINE_LIST, |
|
|
LINE_CNT_PER_TYPE, |
|
|
get_demand_dictionary, |
|
|
COST_LIST_PER_EMP_SHIFT, |
|
|
MAX_EMPLOYEE_PER_TYPE_ON_DAY, |
|
|
MAX_HOUR_PER_PERSON_PER_DAY, |
|
|
MAX_HOUR_PER_SHIFT_PER_PERSON, |
|
|
PER_PRODUCT_SPEED, |
|
|
MAX_PARALLEL_WORKERS, |
|
|
DAILY_WEEKLY_SCHEDULE, |
|
|
FIXED_STAFF_CONSTRAINT_MODE, |
|
|
get_team_requirements, |
|
|
PAYMENT_MODE_CONFIG, |
|
|
KIT_LINE_MATCH_DICT, |
|
|
EVENING_SHIFT_MODE, |
|
|
EVENING_SHIFT_DEMAND_THRESHOLD, |
|
|
|
|
|
KIT_LEVELS, |
|
|
KIT_DEPENDENCIES, |
|
|
PRODUCTION_PRIORITY_ORDER, |
|
|
|
|
|
FIXED_MIN_UNICEF_PER_DAY, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
KIT_LINE_MATCH_DICT |
|
|
print("KIT_LINE_MATCH_DICT",KIT_LINE_MATCH_DICT) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_lines(): |
|
|
"""List of line instances. |
|
|
L elements are (line_type_id, idx) tuples. e.g., (6,1), (6,2), (7,1), ... |
|
|
""" |
|
|
L = [] |
|
|
for lt in LINE_LIST: |
|
|
cnt = int(LINE_CNT_PER_TYPE.get(lt, 0)) |
|
|
for i in range(1, cnt + 1): |
|
|
L.append((lt, i)) |
|
|
return L |
|
|
|
|
|
L=build_lines() |
|
|
print("L",L) |
|
|
print("PER_PRODUCT_SPEED",PER_PRODUCT_SPEED) |
|
|
|
|
|
def sort_products_by_hierarchy(product_list): |
|
|
""" |
|
|
Sort products by hierarchy levels and dependencies using topological sorting. |
|
|
Returns products in optimal production order: prepacks โ subkits โ masters |
|
|
Dependencies within the same level are properly ordered. |
|
|
""" |
|
|
from collections import defaultdict, deque |
|
|
|
|
|
|
|
|
products_with_hierarchy = [p for p in product_list if p in KIT_LEVELS] |
|
|
products_without_hierarchy = [p for p in product_list if p not in KIT_LEVELS] |
|
|
|
|
|
if products_without_hierarchy: |
|
|
print(f"[HIERARCHY] Products without hierarchy data: {products_without_hierarchy}") |
|
|
|
|
|
|
|
|
graph = defaultdict(list) |
|
|
in_degree = defaultdict(int) |
|
|
|
|
|
|
|
|
for product in products_with_hierarchy: |
|
|
in_degree[product] = 0 |
|
|
|
|
|
|
|
|
dependency_count = 0 |
|
|
for product in products_with_hierarchy: |
|
|
deps = KIT_DEPENDENCIES.get(product, []) |
|
|
for dep in deps: |
|
|
if dep in products_with_hierarchy: |
|
|
graph[dep].append(product) |
|
|
in_degree[product] += 1 |
|
|
dependency_count += 1 |
|
|
|
|
|
print(f"[HIERARCHY] Found {dependency_count} dependency relationships in production list") |
|
|
|
|
|
|
|
|
sorted_products = [] |
|
|
queue = deque() |
|
|
|
|
|
|
|
|
no_deps = [(KIT_LEVELS.get(p, 999), p) for p in products_with_hierarchy if in_degree[p] == 0] |
|
|
no_deps.sort() |
|
|
|
|
|
for _, product in no_deps: |
|
|
queue.append(product) |
|
|
|
|
|
while queue: |
|
|
current = queue.popleft() |
|
|
sorted_products.append(current) |
|
|
|
|
|
|
|
|
dependents = [(KIT_LEVELS.get(dep, 999), dep) for dep in graph[current]] |
|
|
dependents.sort() |
|
|
|
|
|
for _, dependent in dependents: |
|
|
in_degree[dependent] -= 1 |
|
|
if in_degree[dependent] == 0: |
|
|
queue.append(dependent) |
|
|
|
|
|
|
|
|
if len(sorted_products) != len(products_with_hierarchy): |
|
|
remaining = [p for p in products_with_hierarchy if p not in sorted_products] |
|
|
print(f"[HIERARCHY] WARNING: Potential circular dependencies detected in: {remaining}") |
|
|
|
|
|
remaining_sorted = sorted(remaining, key=lambda p: (KIT_LEVELS.get(p, 999), p)) |
|
|
sorted_products.extend(remaining_sorted) |
|
|
|
|
|
|
|
|
sorted_products.extend(sorted(products_without_hierarchy)) |
|
|
|
|
|
print(f"[HIERARCHY] Dependency-aware production order: {len(sorted_products)} products") |
|
|
for i, p in enumerate(sorted_products[:10]): |
|
|
level = KIT_LEVELS.get(p, "unknown") |
|
|
level_name = KitLevel.get_name(level) |
|
|
deps = KIT_DEPENDENCIES.get(p, []) |
|
|
deps_in_list = [d for d in deps if d in products_with_hierarchy] |
|
|
print(f" {i+1}. {p} (level {level}={level_name}, deps: {len(deps_in_list)})") |
|
|
if deps_in_list: |
|
|
print(f" Dependencies: {deps_in_list}") |
|
|
|
|
|
if len(sorted_products) > 10: |
|
|
print(f" ... and {len(sorted_products) - 10} more products") |
|
|
|
|
|
return sorted_products |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def solve_fixed_team_weekly(): |
|
|
|
|
|
print("\n" + "="*60) |
|
|
print("๐ LOADING FRESH DATA FOR OPTIMIZATION") |
|
|
print("="*60) |
|
|
|
|
|
|
|
|
PRODUCT_LIST = get_product_list() |
|
|
DEMAND_DICTIONARY = get_demand_dictionary() |
|
|
TEAM_REQ_PER_PRODUCT = get_team_requirements(PRODUCT_LIST) |
|
|
|
|
|
print(f"๐ฆ LOADED PRODUCTS: {len(PRODUCT_LIST)} products") |
|
|
print(f"๐ LOADED DEMAND: {sum(DEMAND_DICTIONARY.values())} total units") |
|
|
print(f"๐ฅ LOADED TEAM REQUIREMENTS: {len(TEAM_REQ_PER_PRODUCT)} employee types") |
|
|
|
|
|
|
|
|
ACTIVE = {t: {p: 1 for p in PRODUCT_LIST} for t in DATE_SPAN} |
|
|
|
|
|
|
|
|
D = list(DATE_SPAN) |
|
|
|
|
|
S = sorted(list(SHIFT_LIST)) |
|
|
E = list(EMPLOYEE_TYPE_LIST) |
|
|
print("E",E) |
|
|
|
|
|
print("\n" + "="*60) |
|
|
print("๐ APPLYING HIERARCHY-BASED PRODUCTION ORDERING") |
|
|
print("="*60) |
|
|
P_sorted = sort_products_by_hierarchy(list(PRODUCT_LIST)) |
|
|
P = P_sorted |
|
|
|
|
|
L = build_lines() |
|
|
|
|
|
|
|
|
Hmax_s = dict(MAX_HOUR_PER_SHIFT_PER_PERSON) |
|
|
Hmax_daily = MAX_HOUR_PER_PERSON_PER_DAY |
|
|
max_workers_line = dict(MAX_PARALLEL_WORKERS) |
|
|
N_day = MAX_EMPLOYEE_PER_TYPE_ON_DAY |
|
|
cost = COST_LIST_PER_EMP_SHIFT |
|
|
d_week = DEMAND_DICTIONARY |
|
|
print("d_week",d_week) |
|
|
|
|
|
|
|
|
|
|
|
for p in P: |
|
|
req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in E) |
|
|
lt = KIT_LINE_MATCH_DICT.get(p, 6) |
|
|
if p not in KIT_LINE_MATCH_DICT: |
|
|
print(f"[WARN] Product {p}: No line type mapping found, defaulting to long line (6)") |
|
|
if req_total > max_workers_line.get(lt, 1e9): |
|
|
print(f"[WARN] Product {p}: team size {req_total} > MAX_PARALLEL_WORKERS[{lt}] " |
|
|
f"= {max_workers_line.get(lt)}. Blocked.") |
|
|
|
|
|
|
|
|
if EVENING_SHIFT_MODE == "normal": |
|
|
total_demand = sum(DEMAND_DICTIONARY.get(p, 0) for p in P) |
|
|
|
|
|
|
|
|
regular_overtime_shifts = [s for s in S if s in ShiftType.REGULAR_AND_OVERTIME] |
|
|
max_capacity = 0 |
|
|
|
|
|
for p in P: |
|
|
if p in PER_PRODUCT_SPEED: |
|
|
product_speed = PER_PRODUCT_SPEED[p] |
|
|
|
|
|
max_hours_per_product = 0 |
|
|
for ell in L: |
|
|
for s in regular_overtime_shifts: |
|
|
for t in D: |
|
|
max_hours_per_product += Hmax_s[s] |
|
|
|
|
|
max_capacity += product_speed * max_hours_per_product |
|
|
|
|
|
capacity_ratio = max_capacity / total_demand if total_demand > 0 else float('inf') |
|
|
|
|
|
print(f"[CAPACITY CHECK] Total demand: {total_demand}") |
|
|
print(f"[CAPACITY CHECK] Max capacity (Regular + Overtime): {max_capacity:.1f}") |
|
|
print(f"[CAPACITY CHECK] Capacity ratio: {capacity_ratio:.2f}") |
|
|
|
|
|
if capacity_ratio < EVENING_SHIFT_DEMAND_THRESHOLD: |
|
|
print(f"\n๐จ [ALERT] DEMAND TOO HIGH!") |
|
|
print(f" Current capacity can only meet {capacity_ratio*100:.1f}% of demand") |
|
|
print(f" Threshold: {EVENING_SHIFT_DEMAND_THRESHOLD*100:.1f}%") |
|
|
print(f" RECOMMENDATION: Change EVENING_SHIFT_MODE to 'activate_evening' to enable evening shift") |
|
|
print(f" This will add shift 3 to increase capacity\n") |
|
|
|
|
|
|
|
|
if DAILY_WEEKLY_SCHEDULE.lower() == "daily": |
|
|
print("[INFO] DAILY_WEEKLY_SCHEDULE='daily' but this model only enforces weekly demand. " |
|
|
"If daily demand decomposition is needed, please let us know.") |
|
|
|
|
|
|
|
|
solver = pywraplp.Solver.CreateSolver('CBC') |
|
|
if not solver: |
|
|
raise RuntimeError("CBC solver not found.") |
|
|
INF = solver.infinity() |
|
|
|
|
|
|
|
|
|
|
|
Z, T, U = {}, {}, {} |
|
|
for p in P: |
|
|
for ell in L: |
|
|
for s in S: |
|
|
for t in D: |
|
|
Z[p, ell, s, t] = solver.BoolVar(f"Z_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") |
|
|
T[p, ell, s, t] = solver.NumVar(0, Hmax_s[s], f"T_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") |
|
|
U[p, ell, s, t] = solver.NumVar(0, INF, f"U_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") |
|
|
|
|
|
|
|
|
IDLE = {} |
|
|
for e in E: |
|
|
for s in S: |
|
|
for t in D: |
|
|
max_idle = N_day[e][t] |
|
|
IDLE[e, s, t] = solver.IntVar(0, max_idle, f"IDLE_{e}_s{s}_d{t}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"Payment mode configuration: {PAYMENT_MODE_CONFIG}") |
|
|
|
|
|
|
|
|
cost_terms = [] |
|
|
|
|
|
for e in E: |
|
|
for s in S: |
|
|
payment_mode = PAYMENT_MODE_CONFIG.get(s, "partial") |
|
|
|
|
|
if payment_mode == "partial": |
|
|
|
|
|
for p in P: |
|
|
for ell in L: |
|
|
for t in D: |
|
|
cost_terms.append(cost[e][s] * TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t]) |
|
|
|
|
|
elif payment_mode == "bulk": |
|
|
|
|
|
|
|
|
for p in P: |
|
|
for ell in L: |
|
|
for t in D: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
work_binary = solver.BoolVar(f"work_{e}_s{s}_{p}_{ell[0]}{ell[1]}_d{t}") |
|
|
|
|
|
|
|
|
solver.Add(T[p, ell, s, t] <= Hmax_s[s] * work_binary) |
|
|
solver.Add(work_binary * 0.001 <= T[p, ell, s, t]) |
|
|
|
|
|
|
|
|
cost_terms.append(cost[e][s] * Hmax_s[s] * TEAM_REQ_PER_PRODUCT[e][p] * work_binary) |
|
|
|
|
|
|
|
|
for e in E: |
|
|
for s in S: |
|
|
for t in D: |
|
|
cost_terms.append(cost[e][s] * Hmax_s[s] * IDLE[e, s, t]) |
|
|
|
|
|
total_cost = solver.Sum(cost_terms) |
|
|
|
|
|
|
|
|
|
|
|
solver.Minimize(total_cost) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for p in P: |
|
|
total_production = solver.Sum(U[p, ell, s, t] for ell in L for s in S for t in D) |
|
|
demand = d_week.get(p, 0) |
|
|
|
|
|
|
|
|
solver.Add(total_production >= demand) |
|
|
|
|
|
|
|
|
solver.Add(total_production <= demand) |
|
|
|
|
|
|
|
|
for ell in L: |
|
|
for s in S: |
|
|
for t in D: |
|
|
solver.Add(solver.Sum(Z[p, ell, s, t] for p in P) <= 1) |
|
|
for p in P: |
|
|
solver.Add(T[p, ell, s, t] <= Hmax_s[s] * Z[p, ell, s, t]) |
|
|
|
|
|
|
|
|
for p in P: |
|
|
req_lt = KIT_LINE_MATCH_DICT.get(p, LineType.LONG_LINE) |
|
|
req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in E) |
|
|
for ell in L: |
|
|
allowed = (ell[0] == req_lt) and (req_total <= max_workers_line.get(ell[0], 1e9)) |
|
|
for s in S: |
|
|
for t in D: |
|
|
if ACTIVE[t][p] == 0 or not allowed: |
|
|
solver.Add(Z[p, ell, s, t] == 0) |
|
|
solver.Add(T[p, ell, s, t] == 0) |
|
|
solver.Add(U[p, ell, s, t] == 0) |
|
|
|
|
|
|
|
|
for p in P: |
|
|
for ell in L: |
|
|
for s in S: |
|
|
for t in D: |
|
|
|
|
|
if p in PER_PRODUCT_SPEED: |
|
|
|
|
|
speed = PER_PRODUCT_SPEED[p] |
|
|
|
|
|
solver.Add( |
|
|
U[p, ell, s, t] <= speed * T[p, ell, s, t] |
|
|
) |
|
|
|
|
|
solver.Add( |
|
|
U[p, ell, s, t] >= speed * T[p, ell, s, t] |
|
|
) |
|
|
else: |
|
|
|
|
|
default_speed = 800 / 7.5 |
|
|
print(f"Warning: No speed data for product {p}, using default {default_speed:.1f} per hour") |
|
|
|
|
|
solver.Add( |
|
|
U[p, ell, s, t] <= default_speed * T[p, ell, s, t] |
|
|
) |
|
|
|
|
|
solver.Add( |
|
|
U[p, ell, s, t] >= default_speed * T[p, ell, s, t] |
|
|
) |
|
|
|
|
|
|
|
|
for e in E: |
|
|
for s in S: |
|
|
for t in D: |
|
|
|
|
|
|
|
|
solver.Add(IDLE[e, s, t] <= N_day[e][t]) |
|
|
|
|
|
|
|
|
solver.Add( |
|
|
solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t] for p in P for ell in L) |
|
|
<= Hmax_s[s] * N_day[e][t] |
|
|
) |
|
|
|
|
|
|
|
|
for e in E: |
|
|
for t in D: |
|
|
solver.Add( |
|
|
solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t] for s in S for p in P for ell in L) |
|
|
<= MAX_HOUR_PER_PERSON_PER_DAY * N_day[e][t] |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if ShiftType.EVENING in S and ShiftType.REGULAR in S: |
|
|
for e in E: |
|
|
for t in D: |
|
|
solver.Add( |
|
|
solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, ShiftType.EVENING, t] for p in P for ell in L) |
|
|
<= |
|
|
solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, ShiftType.REGULAR, t] for p in P for ell in L) |
|
|
) |
|
|
|
|
|
|
|
|
if ShiftType.OVERTIME in S and ShiftType.REGULAR in S: |
|
|
print("\n[OVERTIME] Adding constraints to ensure overtime only when regular shift is insufficient...") |
|
|
|
|
|
for e in E: |
|
|
for t in D: |
|
|
|
|
|
regular_capacity = N_day[e][t] |
|
|
|
|
|
|
|
|
regular_usage = solver.Sum( |
|
|
TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, ShiftType.REGULAR, t] |
|
|
for p in P for ell in L |
|
|
) |
|
|
|
|
|
|
|
|
overtime_usage = solver.Sum( |
|
|
TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, ShiftType.OVERTIME, t] |
|
|
for p in P for ell in L |
|
|
) |
|
|
|
|
|
|
|
|
using_overtime = solver.IntVar(0, 1, f'using_overtime_{e}_{t}') |
|
|
|
|
|
|
|
|
|
|
|
min_regular_for_overtime = int(0.9 * regular_capacity) |
|
|
|
|
|
|
|
|
solver.Add(regular_usage >= min_regular_for_overtime * using_overtime) |
|
|
|
|
|
|
|
|
solver.Add(overtime_usage <= regular_capacity * using_overtime) |
|
|
|
|
|
overtime_constraints_added = len(E) * len(D) * 2 |
|
|
print(f"[OVERTIME] Added {overtime_constraints_added} constraints ensuring overtime only when regular shifts are at 90%+ capacity") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if 'UNICEF Fixed term' in E and FIXED_MIN_UNICEF_PER_DAY > 0: |
|
|
print(f"\n[FIXED STAFFING] Adding constraint for minimum {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees per day...") |
|
|
|
|
|
unicef_constraints_added = 0 |
|
|
for t in D: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
all_unicef_hours = solver.Sum( |
|
|
TEAM_REQ_PER_PRODUCT.get('UNICEF Fixed term', {}).get(p, 0) * T[p, ell, s, t] |
|
|
for p in P |
|
|
for ell in L |
|
|
for s in S |
|
|
) |
|
|
|
|
|
|
|
|
idle_unicef_employees = solver.Sum( |
|
|
IDLE['UNICEF Fixed term', s, t] for s in S |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
solver.Add(all_unicef_hours + idle_unicef_employees * MAX_HOUR_PER_PERSON_PER_DAY >= FIXED_MIN_UNICEF_PER_DAY * MAX_HOUR_PER_PERSON_PER_DAY) |
|
|
|
|
|
|
|
|
|
|
|
total_unicef_hours_needed_for_production = solver.Sum( |
|
|
TEAM_REQ_PER_PRODUCT.get('UNICEF Fixed term', {}).get(p, 0) * T[p, ell, s, t] |
|
|
for p in P for ell in L for s in S |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
unicef_constraints_added += 1 |
|
|
|
|
|
print(f"[FIXED STAFFING] Added {unicef_constraints_added} constraints ensuring >= {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees per day") |
|
|
|
|
|
|
|
|
|
|
|
print("\n[HIERARCHY] Adding dependency constraints...") |
|
|
dependency_constraints_added = 0 |
|
|
|
|
|
for p in P: |
|
|
dependencies = KIT_DEPENDENCIES.get(p, []) |
|
|
if dependencies: |
|
|
|
|
|
p_level = KIT_LEVELS.get(p, 2) |
|
|
|
|
|
for dep in dependencies: |
|
|
if dep in P: |
|
|
|
|
|
p_completion = solver.Sum( |
|
|
t * T[p, ell, s, t] for ell in L for s in S for t in D |
|
|
) |
|
|
dep_completion = solver.Sum( |
|
|
t * T[dep, ell, s, t] for ell in L for s in S for t in D |
|
|
) |
|
|
|
|
|
|
|
|
solver.Add(dep_completion <= p_completion) |
|
|
dependency_constraints_added += 1 |
|
|
|
|
|
print(f" Added constraint: {dep} (dependency) <= {p} (level {p_level})") |
|
|
|
|
|
print(f"[HIERARCHY] Added {dependency_constraints_added} dependency constraints") |
|
|
|
|
|
|
|
|
status = solver.Solve() |
|
|
if status != pywraplp.Solver.OPTIMAL: |
|
|
status_names = {pywraplp.Solver.INFEASIBLE: "INFEASIBLE", pywraplp.Solver.UNBOUNDED: "UNBOUNDED"} |
|
|
print(f"No optimal solution. Status: {status} ({status_names.get(status, 'UNKNOWN')})") |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
result = {} |
|
|
result['objective'] = solver.Objective().Value() |
|
|
|
|
|
|
|
|
prod_week = {p: sum(U[p, ell, s, t].solution_value() for ell in L for s in S for t in D) for p in P} |
|
|
result['weekly_production'] = prod_week |
|
|
|
|
|
|
|
|
schedule = [] |
|
|
for t in D: |
|
|
for ell in L: |
|
|
for s in S: |
|
|
chosen = [p for p in P if Z[p, ell, s, t].solution_value() > 0.5] |
|
|
if chosen: |
|
|
p = chosen[0] |
|
|
schedule.append({ |
|
|
'day': t, |
|
|
'line_type_id': ell[0], |
|
|
'line_idx': ell[1], |
|
|
'shift': s, |
|
|
'product': p, |
|
|
'run_hours': T[p, ell, s, t].solution_value(), |
|
|
'units': U[p, ell, s, t].solution_value(), |
|
|
}) |
|
|
result['run_schedule'] = schedule |
|
|
|
|
|
|
|
|
headcount = [] |
|
|
for e in E: |
|
|
for s in S: |
|
|
for t in D: |
|
|
used_ph = sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t].solution_value() for p in P for ell in L) |
|
|
need = ceil(used_ph / (Hmax_s[s] + 1e-9)) |
|
|
headcount.append({'emp_type': e, 'shift': s, 'day': t, |
|
|
'needed': need, 'available': N_day[e][t]}) |
|
|
result['headcount_per_shift'] = headcount |
|
|
|
|
|
|
|
|
ph_by_day = [] |
|
|
for e in E: |
|
|
for t in D: |
|
|
used = sum(TEAM_REQ_PER_PRODUCT[e][p] * T[p, ell, s, t].solution_value() for s in S for p in P for ell in L) |
|
|
ph_by_day.append({'emp_type': e, 'day': t, |
|
|
'used_person_hours': used, |
|
|
'cap_person_hours': Hmax_daily * N_day[e][t]}) |
|
|
result['person_hours_by_day'] = ph_by_day |
|
|
|
|
|
|
|
|
idle_employees = [] |
|
|
for e in E: |
|
|
for s in S: |
|
|
for t in D: |
|
|
idle_count = IDLE[e, s, t].solution_value() |
|
|
if idle_count > 0: |
|
|
idle_employees.append({ |
|
|
'emp_type': e, |
|
|
'shift': s, |
|
|
'day': t, |
|
|
'idle_count': idle_count |
|
|
}) |
|
|
result['idle_employees'] = idle_employees |
|
|
|
|
|
|
|
|
print("Objective (min cost):", result['objective']) |
|
|
print("\n--- Weekly production by product ---") |
|
|
for p, u in prod_week.items(): |
|
|
print(f"{p}: {u:.1f} / demand {d_week.get(p,0)}") |
|
|
|
|
|
print("\n--- Schedule (line, shift, day) ---") |
|
|
for row in schedule: |
|
|
shift_name = ShiftType.get_name(row['shift']) |
|
|
line_name = LineType.get_name(row['line_type_id']) |
|
|
print(f"D{row['day']} {line_name}-{row['line_idx']} {shift_name}: " |
|
|
f"{row['product']} T={row['run_hours']:.2f}h U={row['units']:.1f}") |
|
|
|
|
|
print("\n--- Implied headcount need (per type/shift/day) ---") |
|
|
for row in headcount: |
|
|
shift_name = ShiftType.get_name(row['shift']) |
|
|
print(f"{row['emp_type']}, {shift_name}, D{row['day']}: " |
|
|
f"need={row['needed']} (avail {row['available']})") |
|
|
|
|
|
print("\n--- Total person-hours by type/day ---") |
|
|
for row in ph_by_day: |
|
|
print(f"{row['emp_type']}, D{row['day']}: used={row['used_person_hours']:.1f} " |
|
|
f"(cap {row['cap_person_hours']})") |
|
|
|
|
|
|
|
|
print("\n--- Idle employees (per type/shift/day) ---") |
|
|
idle_found = False |
|
|
for e in E: |
|
|
for s in S: |
|
|
for t in D: |
|
|
idle_count = IDLE[e, s, t].solution_value() |
|
|
if idle_count > 0: |
|
|
shift_name = ShiftType.get_name(s) |
|
|
print(f"{e}, {shift_name}, D{t}: idle={idle_count}") |
|
|
idle_found = True |
|
|
if not idle_found: |
|
|
print("No idle employees scheduled") |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
solve_fixed_team_weekly() |