Amerue's picture
Updated extractor.py
e61fc87 verified
import re
import pandas as pd
import pdfplumber
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import logging
from PIL import Image
import pytesseract
import docx
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from pdf2image import convert_from_path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FinancialStatementExtractor:
"""Extract financial statement data using AI for normalization with deterministic fallback."""
def __init__(self):
try:
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.ai_available = True
logger.info("AI model loaded successfully")
except Exception as e:
logger.warning(f"AI model not available: {e}")
self.model = None
self.ai_available = False
self.standard_items = {
'Revenue From Operations': ['revenue from operations', 'revenue from ops', 'operating revenue', 'sales revenue'],
'Other Income': ['other income', 'other sources', 'other operating revenues', 'miscellaneous income'],
'Total Revenue': ['total income', 'total revenue', 'gross income', 'total revenue from operations'],
'Cost Of Materials Consumed': ['cost of materials consumed', 'cost of goods sold', 'cogs', 'material costs', 'materials consumed'],
'Purchases': ['purchases', 'purchase of products', 'purchases during year', 'stock purchases'],
'Change In Inventory': ['change in inventory', 'changes in inventories', 'inventory changes', 'stock changes'],
'Employee Benefit Expenses': ['employee benefit expenses', 'employee benefits expense', 'salaries wages', 'staff costs', 'personnel expenses'],
'Depreciation': ['depreciation', 'depreciation amortization', 'depreciation and amortisation'],
'Finance Costs': ['finance costs', 'interest expense', 'borrowing costs', 'interest paid'],
'Other Expenses': ['other expenses', 'administrative expenses', 'operating expenses'],
'Gross Profit': ['gross profit', 'gross margin'],
'EBITDA': ['ebitda', 'earnings before interest tax depreciation'],
'EBIT': ['ebit', 'operating profit', 'earnings before interest tax'],
'Profit Before Tax': ['profit before tax', 'pbt', 'pre-tax profit', 'profit before exceptional items and tax'],
'Tax Expense': ['tax expense', 'income tax', 'taxation', 'current tax', 'deferred tax'],
'Profit After Tax': ['profit after tax', 'pat', 'net profit', 'net income', 'profit for the year']
}
if self.ai_available:
self.standard_embeddings = {}
for standard_name, variations in self.standard_items.items():
all_texts = [standard_name] + variations
embeddings = self.model.encode(all_texts)
self.standard_embeddings[standard_name] = np.mean(embeddings, axis=0)
self.line_item_patterns = [
r'^([A-Za-z][A-Za-z\s,\(\)\-&/]+?)\s+([\d,\.\-\(\)]+(?:\s+[\d,\.\-\(\)]+)*)', # Line item with numbers
r'([A-Za-z][A-Za-z\s,\(\)\-&/]{3,}?)[\s:]+([0-9,\.\-\(\)]+)', # With separator
]
self.year_patterns = [
r'FY[\s]?(\d{2,4})',
r'March\s+31,?\s*(\d{4})',
r'20(\d{2})',
r'Year\s+ended.*?(\d{4})',
]
def extract_from_file(self, file_path: str) -> Dict:
"""Extract financial data from file."""
path = Path(file_path)
extension = path.suffix.lower()
try:
if extension == '.pdf':
text = self._extract_from_pdf_with_ocr(file_path)
elif extension in ['.docx', '.doc']:
text = self._extract_from_docx(file_path)
elif extension in ['.png', '.jpg', '.jpeg']:
text = self._extract_from_image(file_path)
elif extension == '.txt':
text = self._extract_from_txt(file_path)
else:
return {'status': 'error', 'message': f'Unsupported file format: {extension}'}
if not text or len(text.strip()) < 50:
return {'status': 'error', 'message': 'No text could be extracted from document'}
logger.info(f"Extracted {len(text)} characters of text")
return self._process_text(text)
except Exception as e:
logger.error(f"Error extracting from {file_path}: {e}")
import traceback
traceback.print_exc()
return {'status': 'error', 'message': str(e)}
def _extract_from_pdf_with_ocr(self, file_path: str) -> str:
"""Extract text from PDF, using OCR if needed."""
text = ""
try:
with pdfplumber.open(file_path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text and len(page_text.strip()) > 50:
text += page_text + "\n"
except Exception as e:
logger.warning(f"pdfplumber extraction failed: {e}")
if len(text.strip()) > 100:
logger.info("Extracted text directly from PDF")
return text
logger.info("PDF appears to be image-based, using OCR...")
try:
images = convert_from_path(file_path, dpi=300)
for i, image in enumerate(images):
logger.info(f"OCR on page {i+1}/{len(images)}")
page_text = pytesseract.image_to_string(image, lang='eng', config='--psm 6')
if page_text:
text += page_text + "\n"
except Exception as e:
logger.error(f"OCR failed: {e}")
return ""
return text
def _extract_from_docx(self, file_path: str) -> str:
doc = docx.Document(file_path)
text = "\n".join([para.text for para in doc.paragraphs])
return text
def _extract_from_image(self, file_path: str) -> str:
image = Image.open(file_path)
text = pytesseract.image_to_string(image, lang='eng', config='--psm 6')
return text
def _extract_from_txt(self, file_path: str) -> str:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read()
return text
def _process_text(self, text: str) -> Dict:
"""Process extracted text to identify financial line items."""
lines = text.split('\n')
years = self._extract_years(text)
financial_data = []
logger.info(f"Processing {len(lines)} lines of text")
logger.info(f"Years found: {years}")
for line in lines:
line = line.strip()
if not line or len(line) < 5:
continue
for pattern in self.line_item_patterns:
match = re.search(pattern, line)
if match:
item_name = match.group(1).strip()
values_str = match.group(2)
values = self._extract_numbers(values_str)
if values and self._is_financial_item(item_name):
normalized_name = self._normalize_item_name_ai(item_name)
financial_data.append({
'item': normalized_name,
'values': values
})
logger.debug(f"Found: {normalized_name} = {values}")
break
logger.info(f"Extracted {len(financial_data)} financial line items")
if not financial_data:
return {'status': 'error', 'message': 'No financial data found in document'}
df = self._create_dataframe(financial_data, years)
return {
'status': 'success',
'dataframe': df,
'method': 'AI-powered semantic matching with OCR' if self.ai_available else 'Rule-based matching with OCR',
'categories': len(financial_data)
}
def _normalize_item_name_ai(self, item_name: str) -> str:
"""Use AI to normalize item names."""
if not self.ai_available:
return self._normalize_deterministic(item_name)
try:
item_embedding = self.model.encode(item_name)
best_match = None
best_similarity = -1
for standard_name, standard_embedding in self.standard_embeddings.items():
similarity = cosine_similarity(
item_embedding.reshape(1, -1),
standard_embedding.reshape(1, -1)
)[0][0]
if similarity > best_similarity:
best_similarity = similarity
best_match = standard_name
if best_similarity > 0.4: # Lower threshold
return best_match
else:
return self._normalize_deterministic(item_name)
except Exception as e:
logger.warning(f"AI normalization failed: {e}")
return self._normalize_deterministic(item_name)
def _normalize_deterministic(self, item_name: str) -> str:
"""Deterministic fallback normalization."""
item_lower = item_name.lower().strip()
for standard_name, variations in self.standard_items.items():
for variation in variations:
if variation in item_lower or item_lower in variation:
return standard_name
cleaned = ' '.join(item_name.split())
cleaned = cleaned.title()
cleaned = cleaned.rstrip('.:,;-')
return cleaned
def _extract_years(self, text: str) -> List[str]:
years = []
for pattern in self.year_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
for match in matches:
if len(match) == 2:
year = f"FY {match}"
elif len(match) == 4:
year = f"FY {match[2:]}"
else:
year = f"FY {match}"
if year not in years and len(years) < 10:
years.append(year)
if not years:
years = ['Year 1', 'Year 2', 'Year 3']
return sorted(set(years), reverse=True)[:10]
def _extract_numbers(self, text: str) -> List[float]:
text = text.replace(',', '').replace('(', '-').replace(')', '')
number_pattern = r'-?\d+\.?\d*'
matches = re.findall(number_pattern, text)
values = []
for match in matches:
try:
val = float(match)
if abs(val) > 0.01: # Ignore very small numbers
values.append(val)
except ValueError:
continue
return values
def _is_financial_item(self, item_name: str) -> bool:
item_lower = item_name.lower()
financial_keywords = [
'revenue', 'income', 'sales', 'expense', 'cost', 'profit', 'loss',
'ebitda', 'ebit', 'tax', 'depreciation', 'amortization', 'interest',
'margin', 'cash', 'inventory', 'purchase', 'employee', 'wage', 'salary',
'benefit', 'finance', 'total', 'other', 'operating'
]
if len(item_name) < 3 or item_name[0].isdigit():
return False
return any(keyword in item_lower for keyword in financial_keywords)
def _create_dataframe(self, financial_data: List[Dict], years: List[str]) -> pd.DataFrame:
data = {'Particulars': []}
for year in years:
data[year] = []
for item in financial_data:
data['Particulars'].append(item['item'])
values = item['values']
for i, year in enumerate(years):
if i < len(values):
data[year].append(values[i])
else:
data[year].append(None)
return pd.DataFrame(data)