griffingoodwin04 committed on
Commit
c14af1c
·
1 Parent(s): 0f9cb89

Add data processing pipeline (process_data_pipeline.py)

data/README_pipeline.md ADDED
@@ -0,0 +1,144 @@
+ # Data Processing Pipeline
+
+ This directory contains a comprehensive data processing pipeline for solar flare data analysis.
+
+ ## Pipeline Scripts
+
+ ### Main Orchestrator
+ - **`process_data_pipeline.py`** - Main orchestrator script that runs all processing steps in sequence
+
+ ### Individual Processing Steps
+ 1. **`euv_data_cleaning.py`** - Removes bad AIA files based on timestamp validation
+ 2. **`iti_data_processing.py`** - Processes good AIA data using ITI methods
+ 3. **`align_data.py`** - Concatenates GOES data and checks for missing data
+
+ ### Configuration
+ - **`pipeline_config.py`** - Configuration system for all directory paths and settings
+ - **`pipeline_config_template.yaml`** - YAML template for creating custom configurations
+ - **`pipeline_config_template.py`** - Python template (kept for backward compatibility)
+
+ ## Usage
+
+ ### Run the Complete Pipeline
+ ```bash
+ # Run all steps (skipping completed ones) with the default configuration
+ python process_data_pipeline.py
+
+ # Force rerun of all steps
+ python process_data_pipeline.py --force
+
+ # Use a custom configuration file (YAML or Python)
+ python process_data_pipeline.py --config my_config.yaml
+
+ # Display the current configuration
+ python process_data_pipeline.py --show-config
+
+ # Validate configuration paths
+ python process_data_pipeline.py --validate
+
+ # Create a YAML configuration template
+ python process_data_pipeline.py --create-template
+ ```
+
+ ### Run Individual Steps
+ ```bash
+ # EUV data cleaning
+ python euv_data_cleaning.py
+
+ # ITI data processing
+ python iti_data_processing.py
+
+ # Data alignment
+ python align_data.py
+ ```
+
+ ## Pipeline Features
+
+ - **Modular Configuration**: Easy to change directory paths and settings
+ - **Automatic Step Skipping**: Steps that are already completed are skipped automatically
+ - **Comprehensive Logging**: All operations are logged to both the console and a file (`data_processing_pipeline.log`)
+ - **Error Handling**: The pipeline stops on the first error with detailed error reporting
+ - **Progress Tracking**: Real-time progress updates and timing information
+ - **Configuration Validation**: Check that all required paths exist before running
+ - **Template Generation**: Create custom configuration templates easily
+
+ ## Configuration
+
+ ### Default Directories
+
+ The pipeline uses the following default directories (all configurable):
+
+ **Input Directories:**
+ - `/mnt/data/AUGUST/SDO-AIA-timespan/` - Raw AIA data
+ - `/mnt/data/NEW-FLARE/combined/` - GOES CSV files
+ - `/mnt/data/NEW-FLARE/AIA_processed/` - Processed AIA files
+
+ **Output Directories:**
+ - `/mnt/data/AUGUST/SDO-AIA_bad/` - Bad AIA files (from EUV cleaning)
+ - `/mnt/data/AUGUST/AIA_ITI/` - Processed AIA data (from ITI processing)
+ - `/mnt/data/NEW-FLARE/GOES-SXR-A/` - GOES SXR-A data (from alignment)
+ - `/mnt/data/NEW-FLARE/GOES-SXR-B/` - GOES SXR-B data (from alignment)
+ - `/mnt/data/NEW-FLARE/AIA_ITI_MISSING/` - AIA files with missing GOES data
+
+ ### Custom Configuration
+
+ To use custom directories:
+
+ 1. **Create a YAML configuration file:**
+    ```bash
+    python process_data_pipeline.py --create-template
+    # Edit pipeline_config_template.yaml with your paths
+    ```
+
+ 2. **Use your custom configuration:**
+    ```bash
+    python process_data_pipeline.py --config my_config.yaml
+    ```
+
+ 3. **Validate your configuration:**
+    ```bash
+    python process_data_pipeline.py --config my_config.yaml --validate
+    ```
+
+ **YAML Configuration Example:**
+ ```yaml
+ # Data Processing Pipeline Configuration
+ base_data_dir: /your/data/path
+
+ euv:
+   input_folder: /your/data/AIA-raw
+   bad_files_dir: /your/data/AIA-bad
+   wavelengths: [94, 131, 171, 193, 211, 304]
+
+ iti:
+   input_folder: /your/data/AIA-raw
+   output_folder: /your/data/AIA-processed
+   wavelengths: [94, 131, 171, 193, 211, 304]
+
+ alignment:
+   goes_data_dir: /your/data/GOES
+   aia_processed_dir: /your/data/AIA-processed
+   output_sxr_a_dir: /your/data/GOES-SXR-A
+   output_sxr_b_dir: /your/data/GOES-SXR-B
+   aia_missing_dir: /your/data/AIA-missing
+
+ processing:
+   max_processes: null  # null = use all CPU cores
+   batch_size_multiplier: 4
+   min_batch_size: 1
+ ```
+
+ ## Requirements
+
+ - Python 3.6+
+ - Required packages: pandas, numpy, astropy, tqdm, pyyaml (`multiprocessing` is part of the standard library)
+ - Sufficient disk space for data processing
+ - Access to the specified data directories
+
+ ## Logging
+
+ The pipeline writes detailed logs to `data_processing_pipeline.log`, including:
+ - Start/end times for each step
+ - Success/failure status
+ - Error messages and stack traces
+ - Processing statistics
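The "Modular Configuration" feature above hinges on a JSON round-trip: the orchestrator serializes the config dict into the `PIPELINE_CONFIG` environment variable, and each step script parses it back. A minimal sketch of that handoff (`load_config` mirrors the pattern used in the step scripts; the default values here are illustrative only):

```python
import json
import os

def load_config(defaults):
    """Return pipeline settings from the PIPELINE_CONFIG env var, else defaults."""
    raw = os.environ.get("PIPELINE_CONFIG")
    if raw:
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            pass  # malformed JSON: fall back to the defaults below
    return defaults

DEFAULTS = {"processing": {"batch_size_multiplier": 4, "min_batch_size": 1}}

# Orchestrator side: the dict must be serialized with json.dumps;
# str() would emit single-quoted Python repr that json.loads rejects.
os.environ["PIPELINE_CONFIG"] = json.dumps(
    {"processing": {"batch_size_multiplier": 8, "min_batch_size": 2}}
)

cfg = load_config(DEFAULTS)
print(cfg["processing"]["batch_size_multiplier"])  # 8
```

Because the fallback runs whenever the variable is absent or unparseable, each step script can also be launched standalone, as the README's "Run Individual Steps" section describes.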
data/align_data.py CHANGED
@@ -13,29 +13,57 @@ import re
  warnings.filterwarnings('ignore')

  # =============================================================================
- # CONFIGURATION - Change these paths as needed
+ # CONFIGURATION - Load from environment or use defaults
  # =============================================================================
  #
- # To change directories, simply modify the variables below:
- # - All paths are relative to your system
- # - Directories will be created automatically if they don't exist
- # - Use absolute paths for best results
+ # Configuration is loaded from environment variables set by the pipeline
+ # orchestrator, or falls back to default values when running standalone.
  #
  # =============================================================================

+ import os
+ import json
+
+ def load_config():
+     """Load configuration from the environment or use defaults."""
+     if 'PIPELINE_CONFIG' in os.environ:
+         try:
+             return json.loads(os.environ['PIPELINE_CONFIG'])
+         except json.JSONDecodeError:
+             pass  # fall back to defaults on malformed JSON
+
+     # Default configuration
+     return {
+         'alignment': {
+             'goes_data_dir': "/mnt/data/NEW-FLARE/combined",
+             'aia_processed_dir': "/mnt/data/NEW-FLARE/AIA_processed",
+             'output_sxr_a_dir': "/mnt/data/NEW-FLARE/GOES-SXR-A",
+             'output_sxr_b_dir': "/mnt/data/NEW-FLARE/GOES-SXR-B",
+             'aia_missing_dir': "/mnt/data/NEW-FLARE/AIA_ITI_MISSING"
+         },
+         'processing': {
+             'batch_size_multiplier': 4,
+             'min_batch_size': 1,
+             'max_processes': None
+         }
+     }
+
+ config = load_config()
+
  # Input directories
- GOES_DATA_DIR = "/mnt/data/NEW-FLARE/combined"           # Directory containing GOES CSV files
- AIA_PROCESSED_DIR = "/mnt/data/NEW-FLARE/AIA_processed"  # Directory with processed AIA files
+ GOES_DATA_DIR = config['alignment']['goes_data_dir']
+ AIA_PROCESSED_DIR = config['alignment']['aia_processed_dir']

  # Output directories
- OUTPUT_SXR_A_DIR = "/mnt/data/NEW-FLARE/GOES-SXR-A"      # Output directory for SXR-A data
- OUTPUT_SXR_B_DIR = "/mnt/data/NEW-FLARE/GOES-SXR-B"      # Output directory for SXR-B data
- AIA_MISSING_DIR = "/mnt/data/NEW-FLARE/AIA_ITI_MISSING"  # Directory for AIA files with missing GOES data
+ OUTPUT_SXR_A_DIR = config['alignment']['output_sxr_a_dir']
+ OUTPUT_SXR_B_DIR = config['alignment']['output_sxr_b_dir']
+ AIA_MISSING_DIR = config['alignment']['aia_missing_dir']

  # Processing configuration
- BATCH_SIZE_MULTIPLIER = 4  # Number of batches per process (adjust for performance)
- MIN_BATCH_SIZE = 1         # Minimum batch size
- MAX_PROCESSES = None       # Maximum number of processes (None = use all CPU cores)
+ BATCH_SIZE_MULTIPLIER = config['processing']['batch_size_multiplier']
+ MIN_BATCH_SIZE = config['processing']['min_batch_size']
+ MAX_PROCESSES = config['processing']['max_processes']

  # =============================================================================
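`align_data.py` drives its multiprocessing from `BATCH_SIZE_MULTIPLIER`, `MIN_BATCH_SIZE`, and `MAX_PROCESSES`, but the batching arithmetic itself is outside this hunk. One plausible way such settings combine, purely as an illustration (`compute_batch_size` is a hypothetical helper, not part of the script):

```python
import math
import os

def compute_batch_size(n_items, max_processes=None, multiplier=4, min_batch=1):
    """Split n_items across (processes * multiplier) batches, floored at min_batch."""
    processes = max_processes or os.cpu_count() or 1
    n_batches = processes * multiplier
    return max(min_batch, math.ceil(n_items / n_batches))

print(compute_batch_size(1000, max_processes=8))  # ceil(1000 / 32) = 32
print(compute_batch_size(3, max_processes=8))     # floors at min_batch = 1
```

The multiplier trades scheduling overhead for load balance: more, smaller batches per worker keep cores busy when items take uneven time.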
data/euv_data_cleaning.py CHANGED
@@ -15,8 +15,31 @@ from itipy.data.dataset import get_intersecting_files
  from astropy.io import fits

  # Configuration for all wavelengths to process
- wavelengths = [94, 131, 171, 193, 211, 304]
- base_input_folder = '/mnt/data/NEW-FLARE/SDO-AIA-flaring'
+ # Load configuration from the environment or use defaults
+ import os
+ import json
+
+ def load_config():
+     """Load configuration from the environment or use defaults."""
+     if 'PIPELINE_CONFIG' in os.environ:
+         try:
+             return json.loads(os.environ['PIPELINE_CONFIG'])
+         except json.JSONDecodeError:
+             pass  # fall back to defaults on malformed JSON
+
+     # Default configuration
+     return {
+         'euv': {
+             'wavelengths': [94, 131, 171, 193, 211, 304],
+             'input_folder': '/mnt/data/AUGUST/SDO-AIA-timespan',
+             'bad_files_dir': '/mnt/data/AUGUST/SDO-AIA_bad'
+         }
+     }
+
+ config = load_config()
+ wavelengths = config['euv']['wavelengths']
+ base_input_folder = config['euv']['input_folder']

  aia_files = get_intersecting_files(base_input_folder, wavelengths)

@@ -64,7 +87,7 @@ for wavelength in wavelengths:
      filename = pd.to_datetime(names).strftime('%Y-%m-%dT%H:%M:%S') + ".fits"
      file_path = os.path.join(base_input_folder, f"{wavelength}/(unknown)")
      # Destination path
-     destination_folder = os.path.join("/mnt/data/NEW-FLARE/SDO-AIA_bad", str(wavelength))
+     destination_folder = os.path.join(config['euv']['bad_files_dir'], str(wavelength))
      os.makedirs(destination_folder, exist_ok=True)
      # Move or report missing
      if os.path.exists(file_path):
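The cleaning step flags files "based on timestamp validation" and builds names with the `%Y-%m-%dT%H:%M:%S` pattern seen above; the validation criterion itself is not in this hunk. A stdlib-only sketch of how a filename timestamp could be checked (`has_valid_timestamp` is a hypothetical helper, not code from `euv_data_cleaning.py`):

```python
from datetime import datetime

# Matches the strftime pattern used when building .fits filenames above.
FMT = "%Y-%m-%dT%H:%M:%S"

def has_valid_timestamp(filename):
    """Return True if the .fits filename stem parses as an ISO-like timestamp."""
    stem = filename[:-5] if filename.endswith(".fits") else filename
    try:
        datetime.strptime(stem, FMT)
        return True
    except ValueError:
        return False

print(has_valid_timestamp("2014-03-29T17:48:00.fits"))  # True
print(has_valid_timestamp("corrupted_header.fits"))     # False
```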
data/iti_data_processing.py CHANGED
@@ -14,9 +14,32 @@ from multiprocessing import Pool
  from tqdm import tqdm

  # Configuration for all wavelengths to process
- wavelengths = [94, 131, 171, 193, 211, 304]
- base_input_folder = '/mnt/data/SDO-AIA-flaring'
- output_folder = '/mnt/data/AIA_ITI'
+ # Load configuration from the environment or use defaults
+ import os
+ import json
+
+ def load_config():
+     """Load configuration from the environment or use defaults."""
+     if 'PIPELINE_CONFIG' in os.environ:
+         try:
+             return json.loads(os.environ['PIPELINE_CONFIG'])
+         except json.JSONDecodeError:
+             pass  # fall back to defaults on malformed JSON
+
+     # Default configuration
+     return {
+         'iti': {
+             'wavelengths': [94, 131, 171, 193, 211, 304],
+             'input_folder': '/mnt/data/AUGUST/SDO-AIA-timespan',
+             'output_folder': '/mnt/data/AUGUST/AIA_ITI'
+         }
+     }
+
+ config = load_config()
+ wavelengths = config['iti']['wavelengths']
+ base_input_folder = config['iti']['input_folder']
+ output_folder = config['iti']['output_folder']
  os.makedirs(output_folder, exist_ok=True)

  sdo_norms = {
data/pipeline_config_template.yaml ADDED
@@ -0,0 +1,39 @@
+ # Data Processing Pipeline Configuration
+ #
+ # Modify the paths below to match your system setup.
+ # All paths should be absolute.
+ #
+ # Usage: python process_data_pipeline.py --config this_file.yaml
+ #
+
+ base_data_dir: /mnt/data
+ euv:
+   input_folder: /mnt/data/AUGUST/SDO-AIA-timespan
+   bad_files_dir: /mnt/data/AUGUST/SDO-AIA_bad
+   wavelengths:
+     - 94
+     - 131
+     - 171
+     - 193
+     - 211
+     - 304
+ iti:
+   input_folder: /mnt/data/AUGUST/SDO-AIA-timespan
+   output_folder: /mnt/data/AUGUST/AIA_ITI
+   wavelengths:
+     - 94
+     - 131
+     - 171
+     - 193
+     - 211
+     - 304
+ alignment:
+   goes_data_dir: /mnt/data/NEW-FLARE/combined
+   aia_processed_dir: /mnt/data/NEW-FLARE/AIA_processed
+   output_sxr_a_dir: /mnt/data/NEW-FLARE/GOES-SXR-A
+   output_sxr_b_dir: /mnt/data/NEW-FLARE/GOES-SXR-B
+   aia_missing_dir: /mnt/data/NEW-FLARE/AIA_ITI_MISSING
+ processing:
+   max_processes: null
+   batch_size_multiplier: 4
+   min_batch_size: 1
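A configuration shaped like the template above can be sanity-checked for missing keys before the pipeline runs. A minimal sketch of such a check (the section/key layout mirrors the template; `REQUIRED_KEYS` and `missing_keys` are illustrative assumptions, not part of `pipeline_config.py`):

```python
# Section -> required keys, mirroring pipeline_config_template.yaml.
REQUIRED_KEYS = {
    "euv": ["input_folder", "bad_files_dir", "wavelengths"],
    "iti": ["input_folder", "output_folder", "wavelengths"],
    "alignment": ["goes_data_dir", "aia_processed_dir",
                  "output_sxr_a_dir", "output_sxr_b_dir", "aia_missing_dir"],
    "processing": ["max_processes", "batch_size_multiplier", "min_batch_size"],
}

def missing_keys(config):
    """List 'section.key' entries absent from a parsed configuration dict."""
    missing = []
    for section, keys in REQUIRED_KEYS.items():
        for key in keys:
            if key not in config.get(section, {}):
                missing.append(f"{section}.{key}")
    return missing

cfg = {"euv": {"input_folder": "/mnt/data/AUGUST/SDO-AIA-timespan"}}
print(missing_keys(cfg)[:2])  # ['euv.bad_files_dir', 'euv.wavelengths']
```

Run against the output of `yaml.safe_load` on the template, this kind of check catches typos in section or key names before any processing starts.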
data/process_data_pipeline.py ADDED
@@ -0,0 +1,294 @@
+ #!/usr/bin/env python3
+ """
+ Data Processing Pipeline Orchestrator
+
+ This script orchestrates the three main data processing steps:
+ 1. EUV data cleaning (euv_data_cleaning.py) - removes bad AIA files
+ 2. ITI data processing (iti_data_processing.py) - processes the good data
+ 3. Data alignment (align_data.py) - concatenates GOES data and checks for missing data
+
+ Each step is skipped if it has already completed.
+
+ Configuration:
+ - Use --config to specify a custom configuration file (YAML or Python)
+ - Use --show-config to display the current configuration
+ - Use --create-template to create a YAML configuration template
+ """
+
+ import os
+ import sys
+ import json
+ import subprocess
+ import time
+ import logging
+ from datetime import datetime
+ from pathlib import Path
+ from pipeline_config import PipelineConfig
+
+ # Configure logging
+ logging.basicConfig(
+     level=logging.INFO,
+     format='%(asctime)s - %(levelname)s - %(message)s',
+     handlers=[
+         logging.FileHandler('data_processing_pipeline.log'),
+         logging.StreamHandler(sys.stdout)
+     ]
+ )
+ logger = logging.getLogger(__name__)
+
+ class DataProcessingPipeline:
+     def __init__(self, base_dir=None, config=None):
+         """
+         Initialize the data processing pipeline.
+
+         Args:
+             base_dir: Base directory for the project. If None, uses the current script's directory.
+             config: PipelineConfig instance. If None, uses the default configuration.
+         """
+         if base_dir is None:
+             self.base_dir = Path(__file__).parent
+         else:
+             self.base_dir = Path(base_dir)
+
+         # Load configuration
+         self.config = config if config is not None else PipelineConfig()
+
+         # Define script paths
+         self.scripts = {
+             'euv_cleaning': self.base_dir / 'euv_data_cleaning.py',
+             'iti_processing': self.base_dir / 'iti_data_processing.py',
+             'align_data': self.base_dir / 'align_data.py'
+         }
+
+         # Define step names and descriptions
+         self.steps = {
+             'euv_cleaning': {
+                 'name': 'EUV Data Cleaning',
+                 'description': 'Remove bad AIA files based on timestamp validation',
+                 'output_check': self._check_euv_cleaning_complete
+             },
+             'iti_processing': {
+                 'name': 'ITI Data Processing',
+                 'description': 'Process good AIA data using ITI methods',
+                 'output_check': self._check_iti_processing_complete
+             },
+             'align_data': {
+                 'name': 'Data Alignment',
+                 'description': 'Concatenate GOES data and check for missing data',
+                 'output_check': self._check_align_data_complete
+             }
+         }
+
+     def _check_euv_cleaning_complete(self):
+         """Check if EUV data cleaning is complete by looking for the bad-files directory."""
+         bad_files_dir = Path(self.config.get_path('euv', 'bad_files_dir'))
+         if bad_files_dir.exists():
+             # Check if any files were moved (indicating cleaning was done)
+             wavelengths = self.config.get_path('euv', 'wavelengths')
+             wavelength_dirs = [bad_files_dir / str(wl) for wl in wavelengths]
+             return any(d.exists() and any(d.iterdir()) for d in wavelength_dirs)
+         return False
+
+     def _check_iti_processing_complete(self):
+         """Check if ITI data processing is complete by looking for processed files."""
+         output_dir = Path(self.config.get_path('iti', 'output_folder'))
+         if output_dir.exists():
+             # Check if there are processed .npy files
+             npy_files = list(output_dir.glob('*.npy'))
+             return len(npy_files) > 0
+         return False
+
+     def _check_align_data_complete(self):
+         """Check if data alignment is complete by looking for output directories."""
+         output_dirs = [
+             Path(self.config.get_path('alignment', 'output_sxr_a_dir')),
+             Path(self.config.get_path('alignment', 'output_sxr_b_dir'))
+         ]
+         return all(d.exists() and any(d.iterdir()) for d in output_dirs)
+
+     def run_script(self, script_name, step_info):
+         """
+         Run a single processing script.
+
+         Args:
+             script_name: Name of the script to run
+             step_info: Dictionary containing step information
+
+         Returns:
+             bool: True if successful, False otherwise
+         """
+         script_path = self.scripts[script_name]
+
+         if not script_path.exists():
+             logger.error(f"Script not found: {script_path}")
+             return False
+
+         logger.info(f"Starting {step_info['name']}...")
+         logger.info(f"Description: {step_info['description']}")
+         logger.info(f"Running: {script_path}")
+
+         # Pass the configuration to the child script via environment variables.
+         # json.dumps is required here: str() on a dict produces single-quoted
+         # Python repr, which json.loads in the step scripts cannot parse.
+         env = os.environ.copy()
+         env.update({
+             'PIPELINE_CONFIG': json.dumps(self.config.config),
+             'BASE_DATA_DIR': self.config.get_path('base_data_dir', 'base_data_dir')
+         })
+
+         start_time = time.time()
+
+         try:
+             # Run the script
+             result = subprocess.run(
+                 [sys.executable, str(script_path)],
+                 capture_output=True,
+                 text=True,
+                 cwd=self.base_dir,
+                 env=env
+             )
+
+             duration = time.time() - start_time
+
+             if result.returncode == 0:
+                 logger.info(f"✓ {step_info['name']} completed successfully in {duration:.2f} seconds")
+                 if result.stdout:
+                     logger.debug(f"Output: {result.stdout}")
+                 return True
+             else:
+                 logger.error(f"✗ {step_info['name']} failed with return code {result.returncode}")
+                 logger.error(f"Error output: {result.stderr}")
+                 return False
+
+         except Exception as e:
+             duration = time.time() - start_time
+             logger.error(f"✗ {step_info['name']} failed with exception: {e}")
+             logger.error(f"Duration: {duration:.2f} seconds")
+             return False
+
+     def run_pipeline(self, force_rerun=False):
+         """
+         Run the complete data processing pipeline.
+
+         Args:
+             force_rerun: If True, run all steps regardless of completion status
+         """
+         logger.info("=" * 80)
+         logger.info("Starting Data Processing Pipeline")
+         logger.info("=" * 80)
+         logger.info(f"Base directory: {self.base_dir}")
+         logger.info(f"Force rerun: {force_rerun}")
+         logger.info("=" * 80)
+
+         pipeline_start_time = time.time()
+         successful_steps = 0
+         failed_steps = 0
+
+         for step_name, step_info in self.steps.items():
+             logger.info(f"\n--- Step: {step_info['name']} ---")
+
+             # Check if the step is already complete
+             if not force_rerun and step_info['output_check']():
+                 logger.info(f"✓ {step_info['name']} already completed - skipping")
+                 successful_steps += 1
+                 continue
+
+             # Run the step
+             if self.run_script(step_name, step_info):
+                 successful_steps += 1
+             else:
+                 failed_steps += 1
+                 logger.error(f"Pipeline stopped due to failure in {step_info['name']}")
+                 break
+
+         total_duration = time.time() - pipeline_start_time
+
+         # Summary
+         logger.info("\n" + "=" * 80)
+         logger.info("PIPELINE SUMMARY")
+         logger.info("=" * 80)
+         logger.info(f"Total duration: {total_duration:.2f} seconds")
+         logger.info(f"Successful steps: {successful_steps}")
+         logger.info(f"Failed steps: {failed_steps}")
+
+         if failed_steps == 0:
+             logger.info("✓ All steps completed successfully!")
+         else:
+             logger.error("✗ Pipeline completed with errors")
+
+         logger.info("=" * 80)
+
+         return failed_steps == 0
+
+ def main():
+     """Main function to run the pipeline."""
+     import argparse
+
+     parser = argparse.ArgumentParser(description='Data Processing Pipeline Orchestrator')
+     parser.add_argument('--force', action='store_true',
+                         help='Force rerun of all steps regardless of completion status')
+     parser.add_argument('--base-dir', type=str,
+                         help='Base directory for the project (default: script directory)')
+     parser.add_argument('--config', type=str,
+                         help='Path to a custom configuration file (YAML or Python)')
+     parser.add_argument('--show-config', action='store_true',
+                         help='Display the current configuration and exit')
+     parser.add_argument('--create-template', action='store_true',
+                         help='Create a YAML configuration template file and exit')
+     parser.add_argument('--validate', action='store_true',
+                         help='Validate configuration paths and exit')
+
+     args = parser.parse_args()
+
+     # Handle special commands
+     if args.create_template:
+         config = PipelineConfig()
+         config.save_config_template()
+         return
+
+     # Load configuration
+     config = PipelineConfig(args.config)
+
+     if args.show_config:
+         config.print_config()
+         return
+
+     if args.validate:
+         is_valid, missing_paths = config.validate_paths()
+         if is_valid:
+             print("✓ All required paths exist")
+         else:
+             print("✗ Missing required paths:")
+             for path in missing_paths:
+                 print(f"  - {path}")
+         return
+
+     # Create the pipeline instance
+     pipeline = DataProcessingPipeline(args.base_dir, config)
+
+     # Validate paths before running
+     is_valid, missing_paths = config.validate_paths()
+     if not is_valid:
+         logger.error("Configuration validation failed. Missing required paths:")
+         for path in missing_paths:
+             logger.error(f"  - {path}")
+         logger.error("Use --validate to check the configuration")
+         sys.exit(1)
+
+     # Create necessary directories
+     config.create_directories()
+
+     # Run the pipeline
+     success = pipeline.run_pipeline(force_rerun=args.force)
+
+     # Exit with the appropriate code
+     sys.exit(0 if success else 1)
+
+ if __name__ == "__main__":
+     main()
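The orchestrator's step-skipping keys off output artifacts, e.g. the presence of `.npy` files for the ITI step. The pattern can be exercised in isolation; this sketch mimics the shape of `_check_iti_processing_complete` (standalone function rather than a method, with a temporary directory standing in for the real output folder):

```python
import tempfile
from pathlib import Path

def iti_step_complete(output_folder):
    """True once the output folder contains at least one .npy artifact."""
    out = Path(output_folder)
    return out.exists() and any(out.glob("*.npy"))

with tempfile.TemporaryDirectory() as tmp:
    print(iti_step_complete(tmp))  # False: directory exists but is empty
    (Path(tmp) / "2014-03-29T17:48:00.npy").touch()
    print(iti_step_complete(tmp))  # True: artifact present, so the step would be skipped
```

One caveat of artifact-based checks: a step that crashed after writing a partial output still looks "complete", which is what the `--force` flag is for.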