Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| HDF5/NetCDF MCP Server - Gradio Implementation | |
| A Model Context Protocol server for reading and analyzing HDF5 and NetCDF scientific data files. | |
| """ | |
import json
import tempfile
import traceback
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, List
from urllib.parse import urlparse

import gradio as gr
import h5py
import netCDF4 as nc
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import requests
| # Try to import h5netcdf for better NetCDF4 compatibility | |
| try: | |
| import h5netcdf | |
| HAS_H5NETCDF = True | |
| except ImportError: | |
| HAS_H5NETCDF = False | |
| # ============================================ | |
| # UTILITY FUNCTIONS (NOT EXPOSED AS MCP TOOLS) | |
| # ============================================ | |
def parse_slice(slice_str: str):
    """Convert a numpy-style slice string (e.g. '0:10, :, 5') into slice objects.

    Returns a single slice/int when the string has one component, a tuple of
    slices/ints when it has several, and slice(None) (select everything) for
    an empty or blank string.
    """
    if not slice_str or not slice_str.strip():
        return slice(None)

    def _component(text: str):
        # A bare integer selects a single index; anything with ':' is a range.
        if ':' not in text:
            return int(text)
        fields = text.split(':')
        # Pad to three fields so start/stop/step unpack uniformly; extra
        # fields beyond the third are ignored (same as the ':'-split original).
        fields += [''] * (3 - len(fields))
        start, stop, step = (int(f) if f.strip() else None for f in fields[:3])
        return slice(start, stop, step)

    components = [_component(piece.strip()) for piece in slice_str.split(',')]
    return components[0] if len(components) == 1 else tuple(components)
def open_file_with_fallback(file_path: str) -> Tuple[Any, str]:
    """
    Open an HDF5/NetCDF file, trying multiple readers for maximum compatibility.

    .h5/.hdf5 files go straight to h5py. .nc/.nc4/.netcdf files are attempted
    with netCDF4 first, then h5netcdf (when installed), then raw h5py.

    Returns:
        (file_object, file_type) tuple where file_type is "HDF5", "NetCDF",
        or "NetCDF_h5netcdf"

    Raises:
        ValueError: if the extension is unsupported or every reader fails.
    """
    path = Path(file_path)
    suffix = path.suffix.lower()

    if suffix in ('.h5', '.hdf5'):
        # Plain HDF5 — no fallback chain needed.
        return h5py.File(path, 'r'), "HDF5"

    if suffix not in ('.nc', '.nc4', '.netcdf'):
        raise ValueError(f"Unsupported file type: {suffix}")

    # TIER 1: netCDF4 offers the richest NetCDF support.
    try:
        return nc.Dataset(path, 'r'), "NetCDF"
    except Exception as nc_error:
        # TIER 2: h5netcdf copes with some NetCDF4 files netCDF4 rejects.
        if HAS_H5NETCDF:
            try:
                return h5netcdf.File(path, 'r'), "NetCDF_h5netcdf"
            except Exception:
                pass
        # TIER 3: raw h5py as a last resort (NetCDF4 is HDF5 underneath).
        try:
            return h5py.File(path, 'r'), "HDF5"
        except Exception as h5_error:
            raise ValueError(f"Cannot open file. netCDF4 error: {nc_error}, h5py error: {h5_error}")
def get_file_structure(file_path: str) -> Dict[str, Any]:
    """
    Get the structure of an HDF5/NetCDF file.

    Args:
        file_path: Path to the HDF5/NetCDF file.

    Returns:
        On success: {"structure": {...}, "datasets": [...], "success": True},
        where "datasets" is a dropdown-friendly list of display strings
        ("path (shape, dtype)").
        On failure: {"error": ..., "traceback": ..., "success": False}.
    """
    try:
        f, file_type = open_file_with_fallback(file_path)
        # FIX: previously the handle was only closed on the success path, so
        # an exception while walking the structure leaked the open file.
        try:
            structure = {"file_type": file_type}
            datasets = []
            if file_type == "HDF5":
                # Walk the full HDF5 hierarchy, collecting every dataset.
                dataset_list = []

                def visitor(name, obj):
                    if isinstance(obj, h5py.Dataset):
                        dataset_list.append({
                            "path": "/" + name,
                            "shape": obj.shape,
                            "dtype": str(obj.dtype)
                        })

                f.visititems(visitor)
                structure["datasets"] = dataset_list
                # Dropdown-friendly display strings
                for ds in dataset_list:
                    datasets.append(f"{ds['path']} ({ds['shape']}, {ds['dtype']})")
            else:
                # NetCDF: record dimensions plus per-variable metadata.
                structure["dimensions"] = {name: len(dim) for name, dim in f.dimensions.items()}
                var_list = []
                for name, var in f.variables.items():
                    var_list.append({
                        "name": name,
                        "dimensions": var.dimensions,
                        "shape": var.shape,
                        "dtype": str(var.dtype)
                    })
                structure["variables"] = var_list
                # Dropdown-friendly display strings
                for var in var_list:
                    datasets.append(f"{var['name']} ({var['shape']}, {var['dtype']})")
        finally:
            f.close()
        return {
            "structure": structure,
            "datasets": datasets,
            "success": True
        }
    except Exception as e:
        return {
            "error": str(e),
            "traceback": traceback.format_exc(),
            "success": False
        }
def get_var_attrs_dict(var, file_type: str) -> dict:
    """Return a variable's attributes as a plain dict, for any backend.

    Unknown backends and any attribute-access failure yield an empty dict.
    """
    readers = {
        "HDF5": lambda v: dict(v.attrs),
        "NetCDF": lambda v: {k: v.getncattr(k) for k in v.ncattrs()},
        "NetCDF_h5netcdf": lambda v: dict(v.attrs),
    }
    try:
        reader = readers.get(file_type)
        return reader(var) if reader is not None else {}
    except Exception:
        # Best-effort: attribute metadata is never worth failing over.
        return {}
# ============================================
# MEMORY SAFETY LIMITS (OVERRIDABLE)
# ============================================
# Size thresholds for automatic memory checks (in MB). Consumed by
# check_memory_safety(); callers can bypass the tiers entirely by passing
# an explicit memory_limit_mb.
SAFE_LOAD_MB = 100  # Green light - load without warning
WARNING_LOAD_MB = 500  # Yellow - load but warn
ERROR_LOAD_MB = 2000  # Red - refuse unless user overrides
def check_memory_safety(data_shape: tuple, dtype, slice_str: Optional[str] = None,
                        memory_limit_mb: Optional[float] = None) -> Dict[str, Any]:
    """
    Check if loading data is safe based on size thresholds.

    Args:
        data_shape: Full shape of the dataset
        dtype: Data type of the dataset
        slice_str: Optional slice string to compute result shape
        memory_limit_mb: Optional custom memory limit. If None, uses tiered defaults.

    Returns:
        Dictionary with:
        - safe: bool - whether it's safe to proceed
        - estimated_mb: float - estimated memory usage
        - warning: str (optional) - warning message
        - error: str (optional) - error message
        - suggested_slice: str (optional) - suggested slice if too large
    """
    # FIX: treat None, "" and whitespace uniformly as "no slice given".
    # Callers such as read_dataset pass "" by default, so the previous
    # `slice_str is None` tests never fired and suggested_slice was
    # silently omitted from the too-large error responses.
    no_slice = slice_str is None or not slice_str.strip()

    # Compute result shape
    if no_slice:
        result_shape = data_shape
    else:
        try:
            slices = parse_slice(slice_str)
            result_shape = compute_slice_shape(data_shape, slices)
        except Exception as e:
            return {"safe": False, "error": f"Invalid slice: {str(e)}"}

    # Estimate the memory footprint of the (possibly sliced) result
    result_elements = np.prod(result_shape)
    bytes_per_element = np.dtype(dtype).itemsize
    estimated_mb = result_elements * bytes_per_element / (1024 * 1024)
    result = {
        "estimated_mb": round(estimated_mb, 2),
        "result_shape": list(result_shape),
        "full_shape": list(data_shape)
    }

    # Use custom limit if provided (bypasses the tiered defaults entirely)
    if memory_limit_mb is not None:
        if estimated_mb <= memory_limit_mb:
            result["safe"] = True
            result["info"] = f"Within custom limit of {memory_limit_mb} MB"
        else:
            result["safe"] = False
            result["error"] = f"Exceeds custom limit of {memory_limit_mb} MB (estimated: {estimated_mb:.1f} MB)"
            if no_slice:
                result["suggested_slice"] = suggest_reasonable_slice(data_shape, dtype, target_mb=memory_limit_mb)
        return result

    # Use tiered defaults
    if estimated_mb < SAFE_LOAD_MB:
        result["safe"] = True
        result["info"] = "Safe to load"
    elif estimated_mb < WARNING_LOAD_MB:
        result["safe"] = True
        result["warning"] = f"Loading {estimated_mb:.1f} MB. Consider slicing for faster operations."
    elif estimated_mb < ERROR_LOAD_MB:
        result["safe"] = False
        result["error"] = f"Dataset too large ({estimated_mb:.1f} MB). Please use slicing."
        if no_slice:
            result["suggested_slice"] = suggest_reasonable_slice(data_shape, dtype)
    else:
        result["safe"] = False
        result["error"] = f"Dataset extremely large ({estimated_mb:.1f} MB). Maximum recommended: {ERROR_LOAD_MB} MB."
        result["suggested_slice"] = suggest_reasonable_slice(data_shape, dtype)
    return result
def compute_slice_shape(full_shape: tuple, slices) -> tuple:
    """Compute the resulting shape after applying slices.

    Integer indices remove their dimension; slice objects keep the dimension
    with the resulting length. Dimensions beyond the supplied slices are
    kept whole.
    """
    if not isinstance(slices, tuple):
        slices = (slices,)
    # Pad with full-dimension slices so every dimension has an entry
    if len(slices) < len(full_shape):
        slices = slices + (slice(None),) * (len(full_shape) - len(slices))
    result_shape = []
    for dim_size, s in zip(full_shape, slices):
        if isinstance(s, int):
            # Single index - dimension is removed
            continue
        if isinstance(s, slice):
            # FIX: slice.indices() normalizes negative start/stop, clamps
            # out-of-range bounds to the dimension size, and handles negative
            # steps — the previous hand-rolled arithmetic miscounted all three
            # (e.g. slice(0, 100) on a length-10 axis reported 100 elements).
            result_shape.append(len(range(*s.indices(dim_size))))
    return tuple(result_shape)
def suggest_reasonable_slice(full_shape: tuple, dtype, target_mb: float = 100) -> str:
    """Build a slice string that keeps estimated memory near target_mb.

    Only the largest dimension is trimmed to '0:N'; every other dimension
    is kept whole (':').
    """
    element_bytes = np.dtype(dtype).itemsize
    budget_elements = int(target_mb * 1024 * 1024 / element_bytes)

    # Trim only the largest dimension, leave the rest intact.
    largest = np.argmax(full_shape)
    remaining = np.prod([size for axis, size in enumerate(full_shape) if axis != largest])

    if remaining > 0:
        # How many entries of the largest axis fit in the element budget
        keep = min(max(1, int(budget_elements / remaining)), full_shape[largest])
    else:
        # Degenerate shape (a zero-sized axis): fall back to a small fixed cut
        keep = min(10, full_shape[largest])

    pieces = [f"0:{keep}" if axis == largest else ":" for axis in range(len(full_shape))]
    return ", ".join(pieces)
| # ============================================ | |
| # MCP TOOL FUNCTIONS (STATELESS - ALWAYS TAKE file_path) | |
| # ============================================ | |
def download_file(url: str) -> Dict[str, Any]:
    """
    Download a HDF5/NetCDF file from a URL and parse its structure.

    **CRITICAL**: This tool downloads the file AND parses its structure. You do NOT need to
    call list_structure() after downloading - the structure is already in the result.

    Args:
        url: Direct URL to the HDF5 (.h5, .hdf5) or NetCDF (.nc, .nc4) file

    Returns:
        Dictionary with these keys:
        - file_path (str): Path to downloaded file - SAVE THIS, you need it for all other tools
        - structure (dict): Complete file structure (file_type, dimensions, variables/datasets)
        - datasets (list): Available dataset names with shapes/types
        - filename (str): Original filename
        - size_mb (float): File size in megabytes
        - status (str): Success/error message

    **Return value structure**:
        {
            "file_path": "/tmp/tmpXXX.nc",   # <- Use this in other tools
            "structure": {
                "file_type": "NetCDF",
                "dimensions": {"time": 100, "lat": 90, "lon": 180},
                "variables": [{"name": "data", "shape": [100, 90, 180], ...}]
            },
            "datasets": ["data ((100, 90, 180), float32)", ...],
            "status": "Successfully downloaded..."
        }

    **Usage pattern**:
        1. Call download_file(url)
        2. Extract file_path from result: file_path = result["file_path"]
        3. Use file_path in other tools: compute_statistics(file_path, "dataset_name")

    **Common mistake**: Don't call list_structure() after download_file() - the structure
    is already in the download_file result, so that's redundant.
    """
    try:
        if not url:
            return {"error": "Please provide a URL", "status": "failed"}
        # Download with a browser-like User-Agent: some data servers reject
        # the default requests UA.
        headers = {'User-Agent': 'Mozilla/5.0'}
        response = requests.get(url, stream=True, timeout=60, headers=headers, allow_redirects=True)
        response.raise_for_status()
        # An HTML response usually means a landing/login page, not the file
        content_type = response.headers.get('content-type', '').lower()
        if 'text/html' in content_type:
            return {"error": "URL returned HTML instead of a file", "status": "failed"}
        # FIX: derive filename/extension from the URL *path* only, so query
        # strings (e.g. "?download=1") no longer corrupt the suffix.
        url_path = Path(urlparse(url).path)
        extension = url_path.suffix if url_path.suffix else '.nc'
        # Stream the body to a temp file in 8 KB chunks
        with tempfile.NamedTemporaryFile(delete=False, suffix=extension, mode='wb') as tmp_file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    tmp_file.write(chunk)
            tmp_path = tmp_file.name
        # Get file size
        file_size_mb = Path(tmp_path).stat().st_size / (1024 * 1024)
        # Parse the structure up-front so callers never need a follow-up
        # list_structure() call.
        structure_result = get_file_structure(tmp_path)
        if not structure_result.get("success"):
            return {
                "error": structure_result.get('error', 'Unknown error'),
                "status": "failed"
            }
        filename = url_path.name
        return {
            "file_path": tmp_path,
            "filename": filename,
            "size_mb": round(file_size_mb, 2),
            "structure": structure_result["structure"],
            "datasets": structure_result["datasets"],
            # FIX: previously reported the literal text "(unknown)" instead
            # of the downloaded filename.
            "status": f"Successfully downloaded {filename} ({file_size_mb:.2f} MB)"
        }
    except Exception as e:
        return {
            "error": str(e),
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def read_dataset(file_path: str, dataset_path: str, slice_str: str = "",
                 memory_limit_mb: Optional[float] = None) -> Dict[str, Any]:
    """
    Read data from a specific dataset/variable.

    **WHEN TO USE THIS**:
    - When you need the actual data values for visualization or export
    - When compute_statistics() doesn't provide enough information
    - When you need to inspect specific data points

    **WHEN NOT TO USE THIS**:
    - If you only need statistics (min, max, mean, etc.) -> use compute_statistics() instead
    - If the dataset is > 10,000 elements -> use compute_statistics(), export_data(), or visualization tools

    **Token efficiency note**: Large datasets are truncated to prevent wasting tokens. If you see
    "serialized": false in the result, the tool is telling you to use a different approach.

    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to the dataset within the file (e.g., 'variable_name', '/group/data')
        slice_str: Optional numpy-style slice (e.g., '0:10, :, 5' = first 10 in dim 0, all of dim 1, index 5 of dim 2)
        memory_limit_mb: Optional custom memory limit in MB. If not specified, uses tiered
                         defaults (100 MB safe, 500 MB warning, 2000 MB max). Set higher to
                         override defaults if you have more memory available.

    Returns:
        Dictionary containing:
        - data (list): Actual data values (only if < 10,000 elements; otherwise see alternatives)
        - shape (list): Shape of the data
        - dtype (str): Data type
        - size_mb (float): Memory size
        - num_elements (int): Total elements
        - serialized (bool): True if full data included, False if truncated
        - alternatives (list): Suggested alternative tools if data was truncated

    **Return value structure (small dataset)**:
        {
            "data": [[1.2, 3.4], [5.6, 7.8]],   # <- Full data array
            "shape": [2, 2],
            "dtype": "float32",
            "serialized": true
        }

    **Return value structure (large dataset)**:
        {
            "serialized": false,   # <- Data NOT included
            "preview_first_100": [1.2, 3.4, ...],
            "alternatives": [
                "Use compute_statistics() to get summary stats",
                "Use export_data() to save to file",
                "Use create_histogram() to visualize distribution"
            ],
            "shape": [1000, 90, 180],
            "num_elements": 16200000
        }
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        if not dataset_path:
            return {"error": "Please specify a dataset path"}
        # Extract clean path if it has shape/dtype info (dropdown entries look
        # like "name ((shape), dtype)")
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable (HDF5 uses plain item access, NetCDF a
            # .variables mapping)
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety before loading
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                # Return error with clear override instructions
                # NOTE(review): if check_memory_safety returned an "Invalid
                # slice" error it carries no estimated_mb/result_shape keys;
                # the KeyError below is then caught by the outer handler and
                # reported as a generic read failure — confirm acceptable.
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "full_shape": safety_check["full_shape"],
                    "result_shape": safety_check["result_shape"],
                    "override_option": f"To load this data anyway, set memory_limit_mb={int(safety_check['estimated_mb'] * 1.2)} or higher",
                    "default_limits": "Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["alternative"] = f"Or use slicing: slice_str='{safety_check['suggested_slice']}'"
                return error_result
            # Read data with optional slicing
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Convert to numpy array (NetCDF may hand back masked arrays or
            # scalars; np.ndarray gives a uniform interface below)
            if not isinstance(data, np.ndarray):
                data = np.array(data)
            result = {
                "shape": list(data.shape),
                "dtype": str(data.dtype),
                "size_mb": round(data.nbytes / (1024 * 1024), 2),
                "num_elements": int(data.size)
            }
            # Limit serialization to avoid excessive token usage in LLM context
            # The LLM sees the full JSON response and large arrays waste tokens
            MAX_SERIALIZE_ELEMENTS = 10000
            if data.size > MAX_SERIALIZE_ELEMENTS:
                result["serialized"] = False
                result["note"] = f"Dataset has {data.size:,} elements. Only preview returned to save tokens."
                result["token_saving_info"] = f"Full data would use ~{data.size * 10} tokens. Use export_data or create_histogram for large datasets."
                # Provide preview: first and last 100 elements of the
                # flattened array
                flat_data = data.flatten()
                result["preview_first_100"] = flat_data[:100].tolist()
                result["preview_last_100"] = flat_data[-100:].tolist()
                result["preview_shape_info"] = f"Showing first/last 100 of {data.size:,} elements"
                # Suggest alternatives
                result["alternatives"] = [
                    f"Use compute_statistics() to get summary stats without loading all data",
                    f"Use export_data() to save to file for external analysis",
                    f"Use create_histogram() to visualize distribution",
                    f"Use slicing to load smaller subset: slice_str='{suggest_reasonable_slice(data.shape, data.dtype, target_mb=50)}'"
                ]
            else:
                result["serialized"] = True
                result["data"] = data.tolist()
            # Add warning if present (the 100-500 MB "yellow" tier loads but
            # warns)
            if "warning" in safety_check:
                result["warning"] = safety_check["warning"]
            return result
        finally:
            f.close()
    except Exception as e:
        return {"error": f"Failed to read dataset: {str(e)}", "traceback": traceback.format_exc()}
def get_dataset_preview(file_path: str, dataset_path: str, n_preview: int = 10) -> Dict[str, Any]:
    """
    Get a safe preview of a dataset without loading the entire array.

    **USE CASE**: Quick peek at data values without loading large arrays.

    **WHEN TO USE THIS**:
    - You want to see a few sample values to understand the data
    - You need to verify data looks reasonable before processing
    - You want to check data format/structure with minimal memory use
    - Dataset is large but you only need to see representative values

    **WHEN NOT TO USE THIS**:
    - You need statistics -> use compute_statistics() instead (more efficient)
    - You need all the data -> use read_dataset() for small datasets or export_data() for large ones
    - You want to visualize -> use create_histogram(), create_line_plot(), etc. instead

    **PREVIEW BEHAVIOR**:
    - Only loads the requested elements, so it is safe for any dataset size
    - 1D arrays: returns the first n_preview elements (and the last n_preview
      when the array is longer than that)
    - Multi-dimensional arrays: samples along the first axis at index 0 of
      every other axis

    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to the dataset within the file
        n_preview: Number of elements to preview (default: 10)

    Returns:
        Dictionary that always contains:
        - dataset_path (str): Cleaned dataset path that was read
        - shape (list): Full shape of the dataset
        - dtype (str): Data type
        - size_mb (float): Total memory size in MB
        - num_elements (int): Total number of elements
        Plus preview values depending on dimensionality:
        - empty dataset: preview_data ([]) and a note
        - scalar: preview_data (the single value)
        - 1D: first_n (and last_n when longer than n_preview)
        - N-D: first_n_elements and a note describing the sampling

    **Return value structure (1D dataset)**:
        {
            "dataset_path": "temperature",
            "shape": [1750],
            "dtype": "float32",
            "size_mb": 0.01,
            "num_elements": 1750,
            "first_n": [15.3, 14.2, 16.8, ...],
            "last_n": [14.9, 15.7, 16.1, ...]
        }

    **Example workflow - Quick data check**:
        # Preview data to verify it looks reasonable
        preview = get_dataset_preview(file_path, "temperature", n_preview=20)
        sample_values = preview.get("first_n", preview.get("first_n_elements", []))
        # Check if values are in expected range
        if all(-50 < v < 50 for v in sample_values):
            # Looks good, now get full statistics
            stats = compute_statistics(file_path, "temperature")
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        if not dataset_path:
            return {"error": "Please specify a dataset path"}
        # Extract clean path (dropdown entries carry a " (shape, dtype)" suffix)
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            shape = var.shape
            dtype = var.dtype
            # Scalars have an empty shape; treat them as one element
            num_elements = int(np.prod(shape)) if len(shape) > 0 else 1
            size_mb = num_elements * dtype.itemsize / (1024**2)
            result = {
                "dataset_path": dataset_path,
                "shape": list(shape),
                "dtype": str(dtype),
                "size_mb": round(size_mb, 2),
                "num_elements": num_elements
            }
            # Get preview data
            if num_elements == 0:
                result["preview_data"] = []
                result["note"] = "Dataset is empty"
            elif len(shape) == 0:
                # Scalar: read with the empty-tuple index, unwrap numpy scalars
                val = var[()]
                result["preview_data"] = val.item() if isinstance(val, np.generic) else val
            elif len(shape) == 1:
                # 1D array: head (and tail when longer than n_preview)
                n = min(n_preview, shape[0])
                result["first_n"] = var[:n].tolist()
                if shape[0] > n:
                    result["last_n"] = var[-n:].tolist()
            else:
                # Multi-dimensional: slice along axis 0, pinned to index 0 on
                # every other axis, so only n_preview elements are read
                sample_slice = [slice(0, min(n_preview, shape[0]))] + [0] * (len(shape) - 1)
                sample_data = var[tuple(sample_slice)]
                result["first_n_elements"] = np.array(sample_data).flatten()[:n_preview].tolist()
                result["note"] = f"Showing first {min(n_preview, len(result['first_n_elements']))} of {num_elements} elements"
            return result
        finally:
            f.close()
    except Exception as e:
        return {"error": f"Failed to preview dataset: {str(e)}", "traceback": traceback.format_exc()}
def get_attributes(file_path: str, path: str = "") -> Dict[str, Any]:
    """
    Get metadata attributes for a file, group, or dataset.

    **USE CASE**: Understanding what variables mean, their units, descriptions, etc.

    **COMMON ATTRIBUTES TO LOOK FOR**:
    - units: Physical units of the data (e.g., "kelvin", "meters", "days since 1800-01-01")
    - long_name: Human-readable description of the variable
    - standard_name: CF-convention standard name
    - scale_factor / add_offset: Linear transformation parameters for packed data
    - _FillValue: Value representing missing/invalid data
    - valid_range / valid_min / valid_max: Valid data bounds

    **WHEN TO USE THIS**:
    - Before analyzing data: check units and understand what the values represent
    - To find file-level metadata: leave path empty
    - To understand coordinate variables: especially for time (to convert indices to dates)
    - To check for data quality flags or processing history

    Args:
        file_path: Path to the HDF5/NetCDF file
        path: Path to dataset/group (empty string "" for file-level attributes)

    Returns:
        Dictionary with keys:
        - attributes (dict): All metadata attributes as key-value pairs
        - path (str): Path that was queried ("root" if file-level)

    **Example workflow - Understanding time coordinate**:
        time_attrs = get_attributes(file_path, "time")
        units = time_attrs["attributes"]["units"]  # e.g., "days since 1800-01-01"
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        # Dropdown entries carry a " (shape, dtype)" suffix; strip it
        path = path.split(" (")[0] if path and " (" in path else path
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Resolve the target object: file root, HDF5 node, or NetCDF variable
            if not path:
                target = f
            elif file_type == "HDF5":
                target = f[path]
            else:
                target = f.variables[path]
            # Pull the attribute mapping in whatever form the backend exposes
            if file_type in ("HDF5", "NetCDF_h5netcdf"):
                attrs = dict(target.attrs)
            elif file_type == "NetCDF":
                attrs = {k: target.getncattr(k) for k in target.ncattrs()}
            else:
                attrs = {}
            # numpy scalars -> native Python so the result is JSON-friendly
            attrs = {k: (v.item() if isinstance(v, np.generic) else v)
                     for k, v in attrs.items()}
            return {"attributes": attrs, "path": path if path else "root"}
        finally:
            f.close()
    except Exception as e:
        return {"error": f"Failed to get attributes: {str(e)}", "traceback": traceback.format_exc()}
def get_dimensions(file_path: str, dataset_path: str = "") -> Dict[str, Any]:
    """
    Get dimension information for a dataset or file.

    **USE CASE**: Understanding data structure before slicing or analyzing.

    **WHEN TO USE THIS**:
    - To see what dimensions a dataset has and their sizes
    - To understand how to construct slice strings
    - For NetCDF files with empty dataset_path: get ALL dimensions in the file

    **DO NOT use this if**:
    - You just downloaded a file - the structure is already in download_file() result
    - You already ran list_structure() - the dimensions are in that result

    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to dataset (empty "" for NetCDF shows all dimensions, required for HDF5)

    Returns:
        For a specific HDF5 dataset: {"shape": [...], "ndim": N}
        For a specific NetCDF variable: {"dimensions": (...), "shape": [...]}
        For NetCDF with empty path: {"dimensions": {name: size, ...}}

    **Example workflow - Planning a slice**:
        dims = get_dimensions(file_path, "temperature")
        # dims["shape"] = [1750, 90, 180]
        # To get first 10 time steps, all lat, all lon: slice_str = "0:10, :, :"
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        # Dropdown entries carry a " (shape, dtype)" suffix; strip it
        dataset_path = dataset_path.split(" (")[0] if dataset_path and " (" in dataset_path else dataset_path
        f, file_type = open_file_with_fallback(file_path)
        try:
            if file_type != "HDF5":
                # NetCDF: either one variable's dims or the whole file's
                if dataset_path:
                    var = f.variables[dataset_path]
                    return {
                        "dimensions": var.dimensions,
                        "shape": list(var.shape)
                    }
                return {
                    "dimensions": {name: len(dim) for name, dim in f.dimensions.items()}
                }
            # HDF5 has no named dimensions, so a dataset path is mandatory
            if not dataset_path:
                return {"error": "dataset_path required for HDF5 files"}
            shape = f[dataset_path].shape
            return {"shape": list(shape), "ndim": len(shape)}
        finally:
            f.close()
    except Exception as e:
        return {"error": f"Failed to get dimensions: {str(e)}", "traceback": traceback.format_exc()}
def list_structure(file_path: str) -> Dict[str, Any]:
    """
    List the complete hierarchical structure of the file.

    **WHEN TO USE THIS**:
    - You need to browse available datasets/variables
    - You don't know the names of variables in the file
    - You need to see the complete file organization

    **WHEN NOT TO USE THIS**:
    - Right after download_file() -> structure is already in that result
    - You already know the dataset name -> just use it directly in other tools
    - File has > 500 datasets -> output will be truncated to save tokens

    **TOKEN EFFICIENCY NOTE**: For files with many datasets, this function truncates
    the output to avoid wasting tokens. If you know specific dataset names, use them
    directly instead of browsing.

    Args:
        file_path: Path to the HDF5/NetCDF file

    Returns:
        Dictionary containing:
        - file_type (str): "HDF5" or "NetCDF"
        - dimensions (dict): For NetCDF, dimensions and their sizes
        - variables (list): For NetCDF, list of variables with metadata
        - datasets (list): For HDF5, list of datasets with metadata
        - token_warning (str): If file has 101-500 items, a usage hint
        - token_saving_info (dict): If file has > 500 items, explanation of truncation

    **Common mistake**: Calling this after download_file() is redundant - you already
    have the structure from download_file()["structure"].
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        # Get the full structure
        result = get_file_structure(file_path)
        if not result.get("success"):
            return {"error": result.get("error", "Unknown error")}
        structure = result["structure"]
        # Count total items (HDF5 stores "datasets", NetCDF "variables")
        if structure.get("file_type") == "HDF5":
            total_items = len(structure.get("datasets", []))
            item_type = "datasets"
        else:  # NetCDF
            total_items = len(structure.get("variables", []))
            item_type = "variables"
        # Token-saving thresholds
        # (FIX: the previous version also defined an unused TRUNCATE_LIMIT
        # equal to WARNING_LIMIT, and hard-coded the 100-item cut in several
        # places.)
        SAFE_LIMIT = 100     # <= this: return the full list silently
        WARNING_LIMIT = 500  # <= this: return the full list plus a warning
        SHOW_COUNT = 100     # above WARNING_LIMIT: truncate to this many
        if total_items <= SAFE_LIMIT:
            # Small file - return everything
            return structure
        if total_items <= WARNING_LIMIT:
            # Medium file - return everything but warn about token usage
            structure["token_warning"] = f"File has {total_items} {item_type}. Consider using specific dataset names instead of browsing full structure."
            return structure
        # Large file - truncate to save tokens
        if structure.get("file_type") == "HDF5":
            structure["datasets"] = structure.get("datasets", [])[:SHOW_COUNT]
            structure["datasets_truncated"] = True
            structure["total_datasets"] = total_items
        else:  # NetCDF
            structure["variables"] = structure.get("variables", [])[:SHOW_COUNT]
            structure["variables_truncated"] = True
            structure["total_variables"] = total_items
        structure["showing_first"] = SHOW_COUNT
        structure["token_saving_info"] = {
            "message": f"File has {total_items} {item_type}. Only showing first {SHOW_COUNT} to save tokens.",
            "estimated_tokens_saved": f"~{(total_items - SHOW_COUNT) * 50:,} tokens",
            "full_token_cost": f"Full list would use ~{total_items * 50:,} tokens",
            "alternatives": [
                "If you know the dataset name, use read_dataset() or get_attributes() directly",
                "Use file metadata or documentation to identify dataset names",
                "Search for specific patterns if available in your workflow"
            ]
        }
        return structure
    except Exception as e:
        return {"error": f"Failed to list structure: {str(e)}", "traceback": traceback.format_exc()}
def compute_statistics(file_path: str, dataset_path: str, slice_str: str = "",
                       memory_limit_mb: Optional[float] = None) -> Dict[str, Any]:
    """
    Compute basic statistics for a numeric dataset.
    **PREFERRED TOOL**: Use this instead of read_dataset() when you only need statistics.
    This is more efficient and avoids loading full arrays into memory unnecessarily.
    **Common use cases**:
    - Comparing values between time periods (e.g., "compare mean in period A vs period B")
    - Finding data ranges (min/max)
    - Data quality checks (checking for NaN values)
    - Quick data exploration before visualization
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to the dataset within the file
        slice_str: Optional numpy-style slice (e.g., "100:200, :, :" for rows 100-200)
        memory_limit_mb: Optional custom memory limit in MB. If not specified, uses tiered
                         defaults (100 MB safe, 500 MB warning, 2000 MB max). Set higher to
                         override defaults if you have more memory available.
    Returns:
        Dictionary with a single "statistics" key whose nested dict contains:
        - min, max, mean, std, median (float): NaN-aware statistics
        - shape (list): Shape of the data analyzed
        - dtype (str): Data type
        - size_mb (float): Memory size in MB
        - num_elements (int): Total number of elements
        - nan_count (int): Number of NaN values (floating point data only)
        - nan_fraction (float): Fraction of values that are NaN (floating point data only)
        A top-level "warning" key is added when the memory-safety check flagged the read.
        On failure, a dict with an "error" key is returned instead.
    **Return value structure**:
        {
          "statistics": {
            "min": -5.2,
            "max": 42.1,
            "mean": 15.3,  # <- Access directly with result["statistics"]["mean"]
            "std": 8.7,
            "median": 14.2,
            "shape": [12, 90, 180],
            "num_elements": 194400,
            "nan_count": 0
          }
        }
    **Example workflow - Comparing time periods**:
        # INEFFICIENT (don't do this):
        result1 = read_dataset(file, "var", "0:12, :, :")    # loads full data
        result2 = read_dataset(file, "var", "100:112, :, :") # loads full data
        # then manually calculate means from the data arrays
        # EFFICIENT (do this):
        stats1 = compute_statistics(file, "var", "0:12, :, :")    # computes stats directly
        stats2 = compute_statistics(file, "var", "100:112, :, :") # computes stats directly
        difference = stats2["statistics"]["mean"] - stats1["statistics"]["mean"]
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        if not dataset_path:
            return {"error": "Please specify a dataset path"}
        # UI dropdown entries look like "path (shape info)" - keep only the path part
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety before loading
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                # Return error with clear override instructions
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "full_shape": safety_check["full_shape"],
                    "result_shape": safety_check["result_shape"],
                    "override_option": f"To compute statistics anyway, set memory_limit_mb={int(safety_check['estimated_mb'] * 1.2)} or higher",
                    "default_limits": "Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["alternative"] = f"Or use slicing: slice_str='{safety_check['suggested_slice']}'"
                return error_result
            # Read data with optional slicing
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Convert to numpy array
            if not isinstance(data, np.ndarray):
                data = np.array(data)
            # Check if numeric
            if not np.issubdtype(data.dtype, np.number):
                return {"error": f"Dataset is not numeric (dtype: {data.dtype})"}
            # Guard against empty selections: np.nanmin/np.nanmax raise ValueError on
            # zero-size arrays, which would otherwise surface as an unhelpful traceback.
            if data.size == 0:
                return {"error": "Selected data is empty (0 elements); adjust slice_str"}
            # Compute statistics (nan-aware variants skip NaN values)
            stats = {
                "min": float(np.nanmin(data)),
                "max": float(np.nanmax(data)),
                "mean": float(np.nanmean(data)),
                "std": float(np.nanstd(data)),
                "median": float(np.nanmedian(data)),
                "shape": list(data.shape),
                "dtype": str(data.dtype),
                "size_mb": round(data.nbytes / (1024 * 1024), 2),
                "num_elements": int(data.size)
            }
            # Add NaN info if floating point
            if np.issubdtype(data.dtype, np.floating):
                num_nan = int(np.sum(np.isnan(data)))
                stats["nan_count"] = num_nan
                stats["nan_fraction"] = float(num_nan / data.size) if data.size > 0 else 0.0
            result = {"statistics": stats}
            # Add warning if present
            if "warning" in safety_check:
                result["warning"] = safety_check["warning"]
            return result
        finally:
            f.close()
    except Exception as e:
        return {"error": f"Failed to compute statistics: {str(e)}", "traceback": traceback.format_exc()}
def export_data(file_path: str, dataset_path: str, format: str = "csv",
                slice_str: str = "", max_rows: int = 100000,
                memory_limit_mb: Optional[float] = None) -> Dict[str, Any]:
    """
    Export dataset to CSV or JSON format for external analysis.
    **WHEN TO USE THIS**:
    - You need to analyze data in external tools (Excel, Python, R, etc.)
    - You want to save processed/sliced data
    - Dataset is too large to view in chat but small enough to export
    **WHEN NOT TO USE THIS**:
    - Just to check a few values -> use get_dataset_preview() instead
    - Just need statistics -> use compute_statistics() instead
    - Want to visualize -> use create_histogram(), create_line_plot(), etc. instead
    **FORMAT NOTES**:
    - CSV: Best for tabular data, has row limits (default 100,000)
    - JSON: Better for hierarchical data or very large datasets
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to the dataset within the file
        format: Export format - "csv" or "json" (case-insensitive)
        slice_str: Optional numpy-style slice to export subset
        max_rows: Maximum rows for CSV export (safety limit)
        memory_limit_mb: Optional custom memory limit in MB. If not specified, uses tiered
                         defaults (100 MB safe, 500 MB warning, 2000 MB max).
    Returns:
        Dictionary with keys:
        - status (str): "success" or error
        - file_path (str): Path to exported file (downloadable)
        - format (str): Export format used
        - size_mb (float): Size of exported file
        - shape (list): Shape of exported data
        - total_elements (int): Total elements exported
        On failure, a dict with an "error" key is returned instead.
    **Return value structure**:
        {
          "status": "success",
          "file_path": "/tmp/exported_data.csv",  # <- Download this file
          "format": "CSV",
          "size_mb": 1.5,
          "shape": [1000, 90],
          "total_elements": 90000
        }
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        if not dataset_path:
            return {"error": "Please specify a dataset path"}
        # Validate format up front so we fail fast before opening or reading any data
        format = format.lower()
        if format not in ("csv", "json"):
            return {"error": f"Unsupported format: {format}"}
        # UI dropdown entries look like "path (shape info)" - keep only the path part
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety before loading
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                # Return error with clear override instructions
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "full_shape": safety_check["full_shape"],
                    "result_shape": safety_check["result_shape"],
                    "override_option": f"To export anyway, set memory_limit_mb={int(safety_check['estimated_mb'] * 1.2)} or higher",
                    "default_limits": "Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["alternative"] = f"Or use slicing: slice_str='{safety_check['suggested_slice']}'"
                return error_result
            # Read data with optional slicing
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Convert to numpy array
            if not isinstance(data, np.ndarray):
                data = np.array(data)
            # Get metadata
            metadata = get_var_attrs_dict(var, file_type)
            # Check size limits for CSV
            total_elements = int(np.prod(data.shape))
            if total_elements > max_rows and format == "csv":
                return {"error": f"Dataset too large for CSV ({total_elements} elements). Maximum is {max_rows}. Use slicing or JSON format."}
            # Create export file
            if format == "csv":
                # Shape the data into a DataFrame according to its dimensionality
                if data.ndim == 1:
                    df = pd.DataFrame({
                        'index': np.arange(len(data)),
                        'value': data
                    })
                elif data.ndim == 2:
                    df = pd.DataFrame(data)
                    df.columns = [f'col_{i}' for i in range(data.shape[1])]
                else:
                    # Flatten with multi-index: one column per dimension plus a value column
                    indices = np.array(np.unravel_index(np.arange(data.size), data.shape)).T
                    df = pd.DataFrame(indices, columns=[f'dim_{i}' for i in range(data.ndim)])
                    df['value'] = data.flatten()
                # Apply row limit
                if len(df) > max_rows:
                    df = df.head(max_rows)
                # NamedTemporaryFile(delete=False) replaces the deprecated, race-prone
                # tempfile.mktemp: the file is created atomically and kept after close.
                with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f_out:
                    tmp_path = f_out.name
                    # Write metadata as '#' comment lines before the CSV body
                    f_out.write(f"# Dataset: {dataset_path}\n")
                    f_out.write(f"# Shape: {data.shape}\n")
                    f_out.write(f"# Dtype: {data.dtype}\n")
                    if metadata:
                        f_out.write("# Metadata:\n")
                        for key, value in metadata.items():
                            f_out.write(f"# {key}: {value}\n")
                    f_out.write("\n")
                    df.to_csv(f_out, index=False)
            else:  # json (validated above)
                export_obj = {
                    "dataset": dataset_path,
                    "shape": list(data.shape),
                    "dtype": str(data.dtype),
                    "data": data.tolist(),
                    "metadata": metadata
                }
                with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f_out:
                    tmp_path = f_out.name
                    json.dump(export_obj, f_out, indent=2)
            # Get file size
            file_size_mb = Path(tmp_path).stat().st_size / (1024 * 1024)
            return {
                "status": "success",
                "file_path": tmp_path,
                "format": format.upper(),
                "size_mb": round(file_size_mb, 2),
                "shape": list(data.shape),
                "dtype": str(data.dtype),
                "total_elements": int(total_elements)
            }
        finally:
            f.close()
    except Exception as e:
        return {"error": f"Failed to export: {str(e)}", "traceback": traceback.format_exc()}
def create_histogram(file_path: str, dataset_path: str, slice_str: str = "",
                     title_override: str = "", xlabel_override: str = "",
                     nbins: int = 50) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create histogram showing distribution of values.
    **USE CASE**: Visualize the distribution of a dataset to understand its characteristics.
    **WHEN TO USE THIS**:
    - Understand data distribution (normal, skewed, bimodal, etc.)
    - Identify outliers or unusual values
    - Quality control - check if data looks reasonable
    - Compare with expected distributions
    **OUTPUT**: Returns both JSON status (with statistics) AND downloadable HTML file.
    The HTML file contains an interactive Plotly histogram viewable in any browser.
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to dataset/variable
        slice_str: Optional numpy-style slice notation to plot subset
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        nbins: Number of bins for histogram (default: 50, range: 5-200)
    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: Contains statistics and status info
        - html_file_path: Path to interactive HTML file (downloadable), or None on error
    **Return value structure**:
        (
          {
            "status": "success",
            "html_file": "/tmp/plot123.html",  # <- Download this
            "statistics": {
                "mean": 15.3,
                "median": 14.2,
                "min": -5.2,
                "max": 42.1,
                "std": 8.7,
                "num_bins": 50,
                "num_values": 16200
            }
          },
          "/tmp/plot123.html"  # HTML file path
        )
    **Workflow**:
        result, html_path = create_histogram(file_path, "variable_name")
        # Download the HTML file from html_path to view the interactive plot
        # Statistics are in result["statistics"]
    """
    try:
        if not file_path:
            return {
                "error": "No file path provided",
                "status": "failed"
            }, None
        if not dataset_path:
            return {
                "error": "No dataset path provided",
                "status": "failed"
            }, None
        # UI dropdown entries look like "path (shape info)" - keep only the path part
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file and read data directly (bypassing read_dataset's serialization limit)
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety before loading
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str)
            if not safety_check["safe"]:
                # Return error with suggestion
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["suggestion"] = f"Try using: slice_str='{safety_check['suggested_slice']}'"
                return error_result, None
            # Read data with optional slicing
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Keep masked array for statistics (masked-array reductions honor the mask)
            data_for_stats = data
            # Convert to a filled array for plotting; masked points become NaN
            if isinstance(data, np.ma.MaskedArray):
                data_for_plot = data.filled(np.nan)
            else:
                data_for_plot = np.array(data)
            data_for_plot = data_for_plot.flatten()
        finally:
            f.close()
        # Calculate statistics using masked array (respects mask)
        # NOTE(review): for plain (non-masked) arrays that contain NaN these return
        # NaN; compute_statistics() is the NaN-aware path - confirm this is intended.
        mean_val = float(np.mean(data_for_stats))
        median_val = float(np.median(data_for_stats))
        min_val = float(np.min(data_for_stats))
        max_val = float(np.max(data_for_stats))
        std_val = float(np.std(data_for_stats))
        # Create figure
        fig = go.Figure()
        # Add histogram
        fig.add_trace(go.Histogram(
            x=data_for_plot,
            nbinsx=nbins,
            name='Distribution',
            marker_color='steelblue',
            opacity=0.7
        ))
        # Add vertical reference lines for mean (dashed red) and median (dotted green)
        fig.add_vline(
            x=mean_val,
            line_dash="dash",
            line_color="red",
            annotation_text=f"Mean: {mean_val:.2f}",
            annotation_position="top"
        )
        fig.add_vline(
            x=median_val,
            line_dash="dot",
            line_color="green",
            annotation_text=f"Median: {median_val:.2f}",
            annotation_position="bottom"
        )
        # Set title and labels
        title = title_override if title_override else f"Distribution of {dataset_path}"
        xlabel = xlabel_override if xlabel_override else "Value"
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title="Frequency",
            height=500,
            showlegend=True
        )
        # Save as interactive HTML file. NamedTemporaryFile(delete=False) replaces the
        # deprecated, race-prone tempfile.mktemp: the file is created atomically.
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        # Return JSON status instead of plot object (avoids MCP serialization issues)
        result = {
            "status": "success",
            "message": f"Histogram created for {dataset_path}",
            "html_file": html_path,
            "statistics": {
                "mean": mean_val,
                "median": median_val,
                "min": min_val,
                "max": max_val,
                "std": std_val,
                "num_bins": nbins,
                "num_values": len(data_for_plot)
            },
            "download_info": "Download the HTML file to view the interactive histogram in your browser"
        }
        return result, html_path
    except Exception as e:
        # traceback is imported at module level
        return {
            "error": f"Failed to create histogram: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
def create_line_plot(file_path: str, dataset_path: str, slice_str: str = "",
                     title_override: str = "", xlabel_override: str = "", ylabel_override: str = "",
                     x_dataset_path: str = "", moving_avg_window: int = 0,
                     memory_limit_mb: Optional[float] = None) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create line plot for 1D data or time series.
    **USE CASE**: Visualize trends over time or along a single dimension.
    **WHEN TO USE THIS**:
    - Time series data (temperature over time, stock prices, etc.)
    - 1D profiles (depth profiles, altitude profiles)
    - Sequential data where order matters
    - When you want to see trends and patterns
    **FEATURES**:
    - Optional moving average smoothing to reduce noise
    - Custom X-axis data or auto-generated indices
    - Interactive Plotly visualization
    **OUTPUT**: Returns both JSON status (with statistics) AND downloadable HTML file.
    The HTML file contains an interactive Plotly line plot viewable in any browser.
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to Y-axis dataset/variable
        slice_str: Optional numpy-style slice notation (applied to Y, and reused for X)
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        ylabel_override: Optional custom y-axis label (leave empty for automatic)
        x_dataset_path: Optional path to X-axis data (if empty, uses indices 0, 1, 2, ...)
        moving_avg_window: Window size for smoothing (0 or 1 = no smoothing, 2-1000 = apply moving average)
        memory_limit_mb: Memory limit in MB (default: tiered 100/500/2000)
    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: Contains statistics and status info
        - html_file_path: Path to interactive HTML file (downloadable), or None on error
    **Return value structure**:
        (
          {
            "status": "success",
            "html_file": "/tmp/plot456.html",  # <- Download this
            "num_points": 1750,
            "y_min": -5.2,
            "y_max": 42.1,
            "y_mean": 15.3,
            "y_std": 8.7,
            "smoothing": "Applied moving average with window size 10"  # If smoothing used
          },
          "/tmp/plot456.html"  # HTML file path
        )
    **Workflow - Time series with smoothing**:
        # Plot temperature time series with 30-point moving average
        result, html_path = create_line_plot(
            file_path,
            "temperature",
            slice_str=":, 45, 90",  # All times, specific lat/lon
            moving_avg_window=30
        )
        # Download html_path to see both raw data and smoothed trend
    """
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}, None
        if not dataset_path:
            return {"error": "No dataset path provided", "status": "failed"}, None
        # UI dropdown entries look like "path (shape info)" - keep only the path part
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        if x_dataset_path:
            x_dataset_path = x_dataset_path.split(" (")[0] if " (" in x_dataset_path else x_dataset_path
        # Open file and read Y data
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get Y dataset
            if file_type == "HDF5":
                y_var = f[dataset_path]
            else:
                y_var = f.variables[dataset_path]
            # Check memory safety for Y data
            safety_check = check_memory_safety(y_var.shape, y_var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["suggestion"] = f"Try using: slice_str='{safety_check['suggested_slice']}'"
                return error_result, None
            # Read Y data
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                y_data = y_var[idx]
            else:
                y_data = y_var[:]
            # Keep masked array for statistics (masked-array reductions honor the mask)
            y_data_for_stats = y_data
            # Convert to a filled array for plotting; masked points become NaN
            if isinstance(y_data, np.ma.MaskedArray):
                y_data_for_plot = y_data.filled(np.nan)
            else:
                y_data_for_plot = np.array(y_data)
            y_data_for_plot = y_data_for_plot.flatten()
            # Handle X data
            if x_dataset_path:
                # Read X dataset
                if file_type == "HDF5":
                    x_var = f[x_dataset_path]
                else:
                    x_var = f.variables[x_dataset_path]
                # Check memory safety for X data
                x_safety_check = check_memory_safety(x_var.shape, x_var.dtype, slice_str, memory_limit_mb)
                if not x_safety_check["safe"]:
                    error_result = {
                        "error": f"X data: {x_safety_check['error']}",
                        "estimated_size_mb": x_safety_check["estimated_mb"],
                        "status": "failed"
                    }
                    if "suggested_slice" in x_safety_check:
                        error_result["suggested_slice"] = x_safety_check["suggested_slice"]
                    return error_result, None
                # Read X data; the Y slice (idx) is reused here, which assumes X has a
                # compatible shape - a mismatch is caught by the length check below
                if slice_str and slice_str.strip():
                    x_data = x_var[idx]
                else:
                    x_data = x_var[:]
                if not isinstance(x_data, np.ndarray):
                    x_data = np.array(x_data)
                x_data = x_data.flatten()
                if len(x_data) != len(y_data_for_plot):
                    return {
                        "error": f"X and Y data length mismatch: {len(x_data)} vs {len(y_data_for_plot)}",
                        "status": "failed"
                    }, None
            else:
                # Use indices as X
                x_data = np.arange(len(y_data_for_plot))
        finally:
            f.close()
        # Create figure
        fig = go.Figure()
        # Add raw data trace
        fig.add_trace(go.Scatter(
            x=x_data,
            y=y_data_for_plot,
            mode='lines+markers',
            name='Raw Data',
            line=dict(color='steelblue', width=1),
            marker=dict(size=3),
            opacity=0.7
        ))
        # Add moving average if requested
        smoothed_info = None
        if moving_avg_window and moving_avg_window > 1:
            # Validate window size
            if moving_avg_window > len(y_data_for_plot):
                smoothed_info = f"Moving average window ({moving_avg_window}) larger than data length ({len(y_data_for_plot)}), skipping smoothing"
            else:
                # Calculate moving average using convolution
                window = np.ones(moving_avg_window) / moving_avg_window
                y_smooth = np.convolve(y_data_for_plot, window, mode='valid')
                # Adjust X data to match smoothed Y length:
                # 'valid' mode reduces length by (window_size - 1), so center the window
                offset = (moving_avg_window - 1) // 2
                x_smooth = x_data[offset:offset + len(y_smooth)]
                # Add smoothed trace
                fig.add_trace(go.Scatter(
                    x=x_smooth,
                    y=y_smooth,
                    mode='lines',
                    name=f'Moving Avg (n={moving_avg_window})',
                    line=dict(color='red', width=2),
                    opacity=0.9
                ))
                smoothed_info = f"Applied moving average with window size {moving_avg_window}"
        # Set title and labels
        title = title_override if title_override else f"Line Plot: {dataset_path}"
        xlabel = xlabel_override if xlabel_override else (x_dataset_path if x_dataset_path else "Index")
        ylabel = ylabel_override if ylabel_override else dataset_path
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title=ylabel,
            height=500,
            showlegend=True,
            hovermode='closest'
        )
        # Save as interactive HTML. NamedTemporaryFile(delete=False) replaces the
        # deprecated, race-prone tempfile.mktemp: the file is created atomically.
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        # Return JSON status
        result = {
            "status": "success",
            "message": f"Line plot created for {dataset_path}",
            "html_file": html_path,
            "statistics": {
                "num_points": len(y_data_for_plot),
                "y_min": float(np.min(y_data_for_stats)),
                "y_max": float(np.max(y_data_for_stats)),
                "y_mean": float(np.mean(y_data_for_stats)),
                "y_std": float(np.std(y_data_for_stats))
            },
            "download_info": "Download the HTML file to view the interactive line plot in your browser"
        }
        if smoothed_info:
            result["smoothing"] = smoothed_info
        return result, html_path
    except Exception as e:
        # traceback is imported at module level
        return {
            "error": f"Failed to create line plot: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
def create_heatmap(file_path: str, dataset_path: str, slice_str: str = "",
                   title_override: str = "", xlabel_override: str = "", ylabel_override: str = "",
                   colorscale: str = "Viridis", memory_limit_mb: Optional[float] = None) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create 2D heatmap visualization.
    **USE CASE**: Visualize spatial patterns and 2D data distributions.
    **WHEN TO USE THIS**:
    - Geographic/spatial data (temperature maps, elevation, satellite imagery)
    - 2D slices from higher-dimensional data
    - Matrices and correlation data
    - Any data where you want to see spatial patterns
    **FEATURES**:
    - Multiple color scales for different data types. However, do not make any claims about what each color represents, as the colormap has default directionalities.
    - Auto-validates data is 2D (will slice higher dimensions if needed)
    - Interactive hover to inspect values
    **OUTPUT**: Returns both JSON status (with statistics) AND downloadable HTML file.
    The HTML file contains an interactive Plotly heatmap viewable in any browser.
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to 2D dataset/variable
        slice_str: Optional numpy-style slice (must result in 2D data, e.g., "0, :, :" for first time step)
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        ylabel_override: Optional custom y-axis label (leave empty for automatic)
        colorscale: Plotly colorscale name - options: Viridis, Plasma, Inferno, Magma, Cividis, Blues, Reds, YlOrRd, RdBu (default: Viridis)
        memory_limit_mb: Memory limit in MB (default: tiered 100/500/2000)
    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: Contains statistics (computed over the plotted 2D slice) and status info
        - html_file_path: Path to interactive HTML file (downloadable), or None on error
    **Return value structure**:
        (
          {
            "status": "success",
            "html_file": "/tmp/plot789.html",  # <- Download this
            "statistics": {
                "shape": [90, 180],
                "min": -5.2,
                "max": 42.1,
                "mean": 15.3,
                "std": 8.7,
                "colorscale": "Viridis"
            }
          },
          "/tmp/plot789.html"  # HTML file path
        )
    **Workflow - Spatial data visualization**:
        # Visualize temperature at first time step
        result, html_path = create_heatmap(
            file_path,
            "temperature",
            slice_str="0, :, :",  # Time=0, all lat, all lon
            colorscale="RdBu"  # Red-blue for temperature
        )
        # Download html_path to see the spatial pattern
    **Color scale recommendations**:
    - Temperature data: "RdBu" (red-blue diverging) (IMPORTANT: Blue is positive, red is negative here. This MAY be opposite to what you are thinking, but safest to just NOT make a claim about colors.)
    - Elevation/depth: "Viridis" or "Plasma"
    - Precipitation: "Blues" or "YlOrRd"
    - General scientific: "Viridis" (perceptually uniform)
    """
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}, None
        if not dataset_path:
            return {"error": "No dataset path provided", "status": "failed"}, None
        # UI dropdown entries look like "path (shape info)" - keep only the path part
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file and read data
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["suggestion"] = f"Try using: slice_str='{safety_check['suggested_slice']}'"
                return error_result, None
            # Read data
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Keep masked array for statistics (masked-array reductions honor the mask)
            data_for_stats = data
            # Convert to a filled array for plotting; masked points become NaN
            if isinstance(data, np.ma.MaskedArray):
                data_for_plot = data.filled(np.nan)
            else:
                data_for_plot = np.array(data)
            # Ensure 2D
            if data_for_plot.ndim == 1:
                return {
                    "error": f"Data is 1D (shape {data_for_plot.shape}). Heatmap requires 2D data. Try using create_line_plot instead.",
                    "status": "failed"
                }, None
            elif data_for_plot.ndim > 2:
                # Take the first 2D slice. FIX: also reduce the stats array in lockstep
                # so the reported statistics describe the same slice that is plotted
                # (previously stats covered the full N-D array while "shape" did not).
                while data_for_plot.ndim > 2:
                    data_for_plot = data_for_plot[0]
                    data_for_stats = data_for_stats[0]
                suggestion = f"Data has {len(var.shape)}D shape. Showing first 2D slice. Use slice_str to select specific slice."
            else:
                suggestion = None
        finally:
            f.close()
        # Calculate statistics over the plotted slice using the masked array
        min_val = float(np.min(data_for_stats))
        max_val = float(np.max(data_for_stats))
        mean_val = float(np.mean(data_for_stats))
        std_val = float(np.std(data_for_stats))
        # Create figure using filled array
        fig = go.Figure(data=go.Heatmap(
            z=data_for_plot,
            colorscale=colorscale,
            colorbar=dict(title="Value"),
            hovertemplate='X: %{x}<br>Y: %{y}<br>Value: %{z}<extra></extra>'
        ))
        # Set title and labels
        title = title_override if title_override else f"Heatmap: {dataset_path}"
        xlabel = xlabel_override if xlabel_override else "X"
        ylabel = ylabel_override if ylabel_override else "Y"
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title=ylabel,
            height=600,
            width=700
        )
        # Save as interactive HTML. NamedTemporaryFile(delete=False) replaces the
        # deprecated, race-prone tempfile.mktemp: the file is created atomically.
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        # Return JSON status
        result = {
            "status": "success",
            "message": f"Heatmap created for {dataset_path}",
            "html_file": html_path,
            "statistics": {
                "shape": list(data_for_plot.shape),
                "min": min_val,
                "max": max_val,
                "mean": mean_val,
                "std": std_val,
                "colorscale": colorscale
            },
            "download_info": "Download the HTML file to view the interactive heatmap in your browser"
        }
        if suggestion:
            result["note"] = suggestion
        return result, html_path
    except Exception as e:
        # traceback is imported at module level
        return {
            "error": f"Failed to create heatmap: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
def create_scatter_plot(file_path: str, x_dataset_path: str, y_dataset_path: str,
                        x_slice_str: str = "", y_slice_str: str = "",
                        title_override: str = "", xlabel_override: str = "", ylabel_override: str = "",
                        memory_limit_mb: float = None) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create scatter plot comparing two variables.
    **USE CASE**: Explore relationships and correlations between two variables.
    **WHEN TO USE THIS**:
    - Check if two variables are correlated
    - Compare data from different time periods or locations
    - Validate model predictions vs observations
    - Identify linear or non-linear relationships
    **FEATURES**:
    - Automatic correlation calculation (Pearson's r)
    - Linear regression with RΒ² value
    - Regression line overlay
    - Independent slicing for X and Y variables
    **OUTPUT**: Returns both JSON status (with correlation stats) AND downloadable HTML file.
    The HTML file contains an interactive Plotly scatter plot viewable in any browser.
    Args:
        file_path: Path to the HDF5/NetCDF file
        x_dataset_path: Path to X-axis dataset/variable
        y_dataset_path: Path to Y-axis dataset/variable
        x_slice_str: Optional numpy-style slice for X data (can differ from Y slice)
        y_slice_str: Optional numpy-style slice for Y data (can differ from X slice)
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        ylabel_override: Optional custom y-axis label (leave empty for automatic)
        memory_limit_mb: Memory limit in MB (default: tiered 100/500/2000)
    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: Contains correlation and regression statistics
          (num_points, correlation, r_squared, slope, intercept, p_value)
        - html_file_path: Path to interactive HTML file (downloadable), or None on failure
    **Workflow - Compare time periods**:
        # Compare temperature at same location, different times
        result, html_path = create_scatter_plot(
            file_path,
            x_dataset_path="temperature",
            y_dataset_path="temperature",
            x_slice_str="0, :, :",    # Time index 0
            y_slice_str="100, :, :"   # Time index 100
        )
        # rΒ² in result["statistics"]["r_squared"] shows how similar the patterns are
    """
    # Guard optional plotting/stats imports so a missing dependency yields a
    # structured error dict (consistent with the scipy guard) instead of an
    # unhandled ImportError.
    try:
        import plotly.graph_objects as go
    except ImportError:
        return {
            "error": "plotly library not available. Install with: pip install plotly",
            "status": "failed"
        }, None
    try:
        from scipy import stats
    except ImportError:
        return {
            "error": "scipy library not available. Install with: pip install scipy",
            "status": "failed"
        }, None
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}, None
        if not x_dataset_path or not y_dataset_path:
            return {"error": "Both X and Y dataset paths required", "status": "failed"}, None
        # Strip any " (shape/dtype)" suffix appended by UI dropdowns
        x_dataset_path = x_dataset_path.split(" (")[0] if " (" in x_dataset_path else x_dataset_path
        y_dataset_path = y_dataset_path.split(" (")[0] if " (" in y_dataset_path else y_dataset_path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get X and Y datasets (may be the same variable)
            if file_type == "HDF5":
                x_var = f[x_dataset_path]
                y_var = f[y_dataset_path]
            else:
                x_var = f.variables[x_dataset_path]
                y_var = f.variables[y_dataset_path]
            # Refuse reads that would exceed the memory budget (X first)
            x_safety = check_memory_safety(x_var.shape, x_var.dtype, x_slice_str, memory_limit_mb)
            if not x_safety["safe"]:
                error_result = {
                    "error": f"X data: {x_safety['error']}",
                    "estimated_size_mb": x_safety["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in x_safety:
                    error_result["suggested_slice_x"] = x_safety["suggested_slice"]
                return error_result, None
            y_safety = check_memory_safety(y_var.shape, y_var.dtype, y_slice_str, memory_limit_mb)
            if not y_safety["safe"]:
                error_result = {
                    "error": f"Y data: {y_safety['error']}",
                    "estimated_size_mb": y_safety["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in y_safety:
                    error_result["suggested_slice_y"] = y_safety["suggested_slice"]
                return error_result, None
            # Read X data, applying the optional slice
            if x_slice_str and x_slice_str.strip():
                x_data = x_var[parse_slice(x_slice_str)]
            else:
                x_data = x_var[:]
            # Read Y data, applying the optional slice
            if y_slice_str and y_slice_str.strip():
                y_data = y_var[parse_slice(y_slice_str)]
            else:
                y_data = y_var[:]
            # Fill masked values with NaN so plotly skips them and the
            # regression step below can filter them out.
            if isinstance(x_data, np.ma.MaskedArray):
                x_data_for_plot = x_data.filled(np.nan)
            else:
                x_data_for_plot = np.array(x_data)
            if isinstance(y_data, np.ma.MaskedArray):
                y_data_for_plot = y_data.filled(np.nan)
            else:
                y_data_for_plot = np.array(y_data)
            # Scatter plots need 1D sequences
            x_data_for_plot = x_data_for_plot.flatten()
            y_data_for_plot = y_data_for_plot.flatten()
            # The two slices must produce pairable arrays
            if len(x_data_for_plot) != len(y_data_for_plot):
                return {
                    "error": f"X and Y data length mismatch after slicing: {len(x_data_for_plot)} vs {len(y_data_for_plot)}",
                    "x_slice": x_slice_str if x_slice_str else "no slice",
                    "y_slice": y_slice_str if y_slice_str else "no slice",
                    # list() keeps the shapes JSON-serializable (tuples are not
                    # what other error paths emit)
                    "x_shape_after_slice": list(x_data_for_plot.shape),
                    "y_shape_after_slice": list(y_data_for_plot.shape),
                    "suggestion": "Adjust slices to produce equal-length arrays",
                    "status": "failed"
                }, None
        finally:
            f.close()
        # Correlation needs at least two points
        if len(x_data_for_plot) < 2:
            return {
                "error": f"Not enough data points for correlation: {len(x_data_for_plot)} points (need at least 2)",
                "status": "failed"
            }, None
        # Regression must only see pairs where both values are non-NaN
        # (the arrays are already flat — no second flatten needed)
        valid_mask = ~(np.isnan(x_data_for_plot) | np.isnan(y_data_for_plot))
        x_valid = x_data_for_plot[valid_mask]
        y_valid = y_data_for_plot[valid_mask]
        if len(x_valid) < 2:
            return {
                "error": f"Not enough valid (non-NaN) data points: {len(x_valid)} points (need at least 2)",
                "status": "failed"
            }, None
        # Pearson correlation + least-squares fit on valid pairs only
        correlation = float(np.corrcoef(x_valid, y_valid)[0, 1])
        slope, intercept, r_value, p_value, std_err = stats.linregress(x_valid, y_valid)
        # Build the figure: raw points first, then the regression overlay
        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=x_data_for_plot,
            y=y_data_for_plot,
            mode='markers',
            name='Data',
            marker=dict(
                size=6,
                color='steelblue',
                opacity=0.6
            ),
            hovertemplate=f'{x_dataset_path}: %{{x}}<br>{y_dataset_path}: %{{y}}<extra></extra>'
        ))
        # Regression line endpoints (nanmin/nanmax ignore NaN values)
        x_range = np.array([np.nanmin(x_data_for_plot), np.nanmax(x_data_for_plot)])
        y_pred = slope * x_range + intercept
        fig.add_trace(go.Scatter(
            x=x_range,
            y=y_pred,
            mode='lines',
            name=f'Fit (RΒ²={r_value**2:.3f})',
            line=dict(color='red', width=2, dash='dash'),
            showlegend=True
        ))
        # Titles/labels default to the dataset paths
        title = title_override if title_override else f"{y_dataset_path} vs {x_dataset_path}"
        xlabel = xlabel_override if xlabel_override else x_dataset_path
        ylabel = ylabel_override if ylabel_override else y_dataset_path
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title=ylabel,
            height=600,
            width=700,
            showlegend=True,
            hovermode='closest'
        )
        # Save as interactive HTML. NamedTemporaryFile replaces the deprecated,
        # race-prone tempfile.mktemp (the file is created atomically here and
        # then overwritten by write_html).
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        result = {
            "status": "success",
            "message": f"Scatter plot created for {y_dataset_path} vs {x_dataset_path}",
            "html_file": html_path,
            "statistics": {
                "num_points": len(x_data_for_plot),
                "correlation": correlation,
                "r_squared": float(r_value**2),
                "slope": float(slope),
                "intercept": float(intercept),
                "p_value": float(p_value)
            },
            "download_info": "Download the HTML file to view the interactive scatter plot in your browser"
        }
        return result, html_path
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to create scatter plot: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
def coordinate_to_index(file_path: str, coordinate_var: str, value: float) -> Dict[str, Any]:
    """
    Convert coordinate value to array index.
    **USE CASE**: "What index corresponds to coordinate value X?"
    Example: "What index corresponds to latitude 35.5?" or "What time index is year 2020?"
    **WHEN TO USE THIS**:
    - You have a coordinate value and need to find the nearest index for slicing
    - You're building a slice string and need to know which indices to use
    - You want to extract data at a specific coordinate location
    Args:
        file_path: Path to the HDF5/NetCDF file
        coordinate_var: Name of coordinate variable (e.g., 'lat', 'lon', 'time', 'depth')
        value: Coordinate value to find (will find nearest match)
    Returns:
        Dictionary with keys:
        - status (str): "success" or error info
        - index (int): The nearest index for this coordinate value
        - requested_value (float): The value you searched for
        - actual_value (float): The actual coordinate value at the found index
        - distance (float): How far the actual value is from your requested value
        - units (str): Units of the coordinate
        - closest_match (bool): True if within half a grid spacing (good match)
        - coordinate_variable (str): Name of the coordinate variable
        - array_size (int): Total size of coordinate array
    **Example workflow - Extract data at specific location**:
        # Find index for latitude 40.0
        lat_result = coordinate_to_index(file_path, "lat", 40.0)
        lat_idx = lat_result["index"]
        # Find index for longitude -75.0
        lon_result = coordinate_to_index(file_path, "lon", -75.0)
        lon_idx = lon_result["index"]
        # Now use these indices to slice the data
        slice_str = f":, {lat_idx}, {lon_idx}"  # All times, specific lat/lon
        data = read_dataset(file_path, "temperature", slice_str)
    """
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}
        if not coordinate_var:
            return {"error": "No coordinate variable specified", "status": "failed"}
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Load the coordinate array and its units attribute
            if file_type == "HDF5":
                if coordinate_var not in f:
                    return {
                        "error": f"Coordinate variable '{coordinate_var}' not found",
                        "available_variables": list(f.keys())[:20],  # Limit to avoid token explosion
                        "status": "failed"
                    }
                coord = f[coordinate_var]
                coord_data = np.array(coord[:])
                # NOTE(review): h5py may return bytes for string attrs; str()
                # below would render b'...' — confirm whether decoding is needed
                units = coord.attrs.get('units', 'unknown') if hasattr(coord, 'attrs') else 'unknown'
            else:  # NetCDF
                if coordinate_var not in f.variables:
                    return {
                        "error": f"Coordinate variable '{coordinate_var}' not found",
                        "available_variables": list(f.variables.keys())[:20],
                        "status": "failed"
                    }
                var = f.variables[coordinate_var]
                coord_data = np.array(var[:])
                units = getattr(var, 'units', 'unknown')
            # Nearest-neighbour lookup
            idx = int(np.argmin(np.abs(coord_data - value)))
            actual_value = float(coord_data[idx])
            distance = abs(actual_value - value)
            # "Good match" means within half the mean grid spacing.
            # bool(...)/float(...) are required: numpy scalar types
            # (numpy.bool_, numpy.float64) are not JSON serializable, and this
            # dict is returned to MCP clients as JSON.
            if len(coord_data) > 1:
                grid_spacing = float(np.mean(np.abs(np.diff(coord_data))))
                closest_match = bool(distance < grid_spacing / 2)
            else:
                closest_match = True
            return {
                "status": "success",
                "index": idx,
                "requested_value": value,
                "actual_value": actual_value,
                "distance": distance,
                "units": str(units),
                "closest_match": closest_match,
                "coordinate_variable": coordinate_var,
                "array_size": len(coord_data)
            }
        finally:
            f.close()
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to convert coordinate to index: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def index_to_coordinate(file_path: str, coordinate_var: str, index: int) -> Dict[str, Any]:
    """
    Look up the coordinate value stored at a given array index.
    **USE CASE**: "What is the actual value at index N?"
    Example: "What date is at time index 852?" or "What latitude is at index 45?"
    **WHEN TO USE THIS**:
    - You have an index number and need the corresponding coordinate value
    - You're interpreting results from other tools that return indices
    - You need to verify what a specific index represents
    **DO NOT use this if**:
    - The coordinate values are already in the file attributes β†’ use get_attributes()
    - You want to read the entire coordinate array β†’ use read_dataset()
    Args:
        file_path: Path to the HDF5/NetCDF file
        coordinate_var: Name of coordinate variable (e.g., 'lat', 'lon', 'time', 'depth')
        index: Array index to look up (0-based indexing)
    Returns:
        Dictionary with keys: status, index, value (the coordinate value at
        that index), units, coordinate_variable, array_size — or an "error"
        entry with status "failed" when the lookup cannot be performed.
    **Example workflow**:
        result = index_to_coordinate(file_path, "time", 100)
        time_value = result["value"]  # The actual time value
        units = result["units"]       # Units needed to interpret it
    """
    try:
        # Validate inputs before touching the filesystem
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}
        if not coordinate_var:
            return {"error": "No coordinate variable specified", "status": "failed"}
        handle, kind = open_file_with_fallback(file_path)
        try:
            # Resolve the coordinate variable and its units, per file type
            if kind == "HDF5":
                if coordinate_var not in handle:
                    return {
                        "error": f"Coordinate variable '{coordinate_var}' not found",
                        "available_variables": list(handle.keys())[:20],
                        "status": "failed"
                    }
                node = handle[coordinate_var]
                values = np.array(node[:])
                units = node.attrs.get('units', 'unknown') if hasattr(node, 'attrs') else 'unknown'
            else:  # NetCDF
                if coordinate_var not in handle.variables:
                    return {
                        "error": f"Coordinate variable '{coordinate_var}' not found",
                        "available_variables": list(handle.variables.keys())[:20],
                        "status": "failed"
                    }
                node = handle.variables[coordinate_var]
                values = np.array(node[:])
                units = getattr(node, 'units', 'unknown')
            size = len(values)
            # Reject out-of-range indices (0-based)
            if not 0 <= index < size:
                return {
                    "error": f"Index {index} out of bounds (array size: {size})",
                    "valid_range": f"0 to {size-1}",
                    "status": "failed"
                }
            return {
                "status": "success",
                "index": index,
                "value": float(values[index]),
                "units": str(units),
                "coordinate_variable": coordinate_var,
                "array_size": size
            }
        finally:
            handle.close()
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to convert index to coordinate: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def query_content(file_path: str, query: str) -> Dict[str, Any]:
    """
    Semantic search through file contents - helps find relevant data.
    **USE CASE**: "Find all datasets related to X" where X is a keyword.
    **WHAT IT SEARCHES**:
    - Dataset/variable names
    - Attribute names and values
    - Dimension names
    - Long names and descriptions in metadata
    **WHEN TO USE THIS**:
    - You don't know the exact dataset name but know what you're looking for
    - You want to find all variables related to a topic (e.g., "temperature", "wind", "pressure")
    - The file has many variables and you need to narrow down
    **SEARCH TIPS**:
    - Use specific keywords (e.g., "temp", "precip", "wind")
    - Search is case-insensitive
    - Partial matches work (searching "temp" finds "temperature", "temp_anomaly", etc.)
    Args:
        file_path: Path to the HDF5/NetCDF file
        query: Search term (case-insensitive, matches substrings)
    Returns:
        Dictionary with keys:
        - datasets (list): Matching datasets with match reason
        - dimensions (list): Matching dimensions
        - num_dataset_matches (int): Count of dataset matches
        - num_dimension_matches (int): Count of dimension matches
        - total_matches (int): Total matches found
        - query (str): The search term used
        - message (str): Summary of results
    **Example workflow - Finding the right variable**:
        # Search for temperature-related variables
        results = query_content(file_path, "temp")
        # Review matches and pick the right one
        dataset_name = results["datasets"][0]["name"]  # e.g., "air_temperature"
        # Now use that dataset name
        stats = compute_statistics(file_path, dataset_name)
    """
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}
        if not query or not query.strip():
            return {"error": "No query string provided", "status": "failed"}
        query_lower = query.lower().strip()
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        matches = {
            "datasets": [],
            "attributes": [],
            "dimensions": []
        }
        try:
            if file_type == "HDF5":
                # Paths already recorded — a set gives O(1) duplicate checks
                # instead of rebuilding a list of paths for every attribute.
                matched_paths = set()

                def search_visitor(name, obj):
                    full_path = "/" + name
                    if not isinstance(obj, h5py.Dataset):
                        return
                    # Match on the dataset name itself
                    if query_lower in name.lower():
                        matches["datasets"].append({
                            "path": full_path,
                            "shape": list(obj.shape),
                            "dtype": str(obj.dtype),
                            "match_reason": "name"
                        })
                        matched_paths.add(full_path)
                    # Match on attribute names/values; only the first matching
                    # attribute is reported, and never for an already-listed path
                    for attr_name, attr_val in obj.attrs.items():
                        if query_lower in attr_name.lower() or query_lower in str(attr_val).lower():
                            if full_path not in matched_paths:
                                matches["datasets"].append({
                                    "path": full_path,
                                    "shape": list(obj.shape),
                                    "dtype": str(obj.dtype),
                                    "match_reason": f"attribute: {attr_name}"
                                })
                                matched_paths.add(full_path)
                            break
                f.visititems(search_visitor)
            else:  # NetCDF
                # Search variable names and attributes
                for var_name, var in f.variables.items():
                    matched = False
                    match_reason = None
                    # Check variable name first (cheapest test)
                    if query_lower in var_name.lower():
                        matched = True
                        match_reason = "name"
                    # Fall back to attribute names/values
                    if not matched:
                        var_attrs = get_var_attrs_dict(var, file_type)
                        for attr, attr_val in var_attrs.items():
                            if query_lower in attr.lower() or query_lower in str(attr_val).lower():
                                matched = True
                                match_reason = f"attribute: {attr}"
                                break
                    if matched:
                        info = {
                            "name": var_name,
                            "dimensions": list(var.dimensions) if hasattr(var, 'dimensions') else [],
                            "shape": list(var.shape),
                            "dtype": str(var.dtype),
                            "match_reason": match_reason
                        }
                        # Surface the most useful descriptive attributes
                        if hasattr(var, 'long_name'):
                            info["long_name"] = str(var.long_name)
                        if hasattr(var, 'units'):
                            info["units"] = str(var.units)
                        matches["datasets"].append(info)
                # Search dimension names too
                for dim_name in f.dimensions.keys():
                    if query_lower in dim_name.lower():
                        matches["dimensions"].append({
                            "name": dim_name,
                            "size": len(f.dimensions[dim_name])
                        })
        finally:
            f.close()
        # Summarize counts for the caller
        matches["status"] = "success"
        matches["query"] = query
        matches["num_dataset_matches"] = len(matches["datasets"])
        matches["num_dimension_matches"] = len(matches["dimensions"])
        matches["total_matches"] = matches["num_dataset_matches"] + matches["num_dimension_matches"]
        if matches["total_matches"] == 0:
            matches["message"] = f"No matches found for query: '{query}'"
        else:
            matches["message"] = f"Found {matches['total_matches']} matches for query: '{query}'"
        return matches
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to search file contents: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def UI_HELPER(file_obj) -> Dict[str, Any]:
    """
    UI ONLY function - MCP clients ignore this!
    Browser interface helper for local files: validates an uploaded file and
    returns its server-side path for use in the other tools/tabs.
    Args:
        file_obj: File object from browser interface. Gradio file objects
            expose the temp path via their ``.name`` attribute; anything
            else is coerced with ``str()``.
    Returns:
        Dictionary with ``file_path`` (and instructions) on success, or an
        ``error`` entry with ``status: "failed"`` on any problem.
    """
    # (removed an unused `import shutil` that was never referenced)
    try:
        if file_obj is None:
            return {
                "error": "No file provided",
                "status": "failed"
            }
        # Gradio file objects carry the temp path in .name
        if hasattr(file_obj, 'name'):
            file_path = file_obj.name
        else:
            file_path = str(file_obj)
        # Only HDF5/NetCDF extensions are supported (endswith accepts a tuple)
        if not file_path.endswith(('.h5', '.hdf5', '.nc', '.nc4')):
            return {
                "error": "Unsupported file type. Must be .h5, .hdf5, .nc, or .nc4",
                "file_path": file_path,
                "status": "failed"
            }
        # Open once to verify the file is actually readable
        try:
            f, file_type = open_file_with_fallback(file_path)
            f.close()
        except Exception as e:
            return {
                "error": f"Failed to open file: {str(e)}",
                "file_path": file_path,
                "status": "failed"
            }
        return {
            "status": "success",
            "file_path": file_path,
            "message": "File ready! Copy the file_path above to use in other tools.",
            "instruction": "Use this file_path in the other tabs (Data Analysis, Visualizations)"
        }
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to process file: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def download_example(example_name: str) -> Dict[str, Any]:
    """
    Download a pre-configured example dataset by name.
    Provides quick access to curated test datasets (from the xarray-data
    repository) for exploration.
    Args:
        example_name: Name of example dataset to download
    Returns:
        Dictionary containing file_path and file structure (same as
        download_file), annotated with the example's name and description
        on success; an error dictionary listing the available examples when
        the name is missing or unknown.
    """
    # Curated example datasets from the xarray-data repository
    catalog = {
        "small_netcdf": {
            "url": "https://github.com/pydata/xarray-data/raw/master/tiny.nc",
            "description": "π›°οΈ Tiny NetCDF - Small test file for quick validation"
        },
        "ocean_basin": {
            "url": "https://github.com/pydata/xarray-data/raw/master/basin_mask.nc",
            "description": "🌊 Ocean Basin Mask - Global ocean basin classification"
        },
        "air_temperature": {
            "url": "https://github.com/pydata/xarray-data/raw/master/air_temperature.nc",
            "description": "π¬ Air Temperature - NCEP/NCAR reanalysis data"
        },
        "ocean_model": {
            "url": "https://github.com/pydata/xarray-data/raw/master/ROMS_example.nc",
            "description": "🌊 Ocean Model - ROMS (Regional Ocean Modeling System) output"
        },
        "era_interim": {
            "url": "https://github.com/pydata/xarray-data/raw/master/eraint_uvz.nc",
            "description": "β˜οΈ ERA-Interim - Wind and geopotential height data"
        }
    }
    # Guard: a name must be supplied
    if not example_name:
        return {
            "error": "No example name provided",
            "available_examples": list(catalog.keys()),
            "status": "failed"
        }
    # Guard: the name must be one of the curated entries
    entry = catalog.get(example_name)
    if entry is None:
        return {
            "error": f"Unknown example: '{example_name}'",
            "available_examples": list(catalog.keys()),
            "descriptions": {name: info["description"] for name, info in catalog.items()},
            "status": "failed"
        }
    # Delegate the actual transfer to the shared download helper
    result = download_file(entry["url"])
    # Annotate successful downloads with the example metadata
    if result.get("status") == "success":
        result["example_name"] = example_name
        result["example_description"] = entry["description"]
    return result
| def create_contour_plot(file_path: str, dataset_path: str, slice_str: str = "", | |
| title_override: str = "", xlabel_override: str = "", ylabel_override: str = "", | |
| num_contours: int = 20, colorscale: str = "Viridis", | |
| memory_limit_mb: float = None) -> Tuple[Dict[str, Any], Optional[str]]: | |
| """ | |
| Create contour plot for 2D data. | |
| **USE CASE**: Visualize elevation/intensity maps with labeled contour lines. | |
| **WHEN TO USE THIS**: | |
| - Topographic/bathymetric data (elevation, ocean depth) | |
| - Pressure/temperature fields with distinct levels | |
| - When you need to see specific value boundaries | |
| - Geographic data where contours show meaningful thresholds | |
| **DIFFERENCE FROM HEATMAP**: | |
| - Heatmap: Continuous color gradient, best for overall patterns | |
| - Contour: Discrete lines at specific values, best for identifying levels | |
| **FEATURES**: | |
| - Labeled contour lines showing exact values | |
| - Adjustable number of contour levels | |
| - Multiple color scales | |
| - Interactive hover to see values | |
| **OUTPUT**: Returns both JSON status (with statistics) AND downloadable HTML file. | |
| The HTML file contains an interactive Plotly contour plot viewable in any browser. | |
| Args: | |
| file_path: Path to the HDF5/NetCDF file | |
| dataset_path: Path to 2D dataset/variable | |
| slice_str: Optional numpy-style slice (must result in 2D data, e.g., "0, :, :" for first time step) | |
| title_override: Optional custom title (leave empty for automatic) | |
| xlabel_override: Optional custom x-axis label (leave empty for automatic) | |
| ylabel_override: Optional custom y-axis label (leave empty for automatic) | |
| num_contours: Number of contour levels (default: 20, range: 5-50) | |
| colorscale: Plotly colorscale name - same options as heatmap (default: Viridis) | |
| memory_limit_mb: Memory limit in MB (default: tiered 100/500/2000) | |
| Returns: | |
| Tuple of (status_dict, html_file_path): | |
| - status_dict: Contains statistics and contour info | |
| - html_file_path: Path to interactive HTML file (downloadable) | |
| **Return value structure**: | |
| ( | |
| { | |
| "status": "success", | |
| "html_file": "/tmp/plot321.html", # β Download this | |
| "statistics": { | |
| "shape": [90, 180], | |
| "min": -5000.0, | |
| "max": 8848.0, | |
| "mean": 237.5, | |
| "std": 1250.3, | |
| "num_contours": 20, | |
| "colorscale": "Viridis" | |
| } | |
| }, | |
| "/tmp/plot321.html" # HTML file path | |
| ) | |
| **Workflow - Elevation mapping**: | |
| # Create topographic map with 30 elevation contours | |
| result, html_path = create_contour_plot( | |
| file_path, | |
| "elevation", | |
| num_contours=30, | |
| colorscale="YlOrRd" | |
| ) | |
| # Download html_path to see elevation levels with labeled contours | |
| """ | |
| import plotly.graph_objects as go | |
| try: | |
| if not file_path: | |
| return {"error": "No file path provided", "status": "failed"}, None | |
| if not dataset_path: | |
| return {"error": "No dataset path provided", "status": "failed"}, None | |
| # Extract clean path | |
| dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path | |
| # Open file and read data | |
| f, file_type = open_file_with_fallback(file_path) | |
| try: | |
| # Get dataset | |
| if file_type == "HDF5": | |
| var = f[dataset_path] | |
| else: | |
| var = f.variables[dataset_path] | |
| # Check memory safety | |
| safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb) | |
| if not safety_check["safe"]: | |
| error_result = { | |
| "error": safety_check["error"], | |
| "estimated_size_mb": safety_check["estimated_mb"], | |
| "status": "failed" | |
| } | |
| if "suggested_slice" in safety_check: | |
| error_result["suggested_slice"] = safety_check["suggested_slice"] | |
| error_result["suggestion"] = f"Try using: slice_str='{safety_check['suggested_slice']}'" | |
| return error_result, None | |
| # Read data | |
| if slice_str and slice_str.strip(): | |
| idx = parse_slice(slice_str) | |
| data = var[idx] | |
| else: | |
| data = var[:] | |
| # Keep masked array for statistics | |
| data_for_stats = data | |
| # Convert to filled array for plotting | |
| if isinstance(data, np.ma.MaskedArray): | |
| data_for_plot = data.filled(np.nan) | |
| else: | |
| data_for_plot = np.array(data) | |
| # Ensure 2D | |
| if data_for_plot.ndim == 1: | |
| return { | |
| "error": f"Data is 1D (shape {data_for_plot.shape}). Contour plot requires 2D data.", | |
| "status": "failed" | |
| }, None | |
| elif data_for_plot.ndim > 2: | |
| # Take first 2D slice | |
| while data_for_plot.ndim > 2: | |
| data_for_plot = data_for_plot[0] | |
| suggestion = f"Data has {len(var.shape)}D shape. Showing first 2D slice. Use slice_str to select specific slice." | |
| else: | |
| suggestion = None | |
| finally: | |
| f.close() | |
| # Calculate statistics using masked array | |
| min_val = float(np.min(data_for_stats)) | |
| max_val = float(np.max(data_for_stats)) | |
| mean_val = float(np.mean(data_for_stats)) | |
| std_val = float(np.std(data_for_stats)) | |
| # Create figure using filled array | |
| fig = go.Figure(data=go.Contour( | |
| z=data_for_plot, | |
| colorscale=colorscale, | |
| ncontours=num_contours, | |
| colorbar=dict(title="Value"), | |
| hovertemplate='X: %{x}<br>Y: %{y}<br>Value: %{z}<extra></extra>', | |
| contours=dict( | |
| showlabels=True, | |
| labelfont=dict(size=10, color='white') | |
| ) | |
| )) | |
| # Set title and labels | |
| title = title_override if title_override else f"Contour Plot: {dataset_path}" | |
| xlabel = xlabel_override if xlabel_override else "X" | |
| ylabel = ylabel_override if ylabel_override else "Y" | |
| fig.update_layout( | |
| title=title, | |
| xaxis_title=xlabel, | |
| yaxis_title=ylabel, | |
| height=600, | |
| width=700 | |
| ) | |
| # Save as interactive HTML | |
| html_path = tempfile.mktemp(suffix='.html', dir='/tmp') | |
| fig.write_html(html_path) | |
| # Return JSON status | |
| result = { | |
| "status": "success", | |
| "message": f"Contour plot created for {dataset_path}", | |
| "html_file": html_path, | |
| "statistics": { | |
| "shape": list(data_for_plot.shape), | |
| "min": min_val, | |
| "max": max_val, | |
| "mean": mean_val, | |
| "std": std_val, | |
| "num_contours": num_contours, | |
| "colorscale": colorscale | |
| }, | |
| "download_info": "Download the HTML file to view the interactive contour plot in your browser" | |
| } | |
| if suggestion: | |
| result["note"] = suggestion | |
| return result, html_path | |
| except Exception as e: | |
| import traceback | |
| return { | |
| "error": f"Failed to create contour plot: {str(e)}", | |
| "traceback": traceback.format_exc(), | |
| "status": "failed" | |
| }, None | |
| # ============================================ | |
| # GRADIO MCP TOOLS (STATELESS) | |
| # These are the ONLY functions exposed as MCP tools | |
| # ============================================ | |
def build_mcp_tools():
    """
    Build explicit MCP tool interfaces.

    Returns:
        list[gr.Interface]: one Interface per MCP tool, in a FIXED order.

    NOTE: the launch code in ``__main__`` renders these by positional index
    (``mcp_tools[0]`` .. ``mcp_tools[17]``), so the append order below is part
    of the contract -- do not reorder or insert tools without updating the
    tab layout. Each tool is exposed to MCP clients via its ``api_name``.

    CRITICAL: Use only Textbox/Number/Radio components - NO Dropdowns with state!
    """
    tools = []
    # Tool 1: Download File  -> tools[0]
    tools.append(gr.Interface(
        fn=download_file,
        inputs=[
            gr.Textbox(
                label="URL",
                placeholder="https://github.com/pydata/xarray-data/raw/master/ROMS_example.nc",
                info="Direct URL to HDF5 (.h5, .hdf5) or NetCDF (.nc, .nc4) file"
            )
        ],
        outputs=gr.JSON(label="Download Result - Copy the 'file_path' value to use in other tools"),
        api_name="download_file",
        title="Download File",
        description="Download and parse HDF5/NetCDF file from URL. Returns file_path to use in other tools. IMPORTANT: Copy the 'file_path' from the result to use in other tool tabs."
    ))
    # Tool 1b: UI Helper (UI ONLY)  -> tools[1]
    # Uses gr.File (browser upload widget); only useful in the web UI,
    # MCP clients are told to ignore it via the description.
    tools.append(gr.Interface(
        fn=UI_HELPER,
        inputs=[
            gr.File(
                label="Select Local File",
                file_types=[".h5", ".hdf5", ".nc", ".nc4"],
                type="filepath"
            )
        ],
        outputs=gr.JSON(label="File Path Result - Copy the 'file_path' value to use in other tools"),
        api_name="UI_HELPER",
        title="Local File Upload",
        description="UI ONLY function - MCP clients ignore this! Browser interface helper. Returns file_path for use in other tools."
    ))
    # Tool 1c: Download Example Dataset  -> tools[2]
    tools.append(gr.Interface(
        fn=download_example,
        inputs=[
            gr.Radio(
                label="Select Example Dataset",
                choices=[
                    "small_netcdf",
                    "ocean_basin",
                    "air_temperature",
                    "ocean_model",
                    "era_interim"
                ],
                value="small_netcdf",
                info="Pre-configured example datasets for quick exploration"
            )
        ],
        outputs=gr.JSON(label="Download Result - Copy the 'file_path' value to use in other tools"),
        api_name="download_example",
        title="Download Example Dataset",
        description="Download pre-configured example datasets. Returns file_path and structure. Available: small_netcdf, ocean_basin, air_temperature, ocean_model, era_interim"
    ))
    # Tool 2: List Structure  -> tools[3]
    tools.append(gr.Interface(
        fn=list_structure,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/tmp/file.nc",
                info="Path to the HDF5/NetCDF file"
            )
        ],
        outputs=gr.JSON(label="File Structure"),
        api_name="list_structure",
        title="List Structure",
        description="List complete hierarchical structure of the file"
    ))
    # Tool 3: Read Dataset  -> tools[4]
    tools.append(gr.Interface(
        fn=read_dataset,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/tmp/file.nc",
                info="Path to the HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Dataset Path",
                placeholder="temperature",
                info="Path to dataset/variable (e.g., 'temperature', '/group/data')"
            ),
            gr.Textbox(
                label="Slice (optional)",
                placeholder="0:10, :, 5",
                value="",
                info="NumPy-style slice notation"
            )
        ],
        outputs=gr.JSON(label="Dataset Contents"),
        api_name="read_dataset",
        title="Read Dataset",
        description="Read data from a specific dataset/variable. Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max). Override with memory_limit_mb if needed."
    ))
    # Tool 4: Get Dataset Preview  -> tools[5]
    tools.append(gr.Interface(
        fn=get_dataset_preview,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/tmp/file.nc",
                info="Path to the HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Dataset Path",
                placeholder="temperature",
                info="Path to dataset/variable"
            ),
            gr.Number(
                label="Preview Size",
                value=10,
                minimum=5,
                maximum=100,
                info="Number of elements to preview"
            )
        ],
        outputs=gr.JSON(label="Dataset Preview"),
        api_name="get_dataset_preview",
        title="Preview Dataset",
        description="Safely preview dataset without loading full array"
    ))
    # Tool 5: Get Attributes  -> tools[6]
    tools.append(gr.Interface(
        fn=get_attributes,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/tmp/file.nc",
                info="Path to the HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Object Path (optional)",
                placeholder="",
                value="",
                info="Path to dataset/group (empty for file-level attributes)"
            )
        ],
        outputs=gr.JSON(label="Attributes"),
        api_name="get_attributes",
        title="Get Attributes",
        description="Get metadata attributes for file, group, or dataset"
    ))
    # Tool 6: Get Dimensions  -> tools[7]
    tools.append(gr.Interface(
        fn=get_dimensions,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/tmp/file.nc",
                info="Path to the HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Dataset Path (optional)",
                placeholder="",
                value="",
                info="Path to dataset (empty for NetCDF to see all dimensions)"
            )
        ],
        outputs=gr.JSON(label="Dimension Information"),
        api_name="get_dimensions",
        title="Get Dimensions",
        description="Get dimension and shape information"
    ))
    # Tool 7: Compute Statistics  -> tools[8]
    tools.append(gr.Interface(
        fn=compute_statistics,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/tmp/file.nc",
                info="Path to the HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Dataset Path",
                placeholder="temperature",
                info="Path to dataset/variable"
            ),
            gr.Textbox(
                label="Slice (optional)",
                placeholder="0:10, :, :",
                value="",
                info="Compute statistics over a subset"
            )
        ],
        outputs=gr.JSON(label="Statistical Results"),
        api_name="compute_statistics",
        title="Compute Statistics",
        description="Calculate statistical measures (min, max, mean, std, median). Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)."
    ))
    # Tool 8: Export Data  -> tools[9]
    tools.append(gr.Interface(
        fn=export_data,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/tmp/file.nc",
                info="Path to the HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Dataset Path",
                placeholder="temperature",
                info="Path to dataset/variable"
            ),
            gr.Radio(
                choices=["csv", "json"],
                value="csv",
                label="Format",
                info="Export format"
            ),
            gr.Textbox(
                label="Slice (optional)",
                placeholder="0:100, :, :",
                value="",
                info="Export a subset of data"
            ),
            gr.Number(
                label="Max Rows (CSV only)",
                value=100000,
                minimum=1000,
                maximum=1000000,
                info="Safety limit for CSV files"
            )
        ],
        outputs=[gr.JSON(label="Export Status"), gr.File(label="Download File")],
        api_name="export_data",
        title="Export Data",
        description="Export dataset to CSV or JSON format. Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)."
    ))
    # Tool 9: Create Histogram  -> tools[10]
    tools.append(gr.Interface(
        fn=create_histogram,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/tmp/file.nc",
                info="Path to the HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Dataset Path",
                placeholder="temperature",
                info="Path to dataset/variable"
            ),
            gr.Textbox(
                label="Slice (optional)",
                placeholder="0, :, :",
                value="",
                info="Optional subsetting before histogram"
            ),
            gr.Textbox(
                label="Title (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic title"
            ),
            gr.Textbox(
                label="X-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic x-axis label"
            ),
            gr.Number(
                label="Number of Bins",
                value=50,
                minimum=5,
                maximum=200,
                info="Number of bins for histogram (default: 50)"
            )
        ],
        outputs=[
            gr.JSON(label="Histogram Status & Statistics"),
            gr.File(label="Download Interactive HTML")
        ],
        api_name="create_histogram",
        title="Create Histogram",
        description="Generate histogram showing distribution of values. Returns interactive plot and downloadable HTML file."
    ))
    # Tool 10: Line Plot  -> tools[11]
    tools.append(gr.Interface(
        fn=create_line_plot,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/path/to/file.nc",
                info="Path to HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Y Dataset Path",
                placeholder="temperature",
                info="Path to Y-axis dataset/variable"
            ),
            gr.Textbox(
                label="Slice (optional)",
                placeholder="0:100, :, 5",
                value="",
                info="Numpy-style slice notation"
            ),
            gr.Textbox(
                label="Title (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic title"
            ),
            gr.Textbox(
                label="X-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic x-axis label"
            ),
            gr.Textbox(
                label="Y-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic y-axis label"
            ),
            gr.Textbox(
                label="X Dataset Path (optional)",
                placeholder="time",
                value="",
                info="Path to X-axis data (if empty, uses indices)"
            ),
            gr.Number(
                label="Moving Average Window (optional)",
                value=0,
                minimum=0,
                maximum=1000,
                info="Window size for smoothing (0 or 1 = no smoothing, 2-1000 = apply moving average)"
            )
        ],
        outputs=[
            gr.JSON(label="Line Plot Status & Statistics"),
            gr.File(label="Download Interactive HTML")
        ],
        api_name="create_line_plot",
        title="Create Line Plot",
        description="Generate line plot for 1D data or time series. Returns statistics and downloadable HTML file."
    ))
    # Tool 11: Heatmap  -> tools[12]
    tools.append(gr.Interface(
        fn=create_heatmap,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/path/to/file.nc",
                info="Path to HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Dataset Path",
                placeholder="temperature",
                info="Path to 2D dataset/variable"
            ),
            gr.Textbox(
                label="Slice (optional)",
                placeholder="0, :, :",
                value="",
                info="Numpy-style slice notation (must result in 2D)"
            ),
            gr.Textbox(
                label="Title (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic title"
            ),
            gr.Textbox(
                label="X-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic x-axis label"
            ),
            gr.Textbox(
                label="Y-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic y-axis label"
            ),
            gr.Radio(
                label="Colorscale",
                choices=["Viridis", "Plasma", "Inferno", "Magma", "Cividis", "Blues", "Reds", "YlOrRd", "RdBu"],
                value="Viridis",
                info="Plotly colorscale for heatmap"
            )
        ],
        outputs=[
            gr.JSON(label="Heatmap Status & Statistics"),
            gr.File(label="Download Interactive HTML")
        ],
        api_name="create_heatmap",
        title="Create Heatmap",
        description="Generate 2D heatmap visualization. Returns statistics and downloadable HTML file."
    ))
    # Tool 12: Scatter Plot  -> tools[13]
    tools.append(gr.Interface(
        fn=create_scatter_plot,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/path/to/file.nc",
                info="Path to HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="X Dataset Path",
                placeholder="temperature",
                info="Path to X-axis dataset/variable"
            ),
            gr.Textbox(
                label="Y Dataset Path",
                placeholder="salinity",
                info="Path to Y-axis dataset/variable"
            ),
            gr.Textbox(
                label="X Slice (optional)",
                placeholder="0, :, :",
                value="",
                info="Numpy-style slice for X data (can differ from Y slice)"
            ),
            gr.Textbox(
                label="Y Slice (optional)",
                placeholder="1, :, :",
                value="",
                info="Numpy-style slice for Y data (can differ from X slice)"
            ),
            gr.Textbox(
                label="Title (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic title"
            ),
            gr.Textbox(
                label="X-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic x-axis label"
            ),
            gr.Textbox(
                label="Y-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic y-axis label"
            )
        ],
        outputs=[
            gr.JSON(label="Scatter Plot Status & Statistics"),
            gr.File(label="Download Interactive HTML")
        ],
        api_name="create_scatter_plot",
        title="Create Scatter Plot",
        description="Generate scatter plot comparing two variables. Includes correlation and regression. Returns statistics and downloadable HTML file."
    ))
    # Tool 13: Contour Plot  -> tools[14]
    tools.append(gr.Interface(
        fn=create_contour_plot,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/path/to/file.nc",
                info="Path to HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Dataset Path",
                placeholder="elevation",
                info="Path to 2D dataset/variable"
            ),
            gr.Textbox(
                label="Slice (optional)",
                placeholder="0, :, :",
                value="",
                info="Numpy-style slice notation (must result in 2D)"
            ),
            gr.Textbox(
                label="Title (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic title"
            ),
            gr.Textbox(
                label="X-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic x-axis label"
            ),
            gr.Textbox(
                label="Y-axis Label (optional)",
                placeholder="Leave blank for automatic",
                value="",
                info="Override automatic y-axis label"
            ),
            gr.Number(
                label="Number of Contours",
                value=20,
                minimum=5,
                maximum=50,
                info="Number of contour levels"
            ),
            gr.Radio(
                label="Colorscale",
                choices=["Viridis", "Plasma", "Inferno", "Magma", "Cividis", "Blues", "Reds", "YlOrRd", "RdBu"],
                value="Viridis",
                info="Plotly colorscale for contours"
            )
        ],
        outputs=[
            gr.JSON(label="Contour Plot Status & Statistics"),
            gr.File(label="Download Interactive HTML")
        ],
        api_name="create_contour_plot",
        title="Create Contour Plot",
        description="Generate contour plot for 2D data. Returns statistics and downloadable HTML file."
    ))
    # Tool 14: Coordinate to Index  -> tools[15]
    tools.append(gr.Interface(
        fn=coordinate_to_index,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/path/to/file.nc",
                info="Path to HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Coordinate Variable",
                placeholder="lat",
                info="Name of coordinate variable (e.g., 'lat', 'lon', 'time')"
            ),
            gr.Number(
                label="Coordinate Value",
                value=0.0,
                info="Value to find in coordinate array"
            )
        ],
        outputs=gr.JSON(label="Index Lookup Result"),
        api_name="coordinate_to_index",
        title="Coordinate to Index",
        description="Convert coordinate value to array index. E.g., 'what index corresponds to latitude 35.5?'"
    ))
    # Tool 15: Index to Coordinate  -> tools[16]
    tools.append(gr.Interface(
        fn=index_to_coordinate,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/path/to/file.nc",
                info="Path to HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Coordinate Variable",
                placeholder="lat",
                info="Name of coordinate variable (e.g., 'lat', 'lon', 'time')"
            ),
            gr.Number(
                label="Array Index",
                value=0,
                minimum=0,
                info="Index to look up in coordinate array"
            )
        ],
        outputs=gr.JSON(label="Coordinate Lookup Result"),
        api_name="index_to_coordinate",
        title="Index to Coordinate",
        description="Convert array index to coordinate value. E.g., 'what latitude is at index 142?'"
    ))
    # Tool 16: Query Content (Semantic Search)  -> tools[17]
    tools.append(gr.Interface(
        fn=query_content,
        inputs=[
            gr.Textbox(
                label="File Path",
                placeholder="/path/to/file.nc",
                info="Path to HDF5/NetCDF file"
            ),
            gr.Textbox(
                label="Search Query",
                placeholder="temperature",
                info="Search term to find in dataset names, attributes, and metadata"
            )
        ],
        outputs=gr.JSON(label="Search Results"),
        api_name="query_content",
        title="Query Content (Semantic Search)",
        description="Search through file contents to find relevant datasets. Searches names, attributes, dimensions, and metadata."
    ))
    return tools
| # ============================================ | |
| # MAIN LAUNCH | |
| # ============================================ | |
if __name__ == "__main__":
    # Build the stateless tool interfaces; every Interface is exposed to MCP
    # clients through its api_name regardless of how it is laid out below.
    mcp_tools = build_mcp_tools()

    # Unpack the positional tool list into named variables once, so the tab
    # layout below reads by name instead of by bare list index. The order here
    # mirrors the append order inside build_mcp_tools().
    (tool_download, tool_upload, tool_example,
     tool_structure, tool_read, tool_preview,
     tool_attrs, tool_dims, tool_stats, tool_export,
     tool_histogram, tool_line, tool_heatmap, tool_scatter, tool_contour,
     tool_coord_to_idx, tool_idx_to_coord, tool_query) = mcp_tools

    # Organized browser UI: three top-level tabs, each with nested sub-tabs.
    with gr.Blocks(title="HDF5/NetCDF MCP Server") as demo:
        gr.Markdown("# HDF5/NetCDF MCP Server")
        gr.Markdown("Tools for analyzing HDF5 and NetCDF scientific data files")

        with gr.Tab("π₯ File Loading"):
            with gr.Tabs():
                with gr.Tab("Download from URL"):
                    tool_download.render()
                with gr.Tab("Local File"):
                    tool_upload.render()
                with gr.Tab("Example Datasets"):
                    gr.Markdown("### π Quick Start with Example Datasets")
                    gr.Markdown("Pre-configured datasets from the xarray-data repository for testing and exploration.")
                    tool_example.render()

        with gr.Tab("π Data Analysis & Utilities"):
            with gr.Tabs():
                with gr.Tab("Structure & Info"):
                    tool_structure.render()
                with gr.Tab("Read Dataset"):
                    tool_read.render()
                with gr.Tab("Preview Dataset"):
                    tool_preview.render()
                with gr.Tab("Get Attributes"):
                    tool_attrs.render()
                with gr.Tab("Get Dimensions"):
                    tool_dims.render()
                with gr.Tab("Compute Statistics"):
                    tool_stats.render()
                with gr.Tab("Export Data"):
                    tool_export.render()
                with gr.Tab("Coordinate β Index"):
                    tool_coord_to_idx.render()
                with gr.Tab("Index β Coordinate"):
                    tool_idx_to_coord.render()
                with gr.Tab("Search Content"):
                    tool_query.render()

        with gr.Tab("π Visualizations"):
            with gr.Tabs():
                with gr.Tab("Histogram"):
                    tool_histogram.render()
                with gr.Tab("Line Plot"):
                    tool_line.render()
                with gr.Tab("Heatmap"):
                    tool_heatmap.render()
                with gr.Tab("Scatter Plot"):
                    tool_scatter.render()
                with gr.Tab("Contour Plot"):
                    tool_contour.render()

    # Launch with MCP server enabled so the api_name endpoints become MCP tools.
    demo.launch(mcp_server=True)