# crypto-compliance-agent / src / processors / entity_extraction.py
"""
Entity extraction for financial and crypto-specific entities.
Uses FinBERT for financial NER and custom regex for crypto entities.
"""
import re
import logging
from typing import Dict, List, Optional, Tuple
from datetime import datetime
import os
import requests
logger = logging.getLogger(__name__)
class FinancialEntityExtractor:
    """
    Extract financial entities using FinBERT via HuggingFace Inference API.

    Identifies: amounts, dates, institutions, transaction types, etc.
    Uses the hosted Inference API instead of downloading models to save storage.
    """

    def __init__(self, model_name: str = "ProsusAI/finbert"):
        """
        Initialize FinBERT entity extractor with Inference API.

        Args:
            model_name: HuggingFace model name
        """
        self.model_name = model_name
        # Token is optional: without it the public API may still respond,
        # but calls can be rate-limited (see _lazy_load warning).
        self.hf_token = os.getenv("HF_TOKEN")
        self.api_url = f"https://api-inference.huggingface.co/models/{model_name}"
        self._initialized = False
        logger.info(f"FinancialEntityExtractor initialized with model: {model_name} (using Inference API)")

    def _lazy_load(self):
        """One-time lazy initialization: only logs API readiness (nothing to download)."""
        if self._initialized:
            return
        try:
            # Lightweight availability check -- never fails hard, because the
            # regex extractors below work without the API.
            if self.hf_token:
                logger.info(f"HF Inference API configured for: {self.model_name}")
            else:
                logger.warning("HF_TOKEN not set - FinBERT API calls may be rate-limited")
            self._initialized = True
            logger.info("FinBERT Inference API ready")
        except Exception as e:
            logger.error(f"Error initializing FinBERT API: {e}")
            logger.warning("Falling back to regex-only extraction")
            self._initialized = True

    def _call_finbert_api(self, text: str) -> Optional[Dict]:
        """
        Call FinBERT sentiment API for financial context.

        Args:
            text: Input text

        Returns:
            Sentiment scores, or None on any failure (missing token,
            non-200 response, timeout, connection error).
        """
        if not self.hf_token:
            return None
        try:
            headers = {"Authorization": f"Bearer {self.hf_token}"}
            response = requests.post(
                self.api_url,
                headers=headers,
                json={"inputs": text[:512]},  # Limit to 512 chars
                timeout=5
            )
            if response.status_code == 200:
                return response.json()
            logger.warning(f"FinBERT API returned {response.status_code}")
            return None
        except Exception as e:
            logger.debug(f"FinBERT API call failed: {e}")
            return None

    @staticmethod
    def _context(text: str, start: int, end: int, window: int = 50) -> str:
        """Return up to `window` characters of text on each side of span [start, end)."""
        return text[max(0, start - window):min(len(text), end + window)].strip()

    def extract_amounts(self, text: str) -> List[Dict]:
        """
        Extract monetary amounts from text.

        Overlapping pattern hits are de-duplicated: a match whose span lies
        inside an already-recorded span (e.g. "10 million" inside
        "$10 million") is skipped so the same figure is not reported twice.

        Args:
            text: Input text

        Returns:
            List of dictionaries with amount info
        """
        amounts: List[Dict] = []
        seen_spans: List[Tuple[int, int]] = []
        # Pattern for amounts: $X, €X, £X, ¥X, XM, XB, etc.
        patterns = [
            # Currency symbols with numbers
            r'[\$€£¥]\s*(\d+(?:,\d{3})*(?:\.\d{2})?)\s*(million|billion|thousand|M|B|K)?',
            # Numbers with currency words
            r'(\d+(?:,\d{3})*(?:\.\d{2})?)\s*(USD|EUR|GBP|JPY|dollars|euros|pounds)',
            # Percentages
            r'(\d+(?:\.\d+)?)\s*%',
            # Large numbers with abbreviations
            r'(\d+(?:\.\d+)?)\s*(million|billion|trillion|M|B|T)',
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                start, end = match.start(), match.end()
                # Bug fix: skip matches fully covered by an earlier, wider match
                # from a previous pattern (prevents double-counting).
                if any(s <= start and end <= e for s, e in seen_spans):
                    continue
                seen_spans.append((start, end))
                amounts.append({
                    'text': match.group(0),
                    'start': start,
                    'end': end,
                    'context': self._context(text, start, end),
                    'type': 'amount'
                })
        logger.debug(f"Extracted {len(amounts)} amounts from text")
        return amounts

    def extract_dates(self, text: str) -> List[Dict]:
        """
        Extract dates and time references.

        Args:
            text: Input text

        Returns:
            List of date entities
        """
        dates: List[Dict] = []
        # Date patterns
        patterns = [
            # YYYY-MM-DD
            r'\d{4}-\d{2}-\d{2}',
            # DD/MM/YYYY or MM/DD/YYYY
            r'\d{1,2}/\d{1,2}/\d{4}',
            # Month Day, Year
            r'(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}',
            # Q1 2024, Q2 2025, etc.
            r'Q[1-4]\s+\d{4}',
            # Fiscal year references
            r'FY\s*\d{4}',
            # Relative dates
            r'(within|by|before|after)\s+\d+\s+(days|weeks|months|years)',
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                dates.append({
                    'text': match.group(0),
                    'start': match.start(),
                    'end': match.end(),
                    'context': self._context(text, match.start(), match.end()),
                    'type': 'date'
                })
        logger.debug(f"Extracted {len(dates)} dates from text")
        return dates

    def extract_institutions(self, text: str) -> List[Dict]:
        """
        Extract financial institutions and regulatory agencies.

        Args:
            text: Input text

        Returns:
            List of institution entities
        """
        institutions: List[Dict] = []
        # Known institutions and agencies
        known_entities = [
            # US
            'SEC', 'Securities and Exchange Commission', 'FinCEN', 'CFTC',
            'Federal Reserve', 'OCC', 'FDIC', 'Treasury Department',
            # EU
            'ESMA', 'EBA', 'European Central Bank', 'ECB',
            # Singapore
            'MAS', 'Monetary Authority of Singapore',
            # UK
            'FCA', 'Financial Conduct Authority', 'Bank of England',
            # UAE
            'VARA', 'Virtual Asset Regulatory Authority',
            # Exchanges
            'Coinbase', 'Binance', 'Kraken', 'Gemini', 'Bitstamp',
            # Banks
            'JPMorgan', 'Goldman Sachs', 'Morgan Stanley', 'Citibank',
        ]
        for entity in known_entities:
            # Case-insensitive search on word boundaries
            pattern = r'\b' + re.escape(entity) + r'\b'
            for match in re.finditer(pattern, text, re.IGNORECASE):
                institutions.append({
                    'text': match.group(0),
                    'normalized': entity,  # canonical spelling regardless of input case
                    'start': match.start(),
                    'end': match.end(),
                    'context': self._context(text, match.start(), match.end()),
                    'type': 'institution'
                })
        logger.debug(f"Extracted {len(institutions)} institutions from text")
        return institutions

    def extract_all(self, text: str) -> Dict[str, List[Dict]]:
        """
        Extract all financial entities from text.

        Args:
            text: Input text

        Returns:
            Dictionary of entity lists by type
        """
        self._lazy_load()
        return {
            'amounts': self.extract_amounts(text),
            'dates': self.extract_dates(text),
            'institutions': self.extract_institutions(text)
        }
class CryptoEntityExtractor:
    """
    Extract crypto-specific entities: tokens, addresses, protocols, etc.
    """

    def __init__(self):
        """Initialize crypto entity extractor."""
        logger.info("CryptoEntityExtractor initialized")

    @staticmethod
    def _context(text: str, start: int, end: int, window: int = 50) -> str:
        """Return up to `window` characters of text on each side of span [start, end)."""
        return text[max(0, start - window):min(len(text), end + window)].strip()

    def extract_tokens(self, text: str) -> List[Dict]:
        """
        Extract cryptocurrency tokens and coin names.

        Full names are matched case-insensitively; ticker symbols are matched
        case-sensitively, since lowercase English words such as "link", "dot",
        "uni" and "comp" would otherwise be falsely reported as tokens.

        Args:
            text: Input text

        Returns:
            List of token entities
        """
        tokens: List[Dict] = []
        # Known cryptocurrencies
        known_tokens = [
            # Major coins
            ('Bitcoin', 'BTC'), ('Ethereum', 'ETH'), ('Ripple', 'XRP'),
            ('Cardano', 'ADA'), ('Solana', 'SOL'), ('Polkadot', 'DOT'),
            ('Avalanche', 'AVAX'), ('Polygon', 'MATIC'), ('Chainlink', 'LINK'),
            # Stablecoins
            ('Tether', 'USDT'), ('USD Coin', 'USDC'), ('Dai', 'DAI'),
            ('Binance USD', 'BUSD'), ('TrueUSD', 'TUSD'),
            # DeFi tokens
            ('Uniswap', 'UNI'), ('Aave', 'AAVE'), ('Compound', 'COMP'),
            ('Maker', 'MKR'), ('Curve', 'CRV'),
        ]
        for full_name, symbol in known_tokens:
            # Bug fix: symbols use exact-case matching (flags=0) so ordinary
            # lowercase words are not mistaken for tickers.
            for term, flags in ((full_name, re.IGNORECASE), (symbol, 0)):
                pattern = r'\b' + re.escape(term) + r'\b'
                for match in re.finditer(pattern, text, flags):
                    tokens.append({
                        'text': match.group(0),
                        'name': full_name,
                        'symbol': symbol,
                        'start': match.start(),
                        'end': match.end(),
                        'context': self._context(text, match.start(), match.end()),
                        'type': 'token'
                    })
        # Generic token pattern: words ending in "coin" or "token"
        generic_pattern = r'\b([A-Z][a-z]+(?:coin|token|Token|Coin))\b'
        for match in re.finditer(generic_pattern, text):
            token_name = match.group(1)
            # Skip if already captured by the known-token pass above
            if any(t['text'].lower() == token_name.lower() for t in tokens):
                continue
            tokens.append({
                'text': token_name,
                'name': token_name,
                'symbol': 'UNKNOWN',
                'start': match.start(),
                'end': match.end(),
                'context': self._context(text, match.start(), match.end()),
                'type': 'token'
            })
        logger.debug(f"Extracted {len(tokens)} tokens from text")
        return tokens

    def extract_addresses(self, text: str) -> List[Dict]:
        """
        Extract cryptocurrency wallet addresses.

        Args:
            text: Input text

        Returns:
            List of address entities (no surrounding context is attached)
        """
        addresses: List[Dict] = []
        # Ethereum-style addresses (0x followed by 40 hex chars)
        eth_pattern = r'\b0x[a-fA-F0-9]{40}\b'
        for match in re.finditer(eth_pattern, text):
            addresses.append({
                'text': match.group(0),
                'blockchain': 'ethereum',
                'start': match.start(),
                'end': match.end(),
                'type': 'address'
            })
        # Bitcoin-style addresses (starts with 1, 3, or bc1)
        btc_patterns = [
            r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b',  # Legacy/P2SH (base58 alphabet)
            r'\bbc1[a-z0-9]{39,59}\b'  # Bech32
        ]
        for pattern in btc_patterns:
            for match in re.finditer(pattern, text):
                addresses.append({
                    'text': match.group(0),
                    'blockchain': 'bitcoin',
                    'start': match.start(),
                    'end': match.end(),
                    'type': 'address'
                })
        logger.debug(f"Extracted {len(addresses)} addresses from text")
        return addresses

    def extract_protocols(self, text: str) -> List[Dict]:
        """
        Extract DeFi protocols and blockchain platforms.

        Args:
            text: Input text

        Returns:
            List of protocol entities
        """
        protocols: List[Dict] = []
        known_protocols = [
            # DeFi protocols
            'Uniswap', 'SushiSwap', 'PancakeSwap', 'Aave', 'Compound',
            'MakerDAO', 'Curve Finance', 'Balancer', 'Yearn Finance',
            # Layer 1 blockchains
            'Ethereum', 'Bitcoin', 'Solana', 'Cardano', 'Avalanche',
            'Polkadot', 'Cosmos', 'Algorand', 'Near Protocol',
            # Layer 2s
            'Polygon', 'Arbitrum', 'Optimism', 'zkSync', 'StarkNet',
            # Other
            'IPFS', 'Chainlink', 'The Graph', 'Filecoin',
        ]
        for protocol in known_protocols:
            pattern = r'\b' + re.escape(protocol) + r'\b'
            for match in re.finditer(pattern, text, re.IGNORECASE):
                protocols.append({
                    'text': match.group(0),
                    'name': protocol,  # canonical spelling
                    'start': match.start(),
                    'end': match.end(),
                    'context': self._context(text, match.start(), match.end()),
                    'type': 'protocol'
                })
        logger.debug(f"Extracted {len(protocols)} protocols from text")
        return protocols

    def extract_activities(self, text: str) -> List[Dict]:
        """
        Extract crypto activity mentions (staking, lending, etc.).

        Args:
            text: Input text

        Returns:
            List of activity entities
        """
        activities: List[Dict] = []
        activity_keywords = [
            'staking', 'lending', 'borrowing', 'yield farming', 'liquidity mining',
            'trading', 'swapping', 'mining', 'minting', 'burning',
            'governance', 'voting', 'delegation', 'custody', 'custodial',
            'non-custodial', 'DeFi', 'NFT', 'token sale', 'ICO', 'IEO',
            'airdrop', 'fork', 'bridge', 'cross-chain'
        ]
        for activity in activity_keywords:
            pattern = r'\b' + re.escape(activity) + r'\b'
            for match in re.finditer(pattern, text, re.IGNORECASE):
                activities.append({
                    'text': match.group(0),
                    'activity': activity,
                    'start': match.start(),
                    'end': match.end(),
                    'context': self._context(text, match.start(), match.end()),
                    'type': 'activity'
                })
        logger.debug(f"Extracted {len(activities)} activities from text")
        return activities

    def extract_all(self, text: str) -> Dict[str, List[Dict]]:
        """
        Extract all crypto entities from text.

        Args:
            text: Input text

        Returns:
            Dictionary of entity lists by type
        """
        return {
            'tokens': self.extract_tokens(text),
            'addresses': self.extract_addresses(text),
            'protocols': self.extract_protocols(text),
            'activities': self.extract_activities(text)
        }
class EntityExtractor:
    """
    Combined entity extractor for both financial and crypto entities.
    """

    def __init__(self):
        """Initialize combined entity extractor."""
        self.financial_extractor = FinancialEntityExtractor()
        self.crypto_extractor = CryptoEntityExtractor()
        logger.info("EntityExtractor initialized")

    def extract_all_entities(self, text: str) -> Dict:
        """
        Extract all entities (financial + crypto) from text.

        Args:
            text: Input text

        Returns:
            Dictionary containing all extracted entities, a 'summary' of
            counts/flags, and an 'extracted_at' ISO timestamp.
        """
        # Guard clause: empty/None text yields an empty result skeleton.
        if not text:
            return {'financial': {}, 'crypto': {}, 'summary': {}}

        financial = self.financial_extractor.extract_all(text)
        crypto = self.crypto_extractor.extract_all(text)

        # Aggregate counts once; reuse them for the summary fields below.
        financial_total = sum(len(items) for items in financial.values())
        crypto_total = sum(len(items) for items in crypto.values())

        summary = {
            'total_entities': financial_total + crypto_total,
            'financial_count': financial_total,
            'crypto_count': crypto_total,
            'has_amounts': len(financial.get('amounts', [])) > 0,
            'has_dates': len(financial.get('dates', [])) > 0,
            'has_tokens': len(crypto.get('tokens', [])) > 0,
            'has_addresses': len(crypto.get('addresses', [])) > 0,
        }

        logger.info(
            f"Extracted {summary['total_entities']} entities "
            f"({summary['financial_count']} financial, {summary['crypto_count']} crypto)"
        )
        return {
            'financial': financial,
            'crypto': crypto,
            'summary': summary,
            'extracted_at': datetime.now().isoformat()
        }
# Convenience function
def extract_entities(text: str) -> Dict:
    """
    Quick extract all entities from text.

    Builds a throwaway EntityExtractor per call; use the class directly
    when extracting from many documents.

    Args:
        text: Input text

    Returns:
        Dictionary of extracted entities
    """
    return EntityExtractor().extract_all_entities(text)
if __name__ == "__main__":
    # Smoke test: run the combined extractor on a sample regulatory
    # announcement and print a short report of what was found.
    sample_text = """
    The SEC announced new crypto custody rules on January 15, 2024.
    Exchanges handling over $10 million in Bitcoin (BTC) and Ethereum (ETH)
    must register by Q3 2024. Staking services and DeFi protocols like
    Uniswap may face additional scrutiny. Coinbase and Binance have 90 days
    to comply with the new requirements.
    """
    entities = extract_entities(sample_text)

    print("\n=== Financial Entities ===")
    for kind, found in entities['financial'].items():
        print(f"\n{kind.upper()}: {len(found)}")
        for hit in found[:3]:  # preview only the first three hits
            print(f" - {hit['text']}")

    print("\n=== Crypto Entities ===")
    for kind, found in entities['crypto'].items():
        print(f"\n{kind.upper()}: {len(found)}")
        for hit in found[:3]:  # crypto entries may lack 'text'; fall back to 'name'
            print(f" - {hit.get('text', hit.get('name', 'N/A'))}")

    print(f"\n=== Summary ===")
    print(f"Total entities: {entities['summary']['total_entities']}")