File size: 8,827 Bytes
4c01182
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import sys
import pandas as pd
import json
from pathlib import Path
from evidently import DataDefinition, Dataset, Report
from evidently.presets import DataDriftPreset
from src.core.logger import logging
from src.core.exception import AppException
from src.core.configuration import AppConfiguration
import gc

class DataValidation:
    # Canonical location of the Evidently drift report: written by
    # get_data_drift_report() and read back by check_data_drift().
    DRIFT_REPORT_PATH = Path("reports/data_drift.json")

    # Evidently's default drift-detection threshold: the dataset is considered
    # drifted when at least this share of columns drifted.
    DRIFT_SHARE_THRESHOLD = 0.5

    def __init__(self, config=None):
        """
        Initializes the DataValidation object from the application configuration.

        Args:
            config (AppConfiguration | None): The configuration object containing the
                application configuration. A fresh AppConfiguration is created when
                omitted. (A `None` sentinel is used instead of `config=AppConfiguration()`
                because a default argument is evaluated once, at definition time.)
        """
        if config is None:
            config = AppConfiguration()
        self.validation_config = config.data_validation_config()

    def get_data_drift_report(self, train_data: pd.DataFrame, test_data: pd.DataFrame):
        """
        Builds an Evidently data-drift report comparing train vs. test data and
        saves it as JSON to DRIFT_REPORT_PATH.

        Args:
            train_data (pd.DataFrame): Reference (training) data with a "Content" column.
            test_data (pd.DataFrame): Current (test) data with a "Content" column.

        Raises:
            AppException: If report generation or saving fails.
        """
        try:
            logging.info("Preparing data drift report")
            train_df, test_df = self._get_features(train_data, test_data)

            schema = DataDefinition(
                text_columns=["Content"],
                numerical_columns=["num_words", "num_characters", "avg_word_len"],
            )

            train_ds = Dataset.from_pandas(train_df, data_definition=schema)
            test_ds = Dataset.from_pandas(test_df, data_definition=schema)

            report = Report(metrics=[DataDriftPreset()])
            result = report.run(reference_data=train_ds, current_data=test_ds)

            # save_json does not create intermediate directories; ensure they exist.
            self.DRIFT_REPORT_PATH.parent.mkdir(parents=True, exist_ok=True)
            result.save_json(str(self.DRIFT_REPORT_PATH))
            logging.info("Data drift report prepared and saved in json format")

        except Exception as e:
            logging.error(f"Failed to get the data drift report: {e}", exc_info=True)
            raise AppException(e, sys)

    def check_data_drift(self):
        """
        Checks if there is a significant drift in the data distribution.

        Reads the data drift report generated by `get_data_drift_report`, extracts
        the DriftedColumnsCount metric, and compares the drifted-column share
        against DRIFT_SHARE_THRESHOLD. The outcome ("Failed"/"Successful") is
        written to the validation status file; detected drift also raises.

        Raises:
            AppException: Wrapping FileNotFoundError (report missing), ValueError
                (metric missing, or drift detected), or any other evaluation error.
        """
        try:
            if not self.DRIFT_REPORT_PATH.exists():
                raise FileNotFoundError("'data_drift.json' file not found")

            with open(self.DRIFT_REPORT_PATH, "r") as file:
                report = json.load(file)

            logging.info("Evaluating data drift report")

            metrics = report.get("metrics", [])
            drift_count_metric = next(
                (m for m in metrics if m["metric_id"].startswith("DriftedColumnsCount")), None
            )

            if drift_count_metric is None:
                raise ValueError("DriftedColumnsCount metric not found in report")

            drift_count = drift_count_metric["value"]["count"]
            drift_share = drift_count_metric["value"]["share"]

            if drift_share >= self.DRIFT_SHARE_THRESHOLD:
                logging.error(f"Data drift detected: {drift_count} columns drifted ({drift_share:.0%} of total columns).")
                self._write_validation_status("Failed")
                raise ValueError("Data drift detected")

            logging.info("No significant data drift detected")
            self._write_validation_status("Successful")

        except Exception as e:
            logging.error(f"Error occurred in checking for data drift: {e}", exc_info=True)
            raise AppException(e, sys)

    def _write_validation_status(self, status: str):
        """Writes the validation outcome ("Failed"/"Successful") to the status file."""
        with open(self.validation_config.validation_status_file, "w") as f:
            f.write(f"Validation Status : {status}")

    @staticmethod
    def _add_text_features(df: pd.DataFrame) -> pd.DataFrame:
        """Adds num_words, num_characters and avg_word_len columns derived from "Content"."""
        df["num_words"] = df["Content"].apply(lambda x: len(x.split()))
        df["num_characters"] = df["Content"].apply(lambda x: len(x.replace(" ", "")))
        df["avg_word_len"] = round(df["num_characters"] / df["num_words"], 4)
        return df

    def _get_features(self, train_data: pd.DataFrame, test_data: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Gets the features required for preparing the data drift report.

        Args:
            train_data (pd.DataFrame): The training data.
            test_data (pd.DataFrame): The test data.

        Returns:
            tuple: A tuple containing copies of the train and test dataframes with
            added features. Copies are used so the caller's frames are not mutated
            (the original modified its inputs in place).
        """
        try:
            train_df = self._add_text_features(train_data.copy())
            test_df = self._add_text_features(test_data.copy())
            return train_df, test_df

        except Exception as e:
            logging.error(f"Failed to get features required for preparing data drift report: {e}", exc_info=True)
            raise AppException(e, sys)

    def validate_dataset(self, df: pd.DataFrame):
        """
        Validates the given dataframe for required columns and labels. Also checks
        for class imbalance.

        Args:
            df (pd.DataFrame): The dataframe to be validated.

        Raises:
            ValueError: If the dataframe does not have required columns or labels.
            ValueError: If the class imbalance ratio is greater than 1.6.
            AppException: Wrapping any error raised during the validation process.
        """
        try:
            required_columns = ["Content", "Label"]
            required_labels = [1, 0]

            missing_columns = [col for col in required_columns if col not in df.columns]
            # Guard the label check: df["Label"] would raise KeyError if the column is absent.
            present_labels = set(df["Label"].unique()) if "Label" in df.columns else set()
            missing_labels = [label for label in required_labels if label not in present_labels]

            # BUG FIX: the original tested `not columns or not labels`, which reported
            # success when only ONE of the two checks passed (e.g. columns present but
            # a label missing). Both lists must be empty for the data to be valid.
            if missing_columns or missing_labels:
                self._write_validation_status("Failed")
                raise ValueError("Required columns/labels not satisfied. Verify data.")
            logging.info("All required columns and labels are present")

            # Check for class imbalance (ratio of majority to minority class counts).
            labels_count = df["Label"].value_counts()
            class_imbalance_ratio = labels_count.max() / labels_count.min()

            if class_imbalance_ratio > 1.6:
                raise ValueError("High class label imbalance found in data")
            elif class_imbalance_ratio > 1.3:
                logging.warning("Class label imbalance exists. Model performance might be affected.")
            else:
                logging.info("No major class imbalance detected")

        except Exception as e:
            logging.error(f"Data validation terminated: {e}", exc_info=True)
            raise AppException(e, sys)
        

def initiate_data_validation():
    """
    Runs the data validation workflow end to end.

    Builds a DataValidation object, loads the ingested, training and testing
    datasets from the paths in the validation configuration, validates the
    ingested dataset, then generates and evaluates a data-drift report
    comparing the train and test splits.

    Raises:
        AppException: If any step of the data validation process fails.
    """

    obj = DataValidation()

    try:
        logging.info(f"{'='*20}Data Validation{'='*20}")
        cfg = obj.validation_config

        ingested_df = pd.read_csv(cfg.ingested_data_path)
        train_df = pd.read_parquet(cfg.train_data_path)
        test_df = pd.read_parquet(cfg.test_data_path)

        obj.validate_dataset(df=ingested_df)
        obj.get_data_drift_report(train_df, test_df)
        obj.check_data_drift()

        # Release the (potentially large) dataframes before leaving the function.
        del ingested_df, train_df, test_df
        gc.collect()
        logging.info(f"{'='*20}Data Validation Completed Successfully{'='*20} \n\n")

    except Exception as e:
        logging.error(f"Data Validation failed: {e}", exc_info=True)
        raise AppException(e, sys)


if __name__ == "__main__":
    initiate_data_validation()