invoice-processor-ml / src /extraction.py
GSoumyajit2005's picture
feat: PDF preview, database integration, and improved error handling
2a944a5
# src/extraction.py
import re
from typing import List, Dict, Optional, Any
from datetime import datetime
from difflib import SequenceMatcher
def extract_dates(text: str) -> List[str]:
"""
Robust date extraction that handles:
- Numeric formats: DD/MM/YYYY, DD-MM-YYYY, DD.MM.YYYY
- Text month formats: 22 Mar 18, March 22, 2018, 22-Mar-2018
- OCR noise like pipes (|) instead of slashes
Validates using datetime to ensure semantic correctness.
"""
if not text: return []
# Month name mappings
MONTH_MAP = {
'jan': 1, 'january': 1,
'feb': 2, 'february': 2,
'mar': 3, 'march': 3,
'apr': 4, 'april': 4,
'may': 5,
'jun': 6, 'june': 6,
'jul': 7, 'july': 7,
'aug': 8, 'august': 8,
'sep': 9, 'sept': 9, 'september': 9,
'oct': 10, 'october': 10,
'nov': 11, 'november': 11,
'dec': 12, 'december': 12
}
valid_dates = []
# Pattern 1: Numeric dates - DD/MM/YYYY, DD-MM-YYYY, DD.MM.YYYY, DD MM YYYY
# Also handles OCR noise like pipes (|) instead of slashes
numeric_pattern = r'\b(\d{1,2})[\s/|.-](\d{1,2})[\s/|.-](\d{2,4})\b'
for d, m, y in re.findall(numeric_pattern, text):
try:
year = int(y)
if year < 100:
year = 2000 + year if year < 50 else 1900 + year
dt = datetime(year, int(m), int(d))
valid_dates.append(dt.strftime("%d/%m/%Y"))
except ValueError:
continue
# Pattern 2: DD Mon YY/YYYY (e.g., "22 Mar 18", "22-Mar-2018", "22 March 2018")
text_month_pattern1 = r'\b(\d{1,2})[\s/.-]?([A-Za-z]{3,9})[\s/.-]?(\d{2,4})\b'
for d, m, y in re.findall(text_month_pattern1, text, re.IGNORECASE):
month_num = MONTH_MAP.get(m.lower())
if month_num:
try:
year = int(y)
if year < 100:
year = 2000 + year if year < 50 else 1900 + year
dt = datetime(year, month_num, int(d))
valid_dates.append(dt.strftime("%d/%m/%Y"))
except ValueError:
continue
# Pattern 3: Mon DD, YYYY (e.g., "March 22, 2018", "Mar 22 2018")
text_month_pattern2 = r'\b([A-Za-z]{3,9})[\s.-]?(\d{1,2})[,\s.-]+(\d{2,4})\b'
for m, d, y in re.findall(text_month_pattern2, text, re.IGNORECASE):
month_num = MONTH_MAP.get(m.lower())
if month_num:
try:
year = int(y)
if year < 100:
year = 2000 + year if year < 50 else 1900 + year
dt = datetime(year, month_num, int(d))
valid_dates.append(dt.strftime("%d/%m/%Y"))
except ValueError:
continue
# Pattern 4: YYYY-MM-DD (ISO format)
iso_pattern = r'\b(\d{4})[-/](\d{1,2})[-/](\d{1,2})\b'
for y, m, d in re.findall(iso_pattern, text):
try:
dt = datetime(int(y), int(m), int(d))
valid_dates.append(dt.strftime("%d/%m/%Y"))
except ValueError:
continue
return list(dict.fromkeys(valid_dates)) # Deduplicate while preserving order
def extract_amounts(text: str) -> List[float]:
if not text: return []
# Matches: 1,234.56 or 1234.56
pattern = r'\b\d{1,3}(?:,\d{3})*\.\d{2}\b'
amounts_strings = re.findall(pattern, text)
amounts = []
for amt_str in amounts_strings:
amt_cleaned = amt_str.replace(',', '')
try:
amounts.append(float(amt_cleaned))
except ValueError:
continue
return amounts
def extract_total(text: str) -> Optional[float]:
"""
Robust total extraction using keyword confidence + Footer Search.
"""
if not text: return None
# 1. Try specific "Total" keywords first (Highest Confidence)
# Looks for "Total: 123.45" or "Total Amount $123.45"
pattern = r'(?:TOTAL|AMOUNT DUE|GRAND TOTAL|BALANCE|PAYABLE)[\w\s]*[:$]?\s*([\d,]+\.\d{2})'
matches = re.findall(pattern, text, re.IGNORECASE)
if matches:
# Return the last match (often the grand total at bottom)
try:
return float(matches[-1].replace(',', ''))
except ValueError:
pass
# 2. Fallback: Context-Aware Footer Search (Medium Confidence)
# Instead of taking max() of the whole doc (risky), we only look at the bottom 30%
lines = text.split('\n')
if not lines: return None
# Focus on the footer where totals usually live
footer_lines = lines[-int(len(lines)*0.3):]
candidates = []
for line in footer_lines:
line_amounts = extract_amounts(line)
for amt in line_amounts:
# Simple heuristic: Totals are rarely 'years' like 2024 or 2025
if 2000 <= amt <= 2030 and float(amt).is_integer():
continue
candidates.append(amt)
if candidates:
return max(candidates)
return None
def extract_vendor(text: str) -> Optional[str]:
if not text: return None
lines = text.strip().split('\n')
company_suffixes = ['SDN BHD', 'INC', 'LTD', 'LLC', 'PLC', 'CORP', 'PTY', 'PVT', 'LIMITED']
for line in lines[:10]: # Check top 10 lines
line_upper = line.upper()
if any(suffix in line_upper for suffix in company_suffixes):
return line.strip()
# Fallback: Return first non-empty line that isn't a date
for line in lines[:5]:
if len(line.strip()) > 3 and not re.search(r'\d{2}/\d{2}', line):
return line.strip()
return None
def extract_invoice_number(text: str) -> Optional[str]:
if not text: return None
# 1. BLOCK LIST: Words that might be captured as the ID itself by mistake
FORBIDDEN_WORDS = {
'INVOICE', 'TAX', 'RECEIPT', 'BILL', 'NUMBER', 'NO', 'DATE',
'ORIGINAL', 'COPY', 'GST', 'REG', 'MEMBER', 'SLIP', 'TEL', 'FAX'
}
# 2. TOXIC CONTEXTS: If a line contains these, it's likely a Tax ID or Phone #, not an Invoice #
# We skip the line entirely if these are found (unless "INVOICE" is also strictly present)
TOXIC_LINE_INDICATORS = ['GST', 'REG', 'SSM', 'TIN', 'PHONE', 'TEL', 'FAX', 'UBL', 'UEN']
# Strategy 1: Explicit Label Search (High Confidence)
# matches "Invoice No:", "Slip No:", "Bill #:", etc.
# ADDED: 'SLIP' to the valid prefixes
keyword_pattern = r'(?i)(?:TAX\s*)?(?:INVOICE|INV|BILL|RECEIPT|SLIP)\s*(?:NO|NUMBER|#|NUM)\s*[:\.]?\s*([A-Z0-9\-/]+)'
matches = re.findall(keyword_pattern, text)
for match in matches:
clean_match = match.strip()
# Verify length and ensure the match itself isn't a forbidden word
if len(clean_match) >= 3 and clean_match.upper() not in FORBIDDEN_WORDS:
return clean_match
# Strategy 2: Contextual Line Search (Medium Confidence)
# We scan line-by-line for loose patterns like "No: 12345" or "Slip: 555"
lines = text.split('\n')
for line in lines[:25]: # Scan top 25 lines
line_upper = line.upper()
# CRITICAL FIX: Skip lines that look like Tax IDs (GST/REG)
# But allow if the line explicitly says "INVOICE" (e.g. "Tax Invoice / GST Reg No")
if any(bad in line_upper for bad in TOXIC_LINE_INDICATORS) and "INVOICE" not in line_upper:
continue
# Look for Invoice-like keywords (Added SLIP)
# matches " NO", " #", "SLIP"
if any(k in line_upper for k in ['INVOICE', ' NO', ' #', 'INV', 'SLIP', 'BILL']):
# Find candidate tokens: 3+ alphanumeric chars
tokens = re.findall(r'\b[A-Z0-9\-/]{3,}\b', line_upper)
for token in tokens:
if token in FORBIDDEN_WORDS:
continue
# Heuristic: Invoice numbers almost always have digits.
# This filters out purely alpha strings like "CREDIT" or "CASH"
if any(c.isdigit() for c in token):
return token
return None
def extract_bill_to(text: str) -> Optional[Dict[str, str]]:
if not text: return None
# Look for "Bill To" block
match = re.search(r'(?:BILL|BILLED)\s*TO[:\s]+([^\n]+)', text, re.IGNORECASE)
if match:
name = match.group(1).strip()
return {"name": name, "email": None}
return None
def extract_address(text: str, vendor_name: Optional[str] = None) -> Optional[str]:
"""
Generalized Address Extraction using Spatial Heuristics.
Strategy:
1. If Vendor is known, look at the lines immediately FOLLOWING it (Spatial).
2. If Vendor is unknown, look for lines in the top header with 'Address-like' traits
(mix of text + numbers, 3+ words, contains Zip-code-like patterns).
"""
if not text: return None
lines = [line.strip() for line in text.split('\n') if line.strip()]
# --- FILTERS (Generalized) ---
# Skip lines that are clearly NOT addresses
def is_invalid_line(line):
line_upper = line.upper()
# 1. It's a Phone/Fax/Email/URL
if any(x in line_upper for x in ['TEL', 'FAX', 'PHONE', 'EMAIL', '@', 'WWW.', '.COM', 'HTTP']):
return True
# 2. It's a Date
if len(line) < 15 and any(c.isdigit() for c in line) and ('/' in line or '-' in line):
return True
# 3. It's the Vendor name itself (if provided)
if vendor_name and vendor_name.lower() in line.lower():
return True
return False
# --- STRATEGY 1: Contextual Search (Below Vendor) ---
# This is the most accurate method for receipts worldwide.
candidate_lines = []
if vendor_name:
vendor_found = False
# Find where the vendor appears
for i, line in enumerate(lines[:15]): # Check top 15 lines only
if vendor_name.lower() in line.lower() or (len(vendor_name) > 5 and SequenceMatcher(None, vendor_name, line).ratio() > 0.8):
vendor_found = True
# Grab the next 1-3 lines as the potential address block
# We stop if we hit a phone number or blank line
for j in range(1, 4):
if i + j < len(lines):
next_line = lines[i + j]
if not is_invalid_line(next_line):
candidate_lines.append(next_line)
else:
# If we hit a phone number, the address block usually ended
break
break
# If Strategy 1 found something, join it and return
if candidate_lines:
return ", ".join(candidate_lines)
# --- STRATEGY 2: Header Scan (Density Heuristic) ---
# If we couldn't anchor to the vendor, we scan the top 10 lines for "Address-looking" text.
# An address usually has:
# - At least one digit (Building number, Zip code)
# - At least 3 words
# - Is NOT a phone number
#
# CONTIGUITY RULE: Once we start collecting candidates, we STOP at the first
# invalid line (phone/fax/etc). This prevents capturing non-adjacent lines
# like GST numbers that appear after phone numbers.
fallback_candidates = []
started_collecting = False
for line in lines[:10]:
if is_invalid_line(line):
# If we've already started collecting, an invalid line means
# the address block has ended - don't continue past it
if started_collecting:
break
continue
# Check for Address Density:
# 1. Has digits (e.g. "123 Main St" or "Singapore 55123")
has_digits = any(c.isdigit() for c in line)
# 2. Length is substantial (avoid short noise)
is_long_enough = len(line) > 10
# 3. Has spaces (at least 2 spaces => 3 words)
is_multi_word = line.count(' ') >= 2
# FIRST line must have digits (to anchor on building/street number)
# CONTINUATION lines only need length + multi-word (city/state names often lack digits)
is_valid_first_line = has_digits and is_long_enough and is_multi_word
is_valid_continuation = started_collecting and is_long_enough and is_multi_word
if is_valid_first_line or is_valid_continuation:
# We found a strong candidate line
fallback_candidates.append(line)
started_collecting = True
# If we have 3 candidates, that's probably the full address block
if len(fallback_candidates) >= 3:
break
if fallback_candidates:
return ", ".join(fallback_candidates)
return None
def extract_line_items(text: str) -> List[Dict[str, Any]]:
return []
def structure_output(text: str) -> Dict[str, Any]:
"""Legacy wrapper for rule-based-only pipeline"""
return {
"receipt_number": extract_invoice_number(text),
"date": extract_dates(text)[0] if extract_dates(text) else None,
"total_amount": extract_total(text),
"vendor": extract_vendor(text),
"raw_text": text
}