# Page header captured with this file: "Spaces: Runtime error" (not part of the module).
import asyncio
import re
from typing import List, Dict, Any, Optional, Tuple

import pandas as pd

from src.config.config import settings
class TableExtractor:
    """Async table extractor for processing transaction tables.

    The extractor works on OCR-style line records of the form
    ``{"line": str, "words": [{"word": str, "bbox": [...]}, ...]}``: it
    detects header lines, assigns the words of the following lines to the
    header columns by horizontal position, repairs date/amount cells that
    leaked into neighbouring columns and finally merges multi-line rows.
    """

    # Normalised names of the columns that hold monetary values.
    _VALUE_COLUMNS = ("amount", "balance", "credits", "debits")

    # (canonical name, substring keywords, whole-word abbreviations), checked
    # in this order; the first rule that matches wins.
    _HEADER_RULES = (
        ("date", ("date", "day", "month", "year"), ()),
        ("amount", ("amount", "total", "sum", "price", "value", "cost", "amt"), ()),
        (
            "balance",
            ("balance", "final", "closing", "current", "available",
             "running", "remaining", "left", "bal", "remain"),
            (),
        ),
        ("credits", ("credit", "deposit"), ("cr",)),
        ("debits", ("debit", "withdrawal"), ("dr",)),
    )

    def __init__(self):
        # Day/month/year in any order, 2- or 3-part, separated by "-" or "/".
        self.date_pattern = re.compile(
            r"\b(?:"
            r"\d{1,2}[-/]\d{1,2}[-/]\d{2,4}"
            r"|\d{2,4}[-/]\d{1,2}[-/]\d{1,2}"
            r"|\d{1,2}[-/]\d{2,4}"
            r"|\d{2,4}[-/]\d{1,2}"
            r"|\d{1,2}[-/]\d{1,2}"
            r")\b"
        )
        # Decimal amount with optional thousands separators and an optional
        # leading or trailing minus sign; the decimal point is mandatory.
        self.amount_pattern = re.compile(r'-?(?:\d{1,3}(?:,\d{2}){1,}(?:,\d{3})?|\d{1,3}(?:,\d{3})+|\d+)?\.\d{1,2}-?')

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    # ------------------------------------------------------------------
    # Pattern helpers
    # ------------------------------------------------------------------
    def _is_oversized_amount(self, pattern, text: str) -> bool:
        """Return True for comma-less amounts that are too long to accept.

        Only applies to ``self.amount_pattern``: without a thousands
        separator at most 6 characters are accepted (7 when a minus sign is
        present).
        """
        if pattern != self.amount_pattern or "," in text:
            return False
        return len(text) > (7 if "-" in text else 6)

    def match_by_pattern(self, text: str, pattern) -> bool:
        """Return True when the whole of ``text`` matches ``pattern``.

        Amounts rejected by the length rule (see ``_is_oversized_amount``)
        never match.
        """
        if self._is_oversized_amount(pattern, text):
            return False
        return bool(pattern.fullmatch(text))

    def extract_by_pattern(self, text: str, pattern) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """Find the first match of ``pattern`` in ``text``.

        Returns ``(value, before, after)`` where ``before``/``after`` are the
        stripped text on either side of the match, or ``(None, None, None)``
        when nothing matches or the matched amount fails the length rule.
        """
        match = pattern.search(text)
        if match is None:
            return None, None, None
        value = match.group()
        if self._is_oversized_amount(pattern, value):
            return None, None, None
        return value, text[:match.start()].strip(), text[match.end():].strip()

    def extract_amount_or_return(self, line: str) -> str:
        """Return the first amount found in ``line``, or ``line`` unchanged."""
        match = self.amount_pattern.search(line)
        return match.group(0) if match else line

    def extract_date_or_return(self, line: str) -> str:
        """Return the first date found in ``line``, or ``line`` unchanged."""
        match = self.date_pattern.search(line)
        return match.group(0) if match else line

    def is_date_word(self, word: str) -> bool:
        """Return True when ``word`` is, in its entirety, a date."""
        return bool(self.date_pattern.fullmatch(word))

    # ------------------------------------------------------------------
    # Row repair
    # ------------------------------------------------------------------
    def _settle_cell(self, result: List[str], i: int, pattern) -> None:
        """Reduce ``result[i]`` to its matched value after borrowing a token.

        Text left over on either side is moved to the adjacent column when
        that column exists; otherwise it is dropped (there is nowhere to put
        it), which mirrors the direct-extraction step.
        """
        found, before, after = self.extract_by_pattern(result[i], pattern)
        if not found:
            return
        result[i] = found
        if before and i > 0:
            result[i - 1] = (result[i - 1] + " " + before).strip()
        if after and i + 1 < len(result):
            result[i + 1] = (after + " " + result[i + 1]).strip()

    def _repair_cell(self, header: List[str], result: List[str], i: int, pattern,
                     peers: Tuple[str, ...], respect_peers_on_push: bool) -> None:
        """Repair column ``i`` of ``result`` in place.

        ``peers`` are the header names of the same kind as column ``i``;
        tokens are never borrowed from, nor stray text pushed into, a peer
        column. ``respect_peers_on_push`` controls whether that rule also
        applies to the text surrounding a directly extracted value (it does
        for dates, it does not for amounts).
        """
        n = len(header)
        val = result[i].strip()
        has_left = i > 0
        has_right = i < n - 1
        left_foreign = has_left and header[i - 1] not in peers
        right_foreign = has_right and header[i + 1] not in peers

        # 1) The cell itself contains the value: keep it, push the rest out.
        found, before, after = self.extract_by_pattern(val, pattern)
        if found:
            result[i] = found
            if before and (left_foreign if respect_peers_on_push else has_left):
                result[i - 1] = (result[i - 1] + " " + before).strip()
            if after and (right_foreign if respect_peers_on_push else has_right):
                result[i + 1] = (after + " " + result[i + 1]).strip()
            return

        # 2) The value is the last word of the previous column.
        if left_foreign:
            tokens = result[i - 1].strip().split()
            if tokens:
                borrowed = self.extract_by_pattern(tokens[-1], pattern)[0]
                if borrowed:
                    tokens.pop()
                    result[i] = borrowed + " " + result[i]
                    result[i - 1] = " ".join(tokens)
                    self._settle_cell(result, i, pattern)
                    return

        # 3) The value is the first word of the next column.
        if right_foreign:
            tokens = result[i + 1].strip().split()
            if tokens:
                borrowed = self.extract_by_pattern(tokens[0], pattern)[0]
                if borrowed:
                    result[i] = result[i] + " " + borrowed
                    result[i + 1] = " ".join(tokens[1:])
                    self._settle_cell(result, i, pattern)
                    return

        # 4) No value anywhere: the cell holds stray text, move it away
        #    (left neighbour preferred, otherwise right).
        if not self.match_by_pattern(result[i].strip(), pattern):
            result[i] = ""
            if left_foreign:
                result[i - 1] = (result[i - 1] + " " + val).strip()
            elif right_foreign:
                result[i + 1] = (val + " " + result[i + 1]).strip()

    def repair_row_with_date_and_amount(self, header: List[str], row: List[str]) -> List[str]:
        """Repair row data by extracting dates and amounts.

        For every ``date`` column and every amount-like column (``amount``,
        ``balance``, ``credits``, ``debits``) the cell is reduced to the
        matched value; a value that slipped into an adjacent column is pulled
        back, and text that does not belong is moved to a neighbour.

        Args:
            header: Normalised header names, one per column.
            row: Cell texts, parallel to ``header``.

        Returns:
            A repaired copy of ``row``; the input list is not modified.
        """
        result = row[:]
        for i, col in enumerate(header):
            name = col.lower()
            if name == "date":
                self._repair_cell(header, result, i, self.date_pattern, ("date",), True)
            elif name in self._VALUE_COLUMNS:
                self._repair_cell(header, result, i, self.amount_pattern, self._VALUE_COLUMNS, False)
        return result

    # ------------------------------------------------------------------
    # Header detection
    # ------------------------------------------------------------------
    def _normalize_header(self, name: str, allow_abbreviations: bool) -> str:
        """Map a raw header to its canonical name, or return it unchanged.

        Keywords match as substrings. The abbreviations ``cr``/``dr`` only
        match as whole words, so that "description" (which contains "cr") and
        "draft" (which contains "dr") are not mistaken for credits/debits.
        """
        lowered = name.lower()
        words = lowered.split()
        for canonical, keywords, abbreviations in self._HEADER_RULES:
            if any(keyword in lowered for keyword in keywords):
                return canonical
            if allow_abbreviations and any(abbr in words for abbr in abbreviations):
                return canonical
        return name

    def detect_headers(self, line_data: Dict, gap_threshold_ratio: float = 0.1) -> List[str]:
        """Detect headers from line data.

        Words are ordered by the left edge of their bounding box and split
        into headers wherever the horizontal gap exceeds
        ``average gap * gap_threshold_ratio``.

        Args:
            line_data: ``{"line": str, "words": [{"word", "bbox"}, ...]}``
                where ``bbox[0]``/``bbox[2]`` are the left/right x positions.
            gap_threshold_ratio: Overridden to 0.2 when the line has no
                "description" and to 0.1 when the line contains a ".".

        Returns:
            Header names with punctuation removed and date/amount/balance/
            credits/debits synonyms replaced by their canonical name.
        """
        line = line_data["line"]
        if "description" not in line:
            gap_threshold_ratio = 0.2
        if "." in line:
            gap_threshold_ratio = 0.1

        words = sorted(line_data["words"], key=lambda w: w["bbox"][0])
        if len(words) < 2:
            return [line.strip()]  # Treat whole line as one header if only 1 word

        # Horizontal gap between the end of each word and the start of the next.
        gaps = [nxt["bbox"][0] - cur["bbox"][2] for cur, nxt in zip(words, words[1:])]
        threshold = sum(gaps) / len(gaps) * gap_threshold_ratio

        # A gap above the threshold is assumed to be a column break.
        groups = [[words[0]["word"]]]
        for gap, word in zip(gaps, words[1:]):
            if gap > threshold:
                groups.append([])
            groups[-1].append(word["word"])

        # A group holding both "date" and "description" is two columns; their
        # order follows the order in which the two words appear.
        headers: List[str] = []
        for group in groups:
            header = " ".join(group)
            lowered = header.lower()
            date_at = lowered.find("date")
            description_at = lowered.find("description")
            if date_at == -1 or description_at == -1:
                headers.append(header)
            elif date_at < description_at:
                headers.extend(["date", "description"])
            else:
                headers.extend(["description", "date"])

        # "check no" / "draft number" arrive as two words: glue them together.
        has_check_or_draft = "check" in headers or "draft" in headers
        if has_check_or_draft:
            glued = []
            i = 0
            while i < len(headers):
                if (
                    i + 1 < len(headers)
                    and headers[i] in ("check", "draft")
                    and headers[i + 1] in ("no", "number")
                ):
                    glued.append(headers[i] + " " + headers[i + 1])
                    i += 2
                else:
                    glued.append(headers[i])
                    i += 1
            headers = glued

        cleaned = [re.sub(r'[^\w\s]', '', header).strip() for header in headers]
        # The cr/dr abbreviations are only recognised for check/draft tables.
        return [self._normalize_header(header, has_check_or_draft) for header in cleaned]

    # ------------------------------------------------------------------
    # Row detection
    # ------------------------------------------------------------------
    def detect_row_data(self, headers: List[str], header_data: List[Dict], row_data: List[Dict], gap_threshold: int = 10) -> List[str]:
        """Detect row data based on headers and word positions.

        Header words closer than ``gap_threshold`` are merged into one column
        range; row words are grouped into segments the same way and each
        segment goes to the column whose range contains the segment centre
        (or, failing that, the column with the nearest centre).

        Args:
            headers: Normalised header names.
            header_data: Word records of the header line.
            row_data: Word records of the row line.
            gap_threshold: Forced to 5 when there is no "description" column.

        Returns:
            One stripped cell text per header. All cells are empty when the
            row or the header geometry is empty; the list is empty when
            ``headers`` is empty.
        """
        if "description" not in headers:
            gap_threshold = 5
        if not headers:
            return []
        if not header_data or not row_data:
            return [""] * len(headers)

        def flatten_bbox(bbox):
            if isinstance(bbox[0], list):  # [[x0, y0], [x1, y1]]
                return [bbox[0][0], bbox[0][1], bbox[1][0], bbox[1][1]]
            return bbox

        # Column ranges: (x0, x1) of the header words, close ones merged.
        spans = []
        for word in header_data:
            x0, _, x1, _ = flatten_bbox(word["bbox"])
            spans.append((x0, x1))
        spans.sort(key=lambda span: span[0])

        merged_ranges = []
        start, end = spans[0]
        for x0, x1 in spans[1:]:
            if x0 - end < gap_threshold:
                end = max(end, x1)
            else:
                merged_ranges.append((start, end))
                start, end = x0, x1
        merged_ranges.append((start, end))

        # Row segments: runs of words separated by more than the threshold.
        ordered = sorted(row_data, key=lambda w: flatten_bbox(w["bbox"])[0])
        segments = [[ordered[0]]]
        for previous, current in zip(ordered, ordered[1:]):
            prev_x1 = flatten_bbox(previous["bbox"])[2]
            curr_x0 = flatten_bbox(current["bbox"])[0]
            if curr_x0 - prev_x1 > gap_threshold:
                segments.append([current])
            else:
                segments[-1].append(current)

        row_values = [""] * len(headers)
        last_column = len(headers) - 1
        for segment in segments:
            seg_x0 = flatten_bbox(segment[0]["bbox"])[0]
            seg_x1 = flatten_bbox(segment[-1]["bbox"])[2]
            seg_center = (seg_x0 + seg_x1) / 2
            seg_text = " ".join(w["word"] for w in segment)

            target = next(
                (idx for idx, (hx0, hx1) in enumerate(merged_ranges) if hx0 <= seg_center <= hx1),
                None,
            )
            if target is None:
                # Centre outside every range: fall back to the nearest column.
                target = min(
                    range(len(merged_ranges)),
                    key=lambda idx: abs(
                        (merged_ranges[idx][0] + merged_ranges[idx][1]) / 2 - seg_center
                    ),
                )
            # There can be more geometric ranges than headers; never index
            # past the last header column.
            row_values[min(target, last_column)] += seg_text + " "

        final_row = self.repair_row_with_date_and_amount(headers, row_values)
        return [val.strip() for val in final_row]

    # ------------------------------------------------------------------
    # Table tagging
    # ------------------------------------------------------------------
    def check_table_tags(self, line: str, headers: List[str]) -> str:
        """Check and return table tag based on line content and headers.

        The tag is first guessed from keywords in ``line`` and then
        overridden by the headers, in increasing priority: a credits/debits
        column gives "transaction", a header containing "check" gives
        "checks" and one containing "draft" gives "drafts".

        Returns:
            The tag, or "" when nothing matches.
        """
        keyword_tags = (
            ("deposit", ("deposit", "credit")),
            ("withdrawal", ("withdrawal", "debit")),
            ("checks", ("checks",)),
            ("drafts", ("drafts",)),
            ("service fee", ("service fee", "fee")),
            ("daily balance", ("daily balance", "balance")),
            ("interest", ("interest",)),
            ("transaction", ("transaction", "transfer")),
        )
        lowered = line.lower()
        tag = ""
        for candidate, needles in keyword_tags:
            if any(needle in lowered for needle in needles):
                tag = candidate
                break

        if "credits" in headers or "debits" in headers:
            tag = "transaction"
        if any("check" in h.lower() for h in headers):
            tag = "checks"
        if any("draft" in h.lower() for h in headers):
            tag = "drafts"
        return tag

    # ------------------------------------------------------------------
    # Table assembly
    # ------------------------------------------------------------------
    @staticmethod
    def _drain_rows(current_row: Dict[str, List[str]]) -> List[Dict[str, str]]:
        """Turn the column -> values buffer into a list of row dicts.

        Columns with fewer values than the longest one are padded with "".
        """
        depth = max((len(values) for values in current_row.values()), default=0)
        return [
            {key: (values[i] if i < len(values) else "") for key, values in current_row.items()}
            for i in range(depth)
        ]

    def _row_breaks_table(self, current_row: Dict[str, List[str]]) -> bool:
        """Return True when the buffered row cannot belong to the table.

        That is the case when the first date cell is non-empty but not a
        date, or when the first cell of an amount-like column is non-empty
        but does not start with an amount.
        """
        date_cells = current_row.get("date")
        if date_cells and date_cells[0] and not self.is_date_word(date_cells[0]):
            return True
        for name in self._VALUE_COLUMNS:
            cells = current_row.get(name)
            if cells and cells[0] and not self.amount_pattern.match(cells[0]):
                return True
        return False

    async def process_transaction_tables_with_bbox(self, extracted_text_list: List[List[Dict]]) -> Tuple[List[pd.DataFrame], List[str]]:
        """Process transaction tables with bounding box data.

        Each block is scanned line by line. A line containing "date" together
        with "description", "amount" or "balance" is treated as a header
        line: it closes the table in progress (if any) and, when the detected
        headers confirm it, starts a new one. Every other line read while a
        table is open becomes a row; a row whose date/amount cell holds
        something else closes the table.

        Args:
            extracted_text_list: Blocks of line records
                (``{"line": str, "words": [...]}``).

        Returns:
            ``(tables, tags)``. One tag is recorded per started table (taken
            from the line above the header, else the previous tag, else
            "transaction"); the tag is removed again when a table is closed
            by an invalid row before receiving any row.
        """
        def _process_tables():
            all_tables = []
            table_tags = []
            for block in extracted_text_list:
                headers: List[str] = []
                header_words: List[Dict] = []
                current_row: Dict[str, List[str]] = {}
                current_table: List[Dict[str, str]] = []
                table_started = False

                for line_idx, line_bbox in enumerate(block):
                    line = line_bbox["line"].strip()
                    has_date = "date" in line
                    description_header = has_date and "description" in line
                    value_header = has_date and ("amount" in line or "balance" in line)

                    if description_header or value_header:
                        if table_started:
                            # A new header closes the table in progress.
                            current_table.extend(self._drain_rows(current_row))
                            all_tables.append(pd.DataFrame(current_table))
                            current_table = []

                        candidate = self.detect_headers(line_bbox)
                        lowered = [h.lower() for h in candidate]
                        wanted = ("description",) if description_header else ("amount", "balance")
                        confirmed = any("date" in h for h in lowered) and any(
                            key in h for h in lowered for key in wanted
                        )
                        if confirmed:
                            headers = candidate
                            header_words = line_bbox["words"]
                            current_row = {header: [] for header in headers}
                            table_started = True
                            # The line above the header usually names the table.
                            prev_line = block[line_idx - 1]["line"] if line_idx > 0 else ""
                            tag = self.check_table_tags(prev_line, headers)
                            if not tag:
                                tag = table_tags[-1] if table_tags else "transaction"
                            table_tags.append(tag)
                        else:
                            # Not a usable header: leave no half-initialised
                            # state behind (headers and row buffer must agree).
                            headers, header_words, current_row = [], [], {}
                            table_started = False
                        continue

                    if not table_started:
                        continue

                    parts = self.detect_row_data(headers, header_words, line_bbox["words"])
                    for key, value in zip(headers, parts):
                        current_row[key].append(value)

                    # Reduce value/date cells to the bare amount/date.
                    for name in self._VALUE_COLUMNS:
                        if name in current_row:
                            current_row[name] = [
                                self.extract_amount_or_return(cell) if cell else cell
                                for cell in current_row[name]
                            ]
                    if "date" in current_row:
                        current_row["date"] = [
                            self.extract_date_or_return(cell) if cell else cell
                            for cell in current_row["date"]
                        ]

                    if self._row_breaks_table(current_row):
                        # The table ended; a table without rows loses its tag.
                        if not current_table and len(table_tags) > 0 and table_tags[-1]:
                            table_tags.pop()
                        all_tables.append(pd.DataFrame(current_table))
                        current_table = []
                        headers, header_words, current_row = [], [], {}
                        table_started = False
                    else:
                        current_table.extend(self._drain_rows(current_row))
                        current_row = {header: [] for header in headers}

                if current_table:
                    all_tables.append(pd.DataFrame(current_table))
            return all_tables, table_tags

        return await asyncio.get_event_loop().run_in_executor(None, _process_tables)

    async def process_tables(self, table: pd.DataFrame) -> pd.DataFrame:
        """Process the extracted table to clean and format it.

        Steps:
            1. Blank rows that carry a value but no date, and rows containing
               a page/total keyword.
            2. Treat rows with a date and at least one value as anchors and
               merge the text columns of adjacent non-anchor rows into them.
            3. Drop rows that are entirely empty.

        Args:
            table: A table whose cells are strings (missing cells are
                treated as "").

        Returns:
            A new DataFrame with a fresh 0..n-1 index; ``table`` itself is
            not modified. When no date column or no value column exists the
            table is returned after step 1 only.
        """
        def _process_table():
            keywords = ["continue", "continued", "page", "next page", "total", "subtotal"]
            # Positional and label access are mixed below, so work on a
            # 0..n-1 index; fill gaps first so that every cell is testable.
            df = table.copy().reset_index(drop=True).fillna("")
            blank_row = [""] * len(df.columns)

            has_date = "date" in df.columns
            present_values = [
                name for name in ("balance", "amount", "credits", "debits") if name in df.columns
            ]
            for idx, row in df.iterrows():
                if has_date and not row["date"] and any(row[name] for name in present_values):
                    df.loc[idx] = blank_row
                    continue
                if any(keyword in str(cell).lower() for cell in row for keyword in keywords):
                    df.loc[idx] = blank_row

            # Identify key columns (case-insensitive match).
            date_col = next((col for col in df.columns if re.search(r'date', col, re.IGNORECASE)), None)
            value_cols = [col for col in df.columns if re.search(r'amount|balance|credits|debits', col, re.IGNORECASE)]
            if not date_col or not value_cols:
                return df
            text_cols = [col for col in df.columns if col != date_col and col not in value_cols]

            def is_anchor(row):
                return bool(str(row[date_col]).strip()) and any(str(row[col]).strip() for col in value_cols)

            def is_blank(row):
                return row.isnull().all() or all(row == "")

            anchor_indices = [i for i, row in df.iterrows() if is_anchor(row)]
            for anchor_idx in anchor_indices:
                # Walk upward (-1) and then downward (+1) until another
                # anchor or a blank row stops the merge.
                for step in (-1, 1):
                    i = anchor_idx + step
                    while 0 <= i < len(df):
                        neighbour = df.iloc[i]
                        if is_anchor(neighbour) or is_blank(neighbour):
                            break
                        for col in text_cols:
                            own = str(df.at[anchor_idx, col]).strip()
                            other = str(df.at[i, col]).strip()
                            pieces = (other, own) if step < 0 else (own, other)
                            df.at[anchor_idx, col] = " ".join(pieces).strip()
                        df.iloc[i] = ""  # Blank the merged row
                        i += step

            # A row without a date and without its key value is noise. The
            # key is the balance, else the amount, else any value column.
            if "balance" in df.columns:
                key_cols = ["balance"]
            elif "amount" in df.columns:
                key_cols = ["amount"]
            else:
                key_cols = value_cols
            for idx, row in df.iterrows():
                if not row[date_col] and not any(row[col] for col in key_cols):
                    df.loc[idx] = blank_row

            empty_rows = df.apply(lambda row: all(cell == "" for cell in row), axis=1)
            return df[~empty_rows].reset_index(drop=True)

        return await asyncio.get_event_loop().run_in_executor(None, _process_table)