# EasyReportsMCPServer / edgar_client.py
# Provenance: uploaded by JC321 ("Upload 3 files", commit 7c13159, verified).
"""EDGAR API Client Module"""
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
try:
from sec_edgar_api.EdgarClient import EdgarClient
except ImportError:
EdgarClient = None
import json
import time
from functools import wraps
import threading
class EdgarDataClient:
    """Client for the SEC EDGAR APIs with throttling, retries, and caching.

    Responsibilities visible in this class:
    - Thread-safe request throttling (SEC allows ~10 requests/second).
    - A pooled ``requests`` session with retry/backoff for transient errors.
    - TTL caches for company info/filings/facts, a bounded search-result
      cache, and a bounded+TTL per-period financial-data cache ("Layer 3").
    - In-memory lookup indexes built from SEC's company_tickers.json feed.
    """

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """Initialize EDGAR client.

        Args:
            user_agent (str): Sent with every request; the SEC requires a
                descriptive User-Agent including contact information.
        """
        self.user_agent = user_agent
        self.last_request_time = 0  # epoch seconds of the last outbound request
        self.min_request_interval = 0.11  # SEC allows 10 requests/second, use 0.11s to be safe
        self.request_timeout = 45  # per-request timeout in seconds
        self.max_retries = 3  # Maximum retry attempts in _make_request_with_retry
        self._lock = threading.Lock()  # Thread-safe rate limiting
        # Configure requests session with connection pooling and retry logic
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=10,
            pool_maxsize=20,
            pool_block=False
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.session.headers.update({"User-Agent": user_agent})
        # Cache for frequently accessed data
        self._company_cache = {}  # cache_key -> payload (info/filings/facts/tickers)
        self._cache_ttl = 600  # 10 minutes for most cached payloads
        self._tickers_cache_ttl = 7200  # 2 hours for the large company-tickers download
        self._cache_timestamps = {}  # cache_key -> epoch seconds of insertion
        # Fast lookup indexes for company tickers
        self._ticker_index = {}  # ticker -> company data
        self._cik_index = {}  # cik -> company data
        self._name_lower_index = {}  # lowercase name -> company data
        self._name_prefix_index = {}  # name prefix (3 chars) -> list of company data
        self._ticker_prefix_index = {}  # ticker prefix (2 chars) -> list of company data
        self._alias_index = {}  # common aliases -> company data
        self._index_loaded = False  # set True once _load_company_tickers builds the indexes
        # Search result cache (for performance)
        self._search_cache = {}  # search_key -> result
        self._search_cache_max_size = 1000  # Limit cache size
        # Layer 3: Period data cache (avoid re-parsing XBRL for same period)
        self._period_cache = {}  # period_key -> financial data
        self._period_cache_timestamps = {}  # period_key -> timestamp
        self._period_cache_ttl = 1800  # 30 minutes cache (financial data changes rarely)
        self._period_cache_max_size = 1000  # Limit cache size
        # Common company aliases for intelligent search
        self._company_aliases = {
            'google': ['GOOGL', 'GOOG'],
            'facebook': ['META'],
            'meta': ['META'],
            'apple': ['AAPL'],
            'microsoft': ['MSFT'],
            'amazon': ['AMZN'],
            'tesla': ['TSLA'],
            'nvidia': ['NVDA'],
            'netflix': ['NFLX'],
            'alphabet': ['GOOGL', 'GOOG'],
            'twitter': ['TWTR'],  # Historical
            'intel': ['INTC'],
            'amd': ['AMD'],
            'oracle': ['ORCL'],
            'salesforce': ['CRM'],
            'adobe': ['ADBE'],
            'cisco': ['CSCO'],
            'ibm': ['IBM'],
            'walmart': ['WMT'],
            'disney': ['DIS'],
            'nike': ['NKE'],
            'mcdonalds': ['MCD'],
            'coca cola': ['KO'],
            'pepsi': ['PEP'],
            'starbucks': ['SBUX'],
            'boeing': ['BA'],
            'ge': ['GE'],
            'general electric': ['GE'],
            'ford': ['F'],
            'gm': ['GM'],
            'general motors': ['GM'],
            'jpmorgan': ['JPM'],
            'goldman': ['GS'],
            'goldman sachs': ['GS'],
            'morgan stanley': ['MS'],
            'bank of america': ['BAC'],
            'wells fargo': ['WFC'],
            'visa': ['V'],
            'mastercard': ['MA'],
            'berkshire': ['BRK.B', 'BRK.A'],
            'exxon': ['XOM'],
            'chevron': ['CVX'],
            'pfizer': ['PFE'],
            'johnson': ['JNJ'],
            'merck': ['MRK'],
        }
        # Optional dependency: sec_edgar_api drives submissions/facts calls;
        # self.edgar stays None when the library is not installed.
        if EdgarClient:
            self.edgar = EdgarClient(user_agent=user_agent)
        else:
            self.edgar = None
def _rate_limit(self):
"""Thread-safe rate limiting to comply with SEC API limits (10 requests/second)"""
with self._lock:
current_time = time.time()
time_since_last_request = current_time - self.last_request_time
if time_since_last_request < self.min_request_interval:
sleep_time = self.min_request_interval - time_since_last_request
time.sleep(sleep_time)
self.last_request_time = time.time()
def _is_cache_valid(self, cache_key):
"""Check if cache entry is still valid"""
if cache_key not in self._cache_timestamps:
return False
age = time.time() - self._cache_timestamps[cache_key]
# Use longer TTL for company tickers list
ttl = self._tickers_cache_ttl if cache_key == "company_tickers_json" else self._cache_ttl
return age < ttl
def _get_cached(self, cache_key):
"""Get cached data if valid"""
if self._is_cache_valid(cache_key):
return self._company_cache.get(cache_key)
return None
def _set_cache(self, cache_key, data):
"""Set cache data with timestamp"""
self._company_cache[cache_key] = data
self._cache_timestamps[cache_key] = time.time()
def _make_request_with_retry(self, url, headers=None, use_session=True):
"""Make HTTP request with retry logic and timeout"""
if headers is None:
headers = {"User-Agent": self.user_agent}
for attempt in range(self.max_retries):
try:
self._rate_limit()
if use_session:
response = self.session.get(url, headers=headers, timeout=self.request_timeout)
else:
response = requests.get(url, headers=headers, timeout=self.request_timeout)
response.raise_for_status()
return response
except requests.exceptions.Timeout:
print(f"Request timeout (attempt {attempt + 1}/{self.max_retries}): {url}")
if attempt == self.max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoff
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429: # Too Many Requests
wait_time = 2 ** attempt
print(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1}/{self.max_retries})")
time.sleep(wait_time)
if attempt == self.max_retries - 1:
raise
else:
raise
except Exception as e:
print(f"Request error (attempt {attempt + 1}/{self.max_retries}): {e}")
if attempt == self.max_retries - 1:
raise
time.sleep(2 ** attempt)
return None
def _load_company_tickers(self, force_refresh=False):
    """Load SEC's company_tickers.json and build in-memory lookup indexes.

    Args:
        force_refresh (bool): When True, bypass the cache and re-download.

    Returns:
        dict: The raw company-tickers mapping (index -> company record),
        or None when the download fails.
    """
    cache_key = "company_tickers_json"
    # Check if already loaded and cache is valid
    if self._index_loaded and not force_refresh and self._is_cache_valid(cache_key):
        return self._get_cached(cache_key)
    # Check cache first
    companies = self._get_cached(cache_key) if not force_refresh else None
    if not companies:
        try:
            # Download company tickers
            url = "https://www.sec.gov/files/company_tickers.json"
            print(f"Downloading company tickers from SEC...")
            response = self._make_request_with_retry(url)
            if not response:
                print("Failed to download company tickers")
                return None
            companies = response.json()
            # Cache the download (TTL is _tickers_cache_ttl, i.e. 2 hours)
            self._set_cache(cache_key, companies)
            print(f"Loaded {len(companies)} companies")
        except Exception as e:
            print(f"Error loading company tickers: {e}")
            return None
    else:
        print(f"Using cached company tickers ({len(companies)} companies)")
    # Build fast lookup indexes (rebuilt from scratch on every (re)load)
    self._ticker_index = {}
    self._cik_index = {}
    self._name_lower_index = {}
    self._name_prefix_index = {}
    self._ticker_prefix_index = {}
    self._alias_index = {}
    for _, company in companies.items():
        cik = str(company["cik_str"]).zfill(10)  # CIKs are zero-padded to 10 digits
        ticker = company["ticker"]
        name = company["title"]
        company_data = {
            "cik": cik,
            "name": name,
            "ticker": ticker
        }
        # Index by ticker (lowercase for case-insensitive)
        ticker_lower = ticker.lower()
        self._ticker_index[ticker_lower] = company_data
        # Index by CIK
        self._cik_index[cik] = company_data
        # Index by exact name (lowercase)
        name_lower = name.lower()
        self._name_lower_index[name_lower] = company_data
        # Build prefix indexes for faster partial matching
        # Name prefix index: every sliding 3-char window of the name,
        # so substring queries can be narrowed before scanning
        if len(name_lower) >= 3:
            for i in range(len(name_lower) - 2):
                prefix = name_lower[i:i+3]
                if prefix not in self._name_prefix_index:
                    self._name_prefix_index[prefix] = []
                self._name_prefix_index[prefix].append(company_data)
        # Ticker prefix index (use 2-character prefixes for tickers)
        if len(ticker_lower) >= 2:
            prefix = ticker_lower[:2]
            if prefix not in self._ticker_prefix_index:
                self._ticker_prefix_index[prefix] = []
            self._ticker_prefix_index[prefix].append(company_data)
    # Build alias index for intelligent search ("google" -> GOOGL record, ...)
    for alias, tickers in self._company_aliases.items():
        for ticker in tickers:
            ticker_lower = ticker.lower()
            if ticker_lower in self._ticker_index:
                self._alias_index[alias.lower()] = self._ticker_index[ticker_lower]
                break  # Use first matching ticker
    self._index_loaded = True
    print(f"Built indexes: {len(self._ticker_index)} tickers, {len(self._cik_index)} CIKs")
    print(f"Built prefix indexes: {len(self._name_prefix_index)} name prefixes, {len(self._ticker_prefix_index)} ticker prefixes")
    print(f"Built alias index: {len(self._alias_index)} common aliases")
    return companies
def get_company_by_cik(self, cik):
"""Fast lookup of company info by CIK (from cached tickers)"""
# Ensure data is loaded
self._load_company_tickers()
# Normalize CIK
cik_normalized = str(cik).zfill(10)
# Fast index lookup
return self._cik_index.get(cik_normalized)
def get_company_by_ticker(self, ticker):
"""Fast lookup of company info by ticker"""
# Ensure data is loaded
self._load_company_tickers()
# Fast index lookup (case-insensitive)
return self._ticker_index.get(ticker.lower())
def search_company_by_name(self, company_name):
"""Search company CIK by company name with caching and optimized search"""
try:
# Load company tickers and build indexes
companies = self._load_company_tickers()
if not companies:
return None
# Prepare search input
search_name = company_name.lower().strip()
# Check search cache first
cache_key = f"search_{search_name}"
if cache_key in self._search_cache:
return self._search_cache[cache_key].copy() if self._search_cache[cache_key] else None
result = None
# Optimize: Use fast index lookups first
# Priority 1: Exact ticker match (fastest - O(1) hash lookup)
if search_name in self._ticker_index:
result = self._ticker_index[search_name].copy()
# Priority 2: Common alias match (intelligent search - O(1))
elif search_name in self._alias_index:
result = self._alias_index[search_name].copy()
print(f"Alias match: '{company_name}' → {result.get('ticker')} ({result.get('name')})")
# Priority 3: Exact name match (fast - O(1) hash lookup)
elif search_name in self._name_lower_index:
result = self._name_lower_index[search_name].copy()
# Priority 4: Exact CIK match (fast - O(1) hash lookup)
# Handle CIK input (8-10 digits)
elif search_name.isdigit() and len(search_name) >= 8:
cik_normalized = search_name.zfill(10)
if cik_normalized in self._cik_index:
result = self._cik_index[cik_normalized].copy()
# Priority 5: Prefix-based partial matches (optimized with prefix indexes)
if not result:
result = self._search_with_prefix_index(search_name)
# Cache the result (even if None)
self._cache_search_result(cache_key, result)
return result.copy() if result else None
except Exception as e:
print(f"Error searching company: {e}")
return None
def _search_with_prefix_index(self, search_name):
"""Optimized partial match search using prefix indexes"""
candidates = set()
# Strategy 1: Try ticker prefix match if search term looks like ticker
if len(search_name) <= 5:
# Use ticker prefix index
if len(search_name) >= 2:
prefix = search_name[:2]
if prefix in self._ticker_prefix_index:
for company_data in self._ticker_prefix_index[prefix]:
ticker_lower = company_data["ticker"].lower()
if search_name in ticker_lower:
# Exact prefix match in ticker - highest priority
if ticker_lower.startswith(search_name):
return company_data
candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
# Strategy 2: Use name prefix index for name-based search
if len(search_name) >= 3:
# Try first 3 characters as prefix
prefix = search_name[:3]
if prefix in self._name_prefix_index:
for company_data in self._name_prefix_index[prefix]:
name_lower = company_data["name"].lower()
# Check if search term is in the name
if search_name in name_lower:
# Exact prefix match - highest priority
if name_lower.startswith(search_name):
return company_data
candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
# Strategy 3: If prefix index didn't help (search term in middle of name),
# do limited iteration on a subset of companies
if not candidates and len(search_name) >= 3:
# Only scan companies whose names contain the first 3 chars anywhere
scan_limit = 0
for prefix_key, company_list in self._name_prefix_index.items():
if search_name[:3] in prefix_key:
for company_data in company_list:
name_lower = company_data["name"].lower()
ticker_lower = company_data["ticker"].lower()
if search_name in name_lower or search_name in ticker_lower:
candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
scan_limit += 1
if scan_limit > 1000: # Limit scan to avoid performance issues
break
if scan_limit > 1000:
break
# Return first candidate if found
if candidates:
cik, name, ticker = next(iter(candidates))
return {"cik": cik, "name": name, "ticker": ticker}
return None
def _cache_search_result(self, cache_key, result):
"""Cache search result with size limit"""
# Implement LRU-like behavior: if cache is full, clear oldest half
if len(self._search_cache) >= self._search_cache_max_size:
# Simple strategy: clear half of the cache
keys_to_remove = list(self._search_cache.keys())[:self._search_cache_max_size // 2]
for key in keys_to_remove:
del self._search_cache[key]
self._search_cache[cache_key] = result
def _get_period_cache(self, cache_key):
"""Get cached period data if valid (Layer 3)"""
if cache_key not in self._period_cache_timestamps:
return None
age = time.time() - self._period_cache_timestamps[cache_key]
if age < self._period_cache_ttl:
return self._period_cache.get(cache_key)
else:
# Expired, remove from cache
self._period_cache.pop(cache_key, None)
self._period_cache_timestamps.pop(cache_key, None)
return None
def _set_period_cache(self, cache_key, result):
"""Cache period data with size limit (Layer 3)"""
# LRU-like eviction if cache is full
if len(self._period_cache) >= self._period_cache_max_size:
# Remove oldest half
keys_to_remove = list(self._period_cache.keys())[:self._period_cache_max_size // 2]
for key in keys_to_remove:
self._period_cache.pop(key, None)
self._period_cache_timestamps.pop(key, None)
self._period_cache[cache_key] = result
self._period_cache_timestamps[cache_key] = time.time()
def get_company_info(self, cik):
"""
Get basic company information with caching
Args:
cik (str): Company CIK code
Returns:
dict: Dictionary containing company information
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return None
# Check cache first
cache_key = f"info_{cik}"
cached = self._get_cached(cache_key)
if cached:
return cached
try:
# Add timeout wrapper for sec-edgar-api calls
import signal
def timeout_handler(signum, frame):
raise TimeoutError("SEC API call timeout")
# Set alarm for 45 seconds (only works on Unix-like systems)
try:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(45) # Increased timeout
submissions = self.edgar.get_submissions(cik=cik)
signal.alarm(0) # Cancel alarm
except AttributeError:
# Windows doesn't support SIGALRM, use direct call
submissions = self.edgar.get_submissions(cik=cik)
result = {
"cik": cik,
"name": submissions.get("name", ""),
"tickers": submissions.get("tickers", []),
"sic": submissions.get("sic", ""),
"sic_description": submissions.get("sicDescription", "")
}
# Cache the result
self._set_cache(cache_key, result)
return result
except TimeoutError:
print(f"Timeout getting company info for CIK: {cik}")
return None
except Exception as e:
print(f"Error getting company info: {e}")
return None
def get_company_filings(self, cik, form_types=None):
"""
Get all company filing documents with caching
Args:
cik (str): Company CIK code
form_types (list): List of form types, e.g., ['10-K', '10-Q'], None for all types
Returns:
list: List of filing documents
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return []
# Check cache first (cache all filings, filter later)
cache_key = f"filings_{cik}"
cached = self._get_cached(cache_key)
if not cached:
try:
# Add timeout wrapper
import signal
def timeout_handler(signum, frame):
raise TimeoutError("SEC API call timeout")
try:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(45) # Increased timeout
submissions = self.edgar.get_submissions(cik=cik)
signal.alarm(0)
except AttributeError:
# Windows fallback
submissions = self.edgar.get_submissions(cik=cik)
# Extract filing information
filings = []
recent = submissions.get("filings", {}).get("recent", {})
# Get data from each field
form_types_list = recent.get("form", [])
filing_dates = recent.get("filingDate", [])
accession_numbers = recent.get("accessionNumber", [])
primary_documents = recent.get("primaryDocument", [])
# Iterate through all filings
for i in range(len(form_types_list)):
filing_date = filing_dates[i] if i < len(filing_dates) else ""
accession_number = accession_numbers[i] if i < len(accession_numbers) else ""
primary_document = primary_documents[i] if i < len(primary_documents) else ""
filing = {
"form_type": form_types_list[i],
"filing_date": filing_date,
"accession_number": accession_number,
"primary_document": primary_document
}
filings.append(filing)
# Cache all filings
self._set_cache(cache_key, filings)
cached = filings
except TimeoutError:
print(f"Timeout getting company filings for CIK: {cik}")
return []
except Exception as e:
print(f"Error getting company filings: {e}")
return []
# Filter by form type if specified
if form_types:
return [f for f in cached if f.get("form_type") in form_types]
return cached
def get_company_facts(self, cik):
"""
Get all company financial facts data with caching and timeout
Args:
cik (str): Company CIK code
Returns:
dict: Company financial facts data
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return {}
# Check cache first
cache_key = f"facts_{cik}"
cached = self._get_cached(cache_key)
if cached:
return cached
try:
# Add timeout wrapper
import signal
def timeout_handler(signum, frame):
raise TimeoutError("SEC API call timeout")
try:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(60) # 60 seconds for facts (larger dataset)
facts = self.edgar.get_company_facts(cik=cik)
signal.alarm(0)
except AttributeError:
# Windows fallback
facts = self.edgar.get_company_facts(cik=cik)
# Cache the result
self._set_cache(cache_key, facts)
return facts
except TimeoutError:
print(f"Timeout getting company facts for CIK: {cik}")
return {}
except Exception as e:
print(f"Error getting company facts: {e}")
return {}
def get_financial_data_for_period(self, cik, period):
    """
    Get financial data for a specific period (supports annual and quarterly).

    Matches XBRL facts (us-gaap and ifrs-full taxonomies) against the
    requested period using several fallback strategies, resolves a source
    filing URL, and caches the assembled result (Layer 3 period cache).

    Args:
        cik (str): Company CIK code
        period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3')

    Returns:
        dict: Financial data dictionary with per-metric values,
        "<metric>_details" entries, "meta"/"sources" provenance blocks,
        and backward-compatible "source_url"/"source_form"/"data_source"
        keys; {} on failure.
    """
    if not self.edgar:
        print("sec_edgar_api library not installed")
        return {}
    # Check period cache first (Layer 3)
    cache_key = f"period_{cik}_{period}"
    cached = self._get_period_cache(cache_key)
    if cached is not None:
        print(f"[Cache Hit] get_financial_data_for_period({cik}, {period})")
        return cached.copy()  # Return copy to avoid mutation
    try:
        # Get company financial facts
        facts = self.get_company_facts(cik)
        if not facts:
            return {}
        # Extract us-gaap and ifrs-full financial data (20-F may use IFRS)
        us_gaap = facts.get("facts", {}).get("us-gaap", {})
        ifrs_full = facts.get("facts", {}).get("ifrs-full", {})
        # Define financial metrics and their XBRL tags.
        # Include multiple possible tags to improve match rate (including
        # US-GAAP and IFRS tags); first tag that yields a match wins.
        financial_metrics = {
            "total_revenue": ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "RevenueFromContractWithCustomerIncludingAssessedTax", "SalesRevenueNet", "RevenueFromContractWithCustomer", "Revenue"],
            "net_income": ["NetIncomeLoss", "ProfitLoss", "NetIncome", "ProfitLossAttributableToOwnersOfParent"],
            "earnings_per_share": ["EarningsPerShareBasic", "EarningsPerShare", "BasicEarningsPerShare", "BasicEarningsLossPerShare"],
            "operating_expenses": ["OperatingExpenses", "OperatingCostsAndExpenses", "OperatingExpensesExcludingDepreciationAndAmortization", "CostsAndExpenses", "GeneralAndAdministrativeExpense", "CostOfRevenue", "ResearchAndDevelopmentExpense", "SellingAndMarketingExpense"],
            "operating_cash_flow": ["NetCashProvidedByUsedInOperatingActivities", "NetCashProvidedUsedInOperatingActivities", "NetCashFlowsFromUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"],
        }
        # Determine target form types to search
        if 'Q' in period:
            # Quarterly data, mainly search 10-Q (20-F usually doesn't have quarterly reports)
            target_forms = ["10-Q"]
            target_forms_annual = ["10-K", "20-F"]  # for fallback
            year = int(period.split('Q')[0])
            quarter = period.split('Q')[1]
        else:
            # Annual data, search 10-K and 20-F annual forms
            target_forms = ["10-K", "20-F"]
            target_forms_annual = target_forms
            year = int(period)
            quarter = None
        # Store result with consolidated meta and sources (added for de-duplication)
        result = {
            "period": period,
            "meta": {
                "year": year,
                "quarter": quarter,
                "is_20f_filer": False,  # will set below
                "primary_source": {}  # Common source info for all metrics in this period
            },
            "sources": {}  # Per-metric source info (only if differs from primary)
        }
        # Detect if company uses 20-F (foreign filer) from the 20 most
        # recent filings: 20-F present and 10-K absent
        is_20f_filer = False
        all_filings = self.get_company_filings(cik)
        if all_filings:
            form_types_used = set(f.get('form_type', '') for f in all_filings[:20])
            if '20-F' in form_types_used and '10-K' not in form_types_used:
                is_20f_filer = True
        # Reflect in meta
        result["meta"]["is_20f_filer"] = is_20f_filer
        # Get company filings to find accession number and primary document
        filings = self.get_company_filings(cik, form_types=target_forms)
        filings_map = {}  # Map: form_year -> {accession_number, primary_document, filing_date, form_type}
        # Build filing map for quick lookup
        for filing in filings:
            form_type = filing.get("form_type", "")
            filing_date = filing.get("filing_date", "")
            accession_number = filing.get("accession_number", "")
            primary_document = filing.get("primary_document", "")
            if filing_date and accession_number:
                # Extract year from filing_date (format: YYYY-MM-DD)
                file_year = int(filing_date[:4]) if len(filing_date) >= 4 else 0
                # Store filing if it matches the period year.
                # For 20-F, also check year±1 (fiscal year may differ from filing year)
                if file_year == year or (is_20f_filer and form_type == '20-F' and file_year in [year - 1, year + 1]):
                    key = f"{form_type}_{file_year}"
                    if key not in filings_map:
                        filings_map[key] = {
                            "accession_number": accession_number,
                            "primary_document": primary_document,
                            "form_type": form_type,
                            "filing_date": filing_date,
                            "file_year": file_year
                        }
        # Iterate through each financial metric
        for metric_key, metric_tags in financial_metrics.items():
            # Support multiple possible tags
            for metric_tag in metric_tags:
                # Search both US-GAAP and IFRS taxonomies; for 20-F filers,
                # prioritize IFRS
                metric_data = None
                data_source = None
                if is_20f_filer:
                    # Check IFRS first for 20-F filers
                    if metric_tag in ifrs_full:
                        metric_data = ifrs_full[metric_tag]
                        data_source = "ifrs-full"
                    elif metric_tag in us_gaap:
                        metric_data = us_gaap[metric_tag]
                        data_source = "us-gaap"
                else:
                    # Check US-GAAP first for 10-K filers
                    if metric_tag in us_gaap:
                        metric_data = us_gaap[metric_tag]
                        data_source = "us-gaap"
                    elif metric_tag in ifrs_full:
                        metric_data = ifrs_full[metric_tag]
                        data_source = "ifrs-full"
                if metric_data:
                    units = metric_data.get("units", {})
                    # Find USD unit data (supports USD and USD/shares)
                    usd_data = None
                    if "USD" in units:
                        usd_data = units["USD"]
                    elif "USD/shares" in units and metric_key == "earnings_per_share":
                        # EPS uses USD/shares unit
                        usd_data = units["USD/shares"]
                    if usd_data:
                        # Try exact match first, then progressively looser matches
                        matched_entry = None
                        # Search for data in the specified period
                        for entry in usd_data:
                            form = entry.get("form", "")
                            fy = entry.get("fy", 0)
                            fp = entry.get("fp", "")
                            end_date = entry.get("end", "")
                            if not end_date or len(end_date) < 4:
                                continue
                            entry_year = int(end_date[:4])
                            # Check if form type matches
                            if form in target_forms:
                                if quarter:
                                    # Quarterly data match
                                    if entry_year == year and fp == f"Q{quarter}":
                                        # If already matched, compare end date, choose the latest
                                        if matched_entry:
                                            if entry.get("end", "") > matched_entry.get("end", ""):
                                                matched_entry = entry
                                        else:
                                            matched_entry = entry
                                else:
                                    # Annual data match - prioritize fiscal year (fy) field
                                    # Strategy 1: Exact match by fiscal year
                                    if fy == year and (fp == "FY" or fp == "" or not fp):
                                        # If already matched, compare end date, choose the latest
                                        if matched_entry:
                                            if entry.get("end", "") > matched_entry.get("end", ""):
                                                matched_entry = entry
                                        else:
                                            matched_entry = entry
                                    # Strategy 2: Match by end date year (when fy not available or doesn't match)
                                    elif not matched_entry and entry_year == year and (fp == "FY" or fp == "" or not fp):
                                        matched_entry = entry
                                    # Strategy 3: Allow fy to differ by 1 year (fiscal year vs calendar year mismatch)
                                    elif not matched_entry and fy > 0 and abs(fy - year) <= 1 and (fp == "FY" or fp == "" or not fp):
                                        matched_entry = entry
                                    # Strategy 4: Enhanced matching for 20-F - check frame field and end date
                                    elif not matched_entry and form == "20-F":
                                        frame = entry.get("frame", "")
                                        # Match if CY{year} in frame OR end date contains year OR fiscal year within range
                                        if (f"CY{year}" in frame or
                                            (str(year) in end_date and len(end_date) >= 4 and end_date[:4] == str(year)) or
                                            (fy > 0 and abs(fy - year) <= 1)):
                                            # Additional check: prefer entries with FY period
                                            if fp == "FY" or fp == "" or not fp:
                                                matched_entry = entry
                        # If quarterly data not found, try finding from annual report (fallback strategy)
                        if not matched_entry and quarter and target_forms_annual:
                            for entry in usd_data:
                                form = entry.get("form", "")
                                end_date = entry.get("end", "")
                                fp = entry.get("fp", "")
                                if form in target_forms_annual and end_date:
                                    # Check if end date is within this quarter range
                                    if str(year) in end_date and f"Q{quarter}" in fp:
                                        matched_entry = entry
                                        break
                        # Apply matched data
                        if matched_entry:
                            result[metric_key] = matched_entry.get("val", 0)
                            # Get form and accession info
                            form_type = matched_entry.get("form", "")
                            # NOTE(review): accn_from_facts is computed but never
                            # read below — looks like leftover scaffolding.
                            accn_from_facts = matched_entry.get('accn', '').replace('-', '')
                            filed_date = matched_entry.get('filed', '')
                            filed_year = int(filed_date[:4]) if filed_date and len(filed_date) >= 4 else year
                            # Try to get accession_number and primary_document from filings.
                            # For 20-F, try multiple year keys since filing year may differ
                            filing_info = None
                            possible_keys = [f"{form_type}_{year}"]
                            if form_type == "20-F":
                                possible_keys.extend([f"20-F_{filed_year}", f"20-F_{year-1}", f"20-F_{year+1}"])
                            for filing_key in possible_keys:
                                if filing_key in filings_map:
                                    filing_info = filings_map[filing_key]
                                    break
                            if filing_info:
                                # Use filing info from get_company_filings
                                accession_number = filing_info["accession_number"].replace('-', '')
                                primary_document = filing_info["primary_document"]
                                # Generate complete source URL
                                if primary_document:
                                    url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
                                else:
                                    url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
                            else:
                                # Fallback to company browse page if filing not found
                                url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
                            # Backward compatible: only set once to avoid later overwrites
                            if "source_url" not in result:
                                result["source_url"] = url
                                result["source_form"] = form_type
                                result["data_source"] = data_source
                                # Set primary source info (common for all metrics in this period)
                                result["meta"]["primary_source"] = {
                                    "url": url,
                                    "form": form_type,
                                    "data_source": data_source,
                                    "filed": matched_entry.get("filed", ""),
                                    "accn": matched_entry.get("accn", ""),
                                    "fy": matched_entry.get("fy", 0),
                                    "fp": matched_entry.get("fp", ""),
                                    "frame": matched_entry.get("frame", ""),
                                    "start": matched_entry.get("start", ""),
                                    "end": matched_entry.get("end", "")
                                }
                            else:
                                # Only add per-metric source if it differs from primary
                                primary_src = result["meta"]["primary_source"]
                                if (url != primary_src.get("url") or
                                    form_type != primary_src.get("form") or
                                    data_source != primary_src.get("data_source")):
                                    result["sources"][metric_key] = {
                                        "url": url,
                                        "form": form_type,
                                        "data_source": data_source,
                                        "filed": matched_entry.get("filed", "")
                                    }
                            # Simplified details: only metric-specific info (tag and val).
                            # All common fields (form, fy, fp, accn, filed, frame,
                            # data_source, start, end) are now in meta.primary_source
                            result[f"{metric_key}_details"] = {
                                "tag": metric_tag,
                                "val": matched_entry.get("val", 0)
                            }
                # If data is found, break out of tag loop
                if metric_key in result:
                    break
        # Cache the result (Layer 3)
        self._set_period_cache(cache_key, result)
        return result
    except Exception as e:
        print(f"Error getting financial data for period {period}: {e}")
        return {}