|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| import os
|
| import re
|
| import zipfile
|
| import numpy as np
|
| import xarray as xr
|
| from typing import List, Tuple
|
| import shutil
|
|
|
|
|
| class NAMEDataProcessor:
|
| def __init__(self, output_root: str):
|
| self.output_root = output_root
|
| self.output_3d = os.path.join(self.output_root, "3D")
|
| self.output_horizontal = os.path.join(self.output_root, "horizontal")
|
| os.makedirs(self.output_3d, exist_ok=True)
|
| os.makedirs(self.output_horizontal, exist_ok=True)
|
|
|
| def _sanitize_key(self, key: str) -> str:
|
| key = re.sub(r'\W+', '_', key)
|
| if not key[0].isalpha():
|
| key = f"attr_{key}"
|
| return key
|
|
|
| def _parse_metadata(self, lines: List[str]) -> dict:
|
| metadata = {}
|
| for line in lines:
|
| if ":" in line:
|
| key, value = line.split(":", 1)
|
| clean_key = self._sanitize_key(key.strip().lower())
|
| metadata[clean_key] = value.strip()
|
|
|
| try:
|
| metadata.update({
|
| "x_origin": float(metadata["x_grid_origin"]),
|
| "y_origin": float(metadata["y_grid_origin"]),
|
| "x_size": int(metadata["x_grid_size"]),
|
| "y_size": int(metadata["y_grid_size"]),
|
| "x_res": float(metadata["x_grid_resolution"]),
|
| "y_res": float(metadata["y_grid_resolution"]),
|
| })
|
| except KeyError as e:
|
| raise ValueError(f"Missing required metadata field: {e}")
|
| except ValueError as e:
|
| raise ValueError(f"Invalid value in metadata: {e}")
|
|
|
| if metadata["x_res"] == 0 or metadata["y_res"] == 0:
|
| raise ZeroDivisionError("Grid resolution cannot be zero.")
|
|
|
| return metadata
|
|
|
| def _get_data_lines(self, lines: List[str]) -> List[str]:
|
| idx = next(i for i, l in enumerate(lines) if l.strip() == "Fields:")
|
| return lines[idx + 1:]
|
|
|
| def _is_horizontal_file(self, filename: str) -> bool:
|
| return "HorizontalField" in filename
|
|
|
| def _convert_horizontal(self, filepath: str, output_filename: str) -> str:
|
| with open(filepath, 'r') as f:
|
| lines = f.readlines()
|
|
|
| meta = self._parse_metadata(lines)
|
| data_lines = self._get_data_lines(lines)
|
|
|
| lons = np.round(np.arange(meta["x_origin"], meta["x_origin"] + meta["x_size"] * meta["x_res"], meta["x_res"]), 6)
|
| lats = np.round(np.arange(meta["y_origin"], meta["y_origin"] + meta["y_size"] * meta["y_res"], meta["y_res"]), 6)
|
|
|
| air_conc = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
|
| dry_depo = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
|
| wet_depo = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
|
|
|
| for line in data_lines:
|
| parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
|
| if len(parts) >= 7 and parts[0].isdigit() and parts[1].isdigit():
|
| try:
|
| x = int(parts[0]) - 1
|
| y = int(parts[1]) - 1
|
| air_val = float(parts[4])
|
| dry_val = float(parts[5])
|
| wet_val = float(parts[6])
|
| if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
|
| air_conc[y, x] = air_val
|
| dry_depo[y, x] = dry_val
|
| wet_depo[y, x] = wet_val
|
| except Exception:
|
| continue
|
|
|
| ds = xr.Dataset(
|
| {
|
| "air_concentration": (['latitude', 'longitude'], air_conc),
|
| "dry_deposition_rate": (['latitude', 'longitude'], dry_depo),
|
| "wet_deposition_rate": (['latitude', 'longitude'], wet_depo)
|
| },
|
| coords={
|
| "latitude": lats,
|
| "longitude": lons
|
| },
|
| attrs={
|
| "title": "Volcanic Ash Horizontal Output (Multiple Fields)",
|
| "source": "NAME model output processed to NetCDF (horizontal multi-field)",
|
| **{k: str(v) for k, v in meta.items()}
|
| }
|
| )
|
|
|
| ds["air_concentration"].attrs.update({
|
| "units": "g/m^3",
|
| "long_name": "Boundary Layer Average Air Concentration"
|
| })
|
| ds["dry_deposition_rate"].attrs.update({
|
| "units": "g/m^2/s",
|
| "long_name": "Dry Deposition Rate"
|
| })
|
| ds["wet_deposition_rate"].attrs.update({
|
| "units": "g/m^2/s",
|
| "long_name": "Wet Deposition Rate"
|
| })
|
| ds["latitude"].attrs["units"] = "degrees_north"
|
| ds["longitude"].attrs["units"] = "degrees_east"
|
|
|
| out_path = os.path.join(self.output_horizontal, output_filename)
|
| ds.to_netcdf(out_path, engine="netcdf4")
|
|
|
| return out_path
|
|
|
|
|
| def _convert_3d_group(self, group: List[Tuple[int, str]], output_filename: str) -> str:
|
| first_file_path = group[0][1]
|
| with open(first_file_path, 'r') as f:
|
| lines = f.readlines()
|
| meta = self._parse_metadata(lines)
|
|
|
| lons = np.round(np.arange(meta["x_origin"], meta["x_origin"] + meta["x_size"] * meta["x_res"], meta["x_res"]), 6)
|
| lats = np.round(np.arange(meta["y_origin"], meta["y_origin"] + meta["y_size"] * meta["y_res"], meta["y_res"]), 6)
|
|
|
| z_levels = []
|
| z_coords = []
|
|
|
| for z_idx, filepath in group:
|
| with open(filepath, 'r') as f:
|
| lines = f.readlines()
|
| data_lines = self._get_data_lines(lines)
|
| grid = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
|
|
|
| for line in data_lines:
|
| parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
|
| if len(parts) >= 5 and parts[0].isdigit() and parts[1].isdigit():
|
| try:
|
| x = int(parts[0]) - 1
|
| y = int(parts[1]) - 1
|
| val = float(parts[4])
|
| if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
|
| grid[y, x] = val
|
| except Exception:
|
| continue
|
| z_levels.append(grid)
|
| z_coords.append(z_idx)
|
|
|
| z_cube = np.stack(z_levels, axis=0)
|
| ds = xr.Dataset(
|
| {
|
| "ash_concentration": (['altitude', 'latitude', 'longitude'], z_cube)
|
| },
|
| coords={
|
| "altitude": np.array(z_coords, dtype=np.float32),
|
| "latitude": lats,
|
| "longitude": lons
|
| },
|
| attrs={
|
| "title": "Volcanic Ash Concentration (3D)",
|
| "source": "NAME model output processed to NetCDF (3D fields)",
|
| **{k: str(v) for k, v in meta.items()}
|
| }
|
| )
|
|
|
| out_path = os.path.join(self.output_3d, output_filename)
|
|
|
|
|
|
|
|
|
|
|
|
|
| ds.to_netcdf(out_path, engine="netcdf4")
|
|
|
| return out_path
|
|
|
|
|
| def batch_process_zip(self, zip_path: str) -> List[str]:
|
| extract_dir = os.path.abspath("unzipped")
|
|
|
| os.makedirs(extract_dir, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
| def empty_folder(folder_path):
|
| import os
|
| import glob
|
| files = glob.glob(os.path.join(folder_path, '*'))
|
| for f in files:
|
| try:
|
| os.remove(f)
|
| except IsADirectoryError:
|
| shutil.rmtree(f)
|
|
|
|
|
|
|
|
|
| if os.path.exists(self.output_3d):
|
| empty_folder(self.output_3d)
|
| else:
|
| os.makedirs(self.output_3d, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
| zip_ref.extractall(extract_dir)
|
|
|
| txt_files = []
|
| for root, _, files in os.walk(extract_dir):
|
| for file in files:
|
| if file.endswith(".txt"):
|
| txt_files.append(os.path.join(root, file))
|
|
|
| horizontal_files = []
|
| grouped_3d = {}
|
|
|
| pattern = re.compile(r"_T(\d+)_.*_Z(\d+)\.txt$")
|
|
|
| for f in txt_files:
|
| if self._is_horizontal_file(f):
|
| horizontal_files.append(f)
|
| else:
|
| match = pattern.search(f)
|
| if match:
|
| t = int(match.group(1))
|
| z = int(match.group(2))
|
| grouped_3d.setdefault(t, []).append((z, f))
|
|
|
| nc_files = []
|
|
|
|
|
| for f in sorted(horizontal_files):
|
| base_name = os.path.splitext(os.path.basename(f))[0]
|
| out_nc = self._convert_horizontal(f, f"{base_name}.nc")
|
| nc_files.append(out_nc)
|
|
|
|
|
| for t_key in sorted(grouped_3d):
|
| group = sorted(grouped_3d[t_key])
|
| out_nc = self._convert_3d_group(group, f"T{t_key}.nc")
|
| nc_files.append(out_nc)
|
|
|
| return nc_files |