p2ov committed on
Commit
21a79b8
·
1 Parent(s): 11d74b7

DAG sans modele et à séparer

Browse files
Files changed (1) hide show
  1. airflow/dags/quality_air_etl.py +107 -61
airflow/dags/quality_air_etl.py CHANGED
@@ -9,6 +9,7 @@ Also set the connection for the Postgres database and the AWS account.
9
  import json
10
  import logging
11
  from datetime import datetime
 
12
 
13
  import pandas as pd
14
  import requests
@@ -21,18 +22,24 @@ from airflow.providers.postgres.operators.postgres import PostgresOperator
21
  from s3_to_postgres import S3ToPostgresOperator
22
  from airflow.utils.task_group import TaskGroup
23
 
 
24
  default_args = {
25
  "owner": "airflow",
26
  "start_date": datetime(2022, 6, 1),
27
  }
28
 
 
 
 
 
 
29
  def _fetch_weather_data(**context):
30
- """Fetches data from WeatherBit API and save it to S3.
31
  """
32
- logging.info(f"Fetching weather data")
33
  # Get the API key from the Variables
34
  api_key = Variable.get("OpenWeatherApiKey")
35
- # Fetch WeatherBit
36
  full_url = f"https://api.openweathermap.org/data/2.5/weather?q=Paris&appid={api_key}&units=metric"
37
  response = requests.get(full_url)
38
  # We create a filename like: 20220601-123000_weather_data.json
@@ -52,90 +59,129 @@ def _fetch_weather_data(**context):
52
  context["task_instance"].xcom_push(key="weather_filename", value=filename)
53
  logging.info(f"Saved weather data to {filename}")
54
 
55
- def _transform_weather_data(**context):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  """Transforms raw data from JSON file to ingestable data for Postgres.
57
  """
 
 
 
58
  # We get the filename from the context
59
- filename = context["task_instance"].xcom_pull(key="weather_filename")
60
  # Connect to our S3 bucket and download the JSON file
61
  s3_hook = S3Hook(aws_conn_id="aws_default")
62
  s3_path = 'datasets/input/meteo/'
63
- returned_filename = s3_hook.download_file(s3_path+filename, bucket_name=Variable.get("S3BucketName"), local_path="/tmp")
64
- with open(returned_filename, "r") as f:
65
- raw_data_json = json.load(f)
66
- # We transform the data into a pandas DataFrame
67
- #raw_data_json = raw_data_json[0]
 
 
68
  transformed_data = {
69
- "name" : raw_data_json["name"],
70
- "coord_lon": raw_data_json["coord"]["lon"],
71
- "coord_lat": raw_data_json["coord"]["lat"],
72
- "weather_main" : raw_data_json["weather"][0]["main"],
73
- "weather_description" : raw_data_json["weather"][0]["description"],
74
- "main_temp" : raw_data_json["main"]["temp"],
75
- "main_pressure" : raw_data_json["main"]["pressure"],
76
- "main_humidity" : raw_data_json["main"]["humidity"],
77
- "wind_speed" : raw_data_json["wind"]["speed"],
78
- "wind_deg" : raw_data_json["wind"]["deg"]
79
  }
80
  df = pd.DataFrame(transformed_data, index=[0])
81
  # Keep the same filename between the JSON file and the CSV
82
  csv_filename = filename.split(".")[0] + ".csv"
83
  csv_filename_full_path = f"/tmp/{csv_filename}"
84
- s3_csv_key = 'datasets/input/meteo/'+ csv_filename
85
  # Save it temporarily in /tmp folder
86
  df.to_csv(csv_filename_full_path, index=False, header=False)
 
87
  # Load it to S3
88
  s3_hook.load_file(filename=csv_filename_full_path, key=s3_csv_key, bucket_name=Variable.get("S3BucketName"))
89
  # Push the filename to the context so that we can use it later
90
- context["task_instance"].xcom_push(key="weather_csv_filename", value=s3_csv_key)
 
91
 
92
-
93
 
94
  with DAG(dag_id="quality_air_etl_dag", default_args=default_args, schedule_interval="@hourly", catchup=False) as dag:
95
  start = DummyOperator(task_id="start")
96
 
97
  with TaskGroup(group_id="weather_branch") as weather_branch:
 
98
  fetch_weather_data = PythonOperator(task_id="fetch_weather_data", python_callable=_fetch_weather_data)
 
 
 
 
 
99
 
100
- transform_weather_data = PythonOperator(
101
- task_id="transform_weather_data",
102
- python_callable=_transform_weather_data
103
- )
104
-
105
- create_weather_table = PostgresOperator(
106
- task_id="create_weather_table",
107
- # In the SQL do not forget to put `IF NOT EXISTS`
108
- sql="""
109
- CREATE TABLE IF NOT EXISTS weather_data (
110
- id SERIAL PRIMARY KEY,
111
- observation_time TIMESTAMP,
112
- name VARCHAR,
113
- weather_main VARCHAR,
114
- weather_description VARCHAR,
115
- coord_lon DECIMAL(5, 2),
116
- coord_lat DECIMAL(5, 2),
117
- main_temp DECIMAL(5, 2),
118
- main_humidity DECIMAL(5, 2),
119
- main_pressure DECIMAL(5, 2),
120
- wind_speed DECIMAL(5, 2),
121
- wind_deg DECIMAL(5, 2)
122
- )
123
- """,
124
- postgres_conn_id="postgres_default",
125
- )
126
-
127
- transfer_weather_data_to_postgres = S3ToPostgresOperator(
128
- task_id="transfer_weather_data_to_postgres",
129
- table="weather_data",
130
- bucket="{{ var.value.S3BucketName }}",
131
- key="{{ task_instance.xcom_pull(key='weather_csv_filename') }}",
132
- postgres_conn_id="postgres_default",
133
- aws_conn_id="aws_default",
134
- )
135
-
136
- fetch_weather_data >> transform_weather_data >> create_weather_table >> transfer_weather_data_to_postgres
137
 
 
 
 
 
 
 
 
 
 
 
 
138
 
139
  end = DummyOperator(task_id="end")
140
 
141
- start >> [weather_branch] >> end
 
9
  import json
10
  import logging
11
  from datetime import datetime
12
+ from zoneinfo import ZoneInfo
13
 
14
  import pandas as pd
15
  import requests
 
22
  from s3_to_postgres import S3ToPostgresOperator
23
  from airflow.utils.task_group import TaskGroup
24
 
25
+
26
  default_args = {
27
  "owner": "airflow",
28
  "start_date": datetime(2022, 6, 1),
29
  }
30
 
31
+ # Configure the module-level logger
32
+ logging.basicConfig(level=logging.INFO)
33
+ logger = logging.getLogger(__name__)
34
+ paris_time = datetime.now(ZoneInfo("Europe/Paris"))
35
+
36
  def _fetch_weather_data(**context):
37
+ """Fetches data from OpenWeatherMap API and save it to S3.
38
  """
39
+ logging.info(f"Fetching Weather data")
40
  # Get the API key from the Variables
41
  api_key = Variable.get("OpenWeatherApiKey")
42
+ # Fetch OpenWeatherMap
43
  full_url = f"https://api.openweathermap.org/data/2.5/weather?q=Paris&appid={api_key}&units=metric"
44
  response = requests.get(full_url)
45
  # We create a filename like: 20220601-123000_weather_data.json
 
59
  context["task_instance"].xcom_push(key="weather_filename", value=filename)
60
  logging.info(f"Saved weather data to {filename}")
61
 
62
def _fetch_trafic_data(timeout=30):
    """Fetch real-time traffic records from the Rennes Métropole open-data API.

    The query asks for at most 100 records, newest first, keeping only rows
    with a positive average vehicle speed and a traffic status other than
    'unknown'.

    Args:
        timeout: Value forwarded as ``timeout`` to ``requests.get`` so that an
            unresponsive API cannot block the task forever.

    Returns:
        The value stored under the ``"results"`` key of the JSON response.

    Raises:
        Exception: Any failure (network error, HTTP error status, malformed
            payload) is logged and then re-raised unchanged.
    """
    url = "https://data.rennesmetropole.fr/api/explore/v2.1/catalog/datasets/etat-du-trafic-en-temps-reel/records"
    params = {
        "select": "datetime,denomination,averagevehiclespeed,traveltime,trafficstatus",
        "where": "averagevehiclespeed > 0 and trafficstatus != 'unknown'",
        "order_by": "datetime desc",
        "limit": 100,
        "timezone": "Europe/Paris",
    }
    try:
        # Without an explicit timeout, requests waits indefinitely.
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        logger.info("✅ Données récupérées avec succès depuis l'API Rennes Métropole.")
        return response.json()["results"]
    except Exception as e:
        # Log for the task output, then let the caller see the failure.
        logger.error(f"❌ Erreur lors de la récupération des données : {e}")
        raise
80
+
81
+ def _process_traffic_data(data):
82
+ """Nettoie les données sans les agréger"""
83
+ df = pd.DataFrame(data)
84
+ df["datetime"] = pd.to_datetime(df["datetime"])
85
+ df["averagevehiclespeed"] = pd.to_numeric(df["averagevehiclespeed"], errors="coerce")
86
+ df["traveltime"] = pd.to_numeric(df["traveltime"], errors="coerce")
87
+
88
+ latest_datetime = df["datetime"].max()
89
+ df_latest = df[df["datetime"] == latest_datetime]
90
+
91
+ agg_df = (
92
+ df_latest.groupby(["denomination", "datetime"], as_index=False)
93
+ .agg({
94
+ "averagevehiclespeed": "mean",
95
+ "traveltime": "mean",
96
+ "trafficstatus": "first"
97
+ })
98
+ .sort_values(by="trafficstatus", ascending=False)
99
+ .reset_index(drop=True) # <-- reset index ici
100
+ )
101
+
102
+ # Remplacer les valeurs textuelles de trafficstatus par des valeurs numériques
103
+ agg_df["trafficstatus_numeric"] = agg_df["trafficstatus"].map({"heavy": 1, "freeFlow":0, "congested":2})
104
+
105
+ # Calculer la moyenne (si tu veux l'afficher ou l'utiliser plus tard)
106
+ mean_trafficstatus = agg_df["trafficstatus_numeric"].mean()
107
+
108
+ logger.info(f"📊 Moyenne du trafficstatus (freeFlow=0,heavy=1,congested=2): {mean_trafficstatus:.2f}")
109
+
110
+ return mean_trafficstatus
111
+
112
+
113
def _get_traffic_data(**context):
    """Fetch traffic records, score them, and publish the score through XCom.

    Args:
        **context: Task context; its ``"task_instance"`` entry is used to push
            the score under the XCom key ``"traffic_value"``.
    """
    records = _fetch_trafic_data()
    score = _process_traffic_data(records)

    ti = context["task_instance"]
    ti.xcom_push(key="traffic_value", value=score)
    logging.info(f"Saved traffic value : {score}")
118
+
119
def _transform_weather_traffic_data(**context):
    """Merge the weather JSON and the traffic score into a one-row CSV on S3.

    The weather file name and the traffic score are read from XCom (keys
    ``"weather_filename"`` and ``"traffic_value"``). The weather JSON is
    downloaded from ``datasets/input/meteo/`` into ``/tmp``. The resulting
    CSV (no header, no index) is uploaded under ``datasets/input/`` and its
    S3 key is pushed to XCom as ``"input_data_csv_filename"``.

    Args:
        **context: Task context; only ``"task_instance"`` is used.
    """
    # Output name looks like: 20220601-123000_weather_traffic_data.json
    filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_weather_traffic_data.json"

    # Name of the weather file, as pushed to XCom under "weather_filename"
    ti = context["task_instance"]
    filename_weather = ti.xcom_pull(key="weather_filename")

    # Download the raw weather JSON from the S3 bucket
    s3_hook = S3Hook(aws_conn_id="aws_default")
    s3_path = 'datasets/input/meteo/'
    local_weather_path = s3_hook.download_file(
        s3_path + filename_weather,
        bucket_name=Variable.get("S3BucketName"),
        local_path="/tmp",
    )
    with open(local_weather_path, "r") as weather_file:
        weather = json.load(weather_file)

    # Column order of the CSV: temp, pressure, humidity, wind speed, traffic
    row = {
        "main_temp": weather["main"]["temp"],
        "main_pressure": weather["main"]["pressure"],
        "main_humidity": weather["main"]["humidity"],
        "wind_speed": weather["wind"]["speed"],
        "traffic": ti.xcom_pull(key="traffic_value"),
    }
    frame = pd.DataFrame(row, index=[0])

    # Same stem as the generated file name, with a .csv extension
    csv_filename = filename.split(".")[0] + ".csv"
    csv_local_path = f"/tmp/{csv_filename}"
    s3_csv_key = 'datasets/input/' + csv_filename

    # Write locally first, then upload to S3
    frame.to_csv(csv_local_path, index=False, header=False)
    s3_hook.load_file(
        filename=csv_local_path,
        key=s3_csv_key,
        bucket_name=Variable.get("S3BucketName"),
    )

    # Publish the S3 key for later tasks
    ti.xcom_push(key="input_data_csv_filename", value=s3_csv_key)
156
+
157
 
 
158
 
159
# Hourly pipeline: weather and traffic are fetched in parallel, merged into
# one CSV, then handed to the (placeholder) ML branch.
# NOTE(review): the original indentation was lost; the merge task is assumed
# to sit at DAG level, between the fetch groups and ml_branch — confirm.
with DAG(
    dag_id="quality_air_etl_dag",
    default_args=default_args,
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup(group_id="weather_branch") as weather_branch:
        fetch_weather_data = PythonOperator(
            task_id="fetch_weather_data",
            python_callable=_fetch_weather_data,
        )

    with TaskGroup(group_id="traffic_branch") as traffic_branch:
        fetch_traffic_data = PythonOperator(
            task_id="fetch_traffic_data",
            python_callable=_get_traffic_data,
        )

    with TaskGroup(group_id="ml_branch") as ml_branch:
        # Placeholders until the model steps are implemented.
        get_input_meteo_traffic_csv = DummyOperator(task_id="get_input_meteo_traffic_csv")
        pull_run_model = DummyOperator(task_id="pull_run_model")

        get_input_meteo_traffic_csv >> pull_run_model

    transform_weather_traffic_data = PythonOperator(
        task_id="transform_weather_traffic_data",
        python_callable=_transform_weather_traffic_data,
    )

    end = DummyOperator(task_id="end")

    start >> [weather_branch, traffic_branch] >> transform_weather_traffic_data >> ml_branch >> end