#!/usr/bin/env python3 """ CSV Utilities for robust loading of CSV files """ import pandas as pd import os import tempfile def is_excel_file(filepath): """ Check if a file is an Excel file based on its extension. Args: filepath: Path to the file Returns: True if file has Excel extension, False otherwise """ excel_extensions = ['.xls', '.xlsx', '.xlsm', '.xlsb', '.xltx', '.xltm'] ext = os.path.splitext(filepath)[1].lower() return ext in excel_extensions def convert_excel_to_csv(excel_filepath): """ Convert an Excel file to CSV format. Args: excel_filepath: Path to the Excel file Returns: Path to the converted CSV file (temporary file) """ print(f"Detected Excel file: {os.path.basename(excel_filepath)}") print("Converting to CSV format...") try: # Read the Excel file # Try reading with different engines for compatibility try: df = pd.read_excel(excel_filepath, engine='openpyxl') except: try: df = pd.read_excel(excel_filepath, engine='xlrd') except: df = pd.read_excel(excel_filepath) # Create a temporary CSV file temp_csv = tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) temp_csv_path = temp_csv.name temp_csv.close() # Save as CSV df.to_csv(temp_csv_path, index=False) print(f"Successfully converted to temporary CSV: {temp_csv_path}") return temp_csv_path except Exception as e: raise Exception(f"Failed to convert Excel file to CSV: {str(e)}") def robust_csv_loader(filepath, required_columns=None, max_skip_rows=3): """ Robustly load a CSV file by trying different skiprows values. Also handles Excel files by converting them to CSV first. Args: filepath: Path to the CSV or Excel file required_columns: List of column names that must be present max_skip_rows: Maximum number of rows to try skipping Returns: DataFrame if successful, raises exception if all attempts fail """ # Check if file is Excel format and convert if necessary original_filepath = filepath temp_csv_path = None if is_excel_file(filepath): temp_csv_path = convert_excel_to_csv(filepath) filepath = temp_csv_path try: # Convert required_columns to lowercase for case-insensitive matching if required_columns: required_columns = [col.lower() for col in required_columns] for skip_rows in range(max_skip_rows + 1): try: if skip_rows == 0: print(f"Trying to read {os.path.basename(original_filepath)} without skipping rows...") else: print(f"Trying to read {os.path.basename(original_filepath)} with skiprows={skip_rows}...") df = pd.read_csv(filepath, skiprows=skip_rows if skip_rows > 0 else None) # Print columns for debugging print(f"Columns found: {df.columns.tolist()}") # Check if required columns exist if required_columns: missing_cols = [col for col in required_columns if col not in [c.lower() for c in df.columns]] if missing_cols: raise ValueError(f"Missing required columns: {missing_cols}") print(f"Successfully loaded {os.path.basename(original_filepath)}") return df except Exception as e: if skip_rows == max_skip_rows: raise Exception(f"Failed to load {original_filepath} after trying {max_skip_rows + 1} skiprows values") from e continue finally: # Clean up temporary CSV file if it was created if temp_csv_path and os.path.exists(temp_csv_path): try: os.remove(temp_csv_path) print(f"Cleaned up temporary CSV file") except: pass def find_email_column(df): """Find the email column in the dataframe.""" for col in df.columns: if 'email' in col.lower(): return col return None def find_ipm_columns(df): """Find FY IPM columns in the dataframe.""" fy2425_col = None fy2324_col = None for col in df.columns: if 'FY24/25' in col and 'IPM' in col: fy2425_col = col if 'FY23/24' in col and 'IPM' in col: fy2324_col = col if not fy2425_col or not fy2324_col: print("\nWarning: Could not find IPM columns automatically") print("Available columns containing 'FY' and 'IPM':") for col in df.columns: if 'FY' in col and 'IPM' in col: print(f" - {col}") # Use the expected column names fy2425_col = 'FY24/25 全年IPM' fy2324_col = 'FY23/24 全年IPM' return fy2425_col, fy2324_col