| """ |
| PDF Redaction module using NER |
| """ |
| from pdf2image import convert_from_path |
| import pytesseract |
| from pypdf import PdfReader, PdfWriter |
| from pypdf.generic import DictionaryObject, ArrayObject, NameObject, NumberObject |
| from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification |
| from typing import List, Dict, Optional |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class PDFRedactor:
    """PDF redaction using Named Entity Recognition.

    Pipeline: rasterize PDF pages -> OCR word bounding boxes (Tesseract) ->
    run NER over the space-joined word text -> map entities back to word
    boxes by character-span overlap -> draw filled black Square annotations
    over the matched words in a copy of the original PDF.

    NOTE(review): the output overlays black annotations on top of the page;
    the underlying text layer is NOT removed, so the result is a visual
    redaction only — extractable text may still contain the entities.
    """

    def __init__(self, model_name: str = "./model"):
        """
        Initialize the PDF Redactor and eagerly load the NER model.

        Args:
            model_name: HuggingFace model name or local path for NER

        Raises:
            Exception: propagated from tokenizer/model loading.
        """
        self.model_name = model_name
        self.ner_pipeline = None
        self._load_model()

    def _load_model(self):
        """Load the tokenizer/model pair and build the token-classification pipeline."""
        try:
            logger.info(f"Loading NER model: {self.model_name}")
            tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            model = AutoModelForTokenClassification.from_pretrained(self.model_name)

            self.ner_pipeline = pipeline("token-classification", model=model,
                                         tokenizer=tokenizer)
            logger.info("NER model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading NER model: {str(e)}")
            raise

    def is_model_loaded(self) -> bool:
        """Return True if the NER pipeline is ready for inference."""
        return self.ner_pipeline is not None

    def perform_ocr(self, pdf_path: str, dpi: int = 300) -> List[Dict]:
        """
        Perform OCR on PDF and extract word bounding boxes.

        Args:
            pdf_path: Path to the PDF file
            dpi: DPI for PDF to image conversion

        Returns:
            List of word dicts with keys: 'text', 'box' (left, top, width,
            height in image pixels), 'page' (1-based), 'confidence',
            'image_width', 'image_height'.

        Raises:
            Exception: propagated from pdf2image / pytesseract.
        """
        logger.info(f"Starting OCR on {pdf_path} at {dpi} DPI")
        all_words_data = []

        try:
            images = convert_from_path(pdf_path, dpi=dpi)
            logger.info(f"Converted PDF to {len(images)} images")

            for page_num, image in enumerate(images):
                image_width, image_height = image.size

                data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)

                page_words = 0
                for i in range(len(data['text'])):
                    word_text = data['text'][i].strip()
                    # Newer pytesseract/Tesseract versions report confidence as a
                    # float string (e.g. "96.0"); int(float(...)) accepts both
                    # the integer and the float forms.
                    confidence = int(float(data['conf'][i]))

                    # conf <= 0 marks layout elements / rejected detections.
                    if word_text and confidence > 0:
                        all_words_data.append({
                            'text': word_text,
                            'box': (data['left'][i], data['top'][i],
                                    data['width'][i], data['height'][i]),
                            'page': page_num + 1,
                            'confidence': confidence,
                            'image_width': image_width,
                            'image_height': image_height
                        })
                        page_words += 1

                # O(1) per-page counter instead of re-scanning the whole
                # accumulated list on every page.
                logger.info(f"Processed page {page_num + 1}: {page_words} words")

            logger.info(f"OCR complete: {len(all_words_data)} total words extracted")
            return all_words_data

        except Exception as e:
            logger.error(f"Error during OCR: {str(e)}")
            raise

    def run_ner(self, text: str) -> List[Dict]:
        """
        Run NER on text.

        Args:
            text: Input text

        Returns:
            List of identified entities (pipeline dicts with 'entity',
            'word', 'start', 'end', ...).

        Raises:
            RuntimeError: if the NER model is not loaded.
        """
        if self.ner_pipeline is None:
            raise RuntimeError("NER model not loaded")

        logger.info(f"Running NER on text of length {len(text)}")

        try:
            results = self.ner_pipeline(text)
            logger.info(f"NER identified {len(results)} entities")
            return results
        except Exception as e:
            logger.error(f"Error during NER: {str(e)}")
            raise

    def map_entities_to_boxes(self, ner_results: List[Dict],
                              ocr_data: List[Dict]) -> List[Dict]:
        """
        Map NER entities to OCR bounding boxes.

        Assumes the NER input text was built by joining the OCR words with
        single spaces (as redact_document does), so character offsets line up.

        Args:
            ner_results: List of NER entities with 'start'/'end' char offsets
            ocr_data: List of OCR word data from perform_ocr

        Returns:
            List of dicts: {'entity_type', 'entity_text', 'words'}, where
            'words' are the OCR word dicts whose char span overlaps the entity.
        """
        logger.info("Mapping NER entities to OCR bounding boxes")
        mapped_entities = []

        # Precompute each OCR word's [start, end) char span in the joined text.
        ocr_word_char_spans = []
        current_char_index = 0

        for ocr_data_idx, word_info in enumerate(ocr_data):
            length = len(word_info['text'])
            ocr_word_char_spans.append({
                'ocr_data_idx': ocr_data_idx,
                'start_char': current_char_index,
                'end_char': current_char_index + length
            })
            current_char_index += length + 1  # +1 for the joining space

        for ner_entity in ner_results:
            ner_start = ner_entity['start']
            ner_end = ner_entity['end']

            matching_ocr_words = []
            for ocr_word_span in ocr_word_char_spans:
                # Half-open interval overlap test.
                if max(ocr_word_span['start_char'], ner_start) < \
                        min(ocr_word_span['end_char'], ner_end):
                    matching_ocr_words.append(ocr_data[ocr_word_span['ocr_data_idx']])

            if matching_ocr_words:
                mapped_entities.append({
                    'entity_type': ner_entity['entity'],
                    'entity_text': ner_entity['word'],
                    'words': matching_ocr_words
                })

        logger.info(f"Mapped {len(mapped_entities)} entities to bounding boxes")
        return mapped_entities

    def create_redacted_pdf(self, original_pdf_path: str,
                            mapped_entities: List[Dict],
                            output_path: str) -> str:
        """
        Create redacted PDF with black rectangles over entities.

        NOTE(review): this adds filled /Square annotations; it does not strip
        the underlying text content — confirm this is acceptable for the
        intended threat model.

        Args:
            original_pdf_path: Path to original PDF
            mapped_entities: List of entities with bounding boxes
            output_path: Path for output PDF

        Returns:
            Path to redacted PDF

        Raises:
            Exception: propagated from pypdf / file I/O.
        """
        logger.info(f"Creating redacted PDF: {output_path}")

        try:
            reader = PdfReader(original_pdf_path)
            writer = PdfWriter()

            for page_num in range(len(reader.pages)):
                page = reader.pages[page_num]
                media_box = page.mediabox
                page_width = float(media_box.width)
                page_height = float(media_box.height)

                writer.add_page(page)

                # A word can belong to several entities; draw each box once.
                drawn_boxes = set()
                page_entities = 0
                for entity_info in mapped_entities:
                    for word_info in entity_info['words']:
                        if word_info['page'] != page_num + 1:
                            continue
                        if word_info['box'] in drawn_boxes:
                            continue
                        drawn_boxes.add(word_info['box'])

                        x, y, w, h = word_info['box']

                        # Scale from OCR image pixels to PDF points.
                        scale_x = page_width / word_info['image_width']
                        scale_y = page_height / word_info['image_height']

                        x_scaled = x * scale_x
                        y_scaled = y * scale_y
                        w_scaled = w * scale_x
                        h_scaled = h * scale_y

                        # Image origin is top-left; PDF origin is bottom-left.
                        llx = x_scaled
                        lly = page_height - (y_scaled + h_scaled)
                        urx = x_scaled + w_scaled
                        ury = page_height - y_scaled

                        # Filled black square annotation with no border.
                        redaction_annotation = DictionaryObject()
                        redaction_annotation.update({
                            NameObject("/Type"): NameObject("/Annot"),
                            NameObject("/Subtype"): NameObject("/Square"),
                            NameObject("/Rect"): ArrayObject([
                                NumberObject(llx),
                                NumberObject(lly),
                                NumberObject(urx),
                                NumberObject(ury),
                            ]),
                            NameObject("/C"): ArrayObject([
                                NumberObject(0), NumberObject(0), NumberObject(0)
                            ]),
                            NameObject("/IC"): ArrayObject([
                                NumberObject(0), NumberObject(0), NumberObject(0)
                            ]),
                            NameObject("/BS"): DictionaryObject({
                                NameObject("/W"): NumberObject(0)
                            })
                        })

                        writer.add_annotation(page_number=page_num,
                                              annotation=redaction_annotation)
                        page_entities += 1

                logger.info(f"Page {page_num + 1}: Added {page_entities} redactions")

            with open(output_path, "wb") as output_file:
                writer.write(output_file)

            logger.info(f"Redacted PDF created successfully: {output_path}")
            return output_path

        except Exception as e:
            logger.error(f"Error creating redacted PDF: {str(e)}")
            raise

    def redact_document(self, pdf_path: str, output_path: str,
                        dpi: int = 300,
                        entity_filter: Optional[List[str]] = None) -> Dict:
        """
        Complete redaction pipeline: OCR -> NER -> mapping -> redacted PDF.

        Args:
            pdf_path: Path to input PDF
            output_path: Path for output PDF
            dpi: DPI for OCR
            entity_filter: List of entity types to redact (None = all)

        Returns:
            Dictionary with keys 'output_path', 'total_words',
            'total_entities', 'redacted_entities', 'entities'.
        """
        logger.info(f"Starting redaction pipeline for {pdf_path}")

        ocr_data = self.perform_ocr(pdf_path, dpi)

        # Single-space join: map_entities_to_boxes relies on this layout.
        full_text = " ".join([word['text'] for word in ocr_data])

        ner_results = self.run_ner(full_text)

        mapped_entities = self.map_entities_to_boxes(ner_results, ocr_data)

        if entity_filter:
            mapped_entities = [
                e for e in mapped_entities
                if e['entity_type'] in entity_filter
            ]
            logger.info(f"Filtered to {len(mapped_entities)} entities of types: {entity_filter}")

        self.create_redacted_pdf(pdf_path, mapped_entities, output_path)

        return {
            'output_path': output_path,
            'total_words': len(ocr_data),
            'total_entities': len(ner_results),
            'redacted_entities': len(mapped_entities),
            'entities': mapped_entities
        }
|
|