| |
| """ |
| Data Processing Pipeline Orchestrator |
| |
| This script orchestrates the three main data processing steps: |
| 1. EUV data cleaning (euv_data_cleaning.py) - removes bad AIA files |
| 2. ITI data processing (iti_data_processing.py) - processes the good data |
| 3. Data alignment (align_data.py) - concatenates GOES data and checks for missing data |
| |
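| Each step is skipped automatically when its output already exists; use --force to rerun every step. The pipeline stops at the first step that fails. |
| |
| Configuration: |
| - Use --config to specify a custom configuration file (YAML or Python) |
| - Use --show-config to display current configuration |
| - Use --create-template to create a YAML configuration template |
| - Use --validate to check that all required configuration paths exist |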
| """ |
|
|
| import os |
| import sys |
| import subprocess |
| import time |
| import logging |
| from datetime import datetime |
| from pathlib import Path |
| from pipeline_config import PipelineConfig |
|
|
| |
# Root logging goes both to a log file (path relative to the current working
# directory) and to stdout.
# NOTE(review): this runs at import time, so merely importing this module
# opens 'data_processing_pipeline.log'.
_log_handlers = [
    logging.FileHandler('data_processing_pipeline.log'),
    logging.StreamHandler(sys.stdout),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)
|
|
class DataProcessingPipeline:
    """Run the EUV cleaning, ITI processing and alignment scripts in order.

    Each step is a standalone Python script located in ``base_dir`` and is run
    in a child process with the current interpreter. A step is skipped when its
    output check reports existing output (unless a rerun is forced), and the
    pipeline stops at the first step that fails.
    """

    def __init__(self, base_dir=None, config=None):
        """
        Initialize the data processing pipeline.

        Args:
            base_dir: Base directory for the project. If None, uses the
                directory containing this script.
            config: PipelineConfig instance. If None, uses default configuration.
        """
        self.base_dir = Path(__file__).parent if base_dir is None else Path(base_dir)
        self.config = config if config is not None else PipelineConfig()

        # Script executed for each step, resolved relative to base_dir.
        self.scripts = {
            'euv_cleaning': self.base_dir / 'euv_data_cleaning.py',
            'iti_processing': self.base_dir / 'iti_data_processing.py',
            'align_data': self.base_dir / 'align_data.py',
        }

        # Steps in execution order (dicts preserve insertion order). Each
        # 'output_check' returns True when the step's output already exists.
        self.steps = {
            'euv_cleaning': {
                'name': 'EUV Data Cleaning',
                'description': 'Remove bad AIA files based on timestamp validation',
                'output_check': self._check_euv_cleaning_complete,
            },
            'iti_processing': {
                'name': 'ITI Data Processing',
                'description': 'Process good AIA data using ITI methods',
                'output_check': self._check_iti_processing_complete,
            },
            'align_data': {
                'name': 'Data Alignment',
                'description': 'Concatenate GOES data and check for missing data',
                'output_check': self._check_align_data_complete,
            },
        }

    @staticmethod
    def _is_nonempty_dir(path):
        """Return True if ``path`` is a directory containing at least one entry.

        Checking ``is_dir()`` (not just ``exists()``) keeps a regular file at the
        expected location from making ``iterdir()`` raise NotADirectoryError.
        """
        return path.is_dir() and any(path.iterdir())

    def _check_euv_cleaning_complete(self):
        """
        Check whether EUV data cleaning has already produced output.

        Returns:
            bool: True if at least one per-wavelength subdirectory of the
            configured bad-files directory exists and is non-empty.
        """
        bad_files_dir = Path(self.config.get_path('euv', 'bad_files_dir'))
        if not bad_files_dir.is_dir():
            return False
        # NOTE(review): wavelengths are read through get_path(); presumably an
        # iterable of wavelength identifiers -- confirm against PipelineConfig.
        wavelengths = self.config.get_path('euv', 'wavelengths')
        return any(self._is_nonempty_dir(bad_files_dir / str(wl)) for wl in wavelengths)

    def _check_iti_processing_complete(self):
        """
        Check whether ITI data processing has already produced output.

        Returns:
            bool: True if the configured ITI output folder contains at least
            one ``*.npy`` file.
        """
        output_dir = Path(self.config.get_path('iti', 'output_folder'))
        # any() stops at the first match instead of listing every .npy file.
        return output_dir.is_dir() and any(output_dir.glob('*.npy'))

    def _check_align_data_complete(self):
        """
        Check whether data alignment has already produced output.

        Returns:
            bool: True if the configured SXR output directory exists and is
            non-empty.
        """
        output_dir = Path(self.config.get_path('alignment', 'output_sxr_dir'))
        return self._is_nonempty_dir(output_dir)

    def _build_env(self):
        """Return a copy of ``os.environ`` extended with pipeline settings.

        Child scripts receive the serialized configuration in PIPELINE_CONFIG
        and the base data directory in BASE_DATA_DIR.
        """
        env = os.environ.copy()
        env['PIPELINE_CONFIG'] = self.config.to_json()
        # Environment values must be strings (strictly so on Windows).
        # os.fspath converts a Path to str, and a missing (None) value raises
        # TypeError instead of being silently turned into the string "None".
        env['BASE_DATA_DIR'] = os.fspath(
            self.config.get_path('base_data_dir', 'base_data_dir')
        )
        return env

    def run_script(self, script_name, step_info):
        """
        Run a single processing script in a child Python process.

        Args:
            script_name: Key into ``self.scripts`` identifying the script to run.
            step_info: Dictionary with the step's 'name' and 'description'.

        Returns:
            bool: True if the script exited with status 0, False if it is
            missing, exited non-zero, or could not be launched.
        """
        script_path = self.scripts[script_name]

        if not script_path.exists():
            logger.error(f"Script not found: {script_path}")
            return False

        logger.info(f"Starting {step_info['name']}...")
        logger.info(f"Description: {step_info['description']}")
        logger.info(f"Running: {script_path}")

        # Monotonic clock: measured durations are unaffected by system clock changes.
        start_time = time.monotonic()

        try:
            # The env is built inside the try so that a configuration error is
            # reported as a failed step (and the pipeline summary still runs)
            # instead of escaping as an unhandled exception.
            env = self._build_env()
            result = subprocess.run(
                [sys.executable, str(script_path)],
                cwd=self.base_dir,
                env=env,
            )
        except Exception as e:
            duration = time.monotonic() - start_time
            # logger.exception also records the traceback for diagnosis.
            logger.exception(f"✗ {step_info['name']} failed with exception: {e}")
            logger.error(f"Duration: {duration:.2f} seconds")
            return False

        duration = time.monotonic() - start_time

        if result.returncode == 0:
            logger.info(f"✓ {step_info['name']} completed successfully in {duration:.2f} seconds")
            return True

        logger.error(f"✗ {step_info['name']} failed with return code {result.returncode}")
        return False

    def run_pipeline(self, force_rerun=False):
        """
        Run the complete data processing pipeline.

        Steps run in the order of ``self.steps``. A step whose output check
        passes is skipped (and counted as successful) unless ``force_rerun`` is
        True. Execution stops at the first failing step.

        Args:
            force_rerun: If True, run all steps regardless of completion status.

        Returns:
            bool: True if no step failed.
        """
        logger.info("=" * 80)
        logger.info("Starting Data Processing Pipeline")
        logger.info("=" * 80)
        logger.info(f"Base directory: {self.base_dir}")
        logger.info(f"Force rerun: {force_rerun}")
        logger.info("=" * 80)

        pipeline_start_time = time.monotonic()
        successful_steps = 0
        failed_steps = 0

        for step_name, step_info in self.steps.items():
            logger.info(f"\n--- Step: {step_info['name']} ---")

            if not force_rerun and step_info['output_check']():
                logger.info(f"✓ {step_info['name']} already completed - skipping")
                successful_steps += 1
                continue

            if self.run_script(step_name, step_info):
                successful_steps += 1
            else:
                failed_steps += 1
                logger.error(f"Pipeline stopped due to failure in {step_info['name']}")
                break

        total_duration = time.monotonic() - pipeline_start_time
        # Steps after a failure are neither successful nor failed; report them.
        not_run_steps = len(self.steps) - successful_steps - failed_steps
        self._log_summary(total_duration, successful_steps, failed_steps, not_run_steps)

        return failed_steps == 0

    @staticmethod
    def _log_summary(total_duration, successful_steps, failed_steps, not_run_steps):
        """Log the end-of-run summary block for ``run_pipeline``."""
        logger.info("\n" + "=" * 80)
        logger.info("PIPELINE SUMMARY")
        logger.info("=" * 80)
        logger.info(f"Total duration: {total_duration:.2f} seconds")
        logger.info(f"Successful steps: {successful_steps}")
        logger.info(f"Failed steps: {failed_steps}")
        if not_run_steps:
            logger.info(f"Steps not run: {not_run_steps}")

        if failed_steps == 0:
            logger.info("✓ All steps completed successfully!")
        else:
            logger.error("✗ Pipeline completed with errors")

        logger.info("=" * 80)
|
|
def main():
    """Parse command-line arguments and run the pipeline or a utility action.

    Utility actions (--create-template, --show-config, --validate) run and
    exit without executing any pipeline step; --validate exits with status 1
    when required paths are missing so shell scripts and CI can detect it.
    Otherwise the configuration is validated (exit status 1 if invalid),
    ``config.create_directories()`` is called, the pipeline runs, and the
    process exits with status 0 on success or 1 on failure.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Data Processing Pipeline Orchestrator')
    parser.add_argument('--force', action='store_true',
                        help='Force rerun all steps regardless of completion status')
    parser.add_argument('--base-dir', type=str,
                        help='Base directory for the project (default: script directory)')
    parser.add_argument('--config', type=str,
                        help='Path to custom configuration file (YAML or Python)')
    parser.add_argument('--show-config', action='store_true',
                        help='Display current configuration and exit')
    parser.add_argument('--create-template', action='store_true',
                        help='Create a YAML configuration template file and exit')
    parser.add_argument('--validate', action='store_true',
                        help='Validate configuration paths and exit')

    args = parser.parse_args()

    # Template creation uses the default configuration, not --config.
    if args.create_template:
        config = PipelineConfig()
        config.save_config_template()
        return

    config = PipelineConfig(args.config)

    if args.show_config:
        config.print_config()
        return

    is_valid, missing_paths = config.validate_paths()

    if args.validate:
        if is_valid:
            print("✓ All required paths exist")
        else:
            print("✗ Missing required paths:")
            for path in missing_paths:
                print(f" - {path}")
        # Report the result through the exit status as well, so an invalid
        # configuration is detectable by callers (previously always 0).
        sys.exit(0 if is_valid else 1)

    if not is_valid:
        logger.error("Configuration validation failed. Missing required paths:")
        for path in missing_paths:
            logger.error(f" - {path}")
        logger.error("Use --validate to check configuration")
        sys.exit(1)

    config.create_directories()

    pipeline = DataProcessingPipeline(args.base_dir, config)
    success = pipeline.run_pipeline(force_rerun=args.force)

    sys.exit(0 if success else 1)
|
|
if __name__ == "__main__":
    # Script entry point; on the normal run path main() itself calls
    # sys.exit() with the pipeline's status.
    main()
|
|