HTSReivewTool / hts_validator.py
joycecast's picture
Upload 3 files
c3ea20a verified
"""
HTS Validator - Core validation logic for HTS tariff auditing
Validates primary HTS codes against additional HTS and description keywords
Logic Flow:
1. Check Override Keywords (Zinc, Plastics) - highest priority
2. Check Special HTS (Computer Parts, Auto Parts, Semiconductor)
3. Check Primary HTS membership:
a. If in 2+ HTS categories -> Dual HTS logic
b. If in 1 HTS category -> Single HTS logic
c. If in 0 HTS categories -> Keyword-only logic
4. Within each HTS category, check description keywords
"""
import re
from typing import Dict, List, Optional, Tuple, Set
from dataclasses import dataclass
from HTS_list import (Steel_primary_HTS_list, Aluminum_primary_HTS_list, Copper_primary_HTS_list,
Computer_parts_HTS_list, Auto_parts_HTS_list, Semiconductor_HTS_list)
# Key Additional HTS codes
# Codes are written without dots so they compare directly against the
# normalized (dot-free) additional-HTS set built for each entry.
# For the three grouped sets, the result builder treats the presence of ANY
# member as satisfying the whole group.
STEEL_232_CODES = {"99038190", "99038191"}
ALUMINUM_232_CODES = {"99038507", "99038508"}
# Per the scenario summaries: 99037801 = copper content, 99037802 = no copper content.
COPPER_CODES = {"99037801", "99037802"}
# Expected together with a steel/aluminum/copper tariff in scenarios such as
# D2 or E4 (presumably the general Section 301 code - TODO confirm).
GENERAL_301_CODE = "99030133"
# Expected when the description keywords do not agree with the primary HTS
# category (labelled "mismatch" in the scenario summaries), and in the
# zinc/plastics override scenarios.
MISMATCH_CODE = "99030125"
# All 232/tariff codes for checking forbidden
# (union of the steel, aluminum AND copper groups, despite the "232" name)
ALL_232_CODES = STEEL_232_CODES | ALUMINUM_232_CODES | COPPER_CODES
# Scenario summaries - updated for new case IDs
# Maps a scenario ID to the one-line, human-readable rule it stands for.
# The result builder looks entries up with SCENARIO_SUMMARIES.get(id, ""),
# so an ID missing from this table yields an empty summary rather than an error.
# ID prefixes: Z/P = overrides, C/A/SC = special HTS, D/E/F = dual HTS,
# S/L/U = single HTS (steel / aluminum / copper), N = no metal HTS.
# Within each D/E/F/S/L/U/N family the numeric suffix 1-8 follows the keyword
# combination: none, metal, aluminum, copper, metal+alum, metal+copper,
# alum+copper, all three.
SCENARIO_SUMMARIES = {
    # Level 0: Override
    "Z1": "Zinc keyword - only 99030125, no 232/copper tariffs",
    "P1": "Plastics + Steel HTS - only 99030125, no 232",
    "P2": "Plastics + Aluminum HTS - only 99030125, no 232",
    "P3": "Plastics + Steel+Alum HTS - only 99030125, no 232",
    "P4": "Plastics + no metal HTS - no action",
    "P5": "Plastics + Copper HTS - only 99030125, no copper tariff",
    "P6": "Plastics + Alum+Copper HTS - only 99030125, no 232/copper",
    # Level 1: Special HTS
    "C1": "Computer Parts HTS - FLAG for manual review",
    "A1": "Auto Parts HTS - FLAG for manual review",
    "SC1": "Semiconductor HTS - FLAG for manual review (overlaps Computer/Aluminum)",
    # Level 2: Dual HTS - Steel + Aluminum
    "D1": "Steel+Alum HTS, no keyword - FLAG",
    "D2": "Steel+Alum HTS + metal keyword - Steel 232 + 99030133",
    "D3": "Steel+Alum HTS + aluminum keyword - Alum 232 + 99030133",
    "D4": "Steel+Alum HTS + copper keyword - 99030125 (mismatch)",
    "D5": "Steel+Alum HTS + metal+alum keywords - FLAG ambiguous",
    "D6": "Steel+Alum HTS + metal+copper keywords - Steel 232 + 99030133",
    "D7": "Steel+Alum HTS + alum+copper keywords - Alum 232 + 99030133",
    "D8": "Steel+Alum HTS + all keywords - FLAG ambiguous",
    # Level 2: Dual HTS - Aluminum + Copper
    "E1": "Alum+Copper HTS, no keyword - FLAG",
    "E2": "Alum+Copper HTS + metal keyword - 99030125 (mismatch)",
    "E3": "Alum+Copper HTS + aluminum keyword - Alum 232 + 99030133",
    "E4": "Alum+Copper HTS + copper keyword - Copper + 99030133",
    "E5": "Alum+Copper HTS + metal+alum keywords - Alum 232 + 99030133",
    "E6": "Alum+Copper HTS + metal+copper keywords - Copper + 99030133",
    "E7": "Alum+Copper HTS + alum+copper keywords - FLAG ambiguous",
    "E8": "Alum+Copper HTS + all keywords - FLAG ambiguous",
    # Level 2: Dual HTS - Steel + Copper
    "F1": "Steel+Copper HTS, no keyword - FLAG",
    "F2": "Steel+Copper HTS + metal keyword - Steel 232 + 99030133",
    "F3": "Steel+Copper HTS + aluminum keyword - 99030125 (mismatch)",
    "F4": "Steel+Copper HTS + copper keyword - Copper + 99030133",
    "F5": "Steel+Copper HTS + metal+alum keywords - Steel 232 + 99030133",
    "F6": "Steel+Copper HTS + metal+copper keywords - FLAG ambiguous",
    "F7": "Steel+Copper HTS + alum+copper keywords - Copper + 99030133",
    "F8": "Steel+Copper HTS + all keywords - FLAG ambiguous",
    # Level 3: Single HTS - Steel
    "S1": "Steel HTS, no keyword - Steel 232 + 99030133",
    "S2": "Steel HTS + metal keyword - Steel 232 + 99030133",
    "S3": "Steel HTS + aluminum keyword - 99030125 (mismatch)",
    "S4": "Steel HTS + copper keyword - 99030125 (mismatch)",
    "S5": "Steel HTS + metal+alum keywords - Steel 232 + 99030133",
    "S6": "Steel HTS + metal+copper keywords - Steel 232 + 99030133",
    "S7": "Steel HTS + alum+copper keywords - 99030125 (mismatch)",
    "S8": "Steel HTS + all keywords - Steel 232 + 99030133",
    # Level 3: Single HTS - Aluminum
    "L1": "Aluminum HTS, no keyword - Alum 232 + 99030133",
    "L2": "Aluminum HTS + metal keyword - 99030125 (mismatch)",
    "L3": "Aluminum HTS + aluminum keyword - Alum 232 + 99030133",
    "L4": "Aluminum HTS + copper keyword - 99030125 (mismatch)",
    "L5": "Aluminum HTS + metal+alum keywords - Alum 232 + 99030133",
    "L6": "Aluminum HTS + metal+copper keywords - 99030125 (mismatch)",
    "L7": "Aluminum HTS + alum+copper keywords - Alum 232 + 99030133",
    "L8": "Aluminum HTS + all keywords - Alum 232 + 99030133",
    # Level 3: Single HTS - Copper
    "U1": "Copper HTS, no keyword - 99037802 (no copper content)",
    "U2": "Copper HTS + metal keyword - 99030125 (mismatch)",
    "U3": "Copper HTS + aluminum keyword - 99037802 + Alum 232",
    "U4": "Copper HTS + copper keyword - 99037801 (copper content)",
    "U5": "Copper HTS + metal+alum keywords - 99037802 + Alum 232",
    "U6": "Copper HTS + metal+copper keywords - 99037801 (copper content)",
    "U7": "Copper HTS + alum+copper keywords - FLAG ambiguous",
    "U8": "Copper HTS + all keywords - FLAG ambiguous",
    # Level 4: No HTS Match
    "N1": "No metal HTS, no keyword - no action",
    "N2": "No metal HTS + metal keyword - 99030125",
    "N3": "No metal HTS + aluminum keyword - 99030125",
    "N4": "No metal HTS + copper keyword - 99030125",
    "N5": "No metal HTS + metal+alum keywords - 99030125",
    "N6": "No metal HTS + metal+copper keywords - 99030125",
    "N7": "No metal HTS + alum+copper keywords - 99030125",
    "N8": "No metal HTS + all keywords - 99030125",
    # Legacy
    # NOTE(review): "NONE" is not produced by any rule visible in this part
    # of the file - confirm whether it is still referenced elsewhere.
    "NONE": "No applicable scenario - entry does not match any validation rules",
}
@dataclass
class ValidationResult:
    """Outcome of checking one entry against the scenario rules.

    The first eleven fields are required and positional. The boolean
    indicator fields default to ``False`` and record which HTS lists the
    primary code matched and which keyword families the description hit.
    """

    # --- Entry data as evaluated ---
    entry_number: str
    description: str
    primary_hts: str
    additional_hts: List[str]

    # --- Scenario that was selected for the entry ---
    scenario_id: str
    scenario_summary: str

    # --- Verdict ---
    # Either "PASS" or "FLAG".
    status: str
    expected_hts: List[str]
    missing_hts: List[str]
    unexpected_hts: List[str]
    issue: str

    # --- Primary HTS list membership ---
    in_steel_hts: bool = False
    in_aluminum_hts: bool = False
    in_copper_hts: bool = False
    in_computer_hts: bool = False
    in_auto_hts: bool = False
    in_semiconductor_hts: bool = False

    # --- Description keyword hits ---
    has_metal_keyword: bool = False
    has_aluminum_keyword: bool = False
    has_copper_keyword: bool = False
    has_zinc_keyword: bool = False
    has_plastics_keyword: bool = False
class HTSValidator:
"""Validates HTS codes against descriptions and additional tariffs"""
def __init__(self,
             metal_keywords: Optional[List[str]] = None,
             aluminum_keywords: Optional[List[str]] = None,
             copper_keywords: Optional[List[str]] = None,
             zinc_keywords: Optional[List[str]] = None,
             plastics_keywords: Optional[List[str]] = None):
    """Store the keyword lists and build the primary-HTS lookup sets.

    Each keyword argument replaces the built-in list for its category.
    Any falsy value (``None`` or an empty list) selects the built-in list.
    """
    # Fall back to the built-in keyword lists where nothing usable was given.
    if not metal_keywords:
        metal_keywords = [
            "steel", "stainless steel", "carbon steel", "iron", "metal"
        ]
    if not aluminum_keywords:
        aluminum_keywords = [
            "aluminum", "aluminium"
        ]
    if not copper_keywords:
        copper_keywords = [
            "copper"
        ]
    if not zinc_keywords:
        zinc_keywords = [
            "zinc"
        ]
    if not plastics_keywords:
        plastics_keywords = [
            "plastic", "abs", "pu", "pvc", "polyester", "nylon"
        ]

    self.metal_keywords = metal_keywords
    self.aluminum_keywords = aluminum_keywords
    self.copper_keywords = copper_keywords
    self.zinc_keywords = zinc_keywords
    self.plastics_keywords = plastics_keywords

    # Primary-HTS lists are kept as sets of strings for prefix matching.
    to_str_set = self._convert_hts_list
    self.steel_hts_set = to_str_set(Steel_primary_HTS_list)
    self.aluminum_hts_set = to_str_set(Aluminum_primary_HTS_list)
    self.copper_hts_set = to_str_set(Copper_primary_HTS_list)
    self.computer_parts_hts_set = to_str_set(Computer_parts_HTS_list)
    self.auto_parts_hts_set = to_str_set(Auto_parts_HTS_list)
    self.semiconductor_hts_set = to_str_set(Semiconductor_HTS_list)
def _convert_hts_list(self, hts_list: List) -> Set[str]:
"""Convert HTS list to set of strings"""
return {str(hts) for hts in hts_list}
def _hts_matches_list(self, primary_hts: str, hts_set: Set[str]) -> bool:
"""Check if primary HTS matches any entry in HTS set using prefix matching"""
primary_str = str(primary_hts).replace(".", "").strip()
for list_hts in hts_set:
list_hts_str = str(list_hts).replace(".", "").strip()
# Prefix match: if list entry is 8 digits, match any 10-digit starting with it
if len(list_hts_str) <= len(primary_str):
if primary_str.startswith(list_hts_str):
return True
else:
# List entry is longer, check if primary starts with it
if list_hts_str.startswith(primary_str):
return True
return False
def _contains_keywords(self, text: str, keywords: List[str]) -> bool:
"""Check if text contains any of the keywords (case-insensitive, word boundary)"""
if not text:
return False
text_lower = text.lower()
for kw in keywords:
# Use word boundary matching to avoid partial matches (e.g., "pu" in "punch")
# \b matches word boundaries
pattern = r'\b' + re.escape(kw.lower()) + r'\b'
if re.search(pattern, text_lower):
return True
return False
def _get_additional_hts_set(self, additional_hts: List[str]) -> Set[str]:
"""Convert additional HTS list to normalized set"""
result = set()
for hts in additional_hts:
if hts:
# Remove decimal points and convert to string
normalized = str(hts).replace(".", "").strip()
# Remove trailing .0 from floats
if normalized.endswith("0") and len(normalized) > 8:
# Check if it's a float representation
try:
float_val = float(hts)
normalized = str(int(float_val))
except (ValueError, TypeError):
pass
result.add(normalized)
return result
def _check_hts_present(self, hts_code: str, additional_set: Set[str]) -> bool:
"""Check if an HTS code is present in additional HTS set"""
return hts_code in additional_set
def _check_any_hts_present(self, hts_codes: Set[str], additional_set: Set[str]) -> bool:
"""Check if any of the HTS codes are present"""
return bool(hts_codes & additional_set)
def _get_keyword_category(self, has_metal: bool, has_aluminum: bool, has_copper: bool) -> str:
"""Determine keyword category code (K0-K7)"""
if has_metal and has_aluminum and has_copper:
return "K7"
elif has_aluminum and has_copper:
return "K6"
elif has_metal and has_copper:
return "K5"
elif has_metal and has_aluminum:
return "K4"
elif has_copper:
return "K3"
elif has_aluminum:
return "K2"
elif has_metal:
return "K1"
else:
return "K0"
def validate_entry(self, entry_number: str, description: str,
                   primary_hts: str, additional_hts: List[str]) -> ValidationResult:
    """Validate a single entry against all scenarios.

    Normalizes the inputs, works out which primary-HTS lists the entry
    belongs to, which keyword families its description mentions and which
    additional tariff codes are already applied, then hands all of that to
    the rule engine.

    Args:
        entry_number: Identifier of the entry; passed through unchanged.
        description: Free-text goods description; a falsy value becomes "".
        primary_hts: Primary HTS code, dotted or dot-free; a falsy value
            becomes "".
        additional_hts: Raw additional HTS codes applied to the entry.

    Returns:
        The ValidationResult produced by the rule engine. Its
        ``additional_hts`` list holds the normalized codes in sorted order.
    """
    # Normalize data
    # A whole-number float (e.g. 7318150000.0) would gain a spurious trailing
    # "0" once its decimal point is stripped below; the additional-HTS
    # normalization already guards against this, so do the same here.
    if isinstance(primary_hts, float) and primary_hts.is_integer():
        primary_hts = int(primary_hts)
    primary_str = str(primary_hts).replace(".", "").strip() if primary_hts else ""
    desc = str(description) if description else ""
    additional_set = self._get_additional_hts_set(additional_hts)

    # Check which HTS lists the primary belongs to
    in_steel = self._hts_matches_list(primary_str, self.steel_hts_set)
    in_aluminum = self._hts_matches_list(primary_str, self.aluminum_hts_set)
    in_copper = self._hts_matches_list(primary_str, self.copper_hts_set)
    in_computer_parts = self._hts_matches_list(primary_str, self.computer_parts_hts_set)
    in_auto_parts = self._hts_matches_list(primary_str, self.auto_parts_hts_set)
    in_semiconductor = self._hts_matches_list(primary_str, self.semiconductor_hts_set)

    # Check description keywords
    has_metal_kw = self._contains_keywords(desc, self.metal_keywords)
    has_aluminum_kw = self._contains_keywords(desc, self.aluminum_keywords)
    has_copper_kw = self._contains_keywords(desc, self.copper_keywords)
    has_zinc_kw = self._contains_keywords(desc, self.zinc_keywords)
    has_plastics_kw = self._contains_keywords(desc, self.plastics_keywords)

    # Check which additional HTS are applied
    has_steel_232 = self._check_any_hts_present(STEEL_232_CODES, additional_set)
    has_aluminum_232 = self._check_any_hts_present(ALUMINUM_232_CODES, additional_set)
    has_copper_tariff = self._check_any_hts_present(COPPER_CODES, additional_set)
    has_301 = self._check_hts_present(GENERAL_301_CODE, additional_set)
    has_mismatch = self._check_hts_present(MISMATCH_CODE, additional_set)

    # Get keyword category
    keyword_cat = self._get_keyword_category(has_metal_kw, has_aluminum_kw, has_copper_kw)

    # Apply validation rules in level order
    return self._apply_validation_rules(
        entry_number=entry_number,
        description=desc,
        primary_hts=primary_str,
        # Set iteration order for strings varies between interpreter runs;
        # sorting keeps the reported list stable and diffable.
        additional_hts=sorted(additional_set),
        in_steel=in_steel,
        in_aluminum=in_aluminum,
        in_copper=in_copper,
        in_computer_parts=in_computer_parts,
        in_auto_parts=in_auto_parts,
        in_semiconductor=in_semiconductor,
        has_metal_kw=has_metal_kw,
        has_aluminum_kw=has_aluminum_kw,
        has_copper_kw=has_copper_kw,
        has_zinc_kw=has_zinc_kw,
        has_plastics_kw=has_plastics_kw,
        has_steel_232=has_steel_232,
        has_aluminum_232=has_aluminum_232,
        has_copper_tariff=has_copper_tariff,
        has_301=has_301,
        has_mismatch=has_mismatch,
        additional_set=additional_set,
        keyword_cat=keyword_cat
    )
def _create_result(self, entry_number: str, description: str, primary_hts: str,
                   additional_hts: List[str], scenario_id: str,
                   expected_codes: List[str], forbidden_codes: Set[str],
                   additional_set: Set[str], always_flag: bool = False,
                   flag_reason: str = "",
                   # Indicators
                   in_steel: bool = False, in_aluminum: bool = False, in_copper: bool = False,
                   in_computer: bool = False, in_auto: bool = False, in_semiconductor: bool = False,
                   has_metal_kw: bool = False, has_aluminum_kw: bool = False,
                   has_copper_kw: bool = False, has_zinc_kw: bool = False,
                   has_plastics_kw: bool = False) -> ValidationResult:
    """Create validation result by checking expected vs actual.

    For tariff code groups (Steel 232, Aluminum 232, Copper), we check if ANY
    code of the group is present. For individual codes (99030125, 99030133),
    we check if that specific code is present.

    Args:
        entry_number, description, primary_hts, additional_hts: Entry data,
            copied into the result unchanged.
        scenario_id: Scenario key; its summary is looked up in
            SCENARIO_SUMMARIES (empty string when unknown).
        expected_codes: Codes the scenario requires.
        forbidden_codes: Codes that must not be applied in this scenario.
        additional_set: Normalized additional HTS codes actually applied.
        always_flag: When True, skip all checks and return a FLAG result.
        flag_reason: Issue text for an always-flag result; defaults to
            "Manual review required" when empty.
        in_* / has_*_kw: Indicator flags copied into the result.

    Returns:
        A ValidationResult with status "PASS" when nothing is missing and
        nothing forbidden is present, otherwise "FLAG". ``unexpected_hts``
        is sorted so the result and its issue text are stable between runs.
    """
    # Fields that are identical for the always-flag and the checked result.
    common = dict(
        entry_number=entry_number,
        description=description,
        primary_hts=primary_hts,
        additional_hts=additional_hts,
        scenario_id=scenario_id,
        scenario_summary=SCENARIO_SUMMARIES.get(scenario_id, ""),
        in_steel_hts=in_steel,
        in_aluminum_hts=in_aluminum,
        in_copper_hts=in_copper,
        in_computer_hts=in_computer,
        in_auto_hts=in_auto,
        in_semiconductor_hts=in_semiconductor,
        has_metal_keyword=has_metal_kw,
        has_aluminum_keyword=has_aluminum_kw,
        has_copper_keyword=has_copper_kw,
        has_zinc_keyword=has_zinc_kw,
        has_plastics_keyword=has_plastics_kw,
    )

    if always_flag:
        return ValidationResult(
            status="FLAG",
            expected_hts=[],
            missing_hts=[],
            unexpected_hts=[],
            issue=flag_reason or "Manual review required",
            **common
        )

    missing = []
    expected_display = []

    # (group codes, display name, label used when the group is missing)
    tariff_groups = (
        (STEEL_232_CODES, "Steel 232", "Steel 232 (99038190/91)"),
        (ALUMINUM_232_CODES, "Alum 232", "Alum 232 (99038507/08)"),
        (COPPER_CODES, "Copper", "Copper (99037801/02)"),
    )
    # A group is expected as soon as ONE of its codes is listed, and it is
    # satisfied as soon as ANY of its codes is applied.
    # NOTE(review): a scenario that lists a single copper code (e.g. only
    # "99037802") is therefore still satisfied by the other copper code
    # unless the caller also forbids that one - confirm this is intended.
    for group_codes, display_name, missing_label in tariff_groups:
        if any(code in group_codes for code in expected_codes):
            expected_display.append(display_name)
            if not (group_codes & additional_set):
                missing.append(missing_label)

    # Check individual codes (99030133, 99030125)
    grouped_codes = STEEL_232_CODES | ALUMINUM_232_CODES | COPPER_CODES
    for code in expected_codes:
        if code in grouped_codes:
            continue
        expected_display.append(code)
        if code not in additional_set:
            missing.append(code)

    # Forbidden codes that are present. Sorted because set iteration order
    # for strings differs between interpreter runs, which would otherwise
    # reorder the issue text from one run to the next.
    unexpected = sorted(forbidden_codes & additional_set)

    # Determine status
    if not missing and not unexpected:
        status = "PASS"
        issue = "Correct tariff application"
    else:
        status = "FLAG"
        issues = []
        if missing:
            issues.append(f"Missing: {', '.join(missing)}")
        if unexpected:
            issues.append(f"Unexpected: {', '.join(unexpected)}")
        issue = "; ".join(issues)

    return ValidationResult(
        status=status,
        expected_hts=expected_display,
        missing_hts=missing,
        unexpected_hts=unexpected,
        issue=issue,
        **common
    )
def _apply_validation_rules(self, entry_number: str, description: str,
primary_hts: str, additional_hts: List[str],
in_steel: bool, in_aluminum: bool, in_copper: bool,
in_computer_parts: bool, in_auto_parts: bool, in_semiconductor: bool,
has_metal_kw: bool, has_aluminum_kw: bool,
has_copper_kw: bool, has_zinc_kw: bool,
has_plastics_kw: bool, has_steel_232: bool,
has_aluminum_232: bool, has_copper_tariff: bool,
has_301: bool, has_mismatch: bool,
additional_set: Set[str], keyword_cat: str) -> ValidationResult:
"""Apply all validation rules in level order"""
# Common indicator parameters for all _create_result calls
indicators = {
"in_steel": in_steel,
"in_aluminum": in_aluminum,
"in_copper": in_copper,
"in_computer": in_computer_parts,
"in_auto": in_auto_parts,
"in_semiconductor": in_semiconductor,
"has_metal_kw": has_metal_kw,
"has_aluminum_kw": has_aluminum_kw,
"has_copper_kw": has_copper_kw,
"has_zinc_kw": has_zinc_kw,
"has_plastics_kw": has_plastics_kw,
}
# =====================================================================
# LEVEL 0: Override Cases (Highest Priority)
# =====================================================================
# Z1: Zinc keyword - only 99030125, no 232/copper tariffs
if has_zinc_kw:
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="Z1",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALL_232_CODES,
additional_set=additional_set,
**indicators
)
# Plastics override cases
if has_plastics_kw:
if in_steel and not in_aluminum and not in_copper:
# P1: Plastics + Steel HTS
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="P1",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES,
additional_set=additional_set,
**indicators
)
elif in_aluminum and not in_steel and not in_copper:
# P2: Plastics + Aluminum HTS
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="P2",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES,
additional_set=additional_set,
**indicators
)
elif in_steel and in_aluminum and not in_copper:
# P3: Plastics + Steel+Alum HTS
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="P3",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES,
additional_set=additional_set,
**indicators
)
elif in_copper and not in_steel and not in_aluminum:
# P5: Plastics + Copper HTS
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="P5",
expected_codes=[MISMATCH_CODE],
forbidden_codes=COPPER_CODES,
additional_set=additional_set,
**indicators
)
elif in_aluminum and in_copper:
# P6: Plastics + Alum+Copper HTS
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="P6",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALUMINUM_232_CODES | COPPER_CODES,
additional_set=additional_set,
**indicators
)
else:
# P4: Plastics + no metal HTS - no action needed
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="P4",
expected_codes=[],
forbidden_codes=set(),
additional_set=additional_set,
**indicators
)
# =====================================================================
# LEVEL 1: Special HTS Categories
# =====================================================================
# C1: Computer Parts HTS - always FLAG
if in_computer_parts:
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="C1",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Computer parts HTS - manual review required"
)
# A1: Auto Parts HTS - always FLAG
if in_auto_parts:
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="A1",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Auto parts HTS - manual review required"
)
# SC1: Semiconductor HTS - always FLAG (overlaps with Computer Parts and Aluminum)
if in_semiconductor:
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="SC1",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Semiconductor HTS - manual review required (overlaps Computer/Aluminum)"
)
# =====================================================================
# LEVEL 2: Dual HTS Categories
# =====================================================================
# H4: Steel + Aluminum
if in_steel and in_aluminum and not in_copper:
if keyword_cat == "K0": # D1: No keyword
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="D1",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Steel+Aluminum HTS with no keyword - cannot determine tariff"
)
elif keyword_cat == "K1": # D2: Metal only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="D2",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K2": # D3: Aluminum only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="D3",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K3": # D4: Copper only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="D4",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALL_232_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K4": # D5: Metal + Aluminum
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="D5",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Steel+Aluminum HTS with both metal and aluminum keywords - ambiguous"
)
elif keyword_cat == "K5": # D6: Metal + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="D6",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K6": # D7: Aluminum + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="D7",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K7": # D8: All three
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="D8",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Steel+Aluminum HTS with all keywords - ambiguous"
)
# H5: Aluminum + Copper
if in_aluminum and in_copper and not in_steel:
if keyword_cat == "K0": # E1: No keyword
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="E1",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Aluminum+Copper HTS with no keyword - cannot determine tariff"
)
elif keyword_cat == "K1": # E2: Metal only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="E2",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALL_232_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K2": # E3: Aluminum only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="E3",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K3": # E4: Copper only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="E4",
expected_codes=list(COPPER_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K4": # E5: Metal + Aluminum
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="E5",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K5": # E6: Metal + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="E6",
expected_codes=list(COPPER_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K6": # E7: Aluminum + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="E7",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Aluminum+Copper HTS with both aluminum and copper keywords - ambiguous"
)
elif keyword_cat == "K7": # E8: All three
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="E8",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Aluminum+Copper HTS with all keywords - ambiguous"
)
# H6: Steel + Copper
if in_steel and in_copper and not in_aluminum:
if keyword_cat == "K0": # F1: No keyword
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="F1",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Steel+Copper HTS with no keyword - cannot determine tariff"
)
elif keyword_cat == "K1": # F2: Metal only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="F2",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K2": # F3: Aluminum only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="F3",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALL_232_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K3": # F4: Copper only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="F4",
expected_codes=list(COPPER_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K4": # F5: Metal + Aluminum
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="F5",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K5": # F6: Metal + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="F6",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Steel+Copper HTS with both metal and copper keywords - ambiguous"
)
elif keyword_cat == "K6": # F7: Aluminum + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="F7",
expected_codes=list(COPPER_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K7": # F8: All three
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="F8",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Steel+Copper HTS with all keywords - ambiguous"
)
# =====================================================================
# LEVEL 3: Single HTS Category
# =====================================================================
# H1: Steel Only
if in_steel and not in_aluminum and not in_copper:
if keyword_cat == "K0": # S1: No keyword
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="S1",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K1": # S2: Metal only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="S2",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K2": # S3: Aluminum only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="S3",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES | {GENERAL_301_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K3": # S4: Copper only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="S4",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | COPPER_CODES | {GENERAL_301_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K4": # S5: Metal + Aluminum
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="S5",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K5": # S6: Metal + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="S6",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K6": # S7: Aluminum + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="S7",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALL_232_CODES | {GENERAL_301_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K7": # S8: All three
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="S8",
expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
# H2: Aluminum Only
if in_aluminum and not in_steel and not in_copper:
if keyword_cat == "K0": # L1: No keyword
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="L1",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K1": # L2: Metal only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="L2",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES | {GENERAL_301_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K2": # L3: Aluminum only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="L3",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K3": # L4: Copper only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="L4",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALUMINUM_232_CODES | COPPER_CODES | {GENERAL_301_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K4": # L5: Metal + Aluminum
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="L5",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K5": # L6: Metal + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="L6",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALL_232_CODES | {GENERAL_301_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K6": # L7: Aluminum + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="L7",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K7": # L8: All three
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="L8",
expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE},
additional_set=additional_set,
**indicators
)
# H3: Copper Only
# New logic: Copper keyword -> 99037801, No copper keyword -> 99037802
if in_copper and not in_steel and not in_aluminum:
if keyword_cat == "K0": # U1: No keyword -> 99037802 (no copper content)
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="U1",
expected_codes=["99037802"],
forbidden_codes={MISMATCH_CODE, "99037801"},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K1": # U2: Metal only -> mismatch
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="U2",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | COPPER_CODES | {GENERAL_301_CODE},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K2": # U3: Aluminum only -> 99037802 + Alum 232
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="U3",
expected_codes=["99037802"] + list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE, "99037801"},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K3": # U4: Copper only -> 99037801 (copper content)
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="U4",
expected_codes=["99037801"],
forbidden_codes={MISMATCH_CODE, "99037802"},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K4": # U5: Metal + Aluminum -> 99037802 + Alum 232
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="U5",
expected_codes=["99037802"] + list(ALUMINUM_232_CODES) + [GENERAL_301_CODE],
forbidden_codes={MISMATCH_CODE, "99037801"},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K5": # U6: Metal + Copper -> 99037801 (copper keyword present)
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="U6",
expected_codes=["99037801"],
forbidden_codes={MISMATCH_CODE, "99037802"},
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K6": # U7: Aluminum + Copper -> FLAG ambiguous
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="U7",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Copper HTS with both aluminum and copper keywords - ambiguous"
)
elif keyword_cat == "K7": # U8: All three -> FLAG ambiguous
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="U8",
expected_codes=[], forbidden_codes=set(),
additional_set=additional_set,
**indicators,
always_flag=True,
flag_reason="Copper HTS with all keywords - ambiguous"
)
# =====================================================================
# LEVEL 4: No HTS Match
# =====================================================================
# H0: Not in any metal list
if keyword_cat == "K0": # N1: No keyword
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="N1",
expected_codes=[],
forbidden_codes=set(),
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K1": # N2: Metal only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="N2",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K2": # N3: Aluminum only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="N3",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALUMINUM_232_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K3": # N4: Copper only
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="N4",
expected_codes=[MISMATCH_CODE],
forbidden_codes=COPPER_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K4": # N5: Metal + Aluminum
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="N5",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K5": # N6: Metal + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="N6",
expected_codes=[MISMATCH_CODE],
forbidden_codes=STEEL_232_CODES | COPPER_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K6": # N7: Aluminum + Copper
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="N7",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALUMINUM_232_CODES | COPPER_CODES,
additional_set=additional_set,
**indicators
)
elif keyword_cat == "K7": # N8: All three
return self._create_result(
entry_number, description, primary_hts, additional_hts,
scenario_id="N8",
expected_codes=[MISMATCH_CODE],
forbidden_codes=ALL_232_CODES,
additional_set=additional_set,
**indicators
)
# Fallback - should not reach here
return ValidationResult(
entry_number=entry_number,
description=description,
primary_hts=primary_hts,
additional_hts=additional_hts,
scenario_id="NONE",
scenario_summary=SCENARIO_SUMMARIES["NONE"],
status="PASS",
expected_hts=[],
missing_hts=[],
unexpected_hts=[],
issue="No validation rule matched",
in_steel_hts=in_steel,
in_aluminum_hts=in_aluminum,
in_copper_hts=in_copper,
in_computer_hts=in_computer_parts,
in_auto_hts=in_auto_parts,
in_semiconductor_hts=in_semiconductor,
has_metal_keyword=has_metal_kw,
has_aluminum_keyword=has_aluminum_kw,
has_copper_keyword=has_copper_kw,
has_zinc_keyword=has_zinc_kw,
has_plastics_keyword=has_plastics_kw
)
def _cell_to_text(value, default: str = "") -> str:
    """Render one DataFrame cell as stripped text, or *default* when it is missing.

    A cell counts as missing when it is ``None``, a float NaN, blank after
    stripping, or the text ``"nan"`` / ``"<NA>"`` (case-insensitive).
    Integer-valued floats are rendered without the trailing ``.0``.
    """
    if value is None:
        return default
    if isinstance(value, float):
        if value != value:  # NaN is the only float that is unequal to itself
            return default
        if value.is_integer():
            # pandas stores a numeric column that contains blanks as float, so
            # a code such as 99038190 arrives as 99038190.0; drop the fraction.
            return str(int(value))
    text = str(value).strip()
    # "<na>" is the lower-cased text form of pandas.NA.
    if not text or text.lower() in ("nan", "<na>"):
        return default
    return text


def validate_dataframe(df, validator: "HTSValidator",
                       description_col: str = "Description",
                       tariff_col: str = "Tariff",
                       entry_col: str = "Entry Number",
                       additional_cols: Optional[List[str]] = None) -> List["ValidationResult"]:
    """Validate every row of a DataFrame and return the results in row order.

    Args:
        df: Table to audit; rows are read with ``df.iterrows()``.
        validator: Object whose ``validate_entry`` method is called once per
            row with the keyword arguments ``entry_number``, ``description``,
            ``primary_hts`` and ``additional_hts``.
        description_col: Column holding the goods description.
        tariff_col: Column holding the primary HTS code.
        entry_col: Column holding the entry number. When the cell is missing
            the row is identified as ``"Row_<index label>"`` instead.
        additional_cols: Columns holding additional HTS codes. Defaults to
            ``"Primary 1"`` through ``"Primary 6"``. Columns absent from the
            table are ignored.

    Returns:
        One ``validator.validate_entry`` result per row.
    """
    if additional_cols is None:
        additional_cols = ["Primary 1", "Primary 2", "Primary 3",
                           "Primary 4", "Primary 5", "Primary 6"]

    results = []
    for idx, row in df.iterrows():
        # row.get() yields None for a column that is not in the table, which
        # _cell_to_text treats the same as an empty cell.
        cleaned_codes = (_cell_to_text(row.get(col)) for col in additional_cols)
        additional_hts = [code for code in cleaned_codes if code]

        results.append(validator.validate_entry(
            entry_number=_cell_to_text(row.get(entry_col), f"Row_{idx}"),
            description=_cell_to_text(row.get(description_col)),
            primary_hts=_cell_to_text(row.get(tariff_col)),
            additional_hts=additional_hts,
        ))
    return results