INIclaw / docs /_ext /json_output /processing /processor.py
NitishStark's picture
Upload folder using huggingface_hub
0722e92 verified
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Document processing and build orchestration for JSON output extension."""
import multiprocessing
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from sphinx.application import Sphinx
from sphinx.config import Config
from sphinx.util import logging
from ..core.builder import JSONOutputBuilder
from ..utils import get_setting, validate_content_gating_integration
logger = logging.getLogger(__name__)
def on_build_finished(app: Sphinx, exception: Exception) -> None:
"""Generate JSON files after HTML build is complete."""
if exception is not None:
return
verbose = get_setting(app.config, "verbose", False)
log_func = logger.info if verbose else logger.debug
log_func("Generating JSON output files...")
# Setup and validation
json_builder = _setup_json_builder(app)
if not json_builder:
return
# Get and filter documents
all_docs = _filter_documents(app, json_builder, log_func)
# Process documents
generated_count, failed_count = _process_documents(app, json_builder, all_docs, log_func)
# Final logging
_log_results(log_func, generated_count, failed_count)
def _setup_json_builder(app: Sphinx) -> JSONOutputBuilder | None:
"""Setup and validate JSON builder."""
validate_content_gating_integration(app)
try:
return JSONOutputBuilder(app)
except Exception:
logger.exception("Failed to initialize JSONOutputBuilder")
return None
def _filter_documents(app: Sphinx, json_builder: JSONOutputBuilder, log_func: Callable[[str], None]) -> list[str]:
"""Filter documents based on gating, incremental build, and size limits."""
all_docs, gated_docs = _get_initial_documents(app, json_builder)
if gated_docs:
log_func(f"Content gating: excluding {len(gated_docs)} documents from JSON generation")
verbose = get_setting(app.config, "verbose", False)
if verbose and gated_docs:
logger.debug(f"Gated documents: {', '.join(sorted(gated_docs))}")
all_docs = _apply_incremental_filtering(app, json_builder, all_docs, log_func)
return _apply_size_filtering(app, all_docs, log_func)
def _get_initial_documents(app: Sphinx, json_builder: JSONOutputBuilder) -> tuple[list[str], list[str]]:
"""Get initial document lists, separating processable from gated documents."""
all_docs = []
gated_docs = []
for docname in app.env.all_docs:
if json_builder.should_generate_json(docname):
all_docs.append(docname)
else:
gated_docs.append(docname)
return all_docs, gated_docs
def _apply_incremental_filtering(
app: Sphinx, json_builder: JSONOutputBuilder, all_docs: list[str], log_func: Callable[[str], None]
) -> list[str]:
"""Apply incremental build filtering if enabled."""
if not get_setting(app.config, "incremental_build", False):
return all_docs
incremental_docs = [docname for docname in all_docs if json_builder.needs_update(docname)]
skipped_count = len(all_docs) - len(incremental_docs)
if skipped_count > 0:
log_func(f"Incremental build: skipping {skipped_count} unchanged files")
return incremental_docs
def _apply_size_filtering(app: Sphinx, all_docs: list[str], log_func: Callable[[str], None]) -> list[str]:
"""Apply file size filtering if enabled."""
skip_large_files = get_setting(app.config, "skip_large_files", 0)
if skip_large_files <= 0:
return all_docs
filtered_docs = []
for docname in all_docs:
try:
source_path = app.env.doc2path(docname)
if source_path and source_path.stat().st_size <= skip_large_files:
filtered_docs.append(docname)
else:
log_func(f"Skipping large file: {docname} ({source_path.stat().st_size} bytes)")
except Exception: # noqa: BLE001, PERF203
filtered_docs.append(docname) # Include if we can't check size
return filtered_docs
def _process_documents(
app: Sphinx, json_builder: JSONOutputBuilder, all_docs: list[str], log_func: Callable[[str], None]
) -> tuple[int, int]:
"""Process documents either in parallel or sequentially."""
if get_setting(app.config, "parallel", False):
return process_documents_parallel(json_builder, all_docs, app.config, log_func)
else:
return process_documents_sequential(json_builder, all_docs)
def _log_results(log_func: Callable[[str], None], generated_count: int, failed_count: int) -> None:
"""Log final processing results."""
log_func(f"Generated {generated_count} JSON files")
if failed_count > 0:
logger.warning(f"Failed to generate {failed_count} JSON files")
def process_documents_parallel(
json_builder: JSONOutputBuilder, all_docs: list[str], config: Config, log_func: Callable[[str], None]
) -> tuple[int, int]:
"""Process documents in parallel batches."""
parallel_workers = get_setting(config, "parallel_workers", "auto")
if parallel_workers == "auto":
cpu_count = multiprocessing.cpu_count() or 1
max_workers = min(cpu_count, 8) # Limit to 8 threads max
else:
max_workers = min(int(parallel_workers), 16) # Cap at 16 for safety
batch_size = get_setting(config, "batch_size", 50)
generated_count = 0
failed_count = 0
# Process in batches to control memory usage
for i in range(0, len(all_docs), batch_size):
batch_docs = all_docs[i : i + batch_size]
log_func(
f"Processing batch {i // batch_size + 1}/{(len(all_docs) - 1) // batch_size + 1} ({len(batch_docs)} docs)"
)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {}
for docname in batch_docs:
future = executor.submit(process_document, json_builder, docname)
futures[future] = docname
for future, docname in futures.items():
try:
if future.result():
generated_count += 1
else:
failed_count += 1
except Exception: # noqa: PERF203
logger.exception(f"Error generating JSON for {docname}")
failed_count += 1
return generated_count, failed_count
def process_documents_sequential(json_builder: JSONOutputBuilder, all_docs: list[str]) -> tuple[int, int]:
"""Process documents sequentially."""
generated_count = 0
failed_count = 0
for docname in all_docs:
try:
json_data = json_builder.build_json_data(docname)
json_builder.write_json_file(docname, json_data)
generated_count += 1
except Exception: # noqa: PERF203
logger.exception(f"Error generating JSON for {docname}")
failed_count += 1
return generated_count, failed_count
def process_document(json_builder: JSONOutputBuilder, docname: str) -> bool:
"""Process a single document for parallel execution."""
try:
json_data = json_builder.build_json_data(docname)
json_builder.write_json_file(docname, json_data)
json_builder.mark_updated(docname) # Mark as processed for incremental builds
except Exception:
logger.exception(f"Error generating JSON for {docname}")
return False
else:
return True