| from __future__ import annotations |
|
|
| from typing import Dict, List, Tuple, Iterable, Set |
|
|
| from ortools.sat.python import cp_model |
|
|
| from core.models import CompileInput, DecisionVars, CompileReport, RuleIR, Window, Rotation |
|
|
|
|
| def build_model(ci: CompileInput) -> Tuple[cp_model.CpModel, DecisionVars, Dict[str, cp_model.IntVar], CompileReport]: |
| model = cp_model.CpModel() |
|
|
| residents = [r.id for r in ci.ontology.residents] |
| blocks = ci.ontology.blocks |
| rotations = [r.id for r in ci.ontology.rotations] |
|
|
| |
| var_map: Dict[str, cp_model.IntVar] = {} |
| x_vars: List[str] = [] |
| for r in residents: |
| for b in blocks: |
| |
| choices = [] |
| for k in rotations: |
| name = f"x[{r},{b},{k}]" |
| v = model.NewBoolVar(name) |
| var_map[name] = v |
| x_vars.append(name) |
| choices.append(v) |
| |
| model.Add(sum(choices) == 1) |
|
|
| |
| coverage_constraints = 0 |
| total_constraints = 0 |
| lock_constraints = 0 |
| eligibility_forced_zero = 0 |
| consecutive_constraints = 0 |
| rest_constraints = 0 |
| pairing_constraints = 0 |
| preference_penalties = 0 |
| unsupported_rules = [] |
| feasibility_warnings = [] |
| rule_to_constraints = {} |
| |
| |
| penalty_vars = [] |
| penalty_weights = [] |
|
|
| def blocks_from_windows(windows: List[Window]) -> List[str]: |
| if not windows: |
| return list(blocks) |
| selected: List[str] = [] |
| for w in windows: |
| |
| if w.blocks: |
| for blk in w.blocks: |
| if blk in blocks and blk not in selected: |
| selected.append(blk) |
| |
| if w.start_block and w.end_block and w.start_block in blocks and w.end_block in blocks: |
| start_idx = blocks.index(w.start_block) |
| end_idx = blocks.index(w.end_block) |
| lo, hi = (start_idx, end_idx) if start_idx <= end_idx else (end_idx, start_idx) |
| for blk in blocks[lo : hi + 1]: |
| if blk not in selected: |
| selected.append(blk) |
| return selected or list(blocks) |
|
|
| def get_var(rid: str, blk: str, rot: str) -> cp_model.IntVar | None: |
| name = f"x[{rid},{blk},{rot}]" |
| return var_map.get(name) |
|
|
| for rule in ci.rules: |
| try: |
| if rule.constraint_type == "rotation_total": |
| |
| rotation = str(rule.params.get("rotation")) if rule.params else None |
| count = int(rule.params.get("count")) if rule.params and "count" in rule.params else None |
| if not rotation or count is None: |
| continue |
| tgt_blocks = blocks_from_windows(rule.windows) |
| sel_residents: Iterable[str] = ( |
| [str(x) for x in (rule.selectors.get("residents") or [])] |
| if rule.selectors |
| else [] |
| ) |
| if not sel_residents: |
| sel_residents = residents |
| for rid in sel_residents: |
| terms: List[cp_model.IntVar] = [] |
| for blk in tgt_blocks: |
| v = get_var(rid, blk, rotation) |
| if v is not None: |
| terms.append(v) |
| if terms: |
| model.Add(sum(terms) == count) |
| total_constraints += 1 |
|
|
| elif rule.constraint_type == "coverage_min_max": |
| |
| rotation = str(rule.params.get("rotation")) if rule.params else None |
| |
| min_req = (rule.params.get("min") or |
| rule.params.get("min_count") or |
| rule.params.get("min_required")) if rule.params else None |
| max_req = (rule.params.get("max") or |
| rule.params.get("max_count") or |
| rule.params.get("max_required")) if rule.params else None |
| if not rotation: |
| continue |
| |
| |
| rotation_map = { |
| "Night": "NIGHTS", |
| "Nights": "NIGHTS", |
| "Electives": "ELECTIVES", |
| "ELECTIVE": "ELECTIVES" |
| } |
| rotation = rotation_map.get(rotation, rotation) |
| tgt_blocks = blocks_from_windows(rule.windows) |
| for blk in tgt_blocks: |
| terms: List[cp_model.IntVar] = [] |
| for rid in residents: |
| v = get_var(rid, blk, rotation) |
| if v is not None: |
| terms.append(v) |
| if not terms: |
| continue |
| if min_req is not None: |
| model.Add(sum(terms) >= int(min_req)) |
| coverage_constraints += 1 |
| if max_req is not None: |
| model.Add(sum(terms) <= int(max_req)) |
| coverage_constraints += 1 |
|
|
| elif rule.constraint_type == "eligibility": |
| |
| |
| sel_residents: Iterable[str] = ( |
| [str(x) for x in (rule.selectors.get("residents") or [])] |
| if rule.selectors |
| else [] |
| ) |
| sel_rotations: Iterable[str] = ( |
| [str(x) for x in (rule.selectors.get("rotations") or [])] |
| if rule.selectors |
| else [] |
| ) |
| if not sel_residents: |
| sel_residents = residents |
| if not sel_rotations: |
| |
| continue |
| tgt_blocks = blocks_from_windows(rule.windows) |
| for rid in sel_residents: |
| for blk in tgt_blocks: |
| for rot in sel_rotations: |
| v = get_var(rid, blk, rot) |
| if v is not None: |
| model.Add(v == 0) |
| eligibility_forced_zero += 1 |
|
|
| elif rule.constraint_type == "lock_assignment": |
| |
| if not rule.params: |
| continue |
| locks = rule.params.get("locks") or [] |
| for item in locks: |
| rid = str(item.get("resident")) |
| blk = str(item.get("block")) |
| rot = str(item.get("rotation")) |
| v = get_var(rid, blk, rot) |
| if v is not None: |
| model.Add(v == 1) |
| lock_constraints += 1 |
| rule_to_constraints.setdefault(rule.rule_id, []).append(f"Lock {rid} to {rot} in {blk}") |
|
|
| elif rule.constraint_type == "run_consecutive": |
| |
| rotation = str(rule.params.get("rotation")) if rule.params else None |
| max_consec = rule.params.get("max_consecutive") if rule.params else None |
| if not rotation or max_consec is None: |
| continue |
| max_consec = int(max_consec) |
| tgt_blocks = blocks_from_windows(rule.windows) |
| sel_residents: Iterable[str] = ( |
| [str(x) for x in (rule.selectors.get("residents") or [])] |
| if rule.selectors |
| else residents |
| ) |
| |
| for rid in sel_residents: |
| for i in range(len(tgt_blocks) - max_consec): |
| window_blocks = tgt_blocks[i : i + max_consec + 1] |
| terms = [] |
| for blk in window_blocks: |
| v = get_var(rid, blk, rotation) |
| if v is not None: |
| terms.append(v) |
| if terms: |
| if rule.hardness == "hard": |
| model.Add(sum(terms) <= max_consec) |
| consecutive_constraints += 1 |
| else: |
| |
| violation_var = model.NewIntVar(0, len(terms), f"consec_penalty_{rule.rule_id}_{rid}_{i}") |
| model.Add(violation_var >= sum(terms) - max_consec) |
| penalty_vars.append(violation_var) |
| penalty_weights.append(rule.weight) |
| consecutive_constraints += 1 |
| rule_to_constraints.setdefault(rule.rule_id, []).append(f"Max {max_consec} consecutive {rotation}") |
|
|
| elif rule.constraint_type == "rest_gap": |
| |
| |
| from_session = rule.params.get("from_session", "Night") if rule.params else "Night" |
| to_session = rule.params.get("to_session", "AM") if rule.params else "AM" |
| if from_session == "Night" and to_session == "AM": |
| tgt_blocks = blocks_from_windows(rule.windows) |
| sel_residents: Iterable[str] = ( |
| [str(x) for x in (rule.selectors.get("residents") or [])] |
| if rule.selectors |
| else residents |
| ) |
| for rid in sel_residents: |
| for i in range(len(tgt_blocks) - 1): |
| |
| night_vars = [] |
| am_vars = [] |
| for rot_obj in ci.ontology.rotations: |
| if "Night" in rot_obj.sessions: |
| v = get_var(rid, tgt_blocks[i], rot_obj.id) |
| if v is not None: |
| night_vars.append(v) |
| if "AM" in rot_obj.sessions: |
| v = get_var(rid, tgt_blocks[i + 1], rot_obj.id) |
| if v is not None: |
| am_vars.append(v) |
| |
| if night_vars and am_vars: |
| if rule.hardness == "hard": |
| model.Add(sum(night_vars) + sum(am_vars) <= 1) |
| rest_constraints += 1 |
| else: |
| violation_var = model.NewIntVar(0, 1, f"rest_penalty_{rule.rule_id}_{rid}_{i}") |
| model.Add(violation_var >= sum(night_vars) + sum(am_vars) - 1) |
| penalty_vars.append(violation_var) |
| penalty_weights.append(rule.weight) |
| rest_constraints += 1 |
| rule_to_constraints.setdefault(rule.rule_id, []).append(f"Rest gap: no {from_session} → {to_session}") |
|
|
| elif rule.constraint_type == "preference_weight": |
| |
| if not rule.params: |
| continue |
| preferences = rule.params.get("preferences", []) |
| tgt_blocks = blocks_from_windows(rule.windows) |
| for pref in preferences: |
| rid = str(pref.get("resident")) |
| rot = str(pref.get("rotation")) |
| weight = pref.get("weight", 1) |
| if rid and rot: |
| terms = [] |
| for blk in tgt_blocks: |
| v = get_var(rid, blk, rot) |
| if v is not None: |
| terms.append(v) |
| if terms: |
| |
| pref_var = model.NewIntVar(0, len(terms), f"pref_{rule.rule_id}_{rid}_{rot}") |
| model.Add(pref_var == sum(terms)) |
| penalty_vars.append(pref_var) |
| penalty_weights.append(int(weight)) |
| preference_penalties += 1 |
| rule_to_constraints.setdefault(rule.rule_id, []).append("Preference weights applied") |
|
|
| except Exception as e: |
| |
| unsupported_rules.append(rule.rule_id) |
| continue |
|
|
| dv = DecisionVars(x_vars=x_vars) |
| report = CompileReport( |
| num_residents=len(residents), |
| num_blocks=len(blocks), |
| num_rotations=len(rotations), |
| num_x_vars=len(x_vars), |
| num_aux_vars=len(penalty_vars), |
| coverage_constraints=coverage_constraints, |
| total_constraints=total_constraints, |
| lock_constraints=lock_constraints, |
| eligibility_forced_zero=eligibility_forced_zero, |
| consecutive_constraints=consecutive_constraints, |
| rest_constraints=rest_constraints, |
| pairing_constraints=pairing_constraints, |
| preference_penalties=preference_penalties, |
| unsupported_rules=unsupported_rules, |
| feasibility_warnings=feasibility_warnings, |
| rule_to_constraints=rule_to_constraints, |
| ) |
| |
| if penalty_vars: |
| objective_terms = [] |
| for penalty_var, weight in zip(penalty_vars, penalty_weights): |
| objective_terms.append(penalty_var * weight) |
| model.Minimize(sum(objective_terms)) |
| |
| |
| for i, penalty_var in enumerate(penalty_vars): |
| var_map[f"penalty_{i}"] = penalty_var |
| |
| |
| dv.aux_vars = [f"penalty_{i}" for i in range(len(penalty_vars))] |
| |
| return model, dv, var_map, report |
|
|
|
|
|
|