03chrisk committed on
Commit
6a9e30e
·
1 Parent(s): 9fcc62f

creating a pipeline

Browse files
air-quality-forecast/data_pipeline.py CHANGED
@@ -1,6 +1,8 @@
1
  import pandas as pd
2
  import os
3
 
 
 
4
 
5
  class PreprocessingPipeline:
6
  def __init__(self):
@@ -10,51 +12,123 @@ class PreprocessingPipeline:
10
  :param raw_data_path: Path to the raw data directory
11
  :param processed_data_path: Path to the processed data directory
12
  """
13
-
 
14
  project_root = os.path.dirname(os.path.dirname(__file__))
15
- raw_data_path = os.path.join(project_root, 'data', 'raw')
 
 
 
 
 
16
 
17
- print(raw_data_path)
18
- for file in os.listdir(raw_data_path):
19
- print(file)
20
 
 
 
 
21
  def load_raw_data(self):
22
  """
23
  Load the raw data from the specified path.
24
 
25
  :return: Raw data as a Pandas DataFrame
26
  """
27
- # TO DO: Implement loading of raw data
28
- pass
 
 
 
29
 
30
- def merge_raw_data(self, raw_data):
 
 
31
  """
32
  Merge the raw data with additional data from the specified path.
33
 
34
  :param raw_data: Raw data as a Pandas DataFrame
35
  :return: Merged data as a Pandas DataFrame
36
  """
37
- # TO DO: Implement merging of raw data
38
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
 
40
- def preprocess_data(self, raw_data):
 
 
 
41
  """
42
  Preprocess the raw data according to the steps outlined in the notebooks.
43
 
44
  :param raw_data: Raw data as a Pandas DataFrame
45
  :return: Preprocessed data as a Pandas DataFrame
46
  """
47
- # TO DO: Implement preprocessing steps
48
- pass
 
 
 
 
 
 
49
 
50
- def save_to_csv(self, name, data):
51
  """
52
  Save the preprocessed data to the specified path.
53
 
54
  :param preprocessed_data: Preprocessed data as a Pandas DataFrame
55
  """
56
- # TO DO: Implement saving of preprocessed data
57
- pass
58
 
59
  def run_pipeline(self):
60
  """
 
1
  import pandas as pd
2
  import os
3
 
4
+ from utils import FeatureSelector
5
+
6
 
7
  class PreprocessingPipeline:
8
  def __init__(self):
 
12
  :param raw_data_path: Path to the raw data directory
13
  :param processed_data_path: Path to the processed data directory
14
  """
15
+
16
+ # Global project root path
17
  project_root = os.path.dirname(os.path.dirname(__file__))
18
+
19
+ # Path to the raw data directory
20
+ self.raw_data_path = os.path.join(project_root, 'data', 'raw')
21
+
22
+ # Path to the processed data directory
23
+ self.processed_data_path = os.path.join(project_root, 'data', 'processed')
24
 
25
+ # Initializing the raw datasets
26
+ self.raw_griftpark_data, self.raw_utrecht_data = self.load_raw_data()
 
27
 
28
+ # Initializing the merged dataset
29
+ self.merged_data = self.merge_raw_data()
30
+
31
  def load_raw_data(self):
32
  """
33
  Load the raw data from the specified path.
34
 
35
  :return: Raw data as a Pandas DataFrame
36
  """
37
+ # Load the first data file
38
+ raw_griftpark_data = pd.read_csv(os.path.join(self.raw_data_path, 'v1_raw_griftpark,-utrecht-air-quality.csv'))
39
+
40
+ # Load the second data file
41
+ raw_utrecht_data = pd.read_csv(os.path.join(self.raw_data_path, 'v1_utrecht 2014-01-29 to 2024-09-11.csv'))
42
 
43
+ return raw_griftpark_data, raw_utrecht_data
44
+
45
def merge_raw_data(self):
    """
    Merge the Griftpark data with the Utrecht data on their date columns.

    Reads ``self.raw_griftpark_data`` and ``self.raw_utrecht_data``; the
    Utrecht ``datetime`` column (parsed as ``YYYY-MM-DD``) is reformatted to
    ``dd/mm/yyyy`` and joined against the Griftpark ``date`` column (assumed
    to already be ``dd/mm/yyyy`` strings -- confirm against the raw file).
    The merged frame is also written to ``self.processed_data_path`` as
    ``v1_merged_weather_data.csv``.

    :return: Merged data as a Pandas DataFrame
    """
    griftpark_data = self.raw_griftpark_data

    # Work on a copy: rewriting 'datetime' on the stored frame would make a
    # second call fail, because the values would no longer match '%Y-%m-%d'.
    raw_additional_data = self.raw_utrecht_data.copy()

    # Convert the 'datetime' column to datetime and format it as 'dd/mm/yyyy'
    raw_additional_data['datetime'] = pd.to_datetime(
        raw_additional_data['datetime'], format='%Y-%m-%d'
    ).dt.strftime('%d/%m/%Y')

    # Merge the additional data with the raw data (inner join, pandas default)
    merged_df = pd.merge(griftpark_data, raw_additional_data, left_on='date', right_on='datetime')

    # Save the merged data
    self.save_to_csv('v1_merged_weather_data.csv', merged_df, self.processed_data_path)

    return merged_df
65
+
66
def select_features(self, data):
    """
    Select the relevant features from the raw data.

    The caller's frame is left untouched; every step works on a new frame.

    :param data: Raw (merged) data as a Pandas DataFrame
    :return: Data with selected features as a Pandas DataFrame
    :raises KeyError: if an expected column is missing from ``data``
    """
    # Remove textual/uninformative features (non in-place, so the method can
    # be called again on the same input)
    data = data.drop(columns=FeatureSelector.uninformative_columns())

    # Rename wrongly named columns
    data = FeatureSelector.rename_initial_columns(data)

    # Convert columns to numeric
    data = FeatureSelector.change_to_numeric(data)

    # Columns chosen by their correlation with O3/NO2
    selected_columns = list(FeatureSelector.select_cols_by_correlation(data))

    # Add domain knowledge columns, skipping any the correlation step already
    # picked so the result never contains duplicate column labels
    domain_knowledge_columns = ['precip', 'windspeed', 'winddir']
    selected_columns += [col for col in domain_knowledge_columns if col not in selected_columns]

    return data[selected_columns]
91
+
92
def apply_time_shift(self, data, t=3):
    """
    Add time-shifted copies of the columns and return the widened frame.

    For every ``k`` in ``1..t`` each existing column ``col`` gets a companion
    ``'{col} - day {k}'`` holding ``col`` shifted by ``-k`` rows. For every
    ``k`` in ``1..t-1`` the columns ``o3`` and ``no2`` get a companion
    ``'{col} + day {k}'`` holding the column shifted by ``+k`` rows. Whether
    a negative row shift means "earlier in time" depends on the row order of
    ``data`` (presumably newest first -- confirm against the raw file).

    The input frame is not modified.

    :param data: DataFrame that contains at least the columns ``o3`` and ``no2``
    :param t: Number of row shifts to generate
    :return: New DataFrame with the original and the shifted columns, all
        converted with ``pd.to_numeric``
    """
    pieces = [data]

    # Shifts applied to every column
    for lag in range(1, t + 1):
        pieces.append(data.shift(-lag).add_suffix(f' - day {lag}'))

    # Shifts in the opposite direction, for the two pollutant columns only
    for lead in range(1, t):
        pieces.append(data[['o3', 'no2']].shift(lead).add_suffix(f' + day {lead}'))

    # A single concat keeps the column order of the original loop-based
    # version while avoiding repeated column insertion
    shifted = pd.concat(pieces, axis=1)
    return shifted.apply(pd.to_numeric)
108
+
109
def preprocess_data(self, data):
    """
    Run feature selection and time shifting, then trim columns and rows.

    After ``select_features`` and ``apply_time_shift`` the un-shifted
    predictor columns are dropped, followed by five rows identified by
    index label. The row drop assumes the frame is indexed by
    ``dd/mm/yyyy`` date strings -- NOTE(review): confirm the caller sets
    that index, otherwise the drop raises ``KeyError``.

    :param data: Merged raw data as a Pandas DataFrame
    :return: Preprocessed data as a Pandas DataFrame
    :raises KeyError: if any of the listed columns or index labels is missing
    """
    data = self.select_features(data)
    data = self.apply_time_shift(data)

    # Drop into new frames instead of in place, so frames handed back by the
    # helper methods are never modified behind their back
    data = data.drop(
        columns=['pm25', 'pm10', 'temp', 'humidity', 'visibility',
                 'solarradiation', 'precip', 'windspeed', 'winddir']
    )
    # First three and last two dates of the data set; presumably the rows
    # left incomplete by the time shift -- verify if the date range changes
    data = data.drop(index=['29/01/2014', '30/01/2014', '31/01/2014', '10/09/2024', '11/09/2024'])

    return data
122
+
123
+
124
 
125
def save_to_csv(self, name, data, path):
    """
    Write a DataFrame to ``<path>/<name>`` as CSV.

    The target directory is created when it does not exist yet. The frame's
    index is written as well (pandas default).

    :param name: File name of the CSV, including the extension
    :param data: Data to store as a Pandas DataFrame
    :param path: Directory in which the file is created
    """
    if path:
        # A fresh checkout may not contain the processed-data directory
        os.makedirs(path, exist_ok=True)
    data.to_csv(os.path.join(path, name))
 
132
 
133
  def run_pipeline(self):
134
  """
air-quality-forecast/utils.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ File with utilities
3
+ """
4
+
5
class FeatureSelector:
    """Stateless helpers for feature selection; every method is static."""

    @staticmethod
    def uninformative_columns() -> list:
        """Return the columns that provide no information the model can use."""
        return ["Unnamed: 0", 'name', 'datetime', 'sunrise', 'sunset', 'preciptype', 'conditions', 'description', 'icon', 'stations']

    @staticmethod
    def rename_initial_columns(data):
        """Return a copy of ``data`` whose pollutant columns have the leading space removed."""
        data = data.rename(columns={" pm25": "pm25", " pm10": "pm10", " o3": "o3", " no2": "no2", " so2": "so2"})
        return data

    @staticmethod
    def change_to_numeric(data):
        """
        Return a copy of ``data`` with every column except ``date`` made numeric.

        Entries that cannot be parsed become ``NaN`` (``errors='coerce'``).
        """
        # Imported here because this module has no top-level pandas import
        import pandas as pd

        converted = data.copy()
        value_columns = [col for col in converted.columns if col != 'date']
        if value_columns:
            # Assigning by column list replaces the columns, so the numeric
            # dtype is kept (a ``.loc[:, mask] = ...`` write may retain the
            # old object dtype)
            converted[value_columns] = converted[value_columns].apply(pd.to_numeric, errors='coerce')
        return converted

    @staticmethod
    def select_cols_by_correlation(data) -> list:
        """
        Select columns based on correlation criteria.

        A column is kept when its absolute correlation with ``no2`` or with
        ``o3`` exceeds 0.3 and it is not on the manual removal list.

        :raises KeyError: if ``data`` has no ``no2`` or ``o3`` column
        """
        # Step 1: Calculate correlations between features and O3/NO2
        # (the matrix is computed once and reused for both labels)
        correlations = data.loc[:, data.columns != 'date'].corr().abs()
        corr_no2 = correlations['no2']
        corr_o3 = correlations['o3']
        # Step 2: Remove the columns not correlated with any of the labels
        columns_above_threshold = (corr_no2 > 0.3) | (corr_o3 > 0.3)
        selected_columns = columns_above_threshold[columns_above_threshold].index
        # Step 3: Remove the columns with high correlations with each other
        # (chosen by manual inspection of the correlation matrix)
        to_remove = {'feelslikemax', 'feelslikemin', 'feelslike', 'tempmin', 'tempmax', 'dew', 'solarenergy', 'uvindex'}
        return [item for item in selected_columns if item not in to_remove]
29
+
30
+
data/processed/v1_merged_weather_data.csv CHANGED
The diff for this file is too large to render. See raw diff
 
data/processed/v2_merged_selected_features_with_missing.csv CHANGED
The diff for this file is too large to render. See raw diff
 
notebooks/n2_exploratory_data_analysis.ipynb CHANGED
The diff for this file is too large to render. See raw diff