# pdfinspector/structure_tree.py
# Last change (commit 27fda3f, rianders): advanced features
"""
Structure Tree Analysis Module
Provides functionality for extracting and analyzing PDF structure trees,
including paragraph detection and block-to-tag mapping.
"""
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Tuple
import pikepdf
import statistics
from collections import Counter
@dataclass
class StructureNode:
    """A single element of a PDF logical-structure tree."""
    tag_type: str  # structure type name, e.g. P, H1, Document
    depth: int
    mcid: Optional[int] = None
    alt_text: Optional[str] = None
    actual_text: Optional[str] = None
    page_ref: Optional[int] = None
    children: List['StructureNode'] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this node and its whole subtree to plain dicts for JSON."""
        serialized: Dict[str, Any] = {
            'tag_type': self.tag_type,
            'depth': self.depth,
            'mcid': self.mcid,
            'alt_text': self.alt_text,
            'actual_text': self.actual_text,
            'page_ref': self.page_ref,
        }
        # Recurse so the returned structure is JSON-ready all the way down.
        serialized['children'] = [kid.to_dict() for kid in self.children]
        return serialized
def extract_structure_tree(pdf_path: str) -> Optional[StructureNode]:
    """
    Extract the complete structure tree from a PDF.

    Args:
        pdf_path: Path to the PDF file

    Returns:
        Root StructureNode or None if no structure tree exists
    """
    try:
        with pikepdf.open(pdf_path) as doc:
            catalog = doc.Root
            # No /StructTreeRoot means the PDF is untagged.
            if '/StructTreeRoot' not in catalog:
                return None

            tree_root = StructureNode(tag_type="StructTreeRoot", depth=0)

            # Walk the tree starting from the root's kids, if any.
            struct_root = catalog.StructTreeRoot
            if '/K' in struct_root:
                _parse_structure_element(struct_root.K, tree_root, 1, doc)
            return tree_root
    except Exception as exc:
        print(f"Error extracting structure tree: {exc}")
        return None
def _parse_structure_element(element, parent_node: StructureNode, depth: int, pdf: pikepdf.Pdf, max_depth: int = 20):
    """
    Recursively parse a structure element and its children.

    Args:
        element: pikepdf object representing the element (an array of kids,
            a bare integer MCID, or a structure-element dictionary)
        parent_node: Parent StructureNode to attach children to
        depth: Current depth in the tree
        pdf: pikepdf.Pdf object for resolving references
        max_depth: Maximum recursion depth to prevent infinite loops
    """
    if depth > max_depth:
        return

    # A /K value may be an array of kids; recurse at the same depth.
    if isinstance(element, pikepdf.Array):
        for item in element:
            _parse_structure_element(item, parent_node, depth, pdf, max_depth)
        return

    # A bare integer is a marked-content ID: a leaf node.
    if isinstance(element, int):
        parent_node.children.append(
            StructureNode(tag_type="MCID", depth=depth, mcid=element)
        )
        return

    # A dictionary is a structure element with optional attributes and kids.
    if isinstance(element, pikepdf.Dictionary):
        tag_type = str(element.get('/S', 'Unknown'))
        if tag_type.startswith('/'):
            tag_type = tag_type[1:]  # drop the PDF name's leading slash

        mcid = None
        if '/MCID' in element:
            mcid = int(element.MCID)

        # Attribute extraction is best-effort, but use `except Exception:`
        # rather than a bare `except:` so KeyboardInterrupt/SystemExit are
        # never swallowed.
        alt_text = None
        if '/Alt' in element:
            try:
                alt_text = str(element.Alt)
            except Exception:
                pass

        actual_text = None
        if '/ActualText' in element:
            try:
                actual_text = str(element.ActualText)
            except Exception:
                pass

        page_ref = None
        if '/Pg' in element:
            try:
                # Resolve the /Pg page reference to a 0-based page index.
                page_obj = element.Pg
                for i, page in enumerate(pdf.pages):
                    if page.obj == page_obj:
                        page_ref = i
                        break
            except Exception:
                pass

        node = StructureNode(
            tag_type=tag_type,
            depth=depth,
            mcid=mcid,
            alt_text=alt_text,
            actual_text=actual_text,
            page_ref=page_ref
        )
        parent_node.children.append(node)

        # Recurse into this element's kids one level deeper.
        if '/K' in element:
            _parse_structure_element(element.K, node, depth + 1, pdf, max_depth)
def format_tree_text(root: StructureNode, max_nodes: int = 500) -> str:
    """
    Format structure tree as indented text with box-drawing characters.

    Args:
        root: Root StructureNode
        max_nodes: Maximum number of nodes to display

    Returns:
        Formatted text representation
    """
    output: List[str] = []
    visited = 0

    def _describe(node: StructureNode) -> str:
        # Build a one-line summary of the node's attributes.
        parts = [node.tag_type]
        if node.mcid is not None:
            parts.append(f"[MCID: {node.mcid}]")
        if node.alt_text:
            shown = f"{node.alt_text[:30]}..." if len(node.alt_text) > 30 else node.alt_text
            parts.append(f"(Alt: {shown})")
        if node.actual_text:
            shown = f"{node.actual_text[:30]}..." if len(node.actual_text) > 30 else node.actual_text
            parts.append(f"(Text: {shown})")
        if node.page_ref is not None:
            parts.append(f"[Page {node.page_ref + 1}]")
        return " ".join(parts)

    def _walk(node: StructureNode, prefix: str = "", is_last: bool = True):
        nonlocal visited
        if visited >= max_nodes:
            # Emit the truncation marker exactly once, then just count.
            if visited == max_nodes:
                output.append(f"{prefix}... (truncated, tree too large)")
            visited += 1
            return

        if node.depth == 0:
            output.append(_describe(node))
        else:
            branch = "└── " if is_last else "├── "
            output.append(f"{prefix}{branch}{_describe(node)}")
        visited += 1

        if node.children:
            extension = " " if is_last else "│ "
            # Root-level children start with an empty prefix.
            child_prefix = prefix + extension if node.depth > 0 else ""
            last_index = len(node.children) - 1
            for i, child in enumerate(node.children):
                _walk(child, child_prefix, i == last_index)

    _walk(root)
    return "\n".join(output)
def get_tree_statistics(root: StructureNode) -> Dict[str, Any]:
    """
    Calculate statistics about the structure tree.

    Args:
        root: Root StructureNode

    Returns:
        Dictionary of statistics
    """
    tag_counts = Counter()
    pages_seen = set()
    # Scalar accumulators kept in a dict so the nested visitor can mutate them.
    counts = {'nodes': 0, 'alt': 0, 'actual': 0, 'mcid': 0}
    deepest = [0]

    def _visit(node: StructureNode):
        # Pre-order walk, accumulating per-node statistics.
        counts['nodes'] += 1
        deepest[0] = max(deepest[0], node.depth)
        tag_counts[node.tag_type] += 1
        if node.page_ref is not None:
            pages_seen.add(node.page_ref)
        if node.alt_text:
            counts['alt'] += 1
        if node.actual_text:
            counts['actual'] += 1
        if node.mcid is not None:
            counts['mcid'] += 1
        for child in node.children:
            _visit(child)

    _visit(root)
    return {
        'total_nodes': counts['nodes'],
        'max_depth': deepest[0],
        'tag_type_counts': dict(tag_counts.most_common()),
        'pages_with_structure': sorted(pages_seen),
        'nodes_with_alt_text': counts['alt'],
        'nodes_with_actual_text': counts['actual'],
        'mcid_count': counts['mcid'],
    }
def format_statistics_markdown(stats: Dict[str, Any]) -> str:
    """Format tree statistics as Markdown."""
    parts = [
        "## Structure Tree Statistics",
        "",
        f"**Total Nodes**: {stats['total_nodes']}",
        f"**Maximum Depth**: {stats['max_depth']}",
        f"**Nodes with Alt Text**: {stats['nodes_with_alt_text']}",
        f"**Nodes with Actual Text**: {stats['nodes_with_actual_text']}",
        f"**MCID References**: {stats['mcid_count']}",
        "",
        "### Tag Type Distribution",
        "",
    ]
    # One bullet per tag type, in the order the stats dict provides.
    parts += [f"- **{tag}**: {count}" for tag, count in stats['tag_type_counts'].items()]

    pages = stats['pages_with_structure']
    parts += ["", f"**Pages with Structure**: {len(pages)}"]
    if pages:
        # Show 1-based page numbers, capped at 20 with an overflow note.
        shown = ", ".join(str(p + 1) for p in pages[:20])
        overflow = len(pages) - 20
        if overflow > 0:
            shown += f" ... ({overflow} more)"
        parts.append(f"({shown})")
    return "\n".join(parts)
def extract_mcid_for_page(pdf_path: str, page_index: int) -> List[int]:
    """
    Extract all MCIDs (Marked Content IDs) from a page's content stream.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index

    Returns:
        List of MCIDs found in the page, in content-stream order
    """
    import re
    try:
        with pikepdf.open(pdf_path) as pdf:
            page = pdf.pages[page_index]
            if '/Contents' not in page:
                return []

            contents = page.Contents
            if isinstance(contents, pikepdf.Array):
                # Multiple content streams: concatenate them in order
                # (join avoids the quadratic cost of repeated bytes +=).
                stream_data = b"".join(bytes(s.read_bytes()) for s in contents)
            else:
                stream_data = bytes(contents.read_bytes())

            # latin-1 maps every possible byte value, so this decode cannot
            # fail and preserves the raw operator text for the regex below
            # (the previous utf-8 fallback was unreachable dead code).
            content_text = stream_data.decode('latin-1')

            # Pattern: /MCID <number> — appears in BDC property lists.
            matches = re.findall(r'/MCID\s+(\d+)', content_text)
            return [int(m) for m in matches]
    except Exception as e:
        print(f"Error extracting MCIDs: {e}")
        return []
def map_blocks_to_tags(pdf_path: str, page_index: int, blocks) -> List[Dict[str, Any]]:
    """
    Map visual blocks to structure tree tags via MCIDs.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index
        blocks: List of BlockInfo objects from extract_blocks_spans

    Returns:
        List of mappings with block index, tag info, MCID, alt text
    """
    tree = extract_structure_tree(pdf_path)
    if not tree:
        return []

    page_mcids = extract_mcid_for_page(pdf_path, page_index)

    # Index structure nodes by MCID, restricted to this page (or page-less).
    nodes_by_mcid: Dict[int, StructureNode] = {}

    def _index(node: StructureNode):
        on_page = node.page_ref is None or node.page_ref == page_index
        if node.mcid is not None and on_page:
            nodes_by_mcid[node.mcid] = node
        for kid in node.children:
            _index(kid)

    _index(tree)

    # Pair the i-th MCID in the content stream with the i-th visual block.
    results: List[Dict[str, Any]] = []
    block_limit = len(blocks)
    for position, mcid in enumerate(page_mcids):
        if position >= block_limit:
            continue
        node = nodes_by_mcid.get(mcid)
        if node is None:
            continue
        results.append({
            'block_index': position,
            'tag_type': node.tag_type,
            'mcid': mcid,
            'alt_text': node.alt_text or "",
            'actual_text': node.actual_text or ""
        })
    return results
def detect_visual_paragraphs(blocks, vertical_gap_threshold: float = 15.0) -> List[List[int]]:
    """
    Detect visual paragraphs based on spacing heuristics.

    Args:
        blocks: List of BlockInfo objects
        vertical_gap_threshold: Minimum vertical gap to consider a paragraph break

    Returns:
        List of paragraph groups, where each group is a list of block indices
    """
    # Keep only non-empty text blocks, remembering their original indices.
    candidates = [
        (idx, blk) for idx, blk in enumerate(blocks)
        if blk.block_type == 0 and blk.text.strip()
    ]
    if not candidates:
        return []

    # Read top-to-bottom: order by the top edge of each bounding box.
    candidates.sort(key=lambda pair: pair[1].bbox[1])

    groups: List[List[int]] = []
    first_idx, first_blk = candidates[0]
    current_group = [first_idx]
    last_bbox = first_blk.bbox

    for idx, blk in candidates[1:]:
        bbox = blk.bbox
        gap = bbox[1] - last_bbox[3]
        # Positive x-overlap means the blocks share a column.
        overlap = min(bbox[2], last_bbox[2]) - max(bbox[0], last_bbox[0])
        if gap < vertical_gap_threshold and overlap > 0:
            current_group.append(idx)
        else:
            groups.append(current_group)
            current_group = [idx]
        last_bbox = bbox

    # The running group is never empty here; flush it.
    groups.append(current_group)
    return groups
def detect_semantic_paragraphs(pdf_path: str, page_index: int) -> List[StructureNode]:
    """
    Extract semantic paragraph tags (<P>) from structure tree.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index

    Returns:
        List of StructureNode objects with tag_type='P' for the page
    """
    tree = extract_structure_tree(pdf_path)
    if not tree:
        return []

    found: List[StructureNode] = []
    # Iterative pre-order traversal; children are pushed reversed so the
    # result order matches a recursive top-down walk.
    stack = [tree]
    while stack:
        node = stack.pop()
        on_page = node.page_ref is None or node.page_ref == page_index
        if node.tag_type == 'P' and on_page:
            found.append(node)
        stack.extend(reversed(node.children))
    return found
def compare_paragraphs(visual_paragraphs: List[List[int]], semantic_paragraphs: List[StructureNode]) -> Dict[str, Any]:
    """
    Compare visual and semantic paragraph detection.

    Args:
        visual_paragraphs: List of visual paragraph groups (block indices)
        semantic_paragraphs: List of semantic <P> tags

    Returns:
        Dictionary with comparison statistics
    """
    n_visual = len(visual_paragraphs)
    n_semantic = len(semantic_paragraphs)

    # Similarity heuristic: ratio of the smaller count to the larger.
    if n_visual == n_semantic == 0:
        similarity = 1.0  # nothing on either side counts as a perfect match
    elif min(n_visual, n_semantic) == 0:
        similarity = 0.0  # one side found paragraphs, the other found none
    else:
        similarity = min(n_visual, n_semantic) / max(n_visual, n_semantic)

    return {
        'visual_count': n_visual,
        'semantic_count': n_semantic,
        'match_score': similarity,
        'count_mismatch': abs(n_visual - n_semantic),
    }