""" Structure Tree Analysis Module Provides functionality for extracting and analyzing PDF structure trees, including paragraph detection and block-to-tag mapping. """ from dataclasses import dataclass, field from typing import List, Optional, Dict, Any, Tuple import pikepdf import statistics from collections import Counter @dataclass class StructureNode: """Represents a node in the PDF structure tree.""" tag_type: str # P, H1, Document, etc. depth: int mcid: Optional[int] = None alt_text: Optional[str] = None actual_text: Optional[str] = None page_ref: Optional[int] = None children: List['StructureNode'] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { 'tag_type': self.tag_type, 'depth': self.depth, 'mcid': self.mcid, 'alt_text': self.alt_text, 'actual_text': self.actual_text, 'page_ref': self.page_ref, 'children': [child.to_dict() for child in self.children] } def extract_structure_tree(pdf_path: str) -> Optional[StructureNode]: """ Extract the complete structure tree from a PDF. Args: pdf_path: Path to the PDF file Returns: Root StructureNode or None if no structure tree exists """ try: with pikepdf.open(pdf_path) as pdf: if '/StructTreeRoot' not in pdf.Root: return None struct_root = pdf.Root.StructTreeRoot # Create root node root_node = StructureNode( tag_type="StructTreeRoot", depth=0 ) # Recursively parse the tree if '/K' in struct_root: _parse_structure_element(struct_root.K, root_node, 1, pdf) return root_node except Exception as e: print(f"Error extracting structure tree: {e}") return None def _parse_structure_element(element, parent_node: StructureNode, depth: int, pdf: pikepdf.Pdf, max_depth: int = 20): """ Recursively parse a structure element and its children. Args: element: pikepdf object representing the element parent_node: Parent StructureNode to attach children to depth: Current depth in the tree pdf: pikepdf.Pdf object for resolving references max_depth: Maximum recursion depth to prevent infinite loops """ if depth > max_depth: return # Handle arrays of elements if isinstance(element, pikepdf.Array): for item in element: _parse_structure_element(item, parent_node, depth, pdf, max_depth) return # Handle MCID (Marked Content ID) - leaf node if isinstance(element, int): node = StructureNode( tag_type="MCID", depth=depth, mcid=element ) parent_node.children.append(node) return # Handle dictionary (structure element) if isinstance(element, pikepdf.Dictionary): # Extract tag type tag_type = str(element.get('/S', 'Unknown')) if tag_type.startswith('/'): tag_type = tag_type[1:] # Remove leading slash # Extract attributes mcid = None if '/MCID' in element: mcid = int(element.MCID) alt_text = None if '/Alt' in element: try: alt_text = str(element.Alt) except: pass actual_text = None if '/ActualText' in element: try: actual_text = str(element.ActualText) except: pass page_ref = None if '/Pg' in element: try: # Find the page number page_obj = element.Pg for i, page in enumerate(pdf.pages): if page.obj == page_obj: page_ref = i break except: pass # Create node node = StructureNode( tag_type=tag_type, depth=depth, mcid=mcid, alt_text=alt_text, actual_text=actual_text, page_ref=page_ref ) parent_node.children.append(node) # Recursively process children if '/K' in element: _parse_structure_element(element.K, node, depth + 1, pdf, max_depth) def format_tree_text(root: StructureNode, max_nodes: int = 500) -> str: """ Format structure tree as indented text with box-drawing characters. Args: root: Root StructureNode max_nodes: Maximum number of nodes to display Returns: Formatted text representation """ lines = [] node_count = [0] # Use list to allow modification in nested function def _format_node(node: StructureNode, prefix: str = "", is_last: bool = True): if node_count[0] >= max_nodes: if node_count[0] == max_nodes: lines.append(f"{prefix}... (truncated, tree too large)") node_count[0] += 1 return # Format node info info = node.tag_type if node.mcid is not None: info += f" [MCID: {node.mcid}]" if node.alt_text: info += f" (Alt: {node.alt_text[:30]}...)" if len(node.alt_text) > 30 else f" (Alt: {node.alt_text})" if node.actual_text: info += f" (Text: {node.actual_text[:30]}...)" if len(node.actual_text) > 30 else f" (Text: {node.actual_text})" if node.page_ref is not None: info += f" [Page {node.page_ref + 1}]" # Add line with appropriate prefix if node.depth == 0: lines.append(info) else: connector = "└── " if is_last else "├── " lines.append(f"{prefix}{connector}{info}") node_count[0] += 1 # Process children if node.children: extension = " " if is_last else "│ " new_prefix = prefix + extension if node.depth > 0 else "" for i, child in enumerate(node.children): is_last_child = (i == len(node.children) - 1) _format_node(child, new_prefix, is_last_child) _format_node(root) return "\n".join(lines) def get_tree_statistics(root: StructureNode) -> Dict[str, Any]: """ Calculate statistics about the structure tree. Args: root: Root StructureNode Returns: Dictionary of statistics """ node_count = 0 max_depth = 0 tag_counts = Counter() pages_with_structure = set() nodes_with_alt_text = 0 nodes_with_actual_text = 0 mcid_count = 0 def _traverse(node: StructureNode): nonlocal node_count, max_depth, nodes_with_alt_text, nodes_with_actual_text, mcid_count node_count += 1 max_depth = max(max_depth, node.depth) tag_counts[node.tag_type] += 1 if node.page_ref is not None: pages_with_structure.add(node.page_ref) if node.alt_text: nodes_with_alt_text += 1 if node.actual_text: nodes_with_actual_text += 1 if node.mcid is not None: mcid_count += 1 for child in node.children: _traverse(child) _traverse(root) return { 'total_nodes': node_count, 'max_depth': max_depth, 'tag_type_counts': dict(tag_counts.most_common()), 'pages_with_structure': sorted(list(pages_with_structure)), 'nodes_with_alt_text': nodes_with_alt_text, 'nodes_with_actual_text': nodes_with_actual_text, 'mcid_count': mcid_count } def format_statistics_markdown(stats: Dict[str, Any]) -> str: """Format tree statistics as Markdown.""" lines = [ "## Structure Tree Statistics", "", f"**Total Nodes**: {stats['total_nodes']}", f"**Maximum Depth**: {stats['max_depth']}", f"**Nodes with Alt Text**: {stats['nodes_with_alt_text']}", f"**Nodes with Actual Text**: {stats['nodes_with_actual_text']}", f"**MCID References**: {stats['mcid_count']}", "", "### Tag Type Distribution", "" ] for tag, count in stats['tag_type_counts'].items(): lines.append(f"- **{tag}**: {count}") lines.extend([ "", f"**Pages with Structure**: {len(stats['pages_with_structure'])}" ]) if stats['pages_with_structure']: page_list = ", ".join([str(p + 1) for p in stats['pages_with_structure'][:20]]) if len(stats['pages_with_structure']) > 20: page_list += f" ... ({len(stats['pages_with_structure']) - 20} more)" lines.append(f"({page_list})") return "\n".join(lines) def extract_mcid_for_page(pdf_path: str, page_index: int) -> List[int]: """ Extract all MCIDs (Marked Content IDs) from a page's content stream. Args: pdf_path: Path to PDF file page_index: 0-based page index Returns: List of MCIDs found in the page """ import re try: with pikepdf.open(pdf_path) as pdf: page = pdf.pages[page_index] # Get the content stream if '/Contents' not in page: return [] # Extract content stream as text contents = page.Contents if isinstance(contents, pikepdf.Array): # Multiple content streams stream_data = b"" for stream in contents: stream_data += bytes(stream.read_bytes()) else: stream_data = bytes(contents.read_bytes()) # Decode content stream try: content_text = stream_data.decode('latin-1') except: content_text = stream_data.decode('utf-8', errors='ignore') # Extract MCIDs using regex # Pattern: /MCID BDC or /MCID >> BDC mcid_pattern = r'/MCID\s+(\d+)' matches = re.findall(mcid_pattern, content_text) return [int(m) for m in matches] except Exception as e: print(f"Error extracting MCIDs: {e}") return [] def map_blocks_to_tags(pdf_path: str, page_index: int, blocks) -> List[Dict[str, Any]]: """ Map visual blocks to structure tree tags via MCIDs. Args: pdf_path: Path to PDF file page_index: 0-based page index blocks: List of BlockInfo objects from extract_blocks_spans Returns: List of mappings with block index, tag info, MCID, alt text """ # Extract structure tree root = extract_structure_tree(pdf_path) if not root: return [] # Get MCIDs from page page_mcids = extract_mcid_for_page(pdf_path, page_index) # Build MCID to structure node mapping mcid_to_node = {} def _find_mcids(node: StructureNode): if node.mcid is not None and (node.page_ref is None or node.page_ref == page_index): mcid_to_node[node.mcid] = node for child in node.children: _find_mcids(child) _find_mcids(root) # Create mappings mappings = [] for i, mcid in enumerate(page_mcids): if i < len(blocks) and mcid in mcid_to_node: node = mcid_to_node[mcid] mappings.append({ 'block_index': i, 'tag_type': node.tag_type, 'mcid': mcid, 'alt_text': node.alt_text or "", 'actual_text': node.actual_text or "" }) return mappings def detect_visual_paragraphs(blocks, vertical_gap_threshold: float = 15.0) -> List[List[int]]: """ Detect visual paragraphs based on spacing heuristics. Args: blocks: List of BlockInfo objects vertical_gap_threshold: Minimum vertical gap to consider a paragraph break Returns: List of paragraph groups, where each group is a list of block indices """ # Filter text blocks and sort by position text_blocks = [(i, b) for i, b in enumerate(blocks) if b.block_type == 0 and b.text.strip()] if not text_blocks: return [] # Sort by vertical position (top to bottom) text_blocks.sort(key=lambda x: x[1].bbox[1]) paragraphs = [] current_paragraph = [text_blocks[0][0]] prev_bbox = text_blocks[0][1].bbox for idx, block in text_blocks[1:]: bbox = block.bbox # Calculate vertical gap vertical_gap = bbox[1] - prev_bbox[3] # Check if blocks are roughly aligned horizontally (same column) horizontal_overlap = min(bbox[2], prev_bbox[2]) - max(bbox[0], prev_bbox[0]) if vertical_gap < vertical_gap_threshold and horizontal_overlap > 0: # Same paragraph current_paragraph.append(idx) else: # New paragraph paragraphs.append(current_paragraph) current_paragraph = [idx] prev_bbox = bbox # Add last paragraph if current_paragraph: paragraphs.append(current_paragraph) return paragraphs def detect_semantic_paragraphs(pdf_path: str, page_index: int) -> List[StructureNode]: """ Extract semantic paragraph tags (

) from structure tree. Args: pdf_path: Path to PDF file page_index: 0-based page index Returns: List of StructureNode objects with tag_type='P' for the page """ root = extract_structure_tree(pdf_path) if not root: return [] paragraphs = [] def _find_paragraphs(node: StructureNode): if node.tag_type == 'P' and (node.page_ref is None or node.page_ref == page_index): paragraphs.append(node) for child in node.children: _find_paragraphs(child) _find_paragraphs(root) return paragraphs def compare_paragraphs(visual_paragraphs: List[List[int]], semantic_paragraphs: List[StructureNode]) -> Dict[str, Any]: """ Compare visual and semantic paragraph detection. Args: visual_paragraphs: List of visual paragraph groups (block indices) semantic_paragraphs: List of semantic

tags Returns: Dictionary with comparison statistics """ visual_count = len(visual_paragraphs) semantic_count = len(semantic_paragraphs) # Calculate match quality score (simple heuristic) if visual_count == 0 and semantic_count == 0: match_score = 1.0 elif visual_count == 0 or semantic_count == 0: match_score = 0.0 else: # Score based on count similarity match_score = min(visual_count, semantic_count) / max(visual_count, semantic_count) return { 'visual_count': visual_count, 'semantic_count': semantic_count, 'match_score': match_score, 'count_mismatch': abs(visual_count - semantic_count) }