"""
Core processing utilities for DocGenie document generation pipeline.
Integrated functionality (all 19 stages):
- Stages 1-2: Seed selection, LLM prompting, response processing, PDF rendering, bbox extraction
- Stage 3: Handwriting & visual element synthesis (WordStylist diffusion, stamps, barcodes, logos)
- Stage 4: Image finalization & OCR (pdf2image, Microsoft Document Intelligence)
- Stage 5: Dataset packaging (bbox normalization, GT verification, analysis, debug viz)
References the docgenie.generation package for core pipeline logic.
"""
import asyncio
import base64
import json
import pathlib
import tempfile
import time
import uuid
import re
import io
import random
import fitz # PyMuPDF
import Levenshtein
from PIL import Image, ImageEnhance, ImageDraw, ImageFont
from typing import List, Tuple, Optional, Dict, Any, Callable
# Anthropic Pricing (USD per 1M tokens)
# Research-grade pricing for exact cost tracking
ANTHROPIC_PRICING = {
"claude-sonnet-4-20250514": {
"input": 3.00,
"output": 15.00,
"cache_write": 3.75,
"cache_read": 0.30,
},
"claude-sonnet-4-5-20250929": {
"input": 3.00,
"output": 15.00,
"cache_write": 3.75,
"cache_read": 0.30,
},
"claude-haiku-4-5-20251001": {
"input": 1.00,
"output": 5.00,
"cache_write": 1.25,
"cache_read": 0.10,
},
}
def calculate_message_cost(
model: str,
input_tokens: int,
output_tokens: int,
cache_creation_input_tokens: int = 0,
cache_read_input_tokens: int = 0,
) -> float:
"""
Calculate the cost of a single message based on token usage.
Research-grade implementation matching pipeline_01/cost.py.
"""
# Use Sonnet 4.5 pricing as default if model unknown
pricing = ANTHROPIC_PRICING.get(model, ANTHROPIC_PRICING["claude-sonnet-4-5-20250929"])
regular_input_tokens = (
input_tokens - cache_creation_input_tokens - cache_read_input_tokens
)
cost_usd = (
(regular_input_tokens / 1_000_000) * pricing["input"]
+ (output_tokens / 1_000_000) * pricing["output"]
+ (cache_creation_input_tokens / 1_000_000) * pricing["cache_write"]
+ (cache_read_input_tokens / 1_000_000) * pricing["cache_read"]
)
return cost_usd
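# Worked example (exact arithmetic, using the Sonnet 4.5 rates above):
#   calculate_message_cost("claude-sonnet-4-5-20250929", input_tokens=10_000, output_tokens=2_000)
#   = (10_000 / 1e6) * 3.00 + (2_000 / 1e6) * 15.00 = 0.03 + 0.03 = 0.06 USD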
def retry_on_network_error(func: Callable, max_retries: int = 3, delay: float = 2.0) -> Any:
"""
Retry a function on network errors with exponential backoff.
Args:
func: Function to execute (must be callable with no args)
max_retries: Maximum number of retry attempts
delay: Initial delay in seconds (doubles each retry)
Returns:
Result of the function call
Raises:
Last exception if all retries fail
"""
last_exception = None
for attempt in range(max_retries):
try:
return func()
except Exception as e:
last_exception = e
error_str = str(e).lower()
# Retry on network/DNS errors
if any(err in error_str for err in ['name resolution', 'connection', 'timeout', 'network']):
if attempt < max_retries - 1:
wait_time = delay * (2 ** attempt)
print(f"[Retry {attempt + 1}/{max_retries}] Network error, retrying in {wait_time}s: {e}")
time.sleep(wait_time)
continue
# Non-network error or last attempt
raise
# All retries exhausted
raise last_exception
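# Usage sketch (hypothetical callable; the defaults give a 2s, 4s backoff before the final attempt):
#   response = retry_on_network_error(lambda: requests.get(url, timeout=30))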
def ensure_max_dimensions(img: Image.Image, max_dim: int = 8000) -> Image.Image:
"""
Ensure image dimensions do not exceed max_dim (Claude API limit is 8000px).
Resizes image proportionally if necessary.
"""
w, h = img.size
if w > max_dim or h > max_dim:
if w > h:
new_w = max_dim
new_h = int(h * (max_dim / w))
else:
new_h = max_dim
new_w = int(w * (max_dim / h))
print(f" ⚠️ Image dimensions {w}x{h} exceed {max_dim}px limit. Resizing to {new_w}x{new_h}px.")
return img.resize((new_w, new_h), Image.Resampling.LANCZOS)
return img
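# Example: a 12000x9000 px scan is scaled to 8000x6000 px (aspect ratio preserved);
# anything already within 8000 px per side is returned unchanged.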
from io import BytesIO
import requests
import httpx
from pdf2image import convert_from_path
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright
from docgenie.generation.constants import BS_PARSER, HANDWRITING_CLASS_NAME, VISUAL_ELEMENT_TYPE_SYNONYMS
from docgenie.generation.pipeline_01.claude_batching import ClaudeBatchedClient, create_message
from docgenie.generation.pipeline_03_process_response import (
extract_html_documents_from_text,
extract_gt,
)
from docgenie.generation.pipeline_03.css import (
increase_handwriting_font_size,
unmark_visual_elements,
)
from docgenie.generation.pipeline_04_render_pdf_and_extract_geos import (
render_pdf_async,
preprocess_html_for_pdf,
)
from docgenie.generation.pipeline_04.extract_bbox import extract_bboxes_from_pdf
# Stage 3 imports - we implement simplified versions directly in this file
# The full pipeline functions are available but require SynDatasetDefinition
# For API use, we extract elements directly from HTML/CSS
from docgenie.generation.utils.pdfjs import MEASURE_DIMENSIONS
from docgenie.generation.utils.stamp import create_stamp
from docgenie import ENV
# Import config for handwriting service URL
from .config import settings
async def download_image_to_base64(url: str) -> str:
"""
Download image or PDF from URL and convert to base64 JPEG.
If URL points to a PDF, converts the first page to an image.
Args:
url: Image or PDF URL
Returns:
Base64-encoded JPEG image string
"""
    max_retries = 3
    last_err = None
    for attempt in range(max_retries):
        try:
            # Use httpx here so this coroutine does not block the event loop
            async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
                response = await client.get(url)
            response.raise_for_status()
            break
        except Exception as e:
            last_err = e
            if attempt < max_retries - 1:
                wait = 2 * (attempt + 1)
                print(f" ⚠ Download failed, retrying in {wait}s: {e}")
                await asyncio.sleep(wait)
            else:
                raise last_err
content_type = response.headers.get('Content-Type', '').lower()
is_pdf = 'application/pdf' in content_type or url.lower().endswith('.pdf')
if is_pdf:
# Handle PDF: convert first page to image
print(f" 📄 Detected PDF, converting first page to image: {url[:80]}...")
# Load PDF from bytes
pdf_document = fitz.open(stream=response.content, filetype="pdf")
if len(pdf_document) == 0:
raise ValueError("PDF has no pages")
# Render first page to image at high DPI
page = pdf_document[0]
# Use 300 DPI for high quality (matrix zoom factor = DPI/72)
zoom = 300 / 72
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat)
# Convert pixmap to PIL Image
img_data = pix.tobytes("png")
img = Image.open(BytesIO(img_data))
pdf_document.close()
print(f" ✓ Converted PDF to image: {img.size[0]}x{img.size[1]}px")
else:
# Handle regular image
img = Image.open(BytesIO(response.content))
# Convert to RGB if necessary
if img.mode != 'RGB':
img = img.convert('RGB')
# Ensure dimensions are within Claude API limits (8000px)
img = ensure_max_dimensions(img)
# Save as JPEG in memory
buffer = BytesIO()
img.save(buffer, format='JPEG', quality=95)
buffer.seek(0)
# Encode to base64
img_base64 = base64.b64encode(buffer.read()).decode('utf-8')
return img_base64
def download_seed_images(urls: List[str]) -> List[str]:
"""
Download multiple seed images/PDFs and convert to base64 (synchronous version for worker).
If a URL points to a PDF, converts the first page to an image.
Implements retry logic for transient HTTP errors (503, 502, 504, 429).
Args:
urls: List of image or PDF URLs
Returns:
List of base64-encoded JPEG image strings
"""
images = []
for url in urls:
# Retry logic for transient HTTP errors
max_retries = 3
response = None
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
break # Success, exit retry loop
except requests.exceptions.HTTPError as e:
# Retry on transient server errors
if e.response.status_code in [502, 503, 504, 429]:
if attempt < max_retries - 1:
wait_time = 2 * (2 ** attempt) # Exponential backoff: 2s, 4s, 8s
print(f" ⚠️ HTTP {e.response.status_code} error downloading seed image, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})...")
time.sleep(wait_time)
continue
# Non-retryable error or last attempt
raise
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
if attempt < max_retries - 1:
wait_time = 2 * (2 ** attempt)
print(f" ⚠️ Network error downloading seed image, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries}): {e}")
time.sleep(wait_time)
continue
raise
if response is None:
raise Exception(f"Failed to download seed image after {max_retries} attempts")
content_type = response.headers.get('Content-Type', '').lower()
is_pdf = 'application/pdf' in content_type or url.lower().endswith('.pdf')
if is_pdf:
# Handle PDF: convert first page to image
print(f" 📄 Detected PDF, converting first page to image: {url[:80]}...")
# Load PDF from bytes
pdf_document = fitz.open(stream=response.content, filetype="pdf")
if len(pdf_document) == 0:
raise ValueError("PDF has no pages")
# Render first page to image at high DPI
page = pdf_document[0]
# Use 300 DPI for high quality (matrix zoom factor = DPI/72)
zoom = 300 / 72
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat)
# Convert pixmap to PIL Image
img_data = pix.tobytes("png")
img = Image.open(BytesIO(img_data))
pdf_document.close()
print(f" ✓ Converted PDF to image: {img.size[0]}x{img.size[1]}px")
else:
# Handle regular image
img = Image.open(BytesIO(response.content))
# Convert to RGB if necessary
if img.mode != 'RGB':
img = img.convert('RGB')
# Ensure dimensions are within Claude API limits (8000px)
img = ensure_max_dimensions(img)
# Save as JPEG in memory
buffer = BytesIO()
img.save(buffer, format='JPEG', quality=95)
buffer.seek(0)
# Encode to base64
img_base64 = base64.b64encode(buffer.read()).decode('utf-8')
images.append(img_base64)
return images
def build_prompt(
language: str,
doc_type: str,
gt_type: str,
gt_format: str,
num_solutions: int,
num_seed_images: int,
prompt_template_path: pathlib.Path,
enable_visual_elements: bool = True,
visual_element_types: List[str] = None
) -> str:
"""
Build the system prompt by injecting parameters into template.
Args:
language: Language for documents
doc_type: Type of documents
gt_type: Ground truth type description
gt_format: Ground truth format specification
num_solutions: Number of documents to generate
num_seed_images: Number of seed images provided
prompt_template_path: Path to prompt template file
enable_visual_elements: Whether to include visual element instructions
visual_element_types: List of allowed visual element types
Returns:
Formatted prompt string
"""
template = prompt_template_path.read_text(encoding='utf-8')
# Handle dynamic Visual Placeholders block
# Define placeholder block pattern
ve_block_pattern = r"## Visual Placeholders \(if document type requires\)\n(.*?)\n\n"
if not enable_visual_elements or not visual_element_types:
# Remove the whole block
template = re.sub(ve_block_pattern, "", template, flags=re.DOTALL)
# Also remove the checklist item
template = template.replace("- [ ] Visual elements are semantically coherent\n", "")
else:
# Update the block with specific types
types_str = ", ".join(visual_element_types)
# Example mapping
EXAMPLES = {
"stamp": '- Example: `<div data-placeholder="stamp" data-content="APPROVED 2024-03-15" style="position:absolute;top:50mm;right:20mm;width:35mm;height:35mm;z-index:10;"></div>`',
"logo": '- Example: `<div data-placeholder="logo" data-content="ACME Corp Logo" style="width:150mm;height:100mm;"></div>`',
"figure": '- Example: `<div data-placeholder="figure" data-content="Sales Chart 2023" style="width:120mm;height:80mm;"></div>`',
"barcode": '- Example: `<div data-placeholder="barcode" data-content="SKU-12345678" style="width:60mm;height:25mm;"></div>`',
"photo": '- Example: `<div data-placeholder="photo" data-content="Customer Portrait" style="width:40mm;height:50mm;"></div>`'
}
# Select examples
selected_examples = []
for t in visual_element_types:
if t in EXAMPLES:
selected_examples.append(EXAMPLES[t])
if len(selected_examples) >= 2:
break
# Fallback if somehow no types matched (shouldn't happen with valid types)
if len(selected_examples) == 0:
selected_examples = [EXAMPLES["logo"], EXAMPLES["stamp"]]
new_block = [
"## Visual Placeholders (if document type requires)",
"- Insert `<div data-placeholder=\"type\" style=\"...\">` for non-text elements at appropriate positions",
f"- Valid types are: {types_str}",
"- Add data-content attribute with actual content description",
"- For stamps, use `position:absolute;z-index:10;` and specify 'top' and 'right'" if "stamp" in visual_element_types else None,
"- Always provide appropiate dimensions",
]
# Add the selected examples (either 1 or 2)
new_block.extend(selected_examples)
# Filter out None and join
new_block_str = "\n".join([line for line in new_block if line is not None]) + "\n\n"
template = re.sub(ve_block_pattern, new_block_str, template, flags=re.DOTALL)
# Inject parameters into template
prompt = template.format(
language=language,
doc_type=doc_type,
gt_type=gt_type,
gt_format=gt_format,
num_solutions=num_solutions,
num_seed_images=num_seed_images
)
return prompt
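# Illustration: with visual_element_types=["stamp", "logo"], the placeholder block is
# rewritten to state "Valid types are: stamp, logo", keeps the stamp positioning hint,
# and appends the first two matching `data-placeholder` examples from EXAMPLES.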
async def call_claude_api_direct(
prompt: str,
seed_images_base64: List[str],
api_key: str,
model: str = "claude-sonnet-4-5-20250929",
max_tokens: int = 16384
) -> Dict[str, Any]:
"""
Call Claude API directly (non-batched) with prompt and seed images.
Used for API endpoint for immediate synchronous responses.
Args:
prompt: System prompt
seed_images_base64: List of base64-encoded seed images
api_key: Anthropic API key
model: Claude model name
max_tokens: Maximum tokens for response
    Returns:
        Dict with "response" (raw LLM text) and "usage" (token usage metadata)
"""
    import anthropic
    client = anthropic.AsyncAnthropic(api_key=api_key)
    # Build message using the same format as batched client
    message_content = create_message(prompt=prompt, images_base64=seed_images_base64)
    # Call the API asynchronously
    message = await client.messages.create(
        model=model,
        max_tokens=max_tokens,
        messages=[message_content],
    )
# Extract text response
response_text = ""
for block in message.content:
if block.type == "text":
response_text += block.text
# Extract usage metadata
usage = {
"input_tokens": message.usage.input_tokens,
"output_tokens": message.usage.output_tokens,
"cache_creation_tokens": getattr(message.usage, "cache_creation_input_tokens", 0),
"cache_read_tokens": getattr(message.usage, "cache_read_input_tokens", 0),
"model": model
}
return {
"response": response_text,
"usage": usage
}
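# Usage sketch (inside an async context):
#   result = await call_claude_api_direct(prompt, seed_images_base64, api_key)
#   text, usage = result["response"], result["usage"]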
def extract_html_documents_from_response(response_text: str) -> List[str]:
"""
Extract individual HTML documents from LLM response.
Uses pipeline_03 function for consistency.
Args:
response_text: Raw LLM response
Returns:
List of HTML document strings
"""
# Use the pipeline function for HTML extraction
return extract_html_documents_from_text(text=response_text)
def extract_ground_truth(html: str) -> Tuple[Optional[dict], str]:
"""
Extract ground truth JSON from HTML and return cleaned HTML.
Uses pipeline_03 function for consistency.
Args:
html: HTML document with embedded GT
Returns:
Tuple of (ground_truth_dict, html_without_gt)
"""
# Use the pipeline function
raw_json, html_clean, soup = extract_gt(html=html)
if raw_json:
try:
gt_dict = json.loads(raw_json)
return gt_dict, html_clean
except json.JSONDecodeError:
return None, html
return None, html
def extract_css_from_html(html: str) -> Tuple[str, str]:
"""
Extract CSS from HTML and return both separately.
Args:
html: HTML document
Returns:
Tuple of (css_string, html_string)
"""
soup = BeautifulSoup(html, BS_PARSER)
css_parts = []
# Extract from <style> tags
for style_tag in soup.find_all("style"):
if style_tag.string:
css_parts.append(style_tag.string)
# Extract inline styles (optional - for completeness)
for tag in soup.find_all(style=True):
css_parts.append(f"{tag.name} {{ {tag['style']} }}")
css = "\n".join(css_parts)
return css, html
# preprocess_html_for_pdf is now imported from pipeline_04_render_pdf_and_extract_geos
async def render_html_to_pdf(
html: str,
output_pdf_path: pathlib.Path,
timeout_seconds: int = 60
) -> Tuple[pathlib.Path, float, float, List[dict]]:
"""
Render HTML to PDF using Playwright with automatic size detection.
Also extracts element geometries for handwriting and visual elements.
Matches pipeline_04 rendering logic.
Args:
html: HTML content to render
output_pdf_path: Path where PDF should be saved
timeout_seconds: Timeout for rendering
Returns:
Tuple of (pdf_path, width_mm, height_mm, geometries)
- geometries: List of dicts with element positions, classes, and metadata
"""
# Preprocess HTML using pipeline function
html = preprocess_html_for_pdf(html)
soup = BeautifulSoup(html, BS_PARSER)
# Apply handwriting and visual element processing
soup = increase_handwriting_font_size(soup, dbg=False)
soup = unmark_visual_elements(soup)
prep_html = soup.prettify()
# Create temporary HTML file
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.html',
delete=False,
encoding='utf-8'
) as tmp_html:
tmp_html.write(prep_html)
tmp_html_path = tmp_html.name
try:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
# Load HTML
await page.goto(
f"file://{tmp_html_path}",
wait_until="domcontentloaded"
)
await page.emulate_media(media="screen")
# Auto-detect dimensions
dimensions = await page.evaluate(MEASURE_DIMENSIONS)
page_width_px = dimensions["width"]
page_height_px = dimensions["height"]
# Set viewport
await page.set_viewport_size({
"width": page_width_px,
"height": page_height_px
})
await page.wait_for_timeout(30)
# Extract geometries BEFORE generating PDF (matches pipeline_04)
# Define selectors for handwriting and visual elements
selector_map = {
"handwriting": ".handwritten",
"visual_element": "[data-placeholder]",
"layout_element": r'[class*="LE-"]'
}
# Use json.dumps to properly escape quotes in selectors
selector_map_js = json.dumps(selector_map)
# JavaScript geometry extraction (from pipeline_04)
geo_eval_script = f"""
() => {{
const data = [];
const selectorMap = {selector_map_js};
const processedElements = new Map();
// First pass: collect all elements and their matching selectors
Object.entries(selectorMap).forEach(([label, selector]) => {{
document.querySelectorAll(selector).forEach(el => {{
if (!processedElements.has(el)) {{
processedElements.set(el, []);
}}
processedElements.get(el).push(label);
}});
}});
// Second pass: create geometry data for each unique element
processedElements.forEach((selectorTypes, el) => {{
const rect = el.getBoundingClientRect();
const computed = window.getComputedStyle(el);
// Get text content
let text = '';
if (el.tagName.toLowerCase() === 'input') {{
text = (el.value || '').trim();
}} else {{
text = (el.innerText || el.textContent || '').trim();
}}
data.push({{
id: el.id || null,
tag: el.tagName.toLowerCase(),
classes: el.className || null,
rect: {{
x: rect.x,
y: rect.y,
width: rect.width,
height: rect.height
}},
visibility: computed.visibility,
dataContent: el.getAttribute('data-content') || null,
dataPlaceholder: el.getAttribute('data-placeholder') || null,
style: el.getAttribute('style') || null,
text: text,
selectorTypes: selectorTypes
}});
}});
return data;
}}
"""
geometries = await page.evaluate(geo_eval_script)
print(f" 🔍 Extracted {len(geometries)} geometries from rendered DOM")
# Debug: Show what was found
hw_geos = [g for g in geometries if "handwriting" in g.get("selectorTypes", [])]
ve_geos = [g for g in geometries if "visual_element" in g.get("selectorTypes", [])]
if hw_geos:
print(f" - Found {len(hw_geos)} handwriting elements in DOM")
if ve_geos:
print(f" - Found {len(ve_geos)} visual element placeholders in DOM")
if not hw_geos and not ve_geos:
print(f" - ⚠️ No handwriting or visual elements found in DOM")
# Generate PDF
page_width_inches = page_width_px / 96
page_height_inches = page_height_px / 96
await page.pdf(
path=str(output_pdf_path),
width=f"{page_width_inches}in",
height=f"{page_height_inches}in",
margin={
"top": "0",
"bottom": "0",
"left": "0",
"right": "0"
},
print_background=True,
display_header_footer=False,
prefer_css_page_size=False,
scale=1.0
)
await browser.close()
# Convert to mm
width_mm = page_width_inches * 25.4
height_mm = page_height_inches * 25.4
return output_pdf_path, width_mm, height_mm, geometries
finally:
# Clean up temp file
pathlib.Path(tmp_html_path).unlink(missing_ok=True)
def extract_bboxes_from_rendered_pdf(
pdf_path: pathlib.Path
) -> List[dict]:
"""
Extract bounding boxes from rendered PDF.
Args:
pdf_path: Path to PDF file
Returns:
List of bounding box dictionaries
"""
# Extract word-level bboxes
word_bboxes = extract_bboxes_from_pdf(
pdf_path=pdf_path,
level="word"
)
# Convert OCRBox objects to dict format
# OCRBox has: x0, y0, x2, y2, text, block_no, line_no, word_no
bbox_list = []
for bbox in word_bboxes:
bbox_list.append({
"text": bbox.text,
"x": bbox.x0,
"y": bbox.y0,
"width": bbox.width, # x2 - x0
"height": bbox.height, # y2 - y0
"block_no": bbox.block_no,
"line_no": bbox.line_no,
"word_no": bbox.word_no,
"page": 0 # Single page documents only
})
return bbox_list
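# Shape of each returned entry (illustrative values, coordinates in PDF points):
#   {"text": "Invoice", "x": 72.0, "y": 54.2, "width": 38.5, "height": 10.1,
#    "block_no": 0, "line_no": 0, "word_no": 0, "page": 0}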
def pdf_to_base64(pdf_path: pathlib.Path) -> str:
"""
Convert PDF file to base64 string.
Args:
pdf_path: Path to PDF file
Returns:
Base64-encoded PDF
"""
with open(pdf_path, 'rb') as f:
pdf_bytes = f.read()
return base64.b64encode(pdf_bytes).decode('utf-8')
def validate_html_structure(html: str) -> Tuple[bool, str]:
"""
Validate HTML structure (pipeline_06 style validation).
Args:
html: HTML content to validate
Returns:
Tuple of (is_valid, error_message)
"""
try:
soup = BeautifulSoup(html, BS_PARSER)
# Check for required tags
if not soup.find('html'):
return False, "Missing <html> tag"
if not soup.find('head'):
return False, "Missing <head> tag"
if not soup.find('body'):
return False, "Missing <body> tag"
# Check for minimum content
body = soup.find('body')
if body and len(body.get_text(strip=True)) < 10:
return False, "Body content too short"
return True, ""
except Exception as e:
return False, f"HTML parsing error: {str(e)}"
def validate_pdf(pdf_path: pathlib.Path) -> Tuple[bool, str]:
"""
Validate PDF file (pipeline_06 style validation).
Args:
pdf_path: Path to PDF file
Returns:
Tuple of (is_valid, error_message)
"""
try:
from PyPDF2 import PdfReader
if not pdf_path.exists():
return False, "PDF file does not exist"
# Check file size
file_size = pdf_path.stat().st_size
if file_size == 0:
return False, "PDF file is empty"
if file_size > 50 * 1024 * 1024: # 50MB limit
return False, f"PDF file too large: {file_size / (1024*1024):.1f}MB"
# Check page count
with open(pdf_path, 'rb') as f:
reader = PdfReader(f)
num_pages = len(reader.pages)
if num_pages == 0:
return False, "PDF has no pages"
if num_pages > 1:
return False, f"PDF has {num_pages} pages (expected 1)"
return True, ""
except Exception as e:
return False, f"PDF validation error: {str(e)}"
def validate_bboxes(bboxes: List[dict], min_bbox_count: int = 0) -> Tuple[bool, str]:
"""
Validate bounding boxes (pipeline_06 style validation).
Args:
bboxes: List of bounding box dictionaries
min_bbox_count: Minimum number of bboxes required
Returns:
Tuple of (is_valid, error_message)
"""
if len(bboxes) < min_bbox_count:
return False, f"Only {len(bboxes)} bboxes found (minimum {min_bbox_count} required)"
for i, bbox in enumerate(bboxes):
# Check required fields
required_fields = ['text', 'x', 'y', 'width', 'height']
for field in required_fields:
if field not in bbox:
return False, f"BBox {i} missing required field: {field}"
# Check dimensions
if bbox['width'] <= 0 or bbox['height'] <= 0:
return False, f"BBox {i} has invalid dimensions: {bbox['width']}x{bbox['height']}"
return True, ""
# ============================================================================
# STAGE 3: Feature Synthesis (Handwriting & Visual Elements)
# ============================================================================
async def call_handwriting_service_batch(
texts_with_metadata: List[dict],
apply_ink_filter: bool = True,
enable_enhancements: bool = False,
num_inference_steps: int = 1000
) -> List[dict]:
"""
Call RunPod handwriting generation service.
Supports both modern BATCH mode and legacy SINGLE-REQUEST mode.
Args:
texts_with_metadata: List of dicts with keys: text, author_id, hw_id
Returns:
List of dicts with keys: hw_id, image_base64, text, author_id, width, height
"""
if not texts_with_metadata:
return []
# Check if we should use legacy mode (one-at-a-time)
if not settings.HANDWRITING_SERVICE_SUPPORTS_BATCH:
print(f" ℹ️ Using LEGACY handwriting mode (calling service for each of {len(texts_with_metadata)} texts)...")
return await _call_handwriting_legacy_concurrent(
texts_with_metadata,
apply_ink_filter=apply_ink_filter,
enable_enhancements=enable_enhancements,
num_inference_steps=num_inference_steps
)
# MODERN BATCH MODE
max_retries = settings.HANDWRITING_SERVICE_MAX_RETRIES
timeout = settings.HANDWRITING_SERVICE_TIMEOUT
num_texts = len(texts_with_metadata)
batch_timeout = max(timeout, 90 + (num_texts // 2))
headers = {"Content-Type": "application/json"}
if settings.RUNPOD_API_KEY:
headers["Authorization"] = f"Bearer {settings.RUNPOD_API_KEY}"
print(f" Processing {num_texts} texts in ONE batch (1 worker activation)...")
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=batch_timeout) as client:
runpod_request = {
"input": {
"texts": [
{
"text": item["text"],
"author_id": item["author_id"],
"hw_id": item.get("hw_id", f"hw_{i}")
}
for i, item in enumerate(texts_with_metadata)
],
"apply_blur": False,
"blur_radius": 0.0,
"num_inference_steps": num_inference_steps,
"apply_ink_filter": apply_ink_filter,
"enable_enhancements": enable_enhancements
}
}
response = await client.post(
settings.HANDWRITING_SERVICE_URL,
json=runpod_request,
headers=headers
)
response.raise_for_status()
result = response.json()
# Check for async /run status
job_status = result.get("status")
if job_status in ["IN_PROGRESS", "IN_QUEUE"]:
job_id = result.get("id")
result = await _poll_runpod_status(job_id, client, headers)
job_status = result.get("status")
if job_status != "COMPLETED":
raise Exception(f"RunPod job not completed: {job_status}")
output = result.get("output", {})
if "error" in output:
raise Exception(f"RunPod error: {output['error']}")
# Extract images from batch response
images = output.get("images", [])
if not images:
# Fallback: maybe it returned a single image even if we requested batch?
if "image_base64" in output:
images = [output]
else:
raise Exception("No images in batch response")
# Format results
all_results = []
for i, img in enumerate(images):
all_results.append({
"hw_id": img.get("hw_id") or (texts_with_metadata[i].get("hw_id") if i < len(texts_with_metadata) else None),
"text": img.get("text") or (texts_with_metadata[i].get("text") if i < len(texts_with_metadata) else None),
"author_id": img.get("author_id") or (texts_with_metadata[i].get("author_id") if i < len(texts_with_metadata) else None),
"image_base64": img.get("image_base64"),
"width": img.get("width"),
"height": img.get("height"),
"baseline_ratio": img.get("baseline_ratio", 0.5)
})
print(f" → Batch complete: {len(all_results)}/{num_texts} texts generated successfully")
return all_results
except Exception as e:
if attempt < max_retries - 1:
wait_time = 5 * (attempt + 1)
print(f" ⚠️ Error on attempt {attempt + 1}/{max_retries}: {e}, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
continue
else:
print(f" ❌ Batch failed: {e}")
return []
return []
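# Usage sketch (illustrative author_id / hw_id values):
#   results = await call_handwriting_service_batch(
#       [{"text": "Hello", "author_id": 12, "hw_id": "hw_0"}])
#   # -> [{"hw_id": "hw_0", "image_base64": "...", ...}] on success, [] if all retries fail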
async def _call_handwriting_legacy_concurrent(
texts_with_metadata: List[dict],
apply_ink_filter: bool = True,
enable_enhancements: bool = False,
num_inference_steps: int = 1000
) -> List[dict]:
"""Helper to call legacy (single-request) service concurrently for all texts"""
# Use a semaphore to avoid overloading the API/Worker with too many concurrent requests
sem = asyncio.Semaphore(10)
async def call_single(item, index):
async with sem:
return await _call_handwriting_single(
item,
index,
apply_ink_filter=apply_ink_filter,
enable_enhancements=enable_enhancements,
num_inference_steps=num_inference_steps
)
tasks = [call_single(item, i) for i, item in enumerate(texts_with_metadata)]
results = await asyncio.gather(*tasks)
# Filter out None results (failures)
return [r for r in results if r is not None]
async def _call_handwriting_single(
item: dict,
index: int,
apply_ink_filter: bool = True,
enable_enhancements: bool = False,
num_inference_steps: int = 1000
) -> Optional[dict]:
"""Call legacy single-request RunPod service for one text"""
max_retries = settings.HANDWRITING_SERVICE_MAX_RETRIES
headers = {"Content-Type": "application/json"}
if settings.RUNPOD_API_KEY:
headers["Authorization"] = f"Bearer {settings.RUNPOD_API_KEY}"
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=settings.HANDWRITING_SERVICE_TIMEOUT) as client:
# OLD SCHEMA: Single "text" and "author_id" keys
payload = {
"input": {
"text": item["text"],
"author_id": item["author_id"],
"apply_blur": False,
"blur_radius": 0.0,
"num_inference_steps": num_inference_steps,
"apply_ink_filter": apply_ink_filter,
"enable_enhancements": enable_enhancements
}
}
response = await client.post(
settings.HANDWRITING_SERVICE_URL,
json=payload,
headers=headers
)
response.raise_for_status()
result = response.json()
# Handle async status
job_status = result.get("status")
if job_status in ["IN_PROGRESS", "IN_QUEUE"]:
job_id = result.get("id")
result = await _poll_runpod_status(job_id, client, headers)
job_status = result.get("status")
if job_status != "COMPLETED":
raise Exception(f"Status: {job_status}")
output = result.get("output", {})
if "error" in output:
raise Exception(f"Worker Error: {output['error']}")
# Format result
return {
"hw_id": item.get("hw_id"),
"text": item.get("text"),
"author_id": item.get("author_id"),
"image_base64": output.get("image_base64"),
"width": output.get("width"),
"height": output.get("height"),
"baseline_ratio": output.get("baseline_ratio", 0.5)
}
except Exception as e:
if attempt < max_retries - 1:
await asyncio.sleep(2 * (attempt + 1))
continue
else:
print(f" ⚠️ Failed to generate text '{item['text'][:10]}...': {e}")
return None
async def _poll_runpod_status(job_id: str, client: httpx.AsyncClient, headers: dict) -> dict:
"""Helper to poll RunPod job status until completion"""
if not job_id:
raise Exception("No job ID provided for polling")
base_url = settings.HANDWRITING_SERVICE_URL.replace("/runsync", "").replace("/run", "")
status_url = f"{base_url}/status/{job_id}"
max_polls = 60
poll_delay = 2
for i in range(max_polls):
await asyncio.sleep(poll_delay)
response = await client.get(status_url, headers=headers)
response.raise_for_status()
result = response.json()
status = result.get("status")
if status == "COMPLETED":
return result
elif status == "FAILED":
raise Exception(f"Job failed: {result.get('error')}")
elif status not in ["IN_PROGRESS", "IN_QUEUE"]:
raise Exception(f"Unexpected status: {status}")
# Slow down polling slightly
        if i > 10:
            poll_delay = min(poll_delay + 1, 10)
raise Exception(f"Job {job_id} timed out after {max_polls} polls")
async def generate_visual_element_images(
visual_elements: list[dict],
seed: Optional[int] = None,
assets_dir: Optional[pathlib.Path] = None,
barcode_number: Optional[str] = None
) -> dict:
"""
Generate visual element images (stamps, logos, barcodes, photos, figures).
Args:
visual_elements: List of visual element definitions with type, content, rect
seed: Random seed for reproducible selection (default: None)
Returns:
Dict {ve_id: base64_png} of generated images
"""
if seed is not None:
random.seed(seed)
visual_element_images = {}
# Cache prefab directories
logo_prefabs = None
photo_prefabs = None
figure_prefabs = None
def get_logo_prefabs():
nonlocal logo_prefabs
if logo_prefabs is None:
logo_dir = ENV.VISUAL_ELEMENT_PREFABS_DIR / "logo"
logo_prefabs = list(logo_dir.glob("*.png")) + list(logo_dir.glob("*.jpg"))
return logo_prefabs
def get_photo_prefabs():
nonlocal photo_prefabs
if photo_prefabs is None:
photo_dir = ENV.VISUAL_ELEMENT_PREFABS_DIR / "photo"
photo_prefabs = list(photo_dir.glob("*.png")) + list(photo_dir.glob("*.jpg"))
return photo_prefabs
def get_figure_prefabs():
nonlocal figure_prefabs
if figure_prefabs is None:
figure_dir = ENV.VISUAL_ELEMENT_PREFABS_DIR / "figure"
figure_prefabs = list(figure_dir.glob("*.png")) + list(figure_dir.glob("*.jpg"))
return figure_prefabs
for ve in visual_elements:
ve_id = ve.get('id', 'unknown')
ve_type = ve.get('type', 'unknown')
content = ve.get('content', '')
rect = ve.get('rect', {})
width = rect.get('width', 100)
height = rect.get('height', 100)
rotation = ve.get('rotation', 0)
try:
img = None
if ve_type == 'stamp':
# Select stamp: from assets_dir if available, else generate
if assets_dir:
stamp_files = list(assets_dir.glob("stamp_*"))
if stamp_files:
selected_stamp = random.choice(stamp_files)
img = Image.open(selected_stamp).convert("RGBA")
img = ensure_max_dimensions(img)
if not img: # Fallback to generation
img = create_stamp(
text=content if content else "STAMP",
width=width,
height=height,
rot_angle=None # Rotation applied during insertion
)
elif ve_type == 'logo':
# Select logo: from assets_dir if available, else from prefabs
if assets_dir:
logo_files = list(assets_dir.glob("logo_*"))
if logo_files:
selected_logo = random.choice(logo_files)
img = Image.open(selected_logo).convert("RGBA")
img = ensure_max_dimensions(img)
if not img: # Fallback to prefabs
logos = get_logo_prefabs()
if logos:
selected_logo = random.choice(logos)
img = Image.open(selected_logo).convert("RGBA")
elif ve_type == 'barcode':
# Generate Code128 barcode
try:
from barcode import Code128
from barcode.writer import ImageWriter
# Validate barcode content
if barcode_number:
# Use provided barcode number if valid
barcode_content = barcode_number.strip()
# Simple length check for standard barcodes (8-15 chars typical for EAN/UPC/Code128)
if not barcode_content.isdigit():
print(f" ⚠ Provided barcode_number '{barcode_number}' is not numeric, using random.")
barcode_content = str(random.randint(100000000000, 999999999999))
elif not (8 <= len(barcode_content) <= 15):
print(f" ⚠ Provided barcode_number '{barcode_number}' has invalid length ({len(barcode_content)}), expected 8-15. Using random.")
barcode_content = str(random.randint(100000000000, 999999999999))
else:
barcode_content = content.strip() if content and content.strip().isdigit() else str(random.randint(100000000000, 999999999999))
# Configure barcode writer
writer = ImageWriter()
writer.set_options({
"module_width": 0.3,
"module_height": 15.0,
"quiet_zone": 6.5,
"font_size": 7,
"text_distance": 5,
"background": "rgba(255, 255, 255, 0)",
"foreground": "black",
})
code128 = Code128(barcode_content, writer=writer)
buffer = io.BytesIO()
code128.write(buffer, options={"format": "PNG"})
buffer.seek(0)
img = Image.open(buffer).convert("RGBA")
except ImportError:
print(f" ⚠ 'python-barcode' not installed, skipping barcode {ve_id}")
except Exception as e:
print(f" ⚠ Barcode generation failed for {ve_id}: {e}")
elif ve_type == 'photo':
# Select photo: from assets_dir if available, else from prefabs
if assets_dir:
photo_files = list(assets_dir.glob("photo_*"))
if photo_files:
selected_photo = random.choice(photo_files)
img = Image.open(selected_photo).convert("RGBA")
img = ensure_max_dimensions(img)
if not img: # Fallback to prefabs
photos = get_photo_prefabs()
if photos:
selected_photo = random.choice(photos)
img = Image.open(selected_photo).convert("RGBA")
elif ve_type in ['figure', 'chart', 'diagram']:
# Select figure: from assets_dir if available, else from prefabs
if assets_dir:
figure_files = list(assets_dir.glob("figure_*"))
if figure_files:
selected_figure = random.choice(figure_files)
img = Image.open(selected_figure).convert("RGBA")
img = ensure_max_dimensions(img)
if not img: # Fallback to prefabs
figures = get_figure_prefabs()
if figures:
selected_figure = random.choice(figures)
img = Image.open(selected_figure).convert("RGBA")
# Convert to base64 if successfully generated
if img:
buffer = io.BytesIO()
img.save(buffer, format="PNG")
buffer.seek(0)
img_b64 = base64.b64encode(buffer.read()).decode('utf-8')
visual_element_images[ve_id] = img_b64
except Exception as e:
print(f" ⚠ Failed to generate visual element {ve_id} (type: {ve_type}): {e}")
continue
return visual_element_images
async def process_stage3_complete(
pdf_path: pathlib.Path,
geometries: list[dict],
ground_truth: dict,
bboxes_raw: list[dict],
page_width_mm: float,
page_height_mm: float,
enable_handwriting: bool = False,
handwriting_ratio: float = 0.5,
handwriting_apply_ink_filter: bool = True,
handwriting_enable_enhancements: bool = False,
handwriting_num_inference_steps: int = 1000,
handwriting_writer_ids: List[int] = None,
enable_visual_elements: bool = False,
visual_element_types: list[str] = None,
seed: Optional[int] = None,
assets_dir: Optional[pathlib.Path] = None,
barcode_number: Optional[str] = None
) -> tuple[str, list[dict], list[dict], dict, dict, pathlib.Path | None, pathlib.Path | None]:
"""
Process complete Stage 3 pipeline (stages 07-11) using browser-extracted geometries.
- Extract handwriting definitions from geometries (from DOM, not HTML parsing)
- Extract visual element definitions from geometries
    - Generate handwriting images (via the RunPod service if enabled)
- Create visual element images
- Render second-pass PDF with handwriting and visual elements
- Convert final PDF to base64 image
Args:
geometries: List of element geometries extracted from browser DOM
Returns:
tuple: (final_image_base64, handwriting_regions, visual_elements, handwriting_images, visual_element_images, pdf_with_handwriting_path, pdf_final_path)
- final_image_base64: Base64 PNG of final document
- handwriting_regions: List of handwriting metadata dicts
- visual_elements: List of visual element metadata dicts
- handwriting_images: Dict {hw_id: base64_png} for individual tokens
- visual_element_images: Dict {ve_id: base64_png} for individual elements
- pdf_with_handwriting_path: Path to PDF after handwriting insertion (or None)
- pdf_final_path: Path to final PDF after all modifications (or None)
"""
# Use provided seed or generate a random one for internal variety
internal_seed = seed if seed is not None else random.randint(0, 1000000)
handwriting_regions = []
visual_elements = []
# Define temp_dir for saving intermediate/final PDFs
# Use assets_dir if provided, otherwise fallback to system temp
temp_dir = assets_dir if assets_dir else pathlib.Path(tempfile.gettempdir())
print(f" 🔍 Processing {len(geometries)} geometries from DOM")
# Step 2: Extract handwriting definitions (pipeline_07) - map geometries to word bboxes
if enable_handwriting:
# Convert bboxes_raw dicts to OCRBox objects for matching
from docgenie.generation.models import OCRBox
from docgenie.generation.constants import BBOX_TO_GEO_MATCHING_THRESHOLD
from docgenie.generation.utils.bboxes import is_in_rect
# Build OCRBox list from bboxes_raw
word_bboxes = []
for bbox_dict in bboxes_raw:
word_bboxes.append(OCRBox(
x0=bbox_dict['x'],
y0=bbox_dict['y'],
x2=bbox_dict['x'] + bbox_dict['width'],
y2=bbox_dict['y'] + bbox_dict['height'],
text=bbox_dict['text'],
block_no=bbox_dict.get('block_no', 0), # Default if not present
line_no=bbox_dict.get('line_no', 0),
word_no=bbox_dict.get('word_no', 0)
))
# Filter geometries for handwriting elements
hw_geometries = [g for g in geometries if "handwriting" in g.get("selectorTypes", [])]
print(f" - Found {len(hw_geometries)} handwriting geometries")
# Determine which geometries to process based on word count budget (Predictable Selection)
total_words_tagged = sum(len(g.get('text', '').split()) for g in hw_geometries)
word_budget = total_words_tagged * handwriting_ratio
# Shuffle reproducibly using the provided seed
shuffled_hw = list(hw_geometries)
if seed is not None:
random.seed(seed)
random.shuffle(shuffled_hw)
# Select regions until budget is met
selected_geos = []
accumulated_words = 0
for geo in shuffled_hw:
word_count = len(geo.get('text', '').split())
if word_count == 0:
continue
# Select if under budget, or always select at least one if ratio > 0 and nothing selected yet
if accumulated_words < word_budget or (not selected_geos and handwriting_ratio > 0):
selected_geos.append(geo)
accumulated_words += word_count
if accumulated_words >= word_budget and handwriting_ratio < 1.0:
break
print(f" - Selection: {accumulated_words}/{total_words_tagged} words ({len(selected_geos)}/{len(hw_geometries)} regions) based on {handwriting_ratio} ratio")
taken_bbox_indices = set()
# Process only the selected regions
for i, geo in enumerate(selected_geos):
classes_str = geo.get('classes', '')
classes = classes_str.split() if classes_str else []
# Extract author ID
other_classes = [c for c in classes if c != 'handwritten']
valid_author_ids = [c for c in other_classes if c.startswith("author")]
author_id = valid_author_ids[0] if valid_author_ids else None
text_content = geo.get('text', '').strip()
if not text_content:
continue
is_signature = 'signature' in classes
# Convert browser coordinates (96 DPI) to mm
px_to_mm = 25.4 / 96.0
rect_browser = geo.get('rect', {})
rect = {
'x': rect_browser.get('x', 0) * px_to_mm,
'y': rect_browser.get('y', 0) * px_to_mm,
'width': rect_browser.get('width', 0) * px_to_mm,
'height': rect_browser.get('height', 0) * px_to_mm,
'page_index': geo.get('pageIndex', 0)
}
# For matching with PyMuPDF bboxes (points), we need a points version of the rect
dpi_scale = 72.0 / 96.0
rect_pt = {
'x': rect_browser.get('x', 0) * dpi_scale,
'y': rect_browser.get('y', 0) * dpi_scale,
'width': rect_browser.get('width', 0) * dpi_scale,
'height': rect_browser.get('height', 0) * dpi_scale
}
# Map geometry to word bboxes (like pipeline_07 find_bbox_indices)
words = text_content.split()
n = len(words)
matched_bboxes = []
for j in range(len(word_bboxes) - n + 1):
slice_texts = [b.text for b in word_bboxes[j : j + n]]
if slice_texts == words:
start, stop = j, j + n
if (start, stop) not in taken_bbox_indices:
# Check if bboxes are within geometry rect
start_in_rect = is_in_rect(
rect=rect_pt,
bbox=word_bboxes[start],
threshold=BBOX_TO_GEO_MATCHING_THRESHOLD
)
stop_in_rect = is_in_rect(
rect=rect_pt,
bbox=word_bboxes[stop - 1],
threshold=BBOX_TO_GEO_MATCHING_THRESHOLD
)
if start_in_rect and stop_in_rect:
matched_bboxes = word_bboxes[start:stop]
taken_bbox_indices.add((start, stop))
break
if not matched_bboxes:
print(f" - ⚠️ No bbox match for hw{i}: '{text_content[:30]}'")
continue
handwriting_regions.append({
'id': f"hw_{i}",
'rect': rect,
'text': text_content,
'author_id': author_id or f"author{random.randint(1, 9)}",
'is_signature': is_signature,
'bboxes': [b.as_string() for b in matched_bboxes],
'page_index': geo.get('pageIndex', 0),
'classes': classes_str
})
print(f" - Selected {len(handwriting_regions)} handwriting regions (ratio: {handwriting_ratio})")
# Step 3: Extract visual element definitions (pipeline_08) - from geometries
if enable_visual_elements:
# Filter geometries for visual element placeholders
ve_geometries = [g for g in geometries if "visual_element" in g.get("selectorTypes", [])]
print(f" - Found {len(ve_geometries)} visual element geometries")
for i, geo in enumerate(ve_geometries):
data_type = geo.get('dataPlaceholder', '')
data_content = geo.get('dataContent', '')
# Normalize type using synonyms (e.g., "chart" -> "figure")
normalized_type = VISUAL_ELEMENT_TYPE_SYNONYMS.get(data_type, data_type)
# Filter by requested types
if visual_element_types and normalized_type not in visual_element_types:
print(f" ⚠️ Filtered out visual element type '{data_type}' (normalized to '{normalized_type}', not in requested types: {visual_element_types})")
continue
# Use rect from geometry
rect_px = geo.get('rect', {})
px_to_mm = 25.4 / 96
rect = {
'x': rect_px.get('x', 0) * px_to_mm,
'y': rect_px.get('y', 0) * px_to_mm,
'width': rect_px.get('width', 0) * px_to_mm,
'height': rect_px.get('height', 0) * px_to_mm
}
# Extract rotation if present in style
rotation = 0
style = geo.get('style', '')
if style and 'rotate' in style:
rotation = extract_rotation_from_style(style)
ve = {
'id': f've{i}',
'type': normalized_type, # Use normalized type (e.g., "figure" not "chart")
'content': data_content,
'rect': rect,
'rotation': rotation
}
# Store page index for multi-page support
ve['page_index'] = geo.get('pageIndex', 0)
visual_elements.append(ve)
print(f" - Selected {len(visual_elements)} visual elements")
# Step 4: Generate handwriting images (pipeline_09)
handwriting_images = {}
# DEBUG: Show why handwriting service may not be called
print(f"\n 🔍 DEBUG - Handwriting Service Check:")
print(f" - enable_handwriting: {enable_handwriting}")
print(f" - handwriting_regions count: {len(handwriting_regions)}")
print(f" - HANDWRITING_SERVICE_ENABLED: {settings.HANDWRITING_SERVICE_ENABLED}")
print(f" - HANDWRITING_SERVICE_URL: {settings.HANDWRITING_SERVICE_URL}")
if enable_handwriting and handwriting_regions and settings.HANDWRITING_SERVICE_ENABLED:
print(f" ✅ Handwriting service check PASSED - preparing batch request...")
# Map author strings to numeric style IDs (matches original pipeline behavior)
# Use provided writer styles or fall back to default
writer_styles = handwriting_writer_ids
if not writer_styles:
from docgenie.generation.constants import WRITER_STYLES as DEFAULT_WRITER_STYLES
writer_styles = DEFAULT_WRITER_STYLES
# Create deterministic mapping: author_id string → numeric style ID
def map_author_to_style_id(author_id_str: str, seed_val: Optional[int] = None) -> int:
"""
Map author ID string (like 'author1') to numeric style ID (0-656).
Matches original pipeline's style selection logic.
"""
if not author_id_str or not author_id_str.startswith('author'):
# Fallback: random from writer_styles
return random.choice(writer_styles)
try:
# Parse number from "authorN"
author_num = int(author_id_str.replace('author', ''))
# Use seed to offset the index for variety across different jobs
# but keep it consistent within the same document
offset = seed_val if seed_val is not None else 0
style_idx = (author_num + offset) % len(writer_styles)
return writer_styles[style_idx]
except ValueError:
# If parsing fails, random selection
return random.choice(writer_styles)
# Prepare batch request for handwriting service
texts_to_generate = []
for i, hw_region in enumerate(handwriting_regions):
author_id_str = hw_region.get('author_id')
text = hw_region.get('text', '')
print(f" - Region {i+1}: author_id='{author_id_str}', text='{text[:30]}...'")
# Only generate if we have a valid author_id
if author_id_str is not None:
# Convert author string to numeric style ID
style_id = map_author_to_style_id(author_id_str, internal_seed)
print(f" → Mapped to style_id={style_id}")
# Group bboxes by block/line (like pipeline_12)
bboxes_str = hw_region.get('bboxes', [])
if not bboxes_str:
print(f" → ⚠️ Skipped (no bboxes)")
continue
# Parse bbox strings and group by (block_no, line_no)
from collections import defaultdict
from docgenie.generation.utils.bboxes import read_syn_dataset_bbox_str
grouped_bboxes = defaultdict(list)
for bbox_str in bboxes_str:
bbox = read_syn_dataset_bbox_str(bbox_str)
grouped_bboxes[(bbox.block_no, bbox.line_no)].append(bbox)
# Generate one image per word (WordStylist doesn't support spaces)
for (block_no, line_no), bbox_group in grouped_bboxes.items():
# Process each word individually
for word_idx, bbox in enumerate(bbox_group):
word_text = bbox.text
# Filter to only letters (WordStylist only supports A-Z, a-z, no spaces)
filtered_text = ''.join(c for c in word_text if c.isalpha())
# Skip if no valid text remains after filtering
if not filtered_text:
continue
texts_to_generate.append({
'text': filtered_text,
'author_id': style_id,
'hw_id': f"{hw_region['id']}_b{block_no}_l{line_no}_w{word_idx}"
})
print(f" → {len(grouped_bboxes)} block/line groups")
else:
print(f" → ⚠️ Skipped (no author_id)")
print(f" - Prepared {len(texts_to_generate)} texts for generation")
if texts_to_generate:
try:
print(f" - Calling RunPod handwriting service at {settings.HANDWRITING_SERVICE_URL}...")
# Call RunPod handwriting service
results = await call_handwriting_service_batch(
texts_to_generate,
apply_ink_filter=handwriting_apply_ink_filter,
enable_enhancements=handwriting_enable_enhancements,
num_inference_steps=handwriting_num_inference_steps
)
print(f" - ✅ Received {len(results)} handwriting images")
# Store generated images
for result in results:
handwriting_images[result['hw_id']] = {
'image_base64': result['image_base64'],
'baseline_ratio': result.get('baseline_ratio', 0.5)
}
except Exception as e:
print(f" - ❌ Handwriting service call failed: {e}")
import traceback
traceback.print_exc()
# If handwriting is explicitly enabled, fail the entire generation
# Don't produce documents without handwriting when user requested it
raise Exception(f"Handwriting generation failed: {e}") from e
else:
print(f" - ⚠️ No texts to generate (all regions missing author_id)")
else:
reasons = []
if not enable_handwriting: reasons.append("disabled by user")
if not handwriting_regions: reasons.append("no handwriting regions found")
if not settings.HANDWRITING_SERVICE_ENABLED: reasons.append("service disabled in config")
print(f" ℹ️ Handwriting generation skipped: {', '.join(reasons)}")
# Step 5: Create visual element images (pipeline_10)
visual_element_images = {}
if enable_visual_elements and visual_elements:
try:
visual_element_images = await generate_visual_element_images(
visual_elements,
seed=seed,
assets_dir=assets_dir,
barcode_number=barcode_number
)
print(f" ✓ Generated {len(visual_element_images)} visual element images")
except Exception as e:
print(f" ⚠ Visual element generation failed: {e}")
# Continue without visual elements
def resize_to_bbox_highres(img, bbox_width, bbox_height, scale_up=3):
"""Resize with preserved aspect ratio, pad to bbox, upscale for sharpness."""
bbox_width = round(bbox_width)
bbox_height = round(bbox_height)
# Aspect Ratio
iw, ih = img.size
scale = min(bbox_width / iw, bbox_height / ih)
new_w = int(iw * scale * scale_up)
new_h = int(ih * scale * scale_up)
img_resized = img.resize((new_w, new_h), Image.Resampling.LANCZOS).convert("RGBA")
final_img = Image.new("RGBA", (new_w, new_h), (255, 255, 255, 0))
final_img.paste(img_resized, (0, 0), mask=img_resized)
return final_img
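    # Worked example for resize_to_bbox_highres: a 200x50 px image into a 100x20 pt bbox
    # gives scale = min(100/200, 20/50) = 0.4; with scale_up=3 the canvas is 240x60 px,
    # later inserted at 80x20 pt (divided back by scale_up for sharp downsampling).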
# Step 6: Insert images into PDF (pipeline_12 & pipeline_13)
doc = fitz.open(pdf_path)
num_pages = len(doc)
print(f" 📄 PDF has {num_pages} pages. Starting multi-page insertion...")
from docgenie.generation.constants import (
FIXED_HANDWRITING_X_OFFSET,
MAX_HANDWRITING_RAND_X_OFFSET_LEFT,
MAX_HANDWRITING_RAND_X_OFFSET_RIGHT,
MAX_HANDWRITING_RAND_Y_OFFSET_UP,
MAX_HANDWRITING_RAND_Y_OFFSET_DOWN,
PIPELINE_04_3_SCALE_UP_FACTOR
)
scale_up = PIPELINE_04_3_SCALE_UP_FACTOR
from docgenie.generation.utils.bboxes import read_syn_dataset_bbox_str
from collections import defaultdict
# Process each page for handwriting (Pass 1)
for page_num in range(num_pages):
page = doc[page_num]
page_hw_regions = [r for r in handwriting_regions if r.get('page_index', 0) == page_num]
if handwriting_images and page_hw_regions:
print(f" - Page {page_num}: Inserting {len(page_hw_regions)} handwriting regions...")
# First, white out original text regions
for hw_region in page_hw_regions:
bboxes_str = hw_region.get('bboxes', [])
for bbox_str in bboxes_str:
bbox = read_syn_dataset_bbox_str(bbox_str)
rect = fitz.Rect(bbox.x0, bbox.y0, bbox.x2, bbox.y2)
page.draw_rect(rect, color=(1, 1, 1), fill=(1, 1, 1))
# Then, insert generated images
for hw_region in page_hw_regions:
hw_id = hw_region['id']
bboxes_str = hw_region.get('bboxes', [])
if not bboxes_str: continue
grouped_bboxes = defaultdict(list)
for bbox_str in bboxes_str:
bbox = read_syn_dataset_bbox_str(bbox_str)
grouped_bboxes[(bbox.block_no, bbox.line_no)].append(bbox)
for (block_no, line_no), bbox_group in grouped_bboxes.items():
for word_idx, bbox in enumerate(bbox_group):
img_id = f"{hw_id}_b{block_no}_l{line_no}_w{word_idx}"
if img_id not in handwriting_images: continue
try:
hw_data = handwriting_images[img_id]
img_b64 = hw_data['image_base64']
baseline_ratio = hw_data['baseline_ratio']
img_data = base64.b64decode(img_b64)
img = Image.open(io.BytesIO(img_data))
bbox_w, bbox_h = bbox.x2 - bbox.x0, bbox.y2 - bbox.y0
img_resized = resize_to_bbox_highres(img, bbox_w, bbox_h, scale_up=scale_up)
offset_x = random.randint(-MAX_HANDWRITING_RAND_X_OFFSET_LEFT, MAX_HANDWRITING_RAND_X_OFFSET_RIGHT) + FIXED_HANDWRITING_X_OFFSET
offset_y = random.randint(-MAX_HANDWRITING_RAND_Y_OFFSET_UP, MAX_HANDWRITING_RAND_Y_OFFSET_DOWN)
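                            # Heuristic: assume the printed baseline sits at ~80% of the
                            # bbox height, then anchor the handwriting image so its own
                            # baseline (baseline_ratio * height) lands on that line.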
original_baseline_y = bbox.y0 + (bbox_h * 0.8)
new_baseline_y = original_baseline_y + offset_y
target_h_points = img_resized.height / scale_up
target_w_points = img_resized.width / scale_up
y0_pos = new_baseline_y - (baseline_ratio * target_h_points)
x0_pos = bbox.x0 + offset_x
x2_pos = x0_pos + target_w_points
y2_pos = y0_pos + target_h_points
# Convert resized image to bytes for insertion
img_bytes_io = io.BytesIO()
img_resized.save(img_bytes_io, format="PNG")
page.insert_image(fitz.Rect(x0_pos, y0_pos, x2_pos, y2_pos), stream=img_bytes_io.getvalue())
except Exception as e:
print(f" ⚠ Insertion failed for {img_id}: {e}")
# Save intermediate handwriting PDF for backward compatibility
pdf_with_handwriting_path = temp_dir / "with_handwriting.pdf"
doc.save(pdf_with_handwriting_path)
print(f" ✓ Saved intermediate handwriting PDF: {pdf_with_handwriting_path.name}")
# Process each page for visual elements (Pass 2)
for page_num in range(num_pages):
page = doc[page_num]
page_visual_elements = [v for v in visual_elements if v.get('page_index', 0) == page_num]
if visual_element_images and page_visual_elements:
print(f" - Page {page_num}: Inserting {len(page_visual_elements)} visual elements...")
for ve in page_visual_elements:
ve_id = ve.get('id', 'unknown')
if ve_id not in visual_element_images: continue
try:
img_b64 = visual_element_images[ve_id]
img_data = base64.b64decode(img_b64)
img = Image.open(io.BytesIO(img_data))
rect = ve.get('rect', {})
rotation = ve.get('rotation', 0)
# Convert mm coordinates (from DOM extraction) to PDF points (72 DPI)
# This ensures correct placement regardless of internal rendering scale
mm_to_points = 72 / 25.4
x0 = rect.get('x', 0) * mm_to_points
y0 = rect.get('y', 0) * mm_to_points
w = rect.get('width', 0) * mm_to_points
h = rect.get('height', 0) * mm_to_points
fitz_rect = fitz.Rect(x0, y0, x0 + w, y0 + h)
# High-res resizing for visual elements
img_highres = resize_to_bbox_highres(img, w, h, scale_up=scale_up)
# Handle arbitrary rotation (Research Parity)
if rotation:
# Rotate in PIL with expand=True to match getBoundingClientRect behavior
# CSS rotate is clockwise, PIL rotate is counter-clockwise, so negate
img_highres = img_highres.rotate(-rotation, expand=True, resample=Image.Resampling.LANCZOS)
# After rotation with expand, we might need to slightly re-adjust alignment
# but if bbox was getBoundingClientRect, it should fit perfectly.
img_bytes_io = io.BytesIO()
img_highres.save(img_bytes_io, format="PNG")
# Insert the (possibly rotated) image into the axis-aligned bounding box
page.insert_image(fitz_rect, stream=img_bytes_io.getvalue())
except Exception as e:
print(f" ⚠ Visual element insertion failed for {ve_id}: {e}")
# Step 7: Finalize PDF and Render Image
pdf_final_path = temp_dir / "final_document.pdf"
doc.save(pdf_final_path)
# Render first page as base64 for API response
page_preview = doc[0]
pix = page_preview.get_pixmap(matrix=fitz.Matrix(3, 3)) # 3x scale for quality
img_bytes = pix.tobytes("png")
final_image_b64 = base64.b64encode(img_bytes).decode('utf-8')
doc.close()
return final_image_b64, handwriting_regions, visual_elements, handwriting_images, visual_element_images, pdf_with_handwriting_path, pdf_final_path
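# Reference for the mm -> PDF-point conversion used above and again in Stage 5:
# 1 pt = 1/72 inch and 1 inch = 25.4 mm, so points = mm * 72 / 25.4.
# Illustrative constant only; the pipeline inlines the expression.
MM_TO_POINTS = 72 / 25.4  # ~2.8346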
def extract_rect_from_style(style: str, page_width_mm: float, page_height_mm: float) -> dict:
"""Extract position and dimensions from inline CSS style."""
rect = {'x': 0, 'y': 0, 'width': 0, 'height': 0}
# Parse CSS properties
for prop in style.split(';'):
if ':' not in prop:
continue
key, value = prop.split(':', 1)
key = key.strip().lower()
value = value.strip()
# Extract numeric value and unit
match = re.match(r'([-\d.]+)(mm|cm|px)?', value)
if not match:
continue
num_val = float(match.group(1))
unit = match.group(2) or 'mm'
# Convert to mm
if unit == 'cm':
num_val *= 10
elif unit == 'px':
num_val *= 0.2645833333 # 96 DPI to mm
# Map CSS properties to rect
if key in ('left', 'x'):
rect['x'] = num_val
elif key in ('top', 'y'):
rect['y'] = num_val
elif key == 'width':
rect['width'] = num_val
elif key == 'height':
rect['height'] = num_val
return rect
def extract_rotation_from_style(style: str) -> float:
"""Extract 2D rotation angle from CSS transform property."""
match = re.search(r'rotate\(\s*([-+]?\d*\.?\d+)\s*deg\s*\)', style)
if match:
return float(match.group(1))
return 0.0
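# Minimal usage sketch for the two CSS-style extractors above. The style
# string is a hypothetical example, not real pipeline output.
def _demo_style_extraction() -> tuple:
    style = "left: 10mm; top: 2cm; width: 50mm; height: 30mm; transform: rotate(15deg)"
    rect = extract_rect_from_style(style, page_width_mm=210.0, page_height_mm=297.0)
    angle = extract_rotation_from_style(style)
    # rect -> {'x': 10.0, 'y': 20.0, 'width': 50.0, 'height': 30.0}; angle -> 15.0
    return rect, angle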
# ==================== Stages 14-15: Image Finalization & OCR ====================
def run_local_tesseract_ocr(image: Image.Image) -> dict:
"""
Run Tesseract OCR locally on image.
Args:
image: PIL Image to OCR
Returns:
dict: OCR results in Microsoft OCR format
"""
try:
import pytesseract
# Get OCR data with bounding boxes
data = pytesseract.image_to_data(
image,
lang=settings.OCR_TESSERACT_LANG,
config=settings.OCR_TESSERACT_CONFIG,
output_type=pytesseract.Output.DICT
)
# Convert to Microsoft OCR format
words = []
for i in range(len(data['text'])):
text = data['text'][i].strip()
if text: # Only include non-empty text
words.append({
'text': text,
'confidence': float(data['conf'][i]) / 100.0 if data['conf'][i] != -1 else 0.0,
'geo': [
int(data['left'][i]),
int(data['top'][i]),
int(data['width'][i]),
int(data['height'][i])
]
})
return {
'angle': 0,
'imageWidth': image.width,
'imageHeight': image.height,
'words': words
}
except ImportError:
raise RuntimeError(
"pytesseract not installed. Install with: uv pip install pytesseract\n"
"Also ensure Tesseract OCR is installed on your system:\n"
" Ubuntu/Debian: sudo apt-get install tesseract-ocr\n"
" macOS: brew install tesseract\n"
" Windows: Download from https://github.com/UB-Mannheim/tesseract/wiki"
)
except Exception as e:
print(f"Error running local Tesseract OCR: {e}")
raise
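# Illustrative shape of the Microsoft-style OCR page dict returned above
# (all values hypothetical):
_EXAMPLE_OCR_PAGE: Dict[str, Any] = {
    'angle': 0,
    'imageWidth': 2480,
    'imageHeight': 3508,
    'words': [
        # 'geo' is [x, y, width, height] in pixels
        {'text': 'Invoice', 'confidence': 0.96, 'geo': [120, 80, 210, 42]},
    ],
}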
async def call_ocr_service(
    image: Image.Image,
    ocr_url: Optional[str] = None,
    engine: str = "microsoft_di",
    timeout: int = 30,
    use_local: Optional[bool] = None
) -> dict:
"""
Call OCR service on image (Stage 15: Perform OCR).
Supports both local Tesseract OCR and remote OCR services.
Args:
image: PIL Image to OCR
ocr_url: OCR service URL (defaults to settings.OCR_SERVICE_URL)
engine: OCR engine to use
timeout: Request timeout in seconds
use_local: Force local/remote mode (None = use settings.OCR_USE_LOCAL)
Returns:
dict: OCR results in Microsoft OCR format
"""
# Determine if using local or remote OCR
if use_local is None:
use_local = settings.OCR_USE_LOCAL
# Local Tesseract OCR
if use_local:
print(" Using local Tesseract OCR...")
return run_local_tesseract_ocr(image)
# Remote OCR service
if ocr_url is None:
ocr_url = settings.OCR_SERVICE_URL
try:
# Convert image to bytes
buffer = BytesIO()
image.save(buffer, format="PNG")
buffer.seek(0)
image_bytes = buffer.getvalue()
# Call OCR service
endpoint = f"{ocr_url}/v1/sync/ocr/{engine}"
async with httpx.AsyncClient(timeout=timeout) as client:
            # Send the PNG as a named multipart part with an explicit content type
            files = {'image': ('image.png', image_bytes, 'image/png')}
headers = {'accept': 'application/json'}
response = await client.post(endpoint, headers=headers, files=files)
response.raise_for_status()
data = response.json()
# Extract first page results
if 'ocr' in data and 'pages' in data['ocr'] and len(data['ocr']['pages']) > 0:
return data['ocr']['pages'][0]
else:
raise ValueError("Invalid OCR response format")
except Exception as e:
print(f"Error calling OCR service: {e}")
raise
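# Hedged usage sketch: drive the async OCR call from synchronous code. The
# image would be the rendered page from Stage 14; use_local=False would hit
# the remote service instead of Tesseract.
def _demo_call_ocr(img: Image.Image) -> dict:
    return asyncio.run(call_ocr_service(img, use_local=True))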
async def render_pdf_to_image(
pdf_path: pathlib.Path,
dpi: int = 300
) -> tuple[Image.Image, str]:
"""
Convert PDF to high-quality image (Stage 14: Render Image).
Uses pdf2image (poppler) for high-quality conversion matching original pipeline.
Args:
pdf_path: Path to PDF file
dpi: DPI for rendering (default: 300, matching pipeline constant)
Returns:
tuple: (PIL Image, base64-encoded PNG string)
"""
try:
# Use pdf2image (same as original pipeline)
# This uses poppler under the hood for high-quality rendering
images = convert_from_path(pdf_path, dpi=dpi)
if not images:
raise ValueError("PDF conversion resulted in no images")
if len(images) > 1:
print(f"Warning: PDF has {len(images)} pages, using first page only")
img = images[0]
# Convert to base64
buffer = BytesIO()
img.save(buffer, format="PNG")
buffer.seek(0)
img_base64 = base64.b64encode(buffer.read()).decode('utf-8')
return img, img_base64
except Exception as e:
print(f"Error converting PDF to image: {e}")
raise
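# Usage sketch (path hypothetical): render a generated PDF at the pipeline's
# default 300 DPI and keep both the PIL image and its base64 form.
def _demo_render(pdf: pathlib.Path) -> Tuple[Image.Image, str]:
    return asyncio.run(render_pdf_to_image(pdf, dpi=300))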
def convert_ocr_to_api_format(ocr_page: dict) -> dict:
"""
Convert Microsoft OCR format to API OCRResult schema.
Implements research-grade spatial grouping to map words to their parent lines,
ensuring hierarchical structure for downstream KIE tasks.
Args:
ocr_page: OCR page result from Microsoft OCR service
Returns:
dict: OCR results in API format with nested words
"""
all_words = []
for word_data in ocr_page.get('words', []):
geo = word_data['geo'] # [x, y, width, height]
all_words.append({
'text': word_data['text'],
'confidence': word_data['confidence'],
'x': geo[0],
'y': geo[1],
'width': geo[2],
'height': geo[3]
})
lines = []
    # Sort lines top-to-bottom for deterministic processing
    raw_lines = sorted(ocr_page.get('lines', []), key=lambda l: l['geo'][1])
for line_data in raw_lines:
line_geo = line_data['geo']
l_x1, l_y1 = line_geo[0], line_geo[1]
l_x2, l_y2 = l_x1 + line_geo[2], l_y1 + line_geo[3]
# Extract words for this line using spatial overlap
line_words = []
for word in all_words:
w_x1, w_y1 = word['x'], word['y']
w_x2, w_y2 = w_x1 + word['width'], w_y1 + word['height']
# Calculate vertical overlap
overlap_y1 = max(l_y1, w_y1)
overlap_y2 = min(l_y2, w_y2)
if overlap_y1 < overlap_y2:
overlap_height = overlap_y2 - overlap_y1
# If more than 50% of word height is within the line height
if overlap_height > 0.5 * word['height']:
# Also check horizontal overlap (word center should be within line bounds)
w_center_x = w_x1 + (word['width'] / 2)
if l_x1 - 10 <= w_center_x <= l_x2 + 10:
line_words.append(word)
# Sort line words by x coordinate (reading order)
line_words.sort(key=lambda w: w['x'])
lines.append({
'text': line_data['text'],
'confidence': line_data['confidence'],
'x': line_geo[0],
'y': line_geo[1],
'width': line_geo[2],
'height': line_geo[3],
'words': line_words
})
return {
'image_width': ocr_page['imageWidth'],
'image_height': ocr_page['imageHeight'],
'angle': ocr_page.get('angle', 0.0),
'words': all_words,
'lines': lines
}
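# Minimal check of the word -> line grouping rule above (>50% vertical overlap
# plus the word centre falling inside the line's x-range), using synthetic
# boxes with illustrative coordinates:
def _demo_line_grouping() -> dict:
    page = {
        'imageWidth': 1000, 'imageHeight': 200, 'angle': 0,
        'words': [
            {'text': 'Hello', 'confidence': 0.99, 'geo': [10, 10, 80, 20]},
            {'text': 'world', 'confidence': 0.98, 'geo': [100, 12, 90, 20]},
        ],
        'lines': [
            {'text': 'Hello world', 'confidence': 0.99, 'geo': [10, 10, 180, 24]},
        ],
    }
    # Both words land in the single line, sorted left-to-right.
    return convert_ocr_to_api_format(page)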
async def process_stage4_ocr(
pdf_path: pathlib.Path,
enable_ocr: bool = False,
dpi: int = 300
) -> tuple[Optional[str], Optional[dict]]:
"""
Process Stage 4: Image Finalization & OCR.
This corresponds to:
- pipeline_14: Render PDF to high-quality image
- pipeline_15: Perform OCR on final image
Args:
pdf_path: Path to final PDF (after Stage 3 if enabled)
enable_ocr: Whether to run OCR
dpi: DPI for image rendering
Returns:
tuple: (image_base64, ocr_results_dict)
"""
image_base64 = None
ocr_results = None
try:
# Stage 14: Render PDF to image
img, image_base64 = await render_pdf_to_image(pdf_path, dpi=dpi)
print(f" ✓ Stage 14: Rendered image {img.size[0]}x{img.size[1]} @ {dpi} DPI")
# Stage 15: Perform OCR (if enabled and service available)
if enable_ocr and settings.OCR_SERVICE_ENABLED:
try:
ocr_page = await call_ocr_service(
img,
timeout=settings.OCR_SERVICE_TIMEOUT
)
ocr_results = convert_ocr_to_api_format(ocr_page)
print(f" ✓ Stage 15: OCR complete - {len(ocr_results['words'])} words, {len(ocr_results['lines'])} lines")
except Exception as e:
print(f" ⚠ Stage 15: OCR failed - {str(e)}")
# Continue without OCR
elif enable_ocr:
print(f" ⚠ Stage 15: OCR requested but service not enabled (OCR_SERVICE_ENABLED=false)")
return image_base64, ocr_results
except Exception as e:
print(f" ⚠ Stage 4 processing failed: {str(e)}")
return None, None
# ==================== Stages 16-18: Dataset Packaging ====================
async def normalize_bboxes_stage16(
document_id: str,
pdf_path: str,
ocr_results: Optional[Dict[str, Any]],
bboxes_raw: Optional[List[Dict]] = None,
pdf_width_pt: Optional[float] = None,
pdf_height_pt: Optional[float] = None,
scale: str = "0-1"
) -> Tuple[Optional[List[Dict]], Optional[List[Dict]], Optional[List[Dict]]]:
"""
Stage 16: Normalize bounding boxes to [0,1] scale.
Reuses logic from pipeline_16_normalize_bboxes.py
Args:
document_id: Unique document identifier
pdf_path: Path to PDF file
ocr_results: OCR results from Stage 15
        bboxes_raw: Optional raw PDF word-level bboxes (in PDF points)
        pdf_width_pt: PDF page width in points (required when bboxes_raw is given)
        pdf_height_pt: PDF page height in points (required when bboxes_raw is given)
        scale: Normalization scale ("0-1" or "0-1000")
Returns:
Tuple of (word_level_bboxes, segment_level_bboxes, raw_normalized_bboxes)
"""
try:
        print("\n Stage 16: Normalizing bounding boxes...")
        if not ocr_results or not ocr_results.get('words'):
            print(" ⚠ Stage 16: No OCR results to normalize")
            return None, None, None
# Get image dimensions from OCR results
img_w_px = ocr_results.get('image_width', 0)
img_h_px = ocr_results.get('image_height', 0)
        if img_w_px == 0 or img_h_px == 0:
            print(" ⚠ Stage 16: Invalid image dimensions")
            return None, None, None
# Normalize word-level bboxes
normalized_words = []
for word in ocr_results.get('words', []):
# Convert pixel coordinates to normalized [0,1]
x0_norm = word['x'] / img_w_px
y0_norm = word['y'] / img_h_px
x2_norm = (word['x'] + word['width']) / img_w_px
y2_norm = (word['y'] + word['height']) / img_h_px
# If scale is 0-1000, multiply by 1000
if scale == "0-1000":
x0_norm *= 1000
y0_norm *= 1000
x2_norm *= 1000
y2_norm *= 1000
normalized_words.append({
'text': word['text'],
'x0': x0_norm,
'y0': y0_norm,
'x2': x2_norm,
'y2': y2_norm,
'block_no': None,
'line_no': None,
'word_no': None
})
# Normalize line-level (segment) bboxes
normalized_segments = []
for line in ocr_results.get('lines', []):
x0_norm = line['x'] / img_w_px
y0_norm = line['y'] / img_h_px
x2_norm = (line['x'] + line['width']) / img_w_px
y2_norm = (line['y'] + line['height']) / img_h_px
if scale == "0-1000":
x0_norm *= 1000
y0_norm *= 1000
x2_norm *= 1000
y2_norm *= 1000
normalized_segments.append({
'text': line['text'],
'x0': x0_norm,
'y0': y0_norm,
'x2': x2_norm,
'y2': y2_norm,
'block_no': None,
'line_no': None,
'word_no': None
})
# Normalize raw PDF bboxes if provided (Research Parity)
normalized_raw = []
if bboxes_raw and pdf_width_pt and pdf_height_pt:
            for bbox in bboxes_raw:
                # Extract [x0, y0, x2, y2] in PDF points. OCRBox objects expose
                # attributes; plain dicts may carry a 'bbox' list or x/y/width/height.
                if hasattr(bbox, 'x0'):  # OCRBox object
                    bx0, by0, bx2, by2 = bbox.x0, bbox.y0, bbox.x2, bbox.y2
                    text = getattr(bbox, 'text', '')
                    block_no = getattr(bbox, 'block_no', None)
                    line_no = getattr(bbox, 'line_no', None)
                    word_no = getattr(bbox, 'word_no', None)
                else:
                    if isinstance(bbox.get('bbox'), list):
                        bx0, by0, bx2, by2 = bbox['bbox']
                    else:
                        bx0, by0 = bbox.get('x', 0), bbox.get('y', 0)
                        bx2, by2 = bx0 + bbox.get('width', 0), by0 + bbox.get('height', 0)
                    text = bbox.get('text', '')
                    block_no = bbox.get('block_no')
                    line_no = bbox.get('line_no')
                    word_no = bbox.get('word_no')
                nx0 = bx0 / pdf_width_pt
                ny0 = by0 / pdf_height_pt
                nx2 = bx2 / pdf_width_pt
                ny2 = by2 / pdf_height_pt
                if scale == "0-1000":
                    nx0 *= 1000
                    ny0 *= 1000
                    nx2 *= 1000
                    ny2 *= 1000
                normalized_raw.append({
                    'text': text,
                    'x0': nx0,
                    'y0': ny0,
                    'x2': nx2,
                    'y2': ny2,
                    'block_no': block_no,
                    'line_no': line_no,
                    'word_no': word_no
                })
print(f" ✓ Stage 16: Normalized {len(normalized_words)} OCR words, {len(normalized_raw)} PDF words")
return normalized_words, normalized_segments, normalized_raw
    except Exception as e:
        print(f" ⚠ Stage 16: BBox normalization failed - {str(e)}")
        return None, None, None
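# Worked example of the Stage 16 scaling (pixel values illustrative): a word
# at x=124, y=310, w=250, h=40 on a 2480x3508 px page maps to x0=0.05,
# y0~0.088, x2~0.151, y2~0.100 on the 0-1 scale (x1000 for "0-1000").
def _demo_bbox_scaling() -> dict:
    img_w, img_h = 2480, 3508
    word = {'x': 124, 'y': 310, 'width': 250, 'height': 40}
    return {
        'x0': word['x'] / img_w,
        'y0': word['y'] / img_h,
        'x2': (word['x'] + word['width']) / img_w,
        'y2': (word['y'] + word['height']) / img_h,
    }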
def normalize_text(s: str) -> str:
"""Normalize whitespace in string."""
if not s: return ""
return re.sub(r"\s+", " ", str(s).strip())
def _find_best_fuzzy_match_span(
original_text: str,
pattern: str,
cutoff: float,
text_positions: List[Tuple[int, int]],
):
"""
Find the best fuzzy match for a pattern within original_text.
Returns (best_candidate_text, best_score, found, [bbox_indices])
"""
clean_text = normalize_text(original_text)
clean_text_lower = clean_text.lower()
clean_pattern = normalize_text(pattern).lower()
pat_len = len(clean_pattern)
if not clean_pattern or not clean_text:
return "", 0.0, False, []
best_candidate = ""
best_score = -1
best_span = (0, 0)
# Use Levenshtein to find best match in a sliding window
for i in range(0, len(clean_text) - pat_len + 1):
candidate = clean_text_lower[i : i + pat_len]
dist = Levenshtein.distance(candidate, clean_pattern)
clen = max(len(clean_pattern), len(candidate))
if clen == 0:
continue
score = 1 - dist / clen
if score > best_score:
best_score = score
best_candidate = clean_text[i : i + pat_len]
best_span = (i, i + pat_len)
found = best_score >= cutoff
# Map char span → bbox indices
bbox_indices = []
if found:
span_start, span_end = best_span
for idx, (start, end) in enumerate(text_positions):
if end < span_start:
continue
if start > span_end:
break
bbox_indices.append(idx)
return best_candidate, best_score, found, bbox_indices
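# Sketch of the sliding-window fuzzy match above (strings and spans are
# illustrative). "1.234.56" differs from "1,234.56" by one character, so the
# score is 1 - 1/8 = 0.875 and the matched span maps to the fourth word bbox.
def _demo_fuzzy_match() -> tuple:
    text = "Total amount due: 1,234.56 EUR"
    # Per-word (start, end) char spans, mirroring verify_ground_truth_stage17
    positions = [(0, 5), (6, 12), (13, 17), (18, 26), (27, 30)]
    # Expected: ("1,234.56", 0.875, True, [3])
    return _find_best_fuzzy_match_span(text, "1.234.56", cutoff=0.8, text_positions=positions)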
async def verify_ground_truth_stage17(
document_id: str,
ground_truth: Optional[Dict],
layout_elements: Optional[List[Dict]],
bboxes: Optional[List[Dict]] = None,
similarity_cutoff: float = 0.8
) -> Optional[Dict]:
"""
Stage 17: Verify and prepare ground truth annotations using research-grade fuzzy matching.
Args:
document_id: Unique document identifier
ground_truth: Ground truth data from Stage 2 (QA pairs)
layout_elements: Layout/visual elements
bboxes: List of normalized word-level bboxes for fuzzy matching
similarity_cutoff: Similarity threshold for fuzzy matching
Returns:
GT verification result dict containing confirmed keys and bbox indices
"""
try:
print(f"\n Stage 17: Verifying ground truth with fuzzy matching...")
if not ground_truth:
print(f" ⚠ Stage 17: No ground truth to verify")
return {
'passed': False,
'skipped': True,
'confirmed_keys': [],
'similarities': {},
'verbatim_gts': {},
'bbox_indices_per_key': {}
}
# If no bboxes provided, fallback to basic validation
if not bboxes:
print(f" ⚠ Stage 17: No bboxes provided for fuzzy matching, using basic validation")
confirmed_keys = list(ground_truth.keys()) if isinstance(ground_truth, dict) else []
return {
'passed': len(confirmed_keys) > 0,
'skipped': False,
'confirmed_keys': confirmed_keys,
'similarities': {k: 1.0 for k in confirmed_keys},
'verbatim_gts': ground_truth if isinstance(ground_truth, dict) else {},
'bbox_indices_per_key': {}
}
# Build document text representation and map each word's char span
document_text = ""
text_positions = []
pos = 0
for b in bboxes:
word_text = b.get('text', '')
start = pos
document_text += word_text + " "
end = len(document_text) - 1
text_positions.append((start, end))
pos = len(document_text)
verbatim_gts = {}
similarities = {}
confirmed_keys = []
bbox_indices_per_key = {}
# Verify each QA pair
if isinstance(ground_truth, dict):
for question, expected_answer in ground_truth.items():
if not question or not expected_answer: continue
# Search for answer in document text
best_text, similarity, found, bbox_indices = _find_best_fuzzy_match_span(
document_text,
expected_answer,
cutoff=similarity_cutoff,
text_positions=text_positions
)
if found:
confirmed_keys.append(question)
print(f" ✓ Found match for '{question}': '{best_text}' (score: {similarity:.2f})")
else:
print(f" ✗ No fuzzy match for '{question}': expected '{expected_answer}', best was '{best_text}' (score: {similarity:.2f})")
verbatim_gts[question] = best_text.strip()
similarities[question] = similarity
bbox_indices_per_key[question] = bbox_indices
passed = len(confirmed_keys) > 0
result = {
'passed': passed,
'skipped': False,
'confirmed_keys': confirmed_keys,
'similarities': similarities,
'verbatim_gts': verbatim_gts,
'bbox_indices_per_key': bbox_indices_per_key,
'num_layout_elements': len(layout_elements) if layout_elements else 0,
'valid_labels': True
}
print(f" ✓ Stage 17: GT verification {'passed' if passed else 'failed'} - {len(confirmed_keys)} confirmed keys")
return result
except Exception as e:
print(f" ⚠ Stage 17: GT verification failed - {str(e)}")
import traceback
traceback.print_exc()
        return {
            'passed': False,
            'skipped': False,
            'confirmed_keys': [],
            'similarities': {},
            'verbatim_gts': {},
            'bbox_indices_per_key': {}
        }
async def analyze_document_stage18(
document_id: str,
has_handwriting: bool,
has_visual_elements: bool,
has_ocr: bool,
gt_verification: Optional[Dict],
page_count: int = 1
) -> Dict:
"""
Stage 18: Generate document analysis and statistics.
Simplified version of pipeline_18_analyze.py
Args:
document_id: Unique document identifier
has_handwriting: Whether document has handwriting
has_visual_elements: Whether document has visual elements
has_ocr: Whether OCR was performed
gt_verification: GT verification results
page_count: Number of pages
Returns:
Analysis statistics dict
"""
try:
print(f"\n Stage 18: Analyzing document...")
# Document validation checks (Research Parity)
errors = []
if page_count != 1:
errors.append("is_multipage")
if not gt_verification or not gt_verification.get('passed'):
errors.append("gt_verification_failed")
if not has_ocr:
errors.append("missing_ocr")
is_valid = len(errors) == 0
# Calculate stats (Research Parity)
if gt_verification and 'bbox_indices_per_key' in gt_verification:
# Total unique bboxes used in GT
all_indices = set()
for indices in gt_verification['bbox_indices_per_key'].values():
if indices:
all_indices.update(indices)
num_gt_bboxes = len(all_indices)
else:
num_gt_bboxes = 0
        # Word/char totals are tracked in the Stage 16/17 context; here we only
        # count confirmed GT keys from the verification result.
annotations_count = len(gt_verification.get('confirmed_keys', [])) if gt_verification else 0
stats = {
'total_documents': 1,
'valid_documents': 1 if is_valid else 0,
'error_counts': {error: 1 for error in errors},
'has_handwriting': 1 if has_handwriting else 0,
'has_visual_elements': 1 if has_visual_elements else 0,
'has_ocr': 1 if has_ocr else 0,
'multipage_count': 1 if page_count != 1 else 0,
'annotations_count': annotations_count,
'num_gt_bboxes': num_gt_bboxes,
'is_valid': is_valid,
'errors': errors
}
print(f" ✓ Stage 18: Analysis complete - {'valid' if is_valid else 'has errors: ' + ', '.join(errors)}")
return stats
except Exception as e:
print(f" ⚠ Stage 18: Analysis failed - {str(e)}")
import traceback
traceback.print_exc()
return {
'total_documents': 1,
'valid_documents': 0,
'error_counts': {'analysis_error': 1},
'is_valid': False
}
async def create_debug_visualization_stage19(
document_id: str,
image_base64: Optional[str],
normalized_bboxes: Optional[List[Dict]],
layout_elements: Optional[List[Dict]] = None,
gt_verification: Optional[Dict] = None,
show_text: bool = True,
ocr_color: Tuple[int, int, int] = (255, 0, 0), # Red
layout_color: Tuple[int, int, int] = (0, 0, 255), # Blue
gt_color: Tuple[int, int, int] = (0, 255, 0) # Green
) -> Optional[Dict]:
"""
Stage 19: Create multi-layer debug visualization with OCR, Layout, and GT overlays.
Args:
document_id: Unique document identifier
image_base64: Base64-encoded image
normalized_bboxes: Normalized bounding boxes
layout_elements: Optional layout elements
gt_verification: Optional GT verification results
show_text: Whether to show text labels
ocr_color: RGB color for OCR bboxes
layout_color: RGB color for Layout elements
gt_color: RGB color for GT bboxes
Returns:
Debug visualization dict with overlay image
"""
try:
print(f"\n Stage 19: Creating multi-layer debug visualization...")
if not image_base64:
print(f" ⚠ Stage 19: Missing base image")
return None
# Decode image
img_data = base64.b64decode(image_base64)
img = Image.open(io.BytesIO(img_data)).convert("RGB")
draw = ImageDraw.Draw(img)
img_w, img_h = img.size
# Load font
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 10)
large_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14)
        except Exception:
font = ImageFont.load_default()
large_font = ImageFont.load_default()
# LAYER 1: OCR Words (Red)
num_ocr = 0
if normalized_bboxes:
for bbox in normalized_bboxes[:500]: # Limit for performance
bx0, by0, bx2, by2 = bbox['x0'], bbox['y0'], bbox['x2'], bbox['y2']
# Handle 0-1000 scale
if bx0 > 1.1 or by0 > 1.1:
bx0, by0, bx2, by2 = bx0/1000, by0/1000, bx2/1000, by2/1000
x0, y0, x2, y2 = bx0*img_w, by0*img_h, bx2*img_w, by2*img_h
draw.rectangle([x0, y0, x2, y2], outline=ocr_color, width=1)
num_ocr += 1
# LAYER 2: Layout Elements (Blue)
num_layout = 0
if layout_elements:
for le in layout_elements:
# Use normalized bbox if available
if 'bbox' in le:
lx0, ly0, lx2, ly2 = le['bbox']
# Handle both 0-1 and 0-1000 scales
if lx0 > 1.1 or ly0 > 1.1:
lx0, ly0, lx2, ly2 = lx0/1000, ly0/1000, lx2/1000, ly2/1000
px0, py0, px2, py2 = lx0*img_w, ly0*img_h, lx2*img_w, ly2*img_h
draw.rectangle([px0, py0, px2, py2], outline=layout_color, width=2)
num_layout += 1
                else:
                    # Legacy/unnormalized rects are deliberately skipped to avoid
                    # drawing in the wrong coordinate space.
                    continue
# LAYER 3: Ground Truth Answers (Green)
num_gt = 0
if gt_verification and 'bbox_indices_per_key' in gt_verification and normalized_bboxes:
indices_dict = gt_verification['bbox_indices_per_key']
for q, indices in indices_dict.items():
if not indices: continue
for idx in indices:
if idx < len(normalized_bboxes):
bbox = normalized_bboxes[idx]
gx0, gy0, gx2, gy2 = bbox['x0'], bbox['y0'], bbox['x2'], bbox['y2']
# Handle 0-1000 scale
if gx0 > 1.1 or gy0 > 1.1:
gx0, gy0, gx2, gy2 = gx0/1000, gy0/1000, gx2/1000, gy2/1000
x0, y0, x2, y2 = gx0*img_w, gy0*img_h, gx2*img_w, gy2*img_h
# Thick green box for GT
draw.rectangle([x0, y0, x2, y2], outline=gt_color, width=3)
# Label the GT
if show_text:
draw.text((x0, y0 - 15), f"GT: {q[:15]}", fill=gt_color, font=font)
num_gt += 1
# Add Legend
draw.text((10, 10), "DEBUG OVERLAY:", fill=(0,0,0), font=large_font)
        draw.text((10, 30), f"■ OCR Words ({num_ocr})", fill=ocr_color, font=font)
        draw.text((10, 45), f"■ Layout Elements ({num_layout})", fill=layout_color, font=font)
        draw.text((10, 60), f"■ GT Matches ({num_gt})", fill=gt_color, font=font)
# Convert back to base64
buffer = io.BytesIO()
img.save(buffer, format="PNG")
overlay_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
        result = {
            'bbox_overlay_base64': overlay_base64,
            'num_ocr_drawn': num_ocr,
            'num_layout_drawn': num_layout,
            'num_gt_drawn': num_gt
        }
        print(f" ✓ Stage 19: Debug visualization created (OCR: {num_ocr}, Layout: {num_layout}, GT: {num_gt})")
return result
except Exception as e:
print(f" ⚠ Stage 19: Debug visualization failed - {str(e)}")
import traceback
traceback.print_exc()
return None
async def process_stage5_complete(
document_id: str,
pdf_path: str,
image_base64: Optional[str],
ocr_results: Optional[Dict],
ground_truth: Optional[Dict],
bboxes_raw: Optional[List[Dict]] = None,
has_handwriting: bool = False,
has_visual_elements: bool = False,
layout_elements: Optional[List[Dict]] = None,
handwriting_regions: Optional[List[Dict]] = None,
page_width_mm: Optional[float] = None,
page_height_mm: Optional[float] = None,
enable_bbox_normalization: bool = True,
enable_gt_verification: bool = True,
enable_analysis: bool = True,
enable_debug_visualization: bool = False
) -> Dict:
"""
Process Stage 5: Dataset Packaging (Stages 16-19).
Args:
document_id: Unique document identifier
pdf_path: Path to PDF file
image_base64: Base64-encoded final image
ocr_results: OCR results from Stage 15
ground_truth: Ground truth from Stage 2
has_handwriting: Whether handwriting was generated
has_visual_elements: Whether visual elements were generated
        layout_elements: Layout/visual element metadata
        handwriting_regions: Handwriting region metadata (normalized with layout)
        bboxes_raw: Raw PDF word-level bboxes in points (research parity)
        page_width_mm / page_height_mm: Page size in mm for mm -> point conversion
        enable_*: Feature flags for each sub-stage
Returns:
Dict with all Stage 5 results
"""
    results = {
        'normalized_bboxes_word': None,
        'normalized_bboxes_segment': None,
        'normalized_bboxes_word_raw': None,
        'normalized_layout_elements': None,
        'gt_verification': None,
        'analysis_stats': None,
        'debug_visualization': None
    }
try:
print(f"\n========== Stage 5: Dataset Packaging ==========")
# Stage 16: Normalize bboxes
if enable_bbox_normalization:
# Calculate PDF size in points for raw normalization
pdf_w_pt = page_width_mm * 72 / 25.4 if page_width_mm else None
pdf_h_pt = page_height_mm * 72 / 25.4 if page_height_mm else None
norm_words, norm_segments, norm_raw = await normalize_bboxes_stage16(
document_id=document_id,
pdf_path=pdf_path,
ocr_results=ocr_results,
bboxes_raw=bboxes_raw,
pdf_width_pt=pdf_w_pt,
pdf_height_pt=pdf_h_pt,
scale=settings.BBOX_NORMALIZATION_SCALE
)
results['normalized_bboxes_word'] = norm_words
results['normalized_bboxes_segment'] = norm_segments
results['normalized_bboxes_word_raw'] = norm_raw
# ALSO Normalize layout elements (Visual Elements + Handwriting) - Research Parity
all_layout = (layout_elements or []) + (handwriting_regions or [])
if all_layout and page_width_mm and page_height_mm:
normalized_layout = []
for elem in all_layout:
rect = elem.get('rect', {})
if not rect: continue
# mm -> normalized
lx0 = rect.get('x', 0) / page_width_mm
ly0 = rect.get('y', 0) / page_height_mm
lx2 = (rect.get('x', 0) + rect.get('width', 0)) / page_width_mm
ly2 = (rect.get('y', 0) + rect.get('height', 0)) / page_height_mm
if settings.BBOX_NORMALIZATION_SCALE == "0-1000":
lx0 *= 1000
ly0 *= 1000
lx2 *= 1000
ly2 *= 1000
norm_elem = elem.copy()
norm_elem['bbox'] = [lx0, ly0, lx2, ly2]
normalized_layout.append(norm_elem)
results['normalized_layout_elements'] = normalized_layout
print(f" ✓ Stage 16: Normalized {len(normalized_layout)} layout elements (VE + HW)")
else:
results['normalized_layout_elements'] = all_layout
# Stage 17: Verify GT
if enable_gt_verification:
gt_verification = await verify_ground_truth_stage17(
document_id=document_id,
ground_truth=ground_truth,
layout_elements=results.get('normalized_layout_elements'),
bboxes=results['normalized_bboxes_word'] or results['normalized_bboxes_word_raw'],
similarity_cutoff=settings.GT_VERIFICATION_SIMILARITY_CUTOFF
)
results['gt_verification'] = gt_verification
# Stage 18: Analysis
if enable_analysis:
analysis_stats = await analyze_document_stage18(
document_id=document_id,
has_handwriting=has_handwriting,
has_visual_elements=has_visual_elements,
has_ocr=ocr_results is not None,
gt_verification=results.get('gt_verification'),
page_count=1
)
results['analysis_stats'] = analysis_stats
# Stage 19: Debug Visualization
if enable_debug_visualization:
debug_visualization = await create_debug_visualization_stage19(
document_id=document_id,
image_base64=image_base64,
normalized_bboxes=results['normalized_bboxes_word'] or results['normalized_bboxes_word_raw'],
layout_elements=results.get('normalized_layout_elements'),
gt_verification=results.get('gt_verification'),
show_text=True
)
results['debug_visualization'] = debug_visualization
print(f" ✓ Stages 16-19: Dataset packaging complete\n")
return results
except Exception as e:
        print(f" ⚠ Stages 16-19 processing failed: {str(e)}")
import traceback
traceback.print_exc()
return results
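# Hedged orchestration sketch for Stage 5: arguments come from earlier stages
# (names here are placeholders, not fixed API fields).
def _demo_stage5(doc_id: str, pdf_path: str, image_b64: str,
                 ocr: Optional[Dict], gt: Optional[Dict]) -> Dict:
    return asyncio.run(process_stage5_complete(
        document_id=doc_id,
        pdf_path=pdf_path,
        image_base64=image_b64,
        ocr_results=ocr,
        ground_truth=gt,
        enable_debug_visualization=True,
    ))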
# ==================== Dataset Export ====================
async def export_to_msgpack(
document_id: str,
image_path: Optional[str],
image_base64: Optional[str],
words: List[str],
word_bboxes: List[List[float]],
segment_bboxes: Optional[List[List[float]]],
ground_truth: Optional[Dict],
output_path: pathlib.Path,
image_width: Optional[int] = None,
image_height: Optional[int] = None
) -> pathlib.Path:
"""
Export document data to msgpack format.
This creates a simple msgpack file containing the document data in a format
compatible with DocGenie's dataset infrastructure.
Args:
document_id: Unique document identifier
image_path: Path to document image (if available)
image_base64: Base64-encoded image (if no image_path)
words: List of word strings
word_bboxes: Word-level bounding boxes (normalized [0,1])
segment_bboxes: Segment-level bounding boxes (normalized [0,1])
ground_truth: Ground truth annotations
output_path: Output msgpack file path
image_width: Image width in pixels
image_height: Image height in pixels
Returns:
Path to created msgpack file
"""
try:
from datadings.writer import FileWriter
        print("\n========== Msgpack Export ==========")
print(f" Exporting document {document_id} to msgpack format...")
# Prepare document data
doc_data = {
"key": document_id,
"sample_id": document_id,
"words": words,
"word_bboxes": word_bboxes, # Should already be normalized [0,1]
}
# Add segment bboxes if available
if segment_bboxes:
doc_data["segment_level_bboxes"] = segment_bboxes
else:
# Fallback: use word bboxes as segment bboxes
doc_data["segment_level_bboxes"] = word_bboxes
# Add image dimensions if available
if image_width and image_height:
doc_data["image_width"] = image_width
doc_data["image_height"] = image_height
# Add image path if available
if image_path:
doc_data["image_file_path"] = str(image_path)
# Process ground truth annotations
if ground_truth:
# Extract classification label if exists
if "label" in ground_truth:
doc_data["label"] = ground_truth["label"]
# Extract entity labels (for NER/token classification)
if "entities" in ground_truth:
entities = ground_truth["entities"]
if entities:
# Create word-level labels (default "O" for outside)
word_labels = ["O"] * len(words)
# Map entities to words
for entity in entities:
entity_text = entity.get("text", "")
entity_label = entity.get("label", "ENTITY")
# Simple matching: find words that match entity text
entity_words = entity_text.split()
for i, word in enumerate(words):
if word in entity_words:
word_labels[i] = f"B-{entity_label}" if i == 0 or word_labels[i-1] == "O" else f"I-{entity_label}"
doc_data["word_labels"] = word_labels
# Extract QA pairs (for extractive QA)
if "questions" in ground_truth:
qa_pairs = []
for qa in ground_truth["questions"]:
qa_pair = {
"question": qa.get("question", ""),
"answers": qa.get("answers", []),
"question_id": qa.get("id", "")
}
qa_pairs.append(qa_pair)
doc_data["qa_pairs"] = qa_pairs
# Extract layout annotations (for document layout analysis)
if "layout_elements" in ground_truth:
layout_elements = ground_truth["layout_elements"]
annotated_objects = []
for elem in layout_elements:
obj = {
"label": elem.get("label", "text"),
"bbox": elem.get("bbox", [0, 0, 1, 1]), # Normalized bbox
"score": elem.get("score", 1.0)
}
annotated_objects.append(obj)
doc_data["annotated_objects"] = annotated_objects
# Ensure output directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)
# Write to msgpack file
with FileWriter(output_path, overwrite=True) as writer:
writer.write(doc_data)
print(f" ✓ Msgpack exported: {output_path}")
print(f" - Words: {len(words)}")
print(f" - Word BBoxes: {len(word_bboxes)}")
print(f" - Segment BBoxes: {len(doc_data['segment_level_bboxes'])}")
if "word_labels" in doc_data:
print(f" - Labels: {len(doc_data['word_labels'])}")
if "qa_pairs" in doc_data:
print(f" - QA Pairs: {len(doc_data['qa_pairs'])}")
return output_path
except ImportError:
print(f" ⚠ Warning: 'datadings' package not available. Msgpack export skipped.")
print(f" Install with: pip install datadings")
return None
except Exception as e:
print(f" ⚠ Msgpack export failed: {str(e)}")
import traceback
traceback.print_exc()
return None
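# Usage sketch for the msgpack export (paths and bboxes illustrative; bboxes
# are expected on the normalized [0,1] scale):
def _demo_msgpack_export(out_dir: pathlib.Path) -> Optional[pathlib.Path]:
    return asyncio.run(export_to_msgpack(
        document_id="doc-0001",
        image_path=None,
        image_base64=None,
        words=["Invoice", "Total"],
        word_bboxes=[[0.05, 0.02, 0.20, 0.04], [0.05, 0.90, 0.15, 0.93]],
        segment_bboxes=None,
        ground_truth=None,
        output_path=out_dir / "doc-0001.msgpack",
    ))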
def save_individual_tokens_to_disk(
handwriting_images: dict,
visual_element_images: dict,
output_dir: pathlib.Path,
doc_id: str
) -> dict:
"""
Save individual handwriting tokens and visual elements to disk.
Used for 'dataset' and 'complete' output detail levels.
Args:
        handwriting_images: Dict {hw_id: base64_png} (values may also be dicts carrying 'image_base64')
visual_element_images: Dict {ve_id: base64_png}
output_dir: Base output directory
doc_id: Document ID for folder naming
Returns:
dict with paths to saved files
"""
saved_files = {
'handwriting_tokens': [],
'visual_elements': []
}
# Save handwriting tokens
if handwriting_images:
hw_dir = output_dir / doc_id / "handwriting_tokens"
hw_dir.mkdir(parents=True, exist_ok=True)
        for hw_id, hw_value in handwriting_images.items():
            # Values may be raw base64 strings or dicts carrying 'image_base64'
            img_b64 = hw_value['image_base64'] if isinstance(hw_value, dict) else hw_value
            img_bytes = base64.b64decode(img_b64)
            img_path = hw_dir / f"{hw_id}.png"
            img_path.write_bytes(img_bytes)
saved_files['handwriting_tokens'].append(str(img_path.relative_to(output_dir)))
# Save visual elements
if visual_element_images:
ve_dir = output_dir / doc_id / "visual_elements"
ve_dir.mkdir(parents=True, exist_ok=True)
for ve_id, img_b64 in visual_element_images.items():
img_bytes = base64.b64decode(img_b64)
img_path = ve_dir / f"{ve_id}.png"
img_path.write_bytes(img_bytes)
saved_files['visual_elements'].append(str(img_path.relative_to(output_dir)))
return saved_files
def create_token_mapping_json(
handwriting_regions: list[dict],
handwriting_images: dict,
visual_elements: list[dict],
visual_element_images: dict
) -> dict:
"""
Create mapping JSON for ML dataset creation.
Includes style IDs, positions, and image references.
Args:
handwriting_regions: List of handwriting metadata
handwriting_images: Dict of handwriting images
visual_elements: List of visual element metadata
visual_element_images: Dict of visual element images
Returns:
dict with complete token mapping
"""
mapping = {
'handwriting': {
'tokens': [],
'total_count': len(handwriting_regions)
},
'visual_elements': {
'items': [],
'total_count': len(visual_elements)
}
}
# Add handwriting token info
for hw_region in handwriting_regions:
hw_id = hw_region.get('id', 'unknown')
token_info = {
'id': hw_id,
'text': hw_region.get('text', ''),
'author_id': hw_region.get('author_id'),
'is_signature': hw_region.get('is_signature', False),
'rect': hw_region.get('rect', {}),
'has_image': hw_id in handwriting_images,
'image_filename': f"{hw_id}.png" if hw_id in handwriting_images else None
}
mapping['handwriting']['tokens'].append(token_info)
# Add visual element info
for ve in visual_elements:
ve_id = ve.get('id', 'unknown')
ve_info = {
'id': ve_id,
'type': ve.get('type', 'unknown'),
'content': ve.get('content'),
'rect': ve.get('rect', {}),
'has_image': ve_id in visual_element_images,
'image_filename': f"{ve_id}.png" if ve_id in visual_element_images else None
}
mapping['visual_elements']['items'].append(ve_info)
return mapping
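# Example of the mapping shape produced above (entries illustrative):
def _demo_token_mapping() -> dict:
    hw_regions = [{'id': 'hw_0', 'text': 'John Doe', 'is_signature': True, 'rect': {}}]
    hw_images = {'hw_0': '<base64...>'}
    return create_token_mapping_json(hw_regions, hw_images, [], {})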
def extract_all_bboxes_from_pdf(pdf_path: pathlib.Path) -> Dict[str, List[dict]]:
"""
Extract both word-level and character-level bounding boxes from PDF.
This is a high-priority feature for ML datasets as it provides:
- Word-level bboxes: Ground truth text positions from PDF
- Character-level bboxes: Fine-grained localization for character recognition
Args:
pdf_path: Path to PDF file
Returns:
Dictionary with 'word' and 'char' keys containing bbox lists
"""
from docgenie.generation.pipeline_04.extract_bbox import extract_bboxes_from_pdf
# Extract word-level bboxes
word_bboxes_raw = extract_bboxes_from_pdf(
pdf_path=pdf_path,
level="word"
)
# Extract character-level bboxes
char_bboxes_raw = extract_bboxes_from_pdf(
pdf_path=pdf_path,
level="char"
)
# Convert OCRBox objects to dict format
word_bboxes = []
for bbox in word_bboxes_raw:
word_bboxes.append({
"text": bbox.text,
"x": bbox.x0,
"y": bbox.y0,
"width": bbox.width,
"height": bbox.height,
"bbox": [bbox.x0, bbox.y0, bbox.x2, bbox.y2],
"block_no": bbox.block_no,
"line_no": bbox.line_no,
"word_no": bbox.word_no,
"page": 0
})
char_bboxes = []
for bbox in char_bboxes_raw:
char_bboxes.append({
"text": bbox.text,
"x": bbox.x0,
"y": bbox.y0,
"width": bbox.width,
"height": bbox.height,
"bbox": [bbox.x0, bbox.y0, bbox.x2, bbox.y2],
"block_no": bbox.block_no,
"line_no": bbox.line_no,
"word_no": bbox.word_no,
"page": 0
})
return {
"word": word_bboxes,
"char": char_bboxes
}
def extract_raw_annotations_from_geometries(geometries: List[dict]) -> List[dict]:
"""
Extract raw layout annotations (bounding boxes) from geometries.
This is a high-priority feature for ML datasets as it provides:
- Layout bounding boxes before any normalization
- Shows original coordinate space from HTML rendering
- Useful for debugging annotation processing pipeline
Args:
geometries: List of geometry dictionaries from HTML rendering
Returns:
List of layout annotation dictionaries with bbox coordinates
"""
annotations = []
for geom in geometries:
# Only extract layout elements (class starts with "LE-")
class_name = geom.get('class', '')
if not class_name.startswith('LE-'):
continue
# Extract bbox from rect
rect = geom.get('rect', {})
if not rect:
continue
annotation = {
'class': class_name,
'type': 'layout_element',
'bbox': {
'x': rect.get('x', 0),
'y': rect.get('y', 0),
'width': rect.get('width', 0),
'height': rect.get('height', 0)
},
'text': geom.get('text', ''),
'attributes': geom.get('attributes', {})
}
# Compute x2, y2 for convenience
annotation['bbox']['x2'] = annotation['bbox']['x'] + annotation['bbox']['width']
annotation['bbox']['y2'] = annotation['bbox']['y'] + annotation['bbox']['height']
annotations.append(annotation)
return annotations
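# Illustrative input for the raw-annotation extractor above (values are
# hypothetical DOM geometries; only the "LE-" entry is kept):
def _demo_raw_annotations() -> List[dict]:
    geometries = [
        {'class': 'LE-table', 'rect': {'x': 12.0, 'y': 40.0, 'width': 180.0, 'height': 60.0},
         'text': 'Line items', 'attributes': {}},
        {'class': 'body-text', 'rect': {'x': 0, 'y': 0, 'width': 10, 'height': 10}},
    ]
    return extract_raw_annotations_from_geometries(geometries)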