""" Hard_Rule_Engine.py =================== Rule-engine version of the user's RBS checker. Purpose: - Apply hard validation rules to each claim - If any hard rule fires, mark claim as REJECTED - Return the list of broken hard rules and their messages This file is intentionally structured for a rule engine, not for ML features. A separate file can convert the same rule logic into binary features. USAGE: python Hard_Rule_Engine.py python Hard_Rule_Engine.py claims_to_check.xlsx Hard_Rule_Results.xlsx """ from __future__ import annotations import warnings from dataclasses import dataclass from datetime import date from typing import Callable, Dict, List, Optional import pandas as pd warnings.filterwarnings("ignore") # ============================================================ # Helpers # ============================================================ def get(row: Dict, col: str, default=""): value = row.get(col, default) if pd.isna(value): return default return value def dx_list(row: Dict) -> List[str]: raw = get(row, "AllDxCodes", "") return [c.strip().upper() for c in str(raw).split(",") if c.strip()] def patient_age(row: Dict) -> Optional[float]: try: dob = pd.to_datetime(get(row, "DateOfBirth")) if pd.isna(dob): return None return (pd.Timestamp.today() - dob).days / 365.25 except Exception: return None def is_medicare(row: Dict) -> bool: ins = str(get(row, "ClaimPriIns", "")).lower() return "medicare" in ins def get_modifiers(row: Dict) -> List[str]: return [str(get(row, f"Modifier{i}", "")).strip().upper() for i in range(1, 5)] def cpt(row: Dict) -> str: return str(get(row, "CPTCode", "")).strip().upper() def dos_date(row: Dict) -> Optional[date]: try: v = get(row, "DOS", "") if not v or str(v).strip() in ("", "nan", "NaT"): return None parsed = pd.to_datetime(v, errors="coerce") return None if pd.isna(parsed) else parsed.date() except Exception: return None def adm_date(row: Dict) -> Optional[date]: try: v = get(row, "AdmissionDate", "") if not v or str(v).strip() in ("", "nan", "NaT"): return None parsed = pd.to_datetime(v, errors="coerce") return None if pd.isna(parsed) else parsed.date() except Exception: return None def dis_date(row: Dict) -> Optional[date]: try: v = get(row, "DischargeDate", "") if not v or str(v).strip() in ("", "nan", "NaT"): return None parsed = pd.to_datetime(v, errors="coerce") return None if pd.isna(parsed) else parsed.date() except Exception: return None def has_modifier(row: Dict, mod: str) -> bool: return mod.upper() in [m.upper() for m in get_modifiers(row) if m] def to_float(value, default: float = 0.0) -> float: try: if value is None or str(value).strip() == "": return default return float(value) except Exception: return default # ============================================================ # Rule model # ============================================================ @dataclass class RuleResult: name: str fired: bool message: str = "" RuleFunc = Callable[[Dict], Optional[RuleResult]] def fail(name: str, message: str) -> RuleResult: return RuleResult(name=name, fired=True, message=message) # ============================================================ # Hard rules # One rule = one function # ============================================================ def rule_rdxcode1(row: Dict) -> Optional[RuleResult]: dxs = dx_list(row) if dxs and dxs[0] == "I10": return fail("rdxcode1", 'Primary Diagnosis code must not equal to "I10".') return None def rule_rdxcode_missing(row: Dict) -> Optional[RuleResult]: if not dx_list(row): return fail("rDXCodeMissing", "DXCODE1 is mandatory field.") return None # def rule_duplicate_dx_codes(row: Dict) -> Optional[RuleResult]: # seen = [] # for i, d in enumerate(dx_list(row), start=1): # if d in seen: # return fail(f"rDxCode{i}_dup", f"DX{i} Code must not be entered twice. ({d})") # seen.append(d) # return None def collect_duplicate_dx_rules(row: Dict) -> List[RuleResult]: broken = [] seen = [] for i, d in enumerate(dx_list(row), start=1): if d in seen: broken.append(fail(f"rDxCode{i}_dup", f"DX{i} duplicate ({d})")) seen.append(d) return broken def rule_perinatal_dx_age(row: Dict) -> Optional[RuleResult]: age = patient_age(row) if age is None or age <= 1: return None for i, d in enumerate(dx_list(row), start=1): if d.startswith(("P0", "P1", "P2", "P3", "P4", "P5", "P6", "P7", "P8", "P9")): return fail("rDXCodeCategory1", f"DX{i} ({d}) is for Perinatal/Newborn and not valid for this patient.") return None def rule_pediatric_dx_age(row: Dict) -> Optional[RuleResult]: age = patient_age(row) if age is None or age <= 17: return None for i, d in enumerate(dx_list(row), start=1): if d.startswith("Z00.1") or d.startswith("Z001"): return fail("rDXCodeCategory2", f"DX{i} ({d}) is Pediatric and not valid for this patient.") return None def rule_maternity_dx_gender(row: Dict) -> Optional[RuleResult]: gender = str(get(row, "Gender", "")).strip().lower() if gender != "male": return None for i, d in enumerate(dx_list(row), start=1): if d.startswith(("O", "Z34", "Z36", "Z3A")): return fail("rDXCodeCategory3", f"DX{i} ({d}) is Maternity Diagnosis and not valid for Male patient.") return None def rule_female_only_dx_gender(row: Dict) -> Optional[RuleResult]: female_only_pfx = ( "N70", "N71", "N72", "N73", "N74", "N75", "N76", "N77", "N80", "N81", "N82", "N83", "N84", "N85", "N86", "N87", "N88", "N89", "N90", "N91", "N92", "N93", "N94", "N95", "N96", "N97" ) gender = str(get(row, "Gender", "")).strip().lower() if gender != "male": return None for i, d in enumerate(dx_list(row), start=1): if any(d.startswith(p) for p in female_only_pfx): return fail("rDXCodeCategory5", f"DX{i} ({d}) is Female-only and not valid for Male patient.") return None def rule_external_cause_not_primary(row: Dict) -> Optional[RuleResult]: dxs = dx_list(row) if dxs and dxs[0].startswith(("V", "W", "X", "Y")): return fail("rdxcode1_morbidity", f"External cause code ({dxs[0]}) may not be used as primary diagnosis.") return None def rule_z6830_not_primary(row: Dict) -> Optional[RuleResult]: dxs = dx_list(row) if dxs and dxs[0] in ("Z68.30", "Z6830"): return fail("rdxZ6830", "Z68.30 is not acceptable as principal DX.") return None def rule_dx_g40009_g40909(row: Dict) -> Optional[RuleResult]: dx_set = {d.replace(".", "") for d in dx_list(row)} if "G40009" in dx_set and "G40909" in dx_set: return fail("rDx1Diagnosis", "DX Code G40009 and G40909 cannot be billed in single claim.") return None def rule_dx_g4089_g40009(row: Dict) -> Optional[RuleResult]: dx_set = {d.replace(".", "") for d in dx_list(row)} if "G4089" in dx_set and "G40009" in dx_set: return fail("rDxCodeG40", "DX Code G40.89 and G40.009 cannot be billed in single claim.") return None def rule_dx_f17200_z87891(row: Dict) -> Optional[RuleResult]: dx_set = {d.replace(".", "") for d in dx_list(row)} if "F17200" in dx_set and "Z87891" in dx_set: return fail("rdxF17&Z87", "Diagnosis codes F17.200 and Z87.891 cannot be reported together.") return None def rule_dx_n183_date(row: Dict) -> Optional[RuleResult]: dos = dos_date(row) if dos and dos > date(2020, 9, 30): for d in dx_list(row): if d.replace(".", "") == "N183": return fail("rDxN183", "DxCode N18.3 was billable only through 2020-09-30.") return None def rule_dx_n1830_n1831_n1832_date(row: Dict) -> Optional[RuleResult]: dos = dos_date(row) if dos and dos < date(2020, 10, 1): for d in dx_list(row): if d.replace(".", "") in ("N1830", "N1831", "N1832"): return fail("rDxN18303132", f"DxCode {d} is effective from 2020-10-01.") return None def rule_dx_m5459_r051_date(row: Dict) -> Optional[RuleResult]: dos = dos_date(row) if dos and dos < date(2021, 10, 1): for d in dx_list(row): if d in ("M54.59", "R05.1", "M5459", "R051"): return fail("rDx-M54.59,R05.1", f"Diagnosis code {d} is effective from 2021-10-01.") return None def rule_z01818_z01810_only_primary(row: Dict) -> Optional[RuleResult]: for i, d in enumerate(dx_list(row)[1:], start=2): if d.replace(".", "") in ("Z01818", "Z01810"): return fail("rDXZ01818", f"DX {d} is acceptable only as primary diagnosis, found at position {i}.") return None def rule_wellmed_f_series_dx(row: Dict) -> Optional[RuleResult]: if "wellmed" not in str(get(row, "ClaimPriIns", "")).lower(): return None for d in dx_list(row): if d.startswith("F"): return fail("rDxCodeFSeries", f"For Wellmed insurance, F-series diagnosis codes are not acceptable. Found: {d}") return None def rule_90653_requires_z23(row: Dict) -> Optional[RuleResult]: if cpt(row) == "90653": if not any(d.replace(".", "") == "Z23" for d in dx_list(row)): return fail("rDx723Point", "ICD Z23 is mandatory with CPT 90653.") return None def rule_zip_valid(row: Dict) -> Optional[RuleResult]: zip_code = str(get(row, "Zip", "")).strip() if not zip_code or not (len(zip_code) == 5 or len(zip_code.replace("-", "")) == 9): return fail("rZIP", "Please enter valid Zip Code for patient.") return None def rule_state_valid(row: Dict) -> Optional[RuleResult]: state = str(get(row, "State", "")).strip() if not state or len(state) < 2: return fail("rState", "Please enter valid State for patient.") return None def rule_city_valid(row: Dict) -> Optional[RuleResult]: city = str(get(row, "City", "")).strip() if not city: return fail("rCity", "Please enter valid City for patient.") return None def rule_last_name_required(row: Dict) -> Optional[RuleResult]: if not str(get(row, "LastName", "")).strip(): return fail("rLNMMis", "Last Name is Mandatory field.") return None def rule_first_name_required(row: Dict) -> Optional[RuleResult]: if not str(get(row, "FirstName", "")).strip(): return fail("rFNMis", "First Name is Mandatory field.") return None def rule_patient_name_special_chars(row: Dict) -> Optional[RuleResult]: patient_name = str(get(row, "PatientName", "")).strip() special_chars = set('!#%$&+,./:;<=>@`{|}~"()*\\_^?[]\'') if set(patient_name) & special_chars: return fail("rSpecChar", "Special characters are not allowed in Patient name.") return None def rule_address_required(row: Dict) -> Optional[RuleResult]: if not str(get(row, "Address", "")).strip(): return fail("rADSMis", "Address is Mandatory field.") return None def rule_is_ptl(row: Dict) -> Optional[RuleResult]: is_ptl = str(get(row, "IsPTL", "0")).strip().lower() if is_ptl in ("1", "true", "yes"): return fail("rIsPTL", "Claim with PTL status must not be submitted.") return None def rule_cpt_required(row: Dict) -> Optional[RuleResult]: if not cpt(row): return fail("rCPTCode=1", "Please enter at least one CPT for the claim.") return None def rule_g0389_replaced(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G0389": return fail("rCPTCOde=10", "CPT Code G0389 is replaced by 76706.") return None def rule_94620_replaced(row: Dict) -> Optional[RuleResult]: if cpt(row) == "94620": dos = dos_date(row) if dos and dos > date(2018, 1, 1): return fail("rCPTCode94620", "CPT 94620 is not acceptable after 2018-01-01 DOS.") return None def rule_g0436_replaced(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G0436": return fail("rCPTCode=12", "CPT G0436 is replaced with 99406.") return None def rule_g0437_replaced(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G0437": return fail("rCPTCode=13", "CPT G0437 is replaced with 99407.") return None def rule_q2037_replaced(row: Dict) -> Optional[RuleResult]: if cpt(row) == "Q2037": return fail("rCPTCodeQ2037", "CPT Q2037 has been replaced. Please enter updated CPT.") return None def rule_92585_deleted(row: Dict) -> Optional[RuleResult]: if cpt(row) == "92585": return fail("rCPT92585", "92585 is a deleted CPT code.") return None def rule_g8553_deleted(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G8553": return fail("rCPTG8553Del", "CPT G8553 is a deleted code.") return None def rule_0031a_wrong_code_for_age(row: Dict) -> Optional[RuleResult]: age = patient_age(row) if cpt(row) == "0031A" and age is not None and 12 <= age <= 18: return fail("rCPT0031A", 'Correct code is "0001A 0002A". Please replace the code.') return None def rule_consult_code_medicare(row: Dict) -> Optional[RuleResult]: consult_codes = {"99241", "99242", "99243", "99244", "99245", "99251", "99252", "99253", "99254", "99255"} if cpt(row) in consult_codes and is_medicare(row): return fail("rConsultCode", "Medicare does not accept consultation codes.") return None def rule_g0008_insurance(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G0008": ins_name = str(get(row, "ClaimPriIns", "")).lower() if "medicare" not in ins_name and "uhc" not in ins_name and "united" not in ins_name: return fail("rCPTCodeG0008", "G0008 is only allowed with Medicare or UHC.") return None def rule_90471_medicare(row: Dict) -> Optional[RuleResult]: if cpt(row) == "90471" and is_medicare(row): return fail("rCPTCode90471", "90471 is not allowed with Medicare.") return None def rule_consult_requires_referring(row: Dict) -> Optional[RuleResult]: consult_codes = {"99241", "99242", "99243", "99244", "99245", "99251", "99252", "99253", "99254", "99255"} if cpt(row) in consult_codes and not get(row, "ReferringPhysician"): return fail("rConsultCode2", "Referring physician is mandatory with consultation code.") return None def rule_25000_not_allowed(row: Dict) -> Optional[RuleResult]: if cpt(row) == "25000": return fail("rCPTCode25000", "CPT Code 25000 is not allowed.") return None def rule_jcode_ndc(row: Dict) -> Optional[RuleResult]: if cpt(row).startswith("J") and not str(get(row, "Drug_NDC", "")).strip(): return fail("rJcodeNDC", "J-code without NDC should not be billed.") return None def rule_u0002_clia(row: Dict) -> Optional[RuleResult]: if cpt(row) == "U0002" and not get(row, "CLIANumber"): return fail("rCPTU0002Clia", "CLIA number is required with CPT U0002.") return None def rule_lab_cpt_clia(row: Dict) -> Optional[RuleResult]: try: cpt_num = int(cpt(row)) if 80000 <= cpt_num <= 89999 and not get(row, "CLIANumber"): return fail("rCLIANumberMissing", "CLIA number required for lab services.") except Exception: return None return None def rule_high_level_99203(row: Dict) -> Optional[RuleResult]: if cpt(row) == "99203": return fail("rCPTHighLevel", "Please update CPT 99203 as 99204 according to visit type.") return None def rule_high_level_99213(row: Dict) -> Optional[RuleResult]: if cpt(row) == "99213": return fail("rCPTHighLevel", "Please update CPT 99213 as 99214 according to visit type.") return None def rule_newborn_cpt_age(row: Dict) -> Optional[RuleResult]: newborn_cpts = {"99460", "99461", "99462", "99463", "99464", "99465"} age = patient_age(row) if cpt(row) in newborn_cpts and age is not None and age > (28 / 365.25): return fail("rNewbornCPT", f"Newborn care CPT {cpt(row)} is not valid for patient older than 28 days.") return None def rule_gender_specific_cpt(row: Dict) -> Optional[RuleResult]: female_only = {"58300", "58301", "58600", "58605", "58700", "58720", "58740", "58750", "57452", "57454", "57455"} male_only = {"54000", "54001", "54050", "54055", "54150", "54160", "54161", "55000", "55040", "55041"} gender = str(get(row, "Gender", "")).strip().lower() code = cpt(row) if code in female_only and gender == "male": return fail("rCPTGender", f"CPT {code} is not valid for Male gender.") if code in male_only and gender == "female": return fail("rCPTGender", f"CPT {code} is not valid for Female gender.") return None MATERNITY_CPTS = { "59000", "59012", "59015", "59020", "59025", "59030", "59050", "59051", "59070", "59072", "59074", "59076", "59100", "59120", "59121", "59130", "59135", "59136", "59140", "59150", "59151", "59160", "59200", "59300", "59320", "59325", "59350", "59400", "59409", "59410", "59412", "59414", "59425", "59426", "59430", "59510", "59514", "59515", "59525", "59610", "59612", "59614", "59618", "59620", "59622", "59812", "59820", "59821", "59830", "59840", "59841", "59850", "59851", "59852", "59855", "59856", "59857", "59866", "59870", "59871", "59897", "59898", "59899" } def rule_maternity_cpt_gender(row: Dict) -> Optional[RuleResult]: if cpt(row) in MATERNITY_CPTS and str(get(row, "Gender", "")).strip().lower() == "male": return fail("rMatCPT", f"Maternity care CPT {cpt(row)} is not valid for Male gender.") return None def rule_maternity_cpt_age(row: Dict) -> Optional[RuleResult]: if cpt(row) not in MATERNITY_CPTS: return None age = patient_age(row) if age is None: return None if age < 10 or age > 55: return fail("rMatCPTAge", f"Maternity CPT {cpt(row)} not valid for patient age {age:.0f} (expected 10–55).") return None def rule_delivery_cpt_pos(row: Dict) -> Optional[RuleResult]: delivery_cpts = {"59400", "59409", "59410", "59510", "59514", "59515", "59610", "59612", "59614", "59618", "59620", "59622"} if cpt(row) not in delivery_cpts: return None pos = str(get(row, "POS", "")).strip() valid_pos = {"21", "22", "23", "02"} if pos and pos not in valid_pos: return fail("rDeliveryPOS", f"Delivery CPT {cpt(row)} requires POS 21/22/23/02, got POS {pos}.") return None def rule_chemo_requires_cancer_dx(row: Dict) -> Optional[RuleResult]: chemo_cpts = { "96401", "96402", "96405", "96406", "96409", "96411", "96413", "96415", "96416", "96417", "96420", "96422", "96423", "96425", "96440", "96446", "96450", "96521", "96522", "96523", "96542", "96549", } if cpt(row) not in chemo_cpts: return None for d in dx_list(row): # C00-C96 malignant; D00-D09 in situ; D37-D48 uncertain behavior if d.startswith("C") or d.startswith(("D0", "D37", "D38", "D39", "D4")): return None return fail("rChemoNoCancer", f"Chemotherapy CPT {cpt(row)} requires a neoplasm diagnosis (C-code or D00–D48).") def rule_dialysis_requires_renal_dx(row: Dict) -> Optional[RuleResult]: dialysis_cpts = { "90935", "90937", "90940", "90945", "90947", "90951", "90952", "90953", "90954", "90955", "90956", "90957", "90958", "90959", "90960", "90961", "90962", "90963", "90964", "90965", "90966", "90967", "90968", "90969", "90970", "90989", "90993", "90997", "90999", } if cpt(row) not in dialysis_cpts: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith("N18") or d == "Z992": return None return fail("rDialysisNoRenal", f"Dialysis CPT {cpt(row)} requires renal diagnosis (N18.x or Z99.2).") def rule_cardiac_rehab_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"93797", "93798", "G0422", "G0423"}: return None # I20 angina, I21 MI, I22 subsequent MI, I25 CAD, I50 heart failure, Z95.1 CABG hx, Z95.5 PCI hx valid_prefixes = ("I20", "I21", "I22", "I25", "I50", "Z951", "Z955") for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith(valid_prefixes): return None return fail("rCardRehabNoCardiac", f"Cardiac rehab CPT {cpt(row)} requires cardiac diagnosis (I20–I25, I50, or Z95.1/Z95.5).") def rule_awv_medicare_age(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"G0438", "G0439"}: return None age = patient_age(row) if age is None or age >= 65: return None return fail("rAWVAge", f"AWV CPT {cpt(row)} requires Medicare-eligible age (65+), got age {age:.0f}.") def rule_screening_colonoscopy_age(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"G0121", "45378", "45380", "45385"}: return None dx_flat = [x.replace(".", "") for x in dx_list(row)] is_screening = any(d.startswith("Z1211") or d.startswith("Z8671") for d in dx_flat) if not is_screening: return None age = patient_age(row) if age is None or age >= 45: return None return fail("rColonScreenAge", f"Screening colonoscopy not recommended before age 45 (USPSTF), got age {age:.0f}.") def rule_inpatient_only_cpt_office(row: Dict) -> Optional[RuleResult]: # Sample of CMS Medicare Inpatient-Only (IPO) list — must be inpatient hospital inpatient_only = { "33533", "33534", "33535", "33536", # CABG "27130", "27132", "27134", "27137", "27138", # Hip arthroplasty "27447", "27486", "27487", # Knee arthroplasty "43644", "43645", "43770", "43775", # Bariatric "23470", "23472", # Shoulder arthroplasty "61510", "61512", "61518", "61519", "61521", # Craniotomy "63081", "63082", "63085", "63086", # Vertebral corpectomy } if cpt(row) not in inpatient_only: return None pos = str(get(row, "POS", "")).strip() if pos == "11": return fail("rIPOatOffice", f"CPT {cpt(row)} is on Medicare Inpatient-Only list — cannot be billed at POS 11 (office).") return None def rule_prostate_cpt_gender(row: Dict) -> Optional[RuleResult]: prostate_cpts = { "55700", "55705", "55706", "55801", "55810", "55812", "55815", "55821", "55831", "55840", "55842", "55845", "55866", "55873", "55875", } if cpt(row) not in prostate_cpts: return None gender = str(get(row, "Gender", "")).strip().lower() if gender == "female": return fail("rProstateFemale", f"Prostate CPT {cpt(row)} is not valid for Female gender.") return None def rule_pulmonary_rehab_requires_pulmonary_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"94625", "94626", "G0237", "G0238", "G0239", "G0424"}: return None dx_flat = [x.replace(".", "") for x in dx_list(row)] for d in dx_flat: # J40-J47 COPD/chronic bronchitis/emphysema, J84 interstitial lung, J96 respiratory failure, J68 chemical injury if d.startswith(("J40","J41","J42","J43","J44","J45","J46","J47","J84","J96","J68")): return None return fail("rPulmRehabNoPulm", f"Pulmonary rehab CPT {cpt(row)} requires pulmonary DX (J40–J47, J84, or J96).") def rule_cardiac_cath_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]: cath_cpts = {"93451","93452","93453","93454","93455","93456","93457","93458","93459", "93460","93461","93530","93531","93532","93533"} if cpt(row) not in cath_cpts: return None dx_flat = [x.replace(".", "") for x in dx_list(row)] for d in dx_flat: # I20-I25 ischemic heart, I42-I43 cardiomyopathy, I48 afib, I49 arrhythmia, I50 CHF, R07 chest pain, R00 palpitations if d.startswith(("I20","I21","I22","I23","I24","I25","I42","I43","I48","I49","I50","R07","R000","R001")): return None return fail("rCardCathNoCardiac", f"Cardiac cath CPT {cpt(row)} requires cardiac DX (I20–I25, I42–I50, or R07/R00).") def rule_radiation_oncology_requires_cancer(row: Dict) -> Optional[RuleResult]: rad_cpts = {"77261","77262","77263","77280","77285","77290","77293","77295", "77299","77300","77301","77306","77307","77316","77317","77318", "77321","77331","77332","77333","77334","77336","77338", "77370","77371","77372","77373","77385","77386","77387", "77401","77402","77407","77412","77417","77427", "77520","77522","77523","77525"} if cpt(row) not in rad_cpts: return None for d in dx_list(row): if d.startswith("C") or d.startswith(("D0","D37","D38","D39","D4")): return None return fail("rRadOncNoCancer", f"Radiation oncology CPT {cpt(row)} requires neoplasm DX (C-code or D00–D48).") def rule_pt_requires_msk_neuro_dx(row: Dict) -> Optional[RuleResult]: pt_cpts = {"97110","97112","97116","97124","97140","97150","97530","97535","97537","97542","97545","97546"} if cpt(row) not in pt_cpts: return None dx_flat = [x.replace(".", "") for x in dx_list(row)] for d in dx_flat: # M-codes MSK, G8x palsies, S-codes injuries, Z47/Z48/Z51 aftercare, R26 gait/mobility if d.startswith(("M","S","G8","Z47","Z48","Z51","R26")): return None return fail("rPTNoMSK", f"PT CPT {cpt(row)} requires MSK (M-code), neuro (G8x), injury (S-code), or aftercare (Z47/Z48/Z51) DX.") def rule_cataract_surgery_requires_cataract_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"66982","66983","66984","66987","66988"}: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith(("H25","H26","H28")): return None return fail("rCataractNoDx", f"Cataract surgery CPT {cpt(row)} requires cataract DX (H25–H28).") def rule_joint_injection_requires_arthritis_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"20600","20604","20605","20606","20610","20611"}: return None dx_flat = [x.replace(".", "") for x in dx_list(row)] for d in dx_flat: # M00-M25 arthropathies, M60-M79 soft tissue disorders, S-codes injury if d.startswith(("M0","M1","M2","M6","M7","S")): return None return fail("rJointInjNoDx", f"Joint injection CPT {cpt(row)} requires joint/soft-tissue/injury DX (M0x–M2x, M6x–M7x, S-codes).") def rule_spine_surgery_requires_spine_dx(row: Dict) -> Optional[RuleResult]: spine_cpts = {"22551","22552","22554","22556","22558","22590","22595","22612", "22614","22630","22632","22633","22634", "63020","63030","63042","63045","63047","63048", "63056","63075","63081","63082","63085","63086"} if cpt(row) not in spine_cpts: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith(("M40","M41","M42","M43","M45","M46","M47","M48","M50","M51","M53","M54","M96","S14","S24","S34")): return None return fail("rSpineSurgNoDx", f"Spine surgery CPT {cpt(row)} requires spine DX (M40–M54, M96, or S14/S24/S34).") def rule_mammography_female(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"77063","77065","77066","77067"}: return None if str(get(row, "Gender", "")).strip().lower() == "male": return fail("rMammoMale", f"Mammography CPT {cpt(row)} is typically female-only; found Male gender.") return None def rule_hysterectomy_female(row: Dict) -> Optional[RuleResult]: hyst_cpts = {"58150","58152","58180","58200","58210","58260","58262","58263","58267", "58270","58275","58280","58285","58290","58291","58292","58293","58294", "58541","58542","58543","58544","58548","58550","58552","58553","58554", "58570","58571","58572","58573"} if cpt(row) not in hyst_cpts: return None if str(get(row, "Gender", "")).strip().lower() == "male": return fail("rHystMale", f"Hysterectomy CPT {cpt(row)} is not valid for Male gender.") return None def rule_testicular_cpt_male(row: Dict) -> Optional[RuleResult]: test_cpts = {"54500","54505","54512","54520","54522","54530","54535","54550","54560"} if cpt(row) not in test_cpts: return None if str(get(row, "Gender", "")).strip().lower() == "female": return fail("rTesticFemale", f"Testicular CPT {cpt(row)} is not valid for Female gender.") return None def rule_gallbladder_surgery_requires_dx(row: Dict) -> Optional[RuleResult]: gb_cpts = {"47562","47563","47564","47600","47605","47610","47612","47620"} if cpt(row) not in gb_cpts: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith(("K80","K81","K82","K83","C23")): return None return fail("rGallbladderNoDx", f"Gallbladder surgery CPT {cpt(row)} requires gallbladder DX (K80–K83 or C23).") def rule_appendectomy_requires_dx(row: Dict) -> Optional[RuleResult]: app_cpts = {"44950","44955","44960","44970"} if cpt(row) not in app_cpts: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith(("K35","K36","K37","K38")) or d == "C181": return None return fail("rAppendNoDx", f"Appendectomy CPT {cpt(row)} requires appendicitis DX (K35–K38 or C18.1).") def rule_tonsillectomy_requires_dx(row: Dict) -> Optional[RuleResult]: tons_cpts = {"42820","42821","42825","42826","42830","42831","42835","42836"} if cpt(row) not in tons_cpts: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith(("J02","J03","J35","J36")) or d == "G4733": return None return fail("rTonsilNoDx", f"Tonsillectomy CPT {cpt(row)} requires tonsil/adenoid/OSA DX (J02, J03, J35, J36, or G47.33).") def rule_pediatric_preventive_age(row: Dict) -> Optional[RuleResult]: code = cpt(row) age_limits = {"99381":(0,1), "99382":(1,4), "99383":(5,11), "99384":(12,17), "99391":(0,1), "99392":(1,4), "99393":(5,11), "99394":(12,17)} if code not in age_limits: return None age = patient_age(row) if age is None: return None lo, hi = age_limits[code] if age < lo or age > hi: return fail("rPedPrevAge", f"Preventive CPT {code} expects age {lo}–{hi}, got age {age:.0f}.") return None def rule_adult_preventive_age(row: Dict) -> Optional[RuleResult]: code = cpt(row) age_limits = {"99385":(18,39), "99386":(40,64), "99387":(65,200), "99395":(18,39), "99396":(40,64), "99397":(65,200)} if code not in age_limits: return None age = patient_age(row) if age is None: return None lo, hi = age_limits[code] if age < lo or age > hi: return fail("rAdultPrevAge", f"Preventive CPT {code} expects age {lo}–{hi}, got age {age:.0f}.") return None def rule_ed_visit_requires_pos23(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"99281","99282","99283","99284","99285"}: return None pos = str(get(row, "POS", "")).strip() if pos and pos != "23": return fail("rEDVisitPOS", f"ED visit CPT {cpt(row)} requires POS 23 (emergency room), got POS {pos}.") return None def rule_office_em_inpatient_pos(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"99202","99203","99204","99205","99212","99213","99214","99215"}: return None pos = str(get(row, "POS", "")).strip() if pos in {"21","31","32","51","61"}: return fail("rOfficeEMInpt", f"Office E&M CPT {cpt(row)} not valid at inpatient POS {pos}; use inpatient visit codes (99221–99233).") return None def rule_bone_density_age(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"77080","77081","77085","77086"}: return None age = patient_age(row) if age is None or age >= 50: return None return fail("rBoneDensAge", f"Bone density CPT {cpt(row)} typically not indicated before age 50, got age {age:.0f}.") def rule_screening_mammo_requires_z1231(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"77063","77067"}: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith("Z1231"): return None return fail("rScreenMammoDx", f"Screening mammography CPT {cpt(row)} requires DX Z12.31 (screening encounter).") def rule_wellness_visit_requires_z00(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"99385","99386","99387","99395","99396","99397"}: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith("Z00"): return None return fail("rWellnessDx", f"Preventive wellness CPT {cpt(row)} requires Z00.xx encounter DX.") # ---- Imaging medical-necessity ---- def rule_mri_brain_requires_neuro_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"70551","70552","70553","70554","70555"}: return None for d in (x.replace(".", "") for x in dx_list(row)): # G-codes neuro, I60-I69 cerebrovascular, R40-R42/R51 head sx, S00/S06 head injury, # C70-C72 brain tumors, F01-F09 dementia/cognitive, Z0183 neoplasm f/u if (d.startswith("G") or d.startswith(("I6","R4","R51","S00","S06","C70","C71","C72","F0")) or d == "Z0183"): return None return fail("rMRIBrainNoDx", f"MRI brain CPT {cpt(row)} requires neuro/head DX (G-code, I60–I69, R40–R51, S00/S06, C70–C72).") def rule_ct_chest_requires_chest_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"71250","71260","71270","71271","71275"}: return None for d in (x.replace(".", "") for x in dx_list(row)): # J-codes pulm, I-codes cardiac, C34 lung CA, R05/R06/R07/R09 chest sx, S27 thorax injury, Z87 hx if (d.startswith(("J","I","C34","C33","R05","R06","R07","R09","S27","Z87"))): return None return fail("rCTChestNoDx", f"CT chest CPT {cpt(row)} requires chest DX (J/I codes, C33–C34, R05–R09, S27).") def rule_ct_abdomen_requires_abdominal_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"74150","74160","74170","74176","74177","74178"}: return None for d in (x.replace(".", "") for x in dx_list(row)): # K-codes digestive, N-codes GU, R10/R11/R14/R19 abd sx, C15-C26 GI CA, S30-S39 abd injury if (d.startswith("K") or d.startswith("N") or d.startswith(("R10","R11","R14","R19","S3")) or d.startswith(("C15","C16","C17","C18","C19","C20","C21","C22","C23","C24","C25","C26","C67"))): return None return fail("rCTAbdNoDx", f"CT abdomen CPT {cpt(row)} requires abdominal DX (K/N code, R10–R19, S30–S39, C15–C26).") def rule_echo_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"93303","93304","93306","93307","93308","93312","93313","93314","93315","93316","93317","93320","93321"}: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith(("I","R07","R000","R001","R06","R55","Q2","Q3")): return None return fail("rEchoNoCardiac", f"Echocardiogram CPT {cpt(row)} requires cardiac DX (I-code, R00/R06/R07/R55, or Q2x/Q3x congenital).") # ---- Lab medical-necessity ---- def rule_a1c_requires_diabetes_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"83036","83037"}: return None for d in (x.replace(".", "") for x in dx_list(row)): # E08-E13 diabetes, R73 elevated glucose, Z131 diabetes screening, O24 gestational, Z830 FH diabetes if d.startswith(("E08","E09","E10","E11","E12","E13","R73","Z131","O24","Z833")): return None return fail("rA1CNoDiab", f"HbA1c CPT {cpt(row)} requires diabetes/prediabetes DX (E08–E13, R73, Z13.1, O24).") def rule_lipid_panel_requires_cv_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"80061"}: return None for d in (x.replace(".", "") for x in dx_list(row)): # Cardiovascular risk / disease / screening indicators if d.startswith(("I","E11","E10","E78","F17","Z13220","Z13","Z72","Z83","R7302","R794")): return None return fail("rLipidNoCV", f"Lipid panel CPT {cpt(row)} requires CV-risk/disease DX (I-code, E78, E10/E11, Z13.220, Z83.42, F17).") def rule_psa_male(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"84152","84153","84154","G0103"}: return None if str(get(row, "Gender", "")).strip().lower() == "female": return fail("rPSAFemale", f"PSA CPT {cpt(row)} is not valid for Female gender.") return None def rule_pap_smear_female(row: Dict) -> Optional[RuleResult]: pap_cpts = {"88141","88142","88143","88147","88148","88150","88152","88153","88154","88155", "88164","88165","88166","88167","88174","88175","Q0091","G0123","G0124","G0141","G0143","G0144","G0145","G0147","G0148"} if cpt(row) not in pap_cpts: return None if str(get(row, "Gender", "")).strip().lower() == "male": return fail("rPapMale", f"Pap smear CPT {cpt(row)} is not valid for Male gender.") return None # ---- DME medical-necessity ---- def rule_cpap_requires_osa_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"E0601","E0470","E0471","E0561","E0562","95810","95811"}: return None for d in (x.replace(".", "") for x in dx_list(row)): if d.startswith("G473"): return None return fail("rCPAPNoOSA", f"CPAP/BiPAP CPT {cpt(row)} requires sleep apnea DX (G47.3x).") def rule_wheelchair_requires_mobility_dx(row: Dict) -> Optional[RuleResult]: wc_cpts = {"K0001","K0002","K0003","K0004","K0005","K0006","K0007","K0009", "E1050","E1060","E1070","E1083","E1084","E1085","E1086","E1087","E1088","E1089","E1090", "E1100","E1110","E1130","E1140","E1150","E1160","E1161","E1170","E1180","E1190", "E1200","E1220","E1230","E1231","E1232","E1233","E1234","E1235","E1236","E1237","E1238"} if cpt(row) not in wc_cpts: return None for d in (x.replace(".", "") for x in dx_list(row)): # MSK, neuro (G20/G35/G80-G83/I69 stroke), injury, R26 gait if d.startswith(("M","S","G20","G35","G36","G80","G81","G82","G83","I69","R26","Z99")): return None return fail("rWheelchairNoDx", f"Wheelchair CPT {cpt(row)} requires mobility-impairment DX (M/S-code, G20/G35/G80–G83, I69, R26).") # ---- Gender/anatomy specific ---- def rule_fetal_us_female(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"76801","76802","76805","76810","76811","76812","76813","76814","76815","76816","76817","76818","76819","76820","76821"}: return None if str(get(row, "Gender", "")).strip().lower() == "male": return fail("rFetalUSMale", f"Fetal ultrasound CPT {cpt(row)} is not valid for Male gender.") return None def rule_circumcision_male(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"54150","54160","54161","54162","54163","54164"}: return None if str(get(row, "Gender", "")).strip().lower() == "female": return fail("rCircFemale", f"Circumcision CPT {cpt(row)} is not valid for Female gender.") return None def rule_iud_female(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"58300","58301","J7296","J7297","J7298","J7300","J7301"}: return None if str(get(row, "Gender", "")).strip().lower() == "male": return fail("rIUDMale", f"IUD CPT {cpt(row)} is not valid for Male gender.") return None def rule_colposcopy_female(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"57420","57421","57452","57454","57455","57456","57460","57461"}: return None if str(get(row, "Gender", "")).strip().lower() == "male": return fail("rColpoMale", f"Colposcopy CPT {cpt(row)} is not valid for Male gender.") return None # ---- POS-specific E&M ---- def rule_inpatient_em_requires_inpatient_pos(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"99221","99222","99223","99231","99232","99233","99238","99239"}: return None pos = str(get(row, "POS", "")).strip() if pos and pos not in {"21","51","61"}: return fail("rInptEMPos", f"Inpatient E&M CPT {cpt(row)} requires POS 21/51/61, got POS {pos}.") return None def rule_observation_em_requires_pos22(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"99218","99219","99220","99224","99225","99226","99234","99235","99236"}: return None pos = str(get(row, "POS", "")).strip() if pos and pos != "22": return fail("rObsEMPos", f"Observation E&M CPT {cpt(row)} requires POS 22, got POS {pos}.") return None def rule_nf_em_requires_pos_31_32(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"99304","99305","99306","99307","99308","99309","99310","99315","99316"}: return None pos = str(get(row, "POS", "")).strip() if pos and pos not in {"31","32"}: return fail("rNFEMPos", f"Nursing facility E&M CPT {cpt(row)} requires POS 31/32, got POS {pos}.") return None def rule_home_visit_requires_pos12(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"99341","99342","99343","99344","99345","99347","99348","99349","99350"}: return None pos = str(get(row, "POS", "")).strip() if pos and pos != "12": return fail("rHomeVisitPos", f"Home visit CPT {cpt(row)} requires POS 12, got POS {pos}.") return None # ---- Age/gender-gated screenings ---- def rule_aaa_screening_male_age(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"76706","G0389"}: return None gender = str(get(row, "Gender", "")).strip().lower() if gender == "female": return fail("rAAAScreenFemale", f"AAA screening CPT {cpt(row)} is Medicare-covered for male patients only.") age = patient_age(row) if age is None: return None if age < 65 or age > 75: return fail("rAAAScreenAge", f"AAA screening CPT {cpt(row)} covered only at age 65–75, got age {age:.0f}.") return None def rule_lung_ct_screening_age(row: Dict) -> Optional[RuleResult]: if cpt(row) not in {"71271","G0296","G0297"}: return None age = patient_age(row) if age is None: return None if age < 50 or age > 77: return fail("rLungCTScreenAge", f"Lung CT screening CPT {cpt(row)} covered only at age 50–77, got age {age:.0f}.") return None def rule_q4169_supervision(row: Dict) -> Optional[RuleResult]: if cpt(row) == "Q4169": return fail("rCPTCodeQ4169", "Supervision review mandatory for CPT Q4169.") return None def rule_medicare_forbidden_cpt(row: Dict) -> Optional[RuleResult]: forbidden = {"90471", "99417", "H0004", "1152F", "3281F"} if cpt(row) in forbidden and is_medicare(row): return fail("rNoMedicareCPT", f"Medicare does not cover CPT {cpt(row)}.") return None def rule_g0444_unbundle(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G0444": return fail("rcptG0438&G0444", "G0444 has an unbundle relationship with G0438.") return None def rule_0000_nonbillable(row: Dict) -> Optional[RuleResult]: if cpt(row) == "0000" or any(d == "0000" for d in dx_list(row)): return fail("rCPTDX0000", "CPT or DX 0000 is non-billable.") return None def rule_g2211_modifier25(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G2211" and has_modifier(row, "25"): return fail("rG2211cpt25mf", "G2211 is not payable when reported with modifier 25.") return None def rule_g2011_modifier(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G2011" and any(m for m in get_modifiers(row) if m): return fail("rCPTCodeG2011", "Modifier is inappropriate with CPT G2011.") return None def rule_smoking_cessation_requires_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) in ("99406", "99407") and not dx_list(row): return fail("rSCCDVdxcode", f"CPT {cpt(row)} requires at least one valid diagnosis code.") return None def rule_q4221_units(row: Dict) -> Optional[RuleResult]: units = to_float(get(row, "ServiceUnits", 1), 1) if cpt(row) == "Q4221" and units > 52: return fail("rcptQ4221", f"Max allowable units for Q4221 is 52. Current: {units:.0f}") return None def rule_q4250_units(row: Dict) -> Optional[RuleResult]: units = to_float(get(row, "ServiceUnits", 1), 1) if cpt(row) == "Q4250" and units > 24: return fail("rcptQ4250", f"Max allowable units for Q4250 is 24. Current: {units:.0f}") return None def rule_95004_min_units(row: Dict) -> Optional[RuleResult]: units = to_float(get(row, "ServiceUnits", 1), 1) if cpt(row) == "95004" and units < 70: return fail("r95004Unit<70", f"Less than 70 units not allowed for CPT 95004. Current: {units:.0f}") return None def rule_95165_min_units(row: Dict) -> Optional[RuleResult]: units = to_float(get(row, "ServiceUnits", 1), 1) if cpt(row) == "95165" and units < 10: return fail("r95165Unit<10", f"Less than 10 units not allowed for CPT 95165. Current: {units:.0f}") return None def rule_zero_units(row: Dict) -> Optional[RuleResult]: if to_float(get(row, "ServiceUnits", 1), 1) == 0: return fail("r0Units", "0 units is not allowed.") return None def rule_33340_primary_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) != "33340": return None dxs = dx_list(row) primary_dx = dxs[0].replace(".", "") if dxs else "" valid = {"I480", "I4811", "I4819", "I4821"} if primary_dx not in valid: return fail("r33340dx1", "For 33340, primary DX should be I48.0, I48.11, I48.19, or I48.21.") return None def rule_33340_pos(row: Dict) -> Optional[RuleResult]: if cpt(row) == "33340" and str(get(row, "POS", "")).strip() not in ("", "21"): return fail("r33340POS21", f"For 33340, POS should be 21. Current POS: {get(row, 'POS', '')}") return None def rule_rpm_checkbox(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"99453", "99454", "99457", "99458"}: is_rpm = str(get(row, "ISRPM", "0")).strip().lower() if is_rpm not in ("1", "true", "yes"): return fail("rRPMchk", f"RPM box must be checked for RPM CPT {cpt(row)}.") return None def rule_q_series_note(row: Dict) -> Optional[RuleResult]: code = cpt(row) if code.startswith("Q") and not str(get(row, "NoteText", "")).strip(): return fail("rQCPTCode", "Claim note text is mandatory for Q-series CPT code.") return None def rule_colon_in_note(row: Dict) -> Optional[RuleResult]: if ":" in str(get(row, "NoteText", "")).strip(): return fail("rColonNotAllowed", "Colon character is not allowed in NoteText.") return None def rule_wellmed_g0508(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G0508" and "wellmed" in str(get(row, "ClaimPriIns", "")).lower(): return fail("rWellmedInsObjection", "Wellmed insurance does not accept CPT G0508.") return None def rule_prolong_service(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"99417", "G2212", "G0318", "G0317", "G0316", "99418"}: return fail("rProlongService", f"Prolong service code {cpt(row)} requires a high level E&M pairing.") return None def rule_99490_not_bill(row: Dict) -> Optional[RuleResult]: if cpt(row) == "99490": return fail("rcpt99490", "Per provider instruction, do not bill CCM service 99490.") return None def rule_99024_nonreplacement(row: Dict) -> Optional[RuleResult]: if cpt(row) == "99024": return fail("r99024", "99024 is non-replacement CPT without provider instructions.") return None def rule_99498_add_on(row: Dict) -> Optional[RuleResult]: if cpt(row) == "99498": return fail("rcpt99498", "99498 is an add-on code and must be billed with 99497.") return None def rule_vaccine_admin_required(row: Dict) -> Optional[RuleResult]: vax_combos = { "91300": ["0001A", "0002A"], "91301": ["0011A", "0012A"], "91302": ["0021A", "0022A"], "91303": ["0031A"], } code = cpt(row) if code in vax_combos: return fail(f"rAdmin{code}", f"Administration code {'/'.join(vax_combos[code])} is mandatory for CPT {code}.") return None def rule_90653_admin_required(row: Dict) -> Optional[RuleResult]: if cpt(row) == "90653": return fail("rAdmcode90653", "Administration code G0008 or 90471 is mandatory for CPT 90653.") return None def rule_self_pay_with_active_insurance(row: Dict) -> Optional[RuleResult]: if str(get(row, "BillAs", "")).lower() == "self-pay" and get(row, "ClaimPriIns"): return fail("rInsAct", f"Patient has active insurance {get(row, 'ClaimPriIns')}. Do not bill as self-pay.") return None def rule_g9719_required_dx(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G9719": required = {"F319", "F329", "F339"} if not any(d.replace(".", "") in required for d in dx_list(row)): return fail("rCPTG9719dx", "Any one of F31.9, F32.9, or F33.9 is mandatory with CPT G9719.") return None def rule_mips_99221_99223(row: Dict) -> Optional[RuleResult]: age = patient_age(row) if cpt(row) in {"99221", "99222", "99223"} and age is not None and age >= 65: return fail("rMIPS1", "MIPS code from QID#47 and G8427 is mandatory for CPT 99221-99223 when patient is 65+.") return None def rule_mips_99236(row: Dict) -> Optional[RuleResult]: if cpt(row) == "99236": return fail("rMIPS2", "MIPS code from QID#128 is mandatory for CPT 99236.") return None def rule_mips_99304_99306(row: Dict) -> Optional[RuleResult]: age = patient_age(row) if cpt(row) in {"99304", "99305", "99306"} and age is not None and age >= 65: return fail("rMIPS3", "MIPS codes from QID#47 and QID#181 are mandatory for CPT 99304-99306 when patient is 65+.") return None def rule_mips_99307_99310(row: Dict) -> Optional[RuleResult]: age = patient_age(row) if cpt(row) in {"99307", "99308", "99309", "99310"} and age is not None and age >= 65: return fail("rMIPS4", f"G8427 is mandatory for CPT {cpt(row)} when patient is 65+.") return None def rule_comb1(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"G8733", "G8734", "G8535"}: return fail("rCOMB1", "CPT G8733, G8734, and G8535 are not acceptable together.") return None def rule_comb3(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"G8433", "G8431", "G8510"}: return fail("rCOMB3", "CPT G8433, G8431, and G8510 are not acceptable together.") return None def rule_comb4(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"G8420", "G8417", "G8418", "G9716"}: return fail("rCOMB4", "CPT G8420, G8417, G8418, and G9716 are not acceptable together.") return None def rule_comb2(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"1123F", "1124F"}: return fail("rCOMB2", "CPT 1123F and 1124F are not acceptable together.") return None def rule_medicare_uhc_use_g0447(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"99401", "99402", "99403"}: ins = str(get(row, "ClaimPriIns", "")).lower() if any(x in ins for x in ("medicare", "uhc", "united", "wellmed")): return fail("rcptG0447", f"For Medicare or UHC, use G0447 instead of {cpt(row)}.") return None def rule_99441_99443_pos11(row: Dict) -> Optional[RuleResult]: pos = str(get(row, "POS", "")).strip() if cpt(row) in {"99441", "99442", "99443"} and pos not in ("", "11"): return fail("rPOS11SpecCPT", "POS 11 is only acceptable for CPT codes 99441-99443.") return None def rule_99354_99359_not_pos11(row: Dict) -> Optional[RuleResult]: pos = str(get(row, "POS", "")).strip() if cpt(row) in {"99354", "99355", "99358", "99359"} and pos == "11": return fail("rCptPos11", f"CPT {cpt(row)} is not billable for office services POS 11.") return None def rule_0034a_date(row: Dict) -> Optional[RuleResult]: dos = dos_date(row) if cpt(row) == "0034A" and dos and dos < date(2021, 10, 20): return fail("rCPT0034A", "CPT 0034A is effective from 2021-10-20.") return None def rule_0071a_0072a_date(row: Dict) -> Optional[RuleResult]: dos = dos_date(row) if cpt(row) in {"0071A", "0072A"} and dos and dos < date(2021, 10, 29): return fail("rCPT0071A", f"CPT {cpt(row)} is effective from 2021-10-29.") return None def rule_g0438_use_99203(row: Dict) -> Optional[RuleResult]: if cpt(row) == "G0438": return fail("rCPT99203", "Please use bill code 99203 instead of G0438.") return None def rule_99470_99457_99458(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"99470", "99457", "99458"}: return fail("rCPTCode99470&99457/99458", "99470 and 99457/99458 are not billable on the same DOS.") return None def rule_99454_99445(row: Dict) -> Optional[RuleResult]: if cpt(row) in {"99454", "99445"}: return fail("rCPTCode99454&99445", "CPT 99454 and 99445 must not be billed for same patient on same DOS.") return None def rule_office_cpt_pos(row: Dict) -> Optional[RuleResult]: office_cpts = {"99201", "99202", "99203", "99204", "99205", "99211", "99212", "99213", "99214", "99215"} pos = str(get(row, "POS", "")).strip() if cpt(row) in office_cpts and pos not in ("", "11", "02"): return fail("rCPTCode=2", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 11 or 02.") return None def rule_inpatient_cpt_pos(row: Dict) -> Optional[RuleResult]: inpatient_cpts = {"99221", "99222", "99223", "99231", "99232", "99233", "99238", "99239"} pos = str(get(row, "POS", "")).strip() if cpt(row) in inpatient_cpts and pos not in ("", "21", "61"): return fail("rCPTCode=3", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 21 or 61.") return None def rule_snf_cpt_pos(row: Dict) -> Optional[RuleResult]: snf_cpts = {"99304", "99305", "99306", "99307", "99308", "99309", "99310", "99315", "99316"} pos = str(get(row, "POS", "")).strip() if cpt(row) in snf_cpts and pos not in ("", "31", "32", "02", "13"): return fail("rCPTCode=4", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 31, 32, 02, or 13.") return None def rule_observation_cpt_pos(row: Dict) -> Optional[RuleResult]: pos = str(get(row, "POS", "")).strip() if cpt(row) in {"99217", "99218", "99219", "99220"} and pos not in ("", "22"): return fail("rCPTCode=5", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 22.") return None def rule_er_cpt_pos(row: Dict) -> Optional[RuleResult]: pos = str(get(row, "POS", "")).strip() if cpt(row) in {"99281", "99282", "99283", "99284", "99285", "99286", "99287", "99288"} and pos not in ("", "23"): return fail("rCPTCode=6", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 23.") return None def rule_home_visit_cpt_pos(row: Dict) -> Optional[RuleResult]: pos = str(get(row, "POS", "")).strip() if cpt(row) in {"99341", "99342", "99343", "99344", "99345", "99347", "99348", "99349", "99350"} and pos not in ("", "02", "10", "12", "13", "14"): return fail("rCPTCode=7", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 02, 10, 12, 13, or 14.") return None def rule_admission_date_empty_for_office(row: Dict) -> Optional[RuleResult]: if str(get(row, "POS", "")).strip() == "11" and adm_date(row): return fail("rAdmissionDate=1", "Admission date must be empty for office visit POS 11.") return None def rule_admission_date_required_inpatient(row: Dict) -> Optional[RuleResult]: if str(get(row, "POS", "")).strip() == "21" and not adm_date(row): return fail("rAdmissionDate=2", "Admission date must be entered for inpatient POS 21.") return None def rule_discharge_not_before_admission(row: Dict) -> Optional[RuleResult]: adm = adm_date(row) dis = dis_date(row) if adm and dis and dis < adm: return fail("rAdmissionDate3", f"Discharge date {dis} should not be earlier than admission date {adm}.") return None def rule_future_admission_date(row: Dict) -> Optional[RuleResult]: adm = adm_date(row) if adm and adm > date.today(): return fail("rAdmComp", "Future admission date is not allowed.") return None def rule_future_dos(row: Dict) -> Optional[RuleResult]: dos = dos_date(row) if dos and dos > date.today(): return fail("rnotsubclm", f"Claim must not be submitted with future DOS {dos}.") return None def rule_january_2020_dos(row: Dict) -> Optional[RuleResult]: dos = dos_date(row) if dos and dos.year == 2020 and dos.month == 1: return fail("rDOSJan20", "Claim is not valid for DOS in January 2020.") return None def rule_dos_required(row: Dict) -> Optional[RuleResult]: if not dos_date(row): return fail("rDOSMissed", "DOS is mandatory field.") return None def rule_dummy_dos(row: Dict) -> Optional[RuleResult]: if "1900" in str(get(row, "DOS", "")): return fail("rDummyDOS", "Dummy DOS entered in claim.") return None def rule_charges_positive(row: Dict) -> Optional[RuleResult]: if to_float(get(row, "TotalCharges", 0), 0) <= 0: return fail("rCharges", "CPT charges must be greater than 0.") return None def rule_charges_max(row: Dict) -> Optional[RuleResult]: charges = to_float(get(row, "TotalCharges", 0), 0) if charges > 99999: return fail("r99Kcharges", f"Charges cannot exceed 99,999. Current: {charges:,.2f}") return None def rule_patient_payment_not_exceed_charges(row: Dict) -> Optional[RuleResult]: charges = to_float(get(row, "TotalCharges", 0), 0) pat_pmt = to_float(get(row, "PatientPayment", 0), 0) if pat_pmt > 0 and charges < pat_pmt: return fail("rChgPatPmt", f"Claim charges {charges:,.2f} cannot be less than patient payment {pat_pmt:,.2f}.") return None def rule_referring_required_for_medicare(row: Dict) -> Optional[RuleResult]: if is_medicare(row) and not str(get(row, "ReferringPhysician", "")).strip(): return fail("rRefferingPhysician", "Referring physician is mandatory for Medicare claims.") return None def rule_referral_requires_referring(row: Dict) -> Optional[RuleResult]: ref_num = str(get(row, "ReferralNumber", "")).strip() if ref_num and ref_num != "0" and not str(get(row, "ReferringPhysician", "")).strip(): return fail("rReferralNo", "When referral number is sent then referring provider is mandatory.") return None def rule_attending_required(row: Dict) -> Optional[RuleResult]: if not str(get(row, "AttendingPhysician", "")).strip(): return fail("rAttendingPhysician", "Attending Physician is mandatory field.") return None def rule_billing_required(row: Dict) -> Optional[RuleResult]: if not str(get(row, "BillingPhysician", "")).strip(): return fail("rBillingPhysician", "Billing Physician is mandatory field.") return None def rule_medicare_mbi_format(row: Dict) -> Optional[RuleResult]: if not is_medicare(row): return None mbi = str(get(row, "PolicyNumber", "")).strip().replace("-", "").replace(" ", "") if mbi and len(mbi) == 11: valid_mbi = True letter_pos = {1, 4, 7, 8} number_pos = {0, 3, 6, 9, 10} for i, ch in enumerate(mbi): if i in letter_pos and not ch.isalpha(): valid_mbi = False if i in number_pos and not ch.isdigit(): valid_mbi = False if not valid_mbi: return fail("rPolicyNumber", f"Patient MBI format is invalid: {mbi}") return None def rule_dummy_dob(row: Dict) -> Optional[RuleResult]: if "1900" in str(get(row, "DateOfBirth", "")): return fail("rDummyDOB", "Dummy DOB entered in patient.") return None def rule_dob_required(row: Dict) -> Optional[RuleResult]: if not get(row, "DateOfBirth"): return fail("rDobMis", "DOB is Mandatory field.") return None def rule_gender_required(row: Dict) -> Optional[RuleResult]: gender = str(get(row, "Gender", "")).strip().lower() if not gender or gender in ("unknown", "none"): return fail("rGender", "Subscriber gender code is mandatory.") return None def rule_insurance_required(row: Dict) -> Optional[RuleResult]: if not get(row, "ClaimPriIns"): return fail("rInsRequired", "Insurance Required. Please select valid insurance.") return None def rule_molina_9_digit_id(row: Dict) -> Optional[RuleResult]: pol_num = str(get(row, "PolicyNumber", "")).strip().replace("-", "") if len(pol_num) == 9 and pol_num.isdigit(): ins = str(get(row, "ClaimPriIns", "")).lower() if "medicaid" in ins and "molina" not in ins: return fail("rIns276PolicyNumberequal9", "Please select Molina Medicaid because member ID has 9 digits.") return None def rule_aarp_not_primary(row: Dict) -> Optional[RuleResult]: ins_name = str(get(row, "ClaimPriIns", "")).lower() if str(get(row, "BillAs", "")).lower() == "primary" and any(x in ins_name for x in ("aarp", "trycare", "tricare for life")): return fail("rAARPTryCare", f"Insurance {get(row, 'ClaimPriIns')} can never be primary payer.") return None def rule_workers_comp_accident_date(row: Dict) -> Optional[RuleResult]: ins_lower = str(get(row, "ClaimPriIns", "")).lower() if any(x in ins_lower for x in ("worker", "workers comp", "wc ")) and not get(row, "AccidentDate"): return fail("WCOMClaims", "Accident date is necessary for worker compensation claims.") return None def rule_11042_requires_pa(row: Dict) -> Optional[RuleResult]: if cpt(row) == "11042" and str(get(row, "BillAs", "")).lower() == "primary" and not get(row, "PANumber"): return fail("rPACPT11042", "Pre-authorization is missing for wound care 11042.") return None # ============================================================ # ICD-10 → CPT Mapping Rules # Common diagnosis codes with their medically appropriate CPT # procedure codes for denial prevention. # ============================================================ # ICD-10 primary diagnosis → acceptable CPT codes ICD_CPT_MAP: Dict[str, set] = { # --- Diabetes --- "E11.9": {"99213","99214","99215","99490","99491","83036","82947","81003","G0447"}, "E11.65": {"99213","99214","99215","83036","82947","80048","99490"}, "E11.40": {"99213","99214","99215","95905","95907","95908","95913","64490","64491"}, "E11.311": {"92004","92012","92014","92250","92228"}, "E10.9": {"99213","99214","99215","83036","82947","95249","99490"}, # --- Hypertension --- "I10": {"99213","99214","99215","93000","80048","80053","81003","99490","93786","93788"}, "I11.9": {"99213","99214","99215","93000","93306","93320","93325","80053"}, "I12.9": {"99213","99214","99215","80053","82565","81001","82042"}, # --- Heart Disease --- "I25.10": {"99213","99214","99215","93000","93306","93307","93350","93351","78452","93015"}, "I20.9": {"99213","99214","99215","93000","93015","93306","93350","78452"}, "I50.9": {"99213","99214","99215","93306","93307","93320","71046","80053","83880"}, "I48.91": {"99213","99214","99215","93000","93306","93268","93241","93243","93650"}, # --- Respiratory --- "J06.9": {"99212","99213","99214","87880","87804"}, "J18.9": {"99213","99214","99215","71045","71046","87046","87081","71250"}, "J44.1": {"99213","99214","99215","94010","94060","71046","94640","94726"}, "J44.0": {"99213","99214","99215","94010","71046","87046"}, "J45.901": {"99213","99214","99215","94010","94060","94640","94664","71046"}, # --- Musculoskeletal --- "M54.5": {"99213","99214","72100","72110","72148","97110","97530","64483","64484","63030","63047"}, "M54.2": {"99213","99214","72040","72050","72141","72142","97110","97530","64490","64491"}, "M17.11": {"99213","99214","73560","73562","73721","20610","27447","97110"}, "M16.11": {"99213","99214","73502","73521","73721","27130","20610"}, "M25.511": {"99213","99214","73010","73030","73221","20610","29827","97110"}, # --- Mental Health --- "F32.1": {"99213","99214","90832","90834","90837","96127","G2212"}, "F41.1": {"99213","99214","90832","90834","90837","96127"}, "F31.9": {"99213","99214","99215","90837","90847","80305","80306"}, "F20.9": {"99213","99214","99215","90837","90853","80305"}, "F10.20": {"99213","99214","90832","90837","G2067","G2068"}, "F11.20": {"99213","99214","99215","90837","G2067","G2068","G2069"}, # --- Gastrointestinal --- "K21.0": {"99213","99214","43239","43235","91034"}, "K57.30": {"99213","99214","45378","45380","74177"}, "K92.1": {"99213","99214","99215","43235","43239","45378","45380","74177"}, "K72.10": {"99214","99215","80076","80053","76700","74177"}, "K74.60": {"99214","99215","80076","80053","76700","43244"}, # --- Cancer --- "C34.10": {"99213","99214","99215","71250","78816","32663","96413","96415","77263","77301"}, "C50.911": {"99213","99214","99215","77067","76641","78816","19307","96413","96415","77263"}, "C18.9": {"99214","99215","45378","45384","74177","78816","96413","96415"}, "C61": {"99213","99214","99215","84153","76872","55866","96413","77263"}, "C64.1": {"99214","99215","74177","78816","50543","96413"}, # --- Infectious Disease --- "A41.9": {"99221","99222","99223","99231","99232","99233","87046","87081","36415","80053","85025"}, "B20": {"99213","99214","99215","86703","87536","86360","80305"}, "A09": {"99212","99213","99214","87046","87081","82270"}, # --- Neurological --- "G43.909": {"99213","99214","70553","95923","64615"}, "G35": {"99213","99214","99215","70553","95925","95926","96413"}, "G20": {"99213","99214","99215","70553","97110","97530","95923"}, "G47.33": {"99213","99214","95810","95811","94660","E0601"}, "G89.29": {"99213","99214","64483","64484","64490","64491","62322","62323","97110"}, # --- Genitourinary --- "N39.0": {"99212","99213","81001","81003","87086"}, "N18.3": {"99213","99214","80053","82565","81001","82042"}, "N18.6": {"99213","99214","99215","90935","90937","90945","90947","80053"}, "N40.1": {"99213","99214","52601","52441","76872","84153"}, # --- Endocrine --- "E03.9": {"99213","99214","84439","84480","84481","76536"}, "E05.90": {"99213","99214","84439","84443","76536","78012"}, "E66.01": {"99213","99214","99215","G0447","43644","43770","97802","97803"}, "E78.5": {"99213","99214","80061","82465","83718","84478"}, # --- Preventive / Wellness --- "Z00.00": {"99395","99396","99397","G0438","G0439","85025","80053","83036","80061","93000"}, "Z12.31": {"45378","45380","G0121","82274"}, "Z12.11": {"77066","77067","G0202"}, "Z23": {"90471","90472","90686","90714","90732"}, # --- OB/GYN --- "Z34.00": {"99213","99214","76801","80055","86703","85025"}, "N87.1": {"99213","99214","57460","57455","88141"}, # --- Injury --- "S72.001A":{"99221","99222","99223","27235","27236","73502"}, "S52.501A":{"99213","99214","25600","25605","73100","29125"}, "S13.4XXA":{"99213","99214","72050","97110","97530"}, } # CPT codes that require at least one matching ICD-10 on the claim CPT_REQUIRES_DX: Dict[str, set] = { "83036": {"E10.9","E11.9","E11.65","E10.10","R73.09","Z13.1"}, # HbA1c "84153": {"C61","N40.0","N40.1","Z12.5"}, # PSA "86703": {"B20","Z11.4","Z34.00","O09.90"}, # HIV 1/2 antibody "80061": {"E78.5","E78.00","E78.2","I25.10","I10","Z00.00"}, # Lipid panel "82565": {"N18.3","N18.4","N18.5","N18.6","I12.9","E11.65"}, # Creatinine "70553": {"G35","G43.909","G20","C71.9","I63.9","G40.909"}, # MRI brain w/wo "72148": {"M54.5","M51.16","M51.17","G89.29","S33.5XXA"}, # MRI lumbar "72141": {"M54.2","M50.10","G89.29","S14.9XXA"}, # MRI cervical "73721": {"M17.11","M17.12","M23.200","S83.9XXA"}, # MRI joint "74177": {"K57.30","K72.10","K74.60","C18.9","C64.1","R10.9"}, # CT abdomen/pelvis "78816": {"C34.10","C50.911","C18.9","C61","C64.1","C81.90"}, # PET-CT "71046": {"J18.9","J44.1","I50.9","R05.9","R06.09"}, # Chest X-ray "93015": {"I25.10","I20.9","R07.9","I10"}, # Stress test "93306": {"I50.9","I48.91","I25.10","I11.9","I34.0"}, # Complete echo "78452": {"I25.10","I20.9","I48.91"}, # Nuclear stress "27447": {"M17.11","M17.12","M17.31"}, # TKA "27130": {"M16.11","M16.12","M16.31"}, # THA "45378": {"K57.30","K92.1","Z12.31","C18.9","D12.5"}, # Colonoscopy "43235": {"K21.0","K92.1","K29.70","D13.1"}, # EGD diagnostic "63030": {"M51.16","M51.17","G89.29","M54.5"}, # Lumbar discectomy "64483": {"M54.5","M51.16","G89.29"}, # Transforaminal ESI "64490": {"M54.2","M50.10","G89.29"}, # Cervical facet inj "90837": {"F32.1","F41.1","F31.9","F20.9","F10.20","F11.20"}, # Psychotherapy 60m "95810": {"G47.33","G47.31","G47.30"}, # Polysomnography "90935": {"N18.6","N17.9"}, # Hemodialysis "43644": {"E66.01","E66.09"}, # Gastric bypass "G0447": {"E66.01","E66.09","E66.1","Z68.35","Z68.36","Z68.37","Z68.38","Z68.39","Z68.41"}, "20610": {"M17.11","M17.12","M16.11","M25.511","M25.512","M25.311","M25.312"}, "94640": {"J44.1","J44.0","J45.901","J43.9"}, # Nebulizer "96413": {"C34.10","C50.911","C18.9","C61","C64.1","C81.90","C82.90","C83.90"}, } def rule_dx_cpt_mismatch(row: Dict) -> Optional[RuleResult]: """ ICD-10 → CPT direction: If the primary diagnosis is in ICD_CPT_MAP, the billed CPT must appear in the allowed set for that diagnosis. """ claim_cpt = cpt(row) dxs = dx_list(row) if not claim_cpt or not dxs: return None primary_dx = dxs[0].upper().strip() if primary_dx in ICD_CPT_MAP: allowed = ICD_CPT_MAP[primary_dx] if claim_cpt not in allowed: sample = ", ".join(sorted(allowed)[:6]) return fail( "rDxCptMismatch", f"CPT {claim_cpt} is not a recognized match for primary diagnosis " f"{primary_dx}. Common expected codes: {sample} (and others).", ) return None def rule_cpt_requires_dx(row: Dict) -> Optional[RuleResult]: """ CPT → ICD-10 direction: If the billed CPT is in CPT_REQUIRES_DX, at least one claim diagnosis must match the required set. """ claim_cpt = cpt(row) dxs = dx_list(row) if not claim_cpt or claim_cpt not in CPT_REQUIRES_DX: return None required = CPT_REQUIRES_DX[claim_cpt] claim_dx_set = {d.upper().strip() for d in dxs} if not claim_dx_set & required: sample = ", ".join(sorted(required)[:4]) return fail( "rCptNoDx", f"CPT {claim_cpt} requires at least one supporting diagnosis " f"(e.g., {sample} …) but none found on the claim.", ) return None HARD_RULES: List[RuleFunc] = [ rule_rdxcode1, rule_rdxcode_missing, rule_perinatal_dx_age, rule_pediatric_dx_age, rule_maternity_dx_gender, rule_female_only_dx_gender, rule_external_cause_not_primary, rule_z6830_not_primary, rule_dx_g40009_g40909, rule_dx_g4089_g40009, rule_dx_f17200_z87891, rule_dx_n183_date, rule_dx_n1830_n1831_n1832_date, rule_dx_m5459_r051_date, rule_z01818_z01810_only_primary, rule_wellmed_f_series_dx, rule_90653_requires_z23, rule_zip_valid, rule_state_valid, rule_city_valid, rule_last_name_required, rule_first_name_required, rule_patient_name_special_chars, rule_address_required, rule_is_ptl, rule_cpt_required, rule_g0389_replaced, rule_94620_replaced, rule_g0436_replaced, rule_g0437_replaced, rule_q2037_replaced, rule_92585_deleted, rule_g8553_deleted, rule_0031a_wrong_code_for_age, rule_consult_code_medicare, rule_g0008_insurance, rule_90471_medicare, rule_consult_requires_referring, rule_25000_not_allowed, rule_jcode_ndc, rule_u0002_clia, rule_lab_cpt_clia, rule_high_level_99203, rule_high_level_99213, rule_newborn_cpt_age, rule_gender_specific_cpt, rule_maternity_cpt_gender, rule_maternity_cpt_age, rule_delivery_cpt_pos, rule_chemo_requires_cancer_dx, rule_dialysis_requires_renal_dx, rule_cardiac_rehab_requires_cardiac_dx, rule_awv_medicare_age, rule_screening_colonoscopy_age, rule_inpatient_only_cpt_office, rule_prostate_cpt_gender, rule_pulmonary_rehab_requires_pulmonary_dx, rule_cardiac_cath_requires_cardiac_dx, rule_radiation_oncology_requires_cancer, rule_pt_requires_msk_neuro_dx, rule_cataract_surgery_requires_cataract_dx, rule_joint_injection_requires_arthritis_dx, rule_spine_surgery_requires_spine_dx, rule_mammography_female, rule_hysterectomy_female, rule_testicular_cpt_male, rule_gallbladder_surgery_requires_dx, rule_appendectomy_requires_dx, rule_tonsillectomy_requires_dx, rule_pediatric_preventive_age, rule_adult_preventive_age, rule_ed_visit_requires_pos23, rule_office_em_inpatient_pos, rule_bone_density_age, rule_screening_mammo_requires_z1231, rule_wellness_visit_requires_z00, rule_mri_brain_requires_neuro_dx, rule_ct_chest_requires_chest_dx, rule_ct_abdomen_requires_abdominal_dx, rule_echo_requires_cardiac_dx, rule_a1c_requires_diabetes_dx, rule_lipid_panel_requires_cv_dx, rule_psa_male, rule_pap_smear_female, rule_cpap_requires_osa_dx, rule_wheelchair_requires_mobility_dx, rule_fetal_us_female, rule_circumcision_male, rule_iud_female, rule_colposcopy_female, rule_inpatient_em_requires_inpatient_pos, rule_observation_em_requires_pos22, rule_nf_em_requires_pos_31_32, rule_home_visit_requires_pos12, rule_aaa_screening_male_age, rule_lung_ct_screening_age, rule_q4169_supervision, rule_medicare_forbidden_cpt, rule_g0444_unbundle, rule_0000_nonbillable, rule_g2211_modifier25, rule_g2011_modifier, rule_smoking_cessation_requires_dx, rule_q4221_units, rule_q4250_units, rule_95004_min_units, rule_95165_min_units, rule_zero_units, rule_33340_primary_dx, rule_33340_pos, rule_rpm_checkbox, rule_q_series_note, rule_colon_in_note, rule_wellmed_g0508, rule_prolong_service, rule_99490_not_bill, rule_99024_nonreplacement, rule_99498_add_on, rule_vaccine_admin_required, rule_90653_admin_required, rule_self_pay_with_active_insurance, rule_g9719_required_dx, rule_mips_99221_99223, rule_mips_99236, rule_mips_99304_99306, rule_mips_99307_99310, rule_comb1, rule_comb3, rule_comb4, rule_comb2, rule_medicare_uhc_use_g0447, rule_99441_99443_pos11, rule_99354_99359_not_pos11, rule_0034a_date, rule_0071a_0072a_date, rule_g0438_use_99203, rule_99470_99457_99458, rule_99454_99445, rule_office_cpt_pos, rule_inpatient_cpt_pos, rule_snf_cpt_pos, rule_observation_cpt_pos, rule_er_cpt_pos, rule_home_visit_cpt_pos, rule_admission_date_empty_for_office, rule_admission_date_required_inpatient, rule_discharge_not_before_admission, rule_future_admission_date, rule_future_dos, rule_january_2020_dos, rule_dos_required, rule_dummy_dos, rule_charges_positive, rule_charges_max, rule_patient_payment_not_exceed_charges, rule_referring_required_for_medicare, rule_referral_requires_referring, rule_attending_required, rule_billing_required, rule_medicare_mbi_format, rule_dummy_dob, rule_dob_required, rule_gender_required, rule_insurance_required, rule_molina_9_digit_id, rule_aarp_not_primary, rule_workers_comp_accident_date, rule_11042_requires_pa, # ICD-10 ↔ CPT mapping rules rule_dx_cpt_mismatch, rule_cpt_requires_dx, ] # ============================================================ # Rule engine # ============================================================ def apply_hard_rules(row: Dict) -> List[RuleResult]: broken: List[RuleResult] = [] broken.extend(collect_duplicate_dx_rules(row)) for rule in HARD_RULES: result = rule(row) if result and result.fired: broken.append(result) return broken def evaluate_claim(row: Dict) -> Dict: broken = apply_hard_rules(row) return { "ClaimId": get(row, "ClaimId"), "PatientName": get(row, "PatientName"), "CPTCode": get(row, "CPTCode"), "DOS": get(row, "DOS"), "Status": "REJECTED" if broken else "ACCEPTED", "Rules_Broken": len(broken), "Rule_Names": " | ".join(r.name for r in broken), "Error_Messages": " | ".join(r.message for r in broken), } # ============================================================ # Runner # ============================================================ def run_hard_rule_engine(input_file: str = "claims_to_check.xlsx", output_file: str = "Hard_Rule_Results.xlsx"): print("=" * 60) print(" HARD RULE ENGINE") print("=" * 60) try: df = pd.read_excel(input_file, sheet_name="Claims", header=2) df = df[df["ClaimId"].notna() & (df["ClaimId"].astype(str).str.strip() != "")].copy() df = df.reset_index(drop=True) print(f"Loaded {len(df)} claims from {input_file}") except Exception as exc: print(f"Error loading {input_file}: {exc}") return None results = [] for _, row in df.iterrows(): result = evaluate_claim(row.to_dict()) results.append(result) icon = "ACCEPTED" if result["Status"] == "ACCEPTED" else f"REJECTED ({result['Rules_Broken']} rules)" print( f"Claim {str(result['ClaimId']):>8} | " f"{str(result['PatientName']):<22} | " f"CPT {str(result['CPTCode']):<8} | {icon}" ) out_df = pd.DataFrame(results) out_df.to_excel(output_file, index=False, sheet_name="Results") total = len(out_df) rejected = int((out_df["Status"] == "REJECTED").sum()) accepted = total - rejected rejection_pct = (rejected / total * 100) if total else 0 print("-" * 60) print(f"Total Claims : {total}") print(f"Accepted : {accepted}") print(f"Rejected : {rejected}") print(f"Rejection % : {rejection_pct:.1f}%") print(f"Saved results to: {output_file}") return out_df if __name__ == "__main__": import sys input_path = sys.argv[1] if len(sys.argv) > 1 else "claims_to_check.xlsx" output_path = sys.argv[2] if len(sys.argv) > 2 else "Hard_Rule_Results.xlsx" run_hard_rule_engine(input_path, output_path)