| |
| """Build index by processing raw markdown files into semantic chunks with metadata.""" |
|
|
import argparse
import json
import sys
import time
import traceback
from pathlib import Path
from typing import Dict, List

from tqdm import tqdm
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| from src.processing.chunker import SemanticChunker, ChunkNode |
| from src.processing.metadata_extractor import MetadataExtractor |
| from src.vectorstore.qdrant_store import QdrantStoreManager |
| from src.llm.sentence_transformer_client import SentenceTransformerClient |
| from config.settings import settings |
| from rich.console import Console |
| from rich.panel import Panel |
| from rich.table import Table |
|
|
|
|
def parse_args():
    """Build the CLI parser and parse command line arguments."""
    arg_parser = argparse.ArgumentParser(
        description="Process raw EyeWiki markdown into semantic chunks with medical metadata",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Just process files (no vector indexing)
  python scripts/build_index.py

  # Process AND build vector index
  python scripts/build_index.py --index-vectors

  # Only build vector index from existing processed files
  python scripts/build_index.py --index-only

  # Process with custom directories
  python scripts/build_index.py --input-dir ./my_raw --output-dir ./my_processed

  # Force rebuild with fresh Qdrant collection
  python scripts/build_index.py --rebuild --index-vectors --recreate-collection

  # Process only files matching pattern
  python scripts/build_index.py --pattern "Glaucoma*.md" --index-vectors

  # Custom chunking and embedding parameters
  python scripts/build_index.py --chunk-size 1024 --embedding-batch-size 64 --index-vectors
        """,
    )

    # Directory overrides; None means "use the configured default".
    arg_parser.add_argument(
        "--input-dir",
        type=str,
        default=None,
        help=f"Input directory with raw markdown files (default: {settings.data_raw_path})",
    )
    arg_parser.add_argument(
        "--output-dir",
        type=str,
        default=None,
        help=f"Output directory for processed chunks (default: {settings.data_processed_path})",
    )

    arg_parser.add_argument(
        "--rebuild",
        action="store_true",
        help="Force rebuild even if output files exist",
    )
    arg_parser.add_argument(
        "--pattern",
        type=str,
        default="*.md",
        help="Glob pattern for files to process (default: *.md)",
    )

    # The three chunking knobs share the same shape: optional int whose
    # effective default comes from settings at processing time.
    for flag, label, fallback in (
        ("--chunk-size", "Chunk size in tokens", settings.chunk_size),
        ("--chunk-overlap", "Chunk overlap in tokens", settings.chunk_overlap),
        ("--min-chunk-size", "Minimum chunk size in tokens", settings.min_chunk_size),
    ):
        arg_parser.add_argument(
            flag,
            type=int,
            default=None,
            help=f"{label} (default: {fallback})",
        )

    arg_parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable verbose output with detailed error messages",
    )
    arg_parser.add_argument(
        "--index-vectors",
        action="store_true",
        help="Build vector index in Qdrant after processing",
    )
    arg_parser.add_argument(
        "--index-only",
        action="store_true",
        help="Skip processing, only build vector index from existing processed files",
    )
    arg_parser.add_argument(
        "--recreate-collection",
        action="store_true",
        help="Recreate Qdrant collection (deletes existing data)",
    )
    arg_parser.add_argument(
        "--embedding-batch-size",
        type=int,
        default=32,
        help="Batch size for embedding generation (default: 32)",
    )
    arg_parser.add_argument(
        "--embedding-model",
        type=str,
        default="sentence-transformers/all-mpnet-base-v2",
        help="Sentence transformer model name (default: all-mpnet-base-v2)",
    )

    return arg_parser.parse_args()
|
|
|
|
def print_banner(console: Console):
    """Print the welcome banner panel.

    Args:
        console: Rich console used for output.
    """
    # The pipeline arrows were previously mojibake ("�") from a bad encoding;
    # restored here as "→".
    banner = """
    [bold cyan]EyeWiki Index Builder[/bold cyan]
    [dim]Processing pipeline: Markdown → Metadata Extraction → Semantic Chunking → JSON[/dim]
    """
    console.print(Panel(banner, border_style="cyan"))
|
|
|
|
def load_markdown_file(md_file: Path) -> tuple[str, Dict]:
    """
    Load markdown content plus its sidecar JSON metadata file.

    Args:
        md_file: Path to markdown file

    Returns:
        Tuple of (content, metadata)

    Raises:
        FileNotFoundError: If JSON metadata file not found
        ValueError: If content is empty or metadata is invalid
    """
    content = md_file.read_text(encoding="utf-8")
    if not content.strip():
        raise ValueError("Empty markdown content")

    # Metadata lives next to the markdown file with a .json suffix.
    json_file = md_file.with_suffix(".json")
    if not json_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {json_file}")

    metadata = json.loads(json_file.read_text(encoding="utf-8"))
    if not isinstance(metadata, dict):
        raise ValueError("Invalid metadata format (must be dict)")

    return content, metadata
|
|
|
|
def process_file(
    md_file: Path,
    output_dir: Path,
    chunker: SemanticChunker,
    extractor: MetadataExtractor,
    rebuild: bool = False,
    verbose: bool = False,
) -> Dict:
    """
    Run one markdown file through the load -> extract -> chunk -> save pipeline.

    Args:
        md_file: Markdown file to process.
        output_dir: Directory that receives "<stem>_chunks.json".
        chunker: SemanticChunker used to split the document.
        extractor: MetadataExtractor that enriches the raw metadata.
        rebuild: Overwrite an existing output file when True.
        verbose: Attach a formatted traceback to error results when True.

    Returns:
        Result dict with keys: file, status, chunks_created, total_tokens,
        error (plus "traceback" on verbose errors).
    """
    result = {
        "file": md_file.name,
        "status": "pending",
        "chunks_created": 0,
        "total_tokens": 0,
        "error": None,
    }

    output_file = output_dir / f"{md_file.stem}_chunks.json"

    # Idempotency guard: never clobber existing output unless asked to.
    if output_file.exists() and not rebuild:
        result["status"] = "skipped"
        result["error"] = "Output already exists (use --rebuild to force)"
        return result

    try:
        content, metadata = load_markdown_file(md_file)
        enhanced_metadata = extractor.extract(content, metadata)
        chunks = chunker.chunk_document(content, enhanced_metadata)

        if not chunks:
            result["status"] = "skipped"
            result["error"] = "No chunks created (content too small or filtered)"
            return result

        output_dir.mkdir(parents=True, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump([c.to_dict() for c in chunks], f, indent=2, ensure_ascii=False)

        result["status"] = "success"
        result["chunks_created"] = len(chunks)
        result["total_tokens"] = sum(c.token_count for c in chunks)

    except Exception as exc:
        # Classify after the fact rather than via three separate except arms;
        # the error-message prefixes match the original handlers exactly.
        if isinstance(exc, FileNotFoundError):
            label = "File not found"
        elif isinstance(exc, ValueError):
            label = "Invalid data"
        else:
            label = "Unexpected error"
        result["status"] = "error"
        result["error"] = f"{label}: {exc}"
        if verbose:
            result["traceback"] = traceback.format_exc()

    return result
|
|
|
|
def print_statistics(results: List[Dict], console: Console):
    """
    Summarize per-file processing results as a table, then list error
    details (capped at 10) and skipped files (only when five or fewer).

    Args:
        results: List of processing results
        console: Rich console for output
    """
    statuses = [r["status"] for r in results]
    total_files = len(results)
    successful = statuses.count("success")
    skipped = statuses.count("skipped")
    errors = statuses.count("error")

    total_chunks = sum(r["chunks_created"] for r in results)
    total_tokens = sum(r["total_tokens"] for r in results)

    # Guard each average against an empty denominator.
    avg_chunks = total_chunks / successful if successful > 0 else 0
    avg_tokens_per_chunk = total_tokens / total_chunks if total_chunks > 0 else 0
    avg_tokens_per_doc = total_tokens / successful if successful > 0 else 0

    table = Table(title="Processing Statistics", border_style="green")
    table.add_column("Metric", style="cyan", justify="left")
    table.add_column("Value", style="white", justify="right")

    for metric, value in (
        ("Total Files", f"{total_files:,}"),
        ("Successfully Processed", f"{successful:,}"),
        ("Skipped", f"{skipped:,}"),
        ("Errors", f"{errors:,}"),
        ("", ""),
        ("Total Chunks Created", f"{total_chunks:,}"),
        ("Total Tokens", f"{total_tokens:,}"),
        ("", ""),
        ("Avg Chunks per Document", f"{avg_chunks:.1f}"),
        ("Avg Tokens per Chunk", f"{avg_tokens_per_chunk:.1f}"),
        ("Avg Tokens per Document", f"{avg_tokens_per_doc:.1f}"),
    ):
        table.add_row(metric, value)

    console.print("\n")
    console.print(table)

    # Show at most the first ten failures in detail.
    failed = [r for r in results if r["status"] == "error"]
    if failed:
        console.print("\n[yellow]Error Details:[/yellow]")
        for i, result in enumerate(failed[:10], 1):
            console.print(f"  {i}. [red]{result['file']}[/red]")
            console.print(f"     [dim]{result['error']}[/dim]")
            if "traceback" in result:
                console.print(f"     [dim]{result['traceback']}[/dim]")
        if len(failed) > 10:
            console.print(f"  [dim]... and {len(failed) - 10} more errors[/dim]")

    # Only enumerate skips when the list is short enough to be useful.
    skip_results = [r for r in results if r["status"] == "skipped"]
    if skip_results and len(skip_results) <= 5:
        console.print("\n[yellow]Skipped Files:[/yellow]")
        for i, result in enumerate(skip_results, 1):
            console.print(f"  {i}. {result['file']}: {result['error']}")
|
|
|
|
def load_processed_chunks(processed_dir: Path, console: Console) -> List[ChunkNode]:
    """
    Deserialize every "*_chunks.json" file in a directory into ChunkNode objects.

    Args:
        processed_dir: Directory containing processed chunk JSON files
        console: Rich console for output

    Returns:
        Flat list of ChunkNode objects (empty when nothing is found)
    """
    chunk_files = list(processed_dir.glob("*_chunks.json"))
    if not chunk_files:
        console.print(f"[yellow]No processed chunk files found in {processed_dir}[/yellow]")
        return []

    all_chunks: List[ChunkNode] = []
    console.print(f"\n[cyan]Loading processed chunks from {len(chunk_files)} files...[/cyan]")

    with tqdm(chunk_files, desc="Loading chunks", unit="file") as pbar:
        for chunk_file in pbar:
            try:
                raw = json.loads(chunk_file.read_text(encoding="utf-8"))
                all_chunks.extend(ChunkNode.from_dict(entry) for entry in raw)
                pbar.set_postfix({"total_chunks": len(all_chunks)})
            except Exception as e:
                # One corrupt file must not abort the whole load.
                console.print(f"[red]Error loading {chunk_file.name}: {e}[/red]")

    console.print(f"[green]✓[/green] Loaded {len(all_chunks):,} chunks")
    return all_chunks
|
|
|
|
def build_vector_index(
    chunks: List[ChunkNode],
    embedding_client: SentenceTransformerClient,
    qdrant_manager: QdrantStoreManager,
    batch_size: int,
    console: Console,
) -> Dict:
    """
    Build vector index by generating embeddings and inserting into Qdrant.

    Args:
        chunks: List of ChunkNode objects
        embedding_client: SentenceTransformerClient for stable embeddings
        qdrant_manager: QdrantStoreManager for vector storage
        batch_size: Batch size for embedding generation
        console: Rich console for output

    Returns:
        Dictionary with indexing statistics (chunks_indexed, time_taken,
        chunks_per_second, collection_info)

    Raises:
        Exception: Re-raised if embedding generation or Qdrant insertion fails.
    """
    if not chunks:
        console.print("[yellow]No chunks to index[/yellow]")
        return {"chunks_indexed": 0, "time_taken": 0}

    # (No placeholders here, so no f-string needed.)
    console.print("\n[bold cyan]Building Vector Index[/bold cyan]")
    console.print(f"Chunks to index: {len(chunks):,}")
    console.print(f"Embedding batch size: {batch_size}")

    # `time` is imported at module level (was a function-local import).
    start_time = time.time()

    texts = [chunk.content for chunk in chunks]

    console.print("\n[cyan]Generating embeddings...[/cyan]")
    try:
        embeddings = embedding_client.embed_batch(
            texts=texts,
            batch_size=batch_size,
            show_progress=True,
        )
    except Exception as e:
        # Surface the failure but let the caller decide how to handle it.
        console.print(f"[red]Failed to generate embeddings: {e}[/red]")
        raise

    console.print("\n[cyan]Inserting into Qdrant...[/cyan]")
    try:
        num_added = qdrant_manager.add_documents(
            chunks=chunks,
            dense_embeddings=embeddings,
        )
    except Exception as e:
        console.print(f"[red]Failed to insert into Qdrant: {e}[/red]")
        raise

    elapsed_time = time.time() - start_time

    # Collection info is best-effort; indexing already succeeded at this point.
    try:
        collection_info = qdrant_manager.get_collection_info()
    except Exception as e:
        console.print(f"[yellow]Could not get collection info: {e}[/yellow]")
        collection_info = {}

    return {
        "chunks_indexed": num_added,
        "time_taken": elapsed_time,
        "chunks_per_second": num_added / elapsed_time if elapsed_time > 0 else 0,
        "collection_info": collection_info,
    }
|
|
|
|
def print_index_statistics(stats: Dict, console: Console):
    """
    Display vector-indexing metrics (and optional collection info) as a table.

    Args:
        stats: Statistics dictionary
        console: Rich console for output
    """
    table = Table(title="Vector Index Statistics", border_style="green")
    table.add_column("Metric", style="cyan", justify="left")
    table.add_column("Value", style="white", justify="right")

    rows = [
        ("Chunks Indexed", f"{stats['chunks_indexed']:,}"),
        ("Time Taken", f"{stats['time_taken']:.1f}s"),
        ("Chunks/Second", f"{stats['chunks_per_second']:.1f}"),
    ]

    # Only present collection details when they were actually retrieved.
    info = stats.get("collection_info")
    if info:
        rows += [
            ("", ""),
            ("Collection Name", info.get("name", "N/A")),
            ("Total Vectors", f"{info.get('vectors_count', 0):,}"),
            ("Total Points", f"{info.get('points_count', 0):,}"),
            ("Status", info.get("status", "N/A")),
        ]

    for metric, value in rows:
        table.add_row(metric, value)

    console.print("\n")
    console.print(table)
|
|
|
|
def main():
    """CLI entry point: process markdown files and/or build the vector index.

    Returns:
        Process exit code: 0 on success, 1 on failure, 130 on interrupt.
    """
    args = parse_args()
    console = Console()

    print_banner(console)

    # Resolve directories, falling back to configured defaults.
    input_dir = Path(args.input_dir) if args.input_dir else Path(settings.data_raw_path)
    output_dir = Path(args.output_dir) if args.output_dir else Path(settings.data_processed_path)

    index_only = args.index_only
    should_index = args.index_vectors or args.index_only

    if index_only:
        console.print("[cyan]Mode:[/cyan] Index only (skip processing)")
    elif should_index:
        console.print("[cyan]Mode:[/cyan] Process and build vector index")
    else:
        console.print("[cyan]Mode:[/cyan] Process only (no vector indexing)")

    # Validate whichever directory the selected mode depends on.
    if not index_only and not input_dir.exists():
        console.print(f"[bold red]Error: Input directory does not exist: {input_dir}[/bold red]")
        return 1

    if index_only and not output_dir.exists():
        console.print(f"[bold red]Error: Output directory does not exist: {output_dir}[/bold red]")
        console.print("[yellow]Please run processing first without --index-only[/yellow]")
        return 1

    if not index_only:
        md_files = list(input_dir.glob(args.pattern))
        if not md_files:
            console.print(f"[yellow]No files matching pattern '{args.pattern}' found in {input_dir}[/yellow]")
            return 0

        console.print(f"[cyan]Input directory:[/cyan] {input_dir}")
        console.print(f"[cyan]Output directory:[/cyan] {output_dir}")
        console.print(f"[cyan]Files found:[/cyan] {len(md_files)}")
        console.print(f"[cyan]Pattern:[/cyan] {args.pattern}")
        console.print(f"[cyan]Rebuild mode:[/cyan] {'Yes' if args.rebuild else 'No'}")
    else:
        console.print(f"[cyan]Processed directory:[/cyan] {output_dir}")

    results = []

    if not index_only:
        # CLI values win; otherwise fall back to configured chunking defaults.
        chunker = SemanticChunker(
            chunk_size=args.chunk_size if args.chunk_size is not None else settings.chunk_size,
            chunk_overlap=args.chunk_overlap if args.chunk_overlap is not None else settings.chunk_overlap,
            min_chunk_size=args.min_chunk_size if args.min_chunk_size is not None else settings.min_chunk_size,
        )
        extractor = MetadataExtractor()

        console.print(f"[cyan]Chunk size:[/cyan] {chunker.chunk_size} tokens")
        console.print(f"[cyan]Chunk overlap:[/cyan] {chunker.chunk_overlap} tokens")
        console.print(f"[cyan]Min chunk size:[/cyan] {chunker.min_chunk_size} tokens")
        console.print()

        console.print("[bold cyan]Processing Files...[/bold cyan]\n")

        with tqdm(
            total=len(md_files),
            desc="Processing",
            unit="file",
            ncols=100,
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]",
        ) as pbar:
            for md_file in md_files:
                pbar.set_description(f"Processing {md_file.name[:30]:30}")

                results.append(
                    process_file(
                        md_file=md_file,
                        output_dir=output_dir,
                        chunker=chunker,
                        extractor=extractor,
                        rebuild=args.rebuild,
                        verbose=args.verbose,
                    )
                )

                # Live counters in the progress bar postfix.
                pbar.set_postfix({
                    "success": sum(1 for r in results if r["status"] == "success"),
                    "chunks": sum(r["chunks_created"] for r in results),
                })
                pbar.update(1)

        print_statistics(results, console)

        successful = sum(1 for r in results if r["status"] == "success")
        errors = sum(1 for r in results if r["status"] == "error")

        console.print()
        if errors == 0 and successful > 0:
            console.print("[bold green]Processing completed successfully![/bold green]")
            console.print(f"[green]Processed files saved to: {output_dir}[/green]")
        elif successful > 0:
            console.print("[bold yellow]Processing completed with some errors.[/bold yellow]")
            console.print(f"[yellow]Processed files saved to: {output_dir}[/yellow]")
        else:
            console.print("[bold red]Processing failed - no files were processed successfully.[/bold red]")
            # A total failure is fatal only when no indexing stage follows.
            if not should_index:
                return 1

    if should_index:
        try:
            console.print("\n[bold cyan]Initializing Sentence Transformers Client...[/bold cyan]")
            try:
                embedding_client = SentenceTransformerClient(model_name=args.embedding_model)
                model_info = embedding_client.get_model_info()
                console.print(f"[green]✓[/green] Loaded model: {model_info['model_name']}")
                console.print(f"[green]✓[/green] Device: {model_info['device']}")
                console.print(f"[green]✓[/green] Embedding dimension: {model_info['embedding_dim']}")
            except Exception as e:
                console.print(f"[bold red]Failed to initialize Sentence Transformers: {e}[/bold red]")
                console.print("[yellow]Install sentence-transformers: pip install sentence-transformers torch[/yellow]")
                return 1

            console.print("\n[bold cyan]Initializing Qdrant Store...[/bold cyan]")
            try:
                qdrant_manager = QdrantStoreManager(
                    url=settings.qdrant_url,
                    api_key=settings.qdrant_api_key,
                )
                qdrant_manager.initialize_collection(recreate=args.recreate_collection)
            except Exception as e:
                console.print(f"[bold red]Failed to initialize Qdrant: {e}[/bold red]")
                return 1

            chunks = load_processed_chunks(output_dir, console)
            if not chunks:
                console.print("[yellow]No chunks to index. Please process documents first.[/yellow]")
                return 0

            try:
                index_stats = build_vector_index(
                    chunks=chunks,
                    embedding_client=embedding_client,
                    qdrant_manager=qdrant_manager,
                    batch_size=args.embedding_batch_size,
                    console=console,
                )
                print_index_statistics(index_stats, console)
                console.print("\n[bold green]Vector indexing completed successfully![/bold green]")
            except Exception as e:
                console.print(f"\n[bold red]Vector indexing failed: {e}[/bold red]")
                if args.verbose:
                    traceback.print_exc()
                return 1

        except KeyboardInterrupt:
            console.print("\n[yellow]Indexing interrupted by user (Ctrl+C)[/yellow]")
            return 130

    return 0
|
|
|
|
if __name__ == "__main__":
    try:
        exit_code = main()
    except KeyboardInterrupt:
        # main() may return before installing its own handler; catch here too.
        Console().print("\n[yellow]Process interrupted by user (Ctrl+C)[/yellow]")
        sys.exit(130)
    except Exception as e:
        Console().print(f"\n[bold red]Fatal error: {e}[/bold red]")
        traceback.print_exc()
        sys.exit(1)
    else:
        sys.exit(exit_code)
|
|