Abeshith commited on
Commit
424c660
·
1 Parent(s): 4545019

Airflow Implemented

Browse files
airflow/airflow.cfg ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Minimal Airflow configuration for local development
2
+
3
+ [core]
4
+ dags_folder = d:\AutoML MLOps PipeLine\airflow\dags
5
+ base_log_folder = d:\AutoML MLOps PipeLine\airflow\logs
6
+ executor = LocalExecutor
7
+ load_examples = False
8
+ parallelism = 2
9
+
10
+ [webserver]
11
+ web_server_port = 8080
12
+ reload_on_plugin_change = True
13
+
14
+ [scheduler]
15
+ catchup_by_default = False
16
+
17
+ [api]
18
+ auth_backend = airflow.api.auth.backend.basic_auth
airflow/dags/ml_training_pipeline_dag.py CHANGED
@@ -5,26 +5,19 @@ import sys
5
  from pathlib import Path
6
 
7
  # Add project root to path
8
- project_root = Path(__file__).parent.parent.parent
9
- sys.path.insert(0, str(project_root))
10
 
 
11
  from mlpipeline.logging.logger import get_logger
12
- from mlpipeline.stages.data_ingestion_pipeline import DataIngestionPipeline
13
- from mlpipeline.stages.data_validation_pipeline import DataValidationPipeline
14
- from mlpipeline.stages.data_transformation_pipeline import DataTransformationPipeline
15
- from mlpipeline.stages.feature_engineering_pipeline import FeatureEngineeringPipeline
16
- from mlpipeline.stages.model_trainer_pipeline import ModelTrainerPipeline
17
- from mlpipeline.stages.model_evaluation_pipeline import ModelEvaluationPipeline
18
- from mlpipeline.stages.model_pusher_pipeline import ModelPusherPipeline
19
 
20
  logger = get_logger(__name__)
21
 
 
22
  default_args = {
23
  'owner': 'automl-mlops',
24
  'depends_on_past': False,
25
  'start_date': datetime(2026, 1, 1),
26
  'email_on_failure': False,
27
- 'email_on_retry': False,
28
  'retries': 1,
29
  'retry_delay': timedelta(minutes=5)
30
  }
@@ -38,106 +31,16 @@ dag = DAG(
38
  tags=['automl', 'mlops', 'mlflow']
39
  )
40
 
 
 
 
 
 
 
41
 
42
- def run_data_ingestion():
43
- logger.info("Stage 1: Starting Data Ingestion")
44
- pipeline = DataIngestionPipeline()
45
- pipeline.run()
46
- logger.info("Stage 1: Data Ingestion completed")
47
-
48
-
49
- def run_data_validation():
50
- logger.info("Stage 2: Starting Data Validation")
51
- pipeline = DataValidationPipeline()
52
- pipeline.run()
53
- logger.info("Stage 2: Data Validation completed")
54
-
55
-
56
- def run_data_transformation():
57
- logger.info("Stage 3: Starting Data Transformation")
58
- pipeline = DataTransformationPipeline()
59
- pipeline.run()
60
- logger.info("Stage 3: Data Transformation completed")
61
-
62
-
63
- def run_feature_engineering():
64
- logger.info("Stage 4: Starting Feature Engineering")
65
- pipeline = FeatureEngineeringPipeline()
66
- pipeline.run()
67
- logger.info("Stage 4: Feature Engineering completed")
68
-
69
-
70
- def run_model_trainer():
71
- logger.info("Stage 5: Starting Model Training")
72
- pipeline = ModelTrainerPipeline()
73
- pipeline.run()
74
- logger.info("Stage 5: Model Training completed")
75
-
76
-
77
- def run_model_evaluation():
78
- logger.info("Stage 6: Starting Model Evaluation")
79
- pipeline = ModelEvaluationPipeline()
80
- pipeline.run()
81
- logger.info("Stage 6: Model Evaluation completed")
82
-
83
-
84
- def run_model_pusher():
85
- logger.info("Stage 7: Starting Model Pusher")
86
- pipeline = ModelPusherPipeline()
87
- pipeline.run()
88
- logger.info("Stage 7: Model Pusher completed")
89
-
90
-
91
- # Define tasks
92
- data_ingestion_task = PythonOperator(
93
- task_id='data_ingestion',
94
- python_callable=run_data_ingestion,
95
- dag=dag
96
- )
97
-
98
- data_validation_task = PythonOperator(
99
- task_id='data_validation',
100
- python_callable=run_data_validation,
101
  dag=dag
102
  )
103
-
104
- data_transformation_task = PythonOperator(
105
- task_id='data_transformation',
106
- python_callable=run_data_transformation,
107
- dag=dag
108
- )
109
-
110
- feature_engineering_task = PythonOperator(
111
- task_id='feature_engineering',
112
- python_callable=run_feature_engineering,
113
- dag=dag
114
- )
115
-
116
- model_trainer_task = PythonOperator(
117
- task_id='model_trainer',
118
- python_callable=run_model_trainer,
119
- dag=dag
120
- )
121
-
122
- model_evaluation_task = PythonOperator(
123
- task_id='model_evaluation',
124
- python_callable=run_model_evaluation,
125
- dag=dag
126
- )
127
-
128
- model_pusher_task = PythonOperator(
129
- task_id='model_pusher',
130
- python_callable=run_model_pusher,
131
- dag=dag
132
- )
133
-
134
- # Define pipeline dependencies
135
- (
136
- data_ingestion_task
137
- >> data_validation_task
138
- >> data_transformation_task
139
- >> feature_engineering_task
140
- >> model_trainer_task
141
- >> model_evaluation_task
142
- >> model_pusher_task
143
- )
 
5
  from pathlib import Path
6
 
7
  # Add project root to path
8
+ sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
9
 
10
+ from mlpipeline.pipeline.training_pipeline import TrainingPipeline
11
  from mlpipeline.logging.logger import get_logger
 
 
 
 
 
 
 
12
 
13
  logger = get_logger(__name__)
14
 
15
+ # DAG configuration
16
  default_args = {
17
  'owner': 'automl-mlops',
18
  'depends_on_past': False,
19
  'start_date': datetime(2026, 1, 1),
20
  'email_on_failure': False,
 
21
  'retries': 1,
22
  'retry_delay': timedelta(minutes=5)
23
  }
 
31
  tags=['automl', 'mlops', 'mlflow']
32
  )
33
 
34
+ # Single function to run entire pipeline
35
+ def run_training_pipeline():
36
+ logger.info("Starting AutoML Training Pipeline")
37
+ pipeline = TrainingPipeline()
38
+ pipeline.run_pipeline()
39
+ logger.info("Training Pipeline completed")
40
 
41
+ # Single task for entire pipeline
42
+ training_task = PythonOperator(
43
+ task_id='automl_training_pipeline',
44
+ python_callable=run_training_pipeline,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  dag=dag
46
  )