# Commit metadata (preserved as a comment — raw VCS text breaks the module):
#   Author:  Yaz Hobooti
#   Message: Increase PDF resolution: DPI from 300 to 600, scaling factors
#            improved for better OCR and barcode detection
#   Hash:    e7a28e8
| import os | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| import pytesseract | |
| from pdf2image import convert_from_path | |
| from pyzbar.pyzbar import decode | |
| from spellchecker import SpellChecker | |
| import nltk | |
| from skimage.metrics import structural_similarity as ssim | |
| from skimage import color | |
| import json | |
| import tempfile | |
| import shutil | |
| import unicodedata | |
| import regex as re | |
# Domain-specific vocabulary the spell checkers must never flag.
# Fixed: the original literal listed "tytann", "dome" and "drops" twice.
DOMAIN_WHITELIST = {
    # units / abbreviations
    "mg", "mg/g", "ml", "g", "thc", "cbd", "tcm", "mct",
    # common packaging terms / bilingual words you expect
    "gouttes", "tennir", "net", "zoom", "tytann", "dome", "drops",
    # brand or proper names you want to ignore completely
    "purified", "brands",
}
# lowercase everything in whitelist for comparisons
DOMAIN_WHITELIST = {w.lower() for w in DOMAIN_WHITELIST}
| # Safe import for regex with fallback | |
| try: | |
| import regex as _re | |
| _USE_REGEX = True | |
| except ImportError: | |
| import re as _re | |
| _USE_REGEX = False | |
| TOKEN_PATTERN = r"(?:\p{L})(?:[\p{L}'-]{1,})" if _USE_REGEX else r"[A-Za-z][A-Za-z'-]{1,}" | |
class PDFComparator:
    """Compare two label/packaging PDFs: OCR text, spelling, barcodes, colors."""

    def __init__(self):
        # Initialize spell checkers for English and French
        self.english_spellchecker = SpellChecker(language='en')
        self.french_spellchecker = SpellChecker(language='fr')
        # Register the domain whitelist with both checkers so those terms
        # are treated as known words and never reported as misspellings.
        for w in DOMAIN_WHITELIST:
            self.english_spellchecker.word_frequency.add(w)
            self.french_spellchecker.word_frequency.add(w)
        # Ensure the NLTK punkt tokenizer data is present
        # (downloads once on first run; requires network access then).
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt')
| def enhance_image_for_tiny_fonts(self, image): | |
| """Enhance image specifically for tiny font OCR""" | |
| try: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)) | |
| enhanced = clahe.apply(gray) | |
| denoised = cv2.bilateralFilter(enhanced, 9, 75, 75) | |
| gaussian = cv2.GaussianBlur(denoised, (0, 0), 2.0) | |
| unsharp_mask = cv2.addWeighted(denoised, 1.5, gaussian, -0.5, 0) | |
| thresh = cv2.adaptiveThreshold(unsharp_mask, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) | |
| kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 1)) | |
| cleaned = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) | |
| return cleaned | |
| except Exception as e: | |
| print(f"Error enhancing image for tiny fonts: {str(e)}") | |
| return image | |
| def create_inverted_image(self, image): | |
| """Create inverted image for white text on dark backgrounds""" | |
| try: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| inverted = cv2.bitwise_not(gray) | |
| clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) | |
| enhanced = clahe.apply(inverted) | |
| _, thresh = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| return thresh | |
| except Exception as e: | |
| print(f"Error creating inverted image: {str(e)}") | |
| return image | |
| def extract_color_channels(self, image): | |
| """Extract text from different color channels""" | |
| try: | |
| # RGB channels | |
| b, g, r = cv2.split(image) | |
| # HSV channels | |
| hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) | |
| h, s, v = cv2.split(hsv) | |
| # LAB channels | |
| lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) | |
| l, a, b_lab = cv2.split(lab) | |
| channels = [r, g, b, v, l] | |
| texts = [] | |
| for channel in channels: | |
| _, thresh = cv2.threshold(channel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| text = pytesseract.image_to_string(thresh, config='--oem 3 --psm 6') | |
| if text.strip(): | |
| texts.append(text) | |
| return texts | |
| except Exception as e: | |
| print(f"Error extracting color channels: {str(e)}") | |
| return [] | |
| def create_edge_enhanced_image(self, image): | |
| """Create edge-enhanced image for text detection""" | |
| try: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| edges = cv2.Canny(gray, 50, 150) | |
| kernel = np.ones((2,2), np.uint8) | |
| dilated = cv2.dilate(edges, kernel, iterations=1) | |
| inverted = cv2.bitwise_not(dilated) | |
| return inverted | |
| except Exception as e: | |
| print(f"Error creating edge-enhanced image: {str(e)}") | |
| return image | |
| def ocr_with_multiple_configs(self, image): | |
| """Run OCR with multiple configurations and return best result""" | |
| configs = [ | |
| '--oem 3 --psm 6', # Uniform block of text | |
| '--oem 3 --psm 8', # Single word | |
| '--oem 3 --psm 13', # Raw line | |
| '--oem 1 --psm 6', # LSTM + Uniform block | |
| '--oem 3 --psm 3', # Fully automatic page segmentation | |
| ] | |
| best_text = "" | |
| best_length = 0 | |
| for config in configs: | |
| try: | |
| text = pytesseract.image_to_string(image, config=config) | |
| if len(text.strip()) > best_length: | |
| best_text = text | |
| best_length = len(text.strip()) | |
| except Exception as e: | |
| print(f"OCR config {config} failed: {str(e)}") | |
| continue | |
| return best_text | |
| def extract_multi_color_text(self, image): | |
| """Extract text using multiple preprocessing methods""" | |
| texts = [] | |
| # Method 1: Standard black text | |
| enhanced = self.enhance_image_for_tiny_fonts(image) | |
| text1 = self.ocr_with_multiple_configs(enhanced) | |
| if text1.strip(): | |
| texts.append(text1) | |
| # Method 2: Inverted text (white on dark) | |
| inverted = self.create_inverted_image(image) | |
| text2 = self.ocr_with_multiple_configs(inverted) | |
| if text2.strip(): | |
| texts.append(text2) | |
| # Method 3: Color channel separation | |
| color_texts = self.extract_color_channels(image) | |
| texts.extend(color_texts) | |
| # Method 4: Edge-enhanced | |
| edge_enhanced = self.create_edge_enhanced_image(image) | |
| text4 = self.ocr_with_multiple_configs(edge_enhanced) | |
| if text4.strip(): | |
| texts.append(text4) | |
| # Combine all texts and return the best one | |
| combined_text = " ".join(texts) | |
| return combined_text | |
| def validate_pdf(self, pdf_path): | |
| """Validate that PDF contains '50 Carroll' using enhanced OCR""" | |
| try: | |
| # Multiple DPI settings for better detection | |
| dpi_settings = [200, 300, 400] | |
| for dpi in dpi_settings: | |
| try: | |
| images = convert_from_path(pdf_path, dpi=dpi) | |
| for page_num, image in enumerate(images): | |
| # Convert PIL image to OpenCV format | |
| opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) | |
| # Enhanced text extraction | |
| text = self.extract_multi_color_text(opencv_image) | |
| # Check for "50 Carroll" with multiple patterns | |
| patterns = ["50 Carroll", "50 carroll", "50Carroll", "50 carroll"] | |
| for pattern in patterns: | |
| if pattern in text: | |
| return True | |
| # Also try standard OCR as fallback | |
| standard_text = pytesseract.image_to_string(opencv_image, config='--oem 3 --psm 6') | |
| for pattern in patterns: | |
| if pattern in standard_text: | |
| return True | |
| except Exception as e: | |
| print(f"DPI {dpi} failed: {str(e)}") | |
| continue | |
| return False | |
| except Exception as e: | |
| raise Exception(f"Error validating PDF: {str(e)}") | |
| def extract_text_from_pdf(self, pdf_path): | |
| """Extract text from PDF using enhanced OCR""" | |
| try: | |
| # Use higher DPI for better text extraction | |
| images = convert_from_path(pdf_path, dpi=300) | |
| all_text = [] | |
| for page_num, image in enumerate(images): | |
| opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) | |
| # Enhanced text extraction | |
| text = self.extract_multi_color_text(opencv_image) | |
| # Fallback to standard OCR if enhanced extraction is empty | |
| if not text.strip(): | |
| text = pytesseract.image_to_string(opencv_image, config='--oem 3 --psm 6') | |
| all_text.append({ | |
| 'page': page_num + 1, | |
| 'text': text, | |
| 'image': image | |
| }) | |
| return all_text | |
| except Exception as e: | |
| raise Exception(f"Error extracting text from PDF: {str(e)}") | |
| def _likely_french(self, token: str) -> bool: | |
| """Helper function to guess if a token is likely French""" | |
| if _USE_REGEX: | |
| # any Latin letter outside ASCII => probably FR (é, è, ç…) | |
| return bool(_re.search(r"[\p{Letter}&&\p{Latin}&&[^A-Za-z]]", token)) | |
| # fallback: any non-ascii letter | |
| return any((not ('a' <= c.lower() <= 'z')) and c.isalpha() for c in token) | |
| def check_spelling(self, text): | |
| """ | |
| Robust EN/FR spell check: | |
| - Unicode-aware tokens (keeps accents) | |
| - Normalizes curly quotes/ligatures | |
| - Heuristic per-token language (accented => FR; else EN) | |
| - Flags if unknown in its likely language (not both) | |
| """ | |
| try: | |
| text = unicodedata.normalize("NFKC", text) | |
| text = text.replace("'", "'").replace(""", '"').replace(""", '"') | |
| tokens = _re.findall(TOKEN_PATTERN, text, flags=_re.UNICODE if _USE_REGEX else 0) | |
| issues = [] | |
| for raw in tokens: | |
| t = raw.lower() | |
| # skip very short, short ALL-CAPS acronyms, and whitelisted terms | |
| if len(t) < 3: | |
| continue | |
| if raw.isupper() and len(raw) <= 3: # Changed from <=5 to <=3 | |
| continue | |
| if t in DOMAIN_WHITELIST: | |
| continue | |
| miss_en = t in self.english_spellchecker.unknown([t]) | |
| miss_fr = t in self.french_spellchecker.unknown([t]) | |
| use_fr = self._likely_french(raw) | |
| # Prefer the likely language, but fall back to "either language unknown" | |
| if (use_fr and miss_fr) or ((not use_fr) and miss_en) or (miss_en and miss_fr): | |
| issues.append({ | |
| "word": raw, | |
| "lang": "fr" if use_fr else "en", | |
| "suggestions_en": list(self.english_spellchecker.candidates(t))[:3], | |
| "suggestions_fr": list(self.french_spellchecker.candidates(t))[:3], | |
| }) | |
| return issues | |
| except Exception as e: | |
| print(f"Error checking spelling: {e}") | |
| return [] | |
| def annotate_spelling_errors_on_image(self, pil_image, misspelled): | |
| """ | |
| Draw one red rectangle around each misspelled token using Tesseract word boxes. | |
| 'misspelled' must be a list of dicts with 'word' keys (from check_spelling). | |
| """ | |
| if not misspelled: | |
| return pil_image | |
| def _norm(s: str) -> str: | |
| return unicodedata.normalize("NFKC", s).replace("'","'").strip(".,:;!?)(").lower() | |
| miss_set = {_norm(m["word"]) for m in misspelled} | |
| img = pil_image | |
| try: | |
| data = pytesseract.image_to_data( | |
| img, | |
| lang="eng+fra", # Added lang parameter | |
| config="--oem 3 --psm 6", | |
| output_type=pytesseract.Output.DICT, | |
| ) | |
| except Exception as e: | |
| print("image_to_data failed:", e) | |
| return img | |
| draw = ImageDraw.Draw(img) | |
| n = len(data.get("text", [])) | |
| for i in range(n): | |
| word = (data["text"][i] or "").strip() | |
| if not word: | |
| continue | |
| clean = _norm(word) # Used _norm function | |
| if clean and clean in miss_set: | |
| x, y, w, h = data["left"][i], data["top"][i], data["width"][i], data["height"][i] | |
| draw.rectangle([x, y, x + w, y + h], outline="red", width=4) | |
| return img | |
| def detect_barcodes_qr_codes(self, image): | |
| """Detect and decode barcodes and QR codes""" | |
| try: | |
| # Convert PIL image to OpenCV format | |
| opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) | |
| # Decode barcodes and QR codes | |
| decoded_objects = decode(opencv_image) | |
| barcodes = [] | |
| for obj in decoded_objects: | |
| barcode_info = { | |
| 'type': obj.type, | |
| 'data': obj.data.decode('utf-8'), | |
| 'rect': obj.rect | |
| } | |
| barcodes.append(barcode_info) | |
| return barcodes | |
| except Exception as e: | |
| print(f"Error detecting barcodes: {str(e)}") | |
| return [] | |
| def compare_colors(self, image1, image2): | |
| """Compare colors between two images and return differences""" | |
| try: | |
| # Convert images to same size | |
| img1 = np.array(image1) | |
| img2 = np.array(image2) | |
| # Resize images to same dimensions | |
| height = min(img1.shape[0], img2.shape[0]) | |
| width = min(img1.shape[1], img2.shape[1]) | |
| img1_resized = cv2.resize(img1, (width, height)) | |
| img2_resized = cv2.resize(img2, (width, height)) | |
| # Convert to grayscale for comparison | |
| gray1 = cv2.cvtColor(img1_resized, cv2.COLOR_RGB2GRAY) | |
| gray2 = cv2.cvtColor(img2_resized, cv2.COLOR_RGB2GRAY) | |
| # Calculate structural similarity | |
| (score, diff) = ssim(gray1, gray2, full=True) | |
| # Convert difference to binary mask | |
| diff = (diff * 255).astype("uint8") | |
| thresh = cv2.threshold(diff, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1] | |
| # Find contours of differences | |
| contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| color_differences = [] | |
| for contour in contours: | |
| if cv2.contourArea(contour) > 100: # Filter small differences | |
| x, y, w, h = cv2.boundingRect(contour) | |
| color_differences.append({ | |
| 'x': x, | |
| 'y': y, | |
| 'width': w, | |
| 'height': h, | |
| 'area': cv2.contourArea(contour) | |
| }) | |
| return color_differences | |
| except Exception as e: | |
| print(f"Error comparing colors: {str(e)}") | |
| return [] | |
| def create_annotated_image(self, image, differences, output_path): | |
| """Create annotated image with red boxes around differences""" | |
| try: | |
| # Create a copy of the image | |
| annotated_image = image.copy() | |
| draw = ImageDraw.Draw(annotated_image) | |
| # Draw red rectangles around differences | |
| for diff in differences: | |
| x, y, w, h = diff['x'], diff['y'], diff['width'], diff['height'] | |
| draw.rectangle([x, y, x + w, y + h], outline='red', width=3) | |
| # Save annotated image | |
| annotated_image.save(output_path) | |
| except Exception as e: | |
| print(f"Error creating annotated image: {str(e)}") | |
    def compare_pdfs(self, pdf1_path, pdf2_path, session_id):
        """Compare two PDFs page by page and aggregate all findings.

        Runs marker validation, OCR text extraction, EN/FR spell checking,
        barcode/QR decoding and SSIM color-difference detection, writing
        annotated PNGs under static/results/<session_id>/ and returning a
        results dict (paths in it are relative to static/, for web serving).

        NOTE(review): zip() truncates to the shorter document, so extra
        pages in the longer PDF are silently ignored — confirm intended.

        Raises Exception("INVALID DOCUMENT") wrapped in the outer handler
        if either PDF fails validation; any other failure is re-raised
        wrapped as "Error comparing PDFs: ...".
        """
        try:
            # Validate both PDFs contain "50 Carroll"
            if not self.validate_pdf(pdf1_path):
                raise Exception("INVALID DOCUMENT")
            if not self.validate_pdf(pdf2_path):
                raise Exception("INVALID DOCUMENT")
            # Extract text and page images from both PDFs
            pdf1_data = self.extract_text_from_pdf(pdf1_path)
            pdf2_data = self.extract_text_from_pdf(pdf2_path)
            # Result skeleton returned to the caller
            results = {
                'session_id': session_id,
                'validation': {
                    'pdf1_valid': True,
                    'pdf2_valid': True,
                    'validation_text': '50 Carroll'
                },
                'text_comparison': [],
                'spelling_issues': [],
                'barcodes_qr_codes': [],
                'color_differences': [],
                'annotated_images': []
            }
            # Compare text and check spelling, page by page
            for i, (page1, page2) in enumerate(zip(pdf1_data, pdf2_data)):
                page_results = {
                    'page': i + 1,
                    'text_differences': [],
                    'spelling_issues_pdf1': [],
                    'spelling_issues_pdf2': [],
                    'barcodes_pdf1': [],
                    'barcodes_pdf2': [],
                    'color_differences': []
                }
                # Check spelling for both PDFs
                page_results['spelling_issues_pdf1'] = self.check_spelling(page1['text'])
                page_results['spelling_issues_pdf2'] = self.check_spelling(page2['text'])
                # Create spelling-only annotated images (one box per error);
                # copies are passed so the original page images stay clean
                spell_dir = f'static/results/{session_id}'
                os.makedirs(spell_dir, exist_ok=True)
                spell_img1 = page1['image'].copy()
                spell_img2 = page2['image'].copy()
                spell_img1 = self.annotate_spelling_errors_on_image(spell_img1, page_results['spelling_issues_pdf1'])
                spell_img2 = self.annotate_spelling_errors_on_image(spell_img2, page_results['spelling_issues_pdf2'])
                spell_path1 = f'{spell_dir}/page_{i+1}_pdf1_spelling.png'
                spell_path2 = f'{spell_dir}/page_{i+1}_pdf2_spelling.png'
                spell_img1.save(spell_path1)
                spell_img2.save(spell_path2)
                # Detect barcodes and QR codes on each page image
                page_results['barcodes_pdf1'] = self.detect_barcodes_qr_codes(page1['image'])
                page_results['barcodes_pdf2'] = self.detect_barcodes_qr_codes(page2['image'])
                # Compare colors (SSIM-based difference regions)
                color_diffs = self.compare_colors(page1['image'], page2['image'])
                page_results['color_differences'] = color_diffs
                # Create color-annotated images only when differences exist
                if color_diffs:
                    output_dir = f'static/results/{session_id}'
                    os.makedirs(output_dir, exist_ok=True)
                    annotated_path1 = f'{output_dir}/page_{i+1}_pdf1_annotated.png'
                    annotated_path2 = f'{output_dir}/page_{i+1}_pdf2_annotated.png'
                    self.create_annotated_image(page1['image'], color_diffs, annotated_path1)
                    self.create_annotated_image(page2['image'], color_diffs, annotated_path2)
                    # Paths below are relative to static/ (web-served)
                    page_results['annotated_images'] = {
                        'pdf1': f'results/{session_id}/page_{i+1}_pdf1_annotated.png',
                        'pdf2': f'results/{session_id}/page_{i+1}_pdf2_annotated.png',
                        'pdf1_spelling': f'results/{session_id}/page_{i+1}_pdf1_spelling.png',
                        'pdf2_spelling': f'results/{session_id}/page_{i+1}_pdf2_spelling.png'
                    }
                else:
                    # If no color differences, still expose the spelling images
                    page_results['annotated_images'] = {
                        'pdf1_spelling': f'results/{session_id}/page_{i+1}_pdf1_spelling.png',
                        'pdf2_spelling': f'results/{session_id}/page_{i+1}_pdf2_spelling.png'
                    }
                # Summarize spelling issues as a text-difference entry
                if page_results['spelling_issues_pdf1'] or page_results['spelling_issues_pdf2']:
                    page_results['text_differences'].append({
                        'type': 'spelling',
                        'pdf1_issues': len(page_results['spelling_issues_pdf1']),
                        'pdf2_issues': len(page_results['spelling_issues_pdf2']),
                        'details': {
                            'pdf1': [issue['word'] for issue in page_results['spelling_issues_pdf1']],
                            'pdf2': [issue['word'] for issue in page_results['spelling_issues_pdf2']]
                        }
                    })
                results['text_comparison'].append(page_results)
            # Aggregate spelling issues across all pages
            all_spelling_issues = []
            for page in results['text_comparison']:
                all_spelling_issues.extend(page['spelling_issues_pdf1'])
                all_spelling_issues.extend(page['spelling_issues_pdf2'])
            results['spelling_issues'] = all_spelling_issues
            # Aggregate barcodes and QR codes across all pages
            all_barcodes = []
            for page in results['text_comparison']:
                all_barcodes.extend(page['barcodes_pdf1'])
                all_barcodes.extend(page['barcodes_pdf2'])
            results['barcodes_qr_codes'] = all_barcodes
            return results
        except Exception as e:
            raise Exception(f"Error comparing PDFs: {str(e)}")