# Bank-Scrubber / src / extractor / table_extractor.py
# Aryan Jain
# bank scrubber streamlit application
# 4e71548
import asyncio
import re
import pandas as pd
from typing import List, Dict, Any, Optional, Tuple
from src.config.config import settings
class TableExtractor:
    """Async table extractor for processing transaction tables.

    The extractor works on OCR-style input: every line is a dict with a
    ``"line"`` string and a ``"words"`` list whose items carry a ``"word"``
    string and a ``"bbox"`` (either ``[x0, y0, x1, y1]`` or
    ``[[x0, y0], [x1, y1]]``).  Header lines are recognised by keyword,
    following lines are split into cells by horizontal position, and the
    resulting rows are collected into pandas DataFrames.
    """

    # Canonical names of the columns that hold monetary values.
    _AMOUNT_COLUMNS = ("amount", "balance", "credits", "debits")

    # Ordered (canonical name, substrings) rules used to normalise header
    # text; the first rule with a matching substring wins.
    _HEADER_KEYWORDS = (
        ("date", ("date", "day", "month", "year")),
        ("amount", ("amount", "total", "sum", "price", "value", "cost", "amt")),
        (
            "balance",
            (
                "balance", "final", "closing", "current", "available",
                "running", "remaining", "left", "bal", "remain",
            ),
        ),
        ("credits", ("credit", "deposit")),
        ("debits", ("debit", "withdrawal")),
    )

    # Two-letter abbreviations accepted only as whole words.  Matching them
    # as substrings would turn "description" into "credits" ("cr") and
    # "draft no" into "debits" ("dr").
    _HEADER_ABBREVIATIONS = {"credits": "cr", "debits": "dr"}

    # Ordered (tag, substrings) rules applied to the line above a header.
    _TAG_RULES = (
        ("deposit", ("deposit", "credit")),
        ("withdrawal", ("withdrawal", "debit")),
        ("checks", ("checks",)),
        ("drafts", ("drafts",)),
        ("service fee", ("service fee", "fee")),
        ("daily balance", ("daily balance", "balance")),
        ("interest", ("interest",)),
        ("transaction", ("transaction", "transfer")),
    )

    # Substrings that mark a row as page furniture rather than data.
    _NOISE_KEYWORDS = ("continue", "continued", "page", "next page", "total", "subtotal")

    def __init__(self):
        """Compile the date and amount patterns used by every method."""
        # Numeric dates with "-" or "/" separators, two or three components.
        self.date_pattern = re.compile(
            r"\b(?:"
            r"\d{1,2}[-/]\d{1,2}[-/]\d{2,4}"
            r"|\d{2,4}[-/]\d{1,2}[-/]\d{1,2}"
            r"|\d{1,2}[-/]\d{2,4}"
            r"|\d{2,4}[-/]\d{1,2}"
            r"|\d{1,2}[-/]\d{1,2}"
            r")\b"
        )
        # Decimal amounts with optional thousands separators and an optional
        # leading or trailing minus sign; the fractional part is mandatory.
        self.amount_pattern = re.compile(r'-?(?:\d{1,3}(?:,\d{2}){1,}(?:,\d{3})?|\d{1,3}(?:,\d{3})+|\d+)?\.\d{1,2}-?')

    async def __aenter__(self):
        """Return the extractor itself; no resources are acquired."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions are not suppressed."""
        return None

    # ------------------------------------------------------------------
    # Pattern helpers
    # ------------------------------------------------------------------
    def _is_long_unformatted_amount(self, pattern, text: str) -> bool:
        """Return True when ``text`` must be rejected as an amount.

        Applies only to ``self.amount_pattern``: a candidate without a comma
        that is longer than 6 characters (7 when it contains a minus sign)
        is not accepted as an amount.
        """
        if pattern != self.amount_pattern or "," in text:
            return False
        limit = 7 if "-" in text else 6
        return len(text) > limit

    def match_by_pattern(self, text: str, pattern) -> bool:
        """Check if the whole of ``text`` matches ``pattern``."""
        if self._is_long_unformatted_amount(pattern, text):
            return False
        return bool(pattern.fullmatch(text))

    def extract_by_pattern(self, text: str, pattern) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """Find the first match of ``pattern`` in ``text``.

        Returns:
            ``(value, before, after)`` where ``before``/``after`` are the
            stripped texts around the match, or ``(None, None, None)`` when
            nothing matches or the match is rejected by the amount-length
            heuristic.
        """
        found = pattern.search(text)
        if found is None:
            return None, None, None
        value = found.group()
        if self._is_long_unformatted_amount(pattern, value):
            return None, None, None
        return value, text[:found.start()].strip(), text[found.end():].strip()

    def extract_amount_or_return(self, line: str) -> str:
        """Extract the first amount from ``line`` or return ``line`` unchanged."""
        found = self.amount_pattern.search(line)
        return found.group(0) if found else line

    def extract_date_or_return(self, line: str) -> str:
        """Extract the first date from ``line`` or return ``line`` unchanged."""
        found = self.date_pattern.search(line)
        return found.group(0) if found else line

    def is_date_word(self, word: str) -> bool:
        """Check if ``word`` is, in its entirety, a date."""
        return self.date_pattern.fullmatch(word) is not None

    # ------------------------------------------------------------------
    # Row repair
    # ------------------------------------------------------------------
    def repair_row_with_date_and_amount(self, header: List[str], row: List[str]) -> List[str]:
        """Repair row data by extracting dates and amounts.

        For every ``date`` column and every amount-like column the cell is
        reduced to the matching value.  Surrounding text is pushed to a
        neighbouring cell, and a value that leaked into the last word of the
        left neighbour or the first word of the right neighbour is pulled
        back in.

        Args:
            header: Column names; compared case-insensitively.
            row: Cell texts.  A row shorter than ``header`` is padded with
                empty strings instead of raising ``IndexError``.

        Returns:
            A new list; ``row`` itself is not modified.
        """
        cells = list(row)
        if len(cells) < len(header):
            cells.extend([""] * (len(header) - len(cells)))
        names = [name.lower() for name in header]
        for i, name in enumerate(names):
            if name == "date":
                self._repair_cell(cells, names, i, self.date_pattern, ("date",), False)
            elif name in self._AMOUNT_COLUMNS:
                self._repair_cell(cells, names, i, self.amount_pattern, self._AMOUNT_COLUMNS, True)
        return cells

    def _repair_cell(self, cells: List[str], names: List[str], i: int, pattern, peers, spill_into_peers: bool) -> None:
        """Reduce ``cells[i]`` to the value matching ``pattern`` (in place).

        Args:
            cells: Row being repaired; has at least ``len(names)`` items.
            names: Lower-cased column names.
            i: Index of the column to repair.
            pattern: Compiled pattern describing a valid value.
            peers: Column names of the same kind as column ``i``; values are
                never borrowed from such neighbours.
            spill_into_peers: Whether text around a direct match may be moved
                into a neighbour of the same kind (the amount columns do
                this, the date column does not).
        """
        last = len(names) - 1
        original = cells[i].strip()
        left_ok = i > 0 and names[i - 1] not in peers
        right_ok = i < last and names[i + 1] not in peers

        # 1. The cell itself contains a value: keep it, move the rest out.
        value, before, after = self.extract_by_pattern(original, pattern)
        if value:
            cells[i] = value
            if before and i > 0 and (spill_into_peers or left_ok):
                cells[i - 1] = (cells[i - 1] + " " + before).strip()
            if after and i < last and (spill_into_peers or right_ok):
                cells[i + 1] = (after + " " + cells[i + 1]).strip()
            return

        # 2. The value leaked into the last word of the left neighbour.
        if left_ok:
            words = cells[i - 1].split()
            if words:
                found, _, _ = self.extract_by_pattern(words[-1], pattern)
                if found:
                    words.pop()
                    cells[i - 1] = " ".join(words)
                    cells[i] = found
                    if original:
                        # Leftover text goes right when a right neighbour
                        # exists; for the last column it goes back left
                        # rather than indexing past the end of the row.
                        if i < last:
                            cells[i + 1] = (original + " " + cells[i + 1]).strip()
                        else:
                            cells[i - 1] = (cells[i - 1] + " " + original).strip()
                    return

        # 3. The value leaked into the first word of the right neighbour.
        if right_ok:
            words = cells[i + 1].split()
            if words:
                found, _, _ = self.extract_by_pattern(words[0], pattern)
                if found:
                    words.pop(0)
                    cells[i + 1] = " ".join(words)
                    cells[i] = found
                    if original:
                        # For the first column there is no left neighbour;
                        # index -1 would silently hit the last column.
                        if i > 0:
                            cells[i - 1] = (cells[i - 1] + " " + original).strip()
                        else:
                            cells[i + 1] = (original + " " + cells[i + 1]).strip()
                    return

        # 4. No value anywhere: empty the cell and hand its text to a
        #    neighbour of a different kind, preferring the left one.
        if not self.match_by_pattern(original, pattern):
            cells[i] = ""
            if left_ok:
                cells[i - 1] = (cells[i - 1] + " " + original).strip()
            elif right_ok:
                cells[i + 1] = (original + " " + cells[i + 1]).strip()

    # ------------------------------------------------------------------
    # Header detection
    # ------------------------------------------------------------------
    def _normalize_header(self, name: str, allow_abbreviations: bool = False) -> str:
        """Strip punctuation from ``name`` and map it to a canonical name.

        Returns the cleaned text unchanged when no keyword rule applies.
        With ``allow_abbreviations`` the whole words "cr" and "dr" are also
        accepted for credits and debits.
        """
        cleaned = re.sub(r'[^\w\s]', '', name).strip()
        lowered = cleaned.lower()
        words = lowered.split()
        for canonical, keywords in self._HEADER_KEYWORDS:
            if any(keyword in lowered for keyword in keywords):
                return canonical
            if allow_abbreviations and self._HEADER_ABBREVIATIONS.get(canonical) in words:
                return canonical
        return cleaned

    @staticmethod
    def _token_position(tokens: List[str], needle: str) -> int:
        """Index of the token equal to ``needle``, else the first containing it."""
        if needle in tokens:
            return tokens.index(needle)
        return next(pos for pos, token in enumerate(tokens) if needle in token)

    def detect_headers(self, line_data: Dict, gap_threshold_ratio: float = 0.1) -> List[str]:
        """Detect headers from line data.

        Words are ordered by their left edge and split into header groups
        wherever the gap to the previous word exceeds
        ``average gap * gap_threshold_ratio``.

        Args:
            line_data: Dict with ``"line"`` (text) and ``"words"`` (items
                with ``"word"`` and a flat ``"bbox"`` ``[x0, y0, x1, y1]``).
            gap_threshold_ratio: Used only when the line contains
                "description" and no "."; otherwise it is overridden with
                0.2 (no "description") or 0.1 (line contains ".").

        Returns:
            Normalised header names, or ``[line.strip()]`` when the line has
            fewer than two words.
        """
        line = line_data["line"]
        if "description" not in line:
            gap_threshold_ratio = 0.2
        if "." in line:
            gap_threshold_ratio = 0.1

        words = sorted(line_data["words"], key=lambda w: w["bbox"][0])
        if len(words) < 2:
            return [line.strip()]  # Treat whole line as one header

        # Horizontal gap between each word and its successor.
        gaps = [nxt["bbox"][0] - cur["bbox"][2] for cur, nxt in zip(words, words[1:])]
        threshold = (sum(gaps) / len(gaps)) * gap_threshold_ratio

        groups = [[words[0]["word"]]]
        for gap, word in zip(gaps, words[1:]):
            if gap > threshold:
                groups.append([])
            groups[-1].append(word["word"])
        raw_headers = [" ".join(group) for group in groups]

        # A group holding both "date" and "description" is really two
        # columns; split it, keeping the order in which they appear.  The
        # lookup is case-insensitive and substring based so it cannot raise
        # ValueError for text such as "Date Description".
        headers = []
        for text in raw_headers:
            lowered = text.lower()
            if "date" in lowered and "description" in lowered:
                tokens = lowered.split(" ")
                date_key = (self._token_position(tokens, "date"), lowered.find("date"))
                desc_key = (self._token_position(tokens, "description"), lowered.find("description"))
                if date_key < desc_key:
                    headers.extend(["date", "description"])
                else:
                    headers.extend(["description", "date"])
            else:
                headers.append(text)

        # "check"/"draft" followed by "no"/"number" form one column.
        has_check_or_draft = "check" in headers or "draft" in headers
        if has_check_or_draft:
            merged = []
            i = 0
            while i < len(headers):
                if (
                    i + 1 < len(headers)
                    and headers[i] in ("check", "draft")
                    and headers[i + 1] in ("no", "number")
                ):
                    merged.append(headers[i] + " " + headers[i + 1])
                    i += 2
                else:
                    merged.append(headers[i])
                    i += 1
            headers = merged

        # The "cr"/"dr" abbreviations are only recognised for check/draft
        # tables, as before, but now as whole words.
        return [self._normalize_header(text, has_check_or_draft) for text in headers]

    # ------------------------------------------------------------------
    # Row detection
    # ------------------------------------------------------------------
    @staticmethod
    def _x_span(word: Dict) -> Tuple[Any, Any]:
        """Return ``(x0, x1)`` for a flat or a two-point bounding box."""
        bbox = word["bbox"]
        if isinstance(bbox[0], (list, tuple)):  # [[x0, y0], [x1, y1]]
            return bbox[0][0], bbox[1][0]
        return bbox[0], bbox[2]

    @staticmethod
    def _column_for(center: float, column_ranges: List[Tuple[Any, Any]], n_cols: int) -> int:
        """Pick the column index for a segment centred at ``center``.

        The first range containing ``center`` wins; otherwise the range with
        the nearest midpoint.  The result is clamped to ``n_cols - 1``
        because the number of ranges derived from the header words can
        differ from the number of header names.
        """
        if not column_ranges:
            return 0
        for idx, (x0, x1) in enumerate(column_ranges):
            if x0 <= center <= x1:
                return min(idx, n_cols - 1)
        nearest = min(
            range(len(column_ranges)),
            key=lambda idx: abs((column_ranges[idx][0] + column_ranges[idx][1]) / 2 - center),
        )
        return min(nearest, n_cols - 1)

    def detect_row_data(self, headers: List[str], header_data: List[Dict], row_data: List[Dict], gap_threshold: int = 10) -> List[str]:
        """Detect row data based on headers and word positions.

        Args:
            headers: Normalised header names; one cell is produced per name.
            header_data: Words of the header line (``"word"``/``"bbox"``).
            row_data: Words of the row line (``"word"``/``"bbox"``).
            gap_threshold: Horizontal gap separating columns; replaced by 5
                when ``headers`` has no "description" column.

        Returns:
            One stripped string per header after
            ``repair_row_with_date_and_amount``; all empty strings when
            ``row_data`` is empty, and ``[]`` when ``headers`` is empty.
        """
        if "description" not in headers:
            gap_threshold = 5
        n_cols = len(headers)
        if n_cols == 0:
            return []
        if not row_data:
            return [""] * n_cols

        # Header words closer than the threshold belong to the same column.
        column_ranges = []
        for x0, x1 in sorted((self._x_span(w) for w in header_data), key=lambda span: span[0]):
            if column_ranges and x0 - column_ranges[-1][1] < gap_threshold:
                start, end = column_ranges[-1]
                column_ranges[-1] = (start, max(end, x1))
            else:
                column_ranges.append((x0, x1))

        # Row words separated by more than the threshold start a new segment.
        ordered = sorted(row_data, key=lambda w: self._x_span(w)[0])
        segments = [[ordered[0]]]
        for prev, cur in zip(ordered, ordered[1:]):
            if self._x_span(cur)[0] - self._x_span(prev)[1] > gap_threshold:
                segments.append([cur])
            else:
                segments[-1].append(cur)

        cells = [""] * n_cols
        for segment in segments:
            center = (self._x_span(segment[0])[0] + self._x_span(segment[-1])[1]) / 2
            text = " ".join(w["word"] for w in segment)
            cells[self._column_for(center, column_ranges, n_cols)] += text + " "

        repaired = self.repair_row_with_date_and_amount(headers, cells)
        return [cell.strip() for cell in repaired]

    # ------------------------------------------------------------------
    # Table tagging
    # ------------------------------------------------------------------
    def check_table_tags(self, line: str, headers: List[str]) -> str:
        """Check and return table tag based on line content and headers.

        The line is matched against ``_TAG_RULES`` in order.  Headers then
        override the result: a "credits"/"debits" column gives
        "transaction", any header containing "check" gives "checks" and any
        header containing "draft" gives "drafts" (later overrides win).
        Returns ``""`` when nothing applies.
        """
        lowered = line.lower()
        tag = next(
            (name for name, keys in self._TAG_RULES if any(key in lowered for key in keys)),
            "",
        )
        if "credits" in headers or "debits" in headers:
            tag = "transaction"
        if any("check" in h.lower() for h in headers):
            tag = "checks"
        if any("draft" in h.lower() for h in headers):
            tag = "drafts"
        return tag

    def _pick_tag(self, previous_line: str, headers: List[str], known_tags: List[str]) -> str:
        """Tag for a new table: detected tag, else the last tag, else "transaction"."""
        tag = self.check_table_tags(previous_line, headers)
        if tag:
            return tag
        if known_tags:
            return known_tags[-1]
        return "transaction"

    # ------------------------------------------------------------------
    # Table collection
    # ------------------------------------------------------------------
    @staticmethod
    def _header_kind(line: str) -> Optional[str]:
        """Classify a line as a possible header.

        Returns "description" for lines containing "date" and "description",
        "value" for lines containing "date" and "amount"/"balance", and
        ``None`` otherwise.  The checks are case-sensitive substring tests.
        """
        if "date" not in line:
            return None
        if "description" in line:
            return "description"
        if "amount" in line or "balance" in line:
            return "value"
        return None

    @staticmethod
    def _headers_match_kind(headers: List[str], kind: str) -> bool:
        """Check that detected headers really contain the expected columns."""
        lowered = [h.lower() for h in headers]
        if not any("date" in h for h in lowered):
            return False
        if kind == "description":
            return any("description" in h for h in lowered)
        return any("amount" in h or "balance" in h for h in lowered)

    def _row_breaks_table(self, columns: Dict[str, List[str]]) -> bool:
        """Return True when the first value of a typed column is invalid.

        A non-empty date that is not a date, or a non-empty amount-like
        value that does not start with an amount, ends the current table.
        """
        dates = columns.get("date")
        if dates and dates[0] and not self.is_date_word(dates[0]):
            return True
        for name in self._AMOUNT_COLUMNS:
            values = columns.get(name)
            if values and values[0] and not self.amount_pattern.match(values[0]):
                return True
        return False

    def _collect_block_tables(self, block: List[Dict], all_tables: List[pd.DataFrame], table_tags: List[str]) -> None:
        """Append the tables found in one block, with their tags.

        A table and its tag are appended together, and only when the table
        has at least one row, so both lists always have the same length.
        """
        headers: List[str] = []
        header_words: List[Dict] = []
        rows: List[Dict[str, str]] = []
        tag = ""
        table_started = False

        def close_table():
            # Reads the current value of ``tag`` from the enclosing scope.
            if rows:
                all_tables.append(pd.DataFrame(list(rows)))
                table_tags.append(tag)
            rows.clear()

        for line_idx, line_bbox in enumerate(block):
            line = line_bbox["line"].strip()
            kind = self._header_kind(line)

            if kind is not None:
                # A header-looking line always ends the table in progress.
                close_table()
                candidate = self.detect_headers(line_bbox)
                table_started = self._headers_match_kind(candidate, kind)
                if table_started:
                    headers = candidate
                    header_words = line_bbox["words"]
                    # The line above the header usually names the table; a
                    # header on the first line of a block still gets a tag.
                    previous_line = block[line_idx - 1]["line"] if line_idx > 0 else ""
                    tag = self._pick_tag(previous_line, headers, table_tags)
                else:
                    # Never keep reading rows against headers that failed
                    # validation.
                    headers, header_words = [], []
                continue

            if not table_started:
                continue

            parts = self.detect_row_data(headers, header_words, line_bbox["words"])
            # Duplicate header names share one list, which then yields
            # several output rows for a single input line.
            columns: Dict[str, List[str]] = {name: [] for name in headers}
            for key, value in zip(headers, parts):
                columns[key].append(value)

            for name in self._AMOUNT_COLUMNS:
                if name in columns:
                    columns[name] = [
                        self.extract_amount_or_return(v) if v else v for v in columns[name]
                    ]
            if "date" in columns:
                columns["date"] = [
                    self.extract_date_or_return(v) if v else v for v in columns["date"]
                ]

            if self._row_breaks_table(columns):
                # The offending line is dropped and the table ends here.
                close_table()
                headers, header_words = [], []
                table_started = False
                continue

            depth = max(len(values) for values in columns.values())
            for i in range(depth):
                rows.append(
                    {key: (values[i] if i < len(values) else "") for key, values in columns.items()}
                )

        close_table()

    async def process_transaction_tables_with_bbox(self, extracted_text_list: List[List[Dict]]) -> Tuple[List[pd.DataFrame], List[str]]:
        """Process transaction tables with bounding box data.

        Args:
            extracted_text_list: Blocks of lines; each line is a dict with
                ``"line"`` and ``"words"`` (see the class docstring).

        Returns:
            ``(tables, tags)`` with ``len(tables) == len(tags)``; tables
            without rows are not returned.  The work runs in the event
            loop's default executor.
        """
        def _process_tables():
            all_tables: List[pd.DataFrame] = []
            table_tags: List[str] = []
            for block in extracted_text_list:
                self._collect_block_tables(block, all_tables, table_tags)
            return all_tables, table_tags

        return await asyncio.get_running_loop().run_in_executor(None, _process_tables)

    # ------------------------------------------------------------------
    # Table clean-up
    # ------------------------------------------------------------------
    async def process_tables(self, table: pd.DataFrame) -> pd.DataFrame:
        """Process the extracted table to clean and format it.

        Steps:
            1. Blank rows that have a value but no date, and rows containing
               a noise keyword such as "page" or "total".
            2. Merge the text columns of continuation rows into the nearest
               anchor row (a row with a date and at least one value).
            3. Blank rows without a date and without a value, then drop all
               blank rows and renumber the index.

        When the table has no date column or no value column, the result of
        step 1 is returned as is.  ``table`` itself is not modified; the
        work runs in the event loop's default executor.
        """
        def _process_table():
            # Fill NaNs first so that every later string test is safe, and
            # use a positional index because labels and positions are mixed
            # below.
            df = table.copy().fillna("").reset_index(drop=True)
            blank = [""] * len(df.columns)
            has_date = "date" in df.columns
            present_values = [name for name in self._AMOUNT_COLUMNS if name in df.columns]

            # Step 1: values without a date, and page furniture.
            for idx, row in df.iterrows():
                orphan_value = (
                    has_date
                    and not row["date"]
                    and any(row[name] for name in present_values)
                )
                noisy = any(
                    keyword in str(cell).lower()
                    for cell in row
                    for keyword in self._NOISE_KEYWORDS
                )
                if orphan_value or noisy:
                    df.loc[idx] = blank

            # Identify key columns (case-insensitive match).
            date_col = next((col for col in df.columns if re.search(r'date', col, re.IGNORECASE)), None)
            value_cols = [col for col in df.columns if re.search(r'amount|balance|credits|debits', col, re.IGNORECASE)]
            if not date_col or not value_cols:
                return df
            text_cols = [col for col in df.columns if col != date_col and col not in value_cols]

            def is_anchor(row):
                return bool(str(row[date_col]).strip()) and any(
                    str(row[col]).strip() for col in value_cols
                )

            def is_blank(row):
                return all(str(cell) == "" for cell in row)

            # Step 2: merge neighbours into each anchor, upward first.
            anchor_indices = [i for i, row in df.iterrows() if is_anchor(row)]
            for anchor_idx in anchor_indices:
                for step in (-1, 1):
                    i = anchor_idx + step
                    while 0 <= i < len(df):
                        neighbour = df.iloc[i]
                        if is_anchor(neighbour) or is_blank(neighbour):
                            break
                        for col in text_cols:
                            piece = str(df.at[i, col]).strip()
                            base = str(df.at[anchor_idx, col]).strip()
                            joined = f"{piece} {base}" if step < 0 else f"{base} {piece}"
                            df.at[anchor_idx, col] = joined.strip()
                        df.iloc[i] = ""  # Blank the merged row
                        i += step

            # Step 3: a row needs a date or a value to survive.  "balance"
            # is decisive when present, then "amount"; tables with only
            # credits/debits columns are judged on those instead of failing
            # with a KeyError.
            if "balance" in df.columns:
                required = ["balance"]
            elif "amount" in df.columns:
                required = ["amount"]
            else:
                required = value_cols
            for idx, row in df.iterrows():
                if not row[date_col] and not any(row[col] for col in required):
                    df.loc[idx] = blank

            blank_rows = (df == "").all(axis=1)
            return df[~blank_rows].reset_index(drop=True)

        return await asyncio.get_running_loop().run_in_executor(None, _process_table)