# NOTE(review): the "Spaces:" header and duplicated "Build error" lines above
# were hosting-page residue, not Python source; converted to a comment so the
# file parses.
import streamlit as st
import pandas as pd
import numpy as np
import os
from io import BytesIO
import google.generativeai as genai
import json
import tempfile
import re

# Initialize the Google Gemini AI client at import time.
# NOTE(review): os.environ["GOOGLE_API_KEY"] raises KeyError immediately if the
# variable is unset — confirm this fail-fast-at-startup behavior is intended.
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
def clean_column_name(col_name):
    """
    Normalize a column name for merging: lowercase, strip punctuation,
    and collapse whitespace runs into single underscores.

    e.g. "Employee Name" -> "employee_name".

    Fix: non-string headers (ints, floats, etc.) previously bypassed
    cleaning entirely and were returned as bare str(); they are now
    stringified first and cleaned the same way as string headers.
    """
    if not isinstance(col_name, str):
        col_name = str(col_name)
    # Punctuation becomes a space so "Tax-ID (No.)" -> "tax id no" -> "tax_id_no".
    cleaned = re.sub(r"[^\w\s]", " ", col_name)
    return re.sub(r"\s+", "_", cleaned.strip().lower())
def clean_tin_value(val):
    """
    Normalize a TIN cell to a plain string.

    Whitespace is stripped, and values that round-tripped through pandas
    as floats (e.g. "12345.0") are collapsed back to their integer form
    ("12345"). Anything that cannot be parsed is returned unchanged.
    """
    text = str(val).strip()
    if not text.endswith('.0'):
        return text
    try:
        return str(int(float(text)))
    except Exception:
        # Not actually numeric (e.g. "abc.0") — leave it as-is.
        return text
def standardize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Standardize DataFrame column names and data types:
      - Drop any middle name columns.
      - Clean column names (e.g. "Employee Name" becomes "employee_name").
      - Rename synonyms (e.g., "Personal ID of Employee" to "tin").
      - If missing, construct an 'employee_name' column from first and last names.
      - Ensure 'tin' and 'employee_name' are strings and 'salary' is numeric.

    Fix: operate on a copy so the caller's DataFrame is never mutated
    (previously `df.columns = ...` rewrote the caller's headers in place
    whenever no middle-name column had been dropped first).
    """
    df = df.copy()

    # Drop columns containing 'middle_name' (case-insensitive).
    middle_name_cols = [col for col in df.columns if 'middle_name' in col.lower()]
    if middle_name_cols:
        df = df.drop(columns=middle_name_cols)

    # Clean all column names.
    df.columns = [clean_column_name(col) for col in df.columns]

    # Rename synonyms for TIN and salary. A salary-keyword match deliberately
    # overrides an earlier tin match on the same column (last write wins).
    rename_map = {}
    for col in df.columns:
        if col in ['personal id', 'personal_id', 'tax id', 'taxid'] or "personal_id_of_employee" in col:
            rename_map[col] = 'tin'
        elif 'tin' in col:
            # NOTE(review): substring match — would also hit e.g. 'routing_no';
            # confirm against the real column universe.
            rename_map[col] = 'tin'
        if any(keyword in col for keyword in ['salary', 'wage', 'earning', 'commission', 'fee', 'payment', 'compensation']):
            rename_map[col] = 'salary'
    if rename_map:
        df = df.rename(columns=rename_map)

    # If several source columns collapsed onto one name, keep the first
    # non-null value per row (bfill across the duplicates), then drop the
    # duplicate columns.
    for key in ('salary', 'tin'):
        if key in df.columns and list(df.columns).count(key) > 1:
            dup_cols = [col for col in df.columns if col == key]
            df[key] = df[dup_cols].bfill(axis=1).iloc[:, 0]
            df = df.loc[:, ~df.columns.duplicated()]

    # Construct employee_name if missing.
    if 'employee_name' not in df.columns and 'first_name' in df.columns and 'last_name' in df.columns:
        df['employee_name'] = df['first_name'].astype(str).str.strip() + ' ' + df['last_name'].astype(str).str.strip()

    # Ensure proper types for key columns.
    if 'salary' in df.columns:
        df['salary'] = pd.to_numeric(df['salary'], errors='coerce')
    if 'tin' in df.columns:
        df['tin'] = df['tin'].fillna('').astype(str).apply(clean_tin_value)
    if 'employee_name' in df.columns:
        df['employee_name'] = df['employee_name'].fillna('').astype(str).str.strip()
    return df
def analyze_columns(df: pd.DataFrame, filename: str) -> dict:
    """
    Use Gemini AI to analyze DataFrame columns and suggest key columns and renames.

    Sends the first 5 rows as CSV to the model and expects a JSON object back.
    Always returns a dict: on any failure a fallback dict with an "issues"
    list is returned, so callers never need to handle exceptions.
    """
    try:
        # Stringify the sample so to_csv never chokes on exotic dtypes.
        display_df = df.head(5).copy()
        for col in display_df.columns:
            display_df[col] = display_df[col].astype(str)
        sample_csv = display_df.to_csv(index=False)
        # Fix: the `filename` parameter was unused and the prompt carried a
        # literal "(unknown)"; feed the real filename to the model.
        prompt = f"""
Analyze this CSV data, which may represent an employee earnings schedule, PAYE figures, or template info for payroll processing.
Filename: {filename}
Sample data (first 5 rows):
{sample_csv}
Identify potential key columns for merging and suggest renames.
Respond with a valid JSON object.
"""
        response = model.generate_content(prompt)
        response_text = response.text.strip()
        # Strip a Markdown code fence if present. Fix: the old slicing
        # (text[7:-3]) unconditionally chopped the last 3 characters of the
        # JSON whenever the model omitted the closing ``` fence.
        if response_text.startswith("```"):
            first_newline = response_text.find("\n")
            response_text = response_text[first_newline + 1:] if first_newline != -1 else ""
            if response_text.rstrip().endswith("```"):
                response_text = response_text.rstrip()[:-3]
        response_text = response_text.strip()
        try:
            analysis = json.loads(response_text)
            return analysis
        except json.JSONDecodeError as je:
            st.error(f"JSON parsing error: {str(je)}")
            st.text("Raw response:")
            st.text(response_text)
            return {"subject": "Error parsing analysis", "columns": [], "key_columns": [], "issues": ["Error analyzing columns"], "suggested_renames": {}}
    except Exception as e:
        st.error(f"Error in column analysis: {str(e)}")
        return {"subject": "Error in analysis", "columns": [], "key_columns": [], "issues": [str(e)], "suggested_renames": {}}
def read_excel_file(file) -> pd.DataFrame:
    """
    Read an Excel upload with a two-engine fallback.

    Tries openpyxl (modern .xlsx) first, then falls back to xlrd
    (legacy .xls). Returns None — after reporting via st.error — when
    both engines fail.
    """
    try:
        return pd.read_excel(file, engine="openpyxl")
    except Exception:
        # Fix: the failed first attempt may have consumed the stream;
        # rewind before retrying or xlrd would see a truncated/empty file.
        try:
            file.seek(0)
        except Exception:
            pass  # not seekable (e.g. a plain path) — retry as-is
        try:
            return pd.read_excel(file, engine="xlrd")
        except Exception as e2:
            st.error(f"Failed to read Excel file: {str(e2)}")
            return None
def merge_with_master(processed_files):
    """
    Merge DataFrames in two steps:
      1. Use the earnings file as master (dropping its inaccurate 'tin').
      2. Merge the template file (which supplies the trusted TIN via its first
         column) with the earnings data using 'employee_name'.
      3. Finally, merge the combined data with the PAYE file using 'tin'.

    Each entry of `processed_files` is a dict with keys "filename" and "df".
    Returns the (possibly partially) merged DataFrame; progress and problems
    are reported through Streamlit (st.write / st.warning / st.error).
    """
    earnings_file = None
    paye_file = None
    template_file = None
    # Identify files by filename keywords. If a filename matches several
    # keywords, the first matching branch wins (earnings > paye > template).
    for file_info in processed_files:
        lower_filename = file_info["filename"].lower()
        if "earnings" in lower_filename:
            earnings_file = file_info
        elif "paye" in lower_filename:
            paye_file = file_info
        elif "template" in lower_filename:
            template_file = file_info
    if not earnings_file:
        st.warning("No earnings file found as master. Using the first file as master.")
        earnings_file = processed_files[0]
    # Process earnings file: drop its inaccurate TIN column so only the
    # template-supplied TIN survives the merge.
    earnings_df = earnings_file["df"]
    if 'tin' in earnings_df.columns:
        earnings_df = earnings_df.drop(columns=['tin'])
    if 'middle_name' in earnings_df.columns:
        earnings_df = earnings_df.drop(columns=['middle_name'])
    merged_df = earnings_df.copy()
    # Process and merge the template file using employee_name as the key.
    if template_file is not None:
        st.write(f"Merging template info from '{template_file['filename']}' using key 'employee_name'.")
        template_df = template_file["df"].copy()
        # Force the first column (Personal ID of Employee) to be 'tin' —
        # the template's first column is treated as the trusted TIN source.
        if not template_df.empty:
            cols = list(template_df.columns)
            cols[0] = "tin"
            template_df.columns = cols
            if 'middle_name' in template_df.columns:
                template_df = template_df.drop(columns=['middle_name'])
            # If employee_name is not present, construct it from first_name and last_name.
            if 'employee_name' not in template_df.columns and 'first_name' in template_df.columns and 'last_name' in template_df.columns:
                template_df['employee_name'] = template_df['first_name'].astype(str).str.strip() + ' ' + template_df['last_name'].astype(str).str.strip()
            if 'employee_name' in merged_df.columns and 'employee_name' in template_df.columns:
                # Left merge: keep every earnings row even without a template match.
                merged_df = merged_df.merge(template_df, on='employee_name', how='left', suffixes=('', '_template'))
            else:
                st.warning("Column 'employee_name' missing in either earnings or template file. Skipping template merge.")
    else:
        st.warning("No template file detected. Cannot proceed without a trusted TIN.")
    # Check for a trusted 'tin' column after merging earnings and template.
    if 'tin' not in merged_df.columns or merged_df['tin'].isnull().all():
        st.error("No trusted 'tin' column found in the merged earnings-template data. Aborting further merge. "
                 "Ensure the template file's first column (Personal ID of Employee) is correctly populated.")
        # Return the partial merge rather than raising; callers display whatever we have.
        return merged_df
    # Merge PAYE file using the trusted 'tin'.
    if paye_file is not None:
        st.write(f"Merging PAYE figures from '{paye_file['filename']}' using key 'tin'.")
        paye_df = paye_file["df"]
        if 'tin' in paye_df.columns:
            merged_df = merged_df.merge(paye_df, on='tin', how='left', suffixes=('', '_paye'))
        else:
            st.warning("Column 'tin' missing in the PAYE file. Skipping PAYE merge.")
    else:
        st.warning("No PAYE file detected.")
    return merged_df
def safe_display_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of *df* where every cell is a string and the textual
    null placeholders "nan"/"None" are blanked out, so Streamlit tables
    render cleanly regardless of the underlying dtypes.
    """
    as_text = df.astype(str)
    return as_text.replace({"nan": "", "None": ""})
def main():
    """
    Streamlit entry point: upload CSV/Excel files, standardize and
    AI-analyze each one, then (with 2+ files) merge them with the
    earnings file as master and offer the result for download.
    """
    st.title("Smart CSV Processor")
    st.write("Upload CSV or Excel files for intelligent analysis and merging.")
    uploaded_files = st.file_uploader("Choose files", accept_multiple_files=True, type=["csv", "xlsx", "xls"])
    if uploaded_files:
        st.write("### Processing Files")
        processed_files = []
        for uploaded_file in uploaded_files:
            st.write(f"#### Analyzing: {uploaded_file.name}")
            try:
                # Excel goes through the dual-engine reader; everything else is CSV.
                if uploaded_file.name.endswith((".xlsx", ".xls")):
                    df = read_excel_file(uploaded_file)
                else:
                    df = pd.read_csv(uploaded_file)
                if df is not None:
                    if df.empty:
                        st.warning(f"DataFrame from '{uploaded_file.name}' is empty. Please check the file.")
                        continue
                    df = standardize_dataframe(df)
                    st.write("Initial Preview:")
                    st.dataframe(df.head())
                    with st.spinner("Analyzing columns..."):
                        analysis = analyze_columns(df, uploaded_file.name)
                    if analysis:
                        st.write("Column Analysis:")
                        st.json(analysis)
                        # Apply the model's rename suggestions on top of standardization.
                        if 'suggested_renames' in analysis:
                            df = df.rename(columns=analysis['suggested_renames'])
                    processed_files.append({"filename": uploaded_file.name, "df": df, "analysis": analysis})
                else:
                    st.error(f"Could not read data from '{uploaded_file.name}'.")
            except Exception as e:
                # Per-file failures are reported but do not stop the other files.
                st.error(f"Error processing {uploaded_file.name}: {str(e)}")
                continue
        if len(processed_files) > 1:
            if not any(file_info["df"].empty for file_info in processed_files):
                st.write("### Merging DataFrames (Earnings as Master)")
                merged_df = merge_with_master(processed_files)
                if merged_df is not None and not merged_df.empty:
                    st.write("### Preview of Merged Data")
                    st.dataframe(safe_display_df(merged_df.head()))
                    try:
                        csv = merged_df.to_csv(index=False)
                        st.download_button(label="Download Merged CSV", data=csv, file_name="merged_data.csv", mime="text/csv")
                        st.write("### Dataset Statistics")
                        st.write(f"Total rows: {len(merged_df)}")
                        st.write(f"Total columns: {len(merged_df.columns)}")
                        st.write("### Data Quality Metrics")
                        # Per-column missing-value counts and percentages.
                        missing_df = pd.DataFrame({
                            "Column": merged_df.columns,
                            "Missing Values": merged_df.isnull().sum().values,
                            "Missing Percentage": (merged_df.isnull().sum().values / len(merged_df) * 100).round(2),
                        })
                        st.dataframe(missing_df)
                        duplicates = merged_df.duplicated().sum()
                        st.write(f"Number of duplicate rows: {duplicates}")
                    except Exception as e:
                        st.error(f"Error preparing download: {str(e)}")
                elif merged_df is not None and merged_df.empty:
                    st.warning("The merged DataFrame is empty. Please check the input files and merging keys.")
            else:
                st.warning("One or more of the processed DataFrames is empty. Merging cannot proceed meaningfully.")
        else:
            st.warning("Please upload at least 2 files to merge.")

if __name__ == "__main__":
    main()