# streamlit_app/airflow/dags/tasks/fetch_weather_data.py
# (commit b7ff5e0 — "Cleaning dossier")
import os
import json
import logging
import requests
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()  # automatically loads the .env file into os.environ
# Load the configuration (from Airflow or .env)
from tasks.config import get_config
# print("✅ .env loaded, API key:", os.getenv("OpenWeatherApiKey"))  # debug helper
# Use the S3Hook only when available (i.e. when running inside Airflow)
try:
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
except ImportError:
    # Sentinel: lets fetch_weather_data() skip the S3 upload outside Airflow.
    S3Hook = None
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def fetch_weather_data(**context):
    """Fetch current Paris weather from OpenWeatherMap and persist it.

    Writes the raw JSON payload to a timestamped local file, then — when the
    Airflow S3Hook is importable — uploads it to S3 and pushes the filename
    to XCom so downstream tasks can locate it.

    Args:
        **context: Airflow task context; empty when called outside Airflow
            (an empty dict is falsy, which selects the local ``../data`` path).

    Returns:
        str: The generated JSON filename (basename only, no directory).

    Raises:
        requests.HTTPError: If the OpenWeatherMap API call fails.
    """
    logger.info("📡 Fetching weather data from OpenWeatherMap")
    api_key = get_config("OpenWeatherApiKey")
    s3_bucket = get_config("S3BucketName")

    url = f"https://api.openweathermap.org/data/2.5/weather?q=Paris&appid={api_key}&units=metric"
    # Explicit timeout: without it a stalled API call hangs the Airflow task forever.
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_openweather_data.json"
    # Inside Airflow the context dict is populated; locally it is empty.
    if context:
        local_path = f"/opt/airflow/data/{filename}"
    else:
        local_path = f"../data/{filename}"
    s3_key = f"datasets/input/meteo/{filename}"

    with open(local_path, "w") as f:
        json.dump(response.json(), f)

    if S3Hook is not None:
        s3_hook = S3Hook(aws_conn_id="aws_default")
        s3_hook.load_file(filename=local_path, key=s3_key, bucket_name=s3_bucket, replace=True)
        if context and "task_instance" in context:
            context["task_instance"].xcom_push(key="weather_filename", value=filename)
        logger.info("✅ Uploaded to S3: %s", s3_key)
    else:
        logger.info("ℹ️ S3Hook non disponible (hors Airflow). Fichier seulement écrit en local.")
    return filename
# ✅ Entry point for running the fetch locally, outside Airflow
def main():
    """Run the weather fetch once and print the generated filename."""
    print("▶️ Test local : récupération météo")
    filename = fetch_weather_data()
    # Restored interpolation: the scraped copy had lost {filename} here.
    print(f"✅ Fichier météo généré : {filename}")


if __name__ == "__main__":
    main()