#!/usr/bin/env python3
"""Build index by processing raw markdown files into semantic chunks with metadata."""
import argparse
import json
import sys
import time
import traceback
from pathlib import Path
from typing import Dict, List
from tqdm import tqdm
# Make the repo root importable so src.* and config.* resolve when run as a script
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.processing.chunker import SemanticChunker, ChunkNode
from src.processing.metadata_extractor import MetadataExtractor
from src.vectorstore.qdrant_store import QdrantStoreManager
from src.llm.sentence_transformer_client import SentenceTransformerClient
from config.settings import settings
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Process raw EyeWiki markdown into semantic chunks with medical metadata",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Just process files (no vector indexing)
python scripts/build_index.py
# Process AND build vector index
python scripts/build_index.py --index-vectors
# Only build vector index from existing processed files
python scripts/build_index.py --index-only
# Process with custom directories
python scripts/build_index.py --input-dir ./my_raw --output-dir ./my_processed
# Force rebuild with fresh Qdrant collection
python scripts/build_index.py --rebuild --index-vectors --recreate-collection
# Process only files matching pattern
python scripts/build_index.py --pattern "Glaucoma*.md" --index-vectors
# Custom chunking and embedding parameters
python scripts/build_index.py --chunk-size 1024 --embedding-batch-size 64 --index-vectors
""",
)
parser.add_argument(
"--input-dir",
type=str,
default=None,
help=f"Input directory with raw markdown files (default: {settings.data_raw_path})",
)
parser.add_argument(
"--output-dir",
type=str,
default=None,
help=f"Output directory for processed chunks (default: {settings.data_processed_path})",
)
parser.add_argument(
"--rebuild",
action="store_true",
help="Force rebuild even if output files exist",
)
parser.add_argument(
"--pattern",
type=str,
default="*.md",
help="Glob pattern for files to process (default: *.md)",
)
parser.add_argument(
"--chunk-size",
type=int,
default=None,
help=f"Chunk size in tokens (default: {settings.chunk_size})",
)
parser.add_argument(
"--chunk-overlap",
type=int,
default=None,
help=f"Chunk overlap in tokens (default: {settings.chunk_overlap})",
)
parser.add_argument(
"--min-chunk-size",
type=int,
default=None,
help=f"Minimum chunk size in tokens (default: {settings.min_chunk_size})",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="Enable verbose output with detailed error messages",
)
parser.add_argument(
"--index-vectors",
action="store_true",
help="Build vector index in Qdrant after processing",
)
parser.add_argument(
"--index-only",
action="store_true",
help="Skip processing, only build vector index from existing processed files",
)
parser.add_argument(
"--recreate-collection",
action="store_true",
help="Recreate Qdrant collection (deletes existing data)",
)
parser.add_argument(
"--embedding-batch-size",
type=int,
default=32,
help="Batch size for embedding generation (default: 32)",
)
parser.add_argument(
"--embedding-model",
type=str,
default="sentence-transformers/all-mpnet-base-v2",
help="Sentence transformer model name (default: all-mpnet-base-v2)",
)
return parser.parse_args()
def print_banner(console: Console):
"""Print welcome banner."""
banner = """
[bold cyan]EyeWiki Index Builder[/bold cyan]
    [dim]Processing pipeline: Markdown → Metadata Extraction → Semantic Chunking → JSON[/dim]
"""
console.print(Panel(banner, border_style="cyan"))
def load_markdown_file(md_file: Path) -> tuple[str, Dict]:
"""
Load markdown content and corresponding JSON metadata.
Args:
md_file: Path to markdown file
Returns:
Tuple of (content, metadata)
Raises:
FileNotFoundError: If JSON metadata file not found
ValueError: If content is empty or metadata is invalid
"""
# Read markdown content
with open(md_file, "r", encoding="utf-8") as f:
content = f.read()
if not content.strip():
raise ValueError("Empty markdown content")
# Look for corresponding JSON metadata
json_file = md_file.with_suffix(".json")
if not json_file.exists():
raise FileNotFoundError(f"Metadata file not found: {json_file}")
# Read metadata
with open(json_file, "r", encoding="utf-8") as f:
metadata = json.load(f)
if not isinstance(metadata, dict):
raise ValueError("Invalid metadata format (must be dict)")
return content, metadata
def process_file(
md_file: Path,
output_dir: Path,
chunker: SemanticChunker,
extractor: MetadataExtractor,
rebuild: bool = False,
verbose: bool = False,
) -> Dict:
"""
Process a single markdown file through the pipeline.
Pipeline:
1. Load markdown and metadata
2. Extract medical metadata
3. Chunk document
4. Save chunks to JSON
Args:
md_file: Path to markdown file
output_dir: Output directory for chunks
chunker: SemanticChunker instance
extractor: MetadataExtractor instance
rebuild: Force rebuild even if output exists
verbose: Enable verbose error output
Returns:
Dictionary with processing results and statistics
"""
result = {
"file": md_file.name,
"status": "pending",
"chunks_created": 0,
"total_tokens": 0,
"error": None,
}
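    # Output name <stem>_chunks.json matches the "*_chunks.json" glob in load_processed_chunks()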
output_file = output_dir / f"{md_file.stem}_chunks.json"
# Check if output already exists
if output_file.exists() and not rebuild:
result["status"] = "skipped"
result["error"] = "Output already exists (use --rebuild to force)"
return result
try:
# Step 1: Load file
content, metadata = load_markdown_file(md_file)
# Step 2: Extract medical metadata
enhanced_metadata = extractor.extract(content, metadata)
# Step 3: Chunk document
chunks = chunker.chunk_document(content, enhanced_metadata)
if not chunks:
result["status"] = "skipped"
result["error"] = "No chunks created (content too small or filtered)"
return result
# Step 4: Save chunks to JSON
output_dir.mkdir(parents=True, exist_ok=True)
with open(output_file, "w", encoding="utf-8") as f:
chunk_dicts = [chunk.to_dict() for chunk in chunks]
json.dump(chunk_dicts, f, indent=2, ensure_ascii=False)
# Update result
result["status"] = "success"
result["chunks_created"] = len(chunks)
result["total_tokens"] = sum(chunk.token_count for chunk in chunks)
except FileNotFoundError as e:
result["status"] = "error"
result["error"] = f"File not found: {e}"
if verbose:
result["traceback"] = traceback.format_exc()
except ValueError as e:
result["status"] = "error"
result["error"] = f"Invalid data: {e}"
if verbose:
result["traceback"] = traceback.format_exc()
except Exception as e:
result["status"] = "error"
result["error"] = f"Unexpected error: {e}"
if verbose:
result["traceback"] = traceback.format_exc()
return result
def print_statistics(results: List[Dict], console: Console):
"""
Print processing statistics.
Args:
results: List of processing results
console: Rich console for output
"""
# Calculate statistics
total_files = len(results)
successful = sum(1 for r in results if r["status"] == "success")
skipped = sum(1 for r in results if r["status"] == "skipped")
errors = sum(1 for r in results if r["status"] == "error")
total_chunks = sum(r["chunks_created"] for r in results)
total_tokens = sum(r["total_tokens"] for r in results)
avg_chunks = total_chunks / successful if successful > 0 else 0
avg_tokens_per_chunk = total_tokens / total_chunks if total_chunks > 0 else 0
avg_tokens_per_doc = total_tokens / successful if successful > 0 else 0
# Create statistics table
table = Table(title="Processing Statistics", border_style="green")
table.add_column("Metric", style="cyan", justify="left")
table.add_column("Value", style="white", justify="right")
table.add_row("Total Files", f"{total_files:,}")
table.add_row("Successfully Processed", f"{successful:,}")
table.add_row("Skipped", f"{skipped:,}")
table.add_row("Errors", f"{errors:,}")
table.add_row("", "") # Separator
table.add_row("Total Chunks Created", f"{total_chunks:,}")
table.add_row("Total Tokens", f"{total_tokens:,}")
table.add_row("", "") # Separator
table.add_row("Avg Chunks per Document", f"{avg_chunks:.1f}")
table.add_row("Avg Tokens per Chunk", f"{avg_tokens_per_chunk:.1f}")
table.add_row("Avg Tokens per Document", f"{avg_tokens_per_doc:.1f}")
console.print("\n")
console.print(table)
# Show error details if any
error_results = [r for r in results if r["status"] == "error"]
if error_results:
console.print("\n[yellow]Error Details:[/yellow]")
for i, result in enumerate(error_results[:10], 1):
console.print(f" {i}. [red]{result['file']}[/red]")
console.print(f" [dim]{result['error']}[/dim]")
if "traceback" in result:
console.print(f" [dim]{result['traceback']}[/dim]")
if len(error_results) > 10:
console.print(f" [dim]... and {len(error_results) - 10} more errors[/dim]")
# Show skipped details if any
skip_results = [r for r in results if r["status"] == "skipped"]
if skip_results and len(skip_results) <= 5:
console.print("\n[yellow]Skipped Files:[/yellow]")
for i, result in enumerate(skip_results, 1):
console.print(f" {i}. {result['file']}: {result['error']}")
def load_processed_chunks(processed_dir: Path, console: Console) -> List[ChunkNode]:
"""
Load all processed chunks from JSON files.
Args:
processed_dir: Directory containing processed chunk JSON files
console: Rich console for output
Returns:
List of ChunkNode objects
"""
chunk_files = list(processed_dir.glob("*_chunks.json"))
if not chunk_files:
console.print(f"[yellow]No processed chunk files found in {processed_dir}[/yellow]")
return []
all_chunks = []
console.print(f"\n[cyan]Loading processed chunks from {len(chunk_files)} files...[/cyan]")
with tqdm(chunk_files, desc="Loading chunks", unit="file") as pbar:
for chunk_file in pbar:
try:
with open(chunk_file, "r", encoding="utf-8") as f:
chunk_dicts = json.load(f)
# Convert dicts to ChunkNode objects
for chunk_dict in chunk_dicts:
chunk = ChunkNode.from_dict(chunk_dict)
all_chunks.append(chunk)
pbar.set_postfix({"total_chunks": len(all_chunks)})
except Exception as e:
console.print(f"[red]Error loading {chunk_file.name}: {e}[/red]")
console.print(f"[green]✓[/green] Loaded {len(all_chunks):,} chunks")
return all_chunks
def build_vector_index(
chunks: List[ChunkNode],
embedding_client: SentenceTransformerClient,
qdrant_manager: QdrantStoreManager,
batch_size: int,
console: Console,
) -> Dict:
"""
Build vector index by generating embeddings and inserting into Qdrant.
Args:
chunks: List of ChunkNode objects
embedding_client: SentenceTransformerClient for stable embeddings
qdrant_manager: QdrantStoreManager for vector storage
batch_size: Batch size for embedding generation
console: Rich console for output
Returns:
Dictionary with indexing statistics
"""
if not chunks:
console.print("[yellow]No chunks to index[/yellow]")
return {"chunks_indexed": 0, "time_taken": 0}
console.print(f"\n[bold cyan]Building Vector Index[/bold cyan]")
console.print(f"Chunks to index: {len(chunks):,}")
console.print(f"Embedding batch size: {batch_size}")
    start_time = time.time()
# Extract text content for embedding
texts = [chunk.content for chunk in chunks]
# Generate embeddings with progress bar
console.print("\n[cyan]Generating embeddings...[/cyan]")
try:
embeddings = embedding_client.embed_batch(
texts=texts,
batch_size=batch_size,
show_progress=True,
)
except Exception as e:
console.print(f"[red]Failed to generate embeddings: {e}[/red]")
raise
# Insert into Qdrant
console.print("\n[cyan]Inserting into Qdrant...[/cyan]")
try:
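        # add_documents() pairs each chunk with its dense embedding when writing to the collection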
num_added = qdrant_manager.add_documents(
chunks=chunks,
dense_embeddings=embeddings,
)
except Exception as e:
console.print(f"[red]Failed to insert into Qdrant: {e}[/red]")
raise
elapsed_time = time.time() - start_time
# Get collection info
try:
collection_info = qdrant_manager.get_collection_info()
except Exception as e:
console.print(f"[yellow]Could not get collection info: {e}[/yellow]")
collection_info = {}
stats = {
"chunks_indexed": num_added,
"time_taken": elapsed_time,
"chunks_per_second": num_added / elapsed_time if elapsed_time > 0 else 0,
"collection_info": collection_info,
}
return stats
def print_index_statistics(stats: Dict, console: Console):
"""
Print vector indexing statistics.
Args:
stats: Statistics dictionary
console: Rich console for output
"""
table = Table(title="Vector Index Statistics", border_style="green")
table.add_column("Metric", style="cyan", justify="left")
table.add_column("Value", style="white", justify="right")
table.add_row("Chunks Indexed", f"{stats['chunks_indexed']:,}")
table.add_row("Time Taken", f"{stats['time_taken']:.1f}s")
table.add_row("Chunks/Second", f"{stats['chunks_per_second']:.1f}")
if "collection_info" in stats and stats["collection_info"]:
info = stats["collection_info"]
table.add_row("", "") # Separator
table.add_row("Collection Name", info.get("name", "N/A"))
table.add_row("Total Vectors", f"{info.get('vectors_count', 0):,}")
table.add_row("Total Points", f"{info.get('points_count', 0):,}")
table.add_row("Status", info.get("status", "N/A"))
console.print("\n")
console.print(table)
def main():
"""Main entry point for index building."""
args = parse_args()
console = Console()
# Print banner
print_banner(console)
# Prepare directories
input_dir = Path(args.input_dir) if args.input_dir else Path(settings.data_raw_path)
output_dir = Path(args.output_dir) if args.output_dir else Path(settings.data_processed_path)
    # Determine run mode: --index-only skips processing; either flag enables vector indexing
index_only = args.index_only
should_index = args.index_vectors or args.index_only
# Print mode
if index_only:
console.print("[cyan]Mode:[/cyan] Index only (skip processing)")
elif should_index:
console.print("[cyan]Mode:[/cyan] Process and build vector index")
else:
console.print("[cyan]Mode:[/cyan] Process only (no vector indexing)")
# Validate input directory (only needed if not index-only)
if not index_only and not input_dir.exists():
console.print(f"[bold red]Error: Input directory does not exist: {input_dir}[/bold red]")
return 1
# Validate output directory exists (needed for index-only)
if index_only and not output_dir.exists():
console.print(f"[bold red]Error: Output directory does not exist: {output_dir}[/bold red]")
console.print("[yellow]Please run processing first without --index-only[/yellow]")
return 1
# Print configuration
if not index_only:
# Find all markdown files
        md_files = sorted(input_dir.glob(args.pattern))  # sorted for deterministic processing order
if not md_files:
console.print(f"[yellow]No files matching pattern '{args.pattern}' found in {input_dir}[/yellow]")
return 0
console.print(f"[cyan]Input directory:[/cyan] {input_dir}")
console.print(f"[cyan]Output directory:[/cyan] {output_dir}")
console.print(f"[cyan]Files found:[/cyan] {len(md_files)}")
console.print(f"[cyan]Pattern:[/cyan] {args.pattern}")
console.print(f"[cyan]Rebuild mode:[/cyan] {'Yes' if args.rebuild else 'No'}")
else:
console.print(f"[cyan]Processed directory:[/cyan] {output_dir}")
# Initialize components (only if processing)
results = []
if not index_only:
chunker = SemanticChunker(
chunk_size=args.chunk_size if args.chunk_size is not None else settings.chunk_size,
chunk_overlap=args.chunk_overlap if args.chunk_overlap is not None else settings.chunk_overlap,
min_chunk_size=args.min_chunk_size if args.min_chunk_size is not None else settings.min_chunk_size,
)
extractor = MetadataExtractor()
console.print(f"[cyan]Chunk size:[/cyan] {chunker.chunk_size} tokens")
console.print(f"[cyan]Chunk overlap:[/cyan] {chunker.chunk_overlap} tokens")
console.print(f"[cyan]Min chunk size:[/cyan] {chunker.min_chunk_size} tokens")
console.print()
# Process files with progress bar
console.print("[bold cyan]Processing Files...[/bold cyan]\n")
with tqdm(
total=len(md_files),
desc="Processing",
unit="file",
ncols=100,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]",
) as pbar:
for md_file in md_files:
# Update progress bar description
pbar.set_description(f"Processing {md_file.name[:30]:30}")
# Process file
result = process_file(
md_file=md_file,
output_dir=output_dir,
chunker=chunker,
extractor=extractor,
rebuild=args.rebuild,
verbose=args.verbose,
)
results.append(result)
# Update progress bar postfix with running stats
successful = sum(1 for r in results if r["status"] == "success")
chunks = sum(r["chunks_created"] for r in results)
pbar.set_postfix({"success": successful, "chunks": chunks})
pbar.update(1)
# Print statistics
print_statistics(results, console)
# Check processing status
successful = sum(1 for r in results if r["status"] == "success")
errors = sum(1 for r in results if r["status"] == "error")
console.print()
if errors == 0 and successful > 0:
console.print("[bold green]Processing completed successfully![/bold green]")
console.print(f"[green]Processed files saved to: {output_dir}[/green]")
elif successful > 0:
console.print("[bold yellow]Processing completed with some errors.[/bold yellow]")
console.print(f"[yellow]Processed files saved to: {output_dir}[/yellow]")
else:
console.print("[bold red]Processing failed - no files were processed successfully.[/bold red]")
if not should_index:
return 1
# Vector indexing phase
if should_index:
try:
# Initialize embedding client with sentence-transformers
console.print("\n[bold cyan]Initializing Sentence Transformers Client...[/bold cyan]")
try:
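                # Loading the model may download weights on first use, so network access can be required here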
embedding_client = SentenceTransformerClient(model_name=args.embedding_model)
model_info = embedding_client.get_model_info()
console.print(f"[green]✓[/green] Loaded model: {model_info['model_name']}")
console.print(f"[green]✓[/green] Device: {model_info['device']}")
console.print(f"[green]✓[/green] Embedding dimension: {model_info['embedding_dim']}")
except Exception as e:
console.print(f"[bold red]Failed to initialize Sentence Transformers: {e}[/bold red]")
console.print("[yellow]Install sentence-transformers: pip install sentence-transformers torch[/yellow]")
return 1
# Initialize Qdrant store
console.print("\n[bold cyan]Initializing Qdrant Store...[/bold cyan]")
try:
qdrant_manager = QdrantStoreManager(
url=settings.qdrant_url,
api_key=settings.qdrant_api_key,
)
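                # recreate=True drops any existing collection data (guarded by --recreate-collection)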
qdrant_manager.initialize_collection(recreate=args.recreate_collection)
except Exception as e:
console.print(f"[bold red]Failed to initialize Qdrant: {e}[/bold red]")
return 1
# Load processed chunks
chunks = load_processed_chunks(output_dir, console)
if not chunks:
console.print("[yellow]No chunks to index. Please process documents first.[/yellow]")
return 0
# Build vector index
try:
index_stats = build_vector_index(
chunks=chunks,
embedding_client=embedding_client,
qdrant_manager=qdrant_manager,
batch_size=args.embedding_batch_size,
console=console,
)
# Print index statistics
print_index_statistics(index_stats, console)
console.print("\n[bold green]Vector indexing completed successfully![/bold green]")
except Exception as e:
console.print(f"\n[bold red]Vector indexing failed: {e}[/bold red]")
if args.verbose:
traceback.print_exc()
return 1
except KeyboardInterrupt:
console.print("\n[yellow]Indexing interrupted by user (Ctrl+C)[/yellow]")
return 130
return 0
if __name__ == "__main__":
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
console = Console()
console.print("\n[yellow]Process interrupted by user (Ctrl+C)[/yellow]")
sys.exit(130)
except Exception as e:
console = Console()
console.print(f"\n[bold red]Fatal error: {e}[/bold red]")
traceback.print_exc()
sys.exit(1)