|
|
|
|
|
"""Markdown chunking CLI script for the RAG chatbot build pipeline. |
|
|
|
|
|
This script processes Markdown files from an input directory and chunks them |
|
|
into semantically meaningful segments for embedding. It is Step 3.6 of the |
|
|
offline build pipeline: Markdown -> Chunks using structure-aware chunking. |
|
|
|
|
|
Features: |
|
|
- Incremental chunking: Skip files that haven't changed (via manifest hash) |
|
|
- Force overwrite: Re-chunk all files with --force flag |
|
|
- Hash-based detection: SHA-256 hash comparison for change detection
|
|
- Progress reporting: Visual progress bar during batch chunking |
|
|
- Verbose mode: Show detailed file names and chunk counts |
|
|
- Quiet mode: Suppress all output except summary |
|
|
- Statistics summary: Display chunking stats including token distribution |
|
|
|
|
|
Exit Codes: |
|
|
0: Success - All files processed successfully (or all skipped) |
|
|
1: Partial failure - Some files failed but some succeeded |
|
|
2: Total failure - No files processed or invalid arguments |
|
|
|
|
|
Example Usage: |
|
|
# Basic chunking |
|
|
poetry run python scripts/chunk.py data/processed/ data/chunks/chunks.jsonl |
|
|
|
|
|
# Force re-chunk all files |
|
|
poetry run python scripts/chunk.py data/processed/ data/chunks/chunks.jsonl --force |
|
|
|
|
|
# Verbose mode (show file names and chunk counts) |
|
|
poetry run python scripts/chunk.py data/processed/ data/chunks/chunks.jsonl -v |
|
|
|
|
|
# Quiet mode (no progress bar, only summary) |
|
|
poetry run python scripts/chunk.py data/processed/ data/chunks/chunks.jsonl -q |
|
|
|
|
|
Note: |
|
|
---- |
|
|
This script uses lazy loading for heavy dependencies (Chunker, ChunkingConfig, |
|
|
TextNormalizer) to ensure fast CLI startup times. |
|
|
|
|
|
The manifest file stores file hashes to enable incremental chunking. It is |
|
|
stored alongside the output JSONL file as `.chunk_manifest.json`. |
|
|
|
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import hashlib |
|
|
import json |
|
|
import sys |
|
|
import time |
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
from typing import TYPE_CHECKING, Any |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
# Directory two levels above this file. With the script located at
# scripts/chunk.py (as in the usage examples) this is the project root.
_PROJECT_ROOT = Path(__file__).parent.parent
_ENV_FILE = _PROJECT_ROOT / ".env"

# Load environment variables from the project's .env file at import time.
# A missing .env file is not an error; the load is simply skipped.
if _ENV_FILE.exists():
    load_dotenv(_ENV_FILE)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from rag_chatbot.chunking import Chunk, Chunker |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Names exported by a star-import of this module. The underscore-prefixed
# helpers are listed explicitly, which makes them part of the star-import
# surface even though they are private by naming convention.
__all__: list[str] = [
    "ChunkingStatistics",
    "parse_args",
    "run_chunking",
    "main",
    "_get_chunker",
    "_compute_file_hash",
    "_should_chunk",
    "_load_manifest",
    "_save_manifest",
]

# Process exit codes returned by main(); they mirror the "Exit Codes" section
# of the module docstring.
EXIT_SUCCESS = 0
EXIT_PARTIAL_FAILURE = 1
EXIT_TOTAL_FAILURE = 2

# Value stored under the "version" key of every manifest this script creates.
MANIFEST_VERSION = 1
|
|
|
|
|
|
|
|
|
|
|
# Histogram buckets for chunk token counts, as (label, lower, upper) triples.
# _build_token_distribution treats the lower bound as inclusive and the upper
# bound as exclusive. The last bucket is open-ended, so its upper bound is
# float("inf") -- hence the float in the annotation.
TOKEN_BUCKETS: list[tuple[str, int, float]] = [
    ("0-100", 0, 100),
    ("100-200", 100, 200),
    ("200-300", 200, 300),
    ("300-400", 300, 400),
    ("400-500", 400, 500),
    ("500-600", 500, 600),
    ("600-700", 600, 700),
    ("700+", 700, float("inf")),
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class ChunkingStatistics:
    """Aggregate metrics describing one batch chunking run.

    Instances are validated on construction: every numeric field must be
    non-negative, otherwise ``ValueError`` is raised.

    Attributes:
    ----------
    total_files : int
        Number of Markdown files considered by the run.
    total_chunks : int
        Number of chunks collected across all files.
    skipped : int
        Files left untouched because they were unchanged (incremental mode).
    failed : int
        Files that could not be chunked because of an error.
    avg_tokens : float
        Mean token count per chunk.
    min_tokens : int
        Smallest token count of any chunk.
    max_tokens : int
        Largest token count of any chunk.
    elapsed_seconds : float
        Wall-clock duration of the run in seconds.
    token_distribution : dict[str, int]
        Histogram of chunk token counts keyed by bucket label
        (for example ``"0-100"``, ``"100-200"``).

    Example:
    -------
    >>> stats = ChunkingStatistics(
    ...     total_files=10,
    ...     total_chunks=50,
    ...     skipped=2,
    ...     failed=1,
    ...     avg_tokens=350.0,
    ...     min_tokens=100,
    ...     max_tokens=600,
    ...     elapsed_seconds=5.5,
    ...     token_distribution={"0-100": 5, "100-200": 10},
    ... )
    >>> stats.total_files
    10

    """

    total_files: int
    total_chunks: int
    skipped: int
    failed: int
    avg_tokens: float
    min_tokens: int
    max_tokens: int
    elapsed_seconds: float
    token_distribution: dict[str, int]

    def __post_init__(self) -> None:
        """Reject negative values in any numeric field.

        Fields are checked in declaration order, so the first offending field
        is the one named in the error.

        Raises
        ------
        ValueError
            If a numeric field is negative.
        TypeError
            If a numeric field holds a value that cannot be compared with 0.

        """
        numeric_fields = (
            "total_files",
            "total_chunks",
            "skipped",
            "failed",
            "avg_tokens",
            "min_tokens",
            "max_tokens",
            "elapsed_seconds",
        )
        for field_name in numeric_fields:
            value = getattr(self, field_name)
            if value < 0:
                raise ValueError(f"{field_name} must be non-negative, got {value}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the command-line parser and parse ``argv`` with it.

    Two positional paths are required (input directory, output JSONL file).
    ``--force`` is an independent switch; ``--verbose`` and ``--quiet`` live
    in a mutually exclusive group, so argparse rejects using both.

    Args:
    ----
    argv : list[str] | None, optional
        Arguments to parse. ``None`` lets argparse fall back to
        ``sys.argv[1:]``; passing a list makes the function testable.

    Returns:
    -------
    argparse.Namespace
        Namespace with these attributes:
        - input_dir: Path - directory containing processed Markdown files
        - output_path: Path - path of the output JSONL file
        - force: bool - re-chunk every file
        - verbose: bool - show detailed output
        - quiet: bool - suppress progress output

    Raises:
    ------
    SystemExit
        Raised by argparse when required arguments are missing, unknown
        arguments are given, or ``--verbose`` and ``--quiet`` are combined.

    Example:
    -------
    >>> args = parse_args(["data/processed/", "out/chunks.jsonl", "--force"])
    >>> args.input_dir
    PosixPath('data/processed')
    >>> args.force
    True

    """
    description = (
        "Chunk Markdown documents into semantically meaningful segments "
        "for embedding in the RAG pipeline. Processes all Markdown files "
        "in the input directory and outputs a single JSONL file."
    )
    usage_examples = (
        "Examples:\n"
        " %(prog)s data/processed/ out/chunks.jsonl # Basic chunking\n"
        " %(prog)s data/processed/ out/chunks.jsonl -f # Force re-chunk\n"
        " %(prog)s data/processed/ out/chunks.jsonl -v # Verbose output\n"
    )
    cli = argparse.ArgumentParser(
        prog="chunk.py",
        description=description,
        epilog=usage_examples,
        # Keep the hand-formatted epilog exactly as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Required positional paths, in command-line order.
    positionals = (
        ("input_dir", "Directory containing processed Markdown files"),
        ("output_path", "Path to output JSONL file for chunks"),
    )
    for dest, help_text in positionals:
        cli.add_argument(dest, type=Path, help=help_text)

    # store_true switches default to False when the flag is absent.
    cli.add_argument(
        "--force",
        "-f",
        action="store_true",
        help="Force re-chunk all files (default: skip unchanged)",
    )

    # Verbosity switches cannot be combined.
    verbosity = cli.add_mutually_exclusive_group()
    verbosity.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Show detailed output including file names and chunk counts",
    )
    verbosity.add_argument(
        "--quiet",
        "-q",
        action="store_true",
        help="Suppress progress bar (still shows summary)",
    )

    return cli.parse_args(argv)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_chunker() -> Chunker:
    """Create the Chunker used by this script, importing it on demand.

    The chunking package is imported inside the function rather than at
    module level so that starting the CLI (for example for ``--help``) does
    not pay for loading it.

    The chunker is built from:
    - a ChunkingConfig with ``min_tokens=450`` and ``max_tokens=700``
    - a TextNormalizer (described by the project as fixing OCR artifacts)

    Returns
    -------
    Chunker
        A configured Chunker instance.

    """
    from rag_chatbot.chunking import Chunker, ChunkingConfig, TextNormalizer

    return Chunker(
        ChunkingConfig(min_tokens=450, max_tokens=700),
        normalizer=TextNormalizer(),
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _is_debug_artifact(path: Path) -> bool: |
|
|
"""Return True if the path is a debug artifact that should be excluded.""" |
|
|
return path.name.endswith(".raw.md") |
|
|
|
|
|
|
|
|
def _compute_file_hash(path: Path) -> str: |
|
|
"""Compute SHA-256 hash of a file's contents. |
|
|
|
|
|
This function reads the file content and computes a SHA-256 hash, |
|
|
returning the first 16 characters for use in change detection. |
|
|
|
|
|
Args: |
|
|
---- |
|
|
path : Path |
|
|
Path to the file to hash. |
|
|
|
|
|
Returns: |
|
|
------- |
|
|
str |
|
|
First 16 characters of the SHA-256 hash. |
|
|
|
|
|
Example: |
|
|
------- |
|
|
>>> hash_str = _compute_file_hash(Path("document.md")) |
|
|
>>> len(hash_str) |
|
|
16 |
|
|
|
|
|
""" |
|
|
content = path.read_text(encoding="utf-8") |
|
|
full_hash = hashlib.sha256(content.encode("utf-8")).hexdigest() |
|
|
return full_hash[:16] |
|
|
|
|
|
|
|
|
def _should_chunk(md_path: Path, manifest: dict[str, Any], force: bool) -> bool: |
|
|
"""Determine if a markdown file should be chunked. |
|
|
|
|
|
This function implements the incremental chunking logic by checking: |
|
|
1. If --force is set, always chunk |
|
|
2. If file is not in manifest, chunk (new file) |
|
|
3. If file hash differs from manifest, chunk (modified file) |
|
|
4. Otherwise, skip (file is unchanged) |
|
|
|
|
|
Args: |
|
|
---- |
|
|
md_path : Path |
|
|
Path to the markdown file to potentially chunk. |
|
|
manifest : dict[str, Any] |
|
|
The loaded manifest dictionary with file hashes. |
|
|
force : bool |
|
|
Whether to force chunking regardless of manifest. |
|
|
|
|
|
Returns: |
|
|
------- |
|
|
bool |
|
|
True if the file should be chunked, False if it should be skipped. |
|
|
|
|
|
""" |
|
|
|
|
|
if force: |
|
|
return True |
|
|
|
|
|
|
|
|
files = manifest.get("files", {}) |
|
|
file_name = md_path.name |
|
|
|
|
|
|
|
|
if file_name not in files: |
|
|
return True |
|
|
|
|
|
|
|
|
stored_hash: str = str(files[file_name].get("hash", "")) |
|
|
current_hash = _compute_file_hash(md_path) |
|
|
|
|
|
return stored_hash != current_hash |
|
|
|
|
|
|
|
|
def _load_manifest(manifest_path: Path) -> dict[str, Any]: |
|
|
"""Load the manifest file or return an empty manifest. |
|
|
|
|
|
This function attempts to load an existing manifest file. If the file |
|
|
doesn't exist or is corrupted, it returns an empty manifest structure. |
|
|
|
|
|
Args: |
|
|
---- |
|
|
manifest_path : Path |
|
|
Path to the manifest JSON file. |
|
|
|
|
|
Returns: |
|
|
------- |
|
|
dict[str, Any] |
|
|
The loaded manifest or an empty manifest structure. |
|
|
|
|
|
""" |
|
|
if not manifest_path.exists(): |
|
|
return {"version": MANIFEST_VERSION, "files": {}} |
|
|
|
|
|
try: |
|
|
with manifest_path.open("r", encoding="utf-8") as f: |
|
|
manifest = json.load(f) |
|
|
|
|
|
if not isinstance(manifest, dict): |
|
|
return {"version": MANIFEST_VERSION, "files": {}} |
|
|
if "files" not in manifest: |
|
|
manifest["files"] = {} |
|
|
return manifest |
|
|
except (json.JSONDecodeError, OSError): |
|
|
|
|
|
return {"version": MANIFEST_VERSION, "files": {}} |
|
|
|
|
|
|
|
|
def _save_manifest(manifest_path: Path, manifest: dict[str, Any]) -> None: |
|
|
"""Save the manifest to a JSON file. |
|
|
|
|
|
Args: |
|
|
---- |
|
|
manifest_path : Path |
|
|
Path to the manifest JSON file. |
|
|
manifest : dict[str, Any] |
|
|
The manifest data to save. |
|
|
|
|
|
""" |
|
|
|
|
|
manifest_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
with manifest_path.open("w", encoding="utf-8") as f: |
|
|
json.dump(manifest, f, indent=2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_token_distribution(token_counts: list[int]) -> dict[str, int]: |
|
|
"""Build a histogram of token counts by bucket. |
|
|
|
|
|
Args: |
|
|
---- |
|
|
token_counts : list[int] |
|
|
List of token counts from all chunks. |
|
|
|
|
|
Returns: |
|
|
------- |
|
|
dict[str, int] |
|
|
Dictionary mapping bucket labels to counts. |
|
|
|
|
|
""" |
|
|
distribution: dict[str, int] = {bucket[0]: 0 for bucket in TOKEN_BUCKETS} |
|
|
|
|
|
for count in token_counts: |
|
|
for label, low, high in TOKEN_BUCKETS: |
|
|
if low <= count < high: |
|
|
distribution[label] += 1 |
|
|
break |
|
|
|
|
|
return distribution |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _print_summary(stats: ChunkingStatistics) -> None: |
|
|
"""Print the chunking summary statistics. |
|
|
|
|
|
Displays a formatted summary of the chunking run including counts |
|
|
of processed files, chunk statistics, and timing information. |
|
|
|
|
|
Args: |
|
|
---- |
|
|
stats : ChunkingStatistics |
|
|
The statistics from the chunking run. |
|
|
|
|
|
""" |
|
|
print() |
|
|
print("Chunking Complete") |
|
|
print("=" * 45) |
|
|
print(f"Total files: {stats.total_files:>6}") |
|
|
print(f"Files processed: {stats.total_files - stats.skipped - stats.failed:>6}") |
|
|
print(f"Files skipped: {stats.skipped:>6}") |
|
|
print(f"Files failed: {stats.failed:>6}") |
|
|
print("-" * 45) |
|
|
print(f"Total chunks: {stats.total_chunks:>6}") |
|
|
print(f"Avg tokens/chunk: {stats.avg_tokens:>6.1f}") |
|
|
print(f"Min tokens: {stats.min_tokens:>6}") |
|
|
print(f"Max tokens: {stats.max_tokens:>6}") |
|
|
print("-" * 45) |
|
|
print("Token Distribution:") |
|
|
for bucket_label, count in stats.token_distribution.items(): |
|
|
if count > 0: |
|
|
print(f" {bucket_label:>12}: {count:>6} chunks") |
|
|
print("-" * 45) |
|
|
print(f"Elapsed time: {stats.elapsed_seconds:>6.2f}s") |
|
|
print("=" * 45) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _zero_statistics(elapsed: float) -> ChunkingStatistics:
    """Return the statistics of a run that handled no files at all."""
    return ChunkingStatistics(
        total_files=0,
        total_chunks=0,
        skipped=0,
        failed=0,
        avg_tokens=0.0,
        min_tokens=0,
        max_tokens=0,
        elapsed_seconds=elapsed,
        token_distribution={},
    )


def _load_existing_chunks(output_path: Path) -> dict[str, list[dict[str, Any]]]:
    """Group the records of an existing output JSONL file by their ``source``."""
    by_source: dict[str, list[dict[str, Any]]] = {}
    try:
        with output_path.open("r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue  # tolerate individual damaged lines
                if not isinstance(record, dict):
                    continue  # valid JSON, but not a chunk record
                source = record.get("source", "")
                if not isinstance(source, str):
                    continue
                by_source.setdefault(source, []).append(record)
    except (OSError, UnicodeDecodeError):
        # Best effort: keep what was read so far. Files whose cached chunks
        # turn out to be incomplete are re-chunked by the caller.
        pass
    return by_source


def _restore_chunks(records: list[dict[str, Any]]) -> list[Chunk]:
    """Rebuild Chunk objects from cached records, dropping invalid records."""
    if not records:
        return []

    # Deferred import: only needed when there is something to restore.
    from rag_chatbot.chunking import Chunk

    restored: list[Chunk] = []
    for record in records:
        try:
            restored.append(Chunk(**record))
        except (TypeError, ValueError):
            continue
    return restored


def run_chunking(
    input_dir: Path,
    output_path: Path,
    force: bool,
    verbose: bool,
    quiet: bool,
) -> ChunkingStatistics:
    """Chunk every Markdown file in ``input_dir`` into one JSONL file.

    Steps:
    1. Collect ``*.md`` files in ``input_dir`` (non-recursive), excluding
       ``*.raw.md`` debug artifacts.
    2. For each file decide, via the manifest, whether it must be chunked.
       Unchanged files reuse their chunks from the existing output file; if
       those cached chunks are missing or incomplete the file is re-chunked
       instead of being silently dropped from the output.
    3. Chunk the remaining files with the lazily created Chunker.
    4. Rewrite the output JSONL file with all collected chunks.
    5. Save the updated manifest next to the output file
       (``.chunk_manifest.json``) -- only if the output was written.
    6. Print the summary and return the statistics.

    Args:
    ----
    input_dir : Path
        Directory containing the Markdown files. Must exist.
    output_path : Path
        Output JSONL file; its parent directory is created if needed.
    force : bool
        If True, re-chunk every file and ignore manifest and cached chunks.
    verbose : bool
        If True, print one line per file describing what was done.
    quiet : bool
        If True, suppress the progress bar. The summary is still printed.

    Returns:
    -------
    ChunkingStatistics
        Counts, token figures and timing of the run. All-zero statistics are
        returned when the output directory cannot be created or no Markdown
        files are found.

    Note:
    ----
    A failure while handling one file (including failing to hash it) is
    reported on stderr, counted in ``failed`` and does not stop the run.
    If the output file cannot be written, the files chunked in this run are
    counted as failed and the manifest is left untouched, so they are picked
    up again by the next run.

    """
    start_time = time.perf_counter()
    manifest_path = output_path.parent / ".chunk_manifest.json"

    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        print(
            f"Error: Permission denied creating output directory: {output_path.parent}",
            file=sys.stderr,
        )
        return _zero_statistics(time.perf_counter() - start_time)
    except OSError as e:
        # E.g. a path component exists but is a regular file.
        print(
            f"Error: Cannot create output directory {output_path.parent}: {e}",
            file=sys.stderr,
        )
        return _zero_statistics(time.perf_counter() - start_time)

    all_md_files = sorted(f for f in input_dir.glob("*.md") if f.is_file())
    md_files = [f for f in all_md_files if not _is_debug_artifact(f)]
    skipped_debug = len(all_md_files) - len(md_files)
    if skipped_debug > 0 and verbose and not quiet:
        print(f"Skipping {skipped_debug} debug artifact(s) (*.raw.md)")
    total_files = len(md_files)

    if total_files == 0:
        return _zero_statistics(time.perf_counter() - start_time)

    skipped_count = 0
    failed_count = 0
    all_chunks: list[Chunk] = []
    all_token_counts: list[int] = []

    manifest = _load_manifest(manifest_path)
    manifest_files = manifest.get("files")
    if not isinstance(manifest_files, dict):
        manifest_files = {}
    new_manifest: dict[str, Any] = {"version": MANIFEST_VERSION, "files": {}}

    # Chunks of unchanged files are carried over from the previous output.
    existing_chunks_by_source: dict[str, list[dict[str, Any]]] = {}
    if output_path.exists() and not force:
        existing_chunks_by_source = _load_existing_chunks(output_path)

    # Created on first use so that a run in which everything is skipped never
    # loads the chunking stack.
    chunker: Chunker | None = None

    progress_bar: Any = md_files
    if not quiet:
        try:
            from tqdm import tqdm
        except ImportError:
            print(f"Processing {total_files} Markdown files...")
        else:
            progress_bar = tqdm(
                md_files,
                desc="Chunking",
                unit="file",
                disable=False,
            )

    for md_path in progress_bar:
        file_name = md_path.name

        try:
            # The change check hashes the file and can fail like any other
            # per-file step, so it lives inside the error boundary.
            if not _should_chunk(md_path, manifest, force):
                entry = manifest_files.get(file_name)
                restored = _restore_chunks(
                    existing_chunks_by_source.get(file_name, [])
                )
                expected = entry.get("chunks") if isinstance(entry, dict) else None
                if isinstance(expected, int):
                    cache_usable = len(restored) == expected
                else:
                    cache_usable = bool(restored)

                if cache_usable:
                    restored_counts = [chunk.token_count for chunk in restored]
                    skipped_count += 1
                    if entry is not None:
                        new_manifest["files"][file_name] = entry
                    all_chunks.extend(restored)
                    all_token_counts.extend(restored_counts)
                    if verbose:
                        print(f" Skipping (unchanged): {file_name}")
                    continue
                # Unchanged on disk, but its chunks cannot be recovered from
                # the existing output: fall through and chunk it again.

            if verbose:
                print(f" Processing: {file_name}")

            if chunker is None:
                chunker = _get_chunker()

            content = md_path.read_text(encoding="utf-8")
            chunks = chunker.chunk_document(content, file_name, 1)
            token_counts = [chunk.token_count for chunk in chunks]
            file_hash = _compute_file_hash(md_path)

            # Everything that can fail has succeeded; record the results.
            all_chunks.extend(chunks)
            all_token_counts.extend(token_counts)
            new_manifest["files"][file_name] = {
                "hash": file_hash,
                "chunks": len(chunks),
                "last_modified": time.time(),
            }

            if verbose:
                print(f" Created {len(chunks)} chunks")

        except PermissionError as e:
            failed_count += 1
            print(f"Error: Permission denied for {file_name}: {e}", file=sys.stderr)

        except Exception as e:
            # Per-file boundary: report and keep going with the next file.
            failed_count += 1
            print(f"Error: Failed to chunk {file_name}: {e}", file=sys.stderr)

    output_written = True
    if all_chunks:
        try:
            with output_path.open("w", encoding="utf-8") as f:
                for chunk in all_chunks:
                    json_line = json.dumps(chunk.to_jsonl_dict(), ensure_ascii=False)
                    f.write(json_line + "\n")
        except OSError as e:
            output_written = False
            print(f"Error: Failed to write output file: {e}", file=sys.stderr)
            # Nothing chunked in this run was persisted, so count those files
            # as failed; this also makes the exit code reflect the failure.
            failed_count = total_files - skipped_count

    if output_written:
        # A manifest that is ahead of the output would make the next run skip
        # files whose chunks were never stored, hence the guard.
        try:
            _save_manifest(manifest_path, new_manifest)
        except OSError as e:
            # The output itself is fine; without a manifest the next run
            # merely re-chunks everything.
            print(f"Error: Failed to write manifest file: {e}", file=sys.stderr)

    elapsed = time.perf_counter() - start_time
    total_chunks = len(all_chunks)

    if all_token_counts:
        avg_tokens = sum(all_token_counts) / len(all_token_counts)
        min_tokens = min(all_token_counts)
        max_tokens = max(all_token_counts)
    else:
        avg_tokens = 0.0
        min_tokens = 0
        max_tokens = 0

    stats = ChunkingStatistics(
        total_files=total_files,
        total_chunks=total_chunks,
        skipped=skipped_count,
        failed=failed_count,
        avg_tokens=avg_tokens,
        min_tokens=min_tokens,
        max_tokens=max_tokens,
        elapsed_seconds=elapsed,
        token_distribution=_build_token_distribution(all_token_counts),
    )

    # Quiet mode only silences progress output; the summary is always shown,
    # as promised by the --quiet help text.
    _print_summary(stats)

    return stats
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Run the chunking CLI and translate the outcome into an exit code.

    Steps:
    1. Parse the command-line arguments.
    2. Check that the input directory exists and is a directory.
    3. Check that it contains at least one Markdown file to chunk
       (regular ``*.md`` files, ``*.raw.md`` debug artifacts excluded).
    4. Run the chunking process.
    5. Map the resulting statistics to an exit code.

    Args:
    ----
    argv : list[str] | None, optional
        Command-line arguments to parse. If None, uses sys.argv[1:].

    Returns:
    -------
    int
        - 0 (EXIT_SUCCESS): no file failed.
        - 1 (EXIT_PARTIAL_FAILURE): some files failed, but at least one was
          processed or skipped.
        - 2 (EXIT_TOTAL_FAILURE): invalid input, nothing to process, the run
          could not start, or every file failed.

    Example:
    -------
    >>> exit_code = main(["data/processed/", "data/chunks/chunks.jsonl"])
    >>> exit_code
    0

    """
    args = parse_args(argv)

    if not args.input_dir.exists():
        print(
            f"Error: Input directory does not exist: {args.input_dir}",
            file=sys.stderr,
        )
        return EXIT_TOTAL_FAILURE

    if not args.input_dir.is_dir():
        print(
            f"Error: Input path is not a directory: {args.input_dir}",
            file=sys.stderr,
        )
        return EXIT_TOTAL_FAILURE

    # Same selection as run_chunking: regular *.md files minus debug artifacts.
    candidates = [f for f in args.input_dir.glob("*.md") if f.is_file()]
    md_files = [f for f in candidates if not _is_debug_artifact(f)]

    if not md_files:
        # run_chunking prints the debug-artifact notice itself when it runs;
        # it is only printed here because the run is never started.
        skipped_debug = len(candidates) - len(md_files)
        if args.verbose and skipped_debug > 0:
            print(f"Skipping {skipped_debug} debug artifact(s) (*.raw.md)")
        print(
            f"Error: No markdown files found in {args.input_dir}",
            file=sys.stderr,
        )
        return EXIT_TOTAL_FAILURE

    stats = run_chunking(
        input_dir=args.input_dir,
        output_path=args.output_path,
        force=args.force,
        verbose=args.verbose,
        quiet=args.quiet,
    )

    # Files were found above, so zero files in the statistics means the run
    # bailed out early (e.g. the output directory could not be created).
    if stats.total_files == 0:
        return EXIT_TOTAL_FAILURE

    if stats.failed == 0:
        return EXIT_SUCCESS

    processed = stats.total_files - stats.skipped - stats.failed
    if processed > 0 or stats.skipped > 0:
        return EXIT_PARTIAL_FAILURE

    return EXIT_TOTAL_FAILURE
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: run the CLI and hand its return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|