Spaces:
Sleeping
Sleeping
| import fitz # PyMuPDF | |
| import re | |
| import os | |
| import pymupdf | |
| from typing import List, Dict, Any, Optional | |
class PDFExtractor:
    """Extract text, images, metadata and optional-content layers from a PDF.

    The document is opened with PyMuPDF on construction and stays open until
    :meth:`close` is called (the instance can also be used as a context
    manager).

    NOTE: the layer-related methods switch layer visibility on the open
    document and do not restore the previous visibility state afterwards.
    """

    def __init__(self, pdf_path: str):
        """Open the PDF located at ``pdf_path``."""
        self.pdf_path = pdf_path
        self.document = fitz.open(pdf_path)

    def __enter__(self) -> "PDFExtractor":
        """Return the extractor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the underlying document when leaving a ``with`` block."""
        self.close()

    # ------------------------------------------------------------------
    # Basic document access
    # ------------------------------------------------------------------
    def count_pages(self) -> int:
        """Return the number of pages in the PDF."""
        return len(self.document)

    def get_metadata(self) -> Dict[str, Any]:
        """Return the document's metadata as exposed by the document object."""
        return self.document.metadata

    def get_text(self, page_numbers: Optional[List[int]] = None) -> Dict[int, str]:
        """Extract text from the given pages, or from all pages.

        Args:
            page_numbers: Zero-based page indexes. ``None`` selects every
                page. Indexes outside the document are silently skipped.

        Returns:
            Mapping of page index to the text of that page.
        """
        page_count = len(self.document)
        if page_numbers is None:
            page_numbers = range(page_count)
        return {
            i: self.document[i].get_text()
            for i in page_numbers
            if 0 <= i < page_count
        }

    def close(self) -> None:
        """Close the PDF document."""
        self.document.close()

    # ------------------------------------------------------------------
    # Layer discovery
    # ------------------------------------------------------------------
    def get_layers(self) -> List[str]:
        """Return the names of the layers (optional content) in the PDF.

        Names come from the ``text`` field of ``layer_ui_configs()``; an
        entry without that field is reported as ``Layer_<index>``. If that
        yields nothing and the document object exposes a ``layers``
        attribute, its items are returned instead. Any error is printed and
        results in an empty list.
        """
        try:
            if hasattr(self.document, "layer_ui_configs"):
                configs = self.document.layer_ui_configs()
                if configs:
                    return [
                        cfg.get("text", f"Layer_{i}")
                        for i, cfg in enumerate(configs)
                    ]
            legacy_layers = getattr(self.document, "layers", None)
            if legacy_layers:
                return list(legacy_layers)
            return []
        except Exception as e:
            print(f"Error getting layers: {e}")
            return []

    def _get_ocg_xrefs(self) -> List[int]:
        """Return the xrefs of all ``/Type /OCG`` objects in the document.

        Returns an empty list when the catalog has no ``OCProperties`` entry
        or when anything goes wrong (the error is printed).
        """
        try:
            # pdf_catalog() yields the catalog's xref (an int), so its keys
            # have to be queried through xref_get_key().
            catalog_xref = self.document.pdf_catalog()
            if not catalog_xref:
                return []
            kind, _ = self.document.xref_get_key(catalog_xref, "OCProperties")
            if kind == "null":
                return []

            ocg_xrefs = []
            for xref in range(1, self.document.xref_length()):
                try:
                    is_ocg = self.document.xref_get_key(xref, "Type") == ("name", "/OCG")
                except Exception:
                    # Objects that cannot be queried for keys are not OCGs.
                    continue
                if is_ocg:
                    ocg_xrefs.append(xref)
            return ocg_xrefs
        except Exception as e:
            print(f"Error getting OCG xrefs: {e}")
            return []

    def get_layer_configs(self) -> List[Dict[str, Any]]:
        """Return layer configuration records.

        When the document offers ``layer_ui_configs()`` its result is
        returned unchanged. Otherwise one ``{"id": xref, "name": name}``
        record is built per OCG object, where ``name`` is the OCG's ``/Name``
        entry or ``Layer_<xref>`` if that cannot be read. Any error is
        printed and results in an empty list.
        """
        try:
            if hasattr(self.document, "layer_ui_configs"):
                return self.document.layer_ui_configs()

            configs = []
            for xref in self._get_ocg_xrefs():
                name = f"Layer_{xref}"
                try:
                    kind, value = self.document.xref_get_key(xref, "Name")
                    if kind == "string" and value:
                        name = value
                except Exception as e:
                    print(f"Error getting OCG details for {xref}: {e}")
                configs.append({"id": xref, "name": name})
            return configs
        except Exception as e:
            print(f"Error getting layer configs: {e}")
            return []

    def _get_layer_number(self, layer_name: str) -> int:
        """Return the id/number of the layer called ``layer_name``.

        A record matches when its ``name`` or ``text`` field equals
        ``layer_name``; the record's ``id`` (or, failing that, ``number``)
        is returned. Returns ``-1`` when nothing matches or on error.
        """
        try:
            for cfg in self.get_layer_configs():
                if layer_name in (cfg.get("name"), cfg.get("text")):
                    return cfg.get("id", cfg.get("number", -1))
            return -1
        except Exception as e:
            print(f"Error getting layer number: {e}")
            return -1

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _safe_name(name: str) -> str:
        """Replace every non-alphanumeric character of ``name`` with ``_``."""
        return "".join(c if c.isalnum() else "_" for c in name)

    def _layer_output_path(self, layer_name: str, extension: str,
                           output_dir: Optional[str] = None) -> str:
        """Build ``<output_dir>/<pdf stem>_layer_<safe layer name><extension>``.

        ``output_dir`` defaults to the directory of the input PDF.
        """
        if output_dir is None:
            output_dir = os.path.dirname(self.pdf_path)
        base_name = os.path.splitext(os.path.basename(self.pdf_path))[0]
        file_name = f"{base_name}_layer_{self._safe_name(layer_name)}{extension}"
        return os.path.join(output_dir, file_name)

    def _show_only_layer(self, layer_name: str, document=None) -> bool:
        """Switch every layer off, then switch ``layer_name`` on.

        Args:
            layer_name: ``text`` of the layer UI entry to leave visible.
            document: Document to modify; defaults to ``self.document``.

        Returns:
            ``True`` when the visibility was changed, ``False`` when the
            document lacks the layer UI API, the layer is unknown, or an
            error occurred (the error is printed).
        """
        doc = self.document if document is None else document
        if not (hasattr(doc, "layer_ui_configs")
                and hasattr(doc, "set_layer_ui_config")):
            return False
        try:
            configs = doc.layer_ui_configs()
            target = next(
                (cfg["number"] for cfg in configs if cfg.get("text") == layer_name),
                None,
            )
            if target is None:
                return False
            for cfg in configs:
                doc.set_layer_ui_config(cfg["number"], pymupdf.PDF_OC_OFF)
            # Switch the target on explicitly (by number) instead of toggling.
            doc.set_layer_ui_config(target, pymupdf.PDF_OC_ON)
            return True
        except Exception as e:
            print(f"Error setting layer visibility: {e}")
            return False

    def _get_page_with_layer(self, page_num: int, layer_name: str,
                             dpi: int = 150) -> "fitz.Pixmap":
        """Render page ``page_num`` with only ``layer_name`` visible.

        If the layer cannot be selected the page is rendered with the
        document's current visibility state.

        Args:
            page_num: Zero-based page index.
            layer_name: Layer to leave visible.
            dpi: Rendering resolution.

        Returns:
            The rendered pixmap (no alpha channel).
        """
        self._show_only_layer(layer_name)
        # Render the requested page, not always the first one.
        return self.document[page_num].get_pixmap(alpha=False, dpi=dpi)

    # ------------------------------------------------------------------
    # Layer extraction
    # ------------------------------------------------------------------
    def extract_layer_text(self, layer_name: Optional[str] = None) -> Dict[str, str]:
        """Extract text from one layer, or from every layer.

        The text is read from the document's own pages while only the layer
        in question is switched on. This relies on the PDF engine skipping
        hidden optional content during text extraction; content that does
        not belong to any layer is always visible and therefore included.

        Args:
            layer_name: Layer to extract; ``None`` extracts every layer.

        Returns:
            Mapping of layer name to its text, each non-empty page prefixed
            with ``--- Page N ---``. A layer that fails maps to an error
            message. If the document has no layers the result is
            ``{"error": "No layers found in document"}``.
        """
        layers = self.get_layers()
        if not layers:
            return {"error": "No layers found in document"}

        result = {}
        for layer in ([layer_name] if layer_name else layers):
            try:
                if not self._show_only_layer(layer):
                    raise ValueError("layer not found or visibility could not be set")
                parts = []
                for page_num in range(len(self.document)):
                    text = self.document[page_num].get_text()
                    if text and text.strip():
                        parts.append(f"--- Page {page_num + 1} ---\n{text}\n\n")
                result[layer] = "".join(parts)
            except Exception as e:
                result[layer] = f"Error extracting layer {layer}: {str(e)}"
        return result

    def _require_layer(self, layer_name: str) -> None:
        """Raise ``ValueError`` unless ``layer_name`` is a layer of the document."""
        layers = self.get_layers()
        if not layers:
            raise ValueError("No layers found in the document")
        if layer_name not in layers:
            raise ValueError(f"Layer '{layer_name}' not found in the document. Available layers: {', '.join(layers)}")

    def extract_layer_to_pdf(self, layer_name: str, output_path: Optional[str] = None,
                             dpi: int = 150) -> str:
        """Render one layer of every page into a new, image-only PDF.

        Args:
            layer_name: Name of the layer to extract.
            output_path: Where to save the PDF. If ``None``, a name derived
                from the input file and the layer name is used.
            dpi: Rendering resolution.

        Returns:
            Path of the saved PDF file.

        Raises:
            ValueError: If the document has no layers or ``layer_name`` is
                not one of them.
        """
        self._require_layer(layer_name)
        if output_path is None:
            output_path = self._layer_output_path(layer_name, ".pdf")

        new_doc = fitz.open()
        try:
            for page_num in range(len(self.document)):
                try:
                    pix = self._get_page_with_layer(page_num, layer_name, dpi)
                    rect = self.document[page_num].rect
                    new_page = new_doc.new_page(width=rect.width, height=rect.height)
                    new_page.insert_image(new_page.rect, stream=pix.tobytes("png"))
                except Exception as e:
                    # Best effort: a failing page is reported and left out.
                    print(f"Error processing page {page_num} for layer {layer_name}: {e}")
            new_doc.save(output_path)
        finally:
            new_doc.close()
        return output_path

    def extract_layer_to_png(self, layer_name: str, output_path: Optional[str] = None,
                             dpi: int = 150) -> str:
        """Render one layer of every page to PNG image files.

        The first page is written to ``output_path``. Each further page N is
        written next to it as ``<stem>_pageN<ext>`` so that pages no longer
        overwrite each other.

        Args:
            layer_name: Name of the layer to extract.
            output_path: Where to save the first page. If ``None``, a name
                derived from the input file and the layer name is used.
            dpi: Rendering resolution.

        Returns:
            ``output_path`` (the file holding the first page).

        Raises:
            ValueError: If the document has no layers or ``layer_name`` is
                not one of them.
        """
        self._require_layer(layer_name)
        if output_path is None:
            output_path = self._layer_output_path(layer_name, ".png")

        stem, ext = os.path.splitext(output_path)
        for page_num in range(len(self.document)):
            target = output_path if page_num == 0 else f"{stem}_page{page_num + 1}{ext}"
            try:
                pix = self._get_page_with_layer(page_num, layer_name, dpi)
                pix.pil_save(target)
            except Exception as e:
                # Best effort: a failing page is reported and skipped.
                print(f"Error processing page {page_num} for layer {layer_name}: {e}")
        return output_path

    def _extract_all_layers(self, extract, extension: str,
                            output_dir: Optional[str], dpi: int) -> Dict[str, str]:
        """Run ``extract(layer, path, dpi)`` for every layer and collect the paths."""
        result = {}
        layers = self.get_layers()
        if not layers:
            return result

        if output_dir is None:
            output_dir = os.path.dirname(self.pdf_path)
        # An empty directory part means the current directory; makedirs("")
        # would raise.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        for layer in layers:
            try:
                output_path = self._layer_output_path(layer, extension, output_dir)
                result[layer] = extract(layer, output_path, dpi)
            except Exception as e:
                print(f"Error extracting layer {layer}: {e}")
        return result

    def extract_all_layers_to_pdfs(self, output_dir: Optional[str] = None,
                                   dpi: int = 150) -> Dict[str, str]:
        """Extract each layer to a separate PDF file.

        Args:
            output_dir: Directory for the output PDFs. If ``None``, the
                directory of the input file is used.
            dpi: Rendering resolution.

        Returns:
            Mapping of layer name to saved PDF path. Layers that fail are
            reported on stdout and omitted.
        """
        return self._extract_all_layers(self.extract_layer_to_pdf, ".pdf", output_dir, dpi)

    def extract_all_layers_to_pngs(self, output_dir: Optional[str] = None,
                                   dpi: int = 150) -> Dict[str, str]:
        """Extract each layer to PNG image files.

        Args:
            output_dir: Directory for the output images. If ``None``, the
                directory of the input file is used.
            dpi: Rendering resolution.

        Returns:
            Mapping of layer name to the PNG path of its first page. Layers
            that fail are reported on stdout and omitted.
        """
        return self._extract_all_layers(self.extract_layer_to_png, ".png", output_dir, dpi)

    # ------------------------------------------------------------------
    # Text removal
    # ------------------------------------------------------------------
    def remove_text(self, layer_name: Optional[str] = None,
                    output_path: Optional[str] = None) -> str:
        """Remove text from the PDF and save the result as a new file.

        The work is done on a fresh copy opened from ``pdf_path``. For a
        document with layers, each selected layer is made the only visible
        one before its pages are redacted; text that does not belong to any
        layer is always visible and is removed as well. A document without
        layers has all its text removed and ``layer_name`` is ignored.

        Args:
            layer_name: Layer to remove text from. ``None`` processes every
                layer.
            output_path: Where to save the modified PDF. If ``None``,
                ``<input stem>_no_text.pdf`` next to the input is used.

        Returns:
            Path of the saved PDF file.

        Raises:
            ValueError: If the document has layers and ``layer_name`` is not
                one of them.
        """
        if output_path is None:
            base_dir = os.path.dirname(self.pdf_path)
            base_name = os.path.splitext(os.path.basename(self.pdf_path))[0]
            output_path = os.path.join(base_dir, f"{base_name}_no_text.pdf")

        doc_copy = fitz.open(self.pdf_path)
        try:
            layers = self.get_layers()
            if layers and layer_name and layer_name not in layers:
                raise ValueError(f"Layer '{layer_name}' not found in the document")
            if layers:
                targets = [layer_name] if layer_name else layers
            else:
                targets = []

            if targets:
                for layer in targets:
                    self._show_only_layer(layer, doc_copy)
                    for page_num in range(len(doc_copy)):
                        self._remove_text_from_page(doc_copy[page_num])
            else:
                for page_num in range(len(doc_copy)):
                    self._remove_text_from_page(doc_copy[page_num])

            doc_copy.save(output_path)
            return output_path
        finally:
            # Always release the copy, including when validation fails.
            doc_copy.close()

    def _remove_text_from_page(self, page):
        """Redact every text area of ``page``, leaving images and drawings.

        Text rectangles come from the page's text blocks; if that fails,
        the bounding boxes of the individual spans are used instead. Errors
        are printed and the page is left as it is.
        """
        try:
            # Block tuples end with the block type; 0 marks a text block.
            # Image blocks must not be redacted.
            rects = [
                fitz.Rect(block[:4])
                for block in page.get_text("blocks")
                if block[6] == 0
            ]
        except Exception as e:
            print(f"Error removing text from page: {e}")
            try:
                rects = [
                    fitz.Rect(span["bbox"])
                    for block in page.get_text("dict")["blocks"]
                    for line in block.get("lines", [])
                    for span in line.get("spans", [])
                    if "bbox" in span
                ]
            except Exception as inner_e:
                print(f"Fallback text removal failed: {inner_e}")
                return

        if not rects:
            return
        try:
            for rect in rects:
                page.add_redact_annot(rect)
            # images=NONE / graphics=0: only text is removed, overlapping
            # images and vector graphics are kept.
            page.apply_redactions(images=fitz.PDF_REDACT_IMAGE_NONE, graphics=0)
        except Exception as e:
            print(f"Error removing text from page: {e}")

    # ------------------------------------------------------------------
    # Images
    # ------------------------------------------------------------------
    def extract_images(self, page_numbers: Optional[List[int]] = None) -> Dict[str, bytes]:
        """Extract the embedded images of the given pages, or of all pages.

        Args:
            page_numbers: Zero-based page indexes. ``None`` selects every
                page. Indexes outside the document are silently skipped.

        Returns:
            Mapping of ``page<N>_img<M>.<ext>`` (both one-based) to the raw
            image bytes. ``<ext>`` falls back to ``png`` when the extracted
            image reports none. Images that fail to extract are reported on
            stdout and omitted.
        """
        result = {}
        page_count = len(self.document)
        if page_numbers is None:
            page_numbers = range(page_count)

        for page_num in page_numbers:
            if not 0 <= page_num < page_count:
                continue
            image_list = self.document[page_num].get_images(full=True)
            for img_idx, img_info in enumerate(image_list):
                xref = img_info[0]  # first field is the image's xref
                try:
                    base_image = self.document.extract_image(xref)
                    if base_image:
                        image_ext = base_image.get("ext") or "png"
                        image_name = f"page{page_num+1}_img{img_idx+1}.{image_ext}"
                        result[image_name] = base_image["image"]
                except Exception as e:
                    print(f"Error extracting image: {e}")
        return result

    def extract_images_from_layer(self, layer_name: Optional[str] = None) -> Dict[str, bytes]:
        """Extract the images assigned to one layer, or to every layer.

        An image counts as belonging to a layer when the optional-content
        xref attached to the image object (``get_oc``) is the xref of an OCG
        whose name equals the layer name.
        NOTE(review): images that are placed in a layer only through marked
        content in the page stream, or through a membership dictionary, are
        not detected by this check.

        Args:
            layer_name: Layer to extract from; ``None`` processes every
                layer.

        Returns:
            Mapping of ``layer_<safe layer name>_page<N>_img<M>.<ext>`` to
            the raw image bytes. For a document without layers this is the
            result of :meth:`extract_images`.
        """
        layers = self.get_layers()
        if not layers:
            # Fallback to regular image extraction if there are no layers.
            return self.extract_images()

        result = {}
        try:
            ocgs = self.document.get_ocgs()
        except Exception as e:
            print(f"Error processing layer {layer_name}: {e}")
            return result

        for layer in ([layer_name] if layer_name else layers):
            layer_xrefs = {
                xref for xref, info in ocgs.items() if info.get("name") == layer
            }
            safe_layer = self._safe_name(layer)
            try:
                for page_num in range(len(self.document)):
                    image_list = self.document[page_num].get_images(full=True)
                    for img_idx, img_info in enumerate(image_list):
                        xref = img_info[0]
                        try:
                            if self.document.get_oc(xref) not in layer_xrefs:
                                continue
                            base_image = self.document.extract_image(xref)
                            if base_image:
                                image_ext = base_image.get("ext", "png")
                                image_name = f"layer_{safe_layer}_page{page_num+1}_img{img_idx+1}.{image_ext}"
                                result[image_name] = base_image["image"]
                        except Exception as e:
                            print(f"Error extracting image from layer {layer}: {e}")
            except Exception as e:
                print(f"Error processing layer {layer}: {e}")
        return result

    def save_images(self, images: Dict[str, bytes], output_dir: str) -> List[str]:
        """Write extracted images into ``output_dir``.

        Args:
            images: Mapping of file name to image bytes.
            output_dir: Target directory; created if it does not exist.

        Returns:
            Paths of the files that were written. Images that fail to save
            are reported on stdout and omitted.
        """
        os.makedirs(output_dir, exist_ok=True)
        saved_paths = []
        for img_name, img_data in images.items():
            try:
                img_path = os.path.join(output_dir, img_name)
                with open(img_path, "wb") as img_file:
                    img_file.write(img_data)
                saved_paths.append(img_path)
            except Exception as e:
                print(f"Error saving image {img_name}: {e}")
        return saved_paths