# NOTE: scraped file-viewer residue preceded this module ("Spaces: Build error",
# "File size: 12,458 Bytes", alternating commit hashes ee27e09 / 33cc3ba and a
# 1-295 line-number gutter). It is not part of the program and is kept here
# only as a comment.
import re
import pandas as pd
import pdfplumber
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import logging
from PIL import Image
import pytesseract
import docx
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from pdf2image import convert_from_path
# Configure the root logger at import time so INFO-level progress messages
# from this module are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by every method below.
logger = logging.getLogger(__name__)
class FinancialStatementExtractor:
    """Extract financial statement data using AI for normalization with deterministic fallback.

    Text is read from a PDF, DOCX, image or plain-text file, scanned line by
    line for ``<label> <number> [<number> ...]`` rows, and every label is
    mapped onto one of the canonical names in ``standard_items`` (by sentence
    embedding similarity when the model is available, otherwise by substring
    rules). The rows are returned as a ``pandas.DataFrame`` with one column
    per detected financial year.
    """

    # Cosine similarity an embedding match must exceed to be accepted.
    _SIMILARITY_THRESHOLD = 0.4

    # Canonical line-item name -> lower-case phrasings that map onto it.
    _STANDARD_ITEMS = {
        'Revenue From Operations': ['revenue from operations', 'revenue from ops', 'operating revenue', 'sales revenue'],
        'Other Income': ['other income', 'other sources', 'other operating revenues', 'miscellaneous income'],
        'Total Revenue': ['total income', 'total revenue', 'gross income', 'total revenue from operations'],
        'Cost Of Materials Consumed': ['cost of materials consumed', 'cost of goods sold', 'cogs', 'material costs', 'materials consumed'],
        'Purchases': ['purchases', 'purchase of products', 'purchases during year', 'stock purchases'],
        'Change In Inventory': ['change in inventory', 'changes in inventories', 'inventory changes', 'stock changes'],
        'Employee Benefit Expenses': ['employee benefit expenses', 'employee benefits expense', 'salaries wages', 'staff costs', 'personnel expenses'],
        'Depreciation': ['depreciation', 'depreciation amortization', 'depreciation and amortisation'],
        'Finance Costs': ['finance costs', 'interest expense', 'borrowing costs', 'interest paid'],
        'Other Expenses': ['other expenses', 'administrative expenses', 'operating expenses'],
        'Gross Profit': ['gross profit', 'gross margin'],
        'EBITDA': ['ebitda', 'earnings before interest tax depreciation'],
        'EBIT': ['ebit', 'operating profit', 'earnings before interest tax'],
        'Profit Before Tax': ['profit before tax', 'pbt', 'pre-tax profit', 'profit before exceptional items and tax'],
        'Tax Expense': ['tax expense', 'income tax', 'taxation', 'current tax', 'deferred tax'],
        'Profit After Tax': ['profit after tax', 'pat', 'net profit', 'net income', 'profit for the year'],
    }

    # A value token must contain at least one digit: optional "(" / "-",
    # digits with separators, optional ")". This keeps a bare "(" or "-" in a
    # label such as "Profit (loss) before tax" from being taken as the value.
    _LINE_ITEM_PATTERNS = (
        # Label at line start followed by one or more value tokens.
        r'^([A-Za-z][A-Za-z\s,\(\)\-&/]+?)\s+(\(?-?\d[\d,\.]*\)?(?:\s+\(?-?\d[\d,\.]*\)?)*)',
        # Label anywhere in the line, separated from a single value by spaces/colon.
        r'([A-Za-z][A-Za-z\s,\(\)\-&/]{3,}?)[\s:]+(\(?-?\d[\d,\.]*\)?)',
    )

    # Each pattern captures either a 2-digit or a 4-digit year.
    _YEAR_PATTERNS = (
        r'\bFY\s?(\d{4}|\d{2})\b',
        r'March\s+31,?\s*(\d{4})',
        # Stand-alone 20xx only: not embedded in a longer digit run and not
        # the integer part of an amount such as 2045.50 or 2,045.
        r'(?<!\d)20(\d{2})(?!\d|[.,]\d)',
        r'Year\s+ended.*?(\d{4})',
    )

    # Substrings that mark a label as a financial line item. The plural and
    # variant spellings ('inventories', 'materials', 'salaries',
    # 'amortisation') cover phrasings that already appear in _STANDARD_ITEMS
    # but are not matched by the singular keywords.
    _FINANCIAL_KEYWORDS = (
        'revenue', 'income', 'sales', 'expense', 'cost', 'profit', 'loss',
        'ebitda', 'ebit', 'tax', 'depreciation', 'amortization', 'amortisation',
        'interest', 'margin', 'cash', 'inventory', 'inventories', 'materials',
        'purchase', 'employee', 'wage', 'salary', 'salaries', 'benefit',
        'finance', 'total', 'other', 'operating',
    )

    def __init__(self):
        """Set up lookup tables and try to load the sentence-embedding model.

        Any failure while loading the model or pre-computing the reference
        embeddings is logged and leaves the extractor in rule-based mode
        (``ai_available`` is ``False``) instead of failing construction.
        """
        self.model = None
        self.ai_available = False
        self.standard_embeddings = {}
        # Per-instance copies so callers may extend the tables on one
        # extractor without affecting others.
        self.standard_items = {
            name: list(variations)
            for name, variations in self._STANDARD_ITEMS.items()
        }
        self.line_item_patterns = list(self._LINE_ITEM_PATTERNS)
        self.year_patterns = list(self._YEAR_PATTERNS)

        try:
            model = SentenceTransformer('all-MiniLM-L6-v2')
            embeddings = {}
            for standard_name, variations in self.standard_items.items():
                # One reference vector per canonical name: the mean over the
                # name itself and all of its known phrasings.
                vectors = model.encode([standard_name] + variations)
                embeddings[standard_name] = np.mean(vectors, axis=0)
        except Exception as e:
            logger.warning("AI model not available: %s", e)
        else:
            self.model = model
            self.standard_embeddings = embeddings
            self.ai_available = True
            logger.info("AI model loaded successfully")

    def extract_from_file(self, file_path: str) -> Dict:
        """Extract financial data from file.

        Args:
            file_path: Path to a .pdf, .docx/.doc, .png/.jpg/.jpeg or .txt file.

        Returns:
            On success the dict produced by ``_process_text`` (``status``,
            ``dataframe``, ``method``, ``categories``); otherwise
            ``{'status': 'error', 'message': ...}``. Exceptions raised while
            reading or parsing are logged and reported as an error dict.
        """
        extension = Path(file_path).suffix.lower()
        # NOTE(review): '.doc' is routed to the python-docx reader as in the
        # original; python-docx presumably cannot open legacy binary .doc
        # files, in which case the failure surfaces as an error dict - confirm.
        readers = {
            '.pdf': self._extract_from_pdf_with_ocr,
            '.docx': self._extract_from_docx,
            '.doc': self._extract_from_docx,
            '.png': self._extract_from_image,
            '.jpg': self._extract_from_image,
            '.jpeg': self._extract_from_image,
            '.txt': self._extract_from_txt,
        }
        reader = readers.get(extension)
        if reader is None:
            return {'status': 'error', 'message': f'Unsupported file format: {extension}'}

        try:
            text = reader(file_path)
            if not text or len(text.strip()) < 50:
                return {'status': 'error', 'message': 'No text could be extracted from document'}
            logger.info("Extracted %d characters of text", len(text))
            return self._process_text(text)
        except Exception as e:
            # logger.exception records the traceback together with the message.
            logger.exception("Error extracting from %s: %s", file_path, e)
            return {'status': 'error', 'message': str(e)}

    def _extract_from_pdf_with_ocr(self, file_path: str) -> str:
        """Extract text from PDF, using OCR if needed.

        The embedded text layer is tried first (pages with 50 or fewer
        non-blank characters are ignored). If that yields more than 100
        characters it is returned as is. Otherwise every page is rendered at
        300 dpi and run through Tesseract; the OCR text replaces the partial
        text layer so the same page is not counted twice. If OCR fails or
        produces nothing, whatever the text layer gave is returned.
        """
        direct_pages = []
        try:
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text and len(page_text.strip()) > 50:
                        direct_pages.append(page_text + "\n")
        except Exception as e:
            logger.warning("pdfplumber extraction failed: %s", e)

        direct_text = "".join(direct_pages)
        if len(direct_text.strip()) > 100:
            logger.info("Extracted text directly from PDF")
            return direct_text

        logger.info("PDF appears to be image-based, using OCR...")
        ocr_pages = []
        try:
            images = convert_from_path(file_path, dpi=300)
            for page_number, image in enumerate(images, start=1):
                logger.info("OCR on page %d/%d", page_number, len(images))
                page_text = pytesseract.image_to_string(image, lang='eng', config='--psm 6')
                if page_text:
                    ocr_pages.append(page_text + "\n")
        except Exception as e:
            logger.error("OCR failed: %s", e)
            return direct_text

        ocr_text = "".join(ocr_pages)
        return ocr_text if ocr_text.strip() else direct_text

    def _extract_from_docx(self, file_path: str) -> str:
        """Return the text of a DOCX file: body paragraphs, then table rows.

        Each table row becomes one line with its non-empty cell texts joined
        by single spaces, so tabular statements reach the line-item parser.
        """
        doc = docx.Document(file_path)
        lines = [para.text for para in doc.paragraphs]
        for table in doc.tables:
            for row in table.rows:
                # Collapse whitespace (including newlines) inside each cell.
                cells = [' '.join(cell.text.split()) for cell in row.cells]
                row_text = ' '.join(cell for cell in cells if cell)
                if row_text:
                    lines.append(row_text)
        return "\n".join(lines)

    def _extract_from_image(self, file_path: str) -> str:
        """OCR a single image file and return the recognised text."""
        # Context manager so the underlying file handle is always released.
        with Image.open(file_path) as image:
            return pytesseract.image_to_string(image, lang='eng', config='--psm 6')

    def _extract_from_txt(self, file_path: str) -> str:
        """Read a text file as UTF-8, silently dropping undecodable bytes."""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            return f.read()

    def _process_text(self, text: str) -> Dict:
        """Process extracted text to identify financial line items.

        Returns:
            ``{'status': 'success', 'dataframe': DataFrame, 'method': str,
            'categories': int}`` when at least one line item was found,
            otherwise ``{'status': 'error', 'message': ...}``.
        """
        lines = text.split('\n')
        years = self._extract_years(text)
        logger.info("Processing %d lines of text", len(lines))
        logger.info("Years found: %s", years)

        financial_data = []
        for raw_line in lines:
            line = raw_line.strip()
            if len(line) < 5:
                continue
            entry = self._parse_line(line)
            if entry is not None:
                financial_data.append(entry)

        logger.info("Extracted %d financial line items", len(financial_data))
        if not financial_data:
            return {'status': 'error', 'message': 'No financial data found in document'}

        return {
            'status': 'success',
            'dataframe': self._create_dataframe(financial_data, years),
            'method': 'AI-powered semantic matching with OCR' if self.ai_available else 'Rule-based matching with OCR',
            'categories': len(financial_data),
        }

    def _parse_line(self, line: str) -> Optional[Dict]:
        """Return ``{'item', 'values'}`` for one text line, or ``None``.

        Patterns are tried in order; the first one that yields a financial
        label with at least one numeric value wins.
        """
        for pattern in self.line_item_patterns:
            match = re.search(pattern, line)
            if not match:
                continue
            item_name = match.group(1).strip()
            values = self._extract_numbers(match.group(2))
            if values and self._is_financial_item(item_name):
                normalized_name = self._normalize_item_name_ai(item_name)
                logger.debug("Found: %s = %s", normalized_name, values)
                return {'item': normalized_name, 'values': values}
        return None

    def _normalize_item_name_ai(self, item_name: str) -> str:
        """Use AI to normalize item names.

        Returns the canonical name whose reference embedding is most similar
        to ``item_name`` when that similarity exceeds the threshold. Falls
        back to ``_normalize_deterministic`` when the model is unavailable,
        the best score is too low, or embedding fails.
        """
        if not self.ai_available:
            return self._normalize_deterministic(item_name)
        try:
            item_vector = self.model.encode(item_name).reshape(1, -1)
            best_match = None
            best_similarity = -1
            for standard_name, standard_embedding in self.standard_embeddings.items():
                similarity = cosine_similarity(
                    item_vector, standard_embedding.reshape(1, -1)
                )[0][0]
                if similarity > best_similarity:
                    best_similarity = similarity
                    best_match = standard_name
            if best_similarity > self._SIMILARITY_THRESHOLD:
                return best_match
        except Exception as e:
            logger.warning("AI normalization failed: %s", e)
        return self._normalize_deterministic(item_name)

    def _normalize_deterministic(self, item_name: str) -> str:
        """Deterministic fallback normalization.

        1. If any known phrasing occurs inside the label, the canonical name
           of the *longest* such phrasing wins (so "Total revenue from
           operations" maps to "Total Revenue", not "Revenue From Operations").
        2. Otherwise, if the label occurs inside a known phrasing, the first
           canonical name (in table order) with such a phrasing is used.
        3. Otherwise the label is whitespace-collapsed, title-cased and
           stripped of trailing ``.:,;-``.

        A blank label skips steps 1 and 2.
        """
        item_lower = item_name.lower().strip()
        if item_lower:
            best_name = None
            best_length = 0
            for standard_name, variations in self.standard_items.items():
                for variation in variations:
                    if len(variation) > best_length and variation in item_lower:
                        best_name = standard_name
                        best_length = len(variation)
            if best_name is not None:
                return best_name

            # Looser reverse check, only once no phrasing fits inside the
            # label; doing it first would map "EBIT" onto "EBITDA".
            for standard_name, variations in self.standard_items.items():
                if any(item_lower in variation for variation in variations):
                    return standard_name

        cleaned = ' '.join(item_name.split()).title()
        return cleaned.rstrip('.:,;-')

    def _extract_years(self, text: str) -> List[str]:
        """Return up to ten ``"FY yy"`` labels found in *text*, sorted descending.

        Four-digit captures are reduced to their last two digits. When no
        year is found, the placeholders ``Year 1`` .. ``Year 3`` are used
        (and, like real labels, returned in descending string order).
        """
        years = []
        for pattern in self.year_patterns:
            for match in re.findall(pattern, text, re.IGNORECASE):
                suffix = match[2:] if len(match) == 4 else match
                year = f"FY {suffix}"
                if year not in years and len(years) < 10:
                    years.append(year)
        if not years:
            years = ['Year 1', 'Year 2', 'Year 3']
        return sorted(set(years), reverse=True)[:10]

    def _extract_numbers(self, text: str) -> List[float]:
        """Parse every number in *text*, in order of appearance.

        Thousands separators are removed and accounting-style parentheses are
        read as a minus sign. Zero and near-zero values are kept: values are
        assigned to year columns by position, so dropping one would shift the
        remaining values into the wrong year.
        """
        cleaned = text.replace(',', '').replace('(', '-').replace(')', '')
        values = []
        for token in re.findall(r'-?\d+\.?\d*', cleaned):
            try:
                values.append(float(token))
            except ValueError:
                continue
        return values

    def _is_financial_item(self, item_name: str) -> bool:
        """Return True if the label looks like a financial line item.

        Labels shorter than three characters or starting with a digit are
        rejected; otherwise the label must contain one of the financial
        keywords (case-insensitive substring test).
        """
        if len(item_name) < 3 or item_name[0].isdigit():
            return False
        item_lower = item_name.lower()
        return any(keyword in item_lower for keyword in self._FINANCIAL_KEYWORDS)

    def _create_dataframe(self, financial_data: List[Dict], years: List[str]) -> pd.DataFrame:
        """Build a table with a ``Particulars`` column and one column per year.

        The i-th value of each item goes into the i-th year column; missing
        values become ``None`` and values beyond the number of years are
        ignored.
        """
        columns = {'Particulars': [entry['item'] for entry in financial_data]}
        for index, year in enumerate(years):
            columns[year] = [
                entry['values'][index] if index < len(entry['values']) else None
                for entry in financial_data
            ]
        return pd.DataFrame(columns)