rdcm-rule-checker / Hard_Rule_Engine.py
AhsanIkram231's picture
Upload 8 files
d6913a4 verified
"""
Hard_Rule_Engine.py
===================
Rule-engine version of the user's RBS checker.
Purpose:
- Apply hard validation rules to each claim
- If any hard rule fires, mark claim as REJECTED
- Return the list of broken hard rules and their messages
This file is intentionally structured for a rule engine, not for ML features.
A separate file can convert the same rule logic into binary features.
USAGE:
python Hard_Rule_Engine.py
python Hard_Rule_Engine.py claims_to_check.xlsx Hard_Rule_Results.xlsx
"""
from __future__ import annotations
import warnings
from dataclasses import dataclass
from datetime import date
from typing import Callable, Dict, List, Optional
import pandas as pd
warnings.filterwarnings("ignore")
# ============================================================
# Helpers
# ============================================================
def get(row: Dict, col: str, default=""):
value = row.get(col, default)
if pd.isna(value):
return default
return value
def dx_list(row: Dict) -> List[str]:
raw = get(row, "AllDxCodes", "")
return [c.strip().upper() for c in str(raw).split(",") if c.strip()]
def patient_age(row: Dict) -> Optional[float]:
try:
dob = pd.to_datetime(get(row, "DateOfBirth"))
if pd.isna(dob):
return None
return (pd.Timestamp.today() - dob).days / 365.25
except Exception:
return None
def is_medicare(row: Dict) -> bool:
ins = str(get(row, "ClaimPriIns", "")).lower()
return "medicare" in ins
def get_modifiers(row: Dict) -> List[str]:
return [str(get(row, f"Modifier{i}", "")).strip().upper() for i in range(1, 5)]
def cpt(row: Dict) -> str:
return str(get(row, "CPTCode", "")).strip().upper()
def dos_date(row: Dict) -> Optional[date]:
try:
v = get(row, "DOS", "")
if not v or str(v).strip() in ("", "nan", "NaT"):
return None
parsed = pd.to_datetime(v, errors="coerce")
return None if pd.isna(parsed) else parsed.date()
except Exception:
return None
def adm_date(row: Dict) -> Optional[date]:
try:
v = get(row, "AdmissionDate", "")
if not v or str(v).strip() in ("", "nan", "NaT"):
return None
parsed = pd.to_datetime(v, errors="coerce")
return None if pd.isna(parsed) else parsed.date()
except Exception:
return None
def dis_date(row: Dict) -> Optional[date]:
try:
v = get(row, "DischargeDate", "")
if not v or str(v).strip() in ("", "nan", "NaT"):
return None
parsed = pd.to_datetime(v, errors="coerce")
return None if pd.isna(parsed) else parsed.date()
except Exception:
return None
def has_modifier(row: Dict, mod: str) -> bool:
return mod.upper() in [m.upper() for m in get_modifiers(row) if m]
def to_float(value, default: float = 0.0) -> float:
try:
if value is None or str(value).strip() == "":
return default
return float(value)
except Exception:
return default
# ============================================================
# Rule model
# ============================================================
@dataclass
class RuleResult:
name: str
fired: bool
message: str = ""
RuleFunc = Callable[[Dict], Optional[RuleResult]]
def fail(name: str, message: str) -> RuleResult:
return RuleResult(name=name, fired=True, message=message)
# ============================================================
# Hard rules
# One rule = one function
# ============================================================
def rule_rdxcode1(row: Dict) -> Optional[RuleResult]:
dxs = dx_list(row)
if dxs and dxs[0] == "I10":
return fail("rdxcode1", 'Primary Diagnosis code must not equal to "I10".')
return None
def rule_rdxcode_missing(row: Dict) -> Optional[RuleResult]:
if not dx_list(row):
return fail("rDXCodeMissing", "DXCODE1 is mandatory field.")
return None
# def rule_duplicate_dx_codes(row: Dict) -> Optional[RuleResult]:
# seen = []
# for i, d in enumerate(dx_list(row), start=1):
# if d in seen:
# return fail(f"rDxCode{i}_dup", f"DX{i} Code must not be entered twice. ({d})")
# seen.append(d)
# return None
def collect_duplicate_dx_rules(row: Dict) -> List[RuleResult]:
broken = []
seen = []
for i, d in enumerate(dx_list(row), start=1):
if d in seen:
broken.append(fail(f"rDxCode{i}_dup", f"DX{i} duplicate ({d})"))
seen.append(d)
return broken
def rule_perinatal_dx_age(row: Dict) -> Optional[RuleResult]:
age = patient_age(row)
if age is None or age <= 1:
return None
for i, d in enumerate(dx_list(row), start=1):
if d.startswith(("P0", "P1", "P2", "P3", "P4", "P5", "P6", "P7", "P8", "P9")):
return fail("rDXCodeCategory1", f"DX{i} ({d}) is for Perinatal/Newborn and not valid for this patient.")
return None
def rule_pediatric_dx_age(row: Dict) -> Optional[RuleResult]:
age = patient_age(row)
if age is None or age <= 17:
return None
for i, d in enumerate(dx_list(row), start=1):
if d.startswith("Z00.1") or d.startswith("Z001"):
return fail("rDXCodeCategory2", f"DX{i} ({d}) is Pediatric and not valid for this patient.")
return None
def rule_maternity_dx_gender(row: Dict) -> Optional[RuleResult]:
gender = str(get(row, "Gender", "")).strip().lower()
if gender != "male":
return None
for i, d in enumerate(dx_list(row), start=1):
if d.startswith(("O", "Z34", "Z36", "Z3A")):
return fail("rDXCodeCategory3", f"DX{i} ({d}) is Maternity Diagnosis and not valid for Male patient.")
return None
def rule_female_only_dx_gender(row: Dict) -> Optional[RuleResult]:
female_only_pfx = (
"N70", "N71", "N72", "N73", "N74", "N75", "N76", "N77", "N80", "N81", "N82", "N83",
"N84", "N85", "N86", "N87", "N88", "N89", "N90", "N91", "N92", "N93", "N94", "N95",
"N96", "N97"
)
gender = str(get(row, "Gender", "")).strip().lower()
if gender != "male":
return None
for i, d in enumerate(dx_list(row), start=1):
if any(d.startswith(p) for p in female_only_pfx):
return fail("rDXCodeCategory5", f"DX{i} ({d}) is Female-only and not valid for Male patient.")
return None
def rule_external_cause_not_primary(row: Dict) -> Optional[RuleResult]:
dxs = dx_list(row)
if dxs and dxs[0].startswith(("V", "W", "X", "Y")):
return fail("rdxcode1_morbidity", f"External cause code ({dxs[0]}) may not be used as primary diagnosis.")
return None
def rule_z6830_not_primary(row: Dict) -> Optional[RuleResult]:
dxs = dx_list(row)
if dxs and dxs[0] in ("Z68.30", "Z6830"):
return fail("rdxZ6830", "Z68.30 is not acceptable as principal DX.")
return None
def rule_dx_g40009_g40909(row: Dict) -> Optional[RuleResult]:
dx_set = {d.replace(".", "") for d in dx_list(row)}
if "G40009" in dx_set and "G40909" in dx_set:
return fail("rDx1Diagnosis", "DX Code G40009 and G40909 cannot be billed in single claim.")
return None
def rule_dx_g4089_g40009(row: Dict) -> Optional[RuleResult]:
dx_set = {d.replace(".", "") for d in dx_list(row)}
if "G4089" in dx_set and "G40009" in dx_set:
return fail("rDxCodeG40", "DX Code G40.89 and G40.009 cannot be billed in single claim.")
return None
def rule_dx_f17200_z87891(row: Dict) -> Optional[RuleResult]:
dx_set = {d.replace(".", "") for d in dx_list(row)}
if "F17200" in dx_set and "Z87891" in dx_set:
return fail("rdxF17&Z87", "Diagnosis codes F17.200 and Z87.891 cannot be reported together.")
return None
def rule_dx_n183_date(row: Dict) -> Optional[RuleResult]:
dos = dos_date(row)
if dos and dos > date(2020, 9, 30):
for d in dx_list(row):
if d.replace(".", "") == "N183":
return fail("rDxN183", "DxCode N18.3 was billable only through 2020-09-30.")
return None
def rule_dx_n1830_n1831_n1832_date(row: Dict) -> Optional[RuleResult]:
dos = dos_date(row)
if dos and dos < date(2020, 10, 1):
for d in dx_list(row):
if d.replace(".", "") in ("N1830", "N1831", "N1832"):
return fail("rDxN18303132", f"DxCode {d} is effective from 2020-10-01.")
return None
def rule_dx_m5459_r051_date(row: Dict) -> Optional[RuleResult]:
dos = dos_date(row)
if dos and dos < date(2021, 10, 1):
for d in dx_list(row):
if d in ("M54.59", "R05.1", "M5459", "R051"):
return fail("rDx-M54.59,R05.1", f"Diagnosis code {d} is effective from 2021-10-01.")
return None
def rule_z01818_z01810_only_primary(row: Dict) -> Optional[RuleResult]:
for i, d in enumerate(dx_list(row)[1:], start=2):
if d.replace(".", "") in ("Z01818", "Z01810"):
return fail("rDXZ01818", f"DX {d} is acceptable only as primary diagnosis, found at position {i}.")
return None
def rule_wellmed_f_series_dx(row: Dict) -> Optional[RuleResult]:
if "wellmed" not in str(get(row, "ClaimPriIns", "")).lower():
return None
for d in dx_list(row):
if d.startswith("F"):
return fail("rDxCodeFSeries", f"For Wellmed insurance, F-series diagnosis codes are not acceptable. Found: {d}")
return None
def rule_90653_requires_z23(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "90653":
if not any(d.replace(".", "") == "Z23" for d in dx_list(row)):
return fail("rDx723Point", "ICD Z23 is mandatory with CPT 90653.")
return None
def rule_zip_valid(row: Dict) -> Optional[RuleResult]:
zip_code = str(get(row, "Zip", "")).strip()
if not zip_code or not (len(zip_code) == 5 or len(zip_code.replace("-", "")) == 9):
return fail("rZIP", "Please enter valid Zip Code for patient.")
return None
def rule_state_valid(row: Dict) -> Optional[RuleResult]:
state = str(get(row, "State", "")).strip()
if not state or len(state) < 2:
return fail("rState", "Please enter valid State for patient.")
return None
def rule_city_valid(row: Dict) -> Optional[RuleResult]:
city = str(get(row, "City", "")).strip()
if not city:
return fail("rCity", "Please enter valid City for patient.")
return None
def rule_last_name_required(row: Dict) -> Optional[RuleResult]:
if not str(get(row, "LastName", "")).strip():
return fail("rLNMMis", "Last Name is Mandatory field.")
return None
def rule_first_name_required(row: Dict) -> Optional[RuleResult]:
if not str(get(row, "FirstName", "")).strip():
return fail("rFNMis", "First Name is Mandatory field.")
return None
def rule_patient_name_special_chars(row: Dict) -> Optional[RuleResult]:
patient_name = str(get(row, "PatientName", "")).strip()
special_chars = set('!#%$&+,./:;<=>@`{|}~"()*\\_^?[]\'')
if set(patient_name) & special_chars:
return fail("rSpecChar", "Special characters are not allowed in Patient name.")
return None
def rule_address_required(row: Dict) -> Optional[RuleResult]:
if not str(get(row, "Address", "")).strip():
return fail("rADSMis", "Address is Mandatory field.")
return None
def rule_is_ptl(row: Dict) -> Optional[RuleResult]:
is_ptl = str(get(row, "IsPTL", "0")).strip().lower()
if is_ptl in ("1", "true", "yes"):
return fail("rIsPTL", "Claim with PTL status must not be submitted.")
return None
def rule_cpt_required(row: Dict) -> Optional[RuleResult]:
if not cpt(row):
return fail("rCPTCode=1", "Please enter at least one CPT for the claim.")
return None
def rule_g0389_replaced(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G0389":
return fail("rCPTCOde=10", "CPT Code G0389 is replaced by 76706.")
return None
def rule_94620_replaced(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "94620":
dos = dos_date(row)
if dos and dos > date(2018, 1, 1):
return fail("rCPTCode94620", "CPT 94620 is not acceptable after 2018-01-01 DOS.")
return None
def rule_g0436_replaced(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G0436":
return fail("rCPTCode=12", "CPT G0436 is replaced with 99406.")
return None
def rule_g0437_replaced(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G0437":
return fail("rCPTCode=13", "CPT G0437 is replaced with 99407.")
return None
def rule_q2037_replaced(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "Q2037":
return fail("rCPTCodeQ2037", "CPT Q2037 has been replaced. Please enter updated CPT.")
return None
def rule_92585_deleted(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "92585":
return fail("rCPT92585", "92585 is a deleted CPT code.")
return None
def rule_g8553_deleted(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G8553":
return fail("rCPTG8553Del", "CPT G8553 is a deleted code.")
return None
def rule_0031a_wrong_code_for_age(row: Dict) -> Optional[RuleResult]:
age = patient_age(row)
if cpt(row) == "0031A" and age is not None and 12 <= age <= 18:
return fail("rCPT0031A", 'Correct code is "0001A 0002A". Please replace the code.')
return None
def rule_consult_code_medicare(row: Dict) -> Optional[RuleResult]:
consult_codes = {"99241", "99242", "99243", "99244", "99245", "99251", "99252", "99253", "99254", "99255"}
if cpt(row) in consult_codes and is_medicare(row):
return fail("rConsultCode", "Medicare does not accept consultation codes.")
return None
def rule_g0008_insurance(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G0008":
ins_name = str(get(row, "ClaimPriIns", "")).lower()
if "medicare" not in ins_name and "uhc" not in ins_name and "united" not in ins_name:
return fail("rCPTCodeG0008", "G0008 is only allowed with Medicare or UHC.")
return None
def rule_90471_medicare(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "90471" and is_medicare(row):
return fail("rCPTCode90471", "90471 is not allowed with Medicare.")
return None
def rule_consult_requires_referring(row: Dict) -> Optional[RuleResult]:
consult_codes = {"99241", "99242", "99243", "99244", "99245", "99251", "99252", "99253", "99254", "99255"}
if cpt(row) in consult_codes and not get(row, "ReferringPhysician"):
return fail("rConsultCode2", "Referring physician is mandatory with consultation code.")
return None
def rule_25000_not_allowed(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "25000":
return fail("rCPTCode25000", "CPT Code 25000 is not allowed.")
return None
def rule_jcode_ndc(row: Dict) -> Optional[RuleResult]:
if cpt(row).startswith("J") and not str(get(row, "Drug_NDC", "")).strip():
return fail("rJcodeNDC", "J-code without NDC should not be billed.")
return None
def rule_u0002_clia(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "U0002" and not get(row, "CLIANumber"):
return fail("rCPTU0002Clia", "CLIA number is required with CPT U0002.")
return None
def rule_lab_cpt_clia(row: Dict) -> Optional[RuleResult]:
try:
cpt_num = int(cpt(row))
if 80000 <= cpt_num <= 89999 and not get(row, "CLIANumber"):
return fail("rCLIANumberMissing", "CLIA number required for lab services.")
except Exception:
return None
return None
def rule_high_level_99203(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "99203":
return fail("rCPTHighLevel", "Please update CPT 99203 as 99204 according to visit type.")
return None
def rule_high_level_99213(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "99213":
return fail("rCPTHighLevel", "Please update CPT 99213 as 99214 according to visit type.")
return None
def rule_newborn_cpt_age(row: Dict) -> Optional[RuleResult]:
newborn_cpts = {"99460", "99461", "99462", "99463", "99464", "99465"}
age = patient_age(row)
if cpt(row) in newborn_cpts and age is not None and age > (28 / 365.25):
return fail("rNewbornCPT", f"Newborn care CPT {cpt(row)} is not valid for patient older than 28 days.")
return None
def rule_gender_specific_cpt(row: Dict) -> Optional[RuleResult]:
female_only = {"58300", "58301", "58600", "58605", "58700", "58720", "58740", "58750", "57452", "57454", "57455"}
male_only = {"54000", "54001", "54050", "54055", "54150", "54160", "54161", "55000", "55040", "55041"}
gender = str(get(row, "Gender", "")).strip().lower()
code = cpt(row)
if code in female_only and gender == "male":
return fail("rCPTGender", f"CPT {code} is not valid for Male gender.")
if code in male_only and gender == "female":
return fail("rCPTGender", f"CPT {code} is not valid for Female gender.")
return None
MATERNITY_CPTS = {
"59000", "59012", "59015", "59020", "59025", "59030", "59050", "59051", "59070", "59072",
"59074", "59076", "59100", "59120", "59121", "59130", "59135", "59136", "59140", "59150",
"59151", "59160", "59200", "59300", "59320", "59325", "59350", "59400", "59409", "59410",
"59412", "59414", "59425", "59426", "59430", "59510", "59514", "59515", "59525", "59610",
"59612", "59614", "59618", "59620", "59622", "59812", "59820", "59821", "59830", "59840",
"59841", "59850", "59851", "59852", "59855", "59856", "59857", "59866", "59870", "59871",
"59897", "59898", "59899"
}
def rule_maternity_cpt_gender(row: Dict) -> Optional[RuleResult]:
if cpt(row) in MATERNITY_CPTS and str(get(row, "Gender", "")).strip().lower() == "male":
return fail("rMatCPT", f"Maternity care CPT {cpt(row)} is not valid for Male gender.")
return None
def rule_maternity_cpt_age(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in MATERNITY_CPTS:
return None
age = patient_age(row)
if age is None:
return None
if age < 10 or age > 55:
return fail("rMatCPTAge", f"Maternity CPT {cpt(row)} not valid for patient age {age:.0f} (expected 10–55).")
return None
def rule_delivery_cpt_pos(row: Dict) -> Optional[RuleResult]:
delivery_cpts = {"59400", "59409", "59410", "59510", "59514", "59515", "59610", "59612", "59614", "59618", "59620", "59622"}
if cpt(row) not in delivery_cpts:
return None
pos = str(get(row, "POS", "")).strip()
valid_pos = {"21", "22", "23", "02"}
if pos and pos not in valid_pos:
return fail("rDeliveryPOS", f"Delivery CPT {cpt(row)} requires POS 21/22/23/02, got POS {pos}.")
return None
def rule_chemo_requires_cancer_dx(row: Dict) -> Optional[RuleResult]:
chemo_cpts = {
"96401", "96402", "96405", "96406", "96409", "96411", "96413", "96415", "96416", "96417",
"96420", "96422", "96423", "96425", "96440", "96446", "96450", "96521", "96522", "96523",
"96542", "96549",
}
if cpt(row) not in chemo_cpts:
return None
for d in dx_list(row):
# C00-C96 malignant; D00-D09 in situ; D37-D48 uncertain behavior
if d.startswith("C") or d.startswith(("D0", "D37", "D38", "D39", "D4")):
return None
return fail("rChemoNoCancer", f"Chemotherapy CPT {cpt(row)} requires a neoplasm diagnosis (C-code or D00–D48).")
def rule_dialysis_requires_renal_dx(row: Dict) -> Optional[RuleResult]:
dialysis_cpts = {
"90935", "90937", "90940", "90945", "90947",
"90951", "90952", "90953", "90954", "90955", "90956", "90957", "90958", "90959", "90960",
"90961", "90962", "90963", "90964", "90965", "90966", "90967", "90968", "90969", "90970",
"90989", "90993", "90997", "90999",
}
if cpt(row) not in dialysis_cpts:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith("N18") or d == "Z992":
return None
return fail("rDialysisNoRenal", f"Dialysis CPT {cpt(row)} requires renal diagnosis (N18.x or Z99.2).")
def rule_cardiac_rehab_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"93797", "93798", "G0422", "G0423"}:
return None
# I20 angina, I21 MI, I22 subsequent MI, I25 CAD, I50 heart failure, Z95.1 CABG hx, Z95.5 PCI hx
valid_prefixes = ("I20", "I21", "I22", "I25", "I50", "Z951", "Z955")
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith(valid_prefixes):
return None
return fail("rCardRehabNoCardiac", f"Cardiac rehab CPT {cpt(row)} requires cardiac diagnosis (I20–I25, I50, or Z95.1/Z95.5).")
def rule_awv_medicare_age(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"G0438", "G0439"}:
return None
age = patient_age(row)
if age is None or age >= 65:
return None
return fail("rAWVAge", f"AWV CPT {cpt(row)} requires Medicare-eligible age (65+), got age {age:.0f}.")
def rule_screening_colonoscopy_age(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"G0121", "45378", "45380", "45385"}:
return None
dx_flat = [x.replace(".", "") for x in dx_list(row)]
is_screening = any(d.startswith("Z1211") or d.startswith("Z8671") for d in dx_flat)
if not is_screening:
return None
age = patient_age(row)
if age is None or age >= 45:
return None
return fail("rColonScreenAge", f"Screening colonoscopy not recommended before age 45 (USPSTF), got age {age:.0f}.")
def rule_inpatient_only_cpt_office(row: Dict) -> Optional[RuleResult]:
# Sample of CMS Medicare Inpatient-Only (IPO) list — must be inpatient hospital
inpatient_only = {
"33533", "33534", "33535", "33536", # CABG
"27130", "27132", "27134", "27137", "27138", # Hip arthroplasty
"27447", "27486", "27487", # Knee arthroplasty
"43644", "43645", "43770", "43775", # Bariatric
"23470", "23472", # Shoulder arthroplasty
"61510", "61512", "61518", "61519", "61521", # Craniotomy
"63081", "63082", "63085", "63086", # Vertebral corpectomy
}
if cpt(row) not in inpatient_only:
return None
pos = str(get(row, "POS", "")).strip()
if pos == "11":
return fail("rIPOatOffice", f"CPT {cpt(row)} is on Medicare Inpatient-Only list — cannot be billed at POS 11 (office).")
return None
def rule_prostate_cpt_gender(row: Dict) -> Optional[RuleResult]:
prostate_cpts = {
"55700", "55705", "55706",
"55801", "55810", "55812", "55815",
"55821", "55831", "55840", "55842", "55845",
"55866", "55873", "55875",
}
if cpt(row) not in prostate_cpts:
return None
gender = str(get(row, "Gender", "")).strip().lower()
if gender == "female":
return fail("rProstateFemale", f"Prostate CPT {cpt(row)} is not valid for Female gender.")
return None
def rule_pulmonary_rehab_requires_pulmonary_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"94625", "94626", "G0237", "G0238", "G0239", "G0424"}:
return None
dx_flat = [x.replace(".", "") for x in dx_list(row)]
for d in dx_flat:
# J40-J47 COPD/chronic bronchitis/emphysema, J84 interstitial lung, J96 respiratory failure, J68 chemical injury
if d.startswith(("J40","J41","J42","J43","J44","J45","J46","J47","J84","J96","J68")):
return None
return fail("rPulmRehabNoPulm", f"Pulmonary rehab CPT {cpt(row)} requires pulmonary DX (J40–J47, J84, or J96).")
def rule_cardiac_cath_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]:
cath_cpts = {"93451","93452","93453","93454","93455","93456","93457","93458","93459",
"93460","93461","93530","93531","93532","93533"}
if cpt(row) not in cath_cpts:
return None
dx_flat = [x.replace(".", "") for x in dx_list(row)]
for d in dx_flat:
# I20-I25 ischemic heart, I42-I43 cardiomyopathy, I48 afib, I49 arrhythmia, I50 CHF, R07 chest pain, R00 palpitations
if d.startswith(("I20","I21","I22","I23","I24","I25","I42","I43","I48","I49","I50","R07","R000","R001")):
return None
return fail("rCardCathNoCardiac", f"Cardiac cath CPT {cpt(row)} requires cardiac DX (I20–I25, I42–I50, or R07/R00).")
def rule_radiation_oncology_requires_cancer(row: Dict) -> Optional[RuleResult]:
rad_cpts = {"77261","77262","77263","77280","77285","77290","77293","77295",
"77299","77300","77301","77306","77307","77316","77317","77318",
"77321","77331","77332","77333","77334","77336","77338",
"77370","77371","77372","77373","77385","77386","77387",
"77401","77402","77407","77412","77417","77427",
"77520","77522","77523","77525"}
if cpt(row) not in rad_cpts:
return None
for d in dx_list(row):
if d.startswith("C") or d.startswith(("D0","D37","D38","D39","D4")):
return None
return fail("rRadOncNoCancer", f"Radiation oncology CPT {cpt(row)} requires neoplasm DX (C-code or D00–D48).")
def rule_pt_requires_msk_neuro_dx(row: Dict) -> Optional[RuleResult]:
pt_cpts = {"97110","97112","97116","97124","97140","97150","97530","97535","97537","97542","97545","97546"}
if cpt(row) not in pt_cpts:
return None
dx_flat = [x.replace(".", "") for x in dx_list(row)]
for d in dx_flat:
# M-codes MSK, G8x palsies, S-codes injuries, Z47/Z48/Z51 aftercare, R26 gait/mobility
if d.startswith(("M","S","G8","Z47","Z48","Z51","R26")):
return None
return fail("rPTNoMSK", f"PT CPT {cpt(row)} requires MSK (M-code), neuro (G8x), injury (S-code), or aftercare (Z47/Z48/Z51) DX.")
def rule_cataract_surgery_requires_cataract_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"66982","66983","66984","66987","66988"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith(("H25","H26","H28")):
return None
return fail("rCataractNoDx", f"Cataract surgery CPT {cpt(row)} requires cataract DX (H25–H28).")
def rule_joint_injection_requires_arthritis_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"20600","20604","20605","20606","20610","20611"}:
return None
dx_flat = [x.replace(".", "") for x in dx_list(row)]
for d in dx_flat:
# M00-M25 arthropathies, M60-M79 soft tissue disorders, S-codes injury
if d.startswith(("M0","M1","M2","M6","M7","S")):
return None
return fail("rJointInjNoDx", f"Joint injection CPT {cpt(row)} requires joint/soft-tissue/injury DX (M0x–M2x, M6x–M7x, S-codes).")
def rule_spine_surgery_requires_spine_dx(row: Dict) -> Optional[RuleResult]:
spine_cpts = {"22551","22552","22554","22556","22558","22590","22595","22612",
"22614","22630","22632","22633","22634",
"63020","63030","63042","63045","63047","63048",
"63056","63075","63081","63082","63085","63086"}
if cpt(row) not in spine_cpts:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith(("M40","M41","M42","M43","M45","M46","M47","M48","M50","M51","M53","M54","M96","S14","S24","S34")):
return None
return fail("rSpineSurgNoDx", f"Spine surgery CPT {cpt(row)} requires spine DX (M40–M54, M96, or S14/S24/S34).")
def rule_mammography_female(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"77063","77065","77066","77067"}:
return None
if str(get(row, "Gender", "")).strip().lower() == "male":
return fail("rMammoMale", f"Mammography CPT {cpt(row)} is typically female-only; found Male gender.")
return None
def rule_hysterectomy_female(row: Dict) -> Optional[RuleResult]:
hyst_cpts = {"58150","58152","58180","58200","58210","58260","58262","58263","58267",
"58270","58275","58280","58285","58290","58291","58292","58293","58294",
"58541","58542","58543","58544","58548","58550","58552","58553","58554",
"58570","58571","58572","58573"}
if cpt(row) not in hyst_cpts:
return None
if str(get(row, "Gender", "")).strip().lower() == "male":
return fail("rHystMale", f"Hysterectomy CPT {cpt(row)} is not valid for Male gender.")
return None
def rule_testicular_cpt_male(row: Dict) -> Optional[RuleResult]:
test_cpts = {"54500","54505","54512","54520","54522","54530","54535","54550","54560"}
if cpt(row) not in test_cpts:
return None
if str(get(row, "Gender", "")).strip().lower() == "female":
return fail("rTesticFemale", f"Testicular CPT {cpt(row)} is not valid for Female gender.")
return None
def rule_gallbladder_surgery_requires_dx(row: Dict) -> Optional[RuleResult]:
gb_cpts = {"47562","47563","47564","47600","47605","47610","47612","47620"}
if cpt(row) not in gb_cpts:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith(("K80","K81","K82","K83","C23")):
return None
return fail("rGallbladderNoDx", f"Gallbladder surgery CPT {cpt(row)} requires gallbladder DX (K80–K83 or C23).")
def rule_appendectomy_requires_dx(row: Dict) -> Optional[RuleResult]:
app_cpts = {"44950","44955","44960","44970"}
if cpt(row) not in app_cpts:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith(("K35","K36","K37","K38")) or d == "C181":
return None
return fail("rAppendNoDx", f"Appendectomy CPT {cpt(row)} requires appendicitis DX (K35–K38 or C18.1).")
def rule_tonsillectomy_requires_dx(row: Dict) -> Optional[RuleResult]:
tons_cpts = {"42820","42821","42825","42826","42830","42831","42835","42836"}
if cpt(row) not in tons_cpts:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith(("J02","J03","J35","J36")) or d == "G4733":
return None
return fail("rTonsilNoDx", f"Tonsillectomy CPT {cpt(row)} requires tonsil/adenoid/OSA DX (J02, J03, J35, J36, or G47.33).")
def rule_pediatric_preventive_age(row: Dict) -> Optional[RuleResult]:
code = cpt(row)
age_limits = {"99381":(0,1), "99382":(1,4), "99383":(5,11), "99384":(12,17),
"99391":(0,1), "99392":(1,4), "99393":(5,11), "99394":(12,17)}
if code not in age_limits:
return None
age = patient_age(row)
if age is None:
return None
lo, hi = age_limits[code]
if age < lo or age > hi:
return fail("rPedPrevAge", f"Preventive CPT {code} expects age {lo}{hi}, got age {age:.0f}.")
return None
def rule_adult_preventive_age(row: Dict) -> Optional[RuleResult]:
code = cpt(row)
age_limits = {"99385":(18,39), "99386":(40,64), "99387":(65,200),
"99395":(18,39), "99396":(40,64), "99397":(65,200)}
if code not in age_limits:
return None
age = patient_age(row)
if age is None:
return None
lo, hi = age_limits[code]
if age < lo or age > hi:
return fail("rAdultPrevAge", f"Preventive CPT {code} expects age {lo}{hi}, got age {age:.0f}.")
return None
def rule_ed_visit_requires_pos23(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"99281","99282","99283","99284","99285"}:
return None
pos = str(get(row, "POS", "")).strip()
if pos and pos != "23":
return fail("rEDVisitPOS", f"ED visit CPT {cpt(row)} requires POS 23 (emergency room), got POS {pos}.")
return None
def rule_office_em_inpatient_pos(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"99202","99203","99204","99205","99212","99213","99214","99215"}:
return None
pos = str(get(row, "POS", "")).strip()
if pos in {"21","31","32","51","61"}:
return fail("rOfficeEMInpt", f"Office E&M CPT {cpt(row)} not valid at inpatient POS {pos}; use inpatient visit codes (99221–99233).")
return None
def rule_bone_density_age(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"77080","77081","77085","77086"}:
return None
age = patient_age(row)
if age is None or age >= 50:
return None
return fail("rBoneDensAge", f"Bone density CPT {cpt(row)} typically not indicated before age 50, got age {age:.0f}.")
def rule_screening_mammo_requires_z1231(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"77063","77067"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith("Z1231"):
return None
return fail("rScreenMammoDx", f"Screening mammography CPT {cpt(row)} requires DX Z12.31 (screening encounter).")
def rule_wellness_visit_requires_z00(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"99385","99386","99387","99395","99396","99397"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith("Z00"):
return None
return fail("rWellnessDx", f"Preventive wellness CPT {cpt(row)} requires Z00.xx encounter DX.")
# ---- Imaging medical-necessity ----
def rule_mri_brain_requires_neuro_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"70551","70552","70553","70554","70555"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
# G-codes neuro, I60-I69 cerebrovascular, R40-R42/R51 head sx, S00/S06 head injury,
# C70-C72 brain tumors, F01-F09 dementia/cognitive, Z0183 neoplasm f/u
if (d.startswith("G") or d.startswith(("I6","R4","R51","S00","S06","C70","C71","C72","F0"))
or d == "Z0183"):
return None
return fail("rMRIBrainNoDx", f"MRI brain CPT {cpt(row)} requires neuro/head DX (G-code, I60–I69, R40–R51, S00/S06, C70–C72).")
def rule_ct_chest_requires_chest_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"71250","71260","71270","71271","71275"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
# J-codes pulm, I-codes cardiac, C34 lung CA, R05/R06/R07/R09 chest sx, S27 thorax injury, Z87 hx
if (d.startswith(("J","I","C34","C33","R05","R06","R07","R09","S27","Z87"))):
return None
return fail("rCTChestNoDx", f"CT chest CPT {cpt(row)} requires chest DX (J/I codes, C33–C34, R05–R09, S27).")
def rule_ct_abdomen_requires_abdominal_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"74150","74160","74170","74176","74177","74178"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
# K-codes digestive, N-codes GU, R10/R11/R14/R19 abd sx, C15-C26 GI CA, S30-S39 abd injury
if (d.startswith("K") or d.startswith("N") or d.startswith(("R10","R11","R14","R19","S3"))
or d.startswith(("C15","C16","C17","C18","C19","C20","C21","C22","C23","C24","C25","C26","C67"))):
return None
return fail("rCTAbdNoDx", f"CT abdomen CPT {cpt(row)} requires abdominal DX (K/N code, R10–R19, S30–S39, C15–C26).")
def rule_echo_requires_cardiac_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"93303","93304","93306","93307","93308","93312","93313","93314","93315","93316","93317","93320","93321"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith(("I","R07","R000","R001","R06","R55","Q2","Q3")):
return None
return fail("rEchoNoCardiac", f"Echocardiogram CPT {cpt(row)} requires cardiac DX (I-code, R00/R06/R07/R55, or Q2x/Q3x congenital).")
# ---- Lab medical-necessity ----
def rule_a1c_requires_diabetes_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"83036","83037"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
# E08-E13 diabetes, R73 elevated glucose, Z131 diabetes screening, O24 gestational, Z830 FH diabetes
if d.startswith(("E08","E09","E10","E11","E12","E13","R73","Z131","O24","Z833")):
return None
return fail("rA1CNoDiab", f"HbA1c CPT {cpt(row)} requires diabetes/prediabetes DX (E08–E13, R73, Z13.1, O24).")
def rule_lipid_panel_requires_cv_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"80061"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
# Cardiovascular risk / disease / screening indicators
if d.startswith(("I","E11","E10","E78","F17","Z13220","Z13","Z72","Z83","R7302","R794")):
return None
return fail("rLipidNoCV", f"Lipid panel CPT {cpt(row)} requires CV-risk/disease DX (I-code, E78, E10/E11, Z13.220, Z83.42, F17).")
def rule_psa_male(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"84152","84153","84154","G0103"}:
return None
if str(get(row, "Gender", "")).strip().lower() == "female":
return fail("rPSAFemale", f"PSA CPT {cpt(row)} is not valid for Female gender.")
return None
def rule_pap_smear_female(row: Dict) -> Optional[RuleResult]:
pap_cpts = {"88141","88142","88143","88147","88148","88150","88152","88153","88154","88155",
"88164","88165","88166","88167","88174","88175","Q0091","G0123","G0124","G0141","G0143","G0144","G0145","G0147","G0148"}
if cpt(row) not in pap_cpts:
return None
if str(get(row, "Gender", "")).strip().lower() == "male":
return fail("rPapMale", f"Pap smear CPT {cpt(row)} is not valid for Male gender.")
return None
# ---- DME medical-necessity ----
def rule_cpap_requires_osa_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"E0601","E0470","E0471","E0561","E0562","95810","95811"}:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
if d.startswith("G473"):
return None
return fail("rCPAPNoOSA", f"CPAP/BiPAP CPT {cpt(row)} requires sleep apnea DX (G47.3x).")
def rule_wheelchair_requires_mobility_dx(row: Dict) -> Optional[RuleResult]:
wc_cpts = {"K0001","K0002","K0003","K0004","K0005","K0006","K0007","K0009",
"E1050","E1060","E1070","E1083","E1084","E1085","E1086","E1087","E1088","E1089","E1090",
"E1100","E1110","E1130","E1140","E1150","E1160","E1161","E1170","E1180","E1190",
"E1200","E1220","E1230","E1231","E1232","E1233","E1234","E1235","E1236","E1237","E1238"}
if cpt(row) not in wc_cpts:
return None
for d in (x.replace(".", "") for x in dx_list(row)):
# MSK, neuro (G20/G35/G80-G83/I69 stroke), injury, R26 gait
if d.startswith(("M","S","G20","G35","G36","G80","G81","G82","G83","I69","R26","Z99")):
return None
return fail("rWheelchairNoDx", f"Wheelchair CPT {cpt(row)} requires mobility-impairment DX (M/S-code, G20/G35/G80–G83, I69, R26).")
# ---- Gender/anatomy specific ----
def rule_fetal_us_female(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"76801","76802","76805","76810","76811","76812","76813","76814","76815","76816","76817","76818","76819","76820","76821"}:
return None
if str(get(row, "Gender", "")).strip().lower() == "male":
return fail("rFetalUSMale", f"Fetal ultrasound CPT {cpt(row)} is not valid for Male gender.")
return None
def rule_circumcision_male(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"54150","54160","54161","54162","54163","54164"}:
return None
if str(get(row, "Gender", "")).strip().lower() == "female":
return fail("rCircFemale", f"Circumcision CPT {cpt(row)} is not valid for Female gender.")
return None
def rule_iud_female(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"58300","58301","J7296","J7297","J7298","J7300","J7301"}:
return None
if str(get(row, "Gender", "")).strip().lower() == "male":
return fail("rIUDMale", f"IUD CPT {cpt(row)} is not valid for Male gender.")
return None
def rule_colposcopy_female(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"57420","57421","57452","57454","57455","57456","57460","57461"}:
return None
if str(get(row, "Gender", "")).strip().lower() == "male":
return fail("rColpoMale", f"Colposcopy CPT {cpt(row)} is not valid for Male gender.")
return None
# ---- POS-specific E&M ----
def rule_inpatient_em_requires_inpatient_pos(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"99221","99222","99223","99231","99232","99233","99238","99239"}:
return None
pos = str(get(row, "POS", "")).strip()
if pos and pos not in {"21","51","61"}:
return fail("rInptEMPos", f"Inpatient E&M CPT {cpt(row)} requires POS 21/51/61, got POS {pos}.")
return None
def rule_observation_em_requires_pos22(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"99218","99219","99220","99224","99225","99226","99234","99235","99236"}:
return None
pos = str(get(row, "POS", "")).strip()
if pos and pos != "22":
return fail("rObsEMPos", f"Observation E&M CPT {cpt(row)} requires POS 22, got POS {pos}.")
return None
def rule_nf_em_requires_pos_31_32(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"99304","99305","99306","99307","99308","99309","99310","99315","99316"}:
return None
pos = str(get(row, "POS", "")).strip()
if pos and pos not in {"31","32"}:
return fail("rNFEMPos", f"Nursing facility E&M CPT {cpt(row)} requires POS 31/32, got POS {pos}.")
return None
def rule_home_visit_requires_pos12(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"99341","99342","99343","99344","99345","99347","99348","99349","99350"}:
return None
pos = str(get(row, "POS", "")).strip()
if pos and pos != "12":
return fail("rHomeVisitPos", f"Home visit CPT {cpt(row)} requires POS 12, got POS {pos}.")
return None
# ---- Age/gender-gated screenings ----
def rule_aaa_screening_male_age(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"76706","G0389"}:
return None
gender = str(get(row, "Gender", "")).strip().lower()
if gender == "female":
return fail("rAAAScreenFemale", f"AAA screening CPT {cpt(row)} is Medicare-covered for male patients only.")
age = patient_age(row)
if age is None:
return None
if age < 65 or age > 75:
return fail("rAAAScreenAge", f"AAA screening CPT {cpt(row)} covered only at age 65–75, got age {age:.0f}.")
return None
def rule_lung_ct_screening_age(row: Dict) -> Optional[RuleResult]:
if cpt(row) not in {"71271","G0296","G0297"}:
return None
age = patient_age(row)
if age is None:
return None
if age < 50 or age > 77:
return fail("rLungCTScreenAge", f"Lung CT screening CPT {cpt(row)} covered only at age 50–77, got age {age:.0f}.")
return None
def rule_q4169_supervision(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "Q4169":
return fail("rCPTCodeQ4169", "Supervision review mandatory for CPT Q4169.")
return None
def rule_medicare_forbidden_cpt(row: Dict) -> Optional[RuleResult]:
forbidden = {"90471", "99417", "H0004", "1152F", "3281F"}
if cpt(row) in forbidden and is_medicare(row):
return fail("rNoMedicareCPT", f"Medicare does not cover CPT {cpt(row)}.")
return None
def rule_g0444_unbundle(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G0444":
return fail("rcptG0438&G0444", "G0444 has an unbundle relationship with G0438.")
return None
def rule_0000_nonbillable(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "0000" or any(d == "0000" for d in dx_list(row)):
return fail("rCPTDX0000", "CPT or DX 0000 is non-billable.")
return None
def rule_g2211_modifier25(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G2211" and has_modifier(row, "25"):
return fail("rG2211cpt25mf", "G2211 is not payable when reported with modifier 25.")
return None
def rule_g2011_modifier(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G2011" and any(m for m in get_modifiers(row) if m):
return fail("rCPTCodeG2011", "Modifier is inappropriate with CPT G2011.")
return None
def rule_smoking_cessation_requires_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) in ("99406", "99407") and not dx_list(row):
return fail("rSCCDVdxcode", f"CPT {cpt(row)} requires at least one valid diagnosis code.")
return None
def rule_q4221_units(row: Dict) -> Optional[RuleResult]:
units = to_float(get(row, "ServiceUnits", 1), 1)
if cpt(row) == "Q4221" and units > 52:
return fail("rcptQ4221", f"Max allowable units for Q4221 is 52. Current: {units:.0f}")
return None
def rule_q4250_units(row: Dict) -> Optional[RuleResult]:
units = to_float(get(row, "ServiceUnits", 1), 1)
if cpt(row) == "Q4250" and units > 24:
return fail("rcptQ4250", f"Max allowable units for Q4250 is 24. Current: {units:.0f}")
return None
def rule_95004_min_units(row: Dict) -> Optional[RuleResult]:
units = to_float(get(row, "ServiceUnits", 1), 1)
if cpt(row) == "95004" and units < 70:
return fail("r95004Unit<70", f"Less than 70 units not allowed for CPT 95004. Current: {units:.0f}")
return None
def rule_95165_min_units(row: Dict) -> Optional[RuleResult]:
units = to_float(get(row, "ServiceUnits", 1), 1)
if cpt(row) == "95165" and units < 10:
return fail("r95165Unit<10", f"Less than 10 units not allowed for CPT 95165. Current: {units:.0f}")
return None
def rule_zero_units(row: Dict) -> Optional[RuleResult]:
if to_float(get(row, "ServiceUnits", 1), 1) == 0:
return fail("r0Units", "0 units is not allowed.")
return None
def rule_33340_primary_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) != "33340":
return None
dxs = dx_list(row)
primary_dx = dxs[0].replace(".", "") if dxs else ""
valid = {"I480", "I4811", "I4819", "I4821"}
if primary_dx not in valid:
return fail("r33340dx1", "For 33340, primary DX should be I48.0, I48.11, I48.19, or I48.21.")
return None
def rule_33340_pos(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "33340" and str(get(row, "POS", "")).strip() not in ("", "21"):
return fail("r33340POS21", f"For 33340, POS should be 21. Current POS: {get(row, 'POS', '')}")
return None
def rule_rpm_checkbox(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"99453", "99454", "99457", "99458"}:
is_rpm = str(get(row, "ISRPM", "0")).strip().lower()
if is_rpm not in ("1", "true", "yes"):
return fail("rRPMchk", f"RPM box must be checked for RPM CPT {cpt(row)}.")
return None
def rule_q_series_note(row: Dict) -> Optional[RuleResult]:
code = cpt(row)
if code.startswith("Q") and not str(get(row, "NoteText", "")).strip():
return fail("rQCPTCode", "Claim note text is mandatory for Q-series CPT code.")
return None
def rule_colon_in_note(row: Dict) -> Optional[RuleResult]:
if ":" in str(get(row, "NoteText", "")).strip():
return fail("rColonNotAllowed", "Colon character is not allowed in NoteText.")
return None
def rule_wellmed_g0508(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G0508" and "wellmed" in str(get(row, "ClaimPriIns", "")).lower():
return fail("rWellmedInsObjection", "Wellmed insurance does not accept CPT G0508.")
return None
def rule_prolong_service(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"99417", "G2212", "G0318", "G0317", "G0316", "99418"}:
return fail("rProlongService", f"Prolong service code {cpt(row)} requires a high level E&M pairing.")
return None
def rule_99490_not_bill(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "99490":
return fail("rcpt99490", "Per provider instruction, do not bill CCM service 99490.")
return None
def rule_99024_nonreplacement(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "99024":
return fail("r99024", "99024 is non-replacement CPT without provider instructions.")
return None
def rule_99498_add_on(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "99498":
return fail("rcpt99498", "99498 is an add-on code and must be billed with 99497.")
return None
def rule_vaccine_admin_required(row: Dict) -> Optional[RuleResult]:
vax_combos = {
"91300": ["0001A", "0002A"],
"91301": ["0011A", "0012A"],
"91302": ["0021A", "0022A"],
"91303": ["0031A"],
}
code = cpt(row)
if code in vax_combos:
return fail(f"rAdmin{code}", f"Administration code {'/'.join(vax_combos[code])} is mandatory for CPT {code}.")
return None
def rule_90653_admin_required(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "90653":
return fail("rAdmcode90653", "Administration code G0008 or 90471 is mandatory for CPT 90653.")
return None
def rule_self_pay_with_active_insurance(row: Dict) -> Optional[RuleResult]:
if str(get(row, "BillAs", "")).lower() == "self-pay" and get(row, "ClaimPriIns"):
return fail("rInsAct", f"Patient has active insurance {get(row, 'ClaimPriIns')}. Do not bill as self-pay.")
return None
def rule_g9719_required_dx(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G9719":
required = {"F319", "F329", "F339"}
if not any(d.replace(".", "") in required for d in dx_list(row)):
return fail("rCPTG9719dx", "Any one of F31.9, F32.9, or F33.9 is mandatory with CPT G9719.")
return None
def rule_mips_99221_99223(row: Dict) -> Optional[RuleResult]:
age = patient_age(row)
if cpt(row) in {"99221", "99222", "99223"} and age is not None and age >= 65:
return fail("rMIPS1", "MIPS code from QID#47 and G8427 is mandatory for CPT 99221-99223 when patient is 65+.")
return None
def rule_mips_99236(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "99236":
return fail("rMIPS2", "MIPS code from QID#128 is mandatory for CPT 99236.")
return None
def rule_mips_99304_99306(row: Dict) -> Optional[RuleResult]:
age = patient_age(row)
if cpt(row) in {"99304", "99305", "99306"} and age is not None and age >= 65:
return fail("rMIPS3", "MIPS codes from QID#47 and QID#181 are mandatory for CPT 99304-99306 when patient is 65+.")
return None
def rule_mips_99307_99310(row: Dict) -> Optional[RuleResult]:
age = patient_age(row)
if cpt(row) in {"99307", "99308", "99309", "99310"} and age is not None and age >= 65:
return fail("rMIPS4", f"G8427 is mandatory for CPT {cpt(row)} when patient is 65+.")
return None
def rule_comb1(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"G8733", "G8734", "G8535"}:
return fail("rCOMB1", "CPT G8733, G8734, and G8535 are not acceptable together.")
return None
def rule_comb3(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"G8433", "G8431", "G8510"}:
return fail("rCOMB3", "CPT G8433, G8431, and G8510 are not acceptable together.")
return None
def rule_comb4(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"G8420", "G8417", "G8418", "G9716"}:
return fail("rCOMB4", "CPT G8420, G8417, G8418, and G9716 are not acceptable together.")
return None
def rule_comb2(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"1123F", "1124F"}:
return fail("rCOMB2", "CPT 1123F and 1124F are not acceptable together.")
return None
def rule_medicare_uhc_use_g0447(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"99401", "99402", "99403"}:
ins = str(get(row, "ClaimPriIns", "")).lower()
if any(x in ins for x in ("medicare", "uhc", "united", "wellmed")):
return fail("rcptG0447", f"For Medicare or UHC, use G0447 instead of {cpt(row)}.")
return None
def rule_99441_99443_pos11(row: Dict) -> Optional[RuleResult]:
pos = str(get(row, "POS", "")).strip()
if cpt(row) in {"99441", "99442", "99443"} and pos not in ("", "11"):
return fail("rPOS11SpecCPT", "POS 11 is only acceptable for CPT codes 99441-99443.")
return None
def rule_99354_99359_not_pos11(row: Dict) -> Optional[RuleResult]:
pos = str(get(row, "POS", "")).strip()
if cpt(row) in {"99354", "99355", "99358", "99359"} and pos == "11":
return fail("rCptPos11", f"CPT {cpt(row)} is not billable for office services POS 11.")
return None
def rule_0034a_date(row: Dict) -> Optional[RuleResult]:
dos = dos_date(row)
if cpt(row) == "0034A" and dos and dos < date(2021, 10, 20):
return fail("rCPT0034A", "CPT 0034A is effective from 2021-10-20.")
return None
def rule_0071a_0072a_date(row: Dict) -> Optional[RuleResult]:
dos = dos_date(row)
if cpt(row) in {"0071A", "0072A"} and dos and dos < date(2021, 10, 29):
return fail("rCPT0071A", f"CPT {cpt(row)} is effective from 2021-10-29.")
return None
def rule_g0438_use_99203(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "G0438":
return fail("rCPT99203", "Please use bill code 99203 instead of G0438.")
return None
def rule_99470_99457_99458(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"99470", "99457", "99458"}:
return fail("rCPTCode99470&99457/99458", "99470 and 99457/99458 are not billable on the same DOS.")
return None
def rule_99454_99445(row: Dict) -> Optional[RuleResult]:
if cpt(row) in {"99454", "99445"}:
return fail("rCPTCode99454&99445", "CPT 99454 and 99445 must not be billed for same patient on same DOS.")
return None
def rule_office_cpt_pos(row: Dict) -> Optional[RuleResult]:
office_cpts = {"99201", "99202", "99203", "99204", "99205", "99211", "99212", "99213", "99214", "99215"}
pos = str(get(row, "POS", "")).strip()
if cpt(row) in office_cpts and pos not in ("", "11", "02"):
return fail("rCPTCode=2", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 11 or 02.")
return None
def rule_inpatient_cpt_pos(row: Dict) -> Optional[RuleResult]:
inpatient_cpts = {"99221", "99222", "99223", "99231", "99232", "99233", "99238", "99239"}
pos = str(get(row, "POS", "")).strip()
if cpt(row) in inpatient_cpts and pos not in ("", "21", "61"):
return fail("rCPTCode=3", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 21 or 61.")
return None
def rule_snf_cpt_pos(row: Dict) -> Optional[RuleResult]:
snf_cpts = {"99304", "99305", "99306", "99307", "99308", "99309", "99310", "99315", "99316"}
pos = str(get(row, "POS", "")).strip()
if cpt(row) in snf_cpts and pos not in ("", "31", "32", "02", "13"):
return fail("rCPTCode=4", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 31, 32, 02, or 13.")
return None
def rule_observation_cpt_pos(row: Dict) -> Optional[RuleResult]:
pos = str(get(row, "POS", "")).strip()
if cpt(row) in {"99217", "99218", "99219", "99220"} and pos not in ("", "22"):
return fail("rCPTCode=5", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 22.")
return None
def rule_er_cpt_pos(row: Dict) -> Optional[RuleResult]:
pos = str(get(row, "POS", "")).strip()
if cpt(row) in {"99281", "99282", "99283", "99284", "99285", "99286", "99287", "99288"} and pos not in ("", "23"):
return fail("rCPTCode=6", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 23.")
return None
def rule_home_visit_cpt_pos(row: Dict) -> Optional[RuleResult]:
pos = str(get(row, "POS", "")).strip()
if cpt(row) in {"99341", "99342", "99343", "99344", "99345", "99347", "99348", "99349", "99350"} and pos not in ("", "02", "10", "12", "13", "14"):
return fail("rCPTCode=7", f"CPT {cpt(row)} is not valid for POS {pos}. Requires POS 02, 10, 12, 13, or 14.")
return None
def rule_admission_date_empty_for_office(row: Dict) -> Optional[RuleResult]:
if str(get(row, "POS", "")).strip() == "11" and adm_date(row):
return fail("rAdmissionDate=1", "Admission date must be empty for office visit POS 11.")
return None
def rule_admission_date_required_inpatient(row: Dict) -> Optional[RuleResult]:
if str(get(row, "POS", "")).strip() == "21" and not adm_date(row):
return fail("rAdmissionDate=2", "Admission date must be entered for inpatient POS 21.")
return None
def rule_discharge_not_before_admission(row: Dict) -> Optional[RuleResult]:
adm = adm_date(row)
dis = dis_date(row)
if adm and dis and dis < adm:
return fail("rAdmissionDate3", f"Discharge date {dis} should not be earlier than admission date {adm}.")
return None
def rule_future_admission_date(row: Dict) -> Optional[RuleResult]:
adm = adm_date(row)
if adm and adm > date.today():
return fail("rAdmComp", "Future admission date is not allowed.")
return None
def rule_future_dos(row: Dict) -> Optional[RuleResult]:
dos = dos_date(row)
if dos and dos > date.today():
return fail("rnotsubclm", f"Claim must not be submitted with future DOS {dos}.")
return None
def rule_january_2020_dos(row: Dict) -> Optional[RuleResult]:
dos = dos_date(row)
if dos and dos.year == 2020 and dos.month == 1:
return fail("rDOSJan20", "Claim is not valid for DOS in January 2020.")
return None
def rule_dos_required(row: Dict) -> Optional[RuleResult]:
if not dos_date(row):
return fail("rDOSMissed", "DOS is mandatory field.")
return None
def rule_dummy_dos(row: Dict) -> Optional[RuleResult]:
if "1900" in str(get(row, "DOS", "")):
return fail("rDummyDOS", "Dummy DOS entered in claim.")
return None
def rule_charges_positive(row: Dict) -> Optional[RuleResult]:
if to_float(get(row, "TotalCharges", 0), 0) <= 0:
return fail("rCharges", "CPT charges must be greater than 0.")
return None
def rule_charges_max(row: Dict) -> Optional[RuleResult]:
charges = to_float(get(row, "TotalCharges", 0), 0)
if charges > 99999:
return fail("r99Kcharges", f"Charges cannot exceed 99,999. Current: {charges:,.2f}")
return None
def rule_patient_payment_not_exceed_charges(row: Dict) -> Optional[RuleResult]:
charges = to_float(get(row, "TotalCharges", 0), 0)
pat_pmt = to_float(get(row, "PatientPayment", 0), 0)
if pat_pmt > 0 and charges < pat_pmt:
return fail("rChgPatPmt", f"Claim charges {charges:,.2f} cannot be less than patient payment {pat_pmt:,.2f}.")
return None
def rule_referring_required_for_medicare(row: Dict) -> Optional[RuleResult]:
if is_medicare(row) and not str(get(row, "ReferringPhysician", "")).strip():
return fail("rRefferingPhysician", "Referring physician is mandatory for Medicare claims.")
return None
def rule_referral_requires_referring(row: Dict) -> Optional[RuleResult]:
ref_num = str(get(row, "ReferralNumber", "")).strip()
if ref_num and ref_num != "0" and not str(get(row, "ReferringPhysician", "")).strip():
return fail("rReferralNo", "When referral number is sent then referring provider is mandatory.")
return None
def rule_attending_required(row: Dict) -> Optional[RuleResult]:
if not str(get(row, "AttendingPhysician", "")).strip():
return fail("rAttendingPhysician", "Attending Physician is mandatory field.")
return None
def rule_billing_required(row: Dict) -> Optional[RuleResult]:
if not str(get(row, "BillingPhysician", "")).strip():
return fail("rBillingPhysician", "Billing Physician is mandatory field.")
return None
def rule_medicare_mbi_format(row: Dict) -> Optional[RuleResult]:
if not is_medicare(row):
return None
mbi = str(get(row, "PolicyNumber", "")).strip().replace("-", "").replace(" ", "")
if mbi and len(mbi) == 11:
valid_mbi = True
letter_pos = {1, 4, 7, 8}
number_pos = {0, 3, 6, 9, 10}
for i, ch in enumerate(mbi):
if i in letter_pos and not ch.isalpha():
valid_mbi = False
if i in number_pos and not ch.isdigit():
valid_mbi = False
if not valid_mbi:
return fail("rPolicyNumber", f"Patient MBI format is invalid: {mbi}")
return None
def rule_dummy_dob(row: Dict) -> Optional[RuleResult]:
if "1900" in str(get(row, "DateOfBirth", "")):
return fail("rDummyDOB", "Dummy DOB entered in patient.")
return None
def rule_dob_required(row: Dict) -> Optional[RuleResult]:
if not get(row, "DateOfBirth"):
return fail("rDobMis", "DOB is Mandatory field.")
return None
def rule_gender_required(row: Dict) -> Optional[RuleResult]:
gender = str(get(row, "Gender", "")).strip().lower()
if not gender or gender in ("unknown", "none"):
return fail("rGender", "Subscriber gender code is mandatory.")
return None
def rule_insurance_required(row: Dict) -> Optional[RuleResult]:
if not get(row, "ClaimPriIns"):
return fail("rInsRequired", "Insurance Required. Please select valid insurance.")
return None
def rule_molina_9_digit_id(row: Dict) -> Optional[RuleResult]:
pol_num = str(get(row, "PolicyNumber", "")).strip().replace("-", "")
if len(pol_num) == 9 and pol_num.isdigit():
ins = str(get(row, "ClaimPriIns", "")).lower()
if "medicaid" in ins and "molina" not in ins:
return fail("rIns276PolicyNumberequal9", "Please select Molina Medicaid because member ID has 9 digits.")
return None
def rule_aarp_not_primary(row: Dict) -> Optional[RuleResult]:
ins_name = str(get(row, "ClaimPriIns", "")).lower()
if str(get(row, "BillAs", "")).lower() == "primary" and any(x in ins_name for x in ("aarp", "trycare", "tricare for life")):
return fail("rAARPTryCare", f"Insurance {get(row, 'ClaimPriIns')} can never be primary payer.")
return None
def rule_workers_comp_accident_date(row: Dict) -> Optional[RuleResult]:
ins_lower = str(get(row, "ClaimPriIns", "")).lower()
if any(x in ins_lower for x in ("worker", "workers comp", "wc ")) and not get(row, "AccidentDate"):
return fail("WCOMClaims", "Accident date is necessary for worker compensation claims.")
return None
def rule_11042_requires_pa(row: Dict) -> Optional[RuleResult]:
if cpt(row) == "11042" and str(get(row, "BillAs", "")).lower() == "primary" and not get(row, "PANumber"):
return fail("rPACPT11042", "Pre-authorization is missing for wound care 11042.")
return None
# ============================================================
# ICD-10 → CPT Mapping Rules
# Common diagnosis codes with their medically appropriate CPT
# procedure codes for denial prevention.
# ============================================================
# ICD-10 primary diagnosis → acceptable CPT codes
ICD_CPT_MAP: Dict[str, set] = {
# --- Diabetes ---
"E11.9": {"99213","99214","99215","99490","99491","83036","82947","81003","G0447"},
"E11.65": {"99213","99214","99215","83036","82947","80048","99490"},
"E11.40": {"99213","99214","99215","95905","95907","95908","95913","64490","64491"},
"E11.311": {"92004","92012","92014","92250","92228"},
"E10.9": {"99213","99214","99215","83036","82947","95249","99490"},
# --- Hypertension ---
"I10": {"99213","99214","99215","93000","80048","80053","81003","99490","93786","93788"},
"I11.9": {"99213","99214","99215","93000","93306","93320","93325","80053"},
"I12.9": {"99213","99214","99215","80053","82565","81001","82042"},
# --- Heart Disease ---
"I25.10": {"99213","99214","99215","93000","93306","93307","93350","93351","78452","93015"},
"I20.9": {"99213","99214","99215","93000","93015","93306","93350","78452"},
"I50.9": {"99213","99214","99215","93306","93307","93320","71046","80053","83880"},
"I48.91": {"99213","99214","99215","93000","93306","93268","93241","93243","93650"},
# --- Respiratory ---
"J06.9": {"99212","99213","99214","87880","87804"},
"J18.9": {"99213","99214","99215","71045","71046","87046","87081","71250"},
"J44.1": {"99213","99214","99215","94010","94060","71046","94640","94726"},
"J44.0": {"99213","99214","99215","94010","71046","87046"},
"J45.901": {"99213","99214","99215","94010","94060","94640","94664","71046"},
# --- Musculoskeletal ---
"M54.5": {"99213","99214","72100","72110","72148","97110","97530","64483","64484","63030","63047"},
"M54.2": {"99213","99214","72040","72050","72141","72142","97110","97530","64490","64491"},
"M17.11": {"99213","99214","73560","73562","73721","20610","27447","97110"},
"M16.11": {"99213","99214","73502","73521","73721","27130","20610"},
"M25.511": {"99213","99214","73010","73030","73221","20610","29827","97110"},
# --- Mental Health ---
"F32.1": {"99213","99214","90832","90834","90837","96127","G2212"},
"F41.1": {"99213","99214","90832","90834","90837","96127"},
"F31.9": {"99213","99214","99215","90837","90847","80305","80306"},
"F20.9": {"99213","99214","99215","90837","90853","80305"},
"F10.20": {"99213","99214","90832","90837","G2067","G2068"},
"F11.20": {"99213","99214","99215","90837","G2067","G2068","G2069"},
# --- Gastrointestinal ---
"K21.0": {"99213","99214","43239","43235","91034"},
"K57.30": {"99213","99214","45378","45380","74177"},
"K92.1": {"99213","99214","99215","43235","43239","45378","45380","74177"},
"K72.10": {"99214","99215","80076","80053","76700","74177"},
"K74.60": {"99214","99215","80076","80053","76700","43244"},
# --- Cancer ---
"C34.10": {"99213","99214","99215","71250","78816","32663","96413","96415","77263","77301"},
"C50.911": {"99213","99214","99215","77067","76641","78816","19307","96413","96415","77263"},
"C18.9": {"99214","99215","45378","45384","74177","78816","96413","96415"},
"C61": {"99213","99214","99215","84153","76872","55866","96413","77263"},
"C64.1": {"99214","99215","74177","78816","50543","96413"},
# --- Infectious Disease ---
"A41.9": {"99221","99222","99223","99231","99232","99233","87046","87081","36415","80053","85025"},
"B20": {"99213","99214","99215","86703","87536","86360","80305"},
"A09": {"99212","99213","99214","87046","87081","82270"},
# --- Neurological ---
"G43.909": {"99213","99214","70553","95923","64615"},
"G35": {"99213","99214","99215","70553","95925","95926","96413"},
"G20": {"99213","99214","99215","70553","97110","97530","95923"},
"G47.33": {"99213","99214","95810","95811","94660","E0601"},
"G89.29": {"99213","99214","64483","64484","64490","64491","62322","62323","97110"},
# --- Genitourinary ---
"N39.0": {"99212","99213","81001","81003","87086"},
"N18.3": {"99213","99214","80053","82565","81001","82042"},
"N18.6": {"99213","99214","99215","90935","90937","90945","90947","80053"},
"N40.1": {"99213","99214","52601","52441","76872","84153"},
# --- Endocrine ---
"E03.9": {"99213","99214","84439","84480","84481","76536"},
"E05.90": {"99213","99214","84439","84443","76536","78012"},
"E66.01": {"99213","99214","99215","G0447","43644","43770","97802","97803"},
"E78.5": {"99213","99214","80061","82465","83718","84478"},
# --- Preventive / Wellness ---
"Z00.00": {"99395","99396","99397","G0438","G0439","85025","80053","83036","80061","93000"},
"Z12.31": {"45378","45380","G0121","82274"},
"Z12.11": {"77066","77067","G0202"},
"Z23": {"90471","90472","90686","90714","90732"},
# --- OB/GYN ---
"Z34.00": {"99213","99214","76801","80055","86703","85025"},
"N87.1": {"99213","99214","57460","57455","88141"},
# --- Injury ---
"S72.001A":{"99221","99222","99223","27235","27236","73502"},
"S52.501A":{"99213","99214","25600","25605","73100","29125"},
"S13.4XXA":{"99213","99214","72050","97110","97530"},
}
# CPT codes that require at least one matching ICD-10 on the claim
CPT_REQUIRES_DX: Dict[str, set] = {
"83036": {"E10.9","E11.9","E11.65","E10.10","R73.09","Z13.1"}, # HbA1c
"84153": {"C61","N40.0","N40.1","Z12.5"}, # PSA
"86703": {"B20","Z11.4","Z34.00","O09.90"}, # HIV 1/2 antibody
"80061": {"E78.5","E78.00","E78.2","I25.10","I10","Z00.00"}, # Lipid panel
"82565": {"N18.3","N18.4","N18.5","N18.6","I12.9","E11.65"}, # Creatinine
"70553": {"G35","G43.909","G20","C71.9","I63.9","G40.909"}, # MRI brain w/wo
"72148": {"M54.5","M51.16","M51.17","G89.29","S33.5XXA"}, # MRI lumbar
"72141": {"M54.2","M50.10","G89.29","S14.9XXA"}, # MRI cervical
"73721": {"M17.11","M17.12","M23.200","S83.9XXA"}, # MRI joint
"74177": {"K57.30","K72.10","K74.60","C18.9","C64.1","R10.9"}, # CT abdomen/pelvis
"78816": {"C34.10","C50.911","C18.9","C61","C64.1","C81.90"}, # PET-CT
"71046": {"J18.9","J44.1","I50.9","R05.9","R06.09"}, # Chest X-ray
"93015": {"I25.10","I20.9","R07.9","I10"}, # Stress test
"93306": {"I50.9","I48.91","I25.10","I11.9","I34.0"}, # Complete echo
"78452": {"I25.10","I20.9","I48.91"}, # Nuclear stress
"27447": {"M17.11","M17.12","M17.31"}, # TKA
"27130": {"M16.11","M16.12","M16.31"}, # THA
"45378": {"K57.30","K92.1","Z12.31","C18.9","D12.5"}, # Colonoscopy
"43235": {"K21.0","K92.1","K29.70","D13.1"}, # EGD diagnostic
"63030": {"M51.16","M51.17","G89.29","M54.5"}, # Lumbar discectomy
"64483": {"M54.5","M51.16","G89.29"}, # Transforaminal ESI
"64490": {"M54.2","M50.10","G89.29"}, # Cervical facet inj
"90837": {"F32.1","F41.1","F31.9","F20.9","F10.20","F11.20"}, # Psychotherapy 60m
"95810": {"G47.33","G47.31","G47.30"}, # Polysomnography
"90935": {"N18.6","N17.9"}, # Hemodialysis
"43644": {"E66.01","E66.09"}, # Gastric bypass
"G0447": {"E66.01","E66.09","E66.1","Z68.35","Z68.36","Z68.37","Z68.38","Z68.39","Z68.41"},
"20610": {"M17.11","M17.12","M16.11","M25.511","M25.512","M25.311","M25.312"},
"94640": {"J44.1","J44.0","J45.901","J43.9"}, # Nebulizer
"96413": {"C34.10","C50.911","C18.9","C61","C64.1","C81.90","C82.90","C83.90"},
}
def rule_dx_cpt_mismatch(row: Dict) -> Optional[RuleResult]:
"""
ICD-10 → CPT direction:
If the primary diagnosis is in ICD_CPT_MAP, the billed CPT must appear
in the allowed set for that diagnosis.
"""
claim_cpt = cpt(row)
dxs = dx_list(row)
if not claim_cpt or not dxs:
return None
primary_dx = dxs[0].upper().strip()
if primary_dx in ICD_CPT_MAP:
allowed = ICD_CPT_MAP[primary_dx]
if claim_cpt not in allowed:
sample = ", ".join(sorted(allowed)[:6])
return fail(
"rDxCptMismatch",
f"CPT {claim_cpt} is not a recognized match for primary diagnosis "
f"{primary_dx}. Common expected codes: {sample} (and others).",
)
return None
def rule_cpt_requires_dx(row: Dict) -> Optional[RuleResult]:
"""
CPT → ICD-10 direction:
If the billed CPT is in CPT_REQUIRES_DX, at least one claim diagnosis
must match the required set.
"""
claim_cpt = cpt(row)
dxs = dx_list(row)
if not claim_cpt or claim_cpt not in CPT_REQUIRES_DX:
return None
required = CPT_REQUIRES_DX[claim_cpt]
claim_dx_set = {d.upper().strip() for d in dxs}
if not claim_dx_set & required:
sample = ", ".join(sorted(required)[:4])
return fail(
"rCptNoDx",
f"CPT {claim_cpt} requires at least one supporting diagnosis "
f"(e.g., {sample} …) but none found on the claim.",
)
return None
HARD_RULES: List[RuleFunc] = [
rule_rdxcode1,
rule_rdxcode_missing,
rule_perinatal_dx_age,
rule_pediatric_dx_age,
rule_maternity_dx_gender,
rule_female_only_dx_gender,
rule_external_cause_not_primary,
rule_z6830_not_primary,
rule_dx_g40009_g40909,
rule_dx_g4089_g40009,
rule_dx_f17200_z87891,
rule_dx_n183_date,
rule_dx_n1830_n1831_n1832_date,
rule_dx_m5459_r051_date,
rule_z01818_z01810_only_primary,
rule_wellmed_f_series_dx,
rule_90653_requires_z23,
rule_zip_valid,
rule_state_valid,
rule_city_valid,
rule_last_name_required,
rule_first_name_required,
rule_patient_name_special_chars,
rule_address_required,
rule_is_ptl,
rule_cpt_required,
rule_g0389_replaced,
rule_94620_replaced,
rule_g0436_replaced,
rule_g0437_replaced,
rule_q2037_replaced,
rule_92585_deleted,
rule_g8553_deleted,
rule_0031a_wrong_code_for_age,
rule_consult_code_medicare,
rule_g0008_insurance,
rule_90471_medicare,
rule_consult_requires_referring,
rule_25000_not_allowed,
rule_jcode_ndc,
rule_u0002_clia,
rule_lab_cpt_clia,
rule_high_level_99203,
rule_high_level_99213,
rule_newborn_cpt_age,
rule_gender_specific_cpt,
rule_maternity_cpt_gender,
rule_maternity_cpt_age,
rule_delivery_cpt_pos,
rule_chemo_requires_cancer_dx,
rule_dialysis_requires_renal_dx,
rule_cardiac_rehab_requires_cardiac_dx,
rule_awv_medicare_age,
rule_screening_colonoscopy_age,
rule_inpatient_only_cpt_office,
rule_prostate_cpt_gender,
rule_pulmonary_rehab_requires_pulmonary_dx,
rule_cardiac_cath_requires_cardiac_dx,
rule_radiation_oncology_requires_cancer,
rule_pt_requires_msk_neuro_dx,
rule_cataract_surgery_requires_cataract_dx,
rule_joint_injection_requires_arthritis_dx,
rule_spine_surgery_requires_spine_dx,
rule_mammography_female,
rule_hysterectomy_female,
rule_testicular_cpt_male,
rule_gallbladder_surgery_requires_dx,
rule_appendectomy_requires_dx,
rule_tonsillectomy_requires_dx,
rule_pediatric_preventive_age,
rule_adult_preventive_age,
rule_ed_visit_requires_pos23,
rule_office_em_inpatient_pos,
rule_bone_density_age,
rule_screening_mammo_requires_z1231,
rule_wellness_visit_requires_z00,
rule_mri_brain_requires_neuro_dx,
rule_ct_chest_requires_chest_dx,
rule_ct_abdomen_requires_abdominal_dx,
rule_echo_requires_cardiac_dx,
rule_a1c_requires_diabetes_dx,
rule_lipid_panel_requires_cv_dx,
rule_psa_male,
rule_pap_smear_female,
rule_cpap_requires_osa_dx,
rule_wheelchair_requires_mobility_dx,
rule_fetal_us_female,
rule_circumcision_male,
rule_iud_female,
rule_colposcopy_female,
rule_inpatient_em_requires_inpatient_pos,
rule_observation_em_requires_pos22,
rule_nf_em_requires_pos_31_32,
rule_home_visit_requires_pos12,
rule_aaa_screening_male_age,
rule_lung_ct_screening_age,
rule_q4169_supervision,
rule_medicare_forbidden_cpt,
rule_g0444_unbundle,
rule_0000_nonbillable,
rule_g2211_modifier25,
rule_g2011_modifier,
rule_smoking_cessation_requires_dx,
rule_q4221_units,
rule_q4250_units,
rule_95004_min_units,
rule_95165_min_units,
rule_zero_units,
rule_33340_primary_dx,
rule_33340_pos,
rule_rpm_checkbox,
rule_q_series_note,
rule_colon_in_note,
rule_wellmed_g0508,
rule_prolong_service,
rule_99490_not_bill,
rule_99024_nonreplacement,
rule_99498_add_on,
rule_vaccine_admin_required,
rule_90653_admin_required,
rule_self_pay_with_active_insurance,
rule_g9719_required_dx,
rule_mips_99221_99223,
rule_mips_99236,
rule_mips_99304_99306,
rule_mips_99307_99310,
rule_comb1,
rule_comb3,
rule_comb4,
rule_comb2,
rule_medicare_uhc_use_g0447,
rule_99441_99443_pos11,
rule_99354_99359_not_pos11,
rule_0034a_date,
rule_0071a_0072a_date,
rule_g0438_use_99203,
rule_99470_99457_99458,
rule_99454_99445,
rule_office_cpt_pos,
rule_inpatient_cpt_pos,
rule_snf_cpt_pos,
rule_observation_cpt_pos,
rule_er_cpt_pos,
rule_home_visit_cpt_pos,
rule_admission_date_empty_for_office,
rule_admission_date_required_inpatient,
rule_discharge_not_before_admission,
rule_future_admission_date,
rule_future_dos,
rule_january_2020_dos,
rule_dos_required,
rule_dummy_dos,
rule_charges_positive,
rule_charges_max,
rule_patient_payment_not_exceed_charges,
rule_referring_required_for_medicare,
rule_referral_requires_referring,
rule_attending_required,
rule_billing_required,
rule_medicare_mbi_format,
rule_dummy_dob,
rule_dob_required,
rule_gender_required,
rule_insurance_required,
rule_molina_9_digit_id,
rule_aarp_not_primary,
rule_workers_comp_accident_date,
rule_11042_requires_pa,
# ICD-10 ↔ CPT mapping rules
rule_dx_cpt_mismatch,
rule_cpt_requires_dx,
]
# ============================================================
# Rule engine
# ============================================================
def apply_hard_rules(row: Dict) -> List[RuleResult]:
broken: List[RuleResult] = []
broken.extend(collect_duplicate_dx_rules(row))
for rule in HARD_RULES:
result = rule(row)
if result and result.fired:
broken.append(result)
return broken
def evaluate_claim(row: Dict) -> Dict:
broken = apply_hard_rules(row)
return {
"ClaimId": get(row, "ClaimId"),
"PatientName": get(row, "PatientName"),
"CPTCode": get(row, "CPTCode"),
"DOS": get(row, "DOS"),
"Status": "REJECTED" if broken else "ACCEPTED",
"Rules_Broken": len(broken),
"Rule_Names": " | ".join(r.name for r in broken),
"Error_Messages": " | ".join(r.message for r in broken),
}
# ============================================================
# Runner
# ============================================================
def run_hard_rule_engine(input_file: str = "claims_to_check.xlsx", output_file: str = "Hard_Rule_Results.xlsx"):
print("=" * 60)
print(" HARD RULE ENGINE")
print("=" * 60)
try:
df = pd.read_excel(input_file, sheet_name="Claims", header=2)
df = df[df["ClaimId"].notna() & (df["ClaimId"].astype(str).str.strip() != "")].copy()
df = df.reset_index(drop=True)
print(f"Loaded {len(df)} claims from {input_file}")
except Exception as exc:
print(f"Error loading {input_file}: {exc}")
return None
results = []
for _, row in df.iterrows():
result = evaluate_claim(row.to_dict())
results.append(result)
icon = "ACCEPTED" if result["Status"] == "ACCEPTED" else f"REJECTED ({result['Rules_Broken']} rules)"
print(
f"Claim {str(result['ClaimId']):>8} | "
f"{str(result['PatientName']):<22} | "
f"CPT {str(result['CPTCode']):<8} | {icon}"
)
out_df = pd.DataFrame(results)
out_df.to_excel(output_file, index=False, sheet_name="Results")
total = len(out_df)
rejected = int((out_df["Status"] == "REJECTED").sum())
accepted = total - rejected
rejection_pct = (rejected / total * 100) if total else 0
print("-" * 60)
print(f"Total Claims : {total}")
print(f"Accepted : {accepted}")
print(f"Rejected : {rejected}")
print(f"Rejection % : {rejection_pct:.1f}%")
print(f"Saved results to: {output_file}")
return out_df
if __name__ == "__main__":
import sys
input_path = sys.argv[1] if len(sys.argv) > 1 else "claims_to_check.xlsx"
output_path = sys.argv[2] if len(sys.argv) > 2 else "Hard_Rule_Results.xlsx"
run_hard_rule_engine(input_path, output_path)