"""
Document Intelligence Demo

Demonstrates the capabilities of the SPARKNET document_intelligence subsystem:
- Document parsing with OCR and layout detection
- Schema-driven field extraction
- Visual grounding with evidence
- Question answering
- Document classification
"""

import sys
from pathlib import Path

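# Make the repository root importable so the `src.document_intelligence`
# package resolves when the demo is run directly (this assumes the file
# lives one directory below the repo root).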
sys.path.insert(0, str(Path(__file__).parent.parent))


def demo_parse_document(doc_path: str):
    """Demo: Parse a document into semantic chunks."""
    print("\n" + "=" * 60)
    print("1. DOCUMENT PARSING")
    print("=" * 60)

    from src.document_intelligence import (
        DocumentParser,
        ParserConfig,
    )

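    # Parser settings: render pages at 200 DPI, stop after 5 pages, and
    # include a Markdown rendition of the parsed content.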
    config = ParserConfig(
        render_dpi=200,
        max_pages=5,
        include_markdown=True,
    )

    parser = DocumentParser(config=config)

| | print(f"\nParsing: {doc_path}") |
| | result = parser.parse(doc_path) |
| |
|
| | print(f"\nDocument ID: {result.doc_id}") |
| | print(f"Filename: {result.filename}") |
| | print(f"Pages: {result.num_pages}") |
| | print(f"Chunks: {len(result.chunks)}") |
| | print(f"Processing time: {result.processing_time_ms:.0f}ms") |
| |
|
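    # Tally chunks by type to get a quick profile of the document layout.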
    print("\nChunk types:")
    by_type = {}
    for chunk in result.chunks:
        t = chunk.chunk_type.value
        by_type[t] = by_type.get(t, 0) + 1

    for t, count in sorted(by_type.items()):
        print(f"  - {t}: {count}")

    print("\nFirst 3 chunks:")
    for i, chunk in enumerate(result.chunks[:3]):
        print(f"\n  [{i+1}] Type: {chunk.chunk_type.value}, Page: {chunk.page}")
        print(f"      ID: {chunk.chunk_id}")
        print(f"      Text: {chunk.text[:100]}...")
        print(f"      BBox: {chunk.bbox.xyxy}")
        print(f"      Confidence: {chunk.confidence:.2f}")

    return result


def demo_extract_fields(parse_result):
    """Demo: Extract fields using a schema."""
    print("\n" + "=" * 60)
    print("2. SCHEMA-DRIVEN EXTRACTION")
    print("=" * 60)

    from src.document_intelligence import (
        FieldExtractor,
        ExtractionSchema,
        ExtractionValidator,
    )

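    # Define a small schema: one required field plus three optional ones.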
    schema = ExtractionSchema(
        name="DocumentInfo",
        description="Basic document information",
    )

    schema.add_string_field("title", "Document title or heading", required=True)
    schema.add_string_field("date", "Document date", required=False)
    schema.add_string_field("author", "Author or organization name", required=False)
    schema.add_string_field("reference_number", "Reference or ID number", required=False)

| | print(f"\nExtraction schema: {schema.name}") |
| | print("Fields:") |
| | for field in schema.fields: |
| | req = "required" if field.required else "optional" |
| | print(f" - {field.name} ({field.field_type.value}, {req})") |
| |
|
| | |
| | extractor = FieldExtractor() |
| | result = extractor.extract(parse_result, schema) |
| |
|
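    # Fields the extractor abstained on are flagged rather than guessed at.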
| | print("\nExtracted data:") |
| | for key, value in result.data.items(): |
| | status = " [ABSTAINED]" if key in result.abstained_fields else "" |
| | print(f" {key}: {value}{status}") |
| |
|
| | print(f"\nOverall confidence: {result.overall_confidence:.2f}") |
| |
|
    if result.evidence:
        print("\nEvidence:")
        for ev in result.evidence[:3]:
            print(f"  - Page {ev.page}, Chunk {ev.chunk_id[:12]}...")
            print(f"    Snippet: {ev.snippet[:80]}...")

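    # Check the extraction against the schema; each issue carries a severity,
    # a field name, and a message.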
    validator = ExtractionValidator()
    validation = validator.validate(result, schema)

    print(f"\nValidation: {'PASSED' if validation.is_valid else 'FAILED'}")
    if validation.issues:
        print("Issues:")
        for issue in validation.issues[:3]:
            print(f"  - [{issue.severity}] {issue.field_name}: {issue.message}")

    return result


def demo_search_and_qa(parse_result):
    """Demo: Search and question answering."""
    print("\n" + "=" * 60)
    print("3. SEARCH AND Q&A")
    print("=" * 60)

    from src.document_intelligence.tools import get_tool

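    # Look up the registered search_chunks tool and query the parsed chunks.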
| | print("\nSearching for 'document'...") |
| | search_tool = get_tool("search_chunks") |
| | search_result = search_tool.execute( |
| | parse_result=parse_result, |
| | query="document", |
| | top_k=5, |
| | ) |
| |
|
    if search_result.success:
        matches = search_result.data.get("results", [])
        print(f"Found {len(matches)} matches:")
        for i, match in enumerate(matches[:3], 1):
            print(f"  {i}. Page {match['page']}, Type: {match['type']}")
            print(f"     Score: {match['score']:.2f}")
            print(f"     Text: {match['text'][:80]}...")

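    # Pose a free-form question against the same parse result.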
| | print("\nAsking: 'What is this document about?'") |
| | qa_tool = get_tool("answer_question") |
| | qa_result = qa_tool.execute( |
| | parse_result=parse_result, |
| | question="What is this document about?", |
| | ) |
| |
|
| | if qa_result.success: |
| | print(f"Answer: {qa_result.data.get('answer', 'No answer')}") |
| | print(f"Confidence: {qa_result.data.get('confidence', 0):.2f}") |
| |
|
| |
|
def demo_grounding(parse_result, doc_path: str):
    """Demo: Visual grounding with crops."""
    print("\n" + "=" * 60)
    print("4. VISUAL GROUNDING")
    print("=" * 60)

    from src.document_intelligence import (
        load_document,
        RenderOptions,
    )
    from src.document_intelligence.grounding import (
        EvidenceBuilder,
        crop_region,
    )

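    # Render page 1 to an image so evidence regions can be cropped from it.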
    loader, renderer = load_document(doc_path)
    page_image = renderer.render_page(1, RenderOptions(dpi=200))
    loader.close()

    print(f"\nPage 1 image size: {page_image.shape}")

    page_chunks = [c for c in parse_result.chunks if c.page == 1]
    print(f"Page 1 chunks: {len(page_chunks)}")

    if page_chunks:
        chunk = page_chunks[0]
        evidence_builder = EvidenceBuilder()

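        # Tie an extracted value back to its source chunk and bounding box.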
        evidence = evidence_builder.create_evidence(
            chunk=chunk,
            value=chunk.text[:50],
            field_name="example_field",
        )

        print("\nEvidence created:")
        print(f"  Chunk ID: {evidence.chunk_id}")
        print(f"  Page: {evidence.page}")
        print(f"  BBox: {evidence.bbox.xyxy}")
        print(f"  Snippet: {evidence.snippet[:80]}...")

        crop = crop_region(page_image, chunk.bbox)
        print(f"  Crop size: {crop.shape}")

    print("\nAnnotated image would include bounding boxes for all chunks.")
    print("Use the CLI 'sparknet docint visualize' command to generate one.")


def demo_classification(parse_result):
    """Demo: Document classification."""
    print("\n" + "=" * 60)
    print("5. DOCUMENT CLASSIFICATION")
    print("=" * 60)

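    # Simple keyword heuristic: sample text from the first five page-1 chunks.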
    first_page = [c for c in parse_result.chunks if c.page == 1][:5]
    content = " ".join(c.text for c in first_page).lower()

    type_keywords = {
        "invoice": ["invoice", "bill", "payment due", "amount due"],
        "contract": ["agreement", "contract", "party", "whereas"],
        "receipt": ["receipt", "paid", "transaction"],
        "patent": ["patent", "claims", "invention"],
        "report": ["report", "findings", "summary"],
    }

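    # Two or more keyword hits decide the type; confidence grows with the
    # number of hits, capped at 0.95.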
    detected_type = "other"
    confidence = 0.3

    for doc_type, keywords in type_keywords.items():
        matches = sum(1 for k in keywords if k in content)
        if matches >= 2:
            detected_type = doc_type
            confidence = min(0.95, 0.5 + matches * 0.15)
            break

    print(f"\nDetected type: {detected_type}")
    print(f"Confidence: {confidence:.2f}")


def main():
    """Run all demos."""
    print("=" * 60)
    print("SPARKNET Document Intelligence Demo")
    print("=" * 60)

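    # A document path given on the command line takes precedence; otherwise
    # fall back to the first bundled sample that exists.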
    if len(sys.argv) > 1:
        doc_path = sys.argv[1]
    else:
        sample_paths = [
            Path("Dataset/Patent_1.pdf"),
            Path("data/sample.pdf"),
            Path("tests/fixtures/sample.pdf"),
        ]
        doc_path = next((str(p) for p in sample_paths if p.exists()), None)

    if not doc_path:
        print("\nNo sample document found.")
        print("Please provide a PDF file path as an argument.")
        print("\nUsage: python document_intelligence_demo.py [path/to/document.pdf]")
        return

    print(f"\nUsing document: {doc_path}")

    try:
        parse_result = demo_parse_document(doc_path)
        demo_extract_fields(parse_result)
        demo_search_and_qa(parse_result)
        demo_grounding(parse_result, doc_path)
        demo_classification(parse_result)

        print("\n" + "=" * 60)
        print("Demo complete!")
        print("=" * 60)

    except ImportError as e:
        print(f"\nImport error: {e}")
        print("Make sure all dependencies are installed:")
        print("  pip install pymupdf pillow numpy pydantic")
    except Exception as e:
        print(f"\nError: {e}")
        import traceback

        traceback.print_exc()


if __name__ == "__main__":
    main()