p2ov commited on
Commit
81b8f34
·
1 Parent(s): 6b90ff6

ajout du upload des predictions sur s3

Browse files
Files changed (1) hide show
  1. airflow/dags/tasks/run_model.py +24 -2
airflow/dags/tasks/run_model.py CHANGED
@@ -60,8 +60,30 @@ def run_model(**context):
60
  # clip the predictions to be above 0
61
  clipped_predictions = raw_predictions.clip(0, 1e6).tolist()
62
 
63
- logger.info(f"✅ Clipped Predictions : {clipped_predictions}")
64
- return clipped_predictions
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65
 
66
 
67
  # ✅ Fonction main pour tester localement (verifier que le fichier model est présent)
 
60
  # clip the predictions to be above 0
61
  clipped_predictions = raw_predictions.clip(0, 1e6).tolist()
62
 
63
+
64
+ # Les noms de colonnes
65
+ columns = ["O3", "NOx", "PM10", "PM25"]
66
+
67
+ # Conversion en dictionnaire
68
+ formatted_result = dict(zip(columns, clipped_predictions[0]))
69
+ # Conversion en JSON
70
+ filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_prediction_data.json"
71
+ local_path = f"/tmp/{filename}"
72
+ s3_key = f"datasets/output/{filename}"
73
+
74
+ with open(local_path, "w") as f:
75
+ json.dump(formatted_result, f)
76
+
77
+ # Upload du resultat sur S3
78
+ if S3Hook is not None:
79
+ s3_hook = S3Hook(aws_conn_id="aws_default")
80
+ s3_hook.load_file(filename=local_path, key=s3_key, bucket_name=s3_bucket, replace=True)
81
+ logger.info(f"✅ Uploaded to S3: {s3_key}")
82
+ else:
83
+ logger.info("ℹ️ S3Hook non disponible (hors Airflow). Fichier seulement écrit en local.")
84
+
85
+ logger.info(f"✅ Clipped Predictions : {filename}")
86
+ return filename
87
 
88
 
89
  # ✅ Fonction main pour tester localement (verifier que le fichier model est présent)