#!/usr/bin/env python3
"""
HDF5/NetCDF MCP Server - Gradio Implementation
A Model Context Protocol server for reading and analyzing HDF5 and NetCDF scientific data files.
"""
import json
import tempfile
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlsplit

import gradio as gr
import h5py
import netCDF4 as nc
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import requests
# Try to import h5netcdf for better NetCDF4 compatibility
try:
import h5netcdf
HAS_H5NETCDF = True
except ImportError:
HAS_H5NETCDF = False
# ============================================
# UTILITY FUNCTIONS (NOT EXPOSED AS MCP TOOLS)
# ============================================
def parse_slice(slice_str: str):
    """Convert a numpy-style slice string (e.g. '0:10, :, 5') into slice objects.

    Each comma-separated component becomes either an int index or a slice.
    Returns a tuple for multiple components, a single slice/int for one,
    and slice(None) (select everything) for an empty/blank string.
    """
    if not slice_str or not slice_str.strip():
        return slice(None)

    def _component(text: str):
        # A ':' marks a range expression; anything else is a plain index.
        if ':' not in text:
            return int(text)
        fields = text.split(':')
        bounds = [int(fld) if fld else None for fld in fields[:3]]
        bounds += [None] * (3 - len(bounds))
        return slice(bounds[0], bounds[1], bounds[2])

    parsed = [_component(chunk.strip()) for chunk in slice_str.split(',')]
    return tuple(parsed) if len(parsed) > 1 else parsed[0]
def open_file_with_fallback(file_path: str) -> Tuple[Any, str]:
    """
    Open an HDF5/NetCDF file, falling back through readers for compatibility.

    Returns:
        (file_object, file_type) tuple where file_type is "HDF5", "NetCDF",
        or "NetCDF_h5netcdf"

    Raises:
        ValueError: if the extension is unsupported or every reader fails.
    """
    path = Path(file_path)
    ext = path.suffix.lower()

    if ext in ('.h5', '.hdf5'):
        # Plain HDF5 container - h5py reads it directly.
        return h5py.File(path, 'r'), "HDF5"

    if ext not in ('.nc', '.nc4', '.netcdf'):
        raise ValueError(f"Unsupported file type: {ext}")

    # NetCDF: tier 1 is the netCDF4 library (richest feature set).
    try:
        return nc.Dataset(path, 'r'), "NetCDF"
    except Exception as nc_error:
        # Tier 2: h5netcdf, when the optional dependency is installed.
        if HAS_H5NETCDF:
            try:
                return h5netcdf.File(path, 'r'), "NetCDF_h5netcdf"
            except Exception:
                pass
        # Tier 3: raw h5py as a last resort (NetCDF4 files are HDF5 inside).
        try:
            return h5py.File(path, 'r'), "HDF5"
        except Exception as h5_error:
            raise ValueError(f"Cannot open file. netCDF4 error: {nc_error}, h5py error: {h5_error}")
def get_file_structure(file_path: str) -> Dict[str, Any]:
    """
    Get the structure of an HDF5/NetCDF file.

    Args:
        file_path: Path to the HDF5/NetCDF file.

    Returns:
        On success: {"structure": dict, "datasets": list of display strings,
        "success": True}.
        On failure: {"error": str, "traceback": str, "success": False}.
    """
    try:
        f, file_type = open_file_with_fallback(file_path)
        try:
            structure = {"file_type": file_type}
            datasets = []
            if file_type == "HDF5":
                # HDF5 structure: walk the whole hierarchy collecting datasets
                dataset_list = []
                def visitor(name, obj):
                    # visititems passes paths relative to root (no leading '/')
                    if isinstance(obj, h5py.Dataset):
                        dataset_list.append({
                            "path": "/" + name,
                            "shape": obj.shape,
                            "dtype": str(obj.dtype)
                        })
                f.visititems(visitor)
                structure["datasets"] = dataset_list
                # Create dropdown-friendly list
                for ds in dataset_list:
                    display = f"{ds['path']} ({ds['shape']}, {ds['dtype']})"
                    datasets.append(display)
            else:
                # NetCDF structure: dimensions plus variables
                structure["dimensions"] = {name: len(dim) for name, dim in f.dimensions.items()}
                var_list = []
                for name, var in f.variables.items():
                    var_list.append({
                        "name": name,
                        "dimensions": var.dimensions,
                        "shape": var.shape,
                        "dtype": str(var.dtype)
                    })
                structure["variables"] = var_list
                # Create dropdown-friendly list
                for var in var_list:
                    display = f"{var['name']} ({var['shape']}, {var['dtype']})"
                    datasets.append(display)
        finally:
            # Fix: previously f.close() was only reached on success, leaking
            # the file handle whenever structure walking raised.
            f.close()
        return {
            "structure": structure,
            "datasets": datasets,
            "success": True
        }
    except Exception as e:
        return {
            "error": str(e),
            "traceback": traceback.format_exc(),
            "success": False
        }
def get_var_attrs_dict(var, file_type: str) -> dict:
    """Return a variable's attributes as a plain dict for any supported file type.

    Unknown file types and any attribute-access failure yield an empty dict
    rather than raising, so callers can treat attributes as best-effort.
    """
    try:
        if file_type == "NetCDF":
            # netCDF4 exposes attributes via ncattrs()/getncattr()
            return {name: var.getncattr(name) for name in var.ncattrs()}
        if file_type in ("HDF5", "NetCDF_h5netcdf"):
            # Both h5py and h5netcdf expose a mapping-like .attrs
            return dict(var.attrs)
        return {}
    except Exception:
        # Best-effort: treat inaccessible attributes as "none"
        return {}
# ============================================
# MEMORY SAFETY LIMITS (OVERRIDABLE)
# ============================================
# Size thresholds for automatic memory checks (in MB). These tier the
# behaviour of check_memory_safety(): below SAFE_LOAD_MB data loads silently,
# below WARNING_LOAD_MB it loads with a warning, below ERROR_LOAD_MB it is
# refused (with a suggested slice) unless the caller supplies a custom
# memory_limit_mb override.
SAFE_LOAD_MB = 100  # Green light - load without warning
WARNING_LOAD_MB = 500  # Yellow - load but warn
ERROR_LOAD_MB = 2000  # Red - refuse unless user overrides
def check_memory_safety(data_shape: tuple, dtype, slice_str: Optional[str] = None,
                        memory_limit_mb: Optional[float] = None) -> Dict[str, Any]:
    """
    Check if loading data is safe based on size thresholds.

    Args:
        data_shape: Full shape of the dataset
        dtype: Data type of the dataset
        slice_str: Optional slice string to compute result shape
        memory_limit_mb: Optional custom memory limit. If None, uses tiered defaults.

    Returns:
        Dictionary with:
            - safe: bool - whether it's safe to proceed
            - estimated_mb: float - estimated memory usage
            - result_shape / full_shape: shapes as lists
            - warning: str (optional) - warning message
            - error: str (optional) - error message
            - suggested_slice: str (optional) - suggested slice if too large
    """
    # Compute the shape that would result from applying the (optional) slice
    if slice_str and slice_str.strip():
        try:
            slices = parse_slice(slice_str)
            result_shape = compute_slice_shape(data_shape, slices)
        except Exception as e:
            return {"safe": False, "error": f"Invalid slice: {str(e)}"}
    else:
        result_shape = data_shape
    # Bug fix: callers (read_dataset, compute_statistics) pass slice_str=""
    # by default, so the old `slice_str is None` test never fired and the
    # suggested_slice was omitted exactly when it was needed. Treat any
    # blank/empty/None slice as "no slice applied".
    no_slice = not (slice_str and slice_str.strip())
    # Estimate memory footprint of the sliced result
    result_elements = np.prod(result_shape)
    bytes_per_element = np.dtype(dtype).itemsize
    estimated_mb = result_elements * bytes_per_element / (1024 * 1024)
    result = {
        "estimated_mb": round(estimated_mb, 2),
        "result_shape": list(result_shape),
        "full_shape": list(data_shape)
    }
    # Use custom limit if provided (bypasses the tiered defaults entirely)
    if memory_limit_mb is not None:
        if estimated_mb <= memory_limit_mb:
            result["safe"] = True
            result["info"] = f"Within custom limit of {memory_limit_mb} MB"
        else:
            result["safe"] = False
            result["error"] = f"Exceeds custom limit of {memory_limit_mb} MB (estimated: {estimated_mb:.1f} MB)"
            if no_slice:
                result["suggested_slice"] = suggest_reasonable_slice(data_shape, dtype, target_mb=memory_limit_mb)
        return result
    # Use tiered defaults
    if estimated_mb < SAFE_LOAD_MB:
        result["safe"] = True
        result["info"] = "Safe to load"
    elif estimated_mb < WARNING_LOAD_MB:
        result["safe"] = True
        result["warning"] = f"Loading {estimated_mb:.1f} MB. Consider slicing for faster operations."
    elif estimated_mb < ERROR_LOAD_MB:
        result["safe"] = False
        result["error"] = f"Dataset too large ({estimated_mb:.1f} MB). Please use slicing."
        if no_slice:
            result["suggested_slice"] = suggest_reasonable_slice(data_shape, dtype)
    else:
        result["safe"] = False
        result["error"] = f"Dataset extremely large ({estimated_mb:.1f} MB). Maximum recommended: {ERROR_LOAD_MB} MB."
        result["suggested_slice"] = suggest_reasonable_slice(data_shape, dtype)
    return result
def compute_slice_shape(full_shape: tuple, slices) -> tuple:
    """Compute the shape that results from applying *slices* to *full_shape*.

    Integer indices remove their dimension; slice objects keep it with the
    selected length. Axes with no selector are kept whole.

    Args:
        full_shape: Full shape of the array being sliced.
        slices: A single int/slice or a tuple of them (as from parse_slice).

    Returns:
        Tuple of remaining dimension sizes.
    """
    if not isinstance(slices, tuple):
        slices = (slices,)
    # Pad with full-dimension slices so every axis has a selector
    while len(slices) < len(full_shape):
        slices = slices + (slice(None),)
    result_shape = []
    for dim_size, s in zip(full_shape, slices):
        if isinstance(s, int):
            # Single index - dimension is removed
            continue
        elif isinstance(s, slice):
            # Fix: slice.indices() normalizes negative start/stop, clamps
            # out-of-range bounds, and handles negative steps. The previous
            # manual arithmetic returned 0 for reversed slices like ::-1.
            result_shape.append(len(range(*s.indices(dim_size))))
    return tuple(result_shape)
def suggest_reasonable_slice(full_shape: tuple, dtype, target_mb: float = 100) -> str:
    """Build a slice string that trims the largest axis to roughly target_mb MB.

    Only the biggest dimension is restricted (e.g. "0:100, :, :"); all other
    axes are selected in full.
    """
    element_bytes = np.dtype(dtype).itemsize
    budget_elements = int(target_mb * 1024 * 1024 / element_bytes)

    # Restrict only the widest axis; everything else stays untouched.
    widest = int(np.argmax(full_shape))
    remaining = np.prod([size for axis, size in enumerate(full_shape) if axis != widest])

    if remaining > 0:
        # How many elements of the widest axis fit in the budget?
        keep = min(max(1, int(budget_elements / remaining)), full_shape[widest])
    else:
        # Some other axis has size 0 - fall back to a tiny prefix.
        keep = min(10, full_shape[widest])

    pieces = [f"0:{keep}" if axis == widest else ":" for axis in range(len(full_shape))]
    return ", ".join(pieces)
# ============================================
# MCP TOOL FUNCTIONS (STATELESS - ALWAYS TAKE file_path)
# ============================================
def download_file(url: str) -> Dict[str, Any]:
    """
    Download a HDF5/NetCDF file from a URL and parse its structure.
    **CRITICAL**: This tool downloads the file AND parses its structure. You do NOT need to
    call list_structure() after downloading - the structure is already in the result.
    Args:
        url: Direct URL to the HDF5 (.h5, .hdf5) or NetCDF (.nc, .nc4) file
    Returns:
        Dictionary with these keys:
        - file_path (str): Path to downloaded file - SAVE THIS, you need it for all other tools
        - structure (dict): Complete file structure (file_type, dimensions, variables/datasets)
        - datasets (list): Available dataset names with shapes/types
        - filename (str): Original filename
        - size_mb (float): File size in megabytes
        - status (str): Success/error message
    **Return value structure**:
    {
        "file_path": "/tmp/tmpXXX.nc",  # ← Use this in other tools
        "structure": {
            "file_type": "NetCDF",
            "dimensions": {"time": 100, "lat": 90, "lon": 180},
            "variables": [{"name": "data", "shape": [100, 90, 180], ...}]
        },
        "datasets": ["data ((100, 90, 180), float32)", ...],
        "status": "Successfully downloaded..."
    }
    **Usage pattern**:
    1. Call download_file(url)
    2. Extract file_path from result: file_path = result["file_path"]
    3. Use file_path in other tools: compute_statistics(file_path, "dataset_name")
    **Common mistake**: Don't call list_structure() after download_file() - the structure
    is already in the download_file result, so that's redundant.
    """
    try:
        if not url:
            return {"error": "Please provide a URL", "status": "failed"}
        # Download file (streamed so large files aren't held in memory at once)
        headers = {'User-Agent': 'Mozilla/5.0'}
        response = requests.get(url, stream=True, timeout=60, headers=headers, allow_redirects=True)
        response.raise_for_status()
        # Reject HTML responses (login/error pages) masquerading as data files
        content_type = response.headers.get('content-type', '').lower()
        if 'text/html' in content_type:
            return {"error": "URL returned HTML instead of a file", "status": "failed"}
        # Determine extension from the URL *path* component only. Fix:
        # Path(url).suffix included any query string (".nc?token=abc"),
        # producing bogus temp-file suffixes for signed/parameterized URLs.
        url_path = Path(urlsplit(url).path)
        extension = url_path.suffix if url_path.suffix else '.nc'
        # Save to temp file (delete=False: the caller needs the path afterwards)
        with tempfile.NamedTemporaryFile(delete=False, suffix=extension, mode='wb') as tmp_file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    tmp_file.write(chunk)
            tmp_path = tmp_file.name
        # Get file size
        file_size_mb = Path(tmp_path).stat().st_size / (1024 * 1024)
        # Parse structure so callers don't need a follow-up list_structure() call
        structure_result = get_file_structure(tmp_path)
        if not structure_result.get("success"):
            return {
                "error": structure_result.get('error', 'Unknown error'),
                "status": "failed"
            }
        filename = url_path.name
        return {
            "file_path": tmp_path,
            "filename": filename,
            "size_mb": round(file_size_mb, 2),
            "structure": structure_result["structure"],
            "datasets": structure_result["datasets"],
            # Fix: the message previously contained the literal "(unknown)"
            # instead of the filename that was already computed above.
            "status": f"Successfully downloaded {filename} ({file_size_mb:.2f} MB)"
        }
    except Exception as e:
        return {
            "error": str(e),
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def read_dataset(file_path: str, dataset_path: str, slice_str: str = "",
                 memory_limit_mb: Optional[float] = None) -> Dict[str, Any]:
    """
    Read data from a specific dataset/variable.
    **WHEN TO USE THIS**:
    - When you need the actual data values for visualization or export
    - When compute_statistics() doesn't provide enough information
    - When you need to inspect specific data points
    **WHEN NOT TO USE THIS**:
    - If you only need statistics (min, max, mean, etc.) → use compute_statistics() instead
    - If the dataset is > 10,000 elements → use compute_statistics(), export_data(), or visualization tools
    **Token efficiency note**: Large datasets are truncated to prevent wasting tokens. If you see
    "serialized": false in the result, the tool is telling you to use a different approach.
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to the dataset within the file (e.g., 'variable_name', '/group/data')
        slice_str: Optional numpy-style slice (e.g., '0:10, :, 5' = first 10 in dim 0, all of dim 1, index 5 of dim 2)
        memory_limit_mb: Optional custom memory limit in MB. If not specified, uses tiered
                         defaults (100 MB safe, 500 MB warning, 2000 MB max). Set higher to
                         override defaults if you have more memory available.
    Returns:
        Dictionary containing:
        - data (list): Actual data values (only if < 10,000 elements; otherwise see alternatives)
        - shape (list): Shape of the data
        - dtype (str): Data type
        - size_mb (float): Memory size
        - num_elements (int): Total elements
        - serialized (bool): True if full data included, False if truncated
        - alternatives (list): Suggested alternative tools if data was truncated
    **Return value structure (small dataset)**:
    {
        "data": [[1.2, 3.4], [5.6, 7.8]],  # ← Full data array
        "shape": [2, 2],
        "dtype": "float32",
        "serialized": true
    }
    **Return value structure (large dataset)**:
    {
        "serialized": false,  # ← Data NOT included
        "preview_first_100": [1.2, 3.4, ...],
        "alternatives": [
            "Use compute_statistics() to get summary stats",
            "Use export_data() to save to file",
            "Use create_histogram() to visualize distribution"
        ],
        "shape": [1000, 90, 180],
        "num_elements": 16200000
    }
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        if not dataset_path:
            return {"error": "Please specify a dataset path"}
        # Extract clean path if it has shape/dtype info appended by the
        # dropdown-friendly display format: "/path (shape, dtype)"
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file (3-tier reader fallback; raises ValueError if unreadable)
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable (h5py indexes by path, netCDF4 by variable name)
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety before loading anything into RAM.
            # Uses the dataset's on-disk shape/dtype plus the requested slice.
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                # Return error with clear override instructions rather than
                # raising, so the MCP client can decide how to proceed
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "full_shape": safety_check["full_shape"],
                    "result_shape": safety_check["result_shape"],
                    "override_option": f"To load this data anyway, set memory_limit_mb={int(safety_check['estimated_mb'] * 1.2)} or higher",
                    "default_limits": "Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["alternative"] = f"Or use slicing: slice_str='{safety_check['suggested_slice']}'"
                return error_result
            # Read data with optional slicing (slicing happens at the library
            # level, so only the requested subset is read from disk)
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Convert to numpy array (netCDF4 may return masked arrays or scalars)
            if not isinstance(data, np.ndarray):
                data = np.array(data)
            result = {
                "shape": list(data.shape),
                "dtype": str(data.dtype),
                "size_mb": round(data.nbytes / (1024 * 1024), 2),
                "num_elements": int(data.size)
            }
            # Limit serialization to avoid excessive token usage in LLM context
            # The LLM sees the full JSON response and large arrays waste tokens
            MAX_SERIALIZE_ELEMENTS = 10000
            if data.size > MAX_SERIALIZE_ELEMENTS:
                result["serialized"] = False
                result["note"] = f"Dataset has {data.size:,} elements. Only preview returned to save tokens."
                result["token_saving_info"] = f"Full data would use ~{data.size * 10} tokens. Use export_data or create_histogram for large datasets."
                # Provide preview (first/last 100 of the flattened array)
                flat_data = data.flatten()
                result["preview_first_100"] = flat_data[:100].tolist()
                result["preview_last_100"] = flat_data[-100:].tolist()
                result["preview_shape_info"] = f"Showing first/last 100 of {data.size:,} elements"
                # Suggest alternatives (the suggested slice targets ~50 MB and
                # is based on the *loaded* data's shape, not the full dataset)
                result["alternatives"] = [
                    f"Use compute_statistics() to get summary stats without loading all data",
                    f"Use export_data() to save to file for external analysis",
                    f"Use create_histogram() to visualize distribution",
                    f"Use slicing to load smaller subset: slice_str='{suggest_reasonable_slice(data.shape, data.dtype, target_mb=50)}'"
                ]
            else:
                result["serialized"] = True
                result["data"] = data.tolist()
            # Add warning if present (100-500 MB tier loads but warns)
            if "warning" in safety_check:
                result["warning"] = safety_check["warning"]
            return result
        finally:
            # Always release the file handle, even when reading fails
            f.close()
    except Exception as e:
        return {"error": f"Failed to read dataset: {str(e)}", "traceback": traceback.format_exc()}
def get_dataset_preview(file_path: str, dataset_path: str, n_preview: int = 10) -> Dict[str, Any]:
    """
    Get a safe preview of a dataset without loading the entire array.
    **USE CASE**: Quick peek at data values without loading large arrays.
    **WHEN TO USE THIS**:
    - You want to see a few sample values to understand the data
    - You need to verify data looks reasonable before processing
    - You want to check data format/structure with minimal memory use
    - Dataset is large but you only need to see representative values
    **WHEN NOT TO USE THIS**:
    - You need statistics → use compute_statistics() instead (more efficient)
    - You need all the data → use read_dataset() for small datasets or export_data() for large ones
    - You want to visualize → use create_histogram(), create_line_plot(), etc. instead
    **PREVIEW BEHAVIOR**:
    - Safe for any dataset size - only the requested elements are read
    - The preview key depends on the dataset's dimensionality:
      * empty dataset or scalar → "preview_data"
      * 1-D array → "first_n" (and "last_n" when the array is longer than n_preview)
      * multi-dimensional → "first_n_elements" (sampled along the first axis) plus a "note"
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to the dataset within the file
        n_preview: Number of elements to preview (default: 10)
    Returns:
        Dictionary with keys:
        - dataset_path (str): The (cleaned) path that was previewed
        - shape (list): Full shape of the dataset
        - dtype (str): Data type
        - size_mb (float): Total memory size in MB
        - num_elements (int): Total number of elements
        - one of "preview_data" / "first_n" (+ optional "last_n") /
          "first_n_elements", depending on dimensionality (see above)
    **Return value structure (multi-dimensional dataset)**:
    {
        "dataset_path": "temperature",
        "shape": [1750, 90, 180],
        "dtype": "float32",
        "size_mb": 113.4,
        "num_elements": 28350000,
        "first_n_elements": [15.3, 14.2, 16.8, ...],
        "note": "Showing first 10 of 28350000 elements"
    }
    **Example workflow - Quick data check**:
    # Preview data to verify it looks reasonable
    preview = get_dataset_preview(file_path, "temperature", n_preview=20)
    sample_values = preview["first_n_elements"]
    # Check if values are in expected range
    if all(-50 < v < 50 for v in sample_values):
        # Looks good, now get full statistics
        stats = compute_statistics(file_path, "temperature")
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        if not dataset_path:
            return {"error": "Please specify a dataset path"}
        # Extract clean path (strip the " (shape, dtype)" suffix that the
        # dropdown-friendly display format appends)
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            shape = var.shape
            dtype = var.dtype
            # np.prod(()) == 1, so a scalar dataset counts as one element
            num_elements = int(np.prod(shape)) if len(shape) > 0 else 1
            size_mb = num_elements * dtype.itemsize / (1024**2)
            result = {
                "dataset_path": dataset_path,
                "shape": list(shape),
                "dtype": str(dtype),
                "size_mb": round(size_mb, 2),
                "num_elements": num_elements
            }
            # Get preview data - only the requested elements are read from disk
            if num_elements == 0:
                result["preview_data"] = []
                result["note"] = "Dataset is empty"
            elif len(shape) == 0:
                # Scalar: read with the empty-tuple index
                val = var[()]
                result["preview_data"] = val.item() if isinstance(val, np.generic) else val
            elif len(shape) == 1:
                # 1D array: head (and tail, when there is more than one "page")
                n = min(n_preview, shape[0])
                result["first_n"] = var[:n].tolist()
                if shape[0] > n:
                    result["last_n"] = var[-n:].tolist()
            else:
                # Multi-dimensional: sample along the first axis, fixing all
                # other axes at index 0, then flatten to at most n_preview values
                sample_slice = [slice(0, min(n_preview, shape[0]))] + [0] * (len(shape) - 1)
                sample_data = var[tuple(sample_slice)]
                result["first_n_elements"] = np.array(sample_data).flatten()[:n_preview].tolist()
                result["note"] = f"Showing first {min(n_preview, len(result['first_n_elements']))} of {num_elements} elements"
            return result
        finally:
            # Always release the file handle
            f.close()
    except Exception as e:
        return {"error": f"Failed to preview dataset: {str(e)}", "traceback": traceback.format_exc()}
def get_attributes(file_path: str, path: str = "") -> Dict[str, Any]:
    """
    Get metadata attributes for a file, group, or dataset.
    **USE CASE**: Understanding what variables mean, their units, descriptions, etc.
    **COMMON ATTRIBUTES TO LOOK FOR**:
    - units: Physical units of the data (e.g., "kelvin", "meters", "days since 1800-01-01")
    - long_name: Human-readable description of the variable
    - standard_name: CF-convention standard name
    - scale_factor / add_offset: Linear transformation parameters for packed data
    - _FillValue: Value representing missing/invalid data
    - valid_range / valid_min / valid_max: Valid data bounds
    **WHEN TO USE THIS**:
    - Before analyzing data: check units and understand what the values represent
    - To find file-level metadata: leave path empty
    - To understand coordinate variables: especially for time (to convert indices to dates)
    - To check for data quality flags or processing history
    Args:
        file_path: Path to the HDF5/NetCDF file
        path: Path to dataset/group (empty string "" for file-level attributes)
    Returns:
        Dictionary with keys:
        - attributes (dict): All metadata attributes as key-value pairs
        - path (str): Path that was queried ("root" if file-level)
    **Return value structure**:
    {
        "attributes": {
            "long_name": "Surface temperature anomaly",
            "units": "K",
            "scale_factor": 0.01,
            "_FillValue": 32767,
            "cell_methods": "time: mean"
        },
        "path": "temperature"
    }
    **Example workflow - Understanding time coordinate**:
    # Get time attributes to understand how to interpret time values
    time_attrs = get_attributes(file_path, "time")
    units = time_attrs["attributes"]["units"]  # e.g., "days since 1800-01-01"
    # Now you know how to interpret time values from index_to_coordinate()
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        # Extract clean path (strip " (shape, dtype)" display suffix)
        path = path.split(" (")[0] if path and " (" in path else path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get the object to query: a dataset/variable, or the file root
            if path:
                if file_type == "HDF5":
                    obj = f[path]
                else:
                    obj = f.variables[path]
            else:
                obj = f
            # Get attributes via the API appropriate to the reader used
            if file_type == "HDF5":
                attrs = dict(obj.attrs)
            elif file_type == "NetCDF":
                attrs = {k: obj.getncattr(k) for k in obj.ncattrs()}
            elif file_type == "NetCDF_h5netcdf":
                attrs = dict(obj.attrs)
            else:
                attrs = {}
            # Convert numpy types to native Python so the result is JSON-friendly
            for key, value in attrs.items():
                if isinstance(value, np.generic):
                    attrs[key] = value.item()
                elif isinstance(value, np.ndarray):
                    # Fix: array-valued attributes (common in HDF5) were left
                    # as np.ndarray, which is not JSON-serializable
                    attrs[key] = value.tolist()
            return {"attributes": attrs, "path": path if path else "root"}
        finally:
            f.close()
    except Exception as e:
        return {"error": f"Failed to get attributes: {str(e)}", "traceback": traceback.format_exc()}
def get_dimensions(file_path: str, dataset_path: str = "") -> Dict[str, Any]:
    """
    Get dimension information for a dataset or file.
    **USE CASE**: Understanding data structure before slicing or analyzing.
    **WHEN TO USE THIS**:
    - To see what dimensions a dataset has and their sizes
    - To understand how to construct slice strings
    - For NetCDF files with empty dataset_path: get ALL dimensions in the file
    **DO NOT use this if**:
    - You just downloaded a file → the structure is already in download_file() result
    - You already ran list_structure() → the dimensions are in that result
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to dataset (empty "" for NetCDF shows all dimensions, required for HDF5)
    Returns:
        For an HDF5 dataset: {"shape": [...], "ndim": int}.
        For a NetCDF variable: {"dimensions": [names...], "shape": [...]}.
        For NetCDF with empty path: {"dimensions": {name: size, ...}}.
    **Example workflow - Planning a slice**:
    dims = get_dimensions(file_path, "temperature")
    # dims["shape"] = [1750, 90, 180]
    # To get first 10 time steps, all lat, all lon: slice_str = "0:10, :, :"
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        # Strip the " (shape, dtype)" suffix that dropdown labels append
        if dataset_path and " (" in dataset_path:
            dataset_path = dataset_path.split(" (")[0]
        handle, kind = open_file_with_fallback(file_path)
        try:
            if kind == "HDF5":
                # HDF5 has no named dimensions - a dataset path is mandatory
                if not dataset_path:
                    return {"error": "dataset_path required for HDF5 files"}
                dims = handle[dataset_path].shape
                return {"shape": list(dims), "ndim": len(dims)}
            # NetCDF: either one variable's dimensions, or the whole file's
            if not dataset_path:
                return {
                    "dimensions": {name: len(dim) for name, dim in handle.dimensions.items()}
                }
            variable = handle.variables[dataset_path]
            return {
                "dimensions": variable.dimensions,
                "shape": list(variable.shape)
            }
        finally:
            handle.close()
    except Exception as e:
        return {"error": f"Failed to get dimensions: {str(e)}", "traceback": traceback.format_exc()}
def list_structure(file_path: str) -> Dict[str, Any]:
    """
    List the complete hierarchical structure of the file.
    **WHEN TO USE THIS**:
    - You need to browse available datasets/variables
    - You don't know the names of variables in the file
    - You need to see the complete file organization
    **WHEN NOT TO USE THIS**:
    - Right after download_file() → structure is already in that result
    - You already know the dataset name → just use it directly in other tools
    - File has > 100 datasets → output will be truncated to save tokens
    **TOKEN EFFICIENCY NOTE**: For files with many datasets, this function truncates
    the output to avoid wasting tokens. If you know specific dataset names, use them
    directly instead of browsing.
    Args:
        file_path: Path to the HDF5/NetCDF file
    Returns:
        Dictionary containing:
        - file_type (str): "HDF5" or "NetCDF"
        - dimensions (dict): For NetCDF, dimensions and their sizes
        - variables (list): For NetCDF, list of variables with metadata
        - datasets (list): For HDF5, list of datasets with metadata
        - token_saving_info (dict): If file is large, explanation of truncation
    **Return value structure (NetCDF)**:
    {
        "file_type": "NetCDF",
        "dimensions": {"time": 100, "lat": 90, "lon": 180},
        "variables": [
            {
                "name": "temperature",
                "dimensions": ["time", "lat", "lon"],
                "shape": [100, 90, 180],
                "dtype": "float32"
            },
            ...
        ]
    }
    **Common mistake**: Calling this after download_file() is redundant - you already
    have the structure from download_file()["structure"].
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        # Get the full structure
        result = get_file_structure(file_path)
        if not result.get("success"):
            return {"error": result.get("error", "Unknown error")}
        structure = result["structure"]
        # HDF5 stores its items under "datasets", NetCDF under "variables".
        # Keying on this removes the duplicated per-type truncation branches
        # (and the dead TRUNCATE_LIMIT constant) of the previous version.
        item_key = "datasets" if structure.get("file_type") == "HDF5" else "variables"
        items = structure.get(item_key, [])
        total_items = len(items)
        # Token-saving thresholds
        SAFE_LIMIT = 100     # return the full list silently
        WARNING_LIMIT = 500  # return the full list but warn about token usage
        SHOW_COUNT = 100     # entries kept when truncating
        if total_items <= SAFE_LIMIT:
            # Small file - return everything
            return structure
        if total_items <= WARNING_LIMIT:
            # Medium file - return everything but warn about token usage
            structure["token_warning"] = f"File has {total_items} {item_key}. Consider using specific dataset names instead of browsing full structure."
            return structure
        # Large file - truncate to save tokens
        structure[item_key] = items[:SHOW_COUNT]
        structure[f"{item_key}_truncated"] = True
        structure[f"total_{item_key}"] = total_items
        structure["showing_first"] = SHOW_COUNT
        structure["token_saving_info"] = {
            "message": f"File has {total_items} {item_key}. Only showing first 100 to save tokens.",
            "estimated_tokens_saved": f"~{(total_items - 100) * 50:,} tokens",
            "full_token_cost": f"Full list would use ~{total_items * 50:,} tokens",
            "alternatives": [
                "If you know the dataset name, use read_dataset() or get_attributes() directly",
                "Use file metadata or documentation to identify dataset names",
                "Search for specific patterns if available in your workflow"
            ]
        }
        return structure
    except Exception as e:
        return {"error": f"Failed to list structure: {str(e)}", "traceback": traceback.format_exc()}
def compute_statistics(file_path: str, dataset_path: str, slice_str: str = "",
                       memory_limit_mb: Optional[float] = None) -> Dict[str, Any]:
    """
    Compute basic statistics for a numeric dataset.
    **PREFERRED TOOL**: Use this instead of read_dataset() when you only need statistics.
    This is more efficient and avoids loading full arrays into memory unnecessarily.
    **Common use cases**:
    - Comparing values between time periods (e.g., "compare mean in period A vs period B")
    - Finding data ranges (min/max)
    - Data quality checks (checking for NaN values)
    - Quick data exploration before visualization
    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to the dataset within the file
        slice_str: Optional numpy-style slice (e.g., "100:200, :, :" for rows 100-200)
        memory_limit_mb: Optional custom memory limit in MB. If not specified, uses tiered
                         defaults (100 MB safe, 500 MB warning, 2000 MB max). Set higher to
                         override defaults if you have more memory available.
    Returns:
        Dictionary with a single "statistics" key (plus "warning" when the data
        is in the 100-500 MB tier). The nested statistics dict contains:
        - min, max, mean, std, median (float): NaN-aware summary statistics
        - shape (list): Shape of the data analyzed
        - dtype (str): Data type
        - size_mb (float): Memory size in MB
        - num_elements (int): Total number of elements
        - nan_count (int): Number of NaN values (floating point data only)
        - nan_fraction (float): Fraction of values that are NaN (floating point only)
    **Return value structure**:
    {
        "statistics": {
            "min": -5.2,
            "max": 42.1,
            "mean": 15.3,  # ← Access with result["statistics"]["mean"]
            "std": 8.7,
            "median": 14.2,
            "shape": [12, 90, 180],
            "dtype": "float32",
            "size_mb": 0.74,
            "num_elements": 194400,
            "nan_count": 0,
            "nan_fraction": 0.0
        }
    }
    **Example workflow - Comparing time periods**:
    # INEFFICIENT (don't do this):
    result1 = read_dataset(file, "var", "0:12, :, :")  # loads full data
    result2 = read_dataset(file, "var", "100:112, :, :")  # loads full data
    # then manually calculate means from the data arrays
    # EFFICIENT (do this):
    stats1 = compute_statistics(file, "var", "0:12, :, :")  # computes stats directly
    stats2 = compute_statistics(file, "var", "100:112, :, :")  # computes stats directly
    difference = stats2["statistics"]["mean"] - stats1["statistics"]["mean"]
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        if not dataset_path:
            return {"error": "Please specify a dataset path"}
        # Extract clean path (strip the " (shape, dtype)" display suffix)
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable (h5py indexes by path, netCDF4 by name)
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety before loading anything into RAM
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                # Return error with clear override instructions rather than raising
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "full_shape": safety_check["full_shape"],
                    "result_shape": safety_check["result_shape"],
                    "override_option": f"To compute statistics anyway, set memory_limit_mb={int(safety_check['estimated_mb'] * 1.2)} or higher",
                    "default_limits": "Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["alternative"] = f"Or use slicing: slice_str='{safety_check['suggested_slice']}'"
                return error_result
            # Read data with optional slicing (only the subset is read from disk)
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Convert to numpy array (netCDF4 may return masked arrays/scalars)
            if not isinstance(data, np.ndarray):
                data = np.array(data)
            # Check if numeric - statistics are meaningless for strings etc.
            if not np.issubdtype(data.dtype, np.number):
                return {"error": f"Dataset is not numeric (dtype: {data.dtype})"}
            # Compute statistics (NaN-aware variants so missing values
            # don't poison the results)
            stats = {
                "min": float(np.nanmin(data)),
                "max": float(np.nanmax(data)),
                "mean": float(np.nanmean(data)),
                "std": float(np.nanstd(data)),
                "median": float(np.nanmedian(data)),
                "shape": list(data.shape),
                "dtype": str(data.dtype),
                "size_mb": round(data.nbytes / (1024 * 1024), 2),
                "num_elements": int(data.size)
            }
            # Add NaN info if floating point (integers cannot hold NaN)
            if np.issubdtype(data.dtype, np.floating):
                num_nan = int(np.sum(np.isnan(data)))
                stats["nan_count"] = num_nan
                stats["nan_fraction"] = float(num_nan / data.size) if data.size > 0 else 0.0
            result = {"statistics": stats}
            # Add warning if present (100-500 MB tier loads but warns)
            if "warning" in safety_check:
                result["warning"] = safety_check["warning"]
            return result
        finally:
            # Always release the file handle
            f.close()
    except Exception as e:
        return {"error": f"Failed to compute statistics: {str(e)}", "traceback": traceback.format_exc()}
def export_data(file_path: str, dataset_path: str, format: str = "csv",
                slice_str: str = "", max_rows: int = 100000,
                memory_limit_mb: Optional[float] = None) -> Dict[str, Any]:
    """
    Export dataset to CSV or JSON format for external analysis.

    **WHEN TO USE THIS**:
    - You need to analyze data in external tools (Excel, Python, R, etc.)
    - You want to save processed/sliced data
    - Dataset is too large to view in chat but small enough to export

    **WHEN NOT TO USE THIS**:
    - Just to check a few values -> use get_dataset_preview() instead
    - Just need statistics -> use compute_statistics() instead
    - Want to visualize -> use create_histogram(), create_line_plot(), etc. instead

    **FORMAT NOTES**:
    - CSV: Best for tabular data, has row limits (default 100,000)
    - JSON: Better for hierarchical data or very large datasets

    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to the dataset within the file
        format: Export format - "csv" or "json" (case-insensitive)
        slice_str: Optional numpy-style slice to export subset
        max_rows: Maximum rows for CSV export (safety limit)
        memory_limit_mb: Optional custom memory limit in MB. If not specified, uses tiered
                         defaults (100 MB safe, 500 MB warning, 2000 MB max).

    Returns:
        Dictionary with keys:
        - status (str): "success"
        - file_path (str): Path to exported file (downloadable)
        - format (str): Export format used (upper-cased)
        - size_mb (float): Size of exported file
        - shape (list): Shape of exported data
        - dtype (str): Data type of exported data
        - total_elements (int): Total elements exported
        On failure, a dictionary with an "error" key (and possibly override hints).
    """
    try:
        if not file_path:
            return {"error": "Please provide a file path"}
        if not dataset_path:
            return {"error": "Please specify a dataset path"}
        # Strip any " (...)" suffix that UI dropdowns append to the dataset path
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Accept "CSV", "Json", etc. — backward compatible with lowercase callers
        format = format.strip().lower()
        # Open file (HDF5 or NetCDF with fallback)
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety before loading
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                # Return error with clear override instructions
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "full_shape": safety_check["full_shape"],
                    "result_shape": safety_check["result_shape"],
                    "override_option": f"To export anyway, set memory_limit_mb={int(safety_check['estimated_mb'] * 1.2)} or higher",
                    "default_limits": "Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["alternative"] = f"Or use slicing: slice_str='{safety_check['suggested_slice']}'"
                return error_result
            # Read data with optional slicing
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Convert to numpy array
            if not isinstance(data, np.ndarray):
                data = np.array(data)
            # Get metadata
            metadata = get_var_attrs_dict(var, file_type)
            # Check size limits for CSV. data.size is already an int;
            # np.prod(data.shape) would return a float 1.0 for a scalar shape.
            total_elements = int(data.size)
            if total_elements > max_rows and format == "csv":
                return {"error": f"Dataset too large for CSV ({total_elements} elements). Maximum is {max_rows}. Use slicing or JSON format."}
            # Create export file. NamedTemporaryFile(delete=False) reserves the
            # name atomically, unlike the deprecated, race-prone tempfile.mktemp().
            if format == "csv":
                with tempfile.NamedTemporaryFile(suffix='.csv', delete=False) as tmp:
                    tmp_path = tmp.name
                # Handle different dimensions
                if data.ndim == 1:
                    df = pd.DataFrame({
                        'index': np.arange(len(data)),
                        'value': data
                    })
                elif data.ndim == 2:
                    df = pd.DataFrame(data)
                    df.columns = [f'col_{i}' for i in range(data.shape[1])]
                else:
                    # Flatten N-D data into one row per element, with one
                    # index column per dimension plus a 'value' column
                    indices = np.array(np.unravel_index(np.arange(data.size), data.shape)).T
                    df = pd.DataFrame(indices, columns=[f'dim_{i}' for i in range(data.ndim)])
                    df['value'] = data.flatten()
                # Apply row limit
                if len(df) > max_rows:
                    df = df.head(max_rows)
                # Write with metadata as comments
                with open(tmp_path, 'w') as f_out:
                    f_out.write(f"# Dataset: {dataset_path}\n")
                    f_out.write(f"# Shape: {data.shape}\n")
                    f_out.write(f"# Dtype: {data.dtype}\n")
                    if metadata:
                        f_out.write("# Metadata:\n")
                        for key, value in metadata.items():
                            f_out.write(f"# {key}: {value}\n")
                    f_out.write("\n")
                    df.to_csv(f_out, index=False)
            elif format == "json":
                with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as tmp:
                    tmp_path = tmp.name
                export_obj = {
                    "dataset": dataset_path,
                    "shape": list(data.shape),
                    "dtype": str(data.dtype),
                    "data": data.tolist(),
                    "metadata": metadata
                }
                with open(tmp_path, 'w') as f_out:
                    json.dump(export_obj, f_out, indent=2)
            else:
                return {"error": f"Unsupported format: {format}"}
            # Get file size
            file_size_mb = Path(tmp_path).stat().st_size / (1024 * 1024)
            return {
                "status": "success",
                "file_path": tmp_path,
                "format": format.upper(),
                "size_mb": round(file_size_mb, 2),
                "shape": list(data.shape),
                "dtype": str(data.dtype),
                "total_elements": total_elements
            }
        finally:
            f.close()
    except Exception as e:
        return {"error": f"Failed to export: {str(e)}", "traceback": traceback.format_exc()}
def create_histogram(file_path: str, dataset_path: str, slice_str: str = "",
                     title_override: str = "", xlabel_override: str = "",
                     nbins: int = 50) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create histogram showing distribution of values.

    **USE CASE**: Visualize the distribution of a dataset to understand its characteristics.

    **WHEN TO USE THIS**:
    - Understand data distribution (normal, skewed, bimodal, etc.)
    - Identify outliers or unusual values
    - Quality control - check if data looks reasonable
    - Compare with expected distributions

    **OUTPUT**: Returns both JSON status (with statistics) AND downloadable HTML file.
    The HTML file contains an interactive Plotly histogram viewable in any browser.

    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to dataset/variable
        slice_str: Optional numpy-style slice notation to plot subset
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        nbins: Number of bins for histogram (default: 50, range: 5-200)

    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: contains "status", "html_file" and a "statistics" dict with
          'mean', 'median', 'min', 'max', 'std', 'num_bins', 'num_values'
        - html_file_path: Path to interactive HTML file, or None on failure
    """
    import plotly.graph_objects as go
    try:
        if not file_path:
            return {
                "error": "No file path provided",
                "status": "failed"
            }, None
        if not dataset_path:
            return {
                "error": "No dataset path provided",
                "status": "failed"
            }, None
        # Strip any " (...)" suffix that UI dropdowns append to the dataset path
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file and read data directly (bypassing read_dataset's serialization limit)
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset/variable
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety before loading
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str)
            if not safety_check["safe"]:
                # Return error with suggestion
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["suggestion"] = f"Try using: slice_str='{safety_check['suggested_slice']}'"
                return error_result, None
            # Read data with optional slicing
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Keep masked array for statistics (np.mean etc. respect the mask)
            data_for_stats = data
            # Convert to NaN-filled plain array for plotting
            if isinstance(data, np.ma.MaskedArray):
                data_for_plot = data.filled(np.nan)
            else:
                data_for_plot = np.array(data)
            data_for_plot = data_for_plot.flatten()
        finally:
            f.close()
        # Calculate statistics using masked array (respects mask)
        mean_val = float(np.mean(data_for_stats))
        median_val = float(np.median(data_for_stats))
        min_val = float(np.min(data_for_stats))
        max_val = float(np.max(data_for_stats))
        std_val = float(np.std(data_for_stats))
        # Create figure
        fig = go.Figure()
        # Add histogram
        fig.add_trace(go.Histogram(
            x=data_for_plot,
            nbinsx=nbins,
            name='Distribution',
            marker_color='steelblue',
            opacity=0.7
        ))
        # Add vertical lines for mean (dashed red) and median (dotted green)
        fig.add_vline(
            x=mean_val,
            line_dash="dash",
            line_color="red",
            annotation_text=f"Mean: {mean_val:.2f}",
            annotation_position="top"
        )
        fig.add_vline(
            x=median_val,
            line_dash="dot",
            line_color="green",
            annotation_text=f"Median: {median_val:.2f}",
            annotation_position="bottom"
        )
        # Set title and labels
        title = title_override if title_override else f"Distribution of {dataset_path}"
        xlabel = xlabel_override if xlabel_override else "Value"
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title="Frequency",
            height=500,
            showlegend=True
        )
        # Save as interactive HTML file. NamedTemporaryFile(delete=False)
        # reserves the name atomically (tempfile.mktemp is deprecated/racy).
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        # Return JSON status instead of plot object (avoids MCP serialization issues)
        result = {
            "status": "success",
            "message": f"Histogram created for {dataset_path}",
            "html_file": html_path,
            "statistics": {
                "mean": mean_val,
                "median": median_val,
                "min": min_val,
                "max": max_val,
                "std": std_val,
                "num_bins": nbins,
                "num_values": len(data_for_plot)
            },
            "download_info": "Download the HTML file to view the interactive histogram in your browser"
        }
        return result, html_path
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to create histogram: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
def create_line_plot(file_path: str, dataset_path: str, slice_str: str = "",
                     title_override: str = "", xlabel_override: str = "", ylabel_override: str = "",
                     x_dataset_path: str = "", moving_avg_window: int = 0,
                     memory_limit_mb: float = None) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create line plot for 1D data or time series.

    **USE CASE**: Visualize trends over time or along a single dimension.

    **WHEN TO USE THIS**:
    - Time series data (temperature over time, stock prices, etc.)
    - 1D profiles (depth profiles, altitude profiles)
    - Sequential data where order matters
    - When you want to see trends and patterns

    **FEATURES**:
    - Optional moving average smoothing to reduce noise
    - Custom X-axis data or auto-generated indices
    - Interactive Plotly visualization

    **OUTPUT**: Returns both JSON status (with statistics) AND downloadable HTML file.
    The HTML file contains an interactive Plotly line plot viewable in any browser.

    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to Y-axis dataset/variable
        slice_str: Optional numpy-style slice notation (applied to X data too, if given)
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        ylabel_override: Optional custom y-axis label (leave empty for automatic)
        x_dataset_path: Optional path to X-axis data (if empty, uses indices 0, 1, 2, ...)
        moving_avg_window: Window size for smoothing (0 or 1 = no smoothing, 2-1000 = apply moving average)
        memory_limit_mb: Memory limit in MB (default: tiered 100/500/2000)

    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: {"status", "html_file", "statistics": {"num_points",
          "y_min", "y_max", "y_mean", "y_std"}, optional "smoothing"}
        - html_file_path: Path to interactive HTML file, or None on failure

    **Workflow - Time series with smoothing**:
        result, html_path = create_line_plot(
            file_path,
            "temperature",
            slice_str=":, 45, 90",   # All times, specific lat/lon
            moving_avg_window=30
        )
        # Download html_path to see both raw data and smoothed trend
    """
    import plotly.graph_objects as go
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}, None
        if not dataset_path:
            return {"error": "No dataset path provided", "status": "failed"}, None
        # Strip any " (...)" suffix that UI dropdowns append to dataset paths
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        if x_dataset_path:
            x_dataset_path = x_dataset_path.split(" (")[0] if " (" in x_dataset_path else x_dataset_path
        # Open file and read Y data
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get Y dataset
            if file_type == "HDF5":
                y_var = f[dataset_path]
            else:
                y_var = f.variables[dataset_path]
            # Check memory safety for Y data
            safety_check = check_memory_safety(y_var.shape, y_var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["suggestion"] = f"Try using: slice_str='{safety_check['suggested_slice']}'"
                return error_result, None
            # Read Y data
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                y_data = y_var[idx]
            else:
                y_data = y_var[:]
            # Keep masked array for statistics (np.mean etc. respect the mask)
            y_data_for_stats = y_data
            # Convert to NaN-filled plain array for plotting
            if isinstance(y_data, np.ma.MaskedArray):
                y_data_for_plot = y_data.filled(np.nan)
            else:
                y_data_for_plot = np.array(y_data)
            y_data_for_plot = y_data_for_plot.flatten()
            # Handle X data
            if x_dataset_path:
                # Read X dataset
                if file_type == "HDF5":
                    x_var = f[x_dataset_path]
                else:
                    x_var = f.variables[x_dataset_path]
                # Check memory safety for X data
                x_safety_check = check_memory_safety(x_var.shape, x_var.dtype, slice_str, memory_limit_mb)
                if not x_safety_check["safe"]:
                    error_result = {
                        "error": f"X data: {x_safety_check['error']}",
                        "estimated_size_mb": x_safety_check["estimated_mb"],
                        "status": "failed"
                    }
                    if "suggested_slice" in x_safety_check:
                        error_result["suggested_slice"] = x_safety_check["suggested_slice"]
                    return error_result, None
                # Read X data (same slice as Y so the two stay aligned)
                if slice_str and slice_str.strip():
                    x_data = x_var[idx]
                else:
                    x_data = x_var[:]
                if not isinstance(x_data, np.ndarray):
                    x_data = np.array(x_data)
                x_data = x_data.flatten()
                if len(x_data) != len(y_data_for_plot):
                    return {
                        "error": f"X and Y data length mismatch: {len(x_data)} vs {len(y_data_for_plot)}",
                        "status": "failed"
                    }, None
            else:
                # Use indices as X
                x_data = np.arange(len(y_data_for_plot))
        finally:
            f.close()
        # Create figure
        fig = go.Figure()
        # Add raw data trace
        fig.add_trace(go.Scatter(
            x=x_data,
            y=y_data_for_plot,
            mode='lines+markers',
            name='Raw Data',
            line=dict(color='steelblue', width=1),
            marker=dict(size=3),
            opacity=0.7
        ))
        # Add moving average if requested
        smoothed_info = None
        if moving_avg_window and moving_avg_window > 1:
            # Validate window size
            if moving_avg_window > len(y_data_for_plot):
                smoothed_info = f"Moving average window ({moving_avg_window}) larger than data length ({len(y_data_for_plot)}), skipping smoothing"
            else:
                # Calculate moving average using convolution
                window = np.ones(moving_avg_window) / moving_avg_window
                y_smooth = np.convolve(y_data_for_plot, window, mode='valid')
                # Adjust X data to match smoothed Y length:
                # 'valid' mode reduces length by (window_size - 1), so center
                # the smoothed trace over the raw data
                offset = (moving_avg_window - 1) // 2
                x_smooth = x_data[offset:offset + len(y_smooth)]
                # Add smoothed trace
                fig.add_trace(go.Scatter(
                    x=x_smooth,
                    y=y_smooth,
                    mode='lines',
                    name=f'Moving Avg (n={moving_avg_window})',
                    line=dict(color='red', width=2),
                    opacity=0.9
                ))
                smoothed_info = f"Applied moving average with window size {moving_avg_window}"
        # Set title and labels
        title = title_override if title_override else f"Line Plot: {dataset_path}"
        xlabel = xlabel_override if xlabel_override else (x_dataset_path if x_dataset_path else "Index")
        ylabel = ylabel_override if ylabel_override else dataset_path
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title=ylabel,
            height=500,
            showlegend=True,
            hovermode='closest'
        )
        # Save as interactive HTML. NamedTemporaryFile(delete=False) reserves
        # the name atomically (tempfile.mktemp is deprecated/racy).
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        # Return JSON status
        result = {
            "status": "success",
            "message": f"Line plot created for {dataset_path}",
            "html_file": html_path,
            "statistics": {
                "num_points": len(y_data_for_plot),
                "y_min": float(np.min(y_data_for_stats)),
                "y_max": float(np.max(y_data_for_stats)),
                "y_mean": float(np.mean(y_data_for_stats)),
                "y_std": float(np.std(y_data_for_stats))
            },
            "download_info": "Download the HTML file to view the interactive line plot in your browser"
        }
        if smoothed_info:
            result["smoothing"] = smoothed_info
        return result, html_path
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to create line plot: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
def create_heatmap(file_path: str, dataset_path: str, slice_str: str = "",
                   title_override: str = "", xlabel_override: str = "", ylabel_override: str = "",
                   colorscale: str = "Viridis", memory_limit_mb: float = None) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create 2D heatmap visualization.

    **USE CASE**: Visualize spatial patterns and 2D data distributions.

    **WHEN TO USE THIS**:
    - Geographic/spatial data (temperature maps, elevation, satellite imagery)
    - 2D slices from higher-dimensional data
    - Matrices and correlation data
    - Any data where you want to see spatial patterns

    **FEATURES**:
    - Multiple color scales for different data types. However, do not make any
      claims about what each color represents, as the colormap has default
      directionalities.
    - Auto-validates data is 2D (will slice higher dimensions if needed)
    - Interactive hover to inspect values

    **OUTPUT**: Returns both JSON status (with statistics) AND downloadable HTML file.
    The HTML file contains an interactive Plotly heatmap viewable in any browser.

    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to 2D dataset/variable
        slice_str: Optional numpy-style slice (must result in 2D data, e.g., "0, :, :" for first time step)
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        ylabel_override: Optional custom y-axis label (leave empty for automatic)
        colorscale: Plotly colorscale name - options: Viridis, Plasma, Inferno, Magma, Cividis, Blues, Reds, YlOrRd, RdBu (default: Viridis)
        memory_limit_mb: Memory limit in MB (default: tiered 100/500/2000)

    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: {"status", "html_file", "statistics": {"shape", "min",
          "max", "mean", "std", "colorscale"}, optional "note"}
        - html_file_path: Path to interactive HTML file, or None on failure

    **Color scale recommendations**:
    - Temperature data: "RdBu" (red-blue diverging) (IMPORTANT: Blue is positive,
      red is negative here. This MAY be opposite to what you are thinking, but
      safest to just NOT make a claim about colors.)
    - Elevation/depth: "Viridis" or "Plasma"
    - Precipitation: "Blues" or "YlOrRd"
    - General scientific: "Viridis" (perceptually uniform)
    """
    import plotly.graph_objects as go
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}, None
        if not dataset_path:
            return {"error": "No dataset path provided", "status": "failed"}, None
        # Strip any " (...)" suffix that UI dropdowns append to the dataset path
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file and read data
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Check memory safety
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["suggestion"] = f"Try using: slice_str='{safety_check['suggested_slice']}'"
                return error_result, None
            # Read data
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Keep masked array for statistics (np.mean etc. respect the mask)
            data_for_stats = data
            # Convert to NaN-filled plain array for plotting
            if isinstance(data, np.ma.MaskedArray):
                data_for_plot = data.filled(np.nan)
            else:
                data_for_plot = np.array(data)
            # Ensure 2D
            if data_for_plot.ndim == 1:
                return {
                    "error": f"Data is 1D (shape {data_for_plot.shape}). Heatmap requires 2D data. Try using create_line_plot instead.",
                    "status": "failed"
                }, None
            elif data_for_plot.ndim > 2:
                # Take first 2D slice by repeatedly indexing the leading axis
                while data_for_plot.ndim > 2:
                    data_for_plot = data_for_plot[0]
                suggestion = f"Data has {len(var.shape)}D shape. Showing first 2D slice. Use slice_str to select specific slice."
            else:
                suggestion = None
        finally:
            f.close()
        # Calculate statistics using masked array
        min_val = float(np.min(data_for_stats))
        max_val = float(np.max(data_for_stats))
        mean_val = float(np.mean(data_for_stats))
        std_val = float(np.std(data_for_stats))
        # Create figure using filled array. hovertemplate must be a single
        # string with <br> separators (a bare multi-line literal is invalid).
        fig = go.Figure(data=go.Heatmap(
            z=data_for_plot,
            colorscale=colorscale,
            colorbar=dict(title="Value"),
            hovertemplate='X: %{x}<br>Y: %{y}<br>Value: %{z}'
        ))
        # Set title and labels
        title = title_override if title_override else f"Heatmap: {dataset_path}"
        xlabel = xlabel_override if xlabel_override else "X"
        ylabel = ylabel_override if ylabel_override else "Y"
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title=ylabel,
            height=600,
            width=700
        )
        # Save as interactive HTML. NamedTemporaryFile(delete=False) reserves
        # the name atomically (tempfile.mktemp is deprecated/racy).
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        # Return JSON status
        result = {
            "status": "success",
            "message": f"Heatmap created for {dataset_path}",
            "html_file": html_path,
            "statistics": {
                "shape": list(data_for_plot.shape),
                "min": min_val,
                "max": max_val,
                "mean": mean_val,
                "std": std_val,
                "colorscale": colorscale
            },
            "download_info": "Download the HTML file to view the interactive heatmap in your browser"
        }
        if suggestion:
            result["note"] = suggestion
        return result, html_path
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to create heatmap: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
def create_scatter_plot(file_path: str, x_dataset_path: str, y_dataset_path: str,
                        x_slice_str: str = "", y_slice_str: str = "",
                        title_override: str = "", xlabel_override: str = "", ylabel_override: str = "",
                        memory_limit_mb: float = None) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create scatter plot comparing two variables.

    **USE CASE**: Explore relationships and correlations between two variables.

    **WHEN TO USE THIS**:
    - Check if two variables are correlated
    - Compare data from different time periods or locations
    - Validate model predictions vs observations
    - Identify linear or non-linear relationships

    **FEATURES**:
    - Automatic correlation calculation (Pearson's r)
    - Linear regression with R² value
    - Regression line overlay
    - Independent slicing for X and Y variables

    **OUTPUT**: Returns both JSON status (with correlation stats) AND downloadable HTML file.
    The HTML file contains an interactive Plotly scatter plot viewable in any browser.

    Args:
        file_path: Path to the HDF5/NetCDF file
        x_dataset_path: Path to X-axis dataset/variable
        y_dataset_path: Path to Y-axis dataset/variable
        x_slice_str: Optional numpy-style slice for X data (can differ from Y slice)
        y_slice_str: Optional numpy-style slice for Y data (can differ from X slice)
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        ylabel_override: Optional custom y-axis label (leave empty for automatic)
        memory_limit_mb: Memory limit in MB (default: tiered 100/500/2000)

    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: {"status", "html_file", "statistics": {"num_points",
          "correlation", "r_squared", "slope", "intercept", "p_value"}}
        - html_file_path: Path to interactive HTML file, or None on failure

    **Workflow - Compare time periods**:
        result, html_path = create_scatter_plot(
            file_path,
            x_dataset_path="temperature",
            y_dataset_path="temperature",
            x_slice_str="0, :, :",    # Time index 0
            y_slice_str="100, :, :"   # Time index 100
        )
        # r² in result["statistics"]["r_squared"] shows how similar the patterns are
    """
    import plotly.graph_objects as go
    try:
        from scipy import stats
    except ImportError:
        return {
            "error": "scipy library not available. Install with: pip install scipy",
            "status": "failed"
        }, None
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}, None
        if not x_dataset_path or not y_dataset_path:
            return {"error": "Both X and Y dataset paths required", "status": "failed"}, None
        # Strip any " (...)" suffix that UI dropdowns append to dataset paths
        x_dataset_path = x_dataset_path.split(" (")[0] if " (" in x_dataset_path else x_dataset_path
        y_dataset_path = y_dataset_path.split(" (")[0] if " (" in y_dataset_path else y_dataset_path
        # Open file
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get X and Y datasets (may be the same variable)
            if file_type == "HDF5":
                x_var = f[x_dataset_path]
                # Get Y separately even if same path (avoid reference issues)
                y_var = f[y_dataset_path]
            else:
                x_var = f.variables[x_dataset_path]
                y_var = f.variables[y_dataset_path]
            # Check memory safety for X
            x_safety = check_memory_safety(x_var.shape, x_var.dtype, x_slice_str, memory_limit_mb)
            if not x_safety["safe"]:
                error_result = {
                    "error": f"X data: {x_safety['error']}",
                    "estimated_size_mb": x_safety["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in x_safety:
                    error_result["suggested_slice_x"] = x_safety["suggested_slice"]
                return error_result, None
            # Check memory safety for Y (use same variable shape if same dataset)
            y_safety = check_memory_safety(y_var.shape, y_var.dtype, y_slice_str, memory_limit_mb)
            if not y_safety["safe"]:
                error_result = {
                    "error": f"Y data: {y_safety['error']}",
                    "estimated_size_mb": y_safety["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in y_safety:
                    error_result["suggested_slice_y"] = y_safety["suggested_slice"]
                return error_result, None
            # Read X data - parse slice and read
            if x_slice_str and x_slice_str.strip():
                x_idx = parse_slice(x_slice_str)
                x_data = x_var[x_idx]
            else:
                x_data = x_var[:]
            # Read Y data - parse slice and read
            if y_slice_str and y_slice_str.strip():
                y_idx = parse_slice(y_slice_str)
                y_data = y_var[y_idx]
            else:
                y_data = y_var[:]
            # Keep masked arrays for statistics
            x_data_for_stats = x_data
            y_data_for_stats = y_data
            # Convert to NaN-filled plain arrays for plotting
            if isinstance(x_data, np.ma.MaskedArray):
                x_data_for_plot = x_data.filled(np.nan)
            else:
                x_data_for_plot = np.array(x_data)
            if isinstance(y_data, np.ma.MaskedArray):
                y_data_for_plot = y_data.filled(np.nan)
            else:
                y_data_for_plot = np.array(y_data)
            # Flatten both arrays
            x_data_for_plot = x_data_for_plot.flatten()
            y_data_for_plot = y_data_for_plot.flatten()
            # Check lengths match
            if len(x_data_for_plot) != len(y_data_for_plot):
                return {
                    "error": f"X and Y data length mismatch after slicing: {len(x_data_for_plot)} vs {len(y_data_for_plot)}",
                    "x_slice": x_slice_str if x_slice_str else "no slice",
                    "y_slice": y_slice_str if y_slice_str else "no slice",
                    "x_shape_after_slice": x_data_for_plot.shape,
                    "y_shape_after_slice": y_data_for_plot.shape,
                    "suggestion": "Adjust slices to produce equal-length arrays",
                    "status": "failed"
                }, None
        finally:
            f.close()
        # Calculate correlation (check for valid data)
        if len(x_data_for_plot) < 2:
            return {
                "error": f"Not enough data points for correlation: {len(x_data_for_plot)} points (need at least 2)",
                "status": "failed"
            }, None
        # For regression, need to filter out NaN/masked pairs.
        # Use the plot data (which has NaN for masked values) and create valid mask
        x_flat = x_data_for_plot.flatten()
        y_flat = y_data_for_plot.flatten()
        # Create mask for valid (non-NaN) pairs
        valid_mask = ~(np.isnan(x_flat) | np.isnan(y_flat))
        x_valid = x_flat[valid_mask]
        y_valid = y_flat[valid_mask]
        if len(x_valid) < 2:
            return {
                "error": f"Not enough valid (non-NaN) data points: {len(x_valid)} points (need at least 2)",
                "status": "failed"
            }, None
        # Compute correlation and regression on valid data only
        correlation = float(np.corrcoef(x_valid, y_valid)[0, 1])
        slope, intercept, r_value, p_value, std_err = stats.linregress(x_valid, y_valid)
        # Create figure
        fig = go.Figure()
        # Add scatter points. hovertemplate must be a single string with <br>
        # separators (a bare multi-line literal is invalid); %{{x}} escapes to
        # the literal %{x} Plotly placeholder inside the f-string.
        fig.add_trace(go.Scatter(
            x=x_data_for_plot,
            y=y_data_for_plot,
            mode='markers',
            name='Data',
            marker=dict(
                size=6,
                color='steelblue',
                opacity=0.6
            ),
            hovertemplate=f'{x_dataset_path}: %{{x}}<br>{y_dataset_path}: %{{y}}'
        ))
        # Add regression line (use nanmin/nanmax to handle NaN values)
        x_range = np.array([np.nanmin(x_data_for_plot), np.nanmax(x_data_for_plot)])
        y_pred = slope * x_range + intercept
        fig.add_trace(go.Scatter(
            x=x_range,
            y=y_pred,
            mode='lines',
            name=f'Fit (R²={r_value**2:.3f})',
            line=dict(color='red', width=2, dash='dash'),
            showlegend=True
        ))
        # Set title and labels
        title = title_override if title_override else f"{y_dataset_path} vs {x_dataset_path}"
        xlabel = xlabel_override if xlabel_override else x_dataset_path
        ylabel = ylabel_override if ylabel_override else y_dataset_path
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title=ylabel,
            height=600,
            width=700,
            showlegend=True,
            hovermode='closest'
        )
        # Save as interactive HTML. NamedTemporaryFile(delete=False) reserves
        # the name atomically (tempfile.mktemp is deprecated/racy).
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        # Return JSON status
        result = {
            "status": "success",
            "message": f"Scatter plot created for {y_dataset_path} vs {x_dataset_path}",
            "html_file": html_path,
            "statistics": {
                "num_points": len(x_data_for_plot),
                "correlation": correlation,
                "r_squared": float(r_value**2),
                "slope": float(slope),
                "intercept": float(intercept),
                "p_value": float(p_value)
            },
            "download_info": "Download the HTML file to view the interactive scatter plot in your browser"
        }
        return result, html_path
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to create scatter plot: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
def coordinate_to_index(file_path: str, coordinate_var: str, value: float) -> Dict[str, Any]:
    """
    Convert a coordinate value to the nearest array index.

    **USE CASE**: "What index corresponds to coordinate value X?"
    Example: "What index corresponds to latitude 35.5?" or "What time index is year 2020?"

    **WHEN TO USE THIS**:
    - You have a coordinate value and need to find the nearest index for slicing
    - You're building a slice string and need to know which indices to use
    - You want to extract data at a specific coordinate location

    Args:
        file_path: Path to the HDF5/NetCDF file
        coordinate_var: Name of coordinate variable (e.g., 'lat', 'lon', 'time', 'depth')
        value: Coordinate value to find (will find nearest match)

    Returns:
        Dictionary with keys:
        - status (str): "success" or error info
        - index (int): The nearest index for this coordinate value
        - requested_value (float): The value you searched for
        - actual_value (float): The actual coordinate value at the found index
        - distance (float): How far the actual value is from your requested value
        - units (str): Units of the coordinate
        - closest_match (bool): True if within half a grid spacing (good match)
        - coordinate_variable (str): Name of the coordinate variable
        - array_size (int): Total size of coordinate array

    **Example workflow - Extract data at specific location**:
        # Find index for latitude 40.0
        lat_idx = coordinate_to_index(file_path, "lat", 40.0)["index"]
        # Find index for longitude -75.0
        lon_idx = coordinate_to_index(file_path, "lon", -75.0)["index"]
        # Now use these indices to slice the data
        data = read_dataset(file_path, "temperature", f":, {lat_idx}, {lon_idx}")
    """
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}
        if not coordinate_var:
            return {"error": "No coordinate variable specified", "status": "failed"}
        # Open file (HDF5 / NetCDF / h5netcdf fallback)
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get coordinate variable and its units for either container type
            if file_type == "HDF5":
                if coordinate_var not in f:
                    available = list(f.keys())
                    return {
                        "error": f"Coordinate variable '{coordinate_var}' not found",
                        "available_variables": available[:20],  # Limit to avoid token explosion
                        "status": "failed"
                    }
                coord_data = np.array(f[coordinate_var][:])
                units = f[coordinate_var].attrs.get('units', 'unknown') if hasattr(f[coordinate_var], 'attrs') else 'unknown'
            else:  # NetCDF
                if coordinate_var not in f.variables:
                    available = list(f.variables.keys())
                    return {
                        "error": f"Coordinate variable '{coordinate_var}' not found",
                        "available_variables": available[:20],
                        "status": "failed"
                    }
                var = f.variables[coordinate_var]
                coord_data = np.array(var[:])
                units = var.units if hasattr(var, 'units') else 'unknown'
            # Find closest index (cast numpy scalars to native types for JSON)
            idx = int(np.argmin(np.abs(coord_data - value)))
            actual_value = float(coord_data[idx])
            distance = abs(actual_value - value)
            # A match within half a grid spacing is considered "good".
            # BUG FIX: the comparison yields numpy.bool_, which json cannot
            # serialize; cast explicitly to a native bool.
            if len(coord_data) > 1:
                grid_spacing = float(np.mean(np.abs(np.diff(coord_data))))
                closest_match = bool(distance < grid_spacing / 2)
            else:
                closest_match = True
            return {
                "status": "success",
                "index": idx,
                "requested_value": value,
                "actual_value": actual_value,
                "distance": distance,
                "units": str(units),
                "closest_match": closest_match,
                "coordinate_variable": coordinate_var,
                "array_size": len(coord_data)
            }
        finally:
            f.close()
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to convert coordinate to index: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def index_to_coordinate(file_path: str, coordinate_var: str, index: int) -> Dict[str, Any]:
    """
    Look up the coordinate value stored at a given array index.

    **USE CASE**: "What is the actual value at index N?"
    Example: "What date is at time index 852?" or "What latitude is at index 45?"

    **WHEN TO USE THIS**:
    - You have an index number and need the corresponding coordinate value
    - You're interpreting results from other tools that return indices
    - You need to verify what a specific index represents

    **DO NOT use this if**:
    - The coordinate values are already in the file attributes → use get_attributes()
    - You want to read the entire coordinate array → use read_dataset()

    Args:
        file_path: Path to the HDF5/NetCDF file
        coordinate_var: Name of coordinate variable (e.g., 'lat', 'lon', 'time', 'depth')
        index: Array index to look up (0-based indexing)

    Returns:
        Dictionary with keys:
        - status (str): "success" or error info
        - index (int): The queried index
        - value (float): The coordinate value at that index
        - units (str): Units of the coordinate (e.g., "degrees_north", "days since 1800-01-01")
        - coordinate_variable (str): Name of the coordinate variable
        - array_size (int): Total size of coordinate array

    **Example workflow**:
        result = index_to_coordinate(file_path, "time", 100)
        time_value = result["value"]   # the actual time value
        units = result["units"]        # units needed to interpret it
    """
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}
        if not coordinate_var:
            return {"error": "No coordinate variable specified", "status": "failed"}
        handle, kind = open_file_with_fallback(file_path)
        try:
            # Pull the coordinate array and its units from whichever
            # container type the file opened as.
            if kind == "HDF5":
                if coordinate_var not in handle:
                    return {
                        "error": f"Coordinate variable '{coordinate_var}' not found",
                        "available_variables": list(handle.keys())[:20],
                        "status": "failed"
                    }
                node = handle[coordinate_var]
                values = np.array(node[:])
                units = node.attrs.get('units', 'unknown') if hasattr(node, 'attrs') else 'unknown'
            else:  # NetCDF
                if coordinate_var not in handle.variables:
                    return {
                        "error": f"Coordinate variable '{coordinate_var}' not found",
                        "available_variables": list(handle.variables.keys())[:20],
                        "status": "failed"
                    }
                node = handle.variables[coordinate_var]
                values = np.array(node[:])
                units = node.units if hasattr(node, 'units') else 'unknown'
            size = len(values)
            # Reject out-of-range indices with a helpful valid range
            if not 0 <= index < size:
                return {
                    "error": f"Index {index} out of bounds (array size: {size})",
                    "valid_range": f"0 to {size - 1}",
                    "status": "failed"
                }
            return {
                "status": "success",
                "index": index,
                "value": float(values[index]),
                "units": str(units),
                "coordinate_variable": coordinate_var,
                "array_size": size
            }
        finally:
            handle.close()
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to convert index to coordinate: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def query_content(file_path: str, query: str) -> Dict[str, Any]:
    """
    Keyword search across file contents to locate relevant data.

    **USE CASE**: "Find all datasets related to X" where X is a keyword.

    **WHAT IT SEARCHES**:
    - Dataset/variable names
    - Attribute names and values
    - Dimension names

    **WHEN TO USE THIS**:
    - You don't know the exact dataset name but know what you're looking for
    - You want to find all variables related to a topic (e.g., "temperature", "wind")
    - The file has many variables and you need to narrow down

    **SEARCH TIPS**:
    - Use specific keywords (e.g., "temp", "precip", "wind")
    - Search is case-insensitive; partial matches work (searching "temp"
      finds "temperature", "temp_anomaly", etc.)

    Args:
        file_path: Path to the HDF5/NetCDF file
        query: Search term (case-insensitive, matches substrings)

    Returns:
        Dictionary with keys:
        - datasets (list): Matching datasets with match reason
        - dimensions (list): Matching dimensions
        - num_dataset_matches (int): Count of dataset matches
        - num_dimension_matches (int): Count of dimension matches
        - total_matches (int): Total matches found
        - query (str): The search term used
        - message (str): Summary of results

    **Example workflow - Finding the right variable**:
        results = query_content(file_path, "temp")
        dataset_name = results["datasets"][0]["name"]  # e.g., "air_temperature"
        stats = compute_statistics(file_path, dataset_name)
    """
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}
        if not query or not query.strip():
            return {"error": "No query string provided", "status": "failed"}
        needle = query.lower().strip()
        results = {
            "datasets": [],
            "attributes": [],
            "dimensions": []
        }
        handle, kind = open_file_with_fallback(file_path)
        try:
            if kind == "HDF5":
                def _visit(name, node):
                    # Only datasets are searchable; groups are skipped.
                    if not isinstance(node, h5py.Dataset):
                        return
                    path = "/" + name
                    if needle in name.lower():
                        results["datasets"].append({
                            "path": path,
                            "shape": list(node.shape),
                            "dtype": str(node.dtype),
                            "match_reason": "name"
                        })
                        return
                    # Name didn't match: fall back to attribute search,
                    # recording only the first matching attribute.
                    for key, val in node.attrs.items():
                        if needle in key.lower() or needle in str(val).lower():
                            results["datasets"].append({
                                "path": path,
                                "shape": list(node.shape),
                                "dtype": str(node.dtype),
                                "match_reason": f"attribute: {key}"
                            })
                            return
                handle.visititems(_visit)
            else:  # NetCDF
                for var_name, var in handle.variables.items():
                    # Prefer a name match; otherwise scan attributes.
                    reason = "name" if needle in var_name.lower() else None
                    if reason is None:
                        for attr, attr_val in get_var_attrs_dict(var, kind).items():
                            if needle in attr.lower() or needle in str(attr_val).lower():
                                reason = f"attribute: {attr}"
                                break
                    if reason is None:
                        continue
                    entry = {
                        "name": var_name,
                        "dimensions": list(var.dimensions) if hasattr(var, 'dimensions') else [],
                        "shape": list(var.shape),
                        "dtype": str(var.dtype),
                        "match_reason": reason
                    }
                    # Surface key descriptive attributes when present
                    if hasattr(var, 'long_name'):
                        entry["long_name"] = str(var.long_name)
                    if hasattr(var, 'units'):
                        entry["units"] = str(var.units)
                    results["datasets"].append(entry)
                for dim_name, dim in handle.dimensions.items():
                    if needle in dim_name.lower():
                        results["dimensions"].append({
                            "name": dim_name,
                            "size": len(dim)
                        })
        finally:
            handle.close()
        n_ds = len(results["datasets"])
        n_dim = len(results["dimensions"])
        results["status"] = "success"
        results["query"] = query
        results["num_dataset_matches"] = n_ds
        results["num_dimension_matches"] = n_dim
        results["total_matches"] = n_ds + n_dim
        if results["total_matches"] == 0:
            results["message"] = f"No matches found for query: '{query}'"
        else:
            results["message"] = f"Found {results['total_matches']} matches for query: '{query}'"
        return results
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to search file contents: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def UI_HELPER(file_obj) -> Dict[str, Any]:
    """
    UI ONLY function - MCP clients ignore this!

    Browser interface helper: resolves a Gradio upload to a local path,
    validates the extension, and verifies the file opens as HDF5/NetCDF.

    Args:
        file_obj: File object (or path string) from the browser interface.

    Returns:
        Dictionary with "status" and, on success, "file_path" to copy
        into the other tools; on failure, an "error" description.
    """
    # NOTE: removed an unused `import shutil` that was never referenced.
    try:
        if file_obj is None:
            return {
                "error": "No file provided",
                "status": "failed"
            }
        # Gradio file objects expose the temp path via .name; plain
        # strings/paths are used as-is.
        if hasattr(file_obj, 'name'):
            file_path = file_obj.name
        else:
            file_path = str(file_obj)
        # Reject unsupported extensions before attempting to open
        if not any(file_path.endswith(ext) for ext in ['.h5', '.hdf5', '.nc', '.nc4']):
            return {
                "error": "Unsupported file type. Must be .h5, .hdf5, .nc, or .nc4",
                "file_path": file_path,
                "status": "failed"
            }
        # Try to open to validate the file is actually readable
        try:
            f, file_type = open_file_with_fallback(file_path)
            f.close()
        except Exception as e:
            return {
                "error": f"Failed to open file: {str(e)}",
                "file_path": file_path,
                "status": "failed"
            }
        return {
            "status": "success",
            "file_path": file_path,
            "message": "File ready! Copy the file_path above to use in other tools.",
            "instruction": "Use this file_path in the other tabs (Data Analysis, Visualizations)"
        }
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to process file: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }
def download_example(example_name: str) -> Dict[str, Any]:
    """
    Fetch one of the curated example datasets by name.

    Gives quick access to small, well-known test files from the
    xarray-data repository for exploration and validation.

    Args:
        example_name: Name of example dataset to download.

    Returns:
        Dictionary containing file_path and file structure (same shape
        as download_file), plus example metadata on success; an error
        dictionary listing the available examples otherwise.
    """
    # Curated example datasets from xarray-data repository
    examples = {
        "small_netcdf": {
            "url": "https://github.com/pydata/xarray-data/raw/master/tiny.nc",
            "description": "🛰️ Tiny NetCDF - Small test file for quick validation"
        },
        "ocean_basin": {
            "url": "https://github.com/pydata/xarray-data/raw/master/basin_mask.nc",
            "description": "🌊 Ocean Basin Mask - Global ocean basin classification"
        },
        "air_temperature": {
            "url": "https://github.com/pydata/xarray-data/raw/master/air_temperature.nc",
            "description": "🔬 Air Temperature - NCEP/NCAR reanalysis data"
        },
        "ocean_model": {
            "url": "https://github.com/pydata/xarray-data/raw/master/ROMS_example.nc",
            "description": "🌍 Ocean Model - ROMS (Regional Ocean Modeling System) output"
        },
        "era_interim": {
            "url": "https://github.com/pydata/xarray-data/raw/master/eraint_uvz.nc",
            "description": "☁️ ERA-Interim - Wind and geopotential height data"
        }
    }
    if not example_name:
        return {
            "error": "No example name provided",
            "available_examples": list(examples.keys()),
            "status": "failed"
        }
    entry = examples.get(example_name)
    if entry is None:
        return {
            "error": f"Unknown example: '{example_name}'",
            "available_examples": list(examples.keys()),
            "descriptions": {name: meta["description"] for name, meta in examples.items()},
            "status": "failed"
        }
    # Delegate the actual fetch to the shared download_file tool
    result = download_file(entry["url"])
    # Annotate successful downloads with the example metadata
    if result.get("status") == "success":
        result["example_name"] = example_name
        result["example_description"] = entry["description"]
    return result
def create_contour_plot(file_path: str, dataset_path: str, slice_str: str = "",
                        title_override: str = "", xlabel_override: str = "", ylabel_override: str = "",
                        num_contours: int = 20, colorscale: str = "Viridis",
                        memory_limit_mb: float = None) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    Create contour plot for 2D data.

    **USE CASE**: Visualize elevation/intensity maps with labeled contour lines.

    **WHEN TO USE THIS**:
    - Topographic/bathymetric data (elevation, ocean depth)
    - Pressure/temperature fields with distinct levels
    - When you need to see specific value boundaries

    **DIFFERENCE FROM HEATMAP**:
    - Heatmap: Continuous color gradient, best for overall patterns
    - Contour: Discrete lines at specific values, best for identifying levels

    **OUTPUT**: Returns both JSON status (with statistics) AND downloadable HTML file.
    The HTML file contains an interactive Plotly contour plot viewable in any browser.

    Args:
        file_path: Path to the HDF5/NetCDF file
        dataset_path: Path to 2D dataset/variable
        slice_str: Optional numpy-style slice (must result in 2D data, e.g., "0, :, :")
        title_override: Optional custom title (leave empty for automatic)
        xlabel_override: Optional custom x-axis label (leave empty for automatic)
        ylabel_override: Optional custom y-axis label (leave empty for automatic)
        num_contours: Number of contour levels (default: 20, range: 5-50)
        colorscale: Plotly colorscale name - same options as heatmap (default: Viridis)
        memory_limit_mb: Memory limit in MB (default: tiered 100/500/2000)

    Returns:
        Tuple of (status_dict, html_file_path):
        - status_dict: Contains statistics (shape/min/max/mean/std) and contour info
        - html_file_path: Path to interactive HTML file (downloadable), or None on error

    **Workflow - Elevation mapping**:
        result, html_path = create_contour_plot(
            file_path, "elevation", num_contours=30, colorscale="YlOrRd"
        )
        # Download html_path to see elevation levels with labeled contours
    """
    import plotly.graph_objects as go
    try:
        if not file_path:
            return {"error": "No file path provided", "status": "failed"}, None
        if not dataset_path:
            return {"error": "No dataset path provided", "status": "failed"}, None
        # Strip any " (shape info)" suffix pasted in from a structure listing
        dataset_path = dataset_path.split(" (")[0] if " (" in dataset_path else dataset_path
        # Open file and read data
        f, file_type = open_file_with_fallback(file_path)
        try:
            # Get dataset
            if file_type == "HDF5":
                var = f[dataset_path]
            else:
                var = f.variables[dataset_path]
            # Refuse reads that would exceed the memory budget
            safety_check = check_memory_safety(var.shape, var.dtype, slice_str, memory_limit_mb)
            if not safety_check["safe"]:
                error_result = {
                    "error": safety_check["error"],
                    "estimated_size_mb": safety_check["estimated_mb"],
                    "status": "failed"
                }
                if "suggested_slice" in safety_check:
                    error_result["suggested_slice"] = safety_check["suggested_slice"]
                    error_result["suggestion"] = f"Try using: slice_str='{safety_check['suggested_slice']}'"
                return error_result, None
            # Read data (optionally sliced)
            if slice_str and slice_str.strip():
                idx = parse_slice(slice_str)
                data = var[idx]
            else:
                data = var[:]
            # Keep masked array for statistics; fill with NaN for plotting
            data_for_stats = data
            if isinstance(data, np.ma.MaskedArray):
                data_for_plot = data.filled(np.nan)
            else:
                data_for_plot = np.array(data)
            # Ensure 2D
            if data_for_plot.ndim == 1:
                return {
                    "error": f"Data is 1D (shape {data_for_plot.shape}). Contour plot requires 2D data.",
                    "status": "failed"
                }, None
            elif data_for_plot.ndim > 2:
                # Take first 2D slice and note it in the result
                while data_for_plot.ndim > 2:
                    data_for_plot = data_for_plot[0]
                suggestion = f"Data has {len(var.shape)}D shape. Showing first 2D slice. Use slice_str to select specific slice."
            else:
                suggestion = None
        finally:
            f.close()
        # Calculate statistics using the (possibly masked) array
        min_val = float(np.min(data_for_stats))
        max_val = float(np.max(data_for_stats))
        mean_val = float(np.mean(data_for_stats))
        std_val = float(np.std(data_for_stats))
        # Create figure using the filled array.
        # BUG FIX: the hover template previously contained raw line breaks
        # inside a single-quoted literal (a syntax error); Plotly templates
        # use '<br>' for line breaks.
        fig = go.Figure(data=go.Contour(
            z=data_for_plot,
            colorscale=colorscale,
            ncontours=num_contours,
            colorbar=dict(title="Value"),
            hovertemplate='X: %{x}<br>Y: %{y}<br>Value: %{z}',
            contours=dict(
                showlabels=True,
                labelfont=dict(size=10, color='white')
            )
        ))
        # Set title and labels
        title = title_override if title_override else f"Contour Plot: {dataset_path}"
        xlabel = xlabel_override if xlabel_override else "X"
        ylabel = ylabel_override if ylabel_override else "Y"
        fig.update_layout(
            title=title,
            xaxis_title=xlabel,
            yaxis_title=ylabel,
            height=600,
            width=700
        )
        # Save as interactive HTML.
        # BUG FIX: tempfile.mktemp is deprecated and race-prone; reserve the
        # name atomically with NamedTemporaryFile instead.
        with tempfile.NamedTemporaryFile(suffix='.html', dir='/tmp', delete=False) as tmp:
            html_path = tmp.name
        fig.write_html(html_path)
        # Return JSON status
        result = {
            "status": "success",
            "message": f"Contour plot created for {dataset_path}",
            "html_file": html_path,
            "statistics": {
                "shape": list(data_for_plot.shape),
                "min": min_val,
                "max": max_val,
                "mean": mean_val,
                "std": std_val,
                "num_contours": num_contours,
                "colorscale": colorscale
            },
            "download_info": "Download the HTML file to view the interactive contour plot in your browser"
        }
        if suggestion:
            result["note"] = suggestion
        return result, html_path
    except Exception as e:
        import traceback
        return {
            "error": f"Failed to create contour plot: {str(e)}",
            "traceback": traceback.format_exc(),
            "status": "failed"
        }, None
# ============================================
# GRADIO MCP TOOLS (STATELESS)
# These are the ONLY functions exposed as MCP tools
# ============================================
def build_mcp_tools():
"""
Build explicit MCP tool interfaces.
CRITICAL: Use only Textbox/Number/Radio components - NO Dropdowns with state!
"""
tools = []
# Tool 1: Download File
tools.append(gr.Interface(
fn=download_file,
inputs=[
gr.Textbox(
label="URL",
placeholder="https://github.com/pydata/xarray-data/raw/master/ROMS_example.nc",
info="Direct URL to HDF5 (.h5, .hdf5) or NetCDF (.nc, .nc4) file"
)
],
outputs=gr.JSON(label="Download Result - Copy the 'file_path' value to use in other tools"),
api_name="download_file",
title="Download File",
description="Download and parse HDF5/NetCDF file from URL. Returns file_path to use in other tools. IMPORTANT: Copy the 'file_path' from the result to use in other tool tabs."
))
# Tool 1b: UI Helper (UI ONLY)
tools.append(gr.Interface(
fn=UI_HELPER,
inputs=[
gr.File(
label="Select Local File",
file_types=[".h5", ".hdf5", ".nc", ".nc4"],
type="filepath"
)
],
outputs=gr.JSON(label="File Path Result - Copy the 'file_path' value to use in other tools"),
api_name="UI_HELPER",
title="Local File Upload",
description="UI ONLY function - MCP clients ignore this! Browser interface helper. Returns file_path for use in other tools."
))
# Tool 1c: Download Example Dataset
tools.append(gr.Interface(
fn=download_example,
inputs=[
gr.Radio(
label="Select Example Dataset",
choices=[
"small_netcdf",
"ocean_basin",
"air_temperature",
"ocean_model",
"era_interim"
],
value="small_netcdf",
info="Pre-configured example datasets for quick exploration"
)
],
outputs=gr.JSON(label="Download Result - Copy the 'file_path' value to use in other tools"),
api_name="download_example",
title="Download Example Dataset",
description="Download pre-configured example datasets. Returns file_path and structure. Available: small_netcdf, ocean_basin, air_temperature, ocean_model, era_interim"
))
# Tool 2: List Structure
tools.append(gr.Interface(
fn=list_structure,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/tmp/file.nc",
info="Path to the HDF5/NetCDF file"
)
],
outputs=gr.JSON(label="File Structure"),
api_name="list_structure",
title="List Structure",
description="List complete hierarchical structure of the file"
))
# Tool 3: Read Dataset
tools.append(gr.Interface(
fn=read_dataset,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/tmp/file.nc",
info="Path to the HDF5/NetCDF file"
),
gr.Textbox(
label="Dataset Path",
placeholder="temperature",
info="Path to dataset/variable (e.g., 'temperature', '/group/data')"
),
gr.Textbox(
label="Slice (optional)",
placeholder="0:10, :, 5",
value="",
info="NumPy-style slice notation"
)
],
outputs=gr.JSON(label="Dataset Contents"),
api_name="read_dataset",
title="Read Dataset",
description="Read data from a specific dataset/variable. Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max). Override with memory_limit_mb if needed."
))
# Tool 4: Get Dataset Preview
tools.append(gr.Interface(
fn=get_dataset_preview,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/tmp/file.nc",
info="Path to the HDF5/NetCDF file"
),
gr.Textbox(
label="Dataset Path",
placeholder="temperature",
info="Path to dataset/variable"
),
gr.Number(
label="Preview Size",
value=10,
minimum=5,
maximum=100,
info="Number of elements to preview"
)
],
outputs=gr.JSON(label="Dataset Preview"),
api_name="get_dataset_preview",
title="Preview Dataset",
description="Safely preview dataset without loading full array"
))
# Tool 5: Get Attributes
tools.append(gr.Interface(
fn=get_attributes,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/tmp/file.nc",
info="Path to the HDF5/NetCDF file"
),
gr.Textbox(
label="Object Path (optional)",
placeholder="",
value="",
info="Path to dataset/group (empty for file-level attributes)"
)
],
outputs=gr.JSON(label="Attributes"),
api_name="get_attributes",
title="Get Attributes",
description="Get metadata attributes for file, group, or dataset"
))
# Tool 6: Get Dimensions
tools.append(gr.Interface(
fn=get_dimensions,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/tmp/file.nc",
info="Path to the HDF5/NetCDF file"
),
gr.Textbox(
label="Dataset Path (optional)",
placeholder="",
value="",
info="Path to dataset (empty for NetCDF to see all dimensions)"
)
],
outputs=gr.JSON(label="Dimension Information"),
api_name="get_dimensions",
title="Get Dimensions",
description="Get dimension and shape information"
))
# Tool 7: Compute Statistics
tools.append(gr.Interface(
fn=compute_statistics,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/tmp/file.nc",
info="Path to the HDF5/NetCDF file"
),
gr.Textbox(
label="Dataset Path",
placeholder="temperature",
info="Path to dataset/variable"
),
gr.Textbox(
label="Slice (optional)",
placeholder="0:10, :, :",
value="",
info="Compute statistics over a subset"
)
],
outputs=gr.JSON(label="Statistical Results"),
api_name="compute_statistics",
title="Compute Statistics",
description="Calculate statistical measures (min, max, mean, std, median). Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)."
))
# Tool 8: Export Data
tools.append(gr.Interface(
fn=export_data,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/tmp/file.nc",
info="Path to the HDF5/NetCDF file"
),
gr.Textbox(
label="Dataset Path",
placeholder="temperature",
info="Path to dataset/variable"
),
gr.Radio(
choices=["csv", "json"],
value="csv",
label="Format",
info="Export format"
),
gr.Textbox(
label="Slice (optional)",
placeholder="0:100, :, :",
value="",
info="Export a subset of data"
),
gr.Number(
label="Max Rows (CSV only)",
value=100000,
minimum=1000,
maximum=1000000,
info="Safety limit for CSV files"
)
],
outputs=[gr.JSON(label="Export Status"), gr.File(label="Download File")],
api_name="export_data",
title="Export Data",
description="Export dataset to CSV or JSON format. Default limits: 100 MB (safe), 500 MB (warning), 2000 MB (max)."
))
# Tool 9: Create Histogram
tools.append(gr.Interface(
fn=create_histogram,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/tmp/file.nc",
info="Path to the HDF5/NetCDF file"
),
gr.Textbox(
label="Dataset Path",
placeholder="temperature",
info="Path to dataset/variable"
),
gr.Textbox(
label="Slice (optional)",
placeholder="0, :, :",
value="",
info="Optional subsetting before histogram"
),
gr.Textbox(
label="Title (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic title"
),
gr.Textbox(
label="X-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic x-axis label"
),
gr.Number(
label="Number of Bins",
value=50,
minimum=5,
maximum=200,
info="Number of bins for histogram (default: 50)"
)
],
outputs=[
gr.JSON(label="Histogram Status & Statistics"),
gr.File(label="Download Interactive HTML")
],
api_name="create_histogram",
title="Create Histogram",
description="Generate histogram showing distribution of values. Returns interactive plot and downloadable HTML file."
))
# Tool 10: Line Plot
tools.append(gr.Interface(
fn=create_line_plot,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/path/to/file.nc",
info="Path to HDF5/NetCDF file"
),
gr.Textbox(
label="Y Dataset Path",
placeholder="temperature",
info="Path to Y-axis dataset/variable"
),
gr.Textbox(
label="Slice (optional)",
placeholder="0:100, :, 5",
value="",
info="Numpy-style slice notation"
),
gr.Textbox(
label="Title (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic title"
),
gr.Textbox(
label="X-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic x-axis label"
),
gr.Textbox(
label="Y-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic y-axis label"
),
gr.Textbox(
label="X Dataset Path (optional)",
placeholder="time",
value="",
info="Path to X-axis data (if empty, uses indices)"
),
gr.Number(
label="Moving Average Window (optional)",
value=0,
minimum=0,
maximum=1000,
info="Window size for smoothing (0 or 1 = no smoothing, 2-1000 = apply moving average)"
)
],
outputs=[
gr.JSON(label="Line Plot Status & Statistics"),
gr.File(label="Download Interactive HTML")
],
api_name="create_line_plot",
title="Create Line Plot",
description="Generate line plot for 1D data or time series. Returns statistics and downloadable HTML file."
))
# Tool 11: Heatmap
tools.append(gr.Interface(
fn=create_heatmap,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/path/to/file.nc",
info="Path to HDF5/NetCDF file"
),
gr.Textbox(
label="Dataset Path",
placeholder="temperature",
info="Path to 2D dataset/variable"
),
gr.Textbox(
label="Slice (optional)",
placeholder="0, :, :",
value="",
info="Numpy-style slice notation (must result in 2D)"
),
gr.Textbox(
label="Title (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic title"
),
gr.Textbox(
label="X-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic x-axis label"
),
gr.Textbox(
label="Y-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic y-axis label"
),
gr.Radio(
label="Colorscale",
choices=["Viridis", "Plasma", "Inferno", "Magma", "Cividis", "Blues", "Reds", "YlOrRd", "RdBu"],
value="Viridis",
info="Plotly colorscale for heatmap"
)
],
outputs=[
gr.JSON(label="Heatmap Status & Statistics"),
gr.File(label="Download Interactive HTML")
],
api_name="create_heatmap",
title="Create Heatmap",
description="Generate 2D heatmap visualization. Returns statistics and downloadable HTML file."
))
# Tool 12: Scatter Plot
tools.append(gr.Interface(
fn=create_scatter_plot,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/path/to/file.nc",
info="Path to HDF5/NetCDF file"
),
gr.Textbox(
label="X Dataset Path",
placeholder="temperature",
info="Path to X-axis dataset/variable"
),
gr.Textbox(
label="Y Dataset Path",
placeholder="salinity",
info="Path to Y-axis dataset/variable"
),
gr.Textbox(
label="X Slice (optional)",
placeholder="0, :, :",
value="",
info="Numpy-style slice for X data (can differ from Y slice)"
),
gr.Textbox(
label="Y Slice (optional)",
placeholder="1, :, :",
value="",
info="Numpy-style slice for Y data (can differ from X slice)"
),
gr.Textbox(
label="Title (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic title"
),
gr.Textbox(
label="X-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic x-axis label"
),
gr.Textbox(
label="Y-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic y-axis label"
)
],
outputs=[
gr.JSON(label="Scatter Plot Status & Statistics"),
gr.File(label="Download Interactive HTML")
],
api_name="create_scatter_plot",
title="Create Scatter Plot",
description="Generate scatter plot comparing two variables. Includes correlation and regression. Returns statistics and downloadable HTML file."
))
# Tool 13: Contour Plot
tools.append(gr.Interface(
fn=create_contour_plot,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/path/to/file.nc",
info="Path to HDF5/NetCDF file"
),
gr.Textbox(
label="Dataset Path",
placeholder="elevation",
info="Path to 2D dataset/variable"
),
gr.Textbox(
label="Slice (optional)",
placeholder="0, :, :",
value="",
info="Numpy-style slice notation (must result in 2D)"
),
gr.Textbox(
label="Title (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic title"
),
gr.Textbox(
label="X-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic x-axis label"
),
gr.Textbox(
label="Y-axis Label (optional)",
placeholder="Leave blank for automatic",
value="",
info="Override automatic y-axis label"
),
gr.Number(
label="Number of Contours",
value=20,
minimum=5,
maximum=50,
info="Number of contour levels"
),
gr.Radio(
label="Colorscale",
choices=["Viridis", "Plasma", "Inferno", "Magma", "Cividis", "Blues", "Reds", "YlOrRd", "RdBu"],
value="Viridis",
info="Plotly colorscale for contours"
)
],
outputs=[
gr.JSON(label="Contour Plot Status & Statistics"),
gr.File(label="Download Interactive HTML")
],
api_name="create_contour_plot",
title="Create Contour Plot",
description="Generate contour plot for 2D data. Returns statistics and downloadable HTML file."
))
# Tool 14: Coordinate to Index
tools.append(gr.Interface(
fn=coordinate_to_index,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/path/to/file.nc",
info="Path to HDF5/NetCDF file"
),
gr.Textbox(
label="Coordinate Variable",
placeholder="lat",
info="Name of coordinate variable (e.g., 'lat', 'lon', 'time')"
),
gr.Number(
label="Coordinate Value",
value=0.0,
info="Value to find in coordinate array"
)
],
outputs=gr.JSON(label="Index Lookup Result"),
api_name="coordinate_to_index",
title="Coordinate to Index",
description="Convert coordinate value to array index. E.g., 'what index corresponds to latitude 35.5?'"
))
# Tool 15: Index to Coordinate
tools.append(gr.Interface(
fn=index_to_coordinate,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/path/to/file.nc",
info="Path to HDF5/NetCDF file"
),
gr.Textbox(
label="Coordinate Variable",
placeholder="lat",
info="Name of coordinate variable (e.g., 'lat', 'lon', 'time')"
),
gr.Number(
label="Array Index",
value=0,
minimum=0,
info="Index to look up in coordinate array"
)
],
outputs=gr.JSON(label="Coordinate Lookup Result"),
api_name="index_to_coordinate",
title="Index to Coordinate",
description="Convert array index to coordinate value. E.g., 'what latitude is at index 142?'"
))
# Tool 16: Query Content (Semantic Search)
tools.append(gr.Interface(
fn=query_content,
inputs=[
gr.Textbox(
label="File Path",
placeholder="/path/to/file.nc",
info="Path to HDF5/NetCDF file"
),
gr.Textbox(
label="Search Query",
placeholder="temperature",
info="Search term to find in dataset names, attributes, and metadata"
)
],
outputs=gr.JSON(label="Search Results"),
api_name="query_content",
title="Query Content (Semantic Search)",
description="Search through file contents to find relevant datasets. Searches names, attributes, dimensions, and metadata."
))
return tools
# ============================================
# MAIN LAUNCH
# ============================================
if __name__ == "__main__":
    # Build every tool interface. MCP exposure happens inside build_mcp_tools
    # via each Interface's api_name; the Blocks layout below is UI-only.
    mcp_tools = build_mcp_tools()

    # (tab label, positional index into mcp_tools) for the two loop-rendered
    # groups. Rendering order matters, so these lists are order-sensitive.
    # NOTE(review): indices are positional — confirm they match the append
    # order in build_mcp_tools.
    analysis_tabs = [
        ("Structure & Info", 3),      # List Structure
        ("Read Dataset", 4),          # Read Dataset
        ("Preview Dataset", 5),       # Preview Dataset
        ("Get Attributes", 6),        # Get Attributes
        ("Get Dimensions", 7),        # Get Dimensions
        ("Compute Statistics", 8),    # Compute Statistics
        ("Export Data", 9),           # Export Data
        ("Coordinate → Index", 15),   # Coordinate to Index
        ("Index → Coordinate", 16),   # Index to Coordinate
        ("Search Content", 17),       # Query Content
    ]
    visualization_tabs = [
        ("Histogram", 10),            # Create Histogram
        ("Line Plot", 11),            # Create Line Plot
        ("Heatmap", 12),              # Create Heatmap
        ("Scatter Plot", 13),         # Create Scatter Plot
        ("Contour Plot", 14),         # Create Contour Plot
    ]

    with gr.Blocks(title="HDF5/NetCDF MCP Server") as demo:
        gr.Markdown("# HDF5/NetCDF MCP Server")
        gr.Markdown("Tools for analyzing HDF5 and NetCDF scientific data files")

        # File-loading group: laid out explicitly because the "Example
        # Datasets" tab carries extra markdown before its tool renders.
        with gr.Tab("📥 File Loading"):
            with gr.Tabs():
                with gr.Tab("Download from URL"):
                    mcp_tools[0].render()  # Download File
                with gr.Tab("Local File"):
                    mcp_tools[1].render()  # UI Helper - Local File
                with gr.Tab("Example Datasets"):
                    gr.Markdown("### 📚 Quick Start with Example Datasets")
                    gr.Markdown("Pre-configured datasets from the xarray-data repository for testing and exploration.")
                    mcp_tools[2].render()  # Download Example

        with gr.Tab("📋 Data Analysis & Utilities"):
            with gr.Tabs():
                for tab_label, tool_idx in analysis_tabs:
                    with gr.Tab(tab_label):
                        mcp_tools[tool_idx].render()

        with gr.Tab("📊 Visualizations"):
            with gr.Tabs():
                for tab_label, tool_idx in visualization_tabs:
                    with gr.Tab(tab_label):
                        mcp_tools[tool_idx].render()

    # Start the app with the MCP server endpoint enabled.
    demo.launch(mcp_server=True)