import os
import sys

import pandas as pd
from scipy.stats import ks_2samp

from src.constants.training_pipeline import SCHEMA_FILE_PATH
from src.entity.artifact_entity import DataIngestionArtifact, DataValidationArtifact
from src.entity.config_entity import DataValidationConfig
from src.exception.exception import DeliveryTimeException
from src.logging.logger import logging
from src.utils.main_utils.utils import read_yaml_file, write_yaml_file


class DataValidation:
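    """Validates the ingested train/test splits before they move further down
    the pipeline: checks the column count against the schema file and runs a
    per-column KS-test drift check between the two splits.
    """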
def __init__(self, data_ingestion_artifact:DataIngestionArtifact,
data_validation_config:DataValidationConfig):
try:
self.data_ingestion_artifact=data_ingestion_artifact
self.data_validation_config=data_validation_config
self._schema_config=read_yaml_file(SCHEMA_FILE_PATH)
except Exception as e:
raise DeliveryTimeException(e, sys)
@staticmethod
    def read_data(file_path: str)->pd.DataFrame:
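        """Read a CSV file into a pandas DataFrame."""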
try:
return pd.read_csv(file_path)
except Exception as e:
raise DeliveryTimeException(e, sys)
def validate_number_of_columns(self, dataframe:pd.DataFrame)->bool:
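        """Return True when the dataframe has exactly as many columns as the
        schema config defines, False otherwise.
        """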
try:
number_of_columns=len(self._schema_config)
logging.info(f"Required number of columns: {number_of_columns}")
logging.info(f"Data frame has columns: {len(dataframe.columns)}")
if len(dataframe.columns) == number_of_columns:
return True
return False
except Exception as e:
raise DeliveryTimeException(e, sys)
def detect_dataset_drift(self, base_df, current_df, threshold=0.05)->bool:
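        """Run a two-sample Kolmogorov-Smirnov test per column between the base
        (train) and current (test) dataframes. A p-value below ``threshold``
        flags drift for that column and fails the overall status. Per-column
        results are written to the drift report YAML file.
        """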
try:
status=True
report={}
for column in base_df.columns:
d1=base_df[column]
d2=current_df[column]
is_same_dist=ks_2samp(d1, d2)
                # A p-value below the threshold means the column's train/test
                # samples are unlikely to share a distribution, i.e. drift.
                if is_same_dist.pvalue < threshold:
                    is_found=True
                    status=False
                else:
                    is_found=False
report.update({column:{
"p_value":float(is_same_dist.pvalue),
"drift_status":is_found
}})
drift_report_file_path=self.data_validation_config.drift_report_file_path
# Create directory
dir_path=os.path.dirname(drift_report_file_path)
os.makedirs(dir_path, exist_ok=True)
            write_yaml_file(file_path=drift_report_file_path, content=report)
            return status
except Exception as e:
raise DeliveryTimeException(e, sys)
def initiate_data_validation(self)->DataValidationArtifact:
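        """Orchestrate the validation step: read the ingested splits, validate
        their column counts, run the drift check, persist the validated files,
        and return a DataValidationArtifact describing the outcome.
        """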
try:
train_file_path=self.data_ingestion_artifact.trained_file_path
test_file_path=self.data_ingestion_artifact.test_file_path
# Read the data from train and test
train_dataframe=DataValidation.read_data(train_file_path)
test_dataframe=DataValidation.read_data(test_file_path)
            # Validate the number of columns
            error_message = ""
            status=self.validate_number_of_columns(dataframe=train_dataframe)
            if not status:
                error_message += "Train dataframe does not contain all columns.\n"
            status = self.validate_number_of_columns(dataframe=test_dataframe)
            if not status:
                error_message += "Test dataframe does not contain all columns.\n"
            if error_message:
                logging.error(error_message)
            # Check for dataset drift between the train and test splits
status=self.detect_dataset_drift(base_df=train_dataframe, current_df=test_dataframe)
dir_path=os.path.dirname(self.data_validation_config.valid_train_file_path)
os.makedirs(dir_path, exist_ok=True)
train_dataframe.to_csv(
self.data_validation_config.valid_train_file_path, index=False, header=True
)
test_dataframe.to_csv(
self.data_validation_config.valid_test_file_path, index=False, header=True
)
data_validation_artifact=DataValidationArtifact(
validation_status=status,
                valid_train_file_path=self.data_validation_config.valid_train_file_path,
                valid_test_file_path=self.data_validation_config.valid_test_file_path,
invalid_train_file_path=None,
invalid_test_file_path=None,
drift_report_file_path=self.data_validation_config.drift_report_file_path,
)
return data_validation_artifact
except Exception as e:
            raise DeliveryTimeException(e, sys)
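

# Illustrative wiring only: a minimal sketch of how this component could be
# invoked on its own. TrainingPipelineConfig is assumed to exist in
# src.entity.config_entity, and the artifact would normally come from the
# data ingestion step; the direct construction and file paths below are
# hypothetical examples, not the pipeline's actual entry point.
if __name__ == "__main__":
    try:
        from src.entity.config_entity import TrainingPipelineConfig  # assumed root config

        training_pipeline_config = TrainingPipelineConfig()
        data_validation_config = DataValidationConfig(training_pipeline_config)
        # Field names mirror the attributes read in initiate_data_validation above.
        data_ingestion_artifact = DataIngestionArtifact(
            trained_file_path="Artifacts/data_ingestion/ingested/train.csv",  # example path
            test_file_path="Artifacts/data_ingestion/ingested/test.csv",      # example path
        )
        data_validation = DataValidation(data_ingestion_artifact, data_validation_config)
        artifact = data_validation.initiate_data_validation()
        logging.info(f"Data validation completed: {artifact}")
    except Exception as e:
        raise DeliveryTimeException(e, sys)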