Upload training_flow.py
Browse files- flows/training_flow.py +80 -0
flows/training_flow.py
ADDED
|
@@ -0,0 +1,80 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from prefect import flow, task
|
| 2 |
+
import requests
|
| 3 |
+
import sys
|
| 4 |
+
import os
|
| 5 |
+
|
| 6 |
+
# Add the project root to python path so we can import src
|
| 7 |
+
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
| 8 |
+
|
| 9 |
+
# Import your actual training logic
|
| 10 |
+
from src.train import train_all_tasks
|
| 11 |
+
|
| 12 |
+
# --- CONFIG ---
|
| 13 |
+
# ⚠️ SECURITY NOTE: Ideally, use os.getenv('DISCORD_WEBHOOK_URL')
|
| 14 |
+
# But for your project demo, this hardcoded variable works.
|
| 15 |
+
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/1450136736238796947/Bl0JBjHUUeQbiqt7f_o-76zcrnVlu1-bHHHKZS8Y4r-dORJwZVJOy-5kyB2z6pPNWoKE"
|
| 16 |
+
|
| 17 |
+
# 1. RETRY LOGIC (Satisfies Project Objective 3)
|
| 18 |
+
@task(name="Run ML Training", retries=3, retry_delay_seconds=10)
|
| 19 |
+
def run_training_script():
|
| 20 |
+
"""
|
| 21 |
+
Executes the Random Forest & Clustering training.
|
| 22 |
+
NO try/except here! We want it to fail so Prefect triggers retries.
|
| 23 |
+
"""
|
| 24 |
+
print("🔥 Orchestrator starting model training...")
|
| 25 |
+
train_all_tasks() # If this crashes, Prefect will auto-retry 3 times
|
| 26 |
+
return "Success"
|
| 27 |
+
|
| 28 |
+
@task(name="Send Notification")
|
| 29 |
+
def send_discord_alert(status: str):
|
| 30 |
+
"""Sends a notification to Discord/Slack upon completion."""
|
| 31 |
+
if not DISCORD_WEBHOOK_URL or "YOUR_DISCORD" in DISCORD_WEBHOOK_URL:
|
| 32 |
+
print(f"🔔 (Simulation) Notification: Training finished with status: {status}")
|
| 33 |
+
return
|
| 34 |
+
|
| 35 |
+
# Dynamic Message based on Status
|
| 36 |
+
color_code = 5763719 if status == "Success" else 15548997 # Green or Red
|
| 37 |
+
|
| 38 |
+
message = {
|
| 39 |
+
"embeds": [{
|
| 40 |
+
"title": f"🚨 Wildfire MLOps Pipeline: {status}",
|
| 41 |
+
"description": "The automated retraining pipeline has finished execution.",
|
| 42 |
+
"color": color_code,
|
| 43 |
+
"fields": [
|
| 44 |
+
{"name": "Status", "value": f"**{status}**", "inline": True},
|
| 45 |
+
{"name": "Retries Used", "value": "Checked via Prefect", "inline": True}
|
| 46 |
+
],
|
| 47 |
+
"footer": {"text": "Certified MLOps System"}
|
| 48 |
+
}]
|
| 49 |
+
}
|
| 50 |
+
|
| 51 |
+
try:
|
| 52 |
+
requests.post(DISCORD_WEBHOOK_URL, json=message)
|
| 53 |
+
print("✅ Discord Notification Sent!")
|
| 54 |
+
except Exception as e:
|
| 55 |
+
print(f"⚠️ Could not send notification: {e}")
|
| 56 |
+
|
| 57 |
+
@flow(name="Wildfire Model Retraining")
|
| 58 |
+
def training_flow():
|
| 59 |
+
"""The Master Flow that trains and notifies."""
|
| 60 |
+
print("⚙️ Starting Retraining Flow...")
|
| 61 |
+
|
| 62 |
+
final_status = "Failed" # Default to failed
|
| 63 |
+
|
| 64 |
+
# 2. ERROR HANDLING (Satisfies Project Objective 3)
|
| 65 |
+
try:
|
| 66 |
+
# This will retry 3 times if it crashes.
|
| 67 |
+
# If it fails the 4th time, it raises the Exception to here.
|
| 68 |
+
run_training_script()
|
| 69 |
+
final_status = "Success"
|
| 70 |
+
|
| 71 |
+
except Exception as e:
|
| 72 |
+
print(f"❌ CRITICAL: Pipeline failed after retries! Error: {e}")
|
| 73 |
+
final_status = "Failed"
|
| 74 |
+
|
| 75 |
+
# 3. NOTIFICATION (Satisfies Project Objective 3)
|
| 76 |
+
# We run this regardless of success or failure
|
| 77 |
+
send_discord_alert(final_status)
|
| 78 |
+
|
| 79 |
+
if __name__ == "__main__":
|
| 80 |
+
training_flow()
|