Spaces:
Sleeping
Sleeping
| import sys | |
| import pandas as pd | |
| import json | |
| from pathlib import Path | |
| from evidently import DataDefinition, Dataset, Report | |
| from evidently.presets import DataDriftPreset | |
| from src.core.logger import logging | |
| from src.core.exception import AppException | |
| from src.core.configuration import AppConfiguration | |
| import gc | |
class DataValidation:
    """Validates ingested data and checks train/test data drift.

    Responsibilities:
      * validate required columns/labels and class balance of the ingested data
      * build an Evidently data-drift report comparing train vs. test splits
      * evaluate that report and persist a validation status file
    """

    def __init__(self, config=None):
        """Initialize the validator from the application configuration.

        Args:
            config (AppConfiguration | None): Application configuration. Built
                lazily when omitted — avoids the default-argument pitfall of
                instantiating one shared AppConfiguration at import time.
        """
        if config is None:
            config = AppConfiguration()
        self.validation_config = config.data_validation_config()

    def get_data_drift_report(self, train_data: pd.DataFrame, test_data: pd.DataFrame):
        """Build a data-drift report (train vs. test) and save it as JSON.

        Args:
            train_data (pd.DataFrame): Reference (training) data with a "Content" column.
            test_data (pd.DataFrame): Current (test) data with a "Content" column.

        Raises:
            AppException: If feature engineering, report generation, or saving fails.
        """
        try:
            logging.info("Preparing data drift report")
            train_df, test_df = self._get_features(train_data, test_data)
            schema = DataDefinition(
                text_columns=["Content"],
                numerical_columns=["num_words", "num_characters", "avg_word_len"],
            )
            train_ds = Dataset.from_pandas(train_df, data_definition=schema)
            test_ds = Dataset.from_pandas(test_df, data_definition=schema)
            report = Report(metrics=[DataDriftPreset()])
            result = report.run(reference_data=train_ds, current_data=test_ds)
            # Ensure the output directory exists before saving the report.
            report_path = Path("reports/data_drift.json")
            report_path.parent.mkdir(parents=True, exist_ok=True)
            result.save_json(str(report_path))
            logging.info("Data drift report prepared and saved in json format")
        except Exception as e:
            logging.error(f"Failed to get the data drift report: {e}", exc_info=True)
            raise AppException(e, sys)

    def check_data_drift(self):
        """Evaluate the saved drift report and persist the validation status.

        Reads reports/data_drift.json (produced by `get_data_drift_report`),
        extracts the DriftedColumnsCount metric, and compares the share of
        drifted columns against Evidently's default 0.5 threshold. Writes the
        validation status file and raises if drift is detected.

        Raises:
            AppException: Wrapping FileNotFoundError when the report is absent,
                ValueError when the metric is missing or drift is detected, or
                any other evaluation error.
        """
        try:
            data_drift_report_path = Path("reports/data_drift.json")
            if not data_drift_report_path.exists():
                raise FileNotFoundError("'data_drift.json' file not found")
            with open(data_drift_report_path, "r") as file:
                report = json.load(file)
            logging.info("Evaluating data drift report")
            metrics = report.get("metrics", [])
            drift_count_metric = next(
                (m for m in metrics if m["metric_id"].startswith("DriftedColumnsCount")), None
            )
            if drift_count_metric is None:
                raise ValueError("DriftedColumnsCount metric not found in report")
            drift_count = drift_count_metric["value"]["count"]
            drift_share = drift_count_metric["value"]["share"]
            # Evidently default drift detection threshold: drift_share >= 0.5
            DRIFT_SHARE_THRESHOLD = 0.5
            if drift_share >= DRIFT_SHARE_THRESHOLD:
                logging.error(
                    f"Data drift detected: {drift_count} columns drifted ({drift_share:.0%} of total columns)."
                )
                with open(self.validation_config.validation_status_file, "w") as f:
                    f.write("Validation Status : Failed")
                raise ValueError("Data drift detected")
            logging.info("No significant data drift detected")
            with open(self.validation_config.validation_status_file, "w") as f:
                f.write("Validation Status : Successful")
        except Exception as e:
            logging.error(f"Error occurred in checking for data drift: {e}", exc_info=True)
            raise AppException(e, sys)

    def _get_features(self, train_data: pd.DataFrame, test_data: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Derive the numeric text features used by the drift report.

        Adds num_words, num_characters (space characters excluded) and
        avg_word_len columns. Works on copies so the caller's dataframes are
        not mutated.

        Args:
            train_data (pd.DataFrame): The training data.
            test_data (pd.DataFrame): The test data.

        Returns:
            tuple: Copies of the train and test dataframes with added features.
        """
        try:
            train_df = train_data.copy()
            test_df = test_data.copy()
            for frame in (train_df, test_df):
                frame["num_words"] = frame["Content"].apply(lambda x: len(x.split()))
                frame["num_characters"] = frame["Content"].apply(lambda x: len(x.replace(" ", "")))
                # NOTE(review): num_words == 0 (empty/whitespace-only Content)
                # yields a non-finite ratio here; assumes upstream cleaning
                # guarantees non-empty Content — TODO confirm.
                frame["avg_word_len"] = round(frame["num_characters"] / frame["num_words"], 4)
            return train_df, test_df
        except Exception as e:
            logging.error(f"Failed to get features required for preparing data drift report: {e}", exc_info=True)
            raise AppException(e, sys)

    def validate_dataset(self, df: pd.DataFrame):
        """Validate required columns/labels and check for class imbalance.

        Columns are checked before labels so that a missing "Label" column
        fails cleanly instead of raising a KeyError. (Fixes the original
        `not columns or not labels` check, which passed validation when only
        one of the two requirements was satisfied.)

        Args:
            df (pd.DataFrame): The dataframe to be validated.

        Raises:
            AppException: Wrapping ValueError if required columns/labels are
                missing or the class imbalance ratio exceeds 1.6, or any other
                error during validation.
        """
        try:
            required_columns = ["Content", "Label"]
            required_labels = [1, 0]
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                with open(self.validation_config.validation_status_file, "w") as f:
                    f.write("Validation Status : Failed")
                raise ValueError(f"Required columns missing: {missing_columns}. Verify data.")
            missing_labels = [label for label in required_labels if label not in df["Label"].unique()]
            if missing_labels:
                with open(self.validation_config.validation_status_file, "w") as f:
                    f.write("Validation Status : Failed")
                raise ValueError(f"Required labels missing: {missing_labels}. Verify data.")
            logging.info("All required columns and labels are present")
            # Check for class imbalance (majority/minority count ratio).
            labels_count = df["Label"].value_counts()
            class_imbalance_ratio = labels_count.max() / labels_count.min()
            if class_imbalance_ratio > 1.6:
                # Persist the failed status for consistency with the other checks.
                with open(self.validation_config.validation_status_file, "w") as f:
                    f.write("Validation Status : Failed")
                raise ValueError("High class label imbalance found in data")
            elif class_imbalance_ratio > 1.3:
                logging.warning("Class label imbalance exists. Model performance might be affected.")
            else:
                logging.info("No major class imbalance detected")
        except Exception as e:
            logging.error(f"Dataset validation failed: {e}", exc_info=True)
            raise AppException(e, sys)
def initiate_data_validation():
    """Initiates the data validation workflow.

    Builds a DataValidation object, loads the ingested, training, and testing
    datasets from the configured paths, validates the ingested dataset, then
    generates and evaluates a train-vs-test data drift report.

    Raises:
        AppException: If any error occurs during the data validation process.
    """
    validator = DataValidation()
    try:
        logging.info(f"{'='*20}Data Validation{'='*20}")
        cfg = validator.validation_config
        ingested_df = pd.read_csv(cfg.ingested_data_path)
        train_df = pd.read_parquet(cfg.train_data_path)
        test_df = pd.read_parquet(cfg.test_data_path)
        validator.validate_dataset(df=ingested_df)
        validator.get_data_drift_report(train_df, test_df)
        validator.check_data_drift()
        # Drop the loaded frames and force a collection to keep peak memory low.
        del ingested_df, train_df, test_df
        gc.collect()
        logging.info(f"{'='*20}Data Validation Completed Successfully{'='*20} \n\n")
    except Exception as e:
        logging.error(f"Data Validation failed: {e}", exc_info=True)
        raise AppException(e, sys)
# Allow this module to be run directly as a standalone pipeline step.
if __name__ == "__main__":
    initiate_data_validation()