""" HTS Validator - Core validation logic for HTS tariff auditing Validates primary HTS codes against additional HTS and description keywords Logic Flow: 1. Check Override Keywords (Zinc, Plastics) - highest priority 2. Check Special HTS (Computer Parts, Auto Parts) 3. Check Primary HTS membership: a. If in 2+ HTS categories -> Dual HTS logic b. If in 1 HTS category -> Single HTS logic c. If in 0 HTS categories -> Keyword-only logic 4. Within each HTS category, check description keywords """ import re from typing import Dict, List, Optional, Tuple, Set from dataclasses import dataclass from HTS_list import (Steel_primary_HTS_list, Aluminum_primary_HTS_list, Copper_primary_HTS_list, Computer_parts_HTS_list, Auto_parts_HTS_list, Semiconductor_HTS_list) # Key Additional HTS codes STEEL_232_CODES = {"99038190", "99038191"} ALUMINUM_232_CODES = {"99038507", "99038508"} COPPER_CODES = {"99037801", "99037802"} GENERAL_301_CODE = "99030133" MISMATCH_CODE = "99030125" # All 232/tariff codes for checking forbidden ALL_232_CODES = STEEL_232_CODES | ALUMINUM_232_CODES | COPPER_CODES # Scenario summaries - updated for new case IDs SCENARIO_SUMMARIES = { # Level 0: Override "Z1": "Zinc keyword - only 99030125, no 232/copper tariffs", "P1": "Plastics + Steel HTS - only 99030125, no 232", "P2": "Plastics + Aluminum HTS - only 99030125, no 232", "P3": "Plastics + Steel+Alum HTS - only 99030125, no 232", "P4": "Plastics + no metal HTS - no action", "P5": "Plastics + Copper HTS - only 99030125, no copper tariff", "P6": "Plastics + Alum+Copper HTS - only 99030125, no 232/copper", # Level 1: Special HTS "C1": "Computer Parts HTS - FLAG for manual review", "A1": "Auto Parts HTS - FLAG for manual review", "SC1": "Semiconductor HTS - FLAG for manual review (overlaps Computer/Aluminum)", # Level 2: Dual HTS - Steel + Aluminum "D1": "Steel+Alum HTS, no keyword - FLAG", "D2": "Steel+Alum HTS + metal keyword - Steel 232 + 99030133", "D3": "Steel+Alum HTS + aluminum keyword - Alum 232 + 99030133", "D4": "Steel+Alum HTS + copper keyword - 99030125 (mismatch)", "D5": "Steel+Alum HTS + metal+alum keywords - FLAG ambiguous", "D6": "Steel+Alum HTS + metal+copper keywords - Steel 232 + 99030133", "D7": "Steel+Alum HTS + alum+copper keywords - Alum 232 + 99030133", "D8": "Steel+Alum HTS + all keywords - FLAG ambiguous", # Level 2: Dual HTS - Aluminum + Copper "E1": "Alum+Copper HTS, no keyword - FLAG", "E2": "Alum+Copper HTS + metal keyword - 99030125 (mismatch)", "E3": "Alum+Copper HTS + aluminum keyword - Alum 232 + 99030133", "E4": "Alum+Copper HTS + copper keyword - Copper + 99030133", "E5": "Alum+Copper HTS + metal+alum keywords - Alum 232 + 99030133", "E6": "Alum+Copper HTS + metal+copper keywords - Copper + 99030133", "E7": "Alum+Copper HTS + alum+copper keywords - FLAG ambiguous", "E8": "Alum+Copper HTS + all keywords - FLAG ambiguous", # Level 2: Dual HTS - Steel + Copper "F1": "Steel+Copper HTS, no keyword - FLAG", "F2": "Steel+Copper HTS + metal keyword - Steel 232 + 99030133", "F3": "Steel+Copper HTS + aluminum keyword - 99030125 (mismatch)", "F4": "Steel+Copper HTS + copper keyword - Copper + 99030133", "F5": "Steel+Copper HTS + metal+alum keywords - Steel 232 + 99030133", "F6": "Steel+Copper HTS + metal+copper keywords - FLAG ambiguous", "F7": "Steel+Copper HTS + alum+copper keywords - Copper + 99030133", "F8": "Steel+Copper HTS + all keywords - FLAG ambiguous", # Level 3: Single HTS - Steel "S1": "Steel HTS, no keyword - Steel 232 + 99030133", "S2": "Steel HTS + metal keyword - Steel 232 + 99030133", "S3": "Steel HTS + aluminum keyword - 99030125 (mismatch)", "S4": "Steel HTS + copper keyword - 99030125 (mismatch)", "S5": "Steel HTS + metal+alum keywords - Steel 232 + 99030133", "S6": "Steel HTS + metal+copper keywords - Steel 232 + 99030133", "S7": "Steel HTS + alum+copper keywords - 99030125 (mismatch)", "S8": "Steel HTS + all keywords - Steel 232 + 99030133", # Level 3: Single HTS - Aluminum "L1": "Aluminum HTS, no keyword - Alum 232 + 99030133", "L2": "Aluminum HTS + metal keyword - 99030125 (mismatch)", "L3": "Aluminum HTS + aluminum keyword - Alum 232 + 99030133", "L4": "Aluminum HTS + copper keyword - 99030125 (mismatch)", "L5": "Aluminum HTS + metal+alum keywords - Alum 232 + 99030133", "L6": "Aluminum HTS + metal+copper keywords - 99030125 (mismatch)", "L7": "Aluminum HTS + alum+copper keywords - Alum 232 + 99030133", "L8": "Aluminum HTS + all keywords - Alum 232 + 99030133", # Level 3: Single HTS - Copper "U1": "Copper HTS, no keyword - 99037802 (no copper content)", "U2": "Copper HTS + metal keyword - 99030125 (mismatch)", "U3": "Copper HTS + aluminum keyword - 99037802 + Alum 232", "U4": "Copper HTS + copper keyword - 99037801 (copper content)", "U5": "Copper HTS + metal+alum keywords - 99037802 + Alum 232", "U6": "Copper HTS + metal+copper keywords - 99037801 (copper content)", "U7": "Copper HTS + alum+copper keywords - FLAG ambiguous", "U8": "Copper HTS + all keywords - FLAG ambiguous", # Level 4: No HTS Match "N1": "No metal HTS, no keyword - no action", "N2": "No metal HTS + metal keyword - 99030125", "N3": "No metal HTS + aluminum keyword - 99030125", "N4": "No metal HTS + copper keyword - 99030125", "N5": "No metal HTS + metal+alum keywords - 99030125", "N6": "No metal HTS + metal+copper keywords - 99030125", "N7": "No metal HTS + alum+copper keywords - 99030125", "N8": "No metal HTS + all keywords - 99030125", # Legacy "NONE": "No applicable scenario - entry does not match any validation rules", } @dataclass class ValidationResult: """Result of validating a single entry""" entry_number: str description: str primary_hts: str additional_hts: List[str] scenario_id: str scenario_summary: str status: str # PASS, FLAG expected_hts: List[str] missing_hts: List[str] unexpected_hts: List[str] issue: str # HTS membership indicators in_steel_hts: bool = False in_aluminum_hts: bool = False in_copper_hts: bool = False in_computer_hts: bool = False in_auto_hts: bool = False in_semiconductor_hts: bool = False # Keyword indicators has_metal_keyword: bool = False has_aluminum_keyword: bool = False has_copper_keyword: bool = False has_zinc_keyword: bool = False has_plastics_keyword: bool = False class HTSValidator: """Validates HTS codes against descriptions and additional tariffs""" def __init__(self, metal_keywords: Optional[List[str]] = None, aluminum_keywords: Optional[List[str]] = None, copper_keywords: Optional[List[str]] = None, zinc_keywords: Optional[List[str]] = None, plastics_keywords: Optional[List[str]] = None): """Initialize validator with keyword lists""" # Default keyword lists self.metal_keywords = metal_keywords or [ "steel", "stainless steel", "carbon steel", "iron", "metal" ] self.aluminum_keywords = aluminum_keywords or [ "aluminum", "aluminium" ] self.copper_keywords = copper_keywords or [ "copper" ] self.zinc_keywords = zinc_keywords or [ "zinc" ] self.plastics_keywords = plastics_keywords or [ "plastic", "abs", "pu", "pvc", "polyester", "nylon" ] # Convert HTS lists to string sets for matching self.steel_hts_set = self._convert_hts_list(Steel_primary_HTS_list) self.aluminum_hts_set = self._convert_hts_list(Aluminum_primary_HTS_list) self.copper_hts_set = self._convert_hts_list(Copper_primary_HTS_list) self.computer_parts_hts_set = self._convert_hts_list(Computer_parts_HTS_list) self.auto_parts_hts_set = self._convert_hts_list(Auto_parts_HTS_list) self.semiconductor_hts_set = self._convert_hts_list(Semiconductor_HTS_list) def _convert_hts_list(self, hts_list: List) -> Set[str]: """Convert HTS list to set of strings""" return {str(hts) for hts in hts_list} def _hts_matches_list(self, primary_hts: str, hts_set: Set[str]) -> bool: """Check if primary HTS matches any entry in HTS set using prefix matching""" primary_str = str(primary_hts).replace(".", "").strip() for list_hts in hts_set: list_hts_str = str(list_hts).replace(".", "").strip() # Prefix match: if list entry is 8 digits, match any 10-digit starting with it if len(list_hts_str) <= len(primary_str): if primary_str.startswith(list_hts_str): return True else: # List entry is longer, check if primary starts with it if list_hts_str.startswith(primary_str): return True return False def _contains_keywords(self, text: str, keywords: List[str]) -> bool: """Check if text contains any of the keywords (case-insensitive, word boundary)""" if not text: return False text_lower = text.lower() for kw in keywords: # Use word boundary matching to avoid partial matches (e.g., "pu" in "punch") # \b matches word boundaries pattern = r'\b' + re.escape(kw.lower()) + r'\b' if re.search(pattern, text_lower): return True return False def _get_additional_hts_set(self, additional_hts: List[str]) -> Set[str]: """Convert additional HTS list to normalized set""" result = set() for hts in additional_hts: if hts: # Remove decimal points and convert to string normalized = str(hts).replace(".", "").strip() # Remove trailing .0 from floats if normalized.endswith("0") and len(normalized) > 8: # Check if it's a float representation try: float_val = float(hts) normalized = str(int(float_val)) except (ValueError, TypeError): pass result.add(normalized) return result def _check_hts_present(self, hts_code: str, additional_set: Set[str]) -> bool: """Check if an HTS code is present in additional HTS set""" return hts_code in additional_set def _check_any_hts_present(self, hts_codes: Set[str], additional_set: Set[str]) -> bool: """Check if any of the HTS codes are present""" return bool(hts_codes & additional_set) def _get_keyword_category(self, has_metal: bool, has_aluminum: bool, has_copper: bool) -> str: """Determine keyword category code (K0-K7)""" if has_metal and has_aluminum and has_copper: return "K7" elif has_aluminum and has_copper: return "K6" elif has_metal and has_copper: return "K5" elif has_metal and has_aluminum: return "K4" elif has_copper: return "K3" elif has_aluminum: return "K2" elif has_metal: return "K1" else: return "K0" def validate_entry(self, entry_number: str, description: str, primary_hts: str, additional_hts: List[str]) -> ValidationResult: """Validate a single entry against all scenarios""" # Normalize data primary_str = str(primary_hts).replace(".", "").strip() if primary_hts else "" desc = str(description) if description else "" additional_set = self._get_additional_hts_set(additional_hts) # Check which HTS lists the primary belongs to in_steel = self._hts_matches_list(primary_str, self.steel_hts_set) in_aluminum = self._hts_matches_list(primary_str, self.aluminum_hts_set) in_copper = self._hts_matches_list(primary_str, self.copper_hts_set) in_computer_parts = self._hts_matches_list(primary_str, self.computer_parts_hts_set) in_auto_parts = self._hts_matches_list(primary_str, self.auto_parts_hts_set) in_semiconductor = self._hts_matches_list(primary_str, self.semiconductor_hts_set) # Check description keywords has_metal_kw = self._contains_keywords(desc, self.metal_keywords) has_aluminum_kw = self._contains_keywords(desc, self.aluminum_keywords) has_copper_kw = self._contains_keywords(desc, self.copper_keywords) has_zinc_kw = self._contains_keywords(desc, self.zinc_keywords) has_plastics_kw = self._contains_keywords(desc, self.plastics_keywords) # Check which additional HTS are applied has_steel_232 = self._check_any_hts_present(STEEL_232_CODES, additional_set) has_aluminum_232 = self._check_any_hts_present(ALUMINUM_232_CODES, additional_set) has_copper_tariff = self._check_any_hts_present(COPPER_CODES, additional_set) has_301 = self._check_hts_present(GENERAL_301_CODE, additional_set) has_mismatch = self._check_hts_present(MISMATCH_CODE, additional_set) # Get keyword category keyword_cat = self._get_keyword_category(has_metal_kw, has_aluminum_kw, has_copper_kw) # Apply validation rules in level order return self._apply_validation_rules( entry_number=entry_number, description=desc, primary_hts=primary_str, additional_hts=list(additional_set), in_steel=in_steel, in_aluminum=in_aluminum, in_copper=in_copper, in_computer_parts=in_computer_parts, in_auto_parts=in_auto_parts, in_semiconductor=in_semiconductor, has_metal_kw=has_metal_kw, has_aluminum_kw=has_aluminum_kw, has_copper_kw=has_copper_kw, has_zinc_kw=has_zinc_kw, has_plastics_kw=has_plastics_kw, has_steel_232=has_steel_232, has_aluminum_232=has_aluminum_232, has_copper_tariff=has_copper_tariff, has_301=has_301, has_mismatch=has_mismatch, additional_set=additional_set, keyword_cat=keyword_cat ) def _create_result(self, entry_number: str, description: str, primary_hts: str, additional_hts: List[str], scenario_id: str, expected_codes: List[str], forbidden_codes: Set[str], additional_set: Set[str], always_flag: bool = False, flag_reason: str = "", # Indicators in_steel: bool = False, in_aluminum: bool = False, in_copper: bool = False, in_computer: bool = False, in_auto: bool = False, in_semiconductor: bool = False, has_metal_kw: bool = False, has_aluminum_kw: bool = False, has_copper_kw: bool = False, has_zinc_kw: bool = False, has_plastics_kw: bool = False) -> ValidationResult: """Create validation result by checking expected vs actual For tariff code groups (Steel 232, Aluminum 232, Copper), we check if ANY is present. For individual codes (99030125, 99030133), we check if that specific code is present. """ if always_flag: return ValidationResult( entry_number=entry_number, description=description, primary_hts=primary_hts, additional_hts=additional_hts, scenario_id=scenario_id, scenario_summary=SCENARIO_SUMMARIES.get(scenario_id, ""), status="FLAG", expected_hts=[], missing_hts=[], unexpected_hts=[], issue=flag_reason or "Manual review required", in_steel_hts=in_steel, in_aluminum_hts=in_aluminum, in_copper_hts=in_copper, in_computer_hts=in_computer, in_auto_hts=in_auto, in_semiconductor_hts=in_semiconductor, has_metal_keyword=has_metal_kw, has_aluminum_keyword=has_aluminum_kw, has_copper_keyword=has_copper_kw, has_zinc_keyword=has_zinc_kw, has_plastics_keyword=has_plastics_kw ) # Group expected codes by tariff type # Check if ANY code from each group is present missing = [] expected_display = [] # Check Steel 232 group steel_232_expected = [c for c in expected_codes if c in STEEL_232_CODES] if steel_232_expected: expected_display.append("Steel 232") if not (STEEL_232_CODES & additional_set): missing.append("Steel 232 (99038190/91)") # Check Aluminum 232 group alum_232_expected = [c for c in expected_codes if c in ALUMINUM_232_CODES] if alum_232_expected: expected_display.append("Alum 232") if not (ALUMINUM_232_CODES & additional_set): missing.append("Alum 232 (99038507/08)") # Check Copper group copper_expected = [c for c in expected_codes if c in COPPER_CODES] if copper_expected: expected_display.append("Copper") if not (COPPER_CODES & additional_set): missing.append("Copper (99037801/02)") # Check individual codes (99030133, 99030125) for code in expected_codes: if code not in STEEL_232_CODES and code not in ALUMINUM_232_CODES and code not in COPPER_CODES: expected_display.append(code) if code not in additional_set: missing.append(code) # Check for forbidden codes present unexpected = list(forbidden_codes & additional_set) # Determine status if not missing and not unexpected: status = "PASS" issue = "Correct tariff application" else: status = "FLAG" issues = [] if missing: issues.append(f"Missing: {', '.join(missing)}") if unexpected: issues.append(f"Unexpected: {', '.join(unexpected)}") issue = "; ".join(issues) return ValidationResult( entry_number=entry_number, description=description, primary_hts=primary_hts, additional_hts=additional_hts, scenario_id=scenario_id, scenario_summary=SCENARIO_SUMMARIES.get(scenario_id, ""), status=status, expected_hts=expected_display, missing_hts=missing, unexpected_hts=unexpected, issue=issue, in_steel_hts=in_steel, in_aluminum_hts=in_aluminum, in_copper_hts=in_copper, in_computer_hts=in_computer, in_auto_hts=in_auto, in_semiconductor_hts=in_semiconductor, has_metal_keyword=has_metal_kw, has_aluminum_keyword=has_aluminum_kw, has_copper_keyword=has_copper_kw, has_zinc_keyword=has_zinc_kw, has_plastics_keyword=has_plastics_kw ) def _apply_validation_rules(self, entry_number: str, description: str, primary_hts: str, additional_hts: List[str], in_steel: bool, in_aluminum: bool, in_copper: bool, in_computer_parts: bool, in_auto_parts: bool, in_semiconductor: bool, has_metal_kw: bool, has_aluminum_kw: bool, has_copper_kw: bool, has_zinc_kw: bool, has_plastics_kw: bool, has_steel_232: bool, has_aluminum_232: bool, has_copper_tariff: bool, has_301: bool, has_mismatch: bool, additional_set: Set[str], keyword_cat: str) -> ValidationResult: """Apply all validation rules in level order""" # Common indicator parameters for all _create_result calls indicators = { "in_steel": in_steel, "in_aluminum": in_aluminum, "in_copper": in_copper, "in_computer": in_computer_parts, "in_auto": in_auto_parts, "in_semiconductor": in_semiconductor, "has_metal_kw": has_metal_kw, "has_aluminum_kw": has_aluminum_kw, "has_copper_kw": has_copper_kw, "has_zinc_kw": has_zinc_kw, "has_plastics_kw": has_plastics_kw, } # ===================================================================== # LEVEL 0: Override Cases (Highest Priority) # ===================================================================== # Z1: Zinc keyword - only 99030125, no 232/copper tariffs if has_zinc_kw: return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="Z1", expected_codes=[MISMATCH_CODE], forbidden_codes=ALL_232_CODES, additional_set=additional_set, **indicators ) # Plastics override cases if has_plastics_kw: if in_steel and not in_aluminum and not in_copper: # P1: Plastics + Steel HTS return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="P1", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES, additional_set=additional_set, **indicators ) elif in_aluminum and not in_steel and not in_copper: # P2: Plastics + Aluminum HTS return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="P2", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES, additional_set=additional_set, **indicators ) elif in_steel and in_aluminum and not in_copper: # P3: Plastics + Steel+Alum HTS return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="P3", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES, additional_set=additional_set, **indicators ) elif in_copper and not in_steel and not in_aluminum: # P5: Plastics + Copper HTS return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="P5", expected_codes=[MISMATCH_CODE], forbidden_codes=COPPER_CODES, additional_set=additional_set, **indicators ) elif in_aluminum and in_copper: # P6: Plastics + Alum+Copper HTS return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="P6", expected_codes=[MISMATCH_CODE], forbidden_codes=ALUMINUM_232_CODES | COPPER_CODES, additional_set=additional_set, **indicators ) else: # P4: Plastics + no metal HTS - no action needed return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="P4", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators ) # ===================================================================== # LEVEL 1: Special HTS Categories # ===================================================================== # C1: Computer Parts HTS - always FLAG if in_computer_parts: return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="C1", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Computer parts HTS - manual review required" ) # A1: Auto Parts HTS - always FLAG if in_auto_parts: return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="A1", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Auto parts HTS - manual review required" ) # SC1: Semiconductor HTS - always FLAG (overlaps with Computer Parts and Aluminum) if in_semiconductor: return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="SC1", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Semiconductor HTS - manual review required (overlaps Computer/Aluminum)" ) # ===================================================================== # LEVEL 2: Dual HTS Categories # ===================================================================== # H4: Steel + Aluminum if in_steel and in_aluminum and not in_copper: if keyword_cat == "K0": # D1: No keyword return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="D1", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Steel+Aluminum HTS with no keyword - cannot determine tariff" ) elif keyword_cat == "K1": # D2: Metal only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="D2", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K2": # D3: Aluminum only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="D3", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K3": # D4: Copper only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="D4", expected_codes=[MISMATCH_CODE], forbidden_codes=ALL_232_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K4": # D5: Metal + Aluminum return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="D5", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Steel+Aluminum HTS with both metal and aluminum keywords - ambiguous" ) elif keyword_cat == "K5": # D6: Metal + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="D6", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K6": # D7: Aluminum + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="D7", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K7": # D8: All three return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="D8", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Steel+Aluminum HTS with all keywords - ambiguous" ) # H5: Aluminum + Copper if in_aluminum and in_copper and not in_steel: if keyword_cat == "K0": # E1: No keyword return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="E1", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Aluminum+Copper HTS with no keyword - cannot determine tariff" ) elif keyword_cat == "K1": # E2: Metal only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="E2", expected_codes=[MISMATCH_CODE], forbidden_codes=ALL_232_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K2": # E3: Aluminum only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="E3", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K3": # E4: Copper only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="E4", expected_codes=list(COPPER_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K4": # E5: Metal + Aluminum return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="E5", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K5": # E6: Metal + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="E6", expected_codes=list(COPPER_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K6": # E7: Aluminum + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="E7", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Aluminum+Copper HTS with both aluminum and copper keywords - ambiguous" ) elif keyword_cat == "K7": # E8: All three return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="E8", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Aluminum+Copper HTS with all keywords - ambiguous" ) # H6: Steel + Copper if in_steel and in_copper and not in_aluminum: if keyword_cat == "K0": # F1: No keyword return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="F1", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Steel+Copper HTS with no keyword - cannot determine tariff" ) elif keyword_cat == "K1": # F2: Metal only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="F2", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K2": # F3: Aluminum only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="F3", expected_codes=[MISMATCH_CODE], forbidden_codes=ALL_232_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K3": # F4: Copper only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="F4", expected_codes=list(COPPER_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K4": # F5: Metal + Aluminum return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="F5", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K5": # F6: Metal + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="F6", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Steel+Copper HTS with both metal and copper keywords - ambiguous" ) elif keyword_cat == "K6": # F7: Aluminum + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="F7", expected_codes=list(COPPER_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K7": # F8: All three return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="F8", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Steel+Copper HTS with all keywords - ambiguous" ) # ===================================================================== # LEVEL 3: Single HTS Category # ===================================================================== # H1: Steel Only if in_steel and not in_aluminum and not in_copper: if keyword_cat == "K0": # S1: No keyword return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="S1", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K1": # S2: Metal only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="S2", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K2": # S3: Aluminum only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="S3", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES | {GENERAL_301_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K3": # S4: Copper only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="S4", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | COPPER_CODES | {GENERAL_301_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K4": # S5: Metal + Aluminum return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="S5", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K5": # S6: Metal + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="S6", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K6": # S7: Aluminum + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="S7", expected_codes=[MISMATCH_CODE], forbidden_codes=ALL_232_CODES | {GENERAL_301_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K7": # S8: All three return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="S8", expected_codes=list(STEEL_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) # H2: Aluminum Only if in_aluminum and not in_steel and not in_copper: if keyword_cat == "K0": # L1: No keyword return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="L1", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K1": # L2: Metal only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="L2", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES | {GENERAL_301_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K2": # L3: Aluminum only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="L3", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K3": # L4: Copper only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="L4", expected_codes=[MISMATCH_CODE], forbidden_codes=ALUMINUM_232_CODES | COPPER_CODES | {GENERAL_301_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K4": # L5: Metal + Aluminum return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="L5", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K5": # L6: Metal + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="L6", expected_codes=[MISMATCH_CODE], forbidden_codes=ALL_232_CODES | {GENERAL_301_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K6": # L7: Aluminum + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="L7", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K7": # L8: All three return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="L8", expected_codes=list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE}, additional_set=additional_set, **indicators ) # H3: Copper Only # New logic: Copper keyword -> 99037801, No copper keyword -> 99037802 if in_copper and not in_steel and not in_aluminum: if keyword_cat == "K0": # U1: No keyword -> 99037802 (no copper content) return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="U1", expected_codes=["99037802"], forbidden_codes={MISMATCH_CODE, "99037801"}, additional_set=additional_set, **indicators ) elif keyword_cat == "K1": # U2: Metal only -> mismatch return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="U2", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | COPPER_CODES | {GENERAL_301_CODE}, additional_set=additional_set, **indicators ) elif keyword_cat == "K2": # U3: Aluminum only -> 99037802 + Alum 232 return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="U3", expected_codes=["99037802"] + list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE, "99037801"}, additional_set=additional_set, **indicators ) elif keyword_cat == "K3": # U4: Copper only -> 99037801 (copper content) return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="U4", expected_codes=["99037801"], forbidden_codes={MISMATCH_CODE, "99037802"}, additional_set=additional_set, **indicators ) elif keyword_cat == "K4": # U5: Metal + Aluminum -> 99037802 + Alum 232 return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="U5", expected_codes=["99037802"] + list(ALUMINUM_232_CODES) + [GENERAL_301_CODE], forbidden_codes={MISMATCH_CODE, "99037801"}, additional_set=additional_set, **indicators ) elif keyword_cat == "K5": # U6: Metal + Copper -> 99037801 (copper keyword present) return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="U6", expected_codes=["99037801"], forbidden_codes={MISMATCH_CODE, "99037802"}, additional_set=additional_set, **indicators ) elif keyword_cat == "K6": # U7: Aluminum + Copper -> FLAG ambiguous return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="U7", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Copper HTS with both aluminum and copper keywords - ambiguous" ) elif keyword_cat == "K7": # U8: All three -> FLAG ambiguous return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="U8", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators, always_flag=True, flag_reason="Copper HTS with all keywords - ambiguous" ) # ===================================================================== # LEVEL 4: No HTS Match # ===================================================================== # H0: Not in any metal list if keyword_cat == "K0": # N1: No keyword return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="N1", expected_codes=[], forbidden_codes=set(), additional_set=additional_set, **indicators ) elif keyword_cat == "K1": # N2: Metal only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="N2", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K2": # N3: Aluminum only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="N3", expected_codes=[MISMATCH_CODE], forbidden_codes=ALUMINUM_232_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K3": # N4: Copper only return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="N4", expected_codes=[MISMATCH_CODE], forbidden_codes=COPPER_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K4": # N5: Metal + Aluminum return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="N5", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | ALUMINUM_232_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K5": # N6: Metal + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="N6", expected_codes=[MISMATCH_CODE], forbidden_codes=STEEL_232_CODES | COPPER_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K6": # N7: Aluminum + Copper return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="N7", expected_codes=[MISMATCH_CODE], forbidden_codes=ALUMINUM_232_CODES | COPPER_CODES, additional_set=additional_set, **indicators ) elif keyword_cat == "K7": # N8: All three return self._create_result( entry_number, description, primary_hts, additional_hts, scenario_id="N8", expected_codes=[MISMATCH_CODE], forbidden_codes=ALL_232_CODES, additional_set=additional_set, **indicators ) # Fallback - should not reach here return ValidationResult( entry_number=entry_number, description=description, primary_hts=primary_hts, additional_hts=additional_hts, scenario_id="NONE", scenario_summary=SCENARIO_SUMMARIES["NONE"], status="PASS", expected_hts=[], missing_hts=[], unexpected_hts=[], issue="No validation rule matched", in_steel_hts=in_steel, in_aluminum_hts=in_aluminum, in_copper_hts=in_copper, in_computer_hts=in_computer_parts, in_auto_hts=in_auto_parts, in_semiconductor_hts=in_semiconductor, has_metal_keyword=has_metal_kw, has_aluminum_keyword=has_aluminum_kw, has_copper_keyword=has_copper_kw, has_zinc_keyword=has_zinc_kw, has_plastics_keyword=has_plastics_kw ) def validate_dataframe(df, validator: HTSValidator, description_col: str = "Description", tariff_col: str = "Tariff", entry_col: str = "Entry Number", additional_cols: List[str] = None) -> List[ValidationResult]: """Validate all entries in a DataFrame""" if additional_cols is None: additional_cols = ["Primary 1", "Primary 2", "Primary 3", "Primary 4", "Primary 5", "Primary 6"] results = [] for idx, row in df.iterrows(): entry_number = str(row.get(entry_col, f"Row_{idx}")) description = str(row.get(description_col, "")) primary_hts = str(row.get(tariff_col, "")) # Get additional HTS codes additional_hts = [] for col in additional_cols: if col in row and row[col] is not None: val = row[col] if str(val).strip() and str(val).lower() != "nan": additional_hts.append(str(val)) result = validator.validate_entry( entry_number=entry_number, description=description, primary_hts=primary_hts, additional_hts=additional_hts ) results.append(result) return results