import streamlit as st
import pandas as pd
import numpy as np
import os
from io import BytesIO
import google.generativeai as genai
import json
import tempfile
import re

# Initialize the Google Gemini AI client.
# NOTE(review): raises KeyError at import time if GOOGLE_API_KEY is unset —
# fail-fast on misconfiguration; confirm this is the desired startup behavior.
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')


def clean_column_name(col_name):
    """
    Clean a column name: lowercase, replace punctuation with spaces, then
    collapse whitespace runs into single underscores.

    Non-string input is stringified and returned unchanged otherwise.
    """
    if not isinstance(col_name, str):
        return str(col_name)
    cleaned = re.sub(r"[^\w\s]", " ", col_name)
    return re.sub(r"\s+", "_", cleaned.strip().lower())


def clean_tin_value(val):
    """
    Clean a TIN value: strip whitespace and, if it looks like a float-rendered
    integer (ends with '.0', as pandas produces for numeric columns with NaNs),
    convert it back to a plain integer string.
    """
    val_str = str(val).strip()
    if val_str.endswith('.0'):
        try:
            return str(int(float(val_str)))
        except Exception:
            # Not actually numeric (e.g. 'N/A.0'); keep the raw text.
            return val_str
    return val_str


# Whole-word match for 'tin' within an underscore-separated column name.
# Avoids the false positives a plain substring test produces ('routing',
# 'destination', 'printing' all contain 'tin').
_TIN_WORD_RE = re.compile(r'(^|_)tin($|_)')

# Column-name keywords that indicate a salary/earnings column.
_SALARY_KEYWORDS = ('salary', 'wage', 'earning', 'commission', 'fee',
                    'payment', 'compensation')


def _collapse_duplicate_column(df, name):
    """
    Collapse duplicate columns named `name` into a single column holding the
    first non-null value per row, then drop the extra duplicates.
    """
    if list(df.columns).count(name) > 1:
        dup_block = df.loc[:, df.columns == name]
        # Assigning to a duplicated label sets every duplicate; the dedup
        # below then keeps only the first occurrence.
        df[name] = dup_block.bfill(axis=1).iloc[:, 0]
        df = df.loc[:, ~df.columns.duplicated()]
    return df


def standardize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Standardize DataFrame column names and data types:
    - Drop any middle name columns.
    - Clean column names (e.g. "Employee Name" becomes "employee_name").
    - Rename synonyms (e.g., "Personal ID of Employee" to "tin").
    - If missing, construct an 'employee_name' column from first and last names.
    - Ensure key columns (tin and employee_name) are strings.

    Returns a new DataFrame; the caller's frame is never mutated.
    """
    # Work on a copy: the original code mutated the caller's frame in place
    # via `df.columns = ...` whenever no middle-name columns were dropped.
    df = df.copy()

    # Drop columns containing 'middle_name'
    middle_name_cols = [col for col in df.columns if 'middle_name' in col.lower()]
    if middle_name_cols:
        df = df.drop(columns=middle_name_cols)

    # Clean all column names
    df.columns = [clean_column_name(col) for col in df.columns]

    # Rename synonyms for TIN and salary.
    rename_map = {}
    for col in df.columns:
        if col in ('personal_id', 'taxid', 'tax_id') or 'personal_id_of_employee' in col:
            rename_map[col] = 'tin'
        elif _TIN_WORD_RE.search(col):
            rename_map[col] = 'tin'
        elif any(keyword in col for keyword in _SALARY_KEYWORDS):
            # 'elif' (not a separate 'if') so a TIN column whose name also
            # contains a salary keyword keeps its 'tin' mapping instead of
            # being silently clobbered to 'salary'.
            rename_map[col] = 'salary'
    if rename_map:
        df = df.rename(columns=rename_map)

    # Combine duplicate columns if the renames produced any.
    df = _collapse_duplicate_column(df, 'salary')
    df = _collapse_duplicate_column(df, 'tin')

    # Construct employee_name if missing
    if 'employee_name' not in df.columns and 'first_name' in df.columns and 'last_name' in df.columns:
        df['employee_name'] = (df['first_name'].astype(str).str.strip()
                               + ' '
                               + df['last_name'].astype(str).str.strip())

    # Ensure proper types for key columns
    if 'salary' in df.columns:
        df['salary'] = pd.to_numeric(df['salary'], errors='coerce')
    if 'tin' in df.columns:
        df['tin'] = df['tin'].fillna('').astype(str).apply(clean_tin_value)
    if 'employee_name' in df.columns:
        df['employee_name'] = df['employee_name'].fillna('').astype(str).str.strip()

    return df


def analyze_columns(df: pd.DataFrame, filename: str) -> dict:
    """
    Use Gemini AI to analyze DataFrame columns and suggest key columns and
    renames.

    Returns a dict parsed from the model's JSON response; on any failure a
    placeholder analysis dict with the same keys is returned instead of
    raising, so the caller can always render it.
    """
    try:
        display_df = df.head(5).copy()
        for col in display_df.columns:
            display_df[col] = display_df[col].astype(str)
        sample_csv = display_df.to_csv(index=False)

        # Fix: the original prompt never interpolated `filename`, leaving the
        # parameter unused and giving the model no file context.
        prompt = f"""
Analyze this CSV data, which may represent an employee earnings schedule, PAYE figures, or template info for payroll processing.
Filename: {filename}
Sample data (first 5 rows):
{sample_csv}
Identify potential key columns for merging and suggest renames.
Respond with a valid JSON object.
"""
        response = model.generate_content(prompt)
        response_text = response.text.strip()

        # Strip optional markdown code fences around the JSON payload.
        # More robust than fixed-offset slicing, which assumed a trailing
        # ``` was always present.
        fence = re.match(r"^```(?:json)?\s*(.*?)\s*```$", response_text, re.DOTALL)
        if fence:
            response_text = fence.group(1).strip()

        try:
            return json.loads(response_text)
        except json.JSONDecodeError as je:
            st.error(f"JSON parsing error: {str(je)}")
            st.text("Raw response:")
            st.text(response_text)
            return {"subject": "Error parsing analysis", "columns": [],
                    "key_columns": [], "issues": ["Error analyzing columns"],
                    "suggested_renames": {}}
    except Exception as e:
        st.error(f"Error in column analysis: {str(e)}")
        return {"subject": "Error in analysis", "columns": [],
                "key_columns": [], "issues": [str(e)],
                "suggested_renames": {}}


def read_excel_file(file) -> pd.DataFrame:
    """
    Read an Excel file, trying openpyxl first and falling back to xlrd.

    Returns None (after surfacing an error in the UI) if both engines fail.
    """
    try:
        return pd.read_excel(file, engine="openpyxl")
    except Exception as openpyxl_err:
        try:
            # Fix: the failed first attempt may have consumed part of the
            # upload stream; rewind before retrying or xlrd sees a
            # truncated file.
            if hasattr(file, "seek"):
                file.seek(0)
            return pd.read_excel(file, engine="xlrd")
        except Exception as xlrd_err:
            # Report both failures so the openpyxl error is not lost.
            st.error(f"Failed to read Excel file: {str(xlrd_err)} "
                     f"(openpyxl also failed: {str(openpyxl_err)})")
            return None


def merge_with_master(processed_files):
    """
    Merge DataFrames in steps:
    1. Use the earnings file as master (dropping its inaccurate 'tin').
    2. Merge the template file (which supplies the trusted TIN via its first
       column) with the earnings data using 'employee_name'.
    3. Finally, merge the combined data with the PAYE file using 'tin'.

    Returns the merged DataFrame, or None if no input files were supplied.
    """
    # Guard: the original indexed processed_files[0] unconditionally.
    if not processed_files:
        st.error("No processed files available to merge.")
        return None

    earnings_file = None
    paye_file = None
    template_file = None

    # Identify files by filename keywords
    for file_info in processed_files:
        lower_filename = file_info["filename"].lower()
        if "earnings" in lower_filename:
            earnings_file = file_info
        elif "paye" in lower_filename:
            paye_file = file_info
        elif "template" in lower_filename:
            template_file = file_info

    if not earnings_file:
        st.warning("No earnings file found as master. Using the first file as master.")
        earnings_file = processed_files[0]

    # Process earnings file: drop its inaccurate TIN column
    earnings_df = earnings_file["df"]
    if 'tin' in earnings_df.columns:
        earnings_df = earnings_df.drop(columns=['tin'])
    if 'middle_name' in earnings_df.columns:
        earnings_df = earnings_df.drop(columns=['middle_name'])
    merged_df = earnings_df.copy()

    # Process and merge the template file using employee_name
    if template_file is not None:
        st.write(f"Merging template info from '{template_file['filename']}' using key 'employee_name'.")
        template_df = template_file["df"].copy()
        # Force the first column (Personal ID of Employee) to be 'tin'
        if not template_df.empty:
            cols = list(template_df.columns)
            cols[0] = "tin"
            template_df.columns = cols
        if 'middle_name' in template_df.columns:
            template_df = template_df.drop(columns=['middle_name'])
        # If employee_name is not present, construct it from first/last names
        if ('employee_name' not in template_df.columns
                and 'first_name' in template_df.columns
                and 'last_name' in template_df.columns):
            template_df['employee_name'] = (template_df['first_name'].astype(str).str.strip()
                                            + ' '
                                            + template_df['last_name'].astype(str).str.strip())
        if 'employee_name' in merged_df.columns and 'employee_name' in template_df.columns:
            merged_df = merged_df.merge(template_df, on='employee_name',
                                        how='left', suffixes=('', '_template'))
        else:
            st.warning("Column 'employee_name' missing in either earnings or template file. Skipping template merge.")
    else:
        st.warning("No template file detected. Cannot proceed without a trusted TIN.")

    # Check for a trusted 'tin' column after merging earnings and template
    if 'tin' not in merged_df.columns or merged_df['tin'].isnull().all():
        st.error("No trusted 'tin' column found in the merged earnings-template data. Aborting further merge. "
                 "Ensure the template file's first column (Personal ID of Employee) is correctly populated.")
        return merged_df

    # Merge PAYE file using the trusted 'tin'
    if paye_file is not None:
        st.write(f"Merging PAYE figures from '{paye_file['filename']}' using key 'tin'.")
        paye_df = paye_file["df"]
        if 'tin' in paye_df.columns:
            merged_df = merged_df.merge(paye_df, on='tin',
                                        how='left', suffixes=('', '_paye'))
        else:
            st.warning("Column 'tin' missing in the PAYE file. Skipping PAYE merge.")
    else:
        st.warning("No PAYE file detected.")

    return merged_df


def safe_display_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert all entries to strings and blank out common null placeholders so
    Streamlit can render mixed-type frames without serialization errors.
    """
    return df.astype(str).replace({"nan": "", "None": ""})


def main():
    """Streamlit entry point: upload, analyze, standardize, and merge files."""
    st.title("Smart CSV Processor")
    st.write("Upload CSV or Excel files for intelligent analysis and merging.")

    uploaded_files = st.file_uploader("Choose files", accept_multiple_files=True,
                                      type=["csv", "xlsx", "xls"])
    if not uploaded_files:
        return

    st.write("### Processing Files")
    processed_files = []

    for uploaded_file in uploaded_files:
        st.write(f"#### Analyzing: {uploaded_file.name}")
        try:
            if uploaded_file.name.endswith((".xlsx", ".xls")):
                df = read_excel_file(uploaded_file)
            else:
                df = pd.read_csv(uploaded_file)

            if df is None:
                st.error(f"Could not read data from '{uploaded_file.name}'.")
                continue
            if df.empty:
                st.warning(f"DataFrame from '{uploaded_file.name}' is empty. Please check the file.")
                continue

            df = standardize_dataframe(df)
            st.write("Initial Preview:")
            st.dataframe(df.head())

            with st.spinner("Analyzing columns..."):
                analysis = analyze_columns(df, uploaded_file.name)
            if analysis:
                st.write("Column Analysis:")
                st.json(analysis)
                if 'suggested_renames' in analysis:
                    df = df.rename(columns=analysis['suggested_renames'])
            processed_files.append({"filename": uploaded_file.name,
                                    "df": df,
                                    "analysis": analysis})
        except Exception as e:
            st.error(f"Error processing {uploaded_file.name}: {str(e)}")
            continue

    if len(processed_files) <= 1:
        st.warning("Please upload at least 2 files to merge.")
        return

    if any(file_info["df"].empty for file_info in processed_files):
        st.warning("One or more of the processed DataFrames is empty. Merging cannot proceed meaningfully.")
        return

    st.write("### Merging DataFrames (Earnings as Master)")
    merged_df = merge_with_master(processed_files)

    if merged_df is None:
        return
    if merged_df.empty:
        st.warning("The merged DataFrame is empty. Please check the input files and merging keys.")
        return

    st.write("### Preview of Merged Data")
    st.dataframe(safe_display_df(merged_df.head()))
    try:
        csv = merged_df.to_csv(index=False)
        st.download_button(label="Download Merged CSV", data=csv,
                           file_name="merged_data.csv", mime="text/csv")

        st.write("### Dataset Statistics")
        st.write(f"Total rows: {len(merged_df)}")
        st.write(f"Total columns: {len(merged_df.columns)}")

        st.write("### Data Quality Metrics")
        missing_df = pd.DataFrame({
            "Column": merged_df.columns,
            "Missing Values": merged_df.isnull().sum().values,
            "Missing Percentage": (merged_df.isnull().sum().values / len(merged_df) * 100).round(2),
        })
        st.dataframe(missing_df)

        duplicates = merged_df.duplicated().sum()
        st.write(f"Number of duplicate rows: {duplicates}")
    except Exception as e:
        st.error(f"Error preparing download: {str(e)}")


if __name__ == "__main__":
    main()