# File size: 12,300 Bytes
# af107f1
"""
PDF Redaction module using NER
"""
import logging
from bisect import bisect_right
from typing import Dict, List, Optional

import pytesseract
from pdf2image import convert_from_path
from pypdf import PdfReader, PdfWriter
from pypdf.generic import (
    ArrayObject,
    DictionaryObject,
    FloatObject,
    NameObject,
    NumberObject,
)
from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline
logger = logging.getLogger(__name__)


class PDFRedactor:
    """PDF redaction using OCR and Named Entity Recognition.

    The pipeline rasterises each page, OCRs it into words with bounding
    boxes, runs a token-classification model over the space-joined words,
    maps the predicted character spans back onto OCR words and finally
    draws filled black annotations over those words.

    WARNING: the output only *covers* the matched words with annotations.
    The original page content is copied unchanged, so the underlying text
    is still present in the output file. Do not treat the result as a
    secure redaction without an additional flatten/rasterise step.
    """

    def __init__(self, model_name: str = "./model"):
        """
        Initialize the PDF Redactor

        Args:
            model_name: HuggingFace model name (or local path) for NER
        """
        self.model_name = model_name
        self.ner_pipeline = None
        self._load_model()

    def _load_model(self):
        """Load the tokenizer and model and build the NER pipeline.

        Raises:
            Exception: whatever the underlying loaders raise; the error is
                logged and re-raised unchanged.
        """
        try:
            logger.info("Loading NER model: %s", self.model_name)
            tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            model = AutoModelForTokenClassification.from_pretrained(self.model_name)
            self.ner_pipeline = pipeline(
                "token-classification", model=model, tokenizer=tokenizer
            )
            logger.info("NER model loaded successfully")
        except Exception as e:
            logger.error("Error loading NER model: %s", e)
            raise

    def is_model_loaded(self) -> bool:
        """Check if the model is loaded"""
        return self.ner_pipeline is not None

    @staticmethod
    def _parse_confidence(raw) -> int:
        """Convert a raw OCR confidence cell to an int, or -1 if unparsable.

        Confidence values may arrive as ints, floats or strings such as
        ``'96.5'``; a plain ``int('96.5')`` would raise ``ValueError``.
        """
        try:
            return int(float(raw))
        except (TypeError, ValueError):
            return -1

    def perform_ocr(self, pdf_path: str, dpi: int = 300) -> List[Dict]:
        """
        Perform OCR on PDF and extract word bounding boxes

        Args:
            pdf_path: Path to the PDF file
            dpi: DPI for PDF to image conversion

        Returns:
            List of word dicts with keys ``text``, ``box`` (left, top, width,
            height in image pixels), ``page`` (1-based), ``confidence``,
            ``image_width`` and ``image_height``.

        Raises:
            Exception: any conversion/OCR error is logged and re-raised.
        """
        logger.info("Starting OCR on %s at %s DPI", pdf_path, dpi)
        all_words_data = []
        try:
            images = convert_from_path(pdf_path, dpi=dpi)
            logger.info("Converted PDF to %d images", len(images))
            for page_num, image in enumerate(images, start=1):
                image_width, image_height = image.size
                data = pytesseract.image_to_data(
                    image, output_type=pytesseract.Output.DICT
                )
                words_before = len(all_words_data)
                rows = zip(
                    data['text'], data['conf'], data['left'],
                    data['top'], data['width'], data['height'],
                )
                for text, conf, left, top, width, height in rows:
                    word_text = text.strip()
                    confidence = self._parse_confidence(conf)
                    # Drop empty cells and rows with non-positive confidence.
                    if word_text and confidence > 0:
                        all_words_data.append({
                            'text': word_text,
                            'box': (left, top, width, height),
                            'page': page_num,
                            'confidence': confidence,
                            'image_width': image_width,
                            'image_height': image_height,
                        })
                # Running count instead of re-scanning every collected word.
                logger.info(
                    "Processed page %d: %d words",
                    page_num, len(all_words_data) - words_before,
                )
            logger.info(
                "OCR complete: %d total words extracted", len(all_words_data)
            )
            return all_words_data
        except Exception as e:
            logger.error("Error during OCR: %s", e)
            raise

    def run_ner(self, text: str) -> List[Dict]:
        """
        Run NER on text

        Args:
            text: Input text

        Returns:
            List of identified entities as returned by the pipeline; an
            empty list when ``text`` is empty or whitespace only.

        Raises:
            RuntimeError: if the NER pipeline has not been loaded.
        """
        if self.ner_pipeline is None:
            raise RuntimeError("NER model not loaded")
        if not text or not text.strip():
            # Nothing to classify; avoid calling the model on empty input.
            logger.info("NER skipped: input text is empty")
            return []
        logger.info("Running NER on text of length %d", len(text))
        try:
            results = self.ner_pipeline(text)
            logger.info("NER identified %d entities", len(results))
            return results
        except Exception as e:
            logger.error("Error during NER: %s", e)
            raise

    def map_entities_to_boxes(self, ner_results: List[Dict],
                              ocr_data: List[Dict]) -> List[Dict]:
        """
        Map NER entities to OCR bounding boxes

        Character offsets are computed as if the OCR words were joined with
        a single space, which is how ``redact_document`` builds the text.

        Args:
            ner_results: List of NER entities with ``start``/``end`` offsets
                and either an ``entity`` or ``entity_group`` label
            ocr_data: List of OCR word data (each needs a ``text`` key)

        Returns:
            List of dicts with ``entity_type``, ``entity_text`` and ``words``
            (the OCR word dicts overlapping the entity span). Entities that
            overlap no word, or that carry no usable offsets, are omitted.
        """
        logger.info("Mapping NER entities to OCR bounding boxes")

        # Character span of every OCR word in the space-joined text.
        span_starts = []
        span_ends = []
        cursor = 0
        for word_info in ocr_data:
            length = len(word_info['text'])
            span_starts.append(cursor)
            span_ends.append(cursor + length)
            cursor += length + 1  # +1 for the joining space

        mapped_entities = []
        skipped = 0
        word_count = len(ocr_data)
        for ner_entity in ner_results:
            ner_start = ner_entity.get('start')
            ner_end = ner_entity.get('end')
            # Offsets can be missing (None); such entities cannot be placed.
            if ner_start is None or ner_end is None or ner_start >= ner_end:
                skipped += 1
                continue

            # span_ends is strictly increasing, so bisect finds the first
            # word ending after the entity start.
            idx = bisect_right(span_ends, ner_start)
            matching_ocr_words = []
            while idx < word_count and span_starts[idx] < ner_end:
                if span_ends[idx] > span_starts[idx]:  # ignore empty words
                    matching_ocr_words.append(ocr_data[idx])
                idx += 1

            if matching_ocr_words:
                mapped_entities.append({
                    'entity_type': ner_entity.get(
                        'entity', ner_entity.get('entity_group')
                    ),
                    'entity_text': ner_entity.get('word', ''),
                    'words': matching_ocr_words,
                })

        if skipped:
            logger.warning(
                "Skipped %d NER entities without usable character offsets",
                skipped,
            )
        logger.info("Mapped %d entities to bounding boxes", len(mapped_entities))
        return mapped_entities

    @staticmethod
    def _image_box_to_pdf_rect(box, image_width, image_height,
                               page_left, page_bottom,
                               page_width, page_height) -> tuple:
        """Convert a top-left-origin pixel box to a PDF rectangle.

        Args:
            box: ``(left, top, width, height)`` in image pixels
            image_width: Rendered image width in pixels
            image_height: Rendered image height in pixels
            page_left: X coordinate of the page box's left edge
            page_bottom: Y coordinate of the page box's bottom edge
            page_width: Page box width in PDF units
            page_height: Page box height in PDF units

        Returns:
            ``(llx, lly, urx, ury)`` in PDF user space (origin bottom-left).

        Raises:
            ValueError: if the image dimensions are not positive.
        """
        if image_width <= 0 or image_height <= 0:
            raise ValueError("image dimensions must be positive")
        x, y, w, h = box
        scale_x = page_width / image_width
        scale_y = page_height / image_height
        llx = page_left + x * scale_x
        urx = page_left + (x + w) * scale_x
        # Image Y grows downwards, PDF Y grows upwards: flip vertically.
        ury = page_bottom + page_height - y * scale_y
        lly = page_bottom + page_height - (y + h) * scale_y
        return (llx, lly, urx, ury)

    @staticmethod
    def _build_redaction_annotation(rect):
        """Build a filled black /Square annotation covering ``rect``."""
        black = [NumberObject(0), NumberObject(0), NumberObject(0)]
        annotation = DictionaryObject()
        annotation.update({
            NameObject("/Type"): NameObject("/Annot"),
            NameObject("/Subtype"): NameObject("/Square"),
            # FloatObject keeps fractional coordinates; NumberObject is an
            # int and would truncate them, shrinking the covered area.
            NameObject("/Rect"): ArrayObject(
                [FloatObject(value) for value in rect]
            ),
            # Annotation flag 4 = Print, so the box also shows when printed.
            NameObject("/F"): NumberObject(4),
            NameObject("/C"): ArrayObject(list(black)),
            NameObject("/IC"): ArrayObject(list(black)),
            NameObject("/BS"): DictionaryObject({
                NameObject("/W"): NumberObject(0)
            }),
        })
        return annotation

    def create_redacted_pdf(self, original_pdf_path: str,
                            mapped_entities: List[Dict],
                            output_path: str) -> str:
        """
        Create redacted PDF with black rectangles over entities

        Each distinct OCR word box is covered once per page, even when
        several entities reference the same word.

        WARNING: pages are copied unchanged and only annotations are added,
        so the covered text is still stored in the output file.

        NOTE(review): coordinates are mapped against the page MediaBox with
        no compensation for a page /Rotate entry - confirm inputs are not
        rotated.

        Args:
            original_pdf_path: Path to original PDF
            mapped_entities: List of entities with bounding boxes
            output_path: Path for output PDF

        Returns:
            Path to redacted PDF

        Raises:
            Exception: any read/write error is logged and re-raised.
        """
        logger.info("Creating redacted PDF: %s", output_path)
        try:
            # Group unique word boxes by 1-based page number in one pass.
            boxes_by_page = {}
            for entity_info in mapped_entities:
                for word_info in entity_info['words']:
                    page_boxes = boxes_by_page.setdefault(word_info['page'], {})
                    page_boxes.setdefault(tuple(word_info['box']), word_info)

            reader = PdfReader(original_pdf_path)
            writer = PdfWriter()
            for page_num, page in enumerate(reader.pages):
                media_box = page.mediabox
                page_left = float(media_box.left)
                page_bottom = float(media_box.bottom)
                page_width = float(media_box.width)
                page_height = float(media_box.height)
                writer.add_page(page)

                page_words = boxes_by_page.get(page_num + 1, {})
                for box, word_info in page_words.items():
                    rect = self._image_box_to_pdf_rect(
                        box,
                        word_info['image_width'],
                        word_info['image_height'],
                        page_left, page_bottom, page_width, page_height,
                    )
                    writer.add_annotation(
                        page_number=page_num,
                        annotation=self._build_redaction_annotation(rect),
                    )
                logger.info(
                    "Page %d: Added %d redactions", page_num + 1, len(page_words)
                )

            with open(output_path, "wb") as output_file:
                writer.write(output_file)
            logger.info("Redacted PDF created successfully: %s", output_path)
            return output_path
        except Exception as e:
            logger.error("Error creating redacted PDF: %s", e)
            raise

    def redact_document(self, pdf_path: str, output_path: str,
                        dpi: int = 300,
                        entity_filter: Optional[List[str]] = None) -> Dict:
        """
        Complete redaction pipeline

        Args:
            pdf_path: Path to input PDF
            output_path: Path for output PDF
            dpi: DPI for OCR
            entity_filter: List of entity types to redact (None = all);
                compared for exact equality with each entity's
                ``entity_type``

        Returns:
            Dictionary with ``output_path``, ``total_words``,
            ``total_entities``, ``redacted_entities`` and ``entities``
        """
        logger.info("Starting redaction pipeline for %s", pdf_path)

        # Step 1: OCR
        ocr_data = self.perform_ocr(pdf_path, dpi)

        # Step 2: Extract text. The single-space join must match the offset
        # arithmetic in map_entities_to_boxes.
        full_text = " ".join(word['text'] for word in ocr_data)

        # Step 3: NER
        ner_results = self.run_ner(full_text)

        # Step 4: Map entities to boxes
        mapped_entities = self.map_entities_to_boxes(ner_results, ocr_data)

        # Step 5: Filter entities if requested
        if entity_filter:
            wanted = set(entity_filter)
            mapped_entities = [
                e for e in mapped_entities if e['entity_type'] in wanted
            ]
            logger.info(
                "Filtered to %d entities of types: %s",
                len(mapped_entities), entity_filter,
            )

        # Step 6: Create redacted PDF
        self.create_redacted_pdf(pdf_path, mapped_entities, output_path)

        return {
            'output_path': output_path,
            'total_words': len(ocr_data),
            'total_entities': len(ner_results),
            'redacted_entities': len(mapped_entities),
            'entities': mapped_entities,
        }
|