""" HTS Checker - Streamlit Application for HTS Tariff Auditing Deployed on Hugging Face Spaces """ import streamlit as st import pandas as pd from io import BytesIO import hashlib import os from hts_validator import HTSValidator, validate_dataframe, SCENARIO_SUMMARIES from HTS_list import Steel_primary_HTS_list, Aluminum_primary_HTS_list, Copper_primary_HTS_list, Semiconductor_HTS_list # Path to reviewed combinations CSV file REVIEWED_COMBINATIONS_FILE = "Reviewed_combination.csv" # Page configuration st.set_page_config( page_title="HTS Checker - Tariff Audit Tool", page_icon="", layout="wide" ) # ============================================================================= # Authentication # ============================================================================= def get_app_password(): """Get password from secrets or environment variable.""" # Try Streamlit secrets first (for Hugging Face Spaces) try: return st.secrets["APP_PASSWORD"] except (KeyError, FileNotFoundError): pass # Fall back to environment variable return os.environ.get("HTS_CHECKER_PASSWORD", "") def check_password(): """Returns True if the user has entered the correct password.""" app_password = get_app_password() # If no password is set, allow access (for local development) if not app_password: return True # Initialize session state for authentication if "authenticated" not in st.session_state: st.session_state.authenticated = False # If already authenticated, return True if st.session_state.authenticated: return True # Show login form st.markdown("## HTS Checker - Login Required") st.markdown("Please enter the password to access this application.") with st.form("login_form"): password_input = st.text_input("Password", type="password", key="password_input") submit_button = st.form_submit_button("Login") if submit_button: if password_input == app_password: st.session_state.authenticated = True st.rerun() else: st.error("Incorrect password. Please try again.") return False # Check authentication before showing main app if not check_password(): st.stop() def load_single_excel(file_content): """Load a single Excel file with proper HTS column types""" df = pd.read_excel(BytesIO(file_content), dtype={ "Tariff": str, "Primary 1": str, "Primary 2": str, "Primary 3": str, "Primary 4": str, "Primary 5": str, "Primary 6": str, }) # Clean up HTS columns hts_columns = ["Tariff", "Primary 1", "Primary 2", "Primary 3", "Primary 4", "Primary 5", "Primary 6"] for col in hts_columns: if col in df.columns: df[col] = df[col].astype(str).str.replace(r'\.0$', '', regex=True) df[col] = df[col].replace('nan', '') return df @st.cache_data def load_and_validate_excel(file_contents_list, file_names_list, keywords_hash): """Load multiple Excel files and combine - cached to avoid re-running on filter changes""" all_dfs = [] for file_content in file_contents_list: df = load_single_excel(file_content) all_dfs.append(df) # Concatenate all dataframes combined_df = pd.concat(all_dfs, ignore_index=True) return combined_df @st.cache_data def run_validation(df_hash, _df, _validator): """Run validation - cached based on dataframe hash""" results = validate_dataframe(_df, _validator) return results def get_df_hash(df): """Get hash of dataframe for caching""" return hashlib.md5(pd.util.hash_pandas_object(df).values.tobytes()).hexdigest() def get_keywords_hash(keywords): """Get hash of keywords for cache invalidation""" return hashlib.md5(str(keywords).encode()).hexdigest() def load_reviewed_combinations(): """Load reviewed HTS+Description combinations from CSV file""" reviewed_set = set() csv_path = os.path.join(os.path.dirname(__file__), REVIEWED_COMBINATIONS_FILE) if os.path.exists(csv_path): # Try multiple encodings encodings = ["utf-8", "cp1252", "latin-1", "utf-8-sig"] df = None for encoding in encodings: try: df = pd.read_csv(csv_path, dtype=str, encoding=encoding) break except UnicodeDecodeError: continue except Exception as e: st.warning(f"Could not load reviewed combinations: {e}") return reviewed_set if df is not None and "HTS" in df.columns and "Description" in df.columns: for _, row in df.iterrows(): hts = str(row["HTS"]).strip() if pd.notna(row["HTS"]) else "" desc = str(row["Description"]).strip().upper() if pd.notna(row["Description"]) else "" if hts and desc: reviewed_set.add((hts, desc)) return reviewed_set def is_combination_reviewed(hts, description, reviewed_set): """Check if HTS+Description combination has been reviewed""" hts_str = str(hts).strip() if pd.notna(hts) else "" desc_str = str(description).strip().upper() if pd.notna(description) else "" return (hts_str, desc_str) in reviewed_set # Initialize session state if "keywords" not in st.session_state: st.session_state.keywords = { "metal": ["steel", "stainless steel", "carbon steel", "iron", "metal"], "aluminum": ["aluminum", "aluminium"], "copper": ["copper"], "zinc": ["zinc"], "plastics": ["plastic", "abs", "pu", "pvc", "polyester", "nylon"] } if "export_cache" not in st.session_state: st.session_state.export_cache = [] if "validation_results" not in st.session_state: st.session_state.validation_results = None if "original_df" not in st.session_state: st.session_state.original_df = None def get_validator(): """Create validator with current keyword settings""" return HTSValidator( metal_keywords=st.session_state.keywords["metal"], aluminum_keywords=st.session_state.keywords["aluminum"], copper_keywords=st.session_state.keywords["copper"], zinc_keywords=st.session_state.keywords["zinc"], plastics_keywords=st.session_state.keywords["plastics"] ) def color_status(val): """Color code status column""" if val == "PASS": return "background-color: #90EE90" # Light green elif val == "FAIL": return "background-color: #FFB6C1" # Light red elif val == "FLAG": return "background-color: #FFFFE0" # Light yellow return "" def format_hts(hts_value): """Format HTS value as string, removing .0 suffix""" if not hts_value: return "" s = str(hts_value) # Remove .0 suffix if present (from float conversion) if s.endswith(".0"): s = s[:-2] return s def bool_to_symbol(value: bool) -> str: """Convert boolean to check/cross symbol""" return "Y" if value else "N" def color_indicator(val): """Color Y values with light green background""" if val == "Y": return "background-color: #90EE90" # Light green return "" # Indicator columns for styling INDICATOR_COLUMNS = [ "Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS", "Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW" ] def results_to_dataframe(results): """Convert validation results to DataFrame""" data = [] for r in results: # Format additional HTS as strings additional_hts_str = ", ".join([format_hts(h) for h in r.additional_hts if h]) expected_hts_str = ", ".join([format_hts(h) for h in r.expected_hts if h]) missing_hts_str = ", ".join([format_hts(h) for h in r.missing_hts if h]) unexpected_hts_str = ", ".join([format_hts(h) for h in r.unexpected_hts if h]) data.append({ "Entry Number": r.entry_number, "Description": r.description[:100] + "..." if len(r.description) > 100 else r.description, "Full Description": r.description, "Primary HTS": format_hts(r.primary_hts), "Additional HTS": additional_hts_str, # HTS membership indicators "Steel HTS": bool_to_symbol(r.in_steel_hts), "Alum HTS": bool_to_symbol(r.in_aluminum_hts), "Copper HTS": bool_to_symbol(r.in_copper_hts), "Computer HTS": bool_to_symbol(r.in_computer_hts), "Auto HTS": bool_to_symbol(r.in_auto_hts), "Semi HTS": bool_to_symbol(r.in_semiconductor_hts), # Keyword indicators "Metal KW": bool_to_symbol(r.has_metal_keyword), "Alum KW": bool_to_symbol(r.has_aluminum_keyword), "Copper KW": bool_to_symbol(r.has_copper_keyword), "Zinc KW": bool_to_symbol(r.has_zinc_keyword), "Plastics KW": bool_to_symbol(r.has_plastics_keyword), # Validation results "Scenario": r.scenario_id, "Scenario Summary": r.scenario_summary, "Status": r.status, "Expected HTS": expected_hts_str, "Missing HTS": missing_hts_str, "Unexpected HTS": unexpected_hts_str, "Issue": r.issue }) return pd.DataFrame(data) def export_to_excel(df, results_df=None): """Export DataFrame to Excel with optional validation results""" output = BytesIO() with pd.ExcelWriter(output, engine="openpyxl") as writer: if results_df is not None: # Merge original data with validation results # Use Full Description for export export_df = df.copy() # Add validation columns if len(results_df) == len(export_df): export_df["Scenario ID"] = results_df["Scenario"].values export_df["Scenario Summary"] = results_df["Scenario Summary"].values export_df["Status"] = results_df["Status"].values export_df["Expected HTS"] = results_df["Expected HTS"].values export_df["Missing HTS"] = results_df["Missing HTS"].values export_df["Unexpected HTS"] = results_df["Unexpected HTS"].values export_df["Issue Description"] = results_df["Issue"].values export_df.to_excel(writer, sheet_name="Audit Results", index=False) else: df.to_excel(writer, sheet_name="Export", index=False) output.seek(0) return output # Main app st.title("HTS Checker - Tariff Audit Tool") st.markdown("Audit primary HTS codes against additional tariffs and description keywords") # Create tabs tab1, tab2, tab2b, tab3, tab4, tab5 = st.tabs([ "Upload & Filter", "Validation Results", "Unique Combinations", "Keyword Management", "Export Selection", "HTS Reference" ]) # Tab 1: Upload & Filter with tab1: st.header("Upload Excel Files") uploaded_files = st.file_uploader( "Upload entry report Excel files (multiple allowed)", type=["xlsx", "xls"], accept_multiple_files=True, help="Upload one or more customizable entry reports from NetCHB. Duplicates across files will be removed." ) if uploaded_files: try: # Use cached loading function with multiple files keywords_hash = get_keywords_hash(st.session_state.keywords) file_contents = [f.read() for f in uploaded_files] file_names = [f.name for f in uploaded_files] # Reset file positions for potential re-read for f in uploaded_files: f.seek(0) df = load_and_validate_excel(file_contents, file_names, keywords_hash) st.session_state.original_df = df # Show load summary if len(uploaded_files) > 1: st.success(f"Loaded {len(df)} rows from {len(uploaded_files)} files") else: st.success(f"Loaded {len(df)} rows") # Display column mapping info with st.expander("Column Mapping"): st.markdown(""" **Expected Columns:** - Column E: `Description` - Product description for keyword matching - Column F: `Tariff` - 10-digit Primary HTS code - Columns I-N: `Primary 1-6` - Additional HTS codes """) st.write("**Detected columns:**", df.columns.tolist()) # Filter controls st.subheader("Filter Options") col1, col2 = st.columns(2) with col1: hts_filter = st.text_input( "Filter by Primary HTS (partial match)", placeholder="e.g., 7301 or 730120", help="Enter partial HTS to filter entries" ) with col2: desc_exclude = st.text_input( "Exclude by description keyword", placeholder="e.g., polyester", help="Exclude entries containing this keyword in description" ) # Apply filters filtered_df = df.copy() if hts_filter: tariff_col = "Tariff" if "Tariff" in df.columns else df.columns[5] filtered_df = filtered_df[ filtered_df[tariff_col].astype(str).str.contains(hts_filter, na=False) ] if desc_exclude: desc_col = "Description" if "Description" in df.columns else df.columns[4] filtered_df = filtered_df[ ~filtered_df[desc_col].astype(str).str.lower().str.contains( desc_exclude.lower(), na=False ) ] st.write(f"**{len(filtered_df)} of {len(df)} entries after filters**") if len(filtered_df) > 0: # Manual validation button file_names_key = ",".join(sorted(file_names)) # Check if validation already done for these files validation_done = ( "cached_full_results" in st.session_state and st.session_state.get("cached_file_names") == file_names_key ) if validation_done: # Filter cached results based on current filters full_results_df = st.session_state.cached_full_results filtered_indices = filtered_df.index.tolist() filtered_results_df = full_results_df.iloc[filtered_indices].copy() st.session_state.validation_results = filtered_results_df st.session_state.filtered_df = filtered_df st.success(f"Validated {len(filtered_df)} entries. Go to 'Validation Results' tab to review.") else: st.session_state.filtered_df = filtered_df if st.button("Validate", type="primary"): with st.spinner("Validating all entries..."): validator = get_validator() full_results = validate_dataframe(df, validator) full_results_df = results_to_dataframe(full_results) st.session_state.cached_full_results = full_results_df st.session_state.cached_file_names = file_names_key filtered_indices = filtered_df.index.tolist() filtered_results_df = full_results_df.iloc[filtered_indices].copy() st.session_state.validation_results = filtered_results_df st.rerun() except Exception as e: st.error(f"Error loading file: {str(e)}") # Tab 2: Validation Results with tab2: st.header("Validation Results") if st.session_state.validation_results is None: st.info("Upload a file and run validation first.") else: # Results are already a DataFrame now (cached) results_df = st.session_state.validation_results.copy() # Summary statistics col1, col2, col3, col4 = st.columns(4) with col1: pass_count = len(results_df[results_df["Status"] == "PASS"]) st.metric("PASS", pass_count) with col2: fail_count = len(results_df[results_df["Status"] == "FAIL"]) st.metric("FAIL", fail_count) with col3: flag_count = len(results_df[results_df["Status"] == "FLAG"]) st.metric("FLAG", flag_count) with col4: none_count = len(results_df[results_df["Scenario"] == "NONE"]) st.metric("No Match", none_count) # Filter by status st.subheader("Filter Results") col1, col2 = st.columns(2) with col1: status_filter = st.multiselect( "Filter by Status", options=["PASS", "FAIL", "FLAG"], default=["FAIL", "FLAG"] ) with col2: scenario_filter = st.multiselect( "Filter by Scenario", options=list(SCENARIO_SUMMARIES.keys()), default=[] ) # Apply filters display_df = results_df.copy() if status_filter: display_df = display_df[display_df["Status"].isin(status_filter)] if scenario_filter: display_df = display_df[display_df["Scenario"].isin(scenario_filter)] # Exclude "NONE" scenario by default show_none = st.checkbox("Show 'No Match' entries", value=False) if not show_none: display_df = display_df[display_df["Scenario"] != "NONE"] st.write(f"**Showing {len(display_df)} results**") # Display results table if len(display_df) > 0: # Select columns to display display_columns = [ "Entry Number", "Description", "Primary HTS", "Additional HTS", # HTS indicators "Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS", # Keyword indicators "Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW", # Validation "Scenario", "Status", "Issue" ] # Interactive filtering section st.markdown("**Interactive Filters:**") filter_col1, filter_col2, filter_col3 = st.columns(3) with filter_col1: hts_search = st.text_input( "Filter by Primary HTS", placeholder="e.g., 7301 or 8302", key="results_hts_filter" ) with filter_col2: desc_search = st.text_input( "Filter by Description", placeholder="e.g., steel, aluminum", key="results_desc_filter" ) with filter_col3: additional_hts_search = st.text_input( "Filter by Additional HTS", placeholder="e.g., 99038191", key="results_additional_filter" ) # Apply interactive filters interactive_df = display_df.copy() if hts_search: interactive_df = interactive_df[ interactive_df["Primary HTS"].astype(str).str.contains(hts_search, case=False, na=False) ] if desc_search: interactive_df = interactive_df[ interactive_df["Description"].astype(str).str.contains(desc_search, case=False, na=False) ] if additional_hts_search: interactive_df = interactive_df[ interactive_df["Additional HTS"].astype(str).str.contains(additional_hts_search, case=False, na=False) ] st.write(f"**Filtered: {len(interactive_df)} of {len(display_df)} results**") # Store interactive filtered df for export st.session_state.interactive_filtered_df = interactive_df # Get indicator columns that exist in display_columns indicator_cols_in_df = [c for c in INDICATOR_COLUMNS if c in display_columns] styled_df = interactive_df[display_columns].style.applymap( color_status, subset=["Status"] ).applymap( color_indicator, subset=indicator_cols_in_df ) st.dataframe(styled_df, use_container_width=True, height=400) # Scenario legend with st.expander("Scenario Legend"): for scenario_id, summary in SCENARIO_SUMMARIES.items(): st.write(f"**{scenario_id}**: {summary}") # Bulk Export Actions st.subheader("Add to Export Cache") st.markdown("Use bulk actions to add **currently filtered** results to export cache") col1, col2, col3 = st.columns(3) with col1: if st.button("Add ALL Filtered to Cache", type="primary"): added_count = 0 for _, row in interactive_df.iterrows(): row_dict = row.to_dict() # Check if not already in cache (by Entry + HTS + Description for uniqueness) key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", "")) existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", "")) for d in st.session_state.export_cache] if key not in existing_keys: st.session_state.export_cache.append(row_dict) added_count += 1 st.success(f"Added {added_count} entries to cache ({len(st.session_state.export_cache)} total)") with col2: if st.button("Add FAIL Only to Cache"): fail_df = interactive_df[interactive_df["Status"] == "FAIL"] added_count = 0 for _, row in fail_df.iterrows(): row_dict = row.to_dict() key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", "")) existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", "")) for d in st.session_state.export_cache] if key not in existing_keys: st.session_state.export_cache.append(row_dict) added_count += 1 st.success(f"Added {added_count} FAIL entries to cache") with col3: if st.button("Add FLAG Only to Cache"): flag_df = interactive_df[interactive_df["Status"] == "FLAG"] added_count = 0 for _, row in flag_df.iterrows(): row_dict = row.to_dict() key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", "")) existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", "")) for d in st.session_state.export_cache] if key not in existing_keys: st.session_state.export_cache.append(row_dict) added_count += 1 st.success(f"Added {added_count} FLAG entries to cache") # Add by scenario st.markdown("**Add by Scenario (from filtered results):**") scenario_cols = st.columns(4) available_scenarios = interactive_df["Scenario"].unique().tolist() for idx, scenario in enumerate(available_scenarios[:8]): # Limit to 8 buttons col_idx = idx % 4 with scenario_cols[col_idx]: scenario_count = len(interactive_df[interactive_df["Scenario"] == scenario]) if st.button(f"{scenario} ({scenario_count})", key=f"add_scenario_{scenario}"): scenario_df = interactive_df[interactive_df["Scenario"] == scenario] added_count = 0 for _, row in scenario_df.iterrows(): row_dict = row.to_dict() key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", "")) existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", "")) for d in st.session_state.export_cache] if key not in existing_keys: st.session_state.export_cache.append(row_dict) added_count += 1 st.success(f"Added {added_count} {scenario} entries to cache") # Show cache status st.info(f"Current cache: {len(st.session_state.export_cache)} entries. Go to 'Export Selection' tab to download.") # Tab 2b: Unique Combinations with tab2b: st.header("Unique HTS + Description Combinations") st.markdown("View unique combinations to avoid reviewing duplicates") if st.session_state.validation_results is None: st.info("Upload a file and run validation first.") else: results_df = st.session_state.validation_results.copy() # Load reviewed combinations reviewed_combinations = load_reviewed_combinations() reviewed_count = len(reviewed_combinations) # Filter by status first st.subheader("Filter Options") # Reviewed combinations filter filter_reviewed = st.checkbox( f"Hide reviewed combinations ({reviewed_count} in list)", value=True, key="filter_reviewed_combinations", help="Filter out HTS+Description combinations that have already been reviewed" ) # Show reviewed combinations info if reviewed_count > 0: with st.expander(f"View {reviewed_count} reviewed combinations"): csv_path = os.path.join(os.path.dirname(__file__), REVIEWED_COMBINATIONS_FILE) st.caption(f"File: {csv_path}") try: # Try multiple encodings reviewed_df = None for enc in ["utf-8", "cp1252", "latin-1", "utf-8-sig"]: try: reviewed_df = pd.read_csv(csv_path, dtype=str, encoding=enc) break except UnicodeDecodeError: continue if reviewed_df is not None: st.dataframe(reviewed_df, use_container_width=True, height=200) else: st.error("Could not decode CSV file with any supported encoding") except Exception as e: st.error(f"Error loading file: {e}") else: st.info(f"No reviewed combinations found. Add HTS,Description rows to '{REVIEWED_COMBINATIONS_FILE}' to filter them out.") col1, col2 = st.columns(2) with col1: unique_status_filter = st.multiselect( "Filter by Status", options=["PASS", "FAIL", "FLAG"], default=["FAIL", "FLAG"], key="unique_status_filter" ) with col2: unique_scenario_filter = st.multiselect( "Filter by Scenario", options=list(SCENARIO_SUMMARIES.keys()), default=[], key="unique_scenario_filter" ) # Apply filters filtered_df = results_df.copy() if unique_status_filter: filtered_df = filtered_df[filtered_df["Status"].isin(unique_status_filter)] if unique_scenario_filter: filtered_df = filtered_df[filtered_df["Scenario"].isin(unique_scenario_filter)] # Exclude NONE by default show_none_unique = st.checkbox("Show 'No Match' entries", value=False, key="show_none_unique") if not show_none_unique: filtered_df = filtered_df[filtered_df["Scenario"] != "NONE"] if len(filtered_df) > 0: # Group by Primary HTS + Description (use Full Description for grouping) # Aggregate to get unique combinations unique_df = filtered_df.groupby( ["Primary HTS", "Full Description"], as_index=False ).agg({ "Entry Number": "count", # Count occurrences "Additional HTS": "first", # Take first (should be same for same HTS+desc) # HTS indicators "Steel HTS": "first", "Alum HTS": "first", "Copper HTS": "first", "Computer HTS": "first", "Auto HTS": "first", "Semi HTS": "first", # Keyword indicators "Metal KW": "first", "Alum KW": "first", "Copper KW": "first", "Zinc KW": "first", "Plastics KW": "first", # Validation "Scenario": "first", "Scenario Summary": "first", "Status": "first", "Expected HTS": "first", "Missing HTS": "first", "Unexpected HTS": "first", "Issue": "first" }).rename(columns={"Entry Number": "Count"}) # Sort by count descending to show most common first unique_df = unique_df.sort_values("Count", ascending=False).reset_index(drop=True) # Filter out reviewed combinations if checkbox is checked if filter_reviewed and reviewed_count > 0: # Mark which combinations are reviewed unique_df["_is_reviewed"] = unique_df.apply( lambda row: is_combination_reviewed( row["Primary HTS"], row["Full Description"], reviewed_combinations ), axis=1 ) reviewed_in_data = unique_df["_is_reviewed"].sum() unique_df = unique_df[~unique_df["_is_reviewed"]].drop(columns=["_is_reviewed"]) unique_df = unique_df.reset_index(drop=True) # Re-index starting from 1 unique_df.index = unique_df.index + 1 # Create shorter description for display unique_df["Description"] = unique_df["Full Description"].apply( lambda x: x[:80] + "..." if len(str(x)) > 80 else x ) # Show count info if filter_reviewed and reviewed_count > 0: st.write(f"**{len(unique_df)} unique combinations** (from {len(filtered_df)} total entries, {reviewed_in_data} reviewed combinations hidden)") else: st.write(f"**{len(unique_df)} unique combinations** (from {len(filtered_df)} total entries)") # Interactive filters for unique view st.markdown("**Search Filters:**") ucol1, ucol2 = st.columns(2) with ucol1: unique_hts_search = st.text_input( "Filter by Primary HTS", placeholder="e.g., 7301 or 8302", key="unique_hts_search" ) with ucol2: unique_desc_search = st.text_input( "Filter by Description", placeholder="e.g., steel, aluminum", key="unique_desc_search" ) # Apply search filters display_unique_df = unique_df.copy() if unique_hts_search: display_unique_df = display_unique_df[ display_unique_df["Primary HTS"].astype(str).str.contains(unique_hts_search, case=False, na=False) ] if unique_desc_search: display_unique_df = display_unique_df[ display_unique_df["Description"].astype(str).str.contains(unique_desc_search, case=False, na=False) ] # Re-index after filtering display_unique_df = display_unique_df.reset_index(drop=True) display_unique_df.index = display_unique_df.index + 1 st.write(f"**Showing {len(display_unique_df)} unique combinations**") # Display columns display_cols = [ "Primary HTS", "Description", "Additional HTS", # HTS indicators "Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS", # Keyword indicators "Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW", # Validation "Scenario", "Status", "Count", "Issue" ] # Get indicator columns that exist in display_cols indicator_cols_unique = [c for c in INDICATOR_COLUMNS if c in display_cols] styled_unique = display_unique_df[display_cols].style.applymap( color_status, subset=["Status"] ).applymap( color_indicator, subset=indicator_cols_unique ) st.dataframe(styled_unique, use_container_width=True, height=400) # Bulk export for unique combinations st.subheader("Add Unique Combinations to Cache") col1, col2 = st.columns(2) with col1: if st.button("Add ALL Unique to Cache", type="primary", key="add_all_unique"): added_count = 0 for _, row in display_unique_df.iterrows(): row_dict = row.to_dict() key = (row_dict.get("Primary HTS", ""), row_dict.get("Full Description", "")) existing_keys = [(d.get("Primary HTS", ""), d.get("Full Description", "")) for d in st.session_state.export_cache] if key not in existing_keys: st.session_state.export_cache.append(row_dict) added_count += 1 st.success(f"Added {added_count} unique combinations to cache") with col2: if st.button("Add FAIL/FLAG Unique to Cache", key="add_fail_flag_unique"): fail_flag_df = display_unique_df[display_unique_df["Status"].isin(["FAIL", "FLAG"])] added_count = 0 for _, row in fail_flag_df.iterrows(): row_dict = row.to_dict() key = (row_dict.get("Primary HTS", ""), row_dict.get("Full Description", "")) existing_keys = [(d.get("Primary HTS", ""), d.get("Full Description", "")) for d in st.session_state.export_cache] if key not in existing_keys: st.session_state.export_cache.append(row_dict) added_count += 1 st.success(f"Added {added_count} FAIL/FLAG combinations to cache") st.info(f"Current cache: {len(st.session_state.export_cache)} entries") else: st.info("No results matching the selected filters.") # Tab 3: Keyword Management with tab3: st.header("Keyword Management") st.markdown("Edit keyword lists used for validation. Changes apply immediately.") col1, col2 = st.columns(2) with col1: st.subheader("Metal Keywords") metal_text = st.text_area( "Metal keywords (one per line)", value="\n".join(st.session_state.keywords["metal"]), height=150, key="metal_input" ) st.subheader("Aluminum Keywords") aluminum_text = st.text_area( "Aluminum keywords (one per line)", value="\n".join(st.session_state.keywords["aluminum"]), height=100, key="aluminum_input" ) st.subheader("Copper Keywords") copper_text = st.text_area( "Copper keywords (one per line)", value="\n".join(st.session_state.keywords["copper"]), height=100, key="copper_input" ) with col2: st.subheader("Zinc Keywords") zinc_text = st.text_area( "Zinc keywords (one per line)", value="\n".join(st.session_state.keywords["zinc"]), height=100, key="zinc_input" ) st.subheader("Plastics Keywords") plastics_text = st.text_area( "Plastics keywords (one per line)", value="\n".join(st.session_state.keywords["plastics"]), height=150, key="plastics_input" ) col1, col2 = st.columns(2) with col1: if st.button("Save Keywords", type="primary"): st.session_state.keywords["metal"] = [ k.strip() for k in metal_text.split("\n") if k.strip() ] st.session_state.keywords["aluminum"] = [ k.strip() for k in aluminum_text.split("\n") if k.strip() ] st.session_state.keywords["copper"] = [ k.strip() for k in copper_text.split("\n") if k.strip() ] st.session_state.keywords["zinc"] = [ k.strip() for k in zinc_text.split("\n") if k.strip() ] st.session_state.keywords["plastics"] = [ k.strip() for k in plastics_text.split("\n") if k.strip() ] # Clear cached results to force re-validation if "cached_full_results" in st.session_state: del st.session_state.cached_full_results if "cached_file_names" in st.session_state: del st.session_state.cached_file_names st.success("Keywords saved! Re-upload file or refresh to apply changes.") with col2: if st.button("Reset to Defaults"): st.session_state.keywords = { "metal": ["steel", "stainless steel", "carbon steel", "iron", "metal"], "aluminum": ["aluminum", "aluminium"], "copper": ["copper"], "zinc": ["zinc"], "plastics": ["plastic", "abs", "pu", "pvc", "polyester", "nylon"] } # Clear cached results if "cached_full_results" in st.session_state: del st.session_state.cached_full_results if "cached_file_names" in st.session_state: del st.session_state.cached_file_names st.success("Keywords reset to defaults!") st.rerun() # Tab 4: Export Selection with tab4: st.header("Export Selection") if len(st.session_state.export_cache) == 0: st.info("No entries in export cache. Select entries from Validation Results tab.") else: st.write(f"**{len(st.session_state.export_cache)} entries in cache**") # Display cache contents cache_df = pd.DataFrame(st.session_state.export_cache) st.dataframe(cache_df, use_container_width=True) col1, col2, col3 = st.columns(3) with col1: if st.button("Clear Cache"): st.session_state.export_cache = [] st.success("Cache cleared!") st.rerun() with col2: # Export cached entries only if st.button("Export Cache to Excel"): excel_data = export_to_excel(cache_df) st.download_button( label="Download Excel (Cache Only)", data=excel_data, file_name="hts_audit_cache.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ) # Export full results with original data st.subheader("Export Full Results") if st.session_state.validation_results is not None and st.session_state.original_df is not None: # validation_results is already a DataFrame now results_df = st.session_state.validation_results.copy() # Status filter for export export_status = st.multiselect( "Export entries with status:", options=["PASS", "FAIL", "FLAG"], default=["FAIL", "FLAG"], key="export_status_filter" ) # Create filtered export if export_status: filtered_results = results_df[results_df["Status"].isin(export_status)] filtered_indices = filtered_results.index.tolist() if hasattr(st.session_state, "filtered_df"): export_original = st.session_state.filtered_df.iloc[filtered_indices].copy() else: export_original = st.session_state.original_df.iloc[filtered_indices].copy() st.write(f"**{len(filtered_results)} entries will be exported**") if st.button("Generate Full Export", type="primary"): excel_data = export_to_excel(export_original, filtered_results) st.download_button( label="Download Full Excel Report", data=excel_data, file_name="hts_audit_full_report.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ) else: st.info("Run validation first to enable full export.") # Tab 5: HTS Reference with tab5: st.header("HTS Reference Lists") st.markdown("Reference lists of Steel, Aluminum, Copper, and Semiconductor HTS codes used for validation") # Search filter hts_search = st.text_input( "Search HTS code", placeholder="Enter HTS to search across all lists", key="hts_reference_search" ) col1, col2, col3, col4 = st.columns(4) with col1: st.subheader(f"Steel HTS ({len(Steel_primary_HTS_list)})") steel_list = [str(h) for h in Steel_primary_HTS_list] if hts_search: steel_list = [h for h in steel_list if hts_search in h] steel_df = pd.DataFrame({"Steel HTS": steel_list}) st.dataframe(steel_df, use_container_width=True, height=400) with col2: st.subheader(f"Aluminum HTS ({len(Aluminum_primary_HTS_list)})") aluminum_list = [str(h) for h in Aluminum_primary_HTS_list] if hts_search: aluminum_list = [h for h in aluminum_list if hts_search in h] aluminum_df = pd.DataFrame({"Aluminum HTS": aluminum_list}) st.dataframe(aluminum_df, use_container_width=True, height=400) with col3: st.subheader(f"Copper HTS ({len(Copper_primary_HTS_list)})") copper_list = [str(h) for h in Copper_primary_HTS_list] if hts_search: copper_list = [h for h in copper_list if hts_search in h] copper_df = pd.DataFrame({"Copper HTS": copper_list}) st.dataframe(copper_df, use_container_width=True, height=400) with col4: st.subheader(f"Semiconductor HTS ({len(Semiconductor_HTS_list)})") semi_list = [str(h) for h in Semiconductor_HTS_list] if hts_search: semi_list = [h for h in semi_list if hts_search in h] semi_df = pd.DataFrame({"Semiconductor HTS": semi_list}) st.dataframe(semi_df, use_container_width=True, height=400) st.caption("Note: Overlaps with Computer Parts and Aluminum HTS") # Show overlap info st.subheader("HTS Overlap Analysis") steel_set = set(str(h) for h in Steel_primary_HTS_list) aluminum_set = set(str(h) for h in Aluminum_primary_HTS_list) copper_set = set(str(h) for h in Copper_primary_HTS_list) steel_aluminum = steel_set & aluminum_set aluminum_copper = aluminum_set & copper_set steel_copper = steel_set & copper_set col1, col2, col3 = st.columns(3) with col1: st.metric("Steel & Aluminum Overlap", len(steel_aluminum)) if steel_aluminum: with st.expander("View overlapping HTS"): st.write(sorted(steel_aluminum)) with col2: st.metric("Aluminum & Copper Overlap", len(aluminum_copper)) if aluminum_copper: with st.expander("View overlapping HTS"): st.write(sorted(aluminum_copper)) with col3: st.metric("Steel & Copper Overlap", len(steel_copper)) if steel_copper: with st.expander("View overlapping HTS"): st.write(sorted(steel_copper)) # Footer st.markdown("---") st.markdown("HTS Checker v1.0 - Tariff Audit Tool")