| | """ |
| | Document Intelligence CLI Commands |
| | |
| | CLI interface for the document_intelligence subsystem. |
| | """ |
| |
|
| | import json |
| | import sys |
| | from pathlib import Path |
| | from typing import List, Optional |
| |
|
| | import click |
| |
|
| |
|
# Root command group; every subcommand in this module hangs off it.
@click.group(name="docint")
def docint_cli():
    """Document Intelligence commands."""
| |
|
| |
|
@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(),
              help="Output file (default: print to stdout)")
@click.option("--max-pages", type=int, help="Maximum pages to process")
@click.option("--dpi", type=int, default=200, help="Render DPI (default: 200)")
@click.option("--format", "output_format", type=click.Choice(["json", "markdown", "text"]),
              default="json", help="Output format")
def parse(path: str, output: Optional[str], max_pages: Optional[int],
          dpi: int, output_format: str):
    """
    Parse a document into semantic chunks.

    \b
    Example:
        sparknet docint parse invoice.pdf -o result.json
        sparknet docint parse document.pdf --format markdown
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # Imported at call time rather than at module import time.
    from src.document_intelligence import (
        DocumentParser,
        ParserConfig,
    )

    config = ParserConfig(render_dpi=dpi, max_pages=max_pages)
    parser = DocumentParser(config=config)

    click.echo(f"Parsing: {path}")

    try:
        result = parser.parse(path)

        # Render the result once, then decide where it goes. This keeps the
        # --output handling identical for all three formats (previously the
        # "text" format ignored --output entirely).
        if output_format == "json":
            label = "Output"
            payload = {
                "doc_id": result.doc_id,
                "filename": result.filename,
                "num_pages": result.num_pages,
                "chunks": [
                    {
                        "chunk_id": c.chunk_id,
                        "type": c.chunk_type.value,
                        "text": c.text,
                        "page": c.page,
                        "bbox": c.bbox.xyxy,
                        "confidence": c.confidence,
                    }
                    for c in result.chunks
                ],
                "processing_time_ms": result.processing_time_ms,
            }
            rendered = json.dumps(payload, indent=2)
        elif output_format == "markdown":
            label = "Markdown"
            rendered = result.markdown_full
        else:
            label = "Text"
            # One "[Page N, type]" header, the chunk text, then a blank line.
            rendered = "".join(
                f"[Page {chunk.page}, {chunk.chunk_type.value}]\n{chunk.text}\n\n"
                for chunk in result.chunks
            )

        if output:
            # Explicit UTF-8 so non-ASCII document text does not depend on
            # the platform's default encoding.
            with open(output, "w", encoding="utf-8") as out_file:
                out_file.write(rendered)
            click.echo(f"{label} written to: {output}")
        else:
            # The text rendering already ends with its own blank line.
            click.echo(rendered, nl=(output_format != "text"))

        click.echo(f"\nParsed {len(result.chunks)} chunks in {result.processing_time_ms:.0f}ms")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--field", "-f", multiple=True, help="Field to extract (can specify multiple)")
@click.option("--schema", "-s", type=click.Path(exists=True), help="JSON schema file")
@click.option("--preset", type=click.Choice(["invoice", "receipt", "contract"]),
              help="Use preset schema")
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
def extract(path: str, field: tuple, schema: Optional[str], preset: Optional[str],
            output: Optional[str]):
    """
    Extract fields from a document.

    \b
    Example:
        sparknet docint extract invoice.pdf --preset invoice
        sparknet docint extract doc.pdf -f vendor_name -f total_amount
        sparknet docint extract doc.pdf --schema my_schema.json
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # Imported at call time rather than at module import time.
    from src.document_intelligence import (
        DocumentParser,
        FieldExtractor,
        ExtractionSchema,
        create_invoice_schema,
        create_receipt_schema,
        create_contract_schema,
    )

    # At least one schema source is required.
    if not (preset or schema or field):
        click.echo("Error: Specify --field, --schema, or --preset", err=True)
        sys.exit(1)

    preset_factories = {
        "invoice": create_invoice_schema,
        "receipt": create_receipt_schema,
        "contract": create_contract_schema,
    }

    try:
        # Precedence when several sources are given: --preset, then
        # --schema, then --field. Building the schema inside the try block
        # means an unreadable or malformed schema file is reported as a
        # normal CLI error instead of a traceback.
        if preset:
            extraction_schema = preset_factories[preset]()
        elif schema:
            with open(schema, encoding="utf-8") as schema_file:
                schema_dict = json.load(schema_file)
            extraction_schema = ExtractionSchema.from_json_schema(schema_dict)
        else:
            extraction_schema = ExtractionSchema(name="custom")
            for field_name in field:
                extraction_schema.add_string_field(field_name, required=True)

        click.echo(f"Extracting from: {path}")
        click.echo(f"Fields: {', '.join(spec.name for spec in extraction_schema.fields)}")

        parser = DocumentParser()
        parse_result = parser.parse(path)

        extractor = FieldExtractor()
        result = extractor.extract(parse_result, extraction_schema)

        output_data = {
            "doc_id": parse_result.doc_id,
            "filename": parse_result.filename,
            "extracted_data": result.data,
            "confidence": result.overall_confidence,
            "abstained_fields": result.abstained_fields,
            "evidence": [
                {
                    "chunk_id": e.chunk_id,
                    "page": e.page,
                    "bbox": e.bbox.xyxy,
                    "snippet": e.snippet,
                }
                for e in result.evidence
            ],
        }

        if output:
            with open(output, "w", encoding="utf-8") as out_file:
                json.dump(output_data, out_file, indent=2)
            click.echo(f"Output written to: {output}")
        else:
            # Human-readable summary; the evidence list is only written to
            # the JSON file.
            click.echo("\nExtracted Data:")
            for key, value in result.data.items():
                status = " [ABSTAINED]" if key in result.abstained_fields else ""
                click.echo(f"  {key}: {value}{status}")

            click.echo(f"\nConfidence: {result.overall_confidence:.2f}")

            if result.abstained_fields:
                click.echo(f"Abstained: {', '.join(result.abstained_fields)}")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.argument("question")
@click.option("--verbose", "-v", is_flag=True, help="Show evidence details")
@click.option("--use-rag", is_flag=True, help="Use RAG for retrieval (requires indexed document)")
@click.option("--document-id", "-d", help="Document ID for RAG retrieval")
@click.option("--top-k", "-k", type=int, default=5, help="Number of chunks to consider")
@click.option("--chunk-type", "-t", multiple=True, help="Filter by chunk type (can specify multiple)")
@click.option("--page-start", type=int, help="Filter by page range start")
@click.option("--page-end", type=int, help="Filter by page range end")
def ask(path: str, question: str, verbose: bool, use_rag: bool,
        document_id: Optional[str], top_k: int, chunk_type: tuple,
        page_start: Optional[int], page_end: Optional[int]):
    """
    Ask a question about a document.

    \b
    Example:
        sparknet docint ask invoice.pdf "What is the total amount?"
        sparknet docint ask doc.pdf "Find claims" --use-rag --top-k 10
        sparknet docint ask doc.pdf "What tables show?" -t table --use-rag
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # Imported at call time rather than at module import time.
    from src.document_intelligence import DocumentParser

    click.echo(f"Document: {path}")
    click.echo(f"Question: {question}")
    click.echo("Mode: RAG (semantic retrieval)" if use_rag else "Mode: Keyword search")
    click.echo()

    try:
        if use_rag:
            from src.document_intelligence.tools import get_rag_tool

            tool = get_rag_tool("rag_answer")

            # The page filter is only applied when both bounds are given;
            # tell the user when a lone bound is being dropped.
            page_range = None
            if page_start is not None and page_end is not None:
                page_range = (page_start, page_end)
            elif page_start is not None or page_end is not None:
                click.echo(
                    "Warning: --page-start and --page-end must be given together; "
                    "page filter ignored.",
                    err=True,
                )

            result = tool.execute(
                question=question,
                document_id=document_id,
                top_k=top_k,
                chunk_types=list(chunk_type) if chunk_type else None,
                page_range=page_range,
            )
        else:
            from src.document_intelligence.tools import get_tool

            # These filters are only forwarded to the RAG tool; warn instead
            # of silently ignoring them in keyword mode.
            ignored = [
                flag
                for flag, given in (
                    ("--document-id", document_id is not None),
                    ("--chunk-type", bool(chunk_type)),
                    ("--page-start", page_start is not None),
                    ("--page-end", page_end is not None),
                )
                if given
            ]
            if ignored:
                click.echo(
                    f"Warning: {', '.join(ignored)} only apply with --use-rag; ignored.",
                    err=True,
                )

            parser = DocumentParser()
            parse_result = parser.parse(path)

            tool = get_tool("answer_question")
            result = tool.execute(
                parse_result=parse_result,
                question=question,
                top_k=top_k,
            )

        if not result.success:
            # A failed tool call must produce a non-zero exit status so that
            # scripts can detect it. SystemExit is not an Exception subclass,
            # so the handler below does not swallow it.
            click.echo(f"Error: {result.error}", err=True)
            sys.exit(1)

        data = result.data
        click.echo(f"Answer: {data.get('answer', 'No answer found')}")
        click.echo(f"Confidence: {data.get('confidence', 0):.2f}")

        if data.get('abstained'):
            click.echo("Note: The system abstained due to low confidence.")

        if verbose and result.evidence:
            click.echo("\nEvidence:")
            for ev in result.evidence:
                # "or ''" guards against an explicit None snippet.
                snippet = ev.get('snippet') or ''
                click.echo(f"  - Page {ev.get('page', '?')}: {snippet[:100]}...")

        if data.get('citations'):
            click.echo("\nCitations:")
            for cit in data['citations']:
                cit_text = cit.get('text') or ''
                click.echo(f"  [{cit['index']}] {cit_text[:80]}...")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
def classify(path: str, output: Optional[str]):
    """
    Classify a document's type.

    \b
    Example:
        sparknet docint classify document.pdf
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # Imported at call time rather than at module import time.
    from src.document_intelligence import DocumentParser

    click.echo(f"Classifying: {path}")

    try:
        parser = DocumentParser()
        parse_result = parser.parse(path)

        # Classification looks only at the first 200 characters of up to
        # five chunks from page 1, lower-cased for case-insensitive matching.
        first_page_chunks = [c for c in parse_result.chunks if c.page == 1][:5]
        content = " ".join(c.text[:200] for c in first_page_chunks).lower()

        # Fallback when no type reaches the match threshold.
        doc_type = "other"
        confidence = 0.5

        # NOTE: keywords are matched as plain substrings, so e.g. "form"
        # also matches inside "information".
        type_keywords = {
            "invoice": ["invoice", "bill", "payment due", "amount due", "invoice number"],
            "contract": ["agreement", "contract", "party", "whereas", "terms and conditions"],
            "receipt": ["receipt", "paid", "transaction", "thank you for your purchase"],
            "form": ["form", "fill in", "checkbox", "signature line"],
            "letter": ["dear", "sincerely", "regards", "to whom it may concern"],
            "report": ["report", "findings", "conclusion", "summary", "analysis"],
            "patent": ["patent", "claims", "invention", "embodiment", "disclosed"],
        }

        # Score every type and keep the best one, rather than stopping at
        # the first type that happens to reach the threshold. max() returns
        # the first maximal item, so ties still resolve in declaration order.
        scores = {
            dtype: sum(1 for keyword in keywords if keyword in content)
            for dtype, keywords in type_keywords.items()
        }
        best_type, best_matches = max(scores.items(), key=lambda item: item[1])
        if best_matches >= 2:
            doc_type = best_type
            confidence = min(0.95, 0.5 + best_matches * 0.15)

        output_data = {
            "doc_id": parse_result.doc_id,
            "filename": parse_result.filename,
            "document_type": doc_type,
            "confidence": confidence,
        }

        if output:
            with open(output, "w", encoding="utf-8") as out_file:
                json.dump(output_data, out_file, indent=2)
            click.echo(f"Output written to: {output}")
        else:
            click.echo(f"Type: {doc_type}")
            click.echo(f"Confidence: {confidence:.2f}")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--query", "-q", help="Search query")
@click.option("--type", "chunk_type", help="Filter by chunk type")
@click.option("--top", "-k", type=int, default=10, help="Number of results")
def search(path: str, query: Optional[str], chunk_type: Optional[str], top: int):
    """
    Search document content.

    \b
    Example:
        sparknet docint search document.pdf -q "payment terms"
        sparknet docint search document.pdf --type table
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    from collections import Counter

    # Imported at call time rather than at module import time.
    from src.document_intelligence import DocumentParser
    from src.document_intelligence.tools import get_tool

    click.echo(f"Searching: {path}")

    try:
        parser = DocumentParser()
        parse_result = parser.parse(path)

        if query:
            # Query given: delegate to the search tool, optionally
            # restricted to a single chunk type.
            tool = get_tool("search_chunks")
            result = tool.execute(
                parse_result=parse_result,
                query=query,
                chunk_types=[chunk_type] if chunk_type else None,
                top_k=top,
            )

            if not result.success:
                # A failed tool call must produce a non-zero exit status.
                # SystemExit is not an Exception subclass, so the handler
                # below does not swallow it.
                click.echo(f"Error: {result.error}", err=True)
                sys.exit(1)

            results = result.data.get("results", [])
            click.echo(f"Found {len(results)} results:\n")

            for i, r in enumerate(results, 1):
                click.echo(f"{i}. [Page {r['page']}, {r['type']}] (score: {r['score']:.2f})")
                click.echo(f"   {r['text'][:200]}...")
                click.echo()

        elif chunk_type:
            # No query: list chunks of the requested type. The count shown
            # is the total; only the first `top` are printed.
            matching = [c for c in parse_result.chunks if c.chunk_type.value == chunk_type]
            click.echo(f"Found {len(matching)} {chunk_type} chunks:\n")

            for i, chunk in enumerate(matching[:top], 1):
                click.echo(f"{i}. [Page {chunk.page}] {chunk.chunk_id}")
                click.echo(f"   {chunk.text[:200]}...")
                click.echo()

        else:
            # Neither query nor type: print a per-type chunk summary.
            click.echo(f"Total chunks: {len(parse_result.chunks)}\n")

            by_type = Counter(chunk.chunk_type.value for chunk in parse_result.chunks)

            click.echo("Chunk types:")
            for type_name, count in sorted(by_type.items()):
                click.echo(f"  {type_name}: {count}")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--page", "-p", type=int, default=1, help="Page number")
@click.option("--output-dir", "-d", type=click.Path(), default="./crops",
              help="Output directory for crops")
@click.option("--annotate", "-a", is_flag=True, help="Create annotated page image")
def visualize(path: str, page: int, output_dir: str, annotate: bool):
    """
    Visualize document regions.

    \b
    Example:
        sparknet docint visualize document.pdf --page 1 --annotate
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # Imported at call time rather than at module import time.
    from src.document_intelligence import (
        DocumentParser,
        load_document,
        RenderOptions,
    )
    from src.document_intelligence.grounding import create_annotated_image, CropManager
    from PIL import Image

    click.echo(f"Processing: {path}, page {page}")

    try:
        # Created inside the try block so a permission or path error is
        # reported as a normal CLI error instead of a traceback.
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        parser = DocumentParser()
        parse_result = parser.parse(path)

        # Render the requested page. The loader is closed even when
        # rendering fails, so it is never left open.
        loader, renderer = load_document(path)
        try:
            page_image = renderer.render_page(page, RenderOptions(dpi=200))
        finally:
            loader.close()

        page_chunks = [c for c in parse_result.chunks if c.page == page]

        if annotate:
            # One image of the whole page with every chunk's box drawn on
            # it, labelled with the chunk type (truncated to 10 characters).
            bboxes = [c.bbox for c in page_chunks]
            labels = [c.chunk_type.value[:10] for c in page_chunks]

            annotated = create_annotated_image(page_image, bboxes, labels)

            output_file = output_path / f"annotated_page_{page}.png"
            Image.fromarray(annotated).save(output_file)
            click.echo(f"Saved annotated image: {output_file}")

        else:
            # One cropped image per chunk on the page.
            crop_manager = CropManager(output_path)

            for chunk in page_chunks:
                crop_path = crop_manager.save_crop(
                    page_image,
                    parse_result.doc_id,
                    page,
                    chunk.bbox,
                )
                click.echo(f"Saved crop: {crop_path}")

        click.echo(f"\nProcessed {len(page_chunks)} chunks from page {page}")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command()
@click.argument("paths", nargs=-1, type=click.Path(exists=True), required=True)
@click.option("--max-pages", type=int, help="Maximum pages to process per document")
@click.option("--batch-size", type=int, default=32, help="Embedding batch size")
@click.option("--min-length", type=int, default=10, help="Minimum chunk text length")
def index(paths: tuple, max_pages: Optional[int], batch_size: int, min_length: int):
    """
    Index documents into the vector store for RAG.

    \b
    Example:
        sparknet docint index document.pdf
        sparknet docint index *.pdf --max-pages 50
        sparknet docint index doc1.pdf doc2.pdf doc3.pdf
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # NOTE(review): --batch-size and --min-length are accepted but are not
    # forwarded to the tool call below, so they currently have no effect.
    # Confirm whether the "index_document" tool accepts them before wiring
    # them through.

    # Imported at call time rather than at module import time.
    from src.document_intelligence.tools import get_rag_tool

    click.echo(f"Indexing {len(paths)} document(s)...")
    click.echo()

    try:
        tool = get_rag_tool("index_document")

        total_indexed = 0
        total_skipped = 0
        errors = []  # (path, error message) for every document that failed

        for path in paths:
            click.echo(f"Processing: {path}")

            # One document raising must not abort the rest of the batch;
            # record it like any other per-document failure and move on.
            try:
                result = tool.execute(
                    path=path,
                    max_pages=max_pages,
                )
            except Exception as exc:
                errors.append((path, str(exc)))
                click.echo(f"  Error: {exc}", err=True)
                continue

            if result.success:
                data = result.data
                indexed = data.get("chunks_indexed", 0)
                skipped = data.get("chunks_skipped", 0)
                total_indexed += indexed
                total_skipped += skipped
                click.echo(f"  Indexed: {indexed} chunks, Skipped: {skipped}")
                click.echo(f"  Document ID: {data.get('document_id', 'unknown')}")
            else:
                errors.append((path, result.error))
                click.echo(f"  Error: {result.error}", err=True)

        click.echo()
        click.echo("=" * 40)
        click.echo(f"Total documents: {len(paths)}")
        click.echo(f"Total chunks indexed: {total_indexed}")
        click.echo(f"Total chunks skipped: {total_skipped}")

        if errors:
            click.echo(f"Errors: {len(errors)}")
            for failed_path, message in errors:
                click.echo(f"  - {failed_path}: {message}")
            # Signal partial failure to the caller. SystemExit is not an
            # Exception subclass, so the handler below does not swallow it.
            sys.exit(1)

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command(name="index-stats")
def index_stats():
    """
    Show statistics about the vector store index.

    \b
    Example:
        sparknet docint index-stats
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # Imported at call time rather than at module import time.
    from src.document_intelligence.tools import get_rag_tool

    try:
        tool = get_rag_tool("get_index_stats")
        result = tool.execute()

        if not result.success:
            # A failed tool call must produce a non-zero exit status.
            # SystemExit is not an Exception subclass, so the handler below
            # does not swallow it.
            click.echo(f"Error: {result.error}", err=True)
            sys.exit(1)

        data = result.data
        click.echo("Vector Store Statistics:")
        click.echo(f"  Total chunks: {data.get('total_chunks', 0)}")
        click.echo(f"  Embedding model: {data.get('embedding_model', 'unknown')}")
        click.echo(f"  Embedding dimension: {data.get('embedding_dimension', 'unknown')}")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command(name="delete-index")
@click.argument("document_id")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompt")
def delete_index(document_id: str, yes: bool):
    """
    Delete a document from the vector store index.

    \b
    Example:
        sparknet docint delete-index doc_abc123
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # Imported at call time rather than at module import time.
    from src.document_intelligence.tools import get_rag_tool

    # Ask before deleting unless --yes was given. abort=True makes click
    # abort the command when the user declines.
    if not yes:
        click.confirm(f"Delete document '{document_id}' from index?", abort=True)

    try:
        tool = get_rag_tool("delete_document")
        result = tool.execute(document_id=document_id)

        if not result.success:
            # A failed tool call must produce a non-zero exit status.
            # SystemExit is not an Exception subclass, so the handler below
            # does not swallow it.
            click.echo(f"Error: {result.error}", err=True)
            sys.exit(1)

        data = result.data
        click.echo(f"Deleted {data.get('chunks_deleted', 0)} chunks for document: {document_id}")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
@docint_cli.command(name="retrieve")
@click.argument("query")
@click.option("--top-k", "-k", type=int, default=5, help="Number of results")
@click.option("--document-id", "-d", help="Filter by document ID")
@click.option("--chunk-type", "-t", multiple=True, help="Filter by chunk type")
@click.option("--page-start", type=int, help="Filter by page range start")
@click.option("--page-end", type=int, help="Filter by page range end")
@click.option("--verbose", "-v", is_flag=True, help="Show full chunk text")
def retrieve(query: str, top_k: int, document_id: Optional[str],
             chunk_type: tuple, page_start: Optional[int],
             page_end: Optional[int], verbose: bool):
    """
    Retrieve relevant chunks from the vector store.

    \b
    Example:
        sparknet docint retrieve "payment terms"
        sparknet docint retrieve "claims" -d doc_abc123 -t paragraph -k 10
    """
    # NOTE: the docstring above is the user-facing --help text. The lone
    # "\b" line tells click not to re-wrap the example paragraph.

    # Imported at call time rather than at module import time.
    from src.document_intelligence.tools import get_rag_tool

    click.echo(f"Query: {query}")
    click.echo()

    try:
        tool = get_rag_tool("retrieve_chunks")

        # The page filter is only applied when both bounds are given; tell
        # the user when a lone bound is being dropped.
        page_range = None
        if page_start is not None and page_end is not None:
            page_range = (page_start, page_end)
        elif page_start is not None or page_end is not None:
            click.echo(
                "Warning: --page-start and --page-end must be given together; "
                "page filter ignored.",
                err=True,
            )

        result = tool.execute(
            query=query,
            top_k=top_k,
            document_id=document_id,
            chunk_types=list(chunk_type) if chunk_type else None,
            page_range=page_range,
        )

        if not result.success:
            # A failed tool call must produce a non-zero exit status.
            # SystemExit is not an Exception subclass, so the handler below
            # does not swallow it.
            click.echo(f"Error: {result.error}", err=True)
            sys.exit(1)

        chunks = result.data.get("chunks", [])
        click.echo(f"Found {len(chunks)} results:\n")

        for i, chunk in enumerate(chunks, 1):
            click.echo(
                f"{i}. [sim={chunk['similarity']:.3f}] "
                f"Page {chunk.get('page', '?')}, {chunk.get('chunk_type', 'text')}"
            )
            click.echo(f"   Document: {chunk['document_id']}")

            # Full text with --verbose, otherwise a 150-character preview.
            text = chunk['text']
            if verbose:
                click.echo(f"   Text: {text}")
            else:
                click.echo(f"   Text: {text[:150]}...")
            click.echo()

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
| |
|
| |
|
| | |
def register_commands(cli):
    """Attach the ``docint`` command group to the given parent CLI."""
    # The parent only needs to provide add_command(); the group carries
    # all of its subcommands with it.
    cli.add_command(docint_cli)
| |
|