ToxicTweet-Tagger / components /data_validation.py
Subi003's picture
Upload folder using huggingface_hub
4c01182 verified
import sys
import pandas as pd
import json
from pathlib import Path
from evidently import DataDefinition, Dataset, Report
from evidently.presets import DataDriftPreset
from src.core.logger import logging
from src.core.exception import AppException
from src.core.configuration import AppConfiguration
import gc
class DataValidation:
    """Validate the ingested dataset and detect drift between train and test splits.

    The class offers three public steps that are normally run in order:

    1. ``validate_dataset`` - checks required columns/labels and class balance.
    2. ``get_data_drift_report`` - builds an Evidently drift report and saves it
       as JSON.
    3. ``check_data_drift`` - reads that JSON report and records the final
       validation status.

    Every failure is logged and re-raised wrapped in ``AppException``.
    """

    # Single source of truth for the drift report location; it is written by
    # ``get_data_drift_report`` and read back by ``check_data_drift``.
    _DRIFT_REPORT_PATH = Path("reports/data_drift.json")
    _REQUIRED_COLUMNS = ("Content", "Label")
    _REQUIRED_LABELS = (1, 0)

    def __init__(self, config=None):
        """
        Initializes the DataValidation object by creating the data validation configuration.

        Args:
            config (AppConfiguration | None): The application configuration object.
                When omitted, a fresh ``AppConfiguration()`` is created per instance.
        """
        # Build the default lazily: a default of ``AppConfiguration()`` in the
        # signature would be evaluated once at import time and shared by all
        # instances.
        if config is None:
            config = AppConfiguration()
        self.validation_config = config.data_validation_config()

    def _write_validation_status(self, status: str) -> None:
        """Overwrite the validation status file with ``Validation Status : <status>``."""
        with open(self.validation_config.validation_status_file, "w") as f:
            f.write(f"Validation Status : {status}")

    def get_data_drift_report(self, train_data: pd.DataFrame, test_data: pd.DataFrame):
        """
        Builds the data drift report (train as reference, test as current) and saves it as JSON.

        Args:
            train_data (pd.DataFrame): The training data; must contain a "Content" column.
            test_data (pd.DataFrame): The test data; must contain a "Content" column.

        Raises:
            AppException: If feature extraction, report generation or saving fails.
        """
        try:
            logging.info("Preparing data drift report")
            train_df, test_df = self._get_features(train_data, test_data)

            schema = DataDefinition(
                text_columns=["Content"],
                numerical_columns=["num_words", "num_characters", "avg_word_len"],
            )
            train_ds = Dataset.from_pandas(train_df, data_definition=schema)
            test_ds = Dataset.from_pandas(test_df, data_definition=schema)

            report = Report(metrics=[DataDriftPreset()])
            result = report.run(reference_data=train_ds, current_data=test_ds)

            # Make sure the target directory exists so saving cannot fail on a
            # fresh checkout where "reports/" has not been created yet.
            report_path = self._DRIFT_REPORT_PATH
            report_path.parent.mkdir(parents=True, exist_ok=True)
            result.save_json(str(report_path))
            logging.info("Data drift report prepared and saved in json format")
        except Exception as e:
            logging.error(f"Faild to get the data drift report: {e}", exc_info=True)
            raise AppException(e, sys) from e

    def check_data_drift(self, drift_share_threshold: float = 0.5):
        """
        Checks if there is a significant drift in the data distribution.

        Reads the data drift report generated by the `get_data_drift_report` method,
        looks up the "DriftedColumnsCount" metric and compares its drifted-column share
        with the threshold. The outcome is written to the validation status file.

        Args:
            drift_share_threshold (float): Share of drifted columns at or above which
                the dataset is considered drifted. Defaults to 0.5.

        Raises:
            AppException: If the report file or the metric is missing, if the report
                cannot be parsed, or if data drift is detected (the underlying
                `ValueError`/`FileNotFoundError` is wrapped).
        """
        try:
            data_drift_report_path = self._DRIFT_REPORT_PATH
            if not data_drift_report_path.exists():
                raise FileNotFoundError("'data_drift.json' file not found")

            with open(data_drift_report_path, "r", encoding="utf-8") as file:
                report = json.load(file)

            logging.info("Evaluating data drift report")
            metrics = report.get("metrics", [])
            # The metric id carries its parameters as a suffix, hence the prefix match.
            drift_count_metric = next(
                (
                    m for m in metrics
                    if str(m.get("metric_id", "")).startswith("DriftedColumnsCount")
                ),
                None,
            )
            if drift_count_metric is None:
                raise ValueError("DriftedColumnsCount metric not found in report")

            drift_count = drift_count_metric["value"]["count"]
            drift_share = drift_count_metric["value"]["share"]

            # Evidently default drift detection threshold: drift_share >= 0.5
            if drift_share >= drift_share_threshold:
                logging.error(f"Data drift detected: {drift_count} columns drifted ({drift_share:.0%} of total columns).")
                self._write_validation_status("Failed")
                raise ValueError("Data drift detected")

            logging.info("No significant data drift detected")
            self._write_validation_status("Successful")
        except Exception as e:
            logging.error(f"Error occured in checking for data drift: {e}", exc_info=True)
            raise AppException(e, sys) from e

    @staticmethod
    def _add_text_features(df: pd.DataFrame) -> pd.DataFrame:
        """Add "num_words", "num_characters" and "avg_word_len" columns to *df* in place."""
        content = df["Content"]
        df["num_words"] = content.apply(lambda text: len(text.split()))
        # Only plain spaces are stripped before counting characters.
        df["num_characters"] = content.apply(lambda text: len(text.replace(" ", "")))

        word_counts = df["num_words"]
        # Rows without any word would divide by zero (NaN/inf); mask them and
        # report an average word length of 0.0 instead.
        df["avg_word_len"] = (
            (df["num_characters"] / word_counts.where(word_counts > 0))
            .round(4)
            .fillna(0.0)
        )
        return df

    def _get_features(self, train_data: pd.DataFrame, test_data: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Gets the features required for preparing the data drift report.

        The feature columns are added to the passed dataframes in place and the
        same objects are returned.

        Args:
            train_data (pd.DataFrame): The training data.
            test_data (pd.DataFrame): The test data.

        Returns:
            tuple: A tuple containing the modified train and test dataframes with added features.

        Raises:
            AppException: If the "Content" column is missing or holds non-string values.
        """
        try:
            return (
                self._add_text_features(train_data),
                self._add_text_features(test_data),
            )
        except Exception as e:
            logging.error(f"Failed to get features required for preparing data drift report: {e}", exc_info=True)
            raise AppException(e, sys) from e

    def validate_dataset(self, df: pd.DataFrame):
        """
        Validates the given dataframe for required columns and labels. Also checks for class imbalance.

        The validation status file is set to "Failed" when a required column or
        label is missing or when the class imbalance is too high.

        Args:
            df (pd.DataFrame): The dataframe to be validated.

        Raises:
            AppException: Wrapping a `ValueError` if the dataframe lacks a required
                column or label, or if the class imbalance ratio is greater than 1.6;
                also raised for any other error during validation.
        """
        try:
            missing_columns = [col for col in self._REQUIRED_COLUMNS if col not in df.columns]

            # Labels can only be inspected once we know the "Label" column exists.
            missing_labels = []
            if not missing_columns:
                present_labels = df["Label"].unique()
                missing_labels = [
                    label for label in self._REQUIRED_LABELS if label not in present_labels
                ]

            # Fail when EITHER a column or a label is missing.
            if missing_columns or missing_labels:
                logging.error(
                    f"Missing columns: {missing_columns}; missing labels: {missing_labels}"
                )
                self._write_validation_status("Failed")
                raise ValueError("Required columns/labels not satisfied. Verify data.")
            logging.info("All required columns and labels are present")

            # Check for class imbalance
            labels_count = df["Label"].value_counts()
            class_imbalance_ratio = labels_count.max() / labels_count.min()

            if class_imbalance_ratio > 1.6:
                self._write_validation_status("Failed")
                raise ValueError("High class label imbalance found in data")
            elif class_imbalance_ratio > 1.3:
                logging.warning("Class label imbalance exists. Model performance might be affected.")
            else:
                logging.info("No major class imbalance detected")
        except Exception as e:
            logging.error(f"Dataset validation failed: {e}", exc_info=True)
            raise AppException(e, sys) from e
def initiate_data_validation():
    """
    Run the complete data validation stage.

    Builds a DataValidation object, loads the ingested CSV together with the
    train and test parquet files named in its configuration, validates the
    ingested dataset, produces the train-vs-test drift report and finally
    evaluates that report. The loaded dataframes are released afterwards.

    Raises:
        AppException: If any step of the validation stage fails.
    """
    validator = DataValidation()

    try:
        logging.info(f"{'='*20}Data Validation{'='*20}")

        settings = validator.validation_config
        ingested_path = settings.ingested_data_path
        train_path = settings.train_data_path
        test_path = settings.test_data_path

        # Load all three datasets before running any check.
        ingested_df = pd.read_csv(ingested_path)
        train_df = pd.read_parquet(train_path)
        test_df = pd.read_parquet(test_path)

        validator.validate_dataset(df=ingested_df)
        validator.get_data_drift_report(train_df, test_df)
        validator.check_data_drift()

        # Drop the references and force a collection to give the memory back.
        del ingested_df, train_df, test_df
        gc.collect()

        logging.info(f"{'='*20}Data Validation Completed Successfully{'='*20} \n\n")
    except Exception as e:
        logging.error(f"Data Validation failed: {e}", exc_info=True)
        raise AppException(e, sys)
# Script entry point: run the data validation stage when this module is
# executed directly; importing the module does not trigger it.
if __name__ == "__main__":
    initiate_data_validation()