"""Invoice duplicate detector with a small Gradio front end.

Uploaded invoices (PDF or image) are rendered to an image, OCR'd, hashed and
stored in a SQLite database.  A candidate file can then be compared against
every stored invoice using a weighted mix of perceptual-hash, OCR-text and
histogram similarity.
"""
import hashlib
import os
import sqlite3
from contextlib import closing
from datetime import datetime
from io import BytesIO

import cv2
import gradio as gr
import numpy as np
import pytesseract
from pdf2image import convert_from_bytes
from PIL import Image
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class InvoiceDuplicateDetector:
    """Store invoices in SQLite and score new files against the stored ones."""

    def __init__(self, db_path="invoices.db"):
        """Remember the database path, create the schema, build the vectorizer.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()
        self.vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)

    def init_database(self):
        """Create the ``invoices`` table if it does not exist yet."""
        # closing() guarantees the connection is released even if the DDL
        # raises; sqlite3's own context manager only commits/rolls back.
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS invoices (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    file_hash TEXT UNIQUE,
                    image_hash TEXT,
                    extracted_text TEXT,
                    upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    image_data BLOB
                )
            ''')
            conn.commit()

    def calculate_file_hash(self, file_bytes):
        """Return the hex MD5 digest of the raw file bytes.

        MD5 is used only as an exact-duplicate fingerprint, not for security.
        It is kept as-is so digests match rows already stored in the database.
        """
        return hashlib.md5(file_bytes).hexdigest()

    def calculate_image_hash(self, image):
        """Return a 64-character '0'/'1' average-hash string for ``image``.

        The image is shrunk to 8x8, converted to grayscale, and each pixel is
        encoded as 1 when it is brighter than the mean, otherwise 0.
        """
        resized = cv2.resize(image, (8, 8), interpolation=cv2.INTER_AREA)
        # NOTE(review): callers in this file pass RGB arrays (from PIL), so
        # BGR2GRAY swaps the red/blue weights.  Left unchanged so new hashes
        # stay comparable with hashes already stored in the database.
        gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
        bits = (gray > gray.mean()).astype(int)
        return ''.join(str(bit) for bit in bits.flatten())

    def pdf_to_image(self, file_bytes):
        """Render the first page of a PDF to a numpy array.

        Raises:
            ValueError: If the converter returns no pages.
        """
        pages = convert_from_bytes(file_bytes, first_page=1, last_page=1)
        if not pages:
            # Clearer than the bare IndexError the lookup below would raise.
            raise ValueError("PDF contains no pages")
        return np.array(pages[0])

    def extract_text_from_image(self, image):
        """Run Tesseract OCR on ``image`` and return the stripped text."""
        return pytesseract.image_to_string(Image.fromarray(image)).strip()

    def image_to_blob(self, image):
        """Encode a numpy image as PNG bytes for storage."""
        buffer = BytesIO()
        Image.fromarray(image).save(buffer, format='PNG')
        return buffer.getvalue()

    def blob_to_image(self, blob):
        """Decode stored image bytes back into a PIL image."""
        return Image.open(BytesIO(blob))

    def preprocess_image(self, image):
        """Return a binarized copy of ``image`` (gray -> blur -> adaptive threshold)."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        return cv2.adaptiveThreshold(
            blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
        )

    def calculate_image_similarity(self, img1, img2):
        """Return the histogram correlation of the two preprocessed images.

        Both images are binarized and resized to their common minimum size
        before their 256-bin histograms are compared.  Any processing error
        yields 0 (best-effort scoring).
        """
        try:
            proc_img1 = self.preprocess_image(img1)
            proc_img2 = self.preprocess_image(img2)
            h = min(proc_img1.shape[0], proc_img2.shape[0])
            w = min(proc_img1.shape[1], proc_img2.shape[1])
            proc_img1 = cv2.resize(proc_img1, (w, h))
            proc_img2 = cv2.resize(proc_img2, (w, h))
            hist1 = cv2.calcHist([proc_img1], [0], None, [256], [0, 256])
            hist2 = cv2.calcHist([proc_img2], [0], None, [256], [0, 256])
            return cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL)
        except Exception:
            # Deliberately best-effort, but no longer a bare ``except`` so
            # KeyboardInterrupt / SystemExit are not swallowed.
            return 0

    def calculate_text_similarity(self, text1, text2):
        """Return the TF-IDF cosine similarity of two texts, or 0 if unusable.

        Missing or blank text on either side, and any vectorizer failure
        (for example an empty vocabulary), yields 0.
        """
        if not text1 or not text2 or not text1.strip() or not text2.strip():
            return 0
        try:
            tfidf = self.vectorizer.fit_transform([text1, text2])
            return cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]
        except Exception:
            # Best-effort scoring; see calculate_image_similarity.
            return 0

    def hamming_distance(self, h1, h2):
        """Count differing positions; ``inf`` when the lengths differ."""
        if len(h1) != len(h2):
            return float('inf')
        return sum(c1 != c2 for c1, c2 in zip(h1, h2))

    def _load_image(self, file_bytes, filename):
        """Decode an uploaded file into an RGB numpy array.

        Files whose last dot-separated name component is ``pdf`` are rendered
        via :meth:`pdf_to_image`; everything else is opened with PIL.
        """
        ext = filename.lower().split('.')[-1]
        if ext == 'pdf':
            return self.pdf_to_image(file_bytes)
        return np.array(Image.open(BytesIO(file_bytes)).convert('RGB'))

    def store_invoice(self, file_bytes, filename):
        """Store one invoice unless an identical file is already present.

        Args:
            file_bytes: Raw bytes of the uploaded file.
            filename: Original file name (used for type detection and display).

        Returns:
            ``(True, message)`` on success, ``(False, message)`` when the file
            is a byte-for-byte duplicate or cannot be decoded.
        """
        file_hash = self.calculate_file_hash(file_bytes)
        # closing() releases the connection on every exit path, including the
        # early returns below that previously leaked it.
        with closing(sqlite3.connect(self.db_path)) as conn:
            already_stored = conn.execute(
                "SELECT id FROM invoices WHERE file_hash=?", (file_hash,)
            ).fetchone()
            if already_stored:
                return False, "Duplicate file. Skipped."

            try:
                image = self._load_image(file_bytes, filename)
            except Exception as e:
                return False, f"Error processing file: {str(e)}"

            image_hash = self.calculate_image_hash(image)
            text = self.extract_text_from_image(image)
            blob = self.image_to_blob(image)

            try:
                conn.execute(
                    '''
                    INSERT INTO invoices
                        (filename, file_hash, image_hash, extracted_text, image_data)
                    VALUES (?, ?, ?, ?, ?)
                    ''',
                    (filename, file_hash, image_hash, text, blob),
                )
                conn.commit()
            except sqlite3.IntegrityError:
                # Another request inserted the same file between the SELECT
                # and the INSERT; the UNIQUE constraint on file_hash fired.
                return False, "Duplicate file. Skipped."
        return True, "Stored successfully."

    def find_duplicates(self, file_bytes, filename, threshold=0.8):
        """Score a file against every stored invoice.

        The combined score is ``0.4 * hash + 0.4 * text + 0.2 * histogram``.

        Args:
            file_bytes: Raw bytes of the candidate file.
            filename: Candidate file name (used for type detection).
            threshold: Minimum combined score for a stored invoice to be reported.

        Returns:
            ``(False, error_message)`` if the file cannot be decoded, otherwise
            ``(True, [(stored_filename, score), ...])`` sorted by descending score.
        """
        try:
            image = self._load_image(file_bytes, filename)
        except Exception as e:
            return False, f"Failed to process file: {str(e)}"

        image_hash = self.calculate_image_hash(image)
        extracted_text = self.extract_text_from_image(image)

        with closing(sqlite3.connect(self.db_path)) as conn:
            invoices = conn.execute(
                "SELECT id, filename, image_hash, extracted_text, image_data FROM invoices"
            ).fetchall()

        results = []
        for _iid, fname, stored_hash, stored_text, blob in invoices:
            stored_image = np.array(self.blob_to_image(blob).convert('RGB'))

            # A missing or differently sized stored hash cannot be compared;
            # let it contribute 0 instead of raising or producing -inf.
            if stored_hash and len(stored_hash) == len(image_hash):
                distance = self.hamming_distance(image_hash, stored_hash)
                hash_similarity = 1 - (distance / len(image_hash))
            else:
                hash_similarity = 0.0

            text_similarity = self.calculate_text_similarity(extracted_text, stored_text)
            img_similarity = self.calculate_image_similarity(image, stored_image)
            combined = 0.4 * hash_similarity + 0.4 * text_similarity + 0.2 * img_similarity
            if combined >= threshold:
                results.append((fname, combined))

        results.sort(key=lambda x: x[1], reverse=True)
        return True, results


# Shared detector used by the UI callbacks; constructing it creates the
# database schema at import time.
detector = InvoiceDuplicateDetector()


def _uploaded_path(file):
    """Return the filesystem path for a value received from ``gr.File``.

    Accepts either a plain path string or an object exposing ``.name``
    (which of the two Gradio passes appears to depend on its version and
    the component's ``type`` setting - verify against the installed release).
    """
    if isinstance(file, (str, os.PathLike)):
        return os.fspath(file)
    return file.name


def upload_files(files):
    """Store each uploaded file and return one status line per file."""
    if not files:
        return "No files uploaded."
    results = []
    for file in files:
        try:
            path = _uploaded_path(file)
            with open(path, "rb") as f:
                file_bytes = f.read()
            filename = os.path.basename(path)
            success, message = detector.store_invoice(file_bytes, filename)
            results.append(f"{filename}: {message}")
        except Exception as e:
            label = file if isinstance(file, str) else getattr(file, 'name', 'unknown')
            results.append(f"{label}: File read error: {str(e)}")
    return "\n".join(results)


def check_duplicates(file):
    """Compare one uploaded file against the stored invoices and format the result."""
    if file is None:
        # Mirrors the empty-input guard in upload_files instead of surfacing
        # an AttributeError message to the user.
        return "No file uploaded."
    try:
        path = _uploaded_path(file)
        with open(path, "rb") as f:
            file_bytes = f.read()
        filename = os.path.basename(path)
        ok, result = detector.find_duplicates(file_bytes, filename)
        if not ok:
            return result
        if not result:
            return "✅ No duplicates found!"
        return "\n".join([f"🔁 {fname} — Similarity: {score:.2f}" for fname, score in result])
    except Exception as e:
        return f"File read error: {str(e)}"


# Two-column UI: left column stores invoices, right column checks one file.
with gr.Blocks(theme=gr.themes.Base()) as demo:
    gr.Markdown("## 📄 Invoice Duplicate Detector")
    with gr.Row():
        with gr.Column():
            upload_input = gr.File(
                file_types=[".pdf", ".png", ".jpg", ".jpeg"],
                file_count="multiple",
                label="Upload Invoices",
            )
            upload_btn = gr.Button("Upload")
            upload_output = gr.Textbox(label="Upload Result")
        with gr.Column():
            check_input = gr.File(
                file_types=[".pdf", ".png", ".jpg", ".jpeg"],
                label="Check for Duplicate",
            )
            check_btn = gr.Button("Check")
            check_output = gr.Textbox(label="Check Result")
    upload_btn.click(upload_files, inputs=upload_input, outputs=upload_output)
    check_btn.click(check_duplicates, inputs=check_input, outputs=check_output)

demo.launch()