# OneExcelZimra / app.py
# rairo - "Update app.py" (commit 0c23633, verified)
import streamlit as st
import pandas as pd
import re
def clean_column_name(col_name):
    """
    Normalise a column label into a lowercase, underscore-separated name.

    Punctuation (anything that is neither a word character nor whitespace)
    is treated as a separator, surrounding whitespace is dropped and each
    remaining run of whitespace becomes a single underscore.
    Labels that are not strings are returned as ``str(col_name)`` untouched.
    """
    if isinstance(col_name, str):
        # Turn punctuation into spaces first so it separates words like whitespace does.
        spaced = re.sub(r"[^\w\s]", " ", col_name)
        # split() discards leading/trailing whitespace and collapses inner runs.
        return "_".join(spaced.lower().split())
    return str(col_name)
def _find_tin_column_index(names):
    """Return the position of the column that best looks like a TIN, or None."""
    lowered = [str(name).lower() for name in names]

    # An exact 'tin' column always wins and needs no renaming.
    if "tin" in lowered:
        return lowered.index("tin")

    def matches_whole_words(name):
        words = name.split("_")
        return "tin" in words or ("personal" in words and "id" in words)

    def matches_substring(name):
        return "tin" in name or ("personal" in name and "id" in name)

    # Prefer whole-word matches ('tin_number', 'personal_id') so that columns
    # such as 'rating' or 'posting_date' are not mistaken for the TIN; fall
    # back to the looser substring rule only when nothing better exists
    # (e.g. 'tinnumber', 'personalid').
    for matches in (matches_whole_words, matches_substring):
        for position, name in enumerate(lowered):
            if matches(name):
                return position
    return None


def _strip_text_cells(column: pd.Series) -> pd.Series:
    """Trim surrounding whitespace from string cells, leaving other values as-is."""
    if pd.api.types.is_object_dtype(column) or pd.api.types.is_string_dtype(column):
        # Only real strings are stripped: missing values stay missing instead
        # of turning into the literal text 'nan' / 'None'.
        return column.map(lambda cell: cell.strip() if isinstance(cell, str) else cell)
    return column


def standardize_tin_column(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with cleaned column names, one 'tin' column and
    whitespace-trimmed string cells.

    Column names are passed through ``clean_column_name``. At most ONE column
    is then renamed to 'tin': an existing 'tin' column is kept; otherwise the
    first column whose name contains the word 'tin' (or both 'personal' and
    'id') is used, with a substring match as a last resort. Renaming a single
    column avoids creating duplicate 'tin' labels, which would break
    label-based column access later on.

    The input DataFrame is not modified. If no candidate column is found the
    result simply has no 'tin' column.
    """
    # Work on a copy so the caller's DataFrame keeps its original labels.
    df = df.copy()
    names = [clean_column_name(col) for col in df.columns]

    tin_position = _find_tin_column_index(names)
    if tin_position is not None:
        # Rename by position so pre-existing duplicate labels cannot cause
        # more than one column to be renamed.
        names[tin_position] = "tin"
    df.columns = names

    # apply() walks the columns by position, so it also copes with duplicate labels.
    return df.apply(_strip_text_cells)
def read_file(file, skip_first_row=False) -> "pd.DataFrame | None":
    """
    Read an uploaded CSV or Excel file into a DataFrame.

    Args:
        file: File-like object with a ``name`` attribute (as returned by
            ``st.file_uploader``). Names ending in ``.xlsx`` / ``.xls`` in any
            letter case are parsed as Excel; everything else as CSV.
        skip_first_row: When True the first row is skipped, which is used for
            the earnings file whose first row holds currency labels.

    Returns:
        The parsed DataFrame, or ``None`` if parsing failed (the error is
        reported to the user through ``st.error``).
    """
    skiprows = 1 if skip_first_row else None
    # Compare case-insensitively so that e.g. "REPORT.XLSX" is not fed to the CSV parser.
    is_excel = str(file.name).lower().endswith((".xlsx", ".xls"))
    reader = pd.read_excel if is_excel else pd.read_csv
    try:
        return reader(file, skiprows=skiprows)
    except Exception as e:
        # Deliberately broad: this is the UI boundary and any parser failure
        # should surface as a message rather than crash the app.
        st.error(f"Error reading {file.name}: {str(e)}")
        return None
def safe_display_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a string-only copy of ``df`` that is safe to hand to ``st.dataframe``.

    Every cell is converted to text and every missing value is shown as an
    empty string. This covers both cells that are still missing in ``df``
    (NaN, None, NaT, pd.NA) and cells that already hold the textual
    placeholders 'nan', 'None', 'NaT' or '<NA>'.
    """
    as_text = df.astype(str)
    # Blank out cells that were missing in the input. The mask is passed as a
    # plain NumPy array so it is applied by position and never needs label
    # alignment (which can fail with duplicate column names).
    as_text = as_text.where(df.notna().to_numpy(), "")
    # Also blank placeholders produced by an earlier string conversion.
    return as_text.replace({"nan": "", "None": "", "NaT": "", "<NA>": ""})
# Text forms a missing TIN can take once the column has been cast to str.
_MISSING_TIN_TOKENS = ("", "nan", "none", "nat", "<na>")


def _normalize_tin(values: pd.Series) -> pd.Series:
    """Return TINs as trimmed strings with a float-style '.0' suffix removed."""
    as_text = values.astype(str).str.strip()
    # A numeric TIN column containing blanks is parsed as float, so 123 shows
    # up as '123.0' and would never match '123' from the other sheet.
    return as_text.str.replace(r"^(\d+)\.0+$", r"\1", regex=True)


def _employee_names_by_tin(earnings_df: pd.DataFrame) -> pd.DataFrame:
    """Build a one-row-per-TIN lookup (tin, employee_name) from the first two columns."""
    lookup = earnings_df.iloc[:, :2].copy()
    lookup.columns = ["tin", "employee_name"]
    lookup["tin"] = _normalize_tin(lookup["tin"])
    lookup["employee_name"] = lookup["employee_name"].astype(str).str.strip()

    # Rows without a TIN cannot identify anyone; keeping them would let
    # 'nan' on one sheet match 'nan' on the other.
    tin_missing = lookup["tin"].isna() | lookup["tin"].str.lower().isin(_MISSING_TIN_TOKENS)
    lookup = lookup[~tin_missing]

    # A left merge repeats a PAYE row once per matching earnings row, so keep
    # a single (first) name per TIN to preserve the PAYE row count.
    return lookup.drop_duplicates(subset="tin", keep="first")


def main():
    """
    Streamlit entry point: merge employee names from an earnings sheet onto a
    PAYE sheet.

    The user uploads both sheets. The earnings sheet is read with its first
    row skipped, and its first two columns are taken as TIN and employee
    name. Both sheets are standardised, TINs are normalised to comparable
    strings, and the name is left-joined onto the PAYE sheet by 'tin'. A
    preview and a CSV download of the merged sheet are shown.
    """
    st.title("Merge Employee Name from Earnings into PAYE Sheet")
    st.write(
        "Upload an Earnings Sheet and a PAYE Sheet. The Earnings Sheet is assumed to have a first row with currency labels "
        "which will be skipped. The app will extract the first two columns (TIN and Employee Name) from the Earnings Sheet, "
        "and merge the Employee Name onto the PAYE sheet using the cleaned TIN."
    )

    earnings_file = st.file_uploader("Upload Earnings Sheet", type=["csv", "xlsx", "xls"], key="earnings")
    paye_file = st.file_uploader("Upload PAYE Sheet", type=["csv", "xlsx", "xls"], key="paye")

    if not (earnings_file and paye_file):
        st.info("Please upload both an Earnings Sheet and a PAYE Sheet.")
        return

    # Read the earnings file with the first row skipped and the PAYE file normally.
    earnings_df = read_file(earnings_file, skip_first_row=True)
    paye_df = read_file(paye_file, skip_first_row=False)
    if earnings_df is None or paye_df is None:
        st.error("One of the files could not be read. Please check the files and try again.")
        return

    # Standardize columns and TIN values for both files.
    earnings_df = standardize_tin_column(earnings_df)
    paye_df = standardize_tin_column(paye_df)

    # Validate the shape BEFORE touching column 0, otherwise an empty sheet
    # raises IndexError instead of showing the message below.
    if earnings_df.shape[1] < 2:
        st.error("Earnings sheet must have at least two columns (TIN and Employee Name).")
        return

    # Debug: display unique TIN values from both files
    st.write("Unique TIN values in Earnings file:", earnings_df.iloc[:, 0].unique())
    if "tin" in paye_df.columns:
        st.write("Unique TIN values in PAYE file:", paye_df["tin"].unique())
    else:
        st.write("PAYE file columns:", list(paye_df.columns))

    # Extract the first two columns from the earnings file as a TIN -> name lookup.
    earnings_subset = _employee_names_by_tin(earnings_df)
    ignored_rows = len(earnings_df) - len(earnings_subset)
    if ignored_rows:
        st.warning(f"{ignored_rows} earnings row(s) with a missing or repeated TIN were ignored.")

    st.write("Preview of extracted TIN and Employee Name from Earnings Sheet:")
    st.dataframe(safe_display_df(earnings_subset.head()))

    # Verify the PAYE sheet has a 'tin' column.
    if "tin" not in paye_df.columns:
        st.error("The PAYE sheet does not have a recognized TIN column (e.g., 'tin' or 'personal id').")
        return
    paye_df["tin"] = _normalize_tin(paye_df["tin"])

    # Merge the employee name from earnings_subset onto the PAYE sheet using 'tin'.
    merged_df = paye_df.merge(earnings_subset, on="tin", how="left")

    st.write("### Merged PAYE Sheet with Employee Name")
    st.dataframe(safe_display_df(merged_df.head()))

    # Option to download the merged data as CSV.
    csv_data = merged_df.to_csv(index=False).encode("utf-8")
    st.download_button(
        label="Download Merged CSV",
        data=csv_data,
        file_name="merged_paye.csv",
        mime="text/csv",
    )
    st.write(f"Total rows in merged data: {len(merged_df)}")
# Start the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()