# OneExcelZimraAI / app.py
# Last commit: "Update app.py" by rairo (8018c05, verified)
import streamlit as st
import pandas as pd
import numpy as np
import os
from io import BytesIO
import google.generativeai as genai
import json
import tempfile
import re
# Initialize Google Gemini AI client.
# NOTE: reads GOOGLE_API_KEY from the environment at import time; a missing
# variable raises KeyError before the Streamlit app starts.
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
def clean_column_name(col_name):
    """
    Normalize a column name to snake_case.

    Punctuation is replaced with spaces, runs of whitespace collapse to a
    single underscore, and the result is lowercased. Non-string input is
    returned as its plain ``str()`` form, untouched.
    """
    if not isinstance(col_name, str):
        return str(col_name)
    # Strip punctuation first, then turn remaining whitespace into underscores.
    without_punct = re.sub(r"[^\w\s]", " ", col_name)
    return re.sub(r"\s+", "_", without_punct.strip().lower())
def clean_tin_value(val):
    """
    Normalize a TIN-like value to a clean string.

    Whitespace is stripped, and values that look like floats with a trailing
    ``.0`` (a common artifact of pandas reading integer IDs as floats) are
    collapsed to their integer form, e.g. ``"12345.0"`` -> ``"12345"``.
    Values that end in ``.0`` but are not parseable numbers are returned
    unchanged (stripped).
    """
    val_str = str(val).strip()
    if val_str.endswith('.0'):
        try:
            return str(int(float(val_str)))
        # Narrowed from a blanket `except Exception`: float()/int() raise
        # ValueError for non-numeric text and OverflowError for infinities.
        except (ValueError, OverflowError):
            return val_str
    return val_str
def standardize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Standardize DataFrame column names and data types.

    - Drop any middle-name columns.
    - Clean column names (e.g. "Employee Name" becomes "employee_name").
    - Rename synonyms (e.g. "Personal ID of Employee" -> "tin";
      wage/earning/payment-style columns -> "salary").
    - Collapse duplicate 'tin'/'salary' columns produced by the renaming.
    - If missing, construct an 'employee_name' column from first and last names.
    - Coerce 'salary' to numeric; ensure 'tin' and 'employee_name' are strings.

    Returns a standardized copy; the caller's DataFrame is never modified.
    """
    # Work on a copy: the original implementation reassigned df.columns (and
    # added columns) on the caller's object, mutating it in place.
    df = df.copy()

    # Drop columns containing 'middle_name'
    middle_name_cols = [col for col in df.columns if 'middle_name' in col.lower()]
    if middle_name_cols:
        df = df.drop(columns=middle_name_cols)

    # Clean all column names
    df.columns = [clean_column_name(col) for col in df.columns]

    # Rename synonyms for TIN and salary. A column matching both a TIN and a
    # salary keyword ends up as 'salary' (the second assignment wins).
    rename_map = {}
    for col in df.columns:
        if col in ['personal id', 'personal_id', 'tax id', 'taxid'] or "personal_id_of_employee" in col:
            rename_map[col] = 'tin'
        elif 'tin' in col:
            rename_map[col] = 'tin'
        if any(keyword in col for keyword in ['salary', 'wage', 'earning', 'commission', 'fee', 'payment', 'compensation']):
            rename_map[col] = 'salary'
    if rename_map:
        df = df.rename(columns=rename_map)

    # Renaming several synonyms can create duplicate columns; keep the first
    # non-null value across the duplicates, then drop the extras.
    for key in ('salary', 'tin'):
        if key in df.columns and list(df.columns).count(key) > 1:
            dup_cols = [col for col in df.columns if col == key]
            df[key] = df[dup_cols].bfill(axis=1).iloc[:, 0]
            df = df.loc[:, ~df.columns.duplicated()]

    # Construct employee_name if missing
    if 'employee_name' not in df.columns and 'first_name' in df.columns and 'last_name' in df.columns:
        df['employee_name'] = df['first_name'].astype(str).str.strip() + ' ' + df['last_name'].astype(str).str.strip()

    # Ensure proper types for key columns
    if 'salary' in df.columns:
        df['salary'] = pd.to_numeric(df['salary'], errors='coerce')
    if 'tin' in df.columns:
        df['tin'] = df['tin'].fillna('').astype(str).apply(clean_tin_value)
    if 'employee_name' in df.columns:
        df['employee_name'] = df['employee_name'].fillna('').astype(str).str.strip()
    return df
def analyze_columns(df: pd.DataFrame, filename: str) -> dict:
    """
    Use Gemini AI to analyze DataFrame columns and suggest key columns and renames.

    Sends the first five rows (stringified, as CSV) together with the filename
    to the model and expects a JSON object back. On any failure a fallback
    dict with the same keys ("subject", "columns", "key_columns", "issues",
    "suggested_renames") is returned so callers can proceed.
    """
    try:
        # Stringify the sample so odd dtypes (dates, NaN) serialize cleanly.
        display_df = df.head(5).copy()
        for col in display_df.columns:
            display_df[col] = display_df[col].astype(str)
        sample_csv = display_df.to_csv(index=False)
        # Bug fix: the filename parameter was previously never interpolated
        # into the prompt.
        prompt = f"""
Analyze this CSV data, which may represent an employee earnings schedule, PAYE figures, or template info for payroll processing.
Filename: {filename}
Sample data (first 5 rows):
{sample_csv}
Identify potential key columns for merging and suggest renames.
Respond with a valid JSON object.
"""
        response = model.generate_content(prompt)
        response_text = response.text.strip()
        # Strip a Markdown code fence if present. Handle the opening and
        # closing fences independently: the old slice [7:-3] chopped three
        # characters of JSON whenever the closing fence was missing.
        if response_text.startswith("```json"):
            response_text = response_text[len("```json"):]
        elif response_text.startswith("```"):
            response_text = response_text[3:]
        if response_text.endswith("```"):
            response_text = response_text[:-3]
        response_text = response_text.strip()
        try:
            analysis = json.loads(response_text)
            return analysis
        except json.JSONDecodeError as je:
            st.error(f"JSON parsing error: {str(je)}")
            st.text("Raw response:")
            st.text(response_text)
            return {"subject": "Error parsing analysis", "columns": [], "key_columns": [], "issues": ["Error analyzing columns"], "suggested_renames": {}}
    except Exception as e:
        # Top-level guard: surface the problem in the UI but keep the app alive.
        st.error(f"Error in column analysis: {str(e)}")
        return {"subject": "Error in analysis", "columns": [], "key_columns": [], "issues": [str(e)], "suggested_renames": {}}
def read_excel_file(file) -> pd.DataFrame:
    """
    Read an Excel file with error handling.

    Tries the openpyxl engine (xlsx) first, then falls back to xlrd (legacy
    .xls). Returns the parsed DataFrame, or None (after showing a Streamlit
    error) if both engines fail.
    """
    try:
        return pd.read_excel(file, engine="openpyxl")
    except Exception:
        try:
            # Bug fix: the failed first attempt consumes the upload stream,
            # so rewind before retrying with the legacy engine.
            if hasattr(file, "seek"):
                file.seek(0)
            return pd.read_excel(file, engine="xlrd")
        except Exception as e2:
            st.error(f"Failed to read Excel file: {str(e2)}")
            return None
def merge_with_master(processed_files):
    """
    Merge DataFrames in three steps:
    1. Use the earnings file as master (dropping its inaccurate 'tin').
    2. Merge the template file (which supplies the trusted TIN via its first column)
       with the earnings data using 'employee_name'.
    3. Finally, merge the combined data with the PAYE file using 'tin'.

    Files are classified by keywords in their lowercased filenames
    ("earnings", "paye", "template"). Progress and problems are reported
    through Streamlit (st.write / st.warning / st.error) as side effects.

    Parameters:
        processed_files: list of dicts with keys "filename" (str) and
            "df" (pd.DataFrame), as built in main().

    Returns:
        pd.DataFrame: the merged result. If no trusted 'tin' column exists
        after the template merge, the partial frame is returned early.
    """
    earnings_file = None
    paye_file = None
    template_file = None
    # Identify files by filename keywords. NOTE(review): if several files
    # match the same keyword, the last one wins — presumably one of each is
    # expected; confirm with upstream usage.
    for file_info in processed_files:
        lower_filename = file_info["filename"].lower()
        if "earnings" in lower_filename:
            earnings_file = file_info
        elif "paye" in lower_filename:
            paye_file = file_info
        elif "template" in lower_filename:
            template_file = file_info
    if not earnings_file:
        st.warning("No earnings file found as master. Using the first file as master.")
        earnings_file = processed_files[0]
    # Process earnings file: drop its inaccurate TIN column
    # (the trusted TIN is expected to come from the template file instead).
    earnings_df = earnings_file["df"]
    if 'tin' in earnings_df.columns:
        earnings_df = earnings_df.drop(columns=['tin'])
    if 'middle_name' in earnings_df.columns:
        earnings_df = earnings_df.drop(columns=['middle_name'])
    merged_df = earnings_df.copy()
    # Process and merge the template file using employee_name
    if template_file is not None:
        st.write(f"Merging template info from '{template_file['filename']}' using key 'employee_name'.")
        template_df = template_file["df"].copy()
        # Force the first column (Personal ID of Employee) to be 'tin',
        # regardless of its original header; the empty check guards cols[0].
        if not template_df.empty:
            cols = list(template_df.columns)
            cols[0] = "tin"
            template_df.columns = cols
        if 'middle_name' in template_df.columns:
            template_df = template_df.drop(columns=['middle_name'])
        # If employee_name is not present, construct it from first_name and last_name
        if 'employee_name' not in template_df.columns and 'first_name' in template_df.columns and 'last_name' in template_df.columns:
            template_df['employee_name'] = template_df['first_name'].astype(str).str.strip() + ' ' + template_df['last_name'].astype(str).str.strip()
        # Left merge keeps every earnings row; template columns that collide
        # get the '_template' suffix.
        if 'employee_name' in merged_df.columns and 'employee_name' in template_df.columns:
            merged_df = merged_df.merge(template_df, on='employee_name', how='left', suffixes=('', '_template'))
        else:
            st.warning("Column 'employee_name' missing in either earnings or template file. Skipping template merge.")
    else:
        st.warning("No template file detected. Cannot proceed without a trusted TIN.")
    # Check for a trusted 'tin' column after merging earnings and template
    if 'tin' not in merged_df.columns or merged_df['tin'].isnull().all():
        st.error("No trusted 'tin' column found in the merged earnings-template data. Aborting further merge. "
                 "Ensure the template file's first column (Personal ID of Employee) is correctly populated.")
        return merged_df
    # Merge PAYE file using the trusted 'tin'
    if paye_file is not None:
        st.write(f"Merging PAYE figures from '{paye_file['filename']}' using key 'tin'.")
        paye_df = paye_file["df"]
        if 'tin' in paye_df.columns:
            merged_df = merged_df.merge(paye_df, on='tin', how='left', suffixes=('', '_paye'))
        else:
            st.warning("Column 'tin' missing in the PAYE file. Skipping PAYE merge.")
    else:
        st.warning("No PAYE file detected.")
    return merged_df
def safe_display_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a display-friendly copy of *df*: every cell stringified, with the
    literal null placeholders "nan" and "None" blanked out.
    """
    as_text = df.astype(str)
    return as_text.replace({"nan": "", "None": ""})
def main():
    """
    Streamlit entry point.

    Flow: upload CSV/Excel files -> standardize each -> AI column analysis
    (with suggested renames applied) -> merge using the earnings file as
    master -> offer the merged CSV for download plus basic quality metrics.
    """
    st.title("Smart CSV Processor")
    st.write("Upload CSV or Excel files for intelligent analysis and merging.")
    uploaded_files = st.file_uploader("Choose files", accept_multiple_files=True, type=["csv", "xlsx", "xls"])
    if uploaded_files:
        st.write("### Processing Files")
        processed_files = []
        for uploaded_file in uploaded_files:
            st.write(f"#### Analyzing: {uploaded_file.name}")
            try:
                # Pick the reader by extension; read_excel_file handles
                # the openpyxl/xlrd engine fallback internally.
                if uploaded_file.name.endswith((".xlsx", ".xls")):
                    df = read_excel_file(uploaded_file)
                else:
                    df = pd.read_csv(uploaded_file)
                if df is not None:
                    if df.empty:
                        st.warning(f"DataFrame from '{uploaded_file.name}' is empty. Please check the file.")
                        continue
                    df = standardize_dataframe(df)
                    st.write("Initial Preview:")
                    st.dataframe(df.head())
                    with st.spinner("Analyzing columns..."):
                        analysis = analyze_columns(df, uploaded_file.name)
                    if analysis:
                        st.write("Column Analysis:")
                        st.json(analysis)
                        # Apply AI-suggested renames on top of standardization.
                        if 'suggested_renames' in analysis:
                            df = df.rename(columns=analysis['suggested_renames'])
                    processed_files.append({"filename": uploaded_file.name, "df": df, "analysis": analysis})
                else:
                    st.error(f"Could not read data from '{uploaded_file.name}'.")
            except Exception as e:
                # Per-file guard: a bad file should not abort the whole batch.
                st.error(f"Error processing {uploaded_file.name}: {str(e)}")
                continue
        if len(processed_files) > 1:
            # Only merge when every surviving frame has data.
            if not any(file_info["df"].empty for file_info in processed_files):
                st.write("### Merging DataFrames (Earnings as Master)")
                merged_df = merge_with_master(processed_files)
                if merged_df is not None and not merged_df.empty:
                    st.write("### Preview of Merged Data")
                    st.dataframe(safe_display_df(merged_df.head()))
                    try:
                        csv = merged_df.to_csv(index=False)
                        st.download_button(label="Download Merged CSV", data=csv, file_name="merged_data.csv", mime="text/csv")
                        st.write("### Dataset Statistics")
                        st.write(f"Total rows: {len(merged_df)}")
                        st.write(f"Total columns: {len(merged_df.columns)}")
                        st.write("### Data Quality Metrics")
                        # Per-column null counts/percentages for a quick quality read.
                        missing_df = pd.DataFrame({
                            "Column": merged_df.columns,
                            "Missing Values": merged_df.isnull().sum().values,
                            "Missing Percentage": (merged_df.isnull().sum().values / len(merged_df) * 100).round(2),
                        })
                        st.dataframe(missing_df)
                        duplicates = merged_df.duplicated().sum()
                        st.write(f"Number of duplicate rows: {duplicates}")
                    except Exception as e:
                        st.error(f"Error preparing download: {str(e)}")
                elif merged_df is not None and merged_df.empty:
                    st.warning("The merged DataFrame is empty. Please check the input files and merging keys.")
            else:
                st.warning("One or more of the processed DataFrames is empty. Merging cannot proceed meaningfully.")
        else:
            st.warning("Please upload at least 2 files to merge.")
if __name__ == "__main__":
main()