File size: 7,939 Bytes
3433c14
 
 
d11e909
530cc2d
3433c14
 
 
 
 
 
 
7ac479f
3433c14
 
 
 
 
 
 
 
 
 
7ac479f
 
 
 
 
 
 
 
 
 
 
3433c14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
530cc2d
3433c14
 
 
 
 
 
 
 
530cc2d
3433c14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7ac479f
 
 
 
3433c14
 
 
 
530cc2d
3433c14
 
 
 
 
 
 
530cc2d
3433c14
 
 
 
 
 
7ac479f
3433c14
 
 
 
7ac479f
 
3433c14
 
 
 
 
 
 
530cc2d
 
3433c14
 
 
530cc2d
2eaa60d
069c35c
7ac479f
3433c14
530cc2d
34820ab
 
 
530cc2d
 
 
 
7ac479f
 
530cc2d
 
34820ab
 
 
530cc2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d11e909
1d80794
3433c14
6c04feb
34820ab
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import os
import sqlite3
import hashlib
import numpy as np
import cv2
from PIL import Image
import pytesseract
from pdf2image import convert_from_bytes
from io import BytesIO
from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import gradio as gr

class InvoiceDuplicateDetector:
    """Store invoices in SQLite and score new uploads against the stored ones.

    Each stored invoice keeps an MD5 of the raw file, a 64-bit average hash of
    its first page, the OCR text of that page and a PNG copy of the page.
    A candidate is compared against every stored row with a weighted blend of
    hash, text and histogram similarity.
    """

    def __init__(self, db_path="invoices.db"):
        """Create the detector and make sure the ``invoices`` table exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()
        self.vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)

    def init_database(self):
        """Create the ``invoices`` table if it is not there yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
            CREATE TABLE IF NOT EXISTS invoices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT NOT NULL,
                file_hash TEXT UNIQUE,
                image_hash TEXT,
                extracted_text TEXT,
                upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                image_data BLOB
            )
        ''')
            conn.commit()
        finally:
            # Close even when the statement fails so the file handle is not leaked.
            conn.close()

    def calculate_file_hash(self, file_bytes):
        """Return the hex MD5 digest of the raw file bytes (exact-duplicate key)."""
        return hashlib.md5(file_bytes).hexdigest()

    def _to_gray(self, image):
        """Return a single-channel version of *image*.

        Arrays that are already 2-D are returned unchanged. The BGR2GRAY
        constant is kept as-is: changing the channel weighting would make new
        hashes incomparable with hashes already stored in the database.
        """
        if image.ndim == 2:
            return image
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    def calculate_image_hash(self, image):
        """Return a 64-character '0'/'1' average hash of *image*.

        The image is shrunk to 8x8, converted to grayscale, and each pixel is
        encoded as '1' when it is brighter than the mean, else '0'.
        """
        resized = cv2.resize(image, (8, 8), interpolation=cv2.INTER_AREA)
        gray = self._to_gray(resized)
        avg = gray.mean()
        binary = (gray > avg).astype(int)
        return ''.join(str(b) for b in binary.flatten())

    def pdf_to_image(self, file_bytes):
        """Render the first page of a PDF to an RGB numpy array.

        Raises:
            ValueError: If the PDF yields no pages.
        """
        images = convert_from_bytes(file_bytes, first_page=1, last_page=1)
        if not images:
            raise ValueError("PDF contains no pages")
        return np.array(images[0].convert('RGB'))

    def extract_text_from_image(self, image):
        """Run Tesseract OCR on *image* and return the stripped text."""
        return pytesseract.image_to_string(Image.fromarray(image)).strip()

    def image_to_blob(self, image):
        """Encode a numpy image as PNG bytes for storage."""
        buffer = BytesIO()
        Image.fromarray(image).save(buffer, format='PNG')
        return buffer.getvalue()

    def blob_to_image(self, blob):
        """Decode stored image bytes back into a PIL image."""
        return Image.open(BytesIO(blob))

    def preprocess_image(self, image):
        """Grayscale, blur and adaptively threshold *image* for comparison."""
        gray = self._to_gray(image)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        return cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                     cv2.THRESH_BINARY, 11, 2)

    def calculate_image_similarity(self, img1, img2):
        """Return the histogram correlation of the two preprocessed images.

        Both images are thresholded, resized to their common minimum size and
        compared with ``cv2.HISTCMP_CORREL``. Any failure yields 0 so that one
        unreadable image cannot abort a whole duplicate scan.
        """
        try:
            proc_img1 = self.preprocess_image(img1)
            proc_img2 = self.preprocess_image(img2)
            h = min(proc_img1.shape[0], proc_img2.shape[0])
            w = min(proc_img1.shape[1], proc_img2.shape[1])
            proc_img1 = cv2.resize(proc_img1, (w, h))
            proc_img2 = cv2.resize(proc_img2, (w, h))
            hist1 = cv2.calcHist([proc_img1], [0], None, [256], [0, 256])
            hist2 = cv2.calcHist([proc_img2], [0], None, [256], [0, 256])
            return cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL)
        except Exception:
            # Deliberately best-effort, but no longer swallows KeyboardInterrupt
            # or SystemExit the way a bare ``except:`` did.
            return 0

    def calculate_text_similarity(self, text1, text2):
        """Return the TF-IDF cosine similarity of two texts.

        Returns 0 when either text is missing or blank, or when no usable
        vocabulary remains after stop-word removal.
        """
        if not text1 or not text2:
            return 0
        if not text1.strip() or not text2.strip():
            return 0
        try:
            tfidf = self.vectorizer.fit_transform([text1, text2])
        except ValueError:
            # Raised by the vectorizer when the vocabulary is empty.
            return 0
        return cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]

    def hamming_distance(self, h1, h2):
        """Count differing positions of two equal-length hashes.

        Returns ``float('inf')`` when the lengths differ.
        """
        if len(h1) != len(h2):
            return float('inf')
        return sum(c1 != c2 for c1, c2 in zip(h1, h2))

    def _hash_similarity(self, h1, h2):
        """Return the fraction of matching hash positions, in [0, 1].

        Missing or length-mismatched hashes score 0.0 instead of ``-inf`` so
        they cannot cancel out the text and image components.
        """
        if not h1 or not h2 or len(h1) != len(h2):
            return 0.0
        return 1 - (self.hamming_distance(h1, h2) / len(h1))

    def _load_image(self, file_bytes, filename):
        """Decode an upload into an RGB numpy array.

        Files whose name ends in ``pdf`` after the last dot are rendered via
        ``pdf_to_image``; everything else is opened with PIL.
        """
        ext = filename.lower().split('.')[-1]
        if ext == 'pdf':
            return self.pdf_to_image(file_bytes)
        return np.array(Image.open(BytesIO(file_bytes)).convert('RGB'))

    def _stored_image_similarity(self, image, blob):
        """Compare *image* with a stored PNG blob; 0 if the blob is unusable."""
        if not blob:
            return 0
        try:
            stored_image = np.array(self.blob_to_image(blob).convert('RGB'))
        except (OSError, ValueError):
            # A corrupt row should not abort the scan of the remaining rows.
            return 0
        return self.calculate_image_similarity(image, stored_image)

    def store_invoice(self, file_bytes, filename):
        """Persist an invoice unless the identical file is already stored.

        Args:
            file_bytes: Raw content of the uploaded file.
            filename: Original file name; its extension selects the decoder.

        Returns:
            ``(True, message)`` when stored, ``(False, message)`` when the file
            is an exact duplicate or cannot be decoded.
        """
        file_hash = self.calculate_file_hash(file_bytes)
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Cheap exact-duplicate check first so OCR is skipped for repeats.
            cursor.execute("SELECT id FROM invoices WHERE file_hash=?", (file_hash,))
            if cursor.fetchone():
                return False, "Duplicate file. Skipped."

            try:
                image = self._load_image(file_bytes, filename)
            except Exception as e:
                return False, f"Error processing file: {str(e)}"

            image_hash = self.calculate_image_hash(image)
            text = self.extract_text_from_image(image)
            blob = self.image_to_blob(image)

            try:
                cursor.execute('''
            INSERT INTO invoices (filename, file_hash, image_hash, extracted_text, image_data)
            VALUES (?, ?, ?, ?, ?)
        ''', (filename, file_hash, image_hash, text, blob))
                conn.commit()
            except sqlite3.IntegrityError:
                # The UNIQUE(file_hash) constraint fired: the same file was
                # inserted by someone else after the check above.
                return False, "Duplicate file. Skipped."
            return True, "Stored successfully."
        finally:
            # Every exit path, including early returns and errors, releases
            # the connection.
            conn.close()

    def find_duplicates(self, file_bytes, filename, threshold=0.8):
        """Score an upload against every stored invoice.

        The combined score is ``0.4 * hash + 0.4 * text + 0.2 * image``.

        Args:
            file_bytes: Raw content of the file to check.
            filename: Original file name; its extension selects the decoder.
            threshold: Minimum combined score for a row to be reported.

        Returns:
            ``(False, message)`` if the file cannot be decoded, otherwise
            ``(True, matches)`` where *matches* is a list of
            ``(stored_filename, score)`` sorted by descending score.
        """
        try:
            image = self._load_image(file_bytes, filename)
        except Exception as e:
            return False, f"Failed to process file: {str(e)}"

        image_hash = self.calculate_image_hash(image)
        extracted_text = self.extract_text_from_image(image)

        results = []
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(
                "SELECT id, filename, image_hash, extracted_text, image_data FROM invoices"
            )
            # Iterate the cursor instead of fetchall() so only one image blob
            # is held in memory at a time.
            for _iid, fname, stored_hash, stored_text, blob in rows:
                hash_similarity = self._hash_similarity(image_hash, stored_hash)
                text_similarity = self.calculate_text_similarity(extracted_text, stored_text)
                img_similarity = self._stored_image_similarity(image, blob)
                combined = 0.4 * hash_similarity + 0.4 * text_similarity + 0.2 * img_similarity
                if combined >= threshold:
                    results.append((fname, combined))
        finally:
            conn.close()

        results.sort(key=lambda x: x[1], reverse=True)
        return True, results

# Module-level detector shared by the upload_files and check_duplicates
# callbacks. Constructing it with the default path creates the `invoices`
# table in "invoices.db" if it does not exist yet.
detector = InvoiceDuplicateDetector()

def upload_files(files):
    """Store every uploaded file and return a one-line status per file.

    Args:
        files: Iterable of uploads. Each item may be an object exposing the
            path as ``.name`` or a plain path string.

    Returns:
        ``"No files uploaded."`` when *files* is empty or ``None``; otherwise
        newline-joined ``"<file>: <message>"`` lines.
    """
    if not files:
        return "No files uploaded."
    results = []
    for file in files:
        path = getattr(file, "name", file)
        if not isinstance(path, (str, os.PathLike)):
            results.append("unknown: File read error: unsupported file object")
            continue

        # Only the read is guarded as a read error; failures further down are
        # reported separately so OCR/database problems are not mislabelled.
        try:
            with open(path, "rb") as f:
                file_bytes = f.read()
        except OSError as e:
            results.append(f"{path}: File read error: {str(e)}")
            continue

        filename = os.path.basename(path)
        try:
            _success, message = detector.store_invoice(file_bytes, filename)
        except Exception as e:
            # UI boundary: report the failure and continue with the next file.
            message = f"Processing error: {str(e)}"
        results.append(f"{filename}: {message}")
    return "\n".join(results)

def check_duplicates(file):
    """Compare one uploaded file against stored invoices and format the result.

    Args:
        file: The upload, either an object exposing the path as ``.name`` or
            a plain path string. ``None`` means nothing was selected.

    Returns:
        A human-readable message: an error description, a "no duplicates"
        notice, or one line per match with its similarity score.
    """
    if file is None:
        return "No file uploaded."
    path = getattr(file, "name", file)
    if not isinstance(path, (str, os.PathLike)):
        return "File read error: unsupported file object"

    # Keep the read error separate from processing errors so OCR/database
    # failures are not reported as read problems.
    try:
        with open(path, "rb") as f:
            file_bytes = f.read()
    except OSError as e:
        return f"File read error: {str(e)}"

    filename = os.path.basename(path)
    try:
        ok, result = detector.find_duplicates(file_bytes, filename)
    except Exception as e:
        # UI boundary: surface the failure as text instead of raising.
        return f"Processing error: {str(e)}"

    if not ok:
        return result
    if not result:
        return "✅ No duplicates found!"
    return "\n".join([f"🔁 {fname} — Similarity: {score:.2f}" for fname, score in result])

# Extensions accepted by both file pickers; each component gets its own list.
_INVOICE_FILE_TYPES = (".pdf", ".png", ".jpg", ".jpeg")

with gr.Blocks(theme=gr.themes.Base()) as demo:
    gr.Markdown("## 📄 Invoice Duplicate Detector")

    with gr.Row():
        # Left column: add one or more invoices to the database.
        with gr.Column():
            upload_input = gr.File(
                file_types=list(_INVOICE_FILE_TYPES),
                file_count="multiple",
                label="Upload Invoices",
            )
            upload_btn = gr.Button("Upload")
            upload_output = gr.Textbox(label="Upload Result")

        # Right column: check a single invoice against the stored ones.
        with gr.Column():
            check_input = gr.File(
                file_types=list(_INVOICE_FILE_TYPES),
                label="Check for Duplicate",
            )
            check_btn = gr.Button("Check")
            check_output = gr.Textbox(label="Check Result")

    # Wire each button to its callback.
    upload_btn.click(fn=upload_files, inputs=upload_input, outputs=upload_output)
    check_btn.click(fn=check_duplicates, inputs=check_input, outputs=check_output)

demo.launch()