jp / ash_animator /converter.py
Mahmudm's picture
Upload 162 files
f0bfd2d verified
# # Full updated and corrected version of NAMEDataConverter with sanitized metadata keys
# import os
# import re
# import zipfile
# import shutil
# import numpy as np
# import xarray as xr
# import matplotlib.pyplot as plt
# import matplotlib.animation as animation
# from typing import List, Tuple
# class NAMEDataConverter:
# def __init__(self, output_dir: str):
# self.output_dir = output_dir
# os.makedirs(self.output_dir, exist_ok=True)
# def _sanitize_key(self, key: str) -> str:
# # Replace non-alphanumeric characters with underscores, and ensure it starts with a letter
# key = re.sub(r'\W+', '_', key)
# if not key[0].isalpha():
# key = f"attr_{key}"
# return key
# def _parse_metadata(self, lines: List[str]) -> dict:
# metadata = {}
# for line in lines:
# if ":" in line:
# key, value = line.split(":", 1)
# clean_key = self._sanitize_key(key.strip().lower())
# metadata[clean_key] = value.strip()
# try:
# metadata.update({
# "x_origin": float(metadata["x_grid_origin"]),
# "y_origin": float(metadata["y_grid_origin"]),
# "x_size": int(metadata["x_grid_size"]),
# "y_size": int(metadata["y_grid_size"]),
# "x_res": float(metadata["x_grid_resolution"]),
# "y_res": float(metadata["y_grid_resolution"]),
# "prelim_cols": int(metadata["number_of_preliminary_cols"]),
# "n_fields": int(metadata["number_of_field_cols"]),
# })
# except KeyError as e:
# raise ValueError(f"Missing required metadata field: {e}")
# except ValueError as e:
# raise ValueError(f"Invalid value in metadata: {e}")
# if metadata["x_res"] == 0 or metadata["y_res"] == 0:
# raise ZeroDivisionError("Grid resolution cannot be zero.")
# return metadata
# def _get_data_lines(self, lines: List[str]) -> List[str]:
# idx = next(i for i, l in enumerate(lines) if l.strip() == "Fields:")
# return lines[idx + 1:]
# def convert_3d_group(self, group: List[Tuple[int, str]], output_filename: str) -> str:
# first_file_path = group[0][1]
# with open(first_file_path, 'r') as f:
# lines = f.readlines()
# meta = self._parse_metadata(lines)
# lons = np.round(np.arange(meta["x_origin"], meta["x_origin"] + meta["x_size"] * meta["x_res"], meta["x_res"]), 6)
# lats = np.round(np.arange(meta["y_origin"], meta["y_origin"] + meta["y_size"] * meta["y_res"], meta["y_res"]), 6)
# z_levels = []
# z_coords = []
# for z_idx, filepath in group:
# with open(filepath, 'r') as f:
# lines = f.readlines()
# data_lines = self._get_data_lines(lines)
# grid = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
# for line in data_lines:
# parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
# if len(parts) >= 5 and parts[0].isdigit() and parts[1].isdigit():
# try:
# x = int(parts[0]) - 1
# y = int(parts[1]) - 1
# val = float(parts[4])
# if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
# grid[y, x] = val
# except Exception:
# continue
# z_levels.append(grid)
# z_coords.append(z_idx)
# z_cube = np.stack(z_levels, axis=0)
# ds = xr.Dataset(
# {
# "ash_concentration": (['altitude', 'latitude', 'longitude'], z_cube)
# },
# coords={
# "altitude": np.array(z_coords, dtype=np.float32),
# "latitude": lats,
# "longitude": lons
# },
# attrs={
# "title": "Volcanic Ash Concentration",
# "source": "NAME model output processed to NetCDF",
# **{k: str(v) for k, v in meta.items()} # Ensure all attrs are strings
# }
# )
# ds["ash_concentration"].attrs.update({
# "units": "g/m^3",
# "long_name": "Volcanic ash concentration"
# })
# ds["altitude"].attrs["units"] = "kilometers above sea level"
# ds["latitude"].attrs["units"] = "degrees_north"
# ds["longitude"].attrs["units"] = "degrees_east"
# out_path = os.path.join(self.output_dir, output_filename)
# ds.to_netcdf(out_path)
# return out_path
# def batch_process_zip(self, zip_path: str) -> List[str]:
# extract_dir = os.path.join(self.output_dir, "unzipped")
# os.makedirs(extract_dir, exist_ok=True)
# with zipfile.ZipFile(zip_path, 'r') as zip_ref:
# zip_ref.extractall(extract_dir)
# txt_files = []
# for root, _, files in os.walk(extract_dir):
# for file in files:
# if file.endswith(".txt"):
# txt_files.append(os.path.join(root, file))
# pattern = re.compile(r"_T(\d+)_.*_Z(\d+)\.txt$")
# grouped = {}
# for f in txt_files:
# match = pattern.search(f)
# if match:
# t = int(match.group(1))
# z = int(match.group(2))
# grouped.setdefault(t, []).append((z, f))
# nc_files = []
# for t_key in sorted(grouped):
# group = sorted(grouped[t_key])
# out_nc = self.convert_3d_group(group, f"T{t_key}.nc")
# nc_files.append(out_nc)
# return nc_files
# Re-defining the integrated class first
import os
import re
import zipfile
import numpy as np
import xarray as xr
from typing import List, Tuple
import shutil
class NAMEDataProcessor:
def __init__(self, output_root: str):
self.output_root = output_root
self.output_3d = os.path.join(self.output_root, "3D")
self.output_horizontal = os.path.join(self.output_root, "horizontal")
os.makedirs(self.output_3d, exist_ok=True)
os.makedirs(self.output_horizontal, exist_ok=True)
def _sanitize_key(self, key: str) -> str:
key = re.sub(r'\W+', '_', key)
if not key[0].isalpha():
key = f"attr_{key}"
return key
def _parse_metadata(self, lines: List[str]) -> dict:
metadata = {}
for line in lines:
if ":" in line:
key, value = line.split(":", 1)
clean_key = self._sanitize_key(key.strip().lower())
metadata[clean_key] = value.strip()
try:
metadata.update({
"x_origin": float(metadata["x_grid_origin"]),
"y_origin": float(metadata["y_grid_origin"]),
"x_size": int(metadata["x_grid_size"]),
"y_size": int(metadata["y_grid_size"]),
"x_res": float(metadata["x_grid_resolution"]),
"y_res": float(metadata["y_grid_resolution"]),
})
except KeyError as e:
raise ValueError(f"Missing required metadata field: {e}")
except ValueError as e:
raise ValueError(f"Invalid value in metadata: {e}")
if metadata["x_res"] == 0 or metadata["y_res"] == 0:
raise ZeroDivisionError("Grid resolution cannot be zero.")
return metadata
def _get_data_lines(self, lines: List[str]) -> List[str]:
idx = next(i for i, l in enumerate(lines) if l.strip() == "Fields:")
return lines[idx + 1:]
def _is_horizontal_file(self, filename: str) -> bool:
return "HorizontalField" in filename
def _convert_horizontal(self, filepath: str, output_filename: str) -> str:
with open(filepath, 'r') as f:
lines = f.readlines()
meta = self._parse_metadata(lines)
data_lines = self._get_data_lines(lines)
lons = np.round(np.arange(meta["x_origin"], meta["x_origin"] + meta["x_size"] * meta["x_res"], meta["x_res"]), 6)
lats = np.round(np.arange(meta["y_origin"], meta["y_origin"] + meta["y_size"] * meta["y_res"], meta["y_res"]), 6)
air_conc = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
dry_depo = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
wet_depo = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
for line in data_lines:
parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
if len(parts) >= 7 and parts[0].isdigit() and parts[1].isdigit():
try:
x = int(parts[0]) - 1
y = int(parts[1]) - 1
air_val = float(parts[4])
dry_val = float(parts[5])
wet_val = float(parts[6])
if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
air_conc[y, x] = air_val
dry_depo[y, x] = dry_val
wet_depo[y, x] = wet_val
except Exception:
continue
ds = xr.Dataset(
{
"air_concentration": (['latitude', 'longitude'], air_conc),
"dry_deposition_rate": (['latitude', 'longitude'], dry_depo),
"wet_deposition_rate": (['latitude', 'longitude'], wet_depo)
},
coords={
"latitude": lats,
"longitude": lons
},
attrs={
"title": "Volcanic Ash Horizontal Output (Multiple Fields)",
"source": "NAME model output processed to NetCDF (horizontal multi-field)",
**{k: str(v) for k, v in meta.items()}
}
)
ds["air_concentration"].attrs.update({
"units": "g/m^3",
"long_name": "Boundary Layer Average Air Concentration"
})
ds["dry_deposition_rate"].attrs.update({
"units": "g/m^2/s",
"long_name": "Dry Deposition Rate"
})
ds["wet_deposition_rate"].attrs.update({
"units": "g/m^2/s",
"long_name": "Wet Deposition Rate"
})
ds["latitude"].attrs["units"] = "degrees_north"
ds["longitude"].attrs["units"] = "degrees_east"
out_path = os.path.join(self.output_horizontal, output_filename)
ds.to_netcdf(out_path, engine="netcdf4")
return out_path
def _convert_3d_group(self, group: List[Tuple[int, str]], output_filename: str) -> str:
first_file_path = group[0][1]
with open(first_file_path, 'r') as f:
lines = f.readlines()
meta = self._parse_metadata(lines)
lons = np.round(np.arange(meta["x_origin"], meta["x_origin"] + meta["x_size"] * meta["x_res"], meta["x_res"]), 6)
lats = np.round(np.arange(meta["y_origin"], meta["y_origin"] + meta["y_size"] * meta["y_res"], meta["y_res"]), 6)
z_levels = []
z_coords = []
for z_idx, filepath in group:
with open(filepath, 'r') as f:
lines = f.readlines()
data_lines = self._get_data_lines(lines)
grid = np.zeros((meta["y_size"], meta["x_size"]), dtype=np.float32)
for line in data_lines:
parts = [p.strip().strip(',') for p in line.strip().split(',') if p.strip()]
if len(parts) >= 5 and parts[0].isdigit() and parts[1].isdigit():
try:
x = int(parts[0]) - 1
y = int(parts[1]) - 1
val = float(parts[4])
if 0 <= x < meta["x_size"] and 0 <= y < meta["y_size"]:
grid[y, x] = val
except Exception:
continue
z_levels.append(grid)
z_coords.append(z_idx)
z_cube = np.stack(z_levels, axis=0)
ds = xr.Dataset(
{
"ash_concentration": (['altitude', 'latitude', 'longitude'], z_cube)
},
coords={
"altitude": np.array(z_coords, dtype=np.float32),
"latitude": lats,
"longitude": lons
},
attrs={
"title": "Volcanic Ash Concentration (3D)",
"source": "NAME model output processed to NetCDF (3D fields)",
**{k: str(v) for k, v in meta.items()}
}
)
out_path = os.path.join(self.output_3d, output_filename)
# ๐Ÿ”ฅ Check if file exists, delete it first
# if os.path.exists(out_path):
# os.remove(out_path)
# ๐Ÿ”ฅ Save NetCDF safely using netCDF4
ds.to_netcdf(out_path, engine="netcdf4")
return out_path
def batch_process_zip(self, zip_path: str) -> List[str]:
extract_dir = os.path.abspath("unzipped")
os.makedirs(extract_dir, exist_ok=True)
###
# Function to empty folder contents
def empty_folder(folder_path):
import os
import glob
files = glob.glob(os.path.join(folder_path, '*'))
for f in files:
try:
os.remove(f)
except IsADirectoryError:
shutil.rmtree(f)
# ๐Ÿ›  Clear cached open files and garbage collect before deleting
# ๐Ÿ”ฅ Empty previous outputs, do not delete folders
if os.path.exists(self.output_3d):
empty_folder(self.output_3d)
else:
os.makedirs(self.output_3d, exist_ok=True)
# if os.path.exists(self.output_horizontal):
# empty_folder(self.output_horizontal)
# else:
# os.makedirs(self.output_horizontal, exist_ok=True)
# if os.path.exists(extract_dir):
# shutil.rmtree(extract_dir)
# os.makedirs(extract_dir, exist_ok=True)
#####
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
txt_files = []
for root, _, files in os.walk(extract_dir):
for file in files:
if file.endswith(".txt"):
txt_files.append(os.path.join(root, file))
horizontal_files = []
grouped_3d = {}
pattern = re.compile(r"_T(\d+)_.*_Z(\d+)\.txt$")
for f in txt_files:
if self._is_horizontal_file(f):
horizontal_files.append(f)
else:
match = pattern.search(f)
if match:
t = int(match.group(1))
z = int(match.group(2))
grouped_3d.setdefault(t, []).append((z, f))
nc_files = []
# Process horizontal
for f in sorted(horizontal_files):
base_name = os.path.splitext(os.path.basename(f))[0]
out_nc = self._convert_horizontal(f, f"{base_name}.nc")
nc_files.append(out_nc)
# Process 3D
for t_key in sorted(grouped_3d):
group = sorted(grouped_3d[t_key])
out_nc = self._convert_3d_group(group, f"T{t_key}.nc")
nc_files.append(out_nc)
return nc_files