Spaces:
Sleeping
Sleeping
| import sys | |
| from sklearn.model_selection import train_test_split | |
| from src.core.constants import DATABASE_NAME, TRAINING_COLLECTION_NAME | |
| from src.core.mongo_client import MongoDBClient | |
| from src.core.logger import logging | |
| from src.core.exception import AppException | |
| from src.core.configuration import AppConfiguration | |
| import gc | |
| class DataIngestion: | |
| def __init__(self, app_config = AppConfiguration()): | |
| """ | |
| DataIngestion Intialization | |
| data_ingestion_config: DataIngestionConfig | |
| """ | |
| try: | |
| self.data_ingestion_config = app_config.data_ingestion_config() | |
| except Exception as e: | |
| logging.error(f"Data Ingestion error: {e}", exc_info=True) | |
| raise AppException(e, sys) | |
| def download_data(self): | |
| """ | |
| Downloads data from MongoDB and saves it in csv format, and also | |
| splits the data into training and testing sets and saves them in parquet format. | |
| Raises: | |
| AppException: If there is an error downloading data from MongoDB | |
| """ | |
| try: | |
| save_ingested_data_path = self.data_ingestion_config.ingested_data_path | |
| save_train_data_path = self.data_ingestion_config.train_data_path | |
| save_test_data_path = self.data_ingestion_config.test_data_path | |
| logging.info("Connecting to MongoDB") | |
| client = MongoDBClient() | |
| data = client.get_all_docs(database_name=DATABASE_NAME, collection_name=TRAINING_COLLECTION_NAME) | |
| data.to_csv(save_ingested_data_path, index=False) | |
| logging.info(f"Raw ingested data saved successfully saved at {save_ingested_data_path}") | |
| # close mongo connection | |
| client.close_connection() | |
| logging.info("Splitting data into training and testing sets") | |
| train_data, test_data = train_test_split(data, test_size=0.2, random_state=42) | |
| train_data.to_parquet(save_train_data_path, index=False) | |
| test_data.to_parquet(save_test_data_path, index=False) | |
| logging.info(f"Successfully saved train data and test data") | |
| # free memory | |
| del data, train_data, test_data | |
| gc.collect() | |
| except Exception as e: | |
| logging.error(f"Failed to download data from MongoDB: {e}", exc_info=True) | |
| raise AppException(e, sys) | |
| def initiate_data_ingestion(): | |
| """ | |
| Main function to initiate the Data Ingestion workflow. It downloads data from MongoDB and | |
| saves it to the specified directory using the configuration provided in the | |
| data_ingestion_config. | |
| Raises: | |
| AppException: If an error occurs during data ingestion. | |
| """ | |
| obj = DataIngestion() | |
| try: | |
| logging.info(f"{'='*20}Data Ingestion{'='*20}") | |
| obj.download_data() | |
| logging.info(f"{'='*20}Data Ingestion Completed Successfully{'='*20} \n\n") | |
| except Exception as e: | |
| logging.error(f"Error in Data Ingestion process: {e}", exc_info=True) | |
| raise AppException(e, sys) | |
| if __name__ == "__main__": | |
| initiate_data_ingestion() |