# NOTE: The three lines that preceded the shebang here were residue scraped
# from the Hugging Face Space web page ("sadickam's picture / Initial commit
# for HF Space / 3326079") and were not valid Python; kept as a comment.
#!/usr/bin/env python3
"""Full rebuild CLI script for the RAG chatbot build pipeline.
This script orchestrates the complete build pipeline from raw PDF files
through to published HuggingFace artifacts. It is the master build script
that chains together extraction, chunking, embedding, and publishing.
Pipeline Steps:
1. Validate Input - Check data/raw/ exists and contains PDF files
2. Clear Artifacts - Remove old artifacts (data/processed/, chunks/, embeddings/)
3. Extract - PDF to Markdown extraction using pymupdf4llm
4. Chunk - Structure-aware chunking with heading inheritance
5. Embed & Index - BGE embedding + FAISS/BM25 index building
6. Publish - Upload artifacts to HuggingFace (if --publish)
Features:
- Deterministic builds: Same inputs produce same outputs (sorted file lists)
- Fail-fast behavior: Exits immediately on critical errors
- Verbose/quiet modes: Control output verbosity
- Progress reporting: Rich progress bars for each stage
- Statistics summary: Total time, artifact sizes, throughput metrics
Exit Codes:
0: Success - Full pipeline completed successfully
1: Partial failure - Some steps completed but others failed
2: Total failure - Pipeline could not start or failed completely
Example Usage:
# Basic rebuild (no publish)
poetry run python scripts/rebuild.py data/raw/
# Full rebuild with HuggingFace publishing
poetry run python scripts/rebuild.py data/raw/ --publish
# Force rebuild even if artifacts exist
poetry run python scripts/rebuild.py data/raw/ --force
# Custom output directory
poetry run python scripts/rebuild.py data/raw/ --output-dir /tmp/build/
# Verbose mode (show detailed progress)
poetry run python scripts/rebuild.py data/raw/ -v
# Quiet mode (suppress progress, show only summary)
poetry run python scripts/rebuild.py data/raw/ -q
Note:
----
This script imports and calls functions from the individual pipeline
scripts (extract.py, chunk.py, embed.py) to ensure consistency.
Heavy dependencies are lazily loaded to ensure fast CLI startup.
"""
from __future__ import annotations
import argparse
import importlib.util
import shutil
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING
# =============================================================================
# Environment Variable Loading
# =============================================================================
# Load environment variables from .env file at script startup.
# This ensures HF_TOKEN and other secrets are available before they're needed.
# The .env file should be in the project root directory.
# =============================================================================
from dotenv import load_dotenv

# Find the project root (parent of scripts/ directory) and load .env from there
# This runs at import time so HF_TOKEN and friends are set before any pipeline
# step (notably publishing) needs them.
_PROJECT_ROOT = Path(__file__).parent.parent
_ENV_FILE = _PROJECT_ROOT / ".env"
if _ENV_FILE.exists():
    # Missing .env is not an error: CI/production may inject env vars directly.
    load_dotenv(_ENV_FILE)
# =============================================================================
# Type Checking Imports
# =============================================================================
# Heavy dependencies are imported lazily to ensure fast CLI startup.
# Type checkers still see the types for proper type checking.
# =============================================================================
if TYPE_CHECKING:
from rich.console import Console
from scripts.chunk import ChunkingStatistics
from scripts.extract import ExtractionStatistics
# =============================================================================
# Module Exports
# =============================================================================
# Public API of this module when imported (e.g. from tests).
__all__: list[str] = [
    "RebuildStatistics",
    "parse_args",
    "run_rebuild",
    "main",
]
# =============================================================================
# Constants
# =============================================================================
# Exit codes for the CLI script
EXIT_SUCCESS = 0  # All pipeline steps completed successfully
EXIT_PARTIAL_FAILURE = 1  # Some steps failed but others succeeded
EXIT_TOTAL_FAILURE = 2  # Pipeline could not run or failed completely
# Default HuggingFace repository for artifacts
DEFAULT_HF_REPO = "sadickam/pytherm_index"
# Pipeline step names for progress display
STEP_VALIDATE = "Validating input"
STEP_CLEAR = "Clearing artifacts"
STEP_EXTRACT = "Extracting PDFs"
STEP_CHUNK = "Chunking documents"
STEP_EMBED = "Generating embeddings"
STEP_INDEX = "Building indexes"
STEP_PUBLISH = "Publishing to HuggingFace"
# Scripts directory path for dynamic imports (the directory this file lives in)
SCRIPTS_DIR = Path(__file__).parent
# =============================================================================
# Module Loading Helper
# =============================================================================
def _load_script_module(script_name: str) -> ModuleType:
    """Load a sibling pipeline script (e.g. ``extract``) as a module.

    Uses importlib's file-based loading so the scripts directory does not
    have to be an importable package.

    Args:
    ----
    script_name : str
        Name of the script file without the .py extension.

    Returns:
    -------
    ModuleType
        The freshly executed module object.

    Raises:
    ------
    ImportError: If the file is missing or no import spec can be created.

    Example:
    -------
    >>> extract_module = _load_script_module("extract")
    >>> extract_module.run_extraction(...)
    """
    source_file = SCRIPTS_DIR / f"{script_name}.py"
    if not source_file.exists():
        raise ImportError(f"Script not found: {source_file}")
    module_spec = importlib.util.spec_from_file_location(script_name, source_file)
    if module_spec is None or module_spec.loader is None:
        raise ImportError(f"Cannot load spec for: {source_file}")
    loaded = importlib.util.module_from_spec(module_spec)
    # Register before exec so imports inside the script can resolve it.
    sys.modules[script_name] = loaded
    module_spec.loader.exec_module(loaded)
    return loaded
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class RebuildStatistics:
    """Statistics from a full rebuild pipeline run.

    Tracks metrics from the complete build pipeline, including per-stage
    timing, artifact sizes, and any errors encountered.

    Attributes:
    ----------
    total_pdfs : int
        Total number of PDF files found in the input directory. Non-negative.
    total_pages : int
        Total number of pages extracted across all PDFs. Non-negative.
    total_chunks : int
        Total number of chunks created from extracted documents. Non-negative.
    extraction_time : float
        Time spent on PDF extraction in seconds. Non-negative.
    chunking_time : float
        Time spent on document chunking in seconds. Non-negative.
    embedding_time : float
        Time spent on embedding generation and index building in seconds.
        Non-negative.
    publish_time : float
        Time spent on HuggingFace publishing in seconds. Non-negative
        (0.0 if not publishing).
    total_time : float
        Total elapsed time for the entire pipeline in seconds. Non-negative.
    embeddings_size_mb : float
        Size of the embeddings.parquet file in megabytes. Non-negative.
    faiss_index_size_mb : float
        Size of the FAISS index file in megabytes. Non-negative.
    bm25_index_size_mb : float
        Size of the BM25 index file in megabytes. Non-negative.
    dataset_url : str | None
        URL of the published HuggingFace dataset, or None if not published.
    errors : list[str]
        Error messages encountered during the pipeline; empty on success.

    Example:
    -------
    >>> stats = RebuildStatistics(total_pdfs=5, total_pages=42)
    >>> stats.total_pdfs
    5
    """

    total_pdfs: int = 0
    total_pages: int = 0
    total_chunks: int = 0
    extraction_time: float = 0.0
    chunking_time: float = 0.0
    embedding_time: float = 0.0
    publish_time: float = 0.0
    total_time: float = 0.0
    embeddings_size_mb: float = 0.0
    faiss_index_size_mb: float = 0.0
    bm25_index_size_mb: float = 0.0
    dataset_url: str | None = None
    errors: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Validate statistics values after initialization.

        Raises
        ------
        ValueError: If any count, time, or size field is negative.
        """
        # Data-driven validation replaces eleven copy-pasted if-blocks;
        # the error message format is unchanged from the original checks.
        non_negative_fields = (
            "total_pdfs",
            "total_pages",
            "total_chunks",
            "extraction_time",
            "chunking_time",
            "embedding_time",
            "publish_time",
            "total_time",
            "embeddings_size_mb",
            "faiss_index_size_mb",
            "bm25_index_size_mb",
        )
        for field_name in non_negative_fields:
            value = getattr(self, field_name)
            if value < 0:
                msg = f"{field_name} must be non-negative, got {value}"
                raise ValueError(msg)

    @property
    def success(self) -> bool:
        """Check if the rebuild completed successfully.

        Returns
        -------
        bool
            True if no errors occurred, False otherwise.
        """
        return len(self.errors) == 0
# =============================================================================
# Argument Parsing
# =============================================================================
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the rebuild script.

    Builds the argument parser, registers every supported option, and parses
    *argv*. The --verbose and --quiet flags live in a mutually exclusive
    group, so argparse rejects invocations that combine them.

    Args:
    ----
    argv : list[str] | None, optional
        Arguments to parse; defaults to sys.argv[1:] when None. Passing an
        explicit list keeps the function testable.

    Returns:
    -------
    argparse.Namespace
        Namespace with attributes: input_dir (Path), output_dir (Path,
        default data/), publish (bool), force (bool), verbose (bool),
        quiet (bool).

    Raises:
    ------
    SystemExit
        On missing required arguments, unknown arguments, or when both
        --verbose and --quiet are given.

    Example:
    -------
    >>> args = parse_args(["data/raw/", "--publish"])
    >>> args.publish
    True
    """
    description_text = (
        "Full rebuild pipeline for the RAG chatbot. "
        "Processes PDF files through extraction, chunking, embedding, "
        "and optionally publishes artifacts to HuggingFace."
    )
    epilog_text = (
        "Examples:\n"
        " %(prog)s data/raw/ # Basic rebuild\n"
        " %(prog)s data/raw/ --publish # Rebuild and publish\n"
        " %(prog)s data/raw/ --force # Force full rebuild\n"
        " %(prog)s data/raw/ -v # Verbose output\n"
    )
    cli = argparse.ArgumentParser(
        prog="rebuild.py",
        description=description_text,
        epilog=epilog_text,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Required positional: where the raw PDFs live.
    cli.add_argument(
        "input_dir",
        type=Path,
        help="Directory containing raw PDF files to process",
    )
    cli.add_argument(
        "--output-dir",
        type=Path,
        default=Path("data"),
        help="Base output directory for artifacts (default: data/)",
    )
    cli.add_argument(
        "--publish",
        action="store_true",
        default=False,
        help="Publish artifacts to HuggingFace after build",
    )
    cli.add_argument(
        "--force",
        "-f",
        action="store_true",
        default=False,
        help="Force rebuild even if artifacts exist (clears all artifacts)",
    )
    # -v and -q contradict each other, so argparse enforces exclusivity.
    verbosity = cli.add_mutually_exclusive_group()
    verbosity.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        default=False,
        help="Show detailed output including file names being processed",
    )
    verbosity.add_argument(
        "--quiet",
        "-q",
        action="store_true",
        default=False,
        help="Suppress progress output (still shows summary)",
    )
    return cli.parse_args(argv)
# =============================================================================
# Helper Functions
# =============================================================================
def _get_console() -> Console:
    """Build a Rich Console on demand.

    The import happens inside the function to keep CLI startup fast.

    Returns
    -------
    Console
        A new Rich Console instance for styled output.
    """
    from rich.console import Console as _RichConsole

    return _RichConsole()
def _get_file_size_mb(path: Path) -> float:
"""Get the size of a file in megabytes.
Args:
----
path : Path
Path to the file.
Returns:
-------
float
File size in megabytes, or 0.0 if file doesn't exist.
"""
if not path.exists():
return 0.0
return path.stat().st_size / (1024 * 1024)
def _clear_directory(directory: Path, verbose: bool, quiet: bool) -> bool:
"""Clear all contents of a directory.
Removes all files and subdirectories within the specified directory.
The directory itself is preserved (not deleted).
Args:
----
directory : Path
The directory to clear.
verbose : bool
If True, print detailed information about what's being deleted.
quiet : bool
If True, suppress output.
Returns:
-------
bool
True if directory was cleared successfully, False on error.
"""
if not directory.exists():
return True
try:
# Remove all contents but keep the directory
for item in directory.iterdir():
if item.is_file():
if verbose:
print(f" Removing file: {item.name}")
item.unlink()
elif item.is_dir():
if verbose:
print(f" Removing directory: {item.name}/")
shutil.rmtree(item)
except PermissionError as e:
if not quiet:
print(
f"Error: Permission denied clearing {directory}: {e}",
file=sys.stderr,
)
return False
except OSError as e:
if not quiet:
print(f"Error: Failed to clear {directory}: {e}", file=sys.stderr)
return False
else:
return True
def _validate_input(
input_dir: Path,
quiet: bool,
) -> tuple[bool, list[Path], list[str]]:
"""Validate the input directory and find PDF files.
Checks that the input directory exists, is a directory, and contains
at least one PDF file. Returns validation status and list of PDF files.
Args:
----
input_dir : Path
The input directory to validate.
quiet : bool
If True, suppress output messages.
Returns:
-------
tuple[bool, list[Path], list[str]]
A tuple containing:
- bool: True if validation passed, False otherwise
- list[Path]: List of PDF file paths (sorted for determinism)
- list[str]: List of error messages (empty if validation passed)
"""
errors: list[str] = []
# Check if input directory exists
if not input_dir.exists():
msg = f"Input directory does not exist: {input_dir}"
if not quiet:
print(f"Error: {msg}", file=sys.stderr)
errors.append(msg)
return False, [], errors
# Check if input is a directory
if not input_dir.is_dir():
msg = f"Input path is not a directory: {input_dir}"
if not quiet:
print(f"Error: {msg}", file=sys.stderr)
errors.append(msg)
return False, [], errors
# Find PDF files (sorted for deterministic processing)
pdf_files = sorted(input_dir.glob("*.pdf"))
# Check if any PDF files were found
if not pdf_files:
msg = f"No PDF files found in {input_dir}"
if not quiet:
print(f"Error: {msg}", file=sys.stderr)
errors.append(msg)
return False, [], errors
return True, pdf_files, []
def _print_header(console: Console, quiet: bool) -> None:
"""Print the pipeline header banner.
Args:
----
console : Console
Rich console for output.
quiet : bool
If True, suppress output.
"""
if quiet:
return
console.print()
console.print("[bold cyan]Full Rebuild Pipeline[/bold cyan]")
console.print("[cyan]" + "=" * 40 + "[/cyan]")
console.print()
def _print_step(
console: Console,
step_num: int,
total_steps: int,
description: str,
quiet: bool,
) -> None:
"""Print a pipeline step header.
Args:
----
console : Console
Rich console for output.
step_num : int
Current step number (1-indexed).
total_steps : int
Total number of steps.
description : str
Description of the step.
quiet : bool
If True, suppress output.
"""
if quiet:
return
console.print(f"[bold cyan][{step_num}/{total_steps}][/bold cyan] {description}...")
def _print_step_result(
console: Console,
detail: str,
elapsed: float,
quiet: bool,
) -> None:
"""Print the result of a pipeline step.
Args:
----
console : Console
Rich console for output.
detail : str
Detail message about what was accomplished.
elapsed : float
Time taken for the step in seconds.
quiet : bool
If True, suppress output.
"""
if quiet:
return
console.print(f" {detail}")
console.print(f" Time: {elapsed:.1f}s")
console.print()
def _print_summary(
    console: Console,
    stats: RebuildStatistics,
) -> None:
    """Print the final rebuild summary table.

    Shows a pass/fail banner, counts, per-stage timing, artifact sizes,
    the published dataset URL (when present), and any errors.

    Args:
    ----
    console : Console
        Rich console for output.
    stats : RebuildStatistics
        Statistics gathered during the rebuild run.
    """
    # Rich is imported lazily to keep CLI startup fast.
    from rich.table import Table

    rule = "[cyan]" + "=" * 40 + "[/cyan]"
    console.print()
    banner = (
        "[bold green]Rebuild Complete[/bold green]"
        if stats.success
        else "[bold red]Rebuild Failed[/bold red]"
    )
    console.print(banner)
    console.print(rule)
    summary = Table(show_header=False, box=None, padding=(0, 2))
    summary.add_column("Metric", style="dim", width=20)
    summary.add_column("Value", justify="right")
    # Input/output counts.
    summary.add_row("Input PDFs:", f"{stats.total_pdfs}")
    summary.add_row("Total Pages:", f"{stats.total_pages}")
    summary.add_row("Output Chunks:", f"{stats.total_chunks}")
    summary.add_row("", "")
    # Per-stage timing breakdown.
    summary.add_row("Extraction Time:", f"{stats.extraction_time:.1f}s")
    summary.add_row("Chunking Time:", f"{stats.chunking_time:.1f}s")
    summary.add_row("Embedding Time:", f"{stats.embedding_time:.1f}s")
    if stats.publish_time > 0:
        summary.add_row("Publish Time:", f"{stats.publish_time:.1f}s")
    summary.add_row("[bold]Total Time:[/bold]", f"[bold]{stats.total_time:.1f}s[/bold]")
    summary.add_row("", "")
    # Artifact sizes, shown only when something was produced.
    if stats.embeddings_size_mb > 0 or stats.faiss_index_size_mb > 0:
        summary.add_row("Embeddings:", f"{stats.embeddings_size_mb:.2f} MB")
        summary.add_row("FAISS Index:", f"{stats.faiss_index_size_mb:.2f} MB")
        summary.add_row("BM25 Index:", f"{stats.bm25_index_size_mb:.2f} MB")
    if stats.dataset_url:
        summary.add_row("", "")
        summary.add_row("Published URL:", stats.dataset_url)
    console.print(summary)
    console.print(rule)
    console.print()
    if stats.errors:
        console.print("[bold red]Errors encountered:[/bold red]")
        for error in stats.errors:
            console.print(f" - {error}")
        console.print()
# =============================================================================
# Pipeline Step Functions
# =============================================================================
def _run_extraction(
    input_dir: Path,
    output_dir: Path,
    force: bool,
    verbose: bool,
    quiet: bool,
) -> tuple[ExtractionStatistics | None, str | None]:
    """Run the PDF-to-Markdown extraction step via scripts/extract.py.

    Args:
    ----
    input_dir : Path
        Directory containing PDF files.
    output_dir : Path
        Directory receiving the markdown output.
    force : bool
        If True, re-extract every file.
    verbose : bool
        If True, show detailed output.
    quiet : bool
        If True, suppress progress output.

    Returns:
    -------
    tuple[ExtractionStatistics | None, str | None]
        (statistics, None) on success; (None, error_message) on failure.
    """
    extract_module = _load_script_module("extract")
    try:
        extract_stats = extract_module.run_extraction(
            input_dir=input_dir,
            output_dir=output_dir,
            force=force,
            verbose=verbose,
            quiet=quiet,
        )
    except Exception as exc:
        return None, f"Extraction failed: {exc}"
    # Treat "nothing processed" and "everything failed" as hard failures.
    if extract_stats.total == 0:
        return None, "No PDF files were processed"
    if extract_stats.failed == extract_stats.total:
        return None, f"All {extract_stats.total} files failed to extract"
    return extract_stats, None
def _run_chunking(
    input_dir: Path,
    output_path: Path,
    force: bool,
    verbose: bool,
    quiet: bool,
) -> tuple[ChunkingStatistics | None, str | None]:
    """Run the document chunking step via scripts/chunk.py.

    Args:
    ----
    input_dir : Path
        Directory containing markdown files.
    output_path : Path
        Path to the output JSONL file.
    force : bool
        If True, re-chunk every file.
    verbose : bool
        If True, show detailed output.
    quiet : bool
        If True, suppress progress output.

    Returns:
    -------
    tuple[ChunkingStatistics | None, str | None]
        (statistics, None) on success; (None, error_message) on failure.
    """
    chunk_module = _load_script_module("chunk")
    try:
        chunk_stats = chunk_module.run_chunking(
            input_dir=input_dir,
            output_path=output_path,
            force=force,
            verbose=verbose,
            quiet=quiet,
        )
    except Exception as exc:
        return None, f"Chunking failed: {exc}"
    # Treat "nothing found", "everything failed", and "zero chunks" as
    # hard failures so the pipeline stops before embedding.
    if chunk_stats.total_files == 0:
        return None, "No markdown files were found to chunk"
    if chunk_stats.failed == chunk_stats.total_files:
        return None, f"All {chunk_stats.total_files} files failed to chunk"
    if chunk_stats.total_chunks == 0:
        return None, "No chunks were created"
    return chunk_stats, None
def _run_embedding(
    input_path: Path,
    output_dir: Path,
    publish: bool,
    quiet: bool,
) -> tuple[float, str | None, str | None]:
    """Run the embedding generation and index building step.

    Generates BGE embeddings for every chunk, builds FAISS and BM25 indexes,
    and optionally publishes the artifacts to HuggingFace.

    Args:
    ----
    input_path : Path
        Path to chunks.jsonl file.
    output_dir : Path
        Directory for embedding output.
    publish : bool
        If True, publish artifacts to HuggingFace.
    quiet : bool
        If True, suppress progress output.

    Returns:
    -------
    tuple[float, str | None, str | None]
        A tuple of (elapsed_time, dataset_url, error_message).
        - elapsed_time: Time taken for embedding/indexing
        - dataset_url: URL if published, None otherwise
        - error_message: None on success, error description on failure
    """
    # Lazy imports for heavy dependencies (keeps CLI startup fast)
    import math
    from rich.console import Console
    from rich.progress import (
        BarColumn,
        MofNCompleteColumn,
        Progress,
        SpinnerColumn,
        TaskProgressColumn,
        TextColumn,
        TimeElapsedColumn,
        TimeRemainingColumn,
    )

    # Dynamically load the embed script module and bind the functions we need
    embed_module = _load_script_module("embed")
    build_indexes = embed_module.build_indexes
    create_embedding_records = embed_module.create_embedding_records
    load_chunks_from_jsonl = embed_module.load_chunks_from_jsonl
    publish_to_huggingface = embed_module.publish_to_huggingface

    console = Console()
    start_time = time.perf_counter()
    dataset_url: str | None = None
    try:
        # Load chunks from the JSONL produced by the chunking step
        if not quiet:
            console.print(" Loading chunks...")
        chunks = load_chunks_from_jsonl(input_path)
        if not chunks:
            # Nothing to embed — report 0.0 elapsed since no work was done
            return 0.0, None, "No chunks loaded from JSONL file"
        # Lazy import encoder dependencies (pulls in torch/transformers)
        from rag_chatbot.embeddings import (
            BGEEncoder,
            EmbeddingBatch,
            EmbeddingStorage,
        )

        # Create output directory
        output_dir.mkdir(parents=True, exist_ok=True)
        # Initialize encoder
        model_name = "BAAI/bge-small-en-v1.5"
        encoder = BGEEncoder(
            model_name=model_name,
            device=None,  # Auto-detect
            normalize_text=False,
        )
        # Initialize storage
        storage = EmbeddingStorage(output_dir)
        # Calculate batches (ceil so a final partial batch is counted)
        batch_size = 32
        total_batches = math.ceil(len(chunks) / batch_size)
        # Generate embeddings with progress; `disable=quiet` silences the bars
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
            console=console,
            disable=quiet,
        ) as progress:
            # Embedding task — advanced internally by create_embedding_records
            embed_task = progress.add_task(
                "[cyan]Embedding chunks...",
                total=total_batches,
            )
            # Generate embeddings
            records = create_embedding_records(
                chunks=chunks,
                encoder=encoder,
                batch_size=batch_size,
                progress=progress,
                task_id=embed_task,
            )
            # Force the bar to 100% in case of rounding in batch counting
            progress.update(embed_task, completed=total_batches)
            # Save embeddings (float16 halves the parquet size on disk)
            save_task = progress.add_task("[cyan]Saving embeddings...", total=1)
            batch = EmbeddingBatch(
                model_name=model_name,
                dimension=encoder.embedding_dim,
                dtype="float16",
                records=records,
            )
            storage.save(batch)
            progress.update(save_task, completed=1)
            # Build FAISS + BM25 indexes; per-index timings are unused here
            _faiss_time, _bm25_time = build_indexes(
                output_dir=output_dir,
                chunks=chunks,
                progress=progress,
            )
            # Publish if requested
            if publish:
                dataset_url = publish_to_huggingface(
                    output_dir=output_dir,
                    chunks=chunks,
                    model_name=model_name,
                    embedding_dim=encoder.embedding_dim,
                    progress=progress,
                )
    except FileNotFoundError as e:
        elapsed = time.perf_counter() - start_time
        return elapsed, None, f"File not found: {e}"
    except ValueError as e:
        elapsed = time.perf_counter() - start_time
        return elapsed, None, f"Invalid data: {e}"
    except Exception as e:
        # Catch-all boundary: convert any failure into an error message
        # so the caller can record it instead of crashing the pipeline.
        elapsed = time.perf_counter() - start_time
        return elapsed, None, f"Embedding failed: {e}"
    else:
        elapsed = time.perf_counter() - start_time
        return elapsed, dataset_url, None
# =============================================================================
# Main Rebuild Logic
# =============================================================================
def run_rebuild(  # noqa: PLR0912, PLR0913, PLR0915
    input_dir: Path,
    output_dir: Path,
    publish: bool,
    force: bool,
    verbose: bool,
    quiet: bool,
) -> RebuildStatistics:
    """Run the full rebuild pipeline.

    This function orchestrates the complete build process:
    1. Validates input directory and PDF files
    2. Clears artifact directories (if --force)
    3. Extracts PDFs to Markdown
    4. Chunks documents into semantic segments
    5. Generates embeddings, builds indexes, and publishes (if requested)

    Args:
    ----
    input_dir : Path
        Directory containing raw PDF files.
    output_dir : Path
        Base output directory for all artifacts.
    publish : bool
        If True, publish artifacts to HuggingFace after build.
    force : bool
        If True, force rebuild by clearing all existing artifacts.
    verbose : bool
        If True, show detailed output during processing.
    quiet : bool
        If True, suppress progress output (still shows summary).

    Returns:
    -------
    RebuildStatistics
        Statistics about the rebuild run including timing and sizes.

    Note:
    ----
    The function uses fail-fast behavior - if a critical step fails
    (validation, extraction, chunking), subsequent steps are skipped.
    Partial failures (some files failed) continue to next steps.
    """
    # -------------------------------------------------------------------------
    # Start timing and initialize
    # -------------------------------------------------------------------------
    pipeline_start = time.perf_counter()
    console = _get_console()
    stats = RebuildStatistics()
    # The pipeline always displays five steps; embedding and publishing share
    # the final one. (Fixes a display bug where `5 if publish else 4` caused
    # both the chunking and embedding steps to print as [4/4] when not
    # publishing.)
    total_steps = 5
    # Artifact locations derived from the base output directory
    processed_dir = output_dir / "processed"
    chunks_dir = output_dir / "chunks"
    chunks_file = chunks_dir / "chunks.jsonl"
    embeddings_dir = output_dir / "embeddings"
    _print_header(console, quiet)
    # -------------------------------------------------------------------------
    # Step 1: Validate Input
    # -------------------------------------------------------------------------
    _print_step(console, 1, total_steps, STEP_VALIDATE, quiet)
    validate_start = time.perf_counter()
    valid, pdf_files, errors = _validate_input(input_dir, quiet)
    validate_elapsed = time.perf_counter() - validate_start
    if not valid:
        # Fail fast: nothing to process
        stats.errors.extend(errors)
        stats.total_time = time.perf_counter() - pipeline_start
        _print_summary(console, stats)
        return stats
    stats.total_pdfs = len(pdf_files)
    _print_step_result(
        console,
        f"Found {len(pdf_files)} PDF files",
        validate_elapsed,
        quiet,
    )
    # -------------------------------------------------------------------------
    # Step 2: Clear Artifacts (only when --force)
    # -------------------------------------------------------------------------
    _print_step(console, 2, total_steps, STEP_CLEAR, quiet)
    clear_start = time.perf_counter()
    if force:
        dirs_to_clear = [processed_dir, chunks_dir, embeddings_dir]
        cleared_count = 0
        for dir_path in dirs_to_clear:
            if _clear_directory(dir_path, verbose, quiet):
                cleared_count += 1
            else:
                # Record but continue: later steps may still succeed
                stats.errors.append(f"Failed to clear {dir_path}")
        clear_elapsed = time.perf_counter() - clear_start
        _print_step_result(
            console,
            f"Cleared {cleared_count} directories",
            clear_elapsed,
            quiet,
        )
    else:
        clear_elapsed = time.perf_counter() - clear_start
        _print_step_result(
            console,
            "Skipped (use --force to clear)",
            clear_elapsed,
            quiet,
        )
    # -------------------------------------------------------------------------
    # Step 3: Extract PDFs
    # -------------------------------------------------------------------------
    _print_step(console, 3, total_steps, STEP_EXTRACT, quiet)
    extract_start = time.perf_counter()
    extraction_stats, extraction_error = _run_extraction(
        input_dir=input_dir,
        output_dir=processed_dir,
        force=force,
        verbose=verbose,
        quiet=quiet,
    )
    stats.extraction_time = time.perf_counter() - extract_start
    if extraction_error:
        # Critical failure: no markdown to chunk
        stats.errors.append(extraction_error)
        stats.total_time = time.perf_counter() - pipeline_start
        _print_summary(console, stats)
        return stats
    if extraction_stats:
        stats.total_pages = extraction_stats.total_pages
        detail = (
            f"Processed: {extraction_stats.extracted} files, "
            f"{extraction_stats.total_pages} pages"
        )
        _print_step_result(console, detail, stats.extraction_time, quiet)
    # -------------------------------------------------------------------------
    # Step 4: Chunk Documents
    # -------------------------------------------------------------------------
    _print_step(console, 4, total_steps, STEP_CHUNK, quiet)
    chunk_start = time.perf_counter()
    chunking_stats, chunking_error = _run_chunking(
        input_dir=processed_dir,
        output_path=chunks_file,
        force=force,
        verbose=verbose,
        quiet=quiet,
    )
    stats.chunking_time = time.perf_counter() - chunk_start
    if chunking_error:
        # Critical failure: no chunks to embed
        stats.errors.append(chunking_error)
        stats.total_time = time.perf_counter() - pipeline_start
        _print_summary(console, stats)
        return stats
    if chunking_stats:
        stats.total_chunks = chunking_stats.total_chunks
        _print_step_result(
            console,
            f"Created: {chunking_stats.total_chunks} chunks",
            stats.chunking_time,
            quiet,
        )
    # -------------------------------------------------------------------------
    # Step 5: Generate Embeddings, Build Indexes (and Publish if requested)
    # -------------------------------------------------------------------------
    step_desc = f"{STEP_EMBED} & {STEP_PUBLISH}" if publish else STEP_EMBED
    _print_step(console, 5, total_steps, step_desc, quiet)
    embedding_time, dataset_url, embedding_error = _run_embedding(
        input_path=chunks_file,
        output_dir=embeddings_dir,
        publish=publish,
        quiet=quiet,
    )
    stats.embedding_time = embedding_time
    if embedding_error:
        # Not fail-fast here: record the error and fall through to summary
        stats.errors.append(embedding_error)
    else:
        # Collect artifact sizes for the summary table
        emb_parquet = embeddings_dir / "embeddings.parquet"
        faiss_bin = embeddings_dir / "faiss_index.bin"
        bm25_pkl = embeddings_dir / "bm25_index.pkl"
        stats.embeddings_size_mb = _get_file_size_mb(emb_parquet)
        stats.faiss_index_size_mb = _get_file_size_mb(faiss_bin)
        stats.bm25_index_size_mb = _get_file_size_mb(bm25_pkl)
        stats.dataset_url = dataset_url
        if publish and dataset_url:
            detail = (
                f"Embedded: {stats.total_chunks} chunks, " f"Published: {dataset_url}"
            )
        else:
            detail = f"Embedded: {stats.total_chunks} chunks"
        _print_step_result(console, detail, stats.embedding_time, quiet)
    # -------------------------------------------------------------------------
    # Calculate total time and print summary
    # -------------------------------------------------------------------------
    stats.total_time = time.perf_counter() - pipeline_start
    _print_summary(console, stats)
    return stats
# =============================================================================
# Main Entry Point
# =============================================================================
def main(argv: list[str] | None = None) -> int:
    """Execute the full rebuild pipeline CLI.

    Parses arguments, runs the pipeline, and maps the outcome to an exit
    code for the shell.

    Args:
    ----
    argv : list[str] | None, optional
        Command-line arguments to parse. If None, uses sys.argv[1:].

    Returns:
    -------
    int
        Exit code: 0 on full success, 1 when some chunks were produced
        despite errors (partial failure), 2 when nothing useful was built.

    Example:
    -------
    >>> exit_code = main(["data/raw/", "--publish"])
    >>> exit_code
    0
    """
    args = parse_args(argv)
    stats = run_rebuild(
        input_dir=args.input_dir,
        output_dir=args.output_dir,
        publish=args.publish,
        force=args.force,
        verbose=args.verbose,
        quiet=args.quiet,
    )
    if stats.success:
        return EXIT_SUCCESS
    # Any chunks produced counts as partial progress despite errors.
    return EXIT_PARTIAL_FAILURE if stats.total_chunks > 0 else EXIT_TOTAL_FAILURE
# =============================================================================
# Script Entry Point
# =============================================================================
if __name__ == "__main__":
    # Propagate the pipeline exit code (0/1/2) to the shell.
    sys.exit(main())