HTSReivewTool / app.py
joycecast's picture
Upload app.py
5030e0d verified
"""
HTS Checker - Streamlit Application for HTS Tariff Auditing
Deployed on Hugging Face Spaces
"""
import streamlit as st
import pandas as pd
from io import BytesIO
import hashlib
import os
from hts_validator import HTSValidator, validate_dataframe, SCENARIO_SUMMARIES
from HTS_list import Steel_primary_HTS_list, Aluminum_primary_HTS_list, Copper_primary_HTS_list, Semiconductor_HTS_list
# Path to reviewed combinations CSV file
REVIEWED_COMBINATIONS_FILE = "Reviewed_combination.csv"
# Page configuration
st.set_page_config(
page_title="HTS Checker - Tariff Audit Tool",
page_icon="",
layout="wide"
)
# =============================================================================
# Authentication
# =============================================================================
def get_app_password():
"""Get password from secrets or environment variable."""
# Try Streamlit secrets first (for Hugging Face Spaces)
try:
return st.secrets["APP_PASSWORD"]
except (KeyError, FileNotFoundError):
pass
# Fall back to environment variable
return os.environ.get("HTS_CHECKER_PASSWORD", "")
def check_password():
"""Returns True if the user has entered the correct password."""
app_password = get_app_password()
# If no password is set, allow access (for local development)
if not app_password:
return True
# Initialize session state for authentication
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
# If already authenticated, return True
if st.session_state.authenticated:
return True
# Show login form
st.markdown("## HTS Checker - Login Required")
st.markdown("Please enter the password to access this application.")
with st.form("login_form"):
password_input = st.text_input("Password", type="password", key="password_input")
submit_button = st.form_submit_button("Login")
if submit_button:
if password_input == app_password:
st.session_state.authenticated = True
st.rerun()
else:
st.error("Incorrect password. Please try again.")
return False
# Check authentication before showing main app
if not check_password():
st.stop()
def load_single_excel(file_content):
"""Load a single Excel file with proper HTS column types"""
df = pd.read_excel(BytesIO(file_content), dtype={
"Tariff": str,
"Primary 1": str,
"Primary 2": str,
"Primary 3": str,
"Primary 4": str,
"Primary 5": str,
"Primary 6": str,
})
# Clean up HTS columns
hts_columns = ["Tariff", "Primary 1", "Primary 2", "Primary 3",
"Primary 4", "Primary 5", "Primary 6"]
for col in hts_columns:
if col in df.columns:
df[col] = df[col].astype(str).str.replace(r'\.0$', '', regex=True)
df[col] = df[col].replace('nan', '')
return df
@st.cache_data
def load_and_validate_excel(file_contents_list, file_names_list, keywords_hash):
"""Load multiple Excel files and combine - cached to avoid re-running on filter changes"""
all_dfs = []
for file_content in file_contents_list:
df = load_single_excel(file_content)
all_dfs.append(df)
# Concatenate all dataframes
combined_df = pd.concat(all_dfs, ignore_index=True)
return combined_df
@st.cache_data
def run_validation(df_hash, _df, _validator):
"""Run validation - cached based on dataframe hash"""
results = validate_dataframe(_df, _validator)
return results
def get_df_hash(df):
"""Get hash of dataframe for caching"""
return hashlib.md5(pd.util.hash_pandas_object(df).values.tobytes()).hexdigest()
def get_keywords_hash(keywords):
"""Get hash of keywords for cache invalidation"""
return hashlib.md5(str(keywords).encode()).hexdigest()
def load_reviewed_combinations():
"""Load reviewed HTS+Description combinations from CSV file"""
reviewed_set = set()
csv_path = os.path.join(os.path.dirname(__file__), REVIEWED_COMBINATIONS_FILE)
if os.path.exists(csv_path):
# Try multiple encodings
encodings = ["utf-8", "cp1252", "latin-1", "utf-8-sig"]
df = None
for encoding in encodings:
try:
df = pd.read_csv(csv_path, dtype=str, encoding=encoding)
break
except UnicodeDecodeError:
continue
except Exception as e:
st.warning(f"Could not load reviewed combinations: {e}")
return reviewed_set
if df is not None and "HTS" in df.columns and "Description" in df.columns:
for _, row in df.iterrows():
hts = str(row["HTS"]).strip() if pd.notna(row["HTS"]) else ""
desc = str(row["Description"]).strip().upper() if pd.notna(row["Description"]) else ""
if hts and desc:
reviewed_set.add((hts, desc))
return reviewed_set
def is_combination_reviewed(hts, description, reviewed_set):
"""Check if HTS+Description combination has been reviewed"""
hts_str = str(hts).strip() if pd.notna(hts) else ""
desc_str = str(description).strip().upper() if pd.notna(description) else ""
return (hts_str, desc_str) in reviewed_set
# Initialize session state
if "keywords" not in st.session_state:
st.session_state.keywords = {
"metal": ["steel", "stainless steel", "carbon steel", "iron", "metal"],
"aluminum": ["aluminum", "aluminium"],
"copper": ["copper"],
"zinc": ["zinc"],
"plastics": ["plastic", "abs", "pu", "pvc", "polyester", "nylon"]
}
if "export_cache" not in st.session_state:
st.session_state.export_cache = []
if "validation_results" not in st.session_state:
st.session_state.validation_results = None
if "original_df" not in st.session_state:
st.session_state.original_df = None
def get_validator():
"""Create validator with current keyword settings"""
return HTSValidator(
metal_keywords=st.session_state.keywords["metal"],
aluminum_keywords=st.session_state.keywords["aluminum"],
copper_keywords=st.session_state.keywords["copper"],
zinc_keywords=st.session_state.keywords["zinc"],
plastics_keywords=st.session_state.keywords["plastics"]
)
def color_status(val):
"""Color code status column"""
if val == "PASS":
return "background-color: #90EE90" # Light green
elif val == "FAIL":
return "background-color: #FFB6C1" # Light red
elif val == "FLAG":
return "background-color: #FFFFE0" # Light yellow
return ""
def format_hts(hts_value):
"""Format HTS value as string, removing .0 suffix"""
if not hts_value:
return ""
s = str(hts_value)
# Remove .0 suffix if present (from float conversion)
if s.endswith(".0"):
s = s[:-2]
return s
def bool_to_symbol(value: bool) -> str:
"""Convert boolean to check/cross symbol"""
return "Y" if value else "N"
def color_indicator(val):
"""Color Y values with light green background"""
if val == "Y":
return "background-color: #90EE90" # Light green
return ""
# Indicator columns for styling
INDICATOR_COLUMNS = [
"Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS",
"Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW"
]
def results_to_dataframe(results):
"""Convert validation results to DataFrame"""
data = []
for r in results:
# Format additional HTS as strings
additional_hts_str = ", ".join([format_hts(h) for h in r.additional_hts if h])
expected_hts_str = ", ".join([format_hts(h) for h in r.expected_hts if h])
missing_hts_str = ", ".join([format_hts(h) for h in r.missing_hts if h])
unexpected_hts_str = ", ".join([format_hts(h) for h in r.unexpected_hts if h])
data.append({
"Entry Number": r.entry_number,
"Description": r.description[:100] + "..." if len(r.description) > 100 else r.description,
"Full Description": r.description,
"Primary HTS": format_hts(r.primary_hts),
"Additional HTS": additional_hts_str,
# HTS membership indicators
"Steel HTS": bool_to_symbol(r.in_steel_hts),
"Alum HTS": bool_to_symbol(r.in_aluminum_hts),
"Copper HTS": bool_to_symbol(r.in_copper_hts),
"Computer HTS": bool_to_symbol(r.in_computer_hts),
"Auto HTS": bool_to_symbol(r.in_auto_hts),
"Semi HTS": bool_to_symbol(r.in_semiconductor_hts),
# Keyword indicators
"Metal KW": bool_to_symbol(r.has_metal_keyword),
"Alum KW": bool_to_symbol(r.has_aluminum_keyword),
"Copper KW": bool_to_symbol(r.has_copper_keyword),
"Zinc KW": bool_to_symbol(r.has_zinc_keyword),
"Plastics KW": bool_to_symbol(r.has_plastics_keyword),
# Validation results
"Scenario": r.scenario_id,
"Scenario Summary": r.scenario_summary,
"Status": r.status,
"Expected HTS": expected_hts_str,
"Missing HTS": missing_hts_str,
"Unexpected HTS": unexpected_hts_str,
"Issue": r.issue
})
return pd.DataFrame(data)
def export_to_excel(df, results_df=None):
"""Export DataFrame to Excel with optional validation results"""
output = BytesIO()
with pd.ExcelWriter(output, engine="openpyxl") as writer:
if results_df is not None:
# Merge original data with validation results
# Use Full Description for export
export_df = df.copy()
# Add validation columns
if len(results_df) == len(export_df):
export_df["Scenario ID"] = results_df["Scenario"].values
export_df["Scenario Summary"] = results_df["Scenario Summary"].values
export_df["Status"] = results_df["Status"].values
export_df["Expected HTS"] = results_df["Expected HTS"].values
export_df["Missing HTS"] = results_df["Missing HTS"].values
export_df["Unexpected HTS"] = results_df["Unexpected HTS"].values
export_df["Issue Description"] = results_df["Issue"].values
export_df.to_excel(writer, sheet_name="Audit Results", index=False)
else:
df.to_excel(writer, sheet_name="Export", index=False)
output.seek(0)
return output
# Main app
st.title("HTS Checker - Tariff Audit Tool")
st.markdown("Audit primary HTS codes against additional tariffs and description keywords")
# Create tabs
tab1, tab2, tab2b, tab3, tab4, tab5 = st.tabs([
"Upload & Filter",
"Validation Results",
"Unique Combinations",
"Keyword Management",
"Export Selection",
"HTS Reference"
])
# Tab 1: Upload & Filter
with tab1:
st.header("Upload Excel Files")
uploaded_files = st.file_uploader(
"Upload entry report Excel files (multiple allowed)",
type=["xlsx", "xls"],
accept_multiple_files=True,
help="Upload one or more customizable entry reports from NetCHB. Duplicates across files will be removed."
)
if uploaded_files:
try:
# Use cached loading function with multiple files
keywords_hash = get_keywords_hash(st.session_state.keywords)
file_contents = [f.read() for f in uploaded_files]
file_names = [f.name for f in uploaded_files]
# Reset file positions for potential re-read
for f in uploaded_files:
f.seek(0)
df = load_and_validate_excel(file_contents, file_names, keywords_hash)
st.session_state.original_df = df
# Show load summary
if len(uploaded_files) > 1:
st.success(f"Loaded {len(df)} rows from {len(uploaded_files)} files")
else:
st.success(f"Loaded {len(df)} rows")
# Display column mapping info
with st.expander("Column Mapping"):
st.markdown("""
**Expected Columns:**
- Column E: `Description` - Product description for keyword matching
- Column F: `Tariff` - 10-digit Primary HTS code
- Columns I-N: `Primary 1-6` - Additional HTS codes
""")
st.write("**Detected columns:**", df.columns.tolist())
# Filter controls
st.subheader("Filter Options")
col1, col2 = st.columns(2)
with col1:
hts_filter = st.text_input(
"Filter by Primary HTS (partial match)",
placeholder="e.g., 7301 or 730120",
help="Enter partial HTS to filter entries"
)
with col2:
desc_exclude = st.text_input(
"Exclude by description keyword",
placeholder="e.g., polyester",
help="Exclude entries containing this keyword in description"
)
# Apply filters
filtered_df = df.copy()
if hts_filter:
tariff_col = "Tariff" if "Tariff" in df.columns else df.columns[5]
filtered_df = filtered_df[
filtered_df[tariff_col].astype(str).str.contains(hts_filter, na=False)
]
if desc_exclude:
desc_col = "Description" if "Description" in df.columns else df.columns[4]
filtered_df = filtered_df[
~filtered_df[desc_col].astype(str).str.lower().str.contains(
desc_exclude.lower(), na=False
)
]
st.write(f"**{len(filtered_df)} of {len(df)} entries after filters**")
if len(filtered_df) > 0:
# Manual validation button
file_names_key = ",".join(sorted(file_names))
# Check if validation already done for these files
validation_done = (
"cached_full_results" in st.session_state and
st.session_state.get("cached_file_names") == file_names_key
)
if validation_done:
# Filter cached results based on current filters
full_results_df = st.session_state.cached_full_results
filtered_indices = filtered_df.index.tolist()
filtered_results_df = full_results_df.iloc[filtered_indices].copy()
st.session_state.validation_results = filtered_results_df
st.session_state.filtered_df = filtered_df
st.success(f"Validated {len(filtered_df)} entries. Go to 'Validation Results' tab to review.")
else:
st.session_state.filtered_df = filtered_df
if st.button("Validate", type="primary"):
with st.spinner("Validating all entries..."):
validator = get_validator()
full_results = validate_dataframe(df, validator)
full_results_df = results_to_dataframe(full_results)
st.session_state.cached_full_results = full_results_df
st.session_state.cached_file_names = file_names_key
filtered_indices = filtered_df.index.tolist()
filtered_results_df = full_results_df.iloc[filtered_indices].copy()
st.session_state.validation_results = filtered_results_df
st.rerun()
except Exception as e:
st.error(f"Error loading file: {str(e)}")
# Tab 2: Validation Results
with tab2:
st.header("Validation Results")
if st.session_state.validation_results is None:
st.info("Upload a file and run validation first.")
else:
# Results are already a DataFrame now (cached)
results_df = st.session_state.validation_results.copy()
# Summary statistics
col1, col2, col3, col4 = st.columns(4)
with col1:
pass_count = len(results_df[results_df["Status"] == "PASS"])
st.metric("PASS", pass_count)
with col2:
fail_count = len(results_df[results_df["Status"] == "FAIL"])
st.metric("FAIL", fail_count)
with col3:
flag_count = len(results_df[results_df["Status"] == "FLAG"])
st.metric("FLAG", flag_count)
with col4:
none_count = len(results_df[results_df["Scenario"] == "NONE"])
st.metric("No Match", none_count)
# Filter by status
st.subheader("Filter Results")
col1, col2 = st.columns(2)
with col1:
status_filter = st.multiselect(
"Filter by Status",
options=["PASS", "FAIL", "FLAG"],
default=["FAIL", "FLAG"]
)
with col2:
scenario_filter = st.multiselect(
"Filter by Scenario",
options=list(SCENARIO_SUMMARIES.keys()),
default=[]
)
# Apply filters
display_df = results_df.copy()
if status_filter:
display_df = display_df[display_df["Status"].isin(status_filter)]
if scenario_filter:
display_df = display_df[display_df["Scenario"].isin(scenario_filter)]
# Exclude "NONE" scenario by default
show_none = st.checkbox("Show 'No Match' entries", value=False)
if not show_none:
display_df = display_df[display_df["Scenario"] != "NONE"]
st.write(f"**Showing {len(display_df)} results**")
# Display results table
if len(display_df) > 0:
# Select columns to display
display_columns = [
"Entry Number", "Description", "Primary HTS",
"Additional HTS",
# HTS indicators
"Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS",
# Keyword indicators
"Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW",
# Validation
"Scenario", "Status", "Issue"
]
# Interactive filtering section
st.markdown("**Interactive Filters:**")
filter_col1, filter_col2, filter_col3 = st.columns(3)
with filter_col1:
hts_search = st.text_input(
"Filter by Primary HTS",
placeholder="e.g., 7301 or 8302",
key="results_hts_filter"
)
with filter_col2:
desc_search = st.text_input(
"Filter by Description",
placeholder="e.g., steel, aluminum",
key="results_desc_filter"
)
with filter_col3:
additional_hts_search = st.text_input(
"Filter by Additional HTS",
placeholder="e.g., 99038191",
key="results_additional_filter"
)
# Apply interactive filters
interactive_df = display_df.copy()
if hts_search:
interactive_df = interactive_df[
interactive_df["Primary HTS"].astype(str).str.contains(hts_search, case=False, na=False)
]
if desc_search:
interactive_df = interactive_df[
interactive_df["Description"].astype(str).str.contains(desc_search, case=False, na=False)
]
if additional_hts_search:
interactive_df = interactive_df[
interactive_df["Additional HTS"].astype(str).str.contains(additional_hts_search, case=False, na=False)
]
st.write(f"**Filtered: {len(interactive_df)} of {len(display_df)} results**")
# Store interactive filtered df for export
st.session_state.interactive_filtered_df = interactive_df
# Get indicator columns that exist in display_columns
indicator_cols_in_df = [c for c in INDICATOR_COLUMNS if c in display_columns]
styled_df = interactive_df[display_columns].style.applymap(
color_status, subset=["Status"]
).applymap(
color_indicator, subset=indicator_cols_in_df
)
st.dataframe(styled_df, use_container_width=True, height=400)
# Scenario legend
with st.expander("Scenario Legend"):
for scenario_id, summary in SCENARIO_SUMMARIES.items():
st.write(f"**{scenario_id}**: {summary}")
# Bulk Export Actions
st.subheader("Add to Export Cache")
st.markdown("Use bulk actions to add **currently filtered** results to export cache")
col1, col2, col3 = st.columns(3)
with col1:
if st.button("Add ALL Filtered to Cache", type="primary"):
added_count = 0
for _, row in interactive_df.iterrows():
row_dict = row.to_dict()
# Check if not already in cache (by Entry + HTS + Description for uniqueness)
key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
for d in st.session_state.export_cache]
if key not in existing_keys:
st.session_state.export_cache.append(row_dict)
added_count += 1
st.success(f"Added {added_count} entries to cache ({len(st.session_state.export_cache)} total)")
with col2:
if st.button("Add FAIL Only to Cache"):
fail_df = interactive_df[interactive_df["Status"] == "FAIL"]
added_count = 0
for _, row in fail_df.iterrows():
row_dict = row.to_dict()
key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
for d in st.session_state.export_cache]
if key not in existing_keys:
st.session_state.export_cache.append(row_dict)
added_count += 1
st.success(f"Added {added_count} FAIL entries to cache")
with col3:
if st.button("Add FLAG Only to Cache"):
flag_df = interactive_df[interactive_df["Status"] == "FLAG"]
added_count = 0
for _, row in flag_df.iterrows():
row_dict = row.to_dict()
key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
for d in st.session_state.export_cache]
if key not in existing_keys:
st.session_state.export_cache.append(row_dict)
added_count += 1
st.success(f"Added {added_count} FLAG entries to cache")
# Add by scenario
st.markdown("**Add by Scenario (from filtered results):**")
scenario_cols = st.columns(4)
available_scenarios = interactive_df["Scenario"].unique().tolist()
for idx, scenario in enumerate(available_scenarios[:8]): # Limit to 8 buttons
col_idx = idx % 4
with scenario_cols[col_idx]:
scenario_count = len(interactive_df[interactive_df["Scenario"] == scenario])
if st.button(f"{scenario} ({scenario_count})", key=f"add_scenario_{scenario}"):
scenario_df = interactive_df[interactive_df["Scenario"] == scenario]
added_count = 0
for _, row in scenario_df.iterrows():
row_dict = row.to_dict()
key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
existing_keys = [(d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
for d in st.session_state.export_cache]
if key not in existing_keys:
st.session_state.export_cache.append(row_dict)
added_count += 1
st.success(f"Added {added_count} {scenario} entries to cache")
# Show cache status
st.info(f"Current cache: {len(st.session_state.export_cache)} entries. Go to 'Export Selection' tab to download.")
# Tab 2b: Unique Combinations
with tab2b:
st.header("Unique HTS + Description Combinations")
st.markdown("View unique combinations to avoid reviewing duplicates")
if st.session_state.validation_results is None:
st.info("Upload a file and run validation first.")
else:
results_df = st.session_state.validation_results.copy()
# Load reviewed combinations
reviewed_combinations = load_reviewed_combinations()
reviewed_count = len(reviewed_combinations)
# Filter by status first
st.subheader("Filter Options")
# Reviewed combinations filter
filter_reviewed = st.checkbox(
f"Hide reviewed combinations ({reviewed_count} in list)",
value=True,
key="filter_reviewed_combinations",
help="Filter out HTS+Description combinations that have already been reviewed"
)
# Show reviewed combinations info
if reviewed_count > 0:
with st.expander(f"View {reviewed_count} reviewed combinations"):
csv_path = os.path.join(os.path.dirname(__file__), REVIEWED_COMBINATIONS_FILE)
st.caption(f"File: {csv_path}")
try:
# Try multiple encodings
reviewed_df = None
for enc in ["utf-8", "cp1252", "latin-1", "utf-8-sig"]:
try:
reviewed_df = pd.read_csv(csv_path, dtype=str, encoding=enc)
break
except UnicodeDecodeError:
continue
if reviewed_df is not None:
st.dataframe(reviewed_df, use_container_width=True, height=200)
else:
st.error("Could not decode CSV file with any supported encoding")
except Exception as e:
st.error(f"Error loading file: {e}")
else:
st.info(f"No reviewed combinations found. Add HTS,Description rows to '{REVIEWED_COMBINATIONS_FILE}' to filter them out.")
col1, col2 = st.columns(2)
with col1:
unique_status_filter = st.multiselect(
"Filter by Status",
options=["PASS", "FAIL", "FLAG"],
default=["FAIL", "FLAG"],
key="unique_status_filter"
)
with col2:
unique_scenario_filter = st.multiselect(
"Filter by Scenario",
options=list(SCENARIO_SUMMARIES.keys()),
default=[],
key="unique_scenario_filter"
)
# Apply filters
filtered_df = results_df.copy()
if unique_status_filter:
filtered_df = filtered_df[filtered_df["Status"].isin(unique_status_filter)]
if unique_scenario_filter:
filtered_df = filtered_df[filtered_df["Scenario"].isin(unique_scenario_filter)]
# Exclude NONE by default
show_none_unique = st.checkbox("Show 'No Match' entries", value=False, key="show_none_unique")
if not show_none_unique:
filtered_df = filtered_df[filtered_df["Scenario"] != "NONE"]
if len(filtered_df) > 0:
# Group by Primary HTS + Description (use Full Description for grouping)
# Aggregate to get unique combinations
unique_df = filtered_df.groupby(
["Primary HTS", "Full Description"], as_index=False
).agg({
"Entry Number": "count", # Count occurrences
"Additional HTS": "first", # Take first (should be same for same HTS+desc)
# HTS indicators
"Steel HTS": "first",
"Alum HTS": "first",
"Copper HTS": "first",
"Computer HTS": "first",
"Auto HTS": "first",
"Semi HTS": "first",
# Keyword indicators
"Metal KW": "first",
"Alum KW": "first",
"Copper KW": "first",
"Zinc KW": "first",
"Plastics KW": "first",
# Validation
"Scenario": "first",
"Scenario Summary": "first",
"Status": "first",
"Expected HTS": "first",
"Missing HTS": "first",
"Unexpected HTS": "first",
"Issue": "first"
}).rename(columns={"Entry Number": "Count"})
# Sort by count descending to show most common first
unique_df = unique_df.sort_values("Count", ascending=False).reset_index(drop=True)
# Filter out reviewed combinations if checkbox is checked
if filter_reviewed and reviewed_count > 0:
# Mark which combinations are reviewed
unique_df["_is_reviewed"] = unique_df.apply(
lambda row: is_combination_reviewed(
row["Primary HTS"],
row["Full Description"],
reviewed_combinations
),
axis=1
)
reviewed_in_data = unique_df["_is_reviewed"].sum()
unique_df = unique_df[~unique_df["_is_reviewed"]].drop(columns=["_is_reviewed"])
unique_df = unique_df.reset_index(drop=True)
# Re-index starting from 1
unique_df.index = unique_df.index + 1
# Create shorter description for display
unique_df["Description"] = unique_df["Full Description"].apply(
lambda x: x[:80] + "..." if len(str(x)) > 80 else x
)
# Show count info
if filter_reviewed and reviewed_count > 0:
st.write(f"**{len(unique_df)} unique combinations** (from {len(filtered_df)} total entries, {reviewed_in_data} reviewed combinations hidden)")
else:
st.write(f"**{len(unique_df)} unique combinations** (from {len(filtered_df)} total entries)")
# Interactive filters for unique view
st.markdown("**Search Filters:**")
ucol1, ucol2 = st.columns(2)
with ucol1:
unique_hts_search = st.text_input(
"Filter by Primary HTS",
placeholder="e.g., 7301 or 8302",
key="unique_hts_search"
)
with ucol2:
unique_desc_search = st.text_input(
"Filter by Description",
placeholder="e.g., steel, aluminum",
key="unique_desc_search"
)
# Apply search filters
display_unique_df = unique_df.copy()
if unique_hts_search:
display_unique_df = display_unique_df[
display_unique_df["Primary HTS"].astype(str).str.contains(unique_hts_search, case=False, na=False)
]
if unique_desc_search:
display_unique_df = display_unique_df[
display_unique_df["Description"].astype(str).str.contains(unique_desc_search, case=False, na=False)
]
# Re-index after filtering
display_unique_df = display_unique_df.reset_index(drop=True)
display_unique_df.index = display_unique_df.index + 1
st.write(f"**Showing {len(display_unique_df)} unique combinations**")
# Display columns
display_cols = [
"Primary HTS", "Description", "Additional HTS",
# HTS indicators
"Steel HTS", "Alum HTS", "Copper HTS", "Computer HTS", "Auto HTS", "Semi HTS",
# Keyword indicators
"Metal KW", "Alum KW", "Copper KW", "Zinc KW", "Plastics KW",
# Validation
"Scenario", "Status", "Count", "Issue"
]
# Get indicator columns that exist in display_cols
indicator_cols_unique = [c for c in INDICATOR_COLUMNS if c in display_cols]
styled_unique = display_unique_df[display_cols].style.applymap(
color_status, subset=["Status"]
).applymap(
color_indicator, subset=indicator_cols_unique
)
st.dataframe(styled_unique, use_container_width=True, height=400)
# Bulk export for unique combinations
st.subheader("Add Unique Combinations to Cache")
col1, col2 = st.columns(2)
with col1:
if st.button("Add ALL Unique to Cache", type="primary", key="add_all_unique"):
added_count = 0
for _, row in display_unique_df.iterrows():
row_dict = row.to_dict()
key = (row_dict.get("Primary HTS", ""), row_dict.get("Full Description", ""))
existing_keys = [(d.get("Primary HTS", ""), d.get("Full Description", ""))
for d in st.session_state.export_cache]
if key not in existing_keys:
st.session_state.export_cache.append(row_dict)
added_count += 1
st.success(f"Added {added_count} unique combinations to cache")
with col2:
if st.button("Add FAIL/FLAG Unique to Cache", key="add_fail_flag_unique"):
fail_flag_df = display_unique_df[display_unique_df["Status"].isin(["FAIL", "FLAG"])]
added_count = 0
for _, row in fail_flag_df.iterrows():
row_dict = row.to_dict()
key = (row_dict.get("Primary HTS", ""), row_dict.get("Full Description", ""))
existing_keys = [(d.get("Primary HTS", ""), d.get("Full Description", ""))
for d in st.session_state.export_cache]
if key not in existing_keys:
st.session_state.export_cache.append(row_dict)
added_count += 1
st.success(f"Added {added_count} FAIL/FLAG combinations to cache")
st.info(f"Current cache: {len(st.session_state.export_cache)} entries")
else:
st.info("No results matching the selected filters.")
# Tab 3: Keyword Management
with tab3:
st.header("Keyword Management")
st.markdown("Edit keyword lists used for validation. Changes apply immediately.")
col1, col2 = st.columns(2)
with col1:
st.subheader("Metal Keywords")
metal_text = st.text_area(
"Metal keywords (one per line)",
value="\n".join(st.session_state.keywords["metal"]),
height=150,
key="metal_input"
)
st.subheader("Aluminum Keywords")
aluminum_text = st.text_area(
"Aluminum keywords (one per line)",
value="\n".join(st.session_state.keywords["aluminum"]),
height=100,
key="aluminum_input"
)
st.subheader("Copper Keywords")
copper_text = st.text_area(
"Copper keywords (one per line)",
value="\n".join(st.session_state.keywords["copper"]),
height=100,
key="copper_input"
)
with col2:
st.subheader("Zinc Keywords")
zinc_text = st.text_area(
"Zinc keywords (one per line)",
value="\n".join(st.session_state.keywords["zinc"]),
height=100,
key="zinc_input"
)
st.subheader("Plastics Keywords")
plastics_text = st.text_area(
"Plastics keywords (one per line)",
value="\n".join(st.session_state.keywords["plastics"]),
height=150,
key="plastics_input"
)
col1, col2 = st.columns(2)
with col1:
if st.button("Save Keywords", type="primary"):
st.session_state.keywords["metal"] = [
k.strip() for k in metal_text.split("\n") if k.strip()
]
st.session_state.keywords["aluminum"] = [
k.strip() for k in aluminum_text.split("\n") if k.strip()
]
st.session_state.keywords["copper"] = [
k.strip() for k in copper_text.split("\n") if k.strip()
]
st.session_state.keywords["zinc"] = [
k.strip() for k in zinc_text.split("\n") if k.strip()
]
st.session_state.keywords["plastics"] = [
k.strip() for k in plastics_text.split("\n") if k.strip()
]
# Clear cached results to force re-validation
if "cached_full_results" in st.session_state:
del st.session_state.cached_full_results
if "cached_file_names" in st.session_state:
del st.session_state.cached_file_names
st.success("Keywords saved! Re-upload file or refresh to apply changes.")
with col2:
if st.button("Reset to Defaults"):
st.session_state.keywords = {
"metal": ["steel", "stainless steel", "carbon steel", "iron", "metal"],
"aluminum": ["aluminum", "aluminium"],
"copper": ["copper"],
"zinc": ["zinc"],
"plastics": ["plastic", "abs", "pu", "pvc", "polyester", "nylon"]
}
# Clear cached results
if "cached_full_results" in st.session_state:
del st.session_state.cached_full_results
if "cached_file_names" in st.session_state:
del st.session_state.cached_file_names
st.success("Keywords reset to defaults!")
st.rerun()
# Tab 4: Export Selection
with tab4:
st.header("Export Selection")
if len(st.session_state.export_cache) == 0:
st.info("No entries in export cache. Select entries from Validation Results tab.")
else:
st.write(f"**{len(st.session_state.export_cache)} entries in cache**")
# Display cache contents
cache_df = pd.DataFrame(st.session_state.export_cache)
st.dataframe(cache_df, use_container_width=True)
col1, col2, col3 = st.columns(3)
with col1:
if st.button("Clear Cache"):
st.session_state.export_cache = []
st.success("Cache cleared!")
st.rerun()
with col2:
# Export cached entries only
if st.button("Export Cache to Excel"):
excel_data = export_to_excel(cache_df)
st.download_button(
label="Download Excel (Cache Only)",
data=excel_data,
file_name="hts_audit_cache.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
# Export full results with original data
st.subheader("Export Full Results")
if st.session_state.validation_results is not None and st.session_state.original_df is not None:
# validation_results is already a DataFrame now
results_df = st.session_state.validation_results.copy()
# Status filter for export
export_status = st.multiselect(
"Export entries with status:",
options=["PASS", "FAIL", "FLAG"],
default=["FAIL", "FLAG"],
key="export_status_filter"
)
# Create filtered export
if export_status:
filtered_results = results_df[results_df["Status"].isin(export_status)]
filtered_indices = filtered_results.index.tolist()
if hasattr(st.session_state, "filtered_df"):
export_original = st.session_state.filtered_df.iloc[filtered_indices].copy()
else:
export_original = st.session_state.original_df.iloc[filtered_indices].copy()
st.write(f"**{len(filtered_results)} entries will be exported**")
if st.button("Generate Full Export", type="primary"):
excel_data = export_to_excel(export_original, filtered_results)
st.download_button(
label="Download Full Excel Report",
data=excel_data,
file_name="hts_audit_full_report.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
else:
st.info("Run validation first to enable full export.")
# Tab 5: HTS Reference
with tab5:
st.header("HTS Reference Lists")
st.markdown("Reference lists of Steel, Aluminum, Copper, and Semiconductor HTS codes used for validation")
# Search filter
hts_search = st.text_input(
"Search HTS code",
placeholder="Enter HTS to search across all lists",
key="hts_reference_search"
)
col1, col2, col3, col4 = st.columns(4)
with col1:
st.subheader(f"Steel HTS ({len(Steel_primary_HTS_list)})")
steel_list = [str(h) for h in Steel_primary_HTS_list]
if hts_search:
steel_list = [h for h in steel_list if hts_search in h]
steel_df = pd.DataFrame({"Steel HTS": steel_list})
st.dataframe(steel_df, use_container_width=True, height=400)
with col2:
st.subheader(f"Aluminum HTS ({len(Aluminum_primary_HTS_list)})")
aluminum_list = [str(h) for h in Aluminum_primary_HTS_list]
if hts_search:
aluminum_list = [h for h in aluminum_list if hts_search in h]
aluminum_df = pd.DataFrame({"Aluminum HTS": aluminum_list})
st.dataframe(aluminum_df, use_container_width=True, height=400)
with col3:
st.subheader(f"Copper HTS ({len(Copper_primary_HTS_list)})")
copper_list = [str(h) for h in Copper_primary_HTS_list]
if hts_search:
copper_list = [h for h in copper_list if hts_search in h]
copper_df = pd.DataFrame({"Copper HTS": copper_list})
st.dataframe(copper_df, use_container_width=True, height=400)
with col4:
st.subheader(f"Semiconductor HTS ({len(Semiconductor_HTS_list)})")
semi_list = [str(h) for h in Semiconductor_HTS_list]
if hts_search:
semi_list = [h for h in semi_list if hts_search in h]
semi_df = pd.DataFrame({"Semiconductor HTS": semi_list})
st.dataframe(semi_df, use_container_width=True, height=400)
st.caption("Note: Overlaps with Computer Parts and Aluminum HTS")
# Show overlap info
st.subheader("HTS Overlap Analysis")
steel_set = set(str(h) for h in Steel_primary_HTS_list)
aluminum_set = set(str(h) for h in Aluminum_primary_HTS_list)
copper_set = set(str(h) for h in Copper_primary_HTS_list)
steel_aluminum = steel_set & aluminum_set
aluminum_copper = aluminum_set & copper_set
steel_copper = steel_set & copper_set
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Steel & Aluminum Overlap", len(steel_aluminum))
if steel_aluminum:
with st.expander("View overlapping HTS"):
st.write(sorted(steel_aluminum))
with col2:
st.metric("Aluminum & Copper Overlap", len(aluminum_copper))
if aluminum_copper:
with st.expander("View overlapping HTS"):
st.write(sorted(aluminum_copper))
with col3:
st.metric("Steel & Copper Overlap", len(steel_copper))
if steel_copper:
with st.expander("View overlapping HTS"):
st.write(sorted(steel_copper))
# Footer
st.markdown("---")
st.markdown("HTS Checker v1.0 - Tariff Audit Tool")