# Invoice Splitter API — FastAPI service (Hugging Face Spaces page chrome removed)
| import os | |
| import io | |
| import re | |
| import base64 | |
| import gc | |
| import tempfile | |
| from typing import List, Dict, Optional, Tuple | |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks | |
| from fastapi. middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from starlette.requests import Request | |
| import fitz # PyMuPDF | |
# Google Gemini - optional import: the service degrades gracefully to
# text-only PDF support when the SDK (or Pillow) is missing.
try:
    import google.generativeai as genai
    from PIL import Image
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    # BUGFIX: added the missing space after the period in the warning message.
    print("Warning: google-generativeai not installed. Image-based PDFs won't be supported.")
# FastAPI application instance; all routes below attach to this object.
app = FastAPI(title="Invoice Splitter API")
# ⭐ FIX 1: Increase request body size limit to handle large uploads
# NOTE(review): Starlette's Request class does not consume a `max_body_size`
# attribute, so this assignment is almost certainly a no-op — body-size limits
# must be enforced by the ASGI server or a middleware. Confirm before relying
# on it; the real cap here is the manual check inside split_invoices.
Request.max_body_size = 200 * 1024 * 1024  # 200MB limit
# Wide-open CORS: any origin/method/header may call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Google Gemini Configuration ---
# API key is read from the environment; empty string disables Gemini.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
# Lazily-initialized singleton; see get_gemini_model().
gemini_model = None
# ⭐ FIX 2: Configuration for response size management
MAX_RESPONSE_SIZE_MB = 50  # Skip base64 if response exceeds this
def get_gemini_model():
    """Return the shared Gemini model, creating it lazily on first use.

    Returns None when the SDK is not installed, the API key is missing,
    or initialization fails; otherwise caches and returns the model.
    """
    global gemini_model
    # Guard clauses: bail out early on every "not available" condition.
    if not GEMINI_AVAILABLE:
        print("Gemini SDK not available")
        return None
    if gemini_model is not None:
        return gemini_model
    if not GEMINI_API_KEY:
        print("Warning: Gemini API key not found in environment variables.")
        return None
    try:
        genai.configure(api_key=GEMINI_API_KEY)
        gemini_model = genai.GenerativeModel('gemini-2.0-flash-exp')
        print("✓ Google Gemini Flash 2.0 initialized")
    except Exception as e:
        print(f"Failed to initialize Gemini model: {e}")
        return None
    return gemini_model
# --- Regex patterns ---
# Labeled invoice numbers: "Invoice No", "Bill No", "Doc No", "Invoice #",
# etc., optionally followed by filler labels (Order/Ref/No/Dt/Date), then the
# value: an alphanumeric token (3+ chars) that may contain '-' or '/'.
# VERBOSE mode: unescaped whitespace in the pattern is ignored.
INVOICE_NO_RE = re.compile(
    r"""
    (?:
        Invoice\s*No\. ?|
        Inv\. ?\s*No\.?|
        Bill\s*No\.?|
        Document\s*No\.?|
        Doc\s*No\.?|
        Tax\s*Invoice\s*No\.?|
        Invoice\s*#|
        Inv\s*#
    )
    [\s:\-]*(?:(?:Order|Ref|No|Dt|Date)\b[\s:\-]*)*
    \s*
    ([A-Z0-9][A-Z0-9\-\/]{2,})
    """,
    re.IGNORECASE | re.VERBOSE
)
# Bare prefixed identifiers like "INV-2024/001" or "AB/12345".
# NOTE(review): this pattern is not referenced anywhere in the visible file —
# possibly dead code, or used by a part of the project not shown here.
PREFIXED_INVOICE_RE = re.compile(
    r"\b([A-Z]{2,4}[-/]\d{4,}(?:/\d+)?[A-Z]*)\b"
)
# GST identifier: a label (GSTIN / GST No / GST IN / GST) followed by a
# 15-character alphanumeric id, captured in group(2).
# BUGFIX: this pattern is NOT re.VERBOSE, so the stray spaces in the original
# (" GSTIN" and "No\. ?") were literal: "GSTIN" could only match when preceded
# by a space, and "GST No" without a dot never matched. Removed the spurious
# space and made the dot optional (\.?) as the surrounding alternatives do.
GST_LIKE_RE = re.compile(
    r"\b((?:GSTIN|GST\s*No\.?|GST\s*IN|GST)[\s:\-]*([0-9A-Z]{15}))\b", re.IGNORECASE)
def is_image_based_pdf(doc: fitz.Document, sample_pages: int = 3) -> Tuple[bool, float]:
    """Heuristically classify a PDF as image-based (scanned) or text-based.

    Samples up to *sample_pages* pages; fewer than 50 chars of extractable
    text per page on average is treated as image-based.

    Returns:
        (is_image_based, avg_text_length_per_sampled_page)
    """
    pages_to_check = min(sample_pages, doc.page_count)
    # ROBUSTNESS: an empty document would otherwise divide by zero below.
    if pages_to_check == 0:
        return False, 0.0
    total_text_length = 0
    for i in range(pages_to_check):
        text = doc.load_page(i).get_text("text") or ""
        total_text_length += len(text.strip())
    avg_text_length = total_text_length / pages_to_check
    is_image_based = avg_text_length < 50
    print(
        f" PDF Type Detection: avg_text_length={avg_text_length:.1f} chars/page")
    print(
        f" Classification: {'IMAGE-BASED' if is_image_based else 'TEXT-BASED'} PDF")
    return is_image_based, avg_text_length
def normalize_text_for_search(s: str) -> str:
    """Flatten text for regex matching: NBSPs become spaces, newlines and
    tabs collapse to single spaces, and runs of spaces are squeezed."""
    if not s:
        return s
    flattened = s.replace("\u00A0", " ")
    flattened = re.sub(r"[\r\n\t]+", " ", flattened)
    return re.sub(r"[ ]{2,}", " ", flattened).strip()
def try_extract_invoice_from_text(text: str) -> Optional[str]:
    """Best-effort invoice-number extraction from raw page text.

    Strategy, in priority order:
      1. Find an invoice-ish label (Invoice/Inv/Bill/Doc/Document/Tax Invoice
         followed by No/#/./": ") and take the first plausible token after it.
      2. Fall back to the first uppercase alphanumeric token containing at
         least 3 digits within the first 600 chars of the page.
      3. Fall back to a GST number, returned tagged as "GST:<15-char id>" so
         callers can recognize and filter it.

    Returns None when nothing plausible is found.
    """
    if not text:
        return None
    text_norm = normalize_text_for_search(text)
    # Step 1: look for an explicit label.
    label_match = re.search(
        r"(?:Invoice|Inv|Bill|Doc|Document|Tax\s*Invoice)\s*(?:No|#|\.|: )",
        text_norm,
        re.IGNORECASE
    )
    if label_match:
        start_idx = label_match.end()
        # Only inspect a short window right after the label.
        candidate_text = text_norm[start_idx: start_idx + 60]
        # Punctuation that often separates label from value becomes spaces.
        clean_candidates = re.sub(r"[:\-\(\)\[\]]", " ", candidate_text)
        words = clean_candidates.split()
        for word in words:
            word = word.strip(".,;")
            # Skip filler words that commonly sit between label and value.
            if word.lower() in ("order", "ref", "no", "date", "dt", "inv", "bill", "account"):
                continue
            # Plausible invoice number: longer than 2 chars with a digit.
            if len(word) > 2 and any(char.isdigit() for char in word):
                return word
    # Step 2: first digit-heavy uppercase token near the top of the page.
    top_text = text_norm[:600]
    m = re.search(r"\b([A-Z0-9][A-Z0-9\-\/]{4,})\b", top_text)
    if m:
        inv = m.group(1)
        if sum(c.isdigit() for c in inv) >= 3:
            return inv
    # Step 3: GST identifier fallback (tagged so callers can filter it out).
    gm = GST_LIKE_RE.search(text_norm)
    if gm:
        gst_val = gm.group(2) or ""
        gst_val = gst_val.replace(" ", "").strip().upper()
        if len(gst_val) == 15 and re.match(r"^[0-9A-Z]{15}$", gst_val):
            return f"GST:{gst_val}"
    return None
def extract_invoice_text_based(page: fitz.Page) -> Optional[str]:
    """Try to pull an invoice number from a page's text layer.

    Scans the full page text first, then each text block individually;
    returns the first hit, or None.
    """
    full_text = page.get_text("text") or ""
    found = try_extract_invoice_from_text(full_text)
    if found:
        return found
    # Per-block scan: a block-local match can survive noise in the full text.
    for blk in (page.get_text("blocks") or []):
        snippet = blk[4] if len(blk) > 4 else ""
        if not snippet:
            continue
        found = try_extract_invoice_from_text(snippet)
        if found:
            return found
    return None
def extract_invoice_gemini(page: fitz.Page) -> Optional[str]:
    """Extract an invoice number from a rendered page image via Gemini.

    Renders the page at 1.5x, asks Gemini for the invoice number directly,
    then falls back to a full OCR pass plus the regex extractor. Returns
    None when Gemini is unavailable, fails, or finds nothing.
    """
    model = get_gemini_model()
    if not model:
        print(" Gemini model not available")
        return None
    img = None
    try:
        # Reduced from 2x to save memory
        pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
        img_bytes = pix.tobytes("png")
        pix = None  # Free memory
        img = Image.open(io.BytesIO(img_bytes))
        prompt = """
Extract the invoice number from this image. Look for:
- Invoice No, Invoice Number, Bill No, Bill Number
- Any alphanumeric code that appears to be an invoice identifier
- Purchase Order numbers if no invoice number is found
Return ONLY the invoice number/identifier itself, nothing else.
If no invoice number is found, return "NOT_FOUND".
"""
        print(" Calling Google Gemini API...")
        response = model.generate_content([prompt, img])
        if response and response.text:
            extracted_text = response.text.strip()
            print(f" Gemini response: {extracted_text}")
            if extracted_text and extracted_text != "NOT_FOUND":
                # Strip markdown emphasis/hash characters Gemini sometimes adds.
                invoice_no = extracted_text.replace(
                    "*", "").replace("#", "").strip()
                if invoice_no and len(invoice_no) > 2:
                    print(f" ✓ Gemini found invoice: {invoice_no}")
                    return invoice_no
        # Fallback: full OCR pass, then the regex extractor over the text.
        ocr_prompt = "Extract all text from this invoice image. Return the complete text content."
        ocr_response = model.generate_content([ocr_prompt, img])
        if ocr_response and ocr_response.text:
            print(
                f" Gemini extracted {len(ocr_response.text)} chars, trying regex...")
            inv = try_extract_invoice_from_text(ocr_response.text)
            if inv:
                print(f" ✓ Found via regex on Gemini text: {inv}")
                return inv
        print(" ✗ Gemini: No invoice found")
        return None
    except Exception as e:
        print(f" ✗ Gemini extraction failed: {e}")
        return None
    finally:
        # BUGFIX: always release the PIL image, including on exception paths
        # (the original only closed it on the success branches, leaking the
        # decoded bitmap whenever the API call raised).
        if img is not None:
            img.close()
def extract_invoice_no_from_page(page: fitz.Page, is_image_pdf: bool) -> Optional[str]:
    """Extract an invoice number from one page.

    The text layer is always tried first; the Gemini image pipeline is only
    consulted for pages of a PDF classified as image-based.
    """
    from_text = extract_invoice_text_based(page)
    if from_text:
        print(f" ✓ Found via text extraction: {from_text}")
        return from_text
    if not is_image_pdf:
        return None
    from_gemini = extract_invoice_gemini(page)
    if from_gemini:
        print(f" ✓ Found via Gemini: {from_gemini}")
        return from_gemini
    return None
def build_pdf_from_pages(src_doc: fitz.Document, page_indices: List[int]) -> bytes:
    """Create a new PDF with the given pages (0-based indices)."""
    dest = fitz.open()
    try:
        for page_no in page_indices:
            dest.insert_pdf(src_doc, from_page=page_no, to_page=page_no)
        # garbage=4 + deflate: maximum cleanup and compression on serialize.
        return dest.tobytes(garbage=4, deflate=True)
    finally:
        dest.close()
| # ⭐ FIX 3: Cleanup utility | |
def remove_file(path: str):
    """Best-effort delete of *path*; never raises (logs a warning instead)."""
    try:
        if os.path.exists(path):
            os.remove(path)
            print(f"🧹 Cleaned up: {path}")
    except Exception as e:
        print(f"⚠️ Cleanup warning: {e}")
| # ============================================================================ | |
| # API ENDPOINTS | |
| # ============================================================================ | |
# NOTE(review): a route decorator appears to have been lost in transit — the
# signature (File/Form params, HTTPException) and docstring clearly describe
# an HTTP endpoint. Confirm the original path before deploying.
@app.post("/split-invoices")
async def split_invoices(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    include_pdf: bool = Form(True),
    max_file_size_mb: int = Form(200),
):
    """
    Split a multi-invoice PDF into separate PDFs.

    Handles large files by streaming the upload to disk, tracking response
    size, and skipping base64 payloads once the response would exceed
    MAX_RESPONSE_SIZE_MB. For very large files, use /split-invoices-stream.

    Raises HTTPException: 400 (not a PDF / no pages), 413 (upload too
    large), 500 (image PDF without Gemini, or unexpected failure).
    """
    if not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF is supported")
    # Stream large uploads to disk instead of memory.
    max_size_bytes = max_file_size_mb * 1024 * 1024
    fd, temp_path = tempfile.mkstemp(suffix=".pdf")
    os.close(fd)
    doc = None
    try:
        # Stream upload to temp file in 5MB chunks, enforcing the cap as we go.
        print(f"📥 Streaming upload: {file.filename}")
        total_size = 0
        with open(temp_path, "wb") as buffer:
            chunk_size = 5 * 1024 * 1024  # 5MB chunks
            while content := await file.read(chunk_size):
                total_size += len(content)
                if total_size > max_size_bytes:
                    remove_file(temp_path)
                    raise HTTPException(
                        status_code=413,
                        detail=f"File too large. Max: {max_file_size_mb}MB, got: {total_size/(1024*1024):.1f}MB"
                    )
                buffer.write(content)
                # Progress log roughly every 20MB.
                if total_size % (20 * 1024 * 1024) < chunk_size:
                    print(f" 📊 Uploaded: {total_size/(1024*1024):.1f}MB")
        file_size_mb = total_size / (1024 * 1024)
        print(f"💾 Saved {file_size_mb:.2f}MB to disk")
        # Open from disk
        doc = fitz.open(temp_path)
        if doc.page_count == 0:
            raise HTTPException(status_code=400, detail="No pages found")
        print(f"\n{'='*60}")
        print(f"Processing: {file.filename} ({doc.page_count} pages)")
        print(f"{'='*60}")
        # Detect PDF type
        is_image_pdf, avg_text_len = is_image_based_pdf(doc)
        if is_image_pdf and not get_gemini_model():
            raise HTTPException(
                status_code=500,
                detail="Image-based PDF detected but Google Gemini is not configured."
            )
        # Extract an invoice number (or None) for every page.
        page_invoice_nos: List[Optional[str]] = []
        for i in range(doc.page_count):
            if i % 50 == 0:
                print(f"\n--- Processing page {i+1}/{doc.page_count} ---")
            page = doc.load_page(i)
            inv = extract_invoice_no_from_page(page, is_image_pdf)
            page_invoice_nos.append(inv)
            page = None  # Free memory
            if i % 100 == 0:
                gc.collect()
        print(f"\nRaw Extraction: {page_invoice_nos}")
        # Filter GST entries.
        # BUGFIX: the extractor tags GST hits as "GST:<id>" (no space), but
        # this filter previously matched "GST: " (with a space) and therefore
        # never discarded them — GST numbers leaked through as invoice numbers
        # and split the document incorrectly. The stream endpoint already used
        # the correct "GST:" prefix.
        page_invoice_nos_filtered = [
            None if (v and v.upper().startswith("GST:")) else v
            for v in page_invoice_nos
        ]
        print(f"Filtered Results: {page_invoice_nos_filtered}")
        # Group consecutive pages under the invoice number that starts them.
        groups: List[Dict] = []
        current_group_pages: List[int] = []
        current_invoice: Optional[str] = None
        for idx, inv in enumerate(page_invoice_nos_filtered):
            if current_invoice is None:
                current_invoice = inv
                current_group_pages = [idx]
            else:
                if inv is not None and inv != current_invoice:
                    groups.append({
                        "invoice_no": current_invoice,
                        "pages": current_group_pages[:],
                    })
                    current_invoice = inv
                    current_group_pages = [idx]
                else:
                    current_group_pages.append(idx)
        if current_group_pages:
            groups.append({
                "invoice_no": current_invoice,
                "pages": current_group_pages[:]
            })
        # Merge a leading unidentified group into the first real invoice.
        if len(groups) > 1 and groups[0]["invoice_no"] is None and groups[1]["invoice_no"] is not None:
            groups[1]["pages"] = groups[0]["pages"] + groups[1]["pages"]
            groups.pop(0)
        if all(g["invoice_no"] is None for g in groups):
            print("\n⚠ Warning: No invoices detected!")
            groups = [{
                "invoice_no": None,
                "pages": list(range(doc.page_count))
            }]
        # Build the response, tracking cumulative (base64-inflated) size.
        parts = []
        total_response_size = 0
        max_response_bytes = MAX_RESPONSE_SIZE_MB * 1024 * 1024
        response_size_exceeded = False
        for idx, g in enumerate(groups):
            print(f"\n🔨 Building part {idx+1}/{len(groups)}")
            part_bytes = build_pdf_from_pages(doc, g["pages"])
            info = {
                "invoice_no": g["invoice_no"],
                "pages": [p + 1 for p in g["pages"]],
                "num_pages": len(g["pages"]),
                "size_bytes": len(part_bytes),
                "size_mb": round(len(part_bytes) / (1024 * 1024), 2)
            }
            # Smart base64 inclusion based on accumulated response size.
            if include_pdf and not response_size_exceeded:
                base64_size = len(part_bytes) * 4 / 3  # Base64 overhead
                total_response_size += base64_size
                if total_response_size > max_response_bytes:
                    print(
                        f" ⚠️ Response size limit reached ({MAX_RESPONSE_SIZE_MB}MB)")
                    print(f" 💡 Skipping base64 for remaining parts")
                    print(f" 💡 Use /split-invoices-stream for large files")
                    response_size_exceeded = True
                    info["pdf_base64"] = None
                    info["warning"] = f"Response too large. Use streaming endpoint."
                else:
                    info["pdf_base64"] = base64.b64encode(
                        part_bytes).decode("ascii")
            else:
                info["pdf_base64"] = None
            parts.append(info)
            del part_bytes
            gc.collect()
        print(f"\n✅ Split into {len(parts)} parts")
        return JSONResponse({
            "success": True,
            "count": len(parts),
            "pdf_type": "image-based" if is_image_pdf else "text-based",
            "source_file": {
                "name": file.filename,
                "size_mb": round(file_size_mb, 2),
                "total_pages": doc.page_count
            },
            "parts": parts,
            "response_info": {
                "size_limit_mb": MAX_RESPONSE_SIZE_MB,
                "size_exceeded": response_size_exceeded,
                "recommendation": "Use /split-invoices-stream for files >100MB" if response_size_exceeded else None
            }
        })
    except HTTPException:
        raise
    except Exception as e:
        print(f"\n✗ Error: {str(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always release the document and delete the temp upload.
        if doc:
            doc.close()
        remove_file(temp_path)
        gc.collect()
async def split_invoices_stream(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    max_file_size_mb: int = Form(200),
):
    """
    ⭐ STREAMING VERSION FOR LARGE FILES (100MB+)
    Returns NDJSON (newline-delimited JSON) - one JSON object per line.
    Each line is a separate invoice part.
    This avoids building a huge JSON response in memory.
    """
    # NOTE(review): like split_invoices, this reads as an HTTP endpoint but no
    # route decorator is visible in this file — confirm how it is registered.
    import json
    if not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF is supported")
    max_size_bytes = max_file_size_mb * 1024 * 1024
    fd, temp_path = tempfile.mkstemp(suffix=".pdf")
    os.close(fd)
    # Upload to disk in 5MB chunks, enforcing the size cap as we go.
    try:
        total_size = 0
        with open(temp_path, "wb") as buffer:
            chunk_size = 5 * 1024 * 1024
            while content := await file.read(chunk_size):
                total_size += len(content)
                if total_size > max_size_bytes:
                    remove_file(temp_path)
                    raise HTTPException(
                        status_code=413, detail=f"File too large")
                buffer.write(content)
    except Exception as e:
        # Upload failed mid-stream: drop the partial temp file and re-raise.
        remove_file(temp_path)
        raise

    async def generate_parts():
        # The generator owns the document and temp file for the lifetime of
        # the stream; both are released in the finally block below.
        doc = None
        try:
            doc = fitz.open(temp_path)
            # Send status
            yield json.dumps({
                "type": "status",
                "status": "processing",
                "total_pages": doc.page_count,
                "filename": file.filename
            }) + "\n"
            # Detect type
            is_image_pdf, _ = is_image_based_pdf(doc)
            # Extract an invoice number (or None) per page.
            page_invoice_nos = []
            for i in range(doc.page_count):
                page = doc.load_page(i)
                inv = extract_invoice_no_from_page(page, is_image_pdf)
                page_invoice_nos.append(inv)
                page = None
                if i % 100 == 0:
                    gc.collect()
            # Filter GST fallbacks ("GST:<id>" tags), then group consecutive
            # pages under the invoice number that starts each run.
            clean_invs = [None if (v and v.upper().startswith(
                "GST:")) else v for v in page_invoice_nos]
            groups = []
            current_group = []
            current_inv = None
            for idx, inv in enumerate(clean_invs):
                if current_inv is None:
                    current_inv = inv
                    current_group = [idx]
                else:
                    if inv is not None and inv != current_inv:
                        groups.append(
                            {"invoice_no": current_inv, "pages": current_group})
                        current_inv = inv
                        current_group = [idx]
                    else:
                        current_group.append(idx)
            if current_group:
                groups.append(
                    {"invoice_no": current_inv, "pages": current_group})
            # Merge a leading unidentified group into the first real invoice.
            if len(groups) > 1 and groups[0]["invoice_no"] is None and groups[1]["invoice_no"] is not None:
                groups[1]["pages"] = groups[0]["pages"] + groups[1]["pages"]
                groups.pop(0)
            # Stream each part as one NDJSON line (base64 PDF included).
            for idx, g in enumerate(groups):
                part_bytes = build_pdf_from_pages(doc, g["pages"])
                info = {
                    "type": "part",
                    "part_index": idx,
                    "invoice_no": g["invoice_no"],
                    "pages": [p + 1 for p in g["pages"]],
                    "num_pages": len(g["pages"]),
                    "size_bytes": len(part_bytes),
                    "pdf_base64": base64.b64encode(part_bytes).decode("ascii")
                }
                yield json.dumps(info) + "\n"
                del part_bytes
                gc.collect()
            # Complete
            yield json.dumps({
                "type": "complete",
                "total_parts": len(groups)
            }) + "\n"
        except Exception as e:
            # Errors are reported in-band: the HTTP status is already 200 by
            # the time the stream is being produced.
            yield json.dumps({"type": "error", "error": str(e)}) + "\n"
        finally:
            if doc:
                doc.close()
            remove_file(temp_path)
            gc.collect()

    return StreamingResponse(
        generate_parts(),
        media_type="application/x-ndjson",
        headers={
            "Content-Disposition": f"attachment; filename=invoices-split.ndjson"}
    )
# NOTE(review): a route decorator appears to have been lost in transit; this
# reads as a health-check endpoint. Confirm the original path.
@app.get("/health")
async def health_check():
    """Report service liveness, Gemini configuration status, and size limits."""
    gemini_status = "configured" if get_gemini_model() else "not configured"
    return {
        "status": "healthy",
        "gemini_flash": gemini_status,
        "gemini_available": GEMINI_AVAILABLE,
        "max_upload_mb": 200,
        "max_response_mb": MAX_RESPONSE_SIZE_MB
    }
if __name__ == "__main__":
    # Local/dev entry point; port 7860 is the Hugging Face Spaces convention.
    import uvicorn
    print("🚀 Starting Invoice Splitter API")
    print(f" Max upload: 200MB")
    print(f" Max response: {MAX_RESPONSE_SIZE_MB}MB")
    # Single worker: the process holds per-process state (gemini_model
    # singleton, temp files); long keep-alive supports slow large uploads.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        workers=1,
        timeout_keep_alive=300,
        limit_concurrency=10
    )