Abeshith's picture
Add pipeline stages implementation
a7d80f2
import pandas as pd
from mlpipeline.entity import DataValidationConfig, DataValidationArtifact
from mlpipeline.logging.logger import get_logger
from mlpipeline.exception import DataValidationException
from mlpipeline.utils.common import read_yaml
import sys
# Module-level logger, named after this module via the project's get_logger helper.
logger = get_logger(__name__)
class DataValidation:
    """Run basic sanity checks on a CSV dataset and record the outcome.

    The checks are limited to "is the dataset empty" and "is any column
    entirely null". The configured schema file is only passed through to the
    returned artifact; no column/dtype comparison against it happens here.
    """

    def __init__(self, config: DataValidationConfig):
        """Store the validation configuration.

        Args:
            config: Settings object. This class reads its ``data_dir``
                (passed to ``pd.read_csv``), ``status_file`` and
                ``schema_file`` attributes.
        """
        self.config = config

    def validate_schema(self) -> DataValidationArtifact:
        """Validate the configured dataset and write a status file.

        Reads the CSV at ``config.data_dir``, collects validation errors,
        writes the resulting status message to ``config.status_file`` and
        returns the same information as an artifact.

        Returns:
            DataValidationArtifact with ``validation_status`` (True when no
            error was found), ``message`` ("Validation passed" or the errors
            joined by "; ") and ``schema_file_path``.

        Raises:
            DataValidationException: If reading the data or writing the
                status file fails; the original exception is chained as the
                cause.
        """
        # Local import keeps this block self-contained (no file-level import change).
        from pathlib import Path

        try:
            logger.info("Starting data validation")

            try:
                df = pd.read_csv(self.config.data_dir)
            except pd.errors.EmptyDataError:
                # A zero-byte file has no header for pandas to parse. Treat it
                # as an empty dataset so it is reported through the normal
                # validation result instead of aborting the stage.
                df = pd.DataFrame()

            errors = []
            if df.empty:
                errors.append("Dataset is empty")
            # NOTE: for a header-only file (columns but zero rows) every column
            # is vacuously all-null, so this message is reported as well.
            if df.isnull().all().any():
                errors.append("Columns with all null values found")

            # The status is derived from the collected errors so the two can
            # never disagree.
            validation_status = not errors
            status_message = "Validation passed" if validation_status else "; ".join(errors)

            status_path = Path(self.config.status_file)
            # Make sure the target directory exists before writing.
            status_path.parent.mkdir(parents=True, exist_ok=True)
            with open(status_path, "w", encoding="utf-8") as f:
                f.write(status_message)

            logger.info(f"Validation status: {status_message}")

            return DataValidationArtifact(
                validation_status=validation_status,
                message=status_message,
                schema_file_path=self.config.schema_file,
            )
        except Exception as e:
            # `sys` is handed to the project exception as before (presumably
            # so it can read sys.exc_info() -- confirm in mlpipeline.exception).
            # `from e` keeps the original error as the explicit cause.
            raise DataValidationException(str(e), sys) from e