File size: 5,783 Bytes
17c5137 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | from src.entity import artifact_entity
from src.entity import config_entity
from src.logger import logging
from src.exception import CropException
from src.config import TARGET_COLUMN
from src import utils
from typing import Optional
from scipy.stats import ks_2samp
import pandas as pd
import numpy as np
import sys
import os
class DataValidation:
def __init__(
self,
data_validation_config: config_entity.DataValidationConfig,
data_ingestion_artifact: artifact_entity.DataIngestionArtifact,
):
try:
logging.info(f"{'>'*20} Data Validation iniated {'<'*20}")
self.data_validation_config = data_validation_config
self.data_ingestion_artifact = data_ingestion_artifact
self.validation_error = dict()
except Exception as e:
raise CropException(e, sys)
def is_required_columns_exists(
self, base_df: pd.DataFrame, current_df: pd.DataFrame, report_key_name: str
) -> bool:
try:
base_columns = base_df.columns
current_columns = current_df.columns
missing_columns = []
for base_column in base_columns:
if base_column not in current_columns:
logging.info(f"Column: {base_column} is not available")
missing_columns.append(base_column)
if len(missing_columns) > 0:
self.validation_error[report_key_name] = missing_columns
return False
return True
except Exception as e:
raise CropException(e, sys)
def data_drift(
self, base_df: pd.DataFrame, current_df: pd.DataFrame, report_key_name: str
):
try:
drift_report = dict()
base_columns = base_df.columns
current_columns = current_df.columns
for base_column in base_columns:
base_data, current_data = base_df[base_column], current_df[base_column]
# Null hypothesis is that both columns data drawn from same distribution
logging.info(
f"Hypothesis {base_column} : {base_data.dtype}, {current_data.dtype}"
)
same_distribution = ks_2samp(base_data, current_data)
if same_distribution.pvalue > 0.05:
# we are accepting the null hypothesis
drift_report[base_column] = {
"pvalue": float(same_distribution.pvalue),
"same_distribution": True,
}
else:
drift_report[base_column] = {
"pvalue": float(same_distribution.pvalue),
"same_distribution": False,
}
self.validation_error[report_key_name] = drift_report
except Exception as e:
raise CropException(e, sys)
def initiate_data_validation(self) -> artifact_entity.DataValidationArtifact:
try:
logging.info(f"Reading base dataframe")
base_df = pd.read_csv(self.data_validation_config.base_file_path)
logging.info(f"Reading train dataframe")
train_df = pd.read_csv(self.data_ingestion_artifact.train_file_path)
logging.info(f"Reading test dataframe")
test_df = pd.read_csv(self.data_ingestion_artifact.test_file_path)
exclude_column = [TARGET_COLUMN]
base_df = utils.seperate_dependant_column(
df=base_df, exclude_column=exclude_column
)
train_df = utils.seperate_dependant_column(
df=train_df, exclude_column=exclude_column
)
test_df = utils.seperate_dependant_column(
df=test_df, exclude_column=exclude_column
)
logging.info(f"Is all required columns present in the train_df")
train_df_columns_status = self.is_required_columns_exists(
base_df=base_df,
current_df=train_df,
report_key_name="missing_columns_within_train_dataset",
)
test_df_columns_status = self.is_required_columns_exists(
base_df=base_df,
current_df=test_df,
report_key_name="missing_columns_within_test_dataset",
)
if train_df_columns_status:
logging.info(
f"As all column are available in train df hence detecting data drift"
)
self.data_drift(
base_df=base_df,
current_df=train_df,
report_key_name="data_drift_within_train_dataset",
)
if test_df_columns_status:
logging.info(
f"As all column are available in test df hence detecting data drift"
)
self.data_drift(
base_df=base_df,
current_df=test_df,
report_key_name="data_drift_within_test_dataset",
)
# writing the report
logging.info("Writing report in yaml format")
utils.write_yaml_file(
file_path=self.data_validation_config.report_file_path,
data=self.validation_error,
)
data_validation_artifact = artifact_entity.DataValidationArtifact(
report_file_path=self.data_validation_config.report_file_path
)
logging.info(f"Data validation artifact: {data_validation_artifact}")
return data_validation_artifact
except Exception as e:
raise CropException(e, sys)
|