from typing import Optional

import pandas as pd
import numpy as np
from backend.utils.logger import logger

def convert_numpy_types(obj):
    """Recursively convert numpy types to Python types for JSON serialization."""
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, (np.integer, np.int64, np.int32)):
        return int(obj)
    elif isinstance(obj, (np.floating, np.float64, np.float32)):
        return float(obj)
    elif isinstance(obj, np.bool_):
        return bool(obj)
    elif isinstance(obj, dict):
        # Convert keys as well: numpy scalar keys (e.g. np.int64) are not JSON-serializable
        return {convert_numpy_types(key): convert_numpy_types(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    else:
        return obj

class DatasetAnalyzer:
    def analyze(self, df: pd.DataFrame, target_column: Optional[str] = None):
        """Profile a DataFrame and return a JSON-serializable dict of dataset characteristics."""
        logger.info("Starting dataset analysis...")
        # Remove all-null columns
        null_columns = df.columns[df.isnull().all()]
        if len(null_columns) > 0:
            logger.warning(f"Removing all-null columns: {list(null_columns)}")
            df = df.drop(columns=null_columns)

        # Remove duplicate rows
        duplicate_rows = df.duplicated().sum()
        if duplicate_rows > 0:
            logger.warning(f"Removing {duplicate_rows} duplicate rows")
            df = df.drop_duplicates()

        # Remove constant columns
        constant_columns = [col for col in df.columns if df[col].nunique() == 1]
        if len(constant_columns) > 0:
            logger.warning(f"Removing constant columns: {constant_columns}")
            df = df.drop(columns=constant_columns)

        # Ensure at least 2 usable features after preprocessing
        usable_features = [col for col in df.columns if col != target_column]
        if len(usable_features) < 2:
            raise ValueError(f"Insufficient features: only {len(usable_features)} usable features after preprocessing, need at least 2")

        info = {}
        info["num_rows"] = df.shape[0]
        info["num_columns"] = df.shape[1]
        missing_ratio = df.isnull().mean().mean()
        info["missing_ratio"] = missing_ratio
        info["row_count"] = df.shape[0]  # same value as num_rows
        info["high_dimensional"] = bool(df.shape[1] > 50)
        info["small_data"] = bool(df.shape[0] < 1200)
        info["sparse_data"] = bool(missing_ratio > 0.4)
        all_numeric_cols = df.select_dtypes(include="number").columns.tolist()
        all_categorical_cols = df.select_dtypes(exclude="number").columns.tolist()
        info["numeric_cols"] = [col for col in all_numeric_cols if col != target_column]
        info["categorical_cols"] = [col for col in all_categorical_cols if col != target_column]

        if len(info["numeric_cols"]) + len(info["categorical_cols"]) < 2:
            raise ValueError("Dataset must have at least 2 usable features after preprocessing")

        # Cardinality
        cardinality = {col: df[col].nunique() for col in df.columns}
        info["cardinality"] = cardinality

        # Target-specific checks
        if target_column and target_column in df.columns:
            target = df[target_column]
            unique_vals = target.nunique()
            # Numeric targets with many distinct values are treated as regression
            if pd.api.types.is_numeric_dtype(target) and unique_vals > 10:
                info["target_type"] = "regression"
                info["class_distribution"] = None
                info["imbalance"] = None
            else:
                info["target_type"] = "classification"
                value_counts = target.value_counts(normalize=True)
                info["class_distribution"] = value_counts.to_dict()
                info["imbalance"] = bool(value_counts.max() > 0.8)
        else:
            info["target_type"] = None
            info["class_distribution"] = None
            info["imbalance"] = None

        # NLP detection heuristic: flag categorical columns whose average string length exceeds 30 characters
        text_columns = []
        for col in info["categorical_cols"]:
            if df[col].astype(str).str.len().mean() > 30:
                text_columns.append(col)
        info["text_columns"] = text_columns
        info["possible_nlp"] = len(text_columns) > 0

        return convert_numpy_types(info)
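

# --- Illustrative usage sketch (not part of the original module) ---
# A minimal, hypothetical example of how DatasetAnalyzer.analyze might be called
# and how the returned report stays JSON-serializable via convert_numpy_types.
# The toy DataFrame below is invented purely for demonstration, and running this
# block assumes the backend.utils.logger import at the top of the file resolves.
if __name__ == "__main__":
    import json

    example_df = pd.DataFrame({
        "age": [25, 32, 47, 51, 38, 29],
        "income": [40000, 52000, 88000, 95000, 61000, 43000],
        "city": ["Paris", "Lyon", "Paris", "Nice", "Lyon", "Paris"],
        "churned": [0, 1, 0, 1, 0, 0],
    })

    analyzer = DatasetAnalyzer()
    report = analyzer.analyze(example_df, target_column="churned")

    # The report contains only plain Python types, so no custom JSON encoder is needed.
    print(json.dumps(report, indent=2))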