Alquilar78 commited on
Commit
b7ff5e0
·
1 Parent(s): 9b10f02

Cleaning dossier

Browse files
airflow/dags/etl_main.py CHANGED
@@ -44,8 +44,8 @@ def merge_weather_traffic_data(**context):
44
  filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_weather_traffic_data.json"
45
 
46
 
47
- local_weather_path = f"/tmp/{filename_weather}"
48
- result_local_path = f"/tmp/{filename}"
49
 
50
  with open(local_weather_path, "r") as f:
51
  weather_data = json.load(f)
 
44
  filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_weather_traffic_data.json"
45
 
46
 
47
+ local_weather_path = f"/opt/airflow/data/{filename_weather}"
48
+ result_local_path = f"/opt/airflow/data/{filename}"
49
 
50
  with open(local_weather_path, "r") as f:
51
  weather_data = json.load(f)
airflow/dags/quality_air_etl.py DELETED
@@ -1,187 +0,0 @@
1
- """
2
- To use this DAG, you need to set some variables within the Airflow UI:
3
-
4
- - `WeatherBitApiKey`: the API key to use to access the WeatherBit API.
5
- - `S3BucketName`: the name of the S3 bucket where the data will be stored.
6
-
7
- Also set the connection for the Postgres database and the AWS account.
8
- """
9
- import json
10
- import logging
11
- from datetime import datetime
12
- from zoneinfo import ZoneInfo
13
-
14
- import pandas as pd
15
- import requests
16
- from airflow import DAG
17
- from airflow.hooks.S3_hook import S3Hook
18
- from airflow.models import Variable
19
- from airflow.operators.dummy_operator import DummyOperator
20
- from airflow.operators.python_operator import PythonOperator
21
- from airflow.providers.postgres.operators.postgres import PostgresOperator
22
- from s3_to_postgres import S3ToPostgresOperator
23
- from airflow.utils.task_group import TaskGroup
24
-
25
-
26
- default_args = {
27
- "owner": "airflow",
28
- "start_date": datetime(2022, 6, 1),
29
- }
30
-
31
- # Configure le logger au niveau du module
32
- logging.basicConfig(level=logging.INFO)
33
- logger = logging.getLogger(__name__)
34
- paris_time = datetime.now(ZoneInfo("Europe/Paris"))
35
-
36
- def _fetch_weather_data(**context):
37
- """Fetches data from OpenWeatherMap API and save it to S3.
38
- """
39
- logging.info(f"Fetching Weather data")
40
- # Get the API key from the Variables
41
- api_key = Variable.get("OpenWeatherApiKey")
42
- # Fetch OpenWeatherMap
43
- full_url = f"https://api.openweathermap.org/data/2.5/weather?q=Paris&appid={api_key}&units=metric"
44
- response = requests.get(full_url)
45
- # We create a filename like: 20220601-123000_weather_data.json
46
- filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_openweather_data.json"
47
- # Let's temprorary save this file into /tmp folder
48
- full_path_to_file = f"/tmp/{filename}"
49
- s3_path = 'datasets/input/meteo/'
50
-
51
- with open(full_path_to_file, "w") as f:
52
- json.dump(response.json(), f)
53
- # Connect to our S3 bucket and load the file
54
- # filename is the path to our file and key is the full path inside the
55
- # bucket
56
- s3_hook = S3Hook(aws_conn_id="aws_default")
57
- s3_hook.load_file(filename=full_path_to_file, key=s3_path+filename, bucket_name=Variable.get("S3BucketName"))
58
- # Let's push the filename to the context so that we can use it later
59
- context["task_instance"].xcom_push(key="weather_filename", value=filename)
60
- logging.info(f"Saved weather data to {filename}")
61
-
62
- def _fetch_trafic_data():
63
- """Récupère les données de trafic de Rennes Métropole"""
64
- url = "https://data.rennesmetropole.fr/api/explore/v2.1/catalog/datasets/etat-du-trafic-en-temps-reel/records"
65
- params = {
66
- "select": "datetime,denomination,averagevehiclespeed,traveltime,trafficstatus",
67
- "where": "averagevehiclespeed > 0 and trafficstatus != 'unknown'",
68
- "order_by": "datetime desc",
69
- "limit": 100,
70
- "timezone": "Europe/Paris"
71
- }
72
- try:
73
- response = requests.get(url, params=params)
74
- response.raise_for_status()
75
- logger.info("✅ Données récupérées avec succès depuis l'API Rennes Métropole.")
76
- return response.json()["results"]
77
- except Exception as e:
78
- logger.error(f"❌ Erreur lors de la récupération des données : {e}")
79
- raise
80
-
81
- def _process_traffic_data(data):
82
- """Nettoie les données sans les agréger"""
83
- df = pd.DataFrame(data)
84
- df["datetime"] = pd.to_datetime(df["datetime"])
85
- df["averagevehiclespeed"] = pd.to_numeric(df["averagevehiclespeed"], errors="coerce")
86
- df["traveltime"] = pd.to_numeric(df["traveltime"], errors="coerce")
87
-
88
- latest_datetime = df["datetime"].max()
89
- df_latest = df[df["datetime"] == latest_datetime]
90
-
91
- agg_df = (
92
- df_latest.groupby(["denomination", "datetime"], as_index=False)
93
- .agg({
94
- "averagevehiclespeed": "mean",
95
- "traveltime": "mean",
96
- "trafficstatus": "first"
97
- })
98
- .sort_values(by="trafficstatus", ascending=False)
99
- .reset_index(drop=True) # <-- reset index ici
100
- )
101
-
102
- # Remplacer les valeurs textuelles de trafficstatus par des valeurs numériques
103
- agg_df["trafficstatus_numeric"] = agg_df["trafficstatus"].map({"heavy": 1, "freeFlow":0, "congested":2})
104
-
105
- # Calculer la moyenne (si tu veux l'afficher ou l'utiliser plus tard)
106
- mean_trafficstatus = agg_df["trafficstatus_numeric"].mean()
107
-
108
- logger.info(f"📊 Moyenne du trafficstatus (freeFlow=0,heavy=1,congested=2): {mean_trafficstatus:.2f}")
109
-
110
- return mean_trafficstatus
111
-
112
-
113
- def _get_traffic_data(**context):
114
- data = _fetch_trafic_data()
115
- traffic_value = _process_traffic_data(data)
116
- context["task_instance"].xcom_push(key="traffic_value", value=traffic_value)
117
- logging.info(f"Saved traffic value : {traffic_value}")
118
-
119
- def _transform_weather_traffic_data(**context):
120
- """Transforms raw data from JSON file to ingestable data for Postgres.
121
- """
122
- # We create a filename like: 20220601-123000_weather_traffic_data.json
123
- filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_weather_traffic_data.json"
124
-
125
- # We get the filename from the context
126
- filename_weather = context["task_instance"].xcom_pull(key="weather_filename")
127
- # Connect to our S3 bucket and download the JSON file
128
- s3_hook = S3Hook(aws_conn_id="aws_default")
129
- s3_path = 'datasets/input/meteo/'
130
- returned_filename_weather = s3_hook.download_file(s3_path+filename_weather, bucket_name=Variable.get("S3BucketName"), local_path="/tmp")
131
- with open(returned_filename_weather, "r") as f_weather:
132
- raw_data_json_weather = json.load(f_weather)
133
-
134
- #Weather CSV like : Paris,Clear,clear sky,2.3488,48.8534,25.51,1023,43,3.6,360
135
- #Traffic value : Between 0 and 2
136
-
137
- transformed_data = {
138
- "main_temp" : raw_data_json_weather["main"]["temp"],
139
- "main_pressure" : raw_data_json_weather["main"]["pressure"],
140
- "main_humidity" : raw_data_json_weather["main"]["humidity"],
141
- "wind_speed" : raw_data_json_weather["wind"]["speed"],
142
- "traffic" : context["task_instance"].xcom_pull(key="traffic_value")
143
- }
144
- df = pd.DataFrame(transformed_data, index=[0])
145
- # Keep the same filename between the JSON file and the CSV
146
- csv_filename = filename.split(".")[0] + ".csv"
147
- csv_filename_full_path = f"/tmp/{csv_filename}"
148
- s3_csv_key = 'datasets/input/'+ csv_filename
149
- # Save it temporarily in /tmp folder
150
- df.to_csv(csv_filename_full_path, index=False, header=False)
151
-
152
- # Load it to S3
153
- s3_hook.load_file(filename=csv_filename_full_path, key=s3_csv_key, bucket_name=Variable.get("S3BucketName"))
154
- # Push the filename to the context so that we can use it later
155
- context["task_instance"].xcom_push(key="input_data_csv_filename", value=s3_csv_key)
156
-
157
-
158
-
159
- with DAG(dag_id="quality_air_etl_dag", default_args=default_args, schedule_interval="@hourly", catchup=False) as dag:
160
- start = DummyOperator(task_id="start")
161
-
162
- with TaskGroup(group_id="weather_branch") as weather_branch:
163
-
164
- fetch_weather_data = PythonOperator(task_id="fetch_weather_data", python_callable=_fetch_weather_data)
165
-
166
- fetch_weather_data
167
-
168
- with TaskGroup(group_id="traffic_branch") as traffic_branch:
169
- fetch_traffic_data = PythonOperator(task_id="fetch_traffic_data", python_callable=_get_traffic_data)
170
-
171
- fetch_traffic_data
172
-
173
- with TaskGroup(group_id="ml_branch") as ml_branch:
174
- get_input_meteo_traffic_csv = DummyOperator(task_id="get_input_meteo_traffic_csv")
175
- pull_run_model = DummyOperator(task_id="pull_run_model")
176
-
177
- get_input_meteo_traffic_csv >> pull_run_model
178
-
179
- transform_weather_traffic_data = PythonOperator(
180
- task_id="transform_weather_traffic_data", python_callable=_transform_weather_traffic_data
181
- )
182
-
183
-
184
-
185
- end = DummyOperator(task_id="end")
186
-
187
- start >> [weather_branch, traffic_branch] >> transform_weather_traffic_data >> ml_branch >> end
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
airflow/dags/tasks/fetch_weather_data.py CHANGED
@@ -34,7 +34,12 @@ def fetch_weather_data(**context):
34
  response.raise_for_status()
35
 
36
  filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_openweather_data.json"
37
- local_path = f"/tmp/{filename}"
 
 
 
 
 
38
  s3_key = f"datasets/input/meteo/{filename}"
39
 
40
  with open(local_path, "w") as f:
@@ -56,7 +61,7 @@ def fetch_weather_data(**context):
56
  def main():
57
  print("▶️ Test local : récupération météo")
58
  filename = fetch_weather_data()
59
- print(f"✅ Fichier météo généré : /tmp/{filename}")
60
 
61
 
62
  if __name__ == "__main__":
 
34
  response.raise_for_status()
35
 
36
  filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_openweather_data.json"
37
+
38
+ if context:
39
+ local_path = f"/opt/airflow/data/{filename}"
40
+ else:
41
+ local_path = f"../data/{filename}"
42
+
43
  s3_key = f"datasets/input/meteo/{filename}"
44
 
45
  with open(local_path, "w") as f:
 
61
  def main():
62
  print("▶️ Test local : récupération météo")
63
  filename = fetch_weather_data()
64
+ print(f"✅ Fichier météo généré : {filename}")
65
 
66
 
67
  if __name__ == "__main__":
airflow/dags/test.py DELETED
@@ -1,6 +0,0 @@
1
- from airflow import DAG
2
- from datetime import datetime
3
- import pandas as pd
4
-
5
- with DAG("crypto_dag", start_date=datetime(2022, 1, 1), schedule_interval="@hourly", catchup=False) as dag:
6
- pass