"""
Commission rules engine.

Reads all configuration from the database and computes the summary table
output (the same logic as Cell 3 in the notebook, but driven entirely by
DB records).
"""

from typing import Optional

from sqlalchemy.orm import Session

from app.models import (
    CommissionAgreement,
    CommissionTier,
    FixedConstant,
    BellRingerThreshold,
    Producer,
)


def get_constant(db: Session, key: str) -> float:
    """Fetch a fixed constant by key. Returns 0 if not found."""
    c = db.query(FixedConstant).filter(FixedConstant.key == key).first()
    return c.value if c else 0


def get_all_constants(db: Session) -> dict:
    """Fetch all fixed constants as a ``{key: value}`` dict."""
    return {c.key: c.value for c in db.query(FixedConstant).all()}


def get_bell_ringer(db: Session, market_segment: str, dept_code: str) -> Optional[float]:
    """Get bell ringer threshold for a segment/dept combo.

    Returns the ``threshold`` of the first matching row, or None when no
    row matches (treated as "N/A" by ``check_bell_ringer``).
    """
    br = db.query(BellRingerThreshold).filter(
        BellRingerThreshold.market_segment == market_segment,
        BellRingerThreshold.dept_code == dept_code,
    ).first()
    return br.threshold if br else None


def check_bell_ringer(db: Session, market_segment: str, dept_code: str, revenue: float) -> dict:
    """Check if bell ringer is triggered.

    Returns a status dict with a ``status`` of ``"na"``, ``"triggered"`` or
    ``"not_triggered"`` and a display ``message``; the two non-N/A variants
    also carry the ``threshold`` that was compared against.
    """
    threshold = get_bell_ringer(db, market_segment, dept_code)
    if threshold is None:
        return {"status": "na", "message": "N/A for this Dept / Segment"}
    # Reaching the threshold exactly counts as triggered.
    if revenue >= threshold:
        return {
            "status": "triggered",
            "threshold": threshold,
            "message": f"TRIGGERED (threshold ${threshold:,.0f})",
        }
    return {
        "status": "not_triggered",
        "threshold": threshold,
        "message": f"Not triggered (threshold ${threshold:,.0f})",
    }


def get_agreement_with_tiers(db: Session, agreement_id: int) -> Optional[dict]:
    """Load an agreement and its tiers from DB, structured like the notebook.

    Returns None when no agreement has the given id. Otherwise returns a
    dict whose ``"P1"`` / ``"P2"`` entries are lists of tier dicts (see
    ``_tier_to_dict``), or None when the agreement has no tiers for that
    producer role.
    """
    agr = db.query(CommissionAgreement).filter(
        CommissionAgreement.id == agreement_id
    ).first()
    if not agr:
        return None

    p1_tiers = [t for t in agr.tiers if t.producer_role == "P1"]
    p2_tiers = [t for t in agr.tiers if t.producer_role == "P2"]

    return {
        "id": agr.id,
        "name": agr.name,
        "p1_suffix": agr.p1_suffix,
        "p2_suffix": agr.p2_suffix,
        "ce_as_producer": agr.ce_as_producer,
        # An empty tier list is normalised to None so callers can test
        # "role has tiers" with a simple truthiness / None check.
        "P1": [_tier_to_dict(t) for t in p1_tiers] or None,
        "P2": [_tier_to_dict(t) for t in p2_tiers] or None,
    }


def _tier_to_dict(t: CommissionTier) -> dict:
    """Convert a CommissionTier record into the plain dict used by the engine."""
    return {
        "years": t.years_label,
        "commission": t.commission,
        "credit": t.credit,
        "agreement": t.agreement_label,
        "note": t.note,
        "flag": t.flag,
    }


def _is_tier_active(tier: dict) -> bool:
    """Check if a tier has active commission (not ended).

    A tier is active when its ``commission`` is not None; a commission of 0
    still counts as active.
    """
    return tier.get("commission") is not None


def _count_active_producers_for_tier(tier_index: int, all_producer_tiers: list) -> int:
    """
    For a given tier index, count how many producers have active commission
    at that tier position.

    This handles the case where P2/Origination ends in Year 3+ but P1
    continues — the deduction should shift entirely to P1.

    all_producer_tiers: list of tier-lists, e.g. [p1_tiers, p2_tiers].
    None entries are skipped.
    """
    count = 0
    for tiers in all_producer_tiers:
        if tiers is None:
            continue
        if tier_index < len(tiers):
            if _is_tier_active(tiers[tier_index]):
                count += 1
        # If this producer has fewer tiers, they've ended → don't count
    return count


def compute_summary(db: Session, form_data: dict) -> dict:
    """
    Compute the full summary table given form input.

    form_data keys:
        agreement_id         : int
        producer1_prefix     : str or None
        producer2_prefix     : str or None
        ce_prefix            : str or None
        complex_claims       : str or None (e.g. "PARMI2", "EMPOR2")
        orig_type            : 'producer' | 'employee' (missing/None → 'producer')
        orig_producer_prefix : str or None
        orig_producer_code   : int (1 or 2; missing/None → 2)
        orig_employee_name   : str or None
        revenue              : float
        market_segment       : str
        dept_code            : str

    Returns a dict with rows for the summary table + warnings + bell ringer
    status. When the agreement id is missing or unknown, returns a dict
    with an ``"error"`` message and empty ``"rows"`` / ``"warnings"`` instead.
    """
    agr_id = form_data.get("agreement_id")
    if not agr_id:
        return {"error": "No agreement selected", "rows": [], "warnings": []}

    R = get_agreement_with_tiers(db, agr_id)
    if not R:
        return {"error": "Agreement not found", "rows": [], "warnings": []}

    consts = get_all_constants(db)
    # `or 0` also covers an explicit None / empty value submitted by the form.
    rev = form_data.get("revenue", 0) or 0

    p1pfx = form_data.get("producer1_prefix")
    p2pfx = form_data.get("producer2_prefix")
    cepfx = form_data.get("ce_prefix")
    cc_value = form_data.get("complex_claims")  # "PARMI2", "EMPOR2", or None/empty
    has_cc = bool(cc_value)
    is_ce = R["ce_as_producer"]

    # Detect Referral / Origination Fee Agreement
    is_referral_orig = R["name"] == "Referral / Origination Fee Agreement"

    # For Referral/Origination, P2 tiers go to Originating Producer, not Producer 2
    if is_referral_orig:
        has_p2 = False           # Producer 2 row = N/A
        orig_tiers = R["P2"]     # Originating gets the P2 tier data
    else:
        has_p2 = R["P2"] is not None
        orig_tiers = None        # Normal originating (no commission tiers)

    warnings = []
    rows = []

    # ── Lookup names ──────────────────────────────────────────
    def _lookup_name(prefix):
        """Return the Producer name for a prefix, or "" if unset/unknown."""
        if not prefix:
            return ""
        p = db.query(Producer).filter(Producer.prefix == prefix).first()
        return p.name if p else ""

    p1name = _lookup_name(p1pfx)
    p2name = _lookup_name(p2pfx)
    cename = _lookup_name(cepfx)

    # ── Derive codes ──────────────────────────────────────────
    p1suf = R["p1_suffix"]
    p2suf = R["p2_suffix"]
    ce_suf = int(consts.get("CE_PROD_SUFFIX", 2)) if is_ce else int(consts.get("CE_SUFFIX", 1))

    p1code = f"{p1pfx}{p1suf}" if (p1pfx and p1suf) else "—"
    p2code = f"{p2pfx}{p2suf}" if (has_p2 and p2pfx and p2suf) else "—"
    cecode = f"{cepfx}{ce_suf}" if cepfx else "—"

    # ── Originating Producer info ─────────────────────────────
    # `dict.get(key, default)` only applies the default when the key is
    # absent; forms commonly submit the key with an explicit None, so fall
    # back with `or` (same pattern as `revenue` above).
    orig_type = form_data.get("orig_type") or "producer"
    has_orig_producer = bool(orig_type == "producer" and form_data.get("orig_producer_prefix"))
    has_orig_employee = bool(orig_type == "employee" and form_data.get("orig_employee_name"))
    has_orig = has_orig_producer or has_orig_employee

    if has_orig_producer:
        opfx = form_data["orig_producer_prefix"]
        # An explicit None must not leak into the code as e.g. "ABCNone".
        ocode_num = form_data.get("orig_producer_code") or 2
        ocode = f"{opfx}{ocode_num}"
        oname = _lookup_name(opfx)
    elif has_orig_employee:
        ocode = "EMPOR2"
        oname = form_data["orig_employee_name"]
        opfx = None
    else:
        ocode = "—"
        oname = ""
        opfx = None

    # For Referral/Origination, the originating producer code uses P2 suffix
    if is_referral_orig and has_orig_producer:
        ocode = f"{opfx}{p2suf}" if p2suf else ocode

    # ── Commission reduction rule (per-tier aware) ────────────
    CE_COMM = consts.get("CE_COMM", 2)
    CC_COMM = consts.get("CC_COMM", 5)

    # The CE share is only deducted from producers when the CE is NOT
    # acting as the producer; the complex-claims share is always deducted.
    total_deduction = 0
    if cepfx and not is_ce:
        total_deduction += CE_COMM
    if has_cc:
        total_deduction += CC_COMM

    # Build list of all producer tier-lists for per-tier counting
    # This tells us how many producers are active at each tier index
    producer_tier_lists = []
    if R["P1"] and not is_ce:
        producer_tier_lists.append(R["P1"])
    if is_referral_orig and orig_tiers and has_orig:
        # Originating has tiers in Referral/Origination
        producer_tier_lists.append(orig_tiers)
    elif has_p2:
        # Normal P2
        # NOTE(review): P2 is counted here even when no Producer 2 prefix
        # is selected (a warning is raised below in that case) — confirm
        # that splitting the deduction with an unselected P2 is intended.
        if R["P2"]:
            producer_tier_lists.append(R["P2"])

    # Compute flat deduction for simple (non-tiered) rows
    n_producers_flat = len(producer_tier_lists)
    flat_deduction = total_deduction / n_producers_flat if n_producers_flat > 0 else 0

    # ── Validation ────────────────────────────────────────────
    if not p1pfx and not is_ce:
        warnings.append("Producer 1 (Lead) is required but not selected.")
    if has_p2 and not p2pfx:
        warnings.append(f"{R['name']} uses two producers — select Producer 2.")
    if not has_p2 and not is_ce and p2pfx and not is_referral_orig:
        warnings.append(f"{R['name']} does not use Producer 2 — that row will be N/A.")
    if is_ce and not cepfx:
        warnings.append("This agreement needs a Client Executive as producer — select one above.")
    if is_referral_orig and not has_orig:
        warnings.append("Referral / Origination Fee Agreement requires an Originating Producer — select one.")
    if has_orig and not is_referral_orig:
        warnings.append("An Originating Producer/Employee is selected — "
                        "the agreement must be Referral / Origination Fee Agreement.")

    # CCP is not eligible for Bond Agreement
    is_bond = "bond" in R["name"].lower()
    if has_cc and is_bond:
        warnings.append("Complex Claims Practice (CCP) is not eligible for Bond Agreement.")

    # Validate CE is selected if checkbox was intended
    if cepfx and not is_ce and not cename:
        warnings.append("Client Executive is selected but no name found — verify selection.")

    # ── Build rows ────────────────────────────────────────────
    # Row order is fixed: Producer 1, Producer 2, Client Executive,
    # Complex Claims, Originating — every role always yields exactly one row.

    # ROW: Producer 1
    if is_ce:
        rows.append(_na_row("Producer 1", "N/A — CE acting as Producer"))
    elif R["P1"]:
        rows.append(_tier_row_smart(
            "Producer 1", "(Lead)", p1code, p1name,
            R["P1"], rev, total_deduction, producer_tier_lists, 0
        ))
    else:
        rows.append(_na_row("Producer 1", "Not applicable"))

    # ROW: Producer 2
    if is_referral_orig:
        # For Referral/Origination, P2 is always N/A (goes to Originating)
        rows.append(_na_row("Producer 2", "N/A — see Originating Producer"))
    elif has_p2 and p2pfx:
        p2_tier_index = 1 if len(producer_tier_lists) > 1 else 0
        rows.append(_tier_row_smart(
            "Producer 2", "(Shared)", p2code, p2name,
            R["P2"], rev, total_deduction, producer_tier_lists, p2_tier_index
        ))
    else:
        rows.append(_na_row("Producer 2", "Not applicable / not selected"))

    # ROW: Client Executive
    if cepfx:
        if is_ce:
            ce_pct = consts.get("CE_PROD_COMM", 10)
            ce_crd = consts.get("CE_PROD_CREDIT", 100)
            ce_agr = "Ceded Accounts — CE as Producer"
        else:
            ce_pct = CE_COMM
            ce_crd = consts.get("CE_CREDIT", 0)
            ce_agr = "Client Executive Agreement"
        rows.append({
            "role": "Client Executive",
            "sub_role": "",
            "code": cecode,
            "name": cename,
            "agreement": ce_agr,
            "commission": ce_pct,
            "commission_dollar": _dollar(ce_pct, rev),
            "credit": ce_crd,
            "is_na": False,
        })
    else:
        rows.append(_na_row("Client Executive", "Not selected"))

    # ROW: Complex Claims
    if has_cc:
        cc_label_map = {"PARMI2": "Mike Parsa", "EMPOR2": "Other Employees"}
        cc_display_name = cc_label_map.get(cc_value, "")
        rows.append({
            "role": "Complex Claims",
            "sub_role": "",
            "code": cc_value,
            "name": cc_display_name,
            "agreement": "Complex Claims Program",
            "commission": CC_COMM,
            "commission_dollar": _dollar(CC_COMM, rev),
            "credit": consts.get("CC_CREDIT", 0),
            "is_na": False,
        })
    else:
        rows.append(_na_row("Complex Claims", "Not selected"))

    # ROW: Originating Producer / Employee
    if is_referral_orig and has_orig and orig_tiers:
        # Originating gets the P2 tiers with commission/credit
        orig_tier_index = 1 if len(producer_tier_lists) > 1 else 0
        rows.append(_tier_row_smart(
            "Originating", "Producer / Employee", ocode, oname,
            orig_tiers, rev, total_deduction, producer_tier_lists, orig_tier_index
        ))
    elif has_orig_producer:
        rows.append({
            "role": "Originating",
            "sub_role": "Producer / Employee",
            "code": ocode,
            "name": oname,
            "agreement": "—",
            "commission": "—",
            "commission_dollar": "",
            "credit": "—",
            "is_na": False,
        })
    elif has_orig_employee:
        rows.append({
            "role": "Originating",
            "sub_role": "Producer / Employee",
            "code": "EMPOR2",
            "name": oname,
            "agreement": "Originating Employee",
            "commission": "—",
            "commission_dollar": "",
            "credit": "—",
            "is_na": False,
        })
    else:
        rows.append(_na_row("Originating Producer / Employee", "Not selected"))

    # ── Bell ringer ───────────────────────────────────────────
    bell = check_bell_ringer(
        db,
        form_data.get("market_segment", ""),
        form_data.get("dept_code", ""),
        rev,
    )

    return {
        "agreement_name": R["name"],
        "rows": rows,
        "warnings": warnings,
        "bell_ringer": bell,
        "reduction_per_producer": flat_deduction,  # kept for display note
    }


# ── Helpers ──────────────────────────────────────────────────

def _dollar(pct, rev) -> str:
    """Format ``pct`` percent of ``rev`` as a whole-dollar string.

    Returns "" when revenue is not positive or ``pct`` is not numeric
    (e.g. None or a placeholder string).
    """
    if rev > 0 and pct is not None and isinstance(pct, (int, float)):
        return f"${rev * pct / 100:,.0f}"
    return ""


def _na_row(role: str, message: str) -> dict:
    """Build a placeholder "not applicable" row for ``role``.

    All display fields are empty strings; ``na_message`` carries the reason.
    """
    return {
        "role": role,
        "sub_role": "",
        "is_na": True,
        "na_message": message,
        "code": "",
        "name": "",
        "agreement": "",
        "commission": "",
        "commission_dollar": "",
        "credit": "",
    }


def _tier_row_smart(role, sub_role, code, name, tiers, rev,
                    total_deduction, all_producer_tiers, self_index) -> dict:
    """
    Build a row with possibly multiple year tiers.
    Deduction is calculated PER TIER based on how many producers
    are active in that time period (not a flat rate across all tiers).

    Args:
        role, sub_role, code, name: display fields
        tiers: this producer's tier list
        rev: revenue for dollar calculation
        total_deduction: total CE+CC deduction percentage
        all_producer_tiers: list of ALL producer tier lists
        self_index: index of this producer in all_producer_tiers
            (accepted for interface compatibility; not read by this function)

    Returns:
        A row dict with ``has_tiers`` True, a ``tiers`` list holding one
        entry per input tier, and ``agreements`` listing the distinct
        agreement labels in first-seen order.
    """
    tier_details = []
    agreements_seen = []

    for i, t in enumerate(tiers):
        # Count how many producers are active at this tier index
        active_count = _count_active_producers_for_tier(i, all_producer_tiers)
        per_producer = total_deduction / active_count if active_count > 0 else 0

        # Ended tiers (commission None) stay None; active ones never go below 0.
        adj_comm = max(t["commission"] - per_producer, 0) if t["commission"] is not None else None
        tier_details.append({
            "years": t["years"],
            "commission": round(adj_comm, 1) if adj_comm is not None else None,
            "commission_orig": t["commission"],
            "commission_dollar": _dollar(adj_comm, rev) if adj_comm is not None else "",
            "credit": t["credit"],
            "agreement": t["agreement"],
            "flag": t.get("flag"),
            "reduced": per_producer > 0 and t["commission"] is not None,
        })
        if t["agreement"] not in agreements_seen:
            agreements_seen.append(t["agreement"])

    return {
        "role": role,
        "sub_role": sub_role,
        "code": code,
        "name": name,
        "tiers": tier_details,
        "agreements": agreements_seen,
        "is_na": False,
        "has_tiers": True,
    }