| |
| |
| |
| |
|
|
| import io |
| import re |
| import pandas as pd |
| from datetime import datetime |
| from typing import Optional |
| from merchant_map import normalize_merchant |
|
|
|
|
| |
| |
| |
|
|
| def _clean_amount(val) -> Optional[float]: |
| """Convert various amount formats to a positive float charge, or None.""" |
| if val is None: |
| return None |
| s = str(val).strip().replace(",", "").replace("$", "").replace(" ", "") |
| |
| negative = False |
| if s.startswith("(") and s.endswith(")"): |
| s = s[1:-1] |
| negative = True |
| try: |
| amt = float(s) |
| except ValueError: |
| return None |
| |
| |
| return abs(amt) if not negative else abs(amt) |
|
|
|
|
| def _looks_like_payment(merchant: str, amount: float, credit_flag=False) -> bool: |
| """Heuristic: is this row a payment/credit rather than a purchase?""" |
| if credit_flag: |
| return True |
| m = merchant.lower() |
| payment_keywords = [ |
| "payment", "thank you", "autopay", "credit", "refund", |
| "return", "adjustment", "reward", "cashback", "cash back", |
| "transfer", "deposit", "interest charge", "fee waiver", |
| ] |
| return any(kw in m for kw in payment_keywords) |
|
|
|
|
| def _parse_date(val) -> Optional[datetime]: |
| """Try multiple date formats.""" |
| if isinstance(val, datetime): |
| return val |
| if isinstance(val, pd.Timestamp): |
| return val.to_pydatetime() |
| s = str(val).strip() |
| formats = [ |
| "%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%d-%b-%Y", |
| "%b %d, %Y", "%B %d, %Y", "%d/%m/%Y", "%m-%d-%Y", |
| "%Y%m%d", |
| ] |
| for fmt in formats: |
| try: |
| return datetime.strptime(s, fmt) |
| except ValueError: |
| continue |
| return None |
|
|
|
|
| |
| |
| |
| |
| |
|
|
| _SUMMARY_PAGE_SIGNALS = re.compile( |
| r"(account\s+summary|statement\s+summary|previous\s+balance" |
| r"|new\s+balance\s+total|credit\s+limit|minimum\s+payment\s+due" |
| r"|opening/closing\s+date|payment\s+information" |
| r"|total\s+credit\s+line|statement\s+closing\s+date)", |
| re.IGNORECASE, |
| ) |
|
|
| |
| |
| _FAKE_MERCHANT_SIGNALS = re.compile( |
| r"^(new balance|previous balance|minimum payment|payment due" |
| r"|total credit|interest charge|fees charged|purchases and adj" |
| r"|payments and other|statement closing|days in billing)", |
| re.IGNORECASE, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| def _parse_csv(file_bytes: bytes, filename: str) -> pd.DataFrame: |
| """Parse CSV bank exports. Handles many column name variants.""" |
| try: |
| df = pd.read_csv(io.BytesIO(file_bytes), dtype=str, on_bad_lines="skip") |
| except Exception: |
| df = pd.read_csv(io.BytesIO(file_bytes), dtype=str, error_bad_lines=False) |
|
|
| df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns] |
|
|
| |
| date_candidates = ["date", "transaction_date", "trans_date", "post_date", |
| "posted_date", "activity_date", "transaction date"] |
| date_col = next((c for c in date_candidates if c in df.columns), None) |
| if not date_col: |
| date_col = next((c for c in df.columns if "date" in c), None) |
|
|
| |
| desc_candidates = ["description", "merchant", "payee", "name", "merchant_name", |
| "transaction_description", "memo", "details", "narrative"] |
| desc_col = next((c for c in desc_candidates if c in df.columns), None) |
| if not desc_col: |
| desc_col = next((c for c in df.columns if any(k in c for k in ["desc", "merch", "payee", "name"])), None) |
|
|
| |
| amt_candidates = ["amount", "debit", "charge", "transaction_amount", |
| "debit_amount", "withdrawal", "charged_amount"] |
| amt_col = next((c for c in amt_candidates if c in df.columns), None) |
| if not amt_col: |
| amt_col = next((c for c in df.columns if "amount" in c or "debit" in c), None) |
|
|
| |
| credit_col = next((c for c in df.columns if "credit" in c), None) |
|
|
| if not all([date_col, desc_col, amt_col]): |
| return pd.DataFrame() |
|
|
| rows = [] |
| for _, row in df.iterrows(): |
| date = _parse_date(row.get(date_col, "")) |
| merchant_raw = str(row.get(desc_col, "")).strip() |
| amt = _clean_amount(row.get(amt_col, "")) |
| is_credit = credit_col and str(row.get(credit_col, "")).strip() not in ("", "0", "0.00", "nan") |
|
|
| if date is None or amt is None or amt <= 0: |
| continue |
| if _looks_like_payment(merchant_raw, amt, is_credit): |
| continue |
|
|
| rows.append({ |
| "date": date, |
| "raw_merchant": merchant_raw, |
| "merchant": normalize_merchant(merchant_raw), |
| "amount": amt, |
| "source_file": filename, |
| }) |
|
|
| return pd.DataFrame(rows) |
|
|
|
|
| def _parse_excel(file_bytes: bytes, filename: str) -> pd.DataFrame: |
| """Parse XLS/XLSX exports β tries each sheet.""" |
| frames = [] |
| try: |
| xl = pd.ExcelFile(io.BytesIO(file_bytes)) |
| for sheet in xl.sheet_names: |
| try: |
| df = xl.parse(sheet, dtype=str) |
| df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns] |
| |
| csv_bytes = df.to_csv(index=False).encode() |
| parsed = _parse_csv(csv_bytes, filename) |
| if not parsed.empty: |
| frames.append(parsed) |
| except Exception: |
| continue |
| except Exception: |
| pass |
| return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame() |
|
|
|
|
| def _parse_pdf(file_bytes: bytes, filename: str) -> pd.DataFrame: |
| """ |
| Parse PDF credit card statements. |
| Strategy 1: pdfplumber table extraction (structured) |
| Strategy 2: raw text full-date regex (MM/DD/YYYY style) |
| Strategy 3: two-date MM/DD format without year (Bank of America, etc.) |
| """ |
| import pdfplumber |
|
|
| rows = [] |
| full_text = "" |
|
|
| |
| try: |
| with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: |
| full_text = "\n".join(page.extract_text() or "" for page in pdf.pages) |
| except Exception: |
| pass |
|
|
| |
| try: |
| with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: |
| for page in pdf.pages: |
| tables = page.extract_tables() |
| for table in tables: |
| if not table or len(table) < 2: |
| continue |
| |
| |
| headers = [ |
| re.sub(r'\s+', '_', str(h).strip().lower()) if h else "" |
| for h in table[0] |
| ] |
| for data_row in table[1:]: |
| if not data_row: |
| continue |
| row_dict = {headers[i]: str(data_row[i]).strip() if data_row[i] else "" |
| for i in range(min(len(headers), len(data_row)))} |
| |
| date_val = next((row_dict[k] for k in row_dict if "date" in k and row_dict[k]), None) |
| desc_val = next((row_dict[k] for k in row_dict |
| if any(x in k for x in ["desc", "merch", "payee", "name"]) and row_dict[k]), None) |
| amt_val = next((row_dict[k] for k in row_dict |
| if any(x in k for x in ["amount", "debit", "charge"]) and row_dict[k]), None) |
|
|
| if not amt_val: |
| |
| for k in reversed(list(row_dict.keys())): |
| cleaned = row_dict[k].replace(",", "").replace("$", "").replace("(", "").replace(")", "") |
| try: |
| float(cleaned) |
| amt_val = row_dict[k] |
| break |
| except ValueError: |
| continue |
|
|
| if not desc_val: |
| |
| vals = list(row_dict.values()) |
| desc_val = vals[1] if len(vals) > 1 else "" |
|
|
| date = _parse_date(date_val) if date_val else None |
| amt = _clean_amount(amt_val) if amt_val else None |
| merchant_raw = str(desc_val).strip() if desc_val else "" |
|
|
| if date is None or amt is None or amt <= 0 or not merchant_raw: |
| continue |
| if _looks_like_payment(merchant_raw, amt): |
| continue |
|
|
| rows.append({ |
| "date": date, |
| "raw_merchant": merchant_raw, |
| "merchant": normalize_merchant(merchant_raw), |
| "amount": amt, |
| "source_file": filename, |
| }) |
| except Exception: |
| pass |
|
|
| |
| |
| if not rows and full_text: |
| pattern = re.compile( |
| r"(\d{1,2}[/\-]\d{1,2}[/\-]\d{2,4})\s+" |
| r"([A-Za-z][^\d\n]{3,50})\s+" |
| r"\$?([\d,]+\.\d{2})" |
| ) |
| try: |
| with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: |
| pages_text_s2 = [page.extract_text() or "" for page in pdf.pages] |
| except Exception: |
| pages_text_s2 = [full_text] |
|
|
| for page_text in pages_text_s2: |
| |
| |
| if _SUMMARY_PAGE_SIGNALS.search(page_text): |
| continue |
|
|
| for match in pattern.finditer(page_text): |
| date_str, desc, amt_str = match.groups() |
| date = _parse_date(date_str) |
| amt = _clean_amount(amt_str) |
| merchant_raw = desc.strip() |
|
|
| if date is None or amt is None or amt <= 0: |
| continue |
| if _looks_like_payment(merchant_raw, amt): |
| continue |
| |
| if _FAKE_MERCHANT_SIGNALS.search(merchant_raw): |
| continue |
|
|
| rows.append({ |
| "date": date, |
| "raw_merchant": merchant_raw, |
| "merchant": normalize_merchant(merchant_raw), |
| "amount": amt, |
| "source_file": filename, |
| }) |
|
|
| |
| |
| |
| |
| |
| s3_rows = [] |
| if full_text: |
| |
| closing_year = datetime.now().year |
| closing_month = datetime.now().month |
| period_match = re.search( |
| r'\w+\s+\d{1,2}\s*[-\u2013]\s*(\w+)\s+\d{1,2}[,\s]+(\d{4})', |
| full_text, |
| ) |
| if period_match: |
| try: |
| closing_month = datetime.strptime(period_match.group(1), "%B").month |
| closing_year = int(period_match.group(2)) |
| except ValueError: |
| pass |
|
|
| |
| boa_pattern = re.compile( |
| r"^(\d{2}/\d{2})\s+\d{2}/\d{2}\s+(.+?)\s+\d{4}\s+\d{4}\s+([\d,]+\.\d{2})\s*$", |
| re.MULTILINE, |
| ) |
|
|
| try: |
| with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: |
| pages_text = [page.extract_text() or "" for page in pdf.pages] |
| except Exception: |
| pages_text = [full_text] |
|
|
| for page_text in pages_text: |
| if _SUMMARY_PAGE_SIGNALS.search(page_text): |
| continue |
|
|
| for match in boa_pattern.finditer(page_text): |
| date_str, desc, amt_str = match.groups() |
| try: |
| month, day = map(int, date_str.split("/")) |
| |
| |
| year = closing_year - 1 if month > closing_month else closing_year |
| date = datetime(year, month, day) |
| except (ValueError, OverflowError): |
| continue |
|
|
| amt = _clean_amount(amt_str) |
| merchant_raw = desc.strip() |
|
|
| if amt is None or amt <= 0: |
| continue |
| if _looks_like_payment(merchant_raw, amt): |
| continue |
|
|
| s3_rows.append({ |
| "date": date, |
| "raw_merchant": merchant_raw, |
| "merchant": normalize_merchant(merchant_raw), |
| "amount": amt, |
| "source_file": filename, |
| }) |
|
|
| |
| |
| if len(s3_rows) > len(rows): |
| rows = s3_rows |
|
|
| return pd.DataFrame(rows) if rows else pd.DataFrame() |
|
|
|
|
| def _parse_docx(file_bytes: bytes, filename: str) -> pd.DataFrame: |
| """Parse DOCX β extract text then apply regex like PDF fallback.""" |
| import docx2txt |
| import tempfile, os |
|
|
| with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp: |
| tmp.write(file_bytes) |
| tmp_path = tmp.name |
|
|
| try: |
| text = docx2txt.process(tmp_path) |
| except Exception: |
| return pd.DataFrame() |
| finally: |
| os.unlink(tmp_path) |
|
|
| rows = [] |
| pattern = re.compile( |
| r"(\d{1,2}[/\-]\d{1,2}[/\-]\d{2,4})\s+" |
| r"([A-Za-z][^\d\n]{3,50?}?)\s+" |
| r"\$?([\d,]+\.\d{2})" |
| ) |
| for match in pattern.finditer(text): |
| date_str, desc, amt_str = match.groups() |
| date = _parse_date(date_str) |
| amt = _clean_amount(amt_str) |
| merchant_raw = desc.strip() |
|
|
| if date is None or amt is None or amt <= 0: |
| continue |
| if _looks_like_payment(merchant_raw, amt): |
| continue |
|
|
| rows.append({ |
| "date": date, |
| "raw_merchant": merchant_raw, |
| "merchant": normalize_merchant(merchant_raw), |
| "amount": amt, |
| "source_file": filename, |
| }) |
|
|
| return pd.DataFrame(rows) if rows else pd.DataFrame() |
|
|
|
|
| |
| |
| |
|
|
| def parse_uploaded_file(uploaded_file) -> pd.DataFrame: |
| """ |
| Accept a Streamlit UploadedFile and return a normalized DataFrame. |
| Returns empty DataFrame on failure. |
| """ |
| filename = uploaded_file.name |
| file_bytes = uploaded_file.read() |
| ext = filename.lower().split(".")[-1] |
|
|
| if ext == "csv": |
| df = _parse_csv(file_bytes, filename) |
| elif ext in ("xls", "xlsx"): |
| df = _parse_excel(file_bytes, filename) |
| elif ext == "pdf": |
| df = _parse_pdf(file_bytes, filename) |
| elif ext == "docx": |
| df = _parse_docx(file_bytes, filename) |
| else: |
| return pd.DataFrame() |
|
|
| if df.empty: |
| return df |
|
|
| |
| df = df[["date", "merchant", "raw_merchant", "amount", "source_file"]].copy() |
| df["date"] = pd.to_datetime(df["date"]) |
| df["amount"] = pd.to_numeric(df["amount"], errors="coerce") |
| df = df.dropna(subset=["date", "amount"]) |
| df = df[df["amount"] > 0] |
| df = df.sort_values("date").reset_index(drop=True) |
| return df |
|
|
|
|
| def combine_files(uploaded_files) -> tuple[pd.DataFrame, list[str]]: |
| """ |
| Parse and combine multiple uploaded files. |
| Returns (combined_df, list_of_warnings). |
| """ |
| frames = [] |
| warnings = [] |
|
|
| for f in uploaded_files: |
| df = parse_uploaded_file(f) |
| if df.empty: |
| warnings.append(f"β οΈ Could not extract transactions from **{f.name}**. " |
| "Check that it's a valid statement export.") |
| else: |
| frames.append(df) |
|
|
| if not frames: |
| return pd.DataFrame(), warnings |
|
|
| combined = pd.concat(frames, ignore_index=True) |
|
|
| |
| combined = combined.drop_duplicates( |
| subset=["date", "merchant", "amount"], keep="first" |
| ) |
| combined = combined.sort_values("date").reset_index(drop=True) |
|
|
| |
| if not combined.empty: |
| months = pd.period_range( |
| start=combined["date"].min().to_period("M"), |
| end=combined["date"].max().to_period("M"), |
| freq="M", |
| ) |
| covered = set(combined["date"].dt.to_period("M").unique()) |
| missing = [str(m) for m in months if m not in covered] |
| if missing: |
| warnings.append( |
| f"π
Possible gaps detected β no transactions found for: {', '.join(missing)}. " |
| "Upload missing statements for more accurate analysis." |
| ) |
|
|
| return combined, warnings |
|
|
|
|
| def extract_raw_text(file_bytes: bytes, filename: str) -> str: |
| """Return extracted plain text for debugging purposes for common file types. |
| Returns an empty string on failure. |
| """ |
| ext = filename.lower().split(".")[-1] |
| try: |
| if ext == "pdf": |
| import pdfplumber |
| texts = [] |
| with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: |
| for page in pdf.pages: |
| texts.append(page.extract_text() or "") |
| return "\n\n--- PAGE BREAK ---\n\n".join(texts) |
|
|
| if ext in ("docx",): |
| import docx2txt |
| import tempfile, os |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp: |
| tmp.write(file_bytes) |
| tmp_path = tmp.name |
| try: |
| text = docx2txt.process(tmp_path) or "" |
| except Exception: |
| text = "" |
| finally: |
| try: |
| os.unlink(tmp_path) |
| except Exception: |
| pass |
| return text |
|
|
| if ext == "csv": |
| try: |
| return file_bytes.decode("utf-8", errors="replace") |
| except Exception: |
| return "" |
|
|
| if ext in ("xls", "xlsx"): |
| try: |
| xl = pd.ExcelFile(io.BytesIO(file_bytes)) |
| parts = [] |
| for sheet in xl.sheet_names: |
| df = xl.parse(sheet, dtype=str) |
| parts.append(f"-- Sheet: {sheet} --\n" + df.to_csv(index=False)) |
| return "\n\n".join(parts) |
| except Exception: |
| return "" |
|
|
| except Exception: |
| return "" |
|
|
| return "" |
|
|