""" PDF Redaction module using NER """ from pdf2image import convert_from_path import pytesseract from pypdf import PdfReader, PdfWriter from pypdf.generic import DictionaryObject, ArrayObject, NameObject, NumberObject from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification from typing import List, Dict, Optional import logging logger = logging.getLogger(__name__) class PDFRedactor: """PDF Redaction using Named Entity Recognition""" def __init__(self, model_name: str = "./model"): """ Initialize the PDF Redactor Args: model_name: HuggingFace model name for NER """ self.model_name = model_name self.ner_pipeline = None self._load_model() def _load_model(self): """Load the NER model""" try: logger.info(f"Loading NER model: {self.model_name}") tokenizer = AutoTokenizer.from_pretrained(self.model_name) model = AutoModelForTokenClassification.from_pretrained(self.model_name) self.ner_pipeline = pipeline("token-classification", model=model, tokenizer=tokenizer) logger.info("NER model loaded successfully") except Exception as e: logger.error(f"Error loading NER model: {str(e)}") raise def is_model_loaded(self) -> bool: """Check if the model is loaded""" return self.ner_pipeline is not None def perform_ocr(self, pdf_path: str, dpi: int = 300) -> List[Dict]: """ Perform OCR on PDF and extract word bounding boxes Args: pdf_path: Path to the PDF file dpi: DPI for PDF to image conversion Returns: List of word data with bounding boxes and image dimensions """ logger.info(f"Starting OCR on {pdf_path} at {dpi} DPI") all_words_data = [] try: images = convert_from_path(pdf_path, dpi=dpi) logger.info(f"Converted PDF to {len(images)} images") for page_num, image in enumerate(images): # Get image dimensions image_width, image_height = image.size # Perform OCR data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT) num_words = len(data['text']) for i in range(num_words): word_text = data['text'][i].strip() confidence = int(data['conf'][i]) # Filter out empty or low-confidence words if word_text and confidence > 0: all_words_data.append({ 'text': word_text, 'box': (data['left'][i], data['top'][i], data['width'][i], data['height'][i]), 'page': page_num + 1, 'confidence': confidence, 'image_width': image_width, 'image_height': image_height }) logger.info(f"Processed page {page_num + 1}: {len([w for w in all_words_data if w['page'] == page_num + 1])} words") logger.info(f"OCR complete: {len(all_words_data)} total words extracted") return all_words_data except Exception as e: logger.error(f"Error during OCR: {str(e)}") raise def run_ner(self, text: str) -> List[Dict]: """ Run NER on text Args: text: Input text Returns: List of identified entities """ if not self.ner_pipeline: raise RuntimeError("NER model not loaded") logger.info(f"Running NER on text of length {len(text)}") try: results = self.ner_pipeline(text) logger.info(f"NER identified {len(results)} entities") return results except Exception as e: logger.error(f"Error during NER: {str(e)}") raise def map_entities_to_boxes(self, ner_results: List[Dict], ocr_data: List[Dict]) -> List[Dict]: """ Map NER entities to OCR bounding boxes Args: ner_results: List of NER entities ocr_data: List of OCR word data Returns: List of mapped entities with bounding boxes """ logger.info("Mapping NER entities to OCR bounding boxes") mapped_entities = [] # Create character span mapping ocr_word_char_spans = [] current_char_index = 0 for ocr_data_idx, word_info in enumerate(ocr_data): word_text = word_info['text'] length = len(word_text) ocr_word_char_spans.append({ 'ocr_data_idx': ocr_data_idx, 'start_char': current_char_index, 'end_char': current_char_index + length }) current_char_index += length + 1 # Map each NER entity to OCR words for ner_entity in ner_results: ner_entity_type = ner_entity['entity'] ner_start = ner_entity['start'] ner_end = ner_entity['end'] ner_word = ner_entity['word'] matching_ocr_words = [] for ocr_word_span in ocr_word_char_spans: ocr_start = ocr_word_span['start_char'] ocr_end = ocr_word_span['end_char'] # Check for overlap if max(ocr_start, ner_start) < min(ocr_end, ner_end): matching_ocr_words.append(ocr_data[ocr_word_span['ocr_data_idx']]) if matching_ocr_words: mapped_entities.append({ 'entity_type': ner_entity_type, 'entity_text': ner_word, 'words': matching_ocr_words }) logger.info(f"Mapped {len(mapped_entities)} entities to bounding boxes") return mapped_entities def create_redacted_pdf(self, original_pdf_path: str, mapped_entities: List[Dict], output_path: str) -> str: """ Create redacted PDF with black rectangles over entities Args: original_pdf_path: Path to original PDF mapped_entities: List of entities with bounding boxes output_path: Path for output PDF Returns: Path to redacted PDF """ logger.info(f"Creating redacted PDF: {output_path}") try: reader = PdfReader(original_pdf_path) writer = PdfWriter() for page_num in range(len(reader.pages)): page = reader.pages[page_num] media_box = page.mediabox page_width = float(media_box.width) page_height = float(media_box.height) writer.add_page(page) page_entities = 0 for entity_info in mapped_entities: for word_info in entity_info['words']: if word_info['page'] == page_num + 1: x, y, w, h = word_info['box'] # Get image dimensions image_width = word_info['image_width'] image_height = word_info['image_height'] # Scale coordinates scale_x = page_width / image_width scale_y = page_height / image_height x_scaled = x * scale_x y_scaled = y * scale_y w_scaled = w * scale_x h_scaled = h * scale_y # Convert to PDF coordinates llx = x_scaled lly = page_height - (y_scaled + h_scaled) urx = x_scaled + w_scaled ury = page_height - y_scaled # Create redaction annotation redaction_annotation = DictionaryObject() redaction_annotation.update({ NameObject("/Type"): NameObject("/Annot"), NameObject("/Subtype"): NameObject("/Square"), NameObject("/Rect"): ArrayObject([ NumberObject(llx), NumberObject(lly), NumberObject(urx), NumberObject(ury), ]), NameObject("/C"): ArrayObject([ NumberObject(0), NumberObject(0), NumberObject(0) ]), NameObject("/IC"): ArrayObject([ NumberObject(0), NumberObject(0), NumberObject(0) ]), NameObject("/BS"): DictionaryObject({ NameObject("/W"): NumberObject(0) }) }) writer.add_annotation(page_number=page_num, annotation=redaction_annotation) page_entities += 1 logger.info(f"Page {page_num + 1}: Added {page_entities} redactions") # Write output with open(output_path, "wb") as output_file: writer.write(output_file) logger.info(f"Redacted PDF created successfully: {output_path}") return output_path except Exception as e: logger.error(f"Error creating redacted PDF: {str(e)}") raise def redact_document(self, pdf_path: str, output_path: str, dpi: int = 300, entity_filter: Optional[List[str]] = None) -> Dict: """ Complete redaction pipeline Args: pdf_path: Path to input PDF output_path: Path for output PDF dpi: DPI for OCR entity_filter: List of entity types to redact (None = all) Returns: Dictionary with redaction results """ logger.info(f"Starting redaction pipeline for {pdf_path}") # Step 1: OCR ocr_data = self.perform_ocr(pdf_path, dpi) # Step 2: Extract text full_text = " ".join([word['text'] for word in ocr_data]) # Step 3: NER ner_results = self.run_ner(full_text) # Step 4: Map entities to boxes mapped_entities = self.map_entities_to_boxes(ner_results, ocr_data) # Step 5: Filter entities if requested if entity_filter: mapped_entities = [ e for e in mapped_entities if e['entity_type'] in entity_filter ] logger.info(f"Filtered to {len(mapped_entities)} entities of types: {entity_filter}") # Step 6: Create redacted PDF self.create_redacted_pdf(pdf_path, mapped_entities, output_path) return { 'output_path': output_path, 'total_words': len(ocr_data), 'total_entities': len(ner_results), 'redacted_entities': len(mapped_entities), 'entities': mapped_entities }