import streamlit as st import pandas as pd import re def clean_column_name(col_name): """Clean column names by replacing non-word characters with underscores.""" if not isinstance(col_name, str): return str(col_name) cleaned = re.sub(r"[^\w\s]", " ", col_name) return re.sub(r"\s+", "_", cleaned.strip().lower()) def standardize_tin_column(df: pd.DataFrame) -> pd.DataFrame: """ Clean column names and rename any column that contains 'tin' or both 'personal' and 'id' to 'tin'. Then strip extra spaces from all string values. """ df.columns = [clean_column_name(col) for col in df.columns] rename_map = {} for col in df.columns: col_lower = col.lower() if "tin" in col_lower or (("personal" in col_lower) and ("id" in col_lower)): rename_map[col] = "tin" if rename_map: df = df.rename(columns=rename_map) # Remove trailing and leading spaces in string cells for col in df.columns: if df[col].dtype == object: df[col] = df[col].astype(str).str.strip() return df def read_file(file, skip_first_row=False) -> pd.DataFrame: """ Read a CSV or Excel file into a DataFrame. For the earnings file, skip_first_row=True will skip the first row (with currency labels). """ try: if file.name.endswith((".xlsx", ".xls")): return pd.read_excel(file, skiprows=1 if skip_first_row else None) else: return pd.read_csv(file, skiprows=1 if skip_first_row else None) except Exception as e: st.error(f"Error reading {file.name}: {str(e)}") return None def safe_display_df(df: pd.DataFrame) -> pd.DataFrame: """Convert DataFrame values to strings for safe display.""" return df.astype(str).replace({"nan": "", "None": ""}) def main(): st.title("Merge Employee Name from Earnings into PAYE Sheet") st.write( "Upload an Earnings Sheet and a PAYE Sheet. The Earnings Sheet is assumed to have a first row with currency labels " "which will be skipped. The app will extract the first two columns (TIN and Employee Name) from the Earnings Sheet, " "and merge the Employee Name onto the PAYE sheet using the cleaned TIN." ) earnings_file = st.file_uploader("Upload Earnings Sheet", type=["csv", "xlsx", "xls"], key="earnings") paye_file = st.file_uploader("Upload PAYE Sheet", type=["csv", "xlsx", "xls"], key="paye") if earnings_file and paye_file: # Read the earnings file with the first row skipped and the PAYE file normally. earnings_df = read_file(earnings_file, skip_first_row=True) paye_df = read_file(paye_file, skip_first_row=False) if earnings_df is None or paye_df is None: st.error("One of the files could not be read. Please check the files and try again.") return # Standardize columns and TIN values for both files. earnings_df = standardize_tin_column(earnings_df) paye_df = standardize_tin_column(paye_df) # Debug: display unique TIN values from both files st.write("Unique TIN values in Earnings file:", earnings_df.iloc[:, 0].unique()) if "tin" in paye_df.columns: st.write("Unique TIN values in PAYE file:", paye_df["tin"].unique()) else: st.write("PAYE file columns:", list(paye_df.columns)) # Check that the earnings file has at least two columns. if earnings_df.shape[1] < 2: st.error("Earnings sheet must have at least two columns (TIN and Employee Name).") return # Extract the first two columns from the earnings file. earnings_subset = earnings_df.iloc[:, :2].copy() earnings_subset.columns = ["tin", "employee_name"] earnings_subset["tin"] = earnings_subset["tin"].astype(str).str.strip() earnings_subset["employee_name"] = earnings_subset["employee_name"].astype(str).str.strip() st.write("Preview of extracted TIN and Employee Name from Earnings Sheet:") st.dataframe(safe_display_df(earnings_subset.head())) # Verify the PAYE sheet has a 'tin' column. if "tin" not in paye_df.columns: st.error("The PAYE sheet does not have a recognized TIN column (e.g., 'tin' or 'personal id').") return else: paye_df["tin"] = paye_df["tin"].astype(str).str.strip() # Merge the employee name from earnings_subset onto the PAYE sheet using 'tin'. merged_df = paye_df.merge(earnings_subset, on="tin", how="left") st.write("### Merged PAYE Sheet with Employee Name") st.dataframe(safe_display_df(merged_df.head())) # Option to download the merged data as CSV. csv_data = merged_df.to_csv(index=False).encode("utf-8") st.download_button( label="Download Merged CSV", data=csv_data, file_name="merged_paye.csv", mime="text/csv" ) st.write(f"Total rows in merged data: {len(merged_df)}") else: st.info("Please upload both an Earnings Sheet and a PAYE Sheet.") if __name__ == "__main__": main()