File size: 4,673 Bytes
433dab5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""I/O operations for opening and managing datasets."""

import os
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import xarray as xr
import fsspec
import warnings

# NOTE(review): this runs at import time and, having no ``module=`` filter,
# silences *every* UserWarning for the whole process -- not only warnings
# raised while this module opens datasets. Consider scoping it with
# ``warnings.catch_warnings()`` around the calls that actually need it.
warnings.filterwarnings("ignore", category=UserWarning)


@dataclass()
class VariableSpec:
    """Metadata snapshot describing a single data variable of a dataset."""

    # Variable name as it appears in the dataset.
    name: str
    # Array shape, one entry per dimension.
    shape: tuple
    # Dimension names, positionally aligned with ``shape``.
    dims: tuple
    # dtype rendered as a string (list_variables stores ``str(var.dtype)``).
    dtype: str
    # ``units`` attribute; list_variables uses '' when it is absent.
    units: str
    # ``long_name`` attribute; list_variables falls back to the name.
    long_name: str
    # Copy of every attribute attached to the variable.
    attrs: Dict[str, Any]


class DatasetHandle:
    """Handle for opened datasets.

    Bundles an opened ``xr.Dataset`` with the URI it was opened from and the
    engine name used to open it. The handle is also a context manager, so
    the dataset is closed when a ``with`` block exits::

        with open_any("data.nc") as handle:
            ...
    """

    def __init__(self, dataset: xr.Dataset, uri: str, engine: str):
        self.dataset = dataset
        self.uri = uri
        self.engine = engine

    def __repr__(self) -> str:
        return f"{type(self).__name__}(uri={self.uri!r}, engine={self.engine!r})"

    def __enter__(self) -> "DatasetHandle":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Always release the dataset; returning None (falsy) lets any
        # exception raised inside the ``with`` block propagate unchanged.
        self.close()

    def close(self):
        """Close the dataset; a no-op if the wrapped object has no ``close``."""
        if hasattr(self.dataset, 'close'):
            self.dataset.close()


def detect_engine(uri: str) -> str:
    """Auto-detect the appropriate engine for a given URI/file.

    Detection is purely textual and case-insensitive:

    * a ``.zarr`` suffix (a trailing ``/`` is ignored) -> ``'zarr'``
    * a GRIB suffix (``.grib``, ``.grb``, ``.grib2``, ``.grb2``) -> ``'cfgrib'``
    * no netCDF/HDF suffix but ``zarr`` somewhere in the URI
      (e.g. ``s3://bucket/zarr-store``) -> ``'zarr'``
    * anything else -> ``'h5netcdf'`` if that package is importable,
      otherwise ``'netcdf4'``

    Explicit file suffixes are checked before the loose ``'zarr'`` substring
    match so that e.g. ``bizarre_weather.nc`` or ``s3://bucket/zarr/run.grib``
    are not misrouted to the Zarr engine.
    """
    lowered = uri.lower()
    # Zarr stores are directories, so URIs frequently end in ".zarr/".
    path = lowered.rstrip('/')

    if path.endswith('.zarr'):
        return 'zarr'
    if path.endswith(('.grib', '.grb', '.grib2', '.grb2')):
        return 'cfgrib'

    has_hdf_suffix = path.endswith(('.nc', '.nc4', '.netcdf', '.hdf', '.h5', '.hdf5'))
    if not has_hdf_suffix and 'zarr' in lowered:
        # Extension-less store whose name mentions zarr.
        return 'zarr'

    # netCDF/HDF files and unrecognised inputs share the same default:
    # prefer h5netcdf, fall back to netcdf4 when it is not installed.
    try:
        import h5netcdf  # noqa: F401  (availability probe only)
    except ImportError:
        return 'netcdf4'
    return 'h5netcdf'


def open_any(uri: str, engine: Optional[str] = None, chunks: str = "auto") -> DatasetHandle:
    """
    Open a dataset from various sources (local, HTTP, S3, etc.).

    When ``engine`` is None it is chosen by ``detect_engine(uri)``.
    ``s3://`` URIs are accessed anonymously (``anon=True``), so only
    publicly readable objects can be opened that way.

    Args:
        uri: Path or URL to dataset
        engine: Engine to use ('h5netcdf', 'netcdf4', 'cfgrib', 'zarr')
        chunks: Chunking strategy for dask, forwarded unchanged to xarray

    Returns:
        DatasetHandle: Handle to the opened dataset

    Raises:
        RuntimeError: If opening fails for any reason; the underlying
            exception is chained as ``__cause__``.
    """
    if engine is None:
        engine = detect_engine(uri)

    try:
        if engine == 'zarr':
            # Zarr stores: S3 goes through an anonymous s3fs mapper.
            if uri.startswith('s3://'):
                import s3fs
                fs = s3fs.S3FileSystem(anon=True)
                store = s3fs.S3Map(root=uri, s3=fs, check=False)
                ds = xr.open_zarr(store, chunks=chunks)
            else:
                ds = xr.open_zarr(uri, chunks=chunks)
        elif engine == 'cfgrib':
            # GRIB files
            ds = xr.open_dataset(uri, engine='cfgrib', chunks=chunks)
        elif uri.startswith('s3://'):
            # netCDF/HDF object on S3, read through a file-like object.
            import s3fs
            fs = s3fs.S3FileSystem(anon=True)
            # Deliberately NOT ``with fs.open(...)``: with dask chunks the
            # variables are read lazily, after this function returns, so
            # closing the file here would break every later data access.
            # NOTE(review): nothing here closes the file object explicitly;
            # it lives as long as something references it -- confirm this is
            # acceptable for long-running processes.
            # NOTE(review): the 'netcdf4' engine may not accept file-like
            # objects; 'h5netcdf' is presumably required on this path.
            fileobj = fs.open(uri, 'rb')
            try:
                ds = xr.open_dataset(fileobj, engine=engine, chunks=chunks)
            except Exception:
                # Opening failed, so no dataset will ever use this file.
                fileobj.close()
                raise
        else:
            # Local netCDF/HDF files and remote http(s) URLs (incl. OPeNDAP)
            # are both opened directly by path/URL.
            ds = xr.open_dataset(uri, engine=engine, chunks=chunks)

        return DatasetHandle(ds, uri, engine)

    except Exception as e:
        raise RuntimeError(f"Failed to open {uri} with engine {engine}: {str(e)}") from e


def list_variables(handle: DatasetHandle) -> List[VariableSpec]:
    """
    List all data variables in the dataset with their specifications.

    Cell-boundary variables are excluded. These are variables named by any
    variable's CF ``bounds`` attribute, or whose name ends in ``_bounds`` or
    ``_bnds``.

    Args:
        handle: Dataset handle

    Returns:
        List of VariableSpec objects, in ``data_vars`` iteration order
    """
    dataset = handle.dataset

    # CF conventions link a coordinate to its cell boundaries through the
    # ``bounds`` attribute (e.g. lat:bounds = "lat_bnds"). Unless the dataset
    # was opened with decode_coords="all", xarray keeps such variables in
    # ``data_vars``, so collect their names up front.
    bounds_names = {
        variable.attrs['bounds']
        for variable in dataset.variables.values()
        if isinstance(variable.attrs.get('bounds'), str)
    }

    variables = []
    for var_name, var in dataset.data_vars.items():
        # xarray allows any hashable name; normalise for the string checks.
        name = str(var_name)

        # Skip bounds variables (and, defensively, anything that is a coord).
        if (
            name.endswith(('_bounds', '_bnds'))
            or var_name in bounds_names
            or var_name in dataset.coords
        ):
            continue

        attrs = dict(var.attrs)

        variables.append(VariableSpec(
            name=name,
            shape=var.shape,
            dims=var.dims,
            dtype=str(var.dtype),
            units=attrs.get('units', ''),
            long_name=attrs.get('long_name', name),
            attrs=attrs,
        ))

    return variables


def get_dataarray(handle: DatasetHandle, var: str) -> xr.DataArray:
    """
    Look up a single data variable by name.

    Only names present in ``data_vars`` are accepted, so coordinate names
    are rejected.

    Args:
        handle: Dataset handle wrapping the opened dataset
        var: Name of the data variable to fetch

    Returns:
        xarray DataArray for ``var``

    Raises:
        ValueError: If ``var`` is not one of the dataset's data variables.
    """
    dataset = handle.dataset
    if var in dataset.data_vars:
        return dataset[var]
    raise ValueError(f"Variable '{var}' not found in dataset")


def close(handle: DatasetHandle):
    """Release the resources held by *handle*.

    Module-level convenience wrapper that delegates to the handle's own
    ``close`` method.
    """
    release = handle.close
    release()