Abeshith commited on
Commit
4545019
·
1 Parent(s): b53ee19

Add Airflow DAG for training pipeline orchestration

Browse files
airflow/dags/ml_training_pipeline_dag.py ADDED
@@ -0,0 +1,143 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime, timedelta
2
+ from airflow import DAG
3
+ from airflow.operators.python import PythonOperator
4
+ import sys
5
+ from pathlib import Path
6
+
7
+ # Add project root to path
8
+ project_root = Path(__file__).parent.parent.parent
9
+ sys.path.insert(0, str(project_root))
10
+
11
+ from mlpipeline.logging.logger import get_logger
12
+ from mlpipeline.stages.data_ingestion_pipeline import DataIngestionPipeline
13
+ from mlpipeline.stages.data_validation_pipeline import DataValidationPipeline
14
+ from mlpipeline.stages.data_transformation_pipeline import DataTransformationPipeline
15
+ from mlpipeline.stages.feature_engineering_pipeline import FeatureEngineeringPipeline
16
+ from mlpipeline.stages.model_trainer_pipeline import ModelTrainerPipeline
17
+ from mlpipeline.stages.model_evaluation_pipeline import ModelEvaluationPipeline
18
+ from mlpipeline.stages.model_pusher_pipeline import ModelPusherPipeline
19
+
20
+ logger = get_logger(__name__)
21
+
22
+ default_args = {
23
+ 'owner': 'automl-mlops',
24
+ 'depends_on_past': False,
25
+ 'start_date': datetime(2026, 1, 1),
26
+ 'email_on_failure': False,
27
+ 'email_on_retry': False,
28
+ 'retries': 1,
29
+ 'retry_delay': timedelta(minutes=5)
30
+ }
31
+
32
+ dag = DAG(
33
+ 'automl_training_pipeline',
34
+ default_args=default_args,
35
+ description='AutoML Training Pipeline with MLflow Tracking',
36
+ schedule_interval='@daily',
37
+ catchup=False,
38
+ tags=['automl', 'mlops', 'mlflow']
39
+ )
40
+
41
+
42
+ def run_data_ingestion():
43
+ logger.info("Stage 1: Starting Data Ingestion")
44
+ pipeline = DataIngestionPipeline()
45
+ pipeline.run()
46
+ logger.info("Stage 1: Data Ingestion completed")
47
+
48
+
49
+ def run_data_validation():
50
+ logger.info("Stage 2: Starting Data Validation")
51
+ pipeline = DataValidationPipeline()
52
+ pipeline.run()
53
+ logger.info("Stage 2: Data Validation completed")
54
+
55
+
56
+ def run_data_transformation():
57
+ logger.info("Stage 3: Starting Data Transformation")
58
+ pipeline = DataTransformationPipeline()
59
+ pipeline.run()
60
+ logger.info("Stage 3: Data Transformation completed")
61
+
62
+
63
+ def run_feature_engineering():
64
+ logger.info("Stage 4: Starting Feature Engineering")
65
+ pipeline = FeatureEngineeringPipeline()
66
+ pipeline.run()
67
+ logger.info("Stage 4: Feature Engineering completed")
68
+
69
+
70
+ def run_model_trainer():
71
+ logger.info("Stage 5: Starting Model Training")
72
+ pipeline = ModelTrainerPipeline()
73
+ pipeline.run()
74
+ logger.info("Stage 5: Model Training completed")
75
+
76
+
77
+ def run_model_evaluation():
78
+ logger.info("Stage 6: Starting Model Evaluation")
79
+ pipeline = ModelEvaluationPipeline()
80
+ pipeline.run()
81
+ logger.info("Stage 6: Model Evaluation completed")
82
+
83
+
84
+ def run_model_pusher():
85
+ logger.info("Stage 7: Starting Model Pusher")
86
+ pipeline = ModelPusherPipeline()
87
+ pipeline.run()
88
+ logger.info("Stage 7: Model Pusher completed")
89
+
90
+
91
+ # Define tasks
92
+ data_ingestion_task = PythonOperator(
93
+ task_id='data_ingestion',
94
+ python_callable=run_data_ingestion,
95
+ dag=dag
96
+ )
97
+
98
+ data_validation_task = PythonOperator(
99
+ task_id='data_validation',
100
+ python_callable=run_data_validation,
101
+ dag=dag
102
+ )
103
+
104
+ data_transformation_task = PythonOperator(
105
+ task_id='data_transformation',
106
+ python_callable=run_data_transformation,
107
+ dag=dag
108
+ )
109
+
110
+ feature_engineering_task = PythonOperator(
111
+ task_id='feature_engineering',
112
+ python_callable=run_feature_engineering,
113
+ dag=dag
114
+ )
115
+
116
+ model_trainer_task = PythonOperator(
117
+ task_id='model_trainer',
118
+ python_callable=run_model_trainer,
119
+ dag=dag
120
+ )
121
+
122
+ model_evaluation_task = PythonOperator(
123
+ task_id='model_evaluation',
124
+ python_callable=run_model_evaluation,
125
+ dag=dag
126
+ )
127
+
128
+ model_pusher_task = PythonOperator(
129
+ task_id='model_pusher',
130
+ python_callable=run_model_pusher,
131
+ dag=dag
132
+ )
133
+
134
+ # Define pipeline dependencies
135
+ (
136
+ data_ingestion_task
137
+ >> data_validation_task
138
+ >> data_transformation_task
139
+ >> feature_engineering_task
140
+ >> model_trainer_task
141
+ >> model_evaluation_task
142
+ >> model_pusher_task
143
+ )
requirements.txt CHANGED
@@ -15,6 +15,7 @@ pycaret
15
 
16
  mlflow
17
  dvc
 
18
 
19
  evidently
20
 
 
15
 
16
  mlflow
17
  dvc
18
+ apache-airflow
19
 
20
  evidently
21