Spaces:
Running
Running
| """ | |
| Commission rules engine. | |
| Reads all configuration from the database and computes the summary table output | |
| (the same logic as Cell 3 in the notebook, but driven entirely by DB records). | |
| """ | |
| from typing import Optional | |
| from sqlalchemy.orm import Session | |
| from app.models import ( | |
| CommissionAgreement, CommissionTier, FixedConstant, | |
| BellRingerThreshold, Producer, | |
| ) | |
def get_constant(db: Session, key: str) -> float:
    """Look up a single FixedConstant row by its key.

    Returns the row's ``value``; when no row matches (the query's first
    result is falsy) the integer 0 is returned instead.
    """
    record = db.query(FixedConstant).filter(FixedConstant.key == key).first()
    if not record:
        return 0
    return record.value
def get_all_constants(db: Session) -> dict:
    """Return every FixedConstant row as a ``{key: value}`` mapping."""
    constants = {}
    for record in db.query(FixedConstant).all():
        # Later rows win if two rows ever share a key.
        constants[record.key] = record.value
    return constants
def get_bell_ringer(db: Session, market_segment: str, dept_code: str) -> Optional[float]:
    """Fetch the bell ringer threshold for a market segment / department pair.

    Returns the matching row's ``threshold``, or None when no
    BellRingerThreshold row exists for that combination.
    """
    query = db.query(BellRingerThreshold)
    match = query.filter(
        BellRingerThreshold.market_segment == market_segment,
        BellRingerThreshold.dept_code == dept_code,
    ).first()
    if not match:
        return None
    return match.threshold
def check_bell_ringer(db: Session, market_segment: str, dept_code: str, revenue: float) -> dict:
    """Report whether ``revenue`` reaches the bell ringer threshold.

    Returns a dict with:
        status    : "na" (no threshold configured), "triggered" or
                    "not_triggered"
        threshold : the configured threshold (absent when status is "na")
        message   : display text for the status
    """
    threshold = get_bell_ringer(db, market_segment, dept_code)
    if threshold is None:
        return {"status": "na", "message": "N/A for this Dept / Segment"}

    # Revenue exactly equal to the threshold counts as triggered.
    if revenue >= threshold:
        status = "triggered"
        message = f"TRIGGERED (threshold ${threshold:,.0f})"
    else:
        status = "not_triggered"
        message = f"Not triggered (threshold ${threshold:,.0f})"
    return {"status": status, "threshold": threshold, "message": message}
def get_agreement_with_tiers(db: Session, agreement_id: int) -> Optional[dict]:
    """Load one CommissionAgreement and its tiers as a plain dict.

    Returns None when no agreement has the given id. Otherwise the dict
    holds the agreement's id, name, p1_suffix, p2_suffix and ce_as_producer
    fields, plus "P1" and "P2": the tiers whose ``producer_role`` matches,
    each converted with ``_tier_to_dict``, or None when the agreement has
    no tiers for that role.
    """
    agreement = db.query(CommissionAgreement).filter(
        CommissionAgreement.id == agreement_id
    ).first()
    if not agreement:
        return None

    def tiers_for(role):
        # An empty list collapses to None so callers can test "has tiers"
        # with a simple None / truthiness check.
        selected = [t for t in agreement.tiers if t.producer_role == role]
        return [_tier_to_dict(t) for t in selected] or None

    return {
        "id": agreement.id,
        "name": agreement.name,
        "p1_suffix": agreement.p1_suffix,
        "p2_suffix": agreement.p2_suffix,
        "ce_as_producer": agreement.ce_as_producer,
        "P1": tiers_for("P1"),
        "P2": tiers_for("P2"),
    }
def _tier_to_dict(t: CommissionTier) -> dict:
    """Copy the display-relevant attributes of a tier into a plain dict.

    The output keys "years" and "agreement" come from the ``years_label``
    and ``agreement_label`` attributes; the remaining keys share the name
    of the attribute they are read from.
    """
    # (output key, source attribute) pairs, in output order.
    field_map = (
        ("years", "years_label"),
        ("commission", "commission"),
        ("credit", "credit"),
        ("agreement", "agreement_label"),
        ("note", "note"),
        ("flag", "flag"),
    )
    return {key: getattr(t, attr) for key, attr in field_map}
def _is_tier_active(tier: dict) -> bool:
    """Tell whether a tier still pays commission.

    A tier counts as active when its "commission" entry exists and is not
    None; a commission of 0 is therefore still active.
    """
    rate = tier.get("commission")
    return rate is not None
def _count_active_producers_for_tier(tier_index: int,
                                     all_producer_tiers: list) -> int:
    """
    Count the producers that have an active commission at ``tier_index``.

    This covers the case where one producer's commission (e.g. P2 /
    Origination) ends in a later tier while another continues: the
    deduction for that tier is then carried by the remaining producer(s).

    all_producer_tiers: list of tier-lists, e.g. [p1_tiers, p2_tiers].
    Entries that are None are skipped, and a producer whose tier list is
    too short to have ``tier_index`` is treated as ended (not counted).
    """
    return sum(
        1
        for tiers in all_producer_tiers
        if tiers is not None
        and tier_index < len(tiers)
        and _is_tier_active(tiers[tier_index])
    )
def compute_summary(db: Session, form_data: dict) -> dict:
    """
    Compute the full summary table for one form submission.

    form_data keys (all read with ``.get``; missing keys fall back to the
    defaults noted):
        agreement_id         : int
        producer1_prefix     : str or None
        producer2_prefix     : str or None
        ce_prefix            : str or None
        complex_claims       : str or None (e.g. "PARMI2", "EMPOR2")
        orig_type            : 'producer' | 'employee' (default 'producer')
        orig_producer_prefix : str or None
        orig_producer_code   : int (1 or 2; missing/None falls back to 2)
        orig_employee_name   : str or None
        revenue              : float (missing/None/0 is treated as 0)
        market_segment       : str
        dept_code            : str

    Returns:
        On success a dict with:
            agreement_name         : name of the selected agreement
            rows                   : five rows, in order Producer 1,
                                     Producer 2, Client Executive,
                                     Complex Claims, Originating
            warnings               : list of validation messages
            bell_ringer            : result of ``check_bell_ringer``
            reduction_per_producer : total CE + CC deduction divided by the
                                     number of producer tier lists
        When no agreement id is given or the agreement is not found, a dict
        with only "error", "rows" (empty) and "warnings" (empty).

    NOTE(review): the "β" characters in the placeholder and message strings
    below look like a mis-encoded dash. They are kept byte-for-byte because
    the intended character cannot be confirmed from this file alone.
    """
    agr_id = form_data.get("agreement_id")
    if not agr_id:
        return {"error": "No agreement selected", "rows": [], "warnings": []}

    agreement = get_agreement_with_tiers(db, agr_id)
    if not agreement:
        return {"error": "Agreement not found", "rows": [], "warnings": []}

    consts = get_all_constants(db)
    rev = form_data.get("revenue", 0) or 0
    p1pfx = form_data.get("producer1_prefix")
    p2pfx = form_data.get("producer2_prefix")
    cepfx = form_data.get("ce_prefix")
    cc_value = form_data.get("complex_claims")  # "PARMI2", "EMPOR2", or None/empty
    has_cc = bool(cc_value)
    is_ce = agreement["ce_as_producer"]
    agreement_name = agreement["name"]
    p1_tiers = agreement["P1"]
    p2_tiers = agreement["P2"]

    # Referral / Origination Fee Agreement is special-cased by name: its P2
    # tiers belong to the Originating Producer, and the Producer 2 row is N/A.
    is_referral_orig = agreement_name == "Referral / Origination Fee Agreement"
    if is_referral_orig:
        has_p2 = False
        orig_tiers = p2_tiers
    else:
        has_p2 = p2_tiers is not None
        orig_tiers = None  # normal originating row carries no commission tiers

    warnings = []
    rows = []

    # -- Lookup names ------------------------------------------------------
    def _lookup_name(prefix):
        """Return the Producer name for ``prefix``, or "" if unset/unknown."""
        if not prefix:
            return ""
        p = db.query(Producer).filter(Producer.prefix == prefix).first()
        return p.name if p else ""

    p1name = _lookup_name(p1pfx)
    p2name = _lookup_name(p2pfx)
    cename = _lookup_name(cepfx)

    # -- Derive codes ------------------------------------------------------
    p1suf = agreement["p1_suffix"]
    p2suf = agreement["p2_suffix"]
    if is_ce:
        ce_suf = int(consts.get("CE_PROD_SUFFIX", 2))
    else:
        ce_suf = int(consts.get("CE_SUFFIX", 1))
    p1code = f"{p1pfx}{p1suf}" if (p1pfx and p1suf) else "β"
    p2code = f"{p2pfx}{p2suf}" if (has_p2 and p2pfx and p2suf) else "β"
    cecode = f"{cepfx}{ce_suf}" if cepfx else "β"

    # -- Originating Producer info -----------------------------------------
    orig_type = form_data.get("orig_type", "producer")
    has_orig_producer = bool(
        orig_type == "producer" and form_data.get("orig_producer_prefix")
    )
    has_orig_employee = bool(
        orig_type == "employee" and form_data.get("orig_employee_name")
    )
    has_orig = has_orig_producer or has_orig_employee

    if has_orig_producer:
        opfx = form_data["orig_producer_prefix"]
        # `or 2` rather than a .get default: the key may be present with a
        # None value, which would otherwise render as e.g. "ABCNone".
        ocode_num = form_data.get("orig_producer_code") or 2
        ocode = f"{opfx}{ocode_num}"
        oname = _lookup_name(opfx)
    elif has_orig_employee:
        ocode = "EMPOR2"
        oname = form_data["orig_employee_name"]
        opfx = None
    else:
        ocode = "β"
        oname = ""
        opfx = None

    # For Referral/Origination, the originating producer code uses the P2
    # suffix when the agreement defines one.
    if is_referral_orig and has_orig_producer and p2suf:
        ocode = f"{opfx}{p2suf}"

    # -- Commission reduction rule (per-tier aware) ------------------------
    ce_comm = consts.get("CE_COMM", 2)
    cc_comm = consts.get("CC_COMM", 5)
    total_deduction = 0
    if cepfx and not is_ce:
        total_deduction += ce_comm
    if has_cc:
        total_deduction += cc_comm

    # Tier lists of every producer that shares the deduction; used to count
    # how many producers are active at each tier index.
    producer_tier_lists = []
    if p1_tiers and not is_ce:
        producer_tier_lists.append(p1_tiers)
    if is_referral_orig and orig_tiers and has_orig:
        # Originating has tiers in Referral/Origination.
        producer_tier_lists.append(orig_tiers)
    elif has_p2 and p2_tiers:
        producer_tier_lists.append(p2_tiers)

    # Flat (non-tiered) split of the deduction, returned for display only.
    n_producers_flat = len(producer_tier_lists)
    flat_deduction = total_deduction / n_producers_flat if n_producers_flat > 0 else 0

    # -- Validation --------------------------------------------------------
    if not p1pfx and not is_ce:
        warnings.append("Producer 1 (Lead) is required but not selected.")
    if has_p2 and not p2pfx:
        warnings.append(f"{agreement_name} uses two producers β select Producer 2.")
    if not has_p2 and not is_ce and p2pfx and not is_referral_orig:
        warnings.append(f"{agreement_name} does not use Producer 2 β that row will be N/A.")
    if is_ce and not cepfx:
        warnings.append("This agreement needs a Client Executive as producer β select one above.")
    if is_referral_orig and not has_orig:
        warnings.append("Referral / Origination Fee Agreement requires an Originating Producer β select one.")
    if has_orig and not is_referral_orig:
        warnings.append("An Originating Producer/Employee is selected β "
                        "the agreement must be Referral / Origination Fee Agreement.")

    # CCP is not eligible for Bond Agreement (matched by "bond" in the name).
    is_bond = "bond" in agreement_name.lower()
    if has_cc and is_bond:
        warnings.append("Complex Claims Practice (CCP) is not eligible for Bond Agreement.")

    # A CE prefix that resolves to no Producer name is suspicious.
    if cepfx and not is_ce and not cename:
        warnings.append("Client Executive is selected but no name found β verify selection.")

    # -- Build rows --------------------------------------------------------
    # ROW: Producer 1
    if is_ce:
        rows.append(_na_row("Producer 1", "N/A β CE acting as Producer"))
    elif p1_tiers:
        rows.append(_tier_row_smart(
            "Producer 1", "(Lead)", p1code, p1name,
            p1_tiers, rev, total_deduction, producer_tier_lists, 0
        ))
    else:
        rows.append(_na_row("Producer 1", "Not applicable"))

    # ROW: Producer 2
    if is_referral_orig:
        # For Referral/Origination, P2 is always N/A (goes to Originating).
        rows.append(_na_row("Producer 2", "N/A β see Originating Producer"))
    elif has_p2 and p2pfx:
        p2_tier_index = 1 if len(producer_tier_lists) > 1 else 0
        rows.append(_tier_row_smart(
            "Producer 2", "(Shared)", p2code, p2name,
            p2_tiers, rev, total_deduction, producer_tier_lists, p2_tier_index
        ))
    else:
        rows.append(_na_row("Producer 2", "Not applicable / not selected"))

    # ROW: Client Executive
    if cepfx:
        if is_ce:
            ce_pct = consts.get("CE_PROD_COMM", 10)
            ce_crd = consts.get("CE_PROD_CREDIT", 100)
            ce_agr = "Ceded Accounts β CE as Producer"
        else:
            ce_pct = ce_comm
            ce_crd = consts.get("CE_CREDIT", 0)
            ce_agr = "Client Executive Agreement"
        rows.append({
            "role": "Client Executive", "sub_role": "",
            "code": cecode, "name": cename,
            "agreement": ce_agr,
            "commission": ce_pct,
            "commission_dollar": _dollar(ce_pct, rev),
            "credit": ce_crd,
            "is_na": False,
        })
    else:
        rows.append(_na_row("Client Executive", "Not selected"))

    # ROW: Complex Claims
    if has_cc:
        cc_label_map = {"PARMI2": "Mike Parsa", "EMPOR2": "Other Employees"}
        cc_display_name = cc_label_map.get(cc_value, "")
        rows.append({
            "role": "Complex Claims", "sub_role": "",
            "code": cc_value, "name": cc_display_name,
            "agreement": "Complex Claims Program",
            "commission": cc_comm,
            "commission_dollar": _dollar(cc_comm, rev),
            "credit": consts.get("CC_CREDIT", 0),
            "is_na": False,
        })
    else:
        rows.append(_na_row("Complex Claims", "Not selected"))

    # ROW: Originating Producer / Employee
    if is_referral_orig and has_orig and orig_tiers:
        # Originating gets the P2 tiers with commission/credit.
        orig_tier_index = 1 if len(producer_tier_lists) > 1 else 0
        rows.append(_tier_row_smart(
            "Originating", "Producer / Employee", ocode, oname,
            orig_tiers, rev, total_deduction, producer_tier_lists, orig_tier_index
        ))
    elif has_orig_producer:
        rows.append({
            "role": "Originating", "sub_role": "Producer / Employee",
            "code": ocode, "name": oname,
            "agreement": "β",
            "commission": "β", "commission_dollar": "", "credit": "β",
            "is_na": False,
        })
    elif has_orig_employee:
        rows.append({
            "role": "Originating", "sub_role": "Producer / Employee",
            "code": "EMPOR2", "name": oname,
            "agreement": "Originating Employee",
            "commission": "β", "commission_dollar": "", "credit": "β",
            "is_na": False,
        })
    else:
        rows.append(_na_row("Originating Producer / Employee", "Not selected"))

    # -- Bell ringer -------------------------------------------------------
    bell = check_bell_ringer(
        db,
        form_data.get("market_segment", ""),
        form_data.get("dept_code", ""),
        rev,
    )

    return {
        "agreement_name": agreement_name,
        "rows": rows,
        "warnings": warnings,
        "bell_ringer": bell,
        "reduction_per_producer": flat_deduction,  # kept for display note
    }
# -- Helpers ---------------------------------------------------------------
def _dollar(pct, rev) -> str:
    """Format ``pct`` percent of ``rev`` as a whole-dollar string.

    Returns "" when revenue is not positive or when ``pct`` is not an int
    or float (None and placeholder strings fall in this case). Otherwise
    returns e.g. "$1,234": thousands separators, no decimals.
    """
    # Revenue is checked first, matching the original evaluation order.
    if not (rev > 0):
        return ""
    if not isinstance(pct, (int, float)):
        return ""
    amount = rev * pct / 100
    return f"${amount:,.0f}"
def _na_row(role: str, message: str) -> dict:
    """Build a "not applicable" summary row for ``role``.

    The row is flagged with ``is_na`` True and carries ``message`` in
    ``na_message``; every display field is an empty string.
    """
    row = {
        "role": role,
        "sub_role": "",
        "is_na": True,
        "na_message": message,
    }
    blank_fields = (
        "code", "name", "agreement",
        "commission", "commission_dollar", "credit",
    )
    row.update(dict.fromkeys(blank_fields, ""))
    return row
def _tier_row_smart(role, sub_role, code, name, tiers, rev,
                    total_deduction, all_producer_tiers, self_index) -> dict:
    """
    Build a summary row that may span several year tiers.

    The CE + CC deduction is split PER TIER among the producers that are
    active at that tier index, rather than as one flat rate across all
    tiers. A tier's adjusted commission never drops below 0.

    Args:
        role, sub_role, code, name: display fields copied into the row
        tiers: this producer's tier list (dicts as built by _tier_to_dict)
        rev: revenue used for the dollar amount
        total_deduction: total CE + CC deduction percentage
        all_producer_tiers: tier lists of ALL producers sharing the deduction
        self_index: position of this producer in all_producer_tiers
            (accepted for the callers' sake; not used by this function)

    Returns:
        A row dict with "tiers" (one detail dict per input tier),
        "agreements" (distinct agreement labels in first-seen order),
        ``is_na`` False and ``has_tiers`` True.
    """
    details = []
    distinct_agreements = []
    for position, tier in enumerate(tiers):
        # Split the deduction among the producers still active at this tier.
        sharers = _count_active_producers_for_tier(position, all_producer_tiers)
        share = total_deduction / sharers if sharers > 0 else 0

        base_rate = tier["commission"]
        if base_rate is None:
            # Commission has ended for this tier: nothing to adjust or show.
            shown_rate = None
            dollars = ""
        else:
            net_rate = max(base_rate - share, 0)
            shown_rate = round(net_rate, 1)
            # Dollar amount uses the unrounded adjusted rate.
            dollars = _dollar(net_rate, rev)

        label = tier["agreement"]
        details.append({
            "years": tier["years"],
            "commission": shown_rate,
            "commission_orig": base_rate,
            "commission_dollar": dollars,
            "credit": tier["credit"],
            "agreement": label,
            "flag": tier.get("flag"),
            "reduced": share > 0 and base_rate is not None,
        })
        if label not in distinct_agreements:
            distinct_agreements.append(label)

    return {
        "role": role, "sub_role": sub_role,
        "code": code, "name": name,
        "tiers": details,
        "agreements": distinct_agreements,
        "is_na": False, "has_tiers": True,
    }