""" Pure backend registration functions for pystackreg-app. These functions implement the three image-registration workflows as file-based, MCP-friendly operations. They take TIFF file paths as inputs and return the path to the output TIFF file. No Gradio objects are returned. Gradio MCP uses function names, type hints, and docstrings to build MCP tool schemas, so all three are kept clear and complete. """ import ipaddress import os import socket import tempfile import urllib.parse import urllib.request from typing import Optional import numpy as np import tifffile from pystackreg import StackReg from core.utils import WORK_DIR, DEMO_DIR, get_sr_mode, load_stack, normalize_stack # --------------------------------------------------------------------------- # Validation helpers # --------------------------------------------------------------------------- VALID_MODES = {"TRANSLATION", "RIGID_BODY", "SCALED_ROTATION", "AFFINE", "BILINEAR"} # Maximum size allowed for HTTP downloads (prevents resource-exhaustion attacks). _MAX_DOWNLOAD_BYTES = 500 * 1024 * 1024 # 500 MB _DOWNLOAD_TIMEOUT = 30 # seconds # Valid TIFF magic byte sequences (little/big-endian classic and BigTIFF). _TIFF_MAGIC = ( b"II\x2A\x00", # little-endian TIFF b"MM\x00\x2A", # big-endian TIFF b"II\x2B\x00", # little-endian BigTIFF b"MM\x00\x2B", # big-endian BigTIFF ) def _block_private_url(url: str) -> None: """Raise ValueError if *url* resolves to a private, loopback, link-local, reserved, or multicast address (SSRF protection).""" parsed = urllib.parse.urlparse(url) host = parsed.hostname if not host: raise ValueError(f"Could not parse host from URL: {url!r}") try: infos = socket.getaddrinfo(host, None) except socket.gaierror as exc: raise ValueError(f"Could not resolve host '{host}': {exc}") from exc for info in infos: addr_str = info[4][0] try: addr = ipaddress.ip_address(addr_str) except ValueError: continue if ( addr.is_private or addr.is_loopback or addr.is_link_local or addr.is_reserved or addr.is_multicast or addr.is_unspecified ): raise ValueError( f"Requests to private or internal addresses are not allowed " f"('{host}' resolved to {addr})." ) class _SafeRedirectHandler(urllib.request.HTTPRedirectHandler): def redirect_request(self, req, fp, code, msg, headers, newurl): _block_private_url(newurl) return super().redirect_request(req, fp, code, msg, headers, newurl) _SAFE_URL_OPENER = urllib.request.build_opener(_SafeRedirectHandler) def _download_tiff_to_work_dir(url: str, label: str) -> str: """Download *url* to a temp file in WORK_DIR, validating TIFF magic and size. Uses chunked streaming so that the full file is never held in memory. Raises ValueError on SSRF, size-limit, or magic-byte failures. Cleans up the temp file before raising on any error. """ _block_private_url(url) fd, local_path = tempfile.mkstemp(suffix=".tif", dir=WORK_DIR) os.close(fd) total = 0 first4 = b"" try: with _SAFE_URL_OPENER.open(url, timeout=_DOWNLOAD_TIMEOUT) as resp, open(local_path, "wb") as f: _block_private_url(resp.geturl()) while True: chunk = resp.read(1024 * 1024) if not chunk: break if len(first4) < 4: first4 = (first4 + chunk[: 4 - len(first4)])[:4] if len(first4) == 4 and not any(first4.startswith(magic) for magic in _TIFF_MAGIC): raise ValueError(f"{label} does not appear to be a valid TIFF file.") total += len(chunk) if total > _MAX_DOWNLOAD_BYTES: raise ValueError( f"{label} exceeds the maximum allowed download size of " f"{_MAX_DOWNLOAD_BYTES // (1024 * 1024)} MB." ) f.write(chunk) if total == 0: raise ValueError(f"{label} is empty.") if len(first4) < 4 or not any(first4.startswith(magic) for magic in _TIFF_MAGIC): raise ValueError(f"{label} does not appear to be a valid TIFF file.") return local_path except Exception: try: os.unlink(local_path) except FileNotFoundError: pass raise def _resolve_path(path_or_url: str, label: str = "file") -> str: """Return a local, sandbox-safe path for *path_or_url*. - If the value starts with ``http://`` or ``https://``, the file is downloaded to WORK_DIR and the local path is returned. This is the intended flow for MCP clients, which cannot upload files directly. - Otherwise the value is treated as a local path and must resolve within the app sandbox enforced by _require_file(): WORK_DIR (for outputs from previous tool calls) or DEMO_DIR (for cached demo files). """ if path_or_url.startswith(("http://", "https://")): return _download_tiff_to_work_dir(path_or_url, label) _require_file(path_or_url, label) return path_or_url def _validate_mode(mode: str) -> None: """Raise ValueError if *mode* is not a supported transformation mode.""" if mode not in VALID_MODES: raise ValueError( f"Invalid transformation mode '{mode}'. " f"Must be one of: {', '.join(sorted(VALID_MODES))}." ) def _validate_index(idx: int, stack_len: int, name: str = "frame index") -> None: """Raise IndexError if *idx* is outside [0, stack_len).""" if not (0 <= idx < stack_len): raise IndexError( f"{name} {idx} is out of range for a stack with {stack_len} frame(s) " f"(valid range: 0 to {stack_len - 1})." ) def _require_file(path: str, label: str = "file") -> None: """Raise an error if *path* does not exist or is outside the app's sandbox. Allowed locations are WORK_DIR (outputs from previous tool calls, enabling tool chaining) and DEMO_DIR (cached demo files). Symlinks are resolved first to prevent traversal attacks. Remote MCP clients should pass HTTP/HTTPS URLs; _resolve_path() downloads them to WORK_DIR automatically. """ real = os.path.realpath(path) sandboxes = (os.path.realpath(WORK_DIR), os.path.realpath(DEMO_DIR)) if not any(real == s or real.startswith(s + os.sep) for s in sandboxes): raise ValueError( f"{label} must be an HTTP/HTTPS URL or a path returned by a previous " "tool call. Pass a URL and it will be downloaded automatically." ) if not os.path.isfile(path): raise FileNotFoundError(f"{label} not found: {path}") # --------------------------------------------------------------------------- # Private computation helpers (array-in / array-out, no file I/O) # --------------------------------------------------------------------------- def _run_align_to_reference( stack: np.ndarray, ref_frame: np.ndarray, mode: str ) -> np.ndarray: """Register every frame in *stack* against *ref_frame*. Returns normalised uint8 array.""" sr = StackReg(get_sr_mode(mode)) return normalize_stack(np.stack([sr.register_transform(ref_frame, fr) for fr in stack])) def _run_align_to_stack( ref_stack: np.ndarray, mov_stack: np.ndarray, mode: str ) -> np.ndarray: """Register every frame in *mov_stack* against the first frame of *ref_stack*. Returns normalised uint8 array.""" sr = StackReg(get_sr_mode(mode)) return normalize_stack(np.stack([sr.register_transform(ref_stack[0], fr) for fr in mov_stack])) # --------------------------------------------------------------------------- # Public backend functions (exposed as MCP tools) # --------------------------------------------------------------------------- def align_stack_to_reference( stack_file: str, reference_index: int = 0, mode: str = "RIGID_BODY", external_reference_file: Optional[str] = None, external_reference_index: int = 0, ) -> str: """ Align every frame in a TIFF stack to a chosen reference frame (intra-stack alignment). Each frame in *stack_file* is registered to the selected reference frame using the chosen transformation model. The reference frame can come from the same stack or from a separate external TIFF stack. Args: stack_file: Path to the input TIFF stack whose frames will be aligned. reference_index: Zero-based index of the reference frame inside *stack_file* (ignored when *external_reference_file* is provided). Default is 0. mode: Transformation model for registration. One of: TRANSLATION, RIGID_BODY, SCALED_ROTATION, AFFINE, BILINEAR. Default is RIGID_BODY. external_reference_file: Optional path to an external TIFF stack from which the reference frame is taken. When provided, *reference_index* is ignored and *external_reference_index* is used instead. external_reference_index: Zero-based index of the reference frame inside *external_reference_file*. Default is 0. Returns: Path to the aligned output TIFF file (same number of frames as input). Raises: FileNotFoundError: If *stack_file* or *external_reference_file* does not exist on disk. IndexError: If *reference_index* or *external_reference_index* is out of range for the corresponding stack. ValueError: If *mode* is not one of the supported transformation modes. """ _validate_mode(mode) stack_file = _resolve_path(stack_file, "stack_file") stack = load_stack(stack_file) if external_reference_file is not None: external_reference_file = _resolve_path(external_reference_file, "external_reference_file") ext_stack = load_stack(external_reference_file) _validate_index(external_reference_index, len(ext_stack), "external_reference_index") ref_frame = ext_stack[external_reference_index] else: _validate_index(reference_index, len(stack), "reference_index") ref_frame = stack[reference_index] aligned = _run_align_to_reference(stack, ref_frame, mode) fd, out_path = tempfile.mkstemp(suffix=".tif", dir=WORK_DIR) os.close(fd) tifffile.imwrite(out_path, aligned, photometric="minisblack") return out_path def align_stack_to_stack( reference_stack_file: str, moving_stack_file: str, mode: str = "RIGID_BODY", ) -> str: """ Align every frame in a moving TIFF stack to the first frame of a reference TIFF stack. All frames in *moving_stack_file* are registered against the first frame of *reference_stack_file* using the specified transformation model. Args: reference_stack_file: Path to the reference TIFF stack. Its first frame is used as the alignment target for all frames in the moving stack. moving_stack_file: Path to the moving TIFF stack to align. mode: Transformation model for registration. One of: TRANSLATION, RIGID_BODY, SCALED_ROTATION, AFFINE, BILINEAR. Default is RIGID_BODY. Returns: Path to the aligned output TIFF file (same number of frames as the moving stack). Raises: FileNotFoundError: If *reference_stack_file* or *moving_stack_file* does not exist on disk. ValueError: If *mode* is not one of the supported transformation modes. """ _validate_mode(mode) reference_stack_file = _resolve_path(reference_stack_file, "reference_stack_file") moving_stack_file = _resolve_path(moving_stack_file, "moving_stack_file") ref_stack = load_stack(reference_stack_file) mov_stack = load_stack(moving_stack_file) aligned = _run_align_to_stack(ref_stack, mov_stack, mode) fd, out_path = tempfile.mkstemp(suffix=".tif", dir=WORK_DIR) os.close(fd) tifffile.imwrite(out_path, aligned, photometric="minisblack") return out_path def align_frame_to_frame( stack_file: str, reference_index: int, moving_index: int, mode: str = "RIGID_BODY", ) -> str: """ Align a single moving frame to a reference frame within the same TIFF stack. Registers the frame at *moving_index* against the frame at *reference_index* inside *stack_file* and writes the result as a single-frame TIFF. Args: stack_file: Path to the TIFF stack that contains both frames. reference_index: Zero-based index of the reference frame within the stack. moving_index: Zero-based index of the frame to align (the moving frame). mode: Transformation model for registration. One of: TRANSLATION, RIGID_BODY, SCALED_ROTATION, AFFINE, BILINEAR. Default is RIGID_BODY. Returns: Path to the aligned output TIFF file (single-frame TIFF). Raises: FileNotFoundError: If *stack_file* does not exist on disk. IndexError: If *reference_index* or *moving_index* is out of range for the stack. ValueError: If *mode* is not one of the supported transformation modes. """ _validate_mode(mode) stack_file = _resolve_path(stack_file, "stack_file") stack = load_stack(stack_file) _validate_index(reference_index, len(stack), "reference_index") _validate_index(moving_index, len(stack), "moving_index") sr = StackReg(get_sr_mode(mode)) aligned = normalize_stack(np.stack([sr.register_transform(stack[reference_index], stack[moving_index])]))[0] fd, out_path = tempfile.mkstemp(suffix=".tif", dir=WORK_DIR) os.close(fd) tifffile.imwrite(out_path, aligned[np.newaxis, ...], photometric="minisblack") return out_path