from sklearn.impute import SimpleImputer import pandas as pd import numpy as np import json def data_quality(df: pd.DataFrame): print("Missing values before handling:") print(df.isnull().sum()) print("Duplicate rows before handling:") print(int(df.duplicated().sum())) df.drop_duplicates(inplace=True) print("Duplicate rows after handling:") print(int(df.duplicated().sum())) return df def standardize_data_types(df: pd.DataFrame) -> pd.DataFrame: for col in df.columns: if df[col].isin([True, False]).all(): continue # already boolean # Handle boolean strings if df[col].dropna().astype(str).isin(["TRUE", "FALSE", "true", "false"]).all(): df[col] = df[col].map({ "TRUE": True, "FALSE": False, "true": True, "false": False }) continue # Try to parse as datetime, if at least 50% parse correctly try: temp = pd.to_datetime(df[col], errors='coerce') if temp.notna().mean() > 0.5: df[col] = temp continue except: pass # Try to parse numeric if at least 50% can be converted try: temp = pd.to_numeric(df[col], errors='coerce') if temp.notna().mean() > 0.5: df[col] = temp continue except: pass # Convert JSON-like strings try: if df[col].dropna().apply(lambda x: isinstance(x, str) and x.strip().startswith("[") and x.strip().endswith("]")).all(): df[col] = df[col].apply(json.loads) continue except: pass # Default: make sure column is string df[col] = df[col].astype(str) return df def handle_missing_data(df: pd.DataFrame) -> pd.DataFrame: print("Before Imputation (NA Counts):") print(df.isnull().sum()) numeric_col = df.select_dtypes(include=['number']).columns if not numeric_col.empty: num_imputer = SimpleImputer(strategy='median') df[numeric_col] = num_imputer.fit_transform(df[numeric_col]) categorical_col = df.select_dtypes(include=['object', 'category']).columns if not categorical_col.empty: cat_imputer = SimpleImputer(strategy='most_frequent') df[categorical_col] = cat_imputer.fit_transform(df[categorical_col]) print("After Imputation (NA Counts):") print(df.isnull().sum()) return df def handle_outliers(df: pd.DataFrame) -> pd.DataFrame: numeric_col = df.select_dtypes(include=['number','int64', 'float64']).columns if not numeric_col.empty: for col in numeric_col: Q1 = df[col].quantile(0.25) Q3 = df[col].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR df[col] = df[col].apply(lambda x: lower_bound if x < lower_bound else upper_bound if x > upper_bound else x) return df def generate_final_report(df: pd.DataFrame, file_path: str): with open(file_path, "w") as file: file.write("FINAL DATA PREPROCESSING REPORT\n") file.write("=" * 50 + "\n\n") file.write("Missing Values (After Preprocessing):\n") missing_values = df.isnull().sum() for col, count in missing_values.items(): file.write(f"{col}: {count} missing values\n") file.write("\nDuplicate Rows (After Preprocessing):\n") file.write(f"Total Duplicate Rows: {df.duplicated().sum()}\n\n") file.write("Preprocessing Completed Successfully!\n") def save_cleaned_data(df: pd.DataFrame, file_path: str): df.to_csv(file_path, index=False) return file_path