Spaces:
Sleeping
Sleeping
| """ | |
| Hard_Rule_Engine.py | |
| =================== | |
| Rule-engine version of the user's RBS checker. | |
| Purpose: | |
| - Apply hard validation rules to each claim | |
| - If any hard rule fires, mark claim as REJECTED | |
| - Return the list of broken hard rules and their messages | |
| This file is intentionally structured for a rule engine, not for ML features. | |
| A separate file can convert the same rule logic into binary features. | |
| USAGE: | |
| python Hard_Rule_Engine.py | |
| python Hard_Rule_Engine.py claims_to_check.xlsx Hard_Rule_Results.xlsx | |
| """ | |
| from __future__ import annotations | |
| import warnings | |
| from dataclasses import dataclass | |
| from datetime import date | |
| from typing import Callable, Dict, List, Optional | |
| import pandas as pd | |
# Suppress every warning category for the whole process, including any
# raised by pandas while the helpers below parse dates.
warnings.filterwarnings("ignore")
| # ============================================================ | |
| # Helpers | |
| # ============================================================ | |
def get(row: Dict, col: str, default=""):
    """Look up ``col`` in ``row``, substituting ``default`` for absent or NA values.

    A value counts as missing when the key is not present or when
    ``pd.isna`` reports it as NA (for example ``None``, ``NaN`` or ``NaT``).
    """
    raw = row.get(col, default)
    return default if pd.isna(raw) else raw
def dx_list(row: Dict) -> List[str]:
    """Split the comma-separated ``AllDxCodes`` field into upper-cased codes.

    Surrounding whitespace is trimmed and empty entries are dropped; the
    original order is kept.
    """
    codes = []
    for piece in str(get(row, "AllDxCodes", "")).split(","):
        trimmed = piece.strip()
        if trimmed:
            codes.append(trimmed.upper())
    return codes
def patient_age(row: Dict) -> Optional[float]:
    """Return the patient's age in fractional years, or None when unknown.

    The age is measured at the claim's date of service (``DOS``) so that
    age-based edits do not depend on the day the engine is run. When ``DOS``
    is blank or unparseable, today's date is used as the reference instead.

    Returns None when ``DateOfBirth`` is missing or cannot be parsed, or when
    any step of the computation raises.
    """
    try:
        dob = pd.to_datetime(get(row, "DateOfBirth"))
        if pd.isna(dob):
            return None
        # errors="coerce" turns a blank or malformed DOS into NaT instead of raising.
        reference = pd.to_datetime(get(row, "DOS", ""), errors="coerce")
        if pd.isna(reference):
            reference = pd.Timestamp.today()
        return (reference - dob).days / 365.25
    except Exception:
        # Best effort: an unusable date means "age unknown", not a crash.
        return None
def is_medicare(row: Dict) -> bool:
    """Tell whether the ``ClaimPriIns`` text contains "medicare" (case-insensitive)."""
    return "medicare" in str(get(row, "ClaimPriIns", "")).lower()
def get_modifiers(row: Dict) -> List[str]:
    """Return the values of ``Modifier1``..``Modifier4``, trimmed and upper-cased.

    The list always has four entries; a missing modifier is an empty string.
    """
    modifiers = []
    for slot in (1, 2, 3, 4):
        raw = get(row, f"Modifier{slot}", "")
        modifiers.append(str(raw).strip().upper())
    return modifiers
def cpt(row: Dict) -> str:
    """Return the claim's ``CPTCode`` as a trimmed, upper-cased string ("" if missing)."""
    raw_code = get(row, "CPTCode", "")
    return str(raw_code).strip().upper()
def _parse_date_column(row: Dict, column: str) -> Optional[date]:
    """Parse ``row[column]`` into a ``date``; None when blank or unparseable."""
    try:
        v = get(row, column, "")
        # Treat empty cells and stringified NA markers as "no date".
        if not v or str(v).strip() in ("", "nan", "NaT"):
            return None
        parsed = pd.to_datetime(v, errors="coerce")
        return None if pd.isna(parsed) else parsed.date()
    except Exception:
        # Best effort: any parsing problem is reported as a missing date.
        return None


def dos_date(row: Dict) -> Optional[date]:
    """Return the claim's ``DOS`` value as a date, or None when unavailable."""
    return _parse_date_column(row, "DOS")


def adm_date(row: Dict) -> Optional[date]:
    """Return the claim's ``AdmissionDate`` as a date, or None when unavailable."""
    return _parse_date_column(row, "AdmissionDate")


def dis_date(row: Dict) -> Optional[date]:
    """Return the claim's ``DischargeDate`` as a date, or None when unavailable."""
    return _parse_date_column(row, "DischargeDate")
def has_modifier(row: Dict, mod: str) -> bool:
    """Tell whether ``mod`` matches any non-empty modifier on the claim, ignoring case."""
    wanted = mod.upper()
    return any(m.upper() == wanted for m in get_modifiers(row) if m)
def to_float(value, default: float = 0.0) -> float:
    """Convert ``value`` to float, returning ``default`` for None, blanks or failures."""
    try:
        is_blank = value is None or str(value).strip() == ""
        return default if is_blank else float(value)
    except Exception:
        # Anything that cannot be converted falls back to the default.
        return default
| # ============================================================ | |
| # Rule model | |
| # ============================================================ | |
@dataclass
class RuleResult:
    """Outcome of one hard rule evaluated against a claim row.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how ``fail`` constructs it.
    """

    # Identifier of the rule that produced this result.
    name: str
    # True when the rule was violated.
    fired: bool
    # Human-readable explanation; empty when none was supplied.
    message: str = ""


# Signature shared by the rule functions: take a claim row and return a
# RuleResult when the rule fires, or None when the claim passes.
RuleFunc = Callable[[Dict], Optional[RuleResult]]
def fail(name: str, message: str) -> RuleResult:
    """Build a fired ``RuleResult`` carrying the rule ``name`` and ``message``."""
    result = RuleResult(name=name, fired=True, message=message)
    return result
| # ============================================================ | |
| # Hard rules | |
| # One rule = one function | |
| # ============================================================ | |
def rule_rdxcode1(row: Dict) -> Optional[RuleResult]:
    """Fire when the first diagnosis code on the claim is exactly I10."""
    codes = dx_list(row)
    if not codes or codes[0] != "I10":
        return None
    return fail("rdxcode1", 'Primary Diagnosis code must not equal to "I10".')
def rule_rdxcode_missing(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim carries no diagnosis codes at all."""
    if dx_list(row):
        return None
    return fail("rDXCodeMissing", "DXCODE1 is mandatory field.")
| # def rule_duplicate_dx_codes(row: Dict) -> Optional[RuleResult]: | |
| # seen = [] | |
| # for i, d in enumerate(dx_list(row), start=1): | |
| # if d in seen: | |
| # return fail(f"rDxCode{i}_dup", f"DX{i} Code must not be entered twice. ({d})") | |
| # seen.append(d) | |
| # return None | |
def collect_duplicate_dx_rules(row: Dict) -> List[RuleResult]:
    """Return one fired result per diagnosis position that repeats an earlier code.

    Positions are 1-based. Unlike the single-result rules, every duplicate is
    reported, so the returned list may be empty or hold several entries.
    """
    already_seen = set()
    duplicates = []
    for position, code in enumerate(dx_list(row), start=1):
        if code in already_seen:
            duplicates.append(fail(f"rDxCode{position}_dup", f"DX{position} duplicate ({code})"))
        else:
            already_seen.add(code)
    return duplicates
def rule_perinatal_dx_age(row: Dict) -> Optional[RuleResult]:
    """Fire when a patient older than one year carries a P0x–P9x diagnosis.

    Only the first offending diagnosis position is reported. Claims with an
    unknown age are not checked.
    """
    perinatal_prefixes = tuple(f"P{digit}" for digit in range(10))
    age = patient_age(row)
    if age is None or age <= 1:
        return None
    hit = next(
        ((pos, code) for pos, code in enumerate(dx_list(row), start=1)
         if code.startswith(perinatal_prefixes)),
        None,
    )
    if hit is None:
        return None
    pos, code = hit
    return fail("rDXCodeCategory1", f"DX{pos} ({code}) is for Perinatal/Newborn and not valid for this patient.")
def rule_pediatric_dx_age(row: Dict) -> Optional[RuleResult]:
    """Fire when a patient aged 18 or older carries a Z00.1x diagnosis.

    ``patient_age`` returns a fractional number of years, so the cut-off is
    written as ``age < 18``. The previous ``age <= 17`` test wrongly flagged
    patients between their 17th and 18th birthdays.

    Only the first offending diagnosis position is reported. Claims with an
    unknown age are not checked.
    """
    age = patient_age(row)
    if age is None or age < 18:
        return None
    for i, d in enumerate(dx_list(row), start=1):
        # Accept both the dotted and undotted spelling of the code family.
        if d.startswith(("Z00.1", "Z001")):
            return fail("rDXCodeCategory2", f"DX{i} ({d}) is Pediatric and not valid for this patient.")
    return None
def rule_maternity_dx_gender(row: Dict) -> Optional[RuleResult]:
    """Fire when a claim whose ``Gender`` is "male" carries an O, Z34, Z36 or Z3A diagnosis.

    Only the first offending diagnosis position is reported.
    """
    maternity_prefixes = ("O", "Z34", "Z36", "Z3A")
    if str(get(row, "Gender", "")).strip().lower() == "male":
        for position, code in enumerate(dx_list(row), start=1):
            if code.startswith(maternity_prefixes):
                return fail("rDXCodeCategory3", f"DX{position} ({code}) is Maternity Diagnosis and not valid for Male patient.")
    return None
def rule_female_only_dx_gender(row: Dict) -> Optional[RuleResult]:
    """Fire when a claim whose ``Gender`` is "male" carries an N70–N77 or N80–N97 diagnosis.

    Only the first offending diagnosis position is reported.
    """
    # Prefixes N70..N77 and N80..N97.
    female_only_pfx = tuple(f"N{num}" for num in (*range(70, 78), *range(80, 98)))
    if str(get(row, "Gender", "")).strip().lower() == "male":
        for position, code in enumerate(dx_list(row), start=1):
            if code.startswith(female_only_pfx):
                return fail("rDXCodeCategory5", f"DX{position} ({code}) is Female-only and not valid for Male patient.")
    return None
def rule_external_cause_not_primary(row: Dict) -> Optional[RuleResult]:
    """Fire when the first diagnosis code begins with V, W, X or Y."""
    codes = dx_list(row)
    if not codes:
        return None
    primary = codes[0]
    if primary[:1] in ("V", "W", "X", "Y"):
        return fail("rdxcode1_morbidity", f"External cause code ({primary}) may not be used as primary diagnosis.")
    return None
def rule_z6830_not_primary(row: Dict) -> Optional[RuleResult]:
    """Fire when the first diagnosis code is Z68.30 (dotted or undotted)."""
    codes = dx_list(row)
    if not codes or codes[0] not in {"Z68.30", "Z6830"}:
        return None
    return fail("rdxZ6830", "Z68.30 is not acceptable as principal DX.")
def rule_dx_g40009_g40909(row: Dict) -> Optional[RuleResult]:
    """Fire when G40009 and G40909 both appear on the claim (dots ignored)."""
    undotted = {code.replace(".", "") for code in dx_list(row)}
    if {"G40009", "G40909"} <= undotted:
        return fail("rDx1Diagnosis", "DX Code G40009 and G40909 cannot be billed in single claim.")
    return None
def rule_dx_g4089_g40009(row: Dict) -> Optional[RuleResult]:
    """Fire when G4089 and G40009 both appear on the claim (dots ignored)."""
    undotted = {code.replace(".", "") for code in dx_list(row)}
    if {"G4089", "G40009"} <= undotted:
        return fail("rDxCodeG40", "DX Code G40.89 and G40.009 cannot be billed in single claim.")
    return None
def rule_dx_f17200_z87891(row: Dict) -> Optional[RuleResult]:
    """Fire when F17200 and Z87891 both appear on the claim (dots ignored)."""
    undotted = {code.replace(".", "") for code in dx_list(row)}
    if {"F17200", "Z87891"} <= undotted:
        return fail("rdxF17&Z87", "Diagnosis codes F17.200 and Z87.891 cannot be reported together.")
    return None
def rule_dx_n183_date(row: Dict) -> Optional[RuleResult]:
    """Fire when N183 (dots ignored) is billed with a DOS after 2020-09-30.

    Claims without a usable DOS are not checked.
    """
    last_valid_day = date(2020, 9, 30)
    service_day = dos_date(row)
    if not service_day or not service_day > last_valid_day:
        return None
    if any(code.replace(".", "") == "N183" for code in dx_list(row)):
        return fail("rDxN183", "DxCode N18.3 was billable only through 2020-09-30.")
    return None
def rule_dx_n1830_n1831_n1832_date(row: Dict) -> Optional[RuleResult]:
    """Fire when N1830/N1831/N1832 (dots ignored) is billed with a DOS before 2020-10-01.

    The message quotes the first offending code as written on the claim.
    Claims without a usable DOS are not checked.
    """
    effective_from = date(2020, 10, 1)
    service_day = dos_date(row)
    if not service_day or not service_day < effective_from:
        return None
    offender = next(
        (code for code in dx_list(row) if code.replace(".", "") in ("N1830", "N1831", "N1832")),
        None,
    )
    if offender is None:
        return None
    return fail("rDxN18303132", f"DxCode {offender} is effective from 2020-10-01.")
def rule_dx_m5459_r051_date(row: Dict) -> Optional[RuleResult]:
    """Fire when M54.59 or R05.1 (dotted or undotted) is billed with a DOS before 2021-10-01.

    The message quotes the first offending code as written on the claim.
    Claims without a usable DOS are not checked.
    """
    effective_from = date(2021, 10, 1)
    service_day = dos_date(row)
    if not service_day or not service_day < effective_from:
        return None
    offender = next(
        (code for code in dx_list(row) if code in ("M54.59", "R05.1", "M5459", "R051")),
        None,
    )
    if offender is None:
        return None
    return fail("rDx-M54.59,R05.1", f"Diagnosis code {offender} is effective from 2021-10-01.")
def rule_z01818_z01810_only_primary(row: Dict) -> Optional[RuleResult]:
    """Fire when Z01818 or Z01810 (dots ignored) appears anywhere but first.

    The message reports the 1-based position of the first offending code.
    """
    for position, code in enumerate(dx_list(row), start=1):
        if position == 1:
            # The primary slot is the only place these codes are allowed.
            continue
        if code.replace(".", "") in ("Z01818", "Z01810"):
            return fail("rDXZ01818", f"DX {code} is acceptable only as primary diagnosis, found at position {position}.")
    return None
def rule_wellmed_f_series_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when ``ClaimPriIns`` mentions "wellmed" and any diagnosis starts with F.

    The message quotes the first offending code.
    """
    insurer = str(get(row, "ClaimPriIns", "")).lower()
    if "wellmed" in insurer:
        offender = next((code for code in dx_list(row) if code.startswith("F")), None)
        if offender is not None:
            return fail("rDxCodeFSeries", f"For Wellmed insurance, F-series diagnosis codes are not acceptable. Found: {offender}")
    return None
def rule_90653_requires_z23(row: Dict) -> Optional[RuleResult]:
    """Fire when CPT 90653 is billed without diagnosis Z23 (dots ignored)."""
    if cpt(row) != "90653":
        return None
    undotted = {code.replace(".", "") for code in dx_list(row)}
    if "Z23" in undotted:
        return None
    return fail("rDx723Point", "ICD Z23 is mandatory with CPT 90653.")
def rule_zip_valid(row: Dict) -> Optional[RuleResult]:
    """Fire when ``Zip`` is blank or has an unexpected length.

    A value passes when it is exactly 5 characters long, or 9 characters long
    once hyphens are removed.
    """
    # NOTE(review): only the length is checked, so non-digit text such as
    # "abcde" passes - confirm whether a digit check is wanted.
    zip_code = str(get(row, "Zip", "")).strip()
    five_long = len(zip_code) == 5
    nine_long = len(zip_code.replace("-", "")) == 9
    if zip_code and (five_long or nine_long):
        return None
    return fail("rZIP", "Please enter valid Zip Code for patient.")
def rule_state_valid(row: Dict) -> Optional[RuleResult]:
    """Fire when ``State`` is blank or shorter than two characters after trimming."""
    state = str(get(row, "State", "")).strip()
    if len(state) >= 2:
        return None
    return fail("rState", "Please enter valid State for patient.")
def rule_city_valid(row: Dict) -> Optional[RuleResult]:
    """Fire when ``City`` is missing or blank after trimming."""
    if str(get(row, "City", "")).strip():
        return None
    return fail("rCity", "Please enter valid City for patient.")
def rule_last_name_required(row: Dict) -> Optional[RuleResult]:
    """Fire when ``LastName`` is missing or blank after trimming."""
    last_name = str(get(row, "LastName", "")).strip()
    if last_name:
        return None
    return fail("rLNMMis", "Last Name is Mandatory field.")
def rule_first_name_required(row: Dict) -> Optional[RuleResult]:
    """Fire when ``FirstName`` is missing or blank after trimming."""
    first_name = str(get(row, "FirstName", "")).strip()
    if first_name:
        return None
    return fail("rFNMis", "First Name is Mandatory field.")
def rule_patient_name_special_chars(row: Dict) -> Optional[RuleResult]:
    """Fire when ``PatientName`` contains any character from the disallowed list."""
    forbidden = '!#%$&+,./:;<=>@`{|}~"()*\\_^?[]\''
    name = str(get(row, "PatientName", "")).strip()
    if any(ch in forbidden for ch in name):
        return fail("rSpecChar", "Special characters are not allowed in Patient name.")
    return None
def rule_address_required(row: Dict) -> Optional[RuleResult]:
    """Fire when ``Address`` is missing or blank after trimming."""
    address = str(get(row, "Address", "")).strip()
    if address:
        return None
    return fail("rADSMis", "Address is Mandatory field.")
def rule_is_ptl(row: Dict) -> Optional[RuleResult]:
    """Fire when ``IsPTL`` reads "1", "true" or "yes" (case-insensitive, trimmed)."""
    flag = str(get(row, "IsPTL", "0")).strip().lower()
    if flag not in {"1", "true", "yes"}:
        return None
    return fail("rIsPTL", "Claim with PTL status must not be submitted.")
def rule_cpt_required(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim has no CPT code."""
    if cpt(row):
        return None
    return fail("rCPTCode=1", "Please enter at least one CPT for the claim.")
def rule_g0389_replaced(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is G0389."""
    if cpt(row) != "G0389":
        return None
    return fail("rCPTCOde=10", "CPT Code G0389 is replaced by 76706.")
def rule_94620_replaced(row: Dict) -> Optional[RuleResult]:
    """Fire when CPT 94620 is billed with a DOS later than 2018-01-01.

    Claims without a usable DOS are not checked.
    """
    if cpt(row) != "94620":
        return None
    service_day = dos_date(row)
    if service_day and service_day > date(2018, 1, 1):
        return fail("rCPTCode94620", "CPT 94620 is not acceptable after 2018-01-01 DOS.")
    return None
def rule_g0436_replaced(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is G0436."""
    if cpt(row) != "G0436":
        return None
    return fail("rCPTCode=12", "CPT G0436 is replaced with 99406.")
def rule_g0437_replaced(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is G0437."""
    if cpt(row) != "G0437":
        return None
    return fail("rCPTCode=13", "CPT G0437 is replaced with 99407.")
def rule_q2037_replaced(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is Q2037."""
    if cpt(row) != "Q2037":
        return None
    return fail("rCPTCodeQ2037", "CPT Q2037 has been replaced. Please enter updated CPT.")
def rule_92585_deleted(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is 92585."""
    if cpt(row) != "92585":
        return None
    return fail("rCPT92585", "92585 is a deleted CPT code.")
def rule_g8553_deleted(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is G8553."""
    if cpt(row) != "G8553":
        return None
    return fail("rCPTG8553Del", "CPT G8553 is a deleted code.")
def rule_0031a_wrong_code_for_age(row: Dict) -> Optional[RuleResult]:
    """Fire when CPT 0031A is billed for a patient whose age lies in [12, 18].

    Claims with an unknown age are not checked.
    """
    # NOTE(review): age is fractional, so a patient aged 18.5 falls outside
    # this window - confirm whether "through age 18" was intended.
    if cpt(row) != "0031A":
        return None
    age = patient_age(row)
    if age is not None and 12 <= age <= 18:
        return fail("rCPT0031A", 'Correct code is "0001A 0002A". Please replace the code.')
    return None
def rule_consult_code_medicare(row: Dict) -> Optional[RuleResult]:
    """Fire when a consultation CPT (99241–99245, 99251–99255) is billed to Medicare."""
    consult_codes = {str(num) for num in (*range(99241, 99246), *range(99251, 99256))}
    if cpt(row) not in consult_codes:
        return None
    if not is_medicare(row):
        return None
    return fail("rConsultCode", "Medicare does not accept consultation codes.")
def rule_g0008_insurance(row: Dict) -> Optional[RuleResult]:
    """Fire when G0008 is billed and ``ClaimPriIns`` mentions none of medicare/uhc/united."""
    if cpt(row) != "G0008":
        return None
    insurer = str(get(row, "ClaimPriIns", "")).lower()
    if any(keyword in insurer for keyword in ("medicare", "uhc", "united")):
        return None
    return fail("rCPTCodeG0008", "G0008 is only allowed with Medicare or UHC.")
def rule_90471_medicare(row: Dict) -> Optional[RuleResult]:
    """Fire when CPT 90471 is billed to Medicare."""
    if cpt(row) != "90471" or not is_medicare(row):
        return None
    return fail("rCPTCode90471", "90471 is not allowed with Medicare.")
def rule_consult_requires_referring(row: Dict) -> Optional[RuleResult]:
    """Fire when a consultation CPT is billed and ``ReferringPhysician`` is empty."""
    # NOTE(review): a whitespace-only value counts as present here, unlike the
    # other mandatory-field rules that strip first - confirm intent.
    consult_codes = {str(num) for num in (*range(99241, 99246), *range(99251, 99256))}
    if cpt(row) not in consult_codes:
        return None
    if get(row, "ReferringPhysician"):
        return None
    return fail("rConsultCode2", "Referring physician is mandatory with consultation code.")
def rule_25000_not_allowed(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is 25000."""
    if cpt(row) != "25000":
        return None
    return fail("rCPTCode25000", "CPT Code 25000 is not allowed.")
def rule_jcode_ndc(row: Dict) -> Optional[RuleResult]:
    """Fire when a CPT starting with J is billed and ``Drug_NDC`` is blank."""
    if not cpt(row).startswith("J"):
        return None
    ndc = str(get(row, "Drug_NDC", "")).strip()
    if ndc:
        return None
    return fail("rJcodeNDC", "J-code without NDC should not be billed.")
def rule_u0002_clia(row: Dict) -> Optional[RuleResult]:
    """Fire when CPT U0002 is billed and ``CLIANumber`` is empty."""
    if cpt(row) != "U0002":
        return None
    if get(row, "CLIANumber"):
        return None
    return fail("rCPTU0002Clia", "CLIA number is required with CPT U0002.")
def rule_lab_cpt_clia(row: Dict) -> Optional[RuleResult]:
    """Fire when a numeric CPT in 80000–89999 is billed and ``CLIANumber`` is empty.

    CPT codes that cannot be read as an integer are ignored.
    """
    outcome = None
    try:
        numeric_code = int(cpt(row))
        in_lab_range = 80000 <= numeric_code <= 89999
        if in_lab_range and not get(row, "CLIANumber"):
            outcome = fail("rCLIANumberMissing", "CLIA number required for lab services.")
    except Exception:
        # Non-numeric codes (and any other failure) mean the rule does not apply.
        return None
    return outcome
def rule_high_level_99203(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is 99203."""
    if cpt(row) != "99203":
        return None
    return fail("rCPTHighLevel", "Please update CPT 99203 as 99204 according to visit type.")
def rule_high_level_99213(row: Dict) -> Optional[RuleResult]:
    """Fire when the claim's CPT code is 99213."""
    if cpt(row) != "99213":
        return None
    return fail("rCPTHighLevel", "Please update CPT 99213 as 99214 according to visit type.")
def rule_newborn_cpt_age(row: Dict) -> Optional[RuleResult]:
    """Fire when a newborn-care CPT (99460–99465) is billed for a patient older than 28 days.

    Claims with an unknown age are not checked.
    """
    code = cpt(row)
    if code not in {"99460", "99461", "99462", "99463", "99464", "99465"}:
        return None
    age = patient_age(row)
    # 28 days expressed in the fractional years that patient_age returns.
    if age is not None and age > (28 / 365.25):
        return fail("rNewbornCPT", f"Newborn care CPT {code} is not valid for patient older than 28 days.")
    return None
def rule_gender_specific_cpt(row: Dict) -> Optional[RuleResult]:
    """Fire when a CPT from the female-only or male-only list is billed for the other gender.

    ``Gender`` is compared case-insensitively against "male" and "female";
    any other value never fires.
    """
    female_only = {"58300", "58301", "58600", "58605", "58700", "58720", "58740", "58750", "57452", "57454", "57455"}
    male_only = {"54000", "54001", "54050", "54055", "54150", "54160", "54161", "55000", "55040", "55041"}
    code = cpt(row)
    gender = str(get(row, "Gender", "")).strip().lower()
    # (restricted code list, gender that must not use it, label shown in the message)
    restrictions = (
        (female_only, "male", "Male"),
        (male_only, "female", "Female"),
    )
    for restricted, blocked_gender, label in restrictions:
        if code in restricted and gender == blocked_gender:
            return fail("rCPTGender", f"CPT {code} is not valid for {label} gender.")
    return None
# CPT codes treated as maternity care by the maternity gender and age rules.
MATERNITY_CPTS = set(
    """
    59000 59012 59015 59020 59025 59030 59050 59051 59070 59072
    59074 59076 59100 59120 59121 59130 59135 59136 59140 59150
    59151 59160 59200 59300 59320 59325 59350 59400 59409 59410
    59412 59414 59425 59426 59430 59510 59514 59515 59525 59610
    59612 59614 59618 59620 59622 59812 59820 59821 59830 59840
    59841 59850 59851 59852 59855 59856 59857 59866 59870 59871
    59897 59898 59899
    """.split()
)
def rule_maternity_cpt_gender(row: Dict) -> Optional[RuleResult]:
    """Fire when a code in ``MATERNITY_CPTS`` is billed and ``Gender`` is "male"."""
    code = cpt(row)
    if code not in MATERNITY_CPTS:
        return None
    gender = str(get(row, "Gender", "")).strip().lower()
    if gender != "male":
        return None
    return fail("rMatCPT", f"Maternity care CPT {code} is not valid for Male gender.")
def rule_maternity_cpt_age(row: Dict) -> Optional[RuleResult]:
    """Fire when a maternity CPT is billed for a patient outside ages 10–55.

    ``patient_age`` returns fractional years, while the 10–55 window is stated
    in whole years. The check therefore uses completed years, so a patient
    aged 55.4 is still "55" and passes. Comparing the raw float rejected her
    while the message reported "age 55 (expected 10–55)".

    Claims with an unknown age are not checked.
    """
    if cpt(row) not in MATERNITY_CPTS:
        return None
    age = patient_age(row)
    if age is None:
        return None
    # Floor division keeps negative ages (DOB after the reference date) below zero.
    years = int(age // 1)
    if years < 10 or years > 55:
        return fail("rMatCPTAge", f"Maternity CPT {cpt(row)} not valid for patient age {years} (expected 10–55).")
    return None
def rule_delivery_cpt_pos(row: Dict) -> Optional[RuleResult]:
    """Fire when a delivery CPT is billed with a POS other than 21, 22, 23 or 02.

    A blank POS is not flagged.
    """
    delivery_cpts = {"59400", "59409", "59410", "59510", "59514", "59515", "59610", "59612", "59614", "59618", "59620", "59622"}
    code = cpt(row)
    if code not in delivery_cpts:
        return None
    pos = str(get(row, "POS", "")).strip()
    if not pos or pos in {"21", "22", "23", "02"}:
        return None
    return fail("rDeliveryPOS", f"Delivery CPT {code} requires POS 21/22/23/02, got POS {pos}.")
def rule_chemo_requires_cancer_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a chemotherapy CPT is billed without any neoplasm diagnosis.

    A diagnosis qualifies when it starts with C, D0, D37, D38, D39 or D4.
    """
    chemo_cpts = {
        "96401", "96402", "96405", "96406", "96409", "96411", "96413", "96415", "96416", "96417",
        "96420", "96422", "96423", "96425", "96440", "96446", "96450", "96521", "96522", "96523",
        "96542", "96549",
    }
    code = cpt(row)
    if code not in chemo_cpts:
        return None
    # C00-C96 malignant; D00-D09 in situ; D37-D48 uncertain behavior
    neoplasm_prefixes = ("C", "D0", "D37", "D38", "D39", "D4")
    if any(dx.startswith(neoplasm_prefixes) for dx in dx_list(row)):
        return None
    return fail("rChemoNoCancer", f"Chemotherapy CPT {code} requires a neoplasm diagnosis (C-code or D00–D48).")
def rule_dialysis_requires_renal_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a dialysis CPT is billed without an N18.x or Z99.2 diagnosis (dots ignored)."""
    dialysis_cpts = {
        "90935", "90937", "90940", "90945", "90947",
        "90951", "90952", "90953", "90954", "90955", "90956", "90957", "90958", "90959", "90960",
        "90961", "90962", "90963", "90964", "90965", "90966", "90967", "90968", "90969", "90970",
        "90989", "90993", "90997", "90999",
    }
    code = cpt(row)
    if code not in dialysis_cpts:
        return None
    undotted = [dx.replace(".", "") for dx in dx_list(row)]
    if any(dx.startswith("N18") or dx == "Z992" for dx in undotted):
        return None
    return fail("rDialysisNoRenal", f"Dialysis CPT {code} requires renal diagnosis (N18.x or Z99.2).")
def rule_cardiac_rehab_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a cardiac-rehab CPT is billed without a qualifying cardiac diagnosis.

    Qualifying prefixes (dots ignored): I20, I21, I22, I25, I50, Z951, Z955.
    """
    # NOTE(review): the message advertises I20–I25, but I23 and I24 are not
    # in the accepted prefixes - confirm which is intended.
    code = cpt(row)
    if code not in {"93797", "93798", "G0422", "G0423"}:
        return None
    # I20 angina, I21 MI, I22 subsequent MI, I25 CAD, I50 heart failure, Z95.1 CABG hx, Z95.5 PCI hx
    cardiac_prefixes = ("I20", "I21", "I22", "I25", "I50", "Z951", "Z955")
    if any(dx.replace(".", "").startswith(cardiac_prefixes) for dx in dx_list(row)):
        return None
    return fail("rCardRehabNoCardiac", f"Cardiac rehab CPT {code} requires cardiac diagnosis (I20–I25, I50, or Z95.1/Z95.5).")
def rule_awv_medicare_age(row: Dict) -> Optional[RuleResult]:
    """Fire when G0438 or G0439 is billed for a patient younger than 65.

    Claims with an unknown age are not checked.
    """
    code = cpt(row)
    if code in {"G0438", "G0439"}:
        age = patient_age(row)
        if not (age is None or age >= 65):
            return fail("rAWVAge", f"AWV CPT {code} requires Medicare-eligible age (65+), got age {age:.0f}.")
    return None
def rule_screening_colonoscopy_age(row: Dict) -> Optional[RuleResult]:
    """Fire when a screening colonoscopy is billed for a patient younger than 45.

    Applies to CPT G0121/45378/45380/45385 when any diagnosis (dots ignored)
    starts with Z1211 or Z8671. Claims with an unknown age are not checked.
    """
    if cpt(row) not in {"G0121", "45378", "45380", "45385"}:
        return None
    screening_prefixes = ("Z1211", "Z8671")
    has_screening_dx = any(
        dx.replace(".", "").startswith(screening_prefixes) for dx in dx_list(row)
    )
    if not has_screening_dx:
        return None
    age = patient_age(row)
    if age is None or age >= 45:
        return None
    return fail("rColonScreenAge", f"Screening colonoscopy not recommended before age 45 (USPSTF), got age {age:.0f}.")
def rule_inpatient_only_cpt_office(row: Dict) -> Optional[RuleResult]:
    """Fire when a CPT from the inpatient-only sample list is billed at POS 11."""
    # Sample of CMS Medicare Inpatient-Only (IPO) list — must be inpatient hospital
    inpatient_only = {
        "33533", "33534", "33535", "33536",  # CABG
        "27130", "27132", "27134", "27137", "27138",  # Hip arthroplasty
        "27447", "27486", "27487",  # Knee arthroplasty
        "43644", "43645", "43770", "43775",  # Bariatric
        "23470", "23472",  # Shoulder arthroplasty
        "61510", "61512", "61518", "61519", "61521",  # Craniotomy
        "63081", "63082", "63085", "63086",  # Vertebral corpectomy
    }
    code = cpt(row)
    if code not in inpatient_only:
        return None
    if str(get(row, "POS", "")).strip() != "11":
        return None
    return fail("rIPOatOffice", f"CPT {code} is on Medicare Inpatient-Only list — cannot be billed at POS 11 (office).")
def rule_prostate_cpt_gender(row: Dict) -> Optional[RuleResult]:
    """Fire when a prostate CPT is billed and ``Gender`` is "female"."""
    prostate_cpts = {
        "55700", "55705", "55706",
        "55801", "55810", "55812", "55815",
        "55821", "55831", "55840", "55842", "55845",
        "55866", "55873", "55875",
    }
    code = cpt(row)
    if code in prostate_cpts and str(get(row, "Gender", "")).strip().lower() == "female":
        return fail("rProstateFemale", f"Prostate CPT {code} is not valid for Female gender.")
    return None
def rule_pulmonary_rehab_requires_pulmonary_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a pulmonary-rehab CPT is billed without a qualifying pulmonary diagnosis.

    Qualifying prefixes (dots ignored): J40–J47, J84, J96 and J68.
    """
    # NOTE(review): J68 is accepted but not mentioned in the message - confirm.
    code = cpt(row)
    if code not in {"94625", "94626", "G0237", "G0238", "G0239", "G0424"}:
        return None
    # J40-J47 COPD/chronic bronchitis/emphysema, J84 interstitial lung, J96 respiratory failure, J68 chemical injury
    pulmonary_prefixes = ("J40", "J41", "J42", "J43", "J44", "J45", "J46", "J47", "J84", "J96", "J68")
    if any(dx.replace(".", "").startswith(pulmonary_prefixes) for dx in dx_list(row)):
        return None
    return fail("rPulmRehabNoPulm", f"Pulmonary rehab CPT {code} requires pulmonary DX (J40–J47, J84, or J96).")
def rule_cardiac_cath_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a cardiac-cath CPT is billed without a qualifying cardiac diagnosis.

    Qualifying prefixes (dots ignored): I20–I25, I42, I43, I48, I49, I50,
    R07, R000 and R001.
    """
    # NOTE(review): the message advertises I42–I50 and R00, but I44–I47 and
    # R00 codes other than R00.0/R00.1 are not accepted - confirm intent.
    cath_cpts = {"93451", "93452", "93453", "93454", "93455", "93456", "93457", "93458", "93459",
                 "93460", "93461", "93530", "93531", "93532", "93533"}
    code = cpt(row)
    if code not in cath_cpts:
        return None
    # I20-I25 ischemic heart, I42-I43 cardiomyopathy, I48 afib, I49 arrhythmia, I50 CHF, R07 chest pain, R00 palpitations
    cardiac_prefixes = ("I20", "I21", "I22", "I23", "I24", "I25", "I42", "I43",
                        "I48", "I49", "I50", "R07", "R000", "R001")
    if any(dx.replace(".", "").startswith(cardiac_prefixes) for dx in dx_list(row)):
        return None
    return fail("rCardCathNoCardiac", f"Cardiac cath CPT {code} requires cardiac DX (I20–I25, I42–I50, or R07/R00).")
def rule_radiation_oncology_requires_cancer(row: Dict) -> Optional[RuleResult]:
    """Fire when a radiation-oncology CPT is billed without any neoplasm diagnosis.

    A diagnosis qualifies when it starts with C, D0, D37, D38, D39 or D4.
    """
    rad_cpts = {"77261", "77262", "77263", "77280", "77285", "77290", "77293", "77295",
                "77299", "77300", "77301", "77306", "77307", "77316", "77317", "77318",
                "77321", "77331", "77332", "77333", "77334", "77336", "77338",
                "77370", "77371", "77372", "77373", "77385", "77386", "77387",
                "77401", "77402", "77407", "77412", "77417", "77427",
                "77520", "77522", "77523", "77525"}
    code = cpt(row)
    if code not in rad_cpts:
        return None
    neoplasm_prefixes = ("C", "D0", "D37", "D38", "D39", "D4")
    if any(dx.startswith(neoplasm_prefixes) for dx in dx_list(row)):
        return None
    return fail("rRadOncNoCancer", f"Radiation oncology CPT {code} requires neoplasm DX (C-code or D00–D48).")
def rule_pt_requires_msk_neuro_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a physical-therapy CPT is billed without a qualifying diagnosis.

    Qualifying prefixes (dots ignored): M, S, G8, Z47, Z48, Z51, R26.
    """
    pt_cpts = {"97110", "97112", "97116", "97124", "97140", "97150", "97530", "97535", "97537", "97542", "97545", "97546"}
    code = cpt(row)
    if code not in pt_cpts:
        return None
    # M-codes MSK, G8x palsies, S-codes injuries, Z47/Z48/Z51 aftercare, R26 gait/mobility
    accepted_prefixes = ("M", "S", "G8", "Z47", "Z48", "Z51", "R26")
    if any(dx.replace(".", "").startswith(accepted_prefixes) for dx in dx_list(row)):
        return None
    return fail("rPTNoMSK", f"PT CPT {code} requires MSK (M-code), neuro (G8x), injury (S-code), or aftercare (Z47/Z48/Z51) DX.")
def rule_cataract_surgery_requires_cataract_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a cataract-surgery CPT is billed without an H25, H26 or H28 diagnosis (dots ignored)."""
    # NOTE(review): the message advertises H25–H28, but H27 is not accepted - confirm.
    code = cpt(row)
    if code not in {"66982", "66983", "66984", "66987", "66988"}:
        return None
    cataract_prefixes = ("H25", "H26", "H28")
    if any(dx.replace(".", "").startswith(cataract_prefixes) for dx in dx_list(row)):
        return None
    return fail("rCataractNoDx", f"Cataract surgery CPT {code} requires cataract DX (H25–H28).")
def rule_joint_injection_requires_arthritis_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a joint-injection CPT is billed without a qualifying diagnosis.

    Qualifying prefixes (dots ignored): M0, M1, M2, M6, M7 and S.
    """
    code = cpt(row)
    if code not in {"20600", "20604", "20605", "20606", "20610", "20611"}:
        return None
    # M00-M25 arthropathies, M60-M79 soft tissue disorders, S-codes injury
    accepted_prefixes = ("M0", "M1", "M2", "M6", "M7", "S")
    if any(dx.replace(".", "").startswith(accepted_prefixes) for dx in dx_list(row)):
        return None
    return fail("rJointInjNoDx", f"Joint injection CPT {code} requires joint/soft-tissue/injury DX (M0x–M2x, M6x–M7x, S-codes).")
def rule_spine_surgery_requires_spine_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a spine-surgery CPT is billed without a qualifying spine diagnosis.

    Qualifying prefixes (dots ignored) are the ones listed in ``spine_prefixes``.
    """
    # NOTE(review): the message advertises M40–M54, but not every prefix in
    # that range is accepted (e.g. M49) - confirm intent.
    spine_cpts = {"22551", "22552", "22554", "22556", "22558", "22590", "22595", "22612",
                  "22614", "22630", "22632", "22633", "22634",
                  "63020", "63030", "63042", "63045", "63047", "63048",
                  "63056", "63075", "63081", "63082", "63085", "63086"}
    code = cpt(row)
    if code not in spine_cpts:
        return None
    spine_prefixes = ("M40", "M41", "M42", "M43", "M45", "M46", "M47", "M48",
                      "M50", "M51", "M53", "M54", "M96", "S14", "S24", "S34")
    if any(dx.replace(".", "").startswith(spine_prefixes) for dx in dx_list(row)):
        return None
    return fail("rSpineSurgNoDx", f"Spine surgery CPT {code} requires spine DX (M40–M54, M96, or S14/S24/S34).")
def rule_mammography_female(row: Dict) -> Optional[RuleResult]:
    """Fire when a mammography CPT (77063/77065/77066/77067) is billed and ``Gender`` is "male"."""
    code = cpt(row)
    if code in {"77063", "77065", "77066", "77067"}:
        gender = str(get(row, "Gender", "")).strip().lower()
        if gender == "male":
            return fail("rMammoMale", f"Mammography CPT {code} is typically female-only; found Male gender.")
    return None
def rule_hysterectomy_female(row: Dict) -> Optional[RuleResult]:
    """Fire when a hysterectomy CPT is billed and ``Gender`` is "male"."""
    hyst_cpts = {"58150", "58152", "58180", "58200", "58210", "58260", "58262", "58263", "58267",
                 "58270", "58275", "58280", "58285", "58290", "58291", "58292", "58293", "58294",
                 "58541", "58542", "58543", "58544", "58548", "58550", "58552", "58553", "58554",
                 "58570", "58571", "58572", "58573"}
    code = cpt(row)
    if code in hyst_cpts:
        gender = str(get(row, "Gender", "")).strip().lower()
        if gender == "male":
            return fail("rHystMale", f"Hysterectomy CPT {code} is not valid for Male gender.")
    return None
def rule_testicular_cpt_male(row: Dict) -> Optional[RuleResult]:
    """Fire when a testicular CPT is billed and ``Gender`` is "female"."""
    test_cpts = {"54500", "54505", "54512", "54520", "54522", "54530", "54535", "54550", "54560"}
    code = cpt(row)
    if code in test_cpts:
        gender = str(get(row, "Gender", "")).strip().lower()
        if gender == "female":
            return fail("rTesticFemale", f"Testicular CPT {code} is not valid for Female gender.")
    return None
def rule_gallbladder_surgery_requires_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a gallbladder-surgery CPT is billed without a K80–K83 or C23 diagnosis (dots ignored)."""
    gb_cpts = {"47562", "47563", "47564", "47600", "47605", "47610", "47612", "47620"}
    code = cpt(row)
    if code not in gb_cpts:
        return None
    gallbladder_prefixes = ("K80", "K81", "K82", "K83", "C23")
    if any(dx.replace(".", "").startswith(gallbladder_prefixes) for dx in dx_list(row)):
        return None
    return fail("rGallbladderNoDx", f"Gallbladder surgery CPT {code} requires gallbladder DX (K80–K83 or C23).")
def rule_appendectomy_requires_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when an appendectomy CPT is billed without a K35–K38 prefix or exact C181 diagnosis (dots ignored)."""
    app_cpts = {"44950", "44955", "44960", "44970"}
    code = cpt(row)
    if code not in app_cpts:
        return None
    undotted = [dx.replace(".", "") for dx in dx_list(row)]
    appendix_prefixes = ("K35", "K36", "K37", "K38")
    if any(dx.startswith(appendix_prefixes) or dx == "C181" for dx in undotted):
        return None
    return fail("rAppendNoDx", f"Appendectomy CPT {code} requires appendicitis DX (K35–K38 or C18.1).")
def rule_tonsillectomy_requires_dx(row: Dict) -> Optional[RuleResult]:
    """Fire when a tonsillectomy CPT is billed without a J02/J03/J35/J36 prefix or exact G4733 diagnosis (dots ignored)."""
    tons_cpts = {"42820", "42821", "42825", "42826", "42830", "42831", "42835", "42836"}
    code = cpt(row)
    if code not in tons_cpts:
        return None
    undotted = [dx.replace(".", "") for dx in dx_list(row)]
    tonsil_prefixes = ("J02", "J03", "J35", "J36")
    if any(dx.startswith(tonsil_prefixes) or dx == "G4733" for dx in undotted):
        return None
    return fail("rTonsilNoDx", f"Tonsillectomy CPT {code} requires tonsil/adenoid/OSA DX (J02, J03, J35, J36, or G47.33).")
def rule_pediatric_preventive_age(row: Dict) -> Optional[RuleResult]:
    """Fire when a pediatric preventive CPT is billed outside its age band.

    The bands are inclusive whole-year ranges, while ``patient_age`` returns
    fractional years. Comparing the raw float left gaps between bands: a child
    aged 4.5 failed the 1–4 band with the message "expects age 1–4, got age 4".
    The age is therefore reduced to completed years before the comparison.

    Claims with an unknown age, or a CPT outside the table, are not checked.
    """
    code = cpt(row)
    # CPT -> (lowest, highest) completed years, both inclusive.
    age_limits = {"99381": (0, 1), "99382": (1, 4), "99383": (5, 11), "99384": (12, 17),
                  "99391": (0, 1), "99392": (1, 4), "99393": (5, 11), "99394": (12, 17)}
    if code not in age_limits:
        return None
    age = patient_age(row)
    if age is None:
        return None
    lo, hi = age_limits[code]
    # Floor division keeps negative ages (DOB after the reference date) below zero.
    years = int(age // 1)
    if years < lo or years > hi:
        return fail("rPedPrevAge", f"Preventive CPT {code} expects age {lo}–{hi}, got age {years}.")
    return None
def rule_adult_preventive_age(row: Dict) -> Optional[RuleResult]:
    """Fire when an adult preventive CPT is billed outside its age band.

    The bands are inclusive whole-year ranges, while ``patient_age`` returns
    fractional years. Comparing the raw float left gaps between bands: a
    patient aged 39.5 failed the 18–39 band and did not fit 40–64 either.
    The age is therefore reduced to completed years before the comparison.

    Claims with an unknown age, or a CPT outside the table, are not checked.
    """
    code = cpt(row)
    # CPT -> (lowest, highest) completed years, both inclusive.
    age_limits = {"99385": (18, 39), "99386": (40, 64), "99387": (65, 200),
                  "99395": (18, 39), "99396": (40, 64), "99397": (65, 200)}
    if code not in age_limits:
        return None
    age = patient_age(row)
    if age is None:
        return None
    lo, hi = age_limits[code]
    # Floor division keeps negative ages (DOB after the reference date) below zero.
    years = int(age // 1)
    if years < lo or years > hi:
        return fail("rAdultPrevAge", f"Preventive CPT {code} expects age {lo}–{hi}, got age {years}.")
    return None
def rule_ed_visit_requires_pos23(row: Dict) -> Optional[RuleResult]:
    """Fire when an ED visit CPT (99281–99285) is billed with a non-blank POS other than 23."""
    code = cpt(row)
    if code not in {"99281", "99282", "99283", "99284", "99285"}:
        return None
    pos = str(get(row, "POS", "")).strip()
    if not pos or pos == "23":
        return None
    return fail("rEDVisitPOS", f"ED visit CPT {code} requires POS 23 (emergency room), got POS {pos}.")
def rule_office_em_inpatient_pos(row: Dict) -> Optional[RuleResult]:
    """Reject office E&M CPTs when the POS is 21, 31, 32, 51 or 61."""
    office_codes = ("99202", "99203", "99204", "99205", "99212", "99213", "99214", "99215")
    facility_pos = ("21", "31", "32", "51", "61")
    code = cpt(row)
    if code not in office_codes:
        return None
    place = str(get(row, "POS", "")).strip()
    if place not in facility_pos:
        return None
    return fail("rOfficeEMInpt", f"Office E&M CPT {code} not valid at inpatient POS {place}; use inpatient visit codes (99221–99233).")
def rule_bone_density_age(row: Dict) -> Optional[RuleResult]:
    """Flag bone density CPTs when the patient's known age is below 50."""
    code = cpt(row)
    if code in {"77080", "77081", "77085", "77086"}:
        years = patient_age(row)
        # Unknown age is not treated as a violation.
        if years is None or years >= 50:
            return None
        return fail("rBoneDensAge", f"Bone density CPT {code} typically not indicated before age 50, got age {years:.0f}.")
    return None
def rule_screening_mammo_requires_z1231(row: Dict) -> Optional[RuleResult]:
    """Require a DX starting with Z1231 (dots ignored) on CPT 77063 or 77067."""
    code = cpt(row)
    if code != "77063" and code != "77067":
        return None
    has_screening_dx = any(dx.replace(".", "").startswith("Z1231") for dx in dx_list(row))
    if has_screening_dx:
        return None
    return fail("rScreenMammoDx", f"Screening mammography CPT {code} requires DX Z12.31 (screening encounter).")
def rule_wellness_visit_requires_z00(row: Dict) -> Optional[RuleResult]:
    """Require at least one DX starting with Z00 on adult preventive CPTs."""
    wellness_codes = ("99385", "99386", "99387", "99395", "99396", "99397")
    code = cpt(row)
    if code not in wellness_codes:
        return None
    if any(dx.replace(".", "").startswith("Z00") for dx in dx_list(row)):
        return None
    return fail("rWellnessDx", f"Preventive wellness CPT {code} requires Z00.xx encounter DX.")
| # ---- Imaging medical-necessity ---- | |
def rule_mri_brain_requires_neuro_dx(row: Dict) -> Optional[RuleResult]:
    """Require a neuro/head diagnosis on MRI brain CPTs 70551–70555.

    A DX (dots removed) supports the study when it starts with one of the
    accepted prefixes below or is exactly Z0183.
    """
    code = cpt(row)
    if code not in {"70551", "70552", "70553", "70554", "70555"}:
        return None
    # G* neuro, I6* cerebrovascular, R4*/R51 head symptoms, S00/S06 head injury,
    # C70-C72 brain tumors, F0* dementia/cognitive.
    accepted_prefixes = ("G", "I6", "R4", "R51", "S00", "S06", "C70", "C71", "C72", "F0")
    for raw in dx_list(row):
        dx = raw.replace(".", "")
        if dx.startswith(accepted_prefixes) or dx == "Z0183":
            return None
    return fail("rMRIBrainNoDx", f"MRI brain CPT {code} requires neuro/head DX (G-code, I60–I69, R40–R51, S00/S06, C70–C72).")
def rule_ct_chest_requires_chest_dx(row: Dict) -> Optional[RuleResult]:
    """Require a chest-related diagnosis on the listed CT chest CPTs."""
    code = cpt(row)
    if code not in ("71250", "71260", "71270", "71271", "71275"):
        return None
    # J* pulmonary, I* cardiac, C33/C34 airway/lung cancer, R05-R09 chest
    # symptoms, S27 thorax injury, Z87 history.
    chest_prefixes = ("J", "I", "C34", "C33", "R05", "R06", "R07", "R09", "S27", "Z87")
    supported = any(dx.replace(".", "").startswith(chest_prefixes) for dx in dx_list(row))
    if supported:
        return None
    return fail("rCTChestNoDx", f"CT chest CPT {code} requires chest DX (J/I codes, C33–C34, R05–R09, S27).")
def rule_ct_abdomen_requires_abdominal_dx(row: Dict) -> Optional[RuleResult]:
    """Require an abdominal diagnosis on the listed CT abdomen CPTs."""
    code = cpt(row)
    if code not in ("74150", "74160", "74170", "74176", "74177", "74178"):
        return None
    # K* digestive, N* GU, R10/R11/R14/R19 abdominal symptoms, S3* injury,
    # C15-C26 and C67 malignancies.
    abdominal_prefixes = (
        "K", "N", "R10", "R11", "R14", "R19", "S3",
        "C15", "C16", "C17", "C18", "C19", "C20", "C21", "C22", "C23", "C24", "C25", "C26", "C67",
    )
    if any(dx.replace(".", "").startswith(abdominal_prefixes) for dx in dx_list(row)):
        return None
    return fail("rCTAbdNoDx", f"CT abdomen CPT {code} requires abdominal DX (K/N code, R10–R19, S30–S39, C15–C26).")
def rule_echo_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]:
    """Require a cardiac-related diagnosis on the listed echocardiogram CPTs."""
    echo_codes = (
        "93303", "93304", "93306", "93307", "93308", "93312", "93313",
        "93314", "93315", "93316", "93317", "93320", "93321",
    )
    code = cpt(row)
    if code not in echo_codes:
        return None
    cardiac_prefixes = ("I", "R07", "R000", "R001", "R06", "R55", "Q2", "Q3")
    normalized = (dx.replace(".", "") for dx in dx_list(row))
    if any(dx.startswith(cardiac_prefixes) for dx in normalized):
        return None
    return fail("rEchoNoCardiac", f"Echocardiogram CPT {code} requires cardiac DX (I-code, R00/R06/R07/R55, or Q2x/Q3x congenital).")
| # ---- Lab medical-necessity ---- | |
def rule_a1c_requires_diabetes_dx(row: Dict) -> Optional[RuleResult]:
    """Require a diabetes-related diagnosis on HbA1c CPTs 83036/83037."""
    code = cpt(row)
    if code != "83036" and code != "83037":
        return None
    # E08-E13 diabetes, R73 elevated glucose, Z13.1 diabetes screening,
    # O24 diabetes in pregnancy, Z83.3 family history of diabetes.
    diabetes_prefixes = ("E08", "E09", "E10", "E11", "E12", "E13", "R73", "Z131", "O24", "Z833")
    if any(dx.replace(".", "").startswith(diabetes_prefixes) for dx in dx_list(row)):
        return None
    return fail("rA1CNoDiab", f"HbA1c CPT {code} requires diabetes/prediabetes DX (E08–E13, R73, Z13.1, O24).")
def rule_lipid_panel_requires_cv_dx(row: Dict) -> Optional[RuleResult]:
    """Require a cardiovascular risk/disease/screening DX on lipid panel CPT 80061."""
    code = cpt(row)
    if code != "80061":
        return None
    # Note: the "Z13" prefix already covers "Z13220"; both are kept as listed.
    cv_prefixes = ("I", "E11", "E10", "E78", "F17", "Z13220", "Z13", "Z72", "Z83", "R7302", "R794")
    for raw in dx_list(row):
        if raw.replace(".", "").startswith(cv_prefixes):
            return None
    return fail("rLipidNoCV", f"Lipid panel CPT {code} requires CV-risk/disease DX (I-code, E78, E10/E11, Z13.220, Z83.42, F17).")
def rule_psa_male(row: Dict) -> Optional[RuleResult]:
    """Reject PSA CPTs when the Gender field is "female" (case-insensitive)."""
    code = cpt(row)
    if code in ("84152", "84153", "84154", "G0103"):
        sex = str(get(row, "Gender", "")).strip().lower()
        if sex == "female":
            return fail("rPSAFemale", f"PSA CPT {code} is not valid for Female gender.")
    return None
def rule_pap_smear_female(row: Dict) -> Optional[RuleResult]:
    """Reject Pap smear CPT/HCPCS codes when the Gender field is "male"."""
    pap_codes = frozenset((
        "88141", "88142", "88143", "88147", "88148", "88150", "88152", "88153", "88154", "88155",
        "88164", "88165", "88166", "88167", "88174", "88175",
        "Q0091", "G0123", "G0124", "G0141", "G0143", "G0144", "G0145", "G0147", "G0148",
    ))
    code = cpt(row)
    if code in pap_codes:
        sex = str(get(row, "Gender", "")).strip().lower()
        if sex == "male":
            return fail("rPapMale", f"Pap smear CPT {code} is not valid for Male gender.")
    return None
| # ---- DME medical-necessity ---- | |
def rule_cpap_requires_osa_dx(row: Dict) -> Optional[RuleResult]:
    """Require a DX starting with G473 (dots ignored) on the listed CPAP/BiPAP codes."""
    sleep_codes = ("E0601", "E0470", "E0471", "E0561", "E0562", "95810", "95811")
    code = cpt(row)
    if code not in sleep_codes:
        return None
    if any(dx.replace(".", "").startswith("G473") for dx in dx_list(row)):
        return None
    return fail("rCPAPNoOSA", f"CPAP/BiPAP CPT {code} requires sleep apnea DX (G47.3x).")
def rule_wheelchair_requires_mobility_dx(row: Dict) -> Optional[RuleResult]:
    """Require a mobility-impairment diagnosis on the listed wheelchair HCPCS codes."""
    wheelchair_codes = frozenset((
        "K0001", "K0002", "K0003", "K0004", "K0005", "K0006", "K0007", "K0009",
        "E1050", "E1060", "E1070", "E1083", "E1084", "E1085", "E1086", "E1087", "E1088", "E1089", "E1090",
        "E1100", "E1110", "E1130", "E1140", "E1150", "E1160", "E1161", "E1170", "E1180", "E1190",
        "E1200", "E1220", "E1230", "E1231", "E1232", "E1233", "E1234", "E1235", "E1236", "E1237", "E1238",
    ))
    code = cpt(row)
    if code not in wheelchair_codes:
        return None
    # M* musculoskeletal, S* injury, selected G* neuro codes, I69 stroke
    # sequelae, R26 gait, Z99.
    mobility_prefixes = ("M", "S", "G20", "G35", "G36", "G80", "G81", "G82", "G83", "I69", "R26", "Z99")
    if any(dx.replace(".", "").startswith(mobility_prefixes) for dx in dx_list(row)):
        return None
    return fail("rWheelchairNoDx", f"Wheelchair CPT {code} requires mobility-impairment DX (M/S-code, G20/G35/G80–G83, I69, R26).")
| # ---- Gender/anatomy specific ---- | |
def rule_fetal_us_female(row: Dict) -> Optional[RuleResult]:
    """Reject fetal ultrasound CPTs when the Gender field is "male"."""
    fetal_us_codes = (
        "76801", "76802", "76805", "76810", "76811", "76812", "76813", "76814",
        "76815", "76816", "76817", "76818", "76819", "76820", "76821",
    )
    code = cpt(row)
    if code in fetal_us_codes:
        sex = str(get(row, "Gender", "")).strip().lower()
        if sex == "male":
            return fail("rFetalUSMale", f"Fetal ultrasound CPT {code} is not valid for Male gender.")
    return None
def rule_circumcision_male(row: Dict) -> Optional[RuleResult]:
    """Reject circumcision CPTs when the Gender field is "female"."""
    code = cpt(row)
    if code in ("54150", "54160", "54161", "54162", "54163", "54164"):
        sex = str(get(row, "Gender", "")).strip().lower()
        if sex == "female":
            return fail("rCircFemale", f"Circumcision CPT {code} is not valid for Female gender.")
    return None
def rule_iud_female(row: Dict) -> Optional[RuleResult]:
    """Reject IUD procedure/supply codes when the Gender field is "male"."""
    code = cpt(row)
    if code in ("58300", "58301", "J7296", "J7297", "J7298", "J7300", "J7301"):
        sex = str(get(row, "Gender", "")).strip().lower()
        if sex == "male":
            return fail("rIUDMale", f"IUD CPT {code} is not valid for Male gender.")
    return None
def rule_colposcopy_female(row: Dict) -> Optional[RuleResult]:
    """Reject colposcopy CPTs when the Gender field is "male"."""
    code = cpt(row)
    if code in ("57420", "57421", "57452", "57454", "57455", "57456", "57460", "57461"):
        sex = str(get(row, "Gender", "")).strip().lower()
        if sex == "male":
            return fail("rColpoMale", f"Colposcopy CPT {code} is not valid for Male gender.")
    return None
| # ---- POS-specific E&M ---- | |
def rule_inpatient_em_requires_inpatient_pos(row: Dict) -> Optional[RuleResult]:
    """Require POS 21, 51 or 61 on inpatient E&M CPTs; a blank POS is not flagged."""
    inpatient_codes = ("99221", "99222", "99223", "99231", "99232", "99233", "99238", "99239")
    code = cpt(row)
    if code in inpatient_codes:
        place = str(get(row, "POS", "")).strip()
        if place not in ("", "21", "51", "61"):
            return fail("rInptEMPos", f"Inpatient E&M CPT {code} requires POS 21/51/61, got POS {place}.")
    return None
def rule_observation_em_requires_pos22(row: Dict) -> Optional[RuleResult]:
    """Require POS 22 on observation E&M CPTs; a blank POS is not flagged."""
    observation_codes = ("99218", "99219", "99220", "99224", "99225", "99226", "99234", "99235", "99236")
    code = cpt(row)
    if code in observation_codes:
        place = str(get(row, "POS", "")).strip()
        if place not in ("", "22"):
            return fail("rObsEMPos", f"Observation E&M CPT {code} requires POS 22, got POS {place}.")
    return None
def rule_nf_em_requires_pos_31_32(row: Dict) -> Optional[RuleResult]:
    """Require POS 31 or 32 on nursing facility E&M CPTs; a blank POS is not flagged."""
    nf_codes = ("99304", "99305", "99306", "99307", "99308", "99309", "99310", "99315", "99316")
    code = cpt(row)
    if code in nf_codes:
        place = str(get(row, "POS", "")).strip()
        if place not in ("", "31", "32"):
            return fail("rNFEMPos", f"Nursing facility E&M CPT {code} requires POS 31/32, got POS {place}.")
    return None
def rule_home_visit_requires_pos12(row: Dict) -> Optional[RuleResult]:
    """Require POS 12 on home visit CPTs; a blank POS is not flagged."""
    home_codes = ("99341", "99342", "99343", "99344", "99345", "99347", "99348", "99349", "99350")
    code = cpt(row)
    if code in home_codes:
        place = str(get(row, "POS", "")).strip()
        if place not in ("", "12"):
            return fail("rHomeVisitPos", f"Home visit CPT {code} requires POS 12, got POS {place}.")
    return None
| # ---- Age/gender-gated screenings ---- | |
def rule_aaa_screening_male_age(row: Dict) -> Optional[RuleResult]:
    """Gate AAA screening (76706, G0389) by gender and by age 65–75.

    The gender check runs first: a "female" Gender yields ``rAAAScreenFemale``.
    Otherwise the age is checked; an unknown age passes.

    ``patient_age`` reports fractional years, so the inclusive upper bound of
    75 completed years is tested as ``age >= 76``.  Using ``age > 75`` rejected
    every 75-year-old once their birthday had passed.
    """
    code = cpt(row)
    if code not in {"76706", "G0389"}:
        return None
    gender = str(get(row, "Gender", "")).strip().lower()
    if gender == "female":
        return fail("rAAAScreenFemale", f"AAA screening CPT {code} is Medicare-covered for male patients only.")
    age = patient_age(row)
    if age is None:
        return None
    if age < 65 or age >= 76:
        # Report completed years (floor); rounding would print 64.6 as "65".
        return fail("rAAAScreenAge", f"AAA screening CPT {code} covered only at age 65–75, got age {age // 1:.0f}.")
    return None
def rule_lung_ct_screening_age(row: Dict) -> Optional[RuleResult]:
    """Gate lung CT screening codes (71271, G0296, G0297) by age 50–77.

    An unknown age passes.  ``patient_age`` reports fractional years, so the
    inclusive upper bound of 77 completed years is tested as ``age >= 78``.
    Using ``age > 77`` rejected every 77-year-old once their birthday had
    passed.
    """
    code = cpt(row)
    if code not in {"71271", "G0296", "G0297"}:
        return None
    age = patient_age(row)
    if age is None:
        return None
    if age < 50 or age >= 78:
        # Report completed years (floor); rounding would print 49.6 as "50".
        return fail("rLungCTScreenAge", f"Lung CT screening CPT {code} covered only at age 50–77, got age {age // 1:.0f}.")
    return None
def rule_q4169_supervision(row: Dict) -> Optional[RuleResult]:
    """Always flag CPT Q4169 for supervision review."""
    if cpt(row) != "Q4169":
        return None
    return fail("rCPTCodeQ4169", "Supervision review mandatory for CPT Q4169.")
def rule_medicare_forbidden_cpt(row: Dict) -> Optional[RuleResult]:
    """Reject the listed CPTs when the primary insurance name contains "medicare"."""
    code = cpt(row)
    if code not in ("90471", "99417", "H0004", "1152F", "3281F"):
        return None
    if not is_medicare(row):
        return None
    return fail("rNoMedicareCPT", f"Medicare does not cover CPT {code}.")
def rule_g0444_unbundle(row: Dict) -> Optional[RuleResult]:
    """Flag every G0444 line; the check does not look at other lines of the claim."""
    if cpt(row) != "G0444":
        return None
    return fail("rcptG0438&G0444", "G0444 has an unbundle relationship with G0438.")
def rule_0000_nonbillable(row: Dict) -> Optional[RuleResult]:
    """Reject the placeholder code 0000 used as the CPT or as any diagnosis."""
    placeholder = "0000"
    if cpt(row) == placeholder or placeholder in dx_list(row):
        return fail("rCPTDX0000", "CPT or DX 0000 is non-billable.")
    return None
def rule_g2211_modifier25(row: Dict) -> Optional[RuleResult]:
    """Reject G2211 when ``has_modifier(row, "25")`` is true."""
    if cpt(row) != "G2211":
        return None
    if not has_modifier(row, "25"):
        return None
    return fail("rG2211cpt25mf", "G2211 is not payable when reported with modifier 25.")
def rule_g2011_modifier(row: Dict) -> Optional[RuleResult]:
    """Reject G2011 when any of the four modifier slots is non-empty."""
    if cpt(row) != "G2011":
        return None
    # get_modifiers yields stripped strings, so truthiness means "present".
    if any(get_modifiers(row)):
        return fail("rCPTCodeG2011", "Modifier is inappropriate with CPT G2011.")
    return None
def rule_smoking_cessation_requires_dx(row: Dict) -> Optional[RuleResult]:
    """Require at least one diagnosis code on CPT 99406/99407."""
    code = cpt(row)
    if code not in ("99406", "99407"):
        return None
    if dx_list(row):
        return None
    return fail("rSCCDVdxcode", f"CPT {code} requires at least one valid diagnosis code.")
def rule_q4221_units(row: Dict) -> Optional[RuleResult]:
    """Reject Q4221 billed with more than 52 service units (units default to 1)."""
    max_units = 52
    billed = to_float(get(row, "ServiceUnits", 1), 1)
    if cpt(row) != "Q4221":
        return None
    if billed > max_units:
        return fail("rcptQ4221", f"Max allowable units for Q4221 is 52. Current: {billed:.0f}")
    return None
def rule_q4250_units(row: Dict) -> Optional[RuleResult]:
    """Reject Q4250 billed with more than 24 service units (units default to 1)."""
    max_units = 24
    billed = to_float(get(row, "ServiceUnits", 1), 1)
    if cpt(row) != "Q4250":
        return None
    if billed > max_units:
        return fail("rcptQ4250", f"Max allowable units for Q4250 is 24. Current: {billed:.0f}")
    return None
def rule_95004_min_units(row: Dict) -> Optional[RuleResult]:
    """Reject CPT 95004 billed with fewer than 70 service units (units default to 1)."""
    min_units = 70
    billed = to_float(get(row, "ServiceUnits", 1), 1)
    if cpt(row) != "95004":
        return None
    if billed < min_units:
        return fail("r95004Unit<70", f"Less than 70 units not allowed for CPT 95004. Current: {billed:.0f}")
    return None
def rule_95165_min_units(row: Dict) -> Optional[RuleResult]:
    """Reject CPT 95165 billed with fewer than 10 service units (units default to 1)."""
    min_units = 10
    billed = to_float(get(row, "ServiceUnits", 1), 1)
    if cpt(row) != "95165":
        return None
    if billed < min_units:
        return fail("r95165Unit<10", f"Less than 10 units not allowed for CPT 95165. Current: {billed:.0f}")
    return None
def rule_zero_units(row: Dict) -> Optional[RuleResult]:
    """Reject any line whose ServiceUnits value converts to exactly 0."""
    billed = to_float(get(row, "ServiceUnits", 1), 1)
    if billed == 0:
        return fail("r0Units", "0 units is not allowed.")
    return None
def rule_33340_primary_dx(row: Dict) -> Optional[RuleResult]:
    """Require the first listed DX on CPT 33340 to be I48.0, I48.11, I48.19 or I48.21.

    A claim with no diagnosis codes fails as well, because the empty primary
    DX is not in the accepted set.
    """
    if cpt(row) != "33340":
        return None
    accepted = ("I480", "I4811", "I4819", "I4821")
    first_dx = next(iter(dx_list(row)), "").replace(".", "")
    if first_dx in accepted:
        return None
    return fail("r33340dx1", "For 33340, primary DX should be I48.0, I48.11, I48.19, or I48.21.")
def rule_33340_pos(row: Dict) -> Optional[RuleResult]:
    """Require POS 21 on CPT 33340; a blank POS is not flagged."""
    if cpt(row) != "33340":
        return None
    place = str(get(row, "POS", "")).strip()
    if place == "" or place == "21":
        return None
    # The message reports the POS value as stored, not the stripped copy.
    return fail("r33340POS21", f"For 33340, POS should be 21. Current POS: {get(row, 'POS', '')}")
def rule_rpm_checkbox(row: Dict) -> Optional[RuleResult]:
    """Require the ISRPM flag ("1", "true" or "yes") on RPM CPTs 99453/99454/99457/99458."""
    code = cpt(row)
    if code not in ("99453", "99454", "99457", "99458"):
        return None
    flag = str(get(row, "ISRPM", "0")).strip().lower()
    if flag in {"1", "true", "yes"}:
        return None
    return fail("rRPMchk", f"RPM box must be checked for RPM CPT {code}.")
def rule_q_series_note(row: Dict) -> Optional[RuleResult]:
    """Require non-blank NoteText when the CPT starts with "Q"."""
    if not cpt(row).startswith("Q"):
        return None
    note = str(get(row, "NoteText", "")).strip()
    if note:
        return None
    return fail("rQCPTCode", "Claim note text is mandatory for Q-series CPT code.")
def rule_colon_in_note(row: Dict) -> Optional[RuleResult]:
    """Reject any claim whose NoteText contains a colon."""
    note = str(get(row, "NoteText", "")).strip()
    if note.find(":") == -1:
        return None
    return fail("rColonNotAllowed", "Colon character is not allowed in NoteText.")
def rule_wellmed_g0508(row: Dict) -> Optional[RuleResult]:
    """Reject G0508 when the primary insurance name contains "wellmed"."""
    if cpt(row) != "G0508":
        return None
    payer = str(get(row, "ClaimPriIns", "")).lower()
    if "wellmed" not in payer:
        return None
    return fail("rWellmedInsObjection", "Wellmed insurance does not accept CPT G0508.")
def rule_prolong_service(row: Dict) -> Optional[RuleResult]:
    """Flag every prolonged-service code; the check does not look for a paired E&M."""
    prolonged_codes = ("99417", "G2212", "G0318", "G0317", "G0316", "99418")
    code = cpt(row)
    if code not in prolonged_codes:
        return None
    return fail("rProlongService", f"Prolong service code {code} requires a high level E&M pairing.")
def rule_99490_not_bill(row: Dict) -> Optional[RuleResult]:
    """Always reject CCM service CPT 99490."""
    if cpt(row) != "99490":
        return None
    return fail("rcpt99490", "Per provider instruction, do not bill CCM service 99490.")
def rule_99024_nonreplacement(row: Dict) -> Optional[RuleResult]:
    """Always flag CPT 99024."""
    if cpt(row) != "99024":
        return None
    return fail("r99024", "99024 is non-replacement CPT without provider instructions.")
def rule_99498_add_on(row: Dict) -> Optional[RuleResult]:
    """Flag every 99498 line; the check does not look for a companion 99497."""
    if cpt(row) != "99498":
        return None
    return fail("rcpt99498", "99498 is an add-on code and must be billed with 99497.")
def rule_vaccine_admin_required(row: Dict) -> Optional[RuleResult]:
    """Flag vaccine product CPTs 91300–91303, naming their administration codes.

    The rule id is built from the CPT (``rAdmin<code>``).  The check fires on
    every matching line; it does not look for the administration code.
    """
    admin_codes_by_vaccine = {
        "91300": ["0001A", "0002A"],
        "91301": ["0011A", "0012A"],
        "91302": ["0021A", "0022A"],
        "91303": ["0031A"],
    }
    code = cpt(row)
    admin_codes = admin_codes_by_vaccine.get(code)
    if admin_codes is None:
        return None
    joined = "/".join(admin_codes)
    return fail(f"rAdmin{code}", f"Administration code {joined} is mandatory for CPT {code}.")
def rule_90653_admin_required(row: Dict) -> Optional[RuleResult]:
    """Flag every 90653 line; the check does not look for the administration code."""
    if cpt(row) != "90653":
        return None
    return fail("rAdmcode90653", "Administration code G0008 or 90471 is mandatory for CPT 90653.")
def rule_self_pay_with_active_insurance(row: Dict) -> Optional[RuleResult]:
    """Reject a claim billed as "self-pay" while a primary insurance is present."""
    bill_as = str(get(row, "BillAs", "")).lower()
    if bill_as != "self-pay":
        return None
    if not get(row, "ClaimPriIns"):
        return None
    return fail("rInsAct", f"Patient has active insurance {get(row, 'ClaimPriIns')}. Do not bill as self-pay.")
def rule_g9719_required_dx(row: Dict) -> Optional[RuleResult]:
    """Require F31.9, F32.9 or F33.9 among the diagnoses when the CPT is G9719."""
    if cpt(row) != "G9719":
        return None
    accepted = ("F319", "F329", "F339")
    for raw in dx_list(row):
        if raw.replace(".", "") in accepted:
            return None
    return fail("rCPTG9719dx", "Any one of F31.9, F32.9, or F33.9 is mandatory with CPT G9719.")
def rule_mips_99221_99223(row: Dict) -> Optional[RuleResult]:
    """Flag CPT 99221–99223 for patients whose known age is 65 or more."""
    years = patient_age(row)
    if cpt(row) not in ("99221", "99222", "99223"):
        return None
    if years is not None and years >= 65:
        return fail("rMIPS1", "MIPS code from QID#47 and G8427 is mandatory for CPT 99221-99223 when patient is 65+.")
    return None
def rule_mips_99236(row: Dict) -> Optional[RuleResult]:
    """Always flag CPT 99236 with the MIPS QID#128 reminder."""
    if cpt(row) != "99236":
        return None
    return fail("rMIPS2", "MIPS code from QID#128 is mandatory for CPT 99236.")
def rule_mips_99304_99306(row: Dict) -> Optional[RuleResult]:
    """Flag CPT 99304–99306 for patients whose known age is 65 or more."""
    years = patient_age(row)
    if cpt(row) not in ("99304", "99305", "99306"):
        return None
    if years is not None and years >= 65:
        return fail("rMIPS3", "MIPS codes from QID#47 and QID#181 are mandatory for CPT 99304-99306 when patient is 65+.")
    return None
def rule_mips_99307_99310(row: Dict) -> Optional[RuleResult]:
    """Flag CPT 99307–99310 for patients whose known age is 65 or more."""
    years = patient_age(row)
    code = cpt(row)
    if code not in ("99307", "99308", "99309", "99310"):
        return None
    if years is not None and years >= 65:
        return fail("rMIPS4", f"G8427 is mandatory for CPT {code} when patient is 65+.")
    return None
def rule_comb1(row: Dict) -> Optional[RuleResult]:
    """Flag any line carrying G8733, G8734 or G8535 (row-level check only)."""
    if cpt(row) not in ("G8733", "G8734", "G8535"):
        return None
    return fail("rCOMB1", "CPT G8733, G8734, and G8535 are not acceptable together.")
def rule_comb3(row: Dict) -> Optional[RuleResult]:
    """Flag any line carrying G8433, G8431 or G8510 (row-level check only)."""
    if cpt(row) not in ("G8433", "G8431", "G8510"):
        return None
    return fail("rCOMB3", "CPT G8433, G8431, and G8510 are not acceptable together.")
def rule_comb4(row: Dict) -> Optional[RuleResult]:
    """Flag any line carrying G8420, G8417, G8418 or G9716 (row-level check only)."""
    if cpt(row) not in ("G8420", "G8417", "G8418", "G9716"):
        return None
    return fail("rCOMB4", "CPT G8420, G8417, G8418, and G9716 are not acceptable together.")
def rule_comb2(row: Dict) -> Optional[RuleResult]:
    """Flag any line carrying 1123F or 1124F (row-level check only)."""
    if cpt(row) not in ("1123F", "1124F"):
        return None
    return fail("rCOMB2", "CPT 1123F and 1124F are not acceptable together.")
def rule_medicare_uhc_use_g0447(row: Dict) -> Optional[RuleResult]:
    """Reject 99401–99403 when the payer name contains medicare, uhc, united or wellmed."""
    code = cpt(row)
    if code not in ("99401", "99402", "99403"):
        return None
    payer = str(get(row, "ClaimPriIns", "")).lower()
    for keyword in ("medicare", "uhc", "united", "wellmed"):
        if keyword in payer:
            return fail("rcptG0447", f"For Medicare or UHC, use G0447 instead of {code}.")
    return None
def rule_99441_99443_pos11(row: Dict) -> Optional[RuleResult]:
    """Require POS 11 on CPT 99441–99443; a blank POS is not flagged."""
    place = str(get(row, "POS", "")).strip()
    if cpt(row) not in ("99441", "99442", "99443"):
        return None
    if place == "" or place == "11":
        return None
    return fail("rPOS11SpecCPT", "POS 11 is only acceptable for CPT codes 99441-99443.")
def rule_99354_99359_not_pos11(row: Dict) -> Optional[RuleResult]:
    """Reject CPT 99354, 99355, 99358 and 99359 when billed at POS 11."""
    place = str(get(row, "POS", "")).strip()
    code = cpt(row)
    if code not in ("99354", "99355", "99358", "99359"):
        return None
    if place != "11":
        return None
    return fail("rCptPos11", f"CPT {code} is not billable for office services POS 11.")
def rule_0034a_date(row: Dict) -> Optional[RuleResult]:
    """Reject CPT 0034A when the DOS is earlier than 2021-10-20; a missing DOS passes."""
    effective_from = date(2021, 10, 20)
    service_date = dos_date(row)
    if cpt(row) != "0034A":
        return None
    if service_date and service_date < effective_from:
        return fail("rCPT0034A", "CPT 0034A is effective from 2021-10-20.")
    return None
def rule_0071a_0072a_date(row: Dict) -> Optional[RuleResult]:
    """Reject CPT 0071A/0072A when the DOS is earlier than 2021-10-29; a missing DOS passes."""
    effective_from = date(2021, 10, 29)
    service_date = dos_date(row)
    code = cpt(row)
    if code not in ("0071A", "0072A"):
        return None
    if service_date and service_date < effective_from:
        return fail("rCPT0071A", f"CPT {code} is effective from 2021-10-29.")
    return None
def rule_g0438_use_99203(row: Dict) -> Optional[RuleResult]:
    """Always reject G0438 with an instruction to bill 99203 instead."""
    if cpt(row) != "G0438":
        return None
    return fail("rCPT99203", "Please use bill code 99203 instead of G0438.")
def rule_99470_99457_99458(row: Dict) -> Optional[RuleResult]:
    """Flag any line carrying 99470, 99457 or 99458 (row-level check only)."""
    if cpt(row) not in ("99470", "99457", "99458"):
        return None
    return fail("rCPTCode99470&99457/99458", "99470 and 99457/99458 are not billable on the same DOS.")
def rule_99454_99445(row: Dict) -> Optional[RuleResult]:
    """Flag any line carrying 99454 or 99445 (row-level check only)."""
    if cpt(row) not in ("99454", "99445"):
        return None
    return fail("rCPTCode99454&99445", "CPT 99454 and 99445 must not be billed for same patient on same DOS.")
def rule_office_cpt_pos(row: Dict) -> Optional[RuleResult]:
    """Require POS 11 or 02 on office visit CPTs; a blank POS is not flagged."""
    allowed_pos = ("", "11", "02")
    place = str(get(row, "POS", "")).strip()
    code = cpt(row)
    if code not in ("99201", "99202", "99203", "99204", "99205", "99211", "99212", "99213", "99214", "99215"):
        return None
    if place in allowed_pos:
        return None
    return fail("rCPTCode=2", f"CPT {code} is not valid for POS {place}. Requires POS 11 or 02.")
def rule_inpatient_cpt_pos(row: Dict) -> Optional[RuleResult]:
    """Require POS 21 or 61 on inpatient visit CPTs; a blank POS is not flagged."""
    allowed_pos = ("", "21", "61")
    place = str(get(row, "POS", "")).strip()
    code = cpt(row)
    if code not in ("99221", "99222", "99223", "99231", "99232", "99233", "99238", "99239"):
        return None
    if place in allowed_pos:
        return None
    return fail("rCPTCode=3", f"CPT {code} is not valid for POS {place}. Requires POS 21 or 61.")
def rule_snf_cpt_pos(row: Dict) -> Optional[RuleResult]:
    """Require POS 31, 32, 02 or 13 on SNF visit CPTs; a blank POS is not flagged."""
    allowed_pos = ("", "31", "32", "02", "13")
    place = str(get(row, "POS", "")).strip()
    code = cpt(row)
    if code not in ("99304", "99305", "99306", "99307", "99308", "99309", "99310", "99315", "99316"):
        return None
    if place in allowed_pos:
        return None
    return fail("rCPTCode=4", f"CPT {code} is not valid for POS {place}. Requires POS 31, 32, 02, or 13.")
def rule_observation_cpt_pos(row: Dict) -> Optional[RuleResult]:
    """Require POS 22 on CPT 99217–99220; a blank POS is not flagged."""
    place = str(get(row, "POS", "")).strip()
    code = cpt(row)
    if code not in ("99217", "99218", "99219", "99220"):
        return None
    if place == "" or place == "22":
        return None
    return fail("rCPTCode=5", f"CPT {code} is not valid for POS {place}. Requires POS 22.")
def rule_er_cpt_pos(row: Dict) -> Optional[RuleResult]:
    """Require POS 23 on CPT 99281–99288; a blank POS is not flagged."""
    place = str(get(row, "POS", "")).strip()
    code = cpt(row)
    if code not in ("99281", "99282", "99283", "99284", "99285", "99286", "99287", "99288"):
        return None
    if place == "" or place == "23":
        return None
    return fail("rCPTCode=6", f"CPT {code} is not valid for POS {place}. Requires POS 23.")
def rule_home_visit_cpt_pos(row: Dict) -> Optional[RuleResult]:
    """Require POS 02, 10, 12, 13 or 14 on home visit CPTs; a blank POS is not flagged."""
    allowed_pos = ("", "02", "10", "12", "13", "14")
    place = str(get(row, "POS", "")).strip()
    code = cpt(row)
    if code not in ("99341", "99342", "99343", "99344", "99345", "99347", "99348", "99349", "99350"):
        return None
    if place in allowed_pos:
        return None
    return fail("rCPTCode=7", f"CPT {code} is not valid for POS {place}. Requires POS 02, 10, 12, 13, or 14.")
def rule_admission_date_empty_for_office(row: Dict) -> Optional[RuleResult]:
    """Reject an office claim (POS 11) that carries an admission date."""
    place = str(get(row, "POS", "")).strip()
    if place != "11":
        return None
    if not adm_date(row):
        return None
    return fail("rAdmissionDate=1", "Admission date must be empty for office visit POS 11.")
def rule_admission_date_required_inpatient(row: Dict) -> Optional[RuleResult]:
    """Reject an inpatient claim (POS 21) that has no admission date."""
    place = str(get(row, "POS", "")).strip()
    if place != "21":
        return None
    if adm_date(row):
        return None
    return fail("rAdmissionDate=2", "Admission date must be entered for inpatient POS 21.")
def rule_discharge_not_before_admission(row: Dict) -> Optional[RuleResult]:
    """Reject a discharge date earlier than the admission date when both are present."""
    admitted = adm_date(row)
    discharged = dis_date(row)
    if not (admitted and discharged):
        return None
    if discharged < admitted:
        return fail("rAdmissionDate3", f"Discharge date {discharged} should not be earlier than admission date {admitted}.")
    return None
def rule_future_admission_date(row: Dict) -> Optional[RuleResult]:
    """Reject an admission date later than today's date."""
    admitted = adm_date(row)
    if not admitted:
        return None
    if admitted > date.today():
        return fail("rAdmComp", "Future admission date is not allowed.")
    return None
def rule_future_dos(row: Dict) -> Optional[RuleResult]:
    """Reject a date of service later than today's date."""
    service_date = dos_date(row)
    if not service_date:
        return None
    if service_date > date.today():
        return fail("rnotsubclm", f"Claim must not be submitted with future DOS {service_date}.")
    return None
def rule_january_2020_dos(row: Dict) -> Optional[RuleResult]:
    """Reject any date of service that falls in January 2020."""
    service_date = dos_date(row)
    if not service_date:
        return None
    if (service_date.year, service_date.month) == (2020, 1):
        return fail("rDOSJan20", "Claim is not valid for DOS in January 2020.")
    return None
def rule_dos_required(row: Dict) -> Optional[RuleResult]:
    """Reject a claim for which ``dos_date`` yields no date of service."""
    if dos_date(row):
        return None
    return fail("rDOSMissed", "DOS is mandatory field.")
def rule_dummy_dos(row: Dict) -> Optional[RuleResult]:
    """Reject a DOS whose text form contains the substring "1900"."""
    dos_text = str(get(row, "DOS", ""))
    if "1900" not in dos_text:
        return None
    return fail("rDummyDOS", "Dummy DOS entered in claim.")
def rule_charges_positive(row: Dict) -> Optional[RuleResult]:
    """Reject a claim whose TotalCharges converts to zero or less (missing counts as 0)."""
    total = to_float(get(row, "TotalCharges", 0), 0)
    if total <= 0:
        return fail("rCharges", "CPT charges must be greater than 0.")
    return None
def rule_charges_max(row: Dict) -> Optional[RuleResult]:
    """Reject a claim whose TotalCharges exceeds 99,999."""
    ceiling = 99999
    total = to_float(get(row, "TotalCharges", 0), 0)
    if total > ceiling:
        return fail("r99Kcharges", f"Charges cannot exceed 99,999. Current: {total:,.2f}")
    return None
def rule_patient_payment_not_exceed_charges(row: Dict) -> Optional[RuleResult]:
    """Reject a claim whose positive patient payment is larger than its total charges."""
    total = to_float(get(row, "TotalCharges", 0), 0)
    paid = to_float(get(row, "PatientPayment", 0), 0)
    if paid > 0 and total < paid:
        return fail("rChgPatPmt", f"Claim charges {total:,.2f} cannot be less than patient payment {paid:,.2f}.")
    return None
def rule_referring_required_for_medicare(row: Dict) -> Optional[RuleResult]:
    """Require a non-blank ReferringPhysician when the payer name contains "medicare"."""
    if not is_medicare(row):
        return None
    referring = str(get(row, "ReferringPhysician", "")).strip()
    if referring:
        return None
    return fail("rRefferingPhysician", "Referring physician is mandatory for Medicare claims.")
def rule_referral_requires_referring(row: Dict) -> Optional[RuleResult]:
    """Require a referring physician when a referral number other than "0" is present."""
    referral = str(get(row, "ReferralNumber", "")).strip()
    if referral in ("", "0"):
        return None
    referring = str(get(row, "ReferringPhysician", "")).strip()
    if referring:
        return None
    return fail("rReferralNo", "When referral number is sent then referring provider is mandatory.")
def rule_attending_required(row: Dict) -> Optional[RuleResult]:
    """Require a non-blank AttendingPhysician."""
    attending = str(get(row, "AttendingPhysician", "")).strip()
    if attending:
        return None
    return fail("rAttendingPhysician", "Attending Physician is mandatory field.")
def rule_billing_required(row: Dict) -> Optional[RuleResult]:
    """Require a non-blank BillingPhysician."""
    billing = str(get(row, "BillingPhysician", "")).strip()
    if billing:
        return None
    return fail("rBillingPhysician", "Billing Physician is mandatory field.")
def rule_medicare_mbi_format(row: Dict) -> Optional[RuleResult]:
    """Validate the character layout of an 11-character Medicare MBI.

    Only runs when the payer name contains "medicare" and the PolicyNumber,
    after removing dashes and spaces, is exactly 11 characters long; any other
    length is left unflagged, as before.

    Layout enforced (0-based positions):
        digits   at 0, 3, 6, 9, 10
        letters  at 1, 4, 7, 8
        letter or digit at 2, 5

    Compared with the previous implementation this also checks positions 2
    and 5 (which were never inspected, so punctuation there passed) and
    restricts every character to ASCII, since ``str.isalpha``/``str.isdigit``
    accept non-ASCII letters and digits.  The letters the MBI format excludes
    (S, L, O, I, B, Z) are not enforced here.
    """
    if not is_medicare(row):
        return None
    mbi = str(get(row, "PolicyNumber", "")).strip().replace("-", "").replace(" ", "")
    if len(mbi) != 11:
        return None
    digit_pos = (0, 3, 6, 9, 10)
    letter_pos = (1, 4, 7, 8)
    alnum_pos = (2, 5)
    well_formed = (
        mbi.isascii()
        and all(mbi[i].isdigit() for i in digit_pos)
        and all(mbi[i].isalpha() for i in letter_pos)
        and all(mbi[i].isalnum() for i in alnum_pos)
    )
    if not well_formed:
        return fail("rPolicyNumber", f"Patient MBI format is invalid: {mbi}")
    return None
def rule_dummy_dob(row: Dict) -> Optional[RuleResult]:
    """Reject a DateOfBirth whose text form contains the substring "1900"."""
    dob_text = str(get(row, "DateOfBirth", ""))
    if "1900" not in dob_text:
        return None
    return fail("rDummyDOB", "Dummy DOB entered in patient.")
def rule_dob_required(row: Dict) -> Optional[RuleResult]:
    """Reject a claim whose DateOfBirth is missing or empty."""
    if get(row, "DateOfBirth"):
        return None
    return fail("rDobMis", "DOB is Mandatory field.")
def rule_gender_required(row: Dict) -> Optional[RuleResult]:
    """Reject a Gender that is blank, "unknown" or "none" (case-insensitive)."""
    sex = str(get(row, "Gender", "")).strip().lower()
    if sex in ("", "unknown", "none"):
        return fail("rGender", "Subscriber gender code is mandatory.")
    return None
def rule_insurance_required(row: Dict) -> Optional[RuleResult]:
    """Reject a claim whose ClaimPriIns is missing or empty."""
    if get(row, "ClaimPriIns"):
        return None
    return fail("rInsRequired", "Insurance Required. Please select valid insurance.")
def rule_molina_9_digit_id(row: Dict) -> Optional[RuleResult]:
    """Flag a 9-digit policy number on a Medicaid payer that is not Molina."""
    member_id = str(get(row, "PolicyNumber", "")).strip().replace("-", "")
    if len(member_id) != 9 or not member_id.isdigit():
        return None
    payer = str(get(row, "ClaimPriIns", "")).lower()
    if "medicaid" not in payer or "molina" in payer:
        return None
    return fail("rIns276PolicyNumberequal9", "Please select Molina Medicaid because member ID has 9 digits.")
def rule_aarp_not_primary(row: Dict) -> Optional[RuleResult]:
    """Reject billing as "primary" when the payer name contains aarp, trycare or tricare for life."""
    payer = str(get(row, "ClaimPriIns", "")).lower()
    if str(get(row, "BillAs", "")).lower() != "primary":
        return None
    for keyword in ("aarp", "trycare", "tricare for life"):
        if keyword in payer:
            return fail("rAARPTryCare", f"Insurance {get(row, 'ClaimPriIns')} can never be primary payer.")
    return None
def rule_workers_comp_accident_date(row: Dict) -> Optional[RuleResult]:
    """Require an AccidentDate when the payer name looks like workers' compensation."""
    payer = str(get(row, "ClaimPriIns", "")).lower()
    # "worker" already covers "workers comp"; all three keywords are kept as listed.
    wc_keywords = ("worker", "workers comp", "wc ")
    is_workers_comp = any(keyword in payer for keyword in wc_keywords)
    if not is_workers_comp:
        return None
    if get(row, "AccidentDate"):
        return None
    return fail("WCOMClaims", "Accident date is necessary for worker compensation claims.")
def rule_11042_requires_pa(row: Dict) -> Optional[RuleResult]:
    """Require a PANumber on CPT 11042 when the claim is billed as "primary"."""
    if cpt(row) != "11042":
        return None
    if str(get(row, "BillAs", "")).lower() != "primary":
        return None
    if get(row, "PANumber"):
        return None
    return fail("rPACPT11042", "Pre-authorization is missing for wound care 11042.")
| # ============================================================ | |
| # ICD-10 → CPT Mapping Rules | |
| # Common diagnosis codes with their medically appropriate CPT | |
| # procedure codes for denial prevention. | |
| # ============================================================ | |
| # ICD-10 primary diagnosis → acceptable CPT codes | |
# Lookup table read by rule_dx_cpt_mismatch: key is the claim's primary
# ICD-10 diagnosis, value is the set of CPT/HCPCS codes accepted for it.
ICD_CPT_MAP: Dict[str, set] = {
    # --- Diabetes ---
    "E11.9": {"99213","99214","99215","99490","99491","83036","82947","81003","G0447"},
    "E11.65": {"99213","99214","99215","83036","82947","80048","99490"},
    "E11.40": {"99213","99214","99215","95905","95907","95908","95913","64490","64491"},
    "E11.311": {"92004","92012","92014","92250","92228"},
    "E10.9": {"99213","99214","99215","83036","82947","95249","99490"},
    # --- Hypertension ---
    "I10": {"99213","99214","99215","93000","80048","80053","81003","99490","93786","93788"},
    "I11.9": {"99213","99214","99215","93000","93306","93320","93325","80053"},
    "I12.9": {"99213","99214","99215","80053","82565","81001","82042"},
    # --- Heart Disease ---
    "I25.10": {"99213","99214","99215","93000","93306","93307","93350","93351","78452","93015"},
    "I20.9": {"99213","99214","99215","93000","93015","93306","93350","78452"},
    "I50.9": {"99213","99214","99215","93306","93307","93320","71046","80053","83880"},
    "I48.91": {"99213","99214","99215","93000","93306","93268","93241","93243","93650"},
    # --- Respiratory ---
    "J06.9": {"99212","99213","99214","87880","87804"},
    "J18.9": {"99213","99214","99215","71045","71046","87046","87081","71250"},
    "J44.1": {"99213","99214","99215","94010","94060","71046","94640","94726"},
    "J44.0": {"99213","99214","99215","94010","71046","87046"},
    "J45.901": {"99213","99214","99215","94010","94060","94640","94664","71046"},
    # --- Musculoskeletal ---
    "M54.5": {"99213","99214","72100","72110","72148","97110","97530","64483","64484","63030","63047"},
    "M54.2": {"99213","99214","72040","72050","72141","72142","97110","97530","64490","64491"},
    "M17.11": {"99213","99214","73560","73562","73721","20610","27447","97110"},
    "M16.11": {"99213","99214","73502","73521","73721","27130","20610"},
    "M25.511": {"99213","99214","73010","73030","73221","20610","29827","97110"},
    # --- Mental Health ---
    "F32.1": {"99213","99214","90832","90834","90837","96127","G2212"},
    "F41.1": {"99213","99214","90832","90834","90837","96127"},
    "F31.9": {"99213","99214","99215","90837","90847","80305","80306"},
    "F20.9": {"99213","99214","99215","90837","90853","80305"},
    "F10.20": {"99213","99214","90832","90837","G2067","G2068"},
    "F11.20": {"99213","99214","99215","90837","G2067","G2068","G2069"},
    # --- Gastrointestinal ---
    "K21.0": {"99213","99214","43239","43235","91034"},
    "K57.30": {"99213","99214","45378","45380","74177"},
    "K92.1": {"99213","99214","99215","43235","43239","45378","45380","74177"},
    "K72.10": {"99214","99215","80076","80053","76700","74177"},
    "K74.60": {"99214","99215","80076","80053","76700","43244"},
    # --- Cancer ---
    "C34.10": {"99213","99214","99215","71250","78816","32663","96413","96415","77263","77301"},
    "C50.911": {"99213","99214","99215","77067","76641","78816","19307","96413","96415","77263"},
    "C18.9": {"99214","99215","45378","45384","74177","78816","96413","96415"},
    "C61": {"99213","99214","99215","84153","76872","55866","96413","77263"},
    "C64.1": {"99214","99215","74177","78816","50543","96413"},
    # --- Infectious Disease ---
    "A41.9": {"99221","99222","99223","99231","99232","99233","87046","87081","36415","80053","85025"},
    "B20": {"99213","99214","99215","86703","87536","86360","80305"},
    "A09": {"99212","99213","99214","87046","87081","82270"},
    # --- Neurological ---
    "G43.909": {"99213","99214","70553","95923","64615"},
    "G35": {"99213","99214","99215","70553","95925","95926","96413"},
    "G20": {"99213","99214","99215","70553","97110","97530","95923"},
    "G47.33": {"99213","99214","95810","95811","94660","E0601"},
    "G89.29": {"99213","99214","64483","64484","64490","64491","62322","62323","97110"},
    # --- Genitourinary ---
    "N39.0": {"99212","99213","81001","81003","87086"},
    "N18.3": {"99213","99214","80053","82565","81001","82042"},
    "N18.6": {"99213","99214","99215","90935","90937","90945","90947","80053"},
    "N40.1": {"99213","99214","52601","52441","76872","84153"},
    # --- Endocrine ---
    "E03.9": {"99213","99214","84439","84480","84481","76536"},
    "E05.90": {"99213","99214","84439","84443","76536","78012"},
    "E66.01": {"99213","99214","99215","G0447","43644","43770","97802","97803"},
    "E78.5": {"99213","99214","80061","82465","83718","84478"},
    # --- Preventive / Wellness ---
    "Z00.00": {"99395","99396","99397","G0438","G0439","85025","80053","83036","80061","93000"},
    # Z12.11 is the colon-cancer screening encounter and Z12.31 the screening
    # mammogram encounter; the two code sets were previously swapped.
    "Z12.11": {"45378","45380","G0121","82274"},
    "Z12.31": {"77066","77067","G0202"},
    "Z23": {"90471","90472","90686","90714","90732"},
    # --- OB/GYN ---
    "Z34.00": {"99213","99214","76801","80055","86703","85025"},
    "N87.1": {"99213","99214","57460","57455","88141"},
    # --- Injury ---
    "S72.001A": {"99221","99222","99223","27235","27236","73502"},
    "S52.501A": {"99213","99214","25600","25605","73100","29125"},
    "S13.4XXA": {"99213","99214","72050","97110","97530"},
}

# CPT codes that require at least one matching ICD-10 on the claim
# (read by rule_cpt_requires_dx; any diagnosis position counts).
CPT_REQUIRES_DX: Dict[str, set] = {
    "83036": {"E10.9","E11.9","E11.65","E10.10","R73.09","Z13.1"},  # HbA1c
    "84153": {"C61","N40.0","N40.1","Z12.5"},  # PSA
    "86703": {"B20","Z11.4","Z34.00","O09.90"},  # HIV 1/2 antibody
    "80061": {"E78.5","E78.00","E78.2","I25.10","I10","Z00.00"},  # Lipid panel
    "82565": {"N18.3","N18.4","N18.5","N18.6","I12.9","E11.65"},  # Creatinine
    "70553": {"G35","G43.909","G20","C71.9","I63.9","G40.909"},  # MRI brain w/wo
    "72148": {"M54.5","M51.16","M51.17","G89.29","S33.5XXA"},  # MRI lumbar
    "72141": {"M54.2","M50.10","G89.29","S14.9XXA"},  # MRI cervical
    "73721": {"M17.11","M17.12","M23.200","S83.9XXA"},  # MRI joint
    "74177": {"K57.30","K72.10","K74.60","C18.9","C64.1","R10.9"},  # CT abdomen/pelvis
    "78816": {"C34.10","C50.911","C18.9","C61","C64.1","C81.90"},  # PET-CT
    "71046": {"J18.9","J44.1","I50.9","R05.9","R06.09"},  # Chest X-ray
    "93015": {"I25.10","I20.9","R07.9","I10"},  # Stress test
    "93306": {"I50.9","I48.91","I25.10","I11.9","I34.0"},  # Complete echo
    "78452": {"I25.10","I20.9","I48.91"},  # Nuclear stress
    "27447": {"M17.11","M17.12","M17.31"},  # TKA
    "27130": {"M16.11","M16.12","M16.31"},  # THA
    # Screening colonoscopy is supported by Z12.11 (colon screening), not
    # Z12.31 (screening mammogram).
    "45378": {"K57.30","K92.1","Z12.11","C18.9","D12.5"},  # Colonoscopy
    "43235": {"K21.0","K92.1","K29.70","D13.1"},  # EGD diagnostic
    "63030": {"M51.16","M51.17","G89.29","M54.5"},  # Lumbar discectomy
    "64483": {"M54.5","M51.16","G89.29"},  # Transforaminal ESI
    "64490": {"M54.2","M50.10","G89.29"},  # Cervical facet inj
    "90837": {"F32.1","F41.1","F31.9","F20.9","F10.20","F11.20"},  # Psychotherapy 60m
    "95810": {"G47.33","G47.31","G47.30"},  # Polysomnography
    "90935": {"N18.6","N17.9"},  # Hemodialysis
    "43644": {"E66.01","E66.09"},  # Gastric bypass
    "G0447": {"E66.01","E66.09","E66.1","Z68.35","Z68.36","Z68.37","Z68.38","Z68.39","Z68.41"},
    "20610": {"M17.11","M17.12","M16.11","M25.511","M25.512","M25.311","M25.312"},
    "94640": {"J44.1","J44.0","J45.901","J43.9"},  # Nebulizer
    "96413": {"C34.10","C50.911","C18.9","C61","C64.1","C81.90","C82.90","C83.90"},
}
def rule_dx_cpt_mismatch(row: Dict) -> Optional[RuleResult]:
    """
    ICD-10 → CPT direction:
    If the primary diagnosis is in ICD_CPT_MAP, the billed CPT must appear
    in the allowed set for that diagnosis.

    The primary diagnosis is the first entry returned by ``dx_list(row)``.
    Claims with no CPT, no diagnoses, or a primary diagnosis that is not a
    key of ICD_CPT_MAP are not checked and yield ``None``.

    Returns the value produced by ``fail("rDxCptMismatch", ...)`` when the
    CPT is not in the allowed set, otherwise ``None``.
    """
    claim_cpt = cpt(row)
    dxs = dx_list(row)
    if not claim_cpt or not dxs:
        return None

    primary_dx = dxs[0].upper().strip()
    allowed = ICD_CPT_MAP.get(primary_dx)
    if allowed is None or claim_cpt in allowed:
        return None

    # List at most this many codes in the message, in sorted order.
    max_listed = 6
    ordered = sorted(allowed)
    sample = ", ".join(ordered[:max_listed])
    # Only claim there are further codes when the list really was truncated;
    # many diagnoses map to six codes or fewer.
    suffix = " (and others)" if len(ordered) > max_listed else ""
    return fail(
        "rDxCptMismatch",
        f"CPT {claim_cpt} is not a recognized match for primary diagnosis "
        f"{primary_dx}. Common expected codes: {sample}{suffix}.",
    )
def rule_cpt_requires_dx(row: Dict) -> Optional[RuleResult]:
    """
    CPT → ICD-10 direction.

    When the billed CPT is a key of CPT_REQUIRES_DX, at least one diagnosis
    on the claim (in any position) must belong to that CPT's required set.
    CPT codes that are absent from the table are not checked.

    Returns the value produced by ``fail("rCptNoDx", ...)`` when no
    diagnosis matches, otherwise ``None``.
    """
    billed = cpt(row)
    diagnoses = dx_list(row)

    supporting = CPT_REQUIRES_DX.get(billed) if billed else None
    if supporting is None:
        return None

    # Any single overlap between claim diagnoses and the required set passes.
    for code in diagnoses:
        if code.upper().strip() in supporting:
            return None

    examples = ", ".join(sorted(supporting)[:4])
    message = (
        f"CPT {billed} requires at least one supporting diagnosis "
        f"(e.g., {examples} …) but none found on the claim."
    )
    return fail("rCptNoDx", message)
# Registry of every hard rule, in evaluation order. apply_hard_rules walks
# this list from top to bottom and appends each fired result, so the order
# here is also the order of names and messages in the output row.
HARD_RULES: List[RuleFunc] = [
    rule_rdxcode1,
    rule_rdxcode_missing,
    rule_perinatal_dx_age,
    rule_pediatric_dx_age,
    rule_maternity_dx_gender,
    rule_female_only_dx_gender,
    rule_external_cause_not_primary,
    rule_z6830_not_primary,
    rule_dx_g40009_g40909,
    rule_dx_g4089_g40009,
    rule_dx_f17200_z87891,
    rule_dx_n183_date,
    rule_dx_n1830_n1831_n1832_date,
    rule_dx_m5459_r051_date,
    rule_z01818_z01810_only_primary,
    rule_wellmed_f_series_dx,
    rule_90653_requires_z23,
    rule_zip_valid,
    rule_state_valid,
    rule_city_valid,
    rule_last_name_required,
    rule_first_name_required,
    rule_patient_name_special_chars,
    rule_address_required,
    rule_is_ptl,
    rule_cpt_required,
    rule_g0389_replaced,
    rule_94620_replaced,
    rule_g0436_replaced,
    rule_g0437_replaced,
    rule_q2037_replaced,
    rule_92585_deleted,
    rule_g8553_deleted,
    rule_0031a_wrong_code_for_age,
    rule_consult_code_medicare,
    rule_g0008_insurance,
    rule_90471_medicare,
    rule_consult_requires_referring,
    rule_25000_not_allowed,
    rule_jcode_ndc,
    rule_u0002_clia,
    rule_lab_cpt_clia,
    rule_high_level_99203,
    rule_high_level_99213,
    rule_newborn_cpt_age,
    rule_gender_specific_cpt,
    rule_maternity_cpt_gender,
    rule_maternity_cpt_age,
    rule_delivery_cpt_pos,
    rule_chemo_requires_cancer_dx,
    rule_dialysis_requires_renal_dx,
    rule_cardiac_rehab_requires_cardiac_dx,
    rule_awv_medicare_age,
    rule_screening_colonoscopy_age,
    rule_inpatient_only_cpt_office,
    rule_prostate_cpt_gender,
    rule_pulmonary_rehab_requires_pulmonary_dx,
    rule_cardiac_cath_requires_cardiac_dx,
    rule_radiation_oncology_requires_cancer,
    rule_pt_requires_msk_neuro_dx,
    rule_cataract_surgery_requires_cataract_dx,
    rule_joint_injection_requires_arthritis_dx,
    rule_spine_surgery_requires_spine_dx,
    rule_mammography_female,
    rule_hysterectomy_female,
    rule_testicular_cpt_male,
    rule_gallbladder_surgery_requires_dx,
    rule_appendectomy_requires_dx,
    rule_tonsillectomy_requires_dx,
    rule_pediatric_preventive_age,
    rule_adult_preventive_age,
    rule_ed_visit_requires_pos23,
    rule_office_em_inpatient_pos,
    rule_bone_density_age,
    rule_screening_mammo_requires_z1231,
    rule_wellness_visit_requires_z00,
    rule_mri_brain_requires_neuro_dx,
    rule_ct_chest_requires_chest_dx,
    rule_ct_abdomen_requires_abdominal_dx,
    rule_echo_requires_cardiac_dx,
    rule_a1c_requires_diabetes_dx,
    rule_lipid_panel_requires_cv_dx,
    rule_psa_male,
    rule_pap_smear_female,
    rule_cpap_requires_osa_dx,
    rule_wheelchair_requires_mobility_dx,
    rule_fetal_us_female,
    rule_circumcision_male,
    rule_iud_female,
    rule_colposcopy_female,
    rule_inpatient_em_requires_inpatient_pos,
    rule_observation_em_requires_pos22,
    rule_nf_em_requires_pos_31_32,
    rule_home_visit_requires_pos12,
    rule_aaa_screening_male_age,
    rule_lung_ct_screening_age,
    rule_q4169_supervision,
    rule_medicare_forbidden_cpt,
    rule_g0444_unbundle,
    rule_0000_nonbillable,
    rule_g2211_modifier25,
    rule_g2011_modifier,
    rule_smoking_cessation_requires_dx,
    rule_q4221_units,
    rule_q4250_units,
    rule_95004_min_units,
    rule_95165_min_units,
    rule_zero_units,
    rule_33340_primary_dx,
    rule_33340_pos,
    rule_rpm_checkbox,
    rule_q_series_note,
    rule_colon_in_note,
    rule_wellmed_g0508,
    rule_prolong_service,
    rule_99490_not_bill,
    rule_99024_nonreplacement,
    rule_99498_add_on,
    rule_vaccine_admin_required,
    rule_90653_admin_required,
    rule_self_pay_with_active_insurance,
    rule_g9719_required_dx,
    rule_mips_99221_99223,
    rule_mips_99236,
    rule_mips_99304_99306,
    rule_mips_99307_99310,
    rule_comb1,
    rule_comb3,
    rule_comb4,
    rule_comb2,
    rule_medicare_uhc_use_g0447,
    rule_99441_99443_pos11,
    rule_99354_99359_not_pos11,
    rule_0034a_date,
    rule_0071a_0072a_date,
    rule_g0438_use_99203,
    rule_99470_99457_99458,
    rule_99454_99445,
    rule_office_cpt_pos,
    rule_inpatient_cpt_pos,
    rule_snf_cpt_pos,
    rule_observation_cpt_pos,
    rule_er_cpt_pos,
    rule_home_visit_cpt_pos,
    rule_admission_date_empty_for_office,
    rule_admission_date_required_inpatient,
    rule_discharge_not_before_admission,
    rule_future_admission_date,
    rule_future_dos,
    rule_january_2020_dos,
    rule_dos_required,
    rule_dummy_dos,
    rule_charges_positive,
    rule_charges_max,
    rule_patient_payment_not_exceed_charges,
    rule_referring_required_for_medicare,
    rule_referral_requires_referring,
    rule_attending_required,
    rule_billing_required,
    rule_medicare_mbi_format,
    rule_dummy_dob,
    rule_dob_required,
    rule_gender_required,
    rule_insurance_required,
    rule_molina_9_digit_id,
    rule_aarp_not_primary,
    rule_workers_comp_accident_date,
    rule_11042_requires_pa,
    # ICD-10 ↔ CPT mapping rules
    rule_dx_cpt_mismatch,
    rule_cpt_requires_dx,
]
| # ============================================================ | |
| # Rule engine | |
| # ============================================================ | |
def apply_hard_rules(row: Dict) -> List[RuleResult]:
    """
    Run every hard rule against one claim row and collect the ones that fired.

    Results from ``collect_duplicate_dx_rules(row)`` come first, followed by
    fired results from HARD_RULES in list order. An empty list means no rule
    fired.
    """
    violations: List[RuleResult] = list(collect_duplicate_dx_rules(row))
    outcomes = (check(row) for check in HARD_RULES)
    # A rule may return None (nothing to report) or a result whose `fired`
    # flag is falsy; both are dropped.
    violations.extend(outcome for outcome in outcomes if outcome and outcome.fired)
    return violations
def evaluate_claim(row: Dict) -> Dict:
    """
    Evaluate one claim row and build its summary record.

    The record echoes four identifying fields from the row, then adds the
    verdict ("REJECTED" if any rule fired, else "ACCEPTED"), the number of
    fired rules, and their names and messages joined with " | ".
    """
    violations = apply_hard_rules(row)

    # Key insertion order is kept as-is; it becomes the column order when
    # these records are turned into a DataFrame.
    summary = {
        column: get(row, column)
        for column in ("ClaimId", "PatientName", "CPTCode", "DOS")
    }
    summary["Status"] = "REJECTED" if violations else "ACCEPTED"
    summary["Rules_Broken"] = len(violations)
    summary["Rule_Names"] = " | ".join(v.name for v in violations)
    summary["Error_Messages"] = " | ".join(v.message for v in violations)
    return summary
| # ============================================================ | |
| # Runner | |
| # ============================================================ | |
def run_hard_rule_engine(input_file: str = "claims_to_check.xlsx", output_file: str = "Hard_Rule_Results.xlsx") -> Optional[pd.DataFrame]:
    """
    Load claims from an Excel workbook, evaluate each one, and save the results.

    Reads sheet "Claims" of ``input_file`` (header on the third row), drops
    rows whose ``ClaimId`` is missing or blank, evaluates each remaining row
    with ``evaluate_claim``, prints a per-claim line and a summary, and
    writes the results to sheet "Results" of ``output_file``.

    Returns the results DataFrame, or ``None`` if the input could not be
    loaded (the error is printed rather than raised).
    """
    print("=" * 60)
    print(" HARD RULE ENGINE")
    print("=" * 60)
    try:
        df = pd.read_excel(input_file, sheet_name="Claims", header=2)
        df = df[df["ClaimId"].notna() & (df["ClaimId"].astype(str).str.strip() != "")].copy()
        df = df.reset_index(drop=True)
        print(f"Loaded {len(df)} claims from {input_file}")
    except Exception as exc:
        # Top-level boundary: report any load/parse problem and bail out.
        print(f"Error loading {input_file}: {exc}")
        return None

    results = []
    for _, row in df.iterrows():
        result = evaluate_claim(row.to_dict())
        results.append(result)
        icon = "ACCEPTED" if result["Status"] == "ACCEPTED" else f"REJECTED ({result['Rules_Broken']} rules)"
        print(
            f"Claim {str(result['ClaimId']):>8} | "
            f"{str(result['PatientName']):<22} | "
            f"CPT {str(result['CPTCode']):<8} | {icon}"
        )

    out_df = pd.DataFrame(results)
    out_df.to_excel(output_file, index=False, sheet_name="Results")

    # Count from the result records rather than out_df["Status"]: with zero
    # claims the DataFrame has no columns and that lookup raises KeyError.
    total = len(results)
    rejected = sum(1 for record in results if record["Status"] == "REJECTED")
    accepted = total - rejected
    rejection_pct = (rejected / total * 100) if total else 0

    print("-" * 60)
    print(f"Total Claims : {total}")
    print(f"Accepted : {accepted}")
    print(f"Rejected : {rejected}")
    print(f"Rejection % : {rejection_pct:.1f}%")
    print(f"Saved results to: {output_file}")
    return out_df
if __name__ == "__main__":
    import sys

    # Optional positional arguments: [input workbook] [output workbook].
    cli_args = sys.argv[1:]
    input_path = cli_args[0] if cli_args else "claims_to_check.xlsx"
    output_path = cli_args[1] if len(cli_args) > 1 else "Hard_Rule_Results.xlsx"
    run_hard_rule_engine(input_path, output_path)