Spaces:
Running
Running
| import os | |
| import io | |
| import re | |
| import base64 | |
| import time | |
| import datetime | |
| import shutil | |
| import tempfile | |
| import gc | |
| from typing import List, Dict, Optional, Tuple | |
| from collections import deque | |
| from pathlib import Path | |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from starlette.requests import Request | |
| import fitz # PyMuPDF | |
| # Google Gemini - optional import | |
| try: | |
| import google.generativeai as genai | |
| from PIL import Image | |
| GEMINI_AVAILABLE = True | |
| except ImportError: | |
| GEMINI_AVAILABLE = False | |
| print("Warning: google-generativeai not installed. Image-based PDFs won't be supported.") | |
# FastAPI application instance for the invoice splitter service.
app = FastAPI(title="Invoice Splitter API")

# Intended to raise the maximum request body size to 200MB.
# NOTE(review): this only assigns a class attribute on starlette's Request;
# nothing visible in this file reads it and it does not look like a documented
# Starlette/FastAPI setting -- confirm it has any effect. The upload handlers
# further down enforce their own byte limit while streaming the file to disk.
Request.max_body_size = 200 * 1024 * 1024  # 200MB limit

# CORS: accept any origin, method and header, with credentials allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Google Gemini Configuration ---
# API key comes from the environment; an empty string is treated as
# "not configured" by get_gemini_model().
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")

# Model fallback list (in priority order).
# "name" is passed to genai.GenerativeModel and "max_requests_per_minute"
# sizes the SimpleRateLimiter for that model. "timeout" and "description"
# are not read anywhere in the visible part of this file.
GEMINI_MODELS = [
    {
        "name": "gemini-1.5-flash",  # UPDATED: Current standard fast model
        "max_requests_per_minute": 15,
        "timeout": 300,
        "description": "Primary fast model"
    },
    {
        "name": "gemini-2.0-flash-exp",  # Fallback experimental
        "max_requests_per_minute": 10,
        "timeout": 300,
        "description": "Experimental fallback"
    },
    {
        "name": "gemini-1.5-pro",  # Slower fallback
        "max_requests_per_minute": 2,
        "timeout": 300,
        "description": "Pro fallback (slower)"
    }
]

# Mutable module state shared by the model/quota helpers below.
# Index into GEMINI_MODELS of the model currently in use.
current_model_index = 0
# Cached genai.GenerativeModel; created on demand by get_gemini_model().
gemini_model = None
# datetime of the last daily-quota reset seen by check_daily_quota(); None until its first call.
last_quota_reset = None
# Set by mark_daily_quota_exhausted(); cleared by check_daily_quota() on a new day.
daily_quota_exhausted = False
# --- Rate Limiter Class ---
class SimpleRateLimiter:
    """Sliding-window limiter that remembers the timestamps of granted requests.

    At most ``max_requests`` requests are granted within any
    ``window_seconds`` span. ``quota_error_count`` is a plain counter that
    callers bump through :meth:`record_quota_error`.
    """

    def __init__(self, max_requests=10, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()  # time.time() stamps of granted requests, oldest first
        self.quota_error_count = 0

    def allow_request(self):
        """Grant and record a request if the window has room; return True/False."""
        now = time.time()
        stamps = self.requests
        # Forget grants that have slid out of the window.
        while stamps and stamps[0] < now - self.window_seconds:
            stamps.popleft()
        if len(stamps) >= self.max_requests:
            return False
        stamps.append(now)
        return True

    def wait_time(self):
        """Return the seconds until the oldest recorded grant leaves the window (0 if none)."""
        if self.requests:
            age_of_oldest = time.time() - self.requests[0]
            return max(0, self.window_seconds - age_of_oldest)
        return 0

    def reset(self):
        """Drop all recorded grants and zero the quota-error counter."""
        self.requests.clear()
        self.quota_error_count = 0

    def record_quota_error(self):
        """Count one more quota error."""
        self.quota_error_count += 1
# Module-wide limiter for Gemini calls, sized from the per-minute budget of the
# currently selected model. switch_to_next_model() and reset_to_primary_model()
# replace it with a fresh instance when the active model changes.
gemini_rate_limiter = SimpleRateLimiter(
    max_requests=GEMINI_MODELS[current_model_index]["max_requests_per_minute"],
    window_seconds=60
)
# --- Daily Quota Management ---
def check_daily_quota():
    """Report whether Gemini calls may proceed under the daily quota flag.

    The first call seeds ``last_quota_reset`` and returns True. When the
    local calendar date has moved past the last recorded reset, the exhausted
    flag is cleared, the primary model is re-selected and True is returned.
    Otherwise the result is the inverse of ``daily_quota_exhausted``.
    """
    global last_quota_reset, daily_quota_exhausted

    current = datetime.datetime.now()
    never_checked = last_quota_reset is None

    if never_checked or current.date() > last_quota_reset.date():
        if not never_checked:
            print("🔄 Daily quota reset detected")
        last_quota_reset = current
        daily_quota_exhausted = False
        if not never_checked:
            # A new day starts again from the highest-priority model.
            reset_to_primary_model()
        return True

    return not daily_quota_exhausted
def mark_daily_quota_exhausted():
    """Flag the daily quota as used up and log it.

    The flag stays set until check_daily_quota() observes a new calendar day.
    """
    global daily_quota_exhausted
    print("❌ Daily quota exhausted")
    daily_quota_exhausted = True
# --- Model Management ---
def get_gemini_model():
    """Return the cached Gemini model, creating it on first use.

    Returns None when the SDK import failed, no API key is configured, the
    daily quota check fails, or the model cannot be initialised.
    """
    global gemini_model, current_model_index

    if not (GEMINI_AVAILABLE and GEMINI_API_KEY):
        return None
    if not check_daily_quota():
        return None
    if gemini_model is not None:
        return gemini_model

    model_name = GEMINI_MODELS[current_model_index]["name"]
    try:
        genai.configure(api_key=GEMINI_API_KEY)
        gemini_model = genai.GenerativeModel(model_name)
        print(f"✓ Initialized: {model_name}")
    except Exception as e:
        print(f"Failed to initialize {model_name}: {e}")
        return None
    return gemini_model
def switch_to_next_model():
    """Advance to the next entry of GEMINI_MODELS and return its model.

    Installs a fresh rate limiter sized for the new model and drops the
    cached model so get_gemini_model() rebuilds it. Returns None when the
    last model in the list is already active (or initialisation fails).
    """
    global gemini_model, current_model_index, gemini_rate_limiter

    following = current_model_index + 1
    if following >= len(GEMINI_MODELS):
        return None

    current_model_index = following
    fallback = GEMINI_MODELS[following]
    gemini_rate_limiter = SimpleRateLimiter(
        max_requests=fallback["max_requests_per_minute"],
        window_seconds=60,
    )
    gemini_model = None
    print(f"🔄 SWITCHED TO MODEL: {fallback['name']}")
    return get_gemini_model()
def reset_to_primary_model():
    """Go back to GEMINI_MODELS[0] if a fallback model is active.

    Returns True when a switch happened (new limiter installed, cached model
    dropped) and False when the primary model was already selected.
    """
    global gemini_model, current_model_index, gemini_rate_limiter

    if current_model_index == 0:
        return False

    current_model_index = 0
    primary = GEMINI_MODELS[0]
    gemini_rate_limiter = SimpleRateLimiter(
        max_requests=primary["max_requests_per_minute"],
        window_seconds=60,
    )
    gemini_model = None
    return True
# --- Regex Patterns ---
# A label ("Invoice No.", "Inv. No", "Bill No", "Document No", "Doc No",
# "Tax Invoice No") followed by an optional ':' or '-' and a value of at
# least four characters; group 1 is the value. re.VERBOSE is kept from the
# original definition; the pattern contains no significant whitespace.
INVOICE_NO_RE = re.compile(
    r"""(?:Invoice\s*No\.?|Inv\.?\s*No\.?|Bill\s*No\.?|Document\s*No\.?|Doc\s*No\.?|Tax\s*Invoice\s*No\.?)\s*[:\-]?\s*([A-Z0-9][A-Z0-9\-\/]{3,})""",
    re.IGNORECASE | re.VERBOSE
)

# Unlabelled identifiers such as "AB-12345" or "INV/2024/7": 2-4 capital
# letters, '-' or '/', at least four digits, optional "/digits" and letters.
PREFIXED_INVOICE_RE = re.compile(r"\b([A-Z]{2,4}[-/]\d{4,}(?:/\d+)?[A-Z]*)\b")

# GST label followed by a 15-character alphanumeric value; group 2 is the value.
# This pattern is NOT verbose, so it must not contain stray spaces: a literal
# space before "GSTIN" would become part of the match.
GST_LIKE_RE = re.compile(r"\b((?:GSTIN|GST\s*No\.?|GST\s*IN|GST)[\s:\-]*([0-9A-Z]{15}))\b", re.IGNORECASE)
def is_image_based_pdf(doc: "fitz.Document", sample_pages: int = 3) -> Tuple[bool, float]:
    """Guess whether a PDF consists of scanned images rather than real text.

    Args:
        doc: Open document; only ``page_count`` and ``load_page`` are used.
        sample_pages: Maximum number of leading pages to inspect.

    Returns:
        ``(is_image_based, avg_chars)`` where ``avg_chars`` is the mean
        stripped text length over the sampled pages and ``is_image_based``
        is True when that mean is below 50 characters. When there is nothing
        to sample (empty document or ``sample_pages <= 0``) the result is
        ``(True, 0.0)`` instead of a ZeroDivisionError.
    """
    pages_to_check = min(sample_pages, doc.page_count)
    if pages_to_check <= 0:
        # No pages means no text: report "image-based" with a zero average.
        return True, 0.0

    total_text_length = sum(
        len((doc.load_page(i).get_text("text") or "").strip())
        for i in range(pages_to_check)
    )
    avg_text_length = total_text_length / pages_to_check
    return avg_text_length < 50, avg_text_length
# --- Extraction Logic ---
def normalize_text_for_search(s: str) -> str:
    """Flatten text onto one line so the invoice regexes can scan it.

    Non-breaking spaces become ordinary spaces, every run of spaces, tabs,
    carriage returns and newlines collapses to one space, and the result is
    stripped. Falsy input (``None`` or ``""``) is returned unchanged.
    """
    if not s:
        return s
    plain_spaces = s.replace("\u00A0", " ")
    single_spaced = re.sub(r"[ \r\n\t]+", " ", plain_spaces)
    return single_spaced.strip()
def try_extract_invoice_from_text(text: str) -> Optional[str]:
    """Pull an invoice identifier out of free text, or return None.

    Three strategies are tried in order on the normalised text:
      1. a labelled number matched by INVOICE_NO_RE;
      2. a prefixed identifier (PREFIXED_INVOICE_RE) within the first 600
         characters, holding at least five alphanumerics;
      3. a 15-character GST value, returned as ``"GST:<value>"``.
    """
    if not text:
        return None

    haystack = normalize_text_for_search(text)

    labelled = INVOICE_NO_RE.search(haystack)
    if labelled:
        candidate = (labelled.group(1) or "").strip()
        # Reject captures that are just the label word itself.
        if candidate and len(candidate) > 2 and candidate.lower() not in ("invoice", "bill"):
            return candidate

    prefixed = PREFIXED_INVOICE_RE.search(haystack[:600])
    if prefixed:
        candidate = (prefixed.group(1) or "").strip()
        alnum_only = re.sub(r"[^A-Za-z0-9]", "", candidate)
        if candidate and len(alnum_only) >= 5:
            return candidate

    gst_match = GST_LIKE_RE.search(haystack)
    if gst_match:
        gst_val = gst_match.group(2).replace(" ", "").strip().upper()
        if len(gst_val) == 15:
            return f"GST:{gst_val}"

    return None
def extract_invoice_gemini(page: fitz.Page, retry_count=0) -> Optional[str]:
    """Ask Gemini to read the invoice number from a rendered page image.

    Args:
        page: Page to render and send to the model.
        retry_count: Number of model switches already made for this page;
            bounds the fallback recursion to the length of GEMINI_MODELS.

    Returns:
        The invoice number (or a value recovered from the model's full-text
        transcription), or None when Gemini is unavailable, the quota is
        exhausted, nothing was found, or the call failed.

    NOTE(review): this function blocks with time.sleep() while rate limited;
    callers running on an event loop will stall for that long.
    """
    # Wait for a rate-limiter slot in a loop rather than by recursion, so a
    # long throttled period cannot grow the call stack.
    while True:
        if not check_daily_quota():
            return None
        model = get_gemini_model()
        if not model:
            return None
        if gemini_rate_limiter.allow_request():
            break
        wait_time = gemini_rate_limiter.wait_time()
        print(f" ⏱ Rate limit, waiting {int(wait_time)}s...")
        time.sleep(wait_time + 1)

    try:
        # Render at 150 dpi. PyMuPDF ignores `matrix` whenever `dpi` is given,
        # so only `dpi` is passed.
        pix = page.get_pixmap(dpi=150)
        img_bytes = pix.tobytes("png")
        # Release the pixmap before the PNG is decoded again by PIL.
        pix = None

        # The context manager closes the image even if a Gemini call raises.
        with Image.open(io.BytesIO(img_bytes)) as img:
            prompt = """Extract the invoice number. Return ONLY the number. If not found, return 'NOT_FOUND'."""
            response = model.generate_content([prompt, img])

            result = None
            if response and response.text:
                txt = response.text.strip().replace("*", "").replace("#", "")
                # The prompt shows the sentinel in quotes, so the model may echo
                # it quoted or in a different case; normalise before comparing.
                txt = txt.strip("'\"` ")
                if txt and txt.upper() != "NOT_FOUND" and len(txt) > 2:
                    result = txt

            # Fallback: have the model transcribe the page and run the regexes.
            # NOTE(review): this second call is not counted by the rate limiter.
            if not result:
                ocr_resp = model.generate_content(["Extract all text.", img])
                if ocr_resp and ocr_resp.text:
                    result = try_extract_invoice_from_text(ocr_resp.text)

            return result
    except Exception as e:
        error_str = str(e).lower()
        if "429" in error_str or "quota" in error_str:
            gemini_rate_limiter.record_quota_error()
            if "per_day" in error_str:
                mark_daily_quota_exhausted()
                return None
            # Per-minute quota hit: move down the fallback list and retry once
            # per remaining model.
            if retry_count < len(GEMINI_MODELS) - 1:
                if switch_to_next_model():
                    return extract_invoice_gemini(page, retry_count + 1)
        print(f" ✗ Gemini Error: {e}")
        return None
def extract_invoice_no_from_page(page: fitz.Page, is_image_pdf: bool) -> Optional[str]:
    """Find the invoice number on one page, cheapest method first.

    Order: whole-page text, then each text block separately, then (only for
    PDFs classified as image-based) the Gemini vision fallback.
    """
    # 1. Whole-page text.
    found = try_extract_invoice_from_text(page.get_text("text") or "")
    if found:
        return found

    # 2. Individual blocks; index 4 of a block tuple holds its text.
    for block in page.get_text("blocks") or []:
        if len(block) <= 4 or not block[4]:
            continue
        found = try_extract_invoice_from_text(block[4])
        if found:
            return found

    # 3. Vision model, reserved for scanned documents.
    return extract_invoice_gemini(page) if is_image_pdf else None
def build_pdf_from_pages(src_doc: fitz.Document, page_indices: List[int]) -> bytes:
    """Copy the given 0-based pages of ``src_doc`` into a new PDF and return its bytes.

    Pages are inserted one at a time in the order given; the output is
    serialised with ``garbage=4`` and ``deflate=True``. The temporary
    document is always closed.
    """
    assembled = fitz.open()
    try:
        for page_no in page_indices:
            assembled.insert_pdf(src_doc, from_page=page_no, to_page=page_no)
        return assembled.tobytes(garbage=4, deflate=True)
    finally:
        assembled.close()
# --- File Cleanup Utility ---
def remove_file(path: str):
    """Best-effort deletion of ``path``.

    Does nothing when the path does not exist. Any failure is logged as a
    warning and swallowed, so callers can use this in cleanup code.
    """
    try:
        if not os.path.exists(path):
            return
        os.remove(path)
        print(f"🧹 Cleaned up temp file: {path}")
    except Exception as e:
        print(f"⚠️ Warning: Could not remove temp file {path}: {e}")
# ============================================================================
# API ENDPOINTS
# ============================================================================
# Registering the handler on `app` is what makes it reachable over HTTP.
# NOTE(review): the path "/" is an assumption -- confirm against existing clients.
@app.get("/")
async def root():
    """Return static service information and whether Gemini is usable."""
    return {
        "service": "Invoice Splitter API",
        "version": "2.0",
        "max_file_size_mb": 200,
        "gemini_available": GEMINI_AVAILABLE,
        "gemini_configured": bool(GEMINI_API_KEY)
    }
# Registering the handler on `app` is what makes it reachable over HTTP.
# NOTE(review): the path "/health" is an assumption -- confirm against existing clients.
@app.get("/health")
async def health():
    """Return a liveness payload plus the current Gemini model and quota state."""
    return {
        "status": "healthy",
        "gemini_status": {
            "available": GEMINI_AVAILABLE,
            "configured": bool(GEMINI_API_KEY),
            "current_model": GEMINI_MODELS[current_model_index]["name"],
            "daily_quota_exhausted": daily_quota_exhausted
        }
    }
# Server-side ceiling for uploads accepted by split_invoices, in megabytes.
# The client-supplied max_file_size_mb may lower the limit but never raise it.
_SPLIT_MAX_UPLOAD_MB = 200


def _group_pages_by_invoice(page_invoice_nos):
    """Group consecutive 0-based page indices by detected invoice number.

    ``page_invoice_nos`` holds one entry per page (an invoice string,
    ``"GST:<value>"`` or None). GST-only hits are treated as "no invoice
    number". A new group starts whenever a page carries an invoice number
    different from the current group's; pages without a number stay in the
    current group. Leading pages without a number are kept and merged into
    the first numbered group.

    Returns a list of ``{"invoice_no": ..., "pages": [...]}`` dicts.
    """
    # The extractor emits "GST:<value>" (no space after the colon).
    cleaned = [
        None if (v and v.upper().startswith("GST:")) else v
        for v in page_invoice_nos
    ]

    groups = []
    current_inv = None
    current_pages = []
    for idx, inv in enumerate(cleaned):
        if not current_pages:
            # Very first page. Keyed on "no pages yet" rather than on
            # current_inv being None, so leading number-less pages accumulate
            # instead of overwriting each other.
            current_inv = inv
            current_pages = [idx]
        elif inv is not None and inv != current_inv:
            groups.append({"invoice_no": current_inv, "pages": current_pages})
            current_inv = inv
            current_pages = [idx]
        else:
            current_pages.append(idx)
    if current_pages:
        groups.append({"invoice_no": current_inv, "pages": current_pages})

    # Leading pages without a number belong to the first identified invoice.
    if len(groups) > 1 and groups[0]["invoice_no"] is None and groups[1]["invoice_no"] is not None:
        print(f" 🔗 Merging first {len(groups[0]['pages'])} pages with invoice '{groups[1]['invoice_no']}'")
        groups[1]["pages"] = groups[0]["pages"] + groups[1]["pages"]
        groups.pop(0)
    return groups


# Registering the handler on `app` is what makes it reachable over HTTP.
# NOTE(review): the path "/split-invoices" is an assumption -- confirm against existing clients.
@app.post("/split-invoices")
async def split_invoices(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    include_pdf: bool = Form(True),
    max_file_size_mb: int = Form(200)
):
    """
    Split a large PDF file into separate invoices.

    Parameters:
    - background_tasks: accepted for interface compatibility; not used here
    - file: PDF file to split (max 200MB)
    - include_pdf: Include base64-encoded PDFs in response (default: True)
    - max_file_size_mb: Maximum file size in MB (default: 200, capped at 200)

    Returns:
    - JSON with split invoice parts

    Raises:
    - HTTPException 400 for non-PDF uploads or PDFs without pages
    - HTTPException 413 when the upload exceeds the size limit
    - HTTPException 500 for any other processing failure

    NOTE(review): page extraction and PDF building run synchronously inside
    this coroutine (including time.sleep in the Gemini path), so the event
    loop is blocked while a document is processed.
    """
    # filename can be None for some clients; treat that as "not a PDF".
    filename = file.filename or ""
    if not filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are supported")

    # Never let the client raise the limit above the server-side ceiling.
    limit_mb = min(max_file_size_mb, _SPLIT_MAX_UPLOAD_MB)
    max_size_bytes = limit_mb * 1024 * 1024

    # Create temporary file
    fd, temp_path = tempfile.mkstemp(suffix=".pdf")
    os.close(fd)
    doc = None  # Initialize for finally block
    try:
        # Stream the upload to disk, validating the size as bytes arrive.
        print(f"📥 Receiving file: {filename}")
        total_size = 0
        with open(temp_path, "wb") as buffer:
            chunk_size = 5 * 1024 * 1024  # 5MB chunks
            while content := await file.read(chunk_size):
                total_size += len(content)
                if total_size > max_size_bytes:
                    raise HTTPException(
                        status_code=413,
                        detail=f"File too large. Maximum size: {limit_mb}MB, received: {total_size / (1024*1024):.1f}MB"
                    )
                buffer.write(content)
                # Progress logging roughly every 20MB.
                if total_size % (20 * 1024 * 1024) < chunk_size:
                    print(f" 📊 Uploaded: {total_size / (1024*1024):.1f}MB")

        file_size_mb = total_size / (1024 * 1024)
        print(f"💾 Saved {file_size_mb:.2f}MB to: {temp_path}")

        # Open the PDF from disk instead of holding the upload in memory.
        doc = fitz.open(temp_path)
        if doc.page_count == 0:
            raise HTTPException(status_code=400, detail="PDF file is empty")
        print(f"📄 Processing {doc.page_count} pages...")

        # Step 1: Detect if image-based PDF from the first few pages.
        sample_pages = min(3, doc.page_count)
        is_image_pdf, avg_text = is_image_based_pdf(doc, sample_pages)
        print(f" PDF Type: {'Image-based' if is_image_pdf else 'Text-based'} (avg text: {avg_text:.1f} chars)")

        # Step 2: Extract invoice numbers from all pages.
        page_invoice_nos = []
        for i in range(doc.page_count):
            if i > 0 and i % 50 == 0:
                print(f" 📄 Processed {i}/{doc.page_count} pages")
            page = doc.load_page(i)
            try:
                inv = extract_invoice_no_from_page(page, is_image_pdf)
                page_invoice_nos.append(inv)
                if inv:
                    print(f" Page {i+1}: Found invoice '{inv}'")
            finally:
                # Drop the page reference so its resources can be reclaimed.
                page = None
            if i > 0 and i % 100 == 0:
                gc.collect()
        print(f"✓ Extraction complete. Found {sum(1 for x in page_invoice_nos if x)} invoice numbers")

        # Step 3: Filter GST-only entries and group pages.
        groups = _group_pages_by_invoice(page_invoice_nos)
        print(f"📦 Created {len(groups)} invoice groups")

        # Step 4: Build response with PDFs.
        parts = []
        total_response_size = 0
        max_response_size = 100 * 1024 * 1024  # 100MB response limit
        for idx, g in enumerate(groups):
            print(f" 🔨 Building PDF part {idx+1}/{len(groups)} (Invoice: {g['invoice_no'] or 'Unknown'})")
            part_bytes = build_pdf_from_pages(doc, g["pages"])
            info = {
                "invoice_no": g["invoice_no"],
                "pages": [p + 1 for p in g["pages"]],  # 1-based page numbers
                "page_count": len(g["pages"]),
                "size_bytes": len(part_bytes),
                "size_mb": round(len(part_bytes) / (1024 * 1024), 2)
            }
            # Once the running base64 total passes the cap, this and all
            # later parts are returned without inline PDF data.
            if include_pdf:
                base64_size = len(part_bytes) * 4 / 3  # Base64 encoding overhead
                total_response_size += base64_size
                if total_response_size > max_response_size:
                    print(f" ⚠️ Response size exceeds 100MB. Skipping base64 for remaining parts.")
                    info["pdf_base64"] = None
                    info["warning"] = "PDF too large for inline response. Use streaming endpoint or set include_pdf=false"
                else:
                    info["pdf_base64"] = base64.b64encode(part_bytes).decode("ascii")
            else:
                info["pdf_base64"] = None
            parts.append(info)
            # Free the raw bytes immediately; collect periodically.
            del part_bytes
            if idx % 5 == 0:
                gc.collect()

        print(f"✅ Successfully split into {len(parts)} parts")
        return JSONResponse({
            "success": True,
            "count": len(parts),
            "parts": parts,
            "source_file": {
                "name": filename,
                "size_mb": round(file_size_mb, 2),
                "total_pages": doc.page_count,
                "is_image_pdf": is_image_pdf
            },
            "quota_status": {
                "daily_exhausted": daily_quota_exhausted,
                "current_model": GEMINI_MODELS[current_model_index]["name"]
            }
        })
    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is
    except Exception as e:
        print(f"❌ Critical Error: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") from e
    finally:
        # Close the document before deleting the file it is backed by.
        if doc:
            try:
                doc.close()
                print("📕 Closed PDF document")
            except Exception as e:
                print(f"⚠️ Error closing document: {e}")
        remove_file(temp_path)
        gc.collect()
# Registering the handler on `app` is what makes it reachable over HTTP.
# NOTE(review): the path "/split-invoices-stream" is an assumption -- confirm against existing clients.
@app.post("/split-invoices-stream")
async def split_invoices_stream(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    max_file_size_mb: int = Form(200)
):
    """
    Streaming version for extremely large files.

    Returns NDJSON (newline-delimited JSON): a "status" line, one "part" line
    per invoice (with the PDF base64-encoded), then a "complete" line. A
    failure during processing is reported as an "error" line because the HTTP
    status has already been sent by then. This avoids building a large JSON
    response in memory.

    Parameters:
    - background_tasks: accepted for interface compatibility; not used here
    - file: PDF file to split
    - max_file_size_mb: Maximum file size in MB (default: 200, capped at 200)

    Raises:
    - HTTPException 400 for non-PDF uploads
    - HTTPException 413 when the upload exceeds the size limit
    """
    import json

    # filename can be None for some clients; treat that as "not a PDF".
    filename = file.filename or ""
    if not filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are supported")

    # Never let the client raise the limit above the server-side ceiling.
    hard_limit_mb = 200
    limit_mb = min(max_file_size_mb, hard_limit_mb)
    max_size_bytes = limit_mb * 1024 * 1024

    fd, temp_path = tempfile.mkstemp(suffix=".pdf")
    os.close(fd)

    # Upload file. Cleanup happens in the except clause, after the `with`
    # block has closed the file, rather than while it is still open.
    try:
        total_size = 0
        with open(temp_path, "wb") as buffer:
            chunk_size = 5 * 1024 * 1024
            while content := await file.read(chunk_size):
                total_size += len(content)
                if total_size > max_size_bytes:
                    raise HTTPException(status_code=413, detail=f"File too large. Max: {limit_mb}MB")
                buffer.write(content)
    except Exception:
        remove_file(temp_path)
        raise

    async def generate_parts():
        """Yield NDJSON lines; owns the temp file and document from here on."""
        doc = None
        try:
            doc = fitz.open(temp_path)
            # Send initial status
            yield json.dumps({
                "type": "status",
                "status": "processing",
                "total_pages": doc.page_count,
                "filename": filename
            }) + "\n"

            # Report an empty document explicitly instead of letting the
            # page sampling below fail with "division by zero".
            if doc.page_count == 0:
                raise ValueError("PDF file is empty")

            # Detect PDF type
            is_image_pdf, _ = is_image_based_pdf(doc)

            # Extract invoice numbers
            page_invoice_nos = []
            for i in range(doc.page_count):
                page = doc.load_page(i)
                page_invoice_nos.append(extract_invoice_no_from_page(page, is_image_pdf))
                page = None
                if i % 100 == 0:
                    gc.collect()

            # Group pages. GST-only hits do not identify an invoice.
            clean_invs = [
                None if (v and v.upper().startswith("GST:")) else v
                for v in page_invoice_nos
            ]
            groups = []
            current_group = []
            current_inv = None
            for idx, inv in enumerate(clean_invs):
                if not current_group:
                    # Very first page. Keyed on "no pages yet" rather than on
                    # current_inv being None, so leading number-less pages
                    # accumulate instead of overwriting each other.
                    current_inv = inv
                    current_group = [idx]
                elif inv is not None and inv != current_inv:
                    groups.append({"invoice_no": current_inv, "pages": current_group})
                    current_inv = inv
                    current_group = [idx]
                else:
                    current_group.append(idx)
            if current_group:
                groups.append({"invoice_no": current_inv, "pages": current_group})

            # Leading pages without a number belong to the first identified invoice.
            if len(groups) > 1 and groups[0]["invoice_no"] is None and groups[1]["invoice_no"] is not None:
                groups[1]["pages"] = groups[0]["pages"] + groups[1]["pages"]
                groups.pop(0)

            # Stream each part
            for idx, g in enumerate(groups):
                part_bytes = build_pdf_from_pages(doc, g["pages"])
                info = {
                    "type": "part",
                    "part_index": idx,
                    "invoice_no": g["invoice_no"],
                    "pages": [p + 1 for p in g["pages"]],
                    "page_count": len(g["pages"]),
                    "size_bytes": len(part_bytes),
                    "pdf_base64": base64.b64encode(part_bytes).decode("ascii")
                }
                yield json.dumps(info) + "\n"
                # Release the part before building the next one.
                del part_bytes
                del info
                gc.collect()

            # Send completion status
            yield json.dumps({
                "type": "complete",
                "total_parts": len(groups)
            }) + "\n"
        except Exception as e:
            yield json.dumps({
                "type": "error",
                "error": str(e)
            }) + "\n"
        finally:
            # Close the document before deleting the file it is backed by.
            if doc:
                doc.close()
            remove_file(temp_path)
            gc.collect()

    return StreamingResponse(
        generate_parts(),
        media_type="application/x-ndjson",
        headers={
            "Content-Disposition": "attachment; filename=invoices-split.ndjson"
        }
    )
if __name__ == "__main__":
    # Script entry point: print a short banner, then serve `app` with uvicorn.
    import uvicorn

    banner = (
        "🚀 Starting High-Performance Invoice Splitter API",
        " Max file size: 200MB",
        f" Gemini available: {GEMINI_AVAILABLE}",
        f" Gemini configured: {bool(GEMINI_API_KEY)}",
    )
    for banner_line in banner:
        print(banner_line)

    # Server settings tuned for large uploads.
    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "workers": 1,  # Single worker so the in-process rate limiter state is shared
        "timeout_keep_alive": 300,  # 5 minutes for large uploads
        "limit_concurrency": 10,
        "limit_max_requests": 1000,
    }
    uvicorn.run(app, **server_options)