| | """ |
| | Core Data Models for Document Intelligence |
| | |
| | Comprehensive Pydantic models for: |
| | - Bounding boxes and spatial data |
| | - Document chunks (text, table, chart, form fields) |
| | - Evidence references for grounding |
| | - Parse results and document metadata |
| | |
| | Design principles: |
| | - Vision-first: treat documents as visual objects |
| | - Grounding: every extraction has evidence pointers |
| | - Stable IDs: reproducible, hash-based chunk identifiers |
| | - Schema-compatible: JSON export/import, Pydantic validation |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import hashlib |
| | import json |
| | from datetime import datetime |
| | from enum import Enum |
| | from pathlib import Path |
| | from typing import Any, Dict, List, Optional, Tuple, Union |
| |
|
| | from pydantic import BaseModel, Field, field_validator, model_validator |
| |
|
| |
|
| | |
| | |
| | |
| |
|

class BoundingBox(BaseModel):
    """
    Bounding box in XYXY format (x_min, y_min, x_max, y_max).

    Supports both pixel coordinates and normalized (0-1) coordinates.
    All spatial grounding uses this as the standard format.
    """
    x_min: float = Field(..., description="Left edge (x1)")
    y_min: float = Field(..., description="Top edge (y1)")
    x_max: float = Field(..., description="Right edge (x2)")
    y_max: float = Field(..., description="Bottom edge (y2)")

    normalized: bool = Field(default=False, description="True if 0-1 normalized")
    page_width: Optional[int] = Field(default=None, description="Page width in pixels")
    page_height: Optional[int] = Field(default=None, description="Page height in pixels")

    @field_validator('x_max')
    @classmethod
    def validate_x_max(cls, v, info):
        if 'x_min' in info.data and v < info.data['x_min']:
            raise ValueError('x_max must be >= x_min')
        return v

    @field_validator('y_max')
    @classmethod
    def validate_y_max(cls, v, info):
        if 'y_min' in info.data and v < info.data['y_min']:
            raise ValueError('y_max must be >= y_min')
        return v

    @property
    def width(self) -> float:
        return self.x_max - self.x_min

    @property
    def height(self) -> float:
        return self.y_max - self.y_min

    @property
    def area(self) -> float:
        return self.width * self.height

    @property
    def center(self) -> Tuple[float, float]:
        return ((self.x_min + self.x_max) / 2, (self.y_min + self.y_max) / 2)

    @property
    def xyxy(self) -> Tuple[float, float, float, float]:
        """Return as (x_min, y_min, x_max, y_max)."""
        return (self.x_min, self.y_min, self.x_max, self.y_max)

    @property
    def xywh(self) -> Tuple[float, float, float, float]:
        """Return as (x, y, width, height)."""
        return (self.x_min, self.y_min, self.width, self.height)

    def to_pixel(self, width: int, height: int) -> BoundingBox:
        """Convert to pixel coordinates (no-op if already in pixels)."""
        if not self.normalized:
            return self
        return BoundingBox(
            x_min=int(self.x_min * width),
            y_min=int(self.y_min * height),
            x_max=int(self.x_max * width),
            y_max=int(self.y_max * height),
            normalized=False,
            page_width=width,
            page_height=height,
        )

    def to_normalized(self, width: int, height: int) -> BoundingBox:
        """Convert to normalized (0-1) coordinates (no-op if already normalized)."""
        if self.normalized:
            return self
        return BoundingBox(
            x_min=self.x_min / width,
            y_min=self.y_min / height,
            x_max=self.x_max / width,
            y_max=self.y_max / height,
            normalized=True,
            page_width=width,
            page_height=height,
        )

    def iou(self, other: BoundingBox) -> float:
        """Calculate Intersection over Union."""
        x1 = max(self.x_min, other.x_min)
        y1 = max(self.y_min, other.y_min)
        x2 = min(self.x_max, other.x_max)
        y2 = min(self.y_max, other.y_max)

        if x2 < x1 or y2 < y1:
            return 0.0

        intersection = (x2 - x1) * (y2 - y1)
        union = self.area + other.area - intersection
        return intersection / union if union > 0 else 0.0

    def contains(self, other: BoundingBox) -> bool:
        """Check if this bbox fully contains another."""
        return (
            self.x_min <= other.x_min and
            self.y_min <= other.y_min and
            self.x_max >= other.x_max and
            self.y_max >= other.y_max
        )

    def expand(self, margin: float) -> BoundingBox:
        """Expand bbox by a margin (in the same units as the coordinates)."""
        return BoundingBox(
            x_min=max(0, self.x_min - margin),
            y_min=max(0, self.y_min - margin),
            x_max=self.x_max + margin,
            y_max=self.y_max + margin,
            normalized=self.normalized,
            page_width=self.page_width,
            page_height=self.page_height,
        )

    def clip(self, max_width: float, max_height: float) -> BoundingBox:
        """Clip bbox to image boundaries."""
        return BoundingBox(
            x_min=max(0, self.x_min),
            y_min=max(0, self.y_min),
            x_max=min(max_width, self.x_max),
            y_max=min(max_height, self.y_max),
            normalized=self.normalized,
            page_width=self.page_width,
            page_height=self.page_height,
        )

    @classmethod
    def from_xyxy(cls, xyxy: Tuple[float, float, float, float], **kwargs) -> BoundingBox:
        """Create from (x_min, y_min, x_max, y_max) tuple."""
        return cls(x_min=xyxy[0], y_min=xyxy[1], x_max=xyxy[2], y_max=xyxy[3], **kwargs)

    @classmethod
    def from_xywh(cls, xywh: Tuple[float, float, float, float], **kwargs) -> BoundingBox:
        """Create from (x, y, width, height) tuple."""
        x, y, w, h = xywh
        return cls(x_min=x, y_min=y, x_max=x + w, y_max=y + h, **kwargs)

    def __hash__(self):
        return hash((self.x_min, self.y_min, self.x_max, self.y_max))
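
# Illustrative usage (a minimal sketch in doctest style, kept as comments so the
# module has no import-time side effects; the 1000x800 page size is hypothetical):
#
#   >>> bb = BoundingBox(x_min=0.1, y_min=0.25, x_max=0.5, y_max=0.75, normalized=True)
#   >>> bb.to_pixel(1000, 800).xyxy
#   (100.0, 200.0, 500.0, 600.0)
#   >>> a = BoundingBox.from_xyxy((0, 0, 2, 2))
#   >>> b = BoundingBox.from_xyxy((1, 1, 3, 3))
#   >>> round(a.iou(b), 4)  # intersection 1, union 4 + 4 - 1 = 7
#   0.1429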
|

class ChunkType(str, Enum):
    """
    Semantic chunk types for document segmentation.

    Covers text, tables, figures, charts, forms, and structural elements.
    Used for routing chunks to specialized extraction logic.
    """

    # Text elements
    TEXT = "text"
    TITLE = "title"
    HEADING = "heading"
    PARAGRAPH = "paragraph"
    LIST = "list"
    LIST_ITEM = "list_item"

    # Tables, figures, and other non-prose content
    TABLE = "table"
    TABLE_CELL = "table_cell"
    FIGURE = "figure"
    CHART = "chart"
    FORMULA = "formula"
    CODE = "code"

    # Form elements
    FORM_FIELD = "form_field"
    CHECKBOX = "checkbox"
    SIGNATURE = "signature"
    STAMP = "stamp"
    HANDWRITING = "handwriting"

    # Page furniture and structural elements
    HEADER = "header"
    FOOTER = "footer"
    PAGE_NUMBER = "page_number"
    CAPTION = "caption"
    FOOTNOTE = "footnote"
    WATERMARK = "watermark"
    LOGO = "logo"

    # Fallbacks
    METADATA = "metadata"
    UNKNOWN = "unknown"
|

class ConfidenceLevel(str, Enum):
    """Confidence level classification."""
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    VERY_LOW = "very_low"

    @classmethod
    def from_score(cls, score: float) -> ConfidenceLevel:
        if score >= 0.9:
            return cls.HIGH
        elif score >= 0.7:
            return cls.MEDIUM
        elif score >= 0.5:
            return cls.LOW
        else:
            return cls.VERY_LOW
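
# Illustrative mapping (a minimal sketch; thresholds follow from_score above,
# i.e. >=0.9 high, >=0.7 medium, >=0.5 low, else very_low):
#
#   >>> ConfidenceLevel.from_score(0.85).value
#   'medium'
#   >>> ConfidenceLevel.from_score(0.42).value
#   'very_low'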
|

class DocumentChunk(BaseModel):
    """
    Base document chunk with text and grounding evidence.

    This is the fundamental unit for retrieval and extraction.
    Every chunk has:
    - Stable, reproducible chunk_id (hash-based)
    - Precise spatial grounding (page, bbox)
    - Confidence score for quality assessment
    """

    chunk_id: str = Field(..., description="Unique, stable chunk identifier")
    doc_id: str = Field(..., description="Parent document identifier")

    chunk_type: ChunkType = Field(..., description="Semantic type")
    text: str = Field(..., description="Text content")

    page: int = Field(..., ge=0, description="Zero-indexed page number")
    bbox: BoundingBox = Field(..., description="Bounding box on page")

    confidence: float = Field(default=1.0, ge=0.0, le=1.0, description="Extraction confidence")

    sequence_index: int = Field(default=0, ge=0, description="Position in reading order")

    source_path: Optional[str] = Field(default=None, description="Original file path")

    parent_id: Optional[str] = Field(default=None, description="Parent chunk ID")
    children_ids: List[str] = Field(default_factory=list, description="Child chunk IDs")

    caption: Optional[str] = Field(default=None, description="Caption if applicable")

    warnings: List[str] = Field(default_factory=list, description="Quality warnings")

    extra: Dict[str, Any] = Field(default_factory=dict, description="Type-specific metadata")

    embedding: Optional[List[float]] = Field(default=None, exclude=True)

    @property
    def confidence_level(self) -> ConfidenceLevel:
        return ConfidenceLevel.from_score(self.confidence)

    @property
    def needs_review(self) -> bool:
        """Check if chunk needs human review."""
        return self.confidence < 0.7 or len(self.warnings) > 0

    def content_hash(self) -> str:
        """Generate hash of chunk content for deduplication."""
        content = f"{self.doc_id}:{self.page}:{self.chunk_type.value}:{self.text[:200]}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    @staticmethod
    def generate_chunk_id(
        doc_id: str,
        page: int,
        bbox: BoundingBox,
        chunk_type: ChunkType,
    ) -> str:
        """
        Generate a stable, reproducible chunk ID.

        Uses hash of (doc_id, page, bbox, type) for reproducibility.
        """
        bbox_str = f"{bbox.x_min:.2f},{bbox.y_min:.2f},{bbox.x_max:.2f},{bbox.y_max:.2f}"
        content = f"{doc_id}:p{page}:{bbox_str}:{chunk_type.value}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def to_retrieval_metadata(self) -> Dict[str, Any]:
        """Convert to metadata dict for vector store."""
        return {
            "chunk_id": self.chunk_id,
            "doc_id": self.doc_id,
            "chunk_type": self.chunk_type.value,
            "page": self.page,
            "bbox_xyxy": list(self.bbox.xyxy),
            "confidence": self.confidence,
            "sequence_index": self.sequence_index,
            "source_path": self.source_path,
        }

    def __hash__(self):
        return hash(self.chunk_id)
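
# Illustrative construction (a minimal sketch; "doc-001" and the text are
# hypothetical values):
#
#   >>> box = BoundingBox(x_min=72, y_min=90, x_max=540, y_max=130)
#   >>> cid = DocumentChunk.generate_chunk_id("doc-001", 0, box, ChunkType.PARAGRAPH)
#   >>> chunk = DocumentChunk(
#   ...     chunk_id=cid, doc_id="doc-001", chunk_type=ChunkType.PARAGRAPH,
#   ...     text="Payment is due within 30 days.", page=0, bbox=box, confidence=0.93,
#   ... )
#   >>> chunk.needs_review  # 0.93 >= 0.7 and no warnings
#   False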
|

class TableCell(BaseModel):
    """A single cell in a table."""
    cell_id: str = Field(..., description="Unique cell identifier")
    row: int = Field(..., ge=0, description="Row index (0-based)")
    col: int = Field(..., ge=0, description="Column index (0-based)")
    text: str = Field(default="", description="Cell text content")
    bbox: Optional[BoundingBox] = Field(default=None, description="Cell bounding box")

    rowspan: int = Field(default=1, ge=1, description="Number of rows spanned")
    colspan: int = Field(default=1, ge=1, description="Number of columns spanned")

    is_header: bool = Field(default=False, description="Is header cell")

    confidence: float = Field(default=1.0, ge=0.0, le=1.0)
|

class TableChunk(DocumentChunk):
    """
    Specialized chunk for tables with structured cell data.

    Preserves row/column structure and supports merged cells.
    """
    chunk_type: ChunkType = Field(default=ChunkType.TABLE)

    cells: List[TableCell] = Field(default_factory=list, description="All table cells")
    num_rows: int = Field(default=0, ge=0, description="Number of rows")
    num_cols: int = Field(default=0, ge=0, description="Number of columns")

    header_rows: List[int] = Field(default_factory=list, description="Header row indices")
    header_cols: List[int] = Field(default_factory=list, description="Header column indices")

    has_merged_cells: bool = Field(default=False)
    table_title: Optional[str] = Field(default=None)

    def get_cell(self, row: int, col: int) -> Optional[TableCell]:
        """Get the cell covering a position (accounts for row/col spans)."""
        for cell in self.cells:
            # The span test also covers an exact (row, col) anchor match,
            # since rowspan and colspan are always >= 1.
            if (cell.row <= row < cell.row + cell.rowspan and
                    cell.col <= col < cell.col + cell.colspan):
                return cell
        return None

    def get_row(self, row: int) -> List[TableCell]:
        """Get all cells in a row."""
        return [c for c in self.cells if c.row == row]

    def get_column(self, col: int) -> List[TableCell]:
        """Get all cells in a column."""
        return [c for c in self.cells if c.col == col]

    def to_csv(self) -> str:
        """Export table to CSV format."""
        import csv
        import io

        output = io.StringIO()
        writer = csv.writer(output)

        for row_idx in range(self.num_rows):
            row_data = []
            for col_idx in range(self.num_cols):
                cell = self.get_cell(row_idx, col_idx)
                row_data.append(cell.text if cell else "")
            writer.writerow(row_data)

        return output.getvalue()

    def to_markdown(self) -> str:
        """Export table to Markdown format."""
        lines = []
        # Emit the header separator exactly once, after the last header row
        # (or after the first row when no header rows are marked).
        last_header_row = max(self.header_rows) if self.header_rows else 0

        for row_idx in range(self.num_rows):
            row_cells = []
            for col_idx in range(self.num_cols):
                cell = self.get_cell(row_idx, col_idx)
                row_cells.append(cell.text if cell else "")
            lines.append("| " + " | ".join(row_cells) + " |")

            if row_idx == last_header_row:
                lines.append("| " + " | ".join(["---"] * self.num_cols) + " |")

        return "\n".join(lines)

    def to_structured_json(self) -> Dict[str, Any]:
        """Export table to structured JSON with headers."""
        # Derive column names from the first header row, if any.
        headers = []
        if self.header_rows:
            for col_idx in range(self.num_cols):
                cell = self.get_cell(self.header_rows[0], col_idx)
                headers.append(cell.text if cell else f"col_{col_idx}")
        else:
            headers = [f"col_{i}" for i in range(self.num_cols)]

        # Data rows start after the last header row.
        data_start = max(self.header_rows) + 1 if self.header_rows else 0
        rows = []

        for row_idx in range(data_start, self.num_rows):
            row_dict = {}
            for col_idx, header in enumerate(headers):
                cell = self.get_cell(row_idx, col_idx)
                row_dict[header] = cell.text if cell else ""
            rows.append(row_dict)

        return {
            "headers": headers,
            "rows": rows,
            "num_rows": len(rows),
            "num_cols": self.num_cols,
        }
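
# Illustrative round-trip (a minimal sketch; cell contents are hypothetical):
#
#   >>> cells = [
#   ...     TableCell(cell_id="c00", row=0, col=0, text="Item", is_header=True),
#   ...     TableCell(cell_id="c01", row=0, col=1, text="Qty", is_header=True),
#   ...     TableCell(cell_id="c10", row=1, col=0, text="Widget"),
#   ...     TableCell(cell_id="c11", row=1, col=1, text="3"),
#   ... ]
#   >>> table = TableChunk(
#   ...     chunk_id="t1", doc_id="doc-001", text="", page=0,
#   ...     bbox=BoundingBox(x_min=0, y_min=0, x_max=200, y_max=80),
#   ...     cells=cells, num_rows=2, num_cols=2, header_rows=[0],
#   ... )
#   >>> print(table.to_markdown())
#   | Item | Qty |
#   | --- | --- |
#   | Widget | 3 |
#   >>> table.to_structured_json()["rows"]
#   [{'Item': 'Widget', 'Qty': '3'}]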
|

class ChartDataPoint(BaseModel):
    """A data point in a chart."""
    label: Optional[str] = None
    value: Optional[float] = None
    category: Optional[str] = None
    series: Optional[str] = None
    confidence: float = Field(default=1.0, ge=0.0, le=1.0)
|

class ChartChunk(DocumentChunk):
    """
    Specialized chunk for charts/graphs with structured interpretation.

    Extracts title, axes, series, and key values from visualizations.
    """
    chunk_type: ChunkType = Field(default=ChunkType.CHART)

    chart_type: Optional[str] = Field(default=None, description="bar, line, pie, scatter, etc.")
    title: Optional[str] = Field(default=None)

    x_axis_label: Optional[str] = Field(default=None)
    y_axis_label: Optional[str] = Field(default=None)
    x_axis_unit: Optional[str] = Field(default=None)
    y_axis_unit: Optional[str] = Field(default=None)

    series_names: List[str] = Field(default_factory=list)
    data_points: List[ChartDataPoint] = Field(default_factory=list)

    key_values: Dict[str, Any] = Field(default_factory=dict, description="Key numeric values")
    trends: List[str] = Field(default_factory=list, description="Identified trends")
    summary: Optional[str] = Field(default=None, description="Natural language summary")

    def to_structured_json(self) -> Dict[str, Any]:
        """Export chart data as structured JSON."""
        return {
            "chart_type": self.chart_type,
            "title": self.title,
            "axes": {
                "x": {"label": self.x_axis_label, "unit": self.x_axis_unit},
                "y": {"label": self.y_axis_label, "unit": self.y_axis_unit},
            },
            "series": self.series_names,
            "data_points": [dp.model_dump() for dp in self.data_points],
            "key_values": self.key_values,
            "trends": self.trends,
            "summary": self.summary,
        }
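
# Illustrative usage (a minimal sketch; all chart values are hypothetical):
#
#   >>> chart = ChartChunk(
#   ...     chunk_id="ch1", doc_id="doc-001", text="Revenue by quarter", page=2,
#   ...     bbox=BoundingBox(x_min=50, y_min=100, x_max=550, y_max=400),
#   ...     chart_type="bar", title="Revenue by quarter",
#   ...     x_axis_label="Quarter", y_axis_label="Revenue", y_axis_unit="USD (m)",
#   ...     data_points=[ChartDataPoint(label="Q1", value=4.2),
#   ...                  ChartDataPoint(label="Q2", value=5.1)],
#   ... )
#   >>> chart.to_structured_json()["axes"]["y"]
#   {'label': 'Revenue', 'unit': 'USD (m)'}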
|

class FormFieldChunk(DocumentChunk):
    """
    Specialized chunk for form fields.

    Handles text fields, checkboxes, radio buttons, signatures.
    """
    chunk_type: ChunkType = Field(default=ChunkType.FORM_FIELD)

    field_name: Optional[str] = Field(default=None, description="Field label/name")
    field_value: Optional[str] = Field(default=None, description="Extracted value")
    field_type: str = Field(default="text", description="text, checkbox, signature, date, etc.")

    is_checked: Optional[bool] = Field(default=None)
    options: List[str] = Field(default_factory=list)

    is_required: bool = Field(default=False)
    is_filled: bool = Field(default=False)
|

class EvidenceRef(BaseModel):
    """
    Evidence reference for grounding extractions.

    Links every extracted value back to its source in the document.
    Required for auditability and trust.
    """

    chunk_id: str = Field(..., description="Source chunk ID")
    doc_id: str = Field(..., description="Document ID")
    page: int = Field(..., ge=0, description="Page number (0-indexed)")
    bbox: BoundingBox = Field(..., description="Bounding box of evidence")

    source_type: str = Field(..., description="text, table, chart, form_field, etc.")
    snippet: str = Field(..., max_length=1000, description="Text snippet as evidence")

    confidence: float = Field(..., ge=0.0, le=1.0, description="Evidence confidence")

    cell_id: Optional[str] = Field(default=None, description="Table cell ID if applicable")

    crop_path: Optional[str] = Field(default=None, description="Path to cropped image")
    image_base64: Optional[str] = Field(default=None, description="Base64 encoded crop")

    warnings: List[str] = Field(default_factory=list)

    @property
    def needs_review(self) -> bool:
        return self.confidence < 0.7 or len(self.warnings) > 0

    def to_citation(self, include_bbox: bool = False) -> str:
        """Format as human-readable citation."""
        citation = f"[Page {self.page + 1}, {self.source_type}]"
        if include_bbox:
            citation += f" @ ({self.bbox.x_min:.0f}, {self.bbox.y_min:.0f})"
        citation += f': "{self.snippet[:100]}..."' if len(self.snippet) > 100 else f': "{self.snippet}"'
        return citation
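
# Illustrative citation (a minimal sketch; the snippet is hypothetical):
#
#   >>> ev = EvidenceRef(
#   ...     chunk_id="abc123", doc_id="doc-001", page=0,
#   ...     bbox=BoundingBox(x_min=400, y_min=700, x_max=560, y_max=720),
#   ...     source_type="text", snippet="Total due: $1,234.56", confidence=0.95,
#   ... )
#   >>> ev.to_citation()
#   '[Page 1, text]: "Total due: $1,234.56"'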
|

class PageResult(BaseModel):
    """Result of parsing a single page."""
    page_num: int = Field(..., ge=0, description="Page number (0-indexed)")
    width: int = Field(..., gt=0, description="Page width in pixels")
    height: int = Field(..., gt=0, description="Page height in pixels")

    chunks: List[DocumentChunk] = Field(default_factory=list)
    markdown: str = Field(default="", description="Page content as Markdown")

    ocr_confidence: Optional[float] = Field(default=None)
    layout_confidence: Optional[float] = Field(default=None)

    image_path: Optional[str] = Field(default=None, description="Path to rendered page image")
|

class ParseResult(BaseModel):
    """
    Complete result of document parsing.

    Contains all parsed content with metadata for downstream processing.
    """

    # Identity
    doc_id: str = Field(..., description="Unique document identifier")
    source_path: str = Field(..., description="Original file path")
    filename: str = Field(..., description="Original filename")

    # File info
    file_type: str = Field(..., description="pdf, png, jpg, tiff, etc.")
    file_size_bytes: int = Field(default=0, ge=0)
    file_hash: Optional[str] = Field(default=None, description="SHA256 of file content")

    # Page-level results
    num_pages: int = Field(..., ge=1)
    pages: List[PageResult] = Field(default_factory=list)

    # All chunks across pages
    chunks: List[DocumentChunk] = Field(default_factory=list)

    # Markdown renderings
    markdown_full: str = Field(default="", description="Full document as Markdown")
    markdown_by_page: Dict[int, str] = Field(default_factory=dict)

    # Timing (timezone-aware UTC; datetime.utcnow() is deprecated)
    parsed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    processing_time_ms: float = Field(default=0.0)

    # Quality
    avg_ocr_confidence: Optional[float] = Field(default=None)
    avg_layout_confidence: Optional[float] = Field(default=None)

    detected_language: Optional[str] = Field(default=None)

    models_used: Dict[str, str] = Field(default_factory=dict, description="Model name -> version")

    # Diagnostics
    warnings: List[str] = Field(default_factory=list)
    errors: List[str] = Field(default_factory=list)

    metadata: Dict[str, Any] = Field(default_factory=dict)

    @property
    def is_successful(self) -> bool:
        return len(self.errors) == 0 and len(self.chunks) > 0

    @property
    def has_tables(self) -> bool:
        return any(c.chunk_type == ChunkType.TABLE for c in self.chunks)

    @property
    def has_charts(self) -> bool:
        return any(c.chunk_type == ChunkType.CHART for c in self.chunks)

    def get_chunks_by_type(self, chunk_type: ChunkType) -> List[DocumentChunk]:
        return [c for c in self.chunks if c.chunk_type == chunk_type]

    def get_chunks_by_page(self, page: int) -> List[DocumentChunk]:
        return [c for c in self.chunks if c.page == page]

    def get_tables(self) -> List[TableChunk]:
        return [c for c in self.chunks if isinstance(c, TableChunk)]

    def get_charts(self) -> List[ChartChunk]:
        return [c for c in self.chunks if isinstance(c, ChartChunk)]

    def to_json(self, indent: int = 2) -> str:
        """Serialize to JSON."""
        return self.model_dump_json(indent=indent)

    @classmethod
    def from_json(cls, json_str: str) -> ParseResult:
        """Deserialize from JSON."""
        return cls.model_validate_json(json_str)

    def save(self, path: Union[str, Path]):
        """Save to JSON file."""
        Path(path).write_text(self.to_json(), encoding="utf-8")

    @classmethod
    def load(cls, path: Union[str, Path]) -> ParseResult:
        """Load from JSON file."""
        return cls.from_json(Path(path).read_text(encoding="utf-8"))
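
# Illustrative persistence round-trip (a minimal sketch; the path is hypothetical).
# One caveat of this schema: `chunks` is annotated List[DocumentChunk], so with
# default Pydantic v2 serialization TableChunk/ChartChunk instances are dumped and
# re-loaded as plain DocumentChunk, losing subclass-only fields, unless a
# discriminated union (or SerializeAsAny) is introduced.
#
#   >>> result = ParseResult(
#   ...     doc_id="doc-001", source_path="/tmp/invoice.pdf", filename="invoice.pdf",
#   ...     file_type="pdf", num_pages=1,
#   ... )
#   >>> result.save("/tmp/invoice.parse.json")
#   >>> ParseResult.load("/tmp/invoice.parse.json").doc_id
#   'doc-001'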
|

class FieldExtraction(BaseModel):
    """
    Single extracted field with evidence.
    """
    field_name: str = Field(..., description="Schema field name")
    value: Any = Field(..., description="Extracted value")
    value_type: str = Field(..., description="string, number, boolean, array, object")

    evidence: List[EvidenceRef] = Field(default_factory=list)
    confidence: float = Field(default=1.0, ge=0.0, le=1.0)

    is_valid: bool = Field(default=True)
    validation_errors: List[str] = Field(default_factory=list)

    abstained: bool = Field(default=False)
    abstain_reason: Optional[str] = Field(default=None)
|

class ExtractionResult(BaseModel):
    """
    Complete extraction result with data, evidence, and validation.
    """

    data: Dict[str, Any] = Field(default_factory=dict)
    fields: List[FieldExtraction] = Field(default_factory=list)

    evidence: List[EvidenceRef] = Field(default_factory=list)

    overall_confidence: float = Field(default=1.0, ge=0.0, le=1.0)

    validation_passed: bool = Field(default=True)
    validation_errors: List[str] = Field(default_factory=list)
    validation_warnings: List[str] = Field(default_factory=list)

    abstained_fields: List[str] = Field(default_factory=list)

    processing_time_ms: float = Field(default=0.0)
    model_used: Optional[str] = Field(default=None)

    @property
    def is_grounded(self) -> bool:
        """Check if all non-abstained fields have evidence."""
        return all(f.evidence for f in self.fields if not f.abstained)

    @property
    def needs_review(self) -> bool:
        """Check if result needs human review."""
        return (
            self.overall_confidence < 0.7 or
            len(self.abstained_fields) > 0 or
            not self.validation_passed
        )
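
# Illustrative grounding check (a minimal sketch; field values are hypothetical):
#
#   >>> field = FieldExtraction(
#   ...     field_name="total_amount", value=1234.56, value_type="number",
#   ...     evidence=[], confidence=0.65,
#   ... )
#   >>> result = ExtractionResult(fields=[field], overall_confidence=0.65)
#   >>> result.is_grounded   # total_amount has no evidence and did not abstain
#   False
#   >>> result.needs_review  # confidence below the 0.7 review threshold
#   True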
|

class DocumentType(str, Enum):
    """Document type classifications."""
    INVOICE = "invoice"
    CONTRACT = "contract"
    AGREEMENT = "agreement"
    PATENT = "patent"
    RESEARCH_PAPER = "research_paper"
    REPORT = "report"
    LETTER = "letter"
    FORM = "form"
    RECEIPT = "receipt"
    BANK_STATEMENT = "bank_statement"
    TAX_DOCUMENT = "tax_document"
    ID_DOCUMENT = "id_document"
    MEDICAL_RECORD = "medical_record"
    LEGAL_DOCUMENT = "legal_document"
    TECHNICAL_SPEC = "technical_spec"
    PRESENTATION = "presentation"
    SPREADSHEET = "spreadsheet"
    EMAIL = "email"
    OTHER = "other"
    UNKNOWN = "unknown"
|

class ClassificationResult(BaseModel):
    """Document classification result."""
    doc_id: str
    doc_type: DocumentType
    confidence: float = Field(ge=0.0, le=1.0)

    alternatives: List[Tuple[DocumentType, float]] = Field(default_factory=list)

    evidence: List[EvidenceRef] = Field(default_factory=list)
    reasoning: Optional[str] = Field(default=None)

    is_confident: bool = Field(default=True)
|