Spaces:
Sleeping
Sleeping
| """ | |
| Structure Tree Analysis Module | |
| Provides functionality for extracting and analyzing PDF structure trees, | |
| including paragraph detection and block-to-tag mapping. | |
| """ | |
| from dataclasses import dataclass, field | |
| from typing import List, Optional, Dict, Any, Tuple | |
| import pikepdf | |
| import statistics | |
| from collections import Counter | |
@dataclass
class StructureNode:
    """Represents a node in the PDF structure tree.

    Fix: the class body is written in dataclass style (bare annotations,
    defaults, ``field(default_factory=list)``) and every caller constructs
    it with keyword arguments, but the ``@dataclass`` decorator was missing,
    so construction raised TypeError and ``children`` was a raw Field object.
    """
    tag_type: str                    # Structure tag name: P, H1, Document, MCID, etc.
    depth: int                       # Depth within the tree (root is 0).
    mcid: Optional[int] = None       # Marked Content ID, when the node references page content.
    alt_text: Optional[str] = None   # /Alt alternative text, if present.
    actual_text: Optional[str] = None  # /ActualText replacement text, if present.
    page_ref: Optional[int] = None   # 0-based page index resolved from /Pg, if known.
    children: List['StructureNode'] = field(default_factory=list)  # Child nodes, per instance.

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary (recursively) for JSON serialization."""
        return {
            'tag_type': self.tag_type,
            'depth': self.depth,
            'mcid': self.mcid,
            'alt_text': self.alt_text,
            'actual_text': self.actual_text,
            'page_ref': self.page_ref,
            'children': [child.to_dict() for child in self.children]
        }
def extract_structure_tree(pdf_path: str) -> Optional[StructureNode]:
    """
    Extract the complete structure tree from a PDF.

    Args:
        pdf_path: Path to the PDF file

    Returns:
        Root StructureNode or None if no structure tree exists
        (or if the file could not be read)
    """
    try:
        with pikepdf.open(pdf_path) as pdf:
            catalog = pdf.Root
            # An untagged PDF has no /StructTreeRoot in its catalog.
            if '/StructTreeRoot' not in catalog:
                return None

            struct_root = catalog.StructTreeRoot
            root_node = StructureNode(tag_type="StructTreeRoot", depth=0)

            # The tree's children live under /K; an absent /K means an
            # empty (but present) structure tree.
            if '/K' in struct_root:
                _parse_structure_element(struct_root.K, root_node, 1, pdf)
            return root_node
    except Exception as e:
        # Boundary function: report and degrade to "no tree" on any failure.
        print(f"Error extracting structure tree: {e}")
        return None
def _parse_structure_element(element, parent_node: StructureNode, depth: int, pdf: pikepdf.Pdf, max_depth: int = 20):
    """
    Recursively parse a structure element and its children.

    Fix: the three bare ``except:`` clauses also swallowed
    KeyboardInterrupt/SystemExit; they now catch only ``Exception``.

    Args:
        element: pikepdf object representing the element
            (Array of siblings, Dictionary element, or int MCID leaf)
        parent_node: Parent StructureNode to attach children to
        depth: Current depth in the tree
        pdf: pikepdf.Pdf object for resolving references
        max_depth: Maximum recursion depth to prevent infinite loops
    """
    if depth > max_depth:
        return

    # Handle arrays of elements: siblings share the same depth.
    if isinstance(element, pikepdf.Array):
        for item in element:
            _parse_structure_element(item, parent_node, depth, pdf, max_depth)
        return

    # Handle MCID (Marked Content ID) - a bare integer is a leaf node.
    if isinstance(element, int):
        node = StructureNode(
            tag_type="MCID",
            depth=depth,
            mcid=element
        )
        parent_node.children.append(node)
        return

    # Handle dictionary (structure element).
    if isinstance(element, pikepdf.Dictionary):
        # /S holds the tag name (e.g. /P, /H1); strip the leading slash.
        tag_type = str(element.get('/S', 'Unknown'))
        if tag_type.startswith('/'):
            tag_type = tag_type[1:]

        mcid = None
        if '/MCID' in element:
            mcid = int(element.MCID)

        # /Alt and /ActualText may hold values str() cannot convert;
        # treat those as absent rather than aborting the whole parse.
        alt_text = None
        if '/Alt' in element:
            try:
                alt_text = str(element.Alt)
            except Exception:
                pass

        actual_text = None
        if '/ActualText' in element:
            try:
                actual_text = str(element.ActualText)
            except Exception:
                pass

        # Resolve /Pg to a 0-based page index by comparing page objects.
        page_ref = None
        if '/Pg' in element:
            try:
                page_obj = element.Pg
                for i, page in enumerate(pdf.pages):
                    if page.obj == page_obj:
                        page_ref = i
                        break
            except Exception:
                pass

        node = StructureNode(
            tag_type=tag_type,
            depth=depth,
            mcid=mcid,
            alt_text=alt_text,
            actual_text=actual_text,
            page_ref=page_ref
        )
        parent_node.children.append(node)

        # Recursively process children under /K, one level deeper.
        if '/K' in element:
            _parse_structure_element(element.K, node, depth + 1, pdf, max_depth)
def format_tree_text(root: StructureNode, max_nodes: int = 500) -> str:
    """
    Format structure tree as indented text with box-drawing characters.

    Args:
        root: Root StructureNode
        max_nodes: Maximum number of nodes to display

    Returns:
        Formatted text representation
    """
    lines = []
    emitted = 0  # nodes rendered so far, shared with the nested renderer

    def _render(node: StructureNode, prefix: str = "", is_last: bool = True):
        nonlocal emitted
        # Past the cap: emit the truncation marker exactly once, then stop.
        if emitted >= max_nodes:
            if emitted == max_nodes:
                lines.append(f"{prefix}... (truncated, tree too large)")
                emitted += 1
            return

        # Compose the node label: tag plus optional MCID/Alt/Text/page notes.
        label = node.tag_type
        if node.mcid is not None:
            label = f"{label} [MCID: {node.mcid}]"
        if node.alt_text:
            shown = f"{node.alt_text[:30]}..." if len(node.alt_text) > 30 else node.alt_text
            label = f"{label} (Alt: {shown})"
        if node.actual_text:
            shown = f"{node.actual_text[:30]}..." if len(node.actual_text) > 30 else node.actual_text
            label = f"{label} (Text: {shown})"
        if node.page_ref is not None:
            label = f"{label} [Page {node.page_ref + 1}]"

        # The root line carries no connector; everything else gets one.
        if node.depth == 0:
            lines.append(label)
        else:
            lines.append(prefix + ("└── " if is_last else "├── ") + label)
        emitted += 1

        if node.children:
            # Root's children start with an empty prefix; deeper levels
            # extend the parent's prefix with a continuation column.
            if node.depth > 0:
                child_prefix = prefix + ("    " if is_last else "│   ")
            else:
                child_prefix = ""
            last = len(node.children) - 1
            for i, child in enumerate(node.children):
                _render(child, child_prefix, i == last)

    _render(root)
    return "\n".join(lines)
def get_tree_statistics(root: StructureNode) -> Dict[str, Any]:
    """
    Calculate statistics about the structure tree.

    Args:
        root: Root StructureNode

    Returns:
        Dictionary of statistics
    """
    tag_counts = Counter()
    pages_seen = set()
    total = 0
    deepest = 0
    alt_count = 0
    actual_count = 0
    mcid_total = 0

    # Iterative pre-order walk (reversed children keep visit order identical
    # to a recursive traversal, which fixes Counter insertion order for ties).
    stack = [root]
    while stack:
        node = stack.pop()
        total += 1
        deepest = max(deepest, node.depth)
        tag_counts[node.tag_type] += 1
        if node.page_ref is not None:
            pages_seen.add(node.page_ref)
        if node.alt_text:
            alt_count += 1
        if node.actual_text:
            actual_count += 1
        if node.mcid is not None:
            mcid_total += 1
        stack.extend(reversed(node.children))

    return {
        'total_nodes': total,
        'max_depth': deepest,
        'tag_type_counts': dict(tag_counts.most_common()),
        'pages_with_structure': sorted(pages_seen),
        'nodes_with_alt_text': alt_count,
        'nodes_with_actual_text': actual_count,
        'mcid_count': mcid_total
    }
def format_statistics_markdown(stats: Dict[str, Any]) -> str:
    """Format tree statistics (as produced by get_tree_statistics) as Markdown."""
    out = [
        "## Structure Tree Statistics",
        "",
        f"**Total Nodes**: {stats['total_nodes']}",
        f"**Maximum Depth**: {stats['max_depth']}",
        f"**Nodes with Alt Text**: {stats['nodes_with_alt_text']}",
        f"**Nodes with Actual Text**: {stats['nodes_with_actual_text']}",
        f"**MCID References**: {stats['mcid_count']}",
        "",
        "### Tag Type Distribution",
        "",
    ]
    out.extend(f"- **{tag}**: {count}" for tag, count in stats['tag_type_counts'].items())
    out.append("")
    out.append(f"**Pages with Structure**: {len(stats['pages_with_structure'])}")

    pages = stats['pages_with_structure']
    if pages:
        # Show at most 20 page numbers (1-based), then summarize the rest.
        listed = ", ".join(str(p + 1) for p in pages[:20])
        hidden = len(pages) - 20
        if hidden > 0:
            listed += f" ... ({hidden} more)"
        out.append(f"({listed})")
    return "\n".join(out)
def extract_mcid_for_page(pdf_path: str, page_index: int) -> List[int]:
    """
    Extract all MCIDs (Marked Content IDs) from a page's content stream.

    Fix: the bare ``except:`` on the decode fallback now catches only
    ``UnicodeDecodeError`` (a bare except also swallowed KeyboardInterrupt).

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index

    Returns:
        List of MCIDs found in the page, in content-stream order
    """
    import re
    try:
        with pikepdf.open(pdf_path) as pdf:
            page = pdf.pages[page_index]
            # Pages without /Contents have no marked content at all.
            if '/Contents' not in page:
                return []

            contents = page.Contents
            if isinstance(contents, pikepdf.Array):
                # Multiple content streams: concatenate them in order.
                stream_data = b"".join(bytes(stream.read_bytes()) for stream in contents)
            else:
                stream_data = bytes(contents.read_bytes())

            # Decode content stream. latin-1 maps every byte value, so this
            # normally succeeds; the utf-8 fallback is kept for safety but
            # guarded by the specific decode error only.
            try:
                content_text = stream_data.decode('latin-1')
            except UnicodeDecodeError:
                content_text = stream_data.decode('utf-8', errors='ignore')

            # Extract MCIDs using regex.
            # Pattern: /MCID <number> BDC or /MCID <number> >> BDC
            mcid_pattern = r'/MCID\s+(\d+)'
            matches = re.findall(mcid_pattern, content_text)
            return [int(m) for m in matches]
    except Exception as e:
        # Boundary function: report and degrade to "no MCIDs" on failure.
        print(f"Error extracting MCIDs: {e}")
        return []
def map_blocks_to_tags(pdf_path: str, page_index: int, blocks) -> List[Dict[str, Any]]:
    """
    Map visual blocks to structure tree tags via MCIDs.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index
        blocks: List of BlockInfo objects from extract_blocks_spans

    Returns:
        List of mappings with block index, tag info, MCID, alt text
    """
    root = extract_structure_tree(pdf_path)
    if not root:
        return []

    page_mcids = extract_mcid_for_page(pdf_path, page_index)

    # Collect every MCID-bearing structure node that belongs to this page
    # (nodes without a page reference are accepted too). Pre-order walk so
    # duplicate MCIDs resolve the same way a recursive traversal would.
    mcid_to_node = {}
    pending = [root]
    while pending:
        node = pending.pop()
        if node.mcid is not None and (node.page_ref is None or node.page_ref == page_index):
            mcid_to_node[node.mcid] = node
        pending.extend(reversed(node.children))

    # Pair the i-th MCID in the content stream with the i-th visual block.
    mappings = []
    for i, mcid in enumerate(page_mcids):
        if i >= len(blocks):
            continue
        node = mcid_to_node.get(mcid)
        if node is None:
            continue
        mappings.append({
            'block_index': i,
            'tag_type': node.tag_type,
            'mcid': mcid,
            'alt_text': node.alt_text or "",
            'actual_text': node.actual_text or ""
        })
    return mappings
def detect_visual_paragraphs(blocks, vertical_gap_threshold: float = 15.0) -> List[List[int]]:
    """
    Detect visual paragraphs based on spacing heuristics.

    Args:
        blocks: List of BlockInfo objects
        vertical_gap_threshold: Minimum vertical gap to consider a paragraph break

    Returns:
        List of paragraph groups, where each group is a list of block indices
    """
    # Keep only non-empty text blocks (block_type == 0) with their indices.
    candidates = [
        (idx, blk) for idx, blk in enumerate(blocks)
        if blk.block_type == 0 and blk.text.strip()
    ]
    if not candidates:
        return []

    # Scan top-to-bottom by each block's top edge.
    candidates.sort(key=lambda pair: pair[1].bbox[1])

    groups = []
    group = [candidates[0][0]]
    prev = candidates[0][1].bbox

    for idx, blk in candidates[1:]:
        box = blk.bbox
        gap = box[1] - prev[3]  # distance from the previous block's bottom
        # Positive overlap means the two blocks share a column horizontally.
        overlap = min(box[2], prev[2]) - max(box[0], prev[0])
        if gap < vertical_gap_threshold and overlap > 0:
            # Close enough and aligned: continue the current paragraph.
            group.append(idx)
        else:
            # Paragraph break: flush and start a new group.
            groups.append(group)
            group = [idx]
        prev = box

    # The final group is always non-empty by construction.
    groups.append(group)
    return groups
def detect_semantic_paragraphs(pdf_path: str, page_index: int) -> List[StructureNode]:
    """
    Extract semantic paragraph tags (<P>) from structure tree.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index

    Returns:
        List of StructureNode objects with tag_type='P' for the page
    """
    root = extract_structure_tree(pdf_path)
    if not root:
        return []

    # Pre-order walk collecting <P> nodes on this page; nodes without any
    # page reference are kept as well (their page is unknown, not wrong).
    found = []
    stack = [root]
    while stack:
        node = stack.pop()
        if node.tag_type == 'P' and (node.page_ref is None or node.page_ref == page_index):
            found.append(node)
        stack.extend(reversed(node.children))
    return found
def compare_paragraphs(visual_paragraphs: List[List[int]], semantic_paragraphs: List[StructureNode]) -> Dict[str, Any]:
    """
    Compare visual and semantic paragraph detection.

    Args:
        visual_paragraphs: List of visual paragraph groups (block indices)
        semantic_paragraphs: List of semantic <P> tags

    Returns:
        Dictionary with comparison statistics
    """
    n_visual = len(visual_paragraphs)
    n_semantic = len(semantic_paragraphs)

    # Heuristic agreement score: ratio of the smaller count to the larger.
    if n_visual == 0 and n_semantic == 0:
        score = 1.0  # both found nothing: trivially in agreement
    elif n_visual == 0 or n_semantic == 0:
        score = 0.0  # exactly one side found nothing: no agreement
    else:
        score = min(n_visual, n_semantic) / max(n_visual, n_semantic)

    return {
        'visual_count': n_visual,
        'semantic_count': n_semantic,
        'match_score': score,
        'count_mismatch': abs(n_visual - n_semantic)
    }