Spaces:
Sleeping
Sleeping
update pour jouer en local et sur airflow
Browse files- airflow/dags/tasks/run_model.py +17 -11
airflow/dags/tasks/run_model.py
CHANGED
|
@@ -27,8 +27,11 @@ def run_model(**context):
|
|
| 27 |
logger.info("π‘ Launch model on data")
|
| 28 |
model_filename = "linear_model_2025_07_10_16_28_59.pkl"
|
| 29 |
|
|
|
|
|
|
|
|
|
|
| 30 |
#Je suis dans Airflow
|
| 31 |
-
if S3Hook is not None and context
|
| 32 |
s3_bucket = get_config("S3BucketName")
|
| 33 |
s3_path = 'models/'
|
| 34 |
# Connect to our S3 bucket and download the model file
|
|
@@ -39,18 +42,21 @@ def run_model(**context):
|
|
| 39 |
#input_data_model = {"Pressure": 1021, "Temperature": 22.73, "Wind Speed": 4.12, "Humidity": 59, "Traffic Status": 0.23}
|
| 40 |
print(type(input_data_model))
|
| 41 |
logger.info(f"β
input data model : {input_data_model}")
|
|
|
|
|
|
|
| 42 |
else:
|
| 43 |
#recuperer le model dans git ou ailleurs
|
| 44 |
logger.info(f"β
Model local used en local: {model_filename}")
|
| 45 |
filename_model = '../data/'+model_filename
|
| 46 |
# test the model with random values
|
| 47 |
input_data_model = {
|
| 48 |
-
"Pressure":
|
| 49 |
-
"Temperature":
|
| 50 |
-
"Wind Speed":
|
| 51 |
-
"Humidity":
|
| 52 |
-
"Traffic Status":
|
| 53 |
}
|
|
|
|
| 54 |
|
| 55 |
with open(filename_model, "rb") as f:
|
| 56 |
model = pickle.load(f)
|
|
@@ -62,20 +68,20 @@ def run_model(**context):
|
|
| 62 |
|
| 63 |
|
| 64 |
# Les noms de colonnes
|
| 65 |
-
columns = ["
|
| 66 |
|
| 67 |
# Conversion en dictionnaire
|
| 68 |
formatted_result = dict(zip(columns, clipped_predictions[0]))
|
| 69 |
-
|
| 70 |
-
|
| 71 |
-
local_path = f"/tmp/{filename}"
|
| 72 |
s3_key = f"datasets/output/{filename}"
|
| 73 |
|
|
|
|
| 74 |
with open(local_path, "w") as f:
|
| 75 |
json.dump(formatted_result, f)
|
| 76 |
|
| 77 |
# Upload du resultat sur S3
|
| 78 |
-
if S3Hook is not None:
|
| 79 |
s3_hook = S3Hook(aws_conn_id="aws_default")
|
| 80 |
s3_hook.load_file(filename=local_path, key=s3_key, bucket_name=s3_bucket, replace=True)
|
| 81 |
logger.info(f"β
Uploaded to S3: {s3_key}")
|
|
|
|
| 27 |
logger.info("π‘ Launch model on data")
|
| 28 |
model_filename = "linear_model_2025_07_10_16_28_59.pkl"
|
| 29 |
|
| 30 |
+
# Nom du fichier result de la prediction
|
| 31 |
+
filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_prediction_data.json"
|
| 32 |
+
logger.info(context)
|
| 33 |
#Je suis dans Airflow
|
| 34 |
+
if S3Hook is not None and context:
|
| 35 |
s3_bucket = get_config("S3BucketName")
|
| 36 |
s3_path = 'models/'
|
| 37 |
# Connect to our S3 bucket and download the model file
|
|
|
|
| 42 |
#input_data_model = {"Pressure": 1021, "Temperature": 22.73, "Wind Speed": 4.12, "Humidity": 59, "Traffic Status": 0.23}
|
| 43 |
print(type(input_data_model))
|
| 44 |
logger.info(f"β
input data model : {input_data_model}")
|
| 45 |
+
local_path = f"/opt/airflow/data/{filename}"
|
| 46 |
+
print(local_path)
|
| 47 |
else:
|
| 48 |
#recuperer le model dans git ou ailleurs
|
| 49 |
logger.info(f"β
Model local used en local: {model_filename}")
|
| 50 |
filename_model = '../data/'+model_filename
|
| 51 |
# test the model with random values
|
| 52 |
input_data_model = {
|
| 53 |
+
"Pressure":1020,
|
| 54 |
+
"Temperature":25,
|
| 55 |
+
"Wind Speed":3.09,
|
| 56 |
+
"Humidity":53,
|
| 57 |
+
"Traffic Status":0.44
|
| 58 |
}
|
| 59 |
+
local_path = f"../data/{filename}"
|
| 60 |
|
| 61 |
with open(filename_model, "rb") as f:
|
| 62 |
model = pickle.load(f)
|
|
|
|
| 68 |
|
| 69 |
|
| 70 |
# Les noms de colonnes
|
| 71 |
+
columns = ["NOX", "PM10", "PM25", "O3"]
|
| 72 |
|
| 73 |
# Conversion en dictionnaire
|
| 74 |
formatted_result = dict(zip(columns, clipped_predictions[0]))
|
| 75 |
+
|
| 76 |
+
# Path vers le fichier sur le s3
|
|
|
|
| 77 |
s3_key = f"datasets/output/{filename}"
|
| 78 |
|
| 79 |
+
# Conversion du fichier JSON
|
| 80 |
with open(local_path, "w") as f:
|
| 81 |
json.dump(formatted_result, f)
|
| 82 |
|
| 83 |
# Upload du resultat sur S3
|
| 84 |
+
if S3Hook is not None and context:
|
| 85 |
s3_hook = S3Hook(aws_conn_id="aws_default")
|
| 86 |
s3_hook.load_file(filename=local_path, key=s3_key, bucket_name=s3_bucket, replace=True)
|
| 87 |
logger.info(f"β
Uploaded to S3: {s3_key}")
|