Sentence-Translator / src /components /data_ingestion.py
VashuTheGreat2's picture
Upload folder using huggingface_hub
b758d48 verified
Raw
History Blame Contribute Delete
4.3 kB
from src.exception import MyException
import sys
import logging
from src.entity.config_entity import DataIngestionConfig
from src.entity.artifact_entity import DataIngestionArtifact
from src.data_access.data_fetcher import SentenceDataFetcher
from abc import ABC, abstractmethod
from sklearn.model_selection import train_test_split
import os
from pandas import DataFrame
import pandas as pd
class Data_Ingestion(ABC):
def __init__(self):
super().__init__()
@abstractmethod
async def initiate_data_ingestion(self) -> DataIngestionArtifact:
pass
class Sentence_Data_Ingestion(Data_Ingestion):
def __init__(self, data_ingestion_config: DataIngestionConfig):
super().__init__()
self.data_ingestion_config = data_ingestion_config
async def split_data_as_train_test(self, dataframe: DataFrame) -> None:
logging.info("Entered split_data_as_train_test method of Data_Ingestion class")
try:
train_test_ratio = self.data_ingestion_config.train_test_split_ratio
train, test = train_test_split(dataframe, random_state=42, shuffle=True, test_size=train_test_ratio)
logging.info("Performed train test split on the dataframe")
logging.info(f"Train shape: {train.shape}, Test shape: {test.shape}")
dir_path = os.path.dirname(self.data_ingestion_config.training_file_path)
os.makedirs(dir_path, exist_ok=True)
logging.info("Created directories for train and test files")
logging.info("Exporting train and test file path.")
train.to_csv(self.data_ingestion_config.training_file_path, index=False, header=True)
test.to_csv(self.data_ingestion_config.testing_file_path, index=False, header=True)
logging.info("Exported train and test file path.")
logging.info("Exited split_data_as_train_test method")
except Exception as e:
logging.error(f"Error in split_data_as_train_test: {str(e)}")
raise MyException(e, sys)
async def export_data_into_feature_store(self) -> DataFrame:
try:
logging.info("Entered export_data_into_feature_store method")
data_fetcher = SentenceDataFetcher(url=self.data_ingestion_config.data_base_url)
logging.info("DataFetcher initialized successfully")
data = await data_fetcher.export_data_as_df(split='train')
logging.info("Data fetched successfully from parquet")
dir_path = os.path.dirname(self.data_ingestion_config.feature_store_file_path)
os.makedirs(dir_path, exist_ok=True)
logging.info("Created directory for feature store")
data.to_csv(self.data_ingestion_config.feature_store_file_path, index=False, header=True)
logging.info(f"Data exported to feature store. Shape: {data.shape}")
logging.info("Exited export_data_into_feature_store method")
return data
except Exception as e:
logging.error(f"Error in export_data_into_feature_store: {str(e)}")
raise MyException(e, sys)
async def initiate_data_ingestion(self) -> DataIngestionArtifact:
try:
logging.info("Entered initiate_data_ingestion method")
data = await self.export_data_into_feature_store()
logging.info("Feature store export completed")
await self.split_data_as_train_test(dataframe=data)
logging.info("Train-test split completed")
data_ingestion_artifact = DataIngestionArtifact(
train_file_path=self.data_ingestion_config.training_file_path,
test_file_path=self.data_ingestion_config.testing_file_path,
features_file_path=self.data_ingestion_config.feature_store_file_path
)
logging.info("DataIngestionArtifact created successfully")
logging.info("Data ingestion pipeline completed successfully")
logging.info("Exited initiate_data_ingestion method")
return data_ingestion_artifact
except Exception as e:
logging.error(f"Error in initiate_data_ingestion: {str(e)}")
raise MyException(e, sys)