# parser.py # Handles ingestion of PDF, CSV, XLS/XLSX, DOCX statement files # Normalizes all formats into a standard DataFrame schema: # date (datetime), merchant (str), amount (float), raw_merchant (str), source_file (str) import io import re import pandas as pd from datetime import datetime from typing import Optional from merchant_map import normalize_merchant # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _clean_amount(val) -> Optional[float]: """Convert various amount formats to a positive float charge, or None.""" if val is None: return None s = str(val).strip().replace(",", "").replace("$", "").replace(" ", "") # Some banks use parentheses for debits: (123.45) negative = False if s.startswith("(") and s.endswith(")"): s = s[1:-1] negative = True try: amt = float(s) except ValueError: return None # Some exports use negative for charges, positive for payments # We want charges as positive — caller decides which sign convention return abs(amt) if not negative else abs(amt) def _looks_like_payment(merchant: str, amount: float, credit_flag=False) -> bool: """Heuristic: is this row a payment/credit rather than a purchase?""" if credit_flag: return True m = merchant.lower() payment_keywords = [ "payment", "thank you", "autopay", "credit", "refund", "return", "adjustment", "reward", "cashback", "cash back", "transfer", "deposit", "interest charge", "fee waiver", ] return any(kw in m for kw in payment_keywords) def _parse_date(val) -> Optional[datetime]: """Try multiple date formats.""" if isinstance(val, datetime): return val if isinstance(val, pd.Timestamp): return val.to_pydatetime() s = str(val).strip() formats = [ "%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%d-%b-%Y", "%b %d, %Y", "%B %d, %Y", "%d/%m/%Y", "%m-%d-%Y", "%Y%m%d", ] for fmt in formats: try: return datetime.strptime(s, fmt) except ValueError: continue return None # ───────────────────────────────────────────────────────────────────────────── # Summary-page filter — shared across ALL PDF strategies # Pages containing these phrases are overview/totals pages, not transaction # listing pages. Skip them entirely to avoid pulling in summary rows. # ───────────────────────────────────────────────────────────────────────────── _SUMMARY_PAGE_SIGNALS = re.compile( r"(account\s+summary|statement\s+summary|previous\s+balance" r"|new\s+balance\s+total|credit\s+limit|minimum\s+payment\s+due" r"|opening/closing\s+date|payment\s+information" r"|total\s+credit\s+line|statement\s+closing\s+date)", re.IGNORECASE, ) # Merchant strings that look like statement summary rows, not real merchants. # Used to filter false positives from Strategy 2 regex matches. _FAKE_MERCHANT_SIGNALS = re.compile( r"^(new balance|previous balance|minimum payment|payment due" r"|total credit|interest charge|fees charged|purchases and adj" r"|payments and other|statement closing|days in billing)", re.IGNORECASE, ) # ───────────────────────────────────────────────────────────────────────────── # Format-specific parsers # ───────────────────────────────────────────────────────────────────────────── def _parse_csv(file_bytes: bytes, filename: str) -> pd.DataFrame: """Parse CSV bank exports. Handles many column name variants.""" try: df = pd.read_csv(io.BytesIO(file_bytes), dtype=str, on_bad_lines="skip") except Exception: df = pd.read_csv(io.BytesIO(file_bytes), dtype=str, error_bad_lines=False) df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns] # Date column detection date_candidates = ["date", "transaction_date", "trans_date", "post_date", "posted_date", "activity_date", "transaction date"] date_col = next((c for c in date_candidates if c in df.columns), None) if not date_col: date_col = next((c for c in df.columns if "date" in c), None) # Merchant / description column detection desc_candidates = ["description", "merchant", "payee", "name", "merchant_name", "transaction_description", "memo", "details", "narrative"] desc_col = next((c for c in desc_candidates if c in df.columns), None) if not desc_col: desc_col = next((c for c in df.columns if any(k in c for k in ["desc", "merch", "payee", "name"])), None) # Amount column detection amt_candidates = ["amount", "debit", "charge", "transaction_amount", "debit_amount", "withdrawal", "charged_amount"] amt_col = next((c for c in amt_candidates if c in df.columns), None) if not amt_col: amt_col = next((c for c in df.columns if "amount" in c or "debit" in c), None) # Credit column (to detect payments) credit_col = next((c for c in df.columns if "credit" in c), None) if not all([date_col, desc_col, amt_col]): return pd.DataFrame() rows = [] for _, row in df.iterrows(): date = _parse_date(row.get(date_col, "")) merchant_raw = str(row.get(desc_col, "")).strip() amt = _clean_amount(row.get(amt_col, "")) is_credit = credit_col and str(row.get(credit_col, "")).strip() not in ("", "0", "0.00", "nan") if date is None or amt is None or amt <= 0: continue if _looks_like_payment(merchant_raw, amt, is_credit): continue rows.append({ "date": date, "raw_merchant": merchant_raw, "merchant": normalize_merchant(merchant_raw), "amount": amt, "source_file": filename, }) return pd.DataFrame(rows) def _parse_excel(file_bytes: bytes, filename: str) -> pd.DataFrame: """Parse XLS/XLSX exports — tries each sheet.""" frames = [] try: xl = pd.ExcelFile(io.BytesIO(file_bytes)) for sheet in xl.sheet_names: try: df = xl.parse(sheet, dtype=str) df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns] # Reuse CSV logic by converting to CSV bytes csv_bytes = df.to_csv(index=False).encode() parsed = _parse_csv(csv_bytes, filename) if not parsed.empty: frames.append(parsed) except Exception: continue except Exception: pass return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame() def _parse_pdf(file_bytes: bytes, filename: str) -> pd.DataFrame: """ Parse PDF credit card statements. Strategy 1: pdfplumber table extraction (structured) Strategy 2: raw text full-date regex (MM/DD/YYYY style) Strategy 3: two-date MM/DD format without year (Bank of America, etc.) """ import pdfplumber rows = [] full_text = "" # Extract text once for all text-based strategies try: with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: full_text = "\n".join(page.extract_text() or "" for page in pdf.pages) except Exception: pass # ── Strategy 1: Table extraction ───────────────────────────────────── try: with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: for page in pdf.pages: tables = page.extract_tables() for table in tables: if not table or len(table) < 2: continue # Normalize headers — collapse any whitespace (including \n from # multi-line header cells) into underscores headers = [ re.sub(r'\s+', '_', str(h).strip().lower()) if h else "" for h in table[0] ] for data_row in table[1:]: if not data_row: continue row_dict = {headers[i]: str(data_row[i]).strip() if data_row[i] else "" for i in range(min(len(headers), len(data_row)))} # Try to find date, merchant, amount in this row date_val = next((row_dict[k] for k in row_dict if "date" in k and row_dict[k]), None) desc_val = next((row_dict[k] for k in row_dict if any(x in k for x in ["desc", "merch", "payee", "name"]) and row_dict[k]), None) amt_val = next((row_dict[k] for k in row_dict if any(x in k for x in ["amount", "debit", "charge"]) and row_dict[k]), None) if not amt_val: # Try last numeric-looking column for k in reversed(list(row_dict.keys())): cleaned = row_dict[k].replace(",", "").replace("$", "").replace("(", "").replace(")", "") try: float(cleaned) amt_val = row_dict[k] break except ValueError: continue if not desc_val: # Use second column as fallback description vals = list(row_dict.values()) desc_val = vals[1] if len(vals) > 1 else "" date = _parse_date(date_val) if date_val else None amt = _clean_amount(amt_val) if amt_val else None merchant_raw = str(desc_val).strip() if desc_val else "" if date is None or amt is None or amt <= 0 or not merchant_raw: continue if _looks_like_payment(merchant_raw, amt): continue rows.append({ "date": date, "raw_merchant": merchant_raw, "merchant": normalize_merchant(merchant_raw), "amount": amt, "source_file": filename, }) except Exception: pass # ── Strategy 2: Full-date regex (MM/DD/YYYY or YYYY-MM-DD etc.) ────── # Runs page-by-page (not on full_text) so summary pages can be skipped. if not rows and full_text: pattern = re.compile( r"(\d{1,2}[/\-]\d{1,2}[/\-]\d{2,4})\s+" r"([A-Za-z][^\d\n]{3,50})\s+" r"\$?([\d,]+\.\d{2})" ) try: with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: pages_text_s2 = [page.extract_text() or "" for page in pdf.pages] except Exception: pages_text_s2 = [full_text] for page_text in pages_text_s2: # Skip summary/overview pages — they contain balance totals that # look like transactions but aren't (e.g. "New Balance Total $4,814") if _SUMMARY_PAGE_SIGNALS.search(page_text): continue for match in pattern.finditer(page_text): date_str, desc, amt_str = match.groups() date = _parse_date(date_str) amt = _clean_amount(amt_str) merchant_raw = desc.strip() if date is None or amt is None or amt <= 0: continue if _looks_like_payment(merchant_raw, amt): continue # Guard against summary row false positives if _FAKE_MERCHANT_SIGNALS.search(merchant_raw): continue rows.append({ "date": date, "raw_merchant": merchant_raw, "merchant": normalize_merchant(merchant_raw), "amount": amt, "source_file": filename, }) # ── Strategy 3: Two-date MM/DD format — Bank of America and similar ── # Format: MM/DD MM/DD DESCRIPTION REF(4) ACCT(4) AMOUNT # Dates have no year; infer from statement period in the header text. # NOTE: Run this BEFORE Strategy 2, and independently of row count. # BofA PDFs will always match here; if we get more hits than rows, use these. s3_rows = [] if full_text: # Extract closing month/year from text like "December 13 - January 12, 2025" closing_year = datetime.now().year closing_month = datetime.now().month period_match = re.search( r'\w+\s+\d{1,2}\s*[-\u2013]\s*(\w+)\s+\d{1,2}[,\s]+(\d{4})', full_text, ) if period_match: try: closing_month = datetime.strptime(period_match.group(1), "%B").month closing_year = int(period_match.group(2)) except ValueError: pass # Each transaction line: MM/DD MM/DD description 4digits 4digits amount boa_pattern = re.compile( r"^(\d{2}/\d{2})\s+\d{2}/\d{2}\s+(.+?)\s+\d{4}\s+\d{4}\s+([\d,]+\.\d{2})\s*$", re.MULTILINE, ) try: with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: pages_text = [page.extract_text() or "" for page in pdf.pages] except Exception: pages_text = [full_text] # fallback: treat whole doc as one page for page_text in pages_text: if _SUMMARY_PAGE_SIGNALS.search(page_text): continue # skip summary/overview pages for match in boa_pattern.finditer(page_text): date_str, desc, amt_str = match.groups() try: month, day = map(int, date_str.split("/")) # If transaction month is later in the year than the closing month, # it belongs to the prior year (e.g. Dec txn in a Jan-closing statement) year = closing_year - 1 if month > closing_month else closing_year date = datetime(year, month, day) except (ValueError, OverflowError): continue amt = _clean_amount(amt_str) merchant_raw = desc.strip() if amt is None or amt <= 0: continue if _looks_like_payment(merchant_raw, amt): continue s3_rows.append({ "date": date, "raw_merchant": merchant_raw, "merchant": normalize_merchant(merchant_raw), "amount": amt, "source_file": filename, }) # Strategy 3 wins if it found more transactions than earlier strategies # (BofA PDFs always have many transactions; Strategy 2 false positives are few) if len(s3_rows) > len(rows): rows = s3_rows return pd.DataFrame(rows) if rows else pd.DataFrame() def _parse_docx(file_bytes: bytes, filename: str) -> pd.DataFrame: """Parse DOCX — extract text then apply regex like PDF fallback.""" import docx2txt import tempfile, os with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp: tmp.write(file_bytes) tmp_path = tmp.name try: text = docx2txt.process(tmp_path) except Exception: return pd.DataFrame() finally: os.unlink(tmp_path) rows = [] pattern = re.compile( r"(\d{1,2}[/\-]\d{1,2}[/\-]\d{2,4})\s+" r"([A-Za-z][^\d\n]{3,50?}?)\s+" r"\$?([\d,]+\.\d{2})" ) for match in pattern.finditer(text): date_str, desc, amt_str = match.groups() date = _parse_date(date_str) amt = _clean_amount(amt_str) merchant_raw = desc.strip() if date is None or amt is None or amt <= 0: continue if _looks_like_payment(merchant_raw, amt): continue rows.append({ "date": date, "raw_merchant": merchant_raw, "merchant": normalize_merchant(merchant_raw), "amount": amt, "source_file": filename, }) return pd.DataFrame(rows) if rows else pd.DataFrame() # ───────────────────────────────────────────────────────────────────────────── # Public entry point # ───────────────────────────────────────────────────────────────────────────── def parse_uploaded_file(uploaded_file) -> pd.DataFrame: """ Accept a Streamlit UploadedFile and return a normalized DataFrame. Returns empty DataFrame on failure. """ filename = uploaded_file.name file_bytes = uploaded_file.read() ext = filename.lower().split(".")[-1] if ext == "csv": df = _parse_csv(file_bytes, filename) elif ext in ("xls", "xlsx"): df = _parse_excel(file_bytes, filename) elif ext == "pdf": df = _parse_pdf(file_bytes, filename) elif ext == "docx": df = _parse_docx(file_bytes, filename) else: return pd.DataFrame() if df.empty: return df # Enforce schema and types df = df[["date", "merchant", "raw_merchant", "amount", "source_file"]].copy() df["date"] = pd.to_datetime(df["date"]) df["amount"] = pd.to_numeric(df["amount"], errors="coerce") df = df.dropna(subset=["date", "amount"]) df = df[df["amount"] > 0] df = df.sort_values("date").reset_index(drop=True) return df def combine_files(uploaded_files) -> tuple[pd.DataFrame, list[str]]: """ Parse and combine multiple uploaded files. Returns (combined_df, list_of_warnings). """ frames = [] warnings = [] for f in uploaded_files: df = parse_uploaded_file(f) if df.empty: warnings.append(f"⚠️ Could not extract transactions from **{f.name}**. " "Check that it's a valid statement export.") else: frames.append(df) if not frames: return pd.DataFrame(), warnings combined = pd.concat(frames, ignore_index=True) # Deduplicate: same date + merchant + amount within 1 day combined = combined.drop_duplicates( subset=["date", "merchant", "amount"], keep="first" ) combined = combined.sort_values("date").reset_index(drop=True) # Check for month gaps if not combined.empty: months = pd.period_range( start=combined["date"].min().to_period("M"), end=combined["date"].max().to_period("M"), freq="M", ) covered = set(combined["date"].dt.to_period("M").unique()) missing = [str(m) for m in months if m not in covered] if missing: warnings.append( f"📅 Possible gaps detected — no transactions found for: {', '.join(missing)}. " "Upload missing statements for more accurate analysis." ) return combined, warnings def extract_raw_text(file_bytes: bytes, filename: str) -> str: """Return extracted plain text for debugging purposes for common file types. Returns an empty string on failure. """ ext = filename.lower().split(".")[-1] try: if ext == "pdf": import pdfplumber texts = [] with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: for page in pdf.pages: texts.append(page.extract_text() or "") return "\n\n--- PAGE BREAK ---\n\n".join(texts) if ext in ("docx",): import docx2txt import tempfile, os with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp: tmp.write(file_bytes) tmp_path = tmp.name try: text = docx2txt.process(tmp_path) or "" except Exception: text = "" finally: try: os.unlink(tmp_path) except Exception: pass return text if ext == "csv": try: return file_bytes.decode("utf-8", errors="replace") except Exception: return "" if ext in ("xls", "xlsx"): try: xl = pd.ExcelFile(io.BytesIO(file_bytes)) parts = [] for sheet in xl.sheet_names: df = xl.parse(sheet, dtype=str) parts.append(f"-- Sheet: {sheet} --\n" + df.to_csv(index=False)) return "\n\n".join(parts) except Exception: return "" except Exception: return "" return ""