# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| """Document processing and build orchestration for JSON output extension.""" | |
| import multiprocessing | |
| from collections.abc import Callable | |
| from concurrent.futures import ThreadPoolExecutor | |
| from sphinx.application import Sphinx | |
| from sphinx.config import Config | |
| from sphinx.util import logging | |
| from ..core.builder import JSONOutputBuilder | |
| from ..utils import get_setting, validate_content_gating_integration | |
| logger = logging.getLogger(__name__) | |


def on_build_finished(app: Sphinx, exception: Exception | None) -> None:
    """Generate JSON files after the HTML build is complete."""
    if exception is not None:
        return

    verbose = get_setting(app.config, "verbose", False)
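    # Route progress messages to INFO when verbose, otherwise keep them at DEBUG.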
    log_func = logger.info if verbose else logger.debug
    log_func("Generating JSON output files...")

    # Setup and validation
    json_builder = _setup_json_builder(app)
    if not json_builder:
        return

    # Get and filter documents
    all_docs = _filter_documents(app, json_builder, log_func)

    # Process documents
    generated_count, failed_count = _process_documents(app, json_builder, all_docs, log_func)

    # Final logging
    _log_results(log_func, generated_count, failed_count)


def _setup_json_builder(app: Sphinx) -> JSONOutputBuilder | None:
    """Set up and validate the JSON builder."""
    validate_content_gating_integration(app)
    try:
        return JSONOutputBuilder(app)
    except Exception:
        logger.exception("Failed to initialize JSONOutputBuilder")
        return None


def _filter_documents(app: Sphinx, json_builder: JSONOutputBuilder, log_func: Callable[[str], None]) -> list[str]:
    """Filter documents based on gating, incremental build, and size limits."""
    all_docs, gated_docs = _get_initial_documents(app, json_builder)
    if gated_docs:
        log_func(f"Content gating: excluding {len(gated_docs)} documents from JSON generation")
    verbose = get_setting(app.config, "verbose", False)
    if verbose and gated_docs:
        logger.debug(f"Gated documents: {', '.join(sorted(gated_docs))}")
    all_docs = _apply_incremental_filtering(app, json_builder, all_docs, log_func)
    return _apply_size_filtering(app, all_docs, log_func)


def _get_initial_documents(app: Sphinx, json_builder: JSONOutputBuilder) -> tuple[list[str], list[str]]:
    """Get initial document lists, separating processable from gated documents."""
    all_docs = []
    gated_docs = []
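    # Partition every document Sphinx has read into processable vs. gated.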
    for docname in app.env.all_docs:
        if json_builder.should_generate_json(docname):
            all_docs.append(docname)
        else:
            gated_docs.append(docname)
    return all_docs, gated_docs


def _apply_incremental_filtering(
    app: Sphinx, json_builder: JSONOutputBuilder, all_docs: list[str], log_func: Callable[[str], None]
) -> list[str]:
    """Apply incremental build filtering if enabled."""
    if not get_setting(app.config, "incremental_build", False):
        return all_docs
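    # Keep only the documents the builder reports as needing regeneration.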
    incremental_docs = [docname for docname in all_docs if json_builder.needs_update(docname)]
    skipped_count = len(all_docs) - len(incremental_docs)
    if skipped_count > 0:
        log_func(f"Incremental build: skipping {skipped_count} unchanged files")
    return incremental_docs


def _apply_size_filtering(app: Sphinx, all_docs: list[str], log_func: Callable[[str], None]) -> list[str]:
    """Apply file size filtering if enabled."""
    skip_large_files = get_setting(app.config, "skip_large_files", 0)
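    # A threshold of zero or less (the default) disables size filtering entirely.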
| if skip_large_files <= 0: | |
| return all_docs | |
| filtered_docs = [] | |
| for docname in all_docs: | |
| try: | |
| source_path = app.env.doc2path(docname) | |
| if source_path and source_path.stat().st_size <= skip_large_files: | |
| filtered_docs.append(docname) | |
| else: | |
| log_func(f"Skipping large file: {docname} ({source_path.stat().st_size} bytes)") | |
| except Exception: # noqa: BLE001, PERF203 | |
| filtered_docs.append(docname) # Include if we can't check size | |
| return filtered_docs | |


def _process_documents(
    app: Sphinx, json_builder: JSONOutputBuilder, all_docs: list[str], log_func: Callable[[str], None]
) -> tuple[int, int]:
    """Process documents either in parallel or sequentially."""
    if get_setting(app.config, "parallel", False):
        return process_documents_parallel(json_builder, all_docs, app.config, log_func)
    return process_documents_sequential(json_builder, all_docs)


def _log_results(log_func: Callable[[str], None], generated_count: int, failed_count: int) -> None:
    """Log final processing results."""
    log_func(f"Generated {generated_count} JSON files")
    if failed_count > 0:
        logger.warning(f"Failed to generate {failed_count} JSON files")


def process_documents_parallel(
    json_builder: JSONOutputBuilder, all_docs: list[str], config: Config, log_func: Callable[[str], None]
) -> tuple[int, int]:
    """Process documents in parallel batches."""
    parallel_workers = get_setting(config, "parallel_workers", "auto")
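    # "auto" sizes the pool from the CPU count; explicit values are honored up to a cap.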
| if parallel_workers == "auto": | |
| cpu_count = multiprocessing.cpu_count() or 1 | |
| max_workers = min(cpu_count, 8) # Limit to 8 threads max | |
| else: | |
| max_workers = min(int(parallel_workers), 16) # Cap at 16 for safety | |
| batch_size = get_setting(config, "batch_size", 50) | |
| generated_count = 0 | |
| failed_count = 0 | |
| # Process in batches to control memory usage | |
| for i in range(0, len(all_docs), batch_size): | |
| batch_docs = all_docs[i : i + batch_size] | |
| log_func( | |
| f"Processing batch {i // batch_size + 1}/{(len(all_docs) - 1) // batch_size + 1} ({len(batch_docs)} docs)" | |
| ) | |
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
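            # Map each future back to its docname so failures can be attributed in logs.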
            futures = {}
            for docname in batch_docs:
                future = executor.submit(process_document, json_builder, docname)
                futures[future] = docname

            for future in as_completed(futures):
                docname = futures[future]
                try:
                    if future.result():
                        generated_count += 1
                    else:
                        failed_count += 1
                except Exception:  # noqa: PERF203
                    logger.exception(f"Error generating JSON for {docname}")
                    failed_count += 1
    return generated_count, failed_count


def process_documents_sequential(json_builder: JSONOutputBuilder, all_docs: list[str]) -> tuple[int, int]:
    """Process documents sequentially."""
    generated_count = 0
    failed_count = 0
    for docname in all_docs:
        try:
            json_data = json_builder.build_json_data(docname)
            json_builder.write_json_file(docname, json_data)
            json_builder.mark_updated(docname)  # Mark as processed, matching the parallel path
            generated_count += 1
        except Exception:  # noqa: PERF203
            logger.exception(f"Error generating JSON for {docname}")
            failed_count += 1
    return generated_count, failed_count


def process_document(json_builder: JSONOutputBuilder, docname: str) -> bool:
    """Process a single document for parallel execution."""
    try:
        json_data = json_builder.build_json_data(docname)
        json_builder.write_json_file(docname, json_data)
        json_builder.mark_updated(docname)  # Mark as processed for incremental builds
    except Exception:
        logger.exception(f"Error generating JSON for {docname}")
        return False
    else:
        return True