# Source: pdf_layers_extractor / pdf_extractor.py
# Uploaded by namtr92 ("Upload 4 files", commit 25fbdee, verified).
import fitz # PyMuPDF
import re
import os
import pymupdf
from typing import List, Dict, Any, Optional
class PDFExtractor:
    """Extract text, images and optional-content layers from a PDF via PyMuPDF.

    The instance keeps one open PyMuPDF document (``self.document``) for its
    whole lifetime. Call :meth:`close` when done, or use the instance as a
    context manager. Errors inside best-effort operations are reported with
    ``print`` and swallowed, matching the rest of this module.
    """

    def __init__(self, pdf_path: str):
        """Open the PDF at ``pdf_path`` and remember the path for output naming."""
        self.pdf_path = pdf_path
        self.document = fitz.open(pdf_path)

    def __enter__(self) -> "PDFExtractor":
        """Return the extractor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the underlying document when leaving a ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the PDF document; calling it again on a closed document is a no-op."""
        if not getattr(self.document, "is_closed", False):
            self.document.close()

    # ------------------------------------------------------------------
    # Basic document information
    # ------------------------------------------------------------------
    def count_pages(self) -> int:
        """Return the number of pages in the PDF."""
        return len(self.document)

    def get_metadata(self) -> Dict[str, Any]:
        """Return the document's ``metadata`` attribute unchanged."""
        return self.document.metadata

    def get_text(self, page_numbers: Optional[List[int]] = None) -> Dict[int, str]:
        """Extract text from the given 0-based pages, or from every page.

        Args:
            page_numbers: Page indices to read. ``None`` means all pages.
                Indices outside the document are silently skipped.

        Returns:
            Mapping of page index to that page's text.
        """
        page_total = len(self.document)
        if page_numbers is None:
            page_numbers = range(page_total)
        return {
            index: self.document[index].get_text()
            for index in page_numbers
            if 0 <= index < page_total
        }

    # ------------------------------------------------------------------
    # Layer discovery
    # ------------------------------------------------------------------
    @staticmethod
    def _layer_names(document) -> List[str]:
        """Return the layer names of ``document``; an empty list on any error."""
        try:
            if hasattr(document, "layer_ui_configs"):
                configs = document.layer_ui_configs()
                if configs:
                    return [cfg.get("text", f"Layer_{i}") for i, cfg in enumerate(configs)]
            elif hasattr(document, "layers"):
                # Kept for document objects that expose a plain ``layers`` attribute.
                return list(document.layers)
            return []
        except Exception as e:
            print(f"Error getting layers: {e}")
            return []

    def get_layers(self) -> List[str]:
        """Get available layers (OCGs - Optional Content Groups) in the PDF."""
        return self._layer_names(self.document)

    def _get_ocg_xrefs(self) -> List[int]:
        """Get xrefs for all OCGs in the document.

        Prefers ``Document.get_ocgs()`` (a dict keyed by xref). Otherwise it
        checks the catalog for ``OCProperties`` and scans every object source
        for ``/Type/OCG``.
        """
        try:
            if hasattr(self.document, "get_ocgs"):
                return sorted(self.document.get_ocgs())
            # pdf_catalog() yields the catalog's xref (an int), so the key has
            # to be looked up on that object rather than tested with ``in``.
            catalog = self.document.pdf_catalog()
            if not catalog:
                return []
            if self.document.xref_get_key(catalog, "OCProperties")[0] == "null":
                return []
            ocg_xrefs = []
            for xref in range(1, self.document.xref_length()):
                try:
                    obj = self.document.xref_object(xref, compressed=True)
                except Exception:
                    continue
                # xref_object returns text; the object source starts with
                # "<<", so search for the type entry instead of a prefix.
                if obj and "/Type/OCG" in obj:
                    ocg_xrefs.append(xref)
            return ocg_xrefs
        except Exception as e:
            print(f"Error getting OCG xrefs: {e}")
            return []

    def get_layer_configs(self) -> List[Dict[str, Any]]:
        """Get full layer configuration data including state information.

        Returns:
            ``Document.layer_ui_configs()`` when available. Otherwise one dict
            per OCG xref with at least ``"id"`` and ``"name"``, enriched with
            the ``get_ocgs()`` details when that method exists. Empty list on
            error or when no layers are found.
        """
        try:
            if hasattr(self.document, "layer_ui_configs"):
                return self.document.layer_ui_configs()
            # Fallback: build configs manually from the OCG xrefs.
            ocg_details = {}
            if hasattr(self.document, "get_ocgs"):
                ocg_details = self.document.get_ocgs() or {}
            configs = []
            for xref in self._get_ocg_xrefs():
                entry = {"id": xref, "name": f"Layer_{xref}"}
                entry.update(ocg_details.get(xref, {}))
                configs.append(entry)
            return configs
        except Exception as e:
            print(f"Error getting layer configs: {e}")
            return []

    def _get_layer_number(self, layer_name: str) -> int:
        """Get the internal number/id of a layer by its name, or -1 if unknown."""
        try:
            for cfg in self.get_layer_configs():
                # Configs may carry the label under "name" or "text".
                if layer_name in (cfg.get("name"), cfg.get("text")):
                    return cfg.get("id", cfg.get("number", -1))
            return -1
        except Exception as e:
            print(f"Error getting layer number: {e}")
            return -1

    # ------------------------------------------------------------------
    # Layer visibility helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _apply_layer_states(document, states: Dict[int, bool]) -> None:
        """Switch each layer UI entry in ``states`` (number -> visible) on or off."""
        for number, visible in states.items():
            # Explicit ON/OFF: action value 1 would be a toggle, not "on".
            action = pymupdf.PDF_OC_ON if visible else pymupdf.PDF_OC_OFF
            document.set_layer_ui_config(number, action)

    @staticmethod
    def _isolate_layer(document, layer_name: str) -> Optional[Dict[int, bool]]:
        """Make ``layer_name`` the only visible layer of ``document``.

        Returns:
            The previous visibility of every layer (number -> visible) so the
            caller can hand it to :meth:`_restore_layers`, or ``None`` when the
            document has no layer UI API, the layer is unknown, or the current
            configuration could not be read (nothing was changed then).
        """
        if not (hasattr(document, "layer_ui_configs")
                and hasattr(document, "set_layer_ui_config")):
            return None
        try:
            configs = document.layer_ui_configs()
        except Exception as e:
            print(f"Error setting layer visibility: {e}")
            return None
        target = next(
            (cfg["number"] for cfg in configs if cfg.get("text") == layer_name),
            None,
        )
        if target is None:
            return None
        previous = {cfg["number"]: bool(cfg.get("on")) for cfg in configs}
        try:
            PDFExtractor._apply_layer_states(
                document, {number: number == target for number in previous}
            )
        except Exception as e:
            # Best effort: still return the snapshot so the caller can undo
            # whatever part of the switch did happen.
            print(f"Error setting layer visibility: {e}")
        return previous

    @staticmethod
    def _restore_layers(document, previous: Optional[Dict[int, bool]]) -> None:
        """Undo :meth:`_isolate_layer`; a ``None`` snapshot means nothing to do."""
        if previous is None:
            return
        try:
            PDFExtractor._apply_layer_states(document, previous)
        except Exception as e:
            print(f"Error restoring layer visibility: {e}")

    # ------------------------------------------------------------------
    # Output naming helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _safe_name(name: str) -> str:
        """Replace every non-alphanumeric character of ``name`` with ``_``."""
        return "".join(c if c.isalnum() else "_" for c in name)

    @staticmethod
    def _numbered_path(path: str, page_num: int) -> str:
        """Return ``path`` for page 0, else ``path`` with ``_page<N>`` before the extension."""
        if page_num == 0:
            return path
        root, ext = os.path.splitext(path)
        return f"{root}_page{page_num + 1}{ext}"

    def _default_layer_path(self, layer_name: str, extension: str,
                            output_dir: Optional[str] = None) -> str:
        """Build ``<dir>/<input stem>_layer_<safe layer name><extension>``.

        ``output_dir`` defaults to the directory part of ``self.pdf_path``.
        """
        base_dir = os.path.dirname(self.pdf_path) if output_dir is None else output_dir
        base_name = os.path.splitext(os.path.basename(self.pdf_path))[0]
        file_name = f"{base_name}_layer_{self._safe_name(layer_name)}{extension}"
        return os.path.join(base_dir, file_name)

    # ------------------------------------------------------------------
    # Layer extraction
    # ------------------------------------------------------------------
    def extract_layer_text(self, layer_name: Optional[str] = None) -> Dict[str, str]:
        """Extract text from a specific layer or all layers.

        Text is read straight from the pages while only the requested layer is
        switched on; the previous visibility is restored afterwards. (Reading
        text from a rasterised copy of the page, as done previously, always
        gave an empty string because an image carries no text.)

        NOTE(review): this relies on PyMuPDF text extraction skipping hidden
        optional content - confirm against the PyMuPDF version in use.

        Args:
            layer_name: Layer to read. ``None`` or empty means every layer.

        Returns:
            Mapping of layer name to its text, each non-empty page prefixed
            with ``--- Page N ---``. A failing layer maps to an error message.
            ``{"error": ...}`` when the document has no layers.
        """
        layers = self.get_layers()
        if not layers:
            return {"error": "No layers found in document"}
        result = {}
        for layer in ([layer_name] if layer_name else layers):
            if layer not in layers:
                result[layer] = f"Error extracting layer {layer}: layer not found"
                continue
            previous = self._isolate_layer(self.document, layer)
            try:
                parts = []
                for page_num in range(len(self.document)):
                    text = self.document[page_num].get_text()
                    if text and text.strip():
                        parts.append(f"--- Page {page_num + 1} ---\n{text}\n\n")
                result[layer] = "".join(parts)
            except Exception as e:
                result[layer] = f"Error extracting layer {layer}: {str(e)}"
            finally:
                self._restore_layers(self.document, previous)
        return result

    def _get_page_with_layer(self, page_num: int, layer_name: str,
                             dpi: int = 150) -> "fitz.Pixmap":
        """Render page ``page_num`` with only ``layer_name`` visible.

        Visibility switching is best effort: if the layer is unknown or the
        switch fails, the page is rendered with the current visibility. The
        visibility that was active before the call is restored afterwards.

        Args:
            page_num: 0-based page index to render.
            layer_name: Layer to leave switched on.
            dpi: Resolution passed to ``Page.get_pixmap``.

        Returns:
            The rendered pixmap (no alpha channel).
        """
        previous = self._isolate_layer(self.document, layer_name)
        try:
            # Load the page after switching layers, and load the requested
            # page rather than always page 0.
            page = self.document[page_num]
            return page.get_pixmap(alpha=False, dpi=dpi)
        finally:
            self._restore_layers(self.document, previous)

    def _require_layer(self, layer_name: str) -> None:
        """Raise ``ValueError`` unless ``layer_name`` is a layer of the document."""
        layers = self.get_layers()
        if not layers:
            raise ValueError("No layers found in the document")
        if layer_name not in layers:
            raise ValueError(f"Layer '{layer_name}' not found in the document. Available layers: {', '.join(layers)}")

    def extract_layer_to_pdf(self, layer_name: str, output_path: Optional[str] = None,
                             dpi: int = 150) -> str:
        """
        Extract a specific layer to a new PDF file of rasterised pages.

        Args:
            layer_name: Name of the layer to extract
            output_path: Path to save the output PDF. If None, generate a name based on layer.
            dpi: Rendering resolution for each page image.

        Returns:
            Path to the saved PDF file

        Raises:
            ValueError: If the document has no layers or ``layer_name`` is unknown.
        """
        self._require_layer(layer_name)
        if output_path is None:
            output_path = self._default_layer_path(layer_name, ".pdf")
        new_doc = fitz.open()
        try:
            for page_num in range(len(self.document)):
                try:
                    pix = self._get_page_with_layer(page_num, layer_name, dpi)
                    rect = self.document[page_num].rect
                    new_page = new_doc.new_page(width=rect.width, height=rect.height)
                    new_page.insert_image(new_page.rect, stream=pix.tobytes("png"))
                except Exception as e:
                    # A failing page is reported and skipped.
                    print(f"Error processing page {page_num} for layer {layer_name}: {e}")
            new_doc.save(output_path)
        finally:
            # Close the scratch document even when saving fails.
            new_doc.close()
        return output_path

    def extract_layer_to_png(self, layer_name: str, output_path: Optional[str] = None,
                             dpi: int = 150) -> str:
        """
        Extract a specific layer to PNG image(s), one per page.

        The first page is written to ``output_path``. Every further page goes
        to the same name with ``_page<N>`` inserted before the extension, so
        pages no longer overwrite each other.

        Args:
            layer_name: Name of the layer to extract
            output_path: Path for the first page's image. If None, generate a name based on layer.
            dpi: Rendering resolution.

        Returns:
            Path of the first page's image (``output_path``).

        Raises:
            ValueError: If the document has no layers or ``layer_name`` is unknown.
        """
        self._require_layer(layer_name)
        if output_path is None:
            output_path = self._default_layer_path(layer_name, ".png")
        for page_num in range(len(self.document)):
            try:
                pix = self._get_page_with_layer(page_num, layer_name, dpi)
                pix.pil_save(self._numbered_path(output_path, page_num))
            except Exception as e:
                # A failing page is reported and skipped.
                print(f"Error processing page {page_num} for layer {layer_name}: {e}")
        return output_path

    def _extract_all_layers(self, extract_one, extension: str,
                            output_dir: Optional[str], dpi: int) -> Dict[str, str]:
        """Run ``extract_one(layer, path, dpi)`` for every layer and collect the paths."""
        result = {}
        layers = self.get_layers()
        if not layers:
            return result
        if output_dir is None:
            # dirname() is "" for a bare file name; makedirs("") would raise.
            output_dir = os.path.dirname(self.pdf_path) or "."
        os.makedirs(output_dir, exist_ok=True)
        for layer in layers:
            try:
                output_path = self._default_layer_path(layer, extension, output_dir)
                result[layer] = extract_one(layer, output_path, dpi)
            except Exception as e:
                print(f"Error extracting layer {layer}: {e}")
        return result

    def extract_all_layers_to_pdfs(self, output_dir: Optional[str] = None,
                                   dpi: int = 150) -> Dict[str, str]:
        """
        Extract each layer to a separate PDF file.

        Args:
            output_dir: Directory to save the output PDFs. If None, use the same directory as the input file.
            dpi: Rendering resolution forwarded to ``extract_layer_to_pdf``.

        Returns:
            Dictionary mapping layer names to saved PDF file paths
        """
        return self._extract_all_layers(self.extract_layer_to_pdf, ".pdf", output_dir, dpi)

    def extract_all_layers_to_pngs(self, output_dir: Optional[str] = None,
                                   dpi: int = 150) -> Dict[str, str]:
        """
        Extract each layer to PNG image file(s).

        Args:
            output_dir: Directory to save the output PNGs. If None, use the same directory as the input file.
            dpi: Rendering resolution forwarded to ``extract_layer_to_png``.

        Returns:
            Dictionary mapping layer names to the path of each layer's first page image
        """
        return self._extract_all_layers(self.extract_layer_to_png, ".png", output_dir, dpi)

    # ------------------------------------------------------------------
    # Text removal
    # ------------------------------------------------------------------
    def remove_text(self, layer_name: Optional[str] = None,
                    output_path: Optional[str] = None) -> str:
        """
        Remove text from a specific layer or from everything and save to a new PDF.

        Works on a freshly opened copy of ``self.pdf_path``; ``self.document``
        is not modified.

        NOTE(review): with ``layer_name`` set, redaction rectangles come from
        the text found while only that layer is switched on. Whether hidden
        text is skipped by extraction, and whether redaction leaves
        overlapping text of other layers alone, should be confirmed.

        Args:
            layer_name: Name of the layer to remove text from. If None, removes text from all layers.
            output_path: Path to save the modified PDF. If None, generates a path based on the input file.

        Returns:
            The path to the saved PDF file.

        Raises:
            ValueError: If the document has layers and ``layer_name`` is not one of them.
        """
        if output_path is None:
            base_dir = os.path.dirname(self.pdf_path)
            base_name = os.path.splitext(os.path.basename(self.pdf_path))[0]
            output_path = os.path.join(base_dir, f"{base_name}_no_text.pdf")
        doc_copy = fitz.open(self.pdf_path)
        try:
            available = self._layer_names(doc_copy)
            layers_to_process = []
            if available:
                if layer_name:
                    if layer_name not in available:
                        raise ValueError(f"Layer '{layer_name}' not found in the document")
                    layers_to_process = [layer_name]
                else:
                    layers_to_process = available
            if layers_to_process:
                for layer in layers_to_process:
                    previous = self._isolate_layer(doc_copy, layer)
                    try:
                        for page_num in range(len(doc_copy)):
                            self._remove_text_from_page(doc_copy[page_num])
                    finally:
                        self._restore_layers(doc_copy, previous)
            else:
                # No layers - process all pages.
                for page_num in range(len(doc_copy)):
                    self._remove_text_from_page(doc_copy[page_num])
            doc_copy.save(output_path)
            return output_path
        finally:
            # Always close the copy, including when validation raised.
            doc_copy.close()

    def _remove_text_from_page(self, page):
        """Redact the text found on ``page``, replacing it with a single space.

        First redacts every search hit of each distinct word, then every text
        block rectangle. If that raises, falls back to redacting the bounding
        box of every span. Failures are printed, never raised.

        NOTE(review): ``apply_redactions`` is called with its defaults, which
        may also affect images under the redacted rectangles - confirm that
        this is wanted.
        """
        try:
            text = page.get_text()
            if text and text.strip():
                # dict.fromkeys de-duplicates while keeping order: one search
                # per distinct word already returns all of its occurrences.
                for word in dict.fromkeys(text.split()):
                    for hit in page.search_for(word):
                        page.add_redact_annot(hit, text=" ")
                page.apply_redactions()
                # Second pass: redact whole block rectangles.
                for block in page.get_text("blocks"):
                    if len(block) >= 4:  # make sure the block has a rectangle
                        page.add_redact_annot(fitz.Rect(block[:4]), text=" ")
                page.apply_redactions()
        except Exception as e:
            print(f"Error removing text from page: {e}")
            try:
                for block in page.get_text("dict")["blocks"]:
                    for line in block.get("lines", []):
                        for span in line.get("spans", []):
                            # "size" is a single font size, so the rectangle
                            # must come from the span's own bounding box.
                            bbox = span.get("bbox")
                            if bbox:
                                page.add_redact_annot(fitz.Rect(bbox), text=" ")
                page.apply_redactions()
            except Exception as inner_e:
                print(f"Fallback text removal failed: {inner_e}")

    # ------------------------------------------------------------------
    # Images
    # ------------------------------------------------------------------
    def extract_images(self, page_numbers: Optional[List[int]] = None) -> Dict[str, bytes]:
        """Extract images from specified pages or all pages.

        Args:
            page_numbers: 0-based page indices; ``None`` means all pages.
                Indices outside the document are skipped.

        Returns:
            Mapping of ``page<N>_img<K>.<ext>`` (both 1-based) to image bytes.
            ``ext`` falls back to ``png`` when the extracted image has none.
        """
        result = {}
        page_total = len(self.document)
        if page_numbers is None:
            page_numbers = range(page_total)
        for page_num in page_numbers:
            if not 0 <= page_num < page_total:
                continue
            page = self.document[page_num]
            for img_idx, img_info in enumerate(page.get_images(full=True)):
                xref = img_info[0]  # cross-reference number of the image
                try:
                    base_image = self.document.extract_image(xref)
                    if base_image:
                        image_ext = base_image.get("ext") or "png"
                        image_name = f"page{page_num+1}_img{img_idx+1}.{image_ext}"
                        result[image_name] = base_image["image"]
                except Exception as e:
                    print(f"Error extracting image: {e}")
        return result

    def extract_images_from_layer(self, layer_name: Optional[str] = None) -> Dict[str, bytes]:
        """Extract images from a specific layer or all layers.

        Falls back to :meth:`extract_images` when the document has no layers.
        Otherwise each requested layer is made the only visible one while the
        page images are collected, and visibility is restored afterwards.

        NOTE(review): images are listed with ``Page.get_images``; whether that
        listing depends on layer visibility is not established here, so every
        layer may report the same images - confirm before relying on it.

        Args:
            layer_name: Layer to read. ``None`` or empty means every layer.

        Returns:
            Mapping of ``layer_<safe layer name>_page<N>_img<K>.<ext>`` to
            image bytes. Unknown or failing layers are reported and skipped.
        """
        layers = self.get_layers()
        if not layers:
            return self.extract_images()
        result = {}
        for layer in ([layer_name] if layer_name else layers):
            if layer not in layers:
                print(f"Error processing layer {layer}: layer not found")
                continue
            previous = self._isolate_layer(self.document, layer)
            try:
                # The layer name is sanitised because these keys are used as
                # file names by save_images.
                prefix = f"layer_{self._safe_name(layer)}_"
                for image_name, image_data in self.extract_images().items():
                    result[prefix + image_name] = image_data
            except Exception as e:
                print(f"Error processing layer {layer}: {e}")
            finally:
                self._restore_layers(self.document, previous)
        return result

    def save_images(self, images: Dict[str, bytes], output_dir: str) -> List[str]:
        """Save extracted images to the specified directory.

        Args:
            images: Mapping of file name to image bytes.
            output_dir: Target directory; created if missing.

        Returns:
            Paths of the files that were written. Failures are printed and skipped.
        """
        saved_paths = []
        os.makedirs(output_dir, exist_ok=True)
        for img_name, img_data in images.items():
            try:
                img_path = os.path.join(output_dir, img_name)
                with open(img_path, "wb") as img_file:
                    img_file.write(img_data)
                saved_paths.append(img_path)
            except Exception as e:
                print(f"Error saving image {img_name}: {e}")
        return saved_paths