# Ranjit Behera
# FinEE v1.0 - Finance Entity Extractor
# dcc24f8
"""
FinEE Extractor - Main orchestrator for the extraction pipeline.
Implements the 5-tier additive extraction pipeline:
- Tier 0: Hash Cache
- Tier 1: Regex Engine
- Tier 2: Rule-Based Mapping
- Tier 3: LLM (targeted extraction)
- Tier 4: Validation + Normalization
"""
import time
import logging
from typing import Optional, List, Dict, Any
from .schema import (
ExtractionResult, ExtractionConfig, TransactionType,
Category, Confidence, ExtractionSource, FieldMeta
)
from .cache import LRUCache, get_cache
from .regex_engine import RegexEngine, get_regex_engine
from .merchants import get_merchant_and_category
from .normalizer import normalize_amount, normalize_date, normalize_vpa
from .validator import repair_llm_json, validate_extraction_result
from .confidence import update_result_confidence, should_use_llm
from .prompt import get_targeted_prompt, get_full_extraction_prompt, parse_targeted_response
from .backends import get_backend, get_available_backends, BaseBackend
logger = logging.getLogger(__name__)
class FinEE:
    """
    Finance Entity Extractor - Main extraction class.

    Orchestrates the 5-tier additive extraction pipeline with graceful
    degradation: every tier is wrapped so a failure downgrades the result
    instead of raising. ``extract`` always returns an ExtractionResult,
    never crashes.
    """

    def __init__(self, config: Optional[ExtractionConfig] = None):
        """
        Initialize the extractor.

        Args:
            config: Extraction configuration (uses defaults if None)
        """
        self.config = config or ExtractionConfig()
        # Components; the LLM backend is resolved lazily on first use
        # (see _lazy_load_backend) since it may be slow or unavailable.
        self._cache: Optional[LRUCache] = None
        self._regex_engine: Optional[RegexEngine] = None
        self._backend: Optional[BaseBackend] = None
        self._backend_loaded = False  # True once a load was *attempted*
        if self.config.cache_enabled:
            self._cache = get_cache(self.config.cache_max_size)
        self._regex_engine = get_regex_engine()

    def _lazy_load_backend(self) -> bool:
        """
        Lazily resolve the LLM backend (at most one attempt per instance).

        Returns:
            True if a backend is available
        """
        if self._backend_loaded:
            return self._backend is not None
        self._backend_loaded = True
        if not self.config.use_llm:
            return False
        try:
            self._backend = get_backend(model_id=self.config.model_id)
            if self._backend:
                logger.info("Backend loaded: %s", self._backend.name)
                return True
        except Exception as e:
            # LLM tier is optional: log and fall back to regex/rules only.
            logger.warning("Failed to load LLM backend: %s", e)
        return False

    def extract(self, text: str) -> ExtractionResult:
        """
        Extract financial entities from text.

        This is the main entry point. It runs the full 5-tier pipeline
        with graceful degradation.

        Args:
            text: Transaction text (bank SMS, email, etc.)

        Returns:
            ExtractionResult with extracted entities
        """
        # perf_counter() is monotonic; time.time() can jump with wall-clock
        # adjustments and produce wrong (even negative) durations.
        start_time = time.perf_counter()

        # Tier 0: Cache Check
        if self._cache:
            cached = self._cache.get(text)
            if cached:
                # NOTE: mutates the shared cached object in place.
                cached.processing_time_ms = (time.perf_counter() - start_time) * 1000
                return cached

        # Tier 1: Regex Extraction
        result = self._tier1_regex(text)

        # Tier 2: Rule-Based Mapping
        result = self._tier2_rules(result)

        # Tier 3: LLM, only for fields the cheap tiers missed
        missing_fields = result.get_missing_fields(
            self.config.required_fields,
            self.config.desired_fields
        )
        if missing_fields and self.config.use_llm:
            result = self._tier3_llm(text, result, missing_fields)

        # Tier 4: Validation + Normalization
        result = self._tier4_validate(result)

        result.processing_time_ms = (time.perf_counter() - start_time) * 1000

        # Only cache complete results so partial extractions are retried.
        if self._cache and result.is_complete():
            self._cache.set(text, result)
        return result

    def _tier1_regex(self, text: str) -> ExtractionResult:
        """
        Tier 1: Extract entities using regex patterns.

        Args:
            text: Input text

        Returns:
            ExtractionResult with regex-extracted fields (empty result
            carrying the raw input if the regex engine fails)
        """
        try:
            result = self._regex_engine.extract(text)
            result.raw_input = text
            return result
        except Exception as e:
            logger.warning("Tier 1 (regex) failed: %s", e)
            return ExtractionResult(raw_input=text)

    def _tier2_rules(self, result: ExtractionResult) -> ExtractionResult:
        """
        Tier 2: Enrich with rule-based merchant/category mappings.

        Only fills fields that are still empty; never overwrites Tier 1.

        Args:
            result: Current extraction result

        Returns:
            Enriched ExtractionResult (input returned unchanged on failure)
        """
        try:
            # Derive merchant and category from the VPA and/or raw text
            merchant, category = get_merchant_and_category(
                vpa=result.vpa,
                text=result.raw_input
            )
            if merchant and not result.merchant:
                result.merchant = merchant
                result.meta['merchant'] = FieldMeta(
                    source=ExtractionSource.RULES,
                    confidence=0.85
                )
            if category and not result.category:
                # EAFP: Enum lookup raises ValueError for unknown values,
                # so no need to pre-scan all Category members.
                try:
                    result.category = Category(category)
                except ValueError:
                    result.category = Category.OTHER
                result.meta['category'] = FieldMeta(
                    source=ExtractionSource.RULES,
                    confidence=0.80
                )
        except Exception as e:
            logger.warning("Tier 2 (rules) failed: %s", e)
        return result

    def _tier3_llm(self, text: str, result: ExtractionResult,
                   missing_fields: List[str]) -> ExtractionResult:
        """
        Tier 3: Fill missing fields using LLM.

        Uses targeted prompts for specific fields first, then falls back
        to a full extraction if required fields are still missing.

        Args:
            text: Original input text
            result: Current extraction result
            missing_fields: Fields to extract with LLM

        Returns:
            Updated ExtractionResult (unchanged if no backend / on failure)
        """
        if not self._lazy_load_backend():
            logger.debug("No LLM backend available, skipping Tier 3")
            return result
        try:
            # Load model weights on first LLM use
            if not self._backend.is_loaded:
                self._backend.load_model(self.config.model_path)

            # Targeted prompts: cheaper and more reliable per field
            for field in missing_fields:
                if field in ['merchant', 'category', 'date', 'reference']:
                    value = self._extract_single_field(text, field)
                    if value:
                        self._set_field(result, field, value, ExtractionSource.LLM)

            # If required fields are still missing, try full extraction;
            # merge without overwriting what earlier tiers produced.
            still_missing = result.get_missing_fields(self.config.required_fields, [])
            if still_missing:
                llm_result = self._full_llm_extraction(text)
                if llm_result:
                    result.merge(llm_result, overwrite=False)
        except Exception as e:
            logger.warning("Tier 3 (LLM) failed: %s", e)
        return result

    def _extract_single_field(self, text: str, field: str) -> Optional[str]:
        """Extract a single field using a targeted prompt; None on failure."""
        try:
            prompt = get_targeted_prompt(field, text)
            response = self._backend.generate(
                prompt,
                max_tokens=50,  # single-field answers are short
                temperature=self.config.llm_temperature
            )
            return parse_targeted_response(field, response)
        except Exception as e:
            logger.debug("Single field extraction failed for %s: %s", field, e)
            return None

    def _full_llm_extraction(self, text: str) -> Optional[ExtractionResult]:
        """Run full LLM extraction as fallback; None if it fails or parses empty."""
        try:
            prompt = get_full_extraction_prompt(text)
            response = self._backend.generate(
                prompt,
                max_tokens=self.config.llm_max_tokens,
                temperature=self.config.llm_temperature
            )
            # LLM output may be malformed JSON; repair before validating.
            parsed = repair_llm_json(response)
            if parsed:
                result = validate_extraction_result(parsed)
                result.raw_llm_output = response
                # Mark every populated field as LLM-sourced (lower trust)
                for field in ['amount', 'type', 'date', 'account', 'reference',
                              'vpa', 'merchant', 'category']:
                    if getattr(result, field, None) is not None:
                        result.meta[field] = FieldMeta(
                            source=ExtractionSource.LLM,
                            confidence=0.70
                        )
                return result
        except Exception as e:
            logger.debug("Full LLM extraction failed: %s", e)
        return None

    def _set_field(self, result: ExtractionResult, field: str,
                   value: Any, source: ExtractionSource) -> None:
        """Set a field on the result, coercing known types and recording metadata."""
        if field == 'category':
            try:
                value = Category(value.lower())
            except (ValueError, AttributeError):
                # Unknown label or non-string value
                value = Category.OTHER
        elif field == 'date':
            value = normalize_date(value)
        setattr(result, field, value)
        result.meta[field] = FieldMeta(
            source=source,
            # LLM-sourced values are less trusted than rule-derived ones
            confidence=0.70 if source == ExtractionSource.LLM else 0.85,
            raw_value=str(value)
        )

    def _tier4_validate(self, result: ExtractionResult) -> ExtractionResult:
        """
        Tier 4: Validate and normalize all fields.

        Args:
            result: Extraction result to validate

        Returns:
            Validated and normalized result; on failure the result is
            returned as-is with confidence downgraded to LOW
        """
        try:
            if result.amount is not None:
                result.amount = normalize_amount(result.amount)
            if result.date:
                result.date = normalize_date(result.date)
            if result.vpa:
                result.vpa = normalize_vpa(result.vpa)
            # Recompute overall confidence from per-field metadata
            result = update_result_confidence(
                result,
                self.config.high_confidence_threshold,
                self.config.medium_confidence_threshold
            )
            return result
        except Exception as e:
            logger.warning("Tier 4 (validation) failed: %s", e)
            result.confidence = Confidence.LOW
            return result

    def extract_batch(self, texts: List[str]) -> List[ExtractionResult]:
        """
        Extract entities from multiple texts, one result per input.

        Args:
            texts: List of transaction texts

        Returns:
            List of ExtractionResults
        """
        return [self.extract(text) for text in texts]

    def get_stats(self) -> Dict[str, Any]:
        """Get extraction statistics (configuration, backends, cache)."""
        stats = {
            'cache_enabled': self.config.cache_enabled,
            'llm_enabled': self.config.use_llm,
            'available_backends': get_available_backends(),
            'active_backend': self._backend.name if self._backend else None,
        }
        if self._cache:
            cache_stats = self._cache.get_stats()
            stats['cache'] = cache_stats.to_dict()
        return stats
# Process-wide singleton extractor, created on demand.
_extractor: Optional[FinEE] = None


def get_extractor(config: Optional[ExtractionConfig] = None) -> FinEE:
    """Return the shared extractor instance.

    A new FinEE is built on first use, or whenever an explicit config is
    supplied (which replaces the current singleton).
    """
    global _extractor
    if config is not None or _extractor is None:
        _extractor = FinEE(config)
    return _extractor
def extract(text: str) -> ExtractionResult:
    """
    Extract financial entities from text.

    Convenience wrapper around the global extractor singleton.

    Args:
        text: Transaction text

    Returns:
        ExtractionResult
    """
    extractor = get_extractor()
    return extractor.extract(text)