# splitpdffile / app.py
# Provenance: Hugging Face Space upload by anujakkulkarni (commit 428054b, verified).
import os
import io
import re
import base64
import gc
import tempfile
from typing import List, Dict, Optional, Tuple
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks
from fastapi. middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from starlette.requests import Request
import fitz # PyMuPDF
# Google Gemini - optional import.
# The SDK (and Pillow) are only required for image-based/scanned PDFs;
# text-based PDFs are handled with regexes alone, so a missing SDK just
# disables the OCR fallback instead of breaking the whole service.
try:
    import google.generativeai as genai
    from PIL import Image
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    # FIX: added the missing space after the period in the warning message.
    print("Warning: google-generativeai not installed. Image-based PDFs won't be supported.")
# FastAPI application + module-level configuration.
app = FastAPI(title="Invoice Splitter API")
# ⭐ FIX 1: Increase request body size limit to handle large uploads
# NOTE(review): Starlette's Request class does not document a `max_body_size`
# attribute, so this assignment is likely a no-op — the actual size limits are
# enforced manually inside the upload loops of the endpoints. Confirm before
# relying on it.
Request.max_body_size = 200 * 1024 * 1024 # 200MB limit
# CORS is wide open (any origin/method/header) — appropriate only for a
# public demo deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Google Gemini Configuration ---
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")  # read once at import time
gemini_model = None  # lazily created/cached by get_gemini_model()
# ⭐ FIX 2: Configuration for response size management
MAX_RESPONSE_SIZE_MB = 50 # Skip base64 if response exceeds this
def get_gemini_model():
    """Return the cached Gemini model, initializing it on first use.

    Returns None when the SDK is not installed, no API key is configured,
    or initialization fails.
    """
    global gemini_model
    if not GEMINI_AVAILABLE:
        print("Gemini SDK not available")
        return None
    if gemini_model is not None:
        return gemini_model
    if not GEMINI_API_KEY:
        print("Warning: Gemini API key not found in environment variables.")
        return None
    try:
        genai.configure(api_key=GEMINI_API_KEY)
        gemini_model = genai.GenerativeModel('gemini-2.0-flash-exp')
        print("✓ Google Gemini Flash 2.0 initialized")
    except Exception as e:
        print(f"Failed to initialize Gemini model: {e}")
        return None
    return gemini_model
# --- Regex patterns for invoice-number extraction ---

# Labelled invoice number: "Invoice No", "Bill No.", "Inv #", etc., optionally
# followed by filler tokens (Order/Ref/No/Dt/Date), then the identifier itself
# (alphanumeric start, 3+ chars drawn from A-Z, 0-9, '-' and '/').
INVOICE_NO_RE = re.compile(
    r"""
    (?:
        Invoice\s*No\.?|
        Inv\.?\s*No\.?|
        Bill\s*No\.?|
        Document\s*No\.?|
        Doc\s*No\.?|
        Tax\s*Invoice\s*No\.?|
        Invoice\s*#|
        Inv\s*#
    )
    [\s:\-]*(?:(?:Order|Ref|No|Dt|Date)\b[\s:\-]*)*
    \s*
    ([A-Z0-9][A-Z0-9\-\/]{2,})
    """,
    re.IGNORECASE | re.VERBOSE
)

# Bare prefixed identifier such as "INV-12345" or "AB/2024/7" (uppercase only;
# no IGNORECASE here, deliberately stricter than the labelled pattern).
PREFIXED_INVOICE_RE = re.compile(
    r"\b([A-Z]{2,4}[-/]\d{4,}(?:/\d+)?[A-Z]*)\b"
)

# Labelled 15-character GST identification number; group 2 captures the value.
# FIX: removed the stray leading space in "(?: GSTIN|...)" — this pattern is
# NOT re.VERBOSE, so the space was a literal and the "GSTIN" alternative could
# never match immediately after the leading \b at a word start.
GST_LIKE_RE = re.compile(
    r"\b((?:GSTIN|GST\s*No\. ?|GST\s*IN|GST)[\s:\-]*([0-9A-Z]{15}))\b", re.IGNORECASE)
def is_image_based_pdf(doc: fitz.Document, sample_pages: int = 3) -> Tuple[bool, float]:
    """Classify a PDF as image-based (scanned) or text-based.

    Samples up to *sample_pages* leading pages and averages the length of
    their extracted text; fewer than 50 chars/page is treated as image-based.

    Returns (is_image_based, average_text_length_per_sampled_page).
    """
    pages_to_check = min(sample_pages, doc.page_count)
    # FIX: a zero-page document previously caused a ZeroDivisionError here
    # (the streaming endpoint calls this without checking page_count first).
    if pages_to_check == 0:
        return True, 0.0
    total_text_length = 0
    for i in range(pages_to_check):
        text = doc.load_page(i).get_text("text") or ""
        total_text_length += len(text.strip())
    avg_text_length = total_text_length / pages_to_check
    is_image_based = avg_text_length < 50
    print(
        f" PDF Type Detection: avg_text_length={avg_text_length:.1f} chars/page")
    print(
        f" Classification: {'IMAGE-BASED' if is_image_based else 'TEXT-BASED'} PDF")
    return is_image_based, avg_text_length
def normalize_text_for_search(s: str) -> str:
    """Collapse whitespace so label/value regexes can match across line breaks.

    Non-breaking spaces become regular spaces, runs of CR/LF/TAB become a
    single space, repeated spaces are squeezed, and the result is stripped.
    Empty / falsy input is returned unchanged.
    """
    if not s:
        return s
    collapsed = re.sub(r"[\r\n\t]+", " ", s.replace("\u00A0", " "))
    return re.sub(r"[ ]{2,}", " ", collapsed).strip()
def try_extract_invoice_from_text(text: str) -> Optional[str]:
    """Heuristically pull an invoice identifier out of raw page text.

    Three strategies, in order:
      1. Find an explicit label ("Invoice No", "Bill #", ...) and scan the
         60 characters after it for the first plausible token.
      2. Any digit-heavy alphanumeric code in the first 600 characters.
      3. A GSTIN, returned with a "GST:" marker prefix so callers can filter.
    Returns None when nothing matches.
    """
    if not text:
        return None
    normalized = normalize_text_for_search(text)

    # Strategy 1: labelled value within a 60-char window after the label.
    label = re.search(
        r"(?:Invoice|Inv|Bill|Doc|Document|Tax\s*Invoice)\s*(?:No|#|\.|: )",
        normalized,
        re.IGNORECASE
    )
    if label:
        window = normalized[label.end(): label.end() + 60]
        for token in re.sub(r"[:\-\(\)\[\]]", " ", window).split():
            token = token.strip(".,;")
            # Skip filler words that often sit between the label and the value.
            if token.lower() in ("order", "ref", "no", "date", "dt", "inv", "bill", "account"):
                continue
            if len(token) > 2 and any(ch.isdigit() for ch in token):
                return token

    # Strategy 2: digit-heavy code near the top of the page.
    head = normalized[:600]
    code = re.search(r"\b([A-Z0-9][A-Z0-9\-\/]{4,})\b", head)
    if code and sum(c.isdigit() for c in code.group(1)) >= 3:
        return code.group(1)

    # Strategy 3: GSTIN fallback, marked so grouping logic can ignore it.
    gst = GST_LIKE_RE.search(normalized)
    if gst:
        value = (gst.group(2) or "").replace(" ", "").strip().upper()
        if len(value) == 15 and re.match(r"^[0-9A-Z]{15}$", value):
            return f"GST:{value}"
    return None
def extract_invoice_text_based(page: fitz.Page) -> Optional[str]:
    """Extract an invoice number from a page's embedded text layer.

    Tries the full-page text first, then falls back to scanning each text
    block individually. Returns None when nothing is found.
    """
    found = try_extract_invoice_from_text(page.get_text("text") or "")
    if found:
        return found
    for blk in (page.get_text("blocks") or []):
        candidate = blk[4] if len(blk) > 4 else ""
        if not candidate:
            continue
        result = try_extract_invoice_from_text(candidate)
        if result:
            return result
    return None
def extract_invoice_gemini(page: fitz.Page) -> Optional[str]:
    """Use Gemini vision to read the invoice number from a rendered page image.

    Renders the page to a PNG, asks Gemini for the invoice number directly,
    and if that fails, asks for a full OCR dump and runs the regex heuristics
    over it. Returns the identifier, or None when Gemini is unavailable,
    finds nothing, or the API call fails.
    """
    model = get_gemini_model()
    if not model:
        print(" Gemini model not available")
        return None
    img = None
    try:
        # Render at 1.5x — reduced from 2x to save memory.
        pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
        img_bytes = pix.tobytes("png")
        pix = None  # Free pixmap memory before the API call
        img = Image.open(io.BytesIO(img_bytes))
        prompt = """
Extract the invoice number from this image. Look for:
- Invoice No, Invoice Number, Bill No, Bill Number
- Any alphanumeric code that appears to be an invoice identifier
- Purchase Order numbers if no invoice number is found
Return ONLY the invoice number/identifier itself, nothing else.
If no invoice number is found, return "NOT_FOUND".
"""
        print(" Calling Google Gemini API...")
        response = model.generate_content([prompt, img])
        if response and response.text:
            extracted_text = response.text.strip()
            print(f" Gemini response: {extracted_text}")
            if extracted_text and extracted_text != "NOT_FOUND":
                # Strip markdown decoration Gemini sometimes adds.
                invoice_no = extracted_text.replace(
                    "*", "").replace("#", "").strip()
                if invoice_no and len(invoice_no) > 2:
                    print(f" ✓ Gemini found invoice: {invoice_no}")
                    return invoice_no
        # Fallback: full OCR pass, then the regex heuristics over the text.
        ocr_prompt = "Extract all text from this invoice image. Return the complete text content."
        ocr_response = model.generate_content([ocr_prompt, img])
        if ocr_response and ocr_response.text:
            print(
                f" Gemini extracted {len(ocr_response.text)} chars, trying regex...")
            inv = try_extract_invoice_from_text(ocr_response.text)
            if inv:
                print(f" ✓ Found via regex on Gemini text: {inv}")
                return inv
        print(" ✗ Gemini: No invoice found")
        return None
    except Exception as e:
        print(f" ✗ Gemini extraction failed: {e}")
        return None
    finally:
        # FIX: close the PIL image on every path — the original only closed it
        # on success paths, leaking it whenever an exception was raised.
        if img is not None:
            img.close()
def extract_invoice_no_from_page(page: fitz.Page, is_image_pdf: bool) -> Optional[str]:
    """Extract an invoice number from one page.

    Always tries the cheap text-layer regexes first; falls back to Gemini
    OCR only when the document was classified as image-based.
    """
    found = extract_invoice_text_based(page)
    if found:
        print(f" ✓ Found via text extraction: {found}")
        return found
    if not is_image_pdf:
        return None
    via_gemini = extract_invoice_gemini(page)
    if via_gemini:
        print(f" ✓ Found via Gemini: {via_gemini}")
        return via_gemini
    return None
def build_pdf_from_pages(src_doc: fitz.Document, page_indices: List[int]) -> bytes:
    """Assemble the given pages (0-based indices) of *src_doc* into a new PDF.

    Returns the compressed PDF as bytes; the temporary document is always
    closed, even on failure.
    """
    part = fitz.open()
    try:
        for page_no in page_indices:
            part.insert_pdf(src_doc, from_page=page_no, to_page=page_no)
        # ⭐ garbage collection + deflate keep each split part small
        return part.tobytes(garbage=4, deflate=True)
    finally:
        part.close()
# ⭐ FIX 3: Cleanup utility
def remove_file(path: str):
    """Delete *path* if it exists; log and swallow any failure (best-effort)."""
    try:
        if not os.path.exists(path):
            return
        os.remove(path)
        print(f"🧹 Cleaned up: {path}")
    except Exception as e:
        print(f"⚠️ Cleanup warning: {e}")
# ============================================================================
# API ENDPOINTS
# ============================================================================
@app.post("/split-invoices")
async def split_invoices(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    include_pdf: bool = Form(True),
    max_file_size_mb: int = Form(200),
):
    """
    Split a multi-invoice PDF into separate PDFs.

    ⭐ HANDLES LARGE FILES:
    - Streams upload to disk (no memory overflow)
    - Monitors response size
    - Automatically skips base64 if response would exceed 50MB
    - For very large files, use /split-invoices-stream endpoint instead

    Returns JSON with one entry per detected invoice: 1-based page numbers,
    sizes, and (optionally) the split part as base64-encoded PDF.
    """
    if not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF is supported")
    # ⭐ FIX 4: Stream large uploads to disk instead of memory
    max_size_bytes = max_file_size_mb * 1024 * 1024
    fd, temp_path = tempfile.mkstemp(suffix=".pdf")
    os.close(fd)
    doc = None
    try:
        # Stream upload to temp file in 5MB chunks so the body never sits in RAM
        print(f"📥 Streaming upload: {file.filename}")
        total_size = 0
        with open(temp_path, "wb") as buffer:
            chunk_size = 5 * 1024 * 1024  # 5MB chunks
            while content := await file.read(chunk_size):
                total_size += len(content)
                if total_size > max_size_bytes:
                    remove_file(temp_path)
                    raise HTTPException(
                        status_code=413,
                        detail=f"File too large. Max: {max_file_size_mb}MB, got: {total_size/(1024*1024):.1f}MB"
                    )
                buffer.write(content)
                # Log progress roughly every 20MB
                if total_size % (20 * 1024 * 1024) < chunk_size:
                    print(f" 📊 Uploaded: {total_size/(1024*1024):.1f}MB")
        file_size_mb = total_size / (1024 * 1024)
        print(f"💾 Saved {file_size_mb:.2f}MB to disk")
        # Open from disk
        doc = fitz.open(temp_path)
        if doc.page_count == 0:
            raise HTTPException(status_code=400, detail="No pages found")
        print(f"\n{'='*60}")
        print(f"Processing: {file.filename} ({doc.page_count} pages)")
        print(f"{'='*60}")
        # Detect PDF type — image-based PDFs require Gemini OCR
        is_image_pdf, avg_text_len = is_image_based_pdf(doc)
        if is_image_pdf and not get_gemini_model():
            raise HTTPException(
                status_code=500,
                detail="Image-based PDF detected but Google Gemini is not configured."
            )
        # Extract an invoice number (or None) for every page
        page_invoice_nos: List[Optional[str]] = []
        for i in range(doc.page_count):
            if i % 50 == 0:
                print(f"\n--- Processing page {i+1}/{doc.page_count} ---")
            page = doc.load_page(i)
            inv = extract_invoice_no_from_page(page, is_image_pdf)
            page_invoice_nos.append(inv)
            page = None  # Free memory
            if i % 100 == 0:
                gc.collect()
        print(f"\nRaw Extraction: {page_invoice_nos}")
        # Filter GST entries — a GSTIN must not start a new invoice group.
        # ⭐ FIX: the extractor emits "GST:<value>" with NO space, but this
        # filter previously checked startswith("GST: ") and therefore never
        # matched, letting GSTINs be treated as invoice numbers.
        page_invoice_nos_filtered = [
            None if (v and v.upper().startswith("GST:")) else v
            for v in page_invoice_nos
        ]
        print(f"Filtered Results: {page_invoice_nos_filtered}")
        # Group consecutive pages: a new group starts whenever a page carries
        # a different non-None invoice number; None pages stay with the
        # current group (continuation pages).
        groups: List[Dict] = []
        current_group_pages: List[int] = []
        current_invoice: Optional[str] = None
        for idx, inv in enumerate(page_invoice_nos_filtered):
            if current_invoice is None:
                current_invoice = inv
                current_group_pages = [idx]
            else:
                if inv is not None and inv != current_invoice:
                    groups.append({
                        "invoice_no": current_invoice,
                        "pages": current_group_pages[:],
                    })
                    current_invoice = inv
                    current_group_pages = [idx]
                else:
                    current_group_pages.append(idx)
        if current_group_pages:
            groups.append({
                "invoice_no": current_invoice,
                "pages": current_group_pages[:]
            })
        # Merge leading pages with no detected number into the first invoice
        if len(groups) > 1 and groups[0]["invoice_no"] is None and groups[1]["invoice_no"] is not None:
            groups[1]["pages"] = groups[0]["pages"] + groups[1]["pages"]
            groups.pop(0)
        # Nothing detected anywhere: return the whole document as one part
        if all(g["invoice_no"] is None for g in groups):
            print("\n⚠ Warning: No invoices detected!")
            groups = [{
                "invoice_no": None,
                "pages": list(range(doc.page_count))
            }]
        # ⭐ FIX 5: Build response with size tracking
        parts = []
        total_response_size = 0
        max_response_bytes = MAX_RESPONSE_SIZE_MB * 1024 * 1024
        response_size_exceeded = False
        for idx, g in enumerate(groups):
            print(f"\n🔨 Building part {idx+1}/{len(groups)}")
            part_bytes = build_pdf_from_pages(doc, g["pages"])
            info = {
                "invoice_no": g["invoice_no"],
                "pages": [p + 1 for p in g["pages"]],
                "num_pages": len(g["pages"]),
                "size_bytes": len(part_bytes),
                "size_mb": round(len(part_bytes) / (1024 * 1024), 2)
            }
            # ⭐ Smart base64 inclusion based on cumulative response size
            if include_pdf and not response_size_exceeded:
                base64_size = len(part_bytes) * 4 / 3  # Base64 overhead
                total_response_size += base64_size
                if total_response_size > max_response_bytes:
                    print(
                        f" ⚠️ Response size limit reached ({MAX_RESPONSE_SIZE_MB}MB)")
                    print(f" 💡 Skipping base64 for remaining parts")
                    print(f" 💡 Use /split-invoices-stream for large files")
                    response_size_exceeded = True
                    info["pdf_base64"] = None
                    info["warning"] = f"Response too large. Use streaming endpoint."
                else:
                    info["pdf_base64"] = base64.b64encode(
                        part_bytes).decode("ascii")
            else:
                info["pdf_base64"] = None
            parts.append(info)
            del part_bytes
            gc.collect()
        print(f"\n✅ Split into {len(parts)} parts")
        return JSONResponse({
            "success": True,
            "count": len(parts),
            "pdf_type": "image-based" if is_image_pdf else "text-based",
            "source_file": {
                "name": file.filename,
                "size_mb": round(file_size_mb, 2),
                "total_pages": doc.page_count
            },
            "parts": parts,
            "response_info": {
                "size_limit_mb": MAX_RESPONSE_SIZE_MB,
                "size_exceeded": response_size_exceeded,
                "recommendation": "Use /split-invoices-stream for files >100MB" if response_size_exceeded else None
            }
        })
    except HTTPException:
        raise
    except Exception as e:
        print(f"\n✗ Error: {str(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always close the document and delete the temp upload
        if doc:
            doc.close()
        remove_file(temp_path)
        gc.collect()
@app.post("/split-invoices-stream")
async def split_invoices_stream(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    max_file_size_mb: int = Form(200),
):
    """
    ⭐ STREAMING VERSION FOR LARGE FILES (100MB+)
    Returns NDJSON (newline-delimited JSON) - one JSON object per line.
    Each line is a separate invoice part.
    This avoids building a huge JSON response in memory.

    Event lines, in order: one "status" object, then one "part" object per
    detected invoice, then a "complete" object — or an "error" object if
    processing fails mid-stream (HTTP status is already 200 by then).
    """
    import json
    if not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF is supported")
    max_size_bytes = max_file_size_mb * 1024 * 1024
    # The temp file outlives this function: the generator below reads it
    # lazily and is responsible for deleting it in its finally block.
    fd, temp_path = tempfile.mkstemp(suffix=".pdf")
    os.close(fd)
    # Upload to disk in 5MB chunks so the request body never sits in memory
    try:
        total_size = 0
        with open(temp_path, "wb") as buffer:
            chunk_size = 5 * 1024 * 1024
            while content := await file.read(chunk_size):
                total_size += len(content)
                if total_size > max_size_bytes:
                    remove_file(temp_path)
                    raise HTTPException(
                        status_code=413, detail=f"File too large")
                buffer.write(content)
    except Exception as e:
        # Any upload failure (including the HTTPException above): drop the
        # temp file and re-raise; remove_file tolerates a missing path.
        remove_file(temp_path)
        raise

    async def generate_parts():
        # NDJSON generator — runs lazily when the response body is consumed.
        doc = None
        try:
            doc = fitz.open(temp_path)
            # Send status
            yield json.dumps({
                "type": "status",
                "status": "processing",
                "total_pages": doc.page_count,
                "filename": file.filename
            }) + "\n"
            # Detect type (text layer vs scanned images)
            is_image_pdf, _ = is_image_based_pdf(doc)
            # Extract an invoice number (or None) for every page
            page_invoice_nos = []
            for i in range(doc.page_count):
                page = doc.load_page(i)
                inv = extract_invoice_no_from_page(page, is_image_pdf)
                page_invoice_nos.append(inv)
                page = None
                if i % 100 == 0:
                    gc.collect()  # keep memory bounded on very large PDFs
            # Filter & group: "GST:<value>" markers must not start a new
            # invoice; consecutive pages share a group until the number changes
            clean_invs = [None if (v and v.upper().startswith(
                "GST:")) else v for v in page_invoice_nos]
            groups = []
            current_group = []
            current_inv = None
            for idx, inv in enumerate(clean_invs):
                if current_inv is None:
                    current_inv = inv
                    current_group = [idx]
                else:
                    if inv is not None and inv != current_inv:
                        groups.append(
                            {"invoice_no": current_inv, "pages": current_group})
                        current_inv = inv
                        current_group = [idx]
                    else:
                        current_group.append(idx)
            if current_group:
                groups.append(
                    {"invoice_no": current_inv, "pages": current_group})
            # Leading pages with no detected number belong to the first invoice
            if len(groups) > 1 and groups[0]["invoice_no"] is None and groups[1]["invoice_no"] is not None:
                groups[1]["pages"] = groups[0]["pages"] + groups[1]["pages"]
                groups.pop(0)
            # Stream each part as its own NDJSON line (pages reported 1-based)
            for idx, g in enumerate(groups):
                part_bytes = build_pdf_from_pages(doc, g["pages"])
                info = {
                    "type": "part",
                    "part_index": idx,
                    "invoice_no": g["invoice_no"],
                    "pages": [p + 1 for p in g["pages"]],
                    "num_pages": len(g["pages"]),
                    "size_bytes": len(part_bytes),
                    "pdf_base64": base64.b64encode(part_bytes).decode("ascii")
                }
                yield json.dumps(info) + "\n"
                del part_bytes
                gc.collect()
            # Complete
            yield json.dumps({
                "type": "complete",
                "total_parts": len(groups)
            }) + "\n"
        except Exception as e:
            # Report the error in-band — headers were already sent as 200
            yield json.dumps({"type": "error", "error": str(e)}) + "\n"
        finally:
            if doc:
                doc.close()
            remove_file(temp_path)
            gc.collect()
    # NOTE(review): the filename "invoices-split. ndjson" contains a stray
    # space — likely an extraction artifact; intended "invoices-split.ndjson".
    # Left as-is here since this is runtime-visible text; confirm and fix.
    return StreamingResponse(
        generate_parts(),
        media_type="application/x-ndjson",
        headers={
            "Content-Disposition": f"attachment; filename=invoices-split. ndjson"}
    )
@app.get("/health")
async def health_check():
    """Liveness probe: reports Gemini configuration state and size limits."""
    return {
        "status": "healthy",
        "gemini_flash": "configured" if get_gemini_model() else "not configured",
        "gemini_available": GEMINI_AVAILABLE,
        "max_upload_mb": 200,
        "max_response_mb": MAX_RESPONSE_SIZE_MB,
    }
if __name__ == "__main__":
    # Dev/standalone entry point; port 7860 is the Hugging Face Spaces default.
    import uvicorn
    print("🚀 Starting Invoice Splitter API")
    print(f" Max upload: 200MB")
    print(f" Max response: {MAX_RESPONSE_SIZE_MB}MB")
    # Single worker with a long keep-alive (300s) so slow, very large uploads
    # are not dropped mid-transfer; concurrency capped at 10 requests.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        workers=1,
        timeout_keep_alive=300,
        limit_concurrency=10
    )