| |
| |
| |
| from PIL import Image |
# Compatibility shim: when this Pillow build exposes no ``Image.ANTIALIAS``,
# alias it to the LANCZOS resampling filter so code that still references the
# old constant keeps working.
if getattr(Image, "ANTIALIAS", None) is None:
    Image.ANTIALIAS = Image.Resampling.LANCZOS
|
|
| |
| |
| |
| import gradio as gr |
| import easyocr |
| import fitz |
| import numpy as np |
| from fastapi import FastAPI, UploadFile, File, HTTPException |
| from fastapi.responses import JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from PIL import Image as PILImage |
| import re |
| import os |
| import tempfile |
|
|
|
|
| |
| |
| |
# Build the shared EasyOCR reader once at import time (English only, CPU).
# The progress messages bracket the load so it is visible in the server log.
print("Loading EasyOCR model...")
reader = easyocr.Reader(lang_list=["en"], gpu=False)
print("Model loaded successfully!")
|
|
|
|
| |
| |
| |
def pdf_to_images(pdf_path):
    """Render every page of a PDF to an RGB PIL image at 200 dpi.

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        A list with one PIL image per page, in page order. If anything fails
        while opening or rendering, the error is printed and an empty list is
        returned (pages rendered before the failure are discarded).
    """
    try:
        # The context manager closes the document even when rendering raises;
        # previously the handle was never closed.
        with fitz.open(pdf_path) as doc:
            pixmaps = (page.get_pixmap(dpi=200) for page in doc)
            return [
                PILImage.frombytes("RGB", (pix.width, pix.height), pix.samples)
                for pix in pixmaps
            ]
    except Exception as e:
        # Best-effort by design: callers treat "no pages" as "nothing to OCR".
        print("PDF ERROR:", e)
        return []
|
|
|
|
| |
| |
| |
# Patterns used by extract_items, compiled once at import time.
_SERIAL_RE = re.compile(r"^\d+\s+")
_HSN_RE = re.compile(r"\b\d{6,8}\b")
# Accepts ungrouped ("1500.00"), western ("1,500.00") and Indian lakh
# ("1,25,000.00") grouping. The previous pattern only understood 3-digit
# groups and split "1500.00" into "150" and "0.00".
_AMOUNT_RE = re.compile(r"\d+(?:,\d{2,3})*(?:\.\d+)?")
_QTY_RE = re.compile(
    r"(\d+(?:\.\d{1,2})?)\s*(NOS|PCS|PC|KG|KILO|LTR|LITRE|MTR|METRE|BOX|SET|UNIT|EA|EACH)",
    re.I,
)
_TABLE_HEADER_RE = re.compile(r"Description|Goods|HSN|Quantity|Rate|Amount", re.I)
_ITEM_END_RE = re.compile(r"^(Total|Sub|CGST|SGST|IGST|Round)", re.I)
_PART_NO_RE = re.compile(r"\b([A-Z0-9]{6,}(?:-[A-Z0-9]+)?)\b")
_TEN_DIGITS_RE = re.compile(r"\d{10}")
_PERCENT_RE = re.compile(r"(\d{1,2})\s*%")
_DISCOUNT_WORD_RE = re.compile(r"Disc|Discount", re.I)
_DISCOUNT_PCT_RE = re.compile(r"(\d+(?:\.\d{2})?)\s*%")
_SPLIT_GST_RE = re.compile(r"CGST|SGST", re.I)
_RATE_LABEL_RE = re.compile(r"Rate(?!\s*%)", re.I)
_RATE_WORD_RE = re.compile(r"Rate", re.I)
_TAXABLE_LABEL_RE = re.compile(r"Taxable|Value", re.I)
_GST_AMOUNT_LABEL_RE = re.compile(r"(CGST|SGST|IGST).*Amount", re.I)


def _last_amount(line):
    """Return the last amount on *line* without thousands separators, or ""."""
    found = _AMOUNT_RE.findall(line)
    return found[-1].replace(",", "") if found else ""


def _scan_item_details(lines, start, stop):
    """Collect the detail fields of one item from ``lines[start:stop]``.

    Scanning stops early at the next serial-numbered line or at a line that
    starts with a totals/tax label. Each field keeps the first value found.
    """
    hsn = part_no = quantity = uom = ""
    rate = taxable = gst_pct = gst_amount = ""
    disc_pct = disc_amount = ""

    for line in lines[start:stop]:
        if _SERIAL_RE.match(line) or _ITEM_END_RE.match(line):
            break

        on_discount_line = _DISCOUNT_WORD_RE.search(line) is not None

        if not hsn:
            m = _HSN_RE.search(line)
            if m:
                code = m.group()
                # A code repeated on the same line is not accepted as the HSN.
                if not re.search(rf"\b{code}\b.*\b{code}\b", line):
                    hsn = code

        if not part_no:
            m = _PART_NO_RE.search(line)
            # Skip the HSN itself and bare 10-digit numbers.
            if m and m.group(1) != hsn and not _TEN_DIGITS_RE.fullmatch(m.group(1)):
                part_no = m.group(1)

        if not quantity:
            m = _QTY_RE.search(line)
            if m:
                quantity, uom = m.group(1), m.group(2)

        if not gst_pct and not on_discount_line:
            m = _PERCENT_RE.search(line)
            if m:
                pct = m.group(1)
                # A CGST/SGST percentage is doubled to give the combined rate.
                if _SPLIT_GST_RE.search(line):
                    pct = str(int(pct) * 2)
                gst_pct = pct + "%"

        if not disc_amount and not disc_pct and on_discount_line:
            m = _DISCOUNT_PCT_RE.search(line)
            if m:
                disc_pct = m.group(1) + "%"
            else:
                m = _AMOUNT_RE.search(line)
                if m:
                    disc_amount = m.group()

        if not rate and _RATE_LABEL_RE.search(line):
            rate = _last_amount(line)

        if not taxable and _TAXABLE_LABEL_RE.search(line) and not _RATE_WORD_RE.search(line):
            taxable = _last_amount(line)

        if not gst_amount and _GST_AMOUNT_LABEL_RE.search(line):
            gst_amount = _last_amount(line)

    # Derive whichever of rate / taxable value is missing from the other.
    if quantity:
        try:
            if not rate and taxable:
                rate = str(round(float(taxable) / float(quantity), 2))
            elif not taxable and rate:
                taxable = str(round(float(rate) * float(quantity), 2))
        except (ValueError, ZeroDivisionError):
            # Unparseable number or zero quantity: leave the field blank.
            pass

    return {
        "Part Number": part_no,
        "HSN/SAC": hsn,
        "Quantity": quantity,
        "UOM": uom,
        "Rate (Excl. Tax)": rate,
        "Taxable Value": taxable,
        "GST Percentage": gst_pct,
        "GST Amount": gst_amount,
        "Discount Percentage": disc_pct,
        "Discount Amount": disc_amount,
    }


def extract_items(text):
    """Extract invoice line items from OCR text.

    A line that starts with digits followed by whitespace is treated as the
    start of an item: the digits are the serial and the rest of the line is
    the product name. Up to 19 following lines are scanned for HSN, part
    number, quantity/UOM, rate, taxable value, GST and discount.

    Args:
        text: Newline-separated OCR output.

    Returns:
        A list of dicts, one per item, with the keys "Serial", "Product Name",
        "Part Number", "HSN/SAC", "Quantity", "UOM", "Rate (Excl. Tax)",
        "Taxable Value", "GST Percentage", "GST Amount",
        "Discount Percentage" and "Discount Amount". Values are strings;
        fields that were not found are "".
    """
    lines = [ln.strip() for ln in text.split("\n") if ln.strip()]
    total = len(lines)
    items = []

    for idx, line in enumerate(lines):
        serial_match = _SERIAL_RE.match(line)
        if not serial_match:
            continue

        try:
            remaining = line[serial_match.end():].strip()

            # A numbered line holding column titles is the table header.
            if _TABLE_HEADER_RE.search(remaining):
                continue

            product_name = re.sub(r"\s+", " ", remaining).strip()
            details = _scan_item_details(lines, idx + 1, min(idx + 20, total))

            if product_name or details["HSN/SAC"] or details["Quantity"]:
                items.append({
                    "Serial": serial_match.group().strip(),
                    "Product Name": product_name,
                    **details,
                })
        except Exception as e:
            # Best-effort: one malformed item must not abort the whole invoice.
            print(f"Item extraction error at line {idx}: {e}")

    return items
|
|
|
|
| |
| |
| |
# A GSTIN is 15 characters: 2 digits, 5 letters, 4 digits, 1 letter and
# 3 trailing alphanumerics. The previous pattern required 16 characters, so
# it could never match a valid GSTIN.
_GSTIN_RE = re.compile(r"\b\d{2}[A-Z]{5}\d{4}[A-Z][A-Z\d]{3}\b")
_PHONE_RE = re.compile(r"(?:(?:\+91|0)?[\s-]?)?[6-9]\d{9}")
# "PAN" is matched as a whole word so that names containing "Company" are
# not rejected as vendor-name candidates.
_VENDOR_NAME_SKIP_RE = re.compile(r"GSTIN|GST|\bPAN\b|ADDRESS|PHONE|EMAIL|^\d+$", re.I)
_VENDOR_FALLBACK_SKIP_RE = re.compile(r"INVOICE|ORIGINAL|DUPLICATE", re.I)
_BUYER_NAME_SKIP_RE = re.compile(
    r"GSTIN|GST|STATE|CODE|ADDRESS|PHONE|GROUND|FLOOR|KHATA|PLOT|OPPOSITE|UNIT|DATED|PLACE",
    re.I,
)
_BUYER_GSTIN_KEYWORDS = ("BUYER", "BILL TO")
_BUYER_NAME_KEYWORDS = ("BUYER", "BILL TO", "BILLED TO", "CUSTOMER", "CONSIGNEE", "SHIP TO")
_INVOICE_NO_KEYWORDS = ("INVOICE NO", "INVOICE NUMBER", "INV NO", "BILL NO", "BILL NUMBER")
_INVOICE_NO_SKIP_WORDS = ("KHATA", "PLOT", "POST", "LANE", "STATE", "DATED", "ORIGINAL", "DUPLICATE")
_DATE_RES = (
    re.compile(r"\b\d{1,2}-[A-Za-z]{3}-\d{2,4}\b"),
    re.compile(r"\b\d{1,2}[-/]\d{1,2}[-/]\d{2,4}\b"),
    re.compile(r"\b\d{4}[-/]\d{1,2}[-/]\d{1,2}\b"),
)
_UPPER_RUN_RE = re.compile(r"[A-Z]{2,}")


def _has_gstin_label(line):
    """Return True when *line* mentions GSTIN or UIN (case-insensitive)."""
    upper = line.upper()
    return "GSTIN" in upper or "UIN" in upper


def _find_vendor_name(lines):
    """Return the first plausible name after the first INVOICE line, else from the top 5 lines."""
    for idx, line in enumerate(lines):
        if "INVOICE" in line.upper():
            for cand in lines[idx + 1:idx + 5]:
                if len(cand) > 3 and not _VENDOR_NAME_SKIP_RE.search(cand):
                    return cand
            break  # only the first INVOICE line is considered

    for line in lines[:5]:
        if len(line) > 3 and not _VENDOR_FALLBACK_SKIP_RE.search(line):
            return line
    return ""


def _find_vendor_gstin(lines):
    """Return the first GSTIN within 3 lines of a GSTIN/UIN label in the top half."""
    for idx, line in enumerate(lines[:len(lines) // 2]):
        if _has_gstin_label(line):
            for cand in lines[idx:idx + 3]:
                m = _GSTIN_RE.search(cand)
                if m:
                    return m.group()
    return ""


def _find_buyer_gstin(lines, vendor_gstin):
    """Return the GSTIN near the first buyer heading that differs from *vendor_gstin*."""
    for idx, line in enumerate(lines):
        upper = line.upper()
        if not any(kw in upper for kw in _BUYER_GSTIN_KEYWORDS):
            continue
        # Only the first buyer heading and the first GSTIN label after it count.
        for j in range(idx, min(idx + 10, len(lines))):
            if _has_gstin_label(lines[j]):
                for cand in lines[j:j + 3]:
                    m = _GSTIN_RE.search(cand)
                    if m and m.group() != vendor_gstin:
                        return m.group()
                return ""
        return ""
    return ""


def _find_phones(text):
    """Return every phone-number match in *text* without leading whitespace or hyphens."""
    return [re.sub(r"^[\s-]+", "", raw) for raw in _PHONE_RE.findall(text)]


def _find_buyer_name(lines):
    """Return the first name-like line within 9 lines after the first buyer heading."""
    for idx, line in enumerate(lines):
        upper = line.upper()
        if not any(kw in upper for kw in _BUYER_NAME_KEYWORDS):
            continue
        for cand in lines[idx + 1:idx + 10]:
            if (3 < len(cand) < 50
                    and not _BUYER_NAME_SKIP_RE.search(cand)
                    and re.search(r"[A-Z]", cand)):
                return cand
        return ""
    return ""


def _has_invoice_skip_word(value):
    """Return True when *value* contains a word that rules it out as an invoice number."""
    upper = value.upper()
    return any(word in upper for word in _INVOICE_NO_SKIP_WORDS)


def _find_invoice_number(lines):
    """Return the invoice number after a colon on the label line, or from the next 4 lines."""
    for idx, line in enumerate(lines):
        upper = line.upper()
        if not any(kw in upper for kw in _INVOICE_NO_KEYWORDS):
            continue

        parts = line.split(":")
        if len(parts) > 1:
            inline = parts[1].strip()
            if inline and not _has_invoice_skip_word(inline):
                return inline

        for cand in lines[idx + 1:idx + 5]:
            if (not _has_invoice_skip_word(cand)
                    and len(cand) > 2
                    and re.search(r"[A-Z0-9]", cand)):
                return cand
        return ""
    return ""


def _first_date(line, accept):
    """Return the first date match on *line* that satisfies *accept*, or ""."""
    for pattern in _DATE_RES:
        m = pattern.search(line)
        if m and accept(m.group(0)):
            return m.group(0)
    return ""


def _find_invoice_date(lines):
    """Return the date on the first DATE line, or on one of the 3 lines after it."""
    for idx, line in enumerate(lines):
        # Every date keyword ("DATE", "DATED", "INVOICE DATE", "BILL DATE")
        # contains "DATE", so a single substring test covers them all.
        if "DATE" not in line.upper():
            continue

        # NOTE(review): on the label line, dates with "/" in their first five
        # characters are rejected, and in both places dates containing two or
        # more consecutive capitals (e.g. "12-JAN-2024") are rejected. Kept
        # as-is; confirm whether this is intended.
        same_line = _first_date(
            line,
            lambda d: not _UPPER_RUN_RE.search(d) and "/" not in d[:5],
        )
        if same_line:
            return same_line

        for following in lines[idx + 1:idx + 4]:
            found = _first_date(
                following,
                lambda d: not _UPPER_RUN_RE.search(d) and len(d) < 15,
            )
            if found:
                return found
        return ""
    return ""


def extract_fields(text):
    """Extract header fields and line items from invoice OCR text.

    Args:
        text: Newline-separated OCR output.

    Returns:
        A dict with the string fields "Vendor Name", "Vendor GSTIN",
        "Vendor Contact", "Buyer Name", "Buyer GSTIN", "Buyer Contact",
        "Invoice Number" and "Invoice Date" ("" when not found), plus
        "Items", the list returned by ``extract_items(text)``.
    """
    lines = [ln.strip() for ln in text.split("\n") if ln.strip()]
    count = len(lines)

    vendor_gstin = _find_vendor_gstin(lines)

    # Vendor phone: first number in the top half of the document.
    vendor_phones = _find_phones("\n".join(lines[:count // 2]))
    vendor_contact = vendor_phones[0] if vendor_phones else ""

    # Buyer phone: first number in the middle half that is not the vendor's.
    # Both sides are normalised by _find_phones, so a leading space or newline
    # captured by the pattern can no longer let the vendor number through.
    middle_phones = _find_phones("\n".join(lines[count // 4:3 * count // 4]))
    buyer_contact = next((p for p in middle_phones if p != vendor_contact), "")

    return {
        "Vendor Name": _find_vendor_name(lines),
        "Vendor GSTIN": vendor_gstin,
        "Vendor Contact": vendor_contact,
        "Buyer Name": _find_buyer_name(lines),
        "Buyer GSTIN": _find_buyer_gstin(lines, vendor_gstin),
        "Buyer Contact": buyer_contact,
        "Invoice Number": _find_invoice_number(lines),
        "Invoice Date": _find_invoice_date(lines),
        "Items": extract_items(text),
    }
|
|
|
|
| |
| |
| |
def run_ocr(file_path):
    """Run OCR on a PDF or image file and parse the recognised text.

    Files whose name ends in ".pdf" (case-insensitive) are rendered page by
    page; anything else is opened as a single image.

    Args:
        file_path: Path of the file to process.

    Returns:
        A ``(full_text, fields)`` tuple, where ``fields`` is the result of
        ``extract_fields(full_text)``. On any error the tuple is
        ``("Error processing file: <message>", {})``.
    """
    try:
        if file_path.lower().endswith(".pdf"):
            # One text chunk per page, each terminated by a newline.
            full_text = "".join(
                "\n".join(reader.readtext(np.array(page_img), detail=0)) + "\n"
                for page_img in pdf_to_images(file_path)
            )
        else:
            # convert() returns an independent image, so the source file can
            # be closed straight away instead of staying open.
            with PILImage.open(file_path) as src:
                rgb = src.convert("RGB")
            full_text = "\n".join(reader.readtext(np.array(rgb), detail=0))

        return full_text, extract_fields(full_text)

    except Exception as e:
        # Callers detect failure through the empty fields dict.
        return f"Error processing file: {str(e)}", {}
|
|
|
|
| |
| |
| |
app = FastAPI(title="Invoice OCR API", version="1.0")

# Open CORS policy so browser clients on any origin can call the API.
# Credentials are disabled: wildcard origins combined with
# allow_credentials=True would let any site send credentialed requests, and
# nothing in this file uses cookies or authentication.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
@app.get("/")
async def root():
    """Return a short description of the API and its endpoints."""
    endpoints = {
        "POST /api/extract": "Extract data from invoice (PDF/Image)",
        "GET /docs": "API Documentation",
    }
    return {"message": "Invoice OCR API", "endpoints": endpoints}
|
|
|
|
@app.post("/api/extract")
async def extract_api(file: UploadFile = File(...)):
    """Run OCR on an uploaded invoice and return the text and parsed fields.

    Args:
        file: Uploaded PDF, JPEG or PNG.

    Returns:
        A JSON response with "success", "filename", "text" and "fields".
        "success" is False when OCR failed; "text" then carries the error
        message and "fields" is empty.

    Raises:
        HTTPException: 400 for an unsupported content type, 500 when the
            upload cannot be stored or processed.
    """
    suffix_by_type = {
        "application/pdf": ".pdf",
        "image/jpeg": ".jpg",
        "image/png": ".png",
        "image/jpg": ".jpg",
    }
    if file.content_type not in suffix_by_type:
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Allowed: PDF, JPEG, PNG",
        )

    # run_ocr picks the PDF or image path from the file extension, so fall
    # back to the validated content type when the upload has no usable name.
    suffix = os.path.splitext(file.filename or "")[1] or suffix_by_type[file.content_type]

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            tmp.write(await file.read())

        full_text, fields = run_ocr(tmp_path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Remove the temp file on every path, including failures.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)

    return JSONResponse({
        # run_ocr returns an empty dict only when processing failed.
        "success": bool(fields),
        "filename": file.filename,
        "text": full_text,
        "fields": fields,
    })
|
|
|
|
| |
| |
| |
def process_invoice(file):
    """Gradio callback: run OCR on the uploaded invoice.

    Args:
        file: The upload, either as a path string or as an object whose
            ``name`` attribute is the path; ``None`` when nothing was
            uploaded.

    Returns:
        The ``(full_text, fields)`` tuple from ``run_ocr``, or
        ``("No file uploaded", {})`` when *file* is ``None``.
    """
    if file is None:
        return "No file uploaded", {}

    # A plain path string has no ``.name``; accept both forms.
    path = file if isinstance(file, str) else file.name
    return run_ocr(path)
|
|
|
|
# Gradio UI: one file upload in, recognised text and parsed fields out.
_invoice_upload = gr.File(type="filepath", label="Upload Invoice (PDF/Image)")
_result_views = [
    gr.Textbox(label="Extracted Text", lines=10),
    gr.JSON(label="Extracted Fields"),
]

demo = gr.Interface(
    title="📄 Invoice OCR Extractor",
    description="Upload PDF or Image invoices to extract text and structured data using EasyOCR",
    fn=process_invoice,
    inputs=_invoice_upload,
    outputs=_result_views,
    examples=None,
    cache_examples=False,
)

# Serve the Gradio UI from the same FastAPI application.
# NOTE(review): the app already registers GET "/" before this mount, so that
# route presumably answers "/" ahead of the Gradio page — confirm which one
# is meant to be served there.
app = gr.mount_gradio_app(app, demo, path="/")
|
|
|
|
| |
| |
| |
# Script entry point: serve the combined API + UI on all interfaces, port 7860.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, port=7860, host="0.0.0.0")