File size: 2,735 Bytes
8e152f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

from sklearn.impute import SimpleImputer
import pandas as pd
import numpy as np
import json

def data_quality(df: pd.DataFrame):
    """Remove repeated rows from *df* and return the same frame.

    The frame is modified in place: for every group of identical rows only
    the first occurrence is kept, and the surviving rows keep their original
    index labels. The object handed back is *df* itself, not a copy.
    """
    df.drop_duplicates(keep="first", inplace=True)
    return df

def _standardize_text_column(series: pd.Series) -> pd.Series:
    """Return the most specific representation of one text-like column.

    Conversions are tried in order (numeric, TRUE/FALSE flags, JSON arrays,
    datetimes) and a conversion is adopted only when every non-missing value
    survives it, so no cell is ever silently replaced by NaN/NaT. Columns
    that match nothing are returned as plain strings with "" for missing
    values.
    """
    missing = series.isna()
    present = series[~missing]
    if present.empty:
        # Nothing to infer from; treat as text whose missing marker is "".
        return series.where(~missing, "")

    is_str = present.map(lambda value: isinstance(value, str)).astype(bool)
    if not is_str.any():
        # The column already holds Python objects (bools, numbers, lists, ...)
        # rather than text, so there is nothing to parse.
        return series

    as_text = present.astype(str)

    # ASCII digits only: ``\d`` and ``str.isnumeric`` also accept characters
    # such as "٣" or "½" that ``pd.to_numeric`` cannot parse.
    numeric_pattern = r"[+-]?(?:[0-9]+\.?[0-9]*|\.[0-9]+)"
    if as_text.str.fullmatch(numeric_pattern).all():
        numbers = pd.to_numeric(series, errors="coerce")
        if numbers[~missing].notna().all():
            return numbers

    if as_text.isin(["TRUE", "FALSE"]).all():
        flags = series.map({"TRUE": True, "FALSE": False})
        if not missing.any():
            return flags.astype(bool)
        # A plain bool column cannot hold missing values; keep the historic
        # textual form ("True"/"False") with "" marking the gaps.
        return flags.where(~missing, "").astype(str)

    if is_str.all():
        looks_like_list = as_text.str.startswith("[") & as_text.str.endswith("]")
        if not missing.any() and looks_like_list.all():
            try:
                return series.apply(json.loads)
            except ValueError:
                # At least one cell is not valid JSON: keep the column as text.
                pass

        try:
            stamps = pd.to_datetime(series, errors="coerce")
        except (ValueError, TypeError, OverflowError):
            stamps = None
        if (
            stamps is not None
            and pd.api.types.is_datetime64_any_dtype(stamps)
            and stamps[~missing].notna().all()
        ):
            return stamps

    return series.where(~missing, "").astype(str)


def standardize_data_types(df: pd.DataFrame) -> pd.DataFrame:
    """Convert text columns of *df* to more specific types, in place.

    Only columns with ``object`` or pandas string dtype are inspected;
    columns that already have a numeric, boolean or datetime dtype are left
    exactly as they are, including their missing values. For each inspected
    column the first matching rule wins:

    * every value looks like a plain decimal number -> numeric dtype;
    * every value is "TRUE" or "FALSE" -> ``bool`` (when nothing is missing);
    * every value is a JSON array string -> Python lists;
    * every value parses as a date/time -> ``datetime64``;
    * otherwise -> ``str``, with missing values replaced by "".

    Note that list-valued cells are unhashable, so operations that hash rows
    (for example ``DataFrame.duplicated``) need special care on such columns.

    Args:
        df: Frame to normalise. It is modified in place.

    Returns:
        The same ``df`` object, for call chaining.
    """
    for col in df.columns:
        column = df[col]
        dtype = column.dtype
        is_text_like = pd.api.types.is_object_dtype(dtype) or isinstance(
            dtype, pd.StringDtype
        )
        if not is_text_like:
            continue
        df[col] = _standardize_text_column(column)
    return df

def handle_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """Impute missing values in *df* in place and return it.

    Numeric columns are filled with their median and ``object`` columns with
    their most frequent value, both via scikit-learn's ``SimpleImputer``.

    Columns that contain no observed value at all are skipped and left
    untouched: ``SimpleImputer`` discards such features from its output, so
    including them would make the result narrower than the column selection
    it is assigned back to and the assignment would fail.

    Args:
        df: Frame to impute. It is modified in place.

    Returns:
        The same ``df`` object, for call chaining.
    """
    plans = (("number", "median"), ("object", "most_frequent"))
    for dtype_selector, strategy in plans:
        candidates = df.select_dtypes(include=[dtype_selector]).columns
        if candidates.empty:
            continue

        # Keep only columns with at least one observed value (see docstring).
        has_data = df[candidates].notna().any(axis=0).to_numpy()
        fillable = candidates[has_data]
        if fillable.empty:
            continue

        block = df[fillable]
        if dtype_selector == "object":
            # Present None placeholders as NaN, which is the missing-value
            # marker the imputer looks for by default.
            block = block.where(block.notna(), np.nan)
        df[fillable] = SimpleImputer(strategy=strategy).fit_transform(block)
    return df

def handle_outliers(df: pd.DataFrame, iqr_factor: float = 1.5) -> pd.DataFrame:
    """Clamp outliers in every numeric column of *df* to the IQR fences.

    For each numeric column the first and third quartiles (Q1, Q3) are
    computed and values outside
    ``[Q1 - iqr_factor * IQR, Q3 + iqr_factor * IQR]`` are replaced by the
    nearest fence. Missing values are left untouched, and a column whose
    quartiles cannot be computed (no observed values) is returned unchanged.

    Args:
        df: Frame to process. It is modified in place.
        iqr_factor: Multiple of the inter-quartile range that defines the
            fences. The default of 1.5 is the conventional box-plot rule.

    Returns:
        The same ``df`` object, for call chaining.
    """
    # "number" already covers int64/float64, so one selector is enough.
    for col in df.select_dtypes(include=["number"]).columns:
        values = df[col]
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        reach = iqr_factor * (q3 - q1)
        # Vectorised clamp; NaN bounds (empty column) mean "do not clip".
        df[col] = values.clip(lower=q1 - reach, upper=q3 + reach)
    return df

def generate_final_report(df: pd.DataFrame, file_path: str):
    """Write a plain-text summary of *df* to *file_path*.

    The report lists the number of missing values per column followed by the
    number of fully duplicated rows. All figures are computed before the file
    is opened, so a failure while analysing the frame does not leave a
    truncated report behind. The file is written as UTF-8 so that column
    names outside ASCII are stored the same way on every platform.

    Args:
        df: Frame to summarise. It is not modified.
        file_path: Destination of the report; an existing file is overwritten.
    """
    try:
        duplicate_rows = df.duplicated().sum()
    except TypeError:
        # Cells holding unhashable objects (e.g. lists) cannot be hashed for
        # row comparison; compare their string forms instead.
        duplicate_rows = df.astype(str).duplicated().sum()

    lines = [
        "FINAL DATA PREPROCESSING REPORT\n",
        "=" * 50 + "\n\n",
    ]
    lines.extend(
        f"{col}: {count} missing values\n"
        for col, count in df.isnull().sum().items()
    )
    lines.append(f"Total Duplicate Rows: {duplicate_rows}\n")
    lines.append("Preprocessing Completed Successfully!\n")

    with open(file_path, "w", encoding="utf-8") as file:
        file.writelines(lines)

def save_cleaned_data(df: pd.DataFrame, file_path: str):
    """Write *df* to *file_path* as CSV and return the path.

    The index is not written, so the file contains only the header row and
    the column data. The value returned is the ``file_path`` argument itself.
    """
    df.to_csv(path_or_buf=file_path, index=False)
    return file_path