| | """ |
| | Agent Adapter for Document Intelligence |
| | |
| | Bridges the DocumentAgent with the new document_intelligence subsystem. |
| | Provides enhanced tools and capabilities. |
| | """ |
| |
|
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

from .chunks.models import (
    DocumentChunk,
    EvidenceRef,
    ParseResult,
    ExtractionResult,
    ClassificationResult,
    DocumentType,
)
from .parsing import DocumentParser, ParserConfig
from .extraction import (
    ExtractionSchema,
    FieldExtractor,
    ExtractionConfig,
    ExtractionValidator,
)
from .grounding import EvidenceBuilder, EvidenceTracker, CropManager
from .tools import get_tool, list_tools, ToolResult
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
@dataclass
class AgentConfig:
    """Configuration for the document agent adapter.

    Groups parsing, extraction, grounding, and agent-loop settings into a
    single object consumed by DocumentIntelligenceAdapter.
    """

    # --- Parsing ---
    render_dpi: int = 200                 # DPI used when rasterizing pages
    max_pages: Optional[int] = None       # cap on pages parsed (None = all)
    # Use default_factory so each instance gets its own list (the original
    # `List[str] = None` mis-typed the field and leaned on __post_init__).
    ocr_languages: List[str] = field(default_factory=lambda: ["en"])

    # --- Extraction ---
    min_confidence: float = 0.5           # fields below this are low-confidence
    abstain_on_low_confidence: bool = True

    # --- Grounding / evidence crops ---
    enable_crops: bool = True
    crop_output_dir: Optional[Path] = None  # crops persisted only when set

    # --- Agent loop ---
    max_iterations: int = 10
    verbose: bool = False

    def __post_init__(self):
        # Preserve the original contract: an explicit ocr_languages=None
        # still falls back to English.
        if self.ocr_languages is None:
            self.ocr_languages = ["en"]
| |
|
| |
|
class DocumentIntelligenceAdapter:
    """
    Adapter connecting DocumentAgent with the document_intelligence subsystem.

    Provides:
    - Document loading and parsing
    - Schema-driven extraction
    - Evidence-grounded results
    - Tool execution

    The adapter is stateful: load_document() stores the parse result and
    (optionally) rendered page images; extraction, Q&A, and search operate
    on that current document.
    """

    def __init__(
        self,
        config: Optional[AgentConfig] = None,
        llm_client: Optional[Any] = None,
    ):
        """
        Args:
            config: Adapter configuration; defaults to AgentConfig().
            llm_client: Optional LLM client forwarded to tools that need it.
        """
        self.config = config or AgentConfig()
        self.llm_client = llm_client

        # Parsing pipeline configured from the adapter config.
        self.parser = DocumentParser(
            config=ParserConfig(
                render_dpi=self.config.render_dpi,
                max_pages=self.config.max_pages,
                ocr_languages=self.config.ocr_languages,
            )
        )

        self.extractor = FieldExtractor(
            config=ExtractionConfig(
                min_field_confidence=self.config.min_confidence,
                abstain_on_low_confidence=self.config.abstain_on_low_confidence,
            )
        )

        self.validator = ExtractionValidator(
            min_confidence=self.config.min_confidence,
        )

        self.evidence_builder = EvidenceBuilder()

        # Crops are persisted only when enabled AND an output dir is given.
        if self.config.enable_crops and self.config.crop_output_dir:
            self.crop_manager = CropManager(self.config.crop_output_dir)
        else:
            self.crop_manager = None

        # Per-document state, (re)populated by load_document().
        self._current_parse_result: Optional[ParseResult] = None
        self._page_images: Dict[int, Any] = {}

        logger.info("Initialized DocumentIntelligenceAdapter")

    def load_document(
        self,
        path: Union[str, Path],
        render_pages: bool = True,
    ) -> ParseResult:
        """
        Load and parse a document, replacing any previously loaded one.

        Args:
            path: Path to document file
            render_pages: Whether to keep rendered page images

        Returns:
            ParseResult with chunks and metadata
        """
        path = Path(path)
        logger.info(f"Loading document: {path}")

        # Reset per-document state so stale page images from a previously
        # loaded document are never served for the new one.
        self._page_images = {}

        self._current_parse_result = self.parser.parse(path)

        if render_pages:
            from .io import load_document, RenderOptions
            loader, renderer = load_document(path)
            try:
                for page_num in range(1, self._current_parse_result.num_pages + 1):
                    self._page_images[page_num] = renderer.render_page(
                        page_num,
                        RenderOptions(dpi=self.config.render_dpi),
                    )
            finally:
                # Release the loader even if rendering a page raises.
                loader.close()

        return self._current_parse_result

    def extract_fields(
        self,
        schema: Union[ExtractionSchema, Dict[str, Any]],
        validate: bool = True,
    ) -> ExtractionResult:
        """
        Extract fields from the loaded document.

        Args:
            schema: Extraction schema (or a JSON-schema-style dict)
            validate: Whether to validate results

        Returns:
            ExtractionResult with values and evidence

        Raises:
            RuntimeError: If no document has been loaded.
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded. Call load_document() first.")

        # Accept plain dicts by converting them to an ExtractionSchema.
        if isinstance(schema, dict):
            schema = ExtractionSchema.from_json_schema(schema)

        result = self.extractor.extract(self._current_parse_result, schema)

        if validate:
            validation = self.validator.validate(result, schema)
            if not validation.is_valid:
                logger.warning(f"Extraction validation failed: {validation.error_count} errors")
                # Surface issues on the result so callers can inspect them.
                result.metadata = result.metadata or {}
                result.metadata["validation_issues"] = [
                    {"field": i.field_name, "type": i.issue_type, "message": i.message}
                    for i in validation.issues
                ]

        return result

    def answer_question(
        self,
        question: str,
        use_llm: bool = True,
    ) -> Tuple[str, List[EvidenceRef], float]:
        """
        Answer a question about the document.

        Args:
            question: Question to answer
            use_llm: Whether to use LLM for generation

        Returns:
            Tuple of (answer, evidence, confidence); on tool failure the
            answer is an "Error: ..." string with empty evidence and 0.0.

        Raises:
            RuntimeError: If no document has been loaded.
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded")

        tool = get_tool("answer_question", llm_client=self.llm_client)
        result = tool.execute(
            parse_result=self._current_parse_result,
            question=question,
            use_rag=False,
        )

        if not result.success:
            return f"Error: {result.error}", [], 0.0

        data = result.data
        answer = data.get("answer", "")
        confidence = data.get("confidence", 0.5)

        # Convert the tool's evidence dicts into typed EvidenceRef objects.
        # Import hoisted out of the loop (was re-executed per evidence item).
        from .chunks.models import BoundingBox
        evidence = []
        for ev_dict in result.evidence:
            evidence.append(EvidenceRef(
                chunk_id=ev_dict["chunk_id"],
                doc_id=self._current_parse_result.doc_id,
                page=ev_dict["page"],
                bbox=BoundingBox(
                    x_min=ev_dict["bbox"][0],
                    y_min=ev_dict["bbox"][1],
                    x_max=ev_dict["bbox"][2],
                    y_max=ev_dict["bbox"][3],
                    normalized=True,
                ),
                source_type="text",
                snippet=ev_dict.get("snippet", ""),
                confidence=confidence,
            ))

        return answer, evidence, confidence

    def search_chunks(
        self,
        query: str,
        chunk_types: Optional[List[str]] = None,
        top_k: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Search for chunks matching a query.

        Args:
            query: Search query
            chunk_types: Optional chunk type filter
            top_k: Maximum results

        Returns:
            List of matching chunks with scores (empty on tool failure).

        Raises:
            RuntimeError: If no document has been loaded.
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded")

        tool = get_tool("search_chunks")
        result = tool.execute(
            parse_result=self._current_parse_result,
            query=query,
            chunk_types=chunk_types,
            top_k=top_k,
        )

        if not result.success:
            return []

        return result.data.get("results", [])

    def get_chunk(self, chunk_id: str) -> Optional[DocumentChunk]:
        """Get a chunk by ID, or None if absent / no document loaded."""
        if not self._current_parse_result:
            return None

        # Linear scan; chunk counts per document are expected to be small.
        for chunk in self._current_parse_result.chunks:
            if chunk.chunk_id == chunk_id:
                return chunk
        return None

    def get_page_image(self, page: int) -> Optional[Any]:
        """Get the rendered image for a page, or None if not rendered."""
        return self._page_images.get(page)

    def crop_chunk(
        self,
        chunk: DocumentChunk,
        padding_percent: float = 0.02,
    ) -> Optional[Any]:
        """Crop the region of a chunk from its page (None if page not rendered)."""
        page_image = self.get_page_image(chunk.page)
        if page_image is None:
            return None

        from .grounding import crop_region
        return crop_region(page_image, chunk.bbox, padding_percent)

    def get_tools_description(self) -> str:
        """Get a newline-separated description of available tools for prompts."""
        tools = list_tools()
        lines = []
        for tool in tools:
            lines.append(f"- {tool['name']}: {tool['description']}")
        return "\n".join(lines)

    def execute_tool(
        self,
        tool_name: str,
        **kwargs
    ) -> ToolResult:
        """
        Execute a document tool.

        Args:
            tool_name: Name of tool to execute
            **kwargs: Tool arguments

        Returns:
            ToolResult
        """
        # Inject the current parse result unless the caller supplied one.
        if "parse_result" not in kwargs and self._current_parse_result:
            kwargs["parse_result"] = self._current_parse_result

        tool = get_tool(tool_name, llm_client=self.llm_client)
        return tool.execute(**kwargs)

    @property
    def parse_result(self) -> Optional[ParseResult]:
        """Get current parse result."""
        return self._current_parse_result

    @property
    def document_id(self) -> Optional[str]:
        """Get current document ID, or None if nothing is loaded."""
        if self._current_parse_result:
            return self._current_parse_result.doc_id
        return None
| |
|
| |
|
def create_enhanced_document_agent(
    llm_client: Any,
    config: Optional[AgentConfig] = None,
) -> "EnhancedDocumentAgent":
    """
    Build an EnhancedDocumentAgent wired to the document_intelligence stack.

    Args:
        llm_client: LLM client used for reasoning.
        config: Optional agent configuration; defaults are applied by the agent.

    Returns:
        A ready-to-use EnhancedDocumentAgent instance.
    """
    agent = EnhancedDocumentAgent(config=config, llm_client=llm_client)
    return agent
| |
|
| |
|
class EnhancedDocumentAgent:
    """
    Enhanced DocumentAgent using the document_intelligence subsystem.

    Extends the ReAct-style agent with:
    - Better parsing and chunking
    - Schema-driven extraction
    - Visual grounding
    - Evidence tracking

    Thin async facade over DocumentIntelligenceAdapter; the adapter holds
    all per-document state.
    """

    def __init__(
        self,
        llm_client: Any,
        config: Optional[AgentConfig] = None,
    ):
        """
        Args:
            llm_client: LLM client for reasoning.
            config: Optional agent configuration; defaults to AgentConfig().
        """
        self.adapter = DocumentIntelligenceAdapter(
            config=config,
            llm_client=llm_client,
        )
        self.llm_client = llm_client
        self.config = config or AgentConfig()

    async def load_document(self, path: Union[str, Path]) -> ParseResult:
        """Load a document (with page rendering) for processing."""
        return self.adapter.load_document(path, render_pages=True)

    async def extract_fields(
        self,
        schema: Union[ExtractionSchema, Dict],
    ) -> ExtractionResult:
        """Extract and validate fields using the given schema."""
        return self.adapter.extract_fields(schema, validate=True)

    async def answer_question(
        self,
        question: str,
    ) -> Tuple[str, List[EvidenceRef]]:
        """Answer a question about the document (confidence is dropped here)."""
        answer, evidence, _confidence = self.adapter.answer_question(question)
        return answer, evidence

    async def classify(self) -> ClassificationResult:
        """
        Classify the document type using a keyword heuristic.

        Scans the first few chunks of page 1 and picks the type whose
        keywords match the most. (The previous version stopped at the first
        type with ANY match, biasing results toward types listed earlier
        in the keyword table.)

        Returns:
            ClassificationResult with the best-matching type, or OTHER at
            confidence 0.5 when nothing matches.

        Raises:
            RuntimeError: If no document has been loaded.
        """
        if not self.adapter.parse_result:
            raise RuntimeError("No document loaded")

        # Only the start of the document is sampled: first 5 chunks of page 1.
        first_page_chunks = [
            c for c in self.adapter.parse_result.chunks
            if c.page == 1
        ][:5]

        # Guard against chunks whose text is None so classification never crashes.
        content = " ".join((c.text or "")[:200] for c in first_page_chunks)

        type_keywords = {
            DocumentType.INVOICE: ["invoice", "bill", "payment due", "amount due"],
            DocumentType.CONTRACT: ["agreement", "contract", "party", "whereas"],
            DocumentType.RECEIPT: ["receipt", "paid", "transaction", "thank you"],
            DocumentType.FORM: ["form", "fill in", "checkbox", "signature line"],
            DocumentType.LETTER: ["dear", "sincerely", "regards"],
            DocumentType.REPORT: ["report", "findings", "conclusion", "summary"],
            DocumentType.PATENT: ["patent", "claims", "invention", "embodiment"],
        }

        # Score every type and keep the best, instead of breaking on the
        # first type with a nonzero match count.
        content_lower = content.lower()
        doc_type = DocumentType.OTHER
        best_matches = 0
        for dtype, keywords in type_keywords.items():
            matches = sum(1 for k in keywords if k in content_lower)
            if matches > best_matches:
                best_matches = matches
                doc_type = dtype

        # 0.5 base + 0.15 per keyword hit, capped at 0.9; 0.5 when no hits.
        if best_matches > 0:
            confidence = min(0.9, 0.5 + best_matches * 0.15)
        else:
            confidence = 0.5

        return ClassificationResult(
            doc_id=self.adapter.document_id,
            document_type=doc_type,
            confidence=confidence,
            secondary_types=[],
        )

    def search(
        self,
        query: str,
        top_k: int = 10,
    ) -> List[Dict[str, Any]]:
        """Search document content, returning up to top_k scored chunks."""
        return self.adapter.search_chunks(query, top_k=top_k)

    @property
    def current_document(self) -> Optional[ParseResult]:
        """The currently loaded ParseResult, or None."""
        return self.adapter.parse_result
| |
|