Spaces:
Sleeping
Sleeping
File size: 5,173 Bytes
4e67a93 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | #!/usr/bin/env python3
"""
CSV Utilities for robust loading of CSV files
"""
import pandas as pd
import os
import tempfile
def is_excel_file(filepath):
"""
Check if a file is an Excel file based on its extension.
Args:
filepath: Path to the file
Returns:
True if file has Excel extension, False otherwise
"""
excel_extensions = ['.xls', '.xlsx', '.xlsm', '.xlsb', '.xltx', '.xltm']
ext = os.path.splitext(filepath)[1].lower()
return ext in excel_extensions
def convert_excel_to_csv(excel_filepath):
"""
Convert an Excel file to CSV format.
Args:
excel_filepath: Path to the Excel file
Returns:
Path to the converted CSV file (temporary file)
"""
print(f"Detected Excel file: {os.path.basename(excel_filepath)}")
print("Converting to CSV format...")
try:
# Read the Excel file
# Try reading with different engines for compatibility
try:
df = pd.read_excel(excel_filepath, engine='openpyxl')
except:
try:
df = pd.read_excel(excel_filepath, engine='xlrd')
except:
df = pd.read_excel(excel_filepath)
# Create a temporary CSV file
temp_csv = tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False)
temp_csv_path = temp_csv.name
temp_csv.close()
# Save as CSV
df.to_csv(temp_csv_path, index=False)
print(f"Successfully converted to temporary CSV: {temp_csv_path}")
return temp_csv_path
except Exception as e:
raise Exception(f"Failed to convert Excel file to CSV: {str(e)}")
def robust_csv_loader(filepath, required_columns=None, max_skip_rows=3):
"""
Robustly load a CSV file by trying different skiprows values.
Also handles Excel files by converting them to CSV first.
Args:
filepath: Path to the CSV or Excel file
required_columns: List of column names that must be present
max_skip_rows: Maximum number of rows to try skipping
Returns:
DataFrame if successful, raises exception if all attempts fail
"""
# Check if file is Excel format and convert if necessary
original_filepath = filepath
temp_csv_path = None
if is_excel_file(filepath):
temp_csv_path = convert_excel_to_csv(filepath)
filepath = temp_csv_path
try:
# Convert required_columns to lowercase for case-insensitive matching
if required_columns:
required_columns = [col.lower() for col in required_columns]
for skip_rows in range(max_skip_rows + 1):
try:
if skip_rows == 0:
print(f"Trying to read {os.path.basename(original_filepath)} without skipping rows...")
else:
print(f"Trying to read {os.path.basename(original_filepath)} with skiprows={skip_rows}...")
df = pd.read_csv(filepath, skiprows=skip_rows if skip_rows > 0 else None)
# Print columns for debugging
print(f"Columns found: {df.columns.tolist()}")
# Check if required columns exist
if required_columns:
missing_cols = [col for col in required_columns if col not in [c.lower() for c in df.columns]]
if missing_cols:
raise ValueError(f"Missing required columns: {missing_cols}")
print(f"Successfully loaded {os.path.basename(original_filepath)}")
return df
except Exception as e:
if skip_rows == max_skip_rows:
raise Exception(f"Failed to load {original_filepath} after trying {max_skip_rows + 1} skiprows values") from e
continue
finally:
# Clean up temporary CSV file if it was created
if temp_csv_path and os.path.exists(temp_csv_path):
try:
os.remove(temp_csv_path)
print(f"Cleaned up temporary CSV file")
except:
pass
def find_email_column(df):
"""Find the email column in the dataframe."""
for col in df.columns:
if 'email' in col.lower():
return col
return None
def find_ipm_columns(df):
"""Find FY IPM columns in the dataframe."""
fy2425_col = None
fy2324_col = None
for col in df.columns:
if 'FY24/25' in col and 'IPM' in col:
fy2425_col = col
if 'FY23/24' in col and 'IPM' in col:
fy2324_col = col
if not fy2425_col or not fy2324_col:
print("\nWarning: Could not find IPM columns automatically")
print("Available columns containing 'FY' and 'IPM':")
for col in df.columns:
if 'FY' in col and 'IPM' in col:
print(f" - {col}")
# Use the expected column names
fy2425_col = 'FY24/25 全年IPM'
fy2324_col = 'FY23/24 全年IPM'
return fy2425_col, fy2324_col |