# Re-defining the integrated class first
import os
import re
import shutil
import tempfile  # Added for safe temp directory usage
import zipfile
from typing import List, Tuple

import numpy as np
import xarray as xr

class NAMEDataProcessor:
    def __init__(self, output_root: str = None):
        if output_root is None:
            output_root = os.path.join(tempfile.gettempdir(), "name_outputs")
        self.output_root = output_root
        self.output_3d = os.path.join(self.output_root, "3D")
        self.output_horizontal = os.path.join(self.output_root, "horizontal")
        os.makedirs(self.output_3d, exist_ok=True)
        os.makedirs(self.output_horizontal, exist_ok=True)
    def _sanitize_key(self, key: str) -> str:
        key = re.sub(r'\W+', '_', key)
        if not key or not key[0].isalpha():
            key = f"attr_{key}"
        return key
    def _parse_metadata(self, lines: List[str]) -> dict:
        metadata = {}
        for line in lines:
            if ":" in line:
                key, value = line.split(":", 1)
                clean_key = self._sanitize_key(key.strip().lower())
                metadata[clean_key] = value.strip()
        try:
            metadata.update({
                "x_origin": float(metadata["x_grid_origin"]),
                "y_origin": float(metadata["y_grid_origin"]),
                "x_size": int(metadata["x_grid_size"]),
                "y_size": int(metadata["y_grid_size"]),
                "x_res": float(metadata["x_grid_resolution"]),
                "y_res": float(metadata["y_grid_resolution"]),
            })
        except KeyError as e:
            raise ValueError(f"Missing required metadata field: {e}")
        except ValueError as e:
            raise ValueError(f"Invalid value in metadata: {e}")
        if metadata["x_res"] == 0 or metadata["y_res"] == 0:
            raise ZeroDivisionError("Grid resolution cannot be zero.")
        return metadata
    def _get_data_lines(self, lines: List[str]) -> List[str]:
        idx = next((i for i, l in enumerate(lines) if l.strip() == "Fields:"), None)
        if idx is None:
            raise ValueError("No 'Fields:' header found in NAME file.")
        return lines[idx + 1:]
    def _is_horizontal_file(self, filename: str) -> bool:
        return "HorizontalField" in filename
    def _convert_horizontal(self, filepath: str, output_filename: str) -> str:
        with open(filepath, 'r') as f:
            lines = f.readlines()
        meta = self._parse_metadata(lines)
        data_lines = self._get_data_lines(lines)
        # Build coordinates from the grid origin by index so the arrays always
        # have exactly x_size/y_size entries, regardless of floating-point step error.
        lons = np.round(meta["x_origin"] + np.arange(meta["x_size"]) * meta["x_res"], 6)
        lats = np.round(meta["y_origin"] + np.arange(meta["y_size"]) * meta["y_res"], 6)
        air_conc = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
        dry_depo = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
        wet_depo = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
        for line in data_lines:
            parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
            if len(parts) >= 7 and parts[0].isdigit() and parts[1].isdigit():
                try:
                    x = int(parts[0]) - 1
                    y = int(parts[1]) - 1
                    air_val = float(parts[4])
                    dry_val = float(parts[5])
                    wet_val = float(parts[6])
                    if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
                        air_conc[y, x] = air_val
                        dry_depo[y, x] = dry_val
                        wet_depo[y, x] = wet_val
                except Exception:
                    continue
        ds = xr.Dataset(
            {
                "air_concentration": (['latitude', 'longitude'], air_conc),
                "dry_deposition_rate": (['latitude', 'longitude'], dry_depo),
                "wet_deposition_rate": (['latitude', 'longitude'], wet_depo)
            },
            coords={
                "latitude": lats,
                "longitude": lons
            },
            attrs={
                "title": "Volcanic Ash Horizontal Output (Multiple Fields)",
                "source": "NAME model output processed to NetCDF (horizontal multi-field)",
                **{k: str(v) for k, v in meta.items()}
            }
        )
        ds["air_concentration"].attrs.update({
            "units": "g/m^3",
            "long_name": "Boundary Layer Average Air Concentration"
        })
        ds["dry_deposition_rate"].attrs.update({
            "units": "g/m^2/s",
            "long_name": "Dry Deposition Rate"
        })
        ds["wet_deposition_rate"].attrs.update({
            "units": "g/m^2/s",
            "long_name": "Wet Deposition Rate"
        })
        ds["latitude"].attrs["units"] = "degrees_north"
        ds["longitude"].attrs["units"] = "degrees_east"
        out_path = os.path.join(self.output_horizontal, output_filename)
        ds.to_netcdf(out_path, engine="netcdf4")
        return out_path
    def _convert_3d_group(self, group: List[Tuple[int, str]], output_filename: str) -> str:
        first_file_path = group[0][1]
        with open(first_file_path, 'r') as f:
            lines = f.readlines()
        meta = self._parse_metadata(lines)
        lons = np.round(meta["x_origin"] + np.arange(meta["x_size"]) * meta["x_res"], 6)
        lats = np.round(meta["y_origin"] + np.arange(meta["y_size"]) * meta["y_res"], 6)
        z_levels = []
        z_coords = []
        for z_idx, filepath in group:
            with open(filepath, 'r') as f:
                lines = f.readlines()
            data_lines = self._get_data_lines(lines)
            grid = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
            for line in data_lines:
                parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
                if len(parts) >= 5 and parts[0].isdigit() and parts[1].isdigit():
                    try:
                        x = int(parts[0]) - 1
                        y = int(parts[1]) - 1
                        val = float(parts[4])
                        if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
                            grid[y, x] = val
                    except Exception:
                        continue
            z_levels.append(grid)
            z_coords.append(z_idx)
        z_cube = np.stack(z_levels, axis=0)
        ds = xr.Dataset(
            {
                "ash_concentration": (['altitude', 'latitude', 'longitude'], z_cube)
            },
            coords={
                "altitude": np.array(z_coords, dtype=np.float32),
                "latitude": lats,
                "longitude": lons
            },
            attrs={
                "title": "Volcanic Ash Concentration (3D)",
                "source": "NAME model output processed to NetCDF (3D fields)",
                **{k: str(v) for k, v in meta.items()}
            }
        )
        out_path = os.path.join(self.output_3d, output_filename)
        # Writing with the netcdf4 engine overwrites any existing file at out_path.
        ds.to_netcdf(out_path, engine="netcdf4")
        return out_path
    def batch_process_zip(self, zip_path: str) -> List[str]:
        extract_dir = os.path.join(tempfile.gettempdir(), "unzipped_name_extract")

        def empty_folder(folder_path):
            """Remove the contents of a folder without deleting the folder itself."""
            for name in os.listdir(folder_path):
                path = os.path.join(folder_path, name)
                if os.path.isdir(path):
                    shutil.rmtree(path)
                else:
                    os.remove(path)

        # Empty previous outputs; keep the output folders themselves in place.
        for folder in (self.output_3d, self.output_horizontal):
            if os.path.exists(folder):
                empty_folder(folder)
            else:
                os.makedirs(folder, exist_ok=True)

        # Start from a clean extraction directory so stale files from earlier runs
        # are not picked up.
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir)
        os.makedirs(extract_dir, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        txt_files = []
        for root, _, files in os.walk(extract_dir):
            for file in files:
                if file.endswith(".txt"):
                    txt_files.append(os.path.join(root, file))
        horizontal_files = []
        grouped_3d = {}
        pattern = re.compile(r"_T(\d+)_.*_Z(\d+)\.txt$")
        for f in txt_files:
            if self._is_horizontal_file(f):
                horizontal_files.append(f)
            else:
                match = pattern.search(f)
                if match:
                    t = int(match.group(1))
                    z = int(match.group(2))
                    grouped_3d.setdefault(t, []).append((z, f))
        nc_files = []
        # Process horizontal fields
        for f in sorted(horizontal_files):
            base_name = os.path.splitext(os.path.basename(f))[0]
            out_nc = self._convert_horizontal(f, f"{base_name}.nc")
            nc_files.append(out_nc)
        # Process 3D fields, one NetCDF per timestep
        for t_key in sorted(grouped_3d):
            group = sorted(grouped_3d[t_key])
            out_nc = self._convert_3d_group(group, f"T{t_key}.nc")
            nc_files.append(out_nc)
        return nc_files
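
# A minimal usage sketch, assuming a zip archive of NAME .txt outputs is available.
# The archive name "name_run.zip" and the output directory below are illustrative
# placeholders, not part of the original code; the archive is expected to contain
# horizontal files (names containing "HorizontalField") and 3D files named like
# ..._T1_..._Z1.txt, as parsed by batch_process_zip above.
if __name__ == "__main__":
    processor = NAMEDataProcessor(output_root="./name_outputs")
    produced = processor.batch_process_zip("name_run.zip")
    for path in produced:
        print(path)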