anujakkulkarni committed on
Commit
b6190f0
·
verified ·
1 Parent(s): 796d6ee

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +117 -120
app.py CHANGED
@@ -9,14 +9,14 @@ from fastapi.middleware.cors import CORSMiddleware
9
  from fastapi.responses import JSONResponse
10
  import fitz # PyMuPDF
11
 
12
- # Azure Document Intelligence (Form Recognizer) - optional import
13
  try:
14
- from azure. ai.formrecognizer import DocumentAnalysisClient
15
- from azure.core.credentials import AzureKeyCredential
16
- AZURE_AVAILABLE = True
17
- except ImportError:
18
- AZURE_AVAILABLE = False
19
- print("Warning: azure-ai-formrecognizer not installed. Image-based PDFs won't be supported.")
20
 
21
  app = FastAPI(title="Invoice Splitter API")
22
 
@@ -28,52 +28,47 @@ app.add_middleware(
28
  allow_headers=["*"],
29
  )
30
 
31
- # --- Azure Document Intelligence Configuration (FROM HUGGING FACE SECRETS) ---
32
- # These will be automatically loaded from Hugging Face Spaces secrets
33
- AZURE_FORM_RECOGNIZER_ENDPOINT = os. getenv("AZURE_FORM_RECOGNIZER_ENDPOINT", "")
34
- AZURE_FORM_RECOGNIZER_KEY = os.getenv("AZURE_FORM_RECOGNIZER_KEY", "")
35
 
36
- azure_client = None
37
 
38
 
39
- def get_azure_client() -> Optional[DocumentAnalysisClient]:
40
- """Get or create Azure Document Intelligence client."""
41
- global azure_client
42
 
43
- if not AZURE_AVAILABLE:
44
- print("Azure SDK not available")
45
  return None
46
 
47
- if azure_client is None:
48
- # Check if credentials are configured via environment variables
49
- if not AZURE_FORM_RECOGNIZER_ENDPOINT or not AZURE_FORM_RECOGNIZER_KEY:
50
- print("Warning: Azure credentials not found in environment variables.")
51
- print("Please configure AZURE_FORM_RECOGNIZER_ENDPOINT and AZURE_FORM_RECOGNIZER_KEY")
52
- print("in your Hugging Face Space secrets.")
53
  return None
54
 
55
  try:
56
- azure_client = DocumentAnalysisClient(
57
- endpoint=AZURE_FORM_RECOGNIZER_ENDPOINT,
58
- credential=AzureKeyCredential(AZURE_FORM_RECOGNIZER_KEY)
59
- )
60
- print("✓ Azure Document Intelligence client initialized")
61
- print(f" Endpoint: {AZURE_FORM_RECOGNIZER_ENDPOINT}")
62
- except Exception as e:
63
- print(f"Failed to initialize Azure client: {e}")
64
  return None
65
 
66
- return azure_client
67
 
68
 
69
  # --- Regex patterns for text-based PDF extraction ---
70
  INVOICE_NO_RE = re.compile(
71
- r"(?: Inv(?:Inoice)?\s*No\. ?|Invoice\s*No\. ?|Bill\s*No\.?|BILL\s*NO\.?|BILL\s*NO)\s*[:\-]?\s*([A-Za-z0-9\-/]+)",
72
  re.IGNORECASE,
73
  )
74
 
75
  PREFIXED_INVOICE_RE = re.compile(
76
- r"\b([A-Z]{2,4}[-/]\d{4,}(?:/\d+)?[A-Z]*)\b"
77
  )
78
 
79
  GST_LIKE_RE = re.compile(r"\b(GST[-\s]?\d+[A-Za-z0-9-]*)\b", re.IGNORECASE)
@@ -93,14 +88,14 @@ def is_image_based_pdf(doc: fitz.Document, sample_pages: int = 3) -> Tuple[bool,
93
  pages_to_check = min(sample_pages, doc.page_count)
94
 
95
  for i in range(pages_to_check):
96
- text = doc.load_page(i).get_text("text") or ""
97
  total_text_length += len(text. strip())
98
 
99
  avg_text_length = total_text_length / pages_to_check
100
  is_image_based = avg_text_length < 50
101
 
102
  print(
103
- f" PDF Type Detection: avg_text_length={avg_text_length:.1f} chars/page")
104
  print(
105
  f" Classification: {'IMAGE-BASED' if is_image_based else 'TEXT-BASED'} PDF")
106
 
@@ -114,7 +109,7 @@ def is_image_based_pdf(doc: fitz.Document, sample_pages: int = 3) -> Tuple[bool,
114
  def try_extract_invoice_from_text(text: str) -> Optional[str]:
115
  """
116
  Extract invoice number from text using regex patterns.
117
- Works for text-based PDFs.
118
  """
119
  if not text:
120
  return None
@@ -127,7 +122,7 @@ def try_extract_invoice_from_text(text: str) -> Optional[str]:
127
  return inv
128
 
129
  # Pattern 2: Prefixed invoice (WN-12345/25) - search top portion
130
- top_text = text[: 500]
131
  m = PREFIXED_INVOICE_RE.search(top_text)
132
  if m:
133
  inv = (m.group(1) or "").strip()
@@ -142,9 +137,9 @@ def try_extract_invoice_from_text(text: str) -> Optional[str]:
142
  return None
143
 
144
 
145
- def extract_invoice_text_based(page: fitz.Page) -> Optional[str]:
146
  """
147
- Extract invoice number from TEXT-BASED PDF.
148
  Uses the original fast text extraction method.
149
  """
150
  # Try full-page text
@@ -156,72 +151,76 @@ def extract_invoice_text_based(page: fitz.Page) -> Optional[str]:
156
  # Try block-level text
157
  for block in (page.get_text("blocks") or []):
158
  block_text = block[4] if len(block) > 4 else ""
159
- if block_text:
160
  inv = try_extract_invoice_from_text(block_text)
161
- if inv:
162
  return inv
163
 
164
  return None
165
 
166
 
167
  # ============================================================================
168
- # IMAGE-BASED PDF EXTRACTION (Azure Document Intelligence)
169
  # ============================================================================
170
 
171
- def extract_invoice_azure(page: fitz.Page) -> Optional[str]:
172
  """
173
- Extract invoice number from IMAGE-BASED PDF using Azure Document Intelligence.
174
  """
175
- client = get_azure_client()
176
- if not client:
177
- print(" Azure client not available")
178
  return None
179
 
180
  try:
181
  # Convert page to image
182
  pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) # 2x resolution
183
  img_bytes = pix.tobytes("png")
184
-
185
- # Analyze with Azure prebuilt invoice model
186
- print(" Calling Azure Document Intelligence API...")
187
- poller = client.begin_analyze_document(
188
- "prebuilt-invoice",
189
- document=img_bytes
190
- )
191
- result = poller.result()
192
-
193
- # Extract invoice ID from structured fields
194
- if result.documents:
195
- for document in result.documents:
196
- if hasattr(document, 'fields') and document.fields:
197
- # Try InvoiceId field
198
- if 'InvoiceId' in document.fields and document.fields['InvoiceId']:
199
- invoice_id = document.fields['InvoiceId'].value
200
- if invoice_id:
201
- print(f" ✓ Azure found InvoiceId: {invoice_id}")
202
- return str(invoice_id).strip()
203
-
204
- # Try PurchaseOrder field
205
- if 'PurchaseOrder' in document.fields and document.fields['PurchaseOrder']:
206
- po = document.fields['PurchaseOrder'].value
207
- if po:
208
- print(f" ✓ Azure found PurchaseOrder: {po}")
209
- return str(po).strip()
210
-
211
- # Fallback: try regex on Azure-extracted text
212
- if result.content:
213
- print(
214
- f" Azure extracted {len(result.content)} chars, trying regex...")
215
- inv = try_extract_invoice_from_text(result.content)
216
- if inv:
217
- print(f" ✓ Found via regex on Azure text: {inv}")
218
- return inv
219
-
220
- print(" ✗ Azure: No invoice found")
 
 
 
 
221
  return None
222
 
223
  except Exception as e:
224
- print(f" ✗ Azure extraction failed: {e}")
225
  return None
226
 
227
 
@@ -230,7 +229,7 @@ def extract_invoice_azure(page: fitz.Page) -> Optional[str]:
230
  # ============================================================================
231
 
232
  def extract_invoice_no_from_page(page: fitz.Page, is_image_pdf: bool) -> Optional[str]:
233
- """Try text extraction first, then Azure as fallback"""
234
 
235
  # ALWAYS try text extraction first (fast, no API cost)
236
  text_result = extract_invoice_text_based(page)
@@ -238,12 +237,12 @@ def extract_invoice_no_from_page(page: fitz.Page, is_image_pdf: bool) -> Optiona
238
  print(f" ✓ Found via text extraction: {text_result}")
239
  return text_result
240
 
241
- # If text fails AND PDF seems image-based, try Azure
242
- if is_image_pdf:
243
- azure_result = extract_invoice_azure(page)
244
- if azure_result:
245
- print(f" ✓ Found via Azure: {azure_result}")
246
- return azure_result
247
 
248
  return None
249
 
@@ -264,7 +263,7 @@ def build_pdf_from_pages(src_doc: fitz.Document, page_indices: List[int]) -> byt
264
 
265
  @app.post("/split-invoices")
266
  async def split_invoices(
267
- file: UploadFile = File(...),
268
  include_pdf: bool = Form(True),
269
  initial_dpi: int = Form(300), # Kept for compatibility
270
  ):
@@ -272,12 +271,12 @@ async def split_invoices(
272
  Split a multi-invoice PDF into separate PDFs based on invoice numbers.
273
 
274
  Automatically detects PDF type:
275
- - Text-based PDFs: Uses fast text extraction (original method)
276
- - Image-based PDFs: Uses Azure Document Intelligence for accurate OCR
277
 
278
  Parameters:
279
  - file: PDF file to split
280
- - include_pdf: Whether to include base64 PDF in response
281
  - initial_dpi: DPI setting (kept for compatibility)
282
  """
283
  if not file.filename.lower().endswith(".pdf"):
@@ -300,40 +299,39 @@ async def split_invoices(
300
  # Step 1: Detect PDF type (text-based vs image-based)
301
  is_image_pdf, avg_text_len = is_image_based_pdf(doc)
302
 
303
- if is_image_pdf and not get_azure_client():
304
  raise HTTPException(
305
  status_code=500,
306
- detail="Image-based PDF detected but Azure Document Intelligence is not configured. "
307
- "Please add AZURE_FORM_RECOGNIZER_ENDPOINT and AZURE_FORM_RECOGNIZER_KEY "
308
- "to your Hugging Face Space secrets."
309
  )
310
 
311
  # Step 2: Extract invoice numbers from each page
312
  page_invoice_nos: List[Optional[str]] = []
313
  for i in range(doc.page_count):
314
  print(f"\n--- Page {i+1}/{doc.page_count} ---")
315
- inv = extract_invoice_no_from_page(doc. load_page(i), is_image_pdf)
316
- if inv:
317
  print(f" ✓ Invoice found: {inv}")
318
  else:
319
  print(f" ✗ No invoice found")
320
  page_invoice_nos.append(inv)
321
 
322
  print(f"\n{'='*60}")
323
- print(f"Extraction Results: {page_invoice_nos}")
324
  print(f"{'='*60}")
325
 
326
  # Step 3: Group pages by invoice number
327
- groups: List[Dict] = []
328
- current_group_pages: List[int] = []
329
- current_invoice: Optional[str] = None
330
 
331
  for idx, inv in enumerate(page_invoice_nos):
332
- if current_invoice is None:
333
  # Start first group
334
  current_invoice = inv
335
  current_group_pages = [idx]
336
- else:
337
  if inv is not None and inv != current_invoice:
338
  # New invoice detected - save current group
339
  groups.append({
@@ -349,7 +347,7 @@ async def split_invoices(
349
  # Save last group
350
  if current_group_pages:
351
  groups.append({
352
- "invoice_no": current_invoice,
353
  "pages": current_group_pages[:]
354
  })
355
 
@@ -369,8 +367,8 @@ async def split_invoices(
369
  info = {
370
  "invoice_no": g["invoice_no"],
371
  "pages": [p + 1 for p in g["pages"]], # 1-based for humans
372
- "num_pages": len(g["pages"]),
373
- "size_bytes": len(part_bytes),
374
  }
375
  if include_pdf:
376
  info["pdf_base64"] = base64.b64encode(
@@ -389,28 +387,27 @@ async def split_invoices(
389
 
390
  return JSONResponse({
391
  "count": len(parts),
392
- "pdf_type": "image-based" if is_image_pdf else "text-based",
393
  "parts": parts
394
  })
395
 
396
  except HTTPException:
397
  raise
398
- except Exception as e:
399
  print(f"\n✗ Error: {str(e)}")
400
  import traceback
401
  traceback.print_exc()
402
- return JSONResponse({"error": str(e)}, status_code=500)
403
 
404
 
405
  @app.get("/health")
406
  async def health_check():
407
- """Health check endpoint to verify Azure configuration."""
408
- azure_status = "configured" if get_azure_client() else "not configured"
409
  return {
410
  "status": "healthy",
411
- "azure_document_intelligence": azure_status,
412
- "azure_available": AZURE_AVAILABLE,
413
- "endpoint": AZURE_FORM_RECOGNIZER_ENDPOINT if azure_status == "configured" else "not set"
414
  }
415
 
416
  if __name__ == "__main__":
 
9
  from fastapi.responses import JSONResponse
10
  import fitz # PyMuPDF
11
 
12
+ # Google Gemini - optional import
13
  try:
14
+ import google.generativeai as genai
15
+ from PIL import Image
16
+ GEMINI_AVAILABLE = True
17
+ except ImportError:
18
+ GEMINI_AVAILABLE = False
19
+ print("Warning: google-generativeai not installed. Image-based PDFs won't be supported.")
20
 
21
  app = FastAPI(title="Invoice Splitter API")
22
 
 
28
  allow_headers=["*"],
29
  )
30
 
31
+ # --- Google Gemini Configuration ---
32
+ # This will be automatically loaded from environment variables
33
+ GEMINI_API_KEY = os. getenv("GEMINI_API_KEY", "")
 
34
 
35
+ gemini_model = None
36
 
37
 
38
+ def get_gemini_model():
39
+ """Get or create Gemini model instance."""
40
+ global gemini_model
41
 
42
+ if not GEMINI_AVAILABLE:
43
+ print("Gemini SDK not available")
44
  return None
45
 
46
+ if gemini_model is None:
47
+ # Check if API key is configured via environment variables
48
+ if not GEMINI_API_KEY:
49
+ print("Warning: Gemini API key not found in environment variables.")
50
+ print("Please configure GEMINI_API_KEY in your environment variables.")
 
51
  return None
52
 
53
  try:
54
+ genai.configure(api_key=GEMINI_API_KEY)
55
+ gemini_model = genai.GenerativeModel('gemini-2.0-flash-exp')
56
+ print("✓ Google Gemini Flash 2.0 initialized")
57
+ except Exception as e:
58
+ print(f"Failed to initialize Gemini model: {e}")
 
 
 
59
  return None
60
 
61
+ return gemini_model
62
 
63
 
64
  # --- Regex patterns for text-based PDF extraction ---
65
  INVOICE_NO_RE = re.compile(
66
+ r"(? : Inv(? :oice)?\s*No\. ? |Invoice\s*No\.? |Bill\s*No\.?|BILL\s*NO\.?|BILL\s*NO)\s*[:\-]?\s*([A-Za-z0-9\-/]+)",
67
  re.IGNORECASE,
68
  )
69
 
70
  PREFIXED_INVOICE_RE = re.compile(
71
+ r"\b([A-Z]{2,4}[-/]\d{4,}(? :/\d+)?[A-Z]*)\b"
72
  )
73
 
74
  GST_LIKE_RE = re.compile(r"\b(GST[-\s]?\d+[A-Za-z0-9-]*)\b", re.IGNORECASE)
 
88
  pages_to_check = min(sample_pages, doc.page_count)
89
 
90
  for i in range(pages_to_check):
91
+ text = doc. load_page(i).get_text("text") or ""
92
  total_text_length += len(text. strip())
93
 
94
  avg_text_length = total_text_length / pages_to_check
95
  is_image_based = avg_text_length < 50
96
 
97
  print(
98
+ f" PDF Type Detection: avg_text_length={avg_text_length:.1f} chars/page")
99
  print(
100
  f" Classification: {'IMAGE-BASED' if is_image_based else 'TEXT-BASED'} PDF")
101
 
 
109
  def try_extract_invoice_from_text(text: str) -> Optional[str]:
110
  """
111
  Extract invoice number from text using regex patterns.
112
+ Works for text-based PDFs.
113
  """
114
  if not text:
115
  return None
 
122
  return inv
123
 
124
  # Pattern 2: Prefixed invoice (WN-12345/25) - search top portion
125
+ top_text = text[:500]
126
  m = PREFIXED_INVOICE_RE.search(top_text)
127
  if m:
128
  inv = (m.group(1) or "").strip()
 
137
  return None
138
 
139
 
140
+ def extract_invoice_text_based(page: fitz.Page) -> Optional[str]:
141
  """
142
+ Extract invoice number from TEXT-BASED PDF.
143
  Uses the original fast text extraction method.
144
  """
145
  # Try full-page text
 
151
  # Try block-level text
152
  for block in (page.get_text("blocks") or []):
153
  block_text = block[4] if len(block) > 4 else ""
154
+ if block_text:
155
  inv = try_extract_invoice_from_text(block_text)
156
+ if inv:
157
  return inv
158
 
159
  return None
160
 
161
 
162
  # ============================================================================
163
+ # IMAGE-BASED PDF EXTRACTION (Google Gemini)
164
  # ============================================================================
165
 
166
+ def extract_invoice_gemini(page: fitz.Page) -> Optional[str]:
167
  """
168
+ Extract invoice number from IMAGE-BASED PDF using Google Gemini Flash 2.0.
169
  """
170
+ model = get_gemini_model()
171
+ if not model:
172
+ print(" Gemini model not available")
173
  return None
174
 
175
  try:
176
  # Convert page to image
177
  pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) # 2x resolution
178
  img_bytes = pix.tobytes("png")
179
+
180
+ # Convert to PIL Image for Gemini
181
+ img = Image.open(io.BytesIO(img_bytes))
182
+
183
+ # Prompt for Gemini to extract invoice number
184
+ prompt = """
185
+ Extract the invoice number from this image. Look for:
186
+ - Invoice No, Invoice Number, Bill No, Bill Number
187
+ - Any alphanumeric code that appears to be an invoice identifier
188
+ - Purchase Order numbers if no invoice number is found
189
+
190
+ Return ONLY the invoice number/identifier itself, nothing else.
191
+ If no invoice number is found, return "NOT_FOUND".
192
+ """
193
+
194
+ print(" Calling Google Gemini API...")
195
+ response = model.generate_content([prompt, img])
196
+
197
+ if response and response.text:
198
+ extracted_text = response.text.strip()
199
+ print(f" Gemini response: {extracted_text}")
200
+
201
+ if extracted_text and extracted_text != "NOT_FOUND":
202
+ # Clean up the response
203
+ invoice_no = extracted_text.replace("*", "").replace("#", "").strip()
204
+ if invoice_no and len(invoice_no) > 2:
205
+ print(f" ✓ Gemini found invoice: {invoice_no}")
206
+ return invoice_no
207
+
208
+ # Fallback: Get full OCR text and try regex
209
+ ocr_prompt = "Extract all text from this invoice image. Return the complete text content."
210
+ ocr_response = model.generate_content([ocr_prompt, img])
211
+
212
+ if ocr_response and ocr_response.text:
213
+ print(f" Gemini extracted {len(ocr_response.text)} chars, trying regex...")
214
+ inv = try_extract_invoice_from_text(ocr_response.text)
215
+ if inv:
216
+ print(f" ✓ Found via regex on Gemini text: {inv}")
217
+ return inv
218
+
219
+ print(" ✗ Gemini: No invoice found")
220
  return None
221
 
222
  except Exception as e:
223
+ print(f" ✗ Gemini extraction failed: {e}")
224
  return None
225
 
226
 
 
229
  # ============================================================================
230
 
231
  def extract_invoice_no_from_page(page: fitz.Page, is_image_pdf: bool) -> Optional[str]:
232
+ """Try text extraction first, then Gemini as fallback"""
233
 
234
  # ALWAYS try text extraction first (fast, no API cost)
235
  text_result = extract_invoice_text_based(page)
 
237
  print(f" ✓ Found via text extraction: {text_result}")
238
  return text_result
239
 
240
+ # If text fails AND PDF seems image-based, try Gemini
241
+ if is_image_pdf:
242
+ gemini_result = extract_invoice_gemini(page)
243
+ if gemini_result:
244
+ print(f" ✓ Found via Gemini: {gemini_result}")
245
+ return gemini_result
246
 
247
  return None
248
 
 
263
 
264
  @app.post("/split-invoices")
265
  async def split_invoices(
266
+ file: UploadFile = File(... ),
267
  include_pdf: bool = Form(True),
268
  initial_dpi: int = Form(300), # Kept for compatibility
269
  ):
 
271
  Split a multi-invoice PDF into separate PDFs based on invoice numbers.
272
 
273
  Automatically detects PDF type:
274
+ - Text-based PDFs: Uses fast text extraction (original method)
275
+ - Image-based PDFs: Uses Google Gemini Flash 2.0 for accurate OCR
276
 
277
  Parameters:
278
  - file: PDF file to split
279
+ - include_pdf: Whether to include base64 PDF in response
280
  - initial_dpi: DPI setting (kept for compatibility)
281
  """
282
  if not file.filename.lower().endswith(".pdf"):
 
299
  # Step 1: Detect PDF type (text-based vs image-based)
300
  is_image_pdf, avg_text_len = is_image_based_pdf(doc)
301
 
302
+ if is_image_pdf and not get_gemini_model():
303
  raise HTTPException(
304
  status_code=500,
305
+ detail="Image-based PDF detected but Google Gemini is not configured. "
306
+ "Please add GEMINI_API_KEY to your environment variables."
 
307
  )
308
 
309
  # Step 2: Extract invoice numbers from each page
310
  page_invoice_nos: List[Optional[str]] = []
311
  for i in range(doc.page_count):
312
  print(f"\n--- Page {i+1}/{doc.page_count} ---")
313
+ inv = extract_invoice_no_from_page(doc.load_page(i), is_image_pdf)
314
+ if inv:
315
  print(f" ✓ Invoice found: {inv}")
316
  else:
317
  print(f" ✗ No invoice found")
318
  page_invoice_nos.append(inv)
319
 
320
  print(f"\n{'='*60}")
321
+ print(f"Extraction Results: {page_invoice_nos}")
322
  print(f"{'='*60}")
323
 
324
  # Step 3: Group pages by invoice number
325
+ groups: List[Dict] = []
326
+ current_group_pages: List[int] = []
327
+ current_invoice: Optional[str] = None
328
 
329
  for idx, inv in enumerate(page_invoice_nos):
330
+ if current_invoice is None:
331
  # Start first group
332
  current_invoice = inv
333
  current_group_pages = [idx]
334
+ else:
335
  if inv is not None and inv != current_invoice:
336
  # New invoice detected - save current group
337
  groups.append({
 
347
  # Save last group
348
  if current_group_pages:
349
  groups.append({
350
+ "invoice_no": current_invoice,
351
  "pages": current_group_pages[:]
352
  })
353
 
 
367
  info = {
368
  "invoice_no": g["invoice_no"],
369
  "pages": [p + 1 for p in g["pages"]], # 1-based for humans
370
+ "num_pages": len(g["pages"]),
371
+ "size_bytes": len(part_bytes),
372
  }
373
  if include_pdf:
374
  info["pdf_base64"] = base64.b64encode(
 
387
 
388
  return JSONResponse({
389
  "count": len(parts),
390
+ "pdf_type": "image-based" if is_image_pdf else "text-based",
391
  "parts": parts
392
  })
393
 
394
  except HTTPException:
395
  raise
396
+ except Exception as e:
397
  print(f"\n✗ Error: {str(e)}")
398
  import traceback
399
  traceback.print_exc()
400
+ return JSONResponse({"error": str(e)}, status_code=500)
401
 
402
 
403
  @app.get("/health")
404
  async def health_check():
405
+ """Health check endpoint to verify Gemini configuration."""
406
+ gemini_status = "configured" if get_gemini_model() else "not configured"
407
  return {
408
  "status": "healthy",
409
+ "gemini_flash": gemini_status,
410
+ "gemini_available": GEMINI_AVAILABLE,
 
411
  }
412
 
413
  if __name__ == "__main__":