import os
import re
from typing import Any, Dict, List, Optional

import fitz  # PyMuPDF
import pymupdf


class PDFExtractor:
    """Extract text, images and optional-content layers from a PDF file.

    The document is opened in ``__init__`` and stays open until ``close()``
    is called (or the instance is used as a context manager).

    Layer-related methods change the layer visibility of ``self.document``
    through ``set_layer_ui_config`` and do not restore the previous state.
    """

    def __init__(self, pdf_path: str):
        """Open the PDF located at ``pdf_path``."""
        self.pdf_path = pdf_path
        self.document = fitz.open(pdf_path)

    def __enter__(self) -> "PDFExtractor":
        """Return the extractor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the underlying document when leaving a ``with`` block."""
        self.close()

    # ------------------------------------------------------------------
    # Basic document information
    # ------------------------------------------------------------------

    def count_pages(self) -> int:
        """Return the number of pages in the PDF."""
        return len(self.document)

    def get_metadata(self) -> Dict[str, Any]:
        """Return the document's ``metadata`` attribute as provided by PyMuPDF."""
        return self.document.metadata

    def get_text(self, page_numbers: Optional[List[int]] = None) -> Dict[int, str]:
        """Extract text from the given pages, or from all pages.

        Args:
            page_numbers: 0-based page indices. ``None`` means every page.
                Indices outside the document are silently skipped.

        Returns:
            Mapping of page index to the text of that page.
        """
        page_count = len(self.document)
        if page_numbers is None:
            page_numbers = range(page_count)
        return {
            index: self.document[index].get_text()
            for index in page_numbers
            if 0 <= index < page_count
        }

    # ------------------------------------------------------------------
    # Layer discovery
    # ------------------------------------------------------------------

    @staticmethod
    def _layer_names(document) -> List[str]:
        """Return the layer names of ``document`` (empty list on any error)."""
        try:
            if hasattr(document, "layer_ui_configs"):
                configs = document.layer_ui_configs()
                if configs:
                    return [
                        cfg.get("text", f"Layer_{i}")
                        for i, cfg in enumerate(configs)
                    ]
            # Only consulted when the layer_ui_configs API is missing.
            elif hasattr(document, "layers"):
                return list(document.layers)
            return []
        except Exception as e:
            print(f"Error getting layers: {e}")
            return []

    def get_layers(self) -> List[str]:
        """Get available layers (OCGs - Optional Content Groups) in the PDF.

        Returns:
            The ``text`` of every entry of ``layer_ui_configs()``; an empty
            list when there are none or when querying them fails.
        """
        return self._layer_names(self.document)

    def _get_ocg_xrefs(self) -> List[int]:
        """Get xrefs for all OCG objects in the document.

        Returns:
            xrefs of every object whose ``/Type`` is ``/OCG``; an empty list
            when the catalog has no ``/OCProperties`` or on error.
        """
        try:
            # pdf_catalog() yields the xref (an int) of the catalog object,
            # so its keys have to be read through xref_get_key().
            catalog_xref = self.document.pdf_catalog()
            if not catalog_xref:
                return []
            kind, _ = self.document.xref_get_key(catalog_xref, "OCProperties")
            if kind == "null":
                return []

            ocg_xrefs = []
            for xref in range(1, self.document.xref_length()):
                try:
                    key = tuple(self.document.xref_get_key(xref, "Type"))
                except Exception:
                    # Objects that are not dictionaries cannot be queried.
                    continue
                if key == ("name", "/OCG"):
                    ocg_xrefs.append(xref)
            return ocg_xrefs
        except Exception as e:
            print(f"Error getting OCG xrefs: {e}")
            return []

    def get_layer_configs(self) -> List[Dict[str, Any]]:
        """Get full layer configuration data including state information.

        Returns:
            The result of ``layer_ui_configs()`` when that API exists.
            Otherwise a list of ``{"id": xref, "name": ...}`` dictionaries
            built from the OCG objects, using ``Layer_<xref>`` when the
            object's ``/Name`` cannot be read. Empty list on error.
        """
        try:
            if hasattr(self.document, "layer_ui_configs"):
                return self.document.layer_ui_configs()

            # Fallback: build minimal configs from the raw OCG objects.
            configs = []
            for xref in self._get_ocg_xrefs():
                name = f"Layer_{xref}"
                try:
                    kind, value = self.document.xref_get_key(xref, "Name")
                    if kind == "string" and value:
                        name = value
                except Exception as e:
                    print(f"Error getting OCG details for {xref}: {e}")
                configs.append({"id": xref, "name": name})
            return configs
        except Exception as e:
            print(f"Error getting layer configs: {e}")
            return []

    def _get_layer_number(self, layer_name: str) -> int:
        """Get the internal number/id of a layer by its name.

        Returns:
            The ``id`` (or, failing that, ``number``) of the first config
            whose ``name`` or ``text`` equals ``layer_name``; ``-1`` when no
            config matches or an error occurs.
        """
        try:
            for cfg in self.get_layer_configs():
                if layer_name in (cfg.get("name"), cfg.get("text")):
                    return cfg.get("id", cfg.get("number", -1))
            return -1
        except Exception as e:
            print(f"Error getting layer number: {e}")
            return -1

    def _require_layer(self, layer_name: str) -> None:
        """Raise ``ValueError`` unless ``layer_name`` is a layer of the document."""
        layers = self.get_layers()
        if not layers:
            raise ValueError("No layers found in the document")
        if layer_name not in layers:
            raise ValueError(
                f"Layer '{layer_name}' not found in the document. "
                f"Available layers: {', '.join(layers)}"
            )

    def _layer_output_path(
        self, layer_name: str, extension: str, output_dir: Optional[str] = None
    ) -> str:
        """Build ``<dir>/<pdf name>_layer_<safe layer name>.<extension>``.

        ``output_dir`` defaults to the directory of the input PDF.
        Non-alphanumeric characters of the layer name become ``_``.
        """
        if output_dir is None:
            output_dir = os.path.dirname(self.pdf_path)
        base_name = os.path.splitext(os.path.basename(self.pdf_path))[0]
        safe_layer_name = "".join(c if c.isalnum() else "_" for c in layer_name)
        return os.path.join(
            output_dir, f"{base_name}_layer_{safe_layer_name}.{extension}"
        )

    # ------------------------------------------------------------------
    # Layer visibility / rendering
    # ------------------------------------------------------------------

    def _show_only_layer(self, layer_name: str, document=None) -> bool:
        """Switch every UI layer off except ``layer_name``.

        Args:
            layer_name: Layer name as reported by ``get_layers()``.
            document: Document to modify; defaults to ``self.document``.

        Returns:
            ``True`` when the layer was found and switched on, ``False`` when
            the layer API is unavailable or no layer has that name (in which
            case visibility is left untouched).
        """
        doc = self.document if document is None else document
        if not (
            hasattr(doc, "layer_ui_configs") and hasattr(doc, "set_layer_ui_config")
        ):
            return False

        configs = doc.layer_ui_configs()
        target = next(
            (
                cfg["number"]
                for i, cfg in enumerate(configs)
                if cfg.get("text", f"Layer_{i}") == layer_name
            ),
            None,
        )
        if target is None:
            return False

        for cfg in configs:
            doc.set_layer_ui_config(cfg["number"], pymupdf.PDF_OC_OFF)
        # Explicit ON rather than the toggle action, so the outcome does not
        # depend on the state the layer happens to be in.
        doc.set_layer_ui_config(target, pymupdf.PDF_OC_ON)
        return True

    def _get_page_with_layer(
        self, page_num: int, layer_name: str, dpi: int = 150
    ) -> fitz.Pixmap:
        """Render one page with only the given layer switched on.

        Args:
            page_num: 0-based index of the page to render.
            layer_name: Layer to isolate. When it cannot be isolated the page
                is rendered with the current visibility.
            dpi: Rendering resolution.

        Returns:
            Pixmap of the page without alpha channel.
        """
        try:
            self._show_only_layer(layer_name)
        except Exception as e:
            # Best effort: still render the page with whatever is visible.
            print(f"Error setting layer visibility: {e}")

        return self.document[page_num].get_pixmap(alpha=False, dpi=dpi)

    # ------------------------------------------------------------------
    # Layer extraction
    # ------------------------------------------------------------------

    def extract_layer_text(self, layer_name: Optional[str] = None) -> Dict[str, str]:
        """Extract text from a specific layer or all layers.

        Each layer is isolated with ``_show_only_layer`` and the text is then
        read from the real pages (a rasterized page carries no text).
        NOTE(review): this relies on PyMuPDF text extraction skipping hidden
        optional content - confirm against the PyMuPDF version in use.
        Content that belongs to no layer is presumably included for every
        layer.

        Args:
            layer_name: Layer to extract; ``None`` (or empty) means all layers.

        Returns:
            Mapping of layer name to its text, where every non-blank page is
            preceded by a ``--- Page N ---`` header. A layer that fails maps
            to an error message. ``{"error": ...}`` when the document has no
            layers.
        """
        layers = self.get_layers()
        if not layers:
            return {"error": "No layers found in document"}

        result = {}
        for layer in [layer_name] if layer_name else layers:
            try:
                if not self._show_only_layer(layer):
                    raise ValueError(f"could not isolate layer '{layer}'")
                chunks = []
                for page_num in range(len(self.document)):
                    text = self.document[page_num].get_text()
                    if text and text.strip():
                        chunks.append(f"--- Page {page_num + 1} ---\n{text}\n\n")
                result[layer] = "".join(chunks)
            except Exception as e:
                result[layer] = f"Error extracting layer {layer}: {str(e)}"
        return result

    def extract_layer_to_pdf(
        self, layer_name: str, output_path: Optional[str] = None, dpi: int = 150
    ) -> str:
        """Render a specific layer into a new, image-only PDF file.

        Args:
            layer_name: Name of the layer to extract.
            output_path: Path of the output PDF. If None, a name is generated
                next to the input file from the input name and the layer name.
            dpi: Resolution used to render each page.

        Returns:
            Path to the saved PDF file.

        Raises:
            ValueError: If the document has no layers or ``layer_name`` is
                not one of them.
        """
        self._require_layer(layer_name)
        if output_path is None:
            output_path = self._layer_output_path(layer_name, "pdf")

        new_doc = fitz.open()
        try:
            for page_num in range(len(self.document)):
                try:
                    pix = self._get_page_with_layer(page_num, layer_name, dpi)
                    source_rect = self.document[page_num].rect
                    new_page = new_doc.new_page(
                        width=source_rect.width, height=source_rect.height
                    )
                    new_page.insert_image(new_page.rect, stream=pix.tobytes("png"))
                except Exception as e:
                    # Continue with the next page.
                    print(
                        f"Error processing page {page_num} for layer {layer_name}: {e}"
                    )
            new_doc.save(output_path)
        finally:
            new_doc.close()
        return output_path

    def extract_layer_to_png(
        self, layer_name: str, output_path: Optional[str] = None, dpi: int = 150
    ) -> str:
        """Render a specific layer to PNG image file(s).

        The first page is written to ``output_path``. Every further page is
        written next to it with ``_page<N>`` (1-based) inserted before the
        extension, so pages no longer overwrite each other.

        Args:
            layer_name: Name of the layer to extract.
            output_path: Path of the image for the first page. If None, a
                name is generated next to the input file from the input name
                and the layer name.
            dpi: Resolution used to render each page.

        Returns:
            ``output_path`` (the image of the first page).

        Raises:
            ValueError: If the document has no layers or ``layer_name`` is
                not one of them.
        """
        self._require_layer(layer_name)
        if output_path is None:
            output_path = self._layer_output_path(layer_name, "png")

        stem, extension = os.path.splitext(output_path)
        for page_num in range(len(self.document)):
            try:
                pix = self._get_page_with_layer(page_num, layer_name, dpi)
                if page_num == 0:
                    target = output_path
                else:
                    target = f"{stem}_page{page_num + 1}{extension}"
                pix.pil_save(target)
            except Exception as e:
                # Continue with the next page.
                print(f"Error processing page {page_num} for layer {layer_name}: {e}")
        return output_path

    def _extract_all_layers(
        self, extract_one, extension: str, output_dir: Optional[str], dpi: int
    ) -> Dict[str, str]:
        """Run ``extract_one(layer, path, dpi)`` for every layer.

        Returns a mapping of layer name to the path reported by
        ``extract_one``; layers that raise are reported and left out.
        """
        result = {}
        layers = self.get_layers()
        if not layers:
            return result

        if output_dir is None:
            output_dir = os.path.dirname(self.pdf_path)
        # dirname() is "" for a bare file name; makedirs("") would raise.
        output_dir = output_dir or "."
        os.makedirs(output_dir, exist_ok=True)

        for layer in layers:
            try:
                output_path = self._layer_output_path(layer, extension, output_dir)
                result[layer] = extract_one(layer, output_path, dpi)
            except Exception as e:
                print(f"Error extracting layer {layer}: {e}")
        return result

    def extract_all_layers_to_pdfs(
        self, output_dir: Optional[str] = None, dpi: int = 150
    ) -> Dict[str, str]:
        """Extract each layer to a separate PDF file.

        Args:
            output_dir: Directory for the output PDFs (created if missing).
                If None, the directory of the input file is used.
            dpi: Resolution used to render each page.

        Returns:
            Dictionary mapping layer names to saved PDF file paths.
        """
        return self._extract_all_layers(
            self.extract_layer_to_pdf, "pdf", output_dir, dpi
        )

    def extract_all_layers_to_pngs(
        self, output_dir: Optional[str] = None, dpi: int = 150
    ) -> Dict[str, str]:
        """Extract each layer to PNG image file(s).

        Args:
            output_dir: Directory for the output images (created if missing).
                If None, the directory of the input file is used.
            dpi: Resolution used to render each page.

        Returns:
            Dictionary mapping layer names to the path of the first page's
            image (see ``extract_layer_to_png`` for further pages).
        """
        return self._extract_all_layers(
            self.extract_layer_to_png, "png", output_dir, dpi
        )

    def close(self) -> None:
        """Close the PDF document."""
        self.document.close()

    # ------------------------------------------------------------------
    # Text removal
    # ------------------------------------------------------------------

    def remove_text(
        self, layer_name: Optional[str] = None, output_path: Optional[str] = None
    ) -> str:
        """Remove text and save the result to a new PDF.

        Works on a freshly opened copy of ``self.pdf_path``; ``self.document``
        is not modified. For each processed layer, the layer is isolated and
        the text found on every page is redacted.
        NOTE(review): redaction works on rectangles, so text of other layers
        (or of no layer) that lies inside a redacted area is presumably
        removed as well - confirm if strict per-layer removal is required.

        Args:
            layer_name: Layer to process. If None, every layer is processed.
                Ignored when the document has no layers (all pages are
                processed as they are).
            output_path: Path of the modified PDF. If None,
                ``<input name>_no_text.pdf`` next to the input file is used.

        Returns:
            The path to the saved PDF file.

        Raises:
            ValueError: If the document has layers and ``layer_name`` is not
                one of them.
        """
        if output_path is None:
            base_dir = os.path.dirname(self.pdf_path)
            base_name = os.path.splitext(os.path.basename(self.pdf_path))[0]
            output_path = os.path.join(base_dir, f"{base_name}_no_text.pdf")

        doc_copy = fitz.open(self.pdf_path)
        try:
            # Validation happens inside the try so the copy is always closed.
            available = self._layer_names(doc_copy)
            if not available:
                layers_to_process = []
            elif layer_name:
                if layer_name not in available:
                    raise ValueError(
                        f"Layer '{layer_name}' not found in the document"
                    )
                layers_to_process = [layer_name]
            else:
                layers_to_process = available

            if layers_to_process:
                for layer in layers_to_process:
                    self._show_only_layer(layer, doc_copy)
                    for page_num in range(len(doc_copy)):
                        self._remove_text_from_page(doc_copy[page_num])
            else:
                for page_num in range(len(doc_copy)):
                    self._remove_text_from_page(doc_copy[page_num])

            doc_copy.save(output_path)
            return output_path
        finally:
            doc_copy.close()

    def _remove_text_from_page(self, page):
        """Redact the text of ``page`` in place.

        First every distinct word is searched and redacted, then the
        rectangle of every remaining text block. If that raises, the error is
        printed and a fallback redacts the bounding box of every text span.
        Errors of the fallback are printed, never raised.
        """
        try:
            text = page.get_text()
            if text and text.strip():
                # Pass 1: redact each distinct word (search each only once).
                for word in dict.fromkeys(text.split()):
                    for hit in page.search_for(word):
                        page.add_redact_annot(hit, text=" ")
                page.apply_redactions()

            # Pass 2: redact whatever text blocks are still reported.
            for block in page.get_text("blocks"):
                if len(block) < 4:
                    continue
                # Entry 6 is the block type; non-zero means "not text"
                # (e.g. an image), which must not be wiped here.
                if len(block) >= 7 and block[6] != 0:
                    continue
                page.add_redact_annot(fitz.Rect(block[:4]), text=" ")
            page.apply_redactions()

        except Exception as e:
            # Log the error but continue processing other pages.
            print(f"Error removing text from page: {e}")

            try:
                for block in page.get_text("dict")["blocks"]:
                    for line in block.get("lines", []):
                        for span in line.get("spans", []):
                            # "size" is the font size (a number), so the
                            # rectangle has to come from the span's bbox.
                            bbox = span.get("bbox")
                            if bbox:
                                page.add_redact_annot(fitz.Rect(bbox), text=" ")
                page.apply_redactions()
            except Exception as inner_e:
                print(f"Fallback text removal failed: {inner_e}")

    # ------------------------------------------------------------------
    # Images
    # ------------------------------------------------------------------

    def _collect_page_images(
        self,
        page_num: int,
        name_prefix: str = "",
        layer_xrefs: Optional[set] = None,
        error_context: str = "",
    ) -> Dict[str, bytes]:
        """Extract the images referenced by one page.

        Args:
            page_num: 0-based page index.
            name_prefix: Text put in front of ``page<N>_img<M>.<ext>``.
            layer_xrefs: When given, only images whose ``get_oc()`` value is
                in this set are kept.
            error_context: Text appended to "Error extracting image" in the
                printed error message.

        Returns:
            Mapping of generated image name to the raw image bytes.
        """
        images = {}
        page = self.document[page_num]
        for img_idx, img_info in enumerate(page.get_images(full=True)):
            xref = img_info[0]  # Cross-reference number
            try:
                if (
                    layer_xrefs is not None
                    and self.document.get_oc(xref) not in layer_xrefs
                ):
                    continue
                base_image = self.document.extract_image(xref)
                if not base_image:
                    continue
                image_ext = base_image.get("ext") or "png"
                image_name = (
                    f"{name_prefix}page{page_num + 1}_img{img_idx + 1}.{image_ext}"
                )
                images[image_name] = base_image["image"]
            except Exception as e:
                print(f"Error extracting image{error_context}: {e}")
        return images

    def extract_images(
        self, page_numbers: Optional[List[int]] = None
    ) -> Dict[str, bytes]:
        """Extract images from specified pages or all pages.

        Args:
            page_numbers: 0-based page indices. ``None`` means every page.
                Indices outside the document are skipped.

        Returns:
            Mapping of ``page<N>_img<M>.<ext>`` (1-based) to image bytes.
        """
        page_count = len(self.document)
        if page_numbers is None:
            page_numbers = range(page_count)

        result = {}
        for page_num in page_numbers:
            if 0 <= page_num < page_count:
                result.update(self._collect_page_images(page_num))
        return result

    def _ocg_names(self) -> Dict[int, str]:
        """Return ``{xref: name}`` for the document's OCGs (empty on error)."""
        if not hasattr(self.document, "get_ocgs"):
            return {}
        try:
            return {
                xref: info.get("name", f"Layer_{xref}")
                for xref, info in self.document.get_ocgs().items()
            }
        except Exception as e:
            print(f"Error getting OCGs: {e}")
            return {}

    def extract_images_from_layer(
        self, layer_name: Optional[str] = None
    ) -> Dict[str, bytes]:
        """Extract images from a specific layer or all layers.

        An image is attributed to a layer when ``get_oc()`` of the image
        returns the xref of an OCG with that name.
        NOTE(review): images that are made optional only through
        marked-content sections or OCMDs are presumably not matched this
        way - confirm whether that case matters for the input PDFs.

        Args:
            layer_name: Layer to extract from; ``None`` means every layer.

        Returns:
            Mapping of ``layer_<layer>_page<N>_img<M>.<ext>`` to image bytes.
            When the document has no OCGs, the result of
            ``extract_images()`` is returned instead.
        """
        ocg_names = self._ocg_names()
        if not ocg_names:
            # Fallback to regular image extraction if no layers.
            return self.extract_images()

        if layer_name:
            layers_to_extract = [layer_name]
        else:
            layers_to_extract = list(dict.fromkeys(ocg_names.values()))

        result = {}
        for layer in layers_to_extract:
            try:
                layer_xrefs = {
                    xref for xref, name in ocg_names.items() if name == layer
                }
                for page_num in range(len(self.document)):
                    result.update(
                        self._collect_page_images(
                            page_num,
                            name_prefix=f"layer_{layer}_",
                            layer_xrefs=layer_xrefs,
                            error_context=f" from layer {layer}",
                        )
                    )
            except Exception as e:
                print(f"Error processing layer {layer}: {e}")
        return result

    def save_images(self, images: Dict[str, bytes], output_dir: str) -> List[str]:
        """Save extracted images to the specified directory.

        Args:
            images: Mapping of file name to image bytes.
            output_dir: Target directory (created if missing).

        Returns:
            Paths of the files that were written; failures are printed and
            skipped.
        """
        saved_paths = []
        os.makedirs(output_dir, exist_ok=True)

        for img_name, img_data in images.items():
            try:
                # Names can contain text taken from the PDF (layer names);
                # keep only the final component so nothing is written
                # outside output_dir.
                img_path = os.path.join(output_dir, os.path.basename(img_name))
                with open(img_path, "wb") as img_file:
                    img_file.write(img_data)
                saved_paths.append(img_path)
            except Exception as e:
                print(f"Error saving image {img_name}: {e}")
        return saved_paths