p2ov commited on
Commit
6b90ff6
·
1 Parent(s): 21a79b8

split en tache unitaires de etl_main.py

Browse files
airflow/dags/etl_main.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import json
3
+ import pandas as pd
4
+ from datetime import datetime
5
+ from airflow import DAG
6
+ from airflow.operators.dummy_operator import DummyOperator
7
+ from airflow.operators.python_operator import PythonOperator
8
+ from airflow.utils.task_group import TaskGroup
9
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
10
+
11
+ # Import des fonctions de tâches depuis tasks/
12
+ from tasks.fetch_weather_data import fetch_weather_data
13
+ from tasks.fetch_traffic_data import fetch_traffic_data
14
+ from tasks.run_model import run_model
15
+ from tasks.config import get_config
16
+
17
+ # Import des fonctions de tâches depuis tasks/
18
+ from tasks.fetch_weather_data import fetch_weather_data
19
+
20
+ default_args = {
21
+ "owner": "airflow",
22
+ "start_date": datetime(2022, 6, 1)
23
+ }
24
+
25
+ logger = logging.getLogger(__name__)
26
+ logging.basicConfig(level=logging.INFO)
27
+
28
+
29
+ def merge_weather_traffic_data(**context):
30
+ logger.info("🧪 Fusion des données météo + trafic...")
31
+
32
+ # Connect to our S3 bucket and download the JSON file
33
+ s3_hook = S3Hook(aws_conn_id="aws_default")
34
+ s3_bucket = get_config("S3BucketName")
35
+
36
+ #Input Weather values and Traffic status value
37
+ filename_weather = context["task_instance"].xcom_pull(key="weather_filename")
38
+ traffic_value = context["task_instance"].xcom_pull(key="traffic_value")
39
+
40
+ logger.info(f"filename_weather : {filename_weather}")
41
+ logger.info(f"traffic_value : {traffic_value}")
42
+
43
+ # Result JSON file for model as input
44
+ filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_weather_traffic_data.json"
45
+
46
+
47
+ local_weather_path = f"/tmp/{filename_weather}"
48
+ result_local_path = f"/tmp/{filename}"
49
+
50
+ with open(local_weather_path, "r") as f:
51
+ weather_data = json.load(f)
52
+
53
+ transformed = {"Pressure": weather_data["main"]["pressure"],"Temperature": weather_data["main"]["temp"],"Wind Speed": weather_data["wind"]["speed"],"Humidity": weather_data["main"]["humidity"],"Traffic Status": traffic_value}
54
+
55
+ df = pd.DataFrame([transformed])
56
+ df.to_json(result_local_path, orient="records", indent=4)
57
+
58
+ #S3 path in the bucket
59
+ s3_key = f"datasets/input/{filename}"
60
+
61
+ # Load it to S3
62
+ s3_hook.load_file(filename=result_local_path, key=s3_key, bucket_name=s3_bucket)
63
+
64
+ context["task_instance"].xcom_push(key="input_data_model", value=transformed)
65
+ logger.info(f"✅ Données transformées et sauvegardées : {s3_bucket}/{s3_key}")
66
+
67
+
68
+ with DAG(
69
+ dag_id="etl_main",
70
+ default_args=default_args,
71
+ schedule_interval="@hourly",
72
+ catchup=False,
73
+ ) as dag:
74
+
75
+ start = DummyOperator(task_id="start")
76
+
77
+ with TaskGroup(group_id="weather_branch") as weather_branch:
78
+ fetch_weather_task = PythonOperator(
79
+ task_id="fetch_weather_data",
80
+ python_callable=fetch_weather_data,
81
+ provide_context=True,
82
+ )
83
+
84
+ with TaskGroup(group_id="traffic_branch") as traffic_branch:
85
+ fetch_traffic_task = PythonOperator(
86
+ task_id="fetch_traffic_data",
87
+ python_callable=fetch_traffic_data,
88
+ provide_context=True,
89
+ )
90
+
91
+ merge_data_task = PythonOperator(
92
+ task_id="merge_weather_traffic_data",
93
+ python_callable=merge_weather_traffic_data,
94
+ provide_context=True,
95
+ )
96
+
97
+ run_model = PythonOperator(
98
+ task_id="run_model",
99
+ python_callable=run_model,
100
+ provide_context=True,
101
+ )
102
+
103
+ end = DummyOperator(task_id="end")
104
+
105
+ start >> [weather_branch, traffic_branch] >> merge_data_task >> run_model >> end
airflow/dags/tasks/.env ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ OpenWeatherApiKey=e96e90d4363a4f9e15e4c81d08ac6ed6
2
+ S3BucketName=jedha-quality-air
airflow/dags/tasks/__init__.py ADDED
File without changes
airflow/dags/tasks/config.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+
3
+ def get_config(key: str) -> str:
4
+ # Priorité à l'environnement (pour les tests locaux avec .env)
5
+ val = os.getenv(key)
6
+ if val:
7
+ return val
8
+
9
+ # Sinon, on tente Airflow (en prod uniquement)
10
+ try:
11
+ from airflow.models import Variable
12
+ return Variable.get(key)
13
+ except Exception as e:
14
+ raise RuntimeError(f"❌ Impossible de récupérer la variable '{key}': {e}")
airflow/dags/tasks/fetch_traffic_data.py ADDED
@@ -0,0 +1,70 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import pandas as pd
3
+ import requests
4
+
5
+ logger = logging.getLogger(__name__)
6
+ logging.basicConfig(level=logging.INFO)
7
+
8
+ def _fetch_trafic_data():
9
+ url = "https://data.rennesmetropole.fr/api/explore/v2.1/catalog/datasets/etat-du-trafic-en-temps-reel/records"
10
+ params = {
11
+ "select": "datetime,denomination,averagevehiclespeed,traveltime,trafficstatus",
12
+ "where": "averagevehiclespeed > 0 and trafficstatus != 'unknown'",
13
+ "order_by": "datetime desc",
14
+ "limit": 100,
15
+ "timezone": "Europe/Paris"
16
+ }
17
+ response = requests.get(url, params=params)
18
+ response.raise_for_status()
19
+ logger.info("🚗 Données trafic récupérées avec succès.")
20
+ return response.json()["results"]
21
+
22
+ def _process_traffic_data(data):
23
+ df = pd.DataFrame(data)
24
+ df["datetime"] = pd.to_datetime(df["datetime"])
25
+ df["averagevehiclespeed"] = pd.to_numeric(df["averagevehiclespeed"], errors="coerce")
26
+ df["traveltime"] = pd.to_numeric(df["traveltime"], errors="coerce")
27
+
28
+ latest_datetime = df["datetime"].max()
29
+ df_latest = df[df["datetime"] == latest_datetime]
30
+
31
+ agg_df = (
32
+ df_latest.groupby(["denomination", "datetime"], as_index=False)
33
+ .agg({
34
+ "averagevehiclespeed": "mean",
35
+ "traveltime": "mean",
36
+ "trafficstatus": "first"
37
+ })
38
+ .sort_values(by="trafficstatus", ascending=False)
39
+ .reset_index(drop=True)
40
+ )
41
+
42
+ agg_df["trafficstatus_numeric"] = agg_df["trafficstatus"].map({
43
+ "freeFlow": 0,
44
+ "heavy": 1,
45
+ "congested": 2
46
+ })
47
+
48
+ #mean_status = agg_df["trafficstatus_numeric"].mean()
49
+ mean_status = round(agg_df["trafficstatus_numeric"].mean(), 2)
50
+ logger.info(f"📊 Moyenne du trafficstatus : {mean_status}")
51
+ return mean_status
52
+
53
+ def fetch_traffic_data(**context):
54
+ data = _fetch_trafic_data()
55
+ traffic_value = _process_traffic_data(data)
56
+ if context and "task_instance" in context:
57
+ context["task_instance"].xcom_push(key="traffic_value", value=traffic_value)
58
+ logger.info(f"✅ Traffic value pushed: {traffic_value}")
59
+ return traffic_value
60
+
61
+
62
+
63
+ # ✅ Fonction main pour tester localement
64
+ def main():
65
+ print("▶️ Test local : recuperation du taux de traffic entre 0 et 2, 0 aucun trafic, 2 congestionné")
66
+ traffic_value = fetch_traffic_data()
67
+ print(f"✅ Moyenne récupérée : {traffic_value}")
68
+
69
+ if __name__ == "__main__":
70
+ main()
airflow/dags/tasks/fetch_weather_data.py ADDED
@@ -0,0 +1,63 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import logging
4
+ import requests
5
+ from datetime import datetime
6
+ from dotenv import load_dotenv
7
+ load_dotenv() # charge automatiquement le fichier .env dans os.environ
8
+
9
+ # Charger la configuration (depuis Airflow ou .env)
10
+ from tasks.config import get_config
11
+
12
+
13
+ #print("✅ .env chargé, clé API :", os.getenv("OpenWeatherApiKey"))
14
+
15
+
16
+ # Utilise le S3Hook uniquement si disponible (dans Airflow)
17
+ try:
18
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
19
+ except ImportError:
20
+ S3Hook = None
21
+
22
+ logger = logging.getLogger(__name__)
23
+ logging.basicConfig(level=logging.INFO)
24
+
25
+
26
+ def fetch_weather_data(**context):
27
+ logger.info("📡 Fetching weather data from OpenWeatherMap")
28
+
29
+ api_key = get_config("OpenWeatherApiKey")
30
+ s3_bucket = get_config("S3BucketName")
31
+
32
+ url = f"https://api.openweathermap.org/data/2.5/weather?q=Paris&appid={api_key}&units=metric"
33
+ response = requests.get(url)
34
+ response.raise_for_status()
35
+
36
+ filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_openweather_data.json"
37
+ local_path = f"/tmp/{filename}"
38
+ s3_key = f"datasets/input/meteo/{filename}"
39
+
40
+ with open(local_path, "w") as f:
41
+ json.dump(response.json(), f)
42
+
43
+ if S3Hook is not None:
44
+ s3_hook = S3Hook(aws_conn_id="aws_default")
45
+ s3_hook.load_file(filename=local_path, key=s3_key, bucket_name=s3_bucket, replace=True)
46
+ if context and "task_instance" in context:
47
+ context["task_instance"].xcom_push(key="weather_filename", value=filename)
48
+ logger.info(f"✅ Uploaded to S3: {s3_key}")
49
+ else:
50
+ logger.info("ℹ️ S3Hook non disponible (hors Airflow). Fichier seulement écrit en local.")
51
+
52
+ return filename
53
+
54
+
55
+ # ✅ Fonction main pour tester localement
56
+ def main():
57
+ print("▶️ Test local : récupération météo")
58
+ filename = fetch_weather_data()
59
+ print(f"✅ Fichier météo généré : /tmp/{filename}")
60
+
61
+
62
+ if __name__ == "__main__":
63
+ main()
airflow/dags/tasks/run_model.py ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import logging
3
+ import json
4
+ import pandas as pd
5
+ import requests
6
+ from datetime import datetime
7
+ import pickle
8
+ from sklearn.linear_model import LinearRegression
9
+ from dotenv import load_dotenv
10
+ load_dotenv() # charge automatiquement le fichier .env dans os.environ
11
+
12
+ # Charger la configuration (depuis Airflow ou .env)
13
+ from tasks.config import get_config
14
+
15
+
16
+ # Utilise le S3Hook uniquement si disponible (dans Airflow)
17
+ try:
18
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
19
+ except ImportError:
20
+ S3Hook = None
21
+
22
+ logger = logging.getLogger(__name__)
23
+ logging.basicConfig(level=logging.INFO)
24
+
25
+
26
+ def run_model(**context):
27
+ logger.info("📡 Launch model on data")
28
+ model_filename = "linear_model_2025_07_10_16_28_59.pkl"
29
+
30
+ #Je suis dans Airflow
31
+ if S3Hook is not None and context is not None:
32
+ s3_bucket = get_config("S3BucketName")
33
+ s3_path = 'models/'
34
+ # Connect to our S3 bucket and download the model file
35
+ s3_hook = S3Hook(aws_conn_id="aws_default")
36
+ filename_model = s3_hook.download_file(s3_path+model_filename, bucket_name=s3_bucket, local_path="/tmp")
37
+ #Input Weather values and Traffic status value
38
+ input_data_model = context["task_instance"].xcom_pull(key="input_data_model")
39
+ #input_data_model = {"Pressure": 1021, "Temperature": 22.73, "Wind Speed": 4.12, "Humidity": 59, "Traffic Status": 0.23}
40
+ print(type(input_data_model))
41
+ logger.info(f"✅ input data model : {input_data_model}")
42
+ else:
43
+ #recuperer le model dans git ou ailleurs
44
+ logger.info(f"✅ Model local used en local: {model_filename}")
45
+ filename_model = '../data/'+model_filename
46
+ # test the model with random values
47
+ input_data_model = {
48
+ "Pressure": 999,
49
+ "Temperature": 22,
50
+ "Wind Speed": 10,
51
+ "Humidity": 50,
52
+ "Traffic Status": 0,
53
+ }
54
+
55
+ with open(filename_model, "rb") as f:
56
+ model = pickle.load(f)
57
+
58
+ raw_predictions = model.predict(pd.DataFrame([input_data_model]))
59
+
60
+ # clip the predictions to be above 0
61
+ clipped_predictions = raw_predictions.clip(0, 1e6).tolist()
62
+
63
+ logger.info(f"✅ Clipped Predictions : {clipped_predictions}")
64
+ return clipped_predictions
65
+
66
+
67
+ # ✅ Fonction main pour tester localement (verifier que le fichier model est présent)
68
+ def main():
69
+ print("▶️ Test local du modele de prediction")
70
+ result = run_model()
71
+ print(f"✅ Clipped Predictions : {result}")
72
+
73
+ if __name__ == "__main__":
74
+ main()
airflow/requirements.txt CHANGED
@@ -1 +1,7 @@
1
- pandas
 
 
 
 
 
 
 
1
+ apache-airflow-providers-postgres
2
+ apache-airflow-providers-amazon
3
+ scikit-learn
4
+ psycopg[binary]
5
+ pandas
6
+ python-dotenv
7
+ pytest