SetuG commited on
Commit
3433c14
·
verified ·
1 Parent(s): b7a5e71

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +190 -174
app.py CHANGED
@@ -1,174 +1,190 @@
1
- import os
2
- import sqlite3
3
- import hashlib
4
- import cv2
5
- import numpy as np
6
- from PIL import Image
7
- import pytesseract
8
- import gradio as gr
9
- from io import BytesIO
10
- from datetime import datetime
11
- from sklearn.feature_extraction.text import TfidfVectorizer
12
- from sklearn.metrics.pairwise import cosine_similarity
13
-
14
- DB_PATH = "/tmp/invoices.db"
15
-
16
- class InvoiceDuplicateDetector:
17
- def __init__(self, db_path=DB_PATH):
18
- self.db_path = db_path
19
- self.init_database()
20
- self.vectorizer = TfidfVectorizer(stop_words="english", max_features=1000)
21
-
22
- def init_database(self):
23
- conn = sqlite3.connect(self.db_path)
24
- cursor = conn.cursor()
25
- cursor.execute('''
26
- CREATE TABLE IF NOT EXISTS invoices (
27
- id INTEGER PRIMARY KEY AUTOINCREMENT,
28
- filename TEXT NOT NULL,
29
- file_hash TEXT UNIQUE,
30
- image_hash TEXT,
31
- extracted_text TEXT,
32
- upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
33
- image_data BLOB
34
- )
35
- ''')
36
- conn.commit()
37
- conn.close()
38
-
39
- def calculate_file_hash(self, file_bytes):
40
- return hashlib.md5(file_bytes).hexdigest()
41
-
42
- def calculate_image_hash(self, image):
43
- resized = cv2.resize(image, (8, 8), interpolation=cv2.INTER_AREA)
44
- gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
45
- avg = gray.mean()
46
- binary = (gray > avg).astype(int)
47
- return ''.join(str(b) for b in binary.flatten())
48
-
49
- def extract_text_from_image(self, image):
50
- return pytesseract.image_to_string(Image.fromarray(image)).strip()
51
-
52
- def image_to_blob(self, image):
53
- buffer = BytesIO()
54
- Image.fromarray(image).save(buffer, format="PNG")
55
- return buffer.getvalue()
56
-
57
- def blob_to_image(self, blob):
58
- return Image.open(BytesIO(blob))
59
-
60
- def preprocess_image(self, image):
61
- gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
62
- blurred = cv2.GaussianBlur(gray, (5, 5), 0)
63
- return cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
64
- cv2.THRESH_BINARY, 11, 2)
65
-
66
- def calculate_image_similarity(self, img1, img2):
67
- try:
68
- proc_img1 = self.preprocess_image(img1)
69
- proc_img2 = self.preprocess_image(img2)
70
- h, w = min(proc_img1.shape[0], proc_img2.shape[0]), min(proc_img1.shape[1], proc_img2.shape[1])
71
- proc_img1 = cv2.resize(proc_img1, (w, h))
72
- proc_img2 = cv2.resize(proc_img2, (w, h))
73
- hist1 = cv2.calcHist([proc_img1], [0], None, [256], [0, 256])
74
- hist2 = cv2.calcHist([proc_img2], [0], None, [256], [0, 256])
75
- return cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL)
76
- except:
77
- return 0
78
-
79
- def calculate_text_similarity(self, text1, text2):
80
- try:
81
- if not text1.strip() or not text2.strip():
82
- return 0
83
- tfidf = self.vectorizer.fit_transform([text1, text2])
84
- return cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]
85
- except:
86
- return 0
87
-
88
- def hamming_distance(self, h1, h2):
89
- return sum(c1 != c2 for c1, c2 in zip(h1, h2)) if len(h1) == len(h2) else float("inf")
90
-
91
- def store_invoice(self, file_bytes, filename):
92
- file_hash = self.calculate_file_hash(file_bytes)
93
- conn = sqlite3.connect(self.db_path)
94
- cursor = conn.cursor()
95
- cursor.execute("SELECT id FROM invoices WHERE file_hash=?", (file_hash,))
96
- if cursor.fetchone():
97
- conn.close()
98
- return "⚠️ Duplicate file. Skipped."
99
-
100
- ext = filename.lower().split(".")[-1]
101
- if ext == "pdf":
102
- return "PDF not supported on Gradio version."
103
-
104
- image = np.array(Image.open(BytesIO(file_bytes)).convert("RGB"))
105
- image_hash = self.calculate_image_hash(image)
106
- text = self.extract_text_from_image(image)
107
- blob = self.image_to_blob(image)
108
-
109
- cursor.execute('''
110
- INSERT INTO invoices (filename, file_hash, image_hash, extracted_text, image_data)
111
- VALUES (?, ?, ?, ?, ?)
112
- ''', (filename, file_hash, image_hash, text, blob))
113
- conn.commit()
114
- conn.close()
115
- return "✅ Invoice stored successfully."
116
-
117
- def find_duplicates(self, file_bytes, filename, threshold=0.8):
118
- ext = filename.lower().split(".")[-1]
119
- if ext == "pdf":
120
- return "PDF not supported in this version.", None
121
-
122
- image = np.array(Image.open(BytesIO(file_bytes)).convert("RGB"))
123
- image_hash = self.calculate_image_hash(image)
124
- extracted_text = self.extract_text_from_image(image)
125
-
126
- conn = sqlite3.connect(self.db_path)
127
- cursor = conn.cursor()
128
- cursor.execute("SELECT filename, image_hash, extracted_text, image_data FROM invoices")
129
- invoices = cursor.fetchall()
130
- conn.close()
131
-
132
- results = []
133
- for fname, stored_hash, stored_text, blob in invoices:
134
- stored_image = np.array(self.blob_to_image(blob).convert("RGB"))
135
- hash_similarity = 1 - (self.hamming_distance(image_hash, stored_hash) / len(image_hash))
136
- text_similarity = self.calculate_text_similarity(extracted_text, stored_text)
137
- img_similarity = self.calculate_image_similarity(image, stored_image)
138
- combined = 0.4 * hash_similarity + 0.4 * text_similarity + 0.2 * img_similarity
139
- if combined >= threshold:
140
- results.append((fname, combined, stored_image))
141
- results.sort(key=lambda x: x[1], reverse=True)
142
-
143
- if not results:
144
- return "✅ No duplicates found.", None
145
- else:
146
- output = "⚠️ Duplicates Found:\n"
147
- for fname, score, _ in results:
148
- output += f"• {fname} — Similarity: {score:.2f}\n"
149
- return output, [Image.fromarray(img) for _, _, img in results]
150
-
151
- detector = InvoiceDuplicateDetector()
152
-
153
- def upload_invoice(file):
154
- return detector.store_invoice(file.read(), file.name)
155
-
156
- def check_duplicates(file):
157
- result, images = detector.find_duplicates(file.read(), file.name)
158
- return result, images or None
159
-
160
- upload_interface = gr.Interface(
161
- fn=upload_invoice,
162
- inputs=gr.File(type="binary", label="Upload Invoice (PNG/JPG)"),
163
- outputs="text",
164
- title="Upload & Store Invoice"
165
- )
166
-
167
- check_interface = gr.Interface(
168
- fn=check_duplicates,
169
- inputs=gr.File(type="binary", label="Check Invoice for Duplicates"),
170
- outputs=["text", gr.Gallery(label="Matching Invoices")],
171
- title="Check for Duplicates"
172
- )
173
-
174
- gr.TabbedInterface([upload_interface, check_interface], ["Upload Invoice", "Check Duplicate"]).launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app.py (Gradio version, Hugging Face-compatible)
2
+ import os
3
+ import sqlite3
4
+ import hashlib
5
+ import numpy as np
6
+ import gradio as gr
7
+ from PIL import Image
8
+ import pytesseract
9
+ from pdf2image import convert_from_bytes
10
+ from io import BytesIO
11
+ from datetime import datetime
12
+ from sklearn.feature_extraction.text import TfidfVectorizer
13
+ from sklearn.metrics.pairwise import cosine_similarity
14
+ import cv2
15
+
16
# --- Class for Duplicate Detection ---
class InvoiceDuplicateDetector:
    """Detect duplicate invoices stored in a local SQLite database.

    Three complementary signals are combined per comparison:
      * an 8x8 average perceptual hash of the page image,
      * TF-IDF cosine similarity of the OCR'd text,
      * histogram correlation of the preprocessed images.
    """

    def __init__(self, db_path="invoices.db"):
        self.db_path = db_path
        self.init_database()
        # Re-fit on each pair inside calculate_text_similarity; kept here so
        # the vectorizer configuration is defined in one place.
        self.vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)

    def init_database(self):
        """Create the invoices table if it does not already exist."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''CREATE TABLE IF NOT EXISTS invoices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT NOT NULL,
                file_hash TEXT UNIQUE,
                image_hash TEXT,
                extracted_text TEXT,
                upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                image_data BLOB
            )''')
            conn.commit()
        finally:
            # Close even if table creation fails (original leaked the handle).
            conn.close()

    def calculate_file_hash(self, file_bytes):
        """Return the MD5 hex digest of the raw upload (dedup key, not security)."""
        return hashlib.md5(file_bytes).hexdigest()

    def calculate_image_hash(self, image):
        """Return a 64-char average-hash bit string for an (H, W, 3) uint8 array."""
        resized = cv2.resize(image, (8, 8), interpolation=cv2.INTER_AREA)
        # NOTE(review): inputs come from PIL as RGB, so BGR2GRAY weights the
        # channels in reversed order. Harmless for matching because every
        # stored and query image goes through this same path.
        gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
        avg = gray.mean()
        binary = (gray > avg).astype(int)
        return ''.join(str(b) for b in binary.flatten())

    def pdf_to_image(self, file_bytes):
        """Render page 1 of a PDF to a numpy array (requires poppler)."""
        images = convert_from_bytes(file_bytes, first_page=1, last_page=1)
        return np.array(images[0])

    def extract_text_from_image(self, image):
        """OCR the image with Tesseract and return the stripped text."""
        return pytesseract.image_to_string(Image.fromarray(image)).strip()

    def image_to_blob(self, image):
        """Encode a numpy image as PNG bytes for BLOB storage."""
        buffer = BytesIO()
        Image.fromarray(image).save(buffer, format='PNG')
        return buffer.getvalue()

    def blob_to_image(self, blob):
        """Decode PNG bytes stored in the DB back into a PIL image."""
        return Image.open(BytesIO(blob))

    def preprocess_image(self, image):
        """Grayscale + blur + adaptive threshold to normalise lighting/contrast."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        return cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                     cv2.THRESH_BINARY, 11, 2)

    def calculate_image_similarity(self, img1, img2):
        """Histogram correlation of the two preprocessed images; 0 on any failure."""
        try:
            proc_img1 = self.preprocess_image(img1)
            proc_img2 = self.preprocess_image(img2)
            # Resize both to the common minimum size before comparing.
            h = min(proc_img1.shape[0], proc_img2.shape[0])
            w = min(proc_img1.shape[1], proc_img2.shape[1])
            proc_img1 = cv2.resize(proc_img1, (w, h))
            proc_img2 = cv2.resize(proc_img2, (w, h))
            hist1 = cv2.calcHist([proc_img1], [0], None, [256], [0, 256])
            hist2 = cv2.calcHist([proc_img2], [0], None, [256], [0, 256])
            return cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL)
        except Exception:
            # Narrowed from a bare except: still best-effort, but no longer
            # swallows KeyboardInterrupt/SystemExit.
            return 0

    def calculate_text_similarity(self, text1, text2):
        """TF-IDF cosine similarity of the two OCR texts; 0 if either is empty."""
        try:
            if not text1.strip() or not text2.strip():
                return 0
            tfidf = self.vectorizer.fit_transform([text1, text2])
            return cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]
        except Exception:
            # Narrowed from a bare except; OCR noise can still break fitting.
            return 0

    def hamming_distance(self, h1, h2):
        """Bit differences between equal-length hash strings; inf on length mismatch."""
        return sum(c1 != c2 for c1, c2 in zip(h1, h2)) if len(h1) == len(h2) else float('inf')

    def store_invoice(self, file_bytes, filename):
        """Persist one invoice; return (ok, message).

        Skips exact re-uploads (same MD5) and rejects unreadable files.
        """
        file_hash = self.calculate_file_hash(file_bytes)
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT id FROM invoices WHERE file_hash=?", (file_hash,))
            if cursor.fetchone():
                return False, "Duplicate file. Skipped."

            ext = filename.lower().rsplit('.', 1)[-1]
            try:
                if ext == 'pdf':
                    image = self.pdf_to_image(file_bytes)
                else:
                    image = np.array(Image.open(BytesIO(file_bytes)).convert('RGB'))
            except Exception as e:
                # The original leaked the open connection on this early return;
                # the enclosing finally now closes it on every exit path.
                return False, f"Error processing file: {str(e)}"

            image_hash = self.calculate_image_hash(image)
            text = self.extract_text_from_image(image)
            blob = self.image_to_blob(image)

            cursor.execute('''INSERT INTO invoices (filename, file_hash, image_hash, extracted_text, image_data)
                              VALUES (?, ?, ?, ?, ?)''',
                           (filename, file_hash, image_hash, text, blob))
            conn.commit()
            return True, "Stored successfully."
        finally:
            conn.close()

    def find_duplicates(self, file_bytes, filename, threshold=0.8):
        """Score the upload against every stored invoice.

        Returns (True, [(filename, score), ...]) sorted best-first, or
        (False, error_message) when the file cannot be processed.
        """
        ext = filename.lower().rsplit('.', 1)[-1]
        try:
            if ext == 'pdf':
                image = self.pdf_to_image(file_bytes)
            else:
                image = np.array(Image.open(BytesIO(file_bytes)).convert('RGB'))
        except Exception as e:
            return False, f"Failed to process file: {str(e)}"

        image_hash = self.calculate_image_hash(image)
        extracted_text = self.extract_text_from_image(image)

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT filename, image_hash, extracted_text, image_data FROM invoices")
            invoices = cursor.fetchall()
        finally:
            conn.close()

        results = []
        for fname, stored_hash, stored_text, blob in invoices:
            stored_image = np.array(self.blob_to_image(blob).convert('RGB'))
            # Weighted blend: 40% perceptual hash, 40% OCR text, 20% histogram.
            # A length-mismatched hash yields -inf similarity and is filtered out.
            hash_similarity = 1 - (self.hamming_distance(image_hash, stored_hash) / len(image_hash))
            text_similarity = self.calculate_text_similarity(extracted_text, stored_text)
            img_similarity = self.calculate_image_similarity(image, stored_image)
            combined = 0.4 * hash_similarity + 0.4 * text_similarity + 0.2 * img_similarity
            if combined >= threshold:
                results.append((fname, combined))
        results.sort(key=lambda x: x[1], reverse=True)
        return True, results
151
+
152
+
153
# --- Gradio UI ---
# Single shared detector instance; its SQLite file ("invoices.db") persists
# across requests within one process.
detector = InvoiceDuplicateDetector()
155
+
156
def upload_files(files):
    """Store each uploaded invoice and return a per-file status report.

    Handles both delivery modes of gr.File: a filepath string (modern Gradio,
    default type="filepath") and a file-like object with .read()/.name (older
    versions). The original assumed a file object and raised AttributeError
    on a str path. Also tolerates an empty selection (files is None).
    """
    messages = []
    for file in files or []:
        if isinstance(file, (str, os.PathLike)):
            name = os.path.basename(str(file))
            with open(file, "rb") as fh:
                file_bytes = fh.read()
        else:
            name = os.path.basename(file.name)
            file_bytes = file.read()
        success, msg = detector.store_invoice(file_bytes, name)
        messages.append(f"{name}: {msg}")
    return "\n".join(messages)
163
+
164
def check_file(file):
    """Compare one uploaded invoice against the stored ones and report matches.

    Handles both delivery modes of gr.File: a filepath string (modern Gradio,
    default type="filepath") and a file-like object with .read()/.name (older
    versions). The original assumed a file object and raised AttributeError
    on a str path. Returns a human-readable result string.
    """
    if file is None:
        return "Please upload a file first."
    if isinstance(file, (str, os.PathLike)):
        name = os.path.basename(str(file))
        with open(file, "rb") as fh:
            file_bytes = fh.read()
    else:
        name = os.path.basename(file.name)
        file_bytes = file.read()
    ok, result = detector.find_duplicates(file_bytes, name)
    if not ok:
        # result is the error message from find_duplicates.
        return result
    elif not result:
        return " No duplicates found!"
    else:
        return "⚠️ Possible duplicates:\n" + "\n".join([f"{fname} (score: {score:.2f})" for fname, score in result])
173
+
174
# Two-section single-page UI: batch upload on top, duplicate check below.
with gr.Blocks() as demo:
    gr.Markdown("# 📄 Invoice Duplicate Detector")
    gr.Markdown("### Upload Invoices")
    # Multiple files allowed; upload_files stores each and reports per-file status.
    upload = gr.File(file_types=[".pdf", ".png", ".jpg", ".jpeg"], file_count="multiple")
    out1 = gr.Textbox(label="Upload Result")
    btn1 = gr.Button("Upload")
    btn1.click(upload_files, inputs=upload, outputs=out1)

    gr.Markdown("### Check for Duplicates")
    # Single file; check_file compares it against everything already stored.
    check = gr.File(file_types=[".pdf", ".png", ".jpg", ".jpeg"])
    out2 = gr.Textbox(label="Duplicate Check Result")
    btn2 = gr.Button("Check")
    btn2.click(check_file, inputs=check, outputs=out2)

# Launch only when run as a script (Hugging Face Spaces executes app.py directly).
if __name__ == '__main__':
    demo.launch()