Spaces:
Sleeping
Sleeping
| """ | |
| HTS Checker - Streamlit Application for HTS Tariff Auditing | |
| Deployed on Hugging Face Spaces | |
| """ | |
| import streamlit as st | |
| import pandas as pd | |
| from io import BytesIO | |
| import hashlib | |
| import os | |
| from hts_validator import HTSValidator, validate_dataframe, SCENARIO_SUMMARIES | |
| from HTS_list import Steel_primary_HTS_list, Aluminum_primary_HTS_list, Copper_primary_HTS_list, Semiconductor_HTS_list | |
# Path to the CSV of already-reviewed HTS+Description combinations,
# resolved relative to this file by load_reviewed_combinations().
REVIEWED_COMBINATIONS_FILE = "Reviewed_combination.csv"
# Page configuration -- st.set_page_config must be the first Streamlit
# command executed in the script.
st.set_page_config(
    page_title="HTS Checker - Tariff Audit Tool",
    page_icon="",
    layout="wide"
)
# =============================================================================
# Authentication
# =============================================================================
def get_app_password():
    """Return the configured app password, or "" when none is set.

    Streamlit secrets (the mechanism used on Hugging Face Spaces) take
    precedence; when the secret is absent or no secrets file exists, fall
    back to the HTS_CHECKER_PASSWORD environment variable.
    """
    # Streamlit secrets first (raises FileNotFoundError when no secrets
    # file exists at all, KeyError when the key is missing).
    try:
        return st.secrets["APP_PASSWORD"]
    except (KeyError, FileNotFoundError):
        # Environment-variable fallback; "" means "no password configured".
        return os.environ.get("HTS_CHECKER_PASSWORD", "")
def check_password():
    """Gate the app behind a password prompt.

    Returns True when no password is configured (local development) or when
    the user has already authenticated in this session; otherwise renders a
    login form and returns False until the correct password is submitted.
    """
    import hmac  # local import: only needed for the constant-time comparison

    app_password = get_app_password()
    # If no password is set, allow access (for local development)
    if not app_password:
        return True
    # Initialize session state for authentication
    if "authenticated" not in st.session_state:
        st.session_state.authenticated = False
    # If already authenticated, return True
    if st.session_state.authenticated:
        return True
    # Show login form
    st.markdown("## HTS Checker - Login Required")
    st.markdown("Please enter the password to access this application.")
    with st.form("login_form"):
        password_input = st.text_input("Password", type="password", key="password_input")
        submit_button = st.form_submit_button("Login")
        if submit_button:
            # hmac.compare_digest avoids leaking password prefix/length
            # through response timing, unlike a plain == comparison.
            if hmac.compare_digest(password_input, app_password):
                st.session_state.authenticated = True
                st.rerun()
            else:
                st.error("Incorrect password. Please try again.")
    return False
# Check authentication before showing main app; st.stop() halts script
# execution so nothing below renders for unauthenticated users.
if not check_password():
    st.stop()
def load_single_excel(file_content):
    """Read one Excel entry report, keeping every HTS column as text.

    HTS codes must stay strings: numeric coercion would drop leading zeros
    and append a ".0" suffix. Stray ".0" / "nan" artifacts left over from
    earlier float conversion are cleaned up after loading.
    """
    hts_cols = ("Tariff", "Primary 1", "Primary 2", "Primary 3",
                "Primary 4", "Primary 5", "Primary 6")
    frame = pd.read_excel(BytesIO(file_content),
                          dtype={name: str for name in hts_cols})
    for name in hts_cols:
        if name not in frame.columns:
            continue
        # Strip a trailing ".0" (float residue) and blank out missing values
        # that stringified as "nan".
        cleaned = frame[name].astype(str).str.replace(r'\.0$', '', regex=True)
        frame[name] = cleaned.replace('nan', '')
    return frame
def load_and_validate_excel(file_contents_list, file_names_list, keywords_hash):
    """Load multiple Excel entry reports and combine them into one DataFrame.

    Args:
        file_contents_list: raw bytes of each uploaded Excel file.
        file_names_list: matching file names (kept for callers' cache keying).
        keywords_hash: hash of the current keyword config (cache invalidation
            key for callers; unused here).

    Returns:
        A single DataFrame with all rows concatenated, exact duplicate rows
        removed, and the index reset so positional lookups (iloc) line up
        with row order.
    """
    all_dfs = [load_single_excel(content) for content in file_contents_list]
    combined_df = pd.concat(all_dfs, ignore_index=True)
    # The uploader UI promises "Duplicates across files will be removed":
    # drop exact duplicate rows, then renumber so cached validation results
    # can still be aligned positionally with this frame.
    combined_df = combined_df.drop_duplicates().reset_index(drop=True)
    return combined_df
def run_validation(df_hash, _df, _validator):
    """Validate *_df* with *_validator* and return the result list.

    *df_hash* exists purely as a cache key; the underscore-prefixed
    parameters follow Streamlit's convention for unhashable cache arguments.
    """
    return validate_dataframe(_df, _validator)
def get_df_hash(df):
    """Return a stable MD5 digest of *df*'s contents, used as a cache key."""
    row_hashes = pd.util.hash_pandas_object(df)
    return hashlib.md5(row_hashes.to_numpy().tobytes()).hexdigest()
def get_keywords_hash(keywords):
    """MD5 of the keyword config's repr; used to invalidate cached results."""
    encoded = str(keywords).encode()
    return hashlib.md5(encoded).hexdigest()
def load_reviewed_combinations():
    """Load already-reviewed (HTS, DESCRIPTION) pairs from the CSV file.

    Descriptions are upper-cased so later membership checks are
    case-insensitive. Returns an empty set when the file is missing,
    undecodable, or lacks the HTS/Description columns.
    """
    reviewed = set()
    csv_path = os.path.join(os.path.dirname(__file__), REVIEWED_COMBINATIONS_FILE)
    if not os.path.exists(csv_path):
        return reviewed
    # The CSV may have been saved from Excel with a legacy code page, so
    # try several encodings before giving up.
    frame = None
    for encoding in ["utf-8", "cp1252", "latin-1", "utf-8-sig"]:
        try:
            frame = pd.read_csv(csv_path, dtype=str, encoding=encoding)
            break
        except UnicodeDecodeError:
            continue  # next candidate encoding
        except Exception as e:
            # Any non-encoding failure (parse error, permissions, ...) is
            # surfaced as a warning; best-effort: return what we have.
            st.warning(f"Could not load reviewed combinations: {e}")
            return reviewed
    if frame is None or "HTS" not in frame.columns or "Description" not in frame.columns:
        return reviewed
    for _, row in frame.iterrows():
        hts = str(row["HTS"]).strip() if pd.notna(row["HTS"]) else ""
        desc = str(row["Description"]).strip().upper() if pd.notna(row["Description"]) else ""
        if hts and desc:
            reviewed.add((hts, desc))
    return reviewed
def is_combination_reviewed(hts, description, reviewed_set):
    """True when (hts, DESCRIPTION) appears in *reviewed_set*.

    Normalization mirrors load_reviewed_combinations(): whitespace stripped,
    description upper-cased, missing values treated as "".
    """
    hts_key = "" if pd.isna(hts) else str(hts).strip()
    desc_key = "" if pd.isna(description) else str(description).strip().upper()
    return (hts_key, desc_key) in reviewed_set
# Initialize session state (persists across Streamlit reruns in a session)
if "keywords" not in st.session_state:
    # Default keyword lists per material category; editable from the
    # "Keyword Management" tab.
    st.session_state.keywords = {
        "metal": ["steel", "stainless steel", "carbon steel", "iron", "metal"],
        "aluminum": ["aluminum", "aluminium"],
        "copper": ["copper"],
        "zinc": ["zinc"],
        "plastics": ["plastic", "abs", "pu", "pvc", "polyester", "nylon"]
    }
if "export_cache" not in st.session_state:
    # Rows queued for download from the "Export Selection" tab
    st.session_state.export_cache = []
if "validation_results" not in st.session_state:
    # DataFrame of validation results for the currently filtered upload
    st.session_state.validation_results = None
if "original_df" not in st.session_state:
    # Unfiltered combined DataFrame from the most recent upload
    st.session_state.original_df = None
def get_validator():
    """Build an HTSValidator from the keyword lists currently in session state."""
    kw = st.session_state.keywords
    return HTSValidator(
        metal_keywords=kw["metal"],
        aluminum_keywords=kw["aluminum"],
        copper_keywords=kw["copper"],
        zinc_keywords=kw["zinc"],
        plastics_keywords=kw["plastics"],
    )
def color_status(val):
    """Return the CSS background for a Status cell (PASS/FAIL/FLAG), else ""."""
    colors = {
        "PASS": "background-color: #90EE90",  # light green
        "FAIL": "background-color: #FFB6C1",  # light red
        "FLAG": "background-color: #FFFFE0",  # light yellow
    }
    return colors.get(val, "")
def format_hts(hts_value):
    """Format an HTS value as a clean string.

    Strips the ".0" suffix left over from float conversion and maps
    empty/missing values (None, "", 0, NaN) to "".
    """
    if not hts_value:
        return ""
    s = str(hts_value)
    # float('nan') is truthy, so it slips past the guard above and would
    # render as "nan"; treat it as missing, consistent with the
    # 'nan' -> '' cleanup done at load time.
    if s == "nan":
        return ""
    # Remove .0 suffix if present (from float conversion)
    if s.endswith(".0"):
        s = s[:-2]
    return s
def bool_to_symbol(value: bool) -> str:
    """Render a boolean indicator as "Y" (truthy) or "N" (falsy)."""
    if value:
        return "Y"
    return "N"
def color_indicator(val):
    """Highlight affirmative ("Y") indicator cells with a light green background."""
    return "background-color: #90EE90" if val == "Y" else ""
# Indicator columns for styling -- the Y/N columns that receive the green
# highlight via color_indicator() when result tables are rendered.
INDICATOR_COLUMNS = [
    "Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS",
    "Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW"
]
def results_to_dataframe(results):
    """Flatten validation result objects into a display-ready DataFrame.

    Each row carries the entry identifiers, a truncated and a full
    description, Y/N indicator columns for HTS-list membership and keyword
    hits, and the scenario/status outcome.
    """
    def join_codes(values):
        # Comma-join the non-empty HTS codes, normalized as strings.
        return ", ".join(format_hts(v) for v in values if v)

    def shorten(text, limit=100):
        # Truncate long descriptions for the on-screen column; the full
        # text is preserved separately in "Full Description".
        return text[:limit] + "..." if len(text) > limit else text

    rows = []
    for r in results:
        rows.append({
            "Entry Number": r.entry_number,
            "Description": shorten(r.description),
            "Full Description": r.description,
            "Primary HTS": format_hts(r.primary_hts),
            "Additional HTS": join_codes(r.additional_hts),
            # HTS membership indicators
            "Steel HTS": bool_to_symbol(r.in_steel_hts),
            "Alum HTS": bool_to_symbol(r.in_aluminum_hts),
            "Copper HTS": bool_to_symbol(r.in_copper_hts),
            "Computer HTS": bool_to_symbol(r.in_computer_hts),
            "Auto HTS": bool_to_symbol(r.in_auto_hts),
            "Semi HTS": bool_to_symbol(r.in_semiconductor_hts),
            # Keyword indicators
            "Metal KW": bool_to_symbol(r.has_metal_keyword),
            "Alum KW": bool_to_symbol(r.has_aluminum_keyword),
            "Copper KW": bool_to_symbol(r.has_copper_keyword),
            "Zinc KW": bool_to_symbol(r.has_zinc_keyword),
            "Plastics KW": bool_to_symbol(r.has_plastics_keyword),
            # Validation results
            "Scenario": r.scenario_id,
            "Scenario Summary": r.scenario_summary,
            "Status": r.status,
            "Expected HTS": join_codes(r.expected_hts),
            "Missing HTS": join_codes(r.missing_hts),
            "Unexpected HTS": join_codes(r.unexpected_hts),
            "Issue": r.issue,
        })
    return pd.DataFrame(rows)
def export_to_excel(df, results_df=None):
    """Serialize *df* to an in-memory Excel workbook and return the buffer.

    When *results_df* is supplied, the sheet is named "Audit Results" and,
    if it is row-aligned with *df*, the validation columns are appended.
    Otherwise *df* is written unchanged to an "Export" sheet. The returned
    BytesIO is rewound and ready to hand to a download button.
    """
    buffer = BytesIO()
    with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
        if results_df is None:
            df.to_excel(writer, sheet_name="Export", index=False)
        else:
            export_df = df.copy()
            # Only append validation columns when the two frames line up
            # row-for-row; a length mismatch means alignment is unknown.
            if len(results_df) == len(export_df):
                column_map = {
                    "Scenario ID": "Scenario",
                    "Scenario Summary": "Scenario Summary",
                    "Status": "Status",
                    "Expected HTS": "Expected HTS",
                    "Missing HTS": "Missing HTS",
                    "Unexpected HTS": "Unexpected HTS",
                    "Issue Description": "Issue",
                }
                for out_col, src_col in column_map.items():
                    export_df[out_col] = results_df[src_col].values
            export_df.to_excel(writer, sheet_name="Audit Results", index=False)
    buffer.seek(0)
    return buffer
# Main app -- title plus the top-level tab layout
st.title("HTS Checker - Tariff Audit Tool")
st.markdown("Audit primary HTS codes against additional tariffs and description keywords")
# Create tabs (tab2b, "Unique Combinations", sits between Validation
# Results and Keyword Management)
tab1, tab2, tab2b, tab3, tab4, tab5 = st.tabs([
    "Upload & Filter",
    "Validation Results",
    "Unique Combinations",
    "Keyword Management",
    "Export Selection",
    "HTS Reference"
])
# Tab 1: Upload & Filter
with tab1:
    st.header("Upload Excel Files")
    uploaded_files = st.file_uploader(
        "Upload entry report Excel files (multiple allowed)",
        type=["xlsx", "xls"],
        accept_multiple_files=True,
        help="Upload one or more customizable entry reports from NetCHB. Duplicates across files will be removed."
    )
    if uploaded_files:
        try:
            # Use cached loading function with multiple files
            keywords_hash = get_keywords_hash(st.session_state.keywords)
            file_contents = [f.read() for f in uploaded_files]
            file_names = [f.name for f in uploaded_files]
            # Reset file positions for potential re-read on a later rerun
            for f in uploaded_files:
                f.seek(0)
            df = load_and_validate_excel(file_contents, file_names, keywords_hash)
            st.session_state.original_df = df
            # Show load summary
            if len(uploaded_files) > 1:
                st.success(f"Loaded {len(df)} rows from {len(uploaded_files)} files")
            else:
                st.success(f"Loaded {len(df)} rows")
            # Display column mapping info
            with st.expander("Column Mapping"):
                st.markdown("""
                **Expected Columns:**
                - Column E: `Description` - Product description for keyword matching
                - Column F: `Tariff` - 10-digit Primary HTS code
                - Columns I-N: `Primary 1-6` - Additional HTS codes
                """)
                st.write("**Detected columns:**", df.columns.tolist())
            # Filter controls
            st.subheader("Filter Options")
            col1, col2 = st.columns(2)
            with col1:
                hts_filter = st.text_input(
                    "Filter by Primary HTS (partial match)",
                    placeholder="e.g., 7301 or 730120",
                    help="Enter partial HTS to filter entries"
                )
            with col2:
                desc_exclude = st.text_input(
                    "Exclude by description keyword",
                    placeholder="e.g., polyester",
                    help="Exclude entries containing this keyword in description"
                )
            # Apply filters; fall back to positional columns (F = index 5,
            # E = index 4) when the expected headers are missing.
            filtered_df = df.copy()
            if hts_filter:
                tariff_col = "Tariff" if "Tariff" in df.columns else df.columns[5]
                filtered_df = filtered_df[
                    filtered_df[tariff_col].astype(str).str.contains(hts_filter, na=False)
                ]
            if desc_exclude:
                desc_col = "Description" if "Description" in df.columns else df.columns[4]
                filtered_df = filtered_df[
                    ~filtered_df[desc_col].astype(str).str.lower().str.contains(
                        desc_exclude.lower(), na=False
                    )
                ]
            st.write(f"**{len(filtered_df)} of {len(df)} entries after filters**")
            if len(filtered_df) > 0:
                # Manual validation button
                file_names_key = ",".join(sorted(file_names))
                # Check if validation already done for these files
                validation_done = (
                    "cached_full_results" in st.session_state and
                    st.session_state.get("cached_file_names") == file_names_key
                )
                if validation_done:
                    # Filter cached results based on current filters.
                    # Positional alignment holds because the cached results
                    # were built from this same (freshly indexed) df.
                    full_results_df = st.session_state.cached_full_results
                    filtered_indices = filtered_df.index.tolist()
                    filtered_results_df = full_results_df.iloc[filtered_indices].copy()
                    st.session_state.validation_results = filtered_results_df
                    st.session_state.filtered_df = filtered_df
                    st.success(f"Validated {len(filtered_df)} entries. Go to 'Validation Results' tab to review.")
                else:
                    st.session_state.filtered_df = filtered_df
                    if st.button("Validate", type="primary"):
                        with st.spinner("Validating all entries..."):
                            # Validate the FULL dataframe so later filter
                            # changes only re-slice cached results instead
                            # of re-running validation.
                            validator = get_validator()
                            full_results = validate_dataframe(df, validator)
                            full_results_df = results_to_dataframe(full_results)
                            st.session_state.cached_full_results = full_results_df
                            st.session_state.cached_file_names = file_names_key
                            filtered_indices = filtered_df.index.tolist()
                            filtered_results_df = full_results_df.iloc[filtered_indices].copy()
                            st.session_state.validation_results = filtered_results_df
                            st.rerun()
        except Exception as e:
            st.error(f"Error loading file: {str(e)}")
# Tab 2: Validation Results
with tab2:
    st.header("Validation Results")
    if st.session_state.validation_results is None:
        st.info("Upload a file and run validation first.")
    else:
        # Results are already a DataFrame now (cached in session state by Tab 1)
        results_df = st.session_state.validation_results.copy()
        # Summary statistics
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            pass_count = len(results_df[results_df["Status"] == "PASS"])
            st.metric("PASS", pass_count)
        with col2:
            fail_count = len(results_df[results_df["Status"] == "FAIL"])
            st.metric("FAIL", fail_count)
        with col3:
            flag_count = len(results_df[results_df["Status"] == "FLAG"])
            st.metric("FLAG", flag_count)
        with col4:
            none_count = len(results_df[results_df["Scenario"] == "NONE"])
            st.metric("No Match", none_count)
        # Filter by status / scenario
        st.subheader("Filter Results")
        col1, col2 = st.columns(2)
        with col1:
            status_filter = st.multiselect(
                "Filter by Status",
                options=["PASS", "FAIL", "FLAG"],
                default=["FAIL", "FLAG"]
            )
        with col2:
            scenario_filter = st.multiselect(
                "Filter by Scenario",
                options=list(SCENARIO_SUMMARIES.keys()),
                default=[]
            )
        # Apply filters
        display_df = results_df.copy()
        if status_filter:
            display_df = display_df[display_df["Status"].isin(status_filter)]
        if scenario_filter:
            display_df = display_df[display_df["Scenario"].isin(scenario_filter)]
        # Exclude "NONE" scenario by default
        show_none = st.checkbox("Show 'No Match' entries", value=False)
        if not show_none:
            display_df = display_df[display_df["Scenario"] != "NONE"]
        st.write(f"**Showing {len(display_df)} results**")
        # Display results table
        if len(display_df) > 0:
            # Select columns to display
            display_columns = [
                "Entry Number", "Description", "Primary HTS",
                "Additional HTS",
                # HTS indicators
                "Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS",
                # Keyword indicators
                "Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW",
                # Validation
                "Scenario", "Status", "Issue"
            ]
            # Interactive filtering section -- substring search applied on
            # top of the status/scenario filters above
            st.markdown("**Interactive Filters:**")
            filter_col1, filter_col2, filter_col3 = st.columns(3)
            with filter_col1:
                hts_search = st.text_input(
                    "Filter by Primary HTS",
                    placeholder="e.g., 7301 or 8302",
                    key="results_hts_filter"
                )
            with filter_col2:
                desc_search = st.text_input(
                    "Filter by Description",
                    placeholder="e.g., steel, aluminum",
                    key="results_desc_filter"
                )
            with filter_col3:
                additional_hts_search = st.text_input(
                    "Filter by Additional HTS",
                    placeholder="e.g., 99038191",
                    key="results_additional_filter"
                )
            # Apply interactive filters (case-insensitive substring matches)
            interactive_df = display_df.copy()
            if hts_search:
                interactive_df = interactive_df[
                    interactive_df["Primary HTS"].astype(str).str.contains(hts_search, case=False, na=False)
                ]
            if desc_search:
                interactive_df = interactive_df[
                    interactive_df["Description"].astype(str).str.contains(desc_search, case=False, na=False)
                ]
            if additional_hts_search:
                interactive_df = interactive_df[
                    interactive_df["Additional HTS"].astype(str).str.contains(additional_hts_search, case=False, na=False)
                ]
            st.write(f"**Filtered: {len(interactive_df)} of {len(display_df)} results**")
            # Store interactive filtered df for export
            st.session_state.interactive_filtered_df = interactive_df
            # Get indicator columns that exist in display_columns
            indicator_cols_in_df = [c for c in INDICATOR_COLUMNS if c in display_columns]
            styled_df = interactive_df[display_columns].style.applymap(
                color_status, subset=["Status"]
            ).applymap(
                color_indicator, subset=indicator_cols_in_df
            )
            st.dataframe(styled_df, use_container_width=True, height=400)
            # Scenario legend
            with st.expander("Scenario Legend"):
                for scenario_id, summary in SCENARIO_SUMMARIES.items():
                    st.write(f"**{scenario_id}**: {summary}")
            # Bulk Export Actions -- each button de-duplicates against the
            # cache using (Entry Number, Primary HTS, Description) as the key.
            st.subheader("Add to Export Cache")
            st.markdown("Use bulk actions to add **currently filtered** results to export cache")
            col1, col2, col3 = st.columns(3)
            with col1:
                if st.button("Add ALL Filtered to Cache", type="primary"):
                    added_count = 0
                    for _, row in interactive_df.iterrows():
                        row_dict = row.to_dict()
                        # Check if not already in cache (by Entry + HTS + Description for uniqueness)
                        key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
                        existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
                                         for d in st.session_state.export_cache]
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            added_count += 1
                    st.success(f"Added {added_count} entries to cache ({len(st.session_state.export_cache)} total)")
            with col2:
                if st.button("Add FAIL Only to Cache"):
                    fail_df = interactive_df[interactive_df["Status"] == "FAIL"]
                    added_count = 0
                    for _, row in fail_df.iterrows():
                        row_dict = row.to_dict()
                        key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
                        existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
                                         for d in st.session_state.export_cache]
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            added_count += 1
                    st.success(f"Added {added_count} FAIL entries to cache")
            with col3:
                if st.button("Add FLAG Only to Cache"):
                    flag_df = interactive_df[interactive_df["Status"] == "FLAG"]
                    added_count = 0
                    for _, row in flag_df.iterrows():
                        row_dict = row.to_dict()
                        key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
                        existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
                                         for d in st.session_state.export_cache]
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            added_count += 1
                    st.success(f"Added {added_count} FLAG entries to cache")
            # Add by scenario -- one button per scenario present in the
            # filtered results, laid out four to a row
            st.markdown("**Add by Scenario (from filtered results):**")
            scenario_cols = st.columns(4)
            available_scenarios = interactive_df["Scenario"].unique().tolist()
            for idx, scenario in enumerate(available_scenarios[:8]):  # Limit to 8 buttons
                col_idx = idx % 4
                with scenario_cols[col_idx]:
                    scenario_count = len(interactive_df[interactive_df["Scenario"] == scenario])
                    if st.button(f"{scenario} ({scenario_count})", key=f"add_scenario_{scenario}"):
                        scenario_df = interactive_df[interactive_df["Scenario"] == scenario]
                        added_count = 0
                        for _, row in scenario_df.iterrows():
                            row_dict = row.to_dict()
                            key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
                            existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
                                             for d in st.session_state.export_cache]
                            if key not in existing_keys:
                                st.session_state.export_cache.append(row_dict)
                                added_count += 1
                        st.success(f"Added {added_count} {scenario} entries to cache")
            # Show cache status
            st.info(f"Current cache: {len(st.session_state.export_cache)} entries. Go to 'Export Selection' tab to download.")
# Tab 2b: Unique Combinations
with tab2b:
    st.header("Unique HTS + Description Combinations")
    st.markdown("View unique combinations to avoid reviewing duplicates")
    if st.session_state.validation_results is None:
        st.info("Upload a file and run validation first.")
    else:
        results_df = st.session_state.validation_results.copy()
        # Load reviewed combinations
        reviewed_combinations = load_reviewed_combinations()
        reviewed_count = len(reviewed_combinations)
        # Filter by status first
        st.subheader("Filter Options")
        # Reviewed combinations filter
        filter_reviewed = st.checkbox(
            f"Hide reviewed combinations ({reviewed_count} in list)",
            value=True,
            key="filter_reviewed_combinations",
            help="Filter out HTS+Description combinations that have already been reviewed"
        )
        # Show reviewed combinations info
        if reviewed_count > 0:
            with st.expander(f"View {reviewed_count} reviewed combinations"):
                csv_path = os.path.join(os.path.dirname(__file__), REVIEWED_COMBINATIONS_FILE)
                st.caption(f"File: {csv_path}")
                try:
                    # Try multiple encodings (same fallback chain as
                    # load_reviewed_combinations)
                    reviewed_df = None
                    for enc in ["utf-8", "cp1252", "latin-1", "utf-8-sig"]:
                        try:
                            reviewed_df = pd.read_csv(csv_path, dtype=str, encoding=enc)
                            break
                        except UnicodeDecodeError:
                            continue
                    if reviewed_df is not None:
                        st.dataframe(reviewed_df, use_container_width=True, height=200)
                    else:
                        st.error("Could not decode CSV file with any supported encoding")
                except Exception as e:
                    st.error(f"Error loading file: {e}")
        else:
            st.info(f"No reviewed combinations found. Add HTS,Description rows to '{REVIEWED_COMBINATIONS_FILE}' to filter them out.")
        col1, col2 = st.columns(2)
        with col1:
            unique_status_filter = st.multiselect(
                "Filter by Status",
                options=["PASS", "FAIL", "FLAG"],
                default=["FAIL", "FLAG"],
                key="unique_status_filter"
            )
        with col2:
            unique_scenario_filter = st.multiselect(
                "Filter by Scenario",
                options=list(SCENARIO_SUMMARIES.keys()),
                default=[],
                key="unique_scenario_filter"
            )
        # Apply filters
        filtered_df = results_df.copy()
        if unique_status_filter:
            filtered_df = filtered_df[filtered_df["Status"].isin(unique_status_filter)]
        if unique_scenario_filter:
            filtered_df = filtered_df[filtered_df["Scenario"].isin(unique_scenario_filter)]
        # Exclude NONE by default
        show_none_unique = st.checkbox("Show 'No Match' entries", value=False, key="show_none_unique")
        if not show_none_unique:
            filtered_df = filtered_df[filtered_df["Scenario"] != "NONE"]
        if len(filtered_df) > 0:
            # Group by Primary HTS + Description (use Full Description for grouping)
            # Aggregate to get unique combinations
            unique_df = filtered_df.groupby(
                ["Primary HTS", "Full Description"], as_index=False
            ).agg({
                "Entry Number": "count",  # Count occurrences
                "Additional HTS": "first",  # Take first (should be same for same HTS+desc)
                # HTS indicators
                "Steel HTS": "first",
                "Alum HTS": "first",
                "Copper HTS": "first",
                "Computer HTS": "first",
                "Auto HTS": "first",
                "Semi HTS": "first",
                # Keyword indicators
                "Metal KW": "first",
                "Alum KW": "first",
                "Copper KW": "first",
                "Zinc KW": "first",
                "Plastics KW": "first",
                # Validation
                "Scenario": "first",
                "Scenario Summary": "first",
                "Status": "first",
                "Expected HTS": "first",
                "Missing HTS": "first",
                "Unexpected HTS": "first",
                "Issue": "first"
            }).rename(columns={"Entry Number": "Count"})
            # Sort by count descending to show most common first
            unique_df = unique_df.sort_values("Count", ascending=False).reset_index(drop=True)
            # Filter out reviewed combinations if checkbox is checked
            if filter_reviewed and reviewed_count > 0:
                # Mark which combinations are reviewed
                unique_df["_is_reviewed"] = unique_df.apply(
                    lambda row: is_combination_reviewed(
                        row["Primary HTS"],
                        row["Full Description"],
                        reviewed_combinations
                    ),
                    axis=1
                )
                reviewed_in_data = unique_df["_is_reviewed"].sum()
                unique_df = unique_df[~unique_df["_is_reviewed"]].drop(columns=["_is_reviewed"])
                unique_df = unique_df.reset_index(drop=True)
            # Re-index starting from 1 (display convenience)
            unique_df.index = unique_df.index + 1
            # Create shorter description for display
            unique_df["Description"] = unique_df["Full Description"].apply(
                lambda x: x[:80] + "..." if len(str(x)) > 80 else x
            )
            # Show count info
            if filter_reviewed and reviewed_count > 0:
                st.write(f"**{len(unique_df)} unique combinations** (from {len(filtered_df)} total entries, {reviewed_in_data} reviewed combinations hidden)")
            else:
                st.write(f"**{len(unique_df)} unique combinations** (from {len(filtered_df)} total entries)")
            # Interactive filters for unique view
            st.markdown("**Search Filters:**")
            ucol1, ucol2 = st.columns(2)
            with ucol1:
                unique_hts_search = st.text_input(
                    "Filter by Primary HTS",
                    placeholder="e.g., 7301 or 8302",
                    key="unique_hts_search"
                )
            with ucol2:
                unique_desc_search = st.text_input(
                    "Filter by Description",
                    placeholder="e.g., steel, aluminum",
                    key="unique_desc_search"
                )
            # Apply search filters (case-insensitive substring matches)
            display_unique_df = unique_df.copy()
            if unique_hts_search:
                display_unique_df = display_unique_df[
                    display_unique_df["Primary HTS"].astype(str).str.contains(unique_hts_search, case=False, na=False)
                ]
            if unique_desc_search:
                display_unique_df = display_unique_df[
                    display_unique_df["Description"].astype(str).str.contains(unique_desc_search, case=False, na=False)
                ]
            # Re-index after filtering
            display_unique_df = display_unique_df.reset_index(drop=True)
            display_unique_df.index = display_unique_df.index + 1
            st.write(f"**Showing {len(display_unique_df)} unique combinations**")
            # Display columns
            display_cols = [
                "Primary HTS", "Description", "Additional HTS",
                # HTS indicators
                "Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS",
                # Keyword indicators
                "Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW",
                # Validation
                "Scenario", "Status", "Count", "Issue"
            ]
            # Get indicator columns that exist in display_cols
            indicator_cols_unique = [c for c in INDICATOR_COLUMNS if c in display_cols]
            styled_unique = display_unique_df[display_cols].style.applymap(
                color_status, subset=["Status"]
            ).applymap(
                color_indicator, subset=indicator_cols_unique
            )
            st.dataframe(styled_unique, use_container_width=True, height=400)
            # Bulk export for unique combinations -- keyed on
            # (Primary HTS, Full Description) rather than entry number
            st.subheader("Add Unique Combinations to Cache")
            col1, col2 = st.columns(2)
            with col1:
                if st.button("Add ALL Unique to Cache", type="primary", key="add_all_unique"):
                    added_count = 0
                    for _, row in display_unique_df.iterrows():
                        row_dict = row.to_dict()
                        key = (row_dict.get("Primary HTS", ""), row_dict.get("Full Description", ""))
                        existing_keys = [(d.get("Primary HTS", ""), d.get("Full Description", ""))
                                         for d in st.session_state.export_cache]
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            added_count += 1
                    st.success(f"Added {added_count} unique combinations to cache")
            with col2:
                if st.button("Add FAIL/FLAG Unique to Cache", key="add_fail_flag_unique"):
                    fail_flag_df = display_unique_df[display_unique_df["Status"].isin(["FAIL", "FLAG"])]
                    added_count = 0
                    for _, row in fail_flag_df.iterrows():
                        row_dict = row.to_dict()
                        key = (row_dict.get("Primary HTS", ""), row_dict.get("Full Description", ""))
                        existing_keys = [(d.get("Primary HTS", ""), d.get("Full Description", ""))
                                         for d in st.session_state.export_cache]
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            added_count += 1
                    st.success(f"Added {added_count} FAIL/FLAG combinations to cache")
            st.info(f"Current cache: {len(st.session_state.export_cache)} entries")
        else:
            st.info("No results matching the selected filters.")
# Tab 3: Keyword Management
with tab3:
    st.header("Keyword Management")
    st.markdown("Edit keyword lists used for validation. Changes apply immediately.")
    col1, col2 = st.columns(2)
    # Declarative spec for the five editable keyword lists:
    # (target column, session-state key, subheader, widget label, height, widget key).
    # Rendering order matches the original: metal/aluminum/copper left, zinc/plastics right.
    keyword_specs = [
        (col1, "metal", "Metal Keywords", "Metal keywords (one per line)", 150, "metal_input"),
        (col1, "aluminum", "Aluminum Keywords", "Aluminum keywords (one per line)", 100, "aluminum_input"),
        (col1, "copper", "Copper Keywords", "Copper keywords (one per line)", 100, "copper_input"),
        (col2, "zinc", "Zinc Keywords", "Zinc keywords (one per line)", 100, "zinc_input"),
        (col2, "plastics", "Plastics Keywords", "Plastics keywords (one per line)", 150, "plastics_input"),
    ]
    edited_text = {}
    for container, list_name, heading, label, box_height, widget_key in keyword_specs:
        with container:
            st.subheader(heading)
            edited_text[list_name] = st.text_area(
                label,
                value="\n".join(st.session_state.keywords[list_name]),
                height=box_height,
                key=widget_key
            )
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Save Keywords", type="primary"):
            # Persist each edited list, splitting on newlines and dropping
            # blank entries / surrounding whitespace.
            for list_name, raw_text in edited_text.items():
                st.session_state.keywords[list_name] = [
                    k.strip() for k in raw_text.split("\n") if k.strip()
                ]
            # Clear cached results to force re-validation.
            st.session_state.pop("cached_full_results", None)
            st.session_state.pop("cached_file_names", None)
            st.success("Keywords saved! Re-upload file or refresh to apply changes.")
    with col2:
        if st.button("Reset to Defaults"):
            # Restore the built-in default keyword lists.
            st.session_state.keywords = {
                "metal": ["steel", "stainless steel", "carbon steel", "iron", "metal"],
                "aluminum": ["aluminum", "aluminium"],
                "copper": ["copper"],
                "zinc": ["zinc"],
                "plastics": ["plastic", "abs", "pu", "pvc", "polyester", "nylon"]
            }
            # Clear cached results.
            st.session_state.pop("cached_full_results", None)
            st.session_state.pop("cached_file_names", None)
            st.success("Keywords reset to defaults!")
            st.rerun()
# Tab 4: Export Selection
# Offers two export paths: (1) the manually curated export cache, and
# (2) a status-filtered export of the full validation results joined with
# the original uploaded rows.
with tab4:
    st.header("Export Selection")
    if len(st.session_state.export_cache) == 0:
        st.info("No entries in export cache. Select entries from Validation Results tab.")
    else:
        st.write(f"**{len(st.session_state.export_cache)} entries in cache**")
        # Display cache contents
        cache_df = pd.DataFrame(st.session_state.export_cache)
        st.dataframe(cache_df, use_container_width=True)
        col1, col2, col3 = st.columns(3)
        with col1:
            if st.button("Clear Cache"):
                st.session_state.export_cache = []
                st.success("Cache cleared!")
                st.rerun()
        with col2:
            # Export cached entries only
            # NOTE(review): a download_button nested inside a button branch
            # disappears on the next rerun (the outer button reads False);
            # consider stashing excel_data in session_state — confirm desired UX.
            if st.button("Export Cache to Excel"):
                excel_data = export_to_excel(cache_df)
                st.download_button(
                    label="Download Excel (Cache Only)",
                    data=excel_data,
                    file_name="hts_audit_cache.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                )
    # Export full results with original data
    st.subheader("Export Full Results")
    if st.session_state.validation_results is not None and st.session_state.original_df is not None:
        # validation_results is already a DataFrame now
        results_df = st.session_state.validation_results.copy()
        # Status filter for export
        export_status = st.multiselect(
            "Export entries with status:",
            options=["PASS", "FAIL", "FLAG"],
            default=["FAIL", "FLAG"],
            key="export_status_filter"
        )
        # Create filtered export
        if export_status:
            filtered_results = results_df[results_df["Status"].isin(export_status)]
            filtered_indices = filtered_results.index.tolist()
            # NOTE(review): filtered_indices are index LABELS from results_df but
            # are passed to .iloc (positional). This is only correct if
            # validation_results carries a default RangeIndex aligned row-for-row
            # with the source frame — confirm against validate_dataframe.
            # Prefer the user-filtered frame when one was produced upstream.
            if hasattr(st.session_state, "filtered_df"):
                export_original = st.session_state.filtered_df.iloc[filtered_indices].copy()
            else:
                export_original = st.session_state.original_df.iloc[filtered_indices].copy()
            st.write(f"**{len(filtered_results)} entries will be exported**")
            if st.button("Generate Full Export", type="primary"):
                # Joins original rows with their validation results in one workbook.
                excel_data = export_to_excel(export_original, filtered_results)
                st.download_button(
                    label="Download Full Excel Report",
                    data=excel_data,
                    file_name="hts_audit_full_report.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                )
    else:
        st.info("Run validation first to enable full export.")
# Tab 5: HTS Reference
with tab5:
    st.header("HTS Reference Lists")
    st.markdown("Reference lists of Steel, Aluminum, Copper, and Semiconductor HTS codes used for validation")
    # Search filter
    hts_search = st.text_input(
        "Search HTS code",
        placeholder="Enter HTS to search across all lists",
        key="hts_reference_search"
    )
    col1, col2, col3, col4 = st.columns(4)
    # One (container, column label, raw code list) triple per reference list,
    # rendered left to right in the original order.
    reference_lists = [
        (col1, "Steel HTS", Steel_primary_HTS_list),
        (col2, "Aluminum HTS", Aluminum_primary_HTS_list),
        (col3, "Copper HTS", Copper_primary_HTS_list),
        (col4, "Semiconductor HTS", Semiconductor_HTS_list),
    ]
    for container, label, raw_codes in reference_lists:
        with container:
            st.subheader(f"{label} ({len(raw_codes)})")
            codes = [str(h) for h in raw_codes]
            if hts_search:
                # Substring match so partial codes also hit.
                codes = [h for h in codes if hts_search in h]
            st.dataframe(pd.DataFrame({label: codes}), use_container_width=True, height=400)
            if label == "Semiconductor HTS":
                st.caption("Note: Overlaps with Computer Parts and Aluminum HTS")
    # Show overlap info
    st.subheader("HTS Overlap Analysis")
    steel_set = {str(h) for h in Steel_primary_HTS_list}
    aluminum_set = {str(h) for h in Aluminum_primary_HTS_list}
    copper_set = {str(h) for h in Copper_primary_HTS_list}
    overlap_specs = [
        ("Steel & Aluminum Overlap", steel_set & aluminum_set),
        ("Aluminum & Copper Overlap", aluminum_set & copper_set),
        ("Steel & Copper Overlap", steel_set & copper_set),
    ]
    for container, (metric_label, shared_codes) in zip(st.columns(3), overlap_specs):
        with container:
            st.metric(metric_label, len(shared_codes))
            if shared_codes:
                with st.expander("View overlapping HTS"):
                    st.write(sorted(shared_codes))
# Footer — horizontal rule plus app name/version, rendered after all tabs.
st.markdown("---")
st.markdown("HTS Checker v1.0 - Tariff Audit Tool")