Spaces:
Sleeping
Sleeping
| """ | |
| Content Stream Parser Module | |
| Provides functionality for extracting and analyzing PDF content stream operators, | |
| correlating them with visual blocks. | |
| """ | |
| import re | |
| from typing import Dict, List, Optional, Any, Tuple | |
| import fitz # PyMuPDF | |
def extract_content_stream_for_block(
    pdf_path: str,
    page_index: int,
    block_index: int,
    blocks: List[Any]
) -> Dict[str, Any]:
    """
    Extract content stream operators for a specific block.

    Args:
        pdf_path: Path to the PDF file
        page_index: 0-based page index
        block_index: Index of the block to analyze
        blocks: List of BlockInfo objects from extract_blocks_spans
            (only the ``text`` attribute of the selected block is read here)

    Returns:
        Dictionary with operators, raw stream, and metadata. On success it
        holds 'operators', 'raw_stream', 'matched' and 'block_text' (plus
        'message' when nothing matched). On failure it holds 'error' together
        with empty 'operators' and 'raw_stream'. No exception escapes this
        function; every failure is reported through the 'error' key.
    """
    if block_index < 0 or block_index >= len(blocks):
        return {
            'error': 'Invalid block index',
            'operators': [],
            'raw_stream': ''
        }

    target_block = blocks[block_index]

    try:
        doc = fitz.open(pdf_path)
        try:
            page = doc[page_index]

            # Clean and consolidate content streams
            page.clean_contents()

            # Read every content stream of the page, not only the first one,
            # so text objects in later streams are not silently ignored.
            xrefs = page.get_contents()
            if not xrefs:
                return {
                    'error': 'Page has no content stream',
                    'operators': [],
                    'raw_stream': ''
                }
            stream_data = b'\n'.join(
                doc.xref_stream(xref) or b'' for xref in xrefs
            )
        finally:
            # Release the document even when page access or stream
            # extraction raises.
            doc.close()

        # latin-1 maps every byte to a code point, so decoding cannot fail
        # and no fallback decoder is needed.
        raw_stream = stream_data.decode('latin-1')

        # Parse text objects from the stream
        text_objects = _parse_text_objects(raw_stream)

        # Find the text object that matches our target block
        matching_object = _find_matching_text_object(text_objects, target_block)

        block_text = target_block.text[:100]
        if matching_object:
            return {
                'operators': matching_object['operators'],
                'raw_stream': raw_stream,
                'matched': True,
                'block_text': block_text
            }
        return {
            'operators': [],
            'raw_stream': raw_stream,
            'matched': False,
            'block_text': block_text,
            'message': 'Could not find matching text object in content stream'
        }
    except Exception as e:
        # Top-level boundary: callers expect an error dictionary, not a raise.
        return {
            'error': str(e),
            'operators': [],
            'raw_stream': ''
        }
| def _parse_text_objects(content_stream: str) -> List[Dict[str, Any]]: | |
| """ | |
| Parse text objects (BT...ET blocks) from content stream. | |
| Args: | |
| content_stream: Raw PDF content stream text | |
| Returns: | |
| List of text objects with their operators | |
| """ | |
| text_objects = [] | |
| # Find all BT...ET blocks | |
| bt_et_pattern = r'BT\s+(.*?)\s+ET' | |
| matches = re.finditer(bt_et_pattern, content_stream, re.DOTALL) | |
| for match in matches: | |
| text_block = match.group(1) | |
| operators = _parse_operators(text_block) | |
| text_objects.append({ | |
| 'operators': operators, | |
| 'text': _extract_text_from_operators(operators) | |
| }) | |
| return text_objects | |
| def _parse_operators(text_block: str) -> List[Dict[str, str]]: | |
| """ | |
| Parse individual operators from a text block. | |
| Args: | |
| text_block: Text between BT and ET | |
| Returns: | |
| List of operator dictionaries with type and value | |
| """ | |
| operators = [] | |
| # Text matrix (Tm) | |
| tm_pattern = r'([\d.\-\s]+)\s+Tm' | |
| for match in re.finditer(tm_pattern, text_block): | |
| operators.append({ | |
| 'type': 'Tm', | |
| 'value': match.group(1).strip(), | |
| 'description': 'Text Matrix' | |
| }) | |
| # Font (Tf) | |
| tf_pattern = r'/(\S+)\s+([\d.]+)\s+Tf' | |
| for match in re.finditer(tf_pattern, text_block): | |
| operators.append({ | |
| 'type': 'Tf', | |
| 'value': f'/{match.group(1)} {match.group(2)}', | |
| 'description': f'Font: {match.group(1)}, Size: {match.group(2)}' | |
| }) | |
| # Text positioning (Td, TD) | |
| td_pattern = r'([\d.\-]+)\s+([\d.\-]+)\s+T[dD]' | |
| for match in re.finditer(td_pattern, text_block): | |
| operators.append({ | |
| 'type': 'Td', | |
| 'value': f'{match.group(1)} {match.group(2)}', | |
| 'description': f'Move text position ({match.group(1)}, {match.group(2)})' | |
| }) | |
| # Text showing (Tj) | |
| tj_pattern = r'\((.*?)\)\s*Tj' | |
| for match in re.finditer(tj_pattern, text_block): | |
| text = match.group(1) | |
| operators.append({ | |
| 'type': 'Tj', | |
| 'value': f'({text})', | |
| 'description': f'Show text: {text[:50]}' | |
| }) | |
| # Text showing (TJ - array) | |
| tj_array_pattern = r'\[(.*?)\]\s*TJ' | |
| for match in re.finditer(tj_array_pattern, text_block, re.DOTALL): | |
| array_content = match.group(1) | |
| operators.append({ | |
| 'type': 'TJ', | |
| 'value': f'[{array_content[:100]}]', | |
| 'description': 'Show text array' | |
| }) | |
| # Text leading (TL) | |
| tl_pattern = r'([\d.\-]+)\s+TL' | |
| for match in re.finditer(tl_pattern, text_block): | |
| operators.append({ | |
| 'type': 'TL', | |
| 'value': match.group(1), | |
| 'description': f'Text leading: {match.group(1)}' | |
| }) | |
| # Color operators (rg, RG, g, G) | |
| color_pattern = r'([\d.\s]+)\s+(rg|RG|g|G)' | |
| for match in re.finditer(color_pattern, text_block): | |
| operators.append({ | |
| 'type': match.group(2), | |
| 'value': match.group(1).strip(), | |
| 'description': f'Color: {match.group(1).strip()}' | |
| }) | |
| return operators | |
| def _extract_text_from_operators(operators: List[Dict[str, str]]) -> str: | |
| """ | |
| Extract visible text from operator list. | |
| Args: | |
| operators: List of operator dictionaries | |
| Returns: | |
| Concatenated text content | |
| """ | |
| text_parts = [] | |
| for op in operators: | |
| if op['type'] in ['Tj', 'TJ']: | |
| # Extract text from parentheses or array | |
| value = op['value'] | |
| # Simple extraction - just get content in parentheses | |
| matches = re.findall(r'\((.*?)\)', value) | |
| text_parts.extend(matches) | |
| return ' '.join(text_parts) | |
| def _find_matching_text_object( | |
| text_objects: List[Dict[str, Any]], | |
| target_block: Any | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Find the text object that best matches the target block. | |
| Args: | |
| text_objects: List of parsed text objects | |
| target_block: BlockInfo object to match | |
| Returns: | |
| Matching text object or None | |
| """ | |
| target_text = target_block.text.strip() | |
| if not target_text: | |
| return None | |
| best_match = None | |
| best_score = 0 | |
| for text_obj in text_objects: | |
| obj_text = text_obj['text'].strip() | |
| if not obj_text: | |
| continue | |
| # Calculate similarity score (simple substring matching) | |
| # Check if either text contains the other | |
| if target_text in obj_text or obj_text in target_text: | |
| score = min(len(target_text), len(obj_text)) / max(len(target_text), len(obj_text)) | |
| if score > best_score: | |
| best_score = score | |
| best_match = text_obj | |
| # Only return match if score is reasonable | |
| if best_score > 0.3: | |
| return best_match | |
| return None | |
def format_operators_markdown(result: Dict[str, Any]) -> str:
    """
    Format operators as readable Markdown.

    Args:
        result: Result dictionary from extract_content_stream_for_block

    Returns:
        Formatted Markdown string: an error section when 'error' is present,
        otherwise a heading, the block text, an optional no-match warning,
        and a numbered entry per operator
    """
    if 'error' in result:
        return f"## Error\n\n{result['error']}"

    lines = [
        "## Content Stream Operators",
        "",
        f"**Block Text**: {result.get('block_text', 'N/A')}",
        ""
    ]

    if not result.get('matched'):
        lines.extend([
            "⚠️ **Warning**: Could not find exact matching text object in content stream.",
            "",
            result.get('message', ''),
            ""
        ])

    operators = result.get('operators', [])
    if not operators:
        lines.append("No operators found.")
        return "\n".join(lines)

    lines.extend([
        "### Operators Found",
        ""
    ])
    for i, op in enumerate(operators, 1):
        value = str(op['value'])
        # A value containing backticks would end a single-backtick code
        # span early, so fence it with a longer backtick run and pad it
        # with spaces.
        backtick_runs = re.findall(r'`+', value)
        if backtick_runs:
            fence = '`' * (max(len(run) for run in backtick_runs) + 1)
            rendered = f"{fence} {value} {fence}"
        else:
            rendered = f"`{value}`"
        lines.append(f"**{i}. {op['type']}**")
        lines.append(f" - Value: {rendered}")
        lines.append(f" - {op['description']}")
        lines.append("")
    return "\n".join(lines)
def format_raw_stream(raw_stream: str, max_lines: int = 100) -> str:
    """
    Format raw content stream for display.

    Args:
        raw_stream: Raw PDF content stream text
        max_lines: Maximum number of lines to display (negative values are
            treated as 0)

    Returns:
        The stream unchanged when it has at most ``max_lines`` lines,
        otherwise its first ``max_lines`` lines followed by a blank line
        and a note giving the number of omitted lines
    """
    lines = raw_stream.split('\n')
    limit = max(max_lines, 0)

    # Count once, before truncating. This also keeps the backslash out of
    # the f-string expression, which is a syntax error before Python 3.12.
    total = len(lines)
    if total > limit:
        hidden = total - limit
        lines = lines[:limit]
        lines.append(f"\n... (truncated, {hidden} more lines)")
    return '\n'.join(lines)