Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import re | |
def clean_column_name(col_name):
    """Normalize a column label into a lowercase, underscore-separated name.

    Non-string labels are returned as ``str(col_name)`` with no further
    cleaning. For strings, every character that is neither a word character
    nor whitespace is turned into a space; the text is then trimmed,
    lower-cased, and each run of whitespace becomes a single underscore.
    """
    if isinstance(col_name, str):
        without_punctuation = re.sub(r"[^\w\s]", " ", col_name)
        trimmed = without_punctuation.strip().lower()
        return re.sub(r"\s+", "_", trimmed)
    return str(col_name)
def _tin_match_rank(name):
    """Score how strongly a cleaned column name looks like a TIN column.

    Returns 0 for an exact "tin", 1 when "tin" is one of the
    underscore-separated words, 2 when both "personal" and "id" are words,
    3 for the legacy substring match (e.g. "tinnumber", "personalid"),
    and None when the name does not match at all. Lower is stronger.
    """
    lowered = name.lower()
    words = lowered.split("_")
    if lowered == "tin":
        return 0
    if "tin" in words:
        return 1
    if "personal" in words and "id" in words:
        return 2
    if "tin" in lowered or ("personal" in lowered and "id" in lowered):
        return 3
    return None


def _strip_if_text(value):
    """Trim surrounding whitespace from strings; pass every other value through."""
    return value.strip() if isinstance(value, str) else value


def standardize_tin_column(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean column names, rename the best TIN candidate to 'tin', and trim text cells.

    Column names are cleaned with ``clean_column_name``. A column is a TIN
    candidate when its cleaned name contains 'tin' or both 'personal' and
    'id'. Only ONE column is renamed: the strongest candidate (exact or
    whole-word matches beat substring matches such as 'routing_no'; ties go
    to the left-most column). Renaming every candidate would create duplicate
    'tin' labels, after which ``df['tin']`` is a DataFrame rather than a
    Series.

    Leading/trailing whitespace is removed from string values only, so
    missing cells stay missing instead of becoming the literal text 'nan'.

    Args:
        df: Frame as read from the uploaded file. It is not modified.

    Returns:
        A new DataFrame with cleaned labels, at most one renamed 'tin'
        column, and stripped string cells.
    """
    # Work on a copy so the caller's frame keeps its original labels and values.
    df = df.copy()
    df.columns = [clean_column_name(col) for col in df.columns]

    best_rank = None
    best_position = None
    for position, name in enumerate(df.columns):
        rank = _tin_match_rank(name)
        # Strict '<' keeps the left-most column when two candidates tie.
        if rank is not None and (best_rank is None or rank < best_rank):
            best_rank, best_position = rank, position

    if best_position is not None:
        # Rename by position so only the chosen column changes, even if
        # cleaning produced identical labels.
        labels = list(df.columns)
        labels[best_position] = "tin"
        df.columns = labels

    # Trim text cells column by column, addressing columns by position so a
    # repeated label cannot turn the selection into a DataFrame.
    for position, dtype in enumerate(df.dtypes):
        if pd.api.types.is_string_dtype(dtype):
            stripped = df.iloc[:, position].map(_strip_if_text)
            df.iloc[:, position] = stripped.to_numpy()
    return df
def read_file(file, skip_first_row=False) -> "pd.DataFrame | None":
    """
    Read a CSV or Excel file into a DataFrame.

    The format is chosen from the file name: names ending in ``.xlsx`` or
    ``.xls`` (in any letter case) are read with ``pd.read_excel``; everything
    else is read with ``pd.read_csv``.

    Args:
        file: File-like object exposing a ``name`` attribute, such as a
            Streamlit upload.
        skip_first_row: When True, the first row is skipped. The earnings
            file uses this to drop its leading row of currency labels.

    Returns:
        The parsed DataFrame, or None if reading failed. On failure the
        error is reported through ``st.error`` rather than raised.
    """
    # Resolve the name once; a missing attribute must not raise a second
    # error from inside the except handler below.
    file_name = str(getattr(file, "name", "") or "")
    skiprows = 1 if skip_first_row else None
    try:
        # Compare case-insensitively so "REPORT.XLSX" is not parsed as CSV.
        if file_name.lower().endswith((".xlsx", ".xls")):
            return pd.read_excel(file, skiprows=skiprows)
        return pd.read_csv(file, skiprows=skiprows)
    except Exception as e:  # UI boundary: show the problem instead of crashing the app
        st.error(f"Error reading {file_name}: {e}")
        return None
def safe_display_df(df: pd.DataFrame) -> pd.DataFrame:
    """Render a DataFrame as text for display.

    Every value is cast with ``astype(str)``; cells whose text is exactly
    "nan" or "None" are then replaced by empty strings.
    """
    blank_out = {"nan": "", "None": ""}
    as_text = df.astype(str)
    return as_text.replace(blank_out)
def _normalize_tin(values: pd.Series) -> pd.Series:
    """Render TINs as trimmed text, without the '.0' suffix of float-typed IDs.

    A numeric ID column that contains blanks is parsed as float, so 12345
    stringifies as '12345.0' and would never match '12345' in the other sheet.
    """
    text = values.astype(str).str.strip()
    return text.str.replace(r"\.0+$", "", regex=True)


def main():
    """Streamlit page: copy Employee Name from the Earnings sheet onto the PAYE sheet.

    Both uploads are read and standardized. The first two columns of the
    Earnings sheet are taken as (TIN, Employee Name) and left-merged onto the
    PAYE sheet by normalized TIN. The merged result is previewed and offered
    as a CSV download. Earnings rows with a blank TIN or a repeated TIN are
    dropped from the lookup first, so the merge never adds PAYE rows.
    """
    st.title("Merge Employee Name from Earnings into PAYE Sheet")
    st.write(
        "Upload an Earnings Sheet and a PAYE Sheet. The Earnings Sheet is assumed to have a first row with currency labels "
        "which will be skipped. The app will extract the first two columns (TIN and Employee Name) from the Earnings Sheet, "
        "and merge the Employee Name onto the PAYE sheet using the cleaned TIN."
    )
    earnings_file = st.file_uploader("Upload Earnings Sheet", type=["csv", "xlsx", "xls"], key="earnings")
    paye_file = st.file_uploader("Upload PAYE Sheet", type=["csv", "xlsx", "xls"], key="paye")

    if not (earnings_file and paye_file):
        st.info("Please upload both an Earnings Sheet and a PAYE Sheet.")
        return

    # Read the earnings file with the first row skipped and the PAYE file normally.
    earnings_df = read_file(earnings_file, skip_first_row=True)
    paye_df = read_file(paye_file, skip_first_row=False)
    if earnings_df is None or paye_df is None:
        st.error("One of the files could not be read. Please check the files and try again.")
        return

    # Standardize columns and TIN values for both files.
    earnings_df = standardize_tin_column(earnings_df)
    paye_df = standardize_tin_column(paye_df)

    # Validate the shape BEFORE any positional access; iloc[:, 0] on a frame
    # with no columns raises IndexError.
    if earnings_df.shape[1] < 2:
        st.error("Earnings sheet must have at least two columns (TIN and Employee Name).")
        return

    # Debug: display unique TIN values from both files
    st.write("Unique TIN values in Earnings file:", earnings_df.iloc[:, 0].unique())
    if "tin" in paye_df.columns:
        st.write("Unique TIN values in PAYE file:", paye_df["tin"].unique())
    else:
        st.write("PAYE file columns:", list(paye_df.columns))

    # Extract the first two columns from the earnings file.
    earnings_subset = earnings_df.iloc[:, :2].copy()
    earnings_subset.columns = ["tin", "employee_name"]
    earnings_subset["tin"] = _normalize_tin(earnings_subset["tin"])
    earnings_subset["employee_name"] = earnings_subset["employee_name"].astype(str).str.strip()
    st.write("Preview of extracted TIN and Employee Name from Earnings Sheet:")
    st.dataframe(safe_display_df(earnings_subset.head()))

    # Verify the PAYE sheet has a 'tin' column.
    if "tin" not in paye_df.columns:
        st.error("The PAYE sheet does not have a recognized TIN column (e.g., 'tin' or 'personal id').")
        return
    paye_df["tin"] = _normalize_tin(paye_df["tin"])

    # Build a one-row-per-TIN lookup. Blank TINs stringify to the same
    # placeholder text on both sides and would match each other, and repeated
    # TINs would duplicate PAYE rows in the left merge.
    blank_tin = earnings_subset["tin"].str.lower().isin(["", "nan", "none", "<na>", "nat"])
    name_lookup = earnings_subset[~blank_tin].drop_duplicates(subset="tin", keep="first")

    # Merge the employee name from the lookup onto the PAYE sheet using 'tin'.
    merged_df = paye_df.merge(name_lookup, on="tin", how="left")
    st.write("### Merged PAYE Sheet with Employee Name")
    st.dataframe(safe_display_df(merged_df.head()))

    # Option to download the merged data as CSV.
    csv_data = merged_df.to_csv(index=False).encode("utf-8")
    st.download_button(
        label="Download Merged CSV",
        data=csv_data,
        file_name="merged_paye.csv",
        mime="text/csv"
    )
    st.write(f"Total rows in merged data: {len(merged_df)}")
# Run the page only when this file is executed as the entry script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()