"""Invoice Splitter API.

Splits a multi-invoice PDF into one PDF per invoice.  Invoice numbers are
extracted from page text with regex heuristics; image-only PDFs fall back to
Google Gemini vision OCR when the SDK and an API key are available.
"""

import base64
import gc
import io
import os
import re
import tempfile
from typing import Dict, List, Optional, Tuple

from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from starlette.requests import Request

import fitz  # PyMuPDF

# Google Gemini - optional import; the service degrades to text-only extraction
# when the SDK (or Pillow) is missing.
try:
    import google.generativeai as genai
    from PIL import Image

    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    print("Warning: google-generativeai not installed. Image-based PDFs won't be supported.")

app = FastAPI(title="Invoice Splitter API")

# NOTE(review): Starlette's ``Request`` class has no ``max_body_size``
# attribute, so this assignment is effectively a no-op — upload size limits are
# actually enforced by the ASGI server and by the per-endpoint streaming checks
# below.  Kept so module-level side effects remain unchanged; confirm before
# removing.
Request.max_body_size = 200 * 1024 * 1024  # 200MB limit

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Google Gemini Configuration ---
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
gemini_model = None  # lazily-created singleton; see get_gemini_model()

# Responses larger than this skip inline base64 payloads (see /split-invoices).
MAX_RESPONSE_SIZE_MB = 50


def get_gemini_model():
    """Return a cached Gemini model instance, or ``None`` when unavailable.

    Returns ``None`` if the SDK is not installed, no API key is configured,
    or initialization fails; otherwise caches and returns the model.
    """
    global gemini_model
    if not GEMINI_AVAILABLE:
        print("Gemini SDK not available")
        return None
    if gemini_model is None:
        if not GEMINI_API_KEY:
            print("Warning: Gemini API key not found in environment variables.")
            return None
        try:
            genai.configure(api_key=GEMINI_API_KEY)
            gemini_model = genai.GenerativeModel('gemini-2.0-flash-exp')
            print("✓ Google Gemini Flash 2.0 initialized")
        except Exception as e:
            print(f"Failed to initialize Gemini model: {e}")
            return None
    return gemini_model


# --- Regex patterns ---
# FIX: in a VERBOSE pattern an unescaped ``#`` starts a comment, which silently
# truncated the ``Invoice\s*#`` / ``Inv\s*#`` alternatives to bare
# ``Invoice\s*`` / ``Inv\s*``.  The ``#`` is now escaped.
INVOICE_NO_RE = re.compile(
    r"""
    (?:
        Invoice\s*No\.?|
        Inv\.?\s*No\.?|
        Bill\s*No\.?|
        Document\s*No\.?|
        Doc\s*No\.?|
        Tax\s*Invoice\s*No\.?|
        Invoice\s*\#|
        Inv\s*\#
    )
    [\s:\-]*(?:(?:Order|Ref|No|Dt|Date)\b[\s:\-]*)*
    \s*
    ([A-Z0-9][A-Z0-9\-\/]{2,})
    """,
    re.IGNORECASE | re.VERBOSE,
)

PREFIXED_INVOICE_RE = re.compile(
    r"\b([A-Z]{2,4}[-/]\d{4,}(?:/\d+)?[A-Z]*)\b"
)

# Group 2 captures the 15-character GSTIN value itself.
GST_LIKE_RE = re.compile(
    r"\b((?:GSTIN|GST\s*No\.?|GST\s*IN|GST)[\s:\-]*([0-9A-Z]{15}))\b",
    re.IGNORECASE,
)


def is_image_based_pdf(doc: fitz.Document, sample_pages: int = 3) -> Tuple[bool, float]:
    """Classify a PDF as image-based by sampling text from its first pages.

    Returns ``(is_image_based, avg_text_length)`` where a page averaging fewer
    than 50 extractable characters is treated as image-based.  An empty
    document is reported as text-based (avoids ZeroDivisionError and a useless
    Gemini round-trip).
    """
    pages_to_check = min(sample_pages, doc.page_count)
    if pages_to_check == 0:
        return False, 0.0

    total_text_length = 0
    for i in range(pages_to_check):
        text = doc.load_page(i).get_text("text") or ""
        total_text_length += len(text.strip())

    avg_text_length = total_text_length / pages_to_check
    is_image_based = avg_text_length < 50

    print(f"  PDF Type Detection: avg_text_length={avg_text_length:.1f} chars/page")
    print(f"  Classification: {'IMAGE-BASED' if is_image_based else 'TEXT-BASED'} PDF")
    return is_image_based, avg_text_length


def normalize_text_for_search(s: str) -> str:
    """Collapse NBSPs, newlines, tabs and runs of spaces into single spaces."""
    if not s:
        return s
    s = s.replace("\u00A0", " ")
    s = re.sub(r"[\r\n\t]+", " ", s)
    s = re.sub(r"[ ]{2,}", " ", s).strip()
    return s


def try_extract_invoice_from_text(text: str) -> Optional[str]:
    """Extract an invoice identifier from raw page text.

    Strategy, in order:
      1. Look for an "Invoice No / Bill No / ..." label and take the first
         plausible token after it (skipping filler words like "order", "ref").
      2. Scan the top of the page for an alphanumeric code with >= 3 digits.
      3. Fall back to a GSTIN, returned as ``"GST:<15 chars>"`` so callers can
         recognize and filter it.
    Returns ``None`` when nothing plausible is found.
    """
    if not text:
        return None

    text_norm = normalize_text_for_search(text)

    # 1) Label-anchored search.
    label_match = re.search(
        r"(?:Invoice|Inv|Bill|Doc|Document|Tax\s*Invoice)\s*(?:No|#|\.|: )",
        text_norm,
        re.IGNORECASE,
    )
    if label_match:
        start_idx = label_match.end()
        candidate_text = text_norm[start_idx: start_idx + 60]
        clean_candidates = re.sub(r"[:\-\(\)\[\]]", " ", candidate_text)
        for word in clean_candidates.split():
            word = word.strip(".,;")
            # Skip label filler words that often follow the anchor.
            if word.lower() in ("order", "ref", "no", "date", "dt", "inv", "bill", "account"):
                continue
            if len(word) > 2 and any(char.isdigit() for char in word):
                return word

    # 2) Generic alphanumeric code near the top of the page.
    top_text = text_norm[:600]
    m = re.search(r"\b([A-Z0-9][A-Z0-9\-\/]{4,})\b", top_text)
    if m:
        inv = m.group(1)
        if sum(c.isdigit() for c in inv) >= 3:
            return inv

    # 3) GSTIN fallback, tagged so downstream grouping can drop it.
    gm = GST_LIKE_RE.search(text_norm)
    if gm:
        gst_val = (gm.group(2) or "").replace(" ", "").strip().upper()
        if len(gst_val) == 15 and re.match(r"^[0-9A-Z]{15}$", gst_val):
            return f"GST:{gst_val}"

    return None


def extract_invoice_text_based(page: fitz.Page) -> Optional[str]:
    """Extract an invoice number from a page's text layer (whole page first,
    then per-block), or ``None`` if not found."""
    text = page.get_text("text") or ""
    inv = try_extract_invoice_from_text(text)
    if inv:
        return inv

    for block in (page.get_text("blocks") or []):
        block_text = block[4] if len(block) > 4 else ""
        if block_text:
            inv = try_extract_invoice_from_text(block_text)
            if inv:
                return inv
    return None


def extract_invoice_gemini(page: fitz.Page) -> Optional[str]:
    """Extract an invoice number from a page image via Google Gemini.

    Renders the page at 1.5x, asks Gemini for the invoice number directly,
    and falls back to full OCR + the regex extractor.  Returns ``None`` on
    any failure.  The PIL image is always closed, even when the API raises.
    """
    model = get_gemini_model()
    if not model:
        print("  Gemini model not available")
        return None

    try:
        # 1.5x instead of 2x to save memory.
        pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
        img_bytes = pix.tobytes("png")
        pix = None  # Free memory

        img = Image.open(io.BytesIO(img_bytes))
        try:
            prompt = """
Extract the invoice number from this image. Look for:
- Invoice No, Invoice Number, Bill No, Bill Number
- Any alphanumeric code that appears to be an invoice identifier
- Purchase Order numbers if no invoice number is found

Return ONLY the invoice number/identifier itself, nothing else.
If no invoice number is found, return "NOT_FOUND".
"""
            print("  Calling Google Gemini API...")
            response = model.generate_content([prompt, img])

            if response and response.text:
                extracted_text = response.text.strip()
                print(f"  Gemini response: {extracted_text}")
                if extracted_text and extracted_text != "NOT_FOUND":
                    invoice_no = extracted_text.replace("*", "").replace("#", "").strip()
                    if invoice_no and len(invoice_no) > 2:
                        print(f"  ✓ Gemini found invoice: {invoice_no}")
                        return invoice_no

            # Second attempt: full OCR, then regex on the extracted text.
            ocr_prompt = "Extract all text from this invoice image. Return the complete text content."
            ocr_response = model.generate_content([ocr_prompt, img])
            if ocr_response and ocr_response.text:
                print(f"  Gemini extracted {len(ocr_response.text)} chars, trying regex...")
                inv = try_extract_invoice_from_text(ocr_response.text)
                if inv:
                    print(f"  ✓ Found via regex on Gemini text: {inv}")
                    return inv

            print("  ✗ Gemini: No invoice found")
            return None
        finally:
            img.close()
    except Exception as e:
        print(f"  ✗ Gemini extraction failed: {e}")
        return None


def extract_invoice_no_from_page(page: fitz.Page, is_image_pdf: bool) -> Optional[str]:
    """Extract an invoice number from one page: text layer first, then Gemini
    (only for image-based PDFs).  Returns ``None`` when nothing is found."""
    text_result = extract_invoice_text_based(page)
    if text_result:
        print(f"  ✓ Found via text extraction: {text_result}")
        return text_result

    if is_image_pdf:
        gemini_result = extract_invoice_gemini(page)
        if gemini_result:
            print(f"  ✓ Found via Gemini: {gemini_result}")
            return gemini_result

    return None


def build_pdf_from_pages(src_doc: fitz.Document, page_indices: List[int]) -> bytes:
    """Create a new PDF with the given pages (0-based indices)."""
    out = fitz.open()
    try:
        for i in page_indices:
            out.insert_pdf(src_doc, from_page=i, to_page=i)
        # Compress output.
        return out.tobytes(garbage=4, deflate=True)
    finally:
        out.close()


def remove_file(path: str):
    """Best-effort deletion of a temp file; never raises."""
    try:
        if os.path.exists(path):
            os.remove(path)
            print(f"🧹 Cleaned up: {path}")
    except Exception as e:
        print(f"⚠️ Cleanup warning: {e}")


def _drop_gst_placeholders(invoice_nos: List[Optional[str]]) -> List[Optional[str]]:
    """Replace GSTIN fallback values ("GST:<15 chars>") with ``None``.

    FIX: the JSON endpoint previously matched the prefix "GST: " (with a
    space) while the extractor emits "GST:" without one, so GST placeholders
    were never filtered there.  Both endpoints now share this helper.
    """
    return [
        None if (v and v.upper().startswith("GST:")) else v
        for v in invoice_nos
    ]


def _group_pages_by_invoice(invoice_nos: List[Optional[str]]) -> List[Dict]:
    """Group consecutive pages by invoice number.

    Pages with ``None`` stick to the current group.  Leading unidentified
    pages are absorbed into the first identified invoice.

    FIX: the previous inline loop *reset* the page list for every leading
    unidentified page (``current_group_pages = [idx]``), silently dropping
    all but the last of them from the output; they are now accumulated.
    """
    groups: List[Dict] = []
    current_pages: List[int] = []
    current_inv: Optional[str] = None

    for idx, inv in enumerate(invoice_nos):
        if not current_pages:
            current_inv = inv
            current_pages = [idx]
        elif current_inv is None:
            # Still unidentified: keep the page and adopt the first real
            # invoice number we encounter (stays None if inv is None).
            current_inv = inv
            current_pages.append(idx)
        elif inv is not None and inv != current_inv:
            groups.append({"invoice_no": current_inv, "pages": current_pages[:]})
            current_inv = inv
            current_pages = [idx]
        else:
            current_pages.append(idx)

    if current_pages:
        groups.append({"invoice_no": current_inv, "pages": current_pages[:]})

    # Defensive: merge a leading unidentified group into the next one.
    if len(groups) > 1 and groups[0]["invoice_no"] is None and groups[1]["invoice_no"] is not None:
        groups[1]["pages"] = groups[0]["pages"] + groups[1]["pages"]
        groups.pop(0)

    return groups


# ============================================================================
# API ENDPOINTS
# ============================================================================

@app.post("/split-invoices")
async def split_invoices(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    include_pdf: bool = Form(True),
    max_file_size_mb: int = Form(200),
):
    """
    Split a multi-invoice PDF into separate PDFs.

    HANDLES LARGE FILES:
    - Streams upload to disk (no memory overflow)
    - Monitors response size
    - Automatically skips base64 if response would exceed 50MB
    - For very large files, use /split-invoices-stream endpoint instead
    """
    if not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF is supported")

    max_size_bytes = max_file_size_mb * 1024 * 1024
    fd, temp_path = tempfile.mkstemp(suffix=".pdf")
    os.close(fd)

    doc = None
    try:
        # Stream upload to temp file in 5MB chunks with a running size check.
        print(f"📥 Streaming upload: {file.filename}")
        total_size = 0
        with open(temp_path, "wb") as buffer:
            chunk_size = 5 * 1024 * 1024  # 5MB chunks
            while content := await file.read(chunk_size):
                total_size += len(content)
                if total_size > max_size_bytes:
                    remove_file(temp_path)
                    raise HTTPException(
                        status_code=413,
                        detail=f"File too large. Max: {max_file_size_mb}MB, got: {total_size/(1024*1024):.1f}MB"
                    )
                buffer.write(content)
                # Progress roughly every 20MB.
                if total_size % (20 * 1024 * 1024) < chunk_size:
                    print(f"  📊 Uploaded: {total_size/(1024*1024):.1f}MB")

        file_size_mb = total_size / (1024 * 1024)
        print(f"💾 Saved {file_size_mb:.2f}MB to disk")

        # Open from disk.
        doc = fitz.open(temp_path)
        if doc.page_count == 0:
            raise HTTPException(status_code=400, detail="No pages found")

        print(f"\n{'='*60}")
        print(f"Processing: {file.filename} ({doc.page_count} pages)")
        print(f"{'='*60}")

        # Detect PDF type.
        is_image_pdf, avg_text_len = is_image_based_pdf(doc)
        if is_image_pdf and not get_gemini_model():
            raise HTTPException(
                status_code=500,
                detail="Image-based PDF detected but Google Gemini is not configured."
            )

        # Extract invoice numbers page by page.
        page_invoice_nos: List[Optional[str]] = []
        for i in range(doc.page_count):
            if i % 50 == 0:
                print(f"\n--- Processing page {i+1}/{doc.page_count} ---")
            page = doc.load_page(i)
            inv = extract_invoice_no_from_page(page, is_image_pdf)
            page_invoice_nos.append(inv)
            page = None  # Free memory
            if i % 100 == 0:
                gc.collect()

        print(f"\nRaw Extraction: {page_invoice_nos}")

        # Filter GST entries (shared helper; see _drop_gst_placeholders).
        page_invoice_nos_filtered = _drop_gst_placeholders(page_invoice_nos)
        print(f"Filtered Results: {page_invoice_nos_filtered}")

        # Group pages (shared helper; see _group_pages_by_invoice).
        groups = _group_pages_by_invoice(page_invoice_nos_filtered)

        if all(g["invoice_no"] is None for g in groups):
            print("\n⚠ Warning: No invoices detected!")
            groups = [{
                "invoice_no": None,
                "pages": list(range(doc.page_count))
            }]

        # Build response parts with running size tracking; once the projected
        # size exceeds MAX_RESPONSE_SIZE_MB, remaining parts omit base64.
        parts = []
        total_response_size = 0
        max_response_bytes = MAX_RESPONSE_SIZE_MB * 1024 * 1024
        response_size_exceeded = False

        for idx, g in enumerate(groups):
            print(f"\n🔨 Building part {idx+1}/{len(groups)}")
            part_bytes = build_pdf_from_pages(doc, g["pages"])

            info = {
                "invoice_no": g["invoice_no"],
                "pages": [p + 1 for p in g["pages"]],
                "num_pages": len(g["pages"]),
                "size_bytes": len(part_bytes),
                "size_mb": round(len(part_bytes) / (1024 * 1024), 2)
            }

            if include_pdf and not response_size_exceeded:
                base64_size = len(part_bytes) * 4 / 3  # Base64 overhead
                total_response_size += base64_size
                if total_response_size > max_response_bytes:
                    print(f"  ⚠️ Response size limit reached ({MAX_RESPONSE_SIZE_MB}MB)")
                    print(f"  💡 Skipping base64 for remaining parts")
                    print(f"  💡 Use /split-invoices-stream for large files")
                    response_size_exceeded = True
                    info["pdf_base64"] = None
                    info["warning"] = f"Response too large. Use streaming endpoint."
                else:
                    info["pdf_base64"] = base64.b64encode(part_bytes).decode("ascii")
            else:
                info["pdf_base64"] = None

            parts.append(info)
            del part_bytes
            gc.collect()

        print(f"\n✅ Split into {len(parts)} parts")

        return JSONResponse({
            "success": True,
            "count": len(parts),
            "pdf_type": "image-based" if is_image_pdf else "text-based",
            "source_file": {
                "name": file.filename,
                "size_mb": round(file_size_mb, 2),
                "total_pages": doc.page_count
            },
            "parts": parts,
            "response_info": {
                "size_limit_mb": MAX_RESPONSE_SIZE_MB,
                "size_exceeded": response_size_exceeded,
                "recommendation": "Use /split-invoices-stream for files >100MB" if response_size_exceeded else None
            }
        })

    except HTTPException:
        raise
    except Exception as e:
        print(f"\n✗ Error: {str(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if doc:
            doc.close()
        remove_file(temp_path)
        gc.collect()


@app.post("/split-invoices-stream")
async def split_invoices_stream(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    max_file_size_mb: int = Form(200),
):
    """
    ⭐ STREAMING VERSION FOR LARGE FILES (100MB+)

    Returns NDJSON (newline-delimited JSON) - one JSON object per line.
    Each line is a separate invoice part.
    This avoids building a huge JSON response in memory.
    """
    import json

    if not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF is supported")

    max_size_bytes = max_file_size_mb * 1024 * 1024
    fd, temp_path = tempfile.mkstemp(suffix=".pdf")
    os.close(fd)

    # Upload to disk; clean the temp file up on any failure.
    try:
        total_size = 0
        with open(temp_path, "wb") as buffer:
            chunk_size = 5 * 1024 * 1024
            while content := await file.read(chunk_size):
                total_size += len(content)
                if total_size > max_size_bytes:
                    remove_file(temp_path)
                    raise HTTPException(status_code=413, detail=f"File too large")
                buffer.write(content)
    except Exception:
        remove_file(temp_path)
        raise

    async def generate_parts():
        """Yield NDJSON lines: a status line, one line per part, then a
        completion (or error) line.  Owns the temp file's cleanup."""
        doc = None
        try:
            doc = fitz.open(temp_path)

            # Initial status line.
            yield json.dumps({
                "type": "status",
                "status": "processing",
                "total_pages": doc.page_count,
                "filename": file.filename
            }) + "\n"

            is_image_pdf, _ = is_image_based_pdf(doc)

            # Per-page extraction.
            page_invoice_nos = []
            for i in range(doc.page_count):
                page = doc.load_page(i)
                inv = extract_invoice_no_from_page(page, is_image_pdf)
                page_invoice_nos.append(inv)
                page = None
                if i % 100 == 0:
                    gc.collect()

            # Filter & group (shared helpers).
            clean_invs = _drop_gst_placeholders(page_invoice_nos)
            groups = _group_pages_by_invoice(clean_invs)

            # Stream each part as its own NDJSON line.
            for idx, g in enumerate(groups):
                part_bytes = build_pdf_from_pages(doc, g["pages"])
                info = {
                    "type": "part",
                    "part_index": idx,
                    "invoice_no": g["invoice_no"],
                    "pages": [p + 1 for p in g["pages"]],
                    "num_pages": len(g["pages"]),
                    "size_bytes": len(part_bytes),
                    "pdf_base64": base64.b64encode(part_bytes).decode("ascii")
                }
                yield json.dumps(info) + "\n"
                del part_bytes
                gc.collect()

            yield json.dumps({
                "type": "complete",
                "total_parts": len(groups)
            }) + "\n"

        except Exception as e:
            # Errors are reported in-band since headers are already sent.
            yield json.dumps({"type": "error", "error": str(e)}) + "\n"
        finally:
            if doc:
                doc.close()
            remove_file(temp_path)
            gc.collect()

    return StreamingResponse(
        generate_parts(),
        media_type="application/x-ndjson",
        headers={"Content-Disposition": "attachment; filename=invoices-split.ndjson"}
    )


@app.get("/health")
async def health_check():
    """Liveness/readiness probe; also reports Gemini configuration state."""
    gemini_status = "configured" if get_gemini_model() else "not configured"
    return {
        "status": "healthy",
        "gemini_flash": gemini_status,
        "gemini_available": GEMINI_AVAILABLE,
        "max_upload_mb": 200,
        "max_response_mb": MAX_RESPONSE_SIZE_MB
    }


if __name__ == "__main__":
    import uvicorn

    print("🚀 Starting Invoice Splitter API")
    print(f"   Max upload: 200MB")
    print(f"   Max response: {MAX_RESPONSE_SIZE_MB}MB")
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        workers=1,
        timeout_keep_alive=300,
        limit_concurrency=10
    )