""" Entity extraction for financial and crypto-specific entities. Uses FinBERT for financial NER and custom regex for crypto entities. """ import re import logging from typing import Dict, List, Optional, Tuple from datetime import datetime import os import requests logger = logging.getLogger(__name__) class FinancialEntityExtractor: """ Extract financial entities using FinBERT via HuggingFace Inference API. Identifies: amounts, dates, institutions, transaction types, etc. Uses API instead of downloading models to save storage. """ def __init__(self, model_name: str = "ProsusAI/finbert"): """ Initialize FinBERT entity extractor with Inference API. Args: model_name: HuggingFace model name """ self.model_name = model_name self.hf_token = os.getenv("HF_TOKEN") self.api_url = f"https://api-inference.huggingface.co/models/{model_name}" self._initialized = False logger.info(f"FinancialEntityExtractor initialized with model: {model_name} (using Inference API)") def _lazy_load(self): """Lazy initialization check.""" if self._initialized: return try: # Test API availability (lightweight check) if self.hf_token: logger.info(f"HF Inference API configured for: {self.model_name}") else: logger.warning("HF_TOKEN not set - FinBERT API calls may be rate-limited") self._initialized = True logger.info("FinBERT Inference API ready") except Exception as e: logger.error(f"Error initializing FinBERT API: {e}") logger.warning("Falling back to regex-only extraction") self._initialized = True def _call_finbert_api(self, text: str) -> Optional[Dict]: """ Call FinBERT sentiment API for financial context. Args: text: Input text Returns: Sentiment scores or None if error """ if not self.hf_token: return None try: headers = {"Authorization": f"Bearer {self.hf_token}"} response = requests.post( self.api_url, headers=headers, json={"inputs": text[:512]}, # Limit to 512 chars timeout=5 ) if response.status_code == 200: return response.json() else: logger.warning(f"FinBERT API returned {response.status_code}") return None except Exception as e: logger.debug(f"FinBERT API call failed: {e}") return None def extract_amounts(self, text: str) -> List[Dict]: """ Extract monetary amounts from text. Args: text: Input text Returns: List of dictionaries with amount info """ amounts = [] # Pattern for amounts: $X, €X, £X, ¥X, XM, XB, etc. patterns = [ # Currency symbols with numbers r'[\$€£¥]\s*(\d+(?:,\d{3})*(?:\.\d{2})?)\s*(million|billion|thousand|M|B|K)?', # Numbers with currency words r'(\d+(?:,\d{3})*(?:\.\d{2})?)\s*(USD|EUR|GBP|JPY|dollars|euros|pounds)', # Percentages r'(\d+(?:\.\d+)?)\s*%', # Large numbers with abbreviations r'(\d+(?:\.\d+)?)\s*(million|billion|trillion|M|B|T)', ] for pattern in patterns: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: amount_str = match.group(0) context_start = max(0, match.start() - 50) context_end = min(len(text), match.end() + 50) context = text[context_start:context_end] amounts.append({ 'text': amount_str, 'start': match.start(), 'end': match.end(), 'context': context.strip(), 'type': 'amount' }) logger.debug(f"Extracted {len(amounts)} amounts from text") return amounts def extract_dates(self, text: str) -> List[Dict]: """ Extract dates and time references. Args: text: Input text Returns: List of date entities """ dates = [] # Date patterns patterns = [ # YYYY-MM-DD r'\d{4}-\d{2}-\d{2}', # DD/MM/YYYY or MM/DD/YYYY r'\d{1,2}/\d{1,2}/\d{4}', # Month Day, Year r'(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}', # Q1 2024, Q2 2025, etc. r'Q[1-4]\s+\d{4}', # Fiscal year references r'FY\s*\d{4}', # Relative dates r'(within|by|before|after)\s+\d+\s+(days|weeks|months|years)', ] for pattern in patterns: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: date_str = match.group(0) context_start = max(0, match.start() - 50) context_end = min(len(text), match.end() + 50) context = text[context_start:context_end] dates.append({ 'text': date_str, 'start': match.start(), 'end': match.end(), 'context': context.strip(), 'type': 'date' }) logger.debug(f"Extracted {len(dates)} dates from text") return dates def extract_institutions(self, text: str) -> List[Dict]: """ Extract financial institutions and regulatory agencies. Args: text: Input text Returns: List of institution entities """ institutions = [] # Known institutions and agencies known_entities = [ # US 'SEC', 'Securities and Exchange Commission', 'FinCEN', 'CFTC', 'Federal Reserve', 'OCC', 'FDIC', 'Treasury Department', # EU 'ESMA', 'EBA', 'European Central Bank', 'ECB', # Singapore 'MAS', 'Monetary Authority of Singapore', # UK 'FCA', 'Financial Conduct Authority', 'Bank of England', # UAE 'VARA', 'Virtual Asset Regulatory Authority', # Exchanges 'Coinbase', 'Binance', 'Kraken', 'Gemini', 'Bitstamp', # Banks 'JPMorgan', 'Goldman Sachs', 'Morgan Stanley', 'Citibank', ] for entity in known_entities: # Case-insensitive search pattern = r'\b' + re.escape(entity) + r'\b' matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: context_start = max(0, match.start() - 50) context_end = min(len(text), match.end() + 50) context = text[context_start:context_end] institutions.append({ 'text': match.group(0), 'normalized': entity, 'start': match.start(), 'end': match.end(), 'context': context.strip(), 'type': 'institution' }) logger.debug(f"Extracted {len(institutions)} institutions from text") return institutions def extract_all(self, text: str) -> Dict[str, List[Dict]]: """ Extract all financial entities from text. Args: text: Input text Returns: Dictionary of entity lists by type """ self._lazy_load() return { 'amounts': self.extract_amounts(text), 'dates': self.extract_dates(text), 'institutions': self.extract_institutions(text) } class CryptoEntityExtractor: """ Extract crypto-specific entities: tokens, addresses, protocols, etc. """ def __init__(self): """Initialize crypto entity extractor.""" logger.info("CryptoEntityExtractor initialized") def extract_tokens(self, text: str) -> List[Dict]: """ Extract cryptocurrency tokens and coin names. Args: text: Input text Returns: List of token entities """ tokens = [] # Known cryptocurrencies known_tokens = [ # Major coins ('Bitcoin', 'BTC'), ('Ethereum', 'ETH'), ('Ripple', 'XRP'), ('Cardano', 'ADA'), ('Solana', 'SOL'), ('Polkadot', 'DOT'), ('Avalanche', 'AVAX'), ('Polygon', 'MATIC'), ('Chainlink', 'LINK'), # Stablecoins ('Tether', 'USDT'), ('USD Coin', 'USDC'), ('Dai', 'DAI'), ('Binance USD', 'BUSD'), ('TrueUSD', 'TUSD'), # DeFi tokens ('Uniswap', 'UNI'), ('Aave', 'AAVE'), ('Compound', 'COMP'), ('Maker', 'MKR'), ('Curve', 'CRV'), ] for full_name, symbol in known_tokens: # Search for both full name and symbol for term in [full_name, symbol]: pattern = r'\b' + re.escape(term) + r'\b' matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: context_start = max(0, match.start() - 50) context_end = min(len(text), match.end() + 50) context = text[context_start:context_end] tokens.append({ 'text': match.group(0), 'name': full_name, 'symbol': symbol, 'start': match.start(), 'end': match.end(), 'context': context.strip(), 'type': 'token' }) # Generic token pattern: words ending in "coin" or "token" generic_pattern = r'\b([A-Z][a-z]+(?:coin|token|Token|Coin))\b' matches = re.finditer(generic_pattern, text) for match in matches: token_name = match.group(1) # Skip if already captured if any(t['text'].lower() == token_name.lower() for t in tokens): continue context_start = max(0, match.start() - 50) context_end = min(len(text), match.end() + 50) context = text[context_start:context_end] tokens.append({ 'text': token_name, 'name': token_name, 'symbol': 'UNKNOWN', 'start': match.start(), 'end': match.end(), 'context': context.strip(), 'type': 'token' }) logger.debug(f"Extracted {len(tokens)} tokens from text") return tokens def extract_addresses(self, text: str) -> List[Dict]: """ Extract cryptocurrency wallet addresses. Args: text: Input text Returns: List of address entities """ addresses = [] # Ethereum-style addresses (0x followed by 40 hex chars) eth_pattern = r'\b0x[a-fA-F0-9]{40}\b' matches = re.finditer(eth_pattern, text) for match in matches: addresses.append({ 'text': match.group(0), 'blockchain': 'ethereum', 'start': match.start(), 'end': match.end(), 'type': 'address' }) # Bitcoin-style addresses (starts with 1, 3, or bc1) btc_patterns = [ r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b', # Legacy/P2SH r'\bbc1[a-z0-9]{39,59}\b' # Bech32 ] for pattern in btc_patterns: matches = re.finditer(pattern, text) for match in matches: addresses.append({ 'text': match.group(0), 'blockchain': 'bitcoin', 'start': match.start(), 'end': match.end(), 'type': 'address' }) logger.debug(f"Extracted {len(addresses)} addresses from text") return addresses def extract_protocols(self, text: str) -> List[Dict]: """ Extract DeFi protocols and blockchain platforms. Args: text: Input text Returns: List of protocol entities """ protocols = [] known_protocols = [ # DeFi protocols 'Uniswap', 'SushiSwap', 'PancakeSwap', 'Aave', 'Compound', 'MakerDAO', 'Curve Finance', 'Balancer', 'Yearn Finance', # Layer 1 blockchains 'Ethereum', 'Bitcoin', 'Solana', 'Cardano', 'Avalanche', 'Polkadot', 'Cosmos', 'Algorand', 'Near Protocol', # Layer 2s 'Polygon', 'Arbitrum', 'Optimism', 'zkSync', 'StarkNet', # Other 'IPFS', 'Chainlink', 'The Graph', 'Filecoin', ] for protocol in known_protocols: pattern = r'\b' + re.escape(protocol) + r'\b' matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: context_start = max(0, match.start() - 50) context_end = min(len(text), match.end() + 50) context = text[context_start:context_end] protocols.append({ 'text': match.group(0), 'name': protocol, 'start': match.start(), 'end': match.end(), 'context': context.strip(), 'type': 'protocol' }) logger.debug(f"Extracted {len(protocols)} protocols from text") return protocols def extract_activities(self, text: str) -> List[Dict]: """ Extract crypto activity mentions (staking, lending, etc.). Args: text: Input text Returns: List of activity entities """ activities = [] activity_keywords = [ 'staking', 'lending', 'borrowing', 'yield farming', 'liquidity mining', 'trading', 'swapping', 'mining', 'minting', 'burning', 'governance', 'voting', 'delegation', 'custody', 'custodial', 'non-custodial', 'DeFi', 'NFT', 'token sale', 'ICO', 'IEO', 'airdrop', 'fork', 'bridge', 'cross-chain' ] for activity in activity_keywords: pattern = r'\b' + re.escape(activity) + r'\b' matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: context_start = max(0, match.start() - 50) context_end = min(len(text), match.end() + 50) context = text[context_start:context_end] activities.append({ 'text': match.group(0), 'activity': activity, 'start': match.start(), 'end': match.end(), 'context': context.strip(), 'type': 'activity' }) logger.debug(f"Extracted {len(activities)} activities from text") return activities def extract_all(self, text: str) -> Dict[str, List[Dict]]: """ Extract all crypto entities from text. Args: text: Input text Returns: Dictionary of entity lists by type """ return { 'tokens': self.extract_tokens(text), 'addresses': self.extract_addresses(text), 'protocols': self.extract_protocols(text), 'activities': self.extract_activities(text) } class EntityExtractor: """ Combined entity extractor for both financial and crypto entities. """ def __init__(self): """Initialize combined entity extractor.""" self.financial_extractor = FinancialEntityExtractor() self.crypto_extractor = CryptoEntityExtractor() logger.info("EntityExtractor initialized") def extract_all_entities(self, text: str) -> Dict: """ Extract all entities (financial + crypto) from text. Args: text: Input text Returns: Dictionary containing all extracted entities """ if not text: return {'financial': {}, 'crypto': {}, 'summary': {}} # Extract financial entities financial_entities = self.financial_extractor.extract_all(text) # Extract crypto entities crypto_entities = self.crypto_extractor.extract_all(text) # Summary statistics summary = { 'total_entities': ( sum(len(v) for v in financial_entities.values()) + sum(len(v) for v in crypto_entities.values()) ), 'financial_count': sum(len(v) for v in financial_entities.values()), 'crypto_count': sum(len(v) for v in crypto_entities.values()), 'has_amounts': len(financial_entities.get('amounts', [])) > 0, 'has_dates': len(financial_entities.get('dates', [])) > 0, 'has_tokens': len(crypto_entities.get('tokens', [])) > 0, 'has_addresses': len(crypto_entities.get('addresses', [])) > 0, } result = { 'financial': financial_entities, 'crypto': crypto_entities, 'summary': summary, 'extracted_at': datetime.now().isoformat() } logger.info( f"Extracted {summary['total_entities']} entities " f"({summary['financial_count']} financial, {summary['crypto_count']} crypto)" ) return result # Convenience function def extract_entities(text: str) -> Dict: """ Quick extract all entities from text. Args: text: Input text Returns: Dictionary of extracted entities """ extractor = EntityExtractor() return extractor.extract_all_entities(text) if __name__ == "__main__": # Example usage sample_text = """ The SEC announced new crypto custody rules on January 15, 2024. Exchanges handling over $10 million in Bitcoin (BTC) and Ethereum (ETH) must register by Q3 2024. Staking services and DeFi protocols like Uniswap may face additional scrutiny. Coinbase and Binance have 90 days to comply with the new requirements. """ entities = extract_entities(sample_text) print("\n=== Financial Entities ===") for entity_type, items in entities['financial'].items(): print(f"\n{entity_type.upper()}: {len(items)}") for item in items[:3]: # Show first 3 print(f" - {item['text']}") print("\n=== Crypto Entities ===") for entity_type, items in entities['crypto'].items(): print(f"\n{entity_type.upper()}: {len(items)}") for item in items[:3]: print(f" - {item.get('text', item.get('name', 'N/A'))}") print(f"\n=== Summary ===") print(f"Total entities: {entities['summary']['total_entities']}")