# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Document processing and build orchestration for JSON output extension."""

import multiprocessing
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor

from sphinx.application import Sphinx
from sphinx.config import Config
from sphinx.util import logging

from ..core.builder import JSONOutputBuilder
from ..utils import get_setting, validate_content_gating_integration

logger = logging.getLogger(__name__)
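

# NOTE: ``on_build_finished`` has the (app, exception) signature Sphinx passes
# to the "build-finished" event; it is assumed to be registered elsewhere in
# the extension (e.g. via app.connect("build-finished", on_build_finished) in
# setup()).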
def on_build_finished(app: Sphinx, exception: Exception) -> None:
    """Generate JSON files after the HTML build completes."""
    if exception is not None:
        return

    verbose = get_setting(app.config, "verbose", False)
    log_func = logger.info if verbose else logger.debug
    log_func("Generating JSON output files...")

    # Setup and validation
    json_builder = _setup_json_builder(app)
    if not json_builder:
        return

    # Get and filter documents
    all_docs = _filter_documents(app, json_builder, log_func)

    # Process documents
    generated_count, failed_count = _process_documents(app, json_builder, all_docs, log_func)

    # Final logging
    _log_results(log_func, generated_count, failed_count)


def _setup_json_builder(app: Sphinx) -> JSONOutputBuilder | None:
    """Set up and validate the JSON builder."""
    validate_content_gating_integration(app)
    try:
        return JSONOutputBuilder(app)
    except Exception:
        logger.exception("Failed to initialize JSONOutputBuilder")
        return None


def _filter_documents(app: Sphinx, json_builder: JSONOutputBuilder, log_func: Callable[[str], None]) -> list[str]:
    """Filter documents based on gating, incremental build, and size limits."""
    all_docs, gated_docs = _get_initial_documents(app, json_builder)

    if gated_docs:
        log_func(f"Content gating: excluding {len(gated_docs)} documents from JSON generation")

    verbose = get_setting(app.config, "verbose", False)
    if verbose and gated_docs:
        logger.debug(f"Gated documents: {', '.join(sorted(gated_docs))}")

    all_docs = _apply_incremental_filtering(app, json_builder, all_docs, log_func)
    return _apply_size_filtering(app, all_docs, log_func)


def _get_initial_documents(app: Sphinx, json_builder: JSONOutputBuilder) -> tuple[list[str], list[str]]:
    """Get initial document lists, separating processable from gated documents."""
    all_docs = []
    gated_docs = []

    for docname in app.env.all_docs:
        if json_builder.should_generate_json(docname):
            all_docs.append(docname)
        else:
            gated_docs.append(docname)

    return all_docs, gated_docs


def _apply_incremental_filtering(
    app: Sphinx, json_builder: JSONOutputBuilder, all_docs: list[str], log_func: Callable[[str], None]
) -> list[str]:
    """Apply incremental build filtering if enabled."""
    if not get_setting(app.config, "incremental_build", False):
        return all_docs

    incremental_docs = [docname for docname in all_docs if json_builder.needs_update(docname)]
    skipped_count = len(all_docs) - len(incremental_docs)
    if skipped_count > 0:
        log_func(f"Incremental build: skipping {skipped_count} unchanged files")

    return incremental_docs


def _apply_size_filtering(app: Sphinx, all_docs: list[str], log_func: Callable[[str], None]) -> list[str]:
    """Apply file size filtering if enabled."""
    skip_large_files = get_setting(app.config, "skip_large_files", 0)
    if skip_large_files <= 0:
        return all_docs

    filtered_docs = []
    for docname in all_docs:
        try:
            source_path = app.env.doc2path(docname)
            if source_path and source_path.stat().st_size <= skip_large_files:
                filtered_docs.append(docname)
            else:
                log_func(f"Skipping large file: {docname} ({source_path.stat().st_size} bytes)")
        except Exception:  # noqa: BLE001, PERF203
            filtered_docs.append(docname)  # Include if we can't check size
    return filtered_docs


def _process_documents(
    app: Sphinx, json_builder: JSONOutputBuilder, all_docs: list[str], log_func: Callable[[str], None]
) -> tuple[int, int]:
    """Process documents either in parallel or sequentially."""
    if get_setting(app.config, "parallel", False):
        return process_documents_parallel(json_builder, all_docs, app.config, log_func)
    return process_documents_sequential(json_builder, all_docs)


def _log_results(log_func: Callable[[str], None], generated_count: int, failed_count: int) -> None:
    """Log final processing results."""
    log_func(f"Generated {generated_count} JSON files")
    if failed_count > 0:
        logger.warning(f"Failed to generate {failed_count} JSON files")
def process_documents_parallel(
    json_builder: JSONOutputBuilder, all_docs: list[str], config: Config, log_func: Callable[[str], None]
) -> tuple[int, int]:
    """Process documents in parallel batches."""
    parallel_workers = get_setting(config, "parallel_workers", "auto")
    if parallel_workers == "auto":
        cpu_count = multiprocessing.cpu_count() or 1
        max_workers = min(cpu_count, 8)  # Limit to 8 threads max
    else:
        max_workers = min(int(parallel_workers), 16)  # Cap at 16 for safety

    batch_size = get_setting(config, "batch_size", 50)
    generated_count = 0
    failed_count = 0

    # Process in batches to control memory usage
    for i in range(0, len(all_docs), batch_size):
        batch_docs = all_docs[i : i + batch_size]
        log_func(
            f"Processing batch {i // batch_size + 1}/{(len(all_docs) - 1) // batch_size + 1} ({len(batch_docs)} docs)"
        )

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {}
            for docname in batch_docs:
                future = executor.submit(process_document, json_builder, docname)
                futures[future] = docname

            for future, docname in futures.items():
                try:
                    if future.result():
                        generated_count += 1
                    else:
                        failed_count += 1
                except Exception:  # noqa: PERF203
                    logger.exception(f"Error generating JSON for {docname}")
                    failed_count += 1

    return generated_count, failed_count


def process_documents_sequential(json_builder: JSONOutputBuilder, all_docs: list[str]) -> tuple[int, int]:
    """Process documents sequentially."""
    generated_count = 0
    failed_count = 0

    for docname in all_docs:
        try:
            json_data = json_builder.build_json_data(docname)
            json_builder.write_json_file(docname, json_data)
            generated_count += 1
        except Exception:  # noqa: PERF203
            logger.exception(f"Error generating JSON for {docname}")
            failed_count += 1

    return generated_count, failed_count


def process_document(json_builder: JSONOutputBuilder, docname: str) -> bool:
    """Process a single document for parallel execution."""
    try:
        json_data = json_builder.build_json_data(docname)
        json_builder.write_json_file(docname, json_data)
        json_builder.mark_updated(docname)  # Mark as processed for incremental builds
    except Exception:
        logger.exception(f"Error generating JSON for {docname}")
        return False
    else:
        return True