Spaces:
Build error
Build error
| import re | |
| import pandas as pd | |
| import pdfplumber | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Optional | |
| import logging | |
| from PIL import Image | |
| import pytesseract | |
| import docx | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import numpy as np | |
| from pdf2image import convert_from_path | |
# Configure root logging once at import time and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FinancialStatementExtractor:
    """Extract financial statement data using AI for normalization with deterministic fallback.

    The extractor reads text from a PDF, Word document, image or text file,
    finds lines that look like ``<item name> <number> [<number> ...]`` and maps
    each item name onto a standard line-item name.  Name normalization uses
    sentence embeddings when the model can be loaded and a rule-based matcher
    otherwise.

    Attributes:
        model: The loaded sentence-embedding model, or ``None``.
        ai_available: ``True`` when ``model`` was loaded successfully.
        standard_items: Mapping of standard item name -> known lower-case
            variations of that name.
        standard_embeddings: Mapping of standard item name -> mean embedding
            of the name and its variations (empty when AI is unavailable).
        line_item_patterns: Regex strings tried in order on every text line;
            group 1 is the item name, group 2 the numeric columns.
        year_patterns: Regex strings used to detect reporting years.
    """

    # Minimum cosine similarity for an embedding match to be accepted.
    _SIMILARITY_THRESHOLD = 0.4
    # Upper bound on the number of year columns that are detected.
    _MAX_YEARS = 10
    # Column labels used when no year can be detected in the document.
    _PLACEHOLDER_YEARS = ('Year 1', 'Year 2', 'Year 3')
    # Substrings that mark a candidate name as a financial line item.
    _FINANCIAL_KEYWORDS = (
        'revenue', 'income', 'sales', 'expense', 'cost', 'profit', 'loss',
        'ebitda', 'ebit', 'tax', 'depreciation', 'amortization', 'interest',
        'margin', 'cash', 'inventory', 'purchase', 'employee', 'wage', 'salary',
        'benefit', 'finance', 'total', 'other', 'operating',
    )

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2', *, use_ai: bool = True) -> None:
        """Set up the matcher tables and, if requested, load the embedding model.

        Args:
            model_name: Name passed to ``SentenceTransformer``.
            use_ai: When ``False`` the model is not loaded and only the
                rule-based normalization is used.
        """
        self.model = None
        self.ai_available = False
        if use_ai:
            # Model loading is best-effort: any failure falls back to the
            # rule-based matcher instead of aborting construction.
            try:
                self.model = SentenceTransformer(model_name)
                self.ai_available = True
                logger.info("AI model loaded successfully")
            except Exception as e:
                logger.warning(f"AI model not available: {e}")
                self.model = None
                self.ai_available = False

        self.standard_items = {
            'Revenue From Operations': ['revenue from operations', 'revenue from ops', 'operating revenue', 'sales revenue'],
            'Other Income': ['other income', 'other sources', 'other operating revenues', 'miscellaneous income'],
            'Total Revenue': ['total income', 'total revenue', 'gross income', 'total revenue from operations'],
            'Cost Of Materials Consumed': ['cost of materials consumed', 'cost of goods sold', 'cogs', 'material costs', 'materials consumed'],
            'Purchases': ['purchases', 'purchase of products', 'purchases during year', 'stock purchases'],
            'Change In Inventory': ['change in inventory', 'changes in inventories', 'inventory changes', 'stock changes'],
            'Employee Benefit Expenses': ['employee benefit expenses', 'employee benefits expense', 'salaries wages', 'staff costs', 'personnel expenses'],
            'Depreciation': ['depreciation', 'depreciation amortization', 'depreciation and amortisation'],
            'Finance Costs': ['finance costs', 'interest expense', 'borrowing costs', 'interest paid'],
            'Other Expenses': ['other expenses', 'administrative expenses', 'operating expenses'],
            'Gross Profit': ['gross profit', 'gross margin'],
            'EBITDA': ['ebitda', 'earnings before interest tax depreciation'],
            'EBIT': ['ebit', 'operating profit', 'earnings before interest tax'],
            'Profit Before Tax': ['profit before tax', 'pbt', 'pre-tax profit', 'profit before exceptional items and tax'],
            'Tax Expense': ['tax expense', 'income tax', 'taxation', 'current tax', 'deferred tax'],
            'Profit After Tax': ['profit after tax', 'pat', 'net profit', 'net income', 'profit for the year'],
        }

        # One representative vector per standard item: the mean embedding of
        # the standard name and all of its variations.
        self.standard_embeddings = {}
        if self.ai_available:
            for standard_name, variations in self.standard_items.items():
                embeddings = self.model.encode([standard_name] + variations)
                self.standard_embeddings[standard_name] = np.mean(embeddings, axis=0)

        # The first numeric token must contain a digit (optionally preceded by
        # "(" or "-"), otherwise a bare "(" or "-" inside an item name such as
        # "Profit (Loss) before tax" would be mistaken for the value column.
        first_number = r'[\(\-]*\d[\d,\.\-\(\)]*'
        more_numbers = r'(?:\s+[\d,\.\-\(\)]+)*'
        self.line_item_patterns = [
            # Line item with numbers
            r'^([A-Za-z][A-Za-z\s,\(\)\-&/]+?)\s+(' + first_number + more_numbers + r')',
            # With separator
            r'([A-Za-z][A-Za-z\s,\(\)\-&/]{3,}?)[\s:]+(' + first_number + more_numbers + r')',
        ]
        self.year_patterns = [
            r'FY[\s]?(\d{2,4})',
            r'March\s+31,?\s*(\d{4})',
            # Digit boundaries keep amounts such as "120045" from being read
            # as the year 2004.
            r'(?<!\d)20(\d{2})(?!\d)',
            r'Year\s+ended.*?(\d{4})',
        ]

    def extract_from_file(self, file_path: str) -> Dict:
        """Extract financial data from file.

        Args:
            file_path: Path to a ``.pdf``, ``.docx``/``.doc``, ``.png``,
                ``.jpg``/``.jpeg`` or ``.txt`` file.

        Returns:
            On success the dictionary produced by ``_process_text``; otherwise
            ``{'status': 'error', 'message': <reason>}``.  Exceptions raised
            while reading or processing are logged and reported the same way.
        """
        path = Path(file_path)
        extension = path.suffix.lower()
        try:
            if extension == '.pdf':
                text = self._extract_from_pdf_with_ocr(file_path)
            elif extension in ['.docx', '.doc']:
                # NOTE: '.doc' is routed through the same reader as '.docx';
                # a file the reader cannot open surfaces as an error result.
                text = self._extract_from_docx(file_path)
            elif extension in ['.png', '.jpg', '.jpeg']:
                text = self._extract_from_image(file_path)
            elif extension == '.txt':
                text = self._extract_from_txt(file_path)
            else:
                return {'status': 'error', 'message': f'Unsupported file format: {extension}'}
            if not text or len(text.strip()) < 50:
                return {'status': 'error', 'message': 'No text could be extracted from document'}
            logger.info(f"Extracted {len(text)} characters of text")
            return self._process_text(text)
        except Exception as e:
            # Top-level boundary: log with traceback and report to the caller.
            logger.exception(f"Error extracting from {file_path}: {e}")
            return {'status': 'error', 'message': str(e)}

    def _extract_from_pdf_with_ocr(self, file_path: str) -> str:
        """Extract text from PDF, using OCR if needed.

        Text is first read directly from the PDF.  When that yields 100
        characters or fewer the pages are rendered to images and OCR'd instead.

        Returns:
            The extracted text, or ``""`` when OCR was needed and failed.
        """
        direct_pages = []
        try:
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    # Skip pages with almost no text.
                    if page_text and len(page_text.strip()) > 50:
                        direct_pages.append(page_text)
        except Exception as e:
            logger.warning(f"pdfplumber extraction failed: {e}")

        direct_text = "".join(page_text + "\n" for page_text in direct_pages)
        if len(direct_text.strip()) > 100:
            logger.info("Extracted text directly from PDF")
            return direct_text

        logger.info("PDF appears to be image-based, using OCR...")
        # OCR output is collected separately so that the discarded partial
        # direct text is not duplicated in front of it.
        ocr_pages = []
        try:
            images = convert_from_path(file_path, dpi=300)
            for page_number, image in enumerate(images, start=1):
                logger.info(f"OCR on page {page_number}/{len(images)}")
                page_text = pytesseract.image_to_string(image, lang='eng', config='--psm 6')
                if page_text:
                    ocr_pages.append(page_text)
        except Exception as e:
            logger.error(f"OCR failed: {e}")
            return ""
        return "".join(page_text + "\n" for page_text in ocr_pages)

    def _extract_from_docx(self, file_path: str) -> str:
        """Return the text of a Word document, one paragraph or table row per line.

        Table rows are appended after the paragraphs, with the cell texts of a
        row joined by single spaces so that each row reads like a statement
        line (``<item> <value> <value>``).
        """
        doc = docx.Document(file_path)
        lines = [para.text for para in doc.paragraphs]
        for table in doc.tables:
            for row in table.rows:
                # Collapse whitespace inside each cell (cells may hold several
                # paragraphs) and drop empty cells.
                cells = [' '.join(cell.text.split()) for cell in row.cells]
                row_text = ' '.join(cell for cell in cells if cell)
                if row_text:
                    lines.append(row_text)
        return "\n".join(lines)

    def _extract_from_image(self, file_path: str) -> str:
        """Return the OCR text of an image file."""
        # The context manager closes the underlying file handle after OCR.
        with Image.open(file_path) as image:
            return pytesseract.image_to_string(image, lang='eng', config='--psm 6')

    def _extract_from_txt(self, file_path: str) -> str:
        """Return the content of a UTF-8 text file, ignoring undecodable bytes."""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            return f.read()

    def _process_text(self, text: str) -> Dict:
        """Process extracted text to identify financial line items.

        Args:
            text: Raw text of the document.

        Returns:
            ``{'status': 'success', 'dataframe': DataFrame, 'method': str,
            'categories': int}`` when at least one line item was found,
            otherwise ``{'status': 'error', 'message': str}``.
        """
        lines = text.split('\n')
        years = self._extract_years(text)
        financial_data = []
        logger.info(f"Processing {len(lines)} lines of text")
        logger.info(f"Years found: {years}")

        # Compile once per call instead of once per line.
        compiled_patterns = [re.compile(pattern) for pattern in self.line_item_patterns]
        for line in lines:
            line = line.strip()
            if not line or len(line) < 5:
                continue
            for pattern in compiled_patterns:
                match = pattern.search(line)
                if not match:
                    continue
                item_name = match.group(1).strip()
                values = self._extract_numbers(match.group(2))
                if values and self._is_financial_item(item_name):
                    normalized_name = self._normalize_item_name_ai(item_name)
                    financial_data.append({
                        'item': normalized_name,
                        'values': values
                    })
                    logger.debug(f"Found: {normalized_name} = {values}")
                    # The first pattern that yields a usable item wins.
                    break

        logger.info(f"Extracted {len(financial_data)} financial line items")
        if not financial_data:
            return {'status': 'error', 'message': 'No financial data found in document'}
        df = self._create_dataframe(financial_data, years)
        return {
            'status': 'success',
            'dataframe': df,
            'method': 'AI-powered semantic matching with OCR' if self.ai_available else 'Rule-based matching with OCR',
            'categories': len(financial_data)
        }

    def _normalize_item_name_ai(self, item_name: str) -> str:
        """Use AI to normalize item names.

        Returns the standard item whose embedding is most similar to
        ``item_name`` when that similarity exceeds the threshold; in every
        other case (AI unavailable, low similarity, any error) the result of
        ``_normalize_deterministic`` is returned.
        """
        if not self.ai_available:
            return self._normalize_deterministic(item_name)
        try:
            item_embedding = np.asarray(self.model.encode(item_name)).reshape(1, -1)
            names = list(self.standard_embeddings)
            matrix = np.vstack([self.standard_embeddings[name] for name in names])
            similarities = cosine_similarity(item_embedding, matrix)[0]
            # argmax returns the first maximum, i.e. the earliest standard
            # item on ties.
            best_index = int(np.argmax(similarities))
            if similarities[best_index] > self._SIMILARITY_THRESHOLD:
                return names[best_index]
        except Exception as e:
            logger.warning(f"AI normalization failed: {e}")
        return self._normalize_deterministic(item_name)

    @staticmethod
    def _contains_phrase(haystack: str, needle: str) -> bool:
        """Return True if ``needle`` occurs in ``haystack`` on word boundaries.

        Both arguments are expected to be lower-case.  An empty ``needle``
        never matches.
        """
        if not needle:
            return False
        pattern = r'(?<![a-z0-9])' + re.escape(needle) + r'(?![a-z0-9])'
        return re.search(pattern, haystack) is not None

    def _normalize_deterministic(self, item_name: str) -> str:
        """Deterministic fallback normalization.

        Matching order:
          1. an exact match with a known variation;
          2. otherwise the variation with the largest overlap, where either
             the variation occurs inside the item name or the item name
             occurs inside the variation, in both cases as whole words
             (so "debit" does not match "ebit");
          3. otherwise the item name itself, whitespace-collapsed, title-cased
             and stripped of trailing punctuation.

        Args:
            item_name: Item name as found in the document.

        Returns:
            A standard item name, or the cleaned-up original name.
        """
        item_lower = ' '.join(item_name.lower().split())
        best_name = None
        best_score = 0.0
        for standard_name, variations in self.standard_items.items():
            for variation in variations:
                if variation == item_lower:
                    return standard_name
                if self._contains_phrase(item_lower, variation):
                    score = len(variation) / len(item_lower)
                elif self._contains_phrase(variation, item_lower):
                    score = len(item_lower) / len(variation)
                else:
                    continue
                # Strict ">" keeps the earliest candidate on equal scores.
                if score > best_score:
                    best_score = score
                    best_name = standard_name
        if best_name is not None:
            return best_name

        cleaned = ' '.join(item_name.split())
        cleaned = cleaned.title()
        cleaned = cleaned.rstrip('.:,;-')
        return cleaned

    def _extract_years(self, text: str) -> List[str]:
        """Return the year column labels found in ``text``.

        Labels have the form ``"FY <digits>"`` (four-digit years are reduced
        to their last two digits), are unique, limited to ten entries and
        sorted in descending order.  When no year is found the placeholders
        ``['Year 1', 'Year 2', 'Year 3']`` are returned in that order.
        """
        years = []
        for pattern in self.year_patterns:
            for match in re.findall(pattern, text, re.IGNORECASE):
                if len(match) == 4:
                    year = f"FY {match[2:]}"
                else:
                    year = f"FY {match}"
                if year not in years and len(years) < self._MAX_YEARS:
                    years.append(year)
        if not years:
            # Placeholders keep their natural order; they are not real years
            # and must not go through the descending sort.
            return list(self._PLACEHOLDER_YEARS)
        return sorted(years, reverse=True)

    def _extract_numbers(self, text: str) -> List[float]:
        """Parse the numbers contained in ``text``.

        Thousands separators are removed and an opening parenthesis is read
        as a minus sign, so ``"(567)"`` becomes ``-567.0``.

        NOTE: values whose magnitude is 0.01 or less (including zero) are
        dropped, so a row containing such a value yields fewer numbers than
        it has columns.
        """
        text = text.replace(',', '').replace('(', '-').replace(')', '')
        values = []
        for token in re.findall(r'-?\d+\.?\d*', text):
            try:
                val = float(token)
            except ValueError:
                continue
            if abs(val) > 0.01:  # Ignore very small numbers
                values.append(val)
        return values

    def _is_financial_item(self, item_name: str) -> bool:
        """Return True if ``item_name`` looks like a financial line item.

        Names shorter than three characters or starting with a digit are
        rejected; otherwise the name must contain one of the financial
        keywords (case-insensitive substring test).
        """
        if len(item_name) < 3 or item_name[0].isdigit():
            return False
        item_lower = item_name.lower()
        return any(keyword in item_lower for keyword in self._FINANCIAL_KEYWORDS)

    def _create_dataframe(self, financial_data: List[Dict], years: List[str]) -> pd.DataFrame:
        """Build the result table.

        Args:
            financial_data: Dictionaries with the keys ``'item'`` and
                ``'values'``.
            years: Column labels; the i-th value of every item goes into the
                i-th year column.

        Returns:
            A DataFrame with a ``'Particulars'`` column followed by one column
            per year.  Missing values are ``None``; values beyond the number
            of years are not included.
        """
        data = {'Particulars': [entry['item'] for entry in financial_data]}
        for index, year in enumerate(years):
            data[year] = [
                entry['values'][index] if index < len(entry['values']) else None
                for entry in financial_data
            ]
        return pd.DataFrame(data)