| import os |
| import cv2 |
| import numpy as np |
| from PIL import Image, ImageDraw, ImageFont |
| import pytesseract |
| from pdf2image import convert_from_path |
| from pyzbar.pyzbar import decode |
| from spellchecker import SpellChecker |
| import nltk |
| from skimage.metrics import structural_similarity as ssim |
| from skimage import color |
| import json |
| import tempfile |
| import shutil |
| import re |
| import time |
| import signal |
| import unicodedata |
|
|
| |
# Prefer the third-party `regex` module (Unicode property classes like \p{L});
# fall back to the stdlib `re` module with an ASCII-only pattern.
try:
    import regex as _re
except ImportError:
    import re as _re
    _USE_REGEX = False
else:
    _USE_REGEX = True


# Word token: one letter followed by at least one more letter/apostrophe/hyphen.
if _USE_REGEX:
    TOKEN_PATTERN = r"(?:\p{L})(?:[\p{L}'-]{1,})"
else:
    TOKEN_PATTERN = r"[A-Za-z][A-Za-z'-]{1,}"
|
|
| |
# Domain-specific vocabulary accepted by both spellcheckers (units, label
# wording, brand terms) so it is never flagged as a misspelling.
# The previous literal repeated "tytann", "dome" and "drops"; duplicates
# removed (harmless in a set, but misleading to maintain).
DOMAIN_WHITELIST = {
    # units / cannabinoid abbreviations
    "mg", "mg/g", "ml", "g", "thc", "cbd", "tcm", "mct",
    # label wording (English/French) and brand terms
    "gouttes", "tennir", "net", "zoom", "tytann", "dome", "drops",
    "purified", "brands",
}

# Normalize to lowercase so membership checks are case-insensitive.
DOMAIN_WHITELIST = {w.lower() for w in DOMAIN_WHITELIST}
|
|
def _likely_french(token: str) -> bool:
    """Return True if *token* contains a non-ASCII alphabetic character.

    Accented Latin letters (e.g. é, à, ç) are a strong hint that the token is
    French rather than English.

    The previous implementation used the `regex` module's set-intersection
    syntax (`[\\p{Letter}&&\\p{Latin}&&[^A-Za-z]]`), which is only honoured
    under `regex.V1`; under the default V0 mode `&&` and nested `[` are
    treated literally, so the pattern also matched plain ASCII letters and
    misclassified English words. A character-wise check is unambiguous and
    needs no optional dependency.
    """
    return any(c.isalpha() and not ('a' <= c.lower() <= 'z') for c in token)
|
|
| |
# Optional dependency: zxing as a secondary barcode decoding backend.
try:
    import zxing
except ImportError:
    ZXING_AVAILABLE = False
    print("zxing-cpp not available, using pyzbar only")
else:
    ZXING_AVAILABLE = True
|
|
# Optional dependency: Dynamsoft Barcode Reader (commercial engine).
try:
    from dbr import BarcodeReader
except ImportError:
    DBR_AVAILABLE = False
    print("Dynamsoft Barcode Reader not available")
else:
    DBR_AVAILABLE = True
    print("Dynamsoft Barcode Reader available")
|
|
class TimeoutError(Exception):
    """Raised by the SIGALRM handler when a guarded operation overruns.

    NOTE(review): intentionally shadows the builtin TimeoutError within this
    module; both are caught by `except TimeoutError` here.
    """
|
|
def timeout_handler(signum, frame):
    """SIGALRM handler: abort the in-flight operation by raising TimeoutError."""
    del signum, frame  # unused; required by the signal-handler signature
    raise TimeoutError("Operation timed out")
|
|
| class PDFComparator: |
| def __init__(self): |
| |
| self.english_spellchecker = SpellChecker(language='en') |
| self.french_spellchecker = SpellChecker(language='fr') |
| |
| |
| for w in DOMAIN_WHITELIST: |
| self.english_spellchecker.word_frequency.add(w) |
| self.french_spellchecker.word_frequency.add(w) |
| |
| |
| try: |
| nltk.data.find('tokenizers/punkt') |
| except LookupError: |
| nltk.download('punkt') |
| |
| def safe_execute(self, func, *args, timeout=30, **kwargs): |
| """Execute a function with timeout protection""" |
| try: |
| |
| signal.signal(signal.SIGALRM, timeout_handler) |
| signal.alarm(timeout) |
| |
| |
| result = func(*args, **kwargs) |
| |
| |
| signal.alarm(0) |
| return result |
| |
| except TimeoutError: |
| print(f"Function {func.__name__} timed out after {timeout} seconds") |
| return None |
| except Exception as e: |
| print(f"Error in {func.__name__}: {str(e)}") |
| return None |
| finally: |
| signal.alarm(0) |
| |
    def validate_pdf(self, pdf_path):
        """Validate that the PDF contains '50 Carroll' using enhanced OCR for tiny fonts.

        Renders the PDF at escalating DPI and OCRs every page with several
        Tesseract engine/page-segmentation configurations until one of the
        target address variants is found.

        Args:
            pdf_path: Path to the PDF file to validate.

        Returns:
            bool: True on the first page/config that matches, False otherwise.

        Raises:
            Exception: wrapper around any conversion/OCR failure.
        """
        try:
            print(f"Validating PDF: {pdf_path}")

            # Escalating render resolutions — tiny fonts may only resolve
            # at very high DPI.
            dpi_settings = [300, 400, 600, 800]

            for dpi in dpi_settings:
                print(f"Trying DPI {dpi} for tiny font detection...")

                images = convert_from_path(pdf_path, dpi=dpi)
                print(f"Converted PDF to {len(images)} images at {dpi} DPI")

                for page_num, image in enumerate(images):
                    print(f"Processing page {page_num + 1} at {dpi} DPI...")

                    # PIL (RGB) -> OpenCV (BGR) for preprocessing.
                    opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

                    processed_image = self.enhance_image_for_tiny_fonts(opencv_image)

                    # Different --psm modes suit different page layouts.
                    ocr_configs = [
                        '--oem 3 --psm 6',
                        '--oem 3 --psm 8',
                        '--oem 3 --psm 13',
                        '--oem 1 --psm 6',
                        '--oem 3 --psm 3',
                    ]

                    for config in ocr_configs:
                        try:
                            text = pytesseract.image_to_string(processed_image, config=config)

                            # Flattened preview of the OCR output for debugging.
                            debug_text = text[:300].replace('\n', ' ').replace('\r', ' ')
                            print(f"Page {page_num + 1} text (DPI {dpi}, config: {config}): '{debug_text}...'")

                            # Spacing/case variants of the target string; the
                            # lowercase comparison below makes the case variants
                            # redundant but harmless.
                            patterns = ["50 Carroll", "50 carroll", "50Carroll", "50carroll", "50 Carroll", "50 carroll"]
                            for pattern in patterns:
                                if pattern in text or pattern.lower() in text.lower():
                                    print(f"Found '{pattern}' in page {page_num + 1} (DPI {dpi}, config: {config})")
                                    return True

                        except Exception as ocr_error:
                            # One failing OCR config is non-fatal; try the next.
                            print(f"OCR error with config {config}: {str(ocr_error)}")
                            continue

            print("Validation failed: '50 Carroll' not found in any page with any DPI or OCR config")
            return False

        except Exception as e:
            print(f"Error validating PDF: {str(e)}")
            raise Exception(f"Error validating PDF: {str(e)}")
| |
| def enhance_image_for_tiny_fonts(self, image): |
| """Enhance image specifically for tiny font OCR""" |
| try: |
| |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
| |
| |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)) |
| enhanced = clahe.apply(gray) |
| |
| |
| denoised = cv2.bilateralFilter(enhanced, 9, 75, 75) |
| |
| |
| gaussian = cv2.GaussianBlur(denoised, (0, 0), 2.0) |
| unsharp_mask = cv2.addWeighted(denoised, 1.5, gaussian, -0.5, 0) |
| |
| |
| thresh = cv2.adaptiveThreshold(unsharp_mask, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) |
| |
| |
| kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 1)) |
| cleaned = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) |
| |
| return cleaned |
| |
| except Exception as e: |
| print(f"Error enhancing image for tiny fonts: {str(e)}") |
| return image |
| |
    def extract_text_from_pdf(self, pdf_path):
        """Extract text from PDF with multi-color text detection.

        Fast path: read the embedded text layer via PyMuPDF (fitz); if any
        page carries real text, return it with 600-DPI page renders attached.
        Slow path: rasterise each page and run the multi-color OCR pipeline.

        Returns:
            list[dict]: one {'page', 'text', 'image'} record per page.

        Raises:
            Exception: wrapper around any conversion/OCR failure.
        """
        try:
            # embedded_text is vestigial (never written after init).
            embedded_text = ""
            try:
                import fitz  # PyMuPDF; optional dependency, hence local import
                doc = fitz.open(pdf_path)
                all_text = []
                any_text = False
                for i, page in enumerate(doc):
                    t = page.get_text()
                    any_text |= bool(t.strip())
                    all_text.append({"page": i+1, "text": t, "image": None})
                doc.close()
                if any_text:
                    # Attach page renders so downstream annotation still works.
                    images = convert_from_path(pdf_path, dpi=600)
                    for d, im in zip(all_text, images):
                        d["image"] = im
                    return all_text
            except Exception:
                # No fitz / unreadable text layer: fall through to OCR.
                pass


            # Slow path: rasterise and OCR every page.
            print("Extracting text with multi-color detection...")
            images = convert_from_path(pdf_path, dpi=600)
            all_text = []

            for page_num, image in enumerate(images):
                opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

                combined_text = self.extract_multi_color_text(opencv_image)

                all_text.append({
                    'page': page_num + 1,
                    'text': combined_text,
                    'image': image
                })

            return all_text

        except Exception as e:
            raise Exception(f"Error extracting text from PDF: {str(e)}")
| |
| def extract_multi_color_text(self, image): |
| """Extract text from image in various colors using multiple preprocessing methods.""" |
| try: |
| combined_text = "" |
| |
| |
| print("Method 1: Standard black text detection") |
| processed_image = self.enhance_image_for_tiny_fonts(image) |
| text1 = self.ocr_with_multiple_configs(processed_image) |
| combined_text += text1 + " " |
| |
| |
| print("Method 2: Inverted text detection") |
| inverted_image = self.create_inverted_image(image) |
| text2 = self.ocr_with_multiple_configs(inverted_image) |
| combined_text += text2 + " " |
| |
| |
| print("Method 3: Color channel separation") |
| for channel_name, channel_image in self.extract_color_channels(image): |
| text3 = self.ocr_with_multiple_configs(channel_image) |
| combined_text += text3 + " " |
| |
| |
| print("Method 4: Edge-based text detection") |
| edge_image = self.create_edge_enhanced_image(image) |
| text4 = self.ocr_with_multiple_configs(edge_image) |
| combined_text += text4 + " " |
| |
| return combined_text.strip() |
| |
| except Exception as e: |
| print(f"Error in multi-color text extraction: {str(e)}") |
| return "" |
| |
| def create_inverted_image(self, image): |
| """Create inverted image for white text detection.""" |
| try: |
| |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
| |
| |
| inverted = cv2.bitwise_not(gray) |
| |
| |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)) |
| enhanced = clahe.apply(inverted) |
| |
| |
| _, thresh = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) |
| |
| return thresh |
| |
| except Exception as e: |
| print(f"Error creating inverted image: {str(e)}") |
| return image |
| |
| def extract_color_channels(self, image): |
| """Extract individual color channels for colored text detection.""" |
| try: |
| channels = [] |
| |
| |
| hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) |
| lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) |
| |
| |
| b, g, r = cv2.split(image) |
| h, s, v = cv2.split(hsv) |
| l, a, b_lab = cv2.split(lab) |
| |
| |
| channel_images = [ |
| ("blue", b), |
| ("green", g), |
| ("red", r), |
| ("hue", h), |
| ("saturation", s), |
| ("value", v), |
| ("lightness", l) |
| ] |
| |
| for name, channel in channel_images: |
| |
| _, thresh = cv2.threshold(channel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) |
| channels.append((name, thresh)) |
| |
| return channels |
| |
| except Exception as e: |
| print(f"Error extracting color channels: {str(e)}") |
| return [] |
| |
| def create_edge_enhanced_image(self, image): |
| """Create edge-enhanced image for text detection.""" |
| try: |
| |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
| |
| |
| edges = cv2.Canny(gray, 50, 150) |
| |
| |
| kernel = np.ones((2, 2), np.uint8) |
| dilated = cv2.dilate(edges, kernel, iterations=1) |
| |
| |
| inverted = cv2.bitwise_not(dilated) |
| |
| return inverted |
| |
| except Exception as e: |
| print(f"Error creating edge-enhanced image: {str(e)}") |
| return image |
| |
| def ocr_with_multiple_configs(self, image): |
| """Perform OCR with multiple configurations.""" |
| try: |
| ocr_configs = [ |
| '--oem 3 --psm 6', |
| '--oem 3 --psm 8', |
| '--oem 3 --psm 13', |
| '--oem 1 --psm 6', |
| ] |
| |
| best_text = "" |
| for config in ocr_configs: |
| try: |
| text = pytesseract.image_to_string(image, config=config) |
| if len(text.strip()) > len(best_text.strip()): |
| best_text = text |
| except Exception as ocr_error: |
| print(f"OCR error with config {config}: {str(ocr_error)}") |
| continue |
| |
| return best_text |
| |
| except Exception as e: |
| print(f"Error in OCR with multiple configs: {str(e)}") |
| return "" |
| |
| def annotate_spelling_errors_on_image(self, pil_image, misspelled): |
| """ |
| Draw one red rectangle around each misspelled token using Tesseract word boxes. |
| 'misspelled' must be a list of dicts with 'word' keys (from check_spelling). |
| """ |
| if not misspelled: |
| return pil_image |
|
|
| def _norm(s: str) -> str: |
| return unicodedata.normalize("NFKC", s).replace("'","'").strip(".,:;!?)(").lower() |
|
|
| |
| miss_set = {_norm(m["word"]) for m in misspelled} |
|
|
| |
| img = pil_image |
| try: |
| data = pytesseract.image_to_data( |
| img, |
| lang="eng+fra", |
| config="--oem 3 --psm 6", |
| output_type=pytesseract.Output.DICT, |
| ) |
| except Exception as e: |
| print("image_to_data failed:", e) |
| return img |
|
|
| draw = ImageDraw.Draw(img) |
| n = len(data.get("text", [])) |
| for i in range(n): |
| word = (data["text"][i] or "").strip() |
| if not word: |
| continue |
| clean = _norm(word) |
|
|
| if clean and clean in miss_set: |
| x, y, w, h = data["left"][i], data["top"][i], data["width"][i], data["height"][i] |
| |
| draw.rectangle([x, y, x + w, y + h], outline="red", width=4) |
|
|
| return img |
| |
    def detect_barcodes_qr_codes(self, image):
        """Detect and decode barcodes/QR codes using multiple strategies.

        Runs up to four detectors (basic pyzbar, Dynamsoft when installed,
        enhanced preprocessing, small-barcode upscaling), merges their
        results, removes duplicates and attaches confidence metadata.

        NOTE(review): despite the original wording, no explicit timeout is
        applied inside this method; wrap the call in safe_execute for that.

        Args:
            image: PIL image of a rendered PDF page.

        Returns:
            list[dict]: de-duplicated, enhanced barcode records ([] on error).
        """
        try:
            print("Starting barcode detection...")
            start_time = time.time()

            # PIL (RGB) -> OpenCV (BGR) for the detectors below.
            opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

            all_barcodes = []

            # Strategy 1: plain pyzbar (raw + CLAHE-enhanced passes).
            print("Method 1: Basic pyzbar detection")
            pyzbar_results = self.detect_with_pyzbar_basic(opencv_image)
            if pyzbar_results:
                all_barcodes.extend(pyzbar_results)
                print(f"Found {len(pyzbar_results)} barcodes with basic pyzbar")

            # Strategy 2: commercial Dynamsoft engine, only when importable.
            if DBR_AVAILABLE:
                print("Method 2: Dynamsoft Barcode Reader")
                dbr_results = self.detect_with_dynamsoft(opencv_image)
                if dbr_results:
                    all_barcodes.extend(dbr_results)
                    print(f"Found {len(dbr_results)} barcodes with Dynamsoft")

            # Strategy 3: pyzbar over preprocessed image variants.
            print("Method 3: Enhanced preprocessing")
            enhanced_results = self.detect_with_enhanced_preprocessing(opencv_image)
            if enhanced_results:
                all_barcodes.extend(enhanced_results)
                print(f"Found {len(enhanced_results)} additional barcodes with enhanced preprocessing")

            # Strategy 4: upscaled passes aimed at physically small symbols.
            print("Method 4: Small barcode detection")
            small_results = self.detect_small_barcodes_simple(opencv_image)
            if small_results:
                all_barcodes.extend(small_results)
                print(f"Found {len(small_results)} additional small barcodes")

            # The strategies overlap heavily, so collapse duplicates first.
            unique_barcodes = self.remove_duplicate_barcodes(all_barcodes)

            # Attach confidence / GS1 metadata.
            enhanced_barcodes = self.enhance_barcode_data(unique_barcodes)

            elapsed_time = time.time() - start_time
            print(f"Barcode detection completed in {elapsed_time:.2f} seconds. Found {len(enhanced_barcodes)} unique barcodes.")

            return enhanced_barcodes

        except Exception as e:
            print(f"Error in barcode detection: {str(e)}")
            return []
| |
| def detect_with_pyzbar_basic(self, image): |
| """Basic pyzbar detection without complex preprocessing""" |
| results = [] |
| |
| try: |
| |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
| |
| |
| decoded_objects = decode(gray) |
| for obj in decoded_objects: |
| barcode_info = { |
| 'type': obj.type, |
| 'data': obj.data.decode('utf-8', errors='ignore'), |
| 'rect': obj.rect, |
| 'polygon': obj.polygon, |
| 'quality': getattr(obj, 'quality', 0), |
| 'orientation': self.detect_barcode_orientation(obj), |
| 'method': 'pyzbar_basic' |
| } |
| |
| if 'databar' in obj.type.lower(): |
| barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore')) |
| |
| results.append(barcode_info) |
| |
| |
| clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) |
| enhanced = clahe.apply(gray) |
| decoded_objects = decode(enhanced) |
| |
| for obj in decoded_objects: |
| barcode_info = { |
| 'type': obj.type, |
| 'data': obj.data.decode('utf-8', errors='ignore'), |
| 'rect': obj.rect, |
| 'polygon': obj.polygon, |
| 'quality': getattr(obj, 'quality', 0), |
| 'orientation': self.detect_barcode_orientation(obj), |
| 'method': 'pyzbar_enhanced' |
| } |
| |
| if 'databar' in obj.type.lower(): |
| barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore')) |
| |
| results.append(barcode_info) |
| |
| except Exception as e: |
| print(f"Error in basic pyzbar detection: {str(e)}") |
| |
| return results |
| |
    def detect_with_dynamsoft(self, image):
        """Detect barcodes with the Dynamsoft Barcode Reader, if installed.

        Encodes the BGR image to PNG in memory, feeds it to DBR, and converts
        each result to the same info-dict shape the pyzbar paths produce.
        Returns [] when DBR is unavailable or anything fails.
        """
        results = []

        try:
            if not DBR_AVAILABLE:
                return results

            reader = BarcodeReader()

            # DBR consumes an encoded file stream, so round-trip through PNG.
            success, buffer = cv2.imencode('.png', image)
            if not success:
                print("Failed to encode image for Dynamsoft")
                return results

            image_bytes = buffer.tobytes()

            text_results = reader.decode_file_stream(image_bytes)

            for result in text_results:
                barcode_info = {
                    'type': result.barcode_format_string,
                    'data': result.barcode_text,
                    # Anonymous Rect-like object so downstream code can read
                    # .left/.top/.width/.height just like pyzbar rects.
                    'rect': type('Rect', (), {
                        'left': result.localization_result.x1,
                        'top': result.localization_result.y1,
                        'width': result.localization_result.x2 - result.localization_result.x1,
                        'height': result.localization_result.y2 - result.localization_result.y1
                    })(),
                    # Axis-aligned quad built from the two corner points.
                    'polygon': [
                        (result.localization_result.x1, result.localization_result.y1),
                        (result.localization_result.x2, result.localization_result.y1),
                        (result.localization_result.x2, result.localization_result.y2),
                        (result.localization_result.x1, result.localization_result.y2)
                    ],
                    'quality': result.confidence,
                    'orientation': self.detect_barcode_orientation(result),
                    'method': 'dynamsoft'
                }

                # DataBar family gets its GS1 payload parsed out.
                if 'databar' in result.barcode_format_string.lower() or 'expanded' in result.barcode_format_string.lower():
                    barcode_info['expanded_data'] = self.parse_databar_expanded(result.barcode_text)

                results.append(barcode_info)

            print(f"Dynamsoft detected {len(results)} barcodes")

        except Exception as e:
            print(f"Error in Dynamsoft detection: {str(e)}")

        return results
| |
| def detect_with_enhanced_preprocessing(self, image): |
| """Enhanced preprocessing with limited methods""" |
| results = [] |
| |
| try: |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
| |
| |
| processed_images = [ |
| gray, |
| cv2.resize(gray, (gray.shape[1] * 3, gray.shape[0] * 3), interpolation=cv2.INTER_CUBIC), |
| cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2), |
| ] |
| |
| for i, processed_image in enumerate(processed_images): |
| try: |
| decoded_objects = decode(processed_image) |
| |
| for obj in decoded_objects: |
| barcode_info = { |
| 'type': obj.type, |
| 'data': obj.data.decode('utf-8', errors='ignore'), |
| 'rect': obj.rect, |
| 'polygon': obj.polygon, |
| 'quality': getattr(obj, 'quality', 0), |
| 'orientation': self.detect_barcode_orientation(obj), |
| 'method': f'enhanced_preprocessing_{i}' |
| } |
| |
| if 'databar' in obj.type.lower(): |
| barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore')) |
| |
| results.append(barcode_info) |
| |
| except Exception as e: |
| print(f"Error in enhanced preprocessing method {i}: {str(e)}") |
| continue |
| |
| except Exception as e: |
| print(f"Error in enhanced preprocessing: {str(e)}") |
| |
| return results |
| |
    def detect_small_barcodes_simple(self, image):
        """Decode small barcodes by upscaling the page 3x/4x before pyzbar.

        Detected rectangles are mapped back into original-image coordinates.
        NOTE(review): obj.polygon is returned in UPSCALED coordinates — only
        the rect is rescaled.
        """
        results = []

        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

            # Moderate upscales; heavier ones live in preprocess_for_small_barcodes.
            scale_factors = [3.0, 4.0]

            for scale in scale_factors:
                try:
                    height, width = gray.shape
                    new_height, new_width = int(height * scale), int(width * scale)
                    scaled = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC)

                    decoded_objects = decode(scaled)

                    for obj in decoded_objects:
                        # Map the detection box back to page coordinates.
                        scale_factor = width / new_width
                        scaled_rect = type('Rect', (), {
                            'left': int(obj.rect.left * scale_factor),
                            'top': int(obj.rect.top * scale_factor),
                            'width': int(obj.rect.width * scale_factor),
                            'height': int(obj.rect.height * scale_factor)
                        })()

                        barcode_info = {
                            'type': obj.type,
                            'data': obj.data.decode('utf-8', errors='ignore'),
                            'rect': scaled_rect,
                            'polygon': obj.polygon,
                            'quality': getattr(obj, 'quality', 0),
                            'orientation': self.detect_barcode_orientation(obj),
                            'method': f'small_barcode_{scale}x',
                            'size_category': 'small'
                        }

                        # DataBar family gets its GS1 payload parsed out.
                        if 'databar' in obj.type.lower():
                            barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))

                        results.append(barcode_info)

                except Exception as e:
                    print(f"Error in small barcode detection at {scale}x: {str(e)}")
                    continue

        except Exception as e:
            print(f"Error in small barcode detection: {str(e)}")

        return results
| |
| def preprocess_image_for_ocr(self, image): |
| """Preprocess image for better OCR results""" |
| try: |
| |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
| |
| |
| |
| |
| height, width = gray.shape |
| scale_factor = 3.0 |
| new_height, new_width = int(height * scale_factor), int(width * scale_factor) |
| resized = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC) |
| |
| |
| blurred = cv2.GaussianBlur(resized, (1, 1), 0) |
| |
| |
| thresh = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) |
| |
| |
| kernel = np.ones((1, 1), np.uint8) |
| cleaned = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) |
| |
| |
| clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) |
| enhanced = clahe.apply(cleaned) |
| |
| return enhanced |
| |
| except Exception as e: |
| print(f"Error preprocessing image: {str(e)}") |
| return image |
| |
| def preprocess_for_barcode_detection(self, image): |
| """Preprocess image with multiple techniques for better barcode detection""" |
| processed_images = [image] |
| |
| try: |
| |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
| processed_images.append(gray) |
| |
| |
| |
| |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) |
| enhanced = clahe.apply(gray) |
| processed_images.append(enhanced) |
| |
| |
| blurred = cv2.GaussianBlur(gray, (3, 3), 0) |
| processed_images.append(blurred) |
| |
| |
| thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) |
| processed_images.append(thresh) |
| |
| |
| kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]]) |
| sharpened = cv2.filter2D(gray, -1, kernel) |
| processed_images.append(sharpened) |
| |
| |
| height, width = gray.shape |
| scale_factor = 3.0 |
| new_height, new_width = int(height * scale_factor), int(width * scale_factor) |
| scaled = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC) |
| processed_images.append(scaled) |
| |
| except Exception as e: |
| print(f"Error in barcode preprocessing: {str(e)}") |
| |
| return processed_images |
| |
| def preprocess_for_databar(self, gray_image): |
| """Specialized preprocessing for DataBar Expanded Stacked barcodes""" |
| processed_images = [] |
| |
| try: |
| |
| processed_images.append(gray_image) |
| |
| |
| clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8, 8)) |
| enhanced = clahe.apply(gray_image) |
| processed_images.append(enhanced) |
| |
| |
| bilateral = cv2.bilateralFilter(gray_image, 9, 75, 75) |
| processed_images.append(bilateral) |
| |
| |
| thresh1 = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 15, 2) |
| processed_images.append(thresh1) |
| |
| thresh2 = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) |
| processed_images.append(thresh2) |
| |
| |
| height, width = gray_image.shape |
| scale_factors = [2.0, 3.0, 4.0] |
| |
| for scale in scale_factors: |
| new_height, new_width = int(height * scale), int(width * scale) |
| scaled = cv2.resize(gray_image, (new_width, new_height), interpolation=cv2.INTER_CUBIC) |
| processed_images.append(scaled) |
| |
| |
| kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]]) |
| sharpened = cv2.filter2D(gray_image, -1, kernel) |
| processed_images.append(sharpened) |
| |
| |
| kernel = np.ones((2, 2), np.uint8) |
| morphed = cv2.morphologyEx(gray_image, cv2.MORPH_CLOSE, kernel) |
| processed_images.append(morphed) |
| |
| except Exception as e: |
| print(f"Error in DataBar preprocessing: {str(e)}") |
| |
| return processed_images |
| |
    def detect_with_transformations(self, image):
        """Run pyzbar over 0/90/180/270-degree rotations of *image*.

        NOTE(review): rect/polygon of hits at non-zero angles stay in the
        ROTATED frame — they are not mapped back to original coordinates.
        Also, warpAffine keeps the original canvas size, so corners of
        non-square content can be clipped at 90/270 degrees.
        """
        results = []

        try:
            angles = [0, 90, 180, 270]

            for angle in angles:
                if angle == 0:
                    rotated_image = image
                else:
                    # Rotate about the image centre, same output size.
                    height, width = image.shape[:2]
                    center = (width // 2, height // 2)
                    rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                    rotated_image = cv2.warpAffine(image, rotation_matrix, (width, height))

                try:
                    decoded_objects = decode(rotated_image)

                    for obj in decoded_objects:
                        barcode_info = {
                            'type': obj.type,
                            'data': obj.data.decode('utf-8', errors='ignore'),
                            'rect': obj.rect,
                            'polygon': obj.polygon,
                            'quality': getattr(obj, 'quality', 0),
                            'orientation': f"{angle}°",
                            'method': f'transform_{angle}deg'
                        }

                        # DataBar/Expanded symbols get their GS1 payload parsed.
                        if 'databar' in obj.type.lower() or 'expanded' in obj.type.lower():
                            barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))

                        # Flag stacked symbologies.
                        if self.is_multi_stack_barcode(obj, rotated_image):
                            barcode_info['stack_type'] = self.detect_stack_type(obj, rotated_image)

                        results.append(barcode_info)

                except Exception as e:
                    print(f"Error in transformation detection at {angle}°: {str(e)}")
                    continue

        except Exception as e:
            print(f"Error in transformation detection: {str(e)}")

        return results
| |
    def detect_small_barcodes(self, image):
        """Decode physically small barcodes: pyzbar over many preprocessed
        variants, keeping only hits whose box is under 50px in either
        dimension.

        NOTE(review): variants produced by upscaling report rects in the
        VARIANT's coordinate frame; the 50px size filter and the returned
        rect are therefore not in original-page coordinates for those
        variants.
        """
        results = []

        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

            processed_images = self.preprocess_for_small_barcodes(gray)

            for processed_image in processed_images:
                try:
                    decoded_objects = decode(processed_image)

                    for obj in decoded_objects:
                        # Keep only genuinely small detections; larger ones are
                        # covered by the other strategies.
                        if obj.rect.width < 50 or obj.rect.height < 50:
                            barcode_info = {
                                'type': obj.type,
                                'data': obj.data.decode('utf-8', errors='ignore'),
                                'rect': obj.rect,
                                'polygon': obj.polygon,
                                'quality': getattr(obj, 'quality', 0),
                                'orientation': self.detect_barcode_orientation(obj),
                                'method': 'small_barcode_detection',
                                'size_category': 'small'
                            }

                            # DataBar family gets its GS1 payload parsed out.
                            if 'databar' in obj.type.lower() or 'expanded' in obj.type.lower():
                                barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))

                            # Flag stacked symbologies.
                            if self.is_multi_stack_barcode(obj, image):
                                barcode_info['stack_type'] = self.detect_stack_type(obj, image)

                            results.append(barcode_info)

                except Exception as e:
                    print(f"Error in small barcode detection: {str(e)}")
                    continue

        except Exception as e:
            print(f"Error in small barcode preprocessing: {str(e)}")

        return results
| |
| def preprocess_for_small_barcodes(self, gray_image): |
| """Specialized preprocessing for small barcodes and QR codes""" |
| processed_images = [] |
| |
| try: |
| |
| processed_images.append(gray_image) |
| |
| |
| height, width = gray_image.shape |
| scale_factors = [4.0, 5.0, 6.0, 8.0] |
| |
| for scale in scale_factors: |
| new_height, new_width = int(height * scale), int(width * scale) |
| scaled = cv2.resize(gray_image, (new_width, new_height), interpolation=cv2.INTER_CUBIC) |
| processed_images.append(scaled) |
| |
| |
| clahe = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8, 8)) |
| enhanced = clahe.apply(gray_image) |
| processed_images.append(enhanced) |
| |
| |
| gaussian = cv2.GaussianBlur(gray_image, (0, 0), 2.0) |
| unsharp = cv2.addWeighted(gray_image, 1.5, gaussian, -0.5, 0) |
| processed_images.append(unsharp) |
| |
| |
| |
| _, otsu = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) |
| processed_images.append(otsu) |
| |
| |
| adaptive1 = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 9, 2) |
| processed_images.append(adaptive1) |
| |
| adaptive2 = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 7, 2) |
| processed_images.append(adaptive2) |
| |
| |
| |
| bilateral = cv2.bilateralFilter(gray_image, 9, 75, 75) |
| processed_images.append(bilateral) |
| |
| |
| median = cv2.medianBlur(gray_image, 3) |
| processed_images.append(median) |
| |
| |
| |
| sobel_x = cv2.Sobel(gray_image, cv2.CV_64F, 1, 0, ksize=3) |
| sobel_y = cv2.Sobel(gray_image, cv2.CV_64F, 0, 1, ksize=3) |
| sobel = np.sqrt(sobel_x**2 + sobel_y**2) |
| sobel = np.uint8(sobel * 255 / sobel.max()) |
| processed_images.append(sobel) |
| |
| |
| kernel = np.ones((2, 2), np.uint8) |
| morphed_close = cv2.morphologyEx(gray_image, cv2.MORPH_CLOSE, kernel) |
| processed_images.append(morphed_close) |
| |
| kernel_open = np.ones((1, 1), np.uint8) |
| morphed_open = cv2.morphologyEx(gray_image, cv2.MORPH_OPEN, kernel_open) |
| processed_images.append(morphed_open) |
| |
| except Exception as e: |
| print(f"Error in small barcode preprocessing: {str(e)}") |
| |
| return processed_images |
| |
    def detect_with_high_resolution(self, image):
        """Decode barcodes on 3x/4x/6x cubic upscales of the page.

        Each upscale is preprocessed (preprocess_high_resolution) before
        decoding. Rects are mapped back to original page coordinates;
        NOTE(review): polygons are left in the upscaled frame.
        """
        results = []

        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

            height, width = gray.shape
            resolutions = [
                (int(width * 3), int(height * 3)),
                (int(width * 4), int(height * 4)),
                (int(width * 6), int(height * 6))
            ]

            for new_width, new_height in resolutions:
                try:
                    resized = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC)

                    processed = self.preprocess_high_resolution(resized)

                    decoded_objects = decode(processed)

                    for obj in decoded_objects:
                        # Scale the detection box back down to page coordinates.
                        scale_factor = width / new_width
                        scaled_rect = type('Rect', (), {
                            'left': int(obj.rect.left * scale_factor),
                            'top': int(obj.rect.top * scale_factor),
                            'width': int(obj.rect.width * scale_factor),
                            'height': int(obj.rect.height * scale_factor)
                        })()

                        barcode_info = {
                            'type': obj.type,
                            'data': obj.data.decode('utf-8', errors='ignore'),
                            'rect': scaled_rect,
                            'polygon': obj.polygon,
                            'quality': getattr(obj, 'quality', 0),
                            'orientation': self.detect_barcode_orientation(obj),
                            'method': f'high_res_{new_width}x{new_height}',
                            'resolution': f'{new_width}x{new_height}'
                        }

                        # DataBar family gets its GS1 payload parsed out.
                        if 'databar' in obj.type.lower() or 'expanded' in obj.type.lower():
                            barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))

                        # Flag stacked symbologies.
                        if self.is_multi_stack_barcode(obj, image):
                            barcode_info['stack_type'] = self.detect_stack_type(obj, image)

                        results.append(barcode_info)

                except Exception as e:
                    print(f"Error in high-resolution detection at {new_width}x{new_height}: {str(e)}")
                    continue

        except Exception as e:
            print(f"Error in high-resolution detection: {str(e)}")

        return results
| |
| def preprocess_high_resolution(self, image): |
| """Preprocessing optimized for high-resolution images""" |
| try: |
| |
| denoised = cv2.fastNlMeansDenoising(image) |
| |
| |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) |
| enhanced = clahe.apply(denoised) |
| |
| |
| bilateral = cv2.bilateralFilter(enhanced, 9, 75, 75) |
| |
| |
| kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]]) |
| sharpened = cv2.filter2D(bilateral, -1, kernel) |
| |
| |
| thresh = cv2.adaptiveThreshold(sharpened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) |
| |
| return thresh |
| |
| except Exception as e: |
| print(f"Error in high-resolution preprocessing: {str(e)}") |
| return image |
| |
| def detect_barcode_orientation(self, barcode_obj): |
| """Detect the orientation of the barcode""" |
| try: |
| if hasattr(barcode_obj, 'polygon') and len(barcode_obj.polygon) >= 4: |
| |
| points = np.array(barcode_obj.polygon) |
| |
| edges = [] |
| for i in range(4): |
| p1 = points[i] |
| p2 = points[(i + 1) % 4] |
| edge_length = np.linalg.norm(p2 - p1) |
| angle = np.arctan2(p2[1] - p1[1], p2[0] - p1[0]) * 180 / np.pi |
| edges.append((edge_length, angle)) |
| |
| |
| longest_edge = max(edges, key=lambda x: x[0]) |
| return f"{longest_edge[1]:.1f}°" |
| |
| return "Unknown" |
| except: |
| return "Unknown" |
| |
| def parse_databar_expanded(self, data): |
| """Parse DataBar Expanded barcode data""" |
| try: |
| |
| |
| parsed_data = {} |
| |
| |
| ai_pattern = r'\[(\d{2,4})\]([^\[]+)' |
| matches = re.findall(ai_pattern, data) |
| |
| for ai, value in matches: |
| parsed_data[f"AI {ai}"] = value |
| |
| |
| if not parsed_data: |
| parsed_data["Raw Data"] = data |
| |
| return parsed_data |
| |
| except Exception as e: |
| return {"Raw Data": data, "Parse Error": str(e)} |
| |
| def is_multi_stack_barcode(self, barcode_obj, image): |
| """Detect if this is a multi-stack barcode""" |
| try: |
| if hasattr(barcode_obj, 'rect'): |
| x, y, w, h = barcode_obj.rect |
| |
| |
| aspect_ratio = h / w if w > 0 else 0 |
| |
| |
| return aspect_ratio > 0.3 |
| |
| except: |
| pass |
| |
| return False |
| |
| def detect_stack_type(self, barcode_obj, image): |
| """Detect the type of multi-stack barcode""" |
| try: |
| if hasattr(barcode_obj, 'rect'): |
| x, y, w, h = barcode_obj.rect |
| aspect_ratio = h / w if w > 0 else 0 |
| |
| |
| if 'databar' in barcode_obj.type.lower(): |
| if aspect_ratio > 0.5: |
| return "Quad Stack" |
| elif aspect_ratio > 0.35: |
| return "Triple Stack" |
| elif aspect_ratio > 0.25: |
| return "Double Stack" |
| else: |
| return "Single Stack" |
| else: |
| |
| if aspect_ratio > 0.4: |
| return "Multi-Stack" |
| else: |
| return "Single Stack" |
| |
| except: |
| pass |
| |
| return "Unknown" |
| |
| def remove_duplicate_barcodes(self, barcodes): |
| """Remove duplicate barcodes based on position and data""" |
| unique_barcodes = [] |
| seen_positions = set() |
| seen_data = set() |
| |
| for barcode in barcodes: |
| |
| pos_signature = f"{barcode['rect'].left},{barcode['rect'].top},{barcode['rect'].width},{barcode['rect'].height}" |
| data_signature = barcode['data'] |
| |
| |
| if pos_signature not in seen_positions and data_signature not in seen_data: |
| unique_barcodes.append(barcode) |
| seen_positions.add(pos_signature) |
| seen_data.add(data_signature) |
| |
| return unique_barcodes |
| |
| def enhance_barcode_data(self, barcodes): |
| """Enhance barcode data with additional analysis""" |
| enhanced_barcodes = [] |
| |
| for barcode in barcodes: |
| |
| confidence = self.calculate_confidence(barcode) |
| barcode['confidence'] = confidence |
| |
| |
| if 'databar' in barcode['type'].lower(): |
| barcode['gs1_validated'] = self.validate_gs1_format(barcode['data']) |
| |
| enhanced_barcodes.append(barcode) |
| |
| return enhanced_barcodes |
| |
| def calculate_confidence(self, barcode): |
| """Calculate confidence score for barcode detection""" |
| confidence = 50 |
| |
| |
| method_scores = { |
| 'pyzbar_basic': 70, |
| 'pyzbar_enhanced': 70, |
| 'dynamsoft': 85, |
| 'enhanced_preprocessing_0': 65, |
| 'enhanced_preprocessing_1': 60, |
| 'enhanced_preprocessing_2': 55, |
| 'transform_0deg': 60, |
| 'transform_90deg': 50, |
| 'transform_180deg': 50, |
| 'transform_270deg': 50, |
| 'small_barcode_detection': 75, |
| 'high_res_2x': 70, |
| 'high_res_3x': 65, |
| 'high_res_4x': 60 |
| } |
| |
| if barcode.get('method') in method_scores: |
| confidence += method_scores[barcode['method']] |
| |
| |
| if barcode.get('quality', 0) > 0: |
| confidence += min(barcode['quality'], 20) |
| |
| |
| if 'databar' in barcode['type'].lower(): |
| confidence += 10 |
| |
| return min(confidence, 100) |
| |
| def validate_gs1_format(self, data): |
| """Validate GS1 format for DataBar data""" |
| try: |
| |
| ai_pattern = r'\[(\d{2,4})\]' |
| matches = re.findall(ai_pattern, data) |
| |
| if matches: |
| return True |
| |
| |
| ai_pattern_parens = r'\((\d{2,4})\)' |
| matches_parens = re.findall(ai_pattern_parens, data) |
| |
| return len(matches_parens) > 0 |
| |
| except: |
| return False |
| |
| def check_spelling(self, text): |
| """ |
| Robust EN/FR spell check: |
| - Unicode-aware tokens (keeps accents) |
| - Normalizes curly quotes/ligatures |
| - Heuristic per-token language (accented => FR; else EN) |
| - Flags if unknown in its likely language (not both) |
| """ |
| try: |
| |
| text = unicodedata.normalize("NFKC", text) |
| text = text.replace("'", "'").replace(""", '"').replace(""", '"') |
|
|
| |
| tokens = _re.findall(TOKEN_PATTERN, text, flags=_re.UNICODE if _USE_REGEX else 0) |
|
|
| issues = [] |
| for raw in tokens: |
| t = raw.lower() |
|
|
| |
| if len(t) < 3: |
| continue |
| if raw.isupper() and len(raw) <= 3: |
| continue |
| if t in DOMAIN_WHITELIST: |
| continue |
|
|
| miss_en = t in self.english_spellchecker.unknown([t]) |
| miss_fr = t in self.french_spellchecker.unknown([t]) |
|
|
| use_fr = _likely_french(raw) |
|
|
| |
| if (use_fr and miss_fr) or ((not use_fr) and miss_en) or (miss_en and miss_fr): |
| issues.append({ |
| "word": raw, |
| "lang": "fr" if use_fr else "en", |
| "suggestions_en": list(self.english_spellchecker.candidates(t))[:3], |
| "suggestions_fr": list(self.french_spellchecker.candidates(t))[:3], |
| }) |
|
|
| return issues |
| except Exception as e: |
| print(f"Error checking spelling: {e}") |
| return [] |
| |
| def compare_colors(self, image1, image2): |
| """Compare colors between two images and return differences using RGB color space""" |
| try: |
| print("Starting RGB color comparison...") |
| |
| |
| img1 = np.array(image1) |
| img2 = np.array(image2) |
| |
| print(f"Image 1 shape: {img1.shape}") |
| print(f"Image 2 shape: {img2.shape}") |
| |
| |
| height = min(img1.shape[0], img2.shape[0]) |
| width = min(img1.shape[1], img2.shape[1]) |
| |
| img1_resized = cv2.resize(img1, (width, height)) |
| img2_resized = cv2.resize(img2, (width, height)) |
| |
| print(f"Resized to: {width}x{height}") |
| |
| |
| img1_rgb = img1_resized |
| img2_rgb = img2_resized |
| |
| color_differences = [] |
| |
| |
| print("Method 1: Enhanced RGB channel comparison") |
| |
| |
| diff_r = cv2.absdiff(img1_rgb[:,:,0], img2_rgb[:,:,0]) |
| diff_g = cv2.absdiff(img1_rgb[:,:,1], img2_rgb[:,:,1]) |
| diff_b = cv2.absdiff(img1_rgb[:,:,2], img2_rgb[:,:,2]) |
| |
| |
| diff_combined = cv2.addWeighted(diff_r, 0.4, diff_g, 0.4, 0) |
| diff_combined = cv2.addWeighted(diff_combined, 1.0, diff_b, 0.2, 0) |
| |
| |
| diff_combined = cv2.GaussianBlur(diff_combined, (3, 3), 0) |
| |
| |
| rgb_thresholds = [15, 22, 30, 40] |
| |
| for threshold in rgb_thresholds: |
| _, thresh = cv2.threshold(diff_combined, threshold, 255, cv2.THRESH_BINARY) |
| |
| |
| kernel = np.ones((1, 1), np.uint8) |
| thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) |
| thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) |
| |
| |
| contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) |
| |
| print(f"RGB Threshold {threshold}: Found {len(contours)} contours") |
| |
| for contour in contours: |
| area = cv2.contourArea(contour) |
| if area > 15: |
| x, y, w, h = cv2.boundingRect(contour) |
| |
| |
| color1 = img1_rgb[y:y+h, x:x+w].mean(axis=(0, 1)) |
| color2 = img2_rgb[y:y+h, x:x+w].mean(axis=(0, 1)) |
| |
| |
| color_diff = np.linalg.norm(color1 - color2) |
| |
| |
| if color_diff > 18: |
| |
| already_covered = False |
| for existing_diff in color_differences: |
| if (abs(existing_diff['x'] - x) < 21 and |
| abs(existing_diff['y'] - y) < 21 and |
| abs(existing_diff['width'] - w) < 21 and |
| abs(existing_diff['height'] - h) < 21): |
| already_covered = True |
| break |
| |
| if not already_covered: |
| color_differences.append({ |
| 'x': x, |
| 'y': y, |
| 'width': w, |
| 'height': h, |
| 'area': area, |
| 'color1': color1.tolist(), |
| 'color2': color2.tolist(), |
| 'threshold': f"RGB_{threshold}", |
| 'color_diff': color_diff, |
| 'diff_r': float(abs(color1[0] - color2[0])), |
| 'diff_g': float(abs(color1[1] - color2[1])), |
| 'diff_b': float(abs(color1[2] - color2[2])) |
| }) |
| |
| |
| print("Method 2: Enhanced HSV color space comparison") |
| |
| |
| img1_hsv = cv2.cvtColor(img1_rgb, cv2.COLOR_RGB2HSV) |
| img2_hsv = cv2.cvtColor(img2_rgb, cv2.COLOR_RGB2HSV) |
| |
| |
| hue_diff = cv2.absdiff(img1_hsv[:,:,0], img2_hsv[:,:,0]) |
| sat_diff = cv2.absdiff(img1_hsv[:,:,1], img2_hsv[:,:,1]) |
| val_diff = cv2.absdiff(img1_hsv[:,:,2], img2_hsv[:,:,2]) |
| |
| |
| hsv_combined = cv2.addWeighted(hue_diff, 0.5, sat_diff, 0.3, 0) |
| hsv_combined = cv2.addWeighted(hsv_combined, 1.0, val_diff, 0.2, 0) |
| |
| |
| hsv_combined = cv2.GaussianBlur(hsv_combined, (3, 3), 0) |
| |
| |
| hsv_thresholds = [18, 25, 35, 45] |
| |
| for threshold in hsv_thresholds: |
| _, hsv_thresh = cv2.threshold(hsv_combined, threshold, 255, cv2.THRESH_BINARY) |
| |
| |
| kernel = np.ones((1, 1), np.uint8) |
| hsv_thresh = cv2.morphologyEx(hsv_thresh, cv2.MORPH_CLOSE, kernel) |
| hsv_thresh = cv2.morphologyEx(hsv_thresh, cv2.MORPH_OPEN, kernel) |
| |
| |
| hsv_contours, _ = cv2.findContours(hsv_thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) |
| |
| print(f"HSV Threshold {threshold}: Found {len(hsv_contours)} contours") |
| |
| for contour in hsv_contours: |
| area = cv2.contourArea(contour) |
| if area > 15: |
| x, y, w, h = cv2.boundingRect(contour) |
| |
| |
| color1 = img1_rgb[y:y+h, x:x+w].mean(axis=(0, 1)) |
| color2 = img2_rgb[y:y+h, x:x+w].mean(axis=(0, 1)) |
| |
| |
| color_diff = np.linalg.norm(color1 - color2) |
| |
| |
| if color_diff > 22: |
| |
| already_covered = False |
| for existing_diff in color_differences: |
| if (abs(existing_diff['x'] - x) < 21 and |
| abs(existing_diff['y'] - y) < 21 and |
| abs(existing_diff['width'] - w) < 21 and |
| abs(existing_diff['height'] - h) < 21): |
| already_covered = True |
| break |
| |
| if not already_covered: |
| color_differences.append({ |
| 'x': x, |
| 'y': y, |
| 'width': w, |
| 'height': h, |
| 'area': area, |
| 'color1': color1.tolist(), |
| 'color2': color2.tolist(), |
| 'threshold': f"HSV_{threshold}", |
| 'color_diff': color_diff, |
| 'diff_r': float(abs(color1[0] - color2[0])), |
| 'diff_g': float(abs(color1[1] - color2[1])), |
| 'diff_b': float(abs(color1[2] - color2[2])) |
| }) |
| |
| |
| print("Method 3: Enhanced pixel-by-pixel RGB comparison") |
| |
| |
| for y in range(0, height, 12): |
| for x in range(0, width, 12): |
| color1 = img1_rgb[y, x] |
| color2 = img2_rgb[y, x] |
| |
| |
| diff_r = abs(int(color1[0]) - int(color2[0])) |
| diff_g = abs(int(color1[1]) - int(color2[1])) |
| diff_b = abs(int(color1[2]) - int(color2[2])) |
| |
| |
| if diff_r > 10 or diff_g > 10 or diff_b > 10: |
| |
| already_covered = False |
| for existing_diff in color_differences: |
| if (abs(existing_diff['x'] - x) < 21 and |
| abs(existing_diff['y'] - y) < 21): |
| already_covered = True |
| break |
| |
| if not already_covered: |
| color_differences.append({ |
| 'x': x, |
| 'y': y, |
| 'width': 5, |
| 'height': 5, |
| 'area': 25, |
| 'color1': color1.tolist(), |
| 'color2': color2.tolist(), |
| 'threshold': 'pixel_RGB', |
| 'color_diff': diff_r + diff_g + diff_b, |
| 'diff_r': diff_r, |
| 'diff_g': diff_g, |
| 'diff_b': diff_b |
| }) |
| |
| print(f"RGB color comparison completed. Found {len(color_differences)} total differences.") |
| |
| |
| print("Method 4: LAB color space comparison") |
| |
| |
| img1_lab = cv2.cvtColor(img1_rgb, cv2.COLOR_RGB2LAB) |
| img2_lab = cv2.cvtColor(img2_rgb, cv2.COLOR_RGB2LAB) |
| |
| |
| lab_diff_l = cv2.absdiff(img1_lab[:,:,0], img2_lab[:,:,0]) |
| lab_diff_a = cv2.absdiff(img1_lab[:,:,1], img2_lab[:,:,1]) |
| lab_diff_b = cv2.absdiff(img1_lab[:,:,2], img2_lab[:,:,2]) |
| |
| |
| lab_combined = cv2.addWeighted(lab_diff_l, 0.3, lab_diff_a, 0.35, 0) |
| lab_combined = cv2.addWeighted(lab_combined, 1.0, lab_diff_b, 0.35, 0) |
| |
| |
| lab_combined = cv2.GaussianBlur(lab_combined, (3, 3), 0) |
| |
| |
| lab_thresholds = [20, 28, 38, 50] |
| |
| for threshold in lab_thresholds: |
| _, lab_thresh = cv2.threshold(lab_combined, threshold, 255, cv2.THRESH_BINARY) |
| |
| |
| kernel = np.ones((1, 1), np.uint8) |
| lab_thresh = cv2.morphologyEx(lab_thresh, cv2.MORPH_CLOSE, kernel) |
| lab_thresh = cv2.morphologyEx(lab_thresh, cv2.MORPH_OPEN, kernel) |
| |
| |
| lab_contours, _ = cv2.findContours(lab_thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) |
| |
| print(f"LAB Threshold {threshold}: Found {len(lab_contours)} contours") |
| |
| for contour in lab_contours: |
| area = cv2.contourArea(contour) |
| if area > 15: |
| x, y, w, h = cv2.boundingRect(contour) |
| |
| |
| color1 = img1_rgb[y:y+h, x:x+w].mean(axis=(0, 1)) |
| color2 = img2_rgb[y:y+h, x:x+w].mean(axis=(0, 1)) |
| |
| |
| color_diff = np.linalg.norm(color1 - color2) |
| |
| |
| if color_diff > 22: |
| |
| already_covered = False |
| for existing_diff in color_differences: |
| if (abs(existing_diff['x'] - x) < 21 and |
| abs(existing_diff['y'] - y) < 21 and |
| abs(existing_diff['width'] - w) < 21 and |
| abs(existing_diff['height'] - h) < 21): |
| already_covered = True |
| break |
| |
| if not already_covered: |
| color_differences.append({ |
| 'x': x, |
| 'y': y, |
| 'width': w, |
| 'height': h, |
| 'area': area, |
| 'color1': color1.tolist(), |
| 'color2': color2.tolist(), |
| 'threshold': f"LAB_{threshold}", |
| 'color_diff': color_diff, |
| 'diff_r': float(abs(color1[0] - color2[0])), |
| 'diff_g': float(abs(color1[1] - color2[1])), |
| 'diff_b': float(abs(color1[2] - color2[2])) |
| }) |
| |
| print(f"Enhanced color comparison completed. Found {len(color_differences)} total differences.") |
| |
| |
| if color_differences: |
| grouped_differences = self.group_nearby_differences(color_differences) |
| print(f"Grouped into {len(grouped_differences)} perimeter boxes") |
| return grouped_differences |
| |
| return color_differences |
| |
| except Exception as e: |
| print(f"Error comparing colors: {str(e)}") |
| return [] |
| |
    # NOTE(review): dead code — this method is shadowed by the later
    # `def group_nearby_differences` defined further down in this class, so
    # this version can never be called.  It also invokes
    # `self.create_group_bounding_box`, which is not defined anywhere in this
    # file, so it would raise AttributeError if it were reachable.  Kept
    # byte-identical here; consider deleting it.
    def group_nearby_differences(self, differences):
        """Group nearby differences into larger bounding boxes around affected areas"""
        if not differences:
            return []

        # Process differences top-to-bottom, left-to-right so nearby hits
        # are adjacent in the scan.
        sorted_diffs = sorted(differences, key=lambda x: (x['y'], x['x']))

        grouped_areas = []
        current_group = []

        for diff in sorted_diffs:
            if not current_group:
                current_group = [diff]
            else:
                # Join the current cluster when this diff's center lies
                # within 200px of any member's center.
                should_group = False
                for group_diff in current_group:
                    center1_x = group_diff['x'] + group_diff['width'] // 2
                    center1_y = group_diff['y'] + group_diff['height'] // 2
                    center2_x = diff['x'] + diff['width'] // 2
                    center2_y = diff['y'] + diff['height'] // 2

                    distance = ((center1_x - center2_x) ** 2 + (center1_y - center2_y) ** 2) ** 0.5

                    if distance < 200:
                        should_group = True
                        break

                if should_group:
                    current_group.append(diff)
                else:
                    # Flush the finished cluster into one bounding box.
                    if current_group:
                        bounding_box = self.create_group_bounding_box(current_group)
                        if bounding_box:
                            grouped_areas.append(bounding_box)
                    current_group = [diff]

        # Flush the trailing cluster.
        if current_group:
            bounding_box = self.create_group_bounding_box(current_group)
            if bounding_box:
                grouped_areas.append(bounding_box)

        return grouped_areas
| |
| def group_nearby_differences(self, differences): |
| """Group nearby differences into one perimeter box per issue area""" |
| if not differences: |
| return [] |
| |
| |
| sorted_diffs = sorted(differences, key=lambda x: (x['y'], x['x'])) |
| |
| grouped_areas = [] |
| current_group = [] |
| |
| for diff in sorted_diffs: |
| if not current_group: |
| current_group = [diff] |
| else: |
| |
| should_group = False |
| for group_diff in current_group: |
| |
| center1_x = group_diff['x'] + group_diff['width'] // 2 |
| center1_y = group_diff['y'] + group_diff['height'] // 2 |
| center2_x = diff['x'] + diff['width'] // 2 |
| center2_y = diff['y'] + diff['height'] // 2 |
| |
| distance = ((center1_x - center2_x) ** 2 + (center1_y - center2_y) ** 2) ** 0.5 |
| |
| |
| if distance < 234: |
| should_group = True |
| break |
| |
| if should_group: |
| current_group.append(diff) |
| else: |
| |
| if current_group: |
| perimeter_box = self.create_perimeter_box(current_group) |
| if perimeter_box: |
| grouped_areas.append(perimeter_box) |
| current_group = [diff] |
| |
| |
| if current_group: |
| perimeter_box = self.create_perimeter_box(current_group) |
| if perimeter_box: |
| grouped_areas.append(perimeter_box) |
| |
| return grouped_areas |
| |
| def create_perimeter_box(self, group): |
| """Create a perimeter box that encompasses all differences in a group""" |
| if not group: |
| return None |
| |
| |
| min_x = min(diff['x'] - 5 for diff in group) |
| min_y = min(diff['y'] - 5 for diff in group) |
| max_x = max(diff['x'] + diff['width'] + 5 for diff in group) |
| max_y = max(diff['y'] + diff['height'] + 5 for diff in group) |
| |
| |
| padding = 7 |
| min_x = max(0, min_x - padding) |
| min_y = max(0, min_y - padding) |
| max_x = max_x + padding |
| max_y = max_y + padding |
| |
| |
| width = max_x - min_x |
| height = max_y - min_y |
| |
| |
| if width < 26 or height < 26: |
| return None |
| |
| return { |
| 'x': min_x, |
| 'y': min_y, |
| 'width': width, |
| 'height': height, |
| 'area': width * height, |
| 'color1': [0, 0, 0], |
| 'color2': [0, 0, 0], |
| 'threshold': 'perimeter', |
| 'color_diff': 1.0, |
| 'num_original_differences': len(group) |
| } |
| |
| def create_annotated_image(self, image, differences, output_path): |
| """Create annotated image with red boxes around differences""" |
| try: |
| print(f"Creating annotated image: {output_path}") |
| print(f"Number of differences to annotate: {len(differences)}") |
| |
| |
| annotated_image = image.copy() |
| draw = ImageDraw.Draw(annotated_image) |
| |
| |
| for i, diff in enumerate(differences): |
| x, y, w, h = diff['x'], diff['y'], diff['width'], diff['height'] |
| |
| |
| draw.rectangle([x, y, x + w, y + h], outline='red', width=5) |
| |
| print(f"Drawing rectangle {i+1}: ({x}, {y}) to ({x+w}, {y+h})") |
| |
| |
| annotated_image.save(output_path) |
| print(f"Annotated image saved successfully: {output_path}") |
| |
| except Exception as e: |
| print(f"Error creating annotated image: {str(e)}") |
| |
| try: |
| image.save(output_path) |
| print(f"Saved original image as fallback: {output_path}") |
| except Exception as e2: |
| print(f"Failed to save fallback image: {str(e2)}") |
| |
| def compare_pdfs(self, pdf1_path, pdf2_path, session_id): |
| """Main comparison function with improved error handling""" |
| try: |
| print("Starting PDF comparison...") |
| start_time = time.time() |
| |
| |
| print("Validating PDF 1...") |
| if not self.validate_pdf(pdf1_path): |
| raise Exception("INVALID DOCUMENT") |
| |
| print("Validating PDF 2...") |
| if not self.validate_pdf(pdf2_path): |
| raise Exception("INVALID DOCUMENT") |
| |
| |
| print("Extracting text from PDF 1...") |
| pdf1_data = self.extract_text_from_pdf(pdf1_path) |
| if not pdf1_data: |
| raise Exception("INVALID DOCUMENT") |
| |
| print("Extracting text from PDF 2...") |
| pdf2_data = self.extract_text_from_pdf(pdf2_path) |
| if not pdf2_data: |
| raise Exception("INVALID DOCUMENT") |
| |
| |
| results = { |
| 'session_id': session_id, |
| 'validation': { |
| 'pdf1_valid': True, |
| 'pdf2_valid': True, |
| 'validation_text': '50 Carroll' |
| }, |
| 'text_comparison': [], |
| 'spelling_issues': [], |
| 'barcodes_qr_codes': [], |
| 'color_differences': [], |
| 'annotated_images': [] |
| } |
| |
| |
| print("Processing pages...") |
| for i, (page1, page2) in enumerate(zip(pdf1_data, pdf2_data)): |
| print(f"Processing page {i + 1}...") |
| page_results = { |
| 'page': i + 1, |
| 'text_differences': [], |
| 'spelling_issues_pdf1': [], |
| 'spelling_issues_pdf2': [], |
| 'barcodes_pdf1': [], |
| 'barcodes_pdf2': [], |
| 'color_differences': [] |
| } |
| |
| |
| print(f"Checking spelling for page {i + 1}...") |
| page_results['spelling_issues_pdf1'] = self.check_spelling(page1['text']) |
| page_results['spelling_issues_pdf2'] = self.check_spelling(page2['text']) |
| |
| |
| if page_results['spelling_issues_pdf1'] or page_results['spelling_issues_pdf2']: |
| page_results['text_differences'].append({ |
| "type": "spelling", |
| "pdf1": [i["word"] for i in page_results['spelling_issues_pdf1']], |
| "pdf2": [i["word"] for i in page_results['spelling_issues_pdf2']], |
| }) |
| |
| |
| spell_dir = f'static/results/{session_id}' |
| os.makedirs(spell_dir, exist_ok=True) |
|
|
| spell_img1 = page1['image'].copy() |
| spell_img2 = page2['image'].copy() |
| spell_img1 = self.annotate_spelling_errors_on_image(spell_img1, page_results['spelling_issues_pdf1']) |
| spell_img2 = self.annotate_spelling_errors_on_image(spell_img2, page_results['spelling_issues_pdf2']) |
|
|
| spell_path1 = f'{spell_dir}/page_{i+1}_pdf1_spelling.png' |
| spell_path2 = f'{spell_dir}/page_{i+1}_pdf2_spelling.png' |
| spell_img1.save(spell_path1) |
| spell_img2.save(spell_path2) |
|
|
| |
| page_results.setdefault('annotated_images', {}) |
| page_results['annotated_images'].update({ |
| 'pdf1_spelling': f'results/{session_id}/page_{i+1}_pdf1_spelling.png', |
| 'pdf2_spelling': f'results/{session_id}/page_{i+1}_pdf2_spelling.png', |
| }) |
| |
| |
| print(f"Detecting barcodes for page {i + 1} PDF 1...") |
| page_results['barcodes_pdf1'] = self.detect_barcodes_qr_codes(page1['image']) or [] |
| |
| print(f"Detecting barcodes for page {i + 1} PDF 2...") |
| page_results['barcodes_pdf2'] = self.detect_barcodes_qr_codes(page2['image']) or [] |
| |
| |
| print(f"Comparing colors for page {i + 1}...") |
| color_diffs = self.compare_colors(page1['image'], page2['image']) |
| page_results['color_differences'] = color_diffs |
| |
| |
| print(f"Creating images for page {i + 1}...") |
| output_dir = f'static/results/{session_id}' |
| os.makedirs(output_dir, exist_ok=True) |
| |
| |
| original_path1 = f'{output_dir}/page_{i+1}_pdf1_original.png' |
| original_path2 = f'{output_dir}/page_{i+1}_pdf2_original.png' |
| |
| page1['image'].save(original_path1) |
| page2['image'].save(original_path2) |
| |
| |
| if color_diffs: |
| print(f"Creating annotated images for page {i + 1}...") |
| annotated_path1 = f'{output_dir}/page_{i+1}_pdf1_annotated.png' |
| annotated_path2 = f'{output_dir}/page_{i+1}_pdf2_annotated.png' |
| |
| self.create_annotated_image(page1['image'], color_diffs, annotated_path1) |
| self.create_annotated_image(page2['image'], color_diffs, annotated_path2) |
| |
| page_results['annotated_images'] = { |
| 'pdf1': f'results/{session_id}/page_{i+1}_pdf1_annotated.png', |
| 'pdf2': f'results/{session_id}/page_{i+1}_pdf2_annotated.png' |
| } |
| else: |
| |
| page_results['annotated_images'] = { |
| 'pdf1': f'results/{session_id}/page_{i+1}_pdf1_original.png', |
| 'pdf2': f'results/{session_id}/page_{i+1}_pdf2_original.png' |
| } |
| |
| results['text_comparison'].append(page_results) |
| |
| |
| print("Aggregating results...") |
| all_spelling_issues = [] |
| for page in results['text_comparison']: |
| all_spelling_issues.extend(page['spelling_issues_pdf1']) |
| all_spelling_issues.extend(page['spelling_issues_pdf2']) |
| |
| results['spelling_issues'] = all_spelling_issues |
| |
| |
| all_barcodes = [] |
| for page in results['text_comparison']: |
| all_barcodes.extend(page['barcodes_pdf1']) |
| all_barcodes.extend(page['barcodes_pdf2']) |
| |
| results['barcodes_qr_codes'] = all_barcodes |
| |
| elapsed_time = time.time() - start_time |
| print(f"PDF comparison completed in {elapsed_time:.2f} seconds.") |
| |
| return results |
| |
| except Exception as e: |
| print(f"Error in PDF comparison: {str(e)}") |
| raise Exception(f"INVALID DOCUMENT") |
| |
| |
|
|