"""
RAG CLI Commands

Commands:
    sparknet rag index <file>...       - Index document(s) for retrieval
    sparknet rag search <query>        - Search indexed documents
    sparknet rag ask <question>        - Answer question using RAG
    sparknet rag status                - Show index status
    sparknet rag delete <document_id>  - Delete a document from the index
"""
| |
|
| | import typer |
| | from typing import Optional, List |
| | from pathlib import Path |
| | import json |
| | import sys |
| |
|
| | |
# Typer sub-application for the `rag` command group; the command functions
# in this module register themselves on it through @rag_app.command(...).
rag_app = typer.Typer(name="rag", help="RAG and retrieval commands")
| |
|
| |
|
@rag_app.command("index")
def index_document(
    files: List[Path] = typer.Argument(..., help="Document file(s) to index"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    embedding_model: str = typer.Option("nomic-embed-text", "--model", "-m", help="Embedding model"),
):
    """
    Index document(s) for RAG retrieval.

    Example:
        sparknet rag index document.pdf
        sparknet rag index *.pdf --collection contracts
    """
    # The docstring above is rendered by Typer as the command's help text, so
    # implementation notes live in comments instead.
    #
    # Flow: drop paths that do not exist (warning on stderr for each), hand the
    # remaining paths to the indexer as strings, then print a per-document
    # summary. Exits with status 1 when no given path exists, when an import
    # fails, or when indexing raises.

    # Walk the whole argument list so every missing path is reported, not just
    # the first one.
    valid_files = []
    for f in files:
        if f.exists():
            valid_files.append(f)
        else:
            typer.echo(f"Warning: File not found, skipping: {f}", err=True)

    if not valid_files:
        typer.echo("Error: No valid files to index", err=True)
        raise typer.Exit(1)

    typer.echo(f"Indexing {len(valid_files)} document(s)...")

    try:
        # Imported lazily, inside the try, so a missing dependency is reported
        # through the ImportError handler below rather than as a traceback.
        from ..rag import (
            VectorStoreConfig,
            EmbeddingConfig,
            get_document_indexer,
        )

        # NOTE(review): these two configs are built from --collection and
        # --model but are never handed to get_document_indexer() below, so from
        # this block alone the two options do not appear to influence indexing.
        # Confirm how get_document_indexer expects to receive them before
        # wiring them in.
        store_config = VectorStoreConfig(collection_name=collection)
        embed_config = EmbeddingConfig(ollama_model=embedding_model)

        indexer = get_document_indexer()

        # The indexer is given plain string paths, one result per document.
        results = indexer.index_batch([str(f) for f in valid_files])

        successful = sum(1 for r in results if r.success)
        total_chunks = sum(r.num_chunks_indexed for r in results)

        typer.echo("\nIndexing complete:")
        typer.echo(f" Documents: {successful}/{len(results)} successful")
        typer.echo(f" Chunks indexed: {total_chunks}")

        for r in results:
            status = "✓" if r.success else "✗"
            typer.echo(f" [{status}] {r.source_path}: {r.num_chunks_indexed} chunks")
            if r.error:
                typer.echo(f" Error: {r.error}")

    except ImportError as e:
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        typer.echo(f"Error indexing documents: {e}", err=True)
        raise typer.Exit(1)
| |
|
| |
|
@rag_app.command("search")
def search_documents(
    query: str = typer.Argument(..., help="Search query"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of results"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    chunk_type: Optional[str] = typer.Option(None, "--type", "-t", help="Filter by chunk type"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
):
    """
    Search indexed documents.

    Example:
        sparknet rag search "payment terms" --top 10
        sparknet rag search "table data" --type table
    """
    # The docstring above is rendered by Typer as the command's help text, so
    # implementation notes live in comments instead.
    #
    # Flow: build a retriever, apply the optional document/chunk-type filters,
    # then either write the hits to the --output JSON file or print them.
    # Any exception is reported on stderr and turns into exit status 1.
    #
    # NOTE(review): `collection` is accepted but never read in this function,
    # so --collection does not appear to affect which index is searched.
    typer.echo(f"Searching: {query}")

    try:
        from ..rag import get_document_retriever, RetrieverConfig

        config = RetrieverConfig(default_top_k=top_k)
        retriever = get_document_retriever(config)

        # Only the filters the user actually supplied are forwarded; with none
        # supplied the retriever receives None rather than an empty dict.
        filters = {}
        if document_id:
            filters["document_id"] = document_id
        if chunk_type:
            filters["chunk_type"] = chunk_type

        chunks = retriever.retrieve(query, top_k=top_k, filters=filters if filters else None)

        if not chunks:
            # Nothing is written to --output in this case.
            typer.echo("No results found.")
            return

        output_data = {
            "query": query,
            "num_results": len(chunks),
            "results": [
                {
                    "chunk_id": c.chunk_id,
                    "document_id": c.document_id,
                    "page": c.page,
                    "chunk_type": c.chunk_type,
                    "similarity": c.similarity,
                    # Text is capped at 500 characters; the ellipsis marks a cut.
                    "text": c.text[:500] + "..." if len(c.text) > 500 else c.text,
                }
                for c in chunks
            ],
        }

        if output:
            # Explicit encoding so the file does not depend on the locale.
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nFound {len(chunks)} results:\n")
            for i, c in enumerate(chunks, 1):
                typer.echo(f"[{i}] Similarity: {c.similarity:.3f}")
                if c.page is not None:
                    # c.page is shown with +1 applied.
                    typer.echo(f" Page: {c.page + 1}, Type: {c.chunk_type or 'text'}")
                # Append the ellipsis only when the preview really was cut,
                # matching how the JSON output marks truncation.
                preview = c.text[:200] + "..." if len(c.text) > 200 else c.text
                typer.echo(f" {preview}")
                typer.echo()

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        typer.echo(f"Error searching: {e}", err=True)
        raise typer.Exit(1)
| |
|
| |
|
@rag_app.command("ask")
def ask_question(
    question: str = typer.Argument(..., help="Question to answer"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of context chunks"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
    show_evidence: bool = typer.Option(True, "--evidence/--no-evidence", help="Show evidence sources"),
):
    """
    Answer a question using RAG.

    Example:
        sparknet rag ask "What are the payment terms?"
        sparknet rag ask "What is the contract value?" --document contract123
    """
    # The docstring above is rendered by Typer as the command's help text, so
    # implementation notes live in comments instead.
    #
    # Flow: build a generator, ask it the question (optionally restricted to
    # one document), then either write the answer and citations to the
    # --output JSON file or print them. Any exception is reported on stderr
    # and turns into exit status 1.
    #
    # NOTE(review): `collection` is accepted but never read in this function,
    # so --collection does not appear to affect which index is queried.
    typer.echo(f"Question: {question}")
    typer.echo("Processing...")

    try:
        from ..rag import get_grounded_generator, GeneratorConfig

        config = GeneratorConfig()
        generator = get_grounded_generator(config)

        # No filter at all (None) unless a document ID was given.
        filters = {"document_id": document_id} if document_id else None

        result = generator.answer_question(question, top_k=top_k, filters=filters)

        output_data = {
            "question": question,
            "answer": result.answer,
            "confidence": result.confidence,
            "abstained": result.abstained,
            "abstain_reason": result.abstain_reason,
            "citations": [
                {
                    "index": c.index,
                    "page": c.page,
                    "snippet": c.text_snippet,
                    "confidence": c.confidence,
                }
                for c in result.citations
            ],
            "num_chunks_used": result.num_chunks_used,
        }

        if output:
            # Explicit encoding so the file does not depend on the locale.
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nAnswer: {result.answer}")
            typer.echo(f"\nConfidence: {result.confidence:.2f}")

            if result.abstained:
                typer.echo(f"Note: {result.abstain_reason}")

            if show_evidence and result.citations:
                typer.echo(f"\nSources ({len(result.citations)}):")
                for c in result.citations:
                    # Mark the snippet with an ellipsis only when it was cut.
                    snippet = c.text_snippet
                    if len(snippet) > 80:
                        snippet = snippet[:80] + "..."
                    if c.page is not None:
                        # c.page is shown with +1 applied.
                        typer.echo(f" [{c.index}] Page {c.page + 1}: {snippet}")
                    else:
                        # No page known: omit the "Page N:" label entirely
                        # instead of printing a dangling ": " separator.
                        typer.echo(f" [{c.index}] {snippet}")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        typer.echo(f"Error generating answer: {e}", err=True)
        raise typer.Exit(1)
| |
|
| |
|
@rag_app.command("status")
def show_status(
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
):
    """
    Show RAG index status.

    Example:
        sparknet rag status
        sparknet rag status --collection contracts
    """
    # Prints the collection name and its total chunk count and, when the store
    # object exposes `list_documents`, the number of documents plus a per-
    # document chunk count for at most the first ten. Any exception is reported
    # on stderr and turns into exit status 1.
    typer.echo("RAG Index Status")
    typer.echo("=" * 40)

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        store = get_vector_store(VectorStoreConfig(collection_name=collection))

        # Count first, print second: nothing about the collection is shown if
        # the count itself fails.
        chunk_total = store.count()
        typer.echo(f"Collection: {collection}")
        typer.echo(f"Total chunks: {chunk_total}")

        # Per-document details are optional; skip them for stores without
        # a list_documents attribute.
        if not hasattr(store, 'list_documents'):
            return

        documents = store.list_documents()
        typer.echo(f"Documents indexed: {len(documents)}")
        if not documents:
            return

        typer.echo("\nDocuments:")
        shown_limit = 10
        for name in documents[:shown_limit]:
            per_document = store.count(name)
            typer.echo(f" - {name}: {per_document} chunks")

        hidden = len(documents) - shown_limit
        if hidden > 0:
            typer.echo(f" ... and {hidden} more")

    except Exception as exc:
        # Top-level CLI boundary: report the failure and exit non-zero.
        typer.echo(f"Error getting status: {exc}", err=True)
        raise typer.Exit(1)
| |
|
| |
|
@rag_app.command("delete")
def delete_document(
    document_id: str = typer.Argument(..., help="Document ID to delete"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation"),
):
    """
    Delete a document from the index.

    Example:
        sparknet rag delete doc123
        sparknet rag delete doc123 --force
    """
    # Asks for confirmation unless --force is given, then removes the document
    # from the store for the chosen collection and prints the value returned
    # by delete_document as the number of chunks deleted. Any exception during
    # the deletion is reported on stderr and turns into exit status 1.

    # The prompt is only shown when --force is absent (short-circuit `and`).
    if not force and not typer.confirm(f"Delete document '{document_id}' from index?"):
        typer.echo("Cancelled.")
        return

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        store = get_vector_store(VectorStoreConfig(collection_name=collection))
        removed = store.delete_document(document_id)
        typer.echo(f"Deleted {removed} chunks for document: {document_id}")

    except Exception as exc:
        # Top-level CLI boundary: report the failure and exit non-zero.
        typer.echo(f"Error deleting document: {exc}", err=True)
        raise typer.Exit(1)
| |
|