import json

import numpy as np
import pandas as pd

try:
    from sklearn.impute import SimpleImputer
except ImportError:
    # scikit-learn is only required by handle_missing_data(); keep the other
    # helpers importable (and usable) when it is not installed.
    SimpleImputer = None


def data_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Drop fully duplicated rows.

    The frame is modified in place and also returned so calls can be chained.
    """
    df.drop_duplicates(inplace=True)
    return df


def _infer_text_column(series: pd.Series) -> pd.Series:
    """Return *series* converted to the most specific type its values allow.

    Every conversion is computed into a temporary and only returned when it
    succeeds, so a failed attempt can never overwrite the original values.
    """
    present = series.dropna()
    if present.empty:
        # Nothing to infer from; the caller blanks the missing cells later.
        return series

    if all(isinstance(value, str) for value in present):
        # 1) Unsigned decimal numbers such as "12" or "3.5".
        #    regex=False: "." must be a literal dot, not "any character".
        digits = present.str.replace(".", "", n=1, regex=False)
        if digits.str.isnumeric().all():
            try:
                return pd.to_numeric(series)
            except (ValueError, TypeError):
                # str.isnumeric() also accepts characters such as "½" that
                # to_numeric cannot parse; fall through to the other checks.
                pass

        # 2) Dates. Accept the result only when *every* non-null value
        #    parsed; otherwise coercion would silently replace text by NaT.
        try:
            parsed = pd.to_datetime(series, errors="coerce")
        except (ValueError, TypeError, OverflowError):
            parsed = None
        if parsed is not None and parsed.notna().sum() == len(present):
            return parsed

        # 3) JSON arrays stored as text, e.g. '["a", "b"]'. As before, this
        #    requires every cell (no missing values) to look like an array.
        if len(present) == len(series) and all(
            value.startswith("[") and value.endswith("]") for value in present
        ):
            try:
                return series.apply(json.loads)
            except ValueError:  # json.JSONDecodeError subclasses ValueError
                pass

        # 4) Upper-case boolean literals.
        if present.isin(["TRUE", "FALSE"]).all():
            return series.map({"TRUE": True, "FALSE": False})

    # Fallback: plain text. Only real values are stringified, so missing
    # cells stay missing instead of becoming the literal "nan" / "None".
    return series.astype(str).where(series.notna())


def standardize_data_types(df: pd.DataFrame) -> pd.DataFrame:
    """Infer a better type for every text/object column of *df*.

    Columns that already hold only boolean-like values, and columns that
    already have a non-object dtype (numeric, datetime, ...), are left
    untouched. Object/text columns are converted, in this order of
    preference, to numbers, datetimes, parsed JSON lists, booleans
    ("TRUE"/"FALSE") or plain strings. Finally every missing value in the
    frame is replaced by an empty string.

    The frame is modified in place and also returned.
    """
    for col in df.columns:
        series = df[col]

        # Boolean-like columns (this includes 0/1) are already final.
        if series.isin([True, False]).all():
            continue

        # Typed columns need no inference. Running to_datetime on them would
        # reinterpret plain numbers as nanoseconds since the epoch.
        if not (
            pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
        ):
            continue

        df[col] = _infer_text_column(series)

    # NOTE: kept from the original contract - this blanks missing values in
    # *all* columns, which turns numeric columns containing NaN into object
    # dtype. Run handle_missing_data() first if numeric imputation is wanted.
    df.fillna("", inplace=True)
    return df


def handle_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """Impute missing values: median for numeric, mode for object columns.

    Columns without a single observed value are skipped, because
    ``SimpleImputer`` drops such columns from its output and assigning the
    narrower result back would fail with a shape mismatch.

    The frame is modified in place and also returned.

    Raises:
        ImportError: if scikit-learn is not installed.
    """
    if SimpleImputer is None:
        raise ImportError(
            "handle_missing_data requires scikit-learn "
            "(sklearn.impute.SimpleImputer)"
        )

    for dtypes, strategy in ((["number"], "median"), (["object"], "most_frequent")):
        candidates = df.select_dtypes(include=dtypes).columns
        usable = [col for col in candidates if df[col].notna().any()]
        if usable:
            imputer = SimpleImputer(strategy=strategy)
            df[usable] = imputer.fit_transform(df[usable])
    return df


def handle_outliers(df: pd.DataFrame, *, iqr_multiplier: float = 1.5) -> pd.DataFrame:
    """Clip numeric columns to the Tukey fences ``Q1 - k*IQR`` .. ``Q3 + k*IQR``.

    Args:
        df: frame to process; modified in place and also returned.
        iqr_multiplier: the fence factor ``k`` (1.5 by default).

    Missing values are left as they are; a column whose quartiles are NaN
    (no observed values) is effectively unchanged because a missing
    threshold does not clip.
    """
    for col in df.select_dtypes(include=["number"]).columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        margin = iqr_multiplier * (q3 - q1)
        df[col] = df[col].clip(lower=q1 - margin, upper=q3 + margin)
    return df


def _count_duplicate_rows(df: pd.DataFrame):
    """Count duplicated rows, tolerating unhashable cells such as lists."""
    try:
        return df.duplicated().sum()
    except TypeError:
        # Cells holding lists/dicts cannot be hashed; compare their text form.
        return df.astype(str).duplicated().sum()


def generate_final_report(df: pd.DataFrame, file_path: str):
    """Write a plain-text summary of missing values and duplicate rows.

    The report is assembled completely before the file is opened, so a
    failure while computing the statistics cannot leave a truncated file.
    """
    lines = [
        "FINAL DATA PREPROCESSING REPORT\n",
        "=" * 50 + "\n\n",
    ]
    lines.extend(
        f"{col}: {count} missing values\n"
        for col, count in df.isnull().sum().items()
    )
    lines.append(f"Total Duplicate Rows: {_count_duplicate_rows(df)}\n")
    lines.append("Preprocessing Completed Successfully!\n")

    # Explicit encoding: column names may contain non-ASCII characters and
    # the platform default is not UTF-8 everywhere.
    with open(file_path, "w", encoding="utf-8") as file:
        file.writelines(lines)


def save_cleaned_data(df: pd.DataFrame, file_path: str) -> str:
    """Write *df* to *file_path* as CSV (without the index) and return the path."""
    df.to_csv(file_path, index=False)
    return file_path