# NAME (Numerical Atmospheric-dispersion Modelling Environment) output converter.
import glob
import os
import re
import shutil
import tempfile
import zipfile
from typing import List, Optional, Tuple

import numpy as np
import xarray as xr


class NAMEDataProcessor:
    """Convert NAME model text output files into NetCDF datasets.

    Two file layouts are handled:

    * horizontal multi-field files (boundary-layer air concentration plus
      dry/wet deposition rates), detected by "HorizontalField" in the name;
    * stacks of single-level 3D concentration files, grouped by the T index
      in the filename and stacked along the Z index.

    Converted files are written under ``output_root`` in the ``3D`` and
    ``horizontal`` sub-directories.
    """

    # Matches e.g. "..._T3_..._Z12.txt": group 1 is the time-step index,
    # group 2 the vertical-level index.
    _3D_NAME_PATTERN = re.compile(r"_T(\d+)_.*_Z(\d+)\.txt$")

    def __init__(self, output_root: Optional[str] = None):
        """Create the output directory tree.

        Args:
            output_root: Root directory for converted files.  Defaults to a
                ``name_outputs`` folder inside the system temp directory.
        """
        if output_root is None:
            output_root = os.path.join(tempfile.gettempdir(), "name_outputs")
        self.output_root = output_root
        self.output_3d = os.path.join(self.output_root, "3D")
        self.output_horizontal = os.path.join(self.output_root, "horizontal")
        # BUG FIX: the original repeated this entire attribute/makedirs setup
        # twice verbatim; a single pass is sufficient (makedirs is idempotent).
        os.makedirs(self.output_3d, exist_ok=True)
        os.makedirs(self.output_horizontal, exist_ok=True)

    def _sanitize_key(self, key: str) -> str:
        """Collapse non-word characters so *key* is a valid NetCDF attr name."""
        key = re.sub(r'\W+', '_', key)
        # BUG FIX: guard against an empty result — the original indexed key[0]
        # unconditionally and raised IndexError for keys like "###".
        if not key or not key[0].isalpha():
            key = f"attr_{key}"
        return key

    def _parse_metadata(self, lines: List[str]) -> dict:
        """Parse ``key: value`` header lines and derive the typed grid spec.

        Returns:
            A dict of all sanitized raw header entries plus the numeric grid
            fields x_origin, y_origin, x_size, y_size, x_res and y_res.

        Raises:
            ValueError: if a required grid field is missing or non-numeric.
            ZeroDivisionError: if either grid resolution is zero.
        """
        metadata = {}
        for line in lines:
            if ":" in line:
                key, value = line.split(":", 1)
                clean_key = self._sanitize_key(key.strip().lower())
                metadata[clean_key] = value.strip()
        try:
            metadata.update({
                "x_origin": float(metadata["x_grid_origin"]),
                "y_origin": float(metadata["y_grid_origin"]),
                "x_size": int(metadata["x_grid_size"]),
                "y_size": int(metadata["y_grid_size"]),
                "x_res": float(metadata["x_grid_resolution"]),
                "y_res": float(metadata["y_grid_resolution"]),
            })
        except KeyError as e:
            raise ValueError(f"Missing required metadata field: {e}") from e
        except ValueError as e:
            raise ValueError(f"Invalid value in metadata: {e}") from e
        if metadata["x_res"] == 0 or metadata["y_res"] == 0:
            raise ZeroDivisionError("Grid resolution cannot be zero.")
        return metadata

    def _get_data_lines(self, lines: List[str]) -> List[str]:
        """Return every line after the ``Fields:`` marker line.

        Raises:
            ValueError: if no ``Fields:`` marker is present.  (BUG FIX: the
                original let a bare StopIteration escape from ``next()``.)
        """
        for i, line in enumerate(lines):
            if line.strip() == "Fields:":
                return lines[i + 1:]
        raise ValueError("No 'Fields:' marker found in file.")

    def _is_horizontal_file(self, filename: str) -> bool:
        """A file is a horizontal multi-field output iff its name says so."""
        return "HorizontalField" in filename

    @staticmethod
    def _grid_axis(origin: float, size: int, res: float) -> np.ndarray:
        """Build a coordinate axis of exactly *size* points.

        BUG FIX: the original used ``np.arange(origin, origin + size*res, res)``
        whose float-step stop condition can yield size+1 points and then
        mismatch the data-array shape; scaling an integer range avoids this.
        """
        return np.round(origin + res * np.arange(size), 6)

    def _convert_horizontal(self, filepath: str, output_filename: str) -> str:
        """Convert one horizontal multi-field NAME file to NetCDF.

        Data rows carry 1-based (x, y) grid indices in columns 1-2; columns
        5-7 hold air concentration, dry deposition and wet deposition.

        Returns:
            The path of the written NetCDF file.
        """
        with open(filepath, 'r') as f:
            lines = f.readlines()
        meta = self._parse_metadata(lines)
        data_lines = self._get_data_lines(lines)

        lons = self._grid_axis(meta["x_origin"], meta["x_size"], meta["x_res"])
        lats = self._grid_axis(meta["y_origin"], meta["y_size"], meta["y_res"])

        shape = (meta["y_size"], meta["x_size"])
        air_conc = np.zeros(shape, dtype=np.float32)
        dry_depo = np.zeros(shape, dtype=np.float32)
        wet_depo = np.zeros(shape, dtype=np.float32)

        for line in data_lines:
            parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
            if len(parts) >= 7 and parts[0].isdigit() and parts[1].isdigit():
                try:
                    x = int(parts[0]) - 1  # file indices are 1-based
                    y = int(parts[1]) - 1
                    air_val = float(parts[4])
                    dry_val = float(parts[5])
                    wet_val = float(parts[6])
                except ValueError:
                    # Narrowed from `except Exception`: only malformed numbers
                    # are expected here; anything else should surface.
                    continue
                if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
                    air_conc[y, x] = air_val
                    dry_depo[y, x] = dry_val
                    wet_depo[y, x] = wet_val

        ds = xr.Dataset(
            {
                "air_concentration": (['latitude', 'longitude'], air_conc),
                "dry_deposition_rate": (['latitude', 'longitude'], dry_depo),
                "wet_deposition_rate": (['latitude', 'longitude'], wet_depo)
            },
            coords={
                "latitude": lats,
                "longitude": lons
            },
            attrs={
                "title": "Volcanic Ash Horizontal Output (Multiple Fields)",
                "source": "NAME model output processed to NetCDF (horizontal multi-field)",
                **{k: str(v) for k, v in meta.items()}
            }
        )
        ds["air_concentration"].attrs.update({
            "units": "g/m^3",
            "long_name": "Boundary Layer Average Air Concentration"
        })
        ds["dry_deposition_rate"].attrs.update({
            "units": "g/m^2/s",
            "long_name": "Dry Deposition Rate"
        })
        ds["wet_deposition_rate"].attrs.update({
            "units": "g/m^2/s",
            "long_name": "Wet Deposition Rate"
        })
        ds["latitude"].attrs["units"] = "degrees_north"
        ds["longitude"].attrs["units"] = "degrees_east"

        out_path = os.path.join(self.output_horizontal, output_filename)
        ds.to_netcdf(out_path, engine="netcdf4")
        return out_path

    def _convert_3d_group(self, group: List[Tuple[int, str]],
                          output_filename: str) -> str:
        """Stack a group of (z_index, filepath) single-level files into one 3D NetCDF.

        All files in the group are assumed to share the grid described by the
        first file's header — TODO confirm against the NAME run configuration.

        Returns:
            The path of the written NetCDF file.

        Raises:
            ValueError: if *group* is empty.
        """
        if not group:
            # BUG FIX: np.stack([]) below would raise an opaque error.
            raise ValueError("Cannot convert an empty 3D file group.")
        with open(group[0][1], 'r') as f:
            meta = self._parse_metadata(f.readlines())

        lons = self._grid_axis(meta["x_origin"], meta["x_size"], meta["x_res"])
        lats = self._grid_axis(meta["y_origin"], meta["y_size"], meta["y_res"])

        z_levels = []
        z_coords = []
        for z_idx, filepath in group:
            with open(filepath, 'r') as f:
                data_lines = self._get_data_lines(f.readlines())
            grid = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
            for line in data_lines:
                parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
                if len(parts) >= 5 and parts[0].isdigit() and parts[1].isdigit():
                    try:
                        x = int(parts[0]) - 1  # file indices are 1-based
                        y = int(parts[1]) - 1
                        val = float(parts[4])
                    except ValueError:
                        continue
                    if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
                        grid[y, x] = val
            z_levels.append(grid)
            z_coords.append(z_idx)

        z_cube = np.stack(z_levels, axis=0)
        ds = xr.Dataset(
            {
                "ash_concentration": (['altitude', 'latitude', 'longitude'], z_cube)
            },
            coords={
                "altitude": np.array(z_coords, dtype=np.float32),
                "latitude": lats,
                "longitude": lons
            },
            attrs={
                "title": "Volcanic Ash Concentration (3D)",
                "source": "NAME model output processed to NetCDF (3D fields)",
                **{k: str(v) for k, v in meta.items()}
            }
        )
        out_path = os.path.join(self.output_3d, output_filename)
        ds.to_netcdf(out_path, engine="netcdf4")
        return out_path

    @staticmethod
    def _empty_folder(folder_path: str) -> None:
        """Delete every entry directly inside *folder_path* (folder is kept)."""
        for entry in glob.glob(os.path.join(folder_path, '*')):
            # BUG FIX: the original caught IsADirectoryError from os.remove,
            # which Windows never raises (it raises PermissionError there);
            # test for a directory explicitly instead.
            if os.path.isdir(entry):
                shutil.rmtree(entry)
            else:
                os.remove(entry)

    def batch_process_zip(self, zip_path: str) -> List[str]:
        """Extract *zip_path* and convert every ``.txt`` NAME file inside it.

        Horizontal files are converted individually; 3D files are grouped by
        their T index and stacked along Z.  Previously generated 3D outputs
        are cleared first.  NOTE(review): horizontal outputs are deliberately
        NOT cleared — the original had that step commented out; confirm this
        asymmetry is intended before changing it.

        Returns:
            The list of NetCDF paths written.
        """
        extract_dir = os.path.join(tempfile.gettempdir(), "unzipped_name_extract")
        os.makedirs(extract_dir, exist_ok=True)

        # Empty previous 3D outputs without deleting the folder itself.
        if os.path.exists(self.output_3d):
            self._empty_folder(self.output_3d)
        else:
            os.makedirs(self.output_3d, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        txt_files = [
            os.path.join(root, file)
            for root, _, files in os.walk(extract_dir)
            for file in files
            if file.endswith(".txt")
        ]

        horizontal_files = []
        grouped_3d = {}
        for f in txt_files:
            if self._is_horizontal_file(f):
                horizontal_files.append(f)
            else:
                match = self._3D_NAME_PATTERN.search(f)
                if match:
                    t = int(match.group(1))
                    z = int(match.group(2))
                    grouped_3d.setdefault(t, []).append((z, f))

        nc_files = []
        # Process horizontal files, deterministically ordered.
        for f in sorted(horizontal_files):
            base_name = os.path.splitext(os.path.basename(f))[0]
            nc_files.append(self._convert_horizontal(f, f"{base_name}.nc"))
        # Process 3D groups in time order; each group sorted by Z level.
        for t_key in sorted(grouped_3d):
            group = sorted(grouped_3d[t_key])
            nc_files.append(self._convert_3d_group(group, f"T{t_key}.nc"))
        return nc_files