Abeshith's picture
Add pipeline stages implementation
a7d80f2
import sys
from types import ModuleType
from typing import Optional
class MLPipelineException(Exception):
    """Base exception that records where the active exception was caught.

    Attributes set on every instance:
        error_message: the message passed to the constructor.
        file_name: source file of the traceback's first frame, or "Unknown".
        line_number: line number of that traceback entry, or 0.
    """

    def __init__(
        self,
        error_message: str,
        error_detail: Optional[ModuleType] = None,
    ) -> None:
        """Store the message and, if available, the traceback location.

        Args:
            error_message: human-readable description of the failure.
            error_detail: the ``sys`` module (or any object exposing a
                compatible ``exc_info()``). When given, ``exc_info()`` is
                called to locate the exception currently being handled.
                Omit it, or pass ``None``, to skip location capture.
        """
        super().__init__(error_message)
        self.error_message = error_message

        # Defaults cover every path on which no traceback can be obtained:
        # no error_detail supplied, or no exception currently being handled.
        self.file_name: str = "Unknown"
        self.line_number: int = 0

        # Truthiness (not ``is not None``) is kept on purpose so falsy
        # arguments keep falling back to the defaults, as before.
        exc_tb = error_detail.exc_info()[2] if error_detail else None
        if exc_tb:
            self.file_name = exc_tb.tb_frame.f_code.co_filename
            self.line_number = exc_tb.tb_lineno

    def __str__(self) -> str:
        """Return the message prefixed with the recorded file and line."""
        return f"Error in {self.file_name} at line {self.line_number}: {self.error_message}"
class DataIngestionException(MLPipelineException):
    """MLPipelineException subtype that adds no behavior of its own.

    It exists so this category of error can be raised and caught by type.
    Judging by the name it is meant for data-ingestion failures; the
    raising code is not part of this file.
    """
class DataValidationException(MLPipelineException):
    """MLPipelineException subtype that adds no behavior of its own.

    It exists so this category of error can be raised and caught by type.
    Judging by the name it is meant for data-validation failures; the
    raising code is not part of this file.
    """
class DataTransformationException(MLPipelineException):
    """MLPipelineException subtype that adds no behavior of its own.

    It exists so this category of error can be raised and caught by type.
    Judging by the name it is meant for data-transformation failures; the
    raising code is not part of this file.
    """
class FeatureEngineeringException(MLPipelineException):
    """MLPipelineException subtype that adds no behavior of its own.

    It exists so this category of error can be raised and caught by type.
    Judging by the name it is meant for feature-engineering failures; the
    raising code is not part of this file.
    """
class ModelTrainingException(MLPipelineException):
    """MLPipelineException subtype that adds no behavior of its own.

    It exists so this category of error can be raised and caught by type.
    Judging by the name it is meant for model-training failures; the
    raising code is not part of this file.
    """
class ModelEvaluationException(MLPipelineException):
    """MLPipelineException subtype that adds no behavior of its own.

    It exists so this category of error can be raised and caught by type.
    Judging by the name it is meant for model-evaluation failures; the
    raising code is not part of this file.
    """
class ModelPusherException(MLPipelineException):
    """MLPipelineException subtype that adds no behavior of its own.

    It exists so this category of error can be raised and caught by type.
    Judging by the name it is meant for failures in a model-pusher step;
    the raising code is not part of this file.
    """
class ConfigurationException(MLPipelineException):
    """MLPipelineException subtype that adds no behavior of its own.

    It exists so this category of error can be raised and caught by type.
    Judging by the name it is meant for configuration problems; the
    raising code is not part of this file.
    """