import asyncio import re import pandas as pd import numpy as np from typing import List, Dict, Any, Optional, Tuple from config.config import settings class TableExtractor: """Async table extractor for processing transaction tables.""" def __init__(self): self.date_pattern = re.compile( r"\b(?:" r"\d{1,2}[-/]\d{1,2}[-/]\d{2,4}" r"|\d{2,4}[-/]\d{1,2}[-/]\d{1,2}" r"|\d{1,2}[-/]\d{2,4}" r"|\d{2,4}[-/]\d{1,2}" r"|\d{1,2}[-/]\d{1,2}" r")\b" ) self.amount_pattern = re.compile(r'-?(?:\d{1,3}(?:,\d{2}){1,}(?:,\d{3})?|\d{1,3}(?:,\d{3})+|\d+)?\.\d{1,2}-?') async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): pass def match_by_pattern(self, text: str, pattern) -> bool: """Check if text matches a pattern.""" if pattern == self.amount_pattern and "-" not in text and len(text) > 6 and "," not in text: return False if pattern == self.amount_pattern and "-" in text and len(text) > 7 and "," not in text: return False return bool(pattern.fullmatch(text)) def extract_by_pattern(self, text: str, pattern) -> Tuple[Optional[str], Optional[str], Optional[str]]: """Extract value by pattern and return value, before, after.""" match = pattern.search(text) if match: before = text[:match.start()].strip() value = match.group() after = text[match.end():].strip() if pattern == self.amount_pattern and "-" not in value and len(value) > 6 and "," not in value: return None, None, None if pattern == self.amount_pattern and "-" in value and len(value) > 7 and "," not in value: return None, None, None return value, before, after return None, None, None def repair_row_with_date_and_amount(self, header: List[str], row: List[str]) -> List[str]: """Repair row data by extracting dates and amounts.""" result = row[:] n = len(header) for i, col in enumerate(header): val = result[i].strip() if col.lower() == "date": date, left, right = self.extract_by_pattern(val, self.date_pattern) if date: result[i] = date if left and i > 0 and header[i-1] != "date": result[i-1] = (result[i-1] + " " + left).strip() if right and i < n - 1 and header[i+1] != "date": result[i+1] = (right + " " + result[i+1]).strip() continue # Check previous column's last word if i > 0 and header[i-1] != "date": left_val = result[i-1].strip() tokens = left_val.split() if tokens: last_word = tokens[-1] date_check, _, _ = self.extract_by_pattern(last_word, self.date_pattern) if date_check: result[i] = date_check + " " + result[i] tokens.pop() # remove matched date result[i-1] = " ".join(tokens) again_date, again_left, again_right = self.extract_by_pattern(result[i], self.date_pattern) if again_date: result[i] = again_date if again_left: result[i-1] = (result[i-1] + " " + again_left).strip() if again_right: result[i+1] = (again_right + " " + result[i+1]).strip() continue # Check next column's first word if i < n - 1 and header[i+1] != "date": right_val = result[i+1].strip() tokens = right_val.split() if tokens: first_word = tokens[0] date_check, _, _ = self.extract_by_pattern(first_word, self.date_pattern) if date_check: result[i] = result[i] + " " + date_check tokens.pop(0) result[i+1] = " ".join(tokens) again_date, again_left, again_right = self.extract_by_pattern(result[i], self.date_pattern) if again_date: result[i] = again_date if again_left: result[i-1] = (result[i-1] + " " + again_left).strip() if again_right: result[i+1] = (again_right + " " + result[i+1]).strip() continue # Check if the entire value is a date if not self.match_by_pattern(result[i].strip(), self.date_pattern): result[i] = "" # check left if i > 0 and header[i-1] != "date": result[i-1] = (result[i-1] + " " + val).strip() elif i < n - 1 and header[i+1] != "date": result[i+1] = (val + " " + result[i+1]).strip() elif col.lower() in ["amount", "balance", "credits", "debits"]: amt, left, right = self.extract_by_pattern(val, self.amount_pattern) if amt: result[i] = amt if left and i > 0: result[i-1] = (result[i-1] + " " + left).strip() if right and i < n - 1: result[i+1] = (right + " " + result[i+1]).strip() continue # Check previous column's last word if i > 0 and (header[i-1] not in ["amount", "balance", "credits", "debits"]): left_val = result[i-1].strip() tokens = left_val.split() if tokens: last_word = tokens[-1] amt_check, _, _ = self.extract_by_pattern(last_word, self.amount_pattern) if amt_check: result[i] = amt_check + " " + result[i] tokens.pop() result[i-1] = " ".join(tokens) again_amt, again_left, again_right = self.extract_by_pattern(result[i], self.amount_pattern) if again_amt: result[i] = again_amt if again_left: result[i-1] = (result[i-1] + " " + again_left).strip() if again_right: result[i+1] = (again_right + " " + result[i+1]).strip() continue # Check next column's first word if i < n - 1 and (header[i+1] not in ["amount", "balance", "credits", "debits"]): right_val = result[i+1].strip() tokens = right_val.split() if tokens: first_word = tokens[0] amt_check, _, _ = self.extract_by_pattern(first_word, self.amount_pattern) if amt_check: result[i] = result[i] + " " + amt_check tokens.pop(0) result[i+1] = " ".join(tokens) again_amt, again_left, again_right = self.extract_by_pattern(result[i], self.amount_pattern) if again_amt: result[i] = again_amt if again_left: result[i-1] = (result[i-1] + " " + again_left).strip() if again_right: result[i+1] = (again_right + " " + result[i+1]).strip() continue # Check if the entire value is an amount if not self.match_by_pattern(result[i].strip(), self.amount_pattern): result[i] = "" # check left if i > 0 and (header[i-1] not in ["amount", "balance", "credits", "debits"]): result[i-1] = (result[i-1] + " " + val).strip() elif i < n - 1 and (header[i+1] not in ["amount", "balance", "credits", "debits"]): result[i+1] = (val + " " + result[i+1]).strip() return result def extract_amount_or_return(self, line: str) -> str: """Extract amount from line or return original line.""" matches = self.amount_pattern.findall(line) if matches: match = self.amount_pattern.search(line) return match.group(0) if match else line return line def extract_date_or_return(self, line: str) -> str: """Extract date from line or return original line.""" matches = self.date_pattern.findall(line) if matches: match = self.date_pattern.search(line) return match.group(0) if match else line return line def is_date_word(self, word: str) -> bool: """Check if word is a date.""" try: return bool(self.date_pattern.fullmatch(word)) except ValueError: return False def detect_headers(self, line_data: Dict, gap_threshold_ratio: float = 0.1) -> List[str]: """Detect headers from line data.""" if "description" not in line_data["line"]: gap_threshold_ratio = 0.2 if "." in line_data["line"]: gap_threshold_ratio = 0.1 word_data = sorted(line_data["words"], key=lambda w: w["bbox"][0]) line = line_data["line"] if len(word_data) < 2: return [line.strip()] # Treat whole line as one header if only 1 word # Compute horizontal gaps between words gaps = [] for i in range(len(word_data) - 1): x1 = word_data[i]["bbox"][2] # end x of current word x2 = word_data[i + 1]["bbox"][0] # start x of next word gaps.append(x2 - x1) avg_gap = sum(gaps) / len(gaps) threshold = avg_gap * gap_threshold_ratio # Split words into groups based on large gaps (assumed column breaks) headers = [] current_header = [word_data[0]["word"]] for i in range(1, len(word_data)): gap = gaps[i - 1] if gap > threshold: headers.append(" ".join(current_header)) current_header = [] current_header.append(word_data[i]["word"]) if current_header: headers.append(" ".join(current_header)) # Process special cases for i in range(len(headers)): if "date" in headers[i].lower() and "description" in headers[i].lower(): header_checker = headers[i].split(" ") date_index = header_checker.index("date") description_index = header_checker.index("description") if date_index < description_index: headers[i] = "date" headers.insert(i + 1, "description") else: headers[i] = "description" headers.insert(i + 1, "date") # Handle check/draft numbers if "check" in headers or "draft" in headers: resulted_headers = [] i = 0 while i < len(headers): if ( i + 1 < len(headers) and headers[i] == "check" and (headers[i + 1] == "no" or headers[i + 1] == "number") ): resulted_headers.append(headers[i] + " " + headers[i + 1]) i += 2 elif ( i + 1 < len(headers) and headers[i] == "draft" and (headers[i + 1] == "no" or headers[i + 1] == "number") ): resulted_headers.append(headers[i] + " " + headers[i + 1]) i += 2 else: resulted_headers.append(headers[i]) i += 1 resulted_headers = list(map(lambda x: re.sub(r'[^\w\s]', '', x).strip(), resulted_headers)) # Normalize header names for i in range(len(resulted_headers)): if any(keyword in resulted_headers[i].lower() for keyword in ["date", "day", "month", "year"]): resulted_headers[i] = "date" if any(keyword in resulted_headers[i].lower() for keyword in ["amount", "total", "sum", "price", "value", "cost", "amt"]): resulted_headers[i] = "amount" if any(keyword in resulted_headers[i].lower() for keyword in ["balance", "final", "closing", "current", "available", "running", "remaining", "left", "bal", "remain"]): resulted_headers[i] = "balance" if any(keyword in resulted_headers[i].lower() for keyword in ["credit", "deposit", "cr"]): resulted_headers[i] = "credits" if any(keyword in resulted_headers[i].lower() for keyword in ["debit", "withdrawal", "dr"]): resulted_headers[i] = "debits" return resulted_headers # Normalize header names headers = list(map(lambda x: re.sub(r'[^\w\s]', '', x).strip(), headers)) for i in range(len(headers)): if any(keyword in headers[i].lower() for keyword in ["date", "day", "month", "year"]): headers[i] = "date" if any(keyword in headers[i].lower() for keyword in ["amount", "total", "sum", "price", "value", "cost", "amt"]): headers[i] = "amount" if any(keyword in headers[i].lower() for keyword in ["balance", "final", "closing", "current", "available", "running", "remaining", "left", "bal", "remain"]): headers[i] = "balance" if any(keyword in headers[i].lower() for keyword in ["credit", "deposit"]): headers[i] = "credits" if any(keyword in headers[i].lower() for keyword in ["debit", "withdrawal"]): headers[i] = "debits" return headers def detect_row_data(self, headers: List[str], header_data: List[Dict], row_data: List[Dict], gap_threshold: int = 10) -> List[str]: """Detect row data based on headers and word positions.""" if "description" not in headers: gap_threshold = 5 def flatten_bbox(bbox): if isinstance(bbox[0], list): # [[x0, y0], [x1, y1]] return [bbox[0][0], bbox[0][1], bbox[1][0], bbox[1][1]] return bbox # Step 1: Get all x0, x1 for header words header_ranges = [] for word in header_data: x0, _, x1, _ = flatten_bbox(word["bbox"]) header_ranges.append((x0, x1)) # Step 2: Sort by x0 header_ranges.sort(key=lambda x: x[0]) # Step 3: Merge only close headers (preserve wide gaps) merged_ranges = [] temp_x0, temp_x1 = header_ranges[0] for x0, x1 in header_ranges[1:]: gap = x0 - temp_x1 if gap < gap_threshold: temp_x1 = max(temp_x1, x1) else: merged_ranges.append((temp_x0, temp_x1)) temp_x0, temp_x1 = x0, x1 merged_ranges.append((temp_x0, temp_x1)) # Step 4: Segment row_data based on horizontal gaps row_data_sorted = sorted(row_data, key=lambda w: flatten_bbox(w["bbox"])[0]) segments = [] current_segment = [row_data_sorted[0]] for i in range(1, len(row_data_sorted)): prev_x1 = flatten_bbox(row_data_sorted[i - 1]["bbox"])[2] curr_x0 = flatten_bbox(row_data_sorted[i]["bbox"])[0] if curr_x0 - prev_x1 > gap_threshold: segments.append(current_segment) current_segment = [row_data_sorted[i]] else: current_segment.append(row_data_sorted[i]) if current_segment: segments.append(current_segment) # Step 5: Assign each segment to a column row_values = [""] * len(headers) for segment in segments: seg_x0 = flatten_bbox(segment[0]["bbox"])[0] seg_x1 = flatten_bbox(segment[-1]["bbox"])[2] seg_center = (seg_x0 + seg_x1) / 2 seg_text = " ".join([w["word"] for w in segment]) assigned = False for idx, (hx0, hx1) in enumerate(merged_ranges): if hx0 <= seg_center <= hx1: row_values[idx] += seg_text + " " assigned = True break if not assigned: # Optionally assign to nearest column if center is outside range nearest_idx = min( range(len(merged_ranges)), key=lambda idx: abs( (merged_ranges[idx][0] + merged_ranges[idx][1]) / 2 - seg_center ), ) row_values[nearest_idx] += seg_text + " " final_row = self.repair_row_with_date_and_amount(headers, row_values) return [val.strip() for val in final_row] def check_table_tags(self, line: str, headers: List[str]) -> str: """Check and return table tag based on line content and headers.""" available_tags = ["transaction", "deposit", "withdrawal", "checks", "daily balance", "drafts", "service fee", "interest"] tag = "" if "deposit" in line.lower() or "credit" in line.lower(): tag = "deposit" elif "withdrawal" in line.lower() or "debit" in line.lower(): tag = "withdrawal" elif "checks" in line.lower(): tag = "checks" elif "drafts" in line.lower(): tag = "drafts" elif "service fee" in line.lower() or "fee" in line.lower(): tag = "service fee" elif "daily balance" in line.lower() or "balance" in line.lower(): tag = "daily balance" elif "interest" in line.lower(): tag = "interest" elif "transaction" in line.lower() or "transfer" in line.lower(): tag = "transaction" if "credits" in headers or "debits" in headers: tag = "transaction" for h in headers: if "check" in h.lower(): tag = "checks" break for h in headers: if "draft" in h.lower(): tag = "drafts" break return tag async def process_transaction_tables_with_bbox(self, extracted_text_list: List[List[Dict]]) -> Tuple[List[pd.DataFrame], List[str]]: """Process transaction tables with bounding box data.""" def _process_tables(): all_tables = [] table_tags = [] for block in extracted_text_list: headers = [] table_started = False current_table = [] current_row = {} header_words = [] for line_idx, line_bbox in enumerate(block): line = line_bbox["line"] line = line.strip() if not table_started and ("date" in line and "description" in line): headers = self.detect_headers(line_bbox) header_words = line_bbox["words"] date_flag = False description_flag = False for header in headers: if "date" in header.lower(): date_flag = True if "description" in header.lower(): description_flag = True if date_flag and description_flag: table_started = True current_row = {header: [] for header in headers} else: continue if line_idx - 1 >= 0: prev_line = block[line_idx - 1]["line"] tag = self.check_table_tags(prev_line, headers) if tag: table_tags.append(tag) elif len(table_tags) > 0: table_tags.append(table_tags[-1]) else: table_tags.append("transaction") continue elif (not table_started and ("date" in line and "amount" in line)) or ( not table_started and ("date" in line and "balance" in line) ): headers = self.detect_headers(line_bbox) header_words = line_bbox["words"] date_flag = False amount_flag = False balance_flag = False for header in headers: if "date" in header.lower(): date_flag = True if "amount" in header.lower(): amount_flag = True if "balance" in header.lower(): balance_flag = True if date_flag and (amount_flag or balance_flag): table_started = True current_row = {header: [] for header in headers} else: continue if line_idx - 1 >= 0: prev_line = block[line_idx - 1]["line"] tag = self.check_table_tags(prev_line, headers) if tag: table_tags.append(tag) elif len(table_tags) > 0: table_tags.append(table_tags[-1]) else: table_tags.append("transaction") continue if table_started and ("date" in line and "description" in line): max_len = max(len(v) for v in current_row.values()) for i in range(max_len): row_map = {} for key in current_row: row_map[key] = ( current_row[key][i] if i < len(current_row[key]) else "" ) current_table.append(row_map) df = pd.DataFrame(current_table) all_tables.append(df) current_table = [] headers = self.detect_headers(line_bbox) header_words = line_bbox["words"] date_flag = False description_flag = False for header in headers: if "date" in header.lower(): date_flag = True if "description" in header.lower(): description_flag = True if date_flag and description_flag: current_row = {header: [] for header in headers} else: continue if line_idx - 1 >= 0: prev_line = block[line_idx - 1]["line"] tag = self.check_table_tags(prev_line, headers) if tag: table_tags.append(tag) elif len(table_tags) > 0: table_tags.append(table_tags[-1]) else: table_tags.append("transaction") continue elif (table_started and ("date" in line and "amount" in line)) or ( table_started and ("date" in line and "balance" in line) ): max_len = max(len(v) for v in current_row.values()) for i in range(max_len): row_map = {} for key in current_row: row_map[key] = ( current_row[key][i] if i < len(current_row[key]) else "" ) current_table.append(row_map) df = pd.DataFrame(current_table) all_tables.append(df) current_table = [] headers = self.detect_headers(line_bbox) header_words = line_bbox["words"] date_flag = False amount_flag = False balance_flag = False for header in headers: if "date" in header.lower(): date_flag = True if "amount" in header.lower(): amount_flag = True if "balance" in header.lower(): balance_flag = True if date_flag and (amount_flag or balance_flag): current_row = {header: [] for header in headers} else: continue if line_idx - 1 >= 0: prev_line = block[line_idx - 1]["line"] tag = self.check_table_tags(prev_line, headers) if tag: table_tags.append(tag) elif len(table_tags) > 0: table_tags.append(table_tags[-1]) else: table_tags.append("transaction") continue if table_started: parts = self.detect_row_data(headers, header_words, line_bbox["words"]) for key, value in zip(headers, parts): current_row[key].append(value) max_len = max(len(v) for v in current_row.values()) for i in range(max_len): if ( "amount" in headers and current_row["amount"] and i < len(current_row["amount"]) and current_row["amount"][i] ): amount = self.extract_amount_or_return(current_row["amount"][i]) current_row["amount"][i] = amount if ( "balance" in headers and current_row["balance"] and i < len(current_row["balance"]) and current_row["balance"][i] ): amount = self.extract_amount_or_return(current_row["balance"][i]) current_row["balance"][i] = amount if ( "credits" in headers and current_row["credits"] and i < len(current_row["credits"]) and current_row["credits"][i] ): amount = self.extract_amount_or_return(current_row["credits"][i]) current_row["credits"][i] = amount if ( "debits" in headers and current_row["debits"] and i < len(current_row["debits"]) and current_row["debits"][i] ): amount = self.extract_amount_or_return(current_row["debits"][i]) current_row["debits"][i] = amount if ( "date" in headers and current_row["date"] and i < len(current_row["date"]) and current_row["date"][i] ): current_row["date"][i] = self.extract_date_or_return( current_row["date"][i] ) if ( "date" in headers and current_row["date"] and current_row["date"][0] and not self.is_date_word(current_row["date"][0]) or ( "amount" in headers and current_row["amount"][0] and not self.amount_pattern.match(current_row["amount"][0]) ) or ( "balance" in headers and current_row["balance"][0] and not self.amount_pattern.match(current_row["balance"][0]) ) or ( "credits" in headers and current_row["credits"][0] and not self.amount_pattern.match(current_row["credits"][0]) ) or ( "debits" in headers and current_row["debits"][0] and not self.amount_pattern.match(current_row["debits"][0]) ) ): if not current_table and len(table_tags) > 0 and table_tags[-1]: table_tags.pop() all_tables.append(pd.DataFrame(current_table)) current_table = [] current_row = {} header_words = [] headers = [] table_started = False else: for i in range(max_len): row_map = {} for key in current_row: row_map[key] = ( current_row[key][i] if i < len(current_row[key]) else "" ) current_table.append(row_map) current_row = {header: [] for header in headers} table_started = False if current_table: df = pd.DataFrame(current_table) all_tables.append(df) return all_tables, table_tags return await asyncio.get_event_loop().run_in_executor(None, _process_tables) async def process_tables(self, table: pd.DataFrame) -> pd.DataFrame: """Process the extracted table to clean and format it.""" def _process_table(): keywords = ["continue", "continued", "page", "next page", "total", "subtotal"] table_copy = table.copy() is_balance_column = "balance" in table_copy.columns is_amount_column = "amount" in table_copy.columns is_credits_column = "credits" in table_copy.columns is_debits_column = "debits" in table_copy.columns for idx, row in table_copy.iterrows(): if is_balance_column: if row["balance"] and not row["date"]: table_copy.loc[idx] = [""] * len(table_copy.columns) continue if is_amount_column: if row["amount"] and not row["date"]: table_copy.loc[idx] = [""] * len(table_copy.columns) continue if is_credits_column: if row["credits"] and not row["date"]: table_copy.loc[idx] = [""] * len(table_copy.columns) continue if is_debits_column: if row["debits"] and not row["date"]: table_copy.loc[idx] = [""] * len(table_copy.columns) continue for cell in row: if any(keyword in cell.lower() for keyword in keywords): table_copy.loc[idx] = [""] * len(table_copy.columns) break df = table_copy.copy() df = df.fillna("") # Fill NaNs with empty string for easier processing # Step 1: Identify key columns (case-insensitive match) lower_cols = [col.lower() for col in df.columns] date_col = next((col for col in df.columns if re.search(r'date', col, re.IGNORECASE)), None) value_cols = [col for col in df.columns if re.search(r'amount|balance|credits|debits', col, re.IGNORECASE)] if not date_col or not value_cols: return df def is_anchor(row): return bool(row[date_col].strip()) and any(row[col].strip() for col in value_cols) # Step 2: Loop over rows and identify anchor indices anchor_indices = [i for i, row in df.iterrows() if is_anchor(row)] for anchor_idx in anchor_indices: # Merge upward i = anchor_idx - 1 while i >= 0: if is_anchor(df.iloc[i]) or df.iloc[i].isnull().all() or all(df.iloc[i] == ""): break for col in df.columns: if col != date_col and col not in value_cols: df.at[anchor_idx, col] = (str(df.at[i, col]).strip() + " " + str(df.at[anchor_idx, col]).strip()).strip() df.iloc[i] = "" # Blank the merged row i -= 1 # Merge downward i = anchor_idx + 1 while i < len(df): if is_anchor(df.iloc[i]) or df.iloc[i].isnull().all() or all(df.iloc[i] == ""): break for col in df.columns: if col != date_col and col not in value_cols: df.at[anchor_idx, col] = (str(df.at[anchor_idx, col]).strip() + " " + str(df.at[i, col]).strip()).strip() df.iloc[i] = "" # Blank the merged row i += 1 df_copy = df.copy() col = "balance" if "balance" in df_copy.columns else "amount" for idx, row in df_copy.iterrows(): if not row[col] and not row[date_col]: df_copy.loc[idx] = [""] * len(df_copy.columns) df_copy = df_copy[~df_copy.apply(lambda row: all(cell == "" for cell in row), axis=1)].reset_index(drop=True) return df_copy return await asyncio.get_event_loop().run_in_executor(None, _process_table)