File size: 4,673 Bytes
433dab5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
"""I/O operations for opening and managing datasets."""
import os
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import xarray as xr
import fsspec
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
@dataclass
class VariableSpec:
    """Variable specification with metadata.

    A lightweight, engine-independent record describing one data variable,
    as produced by :func:`list_variables`.
    """
    name: str              # variable name as it appears in the dataset
    shape: tuple           # array shape
    dims: tuple            # dimension names, parallel to ``shape``
    dtype: str             # string form of the variable's dtype
    units: str             # 'units' attribute value ('' when absent)
    long_name: str         # 'long_name' attribute, falling back to ``name``
    attrs: Dict[str, Any]  # full copy of the variable's attribute dict
class DatasetHandle:
    """Handle for opened datasets.

    Bundles the opened xarray dataset with its origin URI and the engine
    used to open it. Also usable as a context manager, so callers can
    write ``with open_any(uri) as handle: ...`` and get deterministic
    cleanup even when the body raises.
    """

    def __init__(self, dataset: xr.Dataset, uri: str, engine: str):
        self.dataset = dataset  # the opened xarray dataset
        self.uri = uri          # source path/URL the dataset came from
        self.engine = engine    # xarray engine used to open it

    def close(self):
        """Close the dataset (no-op if the object has no close())."""
        if hasattr(self.dataset, 'close'):
            self.dataset.close()

    def __enter__(self):
        """Enter a ``with`` block; yields the handle itself."""
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always release the dataset, even when the body raised;
        # never suppress the exception.
        self.close()
        return False

    def __repr__(self):
        return f"{type(self).__name__}(uri={self.uri!r}, engine={self.engine!r})"
def _preferred_netcdf_engine() -> str:
    """Return 'h5netcdf' when it is installed, else fall back to 'netcdf4'."""
    try:
        import h5netcdf  # noqa: F401 -- availability probe only
        return 'h5netcdf'
    except ImportError:
        return 'netcdf4'


def detect_engine(uri: str) -> str:
    """Auto-detect the appropriate engine for a given URI/file.

    Args:
        uri: Path or URL to the dataset.

    Returns:
        One of 'zarr', 'cfgrib', 'h5netcdf', or 'netcdf4'.
    """
    lowered = uri.lower()
    # Zarr stores: match '.zarr' suffix or 'zarr' anywhere in the URI
    # (consolidated stores often embed it mid-path).
    if 'zarr' in lowered:
        return 'zarr'
    # GRIB editions 1 and 2. The original only matched .grib/.grb, so
    # .grib2/.grb2 files incorrectly fell through to the netCDF branch.
    if lowered.endswith(('.grib', '.grib2', '.grb', '.grb2')):
        return 'cfgrib'
    # netCDF/HDF extensions and the unknown-extension default both use the
    # same fallback, so a single shared path replaces the duplicated branches.
    return _preferred_netcdf_engine()
def open_any(uri: str, engine: Optional[str] = None, chunks: str = "auto") -> DatasetHandle:
    """
    Open a dataset from various sources (local, HTTP, S3, etc.).

    Args:
        uri: Path or URL to dataset
        engine: Engine to use ('h5netcdf', 'netcdf4', 'cfgrib', 'zarr');
            auto-detected from the URI when None
        chunks: Chunking strategy for dask

    Returns:
        DatasetHandle: Handle to the opened dataset

    Raises:
        RuntimeError: If the dataset cannot be opened (original error chained).
    """
    if engine is None:
        engine = detect_engine(uri)
    try:
        if engine == 'zarr':
            # For Zarr stores
            if uri.startswith('s3://'):
                import s3fs
                fs = s3fs.S3FileSystem(anon=True)
                store = s3fs.S3Map(root=uri, s3=fs, check=False)
                ds = xr.open_zarr(store, chunks=chunks)
            else:
                ds = xr.open_zarr(uri, chunks=chunks)
        elif engine == 'cfgrib':
            # For GRIB files
            ds = xr.open_dataset(uri, engine='cfgrib', chunks=chunks)
        else:
            # For netCDF/HDF files
            if uri.startswith(('http://', 'https://')):
                # Remote files (including OPeNDAP)
                ds = xr.open_dataset(uri, engine=engine, chunks=chunks)
            elif uri.startswith('s3://'):
                # S3 files. BUG FIX: the file object must stay open for the
                # lifetime of the lazily-read dataset — the previous `with`
                # block closed it before any data could be read. xarray
                # closes the file when the dataset is closed.
                import s3fs
                fs = s3fs.S3FileSystem(anon=True)
                f = fs.open(uri, 'rb')
                ds = xr.open_dataset(f, engine=engine, chunks=chunks)
            else:
                # Local files
                ds = xr.open_dataset(uri, engine=engine, chunks=chunks)
        return DatasetHandle(ds, uri, engine)
    except Exception as e:
        # Chain the cause so the underlying traceback is preserved.
        raise RuntimeError(f"Failed to open {uri} with engine {engine}: {e}") from e
def list_variables(handle: DatasetHandle) -> List[VariableSpec]:
    """
    List all data variables in the dataset with their specifications.

    Bounds variables (``*_bounds``) and anything that is also registered
    as a coordinate are skipped.

    Args:
        handle: Dataset handle

    Returns:
        List of VariableSpec objects
    """
    ds = handle.dataset
    specs: List[VariableSpec] = []
    for name, da in ds.data_vars.items():
        # Skip coordinate variables and bounds
        if name.endswith('_bounds') or name in ds.coords:
            continue
        meta = dict(da.attrs)
        specs.append(
            VariableSpec(
                name=name,
                shape=da.shape,
                dims=da.dims,
                dtype=str(da.dtype),
                units=meta.get('units', ''),
                long_name=meta.get('long_name', name),
                attrs=meta,
            )
        )
    return specs
def get_dataarray(handle: DatasetHandle, var: str) -> xr.DataArray:
    """
    Get a specific data array from the dataset.

    Args:
        handle: Dataset handle
        var: Variable name

    Returns:
        xarray DataArray

    Raises:
        ValueError: If ``var`` is not a data variable; the message lists
            the variables that are available to aid debugging.
    """
    if var not in handle.dataset.data_vars:
        available = ", ".join(map(str, handle.dataset.data_vars))
        raise ValueError(
            f"Variable '{var}' not found in dataset. Available: {available}"
        )
    return handle.dataset[var]
def close(handle: DatasetHandle):
    """Close a dataset handle.

    Module-level convenience wrapper that simply delegates to
    :meth:`DatasetHandle.close`.

    Args:
        handle: Dataset handle to close.
    """
    handle.close()