| import os |
| import pandas as pd |
| import xarray as xr |
| import numpy as np |
| from datetime import datetime, timedelta |
| from multiprocessing import Pool |
| import multiprocessing as mp |
| from tqdm import tqdm |
|
|
| |
# ---------------------------------------------------------------------------
# Configuration for slicing ERA5 data into per-timestep .npy tensors.
# ---------------------------------------------------------------------------
nc_path = '../datasets/0.25'          # root of the raw NetCDF / npy archive
startDate = '2021'                    # inclusive start of the date range
endDate = '2022'                      # exclusive end (last key is dropped below)
freq = '6h'                           # NOTE(review): unused here — presumably the target temporal resolution
horizon = 6                           # hours between consecutive output samples
surface_variables = ["t2m", "u10", "v10", "msl", "tp"]
upper_variables = ["t", "u", "v", "r", "z"]
levels = [50, 100, 150, 200, 250, 300, 400, 500, 600, 700, 850, 925, 1000]
# File-name prefixes aligned index-for-index with the variable lists above.
# "tp" (last surface variable) is read from pre-computed npy files instead,
# so its prefix slot is an unused empty string.
surface_prefix = ["t2m.era5.", "u10.era5.", "v10.era5.", "era5.mslp-slhf-sshf.", ""]
upper_prefix = ["era5.temperature.", "era5.u_component_of_wind.", "era5.v_component_of_wind.", "era5.relative_humidity.", "era5.geopotential."]
output_dir = '../datasets/0.25/china_organized'


# exist_ok avoids the check-then-create race when several workers start at once.
os.makedirs(output_dir, exist_ok=True)


# One key per day; [:-1] drops the endDate boundary itself so the range is
# [startDate, endDate) — each worker then expands its day into 24//horizon steps.
keys = list(pd.date_range(start=startDate, end=endDate, freq="1d"))[:-1]
|
|
def process_time(key):
    """Convert one day of ERA5 data into per-timestep .npy tensors.

    Opens the day's surface and upper-air NetCDF files once, then for each
    ``horizon``-hour step of the day stacks the surface variables
    (``tp`` comes from pre-computed npy files instead of NetCDF) and the
    upper-air variables over the selected pressure ``levels``, crops to the
    China region (rows 126:366, cols 296:536), concatenates them along the
    level axis, and saves the result to ``output_dir``.

    Parameters
    ----------
    key : pandas.Timestamp
        Midnight of the day to process.

    Steps whose output file already exists are skipped. Any exception is
    caught and reported so one bad day cannot kill the worker pool.
    """
    try:
        day_str = key.strftime('%Y%m%d%H')[:8]
        # Open each day's files once; "tp" (last surface variable) has no
        # NetCDF source, hence surface_variables[:-1].
        surface_datasets = [
            xr.open_dataset(os.path.join(nc_path, "surface", var, f'{surface_prefix[i]}{day_str}.nc'))
            for i, var in enumerate(surface_variables[:-1])
        ]
        upper_datasets = [
            xr.open_dataset(os.path.join(nc_path, "upper", var, f'{upper_prefix[i]}{day_str}.nc')).sel(level=levels)
            for i, var in enumerate(upper_variables)
        ]
        try:
            for _ in range(24 // horizon):
                time_str = key.strftime('%Y%m%d%H')
                output_file = os.path.join(output_dir, time_str + '.npy')
                if os.path.exists(output_file):
                    # Already produced on a previous run — skip to next step.
                    key += timedelta(hours=horizon)
                    continue

                input_surface_variables = []
                for i, var in enumerate(surface_variables):
                    if var == "tp":
                        # Pre-accumulated 6-hour precipitation stored as npy.
                        data = np.load(os.path.join(nc_path, "tp_6hr", f"{time_str[:10]}.npy")).astype(np.float32)[126:366, 296:536]
                    else:
                        data = surface_datasets[i][var].sel(time=key).values.astype(np.float32)[126:366, 296:536]
                    # Add a leading singleton "level" axis so surface data
                    # concatenates with the (level, lat, lon) upper-air data.
                    input_surface_variables.append(data[np.newaxis, ...])

                surface_data = np.stack(input_surface_variables, axis=0)

                input_upper_variables = []
                for i, var in enumerate(upper_variables):
                    data = upper_datasets[i][var].sel(time=key).values.astype(np.float32)[:, 126:366, 296:536]
                    input_upper_variables.append(data)

                upper_data = np.stack(input_upper_variables, axis=0)

                # (var, 1, H, W) surface + (var, level, H, W) upper joined on axis 1.
                all_data = np.concatenate((surface_data, upper_data), 1)
                np.save(output_file, all_data)
                key += timedelta(hours=horizon)
        finally:
            # Close the NetCDF handles — the original leaked them, which adds
            # up quickly with many workers each opening 9 files per day.
            for ds in surface_datasets + upper_datasets:
                ds.close()

    except Exception as e:
        print(f"处理时间 {key} 时出错: {e}")
|
|
if __name__ == '__main__':
    # 111 workers oversubscribes most machines; cap at the available CPUs.
    num_processes = min(111, mp.cpu_count() or 1)
    with Pool(num_processes) as pool:
        # imap_unordered streams results as they finish, letting tqdm
        # (imported above but previously unused) show live progress.
        for _ in tqdm(pool.imap_unordered(process_time, keys), total=len(keys)):
            pass
|
|
|
|
|
|