File size: 3,757 Bytes
d170e1e 43f7806 d170e1e 460718f 43f7806 460718f 43f7806 d170e1e 43f7806 d170e1e 460718f 43f7806 d170e1e 43f7806 d170e1e 43f7806 d170e1e 460718f 43f7806 460718f d170e1e 43f7806 d170e1e 43f7806 d170e1e 43f7806 d170e1e 43f7806 d170e1e 43f7806 d170e1e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
from sklearn.impute import SimpleImputer
import pandas as pd
import numpy as np
import json
def data_quality(df: pd.DataFrame):
    """Report missing-value and duplicate counts, and return a de-duplicated frame.

    Fix: the original called ``drop_duplicates(inplace=True)`` on the caller's
    DataFrame while *also* returning it — a mutate-and-return hybrid that is a
    known pandas anti-pattern. This version leaves the caller's frame intact;
    callers should use the returned DataFrame (as the ``return df`` already
    implied).

    Args:
        df: input DataFrame to audit.

    Returns:
        A DataFrame with duplicate rows removed (first occurrence kept).
    """
    print("Missing values before handling:")
    print(df.isnull().sum())
    print("Duplicate rows before handling:")
    print(int(df.duplicated().sum()))
    # Non-mutating: rebinds the local name only.
    df = df.drop_duplicates()
    print("Duplicate rows after handling:")
    print(int(df.duplicated().sum()))
    return df
def standardize_data_types(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce each object column to its most plausible dtype, in place.

    Inference order per column:
      1. non-object (numeric/datetime/bool) columns are left untouched
      2. boolean-like strings ("TRUE"/"true"/"True", ...) -> bool
      3. numeric, if more than 50% of values convert
      4. datetime, if more than 50% of values parse
      5. JSON-style list strings ("[...]") -> Python lists
      6. fallback: stringify non-null values (NaN is preserved, not "nan")

    Fixes vs. the original:
      * already-numeric columns are skipped, so int columns are no longer
        reinterpreted by ``pd.to_datetime`` as nanosecond epochs;
      * numeric inference runs BEFORE datetime inference, so numeric strings
        like "2.5" are not misparsed as dates by the dateutil fallback;
      * title-case "True"/"False" strings are recognized;
      * ``.all()`` on an all-NaN column no longer vacuously triggers branches;
      * ``json.loads`` is only applied to strings, so JSON columns containing
        NaN convert instead of silently falling through to stringification;
      * bare ``except:`` narrowed to the exceptions these parsers can raise;
      * the string fallback keeps NaN as NaN instead of the literal "nan".

    Returns:
        The same DataFrame, mutated in place.
    """
    bool_map = {
        "TRUE": True, "FALSE": False,
        "true": True, "false": False,
        "True": True, "False": False,
    }
    for col in df.columns:
        # Columns that already carry a concrete dtype need no standardization.
        if not pd.api.types.is_object_dtype(df[col]):
            continue
        non_null = df[col].dropna()
        # Genuine booleans stored in an object column: leave as-is.
        if not non_null.empty and non_null.isin([True, False]).all():
            continue
        # Boolean-like strings (guard against the vacuous all() when empty).
        if not non_null.empty and non_null.astype(str).isin(bool_map).all():
            df[col] = df[col].map(bool_map)
            continue
        # Numeric, if a majority of values convert.
        try:
            converted = pd.to_numeric(df[col], errors='coerce')
            if converted.notna().mean() > 0.5:
                df[col] = converted
                continue
        except (ValueError, TypeError):
            pass
        # Datetime, if a majority of values parse.
        try:
            converted = pd.to_datetime(df[col], errors='coerce')
            if converted.notna().mean() > 0.5:
                df[col] = converted
                continue
        except (ValueError, TypeError, OverflowError):
            pass
        # JSON-like list strings -> Python lists (NaN entries pass through).
        try:
            looks_json = non_null.apply(
                lambda x: isinstance(x, str)
                and x.strip().startswith("[") and x.strip().endswith("]")
            )
            if not non_null.empty and looks_json.all():
                df[col] = df[col].apply(
                    lambda x: json.loads(x) if isinstance(x, str) else x
                )
                continue
        except (ValueError, TypeError):
            pass
        # Fallback: stringify values but keep NaN as NaN instead of "nan".
        df[col] = df[col].where(df[col].isna(), df[col].astype(str))
    return df
def handle_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """Impute missing values: median for numeric, mode for object/category columns.

    Fix: ``SimpleImputer.fit_transform`` returns a float ndarray, which upcast
    integer columns to float (even when they had no NaN) and raised on all-NaN
    columns. Pandas-native ``fillna`` preserves dtypes, skips columns whose
    statistic is undefined (all-NaN), and drops the sklearn dependency from
    this function.

    Args:
        df: DataFrame to impute; modified in place.

    Returns:
        The same DataFrame with NaNs filled where a statistic was available.
    """
    print("Before Imputation (NA Counts):")
    print(df.isnull().sum())
    # Numeric columns: fill with the column median (robust to outliers).
    for col in df.select_dtypes(include=['number']).columns:
        median = df[col].median()
        if pd.notna(median):  # all-NaN column has no median; leave it alone
            df[col] = df[col].fillna(median)
    # Categorical/text columns: fill with the most frequent value.
    for col in df.select_dtypes(include=['object', 'category']).columns:
        modes = df[col].mode(dropna=True)
        if not modes.empty:  # ties: mode() sorts, so the smallest wins,
            df[col] = df[col].fillna(modes.iloc[0])  # matching SimpleImputer
    print("After Imputation (NA Counts):")
    print(df.isnull().sum())
    return df
def handle_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """Winsorize numeric columns to the Tukey fences [Q1-1.5*IQR, Q3+1.5*IQR].

    Fix: replaces the per-element ``apply(lambda ...)`` with the vectorized,
    behavior-identical ``Series.clip`` (NaN values are preserved by both).
    Also drops the redundant dtype list — ``'number'`` already covers
    ``int64`` and ``float64``.

    Args:
        df: DataFrame whose numeric columns are clipped in place.

    Returns:
        The same DataFrame with out-of-fence values pulled to the bounds.
    """
    for col in df.select_dtypes(include=['number']).columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        df[col] = df[col].clip(lower=lower_bound, upper=upper_bound)
    return df
def generate_final_report(df: pd.DataFrame, file_path: str):
    """Write a plain-text data-quality summary of *df* to *file_path*.

    The report lists per-column missing-value counts and the total number of
    duplicate rows remaining after preprocessing.
    """
    lines = [
        "FINAL DATA PREPROCESSING REPORT\n",
        "=" * 50 + "\n\n",
        "Missing Values (After Preprocessing):\n",
    ]
    for col, count in df.isnull().sum().items():
        lines.append(f"{col}: {count} missing values\n")
    lines.append("\nDuplicate Rows (After Preprocessing):\n")
    lines.append(f"Total Duplicate Rows: {df.duplicated().sum()}\n\n")
    lines.append("Preprocessing Completed Successfully!\n")
    # Assemble everything first, then flush with a single batched write.
    with open(file_path, "w") as report:
        report.writelines(lines)
def save_cleaned_data(df: pd.DataFrame, file_path: str):
    """Persist *df* to *file_path* as CSV (without the index) and echo the path back."""
    df.to_csv(file_path, index=False)
    return file_path
|