# ProofCheck / pdf_comparator.py
# Author: Yaz Hobooti
# Commit fa64916 — "Update pdf_comparator.py: latest changes"
# (Hosting-page chrome from the original paste: raw / history / blame / 86.8 kB)
import os
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import pytesseract
from pdf2image import convert_from_path
from pyzbar.pyzbar import decode
from spellchecker import SpellChecker
import nltk
from skimage.metrics import structural_similarity as ssim
from skimage import color
import json
import tempfile
import shutil
import re
import time
import signal
import unicodedata
# Prefer the third-party `regex` module (full Unicode property support);
# fall back to the stdlib `re` module when it is not installed.
try:
    import regex as _re
    _USE_REGEX = True
except ImportError:
    import re as _re
    _USE_REGEX = False
# Word-token pattern: a letter followed by one or more letters/apostrophes/
# hyphens. The \p{L} form needs the `regex` module; the ASCII class is the
# stdlib fallback.
TOKEN_PATTERN = r"(?:\p{L})(?:[\p{L}'-]{1,})" if _USE_REGEX else r"[A-Za-z][A-Za-z'-]{1,}"
# Domain whitelist for spell checking: tokens that must never be flagged as
# misspellings. Built lowercase in one pass (the previous version listed
# "tytann"/"dome"/"drops" twice and lowercased in a second assignment).
DOMAIN_WHITELIST = {
    w.lower()
    for w in (
        # units / abbreviations
        "mg", "mg/g", "ml", "g", "thc", "cbd", "tcm", "mct",
        # common packaging terms / bilingual words expected on labels
        "gouttes", "tennir", "net", "zoom",
        # brand or proper names to ignore completely
        "purified", "brands", "tytann", "dome", "drops",
    )
}
def _likely_french(token: str) -> bool:
"""Helper: quick language guess per token"""
if _USE_REGEX:
# any Latin letter outside ASCII => probably FR (é, è, ç…)
return bool(_re.search(r"[\p{Letter}&&\p{Latin}&&[^A-Za-z]]", token))
# fallback: any non-ascii letter
return any((not ('a' <= c.lower() <= 'z')) and c.isalpha() for c in token)
# Optional barcode back-ends: probe for them at import time and record
# availability flags so the detection methods can skip missing libraries.
try:
    import zxing
    ZXING_AVAILABLE = True
except ImportError:
    ZXING_AVAILABLE = False
    # NOTE(review): the message says "zxing-cpp" but the module imported is
    # `zxing` — confirm which package is actually intended.
    print("zxing-cpp not available, using pyzbar only")
try:
    from dbr import BarcodeReader
    DBR_AVAILABLE = True
    print("Dynamsoft Barcode Reader available")
except ImportError:
    DBR_AVAILABLE = False
    print("Dynamsoft Barcode Reader not available")
# NOTE(review): this shadows the builtin TimeoutError; kept as-is because
# the `except TimeoutError` handlers in this file rely on this name.
class TimeoutError(Exception):
    """Raised by timeout_handler when a signal-guarded operation exceeds its deadline."""
    pass
def timeout_handler(signum, frame):
    """SIGALRM handler: abort the in-flight operation by raising TimeoutError."""
    message = "Operation timed out"
    raise TimeoutError(message)
class PDFComparator:
    def __init__(self):
        """Set up bilingual (English/French) spell checkers and NLTK data.

        Side effects: may download the NLTK 'punkt' tokenizer on first run.
        """
        # Initialize spell checkers for English and French
        self.english_spellchecker = SpellChecker(language='en')
        self.french_spellchecker = SpellChecker(language='fr')
        # Add domain whitelist words to both spell checkers so label jargon,
        # units and brand names are never reported as misspellings.
        for w in DOMAIN_WHITELIST:
            self.english_spellchecker.word_frequency.add(w)
            self.french_spellchecker.word_frequency.add(w)
        # Download required NLTK data (punkt tokenizer) on demand.
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt')
    def safe_execute(self, func, *args, timeout=30, **kwargs):
        """Execute func(*args, **kwargs) under a SIGALRM-based timeout.

        Returns the function's result, or None on timeout or any exception
        (both are logged to stdout).

        NOTE(review): signal.alarm/SIGALRM only works on Unix and only in the
        main thread — confirm callers never invoke this from a worker thread.
        """
        try:
            # Set timeout signal
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(timeout)
            # Execute function
            result = func(*args, **kwargs)
            # Cancel timeout
            signal.alarm(0)
            return result
        except TimeoutError:
            print(f"Function {func.__name__} timed out after {timeout} seconds")
            return None
        except Exception as e:
            print(f"Error in {func.__name__}: {str(e)}")
            return None
        finally:
            # Always clear any pending alarm, even on the error paths.
            signal.alarm(0)
def validate_pdf(self, pdf_path):
"""Validate that PDF contains '50 Carroll' using enhanced OCR for tiny fonts"""
try:
print(f"Validating PDF: {pdf_path}")
# Try multiple DPI settings for better tiny font detection
dpi_settings = [300, 400, 600, 800]
for dpi in dpi_settings:
print(f"Trying DPI {dpi} for tiny font detection...")
# Convert PDF to images with current DPI
images = convert_from_path(pdf_path, dpi=dpi)
print(f"Converted PDF to {len(images)} images at {dpi} DPI")
for page_num, image in enumerate(images):
print(f"Processing page {page_num + 1} at {dpi} DPI...")
# Convert PIL image to OpenCV format
opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
# Enhanced preprocessing for tiny fonts
processed_image = self.enhance_image_for_tiny_fonts(opencv_image)
# Try multiple OCR configurations
ocr_configs = [
'--oem 3 --psm 6', # Assume uniform block of text
'--oem 3 --psm 8', # Single word
'--oem 3 --psm 13', # Raw line
'--oem 1 --psm 6', # Legacy engine
'--oem 3 --psm 3', # Fully automatic page segmentation
]
for config in ocr_configs:
try:
# Perform OCR with current configuration
text = pytesseract.image_to_string(processed_image, config=config)
# Debug: Show first 300 characters of extracted text
debug_text = text[:300].replace('\n', ' ').replace('\r', ' ')
print(f"Page {page_num + 1} text (DPI {dpi}, config: {config}): '{debug_text}...'")
# Check for "50 Carroll" with various patterns
patterns = ["50 Carroll", "50 carroll", "50Carroll", "50carroll", "50 Carroll", "50 carroll"]
for pattern in patterns:
if pattern in text or pattern.lower() in text.lower():
print(f"Found '{pattern}' in page {page_num + 1} (DPI {dpi}, config: {config})")
return True
except Exception as ocr_error:
print(f"OCR error with config {config}: {str(ocr_error)}")
continue
print("Validation failed: '50 Carroll' not found in any page with any DPI or OCR config")
return False
except Exception as e:
print(f"Error validating PDF: {str(e)}")
raise Exception(f"Error validating PDF: {str(e)}")
    def enhance_image_for_tiny_fonts(self, image):
        """Enhance a BGR image specifically for tiny-font OCR.

        Pipeline: grayscale -> CLAHE contrast -> bilateral denoise ->
        unsharp mask -> adaptive threshold -> morphological close.
        Returns the binarized image, or the input unchanged on failure.
        """
        try:
            # Convert to grayscale
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            # Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
            enhanced = clahe.apply(gray)
            # Apply bilateral filter to reduce noise while preserving edges
            denoised = cv2.bilateralFilter(enhanced, 9, 75, 75)
            # Apply unsharp masking to enhance edges
            gaussian = cv2.GaussianBlur(denoised, (0, 0), 2.0)
            unsharp_mask = cv2.addWeighted(denoised, 1.5, gaussian, -0.5, 0)
            # Apply adaptive thresholding
            thresh = cv2.adaptiveThreshold(unsharp_mask, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
            # NOTE(review): a 1x1 structuring element makes this close a
            # no-op; probably meant to be larger — confirm before changing
            # output, since downstream OCR depends on this exact image.
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 1))
            cleaned = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
            return cleaned
        except Exception as e:
            print(f"Error enhancing image for tiny fonts: {str(e)}")
            return image
    def extract_text_from_pdf(self, pdf_path):
        """Extract text from a PDF with multi-color text detection.

        Returns a list of dicts: {'page': 1-based page number, 'text': str,
        'image': PIL image or None}. Prefers the PDF's embedded text layer
        (via PyMuPDF) and falls back to multi-pass OCR when none exists.

        Raises:
            Exception: wraps any failure in the OCR fallback path.
        """
        try:
            # Try to extract embedded text first
            embedded_text = ""  # NOTE(review): unused — candidate for removal
            try:
                import fitz  # PyMuPDF
                doc = fitz.open(pdf_path)
                all_text = []
                any_text = False
                for i, page in enumerate(doc):
                    t = page.get_text()
                    # Track whether ANY page has a real text layer.
                    any_text |= bool(t.strip())
                    all_text.append({"page": i+1, "text": t, "image": None})
                doc.close()
                if any_text:
                    # render images for color diff/barcode only when needed
                    images = convert_from_path(pdf_path, dpi=600)
                    for d, im in zip(all_text, images):
                        d["image"] = im
                    return all_text
            except Exception:
                # PyMuPDF missing or unreadable: fall through to the OCR path.
                pass
            # Enhanced OCR path with multi-color text detection
            print("Extracting text with multi-color detection...")
            images = convert_from_path(pdf_path, dpi=600)
            all_text = []
            for page_num, image in enumerate(images):
                opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
                # Multi-color text extraction
                combined_text = self.extract_multi_color_text(opencv_image)
                all_text.append({
                    'page': page_num + 1,
                    'text': combined_text,
                    'image': image
                })
            return all_text
        except Exception as e:
            raise Exception(f"Error extracting text from PDF: {str(e)}")
def extract_multi_color_text(self, image):
"""Extract text from image in various colors using multiple preprocessing methods."""
try:
combined_text = ""
# Method 1: Standard black text detection
print("Method 1: Standard black text detection")
processed_image = self.enhance_image_for_tiny_fonts(image)
text1 = self.ocr_with_multiple_configs(processed_image)
combined_text += text1 + " "
# Method 2: Inverted text detection (for white text on dark background)
print("Method 2: Inverted text detection")
inverted_image = self.create_inverted_image(image)
text2 = self.ocr_with_multiple_configs(inverted_image)
combined_text += text2 + " "
# Method 3: Color channel separation for colored text
print("Method 3: Color channel separation")
for channel_name, channel_image in self.extract_color_channels(image):
text3 = self.ocr_with_multiple_configs(channel_image)
combined_text += text3 + " "
# Method 4: Edge-based text detection
print("Method 4: Edge-based text detection")
edge_image = self.create_edge_enhanced_image(image)
text4 = self.ocr_with_multiple_configs(edge_image)
combined_text += text4 + " "
return combined_text.strip()
except Exception as e:
print(f"Error in multi-color text extraction: {str(e)}")
return ""
def create_inverted_image(self, image):
"""Create inverted image for white text detection."""
try:
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Invert the image
inverted = cv2.bitwise_not(gray)
# Apply CLAHE for better contrast
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
enhanced = clahe.apply(inverted)
# Apply thresholding
_, thresh = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
return thresh
except Exception as e:
print(f"Error creating inverted image: {str(e)}")
return image
def extract_color_channels(self, image):
"""Extract individual color channels for colored text detection."""
try:
channels = []
# Convert to different color spaces
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
# Extract individual channels
b, g, r = cv2.split(image)
h, s, v = cv2.split(hsv)
l, a, b_lab = cv2.split(lab)
# Create channel images for OCR
channel_images = [
("blue", b),
("green", g),
("red", r),
("hue", h),
("saturation", s),
("value", v),
("lightness", l)
]
for name, channel in channel_images:
# Apply thresholding to each channel
_, thresh = cv2.threshold(channel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
channels.append((name, thresh))
return channels
except Exception as e:
print(f"Error extracting color channels: {str(e)}")
return []
def create_edge_enhanced_image(self, image):
"""Create edge-enhanced image for text detection."""
try:
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Apply edge detection
edges = cv2.Canny(gray, 50, 150)
# Dilate edges to connect text components
kernel = np.ones((2, 2), np.uint8)
dilated = cv2.dilate(edges, kernel, iterations=1)
# Invert to get white text on black background
inverted = cv2.bitwise_not(dilated)
return inverted
except Exception as e:
print(f"Error creating edge-enhanced image: {str(e)}")
return image
def ocr_with_multiple_configs(self, image):
"""Perform OCR with multiple configurations."""
try:
ocr_configs = [
'--oem 3 --psm 6', # Assume uniform block of text
'--oem 3 --psm 8', # Single word
'--oem 3 --psm 13', # Raw line
'--oem 1 --psm 6', # Legacy engine
]
best_text = ""
for config in ocr_configs:
try:
text = pytesseract.image_to_string(image, config=config)
if len(text.strip()) > len(best_text.strip()):
best_text = text
except Exception as ocr_error:
print(f"OCR error with config {config}: {str(ocr_error)}")
continue
return best_text
except Exception as e:
print(f"Error in OCR with multiple configs: {str(e)}")
return ""
def annotate_spelling_errors_on_image(self, pil_image, misspelled):
"""
Draw one red rectangle around each misspelled token using Tesseract word boxes.
'misspelled' must be a list of dicts with 'word' keys (from check_spelling).
"""
if not misspelled:
return pil_image
def _norm(s: str) -> str:
return unicodedata.normalize("NFKC", s).replace("'","'").strip(".,:;!?)(").lower()
# build a quick lookup of misspelled lowercase words
miss_set = {_norm(m["word"]) for m in misspelled}
# run word-level OCR to get boxes
img = pil_image
try:
data = pytesseract.image_to_data(
img,
lang="eng+fra",
config="--oem 3 --psm 6",
output_type=pytesseract.Output.DICT,
)
except Exception as e:
print("image_to_data failed:", e)
return img
draw = ImageDraw.Draw(img)
n = len(data.get("text", []))
for i in range(n):
word = (data["text"][i] or "").strip()
if not word:
continue
clean = _norm(word)
if clean and clean in miss_set:
x, y, w, h = data["left"][i], data["top"][i], data["width"][i], data["height"][i]
# draw a distinct box for this one word
draw.rectangle([x, y, x + w, y + h], outline="red", width=4)
return img
    def detect_barcodes_qr_codes(self, image):
        """Detect and decode barcodes and QR codes with timeout protection.

        Runs up to four strategies (basic pyzbar, Dynamsoft if installed,
        enhanced preprocessing, small-barcode scaling), merges results,
        removes duplicates and enriches them. Returns a list of
        barcode-info dicts ([] on failure).
        """
        try:
            print("Starting barcode detection...")
            start_time = time.time()
            # Convert PIL image to OpenCV format
            opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            all_barcodes = []
            # Method 1: Basic pyzbar detection (fastest)
            print("Method 1: Basic pyzbar detection")
            pyzbar_results = self.detect_with_pyzbar_basic(opencv_image)
            if pyzbar_results:
                all_barcodes.extend(pyzbar_results)
                print(f"Found {len(pyzbar_results)} barcodes with basic pyzbar")
            # Method 2: Dynamsoft Barcode Reader (if available)
            if DBR_AVAILABLE:
                print("Method 2: Dynamsoft Barcode Reader")
                dbr_results = self.detect_with_dynamsoft(opencv_image)
                if dbr_results:
                    all_barcodes.extend(dbr_results)
                    print(f"Found {len(dbr_results)} barcodes with Dynamsoft")
            # Method 3: Enhanced preprocessing (always run for better detection)
            print("Method 3: Enhanced preprocessing")
            enhanced_results = self.detect_with_enhanced_preprocessing(opencv_image)
            if enhanced_results:
                all_barcodes.extend(enhanced_results)
                print(f"Found {len(enhanced_results)} additional barcodes with enhanced preprocessing")
            # Method 4: Small barcode detection (always run for better detection)
            print("Method 4: Small barcode detection")
            small_results = self.detect_small_barcodes_simple(opencv_image)
            if small_results:
                all_barcodes.extend(small_results)
                print(f"Found {len(small_results)} additional small barcodes")
            # Remove duplicates (same position or same payload seen twice)
            unique_barcodes = self.remove_duplicate_barcodes(all_barcodes)
            # Enhance results
            enhanced_barcodes = self.enhance_barcode_data(unique_barcodes)
            elapsed_time = time.time() - start_time
            print(f"Barcode detection completed in {elapsed_time:.2f} seconds. Found {len(enhanced_barcodes)} unique barcodes.")
            return enhanced_barcodes
        except Exception as e:
            print(f"Error in barcode detection: {str(e)}")
            return []
def detect_with_pyzbar_basic(self, image):
"""Basic pyzbar detection without complex preprocessing"""
results = []
try:
# Simple grayscale conversion
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Try original image
decoded_objects = decode(gray)
for obj in decoded_objects:
barcode_info = {
'type': obj.type,
'data': obj.data.decode('utf-8', errors='ignore'),
'rect': obj.rect,
'polygon': obj.polygon,
'quality': getattr(obj, 'quality', 0),
'orientation': self.detect_barcode_orientation(obj),
'method': 'pyzbar_basic'
}
if 'databar' in obj.type.lower():
barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))
results.append(barcode_info)
# Try with simple contrast enhancement
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray)
decoded_objects = decode(enhanced)
for obj in decoded_objects:
barcode_info = {
'type': obj.type,
'data': obj.data.decode('utf-8', errors='ignore'),
'rect': obj.rect,
'polygon': obj.polygon,
'quality': getattr(obj, 'quality', 0),
'orientation': self.detect_barcode_orientation(obj),
'method': 'pyzbar_enhanced'
}
if 'databar' in obj.type.lower():
barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))
results.append(barcode_info)
except Exception as e:
print(f"Error in basic pyzbar detection: {str(e)}")
return results
    def detect_with_dynamsoft(self, image):
        """Detect barcodes using the Dynamsoft Barcode Reader (if installed).

        Encodes the image as PNG in memory, feeds it to DBR, and converts
        each result into the common barcode-info dict shape used by the
        pyzbar paths. Returns [] when DBR is unavailable or anything fails.
        """
        results = []
        try:
            if not DBR_AVAILABLE:
                return results
            # Initialize Dynamsoft Barcode Reader
            reader = BarcodeReader()
            # Convert OpenCV image to bytes for Dynamsoft
            success, buffer = cv2.imencode('.png', image)
            if not success:
                print("Failed to encode image for Dynamsoft")
                return results
            image_bytes = buffer.tobytes()
            # Decode barcodes
            text_results = reader.decode_file_stream(image_bytes)
            for result in text_results:
                barcode_info = {
                    'type': result.barcode_format_string,
                    'data': result.barcode_text,
                    # Ad-hoc rect object mirroring pyzbar's Rect attributes
                    # (left/top/width/height) from the DBR corner points.
                    'rect': type('Rect', (), {
                        'left': result.localization_result.x1,
                        'top': result.localization_result.y1,
                        'width': result.localization_result.x2 - result.localization_result.x1,
                        'height': result.localization_result.y2 - result.localization_result.y1
                    })(),
                    # Axis-aligned quad reconstructed from two corners.
                    'polygon': [
                        (result.localization_result.x1, result.localization_result.y1),
                        (result.localization_result.x2, result.localization_result.y1),
                        (result.localization_result.x2, result.localization_result.y2),
                        (result.localization_result.x1, result.localization_result.y2)
                    ],
                    'quality': result.confidence,
                    'orientation': self.detect_barcode_orientation(result),
                    'method': 'dynamsoft'
                }
                # Enhanced DataBar Expanded detection
                if 'databar' in result.barcode_format_string.lower() or 'expanded' in result.barcode_format_string.lower():
                    barcode_info['expanded_data'] = self.parse_databar_expanded(result.barcode_text)
                results.append(barcode_info)
            print(f"Dynamsoft detected {len(results)} barcodes")
        except Exception as e:
            print(f"Error in Dynamsoft detection: {str(e)}")
        return results
def detect_with_enhanced_preprocessing(self, image):
"""Enhanced preprocessing with limited methods"""
results = []
try:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Limited preprocessing methods
processed_images = [
gray, # Original
cv2.resize(gray, (gray.shape[1] * 3, gray.shape[0] * 3), interpolation=cv2.INTER_CUBIC), # 3x scale
cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2), # Adaptive threshold
]
for i, processed_image in enumerate(processed_images):
try:
decoded_objects = decode(processed_image)
for obj in decoded_objects:
barcode_info = {
'type': obj.type,
'data': obj.data.decode('utf-8', errors='ignore'),
'rect': obj.rect,
'polygon': obj.polygon,
'quality': getattr(obj, 'quality', 0),
'orientation': self.detect_barcode_orientation(obj),
'method': f'enhanced_preprocessing_{i}'
}
if 'databar' in obj.type.lower():
barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))
results.append(barcode_info)
except Exception as e:
print(f"Error in enhanced preprocessing method {i}: {str(e)}")
continue
except Exception as e:
print(f"Error in enhanced preprocessing: {str(e)}")
return results
    def detect_small_barcodes_simple(self, image):
        """Simplified small barcode detection.

        Upscales the grayscale image 3x and 4x, decodes with pyzbar, and
        maps each rect back to original-image coordinates. Returns a list
        of barcode-info dicts ([] on failure).
        """
        results = []
        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            # Only try 3x and 4x scaling
            scale_factors = [3.0, 4.0]
            for scale in scale_factors:
                try:
                    height, width = gray.shape
                    new_height, new_width = int(height * scale), int(width * scale)
                    scaled = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
                    decoded_objects = decode(scaled)
                    for obj in decoded_objects:
                        # Scale back rect coordinates to the original image.
                        # NOTE(review): the polygon below is NOT rescaled —
                        # it stays in upscaled coordinates; confirm consumers
                        # don't mix rect and polygon frames.
                        scale_factor = width / new_width
                        scaled_rect = type('Rect', (), {
                            'left': int(obj.rect.left * scale_factor),
                            'top': int(obj.rect.top * scale_factor),
                            'width': int(obj.rect.width * scale_factor),
                            'height': int(obj.rect.height * scale_factor)
                        })()
                        barcode_info = {
                            'type': obj.type,
                            'data': obj.data.decode('utf-8', errors='ignore'),
                            'rect': scaled_rect,
                            'polygon': obj.polygon,
                            'quality': getattr(obj, 'quality', 0),
                            'orientation': self.detect_barcode_orientation(obj),
                            'method': f'small_barcode_{scale}x',
                            'size_category': 'small'
                        }
                        if 'databar' in obj.type.lower():
                            barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))
                        results.append(barcode_info)
                except Exception as e:
                    print(f"Error in small barcode detection at {scale}x: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error in small barcode detection: {str(e)}")
        return results
    def preprocess_image_for_ocr(self, image):
        """Preprocess a BGR image for better OCR results.

        Pipeline: grayscale -> 3x upscale -> blur -> adaptive threshold ->
        morphological close -> CLAHE. Returns the original image on failure.
        """
        try:
            # Convert to grayscale
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            # 1. Resize image to improve small text recognition
            height, width = gray.shape
            scale_factor = 3.0  # Scale up for better small font recognition
            new_height, new_width = int(height * scale_factor), int(width * scale_factor)
            resized = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
            # 2. Apply Gaussian blur to reduce noise
            # NOTE(review): a (1, 1) Gaussian kernel is effectively a no-op.
            blurred = cv2.GaussianBlur(resized, (1, 1), 0)
            # 3. Apply adaptive thresholding for better text separation
            thresh = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
            # 4. Apply morphological operations to clean up text
            # NOTE(review): a 1x1 kernel makes this close a no-op as well.
            kernel = np.ones((1, 1), np.uint8)
            cleaned = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
            # 5. Apply contrast enhancement
            # NOTE(review): CLAHE on an already-binarized image is unusual —
            # confirm it was meant to run before thresholding.
            clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
            enhanced = clahe.apply(cleaned)
            return enhanced
        except Exception as e:
            print(f"Error preprocessing image: {str(e)}")
            return image  # Return original if preprocessing fails
def preprocess_for_barcode_detection(self, image):
"""Preprocess image with multiple techniques for better barcode detection"""
processed_images = [image] # Start with original
try:
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
processed_images.append(gray)
# Apply different preprocessing techniques
# 1. Contrast enhancement
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray)
processed_images.append(enhanced)
# 2. Gaussian blur for noise reduction
blurred = cv2.GaussianBlur(gray, (3, 3), 0)
processed_images.append(blurred)
# 3. Adaptive thresholding
thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
processed_images.append(thresh)
# 4. Edge enhancement for better barcode detection
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(gray, -1, kernel)
processed_images.append(sharpened)
# 5. Scale up for small barcodes
height, width = gray.shape
scale_factor = 3.0
new_height, new_width = int(height * scale_factor), int(width * scale_factor)
scaled = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
processed_images.append(scaled)
except Exception as e:
print(f"Error in barcode preprocessing: {str(e)}")
return processed_images
def preprocess_for_databar(self, gray_image):
"""Specialized preprocessing for DataBar Expanded Stacked barcodes"""
processed_images = []
try:
# Original grayscale
processed_images.append(gray_image)
# 1. High contrast enhancement for DataBar
clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray_image)
processed_images.append(enhanced)
# 2. Bilateral filter to preserve edges while reducing noise
bilateral = cv2.bilateralFilter(gray_image, 9, 75, 75)
processed_images.append(bilateral)
# 3. Adaptive thresholding with different parameters
thresh1 = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 15, 2)
processed_images.append(thresh1)
thresh2 = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
processed_images.append(thresh2)
# 4. Scale up for better DataBar detection
height, width = gray_image.shape
scale_factors = [2.0, 3.0, 4.0]
for scale in scale_factors:
new_height, new_width = int(height * scale), int(width * scale)
scaled = cv2.resize(gray_image, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
processed_images.append(scaled)
# 5. Edge enhancement specifically for DataBar
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(gray_image, -1, kernel)
processed_images.append(sharpened)
# 6. Morphological operations for DataBar
kernel = np.ones((2, 2), np.uint8)
morphed = cv2.morphologyEx(gray_image, cv2.MORPH_CLOSE, kernel)
processed_images.append(morphed)
except Exception as e:
print(f"Error in DataBar preprocessing: {str(e)}")
return processed_images
    def detect_with_transformations(self, image):
        """Detect barcodes at 0/90/180/270-degree rotations of `image`.

        NOTE(review): cv2.warpAffine keeps the original canvas size, so 90°
        and 270° rotations of non-square images crop the corners — confirm
        this is acceptable for the page sizes processed here.
        """
        results = []
        try:
            # Try different rotations
            angles = [0, 90, 180, 270]
            for angle in angles:
                if angle == 0:
                    rotated_image = image
                else:
                    height, width = image.shape[:2]
                    center = (width // 2, height // 2)
                    rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                    rotated_image = cv2.warpAffine(image, rotation_matrix, (width, height))
                # Try to detect barcodes in rotated image
                try:
                    decoded_objects = decode(rotated_image)
                    for obj in decoded_objects:
                        barcode_info = {
                            'type': obj.type,
                            'data': obj.data.decode('utf-8', errors='ignore'),
                            # rect/polygon are in the ROTATED frame, not the
                            # original image frame.
                            'rect': obj.rect,
                            'polygon': obj.polygon,
                            'quality': getattr(obj, 'quality', 0),
                            'orientation': f"{angle}°",
                            'method': f'transform_{angle}deg'
                        }
                        # Enhanced DataBar Expanded detection
                        if 'databar' in obj.type.lower() or 'expanded' in obj.type.lower():
                            barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))
                        # Check for multi-stack barcodes
                        if self.is_multi_stack_barcode(obj, rotated_image):
                            barcode_info['stack_type'] = self.detect_stack_type(obj, rotated_image)
                        results.append(barcode_info)
                except Exception as e:
                    print(f"Error in transformation detection at {angle}°: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error in transformation detection: {str(e)}")
        return results
    def detect_small_barcodes(self, image):
        """Specialized detection for small barcodes and QR codes.

        Decodes every variant from preprocess_for_small_barcodes and keeps
        only hits whose bounding box is under 50 px in either dimension.
        Returns a list of barcode-info dicts ([] on failure).
        """
        results = []
        try:
            # Convert to grayscale
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            # Apply specialized preprocessing for small barcodes
            processed_images = self.preprocess_for_small_barcodes(gray)
            for processed_image in processed_images:
                try:
                    decoded_objects = decode(processed_image)
                    for obj in decoded_objects:
                        # Check if this is a small barcode (less than 50x50 pixels)
                        # NOTE(review): rect comes from the (possibly
                        # upscaled) variant, not the original image — confirm
                        # the 50 px cut-off is meant to apply per-variant.
                        if obj.rect.width < 50 or obj.rect.height < 50:
                            barcode_info = {
                                'type': obj.type,
                                'data': obj.data.decode('utf-8', errors='ignore'),
                                'rect': obj.rect,
                                'polygon': obj.polygon,
                                'quality': getattr(obj, 'quality', 0),
                                'orientation': self.detect_barcode_orientation(obj),
                                'method': 'small_barcode_detection',
                                'size_category': 'small'
                            }
                            # Enhanced DataBar Expanded detection
                            if 'databar' in obj.type.lower() or 'expanded' in obj.type.lower():
                                barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))
                            # Check for multi-stack barcodes
                            if self.is_multi_stack_barcode(obj, image):
                                barcode_info['stack_type'] = self.detect_stack_type(obj, image)
                            results.append(barcode_info)
                except Exception as e:
                    print(f"Error in small barcode detection: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error in small barcode preprocessing: {str(e)}")
        return results
def preprocess_for_small_barcodes(self, gray_image):
"""Specialized preprocessing for small barcodes and QR codes"""
processed_images = []
try:
# Original grayscale
processed_images.append(gray_image)
# 1. Multiple high-resolution scaling for small barcodes
height, width = gray_image.shape
scale_factors = [4.0, 5.0, 6.0, 8.0] # Higher scaling for small barcodes
for scale in scale_factors:
new_height, new_width = int(height * scale), int(width * scale)
scaled = cv2.resize(gray_image, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
processed_images.append(scaled)
# 2. Aggressive contrast enhancement
clahe = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray_image)
processed_images.append(enhanced)
# 3. Unsharp masking for edge enhancement
gaussian = cv2.GaussianBlur(gray_image, (0, 0), 2.0)
unsharp = cv2.addWeighted(gray_image, 1.5, gaussian, -0.5, 0)
processed_images.append(unsharp)
# 4. Multiple thresholding methods
# Otsu's thresholding
_, otsu = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
processed_images.append(otsu)
# Adaptive thresholding with different parameters
adaptive1 = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 9, 2)
processed_images.append(adaptive1)
adaptive2 = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 7, 2)
processed_images.append(adaptive2)
# 5. Noise reduction with different methods
# Bilateral filter
bilateral = cv2.bilateralFilter(gray_image, 9, 75, 75)
processed_images.append(bilateral)
# Median filter
median = cv2.medianBlur(gray_image, 3)
processed_images.append(median)
# 6. Edge detection and enhancement
# Sobel edge detection
sobel_x = cv2.Sobel(gray_image, cv2.CV_64F, 1, 0, ksize=3)
sobel_y = cv2.Sobel(gray_image, cv2.CV_64F, 0, 1, ksize=3)
sobel = np.sqrt(sobel_x**2 + sobel_y**2)
sobel = np.uint8(sobel * 255 / sobel.max())
processed_images.append(sobel)
# 7. Morphological operations for small barcode cleanup
kernel = np.ones((2, 2), np.uint8)
morphed_close = cv2.morphologyEx(gray_image, cv2.MORPH_CLOSE, kernel)
processed_images.append(morphed_close)
kernel_open = np.ones((1, 1), np.uint8)
morphed_open = cv2.morphologyEx(gray_image, cv2.MORPH_OPEN, kernel_open)
processed_images.append(morphed_open)
except Exception as e:
print(f"Error in small barcode preprocessing: {str(e)}")
return processed_images
    def detect_with_high_resolution(self, image):
        """Detect barcodes by decoding 3x/4x/6x cubic-upscaled versions of
        `image`, mapping rects back to original coordinates.

        NOTE(review): polygons are NOT rescaled — they stay in the upscaled
        frame; confirm consumers do not mix rect and polygon frames.
        """
        results = []
        try:
            # Convert to grayscale
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            # Process at multiple high resolutions
            height, width = gray.shape
            resolutions = [
                (int(width * 3), int(height * 3)),  # 3x resolution
                (int(width * 4), int(height * 4)),  # 4x resolution
                (int(width * 6), int(height * 6))   # 6x resolution
            ]
            for new_width, new_height in resolutions:
                try:
                    # Resize with high-quality interpolation
                    resized = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
                    # Apply high-resolution preprocessing
                    processed = self.preprocess_high_resolution(resized)
                    # Try to detect barcodes
                    decoded_objects = decode(processed)
                    for obj in decoded_objects:
                        # Scale back the coordinates to original image size
                        scale_factor = width / new_width
                        scaled_rect = type('Rect', (), {
                            'left': int(obj.rect.left * scale_factor),
                            'top': int(obj.rect.top * scale_factor),
                            'width': int(obj.rect.width * scale_factor),
                            'height': int(obj.rect.height * scale_factor)
                        })()
                        barcode_info = {
                            'type': obj.type,
                            'data': obj.data.decode('utf-8', errors='ignore'),
                            'rect': scaled_rect,
                            'polygon': obj.polygon,
                            'quality': getattr(obj, 'quality', 0),
                            'orientation': self.detect_barcode_orientation(obj),
                            'method': f'high_res_{new_width}x{new_height}',
                            'resolution': f'{new_width}x{new_height}'
                        }
                        # Enhanced DataBar Expanded detection
                        if 'databar' in obj.type.lower() or 'expanded' in obj.type.lower():
                            barcode_info['expanded_data'] = self.parse_databar_expanded(obj.data.decode('utf-8', errors='ignore'))
                        # Check for multi-stack barcodes
                        if self.is_multi_stack_barcode(obj, image):
                            barcode_info['stack_type'] = self.detect_stack_type(obj, image)
                        results.append(barcode_info)
                except Exception as e:
                    print(f"Error in high-resolution detection at {new_width}x{new_height}: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error in high-resolution detection: {str(e)}")
        return results
def preprocess_high_resolution(self, image):
"""Preprocessing optimized for high-resolution images"""
try:
# 1. High-quality noise reduction
denoised = cv2.fastNlMeansDenoising(image)
# 2. Advanced contrast enhancement
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
enhanced = clahe.apply(denoised)
# 3. Edge-preserving smoothing
bilateral = cv2.bilateralFilter(enhanced, 9, 75, 75)
# 4. Sharpening
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(bilateral, -1, kernel)
# 5. Adaptive thresholding for high-res
thresh = cv2.adaptiveThreshold(sharpened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
return thresh
except Exception as e:
print(f"Error in high-resolution preprocessing: {str(e)}")
return image
def detect_barcode_orientation(self, barcode_obj):
"""Detect the orientation of the barcode"""
try:
if hasattr(barcode_obj, 'polygon') and len(barcode_obj.polygon) >= 4:
# Calculate orientation based on polygon points
points = np.array(barcode_obj.polygon)
# Calculate the angle of the longest edge
edges = []
for i in range(4):
p1 = points[i]
p2 = points[(i + 1) % 4]
edge_length = np.linalg.norm(p2 - p1)
angle = np.arctan2(p2[1] - p1[1], p2[0] - p1[0]) * 180 / np.pi
edges.append((edge_length, angle))
# Find the longest edge (likely the main barcode direction)
longest_edge = max(edges, key=lambda x: x[0])
return f"{longest_edge[1]:.1f}°"
return "Unknown"
except:
return "Unknown"
def parse_databar_expanded(self, data):
    """Split a DataBar Expanded payload into GS1 AI fields.

    Expected format: [01]12345678901234[3101]123 — each bracketed
    Application Identifier becomes an "AI <n>" key. When no AI pattern
    is present (or parsing raises) the raw payload is returned instead.
    """
    try:
        fields = {
            f"AI {ai}": value
            for ai, value in re.findall(r'\[(\d{2,4})\]([^\[]+)', data)
        }
        # No bracketed AIs found: hand back the payload untouched.
        return fields if fields else {"Raw Data": data}
    except Exception as e:
        return {"Raw Data": data, "Parse Error": str(e)}
def is_multi_stack_barcode(self, barcode_obj, image):
    """Heuristic stacked-symbology check.

    A height/width aspect ratio above 0.3 suggests a stacked format
    (e.g. DataBar Expanded Stacked). Returns False when no rect is
    available or on any error.
    """
    try:
        if hasattr(barcode_obj, 'rect'):
            _, _, width, height = barcode_obj.rect
            if width > 0:
                return height / width > 0.3
            return False  # degenerate rect -> treat aspect ratio as 0
    except:
        pass
    return False
def detect_stack_type(self, barcode_obj, image):
    """Classify a stacked barcode by its height/width aspect ratio.

    DataBar symbologies get a four-tier classification; anything else is
    a coarse Multi-Stack / Single Stack split. Returns "Unknown" when
    no rect is available or on any error.
    """
    try:
        if hasattr(barcode_obj, 'rect'):
            _, _, width, height = barcode_obj.rect
            ratio = height / width if width > 0 else 0
            if 'databar' in barcode_obj.type.lower():
                # DataBar family: finer-grained stacking tiers.
                if ratio > 0.5:
                    return "Quad Stack"
                if ratio > 0.35:
                    return "Triple Stack"
                if ratio > 0.25:
                    return "Double Stack"
                return "Single Stack"
            # Any other symbology: coarse two-way split.
            return "Multi-Stack" if ratio > 0.4 else "Single Stack"
    except:
        pass
    return "Unknown"
def remove_duplicate_barcodes(self, barcodes):
    """Drop detections that repeat an already-seen position or payload.

    Position identity is the exact left/top/width/height of the decode
    rect; payload identity is the decoded data string. First occurrence
    wins; order is preserved.
    """
    kept = []
    positions_seen = set()
    payloads_seen = set()
    for candidate in barcodes:
        rect = candidate['rect']
        position_key = f"{rect.left},{rect.top},{rect.width},{rect.height}"
        payload = candidate['data']
        if position_key in positions_seen or payload in payloads_seen:
            continue  # duplicate by either criterion
        kept.append(candidate)
        positions_seen.add(position_key)
        payloads_seen.add(payload)
    return kept
def enhance_barcode_data(self, barcodes):
    """Attach analysis metadata to each detection (mutates entries in place).

    Adds a 'confidence' score to every barcode, plus a 'gs1_validated'
    flag for DataBar symbologies. Returns the (same) enhanced list.
    """
    annotated = []
    for entry in barcodes:
        entry['confidence'] = self.calculate_confidence(entry)
        # GS1 structure validation only applies to the DataBar family.
        if 'databar' in entry['type'].lower():
            entry['gs1_validated'] = self.validate_gs1_format(entry['data'])
        annotated.append(entry)
    return annotated
def calculate_confidence(self, barcode):
    """Score a detection 0-100 from its method, quality metric, and symbology.

    Base 50, plus a per-method reliability bonus, up to 20 points of
    decoder-reported quality, and +10 for DataBar; capped at 100.
    """
    # Per-method reliability bonuses (Dynamsoft is the most accurate engine).
    method_bonus = {
        'pyzbar_basic': 70,
        'pyzbar_enhanced': 70,
        'dynamsoft': 85,
        'enhanced_preprocessing_0': 65,
        'enhanced_preprocessing_1': 60,
        'enhanced_preprocessing_2': 55,
        'transform_0deg': 60,
        'transform_90deg': 50,
        'transform_180deg': 50,
        'transform_270deg': 50,
        'small_barcode_detection': 75,
        'high_res_2x': 70,
        'high_res_3x': 65,
        'high_res_4x': 60,
    }
    score = 50  # baseline confidence
    score += method_bonus.get(barcode.get('method'), 0)
    # Decoder-reported quality contributes at most 20 points.
    quality = barcode.get('quality', 0)
    if quality > 0:
        score += min(quality, 20)
    # DataBar symbologies get a small extra boost.
    if 'databar' in barcode['type'].lower():
        score += 10
    return min(score, 100)
def validate_gs1_format(self, data):
    """Return True if the payload carries GS1 Application Identifiers.

    Accepts both bracketed form, e.g. [01], and parenthesised form,
    e.g. (01). Returns False on any error (e.g. non-string input).
    """
    try:
        # Bracketed AIs: [01], [3101], ...
        if re.search(r'\[(\d{2,4})\]', data):
            return True
        # Parenthesised AIs: (01), (3101), ...
        return bool(re.search(r'\((\d{2,4})\)', data))
    except:
        return False
def check_spelling(self, text):
    """
    Robust EN/FR spell check.

    - Unicode-aware tokens (keeps accents)
    - Normalizes ligatures and typographic quotes to ASCII equivalents
    - Heuristic per-token language (accented => FR; else EN)
    - Flags a token if unknown in its likely language, or unknown in both

    Returns a list of dicts with keys: word, lang, suggestions_en,
    suggestions_fr. Returns [] on any error.
    """
    try:
        # Normalize ligatures / compatibility forms first.
        text = unicodedata.normalize("NFKC", text)
        # BUG FIX: the previous replacement chain was mojibake-damaged —
        # it replaced the ASCII apostrophe with itself and matched a
        # garbage pattern instead of the curly quotes. Map the
        # typographic quotes to their ASCII equivalents explicitly.
        text = (text.replace("\u2019", "'")    # right single quote -> '
                    .replace("\u201c", '"')    # left double quote  -> "
                    .replace("\u201d", '"'))   # right double quote -> "
        # Unicode letters with internal ' or - allowed
        tokens = _re.findall(TOKEN_PATTERN, text, flags=_re.UNICODE if _USE_REGEX else 0)
        issues = []
        for raw in tokens:
            t = raw.lower()
            # Skip very short tokens, short ALL-CAPS acronyms, and whitelisted terms.
            if len(t) < 3:
                continue
            if raw.isupper() and len(raw) <= 3:
                continue
            if t in DOMAIN_WHITELIST:
                continue
            miss_en = t in self.english_spellchecker.unknown([t])
            miss_fr = t in self.french_spellchecker.unknown([t])
            use_fr = _likely_french(raw)
            # Prefer the likely language, but also flag words unknown in both.
            if (use_fr and miss_fr) or ((not use_fr) and miss_en) or (miss_en and miss_fr):
                issues.append({
                    "word": raw,
                    "lang": "fr" if use_fr else "en",
                    "suggestions_en": list(self.english_spellchecker.candidates(t))[:3],
                    "suggestions_fr": list(self.french_spellchecker.candidates(t))[:3],
                })
        return issues
    except Exception as e:
        print(f"Error checking spelling: {e}")
        return []
def compare_colors(self, image1, image2):
    """Compare two page images and return grouped color-difference regions.

    Four complementary detectors run over the size-matched RGB images:
      1. Weighted RGB channel differencing   (thresholds 15/22/30/40, min diff 18)
      2. Weighted HSV channel differencing   (thresholds 18/25/35/45, min diff 22)
      3. Sparse pixel sampling, every 12th pixel (per-channel delta > 10)
      4. Perceptual LAB channel differencing (thresholds 20/28/38/50, min diff 22)

    All detectors append into one shared accumulator so later detectors skip
    areas already flagged (within 21px). Nearby hits are finally merged via
    group_nearby_differences into one perimeter box per issue area.
    Returns [] on any failure.

    NOTE: Methods 1/2/4 share one contour pipeline, extracted into
    _collect_region_diffs (previously the same ~50-line loop was
    triplicated inline).
    """
    try:
        print("Starting RGB color comparison...")
        img1 = np.array(image1)
        img2 = np.array(image2)
        print(f"Image 1 shape: {img1.shape}")
        print(f"Image 2 shape: {img2.shape}")
        # Resize both to the common size so pixels line up.
        height = min(img1.shape[0], img2.shape[0])
        width = min(img1.shape[1], img2.shape[1])
        img1_rgb = cv2.resize(img1, (width, height))
        img2_rgb = cv2.resize(img2, (width, height))
        print(f"Resized to: {width}x{height}")
        color_differences = []

        # --- Method 1: weighted RGB channel differencing -----------------
        print("Method 1: Enhanced RGB channel comparison")
        diff_r = cv2.absdiff(img1_rgb[:, :, 0], img2_rgb[:, :, 0])
        diff_g = cv2.absdiff(img1_rgb[:, :, 1], img2_rgb[:, :, 1])
        diff_b = cv2.absdiff(img1_rgb[:, :, 2], img2_rgb[:, :, 2])
        # Red/green dominate perceived difference; blue weighted lower.
        rgb_map = cv2.addWeighted(diff_r, 0.4, diff_g, 0.4, 0)
        rgb_map = cv2.addWeighted(rgb_map, 1.0, diff_b, 0.2, 0)
        rgb_map = cv2.GaussianBlur(rgb_map, (3, 3), 0)  # noise reduction
        self._collect_region_diffs(rgb_map, [15, 22, 30, 40], "RGB", 18,
                                   img1_rgb, img2_rgb, color_differences)

        # --- Method 2: weighted HSV channel differencing -----------------
        print("Method 2: Enhanced HSV color space comparison")
        img1_hsv = cv2.cvtColor(img1_rgb, cv2.COLOR_RGB2HSV)
        img2_hsv = cv2.cvtColor(img2_rgb, cv2.COLOR_RGB2HSV)
        hue_diff = cv2.absdiff(img1_hsv[:, :, 0], img2_hsv[:, :, 0])
        sat_diff = cv2.absdiff(img1_hsv[:, :, 1], img2_hsv[:, :, 1])
        val_diff = cv2.absdiff(img1_hsv[:, :, 2], img2_hsv[:, :, 2])
        # Hue and saturation carry most of the color signal.
        hsv_map = cv2.addWeighted(hue_diff, 0.5, sat_diff, 0.3, 0)
        hsv_map = cv2.addWeighted(hsv_map, 1.0, val_diff, 0.2, 0)
        hsv_map = cv2.GaussianBlur(hsv_map, (3, 3), 0)
        self._collect_region_diffs(hsv_map, [18, 25, 35, 45], "HSV", 22,
                                   img1_rgb, img2_rgb, color_differences)

        # --- Method 3: sparse pixel-by-pixel sampling --------------------
        print("Method 3: Enhanced pixel-by-pixel RGB comparison")
        for y in range(0, height, 12):
            for x in range(0, width, 12):
                color1 = img1_rgb[y, x]
                color2 = img2_rgb[y, x]
                d_r = abs(int(color1[0]) - int(color2[0]))
                d_g = abs(int(color1[1]) - int(color2[1]))
                d_b = abs(int(color1[2]) - int(color2[2]))
                if d_r > 10 or d_g > 10 or d_b > 10:
                    # Position-only duplicate check (sampled points have no extent).
                    covered = any(abs(e['x'] - x) < 21 and abs(e['y'] - y) < 21
                                  for e in color_differences)
                    if not covered:
                        color_differences.append({
                            'x': x, 'y': y,
                            'width': 5,  # small box around the sampled pixel
                            'height': 5,
                            'area': 25,
                            'color1': color1.tolist(),
                            'color2': color2.tolist(),
                            'threshold': 'pixel_RGB',
                            'color_diff': d_r + d_g + d_b,
                            'diff_r': d_r,
                            'diff_g': d_g,
                            'diff_b': d_b
                        })
        print(f"RGB color comparison completed. Found {len(color_differences)} total differences.")

        # --- Method 4: perceptual LAB differencing -----------------------
        print("Method 4: LAB color space comparison")
        img1_lab = cv2.cvtColor(img1_rgb, cv2.COLOR_RGB2LAB)
        img2_lab = cv2.cvtColor(img2_rgb, cv2.COLOR_RGB2LAB)
        lab_l = cv2.absdiff(img1_lab[:, :, 0], img2_lab[:, :, 0])
        lab_a = cv2.absdiff(img1_lab[:, :, 1], img2_lab[:, :, 1])
        lab_b = cv2.absdiff(img1_lab[:, :, 2], img2_lab[:, :, 2])
        # Perceptual weighting over L (lightness), a (green-red), b (blue-yellow).
        lab_map = cv2.addWeighted(lab_l, 0.3, lab_a, 0.35, 0)
        lab_map = cv2.addWeighted(lab_map, 1.0, lab_b, 0.35, 0)
        lab_map = cv2.GaussianBlur(lab_map, (3, 3), 0)
        self._collect_region_diffs(lab_map, [20, 28, 38, 50], "LAB", 22,
                                   img1_rgb, img2_rgb, color_differences)
        print(f"Enhanced color comparison completed. Found {len(color_differences)} total differences.")

        # Merge nearby hits into one perimeter box per issue area.
        if color_differences:
            grouped_differences = self.group_nearby_differences(color_differences)
            print(f"Grouped into {len(grouped_differences)} perimeter boxes")
            return grouped_differences
        return color_differences
    except Exception as e:
        print(f"Error comparing colors: {str(e)}")
        return []

def _is_region_covered(self, color_differences, x, y, w, h):
    """True if an existing hit lies within 21px of this box in all four coords."""
    for existing in color_differences:
        if (abs(existing['x'] - x) < 21 and
                abs(existing['y'] - y) < 21 and
                abs(existing['width'] - w) < 21 and
                abs(existing['height'] - h) < 21):
            return True
    return False

def _collect_region_diffs(self, diff_map, thresholds, label, min_color_diff,
                          img1_rgb, img2_rgb, color_differences):
    """Threshold a difference map at several levels and append new hit regions.

    Shared contour pipeline for the RGB/HSV/LAB detectors. Appends hit
    dicts into color_differences in place; regions already covered by an
    earlier hit (any detector) are skipped.
    """
    for threshold in thresholds:
        _, binary = cv2.threshold(diff_map, threshold, 255, cv2.THRESH_BINARY)
        # Minimal 1x1 morphology: close pinholes / drop speckles, keep detail.
        kernel = np.ones((1, 1), np.uint8)
        binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
        binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
        contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        print(f"{label} Threshold {threshold}: Found {len(contours)} contours")
        for contour in contours:
            area = cv2.contourArea(contour)
            if area <= 15:  # ignore tiny speckle regions
                continue
            x, y, w, h = cv2.boundingRect(contour)
            # Mean colors of the region in each source image.
            color1 = img1_rgb[y:y + h, x:x + w].mean(axis=(0, 1))
            color2 = img2_rgb[y:y + h, x:x + w].mean(axis=(0, 1))
            color_diff = np.linalg.norm(color1 - color2)
            if color_diff <= min_color_diff:
                continue
            if self._is_region_covered(color_differences, x, y, w, h):
                continue
            color_differences.append({
                'x': x,
                'y': y,
                'width': w,
                'height': h,
                'area': area,
                'color1': color1.tolist(),
                'color2': color2.tolist(),
                'threshold': f"{label}_{threshold}",
                'color_diff': color_diff,
                'diff_r': float(abs(color1[0] - color2[0])),
                'diff_g': float(abs(color1[1] - color2[1])),
                'diff_b': float(abs(color1[2] - color2[2]))
            })
def group_nearby_differences(self, differences):
    """Group nearby differences into larger bounding boxes around affected areas"""
    # NOTE(review): this definition is DEAD CODE — it is shadowed by a second
    # `group_nearby_differences` defined immediately below (which uses a 234px
    # grouping radius and create_perimeter_box instead of the 200px radius and
    # create_group_bounding_box used here). `create_group_bounding_box` does
    # not appear anywhere in this file, so this version would raise
    # AttributeError if it were ever the live one — confirm and delete one of
    # the two definitions.
    if not differences:
        return []
    # Sort differences by position for easier grouping
    sorted_diffs = sorted(differences, key=lambda x: (x['y'], x['x']))
    grouped_areas = []
    current_group = []
    for diff in sorted_diffs:
        if not current_group:
            current_group = [diff]
        else:
            # Check if this difference is close to the current group
            should_group = False
            for group_diff in current_group:
                # Calculate distance between centers
                center1_x = group_diff['x'] + group_diff['width'] // 2
                center1_y = group_diff['y'] + group_diff['height'] // 2
                center2_x = diff['x'] + diff['width'] // 2
                center2_y = diff['y'] + diff['height'] // 2
                distance = ((center1_x - center2_x) ** 2 + (center1_y - center2_y) ** 2) ** 0.5
                # If distance is less than 200 pixels, group them for one box per main issue
                if distance < 200:
                    should_group = True
                    break
            if should_group:
                current_group.append(diff)
            else:
                # Create bounding box for current group
                if current_group:
                    bounding_box = self.create_group_bounding_box(current_group)
                    if bounding_box:  # Only add if not None
                        grouped_areas.append(bounding_box)
                current_group = [diff]
    # Don't forget the last group
    if current_group:
        bounding_box = self.create_group_bounding_box(current_group)
        if bounding_box:  # Only add if not None
            grouped_areas.append(bounding_box)
    return grouped_areas
def group_nearby_differences(self, differences):
    """Cluster difference boxes and wrap each cluster in one perimeter box.

    Boxes are swept in reading order; a box joins the current cluster when
    its center lies within 234px of any member's center. Each finished
    cluster is converted via create_perimeter_box (which may return None
    for clusters that are too small).
    """
    if not differences:
        return []
    # Sweep in reading order so nearby boxes are adjacent in the scan.
    ordered = sorted(differences, key=lambda d: (d['y'], d['x']))
    boxes = []
    cluster = []

    def _close_to_cluster(diff):
        # Center-to-center distance against every current member.
        cx = diff['x'] + diff['width'] // 2
        cy = diff['y'] + diff['height'] // 2
        for member in cluster:
            mx = member['x'] + member['width'] // 2
            my = member['y'] + member['height'] // 2
            if ((mx - cx) ** 2 + (my - cy) ** 2) ** 0.5 < 234:
                return True
        return False

    def _flush():
        # Emit the current cluster as one perimeter box (if big enough).
        if cluster:
            box = self.create_perimeter_box(cluster)
            if box:
                boxes.append(box)

    for diff in ordered:
        if not cluster:
            cluster = [diff]
        elif _close_to_cluster(diff):
            cluster.append(diff)
        else:
            _flush()
            cluster = [diff]
    _flush()  # don't forget the trailing cluster
    return boxes
def create_perimeter_box(self, group):
    """Return one box covering every difference in `group`.

    Each member box is extended by 5px per side, the union is padded by a
    further 7px (clamped to >= 0 at the top-left), and groups whose final
    box is under 26px in either dimension are discarded. Returns None for
    an empty or too-small group.
    """
    if not group:
        return None
    EXTEND = 5  # per-difference extension on every side
    PAD = 7     # extra padding around the union box
    left = max(0, min(d['x'] - EXTEND for d in group) - PAD)
    top = max(0, min(d['y'] - EXTEND for d in group) - PAD)
    right = max(d['x'] + d['width'] + EXTEND for d in group) + PAD
    bottom = max(d['y'] + d['height'] + EXTEND for d in group) + PAD
    box_w = right - left
    box_h = bottom - top
    # Too small to be a real issue area.
    if box_w < 26 or box_h < 26:
        return None
    return {
        'x': left,
        'y': top,
        'width': box_w,
        'height': box_h,
        'area': box_w * box_h,
        'color1': [0, 0, 0],  # placeholder: group mixes many colors
        'color2': [0, 0, 0],  # placeholder
        'threshold': 'perimeter',
        'color_diff': 1.0,
        'num_original_differences': len(group)
    }
def create_annotated_image(self, image, differences, output_path):
    """Save a copy of `image` with a red box drawn around each difference.

    If annotation fails for any reason, falls back to saving the
    unmodified source image at the same path (best effort).
    """
    try:
        print(f"Creating annotated image: {output_path}")
        print(f"Number of differences to annotate: {len(differences)}")
        marked = image.copy()
        pen = ImageDraw.Draw(marked)
        for idx, diff in enumerate(differences):
            left, top = diff['x'], diff['y']
            right = left + diff['width']
            bottom = top + diff['height']
            # 5px-wide red outline around the difference region.
            pen.rectangle([left, top, right, bottom], outline='red', width=5)
            print(f"Drawing rectangle {idx+1}: ({left}, {top}) to ({right}, {bottom})")
        marked.save(output_path)
        print(f"Annotated image saved successfully: {output_path}")
    except Exception as e:
        print(f"Error creating annotated image: {str(e)}")
        # Best effort: at least persist the original page image.
        try:
            image.save(output_path)
            print(f"Saved original image as fallback: {output_path}")
        except Exception as e2:
            print(f"Failed to save fallback image: {str(e2)}")
def compare_pdfs(self, pdf1_path, pdf2_path, session_id):
    """Run the full comparison pipeline on two PDFs.

    Per page pair: EN/FR spell check, barcode/QR detection, color
    comparison, and generation of original / annotated / spelling-overlay
    images under static/results/<session_id>/.

    Returns the aggregated results dict. Raises Exception("INVALID
    DOCUMENT") when validation or extraction fails, or on any unexpected
    error (original cause chained for debugging).
    """
    try:
        print("Starting PDF comparison...")
        start_time = time.time()
        # Both PDFs must pass the "50 Carroll" validation check.
        print("Validating PDF 1...")
        if not self.validate_pdf(pdf1_path):
            raise Exception("INVALID DOCUMENT")
        print("Validating PDF 2...")
        if not self.validate_pdf(pdf2_path):
            raise Exception("INVALID DOCUMENT")
        # Extract per-page text + images from both PDFs.
        print("Extracting text from PDF 1...")
        pdf1_data = self.extract_text_from_pdf(pdf1_path)
        if not pdf1_data:
            raise Exception("INVALID DOCUMENT")
        print("Extracting text from PDF 2...")
        pdf2_data = self.extract_text_from_pdf(pdf2_path)
        if not pdf2_data:
            raise Exception("INVALID DOCUMENT")
        results = {
            'session_id': session_id,
            'validation': {
                'pdf1_valid': True,
                'pdf2_valid': True,
                'validation_text': '50 Carroll'
            },
            'text_comparison': [],
            'spelling_issues': [],
            'barcodes_qr_codes': [],
            'color_differences': [],
            'annotated_images': []
        }
        print("Processing pages...")
        output_dir = f'static/results/{session_id}'
        os.makedirs(output_dir, exist_ok=True)
        for i, (page1, page2) in enumerate(zip(pdf1_data, pdf2_data)):
            print(f"Processing page {i + 1}...")
            page_results = {
                'page': i + 1,
                'text_differences': [],
                'spelling_issues_pdf1': [],
                'spelling_issues_pdf2': [],
                'barcodes_pdf1': [],
                'barcodes_pdf2': [],
                'color_differences': []
            }
            # --- Spelling ------------------------------------------------
            print(f"Checking spelling for page {i + 1}...")
            page_results['spelling_issues_pdf1'] = self.check_spelling(page1['text'])
            page_results['spelling_issues_pdf2'] = self.check_spelling(page2['text'])
            if page_results['spelling_issues_pdf1'] or page_results['spelling_issues_pdf2']:
                # Surface spelling problems in the text-difference list for the UI.
                page_results['text_differences'].append({
                    "type": "spelling",
                    "pdf1": [issue["word"] for issue in page_results['spelling_issues_pdf1']],
                    "pdf2": [issue["word"] for issue in page_results['spelling_issues_pdf2']],
                })
                # Spelling-only overlay images (one box per error).
                spell_img1 = self.annotate_spelling_errors_on_image(
                    page1['image'].copy(), page_results['spelling_issues_pdf1'])
                spell_img2 = self.annotate_spelling_errors_on_image(
                    page2['image'].copy(), page_results['spelling_issues_pdf2'])
                spell_img1.save(f'{output_dir}/page_{i+1}_pdf1_spelling.png')
                spell_img2.save(f'{output_dir}/page_{i+1}_pdf2_spelling.png')
                page_results.setdefault('annotated_images', {})
                page_results['annotated_images'].update({
                    'pdf1_spelling': f'results/{session_id}/page_{i+1}_pdf1_spelling.png',
                    'pdf2_spelling': f'results/{session_id}/page_{i+1}_pdf2_spelling.png',
                })
            # --- Barcodes / QR codes -------------------------------------
            print(f"Detecting barcodes for page {i + 1} PDF 1...")
            page_results['barcodes_pdf1'] = self.detect_barcodes_qr_codes(page1['image']) or []
            print(f"Detecting barcodes for page {i + 1} PDF 2...")
            page_results['barcodes_pdf2'] = self.detect_barcodes_qr_codes(page2['image']) or []
            # --- Colors --------------------------------------------------
            print(f"Comparing colors for page {i + 1}...")
            color_diffs = self.compare_colors(page1['image'], page2['image'])
            page_results['color_differences'] = color_diffs
            # --- Images --------------------------------------------------
            print(f"Creating images for page {i + 1}...")
            page1['image'].save(f'{output_dir}/page_{i+1}_pdf1_original.png')
            page2['image'].save(f'{output_dir}/page_{i+1}_pdf2_original.png')
            # BUG FIX: this used to assign a brand-new dict to
            # 'annotated_images', discarding the pdf1_spelling /
            # pdf2_spelling entries added above whenever the page also had
            # color differences. Merge into the existing dict instead.
            page_results.setdefault('annotated_images', {})
            if color_diffs:
                print(f"Creating annotated images for page {i + 1}...")
                annotated_path1 = f'{output_dir}/page_{i+1}_pdf1_annotated.png'
                annotated_path2 = f'{output_dir}/page_{i+1}_pdf2_annotated.png'
                self.create_annotated_image(page1['image'], color_diffs, annotated_path1)
                self.create_annotated_image(page2['image'], color_diffs, annotated_path2)
                page_results['annotated_images'].update({
                    'pdf1': f'results/{session_id}/page_{i+1}_pdf1_annotated.png',
                    'pdf2': f'results/{session_id}/page_{i+1}_pdf2_annotated.png'
                })
            else:
                # No color differences: link the untouched page images.
                page_results['annotated_images'].update({
                    'pdf1': f'results/{session_id}/page_{i+1}_pdf1_original.png',
                    'pdf2': f'results/{session_id}/page_{i+1}_pdf2_original.png'
                })
            results['text_comparison'].append(page_results)
        # --- Aggregation -------------------------------------------------
        print("Aggregating results...")
        all_spelling_issues = []
        all_barcodes = []
        for page in results['text_comparison']:
            all_spelling_issues.extend(page['spelling_issues_pdf1'])
            all_spelling_issues.extend(page['spelling_issues_pdf2'])
            all_barcodes.extend(page['barcodes_pdf1'])
            all_barcodes.extend(page['barcodes_pdf2'])
        results['spelling_issues'] = all_spelling_issues
        results['barcodes_qr_codes'] = all_barcodes
        elapsed_time = time.time() - start_time
        print(f"PDF comparison completed in {elapsed_time:.2f} seconds.")
        return results
    except Exception as e:
        print(f"Error in PDF comparison: {str(e)}")
        # Keep the UI-facing sentinel message but chain the real cause.
        # (Was: raise Exception(f"INVALID DOCUMENT") — an f-string with no
        # placeholders, and no exception chaining.)
        raise Exception("INVALID DOCUMENT") from e
# Enhanced OCR for tiny fonts - deployment check
# Force rebuild - Thu Sep 4 09:33:44 EDT 2025