# TensorView v1.0 - Complete NetCDF/HDF/GRIB viewer
# (author: Nipun; commit 433dab5)
"""I/O operations for opening and managing datasets."""
import os
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import xarray as xr
import fsspec
import warnings
# NOTE(review): this filter is process-wide — it hides *every* UserWarning in
# the host application, not just xarray/fsspec chatter. Consider scoping it
# (module=...) or using warnings.catch_warnings() around the noisy calls.
warnings.filterwarnings("ignore", category=UserWarning)
@dataclass
class VariableSpec:
    """Variable specification with metadata.

    Lightweight, engine-agnostic summary of one data variable, built by
    list_variables() from an xarray variable's public attributes.
    """
    name: str              # variable's key in the dataset
    shape: tuple           # array shape, one entry per dimension
    dims: tuple            # dimension names, parallel to `shape`
    dtype: str             # dtype rendered as a string (str(var.dtype))
    units: str             # 'units' attribute, '' when absent
    long_name: str         # 'long_name' attribute, falls back to `name`
    attrs: Dict[str, Any]  # full copy of the variable's attrs mapping
class DatasetHandle:
    """Handle that pairs an opened dataset with its source URI and engine.

    Also usable as a context manager, so callers can write
    ``with open_any(uri) as handle: ...`` and have the underlying dataset
    closed automatically on exit.
    """

    def __init__(self, dataset: "xr.Dataset", uri: str, engine: str):
        self.dataset = dataset  # the opened xarray dataset
        self.uri = uri          # path/URL the dataset was opened from
        self.engine = engine    # engine actually used to open it

    def close(self):
        """Close the underlying dataset (no-op if it has no close())."""
        if hasattr(self.dataset, 'close'):
            self.dataset.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release resources; never suppress the in-flight exception.
        self.close()
        return False

    def __repr__(self):
        return f"{type(self).__name__}(uri={self.uri!r}, engine={self.engine!r})"
def detect_engine(uri: str) -> str:
    """Auto-detect the appropriate xarray engine for a given URI/file.

    Heuristic, checked in decreasing order of specificity:
      1. '.zarr' suffix (trailing slashes tolerated)         -> 'zarr'
      2. GRIB extensions: .grib, .grib2, .grb, .grb2         -> 'cfgrib'
      3. netCDF/HDF extensions: .nc, .netcdf, .hdf, .h5      -> h5netcdf/netcdf4
      4. 'zarr' anywhere in the URI (extensionless stores)   -> 'zarr'
      5. anything else                                       -> h5netcdf/netcdf4

    Fixes vs. the previous version: the loose 'zarr' substring test no longer
    shadows explicit file extensions (a file named ``zarr_output.nc`` is now
    netCDF, not Zarr), and GRIB edition-2 extensions are recognized.

    Args:
        uri: Path or URL to the dataset.

    Returns:
        Engine name: 'zarr', 'cfgrib', 'h5netcdf', or 'netcdf4'.
    """
    lowered = uri.lower().rstrip('/')
    if lowered.endswith('.zarr'):
        return 'zarr'
    if lowered.endswith(('.grib', '.grib2', '.grb', '.grb2')):
        return 'cfgrib'
    if any(ext in lowered for ext in ('.nc', '.netcdf', '.hdf', '.h5')):
        return _preferred_netcdf_engine()
    if 'zarr' in lowered:
        # Extensionless Zarr stores (e.g. cloud object-store prefixes).
        return 'zarr'
    # Default fallback for unrecognized names.
    return _preferred_netcdf_engine()


def _preferred_netcdf_engine() -> str:
    """Return 'h5netcdf' when it is installed, else fall back to 'netcdf4'."""
    try:
        import h5netcdf  # noqa: F401
        return 'h5netcdf'
    except ImportError:
        return 'netcdf4'
def open_any(uri: str, engine: Optional[str] = None, chunks: str = "auto") -> "DatasetHandle":
    """
    Open a dataset from various sources (local, HTTP, S3, etc.).

    Args:
        uri: Path or URL to dataset.
        engine: Engine to use ('h5netcdf', 'netcdf4', 'cfgrib', 'zarr');
            auto-detected from the URI when None.
        chunks: Chunking strategy passed through to xarray/dask.

    Returns:
        DatasetHandle: Handle to the opened dataset.

    Raises:
        RuntimeError: If the dataset cannot be opened; the original
            exception is chained as __cause__.
    """
    if engine is None:
        engine = detect_engine(uri)
    try:
        if engine == 'zarr':
            # Zarr stores (anonymous access assumed for S3).
            if uri.startswith('s3://'):
                import s3fs
                fs = s3fs.S3FileSystem(anon=True)
                store = s3fs.S3Map(root=uri, s3=fs, check=False)
                ds = xr.open_zarr(store, chunks=chunks)
            else:
                ds = xr.open_zarr(uri, chunks=chunks)
        elif engine == 'cfgrib':
            # GRIB files via cfgrib.
            ds = xr.open_dataset(uri, engine='cfgrib', chunks=chunks)
        else:
            # netCDF/HDF family.
            if uri.startswith(('http://', 'https://')):
                # Remote files (including OPeNDAP).
                ds = xr.open_dataset(uri, engine=engine, chunks=chunks)
            elif uri.startswith('s3://'):
                import s3fs
                fs = s3fs.S3FileSystem(anon=True)
                # BUG FIX: the file object must outlive this call — xarray
                # reads lazily, so closing it here (the old `with` block did)
                # breaks any later data access. The handle is released when
                # the dataset is closed — presumably via xarray's file
                # manager; TODO confirm no leak on handle.close().
                f = fs.open(uri, 'rb')
                ds = xr.open_dataset(f, engine=engine, chunks=chunks)
            else:
                # Local files.
                ds = xr.open_dataset(uri, engine=engine, chunks=chunks)
        return DatasetHandle(ds, uri, engine)
    except Exception as e:
        # Chain the original exception so callers keep the root cause.
        raise RuntimeError(f"Failed to open {uri} with engine {engine}: {e}") from e
def list_variables(handle: DatasetHandle) -> List[VariableSpec]:
    """
    List all data variables in the dataset with their specifications.

    Bounds variables ('*_bounds') and names that double as coordinates are
    excluded from the result.

    Args:
        handle: Dataset handle

    Returns:
        List of VariableSpec objects
    """
    def _spec(name, da):
        # Snapshot the attrs so the spec is independent of the live dataset.
        meta = dict(da.attrs)
        return VariableSpec(
            name=name,
            shape=da.shape,
            dims=da.dims,
            dtype=str(da.dtype),
            units=meta.get('units', ''),
            long_name=meta.get('long_name', name),
            attrs=meta,
        )

    ds = handle.dataset
    return [
        _spec(name, da)
        for name, da in ds.data_vars.items()
        if not name.endswith('_bounds') and name not in ds.coords
    ]
def get_dataarray(handle: DatasetHandle, var: str) -> xr.DataArray:
    """
    Get a specific data array from the dataset.

    Args:
        handle: Dataset handle
        var: Variable name

    Returns:
        xarray DataArray

    Raises:
        ValueError: If `var` is not one of the dataset's data variables.
    """
    dataset = handle.dataset
    if var in dataset.data_vars:
        return dataset[var]
    raise ValueError(f"Variable '{var}' not found in dataset")
def close(handle: DatasetHandle):
    """Close a dataset handle.

    Module-level convenience wrapper that simply delegates to
    DatasetHandle.close().
    """
    handle.close()