# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Caching and incremental build support for JSON output extension."""

from collections.abc import Callable
from pathlib import Path
from threading import Lock
from typing import Any, ClassVar

from sphinx.util import logging

logger = logging.getLogger(__name__)


class JSONOutputCache:
    """Manages caching and incremental builds for JSON output.

    All caches are class-level and shared across every instance; each
    instance merely aliases the shared dicts.  Mutations of the shared
    timestamp map are guarded by ``_shared_cache_lock``.

    NOTE: ``_shared_cache_lock`` is a plain (non-reentrant) ``Lock``, so
    methods that acquire it internally (``needs_update``, ``mark_updated``,
    ``clear_caches``) must not be invoked from inside ``with_cache_lock``.
    """

    # Class-level shared caches with thread safety
    _shared_cache_lock: ClassVar[Lock] = Lock()
    _shared_metadata_cache: ClassVar[dict[str, Any]] = {}
    _shared_frontmatter_cache: ClassVar[dict[str, Any]] = {}
    _shared_content_cache: ClassVar[dict[str, Any]] = {}
    _file_timestamps: ClassVar[dict[str, float]] = {}  # docname -> last-seen mtime

    def __init__(self) -> None:
        """Initialize cache instance with references to the shared caches."""
        with self._shared_cache_lock:
            # Instance attributes alias the class-level dicts so that all
            # instances observe the same cached state.
            self._metadata_cache = self._shared_metadata_cache
            self._frontmatter_cache = self._shared_frontmatter_cache
            self._content_cache = self._shared_content_cache
            self._timestamps = self._file_timestamps

    def get_metadata_cache(self) -> dict[str, Any]:
        """Get the metadata cache."""
        return self._metadata_cache

    def get_frontmatter_cache(self) -> dict[str, Any]:
        """Get the frontmatter cache."""
        return self._frontmatter_cache

    def get_content_cache(self) -> dict[str, Any]:
        """Get the content cache."""
        return self._content_cache

    def needs_update(self, docname: str, source_path: Path, incremental_enabled: bool = False) -> bool:
        """Check if document needs to be updated based on modification time.

        Args:
            docname: Sphinx document name used as the timestamp-map key.
            source_path: Path to the document's source file.
            incremental_enabled: When False, every document is reprocessed.

        Returns:
            True when the document should be (re)processed, which includes
            any situation where the modification time cannot be determined.
        """
        if not incremental_enabled:
            return True  # Process all files if incremental build is disabled

        try:
            if not source_path or not source_path.exists():
                return True

            current_mtime = source_path.stat().st_mtime

            # FIX: the shared timestamp map is mutated here, so hold the
            # class-level lock — the original accessed it unguarded despite
            # the class's stated thread-safety design.
            with self._shared_cache_lock:
                if docname in self._timestamps:
                    # Updated only if the file is newer than what we recorded.
                    return current_mtime > self._timestamps[docname]
                # First time processing this file: record and process it.
                self._timestamps[docname] = current_mtime
                return True
        except Exception as e:  # noqa: BLE001
            # Best-effort: stat() may race with file deletion etc.; lazy
            # %s args avoid formatting cost when DEBUG is disabled.
            logger.debug("Error checking modification time for %s: %s", docname, e)
            return True  # Process if we can't determine modification time

    def mark_updated(self, docname: str, source_path: Path) -> None:
        """Mark document as processed with current timestamp."""
        try:
            if source_path and source_path.exists():
                # FIX: guard the shared timestamp map (see needs_update).
                with self._shared_cache_lock:
                    self._timestamps[docname] = source_path.stat().st_mtime
        except Exception:  # noqa: BLE001
            # Deliberately best-effort: a missing timestamp only means the
            # document will be reprocessed next build.
            logger.debug("Could not update timestamp for %s", docname)

    def clear_caches(self) -> None:
        """Clear all caches (useful for testing or memory cleanup)."""
        with self._shared_cache_lock:
            self._metadata_cache.clear()
            self._frontmatter_cache.clear()
            self._content_cache.clear()
            self._timestamps.clear()

    def get_cache_stats(self) -> dict[str, int]:
        """Get cache statistics for debugging.

        Sizes are read without the lock; they are an instantaneous,
        advisory snapshot only.
        """
        return {
            "metadata_cache_size": len(self._metadata_cache),
            "frontmatter_cache_size": len(self._frontmatter_cache),
            "content_cache_size": len(self._content_cache),
            "timestamps_size": len(self._timestamps),
        }

    def with_cache_lock(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:  # noqa: ANN401
        """Execute function with cache lock held.

        The lock is non-reentrant: *func* must not call methods of this
        class that themselves acquire the lock.
        """
        with self._shared_cache_lock:
            return func(*args, **kwargs)