from networksecurity.entity.artifact_entity import DataIngestionArtifact, DataValidationArtifact from networksecurity.entity.config_entity import DataValidationConfig from networksecurity.exception.exception import NetworkSecurityException from networksecurity.constant.training_pipeline import SCHEMA_FILE_PATH from networksecurity.utils.main_utils.utils import read_yaml_file from networksecurity.utils.main_utils.utils import write_yaml_file from networksecurity.logging.logger import logging from scipy.stats import ks_2samp import pandas as pd import os,sys class DataValidation: def __init__(self, data_ingestion_artifact: DataIngestionArtifact, data_validation_config: DataValidationConfig): try: self.data_ingestion_artifact = data_ingestion_artifact self.data_validation_config = data_validation_config self._schema_config = read_yaml_file(SCHEMA_FILE_PATH) except Exception as e: raise NetworkSecurityException(e, sys) @staticmethod def read_data(file_path)->pd.DataFrame: try: return pd.read_csv(file_path) except Exception as e: raise NetworkSecurityException(e, sys) def validate_number_of_columns(self, dataframe: pd.DataFrame)->bool: try: number_of_columns = len(self._schema_config) logging.info(f"Required number of columns: {number_of_columns}") logging.info(f"DataFrame columns count: {len(dataframe.columns)}") if len(dataframe.columns)==number_of_columns: return True return False except Exception as e: raise NetworkSecurityException(e, sys) def validate_number_of_numerical_columns(self, dataframe: pd.DataFrame) -> bool: try: # Required numerical columns from schema required_numerical_columns = self._schema_config["numerical_columns"] required_count = len(required_numerical_columns) # Actual numerical columns in dataframe numerical_df = dataframe.select_dtypes(include=["int64"]) actual_count = len(numerical_df.columns) logging.info(f"Required number of numerical columns: {required_count}") logging.info(f"Dataframe numerical columns count: {actual_count}") if actual_count == required_count: return True return False except Exception as e: raise NetworkSecurityException(e, sys) def detect_dataset_drift(self, base_df, current_df, threshold=0.01)-> bool: try: status = True report={} for column in base_df.columns: d1 = base_df[column] d2 = current_df[column] is_same_dist=ks_2samp(d1, d2) if threshold <=is_same_dist.pvalue: is_found = False else: is_found = True status = False report.update({column:{ "p_value":float(is_same_dist.pvalue), "drift_status":is_found }}) drift_report_file_path = self.data_validation_config.drift_report_file_path dir_path = os.path.dirname(drift_report_file_path) os.makedirs(dir_path, exist_ok=True) write_yaml_file(file_path=drift_report_file_path, content=report) except Exception as e: raise NetworkSecurityException(e, sys) def initiate_data_validation(self)-> DataValidationArtifact: try: train_file_path = self.data_ingestion_artifact.training_file_path test_file_path = self.data_ingestion_artifact.testing_file_path ## Read data from train and test train_dataframe = DataValidation.read_data(train_file_path) test_dataframe = DataValidation.read_data(test_file_path) ## Validate number of columns status = self.validate_number_of_columns(dataframe=train_dataframe) if not status: error_message = f"Train dataframe does not contain all columns \n" status = self.validate_number_of_columns(dataframe=test_dataframe) if not status: error_message = f"Test dataframe does not contain all columns \n" ## Check datadrift status=self.detect_dataset_drift(base_df=train_dataframe, current_df=test_dataframe) dir_path = os.path.dirname(self.data_validation_config.valid_train_file_path) os.makedirs(dir_path, exist_ok=True) train_dataframe.to_csv(self.data_validation_config.valid_train_file_path, index=False, header=True) test_dataframe.to_csv(self.data_validation_config.valid_test_file_path, index=False, header=True) data_validation_artifact = DataValidationArtifact( validation_status=status, valid_train_file_path=self.data_validation_config.valid_train_file_path, valid_test_file_path=self.data_validation_config.valid_test_file_path, invalid_train_file_path=None, invalid_test_file_path=None, drift_report_file_path=self.data_validation_config.drift_report_file_path ) return data_validation_artifact except Exception as e: raise NetworkSecurityException(e, sys)