kpi_analysis / csv_utils.py
zh3036's picture
Deploy KPI snapshot 2025-06-12
4e67a93
#!/usr/bin/env python3
"""
CSV Utilities for robust loading of CSV files
"""
import pandas as pd
import os
import tempfile
def is_excel_file(filepath):
    """
    Tell whether a path names an Excel workbook, judging by extension only.

    The file is never opened; only the final extension of the path is
    inspected, and the comparison ignores case.

    Args:
        filepath: Path to the file

    Returns:
        True if the extension is one of the known Excel extensions,
        False otherwise
    """
    _, suffix = os.path.splitext(filepath)
    return suffix.lower() in ['.xls', '.xlsx', '.xlsm', '.xlsb', '.xltx', '.xltm']
def convert_excel_to_csv(excel_filepath):
    """
    Convert an Excel file to CSV format.

    The workbook is read with pandas, trying the 'openpyxl' engine first,
    then 'xlrd', then pandas' own engine auto-detection. The resulting
    DataFrame is written (without the index) to a new temporary ``.csv``
    file that is NOT deleted automatically; the caller owns it and must
    remove it when done.

    Args:
        excel_filepath: Path to the Excel file

    Returns:
        Path to the converted CSV file (temporary file)

    Raises:
        Exception: if the file cannot be read by any engine or the CSV
            cannot be written. The underlying error is attached as
            ``__cause__``, and any temporary file already created is removed.
    """
    print(f"Detected Excel file: {os.path.basename(excel_filepath)}")
    print("Converting to CSV format...")

    temp_csv_path = None
    try:
        # Try reading with different engines for compatibility.
        # engine=None lets pandas pick the engine itself (its default).
        read_error = None
        for engine in ('openpyxl', 'xlrd', None):
            try:
                df = pd.read_excel(excel_filepath, engine=engine)
                break
            except Exception as err:  # not bare: let Ctrl-C / SystemExit through
                read_error = err
        else:
            # Every engine failed; surface the error from the last attempt.
            raise read_error

        # Reserve a temporary path; the handle is closed immediately so
        # pandas can reopen the file by name (required on Windows).
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as temp_csv:
            temp_csv_path = temp_csv.name

        # Save as CSV
        df.to_csv(temp_csv_path, index=False)
        print(f"Successfully converted to temporary CSV: {temp_csv_path}")
        return temp_csv_path
    except Exception as e:
        # Do not leak the temporary file when the conversion fails midway.
        if temp_csv_path is not None:
            try:
                os.remove(temp_csv_path)
            except OSError:
                pass
        raise Exception(f"Failed to convert Excel file to CSV: {str(e)}") from e
def robust_csv_loader(filepath, required_columns=None, max_skip_rows=3):
    """
    Robustly load a CSV file by trying different skiprows values.

    Also handles Excel files (detected by extension) by converting them to
    a temporary CSV first; that temporary file is removed before this
    function returns or raises.

    Args:
        filepath: Path to the CSV or Excel file
        required_columns: List of column names that must be present
            (matched case-insensitively). None or an empty list disables
            the check.
        max_skip_rows: Maximum number of leading rows to try skipping;
            attempts are made with skiprows = 0, 1, ..., max_skip_rows.

    Returns:
        DataFrame from the first attempt that parses and contains every
        required column

    Raises:
        ValueError: if max_skip_rows is negative (no attempt could be made)
        Exception: if all attempts fail; the error from the last attempt
            is attached as ``__cause__``
    """
    # With a negative value the attempt loop would never run and the
    # function would silently return None; reject it explicitly instead.
    if max_skip_rows < 0:
        raise ValueError(f"max_skip_rows must be >= 0, got {max_skip_rows}")

    # Check if file is Excel format and convert if necessary
    original_filepath = filepath
    temp_csv_path = None
    if is_excel_file(filepath):
        temp_csv_path = convert_excel_to_csv(filepath)
        filepath = temp_csv_path

    try:
        # Convert required_columns to lowercase for case-insensitive matching
        if required_columns:
            required_columns = [col.lower() for col in required_columns]

        for skip_rows in range(max_skip_rows + 1):
            try:
                if skip_rows == 0:
                    print(f"Trying to read {os.path.basename(original_filepath)} without skipping rows...")
                else:
                    print(f"Trying to read {os.path.basename(original_filepath)} with skiprows={skip_rows}...")

                df = pd.read_csv(filepath, skiprows=skip_rows if skip_rows > 0 else None)

                # Print columns for debugging
                print(f"Columns found: {df.columns.tolist()}")

                # Check if required columns exist
                if required_columns:
                    # Build the lowercase lookup once per attempt, not once
                    # per required column.
                    present = {str(c).lower() for c in df.columns}
                    missing_cols = [col for col in required_columns if col not in present]
                    if missing_cols:
                        raise ValueError(f"Missing required columns: {missing_cols}")

                print(f"Successfully loaded {os.path.basename(original_filepath)}")
                return df
            except Exception as e:
                # A failed attempt (parse error or missing columns) just
                # moves on to the next skiprows value; only the last one
                # is reported to the caller.
                if skip_rows == max_skip_rows:
                    raise Exception(f"Failed to load {original_filepath} after trying {max_skip_rows + 1} skiprows values") from e
    finally:
        # Clean up temporary CSV file if it was created
        if temp_csv_path and os.path.exists(temp_csv_path):
            try:
                os.remove(temp_csv_path)
                print("Cleaned up temporary CSV file")
            except OSError:
                # Best-effort cleanup: a leftover temp file must not mask
                # the real result or error.
                pass
def find_email_column(df):
    """
    Find the email column in the dataframe.

    Args:
        df: DataFrame whose column labels are searched

    Returns:
        The first column label whose text contains 'email'
        (case-insensitive), or None if no column matches
    """
    for col in df.columns:
        # Column labels are not guaranteed to be strings (e.g. integer
        # headers), so compare against their text form; the original
        # label is what gets returned.
        if 'email' in str(col).lower():
            return col
    return None
def find_ipm_columns(df):
    """
    Find FY IPM columns in the dataframe.

    A column matches a fiscal year when its label contains both the year
    tag ('FY24/25' or 'FY23/24') and 'IPM'. If several columns match the
    same year, the last one wins.

    Args:
        df: DataFrame whose column labels are searched

    Returns:
        Tuple ``(fy2425_col, fy2324_col)``. For any year that could not be
        detected, a warning is printed and the expected default name
        ('FY24/25 全年IPM' / 'FY23/24 全年IPM') is returned in its place; a
        column that WAS detected is kept as found.
    """
    fy2425_col = None
    fy2324_col = None

    for col in df.columns:
        # Labels may be non-strings (e.g. integer headers); match on text.
        label = str(col)
        if 'IPM' not in label:
            continue
        if 'FY24/25' in label:
            fy2425_col = col
        if 'FY23/24' in label:
            fy2324_col = col

    if fy2425_col is None or fy2324_col is None:
        print("\nWarning: Could not find IPM columns automatically")
        print("Available columns containing 'FY' and 'IPM':")
        for col in df.columns:
            label = str(col)
            if 'FY' in label and 'IPM' in label:
                print(f" - {col}")

        # Use the expected column names, but only for the year(s) that were
        # not detected: overwriting a detected column with a guessed name
        # would throw away a valid match.
        if fy2425_col is None:
            fy2425_col = 'FY24/25 全年IPM'
        if fy2324_col is None:
            fy2324_col = 'FY23/24 全年IPM'

    return fy2425_col, fy2324_col