Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Full rebuild CLI script for the RAG chatbot build pipeline. | |
| This script orchestrates the complete build pipeline from raw PDF files | |
| through to published HuggingFace artifacts. It is the master build script | |
| that chains together extraction, chunking, embedding, and publishing. | |
| Pipeline Steps: | |
| 1. Validate Input - Check data/raw/ exists and contains PDF files | |
| 2. Clear Artifacts - Remove old artifacts (data/processed/, chunks/, embeddings/) | |
| 3. Extract - PDF to Markdown extraction using pymupdf4llm | |
| 4. Chunk - Structure-aware chunking with heading inheritance | |
| 5. Embed & Index - BGE embedding + FAISS/BM25 index building | |
| 6. Publish - Upload artifacts to HuggingFace (if --publish) | |
| Features: | |
| - Deterministic builds: Same inputs produce same outputs (sorted file lists) | |
| - Fail-fast behavior: Exits immediately on critical errors | |
| - Verbose/quiet modes: Control output verbosity | |
| - Progress reporting: Rich progress bars for each stage | |
| - Statistics summary: Total time, artifact sizes, throughput metrics | |
| Exit Codes: | |
| 0: Success - Full pipeline completed successfully | |
| 1: Partial failure - Some steps completed but others failed | |
| 2: Total failure - Pipeline could not start or failed completely | |
| Example Usage: | |
| # Basic rebuild (no publish) | |
| poetry run python scripts/rebuild.py data/raw/ | |
| # Full rebuild with HuggingFace publishing | |
| poetry run python scripts/rebuild.py data/raw/ --publish | |
| # Force rebuild even if artifacts exist | |
| poetry run python scripts/rebuild.py data/raw/ --force | |
| # Custom output directory | |
| poetry run python scripts/rebuild.py data/raw/ --output-dir /tmp/build/ | |
| # Verbose mode (show detailed progress) | |
| poetry run python scripts/rebuild.py data/raw/ -v | |
| # Quiet mode (suppress progress, show only summary) | |
| poetry run python scripts/rebuild.py data/raw/ -q | |
| Note: | |
| ---- | |
| This script imports and calls functions from the individual pipeline | |
| scripts (extract.py, chunk.py, embed.py) to ensure consistency. | |
| Heavy dependencies are lazily loaded to ensure fast CLI startup. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import importlib.util | |
| import shutil | |
| import sys | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from types import ModuleType | |
| from typing import TYPE_CHECKING | |
# =============================================================================
# Environment Variable Loading
# =============================================================================
# Load environment variables from .env file at script startup.
# This ensures HF_TOKEN and other secrets are available before they're needed.
# The .env file should be in the project root directory.
# =============================================================================
from dotenv import load_dotenv

# Find the project root (parent of scripts/ directory) and load .env from there.
# If no .env file exists we silently continue; secrets may instead come from
# the shell environment (e.g. CI).
_PROJECT_ROOT = Path(__file__).parent.parent
_ENV_FILE = _PROJECT_ROOT / ".env"
if _ENV_FILE.exists():
    load_dotenv(_ENV_FILE)

# =============================================================================
# Type Checking Imports
# =============================================================================
# Heavy dependencies are imported lazily to ensure fast CLI startup.
# Type checkers still see the types for proper type checking.
# =============================================================================
if TYPE_CHECKING:
    from rich.console import Console
    from scripts.chunk import ChunkingStatistics
    from scripts.extract import ExtractionStatistics

# =============================================================================
# Module Exports
# =============================================================================
__all__: list[str] = [
    "RebuildStatistics",
    "parse_args",
    "run_rebuild",
    "main",
]

# =============================================================================
# Constants
# =============================================================================
# Exit codes for the CLI script
EXIT_SUCCESS = 0  # All pipeline steps completed successfully
EXIT_PARTIAL_FAILURE = 1  # Some steps failed but others succeeded
EXIT_TOTAL_FAILURE = 2  # Pipeline could not run or failed completely

# Default HuggingFace repository for artifacts
DEFAULT_HF_REPO = "sadickam/pytherm_index"

# Pipeline step names for progress display
STEP_VALIDATE = "Validating input"
STEP_CLEAR = "Clearing artifacts"
STEP_EXTRACT = "Extracting PDFs"
STEP_CHUNK = "Chunking documents"
STEP_EMBED = "Generating embeddings"
STEP_INDEX = "Building indexes"
STEP_PUBLISH = "Publishing to HuggingFace"

# Scripts directory path for dynamic imports (the directory this file lives in)
SCRIPTS_DIR = Path(__file__).parent
| # ============================================================================= | |
| # Module Loading Helper | |
| # ============================================================================= | |
def _load_script_module(script_name: str) -> ModuleType:
    """Load a pipeline script from the scripts directory by file path.

    Going through ``importlib`` file-location specs means the ``scripts``
    directory never has to be an importable package.

    Args:
    ----
    script_name : str
        Name of the script file without .py extension (e.g., "extract").

    Returns:
    -------
    ModuleType
        The loaded module object.

    Raises:
    ------
    ImportError: If the script file is missing or a spec cannot be built.

    Example:
    -------
    >>> extract_module = _load_script_module("extract")
    >>> extract_module.run_extraction(...)
    """
    path = SCRIPTS_DIR / f"{script_name}.py"
    if not path.exists():
        raise ImportError(f"Script not found: {path}")
    module_spec = importlib.util.spec_from_file_location(script_name, path)
    if module_spec is None or module_spec.loader is None:
        raise ImportError(f"Cannot load spec for: {path}")
    loaded = importlib.util.module_from_spec(module_spec)
    # Register before executing so intra-module imports resolve correctly.
    sys.modules[script_name] = loaded
    module_spec.loader.exec_module(loaded)
    return loaded
| # ============================================================================= | |
| # Data Classes | |
| # ============================================================================= | |
@dataclass
class RebuildStatistics:
    """Statistics from a full rebuild pipeline run.

    This dataclass tracks metrics from the complete build pipeline:
    input/output counts, per-stage timing, artifact sizes, and any
    errors encountered.

    BUG FIX: the ``@dataclass`` decorator was missing. Without it the
    class had no generated ``__init__`` (so keyword construction as in
    the example below raised ``TypeError``), ``__post_init__`` was never
    invoked, and ``errors`` was a bare ``field(...)`` object shared at
    class level instead of a fresh per-instance list.

    Attributes:
    ----------
    total_pdfs : int
        Total number of PDF files found in the input directory. Non-negative.
    total_pages : int
        Total number of pages extracted across all PDFs. Non-negative.
    total_chunks : int
        Total number of chunks created from extracted documents. Non-negative.
    extraction_time : float
        Time spent on PDF extraction in seconds. Non-negative.
    chunking_time : float
        Time spent on document chunking in seconds. Non-negative.
    embedding_time : float
        Time spent on embedding generation and index building in seconds.
        Non-negative.
    publish_time : float
        Time spent on HuggingFace publishing in seconds (0.0 if not
        publishing). Non-negative.
    total_time : float
        Total elapsed time for the entire pipeline in seconds. Non-negative.
    embeddings_size_mb : float
        Size of the embeddings.parquet file in megabytes. Non-negative.
    faiss_index_size_mb : float
        Size of the FAISS index file in megabytes. Non-negative.
    bm25_index_size_mb : float
        Size of the BM25 index file in megabytes. Non-negative.
    dataset_url : str | None
        URL of the published HuggingFace dataset, or None if not published.
    errors : list[str]
        List of error messages encountered during the pipeline.
        Empty if no errors occurred.

    Example:
    -------
    >>> stats = RebuildStatistics(total_pdfs=5, total_pages=42)
    >>> stats.total_pdfs
    5
    """

    total_pdfs: int = 0
    total_pages: int = 0
    total_chunks: int = 0
    extraction_time: float = 0.0
    chunking_time: float = 0.0
    embedding_time: float = 0.0
    publish_time: float = 0.0
    total_time: float = 0.0
    embeddings_size_mb: float = 0.0
    faiss_index_size_mb: float = 0.0
    bm25_index_size_mb: float = 0.0
    dataset_url: str | None = None
    errors: list[str] = field(default_factory=list)

    # Numeric fields that must never be negative; validated in __post_init__.
    _NON_NEGATIVE_FIELDS = (
        "total_pdfs",
        "total_pages",
        "total_chunks",
        "extraction_time",
        "chunking_time",
        "embedding_time",
        "publish_time",
        "total_time",
        "embeddings_size_mb",
        "faiss_index_size_mb",
        "bm25_index_size_mb",
    )

    def __post_init__(self) -> None:
        """Validate statistics values after initialization.

        Raises
        ------
        ValueError: If any numeric value is negative.
        """
        # One loop replaces eleven copy-pasted checks; messages are
        # identical to the originals ("<name> must be non-negative, got <v>").
        for name in self._NON_NEGATIVE_FIELDS:
            value = getattr(self, name)
            if value < 0:
                msg = f"{name} must be non-negative, got {value}"
                raise ValueError(msg)

    def success(self) -> bool:
        """Check if the rebuild completed successfully.

        Returns
        -------
        bool
            True if no errors occurred, False otherwise.
        """
        return not self.errors
| # ============================================================================= | |
| # Argument Parsing | |
| # ============================================================================= | |
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line arguments for the rebuild script.

    Builds the CLI parser (one positional input directory plus optional
    flags) and parses *argv*. ``--verbose`` and ``--quiet`` live in a
    mutually exclusive group, so supplying both is a usage error.

    Args:
    ----
    argv : list[str] | None, optional
        Command-line arguments to parse. If None, uses sys.argv[1:].
        Passing an explicit list enables testing without touching sys.argv.

    Returns:
    -------
    argparse.Namespace
        Parsed arguments with attributes: input_dir (Path), output_dir
        (Path, default data/), publish (bool), force (bool),
        verbose (bool), quiet (bool).

    Raises:
    ------
    SystemExit
        If required arguments are missing, unknown arguments are provided,
        or --verbose and --quiet are both specified.

    Example:
    -------
    >>> args = parse_args(["data/raw/", "--publish"])
    >>> args.input_dir
    PosixPath('data/raw')
    >>> args.publish
    True
    """
    usage_examples = (
        "Examples:\n"
        " %(prog)s data/raw/ # Basic rebuild\n"
        " %(prog)s data/raw/ --publish # Rebuild and publish\n"
        " %(prog)s data/raw/ --force # Force full rebuild\n"
        " %(prog)s data/raw/ -v # Verbose output\n"
    )
    parser = argparse.ArgumentParser(
        prog="rebuild.py",
        description=(
            "Full rebuild pipeline for the RAG chatbot. "
            "Processes PDF files through extraction, chunking, embedding, "
            "and optionally publishes artifacts to HuggingFace."
        ),
        epilog=usage_examples,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Positional: where the raw PDFs live.
    parser.add_argument(
        "input_dir",
        type=Path,
        help="Directory containing raw PDF files to process",
    )

    # Optional behaviour flags.
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("data"),
        help="Base output directory for artifacts (default: data/)",
    )
    parser.add_argument(
        "--publish",
        action="store_true",
        default=False,
        help="Publish artifacts to HuggingFace after build",
    )
    parser.add_argument(
        "--force",
        "-f",
        action="store_true",
        default=False,
        help="Force rebuild even if artifacts exist (clears all artifacts)",
    )

    # --verbose and --quiet are contradictory; argparse enforces exclusivity.
    verbosity = parser.add_mutually_exclusive_group()
    verbosity.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        default=False,
        help="Show detailed output including file names being processed",
    )
    verbosity.add_argument(
        "--quiet",
        "-q",
        action="store_true",
        default=False,
        help="Suppress progress output (still shows summary)",
    )

    return parser.parse_args(argv)
| # ============================================================================= | |
| # Helper Functions | |
| # ============================================================================= | |
def _get_console() -> Console:
    """Create a Rich ``Console`` on demand.

    The import happens inside the function so that merely running
    ``rebuild.py --help`` does not pay for loading rich.

    Returns
    -------
    Console
        A fresh Rich console instance for styled terminal output.
    """
    from rich.console import Console as _RichConsole

    return _RichConsole()
| def _get_file_size_mb(path: Path) -> float: | |
| """Get the size of a file in megabytes. | |
| Args: | |
| ---- | |
| path : Path | |
| Path to the file. | |
| Returns: | |
| ------- | |
| float | |
| File size in megabytes, or 0.0 if file doesn't exist. | |
| """ | |
| if not path.exists(): | |
| return 0.0 | |
| return path.stat().st_size / (1024 * 1024) | |
| def _clear_directory(directory: Path, verbose: bool, quiet: bool) -> bool: | |
| """Clear all contents of a directory. | |
| Removes all files and subdirectories within the specified directory. | |
| The directory itself is preserved (not deleted). | |
| Args: | |
| ---- | |
| directory : Path | |
| The directory to clear. | |
| verbose : bool | |
| If True, print detailed information about what's being deleted. | |
| quiet : bool | |
| If True, suppress output. | |
| Returns: | |
| ------- | |
| bool | |
| True if directory was cleared successfully, False on error. | |
| """ | |
| if not directory.exists(): | |
| return True | |
| try: | |
| # Remove all contents but keep the directory | |
| for item in directory.iterdir(): | |
| if item.is_file(): | |
| if verbose: | |
| print(f" Removing file: {item.name}") | |
| item.unlink() | |
| elif item.is_dir(): | |
| if verbose: | |
| print(f" Removing directory: {item.name}/") | |
| shutil.rmtree(item) | |
| except PermissionError as e: | |
| if not quiet: | |
| print( | |
| f"Error: Permission denied clearing {directory}: {e}", | |
| file=sys.stderr, | |
| ) | |
| return False | |
| except OSError as e: | |
| if not quiet: | |
| print(f"Error: Failed to clear {directory}: {e}", file=sys.stderr) | |
| return False | |
| else: | |
| return True | |
| def _validate_input( | |
| input_dir: Path, | |
| quiet: bool, | |
| ) -> tuple[bool, list[Path], list[str]]: | |
| """Validate the input directory and find PDF files. | |
| Checks that the input directory exists, is a directory, and contains | |
| at least one PDF file. Returns validation status and list of PDF files. | |
| Args: | |
| ---- | |
| input_dir : Path | |
| The input directory to validate. | |
| quiet : bool | |
| If True, suppress output messages. | |
| Returns: | |
| ------- | |
| tuple[bool, list[Path], list[str]] | |
| A tuple containing: | |
| - bool: True if validation passed, False otherwise | |
| - list[Path]: List of PDF file paths (sorted for determinism) | |
| - list[str]: List of error messages (empty if validation passed) | |
| """ | |
| errors: list[str] = [] | |
| # Check if input directory exists | |
| if not input_dir.exists(): | |
| msg = f"Input directory does not exist: {input_dir}" | |
| if not quiet: | |
| print(f"Error: {msg}", file=sys.stderr) | |
| errors.append(msg) | |
| return False, [], errors | |
| # Check if input is a directory | |
| if not input_dir.is_dir(): | |
| msg = f"Input path is not a directory: {input_dir}" | |
| if not quiet: | |
| print(f"Error: {msg}", file=sys.stderr) | |
| errors.append(msg) | |
| return False, [], errors | |
| # Find PDF files (sorted for deterministic processing) | |
| pdf_files = sorted(input_dir.glob("*.pdf")) | |
| # Check if any PDF files were found | |
| if not pdf_files: | |
| msg = f"No PDF files found in {input_dir}" | |
| if not quiet: | |
| print(f"Error: {msg}", file=sys.stderr) | |
| errors.append(msg) | |
| return False, [], errors | |
| return True, pdf_files, [] | |
| def _print_header(console: Console, quiet: bool) -> None: | |
| """Print the pipeline header banner. | |
| Args: | |
| ---- | |
| console : Console | |
| Rich console for output. | |
| quiet : bool | |
| If True, suppress output. | |
| """ | |
| if quiet: | |
| return | |
| console.print() | |
| console.print("[bold cyan]Full Rebuild Pipeline[/bold cyan]") | |
| console.print("[cyan]" + "=" * 40 + "[/cyan]") | |
| console.print() | |
| def _print_step( | |
| console: Console, | |
| step_num: int, | |
| total_steps: int, | |
| description: str, | |
| quiet: bool, | |
| ) -> None: | |
| """Print a pipeline step header. | |
| Args: | |
| ---- | |
| console : Console | |
| Rich console for output. | |
| step_num : int | |
| Current step number (1-indexed). | |
| total_steps : int | |
| Total number of steps. | |
| description : str | |
| Description of the step. | |
| quiet : bool | |
| If True, suppress output. | |
| """ | |
| if quiet: | |
| return | |
| console.print(f"[bold cyan][{step_num}/{total_steps}][/bold cyan] {description}...") | |
| def _print_step_result( | |
| console: Console, | |
| detail: str, | |
| elapsed: float, | |
| quiet: bool, | |
| ) -> None: | |
| """Print the result of a pipeline step. | |
| Args: | |
| ---- | |
| console : Console | |
| Rich console for output. | |
| detail : str | |
| Detail message about what was accomplished. | |
| elapsed : float | |
| Time taken for the step in seconds. | |
| quiet : bool | |
| If True, suppress output. | |
| """ | |
| if quiet: | |
| return | |
| console.print(f" {detail}") | |
| console.print(f" Time: {elapsed:.1f}s") | |
| console.print() | |
def _print_summary(
    console: Console,
    stats: RebuildStatistics,
) -> None:
    """Print the final rebuild summary.

    Displays a formatted summary of the rebuild run including counts,
    timing information, artifact sizes, the published dataset URL (if
    any), and any collected errors.

    Args:
    ----
    console : Console
        Rich console for output.
    stats : RebuildStatistics
        Statistics from the rebuild run.
    """
    # Import Rich table lazily to keep CLI startup fast.
    from rich.table import Table

    console.print()
    # Print success or failure banner.
    # BUG FIX: ``success`` is a method, and the previous ``if stats.success:``
    # tested the bound-method object — always truthy — so the failure banner
    # could never be shown. It must be called.
    if stats.success():
        console.print("[bold green]Rebuild Complete[/bold green]")
    else:
        console.print("[bold red]Rebuild Failed[/bold red]")
    console.print("[cyan]" + "=" * 40 + "[/cyan]")

    # Borderless two-column key/value table for the statistics.
    table = Table(show_header=False, box=None, padding=(0, 2))
    table.add_column("Metric", style="dim", width=20)
    table.add_column("Value", justify="right")

    # Input/Output statistics
    table.add_row("Input PDFs:", f"{stats.total_pdfs}")
    table.add_row("Total Pages:", f"{stats.total_pages}")
    table.add_row("Output Chunks:", f"{stats.total_chunks}")
    table.add_row("", "")

    # Timing breakdown (publish row only shown when publishing happened)
    table.add_row("Extraction Time:", f"{stats.extraction_time:.1f}s")
    table.add_row("Chunking Time:", f"{stats.chunking_time:.1f}s")
    table.add_row("Embedding Time:", f"{stats.embedding_time:.1f}s")
    if stats.publish_time > 0:
        table.add_row("Publish Time:", f"{stats.publish_time:.1f}s")
    table.add_row("[bold]Total Time:[/bold]", f"[bold]{stats.total_time:.1f}s[/bold]")
    table.add_row("", "")

    # Artifact sizes (skipped entirely when nothing was built)
    if stats.embeddings_size_mb > 0 or stats.faiss_index_size_mb > 0:
        table.add_row("Embeddings:", f"{stats.embeddings_size_mb:.2f} MB")
        table.add_row("FAISS Index:", f"{stats.faiss_index_size_mb:.2f} MB")
        table.add_row("BM25 Index:", f"{stats.bm25_index_size_mb:.2f} MB")

    # Dataset URL if published
    if stats.dataset_url:
        table.add_row("", "")
        table.add_row("Published URL:", stats.dataset_url)

    console.print(table)
    console.print("[cyan]" + "=" * 40 + "[/cyan]")
    console.print()

    # List any collected errors after the table.
    if stats.errors:
        console.print("[bold red]Errors encountered:[/bold red]")
        for error in stats.errors:
            console.print(f" - {error}")
        console.print()
| # ============================================================================= | |
| # Pipeline Step Functions | |
| # ============================================================================= | |
def _run_extraction(
    input_dir: Path,
    output_dir: Path,
    force: bool,
    verbose: bool,
    quiet: bool,
) -> tuple[ExtractionStatistics | None, str | None]:
    """Run the PDF extraction step.

    Dynamically loads scripts/extract.py and invokes its ``run_extraction``
    entry point, translating exceptions and degenerate results into an
    error message instead of raising.

    Args:
    ----
    input_dir : Path
        Directory containing PDF files.
    output_dir : Path
        Directory for markdown output.
    force : bool
        If True, force re-extraction of all files.
    verbose : bool
        If True, show detailed output.
    quiet : bool
        If True, suppress progress output.

    Returns:
    -------
    tuple[ExtractionStatistics | None, str | None]
        (statistics, error_message) — exactly one side is populated:
        statistics on (possibly partial) success, error_message on failure.
    """
    extract_module = _load_script_module("extract")
    try:
        extraction_stats = extract_module.run_extraction(
            input_dir=input_dir,
            output_dir=output_dir,
            force=force,
            verbose=verbose,
            quiet=quiet,
        )
    except Exception as exc:
        return None, f"Extraction failed: {exc}"

    # Treat "nothing processed" and "everything failed" as hard errors;
    # partial failures fall through and let the pipeline continue.
    if extraction_stats.total == 0:
        return None, "No PDF files were processed"
    if extraction_stats.failed == extraction_stats.total:
        return None, f"All {extraction_stats.total} files failed to extract"
    return extraction_stats, None
def _run_chunking(
    input_dir: Path,
    output_path: Path,
    force: bool,
    verbose: bool,
    quiet: bool,
) -> tuple[ChunkingStatistics | None, str | None]:
    """Run the document chunking step.

    Dynamically loads scripts/chunk.py and invokes its ``run_chunking``
    entry point, translating exceptions and degenerate results into an
    error message instead of raising.

    Args:
    ----
    input_dir : Path
        Directory containing markdown files.
    output_path : Path
        Path to output JSONL file.
    force : bool
        If True, force re-chunking of all files.
    verbose : bool
        If True, show detailed output.
    quiet : bool
        If True, suppress progress output.

    Returns:
    -------
    tuple[ChunkingStatistics | None, str | None]
        (statistics, error_message) — exactly one side is populated:
        statistics on (possibly partial) success, error_message on failure.
    """
    chunk_module = _load_script_module("chunk")
    try:
        chunking_stats = chunk_module.run_chunking(
            input_dir=input_dir,
            output_path=output_path,
            force=force,
            verbose=verbose,
            quiet=quiet,
        )
    except Exception as exc:
        return None, f"Chunking failed: {exc}"

    # "Nothing found", "everything failed", and "zero chunks" are all hard
    # errors; partial failures continue to the embedding step.
    if chunking_stats.total_files == 0:
        return None, "No markdown files were found to chunk"
    if chunking_stats.failed == chunking_stats.total_files:
        return None, f"All {chunking_stats.total_files} files failed to chunk"
    if chunking_stats.total_chunks == 0:
        return None, "No chunks were created"
    return chunking_stats, None
def _run_embedding(
    input_path: Path,
    output_dir: Path,
    publish: bool,
    quiet: bool,
) -> tuple[float, str | None, str | None]:
    """Run the embedding generation and index building step.

    This function generates embeddings, builds FAISS and BM25 indexes,
    and optionally publishes to HuggingFace. All failures are reported
    through the returned error message; nothing is raised to the caller.

    Args:
    ----
    input_path : Path
        Path to chunks.jsonl file.
    output_dir : Path
        Directory for embedding output (created if missing).
    publish : bool
        If True, publish artifacts to HuggingFace.
    quiet : bool
        If True, suppress progress output.

    Returns:
    -------
    tuple[float, str | None, str | None]
        A tuple of (elapsed_time, dataset_url, error_message).
        - elapsed_time: Time taken for embedding/indexing
        - dataset_url: URL if published, None otherwise
        - error_message: None on success, error description on failure
    """
    # Lazy imports for heavy dependencies — keeps CLI startup fast.
    import math

    from rich.console import Console
    from rich.progress import (
        BarColumn,
        MofNCompleteColumn,
        Progress,
        SpinnerColumn,
        TaskProgressColumn,
        TextColumn,
        TimeElapsedColumn,
        TimeRemainingColumn,
    )

    # Dynamically load the embed script module and pull out its entry points.
    embed_module = _load_script_module("embed")
    build_indexes = embed_module.build_indexes
    create_embedding_records = embed_module.create_embedding_records
    load_chunks_from_jsonl = embed_module.load_chunks_from_jsonl
    publish_to_huggingface = embed_module.publish_to_huggingface

    console = Console()
    start_time = time.perf_counter()
    dataset_url: str | None = None
    try:
        # Load chunks produced by the chunking step.
        if not quiet:
            console.print(" Loading chunks...")
        chunks = load_chunks_from_jsonl(input_path)
        if not chunks:
            # Nothing to embed — report a hard error with zero elapsed time.
            return 0.0, None, "No chunks loaded from JSONL file"

        # Lazy import encoder dependencies (the heaviest imports of all).
        from rag_chatbot.embeddings import (
            BGEEncoder,
            EmbeddingBatch,
            EmbeddingStorage,
        )

        # Create output directory
        output_dir.mkdir(parents=True, exist_ok=True)

        # Initialize encoder
        model_name = "BAAI/bge-small-en-v1.5"
        encoder = BGEEncoder(
            model_name=model_name,
            device=None,  # Auto-detect
            normalize_text=False,
        )

        # Initialize storage
        storage = EmbeddingStorage(output_dir)

        # Calculate batches (ceil so a partial final batch still counts).
        batch_size = 32
        total_batches = math.ceil(len(chunks) / batch_size)

        # Generate embeddings with progress; `disable=quiet` silences the bars.
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
            console=console,
            disable=quiet,
        ) as progress:
            # Embedding task
            embed_task = progress.add_task(
                "[cyan]Embedding chunks...",
                total=total_batches,
            )
            # Generate embeddings
            records = create_embedding_records(
                chunks=chunks,
                encoder=encoder,
                batch_size=batch_size,
                progress=progress,
                task_id=embed_task,
            )
            progress.update(embed_task, completed=total_batches)

            # Save embeddings
            save_task = progress.add_task("[cyan]Saving embeddings...", total=1)
            batch = EmbeddingBatch(
                model_name=model_name,
                dimension=encoder.embedding_dim,
                dtype="float16",
                records=records,
            )
            storage.save(batch)
            progress.update(save_task, completed=1)

            # Build indexes (timings returned but unused here).
            _faiss_time, _bm25_time = build_indexes(
                output_dir=output_dir,
                chunks=chunks,
                progress=progress,
            )

            # Publish if requested
            if publish:
                dataset_url = publish_to_huggingface(
                    output_dir=output_dir,
                    chunks=chunks,
                    model_name=model_name,
                    embedding_dim=encoder.embedding_dim,
                    progress=progress,
                )
    except FileNotFoundError as e:
        elapsed = time.perf_counter() - start_time
        return elapsed, None, f"File not found: {e}"
    except ValueError as e:
        elapsed = time.perf_counter() - start_time
        return elapsed, None, f"Invalid data: {e}"
    except Exception as e:
        # Broad catch is deliberate: this step must never crash the pipeline;
        # errors surface through the returned message instead.
        elapsed = time.perf_counter() - start_time
        return elapsed, None, f"Embedding failed: {e}"
    else:
        elapsed = time.perf_counter() - start_time
        return elapsed, dataset_url, None
| # ============================================================================= | |
| # Main Rebuild Logic | |
| # ============================================================================= | |
def run_rebuild(  # noqa: PLR0912, PLR0913, PLR0915
    input_dir: Path,
    output_dir: Path,
    publish: bool,
    force: bool,
    verbose: bool,
    quiet: bool,
) -> RebuildStatistics:
    """Run the full rebuild pipeline.

    Orchestrates the complete build process:
    1. Validates the input directory and its PDF files
    2. Clears artifact directories (only when ``force`` is True)
    3. Extracts PDFs to Markdown
    4. Chunks documents into semantic segments
    5. Generates embeddings, builds indexes, and optionally publishes

    Args:
    ----
    input_dir : Path
        Directory containing raw PDF files.
    output_dir : Path
        Base output directory for all artifacts.
    publish : bool
        If True, publish artifacts to HuggingFace after build.
    force : bool
        If True, force rebuild by clearing all existing artifacts.
    verbose : bool
        If True, show detailed output during processing.
    quiet : bool
        If True, suppress progress output (still shows summary).

    Returns:
    -------
    RebuildStatistics
        Statistics about the rebuild run including timing and sizes.

    Note:
    ----
    The function uses fail-fast behavior - if a critical step fails
    (validation, extraction, chunking), subsequent steps are skipped
    and the partial statistics are returned. Partial failures (some
    files failed) continue to next steps.
    """
    # -------------------------------------------------------------------------
    # Start timing and initialize
    # -------------------------------------------------------------------------
    pipeline_start = time.perf_counter()
    console = _get_console()
    # Accumulates timings, counts, sizes, and errors across all steps.
    stats = RebuildStatistics()
    # Publishing shares the final step's banner, so it adds exactly one step.
    total_steps = 5 if publish else 4
    # Artifact locations derived from the output root.
    processed_dir = output_dir / "processed"
    chunks_dir = output_dir / "chunks"
    chunks_file = chunks_dir / "chunks.jsonl"
    embeddings_dir = output_dir / "embeddings"
    # -------------------------------------------------------------------------
    # Print header
    # -------------------------------------------------------------------------
    _print_header(console, quiet)
    # -------------------------------------------------------------------------
    # Step 1: Validate Input
    # -------------------------------------------------------------------------
    _print_step(console, 1, total_steps, STEP_VALIDATE, quiet)
    validate_start = time.perf_counter()
    valid, pdf_files, errors = _validate_input(input_dir, quiet)
    validate_elapsed = time.perf_counter() - validate_start
    if not valid:
        # Critical failure: there is nothing to build from, so bail out early.
        stats.errors.extend(errors)
        stats.total_time = time.perf_counter() - pipeline_start
        _print_summary(console, stats)
        return stats
    stats.total_pdfs = len(pdf_files)
    _print_step_result(
        console,
        f"Found {len(pdf_files)} PDF files",
        validate_elapsed,
        quiet,
    )
    # -------------------------------------------------------------------------
    # Step 2: Clear Artifacts (if force)
    # -------------------------------------------------------------------------
    _print_step(console, 2, total_steps, STEP_CLEAR, quiet)
    clear_start = time.perf_counter()
    if force:
        dirs_to_clear = [processed_dir, chunks_dir, embeddings_dir]
        cleared_count = 0
        for dir_path in dirs_to_clear:
            if _clear_directory(dir_path, verbose, quiet):
                cleared_count += 1
            else:
                # Non-fatal: record the error but keep clearing the rest.
                stats.errors.append(f"Failed to clear {dir_path}")
        clear_elapsed = time.perf_counter() - clear_start
        _print_step_result(
            console,
            f"Cleared {cleared_count} directories",
            clear_elapsed,
            quiet,
        )
    else:
        clear_elapsed = time.perf_counter() - clear_start
        _print_step_result(
            console,
            "Skipped (use --force to clear)",
            clear_elapsed,
            quiet,
        )
    # -------------------------------------------------------------------------
    # Step 3: Extract PDFs
    # -------------------------------------------------------------------------
    _print_step(console, 3, total_steps, STEP_EXTRACT, quiet)
    extract_start = time.perf_counter()
    extraction_stats, extraction_error = _run_extraction(
        input_dir=input_dir,
        output_dir=processed_dir,
        force=force,
        verbose=verbose,
        quiet=quiet,
    )
    stats.extraction_time = time.perf_counter() - extract_start
    if extraction_error:
        # Critical failure: chunking needs the extracted Markdown, so stop.
        stats.errors.append(extraction_error)
        stats.total_time = time.perf_counter() - pipeline_start
        _print_summary(console, stats)
        return stats
    if extraction_stats:
        stats.total_pages = extraction_stats.total_pages
        detail = (
            f"Processed: {extraction_stats.extracted} files, "
            f"{extraction_stats.total_pages} pages"
        )
        _print_step_result(console, detail, stats.extraction_time, quiet)
    # -------------------------------------------------------------------------
    # Step 4: Chunk Documents
    # -------------------------------------------------------------------------
    # Chunking is step 4 in both modes. (The previous `4 if publish else 4`
    # conditional was dead code with identical branches.)
    _print_step(console, 4, total_steps, STEP_CHUNK, quiet)
    chunk_start = time.perf_counter()
    chunking_stats, chunking_error = _run_chunking(
        input_dir=processed_dir,
        output_path=chunks_file,
        force=force,
        verbose=verbose,
        quiet=quiet,
    )
    stats.chunking_time = time.perf_counter() - chunk_start
    if chunking_error:
        # Critical failure: embedding needs the chunks file, so stop here.
        stats.errors.append(chunking_error)
        stats.total_time = time.perf_counter() - pipeline_start
        _print_summary(console, stats)
        return stats
    if chunking_stats:
        stats.total_chunks = chunking_stats.total_chunks
        _print_step_result(
            console,
            f"Created: {chunking_stats.total_chunks} chunks",
            stats.chunking_time,
            quiet,
        )
    # -------------------------------------------------------------------------
    # Step 5: Generate Embeddings and Build Indexes
    # -------------------------------------------------------------------------
    if publish:
        step_desc = f"{STEP_EMBED} & {STEP_PUBLISH}"
        _print_step(console, 5, total_steps, step_desc, quiet)
    else:
        # NOTE(review): in non-publish mode this prints "4/4" again even
        # though chunking was already labeled step 4 — confirm whether the
        # embed step should be numbered 5 (and total_steps bumped) instead.
        _print_step(console, 4, total_steps, STEP_EMBED, quiet)
    embedding_time, dataset_url, embedding_error = _run_embedding(
        input_path=chunks_file,
        output_dir=embeddings_dir,
        publish=publish,
        quiet=quiet,
    )
    stats.embedding_time = embedding_time
    if embedding_error:
        stats.errors.append(embedding_error)
    else:
        # Collect on-disk artifact sizes for the summary report.
        emb_parquet = embeddings_dir / "embeddings.parquet"
        faiss_bin = embeddings_dir / "faiss_index.bin"
        bm25_pkl = embeddings_dir / "bm25_index.pkl"
        stats.embeddings_size_mb = _get_file_size_mb(emb_parquet)
        stats.faiss_index_size_mb = _get_file_size_mb(faiss_bin)
        stats.bm25_index_size_mb = _get_file_size_mb(bm25_pkl)
        stats.dataset_url = dataset_url
        if publish and dataset_url:
            detail = (
                f"Embedded: {stats.total_chunks} chunks, " f"Published: {dataset_url}"
            )
            _print_step_result(console, detail, stats.embedding_time, quiet)
        else:
            detail = f"Embedded: {stats.total_chunks} chunks"
            _print_step_result(console, detail, stats.embedding_time, quiet)
    # -------------------------------------------------------------------------
    # Calculate total time and print summary
    # -------------------------------------------------------------------------
    stats.total_time = time.perf_counter() - pipeline_start
    _print_summary(console, stats)
    return stats
| # ============================================================================= | |
| # Main Entry Point | |
| # ============================================================================= | |
def main(argv: list[str] | None = None) -> int:
    """Execute the full rebuild pipeline CLI script.

    Parses the command-line arguments, runs the complete rebuild
    pipeline, and maps the resulting statistics to a process exit code.

    Args:
    ----
    argv : list[str] | None, optional
        Command-line arguments to parse. If None, uses sys.argv[1:].

    Returns:
    -------
    int
        Exit code indicating success or failure:
        - 0: Success (all steps completed successfully)
        - 1: Partial failure (some steps failed but pipeline continued)
        - 2: Total failure (pipeline could not complete)

    Example:
    -------
    >>> exit_code = main(["data/raw/", "--publish"])
    >>> exit_code
    0
    """
    # Parse CLI options, then hand everything to the pipeline orchestrator.
    options = parse_args(argv)
    stats = run_rebuild(
        input_dir=options.input_dir,
        output_dir=options.output_dir,
        publish=options.publish,
        force=options.force,
        verbose=options.verbose,
        quiet=options.quiet,
    )
    # Map pipeline outcome to an exit code: clean success, partial
    # (chunks were produced despite errors), or total failure.
    if stats.success:
        return EXIT_SUCCESS
    return EXIT_PARTIAL_FAILURE if stats.total_chunks > 0 else EXIT_TOTAL_FAILURE
| # ============================================================================= | |
| # Script Entry Point | |
| # ============================================================================= | |
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell; raising SystemExit is
    # equivalent to sys.exit() on the return value.
    raise SystemExit(main())