Spaces:
Sleeping
Sleeping
| # src/extraction.py | |
| import re | |
| from typing import List, Dict, Optional, Any | |
| from datetime import datetime | |
| from difflib import SequenceMatcher | |
# Month names and common abbreviations mapped to their month number.
_MONTH_MAP = {
    'jan': 1, 'january': 1,
    'feb': 2, 'february': 2,
    'mar': 3, 'march': 3,
    'apr': 4, 'april': 4,
    'may': 5,
    'jun': 6, 'june': 6,
    'jul': 7, 'july': 7,
    'aug': 8, 'august': 8,
    'sep': 9, 'sept': 9, 'september': 9,
    'oct': 10, 'october': 10,
    'nov': 11, 'november': 11,
    'dec': 12, 'december': 12,
}

# Pattern 1: DD/MM/YYYY, DD-MM-YYYY, DD.MM.YYYY, DD MM YYYY (and OCR pipes).
_NUMERIC_DATE_RE = re.compile(r'\b(\d{1,2})[\s/|.-](\d{1,2})[\s/|.-](\d{2,4})\b')
# Pattern 2: "22 Mar 18", "22-Mar-2018", "22 March 2018".
_DAY_MONTH_YEAR_RE = re.compile(
    r'\b(\d{1,2})[\s/.-]?([A-Za-z]{3,9})[\s/.-]?(\d{2,4})\b', re.IGNORECASE
)
# Pattern 3: "March 22, 2018", "Mar 22 2018".
_MONTH_DAY_YEAR_RE = re.compile(
    r'\b([A-Za-z]{3,9})[\s.-]?(\d{1,2})[,\s.-]+(\d{2,4})\b', re.IGNORECASE
)
# Pattern 4: ISO style YYYY-MM-DD or YYYY/MM/DD.
_ISO_DATE_RE = re.compile(r'\b(\d{4})[-/](\d{1,2})[-/](\d{1,2})\b')


def _format_valid_date(day: int, month: int, year: int,
                       expand_short_year: bool = True) -> Optional[str]:
    """Return ``DD/MM/YYYY`` for a real calendar date, or None if invalid.

    Two-digit years are expanded with a pivot of 50 (00-49 -> 20xx,
    50-99 -> 19xx) when ``expand_short_year`` is true.  Years that are still
    below 1000 afterwards are rejected: they come from truncated or noisy
    input, and ``strftime('%Y')`` does not pad them consistently across
    platforms.
    """
    if expand_short_year and year < 100:
        year = 2000 + year if year < 50 else 1900 + year
    if year < 1000:
        return None
    try:
        return datetime(year, month, day).strftime("%d/%m/%Y")
    except ValueError:
        # Not a real calendar date (e.g. 31/02, month 13, day 0).
        return None


def extract_dates(text: str) -> List[str]:
    """
    Extract calendar dates from free text and normalise them to DD/MM/YYYY.

    Handles:
    - Numeric formats: DD/MM/YYYY, DD-MM-YYYY, DD.MM.YYYY, DD MM YYYY
    - Text month formats: 22 Mar 18, March 22, 2018, 22-Mar-2018
    - ISO format: YYYY-MM-DD
    - OCR noise such as pipes (|) in place of slashes

    Every candidate is validated through ``datetime`` so impossible dates
    are dropped.  Results are de-duplicated; they are ordered by pattern
    (numeric, day-month-year, month-day-year, ISO) and, within a pattern,
    by position in the text.

    Args:
        text: Raw (typically OCR) text. Empty/None yields an empty list.

    Returns:
        Unique date strings formatted as DD/MM/YYYY.
    """
    if not text:
        return []

    candidates = []

    for day, month, year in _NUMERIC_DATE_RE.findall(text):
        candidates.append(_format_valid_date(int(day), int(month), int(year)))

    for day, month_name, year in _DAY_MONTH_YEAR_RE.findall(text):
        month = _MONTH_MAP.get(month_name.lower())
        if month:
            candidates.append(_format_valid_date(int(day), month, int(year)))

    for month_name, day, year in _MONTH_DAY_YEAR_RE.findall(text):
        month = _MONTH_MAP.get(month_name.lower())
        if month:
            candidates.append(_format_valid_date(int(day), month, int(year)))

    for year, month, day in _ISO_DATE_RE.findall(text):
        # ISO years are always written with four digits, so no pivot applies.
        candidates.append(
            _format_valid_date(int(day), int(month), int(year),
                               expand_short_year=False)
        )

    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(c for c in candidates if c is not None))
# Integer part is either comma-grouped thousands ("1,234") or a plain run of
# digits ("1234"); exactly two decimals are required.
_AMOUNT_RE = re.compile(r'\b(?:\d{1,3}(?:,\d{3})+|\d+)\.\d{2}\b')


def extract_amounts(text: str) -> List[float]:
    """
    Return every monetary-looking amount found in ``text``.

    An amount is a number with exactly two decimal places whose integer part
    is written either with thousands separators (``1,234.56``) or without
    (``1234.56``).  Thousands separators are removed before conversion.

    Args:
        text: Raw text to scan. Empty/None yields an empty list.

    Returns:
        Amounts as floats, in the order they appear in the text
        (duplicates are kept).
    """
    if not text:
        return []
    return [float(match.replace(',', '')) for match in _AMOUNT_RE.findall(text)]
# Keyword, optional filler words, optional ':' or '$', then the amount.
# The filler is lazy and the amount may not start right after a digit, comma
# or dot, so the capture is always the whole number (a greedy filler would
# swallow its leading digits, turning "Total 123.45" into 3.45).
_TOTAL_RE = re.compile(
    r'(?:TOTAL|AMOUNT DUE|GRAND TOTAL|BALANCE|PAYABLE)'
    r'[\w\s]*?[:$]?\s*'
    r'(?<![\d,.])(\d[\d,]*\.\d{2})(?!\d)',
    re.IGNORECASE,
)


def extract_total(text: str) -> Optional[float]:
    """
    Extract the receipt total using keyword matching, then a footer search.

    1. Keyword search (highest confidence): looks for an amount following
       TOTAL / AMOUNT DUE / GRAND TOTAL / BALANCE / PAYABLE, e.g.
       "Total: 123.45", "Total 123.45" or "Total Amount $123.45".  When
       several lines match, the last one is used because the grand total is
       usually printed at the bottom.
    2. Footer fallback (medium confidence): takes the largest amount found in
       the last 30% of the lines.  For documents so short that 30% rounds
       down to zero lines, every line is scanned.

    Args:
        text: Raw text of the document. Empty/None yields None.

    Returns:
        The total as a float, or None when no candidate is found.
    """
    if not text:
        return None

    keyword_hits = _TOTAL_RE.findall(text)
    if keyword_hits:
        try:
            return float(keyword_hits[-1].replace(',', ''))
        except ValueError:
            # Unparseable capture: fall through to the footer search.
            pass

    lines = text.split('\n')
    footer_size = int(len(lines) * 0.3)
    # A zero-sized footer means the document is tiny; scan all of it.
    footer_lines = lines[-footer_size:] if footer_size else lines

    candidates = [
        amount
        for line in footer_lines
        for amount in extract_amounts(line)
        # Totals are rarely year-like whole numbers such as 2024.00.
        if not (2000 <= amount <= 2030 and float(amount).is_integer())
    ]
    return max(candidates) if candidates else None
# Company-type suffixes, matched as whole words so that e.g. "PRINCE" (INC),
# "EMPTY" (PTY) or "INCLUDING" do not count as a company name.
_COMPANY_SUFFIX_RE = re.compile(
    r'\b(?:SDN\.?\s*BHD|INC|INCORPORATED|LTD|LIMITED|LLC|PLC|'
    r'CORP|CORPORATION|PTY|PVT)\b',
    re.IGNORECASE,
)


def extract_vendor(text: str) -> Optional[str]:
    """
    Guess the vendor (merchant) name from the top of the document.

    1. Returns the first of the top 10 lines containing a company suffix
       (SDN BHD, INC, LTD, LLC, PLC, CORP, PTY, PVT, LIMITED, ...) as a
       whole word.
    2. Otherwise returns the first of the top 5 lines that is longer than
       3 characters and does not contain a ``NN/NN`` date fragment.

    Args:
        text: Raw text of the document. Empty/None yields None.

    Returns:
        The stripped vendor line, or None when nothing qualifies.
    """
    if not text:
        return None

    lines = text.strip().split('\n')

    for line in lines[:10]:
        if _COMPANY_SUFFIX_RE.search(line):
            return line.strip()

    # Fallback: first substantial header line that does not look like a date.
    for line in lines[:5]:
        stripped = line.strip()
        if len(stripped) > 3 and not re.search(r'\d{2}/\d{2}', line):
            return stripped

    return None
# Tokens that are labels, never the identifier itself.
_INVOICE_FORBIDDEN_WORDS = frozenset({
    'INVOICE', 'TAX', 'RECEIPT', 'BILL', 'NUMBER', 'NO', 'DATE',
    'ORIGINAL', 'COPY', 'GST', 'REG', 'MEMBER', 'SLIP', 'TEL', 'FAX',
})

# A line containing one of these is likely a tax ID or phone line rather
# than an invoice line (unless it also contains "INVOICE").
_INVOICE_TOXIC_INDICATORS = ('GST', 'REG', 'SSM', 'TIN', 'PHONE', 'TEL', 'FAX', 'UBL', 'UEN')

# Keywords that mark a line as possibly carrying an invoice number.
_INVOICE_LINE_KEYWORDS = ('INVOICE', ' NO', ' #', 'INV', 'SLIP', 'BILL')

# "Invoice No:", "Slip No:", "Bill #:", "Tax Invoice Number 123", ...
_INVOICE_LABEL_RE = re.compile(
    r'(?:TAX\s*)?(?:INVOICE|INV|BILL|RECEIPT|SLIP)\s*(?:NO|NUMBER|#|NUM)\s*[:\.]?\s*([A-Z0-9\-/]+)',
    re.IGNORECASE,
)

# Candidate tokens on an (upper-cased) line: 3+ chars of A-Z, 0-9, '-' or '/'.
_INVOICE_TOKEN_RE = re.compile(r'\b[A-Z0-9\-/]{3,}\b')

# Tokens shaped like dates (22/03/2018, 2018-03-22, 22-MAR-2018) are not IDs.
_DATE_LIKE_TOKEN_RE = re.compile(
    r'\d{1,2}[-/]\d{1,2}[-/](?:\d{4}|\d{2})'
    r'|\d{4}[-/]\d{1,2}[-/]\d{1,2}'
    r'|\d{1,2}[-/][A-Z]{3,9}[-/](?:\d{4}|\d{2})'
)


def _has_digit(token: str) -> bool:
    """Return True when the token contains at least one digit."""
    return any(ch.isdigit() for ch in token)


def extract_invoice_number(text: str) -> Optional[str]:
    """
    Extract the invoice / receipt / slip number from the document text.

    Strategy 1 (high confidence): an explicit label such as "Invoice No:",
    "Receipt #", "Slip Number" followed by the identifier.  The identifier
    must be at least 3 characters, must not be a label word, and must
    contain a digit; the last rule stops a label followed by a line break
    from returning the next word (e.g. "Invoice Number\\nCustomer").  The
    value is returned exactly as written in the text.

    Strategy 2 (medium confidence): scans the top 25 lines for invoice-like
    keywords and returns the first token that contains a digit, is not a
    label word and is not shaped like a date.  Lines that look like tax-ID
    or phone lines are skipped unless they also mention "INVOICE".  Tokens
    found this way are returned upper-cased.

    Args:
        text: Raw text of the document. Empty/None yields None.

    Returns:
        The identifier string, or None when nothing plausible is found.
    """
    if not text:
        return None

    # Strategy 1: explicit label search.
    for match in _INVOICE_LABEL_RE.findall(text):
        candidate = match.strip()
        if (len(candidate) >= 3
                and candidate.upper() not in _INVOICE_FORBIDDEN_WORDS
                and _has_digit(candidate)):
            return candidate

    # Strategy 2: contextual line search over the document header.
    for line in text.split('\n')[:25]:
        line_upper = line.upper()

        is_toxic = any(bad in line_upper for bad in _INVOICE_TOXIC_INDICATORS)
        if is_toxic and "INVOICE" not in line_upper:
            continue

        if not any(key in line_upper for key in _INVOICE_LINE_KEYWORDS):
            continue

        for token in _INVOICE_TOKEN_RE.findall(line_upper):
            if token in _INVOICE_FORBIDDEN_WORDS:
                continue
            if _DATE_LIKE_TOKEN_RE.fullmatch(token):
                # e.g. "Invoice Date: 22/03/2018" - the date is not the ID.
                continue
            # Invoice numbers almost always contain digits; this filters out
            # purely alphabetic words such as "CREDIT" or "CASH".
            if _has_digit(token):
                return token

    return None
def extract_bill_to(text: str) -> Optional[Dict[str, str]]:
    """
    Find the "Bill To" / "Billed To" party in the document.

    The remainder of the line after the label (or the next line, when the
    label stands alone) is taken as the name.  The email is not extracted
    and is always reported as None.

    Returns:
        ``{"name": <str>, "email": None}``, or None when no label is found
        or ``text`` is empty.
    """
    if not text:
        return None

    found = re.search(r'(?:BILL|BILLED)\s*TO[:\s]+([^\n]+)', text, re.IGNORECASE)
    if found is None:
        return None

    return {"name": found.group(1).strip(), "email": None}
def extract_address(text: str, vendor_name: Optional[str] = None) -> Optional[str]:
    """
    Extract the vendor address from the document header using line heuristics.

    Strategy:
    1. If ``vendor_name`` is given, find it in the top 15 non-empty lines
       (case-insensitive substring, or a fuzzy ratio above 0.8 for names
       longer than 5 characters) and take up to 3 lines directly below it,
       stopping at the first line that is clearly not an address.
    2. Otherwise (or if step 1 yields nothing) scan the top 10 non-empty
       lines for an address-looking block: the first line must contain a
       digit, be longer than 10 characters and have at least 3 words;
       following lines only need the length and word count.  Collection
       stops at the first non-address line or after 3 lines.

    A line is "clearly not an address" when it carries a phone/fax/email/URL
    marker, looks like a short date, or contains the vendor name.  TEL and
    FAX only count when they do not continue a word, so "Hotel" or
    "Fairfax" do not disqualify a line.

    Args:
        text: Raw text of the document. Empty/None yields None.
        vendor_name: Known vendor name used as an anchor, if available.

    Returns:
        The address lines joined with ", ", or None when nothing is found.
    """
    if not text:
        return None

    lines = [raw.strip() for raw in text.split('\n') if raw.strip()]
    vendor_lower = vendor_name.lower() if vendor_name else None

    # Applied to the upper-cased line.  TEL/FAX must not be preceded by a
    # letter; the remaining markers are plain substring matches.
    contact_marker = re.compile(
        r'(?<![^\W\d_])(?:TEL|FAX)|PHONE|EMAIL|@|WWW\.|\.COM|HTTP'
    )

    def is_invalid_line(line):
        # 1. Phone / fax / email / URL line.
        if contact_marker.search(line.upper()):
            return True
        # 2. Short line with digits and a '/' or '-': treated as a date.
        if len(line) < 15 and any(c.isdigit() for c in line) and ('/' in line or '-' in line):
            return True
        # 3. The vendor name itself (if provided).
        if vendor_lower and vendor_lower in line.lower():
            return True
        return False

    # --- STRATEGY 1: lines directly below the vendor name ---
    if vendor_lower:
        for index, line in enumerate(lines[:15]):
            line_lower = line.lower()
            is_vendor_line = vendor_lower in line_lower or (
                len(vendor_name) > 5
                and SequenceMatcher(None, vendor_lower, line_lower).ratio() > 0.8
            )
            if not is_vendor_line:
                continue

            below_vendor = []
            for next_line in lines[index + 1:index + 4]:
                if is_invalid_line(next_line):
                    # A phone/fax/date line usually ends the address block.
                    break
                below_vendor.append(next_line)
            if below_vendor:
                return ", ".join(below_vendor)
            # Only the first vendor occurrence is used as an anchor.
            break

    # --- STRATEGY 2: header scan for an address-looking block ---
    fallback_candidates = []
    for line in lines[:10]:
        if is_invalid_line(line):
            # Contiguity: once collecting, an invalid line ends the block so
            # that unrelated later lines are not appended.
            if fallback_candidates:
                break
            continue

        has_digits = any(c.isdigit() for c in line)
        is_long_enough = len(line) > 10
        is_multi_word = line.count(' ') >= 2  # 2 spaces => at least 3 words

        # The first line must anchor on a number (building/street/postcode);
        # continuation lines such as city names often have no digits.
        is_first_line = has_digits and is_long_enough and is_multi_word
        is_continuation = bool(fallback_candidates) and is_long_enough and is_multi_word

        if is_first_line or is_continuation:
            fallback_candidates.append(line)
            if len(fallback_candidates) >= 3:
                break

    if fallback_candidates:
        return ", ".join(fallback_candidates)
    return None
def extract_line_items(text: str) -> List[Dict[str, Any]]:
    """Placeholder for line-item extraction; currently always returns an empty list."""
    no_items: List[Dict[str, Any]] = []
    return no_items
def structure_output(text: str) -> Dict[str, Any]:
    """
    Legacy wrapper for the rule-based-only pipeline.

    Runs the individual extractors on ``text`` and bundles the results.

    Returns:
        A dict with keys ``receipt_number``, ``date`` (first extracted date
        or None), ``total_amount``, ``vendor`` and ``raw_text`` (the input,
        unchanged).
    """
    # Extract once; the previous form scanned the whole text twice.
    dates = extract_dates(text)
    return {
        "receipt_number": extract_invoice_number(text),
        "date": dates[0] if dates else None,
        "total_amount": extract_total(text),
        "vendor": extract_vendor(text),
        "raw_text": text,
    }