nivakaran committed on
Commit
7080f90
·
verified ·
1 Parent(s): dbc10a7

Create data_ingestion.py

Browse files
Files changed (1) hide show
  1. src/components/data_ingestion.py +103 -0
src/components/data_ingestion.py ADDED
@@ -0,0 +1,103 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ from src.exception.exception import DeliveryTimeException
3
+ from src.logging.logger import logging
4
+
5
+ from src.entity.config_entity import DataIngestionConfig
6
+ from src.entity.artifact_entity import DataIngestionArtifact
7
+ import os
8
+ import sys
9
+ import numpy as np
10
+ import pandas as pd
11
+ import pymongo
12
+ from typing import List
13
+ from sklearn.model_selection import train_test_split
14
+ from dotenv import load_dotenv
15
+
16
+ load_dotenv()
17
+
18
+ MONGO_DB_URL = os.getenv("MONGO_DB_URL")
19
+
20
class DataIngestion:
    """Pull a MongoDB collection into a DataFrame and persist it as CSV files.

    The pipeline reads the configured collection, writes the full data set to
    the feature-store path, then writes a train/test split to the configured
    training and testing paths.
    """

    def __init__(self, data_ingestion_config: DataIngestionConfig):
        """Store the configuration used by every ingestion step.

        Args:
            data_ingestion_config: Object providing ``database_name``,
                ``collection_name``, ``feature_store_file_path``,
                ``training_file_path``, ``testing_file_path`` and
                ``train_test_split_ratio``.
        """
        self.data_ingestion_config = data_ingestion_config

    @staticmethod
    def _ensure_parent_dir(file_path: str) -> None:
        """Create the directory that will contain ``file_path`` if missing."""
        dir_path = os.path.dirname(file_path)
        # dirname is "" for a bare file name, and os.makedirs("") raises.
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)

    def export_collection_as_dataframe(self) -> pd.DataFrame:
        """Read the configured MongoDB collection into a DataFrame.

        The ``_id`` column is dropped when present and every cell equal to
        the string ``"na"`` is replaced with NaN.

        Returns:
            The collection contents as a DataFrame.

        Raises:
            DeliveryTimeException: Wraps any error raised while reading.
        """
        try:
            database_name = self.data_ingestion_config.database_name
            collection_name = self.data_ingestion_config.collection_name
            # The client stays on the instance, as in the original code.
            self.mongo_client = pymongo.MongoClient(MONGO_DB_URL)
            collection = self.mongo_client[database_name][collection_name]

            df = pd.DataFrame(list(collection.find()))
            if "_id" in df.columns:
                df = df.drop(columns=["_id"])

            df.replace({"na": np.nan}, inplace=True)
            return df

        except Exception as e:
            # Argument order matches every other call site: (error, sys).
            raise DeliveryTimeException(e, sys) from e

    def export_data_into_feature_store(
        self, dataframe: pd.DataFrame
    ) -> pd.DataFrame:
        """Write ``dataframe`` to the feature-store CSV path.

        Args:
            dataframe: Data to persist.

        Returns:
            The same ``dataframe`` object that was passed in.

        Raises:
            DeliveryTimeException: Wraps any error raised while writing.
        """
        try:
            feature_store_file_path = (
                self.data_ingestion_config.feature_store_file_path
            )
            self._ensure_parent_dir(feature_store_file_path)
            dataframe.to_csv(feature_store_file_path, index=False, header=True)
            return dataframe

        except Exception as e:
            raise DeliveryTimeException(e, sys) from e

    def split_data_as_train_test(self, dataframe: pd.DataFrame) -> None:
        """Split ``dataframe`` and write the train and test CSV files.

        The test fraction comes from ``train_test_split_ratio`` on the
        configuration object.

        Args:
            dataframe: Data to split.

        Raises:
            DeliveryTimeException: Wraps any error raised while splitting or
                writing.
        """
        try:
            train_set, test_set = train_test_split(
                dataframe,
                test_size=self.data_ingestion_config.train_test_split_ratio,
            )
            logging.info("Performed train test split on the dataframe")

            training_file_path = self.data_ingestion_config.training_file_path
            testing_file_path = self.data_ingestion_config.testing_file_path

            # The two files may live in different directories; create both.
            self._ensure_parent_dir(training_file_path)
            self._ensure_parent_dir(testing_file_path)

            logging.info("Exporting train and test file path")

            train_set.to_csv(training_file_path, index=False, header=True)
            test_set.to_csv(testing_file_path, index=False, header=True)

            logging.info("Exported train and test file path")

            # Logged last so the message reflects the real end of the method.
            logging.info(
                "Exited split_data_as_train_test method of Data_Ingestion class"
            )

        except Exception as e:
            raise DeliveryTimeException(e, sys) from e

    def initiate_date_ingestion(self):
        """Run the full ingestion: read, store, split, and report paths.

        Returns:
            A ``DataIngestionArtifact`` built from the configured training
            and testing file paths.

        Raises:
            DeliveryTimeException: Wraps any error raised by a step.
        """
        try:
            dataframe = self.export_collection_as_dataframe()
            print(dataframe.head(5))
            dataframe = self.export_data_into_feature_store(dataframe)
            self.split_data_as_train_test(dataframe)
            dataIngestionArtifact = DataIngestionArtifact(
                trained_file_path=self.data_ingestion_config.training_file_path,
                test_file_path=self.data_ingestion_config.testing_file_path,
            )

            return dataIngestionArtifact

        except Exception as e:
            raise DeliveryTimeException(e, sys) from e

    # Correctly spelled alias; the misspelled name stays for existing callers.
    initiate_data_ingestion = initiate_date_ingestion