anujakkulkarni committed on
Commit
b92224e
·
verified ·
1 Parent(s): 5034d81

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +103 -55
app.py CHANGED
@@ -14,7 +14,7 @@ try:
14
  import google.generativeai as genai
15
  from PIL import Image
16
  GEMINI_AVAILABLE = True
17
- except ImportError:
18
  GEMINI_AVAILABLE = False
19
  print("Warning: google-generativeai not installed. Image-based PDFs won't be supported.")
20
 
@@ -54,7 +54,7 @@ def get_gemini_model():
54
  genai.configure(api_key=GEMINI_API_KEY)
55
  gemini_model = genai.GenerativeModel('gemini-2.0-flash-exp')
56
  print("✓ Google Gemini Flash 2.0 initialized")
57
- except Exception as e:
58
  print(f"Failed to initialize Gemini model: {e}")
59
  return None
60
 
@@ -71,7 +71,8 @@ PREFIXED_INVOICE_RE = re.compile(
71
  r"\b([A-Z]{2,4}[-/]\d{4,}(?:/\d+)?[A-Z]*)\b"
72
  )
73
 
74
- GST_LIKE_RE = re.compile(r"\b(GST[-\s]?\d+[A-Za-z0-9-]*)\b", re.IGNORECASE)
 
75
 
76
 
77
  def is_image_based_pdf(doc: fitz.Document, sample_pages: int = 3) -> Tuple[bool, float]:
@@ -106,33 +107,54 @@ def is_image_based_pdf(doc: fitz.Document, sample_pages: int = 3) -> Tuple[bool,
106
  # TEXT-BASED PDF EXTRACTION (Original Code)
107
  # ============================================================================
108
 
 
 
 
 
 
 
 
 
 
 
 
109
  def try_extract_invoice_from_text(text: str) -> Optional[str]:
110
  """
111
  Extract invoice number from text using regex patterns.
112
- Works for text-based PDFs.
 
 
113
  """
114
  if not text:
115
  return None
116
 
117
- # Pattern 1: Labeled invoice (Invoice No, Bill No, etc.)
118
- m = INVOICE_NO_RE. search(text)
 
 
119
  if m:
120
  inv = (m.group(1) or "").strip()
121
- if inv and inv.lower() != "invoice" and len(inv) > 2:
122
  return inv
123
 
124
- # Pattern 2: Prefixed invoice (WN-12345/25) - search top portion
125
- top_text = text[:500]
126
  m = PREFIXED_INVOICE_RE.search(top_text)
127
  if m:
128
  inv = (m.group(1) or "").strip()
129
- if inv and len(inv) >= 7:
 
130
  return inv
131
 
132
- # Pattern 3: GST format
133
- m = GST_LIKE_RE.search(text)
134
- if m:
135
- return m.group(1).replace(" ", "").strip()
 
 
 
 
 
136
 
137
  return None
138
 
@@ -151,9 +173,9 @@ def extract_invoice_text_based(page: fitz.Page) -> Optional[str]:
151
  # Try block-level text
152
  for block in (page.get_text("blocks") or []):
153
  block_text = block[4] if len(block) > 4 else ""
154
- if block_text:
155
  inv = try_extract_invoice_from_text(block_text)
156
- if inv:
157
  return inv
158
 
159
  return None
@@ -176,7 +198,7 @@ def extract_invoice_gemini(page: fitz.Page) -> Optional[str]:
176
  # Convert page to image
177
  pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) # 2x resolution
178
  img_bytes = pix.tobytes("png")
179
-
180
  # Convert to PIL Image for Gemini
181
  img = Image.open(io.BytesIO(img_bytes))
182
 
@@ -193,24 +215,26 @@ def extract_invoice_gemini(page: fitz.Page) -> Optional[str]:
193
 
194
  print(" Calling Google Gemini API...")
195
  response = model.generate_content([prompt, img])
196
-
197
  if response and response.text:
198
  extracted_text = response.text.strip()
199
  print(f" Gemini response: {extracted_text}")
200
-
201
  if extracted_text and extracted_text != "NOT_FOUND":
202
  # Clean up the response
203
- invoice_no = extracted_text.replace("*", "").replace("#", "").strip()
 
204
  if invoice_no and len(invoice_no) > 2:
205
  print(f" ✓ Gemini found invoice: {invoice_no}")
206
  return invoice_no
207
-
208
  # Fallback: Get full OCR text and try regex
209
  ocr_prompt = "Extract all text from this invoice image. Return the complete text content."
210
  ocr_response = model.generate_content([ocr_prompt, img])
211
-
212
  if ocr_response and ocr_response.text:
213
- print(f" Gemini extracted {len(ocr_response.text)} chars, trying regex...")
 
214
  inv = try_extract_invoice_from_text(ocr_response.text)
215
  if inv:
216
  print(f" ✓ Found via regex on Gemini text: {inv}")
@@ -263,21 +287,18 @@ def build_pdf_from_pages(src_doc: fitz.Document, page_indices: List[int]) -> byt
263
 
264
  @app.post("/split-invoices")
265
  async def split_invoices(
266
- file: UploadFile = File(... ),
267
  include_pdf: bool = Form(True),
268
  initial_dpi: int = Form(300), # Kept for compatibility
269
  ):
270
  """
271
- Split a multi-invoice PDF into separate PDFs based on invoice numbers.
272
 
273
- Automatically detects PDF type:
274
- - Text-based PDFs: Uses fast text extraction (original method)
275
- - Image-based PDFs: Uses Google Gemini Flash 2.0 for accurate OCR
276
 
277
- Parameters:
278
- - file: PDF file to split
279
- - include_pdf: Whether to include base64 PDF in response
280
- - initial_dpi: DPI setting (kept for compatibility)
281
  """
282
  if not file.filename.lower().endswith(".pdf"):
283
  raise HTTPException(status_code=400, detail="only PDF is supported")
@@ -307,41 +328,61 @@ async def split_invoices(
307
  )
308
 
309
  # Step 2: Extract invoice numbers from each page
310
- page_invoice_nos: List[Optional[str]] = []
311
  for i in range(doc.page_count):
312
  print(f"\n--- Page {i+1}/{doc.page_count} ---")
313
  inv = extract_invoice_no_from_page(doc.load_page(i), is_image_pdf)
 
314
  if inv:
315
- print(f" ✓ Invoice found: {inv}")
316
  else:
317
- print(f" ✗ No invoice found")
318
  page_invoice_nos.append(inv)
319
 
320
  print(f"\n{'='*60}")
321
- print(f"Extraction Results: {page_invoice_nos}")
322
  print(f"{'='*60}")
323
 
324
- # Step 3: Group pages by invoice number
325
- groups: List[Dict] = []
326
- current_group_pages: List[int] = []
327
- current_invoice: Optional[str] = None
 
 
 
 
 
 
 
 
 
 
 
 
328
 
329
- for idx, inv in enumerate(page_invoice_nos):
330
- if current_invoice is None:
331
- # Start first group
 
 
 
 
 
 
 
332
  current_invoice = inv
333
  current_group_pages = [idx]
334
  else:
 
335
  if inv is not None and inv != current_invoice:
336
- # New invoice detected - save current group
337
  groups.append({
338
- "invoice_no": current_invoice,
339
- "pages": current_group_pages[: ],
340
  })
341
  current_invoice = inv
342
  current_group_pages = [idx]
343
  else:
344
- # Continue current group (same invoice or no invoice)
345
  current_group_pages.append(idx)
346
 
347
  # Save last group
@@ -351,9 +392,15 @@ async def split_invoices(
351
  "pages": current_group_pages[:]
352
  })
353
 
354
- # If no invoices found, return whole document as one part
 
 
 
 
 
 
355
  if all(g["invoice_no"] is None for g in groups):
356
- print("\n⚠ Warning: No invoices detected in any page!")
357
  print(" Returning entire PDF as single part")
358
  groups = [{
359
  "invoice_no": None,
@@ -365,12 +412,13 @@ async def split_invoices(
365
  for idx, g in enumerate(groups):
366
  part_bytes = build_pdf_from_pages(doc, g["pages"])
367
  info = {
 
368
  "invoice_no": g["invoice_no"],
369
- "pages": [p + 1 for p in g["pages"]], # 1-based for humans
370
  "num_pages": len(g["pages"]),
371
- "size_bytes": len(part_bytes),
372
  }
373
- if include_pdf:
374
  info["pdf_base64"] = base64.b64encode(
375
  part_bytes).decode("ascii")
376
  parts.append(info)
@@ -387,17 +435,17 @@ async def split_invoices(
387
 
388
  return JSONResponse({
389
  "count": len(parts),
390
- "pdf_type": "image-based" if is_image_pdf else "text-based",
391
  "parts": parts
392
  })
393
 
394
  except HTTPException:
395
  raise
396
- except Exception as e:
397
  print(f"\n✗ Error: {str(e)}")
398
  import traceback
399
  traceback.print_exc()
400
- return JSONResponse({"error": str(e)}, status_code=500)
401
 
402
 
403
  @app.get("/health")
 
14
  import google.generativeai as genai
15
  from PIL import Image
16
  GEMINI_AVAILABLE = True
17
+ except ImportError:
18
  GEMINI_AVAILABLE = False
19
  print("Warning: google-generativeai not installed. Image-based PDFs won't be supported.")
20
 
 
54
  genai.configure(api_key=GEMINI_API_KEY)
55
  gemini_model = genai.GenerativeModel('gemini-2.0-flash-exp')
56
  print("✓ Google Gemini Flash 2.0 initialized")
57
+ except Exception as e:
58
  print(f"Failed to initialize Gemini model: {e}")
59
  return None
60
 
 
71
  r"\b([A-Z]{2,4}[-/]\d{4,}(?:/\d+)?[A-Z]*)\b"
72
  )
73
 
74
+ GST_LIKE_RE = re.compile(
75
+ r"\b((?:GSTIN|GST\s*No\.?|GST\s*IN|GST)[\s:\-]*([0-9A-Z]{15}))\b", re.IGNORECASE)
76
 
77
 
78
  def is_image_based_pdf(doc: fitz.Document, sample_pages: int = 3) -> Tuple[bool, float]:
 
107
  # TEXT-BASED PDF EXTRACTION (Original Code)
108
  # ============================================================================
109
 
110
+
111
def normalize_text_for_search(s: str) -> str:
    """Collapse every run of whitespace in *s* into a single ASCII space.

    Leading and trailing whitespace is removed. All characters that
    ``str.split()`` treats as whitespace are covered: space, tab, CR/LF,
    form feed, vertical tab, the non-breaking space (U+00A0) and other
    Unicode spaces such as U+2009 or U+202F. The regex passes that follow
    therefore always see plain single-space separators.

    Args:
        s: Text to normalize. Falsy values (``""`` or ``None``) are
           returned unchanged.

    Returns:
        The normalized text, or *s* itself when it is falsy.
    """
    if not s:
        return s
    # split() with no separator splits on any whitespace run and drops empty
    # leading/trailing pieces, so join() yields the collapsed, trimmed string.
    return " ".join(s.split())
119
+
120
+
121
  def try_extract_invoice_from_text(text: str) -> Optional[str]:
122
  """
123
  Extract invoice number from text using regex patterns.
124
+ - Prefer explicit labeled Invoice/Bill patterns.
125
+ - Prefer prefixed invoice formats found in the top of the page.
126
+ - Use GST only as a last resort and tag it so it won't be mistaken for an invoice id.
127
  """
128
  if not text:
129
  return None
130
 
131
+ text_norm = normalize_text_for_search(text)
132
+
133
+ # 1) Labeled invoice like "Invoice No", "Inv No."
134
+ m = INVOICE_NO_RE.search(text_norm)
135
  if m:
136
  inv = (m.group(1) or "").strip()
137
+ if inv and inv.lower() not in ("invoice", "inv", "bill") and len(inv) > 2:
138
  return inv
139
 
140
+ # 2) Search top portion for prefixed invoice codes (WN-1234, 5EN19710, etc.)
141
+ top_text = text_norm[:600] # bigger top area to be robust
142
  m = PREFIXED_INVOICE_RE.search(top_text)
143
  if m:
144
  inv = (m.group(1) or "").strip()
145
+ # extra length check so tiny numeric matches don't pass
146
+ if inv and len(re.sub(r"[^A-Za-z0-9]", "", inv)) >= 5:
147
  return inv
148
 
149
+ # 3) As absolute last-resort: strict GST detection (only accept 15-char GSTIN)
150
+ gm = GST_LIKE_RE.search(text_norm)
151
+ if gm:
152
+ gst_val = gm.group(2) or ""
153
+ gst_val = gst_val.replace(" ", "").strip().upper()
154
+ # Only accept if 15 alnum chars (typical Indian GSTIN length)
155
+ if len(gst_val) == 15 and re.match(r"^[0-9A-Z]{15}$", gst_val):
156
+ # tag it so grouping won't treat GST same as invoice ID
157
+ return f"GST:{gst_val}"
158
 
159
  return None
160
 
 
173
  # Try block-level text
174
  for block in (page.get_text("blocks") or []):
175
  block_text = block[4] if len(block) > 4 else ""
176
+ if block_text:
177
  inv = try_extract_invoice_from_text(block_text)
178
+ if inv:
179
  return inv
180
 
181
  return None
 
198
  # Convert page to image
199
  pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) # 2x resolution
200
  img_bytes = pix.tobytes("png")
201
+
202
  # Convert to PIL Image for Gemini
203
  img = Image.open(io.BytesIO(img_bytes))
204
 
 
215
 
216
  print(" Calling Google Gemini API...")
217
  response = model.generate_content([prompt, img])
218
+
219
  if response and response.text:
220
  extracted_text = response.text.strip()
221
  print(f" Gemini response: {extracted_text}")
222
+
223
  if extracted_text and extracted_text != "NOT_FOUND":
224
  # Clean up the response
225
+ invoice_no = extracted_text.replace(
226
+ "*", "").replace("#", "").strip()
227
  if invoice_no and len(invoice_no) > 2:
228
  print(f" ✓ Gemini found invoice: {invoice_no}")
229
  return invoice_no
230
+
231
  # Fallback: Get full OCR text and try regex
232
  ocr_prompt = "Extract all text from this invoice image. Return the complete text content."
233
  ocr_response = model.generate_content([ocr_prompt, img])
234
+
235
  if ocr_response and ocr_response.text:
236
+ print(
237
+ f" Gemini extracted {len(ocr_response.text)} chars, trying regex...")
238
  inv = try_extract_invoice_from_text(ocr_response.text)
239
  if inv:
240
  print(f" ✓ Found via regex on Gemini text: {inv}")
 
287
 
288
  @app.post("/split-invoices")
289
  async def split_invoices(
290
+ file: UploadFile = File(...),
291
  include_pdf: bool = Form(True),
292
  initial_dpi: int = Form(300), # Kept for compatibility
293
  ):
294
  """
295
+ Split a multi-invoice PDF into separate PDFs based on invoice numbers.
296
 
297
+ - Text-based PDFs: Uses fast text extraction
298
+ - Image-based PDFs: Uses Google Gemini Flash 2.0 (if configured)
 
299
 
300
+ Note: GST values (tagged as "GST:...") are treated as a last-resort identifier and
301
+ are ignored for splitting by default (so repeated company GST won't prevent splits).
 
 
302
  """
303
  if not file.filename.lower().endswith(".pdf"):
304
  raise HTTPException(status_code=400, detail="only PDF is supported")
 
328
  )
329
 
330
  # Step 2: Extract invoice numbers from each page
331
+ page_invoice_nos: List[Optional[str]] = []
332
  for i in range(doc.page_count):
333
  print(f"\n--- Page {i+1}/{doc.page_count} ---")
334
  inv = extract_invoice_no_from_page(doc.load_page(i), is_image_pdf)
335
+ # inv may be something like "5EN19710" or "GST:12ABCDE..." or None
336
  if inv:
337
+ print(f" ✓ Raw extracted id: {inv}")
338
  else:
339
+ print(f" ✗ No invoice found (raw)")
340
  page_invoice_nos.append(inv)
341
 
342
  print(f"\n{'='*60}")
343
+ print(f"Raw Extraction Results: {page_invoice_nos}")
344
  print(f"{'='*60}")
345
 
346
+ # ---------------------------------------------------------
347
+ # Post-process extracted ids before grouping
348
+ # - Treat GST:<value> as a LAST-RESORT marker and ignore it for splitting
349
+ # (convert to None) so repeated company GST doesn't group pages together.
350
+ # - Keep actual invoice ids like '5EN19710' intact.
351
+ # ---------------------------------------------------------
352
+ page_invoice_nos_filtered: List[Optional[str]] = []
353
+ for v in page_invoice_nos:
354
+ if v is None:
355
+ page_invoice_nos_filtered.append(None)
356
+ else:
357
+ # If GST-tagged value (we returned "GST:..."), ignore it for splitting
358
+ if isinstance(v, str) and v.upper().startswith("GST:"):
359
+ page_invoice_nos_filtered.append(None)
360
+ else:
361
+ page_invoice_nos_filtered.append(v)
362
 
363
+ print(f"Filtered (GST ignored) Results: {page_invoice_nos_filtered}")
364
+
365
+ # Step 3: Group pages by invoice number (use filtered ids)
366
+ groups: List[Dict] = []
367
+ current_group_pages: List[int] = []
368
+ current_invoice: Optional[str] = None
369
+
370
+ for idx, inv in enumerate(page_invoice_nos_filtered):
371
+ if current_invoice is None:
372
+ # Start a new group (even if inv is None)
373
  current_invoice = inv
374
  current_group_pages = [idx]
375
  else:
376
+ # If a new non-empty invoice appears and differs -> close current group
377
  if inv is not None and inv != current_invoice:
 
378
  groups.append({
379
+ "invoice_no": current_invoice,
380
+ "pages": current_group_pages[:],
381
  })
382
  current_invoice = inv
383
  current_group_pages = [idx]
384
  else:
385
+ # Continue current group (same invoice or both None)
386
  current_group_pages.append(idx)
387
 
388
  # Save last group
 
392
  "pages": current_group_pages[:]
393
  })
394
 
395
+ # Post-process groups:
396
+ # If first group has invoice_no None and next group has non-None -> merge leading None
397
+ if len(groups) > 1 and groups[0]["invoice_no"] is None and groups[1]["invoice_no"] is not None:
398
+ groups[1]["pages"] = groups[0]["pages"] + groups[1]["pages"]
399
+ groups.pop(0)
400
+
401
+ # If, after filtering, all groups are None (no invoice detected), return whole doc as one part
402
  if all(g["invoice_no"] is None for g in groups):
403
+ print("\n⚠ Warning: No invoices detected in any page (after GST ignored)!")
404
  print(" Returning entire PDF as single part")
405
  groups = [{
406
  "invoice_no": None,
 
412
  for idx, g in enumerate(groups):
413
  part_bytes = build_pdf_from_pages(doc, g["pages"])
414
  info = {
415
+ # Keep invoice_no as detected in filtered set (None or actual invoice id)
416
  "invoice_no": g["invoice_no"],
417
+ "pages": [p + 1 for p in g["pages"]], # 1-based for humans
418
  "num_pages": len(g["pages"]),
419
+ "size_bytes": len(part_bytes),
420
  }
421
+ if include_pdf:
422
  info["pdf_base64"] = base64.b64encode(
423
  part_bytes).decode("ascii")
424
  parts.append(info)
 
435
 
436
  return JSONResponse({
437
  "count": len(parts),
438
+ "pdf_type": "image-based" if is_image_pdf else "text-based",
439
  "parts": parts
440
  })
441
 
442
  except HTTPException:
443
  raise
444
+ except Exception as e:
445
  print(f"\n✗ Error: {str(e)}")
446
  import traceback
447
  traceback.print_exc()
448
+ return JSONResponse({"error": str(e)}, status_code=500)
449
 
450
 
451
  @app.get("/health")