# PDF-Redaction-API/app/redaction.py
# Sammi1211's picture
# adding url support
# af107f1
"""
PDF Redaction module using NER
"""
import bisect
import logging
from typing import List, Dict, Optional

import pytesseract
from pdf2image import convert_from_path
from pypdf import PdfReader, PdfWriter
from pypdf.generic import DictionaryObject, ArrayObject, NameObject, NumberObject, FloatObject
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification
logger = logging.getLogger(__name__)
class PDFRedactor:
    """Cover named entities in a PDF with black rectangles.

    Pipeline: render the pages to images, OCR them, run a token-classification
    (NER) pipeline over the recognised text, map the predicted character spans
    back to OCR word boxes, and add filled square annotations over those boxes
    in a copy of the original PDF.
    """

    def __init__(self, model_name: str = "./model"):
        """Create the redactor and load its NER model immediately.

        Args:
            model_name: Model name or local directory passed to
                ``from_pretrained``.

        Raises:
            Exception: Whatever model loading raised (logged, then re-raised).
        """
        self.model_name = model_name
        self.ner_pipeline = None
        self._load_model()

    def _load_model(self):
        """Load tokenizer and model and build the token-classification pipeline.

        Raises:
            Exception: Any loading error is logged and re-raised unchanged.
        """
        try:
            logger.info("Loading NER model: %s", self.model_name)
            tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            model = AutoModelForTokenClassification.from_pretrained(self.model_name)
            self.ner_pipeline = pipeline(
                "token-classification", model=model, tokenizer=tokenizer
            )
            logger.info("NER model loaded successfully")
        except Exception as e:
            logger.error("Error loading NER model: %s", e)
            raise

    def is_model_loaded(self) -> bool:
        """Return True once the NER pipeline has been created."""
        return self.ner_pipeline is not None

    @staticmethod
    def _parse_confidence(raw) -> int:
        """Convert one OCR ``conf`` cell to an int, dropping any fraction.

        The cell can arrive as an int, a float, or a numeric string such as
        ``"96.5"`` or ``"-1"``; a plain ``int("96.5")`` would raise ValueError.
        """
        return int(float(raw))

    def perform_ocr(self, pdf_path: str, dpi: int = 300) -> List[Dict]:
        """Render every page of a PDF and OCR it into word records.

        Args:
            pdf_path: Path to the PDF file.
            dpi: Resolution used when rendering pages to images.

        Returns:
            One dict per kept word with keys ``text``, ``box`` (left, top,
            width, height in image pixels), ``page`` (1-based),
            ``confidence``, ``image_width`` and ``image_height``. Words that
            are empty after stripping or whose confidence is not above 0 are
            dropped.

        Raises:
            Exception: Any rendering/OCR error is logged and re-raised.
        """
        logger.info("Starting OCR on %s at %s DPI", pdf_path, dpi)
        all_words_data = []
        try:
            images = convert_from_path(pdf_path, dpi=dpi)
            logger.info("Converted PDF to %d images", len(images))
            for page_number, image in enumerate(images, start=1):
                image_width, image_height = image.size
                data = pytesseract.image_to_data(
                    image, output_type=pytesseract.Output.DICT
                )
                # Remember where this page starts so the per-page count does
                # not require rescanning everything collected so far.
                words_before = len(all_words_data)
                for i, raw_text in enumerate(data['text']):
                    word_text = raw_text.strip()
                    if not word_text:
                        continue
                    confidence = self._parse_confidence(data['conf'][i])
                    if confidence <= 0:
                        continue
                    all_words_data.append({
                        'text': word_text,
                        'box': (data['left'][i], data['top'][i],
                                data['width'][i], data['height'][i]),
                        'page': page_number,
                        'confidence': confidence,
                        'image_width': image_width,
                        'image_height': image_height,
                    })
                logger.info(
                    "Processed page %d: %d words",
                    page_number, len(all_words_data) - words_before,
                )
            logger.info("OCR complete: %d total words extracted", len(all_words_data))
            return all_words_data
        except Exception as e:
            logger.error("Error during OCR: %s", e)
            raise

    @staticmethod
    def _chunk_text(text: str, max_chars: int) -> List[tuple]:
        """Split text into ``(offset, chunk)`` pieces of at most max_chars.

        Cuts are made at a space whenever one exists inside the window so
        words stay whole; the separating spaces are not part of any chunk.
        Text that already fits is returned as a single piece, untouched.
        """
        total = len(text)
        if total <= max_chars:
            return [(0, text)]
        chunks = []
        start = 0
        while start < total:
            end = min(start + max_chars, total)
            if end < total:
                # Searching from start + 1 guarantees a non-empty chunk; the
                # upper bound end + 1 allows a cut exactly at the window edge.
                cut = text.rfind(" ", start + 1, end + 1)
                if cut != -1:
                    end = cut
            chunks.append((start, text[start:end]))
            start = end
            while start < total and text[start] == " ":
                start += 1
        return chunks

    def run_ner(self, text: str, max_chars: int = 1000) -> List[Dict]:
        """Run the NER pipeline over text, chunking long input.

        Token-classification models have a bounded input length, so text
        beyond it may be truncated or rejected by the pipeline. Long text is
        therefore processed in chunks and the ``start``/``end`` offsets of each
        result are shifted back to positions in the full ``text``.

        Args:
            text: Input text.
            max_chars: Maximum characters sent to the pipeline per call. Text
                no longer than this is sent in a single call.

        Returns:
            The pipeline's entity dicts, in text order.

        Raises:
            RuntimeError: If the model has not been loaded.
            ValueError: If ``max_chars`` is not positive.
            Exception: Any pipeline error is logged and re-raised.
        """
        if self.ner_pipeline is None:
            raise RuntimeError("NER model not loaded")
        if max_chars <= 0:
            raise ValueError("max_chars must be positive")
        logger.info("Running NER on text of length %d", len(text))
        try:
            results = []
            for offset, chunk in self._chunk_text(text, max_chars):
                for entity in self.ner_pipeline(chunk):
                    if offset and entity.get('start') is not None \
                            and entity.get('end') is not None:
                        # Copy so the pipeline's own objects are not mutated.
                        entity = {
                            **entity,
                            'start': entity['start'] + offset,
                            'end': entity['end'] + offset,
                        }
                    results.append(entity)
            logger.info("NER identified %d entities", len(results))
            return results
        except Exception as e:
            logger.error("Error during NER: %s", e)
            raise

    def map_entities_to_boxes(self, ner_results: List[Dict],
                              ocr_data: List[Dict]) -> List[Dict]:
        """Attach OCR words to each NER entity by character-span overlap.

        Word spans are computed as if the OCR words were joined with single
        spaces, which is how ``redact_document`` builds the NER input.

        Args:
            ner_results: Entity dicts with ``start``, ``end``, ``word`` and a
                label under ``entity`` (or ``entity_group`` for aggregated
                pipeline output).
            ocr_data: OCR word dicts; only ``text`` is read here.

        Returns:
            One dict per entity that overlaps at least one word, with keys
            ``entity_type``, ``entity_text`` and ``words``.
        """
        logger.info("Mapping NER entities to OCR bounding boxes")

        # Character span of every OCR word in the space-joined text. Both
        # lists are strictly increasing, which is what makes bisect valid.
        span_starts = []
        span_ends = []
        cursor = 0
        for word_info in ocr_data:
            length = len(word_info['text'])
            span_starts.append(cursor)
            span_ends.append(cursor + length)
            cursor += length + 1  # +1 for the joining space

        mapped_entities = []
        word_count = len(ocr_data)
        for ner_entity in ner_results:
            if 'entity' in ner_entity:
                entity_type = ner_entity['entity']
            else:
                entity_type = ner_entity['entity_group']
            ner_start = ner_entity['start']
            ner_end = ner_entity['end']
            ner_word = ner_entity['word']

            matching_ocr_words = []
            # First word whose end lies beyond the entity start; earlier
            # words cannot overlap.
            idx = bisect.bisect_right(span_ends, ner_start)
            while idx < word_count and span_starts[idx] < ner_end:
                if max(span_starts[idx], ner_start) < min(span_ends[idx], ner_end):
                    matching_ocr_words.append(ocr_data[idx])
                idx += 1

            if matching_ocr_words:
                mapped_entities.append({
                    'entity_type': entity_type,
                    'entity_text': ner_word,
                    'words': matching_ocr_words,
                })
        logger.info("Mapped %d entities to bounding boxes", len(mapped_entities))
        return mapped_entities

    @staticmethod
    def _image_box_to_pdf_rect(box, image_size, page_box):
        """Convert an image-pixel box to a PDF rectangle.

        Args:
            box: (left, top, width, height) in pixels, origin at the top-left.
            image_size: (width, height) of the rendered page image in pixels.
            page_box: (left, bottom, width, height) of the page box in PDF
                units.

        Returns:
            (llx, lly, urx, ury) with the origin at the bottom-left, as PDF
            rectangles are expressed.
        """
        x, y, w, h = box
        image_width, image_height = image_size
        page_left, page_bottom, page_width, page_height = page_box
        scale_x = page_width / image_width
        scale_y = page_height / image_height
        page_top = page_bottom + page_height
        llx = page_left + x * scale_x
        urx = page_left + (x + w) * scale_x
        # Image y grows downwards, PDF y grows upwards.
        ury = page_top - y * scale_y
        lly = page_top - (y + h) * scale_y
        return llx, lly, urx, ury

    def create_redacted_pdf(self, original_pdf_path: str,
                            mapped_entities: List[Dict],
                            output_path: str) -> str:
        """Write a copy of the PDF with black rectangles over entity words.

        NOTE: pages are copied unchanged and the rectangles are added as
        square annotations on top. The covered text is still present in the
        page content, so this is a visual cover rather than content removal.

        NOTE(review): page rotation is not taken into account — confirm the
        inputs are unrotated or handle ``/Rotate`` upstream.

        Args:
            original_pdf_path: Path to the original PDF.
            mapped_entities: Output of ``map_entities_to_boxes``; each word
                needs ``page``, ``box``, ``image_width`` and ``image_height``.
            output_path: Path for the output PDF.

        Returns:
            ``output_path``.

        Raises:
            Exception: Any read/write error is logged and re-raised.
        """
        logger.info("Creating redacted PDF: %s", output_path)
        try:
            reader = PdfReader(original_pdf_path)
            writer = PdfWriter()

            # Group boxes by page once. A dict keyed by the box keeps insertion
            # order while dropping duplicates (the same OCR word is often
            # matched by several sub-token entities).
            boxes_by_page = {}
            for entity_info in mapped_entities:
                for word_info in entity_info['words']:
                    key = (tuple(word_info['box']),
                           word_info['image_width'],
                           word_info['image_height'])
                    boxes_by_page.setdefault(word_info['page'], {})[key] = None

            for page_index, page in enumerate(reader.pages):
                media_box = page.mediabox
                page_box = (float(media_box.left), float(media_box.bottom),
                            float(media_box.width), float(media_box.height))
                writer.add_page(page)
                page_entities = 0
                for box, image_width, image_height in boxes_by_page.get(page_index + 1, ()):
                    llx, lly, urx, ury = self._image_box_to_pdf_rect(
                        box, (image_width, image_height), page_box
                    )
                    redaction_annotation = DictionaryObject()
                    redaction_annotation.update({
                        NameObject("/Type"): NameObject("/Annot"),
                        NameObject("/Subtype"): NameObject("/Square"),
                        # FloatObject keeps fractional coordinates;
                        # NumberObject is an integer type and would truncate
                        # them, shrinking the covered area.
                        NameObject("/Rect"): ArrayObject([
                            FloatObject(llx),
                            FloatObject(lly),
                            FloatObject(urx),
                            FloatObject(ury),
                        ]),
                        # Annotation flag 4 = Print, so the cover is also
                        # rendered when the document is printed.
                        NameObject("/F"): NumberObject(4),
                        NameObject("/C"): ArrayObject([
                            NumberObject(0), NumberObject(0), NumberObject(0)
                        ]),
                        NameObject("/IC"): ArrayObject([
                            NumberObject(0), NumberObject(0), NumberObject(0)
                        ]),
                        NameObject("/BS"): DictionaryObject({
                            NameObject("/W"): NumberObject(0)
                        }),
                    })
                    writer.add_annotation(page_number=page_index,
                                          annotation=redaction_annotation)
                    page_entities += 1
                logger.info("Page %d: Added %d redactions", page_index + 1, page_entities)

            with open(output_path, "wb") as output_file:
                writer.write(output_file)
            logger.info("Redacted PDF created successfully: %s", output_path)
            return output_path
        except Exception as e:
            logger.error("Error creating redacted PDF: %s", e)
            raise

    def redact_document(self, pdf_path: str, output_path: str,
                        dpi: int = 300,
                        entity_filter: Optional[List[str]] = None) -> Dict:
        """Run the complete OCR -> NER -> mapping -> PDF writing pipeline.

        Args:
            pdf_path: Path to the input PDF.
            output_path: Path for the output PDF.
            dpi: Resolution used for OCR rendering.
            entity_filter: Entity types to redact. ``None`` or an empty list
                redacts every mapped entity.

        Returns:
            Dict with ``output_path``, ``total_words``, ``total_entities``
            (all NER results), ``redacted_entities`` (after mapping and
            filtering) and ``entities`` (the mapped entities themselves).
        """
        logger.info("Starting redaction pipeline for %s", pdf_path)

        # Step 1: OCR
        ocr_data = self.perform_ocr(pdf_path, dpi)

        # Step 2: Rebuild the text. The single-space join must match the span
        # arithmetic in map_entities_to_boxes.
        full_text = " ".join(word['text'] for word in ocr_data)

        # Step 3: NER
        ner_results = self.run_ner(full_text)

        # Step 4: Map entities to boxes
        mapped_entities = self.map_entities_to_boxes(ner_results, ocr_data)

        # Step 5: Filter entities if requested
        if entity_filter:
            mapped_entities = [
                e for e in mapped_entities
                if e['entity_type'] in entity_filter
            ]
            logger.info(
                "Filtered to %d entities of types: %s",
                len(mapped_entities), entity_filter,
            )

        # Step 6: Create redacted PDF
        self.create_redacted_pdf(pdf_path, mapped_entities, output_path)

        return {
            'output_path': output_path,
            'total_words': len(ocr_data),
            'total_entities': len(ner_results),
            'redacted_entities': len(mapped_entities),
            'entities': mapped_entities,
        }