| from prefect import flow, task
|
| import requests
|
| import sys
|
| import os
|
|
|
|
|
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
|
|
|
| from src.train import train_all_tasks
|
|
|
|
|
|
|
|
|
| DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/1450136736238796947/Bl0JBjHUUeQbiqt7f_o-76zcrnVlu1-bHHHKZS8Y4r-dORJwZVJOy-5kyB2z6pPNWoKE"
|
|
|
|
|
| @task(name="Run ML Training", retries=3, retry_delay_seconds=10)
|
| def run_training_script():
|
| """
|
| Executes the Random Forest & Clustering training.
|
| NO try/except here! We want it to fail so Prefect triggers retries.
|
| """
|
| print("π₯ Orchestrator starting model training...")
|
| train_all_tasks()
|
| return "Success"
|
|
|
| @task(name="Send Notification")
|
| def send_discord_alert(status: str):
|
| """Sends a notification to Discord/Slack upon completion."""
|
| if not DISCORD_WEBHOOK_URL or "YOUR_DISCORD" in DISCORD_WEBHOOK_URL:
|
| print(f"π (Simulation) Notification: Training finished with status: {status}")
|
| return
|
|
|
|
|
| color_code = 5763719 if status == "Success" else 15548997
|
|
|
| message = {
|
| "embeds": [{
|
| "title": f"π¨ Wildfire MLOps Pipeline: {status}",
|
| "description": "The automated retraining pipeline has finished execution.",
|
| "color": color_code,
|
| "fields": [
|
| {"name": "Status", "value": f"**{status}**", "inline": True},
|
| {"name": "Retries Used", "value": "Checked via Prefect", "inline": True}
|
| ],
|
| "footer": {"text": "Certified MLOps System"}
|
| }]
|
| }
|
|
|
| try:
|
| requests.post(DISCORD_WEBHOOK_URL, json=message)
|
| print("β
Discord Notification Sent!")
|
| except Exception as e:
|
| print(f"β οΈ Could not send notification: {e}")
|
|
|
| @flow(name="Wildfire Model Retraining")
|
| def training_flow():
|
| """The Master Flow that trains and notifies."""
|
| print("βοΈ Starting Retraining Flow...")
|
|
|
| final_status = "Failed"
|
|
|
|
|
| try:
|
|
|
|
|
| run_training_script()
|
| final_status = "Success"
|
|
|
| except Exception as e:
|
| print(f"β CRITICAL: Pipeline failed after retries! Error: {e}")
|
| final_status = "Failed"
|
|
|
|
|
|
|
| send_discord_alert(final_status)
|
|
|
| if __name__ == "__main__":
|
| training_flow() |