arthurcornelio88 commited on
Commit
0f6c39b
·
1 Parent(s): 782d145

refactor: wait for variables sleep 10s and get-variable function

Browse files
Files changed (2) hide show
  1. dags/ml-training.py +19 -17
  2. start.sh +2 -1
dags/ml-training.py CHANGED
@@ -11,30 +11,32 @@ import time
11
 
12
  import time
13
 
14
- def get_variable(var_name, default=None):
15
  """
16
- Retrieves an Airflow variable safely. If the variable does not exist, it logs a warning and returns a default value.
17
-
18
- Args:
19
- var_name (str): The name of the variable.
20
- default (any): Default value if the variable is not found.
21
-
22
- Returns:
23
- str: The variable value or default if not found.
24
  """
25
- try:
26
- value = Variable.get(var_name)
27
- print(f"✅ DEBUG: Retrieved {var_name} = {value}") # Debugging log
28
- return value
29
- except KeyError:
30
- print(f"⚠️ WARNING: {var_name} not found. Using default: {default}") # Debug warning
31
- return default # Instead of retrying infinitely, just use the default
 
 
 
 
 
 
 
 
32
 
33
  # Fetch Airflow Variables
34
  GCP_PROJECT_ID = get_variable("GCP_PROJECT_ID")
35
  GCP_ZONE = get_variable("GCP_ZONE")
36
- GCE_INSTANCE_NAME = f"training-instance-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
37
  SERVICE_ACCOUNT_EMAIL = get_variable("SERVICE_ACCOUNT_EMAIL")
 
38
 
39
  # Debug Log
40
  print("\n===== ✅ ENVIRONMENT CONFIGURATION CHECK (Inside DAG) =====")
 
11
 
12
  import time
13
 
14
+ def get_variable(var_name, default=None, retries=5, delay=5):
15
  """
16
+ Retrieves an Airflow variable safely with retries.
17
+ If the variable does not exist or is empty, it waits and retries.
 
 
 
 
 
 
18
  """
19
+ for attempt in range(retries):
20
+ try:
21
+ value = Variable.get(var_name)
22
+ if value.strip(): # Check if not empty
23
+ print(f"✅ DEBUG: Retrieved {var_name} = {value}")
24
+ return value
25
+ else:
26
+ print(f"⚠️ WARNING: {var_name} is set but EMPTY in Airflow! Retrying in {delay}s...")
27
+ except KeyError:
28
+ print(f"⚠️ WARNING: {var_name} not found. Retrying in {delay}s...")
29
+
30
+ time.sleep(delay) # Wait before retrying
31
+
32
+ print(f"❌ ERROR: {var_name} could not be retrieved after {retries} retries!")
33
+ return default
34
 
35
  # Fetch Airflow Variables
36
  GCP_PROJECT_ID = get_variable("GCP_PROJECT_ID")
37
  GCP_ZONE = get_variable("GCP_ZONE")
 
38
  SERVICE_ACCOUNT_EMAIL = get_variable("SERVICE_ACCOUNT_EMAIL")
39
+ GCE_INSTANCE_NAME = f"training-instance-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
40
 
41
  # Debug Log
42
  print("\n===== ✅ ENVIRONMENT CONFIGURATION CHECK (Inside DAG) =====")
start.sh CHANGED
@@ -78,7 +78,6 @@ fi
78
  export SERVICE_ACCOUNT_EMAIL
79
  log "✅ Retrieved SERVICE_ACCOUNT_EMAIL: $SERVICE_ACCOUNT_EMAIL"
80
 
81
-
82
  ### 🔹 Enable Required GCP APIs
83
  log "🔹 Enabling required GCP APIs..."
84
  GCP_APIS=(
@@ -112,8 +111,10 @@ log "✅ Airflow variables set."
112
 
113
  ### 🚀 Start Airflow Services
114
  log "🚀 Starting Airflow services..."
 
115
  airflow scheduler > /dev/null 2>&1 &
116
  airflow webserver > /dev/null 2>&1 &
 
117
 
118
  log "✅ Airflow services started."
119
  wait
 
78
  export SERVICE_ACCOUNT_EMAIL
79
  log "✅ Retrieved SERVICE_ACCOUNT_EMAIL: $SERVICE_ACCOUNT_EMAIL"
80
 
 
81
  ### 🔹 Enable Required GCP APIs
82
  log "🔹 Enabling required GCP APIs..."
83
  GCP_APIS=(
 
111
 
112
  ### 🚀 Start Airflow Services
113
  log "🚀 Starting Airflow services..."
114
+ sleep 10 # Ensure variables are set before DAG execution
115
  airflow scheduler > /dev/null 2>&1 &
116
  airflow webserver > /dev/null 2>&1 &
117
+ log "✅ Airflow services started."
118
 
119
  log "✅ Airflow services started."
120
  wait