File size: 8,106 Bytes
0722e92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Document processing and build orchestration for JSON output extension."""

import multiprocessing
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor

from sphinx.application import Sphinx
from sphinx.config import Config
from sphinx.util import logging

from ..core.builder import JSONOutputBuilder
from ..utils import get_setting, validate_content_gating_integration

logger = logging.getLogger(__name__)


def on_build_finished(app: Sphinx, exception: Exception | None) -> None:
    """Generate JSON files after the HTML build completes.

    Connected to Sphinx's ``build-finished`` event, which passes ``None``
    for *exception* on a successful build — the previous ``Exception``
    annotation was inaccurate (the body already checks ``is not None``).

    Args:
        app: The Sphinx application instance.
        exception: The error that aborted the build, or ``None`` on success.
    """
    if exception is not None:
        # Build failed: do not emit JSON for a partial/broken build.
        return

    # In non-verbose mode the progress messages go to debug level only.
    verbose = get_setting(app.config, "verbose", False)
    log_func = logger.info if verbose else logger.debug
    log_func("Generating JSON output files...")

    # Setup and validation; a failed builder init aborts quietly (already logged).
    json_builder = _setup_json_builder(app)
    if not json_builder:
        return

    # Get and filter documents (gating, incremental, size limits).
    all_docs = _filter_documents(app, json_builder, log_func)

    # Process documents (parallel or sequential per configuration).
    generated_count, failed_count = _process_documents(app, json_builder, all_docs, log_func)

    # Final logging of totals.
    _log_results(log_func, generated_count, failed_count)


def _setup_json_builder(app: Sphinx) -> JSONOutputBuilder | None:
    """Validate content-gating integration, then construct the JSON builder.

    Returns ``None`` (after logging the traceback) when construction fails,
    so the caller can abort JSON generation gracefully.
    """
    validate_content_gating_integration(app)

    try:
        builder = JSONOutputBuilder(app)
    except Exception:
        logger.exception("Failed to initialize JSONOutputBuilder")
        return None
    return builder


def _filter_documents(app: Sphinx, json_builder: JSONOutputBuilder, log_func: Callable[[str], None]) -> list[str]:
    """Build the final document list: drop gated docs, then apply incremental and size filters."""
    docs, gated = _get_initial_documents(app, json_builder)

    if gated:
        log_func(f"Content gating: excluding {len(gated)} documents from JSON generation")
        # Only enumerate the gated docnames when verbose output is requested.
        if get_setting(app.config, "verbose", False):
            logger.debug(f"Gated documents: {', '.join(sorted(gated))}")

    docs = _apply_incremental_filtering(app, json_builder, docs, log_func)
    return _apply_size_filtering(app, docs, log_func)


def _get_initial_documents(app: Sphinx, json_builder: JSONOutputBuilder) -> tuple[list[str], list[str]]:
    """Partition the build's documents into (processable, gated) lists.

    A document is "gated" when ``should_generate_json`` rejects it (e.g. via
    content-gating rules); gated docs are excluded from JSON generation.
    """
    processable: list[str] = []
    gated: list[str] = []

    for docname in app.env.all_docs:
        bucket = processable if json_builder.should_generate_json(docname) else gated
        bucket.append(docname)

    return processable, gated


def _apply_incremental_filtering(
    app: Sphinx, json_builder: JSONOutputBuilder, all_docs: list[str], log_func: Callable[[str], None]
) -> list[str]:
    """Drop documents whose JSON output is already current, when incremental builds are enabled."""
    if not get_setting(app.config, "incremental_build", False):
        return all_docs

    stale = [name for name in all_docs if json_builder.needs_update(name)]
    skipped = len(all_docs) - len(stale)
    if skipped > 0:
        log_func(f"Incremental build: skipping {skipped} unchanged files")
    return stale


def _apply_size_filtering(app: Sphinx, all_docs: list[str], log_func: Callable[[str], None]) -> list[str]:
    """Apply file size filtering if enabled.

    Documents whose source file exceeds the ``skip_large_files`` byte limit
    are excluded. The original implementation called ``stat()`` twice per
    document (once for the comparison and again inside the log message),
    which wasted a syscall and could log a size different from the one
    compared; the size is now read once per document.

    Args:
        app: The Sphinx application instance.
        all_docs: Candidate document names.
        log_func: Logger callable for progress messages.

    Returns:
        The filtered list of document names.
    """
    skip_large_files = get_setting(app.config, "skip_large_files", 0)
    if skip_large_files <= 0:
        # Filtering disabled (0 or negative limit).
        return all_docs

    filtered_docs = []
    for docname in all_docs:
        try:
            source_path = app.env.doc2path(docname)
            # Read the size once; None means the path could not be resolved.
            size = source_path.stat().st_size if source_path else None
        except Exception:  # noqa: BLE001, PERF203
            filtered_docs.append(docname)  # Include if we can't check size
            continue
        if size is None or size <= skip_large_files:
            filtered_docs.append(docname)
        else:
            log_func(f"Skipping large file: {docname} ({size} bytes)")
    return filtered_docs


def _process_documents(
    app: Sphinx, json_builder: JSONOutputBuilder, all_docs: list[str], log_func: Callable[[str], None]
) -> tuple[int, int]:
    """Dispatch to the parallel or sequential pipeline based on the ``parallel`` setting.

    Returns:
        Tuple of (generated_count, failed_count).
    """
    if get_setting(app.config, "parallel", False):
        return process_documents_parallel(json_builder, all_docs, app.config, log_func)
    return process_documents_sequential(json_builder, all_docs)


def _log_results(log_func: Callable[[str], None], generated_count: int, failed_count: int) -> None:
    """Log final processing results."""
    log_func(f"Generated {generated_count} JSON files")
    if failed_count > 0:
        logger.warning(f"Failed to generate {failed_count} JSON files")


def process_documents_parallel(
    json_builder: JSONOutputBuilder, all_docs: list[str], config: Config, log_func: Callable[[str], None]
) -> tuple[int, int]:
    """Process documents in parallel, batch by batch, using a thread pool.

    Worker count comes from the ``parallel_workers`` setting: ``"auto"``
    picks ``min(cpu_count, 8)``; explicit values are capped at 16. Batching
    bounds peak memory, since only one batch of futures is outstanding at a
    time.

    Returns:
        Tuple of (generated_count, failed_count).
    """
    workers_setting = get_setting(config, "parallel_workers", "auto")
    if workers_setting == "auto":
        max_workers = min(multiprocessing.cpu_count() or 1, 8)  # Limit to 8 threads max
    else:
        max_workers = min(int(workers_setting), 16)  # Cap at 16 for safety

    batch_size = get_setting(config, "batch_size", 50)
    total_batches = (len(all_docs) - 1) // batch_size + 1

    generated_count = 0
    failed_count = 0

    # Process in batches to control memory usage.
    for start in range(0, len(all_docs), batch_size):
        batch = all_docs[start : start + batch_size]
        log_func(f"Processing batch {start // batch_size + 1}/{total_batches} ({len(batch)} docs)")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            pending = {executor.submit(process_document, json_builder, name): name for name in batch}

            for future, name in pending.items():
                try:
                    succeeded = future.result()
                except Exception:  # noqa: PERF203
                    logger.exception(f"Error generating JSON for {name}")
                    failed_count += 1
                else:
                    if succeeded:
                        generated_count += 1
                    else:
                        failed_count += 1

    return generated_count, failed_count


def process_documents_sequential(json_builder: "JSONOutputBuilder", all_docs: list[str]) -> tuple[int, int]:
    """Process documents sequentially.

    Fix: now calls ``mark_updated`` after each successful write, matching the
    parallel worker (``process_document``). Without it, sequential runs never
    recorded incremental-build state, so every document was regenerated on
    each build even with ``incremental_build`` enabled.

    Args:
        json_builder: Builder used to produce and write the JSON payloads.
        all_docs: Document names to process.

    Returns:
        Tuple of (generated_count, failed_count).
    """
    generated_count = 0
    failed_count = 0

    for docname in all_docs:
        try:
            json_data = json_builder.build_json_data(docname)
            json_builder.write_json_file(docname, json_data)
            json_builder.mark_updated(docname)  # Keep incremental-build state in sync (parity with process_document)
        except Exception:  # noqa: PERF203
            logger.exception(f"Error generating JSON for {docname}")
            failed_count += 1
        else:
            generated_count += 1

    return generated_count, failed_count


def process_document(json_builder: JSONOutputBuilder, docname: str) -> bool:
    """Build and write JSON for one document; worker unit for the parallel path.

    Returns ``True`` on success, ``False`` on failure. Errors are logged with
    a traceback rather than propagated, so one bad document cannot abort a
    whole batch.
    """
    try:
        payload = json_builder.build_json_data(docname)
        json_builder.write_json_file(docname, payload)
        # Record completion so incremental builds can skip this doc next time.
        json_builder.mark_updated(docname)
    except Exception:
        logger.exception(f"Error generating JSON for {docname}")
        return False
    return True