| import glob |
| import os |
| import time |
| import warnings |
| from datetime import datetime |
| from multiprocessing import Pool, cpu_count |
|
|
| import numpy as np |
| import pandas as pd |
| from tqdm import tqdm |
| import re |
|
|
| warnings.filterwarnings('ignore') |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import os |
| import json |
|
|
def load_config():
    """Load pipeline configuration.

    Reads a JSON object from the ``PIPELINE_CONFIG`` environment variable
    when it is set; otherwise (or when the variable holds invalid JSON or a
    non-object value) falls back to the built-in defaults below.

    Returns:
        dict: configuration with an 'alignment' section (input/output paths)
        and a 'processing' section (parallelism tuning knobs).
    """
    raw = os.environ.get('PIPELINE_CONFIG')
    if raw is not None:
        try:
            config = json.loads(raw)
        except ValueError:
            # Invalid JSON: fall through to the defaults rather than crash.
            pass
        else:
            # Only a JSON object is a usable config; anything else (list,
            # number, string) would break the key lookups downstream.
            if isinstance(config, dict):
                return config

    # Built-in defaults used when no (valid) override is supplied.
    return {
        'alignment': {
            'goes_data_dir': "/mnt/data/PAPER/GOES-timespan/combined",
            'aia_processed_dir': "/mnt/data/PAPER/SDOITI",
            'output_sxr_dir': "/Volumes/T9/Data_FOXES/SXR_processed",
            'aia_missing_dir': "/Volumes/T9/Data_FOXES/AIA_missing"
        },
        'processing': {
            'batch_size_multiplier': 4,
            'min_batch_size': 1,
            'max_processes': None
        }
    }
|
|
config = load_config()


# Input directories: GOES CSV time series and processed AIA .npy files whose
# filenames (ISO timestamps) define the target timestamps to align against.
GOES_DATA_DIR = config['alignment']['goes_data_dir']
AIA_PROCESSED_DIR = config['alignment']['aia_processed_dir']


# Output directories: one .npy per aligned timestamp, plus a holding area for
# AIA files that have no matching GOES data.
OUTPUT_SXR_DIR = config['alignment']['output_sxr_dir']
AIA_MISSING_DIR = config['alignment']['aia_missing_dir']


# Parallel-processing tuning knobs; MAX_PROCESSES of None means "use cpu_count()".
BATCH_SIZE_MULTIPLIER = config['processing']['batch_size_multiplier']
MIN_BATCH_SIZE = config['processing']['min_batch_size']
MAX_PROCESSES = config['processing']['max_processes']
|
|
| |
|
|
|
|
def load_and_prepare_goes_data(goes_data_dir):
    """Load every GOES instrument CSV and index it by timestamp.

    Scans ``goes_data_dir`` for files named
    ``combined_g<N>_avg1m_<start>_<end>.csv``, loads each into a DataFrame
    indexed and sorted by its 'time' column, and keeps only rows whose
    'xrsb_flag' is 0 (quality-flagged-good XRS-B measurements).

    Args:
        goes_data_dir: directory containing the combined GOES CSV files.

    Returns:
        dict[int, pd.DataFrame]: GOES instrument number -> time-indexed,
        time-sorted DataFrame of quality-filtered records.

    Raises:
        FileNotFoundError: if no matching CSVs exist, or none load successfully.
    """
    print(f"Loading GOES data from: {goes_data_dir}")

    pattern = re.compile(r"combined_g(\d+)_avg1m_\d+_\d+\.csv")

    # Collect (instrument number, filename) pairs for every matching CSV.
    goes_files = []
    for fname in os.listdir(goes_data_dir):
        match = pattern.match(fname)
        if match:
            goes_files.append((int(match.group(1)), fname))

    if not goes_files:
        raise FileNotFoundError(f"No GOES CSV files found in directory: {goes_data_dir}")

    goes_data_dict = {}
    print(f"Found {len(goes_files)} GOES instrument files:")

    # Load newest instruments first so the log order mirrors lookup preference.
    for g_number, filename in sorted(goes_files, reverse=True):
        print(f"  Loading GOES-{g_number} from {filename}")
        try:
            goes_df = pd.read_csv(os.path.join(goes_data_dir, filename))
            goes_df['time'] = pd.to_datetime(goes_df['time'], format='%Y-%m-%d %H:%M:%S')

            goes_df.set_index('time', inplace=True)
            goes_df.sort_index(inplace=True)

            # Keep only rows whose XRS-B quality flag marks them as good.
            goes_df = goes_df[goes_df['xrsb_flag'] == 0]

            goes_data_dict[g_number] = goes_df
            print(f"    Loaded {len(goes_df)} records from {goes_df.index.min()} to {goes_df.index.max()}")
        except Exception as e:
            # A single corrupt file should not abort loading the others.
            print(f"    Warning: Failed to load {filename}: {e}")
            continue

    if not goes_data_dict:
        raise FileNotFoundError("No valid GOES data files could be loaded.")

    print(f"Successfully loaded {len(goes_data_dict)} GOES instruments: {sorted(goes_data_dict.keys())}")

    # Summarize per-instrument coverage for the operator.
    print("\nAnalyzing timestamp coverage...")
    for g_number in sorted(goes_data_dict.keys(), reverse=True):
        goes_data = goes_data_dict[g_number]
        time_range = f"{goes_data.index.min()} to {goes_data.index.max()}"
        print(f"  GOES-{g_number}: {len(goes_data)} records, {time_range}")

    return goes_data_dict
|
|
|
|
def create_combined_lookup_table(goes_data_dict, target_timestamps):
    """Build a flat lookup list pairing each target timestamp with GOES data.

    For every target timestamp, collects the non-NaN 'xrsb_flux' value from
    each instrument that has that exact timestamp, averages them, and records
    which instruments contributed. Timestamps with no valid data are dropped.
    A single pre-built list avoids searching multiple DataFrames later.
    """
    print("Creating optimized lookup table...")

    # Newest instruments first, matching the load order.
    instrument_order = sorted(goes_data_dict.keys(), reverse=True)
    lookup_data = []

    for ts in tqdm(pd.to_datetime(target_timestamps), desc="Building lookup table"):
        fluxes = []
        sources = []

        for g_number in instrument_order:
            frame = goes_data_dict[g_number]
            if ts not in frame.index:
                continue
            value = frame.loc[ts]['xrsb_flux']
            if pd.isna(value):
                continue
            fluxes.append(float(value))
            sources.append(f"GOES-{g_number}")

        if not fluxes:
            continue

        lookup_data.append({
            'timestamp': ts.strftime('%Y-%m-%dT%H:%M:%S'),
            'sxr_b': float(np.mean(fluxes)),
            'instrument': ",".join(sources),
        })

    print(f"Found valid data for {len(lookup_data)}/{len(target_timestamps)} timestamps")
    return lookup_data
|
|
|
|
def process_batch(batch_data):
    """Write one .npy SXR file per lookup entry in the batch.

    Batching many timestamps per worker call amortizes multiprocessing
    overhead compared with one task per timestamp.

    Args:
        batch_data: list of dicts with 'timestamp', 'sxr_b', 'instrument' keys
            (as produced by create_combined_lookup_table).

    Returns:
        tuple: (results, successful_count, failed_count) where results is a
        list of (timestamp, ok_bool, message) triples.
    """
    successful_count = 0
    failed_count = 0
    results = []

    for data in batch_data:
        # Extract the timestamp outside the try block so the error message
        # can always name it (previously a missing 'timestamp' key made the
        # except branch itself raise NameError).
        timestamp = data.get('timestamp', '<missing timestamp>')
        try:
            sxr_b = data['sxr_b']
            instrument = data['instrument']

            # One float32 value per file, named after its timestamp.
            np.save(os.path.join(OUTPUT_SXR_DIR, f"{timestamp}.npy"),
                    np.array([sxr_b], dtype=np.float32))

            successful_count += 1
            results.append((timestamp, True, f"Success using {instrument}"))
        except Exception as e:
            failed_count += 1
            results.append((timestamp, False, f"Error processing timestamp {timestamp}: {e}"))

    return results, successful_count, failed_count
|
|
|
|
def split_into_batches(data, batch_size):
    """Yield successive slices of *data*, each at most *batch_size* long."""
    start = 0
    total = len(data)
    while start < total:
        yield data[start:start + batch_size]
        start += batch_size
|
|
|
|
def _aia_timestamp(path):
    """Normalize an AIA .npy file path to a 'YYYY-MM-DDTHH:MM:SS' string.

    The filename stem is assumed to be an ISO-format datetime
    (e.g. '2020-01-01T00:00:00.npy') — TODO confirm against the AIA
    preprocessing step that produces these files.
    """
    stem = os.path.splitext(os.path.basename(path))[0]
    return datetime.fromisoformat(stem).strftime('%Y-%m-%dT%H:%M:%S')


def _move_missing_aia(aia_files, valid_timestamps):
    """Move AIA files whose timestamp has no GOES data into AIA_MISSING_DIR.

    Returns the number of files moved.
    """
    moved_count = 0
    for path in aia_files:
        if _aia_timestamp(path) in valid_timestamps:
            continue
        try:
            target_path = os.path.join(AIA_MISSING_DIR, os.path.basename(path))
            os.rename(path, target_path)
            moved_count += 1
            print(f"Moved {path} to {AIA_MISSING_DIR}")
        except OSError as e:
            # Best-effort: report and keep going rather than abort the run.
            print(f"Failed to move {path}: {e}")
    return moved_count


def main():
    """Run the end-to-end GOES/AIA alignment pipeline.

    Steps:
      1. Load all GOES instrument CSVs.
      2. Collect target timestamps from processed AIA .npy filenames.
      3. Build a lookup table of averaged SXR-B values per timestamp.
      4. Write one .npy file per valid timestamp (serial or multiprocess).
      5. Move AIA files with no matching GOES data to AIA_MISSING_DIR.
    """
    print("=" * 60)
    print("GOES Data Alignment Tool")
    print("=" * 60)
    print(f"GOES data directory: {GOES_DATA_DIR}")
    print(f"AIA processed directory: {AIA_PROCESSED_DIR}")
    print(f"Output SXR directory: {OUTPUT_SXR_DIR}")
    print(f"AIA missing directory: {AIA_MISSING_DIR}")
    print("=" * 60)

    # Ensure output directories exist before any worker tries to write.
    os.makedirs(OUTPUT_SXR_DIR, exist_ok=True)
    os.makedirs(AIA_MISSING_DIR, exist_ok=True)

    goes_data_dict = load_and_prepare_goes_data(GOES_DATA_DIR)

    print(f"\nFinding target timestamps from AIA files in: {AIA_PROCESSED_DIR}")
    aia_files = sorted(glob.glob(os.path.join(AIA_PROCESSED_DIR, "*.npy")))
    common_timestamps = [_aia_timestamp(path) for path in aia_files]

    print(f"Found {len(common_timestamps)} target timestamps")

    lookup_data = create_combined_lookup_table(goes_data_dict, common_timestamps)

    if not lookup_data:
        print("No valid data found for any timestamps!")
        return

    start_time = time.time()

    # Size the worker pool to the workload: roughly one process per 100
    # timestamps, capped by MAX_PROCESSES (or cpu_count() when unset), and
    # several batches per process so the progress bar updates smoothly.
    max_procs = MAX_PROCESSES if MAX_PROCESSES is not None else cpu_count()
    num_processes = min(max_procs, max(1, len(lookup_data) // 100))
    batch_size = max(MIN_BATCH_SIZE, len(lookup_data) // (num_processes * BATCH_SIZE_MULTIPLIER))

    print(f"\nProcessing {len(lookup_data)} valid timestamps...")
    print(f"Using {num_processes} processes with batch size {batch_size}")

    batches = list(split_into_batches(lookup_data, batch_size))

    total_successful = 0
    total_failed = 0

    if num_processes == 1:
        # Serial path: skips Pool overhead for small workloads.
        pbar = tqdm(batches, desc="Processing batches")
        for batch in pbar:
            _, successful, failed = process_batch(batch)
            total_successful += successful
            total_failed += failed
            pbar.set_postfix(success=total_successful, failed=total_failed)
    else:
        with Pool(processes=num_processes) as pool:
            # Submit everything first, then collect in submission order.
            async_results = []
            for batch in tqdm(batches, desc="Submitting batches"):
                async_results.append(pool.apply_async(process_batch, (batch,)))

            pbar = tqdm(total=len(async_results), desc="Processing batches")
            for result in async_results:
                _, successful, failed = result.get()
                total_successful += successful
                total_failed += failed
                pbar.set_postfix(success=total_successful, failed=total_failed)
                pbar.update(1)
            pbar.close()

    total_time = time.time() - start_time

    print(f"\n" + "=" * 60)
    print(f"PROCESSING COMPLETE!")
    print(f"=" * 60)
    print(f"Total time: {total_time:.2f} seconds")
    if total_time > 0:  # guard against a zero-duration clock reading
        print(f"Average time per timestamp: {total_time / len(lookup_data):.4f} seconds")
        print(f"Processing rate: {len(lookup_data) / total_time:.2f} timestamps/second")
    print(f"Successfully processed: {total_successful}/{len(lookup_data)} timestamps")
    print(f"Failed processes: {total_failed}")
    print(f"Available GOES instruments: {sorted(goes_data_dict.keys())}")

    missing_count = len(common_timestamps) - len(lookup_data)
    if missing_count > 0:
        print(f"\n{missing_count} timestamps had no valid GOES data available")
        print("This may be due to:")
        print("  - Timestamps outside the coverage range of all GOES instruments")
        print("  - Missing or invalid SXR data in the GOES files")
        print("  - Time gaps between different GOES instruments")

    print(f"\nChecking for AIA files with missing GOES data...")
    print(f"Moving files with missing GOES data to: {AIA_MISSING_DIR}")

    valid_timestamps = {data['timestamp'] for data in lookup_data}
    moved_count = _move_missing_aia(aia_files, valid_timestamps)

    print(f"Moved {moved_count} files to {AIA_MISSING_DIR}")
    print("\nDone!")


if __name__ == "__main__":
    main()